새소식

반응형
Kafka

[Kafka] SpringBoot+Kafka 연동

  • -
반응형

dependency 추가 

implementation 'org.springframework.kafka:spring-kafka'

 

application.yml 설정

spring:
  kafka:
    bootstrap-servers:
      - "127.0.0.1:9092"
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        interceptor.classes: com.test.api.config.kafka.KafkaProducerInterceptor
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.json.trusted.packages: "*"
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
  • bootstrap-servers : 연결할 Kafka 브로커의 위치를 설정
  • value-serializer :
    • 데이터를 Kafka 브로커로 전송하기 전에 데이터를 byte array 변환하는 사용하는 직렬화 메커니즘을 설정
    • Kafka 네트워크를 통해 데이터를 전송하기 때문에, 객체를 byte array 변환하는 직렬화 과정이 필요
    • ErrorHandlingDeserializer
      • kafka에서 메시지를 꺼낼 때 에러가 발생한 경우에는 spring 영역에서 메시지를 꺼내기 전이므로 에러 핸들링이 불가하다
      • 해당 deserializer를 사용하면 실제 deserializer로 delegate 하는 형식으로 한 단계 wrapper가 생긴다
  • spring.json.trusted.packages :
    • JSON 형식으로 deserialize 할때, 신뢰할 있는 패키지 목록을 지정하는 설정
    • * 모든 패키지를 신뢰한다는 의미

Topic 등록

@EnableKafka
@Configuration
public class KafkaConfig {

    @Bean
    public NewTopics topics() {
        return new NewTopics(
            TopicBuilder.name("TOPIC_NAME").partitions(3).replicas(3).build()
        );
    }
}

 

Producer

@Component
public class TestProducer {
    private final KafkaTemplate<String, Object> kafkaTemplate;
    public Mono<Result> sendTest(Message message) {
        CompletableFuture<SendResult<String, Object>> kafkaFuture = 
                         kafkaTemplate.send("TOPIC_NAME", message).completable();
        return Mono.fromFuture(kafkaFuture)
                   .map(item -> new Result(message.getRequestId()))
                   .timeout(Duration.ofSeconds(3))
                   .doOnError(
                       e -> service.insertFailed(Code.FAILED_KAFKA.getCode(), e.getMessage())
                   )
                   .onErrorResume(ex -> {
                       log.warn("publish message to kafka failed", ex);
                       return Mono.just(Result.PRODUCE_KAFKA_MESSAGE_FAILED);
                   });
    }
}

 

Consumer

@Component
public class PushMessageConsumer {
    @KafkaListener(topics = "TOPIC_NAME")
    public void kafkaListener(Message message) {
        callApi(message);
    }
}
반응형
Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.