반응형
간단하게 Kafka 설치
docker-compose.yml 생성 후 docker-compose up -d를 통해 설치
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | version: '2' services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka ports: - "9092:9092" environment: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_HOST_NAME: wedul.pos KAFKA_CREATE_TOPICS: "test:1:1" volumes: - /var/run/docker.sock:/var/run/docker.sock | cs |
설치된 프로세스 확인
생성된 토픽 확인
- test 토픽이 파티션 1와 replication 1로 생성되었는지 여부 확인
1 2 3 | $ docker exec -it simple_kafka_1 bash $ cd /opt/kafka/bin $ kafka-topics.sh --describe --topic test --zookeeper simple_zookeeper_1 | cs |
Spring Boot 프로젝트 생성
카프카 설정
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 | package com.kafka.study.configuration; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import org.springframework.core.env.Environment; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import java.util.HashMap; import java.util.Map; /** * 카프카 설정 * * @author wedul * @since 2019-01-24 **/ @Configuration @EnableKafka @PropertySource("classpath:kafka.properties") public class KafkaConfiguration { @Autowired private Environment env; private Map<String, Object> producerConfig() { Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers")); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return config; } @Bean public KafkaTemplate<String, String > kafkaTemplate() { return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig())); } } | cs |
카프카 컨트롤러
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | package com.kafka.study.ctrl; import lombok.extern.slf4j.Slf4j; import org.springframework.http.ResponseEntity; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; /** * study * * @author wedul * @since 2019-01-24 **/ @RestController @Slf4j public class KafkaCtrl { private final KafkaTemplate kafkaTemplate; public KafkaCtrl(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } @PostMapping("/send") public ResponseEntity<String> sendMessage(String message) { if(!StringUtils.isEmpty(message)) kafkaTemplate.send("test", "Message is " + message); log.info(message); return ResponseEntity.ok(""); } } | cs |
Console-consumer 모니터링 모드
카프카에 메시지를 send 했을 때 모니터링하기 위한 모드 스크립트 실행
1 | bash-4.4# kafka-console-consumer.sh --bootstrap-server wedul.pos:9092 --topic test | cs |
실행해보면 콘솔에 메시지가 전송된것을 확인할 수 있다.
Kafka Licenser 설정
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | package com.kafka.study.configuration; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * study * * @author wedul * @since 2019-01-25 **/ @Slf4j @Component public class ReceiveConfiguration { @KafkaListener(topics = "test", groupId = "console-consumer-1970") public void receive(String payload) { log.info("received payload='{}'", payload); } } | cs |
보내면 바로 consumer에서 메시지를 받을 수 있도록 리스너를 설정해보자.
그리고 테스트!
1 2 3 4 5 | 2019-01-25 00:09:43.033 INFO 1760 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version : 2.0.1 2019-01-25 00:09:43.034 INFO 1760 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fa14705e51bd2ce5 2019-01-25 00:09:43.041 INFO 1760 --- [ad | producer-1] org.apache.kafka.clients.Metadata : Cluster ID: 8gNzLx__SHq-4p0b_WsydA 2019-01-25 00:09:43.047 INFO 1760 --- [nio-8080-exec-1] com.kafka.study.ctrl.KafkaCtrl : babo 2019-01-25 00:09:43.069 INFO 1760 --- [ntainer#0-0-C-1] c.k.s.c.ReceiveConfiguration : received payload='Message is babo' | cs |
git 저장소 : https://github.com/weduls/kafka_example
반응형
'web > Spring' 카테고리의 다른 글
creating bean with name 'webMvcRequestHandlerProvider' defined in URL 에러처리 (0) | 2019.03.27 |
---|---|
Spring Boot에서 6.4 Elasticsearch 연결 및 간단 CRUD (5) | 2019.02.09 |
Spring Reactive Web Application (0) | 2019.01.12 |
생성한 Custom validation으로 에러메시지 출력하기 (4) | 2018.12.25 |
Custom Validation 만들어서 추가하기 (0) | 2018.12.24 |