Kafka in-house study #45

Open
boojongmin opened this issue Nov 28, 2021 · 0 comments

Running 1 Kafka broker and 1 ZooKeeper node

git clone https://github.com/onlybooks/kafka2

cd kafka2/appendix_C/single_zk_kafka/

Compose file in this directory:

version: "3.5"
services:
  zk:
    image: confluentinc/cp-zookeeper:5.5.1
    restart: always
    hostname: zk
    container_name: zk
    ports:
      - "2181:2181"
    environment:
      - ZOOKEEPER_SERVER_ID=1
      - ZOOKEEPER_CLIENT_PORT=2181
      - ZOOKEEPER_TICK_TIME=2000
      - ZOOKEEPER_INIT_LIMIT=5
      - ZOOKEEPER_SYNC_LIMIT=2
      - ZOOKEEPER_SERVERS=zk:2888:3888

  kafka:
    image: confluentinc/cp-kafka:5.5.1
    restart: always
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
      - "9999:9999"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zk:2181
      KAFKA_LISTENERS: INTERNAL://kafka:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9999

  kafka_manager:
    image: hlebalbau/kafka-manager:stable
    container_name: cmak
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: "zk:2181"
      APPLICATION_SECRET: "random-secret"
    command: -Dpidfile.path=/dev/null

# Start the stack in the background, then check container state and logs
docker-compose up -d
docker-compose ps
docker-compose logs
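
Instead of reading the logs by eye, the startup check can be scripted. A minimal sketch, assuming the compose service is named kafka as above and that the broker logs a line containing "started (kafka.server.KafkaServer)" once it is up; wait_for_broker is a hypothetical helper name, not part of any tool.

```shell
# Poll the service logs until the broker reports that it has started.
wait_for_broker() {
  service="${1:-kafka}"
  attempts="${2:-30}"
  i=0
  while [ "$i" -lt "$attempts" ]; do
    # Assumption: the startup line contains "started (kafka.server.KafkaServer)".
    if docker-compose logs "$service" 2>/dev/null \
        | grep -q "started (kafka.server.KafkaServer)"; then
      echo "broker started"
      return 0
    fi
    i=$((i + 1))
    sleep 2
  done
  echo "broker did not start after $attempts checks" >&2
  return 1
}

# Usage (from the directory that holds the compose file):
#   wait_for_broker kafka 30
```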

Monitoring tool (CMAK, published on port 9000 by the kafka_manager service)

image

image

Producer and consumer example using the console tools

# Windows (Git Bash; the doubled slashes avoid automatic path conversion)
docker exec -it kafka //bin//bash
# macOS, Linux
docker exec -it kafka /bin/bash

which kafka-topics
# /usr/bin/kafka-topics
# Create a topic with 1 partition and 1 replica
kafka-topics --bootstrap-server kafka:9092 --create --topic hello-topic --partitions 1 --replication-factor 1
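
Running the create command a second time fails because the topic already exists. A small sketch that checks the topic list first, using only the --list and --create actions of kafka-topics; ensure_topic is a hypothetical helper name, and it is meant to run inside the kafka container.

```shell
# Create the topic only when it is not in the broker's topic list yet.
ensure_topic() {
  bootstrap="$1"; topic="$2"; partitions="$3"; replicas="$4"
  # -x matches the whole line, -F treats the topic name as a plain string.
  if kafka-topics --bootstrap-server "$bootstrap" --list | grep -qxF -- "$topic"; then
    echo "topic $topic already exists"
  else
    kafka-topics --bootstrap-server "$bootstrap" --create --topic "$topic" \
      --partitions "$partitions" --replication-factor "$replicas"
  fi
}

# Usage:
#   ensure_topic kafka:9092 hello-topic 1 1
```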

Reference: Kafka CLI tools in /usr/bin

root@kafka:/usr/bin# ls *kafka*
kafka-acls                 kafka-consumer-perf-test  kafka-mirror-maker                kafka-server-start
kafka-broker-api-versions  kafka-delegation-tokens   kafka-preferred-replica-election  kafka-server-stop
kafka-configs              kafka-delete-records      kafka-producer-perf-test          kafka-streams-application-reset
kafka-console-consumer     kafka-dump-log            kafka-reassign-partitions         kafka-topics
kafka-console-producer     kafka-leader-election     kafka-replica-verification        kafka-verifiable-consumer
kafka-consumer-groups      kafka-log-dirs            kafka-run-class                   kafka-verifiable-producer

Run in a new terminal

docker exec -it kafka /bin/bash
kafka-console-consumer --bootstrap-server kafka:9092 --topic hello-topic

Run in the previous terminal

kafka-console-producer --help
kafka-console-producer --bootstrap-server kafka:9092 --topic hello-topic
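
The same round trip also works without two interactive terminals. A sketch under the assumption that the topic is empty beforehand (otherwise the consumer prints the two oldest records instead); round_trip is a hypothetical helper, while --from-beginning, --max-messages and --timeout-ms are existing kafka-console-consumer options.

```shell
# Send two lines through the console producer, then read two records back.
round_trip() {
  bootstrap="$1"; topic="$2"
  # The console producer reads one record per line from standard input.
  printf 'hello\nkafka\n' \
    | kafka-console-producer --bootstrap-server "$bootstrap" --topic "$topic"
  # Read from the earliest offset, stop after 2 records,
  # and give up if nothing arrives within 10 seconds.
  kafka-console-consumer --bootstrap-server "$bootstrap" --topic "$topic" \
    --from-beginning --max-messages 2 --timeout-ms 10000
}

# Usage (inside the kafka container):
#   round_trip kafka:9092 hello-topic
```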

Increasing the partition count

kafka-topics --bootstrap-server kafka:9092 --alter --topic hello-topic --partitions 2

# kafka-console-producer has no --partitions option; produce to the topic as before
kafka-console-producer --bootstrap-server kafka:9092 --topic hello-topic
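
The console producer cannot be pointed at a specific partition, but records with the same key land on the same partition, and the console consumer can read a single partition. A sketch of both sides; produce_keyed and consume_partition are hypothetical helper names, and the ":" key separator is just a choice for this example.

```shell
# Produce "key:value" lines; the default partitioner picks the partition from the key.
produce_keyed() {
  bootstrap="$1"; topic="$2"
  kafka-console-producer --bootstrap-server "$bootstrap" --topic "$topic" \
    --property parse.key=true --property key.separator=:
}

# Read one partition from its earliest offset and print the keys as well.
consume_partition() {
  bootstrap="$1"; topic="$2"; partition="$3"
  kafka-console-consumer --bootstrap-server "$bootstrap" --topic "$topic" \
    --partition "$partition" --offset earliest --property print.key=true
}

# Usage:
#   printf 'user1:a\nuser2:b\n' | produce_keyed kafka:9092 hello-topic
#   consume_partition kafka:9092 hello-topic 1
```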

Reducing the partition count fails (partitions can only be increased)

root@kafka:/# kafka-topics --bootstrap-server kafka:9092 --alter --topic hello-topic --partitions 1

Error while executing topic command : org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 2 partitions, which is higher than the requested 1.
[2021-11-28 13:50:41,830] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 2 partitions, which is higher than the requested 1.
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at kafka.admin.TopicCommand$AdminClientTopicService.alterTopic(TopicCommand.scala:270)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:64)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 2 partitions, which is higher than the requested 1.
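
To avoid the stack trace above, the current partition count can be read first and the alter skipped when the request is not an increase. A sketch, assuming the --describe output contains "PartitionCount: N" on its summary line; partition_count and grow_partitions are hypothetical helper names.

```shell
# Print the current partition count of a topic.
partition_count() {
  bootstrap="$1"; topic="$2"
  # Assumption: the summary line looks like "... PartitionCount: 2 ...".
  kafka-topics --bootstrap-server "$bootstrap" --describe --topic "$topic" \
    | sed -n 's/.*PartitionCount: *\([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Alter the topic only when the target is larger than the current count.
grow_partitions() {
  bootstrap="$1"; topic="$2"; target="$3"
  current="$(partition_count "$bootstrap" "$topic")"
  if [ -z "$current" ]; then
    echo "could not read partition count for $topic" >&2
    return 1
  fi
  if [ "$target" -le "$current" ]; then
    echo "refusing: $topic has $current partitions, requested $target" >&2
    return 1
  fi
  kafka-topics --bootstrap-server "$bootstrap" --alter --topic "$topic" \
    --partitions "$target"
}

# Usage:
#   grow_partitions kafka:9092 hello-topic 3
```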

Kafka cluster (3 ZooKeeper nodes, 3 Kafka brokers)

cd ../cluster_zk_kafka/
docker-compose up -d

$ docker-compose ps
 Name               Command               State                            Ports
----------------------------------------------------------------------------------------------------------
cmak     /kafka-manager/bin/cmak -D ...   Up      0.0.0.0:9000->9000/tcp
kafka1   /etc/confluent/docker/run        Up      0.0.0.0:9091->9091/tcp, 9092/tcp, 0.0.0.0:9991->9991/tcp
kafka2   /etc/confluent/docker/run        Up      0.0.0.0:9092->9092/tcp, 0.0.0.0:9992->9992/tcp
kafka3   /etc/confluent/docker/run        Up      9092/tcp, 0.0.0.0:9093->9093/tcp, 0.0.0.0:9993->9993/tcp
zk1      /etc/confluent/docker/run        Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
zk2      /etc/confluent/docker/run        Up      2181/tcp, 0.0.0.0:2182->2182/tcp, 2888/tcp, 3888/tcp
zk3      /etc/confluent/docker/run        Up      2181/tcp, 0.0.0.0:2183->2183/tcp, 2888/tcp, 3888/tcp

Compose file for the cluster:

version: "3.5"
services:
  zk1:
    image: confluentinc/cp-zookeeper:5.5.1
    restart: always
    hostname: zk1
    container_name: zk1
    ports:
      - "2181:2181"
    environment:
      - ZOOKEEPER_SERVER_ID=1
      - ZOOKEEPER_CLIENT_PORT=2181
      - ZOOKEEPER_TICK_TIME=2000
      - ZOOKEEPER_INIT_LIMIT=5
      - ZOOKEEPER_SYNC_LIMIT=2
      - ZOOKEEPER_SERVERS=zk1:2888:3888;zk2:2888:3888;zk3:2888:3888
  zk2:
    image: confluentinc/cp-zookeeper:5.5.1
    restart: always
    hostname: zk2
    container_name: zk2
    ports:
      - "2182:2182"
    environment:
      - ZOOKEEPER_SERVER_ID=2
      - ZOOKEEPER_CLIENT_PORT=2182
      - ZOOKEEPER_TICK_TIME=2000
      - ZOOKEEPER_INIT_LIMIT=5
      - ZOOKEEPER_SYNC_LIMIT=2
      - ZOOKEEPER_SERVERS=zk1:2888:3888;zk2:2888:3888;zk3:2888:3888
  zk3:
    image: confluentinc/cp-zookeeper:5.5.1
    restart: always
    hostname: zk3
    container_name: zk3
    ports:
      - "2183:2183"
    environment:
      - ZOOKEEPER_SERVER_ID=3
      - ZOOKEEPER_CLIENT_PORT=2183
      - ZOOKEEPER_TICK_TIME=2000
      - ZOOKEEPER_INIT_LIMIT=5
      - ZOOKEEPER_SYNC_LIMIT=2
      - ZOOKEEPER_SERVERS=zk1:2888:3888;zk2:2888:3888;zk3:2888:3888

  kafka1:
    image: confluentinc/cp-kafka:5.5.1
    restart: always
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9091:9091"
      - "9991:9991"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zk1:2181,zk2:2182,zk3:2183
      KAFKA_LISTENERS: INTERNAL://kafka1:9091
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:9091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_JMX_PORT: 9991
  kafka2:
    image: confluentinc/cp-kafka:5.5.1
    restart: always
    hostname: kafka2
    container_name: kafka2
    ports:
      - "9092:9092"
      - "9992:9992"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zk1:2181,zk2:2182,zk3:2183
      KAFKA_LISTENERS: INTERNAL://kafka2:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_JMX_PORT: 9992
  kafka3:
    image: confluentinc/cp-kafka:5.5.1
    restart: always
    hostname: kafka3
    container_name: kafka3
    ports:
      - "9093:9093"
      - "9993:9993"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zk1:2181,zk2:2182,zk3:2183
      KAFKA_LISTENERS: INTERNAL://kafka3:9093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_JMX_PORT: 9993

  kafka_manager:
    image: hlebalbau/kafka-manager:stable
    container_name: cmak
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: "zk1:2181,zk2:2182,zk3:2183"
      APPLICATION_SECRET: "random-secret"
    command: -Dpidfile.path=/dev/null

Error when replication-factor is larger than the number of brokers

root@kafka1:/# kafka-topics --bootstrap-server kafka1:9091 --create --topic hello-topic --partitions 1 --replication-factor 4
Error while executing topic command : org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 4 larger than available brokers: 3.
[2021-11-28 14:02:41,338] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 4 larger than available brokers: 3.
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:244)
        at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:196)
        at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:191)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:219)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:62)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 4 larger than available brokers: 3.
 (kafka.admin.TopicCommand$)
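
The broker count can be checked before creating the topic. A sketch, assuming kafka-broker-api-versions prints one header line per broker of the form "host:port (id: N rack: ...) -> ("; broker_count and create_topic_checked are hypothetical helper names.

```shell
# Count the brokers that answer an API-versions request.
broker_count() {
  bootstrap="$1"
  # Assumption: each broker contributes exactly one line containing "(id: N".
  kafka-broker-api-versions --bootstrap-server "$bootstrap" | grep -c '(id: [0-9]'
}

# Create the topic only when the replication factor fits the cluster size.
create_topic_checked() {
  bootstrap="$1"; topic="$2"; partitions="$3"; replicas="$4"
  brokers="$(broker_count "$bootstrap")"
  if [ "$replicas" -gt "$brokers" ]; then
    echo "replication factor $replicas exceeds $brokers available brokers" >&2
    return 1
  fi
  kafka-topics --bootstrap-server "$bootstrap" --create --topic "$topic" \
    --partitions "$partitions" --replication-factor "$replicas"
}

# Usage:
#   create_topic_checked kafka1:9091 hello-topic 1 3
```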

Reference: ZooKeeper

https://zookeeper.apache.org/doc/r3.4.6/zookeeperOver.html

https://zookeeper.apache.org/doc/r3.4.6/zookeeperProgrammers.html

image

Inspecting the segment log

kafka-topics --bootstrap-server kafka1:9091 --create --topic hello-topic --partitions 2 --replication-factor 2
kafka-console-producer --bootstrap-server kafka1:9091 --topic hello-topic
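# Inside kafka1: locate the segment files
find / -name "*.log"

# Install a hex viewer and open the first segment of partition 1
apt update
apt install hexedit
hexedit /var/lib/kafka/data/hello-topic-1/00000000000000000000.log

  • kafka1
    image

# Repeat the same check on the other brokers
docker exec -it kafka2 //bin//bash

  • kafka2
    image

docker exec -it kafka3 //bin//bash

  • kafka3
    image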
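
As an alternative to a hex editor, the kafka-dump-log tool from the /usr/bin listing decodes segment files into readable records. A sketch that dumps every segment in one partition directory; dump_partition is a hypothetical helper name, and the directory in the usage line is the one used with hexedit above.

```shell
# Decode every .log segment in a partition directory, payloads included.
dump_partition() {
  dir="$1"
  for segment in "$dir"/*.log; do
    # Skip the unexpanded pattern when the directory holds no segments.
    [ -e "$segment" ] || continue
    echo "== $segment"
    # --print-data-log prints record contents, not only batch metadata.
    kafka-dump-log --files "$segment" --print-data-log
  done
}

# Usage (inside a broker container that holds a replica of the partition):
#   dump_partition /var/lib/kafka/data/hello-topic-1
```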

Producer (planned for study session 2)

https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html

boojongmin changed the title from "kafka study" to "Kafka in-house study" on Dec 4, 2021