이 글의 코드는 해당 링크에서 확인할 수 있습니다.

목표

  • Spring Boot 에서 Apache Kafka 사용 방법

  • 자주 사용하는 설정 정리

테스트 개발 환경

  • Spring Boot 3.1.0

  • Java 17

  • Spring Kafka 3.0.7

    • kafka-client 3.4.0
  • Gradle

  • Docker

Broker

테스트 개발 환경에서는 카프카를 로컬에서 Docker 파일로 띄우고, Spring 서버에서 이를 연결한다.

로컬에서 Docker로 Apache Kafka 실행하기

kafka docker image 비교

  • bitnami

  • confluentinc

  • wurstmeister

링크다운로드 수star 수특징
confluentincKafka / Zookeeper100M+394confluent 에서 제공하는 기능이 포함
bitnamiKafka / Zookeeper100M+669순수 카프카 이미지
wurstmeisterKafka / Zookeeper100M+1.6K순수 카프카 이미지

confluetntinc 이미지는 컨플루언트에서 제공하는 기능을 사용할 수 있는 이미지이다. 나머지 bitnami와 wurstmeister 이미지는 둘 다 순수하게 Apache Kafka 이미지를 담은 것으로 둘 중 아무거나 사용해도 될 듯하다.

카프카는 카프카의 메타 데이터를 관리해주는 zookeeper와 함께 동작한다. 따라서 카프카 브로커와 같이 실행을 해주어야 하는데, 둘 다 docker를 따로 실행해주기보다는 docker-compose 파일을 만들어서 같이 편리하게 실행해주는 설정파일을 만들었다.

Kafka docker 이미지마다 환경 설정 중 필수요소가 다르다. 예를 들어, wurstmeister 이미지는 KAFKA_LISTENERS 설정이 필수로 필요하다. 이는 문서를 참고하거나 실행해보고 docker container의 로그를 살펴보는 것도 방법이다.

아래는 confluentinc와 wurstmeister 이미지를 사용한 docker-compose 파일이다.

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
    # Mac M1 chip 사용 시 아래 platform 추가
    platform: linux/amd64

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    # Mac M1 chip 사용 시 아래 platform 추가
    platform: linux/amd64

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
    depends_on:
      - zookeeper
# docker-compose 실행 
docker-compose up -d

# docker container 확인 
docker ps

#  docker-compose 종료
docker-compose down

참고자료

Spring Boot에서 Kafka Broker 연결하기

Spring Boot 환경에서는 application.yml 파일을 통해 최대한 자동 설정을 하는 것이 좋다.

Spring Boot를 사용하는 큰 이유이기도 하며, 보통 개발자들은 Spring Boot 프로젝트 설정을은 application.yml (or application.properties)에 선언한다고 보통 생각하므로 여기서 찾는다.

spring:
  kafa:
    bootstrap-servers:
      - localhost:9092

broker 설정만 가지고는 스프링을 실행해도 로그에는 딱히 카프카가 연동되었다거나 하는 정보를 찾아볼 수 없다. Spring kafka는 기본적으로 카프카에 대한 요청이 있을 때, 카프카를 연동한다. 따라서 컨슈머를 설정하면, 컨슈머는 바로 실행이 필요하므로 스프링을 띄울 때 연동되는 것을 확인할 수 있고, 프로듀서는 프로듀서가 실행될 때, 카프카 연동을 하는 것을 볼 수 있다.

간단한 Producer & Consumer 예제

실제로 연동한 Kafka broker가 정상적으로 동작하는지 간단한 프로듀서와 컨슈머 예제를 살펴보자. 토픽에 하나에 문자열 데이터를 전송 및 소비하는 예제이고, 프로듀서의 트리거는 HTTP API이다.

먼저, Producer 예제이다.

spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

application.yml에 프로듀서 관련 설정이다. 이 예제에서는 간단히 메시지에 대한 직렬화 설정만 했다. 위는 String으로 직렬화를 한다는 설정이고, 사실 이는 기본 설정이라 설정을 해주지 않아도 문제는 없다.

@Slf4j
@RequiredArgsConstructor
@Component
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
        log.info("Message sent: {}", message);
    }
}

실제로 카프카에 메시지를 보내는 코드이다. KafkaTemplate 을 사용해서 key, value 모두 String 문자열로 보낸다.

@RequiredArgsConstructor
@RestController
@RequestMapping("messages")
public class MessageController {

    private final KafkaProducer kafkaProducer;

    @PostMapping
    public void sendMessage(@RequestBody String message) {
        kafkaProducer.sendMessage("my-topic", message);
    }
}

위는 HTTP API를 호출했을 때, 카프카에 메시지를 보내는 코드이다. POST 요청으로 request body에 선언된 문자열을 카프카 메시지로 보낸다.

다음은 Consumer 예제이다.

