Building a Kafka + ZooKeeper cluster on Docker

Overview

Let's build a cluster in which a ZooKeeper node and a Kafka broker are co-located on each server.


 

Docker

# Network

- Create a bridge-driver network so that the Docker containers can communicate with each other

sudo docker network create mynetwork

 

# Create CentOS Docker containers

- Use the centos/systemd image; with a plain centos image, systemd commands do not work properly

docker run --privileged -d --name centos1 --network mynetwork -P centos/systemd init
docker run --privileged -d --name centos2 --network mynetwork -P centos/systemd init
docker run --privileged -d --name centos3 --network mynetwork -P centos/systemd init
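
- The installation steps in the later sections run inside each container. A minimal sketch for attaching a shell to one of them (repeat for centos2 and centos3):

docker exec -it centos1 /bin/bash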

 

# Check the Docker network configuration

- Verify that the containers are attached to the bridge network created above

docker network inspect mynetwork

[
    {
        "Name": "mynetwork",
        "Id": "5bc456d8ce383f11c2f00a0d740be674fb75547819777a71eb792129c00e7987",
        "Created": "2021-02-21T23:29:05.3471901Z",
        "Scope": "local",
        "Driver": "bridge",
        "EnableIPv6": false,
        "IPAM": {
            "Driver": "default",
            "Options": {},
            "Config": [
                {
                    "Subnet": "172.18.0.0/16",
                    "Gateway": "172.18.0.1"
                }
            ]
        },
        "Internal": false,
        "Attachable": false,
        "Ingress": false,
        "ConfigFrom": {
            "Network": ""
        },
        "ConfigOnly": false,
        "Containers": {
            "81dcda36bb45cc42e3893c0e7fa13749d851fcb577a952850df63ea59e229055": {
                "Name": "centos2",
                "EndpointID": "18c64f01f734324d1acb6bf874cdddd4a65cb9185a48af2b4beaa644e37f25ad",
                "MacAddress": "02:42:ac:12:00:05",
                "IPv4Address": "172.18.0.5/16",
                "IPv6Address": ""
            },
            "e492ba70191d3dcc7d3e34ef8fb2095aaa24244fc29feef3079f77f97f893f8e": {
                "Name": "centos3",
                "EndpointID": "effe7141d57c45d446bd3e1a3ba94e972eddef20631c025ffc96bc6817048f23",
                "MacAddress": "02:42:ac:12:00:02",
                "IPv4Address": "172.18.0.2/16",
                "IPv6Address": ""
            },
            "f6ed7547e36fdce0b996f670d3e48d24b297cb594062a9ba9a90ad13770fe700": {
                "Name": "centos1",
                "EndpointID": "2dc785e43ca1f2e22941e3631877c0f579af42906a637f2f553e2d4cb991e0bf",
                "MacAddress": "02:42:ac:12:00:04",
                "IPv4Address": "172.18.0.4/16",
                "IPv6Address": ""
            }
        },
        "Options": {},
        "Labels": {}
    }
]

 

# Check connectivity between containers

docker exec centos1 ping centos2
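
If the containers can reach each other over the bridge network, the output looks roughly like this (illustrative; the resolved name and address will differ):

64 bytes from centos2.mynetwork (172.18.0.5): icmp_seq=1 ttl=64 time=0.085 ms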


 

Installation

# Install required packages

yum -y install net-tools
yum -y install sudo
yum -y install telnet

 

# Install OpenJDK

yum -y install java-1.8.0-openjdk
yum -y install java-1.8.0-openjdk-devel

 

# Set the JDK path (add these exports to /etc/profile)

JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.282.b08-1.el7_9.x86_64
PATH=$PATH:$JAVA_HOME/bin
CLASSPATH=$JAVA_HOME/jre/lib:$JAVA_HOME/lib/tools.jar

export JAVA_HOME PATH CLASSPATH

 

# Apply the JDK path

source /etc/profile
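
A quick sanity check that the JDK is installed and the path is applied:

java -version
echo $JAVA_HOME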

 

# Register the Confluent Platform repository

sudo rpm --import https://packages.confluent.io/rpm/5.0/archive.key

vi /etc/yum.repos.d/confluent.repo
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.0/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.0/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.0
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.0/archive.key
enabled=1

 

# Clear the yum cache

yum clean all

 

# Install Kafka (Confluent Platform OSS)

sudo yum -y install confluent-platform-oss-2.11

 

# Configure the Kafka data directory (log.dirs)

vi /etc/kafka/server.properties
...
log.dirs=/var/lib/kafka/data
...

sudo mkdir /var/lib/kafka/data
sudo chown cp-kafka:confluent /var/lib/kafka/data

 

# Configure ZooKeeper

- Each server.N entry is <hostname>:<peer port>:<leader-election port>; N must match that server's myid (set in the next step)

vi /etc/kafka/zookeeper.properties

initLimit=10
syncLimit=5

server.1=centos1:2888:3888
server.2=centos2:2888:3888
server.3=centos3:2888:3888

 

# Set myid (on each server, run only the line that matches that server's own id)

echo <myid> | sudo -u cp-kafka tee -a /var/lib/zookeeper/myid

e.g.)
echo 1 | sudo -u cp-kafka tee -a /var/lib/zookeeper/myid
echo 2 | sudo -u cp-kafka tee -a /var/lib/zookeeper/myid
echo 3 | sudo -u cp-kafka tee -a /var/lib/zookeeper/myid
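
To confirm the value on each host (the file should contain only that server's id):

cat /var/lib/zookeeper/myid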

 

# Configure the broker

vi /etc/kafka/server.properties

broker.id=<id> (e.g. 1; must be unique per broker)
broker.id.generation.enable=false
...
zookeeper.connect=centos1:2181,centos2:2181,centos3:2181
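
For example, keeping the broker ids consistent with the myid values above (a convention assumed here, not required by Kafka itself):

centos1 -> broker.id=1
centos2 -> broker.id=2
centos3 -> broker.id=3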

 

 

Startup

# Start ZooKeeper & log location

sudo systemctl start confluent-zookeeper
/var/log/kafka/zookeeper.out
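
To check that ZooKeeper actually started on each node:

sudo systemctl status confluent-zookeeper
tail -n 50 /var/log/kafka/zookeeper.out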

 

# Start Kafka & log location

sudo systemctl start confluent-kafka
/var/log/kafka/server.out
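
Once all three brokers are up, one way to confirm they registered with ZooKeeper (zookeeper-shell ships with the Confluent packages) is to list /brokers/ids, which should return [1, 2, 3]:

zookeeper-shell centos1:2181 ls /brokers/ids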

 

# Kafka log after a successful connection

[2021-02-22 02:09:43,302] INFO ProducerConfig values:
        acks = 1
        batch.size = 16384
        bootstrap.servers = [PLAINTEXT://81dcda36bb45:9092, PLAINTEXT://e492ba70191d:9092]
        buffer.memory = 33554432
        client.id =
        compression.type = none
        confluent.batch.expiry.ms = 30000
        connections.max.idle.ms = 540000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        linger.ms = 0
        max.block.ms = 10000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 0
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
 (org.apache.kafka.clients.producer.ProducerConfig)
[2021-02-22 02:09:43,352] INFO Kafka version : 2.0.1-cp8 (org.apache.kafka.common.utils.AppInfoParser)
[2021-02-22 02:09:43,352] INFO Kafka commitId : c34a431c349189df (org.apache.kafka.common.utils.AppInfoParser)
[2021-02-22 02:09:43,387] INFO Cluster ID: gGrIL7_TQIe_NoaHzLtV9w (org.apache.kafka.clients.Metadata)
[2021-02-22 02:09:43,456] INFO [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2021-02-22 02:09:43,465] INFO Successfully submitted metrics to Kafka topic __confluent.support.metrics (io.confluent.support.metrics.submitters.KafkaSubmitter)
[2021-02-22 02:09:45,678] INFO Successfully submitted metrics to Confluent via secure endpoint (io.confluent.support.metrics.submitters.ConfluentSubmitter)

 

 

Test

# Create a topic

- Leader : shows which broker holds the leader replica of each partition (displayed as a broker id, i.e. the broker.id value set in server.properties)

- Replicas : the list of brokers that hold a replica of each partition

- ISR : short for In-Sync Replicas; the list of brokers whose replicas are correctly in sync with the leader replica (see the illustrative --describe output below)

// Create the topic
kafka-topics --zookeeper centos1:2181,centos2:2181,centos3:2181 --create --topic wedul --partitions 3 --replication-factor 3

// Describe the topic
kafka-topics --zookeeper centos1:2181,centos2:2181,centos3:2181 --describe --topic wedul
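
Illustrative --describe output (leader and replica assignments will differ per cluster):

Topic:wedul    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: wedul    Partition: 0    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
    Topic: wedul    Partition: 1    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
    Topic: wedul    Partition: 2    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2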

 

# Message send/receive test

1. Send messages with the producer

--broker-list : the broker host names and ports of the Kafka cluster to send messages to (host:port; default port 9092)

kafka-console-producer --broker-list centos1:9092,centos2:9092,centos3:9092 --topic wedul
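
kafka-console-producer reads lines from stdin; each line typed at the > prompt is sent as one message (illustrative):

> hello
> kafka cluster test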

 

2. Receive messages with the consumer

--bootstrap-server : the broker host names and ports of the Kafka cluster to receive messages from

kafka-console-consumer --bootstrap-server centos1:9092,centos2:9092,centos3:9092 --topic wedul
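
By default the console consumer only shows messages produced after it starts; add --from-beginning to replay everything already in the topic:

kafka-console-consumer --bootstrap-server centos1:9092,centos2:9092,centos3:9092 --topic wedul --from-beginning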

 

Sending a message from the producer
Receiving the message on the consumer


 

Stopping ZooKeeper and Kafka

sudo systemctl stop confluent-kafka
sudo systemctl stop confluent-zookeeper