백엔드

[Kafka] 실시간 채팅에 카프카 적용하기

말감78 2025. 2. 28. 19:03
반응형

이번 글은 협업플랫폼 개발 프로젝트에서 Kafka를 적용하기까지 공부한 내용과 과정을 담을 예정이다.

Kafka(카프카)란?

Apache Kafka는 실시간으로 기록 스트림을 게시, 구독, 저장 및 처리할 수 있는 분산 데이터 스트리밍 플랫폼입니다. 

기록 스트림은 데이터 스트림을 기반으로 한 데이터를 저장하거나 처리하는 것을 의미합니다. 
데이터 스트림(data stream)은 데이터가 지속적으로 생성되고, 시간에 따라 값이 변하는 데이터의 흐름을 의미합니다.

 

기본 개념

Kafka는 발행-구독(pub-sub) 메시징 시스템을 기반으로 하며, 데이터 스트림을 처리하는 분산 플랫폼입니다. LinkedIn에서 개발되었으며, 현재는 Apache 소프트웨어 재단에서 관리하고 있습니다.

 

주요 특징

1. 고성능: 초당 수백만 개의 메시지를 처리할 수 있다.

2. 내구성: 데이터는 디스크에 저장되며 복제되어 데이터 손실을 방지한다.

3. 확장성: 클러스터를 수평적으로 확장할 수 있다. -> 데이터가 많은 곳은 확장을 통해 안정적으로 처리가능

4. 분산처리: 여러 서버에 부하를 분산시킨다

5. 실시간 처리: 데이터 스트림을 실시간으로 처리

6. 내결함성: 서버 장애 시에도 데이터 손실 없이 작동

 

Kafka  활용 분야

1. 로그집계: 여러 서비스의 로그를 중앙에서 수집 및 처리

2. 메트릭 모니터링: 시스템 성능 지표수집 및 분석

3. 이벤트 소싱: 이벤트 기반 아키텍처 구현

4. 스트림 처리: 실시간 데이터 변환 및 분석

5. 메시지 큐: 비동기 통신을 위한 메시지 교환

6. 활동 추적: 사용자 활동 데이터 수집 및 분석

 

Kafka를 선택하기 전 고려할 점

1. 복잡한 설정: 초기 설정과 튜닝이 복잡하다

2. 대규모 클러스터 관리에 전문 지식이 필요

3. 많은 메모리를 사용할 수 있다.

4. 지속적인 모니터링이 필요

5. 개념 이해와 효율적인 사용에 시간이 필요


핵심 구성요소

1. 브로커(Broker)

  • Kafka 서버 인스턴스로, 메시지를 저장하고 처리한다.
  • 여러 브로커가 클러스터를 구성한다.

2. 토픽(Topic)

  • 메시지가 발행되는 카테고리
  • 데이터 스트림을 구분하는 단위

3. 파티션(Partition)

  • 토픽을 여러 파티션으로 나누어 병렬 처리
  • 각 파티션은 순서가 보장된 메시지 시퀀스

4. 프로듀서(Producer)

  • 메시지를 생성하여 토픽에 게시하는 역할
  • 메시지를 특정 토픽의 파티션에 전송

5. 컨슈머(Consumer)

  • 토픽에서 메시지를 구독하고 처리하는 역할
  • 여러 컨슈머가 그룹을 형성하여 파티션 분담 가능

6. 컨슈머 그룹(Consumer Group)

  • 특정 토픽의 메시지를 분산 처리하는컨슈머들의 집합
  • 각 파티션은 컨슈머 그룹내의 하나의 컨슈머만 소비가능
  • 즉, 같은 컨슈머 그룹에 속하면 브로커에서 당겨온 메시지를 하나의 컨슈머에서만 소비하고 나머지 컨슈머는 처리하지 않는다.

7. 주키퍼(Zookeeper)

  • Kafka 클러스터 관리와 조정에 사용됨
  • 브로커 상태, 토픽 구성 등의 메타데이터 관리
  • kafka 사용을 위해서는 필수로 주키퍼 의존성을 설치해야 한다.

작동 방법

메시지 발행 과정

  1. 프로듀서가 메시지를 생성하여 토픽에 전송
  2. 브로커는 메시지를 해당 토픽의 파티션에 기록
  3. 메시지는 파티션 내에서 순차적으로 저장되며, 각 메시지에는 오프셋(offset)이라는 고유 식별자 할당

메시지 구독 과정

  1. 컨슈머는 관심있는 토픽 구독
  2. 컨슈머는 브로커로부터 메시지를 가져옴(pull방식)
  3. 메시지를 처리한 후, 컨슈머는 처리한 메시지의 오프셋을 커밋
  4. 커밋된 오프셋은 컨슈머가 어디까지 메시지를 처리했는지 추적하는데 사용

프로젝트 적용 

1. build.gradle에 kafka 의존성 추가

dependencies {
	implementation 'org.springframework.kafka:spring-kafka' // Spring Kafka 의존성 추가
    // implementation 'org.springframework.kafka:spring-kafka:2.8.0' // 트정 버전 지정 가능
}

2. application.yml 설정

spring:
	kafka:
            bootstrap-servers: kafka:9092
            consumer:
              group-id: groupId
              auto-offset-reset: earliest
auto.offset.reset은 컨슈머가 토픽의 어느 offset부터 소비할지 설정하는 역할
- latest(Default) : 가장 마지막 offset부터
- earliest : 가장 처음 offset부터
- none : 해당 consumer group이 가져가고자 하는 topic의 consuer offset정보가 없으면 exception을 발생시킴.
latest를 설정할 경우 가장 최신의 offset부터 소비하면서 메시지가 유실될 가능성이 있다. 내 프로젝트를 메시지가 유실되면 안되므로 earliest 설정

Kafka Config 설정(ProducerFactory, ConsumerFactory)

@Configuration
public class KafkaProducerConfig {

    //    ====Producer
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootStrapServer;

    @Bean
    public ProducerFactory<String, Object> producerFactory(){
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer); // 서버위치 넣어주는 것
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //StringSerializer는, kafka에 있는걸 불러오기
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); //JsonSerializer도 kafka 안에 있는걸로

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean // 템플릿 제작
    public KafkaTemplate<String, Object> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }
    
    //    ====Consumer
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffSet;

    @Bean
    public ConsumerFactory<String, String> consumerFactory(){
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); // 그룹id 주입
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffSet); // autoOffSet 주입

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(props);
    }
}

 

코드 설명 보충 예정

반응형