The Kafka Connector lets you connect to the Apache Kafka messaging system. The technical details of the connection are abstracted away from the user to make implementation quick and easy.

Additional Info

Requires Mule Enterprise License: Yes

Requires Entitlement: No

Mule Version: 3.7.0 or higher

Configs


Configuration

<apachekafka:config>

Kafka connector configuration. Use it to set the properties the connector needs to establish a connection with the Kafka server.

Attributes

| Name | Java Type | Description | Default Value | Required |
|---|---|---|---|---|
| name | String | The name of this configuration, by which it can later be referenced. | | x |
| bootstrapServers | String | Comma-separated host/port pairs used to establish the initial connection to the Kafka cluster. This is the same as the "bootstrap.servers" property that you provide to Kafka clients (producer/consumer). If "bootstrap.servers" is also set in the producer or consumer properties file, this value is ignored and the one from the properties file is used. | | |
| producerPropertiesFile | String | Path to a properties file that customizes the producer, similar to the one you provide to the Kafka command line tools. If the file does not specify "bootstrap.servers", the value of bootstrapServers is used. If it does not specify "key.serializer" and "value.serializer", they are set to org.apache.kafka.common.serialization.StringSerializer. | | |
| consumerPropertiesFile | String | Path to a properties file that customizes the consumer, similar to the one you provide to the Kafka command line tools. If the file does not specify "bootstrap.servers", the value of bootstrapServers is used. If it does not specify "key.deserializer" and "value.deserializer", they are set to org.apache.kafka.common.serialization.StringDeserializer. | | |
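
As a minimal sketch of the attributes above, a global configuration element might look like the following. The broker addresses and the properties file names are placeholder assumptions, not values taken from the connector. The configuration name matches the one referenced by the XML samples in this reference. The fragment assumes the apachekafka namespace is already declared on the enclosing Mule configuration root.

```xml
<!-- Hypothetical values: broker addresses and file names are placeholders. -->
<apachekafka:config name="kafka-default-config"
                    bootstrapServers="localhost:9092,localhost:9093"
                    producerPropertiesFile="producer.properties"
                    consumerPropertiesFile="consumer.properties"/>
```

If either properties file sets "bootstrap.servers" itself, that value takes precedence over bootstrapServers, as described in the attribute table.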

 

Processors


Producer

<apachekafka:producer>

Operation that sends a Kafka message to the given topic.

XML Sample

<apachekafka:producer config-ref="kafka-default-config" topic="testTopic" key="messageKey" message="This is a test message"/>

Attributes

| Name | Java Type | Description | Default Value | Required |
|---|---|---|---|---|
| config-ref | String | Specifies which configuration to use. | | x |
| message | String | Message to send. | #[payload] | |
| key | String | Key of the message to send. | | x |
| topic | String | Topic to send the message to. | | x |
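
Because message defaults to #[payload], the producer can send whatever the flow currently carries. The sketch below illustrates this under stated assumptions: the flow name is hypothetical, the apachekafka namespace is assumed to be declared on the enclosing Mule configuration root, and a configuration named kafka-default-config is assumed to exist.

```xml
<!-- Hypothetical flow; it has no message source, so it would be invoked from another flow. -->
<flow name="kafka-producer-sketch">
    <!-- Set the payload that the producer sends by default. -->
    <set-payload value="This is a test message"/>
    <!-- message is omitted, so its default value #[payload] applies. -->
    <apachekafka:producer config-ref="kafka-default-config"
                          topic="testTopic"
                          key="messageKey"/>
</flow>
```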

Sources


Consumer

<apachekafka:consumer>

Operation that consumes Kafka messages from the given topic.

XML Sample

<apachekafka:consumer config-ref="kafka-default-config" topic="testTopic" partitions="1"/>

Attributes

| Name | Java Type | Description | Default Value | Required |
|---|---|---|---|---|
| config-ref | String | Specifies which configuration to use. | | x |
| callback | SourceCallback | Callback used by the operation to feed consumed messages. | | x |
| topic | String | Name of the Kafka topic to consume messages from. | | x |
| partitions | int | Number of partitions assigned at topic creation. | | x |

Returns

| Return Java Type | Description |
|---|---|
| StopSourceCallback | Stop callback that shuts down the consumer. |
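
Since the consumer is listed under Sources, it would typically sit at the start of a flow. The following is a rough sketch rather than a reference configuration. The flow name is hypothetical, the apachekafka namespace is assumed to be declared on the enclosing Mule configuration root, and it is an assumption that each consumed message arrives as the flow payload. As in the XML sample above, callback is not set as an attribute.

```xml
<!-- Hypothetical flow: consumes from testTopic and logs each message. -->
<flow name="kafka-consumer-sketch">
    <apachekafka:consumer config-ref="kafka-default-config"
                          topic="testTopic"
                          partitions="1"/>
    <!-- Assumes the consumed message is available as the payload. -->
    <logger level="INFO" message="#[payload]"/>
</flow>
```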