From 2fd32a83895572c6a0972ea227c8542be7a4c2a2 Mon Sep 17 00:00:00 2001 From: labkey-jeckels Date: Sat, 23 May 2026 14:16:49 -0700 Subject: [PATCH 1/4] Avoid pulling too many objects into memory for indexing --- api/src/org/labkey/api/exp/Lsid.java | 4 +- .../labkey/api/exp/api/ExperimentService.java | 19 ++++++- .../org/labkey/api/search/SearchService.java | 20 ++++++-- assay/src/org/labkey/assay/AssayManager.java | 49 +++++++++++++------ .../experiment/api/ExperimentServiceImpl.java | 48 +++++++++++------- .../experiment/api/SampleTypeServiceImpl.java | 30 ++++++++---- 6 files changed, 120 insertions(+), 50 deletions(-) diff --git a/api/src/org/labkey/api/exp/Lsid.java b/api/src/org/labkey/api/exp/Lsid.java index 0cfa516eba4..2ddfee94a31 100644 --- a/api/src/org/labkey/api/exp/Lsid.java +++ b/api/src/org/labkey/api/exp/Lsid.java @@ -308,9 +308,9 @@ static public String namespaceLikeString(String namespace) return "urn:lsid:%:" + namespace + ".%:%"; } - static public String namespaceFilter(String columnName, String namespace) + static public SQLFragment namespaceFilter(Enum column, String namespace) { - return columnName + " LIKE '" + namespaceLikeString(namespace) + "'"; + return new SQLFragment().appendIdentifier(column.name()).append(" LIKE ?").add(namespaceLikeString(namespace)); } /** diff --git a/api/src/org/labkey/api/exp/api/ExperimentService.java b/api/src/org/labkey/api/exp/api/ExperimentService.java index ce8efc452e8..100df3d2af9 100644 --- a/api/src/org/labkey/api/exp/api/ExperimentService.java +++ b/api/src/org/labkey/api/exp/api/ExperimentService.java @@ -188,8 +188,23 @@ enum DataTypeForExclusion List getExpRuns(Container container, @Nullable ExpProtocol parentProtocol, @Nullable ExpProtocol childProtocol); List getExpRuns(Container container, @Nullable ExpProtocol parentProtocol, @Nullable ExpProtocol childProtocol, @NotNull Predicate filterFn); - - List getExpRuns(@Nullable SQLFragment filterSQL, @NotNull Predicate filterFn, @NotNull Container container); + + /** + * @param filterSQL optional additional WHERE predicates; callers doing keyset pagination should include + * {@code ER.RowId > minRowId} here + * @param limit max rows to return; pass {@code Table.ALL_ROWS} (-1) for no limit + * @return up to {@code limit} ExpRuns in {@code container} matching {@code filterSQL}, ordered by RowId + */ + List getExpRuns(@Nullable SQLFragment filterSQL, @NotNull Predicate filterFn, @NotNull Container container, int limit); + + /** + * @param modifiedSince optional upper-exclusive Modified cutoff; pass {@code null} to return all batches + * @param minRowId keyset cursor — only batches with RowId > minRowId are returned; pass 0 for the first page + * @param limit max rows to return + * @return up to {@code limit} assay batches for {@code batchProtocol} in {@code container} with + * RowId > minRowId (and Modified > modifiedSince when non-null), ordered by RowId + */ + List getExpBatches(@NotNull Container container, @NotNull ExpProtocol batchProtocol, @Nullable Date modifiedSince, long minRowId, int limit); List getExpRunsForJobId(long jobId); diff --git a/api/src/org/labkey/api/search/SearchService.java b/api/src/org/labkey/api/search/SearchService.java index 48a2632c59a..d1f6ef82af3 100644 --- a/api/src/org/labkey/api/search/SearchService.java +++ b/api/src/org/labkey/api/search/SearchService.java @@ -75,6 +75,7 @@ public interface SearchService extends SearchMXBean Logger _log = LogHelper.getLogger(SearchService.class, "Full text search service"); long DEFAULT_FILE_SIZE_LIMIT = 100L; // 100 MB + int INDEXING_LIMIT = 1_000; /** * Returns the max file size indexed @@ -494,12 +495,21 @@ public String normalizeHref(Path contextPath, Container c) interface DocumentProvider { /** - * Enumerate documents for full-text search. Unless it's known there will be a small number of documents - * added to the queue, add Runnable to the IndexTask that adds the Resources from the container to the queue. - * If there are potentially many documents for a container, add resources in batches of 1,000 or so to avoid - * a huge memory footprint. + * Enumerate documents for full-text search indexing. Do NOT fetch an unbounded result set into memory. * - * @param modifiedSince when null, do a full reindex; otherwise incremental (either modified > modifiedSince, or modified > lastIndexed) + *

