개요
DungeonTalk은 AI와 함께하는 TRPG(테이블탑 롤플레잉 게임) 플랫폼이다. 초기에는 Redis Pub/Sub을 사용하여 채팅 메시지를 브로드캐스팅했지만, 서비스 안전성과 확장성을 고려하여 Kafka로의 전환을 시도하게 되었다. 왜 Kafka로 전환을 했고 어떻게 구현했는지 그리고 어떤 문제를 겪었는지에 대해 정리해보고자 한다.
1. 전환 배경: 왜 Redis Pub/Sub에서 Kafka로?
Redis Pub/Sub의 한계
프로젝트 초기에는 MVP 구현을 위해 Redis Pub/Sub을 선택했다. 간단하고 빠르게 구현할 수 있었고, 실시간 메시지 전달에는 문제가 없었다.
// 기존 Redis Pub/Sub 방식
@Component
public class RedisPublisher {
private final RedisTemplate<String, Object> redisTemplate;
@Async("chatRedisExecutor")
public CompletableFuture<Void> publishAsync(String roomId, String message) {
redisTemplate.convertAndSend("chatroom." + roomId, message);
return CompletableFuture.completedFuture(null);
}
}
Java
복사
하지만 다음과 같은 치명적인 한계를 발견했다:
1.
메시지 유실 가능성: 구독자(Subscriber)가 없거나 연결이 끊어진 순간에는 메시지가 사라진다.
2.
메시지 재처리 불가: 한 번 발행된 메시지는 다시 읽을 수 없다.
3.
확장성 제한: Redis는 단일 서버 구조로 대용량 트래픽 처리에 한계가 있다.
문제 상황
•
일부 사용자의 메시지가 전달되지 않음
•
서버 재시작 시 진행 중이던 채팅이 손실됨
•
트래픽이 몰릴 때 메시지 처리 지연이 발생
⇒ 이러한 문제들을 해결하고자 메시지 큐 시스템 도입을 검토하게 되었다.
2. 왜 Kafka인가?
다른 메시지 큐 시스템과의 비교
항목 | Redis Pub/Sub | RabbitMQ | Kafka |
메시지 지속성 | |||
메시지 재처리 | |||
처리량 | ~5K msg/s | ~20K msg/s | ~50K+ msg/s |
확장성 | |||
순서 보장 | |||
학습 곡선 |
Kafka를 선택한 이유
1.
메시지 영속성: 디스크에 저장되어 유실 걱정이 없음
2.
높은 처리량: 대용량 트래픽 대응 가능 (초당 50,000+ 메시지)
3.
파티셔닝: roomId 를 파티션 키로 사용하여 같은 방의 메시지 순서 보장
4.
Consumer Group: 여러 서버에서 메시지를 나눠 처리 가능
5.
메시지 재처리: 문제 발생 시 offset을 조정하여 메시지 재처리 가능
3. Kafka 아키텍처 설계
Topic 전략
채팅 도메인에 맞게 단순한 Topic 구조를 설계했다.
dungeontalk.chat.regular - 일반 채팅방 메시지 (우선 적용)
Java
복사
토픽 설정:
# 개발 환경 기준
partitions=3 # 파티션 3개
replication.factor=1 # 단일 브로커 (개발용)
retention.ms=86400000 # 1일 보관
compression.type=lz4 # 빠른 압축
YAML
복사
파티셔닝 전략:
핵심: roomId 를 파티션 키로 사용하여 같은 방의 메시지는 항상 같은 파티션으로 전송
// roomId를 파티션 키로 사용
kafkaTemplate.send("dungeontalk.chat.regular", roomId, messageDto);
Java
복사
4. 구현 과정
인프라 구성
•
Docker Compose로 Kafka 환경 구축
# docker-compose-kafka.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
container_name: dungeontalk-zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.5.0
container_name: dungeontalk-kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
YAML
복사
•
의존성 추가
// build.gradle
dependencies {
// 기존 의존성...
// Kafka
implementation 'org.springframework.kafka:spring-kafka'
// Test
testImplementation 'org.springframework.kafka:spring-kafka-test'
}
YAML
복사
Kafka 설정
# ======================= Kafka 설정 =======================
# Kafka 브로커 연결
spring.kafka.bootstrap-servers=localhost:9092
# Producer 설정
spring.kafka.producer.acks=1
spring.kafka.producer.retries=1
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# Consumer 설정
spring.kafka.consumer.group-id=dungeontalk-chat-consumer
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=org.com.dungeontalk.*
# Topic 이름
kafka.topics.chat.regular=dungeontalk.chat.regular
YAML
복사
•
KafkaConfig.java
@EnableKafka
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
// ============ Producer ============
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
config.put(ProducerConfig.ACKS_CONFIG, "1");
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
// ============ Consumer ============
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
config.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "java.lang.Object");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(2); // 동시 처리 스레드 2개
return factory;
}
}
Java
복사
Producer 구현
•
KafkaPublisher.java
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaPublisher {
private final KafkaTemplate<String, Object> kafkaTemplate;
@Value("${kafka.topics.chat.regular}")
private String chatTopic;
/**
* 채팅 메시지 비동기 발행
* @param roomId 채팅방 ID (파티션 키)
* @param message 메시지 객체
*/
@Async("chatKafkaExecutor")
public CompletableFuture<Void> publishChatAsync(String roomId, Object message) {
return kafkaTemplate.send(chatTopic, roomId, message)
.thenAccept(result -> {
log.debug("Kafka 메시지 발행 완료: topic={}, partition={}, offset={}, roomId={}",
result.getRecordMetadata().topic(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset(),
roomId);
})
.exceptionally(ex -> {
log.error("Kafka 메시지 발행 실패: roomId={}, error={}", roomId, ex.getMessage(), ex);
return null;
});
}
}
Java
복사
Consumer 구현
•
KafkaSubscriber.java
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaSubscriber {
private final SimpMessageSendingOperations messagingTemplate;
/**
* 채팅 메시지 Consumer
* - 메시지를 받아서 WebSocket으로 브로드캐스트
*/
@KafkaListener(
topics = "${kafka.topics.chat.regular}",
groupId = "${spring.kafka.consumer.group-id}",
containerFactory = "kafkaListenerContainerFactory"
)
public void consumeChat(ConsumerRecord<String, Object> record) {
String roomId = record.key();
Object message = record.value();
try {
log.debug("Kafka 메시지 수신: topic={}, partition={}, offset={}, roomId={}",
record.topic(), record.partition(), record.offset(), roomId);
// WebSocket으로 브로드캐스트
String destination = "/sub/chat/room/" + roomId;
messagingTemplate.convertAndSend(destination, message);
log.debug("WebSocket 브로드캐스트 완료: destination={}, roomId={}", destination, roomId);
} catch (Exception e) {
log.error("메시지 처리 실패: roomId={}, error={}", roomId, e.getMessage(), e);
// 에러 발생 시 로깅만 하고 계속 진행
}
}
}
Java
복사
비동기 처리 Executor 추가
•
ChatAsyncConfig.java
/**
* Kafka 메시지 발행용 스레드 풀
*/
@Bean(name = "chatKafkaExecutor")
public Executor chatKafkaExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(15);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("Chat-Kafka-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
executor.setRejectedExecutionHandler((runnable, threadPoolExecutor) -> {
log.warn("채팅 Kafka 실행자 큐가 가득 찼습니다. 호출 스레드에서 작업을 실행합니다.");
runnable.run();
});
executor.initialize();
return executor;
}
Java
복사
채팅 서비스 쪽 수정
// Before: Redis Pub/Sub
private final RedisPublisher redisPublisher;
public ChatMessageDto processMessage(ChatMessageSendRequestDto dto) throws JsonProcessingException {
// ... 메시지 처리 로직
if (chatMessageDto != null) {
redisPublisher.publishAsync(dto.getRoomId(), objectMapper.writeValueAsString(chatMessageDto));
}
return chatMessageDto;
}
Java
복사
// After: Kafka
private final KafkaPublisher kafkaPublisher;
public ChatMessageDto processMessage(ChatMessageSendRequestDto dto) throws JsonProcessingException {
// ... 메시지 처리 로직
if (chatMessageDto != null) {
kafkaPublisher.publishChatAsync(dto.getRoomId(), chatMessageDto);
}
return chatMessageDto;
}
Java
복사
주요 변경사항:
•
RedisPublisher → KafkaPublisher 로 교체
•
JSON 직렬화를 수동으로 하지 않고 객체 그대로 전달 (Kafka가 자동 직렬화)
트러블슈팅
Jackson 역직렬화 실패
•
에러 메시지
com.fasterxml.jackson.databind.exc.InvalidDefinitionException:
Cannot construct instance of `org.com.dungeontalk.domain.chat.dto.ChatMessageDto`
(no Creators, like default constructor, exist)
Java
복사
•
원인
ChatMessageDto에 기본 생성자가 없어서 Jackson이 역직렬화를 할 수 없었다.
•
해결
@NoArgsConstructor와 @AllArgsConstructor 추가:
// After: 기본 생성자 추가
@Getter
@Setter
@Builder
@NoArgsConstructor // Jackson이 역직렬화할 때 필요
@AllArgsConstructor // Builder와 함께 사용
public class ChatMessageDto {
// ...
}
Java
복사
Lombok의 @Builder 만 사용하면 기본 생성자가 생성되지 않기 때문에 Jackson 역직렬화를 위해서는
반드시 @NoArgsConstructor를 추가해야 한다.
Topic에 잘못된 메시지가 저장됨
처음 테스트할 때 DTO 구조가 맞지 않는 메시지가 Kafka에 저장되었고, Consumer가 계속 같은 메시지에서 실패하는 현상이 발생했다.
•
해결
Kafka Topic을 삭제하고 다시 시작:
# Topic 삭제
# 애플리케이션 재시작 시 Topic 자동 생성됨
docker exec dungeontalk-kafka kafka-topics \
--bootstrap-server localhost:9092 \
--delete --topic dungeontalk.chat.regular
Shell
복사
개발 중에는 Topic을 쉽게 초기화할 수 있도록 준비해야 한다.
ErrorHandlingDeserializer 설정 누락
초기에는 역직렬화 에러가 발생하면 Consumer가 멈춰버리는 현상이 발생했다.
•
해결
KafkaConfig에서 JsonDeserializer 설정 개선:
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
// ... 기본 설정
// 역직렬화 설정 추가
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
config.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "java.lang.Object");
return new DefaultKafkaConsumerFactory<>(config);
}
Java
복사
Kafka Consumer 설정은 에러 핸들링까지 고려해야 안정적이다.
Kafka 적용 후
•
플레이어 채팅 기능 정상 동작 확인
•
로그 확인
•
Kafka Topic 확인
haminsung@codesche-ui-MacBookPro ~ $ docker exec dungeontalk-kafka kafka-topics --bootstrap-server localhost:9092 --list
__consumer_offsets
dungeontalk.chat.regular
docker exec dungeontalk-kafka kafka-topics --bootstrap-server localhost:9092 --describe --topic dungeontalk.chat.regular
Topic: dungeontalk.chat.regular TopicId: Z8Y1uUmYTZ-9Fi2ddOI8Hg PartitionCount: 3 ReplicationFactor: 1 Configs: compression.type=lz4,retention.ms=86400000
Topic: dungeontalk.chat.regular Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: dungeontalk.chat.regular Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: dungeontalk.chat.regular Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Java
복사
7. Before & After 비교
Redis Pub/Sub (Before)
장점:
•
구현이 간단함
•
낮은 지연시간
단점:
•
메시지 유실 가능
•
메시지 재처리 불가
•
확장성 제한
•
장애 복구 어려움
Kafka (After)
장점:
•
메시지 유실 방지 (디스크 저장)
•
메시지 재처리 가능 (offset 조정)
•
높은 처리량 (~50K+ msg/s)
•
파티션 기반 확장성
•
Consumer Group으로 장애 복구
단점:
•
초기 설정이 복잡함
•
운영 비용 증가 (Kafka 인프라)
•
학습 곡선이 높음
성능 비교
항목 | Redis Pub/Sub | Kafka |
처리량 | ~5,000 msg/s | ~50,000+ msg/s |
메시지 유실 | 발생 가능 | 없음 |
순서 보장 | ||
확장성 | 제한적 | 우수 |
8. 배운 점
•
Jackson 역직렬화의 중요성: DTO 클래스 설계 시 기본 생성자를 반드시 고려해야 한다.
•
Kafka 파티셔닝 전략: roomId 를 파티션 키로 사용하면 순서 보장과 병렬 처리를 동시에 달성할 수 있다.
•
비동기 처리의 중요성: Kafka Producer를 별도 스레드 풀에서 처리하여 메인 스레드 블로킹을 방지했다.