위 Producer 예제에서 카프카 브로커에 문자열 데이터를 보냈고, 이를 소비해서 사용하는 컨슈머 예제이다.

spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
    listener:
      type: batch
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

application.yml 설정부터 살펴보자. consumer 외에도 listener 라는 설정이 있는데, Spring Kafka는 카프카 리스너를 통해 카프카 브로커로부터 데이터를 읽은 역할을 하고, 컨슈머는 카프카 브로커 내의 토픽에서 메시지를 소비하는 역할을 한다. 정리하면 리스터는 데이터를 가져오고, 컨슈머는 가져온 데이터를 사용한다.

위 설정은 리스너는 카브카 브로커로부터 batch 타입으로 가져오는데, 이는 단일이 아닌 여러 개의 데이터를 가져오겠다는 의미이다. 컨슈머는 단순히 프로듀서와 같이 역직렬화에 대한 설정이고, 기본 설정과 같다.

Producer, Listener, Consumer에 대한 자세한 설정은 아래에서 살펴볼 예정이다.

@Slf4j
@Component
public class KafkaConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-group-id")
    public void consume(String message) {
        log.info("Received message: {}", message);
        // 메시지 처리 로직을 작성합니다.
    }
}

@KafkaListener 애노테이션으로도 컨슈머에 대한 설정을 할 수 있다. 위는 토픽과 컨슈머 그룹 ID를 애노테이션에 설정한 모습이다. 이러한 애노테이션 설정으로 하나의 서버에서 여러 개의 서로 다른 토픽 및 컨슈머 그룹 ID를 설정해서 사용할 수 있다.

이제 설정한 프로듀서와 컨슈머 테스트를 해보자.

먼저, 서버를 실행해보자.

2023-05-29T09:20:57.012+09:00  INFO 10791 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : my-group-id: partitions assigned: [my-topic-0]

서버를 실행하면, 카프카 컨슈머에 대한 설정을 보여주고 위 로그처럼 @KafkaListener 애노테이션으로 설정한 토픽과 컨슈머 그룹 ID가 정상적으로 할당되었다는 것을 알 수 있다.

이제 Producer를 동작시키는 컨트롤러에 HTTP API 요청을 보내서 메시지를 카프카에 produe하고, 이를 consume하는 것을 해보자.

IntelliJ IDEA를 사용하면, IDE 내에서 .http 파일에서 HTTP 요청을 보낼 수 있다.

POST http://localhost:8090/messages
Content-Type: application/json

"Hello Kafka!"

위를 실행해보면, 스프링 서버에서 카프카 프로듀서가 연결되어 컨슈머와 같이 설정들을 보여줄 것이다.

2023-05-29T09:23:41.221+09:00  INFO 10791 --- [nio-8090-exec-1] m.p.s.KafkaProducer                      : Message sent: "Hello Kafka!"
2023-05-29T09:23:41.234+09:00  INFO 10791 --- [ntainer#0-0-C-1] m.p.s.KafkaConsumer                      : Received message: "Hello Kafka!"

그리고 위 로그와 같이 프로듀서와 컨슈머에 각각 추가해둔 로그가 정상적으로 실행된 것을 볼 수 있다.

모니터링 (w. 오픈소스)

카프카는 여러 오픈소스 모니터링이 존재한다. (카프카 모니터링 도구 비교)

Kafka-ui

여기서는 Kafka-ui를 사용해볼 것이다. 이는 현재도 활발히 개발중인 것으로 보인다.

사용방법은 docker를 사용할 것이고, 기존 docker-compose 파일에 추가해보자.

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    # Mac M1 chip 사용 시 아래 platform 추가
    platform: linux/amd64

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:29092,OUTSIDE://localhost:9092
      KAFKA_LISTENERS: INSIDE://:29092,OUTSIDE://0.0.0.0:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
    depends_on:
      - zookeeper

  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    depends_on:
      - kafka
    environment:
      DYNAMIC_CONFIG_ENABLED: 'true'
      KAFKA_CLUSTERS_0_NAME: wizard_local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
  • kafka를 외부와 내부 리스너를 나누었다.

  • kafka-ui는 내부 카프카 리스너와 통신을 하고, Spring Boot 서버는 외부 카프카 리스너와 통신한다.

위로 변경 후, 다시 docker를 실행하고, http://localhost:8080 에 접속해보자.

kafkaui1 메인 화면을 보면, cluster-name에 설정한 ‘wizard_local’을 볼 수 있다. 정상적으로 카프카와 연결된 것을 알 수 있다.

다시 한 번 예제에서 “Hello Kafka” 메시지를 만들어보자. (재실행했기 때문에 이전에 테스트했던 데이터는 초기화된 상태이다.)

그러면 다음과 같이 토픽 내에 메시지가 정상적으로 쌓인 것을 볼 수 있다.

kafkaui2

Redpanda

추가 예정

Producer 설정

추가 예정

Consumer 설정

추가 예정