동기적으로 API를 호출한다면 응답이 올 때까지 기다렸다가,
응답을 받으면 해당 응답의 결과 값을 기반으로 다음 로직을 진행한다.
다만, 최근에는 kafka와 같은 message queue의 도입으로 요청을 보내는 쪽에서 결과에 대한 처리를 보장하는 것보다는
요청을 받은 쪽에서 멱등성을 보장해 주는 역할이 더 커진 것 같다.
이러한 consumer의 역할을 만족하기 위해서 spring-kafka에서는
consumer에서 요청을 처리하는 과정에 exception이 발생했을 때 쉽게 처리할 수 있도록 @RetryableTopic 기능을 제공한다.
쉽게 말하면, Consumer에서 요청을 처리하는 과정에서 문제가 생겼을 때
다시 처리할 수 있도록 재처리-토픽을 두고, 해당 토픽에 메시지를 전송해 주는 것이다.
@RetryableTopic
- backoff : 재수행 간격 지정
- attempt : 재수행 횟수
- 해당 숫자에서 1을 뺀 만큼 retryTopic이 생성된다
- 첫 시도 topic + retryTopic(attempt-1)
- include : 특정 exception이 발생한 경우만 재수행
- topicSuffixingStrategy : retry 토픽명 지정 옵션
- dltStrategy : dlt 수행에 대한 옵션
해당 설정은 @KafkaListener와 함께 정의하면 해당 topic에 대한 재시도를 수행할 수 있다.
다만, 재시도를 원하는 모든 consumer에 해당 설정을 추가해주어야 하는 boilerplate code가 될 수 있다
@RetryableTopic(
attempts = "3",
backoff = @Backoff(delay = 10 * 1000, multiplier = 3, maxDelay = 10 * 60 * 1000),
topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE,
dltStrategy = DltStrategy.ALWAYS_RETRY_ON_ERROR
include = FailedException.class
)
@KafkaListener(topics = "test-topic")
public void consume() {
// ..
}
RetryTopicConfiguration
해당 설정은 RetryTopicConfiguration을 Bean으로 등록해 주고
includeTopic 설정을 추가해 주어서 재시도를 하고 싶은 topic을 지정해 줄 수 있다.
@Bean
public RetryTopicConfiguration retryableTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.maxAttempts(3)
.exponentialBackoff(10 * 1000L, 2, 5 * 60 * 1000L)
.autoCreateTopics(true, 3, (short) 3)
.includeTopic(retryTopic)
.retryOn(FailedException.class)
.setTopicSuffixingStrategy(TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
.dltProcessingFailureStrategy(DltStrategy.ALWAYS_RETRY_ON_ERROR)
.create(template);
}