Spring Kafka - JSON Serializer Deserializer Example
Table of Contents
JSON (JavaScript Object Notation) is a lightweight data-interchange format that uses human-readable text to transmit data objects. It is built on two structures: a collection of name/value pairs and an ordered list of values.
The following tutorial illustrates how to send/receive a Java object as a JSON byte[]
array to/from Apache Kafka using Spring Kafka, Spring Boot and Maven.
General Project Setup #
Tools used:
- Spring Kafka 1.2
- Spring Boot 1.5
- Maven 3.5
Apache Kafka stores and transports Byte
arrays in its topics. It ships with a number of built in (de)serializers but a JSON one is not included. Luckily, the Spring Kafka framework includes a support package that contains a JSON (de)serializer that uses a Jackson ObjectMapper
under the covers.
We base the below example on a previous Spring Kafka example. The only thing that needs to be added to the Maven POM file for working with JSON is the spring-boot-starter-web
dependency which will indirectly include the needed jackson-*
JAR dependencies.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.codenotfound</groupId>
<artifactId>spring-kafka-json</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-kafka-json</name>
<description>Spring Kafka - JSON Serializer Deserializer Example</description>
<url>https://www.codenotfound.com/spring-kafka-json-serializer-deserializer-example.html</url>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.4.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version>
<spring-kafka.version>1.2.2.RELEASE</spring-kafka.version>
</properties>
<dependencies>
<!-- spring-boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>${spring-kafka.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- spring-boot-maven-plugin -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Object Model to Serialize/Deserialize #
To illustrate the example we will send a Car
object to a json.t
topic. Let’s use following class representing a car with a basic structure.
package com.codenotfound.model;
public class Car {
private String make;
private String manufacturer;
private String id;
public Car() {
super();
}
public Car(String make, String manufacturer, String id) {
super();
this.make = make;
this.manufacturer = manufacturer;
this.id = id;
}
public String getMake() {
return make;
}
public void setMake(String make) {
this.make = make;
}
public String getManufacturer() {
return manufacturer;
}
public void setManufacturer(String manufacturer) {
this.manufacturer = manufacturer;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
@Override
public String toString() {
return "Car [make=" + make + ", manufacturer=" + manufacturer + ", id=" + id + "]";
}
}
Producing JSON Messages to a Kafka Topic #
In order to use the JsonSerializer
, shipped with Spring Kafka, we need to set the value of the producer’s VALUE_SERIALIZER_CLASS_CONFIG
configuration property to the JsonSerializer
class. In addition, we change the ProducerFactory
and KafkaTemplate
generic type so that it specifies Car
instead of String
. This will result in the Car
object to be serialized in a JSON byte[]
message.
package com.codenotfound.kafka.producer;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import com.codenotfound.model.Car;
@Configuration
public class SenderConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Car> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Car> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
return new Sender();
}
}
The Sender
class is updated accordingly so that it’s send()
method accepts a Car
object as input. We also update the KafkaTemplate
generic type from String
to Car
.
package com.codenotfound.kafka.producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import com.codenotfound.model.Car;
public class Sender {
private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
@Value("${kafka.topic.json}")
private String jsonTopic;
@Autowired
private KafkaTemplate<String, Car> kafkaTemplate;
public void send(Car car) {
LOGGER.info("sending car='{}'", car.toString());
kafkaTemplate.send(jsonTopic, car);
}
}
Consuming JSON Messages from a Kafka Topic #
To receive the JSON serialized message we need to update the value of the VALUE_DESERIALIZER_CLASS_CONFIG
property so that it points to the JsonDeserializer
class. The ConsumerFactory
and ConcurrentKafkaListenerContainerFactory
generic type needs to be changed so that it specifies Car
instead of String
.
Note that the
JsonDeserializer
requires aClass<?>
argument to allow the deserialization of a consumedbyte[]
to the proper target object (in this example theCar
class).
package com.codenotfound.kafka.consumer;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import com.codenotfound.model.Car;
@Configuration
@EnableKafka
public class ReceiverConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
return props;
}
@Bean
public ConsumerFactory<String, Car> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(Car.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Car> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Car> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public Receiver receiver() {
return new Receiver();
}
}
Identical to the updated Sender
class, the argument of the receive()
method of the Receiver
class needs to be changed to the Car
type.
package com.codenotfound.kafka.consumer;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import com.codenotfound.model.Car;
public class Receiver {
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${kakfa.topic.json}")
public void receive(Car car) {
LOGGER.info("received car='{}'", car.toString());
latch.countDown();
}
}
Test Sending and Receiving JSON Messages on Kafka #
The Maven project contains a SpringKafkaApplicationTest
test case to demonstrate the above sample code. A JUnit ClassRule starts an embedded Kafka and ZooKeeper server.
Using @Before
we wait until all the partitions are assigned to our Receiver
by looping over the available ConcurrentMessageListenerContainer
(if we don’t do this the message will already be sent before the listeners are assigned to the topic).
In the testReceiver()
test case we create a Car
object and send it to the json.t
topic. Finally the CountDownLatch
from the Receiver
is used to verify that a message was successfully received.
package com.codenotfound.kafka;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.test.context.junit4.SpringRunner;
import com.codenotfound.kafka.consumer.Receiver;
import com.codenotfound.kafka.producer.Sender;
import com.codenotfound.model.Car;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringKafkaApplicationTest {
@Autowired
private Sender sender;
@Autowired
private Receiver receiver;
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "json.t");
@Before
public void setUp() throws Exception {
// wait until the partitions are assigned
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer,
embeddedKafka.getPartitionsPerTopic());
}
}
@Test
public void testReceive() throws Exception {
Car car = new Car("Passat", "Volkswagen", "ABC-123");
sender.send(car);
receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(receiver.getLatch().getCount()).isEqualTo(0);
}
}
In order to run the above example open a command prompt and execute following Maven command:
mvn test
Maven will download the needed dependencies, compile the code and run the unit test case. The result should be a successful build during which following logs are generated:
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v1.5.4.RELEASE)
16:38:11.745 [main] INFO c.c.kafka.SpringKafkaApplicationTest - Starting SpringKafkaApplicationTest on cnf-pc with PID 6116 (started by CodeNotFound in c:\codenotfound\code\spring-kafka\spring-kafka-json)
16:38:11.745 [main] INFO c.c.kafka.SpringKafkaApplicationTest - No active profile set, falling back to default profiles: default
16:38:13.633 [main] INFO c.c.kafka.SpringKafkaApplicationTest - Started SpringKafkaApplicationTest in 2.184 seconds (JVM running for 6.418)
16:38:15.021 [main] INFO c.codenotfound.kafka.producer.Sender - sending car='Car [make=Passat, manufacturer=Volkswagen, id=ABC-123]'
16:38:15.115 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - received car='Car [make=Passat, manufacturer=Volkswagen, id=ABC-123]'
16:38:18.391 [main] ERROR o.a.zookeeper.server.ZooKeeperServer - ZKShutdownHandler is not registered, so ZooKeeper server won't take any action on ERROR or SHUTDOWN server state changes
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 11.622 sec - in com.codenotfound.kafka.SpringKafkaApplicationTest
Results :
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 14.714 s
[INFO] Finished at: 2017-08-02T16:38:19+02:00
[INFO] Final Memory: 29M/214M
[INFO] ------------------------------------------------------------------------
This concludes the example on how to use the Spring Kafka JsonSerializer/JsonDeserializer in combination with Apache Kafka.
If you have some questions or remarks, drop me a line below.