-
[Kafka] Commit📖 개발 공부/kafka 2023. 3. 26. 15:11
컨슈머가 poll() 을 호출할 때마다 컨슈머 그룹은 저장되어 있는 아직 읽지 않은 메시지를 가져온다.
컨슈머 그룹이 메시지를 어디까지 가져갔는지 알 수 있기 때문이다.- 오프셋 : 컨슈머들이 각각의 파티션에 자신이 가져간 메시지의 위치 정보
- 커밋 : 각 파티션에 대해 현재 위치를 업데이트하는 동작
카프카 내에 별도로 내부에서 사용하는 토픽(__comsumer_offsets)을 만들고, 그 토픽에 오프셋 정보를 저장하고 있다.
(이전 카프카에서는 이 오프셋 정보를 주키퍼에 저장했다.)자동 커밋 (enable.auto.commit=true)
auto.commit: 시간단위
5초(기본값)마다 컨슈머는 poll()을 호출할 때 가장 마지막 오프셋을 커밋한다.
컨슈머는 poll을 요청할 때마다 커밋할 시간인지 아닌지 체크하게 되고, poll 요청으로 가져온 마지막 오프셋을 커밋한다.
주의
마지막 커밋 이후 poll() 주기 5초 진행 중에 3초가 지난 시점에 리밸런스가 발생하는 경우
→ 마지막 자동 커밋 시점으로부터 진행된 3초의 데이터들의 중복이 발생하게 된다.수동 커밋 (enable.auto.commit=false)
메시지 처리가 완료될 때까지 메시지를 가져온 것으로 간주되어서는 안되는 경우에 사용한다.
만약 컨슈머가 데이터베이스에 메시지를 저장한다고 가정한다면, 데이터베이스에 데이터가 저장되는 시점까지를 메시지를 가져온 시점이라고 보고 이때 커밋하는 경우이다.
자동 커밋을 사용하는 경우엔 자동 커밋의 주기로 인해 poll 하면서 마지막 값의 오프셋으로 자동 커밋이 되었고, 일부 메시지들은 데이터베이스에는 저장하지 못한 상태로 컨슈머 장애가 발생한다면 해당 메시지들은 손실될 수도 있다.
이 경우엔, 데이터베이스에 메시지를 저장한 후 커밋을 해야 안전하다.
이처럼 수동 커밋은 메시지를 가져온 것으로 간주되는 시점을 자유롭게 조정할 수 있는 장점이 있다.주의
메시지를 데이터베이스에 저장하는 도중에 실패하여 마지막 커밋된 오프셋부터 메시지를 다시 가져오는 경우, 메시지들이 중복으로 저장될 수 있다
→ 데이터베이스에 3개의 메시지가 저장되고 그 다음 메시지 저장되는 시점에 에러가 발생했다면 이미 저장되어있는 3개의 메시지들을 다시 가져오게되기 때문이다.
수동 커밋을 할 경우 commitSync(), commitAsync()를 통해 직접 커밋을 한다.- commitSync(): 동기 커밋
- ConsumerRecord 처리 순서를 보장한다.
- 커밋이 완료될 때까지 block 되어 가장 느리다.
- poll() 메서드로 가지고 온 ConsumerRecord의 마지막 offset을 커밋한다.
while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic = %s, partition = %d, offset = %d, customer = %s, country = %s\\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } try { consumer.commitSync(); } catch (CommitFailedException e) { log.error("commit failed", e) } }
- commitAsync(): 비동기 커밋
- 동기 커밋보다 빠르다.
- 중복이 발생할 수 있다.
- 이전 offset보다 이후 offset이 먼저 커밋되는 경우
- ConsumerRecord 처리 순서를 보장하지 못한다.
commitAsync() 메서드는 동기와 다르게 재시도를 하지 않는다.while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } consumer.commitAsync(); }
8번 offset이 polling을 성공하고, 3번 offset에서 전송한 커밋 요청에 에러가 발생했을 때 재시도 후 offset을 commit하게 되면 최신 offset은 3이 된다. 이때 poll()을 다시 요청하게 되면 3번부터 시작하게 되어 3~8번까지 데이터 중복이 발생할 수 있다. → 비동기 방식은 실패에 대해 polling을 재시도 하지 않는다.
이렇게 카프카에서는 메시지는 한 번씩 전달되지만 장애 등의 이유로 중복이 발생할 수 있기 때문에 카프카는 적어도 한번을 보장한다. (중복은 있지만 손실은 없다.)
__consumer_offsets$ ./kafka-console-consumer.sh \\ --bootstrap-server localhost:9092 \\ --topic __consumer_offsets \\ --from-beginning \\ --formatter "kafka.coordinator.group.GroupMetadataManager\\$OffsetsMessageFormatter" ... [console-consumer-96201,test.consumer.offsets,0]::[OffsetMetadata[0,NO_METADATA],CommitTime 1616918718895,ExpirationTime 1617005118895] ...
[컨슈머 그룹,토픽,파티션]::[메시지 내용]
🔗 참고 링크
728x90반응형'📖 개발 공부 > kafka' 카테고리의 다른 글
[Kafka] Spring Kafka JSON Batch (0) 2023.03.26 [Kafka] reset offset (0) 2023.02.25 [Kafka] Rebalancing (리밸런싱) (0) 2023.01.31 [Kafka] Partition 딥다이브 (0) 2023.01.18 [Kafka] Kafka Architecture (0) 2023.01.14