From ad828f4755e481fde56e1680c1e64d611e00a688 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Sun, 12 Apr 2026 22:04:07 +0530 Subject: [PATCH 1/4] WIP --- .../FlatCollectionWriteTest.java | 129 +++++ .../core/documentstore/Collection.java | 56 ++- .../postgres/FlatPostgresCollection.java | 446 ++++++++++++++++-- 3 files changed, 594 insertions(+), 37 deletions(-) diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java index 3111cc63..9c4922e9 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java @@ -3403,6 +3403,135 @@ void testCreateOrReplaceRefreshesSchemaOnDroppedColumn() throws Exception { } } + @Nested + @DisplayName("Key-Specific Bulk Update Operations") + class KeySpecificBulkUpdateTests { + + @Test + @DisplayName("Should update multiple keys with all operator types in a single batch") + void testBulkUpdateAllOperatorTypes() throws Exception { + Map> updates = new LinkedHashMap<>(); + updates.put( + rawKey("1"), + List.of( + SubDocumentUpdate.of("item", "UpdatedSoap"), + SubDocumentUpdate.builder() + .subDocument("price") + .operator(UpdateOperator.ADD) + .subDocumentValue(SubDocumentValue.of(5)) + .build(), + SubDocumentUpdate.builder() + .subDocument("props.brand") + .operator(UpdateOperator.SET) + .subDocumentValue(SubDocumentValue.of("NewBrand")) + .build())); + + updates.put( + rawKey("3"), + List.of( + SubDocumentUpdate.builder() + .subDocument("props.brand") + .operator(UpdateOperator.UNSET) + .build(), + SubDocumentUpdate.builder() + .subDocument("tags") + .operator(UpdateOperator.APPEND_TO_LIST) + .subDocumentValue(SubDocumentValue.of(new String[] {"newTag1", "newTag2"})) + .build())); + + updates.put( + 
rawKey("5"), + List.of( + SubDocumentUpdate.builder() + .subDocument("tags") + .operator(UpdateOperator.ADD_TO_LIST_IF_ABSENT) + .subDocumentValue(SubDocumentValue.of(new String[] {"hygiene", "uniqueTag"})) + .build())); + + updates.put( + rawKey("6"), + List.of( + SubDocumentUpdate.builder() + .subDocument("tags") + .operator(UpdateOperator.REMOVE_ALL_FROM_LIST) + .subDocumentValue(SubDocumentValue.of(new String[] {"plastic"})) + .build())); + + BulkUpdateResult result = flatCollection.bulkUpdate(updates, UpdateOptions.builder().build()); + + assertEquals(4, result.getUpdatedCount()); + + try (CloseableIterator iter = flatCollection.find(queryById("1"))) { + assertTrue(iter.hasNext()); + JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson()); + assertEquals("UpdatedSoap", json.get("item").asText()); + assertEquals(15, json.get("price").asInt()); // 10 + 5 + assertEquals("NewBrand", json.get("props").get("brand").asText()); + assertEquals("M", json.get("props").get("size").asText()); // preserved + } + + try (CloseableIterator iter = flatCollection.find(queryById("3"))) { + assertTrue(iter.hasNext()); + JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson()); + assertFalse(json.get("props").has("brand")); + assertEquals("L", json.get("props").get("size").asText()); // preserved + JsonNode tagsNode = json.get("tags"); + assertEquals(6, tagsNode.size()); // Original 4 + 2 new + } + + try (CloseableIterator iter = flatCollection.find(queryById("5"))) { + assertTrue(iter.hasNext()); + JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson()); + JsonNode tagsNode = json.get("tags"); + assertEquals(4, tagsNode.size()); // Original 3 + 1 new unique + Set tags = new HashSet<>(); + tagsNode.forEach(n -> tags.add(n.asText())); + assertTrue(tags.contains("uniqueTag")); + } + + try (CloseableIterator iter = flatCollection.find(queryById("6"))) { + assertTrue(iter.hasNext()); + JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson()); + JsonNode tagsNode = 
json.get("tags"); + assertEquals(2, tagsNode.size()); // grooming, essential remain + Set tags = new HashSet<>(); + tagsNode.forEach(n -> tags.add(n.asText())); + assertFalse(tags.contains("plastic")); + } + } + + @Test + @DisplayName("Should handle edge cases: empty map, null map, non-existent keys") + void testBulkUpdateEdgeCases() throws Exception { + UpdateOptions options = UpdateOptions.builder().build(); + + // Empty map + assertEquals(0, flatCollection.bulkUpdate(new HashMap<>(), options).getUpdatedCount()); + + // Null map + Map> nullUpdates = null; + assertEquals(0, flatCollection.bulkUpdate(nullUpdates, options).getUpdatedCount()); + + // Non-existent key + Map> nonExistent = new LinkedHashMap<>(); + nonExistent.put(rawKey("non-existent"), List.of(SubDocumentUpdate.of("item", "X"))); + assertEquals(0, flatCollection.bulkUpdate(nonExistent, options).getUpdatedCount()); + } + + // Creates a key with raw ID (matching test data format) + private Key rawKey(String id) { + return Key.from(id); + } + + private Query queryById(String id) { + return Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), RelationalOperator.EQ, ConstantExpression.of(id))) + .build(); + } + } + private static void executeInsertStatements() { PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; try { diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java index c1d5357a..0cee8720 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java @@ -21,8 +21,8 @@ public interface Collection { * store. * *

Note: This method ensures that all the fields defined in the `Document` are set/created. How - * the existing fields are modified is implementation specific. For example, upserting { - * "foo2": "bar2" } + * the existing fields are modified is implementation specific. For example, upserting + * { "foo2": "bar2" } * if a document * { "foo1": "bar1" } * already exists would ensure that "foo2" is set the value of "bar2" and what happens to @@ -42,8 +42,8 @@ public interface Collection { * store. * *

Note: This method ensures that all the fields defined in the `Document` are set/created. How - * the existing fields are modified is implementation specific. For example, upserting { - * "foo2": "bar2" } + * the existing fields are modified is implementation specific. For example, upserting + * { "foo2": "bar2" } * if a document * { "foo1": "bar1" } * already exists would ensure that "foo2" is set the value of "bar2" and what happens to @@ -398,5 +398,53 @@ CloseableIterator bulkUpdate( final UpdateOptions updateOptions) throws IOException; + /** + * Bulk update sub-documents with key-specific updates. Each key can have its own set of + * SubDocumentUpdate operations, allowing different updates per document. + * + *

This method supports all update operators (SET, UNSET, ADD, APPEND_TO_LIST, + * ADD_TO_LIST_IF_ABSENT, REMOVE_ALL_FROM_LIST). Updates for each individual key are applied + * atomically, but there is no atomicity guarantee across different keys - some keys may be + * updated while others fail. Any atomicity guarantees are implementation-specific. + * + *

Example usage: + * + *

{@code
+   * Map<Key, Collection<SubDocumentUpdate>> updates = new HashMap<>();
+   *
+   * // Key 1: SET a field and ADD to a number
+   * updates.put(key1, List.of(
+   *     SubDocumentUpdate.of("name", "NewName"),
+   *     SubDocumentUpdate.builder()
+   *         .subDocument("count")
+   *         .operator(UpdateOperator.ADD)
+   *         .subDocumentValue(SubDocumentValue.of(5))
+   *         .build()
+   * ));
+   *
+   * // Key 2: APPEND to an array
+   * updates.put(key2, List.of(
+   *     SubDocumentUpdate.builder()
+   *         .subDocument("tags")
+   *         .operator(UpdateOperator.APPEND_TO_LIST)
+   *         .subDocumentValue(SubDocumentValue.of(new String[]{"newTag"}))
+   *         .build()
+   * ));
+   *
+   * BulkUpdateResult result = collection.bulkUpdate(updates, UpdateOptions.builder().build());
+   * }
+ * + * @param updates Map of Key to Collection of SubDocumentUpdate operations. Each key's updates are + * applied atomically, but no cross-key atomicity is guaranteed. + * @param updateOptions Options for the update operation + * @return BulkUpdateResult containing the count of successfully updated documents + * @throws IOException if the update operation fails + */ + default BulkUpdateResult bulkUpdate( + Map> updates, UpdateOptions updateOptions) + throws IOException { + throw new UnsupportedOperationException("bulkUpdate is not supported!"); + } + String UNSUPPORTED_QUERY_OPERATION = "Query operation is not supported"; } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index ad8c1d1a..e77be6e6 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -25,8 +25,19 @@ import java.sql.Timestamp; import java.sql.Types; import java.time.Instant; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import org.hypertrace.core.documentstore.BulkArrayValueUpdateRequest; import org.hypertrace.core.documentstore.BulkDeleteResult; @@ -217,8 +228,255 @@ public boolean updateSubDoc(Key key, String subDocPath, Document subDocument) { } @Override - public BulkUpdateResult bulkUpdateSubDocs(Map> documents) { - throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED); + public BulkUpdateResult 
bulkUpdateSubDocs(Map> documents) + throws Exception { + if (documents == null || documents.isEmpty()) { + return new BulkUpdateResult(0); + } + + String tableName = tableIdentifier.getTableName(); + String pkColumn = getPKForTable(tableName); + String quotedPkColumn = PostgresUtils.wrapFieldNamesWithDoubleQuotes(pkColumn); + + Set updatedKeys = new HashSet<>(); + + try (Connection connection = client.getTransactionalConnection()) { + try { + // Lock all rows upfront with SELECT ... FOR UPDATE to ensure atomicity + String lockSql = + String.format( + "SELECT %s FROM %s WHERE %s = ANY(?) FOR UPDATE", + quotedPkColumn, tableIdentifier, quotedPkColumn); + + String[] keyArray = documents.keySet().stream().map(Key::toString).toArray(String[]::new); + Array sqlArray = connection.createArrayOf("text", keyArray); + + try (PreparedStatement lockPs = connection.prepareStatement(lockSql)) { + lockPs.setArray(1, sqlArray); + lockPs.executeQuery(); // Acquire locks on all rows + } + + // Now execute updates for each key + for (Map.Entry> entry : documents.entrySet()) { + Key key = entry.getKey(); + Map subDocUpdates = entry.getValue(); + + if (subDocUpdates == null || subDocUpdates.isEmpty()) { + continue; + } + + boolean updated = + executeSubDocUpdatesForKey(connection, key, subDocUpdates, tableName, quotedPkColumn); + if (updated) { + updatedKeys.add(key); + } + } + + // Update lastUpdatedTime within the same transaction + updateLastUpdatedTimeForKeys(connection, updatedKeys, tableName, quotedPkColumn); + + connection.commit(); + } catch (Exception e) { + connection.rollback(); + throw e; + } + } catch (SQLException e) { + LOGGER.error("SQLException in bulkUpdateSubDocs", e); + throw e; + } + + return new BulkUpdateResult(updatedKeys.size()); + } + + /** + * Executes sub-document updates for a single key. 
+ * + * @return true if at least one update was successful + */ + private boolean executeSubDocUpdatesForKey( + Connection connection, + Key key, + Map subDocUpdates, + String tableName, + String quotedPkColumn) + throws SQLException { + + // Group updates by resolved column + Map> updatesByColumn = new LinkedHashMap<>(); + + for (Map.Entry subDocEntry : subDocUpdates.entrySet()) { + String subDocPath = subDocEntry.getKey(); + Document subDocument = subDocEntry.getValue(); + + Optional columnName = resolveColumnName(subDocPath, tableName); + if (columnName.isEmpty()) { + if (missingColumnStrategy == MissingColumnStrategy.THROW) { + throw new IllegalArgumentException( + "Column not found in schema for path: " + + subDocPath + + " and missing column strategy is configured to: " + + missingColumnStrategy); + } + LOGGER.warn("Skipping update for unresolved path: {}", subDocPath); + continue; + } + + updatesByColumn + .computeIfAbsent(columnName.get(), k -> new ArrayList<>()) + .add(new SubDocUpdateEntry(subDocPath, subDocument)); + } + + if (updatesByColumn.isEmpty()) { + return false; + } + + // Build SET clause fragments - one per column + List setFragments = new ArrayList<>(); + List params = new ArrayList<>(); + + for (Map.Entry> columnEntry : updatesByColumn.entrySet()) { + String columnName = columnEntry.getKey(); + List columnUpdates = columnEntry.getValue(); + + PostgresColumnMetadata colMeta = + schemaRegistry.getColumnOrRefresh(tableName, columnName).orElseThrow(); + + // Track the current expression for chaining nested JSONB updates + String currentExpr = String.format("\"%s\"", columnName); + String fragment = null; + + for (SubDocUpdateEntry updateEntry : columnUpdates) { + String fullPath = updateEntry.path; + String[] nestedPath = getNestedPath(fullPath, columnName); + boolean isTopLevel = nestedPath.length == 0; + + if (isTopLevel) { + // Top-level column update - set the entire column value + PostgresDataType colType = colMeta.getPostgresType(); + if 
(colType == PostgresDataType.JSONB) { + // JSONB columns need explicit cast + fragment = String.format("\"%s\" = ?::jsonb", columnName); + params.add(updateEntry.document.toJson()); + } else { + fragment = String.format("\"%s\" = ?", columnName); + params.add(prepareValueForColumn(updateEntry.document, colMeta)); + } + } else { + // Nested JSONB path update using jsonb_set + String jsonPath = formatJsonbPath(nestedPath); + String jsonValue = updateEntry.document.toJson(); + String valueExpr = + String.format("jsonb_set(%s, '%s'::text[], ?::jsonb)", currentExpr, jsonPath); + currentExpr = valueExpr; + fragment = String.format("\"%s\" = %s", columnName, valueExpr); + params.add(jsonValue); + } + } + + if (fragment != null) { + setFragments.add(fragment); + } + } + + if (setFragments.isEmpty()) { + return false; + } + + // Build and execute UPDATE SQL + String sql = + String.format( + "UPDATE %s SET %s WHERE %s = ?", + tableIdentifier, String.join(", ", setFragments), quotedPkColumn); + + params.add(key.toString()); + + LOGGER.debug("Executing sub-doc update SQL: {}", sql); + + try (PreparedStatement ps = connection.prepareStatement(sql)) { + int idx = 1; + for (Object param : params) { + ps.setObject(idx++, param); + } + int rowsUpdated = ps.executeUpdate(); + return rowsUpdated > 0; + } + } + + /** + * Prepares a Document value for insertion into a column based on its type. For JSONB columns, + * returns the JSON string. For other types, extracts the appropriate scalar value. 
+ */ + private Object prepareValueForColumn(Document document, PostgresColumnMetadata colMeta) + throws SQLException { + try { + JsonNode jsonNode = MAPPER.readTree(document.toJson()); + PostgresDataType type = colMeta.getPostgresType(); + + if (type == PostgresDataType.JSONB) { + return document.toJson(); + } + + // For non-JSONB columns, extract the scalar value + return extractValue(jsonNode, type, colMeta.isArray()); + } catch (IOException e) { + throw new SQLException("Failed to parse document for column: " + colMeta.getName(), e); + } + } + + /** Formats a nested path array into PostgreSQL jsonb path format: {path1,path2,path3} */ + private String formatJsonbPath(String[] nestedPath) { + return "{" + String.join(",", nestedPath) + "}"; + } + + /** + * Updates the lastUpdatedTime column for the given keys using the provided connection. This + * allows the update to be part of an existing transaction. + */ + private void updateLastUpdatedTimeForKeys( + Connection connection, Set keys, String tableName, String quotedPkColumn) + throws SQLException { + if (keys.isEmpty() || lastUpdatedTsColumn == null) { + return; + } + + String quotedTsColumn = PostgresUtils.wrapFieldNamesWithDoubleQuotes(lastUpdatedTsColumn); + + Optional tsColMeta = + schemaRegistry.getColumnOrRefresh(tableName, lastUpdatedTsColumn); + if (tsColMeta.isEmpty()) { + LOGGER.warn( + "lastUpdatedTsColumn '{}' not found in schema, skipping timestamp update", + lastUpdatedTsColumn); + return; + } + + long now = System.currentTimeMillis(); + Object tsValue = convertTimestampForType(now, tsColMeta.get().getPostgresType()); + + String sql = + String.format( + "UPDATE %s SET %s = ? 
WHERE %s = ?", tableIdentifier, quotedTsColumn, quotedPkColumn); + + try (PreparedStatement ps = connection.prepareStatement(sql)) { + for (Key key : keys) { + ps.setObject(1, tsValue); + ps.setString(2, key.toString()); + ps.addBatch(); + } + ps.executeBatch(); + } + } + + /** Helper class to hold sub-document update information. */ + private static class SubDocUpdateEntry { + + final String path; + final Document document; + + SubDocUpdateEntry(String path, Document document) { + this.path = path; + this.document = document; + } } @Override @@ -896,6 +1154,104 @@ public CloseableIterator bulkUpdate( } } + @Override + public BulkUpdateResult bulkUpdate( + Map> updates, UpdateOptions updateOptions) + throws IOException { + + if (updates == null || updates.isEmpty()) { + return new BulkUpdateResult(0); + } + + Preconditions.checkArgument(updateOptions != null, "UpdateOptions cannot be NULL"); + + String tableName = tableIdentifier.getTableName(); + String quotedPkColumn = PostgresUtils.wrapFieldNamesWithDoubleQuotes(getPKForTable(tableName)); + + Set updatedKeys = new HashSet<>(); + for (Map.Entry> entry : updates.entrySet()) { + Key key = entry.getKey(); + Collection keyUpdates = entry.getValue(); + + if (keyUpdates == null || keyUpdates.isEmpty()) { + continue; + } + + try { + boolean updated = updateSingleKey(key, keyUpdates, tableName, quotedPkColumn); + if (updated) { + updatedKeys.add(key); + } + } catch (Exception e) { + LOGGER.warn("Failed to update key {}: {}", key, e.getMessage()); + // Continue with other keys - no cross-key atomicity + } + } + + return new BulkUpdateResult(updatedKeys.size()); + } + + private boolean updateSingleKey( + Key key, Collection keyUpdates, String tableName, String quotedPkColumn) + throws IOException { + + updateValidator.validate(keyUpdates); + Map resolvedColumns = resolvePathsToColumns(keyUpdates, tableName); + + try (Connection connection = client.getPooledConnection()) { + return executeKeyUpdate( + connection, key, 
keyUpdates, tableName, quotedPkColumn, resolvedColumns); + } catch (SQLException e) { + throw new IOException("Failed to update key: " + key, e); + } + } + + private boolean executeKeyUpdate( + Connection connection, + Key key, + java.util.Collection keyUpdates, + String tableName, + String quotedPkColumn, + Map resolvedColumns) + throws SQLException { + + List setFragments = new ArrayList<>(); + List params = new ArrayList<>(); + + boolean hasUpdates = + buildSetClauseFragments( + connection, keyUpdates, tableName, resolvedColumns, setFragments, params); + + if (!hasUpdates) { + return false; + } + + // Add lastUpdatedTime to the same UPDATE statement + if (lastUpdatedTsColumn != null) { + setFragments.add(String.format("\"%s\" = ?", lastUpdatedTsColumn)); + params.add(new Timestamp(System.currentTimeMillis())); + } + + // Build and execute UPDATE SQL for this key + String sql = + String.format( + "UPDATE %s SET %s WHERE %s = ?", + tableIdentifier, String.join(", ", setFragments), quotedPkColumn); + + params.add(key.toString()); + + LOGGER.debug("Executing key update SQL: {}", sql); + + try (PreparedStatement ps = connection.prepareStatement(sql)) { + int idx = 1; + for (Object param : params) { + ps.setObject(idx++, param); + } + int rowsUpdated = ps.executeUpdate(); + return rowsUpdated > 0; + } + } + /** * Validates all updates and resolves column names. 
* @@ -1014,6 +1370,56 @@ private void executeUpdate( String filterClause = filterParser.buildFilterClause(); Params filterParams = filterParser.getParamsBuilder().build(); + List setFragments = new ArrayList<>(); + List params = new ArrayList<>(); + + boolean hasUpdates = + buildSetClauseFragments( + connection, updates, tableName, resolvedColumns, setFragments, params); + + if (!hasUpdates) { + LOGGER.warn("All update paths were skipped - no valid columns to update"); + return; + } + + // Build final UPDATE SQL + String sql = + String.format( + "UPDATE %s SET %s %s", tableIdentifier, String.join(", ", setFragments), filterClause); + + LOGGER.debug("Executing update SQL: {}", sql); + + try (PreparedStatement ps = connection.prepareStatement(sql)) { + int idx = 1; + for (Object param : params) { + ps.setObject(idx++, param); + } + for (Object param : filterParams.getObjectParams().values()) { + ps.setObject(idx++, param); + } + int rowsUpdated = ps.executeUpdate(); + LOGGER.debug("Rows updated: {}", rowsUpdated); + } catch (SQLException e) { + LOGGER.error("Failed to execute update. SQL: {}, SQLState: {}", sql, e.getSQLState(), e); + throw e; + } + } + + /** + * Builds SET clause fragments for an UPDATE statement by grouping updates by column and chaining + * nested JSONB updates. 
+ * + * @return true if at least one valid update fragment was built, false otherwise + */ + private boolean buildSetClauseFragments( + Connection connection, + Collection updates, + String tableName, + Map resolvedColumns, + List setFragments, + List params) + throws SQLException { + // Group updates by column to handle multiple nested updates to the same JSONB column Map> updatesByColumn = new LinkedHashMap<>(); for (SubDocumentUpdate update : updates) { @@ -1026,9 +1432,9 @@ private void executeUpdate( updatesByColumn.computeIfAbsent(columnName, k -> new ArrayList<>()).add(update); } - // Build SET clause fragments - one per column - List setFragments = new ArrayList<>(); - List params = new ArrayList<>(); + if (updatesByColumn.isEmpty()) { + return false; + } for (Map.Entry> entry : updatesByColumn.entrySet()) { String columnName = entry.getKey(); @@ -1095,33 +1501,7 @@ private void executeUpdate( } } - // If all updates were skipped, nothing to do - if (setFragments.isEmpty()) { - LOGGER.warn("All update paths were skipped - no valid columns to update"); - return; - } - - // Build final UPDATE SQL - String sql = - String.format( - "UPDATE %s SET %s %s", tableIdentifier, String.join(", ", setFragments), filterClause); - - LOGGER.debug("Executing update SQL: {}", sql); - - try (PreparedStatement ps = connection.prepareStatement(sql)) { - int idx = 1; - for (Object param : params) { - ps.setObject(idx++, param); - } - for (Object param : filterParams.getObjectParams().values()) { - ps.setObject(idx++, param); - } - int rowsUpdated = ps.executeUpdate(); - LOGGER.debug("Rows updated: {}", rowsUpdated); - } catch (SQLException e) { - LOGGER.error("Failed to execute update. 
SQL: {}, SQLState: {}", sql, e.getSQLState(), e); - throw e; - } + return !setFragments.isEmpty(); } /*isRetry: Whether this is a retry attempt*/ From b6c0237b88c77c5c48a13a57c44d059d29463ae9 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Sun, 12 Apr 2026 22:08:23 +0530 Subject: [PATCH 2/4] WIP --- .../postgres/FlatPostgresCollection.java | 251 +----------------- 1 file changed, 2 insertions(+), 249 deletions(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index e77be6e6..42075284 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -228,255 +228,8 @@ public boolean updateSubDoc(Key key, String subDocPath, Document subDocument) { } @Override - public BulkUpdateResult bulkUpdateSubDocs(Map> documents) - throws Exception { - if (documents == null || documents.isEmpty()) { - return new BulkUpdateResult(0); - } - - String tableName = tableIdentifier.getTableName(); - String pkColumn = getPKForTable(tableName); - String quotedPkColumn = PostgresUtils.wrapFieldNamesWithDoubleQuotes(pkColumn); - - Set updatedKeys = new HashSet<>(); - - try (Connection connection = client.getTransactionalConnection()) { - try { - // Lock all rows upfront with SELECT ... FOR UPDATE to ensure atomicity - String lockSql = - String.format( - "SELECT %s FROM %s WHERE %s = ANY(?) 
FOR UPDATE", - quotedPkColumn, tableIdentifier, quotedPkColumn); - - String[] keyArray = documents.keySet().stream().map(Key::toString).toArray(String[]::new); - Array sqlArray = connection.createArrayOf("text", keyArray); - - try (PreparedStatement lockPs = connection.prepareStatement(lockSql)) { - lockPs.setArray(1, sqlArray); - lockPs.executeQuery(); // Acquire locks on all rows - } - - // Now execute updates for each key - for (Map.Entry> entry : documents.entrySet()) { - Key key = entry.getKey(); - Map subDocUpdates = entry.getValue(); - - if (subDocUpdates == null || subDocUpdates.isEmpty()) { - continue; - } - - boolean updated = - executeSubDocUpdatesForKey(connection, key, subDocUpdates, tableName, quotedPkColumn); - if (updated) { - updatedKeys.add(key); - } - } - - // Update lastUpdatedTime within the same transaction - updateLastUpdatedTimeForKeys(connection, updatedKeys, tableName, quotedPkColumn); - - connection.commit(); - } catch (Exception e) { - connection.rollback(); - throw e; - } - } catch (SQLException e) { - LOGGER.error("SQLException in bulkUpdateSubDocs", e); - throw e; - } - - return new BulkUpdateResult(updatedKeys.size()); - } - - /** - * Executes sub-document updates for a single key. 
- * - * @return true if at least one update was successful - */ - private boolean executeSubDocUpdatesForKey( - Connection connection, - Key key, - Map subDocUpdates, - String tableName, - String quotedPkColumn) - throws SQLException { - - // Group updates by resolved column - Map> updatesByColumn = new LinkedHashMap<>(); - - for (Map.Entry subDocEntry : subDocUpdates.entrySet()) { - String subDocPath = subDocEntry.getKey(); - Document subDocument = subDocEntry.getValue(); - - Optional columnName = resolveColumnName(subDocPath, tableName); - if (columnName.isEmpty()) { - if (missingColumnStrategy == MissingColumnStrategy.THROW) { - throw new IllegalArgumentException( - "Column not found in schema for path: " - + subDocPath - + " and missing column strategy is configured to: " - + missingColumnStrategy); - } - LOGGER.warn("Skipping update for unresolved path: {}", subDocPath); - continue; - } - - updatesByColumn - .computeIfAbsent(columnName.get(), k -> new ArrayList<>()) - .add(new SubDocUpdateEntry(subDocPath, subDocument)); - } - - if (updatesByColumn.isEmpty()) { - return false; - } - - // Build SET clause fragments - one per column - List setFragments = new ArrayList<>(); - List params = new ArrayList<>(); - - for (Map.Entry> columnEntry : updatesByColumn.entrySet()) { - String columnName = columnEntry.getKey(); - List columnUpdates = columnEntry.getValue(); - - PostgresColumnMetadata colMeta = - schemaRegistry.getColumnOrRefresh(tableName, columnName).orElseThrow(); - - // Track the current expression for chaining nested JSONB updates - String currentExpr = String.format("\"%s\"", columnName); - String fragment = null; - - for (SubDocUpdateEntry updateEntry : columnUpdates) { - String fullPath = updateEntry.path; - String[] nestedPath = getNestedPath(fullPath, columnName); - boolean isTopLevel = nestedPath.length == 0; - - if (isTopLevel) { - // Top-level column update - set the entire column value - PostgresDataType colType = colMeta.getPostgresType(); - if 
(colType == PostgresDataType.JSONB) { - // JSONB columns need explicit cast - fragment = String.format("\"%s\" = ?::jsonb", columnName); - params.add(updateEntry.document.toJson()); - } else { - fragment = String.format("\"%s\" = ?", columnName); - params.add(prepareValueForColumn(updateEntry.document, colMeta)); - } - } else { - // Nested JSONB path update using jsonb_set - String jsonPath = formatJsonbPath(nestedPath); - String jsonValue = updateEntry.document.toJson(); - String valueExpr = - String.format("jsonb_set(%s, '%s'::text[], ?::jsonb)", currentExpr, jsonPath); - currentExpr = valueExpr; - fragment = String.format("\"%s\" = %s", columnName, valueExpr); - params.add(jsonValue); - } - } - - if (fragment != null) { - setFragments.add(fragment); - } - } - - if (setFragments.isEmpty()) { - return false; - } - - // Build and execute UPDATE SQL - String sql = - String.format( - "UPDATE %s SET %s WHERE %s = ?", - tableIdentifier, String.join(", ", setFragments), quotedPkColumn); - - params.add(key.toString()); - - LOGGER.debug("Executing sub-doc update SQL: {}", sql); - - try (PreparedStatement ps = connection.prepareStatement(sql)) { - int idx = 1; - for (Object param : params) { - ps.setObject(idx++, param); - } - int rowsUpdated = ps.executeUpdate(); - return rowsUpdated > 0; - } - } - - /** - * Prepares a Document value for insertion into a column based on its type. For JSONB columns, - * returns the JSON string. For other types, extracts the appropriate scalar value. 
- */ - private Object prepareValueForColumn(Document document, PostgresColumnMetadata colMeta) - throws SQLException { - try { - JsonNode jsonNode = MAPPER.readTree(document.toJson()); - PostgresDataType type = colMeta.getPostgresType(); - - if (type == PostgresDataType.JSONB) { - return document.toJson(); - } - - // For non-JSONB columns, extract the scalar value - return extractValue(jsonNode, type, colMeta.isArray()); - } catch (IOException e) { - throw new SQLException("Failed to parse document for column: " + colMeta.getName(), e); - } - } - - /** Formats a nested path array into PostgreSQL jsonb path format: {path1,path2,path3} */ - private String formatJsonbPath(String[] nestedPath) { - return "{" + String.join(",", nestedPath) + "}"; - } - - /** - * Updates the lastUpdatedTime column for the given keys using the provided connection. This - * allows the update to be part of an existing transaction. - */ - private void updateLastUpdatedTimeForKeys( - Connection connection, Set keys, String tableName, String quotedPkColumn) - throws SQLException { - if (keys.isEmpty() || lastUpdatedTsColumn == null) { - return; - } - - String quotedTsColumn = PostgresUtils.wrapFieldNamesWithDoubleQuotes(lastUpdatedTsColumn); - - Optional tsColMeta = - schemaRegistry.getColumnOrRefresh(tableName, lastUpdatedTsColumn); - if (tsColMeta.isEmpty()) { - LOGGER.warn( - "lastUpdatedTsColumn '{}' not found in schema, skipping timestamp update", - lastUpdatedTsColumn); - return; - } - - long now = System.currentTimeMillis(); - Object tsValue = convertTimestampForType(now, tsColMeta.get().getPostgresType()); - - String sql = - String.format( - "UPDATE %s SET %s = ? 
WHERE %s = ?", tableIdentifier, quotedTsColumn, quotedPkColumn); - - try (PreparedStatement ps = connection.prepareStatement(sql)) { - for (Key key : keys) { - ps.setObject(1, tsValue); - ps.setString(2, key.toString()); - ps.addBatch(); - } - ps.executeBatch(); - } - } - - /** Helper class to hold sub-document update information. */ - private static class SubDocUpdateEntry { - - final String path; - final Document document; - - SubDocUpdateEntry(String path, Document document) { - this.path = path; - this.document = document; - } + public BulkUpdateResult bulkUpdateSubDocs(Map> documents) { + throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED); } @Override From 8679979948e4e28ce7f2ac0c433bc075d43cef4a Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 13 Apr 2026 16:30:51 +0530 Subject: [PATCH 3/4] Execute entire batch on a single pooled connection --- .../postgres/FlatPostgresCollection.java | 48 +++++++++++-------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 42075284..760dfdc4 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -922,41 +922,47 @@ public BulkUpdateResult bulkUpdate( String quotedPkColumn = PostgresUtils.wrapFieldNamesWithDoubleQuotes(getPKForTable(tableName)); Set updatedKeys = new HashSet<>(); - for (Map.Entry> entry : updates.entrySet()) { - Key key = entry.getKey(); - Collection keyUpdates = entry.getValue(); - if (keyUpdates == null || keyUpdates.isEmpty()) { - continue; - } + // Use a single connection for all key updates to reduce pool overhead + try (Connection connection = client.getPooledConnection()) { + for (Map.Entry> entry : 
updates.entrySet()) { + Key key = entry.getKey(); + Collection keyUpdates = entry.getValue(); - try { - boolean updated = updateSingleKey(key, keyUpdates, tableName, quotedPkColumn); - if (updated) { - updatedKeys.add(key); + if (keyUpdates == null || keyUpdates.isEmpty()) { + continue; + } + + try { + boolean updated = updateSingleKey(connection, key, keyUpdates, tableName, quotedPkColumn); + if (updated) { + updatedKeys.add(key); + } + } catch (Exception e) { + LOGGER.warn("Failed to update key {}: {}", key, e.getMessage()); + // Continue with other keys - no cross-key atomicity } - } catch (Exception e) { - LOGGER.warn("Failed to update key {}: {}", key, e.getMessage()); - // Continue with other keys - no cross-key atomicity } + } catch (SQLException e) { + throw new IOException("Failed to get connection for bulk update", e); } return new BulkUpdateResult(updatedKeys.size()); } private boolean updateSingleKey( - Key key, Collection keyUpdates, String tableName, String quotedPkColumn) - throws IOException { + Connection connection, + Key key, + Collection keyUpdates, + String tableName, + String quotedPkColumn) + throws IOException, SQLException { updateValidator.validate(keyUpdates); Map resolvedColumns = resolvePathsToColumns(keyUpdates, tableName); - try (Connection connection = client.getPooledConnection()) { - return executeKeyUpdate( - connection, key, keyUpdates, tableName, quotedPkColumn, resolvedColumns); - } catch (SQLException e) { - throw new IOException("Failed to update key: " + key, e); - } + return executeKeyUpdate( + connection, key, keyUpdates, tableName, quotedPkColumn, resolvedColumns); } private boolean executeKeyUpdate( From 5b732214d4e1ad9624267cbe2989843a8fd0ecac Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Tue, 14 Apr 2026 12:35:04 +0530 Subject: [PATCH 4/4] Update javadocs --- .../org/hypertrace/core/documentstore/Collection.java | 11 ++++++----- .../postgres/FlatPostgresCollection.java | 1 - 2 files changed, 6 insertions(+), 6 
deletions(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java index 0cee8720..a79e3c99 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java @@ -21,8 +21,8 @@ public interface Collection { * store. * *
<p>
Note: This method ensures that all the fields defined in the `Document` are set/created. How - * the existing fields are modified is implementation specific. For example, upserting - * { "foo2": "bar2" } + * the existing fields are modified is implementation specific. For example, upserting { + * "foo2": "bar2" } * if a document * { "foo1": "bar1" } * already exists would ensure that "foo2" is set the value of "bar2" and what happens to @@ -42,8 +42,8 @@ public interface Collection { * store. * *
<p>
Note: This method ensures that all the fields defined in the `Document` are set/created. How - * the existing fields are modified is implementation specific. For example, upserting - * { "foo2": "bar2" } + * the existing fields are modified is implementation specific. For example, upserting { + * "foo2": "bar2" } * if a document * { "foo1": "bar1" } * already exists would ensure that "foo2" is set the value of "bar2" and what happens to @@ -405,7 +405,8 @@ CloseableIterator bulkUpdate( *
<p>
This method supports all update operators (SET, UNSET, ADD, APPEND_TO_LIST, * ADD_TO_LIST_IF_ABSENT, REMOVE_ALL_FROM_LIST). Updates for each individual key are applied * atomically, but there is no atomicity guarantee across different keys - some keys may be - * updated while others fail. Any atomicity guarantees are implementation-specific. + * updated while others fail. Batch-level atomicity is not guaranteed, while per-key update + * atomicity is guaranteed. * *
<p>
Example usage: * diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 760dfdc4..b54b36d1 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -923,7 +923,6 @@ public BulkUpdateResult bulkUpdate( Set updatedKeys = new HashSet<>(); - // Use a single connection for all key updates to reduce pool overhead try (Connection connection = client.getPooledConnection()) { for (Map.Entry> entry : updates.entrySet()) { Key key = entry.getKey();