This is the third post in this series where we go through the basics of using Kafka. We saw in the previous posts how to produce and consume JSON messages using the plain Java client and Jackson. We will see here how to create our own serializers and deserializers.
What we are going to build in this tutorial
In the previous posts, we had created a Kotlin data class for our data model:
data class Person(
val firstName: String,
val lastName: String,
val birthDate: Date
)
We were then using a Jackson ObjectMapper
to convert data between Person
objects and JSON strings:
{"firstName":"Quentin","lastName":"Corkery","birthDate":"1984-10-26T03:52:14.449+0000"}
...
We had seen that we were using a StringSerializer
in the producer, and a StringDeserializer
in the consumer. We will now see how to build our own SerDe (Serializer/Deserializer) to abstract the serialization/deserialization process away from the main code of the application.
The serializer
To build a serializer, the first thing to do is to create a class that implements the org.apache.kafka.common.serialization.Serializer
interface. This is a generic type so that you can indicate what type is going to be converted into an array of bytes:
class PersonSerializer : Serializer<Person> {
override fun serialize(topic: String, data: Person?): ByteArray? {
if (data == null) return null
return jsonMapper.writeValueAsBytes(data)
}
override fun close() {}
override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {}
}
Notice that you might have to "help" the Kotlin compiler a little to let it know whether the data types are nullable or not (e.g. Person
is non-nullable, Person?
is nullable). In this case, I made the data
parameter as well as the return value nullable so as to account for null values, just in case.
We can then replace the StringSerializer
with our own serializer when creating the producer, and change the generic type of our producer:
private fun createProducer(brokers: String): Producer<String, Person> {
...
props["value.serializer"] = PersonSerializer::class.java
return KafkaProducer<String, Person>(props)
}
We can now send Person
objects in our records without having the convert them to String
by hand:
val fakePerson = Person(...)
val futureResult = producer.send(ProducerRecord(personsTopic, fakePerson))
The deserializer
In a similar fashion, we can build a deserializer by creating a class that implements the org.apache.kafka.common.serialization.Deserializer
interface:
class PersonDeserializer : Deserializer<Person> {
override fun deserialize(topic: String, data: ByteArray?): Person? {
if (data == null) return null
return jsonMapper.readValue(data, Person::class.java)
}
override fun close() {}
override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {}
}
We then update the code that creates the consumer:
private fun createConsumer(brokers: String): Consumer<String, Person> {
...
props["value.deserializer"] = PersonDeserializer::class.java
return KafkaConsumer<String, Person>(props)
}
Finally, the value of our records contain Person
objects rather than String
s:
records.iterator().forEach {
val person: Person = it.value()
...
}
Conclusion
We have seen how to create our own SerDe to abstract away the serialization code from the main logic of our application. That was simple, but you now know how a Kafka SerDe works in case you need to use an existing one or build your own.
As Avro is a common serialization type for Kafka, we will see how to use Avro in the next post.
The code of this tutorial can be found here.
Feel free to ask questions in the comments section below!