From acab49638032719c6319441a1a02e392901e7861 Mon Sep 17 00:00:00 2001 From: Madhava Jay Date: Thu, 7 May 2026 15:06:30 +1000 Subject: [PATCH] speedups --- .../src/report_html_observations.rs | 21 + .../bioscript-cli/src/report_html_sections.rs | 16 +- rust/bioscript-cli/src/report_output.rs | 27 +- rust/bioscript-formats/src/genotype.rs | 18 + .../src/genotype/cram_backend.rs | 1 + .../src/genotype/cram_backend/observation.rs | 374 ++++++++++++++ .../src/genotype/cram_backend/store.rs | 486 +++++------------- rust/bioscript-formats/src/genotype/io.rs | 3 + rust/bioscript-formats/src/genotype/types.rs | 2 + .../src/genotype/vcf/matching.rs | 7 +- rust/bioscript-formats/src/prepare.rs | 2 +- .../tests/file_formats/basic.rs | 8 +- 12 files changed, 588 insertions(+), 377 deletions(-) create mode 100644 rust/bioscript-formats/src/genotype/cram_backend/observation.rs diff --git a/rust/bioscript-cli/src/report_html_observations.rs b/rust/bioscript-cli/src/report_html_observations.rs index 8f68623..f5946b4 100644 --- a/rust/bioscript-cli/src/report_html_observations.rs +++ b/rust/bioscript-cli/src/report_html_observations.rs @@ -41,6 +41,7 @@ fn render_observation_table( let show_facets = observations .iter() .any(|observation| !json_field_as_tsv(observation.get("facets")).is_empty()); + let show_imputed_reference_note = observations.iter().any(observation_is_imputed_vcf_reference); let headers = all_headers .iter() .copied() @@ -66,6 +67,9 @@ fn render_observation_table( out.push_str(""); } out.push_str(""); + if show_imputed_reference_note { + out.push_str("

* In variant-only VCF inputs, absent queried variant rows are shown as imputed reference genotypes. This is usually appropriate for variant-only VCFs, but it may be wrong if the VCF omits loci for another reason.

"); + } } fn observation_filter_group(observation: &serde_json::Value) -> &'static str { @@ -125,6 +129,14 @@ fn observation_row_class(observation: &serde_json::Value) -> &'static str { fn render_observation_cell(out: &mut String, observation: &serde_json::Value, header: &str) { let cell_class = table_column_class(header); + if header == "outcome" { + let mut value = json_field_as_tsv(observation.get(header)); + if value == "reference" && observation_is_imputed_vcf_reference(observation) { + value.push('*'); + } + let _ = write!(out, "{}", cell_class, html_escape(&value)); + return; + } if header == "ref_alt" { class_cell(out, &observation_ref_alt(observation), "mono"); return; @@ -184,6 +196,15 @@ fn render_observation_cell(out: &mut String, observation: &serde_json::Value, he ); } +fn observation_is_imputed_vcf_reference(observation: &serde_json::Value) -> bool { + observation + .get("evidence_raw") + .and_then(serde_json::Value::as_str) + .is_some_and(|evidence| { + evidence.contains("imputed reference genotype from absent variant-only VCF record") + }) +} + fn observation_ref_alt(observation: &serde_json::Value) -> String { let ref_allele = observation .get("ref") diff --git a/rust/bioscript-cli/src/report_html_sections.rs b/rust/bioscript-cli/src/report_html_sections.rs index 2f4e5a9..5fd80eb 100644 --- a/rust/bioscript-cli/src/report_html_sections.rs +++ b/rust/bioscript-cli/src/report_html_sections.rs @@ -168,6 +168,7 @@ fn render_input_debug(out: &mut String, reports: &[serde_json::Value], show_part "Source", "Assembly", "Inferred Sex", + "VCF Ref Imputation", "Evidence", ]); for (idx, header) in headers.iter().enumerate() { @@ -217,6 +218,7 @@ fn render_input_debug(out: &mut String, reports: &[serde_json::Value], show_part value_str(sex, "method"), ]), ); + table_cell(out, input_debug_vcf_imputation(debug)); table_cell(out, &input_debug_evidence(debug)); out.push_str(""); } @@ -256,6 +258,7 @@ fn render_input_debug_key_values(out: &mut String, report: &serde_json::Value) { value_str(sex, "method"), ]), ); + input_debug_kv(out, "VCF ref imputation", input_debug_vcf_imputation(debug)); input_debug_kv(out, "Evidence", &input_debug_evidence(debug)); out.push_str(""); } @@ -278,6 +281,18 @@ fn compact_join(values: &[&str]) -> String { .join(" / ") } +fn input_debug_vcf_imputation(debug: &serde_json::Value) -> &'static str { + if debug + .get("vcf_missing_reference_imputation") + .and_then(serde_json::Value::as_bool) + == Some(true) + { + "used" + } else { + "" + } +} + fn input_debug_evidence(debug: &serde_json::Value) -> String { let mut evidence = Vec::new(); collect_string_array(debug.get("evidence"), &mut evidence); @@ -300,4 +315,3 @@ fn collect_string_array(value: Option<&serde_json::Value>, out: &mut Vec } } } - diff --git a/rust/bioscript-cli/src/report_output.rs b/rust/bioscript-cli/src/report_output.rs index 6766d91..7f2eeb4 100644 --- a/rust/bioscript-cli/src/report_output.rs +++ b/rust/bioscript-cli/src/report_output.rs @@ -19,6 +19,20 @@ fn app_report_json(input: AppReportJsonInput<'_>) -> serde_json::Value { item.get("call_status").and_then(serde_json::Value::as_str) == Some("called") }) .count(); + let input_debug = input + .input_inspection + .map(|inspection| { + let mut value = input_inspection_json(inspection); + if observations_have_imputed_vcf_references(input.observations) + && let Some(object) = value.as_object_mut() + { + object.insert( + "vcf_missing_reference_imputation".to_owned(), + serde_json::Value::Bool(true), + ); + } + value + }); serde_json::json!({ "schema": "bioscript:report:1.0", "version": "1.0", @@ -29,7 +43,7 @@ fn app_report_json(input: AppReportJsonInput<'_>) -> serde_json::Value { "input": { "file_name": input.input_file.file_name().and_then(|value| value.to_str()).unwrap_or_default(), "file_path": input.input_file.display().to_string(), - "debug": input.input_inspection.map(input_inspection_json), + "debug": input_debug, }, "report_status": if called == input.observations.len() { "complete" } else { "partial" }, "derived_from": input.observations.iter().filter_map(|item| item.get("variant_key").cloned()).collect::>(), @@ -46,6 +60,17 @@ fn app_report_json(input: AppReportJsonInput<'_>) -> serde_json::Value { }) } +fn observations_have_imputed_vcf_references(observations: &[serde_json::Value]) -> bool { + observations.iter().any(|observation| { + observation + .get("evidence_raw") + .and_then(serde_json::Value::as_str) + .is_some_and(|evidence| { + evidence.contains("imputed reference genotype from absent variant-only VCF record") + }) + }) +} + fn report_manifest_metadata(path: &Path) -> Result { let text = fs::read_to_string(path) .map_err(|err| format!("failed to read manifest metadata {}: {err}", path.display()))?; diff --git a/rust/bioscript-formats/src/genotype.rs b/rust/bioscript-formats/src/genotype.rs index c082921..0fb784e 100644 --- a/rust/bioscript-formats/src/genotype.rs +++ b/rust/bioscript-formats/src/genotype.rs @@ -84,6 +84,10 @@ impl GenotypeStore { GenotypeSourceFormat::Zip => Self::from_zip_file(path), GenotypeSourceFormat::Vcf => Ok(Self::from_vcf_file(path, options)), GenotypeSourceFormat::Cram => Self::from_cram_file(path, options), + GenotypeSourceFormat::Bam => Err(RuntimeError::Unsupported(format!( + "BAM alignment lookup is not implemented yet for {}", + path.display() + ))), } } @@ -361,6 +365,7 @@ impl RsidMapBackend { GenotypeSourceFormat::Zip => "zip", GenotypeSourceFormat::Vcf => "vcf", GenotypeSourceFormat::Cram => "cram", + GenotypeSourceFormat::Bam => "bam", } } @@ -392,6 +397,7 @@ impl DelimitedBackend { GenotypeSourceFormat::Zip => "zip", GenotypeSourceFormat::Vcf => "vcf", GenotypeSourceFormat::Cram => "cram", + GenotypeSourceFormat::Bam => "bam", } } @@ -939,6 +945,18 @@ mod tests { detect_source_format(&text, Some(GenotypeSourceFormat::Cram)).unwrap(), GenotypeSourceFormat::Cram )); + let bam = dir.join("sample.bam"); + fs::write(&bam, b"BAM").unwrap(); + assert!(matches!( + detect_source_format(&bam, None).unwrap(), + GenotypeSourceFormat::Bam + )); + assert!( + GenotypeStore::from_file(&bam) + .unwrap_err() + .to_string() + .contains("BAM alignment lookup is not implemented yet") + ); assert!(!looks_like_vcf_lines(&["rsid\tgenotype".to_owned()])); let backend = DelimitedBackend { diff --git a/rust/bioscript-formats/src/genotype/cram_backend.rs b/rust/bioscript-formats/src/genotype/cram_backend.rs index 01787f3..30d3747 100644 --- a/rust/bioscript-formats/src/genotype/cram_backend.rs +++ b/rust/bioscript-formats/src/genotype/cram_backend.rs @@ -18,6 +18,7 @@ use crate::alignment; use super::GenotypeLoadOptions; mod indel; +mod observation; mod reader; mod store; diff --git a/rust/bioscript-formats/src/genotype/cram_backend/observation.rs b/rust/bioscript-formats/src/genotype/cram_backend/observation.rs new file mode 100644 index 0000000..1618e15 --- /dev/null +++ b/rust/bioscript-formats/src/genotype/cram_backend/observation.rs @@ -0,0 +1,374 @@ +use std::{ + collections::{BTreeMap, BTreeSet}, + path::Path, +}; + +use bioscript_core::{ + Assembly, GenomicLocus, RuntimeError, VariantKind, VariantObservation, VariantSpec, +}; +use noodles::cram; + +use crate::alignment::{self, AlignmentOpKind}; +use crate::genotype::types::CramBackend; + +use super::{ + anchor_window, classify_expected_indel, describe_copy_number_decision_rule, describe_locus, + describe_snp_decision_rule, first_base, indel_at_anchor, infer_copy_number_genotype, + infer_snp_genotype, observe_snp_pileup, record_overlaps_locus, snp_pileup_with_reader, + spans_position, +}; + +impl CramBackend { + pub(super) fn observe_with_reader( + &self, + reader: &mut cram::io::indexed_reader::IndexedReader, + label: &str, + variant: &VariantSpec, + assembly: Assembly, + locus: &GenomicLocus, + ) -> Result { + match variant.kind.unwrap_or(VariantKind::Other) { + VariantKind::Snp => { + self.observe_snp_with_reader(reader, label, variant, assembly, locus) + } + VariantKind::Deletion => { + self.observe_deletion_with_reader(reader, label, variant, assembly, locus) + } + VariantKind::Insertion | VariantKind::Indel => { + self.observe_indel_with_reader(reader, label, variant, assembly, locus) + } + VariantKind::Other => Err(RuntimeError::Unsupported(format!( + "backend '{}' does not yet support {:?} observation for {}", + self.backend_name(), + variant.kind.unwrap_or(VariantKind::Other), + self.path.display() + ))), + } + } + + pub(super) fn observe_snp( + &self, + variant: &VariantSpec, + assembly: Assembly, + locus: &GenomicLocus, + reference_file: &Path, + ) -> Result { + let reference = variant + .reference + .as_deref() + .and_then(first_base) + .ok_or_else(|| { + RuntimeError::InvalidArguments("SNP variant requires ref/reference".to_owned()) + })?; + let alternate = variant + .alternate + .as_deref() + .and_then(first_base) + .ok_or_else(|| { + RuntimeError::InvalidArguments("SNP variant requires alt/alternate".to_owned()) + })?; + + let target_pos = locus.start; + let pileup = observe_snp_pileup( + &self.path, + &self.options, + reference_file, + locus, + reference, + alternate, + )?; + let ref_count = pileup.filtered_ref_count; + let alt_count = pileup.filtered_alt_count; + let depth = pileup.filtered_depth; + + let evidence = pileup.evidence_lines(&describe_locus(locus), target_pos); + + Ok(VariantObservation { + backend: self.backend_name().to_owned(), + matched_rsid: variant.rsids.first().cloned(), + assembly: Some(assembly), + genotype: infer_snp_genotype(reference, alternate, ref_count, alt_count, depth), + ref_count: Some(ref_count), + alt_count: Some(alt_count), + depth: Some(depth), + raw_counts: pileup.raw_base_counts, + decision: Some(describe_snp_decision_rule( + reference, alternate, ref_count, alt_count, depth, + )), + evidence, + }) + } + + fn observe_snp_with_reader( + &self, + reader: &mut cram::io::indexed_reader::IndexedReader, + label: &str, + variant: &VariantSpec, + assembly: Assembly, + locus: &GenomicLocus, + ) -> Result { + let reference = variant + .reference + .as_deref() + .and_then(first_base) + .ok_or_else(|| { + RuntimeError::InvalidArguments("SNP variant requires ref/reference".to_owned()) + })?; + let alternate = variant + .alternate + .as_deref() + .and_then(first_base) + .ok_or_else(|| { + RuntimeError::InvalidArguments("SNP variant requires alt/alternate".to_owned()) + })?; + + let pileup = snp_pileup_with_reader( + reader, + label, + locus, + reference, + alternate, + self.options.allow_reference_md5_mismatch, + )?; + let ref_count = pileup.filtered_ref_count; + let alt_count = pileup.filtered_alt_count; + let depth = pileup.filtered_depth; + let evidence = pileup.evidence_lines(&describe_locus(locus), locus.start); + + Ok(VariantObservation { + backend: self.backend_name().to_owned(), + matched_rsid: variant.rsids.first().cloned(), + assembly: Some(assembly), + genotype: infer_snp_genotype(reference, alternate, ref_count, alt_count, depth), + ref_count: Some(ref_count), + alt_count: Some(alt_count), + depth: Some(depth), + raw_counts: pileup.raw_base_counts, + decision: Some(describe_snp_decision_rule( + reference, alternate, ref_count, alt_count, depth, + )), + evidence, + }) + } + + pub(super) fn observe_deletion( + &self, + variant: &VariantSpec, + assembly: Assembly, + locus: &GenomicLocus, + reference_file: &Path, + ) -> Result { + let deletion_length = variant.deletion_length.ok_or_else(|| { + RuntimeError::InvalidArguments("deletion variant requires deletion_length".to_owned()) + })?; + let reference = variant.reference.clone().unwrap_or_else(|| "I".to_owned()); + let alternate = variant.alternate.clone().unwrap_or_else(|| "D".to_owned()); + let anchor_pos = locus.start.saturating_sub(1); + + let mut alt_count = 0u32; + let mut ref_count = 0u32; + let mut depth = 0u32; + + alignment::for_each_cram_record( + &self.path, + &self.options, + reference_file, + &anchor_window(locus), + |record| { + if record.is_unmapped || !spans_position(&record, anchor_pos) { + return Ok(true); + } + depth += 1; + match indel_at_anchor(&record, anchor_pos) { + Some((AlignmentOpKind::Deletion, len)) if len == deletion_length => { + alt_count += 1; + } + _ => ref_count += 1, + } + Ok(true) + }, + )?; + + Ok(VariantObservation { + backend: self.backend_name().to_owned(), + matched_rsid: variant.rsids.first().cloned(), + assembly: Some(assembly), + genotype: infer_copy_number_genotype( + &reference, &alternate, ref_count, alt_count, depth, + ), + ref_count: Some(ref_count), + alt_count: Some(alt_count), + depth: Some(depth), + raw_counts: BTreeMap::new(), + decision: Some(describe_copy_number_decision_rule( + &reference, &alternate, ref_count, alt_count, depth, + )), + evidence: vec![format!( + "observed deletion anchor {}:{} len={} depth={} ref_count={} alt_count={}", + locus.chrom, anchor_pos, deletion_length, depth, ref_count, alt_count + )], + }) + } + + fn observe_deletion_with_reader( + &self, + reader: &mut cram::io::indexed_reader::IndexedReader, + label: &str, + variant: &VariantSpec, + assembly: Assembly, + locus: &GenomicLocus, + ) -> Result { + let deletion_length = variant.deletion_length.ok_or_else(|| { + RuntimeError::InvalidArguments("deletion variant requires deletion_length".to_owned()) + })?; + let reference = variant.reference.clone().unwrap_or_else(|| "I".to_owned()); + let alternate = variant.alternate.clone().unwrap_or_else(|| "D".to_owned()); + let anchor_pos = locus.start.saturating_sub(1); + + let mut alt_count = 0u32; + let mut ref_count = 0u32; + let mut depth = 0u32; + + alignment::for_each_cram_record_with_reader( + reader, + label, + &anchor_window(locus), + |record| { + if record.is_unmapped || !spans_position(&record, anchor_pos) { + return Ok(true); + } + depth += 1; + match indel_at_anchor(&record, anchor_pos) { + Some((AlignmentOpKind::Deletion, len)) if len == deletion_length => { + alt_count += 1; + } + _ => ref_count += 1, + } + Ok(true) + }, + )?; + + Ok(VariantObservation { + backend: self.backend_name().to_owned(), + matched_rsid: variant.rsids.first().cloned(), + assembly: Some(assembly), + genotype: infer_copy_number_genotype( + &reference, &alternate, ref_count, alt_count, depth, + ), + ref_count: Some(ref_count), + alt_count: Some(alt_count), + depth: Some(depth), + raw_counts: BTreeMap::new(), + decision: Some(describe_copy_number_decision_rule( + &reference, &alternate, ref_count, alt_count, depth, + )), + evidence: vec![format!( + "observed deletion anchor {}:{} len={} depth={} ref_count={} alt_count={}", + locus.chrom, anchor_pos, deletion_length, depth, ref_count, alt_count + )], + }) + } + + pub(super) fn observe_indel( + &self, + variant: &VariantSpec, + assembly: Assembly, + locus: &GenomicLocus, + reference_file: &Path, + ) -> Result { + let reference = variant.reference.clone().ok_or_else(|| { + RuntimeError::InvalidArguments("indel variant requires ref/reference".to_owned()) + })?; + let alternate = variant.alternate.clone().ok_or_else(|| { + RuntimeError::InvalidArguments("indel variant requires alt/alternate".to_owned()) + })?; + let records = + alignment::query_cram_records(&self.path, &self.options, reference_file, locus)?; + + let mut alt_count = 0u32; + let mut ref_count = 0u32; + let mut depth = 0u32; + let mut matching_alt_lengths = BTreeSet::new(); + + for record in records { + if record.is_unmapped { + continue; + } + if !record_overlaps_locus(&record, locus) { + continue; + } + let classification = + classify_expected_indel(&record, locus, reference.len(), &alternate)?; + if !classification.covering { + continue; + } + depth += 1; + if classification.matches_alt { + alt_count += 1; + matching_alt_lengths.insert(classification.observed_len); + } else if classification.reference_like { + ref_count += 1; + } + } + + let evidence_label = if matching_alt_lengths.is_empty() { + "none".to_owned() + } else { + matching_alt_lengths + .into_iter() + .map(|len| len.to_string()) + .collect::>() + .join(",") + }; + + Ok(VariantObservation { + backend: self.backend_name().to_owned(), + matched_rsid: variant.rsids.first().cloned(), + assembly: Some(assembly), + genotype: infer_copy_number_genotype( + &reference, &alternate, ref_count, alt_count, depth, + ), + ref_count: Some(ref_count), + alt_count: Some(alt_count), + depth: Some(depth), + raw_counts: BTreeMap::new(), + decision: Some(describe_copy_number_decision_rule( + &reference, &alternate, ref_count, alt_count, depth, + )), + evidence: vec![format!( + "observed indel at {} depth={} ref_count={} alt_count={} matching_alt_lengths={}", + describe_locus(locus), + depth, + ref_count, + alt_count, + evidence_label + )], + }) + } + + fn observe_indel_with_reader( + &self, + reader: &mut cram::io::indexed_reader::IndexedReader, + label: &str, + variant: &VariantSpec, + assembly: Assembly, + locus: &GenomicLocus, + ) -> Result { + let reference = variant.reference.clone().ok_or_else(|| { + RuntimeError::InvalidArguments("indel variant requires ref/reference".to_owned()) + })?; + let alternate = variant.alternate.clone().ok_or_else(|| { + RuntimeError::InvalidArguments("indel variant requires alt/alternate".to_owned()) + })?; + + super::observe_cram_indel_with_reader( + reader, + label, + locus, + &reference, + &alternate, + variant.rsids.first().cloned(), + Some(assembly), + ) + } +} diff --git a/rust/bioscript-formats/src/genotype/cram_backend/store.rs b/rust/bioscript-formats/src/genotype/cram_backend/store.rs index 7af7aca..955e33a 100644 --- a/rust/bioscript-formats/src/genotype/cram_backend/store.rs +++ b/rust/bioscript-formats/src/genotype/cram_backend/store.rs @@ -1,23 +1,10 @@ -use std::{ - collections::{BTreeMap, BTreeSet}, - fmt::Write as _, - path::Path, -}; +use std::{env, fmt::Write as _, path::Path, thread}; -use bioscript_core::{ - Assembly, GenomicLocus, RuntimeError, VariantKind, VariantObservation, VariantSpec, -}; +use bioscript_core::{RuntimeError, VariantKind, VariantObservation, VariantSpec}; -use noodles::cram; +use crate::alignment; -use crate::alignment::{self, AlignmentOpKind}; - -use super::{ - anchor_window, choose_variant_locus, classify_expected_indel, - describe_copy_number_decision_rule, describe_locus, describe_snp_decision_rule, first_base, - indel_at_anchor, infer_copy_number_genotype, infer_snp_genotype, observe_snp_pileup, - record_overlaps_locus, snp_pileup_with_reader, spans_position, -}; +use super::choose_variant_locus; use crate::genotype::{describe_query, types::CramBackend}; impl CramBackend { @@ -89,27 +76,119 @@ impl CramBackend { ))); }; + let mut indexed: Vec<(usize, &VariantSpec)> = variants.iter().enumerate().collect(); + indexed.sort_by_cached_key(|(_, variant)| crate::genotype::variant_sort_key(variant)); + + let worker_count = cram_lookup_worker_count(indexed.len()); + if worker_count <= 1 { + return self.lookup_variants_serial(reference_file, &indexed); + } + + let jobs = indexed + .into_iter() + .map(|(idx, variant)| (idx, variant.clone())) + .collect::>(); + self.lookup_variants_parallel(reference_file, &jobs, worker_count) + } + + fn lookup_variants_serial( + &self, + reference_file: &Path, + indexed: &[(usize, &VariantSpec)], + ) -> Result, RuntimeError> { let repository = alignment::build_reference_repository(reference_file)?; let mut reader = alignment::build_cram_indexed_reader_from_path(&self.path, &self.options, repository)?; let label = self.path.display().to_string(); - let mut indexed: Vec<(usize, &VariantSpec)> = variants.iter().enumerate().collect(); - indexed.sort_by_cached_key(|(_, variant)| crate::genotype::variant_sort_key(variant)); - - let mut results = vec![VariantObservation::default(); variants.len()]; + let result_len = indexed + .iter() + .map(|(idx, _)| *idx) + .max() + .map_or(0, |idx| idx + 1); + let mut results = vec![VariantObservation::default(); result_len]; for (idx, variant) in indexed { let Some((assembly, locus)) = choose_variant_locus(variant, reference_file) else { - results[idx] = self.unsupported_locus_observation(variant, reference_file); + results[*idx] = self.unsupported_locus_observation(variant, reference_file); continue; }; - results[idx] = + results[*idx] = self.observe_with_reader(&mut reader, &label, variant, assembly, &locus)?; } Ok(results) } + fn lookup_variants_parallel( + &self, + reference_file: &Path, + jobs: &[(usize, VariantSpec)], + worker_count: usize, + ) -> Result, RuntimeError> { + let result_len = jobs + .iter() + .map(|(idx, _)| *idx) + .max() + .map_or(0, |idx| idx + 1); + let chunk_size = jobs.len().div_ceil(worker_count); + let path = self.path.clone(); + let options = self.options.clone(); + let reference_file = reference_file.to_path_buf(); + + let mut handles = Vec::new(); + for chunk in jobs.chunks(chunk_size) { + let chunk = chunk.to_vec(); + let worker = CramBackend { + path: path.clone(), + options: options.clone(), + }; + let reference_file = reference_file.clone(); + handles.push(thread::spawn( + move || -> Result, RuntimeError> { + let repository = alignment::build_reference_repository(&reference_file)?; + let mut reader = alignment::build_cram_indexed_reader_from_path( + &worker.path, + &worker.options, + repository, + )?; + let label = worker.path.display().to_string(); + let mut observations = Vec::with_capacity(chunk.len()); + + for (idx, variant) in chunk { + let observation = if let Some((assembly, locus)) = + choose_variant_locus(&variant, &reference_file) + { + worker.observe_with_reader( + &mut reader, + &label, + &variant, + assembly, + &locus, + )? + } else { + worker.unsupported_locus_observation(&variant, &reference_file) + }; + observations.push((idx, observation)); + } + + Ok(observations) + }, + )); + } + + let mut results = vec![VariantObservation::default(); result_len]; + for handle in handles { + let observations = handle + .join() + .map_err(|_| RuntimeError::Io("CRAM lookup worker panicked".to_owned()))??; + for (idx, observation) in observations { + results[idx] = observation; + } + } + + Ok(results) + } + fn unsupported_locus_observation( &self, variant: &VariantSpec, @@ -130,357 +209,22 @@ impl CramBackend { ..VariantObservation::default() } } +} - fn observe_with_reader( - &self, - reader: &mut cram::io::indexed_reader::IndexedReader, - label: &str, - variant: &VariantSpec, - assembly: Assembly, - locus: &GenomicLocus, - ) -> Result { - match variant.kind.unwrap_or(VariantKind::Other) { - VariantKind::Snp => { - self.observe_snp_with_reader(reader, label, variant, assembly, locus) - } - VariantKind::Deletion => { - self.observe_deletion_with_reader(reader, label, variant, assembly, locus) - } - VariantKind::Insertion | VariantKind::Indel => { - self.observe_indel_with_reader(reader, label, variant, assembly, locus) - } - VariantKind::Other => Err(RuntimeError::Unsupported(format!( - "backend '{}' does not yet support {:?} observation for {}", - self.backend_name(), - variant.kind.unwrap_or(VariantKind::Other), - self.path.display() - ))), - } - } - - fn observe_snp( - &self, - variant: &VariantSpec, - assembly: Assembly, - locus: &GenomicLocus, - reference_file: &Path, - ) -> Result { - let reference = variant - .reference - .as_deref() - .and_then(first_base) - .ok_or_else(|| { - RuntimeError::InvalidArguments("SNP variant requires ref/reference".to_owned()) - })?; - let alternate = variant - .alternate - .as_deref() - .and_then(first_base) - .ok_or_else(|| { - RuntimeError::InvalidArguments("SNP variant requires alt/alternate".to_owned()) - })?; - - let target_pos = locus.start; - let pileup = observe_snp_pileup( - &self.path, - &self.options, - reference_file, - locus, - reference, - alternate, - )?; - let ref_count = pileup.filtered_ref_count; - let alt_count = pileup.filtered_alt_count; - let depth = pileup.filtered_depth; - - let evidence = pileup.evidence_lines(&describe_locus(locus), target_pos); - - Ok(VariantObservation { - backend: self.backend_name().to_owned(), - matched_rsid: variant.rsids.first().cloned(), - assembly: Some(assembly), - genotype: infer_snp_genotype(reference, alternate, ref_count, alt_count, depth), - ref_count: Some(ref_count), - alt_count: Some(alt_count), - depth: Some(depth), - raw_counts: pileup.raw_base_counts, - decision: Some(describe_snp_decision_rule( - reference, alternate, ref_count, alt_count, depth, - )), - evidence, - }) - } - - fn observe_snp_with_reader( - &self, - reader: &mut cram::io::indexed_reader::IndexedReader, - label: &str, - variant: &VariantSpec, - assembly: Assembly, - locus: &GenomicLocus, - ) -> Result { - let reference = variant - .reference - .as_deref() - .and_then(first_base) - .ok_or_else(|| { - RuntimeError::InvalidArguments("SNP variant requires ref/reference".to_owned()) - })?; - let alternate = variant - .alternate - .as_deref() - .and_then(first_base) - .ok_or_else(|| { - RuntimeError::InvalidArguments("SNP variant requires alt/alternate".to_owned()) - })?; - - let pileup = snp_pileup_with_reader( - reader, - label, - locus, - reference, - alternate, - self.options.allow_reference_md5_mismatch, - )?; - let ref_count = pileup.filtered_ref_count; - let alt_count = pileup.filtered_alt_count; - let depth = pileup.filtered_depth; - let evidence = pileup.evidence_lines(&describe_locus(locus), locus.start); - - Ok(VariantObservation { - backend: self.backend_name().to_owned(), - matched_rsid: variant.rsids.first().cloned(), - assembly: Some(assembly), - genotype: infer_snp_genotype(reference, alternate, ref_count, alt_count, depth), - ref_count: Some(ref_count), - alt_count: Some(alt_count), - depth: Some(depth), - raw_counts: pileup.raw_base_counts, - decision: Some(describe_snp_decision_rule( - reference, alternate, ref_count, alt_count, depth, - )), - evidence, - }) - } - - fn observe_deletion( - &self, - variant: &VariantSpec, - assembly: Assembly, - locus: &GenomicLocus, - reference_file: &Path, - ) -> Result { - let deletion_length = variant.deletion_length.ok_or_else(|| { - RuntimeError::InvalidArguments("deletion variant requires deletion_length".to_owned()) - })?; - let reference = variant.reference.clone().unwrap_or_else(|| "I".to_owned()); - let alternate = variant.alternate.clone().unwrap_or_else(|| "D".to_owned()); - let anchor_pos = locus.start.saturating_sub(1); - - let mut alt_count = 0u32; - let mut ref_count = 0u32; - let mut depth = 0u32; - - alignment::for_each_cram_record( - &self.path, - &self.options, - reference_file, - &anchor_window(locus), - |record| { - if record.is_unmapped || !spans_position(&record, anchor_pos) { - return Ok(true); - } - depth += 1; - match indel_at_anchor(&record, anchor_pos) { - Some((AlignmentOpKind::Deletion, len)) if len == deletion_length => { - alt_count += 1; - } - _ => ref_count += 1, - } - Ok(true) - }, - )?; - - Ok(VariantObservation { - backend: self.backend_name().to_owned(), - matched_rsid: variant.rsids.first().cloned(), - assembly: Some(assembly), - genotype: infer_copy_number_genotype( - &reference, &alternate, ref_count, alt_count, depth, - ), - ref_count: Some(ref_count), - alt_count: Some(alt_count), - depth: Some(depth), - raw_counts: BTreeMap::new(), - decision: Some(describe_copy_number_decision_rule( - &reference, &alternate, ref_count, alt_count, depth, - )), - evidence: vec![format!( - "observed deletion anchor {}:{} len={} depth={} ref_count={} alt_count={}", - locus.chrom, anchor_pos, deletion_length, depth, ref_count, alt_count - )], - }) - } - - fn observe_deletion_with_reader( - &self, - reader: &mut cram::io::indexed_reader::IndexedReader, - label: &str, - variant: &VariantSpec, - assembly: Assembly, - locus: &GenomicLocus, - ) -> Result { - let deletion_length = variant.deletion_length.ok_or_else(|| { - RuntimeError::InvalidArguments("deletion variant requires deletion_length".to_owned()) - })?; - let reference = variant.reference.clone().unwrap_or_else(|| "I".to_owned()); - let alternate = variant.alternate.clone().unwrap_or_else(|| "D".to_owned()); - let anchor_pos = locus.start.saturating_sub(1); - - let mut alt_count = 0u32; - let mut ref_count = 0u32; - let mut depth = 0u32; - - alignment::for_each_cram_record_with_reader( - reader, - label, - &anchor_window(locus), - |record| { - if record.is_unmapped || !spans_position(&record, anchor_pos) { - return Ok(true); - } - depth += 1; - match indel_at_anchor(&record, anchor_pos) { - Some((AlignmentOpKind::Deletion, len)) if len == deletion_length => { - alt_count += 1; - } - _ => ref_count += 1, - } - Ok(true) - }, - )?; - - Ok(VariantObservation { - backend: self.backend_name().to_owned(), - matched_rsid: variant.rsids.first().cloned(), - assembly: Some(assembly), - genotype: infer_copy_number_genotype( - &reference, &alternate, ref_count, alt_count, depth, - ), - ref_count: Some(ref_count), - alt_count: Some(alt_count), - depth: Some(depth), - raw_counts: BTreeMap::new(), - decision: Some(describe_copy_number_decision_rule( - &reference, &alternate, ref_count, alt_count, depth, - )), - evidence: vec![format!( - "observed deletion anchor {}:{} len={} depth={} ref_count={} alt_count={}", - locus.chrom, anchor_pos, deletion_length, depth, ref_count, alt_count - )], - }) - } - - fn observe_indel( - &self, - variant: &VariantSpec, - assembly: Assembly, - locus: &GenomicLocus, - reference_file: &Path, - ) -> Result { - let reference = variant.reference.clone().ok_or_else(|| { - RuntimeError::InvalidArguments("indel variant requires ref/reference".to_owned()) - })?; - let alternate = variant.alternate.clone().ok_or_else(|| { - RuntimeError::InvalidArguments("indel variant requires alt/alternate".to_owned()) - })?; - let records = - alignment::query_cram_records(&self.path, &self.options, reference_file, locus)?; - - let mut alt_count = 0u32; - let mut ref_count = 0u32; - let mut depth = 0u32; - let mut matching_alt_lengths = BTreeSet::new(); - - for record in records { - if record.is_unmapped { - continue; - } - if !record_overlaps_locus(&record, locus) { - continue; - } - let classification = - classify_expected_indel(&record, locus, reference.len(), &alternate)?; - if !classification.covering { - continue; - } - depth += 1; - if classification.matches_alt { - alt_count += 1; - matching_alt_lengths.insert(classification.observed_len); - } else if classification.reference_like { - ref_count += 1; - } - } - - let evidence_label = if matching_alt_lengths.is_empty() { - "none".to_owned() - } else { - matching_alt_lengths - .into_iter() - .map(|len| len.to_string()) - .collect::>() - .join(",") - }; +const DEFAULT_MAX_CRAM_WORKERS: usize = 16; - Ok(VariantObservation { - backend: self.backend_name().to_owned(), - matched_rsid: variant.rsids.first().cloned(), - assembly: Some(assembly), - genotype: infer_copy_number_genotype( - &reference, &alternate, ref_count, alt_count, depth, - ), - ref_count: Some(ref_count), - alt_count: Some(alt_count), - depth: Some(depth), - raw_counts: BTreeMap::new(), - decision: Some(describe_copy_number_decision_rule( - &reference, &alternate, ref_count, alt_count, depth, - )), - evidence: vec![format!( - "observed indel at {} depth={} ref_count={} alt_count={} matching_alt_lengths={}", - describe_locus(locus), - depth, - ref_count, - alt_count, - evidence_label - )], - }) +fn cram_lookup_worker_count(job_count: usize) -> usize { + if job_count <= 1 { + return 1; } - fn observe_indel_with_reader( - &self, - reader: &mut cram::io::indexed_reader::IndexedReader, - label: &str, - variant: &VariantSpec, - assembly: Assembly, - locus: &GenomicLocus, - ) -> Result { - let reference = variant.reference.clone().ok_or_else(|| { - RuntimeError::InvalidArguments("indel variant requires ref/reference".to_owned()) - })?; - let alternate = variant.alternate.clone().ok_or_else(|| { - RuntimeError::InvalidArguments("indel variant requires alt/alternate".to_owned()) - })?; + let requested = env::var("BIOSCRIPT_CRAM_THREADS") + .ok() + .and_then(|value| value.parse::().ok()) + .filter(|value| *value > 0); + let available = thread::available_parallelism().map_or(1, usize::from); - super::observe_cram_indel_with_reader( - reader, - label, - locus, - &reference, - &alternate, - variant.rsids.first().cloned(), - Some(assembly), - ) - } + requested + .unwrap_or_else(|| available.min(DEFAULT_MAX_CRAM_WORKERS)) + .min(job_count) } diff --git a/rust/bioscript-formats/src/genotype/io.rs b/rust/bioscript-formats/src/genotype/io.rs index d350fdb..c70622d 100644 --- a/rust/bioscript-formats/src/genotype/io.rs +++ b/rust/bioscript-formats/src/genotype/io.rs @@ -129,6 +129,9 @@ pub(crate) fn detect_source_format( if lower.ends_with(".cram") { return Ok(GenotypeSourceFormat::Cram); } + if lower.ends_with(".bam") { + return Ok(GenotypeSourceFormat::Bam); + } if lower.ends_with(".vcf") || lower.ends_with(".vcf.gz") { return Ok(GenotypeSourceFormat::Vcf); } diff --git a/rust/bioscript-formats/src/genotype/types.rs b/rust/bioscript-formats/src/genotype/types.rs index aad9abe..310066c 100644 --- a/rust/bioscript-formats/src/genotype/types.rs +++ b/rust/bioscript-formats/src/genotype/types.rs @@ -60,6 +60,7 @@ pub enum GenotypeSourceFormat { Zip, Vcf, Cram, + Bam, } impl FromStr for GenotypeSourceFormat { @@ -71,6 +72,7 @@ impl FromStr for GenotypeSourceFormat { "zip" => Ok(Self::Zip), "vcf" => Ok(Self::Vcf), "cram" => Ok(Self::Cram), + "bam" => Ok(Self::Bam), other => Err(format!("unsupported input format: {other}")), } } diff --git a/rust/bioscript-formats/src/genotype/vcf/matching.rs b/rust/bioscript-formats/src/genotype/vcf/matching.rs index 5a65a7e..719ad9b 100644 --- a/rust/bioscript-formats/src/genotype/vcf/matching.rs +++ b/rust/bioscript-formats/src/genotype/vcf/matching.rs @@ -37,13 +37,18 @@ pub(super) fn imputed_reference_observation( let reference = first_single_base_allele(variant.reference.as_deref())?; first_single_base_allele(variant.alternate.as_deref())?; let genotype = reference_genotype_for_locus(reference, locus, inferred_sex); + let evidence_prefix = if missing_evidence.contains(label) { + missing_evidence.to_owned() + } else { + format!("{label}: {missing_evidence}") + }; Some(VariantObservation { backend: backend_name.to_owned(), matched_rsid: variant.rsids.first().cloned(), assembly, genotype: Some(genotype), evidence: vec![format!( - "{label}: {missing_evidence} | imputed reference genotype from absent variant-only VCF record" + "{evidence_prefix} | imputed reference genotype from absent variant-only VCF record" )], ..VariantObservation::default() }) diff --git a/rust/bioscript-formats/src/prepare.rs b/rust/bioscript-formats/src/prepare.rs index 3d62a0d..a93238d 100644 --- a/rust/bioscript-formats/src/prepare.rs +++ b/rust/bioscript-formats/src/prepare.rs @@ -49,7 +49,7 @@ pub fn prepare_indexes(request: &PrepareRequest) -> Result { + (Some(path), Some(GenotypeSourceFormat::Cram | GenotypeSourceFormat::Bam)) => { Some(ensure_alignment_index(path, &cache_dir)?) } (Some(path), None) if detect_alignment_input(path) => { diff --git a/rust/bioscript-formats/tests/file_formats/basic.rs b/rust/bioscript-formats/tests/file_formats/basic.rs index 8dca8f8..906dbc9 100644 --- a/rust/bioscript-formats/tests/file_formats/basic.rs +++ b/rust/bioscript-formats/tests/file_formats/basic.rs @@ -38,9 +38,13 @@ fn genotype_source_format_parses_supported_values_and_rejects_unknowns() { "cram".parse::().unwrap(), GenotypeSourceFormat::Cram ); + assert_eq!( + "bam".parse::().unwrap(), + GenotypeSourceFormat::Bam + ); - let err = "bam".parse::().unwrap_err(); - assert_eq!(err, "unsupported input format: bam"); + let err = "plink".parse::().unwrap_err(); + assert_eq!(err, "unsupported input format: plink"); } #[test]