web/kafka

Kafka Connect 정리

반응형

Kafka Connect

- kafka에서 외부로 데이터로 출력을 하거나 kafka로 데이터를 입력하도록 도와주는 도구

- kafka connect는 kafka connect와 플러그인 형태로 되어 있으며 데이터를 producer를 통해 넣는 부분을 source라고 하고 consumer 쪽으로 보내는 곳을 sink라고 한다.

- 공개된 플러그인이 있으며 sink, source 양쪽 모두 공개된 플러그인만 사용하는게 좋다. (신뢰도가 떨어지는 플러그인은 버그가..)

- kafka connect는 브로커와 동일한 서버에서 동작할 수 있기 때문에 kafka 클러스터와 kafka connect 클러스터를 함께 구성도 가능

- kafka connect 클러스터에 source, sinke의 플러그인으로 데이터를 입출력할 때는 논리적인 작업이 필요한데 이 작업을 connector 인스턴스라고 부른다. 

- kafka connect의 동작은 standalone과 distributed 모드가 있으며 standalone 모드는 kafka connect 한 개로 동작하고 distributed는 클러스터를 구성해서 사용 가능

- bootstrap.servers 파라미터를 사용해서 여러 브로커를 설정할 수 있고 group.id를 설정하여 같은 group.id를 가진 connect는 같은 클러스터에 묶이게 할 수있다.


 

Kafka Connect 연결 예제 (FileStream Connector)

작성된 file의 내용을 source connector에 연결하여 브로커에 전송하고 sink connector를 사용하여 새로운 파일로 작성하는 예제를 만들어보자.

 

wedul.site/682를 통해 만든 kafka cluster를 사용해서 테스트 한다.

 

실행

server 설정 및 group id 지정을 위해 connect-distributed.properties를 수정하여 broker를 하나만 둔 kafka connect distributed 모드를 실행시켜보자.

// connect-distributed.properties 내용

...
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=centos1:9092

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster-datahub-1
...

 

그리고 수정한 properties를 사용하여 connect distributed 모드로 실행시킨다.

connect-distributed ./connect-distributed.properties

 

정상적으로 수행되면 plugin load 로그와 최종 connector가 실행된 로그를 볼 수 있고 api로 connector-plugin 리스트와 동작 상태들을 확인할 수 있다.

// 실행 완료 로그
[2021-03-14 06:37:24,971] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:860)


// curl로 연결 확인
[root@f6ed7547e36f kafka]# curl centos1:8083
{"version":"2.0.1-cp8","commit":"c34a431c349189df","kafka_cluster_id":"gGrIL7_TQIe_NoaHzLtV9w"}[root@f6ed7547e36f kafka]#


// 사용가능한 connector-plugins 체크 (써드파티 플러그인 추가 가능)
[root@f6ed7547e36f kafka]# curl centos1:8083/connector-plugins | python -m json.tool
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   878  100   878    0     0  28154      0 --:--:-- --:--:-- --:--:-- 28322
[
    {
        "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type": "sink",
        "version": "5.0.4"
    },
    {
        "class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "type": "sink",
        "version": "5.0.4"
    },
    {
        "class": "io.confluent.connect.hdfs.tools.SchemaSourceConnector",
        "type": "source",
        "version": "2.0.1-cp8"
    },
    {
        "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "type": "sink",
        "version": "5.0.4"
    },
    {
        "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "type": "source",
        "version": "5.0.4"
    },
    {
        "class": "io.confluent.connect.s3.S3SinkConnector",
        "type": "sink",
        "version": "5.0.4"
    },
    {
        "class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
        "type": "source",
        "version": "2.0.1-cp8"
    },
    {
        "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "type": "sink",
        "version": "2.0.1-cp8"
    },
    {
        "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "type": "source",
        "version": "2.0.1-cp8"
    }
]

 

source connector 추가

전송할 대상이 될 txt 파일 생성

# path
/zaiko/latest.txt

# 내용
ITEM001,SHOP001,929,2018-10-01 01:01:01
ITEM002,SHOP001,480,2018-10-01 01:01:01
ITEM001,SHOP001,25,2018-10-01 01:01:01
ITEM003,SHOP001,6902,2018-10-01 02:02:02

 

FilestreamSourceConnect를 연결

- name : 커넥터 이름

- connector.class : 사용할 커넥터의 클래스명을 지정

- file : 읽을 파일명을 지정한다.

- topic : 커넥터가 카프카에 투입할 때 사용할 토픽명

