From cac2679d745894965b1f404a0539e26ce3d4aa0b Mon Sep 17 00:00:00 2001 From: Prem Palanisamy Date: Wed, 15 Apr 2026 07:25:07 +0100 Subject: [PATCH] Add skipDuplicates() scope guard to createDocuments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces a RAII-style scope guard that lets callers opt into silent duplicate handling for batch inserts. Duplicate-key errors during the wrapped call are silenced at the adapter layer via dialect-specific no-op inserts; everything outside the guarded call behaves normally. Usage: $db->skipDuplicates(fn () => $db->createDocuments($collection, $docs)); Adapter dialects: - MariaDB/MySQL: INSERT IGNORE INTO - Postgres: INSERT INTO ... ON CONFLICT (...) DO NOTHING - SQLite: INSERT OR IGNORE INTO - Mongo: upsert + \$setOnInsert, bypassing the transaction wrap to avoid txn-abort-on-duplicate Hot-path cost: one property read + one boolean branch per chunk. Callers that don't use skipDuplicates pay no closure allocation and no scope setup. Mirror::createDocuments: the skipDuplicates path re-fetches source's authoritative state via find() after the adapter insert settles, then forwards that to destination instead of the caller's input. This is race-free because the query runs after source's write has resolved — regardless of whether rows were inserted, skipped, or raced with concurrent writers, destination always receives a faithful snapshot of source. Fixes Greptile #3084293974: captureOnNext-based approach forwarded would-be values for source-skipped duplicates, diverging destination from source. upsertDocumentsWithIncrease: restore per-tenant grouping for the existing-doc lookup when running in sharedTables + tenantPerDocument mode. The previously-inlined find() ran under the session tenant context (null for platform workers) and silently missed rows from other tenants, which broke the StatsUsage worker flushing stats across projects. Per-tenant grouping uses K queries (K = unique tenants in the batch) instead of N (per-doc) or 1 (broken). The common single-tenant path still hits the fast batched code. Tests cover every adapter: intra-batch duplicates, cross-batch duplicates, nested scope, large batches, relationships, Mirror dual-write with authoritative state forwarding. --- src/Database/Adapter.php | 23 ++ src/Database/Adapter/Mongo.php | 41 +++ src/Database/Adapter/Pool.php | 15 + src/Database/Adapter/Postgres.php | 29 ++ src/Database/Adapter/SQL.php | 36 +- src/Database/Adapter/SQLite.php | 5 + src/Database/Database.php | 90 ++++- src/Database/Mirror.php | 85 +++++ tests/e2e/Adapter/MirrorTest.php | 77 ++++ tests/e2e/Adapter/Scopes/DocumentTests.php | 405 +++++++++++++++++++++ 10 files changed, 789 insertions(+), 17 deletions(-) diff --git a/src/Database/Adapter.php b/src/Database/Adapter.php index a7b385cce..1678024ee 100644 --- a/src/Database/Adapter.php +++ b/src/Database/Adapter.php @@ -33,6 +33,8 @@ abstract class Adapter protected bool $alterLocks = false; + protected bool $skipDuplicates = false; + /** * @var array */ @@ -392,6 +394,27 @@ public function inTransaction(): bool return $this->inTransaction > 0; } + /** + * Run a callback with skipDuplicates enabled. + * Duplicate key errors during createDocuments() will be silently skipped + * instead of thrown. Nestable — saves and restores previous state. + * + * @template T + * @param callable(): T $callback + * @return T + */ + public function skipDuplicates(callable $callback): mixed + { + $previous = $this->skipDuplicates; + $this->skipDuplicates = true; + + try { + return $callback(); + } finally { + $this->skipDuplicates = $previous; + } + } + /** * @template T * @param callable(): T $callback diff --git a/src/Database/Adapter/Mongo.php b/src/Database/Adapter/Mongo.php index b654b436e..311b60476 100644 --- a/src/Database/Adapter/Mongo.php +++ b/src/Database/Adapter/Mongo.php @@ -122,6 +122,11 @@ public function withTransaction(callable $callback): mixed return $callback(); } + // upsert + $setOnInsert hits WriteConflict (E112) under txn snapshot isolation. + if ($this->skipDuplicates) { + return $callback(); + } + try { $this->startTransaction(); $result = $callback(); @@ -1492,6 +1497,42 @@ public function createDocuments(Document $collection, array $documents): array $records[] = $record; } + // insertMany aborts the txn on any duplicate; upsert + $setOnInsert no-ops instead. + if ($this->skipDuplicates) { + if (empty($records)) { + return []; + } + + $operations = []; + foreach ($records as $record) { + $filter = ['_uid' => $record['_uid'] ?? '']; + if ($this->sharedTables) { + $filter['_tenant'] = $record['_tenant'] ?? $this->getTenant(); + } + + // Filter fields can't reappear in $setOnInsert (mongo path-conflict error). + $setOnInsert = $record; + unset($setOnInsert['_uid'], $setOnInsert['_tenant']); + + if (empty($setOnInsert)) { + continue; + } + + $operations[] = [ + 'filter' => $filter, + 'update' => ['$setOnInsert' => $setOnInsert], + ]; + } + + try { + $this->client->upsert($name, $operations, $options); + } catch (MongoException $e) { + throw $this->processException($e); + } + + return $documents; + } + try { $documents = $this->client->insertMany($name, $records, $options); } catch (MongoException $e) { diff --git a/src/Database/Adapter/Pool.php b/src/Database/Adapter/Pool.php index 668753387..7bbfb98f2 100644 --- a/src/Database/Adapter/Pool.php +++ b/src/Database/Adapter/Pool.php @@ -43,6 +43,11 @@ public function __construct(UtopiaPool $pool) public function delegate(string $method, array $args): mixed { if ($this->pinnedAdapter !== null) { + if ($this->skipDuplicates) { + return $this->pinnedAdapter->skipDuplicates( + fn () => $this->pinnedAdapter->{$method}(...$args) + ); + } return $this->pinnedAdapter->{$method}(...$args); } @@ -66,6 +71,11 @@ public function delegate(string $method, array $args): mixed $adapter->setMetadata($key, $value); } + if ($this->skipDuplicates) { + return $adapter->skipDuplicates( + fn () => $adapter->{$method}(...$args) + ); + } return $adapter->{$method}(...$args); }); } @@ -146,6 +156,11 @@ public function withTransaction(callable $callback): mixed $this->pinnedAdapter = $adapter; try { + if ($this->skipDuplicates) { + return $adapter->skipDuplicates( + fn () => $adapter->withTransaction($callback) + ); + } return $adapter->withTransaction($callback); } finally { $this->pinnedAdapter = null; diff --git a/src/Database/Adapter/Postgres.php b/src/Database/Adapter/Postgres.php index 8dcf72025..2c27e08e7 100644 --- a/src/Database/Adapter/Postgres.php +++ b/src/Database/Adapter/Postgres.php @@ -2350,6 +2350,35 @@ public function getSupportForOptionalSpatialAttributeWithExistingRows(): bool return false; } + protected function getInsertKeyword(): string + { + return 'INSERT INTO'; + } + + protected function getInsertSuffix(string $table): string + { + if (!$this->skipDuplicates) { + return ''; + } + + $conflictTarget = $this->sharedTables ? '("_uid", "_tenant")' : '("_uid")'; + + return "ON CONFLICT {$conflictTarget} DO NOTHING"; + } + + protected function getInsertPermissionsSuffix(): string + { + if (!$this->skipDuplicates) { + return ''; + } + + $conflictTarget = $this->sharedTables + ? '("_type", "_permission", "_document", "_tenant")' + : '("_type", "_permission", "_document")'; + + return "ON CONFLICT {$conflictTarget} DO NOTHING"; + } + public function decodePoint(string $wkb): array { if (str_starts_with(strtoupper($wkb), 'POINT(')) { diff --git a/src/Database/Adapter/SQL.php b/src/Database/Adapter/SQL.php index 6864e6aee..3fe2696db 100644 --- a/src/Database/Adapter/SQL.php +++ b/src/Database/Adapter/SQL.php @@ -1029,6 +1029,33 @@ public function getSupportForHostname(): bool return true; } + /** + * Returns the INSERT keyword, optionally with IGNORE for duplicate handling. + * Override in adapter subclasses for DB-specific syntax. + */ + protected function getInsertKeyword(): string + { + return $this->skipDuplicates ? 'INSERT IGNORE INTO' : 'INSERT INTO'; + } + + /** + * Returns a suffix appended after VALUES clause for duplicate handling. + * Override in adapter subclasses (e.g., Postgres uses ON CONFLICT DO NOTHING). + */ + protected function getInsertSuffix(string $table): string + { + return ''; + } + + /** + * Returns a suffix for the permissions INSERT statement when ignoring duplicates. + * Override in adapter subclasses for DB-specific syntax. + */ + protected function getInsertPermissionsSuffix(): string + { + return ''; + } + /** * Get current attribute count from collection document * @@ -2476,6 +2503,7 @@ public function createDocuments(Document $collection, array $documents): array if (empty($documents)) { return $documents; } + $spatialAttributes = $this->getSpatialAttributes($collection); $collection = $collection->getId(); try { @@ -2573,8 +2601,9 @@ public function createDocuments(Document $collection, array $documents): array $batchKeys = \implode(', ', $batchKeys); $stmt = $this->getPDO()->prepare(" - INSERT INTO {$this->getSQLTable($name)} {$columns} + {$this->getInsertKeyword()} {$this->getSQLTable($name)} {$columns} VALUES {$batchKeys} + {$this->getInsertSuffix($name)} "); foreach ($bindValues as $key => $value) { @@ -2588,8 +2617,9 @@ public function createDocuments(Document $collection, array $documents): array $permissions = \implode(', ', $permissions); $sqlPermissions = " - INSERT INTO {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn}) - VALUES {$permissions}; + {$this->getInsertKeyword()} {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn}) + VALUES {$permissions} + {$this->getInsertPermissionsSuffix()} "; $stmtPermissions = $this->getPDO()->prepare($sqlPermissions); diff --git a/src/Database/Adapter/SQLite.php b/src/Database/Adapter/SQLite.php index 3c25987eb..33f370775 100644 --- a/src/Database/Adapter/SQLite.php +++ b/src/Database/Adapter/SQLite.php @@ -1936,4 +1936,9 @@ public function getSupportForTTLIndexes(): bool { return false; } + + protected function getInsertKeyword(): string + { + return $this->skipDuplicates ? 'INSERT OR IGNORE INTO' : 'INSERT INTO'; + } } diff --git a/src/Database/Database.php b/src/Database/Database.php index bae99ae79..62681793d 100644 --- a/src/Database/Database.php +++ b/src/Database/Database.php @@ -417,6 +417,8 @@ class Database protected bool $preserveDates = false; + protected bool $skipDuplicates = false; + protected bool $preserveSequence = false; protected int $maxQueryValues = 5000; @@ -842,6 +844,29 @@ public function skipRelationshipsExistCheck(callable $callback): mixed } } + public function skipDuplicates(callable $callback): mixed + { + $previous = $this->skipDuplicates; + $this->skipDuplicates = true; + + try { + return $callback(); + } finally { + $this->skipDuplicates = $previous; + } + } + + /** + * Build a tenant-aware identity key for a document. + * Returns ":" in tenant-per-document shared-table mode, otherwise just the id. + */ + private function tenantKey(Document $document): string + { + return ($this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument()) + ? $document->getTenant() . ':' . $document->getId() + : $document->getId(); + } + /** * Trigger callback for events * @@ -5700,9 +5725,11 @@ public function createDocuments( } foreach (\array_chunk($documents, $batchSize) as $chunk) { - $batch = $this->withTransaction(function () use ($collection, $chunk) { - return $this->adapter->createDocuments($collection, $chunk); - }); + $insert = fn () => $this->withTransaction(fn () => $this->adapter->createDocuments($collection, $chunk)); + // Set adapter flag before withTransaction so Mongo can opt out of a real txn. + $batch = $this->skipDuplicates + ? $this->adapter->skipDuplicates($insert) + : $insert(); $batch = $this->adapter->getSequences($collection->getId(), $batch); @@ -7116,18 +7143,53 @@ public function upsertDocumentsWithIncrease( $created = 0; $updated = 0; $seenIds = []; - foreach ($documents as $key => $document) { - if ($this->getSharedTables() && $this->getTenantPerDocument()) { - $old = $this->authorization->skip(fn () => $this->withTenant($document->getTenant(), fn () => $this->silent(fn () => $this->getDocument( - $collection->getId(), - $document->getId(), - )))); - } else { - $old = $this->authorization->skip(fn () => $this->silent(fn () => $this->getDocument( - $collection->getId(), - $document->getId(), + + // Batch-fetch existing documents in one query instead of N individual getDocument() calls. + // tenantPerDocument: group ids by tenant and run one find() per tenant under withTenant, + // so cross-tenant batches (e.g. StatsUsage worker) don't get silently scoped to the + // session tenant and miss rows belonging to other tenants. + $existingDocs = []; + + if ($this->getSharedTables() && $this->getTenantPerDocument()) { + $idsByTenant = []; + foreach ($documents as $doc) { + if ($doc->getId() !== '') { + $idsByTenant[$doc->getTenant()][] = $doc->getId(); + } + } + foreach ($idsByTenant as $tenant => $tenantIds) { + $tenantIds = \array_values(\array_unique($tenantIds)); + $found = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent( + fn () => $this->find($collection->getId(), [ + Query::equal('$id', $tenantIds), + Query::limit(\count($tenantIds)), + ]) ))); + foreach ($found as $doc) { + $existingDocs[$tenant . ':' . $doc->getId()] = $doc; + } + } + } else { + $docIds = \array_values(\array_unique(\array_filter( + \array_map(fn (Document $doc) => $doc->getId(), $documents), + fn ($id) => $id !== '' + ))); + + if (!empty($docIds)) { + $existing = $this->authorization->skip(fn () => $this->silent( + fn () => $this->find($collection->getId(), [ + Query::equal('$id', $docIds), + Query::limit(\count($docIds)), + ]) + )); + foreach ($existing as $doc) { + $existingDocs[$this->tenantKey($doc)] = $doc; + } } + } + + foreach ($documents as $key => $document) { + $old = $existingDocs[$this->tenantKey($document)] ?? new Document(); // Extract operators early to avoid comparison issues $documentArray = $document->getArrayCopy(); @@ -7294,7 +7356,7 @@ public function upsertDocumentsWithIncrease( $document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document)); } - $seenIds[] = $document->getId(); + $seenIds[] = $this->tenantKey($document); $old = $this->adapter->castingBefore($collection, $old); $document = $this->adapter->castingBefore($collection, $document); diff --git a/src/Database/Mirror.php b/src/Database/Mirror.php index f740cab3e..6f043accf 100644 --- a/src/Database/Mirror.php +++ b/src/Database/Mirror.php @@ -601,6 +601,91 @@ public function createDocuments( ?callable $onNext = null, ?callable $onError = null, ): int { + if ($this->skipDuplicates) { + $modified = $this->source->skipDuplicates( + fn () => $this->source->createDocuments( + $collection, + $documents, + $batchSize, + $onNext, + $onError, + ) + ); + + if ( + \in_array($collection, self::SOURCE_ONLY_COLLECTIONS) + || $this->destination === null + ) { + return $modified; + } + + $upgrade = $this->silent(fn () => $this->getUpgradeStatus($collection)); + if ($upgrade === null || $upgrade->getAttribute('status', '') !== 'upgraded') { + return $modified; + } + + try { + // Adapter-level INSERT IGNORE does not report per-row outcomes, so + // forwarding the caller's input would diverge on source-skipped duplicates. + // Re-fetch source's authoritative state after its write settles and + // forward that — race-free regardless of concurrent writers. + $ids = \array_values(\array_filter( + \array_map(fn (Document $d) => $d->getId(), $documents), + fn ($id) => $id !== '' + )); + + if (empty($ids)) { + return $modified; + } + + $authoritative = $this->source->silent( + fn () => $this->source->find($collection, [ + Query::equal('$id', $ids), + Query::limit(\count($ids)), + ]) + ); + + $clones = []; + foreach ($authoritative as $document) { + $clone = clone $document; + foreach ($this->writeFilters as $filter) { + $clone = $filter->beforeCreateDocument( + source: $this->source, + destination: $this->destination, + collectionId: $collection, + document: $clone, + ); + } + $clones[] = $clone; + } + + $this->destination->skipDuplicates( + fn () => $this->destination->withPreserveDates( + fn () => $this->destination->createDocuments( + $collection, + $clones, + $batchSize, + ) + ) + ); + + foreach ($clones as $clone) { + foreach ($this->writeFilters as $filter) { + $filter->afterCreateDocument( + source: $this->source, + destination: $this->destination, + collectionId: $collection, + document: $clone, + ); + } + } + } catch (\Throwable $err) { + $this->logError('createDocuments', $err); + } + + return $modified; + } + $modified = $this->source->createDocuments( $collection, $documents, diff --git a/tests/e2e/Adapter/MirrorTest.php b/tests/e2e/Adapter/MirrorTest.php index 31bf3f3b6..f4efe1b29 100644 --- a/tests/e2e/Adapter/MirrorTest.php +++ b/tests/e2e/Adapter/MirrorTest.php @@ -313,6 +313,83 @@ public function testDeleteMirroredDocument(): void $this->assertTrue($database->getDestination()->getDocument('testDeleteMirroredDocument', $document->getId())->isEmpty()); } + public function testCreateDocumentsSkipDuplicatesDoesNotDivergeDestination(): void + { + $database = $this->getDatabase(); + $collection = 'mirrorSkipDup'; + + $database->createCollection($collection, attributes: [ + new Document([ + '$id' => 'name', + 'type' => Database::VAR_STRING, + 'required' => true, + 'size' => Database::LENGTH_KEY, + ]), + ], permissions: [ + Permission::create(Role::any()), + Permission::read(Role::any()), + ], documentSecurity: false); + + // Seed the SOURCE only (bypass the mirror) with the row we want to + // skipDuplicates over later. Destination intentionally does NOT have it. + $database->getSource()->createDocument($collection, new Document([ + '$id' => 'dup', + 'name' => 'Original', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ])); + + // Sanity check setup + $this->assertSame( + 'Original', + $database->getSource()->getDocument($collection, 'dup')->getAttribute('name') + ); + $this->assertTrue( + $database->getDestination()->getDocument($collection, 'dup')->isEmpty() + ); + + $database->skipDuplicates(fn () => $database->createDocuments($collection, [ + new Document([ + '$id' => 'dup', + 'name' => 'WouldBe', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + new Document([ + '$id' => 'fresh', + 'name' => 'Fresh', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ])); + + $this->assertSame( + 'Original', + $database->getSource()->getDocument($collection, 'dup')->getAttribute('name') + ); + $this->assertSame( + 'Fresh', + $database->getSource()->getDocument($collection, 'fresh')->getAttribute('name') + ); + + // Destination must hold source's authoritative value, not the WouldBe input. + $this->assertSame( + 'Original', + $database->getDestination()->getDocument($collection, 'dup')->getAttribute('name'), + 'Destination must reflect source authoritative state, not caller input' + ); + $this->assertSame( + 'Fresh', + $database->getDestination()->getDocument($collection, 'fresh')->getAttribute('name') + ); + } + protected function deleteColumn(string $collection, string $column): bool { $sqlTable = "`" . self::$source->getDatabase() . "`.`" . self::$source->getNamespace() . "_" . $collection . "`"; diff --git a/tests/e2e/Adapter/Scopes/DocumentTests.php b/tests/e2e/Adapter/Scopes/DocumentTests.php index 49d75e4b6..c45610c28 100644 --- a/tests/e2e/Adapter/Scopes/DocumentTests.php +++ b/tests/e2e/Adapter/Scopes/DocumentTests.php @@ -7855,4 +7855,409 @@ public function testRegexInjection(): void // } // $database->deleteCollection($collectionName); // } + + public function testCreateDocumentsIgnoreDuplicates(): void + { + /** @var Database $database */ + $database = $this->getDatabase(); + + $database->createCollection(__FUNCTION__); + $database->createAttribute(__FUNCTION__, 'name', Database::VAR_STRING, 128, true); + + // Insert initial documents + $database->createDocuments(__FUNCTION__, [ + new Document([ + '$id' => 'doc1', + 'name' => 'Original A', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + new Document([ + '$id' => 'doc2', + 'name' => 'Original B', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ]); + + // Without ignore, duplicates should throw + try { + $database->createDocuments(__FUNCTION__, [ + new Document([ + '$id' => 'doc1', + 'name' => 'Duplicate A', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ]); + $this->fail('Expected DuplicateException'); + } catch (DuplicateException $e) { + $this->assertNotEmpty($e->getMessage()); + } + + // With skipDuplicates, duplicates should be silently skipped + $emittedIds = []; + $collection = __FUNCTION__; + $count = $database->skipDuplicates(function () use ($database, $collection, &$emittedIds) { + return $database->createDocuments($collection, [ + new Document([ + '$id' => 'doc1', + 'name' => 'Duplicate A', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + new Document([ + '$id' => 'doc3', + 'name' => 'New C', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ], onNext: function (Document $doc) use (&$emittedIds) { + $emittedIds[] = $doc->getId(); + }); + }); + + $this->assertSame(2, $count); + $this->assertCount(2, $emittedIds); + \sort($emittedIds); + $this->assertSame(['doc1', 'doc3'], $emittedIds); + + $doc1 = $database->getDocument(__FUNCTION__, 'doc1'); + $this->assertSame('Original A', $doc1->getAttribute('name')); + + $doc3 = $database->getDocument(__FUNCTION__, 'doc3'); + $this->assertSame('New C', $doc3->getAttribute('name')); + + // Total should be 3 (doc1, doc2, doc3) + $all = $database->find(__FUNCTION__); + $this->assertCount(3, $all); + } + + public function testCreateDocumentsIgnoreAllDuplicates(): void + { + /** @var Database $database */ + $database = $this->getDatabase(); + + $database->createCollection(__FUNCTION__); + $database->createAttribute(__FUNCTION__, 'name', Database::VAR_STRING, 128, true); + + // Insert initial document + $database->createDocuments(__FUNCTION__, [ + new Document([ + '$id' => 'existing', + 'name' => 'Original', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ]); + + // With skipDuplicates, inserting only duplicates should succeed with no new rows + $emittedIds = []; + $collection = __FUNCTION__; + $count = $database->skipDuplicates(function () use ($database, $collection, &$emittedIds) { + return $database->createDocuments($collection, [ + new Document([ + '$id' => 'existing', + 'name' => 'Duplicate', + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ], onNext: function (Document $doc) use (&$emittedIds) { + $emittedIds[] = $doc->getId(); + }); + }); + + $this->assertSame(1, $count); + $this->assertSame(['existing'], $emittedIds); + + $doc = $database->getDocument(__FUNCTION__, 'existing'); + $this->assertSame('Original', $doc->getAttribute('name')); + + // Still only 1 document + $all = $database->find(__FUNCTION__); + $this->assertCount(1, $all); + } + + public function testCreateDocumentsSkipDuplicatesEmptyBatch(): void + { + $database = $this->getDatabase(); + + $collection = 'skipDupEmpty'; + $database->createCollection($collection); + $database->createAttribute($collection, 'name', Database::VAR_STRING, 128, true); + + $count = $database->skipDuplicates(fn () => $database->createDocuments($collection, [])); + + $this->assertSame(0, $count); + $this->assertCount(0, $database->find($collection)); + } + + public function testCreateDocumentsSkipDuplicatesNestedScope(): void + { + $database = $this->getDatabase(); + + $collection = 'skipDupNested'; + $database->createCollection($collection); + $database->createAttribute($collection, 'name', Database::VAR_STRING, 128, true); + + $makeDoc = fn (string $id, string $name) => new Document([ + '$id' => $id, + 'name' => $name, + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]); + + // Seed an existing doc + $database->createDocuments($collection, [$makeDoc('seed', 'Seed')]); + + // Nested scope — inner scope runs inside outer scope. + // After inner exits, outer state should still be "skip enabled". + // After outer exits, state should restore to "skip disabled". + $countOuter = $database->skipDuplicates(function () use ($database, $collection, $makeDoc) { + // Inner scope: add dup + new + $countInner = $database->skipDuplicates(function () use ($database, $collection, $makeDoc) { + return $database->createDocuments($collection, [ + $makeDoc('seed', 'Dup'), + $makeDoc('innerNew', 'InnerNew'), + ]); + }); + $this->assertSame(2, $countInner); + + // Still inside outer scope — skip flag should still be on + return $database->createDocuments($collection, [ + $makeDoc('seed', 'Dup2'), + $makeDoc('outerNew', 'OuterNew'), + ]); + }); + $this->assertSame(2, $countOuter); + + // After both scopes exit, skip flag is off again — a plain createDocuments + // call with a duplicate should throw. + $thrown = null; + try { + $database->createDocuments($collection, [$makeDoc('seed', 'ShouldThrow')]); + } catch (DuplicateException $e) { + $thrown = $e; + } + $this->assertNotNull($thrown, 'Plain createDocuments after nested scopes should throw on duplicate'); + + // Final state: seed + innerNew + outerNew + $all = $database->find($collection); + $ids = \array_map(fn (Document $d) => $d->getId(), $all); + \sort($ids); + $this->assertSame(['innerNew', 'outerNew', 'seed'], $ids); + } + + public function testCreateDocumentsSkipDuplicatesLargeBatch(): void + { + $database = $this->getDatabase(); + + $collection = 'skipDupLarge'; + $database->createCollection($collection); + $database->createAttribute($collection, 'idx', Database::VAR_INTEGER, 0, true); + + // Seed 50 docs + $seed = []; + for ($i = 0; $i < 50; $i++) { + $seed[] = new Document([ + '$id' => 'doc_' . $i, + 'idx' => $i, + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]); + } + $database->createDocuments($collection, $seed); + + // Now call skipDuplicates with 300 docs: 50 existing (0-49) + 250 new (50-299). + // 300 > default INSERT_BATCH_SIZE, so this exercises the chunk loop. + $batch = []; + for ($i = 0; $i < 300; $i++) { + $batch[] = new Document([ + '$id' => 'doc_' . $i, + 'idx' => $i + 1000, // different value so we can detect if existing got overwritten + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]); + } + + $emittedIds = []; + $count = $database->skipDuplicates(function () use ($database, $collection, $batch, &$emittedIds) { + return $database->createDocuments($collection, $batch, onNext: function (Document $doc) use (&$emittedIds) { + $emittedIds[] = $doc->getId(); + }); + }); + + $this->assertSame(300, $count); + $this->assertCount(300, $emittedIds); + + $seedDoc = $database->getDocument($collection, 'doc_25'); + $this->assertSame(25, $seedDoc->getAttribute('idx')); + + $newDoc = $database->getDocument($collection, 'doc_100'); + $this->assertSame(1100, $newDoc->getAttribute('idx')); + + $total = $database->count($collection); + $this->assertSame(300, $total); + } + + public function testCreateDocumentsSkipDuplicatesSecondCallSkipsAll(): void + { + $database = $this->getDatabase(); + + $collection = 'skipDupSecond'; + $database->createCollection($collection); + $database->createAttribute($collection, 'name', Database::VAR_STRING, 128, true); + + $makeBatch = fn (string $name) => \array_map( + fn (string $id) => new Document([ + '$id' => $id, + 'name' => $name, + '$permissions' => [ + Permission::read(Role::any()), + Permission::create(Role::any()), + ], + ]), + ['a', 'b', 'c'] + ); + + // First call — all new + $firstCount = $database->skipDuplicates( + fn () => $database->createDocuments($collection, $makeBatch('First')) + ); + $this->assertSame(3, $firstCount); + + $emittedIds = []; + $secondCount = $database->skipDuplicates(function () use ($database, $collection, $makeBatch, &$emittedIds) { + return $database->createDocuments($collection, $makeBatch('Second'), onNext: function (Document $doc) use (&$emittedIds) { + $emittedIds[] = $doc->getId(); + }); + }); + $this->assertSame(3, $secondCount); + \sort($emittedIds); + $this->assertSame(['a', 'b', 'c'], $emittedIds); + + // All three should retain the First values + foreach (['a', 'b', 'c'] as $id) { + $doc = $database->getDocument($collection, $id); + $this->assertSame('First', $doc->getAttribute('name'), "Doc {$id} should not have been overwritten"); + } + } + + public function testCreateDocumentsSkipDuplicatesRelationships(): void + { + $database = $this->getDatabase(); + + if (!$database->getAdapter()->getSupportForRelationships()) { + $this->expectNotToPerformAssertions(); + return; + } + + $parent = 'skipDupParent'; + $child = 'skipDupChild'; + $permissions = [ + Permission::read(Role::any()), + Permission::create(Role::any()), + Permission::update(Role::any()), + Permission::delete(Role::any()), + ]; + + $database->createCollection($parent); + $database->createCollection($child); + $database->createAttribute($parent, 'name', Database::VAR_STRING, 128, true); + $database->createAttribute($child, 'name', Database::VAR_STRING, 128, true); + $database->createRelationship( + collection: $parent, + relatedCollection: $child, + type: Database::RELATION_ONE_TO_MANY, + id: 'children', + ); + + $database->createDocument($parent, new Document([ + '$id' => 'existingParent', + 'name' => 'ExistingParent', + '$permissions' => $permissions, + 'children' => [ + new Document([ + '$id' => 'existingChild', + 'name' => 'ExistingChild', + '$permissions' => $permissions, + ]), + ], + ])); + + $batch = [ + new Document([ + '$id' => 'existingParent', + 'name' => 'ShouldNotOverwrite', + '$permissions' => $permissions, + 'children' => [ + new Document([ + '$id' => 'existingChild', + 'name' => 'ExistingChild', + '$permissions' => $permissions, + ]), + new Document([ + '$id' => 'retryChild', + 'name' => 'RetryChild', + '$permissions' => $permissions, + ]), + ], + ]), + new Document([ + '$id' => 'newParent', + 'name' => 'NewParent', + '$permissions' => $permissions, + 'children' => [ + new Document([ + '$id' => 'newChild', + 'name' => 'NewChild', + '$permissions' => $permissions, + ]), + ], + ]), + ]; + + $database->skipDuplicates(fn () => $database->createDocuments($parent, $batch)); + + $existing = $database->getDocument($parent, 'existingParent'); + $this->assertFalse($existing->isEmpty()); + $this->assertSame('ExistingParent', $existing->getAttribute('name')); + + $existingChildren = $existing->getAttribute('children', []); + $childIds = \array_map(fn (Document $d) => $d->getId(), $existingChildren); + \sort($childIds); + $this->assertSame(['existingChild', 'retryChild'], $childIds); + + $new = $database->getDocument($parent, 'newParent'); + $this->assertFalse($new->isEmpty()); + $this->assertSame('NewParent', $new->getAttribute('name')); + $newChildren = $new->getAttribute('children', []); + $this->assertCount(1, $newChildren); + $this->assertSame('newChild', $newChildren[0]->getId()); + + $allChildren = $database->find($child); + $allChildIds = \array_map(fn (Document $d) => $d->getId(), $allChildren); + \sort($allChildIds); + $this->assertSame(['existingChild', 'newChild', 'retryChild'], $allChildIds); + } }