Listener Error Handlers
@KafkaListener의 attribute -> errorHandler
KafkaListenerErrorHandler 인터페이스를 구현
@KafkaListener(topics = "test.topic", errorHandler = "testListenerErrorHandler")
public void test(@Payload Param param) {
// ....
}
@Component
public class TestListenerErrorHandler implements KafkaListenerErrorHandler {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
return null;
}
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
Param payload = (Param) message.getPayload();
log.error(payload.toString(), exception);
return null;
}
}
The error handler can throw the original or a new exception, which is thrown to the container.
Anything returned by the error handler is ignored
Container Error Handlers
KafkaListenerContainerFactory에 설정된 CommonErrorHandler
- default configuration (FixedBackOff(0L, 9))
- Failures are simply logged after retries are exhausted
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(0L, 9)));
...
return factory;
}
DefaultErrorHandler에서 재시도를 하지 않는 exception 종류
- DeserializationException
- MessageConversionException
- ConversionException
- MethodArgumentResolutionException
- NoSuchMethodException
- ClassCastException
아래의 설정으로 customize 가능
@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
handler.addNotRetryableExceptions(IllegalArgumentException.class);
return handler;
}