Spring Kafka - Avro Bijection Example
Twitter Bijection is an invertible function library that converts back and forth between two types. It supports a number of types including Apache Avro.
In the following tutorial, we will configure, build and run an example in which we will send/receive an Avro message to/from Apache Kafka using Bijection, Apache Avro, Spring Kafka, Spring Boot and Maven.
General Project Setup
Tools used:
- Twitter Bijection 0.9
- Apache Avro 1.8
- Spring Kafka 1.2
- Spring Boot 1.5
- Maven 3.5
We base this example on a previous Spring Kafka Avro serializer/deserializer example in which we used the Avro APIs to serialize and deserialize objects. For this code sample, we use the Bijection APIs instead. As the serializer and deserializer below show, they are a bit easier to use.
The starting point is again the user.avsc schema from the Avro getting started guide. It describes the fields of a User type and the type of each field.
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
We set up our project using Maven. In the POM file we add the bijection-avro_2.11 dependency. The suffix of the artifactId (in this case _2.11) indicates the Scala version that was used to compile the JAR.
Note that we choose the 2.11 version of bijection-avro because spring-kafka-test includes a dependency on the 2.11 version of the scala-library.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.codenotfound</groupId>
  <artifactId>spring-kafka-avro-bijection</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>spring-kafka-avro-bijection</name>
  <description>Spring Kafka - Avro Bijection Example</description>
  <url>https://www.codenotfound.com/spring-kafka-avro-bijection-example.html</url>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.4.RELEASE</version>
  </parent>
  <properties>
    <java.version>1.8</java.version>
    <spring-kafka.version>1.2.2.RELEASE</spring-kafka.version>
    <avro.version>1.8.2</avro.version>
    <bijection.version>0.9.5</bijection.version>
  </properties>
  <dependencies>
    <!-- spring-boot -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <!-- spring-kafka -->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>${spring-kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka-test</artifactId>
      <version>${spring-kafka.version}</version>
      <scope>test</scope>
    </dependency>
    <!-- avro -->
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>${avro.version}</version>
    </dependency>
    <!-- bijection-avro -->
    <dependency>
      <groupId>com.twitter</groupId>
      <artifactId>bijection-avro_2.11</artifactId>
      <version>${bijection.version}</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <!-- spring-boot-maven-plugin -->
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
      <!-- avro-maven-plugin -->
      <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>${avro.version}</version>
        <executions>
          <execution>
            <phase>generate-sources</phase>
            <goals>
              <goal>schema</goal>
            </goals>
            <configuration>
              <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
              <outputDirectory>${project.build.directory}/generated/avro</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
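Generate the Avro User class by executing the Maven command below. The avro-maven-plugin reads the schema files from src/main/resources/avro/ and writes the generated sources to the generated/avro folder in the build directory. The result is a User class that contains the schema and Builder methods.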
mvn generate-sources
Producing Avro Messages to a Kafka Topic
Serializing an Avro message to a byte[] array using Bijection can be achieved in just two lines of code, as shown below.
We first create an Injection, which is an object that can convert a value from one type to the other and back. This is done by calling the static toBinary() method on the GenericAvroCodecs class. The result is an Injection capable of converting a generic Avro record to and from Avro's binary format (written using org.apache.avro.io.BinaryEncoder). As input parameter, we supply the Avro schema, which we get from the passed object.
The apply() method then converts the record into the byte array that is returned.
package com.codenotfound.kafka.serializer;
import java.util.Map;
import javax.xml.bind.DatatypeConverter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
public class AvroSerializer<T extends SpecificRecordBase> implements Serializer<T> {
  private static final Logger LOGGER = LoggerFactory.getLogger(AvroSerializer.class);
  @Override
  public void close() {
    // No-op
  }
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // No-op
  }
  @Override
  public byte[] serialize(String topic, T data) {
    // Kafka allows null values (e.g. tombstones), pass them through unchanged
    if (data == null) {
      return null;
    }
    LOGGER.debug("data to serialize='{}'", data);
    // Injection that converts a GenericRecord to Avro binary using the record's own schema
    Injection<GenericRecord, byte[]> genericRecordInjection =
        GenericAvroCodecs.toBinary(data.getSchema());
    // apply() performs the forward conversion: record -> byte[]
    byte[] result = genericRecordInjection.apply(data);
    LOGGER.debug("serialized data='{}'", DatatypeConverter.printHexBinary(result));
    return result;
  }
}
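Bijection's Injection<A, B> pairs a conversion that always succeeds (apply()) with an inverse that may fail (invert(), which returns a scala.util.Try). To make that contract concrete without Bijection on the classpath, here is a minimal sketch using a hypothetical Java interface of the same shape. It uses Optional instead of Try, and a String to UTF-8 byte[] injection stands in for the Avro codec: every String can be encoded, but not every byte[] is valid UTF-8, so the inverse is partial. The names InjectionSketch, Injection and UTF8 are illustrative only, not Bijection's API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

public class InjectionSketch {

  // Hypothetical stand-in for Bijection's Injection<A, B>:
  // apply() is total, invert() may fail and therefore returns an Optional.
  interface Injection<A, B> {
    B apply(A a);

    Optional<A> invert(B b);
  }

  // String -> UTF-8 bytes; the inverse rejects byte arrays that are not valid UTF-8.
  static final Injection<String, byte[]> UTF8 = new Injection<String, byte[]>() {
    @Override
    public byte[] apply(String s) {
      return s.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Optional<String> invert(byte[] bytes) {
      // REPORT makes the decoder throw instead of silently substituting characters
      CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
          .onMalformedInput(CodingErrorAction.REPORT)
          .onUnmappableCharacter(CodingErrorAction.REPORT);
      try {
        return Optional.of(decoder.decode(ByteBuffer.wrap(bytes)).toString());
      } catch (CharacterCodingException e) {
        return Optional.empty();
      }
    }
  };

  public static void main(String[] args) {
    byte[] encoded = UTF8.apply("John Doe");
    // round trip: invert(apply(x)) gives x back
    System.out.println(UTF8.invert(encoded)); // prints Optional[John Doe]
    // 0xFF never occurs in valid UTF-8, so the inverse fails
    System.out.println(UTF8.invert(new byte[] {(byte) 0xFF})); // prints Optional.empty
  }
}
```

The same round-trip property is what the serializer and deserializer in this example rely on: apply() on the producer side and invert() on the consumer side.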
Consuming Avro Messages from a Kafka Topic
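Deserializing an Avro message from a byte[] array using Bijection is also done with an Injection. It is created in the same way as in the AvroSerializer class, except that the schema is taken from a new instance of the target type passed to the constructor, since there is no record object to get it from yet.
We then convert the received bytes into a GenericRecord using the invert() method. Because invert() returns a scala.util.Try, calling get() on it either yields the record or rethrows the decoding error. Finally, SpecificData.get().deepCopy() copies the generic record into an instance of the specific target class (for example User), which is returned.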
package com.codenotfound.kafka.serializer;
import java.util.Arrays;
import java.util.Map;
import javax.xml.bind.DatatypeConverter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {
  private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializer.class);
  protected final Class<T> targetType;
  public AvroDeserializer(Class<T> targetType) {
    this.targetType = targetType;
  }
  @Override
  public void close() {
    // No-op
  }
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // No-op
  }
  @SuppressWarnings("unchecked")
  @Override
  public T deserialize(String topic, byte[] data) {
    // a null payload (e.g. a tombstone) has nothing to decode
    if (data == null) {
      return null;
    }
    LOGGER.debug("data to deserialize='{}'", DatatypeConverter.printHexBinary(data));
    try {
      // get the schema from an instance of the target type (e.g. User)
      Schema schema = targetType.newInstance().getSchema();
      Injection<GenericRecord, byte[]> genericRecordInjection = GenericAvroCodecs.toBinary(schema);
      // invert() performs the reverse conversion byte[] -> GenericRecord and returns a Try
      GenericRecord genericRecord = genericRecordInjection.invert(data).get();
      // copy the generic record into an instance of the specific target class
      T result = (T) SpecificData.get().deepCopy(schema, genericRecord);
      LOGGER.debug("data='{}'", result);
      return result;
    } catch (Exception e) {
      throw new SerializationException(
          "Can't deserialize data [" + Arrays.toString(data) + "] from topic [" + topic + "]", e);
    }
  }
}
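Avro's binary format stores no field names, only the values in schema order, so bytes cannot be decoded without knowing the schema. That is why AvroDeserializer needs the target type. To illustrate, here is a dependency-free sketch that hand-decodes the payload logged by the test run (104A6F686E20446F65020008626C7565) following the Avro binary encoding rules for the User schema: int and long values as zig-zag variable-length integers, strings as a length followed by UTF-8 bytes, and unions as a branch index followed by the value (nothing for null). The class and method names are hypothetical, and the field layout is hard-coded for this one schema; real code should use Avro's own decoder or Bijection as above.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

public class UserBinaryDecoderSketch {

  // Reads a zig-zag encoded variable-length long (Avro's encoding for int and long).
  static long readLong(ByteArrayInputStream in) {
    long raw = 0;
    int shift = 0;
    int b;
    do {
      if (shift > 63) {
        throw new IllegalArgumentException("varint too long");
      }
      b = in.read();
      if (b < 0) {
        throw new IllegalArgumentException("unexpected end of data");
      }
      raw |= (long) (b & 0x7F) << shift;
      shift += 7;
    } while ((b & 0x80) != 0);
    // undo the zig-zag mapping: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
    return (raw >>> 1) ^ -(raw & 1);
  }

  // Reads an Avro string: a length (as a long) followed by that many UTF-8 bytes.
  static String readString(ByteArrayInputStream in) {
    long length = readLong(in);
    if (length < 0 || length > in.available()) {
      throw new IllegalArgumentException("invalid string length " + length);
    }
    byte[] bytes = new byte[(int) length];
    in.read(bytes, 0, bytes.length);
    return new String(bytes, StandardCharsets.UTF_8);
  }

  // Reads a union branch index; in the User schema branch 0 is the value, branch 1 is null.
  static boolean readIsNull(ByteArrayInputStream in) {
    long branch = readLong(in);
    if (branch != 0 && branch != 1) {
      throw new IllegalArgumentException("unexpected union branch " + branch);
    }
    return branch == 1;
  }

  // Reads the fields in schema order: name, favorite_number, favorite_color.
  static String decodeUser(byte[] data) {
    ByteArrayInputStream in = new ByteArrayInputStream(data);
    String name = readString(in);
    Integer favoriteNumber = null;
    if (!readIsNull(in)) {
      favoriteNumber = (int) readLong(in);
    }
    String favoriteColor = null;
    if (!readIsNull(in)) {
      favoriteColor = readString(in);
    }
    return "name=" + name + ", favorite_number=" + favoriteNumber
        + ", favorite_color=" + favoriteColor;
  }

  // Converts a hex string such as the one in the log output into bytes.
  static byte[] fromHex(String hex) {
    byte[] out = new byte[hex.length() / 2];
    for (int i = 0; i < out.length; i++) {
      out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
    }
    return out;
  }

  public static void main(String[] args) {
    byte[] data = fromHex("104A6F686E20446F65020008626C7565");
    // prints name=John Doe, favorite_number=null, favorite_color=blue
    System.out.println(decodeUser(data));
  }
}
```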
Test Sending and Receiving Avro Messages on Kafka
The SpringKafkaApplicationTest test case demonstrates the above sample code. An embedded Kafka broker and ZooKeeper server are automatically started using a JUnit ClassRule. The KafkaEmbedded rule also creates the avro-bijection.t topic.
Using @Before we wait until all the partitions are assigned to our Receiver by looping over the listener containers registered in the KafkaListenerEndpointRegistry (ConcurrentMessageListenerContainer instances in this example). Without this step, the message could be sent before the listeners are assigned to the topic.
In the testReceiver() test case an Avro User object is created using the Builder methods. This user is then sent to the avro-bijection.t topic. Finally, the CountDownLatch from the Receiver is used to verify that a message was successfully received.
package com.codenotfound.kafka;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.test.context.junit4.SpringRunner;
import com.codenotfound.kafka.consumer.Receiver;
import com.codenotfound.kafka.producer.Sender;
import example.avro.User;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringKafkaApplicationTest {
  @Autowired
  private Sender sender;
  @Autowired
  private Receiver receiver;
  @Autowired
  private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
  @ClassRule
  public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "avro-bijection.t");
  @Before
  public void setUp() throws Exception {
    // wait until the partitions are assigned
    for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
        .getListenerContainers()) {
      ContainerTestUtils.waitForAssignment(messageListenerContainer,
          embeddedKafka.getPartitionsPerTopic());
    }
  }
  @Test
  public void testReceiver() throws Exception {
    User user = User.newBuilder().setName("John Doe").setFavoriteColor("blue")
        .setFavoriteNumber(null).build();
    sender.send(user);
    receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
    assertThat(receiver.getLatch().getCount()).isEqualTo(0);
  }
}
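The Receiver class is not shown in this section. Judging from the test, it exposes a CountDownLatch through getLatch() that reaches zero once a message arrives. Below is a minimal sketch of that handshake, assuming a latch initialized to 1 and counted down in the listener method; a plain thread stands in for the Kafka listener container thread, and the class names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchSketch {

  // Hypothetical stand-in for the Receiver: the listener method counts the latch down.
  static class Receiver {
    private final CountDownLatch latch = new CountDownLatch(1);

    void receive(String payload) {
      // in the real application this would be the Kafka listener method
      latch.countDown();
    }

    CountDownLatch getLatch() {
      return latch;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Receiver receiver = new Receiver();
    // simulate the listener container thread delivering one message
    Thread consumer = new Thread(() -> receiver.receive("John Doe"));
    consumer.start();
    // like the test: wait up to 10 seconds, then check that the count reached zero
    boolean received = receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
    System.out.println(received + " " + receiver.getLatch().getCount()); // prints true 0
  }
}
```

Note that await(timeout, unit) itself returns true when the count reached zero before the timeout, so asserting on that return value is an alternative to checking getCount() afterwards.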
Note that the sample code also contains AvroSerializerTest and AvroDeserializerTest unit test cases to verify the serialization classes.
Run the above test case by opening a command prompt and executing the following Maven command:
mvn test
Maven compiles the code and runs all the test cases. The outcome should be a successful build as shown below:
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v1.5.4.RELEASE)
20:53:53.927 [main] INFO c.c.kafka.SpringKafkaApplicationTest - Starting SpringKafkaApplicationTest on cnf-pc with PID 3880 (started by CodeNotFound in c:\codenotfound\code\spring-kafka\spring-kafka-avro-bijection)
20:53:53.929 [main] INFO c.c.kafka.SpringKafkaApplicationTest - No active profile set, falling back to default profiles: default
20:53:54.614 [main] INFO c.c.kafka.SpringKafkaApplicationTest - Started SpringKafkaApplicationTest in 0.987 seconds (JVM running for 5.282)
20:53:55.947 [main] INFO c.codenotfound.kafka.producer.Sender - sending user='{"name": "John Doe", "favorite_number": null, "favorite_color": "blue"}'
20:53:55.964 [main] INFO c.c.kafka.serializer.AvroSerializer - data to serialize='{"name": "John Doe", "favorite_number": null, "favorite_color": "blue"}'
20:53:55.964 [main] INFO c.c.kafka.serializer.AvroSerializer - serialized data='104A6F686E20446F65020008626C7565'
20:53:55.986 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.k.serializer.AvroDeserializer - data to deserialize='104A6F686E20446F65020008626C7565'
20:53:55.987 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.k.serializer.AvroDeserializer - data='{"name": "John Doe", "favorite_number": null, "favorite_color": "blue"}'
20:53:55.992 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - received user='{"name": "John Doe", "favorite_number": null, "favorite_color": "blue"}'
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 8.069 sec - in com.codenotfound.kafka.SpringKafkaApplicationTest
Results :
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 10.920 s
[INFO] Finished at: 2017-08-02T20:53:58+02:00
[INFO] Final Memory: 19M/211M
[INFO] ------------------------------------------------------------------------
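The serialized data in the log above (104A6F686E20446F65020008626C7565) can be reproduced by hand, which shows what apply() actually produces for the test user. The following dependency-free sketch follows the Avro binary encoding rules for the User schema: the name is written as a zig-zag varint length (8 becomes 0x10) plus the UTF-8 bytes of "John Doe", favorite_number is the union branch index 1 (null, encoded as 0x02) with no value, and favorite_color is branch 0 (0x00) followed by the string "blue" (length 0x08 plus 626C7565). The class and method names are hypothetical and the field layout is hard-coded for this schema; it is an illustration, not a replacement for Avro's BinaryEncoder.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class UserBinaryEncoderSketch {

  // Writes a long as a zig-zag encoded variable-length integer (Avro's int/long encoding).
  static void writeLong(ByteArrayOutputStream out, long value) {
    long zigZag = (value << 1) ^ (value >> 63);
    while ((zigZag & ~0x7FL) != 0) {
      out.write((int) ((zigZag & 0x7F) | 0x80)); // low 7 bits plus continuation bit
      zigZag >>>= 7;
    }
    out.write((int) zigZag);
  }

  // Writes an Avro string: the byte length followed by the UTF-8 bytes.
  static void writeString(ByteArrayOutputStream out, String value) {
    byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
    writeLong(out, bytes.length);
    out.write(bytes, 0, bytes.length);
  }

  // Writes the User fields in schema order; unions are a branch index followed by the value.
  static byte[] encodeUser(String name, Integer favoriteNumber, String favoriteColor) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeString(out, name);
    if (favoriteNumber != null) {
      writeLong(out, 0); // branch 0 of ["int", "null"]
      writeLong(out, favoriteNumber);
    } else {
      writeLong(out, 1); // branch 1 = null, no value bytes
    }
    if (favoriteColor != null) {
      writeLong(out, 0); // branch 0 of ["string", "null"]
      writeString(out, favoriteColor);
    } else {
      writeLong(out, 1);
    }
    return out.toByteArray();
  }

  // Upper-case hex, the same format as DatatypeConverter.printHexBinary in the log.
  static String toHex(byte[] bytes) {
    StringBuilder sb = new StringBuilder();
    for (byte b : bytes) {
      sb.append(String.format("%02X", b));
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    // prints 104A6F686E20446F65020008626C7565
    System.out.println(toHex(encodeUser("John Doe", null, "blue")));
  }
}
```

The 16 bytes contain no field names or type information, which is why both sides of the topic must agree on the schema.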
This wraps up the example on how to send/receive Avro messages using Twitter Bijection and Spring Kafka.
Let me know if something is missing or if you were able to successfully use the above code.