Mastering Serialization and Deserialization in Kafka Streams: Building Efficient Data Pipelines

The Java Trail
12 min read · Mar 29, 2024

Serialization and deserialization (SerDes) are fundamental operations in Kafka Streams, facilitating the conversion of data between its binary representation and the native data types of programming languages. Let’s delve deeper into these concepts with examples:

Serialization: (Object to Binary)

Serialization is the process of converting data objects into a binary format suitable for transmission or storage.

Purpose: Data in Kafka is stored and transmitted in binary format for efficiency. Serialization converts objects into bytes for network transmission or storage in Kafka topics.

Example: When producing messages to a Kafka topic in Kafka Streams, you serialize the key and value objects into byte arrays using a serializer.

  • For instance, if you have a Java object representing a message with a key and value, you would use a serializer like StringSerializer, IntegerSerializer, JsonSerializer, AvroSerializer, etc., to convert these objects into bytes before sending them to Kafka.
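
A minimal sketch of what this looks like with the built-in serializers (the topic name and values here are purely illustrative):

import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class SerializeSketch {
    public static void main(String[] args) {
        try (StringSerializer keySerializer = new StringSerializer();
             IntegerSerializer valueSerializer = new IntegerSerializer()) {
            // Convert a String key and an Integer value into byte arrays,
            // exactly as a producer would before sending them to a topic.
            byte[] keyBytes = keySerializer.serialize("orders-topic", "order-42");
            byte[] valueBytes = valueSerializer.serialize("orders-topic", 1999);
            System.out.println(keyBytes.length + " key bytes, " + valueBytes.length + " value bytes");
        }
    }
}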

Deserialization: (Binary to Object)

Deserialization is the process of converting binary data back into its original object form.

Purpose: When consuming messages from Kafka topics, deserialization converts byte arrays back into their original data types.

Example: When consuming messages from a Kafka topic in Kafka Streams, you deserialize the byte arrays back into key and value objects using a deserializer.

  • For instance, if you’re consuming messages with keys and values represented as strings, integers, JSON objects, Avro records, etc., you would use deserializers like StringDeserializer, IntegerDeserializer, JsonDeserializer, AvroDeserializer, etc., to convert the bytes back into their respective data types.
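
Continuing the sketch above, the consumer side reverses the conversion. Kafka's Serde helper bundles a serializer and a deserializer into one object (again, names and values are illustrative):

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

public class RoundTripSketch {
    public static void main(String[] args) {
        try (Serde<String> stringSerde = Serdes.String()) {
            // Serialize, then deserialize: the original value comes back unchanged.
            byte[] bytes = stringSerde.serializer().serialize("payments-topic", "payment-123");
            String restored = stringSerde.deserializer().deserialize("payments-topic", bytes);
            System.out.println(restored); // prints "payment-123"
        }
    }
}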

Importance of Serialization & Deserialization

Serialization and deserialization (SerDes) play crucial roles in Kafka and Kafka Streams for efficient data transmission, storage, and processing. Here’s why they are required:

  1. Efficient Data Transmission: SerDes convert data objects into compact binary format, reducing message size for efficient transmission over the network. Smaller messages consume less bandwidth, leading to improved performance and reduced latency in data transmission.
  2. Data Persistence: Serialized data occupies less disk space, optimizing storage utilization in Kafka brokers.
  3. Interoperability: SerDes enable compatibility between producers and consumers implemented in different languages or platforms.
  4. Schema Evolution: Serialization frameworks like Avro, Protobuf, or JSON Schema support schema evolution, allowing flexible changes to data schemas over time. SerDes manage schema compatibility, ensuring that producers and consumers can handle evolving data structures without disruptions.

Custom Serializers and Deserializers

Custom serializers and deserializers are used in Kafka Streams when the default serializers and deserializers provided by Kafka do not meet your requirements.

When custom serialization and deserialization are necessary

  1. Custom Data Formats: When your data is in a format that is not supported by the built-in serializers and deserializers of Kafka (e.g., XML, Protocol Buffers, Avro, JSON), you can create custom serializers and deserializers to handle these data formats.
  2. Complex Data Types: If your data contains complex data types that require specialized handling during serialization and deserialization, such as nested structures or custom objects, custom serializers and deserializers can be implemented to handle these complexities.
  3. Performance Optimization: Custom serialization and deserialization can be optimized for performance in specific use cases, such as reducing the size of serialized data, improving serialization/deserialization speed, or minimizing resource utilization.
  4. Integration with External Systems: When integrating Kafka Streams with external systems or services that use non-standard data formats, custom serialization and deserialization may be necessary to ensure compatibility and interoperability with those external systems.
  5. Data Transformation: If your application requires data transformation or preprocessing before serialization or after deserialization, custom serializers and deserializers can be used to implement these transformations.

Suppose you have a Kafka Streams application that processes financial transactions represented by the Transaction class. Each Transaction object contains various fields such as transactionId, amount, timestamp, etc. Additionally, you want to include additional metadata in the serialized form of the transaction, such as the source system from which the transaction originated.

public class TransactionSerializer implements Serializer<Transaction> {
    @Override
    public byte[] serialize(String topic, Transaction data) {
        if (data == null) {
            return null;
        }
        // Serialize the Transaction object as a delimited string, including the
        // additional metadata field (sourceSystem).
        // Format: transactionId|amount|timestamp|sourceSystem
        String serializedData = data.getTransactionId() + "|" +
                data.getAmount() + "|" +
                data.getTimestamp() + "|" +
                data.getSourceSystem();
        return serializedData.getBytes(StandardCharsets.UTF_8);
    }
}

public class TransactionDeserializer implements Deserializer<Transaction> {
    @Override
    public Transaction deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        // Split the delimited string and rebuild the Transaction object
        String[] fields = new String(data, StandardCharsets.UTF_8).split("\\|");
        String transactionId = fields[0];
        double amount = Double.parseDouble(fields[1]);
        long timestamp = Long.parseLong(fields[2]);
        String sourceSystem = fields[3];
        return new Transaction(transactionId, amount, timestamp, sourceSystem);
    }
}

Kafka Producer Configuration:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TransactionSerializer.class.getName());

Producer<String, Transaction> producer = new KafkaProducer<>(props);

Kafka Consumer Configuration:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TransactionDeserializer.class.getName());

Consumer<String, Transaction> consumer = new KafkaConsumer<>(props);

In this example, the custom serializer (TransactionSerializer) serializes the Transaction object into a custom string format that includes the additional metadata (sourceSystem). The custom deserializer (TransactionDeserializer) deserializes the custom string format back into a Transaction object. This allows you to include custom metadata in the serialized form of the transaction while maintaining compatibility with Kafka Streams.
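
With that configuration in place, sending and reading a Transaction might look like the following sketch. The topic name and field values are assumptions made up for illustration; it reuses the producer and consumer configured above and needs java.util.Collections, java.time.Duration, ProducerRecord, and ConsumerRecords imports.

// Hypothetical usage of the producer/consumer configured above.
Transaction txn = new Transaction("txn-1001", 250.75, System.currentTimeMillis(), "mobile-app");
producer.send(new ProducerRecord<>("transactions-topic", txn.getTransactionId(), txn));
producer.flush();

consumer.subscribe(Collections.singletonList("transactions-topic"));
ConsumerRecords<String, Transaction> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record ->
        System.out.println("Consumed transaction: " + record.value().getTransactionId()));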

Schema Registry

Schema Registry is a centralized service that manages and stores schemas for data exchanged in Apache Kafka. It provides a repository for storing and retrieving Avro, JSON Schema, or Protobuf schemas used for serializing and deserializing data in Kafka.

Why Schema Registry?

Kafka, at its core, only transfers data as bytes. No data verification is performed at the Kafka cluster level; in fact, Kafka doesn't even know what kind of data it is sending or receiving, whether it is a string or an integer.

Figure: a producer sends data in byte format to the Kafka cluster, and a consumer reads it back as bytes.

Due to the decoupled nature of Kafka, producers and consumers do not communicate with each other directly; information flows through Kafka topics instead. At the same time, the consumer still needs to know what kind of data the producer is sending in order to deserialize it. Imagine the producer starts sending bad data to Kafka, or the data type of a field changes: downstream consumers will start breaking. We need a common data contract that both sides agree on.

Advantages of using Schema Registry

  • Centralized schema management and storage, which makes it easier to track and maintain different versions of schemas used by various producers and consumers.
  • Schema validation, which means Schema Registry validates the structure and compatibility of schemas. This ensures that topic message data conforms to a standard format and is error-free, reducing the risk of data loss or corruption.
  • Compatibility checking of schemas between producers and consumers to ensure that message data can be consumed by different applications and systems without resulting in errors or data loss due to message formatting.
  • Versioning of schemas, which allows for updates to schemas without breaking compatibility with existing data. This provides a smooth transition to new versions of a schema with continued support for legacy data, and reduces the need for expensive and time-consuming data migration.

Problems Without Schema Registry

  1. No Schema Validation or Centralized Management: Without Schema Registry, there is no centralized repository for managing schemas. Each producer and consumer needs to handle schema management independently, which can lead to inconsistencies and compatibility issues between different versions of producers and consumers.
  2. Manual Serialization Logic: You'll need to implement custom serialization and deserialization logic for your data types yourself. This can be error-prone and time-consuming, especially for complex data structures.
  3. Potential Data Inconsistencies: Without a centralized schema registry, there’s a risk of data inconsistencies between different components of your system. For example, if a producer uses one version of the schema while a consumer expects a different version, it can lead to data parsing errors or incorrect data interpretation.

That's where Schema Registry comes into the picture. It is an application that lives outside of your Kafka cluster and handles the distribution of schemas to producers and consumers, storing a copy of each schema in its local cache.

How Schema Registry Works

With the schema registry in place, the producer, before sending the data to Kafka, talks to the schema registry first and checks if the schema is available. If it doesn’t find the schema then it registers and caches it in the schema registry. Once the producer gets the schema, it will serialize the data with the schema and send it to Kafka in binary format prepended with a unique schema ID. When the consumer processes this message, it will communicate with the schema registry using the schema ID it got from the producer and deserialize it using the same schema. If there is a schema mismatch, the schema registry will throw an error letting the producer know that it’s breaking the schema agreement.
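
For reference, Confluent's serializers implement this by framing every message with a small wire format: a magic byte (0x0), followed by the 4-byte schema ID, followed by the serialized payload. A minimal sketch of reading that prefix:

import java.nio.ByteBuffer;

public class WireFormatSketch {
    // Confluent wire format: [magic byte 0x0][4-byte schema ID][serialized payload]
    public static int extractSchemaId(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        byte magicByte = buffer.get();
        if (magicByte != 0x0) {
            throw new IllegalArgumentException("Unknown magic byte: not a Confluent-framed message");
        }
        int schemaId = buffer.getInt(); // the ID under which the schema is registered
        // buffer now points at the start of the serialized (e.g., Avro) payload
        return schemaId;
    }
}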

Schema Evolution Management: The Schema Registry acts as a centralized repository for managing schemas (i.e., the structure of data) used for serialization and deserialization. It provides a mechanism for registering, storing, and retrieving schemas associated with different data types or topics.

Serialization: When serializing data objects into binary format, Kafka producers consult the Schema Registry to obtain the schema corresponding to the data type being serialized. The schema is used to encode the data object into a binary representation compliant with the schema format (e.g., Avro schema). The serialized data, along with the schema ID, is then published to Kafka topics.

Deserialization: When consuming messages from Kafka topics, Kafka consumers read the schema ID that the producer prepended to the message payload, use it to fetch the corresponding schema from the Schema Registry, and then let the deserializer decode the binary payload back into its original data object form using that schema.

Schema Evolution: The Schema Registry supports schema evolution, allowing schemas to evolve over time while ensuring backward and forward compatibility. This enables seamless interoperability between producers and consumers even as schemas evolve, ensuring that old and new versions of data can be serialized and deserialized without errors.
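
For example, a backward-compatible evolution typically adds an optional field with a default value, so that consumers using the new schema can still read records written with the old one. A hypothetical evolution of a payment record (not from the examples below) might look like this:

{
  "type": "record",
  "name": "Payment",
  "fields": [
    {"name": "paymentId", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "USD"}
  ]
}

Here the currency field is new; because it carries a default, records produced with the earlier two-field schema can still be deserialized without errors.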

Avro Schema

If you choose to use Avro serialization for your Kafka messages, it’s recommended to integrate Schema Registry into your architecture. The Schema Registry acts as a central repository for storing and managing Avro schemas. It ensures that all producers and consumers use compatible schemas when serializing and deserializing data.

SerDes in Kafka Streams:


// Define a custom class for data serialization
class CustomData {
    private String key;
    private int value;

    public CustomData(String key, int value) {
        this.key = key;
        this.value = value;
    }

    public String getKey() { return key; }
    public int getValue() { return value; }
}

public class SerDesExample {

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "serdes-example");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();

        // Default SerDes for string keys and integer values
        Serde<String> stringSerde = Serdes.String();
        Serde<Integer> intSerde = Serdes.Integer();

        // Custom SerDes for the CustomData class
        Serializer<CustomData> customDataSerializer = new CustomDataSerializer();
        Deserializer<CustomData> customDataDeserializer = new CustomDataDeserializer();
        Serde<CustomData> customDataSerde = Serdes.serdeFrom(customDataSerializer, customDataDeserializer);

        // Read from the input topic using the default SerDes
        KStream<String, Integer> inputStream = builder.stream("input-topic", Consumed.with(stringSerde, intSerde));

        // Process the stream and write it out using the default SerDes
        inputStream.mapValues(value -> value * 2)
                .to("output-topic", Produced.with(stringSerde, intSerde));

        // Read from another input topic using the custom SerDes
        KStream<String, CustomData> customStream = builder.stream("custom-input-topic", Consumed.with(stringSerde, customDataSerde));

        // Process the stream and write it out using the custom SerDes
        customStream.mapValues(customData -> new CustomData(customData.getKey(), customData.getValue() * 2))
                .to("custom-output-topic", Produced.with(stringSerde, customDataSerde));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
    }

    // Custom serializer for the CustomData class (simple key|value delimited format)
    static class CustomDataSerializer implements Serializer<CustomData> {
        @Override
        public byte[] serialize(String topic, CustomData data) {
            if (data == null) {
                return null;
            }
            return (data.getKey() + "|" + data.getValue()).getBytes(StandardCharsets.UTF_8);
        }
    }

    // Custom deserializer for the CustomData class
    static class CustomDataDeserializer implements Deserializer<CustomData> {
        @Override
        public CustomData deserialize(String topic, byte[] data) {
            if (data == null) {
                return null;
            }
            String[] parts = new String(data, StandardCharsets.UTF_8).split("\\|");
            return new CustomData(parts[0], Integer.parseInt(parts[1]));
        }
    }
}

Purpose: SerDes ensures that data can be transmitted efficiently and reliably between producers and consumers in Kafka while preserving the structure and content of the data.

Default SerDes: Kafka provides default implementations of SerDes for common data types such as strings, integers, longs, bytes, and other primitive types. These default SerDes are readily available for use without the need for explicit implementation, making it convenient for handling basic data types in Kafka applications.

Custom SerDes: In addition to default SerDes, Kafka allows developers to define custom SerDes for handling complex data types or formats that are not natively supported.

Custom SerDes can be implemented to handle serialization and deserialization of custom Java classes or data formats like JSON, Avro, Protobuf, etc.
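
As a concrete sketch, a reusable JSON Serde can be built by wrapping Jackson's ObjectMapper and handing two lambdas to Serdes.serdeFrom. This assumes the value type is a plain Jackson-friendly POJO (such as the CustomData class above) and is illustrative rather than the only way to do it:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

public class JsonSerdes {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Builds a Serde<T> that writes/reads any Jackson-compatible POJO as JSON bytes.
    public static <T> Serde<T> jsonSerde(Class<T> type) {
        return Serdes.serdeFrom(
                (topic, data) -> {
                    try {
                        return data == null ? null : MAPPER.writeValueAsBytes(data);
                    } catch (Exception e) {
                        throw new RuntimeException("JSON serialization failed", e);
                    }
                },
                (topic, bytes) -> {
                    try {
                        return bytes == null ? null : MAPPER.readValue(bytes, type);
                    } catch (Exception e) {
                        throw new RuntimeException("JSON deserialization failed", e);
                    }
                });
    }
}

Such a Serde can then be passed to Consumed.with(...) or Produced.with(...) exactly like the custom Serde in the Kafka Streams example above.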

Option 1: Using a Custom Avro Serializer/Deserializer

To implement custom SerDes for Avro data format in Apache Kafka, you’ll need to follow these steps:

Step 1: Define your Avro schema and generate Java classes from it using the Avro Maven plugin.

{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "totalAmount", "type": "double"}
  ]
}

Step 2: Create custom serializers and deserializers for the generated Java classes.



public class OrderAvroSerializer implements Serializer<Order> {

    @Override
    public byte[] serialize(String topic, Order data) {
        if (data == null) {
            return null;
        }
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            DatumWriter<Order> writer = new SpecificDatumWriter<>(Order.class);
            writer.write(data, encoder);
            encoder.flush();
            out.close();
            return out.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("Error serializing Avro message", e);
        }
    }
}

public class OrderAvroDeserializer implements Deserializer<Order> {

    @Override
    public Order deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            ByteArrayInputStream in = new ByteArrayInputStream(data);
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
            DatumReader<Order> reader = new SpecificDatumReader<>(Order.class);
            Order result = reader.read(null, decoder);
            in.close();
            return result;
        } catch (IOException e) {
            throw new RuntimeException("Error deserializing Avro message", e);
        }
    }
}

Step 3: Add the custom Avro serializer/deserializer in application.yml:

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.example.kafka.OrderAvroSerializer
    consumer:
      bootstrap-servers: localhost:9092
      group-id: order-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.example.kafka.OrderAvroDeserializer

Step 4: Use the serializer/deserializer in a producer or consumer:


@Component
public class OrderProducer {

    private static final String TOPIC = "orders-topic";

    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;

    public void sendOrder(Order order) {
        kafkaTemplate.send(TOPIC, order.getOrderId(), order);
    }
}

@Component
public class OrderConsumer {

    private static final String TOPIC = "orders-topic";

    @KafkaListener(topics = TOPIC, groupId = "order-consumer-group")
    public void consumeOrder(Order order) {
        System.out.println("Received order: " + order);
        // Process the order as needed
    }
}
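
With these components wired up, a caller might build an Order through the builder API that Avro generates for the class and hand it to the producer. The values below are made up for illustration:

// Illustrative only: field values are invented, and orderProducer is the
// Spring-injected OrderProducer bean from above.
Order order = Order.newBuilder()
        .setOrderId("ORD-1001")
        .setCustomerId("CUST-42")
        .setTotalAmount(149.99)
        .build();
orderProducer.sendOrder(order); // serialized by OrderAvroSerializer on the way to Kafka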

Option 2: Using Confluent's Avro Serializer/Deserializer

Producer, Consumer, Schema Registry Implementation

Step 1: Avro Schema (transaction.avsc):

{
  "type": "record",
  "name": "Transaction",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "amount", "type": "double"},
    {"name": "merchant", "type": "string"},
    {"name": "timestamp", "type": "long"}
  ]
}

Step 2: pom.xml (Maven configuration): generate the Java model class (Transaction.java) from the Avro schema with the Avro Maven plugin:

<build>
  <plugins>
    <!-- Avro Plugin -->
    <plugin>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-maven-plugin</artifactId>
      <version>${avro.version}</version>
      <executions>
        <execution>
          <phase>generate-sources</phase>
          <goals>
            <goal>schema</goal>
          </goals>
          <configuration>
            <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
            <includes>
              <include>*.avsc</include>
            </includes>
            <fieldVisibility>private</fieldVisibility>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

Step 3: Kafka Producer (KafkaProducerService.java):

@Service
public class KafkaProducerService {

    @Value("${spring.kafka.producer.topic}")
    private String topic;

    private final KafkaTemplate<String, Transaction> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, Transaction> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void produceTransaction(Transaction transaction) {
        // Use the transaction id as the record key
        ListenableFuture<SendResult<String, Transaction>> future =
                kafkaTemplate.send(topic, String.valueOf(transaction.getId()), transaction);
        future.addCallback(System.out::println,
                e -> System.err.println("Error producing transaction: " + e.getMessage()));
    }
}

Step 4: Kafka Consumer (KafkaConsumerService.java):

@Service
public class KafkaConsumerService {

    @Value("${spring.kafka.consumer.topic}")
    private String topic;

    @KafkaListener(topics = "${spring.kafka.consumer.topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void consumeTransaction(ConsumerRecord<String, Transaction> record) {
        Transaction transaction = record.value();
        // Perform fraud detection logic here
        System.out.println("Received transaction: " + transaction);
    }
}

Step 5: application.yml (Kafka Configuration):

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      topic: financial-transactions
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        schema.registry.url: http://schema-registry:8081
    consumer:
      topic: financial-transactions
      group-id: fraud-detection-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        schema.registry.url: http://schema-registry:8081
        specific.avro.reader: true  # deserialize into the generated Transaction class instead of GenericRecord

How does data/schema validation occur here?

During Serialization (Producer Side):

  • When producing messages to Kafka, the data (transactions) are serialized into a binary format using the Avro serializer (io.confluent.kafka.serializers.KafkaAvroSerializer).
  • Before serialization, the data is validated against the Avro schema (transaction.avsc). Any discrepancies or mismatches between the data and the schema will result in an error during serialization.
  • The Avro serializer prepends the schema ID to the serialized message payload, which allows consumers to fetch the corresponding schema from the Schema Registry during deserialization.

During Deserialization (Consumer Side):

  • When consuming messages from Kafka, the Avro deserializer (io.confluent.kafka.serializers.KafkaAvroDeserializer) reads the schema ID prepended to the message payload.
  • The deserializer then fetches the corresponding Avro schema from the Schema Registry using the schema ID.
  • The deserializer deserializes the binary data back into its original Java object form, ensuring that the data adheres to the schema.
  • If the serialized data does not conform to the expected schema (e.g., missing fields, incorrect data types), deserialization will fail, and an error will be raised.
