【Java / Spring】Introduction to Spring for Apache Kafka

Introduction

To study Spring and Kafka, I followed the official Quick Tour and built an application that sends messages to and receives messages from Kafka. The application can be used to verify the behavior of Spring’s Kafka integration features. The source code has been pushed to the following repository:

Running the Application

Below are the steps to clone the repository above and check that the application works. It was tested with Java 17 and should also run without issues on later versions.

First, start Kafka using the provided compose.yaml file for testing:

docker compose up -d

Next, start the application:

./gradlew bootRun

Accessing /send makes the Spring application send a message to Kafka, so try it with a simple curl request:

curl -X GET "http://localhost:8080/send?message=HelloKafka"

This application not only sends messages but also consumes messages from the same topic. As a result, as soon as you send a message via curl, the application immediately receives the same message. Received messages are logged, so you can verify them in the application logs.
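
Both the producer and the consumer use topic1. In the Quick Tour this topic is declared as a NewTopic bean so that Spring's KafkaAdmin can create it on the broker at startup; a minimal sketch of such a bean follows (the class name and the partition/replica counts are illustrative, not necessarily what the repository uses):

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfig {

    // Declaring a NewTopic bean lets Spring's KafkaAdmin create topic1 automatically.
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(1)   // illustrative values; a single partition and replica
                .replicas(1)     // are enough for the local test broker
                .build();
    }
}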

The Docker Compose setup also starts kafka-ui, a container that allows you to inspect Kafka state via a web interface. You can access it at localhost:8081, where you can view sent messages or send new ones manually.

Implementation Details

Docker Compose

The kafka0 service publishes the broker port as follows:

ports:
  - "9092:9092"

With this mapping, Kafka can be accessed at localhost:9092 from the host. Additionally, kafka-ui is exposed at localhost:8081.
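
On the Spring side, the application presumably relies on Spring Boot's auto-configuration to reach this broker. Purely for illustration, wiring the producer side programmatically against localhost:9092 could look like the following sketch (the class name is hypothetical, and this is not necessarily how the repository configures it):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class ProducerSetupSketch {

    // Points the producer at the broker that Docker Compose exposes on localhost:9092.
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}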

Java

Sending Messages

In the Quick Tour example from the official Spring documentation, messages are sent from an ApplicationRunner and are published only once, at application startup. This is inconvenient for testing, so I added a controller and modified the implementation so that messages can be sent via API requests instead.
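
The controller itself isn't reproduced in this post, but a minimal sketch of such an endpoint, assuming a Spring Boot auto-configured KafkaTemplate and the topic1 topic from the Quick Tour (the class name and response body are my own), could look like this:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SendController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public SendController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // GET /send?message=HelloKafka publishes the query parameter to topic1.
    @GetMapping("/send")
    public String send(@RequestParam("message") String message) {
        kafkaTemplate.send("topic1", message);
        return "sent: " + message;
    }
}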

Receiving Messages

In the consumer example from the official Quick Tour, the listener method is defined as follows:

@KafkaListener(id = "myId", topics = "topic1")
public void listen(String in) {
    System.out.println(in);
}

I changed this to:

@KafkaListener(id = "myId", topics = "topic1")
public void listen(ConsumerRecord<String, String> record) {
    log.info(record.toString());
}

When the argument type is String, only the message payload is available inside the method. By using ConsumerRecord<String, String>, you can also access metadata such as the topic, partition, offset, and key, which is particularly useful for verification and debugging.
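
For example, the listener above could log individual fields instead of the whole record (a sketch, assuming the same SLF4J-style log field as in the snippet above):

@KafkaListener(id = "myId", topics = "topic1")
public void listen(ConsumerRecord<String, String> record) {
    // Each accessor exposes one piece of metadata alongside the payload.
    log.info("topic={} partition={} offset={} key={} value={}",
            record.topic(), record.partition(), record.offset(),
            record.key(), record.value());
}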