diff --git a/docs/examples/readme.md b/docs/examples/readme.md index a8c25161..ffb4dca0 100644 --- a/docs/examples/readme.md +++ b/docs/examples/readme.md @@ -1,6 +1,6 @@ # Example configuration and sample outputs -Broader manuals (MS-GF+ CLI, BuildSA, MzID→TSV, changelog, …) are Markdown pages one level up: [`../readme.md`](../readme.md). +Broader manuals (MS-GF+ CLI, BuildSA, changelog, …) are Markdown pages one level up: [`../readme.md`](../readme.md). This directory holds **small text examples** shipped with the repository: @@ -12,7 +12,6 @@ This directory holds **small text examples** shipped with the repository: | `protocols.txt` | Example custom protocols for `params/protocols.txt`. | | `Mods.txt` | Example modification file. | | `pxd001819_example.pin` | **Sample Percolator `.pin` output** from a PXD001819 (yeast + UPS1 on LTQ Orbitrap Velos) search. Header + 20 target PSMs + 10 decoy PSMs, chosen for peptide-sequence diversity so every column is represented. Use this to inspect the `.pin` schema without running a full search. Full column reference in [`../output.md`](../output.md). | -| `test.mzid` | Tiny mzIdentML sample used in static documentation. | | `test.tsv`, `test_Unrolled.tsv` | Example TSV exports for documentation. | **Not stored here:** tutorial spreadsheets, plots, or bundled FASTA/index files. Those are either removed as non-essential bloat or live under `src/test/resources/` for automated tests. diff --git a/docs/examples/test.mzid b/docs/examples/test.mzid deleted file mode 100644 index bf654b45..00000000 --- a/docs/examples/test.mzid +++ /dev/null @@ -1,153 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - IGAYLFVDMAHVAGLIAAGVYPNPVPHAHVVTSTTHK - - - NLANPTSVILASIQMLEYLGMADK - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/pom.xml b/pom.xml index 1b04afc5..935a2624 100644 --- a/pom.xml +++ b/pom.xml @@ -106,17 +106,6 @@ - - uk.ac.ebi.jmzidml - jmzidentml - 1.2.11 - - - com.sun.xml.bind - jaxb-xjc - - - junit junit @@ -124,24 +113,13 @@ test jar - - javax.xml.bind - jaxb-api - 2.3.1 - - - org.glassfish.jaxb - jaxb-runtime - 2.3.2 - runtime - org.apache.commons commons-text 1.11.0 - + it.unimi.dsi fastutil @@ -175,12 +153,6 @@ The internal repository file:${project.basedir}/repo - - - - sammoa-group - https://nexus.nuiton.org/nexus/content/groups/sammoa-group/ - Center for Computational Mass Spectrometry, University of California, San Diego diff --git a/src/main/java/edu/ucsd/msjava/msdbsearch/CompactFastaSequence.java b/src/main/java/edu/ucsd/msjava/msdbsearch/CompactFastaSequence.java index 774d4f2c..886644b5 100644 --- a/src/main/java/edu/ucsd/msjava/msdbsearch/CompactFastaSequence.java +++ b/src/main/java/edu/ucsd/msjava/msdbsearch/CompactFastaSequence.java @@ -540,7 +540,9 @@ private FileSignature readSequence() { long lastModified = in.readLong(); sequence = new byte[size]; - in.read(sequence); + // readFully: plain in.read() may return short on large .cseq files, + // silently corrupting the in-memory sequence. + in.readFully(sequence); in.close(); return new FileSignature(formatId, id, lastModified); diff --git a/src/main/java/edu/ucsd/msjava/msdbsearch/CompactSuffixArray.java b/src/main/java/edu/ucsd/msjava/msdbsearch/CompactSuffixArray.java index f43e18b0..033b443b 100644 --- a/src/main/java/edu/ucsd/msjava/msdbsearch/CompactSuffixArray.java +++ b/src/main/java/edu/ucsd/msjava/msdbsearch/CompactSuffixArray.java @@ -3,14 +3,23 @@ import edu.ucsd.msjava.msutil.AminoAcid; import edu.ucsd.msjava.msutil.AminoAcidSet; import edu.ucsd.msjava.sequences.Constants; +import edu.ucsd.msjava.suffixarray.ByteSequence; import edu.ucsd.msjava.suffixarray.SuffixFactory; +import it.unimi.dsi.fastutil.ints.IntArrays; import java.io.*; +import java.nio.file.Files; import java.text.DateFormat; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Arrays; import java.util.Date; +import java.util.List; import java.util.Locale; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; /** * SuffixArray class for fast exact matching. @@ -266,58 +275,22 @@ private int checkID() { return 0; } + /** Sysprop overriding the number of threads used during the sort+LCP phase. */ + static final String SA_BUILD_THREADS_PROPERTY = "msgfplus.buildsa.threads"; + + /** Cap on default thread count: higher values give diminishing returns and thrash IO. */ + private static final int MAX_DEFAULT_SA_BUILD_THREADS = 8; + /** - * Helper method that creates the suffixFile. - * - * @param sequence the Adapter object that represents the database (text). - * @param indexFile newly created index file. - * @param nlcpFile newly created nlcp file. + * Build the suffix-array index files. Two-phase radix-then-sort: each suffix + * is hashed by its first {@link #BUCKET_SIZE} residues into a bucket, then + * sorted lexicographically from offset {@code BUCKET_SIZE} onward. The + * sort+LCP phase is parallelised across contiguous bucket-id ranges; the + * write step is single-threaded to preserve on-disk ordering. */ private void createSuffixArrayFiles(CompactFastaSequence sequence, File indexFile, File nlcpFile) { System.out.println("Creating the suffix array indexed file... Size: " + sequence.getSize()); - // helper local class - class Bucket { - - private int[] items; - private int size; - - /** - * Constructor. - */ - public Bucket() { - this.items = new int[10]; - this.size = 0; - } - - /** - * Add item to the bucket. - * @param item the item to add. - */ - public void add(int item) { - if (this.size >= items.length) { - // Double the capacity of this.items - this.items = Arrays.copyOf(this.items, this.size * 2); -// if(this.size > 100) -// System.out.println(item+":"+this.size); - } - this.items[this.size++] = item; - } - - /** - * Get a sorted version of this bucket. - * @return - */ - public SuffixFactory.Suffix[] getSortedSuffixes() { - SuffixFactory.Suffix[] sa = new SuffixFactory.Suffix[this.size]; - for (int i = 0; i < this.size; i++) { - sa[i] = factory.makeSuffix(this.items[i]); - } - Arrays.sort(sa); - return sa; - } - } - // the size of the alphabet to make the hashes int hashBase = sequence.getAlphabetSize(); System.out.println("AlphabetSize: " + sequence.getAlphabetSize()); @@ -354,7 +327,6 @@ public SuffixFactory.Suffix[] getSortedSuffixes() { System.out.printf("Suffix creation: %.2f%% complete.\n", j * 100.0 / numResiduesInSequence); } -// System.out.println(j); // quick wait to derive the next hash, since we are reading the sequence in order byte b = Constants.TERMINATOR; if (i < numResiduesInSequence) @@ -369,8 +341,6 @@ public SuffixFactory.Suffix[] getSortedSuffixes() { bucketSuffixes[currentHash].add(j); } - System.gc(); - try { DataOutputStream indexOut = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile))); DataOutputStream nlcpOut = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(nlcpFile))); @@ -378,47 +348,9 @@ public SuffixFactory.Suffix[] getSortedSuffixes() { indexOut.writeInt(sequence.getId()); nlcpOut.writeInt(numResiduesInSequence); nlcpOut.writeInt(sequence.getId()); - SuffixFactory.Suffix prevBucketSuffix = null; -// byte[] neighboringLcps = new byte[numResiduesInSequence]; // the computed neighboring lcps System.out.println("Sorting suffixes... Size: " + bucketSuffixes.length); - lastStatusTime = System.currentTimeMillis(); - - for (int i = 0; i < bucketSuffixes.length; i++) { - // print progress - if (i % 100000 == 0 && System.currentTimeMillis() - lastStatusTime > 2000) { - lastStatusTime = System.currentTimeMillis(); - System.out.printf("Sorting: %.2f%% complete.\n", i * 100.0 / bucketSuffixes.length); - } - - if (bucketSuffixes[i] != null) { - - SuffixFactory.Suffix[] sortedSuffixes = bucketSuffixes[i].getSortedSuffixes(); - - SuffixFactory.Suffix first = sortedSuffixes[0]; - byte lcp = 0; - if (prevBucketSuffix != null) { - lcp = first.getLCP(prevBucketSuffix); - } - // write information to file - indexOut.writeInt(first.getIndex()); - nlcpOut.writeByte(lcp); - SuffixFactory.Suffix prevSuffix = first; - - for (int j = 1; j < sortedSuffixes.length; j++) { - SuffixFactory.Suffix thisSuffix = sortedSuffixes[j]; - //store the information - indexOut.writeInt(thisSuffix.getIndex()); - lcp = thisSuffix.getLCP(prevSuffix, BUCKET_SIZE); - nlcpOut.writeByte(lcp); - prevSuffix = thisSuffix; - } - prevBucketSuffix = sortedSuffixes[0]; - bucketSuffixes[i] = null; // deallocate the memory - } - } - - bucketSuffixes = null; + sortAndWriteBuckets(sequence, bucketSuffixes, indexFile, indexOut, nlcpOut); long lastModified = sequence.getLastModified(); indexOut.writeLong(lastModified); @@ -439,6 +371,363 @@ public SuffixFactory.Suffix[] getSortedSuffixes() { return; } + /** + * Sort + LCP compute phase. Parallelises across contiguous bucket-id + * ranges; each worker streams its sorted indices + intra-range LCPs into + * per-range temp files. The merge step fixes up the cross-range boundary + * LCP byte and streams the temp files into the final output sequentially + * (writing single-threaded preserves on-disk ordering). Temp files are + * deleted in the {@code finally} block, with {@link File#deleteOnExit} as + * a fallback for hard crashes. + */ + private static void sortAndWriteBuckets(CompactFastaSequence sequence, + Bucket[] bucketSuffixes, + File indexFile, + DataOutputStream indexOut, + DataOutputStream nlcpOut) throws IOException { + int numThreads = resolveSortThreads(); + int[][] ranges = partitionBucketIds(bucketSuffixes, numThreads); + + if (ranges.length == 1) { + writeBucketsDirect(sequence, bucketSuffixes, ranges[0][0], ranges[0][1], indexOut, nlcpOut); + return; + } + + File parentDir = indexFile.getAbsoluteFile().getParentFile(); + if (parentDir == null) parentDir = new File("."); + String tempBasename = indexFile.getName() + ".buildsa-tmp." + ProcessHandle.current().pid() + "." + System.nanoTime(); + + List rangeMetadatas = new ArrayList<>(ranges.length); + try { + ExecutorService pool = Executors.newFixedThreadPool(ranges.length, r -> { + Thread t = new Thread(r, "buildsa-sort"); + t.setDaemon(true); + return t; + }); + try { + List> futures = new ArrayList<>(ranges.length); + for (int idx = 0; idx < ranges.length; idx++) { + final int from = ranges[idx][0]; + final int to = ranges[idx][1]; + final File tempIndices = new File(parentDir, tempBasename + ".indices." + idx); + final File tempLcps = new File(parentDir, tempBasename + ".lcps." + idx); + tempIndices.deleteOnExit(); + tempLcps.deleteOnExit(); + futures.add(pool.submit(() -> processBucketRangeToTempFiles( + sequence, bucketSuffixes, from, to, tempIndices, tempLcps))); + } + for (Future f : futures) { + rangeMetadatas.add(f.get()); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while building suffix array", e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof RuntimeException) throw (RuntimeException) cause; + if (cause instanceof IOException) throw (IOException) cause; + throw new IOException("Suffix array sort worker failed", cause != null ? cause : e); + } finally { + pool.shutdown(); + } + + int prevRangeLastBucketFirst = -1; + for (RangeMetadata md : rangeMetadatas) { + if (md.numEntries == 0) continue; + mergeRangeIntoOutput(sequence, md, prevRangeLastBucketFirst, indexOut, nlcpOut); + prevRangeLastBucketFirst = md.lastBucketFirstSuffix; + } + } finally { + for (RangeMetadata md : rangeMetadatas) { + deleteQuietly(md.tempIndicesFile); + deleteQuietly(md.tempLcpsFile); + } + // Sweep debris from workers that died before returning a RangeMetadata. + File[] orphans = parentDir.listFiles((dir, name) -> name.startsWith(tempBasename)); + if (orphans != null) { + for (File f : orphans) deleteQuietly(f); + } + } + } + + private static void deleteQuietly(File f) { + if (f == null) return; + try { Files.deleteIfExists(f.toPath()); } catch (IOException ignored) { } + } + + /** + * Stream one range's temp files into the final output. The first LCP byte + * is rewritten against {@code prevRangeLastBucketFirst} to bridge the + * cross-range boundary; for the globally-first range + * {@code prevRangeLastBucketFirst} is -1 and the placeholder 0 written by + * the worker passes through. + */ + private static void mergeRangeIntoOutput(CompactFastaSequence sequence, + RangeMetadata md, + int prevRangeLastBucketFirst, + DataOutputStream indexOut, + DataOutputStream nlcpOut) throws IOException { + try (DataInputStream idxIn = new DataInputStream(new BufferedInputStream(new FileInputStream(md.tempIndicesFile))); + DataInputStream lcpIn = new DataInputStream(new BufferedInputStream(new FileInputStream(md.tempLcpsFile)))) { + int firstIndex = idxIn.readInt(); + byte firstLcp = lcpIn.readByte(); + if (prevRangeLastBucketFirst >= 0) { + firstLcp = computeLcpByte(sequence, firstIndex, prevRangeLastBucketFirst, 0); + } + indexOut.writeInt(firstIndex); + nlcpOut.writeByte(firstLcp); + + for (int i = 1; i < md.numEntries; i++) { + indexOut.writeInt(idxIn.readInt()); + nlcpOut.writeByte(lcpIn.readByte()); + } + } + } + + private static int resolveSortThreads() { + String configured = System.getProperty(SA_BUILD_THREADS_PROPERTY); + if (configured != null) { + try { + int n = Integer.parseInt(configured.trim()); + if (n > 0) return n; + } catch (NumberFormatException ignored) { } + } + int procs = Runtime.getRuntime().availableProcessors(); + return Math.max(1, Math.min(procs, MAX_DEFAULT_SA_BUILD_THREADS)); + } + + /** + * Split bucket ids into contiguous ranges balanced by total suffix count + * (so each worker has roughly equal sort+LCP work, not equal bucket count). + */ + private static int[][] partitionBucketIds(Bucket[] buckets, int numThreads) { + if (numThreads <= 1 || buckets.length == 0) { + return new int[][]{{0, buckets.length}}; + } + long totalSuffixes = 0L; + for (Bucket b : buckets) { + if (b != null) totalSuffixes += b.size; + } + if (totalSuffixes == 0L) { + return new int[][]{{0, buckets.length}}; + } + long perThread = (totalSuffixes + numThreads - 1) / numThreads; + + int[][] ranges = new int[numThreads][]; + int rangeStart = 0; + int rangeIdx = 0; + long running = 0L; + for (int i = 0; i < buckets.length; i++) { + Bucket b = buckets[i]; + if (b != null) running += b.size; + if (running >= perThread && rangeIdx < numThreads - 1) { + ranges[rangeIdx++] = new int[]{rangeStart, i + 1}; + rangeStart = i + 1; + running = 0L; + } + } + ranges[rangeIdx++] = new int[]{rangeStart, buckets.length}; + if (rangeIdx != numThreads) { + int[][] trimmed = new int[rangeIdx][]; + System.arraycopy(ranges, 0, trimmed, 0, rangeIdx); + ranges = trimmed; + } + return ranges; + } + + /** + * Sort each bucket in the range, compute intra-range LCPs, and stream the + * output into per-worker temp files. The first LCP byte is a placeholder + * (0) — the merge step rewrites it against the previous range's last + * bucket. Each bucket's storage is released as soon as it is sorted, so + * peak heap is bounded by the largest in-flight bucket per thread. + */ + private static RangeMetadata processBucketRangeToTempFiles(CompactFastaSequence sequence, + Bucket[] buckets, + int from, + int to, + File tempIndicesFile, + File tempLcpsFile) throws IOException { + long count = 0L; + for (int i = from; i < to; i++) { + if (buckets[i] != null) count += buckets[i].size; + } + if (count == 0L) { + return new RangeMetadata(null, null, 0, -1); + } + if (count > Integer.MAX_VALUE) { + throw new IllegalStateException("Suffix array bucket range exceeds Integer.MAX_VALUE entries"); + } + + int lastBucketFirstSuffix = -1; + int prevIntraBucketLast = -1; + int prevBucketFirst = -1; + int numEntries = 0; + boolean firstBucketSeen = false; + + try (DataOutputStream idxOut = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(tempIndicesFile))); + DataOutputStream lcpOut = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(tempLcpsFile)))) { + for (int bucketId = from; bucketId < to; bucketId++) { + Bucket bucket = buckets[bucketId]; + if (bucket == null) continue; + + int[] sorted = bucket.trimmedArray(); + buckets[bucketId] = null; + IntArrays.quickSort(sorted, (a, b) -> compareSuffixesFrom(sequence, a, b, BUCKET_SIZE)); + + int first = sorted[0]; + idxOut.writeInt(first); + byte lcp = firstBucketSeen ? computeLcpByte(sequence, first, prevBucketFirst, 0) : 0; + lcpOut.writeByte(lcp); + numEntries++; + firstBucketSeen = true; + prevIntraBucketLast = first; + + for (int j = 1; j < sorted.length; j++) { + int thisIndex = sorted[j]; + idxOut.writeInt(thisIndex); + lcpOut.writeByte(computeLcpByte(sequence, thisIndex, prevIntraBucketLast, BUCKET_SIZE)); + numEntries++; + prevIntraBucketLast = thisIndex; + } + + prevBucketFirst = first; + lastBucketFirstSuffix = first; + } + } + + return new RangeMetadata(tempIndicesFile, tempLcpsFile, numEntries, lastBucketFirstSuffix); + } + + /** + * Single-thread direct-write path: sort each bucket, compute LCPs, and + * write to disk in one pass. Used when {@link #SA_BUILD_THREADS_PROPERTY} + * resolves to 1. + */ + private static void writeBucketsDirect(CompactFastaSequence sequence, + Bucket[] buckets, + int from, + int to, + DataOutputStream indexOut, + DataOutputStream nlcpOut) throws IOException { + int prevBucketFirstIndex = -1; + long lastStatusTime = System.currentTimeMillis(); + for (int i = from; i < to; i++) { + if (i % 100000 == 0 && System.currentTimeMillis() - lastStatusTime > 2000) { + lastStatusTime = System.currentTimeMillis(); + System.out.printf("Sorting: %.2f%% complete.%n", (i - from) * 100.0 / (to - from)); + } + + Bucket bucket = buckets[i]; + if (bucket == null) continue; + + int[] sorted = bucket.trimmedArray(); + buckets[i] = null; + IntArrays.quickSort(sorted, (a, b) -> compareSuffixesFrom(sequence, a, b, BUCKET_SIZE)); + + int first = sorted[0]; + byte lcp = 0; + if (prevBucketFirstIndex >= 0) { + lcp = computeLcpByte(sequence, first, prevBucketFirstIndex, 0); + } + indexOut.writeInt(first); + nlcpOut.writeByte(lcp); + int prev = first; + + for (int j = 1; j < sorted.length; j++) { + int thisIndex = sorted[j]; + indexOut.writeInt(thisIndex); + lcp = computeLcpByte(sequence, thisIndex, prev, BUCKET_SIZE); + nlcpOut.writeByte(lcp); + prev = thisIndex; + } + prevBucketFirstIndex = first; + } + } + + /** Per-worker sort+LCP output handle. Indices/LCPs live on disk; this carries + * the small metadata the merge step needs. Empty ranges return {@code null} + * file paths. */ + static final class RangeMetadata { + final File tempIndicesFile; + final File tempLcpsFile; + final int numEntries; + final int lastBucketFirstSuffix; + + RangeMetadata(File tempIndicesFile, File tempLcpsFile, int numEntries, int lastBucketFirstSuffix) { + this.tempIndicesFile = tempIndicesFile; + this.tempLcpsFile = tempLcpsFile; + this.numEntries = numEntries; + this.lastBucketFirstSuffix = lastBucketFirstSuffix; + } + } + + /** Growable {@code int[]} bucket of suffix indices. Shared between the + * bucketing phase (sequential {@link #add}) and the per-range worker + * threads (concurrent {@link #trimmedArray} — safe because bucketing + * completes before any worker starts). */ + private static final class Bucket { + private int[] items; + private int size; + + Bucket() { + this.items = new int[10]; + this.size = 0; + } + + void add(int item) { + if (this.size >= items.length) { + this.items = Arrays.copyOf(this.items, this.size * 2); + } + this.items[this.size++] = item; + } + + /** Return a fresh int[] of exactly {@code size} entries. The bucket's + * internal storage can then be dropped. */ + int[] trimmedArray() { + return (this.size == this.items.length) ? this.items : Arrays.copyOf(this.items, this.size); + } + } + + /** + * Compare two suffixes of {@code sequence} starting at the given offset. + * Sign semantics match {@link Comparable#compareTo} and {@link ByteSequence#compareTo}; + * magnitude is not preserved. + */ + private static int compareSuffixesFrom(CompactFastaSequence sequence, int idxA, int idxB, int startOffset) { + if (idxA == idxB) return 0; + long seqSize = sequence.getSize(); + long remainA = seqSize - idxA; + long remainB = seqSize - idxB; + long limitLong = Math.min(remainA, remainB); + int limit = limitLong > ByteSequence.MAX_COMPARISON_LENGTH + ? ByteSequence.MAX_COMPARISON_LENGTH + : (int) limitLong; + for (int offset = startOffset; offset < limit; offset++) { + byte a = sequence.getByteAt(idxA + offset); + byte b = sequence.getByteAt(idxB + offset); + if (a != b) return Byte.compare(a, b); // signed compare, matches ByteSequence.compareTo + } + // Shorter suffix sorts first (matches ByteSequence.compareTo semantics). + return Long.compare(remainA, remainB); + } + + /** LCP of two suffixes starting from {@code startOffset}, capped at {@link Byte#MAX_VALUE}. */ + private static byte computeLcpByte(CompactFastaSequence sequence, int idxA, int idxB, int startOffset) { + long seqSize = sequence.getSize(); + long remainA = seqSize - idxA; + long remainB = seqSize - idxB; + long limitLong = Math.min(remainA, remainB); + int limit = limitLong > Byte.MAX_VALUE ? Byte.MAX_VALUE : (int) limitLong; + int offset = startOffset; + for (; offset < limit; offset++) { + byte a = sequence.getByteAt(idxA + offset); + byte b = sequence.getByteAt(idxB + offset); + if (a != b) return (byte) offset; + } + return (byte) offset; + } + @Override public String toString() { String retVal = "Size of the suffix array: " + this.size + "\n"; diff --git a/src/main/java/edu/ucsd/msjava/msdbsearch/ConcurrentMSGFPlus.java b/src/main/java/edu/ucsd/msjava/msdbsearch/ConcurrentMSGFPlus.java index 8a434214..4afcd8a5 100644 --- a/src/main/java/edu/ucsd/msjava/msdbsearch/ConcurrentMSGFPlus.java +++ b/src/main/java/edu/ucsd/msjava/msdbsearch/ConcurrentMSGFPlus.java @@ -1,159 +1,167 @@ -package edu.ucsd.msjava.msdbsearch; - -import edu.ucsd.msjava.misc.ProgressData; -import edu.ucsd.msjava.misc.ProgressReporter; - -import java.io.PrintStream; -import java.util.List; - -import org.apache.commons.io.output.NullOutputStream; - -public class ConcurrentMSGFPlus { - public static class RunMSGFPlus implements Runnable, ProgressReporter { - private final ScoredSpectraMap specScanner; - private final DBScanner scanner; - SearchParams params; - List resultList; - private final int taskNum; - private ProgressData progress; - - @Override - public void setProgressData(ProgressData data) { - progress = data; - } - - @Override - public ProgressData getProgressData() { - return progress; - } - - public RunMSGFPlus( - ScoredSpectraMap specScanner, - CompactSuffixArray sa, - SearchParams params, - List resultList, - int taskNum - ) { - this.specScanner = specScanner; - this.params = params; - this.scanner = new DBScanner( - specScanner, - sa, - params.getEnzyme(), - params.getAASet(), - params.getNumMatchesPerSpec(), - params.getMinPeptideLength(), - params.getMaxPeptideLength(), - params.getMaxNumVariantsPerPeptide(), - params.getMinDeNovoScore(), - params.ignoreMetCleavage(), - params.getMaxMissedCleavages() - ); - this.resultList = resultList; - this.taskNum = taskNum; - progress = null; - } - - @Override - public void run() { - if (progress == null) { - progress = new ProgressData(); - } - - PrintStream output; - if (params.getVerbose()) { - output = System.out; - } else { - output = new PrintStream(new NullOutputStream()); - } - - progress.stepRange(5.0); - String threadName = Thread.currentThread().getName(); - output.println(threadName + ": Starting task " + taskNum); - - specScanner.setProgressObj(new ProgressData(progress)); - - // Pre-process spectra - long startTimePreprocess = System.currentTimeMillis(); - if (Thread.currentThread().isInterrupted()) { - return; - } - - if (specScanner.getPepMassSpecKeyMap().size() == 0) - specScanner.makePepMassSpecKeyMap(); - - output.println(threadName + ": Preprocessing spectra..."); - if (Thread.currentThread().isInterrupted()) { - return; - } - specScanner.preProcessSpectra(); - if (Thread.currentThread().isInterrupted()) { - return; - } - output.print(threadName + ": Preprocessing spectra finished "); - output.format("(elapsed time: %.2f sec)\n", (float) ((System.currentTimeMillis() - startTimePreprocess) / 1000)); - - specScanner.getProgressObj().setParentProgressObj(null); - progress.report(5.0); - progress.stepRange(80.0); - scanner.setProgressObj(new ProgressData(progress)); - - long startTimeDbSearch = System.currentTimeMillis(); - - // DB search - output.println(threadName + ": Database search..."); - scanner.setThreadName(threadName); - scanner.setPrintStream(output); - - int ntt = params.getNumTolerableTermini(); - if (params.getEnzyme() == null) - ntt = 0; - int nnet = 2 - ntt; - if (Thread.currentThread().isInterrupted()) { - return; - } - scanner.dbSearch(nnet); - if (Thread.currentThread().isInterrupted()) { - return; - } - output.print(threadName + ": Database search finished "); - output.format("(elapsed time: %.2f sec)\n", (float) ((System.currentTimeMillis() - startTimeDbSearch) / 1000)); - - progress.stepRange(95.0); - - long startTimeComputeEvalue = System.currentTimeMillis(); - output.println(threadName + ": Computing spectral E-values..."); - if (Thread.currentThread().isInterrupted()) { - return; - } - scanner.computeSpecEValue(false); - if (Thread.currentThread().isInterrupted()) { - return; - } - output.print(threadName + ": Computing spectral E-values finished "); - output.format("(elapsed time: %.2f sec)\n", (float) ((System.currentTimeMillis() - startTimeComputeEvalue) / 1000)); - - scanner.getProgressObj().setParentProgressObj(null); - progress.stepRange(100); - - if (Thread.currentThread().isInterrupted()) { - return; - } - - scanner.generateSpecIndexDBMatchMap(); - - progress.report(30.0); - - if (params.outputAdditionalFeatures()) - scanner.addAdditionalFeatures(); - - progress.report(60.0); - - scanner.addResultsToList(resultList); - - progress.report(100.0); -// gen.addSpectrumIdentificationResults(scanner.getSpecIndexDBMatchMap()); - output.println(threadName + ": Task " + taskNum + " completed."); - } - } -} +package edu.ucsd.msjava.msdbsearch; + +import edu.ucsd.msjava.misc.ProgressData; +import edu.ucsd.msjava.misc.ProgressReporter; + +import java.io.PrintStream; +import java.util.List; +import java.util.function.Supplier; + +import org.apache.commons.io.output.NullOutputStream; + +public class ConcurrentMSGFPlus { + public static class RunMSGFPlus implements Runnable, ProgressReporter { + private final Supplier specScannerSupplier; + private final CompactSuffixArray sa; + SearchParams params; + List resultList; + private final int taskNum; + private ProgressData progress; + private ScoredSpectraMap specScanner; + private DBScanner scanner; + + @Override + public void setProgressData(ProgressData data) { + progress = data; + } + + @Override + public ProgressData getProgressData() { + return progress; + } + + public RunMSGFPlus( + Supplier specScannerSupplier, + CompactSuffixArray sa, + SearchParams params, + List resultList, + int taskNum + ) { + this.specScannerSupplier = specScannerSupplier; + this.sa = sa; + this.params = params; + this.resultList = resultList; + this.taskNum = taskNum; + progress = null; + } + + @Override + public void run() { + if (progress == null) { + progress = new ProgressData(); + } + + if (specScanner == null) { + specScanner = specScannerSupplier.get(); + scanner = new DBScanner( + specScanner, + sa, + params.getEnzyme(), + params.getAASet(), + params.getNumMatchesPerSpec(), + params.getMinPeptideLength(), + params.getMaxPeptideLength(), + params.getMaxNumVariantsPerPeptide(), + params.getMinDeNovoScore(), + params.ignoreMetCleavage(), + params.getMaxMissedCleavages() + ); + } + + PrintStream output; + if (params.getVerbose()) { + output = System.out; + } else { + output = new PrintStream(new NullOutputStream()); + } + + progress.stepRange(5.0); + String threadName = Thread.currentThread().getName(); + output.println(threadName + ": Starting task " + taskNum); + + specScanner.setProgressObj(new ProgressData(progress)); + + // Pre-process spectra + long startTimePreprocess = System.currentTimeMillis(); + if (Thread.currentThread().isInterrupted()) { + return; + } + + if (specScanner.getPepMassSpecKeyMap().size() == 0) + specScanner.makePepMassSpecKeyMap(); + + output.println(threadName + ": Preprocessing spectra..."); + if (Thread.currentThread().isInterrupted()) { + return; + } + specScanner.preProcessSpectra(); + if (Thread.currentThread().isInterrupted()) { + return; + } + output.print(threadName + ": Preprocessing spectra finished "); + output.format("(elapsed time: %.2f sec)\n", (float) ((System.currentTimeMillis() - startTimePreprocess) / 1000)); + + specScanner.getProgressObj().setParentProgressObj(null); + progress.report(5.0); + progress.stepRange(80.0); + scanner.setProgressObj(new ProgressData(progress)); + + long startTimeDbSearch = System.currentTimeMillis(); + + // DB search + output.println(threadName + ": Database search..."); + scanner.setThreadName(threadName); + scanner.setPrintStream(output); + + int ntt = params.getNumTolerableTermini(); + if (params.getEnzyme() == null) + ntt = 0; + int nnet = 2 - ntt; + if (Thread.currentThread().isInterrupted()) { + return; + } + scanner.dbSearch(nnet); + if (Thread.currentThread().isInterrupted()) { + return; + } + output.print(threadName + ": Database search finished "); + output.format("(elapsed time: %.2f sec)\n", (float) ((System.currentTimeMillis() - startTimeDbSearch) / 1000)); + + progress.stepRange(95.0); + + long startTimeComputeEvalue = System.currentTimeMillis(); + output.println(threadName + ": Computing spectral E-values..."); + if (Thread.currentThread().isInterrupted()) { + return; + } + scanner.computeSpecEValue(false); + if (Thread.currentThread().isInterrupted()) { + return; + } + output.print(threadName + ": Computing spectral E-values finished "); + output.format("(elapsed time: %.2f sec)\n", (float) ((System.currentTimeMillis() - startTimeComputeEvalue) / 1000)); + + scanner.getProgressObj().setParentProgressObj(null); + progress.stepRange(100); + + if (Thread.currentThread().isInterrupted()) { + return; + } + + scanner.generateSpecIndexDBMatchMap(); + + progress.report(30.0); + + if (params.outputAdditionalFeatures()) + scanner.addAdditionalFeatures(); + + progress.report(60.0); + + scanner.addResultsToList(resultList); + + progress.report(100.0); +// gen.addSpectrumIdentificationResults(scanner.getSpecIndexDBMatchMap()); + output.println(threadName + ": Task " + taskNum + " completed."); + } + } +} diff --git a/src/main/java/edu/ucsd/msjava/msdbsearch/MassCalibrator.java b/src/main/java/edu/ucsd/msjava/msdbsearch/MassCalibrator.java index 1c97ba2b..24817f94 100644 --- a/src/main/java/edu/ucsd/msjava/msdbsearch/MassCalibrator.java +++ b/src/main/java/edu/ucsd/msjava/msdbsearch/MassCalibrator.java @@ -119,17 +119,7 @@ public MassCalibrator( * @return learned ppm shift, or 0.0 if the pre-pass had insufficient data */ public double learnPrecursorShiftPpm(int ioIndex) { - // Cheap guard: skip the pre-pass entirely on small files. Running the - // pre-pass calls preProcessSpectra() on a subset of shared Spectrum - // objects, which mutates their scored state and causes a ~0.1% PSM-list - // drift vs -precursorCal off when the main search later re-processes - // those same spectra. This is the hard correctness gate. - // - // Threshold of 10_000 SpecKeys corresponds to ~3000-spectrum files, which - // is both (a) too small to reliably yield MIN_CONFIDENT_PSMS confident - // matches, and (b) small enough that the state-mutation side-effect is - // noticeable. Real datasets (PXD001819 ~66K, Astral ~75K, TMT ~40K) are - // comfortably above the threshold and run the calibrator as intended. + // Skip the pre-pass on small files where MIN_CONFIDENT_PSMS can't be reached. if (specKeyList == null || specKeyList.size() < MIN_SPECKEYS_FOR_PREPASS) { return 0.0; } diff --git a/src/main/java/edu/ucsd/msjava/msutil/ActivationMethod.java b/src/main/java/edu/ucsd/msjava/msutil/ActivationMethod.java index 1c0abe3e..a691dfe9 100644 --- a/src/main/java/edu/ucsd/msjava/msutil/ActivationMethod.java +++ b/src/main/java/edu/ucsd/msjava/msutil/ActivationMethod.java @@ -1,9 +1,7 @@ package edu.ucsd.msjava.msutil; -import edu.ucsd.msjava.mzid.Constants; import edu.ucsd.msjava.params.ParamObject; import edu.ucsd.msjava.params.UserParam; -import uk.ac.ebi.jmzidml.model.mzidml.CvParam; import java.io.File; import java.nio.file.Paths; @@ -16,7 +14,7 @@ public class ActivationMethod implements ParamObject { private String fullName; private boolean electronBased = false; private String accession; - private CvParam cvParam; + private CvParamInfo cvParam; private ActivationMethod(String name, String fullName) { this(name, fullName, null); @@ -27,7 +25,7 @@ private ActivationMethod(String name, String fullName, String accession) { this.fullName = fullName; this.accession = accession; if (accession != null) - this.cvParam = Constants.makeCvParam(accession, fullName); + this.cvParam = new CvParamInfo(accession, fullName, null); } private ActivationMethod electronBased() { @@ -55,7 +53,7 @@ public boolean isElectronBased() { return electronBased; } - public CvParam getCvParam() { + public CvParamInfo getCvParam() { return cvParam; } diff --git a/src/main/java/edu/ucsd/msjava/msutil/CvParamInfo.java b/src/main/java/edu/ucsd/msjava/msutil/CvParamInfo.java index 96cd07da..86a83109 100644 --- a/src/main/java/edu/ucsd/msjava/msutil/CvParamInfo.java +++ b/src/main/java/edu/ucsd/msjava/msutil/CvParamInfo.java @@ -1,26 +1,31 @@ -package edu.ucsd.msjava.msutil; - -/** - * Stored information for a cvParam, for "copying" cvParams from mzML( and potentially other formats) to mzIdentML - * @author Bryson Gibbons - */ -public class CvParamInfo { - private String accession; - private String name; - private String value; - private String unitAccession; - private String unitName; - private Boolean hasUnit; - - public CvParamInfo(String accession, String name, String value) { - this.accession = accession; - this.name = name; - this.value = value; - } - - public CvParamInfo(String accession, String name, String value, String unitAccession, String unitName) { - this.accession = accession; - this.name = name; +package edu.ucsd.msjava.msutil; + +/** + * Lightweight controlled-vocabulary parameter metadata used by parsers and + * runtime metadata plumbing without depending on mzIdentML model classes. + * + * @author Bryson Gibbons + */ +public class CvParamInfo { + private final String accession; + private final String name; + private final String value; + private final String unitAccession; + private final String unitName; + private final Boolean hasUnit; + + public CvParamInfo(String accession, String name, String value) { + this.accession = accession; + this.name = name; + this.value = value; + this.unitAccession = null; + this.unitName = null; + this.hasUnit = false; + } + + public CvParamInfo(String accession, String name, String value, String unitAccession, String unitName) { + this.accession = accession; + this.name = name; this.value = value; this.hasUnit = true; this.unitAccession = unitAccession; diff --git a/src/main/java/edu/ucsd/msjava/msutil/SpectraAccessor.java b/src/main/java/edu/ucsd/msjava/msutil/SpectraAccessor.java index 9b9707fc..57523ec9 100644 --- a/src/main/java/edu/ucsd/msjava/msutil/SpectraAccessor.java +++ b/src/main/java/edu/ucsd/msjava/msutil/SpectraAccessor.java @@ -1,11 +1,9 @@ package edu.ucsd.msjava.msutil; -import edu.ucsd.msjava.mzid.Constants; import edu.ucsd.msjava.mzml.StaxMzMLParser; import edu.ucsd.msjava.mzml.StaxMzMLSpectraIterator; import edu.ucsd.msjava.mzml.StaxMzMLSpectraMap; import edu.ucsd.msjava.parser.*; -import uk.ac.ebi.jmzidml.model.mzidml.CvParam; import java.io.File; import java.io.FileNotFoundException; @@ -67,7 +65,7 @@ public SpectrumAccessorBySpecIndex getSpecMap() { if (specFormat == SpecFileFormat.MZML) { if (staxParser == null) { try { - staxParser = new StaxMzMLParser(specFile); + staxParser = new StaxMzMLParser(specFile, minMSLevel, maxMSLevel); } catch (Exception e) { throw new RuntimeException("Failed to parse mzML file: " + specFile.getAbsolutePath(), e); } @@ -104,7 +102,7 @@ public Iterator getSpecItr() { if (specFormat == SpecFileFormat.MZML) { if (staxParser == null) { try { - staxParser = new StaxMzMLParser(specFile); + staxParser = new StaxMzMLParser(specFile, minMSLevel, maxMSLevel); } catch (Exception e) { throw new RuntimeException("Failed to parse mzML file: " + specFile.getAbsolutePath(), e); } @@ -167,16 +165,16 @@ public String getTitle(int specIndex) { return getSpecMap().getTitle(specIndex); } - public CvParam getSpectrumIDFormatCvParam() { - CvParam cvParam = null; + public CvParamInfo getSpectrumIDFormatCvParam() { + CvParamInfo cvParam = null; if (specFormat == SpecFileFormat.DTA_TXT || specFormat == SpecFileFormat.MGF || specFormat == SpecFileFormat.PKL || specFormat == SpecFileFormat.MS2 ) - cvParam = Constants.makeCvParam("MS:1000774", "multiple peak list nativeID format"); + cvParam = new CvParamInfo("MS:1000774", "multiple peak list nativeID format", null); else if (specFormat == SpecFileFormat.MZDATA) - cvParam = Constants.makeCvParam("MS:1000777", "spectrum identifier nativeID format"); + cvParam = new CvParamInfo("MS:1000777", "spectrum identifier nativeID format", null); else if (specFormat == SpecFileFormat.MZML) { if (staxParser == null) { try { @@ -187,7 +185,7 @@ else if (specFormat == SpecFileFormat.MZML) { } String[] idFormat = staxParser.detectSpectrumIDFormat(); if (idFormat != null) { - cvParam = Constants.makeCvParam(idFormat[0], idFormat[1]); + cvParam = new CvParamInfo(idFormat[0], idFormat[1], null); } else { throw new IllegalStateException("Unsupported mzML format: " + specFile.getAbsolutePath() + " does not contain a child term of MS:1000767 (native spectrum identifier format)"); diff --git a/src/main/java/edu/ucsd/msjava/mzid/Constants.java b/src/main/java/edu/ucsd/msjava/mzid/Constants.java deleted file mode 100644 index 75573b05..00000000 --- a/src/main/java/edu/ucsd/msjava/mzid/Constants.java +++ /dev/null @@ -1,159 +0,0 @@ -package edu.ucsd.msjava.mzid; - -import edu.ucsd.msjava.ui.MSGFPlus; -import uk.ac.ebi.jmzidml.model.mzidml.*; - -public class Constants { - static String analysisSoftID = "ID_software"; - static String providerID = "ID_provider"; - static String siListID = "SI_LIST_1"; - static String sirID = "SIR_"; - static String siiID = "SII_"; - static String spectraDataID = "SID_1"; - static String psiCvID = "PSI-MS"; - static String siProtocolID = "SearchProtocol_1"; - static String massTableID = "MassTable_1"; - static String searchDBID = "SearchDB_1"; - static String pepEvidenceListID = "PepEvidList_1"; - static String specIdentID = "SpecIdent_1"; - static String unimodID = "UNIMOD"; - static String unitCvID = "UO"; - static String measureMzID = "Measure_MZ"; - static String measureIntID = "Measure_Int"; - static String measureErrorID = "Measure_Error"; - static String sourceFileID = "SourceFile_1"; - static String pepIDPrefix = "Pep_"; - static String pepEvIDPrefix = "PepEv_"; - static final String UNIMOD_RESOURCE_PATH = "unimod.obo"; - - static Cv psiCV; - static Cv unimodCV; - static Cv unitCV; - - static AnalysisSoftware msgfPlus; - -// static Person docOwner; -// static Organization org; -// static Affiliation aff; - - static { - psiCV = new Cv(); - psiCV.setUri("https://raw.githubusercontent.com/HUPO-PSI/psi-ms-CV/master/psi-ms.obo"); - psiCV.setId(psiCvID); - psiCV.setVersion("3.30.0"); - psiCV.setFullName("PSI-MS"); - - unimodCV = new Cv(); - unimodCV.setUri("http://www.unimod.org/obo/unimod.obo"); - unimodCV.setId(unimodID); - unimodCV.setFullName("UNIMOD"); - - unitCV = new Cv(); - unitCV.setUri("https://raw.githubusercontent.com/bio-ontology-research-group/unit-ontology/master/unit.obo"); - unitCV.setId(unitCvID); - unitCV.setFullName("UNIT-ONTOLOGY"); - - msgfPlus = new AnalysisSoftware(); - msgfPlus.setName("MS-GF+"); - Param tempParam = new Param(); - tempParam.setParam(makeCvParam("MS:1002048", "MS-GF+")); -// tempParam.setParam(makeCvParam("MS:1001475","OMSSA")); - msgfPlus.setSoftwareName(tempParam); - msgfPlus.setId(analysisSoftID); - msgfPlus.setVersion(MSGFPlus.VERSION); - -// docOwner = new Person(); -// docOwner.setId("PERSON_DOC_OWNER"); -// docOwner.setFirstName("Sangtae"); -// docOwner.setLastName("Kim"); -// -// org = new Organization(); -// org.setId("ORG_DOC_OWNER"); -// org.setName("UCSD"); -// -// Affiliation aff = new Affiliation(); -// aff.setOrganization(org); -// docOwner.getAffiliation().add(aff); - - } - - /** - * Helper method to create and return a CvParam from accession, name and CV - * - * @return CvParam - */ - public static CvParam makeCvParam(String accession, String name) { - return makeCvParam(accession, name, psiCV); - } - - /** - * Helper method to create and return a CvParam from accession, name and CV - * - * @return CvParam - */ - public static CvParam makeCvParam(String accession, String name, Cv cv) { - CvParam cvParam = new CvParam(); - cvParam.setAccession(accession); - cvParam.setName(name); - cvParam.setCv(cv); - return cvParam; - } - - /** - * Helper method to create and return a CvParam from accession, name, CV, unitAccession, unitName and unitCV - * - * @return CvParam - */ - public static CvParam makeCvParam(String accession, String name, Cv cv, String unitAccession, String unitName, Cv alternateUnitCV) { - CvParam cvParam = makeCvParam(accession, name, cv); - cvParam.setUnitAccession(unitAccession); - cvParam.setUnitCv(alternateUnitCV); - cvParam.setUnitName(unitName); - return cvParam; - } - - /** - * Helper method to create and return an UserParam - * - * @return UserParam - */ - public static UserParam makeUserParam(String name) { - UserParam userParam = new UserParam(); - userParam.setName(name); - return userParam; - } - - /** - * Helper method to create and return an UserParam - * - * @return UserParam - */ - public static UserParam makeUserParam(String name, String value) { - UserParam userParam = new UserParam(); - userParam.setName(name); - userParam.setValue(value); - return userParam; - } - - /** - * Helper method to setup a CvParam with CVRef, with either Daltons or ppm as units - */ - - public static CvParam getCvParamWithMassUnits(Boolean isDaltonUnit) { - CvParam cvParam = new CvParam(); - - // - cvParam.setCv(psiCV); - cvParam.setUnitCv(unitCV); - - if (isDaltonUnit) { - cvParam.setUnitAccession("UO:0000221"); - cvParam.setUnitName("dalton"); - } else { - cvParam.setUnitAccession("UO:0000169"); - cvParam.setUnitName("parts per million"); - } - return cvParam; - } - -} diff --git a/src/main/java/edu/ucsd/msjava/mzid/Unimod.java b/src/main/java/edu/ucsd/msjava/mzid/Unimod.java index dc795950..32e6f307 100644 --- a/src/main/java/edu/ucsd/msjava/mzid/Unimod.java +++ b/src/main/java/edu/ucsd/msjava/mzid/Unimod.java @@ -7,11 +7,12 @@ import java.util.HashMap; import java.util.Map; -public class Unimod { - - public static Unimod getUnimod() { - return unimod; - } +public class Unimod { + private static final String UNIMOD_RESOURCE_PATH = "unimod.obo"; + + public static Unimod getUnimod() { + return unimod; + } public String getRecordID(String name) { return recordIDMap.get(name); @@ -26,14 +27,14 @@ public String getDeltaComposition(String id) { private Unimod() { readUnimodOBOFile(); - } - - private void readUnimodOBOFile() { - InputStream is = Unimod.class.getClassLoader().getResourceAsStream(Constants.UNIMOD_RESOURCE_PATH); - if (is == null) { - System.err.println("Unable to access \"unimod.obo\"."); - System.exit(-1); - } + } + + private void readUnimodOBOFile() { + InputStream is = Unimod.class.getClassLoader().getResourceAsStream(UNIMOD_RESOURCE_PATH); + if (is == null) { + System.err.println("Unable to access \"unimod.obo\"."); + System.exit(-1); + } BufferedReader in = new BufferedReader(new InputStreamReader(is)); recordIDMap = new HashMap(); diff --git a/src/main/java/edu/ucsd/msjava/mzml/StaxMzMLParser.java b/src/main/java/edu/ucsd/msjava/mzml/StaxMzMLParser.java index 98d2383a..12b15475 100644 --- a/src/main/java/edu/ucsd/msjava/mzml/StaxMzMLParser.java +++ b/src/main/java/edu/ucsd/msjava/mzml/StaxMzMLParser.java @@ -17,7 +17,6 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; import java.util.zip.DataFormatException; import java.util.zip.Inflater; @@ -64,8 +63,13 @@ public static class SpectrumIndex { // Referenceable param groups: group ID -> list of [accession, name, value, unitAccession, unitName] private final Map> refParamGroups; - // Full spectrum cache (populated on first random access, thread-safe) - private final ConcurrentHashMap cache; + /** MS-level filter: spectra outside this range are never decoded or cached. */ + private final int minMSLevel; + private final int maxMSLevel; + + /** Synchronized cache of in-filter spectra. Returns defensive copies on read + * so pre-pass mutations cannot leak to the main pass. */ + private final Map cache; private volatile boolean allLoaded = false; // Reusable XMLInputFactory (thread-safe for creation) @@ -79,16 +83,28 @@ public static class SpectrumIndex { } /** - * Construct a parser for the given mzML file. - * Immediately builds the spectrum index (single sequential pass). + * Construct a parser for the given mzML file with no MS-level filter. + * Prefer {@link #StaxMzMLParser(File, int, int)} so MS1 spectra can be + * skipped during the binary-decode preload. */ public StaxMzMLParser(File specFile) throws IOException, XMLStreamException { + this(specFile, 1, Integer.MAX_VALUE); + } + + /** + * Construct a parser for the given mzML file, decoding/caching only spectra + * with MS level inside {@code [minMSLevel, maxMSLevel]}. Immediately builds + * the spectrum index (single sequential pass; no peak decode). + */ + public StaxMzMLParser(File specFile, int minMSLevel, int maxMSLevel) throws IOException, XMLStreamException { this.specFile = specFile; + this.minMSLevel = minMSLevel; + this.maxMSLevel = maxMSLevel; this.indexList = new ArrayList<>(); this.indexBySpecIdx = new HashMap<>(); this.indexById = new HashMap<>(); this.refParamGroups = new HashMap<>(); - this.cache = new ConcurrentHashMap<>(); + this.cache = Collections.synchronizedMap(new HashMap<>()); buildIndex(); } @@ -138,41 +154,59 @@ public Float getPrecursorMz(int specIndex) { /** * Parse and return the full spectrum (with peaks) for the given 1-based index. - * On first cache miss, preloads ALL spectra in a single sequential pass. - * This avoids O(N) scans from the start for each random-access lookup. + * On first cache miss, performs a bulk preload of all in-filter spectra; every + * subsequent call returns a defensive copy from the cache. Returns {@code null} + * for unknown indices and for spectra outside the configured MS-level filter. */ public Spectrum getSpectrumBySpecIndex(int specIndex) { - if (allLoaded) return cache.get(specIndex); - - Spectrum cached = cache.get(specIndex); - if (cached != null) return cached; - - // Return null immediately for unknown indices instead of triggering a full preload - if (!indexBySpecIdx.containsKey(specIndex)) return null; + SpectrumIndex si = indexBySpecIdx.get(specIndex); + if (si == null) return null; + if (si.msLevel < minMSLevel || si.msLevel > maxMSLevel) return null; - try { - preloadAllSpectra(); - } catch (Exception e) { - throw new RuntimeException("Failed to preload spectra while retrieving spectrum index " + specIndex, e); + if (!allLoaded && !cache.containsKey(specIndex)) { + try { + preloadAllSpectra(); + } catch (Exception e) { + throw new RuntimeException("Failed to preload spectra while retrieving spectrum index " + specIndex, e); + } } - return cache.get(specIndex); + return cloneSpectrum(cache.get(specIndex)); } /** - * Parse all spectra in a single sequential pass and populate the cache. + * Walk the file once and cache every in-filter spectrum. Out-of-filter + * spectra are skipped without binary decode — no {@link Spectrum} or + * {@code Peak} objects allocated. */ private synchronized void preloadAllSpectra() throws IOException, XMLStreamException { if (allLoaded) return; long startTime = System.currentTimeMillis(); + int loaded = 0, skipped = 0; try (InputStream is = new BufferedInputStream(new FileInputStream(specFile), 256 * 1024)) { XMLStreamReader reader = XML_INPUT_FACTORY.createXMLStreamReader(is); try { while (reader.hasNext()) { int event = reader.next(); if (event == XMLStreamConstants.START_ELEMENT && "spectrum".equals(reader.getLocalName())) { + // Skip out-of-filter spectra without binary decode by consulting + // the pre-built index via the spectrum's id attribute. + String id = reader.getAttributeValue(null, "id"); + SpectrumIndex si = id != null ? indexById.get(id) : null; + if (si != null && (si.msLevel < minMSLevel || si.msLevel > maxMSLevel)) { + skipElement(reader, "spectrum"); + skipped++; + continue; + } Spectrum spec = parseOneSpectrum(reader); if (spec != null) { + int ms = spec.getMSLevel(); + if (ms < minMSLevel || ms > maxMSLevel) { + // Index lookup missed (malformed mzML id mismatch); drop post-parse. + skipped++; + continue; + } cache.put(spec.getSpecIndex(), spec); + loaded++; } } } @@ -184,7 +218,37 @@ private synchronized void preloadAllSpectra() throws IOException, XMLStreamExcep } allLoaded = true; long elapsed = System.currentTimeMillis() - startTime; - System.out.println("StAX mzML preload: " + cache.size() + " spectra loaded in " + elapsed + " ms"); + System.out.println("StAX mzML preload: " + loaded + " spectra loaded (" + skipped + " filtered out by MS level) in " + elapsed + " ms"); + } + + /** + * Defensive copy of a cached {@link Spectrum}. Mirrors the field set + * populated by {@link #parseOneSpectrum}; keep the two in lock-step. + */ + private static Spectrum cloneSpectrum(Spectrum src) { + if (src == null) return null; + Spectrum dst = new Spectrum(); + dst.setID(src.getID()); + dst.setSpecIndex(src.getSpecIndex()); + if (src.getScanNum() > 0) dst.setScanNum(src.getScanNum()); + dst.setMsLevel(src.getMSLevel()); + dst.setIsCentroided(src.isCentroided()); + if (src.getScanPolarity() != null) dst.setScanPolarity(src.getScanPolarity()); + dst.setRt(src.getRt()); + dst.setRtIsSeconds(src.getRtIsSeconds()); + dst.setIsolationWindowTargetMz(src.getIsolationWindowTargetMz()); + if (src.getPrecursorPeak() != null) { + Peak p = src.getPrecursorPeak(); + dst.setPrecursor(new Peak(p.getMz(), p.getIntensity(), p.getCharge())); + } + if (src.getActivationMethod() != null) dst.setActivationMethod(src.getActivationMethod()); + if (src.getAddlCvParams() != null) { + for (CvParamInfo cv : src.getAddlCvParams()) dst.addAddlCvParam(cv); + } + for (Peak p : src) { + dst.add(new Peak(p.getMz(), p.getIntensity(), p.getCharge())); + } + return dst; } /** @@ -940,7 +1004,7 @@ private void cleanup() { /** * Suppress all logback logging. Called at startup to silence noisy - * library output (jmzidml, etc.). + * library output. */ public static void turnOffLogs() { if (!logOff) { diff --git a/src/main/java/edu/ucsd/msjava/ui/MSGFPlus.java b/src/main/java/edu/ucsd/msjava/ui/MSGFPlus.java index 0f6f54fd..3a10c9bc 100644 --- a/src/main/java/edu/ucsd/msjava/ui/MSGFPlus.java +++ b/src/main/java/edu/ucsd/msjava/ui/MSGFPlus.java @@ -423,27 +423,35 @@ private static String runMSGFPlus(int ioIndex, SpecFileFormat specFormat, File o try { for (int i = 0; i < numTasks; i++) { - ScoredSpectraMap specScanner = new ScoredSpectraMap( - specAcc, - Collections.synchronizedList(specKeyList.subList(startIndex[i], endIndex[i])), - leftPrecursorMassTolerance, - rightPrecursorMassTolerance, - minIsotopeError, - maxIsotopeError, - specDataType, - params.outputAdditionalFeatures(), - false, - precursorMassShiftPpm - ); - if (doNotUseEdgeScore) - specScanner.turnOffEdgeScoring(); + final int taskStartIndex = startIndex[i]; + final int taskEndIndex = endIndex[i]; + final boolean storeRankScorer = params.outputAdditionalFeatures(); + final int taskNum = i + 1; + // Defer ScoredSpectraMap construction to the worker thread so all + // tasks' spectrum heaps aren't allocated up front when queued. ConcurrentMSGFPlus.RunMSGFPlus msgfplusExecutor = new ConcurrentMSGFPlus.RunMSGFPlus( - specScanner, + () -> { + ScoredSpectraMap specScanner = new ScoredSpectraMap( + specAcc, + Collections.synchronizedList(specKeyList.subList(taskStartIndex, taskEndIndex)), + leftPrecursorMassTolerance, + rightPrecursorMassTolerance, + minIsotopeError, + maxIsotopeError, + specDataType, + storeRankScorer, + false, + precursorMassShiftPpm + ); + if (doNotUseEdgeScore) + specScanner.turnOffEdgeScoring(); + return specScanner; + }, sa, params, resultList, - i + 1 + taskNum ); if (DISABLE_THREADING) { diff --git a/src/test/java/edu/ucsd/msjava/msdbsearch/TestConcurrentMSGFPlus.java b/src/test/java/edu/ucsd/msjava/msdbsearch/TestConcurrentMSGFPlus.java new file mode 100644 index 00000000..9cb5b715 --- /dev/null +++ b/src/test/java/edu/ucsd/msjava/msdbsearch/TestConcurrentMSGFPlus.java @@ -0,0 +1,36 @@ +package edu.ucsd.msjava.msdbsearch; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; + +public class TestConcurrentMSGFPlus { + + @Test + public void defersScoredSpectraMapConstructionUntilRun() { + AtomicInteger buildCount = new AtomicInteger(); + ConcurrentMSGFPlus.RunMSGFPlus task = new ConcurrentMSGFPlus.RunMSGFPlus( + () -> { + buildCount.incrementAndGet(); + throw new IllegalStateException("sentinel"); + }, + null, + null, + Collections.emptyList(), + 1 + ); + + Assert.assertEquals(0, buildCount.get()); + + try { + task.run(); + Assert.fail("Expected the ScoredSpectraMap supplier to run inside run()."); + } catch (IllegalStateException expected) { + Assert.assertEquals("sentinel", expected.getMessage()); + } + + Assert.assertEquals(1, buildCount.get()); + } +} diff --git a/src/test/java/msgfplus/TestBuildSAParallelBitIdentity.java b/src/test/java/msgfplus/TestBuildSAParallelBitIdentity.java new file mode 100644 index 00000000..9e06abbf --- /dev/null +++ b/src/test/java/msgfplus/TestBuildSAParallelBitIdentity.java @@ -0,0 +1,110 @@ +package msgfplus; + +import edu.ucsd.msjava.msdbsearch.CompactFastaSequence; +import edu.ucsd.msjava.msdbsearch.CompactSuffixArray; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; + +/** + * Bit-identity test: the parallel sort path must produce byte-identical + * .csarr/.cnlcp output to the single-thread path between the header and footer + * (header id and footer mtime are non-deterministic between builds). + */ +public class TestBuildSAParallelBitIdentity { + + /** Mirror of {@code CompactSuffixArray.SA_BUILD_THREADS_PROPERTY} (package-private there). */ + private static final String SA_BUILD_THREADS_PROPERTY = "msgfplus.buildsa.threads"; + + private static final String FIXTURE = "ecoli.fasta"; + + @Test + public void parallelMatchesSingleThreadByteForByte() throws Exception { + File singleArtifacts = stageFastaIntoTempDir("buildsa-N1"); + File parallelArtifacts = stageFastaIntoTempDir("buildsa-N4"); + try { + byte[] singleCsarr, singleCnlcp; + byte[] parallelCsarr, parallelCnlcp; + + String prevThreads = System.getProperty(SA_BUILD_THREADS_PROPERTY); + try { + System.setProperty(SA_BUILD_THREADS_PROPERTY, "1"); + CompactFastaSequence seq1 = new CompactFastaSequence(singleArtifacts.getAbsolutePath()); + new CompactSuffixArray(seq1); + singleCsarr = readBodyBytes(new File(stripExt(singleArtifacts.getAbsolutePath()) + ".csarr")); + singleCnlcp = readBodyBytes(new File(stripExt(singleArtifacts.getAbsolutePath()) + ".cnlcp")); + + System.setProperty(SA_BUILD_THREADS_PROPERTY, "4"); + CompactFastaSequence seq4 = new CompactFastaSequence(parallelArtifacts.getAbsolutePath()); + new CompactSuffixArray(seq4); + parallelCsarr = readBodyBytes(new File(stripExt(parallelArtifacts.getAbsolutePath()) + ".csarr")); + parallelCnlcp = readBodyBytes(new File(stripExt(parallelArtifacts.getAbsolutePath()) + ".cnlcp")); + } finally { + if (prevThreads == null) { + System.clearProperty(SA_BUILD_THREADS_PROPERTY); + } else { + System.setProperty(SA_BUILD_THREADS_PROPERTY, prevThreads); + } + } + + Assert.assertArrayEquals(".csarr post-header bytes must be identical between N=1 and N=4", singleCsarr, parallelCsarr); + Assert.assertArrayEquals(".cnlcp post-header bytes must be identical between N=1 and N=4", singleCnlcp, parallelCnlcp); + + File parentDir = parallelArtifacts.getAbsoluteFile().getParentFile(); + File[] debris = parentDir.listFiles((dir, name) -> name.contains(".buildsa-tmp.")); + Assert.assertNotNull(debris); + Assert.assertEquals("BuildSA temp files must be cleaned up on success: " + java.util.Arrays.toString(debris), + 0, debris.length); + } finally { + deleteDirRecursive(singleArtifacts.getParentFile()); + deleteDirRecursive(parallelArtifacts.getParentFile()); + } + } + + /** Copies the FASTA fixture into a fresh temp dir so build artifacts don't pollute test resources. */ + private static File stageFastaIntoTempDir(String prefix) throws Exception { + Path tempDir = Files.createTempDirectory(prefix); + File source = new File(TestBuildSAParallelBitIdentity.class.getClassLoader().getResource(FIXTURE).toURI()); + File dest = new File(tempDir.toFile(), source.getName()); + Files.copy(source.toPath(), dest.toPath(), StandardCopyOption.REPLACE_EXISTING); + return dest; + } + + /** + * Read the file with the 8-byte header (size + id) and 12-byte footer + * (lastModified + formatId) trimmed off. Both are non-deterministic between + * runs; the body in between is the actual sort output to compare. + */ + private static byte[] readBodyBytes(File f) throws IOException { + byte[] all = Files.readAllBytes(f.toPath()); + int headerSize = 8; + int footerSize = 8 + 4; + Assert.assertTrue("Output file too small: " + f, all.length >= headerSize + footerSize); + int bodyLen = all.length - headerSize - footerSize; + byte[] body = new byte[bodyLen]; + System.arraycopy(all, headerSize, body, 0, bodyLen); + return body; + } + + private static String stripExt(String path) { + int dot = path.lastIndexOf('.'); + return dot < 0 ? path : path.substring(0, dot); + } + + private static void deleteDirRecursive(File dir) { + if (dir == null || !dir.exists()) return; + File[] entries = dir.listFiles(); + if (entries != null) { + for (File f : entries) { + if (f.isDirectory()) deleteDirRecursive(f); + else f.delete(); + } + } + dir.delete(); + } +} diff --git a/src/test/java/msgfplus/TestStaxMzMLParser.java b/src/test/java/msgfplus/TestStaxMzMLParser.java index 204cd50a..a0924ba2 100644 --- a/src/test/java/msgfplus/TestStaxMzMLParser.java +++ b/src/test/java/msgfplus/TestStaxMzMLParser.java @@ -168,13 +168,34 @@ public void testGetSpectrumById() throws Exception { } @Test - public void testCacheHit() throws Exception { + public void testCacheReturnsDefensiveCopy() throws Exception { StaxMzMLParser parser = new StaxMzMLParser(getMzMLFile()); - // First access Spectrum spec1 = parser.getSpectrumBySpecIndex(2); - // Second access should hit cache (same object) Spectrum spec2 = parser.getSpectrumBySpecIndex(2); - Assert.assertSame("Cache should return same object", spec1, spec2); + Assert.assertNotSame("Defensive copy should return distinct instances", spec1, spec2); + + Assert.assertEquals(spec1.getID(), spec2.getID()); + Assert.assertEquals(spec1.size(), spec2.size()); + Assert.assertEquals(spec1.getPrecursorPeak().getMz(), spec2.getPrecursorPeak().getMz(), 0.0001f); + + // Mutation on one copy must not leak to a future read. + spec1.get(0).setRank(99); + Spectrum spec3 = parser.getSpectrumBySpecIndex(2); + Assert.assertNotSame(spec1, spec3); + Assert.assertNotEquals("Mutation must not leak through cache", 99, spec3.get(0).getRank()); + } + + @Test + public void testMSLevelPreloadFilter() throws Exception { + // tiny.pwiz.mzML has MS1 at indices 1, 3, 4 and MS2 at index 2. + StaxMzMLParser parser = new StaxMzMLParser(getMzMLFile(), 2, 2); + Assert.assertNull("MS1 (index 1) must be filtered out", parser.getSpectrumBySpecIndex(1)); + Assert.assertNull("MS1 (index 3) must be filtered out", parser.getSpectrumBySpecIndex(3)); + Assert.assertNull("MS1 (index 4) must be filtered out", parser.getSpectrumBySpecIndex(4)); + Spectrum ms2 = parser.getSpectrumBySpecIndex(2); + Assert.assertNotNull("MS2 (index 2) must come through", ms2); + Assert.assertEquals(2, ms2.getMSLevel()); + Assert.assertEquals(10, ms2.size()); } @Test diff --git a/src/test/resources/test.mzid b/src/test/resources/test.mzid deleted file mode 100644 index bf654b45..00000000 --- a/src/test/resources/test.mzid +++ /dev/null @@ -1,153 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - IGAYLFVDMAHVAGLIAAGVYPNPVPHAHVVTSTTHK - - - NLANPTSVILASIQMLEYLGMADK - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -