Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
f11e96b
feat: 랭킹 배치 anchorDate 파라미터 리스너와 롤링 윈도우 계산기 추가
ghtjr410 Apr 15, 2026
69a51f3
feat: 랭킹 배치 스테이징 테이블과 Step 0 초기화 Tasklet 추가
ghtjr410 Apr 15, 2026
c4f692b
feat: 랭킹 배치 Step 1 - View 메트릭 cursor 스트리밍 집계 추가
ghtjr410 Apr 15, 2026
bccd9b0
feat: 랭킹 배치 Step 2/3 - Like/Order 메트릭 스테이징 적재 추가
ghtjr410 Apr 15, 2026
ebe14ce
feat: 랭킹 배치 Step 4a/4b - MV 사전 DELETE Tasklet 추가
ghtjr410 Apr 15, 2026
8ca5d0c
feat: 랭킹 배치 Step 5 - 전체 상품 score 계산을 2차 스테이징에 적재
ghtjr410 Apr 15, 2026
97e453a
feat: 랭킹 배치 Step 5b/7/6 - Promote/Audit/Redis 로 파이프라인 완성
ghtjr410 Apr 15, 2026
f018ade
feat: commerce-api 의 랭킹 MV 조회 어댑터 추가
ghtjr410 Apr 15, 2026
066ae51
feat: 랭킹 조회 API 를 LAST_7D/LAST_30D 롤링 윈도우로 교체
ghtjr410 Apr 15, 2026
aef63c9
test: 랭킹 배치 재시작 시나리오 4종 추가 + chunk Reader saveState 비활성화
ghtjr410 Apr 15, 2026
d716afc
test: 랭킹 배치 시드 생성기와 분포 검증 추가
ghtjr410 Apr 15, 2026
b4072ca
test: 랭킹 배치 선형성/스파이크 측정 벤치마크와 실행 스크립트 추가
ghtjr410 Apr 15, 2026
d440609
fix: streaming aggregator lookahead 를 ExecutionContext 에 직렬화하여 chunk-…
ghtjr410 Apr 16, 2026
cfe03d1
test: Scenario 4 - Hot product 긴 bucket 체인의 chunk 경계 무절단 검증
ghtjr410 Apr 16, 2026
bf1fa78
fix: weight_group 을 Job 시작 시점에 ExecutionContext 스냅샷으로 동결
ghtjr410 Apr 16, 2026
b2a74b8
fix: Step 7 audit 실패 시 오염 MV 격리 + API 전일 anchor 자동 fallback
ghtjr410 Apr 17, 2026
9772899
refactor: Step 4a/4b 를 단일 PurgeMvTasklet 으로 통합
ghtjr410 Apr 17, 2026
acc517f
style: 배치 테스트 13개 파일을 프로젝트 test-patterns 컨벤션으로 교체
ghtjr410 Apr 17, 2026
d33a90e
refactor: MV 교체를 DELETE+INSERT 단일 TX 원자 교체로 변경
ghtjr410 Apr 17, 2026
c236b24
chore: Step 번호를 0~7 순차로 재정렬
ghtjr410 Apr 17, 2026
776b6a5
refactor: audit 검증 Step 제거, 실행 이력 기록을 promote Step으로 통합
ghtjr410 Apr 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,13 @@
import org.springframework.stereotype.Component;

import java.time.Clock;
import java.time.Instant;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.WeekFields;

