Spring-Kafka를 이용하여 Consumer를 구현하게 되면, 보통 @KafkaListener를 많이 사용한다.
@KafkaListener(
topics = "testTopic1",
groupId = "test-consumer-group1",
concurrency = "1"
)
public void test(Message message) {
//...
}
@KafkaListener의 설정 중에 concurrency라는 설정을 알아보자.
해당 설정은 consumer의 thread 개수를 지정하는 값이다.
그렇다면, 하나의 consumer에서 동시에 여러 message를 처리할 수 있도록 thread를 늘리면 좋을 것이라는 생각이 드는데, thread 개수를 늘린다고 무조건 처리량이 좋아지는 것은 아니다.
consume 하는 topic의 partition 개수와 상응하는 값으로 설정해야 유후 thread 없이 작업양을 늘릴 수 있다
for (int i = 0; i < 10; i++) {
Message message = new Message("message " + i);
log.info("[test] " + message.getText());
kafkaTemplate.send("testTopic1", message);
}
@KafkaListener(
topics = "testTopic1",
groupId = "test-consumer-group1",
concurrency = "${value}"
)
public void test(Message message) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
1 partition 2 concurrency
10개의 message를 생성하고 하나의 message를 처리하는데 1초의 시간이 걸린다고 가정
총 걸리는 시간은 ?
10초
하나의 partition에 할당되는 thread의 개수는 1개이다
2 partition 2 concurrency
10개의 message를 생성하고 하나의 message를 처리하는데 1초의 시간이 걸린다고 가정
총 걸리는 시간은?
5초