The Kafka Connector allows you to connect to the Apache Kafka messaging system. The technical details of this connection are all abstracted from the user to make implementation quick and easy.

Additional Info

Requires Mule Enterprise License: Yes
Requires Entitlement: No
Mule Version: 3.7.0 or higher

Configs


Configuration

<apachekafka:config>

Kafka connector configuration. Use it to set the properties the connector needs to establish a connection with the Kafka server.

Attributes

Name | Java Type | Description | Default Value | Required
name | String | The name of this configuration, used to reference it from other elements. | | x
bootstrapServers | String | Comma-separated host:port pairs used to establish the initial connection to the Kafka cluster. This is the same as the "bootstrap.servers" value you must provide to Kafka clients (producer/consumer). If a producer or consumer properties file also sets "bootstrap.servers", this value is ignored and the one from the properties file is used. | |
producerPropertiesFile | String | Path to a properties file that customizes the producer, similar to what you provide to the Kafka command-line tools. If the file does not set "bootstrap.servers", the value provided with "Bootstrap Servers" (bootstrapServers) is used. If it does not set "key.serializer" and "value.serializer", both are set to org.apache.kafka.common.serialization.StringSerializer. | |
consumerPropertiesFile | String | Path to a properties file that customizes the consumer, similar to what you provide to the Kafka command-line tools. If the file does not set "bootstrap.servers", the value provided with "Bootstrap Servers" (bootstrapServers) is used. If it does not set "key.deserializer" and "value.deserializer", both are set to org.apache.kafka.common.serialization.StringDeserializer. | |
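As an illustration, a configuration element that sets every attribute described here might look like the sketch below. The name matches the config-ref used in the samples on this page. The broker addresses and properties file names are placeholder assumptions, and the namespace declarations on the enclosing mule element are omitted.

```xml
<!-- Hypothetical values: replace the hosts and file names with your own. -->
<apachekafka:config name="kafka-default-config"
    bootstrapServers="localhost:9092,localhost:9093"
    producerPropertiesFile="producer.properties"
    consumerPropertiesFile="consumer.properties"/>
```

In this sketch, a "bootstrap.servers" entry in either properties file would take precedence over the bootstrapServers attribute, as described above.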

 

Processors


Producer

<apachekafka:producer>

Operation that sends a message to the given Kafka topic.

XML Sample

<apachekafka:producer config-ref="kafka-default-config" topic="testTopic" key="messageKey" message="This is a test message"/>

Attributes

Name | Java Type | Description | Default Value | Required
config-ref | String | Specify which config to use | | x
message | String | Message to be sent | #[payload] |
key | String | Key belonging to the message that is going to be sent | | x
topic | String | Topic to send the message to | | x
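Because message defaults to #[payload], the attribute can be left out when the flow's payload already holds the text to send. The following is a minimal sketch under that assumption. The flow name and the set-payload value are illustrative only, and the config-ref is assumed to point at an apachekafka:config defined elsewhere in the application.

```xml
<!-- Hypothetical flow: the flow name and payload are illustrative. -->
<flow name="kafka-producer-example-flow">
    <set-payload value="This is a test message"/>
    <!-- message is omitted, so it falls back to its default, #[payload] -->
    <apachekafka:producer config-ref="kafka-default-config" topic="testTopic" key="messageKey"/>
</flow>
```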

Sources


Consumer

<apachekafka:consumer>

Source that consumes messages from a given Kafka topic.

XML Sample

<apachekafka:consumer config-ref="kafka-default-config" topic="testTopic" partitions="1"/>

Attributes

Name | Java Type | Description | Default Value | Required
config-ref | String | Specify which config to use | | x
callback | SourceCallback | Callback used by operation to feed consumed messages | | x
topic | String | Name of Kafka topic to consume messages from | | x
partitions | int | Number of partitions assigned at topic creation | | x

Returns

Return Java Type | Description
StopSourceCallback | A callback that will shut down the consumer
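As a source, the consumer would normally sit at the start of a flow and trigger it for each message received. The sketch below assumes that each consumed message arrives as the flow's payload. The flow name and the logger step are illustrative. The callback attribute is not set here, mirroring the XML sample for this element, which suggests the runtime supplies it.

```xml
<!-- Hypothetical flow: logs each message consumed from testTopic. -->
<flow name="kafka-consumer-example-flow">
    <apachekafka:consumer config-ref="kafka-default-config" topic="testTopic" partitions="1"/>
    <!-- Assumes the consumed message is available as the payload -->
    <logger level="INFO" message="#[payload]"/>
</flow>
```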