Kafka CLI

Kafka CLI 실습 - 토픽 실습

topic 생성

single partition 토픽 생성

topic 을 생성해봅니다. docker-compose 내의 kafka 라는 이름의 service 에 접속해서 topic1 을 생성하는 CLI 입니다.

$ docker-compose exec kafka kafka-topics --create --topic topic1 --bootstrap-server kafka:29092
Created topic topic1.

partition 5 개 갖는 토픽 생성

partition 을 5개 갖는 토픽을 생성합니다.

$ docker-compose exec kafka kafka-topics --create --topic topic2 --bootstrap-server kafka:9092 --partitions 5
Created topic topic2.

partition 5개, replication factor 1 인 토픽 생성

이번에는 partition 의 갯수는 5개이면서 각각의 파티션이 1개씩 복제 되도록 replication factor 를 1 로 지정하는 토픽을 생성합니다.

$ docker-compose exec kafka kafka-topics --create --topic topic3 --bootstrap-server kafka:9092 --partitions 5 --replication-factor 1
Created topic topic3.

토픽들을 출력(list)

$ docker-compose exec kafka kafka-topics --list --bootstrap-server kafka:9092
topic1
topic2
topic3

토픽 describe

리눅스의 ls -al 명령처럼 현재 토픽들의 상세 상황을 살펴봅니다.

$ docker-compose exec kafka kafka-topics --describe --topic topic2 --bootstrap-server kafka:9092
Topic: topic2   TopicId: 8tbqDaMzSmWAvuHvwOGP5A PartitionCount: 5       ReplicationFactor: 1    Configs:
        Topic: topic2   Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic2   Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic2   Partition: 2    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic2   Partition: 3    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic2   Partition: 4    Leader: 1       Replicas: 1     Isr: 1

토픽 삭제

토픽 삭제는 아래와 같이 해줄수 있습니다.

$ docker-compose exec kafka kafka-topics --delete --topic topic1 --bootstrap-server kafka:9092

카프카 프로듀서 CLI

카프카 프로듀서 CLI 는 docker-compose 안으로 직접 진입해야만 접근이 가능합니다.

$ docker-compose exec kafka bash

CLI 에서 사용자의 입력을 받는 producer 를 console-producer 라고 하며, 카프카를 직접 zip 파일로 다운받아보면 그 곳에 producer, consumer 등의 쉘스크립트가 있는데, 이 것들을 보통 콘솔 프로듀서, 콘솔 컨슈머라고 부릅니다.

Console Producer 실습

토픽 topic1 로 프로듀서 기동합니다. docker를 새로 띄워서 topic 이 만들어져 있지 않은 상태라면 아래 프로듀서가 topic에 데이터를 보내면 토픽이 생성됩니다.

$ kafka-console-producer --topic topic1 --broker-list kafka:9092

이 상태에서 데이터를 전송하면, 토픽을 외부에서 만들지 않은채로 프로듀서로 데이터를 전송하면서 생성한 것에 대한 경고문구가 나타납니다. 에러는 아니기에 장애가 나거나 하는 것은 아니지만, 조심해서 사용해야합니다.

