[volume-10] Batch 기반 주간·월간 랭킹 시스템 구현 - 양권모#413
[volume-10] Batch 기반 주간·월간 랭킹 시스템 구현 - 양권모#413Praesentia-YKM wants to merge 8 commits intoLoopers-dev-lab:Praesentia-YKMfrom
Conversation
- 좋아요/주문/결제 도메인에 이벤트 record 정의 (LikeToggledEvent, OrderPlacedEvent, PaymentCompletedEvent) - LikeTransactionService에서 직접 호출 대신 이벤트 발행으로 전환 - LikeMetricsEventListener: AFTER_COMMIT + REQUIRES_NEW로 좋아요 집계 처리 - OrderFacade.placeOrder(), PaymentFacade.handleCallback()에서 이벤트 발행 - UserActivityEventListener: 모든 도메인 이벤트를 구독하여 유저 행동 로깅 - 캐시 evict 실패 시에도 DB 업데이트가 롤백되지 않도록 try-catch 처리 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Step 2: Transactional Outbox Pattern + Kafka Producer/Consumer - OutboxEvent 엔티티 (2-Phase: PENDING → PROCESSING → PUBLISHED/FAILED) - OutboxEventListener (BEFORE_COMMIT으로 도메인 TX 원자성 보장) - OutboxRelayService (전용 ExecutorService + @scheduled 폴링 + Recovery/Cleanup) - ProductViewedEvent 추가 및 ProductFacade 연동 - Kafka 설정: acks=all, idempotence=true - CatalogEventConsumer (LIKED/UNLIKED/PRODUCT_VIEWED → ProductMetrics 집계) - OrderEventConsumer (ORDER_PLACED/PAYMENT_COMPLETED 로깅) - EventHandled 테이블 기반 Consumer 멱등 처리 - @Version 낙관적 락 충돌 시 재시도 + DLQ 전송 Step 3: Kafka 기반 선착순 쿠폰 발급 - CouponModel에 maxQuantity/issuedCount 추가 - CouponFacade.requestCouponIssue() → Kafka 비동기 위임 - CouponIssueConsumer (비관적 락 + 수량 제한 + UK 중복 방지) - 발급 결과 확인 Polling API (/issue-status) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Step 1: Redis Sorted Set 대기열 진입/순번 조회 API - Step 2: 스케줄러 기반 입장 토큰 발급/검증/TTL + HandlerInterceptor - Step 3: Prometheus 메트릭(Counter/Gauge) + k6 부하테스트 시나리오 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Redis 기반 대기열 시스템 전체 구현 (Step 1~3) 머지
- QueueTokenInterceptor.afterCompletion()에서 주문 성공 시 토큰 자동 삭제 - QueueTokenService.removeToken() 위임 메서드 추가 - 토큰 TTL 만료 통합 테스트 추가 (QueueTokenExpiryIntegrationTest) - 처리량 초과 통합 테스트 추가 (QueueThroughputIntegrationTest) - Interceptor 토큰 삭제 단위 테스트 3건 추가 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- 이벤트별 가중치 점수 계산 (조회 0.1, 좋아요 0.2, 주문 0.7*log10) - Kafka Consumer에서 ZSET ZINCRBY로 실시간 랭킹 갱신 - 랭킹 조회 API (GET /api/v1/rankings) + 상품 상세에 rank 필드 추가 - OrderPlacedEvent에 items 확장하여 상품별 주문 랭킹 반영 - E2E 테스트 및 .http 파일 포함 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- RankingScoreCalculator를 modules/jpa로 이동 (api/batch 공유) - ProductMetricsDaily 엔티티 추가 (일별 상품 지표 집계) - MvProductRankWeekly/Monthly MV 엔티티 + Repository 추가 - WeeklyRankingJob / MonthlyRankingJob Spring Batch 구현 (Reader/Processor/Writer + Activate/ClearOldVersion Tasklet) - Ranking API에 주간/월간 조회 엔드포인트 확장 - Catalog/OrderEventConsumer 에서 일별 지표 반영 - 관련 단위/E2E 테스트 추가 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
📝 WalkthroughWalkthroughQueue 대기 시스템, Ranking 순위 산출, Event-driven Outbox 패턴, Coupon 비동기 발급, Like 토글 메트릭 추적, 주간/월간 Batch 순위 계산, Product 상세조회 순위 통합, Kafka 스트리밍 컨슈머를 구현한다. Redis 정렬 집합, Spring Batch 작업, 트랜잭션 이벤트 리스너, Kafka 토픽 관리, 토큰 라이프사이클 관리를 포함한다. Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant QueueAPI as Queue API
participant QueueService
participant QueueScheduler
participant TokenService
participant OrderAPI as Order API
participant QueueInterceptor
User->>QueueAPI: POST /api/v1/queue/enter<br/>(eventId, userId)
QueueAPI->>QueueService: enter(eventId, userId)
QueueService->>QueueService: add to Redis ZSET<br/>calculate position
QueueService-->>QueueAPI: QueueStatus(position, totalWaiting)
QueueAPI-->>User: 200 OK {position: 1}
loop Every 3 seconds
QueueScheduler->>QueueService: popFront(eventId, BATCH_SIZE=10)
QueueService->>QueueService: pop 10 users from ZSET
loop For each user
QueueScheduler->>TokenService: issueToken(eventId, userId)
TokenService->>TokenService: generate UUID token<br/>store in Redis w/ 300s TTL
TokenService-->>QueueScheduler: token
end
end
User->>QueueAPI: GET /api/v1/queue/position<br/>(eventId, userId)
QueueAPI->>QueueService: getPosition(eventId, userId)
QueueService-->>QueueAPI: QueueStatus(..., token)
QueueAPI-->>User: 200 OK {token: "uuid"}
User->>OrderAPI: POST /api/v1/orders<br/>Headers: X-Queue-Token, X-User-Id, X-Event-Id
OrderAPI->>QueueInterceptor: preHandle()
QueueInterceptor->>TokenService: validateToken(eventId, userId, token)
TokenService-->>QueueInterceptor: valid
QueueInterceptor-->>OrderAPI: request allowed
OrderAPI->>OrderAPI: process order
OrderAPI-->>User: 200 OK {orderId: 123}
OrderAPI->>QueueInterceptor: afterCompletion(status=200)
QueueInterceptor->>TokenService: removeToken(eventId, userId)
TokenService->>TokenService: delete from Redis
sequenceDiagram
participant App as Application
participant EventPub as Event Publisher
participant OutboxListener as Outbox Listener
participant OutboxDB as Outbox Table
participant RelayService as Relay Service
participant Kafka
participant Consumer as Streamer Consumer
participant MetricsDB as Metrics DB
App->>App: process business logic
App->>EventPub: publish LikeToggledEvent<br/>(productId, liked)
EventPub->>OutboxListener: onApplicationEvent()
OutboxListener->>OutboxDB: save OutboxEvent<br/>(status=PENDING)
OutboxDB-->>OutboxListener: eventId saved
loop Every 1 second
RelayService->>OutboxDB: findPendingEventsForUpdate(BATCH_SIZE)
OutboxDB-->>RelayService: [OutboxEvent]
par Async Publish
RelayService->>OutboxDB: markProcessing()
loop For each event
RelayService->>Kafka: send ProducerRecord<br/>(topic, payload, headers)
Kafka-->>RelayService: ack (10s timeout)
end
RelayService->>OutboxDB: markPublished()
end
end
Kafka-->>Consumer: consume batch<br/>(catalog-events)
Consumer->>Consumer: extract X-Event-Id<br/>check EventHandled
Consumer->>MetricsDB: increment like_count<br/>increment ranking score
Consumer->>MetricsDB: save EventHandled<br/>(eventId, eventType)
Consumer->>Kafka: commit batch
sequenceDiagram
participant Batch as Batch Job
participant Reader as Reader (DB)
participant Processor as Processor
participant RedisRanking as Redis Ranking
participant Writer as Writer (DB)
participant VersionMgr as Version Manager
Batch->>Batch: start WeeklyRankingJob
Batch->>Batch: clearOldVersionStep
Batch->>VersionMgr: getNextWeeklyVersion()
VersionMgr-->>Batch: nextVersion=2
Batch->>Writer: deleteByVersion(2)
Batch->>Batch: store nextVersion in context
Batch->>Batch: aggregateStep (chunk=100)
loop Read batches
Batch->>Reader: findByDateRange(startDate, endDate)
Reader-->>Batch: [ProductMetricsDaily]
loop Process items
Batch->>Processor: process(metricsDaily)
Processor->>Processor: aggregate per product<br/>calculate score
Processor->>Processor: assign rank=counter++
Processor-->>Batch: ProductRankWeekly
end
loop Write batch
Batch->>Writer: saveAll([ProductRankWeekly])
end
end
Batch->>Batch: activateVersionStep
Batch->>VersionMgr: getCurrentWeeklyVersion()
VersionMgr-->>Batch: oldVersion=1
Batch->>VersionMgr: activateWeeklyVersion(2)
Batch->>Writer: deleteByVersion(1)
Batch-->>Batch: version 1 → 2
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes Possibly related PRs
✨ Finishing Touches🧪 Generate unit tests (beta)
|
There was a problem hiding this comment.
Actionable comments posted: 9
Note
Due to the large number of review comments, Critical severity comments were prioritized as inline comments.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
apps/commerce-api/src/main/java/com/loopers/application/payment/PaymentFacade.java (1)
70-86:⚠️ Potential issue | 🟠 Major
status가SUCCESS/FAILED어느 쪽도 아닐 때도 이벤트가 발행된다.현재 코드는
if/else if로 상태를 분기한 뒤, 분기 밖에서 무조건PaymentCompletedEvent를 발행한다. PG가PENDING/CANCELED/UNKNOWN등 예기치 못한 값으로 콜백을 보내면 결제/주문 상태는 그대로인 채success=false인 "완료 이벤트"만 유포되어, 이를 소비하는 Outbox/활동 로깅/후속 Kafka 컨슈머가 실패로 오인하고 사용자 알림·보상 로직이 잘못 실행될 수 있다. 운영상 콜백 재시도·리플레이와 겹치면 영향이 확장된다.수정안:
- 유효 상태에서만 발행한다.
- 알 수 없는 상태는 예외로 전환해 PG 콜백 재시도/관측을 유도한다.
🛠️ 제안 diff
- if ("SUCCESS".equals(status)) { - payment.markSuccess(); - order.confirmPayment(); - } else if ("FAILED".equals(status)) { - payment.markFailed(failureReason); - order.failPayment(); - } - - eventPublisher.publishEvent(new PaymentCompletedEvent( - payment.getId(), payment.orderId(), payment.userId(), "SUCCESS".equals(status) - )); + final boolean success; + if ("SUCCESS".equals(status)) { + payment.markSuccess(); + order.confirmPayment(); + success = true; + } else if ("FAILED".equals(status)) { + payment.markFailed(failureReason); + order.failPayment(); + success = false; + } else { + log.warn("알 수 없는 PG 콜백 상태 - transactionKey: {}, status: {}", transactionKey, status); + throw new CoreException(ErrorType.BAD_REQUEST, "지원하지 않는 결제 콜백 상태다."); + } + + eventPublisher.publishEvent(new PaymentCompletedEvent( + payment.getId(), payment.orderId(), payment.userId(), success + ));추가 테스트:
status=SUCCESS/FAILED각각에서 이벤트 발행 1회 및success플래그 검증.status=UNKNOWN/null에서 예외 발생 및 이벤트 미발행, 결제/주문 상태 불변 검증.- 동일
transactionKey로 재콜백 수신 시 중복 상태 전이 방지(멱등) 회귀 테스트.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-api/src/main/java/com/loopers/application/payment/PaymentFacade.java` around lines 70 - 86, The handleCallback method currently always publishes PaymentCompletedEvent regardless of status; change it to only act and publish when status is a known terminal value ("SUCCESS" or "FAILED"): inside handleCallback (using paymentService.getByTransactionKey, payment.markSuccess, payment.markFailed, order.confirmPayment, order.failPayment) validate the incoming status, throw an exception for unknown/null statuses to trigger PG retry/observation, and only call eventPublisher.publishEvent(new PaymentCompletedEvent(...)) when you have executed a valid state transition; additionally ensure idempotency by checking payment’s current state before applying markSuccess/markFailed so repeated callbacks for the same transactionKey do not re-emit events or re-transition terminal payments.apps/commerce-api/src/test/java/com/loopers/application/product/ProductFacadeTest.java (1)
102-130:⚠️ Potential issue | 🟡 Minor새 의존성(ApplicationEventPublisher, RankingFacade) 행위 검증 누락.
ProductFacade에RankingFacade와ApplicationEventPublisher가 추가되었음에도GetProduct#returnsProductDetail는 기존 필드(name/brandName/stockStatus)만 검증하며, 신규 행위인 다음 두 가지를 검증하지 않는다.
ProductViewedEvent발행 여부 및 payload(productId 일치)RankingFacade에서 가져온 rank 값이ProductDetail에 반영되는지운영 관점에서 이 둘이 회귀하면 아웃박스 미발행(→ 랭킹 스코어 누락)과 상세 응답의 rank 누락으로 이어져 사용자 체감 품질이 저하된다. 아래와 같이 verify/assert를 추가할 것을 권장한다.
♻️ 추가 테스트 예시
+ when(rankingFacade.getRank(productId)).thenReturn(/* 기대 rank */); + ProductDetail result = productFacade.getProduct(productId); assertAll( () -> assertThat(result.name()).isEqualTo("에어맥스"), () -> assertThat(result.brandName()).isEqualTo("나이키"), - () -> assertThat(result.stockStatus()).isEqualTo(StockStatus.IN_STOCK) + () -> assertThat(result.stockStatus()).isEqualTo(StockStatus.IN_STOCK), + () -> verify(eventPublisher).publishEvent(any(ProductViewedEvent.class)) );As per coding guidelines: "단위 테스트는 경계값/실패 케이스/예외 흐름을 포함하는지 점검한다" 및 "Mock 남용으로 의미가 약해지면 테스트 방향을 재정렬하도록 제안한다" — 주입만 하고 검증하지 않는 mock은 테스트 의미를 약화시킨다.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-api/src/test/java/com/loopers/application/product/ProductFacadeTest.java` around lines 102 - 130, The test GetProduct#returnsProductDetail needs to verify the new side-effects introduced to ProductFacade: assert that ApplicationEventPublisher.publishEvent was called with a ProductViewedEvent whose productId equals productId, and assert that RankingFacade.getRank (or the mocked ranking call used by productFacade) is invoked and its returned rank is reflected in the returned ProductDetail (e.g., result.rank() equals the mocked rank). Update the test to mock RankingFacade to return a known rank, add verify(applicationEventPublisher).publishEvent(argThat(event -> ((ProductViewedEvent)event).getProductId().equals(productId))) and verify(rankingFacade).getRank(productId) (or the concrete method used), and add an assertion for result.rank() matching the mocked value while keeping existing assertions for name/brandName/stockStatus.
♻️ Duplicate comments (1)
apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/CatalogEventConsumer.java (1)
181-198:⚠️ Potential issue | 🟠 Major
OrderEventConsumer.sendToDlq와 동일한getMessage()NPE 리스크이다.
exception.getMessage()가 null인 경우(대표적으로 NPE).getBytes(...)단계에서 다시 NPE가 발생해 DLQ 전송이 실패하고 메시지가 유실된다. 해당 파일의 수정안과 동일하게 null 안전 처리 및 예외 클래스명 fallback을 적용해야 한다. 테스트도 message가 null인 예외로sendToDlq호출 시 DLT 발행에 성공함을 검증하는 단위 테스트를 공통 helper로 추출해 두는 것을 권장한다.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/CatalogEventConsumer.java` around lines 181 - 198, The sendToDlq method risks an NPE when exception.getMessage() is null; update sendToDlq to null-safe the error header by computing a non-null string (e.g., String errMsg = exception.getMessage() != null ? exception.getMessage() : exception.getClass().getSimpleName()) before calling getBytes, use that errMsg for the "X-Error-Message" RecordHeader, and keep the existing copy of original headers and "X-Original-Topic" logic; also add/adjust a unit test (reusable helper) that calls CatalogEventConsumer.sendToDlq with an Exception whose getMessage() returns null and asserts the DLT publish succeeds and contains the fallback error header.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c061c31f-42da-4682-876a-52b9ac8d6b9d
⛔ Files ignored due to path filters (6)
docs/plans/2026-03-27-step1-application-event.mdis excluded by!**/*.mdand included by**docs/plans/2026-04-03-queue-system-design.mdis excluded by!**/*.mdand included by**docs/plans/2026-04-03-step1-queue-system.mdis excluded by!**/*.mdand included by**docs/plans/2026-04-03-step2-token-scheduler.mdis excluded by!**/*.mdand included by**docs/plans/2026-04-03-step3-monitoring-k6.mdis excluded by!**/*.mdand included by**docs/requirements/round7-requirement.mdis excluded by!**/*.mdand included by**
📒 Files selected for processing (137)
.http/queue.http.http/ranking.httpapps/commerce-api/build.gradle.ktsapps/commerce-api/src/main/java/com/loopers/application/coupon/CouponFacade.javaapps/commerce-api/src/main/java/com/loopers/application/coupon/CouponIssueService.javaapps/commerce-api/src/main/java/com/loopers/application/like/LikeMetricsEventListener.javaapps/commerce-api/src/main/java/com/loopers/application/like/LikeTransactionService.javaapps/commerce-api/src/main/java/com/loopers/application/logging/UserActivityEventListener.javaapps/commerce-api/src/main/java/com/loopers/application/order/OrderFacade.javaapps/commerce-api/src/main/java/com/loopers/application/outbox/OutboxEventListener.javaapps/commerce-api/src/main/java/com/loopers/application/payment/PaymentFacade.javaapps/commerce-api/src/main/java/com/loopers/application/product/ProductDetail.javaapps/commerce-api/src/main/java/com/loopers/application/product/ProductFacade.javaapps/commerce-api/src/main/java/com/loopers/application/queue/QueueConstants.javaapps/commerce-api/src/main/java/com/loopers/application/queue/QueueEntryScheduler.javaapps/commerce-api/src/main/java/com/loopers/application/queue/QueueFacade.javaapps/commerce-api/src/main/java/com/loopers/application/queue/QueueInfo.javaapps/commerce-api/src/main/java/com/loopers/application/queue/QueueMetrics.javaapps/commerce-api/src/main/java/com/loopers/application/queue/QueueService.javaapps/commerce-api/src/main/java/com/loopers/application/queue/QueueTokenService.javaapps/commerce-api/src/main/java/com/loopers/application/ranking/RankingFacade.javaapps/commerce-api/src/main/java/com/loopers/application/ranking/RankingInfo.javaapps/commerce-api/src/main/java/com/loopers/application/ranking/RankingService.javaapps/commerce-api/src/main/java/com/loopers/application/ranking/RankingWithProduct.javaapps/commerce-api/src/main/java/com/loopers/config/KafkaTopicConfig.javaapps/commerce-api/src/main/java/com/loopers/config/OutboxRelayConfig.javaapps/commerce-api/src/main/java/com/loopers/config/QueueTokenInterceptor.javaapps/commerce-api/src/main/java/com/loopers/config/WebMvcConfig.javaapps/commerce-api/src/main/java/com/loopers/domain/coupon/CouponModel.javaapps/commerce-api/src/main/java/com/loopers/domain/coupon/event/CouponIssueRequestedEvent.javaapps/commerce-api/src/main/java/com/loopers/domain/like/event/LikeToggledEvent.javaapps/commerce-api/src/main/java/com/loopers/domain/order/event/OrderPlacedEvent.javaapps/commerce-api/src/main/java/com/loopers/domain/outbox/OutboxEvent.javaapps/commerce-api/src/main/java/com/loopers/domain/outbox/OutboxEventEnvelope.javaapps/commerce-api/src/main/java/com/loopers/domain/outbox/OutboxRepository.javaapps/commerce-api/src/main/java/com/loopers/domain/outbox/OutboxStatus.javaapps/commerce-api/src/main/java/com/loopers/domain/payment/event/PaymentCompletedEvent.javaapps/commerce-api/src/main/java/com/loopers/domain/product/event/ProductViewedEvent.javaapps/commerce-api/src/main/java/com/loopers/domain/queue/QueueRepository.javaapps/commerce-api/src/main/java/com/loopers/domain/queue/QueueTokenRepository.javaapps/commerce-api/src/main/java/com/loopers/domain/ranking/MvProductRankMonthly.javaapps/commerce-api/src/main/java/com/loopers/domain/ranking/MvProductRankWeekly.javaapps/commerce-api/src/main/java/com/loopers/domain/ranking/MvRankingRepository.javaapps/commerce-api/src/main/java/com/loopers/domain/ranking/RankingEntry.javaapps/commerce-api/src/main/java/com/loopers/domain/ranking/RankingKeyGenerator.javaapps/commerce-api/src/main/java/com/loopers/domain/ranking/RankingPeriod.javaapps/commerce-api/src/main/java/com/loopers/domain/ranking/RankingRepository.javaapps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxJpaRepository.javaapps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxRelayService.javaapps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxRepositoryImpl.javaapps/commerce-api/src/main/java/com/loopers/infrastructure/queue/QueueRepositoryImpl.javaapps/commerce-api/src/main/java/com/loopers/infrastructure/queue/QueueTokenRepositoryImpl.javaapps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/MvProductRankMonthlyJpaRepository.javaapps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/MvProductRankWeeklyJpaRepository.javaapps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/MvRankingRepositoryImpl.javaapps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/RankingRepositoryImpl.javaapps/commerce-api/src/main/java/com/loopers/interfaces/api/coupon/CouponV1Controller.javaapps/commerce-api/src/main/java/com/loopers/interfaces/api/coupon/CouponV1Dto.javaapps/commerce-api/src/main/java/com/loopers/interfaces/api/product/ProductV1Dto.javaapps/commerce-api/src/main/java/com/loopers/interfaces/api/queue/QueueV1ApiSpec.javaapps/commerce-api/src/main/java/com/loopers/interfaces/api/queue/QueueV1Controller.javaapps/commerce-api/src/main/java/com/loopers/interfaces/api/queue/QueueV1Dto.javaapps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1ApiSpec.javaapps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Controller.javaapps/commerce-api/src/main/java/com/loopers/interfaces/api/ranking/RankingV1Dto.javaapps/commerce-api/src/test/java/com/loopers/application/like/LikeMetricsEventListenerTest.javaapps/commerce-api/src/test/java/com/loopers/application/like/LikeTransactionServiceTest.javaapps/commerce-api/src/test/java/com/loopers/application/logging/UserActivityEventListenerTest.javaapps/commerce-api/src/test/java/com/loopers/application/order/OrderFacadeTest.javaapps/commerce-api/src/test/java/com/loopers/application/outbox/OutboxEventListenerTest.javaapps/commerce-api/src/test/java/com/loopers/application/payment/PaymentFacadeTest.javaapps/commerce-api/src/test/java/com/loopers/application/product/ProductFacadeTest.javaapps/commerce-api/src/test/java/com/loopers/application/queue/QueueEntrySchedulerTest.javaapps/commerce-api/src/test/java/com/loopers/application/queue/QueueFacadeTest.javaapps/commerce-api/src/test/java/com/loopers/application/queue/QueueServiceTest.javaapps/commerce-api/src/test/java/com/loopers/application/queue/QueueTokenServiceTest.javaapps/commerce-api/src/test/java/com/loopers/config/QueueTokenInterceptorTest.javaapps/commerce-api/src/test/java/com/loopers/domain/coupon/CouponModelQuantityTest.javaapps/commerce-api/src/test/java/com/loopers/domain/like/event/LikeToggledEventTest.javaapps/commerce-api/src/test/java/com/loopers/domain/outbox/OutboxEventTest.javaapps/commerce-api/src/test/java/com/loopers/infrastructure/queue/QueueConcurrencyIntegrationTest.javaapps/commerce-api/src/test/java/com/loopers/infrastructure/queue/QueueRepositoryImplIntegrationTest.javaapps/commerce-api/src/test/java/com/loopers/infrastructure/queue/QueueThroughputIntegrationTest.javaapps/commerce-api/src/test/java/com/loopers/infrastructure/queue/QueueTokenExpiryIntegrationTest.javaapps/commerce-api/src/test/java/com/loopers/infrastructure/queue/QueueTokenRepositoryImplIntegrationTest.javaapps/commerce-api/src/test/java/com/loopers/interfaces/api/queue/QueueV1ApiE2ETest.javaapps/commerce-api/src/test/java/com/loopers/interfaces/api/ranking/RankingV1ApiE2ETest.javaapps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/MonthlyRankingJobConfig.javaapps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/WeeklyRankingJobConfig.javaapps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/step/ActivateVersionTasklet.javaapps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/step/ClearOldVersionTasklet.javaapps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/step/MonthlyActivateVersionTasklet.javaapps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/step/MonthlyClearOldVersionTasklet.javaapps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/step/MonthlyRankingProcessor.javaapps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/step/MonthlyRankingReader.javaapps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/step/MonthlyRankingWriter.javaapps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/step/WeeklyRankingProcessor.javaapps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/step/WeeklyRankingReader.javaapps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/step/WeeklyRankingWriter.javaapps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductMetricsAggregation.javaapps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductRankMonthly.javaapps/commerce-batch/src/main/java/com/loopers/domain/ranking/ProductRankWeekly.javaapps/commerce-batch/src/main/java/com/loopers/domain/ranking/RankingVersionManager.javaapps/commerce-batch/src/main/java/com/loopers/infrastructure/metrics/ProductMetricsDailyBatchRepository.javaapps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/ProductRankMonthlyJpaRepository.javaapps/commerce-batch/src/main/java/com/loopers/infrastructure/ranking/ProductRankWeeklyJpaRepository.javaapps/commerce-batch/src/test/java/com/loopers/CommerceBatchApplicationTest.javaapps/commerce-batch/src/test/java/com/loopers/domain/ranking/ProductRankWeeklyModelTest.javaapps/commerce-batch/src/test/java/com/loopers/job/ranking/MonthlyRankingJobE2ETest.javaapps/commerce-batch/src/test/java/com/loopers/job/ranking/WeeklyRankingJobE2ETest.javaapps/commerce-streamer/src/main/java/com/loopers/domain/coupon/CouponEntity.javaapps/commerce-streamer/src/main/java/com/loopers/domain/coupon/CouponIssueEntity.javaapps/commerce-streamer/src/main/java/com/loopers/domain/idempotency/EventHandled.javaapps/commerce-streamer/src/main/java/com/loopers/domain/metrics/ProductMetrics.javaapps/commerce-streamer/src/main/java/com/loopers/domain/ranking/RankingKeyGenerator.javaapps/commerce-streamer/src/main/java/com/loopers/infrastructure/coupon/CouponIssueJpaRepository.javaapps/commerce-streamer/src/main/java/com/loopers/infrastructure/coupon/CouponJpaRepository.javaapps/commerce-streamer/src/main/java/com/loopers/infrastructure/idempotency/EventHandledJpaRepository.javaapps/commerce-streamer/src/main/java/com/loopers/infrastructure/metrics/ProductMetricsDailyJpaRepository.javaapps/commerce-streamer/src/main/java/com/loopers/infrastructure/metrics/ProductMetricsJpaRepository.javaapps/commerce-streamer/src/main/java/com/loopers/infrastructure/ranking/RankingRedisRepository.javaapps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/CatalogEventConsumer.javaapps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/CouponIssueConsumer.javaapps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/OrderEventConsumer.javaapps/commerce-streamer/src/test/java/com/loopers/domain/metrics/ProductMetricsTest.javaapps/commerce-streamer/src/test/java/com/loopers/domain/ranking/RankingKeyGeneratorTest.javaapps/commerce-streamer/src/test/java/com/loopers/domain/ranking/RankingScoreCalculatorTest.javadocs/queue-simulator.htmlmodules/jpa/src/main/java/com/loopers/domain/metrics/ProductMetricsDaily.javamodules/jpa/src/main/java/com/loopers/domain/metrics/ProductMetricsDailyId.javamodules/jpa/src/main/java/com/loopers/domain/ranking/RankingScoreCalculator.javamodules/jpa/src/test/java/com/loopers/domain/metrics/ProductMetricsDailyModelTest.javamodules/jpa/src/test/java/com/loopers/domain/ranking/RankingScoreCalculatorTest.javamodules/kafka/src/main/resources/kafka.ymlsupports/k6/queue-scheduler.jssupports/k6/queue-spike.jssupports/k6/queue-ttl.js
| @Cacheable(cacheNames = "productDetail", key = "#productId") | ||
| @Transactional(readOnly = true) | ||
| @Transactional | ||
| public ProductDetail getProduct(Long productId) { | ||
| ProductModel product = productService.getById(productId); | ||
| String brandName = getBrandName(product.getBrandId()); | ||
| StockModel stock = stockService.getByProductId(productId); | ||
| return ProductDetail.ofCustomer(product, brandName, StockStatus.from(stock.getQuantity())); | ||
| Long rank = rankingFacade.getProductRank(productId); | ||
| eventPublisher.publishEvent(new ProductViewedEvent(productId, null)); | ||
| return ProductDetail.ofCustomer(product, brandName, StockStatus.from(stock.getQuantity()), rank); | ||
| } |
There was a problem hiding this comment.
@Cacheable 메서드 내부에서 이벤트 발행과 rank 포함이 결합되어 있다. 치명적 결함이다.
운영/기능 관점 이슈 3건이 한 메서드에 얽혀 있다.
-
캐시 히트 시
ProductViewedEvent가 발행되지 않는다.@Cacheable은 캐시 적중 시 메서드 본문을 건너뛰므로, 인기 상품일수록(=캐시 히트율이 높을수록) 조회 이벤트가 유실된다. 이 이벤트는 commerce-streamer의 ZINCRBY로 실시간 랭킹을 만드는 입력이므로, 결과적으로 "인기 상품일수록 랭킹에 덜 잡히는" 역전 현상이 생긴다. 일간 Redis ZSET 설계의 근간이 무너진다. -
rank가 캐시에 포함된다.ProductDetail에 실린rank는 본 메서드 반환값과 함께productDetail캐시로 들어가지만, 실제 랭킹은 이벤트 수신에 따라 초 단위로 변동한다. 그 결과 캐시 TTL 동안 오래된 순위 값이 반환되어 사용자에게 보인다. 캐시 무효화 주기와 랭킹 갱신 주기가 완전히 어긋난다. -
@Transactional을readOnly에서 쓰기 트랜잭션으로 바꾼 의도가 불명확하다. 본 메서드는 상태를 변경하지 않으며, 이벤트 발행도AFTER_COMMIT리스너로 동작한다면 읽기 트랜잭션으로 충분하다. 불필요한 커밋 경로·락 경합이 생긴다. -
new ProductViewedEvent(productId, null)의null이 userId라면 트래킹·개인화 집계에서 사용자 식별이 영구 불가능해진다. 필요 시SecurityContext또는 헤더에서 사용자 식별자를 주입해야 한다.
수정 방향을 권장한다.
- 조회와 이벤트 발행을 분리해, 이벤트 발행은
@Cacheable이 걸리지 않은 별도 퍼블릭 메서드(또는 컨트롤러 어드바이스)에서 수행한다. rank는 캐시 대상에서 제외하고, 캐시된ProductDetail에 조회 시점에rank를 덧붙여 반환한다(데코레이션).@Transactional(readOnly = true)로 환원한다.- 사용자 식별자는 가능한 경우 실제 값을 전달하고, 익명 조회 정책을 주석으로 명시한다.
추가 테스트:
- 동일 상품을 2회 연속 조회 시 이벤트가 2회 발행되는지 검증(캐시 히트에도 발행).
- 캐시 TTL 내에서 Redis ZSET 점수가 변경된 뒤 다시 조회 시
rank가 최신값으로 반영되는지 검증.
🛠 제안 diff (요지)
- `@Cacheable`(cacheNames = "productDetail", key = "#productId")
- `@Transactional`
- public ProductDetail getProduct(Long productId) {
- ProductModel product = productService.getById(productId);
- String brandName = getBrandName(product.getBrandId());
- StockModel stock = stockService.getByProductId(productId);
- Long rank = rankingFacade.getProductRank(productId);
- eventPublisher.publishEvent(new ProductViewedEvent(productId, null));
- return ProductDetail.ofCustomer(product, brandName, StockStatus.from(stock.getQuantity()), rank);
- }
+ public ProductDetail getProduct(Long productId, Long userId) {
+ ProductDetail cached = getProductCached(productId);
+ eventPublisher.publishEvent(new ProductViewedEvent(productId, userId));
+ Long rank = rankingFacade.getProductRank(productId);
+ return cached.withRank(rank);
+ }
+
+ `@Cacheable`(cacheNames = "productDetail", key = "#productId")
+ `@Transactional`(readOnly = true)
+ public ProductDetail getProductCached(Long productId) {
+ ProductModel product = productService.getById(productId);
+ String brandName = getBrandName(product.getBrandId());
+ StockModel stock = stockService.getByProductId(productId);
+ return ProductDetail.ofCustomer(product, brandName, StockStatus.from(stock.getQuantity()), null);
+ }As per coding guidelines ("엔티티/값 객체/DTO 경계를 명확히 하고, 불변성과 캡슐화를 점검한다" 및 "도메인 규칙과 인프라 관심사가 섞이면 분리하도록 제안한다").
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/commerce-api/src/main/java/com/loopers/application/product/ProductFacade.java`
around lines 45 - 54, The getProduct method currently combines caching, event
publishing and rank inclusion causing missed events on cache hits, stale rank in
cache, and unnecessary write transaction; split responsibilities by making
getProduct a readOnly cached method that only returns cacheable product details
without rank or event (adjust `@Transactional` to readOnly = true and remove event
publish and rankingFacade usage from ProductFacade.getProduct), add a separate
public non-cached method (e.g., publishProductView or recordProductView called
from controller or advice) that always publishes ProductViewedEvent (use real
userId from SecurityContext if available instead of null), and decorate the
cached ProductDetail at response time by fetching current rank from
rankingFacade and attaching it before returning to clients (so ProductDetail
stays cacheable but rank is computed dynamically).
| public void relay() { | ||
| // Phase 1: PENDING → PROCESSING | ||
| List<OutboxEvent> events = fetchAndMarkProcessing(); | ||
| if (events.isEmpty()) { | ||
| return; | ||
| } | ||
|
|
||
| // Phase 2: partitionKey별 그루핑 → Kafka 발행 | ||
| Map<String, List<OutboxEvent>> grouped = events.stream() | ||
| .collect(Collectors.groupingBy(OutboxEvent::getPartitionKey)); | ||
|
|
||
| List<CompletableFuture<Void>> futures = new ArrayList<>(); | ||
| for (Map.Entry<String, List<OutboxEvent>> entry : grouped.entrySet()) { | ||
| CompletableFuture<Void> future = CompletableFuture.runAsync( | ||
| () -> publishEvents(entry.getValue()), | ||
| outboxRelayExecutor | ||
| ); | ||
| futures.add(future); | ||
| } | ||
|
|
||
| CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); | ||
| log.info("Outbox relay 완료: {}건 처리", events.size()); | ||
| } | ||
|
|
||
| @Transactional | ||
| public List<OutboxEvent> fetchAndMarkProcessing() { | ||
| List<OutboxEvent> events = outboxRepository.findPendingEventsForUpdate(BATCH_SIZE); | ||
| events.forEach(OutboxEvent::markProcessing); | ||
| return events; | ||
| } |
There was a problem hiding this comment.
Self-invocation으로 @Transactional이 적용되지 않아 FOR UPDATE SKIP LOCKED 락이 소실된다.
운영 관점에서 치명적이다. relay()가 같은 클래스의 fetchAndMarkProcessing()을 this 참조로 직접 호출하므로 Spring AOP 프록시를 거치지 않고, @Transactional이 무효화된다. 그 결과 다음 문제가 연쇄적으로 발생한다.
findPendingEventsForUpdate(..)의FOR UPDATE SKIP LOCKED가 트랜잭션 범위를 가지지 못해 쿼리 종료 즉시 락이 해제되고, 멀티 인스턴스/다중 스케줄 환경에서 동일 이벤트가 두 번PROCESSING으로 마킹되어 중복 Kafka 발행이 발생할 수 있다.events.forEach(OutboxEvent::markProcessing)의 엔티티 상태 변경이 dirty checking으로 커밋되지 않아 DB에는 여전히PENDING으로 남을 수 있다. 이 경우recoverStalledEvents도 탐지할 수 없는 "유령 처리중" 상태가 된다.- Phase 2의 Kafka 발행은 트랜잭션이 열려 있지 않은 상태에서 진행되므로, 그룹 단위 예외/타임아웃 시 상태 정합성 보장이 약하다.
수정안은 두 가지다.
🔧 수정안 1: 분리된 빈(권장) — 프록시 경계를 강제한다
- `@Transactional`
- public List<OutboxEvent> fetchAndMarkProcessing() {
- List<OutboxEvent> events = outboxRepository.findPendingEventsForUpdate(BATCH_SIZE);
- events.forEach(OutboxEvent::markProcessing);
- return events;
- }
+ // 별도 `@Component` (예: OutboxRelayTxService)로 분리하여 주입받아 호출한다.
+ // 예: this.outboxRelayTxService.fetchAndMarkProcessing();별도 트랜잭셔널 빈으로 분리하면 자기호출 문제를 구조적으로 제거할 수 있고, 조회+상태전이+flush를 하나의 커밋 경계 안에 둘 수 있다.
🔧 수정안 2: 진입점(`relay`)에 트랜잭션 경계 지정은 지양
relay()에 @Transactional을 붙이면 Phase 2의 Kafka I/O까지 단일 트랜잭션에 묶여 장시간 락/커넥션 점유가 발생한다. 반드시 Phase 1(조회+상태전이)만 트랜잭션으로 감싸야 한다.
추가 테스트가 필요하다.
- 두 개 이상의 인스턴스(혹은 두 스레드)에서 동시에
relay()를 호출해도 동일OutboxEvent가PROCESSING으로 한 번만 마킹되는지 Testcontainers MySQL 기반 통합 테스트로 검증한다. fetchAndMarkProcessing()리턴 직후 DB에서 재조회하여 상태가PROCESSING으로 커밋되어 있는지 단정한다.
As per coding guidelines ("트랜잭션 경계(@Transactional) 위치와 전파, readOnly, 롤백 조건을 점검한다").
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/commerce-api/src/main/java/com/loopers/infrastructure/outbox/OutboxRelayService.java`
around lines 53 - 82, The issue: self-invocation in OutboxRelayService.relay()
calls the `@Transactional` method fetchAndMarkProcessing() directly so the FOR
UPDATE SKIP LOCKED transaction is not applied; fix by moving the transactional
phase into a separate Spring bean (e.g., OutboxFetcherService) with a
`@Transactional` method fetchAndMarkProcessing() that calls
outboxRepository.findPendingEventsForUpdate(BATCH_SIZE), calls
OutboxEvent::markProcessing, and flushes/commits before returning; then have
OutboxRelayService.relay() call that new bean's fetchAndMarkProcessing(), keep
publishEvents(...) and outboxRelayExecutor usage unchanged, and avoid annotating
relay() with `@Transactional` to prevent long-lived transactions during Kafka I/O.
| public interface MvProductRankMonthlyJpaRepository extends JpaRepository<MvProductRankMonthly, Long> { | ||
| List<MvProductRankMonthly> findAllByOrderByRankingAsc(Pageable pageable); | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# 주간 Repository도 동일 패턴인지 확인
fd -t f MvProductRankWeeklyJpaRepository.java -x cat {}
# MvRankingRepositoryImpl에서 version 인자를 어떻게 사용하는지 확인
fd -t f MvRankingRepositoryImpl.java -x cat {}
# 활성 버전 주입 경로 확인
rg -nP -C3 'RankingVersionManager|activeVersion|currentVersion' --type=javaRepository: Loopers-dev-lab/loop-pack-be-l2-vol3-java
Length of output: 10648
🏁 Script executed:
# Check the entity models to confirm 'version' field exists
fd -t f -E '*.class' \( -name 'MvProductRankMonthly.java' -o -name 'MvProductRankWeekly.java' \) -x cat {}Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java
Length of output: 315
🏁 Script executed:
# Check the tasklet implementations for actual version handling
fd -t f \( -name 'ClearOldVersionTasklet.java' -o -name 'ActivateVersionTasklet.java' \) | head -2 | xargs -I {} sh -c 'echo "=== {} ===" && cat {}'Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java
Length of output: 315
🏁 Script executed:
# Look for any integration tests related to ranking batch or version management
fd -t f -path '*/test/*' -name '*Rank*Test*.java' -o -path '*/test/*' -name '*Version*Test*.java' | xargs -l1 sh -c 'wc -l "$1" 2>/dev/null | awk "{print \$1, \"$1\"}"' _ | sort -rn | head -5Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java
Length of output: 253
🏁 Script executed:
# Find and display entity models
find . -type f -name 'MvProductRankMonthly.java' -o -name 'MvProductRankWeekly.java' | xargs -I {} sh -c 'echo "=== {} ===" && cat {}'Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java
Length of output: 1825
🏁 Script executed:
# Check tasklet implementations
find . -type f \( -name 'ClearOldVersionTasklet.java' -o -name 'ActivateVersionTasklet.java' \) | xargs catRepository: Loopers-dev-lab/loop-pack-be-l2-vol3-java
Length of output: 3440
🏁 Script executed:
# Search for test files related to ranking
find . -path '*/test/*' -type f \( -name '*Rank*Test*.java' -o -name '*Version*Test*.java' \) | head -10Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java
Length of output: 732
🏁 Script executed:
# Check if there are any other query methods on the MV repositories
rg 'findAll|findBy' apps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/ --type java -A 1Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java
Length of output: 1365
🏁 Script executed:
# Check if deleteByVersion method exists on repositories
rg 'deleteByVersion' apps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/ --type java -B2 -A2Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java
Length of output: 67
🏁 Script executed:
# Check the E2E tests to see if they validate version isolation
cat ./apps/commerce-batch/src/test/java/com/loopers/job/ranking/WeeklyRankingJobE2ETest.javaRepository: Loopers-dev-lab/loop-pack-be-l2-vol3-java
Length of output: 4311
🏁 Script executed:
# Check if RankingVersionManager has getCurrentWeeklyVersion/Monthly methods
rg 'getCurrentWeekly|getCurrentMonth|getNextWeekly|getNextMonth' --type java -B1 -A3Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java
Length of output: 4937
활성 version 필터가 누락되어 버전 스왑 전략이 깨진다
PR 설계상 MV는 "버전 기반 스왑으로 무중단 조회"를 전제로 하며, ClearOldVersion → Aggregate(INSERT) → ActivateVersion 3-step으로 동작한다. 그러나 findAllByOrderByRankingAsc(Pageable)는 version 컬럼을 조건에 전혀 사용하지 않아, 배치 실행 도중에는 이전 버전 + 신규 적재 중인 버전이 섞여 ranking 값이 중복/누락되어 정렬되고, 조회 API가 순위를 왜곡 반환한다. Pageable로 top-N만 반환해도 서로 다른 버전의 동일 ranking=1 행이 섞이면 운영상 치명적이다.
동일 이슈가 주간 Repository(MvProductRankWeeklyJpaRepository)에도 존재한다. 활성 버전을 받아 필터링하는 쿼리로 교체하고, RankingVersionManager에서 활성 버전을 주입받도록 해야 한다. 또한 버전이 섞여 들어오는 회귀를 잡는 통합 테스트(구 버전 + 신 버전 동시 INSERT 상태에서 조회 결과가 단일 버전으로 한정되는지)를 추가해야 한다.
🐛 제안 수정
-public interface MvProductRankMonthlyJpaRepository extends JpaRepository<MvProductRankMonthly, Long> {
- List<MvProductRankMonthly> findAllByOrderByRankingAsc(Pageable pageable);
-}
+public interface MvProductRankMonthlyJpaRepository extends JpaRepository<MvProductRankMonthly, Long> {
+ List<MvProductRankMonthly> findAllByVersionOrderByRankingAsc(long version, Pageable pageable);
+}MvRankingRepositoryImpl의 getWeeklyTopN(), getMonthlyTopN() 메서드도 함께 수정하여 RankingVersionManager에서 현재 활성 버전을 주입받아 쿼리를 호출해야 한다.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/commerce-api/src/main/java/com/loopers/infrastructure/ranking/MvProductRankMonthlyJpaRepository.java`
around lines 9 - 11, The repository method findAllByOrderByRankingAsc(Pageable)
in MvProductRankMonthlyJpaRepository (and the analogous method in
MvProductRankWeeklyJpaRepository) is missing an active version filter; update
these JPA query methods to accept a version parameter and filter on the version
column (e.g., findAllByVersionAndOrderByRankingAsc) and ensure
MvRankingRepositoryImpl.getMonthlyTopN() and getWeeklyTopN() obtain the current
active version from RankingVersionManager and pass it into the repository call;
finally add an integration test that inserts rows for both old and new versions
concurrently and asserts that reads return only rows for the single active
version to prevent mixed-version results.
| @Slf4j | ||
| @StepScope | ||
| @ConditionalOnProperty(name = "spring.batch.job.name", havingValue = MonthlyRankingJobConfig.JOB_NAME) | ||
| @RequiredArgsConstructor | ||
| @Component | ||
| public class MonthlyClearOldVersionTasklet implements Tasklet { |
There was a problem hiding this comment.
in-memory 버전 매니저의 멀티인스턴스·재기동 내성을 확인한다.
PR 설명에 명시된 대로 RankingVersionManager가 AtomicLong 기반이면, 배치 인스턴스가 재기동되거나 API/Batch가 서로 다른 JVM에서 동작할 경우 버전이 초기화되거나 어긋난다. 이는 "무중단 스왑"이라는 설계 목적 자체를 무너뜨리므로, 단기라도 다음 중 하나를 적용해야 한다.
- DB 기반 버전 테이블(활성/후보 버전 레코드)
- Redis
INCR기반 버전 - 최소한 기동 시 DB
MAX(version)+1로 워밍업
추가 테스트: 컨텍스트 재기동 후 이전 버전과의 충돌/중복 적재가 없는지 통합 테스트에서 검증한다.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/step/MonthlyClearOldVersionTasklet.java`
around lines 16 - 21, The in-memory RankingVersionManager currently using an
AtomicLong is not multi-instance or restart-safe: update RankingVersionManager
(and/or MonthlyClearOldVersionTasklet) to persist or seed the version from a
shared store—either implement a DB-backed version table (active/candidate
records) or use Redis INCR for atomic global increments, or, at minimum, on
startup set the AtomicLong to (SELECT MAX(version)+1 FROM
<ranking_version_table>) to warm up the in-memory counter; ensure the logic
lives in the RankingVersionManager initialization path and add integration tests
that restart the application/context to validate there is no version collision
or duplicate ingestion across JVMs.
| @StepScope | ||
| @ConditionalOnProperty(name = "spring.batch.job.name", havingValue = MonthlyRankingJobConfig.JOB_NAME) | ||
| @Component | ||
| public class MonthlyRankingProcessor implements ItemProcessor<ProductMetricsAggregation, ProductRankMonthly>, StepExecutionListener { | ||
|
|
||
| private final AtomicInteger rankCounter = new AtomicInteger(0); | ||
| private long nextVersion; | ||
|
|
||
| @Value("#{jobParameters['requestDate']}") | ||
| private LocalDate requestDate; | ||
|
|
||
| @Override | ||
| public void beforeStep(StepExecution stepExecution) { | ||
| this.nextVersion = stepExecution.getJobExecution() | ||
| .getExecutionContext() | ||
| .getLong("nextMonthlyVersion"); | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Step 구성에서 processor가 listener로 등록되는지 확인한다.
fd -t f 'MonthlyRankingJobConfig.java' | xargs -I{} sed -n '1,250p' {}
echo '---'
fd -t f 'WeeklyRankingJobConfig.java' | xargs -I{} sed -n '1,250p' {}Repository: Loopers-dev-lab/loop-pack-be-l2-vol3-java
Length of output: 7940
🏁 Script executed:
fd -t f 'MonthlyRankingProcessor.java' | xargs cat -nRepository: Loopers-dev-lab/loop-pack-be-l2-vol3-java
Length of output: 2379
MonthlyRankingProcessor가 Step에 listener로 명시 등록되지 않으면 nextVersion=0으로 데이터가 적재된다.
MonthlyRankingJobConfig.aggregateStep()에서 processor는 .processor(processor)로만 등록되고 .listener()로는 등록되지 않는다. Spring Batch에서 .processor()는 ItemProcessor 인터페이스만 작동하며, StepExecutionListener의 beforeStep()은 호출되지 않는다. 따라서 nextVersion은 선언부의 기본값 0으로 유지되어 모든 월간 랭킹이 version=0으로 적재된다. activate tasklet이 swap할 때 원하지 않는 버전이 노출되거나 unique 제약 위반으로 Job이 실패하므로, 운영상 첫 배치 실행 시 전량 오염되는 치명적 케이스다.
수정안:
MonthlyRankingJobConfig.aggregateStep()에서.listener((Object) processor)또는.listener(processor)추가 등록beforeStep()에서nextMonthlyVersion부재 시IllegalStateException발생하도록 명시적 실패 처리 추가- E2E 테스트에 activate된 버전의 실제 레코드 수를 검증하는 assertion 추가
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/commerce-batch/src/main/java/com/loopers/batch/job/ranking/step/MonthlyRankingProcessor.java`
around lines 17 - 33, The processor's StepExecutionListener.beforeStep is never
invoked because MonthlyRankingJobConfig.aggregateStep registers the processor
only via .processor(processor); update aggregateStep to also register the
listener (e.g., .listener((Object) processor) or .listener(processor)) so
MonthlyRankingProcessor.beforeStep runs and sets nextVersion; additionally, in
MonthlyRankingProcessor.beforeStep check for presence of "nextMonthlyVersion" in
the StepExecution executionContext and throw an IllegalStateException if missing
instead of leaving nextVersion as 0; finally add an E2E test assertion that
verifies the activated monthly version's record count matches expected to catch
this regression.
| /** | ||
| * @Version 낙관적 락 충돌 시 최대 MAX_RETRY 재시도. | ||
| * 동시 업데이트로 version이 맞지 않으면 재조회 후 재시도한다. | ||
| */ | ||
| private void processWithRetry(ConsumerRecord<String, String> record) { | ||
| for (int attempt = 1; attempt <= MAX_RETRY; attempt++) { | ||
| try { | ||
| processRecord(record); | ||
| return; | ||
| } catch (ObjectOptimisticLockingFailureException e) { | ||
| log.warn("낙관적 락 충돌 (attempt {}/{}): offset={}", attempt, MAX_RETRY, record.offset()); | ||
| if (attempt == MAX_RETRY) { | ||
| throw e; | ||
| } | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
낙관적 락 재시도 패턴이 자기 호출로 완전히 무력화된다.
processWithRetry → this.processRecord 호출은 프록시를 통하지 않아 @Transactional이 적용되지 않는다. 결과적으로:
- 각 시도가 새 트랜잭션/새 영속성 컨텍스트에서 실행되지 않아
ObjectOptimisticLockingFailureException이후 재시도해도 같은 1차 캐시의 스테일 엔티티로 다시 저장을 시도하게 되어 재시도의 의미가 사라진다. - 실제 DB 반영이 트랜잭션 없이 auto-commit으로 흩어져,
ProductMetrics/ProductMetricsDaily/EventHandled간 정합성이 깨질 수 있다(예: 카운트 증가는 됐는데EventHandled가 안 들어가 재소비 시 이중 증가). - 추가로 재시도 간 백오프가 없어, 경합이 심해지면 짧은 시간 안에 MAX_RETRY를 소진해 곧바로 DLQ로 빠진다.
수정안: 처리 로직을 별도 @Transactional(propagation=REQUIRES_NEW) 빈으로 분리하고, 재시도마다 해당 빈의 public 메서드를 호출하여 프록시를 경유시키며, 시도 간 지수 백오프(예: 20ms·80ms·240ms)를 적용한다. 테스트는 (a) 동일 상품에 대해 LIKED 이벤트 2건을 병렬 처리해 실제 재시도가 발생하고 최종 카운트가 +2 인지, (b) MAX_RETRY 도달 시 DLT로 넘어가는지 검증해야 한다.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/CatalogEventConsumer.java`
around lines 65 - 81, processWithRetry currently calls this.processRecord
directly so the `@Transactional` proxy on processRecord is bypassed, nullifying
retry semantics and causing stale persistence context, inconsistent writes to
ProductMetrics/ProductMetricsDaily/EventHandled, and immediate MAX_RETRY
exhaustion; refactor by extracting the core processing into a separate
`@Component` bean with a public method annotated `@Transactional`(propagation =
REQUIRES_NEW) (e.g., CatalogEventProcessor.processRecordPublic) and change
processWithRetry to invoke that bean's public method through the proxy on each
attempt, add exponential backoff between attempts (e.g., 20ms, 80ms, 240ms), and
add tests that (a) run two parallel LIKED events for the same product to force
optimistic lock retries and verify final count is +2 and (b) confirm behavior
when MAX_RETRY is reached sends the record to the DLT.
| public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment ack) { | ||
| for (ConsumerRecord<String, String> record : records) { | ||
| try { | ||
| processRecord(record); | ||
| } catch (Exception e) { | ||
| log.error("coupon-issue-requests 처리 실패: offset={}, error={}", record.offset(), e.getMessage(), e); | ||
| } | ||
| } | ||
| ack.acknowledge(); | ||
| } | ||
|
|
||
| @Transactional | ||
| public void processRecord(ConsumerRecord<String, String> record) { |
There was a problem hiding this comment.
@Transactional이 자기 호출(self-invocation)로 무효화되고 있다.
consume(...)은 프록시를 통해 호출되지만, 내부에서 this.processRecord(record)를 직접 호출하므로 Spring AOP 프록시가 개입하지 못하여 processRecord의 @Transactional이 적용되지 않는다. 결과적으로 멱등 저장·쿠폰 발급·EventHandled 저장이 단일 트랜잭션으로 묶이지 않아, incrementIssuedCount 이후 couponIssueRepository.save가 실패하더라도 쿠폰 수량은 이미 증가한 상태로 커밋될 수 있다. 또한 findByIdForUpdate의 비관적 락도 트랜잭션이 없으므로 의미를 잃는다. 운영 환경에서 재고 초과 발급·데이터 정합성 깨짐이 발생한다.
수정안은 처리 로직을 별도 빈(CouponIssueHandler)으로 분리하여 주입받거나, 동일 클래스 내라면 ApplicationContext로 프록시를 받아 호출하는 방식이다. 테스트는 (1) processRecord 중간 예외 발생 시 couponRepository.issuedCount가 롤백되는지, (2) 동시 발급 요청에 대해 findByIdForUpdate 락이 실제로 동작하는지 확인하는 통합 테스트를 추가해야 한다.
🐛 제안 수정(분리 빈)
+@Slf4j
+@RequiredArgsConstructor
+@Component
+public class CouponIssueHandler {
+ private final CouponJpaRepository couponRepository;
+ private final CouponIssueJpaRepository couponIssueRepository;
+ private final EventHandledJpaRepository eventHandledRepository;
+
+ `@Transactional`
+ public void handle(String eventId, Long couponId, Long userId) { /* ... */ }
+} `@Component`
public class CouponIssueConsumer {
- private final CouponJpaRepository couponRepository;
- private final CouponIssueJpaRepository couponIssueRepository;
- private final EventHandledJpaRepository eventHandledRepository;
+ private final CouponIssueHandler handler;
private final ObjectMapper objectMapper;🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/CouponIssueConsumer.java`
around lines 39 - 51, 현재 CouponIssueConsumer.processRecord(...)에 붙은
`@Transactional이` 자기 호출로 무효화되어 트랜잭션과 비관적 락이 적용되지 않으므로, processRecord 로직을 별도
빈(CouponIssueHandler 같은 클래스)으로 옮기고 그 빈을 CouponIssueConsumer에 주입받아
consumer.consume(...)에서 handler.processRecord(record)를 호출하도록 변경하세요; 또는 같은 클래스
내에서 ApplicationContext.getBean(CouponIssueConsumer.class).processRecord(record)
방식으로 프록시를 통해 호출하도록 변경해 트랜잭션 프록시가 적용되게 하세요. 또한 새 빈(CouponIssueHandler) 또는 프록시 호출
지점에서 멱등 처리·incrementIssuedCount·couponIssueRepository.save·EventHandled 저장이 단일
트랜잭션에 묶이도록 유지하고, 통합 테스트를 추가해(processRecord 도중 예외 발생 시 issuedCount 롤백 확인, 동시 발급 시
findByIdForUpdate의 락 동작 확인) 수정이 올바른지 검증하세요.
| // 발급 처리 | ||
| try { | ||
| coupon.incrementIssuedCount(); | ||
| couponRepository.save(coupon); | ||
|
|
||
| CouponIssueEntity issue = new CouponIssueEntity(couponId, userId, coupon.getExpiredAt()); | ||
| couponIssueRepository.save(issue); | ||
|
|
||
| eventHandledRepository.save(new EventHandled(eventId, "COUPON_ISSUE_REQUESTED")); | ||
| log.info("쿠폰 발급 성공: couponId={}, userId={}", couponId, userId); | ||
| } catch (DataIntegrityViolationException e) { | ||
| log.info("쿠폰 중복 발급 방지 (UK): couponId={}, userId={}", couponId, userId); | ||
| eventHandledRepository.save(new EventHandled(eventId, "COUPON_ISSUE_REQUESTED")); | ||
| } |
There was a problem hiding this comment.
같은 트랜잭션 내에서 DataIntegrityViolationException 캐치 후 save 호출은 커밋되지 않는다.
(@Transactional이 정상 적용된다는 전제에서) DB 제약 위반이 발생한 시점에 해당 트랜잭션은 rollback-only 로 마킹되므로, 같은 트랜잭션 안에서 eventHandledRepository.save(...)를 호출해도 커밋 시점에 UnexpectedRollbackException이 발생하며 EventHandled는 저장되지 않는다. 이 경우 해당 이벤트가 재소비될 때 멱등 테이블에 기록이 없어 중복 처리 로직이 다시 돌게 되고, 운영 관점에서 UK 충돌 로그와 재시도가 반복적으로 쌓인다.
수정안은 UK 충돌 기록용 EventHandled 저장을 REQUIRES_NEW로 분리한 별도 서비스에 위임하거나, 선(先)-존재확인 흐름에서 잡아내도록 하는 것이다. 테스트는 CouponIssueEntity에 UK 충돌을 강제로 유발한 뒤 event_handled가 실제로 커밋되는지 검증하는 통합 테스트가 필요하다.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/CouponIssueConsumer.java`
around lines 100 - 113, The DataIntegrityViolationException occurs after the
transaction is marked rollback-only so eventHandledRepository.save(...) inside
the same transaction (in CouponIssueConsumer) will not commit; extract the
EventHandled persistence into a separate transactional boundary with propagation
REQUIRES_NEW (e.g., create a new service method like
EventHandledService.saveAsNewTransaction(EventHandled) annotated with
`@Transactional`(propagation = Propagation.REQUIRES_NEW)) and call that from both
the success path and the catch(DataIntegrityViolationException) path in
CouponIssueConsumer instead of calling eventHandledRepository.save(...)
directly; also add an integration test that forces a UK conflict on
CouponIssueEntity and verifies EventHandled is actually committed.
| public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment ack) { | ||
| for (ConsumerRecord<String, String> record : records) { | ||
| try { | ||
| processRecord(record); | ||
| } catch (Exception e) { | ||
| log.error("order-events 처리 실패 → DLQ 전송: offset={}, error={}", record.offset(), e.getMessage(), e); | ||
| sendToDlq(record, e); | ||
| } | ||
| } | ||
| ack.acknowledge(); | ||
| } | ||
|
|
||
| @Transactional | ||
| public void processRecord(ConsumerRecord<String, String> record) { | ||
| String eventId = getHeader(record, "X-Event-Id"); | ||
| String eventType = getHeader(record, "X-Event-Type"); | ||
|
|
||
| if (eventId == null || eventType == null) { | ||
| log.warn("이벤트 헤더 누락: offset={}", record.offset()); | ||
| return; | ||
| } | ||
|
|
||
| if (eventHandledRepository.existsById(eventId)) { | ||
| log.debug("이미 처리된 이벤트: eventId={}", eventId); | ||
| return; | ||
| } | ||
|
|
||
| JsonNode envelope = parsePayload(record.value()); | ||
| if (envelope == null) return; | ||
|
|
||
| JsonNode data = envelope.get("data"); | ||
|
|
||
| switch (eventType) { | ||
| case "ORDER_PLACED" -> handleOrderPlaced(data); | ||
| case "PAYMENT_COMPLETED" -> handlePaymentCompleted(data); | ||
| default -> log.warn("알 수 없는 이벤트 타입: {}", eventType); | ||
| } | ||
|
|
||
| eventHandledRepository.save(new EventHandled(eventId, eventType)); | ||
| } |
There was a problem hiding this comment.
@Transactional이 자기 호출로 무효화된다(동일 패턴, 반복).
CouponIssueConsumer와 동일하게 consume → this.processRecord는 프록시를 우회하므로 processRecord의 @Transactional이 적용되지 않는다. 현 구현에서는 Redis ZINCRBY(incrementScore)와 ProductMetricsDaily upsert, EventHandled 저장이 서로 다른 리소스/비트랜잭션에서 실행되어, DB 저장 실패 시에도 Redis 랭킹은 이미 증가한 채로 남아 랭킹이 과대 집계된다. 또한 같은 배치 내 동일 이벤트가 두 번 들어온 경우 existsById 체크와 save 사이 TX 격리가 없어 이중 처리 가능성이 있다.
수정안은 처리 로직을 별도 @Transactional 빈으로 분리하거나, DB 트랜잭션 커밋 이후에 Redis를 증가시키도록 순서를 재배치(트랜잭션 커밋 후 리스너)하는 것이다. 테스트는 DB 저장 실패를 주입했을 때 Redis 스코어가 증가하지 않는지, 동일 eventId를 배치 내 두 번 넣었을 때 메트릭이 한 번만 반영되는지 확인해야 한다.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@apps/commerce-streamer/src/main/java/com/loopers/interfaces/consumer/OrderEventConsumer.java`
around lines 48 - 87, The processRecord method is currently annotated
`@Transactional` but is invoked directly from consume (this.processRecord), so the
transaction is not applied; separate the transactional work into a
Spring-managed bean/method and call it via the proxy (or move DB-related logic
into a new `@Service` with a method like transactionalProcessRecord) so that the
DB operations (existsById, upsert ProductMetricsDaily, save EventHandled) run
inside a single transaction; ensure Redis incrementScore is executed only after
the DB transaction commits (either by moving Redis logic out of the
transactional bean to be invoked after the proxy call returns or by publishing
an event/using TransactionSynchronizationManager.afterCommit); update consume to
call the proxied transactional bean (e.g., transactionalProcessRecord) and
adjust sendToDlq usage accordingly; add tests that simulate DB save failure to
assert Redis ZINCRBY is not called and a duplicate eventId in the same batch is
only applied once.
📌 Summary
배경: volume-9에서 Redis Sorted Set 기반의 일간 랭킹 시스템을 구축했지만, 주간·월간처럼 긴 시간 범위의 랭킹은 Redis에 적재하기 비효율적이었습니다. Redis ZSET은 TTL이 짧은 실시간 데이터에 적합하지만, 7일·30일 단위 집계는 배치로 사전 계산하여 DB의 Materialized View에 적재하는 것이 합리적이라고 판단했습니다.
목표: Spring Batch 를 활용하여
product_metrics_daily테이블의 일별 집계를 주간(7일)·월간(30일) 단위로 롤업하고, MV 테이블(mv_product_rank_weekly/mv_product_rank_monthly)에 적재합니다. 버전 기반 스왑 전략으로 조회 무중단을 확보하고, 단일 엔드포인트에서period파라미터로 일간·주간·월간을 분기합니다.결과:
weeklyRankingJob,monthlyRankingJob두 개의 Spring Batch Job을 추가했으며,GET /api/v1/rankings?period=WEEKLY|MONTHLY로 조회 가능합니다. 일간(Redis ZSET) + 주간·월간(DB MV) 이원화 구조가 완성되었고, 랭킹 집계 점수 계산 로직(RankingScoreCalculator)을modules/jpa공유 모듈로 이동하여 api/batch/streamer 3개 앱이 동일 공식을 사용합니다.🧭 Context & Decision
1. 왜 Spring Batch인가? —
@Scheduled로는 부족한 지점@ScheduledJobInstance+RunIdIncrementer현재 데이터 규모에서는
@Scheduled로도 돌아가지만, 상품 수가 늘어나면 Chunk 기반 처리 + 재개 + 모니터링이 필요해질 것으로 보았습니다.commerce-batch앱이 이미 분리되어 있고JobListener,StepMonitorListener,ChunkListener공통 인프라가 있어 Batch로 일관되게 실었습니다.2. Chunk vs Tasklet — 하이브리드 3-Step 선택
예시 구현처럼 "단일 Tasklet + Upsert" 도 가능했지만, 버전 기반 스왑 전략을 선택하면서 3-Step 하이브리드로 분리했습니다.
핵심 판단 근거: 집계는 상품 수에 비례하는 반복이므로 Chunk가 적합하다고 보았습니다. 반면 구버전 정리/활성화는 1회성 관리 작업이라 Tasklet이 자연스러웠습니다. 두 종류를 같은 Job 안에 조립할 수 있다는 점이 Spring Batch의 강점이라고 생각합니다.
3. Materialized View — MySQL 한계와 버전 기반 스왑
MySQL은
CREATE MATERIALIZED VIEW를 지원하지 않으므로 일반 테이블 + 배치 갱신으로 구현합니다. Refresh 전략을 고민했습니다.ON DUPLICATE KEY UPDATE)version컬럼 + Step 재실행 가능버전 스왑 동작
ProductRankWeeklyJpaRepository.deleteByVersion(nextVersion)으로 작업 중이던 다음 버전 찌꺼기 제거nextVersion값으로 새 랭킹 row 들을 INSERT (읽기 쿼리는 여전히currentVersion만 봄)RankingVersionManager.activateWeeklyVersion(nextVersion)으로 포인터 교체 → 과거 버전 DELETE조회 API는 현재 모든 row 를 읽지만, Activate 직후
deleteByVersion(oldVersion)으로 구 버전이 제거되므로 결과적으로 활성 버전 row 만 남습니다.4.
product_metrics_daily— 시간 단위 → 일 단위 집계 테이블기존 streamer 쪽에는 메트릭이 일 단위로 없었습니다. 주간 집계 시 한 상품당 최대 7행, 월간 30행으로 읽도록
ProductMetricsDaily엔티티를modules/jpa공유 모듈에 신설했습니다.CatalogEventConsumer,OrderEventConsumer가 상품 조회/좋아요/판매 이벤트를 받아 일자별로 누적ProductMetricsDailyBatchRepository.findByDateRange(start, end)로 조회ProductMetricsDailyId(productId, metricDate)복합키5.
RankingScoreCalculator공유 모듈로 이동volume-9의 랭킹 점수 계산기가
commerce-streamer안에만 있었습니다. 이번에 batch 쪽에서도 동일 공식이 필요해modules/jpa로 이동하여 api/batch/streamer 3앱이 단일 구현을 공유합니다. (공식:view×0.1 + like×0.2 + order×0.7)6. API — 단일 엔드포인트 +
periodenum경로 분기(
/rankings/weekly,/rankings/monthly) 대신 단일GET /api/v1/rankings+period=DAILY|WEEKLY|MONTHLY을 택했습니다. 호출자 입장에서 같은 응답 스키마를 받고, Service 레이어(RankingService) 내부에서만 Redis / MV Repository 라우팅이 일어납니다. 추후HOURLY,REALTIME등 확장이 필요해도 enum 만 늘리면 됩니다.🏗️ Design Overview
변경 범위
RankingService라우팅 확장, API enum 추가CatalogEventConsumer/OrderEventConsumer에서 일별 지표(ProductMetricsDaily) 누적ProductMetricsDaily,RankingScoreCalculator이전 + 단위 테스트신규/변경 컴포넌트
modules/jpa (공유)
ProductMetricsDaily,ProductMetricsDailyId— 일별 상품 지표 엔티티 (view/like/sale count)RankingScoreCalculator— 점수 공식 (streamer 전용 → 공유로 이전)commerce-batch — Domain / Infrastructure
ProductMetricsAggregation— Reader → Processor 사이 DTOProductRankWeekly/ProductRankMonthly— MV 테이블 쓰기용 엔티티 (view/like/sale count + version 포함)RankingVersionManager— 활성 버전 관리 (in-memoryAtomicLong)ProductMetricsDailyBatchRepository— 기간별 일 지표 조회ProductRankWeeklyJpaRepository/ProductRankMonthlyJpaRepository— MV 저장 +deleteByVersioncommerce-batch — Batch Layer
WeeklyRankingJobConfig/MonthlyRankingJobConfig—@ConditionalOnProperty(spring.batch.job.name=...)로 Job 선택,RunIdIncrementer+requestDateJobParameterWeeklyRankingReader/MonthlyRankingReader— 기간 내ProductMetricsDaily를 읽어 상품별 in-memory 합산 → 점수 계산 → 점수 내림차순 IteratorWeeklyRankingProcessor/MonthlyRankingProcessor— 순위(ranking) 부여 +ProductRankWeekly/Monthly변환WeeklyRankingWriter/MonthlyRankingWriter— Chunk 단위 JPA 저장ClearOldVersionTasklet/MonthlyClearOldVersionTasklet— 다음 버전 찌꺼기 제거 +ExecutionContext에nextWeeklyVersion저장ActivateVersionTasklet/MonthlyActivateVersionTasklet— 버전 포인터 교체 + 구 버전 DELETEcommerce-api — Domain / Application / Interfaces
MvProductRankWeekly/MvProductRankMonthly— 같은 MV 테이블의 읽기 전용 엔티티MvRankingRepository/MvRankingRepositoryImpl—rankingASC 페이징 조회RankingPeriodenum —DAILY/WEEKLY/MONTHLYRankingService#getTopRankings—switch(period)로 Redis(RankingRepository) vs DB(MvRankingRepository) 라우팅RankingV1Controller—GetMapping에periodRequestParam 추가 (기본DAILY)commerce-streamer
CatalogEventConsumer/OrderEventConsumer— 기존 시간 단위 지표 외에ProductMetricsDailyupsert 추가주요 책임 요약
WeeklyRankingReaderrequestDate기준 직전 7일의ProductMetricsDaily를 상품별 합산 + 점수 계산 후 Iterator 반환WeeklyRankingProcessorProductMetricsAggregation→ProductRankWeekly변환 (순위 부여)WeeklyRankingWriterActivateVersionTaskletRankingVersionManager포인터 교체 + 구 버전 row DELETERankingServiceperiod에 따라 Redis ZSET / MV Repository 라우팅MvRankingRepositoryImplrankingASC 페이징 조회 후RankingEntry변환RankingScoreCalculator🔁 Flow Diagram
배치 실행 흐름 —
weeklyRankingJob(monthlyRankingJob도 대칭)API 조회 흐름 —
GET /api/v1/rankings?period=...mv_product_rank_weekly테이블을 api 쪽MvProductRankWeekly(읽기 전용, 축약 컬럼) 와 batch 쪽ProductRankWeekly(view/like/sale count + version 포함, 쓰기용) 가 각각 매핑합니다. 스키마 변경 시 두 곳을 동시에 반영해야 하므로 공유 모듈로의 추출이 후속 과제로 남아있습니다.RankingVersionManager가 in-memoryAtomicLong— 배치 JVM 이 재기동되면currentVersion = 0으로 리셋됩니다. 현 구현에서는 Activate Step 종료 시 구 버전을 DELETE 하므로 테이블엔 활성 버전 row 만 남고, api 는 전체를 읽어 우회되고 있습니다. 멀티 인스턴스 배치나 재기동 내성이 필요하면 DB 저장형 version 으로 승격이 필요해 보입니다.ProductMetricsDaily를 기간 전체로findByDateRange해 HashMap 에 합산한 뒤 Iterator 반환. 상품 수 × 일수 만큼 메모리 사용. Chunk 의 장점(스트리밍 처리)이 이 지점에서는 희석되는 면이 있습니다. 대규모에서는 SQL GROUP BY + Pageable Reader 로 전환이 정석이라고 보고 있습니다.mv_product_rank_weekly/monthly,product_metrics_daily테이블 DDL 이 Flyway/Liquibase 로 버저닝돼 있지 않습니다. 환경 배포 순서 정립이 필요합니다.@ConditionalOnProperty(spring.batch.job.name=...)+ CLI 파라미터로만 실행됩니다. 정기 스케줄은 외부 워크플로우(Airflow / K8s CronJob 등) 에 위임하는 것을 전제로 했습니다.✅ Checklist
🧱 Spring Batch
WeeklyRankingJobConfig,MonthlyRankingJobConfig에서@ConditionalOnProperty(spring.batch.job.name=...)로 Job 선택,requestDateJobParameter 로 집계 기준일 전달,RunIdIncrementer로 재실행 허용Aggregate Step 은 Chunk(chunkSize=100), Clear/Activate Step 은 Tasklet 으로 각 작업 성격에 맞는 패턴 조합
mv_product_rank_weekly/mv_product_rank_monthly에version컬럼 + 버전 스왑 전략으로 조회 무중단·멱등성·롤백 여지 확보🧩 Ranking API
단일 엔드포인트
GET /api/v1/rankings?period=DAILY|WEEKLY|MONTHLY— 일간은 Redis ZSET, 주간·월간은 MV 테이블.RankingService#getTopRankings의switch(period)로 라우팅🧪 검증
RankingScoreCalculatorTest,ProductMetricsDailyModelTest,ProductRankWeeklyModelTest(점수 공식·엔티티 유효성 검증)WeeklyRankingJobE2ETest,MonthlyRankingJobE2ETest(@SpringBatchTest, 실제 DB 집계 + 버전 스왑 검증)개인적으로 직접 구현해본 배치 적채 처리 비교 프로젝트 관련 내용을 검토받고 싶어 추가했습니다.
📌 Summary
🧭 Context & Decision
문제 정의
java -jar runner.jar --size=N한 줄로 전 레벨 벤치마크가 실행되고, TPS/메모리/소요시간 비교 테이블이 출력됩니다.선택지와 결정
BatchBenchmark인터페이스를 정의하고, Spring의List<BatchBenchmark>자동 주입으로 모든 구현체를 순차 실행합니다.🏗️ Design Overview
프로젝트 구조
핵심 인터페이스
모든 레벨이 이 인터페이스를 구현하며,
BenchmarkApplication이List<BatchBenchmark>를 주입받아 순차 실행합니다.주요 컴포넌트 책임
OrderDataGeneratorSchemaInitializerBenchmarkRunnerConsoleReporterHtmlReporter레벨별 기술 스택
addBatch/executeBatchgetGeneratedKeys()JdbcTemplate.batchUpdateBatchPreparedStatementSetter,MAX(id)방식EntityManager.persistflush()/clear()패턴, Cascade insert@Scheduled구조Job/StepchunkLAST_INSERT_ID()PartitionerSimpleAsyncTaskExecutor, gridSize=4@KafkaListener(batchListener=true),@Profile("with-kafka")ForkJoinPool병렬🔁 Flow Diagram
Main Flow
sequenceDiagram autonumber participant Runner as BenchmarkApplication participant Schema as SchemaInitializer participant Bench as BatchBenchmark (L0~L7) participant DB as MySQL 8.0 participant Reporter as Console/HtmlReporter Runner->>Schema: createTables() Runner->>Runner: generate(N) 테스트 데이터 생성 loop 각 벤치마크 레벨 Runner->>Schema: truncateTables() (DELETE + AUTO_INCREMENT) Runner->>Bench: runInsert(data) Bench->>DB: batch INSERT (orders + order_item) DB-->>Bench: 완료 Runner->>Bench: runAggregate() Bench->>DB: INSERT INTO daily_sales_summary SELECT ... GROUP BY DB-->>Bench: 완료 Bench-->>Runner: BenchmarkResult (시간, TPS, 메모리) end Runner->>Reporter: print(insertResults, aggregateResults) Reporter-->>Runner: 콘솔 ASCII 테이블 + HTML 차트 리포트Error Handling Flow
flowchart TD A[벤치마크 루프 시작] --> B{benchmark.runInsert} B -->|성공| C[runAggregate 실행] B -->|Exception| D["[Failed] 로그 출력 → 다음 레벨로"] C -->|성공| E["[Done] 결과 수집"] C -->|Exception| D D --> F[다음 벤치마크] E --> F F --> B📊 벤치마크 결과 (10,000건 Bulk Insert)
🧪 추가 실험 — 100,000건 3회 반복 측정
구현 완료 후 실제로 돌려보며 스케일업 시 성능 변화를 확인해 보았습니다. 100,000건을 3회 반복 실행하여 중앙값(median)을 측정했습니다.
추가 튜닝 환경
결과 (100K, median of 3)
스케일업 시 순위가 뒤집혔습니다
10K에서는 L7(ForkJoinPool)이 71,942 TPS로 1위였지만, 100K에서는 L4(Spring Batch Chunk)가 289,017 TPS로 역전하는 결과가 나왔습니다.
Spring Batch의 Job/Step 초기화 비용이 소규모(10K)에서는 전체 시간의 상당 부분을 차지하지만, 대규모(100K)에서는 상각되면서 chunk 처리의 효율이 드러난 것으로 보입니다. (이 해석이 맞는지 검토 부탁드립니다.)
안정성 기록
@TransactionalAOP proxy 미작용 추정, 원인 규명은 후속 과제)재현 방법
사전 준비
1. 프로젝트 클론 및 빌드
git clone https://github.com/Praesentia-YKM/batch-benchmark-java.git cd batch-benchmark-java ./gradlew :runner:bootJar2. 인프라 기동
3. 기본 실행 (10K, L0~L5 + L7)
4. Kafka 포함 실행 (L6 추가)
5. 대규모 테스트 (100K, MySQL 튜닝 적용)
6. 인프라 정리
💡 스스로 내려본 결론
트레이드 오프
이번 벤치마크에서 얻은 인상은 데이터 규모와 운영 요구사항에 따라 적정 기술이 달라진다는 점이었습니다. 스케일에 따라 최적 기술이 바뀔 수 있다는 걸 수치로 확인한 것이 가장 큰 수확이었습니다.
시나리오별 추천
결론
JPA는 대량 삽입에 신중히 쓰는 편이 나아 보입니다
"병렬 = 빠르다"는 항상 성립하지는 않는 듯합니다
스케일에 따라 적정 기술이 바뀔 수 있습니다
🔍 리뷰 포인트
1. 구조적으로는 허술하지만 운영 흐름상 문제 없는 설계를 해야할 때 트레이드오프를 지어내려가는 과정이 궁금합니다.
과제를 하면서
RankingVersionManager의 in-memoryAtomicLong— 어디까지 허용 가능한가 에 대한 트레이드 오프를 고민했습니다.활성 버전을 배치 JVM 의
AtomicLong로 관리합니다. 배치 재기동 시currentVersion = 0으로 휘발되지만, Activate Step 에서 구 버전을 DELETE 하므로 "테이블엔 활성 버전 row 만 남는다" 는 전제로 API 는 전체 조회로 우회하도록 설계했습니다.전제가 깨지는 시나리오
대안
ranking_versionDB 메타 테이블질문
과제를 하면서 "휘발성 in-memory + DELETE 로 테이블 정돈" 전략을 선택했는데 이러한 구현은 어쩌면 규모에 맞는 편의적 단순화라는 생각이 들었습니다. 멘토님께서는 실무에서 이런 "구조적으로는 허술하지만 운영 흐름상 문제 없는 설계" 를 해야할때 스스로 내리는 지표나 문서가 있는지 궁금합니다.
2. MV 테이블의 이중 엔티티 매핑을 진행하면 CQRS 분리로 적합한 분리라고 볼 수 있는지 아니면 과한 분리인지 판단 기준이 궁금합니다.
같은
mv_product_rank_weekly테이블을 두 앱이 각자 매핑합니다.MvProductRankWeeklyProductRankWeekly공유 모듈(
modules/jpa) 이전이 자연스러워 보임에도 안 한 이유현 방식의 비용
질문
이 분리가 CQRS 의 자연스러운 물리 표현으로 읽어야 할지, 아니면 공유 가능한 것을 중복 관리하는 기술 부채로 읽어야 할지 경계가 헷갈립니다. 실무에선 보통 어느 쪽으로 판단하시는지 궁금합니다.
3. (학습프로젝트에 대한 질문깃허브 링크) 배치 데이터 적재 성능 측정 방식의 테스트 방식 질문
현재 측정 방식 요약
BenchmarkApplication.javaList<BatchBenchmark>자동 주입 후 순차 실행42L1회 생성, 모든 레벨 공유DELETE FROM ...+ALTER TABLE ... AUTO_INCREMENT = 1SchemaInitializer.truncateTablesrun-100k-batch.sh-Xmx2g -XX:+UseG1GCtry { ... } catch (Exception e) { System.out.printf("[Failed]") }Runtime.totalMemory() - freeMemory()BenchmarkRunner<테스트 설계 관련 질문>
멘토님이시라면 이런 상황에서 아래 세 가지를 어떻게 가져가실지 거 같으신가요?
<기술 적합성 판단 방법론 관련 질문>
저는 직접 해보지 않으면 이해가 잘 안 되는 타입이라, 기술 검토 시 항상 실제 구현 테스트를 해보는 편인데요. 관련해서 궁금증이 생겼습니다. 비교군 테스트를 어떻게 해보시는 편이신가요?
4. 제가 테스트한 결과에 대한 결론을 작성해보았습니다. 각 기술의 강점 + 트레이드오프에 대한 견해에 대한 검토를 받아보고 싶습니다!
강점·대가·적정 사용처 매트릭스
변경 목적
일간 랭킹(실시간 Redis ZSET)과 달리, 주간·월간 랭킹을 Spring Batch로 사전 계산하여 DB 기반 Materialized View에 적재하고, 버전 스왑 전략으로 조회 무중단을 보장. 동시에 이벤트 기반 아키텍처(ApplicationEvent → Outbox → Kafka)와 Redis 기반 큐 시스템을 확대 구현하여 주문 흐름 보호 및 메시지 전달 보장.
핵심 변경점
리스크/주의사항
테스트/검증 방법