kafka에서 메시지를 꺼낼 때 에러가 발생한 경우에는 spring 영역에서 메시지를 꺼내기 전이므로 에러 핸들링이 불가하다
해당 deserializer를 사용하면 실제 deserializer로 delegate 하는 형식으로 한 단계 wrapper가 생긴다
spring.json.trusted.packages :
JSON 형식으로 deserialize 할때, 신뢰할수있는패키지목록을지정하는설정
* 는모든패키지를신뢰한다는의미
Topic 등록
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public NewTopics topics() {
return new NewTopics(
TopicBuilder.name("TOPIC_NAME").partitions(3).replicas(3).build()
);
}
}
Producer
@Component
public class TestProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
public Mono<Result> sendTest(Message message) {
CompletableFuture<SendResult<String, Object>> kafkaFuture =
kafkaTemplate.send("TOPIC_NAME", message).completable();
return Mono.fromFuture(kafkaFuture)
.map(item -> new Result(message.getRequestId()))
.timeout(Duration.ofSeconds(3))
.doOnError(
e -> service.insertFailed(Code.FAILED_KAFKA.getCode(), e.getMessage())
)
.onErrorResume(ex -> {
log.warn("publish message to kafka failed", ex);
return Mono.just(Result.PRODUCE_KAFKA_MESSAGE_FAILED);
});
}
}
Consumer
@Component
public class PushMessageConsumer {
@KafkaListener(topics = "TOPIC_NAME")
public void kafkaListener(Message message) {
callApi(message);
}
}