설명
주키퍼와 브로커가 한 서버에 위치하는 구조의 클러스터를 구성해보자.
Docker
#network
- docker container가 서로 통신할 수 있도록 bridge driver형태의 네크워크 생성
sudo docker network create mynetwork
#centos docker container 생성
- 일반 centos 이미지를 사용할 경우 systemd command가 정상동작하지 않기 때문에 centos/systemd 이미지를 사용할 것
docker run --privileged -d --name centos1 --network mynetwork -P centos/systemd init
docker run --privileged -d --name centos2 --network mynetwork -P centos/systemd init
docker run --privileged -d --name centos3 --network mynetwork -P centos/systemd init
#docker network 구성 확인
- 생성한 bridge network에 컨테이너가 묶였는지 확인
docker network inspect mynetwork
[
{
"Name": "mynetwork",
"Id": "5bc456d8ce383f11c2f00a0d740be674fb75547819777a71eb792129c00e7987",
"Created": "2021-02-21T23:29:05.3471901Z",
"Scope": "local",
"Driver": "bridge",
"EnableIPv6": false,
"IPAM": {
"Driver": "default",
"Options": {},
"Config": [
{
"Subnet": "172.18.0.0/16",
"Gateway": "172.18.0.1"
}
]
},
"Internal": false,
"Attachable": false,
"Ingress": false,
"ConfigFrom": {
"Network": ""
},
"ConfigOnly": false,
"Containers": {
"81dcda36bb45cc42e3893c0e7fa13749d851fcb577a952850df63ea59e229055": {
"Name": "centos2",
"EndpointID": "18c64f01f734324d1acb6bf874cdddd4a65cb9185a48af2b4beaa644e37f25ad",
"MacAddress": "02:42:ac:12:00:05",
"IPv4Address": "172.18.0.5/16",
"IPv6Address": ""
},
"e492ba70191d3dcc7d3e34ef8fb2095aaa24244fc29feef3079f77f97f893f8e": {
"Name": "centos3",
"EndpointID": "effe7141d57c45d446bd3e1a3ba94e972eddef20631c025ffc96bc6817048f23",
"MacAddress": "02:42:ac:12:00:02",
"IPv4Address": "172.18.0.2/16",
"IPv6Address": ""
},
"f6ed7547e36fdce0b996f670d3e48d24b297cb594062a9ba9a90ad13770fe700": {
"Name": "centos1",
"EndpointID": "2dc785e43ca1f2e22941e3631877c0f579af42906a637f2f553e2d4cb991e0bf",
"MacAddress": "02:42:ac:12:00:04",
"IPv4Address": "172.18.0.4/16",
"IPv6Address": ""
}
},
"Options": {},
"Labels": {}
}
]
#container간 통신 체크
docker exec centos1 ping centos2
설치
# 필요 라이브러리 설치
yum -y install net-tools
yum -y install sudo
yum -y install telnet
# open jdk 설치
yum -y install java-1.8.0-openjdk
yum -y install java-1.8.0-openjdk-devel
# jdk path 설정
JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.282.b08-1.el7_9.x86_64
PATH=$PATH:$JAVA_HOME/bin
CLASSPATH=$JAVA_HOME/jre/lib:$JAVA_HOME/lib/tools.jar
export JAVA_HOME PATH CLASSPATH
# jdk path 반영
source /etc/profile
# confluent platform 등록
sudo rpm --import https://packages.confluent.io/rpm/5.0/archive.key
vi /etc/yum.repos.d/confluent.repo
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.0/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.0/archive.key
enabled=1
[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.0
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.0/archive.key
enabled=1
# 캐시 삭제
yum clean all
# kafka 설치
sudo yum -y install confluent-platform-oss-2.11
# kafka 디렉토리 설정
vi /etc/kafka/server.properties
...
log.dirs=/var/lib/kafka/data
...
sudo mkdir /var/lib/kafka/data
sudo chown cp-kafka:confluent /var/lib/kafka/data
# zookeeper 설정
vi /etc/kafka/zookeeper.properties
initLimit=10
syncLimit=5
server.1=centos1:2888:3888
server.2=centos2:2888:3888
server.3=centos3:2888:3888
# myid 설정 (각 서버별 자신의 myid에 맞는 것 실행)
echo <myid> | sudo -u cp-kafka tee -a /var/lib/zookeeper/myid
ex)
echo 1 | sudo -u cp-kafka tee -a /var/lib/zookeeper/myid
echo 2 | sudo -u cp-kafka tee -a /var/lib/zookeeper/myid
echo 3 | sudo -u cp-kafka tee -a /var/lib/zookeeper/myid
# 브로커 설정
vi /etc/kafka/server.properties
broker.id=<id> (ex. 1)
broker.id.generation.enable=false
...
zookeeper.connect=centos1:2181,centos2:2181,centos3:2181
실행
# zookeeper 실행 & 로그 주소
sudo systemctl start confluent-zookeeper
/var/log/kafka/zookeeper.out
# 카프카 실행 & 로그 주소
sudo systemctl start confluent-kafka
/var/log/kafka/server.out
# 연결 성공 카프카 로그
[2021-02-22 02:09:43,302] INFO ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [PLAINTEXT://81dcda36bb45:9092, PLAINTEXT://e492ba70191d:9092]
buffer.memory = 33554432
client.id =
compression.type = none
confluent.batch.expiry.ms = 30000
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 10000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
(org.apache.kafka.clients.producer.ProducerConfig)
[2021-02-22 02:09:43,352] INFO Kafka version : 2.0.1-cp8 (org.apache.kafka.common.utils.AppInfoParser)
[2021-02-22 02:09:43,352] INFO Kafka commitId : c34a431c349189df (org.apache.kafka.common.utils.AppInfoParser)
[2021-02-22 02:09:43,387] INFO Cluster ID: gGrIL7_TQIe_NoaHzLtV9w (org.apache.kafka.clients.Metadata)
[2021-02-22 02:09:43,456] INFO [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2021-02-22 02:09:43,465] INFO Successfully submitted metrics to Kafka topic __confluent.support.metrics (io.confluent.support.metrics.submitters.KafkaSubmitter)
[2021-02-22 02:09:45,678] INFO Successfully submitted metrics to Confluent via secure endpoint (io.confluent.support.metrics.submitters.ConfluentSubmitter)
테스트
# 토픽 생성
- Leader : Leader Replica가 어떤 브로커에 있는지 표시 (브로커 아이디가 표시됨, server.properties에 broker.id=에 기재된 데이터)
- Replicas : 각 파티션의 레플리카를 보유하고 있는 브로커의 목록
- ISR : In-Sync Replicas의 약자로 복제본 중 Leader Replica와 올바르게 동기가 실행된 복제본을 보유하고 있는 브로커의 목록이 표시 됨
// 토픽 생성
kafka-topics --zookeeper centos1:2181,centos2:2181,centos3:2181 --create --topic wedul --partitions 3 --replication-factor 3
// 토픽 조회
kafka-topics --zookeeper centos1:2181,centos2:2181,centos3:2181 --create --topic wedul --partitions 3 --replication-factor 3
# 메시지 송수신 테스트
1. producer에 메시지 송신
--broker-list : 메시지를 보내는 카프카 클러스터의 브로커 호스트명과 포트번호를 지정 (호스트명: 포트), 기본포트는 9092
kafka-topics --zookeeper centos1:2181,centos2:2181,centos3:2181 --create --topic wedul --partitions 3 --replication-factor 3
2. consumer 메시지 수신
--bootstrap-server : 메시지를 수신하는 카프카 클러스터의 브로커 호스트명과 포트
kafka-console-consumer --bootstrap-server centos1:9092,centos2:9092,centos3:9092 --topic wedul
주키퍼, 카프카 정지
sudo systemctl stop confluent-kafka
sudo systemctl stop confluent-zookeeper
'web > kafka' 카테고리의 다른 글
Kafka cluster에서 topic 지우기 (0) | 2021.03.20 |
---|---|
Kafka Connect 정리 (0) | 2021.03.14 |
[번역] shared message queues와 publish-subscribe 방식에 Custom Group 방식을 더한 Kafka 소개 (0) | 2019.02.14 |
Kafka 요약 정리 (0) | 2019.01.24 |
Kafka 정리 (1) | 2018.12.30 |