perf(rdf): shard keyed-write ordering lock to cut submission contention#29679
perf(rdf): shard keyed-write ordering lock to cut submission contention#29679harshach wants to merge 1 commit into
Conversation
RdfUpdater.submitKeyedAsync chained per-entity RDF writes under a single global monitor over keyedWriteTails, so every entity CRUD / relationship write serialized its (brief) submission bookkeeping even for completely unrelated entities. Replace the global monitor with a bounded Guava Striped<Lock> (64 stripes): writes whose keys fall on disjoint stripes no longer contend, while writes sharing a key still serialize to preserve per-entity ordering. bulkGet acquires the stripes in canonical order so concurrent multi-key (from+to) relationship writes cannot deadlock. Cleanup runs lock-free — the compare-and-remove only clears a tail this write installed and only after it has completed, so ordering is preserved without a lock. Adds a high-contention deadlock-freedom stress test; the existing same-key ordering test still passes. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
❌ PR checklist incompleteThis PR cannot be merged until the following are addressed on its linked issue:
The fields live on the linked issue in the Shipping project (open the issue → right sidebar → Projects). After you set them, re-run this check (or push a commit) — issue/project changes do not re-trigger it automatically. Maintainers can bypass this check by adding the |
There was a problem hiding this comment.
Pull request overview
This PR reduces contention in RDF write submission by replacing the single global monitor used for keyed write-chaining with a bounded set of striped locks, preserving per-entity ordering while allowing unrelated entities’ submissions to proceed concurrently. It also adds a concurrency stress test intended to catch deadlock regressions in multi-key (relationship) submissions.
Changes:
- Replace global
synchronized (keyedWriteTails)with GuavaStriped<Lock>(64 stripes) to shard keyed-write tail chaining. - Make tail cleanup lock-free via
ConcurrentHashMap.remove(key, expectedValue)compare-and-remove semantics. - Add a high-contention multi-key concurrency test to validate deadlock-free lock acquisition ordering.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| openmetadata-service/src/main/java/org/openmetadata/service/rdf/RdfUpdater.java | Shards keyed-write submission locking via Striped<Lock> and uses lock-free tail cleanup to reduce submission contention while preserving ordering guarantees. |
| openmetadata-service/src/test/java/org/openmetadata/service/rdf/RdfUpdaterTest.java | Adds a concurrency stress test for overlapping multi-key relationship writes to detect deadlock regressions. |
| ExecutorService submitters = Executors.newFixedThreadPool(12); | ||
| try { | ||
| for (int i = 0; i < submissions; i++) { | ||
| UUID from = ids.get(i % poolSize); | ||
| UUID to = ids.get((i * 5 + 1) % poolSize); | ||
| EntityRelationship rel = | ||
| new EntityRelationship() | ||
| .withFromId(from) | ||
| .withToId(to) | ||
| .withFromEntity(Entity.TABLE) | ||
| .withToEntity(Entity.TABLE) | ||
| .withRelationshipType(Relationship.CONTAINS); | ||
| submitters.submit(() -> RdfUpdater.addRelationship(rel)); | ||
| } | ||
| assertTrue( | ||
| completed.await(30, TimeUnit.SECONDS), | ||
| "all writes should drain — striped-lock acquisition must be deadlock-free"); | ||
| } finally { | ||
| submitters.shutdownNow(); | ||
| } |
| @Test | ||
| @DisplayName("concurrent overlapping multi-key writes drain without deadlock") | ||
| void concurrentMultiKeyWritesDoNotDeadlock() throws Exception { | ||
| int poolSize = 6; | ||
| List<UUID> ids = new ArrayList<>(); | ||
| for (int i = 0; i < poolSize; i++) { | ||
| ids.add(UUID.randomUUID()); | ||
| } | ||
|
|
||
| int submissions = 300; | ||
| CountDownLatch completed = new CountDownLatch(submissions); | ||
| doAnswer( | ||
| ignored -> { | ||
| completed.countDown(); | ||
| return null; |
There was a problem hiding this comment.
💡 Edge Case: Concurrency test asserts only deadlock-freedom, not ordering
The riskiest behavioral change in this PR is moving the tail-cleanup (keyedWriteTails.remove(key, next)) out from under the lock and making it a lock-free compare-and-remove. The new test concurrentMultiKeyWritesDoNotDeadlock only verifies that all 300 writes eventually drain (deadlock-freedom); it does not assert that per-entity submission ordering is still preserved when writes race with the lock-free cleanup. The existing writesForSameEntityIdAreSequenced test exercises ordering but only for two sequential submissions with no concurrent cleanup contention, so the specific interleaving this refactor introduces (a completing write removing its tail while another write on the same key is chaining) is not covered.
Consider adding an ordering assertion under contention: submit many writes that all share a single key from multiple threads and record the order in which the mocked repository method executes, asserting it matches submission order (or at least that no write runs concurrently with another for the same key). This turns the change's key invariant into a regression guard rather than relying on the deadlock timeout alone.
Was this helpful? React with 👍 / 👎
Code Review 👍 Approved with suggestions 0 resolved / 2 findingsReplaces the global monitor in 💡 Edge Case: Concurrency test asserts only deadlock-freedom, not ordering📄 openmetadata-service/src/test/java/org/openmetadata/service/rdf/RdfUpdaterTest.java:237-251 The riskiest behavioral change in this PR is moving the tail-cleanup ( Consider adding an ordering assertion under contention: submit many writes that all share a single key from multiple threads and record the order in which the mocked repository method executes, asserting it matches submission order (or at least that no write runs concurrently with another for the same key). This turns the change's key invariant into a regression guard rather than relying on the deadlock timeout alone. 💡 Quality: Shared static state not reset between concurrency tests📄 openmetadata-service/src/test/java/org/openmetadata/service/rdf/RdfUpdaterTest.java:239-253
The risk is low today (300 << 1000), but it makes the test order-dependent. Consider resetting the static counters/map via reflection in 🤖 Prompt for agentsOptionsDisplay: compact → Showing less information. Comment with these commands to change the behavior for this request:
Was this helpful? React with 👍 / 👎 | Gitar |
| void concurrentMultiKeyWritesDoNotDeadlock() throws Exception { | ||
| int poolSize = 6; | ||
| List<UUID> ids = new ArrayList<>(); | ||
| for (int i = 0; i < poolSize; i++) { | ||
| ids.add(UUID.randomUUID()); | ||
| } | ||
|
|
||
| int submissions = 300; | ||
| CountDownLatch completed = new CountDownLatch(submissions); | ||
| doAnswer( | ||
| ignored -> { | ||
| completed.countDown(); | ||
| return null; | ||
| }) | ||
| .when(mockRepository) |
There was a problem hiding this comment.
💡 Quality: Shared static state not reset between concurrency tests
RdfUpdater keeps process-wide static state (pendingWrites counter and the keyedWriteTails map) that is never reset in setUp/tearDown. The new test submits 300 writes and depends on none being dropped by the MAX_PENDING_RDF_WRITES = 1000 backpressure gate, and on the completed latch reaching zero. If a prior test in the suite leaves writes pending (e.g., submitted but not fully drained before its method returns), pendingWrites carries over and could, in principle, cause backpressure drops here — a dropped write never invokes mockRepository.addRelationship, so the latch would never reach zero and the test would fail via the 30s timeout rather than a clear assertion.
The risk is low today (300 << 1000), but it makes the test order-dependent. Consider resetting the static counters/map via reflection in setUp (as is already done for rdfRepository) so each test starts from a known state, and/or draining pending writes in tearDown.
Was this helpful? React with 👍 / 👎
| List<Lock> heldLocks = lockWriteStripes(writeKeys); | ||
| CompletableFuture<Void> next; | ||
| synchronized (keyedWriteTails) { | ||
| try { |
There was a problem hiding this comment.
Lock acquisition in
lockWriteStripes happens before the try-finally block that calls unlockWriteStripes. If an unexpected error were thrown mid-acquisition (e.g., lock.lock() raising a ThreadDeath or the ArrayList allocation failing with OOM), any locks already acquired would never be released, permanently deadlocking every future write whose key shares one of those stripes. Moving the acquisition inside the guard closes this window at no cost.
| List<Lock> heldLocks = lockWriteStripes(writeKeys); | |
| CompletableFuture<Void> next; | |
| synchronized (keyedWriteTails) { | |
| try { | |
| List<Lock> heldLocks = new ArrayList<>(); | |
| CompletableFuture<Void> next; | |
| try { | |
| for (Lock lock : writeLocks.bulkGet(writeKeys)) { | |
| lock.lock(); | |
| heldLocks.add(lock); | |
| } |
| } finally { | ||
| submitters.shutdownNow(); | ||
| } |
There was a problem hiding this comment.
shutdownNow() interrupts worker threads, but ReentrantLock.lock() is not interruptible — submission threads blocked waiting for a stripe lock will acquire it and continue executing after shutdownNow() returns. Those threads will then call RdfUpdater.addRelationship, which queues tasks onto AsyncService; by that point @AfterEach may have already restored or nulled rdfRepository, causing NPEs or cross-test mock interactions. Adding awaitTermination gives those threads time to drain before the test exits.
| } finally { | |
| submitters.shutdownNow(); | |
| } | |
| } finally { | |
| submitters.shutdownNow(); | |
| submitters.awaitTermination(5, TimeUnit.SECONDS); | |
| } |
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
|
🔴 Playwright Results — 2 failure(s), 46 flaky✅ 4438 passed · ❌ 2 failed · 🟡 46 flaky · ⏭️ 37 skipped
Genuine Failures (failed on all attempts)❌
|



Describe your changes:
RdfUpdater.submitKeyedAsyncchains per-entity RDF writes so writes touching the same entity id run in submission order, but it did the tail lookup/chaining under a single global monitor (synchronized (keyedWriteTails)). Since every RDF write path supplies a write key, all entity CRUD / relationship submissions serialized their (short) bookkeeping section even for completely unrelated entities. This replaces the global monitor with a bounded GuavaStriped<Lock>(64 stripes): writes whose keys land on disjoint stripes no longer contend, while writes sharing a key still serialize so per-entity ordering is preserved.bulkGetreturns the stripe locks in a canonical order, so concurrent multi-key (from+to) relationship writes cannot deadlock; cleanup runs lock-free because the compare-and-remove only clears a tail this write installed, and only after it has completed. Follow-up to the keyed-write model added in #29368.Type of change:
High-level design:
The invariant is unchanged:
keyedWriteTails[key]holds the most recent write future touchingkey; a new write chains afterallOfof its keys' current tails and becomes their new tail. Only the concurrency control around that read-modify-write changed — from one global monitor to per-stripe locks acquired inbulkGetorder (deadlock-free for the ≤2-key relationship case). The actual Fuseki write already ran off-lock onAsyncService, so this only widens the submission path, not execution.Tests:
Unit tests
RdfUpdaterTest$KeyedWriteConcurrency— 300 overlapping multi-key writes across 12 threads must all drain; a broken lock order would hang and trip the timeout).writesForSameEntityIdAreSequenced) still passes.RdfUpdaterTesttests pass;mvn spotless:checkclean.Backend integration tests
Playwright (UI) tests
UI screen recording / screenshots:
Not applicable.
Checklist:
Fixes #<issue-number>above.🤖 Generated with Claude Code
Greptile Summary
This PR narrows submission-path contention in
RdfUpdater.submitKeyedAsyncby replacing a single globalsynchronized (keyedWriteTails)monitor with a 64-stripeGuava Striped<Lock>. Writes whose keys land on different stripes now proceed in parallel, while writes sharing a key still serialize in submission order; cleanup is made lock-free by relying onConcurrentHashMap.remove(key, next)as a conditional compare-and-remove.lockWriteStripesdelegates key→stripe mapping and canonical ordering toGuava.bulkGet, ensuring multi-key submissions always acquire stripes in the same ascending-index order and cannot deadlock; all tail map updates complete before any lock is released.whenCompleteremoves each key's tail without holding a lock — safe becauseConcurrentHashMap.remove(key, value)is atomic and only clears the entry if the stored value is still the future this write installed.Confidence Score: 4/5
Safe to merge; the concurrency model is correct and well-tested. Two small hardening gaps — one in production code, one in test cleanup — are worth addressing but do not affect the happy path.
The lock acquisition for
lockWriteStripessits outside thetry-finallyguard, leaving a theoretical window where a partially-acquired lock set could never be released if an error fires betweenlock()calls. The test'sshutdownNow()withoutawaitTerminationcan leave submission threads alive past@AfterEach, causing spurious side-effects in subsequent tests under slow CI.Both changed files warrant a second look:
RdfUpdater.javafor the lock-acquisition placement, andRdfUpdaterTest.javafor the executor shutdown pattern.Important Files Changed
Sequence Diagram
%%{init: {'theme': 'neutral'}}%% sequenceDiagram participant T1 as Thread T1 (W1: K1+K2) participant T2 as Thread T2 (W2: K1) participant SL as Striped<Lock> participant KWT as keyedWriteTails (CHM) participant AS as AsyncService T1->>SL: "bulkGet({K1,K2}) canonical order" T1->>SL: lock(S1) T1->>SL: lock(S2) T2->>SL: lock(S1) BLOCKS T1->>KWT: getOrDefault(K1,K2) prev tails T1->>AS: thenRunAsync(task1) futureW1 T1->>KWT: put(K1,futureW1) put(K2,futureW1) T1->>SL: unlock(S2) unlock(S1) T2->>SL: lock(S1) acquired T2->>KWT: getOrDefault(K1) futureW1 T2->>AS: thenRunAsync(task2) futureW2 T2->>KWT: put(K1,futureW2) T2->>SL: unlock(S1) AS-->>KWT: futureW1 completes remove(K1,futureW1) no-op AS-->>KWT: futureW2 completes remove(K1,futureW2) succeeds%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%% sequenceDiagram participant T1 as Thread T1 (W1: K1+K2) participant T2 as Thread T2 (W2: K1) participant SL as Striped<Lock> participant KWT as keyedWriteTails (CHM) participant AS as AsyncService T1->>SL: "bulkGet({K1,K2}) canonical order" T1->>SL: lock(S1) T1->>SL: lock(S2) T2->>SL: lock(S1) BLOCKS T1->>KWT: getOrDefault(K1,K2) prev tails T1->>AS: thenRunAsync(task1) futureW1 T1->>KWT: put(K1,futureW1) put(K2,futureW1) T1->>SL: unlock(S2) unlock(S1) T2->>SL: lock(S1) acquired T2->>KWT: getOrDefault(K1) futureW1 T2->>AS: thenRunAsync(task2) futureW2 T2->>KWT: put(K1,futureW2) T2->>SL: unlock(S1) AS-->>KWT: futureW1 completes remove(K1,futureW1) no-op AS-->>KWT: futureW2 completes remove(K1,futureW2) succeedsReviews (1): Last reviewed commit: "perf(rdf): shard keyed-write ordering lo..." | Re-trigger Greptile