diff --git a/api/src/org/labkey/api/exp/Lsid.java b/api/src/org/labkey/api/exp/Lsid.java index 0cfa516eba4..2ddfee94a31 100644 --- a/api/src/org/labkey/api/exp/Lsid.java +++ b/api/src/org/labkey/api/exp/Lsid.java @@ -308,9 +308,9 @@ static public String namespaceLikeString(String namespace) return "urn:lsid:%:" + namespace + ".%:%"; } - static public String namespaceFilter(String columnName, String namespace) + static public SQLFragment namespaceFilter(Enum column, String namespace) { - return columnName + " LIKE '" + namespaceLikeString(namespace) + "'"; + return new SQLFragment().appendIdentifier(column.name()).append(" LIKE ?").add(namespaceLikeString(namespace)); } /** diff --git a/api/src/org/labkey/api/exp/api/ExperimentService.java b/api/src/org/labkey/api/exp/api/ExperimentService.java index ce8efc452e8..100df3d2af9 100644 --- a/api/src/org/labkey/api/exp/api/ExperimentService.java +++ b/api/src/org/labkey/api/exp/api/ExperimentService.java @@ -188,8 +188,23 @@ enum DataTypeForExclusion List getExpRuns(Container container, @Nullable ExpProtocol parentProtocol, @Nullable ExpProtocol childProtocol); List getExpRuns(Container container, @Nullable ExpProtocol parentProtocol, @Nullable ExpProtocol childProtocol, @NotNull Predicate filterFn); - - List getExpRuns(@Nullable SQLFragment filterSQL, @NotNull Predicate filterFn, @NotNull Container container); + + /** + * @param filterSQL optional additional WHERE predicates; callers doing keyset pagination should include + * {@code ER.RowId > minRowId} here + * @param limit max rows to return; pass {@code Table.ALL_ROWS} (-1) for no limit + * @return up to {@code limit} ExpRuns in {@code container} matching {@code filterSQL}, ordered by RowId + */ + List getExpRuns(@Nullable SQLFragment filterSQL, @NotNull Predicate filterFn, @NotNull Container container, int limit); + + /** + * @param modifiedSince optional exclusive lower bound on Modified; pass {@code null} to return all batches + * @param minRowId keyset cursor — 
only batches with RowId > minRowId are returned; pass 0 for the first page + * @param limit max rows to return + * @return up to {@code limit} assay batches for {@code batchProtocol} in {@code container} with + * RowId > minRowId (and Modified > modifiedSince when non-null), ordered by RowId + */ + List getExpBatches(@NotNull Container container, @NotNull ExpProtocol batchProtocol, @Nullable Date modifiedSince, long minRowId, int limit); List getExpRunsForJobId(long jobId); diff --git a/api/src/org/labkey/api/search/SearchService.java b/api/src/org/labkey/api/search/SearchService.java index 48a2632c59a..3c4c1485b70 100644 --- a/api/src/org/labkey/api/search/SearchService.java +++ b/api/src/org/labkey/api/search/SearchService.java @@ -21,12 +21,15 @@ import org.jetbrains.annotations.Nullable; import org.json.JSONObject; import org.labkey.api.data.ColumnInfo; +import org.labkey.api.data.CompareType; import org.labkey.api.data.Container; import org.labkey.api.data.ContainerManager; import org.labkey.api.data.DbSchema; import org.labkey.api.data.SQLFragment; import org.labkey.api.data.SimpleFilter; +import org.labkey.api.data.Sort; import org.labkey.api.data.TableInfo; +import org.labkey.api.data.TableSelector; import org.labkey.api.data.dialect.SqlDialect; import org.labkey.api.mbean.SearchMXBean; import org.labkey.api.query.ExprColumn; @@ -67,6 +70,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.ToLongFunction; public interface SearchService extends SearchMXBean { @@ -75,6 +79,7 @@ public interface SearchService extends SearchMXBean Logger _log = LogHelper.getLogger(SearchService.class, "Full text search service"); long DEFAULT_FILE_SIZE_LIMIT = 100L; // 100 MB + int INDEXING_LIMIT = 1_000; /** * Returns the max file size indexed @@ -239,7 +244,7 @@ interface TaskIndexingQueue // // plug in interfaces // - + interface ResourceResolver { default WebdavResource 
resolve(@NotNull String resourceIdentifier) { return null; } @@ -288,7 +293,7 @@ public String getName() { return _name; } - + public String getDescription() { return _description; @@ -326,7 +331,7 @@ public boolean isShowInAdvancedSearch() // search // - + class SearchResult { public long totalHits; @@ -425,7 +430,7 @@ public String normalizeHref(Path contextPath, Container c) @Nullable SearchHit find(String docId) throws IOException; String escapeTerm(String term); - + List getSearchCategories(); // @@ -464,7 +469,7 @@ public String normalizeHref(Path contextPath, Container c) void waitForIdle() throws InterruptedException; - + /** default implementation saving lastIndexed */ void setLastIndexedForPath(Path path, long indexed, long modified); @@ -475,9 +480,9 @@ public String normalizeHref(Path contextPath, Container c) void maintenance(); // - // configuration, plugins + // configuration, plugins // - + void addSearchCategory(SearchCategory category); List getAllCategories(); List getCategories(String categories); @@ -494,12 +499,16 @@ public String normalizeHref(Path contextPath, Container c) interface DocumentProvider { /** - * Enumerate documents for full-text search. Unless it's known there will be a small number of documents - * added to the queue, add Runnable to the IndexTask that adds the Resources from the container to the queue. - * If there are potentially many documents for a container, add resources in batches of 1,000 or so to avoid - * a huge memory footprint. + * Enumerate documents for full-text search indexing. Do NOT fetch an unbounded result set into memory. + * + *

Use the recursive-requeue pattern: create an {@link IndexBatchCursor} from {@code minRowId = 0}, + * call {@link IndexBatchCursor#createSelector} (for {@code TableSelector}-based callers) or build a + * raw SQL query with {@code RowId > minRowId ORDER BY RowId LIMIT} {@link SearchService#INDEXING_LIMIT}, + * process the batch with {@link IndexBatchCursor#forEach}, then call {@link IndexBatchCursor#wasFull()} + * and requeue only if it returns {@code true}. This keeps the ResultSet closed between batches and + * interleaves indexing with other queue work.

* - * @param modifiedSince when null, do a full reindex; otherwise incremental (either modified > modifiedSince, or modified > lastIndexed) + * @param modifiedSince when null, do a full reindex; otherwise incremental (either modified > modifiedSince, or modified > lastIndexed) */ void enumerateDocuments(TaskIndexingQueue adder, @Nullable Date modifiedSince); @@ -540,18 +549,81 @@ interface DocumentParser boolean detect(WebdavResource resource, String contentType, byte[] buf) throws IOException; void parse(InputStream stream, ContentHandler handler) throws IOException, SAXException; } - + + + /** + * Keyset-pagination cursor for the recursive-requeue batch-indexing pattern. + * Tracks position (max RowId seen) and batch fullness across one round of indexing. + * Intended for use in {@link DocumentProvider#enumerateDocuments} implementations. + */ + class IndexBatchCursor + { + private long _maxRowId; + private int _count; + + public IndexBatchCursor(long minRowId) + { + _maxRowId = minRowId; + } + + /** Records {@code rowId} as processed. Throws if results are not strictly ascending by RowId. */ + public void advance(long rowId) + { + if (rowId <= _maxRowId) + throw new IllegalStateException("Expected results strictly ordered by RowId but got " + rowId + " after " + _maxRowId); + _maxRowId = rowId; + _count++; + } + + /** Returns the maximum RowId seen so far, suitable for passing as {@code minRowId} to the next batch. */ + public long getMaxRowId() + { + return _maxRowId; + } + + /** + * Adds {@code RowId > current max} to {@code filter} and returns a {@link TableSelector} ordered by RowId + * and limited to {@link #INDEXING_LIMIT} rows. Callers should pass the result to {@link #forEach} and + * then call {@link #wasFull} to decide whether to requeue. 
+ */ + public TableSelector createSelector(TableInfo tableInfo, SimpleFilter filter) + { + filter.addCondition(FieldKey.fromParts("RowId"), _maxRowId, CompareType.GT); + TableSelector selector = new TableSelector(tableInfo, filter, new Sort("RowId")); + selector.setMaxRows(INDEXING_LIMIT); + return selector; + } + + /** + * Iterates {@code batch}, invoking {@code action} on each item and then calling {@link #advance} + * with the item's RowId. {@code advance} is called unconditionally after each action so the + * count stays accurate even when the action swallows errors internally. + */ + public void forEach(List batch, ToLongFunction rowIdOf, Consumer action) + { + batch.forEach(item -> { + action.accept(item); + advance(rowIdOf.applyAsLong(item)); + }); + } + + /** Returns {@code true} if the batch was full, meaning more rows may remain and a requeue is needed. */ + public boolean wasFull() + { + return _count == INDEXING_LIMIT; + } + } // an interface that enumerates documents in a container (not recursive) void addDocumentProvider(DocumentProvider provider); void addDocumentParser(DocumentParser parser); - + // // helpers // - + /** * filter for documents modified since the provided date @@ -592,7 +664,7 @@ public LastIndexedClause(TableInfo info, java.util.Date modifiedSince, String mo // Incremental if modifiedSince is set and is more recent than 1967-10-04 boolean incremental = modifiedSince != null && modifiedSince.compareTo(oldDate) > 0; - + // no filter if (!incremental) return; diff --git a/assay/src/org/labkey/assay/AssayManager.java b/assay/src/org/labkey/assay/AssayManager.java index 871591e59cd..bd625971d01 100644 --- a/assay/src/org/labkey/assay/AssayManager.java +++ b/assay/src/org/labkey/assay/AssayManager.java @@ -710,33 +710,45 @@ public void indexAssayBatches(SearchService.TaskIndexingQueue queue, @Nullable D for (ExpProtocol protocol : getAssayProtocols(queue.getContainer())) { if (shouldIndexProtocolBatches(protocol)) - 
indexAssayBatches(queue, protocol, modifiedSince); + queue.addRunnable((q) -> indexAssayBatches(q, protocol, modifiedSince, 0)); } } - private void indexAssayBatches(SearchService.TaskIndexingQueue queue, ExpProtocol protocol, @Nullable Date modifiedSince) + private void indexAssayBatches(SearchService.TaskIndexingQueue queue, ExpProtocol protocol, + @Nullable Date modifiedSince, long minRowId) { - if (shouldIndexProtocolBatches(protocol)) - { - for (ExpExperiment batch : protocol.getBatches(queue.getContainer())) - { - if (modifiedSince == null || modifiedSince.before(batch.getModified())) - indexAssayBatch(queue, batch); - } - } + List batches = ExperimentService.get().getExpBatches( + queue.getContainer(), protocol, modifiedSince, minRowId, SearchService.INDEXING_LIMIT); + + SearchService.IndexBatchCursor tracker = new SearchService.IndexBatchCursor(minRowId); + tracker.forEach(batches, ExpExperiment::getRowId, b -> indexAssayBatch(queue, b)); + + if (tracker.wasFull()) + queue.addRunnable((q) -> indexAssayBatches(q, protocol, modifiedSince, tracker.getMaxRowId())); } public void indexAssayRuns(SearchService.TaskIndexingQueue queue, @Nullable Date modifiedSince) { for (ExpProtocol protocol : getAssayProtocols(queue.getContainer())) - indexAssayRuns(queue, protocol, modifiedSince); + queue.addRunnable((q) -> indexAssayRuns(q, protocol, modifiedSince, 0)); } - private void indexAssayRuns(SearchService.TaskIndexingQueue queue, ExpProtocol protocol, @Nullable Date modifiedSince) + private void indexAssayRuns(SearchService.TaskIndexingQueue queue, ExpProtocol protocol, + @Nullable Date modifiedSince, long minRowId) { - ExperimentService.get().getExpRuns(queue.getContainer(), protocol, null, run -> - modifiedSince == null || modifiedSince.before(run.getModified()) - ).forEach(r -> indexAssayRun(queue, r)); + SQLFragment filterSQL = new SQLFragment("ER.ProtocolLSID = ? 
AND ER.RowId > ?") + .add(protocol.getLSID()) + .add(minRowId); + if (modifiedSince != null) + filterSQL.append(" AND ER.Modified > ?").add(modifiedSince); + + List runs = ExperimentService.get().getExpRuns(filterSQL, _ -> true, queue.getContainer(), SearchService.INDEXING_LIMIT); + + SearchService.IndexBatchCursor tracker = new SearchService.IndexBatchCursor(minRowId); + tracker.forEach(runs, ExpRun::getRowId, r -> indexAssayRun(queue, r)); + + if (tracker.wasFull()) + queue.addRunnable((q) -> indexAssayRuns(q, protocol, modifiedSince, tracker.getMaxRowId())); } @Override @@ -745,7 +757,7 @@ public void indexAssayRun(SearchService.TaskIndexingQueue queue, long expRunRowI ExpRun expRun = ExperimentService.get().getExpRun(expRunRowId); if (expRun == null) return; - + if (shouldIndexRun(expRun)) indexAssayRun(queue, ExperimentService.get().getExpRun(expRunRowId)); } diff --git a/experiment/src/org/labkey/experiment/api/ExperimentServiceImpl.java b/experiment/src/org/labkey/experiment/api/ExperimentServiceImpl.java index bf51266dc3f..faa60ed1f61 100644 --- a/experiment/src/org/labkey/experiment/api/ExperimentServiceImpl.java +++ b/experiment/src/org/labkey/experiment/api/ExperimentServiceImpl.java @@ -26,7 +26,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Strings; import org.apache.commons.lang3.math.NumberUtils; -import org.apache.commons.lang3.mutable.MutableLong; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.fhcrc.cpas.exp.xml.SimpleTypeNames; @@ -564,7 +564,7 @@ public List getExpRuns(Container container, @Nullable ExpProtocol pa sql.add(childProtocol.getLSID()); } - return getExpRuns(sql, filterFn, container); + return getExpRuns(sql, filterFn, container, Table.ALL_ROWS); } @Override @@ -582,17 +582,17 @@ public boolean hasExpRuns(Container container, @NotNull Predicate filter } @Override - public List getExpRuns(@Nullable SQLFragment filterSQL, @NotNull Predicate filterFn, 
@NotNull Container container) + public List getExpRuns(@Nullable SQLFragment filterSQL, @NotNull Predicate filterFn, @NotNull Container container, int limit) { - SQLFragment sql = new SQLFragment(" SELECT ER.* " - + " FROM exp.ExperimentRun ER " - + " WHERE ER.Container = ? "); + SQLFragment sql = new SQLFragment("SELECT ER.* FROM exp.ExperimentRun ER WHERE ER.Container = ?"); sql.add(container.getId()); - if (null != filterSQL && !filterSQL.isEmpty()) - sql.append(" AND " ).append(filterSQL); - - sql.append(" ORDER BY ER.RowId "); + sql.append(" AND ").append(filterSQL); + sql.append(" ORDER BY ER.RowId"); + if (limit > 0) + { + sql = getSchema().getSqlDialect().limitRows(sql, limit); + } try (Stream runs = new SqlSelector(getSchema(), sql).setJdbcCaching(false).uncachedStream(ExperimentRun.class)) { @@ -600,6 +600,25 @@ public List getExpRuns(@Nullable SQLFragment filterSQL, @NotNull Pre } } + @Override + public List getExpBatches(@NotNull Container container, @NotNull ExpProtocol batchProtocol, + @Nullable Date modifiedSince, long minRowId, int limit) + { + SQLFragment sql = new SQLFragment("SELECT E.* FROM ").append(getTinfoExperiment(), "E") + .append(" WHERE E.Container = ?").add(container.getId()) + .append(" AND E.BatchProtocolId = ?").add(batchProtocol.getRowId()) + .append(" AND E.RowId > ?").add(minRowId); + if (modifiedSince != null) + sql.append(" AND E.Modified > ?").add(modifiedSince); + sql.append(" ORDER BY E.RowId"); + if (limit > 0) + { + sql = getSchema().getSqlDialect().limitRows(sql, limit); + } + + return ExpExperimentImpl.fromExperiments(new SqlSelector(getSchema(), sql).setJdbcCaching(false).getArray(Experiment.class)); + } + @Override public List getExpRunsForJobId(long jobId) { @@ -745,7 +764,7 @@ public List getExpDatas(Container container, @Nullable DataType typ { SimpleFilter filter = SimpleFilter.createContainerFilter(container); if (type != null) - filter.addWhereClause(Lsid.namespaceFilter(ExpDataTable.Column.LSID.name(), 
type.getNamespacePrefix()), null); + filter.addWhereClause(Lsid.namespaceFilter(ExpDataTable.Column.LSID, type.getNamespacePrefix())); if (name != null) filter.addCondition(FieldKey.fromParts(ExpDataTable.Column.Name.name()), name); @@ -756,7 +775,7 @@ public List getOutputDatas(long runRowId, @Nullable DataType type) { SimpleFilter filter = new SimpleFilter(FieldKey.fromParts("RunId"), runRowId); if (type != null) - filter.addWhereClause(Lsid.namespaceFilter(ExpDataTable.Column.LSID.name(), type.getNamespacePrefix()), null); + filter.addWhereClause(Lsid.namespaceFilter(ExpDataTable.Column.LSID, type.getNamespacePrefix())); return getExpDatas(filter); } @@ -1018,8 +1037,6 @@ public List getExpMaterialsByObjectId(ContainerFilter container return result; } - private static final int INDEXING_LIMIT = 1_000; - @Override public void enumerateDocuments(SearchService.TaskIndexingQueue queue, final Date modifiedSince) { @@ -1076,24 +1093,20 @@ private void indexMaterials(final @NotNull SearchService.TaskIndexingQueue queue if (!modifiedSQL.isEmpty()) sql.append(" AND ").append(modifiedSQL); sql.append(" ORDER BY RowId"); - sql = getSchema().getSqlDialect().limitRows(sql, INDEXING_LIMIT); + sql = getSchema().getSqlDialect().limitRows(sql, SearchService.INDEXING_LIMIT); SqlSelector selector = new SqlSelector(getSchema(), sql); selector.setJdbcCaching(false); - MutableLong maxRowIdProcessed = new MutableLong(minRowId); + SearchService.IndexBatchCursor tracker = new SearchService.IndexBatchCursor(minRowId); // Work in modest block sizes and fetch as a list so we don't keep the ResultSet open, which could lock the tables - List materials = selector.getArrayList(Material.class); - materials.forEach(m -> { + tracker.forEach(selector.getArrayList(Material.class), Material::getRowId, m -> { ExpMaterialImpl expMaterial = new ExpMaterialImpl(m); expMaterial.index(queue, null); - maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), expMaterial.getRowId())); }); - if 
(materials.size() == INDEXING_LIMIT) - { + if (tracker.wasFull()) // Requeue for the next batch. This avoids overwhelming the indexer's queue with documents - queue.addRunnable((q) -> indexMaterials(q, modifiedSince, maxRowIdProcessed.longValue())); - } + queue.addRunnable((q) -> indexMaterials(q, modifiedSince, tracker.getMaxRowId())); } public void indexData(final @NotNull SearchService.TaskIndexingQueue queue, final Date modifiedSince, long minRowId) @@ -1114,24 +1127,20 @@ public void indexData(final @NotNull SearchService.TaskIndexingQueue queue, fina sql.append(" AND ").append(modifiedSQL); sql.append(" ORDER BY RowId"); - sql = getSchema().getSqlDialect().limitRows(sql, INDEXING_LIMIT); + sql = getSchema().getSqlDialect().limitRows(sql, SearchService.INDEXING_LIMIT); SqlSelector selector = new SqlSelector(getSchema(), sql); selector.setJdbcCaching(false); - MutableLong maxRowIdProcessed = new MutableLong(minRowId); + SearchService.IndexBatchCursor tracker = new SearchService.IndexBatchCursor(minRowId); // Work in modest block sizes and fetch as a list so we don't keep the ResultSet open, which could lock the tables - List data = selector.getArrayList(Data.class); - data.forEach(d -> { + tracker.forEach(selector.getArrayList(Data.class), Data::getRowId, d -> { ExpDataImpl expData = new ExpDataImpl(d); expData.index(queue, null); - maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), expData.getRowId())); }); - if (data.size() == INDEXING_LIMIT) - { + if (tracker.wasFull()) // Requeue for the next batch. 
This avoids overwhelming the indexer's queue with documents - queue.addRunnable((q) -> indexData(q, modifiedSince, maxRowIdProcessed.longValue())); - } + queue.addRunnable((q) -> indexData(q, modifiedSince, tracker.getMaxRowId())); } public List getIndexableDataClasses(Container container, @Nullable Date modifiedSince) @@ -7550,13 +7559,13 @@ else if (_aliquotRootCache.containsKey(parent.getLSID())) _aliquotRootCache.put(outputAliquot.getLSID(), rootMaterialRowId); // add self's root to cache sql.addAll(rec._protApp.getRowId(), rec._protApp._object.getRunId(), rootMaterialRowId, parent.getLSID(), outputAliquot.getRowId()); - + new SqlExecutor(tableInfo.getSchema()).execute(sql); } } } } - + private void saveExpMaterialOutputs(List protAppRecords) { for (ProtocolAppRecord rec : protAppRecords) diff --git a/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java b/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java index 8cbd4c9ad87..5a7519ee49f 100644 --- a/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java +++ b/experiment/src/org/labkey/experiment/api/SampleTypeServiceImpl.java @@ -346,11 +346,11 @@ public void indexSampleType(ExpSampleType sampleType, SearchService.TaskIndexing impl.index(q, null); } - indexSampleTypeMaterials(sampleType, q); + indexSampleTypeMaterials(sampleType, q, 0); }); } - private void indexSampleTypeMaterials(ExpSampleType sampleType, SearchService.TaskIndexingQueue queue) + private void indexSampleTypeMaterials(ExpSampleType sampleType, SearchService.TaskIndexingQueue queue, long minRowId) { // Index all ExpMaterial that have never been indexed OR where either the ExpSampleType definition or ExpMaterial itself has changed since last indexed SQLFragment sql = new SQLFragment("SELECT m.* FROM ") @@ -359,17 +359,24 @@ private void indexSampleTypeMaterials(ExpSampleType sampleType, SearchService.Ta .append(ExperimentServiceImpl.get().getTinfoMaterialIndexed(), "mi") .append(" ON m.RowId = 
mi.MaterialId WHERE m.LSID NOT LIKE ").appendValue("%:" + StudyService.SPECIMEN_NAMESPACE_PREFIX + "%", getExpSchema().getSqlDialect()) .append(" AND m.cpasType = ?").add(sampleType.getLSID()) + .append(" AND m.RowId > ?").add(minRowId) .append(" AND (mi.lastIndexed IS NULL OR mi.lastIndexed < ? OR (m.modified IS NOT NULL AND mi.lastIndexed < m.modified))") .append(" ORDER BY m.RowId") // Issue 51263: order by RowId to reduce deadlock .add(sampleType.getModified()); - - new SqlSelector(getExpSchema().getScope(), sql).forEachBatch(Material.class, 1000, batch -> { - for (Material m : batch) - { - ExpMaterialImpl impl = new ExpMaterialImpl(m); - impl.index(queue, null /* null tableInfo since samples may belong to multiple containers*/); - } + sql = getExpSchema().getSqlDialect().limitRows(sql, SearchService.INDEXING_LIMIT); + SqlSelector selector = new SqlSelector(getExpSchema().getScope(), sql); + selector.setJdbcCaching(false); + SearchService.IndexBatchCursor tracker = new SearchService.IndexBatchCursor(minRowId); + + // Work in modest block sizes and fetch as a list so we don't keep the ResultSet open, which could lock the tables + tracker.forEach(selector.getArrayList(Material.class), Material::getRowId, m -> { + ExpMaterialImpl impl = new ExpMaterialImpl(m); + impl.index(queue, null /* null tableInfo since samples may belong to multiple containers*/); }); + + if (tracker.wasFull()) + // Requeue for the next batch. This avoids overwhelming the indexer's queue with documents + queue.addRunnable((q) -> indexSampleTypeMaterials(sampleType, q, tracker.getMaxRowId())); }