# connector 연결
[root@f6ed7547e36f zaiko]# echo '
{
"name": "load-zaiko-data",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
> "file": "/zaiko/latest.txt",
> "topic" :"zaiko-data"
> }
> }
> ' | curl -X POST -d @- http://centos1:8083/connectors \
> --header "content-Type:application/json"
{"name":"load-zaiko-data","config":{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","file":"/zaiko/latest.txt","topic":"zaiko-data","name":"load-zaiko-data"},"tasks":[],"type":"source"}

# connectors 추가 확인
[root@f6ed7547e36f zaiko]# curl http://centos1:8083/connectors
["load-zaiko-data"]

 

source connectors 정상 연동 확인을 위해 지정한 topic (zaiko-data) 내용 확인

[root@f6ed7547e36f zaiko]# kafka-console-consumer --bootstrap-server=centos1:9092,centos2:9092,centos3:9093 --topic zaiko-data --from-beginning
{"schema":{"type":"string","optional":false},"payload":"ITEM001,SHOP001,929,2018-10-01 01:01:01"}
{"schema":{"type":"string","optional":false},"payload":"ITEM002,SHOP001,480,2018-10-01 01:01:01"}
{"schema":{"type":"string","optional":false},"payload":"ITEM001,SHOP001,25,2018-10-01 01:01:01"}
{"schema":{"type":"string","optional":false},"payload":"ITEM003,SHOP001,6902,2018-10-01 02:02:02"}

 

sink connectors를 추가하여 source로 부터 전달된 데이터 파일로 내보내기

# connector 연결
[root@f6ed7547e36f zaiko]# echo '
> {
> "name": "sink-zaiko-data",
> "config": {
> "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
> "file": "/ec/zaiko-latest.txt",
> "topics":"zaiko-data"
> }
> }
> ' | curl -X POST -d @- http://centos1:8083/connectors \
> --header "content-Type:application/json"
{"name":"sink-zaiko-data","config":{"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector","file":"/ec/zaiko-latest.txt","topics":"zaiko-data","name":"sink-zaiko-data"},"tasks":[],"type":"sink"}


# connector 추가 확인
[root@f6ed7547e36f zaiko]# curl http://centos1:8083/connectors
["sink-zaiko-data","load-zaiko-data"]

 

sink로 연결한 /ec/zaiko-latest.txt에 source로 연결한 /zaiko/latest.txt의 내용이 들어가 있는지 확인

[root@f6ed7547e36f zaiko]# tail -f /ec/zaiko-latest.txt
ITEM001,SHOP001,929,2018-10-01 01:01:01
ITEM002,SHOP001,480,2018-10-01 01:01:01
ITEM001,SHOP001,25,2018-10-01 01:01:01
ITEM003,SHOP001,6902,2018-10-01 02:02:02

 

추가로 들어가는 데이터도 잘 들어가는지 확인

# 실시간 변동사항 전송 확인 (신규 row 추가)
[root@f6ed7547e36f /]# cat << EOF >> /zaiko/latest.txt
> ITEM001,SHOP001,959,2018-10-03 03:00:00
> ITEM002,SHOP002,913,2019-11-03 04:00:00
> EOF

# 실시간 추가 확인
[root@f6ed7547e36f zaiko]# tail -f /ec/zaiko-latest.txt
ITEM001,SHOP001,929,2018-10-01 01:01:01
ITEM002,SHOP001,480,2018-10-01 01:01:01
ITEM001,SHOP001,25,2018-10-01 01:01:01
ITEM003,SHOP001,6902,2018-10-01 02:02:02
ITEM001,SHOP001,959,2018-10-03 03:00:00
ITEM002,SHOP002,913,2019-11-03 04:00:00

 

사용한 리소스 제거

# sink, source connector 제거
[root@f6ed7547e36f zaiko]# curl -X DELETE http://centos1:8083/connectors/load-zaiko-data
[root@f6ed7547e36f zaiko]# curl -X DELETE http://centos1:8083/connectors/sink-zaiko-data

# connector 리스트 확인 (정상 제거 여부 확인)
[root@f6ed7547e36f zaiko]# curl http://centos1:8083/connectors
[]

 

mariadb, elasticsaerch, s3, mongodb 등등의 connectors를 이용해서 데이터 전송을 용이하게 할 수 있을 것 같다.

 

출처 : 실전 아파치 카프카 (한빛미디어, 저 사사키도루)

반응형