Kafka Connect
- kafka에서 외부로 데이터로 출력을 하거나 kafka로 데이터를 입력하도록 도와주는 도구
- kafka connect는 kafka connect와 플러그인 형태로 되어 있으며 데이터를 producer를 통해 넣는 부분을 source라고 하고 consumer 쪽으로 보내는 곳을 sink라고 한다.
- 공개된 플러그인이 있으며 sink, source 양쪽 모두 공개된 플러그인만 사용하는게 좋다. (신뢰도가 떨어지는 플러그인은 버그가..)
- kafka connect는 브로커와 동일한 서버에서 동작할 수 있기 때문에 kafka 클러스터와 kafka connect 클러스터를 함께 구성도 가능
- kafka connect 클러스터에 source, sinke의 플러그인으로 데이터를 입출력할 때는 논리적인 작업이 필요한데 이 작업을 connector 인스턴스라고 부른다.
- kafka connect의 동작은 standalone과 distributed 모드가 있으며 standalone 모드는 kafka connect 한 개로 동작하고 distributed는 클러스터를 구성해서 사용 가능
- bootstrap.servers 파라미터를 사용해서 여러 브로커를 설정할 수 있고 group.id를 설정하여 같은 group.id를 가진 connect는 같은 클러스터에 묶이게 할 수있다.
Kafka Connect 연결 예제 (FileStream Connector)
작성된 file의 내용을 source connector에 연결하여 브로커에 전송하고 sink connector를 사용하여 새로운 파일로 작성하는 예제를 만들어보자.
wedul.site/682를 통해 만든 kafka cluster를 사용해서 테스트 한다.
실행
server 설정 및 group id 지정을 위해 connect-distributed.properties를 수정하여 broker를 하나만 둔 kafka connect distributed 모드를 실행시켜보자.
// connect-distributed.properties 내용
...
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=centos1:9092
# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster-datahub-1
...
그리고 수정한 properties를 사용하여 connect distributed 모드로 실행시킨다.
connect-distributed ./connect-distributed.properties
정상적으로 수행되면 plugin load 로그와 최종 connector가 실행된 로그를 볼 수 있고 api로 connector-plugin 리스트와 동작 상태들을 확인할 수 있다.
// 실행 완료 로그
[2021-03-14 06:37:24,971] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:860)
// curl로 연결 확인
[root@f6ed7547e36f kafka]# curl centos1:8083
{"version":"2.0.1-cp8","commit":"c34a431c349189df","kafka_cluster_id":"gGrIL7_TQIe_NoaHzLtV9w"}[root@f6ed7547e36f kafka]#
// 사용가능한 connector-plugins 체크 (써드파티 플러그인 추가 가능)
[root@f6ed7547e36f kafka]# curl centos1:8083/connector-plugins | python -m json.tool
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 878 100 878 0 0 28154 0 --:--:-- --:--:-- --:--:-- 28322
[
{
"class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type": "sink",
"version": "5.0.4"
},
{
"class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"type": "sink",
"version": "5.0.4"
},
{
"class": "io.confluent.connect.hdfs.tools.SchemaSourceConnector",
"type": "source",
"version": "2.0.1-cp8"
},
{
"class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"type": "sink",
"version": "5.0.4"
},
{
"class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"type": "source",
"version": "5.0.4"
},
{
"class": "io.confluent.connect.s3.S3SinkConnector",
"type": "sink",
"version": "5.0.4"
},
{
"class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
"type": "source",
"version": "2.0.1-cp8"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "2.0.1-cp8"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "2.0.1-cp8"
}
]
source connector 추가
전송할 대상이 될 txt 파일 생성
# path
/zaiko/latest.txt
# 내용
ITEM001,SHOP001,929,2018-10-01 01:01:01
ITEM002,SHOP001,480,2018-10-01 01:01:01
ITEM001,SHOP001,25,2018-10-01 01:01:01
ITEM003,SHOP001,6902,2018-10-01 02:02:02
FilestreamSourceConnect를 연결
- name : 커넥터 이름
- connector.class : 사용할 커넥터의 클래스명을 지정
- file : 읽을 파일명을 지정한다.
- topic : 커넥터가 카프카에 투입할 때 사용할 토픽명
# connector 연결
[root@f6ed7547e36f zaiko]# echo '
{
"name": "load-zaiko-data",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
> "file": "/zaiko/latest.txt",
> "topic" :"zaiko-data"
> }
> }
> ' | curl -X POST -d @- http://centos1:8083/connectors \
> --header "content-Type:application/json"
{"name":"load-zaiko-data","config":{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","file":"/zaiko/latest.txt","topic":"zaiko-data","name":"load-zaiko-data"},"tasks":[],"type":"source"}
# connectors 추가 확인
[root@f6ed7547e36f zaiko]# curl http://centos1:8083/connectors
["load-zaiko-data"]
source connectors 정상 연동 확인을 위해 지정한 topic (zaiko-data) 내용 확인
[root@f6ed7547e36f zaiko]# kafka-console-consumer --bootstrap-server=centos1:9092,centos2:9092,centos3:9093 --topic zaiko-data --from-beginning
{"schema":{"type":"string","optional":false},"payload":"ITEM001,SHOP001,929,2018-10-01 01:01:01"}
{"schema":{"type":"string","optional":false},"payload":"ITEM002,SHOP001,480,2018-10-01 01:01:01"}
{"schema":{"type":"string","optional":false},"payload":"ITEM001,SHOP001,25,2018-10-01 01:01:01"}
{"schema":{"type":"string","optional":false},"payload":"ITEM003,SHOP001,6902,2018-10-01 02:02:02"}
sink connectors를 추가하여 source로 부터 전달된 데이터 파일로 내보내기
# connector 연결
[root@f6ed7547e36f zaiko]# echo '
> {
> "name": "sink-zaiko-data",
> "config": {
> "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
> "file": "/ec/zaiko-latest.txt",
> "topics":"zaiko-data"
> }
> }
> ' | curl -X POST -d @- http://centos1:8083/connectors \
> --header "content-Type:application/json"
{"name":"sink-zaiko-data","config":{"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector","file":"/ec/zaiko-latest.txt","topics":"zaiko-data","name":"sink-zaiko-data"},"tasks":[],"type":"sink"}
# connector 추가 확인
[root@f6ed7547e36f zaiko]# curl http://centos1:8083/connectors
["sink-zaiko-data","load-zaiko-data"]
sink로 연결한 /ec/zaiko-latest.txt에 source로 연결한 /zaiko/latest.txt의 내용이 들어가 있는지 확인
[root@f6ed7547e36f zaiko]# tail -f /ec/zaiko-latest.txt
ITEM001,SHOP001,929,2018-10-01 01:01:01
ITEM002,SHOP001,480,2018-10-01 01:01:01
ITEM001,SHOP001,25,2018-10-01 01:01:01
ITEM003,SHOP001,6902,2018-10-01 02:02:02
추가로 들어가는 데이터도 잘 들어가는지 확인
# 실시간 변동사항 전송 확인 (신규 row 추가)
[root@f6ed7547e36f /]# cat << EOF >> /zaiko/latest.txt
> ITEM001,SHOP001,959,2018-10-03 03:00:00
> ITEM002,SHOP002,913,2019-11-03 04:00:00
> EOF
# 실시간 추가 확인
[root@f6ed7547e36f zaiko]# tail -f /ec/zaiko-latest.txt
ITEM001,SHOP001,929,2018-10-01 01:01:01
ITEM002,SHOP001,480,2018-10-01 01:01:01
ITEM001,SHOP001,25,2018-10-01 01:01:01
ITEM003,SHOP001,6902,2018-10-01 02:02:02
ITEM001,SHOP001,959,2018-10-03 03:00:00
ITEM002,SHOP002,913,2019-11-03 04:00:00
사용한 리소스 제거
# sink, source connector 제거
[root@f6ed7547e36f zaiko]# curl -X DELETE http://centos1:8083/connectors/load-zaiko-data
[root@f6ed7547e36f zaiko]# curl -X DELETE http://centos1:8083/connectors/sink-zaiko-data
# connector 리스트 확인 (정상 제거 여부 확인)
[root@f6ed7547e36f zaiko]# curl http://centos1:8083/connectors
[]
mariadb, elasticsaerch, s3, mongodb 등등의 connectors를 이용해서 데이터 전송을 용이하게 할 수 있을 것 같다.
출처 : 실전 아파치 카프카 (한빛미디어, 저 사사키도루)
'web > kafka' 카테고리의 다른 글
kafka cluster replica partition reassignment (0) | 2021.03.20 |
---|---|
Kafka cluster에서 topic 지우기 (0) | 2021.03.20 |
docker에서 Kafka + zookeeper 클러스터 구성하기 (0) | 2021.02.22 |
[번역] shared message queues와 publish-subscribe 방식에 Custom Group 방식을 더한 Kafka 소개 (0) | 2019.02.14 |
Kafka 요약 정리 (0) | 2019.01.24 |