From 266e98996543d311da05291b09d849d88a5f0bf8 Mon Sep 17 00:00:00 2001
From: ddjiny
Date: Wed, 8 Apr 2026 01:18:13 +0900
Subject: [PATCH 1/6] =?UTF-8?q?feat:=20Redis=20ZSET=20=EA=B8=B0=EB=B0=98?=
=?UTF-8?q?=20=EC=8B=A4=EC=8B=9C=EA=B0=84=20=EB=9E=AD=ED=82=B9=20=EC=8B=9C?=
=?UTF-8?q?=EC=8A=A4=ED=85=9C=20=EA=B5=AC=ED=98=84=20(Step=200~1)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- RankingWeight 가중치 상수 (VIEW=0.01, LIKE=0.3, ORDER=1.0+ε×log)
- RankingRepository/Impl: ZINCRBY+EXPIRE, SADD/SREM 좋아요 멱등
- RankingService: 점수 증감 + 날짜 키 resolve + 좋아요 중복 방어
- CatalogEventProcessor: 조회/좋아요 이벤트 → 랭킹 적재 (try-catch)
- OrderEventProcessor: 주문 이벤트 → 건수+금액 보너스 적재 (try-catch)
- ErrorType.INVALID_DATE_FORMAT 추가
- 단위 테스트 10건 + 통합 테스트 6건 (Testcontainers Redis)
---
.../com/loopers/support/error/ErrorType.java | 5 +-
apps/commerce-streamer/build.gradle.kts | 3 +
.../domain/ranking/RankingRepository.java | 41 +++++
.../domain/ranking/RankingService.java | 59 +++++++
.../loopers/domain/ranking/RankingWeight.java | 40 +++++
.../ranking/RankingRepositoryImpl.java | 57 +++++++
.../consumer/CatalogEventProcessor.java | 38 ++++-
.../consumer/OrderEventProcessor.java | 25 ++-
.../domain/ranking/RankingServiceTest.java | 83 +++++++++
.../domain/ranking/RankingWeightTest.java | 41 +++++
.../RankingRepositoryImplIntegrationTest.java | 121 +++++++++++++
k6/README.md | 48 ++++++
k6/scripts/session9/ranking-api-load.js | 88 ++++++++++
k6/scripts/session9/ranking-e2e-accuracy.js | 159 ++++++++++++++++++
.../session9/ranking-event-throughput.js | 159 ++++++++++++++++++
k6/scripts/session9/ranking-mixed-load.js | 151 +++++++++++++++++
.../session9/ranking-weight-accuracy.js | 140 +++++++++++++++
k6/seed-session9.sh | 58 +++++++
18 files changed, 1310 insertions(+), 6 deletions(-)
create mode 100644 apps/commerce-streamer/src/main/java/com/loopers/domain/ranking/RankingRepository.java
create mode 100644 apps/commerce-streamer/src/main/java/com/loopers/domain/ranking/RankingService.java
create mode 100644 apps/commerce-streamer/src/main/java/com/loopers/domain/ranking/RankingWeight.java
create mode 100644 apps/commerce-streamer/src/main/java/com/loopers/infrastructure/ranking/RankingRepositoryImpl.java
create mode 100644 apps/commerce-streamer/src/test/java/com/loopers/domain/ranking/RankingServiceTest.java
create mode 100644 apps/commerce-streamer/src/test/java/com/loopers/domain/ranking/RankingWeightTest.java
create mode 100644 apps/commerce-streamer/src/test/java/com/loopers/infrastructure/ranking/RankingRepositoryImplIntegrationTest.java
create mode 100644 k6/scripts/session9/ranking-api-load.js
create mode 100644 k6/scripts/session9/ranking-e2e-accuracy.js
create mode 100644 k6/scripts/session9/ranking-event-throughput.js
create mode 100644 k6/scripts/session9/ranking-mixed-load.js
create mode 100644 k6/scripts/session9/ranking-weight-accuracy.js
create mode 100644 k6/seed-session9.sh
diff --git a/apps/commerce-api/src/main/java/com/loopers/support/error/ErrorType.java b/apps/commerce-api/src/main/java/com/loopers/support/error/ErrorType.java
index 907f330aee..bfe54b42f2 100644
--- a/apps/commerce-api/src/main/java/com/loopers/support/error/ErrorType.java
+++ b/apps/commerce-api/src/main/java/com/loopers/support/error/ErrorType.java
@@ -82,7 +82,10 @@ public enum ErrorType {
/** Entry Token */
ENTRY_TOKEN_REQUIRED(HttpStatus.FORBIDDEN, "ENTRY_TOKEN_REQUIRED", "입장 토큰이 필요합니다."),
ENTRY_TOKEN_INVALID(HttpStatus.FORBIDDEN, "ENTRY_TOKEN_INVALID", "유효하지 않은 입장 토큰입니다."),
- ENTRY_TOKEN_EXPIRED(HttpStatus.FORBIDDEN, "ENTRY_TOKEN_EXPIRED", "입장 토큰이 만료되었습니다.");
+ ENTRY_TOKEN_EXPIRED(HttpStatus.FORBIDDEN, "ENTRY_TOKEN_EXPIRED", "입장 토큰이 만료되었습니다."),
+
+ /** Ranking */
+ INVALID_DATE_FORMAT(HttpStatus.BAD_REQUEST, "INVALID_DATE_FORMAT", "날짜 형식이 올바르지 않습니다 (yyyyMMdd)");
private final HttpStatus status;
private final String code;
diff --git a/apps/commerce-streamer/build.gradle.kts b/apps/commerce-streamer/build.gradle.kts
index ba710e6eba..ec427aec11 100644
--- a/apps/commerce-streamer/build.gradle.kts
+++ b/apps/commerce-streamer/build.gradle.kts
@@ -20,4 +20,7 @@ dependencies {
testImplementation(testFixtures(project(":modules:jpa")))
testImplementation(testFixtures(project(":modules:redis")))
testImplementation(testFixtures(project(":modules:kafka")))
+
+ // testcontainers (Redis 통합 테스트)
+ testImplementation("com.redis:testcontainers-redis")
}
diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/ranking/RankingRepository.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/ranking/RankingRepository.java
new file mode 100644
index 0000000000..d851eebf35
--- /dev/null
+++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/ranking/RankingRepository.java
@@ -0,0 +1,41 @@
+package com.loopers.domain.ranking;
+
+import java.time.LocalDate;
+
+/**
+ * 랭킹 ZSET 쓰기 연산 인터페이스 (commerce-streamer 전용).
+ *
+ * commerce-api의 RankingRepository와는 별도 정의 (ISP).
+ * streamer는 쓰기(ZINCRBY, SADD, SREM)만, api는 읽기(ZREVRANGE, ZREVRANK)만 정의.
+ */
+public interface RankingRepository {
+
+ /**
+ * 특정 상품의 랭킹 점수를 증가시킨다. (ZINCRBY)
+ *
+ * @param productId 상품 ID
+ * @param score 증가시킬 점수 (가중치 적용된 값)
+ * @param date 집계 대상 날짜 (이벤트의 occurredAt 기준)
+ */
+ void incrementScore(Long productId, double score, LocalDate date);
+
+ /**
+ * 좋아요 멱등 처리 — 해당 유저의 좋아요가 처음인 경우에만 true 반환. (SADD)
+ *
+ * @param productId 상품 ID
+ * @param userId 유저 ID
+ * @param date 집계 대상 날짜
+ * @return true: 신규 (ZINCRBY 진행), false: 이미 존재 (스킵)
+ */
+ boolean addLikeIfAbsent(Long productId, Long userId, LocalDate date);
+
+ /**
+ * 좋아요 취소 멱등 처리 — 해당 유저의 좋아요가 존재하던 경우에만 true 반환. (SREM)
+ *
+ * @param productId 상품 ID
+ * @param userId 유저 ID
+ * @param date 집계 대상 날짜
+ * @return true: 제거됨 (ZINCRBY -score 진행), false: 없었음 (스킵)
+ */
+ boolean removeLikeIfPresent(Long productId, Long userId, LocalDate date);
+}
diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/ranking/RankingService.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/ranking/RankingService.java
new file mode 100644
index 0000000000..8f7d170eb7
--- /dev/null
+++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/ranking/RankingService.java
@@ -0,0 +1,59 @@
+package com.loopers.domain.ranking;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+
+@Service
+@RequiredArgsConstructor
+@Slf4j
+public class RankingService {
+
+ private final RankingRepository rankingRepository;
+
+ /**
+ * 이벤트 기반 랭킹 점수를 증가시킨다. (조회, 주문용)
+ */
+ public void incrementScore(Long productId, double score, LocalDateTime occurredAt) {
+ LocalDate date = resolveDate(occurredAt);
+ rankingRepository.incrementScore(productId, score, date);
+ log.debug("[Ranking] ZINCRBY productId={}, score={}, date={}", productId, score, date);
+ }
+
+ /**
+ * 좋아요 멱등 점수 증가 — SADD로 중복 확인 후 ZINCRBY.
+ */
+ public void incrementLikeScoreIfAbsent(Long productId, Long userId, double score,
+ LocalDateTime occurredAt) {
+ LocalDate date = resolveDate(occurredAt);
+ boolean isNew = rankingRepository.addLikeIfAbsent(productId, userId, date);
+ if (isNew) {
+ rankingRepository.incrementScore(productId, score, date);
+ log.debug("[Ranking] LIKED (new) productId={}, userId={}, score={}", productId, userId, score);
+ } else {
+ log.debug("[Ranking] LIKED (duplicate, skip) productId={}, userId={}", productId, userId);
+ }
+ }
+
+ /**
+ * 좋아요 취소 멱등 점수 차감 — SREM으로 확인 후 ZINCRBY 음수.
+ */
+ public void decrementLikeScoreIfPresent(Long productId, Long userId, double score,
+ LocalDateTime occurredAt) {
+ LocalDate date = resolveDate(occurredAt);
+ boolean wasPresent = rankingRepository.removeLikeIfPresent(productId, userId, date);
+ if (wasPresent) {
+ rankingRepository.incrementScore(productId, -score, date);
+ log.debug("[Ranking] UNLIKED (removed) productId={}, userId={}, score=-{}", productId, userId, score);
+ } else {
+ log.debug("[Ranking] UNLIKED (not found, skip) productId={}, userId={}", productId, userId);
+ }
+ }
+
+ private LocalDate resolveDate(LocalDateTime occurredAt) {
+ return occurredAt != null ? occurredAt.toLocalDate() : LocalDate.now();
+ }
+}
diff --git a/apps/commerce-streamer/src/main/java/com/loopers/domain/ranking/RankingWeight.java b/apps/commerce-streamer/src/main/java/com/loopers/domain/ranking/RankingWeight.java
new file mode 100644
index 0000000000..e11a5b2d1a
--- /dev/null
+++ b/apps/commerce-streamer/src/main/java/com/loopers/domain/ranking/RankingWeight.java
@@ -0,0 +1,40 @@
+package com.loopers.domain.ranking;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+/**
+ * 랭킹 점수 가중치 상수.
+ *
+ * 업계 랭킹 신호 비율 기반 설계:
+ *
+ * - 발견 신호(조회): ~10% 기여
+ * - 관여 신호(좋아요): ~25% 기여
+ * - 구매 신호(주문): ~65% 기여
+ *
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class RankingWeight {
+
+ /** 조회 1건당 점수. 빈도 최다이므로 가장 낮게 설정. */
+ public static final double VIEW = 0.01;
+
+ /** 좋아요 1건당 점수. SET 멱등 처리와 함께 사용. */
+ public static final double LIKE = 0.3;
+
+ /** 주문 건수 기본 점수 (금액 보너스 전). */
+ public static final double ORDER_BASE = 1.0;
+
+ /** 주문 금액 보너스 스케일링 팩터. score = ORDER_BASE + PRICE_EPSILON × log₁₀(amount + 1) */
+ public static final double PRICE_EPSILON = 0.01;
+
+ /**
+ * 주문 1건의 점수를 계산한다. (건수 우선 + 금액 보너스)
+ *
+ * @param amount 주문 금액 (price × quantity)
+ * @return ORDER_BASE + PRICE_EPSILON × log₁₀(amount + 1)
+ */
+ public static double orderScore(long amount) {
+ return ORDER_BASE + PRICE_EPSILON * Math.log10(amount + 1);
+ }
+}
diff --git a/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/ranking/RankingRepositoryImpl.java b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/ranking/RankingRepositoryImpl.java
new file mode 100644
index 0000000000..2ed758c631
--- /dev/null
+++ b/apps/commerce-streamer/src/main/java/com/loopers/infrastructure/ranking/RankingRepositoryImpl.java
@@ -0,0 +1,57 @@
+package com.loopers.infrastructure.ranking;
+
+import com.loopers.config.redis.RedisConfig;
+import com.loopers.domain.ranking.RankingRepository;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Repository;
+
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
+
+@Repository
+@Slf4j
+public class RankingRepositoryImpl implements RankingRepository {
+
+ private static final String RANKING_KEY_PREFIX = "ranking:all:";
+ private static final String LIKED_KEY_PREFIX = "ranking:liked:";
+ private static final Duration TTL = Duration.ofDays(2);
+ private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.BASIC_ISO_DATE;
+
+ private final RedisTemplate redisTemplateMaster;
+
+ public RankingRepositoryImpl(
+ @Qualifier(RedisConfig.REDIS_TEMPLATE_MASTER) RedisTemplate redisTemplateMaster) {
+ this.redisTemplateMaster = redisTemplateMaster;
+ }
+
+ @Override
+ public void incrementScore(Long productId, double score, LocalDate date) {
+ String key = RANKING_KEY_PREFIX + date.format(DATE_FORMAT);
+ String member = String.valueOf(productId);
+
+ redisTemplateMaster.opsForZSet().incrementScore(key, member, score);
+ redisTemplateMaster.expire(key, TTL);
+ }
+
+ @Override
+ public boolean addLikeIfAbsent(Long productId, Long userId, LocalDate date) {
+ String key = LIKED_KEY_PREFIX + date.format(DATE_FORMAT);
+ String member = productId + ":" + userId;
+
+ Long result = redisTemplateMaster.opsForSet().add(key, member);
+ redisTemplateMaster.expire(key, TTL);
+ return result != null && result > 0;
+ }
+
+ @Override
+ public boolean removeLikeIfPresent(Long productId, Long userId, LocalDate date) {
+ String key = LIKED_KEY_PREFIX + date.format(DATE_FORMAT);
+ String member = productId + ":" + userId;
+
+ Long result = redisTemplateMaster.opsForSet().remove(key, member);
+ return result != null && result > 0;
+ }
+}
diff --git a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/CatalogEventProcessor.java b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/CatalogEventProcessor.java
index ef8e9001e6..ca62e60493 100644
--- a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/CatalogEventProcessor.java
+++ b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/CatalogEventProcessor.java
@@ -2,6 +2,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.loopers.domain.metrics.ProductMetricsService;
+import com.loopers.domain.ranking.RankingService;
+import com.loopers.domain.ranking.RankingWeight;
import com.loopers.infrastructure.monitoring.ConsumerMetrics;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -9,6 +11,7 @@
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
+import java.time.LocalDateTime;
import java.util.Map;
/**
@@ -24,6 +27,7 @@
public class CatalogEventProcessor {
private final ProductMetricsService productMetricsService;
+ private final RankingService rankingService;
private final ConsumerMetrics consumerMetrics;
private final ObjectMapper objectMapper;
@@ -55,21 +59,49 @@ public void process(ConsumerRecord
*/
public void processBatch(List> records) {
- Map viewScores = new HashMap<>();
- Map viewCounts = new HashMap<>();
+ Map viewScores = new HashMap<>(); // Redis: productId별 점수 합산
+ Map viewCounts = new HashMap<>(); // DB: (productId, metricDate)별 카운트
List likeEvents = new ArrayList<>();
// Phase 1: 파싱 + VIEWED 합산
@@ -58,8 +58,9 @@ public void processBatch(List> records) {
switch (event.eventType) {
case "PRODUCT_VIEWED" -> {
+ LocalDate metricDate = event.occurredAt.toLocalDate(); // JVM TZ = Asia/Seoul
viewScores.merge(event.productId, RankingWeight.VIEW, Double::sum);
- viewCounts.merge(event.productId, 1, Integer::sum);
+ viewCounts.merge(new ProductDateKey(event.productId, metricDate), 1, Integer::sum);
}
case "PRODUCT_LIKED", "PRODUCT_UNLIKED" -> likeEvents.add(event);
default -> log.warn("[CatalogProcessor] 알 수 없는 eventType: {}", event.eventType);
@@ -68,40 +69,54 @@ public void processBatch(List> records) {
// Phase 2: VIEWED — DB 배치 + Redis Pipeline
if (!viewScores.isEmpty()) {
- // DB: 조회수 배치 합산 증가 (productId당 1회 upsert)
- viewCounts.forEach((productId, count) ->
- productMetricsService.incrementViewCountBy(productId, count));
+ long totalViewEvents = viewCounts.values().stream().mapToInt(Integer::intValue).sum();
- // Redis: Pipeline 배치 처리
+ // DB: 조회수 배치 합산 증가 (productId × metricDate당 1회 upsert, 누적 + daily 동일 TX)
+ viewCounts.forEach((key, count) ->
+ productMetricsService.incrementViewCountBy(key.productId(), count, key.metricDate()));
+
+ // Redis: Pipeline 배치 처리 — 실패 시 메시지는 이미 ack되므로 메트릭으로 손실 가시화
try {
rankingService.incrementScoreBatch(viewScores);
+ consumerMetrics.recordCatalogProcessed(totalViewEvents);
} catch (Exception e) {
- log.warn("[CatalogProcessor] 랭킹 배치 적재 실패 — PRODUCT_VIEWED, products={}", viewScores.keySet(), e);
+ consumerMetrics.recordCatalogFailed(totalViewEvents);
+ log.warn("[CatalogProcessor] 랭킹 배치 적재 실패 — PRODUCT_VIEWED, products={}, lostEvents={}",
+ viewScores.keySet(), totalViewEvents, e);
}
- log.debug("[CatalogProcessor] VIEWED 배치 처리 완료 — {}개 상품, {}건 이벤트",
- viewScores.size(), viewCounts.values().stream().mapToInt(i -> i).sum());
+ log.debug("[CatalogProcessor] VIEWED 배치 처리 완료 — {}개 버킷, {}건 이벤트",
+ viewCounts.size(), totalViewEvents);
}
// Phase 3: LIKED/UNLIKED — 1건씩 처리 (멱등 보장)
for (ParsedEvent event : likeEvents) {
try {
+ LocalDate metricDate = event.occurredAt.toLocalDate();
if ("PRODUCT_LIKED".equals(event.eventType)) {
- productMetricsService.incrementLikeCount(event.productId);
+ productMetricsService.incrementLikeCount(event.productId, metricDate);
rankingService.incrementLikeScoreIfAbsent(
event.productId, event.userId, RankingWeight.LIKE, event.occurredAt);
} else {
- productMetricsService.decrementLikeCount(event.productId);
+ productMetricsService.decrementLikeCount(event.productId, metricDate);
rankingService.decrementLikeScoreIfPresent(
event.productId, event.userId, RankingWeight.LIKE, event.occurredAt);
}
+ consumerMetrics.recordCatalogProcessed();
} catch (Exception e) {
+ consumerMetrics.recordCatalogFailed();
log.warn("[CatalogProcessor] 처리 실패 — {}, productId={}",
event.eventType, event.productId, e);
}
}
}
+ /**
+ * Phase 1 압착 집계용 복합 키. (productId, metricDate) 단위로 카운트를 합산하여
+ * 자정 경계 이벤트가 서로 다른 버킷으로 올바르게 분리되도록 한다.
+ */
+ private record ProductDateKey(Long productId, LocalDate metricDate) {}
+
@SuppressWarnings("unchecked")
private ParsedEvent parseRecord(ConsumerRecord record) {
Object value = record.value();
diff --git a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/OrderEventProcessor.java b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/OrderEventProcessor.java
index 1e417ab770..d65df997b9 100644
--- a/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/OrderEventProcessor.java
+++ b/apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/OrderEventProcessor.java
@@ -16,6 +16,7 @@
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
+import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
@@ -107,6 +108,8 @@ private void handleOrderCreated(Map payload, LocalDateTime occur
List