Spring Kafka - Spring Boot Example
Table of Contents
Spring Boot auto-configuration attempts to automatically configure your Spring application based on the JAR dependencies that have been added. In other words, if the spring-kafka-1.2.2.RELEASE.jar
is on the classpath and you have not manually configured any Consumer
or Provider
beans, then Spring Boot will auto-configure them using default values.
In order to demonstrate this behavior we will start from a previous Spring Kafka tutorial in which we send/receive messages to/from an Apache Kafka topic using Spring Kafka. The original code will be reduced to a bare minimum in order to demonstrate Spring Boot’s autoconfiguration.
General Project Setup #
Tools used:
- Spring Kafka 1.2
- Spring Boot 1.5
- Maven 3.5
The project is built using Maven. The Maven POM file contains the needed dependencies for Spring Boot and Spring Kafka as shown below.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.codenotfound</groupId>
<artifactId>spring-kafka-boot</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-kafka-boot</name>
<description>Spring Kafka - Spring Boot Example</description>
<url>https://www.codenotfound.com/spring-kafka-boot-example.html</url>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.4.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version>
<spring-kafka.version>1.2.2.RELEASE</spring-kafka.version>
</properties>
<dependencies>
<!-- spring-boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>${spring-kafka.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- spring-boot-maven-plugin -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
The SpringKafkaApplication
remains unchanged. What is important to note is that in order for the auto-configuration to work we need to opt-in by adding the @EnableAutoConfiguration
or @SpringBootApplication
(which is same as adding @Configuration
@EnableAutoConfiguration
@ComponentScan
) annotation to one of our @Configuration
classes.
You should only ever add one
@EnableAutoConfiguration
annotation. It is recommended to add it to your primary@Configuration
class.
package com.codenotfound.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(SpringKafkaApplication.class, args);
}
}
Autoconfigure the Spring Kafka Message Producer #
The setup and creation of the KafkaTemplate
and Producer
beans is automatically done by Spring Boot. The only things left to do are auto-wiring the KafkaTemplate
and using it in the send()
method.
By annotating the
Sender
class with@Component
, Spring will instantiate this class as a bean that we will use in our test case. In order for this to work, we also need the@EnableAutoConfiguration
which was indirectly specified onSpringKafkaApplication
by using the@SpringBootApplication
annotation.
package com.codenotfound.kafka.producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class Sender {
private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String payload) {
LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}
Autoconfigure the Spring Kafka Message Consumer #
Similar to the Sender
, the setup and creation of the ConcurrentKafkaListenerContainerFactory
and KafkaMessageListenerContainer
beans is automatically done by Spring Boot.
The @KafkaListener
annotation creates a message listener container for the annotated receive()
method. The topic name is specified using the ${kafka.topic.boot}
placeholder for which the value will be automatically fetched from the application.yml
properties file.
package com.codenotfound.kafka.consumer;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class Receiver {
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${kafka.topic.boot}")
public void receive(ConsumerRecord<?, ?> consumerRecord) {
LOGGER.info("received payload='{}'", consumerRecord.toString());
latch.countDown();
}
}
For the Receiver
, Spring Boot takes care of most of the configuration. There are however two properties that need to be explicitly set in the application.yml
properties file:
- The
kafka.consumer.auto-offset-reset
property needs to be set to'earliest'
which ensures the new consumer group will get the message sent in case the container started after the send was completed. - The
kafka.consumer.group-id
property needs to be specified as we are using group management to assign topic partitions to consumers. In this example we will assign it the value'boot'
.
spring:
kafka:
consumer:
auto-offset-reset: earliest
group-id: boot
kafka:
topic:
boot: boot.t
Scroll down to
# APACHE KAFKA
in the following link in order to get a complete overview of all the Spring Kafka properties that can be set for auto configuration using the Spring Boot application properties file.
Testing the Sender and Receiver #
In order to verify that our code works, a basic SpringKafkaApplicationTest
test case is used. It contains a testReceiver()
unit test case that uses the Sender
to send a message to the 'boot.t'
topic on the Kafka bus. We then use the CountDownLatch
from the Receiver
to verify that a message was successfully received.
The test case runs using the embedded Kafka broker which is started via a JUnit @ClassRule.
Note that we have added a dedicated
application.yml
properties file undersrc/test/resources
in order to override the default broker address with the address of the embedded broker using thespring.kafka.bootstrap-servers
property.
package com.codenotfound.kafka;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.TimeUnit;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.test.context.junit4.SpringRunner;
import com.codenotfound.kafka.consumer.Receiver;
import com.codenotfound.kafka.producer.Sender;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringKafkaApplicationTest {
private static String BOOT_TOPIC = "boot.t";
@Autowired
private Sender sender;
@Autowired
private Receiver receiver;
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, BOOT_TOPIC);
@Test
public void testReceive() throws Exception {
sender.send(BOOT_TOPIC, "Hello Boot!");
receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(receiver.getLatch().getCount()).isEqualTo(0);
}
}
Fire up the above test case by opening a command prompt and execute following Maven command:
mvn test
Maven will then download the dependencies, compile the code and run the unit test case during which following logs should be generated:
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v1.5.4.RELEASE)
20:58:18.284 [main] INFO c.c.kafka.SpringKafkaApplicationTest - Starting SpringKafkaApplicationTest on cnf-pc with PID 6032 (started by CodeNotFound in c:\codenotfound\code\spring-kafka\spring-kafka-avr
o)
20:58:18.284 [main] DEBUG c.c.kafka.SpringKafkaApplicationTest - Running with Spring Boot v1.5.4.RELEASE, Spring v4.3.9.RELEASE
20:58:18.285 [main] INFO c.c.kafka.SpringKafkaApplicationTest - No active profile set, falling back to default profiles: default
20:58:19.011 [main] INFO c.c.kafka.SpringKafkaApplicationTest - Started SpringKafkaApplicationTest in 1.07 seconds (JVM running for 5.484)
20:58:20.348 [main] INFO c.codenotfound.kafka.producer.Sender - sending user='{"name": "John Doe", "favorite_number": null, "favorite_color": "green"}'
20:58:20.364 [main] DEBUG c.c.kafka.serializer.AvroSerializer - data='{"name": "John Doe", "favorite_number": null, "favorite_color": "green"}'
20:58:20.364 [main] DEBUG c.c.kafka.serializer.AvroSerializer - serialized data='104A6F686E20446F6502000A677265656E'
20:58:20.383 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG c.c.k.serializer.AvroDeserializer - data='104A6F686E20446F6502000A677265656E'
20:58:20.384 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG c.c.k.serializer.AvroDeserializer - deserialized data='{"name": "John Doe", "favorite_number": null, "favorite_color": "green"}'
20:58:20.390 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - received user='{"name": "John Doe", "favorite_number": null, "favorite_color": "green"}'
20:58:21.599 [main] ERROR o.a.zookeeper.server.ZooKeeperServer - ZKShutdownHandler is not registered, so ZooKeeper server won't take any action on ERROR or SHUTDOWN server state changes
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 8.248 sec - in com.codenotfound.kafka.SpringKafkaApplicationTest
Results :
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 11.217 s
[INFO] Finished at: 2017-08-02T20:58:22+02:00
[INFO] Final Memory: 19M/211M
[INFO] ------------------------------------------------------------------------
Using Spring Boot’s autoconfiguration we were able to setup a Sender
and Receiver
using only a couple of lines of code. Hopefully, this example will kick-start your Spring Kafka development.
Feel free to leave a comment in case something was not clear or just to let me know if everything worked.