Support Category: Select
Anypoint Connector for Microsoft Azure Service Bus 2.0.0
Anypoint Connector for Microsoft Azure Service Bus (Azure Service Bus Connector) enables message integration with Azure Service Bus on the cloud. This connector supports communication with queues and topics. In addition, you can use the built-in management API to discover Azure Service Bus objects automatically and to provision them.
Prerequisites
To use Azure Service Bus Connector, you must be familiar with Mule, Anypoint Connectors, Anypoint Studio, Mule concepts, elements in a Mule flow, and global elements.
You need login credentials to test your connection with your target resource. You can use Basic Authentication, or you can pass credentials in a connection string.
For software requirements and compatibility information, see the connector’s release notes.
Install the Connector in Anypoint Studio
-
In Anypoint Studio, click the Exchange icon in the Studio task bar.
-
Click Login in Anypoint Exchange.
-
Search for the connector and click Install.
-
Follow the prompts to install the connector.
Configure the Connector
To start configuring the connector, you first need to select and configure an input source. Azure Service Bus Connector supports three input sources:
-
HTTP connector, which receives messages from a web server. Using the HTTP connector enables you to test the connector.
-
Queue listener, which receives messages from an Azure queue.
-
Subscription listener, which receives messages from an Azure subscription.
The following steps use an HTTP connector as an input source. For information on using the Queue listener and Subscription listener, see Input Sources.
-
In the Mule Palette, search for HTTP.
-
Drag the HTTP connector onto the Studio canvas.
-
Click + next to the Connector Configuration field to configure a global element for the connector.
-
Accept the defaults by clicking OK.
-
In the Mule Palette, search for Azure.
-
Select the Microsoft Azure Service Bus Connector.
-
Drag the connector next to the HTTP connector on the canvas.
-
Click + next to the Connector Configuration field to configure a global element for the connector.
-
Select an authentication type:
-
Basic authentication
With Basic authentication, applications gain access to Azure Service Bus resources by providing a shared access key in the Azure Service Bus namespace. The shared access key enables user access to Azure Service Bus resources without having to store user account access keys in the application.
Field | Description |
---|---|
Name | Name of the shared access signature. |
Service namespace | Name of the service namespace to address Service Bus resources within your application. |
Shared access key name | Name of the access key configured in the Azure Service Bus namespace. An access key created at a lower level, such as a topic-level shared key, does not work with this option unless you disable the connectivity test at startup. |
Shared access key | 256-bit primary key used to authorize access to storage resources. |
-
Connection string
With Connection string authorization, the application passes a string that includes the information used to access Azure Service Bus resources.
Field | Description |
---|---|
Name | Name of the connection string. |
Connection string | String provided by the Microsoft Azure portal that contains values for the Endpoint, SharedAccessKeyName, and SharedAccessKey parameters. For example: Endpoint=sb://samplenamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=kHuIoiu79jbjuNgHYJKbn7698BtjKohGuKMouGHyJkX= |
Service namespace | Name of the service namespace to address Service Bus resources within your application. |
Input Sources
Azure Service Bus Connector has two input source operations:
Queue Listener Configuration
Use the Queue listener input source when you want the app to receive messages from an Azure queue. Configure the source as follows:
Field | Description |
---|---|
Queue name | Queue to receive events. To receive events from the dead-letter queue, enter |
Receive mode | |
Server timeout (ms) | Maximum duration, in milliseconds, within which the client keeps renewing the message lock if the processing of the message is not completed by the handler. |
Prefetch count | When |
CRON expression | (Optional) UNIX CRON expression that specifies when to trigger the receiver action. For example, setting this field to |
Max. messages to receive | Maximum number of messages to receive during the scheduled operation. |
Subscription Listener Configuration
Use the Subscription listener input source when you want the app to receive messages from an Azure subscription. Configure the source as follows:
Field | Description |
---|---|
Topic | Name of the topic to connect to. |
Subscription | Subscription to receive events. To receive events from the dead-letter queue, specify |
Receive mode | |
Server timeout (ms) | Maximum duration within which the client keeps renewing the message lock if message processing is not completed by the handler. |
Prefetch count | When |
CRON expression | (Optional) UNIX CRON expression that specifies when to trigger the receiver action. For example, setting this field to |
Max. messages to receive | Maximum number of messages to receive during the scheduled operation. |
Examples
This topic contains the following examples:
Send a Message to a Queue or Topic
This example configures a Mule app to send a message to an Azure Service Bus queue or topic. The following diagram shows the flow for this example:
-
Create a new Mule app in Studio.
-
In the Mule Palette, search for HTTP.
-
Drag the HTTP connector onto the Studio canvas.
-
Select the HTTP connector.
-
Click + next to the Connector Configuration field to configure a global element for the connector.
-
Leave the default field values and click OK.
-
In the Mule Palette, search for Set Payload.
-
Drag the Set Payload transformer next to the HTTP connector on the Studio canvas.
-
In the Value field, enter a message value for the queue.
-
In the Mule Palette, search for Azure.
-
Drag the Microsoft Azure Service Bus Connector next to the Set Payload transformer on the Studio canvas.
-
Click + next to the Connector Configuration field for the Microsoft Azure Service Bus connector to configure a global element for it. Select and configure authentication for the connector, as described in Configure the Connector.
-
On the Microsoft Azure Service Bus properties dialog, configure the connector as follows:
-
Set the Operation value to Publish message and the Destination type to QUEUE or TOPIC.
-
Enter the value for Destination name.
-
Because the Publish message operation has no return value, use another Set Payload transformer to return a message: #["message sent"].
When you call http://localhost:8081/send, the connector sends the message to the configured queue or topic.
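The flow built in the preceding steps might look roughly like the following in the XML editor. This is a minimal sketch, assuming Mule 3 syntax. The flow name, the listener configuration name, and the payload text are hypothetical, and the connector's own Publish message element is left as a placeholder comment because its generated XML is not shown in this document.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:http="http://www.mulesoft.org/schema/mule/http"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">

  <!-- HTTP listener on port 8081; the /send path matches the URL called in this example -->
  <http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8081"/>

  <flow name="send-message-flow">
    <http:listener config-ref="HTTP_Listener_Configuration" path="/send"/>

    <!-- Message body to publish (hypothetical value) -->
    <set-payload value="Hello from Mule"/>

    <!-- Placeholder: the Microsoft Azure Service Bus Publish message operation goes here,
         with Destination type QUEUE or TOPIC and a Destination name.
         Let Studio generate this element when you drag the connector onto the canvas. -->

    <!-- Publish message has no return value, so set a response for the HTTP caller -->
    <set-payload value="#['message sent']"/>
  </flow>
</mule>
```

With the placeholder filled in by Studio, a request to http://localhost:8081/send should return the text set by the second Set Payload transformer.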
Receive Messages from a Queue or Topic with a Manual Acknowledge Message
This example configures a Mule app to send an acknowledgment after receiving a message from a queue or topic. The following diagram shows the flow for this example:
-
Create a new Mule app in Studio.
-
Add a Microsoft Azure Service Bus connector as the flow source, and configure the connector:
-
In the Operation field, select Queue listener or Subscription listener.
-
Click + next to the Connector Configuration field to configure a global element for the connector. For more information about configuring a global element, see Configure the Connector.
-
To receive messages from a queue, enter the name of the queue in the Queue name field.
-
To receive messages from a topic, enter the name of the topic in the Topic field and the name of the subscription in the Subscription field.
-
Set the Receive mode field to MANUAL.
-
Drag a Record Variable, Session Variable, or Variable transformer next to the Microsoft Azure Service Bus connector on the canvas.
-
Store the lock token value from the received message using this expression:
#[message.inboundProperties.get('lockToken')]
-
For testing purposes, perform an action with the message, such as logging it by using a Logger component.
-
Add another Microsoft Azure Service Bus connector next to the variable component on the canvas, and select the Acknowledge message operation.
-
For the Lock field value, retrieve the stored lockToken value.
The connector processes every message received and acknowledges each message that is processed successfully.
To have the connector automatically acknowledge messages, set the Receive mode field to AUTOMATIC.
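The store-and-retrieve pattern for the lock token can be sketched in XML as follows, assuming Mule 3 syntax and a flow variable. The flow name and the variable name lockToken are hypothetical choices, and the connector's listener and Acknowledge message elements are left as placeholder comments because their generated XML is not shown in this document.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd">

  <flow name="receive-and-ack-flow">
    <!-- Placeholder: Queue listener or Subscription listener source with Receive mode MANUAL -->

    <!-- Keep the lock token from the received message so it can be acknowledged later -->
    <set-variable variableName="lockToken"
                  value="#[message.inboundProperties.get('lockToken')]"/>

    <!-- Do something with the message; here it is only logged -->
    <logger level="INFO" message="Received: #[payload]"/>

    <!-- Placeholder: Acknowledge message operation.
         Set its Lock field to #[flowVars.lockToken] to read the stored value. -->
  </flow>
</mule>
```

A flow variable is used here because the token is only needed inside the same flow. A session or record variable works the same way if the token has to travel further.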
Schedule the Reception of Messages
The following example configures a Mule app that receives messages from a queue or topic on a specified schedule:
Create a Mule app to receive messages from a queue or topic, as described in Receive Messages from a Queue or Topic with a Manual Acknowledge Message.
When you create the app:
-
Set the CRON expression field to a CRON expression that follows the UNIX standard. For example, to connect and receive messages every day at 8 AM, use 0 8 * * *.
-
Set the Max. messages to receive field to the maximum number of messages for the connector to receive every time the CRON expression triggers a receive.
The connector performs a batch receive each time the CRON expression fires. Each run retrieves the available messages, up to the configured maximum, and the flow processes them.
Send Multiple Messages to a Queue
The following example configures a Mule app to send multiple messages to a queue:
-
Create a Mule app to send messages to a queue or topic, as described in Send a Message to a Queue or Topic.
-
Replace the Publish message operation with the Publish batch of messages operation. Make sure the Destination type and Destination name are properly set.
-
When crafting the message, be sure to create a comma-separated list of strings, because the app sends many messages simultaneously.
To split a comma-separated string into separate messages, drag a Transform message transformer to the canvas and use the following script:
%dw 1.0
%output application/java
---
payload splitBy ','
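In the XML editor, a Transform message component that carries this script might look like the following. This is a sketch, assuming Mule 3 with DataWeave 1.0. The flow name is hypothetical, and the Publish batch of messages operation is left as a placeholder comment because its generated XML is not shown in this document.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/ee/dw http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd">

  <flow name="send-batch-flow">
    <!-- Incoming payload is one string, for example: first,second,third -->
    <dw:transform-message>
      <dw:set-payload><![CDATA[%dw 1.0
%output application/java
---
payload splitBy ","
]]></dw:set-payload>
    </dw:transform-message>
    <!-- The payload is now a list with one string per message:
         "first", "second", and "third" for the sample input above -->

    <!-- Placeholder: Publish batch of messages operation -->
  </flow>
</mule>
```

Note that the header and the body of a DataWeave script are separated by a line containing three hyphens.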
Schedule the Logical Enqueuing of a Message
The following example configures a Mule app to send a message to all listeners at a specified time:
-
Create a Mule app to send messages to a queue or topic, as described in Send a Message to a Queue or Topic.
-
In the properties window for Azure Service Bus Connector, set the Scheduled enqueue time UTC field to the date and time when you want the message to be dispatched. For example: 2019-06-27T21:16:46.866Z.
The message is sent to the destination, which dispatches the message at the specified time.
Supported REST Operations
Azure Service Bus Connector supports the following Azure Service Bus REST API operations:
-
Queue: Create, Get, Get Size, List, Update, Delete
-
Topic: Create, Get, List, Update, Delete
-
Subscription: Create, Get, Get Size, List, Update, Delete
-
Rule: Create, Get, List, Update, Delete
Additional Configuration Information
Queue and Subscription Listener Operations
For the Queue listener and Subscription listener operations, the output payload is a string with the message. The connector returns additional attributes, such as lockToken, in the inbound properties of the message.
Restricted Access Policies
If your resources can only send and receive AMQP messages, enable the AMQP-only credentials option in the Advanced tab.
Handling a Very High Number of Messages
If Mule processes a very high number of messages (such as thousands per second), especially if it publishes to and reads from the same destination in one Mule app, you might see messages like the following in the log:
[ERROR 2019-05-22 00:15:26,362 [ForkJoinPool.commonPool-worker-3]
com.microsoft.azure.servicebus.MessageAndSessionPump: onMessage with
message containing sequence number '95' threw exception com.mulesoft.connectors.microsoft.azure.servicebus.internal.error.exception.ConsumerException: Failed
setting attributes from original API message.]
The error can occur because Mule processes errors asynchronously by default.
To fix this error, select queued-asynchronous when you configure the message listener in a flow. If a message waits for more than 30,000 ms to be processed by the flow, Mule throws an exception that causes an error (timeout in the Mule SEDA queue).
To avoid this error, create a custom queued-asynchronous configuration and do either or both of the following:
-
Increase the number of threads in the maxThreads property (default 16).
-
Increase the waiting time in the threadWaitTimeout property (default 30,000 ms).
To modify the configuration:
-
Select the flow.
-
Select General → Optional Processing Strategy → Processing Strategy Ref.
-
Click +.
-
Add a new Queued Asynchronous Processing Strategy.
For more information, see Flow Processing Strategies.
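In XML, a custom queued asynchronous processing strategy and a flow that references it can be sketched as follows, assuming Mule 3 syntax. The strategy name, the flow name, and the values 32 and 60000 are hypothetical. Tune them to your own load.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd">

  <!-- Custom strategy: more threads than the default 16
       and a longer wait than the default 30,000 ms -->
  <queued-asynchronous-processing-strategy name="highVolumeStrategy"
                                           maxThreads="32"
                                           threadWaitTimeout="60000"/>

  <!-- The flow points at the strategy by name -->
  <flow name="receive-messages-flow" processingStrategy="highVolumeStrategy">
    <!-- Placeholder: Queue listener or Subscription listener source goes here -->
    <logger level="INFO" message="#[payload]"/>
  </flow>
</mule>
```

Raising maxThreads lets more messages be processed in parallel, while raising threadWaitTimeout gives queued messages more time before the timeout error is thrown. Either change alone may be enough.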
Random Errors When Using a Manual ACK
You might receive errors like this one when working with a high number of messages while using a manual ACK:
[com.mulesoft.connectors.microsoft.azure.servicebus.internal.error.exception.ConsumerException: Message
with lockToken 2dc1312f-b263-4282-bbb0-566998eff6e6 could not be ACK at com.mulesoft.connectors.microsoft.azure.servicebus.internal.connection.Connection.ack(Connection.java:192)]
[ERROR 2019-05-22 00:17:30,822 [ReactorThread6f355ff5-5a36-487b-bb70-1a995a6ddf74]
com.microsoft.azure.servicebus.primitives.CoreMessageReceiver: Completing pending
updateState operation for delivery '? ? &??I??=????' with exception com.microsoft.azure.servicebus.primitives.MessageLockLostException: The lock supplied
is invalid. Either the lock expired, or the message has already been removed from the queue.]
When the connector performs a prefetch, the lockToken’s validity time is fixed in relation to that moment. The problem arises when the lockToken’s validity time is not long enough to process the entire batch of records. In these cases, Mule might throw an error because the lockToken expired before you do an ACK.
To prevent this issue, reduce the prefetch size (default 100), increase the validity time of the lock token, or both. You can set the validity time in the lock duration property when creating the queue or subscription from the connector, or from the Azure portal for existing queues. The maximum duration value for the lock token in Azure is 5 minutes (300 seconds).
Reduce Noise in Mule Apps Logs
In some circumstances, the underlying library used by the connector might regularly log complete stack traces with level WARN. These messages do not represent an issue, but they can clutter the logs. To reduce the noise in the logs, make either of the following changes to the src/main/resources/log4j2.xml file:
-
Specify an AsyncLogger for the library’s package, raising the level of the log:
<!-- Recommended for packages not belonging to Mule -->
<AsyncLogger name="com.microsoft.azure.servicebus" level="ERROR"/>
-
Add a RegexFilter to the existing appender (that is, RollingFile):
<!-- Avoids logging messages that match the specified regular expression -->
<RegexFilter regex=".*message to filter.*" onMatch="DENY" onMismatch="ACCEPT"/>
Important: If the message is multiline, RegexFilter might not work correctly.
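To show where these elements sit, here is a sketch of a complete log4j2.xml that applies both changes at once (either one alone is sufficient). The appender name, file paths, layout pattern, and rollover settings are hypothetical. Keep whatever your app's existing file already defines. The filter is understood to match against the whole log message, which is why the pattern in this sketch starts and ends with `.*`.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
  <Appenders>
    <!-- Hypothetical appender; reuse the RollingFile appender your app already has -->
    <RollingFile name="file"
                 fileName="logs/my-app.log"
                 filePattern="logs/my-app-%i.log">
      <PatternLayout pattern="%d [%t] %-5p %c - %m%n"/>
      <!-- Option 2: drop any log message that matches the regular expression -->
      <RegexFilter regex=".*message to filter.*" onMatch="DENY" onMismatch="ACCEPT"/>
      <SizeBasedTriggeringPolicy size="10 MB"/>
      <DefaultRolloverStrategy max="10"/>
    </RollingFile>
  </Appenders>
  <Loggers>
    <!-- Option 1: only ERROR and above from the Azure Service Bus library -->
    <AsyncLogger name="com.microsoft.azure.servicebus" level="ERROR"/>
    <AsyncRoot level="INFO">
      <AppenderRef ref="file"/>
    </AsyncRoot>
  </Loggers>
</Configuration>
```

Raising the logger level is the blunter tool because it hides every WARN message from the library. The filter is narrower but depends on the message text staying stable.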
Listeners (Sources) in Network Disconnection and Reconnection Scenarios
When there are network connection problems, you can direct the source components for the Azure Service Bus connector to automatically execute reconnection attempts by specifying a reconnection strategy. You can specify a reconnection strategy when you create or edit the connector configuration:
-
In the General tab of the connector configuration, select + next to the Connector Configuration field.
-
Select one of the Connector Configuration options.
-
In the Reconnection tab, select Standard Reconnection.
-
Optionally, specify a nondefault value for the Frequency (ms) and Reconnection Attempts fields.
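In the XML view, a standard reconnection strategy in Mule 3 is usually expressed as a reconnect element nested inside the connector's global configuration element. The fragment below is a sketch under that assumption. The values 5000 and 10 are hypothetical, and the enclosing configuration element is omitted because its generated XML is not shown in this document.

```xml
<!-- Child of the connector's global configuration element:
     retry every 5000 ms, up to 10 attempts (hypothetical values) -->
<reconnect frequency="5000" count="10"/>
```

The frequency attribute corresponds to the Frequency (ms) field and count to the Reconnection Attempts field.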
Eventual Send Operation Timed Out
When there are micro-cuts in the network connection, asynchronous message-sending flows might experience exceptions and message losses.
This happens because Azure TimeoutException exceptions (com.microsoft.azure.servicebus.primitives.TimeoutException) are not affected by connector reconnection strategies. In addition, if an exception is generated within a thread pool that executes calls asynchronously, the exception does not reach a catch exception strategy. To handle these exceptions, you must program a solution in the XML of the Mule app flow.
Connector Namespace and Schema
When designing your application in Anypoint Studio, dragging the connector from the Mule Palette view onto the Anypoint Studio canvas automatically populates the XML code with the connector namespace and schema location:
-
Namespace: http://www.mulesoft.org/schema/mule/microsoft-azure-service-bus
-
Schema location: http://www.mulesoft.org/schema/mule/microsoft-azure-service-bus/current/mule-microsoft-azure-service-bus.xsd
If you are manually coding the Mule application in the Studio XML editor or another text editor, paste these statements into the header of your configuration XML, inside the <mule> tag:
<mule
...
xmlns:microsoft-azure-service-bus="http://www.mulesoft.org/schema/mule/microsoft-azure-service-bus"
...
http://www.mulesoft.org/schema/mule/microsoft-azure-service-bus
http://www.mulesoft.org/schema/mule/microsoft-azure-service-bus/current/mule-microsoft-azure-service-bus.xsd
...
...">