Backend
home
📩

WebSocket(STOMP) 채팅 - Redis Pub/Sub에서 Kafka로 전환

생성 일시
2025/10/11 09:53
태그
SpringBoot
게시일
2025/10/11
최종 편집 일시
2025/10/15 10:52

개요

DungeonTalk은 AI와 함께하는 TRPG(테이블탑 롤플레잉 게임) 플랫폼이다. 초기에는 Redis Pub/Sub을 사용하여 채팅 메시지를 브로드캐스팅했지만, 서비스 안전성과 확장성을 고려하여 Kafka로의 전환을 시도하게 되었다. 왜 Kafka로 전환을 했고 어떻게 구현했는지 그리고 어떤 문제를 겪었는지에 대해 정리해보고자 한다.
이전 Redis Pub/Sub 비동기 처리 관련 글 참고: https://codesche.oopy.io/287de3f7-e3a8-8018-9783-fcc4aa2aba36

1. 전환 배경: 왜 Redis Pub/Sub에서 Kafka로?

Redis Pub/Sub의 한계

프로젝트 초기에는 MVP 구현을 위해 Redis Pub/Sub을 선택했다. 간단하고 빠르게 구현할 수 있었고, 실시간 메시지 전달에는 문제가 없었다.
// 기존 Redis Pub/Sub 방식 @Component public class RedisPublisher { private final RedisTemplate<String, Object> redisTemplate; @Async("chatRedisExecutor") public CompletableFuture<Void> publishAsync(String roomId, String message) { redisTemplate.convertAndSend("chatroom." + roomId, message); return CompletableFuture.completedFuture(null); } }
Java
복사
하지만 다음과 같은 치명적인 한계를 발견했다:
1.
메시지 유실 가능성: 구독자(Subscriber)가 없거나 연결이 끊어진 순간에는 메시지가 사라진다.
2.
메시지 재처리 불가: 한 번 발행된 메시지는 다시 읽을 수 없다.
3.
확장성 제한: Redis는 단일 서버 구조로 대용량 트래픽 처리에 한계가 있다.

문제 상황

일부 사용자의 메시지가 전달되지 않음
서버 재시작 시 진행 중이던 채팅이 손실됨
트래픽이 몰릴 때 메시지 처리 지연이 발생
⇒ 이러한 문제들을 해결하고자 메시지 큐 시스템 도입을 검토하게 되었다.

2. 왜 Kafka인가?

다른 메시지 큐 시스템과의 비교

항목
Redis Pub/Sub
RabbitMQ
Kafka
메시지 지속성
없음
있음
있음
메시지 재처리
불가능
제한적
자유로움
처리량
~5K msg/s
~20K msg/s
~50K+ msg/s
확장성
제한적
중간
우수
순서 보장
없음
제한적
파티션별 보장
학습 곡선
쉬움
중간
높음

Kafka를 선택한 이유

1.
메시지 영속성: 디스크에 저장되어 유실 걱정이 없음
2.
높은 처리량: 대용량 트래픽 대응 가능 (초당 50,000+ 메시지)
3.
파티셔닝: roomId 를 파티션 키로 사용하여 같은 방의 메시지 순서 보장
4.
Consumer Group: 여러 서버에서 메시지를 나눠 처리 가능
5.
메시지 재처리: 문제 발생 시 offset을 조정하여 메시지 재처리 가능

3. Kafka 아키텍처 설계

Topic 전략

채팅 도메인에 맞게 단순한 Topic 구조를 설계했다.
dungeontalk.chat.regular - 일반 채팅방 메시지 (우선 적용)
Java
복사
토픽 설정:
# 개발 환경 기준 partitions=3 # 파티션 3개 replication.factor=1 # 단일 브로커 (개발용) retention.ms=86400000 # 1일 보관 compression.type=lz4 # 빠른 압축
YAML
복사

파티셔닝 전략:

핵심: roomId 를 파티션 키로 사용하여 같은 방의 메시지는 항상 같은 파티션으로 전송
// roomId를 파티션 키로 사용 kafkaTemplate.send("dungeontalk.chat.regular", roomId, messageDto);
Java
복사

4. 구현 과정

인프라 구성

Docker Compose로 Kafka 환경 구축
# docker-compose-kafka.yml version: '3.8' services: zookeeper: image: confluentinc/cp-zookeeper:7.5.0 container_name: dungeontalk-zookeeper environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - "2181:2181" kafka: image: confluentinc/cp-kafka:7.5.0 container_name: dungeontalk-kafka depends_on: - zookeeper ports: - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
YAML
복사
의존성 추가
// build.gradle dependencies { // 기존 의존성... // Kafka implementation 'org.springframework.kafka:spring-kafka' // Test testImplementation 'org.springframework.kafka:spring-kafka-test' }
YAML
복사

