Exploring Apache Kafka’s APIs: A Guide with Implementation

The Java Trail
Jan 14, 2024

Kafka serves as a robust foundation for real-time data processing, offering a distributed streaming platform that excels in constructing dynamic data pipelines and streaming applications. At the heart of Kafka’s architecture are its versatile APIs, allowing developers to seamlessly produce and consume streams of records.

For instance, imagine a scenario where an e-commerce platform utilizes Kafka’s Producer API to efficiently log customer transactions and activities. Simultaneously, various consumer applications employ the Consumer API to process these streams in real-time, enabling functionalities such as personalized recommendations or fraud detection. The Streams API facilitates transformative operations on these data streams, while the Connector API seamlessly integrates Kafka with external systems, ensuring the platform’s adaptability and versatility across diverse use cases.

Four widely used Kafka APIs are described below:

  1. Producer API
  2. Consumer API
  3. Streams API
  4. Connector API (SourceConnector & SinkConnector)

#Producer API:

The Producer API is used to publish data records (messages) to Kafka topics. Producers send records to Kafka brokers either asynchronously or synchronously. For each record, the producer picks a partition of the target topic (by hashing the record key, when one is present) and sends the record to the broker that leads that partition.
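To make the link between record keys and partitions concrete, here is a minimal sketch in plain Java. It is an illustration under assumptions, not Kafka's implementation: Kafka's default partitioner hashes the serialized key with murmur2, while this sketch substitutes Arrays.hashCode to stay short. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Simplified illustration of key-based partitioning; NOT Kafka's actual code.
class PartitionSketch {
    // Maps a record key to a partition index in [0, numPartitions).
    // Arrays.hashCode is a stand-in for the murmur2 hash Kafka uses.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // Mask the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition.
        System.out.println(partitionFor("customer-42", 3));
        System.out.println(partitionFor("customer-42", 3));
    }
}
```

Because the mapping depends only on the key and the partition count, all records for one key stay in order within a single partition, as long as the number of partitions does not change.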

Key Components:

  • KafkaProducer: The main class for creating a Kafka producer instance.
  • ProducerRecord: Represents a key-value pair to be sent to a Kafka topic.

Configuration:

  • Bootstrap Servers: bootstrap.servers specifies the initial list of brokers to connect to.
  • Key and Value Serializers: Specify how to serialize keys and values before sending them to Kafka.
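
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// try-with-resources closes the producer, which also flushes buffered records
try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
    ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "key", "value");
    producer.send(record); // asynchronous; returns a Future<RecordMetadata>
}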
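The producer snippet above relies on default settings for everything except the serializers. As a hedged sketch, the helper below collects a few commonly tuned producer settings in one place. The configuration keys (acks, enable.idempotence, linger.ms) are real Kafka producer settings; the helper name and the chosen values are illustrative assumptions, not recommendations.

```java
import java.util.Properties;

class ProducerConfigSketch {
    // Builds producer settings; the values are examples only.
    static Properties reliableProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");                // wait for all in-sync replicas to acknowledge
        props.put("enable.idempotence", "true"); // avoid duplicates when a send is retried
        props.put("linger.ms", "5");             // wait up to 5 ms so records can be batched
        return props;
    }
}
```

The resulting Properties object can be passed to the KafkaProducer constructor. Since send returns a Future, calling get() on it blocks until the broker acknowledges the record (a synchronous send), while passing a callback to send keeps the call asynchronous.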

#Consumer API:

The Consumer API allows applications to subscribe to Kafka topics and process streams of data records. Consumers pull messages from Kafka topic partitions and can be part of a consumer group for parallel processing.
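The idea behind consumer groups is that each partition is read by exactly one member of the group, so adding members spreads the work. The sketch below shows a simplified round-robin split in plain Java. It is an assumption-laden illustration: Kafka's real assignors also handle rebalancing, multiple topics, and sticky assignment, and the class name here is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified round-robin split of partitions across group members.
class GroupAssignmentSketch {
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        // Hand out partitions one at a time, cycling through the members.
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(Arrays.asList("c1", "c2"), 3));
    }
}
```

One consequence visible in the sketch: if a group has more members than the topic has partitions, the extra members receive no partitions and sit idle.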

Key Components:

  • KafkaConsumer: Represents a consumer instance.
  • ConsumerRecord: Represents a key-value pair received from a Kafka topic.

Configuration:

  • Bootstrap Servers: bootstrap.servers specifies the initial list of brokers to connect to.
  • Group ID: group.id identifies the consumer group to which a consumer belongs.

Example Use Case: Processing real-time events, analyzing logs, or updating a search index based on changes in a Kafka topic.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "my-group");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("my_topic"));
while (true) {
    // poll() waits up to 100 ms for new records
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Received record with key %s and value %s%n", record.key(), record.value());
    }
}

#Streams API:

The Streams API lets an application act as a stream processor. It allows developers to build applications that consume input streams from Kafka topics, process the data, and produce output streams to other Kafka topics.

Key Components:

  • StreamsBuilder: Used for constructing Kafka Streams topology.
  • KStream and KTable: Abstractions representing streams and tables for processing.

Example Use Case: Real-time data transformation, filtering, and aggregation. For example, an election results dashboard could transform raw data into different views (seat-wise, party-wise, and so on) and publish each view to its own topic.

Kafka Streams Processing:

  • The Kafka Streams API is used to create a stream processing application that consumes the events from the Kafka topics.
  • Real-time data transformations are applied to convert raw event data into a format suitable for the dashboard.
  • Filtering is employed to focus on relevant events, discarding noise or non-actionable data.
  • Aggregation is performed to compute live scores, player statistics, and other relevant metrics.

Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "live-score-dashboard");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

StreamsBuilder builder = new StreamsBuilder();

// Event is an application-defined class; eventSerde is assumed to be a Serde<Event> defined elsewhere
KStream<String, Event> eventsStream =
    builder.stream("sports-events-topic", Consumed.with(Serdes.String(), eventSerde));

// count() produces Long values, so the table and the output serde use Long
KTable<String, Long> liveScores = eventsStream
    .filter((key, event) -> event.getType().equals("goal"))
    .groupBy((key, event) -> event.getMatchId(), Grouped.with(Serdes.String(), eventSerde))
    .count(Materialized.as("live-scores"));

liveScores.toStream().to("live-scores-topic", Produced.with(Serdes.String(), Serdes.Long()));

KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
streams.start();

In this simplified example, the Kafka Streams application filters events to only consider goals, groups them by match ID, and maintains a count of goals for each match in a KTable. The aggregated live scores are then sent to a new Kafka topic (live-scores-topic), where they can be consumed by the live score dashboard. It displays real-time scores, player statistics, and other relevant information for each match.
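To see what the filter, group, and count steps compute without running a Kafka cluster, the same logic can be sketched over an in-memory list with plain Java streams. This is a batch analogy only: the Event class below is a hypothetical stand-in for the application's event type, and Kafka Streams updates the counts continuously as events arrive rather than once over a fixed list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class GoalCountSketch {
    // Hypothetical stand-in for the application's event type.
    static class Event {
        final String matchId;
        final String type;
        Event(String matchId, String type) {
            this.matchId = matchId;
            this.type = type;
        }
    }

    // Batch equivalent of: filter goals -> group by match ID -> count.
    static Map<String, Long> goalsPerMatch(List<Event> events) {
        return events.stream()
                .filter(e -> "goal".equals(e.type))
                .collect(Collectors.groupingBy(e -> e.matchId, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("m1", "goal"), new Event("m1", "foul"),
                new Event("m2", "goal"), new Event("m1", "goal"));
        System.out.println(goalsPerMatch(events));
    }
}
```

As in Kafka Streams, counting yields long values, which is why the counts are typed as Long rather than Integer.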

This use case demonstrates how the Kafka Streams API can be applied for real-time data processing, enabling the creation of responsive and accurate live score dashboards for sports enthusiasts.

#Connector API:

The Connector API simplifies the integration of external systems with Kafka. Connectors facilitate the movement of data between Kafka topics and external data sources or sinks.

SourceConnector and SinkConnector: Source connectors ingest data from external systems into Kafka, while sink connectors export data from Kafka to external systems. Source connectors act as producers, and sink connectors act as consumers in the Kafka Connect framework.

Configuration: Connector-specific properties such as file paths, database connection details, or format settings.

Example Use Case: Ingesting data from relational databases, logs, or files into Kafka using source connectors, or exporting data from Kafka to external systems like databases or Elasticsearch using sink connectors.
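As a concrete instance of connector-specific properties, the sketch below assembles the settings for the file source connector that ships with Apache Kafka as an example connector. The property keys (name, connector.class, tasks.max, file, topic) are the ones that connector reads; the helper name, file path, and topic name are assumptions chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class FileSourceConfigSketch {
    // Builds the key/value settings a standalone Connect worker would
    // otherwise read from a .properties file.
    static Map<String, String> fileSourceConfig(String file, String topic) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("name", "local-file-source");
        config.put("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
        config.put("tasks.max", "1");
        config.put("file", file);   // file whose new lines become records
        config.put("topic", topic); // Kafka topic the records are written to
        return config;
    }

    public static void main(String[] args) {
        // Path and topic are hypothetical values.
        fileSourceConfig("/var/log/app.log", "app-logs")
                .forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

A sink connector is configured in the same way, except that it names the topics to read from and the external system to write to.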

*Kafka Source Connector Use Cases:

>> Database Change Capture/Event Driven Architecture (Debezium): A company wants to capture changes in its relational database tables (inserts, updates, deletes) in real-time and stream this data to Kafka for further processing.

Use a source connector like Debezium to monitor the database’s change log and produce change events to Kafka topics.

>> Log File Streaming: Centralized log analysis for troubleshooting, monitoring, and compliance. File Source Connector regularly monitors log files and produces records for new log entries.

Aggregates logs from distributed systems into a centralized Kafka topic for real-time analysis.

*Kafka Sink Connector Use Cases:

>> Elasticsearch Indexing/Real Time Search of Updated Products: An e-commerce platform wants to index product data changes from Kafka into Elasticsearch to provide real-time search capabilities.
Configure the Elasticsearch Sink Connector to consume messages from relevant Kafka topics and index them into Elasticsearch.

>> Relational Database Sink (Data Synchronization): An organization needs to synchronize data from Kafka to a relational database for reporting and business intelligence purposes.
Use a sink connector like JDBC Sink Connector to map and write Kafka records to corresponding tables in the relational database.

>> Hadoop Data Lake Integration: A company stores large volumes of data in Hadoop and wants to export selected data from Kafka to the Hadoop Data Lake for long-term storage and batch processing.
Deploy HDFS Sink Connector to consume messages from Kafka and write them to Hadoop Distributed File System (HDFS).

==========================================================

Use Case Scenario (Kafka Streams API): Real-time Live Score Dashboard

Sports events, such as football matches, generate live score updates. The scores are published to a Kafka topic as individual events.

Objective: Build a live score dashboard that provides real-time updates on ongoing matches.

Producer (Score Updater): Publishes live score updates to a Kafka topic.

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

ProducerRecord<String, String> record = new ProducerRecord<>("score_updates_topic", "match_id", "score_data");
producer.send(record);
producer.close();

Kafka Streams Processing (Live Score Processor): Reads live score updates from the Kafka topic, transforms and filters the data, and aggregates scores for each match.

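// Streams configuration (the application ID is an illustrative value)
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "live-score-processor");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> scoreUpdatesStream =
    builder.stream("score_updates_topic", Consumed.with(Serdes.String(), Serdes.String()));

// Transform data (parse JSON, extract relevant fields).
// jsonParser stands for any JSON library wrapper; it is not defined in this snippet.
KStream<String, ScoreData> transformedStream = scoreUpdatesStream
    .mapValues(value -> jsonParser.parse(value, ScoreData.class));

// Filter only ongoing matches
KStream<String, ScoreData> ongoingMatchesStream = transformedStream
    .filter((key, scoreData) -> scoreData.getStatus().equals("ongoing"));

// Aggregate scores for each match.
// scoreDataSerde is assumed to be a Serde<ScoreData> defined elsewhere; the repartition topic needs it.
KTable<String, Integer> matchScoresTable = ongoingMatchesStream
    .groupBy((key, scoreData) -> scoreData.getMatchId(), Grouped.with(Serdes.String(), scoreDataSerde))
    .aggregate(
        () -> 0,
        (key, scoreData, aggregate) -> aggregate + scoreData.getScore(),
        Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("match_scores_store")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Integer())
    );

// Send aggregated scores to an output topic
matchScoresTable.toStream().to("live_scores_dashboard_topic", Produced.with(Serdes.String(), Serdes.Integer()));

KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();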
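The aggregate step above keeps a running total per match and forwards each updated total downstream. The following plain-Java sketch mimics that behavior with a map in place of the state store and a list in place of the output topic. Both are stand-ins, and the class is hypothetical; Kafka Streams may also coalesce several updates for the same key before forwarding them, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RunningScoreSketch {
    // Plays the role of the state store: latest total per match ID.
    private final Map<String, Integer> store = new HashMap<>();
    // Plays the role of the output topic: one entry per update.
    private final List<String> emitted = new ArrayList<>();

    // Mirrors the aggregator: start from 0, add the incoming score,
    // then emit the new total for that match.
    void onScore(String matchId, int score) {
        int total = store.merge(matchId, score, Integer::sum);
        emitted.add(matchId + "=" + total);
    }

    List<String> emitted() {
        return emitted;
    }
}
```

A consumer of the output therefore sees a sequence of updates per match, and the most recent value for a match ID is its current total.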

Consumer (Live Score Dashboard UI): Consumes aggregated scores from the output topic and updates the live score dashboard UI in real-time.

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "live_score_dashboard_group");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");

KafkaConsumer<String, Integer> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("live_scores_dashboard_topic"));

while (true) {
    ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, Integer> record : records) {
        // Update live score dashboard UI with the received match scores
        updateLiveScoreDashboard(record.key(), record.value());
    }
}

=========================================================

Real-Life Scenario (Connector API): E-Commerce Product Catalog and Search

Consider an e-commerce platform with a large product catalog, where the product data is stored in a relational database (e.g., MySQL). The platform aims to provide real-time updates to users about product changes, maintain an efficient and fast search experience, and support analytics on user interactions with the product catalog.

1. Debezium MySQL Connector Configuration: The goal is to keep the product catalog information up-to-date across various microservices, analytics pipelines, and denormalized views.

Configure Debezium to capture changes in the product tables of the MySQL database and produce change events for inserts, updates, and deletes to Kafka topics.

{
  "name": "mysql-source-connector",
  "config": {
    // The // lines are annotations only; JSON does not allow comments, so remove them before submitting.
    // Property names follow Debezium 2.x.
    // Debezium MySQL Connector class
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    // MySQL database connection details
    "database.hostname": "mysql-host",
    "database.port": "3306",
    "database.user": "user",
    "database.password": "password",
    // Unique numeric ID of this connector as a client of the MySQL replication (binlog) stream
    "database.server.id": "1",
    // Tables to monitor (table.whitelist in older releases); entries are matched against databaseName.tableName
    "table.include.list": "product_table",
    // Logical name for the MySQL server, used as the prefix of Kafka topic names (database.server.name in older releases)
    "topic.prefix": "product-change-events-",
    // Bootstrap servers for storing database schema history in Kafka
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
    // Kafka topic for storing database schema changes
    "schema.history.internal.kafka.topic": "schema-changes"
  }
}

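With this configuration, Debezium writes all change events for a captured table (inserts, updates, and deletes) to a single topic per table, named <topic.prefix>.<databaseName>.<tableName>. The operation type is carried in each event's op field, not in the topic name. The rest of this walkthrough uses per-operation topic names for illustration; producing them would need an extra routing step (for example, a Kafka Streams job that splits events by op), which is not shown here:

  • product-change-events-new-product
  • product-change-events-update-product
  • product-change-events-delete-product

The table.include.list configuration (table.whitelist in older Debezium releases) specifies which tables to capture changes from.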

2. Elasticsearch Sink Connector: Configure the Elasticsearch Sink Connector to consume Kafka messages containing product change events.

  • Users can perform near-instantaneous searches on the product catalog stored in Elasticsearch. Search results reflect the latest changes in the product catalog, ensuring a real-time search experience.

{
  "name": "elasticsearch-sink-connector",
  "config": {
    // Elasticsearch Sink Connector class
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    // Maximum number of tasks for parallelism
    "tasks.max": "1",
    // Comma-separated list of Kafka topics to consume from
    "topics": "product-change-events-new-product,product-change-events-update-product,product-change-events-delete-product",
    // URL for connecting to Elasticsearch
    "connection.url": "http://elasticsearch-host:9200",

    // Specifies the document type in Elasticsearch (deprecated in recent versions)
    "type.name": "product-change-event",

    // Converters for serializing Kafka key and value to JSON
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",

    // Specifies how to handle null values
    "behavior.on.null.values": "ignore"
  }
}

  • Connects to Elasticsearch at the specified URL.
  • Consumes messages from Kafka topics related to product changes.
  • Indexes documents into Elasticsearch with the type “product-change-event.”
  • Converts Kafka key and value to JSON format.

Message Formats

// For product-change-events-new-product
{
  "op": "c",
  "after": { "id": 1, "name": "New Product", "price": 29.99 }
}

// For product-change-events-update-product
{
  "op": "u",
  "before": { "id": 1, "name": "Original Product", "price": 19.99 },
  "after": { "id": 1, "name": "Updated Product", "price": 24.99 }
}

// For product-change-events-delete-product
{
  "op": "d",
  "before": { "id": 1, "name": "Deleted Product", "price": 29.99 }
}

  • op: Indicates the operation type (c for create, u for update, d for delete).
  • before: Represents the state of the record before the change (applicable for updates and deletes).
  • after: Represents the state of the record after the change (applicable for inserts and updates).
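The op, before, and after fields are enough for a downstream consumer to keep its own copy of the data in sync. The sketch below applies such events to an in-memory map that stands in for a search index. It is a simplified model under assumptions: the class is hypothetical, only the product name is stored, and a real sink would parse the JSON envelope and write to Elasticsearch instead.

```java
import java.util.HashMap;
import java.util.Map;

class ChangeEventApplier {
    // Hypothetical in-memory stand-in for the search index, keyed by product ID.
    private final Map<Integer, String> index = new HashMap<>();

    // op is "c" (create), "u" (update), or "d" (delete).
    // For c and u, nameAfter comes from the event's "after" state;
    // for d, the ID comes from the "before" state and nameAfter is unused.
    void apply(String op, int id, String nameAfter) {
        switch (op) {
            case "c":
            case "u":
                index.put(id, nameAfter); // insert or overwrite the document
                break;
            case "d":
                index.remove(id);         // drop the document
                break;
            default:
                throw new IllegalArgumentException("Unknown op: " + op);
        }
    }

    String lookup(int id) {
        return index.get(id);
    }
}
```

Treating create and update as the same upsert keeps the consumer idempotent: replaying an event leaves the index in the same state.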

Workflow:

  1. A new product is added, an existing product is updated, or a product is removed from the e-commerce platform.
  2. Debezium captures changes in the MySQL database (“product_table”) and produces messages to Kafka topics (“product-change-events-…”).
  3. Elasticsearch Sink Connector consumes messages from specified Kafka topics and indexes them into Elasticsearch.
  4. Microservices subscribe to relevant Kafka topics for real-time updates on the product catalog.
  5. Users interact with the e-commerce platform, and Elasticsearch provides efficient and real-time search capabilities.
