새소식

반응형
Kafka

[Kafka] Spring Kafka Listener Concurrency 설정

  • -
반응형

Spring-Kafka를 이용하여 Consumer를 구현하게 되면, 보통 @KafkaListener를 많이 사용한다.

@KafkaListener(
    topics = "testTopic1",
    groupId = "test-consumer-group1",
    concurrency = "1"
)
public void test(Message message) {
  //...
}

 

@KafkaListener의 설정 중에 concurrency라는 설정을 알아보자.

해당 설정은 consumer의 thread 개수를 지정하는 값이다.

 

그렇다면, 하나의 consumer에서 동시에 여러 message를 처리할 수 있도록 thread를 늘리면 좋을 것이라는 생각이 드는데, thread 개수를 늘린다고 무조건 처리량이 좋아지는 것은 아니다.

 

consume 하는 topic의 partition 개수와 상응하는 값으로 설정해야 유후 thread 없이 작업양을 늘릴 수 있다 

 

for (int i = 0; i < 10; i++) {
  Message message = new Message("message " + i);
  log.info("[test] " + message.getText());
  kafkaTemplate.send("testTopic1", message);
}

 

@KafkaListener(
    topics = "testTopic1",
    groupId = "test-consumer-group1",
    concurrency = "${value}"
)
public void test(Message message) {
    try {
        Thread.sleep(1000); 
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

1 partition 2 concurrency 

10개의 message를 생성하고 하나의 message를 처리하는데 1초의 시간이 걸린다고 가정

총 걸리는 시간은 ? 

10초 

하나의 partition에 할당되는 thread의 개수는 1개이다

 

2 partition 2 concurrency 

10개의 message를 생성하고 하나의 message를 처리하는데 1초의 시간이 걸린다고 가정

총 걸리는 시간은? 

5초 

 

반응형
Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.