The spring-boot-starter dependency is the core starter, it includes auto-configuration, logging, and YAML support. The spring-boot-starter-test includes the dependencies for testing Spring Boot applications with libraries that include JUnit, Hamcrest and Mockito.
A dependency on spring-kafka is added. We also include spring-kafka-test to have access to an embedded Kafka broker when running our unit test.
Note that the version of Spring Kafka is linked to the version of the Apache Kafka client that is used. You need to align the version of Spring Kafka to the version of the Kafka broker you connect to. For more information consult the complete Kafka client compatibility list.
In the plugins section, you’ll find the Spring Boot Maven Plugin: spring-boot-maven-plugin. It allows us to build a single, runnable “uber-jar”. This is a convenient way to execute and transport code. Also, the plugin allows you to start the example via a Maven command.
4. Spring Boot Setup
We use Spring Boot so that we have a Spring Kafka application that you can “just run”. Start by creating a SpringKafkaApplication class. It contains the main() method that uses Spring Boot’s SpringApplication.run() to launch the application.
Note that @SpringBootApplication is a convenience annotation that adds: @Configuration, @EnableAutoConfiguration, and @ComponentScan.
The below sections will detail how to create a sender and receiver together with their respective configurations. It is also possible to have Spring Boot autoconfigure Spring Kafka using default values so that actual code that needs to be written is reduced to a bare minimum.
This example will send/receive a simple String. If you would like to send more complex objects you could, for example, use an Avro Kafka serializer or the Kafka Jsonserializer that ships with Spring Kafka.
We also create an application.ymlYAML properties file under src/main/resources. Properties from this file will be injected by Spring Boot into our configuration beans using the @Value annotation.
5. Create a Spring Kafka Message Producer
For sending messages we will be using the KafkaTemplate which wraps a Producer and provides convenience methods to send data to Kafka topics. The template provides asynchronous send methods which return a ListenableFuture.
In the Sender class, the KafkaTemplate is auto-wired as the creation will be done further below in a separate SenderConfig class.
For this example, we will use the send() method that takes as input a String payload that needs to be sent.
Note that the Kafka broker default settings cause it to auto-create a topic when a request for an unknown topic is received.
The creation of the KafkaTemplate and Sender is handled in the SenderConfig class. The class is annotated with @Configuration which indicates that the class can be used by the Spring IoC container as a source of bean definitions.
In order to be able to use the Spring Kafka template, we need to configure a ProducerFactory and provide it in the template’s constructor.
The producer factory needs to be set with some mandatory properties amongst which the 'BOOTSTRAP_SERVERS_CONFIG' property that specifies a list of host:port pairs used for establishing the initial connections to the Kafka cluster. Note that this value is configurable as it is fetched from the application.yml configuration file.
A message in Kafka is a key-value pair with a small amount of associated metadata. As Kafka stores and transports Byte arrays, we need to specify the format from which the key and value will be serialized. In this example we are sending a String as payload, as such we specify the StringSerializer class which will take care of the needed transformation.
Like with any messaging-based application, you need to create a receiver that will handle the published messages. The Receiver is nothing more than a simple POJO that defines a method for receiving messages. In the below example we named the method receive(), but you can name it anything you like.
The @KafkaListener annotation creates a ConcurrentMessageListenerContainer message listener container behind the scenes for each annotated method. To do so, a factory bean with name kafkaListenerContainerFactory is expected that we will configure in the next section.
Using the topics element, we specify the topics for this listener.
For more information on the other available elements on the KafkaListener, you can consult the API documentation.
For testing convenience, we added a CountDownLatch. This allows the POJO to signal that a message is received. This is something you are not likely to implement in a production application.
The creation and configuration of the different Spring Beans needed for the Receiver POJO are grouped in the ReceiverConfig class. Similar to the SenderConfig it is annotated with @Configuration.
Note the @EnableKafka annotation which enables the detection of the @KafkaListener annotation that was used on the previous Receiver class.
The kafkaListenerContainerFactory() is used by the @KafkaListener annotation from the Receiver in order to configure a MessageListenerContainer. To create it, a ConsumerFactory and accompanying configuration Map is needed.
In this example, a number of mandatory properties are set amongst which the initial connection and deserializer parameters.
We also specify a 'GROUP_ID_CONFIG' which allows to identify the group this consumer belongs to. Messages will be load balanced over consumer instances that have the same group id.
On top of that, we also set 'AUTO_OFFSET_RESET_CONFIG' to "earliest". This ensures that our consumer reads from the beginning of the topic even if some messages were already sent before it was able to startup.
A basic SpringKafkaApplicationTest is provided to verify that we are able to send and receive a message to and from Apache Kafka. It contains a testReceiver() unit test case that uses the Sender bean to send a message to the 'helloworld.t' topic on the Kafka bus.
We then check if the CountDownLatch from the Receiver was lowered from 1 to 0 as this indicates a message was processed by the receive() method.
An embedded Kafka broker is started by using the @EmbeddedKafka annotation.
As the embedded server is started on a random port, we provide a dedicated src/test/resources/apppication.yml properties file for testing which uses the spring.embedded.kafka.brokers system property to set the correct address of the broker(s).
Below test case can also be executed after you install Kafka and Zookeeper on your local system. Just comment out @EmbeddedKafka and change the 'bootstrap-servers' property of the application properties file located in src/test/resources to the address of the local broker.
In order to run above test case, open a command prompt in the project root folder and execute following Maven command:
The result is a successful build during which a Hello World message is sent and received using Kafka.
If you would like to run the above code sample you can get the full source code here.
In this getting started tutorial you learned how to create a Spring Kafka template and Spring Kafka listener to send/receive messages.
If you found this sample useful or have a question you would like to ask, drop a line below!