Kafka 설정

# ======================= Kafka 설정 ======================= # Kafka 브로커 연결 spring.kafka.bootstrap-servers=localhost:9092 # Producer 설정 spring.kafka.producer.acks=1 spring.kafka.producer.retries=1 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer # Consumer 설정 spring.kafka.consumer.group-id=dungeontalk-chat-consumer spring.kafka.consumer.auto-offset-reset=latest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.trusted.packages=org.com.dungeontalk.* # Topic 이름 kafka.topics.chat.regular=dungeontalk.chat.regular
YAML
복사
KafkaConfig.java
@EnableKafka @Configuration public class KafkaConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id}") private String groupId; // ============ Producer ============ @Bean public ProducerFactory<String, Object> producerFactory() { Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); config.put(ProducerConfig.ACKS_CONFIG, "1"); return new DefaultKafkaProducerFactory<>(config); } @Bean public KafkaTemplate<String, Object> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } // ============ Consumer ============ @Bean public ConsumerFactory<String, Object> consumerFactory() { Map<String, Object> config = new HashMap<>(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); config.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); config.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false); config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "java.lang.Object"); config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); return new DefaultKafkaConsumerFactory<>(config); } @Bean public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(2); // 동시 처리 스레드 2개 return factory; } }
Java
복사

Producer 구현

KafkaPublisher.java
@Slf4j @Component @RequiredArgsConstructor public class KafkaPublisher { private final KafkaTemplate<String, Object> kafkaTemplate; @Value("${kafka.topics.chat.regular}") private String chatTopic; /** * 채팅 메시지 비동기 발행 * @param roomId 채팅방 ID (파티션 키) * @param message 메시지 객체 */ @Async("chatKafkaExecutor") public CompletableFuture<Void> publishChatAsync(String roomId, Object message) { return kafkaTemplate.send(chatTopic, roomId, message) .thenAccept(result -> { log.debug("Kafka 메시지 발행 완료: topic={}, partition={}, offset={}, roomId={}", result.getRecordMetadata().topic(), result.getRecordMetadata().partition(), result.getRecordMetadata().offset(), roomId); }) .exceptionally(ex -> { log.error("Kafka 메시지 발행 실패: roomId={}, error={}", roomId, ex.getMessage(), ex); return null; }); } }
Java
복사

Consumer 구현

