Kafka CLI 실습 - 토픽 실습
topic 생성
single partition 토픽 생성
topic 을 생성해봅니다. docker-compose 내의 kafka 라는 이름의 service 에 접속해서 topic1
을 생성하는 CLI 입니다.
$ docker-compose exec kafka kafka-topics --create --topic topic1 --bootstrap-server kafka:29092
Created topic topic1.
partition 5 개 갖는 토픽 생성
partition 을 5개 갖는 토픽을 생성합니다.
$ docker-compose exec kafka kafka-topics --create --topic topic2 --bootstrap-server kafka:9092 --partitions 5
Created topic topic2.
partition 5개, replication factor 1 인 토픽 생성
이번에는 partition 의 갯수는 5개이면서 각각의 파티션이 1개씩 복제 되도록 replication factor 를 1 로 지정하는 토픽을 생성합니다.
$ docker-compose exec kafka kafka-topics --create --topic topic3 --bootstrap-server kafka:9092 --partitions 5 --replication-factor 1
Created topic topic3.
토픽들을 출력(list)
$ docker-compose exec kafka kafka-topics --list --bootstrap-server kafka:9092
topic1
topic2
topic3
토픽 describe
리눅스의 ls -al 명령처럼 현재 토픽들의 상세 상황을 살펴봅니다.
$ docker-compose exec kafka kafka-topics --describe --topic topic2 --bootstrap-server kafka:9092
Topic: topic2 TopicId: 8tbqDaMzSmWAvuHvwOGP5A PartitionCount: 5 ReplicationFactor: 1 Configs:
Topic: topic2 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: topic2 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: topic2 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: topic2 Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: topic2 Partition: 4 Leader: 1 Replicas: 1 Isr: 1
토픽 삭제
토픽 삭제는 아래와 같이 해줄수 있습니다.
$ docker-compose exec kafka kafka-topics --delete --topic topic1 --bootstrap-server kafka:9092
카프카 프로듀서 CLI
카프카 프로듀서 CLI 는 docker-compose 안으로 직접 진입해야만 접근이 가능합니다.
$ docker-compose exec kafka bash
CLI 에서 사용자의 입력을 받는 producer 를 console-producer 라고 하며, 카프카를 직접 zip 파일로 다운받아보면 그 곳에 producer, consumer 등의 쉘스크립트가 있는데, 이 것들을 보통 콘솔 프로듀서, 콘솔 컨슈머라고 부릅니다.
Console Producer 실습
토픽 topic1
로 프로듀서 기동합니다. docker를 새로 띄워서 topic 이 만들어져 있지 않은 상태라면 아래 프로듀서가 topic에 데이터를 보내면 토픽이 생성됩니다.
$ kafka-console-producer --topic topic1 --broker-list kafka:9092
이 상태에서 데이터를 전송하면, 토픽을 외부에서 만들지 않은채로 프로듀서로 데이터를 전송하면서 생성한 것에 대한 경고문구가 나타납니다. 에러는 아니기에 장애가 나거나 하는 것은 아니지만, 조심해서 사용해야합니다.
[2024-05-30 21:58:08,575] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {topic1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
실무에서는 이렇게 쓰는 편은 아니고, 동적 생성을 막는 프로퍼티도 따로 있기 때문에 이런 프로퍼티들을 적용하기도 합니다.
위와 같이 명령할수도 있고 아래와 같이 브로커리스트, 프로듀서 프로퍼티를 추가해서 프로듀서를 기동하는 것 역시 가능합니다.
$ kafka-console-producer --topic
topic1 --broker-list kafka:9092 --producer-property acks=all
이번에는 key, value 로 데이터를 생산해봅니다. key,value 형태로 데이터를 보내기 위해서는 key.separator
라는 속성이 필요하며, 만약 key.separator
를 :
으로 지정했을때 name:트럼프
라고 데이터를 전송하면 카프카는 key = name, value = 트럼프 와 같은 형식으로 데이터를 인식하며 생산합니다. 아래는 그 예제입니다.
$ kafka-console-producer --topic topic1 --broker-list kafka:9092 --property parse.key=true --property key.separator=:
>name:트럼프
카프카 컨슈머 CLI
단순 Consumer 예제
먼저 카프카 도커 컨테이너의 bash 로 진입합니다.
$ docker-compose exec kafka bash
데이터의 생산,소비를 위해 아래의 명령어를 입력합니다. 파티션을 3 으로 지정했고 토픽 명은 topic2
로 지정했습니다.
$ kafka-topics --bootstrap-server localhost:9092 --topic topic2 --create --partitions 3
Created topic topic2.
방금 생성한 토픽인 topic2
에 대해서 컨슈머를 기동합니다.
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic topic2
새로 터미널 창을 열어서 도커 컨테이너의 bash 로 진입합니다.
$ docker-compose exec kafka bash
아래와 같이 프로듀서를 기동하고 데이터 2건을 생산합니다.
$ kafka-console-producer --boorap-server localhost:9092 --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner --topic topic2
>productId:1
>productId:2
컨슈머측을 열어둔 터미널을 확인해보면 아래와 같이 데이터가 수신되었음을 확인 가능합니다.
productId:1
productId:2
Ctrl + C 를 입력해서 컨슈머에서 빠져나온 후 다시 컨슈머를 실행해보면 아무 데이터도 안옵니다. 기본 수신 정책이 최근의 값만 받도록 하는 속성으로 설정되어 있기 때문입니다.
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic topic2
이번에는 콘솔 컨슈머를 닫았다가 다시 실행할때 데이터를 처음부터 소비해봅니다. --from-beginning
을 주면 처음부터 데이터를 읽어들이게 됩니다.
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic topic2 --from-beginning
productId:2
productId:1
아래는 key, value, timestamp 를 받는 예제입니다.
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic topic2 --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --property print.partition=true --from-beginning
Consumer Group (1)
파티션 3개, 레플리케이션 1을 지정해서 토픽 topic3
을 생성합니다.
$ kafka-topics --create --topic t
opic3 --bootstrap-server kafka:9092 --partitions 3 --repl
ication-factor 1
Created topic topic3.
컨슈머를 기동합니다. 컨슈머 그룹명을 created-products
로 지정해서 기동했습니다.
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic topic3 --group created-products
프로듀서를 띄웁니다. Partitioner 를 RoundRobin 으로 지정했고, topic 은 topic3 에 생산합니다.
$ kafka-console-producer --bootst
rap-server localhost:9092 --producer-property partitioner
.class=org.apache.kafka.clients.producer.RoundRobinPartitioner --topic topic3
컨슈머를 3개 더 기동합니다. 파티션 갯수 보다 컨슈머가 더 많을 때 어떻게 되는지 직접 확인해보기 위해서입니다. 아래 명령어를 터미널 3개에 모두 컨슈머를 띄운후 실행합니다.
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic topic3 --group created-products
프로듀서측 터미널에서 데이터를 전송해봅니다.
>productId:1
>productId:2
>productId:3
>productId:4
컨슈머 측에서는 아래와 같이 나타납니다.
# 첫번째로 띄운 컨슈머
productId:2
# 두번째로 띄운 컨슈머
productId:1
productId:4
# 세번째로 띄운 컨슈머
# 네번째로 띄운 컨슈머
productId:3
세번째로 띄운 컨슈머는 데이터를 받지 못하는 상황이 발생했습니다.
Consumer Group (2)
도커 컨테이너에 bash 로 접속 후 아래의 명령어를 실행하면 컨슈머 그룹을 리스트해볼 수 있습니다.
$ kafka-consumer-groups --bootstrap-server localhost:9092 --list
created-products
describe
$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group created-products
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
created-products topic3 1 2 2 0 console-consumer-0ca87f5b-45df-4a86-ae29-f5bc667f24b2 /172.19.0.3 console-consumer
created-products topic3 0 1 1 0
console-consumer-0aa38bf9-3491-4ecc-b7dc-74fe243ea41e /172.19.0.3 console-consumer
created-products topic3 2 1 1
LEO, LAG