[2024-05-30 21:58:08,575] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {topic1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

실무에서는 이렇게 쓰는 편은 아니고, 동적 생성을 막는 프로퍼티도 따로 있기 때문에 이런 프로퍼티들을 적용하기도 합니다.

위와 같이 명령할수도 있고 아래와 같이 브로커리스트, 프로듀서 프로퍼티를 추가해서 프로듀서를 기동하는 것 역시 가능합니다.

$ kafka-console-producer --topic
topic1 --broker-list kafka:9092 --producer-property acks=all

이번에는 key, value 로 데이터를 생산해봅니다. key,value 형태로 데이터를 보내기 위해서는 key.separator 라는 속성이 필요하며, 만약 key.separator: 으로 지정했을때 name:트럼프 라고 데이터를 전송하면 카프카는 key = name, value = 트럼프 와 같은 형식으로 데이터를 인식하며 생산합니다. 아래는 그 예제입니다.

$ kafka-console-producer --topic topic1 --broker-list kafka:9092 --property parse.key=true --property key.separator=:
>name:트럼프

카프카 컨슈머 CLI

단순 Consumer 예제

먼저 카프카 도커 컨테이너의 bash 로 진입합니다.

$ docker-compose exec kafka bash

데이터의 생산,소비를 위해 아래의 명령어를 입력합니다. 파티션을 3 으로 지정했고 토픽 명은 topic2 로 지정했습니다.

$ kafka-topics --bootstrap-server localhost:9092 --topic topic2 --create --partitions 3
Created topic topic2.

방금 생성한 토픽인 topic2 에 대해서 컨슈머를 기동합니다.

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic topic2
 

새로 터미널 창을 열어서 도커 컨테이너의 bash 로 진입합니다.

$ docker-compose exec kafka bash

아래와 같이 프로듀서를 기동하고 데이터 2건을 생산합니다.

$ kafka-console-producer --boorap-server localhost:9092 --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner --topic topic2
 
>productId:1
>productId:2

컨슈머측을 열어둔 터미널을 확인해보면 아래와 같이 데이터가 수신되었음을 확인 가능합니다.

productId:1
productId:2

Ctrl + C 를 입력해서 컨슈머에서 빠져나온 후 다시 컨슈머를 실행해보면 아무 데이터도 안옵니다. 기본 수신 정책이 최근의 값만 받도록 하는 속성으로 설정되어 있기 때문입니다.

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic topic2

이번에는 콘솔 컨슈머를 닫았다가 다시 실행할때 데이터를 처음부터 소비해봅니다. --from-beginning 을 주면 처음부터 데이터를 읽어들이게 됩니다.

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic topic2 --from-beginning
productId:2
productId:1

아래는 key, value, timestamp 를 받는 예제입니다.

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic topic2 --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true --from-beginning

Consumer Group (1)

파티션 3개, 레플리케이션 1을 지정해서 토픽 topic3 을 생성합니다.

$ kafka-topics --create --topic t
opic3 --bootstrap-server kafka:9092 --partitions 3 --repl
ication-factor 1
Created topic topic3.

컨슈머를 기동합니다. 컨슈머 그룹명을 created-products 로 지정해서 기동했습니다.

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic topic3 --group created-products

프로듀서를 띄웁니다. Partitioner 를 RoundRobin 으로 지정했고, topic 은 topic3 에 생산합니다.

$ kafka-console-producer --bootst
rap-server localhost:9092 --producer-property partitioner
.class=org.apache.kafka.clients.producer.RoundRobinPartitioner --topic topic3

컨슈머를 3개 더 기동합니다. 파티션 갯수 보다 컨슈머가 더 많을 때 어떻게 되는지 직접 확인해보기 위해서입니다. 아래 명령어를 터미널 3개에 모두 컨슈머를 띄운후 실행합니다.

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic topic3 --group created-products

프로듀서측 터미널에서 데이터를 전송해봅니다.

>productId:1
>productId:2
>productId:3
>productId:4

컨슈머 측에서는 아래와 같이 나타납니다.

# 첫번째로 띄운 컨슈머 
productId:2
 
# 두번째로 띄운 컨슈머
productId:1
productId:4
# 세번째로 띄운 컨슈머
# 네번째로 띄운 컨슈머
productId:3

세번째로 띄운 컨슈머는 데이터를 받지 못하는 상황이 발생했습니다.

Consumer Group (2)

도커 컨테이너에 bash 로 접속 후 아래의 명령어를 실행하면 컨슈머 그룹을 리스트해볼 수 있습니다.

$ kafka-consumer-groups --bootstrap-server localhost:9092 --list
created-products

describe

$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group created-products
 
GROUP            TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
created-products topic3          1          2               2               0               console-consumer-0ca87f5b-45df-4a86-ae29-f5bc667f24b2 /172.19.0.3     console-consumer
created-products topic3          0          1               1               0
  console-consumer-0aa38bf9-3491-4ecc-b7dc-74fe243ea41e /172.19.0.3     console-consumer
created-products topic3          2          1               1   

LEO, LAG