KafkaSubscriber.java
@Slf4j @Service @RequiredArgsConstructor public class KafkaSubscriber { private final SimpMessageSendingOperations messagingTemplate; /** * 채팅 메시지 Consumer * - 메시지를 받아서 WebSocket으로 브로드캐스트 */ @KafkaListener( topics = "${kafka.topics.chat.regular}", groupId = "${spring.kafka.consumer.group-id}", containerFactory = "kafkaListenerContainerFactory" ) public void consumeChat(ConsumerRecord<String, Object> record) { String roomId = record.key(); Object message = record.value(); try { log.debug("Kafka 메시지 수신: topic={}, partition={}, offset={}, roomId={}", record.topic(), record.partition(), record.offset(), roomId); // WebSocket으로 브로드캐스트 String destination = "/sub/chat/room/" + roomId; messagingTemplate.convertAndSend(destination, message); log.debug("WebSocket 브로드캐스트 완료: destination={}, roomId={}", destination, roomId); } catch (Exception e) { log.error("메시지 처리 실패: roomId={}, error={}", roomId, e.getMessage(), e); // 에러 발생 시 로깅만 하고 계속 진행 } } }
Java
복사

비동기 처리 Executor 추가

ChatAsyncConfig.java
/** * Kafka 메시지 발행용 스레드 풀 */ @Bean(name = "chatKafkaExecutor") public Executor chatKafkaExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(15); executor.setQueueCapacity(100); executor.setThreadNamePrefix("Chat-Kafka-"); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setAwaitTerminationSeconds(30); executor.setRejectedExecutionHandler((runnable, threadPoolExecutor) -> { log.warn("채팅 Kafka 실행자 큐가 가득 찼습니다. 호출 스레드에서 작업을 실행합니다."); runnable.run(); }); executor.initialize(); return executor; }
Java
복사

채팅 서비스 쪽 수정

// Before: Redis Pub/Sub private final RedisPublisher redisPublisher; public ChatMessageDto processMessage(ChatMessageSendRequestDto dto) throws JsonProcessingException { // ... 메시지 처리 로직 if (chatMessageDto != null) { redisPublisher.publishAsync(dto.getRoomId(), objectMapper.writeValueAsString(chatMessageDto)); } return chatMessageDto; }
Java
복사
// After: Kafka private final KafkaPublisher kafkaPublisher; public ChatMessageDto processMessage(ChatMessageSendRequestDto dto) throws JsonProcessingException { // ... 메시지 처리 로직 if (chatMessageDto != null) { kafkaPublisher.publishChatAsync(dto.getRoomId(), chatMessageDto); } return chatMessageDto; }
Java
복사
주요 변경사항:
RedisPublisherKafkaPublisher 로 교체
JSON 직렬화를 수동으로 하지 않고 객체 그대로 전달 (Kafka가 자동 직렬화)

트러블슈팅

Jackson 역직렬화 실패

에러 메시지
com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.com.dungeontalk.domain.chat.dto.ChatMessageDto` (no Creators, like default constructor, exist)
Java
복사
원인
ChatMessageDto에 기본 생성자가 없어서 Jackson이 역직렬화를 할 수 없었다.
해결
@NoArgsConstructor@AllArgsConstructor 추가:
// After: 기본 생성자 추가 @Getter @Setter @Builder @NoArgsConstructor // Jackson이 역직렬화할 때 필요 @AllArgsConstructor // Builder와 함께 사용 public class ChatMessageDto { // ... }
Java
복사
Lombok의 @Builder 만 사용하면 기본 생성자가 생성되지 않기 때문에 Jackson 역직렬화를 위해서는
반드시 @NoArgsConstructor를 추가해야 한다.

Topic에 잘못된 메시지가 저장됨

처음 테스트할 때 DTO 구조가 맞지 않는 메시지가 Kafka에 저장되었고, Consumer가 계속 같은 메시지에서 실패하는 현상이 발생했다.
해결
Kafka Topic을 삭제하고 다시 시작:
# Topic 삭제 # 애플리케이션 재시작 시 Topic 자동 생성됨 docker exec dungeontalk-kafka kafka-topics \ --bootstrap-server localhost:9092 \ --delete --topic dungeontalk.chat.regular
Shell
복사
개발 중에는 Topic을 쉽게 초기화할 수 있도록 준비해야 한다.

ErrorHandlingDeserializer 설정 누락

초기에는 역직렬화 에러가 발생하면 Consumer가 멈춰버리는 현상이 발생했다.
해결
KafkaConfig에서 JsonDeserializer 설정 개선:
@Bean public ConsumerFactory<String, Object> consumerFactory() { Map<String, Object> config = new HashMap<>(); // ... 기본 설정 // 역직렬화 설정 추가 config.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); config.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false); config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "java.lang.Object"); return new DefaultKafkaConsumerFactory<>(config); }
Java
복사
Kafka Consumer 설정은 에러 핸들링까지 고려해야 안정적이다.

Kafka 적용 후

플레이어 채팅 기능 정상 동작 확인
로그 확인
Kafka Topic 확인
haminsung@codesche-ui-MacBookPro ~ $ docker exec dungeontalk-kafka kafka-topics --bootstrap-server localhost:9092 --list __consumer_offsets dungeontalk.chat.regular docker exec dungeontalk-kafka kafka-topics --bootstrap-server localhost:9092 --describe --topic dungeontalk.chat.regular Topic: dungeontalk.chat.regular TopicId: Z8Y1uUmYTZ-9Fi2ddOI8Hg PartitionCount: 3 ReplicationFactor: 1 Configs: compression.type=lz4,retention.ms=86400000 Topic: dungeontalk.chat.regular Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Topic: dungeontalk.chat.regular Partition: 1 Leader: 1 Replicas: 1 Isr: 1 Topic: dungeontalk.chat.regular Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Java
복사

7. Before & After 비교

Redis Pub/Sub (Before)

장점:
구현이 간단함
낮은 지연시간
단점:
메시지 유실 가능
메시지 재처리 불가
확장성 제한
장애 복구 어려움

Kafka (After)

장점:
메시지 유실 방지 (디스크 저장)
메시지 재처리 가능 (offset 조정)
높은 처리량 (~50K+ msg/s)
파티션 기반 확장성
Consumer Group으로 장애 복구
단점:
초기 설정이 복잡함
운영 비용 증가 (Kafka 인프라)
학습 곡선이 높음

성능 비교

항목
Redis Pub/Sub
Kafka
처리량
~5,000 msg/s
~50,000+ msg/s
메시지 유실
발생 가능
없음
순서 보장
(파티션별)
확장성
제한적
우수

8. 배운 점

Jackson 역직렬화의 중요성: DTO 클래스 설계 시 기본 생성자를 반드시 고려해야 한다.
Kafka 파티셔닝 전략: roomId 를 파티션 키로 사용하면 순서 보장과 병렬 처리를 동시에 달성할 수 있다.
비동기 처리의 중요성: Kafka Producer를 별도 스레드 풀에서 처리하여 메인 스레드 블로킹을 방지했다.