Kafka
검색어 이벤트 처리
•
검색어 입력 → producer → kafka broker → consumer → ElasticSearch
Kafka vs ELS(Logstash 기반) 차이
Kafka란?
•
많은 데이터를 빠르고, 안전하게, 분산환경에서 처리하고 전달해주는 시스템
•
주요특징
◦
분산 구조
▪
여러 대의 서버에 데이터를 분산하여 저장 & 처리
▪
서버가 죽어도 데이터가 사라지지 않도록 복제
◦
대용량 처리
▪
초당 수십만 ~ 수백만 건의 데이터 처리 가능
◦
실시간 데이터 처리
▪
데이터가 들어오자마자 바로 다른 시스템으로 전달 가능
◦
내구성/신뢰성
▪
저장된 데이터는 쉽게 사라지지 않고, 소비자가 읽을 때까지 안전하게 저장
Kafka 개념
•
Producer(프로듀서): 메세지를 “카프카”에 보내는 역할 (예: 로그 발생 서버)
•
Broker(브로커): 카프카 서버(메세지 저장소)
•
Topic(토픽): 메세지를 주제별로 구분하는 논리적 공간(폴더)
•
Consumer(컨슈머): 카프카에서 메세지를 꺼내가는 역할 (예: 분석 서버, 알림 서버 등)
•
Consumer Group: 여러 컨슈머가 같은 토픽의 데이터를 나눠서 읽는 그룹
Producer: 글을 쓰는 사람
Topic: 게시판
Broker: 카페 서버
Consumer: 글을 읽는 사람
Kafka 실습
docker-compose.monitoring.yml 쪽에 Kafka 내용 추가
kafka:
image: confluentinc/cp-kafka:7.6.0
container_name: kafka
restart: unless-stopped
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
networks:
- prod_server
YAML
복사
application-properties 설정
spring.kafka.bootstrap-servers=localhost:29092
spring.kafka.consumer.group-id=search-log-group
spring.kafka.consumer.auto-offset-reset=earliest
management.health.elasticsearch.enabled=false
Markdown
복사
searchlog 패키지 생성 및 코드 작성
•
domain > SearchLogDocument
package org.example.backendproject.board.searchlog.domain;
import jakarta.persistence.Id;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.elasticsearch.annotations.Document;
@Document(indexName = "search-log-index")
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class SearchLogDocument {
// 엘라스틱 서치에 저장되는 검색 데이터
@Id
private String id;
private String keyword;
private String userId;
private String searchedAt;
}
Java
복사
•
dto > SearchLogMessage
package org.example.backendproject.board.searchlog.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SearchLogMessage {
// kafka로 주고받는 메세지 포맷(DTO)
private String keyword; // 검색된 키워드
private String userId; // 검색한 유저 Id
private String searchedAt; // 검색한 시간
}
Java
복사
•
repository
package org.example.backendproject.board.searchlog.repository;
import org.example.backendproject.board.searchlog.domain.SearchLogDocument;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface SearchLogRepository extends ElasticsearchRepository<SearchLogDocument, String> {
// 엘라스틱 서치 저장/검색용 레포지토리
}
Java
복사
•
kafka
◦
KafkaConsumerConfig
package org.example.backendproject.board.searchlog.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.example.backendproject.board.searchlog.dto.SearchLogMessage;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
// Kafka에서 메시지를 받아오기 위한 설정을 담당하는 Spring @Configuration 클래스입니다.
// 보통 Kafka를 사용할 때는 ProducerConfig(보내기), ConsumerConfig(받기) 두 가지를 별도 클래스로 관리합니다.
@Configuration
public class KafkaConsumerConfig {
// Kafka에서 메시지를 받을 설정 클래스
// 1. Kafka에서 메시지를 꺼내올 ConsumerFactory 빈(Bean)을 만듭니다.
// @Bean을 붙여서, 스프링이 이 메서드의 리턴값을 Bean(객체)로 등록하도록 만듭니다.
// ConsumerFactory는 Kafka로부터 메시지를 꺼낼 때 필요한 '소비자' 객체를 만드는 팩토리입니다.
@Bean
public ConsumerFactory<String, SearchLogMessage> consumerFactory() {
//Kafka로부터 받은 메시지를 SearchLogMessage 객체로 자동 변환(역직렬화) 해주는 역할
JsonDeserializer<SearchLogMessage> deserializer = new JsonDeserializer<>(
SearchLogMessage.class);
//메시지의 타입 정보를 헤더에서 제거할지 설정하는 부분 (false로 두면 타입 정보가 유지되어 타입 매핑이 안전)
deserializer.setRemoveTypeHeaders(false);
//역직렬화(메시지 → 객체)할 때 어떤 패키지의 클래스까지 허용할지 설정 (현재는 모든 패키지 허용)
//역직렬화 대상이 되는 클래스의 “패키지”를 신뢰 목록에 추가
//만약 신뢰하지 않는(=내가 모르는) 패키지의 클래스로 역직렬화가 가능하게 하면, 악성 객체 주입 등 보안 위험이 생길 수 있음.
deserializer.addTrustedPackages("*");
//Kafka 메시지의 Key에 대해서도 타입 매퍼를 사용하겠다는 옵션
deserializer.setUseTypeMapperForKey(true);
//Kafka Consumer의 필수 옵션들을 담는 Map
Map<String, Object> config = new HashMap<>();
//afka 서버의 주소(포트 포함)**를 지정
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
//이 Consumer가 속한 Consumer Group 이름
//같은 Group 내 Consumer들이 서로 메시지를 분배해서 소비
config.put(ConsumerConfig.GROUP_ID_CONFIG, "search-log-group");
//**Key(메시지의 식별자)**를 어떻게 읽을지 지정 (여기선 String)
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//제로 Kafka Consumer 객체를 만들어주는 Factory를 리턴
return new DefaultKafkaConsumerFactory<>(
config,
new StringDeserializer(),
deserializer
);
}
//@KafkaListener에서 사용할 Listener 컨테이너 팩토리 Bean을 등록.
//여러 개의 Kafka Consumer가 동시에 메시지를 병렬로 처리할 수 있게 해주는 설정.
//위에서 만든 consumerFactory를 연결시킴.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, SearchLogMessage> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, SearchLogMessage> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
/**
이 코드는 Kafka에서 메시지를 읽어오는 설정을 담당합니다.
메시지는 JSON → SearchLogMessage 객체로 자동 변환됩니다.
ConsumerFactory는 “실제 메시지 꺼내는 소비자(Consumer)”를 만들어주고,
ListenerContainerFactory는 “여러 소비자가 동시에 메시지를 처리할 수 있도록 도와주는 역할”을 합니다.
**/
}
Java
복사
◦
KafkaProducerConfig
package org.example.backendproject.board.searchlog.kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.example.backendproject.board.searchlog.dto.SearchLogMessage;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
// Kafka로 메시지(=이벤트)를 보낼 때 필요한 설정을 해주는 Spring @Configuration 클래
@Configuration
public class KafkaProducerConfig {
//Kafka로 메시지 보낼 설정 클래스
//Kafka로 메시지를 보낼 때 사용하는 프로듀서(Producer) 객체를 만드는 팩토리
//key 타입은 String, value 타입은 SearchLogMessage (우리가 보낼 데이터 타입)
@Bean
public ProducerFactory<String, SearchLogMessage> producerFactory() {
//Kafka Producer 설정을 위한 Map 생성
Map<String, Object> config = new HashMap<>();
//Kafka 서버 주소를 등록
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
//메시지의 key를 String으로 직렬화
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//메시지의 value(SearchLogMessage)를 JSON 형태로 직렬화
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
//위 설정(Map)을 사용해서 ProducerFactory 객체 생성
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, SearchLogMessage> kafkaTemplate() {
//스프링에서 Kafka로 메시지 보낼 때 사용하는 핵심 객체
//에서 만든 ProducerFactory로 KafkaTemplate을 생성해서 Bean으로 등록
return new KafkaTemplate<>(producerFactory());
}
}
Java
복사
◦
SearchLogConsumer
package org.example.backendproject.board.searchlog.kafka;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.example.backendproject.board.searchlog.domain.SearchLogDocument;
import org.example.backendproject.board.searchlog.dto.SearchLogMessage;
import org.example.backendproject.board.searchlog.service.SearchLogEsService;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Slf4j
@RequiredArgsConstructor
@Service
public class SearchLogConsumer {
// 카프카에서 메세지를 꺼내서 엘라스틱서치로 넘기는 클래스
private final SearchLogEsService searchLogEsService;
@KafkaListener(
topics = "search-log", // 구독한 토픽 이름
groupId = "search-log-group", // 이 컨슈머가 어떤 컨슈머 그룹에 속하는지
containerFactory = "kafkaListenerContainerFactory" // 사용할 리스너 컨테이너 설정 Bean
)
public void consume(SearchLogMessage message) {
log.info("카프카에서 메세지 수신 : {}", message);
// 카프카에서 받은 메세지를 엘라스틱 전용 객체로 변환
SearchLogDocument doc = SearchLogDocument.builder()
.keyword(message.getKeyword())
.userId(message.getUserId())
.searchedAt(message.getSearchedAt())
.build();
// 엘라스틱서치에 저장
searchLogEsService.save(doc);
}
}
Java
복사
◦
service > SearchLogEsService
package org.example.backendproject.board.searchlog.service;
import lombok.RequiredArgsConstructor;
import org.example.backendproject.board.searchlog.domain.SearchLogDocument;
import org.example.backendproject.board.searchlog.repository.SearchLogRepository;
import org.springframework.stereotype.Service;
@RequiredArgsConstructor
@Service
public class SearchLogEsService {
// 엘라스틱서치 저장, 통계 집계 비즈니스 로직 서비스
private final SearchLogRepository searchLogRepository;
// 카프카에서 전달 받은 검색데이터 저장 메서드
public void save(SearchLogDocument searchLogDocument) {
searchLogRepository.save(searchLogDocument);
}
}
Java
복사
◦
검색어 정보 카프카로 전송
package org.example.backendproject.board.elasticsearch.controller;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.example.backendproject.board.elasticsearch.dto.BoardEsDocument;
import org.example.backendproject.board.elasticsearch.service.BoardEsService;
import org.example.backendproject.board.searchlog.dto.SearchLogMessage;
import org.springframework.data.domain.Page;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RequiredArgsConstructor
@RestController
@RequestMapping("/boards")
public class BoardEsController {
private final BoardEsService boardEsService;
// 스프링에서 카프카로 메세지를 전송하기 위한 컴포넌트
private final KafkaTemplate<String, SearchLogMessage> kafkaTemplate;
@GetMapping("/elasticsearch")
// 엘라스틱서치 검색 결과를 page 형태로 감싼 다음 HTTP 응답을 json으로 반환
public ResponseEntity<Page<BoardEsDocument>> elasticSearch(
@RequestParam String keyword,
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "10") int size) {
// 검색어 정보 카프카 전송
String userId = "1";
String searchedAt = LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);
SearchLogMessage message = new SearchLogMessage(keyword, userId, searchedAt);
kafkaTemplate.send("search-log", message); // search-log 토픽으로 메세지 전달
return ResponseEntity.ok(boardEsService.search(keyword, page, size));
}
}
Java
복사
코드 작성 후 실행 중인 Kafka 확인
ElasticSearch - 조회수, 검색어 순위 키워드 조회, 정렬 작업
조회수 칼럼 추가
•
Board 엔티티에 조회수 칼럼 추가
@Column(name = "view_count", nullable = false)
private Long viewCount = 0L; // 기본값 0으로 초기화
Java
복사
DTO 수정 - viewCount 추가
package org.example.backendproject.board.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.LocalDateTime;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class BoardDTO {
private Long id;
private String title;
private String content;
private String username;
private Long user_id;
private LocalDateTime created_date;
private LocalDateTime updated_date;
private String batchKey;
@JsonProperty("view_count")
private Long viewCount;
public BoardDTO(Long id, String title, String content, String username, Long user_id,
LocalDateTime created_date, LocalDateTime updated_date, Long viewCount) {
this.id = id;
this.title = title;
this.content = content;
this.username = username;
this.user_id = user_id;
this.created_date = created_date;
this.updated_date = updated_date;
this.viewCount = viewCount;
}
}
Java
복사
BoardEsDocument 쪽에 view_count 추가
package org.example.backendproject.board.elasticsearch.dto;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import jakarta.persistence.Id;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.example.backendproject.board.dto.BoardDTO;
import org.springframework.data.elasticsearch.annotations.Document;
@JsonIgnoreProperties(ignoreUnknown = true)
@Document(indexName = "board-index")
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class BoardEsDocument {
// 엘라스틱서치에 적용될 문서를 자바 객체로 정의한 클래스
// 엘라스틱 전용 DTO
@Id
private String id;
private String title;
private String content;
private String username;
private Long userId;
private String created_date;
private String updated_date;
private Long view_count = 0L;
// BoardDTO를 ElasticSearch 전용 DTO로 변환하는 정적 메서드
public static BoardEsDocument from(BoardDTO dto) {
// BoardDTO를 받아서 ElasticSearch DTO로 변환한다.
return BoardEsDocument.builder()
.id(String.valueOf(dto.getId()))
.title(dto.getTitle())
.content(dto.getContent())
.username(dto.getUsername())
.userId(dto.getUser_id())
.created_date(dto.getCreated_date() != null ? dto.getCreated_date().toString() : null)
.updated_date(dto.getUpdated_date() != null ? dto.getUpdated_date().toString() : null)
.view_count(dto.getViewCount())
.build();
}
}
Java
복사
BoardRepository 수정 - 조회수 추가
package org.example.backendproject.board.repository;
import java.util.List;
import org.example.backendproject.board.dto.BoardDTO;
import org.example.backendproject.board.entity.Board;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
public interface BoardRepository extends JpaRepository<Board, Long> {
// 특정 User의 게시글만 조회
//@Query(value = "SELECT * FROM board WHERE user_id = :userId", nativeQuery = true) //이거 사용안해도 되는데 예시로 네이티브쿼라로 작성한거임
List<Board> findByUserId(Long userId);
//보드 엔티티를 기준으로 조회하되
//Board 엔티티 전체를 반환하는게 아니라 원하는 값만 보드dto 생성자에 넣어서 리스트로 반환합니다
//대소문자 구분없이 검색하는 옵션
//title에 해당 키워드가 포함되어 있거나 content에 키워드가 포함되어잇는거 출력
/** 페이징 적용 전 **/
/** 검색기능 **/
// 제목 또는 내용에 키워드가 포함된 글 검색 (대소문자 구분 없음)
@Query("SELECT new org.example.backendproject.board.dto.BoardDTO(" +
"b.id, b.title, b.content,b.user.userProfile.username, b.user.id, b.created_date, b.updated_date, b.viewCount" +
") " +
"FROM Board b " +
"WHERE LOWER(b.title) LIKE LOWER(CONCAT('%', :keyword, '%')) " +
"OR LOWER(b.content) LIKE LOWER(CONCAT('%', :keyword, '%'))")
List<BoardDTO> searchKeyword(@Param("keyword") String keyword);
/** 페이징 적용 후 **/
//페이징 전체 목록
@Query("SELECT new org.example.backendproject.board.dto.BoardDTO(" +
"b.id, b.title, b.content,b.user.userProfile.username, b.user.id,b.created_date, b.updated_date, b.viewCount) " +
"FROM Board b "
+ "ORDER BY b.created_date DESC") //쿼리로 정렬
Page<BoardDTO> findAllPaging(Pageable pageable);
//페이징 처리 결과를 담는 페이징 객체입니다.
//전체 페이지수, 현재 페이지 번호,전체 아이템 겟수 등 페이징 관련 모든 정보들을 반환합니다.
//Pageable은 jpa에서 제공하는 페이징 정보를 담은 객체입니다.
//page번호, 한페이지당 데이터 갯수 ,정렬 기준 등 파라미터를 받아 원하는 조건으로 페이징 및 정렬 쿼리를 생성할 수 있습니다.
//페이징 검색 목록
@Query("SELECT new org.example.backendproject.board.dto.BoardDTO(" +
"b.id, b.title, b.content,b.user.userProfile.username, b.user.id, b.created_date, b.updated_date, b.viewCount) " +
"FROM Board b " +
"WHERE LOWER(b.title) LIKE LOWER(CONCAT('%', :keyword, '%')) " +
"OR LOWER(b.content) LIKE LOWER(CONCAT('%', :keyword, '%'))"
+ "ORDER BY b.created_date DESC")// 쿼리로 정렬
Page<BoardDTO> searchKeywordPaging(@Param("keyword") String keyword, Pageable pageable);
}
Java
복사
BatchInsert 쪽의 조회수(view_count) 추가해주기
•
BatchRepository
package org.example.backendproject.board.repository;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.example.backendproject.board.dto.BoardDTO;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
@Repository
@RequiredArgsConstructor
public class BatchRepository {
private final JdbcTemplate jdbcTemplate;
public void batchInsert(List<BoardDTO> boardDTO) {
String sql = "INSERT INTO board (title, content, user_id, created_date, updated_date, batch_key, view_count) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?) ";
jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
BoardDTO dto = boardDTO.get(i);
ps.setString(1, dto.getTitle());
ps.setString(2, dto.getContent());
ps.setLong(3, dto.getUser_id());
ps.setString(4, String.valueOf(dto.getCreated_date()));
ps.setString(5, String.valueOf(dto.getUpdated_date()));
ps.setString(6, dto.getBatchKey());
ps.setLong(7, dto.getViewCount() != null ? dto.getViewCount() : 0L);
}
@Override
public int getBatchSize() {
return boardDTO.size();
}
});
}
public List<BoardDTO> findByBatchKey(String batchKey) {
String sql = "SELECT b.id, b.title, b.content, b.user_id, b.created_date, b.updated_date, b.batch_key, b.view_count, up.username " +
"FROM board b " +
"JOIN user u ON b.user_id = u.id " +
"JOIN user_profile up ON up.user_id = u.id " +
"WHERE b.batch_key = ?";
return jdbcTemplate.query(sql, new Object[]{batchKey}, (rs, rowNum) -> {
BoardDTO dto = new BoardDTO();
dto.setId(rs.getLong("id"));
dto.setTitle(rs.getString("title"));
dto.setContent(rs.getString("content"));
dto.setUser_id(rs.getLong("user_id"));
dto.setCreated_date(rs.getTimestamp("created_date") != null ? rs.getTimestamp("created_date").toLocalDateTime() : null);
dto.setUpdated_date(rs.getTimestamp("updated_date") != null ? rs.getTimestamp("updated_date").toLocalDateTime() : null);
dto.setBatchKey(rs.getString("batch_key"));
dto.setUsername(rs.getString("username"));
dto.setViewCount(rs.getLong("view_count"));
return dto;
});
}
}
Java
복사
•
BoardService
... 중략 ...
/** 배치 작업 (JdbcTemplate) **/
@Transactional
public void batchSaveBoard(List<BoardDTO> boardDTOList) {
Long start = System.currentTimeMillis();
int batchsize = 10; // 한번에 처리할 배치 크기
for (int i = 0; i < boardDTOList.size(); i += batchsize) { // i는 1000씩 증가
// 전체 데이터를 1000개씩 잘라서 배치리스트에 담는다.
long batchStart = System.currentTimeMillis();
int end = Math.min(boardDTOList.size(), i + batchsize); // 두 개의 숫자 중 작은 수를 반환
List<BoardDTO> batchList = boardDTOList.subList(i, end);
// 전체 데이터에서 1000씩 작업을 하는데 마지막 데이터가 1000개가 안 될수도 있으니
// Math.min()으로 전체 크기를 넘지 않게 마지막 인덱스를 계산해서 작업한다.
// 내가 넣은 데이터만 엘라스틱서치에 동기화하기 위해 uuid 생성
String batchKey = UUID.randomUUID().toString();
for (BoardDTO dto : batchList) {
dto.setBatchKey(batchKey);
}
// 1. MySQL로 INSERT
batchRepository.batchInsert(batchList);
// 2. MySQL에 Insert한 데이터를 다시 조회
List<BoardDTO> saveBoards = batchRepository.findByBatchKey(batchKey);
// 3. 엘라스틱서치용으로 변환
List<BoardEsDocument> documents = saveBoards.stream()
.map(BoardEsDocument::from) // DTO -> 엘라스틱서치용 dto로 변환
.toList();
// 4. ElasticSearch 벌크 인덱싱 (재시도 포함)
boolean success = false;
int retryCount = 3;
while (retryCount-- > 0 && !success) {
try {
// 4. 엘라스틱서치 bulk 인덱싱
boardEsService.bulkIndexInsert(documents);
success = true;
} catch (Exception e) {
log.warn("[BOARD][BATCH] Elasticsearch 인덱싱 실패 (남은 재시도 {}회): {}", retryCount, e.getMessage());
if (retryCount == 0) {
log.error("[BOARD][BATCH] Elasticsearch 최종 실패. 수동 복구 필요: {}", e.getMessage(), e);
}
try {
Thread.sleep(1000); // 1초 대기 후 재시도
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
long batchEnd = System.currentTimeMillis();
log.info("[BOARD][BATCH] {}~{} ({}건) 처리 완료 - {} ms",
i, end - 1, batchList.size(), batchEnd - batchStart);
}
Long end = System.currentTimeMillis();
log.info("[BOARD][BATCH] 전체 저장 소요 시간(ms): {}", (end - start));
}
... 중략 ...
Java
복사
board-index 내용 변경
•
mappings 부분에서 view_count 추가
◦
추가 후 ElasticSearch의 Dev Tool에서 인덱스 지우고 다시 생성해준 다음 데이터를 추가해줘야 확인 가능
◦
실무에서는 인덱스를 복제하는 방식으로 진행
"updated_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSSSSS||yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"
},
"view_count": {
"type": "long"
}
Java
복사
BoardEsService - 검색어 순위 키워드 조회 + 정렬
... 중략 ...
// 검색어 순위 키워드 조회 기능
public List<String> getTopSearchKeyword() {
// TermsAggreagation 엘라스틱서치의 집계 메서드
TermsAggregation termsAggregation = TermsAggregation.of(t -> t
.field("keyword.keyword") // 집계 기준 필드
.size(10)); // 상위 10개만 불러오기
// 집계 요청
SearchRequest request = SearchRequest.of(s -> s
.index("search-log-index") // 집계를 가져올 인덱스 이름
.size(0) // 집계만 가져오고 검색 결과는 가져오지 않음
.aggregations("top_keywords", a -> a.terms(termsAggregation)) // 인기 검색어 집계
);
try {
// 집계 응답
SearchResponse<Void> response = client.search(request, void.class);
return response.aggregations() // 응답 결과에서 집계 결과만 꺼냄
.get("top_keywords") // 위에서 내가 집계요청한 이름
.sterms() // String terms로 변환
.buckets() // 집계 결과 버킷 리스트
.array() // 버킷 리스트를 배열로 변환
.stream() // 배열을 스트림으로 변환
.map(bucket -> bucket.key().stringValue()) // 버킷의 key값을 문자열로 꺼냄
.map(Object::toString) // String으로 변환
.collect(Collectors.toList()); // 스트림 결과를 리스트로 모아서 반환
} catch (IOException e) {
throw new RuntimeException("검색어 통계중 오류 발생", e);
}
}
Java
복사
BoardEsController - BoardEsService의 검색어 순위 키워드 조회 서비스 호출
@GetMapping("/top-keywords")
public ResponseEntity<List<String>> getTopKeyWord() {
List<String> keywords = boardEsService.getTopSearchKeyword();
return ResponseEntity.ok(keywords);
}
Java
복사
작업 이후 로그인하여 조회 결과 및 WAS 쪽 로그 확인
Kibana에서 검색어 통계 확인
Discover 에서 로그 확인
Postman 테스트 - 검색어 순위 키워드 조회
오늘 푸시한 커밋리스트
날짜 | 커밋 메시지 |
2025-07-01 | |
2025-07-01 | |
2025-07-01 | |
2025-07-01 | |
2025-07-01 | |
2025-07-01 | |
2025-07-01 | |
2025-07-01 | |
2025-07-01 | |
2025-07-01 | |
2025-07-01 | |
2025-07-01 | |
2025-07-01 | |
2025-07-01 | |
2025-07-01 | |
2025-07-01 | |
2025-07-01 | |
2025-07-01 | |
2025-07-01 | |
2025-07-01 |