@Component
public class RankingKeyResolver {

private static final DateTimeFormatter DAILY_FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd");
private static final DateTimeFormatter MONTHLY_FORMAT = DateTimeFormatter.ofPattern("yyyyMM");

private final Clock clock;

Expand All @@ -25,18 +22,26 @@ public String resolve(RankingPeriod period, LocalDate date, String groupName) {
String base = switch (period) {
case REALTIME -> "ranking:realtime";
case DAILY -> "ranking:daily:" + date.format(DAILY_FORMAT);
case WEEKLY -> {
WeekFields iso = WeekFields.ISO;
int year = date.get(iso.weekBasedYear());
int week = date.get(iso.weekOfWeekBasedYear());
yield String.format("ranking:weekly:%d%02d", year, week);
}
case MONTHLY -> "ranking:monthly:" + date.format(MONTHLY_FORMAT);
// LAST_7D / LAST_30D 는 anchor = 어제 (오늘 제외 롤링 윈도우) 기반 키
case LAST_7D -> "ranking:last7d:" + anchorDateKey(date);
case LAST_30D -> "ranking:last30d:" + anchorDateKey(date);
};
return base + ":" + groupName;
}

public String resolve(RankingPeriod period, LocalDate date) {
return resolve(period, date, "control");
}

/**
* 조회 기준일(오늘) 로부터 anchor_date (= 어제) 를 반환한다.
* 배치가 이 anchor 로 MV / Redis ZSET 을 만들므로 API 도 동일 키로 조회해야 한다.
*/
public LocalDate anchorDateOf(LocalDate date) {
return date.minusDays(1);
}

private String anchorDateKey(LocalDate date) {
return anchorDateOf(date).format(DAILY_FORMAT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

import com.loopers.domain.ranking.RankEntry;
import com.loopers.domain.ranking.RankingPeriod;
import com.loopers.domain.ranking.mv.MvRankEntry;
import com.loopers.domain.ranking.mv.MvRankingQueryRepository;
import com.loopers.infrastructure.ranking.RankingRedisRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.time.DayOfWeek;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.temporal.TemporalAdjusters;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -21,15 +21,18 @@ public class RankingService {
private final RankingRedisRepository rankingRedisRepository;
private final RankingKeyResolver keyResolver;
private final RankingFallbackAggregator fallbackAggregator;
private final MvRankingQueryRepository mvRankingQueryRepository;
private final ExperimentGroupResolver experimentGroupResolver;

public RankingService(RankingRedisRepository rankingRedisRepository,
RankingKeyResolver keyResolver,
RankingFallbackAggregator fallbackAggregator,
MvRankingQueryRepository mvRankingQueryRepository,
ExperimentGroupResolver experimentGroupResolver) {
this.rankingRedisRepository = rankingRedisRepository;
this.keyResolver = keyResolver;
this.fallbackAggregator = fallbackAggregator;
this.mvRankingQueryRepository = mvRankingQueryRepository;
this.experimentGroupResolver = experimentGroupResolver;
}

Expand All @@ -47,11 +50,18 @@ public long getTotalCount(RankingPeriod period, LocalDate date, String group) {
String key = keyResolver.resolve(period, date, group);
try {
Long count = rankingRedisRepository.getTotalCount(key);
return count == null ? 0 : count;
if (count != null && count > 0) {
return count;
}
} catch (Exception e) {
log.warn("Redis totalCount 조회 실패: {}", e.getMessage());
return 0;
}
// Redis miss 또는 0일 때 MV 카운트 fallback (LAST_7D / LAST_30D 만 해당)
return switch (period) {
case LAST_7D -> mvRankingQueryRepository.countLast7d(keyResolver.anchorDateOf(date), group);
case LAST_30D -> mvRankingQueryRepository.countLast30d(keyResolver.anchorDateOf(date), group);
default -> 0;
};
}

public Integer getProductRank(Long productId, RankingPeriod period, LocalDate date) {
Expand All @@ -67,14 +77,53 @@ private List<RankEntry> loadRankEntries(RankingPeriod period, LocalDate date, in
return fromRedis;
}
} catch (Exception e) {
log.warn("Redis 랭킹 조회 실패, DB fallback. period={}, date={}, group={}: {}",
log.warn("Redis 랭킹 조회 실패, fallback. period={}, date={}, group={}: {}",
period, date, group, e.getMessage());
}

return fallbackFromDb(period, date, page, size);
return switch (period) {
// 롤링 랭킹: MV 테이블 직접 조회 (확정된 TOP N 을 그대로 투영, bucket SUM 재계산 아님)
case LAST_7D, LAST_30D -> fallbackFromMv(period, date, page, size, group);
// 실시간/일간: 기존 bucket 집계 재계산 경로
case REALTIME, DAILY -> fallbackFromBucketAggregation(period, date, page, size);
};
}

private static final int MV_FALLBACK_MAX_DAYS = 3;

private List<RankEntry> fallbackFromMv(RankingPeriod period, LocalDate date, int page, int size, String group) {
try {
LocalDate anchorDate = keyResolver.anchorDateOf(date);
int offset = page * size;

// 현재 anchor 의 MV 가 비어있으면 전일 anchor 로 자동 fallback (최대 3일).
// 배치 미실행 또는 해당 anchor 에 데이터가 없으면 비어있을 수 있음.
// "잘못된 랭킹" 보다 "어제 랭킹이라도 보여주기" 가 사용자 경험상 나음.
for (int retry = 0; retry < MV_FALLBACK_MAX_DAYS; retry++) {
List<MvRankEntry> rows = switch (period) {
case LAST_7D -> mvRankingQueryRepository.findLast7d(anchorDate, group, offset, size);
case LAST_30D -> mvRankingQueryRepository.findLast30d(anchorDate, group, offset, size);
default -> List.of();
};
if (!rows.isEmpty()) {
if (retry > 0) {
log.info("MV fallback: 현재 anchor 비어있어 전일로 대체. period={}, 원래anchor={}, 사용anchor={}",
period, keyResolver.anchorDateOf(date), anchorDate);
}
return rows.stream()
.map(r -> new RankEntry(r.productId(), r.score(), r.rankPosition()))
.toList();
}
anchorDate = anchorDate.minusDays(1);
}
return List.of();
} catch (Exception e) {
log.error("MV fallback 실패. period={}, date={}, group={}", period, date, group, e);
return List.of();
}
}
Comment on lines +94 to 124
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

MV 조회에 타임아웃/실패 처리 정책이 없어 cascading 장애로 번질 수 있다.

fallbackFromMv 는 최대 3 회 DB 조회를 수행한다. 각 호출의 JDBC 타임아웃 / 커넥션 풀 상한 / 지연 허용치에 대한 방어가 없고, MV 테이블이 락이나 대형 트랜잭션에 물리면 3 회가 누적되어 API 스레드가 오래 점유된다. Redis 장애가 났을 때 모든 요청이 이 경로로 몰리면 MV 조회 큐잉으로 DB 커넥션 풀이 바닥나는 연쇄 장애가 발생한다.

또한 catch (Exception e) 로 모든 예외를 List.of() 로 바꿔 삼키는 구조 (120-123) 는 다음을 의미한다:

  • 사용자에게는 "빈 랭킹" 으로 보이는데, 로그만으로는 DB 장애 인지 데이터 없음 인지 구분이 어렵다.
  • APM / 알림 연동 (예: Sentry, CoreException → ApiControllerAdvice) 의 에러율 지표에 잡히지 않는다.

권고다:

  1. MvRankingQueryRepository 구현체에 statement timeout / @Transactional(timeout = N) 설정 (예: 1 초).
  2. 첫 번째 조회 실패는 fallback 으로 살리더라도, 연속 실패 시에는 지표성 예외 (CoreException) 또는 Micrometer 카운터 + 구조화 로그 (mv_fallback_error tag) 를 방출해 운영 가시성 확보.
  3. Redis 장애 장기화 시 circuit breaker (resilience4j) 로 MV 호출 자체를 차단하고 그대로 빈 리스트 반환 + 알림.

추가 테스트로는 MvRankingQueryRepository Mockito stub 으로 SQLTimeoutException 을 던졌을 때:

  • 호출 횟수가 3 회로 제한되는지,
  • 로그 레벨이 error 인지,
  • 반환값이 List.of() 인지

확인하는 케이스를 포함하길 권고한다.

코딩 가이드라인 (외부 호출에는 타임아웃/재시도/서킷브레이커 고려 여부를 점검하고, 실패 시 대체 흐름을 제안한다) 에 따라 제안한다.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/commerce-api/src/main/java/com/loopers/application/ranking/RankingService.java`
around lines 94 - 124, fallbackFromMv performs up to MV_FALLBACK_MAX_DAYS DB
calls without per-call timeouts and swallows all exceptions, enabling cascading
failures and losing operational visibility; update MvRankingQueryRepository
implementations to enforce statement/transaction timeouts (e.g.,
`@Transactional`(timeout=1) or JDBC statement timeout) and wrap the MV access in a
circuit breaker (resilience4j) around the calls from fallbackFromMv; change
fallbackFromMv to treat the first SQL timeout/exception as a single fallback
(log as warn and continue), but on consecutive failures emit a CoreException or
increment a Micrometer counter + structured error log (tag mv_fallback_error) so
failures surface to APM, and ensure the method still returns List.of() when the
breaker is open; add unit tests for fallbackFromMv using Mockito to throw
SQLTimeoutException from MvRankingQueryRepository verifying at-most
MV_FALLBACK_MAX_DAYS invocations, error-level logging/metric emission on
repeated failures, and that the method returns an empty list when the breaker
tripped.


private List<RankEntry> fallbackFromDb(RankingPeriod period, LocalDate date, int page, int size) {
private List<RankEntry> fallbackFromBucketAggregation(RankingPeriod period, LocalDate date, int page, int size) {
try {
LocalDateTime from = calculateFrom(period, date);
LocalDateTime to = RankingDateUtils.kstDateToUtcBoundary(date.plusDays(1));
Expand All @@ -94,7 +143,7 @@ private List<RankEntry> fallbackFromDb(RankingPeriod period, LocalDate date, int
.map(e -> new RankEntry(e.getKey(), e.getValue(), rankCounter.getAndIncrement()))
.toList();
} catch (Exception e) {
log.error("DB fallback도 실패. period={}, date={}", period, date, e);
log.error("bucket 집계 fallback 실패. period={}, date={}", period, date, e);
return List.of();
}
}
Expand All @@ -109,14 +158,9 @@ private LocalDateTime calculateFrom(RankingPeriod period, LocalDate date) {
java.time.ZoneOffset.UTC);
}
case DAILY -> RankingDateUtils.kstDateToUtcBoundary(date);
case WEEKLY -> {
LocalDate monday = date.with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY));
yield RankingDateUtils.kstDateToUtcBoundary(monday);
}
case MONTHLY -> {
LocalDate firstOfMonth = date.withDayOfMonth(1);
yield RankingDateUtils.kstDateToUtcBoundary(firstOfMonth);
}
// LAST_7D / LAST_30D 는 MV fallback 경로라 여기 들어올 일 없음
case LAST_7D, LAST_30D -> throw new IllegalStateException(
"rolling period fallback 은 MV 경로로만 처리되어야 한다: " + period);
};
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
package com.loopers.domain.ranking;

/**
* 랭킹 조회 기간.
*
* <p>WEEKLY/MONTHLY 캘린더 경계는 "월요일 오전/매월 1일 오전에 표본이 1일치" 라는 빈약성
* 문제가 있고, 실무에선 이커머스 랭킹을 롤링 N일 (오늘 제외) 로 구현하는 것이 일반적이다
* (설계.md 프롤로그 + 데빈/케브/앨런 멘토링 결론). 본 API 는 배치가 만드는 롤링 MV 와
* 일관되게 LAST_7D / LAST_30D 로 노출한다.</p>
*/
public enum RankingPeriod {
REALTIME, DAILY, WEEKLY, MONTHLY
REALTIME,
DAILY,
LAST_7D,
LAST_30D
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.loopers.domain.ranking.mv;

import java.io.Serializable;
import java.time.LocalDate;
import java.util.Objects;

/**
* mv_product_rank_last_7d / mv_product_rank_last_30d 공통 PK.
* commerce-batch 가 생성·적재하는 MV 를 commerce-api 가 읽기 위한 스키마 미러.
*/
public class MvProductRankId implements Serializable {

private LocalDate anchorDate;
private String weightGroup;
private Long productId;

public MvProductRankId() {
}

public MvProductRankId(LocalDate anchorDate, String weightGroup, Long productId) {
this.anchorDate = anchorDate;
this.weightGroup = weightGroup;
this.productId = productId;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof MvProductRankId that)) return false;
return Objects.equals(anchorDate, that.anchorDate)
&& Objects.equals(weightGroup, that.weightGroup)
&& Objects.equals(productId, that.productId);
}

@Override
public int hashCode() {
return Objects.hash(anchorDate, weightGroup, productId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.loopers.domain.ranking.mv;

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import jakarta.persistence.IdClass;
import jakarta.persistence.Index;
import jakarta.persistence.Table;
import lombok.Getter;

import java.time.LocalDate;
import java.time.LocalDateTime;

@Entity
@Table(
name = "mv_product_rank_last_30d",
indexes = @Index(
name = "idx_last_30d_rank",
columnList = "anchor_date, weight_group, rank_position"
)
)
@IdClass(MvProductRankId.class)
@Getter
public class MvProductRankLast30d {

@Id
@Column(name = "anchor_date", nullable = false)
private LocalDate anchorDate;

@Id
@Column(name = "weight_group", length = 32, nullable = false)
private String weightGroup;

@Id
@Column(name = "product_id", nullable = false)
private Long productId;

@Column(name = "view_count", nullable = false)
private long viewCount;

@Column(name = "like_count", nullable = false)
private long likeCount;

@Column(name = "sales_amount", nullable = false)
private long salesAmount;

@Column(name = "score", nullable = false)
private double score;

@Column(name = "rank_position", nullable = false)
private int rankPosition;

@Column(name = "created_at", nullable = false)
private LocalDateTime createdAt;

protected MvProductRankLast30d() {
}

public MvProductRankLast30d(LocalDate anchorDate, String weightGroup, Long productId,
long viewCount, long likeCount, long salesAmount,
double score, int rankPosition, LocalDateTime createdAt) {
this.anchorDate = anchorDate;
this.weightGroup = weightGroup;
this.productId = productId;
this.viewCount = viewCount;
this.likeCount = likeCount;
this.salesAmount = salesAmount;
this.score = score;
this.rankPosition = rankPosition;
this.createdAt = createdAt;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.loopers.domain.ranking.mv;

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import jakarta.persistence.IdClass;
import jakarta.persistence.Index;
import jakarta.persistence.Table;
import lombok.Getter;

import java.time.LocalDate;
import java.time.LocalDateTime;

/**
* 롤링 7일 랭킹 확정 MV 의 commerce-api 측 읽기 모델.
* commerce-batch 가 쓰기 소유자이며, 여기서는 조회만 한다.
*/
@Entity
@Table(
name = "mv_product_rank_last_7d",
indexes = @Index(
name = "idx_last_7d_rank",
columnList = "anchor_date, weight_group, rank_position"
)
)
@IdClass(MvProductRankId.class)
@Getter
public class MvProductRankLast7d {

@Id
@Column(name = "anchor_date", nullable = false)
private LocalDate anchorDate;

@Id
@Column(name = "weight_group", length = 32, nullable = false)
private String weightGroup;

@Id
@Column(name = "product_id", nullable = false)
private Long productId;

@Column(name = "view_count", nullable = false)
private long viewCount;

@Column(name = "like_count", nullable = false)
private long likeCount;

@Column(name = "sales_amount", nullable = false)
private long salesAmount;

@Column(name = "score", nullable = false)
private double score;

@Column(name = "rank_position", nullable = false)
private int rankPosition;

@Column(name = "created_at", nullable = false)
private LocalDateTime createdAt;

protected MvProductRankLast7d() {
}

public MvProductRankLast7d(LocalDate anchorDate, String weightGroup, Long productId,
long viewCount, long likeCount, long salesAmount,
double score, int rankPosition, LocalDateTime createdAt) {
this.anchorDate = anchorDate;
this.weightGroup = weightGroup;
this.productId = productId;
this.viewCount = viewCount;
this.likeCount = likeCount;
this.salesAmount = salesAmount;
this.score = score;
this.rankPosition = rankPosition;
this.createdAt = createdAt;
}
}
Loading