Pattern 1 — recursive requeue (preferred when the underlying table supports keyset pagination). + * Fetch at most {@link SearchService#INDEXING_LIMIT} rows, process them, then re-enqueue the next batch + * only if the batch was full. This keeps the ResultSet closed between batches and interleaves with other + * queue work. See {@code ExperimentServiceImpl.indexMaterials()} and + * {@code AssayManager.indexAssayRuns()} for examples.

+ * + *

Pattern 2 — forEachBatch + per-batch runnable (simpler when using {@code TableSelector}). + * Stream rows in batches of {@link SearchService#INDEXING_LIMIT} and wrap each batch in a + * {@code queue.addRunnable()} so indexing is deferred. See + * {@code InventoryManager.indexLocations()} and {@code NotebookManager.indexNotebooks()} + * for examples.

+ * + * @param modifiedSince when null, do a full reindex; otherwise incremental (either modified > modifiedSince, or modified > lastIndexed) */ void enumerateDocuments(TaskIndexingQueue adder, @Nullable Date modifiedSince); diff --git a/assay/src/org/labkey/assay/AssayManager.java b/assay/src/org/labkey/assay/AssayManager.java index 871591e59cd..4199fb734cc 100644 --- a/assay/src/org/labkey/assay/AssayManager.java +++ b/assay/src/org/labkey/assay/AssayManager.java @@ -16,6 +16,7 @@ package org.labkey.assay; +import org.apache.commons.lang3.mutable.MutableLong; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -710,33 +711,51 @@ public void indexAssayBatches(SearchService.TaskIndexingQueue queue, @Nullable D for (ExpProtocol protocol : getAssayProtocols(queue.getContainer())) { if (shouldIndexProtocolBatches(protocol)) - indexAssayBatches(queue, protocol, modifiedSince); + queue.addRunnable((q) -> indexAssayBatches(q, protocol, modifiedSince, 0)); } } - private void indexAssayBatches(SearchService.TaskIndexingQueue queue, ExpProtocol protocol, @Nullable Date modifiedSince) + private void indexAssayBatches(SearchService.TaskIndexingQueue queue, ExpProtocol protocol, + @Nullable Date modifiedSince, long minRowId) { - if (shouldIndexProtocolBatches(protocol)) - { - for (ExpExperiment batch : protocol.getBatches(queue.getContainer())) - { - if (modifiedSince == null || modifiedSince.before(batch.getModified())) - indexAssayBatch(queue, batch); - } - } + List batches = ExperimentService.get().getExpBatches( + queue.getContainer(), protocol, modifiedSince, minRowId, SearchService.INDEXING_LIMIT); + + MutableLong maxRowIdProcessed = new MutableLong(minRowId); + batches.forEach(b -> { + indexAssayBatch(queue, b); + maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), b.getRowId())); + }); + + if (batches.size() == SearchService.INDEXING_LIMIT) + queue.addRunnable((q) -> indexAssayBatches(q, protocol, modifiedSince, maxRowIdProcessed.longValue())); } public void indexAssayRuns(SearchService.TaskIndexingQueue queue, @Nullable Date modifiedSince) { for (ExpProtocol protocol : getAssayProtocols(queue.getContainer())) - indexAssayRuns(queue, protocol, modifiedSince); + queue.addRunnable((q) -> indexAssayRuns(q, protocol, modifiedSince, 0)); } - private void indexAssayRuns(SearchService.TaskIndexingQueue queue, ExpProtocol protocol, @Nullable Date modifiedSince) + private void indexAssayRuns(SearchService.TaskIndexingQueue queue, ExpProtocol protocol, + @Nullable Date modifiedSince, long minRowId) { - ExperimentService.get().getExpRuns(queue.getContainer(), protocol, null, run -> - modifiedSince == null || modifiedSince.before(run.getModified()) - ).forEach(r -> indexAssayRun(queue, r)); + SQLFragment filterSQL = new SQLFragment("ER.ProtocolLSID = ? AND ER.RowId > ?") + .add(protocol.getLSID()) + .add(minRowId); + if (modifiedSince != null) + filterSQL.append(" AND ER.Modified > ?").add(modifiedSince); + + List runs = ExperimentService.get().getExpRuns(filterSQL, _ -> true, queue.getContainer(), SearchService.INDEXING_LIMIT); + + MutableLong maxRowIdProcessed = new MutableLong(minRowId); + runs.forEach(r -> { + indexAssayRun(queue, r); + maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), r.getRowId())); + }); + + if (runs.size() == SearchService.INDEXING_LIMIT) + queue.addRunnable((q) -> indexAssayRuns(q, protocol, modifiedSince, maxRowIdProcessed.longValue())); } @Override diff --git a/experiment/src/org/labkey/experiment/api/ExperimentServiceImpl.java b/experiment/src/org/labkey/experiment/api/ExperimentServiceImpl.java index bf51266dc3f..b205a65989d 100644 --- a/experiment/src/org/labkey/experiment/api/ExperimentServiceImpl.java +++ b/experiment/src/org/labkey/experiment/api/ExperimentServiceImpl.java @@ -564,7 +564,7 @@ public List getExpRuns(Container container, @Nullable ExpProtocol pa sql.add(childProtocol.getLSID()); } - return getExpRuns(sql, filterFn, container); + return getExpRuns(sql, filterFn, container, Table.ALL_ROWS); } @Override @@ -582,17 +582,17 @@ public boolean hasExpRuns(Container container, @NotNull Predicate filter } @Override - public List getExpRuns(@Nullable SQLFragment filterSQL, @NotNull Predicate filterFn, @NotNull Container container) + public List getExpRuns(@Nullable SQLFragment filterSQL, @NotNull Predicate filterFn, @NotNull Container container, int limit) { - SQLFragment sql = new SQLFragment(" SELECT ER.* " - + " FROM exp.ExperimentRun ER " - + " WHERE ER.Container = ? "); + SQLFragment sql = new SQLFragment("SELECT ER.* FROM exp.ExperimentRun ER WHERE ER.Container = ?"); sql.add(container.getId()); - if (null != filterSQL && !filterSQL.isEmpty()) - sql.append(" AND " ).append(filterSQL); - - sql.append(" ORDER BY ER.RowId "); + sql.append(" AND ").append(filterSQL); + sql.append(" ORDER BY ER.RowId"); + if (limit > 0) + { + sql = getSchema().getSqlDialect().limitRows(sql, limit); + } try (Stream runs = new SqlSelector(getSchema(), sql).setJdbcCaching(false).uncachedStream(ExperimentRun.class)) { @@ -600,6 +600,22 @@ public List getExpRuns(@Nullable SQLFragment filterSQL, @NotNull Pre } } + @Override + public List getExpBatches(@NotNull Container container, @NotNull ExpProtocol batchProtocol, + @Nullable Date modifiedSince, long minRowId, int limit) + { + SQLFragment sql = new SQLFragment("SELECT E.* FROM ").append(getTinfoExperiment(), "E") + .append(" WHERE E.Container = ?").add(container.getId()) + .append(" AND E.BatchProtocolId = ?").add(batchProtocol.getRowId()) + .append(" AND E.RowId > ?").add(minRowId); + if (modifiedSince != null) + sql.append(" AND E.Modified > ?").add(modifiedSince); + sql.append(" ORDER BY E.RowId"); + sql = getSchema().getSqlDialect().limitRows(sql, limit); + + return ExpExperimentImpl.fromExperiments(new SqlSelector(getSchema(), sql).setJdbcCaching(false).getArray(Experiment.class)); + } + @Override public List getExpRunsForJobId(long jobId) { @@ -745,7 +761,7 @@ public List getExpDatas(Container container, @Nullable DataType typ { SimpleFilter filter = SimpleFilter.createContainerFilter(container); if (type != null) - filter.addWhereClause(Lsid.namespaceFilter(ExpDataTable.Column.LSID.name(), type.getNamespacePrefix()), null); + filter.addWhereClause(Lsid.namespaceFilter(ExpDataTable.Column.LSID, type.getNamespacePrefix())); if (name != null) filter.addCondition(FieldKey.fromParts(ExpDataTable.Column.Name.name()), name); @@ -756,7 +772,7 @@ public List getOutputDatas(long runRowId, @Nullable DataType type) { SimpleFilter filter = new SimpleFilter(FieldKey.fromParts("RunId"), runRowId); if (type != null) - filter.addWhereClause(Lsid.namespaceFilter(ExpDataTable.Column.LSID.name(), type.getNamespacePrefix()), null); + filter.addWhereClause(Lsid.namespaceFilter(ExpDataTable.Column.LSID, type.getNamespacePrefix())); return getExpDatas(filter); } @@ -1018,8 +1034,6 @@ public List getExpMaterialsByObjectId(ContainerFilter container return result; } - private static final int INDEXING_LIMIT = 1_000; - @Override public void enumerateDocuments(SearchService.TaskIndexingQueue queue, final Date modifiedSince) { @@ -1076,7 +1090,7 @@ private void indexMaterials(final @NotNull SearchService.TaskIndexingQueue queue if (!modifiedSQL.isEmpty()) sql.append(" AND ").append(modifiedSQL); sql.append(" ORDER BY RowId"); - sql = getSchema().getSqlDialect().limitRows(sql, INDEXING_LIMIT); + sql = getSchema().getSqlDialect().limitRows(sql, SearchService.INDEXING_LIMIT); SqlSelector selector = new SqlSelector(getSchema(), sql); selector.setJdbcCaching(false); MutableLong maxRowIdProcessed = new MutableLong(minRowId); @@ -1089,7 +1103,7 @@ private void indexMaterials(final @NotNull SearchService.TaskIndexingQueue queue maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), expMaterial.getRowId())); }); - if (materials.size() == INDEXING_LIMIT) + if (materials.size() == SearchService.INDEXING_LIMIT) { // Requeue for the next batch. This avoids overwhelming the indexer's queue with documents queue.addRunnable((q) -> indexMaterials(q, modifiedSince, maxRowIdProcessed.longValue())); @@ -1114,7 +1128,7 @@ public void indexData(final @NotNull SearchService.TaskIndexingQueue queue, fina sql.append(" AND ").append(modifiedSQL); sql.append(" ORDER BY RowId"); - sql = getSchema().getSqlDialect().limitRows(sql, INDEXING_LIMIT); + sql = getSchema().getSqlDialect().limitRows(sql, SearchService.INDEXING_LIMIT); SqlSelector selector = new SqlSelector(getSchema(), sql); selector.setJdbcCaching(false); MutableLong maxRowIdProcessed = new MutableLong(minRowId); @@ -1127,7 +1141,7 @@ public void indexData(final @NotNull SearchService.TaskIndexingQueue queue, fina maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), expData.getRowId())); }); - if (data.size() == INDEXING_LIMIT) + if (data.size() == SearchService.INDEXING_LIMIT) { // Requeue for the next batch. This avoids overwhelming the indexer's queue with documents queue.addRunnable((q) -> indexData(q, modifiedSince, maxRowIdProcessed.longValue())); diff --git a/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java b/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java index 8cbd4c9ad87..a692f2f9091 100644 --- a/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java +++ b/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java @@ -17,6 +17,7 @@ package org.labkey.experiment.api; import org.apache.commons.collections4.ListUtils; +import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Strings; import org.apache.commons.math3.util.Precision; @@ -346,11 +347,11 @@ public void indexSampleType(ExpSampleType sampleType, SearchService.TaskIndexing impl.index(q, null); } - indexSampleTypeMaterials(sampleType, q); + indexSampleTypeMaterials(sampleType, q, 0); }); } - private void indexSampleTypeMaterials(ExpSampleType sampleType, SearchService.TaskIndexingQueue queue) + private void indexSampleTypeMaterials(ExpSampleType sampleType, SearchService.TaskIndexingQueue queue, long minRowId) { // Index all ExpMaterial that have never been indexed OR where either the ExpSampleType definition or ExpMaterial itself has changed since last indexed SQLFragment sql = new SQLFragment("SELECT m.* FROM ") @@ -359,17 +360,28 @@ private void indexSampleTypeMaterials(ExpSampleType sampleType, SearchService.Ta .append(ExperimentServiceImpl.get().getTinfoMaterialIndexed(), "mi") .append(" ON m.RowId = mi.MaterialId WHERE m.LSID NOT LIKE ").appendValue("%:" + StudyService.SPECIMEN_NAMESPACE_PREFIX + "%", getExpSchema().getSqlDialect()) .append(" AND m.cpasType = ?").add(sampleType.getLSID()) + .append(" AND m.RowId > ?").add(minRowId) .append(" AND (mi.lastIndexed IS NULL OR mi.lastIndexed < ? OR (m.modified IS NOT NULL AND mi.lastIndexed < m.modified))") .append(" ORDER BY m.RowId") // Issue 51263: order by RowId to reduce deadlock .add(sampleType.getModified()); - - new SqlSelector(getExpSchema().getScope(), sql).forEachBatch(Material.class, 1000, batch -> { - for (Material m : batch) - { - ExpMaterialImpl impl = new ExpMaterialImpl(m); - impl.index(queue, null /* null tableInfo since samples may belong to multiple containers*/); - } + sql = getExpSchema().getSqlDialect().limitRows(sql, SearchService.INDEXING_LIMIT); + SqlSelector selector = new SqlSelector(getExpSchema().getScope(), sql); + selector.setJdbcCaching(false); + MutableLong maxRowIdProcessed = new MutableLong(minRowId); + + // Work in modest block sizes and fetch as a list so we don't keep the ResultSet open, which could lock the tables + List materials = selector.getArrayList(Material.class); + materials.forEach(m -> { + ExpMaterialImpl impl = new ExpMaterialImpl(m); + impl.index(queue, null /* null tableInfo since samples may belong to multiple containers*/); + maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), impl.getRowId())); }); + + if (materials.size() == SearchService.INDEXING_LIMIT) + { + // Requeue for the next batch. This avoids overwhelming the indexer's queue with documents + queue.addRunnable((q) -> indexSampleTypeMaterials(sampleType, q, maxRowIdProcessed.longValue())); + } } From 337efd3091ad853dcf544763efc31b38273123eb Mon Sep 17 00:00:00 2001 From: labkey-jeckels Date: Sat, 23 May 2026 14:27:38 -0700 Subject: [PATCH 2/4] Be consistent --- .../src/org/labkey/experiment/api/ExperimentServiceImpl.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/experiment/src/org/labkey/experiment/api/ExperimentServiceImpl.java b/experiment/src/org/labkey/experiment/api/ExperimentServiceImpl.java index b205a65989d..7186bda9205 100644 --- a/experiment/src/org/labkey/experiment/api/ExperimentServiceImpl.java +++ b/experiment/src/org/labkey/experiment/api/ExperimentServiceImpl.java @@ -611,7 +611,10 @@ public List getExpBatches(@NotNull Container container, if (modifiedSince != null) sql.append(" AND E.Modified > ?").add(modifiedSince); sql.append(" ORDER BY E.RowId"); - sql = getSchema().getSqlDialect().limitRows(sql, limit); + if (limit > 0) + { + sql = getSchema().getSqlDialect().limitRows(sql, limit); + } return ExpExperimentImpl.fromExperiments(new SqlSelector(getSchema(), sql).setJdbcCaching(false).getArray(Experiment.class)); } From fe6ca3d7dd1c20223e1f76150b85d924d62f6817 Mon Sep 17 00:00:00 2001 From: labkey-jeckels Date: Tue, 26 May 2026 17:47:39 -0700 Subject: [PATCH 3/4] Centralize batching for indexing more --- .../org/labkey/api/search/SearchService.java | 106 ++++++++++++++---- assay/src/org/labkey/assay/AssayManager.java | 25 ++--- .../experiment/api/ExperimentServiceImpl.java | 30 ++--- .../experiment/api/SampleTypeServiceImpl.java | 13 +-- 4 files changed, 108 insertions(+), 66 deletions(-) diff --git a/api/src/org/labkey/api/search/SearchService.java b/api/src/org/labkey/api/search/SearchService.java index d1f6ef82af3..3c4c1485b70 100644 --- a/api/src/org/labkey/api/search/SearchService.java +++ b/api/src/org/labkey/api/search/SearchService.java @@ -21,12 +21,15 @@ import org.jetbrains.annotations.Nullable; import org.json.JSONObject; import org.labkey.api.data.ColumnInfo; +import org.labkey.api.data.CompareType; import org.labkey.api.data.Container; import org.labkey.api.data.ContainerManager; import org.labkey.api.data.DbSchema; import org.labkey.api.data.SQLFragment; import org.labkey.api.data.SimpleFilter; +import org.labkey.api.data.Sort; import org.labkey.api.data.TableInfo; +import org.labkey.api.data.TableSelector; import org.labkey.api.data.dialect.SqlDialect; import org.labkey.api.mbean.SearchMXBean; import org.labkey.api.query.ExprColumn; @@ -67,6 +70,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.ToLongFunction; public interface SearchService extends SearchMXBean { @@ -240,7 +244,7 @@ interface TaskIndexingQueue // // plug in interfaces // - + interface ResourceResolver { default WebdavResource resolve(@NotNull String resourceIdentifier) { return null; } @@ -289,7 +293,7 @@ public String getName() { return _name; } - + public String getDescription() { return _description; @@ -327,7 +331,7 @@ public boolean isShowInAdvancedSearch() // search // - + class SearchResult { public long totalHits; @@ -426,7 +430,7 @@ public String normalizeHref(Path contextPath, Container c) @Nullable SearchHit find(String docId) throws IOException; String escapeTerm(String term); - + List getSearchCategories(); // @@ -465,7 +469,7 @@ public String normalizeHref(Path contextPath, Container c) void waitForIdle() throws InterruptedException; - + /** default implementation saving lastIndexed */ void setLastIndexedForPath(Path path, long indexed, long modified); @@ -476,9 +480,9 @@ public String normalizeHref(Path contextPath, Container c) void maintenance(); // - // configuration, plugins + // configuration, plugins // - + void addSearchCategory(SearchCategory category); List getAllCategories(); List getCategories(String categories); @@ -497,17 +501,12 @@ interface DocumentProvider /** * Enumerate documents for full-text search indexing. Do NOT fetch an unbounded result set into memory. * - *

Pattern 1 — recursive requeue (preferred when the underlying table supports keyset pagination). - * Fetch at most {@link SearchService#INDEXING_LIMIT} rows, process them, then re-enqueue the next batch - * only if the batch was full. This keeps the ResultSet closed between batches and interleaves with other - * queue work. See {@code ExperimentServiceImpl.indexMaterials()} and - * {@code AssayManager.indexAssayRuns()} for examples.

- * - *

Pattern 2 — forEachBatch + per-batch runnable (simpler when using {@code TableSelector}). - * Stream rows in batches of {@link SearchService#INDEXING_LIMIT} and wrap each batch in a - * {@code queue.addRunnable()} so indexing is deferred. See - * {@code InventoryManager.indexLocations()} and {@code NotebookManager.indexNotebooks()} - * for examples.

+ *

Use the recursive-requeue pattern: create an {@link IndexBatchCursor} from {@code minRowId = 0}, + * call {@link IndexBatchCursor#createSelector} (for {@code TableSelector}-based callers) or build a + * raw SQL query with {@code RowId > minRowId ORDER BY RowId LIMIT} {@link SearchService#INDEXING_LIMIT}, + * process the batch with {@link IndexBatchCursor#forEach}, then call {@link IndexBatchCursor#wasFull()} + * and requeue only if it returns {@code true}. This keeps the ResultSet closed between batches and + * interleaves indexing with other queue work.

* * @param modifiedSince when null, do a full reindex; otherwise incremental (either modified > modifiedSince, or modified > lastIndexed) */ @@ -550,18 +549,81 @@ interface DocumentParser boolean detect(WebdavResource resource, String contentType, byte[] buf) throws IOException; void parse(InputStream stream, ContentHandler handler) throws IOException, SAXException; } - + + + /** + * Keyset-pagination cursor for the recursive-requeue batch-indexing pattern. + * Tracks position (max RowId seen) and batch fullness across one round of indexing. + * Intended for use in {@link DocumentProvider#enumerateDocuments} implementations. + */ + class IndexBatchCursor + { + private long _maxRowId; + private int _count; + + public IndexBatchCursor(long minRowId) + { + _maxRowId = minRowId; + } + + /** Records {@code rowId} as processed. Throws if results are not strictly ascending by RowId. */ + public void advance(long rowId) + { + if (rowId <= _maxRowId) + throw new IllegalStateException("Expected results strictly ordered by RowId but got " + rowId + " after " + _maxRowId); + _maxRowId = rowId; + _count++; + } + + /** Returns the maximum RowId seen so far, suitable for passing as {@code minRowId} to the next batch. */ + public long getMaxRowId() + { + return _maxRowId; + } + + /** + * Adds {@code RowId > current max} to {@code filter} and returns a {@link TableSelector} ordered by RowId + * and limited to {@link #INDEXING_LIMIT} rows. Callers should pass the result to {@link #forEach} and + * then call {@link #wasFull} to decide whether to requeue. + */ + public TableSelector createSelector(TableInfo tableInfo, SimpleFilter filter) + { + filter.addCondition(FieldKey.fromParts("RowId"), _maxRowId, CompareType.GT); + TableSelector selector = new TableSelector(tableInfo, filter, new Sort("RowId")); + selector.setMaxRows(INDEXING_LIMIT); + return selector; + } + + /** + * Iterates {@code batch}, invoking {@code action} on each item and then calling {@link #advance} + * with the item's RowId. {@code advance} is called unconditionally after each action so the + * count stays accurate even when the action swallows errors internally. + */ + public void forEach(List batch, ToLongFunction rowIdOf, Consumer action) + { + batch.forEach(item -> { + action.accept(item); + advance(rowIdOf.applyAsLong(item)); + }); + } + + /** Returns {@code true} if the batch was full, meaning more rows may remain and a requeue is needed. */ + public boolean wasFull() + { + return _count == INDEXING_LIMIT; + } + } // an interface that enumerates documents in a container (not recursive) void addDocumentProvider(DocumentProvider provider); void addDocumentParser(DocumentParser parser); - + // // helpers // - + /** * filter for documents modified since the provided date @@ -602,7 +664,7 @@ public LastIndexedClause(TableInfo info, java.util.Date modifiedSince, String mo // Incremental if modifiedSince is set and is more recent than 1967-10-04 boolean incremental = modifiedSince != null && modifiedSince.compareTo(oldDate) > 0; - + // no filter if (!incremental) return; diff --git a/assay/src/org/labkey/assay/AssayManager.java b/assay/src/org/labkey/assay/AssayManager.java index 4199fb734cc..bd625971d01 100644 --- a/assay/src/org/labkey/assay/AssayManager.java +++ b/assay/src/org/labkey/assay/AssayManager.java @@ -16,7 +16,6 @@ package org.labkey.assay; -import org.apache.commons.lang3.mutable.MutableLong; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -721,14 +720,11 @@ private void indexAssayBatches(SearchService.TaskIndexingQueue queue, ExpProtoco List batches = ExperimentService.get().getExpBatches( queue.getContainer(), protocol, modifiedSince, minRowId, SearchService.INDEXING_LIMIT); - MutableLong maxRowIdProcessed = new MutableLong(minRowId); - batches.forEach(b -> { - indexAssayBatch(queue, b); - maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), b.getRowId())); - }); + SearchService.IndexBatchCursor tracker = new SearchService.IndexBatchCursor(minRowId); + tracker.forEach(batches, ExpExperiment::getRowId, b -> indexAssayBatch(queue, b)); - if (batches.size() == SearchService.INDEXING_LIMIT) - queue.addRunnable((q) -> indexAssayBatches(q, protocol, modifiedSince, maxRowIdProcessed.longValue())); + if (tracker.wasFull()) + queue.addRunnable((q) -> indexAssayBatches(q, protocol, modifiedSince, tracker.getMaxRowId())); } public void indexAssayRuns(SearchService.TaskIndexingQueue queue, @Nullable Date modifiedSince) @@ -748,14 +744,11 @@ private void indexAssayRuns(SearchService.TaskIndexingQueue queue, ExpProtocol p List runs = ExperimentService.get().getExpRuns(filterSQL, _ -> true, queue.getContainer(), SearchService.INDEXING_LIMIT); - MutableLong maxRowIdProcessed = new MutableLong(minRowId); - runs.forEach(r -> { - indexAssayRun(queue, r); - maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), r.getRowId())); - }); + SearchService.IndexBatchCursor tracker = new SearchService.IndexBatchCursor(minRowId); + tracker.forEach(runs, ExpRun::getRowId, r -> indexAssayRun(queue, r)); - if (runs.size() == SearchService.INDEXING_LIMIT) - queue.addRunnable((q) -> indexAssayRuns(q, protocol, modifiedSince, maxRowIdProcessed.longValue())); + if (tracker.wasFull()) + queue.addRunnable((q) -> indexAssayRuns(q, protocol, modifiedSince, tracker.getMaxRowId())); } @Override @@ -764,7 +757,7 @@ public void indexAssayRun(SearchService.TaskIndexingQueue queue, long expRunRowI ExpRun expRun = ExperimentService.get().getExpRun(expRunRowId); if (expRun == null) return; - + if (shouldIndexRun(expRun)) indexAssayRun(queue, ExperimentService.get().getExpRun(expRunRowId)); } diff --git a/experiment/src/org/labkey/experiment/api/ExperimentServiceImpl.java b/experiment/src/org/labkey/experiment/api/ExperimentServiceImpl.java index 7186bda9205..faa60ed1f61 100644 --- a/experiment/src/org/labkey/experiment/api/ExperimentServiceImpl.java +++ b/experiment/src/org/labkey/experiment/api/ExperimentServiceImpl.java @@ -26,7 +26,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Strings; import org.apache.commons.lang3.math.NumberUtils; -import org.apache.commons.lang3.mutable.MutableLong; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.fhcrc.cpas.exp.xml.SimpleTypeNames; @@ -1096,21 +1096,17 @@ private void indexMaterials(final @NotNull SearchService.TaskIndexingQueue queue sql = getSchema().getSqlDialect().limitRows(sql, SearchService.INDEXING_LIMIT); SqlSelector selector = new SqlSelector(getSchema(), sql); selector.setJdbcCaching(false); - MutableLong maxRowIdProcessed = new MutableLong(minRowId); + SearchService.IndexBatchCursor tracker = new SearchService.IndexBatchCursor(minRowId); // Work in modest block sizes and fetch as a list so we don't keep the ResultSet open, which could lock the tables - List materials = selector.getArrayList(Material.class); - materials.forEach(m -> { + tracker.forEach(selector.getArrayList(Material.class), Material::getRowId, m -> { ExpMaterialImpl expMaterial = new ExpMaterialImpl(m); expMaterial.index(queue, null); - maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), expMaterial.getRowId())); }); - if (materials.size() == SearchService.INDEXING_LIMIT) - { + if (tracker.wasFull()) // Requeue for the next batch. This avoids overwhelming the indexer's queue with documents - queue.addRunnable((q) -> indexMaterials(q, modifiedSince, maxRowIdProcessed.longValue())); - } + queue.addRunnable((q) -> indexMaterials(q, modifiedSince, tracker.getMaxRowId())); } public void indexData(final @NotNull SearchService.TaskIndexingQueue queue, final Date modifiedSince, long minRowId) @@ -1134,21 +1130,17 @@ public void indexData(final @NotNull SearchService.TaskIndexingQueue queue, fina sql = getSchema().getSqlDialect().limitRows(sql, SearchService.INDEXING_LIMIT); SqlSelector selector = new SqlSelector(getSchema(), sql); selector.setJdbcCaching(false); - MutableLong maxRowIdProcessed = new MutableLong(minRowId); + SearchService.IndexBatchCursor tracker = new SearchService.IndexBatchCursor(minRowId); // Work in modest block sizes and fetch as a list so we don't keep the ResultSet open, which could lock the tables - List data = selector.getArrayList(Data.class); - data.forEach(d -> { + tracker.forEach(selector.getArrayList(Data.class), Data::getRowId, d -> { ExpDataImpl expData = new ExpDataImpl(d); expData.index(queue, null); - maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), expData.getRowId())); }); - if (data.size() == SearchService.INDEXING_LIMIT) - { + if (tracker.wasFull()) // Requeue for the next batch. This avoids overwhelming the indexer's queue with documents - queue.addRunnable((q) -> indexData(q, modifiedSince, maxRowIdProcessed.longValue())); - } + queue.addRunnable((q) -> indexData(q, modifiedSince, tracker.getMaxRowId())); } public List getIndexableDataClasses(Container container, @Nullable Date modifiedSince) @@ -7567,13 +7559,13 @@ else if (_aliquotRootCache.containsKey(parent.getLSID())) _aliquotRootCache.put(outputAliquot.getLSID(), rootMaterialRowId); // add self's root to cache sql.addAll(rec._protApp.getRowId(), rec._protApp._object.getRunId(), rootMaterialRowId, parent.getLSID(), outputAliquot.getRowId()); - + new SqlExecutor(tableInfo.getSchema()).execute(sql); } } } } - + private void saveExpMaterialOutputs(List protAppRecords) { for (ProtocolAppRecord rec : protAppRecords) diff --git a/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java b/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java index a692f2f9091..5a7519ee49f 100644 --- a/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java +++ b/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java @@ -17,7 +17,6 @@ package org.labkey.experiment.api; import org.apache.commons.collections4.ListUtils; -import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Strings; import org.apache.commons.math3.util.Precision; @@ -367,21 +366,17 @@ private void indexSampleTypeMaterials(ExpSampleType sampleType, SearchService.Ta sql = getExpSchema().getSqlDialect().limitRows(sql, SearchService.INDEXING_LIMIT); SqlSelector selector = new SqlSelector(getExpSchema().getScope(), sql); selector.setJdbcCaching(false); - MutableLong maxRowIdProcessed = new MutableLong(minRowId); + SearchService.IndexBatchCursor tracker = new SearchService.IndexBatchCursor(minRowId); // Work in modest block sizes and fetch as a list so we don't keep the ResultSet open, which could lock the tables - List materials = selector.getArrayList(Material.class); - materials.forEach(m -> { + tracker.forEach(selector.getArrayList(Material.class), Material::getRowId, m -> { ExpMaterialImpl impl = new ExpMaterialImpl(m); impl.index(queue, null /* null tableInfo since samples may belong to multiple containers*/); - maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), impl.getRowId())); }); - if (materials.size() == SearchService.INDEXING_LIMIT) - { + if (tracker.wasFull()) // Requeue for the next batch. This avoids overwhelming the indexer's queue with documents - queue.addRunnable((q) -> indexSampleTypeMaterials(sampleType, q, maxRowIdProcessed.longValue())); - } + queue.addRunnable((q) -> indexSampleTypeMaterials(sampleType, q, tracker.getMaxRowId())); } From 785520ad80429284ee1ffe46fa5812472dfc023a Mon Sep 17 00:00:00 2001 From: labkey-jeckels Date: Tue, 26 May 2026 21:15:47 -0700 Subject: [PATCH 4/4] Suggested updates --- api/src/org/labkey/api/search/SearchService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/src/org/labkey/api/search/SearchService.java b/api/src/org/labkey/api/search/SearchService.java index 3c4c1485b70..fc9d4c0767e 100644 --- a/api/src/org/labkey/api/search/SearchService.java +++ b/api/src/org/labkey/api/search/SearchService.java @@ -153,7 +153,7 @@ enum PRIORITY /** Intended for content that needed indexing or reindexing because it was modified or created */ modified, /** Highest priority. Used for removing documents from the index, which is generally faster than adding */ - delete; + delete } @@ -602,8 +602,8 @@ public TableSelector createSelector(TableInfo tableInfo, SimpleFilter filter) public void forEach(List batch, ToLongFunction rowIdOf, Consumer action) { batch.forEach(item -> { - action.accept(item); advance(rowIdOf.applyAsLong(item)); + action.accept(item); }); }