Backend
home

2025-7-1 (화)

생성일
2025/06/30 23:12
태그
ElasticSearch
Kubernetes
Kafka
Kibana

Kafka

검색어 이벤트 처리

검색어 입력 → producer → kafka broker → consumer → ElasticSearch

Kafka vs ELS(Logstash 기반) 차이

Kafka란?

많은 데이터를 빠르고, 안전하게, 분산환경에서 처리하고 전달해주는 시스템
주요특징
분산 구조
여러 대의 서버에 데이터를 분산하여 저장 & 처리
서버가 죽어도 데이터가 사라지지 않도록 복제
대용량 처리
초당 수십만 ~ 수백만 건의 데이터 처리 가능
실시간 데이터 처리
데이터가 들어오자마자 바로 다른 시스템으로 전달 가능
내구성/신뢰성
저장된 데이터는 쉽게 사라지지 않고, 소비자가 읽을 때까지 안전하게 저장

Kafka 개념

Producer(프로듀서): 메세지를 “카프카”에 보내는 역할 (예: 로그 발생 서버)
Broker(브로커): 카프카 서버(메세지 저장소)
Topic(토픽): 메세지를 주제별로 구분하는 논리적 공간(폴더)
Consumer(컨슈머): 카프카에서 메세지를 꺼내가는 역할 (예: 분석 서버, 알림 서버 등)
Consumer Group: 여러 컨슈머가 같은 토픽의 데이터를 나눠서 읽는 그룹
Producer: 글을 쓰는 사람
Topic: 게시판
Broker: 카페 서버
Consumer: 글을 읽는 사람

Kafka 실습

docker-compose.monitoring.yml 쪽에 Kafka 내용 추가

kafka: image: confluentinc/cp-kafka:7.6.0 container_name: kafka restart: unless-stopped ports: - "9092:9092" - "29092:29092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 depends_on: - zookeeper networks: - prod_server
YAML
복사

application-properties 설정

spring.kafka.bootstrap-servers=localhost:29092 spring.kafka.consumer.group-id=search-log-group spring.kafka.consumer.auto-offset-reset=earliest management.health.elasticsearch.enabled=false
Markdown
복사

searchlog 패키지 생성 및 코드 작성

domain > SearchLogDocument
package org.example.backendproject.board.searchlog.domain; import jakarta.persistence.Id; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import org.springframework.data.elasticsearch.annotations.Document; @Document(indexName = "search-log-index") @Data @NoArgsConstructor @AllArgsConstructor @Builder public class SearchLogDocument { // 엘라스틱 서치에 저장되는 검색 데이터 @Id private String id; private String keyword; private String userId; private String searchedAt; }
Java
복사
dto > SearchLogMessage
package org.example.backendproject.board.searchlog.dto; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor public class SearchLogMessage { // kafka로 주고받는 메세지 포맷(DTO) private String keyword; // 검색된 키워드 private String userId; // 검색한 유저 Id private String searchedAt; // 검색한 시간 }
Java
복사
repository
package org.example.backendproject.board.searchlog.repository; import org.example.backendproject.board.searchlog.domain.SearchLogDocument; import org.springframework.data.elasticsearch.repository.ElasticsearchRepository; import org.springframework.stereotype.Repository; @Repository public interface SearchLogRepository extends ElasticsearchRepository<SearchLogDocument, String> { // 엘라스틱 서치 저장/검색용 레포지토리 }
Java
복사
kafka
KafkaConsumerConfig
package org.example.backendproject.board.searchlog.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.example.backendproject.board.searchlog.dto.SearchLogMessage; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.support.serializer.JsonDeserializer; import java.util.HashMap; import java.util.Map; // Kafka에서 메시지를 받아오기 위한 설정을 담당하는 Spring @Configuration 클래스입니다. // 보통 Kafka를 사용할 때는 ProducerConfig(보내기), ConsumerConfig(받기) 두 가지를 별도 클래스로 관리합니다. @Configuration public class KafkaConsumerConfig { // Kafka에서 메시지를 받을 설정 클래스 // 1. Kafka에서 메시지를 꺼내올 ConsumerFactory 빈(Bean)을 만듭니다. // @Bean을 붙여서, 스프링이 이 메서드의 리턴값을 Bean(객체)로 등록하도록 만듭니다. // ConsumerFactory는 Kafka로부터 메시지를 꺼낼 때 필요한 '소비자' 객체를 만드는 팩토리입니다. @Bean public ConsumerFactory<String, SearchLogMessage> consumerFactory() { //Kafka로부터 받은 메시지를 SearchLogMessage 객체로 자동 변환(역직렬화) 해주는 역할 JsonDeserializer<SearchLogMessage> deserializer = new JsonDeserializer<>( SearchLogMessage.class); //메시지의 타입 정보를 헤더에서 제거할지 설정하는 부분 (false로 두면 타입 정보가 유지되어 타입 매핑이 안전) deserializer.setRemoveTypeHeaders(false); //역직렬화(메시지 → 객체)할 때 어떤 패키지의 클래스까지 허용할지 설정 (현재는 모든 패키지 허용) //역직렬화 대상이 되는 클래스의 “패키지”를 신뢰 목록에 추가 //만약 신뢰하지 않는(=내가 모르는) 패키지의 클래스로 역직렬화가 가능하게 하면, 악성 객체 주입 등 보안 위험이 생길 수 있음. deserializer.addTrustedPackages("*"); //Kafka 메시지의 Key에 대해서도 타입 매퍼를 사용하겠다는 옵션 deserializer.setUseTypeMapperForKey(true); //Kafka Consumer의 필수 옵션들을 담는 Map Map<String, Object> config = new HashMap<>(); //afka 서버의 주소(포트 포함)**를 지정 config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092"); //이 Consumer가 속한 Consumer Group 이름 //같은 Group 내 Consumer들이 서로 메시지를 분배해서 소비 config.put(ConsumerConfig.GROUP_ID_CONFIG, "search-log-group"); //**Key(메시지의 식별자)**를 어떻게 읽을지 지정 (여기선 String) config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); //제로 Kafka Consumer 객체를 만들어주는 Factory를 리턴 return new DefaultKafkaConsumerFactory<>( config, new StringDeserializer(), deserializer ); } //@KafkaListener에서 사용할 Listener 컨테이너 팩토리 Bean을 등록. //여러 개의 Kafka Consumer가 동시에 메시지를 병렬로 처리할 수 있게 해주는 설정. //위에서 만든 consumerFactory를 연결시킴. @Bean public ConcurrentKafkaListenerContainerFactory<String, SearchLogMessage> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, SearchLogMessage> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } /** 이 코드는 Kafka에서 메시지를 읽어오는 설정을 담당합니다. 메시지는 JSON → SearchLogMessage 객체로 자동 변환됩니다. ConsumerFactory는 “실제 메시지 꺼내는 소비자(Consumer)”를 만들어주고, ListenerContainerFactory는 “여러 소비자가 동시에 메시지를 처리할 수 있도록 도와주는 역할”을 합니다. **/ }
Java
복사
KafkaProducerConfig
package org.example.backendproject.board.searchlog.kafka; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.example.backendproject.board.searchlog.dto.SearchLogMessage; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.*; import org.springframework.kafka.support.serializer.JsonSerializer; import java.util.HashMap; import java.util.Map; // Kafka로 메시지(=이벤트)를 보낼 때 필요한 설정을 해주는 Spring @Configuration 클래 @Configuration public class KafkaProducerConfig { //Kafka로 메시지 보낼 설정 클래스 //Kafka로 메시지를 보낼 때 사용하는 프로듀서(Producer) 객체를 만드는 팩토리 //key 타입은 String, value 타입은 SearchLogMessage (우리가 보낼 데이터 타입) @Bean public ProducerFactory<String, SearchLogMessage> producerFactory() { //Kafka Producer 설정을 위한 Map 생성 Map<String, Object> config = new HashMap<>(); //Kafka 서버 주소를 등록 config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092"); //메시지의 key를 String으로 직렬화 config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //메시지의 value(SearchLogMessage)를 JSON 형태로 직렬화 config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); //위 설정(Map)을 사용해서 ProducerFactory 객체 생성 return new DefaultKafkaProducerFactory<>(config); } @Bean public KafkaTemplate<String, SearchLogMessage> kafkaTemplate() { //스프링에서 Kafka로 메시지 보낼 때 사용하는 핵심 객체 //에서 만든 ProducerFactory로 KafkaTemplate을 생성해서 Bean으로 등록 return new KafkaTemplate<>(producerFactory()); } }
Java
복사
SearchLogConsumer
package org.example.backendproject.board.searchlog.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.example.backendproject.board.searchlog.domain.SearchLogDocument; import org.example.backendproject.board.searchlog.dto.SearchLogMessage; import org.example.backendproject.board.searchlog.service.SearchLogEsService; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Slf4j @RequiredArgsConstructor @Service public class SearchLogConsumer { // 카프카에서 메세지를 꺼내서 엘라스틱서치로 넘기는 클래스 private final SearchLogEsService searchLogEsService; @KafkaListener( topics = "search-log", // 구독한 토픽 이름 groupId = "search-log-group", // 이 컨슈머가 어떤 컨슈머 그룹에 속하는지 containerFactory = "kafkaListenerContainerFactory" // 사용할 리스너 컨테이너 설정 Bean ) public void consume(SearchLogMessage message) { log.info("카프카에서 메세지 수신 : {}", message); // 카프카에서 받은 메세지를 엘라스틱 전용 객체로 변환 SearchLogDocument doc = SearchLogDocument.builder() .keyword(message.getKeyword()) .userId(message.getUserId()) .searchedAt(message.getSearchedAt()) .build(); // 엘라스틱서치에 저장 searchLogEsService.save(doc); } }
Java
복사
service > SearchLogEsService
package org.example.backendproject.board.searchlog.service; import lombok.RequiredArgsConstructor; import org.example.backendproject.board.searchlog.domain.SearchLogDocument; import org.example.backendproject.board.searchlog.repository.SearchLogRepository; import org.springframework.stereotype.Service; @RequiredArgsConstructor @Service public class SearchLogEsService { // 엘라스틱서치 저장, 통계 집계 비즈니스 로직 서비스 private final SearchLogRepository searchLogRepository; // 카프카에서 전달 받은 검색데이터 저장 메서드 public void save(SearchLogDocument searchLogDocument) { searchLogRepository.save(searchLogDocument); } }
Java
복사
검색어 정보 카프카로 전송
package org.example.backendproject.board.elasticsearch.controller; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.List; import lombok.RequiredArgsConstructor; import org.example.backendproject.board.elasticsearch.dto.BoardEsDocument; import org.example.backendproject.board.elasticsearch.service.BoardEsService; import org.example.backendproject.board.searchlog.dto.SearchLogMessage; import org.springframework.data.domain.Page; import org.springframework.http.ResponseEntity; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RequiredArgsConstructor @RestController @RequestMapping("/boards") public class BoardEsController { private final BoardEsService boardEsService; // 스프링에서 카프카로 메세지를 전송하기 위한 컴포넌트 private final KafkaTemplate<String, SearchLogMessage> kafkaTemplate; @GetMapping("/elasticsearch") // 엘라스틱서치 검색 결과를 page 형태로 감싼 다음 HTTP 응답을 json으로 반환 public ResponseEntity<Page<BoardEsDocument>> elasticSearch( @RequestParam String keyword, @RequestParam(defaultValue = "0") int page, @RequestParam(defaultValue = "10") int size) { // 검색어 정보 카프카 전송 String userId = "1"; String searchedAt = LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME); SearchLogMessage message = new SearchLogMessage(keyword, userId, searchedAt); kafkaTemplate.send("search-log", message); // search-log 토픽으로 메세지 전달 return ResponseEntity.ok(boardEsService.search(keyword, page, size)); } }
Java
복사

코드 작성 후 실행 중인 Kafka 확인

ElasticSearch - 조회수, 검색어 순위 키워드 조회, 정렬 작업

조회수 칼럼 추가

Board 엔티티에 조회수 칼럼 추가
@Column(name = "view_count", nullable = false) private Long viewCount = 0L; // 기본값 0으로 초기화
Java
복사

DTO 수정 - viewCount 추가

package org.example.backendproject.board.dto; import com.fasterxml.jackson.annotation.JsonProperty; import java.time.LocalDateTime; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; @Getter @Setter @NoArgsConstructor @AllArgsConstructor public class BoardDTO { private Long id; private String title; private String content; private String username; private Long user_id; private LocalDateTime created_date; private LocalDateTime updated_date; private String batchKey; @JsonProperty("view_count") private Long viewCount; public BoardDTO(Long id, String title, String content, String username, Long user_id, LocalDateTime created_date, LocalDateTime updated_date, Long viewCount) { this.id = id; this.title = title; this.content = content; this.username = username; this.user_id = user_id; this.created_date = created_date; this.updated_date = updated_date; this.viewCount = viewCount; } }
Java
복사

BoardEsDocument 쪽에 view_count 추가

package org.example.backendproject.board.elasticsearch.dto; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import jakarta.persistence.Id; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import org.example.backendproject.board.dto.BoardDTO; import org.springframework.data.elasticsearch.annotations.Document; @JsonIgnoreProperties(ignoreUnknown = true) @Document(indexName = "board-index") @Data @NoArgsConstructor @AllArgsConstructor @Builder public class BoardEsDocument { // 엘라스틱서치에 적용될 문서를 자바 객체로 정의한 클래스 // 엘라스틱 전용 DTO @Id private String id; private String title; private String content; private String username; private Long userId; private String created_date; private String updated_date; private Long view_count = 0L; // BoardDTO를 ElasticSearch 전용 DTO로 변환하는 정적 메서드 public static BoardEsDocument from(BoardDTO dto) { // BoardDTO를 받아서 ElasticSearch DTO로 변환한다. return BoardEsDocument.builder() .id(String.valueOf(dto.getId())) .title(dto.getTitle()) .content(dto.getContent()) .username(dto.getUsername()) .userId(dto.getUser_id()) .created_date(dto.getCreated_date() != null ? dto.getCreated_date().toString() : null) .updated_date(dto.getUpdated_date() != null ? dto.getUpdated_date().toString() : null) .view_count(dto.getViewCount()) .build(); } }
Java
복사

BoardRepository 수정 - 조회수 추가

package org.example.backendproject.board.repository; import java.util.List; import org.example.backendproject.board.dto.BoardDTO; import org.example.backendproject.board.entity.Board; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; public interface BoardRepository extends JpaRepository<Board, Long> { // 특정 User의 게시글만 조회 //@Query(value = "SELECT * FROM board WHERE user_id = :userId", nativeQuery = true) //이거 사용안해도 되는데 예시로 네이티브쿼라로 작성한거임 List<Board> findByUserId(Long userId); //보드 엔티티를 기준으로 조회하되 //Board 엔티티 전체를 반환하는게 아니라 원하는 값만 보드dto 생성자에 넣어서 리스트로 반환합니다 //대소문자 구분없이 검색하는 옵션 //title에 해당 키워드가 포함되어 있거나 content에 키워드가 포함되어잇는거 출력 /** 페이징 적용 전 **/ /** 검색기능 **/ // 제목 또는 내용에 키워드가 포함된 글 검색 (대소문자 구분 없음) @Query("SELECT new org.example.backendproject.board.dto.BoardDTO(" + "b.id, b.title, b.content,b.user.userProfile.username, b.user.id, b.created_date, b.updated_date, b.viewCount" + ") " + "FROM Board b " + "WHERE LOWER(b.title) LIKE LOWER(CONCAT('%', :keyword, '%')) " + "OR LOWER(b.content) LIKE LOWER(CONCAT('%', :keyword, '%'))") List<BoardDTO> searchKeyword(@Param("keyword") String keyword); /** 페이징 적용 후 **/ //페이징 전체 목록 @Query("SELECT new org.example.backendproject.board.dto.BoardDTO(" + "b.id, b.title, b.content,b.user.userProfile.username, b.user.id,b.created_date, b.updated_date, b.viewCount) " + "FROM Board b " + "ORDER BY b.created_date DESC") //쿼리로 정렬 Page<BoardDTO> findAllPaging(Pageable pageable); //페이징 처리 결과를 담는 페이징 객체입니다. //전체 페이지수, 현재 페이지 번호,전체 아이템 겟수 등 페이징 관련 모든 정보들을 반환합니다. //Pageable은 jpa에서 제공하는 페이징 정보를 담은 객체입니다. //page번호, 한페이지당 데이터 갯수 ,정렬 기준 등 파라미터를 받아 원하는 조건으로 페이징 및 정렬 쿼리를 생성할 수 있습니다. //페이징 검색 목록 @Query("SELECT new org.example.backendproject.board.dto.BoardDTO(" + "b.id, b.title, b.content,b.user.userProfile.username, b.user.id, b.created_date, b.updated_date, b.viewCount) " + "FROM Board b " + "WHERE LOWER(b.title) LIKE LOWER(CONCAT('%', :keyword, '%')) " + "OR LOWER(b.content) LIKE LOWER(CONCAT('%', :keyword, '%'))" + "ORDER BY b.created_date DESC")// 쿼리로 정렬 Page<BoardDTO> searchKeywordPaging(@Param("keyword") String keyword, Pageable pageable); }
Java
복사

BatchInsert 쪽의 조회수(view_count) 추가해주기

BatchRepository
package org.example.backendproject.board.repository; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.List; import lombok.RequiredArgsConstructor; import org.example.backendproject.board.dto.BoardDTO; import org.springframework.jdbc.core.BatchPreparedStatementSetter; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; @Repository @RequiredArgsConstructor public class BatchRepository { private final JdbcTemplate jdbcTemplate; public void batchInsert(List<BoardDTO> boardDTO) { String sql = "INSERT INTO board (title, content, user_id, created_date, updated_date, batch_key, view_count) " + "VALUES (?, ?, ?, ?, ?, ?, ?) "; jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() { @Override public void setValues(PreparedStatement ps, int i) throws SQLException { BoardDTO dto = boardDTO.get(i); ps.setString(1, dto.getTitle()); ps.setString(2, dto.getContent()); ps.setLong(3, dto.getUser_id()); ps.setString(4, String.valueOf(dto.getCreated_date())); ps.setString(5, String.valueOf(dto.getUpdated_date())); ps.setString(6, dto.getBatchKey()); ps.setLong(7, dto.getViewCount() != null ? dto.getViewCount() : 0L); } @Override public int getBatchSize() { return boardDTO.size(); } }); } public List<BoardDTO> findByBatchKey(String batchKey) { String sql = "SELECT b.id, b.title, b.content, b.user_id, b.created_date, b.updated_date, b.batch_key, b.view_count, up.username " + "FROM board b " + "JOIN user u ON b.user_id = u.id " + "JOIN user_profile up ON up.user_id = u.id " + "WHERE b.batch_key = ?"; return jdbcTemplate.query(sql, new Object[]{batchKey}, (rs, rowNum) -> { BoardDTO dto = new BoardDTO(); dto.setId(rs.getLong("id")); dto.setTitle(rs.getString("title")); dto.setContent(rs.getString("content")); dto.setUser_id(rs.getLong("user_id")); dto.setCreated_date(rs.getTimestamp("created_date") != null ? rs.getTimestamp("created_date").toLocalDateTime() : null); dto.setUpdated_date(rs.getTimestamp("updated_date") != null ? rs.getTimestamp("updated_date").toLocalDateTime() : null); dto.setBatchKey(rs.getString("batch_key")); dto.setUsername(rs.getString("username")); dto.setViewCount(rs.getLong("view_count")); return dto; }); } }
Java
복사
BoardService
... 중략 ... /** 배치 작업 (JdbcTemplate) **/ @Transactional public void batchSaveBoard(List<BoardDTO> boardDTOList) { Long start = System.currentTimeMillis(); int batchsize = 10; // 한번에 처리할 배치 크기 for (int i = 0; i < boardDTOList.size(); i += batchsize) { // i는 1000씩 증가 // 전체 데이터를 1000개씩 잘라서 배치리스트에 담는다. long batchStart = System.currentTimeMillis(); int end = Math.min(boardDTOList.size(), i + batchsize); // 두 개의 숫자 중 작은 수를 반환 List<BoardDTO> batchList = boardDTOList.subList(i, end); // 전체 데이터에서 1000씩 작업을 하는데 마지막 데이터가 1000개가 안 될수도 있으니 // Math.min()으로 전체 크기를 넘지 않게 마지막 인덱스를 계산해서 작업한다. // 내가 넣은 데이터만 엘라스틱서치에 동기화하기 위해 uuid 생성 String batchKey = UUID.randomUUID().toString(); for (BoardDTO dto : batchList) { dto.setBatchKey(batchKey); } // 1. MySQL로 INSERT batchRepository.batchInsert(batchList); // 2. MySQL에 Insert한 데이터를 다시 조회 List<BoardDTO> saveBoards = batchRepository.findByBatchKey(batchKey); // 3. 엘라스틱서치용으로 변환 List<BoardEsDocument> documents = saveBoards.stream() .map(BoardEsDocument::from) // DTO -> 엘라스틱서치용 dto로 변환 .toList(); // 4. ElasticSearch 벌크 인덱싱 (재시도 포함) boolean success = false; int retryCount = 3; while (retryCount-- > 0 && !success) { try { // 4. 엘라스틱서치 bulk 인덱싱 boardEsService.bulkIndexInsert(documents); success = true; } catch (Exception e) { log.warn("[BOARD][BATCH] Elasticsearch 인덱싱 실패 (남은 재시도 {}회): {}", retryCount, e.getMessage()); if (retryCount == 0) { log.error("[BOARD][BATCH] Elasticsearch 최종 실패. 수동 복구 필요: {}", e.getMessage(), e); } try { Thread.sleep(1000); // 1초 대기 후 재시도 } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } } long batchEnd = System.currentTimeMillis(); log.info("[BOARD][BATCH] {}~{} ({}건) 처리 완료 - {} ms", i, end - 1, batchList.size(), batchEnd - batchStart); } Long end = System.currentTimeMillis(); log.info("[BOARD][BATCH] 전체 저장 소요 시간(ms): {}", (end - start)); } ... 중략 ...
Java
복사

board-index 내용 변경

mappings 부분에서 view_count 추가
추가 후 ElasticSearch의 Dev Tool에서 인덱스 지우고 다시 생성해준 다음 데이터를 추가해줘야 확인 가능
실무에서는 인덱스를 복제하는 방식으로 진행
"updated_date": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss.SSSSSS||yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis" }, "view_count": { "type": "long" }
Java
복사

BoardEsService - 검색어 순위 키워드 조회 + 정렬

... 중략 ... // 검색어 순위 키워드 조회 기능 public List<String> getTopSearchKeyword() { // TermsAggreagation 엘라스틱서치의 집계 메서드 TermsAggregation termsAggregation = TermsAggregation.of(t -> t .field("keyword.keyword") // 집계 기준 필드 .size(10)); // 상위 10개만 불러오기 // 집계 요청 SearchRequest request = SearchRequest.of(s -> s .index("search-log-index") // 집계를 가져올 인덱스 이름 .size(0) // 집계만 가져오고 검색 결과는 가져오지 않음 .aggregations("top_keywords", a -> a.terms(termsAggregation)) // 인기 검색어 집계 ); try { // 집계 응답 SearchResponse<Void> response = client.search(request, void.class); return response.aggregations() // 응답 결과에서 집계 결과만 꺼냄 .get("top_keywords") // 위에서 내가 집계요청한 이름 .sterms() // String terms로 변환 .buckets() // 집계 결과 버킷 리스트 .array() // 버킷 리스트를 배열로 변환 .stream() // 배열을 스트림으로 변환 .map(bucket -> bucket.key().stringValue()) // 버킷의 key값을 문자열로 꺼냄 .map(Object::toString) // String으로 변환 .collect(Collectors.toList()); // 스트림 결과를 리스트로 모아서 반환 } catch (IOException e) { throw new RuntimeException("검색어 통계중 오류 발생", e); } }
Java
복사

BoardEsController - BoardEsService의 검색어 순위 키워드 조회 서비스 호출

@GetMapping("/top-keywords") public ResponseEntity<List<String>> getTopKeyWord() { List<String> keywords = boardEsService.getTopSearchKeyword(); return ResponseEntity.ok(keywords); }
Java
복사

작업 이후 로그인하여 조회 결과 및 WAS 쪽 로그 확인

Kibana에서 검색어 통계 확인

Discover 에서 로그 확인

Postman 테스트 - 검색어 순위 키워드 조회

오늘 푸시한 커밋리스트

날짜
커밋 메시지
2025-07-01
2025-07-01
2025-07-01
2025-07-01
2025-07-01
2025-07-01
2025-07-01
2025-07-01
2025-07-01
2025-07-01
2025-07-01
2025-07-01
2025-07-01
2025-07-01
2025-07-01
2025-07-01
2025-07-01
2025-07-01
2025-07-01
2025-07-01