diff --git a/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingFacade.java b/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingFacade.java index 1aa8ed52fa..1b47b16fac 100644 --- a/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingFacade.java +++ b/apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingFacade.java @@ -2,6 +2,10 @@ import com.loopers.domain.product.Product; import com.loopers.domain.product.ProductService; +import com.loopers.domain.ranking.MvProductRankMonthly; +import com.loopers.domain.ranking.MvProductRankMonthlyRepository; +import com.loopers.domain.ranking.MvProductRankWeekly; +import com.loopers.domain.ranking.MvProductRankWeeklyRepository; import com.loopers.infrastructure.ranking.RankingRedisRepository; import lombok.RequiredArgsConstructor; import org.springframework.data.redis.core.ZSetOperations; @@ -23,16 +27,29 @@ public class RankingFacade { private final RankingRedisRepository rankingRedisRepository; private final ProductService productService; + private final MvProductRankWeeklyRepository mvWeeklyRepository; + private final MvProductRankMonthlyRepository mvMonthlyRepository; /** - * 날짜 기반 랭킹 목록을 페이징으로 조회한다. - * - ZSET에서 상위 N개 productId + score 조회 - * - productService.findAllByIds()로 일괄 IN 조회 (N+1 방지) - * - ZSET에 있지만 삭제된 상품은 결과에서 제외 + * period(daily|weekly|monthly) 기반으로 랭킹 목록을 조회한다. + * - daily (기본값): Redis ZSET 조회 (date 파라미터 기반) + * - weekly: mv_product_rank_weekly 조회 + * - monthly: mv_product_rank_monthly 조회 * - * @param date yyyyMMdd 형식. null 또는 빈 값이면 오늘 날짜 사용. + * @param period "daily" | "weekly" | "monthly". null 또는 기타 값이면 "daily" 로 처리. + * @param date yyyyMMdd 형식. period=daily 일 때만 사용. null 이면 오늘. */ - public List findRankings(String date, int page, int size) { + public List findRankings(String period, String date, int page, int size) { + if ("weekly".equalsIgnoreCase(period)) { + return findWeeklyRankings(page, size); + } + if ("monthly".equalsIgnoreCase(period)) { + return findMonthlyRankings(page, size); + } + return findDailyRankings(date, page, size); + } + + private List findDailyRankings(String date, int page, int size) { LocalDate targetDate = (date == null || date.isBlank()) ? LocalDate.now() : LocalDate.parse(date, DATE_FORMATTER); @@ -63,4 +80,48 @@ public List findRankings(String date, int page, int size) { } return result; } + + private List findWeeklyRankings(int page, int size) { + List rows = mvWeeklyRepository.findTop(page, size); + if (rows.isEmpty()) { + return List.of(); + } + List productIds = rows.stream().map(MvProductRankWeekly::getProductId).toList(); + Map productMap = productService.findAllByIds(productIds).stream() + .collect(Collectors.toMap(Product::getId, Function.identity())); + + List result = new ArrayList<>(); + int baseRank = page * size + 1; + for (int i = 0; i < rows.size(); i++) { + MvProductRankWeekly row = rows.get(i); + Product product = productMap.get(row.getProductId()); + if (product == null) { + continue; + } + result.add(RankingInfo.of(baseRank + i, product, row.getScore())); + } + return result; + } + + private List findMonthlyRankings(int page, int size) { + List rows = mvMonthlyRepository.findTop(page, size); + if (rows.isEmpty()) { + return List.of(); + } + List productIds = rows.stream().map(MvProductRankMonthly::getProductId).toList(); + Map productMap = productService.findAllByIds(productIds).stream() + .collect(Collectors.toMap(Product::getId, Function.identity())); + + List result = new ArrayList<>(); + int baseRank = page * size + 1; + for (int i = 0; i < rows.size(); i++) { + MvProductRankMonthly row = rows.get(i); + Product product = productMap.get(row.getProductId()); + if (product == null) { + continue; + } + result.add(RankingInfo.of(baseRank + i, product, row.getScore())); + } + return result; + } } diff --git a/apps/commerce-api/src/main/java/com/loopers/domain/product/ProductMetrics.java b/apps/commerce-api/src/main/java/com/loopers/domain/product/ProductMetrics.java index 0a8b4cfb88..88e9747362 100644 --- a/apps/commerce-api/src/main/java/com/loopers/domain/product/ProductMetrics.java +++ b/apps/commerce-api/src/main/java/com/loopers/domain/product/ProductMetrics.java @@ -6,18 +6,24 @@ import jakarta.persistence.GenerationType; import jakarta.persistence.Id; import jakarta.persistence.Table; +import jakarta.persistence.UniqueConstraint; import jakarta.persistence.Version; import lombok.Getter; +import java.time.LocalDate; + /** - * 상품 집계 지표 (product_metrics). - * like_count는 Kafka 이벤트 소비 후 비동기로 갱신된다 (Eventual Consistency). - * commerce-api: 읽기 전용 조회 (likeCount 표시용) - * commerce-streamer: 쓰기 (Kafka Consumer가 upsert) + * 일별 상품 집계 지표 (product_metrics). + * like_count / order_count 는 해당 날짜(date)의 순증감을 기록한다. + * commerce-api: 읽기 전용 조회 (오늘 날짜 기준 likeCount 표시용) + * commerce-streamer: 쓰기 (Kafka Consumer가 (product_id, date) 기준 upsert) */ @Getter @Entity -@Table(name = "product_metrics") +@Table( + name = "product_metrics", + uniqueConstraints = @UniqueConstraint(columnNames = {"product_id", "date"}) +) public class ProductMetrics { @Id @@ -27,9 +33,12 @@ public class ProductMetrics { @Version private Long version; - @Column(name = "product_id", nullable = false, unique = true) + @Column(name = "product_id", nullable = false) private Long productId; + @Column(name = "date", nullable = false) + private LocalDate date; + @Column(name = "like_count", nullable = false) private int likeCount; @@ -39,8 +48,9 @@ public class ProductMetrics { protected ProductMetrics() { } - public ProductMetrics(Long productId, int likeCount) { + public ProductMetrics(Long productId, LocalDate date, int likeCount) { this.productId = productId; + this.date = date; this.likeCount = likeCount; this.orderCount = 0; } diff --git a/apps/commerce-api/src/main/java/com/loopers/domain/ranking/MvProductRankMonthly.java b/apps/commerce-api/src/main/java/com/loopers/domain/ranking/MvProductRankMonthly.java new file mode 100644 index 0000000000..f87d845ff5 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/domain/ranking/MvProductRankMonthly.java @@ -0,0 +1,41 @@ +package com.loopers.domain.ranking; + +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; +import jakarta.persistence.Table; +import lombok.Getter; + +import java.time.LocalDateTime; + +/** + * 월간 랭킹 Materialized View (mv_product_rank_monthly). + * Spring Batch MonthlyRankingJob 이 집계한 결과를 조회 전용으로 사용한다. + */ +@Getter +@Entity +@Table(name = "mv_product_rank_monthly") +public class MvProductRankMonthly { + + @Id + @Column(name = "product_id") + private Long productId; + + @Column(name = "like_count", nullable = false) + private int likeCount; + + @Column(name = "order_count", nullable = false) + private int orderCount; + + @Column(name = "score", nullable = false) + private double score; + + @Column(name = "ranking_period", nullable = false) + private String rankingPeriod; + + @Column(name = "updated_at", nullable = false) + private LocalDateTime updatedAt; + + protected MvProductRankMonthly() { + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/domain/ranking/MvProductRankMonthlyRepository.java b/apps/commerce-api/src/main/java/com/loopers/domain/ranking/MvProductRankMonthlyRepository.java new file mode 100644 index 0000000000..ab109f0ba6 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/domain/ranking/MvProductRankMonthlyRepository.java @@ -0,0 +1,7 @@ +package com.loopers.domain.ranking; + +import java.util.List; + +public interface MvProductRankMonthlyRepository { + List findTop(int page, int size); +} diff --git a/apps/commerce-api/src/main/java/com/loopers/domain/ranking/MvProductRankWeekly.java b/apps/commerce-api/src/main/java/com/loopers/domain/ranking/MvProductRankWeekly.java new file mode 100644 index 0000000000..d11244c7dd --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/domain/ranking/MvProductRankWeekly.java @@ -0,0 +1,41 @@ +package com.loopers.domain.ranking; + +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; +import jakarta.persistence.Table; +import lombok.Getter; + +import java.time.LocalDateTime; + +/** + * 주간 랭킹 Materialized View (mv_product_rank_weekly). + * Spring Batch WeeklyRankingJob 이 집계한 결과를 조회 전용으로 사용한다. + */ +@Getter +@Entity +@Table(name = "mv_product_rank_weekly") +public class MvProductRankWeekly { + + @Id + @Column(name = "product_id") + private Long productId; + + @Column(name = "like_count", nullable = false) + private int likeCount; + + @Column(name = "order_count", nullable = false) + private int orderCount; + + @Column(name = "score", nullable = false) + private double score; + + @Column(name = "year_month_week", nullable = false) + private String yearMonthWeek; + + @Column(name = "updated_at", nullable = false) + private LocalDateTime updatedAt; + + protected MvProductRankWeekly() { + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/domain/ranking/MvProductRankWeeklyRepository.java b/apps/commerce-api/src/main/java/com/loopers/domain/ranking/MvProductRankWeeklyRepository.java new file mode 100644 index 0000000000..7febb10c8a --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/domain/ranking/MvProductRankWeeklyRepository.java @@ -0,0 +1,7 @@ +package com.loopers.domain.ranking; + +import java.util.List; + +public interface MvProductRankWeeklyRepository { + List findTop(int page, int size); +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/product/ProductMetricsJpaRepository.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/product/ProductMetricsJpaRepository.java index fb4f1cc62f..965f4d9ee4 100644 --- a/apps/commerce-api/src/main/java/com/loopers/infrastructure/product/ProductMetricsJpaRepository.java +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/product/ProductMetricsJpaRepository.java @@ -3,8 +3,9 @@ import com.loopers.domain.product.ProductMetrics; import org.springframework.data.jpa.repository.JpaRepository; +import java.time.LocalDate; import java.util.Optional; public interface ProductMetricsJpaRepository extends JpaRepository { - Optional findByProductId(Long productId); + Optional findByProductIdAndDate(Long productId, LocalDate date); } diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/product/ProductMetricsRepositoryImpl.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/product/ProductMetricsRepositoryImpl.java index 55863c96f4..fada8bd342 100644 --- a/apps/commerce-api/src/main/java/com/loopers/infrastructure/product/ProductMetricsRepositoryImpl.java +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/product/ProductMetricsRepositoryImpl.java @@ -5,6 +5,7 @@ import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Component; +import java.time.LocalDate; import java.util.Optional; @RequiredArgsConstructor @@ -15,6 +16,6 @@ public class ProductMetricsRepositoryImpl implements ProductMetricsRepository { @Override public Optional findByProductId(Long productId) { - return productMetricsJpaRepository.findByProductId(productId); + return productMetricsJpaRepository.findByProductIdAndDate(productId, LocalDate.now()); } } diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/MvProductRankMonthlyJpaRepository.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/MvProductRankMonthlyJpaRepository.java new file mode 100644 index 0000000000..51a5d44a78 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/MvProductRankMonthlyJpaRepository.java @@ -0,0 +1,10 @@ +package com.loopers.infrastructure.ranking; + +import com.loopers.domain.ranking.MvProductRankMonthly; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; +import org.springframework.data.jpa.repository.JpaRepository; + +public interface MvProductRankMonthlyJpaRepository extends JpaRepository { + Page findAllByOrderByScoreDesc(Pageable pageable); +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/MvProductRankMonthlyRepositoryImpl.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/MvProductRankMonthlyRepositoryImpl.java new file mode 100644 index 0000000000..dc8060226e --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/MvProductRankMonthlyRepositoryImpl.java @@ -0,0 +1,21 @@ +package com.loopers.infrastructure.ranking; + +import com.loopers.domain.ranking.MvProductRankMonthly; +import com.loopers.domain.ranking.MvProductRankMonthlyRepository; +import lombok.RequiredArgsConstructor; +import org.springframework.data.domain.PageRequest; +import org.springframework.stereotype.Component; + +import java.util.List; + +@RequiredArgsConstructor +@Component +public class MvProductRankMonthlyRepositoryImpl implements MvProductRankMonthlyRepository { + + private final MvProductRankMonthlyJpaRepository jpaRepository; + + @Override + public List findTop(int page, int size) { + return jpaRepository.findAllByOrderByScoreDesc(PageRequest.of(page, size)).getContent(); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/MvProductRankWeeklyJpaRepository.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/MvProductRankWeeklyJpaRepository.java new file mode 100644 index 0000000000..9c2c2ef3df --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/MvProductRankWeeklyJpaRepository.java @@ -0,0 +1,10 @@ +package com.loopers.infrastructure.ranking; + +import com.loopers.domain.ranking.MvProductRankWeekly; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; +import org.springframework.data.jpa.repository.JpaRepository; + +public interface MvProductRankWeeklyJpaRepository extends JpaRepository { + Page findAllByOrderByScoreDesc(Pageable pageable); +} diff --git a/apps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/MvProductRankWeeklyRepositoryImpl.java b/apps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/MvProductRankWeeklyRepositoryImpl.java new file mode 100644 index 0000000000..a469d89602 --- /dev/null +++ b/apps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/MvProductRankWeeklyRepositoryImpl.java @@ -0,0 +1,21 @@ +package com.loopers.infrastructure.ranking; + +import com.loopers.domain.ranking.MvProductRankWeekly; +import com.loopers.domain.ranking.MvProductRankWeeklyRepository; +import lombok.RequiredArgsConstructor; +import org.springframework.data.domain.PageRequest; +import org.springframework.stereotype.Component; + +import java.util.List; + +@RequiredArgsConstructor +@Component +public class MvProductRankWeeklyRepositoryImpl implements MvProductRankWeeklyRepository { + + private final MvProductRankWeeklyJpaRepository jpaRepository; + + @Override + public List findTop(int page, int size) { + return jpaRepository.findAllByOrderByScoreDesc(PageRequest.of(page, size)).getContent(); + } +} diff --git a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Controller.java b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Controller.java index 438dec86fc..a5f9eb0d60 100644 --- a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Controller.java +++ b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Controller.java @@ -23,26 +23,29 @@ public class RankingV1Controller { private final RankingFacade rankingFacade; /** - * 날짜 기반 상품 랭킹 목록을 페이징으로 조회한다. - * - date 미입력 시 오늘 날짜 기준 + * 기간(period) 기반 상품 랭킹 목록을 페이징으로 조회한다. + * - period: "daily"(기본) | "weekly" | "monthly" + * - date: yyyyMMdd 형식. period=daily 일 때 사용. 미입력 시 오늘 날짜. * - page는 0-based * - 인증 불필요 (공개 API) */ @GetMapping public ApiResponse getRankings( + @RequestParam(required = false) String period, @RequestParam(required = false) String date, @RequestParam(defaultValue = "0") int page, @RequestParam(defaultValue = "20") int size ) { - String targetDate = (date == null || date.isBlank()) - ? LocalDate.now().format(DATE_FORMATTER) - : date; + String resolvedPeriod = (period == null || period.isBlank()) ? "daily" : period; + String targetDate = "daily".equalsIgnoreCase(resolvedPeriod) + ? (date == null || date.isBlank() ? LocalDate.now().format(DATE_FORMATTER) : date) + : null; - List rankings = rankingFacade.findRankings(targetDate, page, size); + List rankings = rankingFacade.findRankings(resolvedPeriod, targetDate, page, size); List content = rankings.stream() .map(RankingV1Dto.RankingResponse::from) .toList(); - return ApiResponse.success(new RankingV1Dto.RankingPageResponse(content, targetDate, page, size)); + return ApiResponse.success(new RankingV1Dto.RankingPageResponse(content, resolvedPeriod, targetDate, page, size)); } } diff --git a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Dto.java b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Dto.java index 4ca31b1184..8a8ae97353 100644 --- a/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Dto.java +++ b/apps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Dto.java @@ -30,6 +30,7 @@ public static RankingResponse from(RankingInfo info) { public record RankingPageResponse( List content, + String period, String date, int page, int size diff --git a/apps/commerce-api/src/test/java/com/loopers/interfaces/api/RankingPeriodV1ApiE2ETest.java b/apps/commerce-api/src/test/java/com/loopers/interfaces/api/RankingPeriodV1ApiE2ETest.java new file mode 100644 index 0000000000..34ffa0357a --- /dev/null +++ b/apps/commerce-api/src/test/java/com/loopers/interfaces/api/RankingPeriodV1ApiE2ETest.java @@ -0,0 +1,218 @@ +package com.loopers.interfaces.api; + +import com.loopers.domain.brand.Brand; +import com.loopers.domain.product.Product; +import com.loopers.domain.product.SellingStatus; +import com.loopers.infrastructure.brand.BrandJpaRepository; +import com.loopers.infrastructure.product.ProductJpaRepository; +import com.loopers.infrastructure.ranking.MvProductRankMonthlyJpaRepository; +import com.loopers.infrastructure.ranking.MvProductRankWeeklyJpaRepository; +import com.loopers.interfaces.api.ranking.RankingV1Dto; +import com.loopers.utils.DatabaseCleanUp; +import com.loopers.utils.RedisCleanUp; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.jdbc.core.JdbcTemplate; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertAll; + +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +class RankingPeriodV1ApiE2ETest { + + @Autowired + private TestRestTemplate testRestTemplate; + + @Autowired + private BrandJpaRepository brandJpaRepository; + + @Autowired + private ProductJpaRepository productJpaRepository; + + @Autowired + private MvProductRankWeeklyJpaRepository weeklyJpaRepository; + + @Autowired + private MvProductRankMonthlyJpaRepository monthlyJpaRepository; + + @Autowired + private JdbcTemplate jdbcTemplate; + + @Autowired + private DatabaseCleanUp databaseCleanUp; + + @Autowired + private RedisCleanUp redisCleanUp; + + @AfterEach + void tearDown() { + databaseCleanUp.truncateAllTables(); + redisCleanUp.truncateAll(); + } + + private Brand createBrand(String name) { + return brandJpaRepository.save(new Brand(name, null)); + } + + private Product createProduct(Long brandId, String name, int price) { + return productJpaRepository.save(new Product(brandId, name, null, price, 10, SellingStatus.SELLING)); + } + + @DisplayName("GET /api/v1/rankings?period=weekly") + @Nested + class GetWeeklyRankings { + + @DisplayName("mv_product_rank_weekly 데이터가 없으면 빈 목록을 반환한다.") + @Test + void returnsEmptyList_whenNoWeeklyMvData() { + // arrange (empty) + + // act + ResponseEntity> response = testRestTemplate.exchange( + "/api/v1/rankings?period=weekly", + HttpMethod.GET, null, + new ParameterizedTypeReference>() {} + ); + + // assert + assertAll( + () -> assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK), + () -> assertThat(response.getBody().data().content()).isEmpty(), + () -> assertThat(response.getBody().data().period()).isEqualTo("weekly"), + () -> assertThat(response.getBody().data().date()).isNull() + ); + } + + @DisplayName("mv_product_rank_weekly 데이터를 score 내림차순으로 반환한다.") + @Test + void returnsWeeklyRanking_orderedByScore() { + // arrange + Brand brand = createBrand("Nike"); + Product product1 = createProduct(brand.getId(), "상품A", 10000); + Product product2 = createProduct(brand.getId(), "상품B", 20000); + + // product2가 더 높은 score + jdbcTemplate.update( + "INSERT INTO mv_product_rank_weekly (product_id, like_count, order_count, score, year_month_week, updated_at) " + + "VALUES (?, ?, ?, ?, ?, NOW())", + product1.getId(), 2, 1, 1.1, "2025-W15" + ); + jdbcTemplate.update( + "INSERT INTO mv_product_rank_weekly (product_id, like_count, order_count, score, year_month_week, updated_at) " + + "VALUES (?, ?, ?, ?, ?, NOW())", + product2.getId(), 5, 3, 3.1, "2025-W15" + ); + + // act + ResponseEntity> response = testRestTemplate.exchange( + "/api/v1/rankings?period=weekly", + HttpMethod.GET, null, + new ParameterizedTypeReference>() {} + ); + + // assert + assertAll( + () -> assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK), + () -> assertThat(response.getBody().data().content()).hasSize(2), + () -> assertThat(response.getBody().data().content().get(0).rank()).isEqualTo(1), + () -> assertThat(response.getBody().data().content().get(0).productId()).isEqualTo(product2.getId()), + () -> assertThat(response.getBody().data().content().get(1).rank()).isEqualTo(2), + () -> assertThat(response.getBody().data().content().get(1).productId()).isEqualTo(product1.getId()), + () -> assertThat(response.getBody().data().period()).isEqualTo("weekly") + ); + } + } + + @DisplayName("GET /api/v1/rankings?period=monthly") + @Nested + class GetMonthlyRankings { + + @DisplayName("mv_product_rank_monthly 데이터가 없으면 빈 목록을 반환한다.") + @Test + void returnsEmptyList_whenNoMonthlyMvData() { + // arrange (empty) + + // act + ResponseEntity> response = testRestTemplate.exchange( + "/api/v1/rankings?period=monthly", + HttpMethod.GET, null, + new ParameterizedTypeReference>() {} + ); + + // assert + assertAll( + () -> assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK), + () -> assertThat(response.getBody().data().content()).isEmpty(), + () -> assertThat(response.getBody().data().period()).isEqualTo("monthly"), + () -> assertThat(response.getBody().data().date()).isNull() + ); + } + + @DisplayName("mv_product_rank_monthly 데이터를 score 내림차순으로 반환한다.") + @Test + void returnsMonthlyRanking_orderedByScore() { + // arrange + Brand brand = createBrand("Adidas"); + Product product1 = createProduct(brand.getId(), "상품C", 15000); + Product product2 = createProduct(brand.getId(), "상품D", 25000); + + jdbcTemplate.update( + "INSERT INTO mv_product_rank_monthly (product_id, like_count, order_count, score, ranking_period, updated_at) " + + "VALUES (?, ?, ?, ?, ?, NOW())", + product1.getId(), 3, 2, 2.0, "2025-04" + ); + jdbcTemplate.update( + "INSERT INTO mv_product_rank_monthly (product_id, like_count, order_count, score, ranking_period, updated_at) " + + "VALUES (?, ?, ?, ?, ?, NOW())", + product2.getId(), 10, 8, 7.6, "2025-04" + ); + + // act + ResponseEntity> response = testRestTemplate.exchange( + "/api/v1/rankings?period=monthly", + HttpMethod.GET, null, + new ParameterizedTypeReference>() {} + ); + + // assert + assertAll( + () -> assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK), + () -> assertThat(response.getBody().data().content()).hasSize(2), + () -> assertThat(response.getBody().data().content().get(0).productId()).isEqualTo(product2.getId()), + () -> assertThat(response.getBody().data().content().get(1).productId()).isEqualTo(product1.getId()), + () -> assertThat(response.getBody().data().period()).isEqualTo("monthly") + ); + } + } + + @DisplayName("GET /api/v1/rankings (period 미입력)") + @Nested + class GetDefaultPeriodRankings { + + @DisplayName("period 파라미터 없이 호출하면 daily로 처리한다.") + @Test + void defaultsToDailyRanking_whenPeriodParamOmitted() { + // act + ResponseEntity> response = testRestTemplate.exchange( + "/api/v1/rankings", + HttpMethod.GET, null, + new ParameterizedTypeReference>() {} + ); + + // assert + assertAll( + () -> assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK), + () -> assertThat(response.getBody().data().period()).isEqualTo("daily") + ); + } + } +} diff --git a/apps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/MonthlyRankingJobConfig.java b/apps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/MonthlyRankingJobConfig.java new file mode 100644 index 0000000000..cf7357f6f5 --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/MonthlyRankingJobConfig.java @@ -0,0 +1,149 @@ +package com.loopers.batch.job.ranking; + +import com.loopers.batch.listener.ChunkListener; +import com.loopers.batch.listener.JobListener; +import com.loopers.batch.listener.StepMonitorListener; +import lombok.RequiredArgsConstructor; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.JobScope; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.database.JdbcBatchItemWriter; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder; +import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; + +import javax.sql.DataSource; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.time.temporal.TemporalAdjusters; + +@ConditionalOnProperty(name = "spring.batch.job.name", havingValue = MonthlyRankingJobConfig.JOB_NAME) +@RequiredArgsConstructor +@Configuration +public class MonthlyRankingJobConfig { + + public static final String JOB_NAME = "monthlyRankingJob"; + private static final String STEP_TRUNCATE = "monthlyTruncateStep"; + private static final String STEP_AGGREGATE = "monthlyAggregateStep"; + private static final int CHUNK_SIZE = 1000; + private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd"); + + private final JobRepository jobRepository; + private final PlatformTransactionManager transactionManager; + private final DataSource dataSource; + private final JobListener jobListener; + private final StepMonitorListener stepMonitorListener; + private final ChunkListener chunkListener; + + @Bean(JOB_NAME) + public Job monthlyRankingJob() { + return new JobBuilder(JOB_NAME, jobRepository) + .start(monthlyTruncateStep(null)) + .next(monthlyAggregateStep(null)) + .listener(jobListener) + .build(); + } + + @JobScope + @Bean(STEP_TRUNCATE) + public Step monthlyTruncateStep(@Value("#{jobParameters['targetDate']}") String targetDate) { + Tasklet tasklet = (contribution, chunkContext) -> { + new org.springframework.jdbc.core.JdbcTemplate(dataSource) + .update("DELETE FROM mv_product_rank_monthly"); + return RepeatStatus.FINISHED; + }; + return new StepBuilder(STEP_TRUNCATE, jobRepository) + .tasklet(tasklet, transactionManager) + .listener(stepMonitorListener) + .build(); + } + + @JobScope + @Bean(STEP_AGGREGATE) + public Step monthlyAggregateStep(@Value("#{jobParameters['targetDate']}") String targetDate) { + return new StepBuilder(STEP_AGGREGATE, jobRepository) + .chunk(CHUNK_SIZE, transactionManager) + .reader(monthlyReader(targetDate)) + .processor(monthlyProcessor(targetDate)) + .writer(monthlyWriter()) + .listener(stepMonitorListener) + .listener(chunkListener) + .build(); + } + + @StepScope + @Bean("monthlyReader") + public JdbcCursorItemReader monthlyReader( + @Value("#{jobParameters['targetDate']}") String targetDate + ) { + LocalDate target = LocalDate.parse(targetDate, DATE_FORMATTER); + LocalDate startDate = target.with(TemporalAdjusters.firstDayOfMonth()); + LocalDate endDate = target.with(TemporalAdjusters.lastDayOfMonth()); + + return new JdbcCursorItemReaderBuilder() + .name("monthlyReader") + .dataSource(dataSource) + .sql( + "SELECT product_id, SUM(like_count) AS like_count, SUM(order_count) AS order_count, " + + "(SUM(like_count) * " + RankingScoreCalculator.LIKE_WEIGHT + " + SUM(order_count) * " + RankingScoreCalculator.ORDER_WEIGHT + ") AS score " + + "FROM product_metrics " + + "WHERE date BETWEEN ? AND ? " + + "GROUP BY product_id " + + "ORDER BY score DESC " + + "LIMIT 100" + ) + .preparedStatementSetter(ps -> { + ps.setObject(1, startDate); + ps.setObject(2, endDate); + }) + .rowMapper((rs, rowNum) -> new ProductMetricsAggregateRow( + rs.getLong("product_id"), + rs.getInt("like_count"), + rs.getInt("order_count"), + rs.getDouble("score") + )) + .build(); + } + + @StepScope + @Bean("monthlyProcessor") + public ItemProcessor monthlyProcessor( + @Value("#{jobParameters['targetDate']}") String targetDate + ) { + LocalDate target = LocalDate.parse(targetDate, DATE_FORMATTER); + String yearMonth = target.format(DateTimeFormatter.ofPattern("yyyy-MM")); + + return row -> new MvRankRow( + row.productId(), + row.likeCount(), + row.orderCount(), + row.score(), + yearMonth + ); + } + + @Bean("monthlyWriter") + public JdbcBatchItemWriter monthlyWriter() { + return new JdbcBatchItemWriterBuilder() + .dataSource(dataSource) + .sql( + "INSERT INTO mv_product_rank_monthly " + + "(product_id, like_count, order_count, score, ranking_period, updated_at) " + + "VALUES (:productId, :likeCount, :orderCount, :score, :period, NOW())" + ) + .beanMapped() + .build(); + } +} diff --git a/apps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/MvRankRow.java b/apps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/MvRankRow.java new file mode 100644 index 0000000000..ba4900e889 --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/MvRankRow.java @@ -0,0 +1,14 @@ +package com.loopers.batch.job.ranking; + +/** + * Processor 출력 / JdbcBatchItemWriter 입력 행. + * mv_product_rank_weekly 또는 mv_product_rank_monthly 에 INSERT 된다. + */ +public record MvRankRow( + long productId, + int likeCount, + int orderCount, + double score, + String period +) { +} diff --git a/apps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/ProductMetricsAggregateRow.java b/apps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/ProductMetricsAggregateRow.java new file mode 100644 index 0000000000..e948fac4be --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/ProductMetricsAggregateRow.java @@ -0,0 +1,13 @@ +package com.loopers.batch.job.ranking; + +/** + * JdbcCursorItemReader 가 읽어온 집계 행 (product_id, like_count 합계, order_count 합계, score). + * score 는 DB에서 직접 계산해 단일 출처로 관리한다. + */ +public record ProductMetricsAggregateRow( + long productId, + int likeCount, + int orderCount, + double score +) { +} diff --git a/apps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/RankingScoreCalculator.java b/apps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/RankingScoreCalculator.java new file mode 100644 index 0000000000..8c2b5495c1 --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/RankingScoreCalculator.java @@ -0,0 +1,18 @@ +package com.loopers.batch.job.ranking; + +/** + * 배치 랭킹 점수 계산 공식. + * Streamer 가중치 (LikeEventConsumer: 0.2, OrderEventConsumer: 0.7) 와 일치시킨다. + */ +public final class RankingScoreCalculator { + + public static final double LIKE_WEIGHT = 0.2; + public static final double ORDER_WEIGHT = 0.7; + + private RankingScoreCalculator() { + } + + public static double calculate(int likeCount, int orderCount) { + return likeCount * LIKE_WEIGHT + orderCount * ORDER_WEIGHT; + } +} diff --git a/apps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/WeeklyRankingJobConfig.java b/apps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/WeeklyRankingJobConfig.java new file mode 100644 index 0000000000..2547ce2288 --- /dev/null +++ b/apps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/WeeklyRankingJobConfig.java @@ -0,0 +1,151 @@ +package com.loopers.batch.job.ranking; + +import com.loopers.batch.listener.ChunkListener; +import com.loopers.batch.listener.JobListener; +import com.loopers.batch.listener.StepMonitorListener; +import lombok.RequiredArgsConstructor; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.JobScope; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.database.JdbcBatchItemWriter; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder; +import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; + +import javax.sql.DataSource; +import java.time.DayOfWeek; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.time.temporal.TemporalAdjusters; + +@ConditionalOnProperty(name = "spring.batch.job.name", havingValue = WeeklyRankingJobConfig.JOB_NAME) +@RequiredArgsConstructor +@Configuration +public class WeeklyRankingJobConfig { + + public static final String JOB_NAME = "weeklyRankingJob"; + private static final String STEP_TRUNCATE = "weeklyTruncateStep"; + private static final String STEP_AGGREGATE = "weeklyAggregateStep"; + private static final int CHUNK_SIZE = 1000; + private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd"); + + private final JobRepository jobRepository; + private final PlatformTransactionManager transactionManager; + private final DataSource dataSource; + private final JobListener jobListener; + private final StepMonitorListener stepMonitorListener; + private final ChunkListener chunkListener; + + @Bean(JOB_NAME) + public Job weeklyRankingJob() { + return new JobBuilder(JOB_NAME, jobRepository) + .start(weeklyTruncateStep(null)) + .next(weeklyAggregateStep(null)) + .listener(jobListener) + .build(); + } + + @JobScope + @Bean(STEP_TRUNCATE) + public Step weeklyTruncateStep(@Value("#{jobParameters['targetDate']}") String targetDate) { + Tasklet tasklet = (contribution, chunkContext) -> { + new org.springframework.jdbc.core.JdbcTemplate(dataSource) + .update("DELETE FROM mv_product_rank_weekly"); + return RepeatStatus.FINISHED; + }; + return new StepBuilder(STEP_TRUNCATE, jobRepository) + .tasklet(tasklet, transactionManager) + .listener(stepMonitorListener) + .build(); + } + + @JobScope + @Bean(STEP_AGGREGATE) + public Step weeklyAggregateStep(@Value("#{jobParameters['targetDate']}") String targetDate) { + return new StepBuilder(STEP_AGGREGATE, jobRepository) + .chunk(CHUNK_SIZE, transactionManager) + .reader(weeklyReader(targetDate)) + .processor(weeklyProcessor(targetDate)) + .writer(weeklyWriter()) + .listener(stepMonitorListener) + .listener(chunkListener) + .build(); + } + + @StepScope + @Bean("weeklyReader") + public JdbcCursorItemReader weeklyReader( + @Value("#{jobParameters['targetDate']}") String targetDate + ) { + LocalDate target = LocalDate.parse(targetDate, DATE_FORMATTER); + LocalDate startDate = target.with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY)); + LocalDate endDate = startDate.plusDays(6); + + return new JdbcCursorItemReaderBuilder() + .name("weeklyReader") + .dataSource(dataSource) + .sql( + "SELECT product_id, SUM(like_count) AS like_count, SUM(order_count) AS order_count, " + + "(SUM(like_count) * " + RankingScoreCalculator.LIKE_WEIGHT + " + SUM(order_count) * " + RankingScoreCalculator.ORDER_WEIGHT + ") AS score " + + "FROM product_metrics " + + "WHERE date BETWEEN ? AND ? " + + "GROUP BY product_id " + + "ORDER BY score DESC " + + "LIMIT 100" + ) + .preparedStatementSetter(ps -> { + ps.setObject(1, startDate); + ps.setObject(2, endDate); + }) + .rowMapper((rs, rowNum) -> new ProductMetricsAggregateRow( + rs.getLong("product_id"), + rs.getInt("like_count"), + rs.getInt("order_count"), + rs.getDouble("score") + )) + .build(); + } + + @StepScope + @Bean("weeklyProcessor") + public ItemProcessor weeklyProcessor( + @Value("#{jobParameters['targetDate']}") String targetDate + ) { + LocalDate target = LocalDate.parse(targetDate, DATE_FORMATTER); + LocalDate monday = target.with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY)); + String yearMonthWeek = monday.getYear() + "-W" + String.format("%02d", monday.get(java.time.temporal.WeekFields.ISO.weekOfWeekBasedYear())); + + return row -> new MvRankRow( + row.productId(), + row.likeCount(), + row.orderCount(), + row.score(), + yearMonthWeek + ); + } + + @Bean("weeklyWriter") + public JdbcBatchItemWriter weeklyWriter() { + return new JdbcBatchItemWriterBuilder() + .dataSource(dataSource) + .sql( + "INSERT INTO mv_product_rank_weekly " + + "(product_id, like_count, order_count, score, year_month_week, updated_at) " + + "VALUES (:productId, :likeCount, :orderCount, :score, :period, NOW())" + ) + .beanMapped() + .build(); + } +} diff --git a/apps/commerce-batch/src/main/java/com/loopers/batch/listener/JobListener.java b/apps/commerce-batch/src/main/java/com/loopers/batch/listener/JobListener.java index cb5c8bebd7..95c30404e7 100644 --- a/apps/commerce-batch/src/main/java/com/loopers/batch/listener/JobListener.java +++ b/apps/commerce-batch/src/main/java/com/loopers/batch/listener/JobListener.java @@ -7,6 +7,8 @@ import org.springframework.batch.core.annotation.BeforeJob; import org.springframework.stereotype.Component; +import org.springframework.batch.core.ExitStatus; + import java.time.Duration; import java.time.Instant; import java.time.ZoneId; @@ -18,7 +20,7 @@ public class JobListener { @BeforeJob void beforeJob(JobExecution jobExecution) { - log.info("Job '${jobExecution.jobInstance.jobName}' 시작"); + log.info("Job '{}' 시작", jobExecution.getJobInstance().getJobName()); jobExecution.getExecutionContext().putLong("startTime", System.currentTimeMillis()); } @@ -49,5 +51,13 @@ void afterJob(JobExecution jobExecution) { ).trim(); log.info(message); + + if (ExitStatus.FAILED.getExitCode().equals(jobExecution.getExitStatus().getExitCode())) { + log.error("[{}] Job FAILED — ExitCode: {}, Exceptions: {}", + jobExecution.getJobInstance().getJobName(), + jobExecution.getExitStatus().getExitCode(), + jobExecution.getAllFailureExceptions()); + // TODO: Slack 알람 연동 + } } } diff --git a/apps/commerce-batch/src/test/java/com/loopers/CommerceBatchApplicationTest.java b/apps/commerce-batch/src/test/java/com/loopers/CommerceBatchApplicationTest.java index c5e3bc7a35..88df29a497 100644 --- a/apps/commerce-batch/src/test/java/com/loopers/CommerceBatchApplicationTest.java +++ b/apps/commerce-batch/src/test/java/com/loopers/CommerceBatchApplicationTest.java @@ -3,7 +3,7 @@ import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; -@SpringBootTest +@SpringBootTest(properties = "spring.batch.job.enabled=false") public class CommerceBatchApplicationTest { @Test void contextLoads() {} diff --git a/apps/commerce-batch/src/test/java/com/loopers/job/ranking/MonthlyRankingJobE2ETest.java b/apps/commerce-batch/src/test/java/com/loopers/job/ranking/MonthlyRankingJobE2ETest.java new file mode 100644 index 0000000000..b8218a79aa --- /dev/null +++ b/apps/commerce-batch/src/test/java/com/loopers/job/ranking/MonthlyRankingJobE2ETest.java @@ -0,0 +1,136 @@ +package com.loopers.job.ranking; + +import com.loopers.batch.job.ranking.MonthlyRankingJobConfig; +import com.loopers.batch.job.ranking.RankingScoreCalculator; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.springframework.batch.core.ExitStatus; +import org.springframework.test.context.jdbc.Sql; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.test.JobLauncherTestUtils; +import org.springframework.batch.test.context.SpringBatchTest; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.test.context.TestPropertySource; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.time.temporal.TemporalAdjusters; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertAll; + +@SpringBootTest +@SpringBatchTest +@TestPropertySource(properties = "spring.batch.job.name=" + MonthlyRankingJobConfig.JOB_NAME) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Sql(scripts = "classpath:schema/ranking-tables.sql", executionPhase = Sql.ExecutionPhase.BEFORE_TEST_CLASS) +class MonthlyRankingJobE2ETest { + + private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd"); + + @Autowired + private JobLauncherTestUtils jobLauncherTestUtils; + + @Autowired + @Qualifier(MonthlyRankingJobConfig.JOB_NAME) + private Job job; + + @Autowired + private JdbcTemplate jdbcTemplate; + + @AfterEach + void tearDown() { + jdbcTemplate.update("DELETE FROM mv_product_rank_monthly"); + jdbcTemplate.update("DELETE FROM product_metrics"); + } + + @DisplayName("월간 랭킹 배치를 실행하면, 해당 달의 product_metrics를 집계해 mv_product_rank_monthly에 적재한다.") + @Test + void monthlyRankingJob_aggregatesAndWritesToMvTable() throws Exception { + // arrange + LocalDate firstDay = LocalDate.now().with(TemporalAdjusters.firstDayOfMonth()); + // product 1: 월초 + 15일치 — like 총 8, order 총 5 → score = 8*0.2 + 5*0.7 = 5.1 + jdbcTemplate.update( + "INSERT INTO product_metrics (product_id, date, like_count, order_count) VALUES (?, ?, ?, ?)", + 1L, firstDay, 5, 3 + ); + jdbcTemplate.update( + "INSERT INTO product_metrics (product_id, date, like_count, order_count) VALUES (?, ?, ?, ?)", + 1L, firstDay.plusDays(14), 3, 2 + ); + // product 2: 단일 행 — like 0, order 10 → score = 0*0.2 + 10*0.7 = 7.0 + jdbcTemplate.update( + "INSERT INTO product_metrics (product_id, date, like_count, order_count) VALUES (?, ?, ?, ?)", + 2L, firstDay, 0, 10 + ); + + jobLauncherTestUtils.setJob(job); + + // act + var params = new JobParametersBuilder() + .addString("targetDate", firstDay.format(DATE_FORMATTER)) + .addLong("runId", System.nanoTime()) + .toJobParameters(); + var execution = jobLauncherTestUtils.launchJob(params); + + // assert + Integer rowCount = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM mv_product_rank_monthly", Integer.class + ); + Double scoreProduct1 = jdbcTemplate.queryForObject( + "SELECT score FROM mv_product_rank_monthly WHERE product_id = ?", Double.class, 1L + ); + Double scoreProduct2 = jdbcTemplate.queryForObject( + "SELECT score FROM mv_product_rank_monthly WHERE product_id = ?", Double.class, 2L + ); + + assertAll( + () -> assertThat(execution.getExitStatus().getExitCode()).isEqualTo(ExitStatus.COMPLETED.getExitCode()), + () -> assertThat(rowCount).isEqualTo(2), + () -> assertThat(scoreProduct1).isEqualTo(RankingScoreCalculator.calculate(8, 5)), + () -> assertThat(scoreProduct2).isEqualTo(RankingScoreCalculator.calculate(0, 10)) + ); + } + + @DisplayName("해당 달 범위 밖의 product_metrics는 집계하지 않는다.") + @Test + void monthlyRankingJob_excludesMetricsOutsideTargetMonth() throws Exception { + // arrange + LocalDate firstDay = LocalDate.now().with(TemporalAdjusters.firstDayOfMonth()); + // 이번 달 데이터 + jdbcTemplate.update( + "INSERT INTO product_metrics (product_id, date, like_count, order_count) VALUES (?, ?, ?, ?)", + 1L, firstDay, 3, 2 + ); + // 지난 달 데이터 (범위 밖) + jdbcTemplate.update( + "INSERT INTO product_metrics (product_id, date, like_count, order_count) VALUES (?, ?, ?, ?)", + 2L, firstDay.minusMonths(1), 10, 10 + ); + + jobLauncherTestUtils.setJob(job); + + // act + var params = new JobParametersBuilder() + .addString("targetDate", firstDay.format(DATE_FORMATTER)) + .addLong("runId", System.nanoTime()) + .toJobParameters(); + var execution = jobLauncherTestUtils.launchJob(params); + + // assert: product 2는 지난 달이라 집계되지 않음 + Integer rowCount = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM mv_product_rank_monthly", Integer.class + ); + + assertAll( + () -> assertThat(execution.getExitStatus().getExitCode()).isEqualTo(ExitStatus.COMPLETED.getExitCode()), + () -> assertThat(rowCount).isEqualTo(1) + ); + } +} diff --git a/apps/commerce-batch/src/test/java/com/loopers/job/ranking/WeeklyRankingJobE2ETest.java b/apps/commerce-batch/src/test/java/com/loopers/job/ranking/WeeklyRankingJobE2ETest.java new file mode 100644 index 0000000000..262c1b48ae --- /dev/null +++ b/apps/commerce-batch/src/test/java/com/loopers/job/ranking/WeeklyRankingJobE2ETest.java @@ -0,0 +1,168 @@ +package com.loopers.job.ranking; + +import com.loopers.batch.job.ranking.RankingScoreCalculator; +import com.loopers.batch.job.ranking.WeeklyRankingJobConfig; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.springframework.batch.core.ExitStatus; +import org.springframework.test.context.jdbc.Sql; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.test.JobLauncherTestUtils; +import org.springframework.batch.test.context.SpringBatchTest; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.test.context.TestPropertySource; + +import java.time.DayOfWeek; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.time.temporal.TemporalAdjusters; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertAll; + +@SpringBootTest +@SpringBatchTest +@TestPropertySource(properties = "spring.batch.job.name=" + WeeklyRankingJobConfig.JOB_NAME) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Sql(scripts = "classpath:schema/ranking-tables.sql", executionPhase = Sql.ExecutionPhase.BEFORE_TEST_CLASS) +class WeeklyRankingJobE2ETest { + + private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd"); + + @Autowired + private JobLauncherTestUtils jobLauncherTestUtils; + + @Autowired + @Qualifier(WeeklyRankingJobConfig.JOB_NAME) + private Job job; + + @Autowired + private JdbcTemplate jdbcTemplate; + + @AfterEach + void tearDown() { + jdbcTemplate.update("DELETE FROM mv_product_rank_weekly"); + jdbcTemplate.update("DELETE FROM product_metrics"); + } + + @DisplayName("주간 랭킹 배치를 실행하면, 해당 주의 product_metrics를 집계해 mv_product_rank_weekly에 적재한다.") + @Test + void weeklyRankingJob_aggregatesAndWritesToMvTable() throws Exception { + // arrange + LocalDate monday = LocalDate.now().with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY)); + // product 1: 2일치 — like 총 7, order 총 4 → score = 7*0.2 + 4*0.7 = 4.2 + jdbcTemplate.update( + "INSERT INTO product_metrics (product_id, date, like_count, order_count) VALUES (?, ?, ?, ?)", + 1L, monday, 5, 3 + ); + jdbcTemplate.update( + "INSERT INTO product_metrics (product_id, date, like_count, order_count) VALUES (?, ?, ?, ?)", + 1L, monday.plusDays(1), 2, 1 + ); + // product 2: 1일치 — like 10, order 0 → score = 10*0.2 = 2.0 + jdbcTemplate.update( + "INSERT INTO product_metrics (product_id, date, like_count, order_count) VALUES (?, ?, ?, ?)", + 2L, monday, 10, 0 + ); + + jobLauncherTestUtils.setJob(job); + + // act + var params = new JobParametersBuilder() + .addString("targetDate", monday.format(DATE_FORMATTER)) + .addLong("runId", System.nanoTime()) + .toJobParameters(); + var execution = jobLauncherTestUtils.launchJob(params); + + // assert + Integer rowCount = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM mv_product_rank_weekly", Integer.class + ); + Double scoreProduct1 = jdbcTemplate.queryForObject( + "SELECT score FROM mv_product_rank_weekly WHERE product_id = ?", Double.class, 1L + ); + Double scoreProduct2 = jdbcTemplate.queryForObject( + "SELECT score FROM mv_product_rank_weekly WHERE product_id = ?", Double.class, 2L + ); + + assertAll( + () -> assertThat(execution.getExitStatus().getExitCode()).isEqualTo(ExitStatus.COMPLETED.getExitCode()), + () -> assertThat(rowCount).isEqualTo(2), + () -> assertThat(scoreProduct1).isEqualTo(RankingScoreCalculator.calculate(7, 4)), + () -> assertThat(scoreProduct2).isEqualTo(RankingScoreCalculator.calculate(10, 0)) + ); + } + + @DisplayName("해당 주 범위 밖의 product_metrics는 집계하지 않는다.") + @Test + void weeklyRankingJob_excludesMetricsOutsideTargetWeek() throws Exception { + // arrange + LocalDate monday = LocalDate.now().with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY)); + // 이번 주 데이터 + jdbcTemplate.update( + "INSERT INTO product_metrics (product_id, date, like_count, order_count) VALUES (?, ?, ?, ?)", + 1L, monday, 3, 2 + ); + // 지난 주 데이터 (범위 밖) + jdbcTemplate.update( + "INSERT INTO product_metrics (product_id, date, like_count, order_count) VALUES (?, ?, ?, ?)", + 2L, monday.minusWeeks(1), 10, 10 + ); + + jobLauncherTestUtils.setJob(job); + + // act + var params = new JobParametersBuilder() + .addString("targetDate", monday.format(DATE_FORMATTER)) + .addLong("runId", System.nanoTime()) + .toJobParameters(); + var execution = jobLauncherTestUtils.launchJob(params); + + // assert: product 2는 지난 주라 집계되지 않음 + Integer rowCount = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM mv_product_rank_weekly", Integer.class + ); + + assertAll( + () -> assertThat(execution.getExitStatus().getExitCode()).isEqualTo(ExitStatus.COMPLETED.getExitCode()), + () -> assertThat(rowCount).isEqualTo(1) + ); + } + + @DisplayName("재실행 시 기존 mv_product_rank_weekly 데이터를 삭제하고 새로 적재한다.") + @Test + void weeklyRankingJob_truncatesBeforeWrite() throws Exception { + // arrange: 이전 배치 결과가 남아있다고 가정 + jdbcTemplate.update( + "INSERT INTO mv_product_rank_weekly (product_id, like_count, order_count, score, year_month_week, updated_at) " + + "VALUES (?, ?, ?, ?, ?, NOW())", + 999L, 1, 1, 0.9, "2020-W01" + ); + LocalDate monday = LocalDate.now().with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY)); + jdbcTemplate.update( + "INSERT INTO product_metrics (product_id, date, like_count, order_count) VALUES (?, ?, ?, ?)", + 1L, monday, 5, 3 + ); + + jobLauncherTestUtils.setJob(job); + + // act + var params = new JobParametersBuilder() + .addString("targetDate", monday.format(DATE_FORMATTER)) + .addLong("runId", System.nanoTime()) + .toJobParameters(); + jobLauncherTestUtils.launchJob(params); + + // assert: product 999 (이전 데이터)는 삭제되고 product 1만 남음 + Integer count999 = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM mv_product_rank_weekly WHERE product_id = ?", Integer.class, 999L + ); + assertThat(count999).isEqualTo(0); + } +} diff --git a/apps/commerce-batch/src/test/resources/schema/ranking-tables.sql b/apps/commerce-batch/src/test/resources/schema/ranking-tables.sql new file mode 100644 index 0000000000..5580954113 --- /dev/null +++ b/apps/commerce-batch/src/test/resources/schema/ranking-tables.sql @@ -0,0 +1,30 @@ +CREATE TABLE IF NOT EXISTS product_metrics +( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + version BIGINT, + product_id BIGINT NOT NULL, + date DATE NOT NULL, + like_count INT NOT NULL DEFAULT 0, + order_count INT NOT NULL DEFAULT 0, + UNIQUE KEY uk_product_date (product_id, date) +); + +CREATE TABLE IF NOT EXISTS mv_product_rank_weekly +( + product_id BIGINT PRIMARY KEY, + like_count INT NOT NULL, + order_count INT NOT NULL, + score DOUBLE NOT NULL, + year_month_week VARCHAR(10) NOT NULL, + updated_at DATETIME NOT NULL +); + +CREATE TABLE IF NOT EXISTS mv_product_rank_monthly +( + product_id BIGINT PRIMARY KEY, + like_count INT NOT NULL, + order_count INT NOT NULL, + score DOUBLE NOT NULL, + ranking_period VARCHAR(10) NOT NULL, + updated_at DATETIME NOT NULL +); diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetrics.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetrics.java index affe6c13b6..c1cfb5a71d 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetrics.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/product/ProductMetrics.java @@ -6,16 +6,22 @@ import jakarta.persistence.GenerationType; import jakarta.persistence.Id; import jakarta.persistence.Table; +import jakarta.persistence.UniqueConstraint; import jakarta.persistence.Version; import lombok.Getter; +import java.time.LocalDate; + /** - * 상품 집계 지표 (product_metrics). - * commerce-streamer: Kafka 이벤트 소비 후 like_count를 upsert한다. + * 일별 상품 집계 지표 (product_metrics). + * commerce-streamer: Kafka 이벤트 소비 후 (product_id, date) 기준으로 upsert한다. */ @Getter @Entity -@Table(name = "product_metrics") +@Table( + name = "product_metrics", + uniqueConstraints = @UniqueConstraint(columnNames = {"product_id", "date"}) +) public class ProductMetrics { @Id @@ -25,9 +31,12 @@ public class ProductMetrics { @Version private Long version; - @Column(name = "product_id", nullable = false, unique = true) + @Column(name = "product_id", nullable = false) private Long productId; + @Column(name = "date", nullable = false) + private LocalDate date; + @Column(name = "like_count", nullable = false) private int likeCount; @@ -37,14 +46,16 @@ public class ProductMetrics { protected ProductMetrics() { } - public ProductMetrics(Long productId, int likeCount) { + public ProductMetrics(Long productId, LocalDate date, int likeCount) { this.productId = productId; + this.date = date; this.likeCount = likeCount; this.orderCount = 0; } - public ProductMetrics(Long productId, int likeCount, int orderCount) { + public ProductMetrics(Long productId, LocalDate date, int likeCount, int orderCount) { this.productId = productId; + this.date = date; this.likeCount = likeCount; this.orderCount = orderCount; } diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductMetricsJpaRepository.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductMetricsJpaRepository.java index fb4f1cc62f..965f4d9ee4 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductMetricsJpaRepository.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/product/ProductMetricsJpaRepository.java @@ -3,8 +3,9 @@ import com.loopers.domain.product.ProductMetrics; import org.springframework.data.jpa.repository.JpaRepository; +import java.time.LocalDate; import java.util.Optional; public interface ProductMetricsJpaRepository extends JpaRepository { - Optional findByProductId(Long productId); + Optional findByProductIdAndDate(Long productId, LocalDate date); } diff --git a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/LikeEventConsumer.java b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/LikeEventConsumer.java index 4572bd359d..be39f2e0c2 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/LikeEventConsumer.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/LikeEventConsumer.java @@ -71,8 +71,8 @@ private void processLikeEvent(LikeEventMessage message, LocalDate eventDate) { } ProductMetrics metrics = productMetricsJpaRepository - .findByProductId(message.productId()) - .orElseGet(() -> new ProductMetrics(message.productId(), 0)); + .findByProductIdAndDate(message.productId(), eventDate) + .orElseGet(() -> new ProductMetrics(message.productId(), eventDate, 0)); double rankingIncrement; if ("LIKED".equals(message.type())) { diff --git a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/OrderEventConsumer.java b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/OrderEventConsumer.java index 569f014910..30b3ac189e 100644 --- a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/OrderEventConsumer.java +++ b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/OrderEventConsumer.java @@ -75,8 +75,8 @@ private void processOrderEvent(OrderEventMessage message, LocalDate eventDate) { } ProductMetrics metrics = productMetricsJpaRepository - .findByProductId(message.productId()) - .orElseGet(() -> new ProductMetrics(message.productId(), 0)); + .findByProductIdAndDate(message.productId(), eventDate) + .orElseGet(() -> new ProductMetrics(message.productId(), eventDate, 0)); metrics.increaseOrderCount(); productMetricsJpaRepository.save(metrics); diff --git a/apps/commerce-streamer/src/test/java/com/loopers/domain/product/ProductMetricsOptimisticLockTest.java b/apps/commerce-streamer/src/test/java/com/loopers/domain/product/ProductMetricsOptimisticLockTest.java index a58b5b1975..b09fc70b21 100644 --- a/apps/commerce-streamer/src/test/java/com/loopers/domain/product/ProductMetricsOptimisticLockTest.java +++ b/apps/commerce-streamer/src/test/java/com/loopers/domain/product/ProductMetricsOptimisticLockTest.java @@ -8,6 +8,8 @@ import org.springframework.boot.test.context.SpringBootTest; import org.springframework.orm.ObjectOptimisticLockingFailureException; +import java.time.LocalDate; + import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -28,11 +30,12 @@ void tearDown() { @Test void concurrentUpdate_throwsOptimisticLockException() { // arrange: 초기 ProductMetrics 저장 (version=0) - productMetricsJpaRepository.save(new ProductMetrics(9999L, 0)); + LocalDate today = LocalDate.now(); + productMetricsJpaRepository.save(new ProductMetrics(9999L, today, 0)); // 두 스레드가 각각 같은 row를 읽는다 (version=0) - ProductMetrics metrics1 = productMetricsJpaRepository.findByProductId(9999L).orElseThrow(); - ProductMetrics metrics2 = productMetricsJpaRepository.findByProductId(9999L).orElseThrow(); + ProductMetrics metrics1 = productMetricsJpaRepository.findByProductIdAndDate(9999L, today).orElseThrow(); + ProductMetrics metrics2 = productMetricsJpaRepository.findByProductIdAndDate(9999L, today).orElseThrow(); // 첫 번째 스레드가 먼저 저장 (version: 0 → 1) metrics1.increaseLikeCount(); @@ -44,7 +47,7 @@ void concurrentUpdate_throwsOptimisticLockException() { .isInstanceOf(ObjectOptimisticLockingFailureException.class); // DB에는 첫 번째 저장 결과만 반영됨 - ProductMetrics result = productMetricsJpaRepository.findByProductId(9999L).orElseThrow(); + ProductMetrics result = productMetricsJpaRepository.findByProductIdAndDate(9999L, today).orElseThrow(); assertThat(result.getLikeCount()).isEqualTo(1); } } diff --git a/apps/commerce-streamer/src/test/java/com/loopers/interfaces/consumer/LikeEventConsumerTest.java b/apps/commerce-streamer/src/test/java/com/loopers/interfaces/consumer/LikeEventConsumerTest.java index 8e7219dcb3..6845b5dbd3 100644 --- a/apps/commerce-streamer/src/test/java/com/loopers/interfaces/consumer/LikeEventConsumerTest.java +++ b/apps/commerce-streamer/src/test/java/com/loopers/interfaces/consumer/LikeEventConsumerTest.java @@ -15,6 +15,7 @@ import org.mockito.ArgumentCaptor; import org.springframework.kafka.support.Acknowledgment; +import java.time.LocalDate; import java.util.List; import java.util.Optional; @@ -23,6 +24,7 @@ import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -56,7 +58,7 @@ void upsertIncrease_whenLikedEventReceived() { ConsumerRecord record = new ConsumerRecord<>("catalog-events", 0, 0L, "100", message); Acknowledgment acknowledgment = mock(Acknowledgment.class); when(eventHandledJpaRepository.existsByTopicAndEventId("catalog-events", "evt-1")).thenReturn(false); - when(productMetricsJpaRepository.findByProductId(100L)).thenReturn(Optional.empty()); + when(productMetricsJpaRepository.findByProductIdAndDate(eq(100L), any(LocalDate.class))).thenReturn(Optional.empty()); // act likeEventConsumer.handleLikeEvents(List.of(record), acknowledgment); @@ -75,9 +77,9 @@ void upsertDecrease_whenUnlikedEventReceived() { LikeEventConsumer.LikeEventMessage message = new LikeEventConsumer.LikeEventMessage("UNLIKED", 100L, 1L, "evt-2"); ConsumerRecord record = new ConsumerRecord<>("catalog-events", 0, 0L, "100", message); Acknowledgment acknowledgment = mock(Acknowledgment.class); - ProductMetrics existing = new ProductMetrics(100L, 3); + ProductMetrics existing = new ProductMetrics(100L, LocalDate.now(), 3); when(eventHandledJpaRepository.existsByTopicAndEventId("catalog-events", "evt-2")).thenReturn(false); - when(productMetricsJpaRepository.findByProductId(100L)).thenReturn(Optional.of(existing)); + when(productMetricsJpaRepository.findByProductIdAndDate(eq(100L), any(LocalDate.class))).thenReturn(Optional.of(existing)); // act likeEventConsumer.handleLikeEvents(List.of(record), acknowledgment); @@ -114,7 +116,7 @@ void incrementsRankingScore_whenLikedEventReceived() { ConsumerRecord record = new ConsumerRecord<>("catalog-events", 0, 0L, "100", message); Acknowledgment acknowledgment = mock(Acknowledgment.class); when(eventHandledJpaRepository.existsByTopicAndEventId("catalog-events", "evt-r1")).thenReturn(false); - when(productMetricsJpaRepository.findByProductId(100L)).thenReturn(Optional.empty()); + when(productMetricsJpaRepository.findByProductIdAndDate(eq(100L), any(LocalDate.class))).thenReturn(Optional.empty()); // act likeEventConsumer.handleLikeEvents(List.of(record), acknowledgment); @@ -132,9 +134,9 @@ void decrementsRankingScore_whenUnlikedEventReceived() { LikeEventConsumer.LikeEventMessage message = new LikeEventConsumer.LikeEventMessage("UNLIKED", 100L, 1L, "evt-r2"); ConsumerRecord record = new ConsumerRecord<>("catalog-events", 0, 0L, "100", message); Acknowledgment acknowledgment = mock(Acknowledgment.class); - ProductMetrics existing = new ProductMetrics(100L, 3); + ProductMetrics existing = new ProductMetrics(100L, LocalDate.now(), 3); when(eventHandledJpaRepository.existsByTopicAndEventId("catalog-events", "evt-r2")).thenReturn(false); - when(productMetricsJpaRepository.findByProductId(100L)).thenReturn(Optional.of(existing)); + when(productMetricsJpaRepository.findByProductIdAndDate(eq(100L), any(LocalDate.class))).thenReturn(Optional.of(existing)); // act likeEventConsumer.handleLikeEvents(List.of(record), acknowledgment); diff --git a/apps/commerce-streamer/src/test/java/com/loopers/interfaces/consumer/OrderEventConsumerTest.java b/apps/commerce-streamer/src/test/java/com/loopers/interfaces/consumer/OrderEventConsumerTest.java index 2411762071..b40fc31516 100644 --- a/apps/commerce-streamer/src/test/java/com/loopers/interfaces/consumer/OrderEventConsumerTest.java +++ b/apps/commerce-streamer/src/test/java/com/loopers/interfaces/consumer/OrderEventConsumerTest.java @@ -15,6 +15,7 @@ import org.mockito.ArgumentCaptor; import org.springframework.kafka.support.Acknowledgment; +import java.time.LocalDate; import java.util.List; import java.util.Optional; @@ -23,6 +24,7 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -56,7 +58,7 @@ void upsertIncrease_whenOrderCreatedEventReceived() { ConsumerRecord record = new ConsumerRecord<>("order-events", 0, 0L, "1", message); Acknowledgment acknowledgment = mock(Acknowledgment.class); when(eventHandledJpaRepository.existsByTopicAndEventId("order-events", "evt-1")).thenReturn(false); - when(productMetricsJpaRepository.findByProductId(100L)).thenReturn(Optional.empty()); + when(productMetricsJpaRepository.findByProductIdAndDate(eq(100L), any(LocalDate.class))).thenReturn(Optional.empty()); // act orderEventConsumer.handleOrderEvents(List.of(record), acknowledgment); @@ -75,9 +77,9 @@ void upsertIncrease_whenMetricsAlreadyExists() { OrderEventConsumer.OrderEventMessage message = new OrderEventConsumer.OrderEventMessage("ORDER_CREATED", 100L, 1L, 50000, "evt-2"); ConsumerRecord record = new ConsumerRecord<>("order-events", 0, 0L, "1", message); Acknowledgment acknowledgment = mock(Acknowledgment.class); - ProductMetrics existing = new ProductMetrics(100L, 5, 3); + ProductMetrics existing = new ProductMetrics(100L, LocalDate.now(), 5, 3); when(eventHandledJpaRepository.existsByTopicAndEventId("order-events", "evt-2")).thenReturn(false); - when(productMetricsJpaRepository.findByProductId(100L)).thenReturn(Optional.of(existing)); + when(productMetricsJpaRepository.findByProductIdAndDate(eq(100L), any(LocalDate.class))).thenReturn(Optional.of(existing)); // act orderEventConsumer.handleOrderEvents(List.of(record), acknowledgment); @@ -117,7 +119,7 @@ void incrementsRankingScore_whenOrderCreatedEventReceived() { new ConsumerRecord<>("order-events", 0, 0L, "1", message); Acknowledgment acknowledgment = mock(Acknowledgment.class); when(eventHandledJpaRepository.existsByTopicAndEventId("order-events", "evt-r1")).thenReturn(false); - when(productMetricsJpaRepository.findByProductId(100L)).thenReturn(Optional.empty()); + when(productMetricsJpaRepository.findByProductIdAndDate(eq(100L), any(LocalDate.class))).thenReturn(Optional.empty()); // act orderEventConsumer.handleOrderEvents(List.of(record), acknowledgment);