diff --git a/rust/bioscript-cli/src/report_html_observations.rs b/rust/bioscript-cli/src/report_html_observations.rs
index 8f68623..f5946b4 100644
--- a/rust/bioscript-cli/src/report_html_observations.rs
+++ b/rust/bioscript-cli/src/report_html_observations.rs
@@ -41,6 +41,7 @@ fn render_observation_table(
let show_facets = observations
.iter()
.any(|observation| !json_field_as_tsv(observation.get("facets")).is_empty());
+ let show_imputed_reference_note = observations.iter().any(observation_is_imputed_vcf_reference);
let headers = all_headers
.iter()
.copied()
@@ -66,6 +67,9 @@ fn render_observation_table(
out.push_str("");
}
out.push_str("");
+ if show_imputed_reference_note {
+ out.push_str("
* In variant-only VCF inputs, absent queried variant rows are shown as imputed reference genotypes. This is usually appropriate for variant-only VCFs, but it may be wrong if the VCF omits loci for another reason.
");
+ }
}
fn observation_filter_group(observation: &serde_json::Value) -> &'static str {
@@ -125,6 +129,14 @@ fn observation_row_class(observation: &serde_json::Value) -> &'static str {
fn render_observation_cell(out: &mut String, observation: &serde_json::Value, header: &str) {
let cell_class = table_column_class(header);
+ if header == "outcome" {
+ let mut value = json_field_as_tsv(observation.get(header));
+ if value == "reference" && observation_is_imputed_vcf_reference(observation) {
+ value.push('*');
+ }
+ let _ = write!(out, "{} | ", cell_class, html_escape(&value));
+ return;
+ }
if header == "ref_alt" {
class_cell(out, &observation_ref_alt(observation), "mono");
return;
@@ -184,6 +196,15 @@ fn render_observation_cell(out: &mut String, observation: &serde_json::Value, he
);
}
+fn observation_is_imputed_vcf_reference(observation: &serde_json::Value) -> bool {
+ observation
+ .get("evidence_raw")
+ .and_then(serde_json::Value::as_str)
+ .is_some_and(|evidence| {
+ evidence.contains("imputed reference genotype from absent variant-only VCF record")
+ })
+}
+
fn observation_ref_alt(observation: &serde_json::Value) -> String {
let ref_allele = observation
.get("ref")
diff --git a/rust/bioscript-cli/src/report_html_sections.rs b/rust/bioscript-cli/src/report_html_sections.rs
index 2f4e5a9..5fd80eb 100644
--- a/rust/bioscript-cli/src/report_html_sections.rs
+++ b/rust/bioscript-cli/src/report_html_sections.rs
@@ -168,6 +168,7 @@ fn render_input_debug(out: &mut String, reports: &[serde_json::Value], show_part
"Source",
"Assembly",
"Inferred Sex",
+ "VCF Ref Imputation",
"Evidence",
]);
for (idx, header) in headers.iter().enumerate() {
@@ -217,6 +218,7 @@ fn render_input_debug(out: &mut String, reports: &[serde_json::Value], show_part
value_str(sex, "method"),
]),
);
+ table_cell(out, input_debug_vcf_imputation(debug));
table_cell(out, &input_debug_evidence(debug));
out.push_str("");
}
@@ -256,6 +258,7 @@ fn render_input_debug_key_values(out: &mut String, report: &serde_json::Value) {
value_str(sex, "method"),
]),
);
+ input_debug_kv(out, "VCF ref imputation", input_debug_vcf_imputation(debug));
input_debug_kv(out, "Evidence", &input_debug_evidence(debug));
out.push_str("");
}
@@ -278,6 +281,18 @@ fn compact_join(values: &[&str]) -> String {
.join(" / ")
}
+fn input_debug_vcf_imputation(debug: &serde_json::Value) -> &'static str {
+ if debug
+ .get("vcf_missing_reference_imputation")
+ .and_then(serde_json::Value::as_bool)
+ == Some(true)
+ {
+ "used"
+ } else {
+ ""
+ }
+}
+
fn input_debug_evidence(debug: &serde_json::Value) -> String {
let mut evidence = Vec::new();
collect_string_array(debug.get("evidence"), &mut evidence);
@@ -300,4 +315,3 @@ fn collect_string_array(value: Option<&serde_json::Value>, out: &mut Vec
}
}
}
-
diff --git a/rust/bioscript-cli/src/report_output.rs b/rust/bioscript-cli/src/report_output.rs
index 6766d91..7f2eeb4 100644
--- a/rust/bioscript-cli/src/report_output.rs
+++ b/rust/bioscript-cli/src/report_output.rs
@@ -19,6 +19,20 @@ fn app_report_json(input: AppReportJsonInput<'_>) -> serde_json::Value {
item.get("call_status").and_then(serde_json::Value::as_str) == Some("called")
})
.count();
+ let input_debug = input
+ .input_inspection
+ .map(|inspection| {
+ let mut value = input_inspection_json(inspection);
+ if observations_have_imputed_vcf_references(input.observations)
+ && let Some(object) = value.as_object_mut()
+ {
+ object.insert(
+ "vcf_missing_reference_imputation".to_owned(),
+ serde_json::Value::Bool(true),
+ );
+ }
+ value
+ });
serde_json::json!({
"schema": "bioscript:report:1.0",
"version": "1.0",
@@ -29,7 +43,7 @@ fn app_report_json(input: AppReportJsonInput<'_>) -> serde_json::Value {
"input": {
"file_name": input.input_file.file_name().and_then(|value| value.to_str()).unwrap_or_default(),
"file_path": input.input_file.display().to_string(),
- "debug": input.input_inspection.map(input_inspection_json),
+ "debug": input_debug,
},
"report_status": if called == input.observations.len() { "complete" } else { "partial" },
"derived_from": input.observations.iter().filter_map(|item| item.get("variant_key").cloned()).collect::>(),
@@ -46,6 +60,17 @@ fn app_report_json(input: AppReportJsonInput<'_>) -> serde_json::Value {
})
}
+fn observations_have_imputed_vcf_references(observations: &[serde_json::Value]) -> bool {
+ observations.iter().any(|observation| {
+ observation
+ .get("evidence_raw")
+ .and_then(serde_json::Value::as_str)
+ .is_some_and(|evidence| {
+ evidence.contains("imputed reference genotype from absent variant-only VCF record")
+ })
+ })
+}
+
fn report_manifest_metadata(path: &Path) -> Result {
let text = fs::read_to_string(path)
.map_err(|err| format!("failed to read manifest metadata {}: {err}", path.display()))?;
diff --git a/rust/bioscript-formats/src/genotype.rs b/rust/bioscript-formats/src/genotype.rs
index c082921..0fb784e 100644
--- a/rust/bioscript-formats/src/genotype.rs
+++ b/rust/bioscript-formats/src/genotype.rs
@@ -84,6 +84,10 @@ impl GenotypeStore {
GenotypeSourceFormat::Zip => Self::from_zip_file(path),
GenotypeSourceFormat::Vcf => Ok(Self::from_vcf_file(path, options)),
GenotypeSourceFormat::Cram => Self::from_cram_file(path, options),
+ GenotypeSourceFormat::Bam => Err(RuntimeError::Unsupported(format!(
+ "BAM alignment lookup is not implemented yet for {}",
+ path.display()
+ ))),
}
}
@@ -361,6 +365,7 @@ impl RsidMapBackend {
GenotypeSourceFormat::Zip => "zip",
GenotypeSourceFormat::Vcf => "vcf",
GenotypeSourceFormat::Cram => "cram",
+ GenotypeSourceFormat::Bam => "bam",
}
}
@@ -392,6 +397,7 @@ impl DelimitedBackend {
GenotypeSourceFormat::Zip => "zip",
GenotypeSourceFormat::Vcf => "vcf",
GenotypeSourceFormat::Cram => "cram",
+ GenotypeSourceFormat::Bam => "bam",
}
}
@@ -939,6 +945,18 @@ mod tests {
detect_source_format(&text, Some(GenotypeSourceFormat::Cram)).unwrap(),
GenotypeSourceFormat::Cram
));
+ let bam = dir.join("sample.bam");
+ fs::write(&bam, b"BAM").unwrap();
+ assert!(matches!(
+ detect_source_format(&bam, None).unwrap(),
+ GenotypeSourceFormat::Bam
+ ));
+ assert!(
+ GenotypeStore::from_file(&bam)
+ .unwrap_err()
+ .to_string()
+ .contains("BAM alignment lookup is not implemented yet")
+ );
assert!(!looks_like_vcf_lines(&["rsid\tgenotype".to_owned()]));
let backend = DelimitedBackend {
diff --git a/rust/bioscript-formats/src/genotype/cram_backend.rs b/rust/bioscript-formats/src/genotype/cram_backend.rs
index 01787f3..30d3747 100644
--- a/rust/bioscript-formats/src/genotype/cram_backend.rs
+++ b/rust/bioscript-formats/src/genotype/cram_backend.rs
@@ -18,6 +18,7 @@ use crate::alignment;
use super::GenotypeLoadOptions;
mod indel;
+mod observation;
mod reader;
mod store;
diff --git a/rust/bioscript-formats/src/genotype/cram_backend/observation.rs b/rust/bioscript-formats/src/genotype/cram_backend/observation.rs
new file mode 100644
index 0000000..1618e15
--- /dev/null
+++ b/rust/bioscript-formats/src/genotype/cram_backend/observation.rs
@@ -0,0 +1,374 @@
+use std::{
+ collections::{BTreeMap, BTreeSet},
+ path::Path,
+};
+
+use bioscript_core::{
+ Assembly, GenomicLocus, RuntimeError, VariantKind, VariantObservation, VariantSpec,
+};
+use noodles::cram;
+
+use crate::alignment::{self, AlignmentOpKind};
+use crate::genotype::types::CramBackend;
+
+use super::{
+ anchor_window, classify_expected_indel, describe_copy_number_decision_rule, describe_locus,
+ describe_snp_decision_rule, first_base, indel_at_anchor, infer_copy_number_genotype,
+ infer_snp_genotype, observe_snp_pileup, record_overlaps_locus, snp_pileup_with_reader,
+ spans_position,
+};
+
+impl CramBackend {
+ pub(super) fn observe_with_reader(
+ &self,
+ reader: &mut cram::io::indexed_reader::IndexedReader,
+ label: &str,
+ variant: &VariantSpec,
+ assembly: Assembly,
+ locus: &GenomicLocus,
+ ) -> Result {
+ match variant.kind.unwrap_or(VariantKind::Other) {
+ VariantKind::Snp => {
+ self.observe_snp_with_reader(reader, label, variant, assembly, locus)
+ }
+ VariantKind::Deletion => {
+ self.observe_deletion_with_reader(reader, label, variant, assembly, locus)
+ }
+ VariantKind::Insertion | VariantKind::Indel => {
+ self.observe_indel_with_reader(reader, label, variant, assembly, locus)
+ }
+ VariantKind::Other => Err(RuntimeError::Unsupported(format!(
+ "backend '{}' does not yet support {:?} observation for {}",
+ self.backend_name(),
+ variant.kind.unwrap_or(VariantKind::Other),
+ self.path.display()
+ ))),
+ }
+ }
+
+ pub(super) fn observe_snp(
+ &self,
+ variant: &VariantSpec,
+ assembly: Assembly,
+ locus: &GenomicLocus,
+ reference_file: &Path,
+ ) -> Result {
+ let reference = variant
+ .reference
+ .as_deref()
+ .and_then(first_base)
+ .ok_or_else(|| {
+ RuntimeError::InvalidArguments("SNP variant requires ref/reference".to_owned())
+ })?;
+ let alternate = variant
+ .alternate
+ .as_deref()
+ .and_then(first_base)
+ .ok_or_else(|| {
+ RuntimeError::InvalidArguments("SNP variant requires alt/alternate".to_owned())
+ })?;
+
+ let target_pos = locus.start;
+ let pileup = observe_snp_pileup(
+ &self.path,
+ &self.options,
+ reference_file,
+ locus,
+ reference,
+ alternate,
+ )?;
+ let ref_count = pileup.filtered_ref_count;
+ let alt_count = pileup.filtered_alt_count;
+ let depth = pileup.filtered_depth;
+
+ let evidence = pileup.evidence_lines(&describe_locus(locus), target_pos);
+
+ Ok(VariantObservation {
+ backend: self.backend_name().to_owned(),
+ matched_rsid: variant.rsids.first().cloned(),
+ assembly: Some(assembly),
+ genotype: infer_snp_genotype(reference, alternate, ref_count, alt_count, depth),
+ ref_count: Some(ref_count),
+ alt_count: Some(alt_count),
+ depth: Some(depth),
+ raw_counts: pileup.raw_base_counts,
+ decision: Some(describe_snp_decision_rule(
+ reference, alternate, ref_count, alt_count, depth,
+ )),
+ evidence,
+ })
+ }
+
+ fn observe_snp_with_reader(
+ &self,
+ reader: &mut cram::io::indexed_reader::IndexedReader,
+ label: &str,
+ variant: &VariantSpec,
+ assembly: Assembly,
+ locus: &GenomicLocus,
+ ) -> Result {
+ let reference = variant
+ .reference
+ .as_deref()
+ .and_then(first_base)
+ .ok_or_else(|| {
+ RuntimeError::InvalidArguments("SNP variant requires ref/reference".to_owned())
+ })?;
+ let alternate = variant
+ .alternate
+ .as_deref()
+ .and_then(first_base)
+ .ok_or_else(|| {
+ RuntimeError::InvalidArguments("SNP variant requires alt/alternate".to_owned())
+ })?;
+
+ let pileup = snp_pileup_with_reader(
+ reader,
+ label,
+ locus,
+ reference,
+ alternate,
+ self.options.allow_reference_md5_mismatch,
+ )?;
+ let ref_count = pileup.filtered_ref_count;
+ let alt_count = pileup.filtered_alt_count;
+ let depth = pileup.filtered_depth;
+ let evidence = pileup.evidence_lines(&describe_locus(locus), locus.start);
+
+ Ok(VariantObservation {
+ backend: self.backend_name().to_owned(),
+ matched_rsid: variant.rsids.first().cloned(),
+ assembly: Some(assembly),
+ genotype: infer_snp_genotype(reference, alternate, ref_count, alt_count, depth),
+ ref_count: Some(ref_count),
+ alt_count: Some(alt_count),
+ depth: Some(depth),
+ raw_counts: pileup.raw_base_counts,
+ decision: Some(describe_snp_decision_rule(
+ reference, alternate, ref_count, alt_count, depth,
+ )),
+ evidence,
+ })
+ }
+
+ pub(super) fn observe_deletion(
+ &self,
+ variant: &VariantSpec,
+ assembly: Assembly,
+ locus: &GenomicLocus,
+ reference_file: &Path,
+ ) -> Result {
+ let deletion_length = variant.deletion_length.ok_or_else(|| {
+ RuntimeError::InvalidArguments("deletion variant requires deletion_length".to_owned())
+ })?;
+ let reference = variant.reference.clone().unwrap_or_else(|| "I".to_owned());
+ let alternate = variant.alternate.clone().unwrap_or_else(|| "D".to_owned());
+ let anchor_pos = locus.start.saturating_sub(1);
+
+ let mut alt_count = 0u32;
+ let mut ref_count = 0u32;
+ let mut depth = 0u32;
+
+ alignment::for_each_cram_record(
+ &self.path,
+ &self.options,
+ reference_file,
+ &anchor_window(locus),
+ |record| {
+ if record.is_unmapped || !spans_position(&record, anchor_pos) {
+ return Ok(true);
+ }
+ depth += 1;
+ match indel_at_anchor(&record, anchor_pos) {
+ Some((AlignmentOpKind::Deletion, len)) if len == deletion_length => {
+ alt_count += 1;
+ }
+ _ => ref_count += 1,
+ }
+ Ok(true)
+ },
+ )?;
+
+ Ok(VariantObservation {
+ backend: self.backend_name().to_owned(),
+ matched_rsid: variant.rsids.first().cloned(),
+ assembly: Some(assembly),
+ genotype: infer_copy_number_genotype(
+ &reference, &alternate, ref_count, alt_count, depth,
+ ),
+ ref_count: Some(ref_count),
+ alt_count: Some(alt_count),
+ depth: Some(depth),
+ raw_counts: BTreeMap::new(),
+ decision: Some(describe_copy_number_decision_rule(
+ &reference, &alternate, ref_count, alt_count, depth,
+ )),
+ evidence: vec![format!(
+ "observed deletion anchor {}:{} len={} depth={} ref_count={} alt_count={}",
+ locus.chrom, anchor_pos, deletion_length, depth, ref_count, alt_count
+ )],
+ })
+ }
+
+ fn observe_deletion_with_reader(
+ &self,
+ reader: &mut cram::io::indexed_reader::IndexedReader,
+ label: &str,
+ variant: &VariantSpec,
+ assembly: Assembly,
+ locus: &GenomicLocus,
+ ) -> Result {
+ let deletion_length = variant.deletion_length.ok_or_else(|| {
+ RuntimeError::InvalidArguments("deletion variant requires deletion_length".to_owned())
+ })?;
+ let reference = variant.reference.clone().unwrap_or_else(|| "I".to_owned());
+ let alternate = variant.alternate.clone().unwrap_or_else(|| "D".to_owned());
+ let anchor_pos = locus.start.saturating_sub(1);
+
+ let mut alt_count = 0u32;
+ let mut ref_count = 0u32;
+ let mut depth = 0u32;
+
+ alignment::for_each_cram_record_with_reader(
+ reader,
+ label,
+ &anchor_window(locus),
+ |record| {
+ if record.is_unmapped || !spans_position(&record, anchor_pos) {
+ return Ok(true);
+ }
+ depth += 1;
+ match indel_at_anchor(&record, anchor_pos) {
+ Some((AlignmentOpKind::Deletion, len)) if len == deletion_length => {
+ alt_count += 1;
+ }
+ _ => ref_count += 1,
+ }
+ Ok(true)
+ },
+ )?;
+
+ Ok(VariantObservation {
+ backend: self.backend_name().to_owned(),
+ matched_rsid: variant.rsids.first().cloned(),
+ assembly: Some(assembly),
+ genotype: infer_copy_number_genotype(
+ &reference, &alternate, ref_count, alt_count, depth,
+ ),
+ ref_count: Some(ref_count),
+ alt_count: Some(alt_count),
+ depth: Some(depth),
+ raw_counts: BTreeMap::new(),
+ decision: Some(describe_copy_number_decision_rule(
+ &reference, &alternate, ref_count, alt_count, depth,
+ )),
+ evidence: vec![format!(
+ "observed deletion anchor {}:{} len={} depth={} ref_count={} alt_count={}",
+ locus.chrom, anchor_pos, deletion_length, depth, ref_count, alt_count
+ )],
+ })
+ }
+
+ pub(super) fn observe_indel(
+ &self,
+ variant: &VariantSpec,
+ assembly: Assembly,
+ locus: &GenomicLocus,
+ reference_file: &Path,
+ ) -> Result {
+ let reference = variant.reference.clone().ok_or_else(|| {
+ RuntimeError::InvalidArguments("indel variant requires ref/reference".to_owned())
+ })?;
+ let alternate = variant.alternate.clone().ok_or_else(|| {
+ RuntimeError::InvalidArguments("indel variant requires alt/alternate".to_owned())
+ })?;
+ let records =
+ alignment::query_cram_records(&self.path, &self.options, reference_file, locus)?;
+
+ let mut alt_count = 0u32;
+ let mut ref_count = 0u32;
+ let mut depth = 0u32;
+ let mut matching_alt_lengths = BTreeSet::new();
+
+ for record in records {
+ if record.is_unmapped {
+ continue;
+ }
+ if !record_overlaps_locus(&record, locus) {
+ continue;
+ }
+ let classification =
+ classify_expected_indel(&record, locus, reference.len(), &alternate)?;
+ if !classification.covering {
+ continue;
+ }
+ depth += 1;
+ if classification.matches_alt {
+ alt_count += 1;
+ matching_alt_lengths.insert(classification.observed_len);
+ } else if classification.reference_like {
+ ref_count += 1;
+ }
+ }
+
+ let evidence_label = if matching_alt_lengths.is_empty() {
+ "none".to_owned()
+ } else {
+ matching_alt_lengths
+ .into_iter()
+ .map(|len| len.to_string())
+ .collect::>()
+ .join(",")
+ };
+
+ Ok(VariantObservation {
+ backend: self.backend_name().to_owned(),
+ matched_rsid: variant.rsids.first().cloned(),
+ assembly: Some(assembly),
+ genotype: infer_copy_number_genotype(
+ &reference, &alternate, ref_count, alt_count, depth,
+ ),
+ ref_count: Some(ref_count),
+ alt_count: Some(alt_count),
+ depth: Some(depth),
+ raw_counts: BTreeMap::new(),
+ decision: Some(describe_copy_number_decision_rule(
+ &reference, &alternate, ref_count, alt_count, depth,
+ )),
+ evidence: vec![format!(
+ "observed indel at {} depth={} ref_count={} alt_count={} matching_alt_lengths={}",
+ describe_locus(locus),
+ depth,
+ ref_count,
+ alt_count,
+ evidence_label
+ )],
+ })
+ }
+
+ fn observe_indel_with_reader(
+ &self,
+ reader: &mut cram::io::indexed_reader::IndexedReader,
+ label: &str,
+ variant: &VariantSpec,
+ assembly: Assembly,
+ locus: &GenomicLocus,
+ ) -> Result {
+ let reference = variant.reference.clone().ok_or_else(|| {
+ RuntimeError::InvalidArguments("indel variant requires ref/reference".to_owned())
+ })?;
+ let alternate = variant.alternate.clone().ok_or_else(|| {
+ RuntimeError::InvalidArguments("indel variant requires alt/alternate".to_owned())
+ })?;
+
+ super::observe_cram_indel_with_reader(
+ reader,
+ label,
+ locus,
+ &reference,
+ &alternate,
+ variant.rsids.first().cloned(),
+ Some(assembly),
+ )
+ }
+}
diff --git a/rust/bioscript-formats/src/genotype/cram_backend/store.rs b/rust/bioscript-formats/src/genotype/cram_backend/store.rs
index 7af7aca..955e33a 100644
--- a/rust/bioscript-formats/src/genotype/cram_backend/store.rs
+++ b/rust/bioscript-formats/src/genotype/cram_backend/store.rs
@@ -1,23 +1,10 @@
-use std::{
- collections::{BTreeMap, BTreeSet},
- fmt::Write as _,
- path::Path,
-};
+use std::{env, fmt::Write as _, path::Path, thread};
-use bioscript_core::{
- Assembly, GenomicLocus, RuntimeError, VariantKind, VariantObservation, VariantSpec,
-};
+use bioscript_core::{RuntimeError, VariantKind, VariantObservation, VariantSpec};
-use noodles::cram;
+use crate::alignment;
-use crate::alignment::{self, AlignmentOpKind};
-
-use super::{
- anchor_window, choose_variant_locus, classify_expected_indel,
- describe_copy_number_decision_rule, describe_locus, describe_snp_decision_rule, first_base,
- indel_at_anchor, infer_copy_number_genotype, infer_snp_genotype, observe_snp_pileup,
- record_overlaps_locus, snp_pileup_with_reader, spans_position,
-};
+use super::choose_variant_locus;
use crate::genotype::{describe_query, types::CramBackend};
impl CramBackend {
@@ -89,27 +76,119 @@ impl CramBackend {
)));
};
+ let mut indexed: Vec<(usize, &VariantSpec)> = variants.iter().enumerate().collect();
+ indexed.sort_by_cached_key(|(_, variant)| crate::genotype::variant_sort_key(variant));
+
+ let worker_count = cram_lookup_worker_count(indexed.len());
+ if worker_count <= 1 {
+ return self.lookup_variants_serial(reference_file, &indexed);
+ }
+
+ let jobs = indexed
+ .into_iter()
+ .map(|(idx, variant)| (idx, variant.clone()))
+ .collect::>();
+ self.lookup_variants_parallel(reference_file, &jobs, worker_count)
+ }
+
+ fn lookup_variants_serial(
+ &self,
+ reference_file: &Path,
+ indexed: &[(usize, &VariantSpec)],
+ ) -> Result, RuntimeError> {
let repository = alignment::build_reference_repository(reference_file)?;
let mut reader =
alignment::build_cram_indexed_reader_from_path(&self.path, &self.options, repository)?;
let label = self.path.display().to_string();
- let mut indexed: Vec<(usize, &VariantSpec)> = variants.iter().enumerate().collect();
- indexed.sort_by_cached_key(|(_, variant)| crate::genotype::variant_sort_key(variant));
-
- let mut results = vec![VariantObservation::default(); variants.len()];
+ let result_len = indexed
+ .iter()
+ .map(|(idx, _)| *idx)
+ .max()
+ .map_or(0, |idx| idx + 1);
+ let mut results = vec![VariantObservation::default(); result_len];
for (idx, variant) in indexed {
let Some((assembly, locus)) = choose_variant_locus(variant, reference_file) else {
- results[idx] = self.unsupported_locus_observation(variant, reference_file);
+ results[*idx] = self.unsupported_locus_observation(variant, reference_file);
continue;
};
- results[idx] =
+ results[*idx] =
self.observe_with_reader(&mut reader, &label, variant, assembly, &locus)?;
}
Ok(results)
}
+ fn lookup_variants_parallel(
+ &self,
+ reference_file: &Path,
+ jobs: &[(usize, VariantSpec)],
+ worker_count: usize,
+ ) -> Result, RuntimeError> {
+ let result_len = jobs
+ .iter()
+ .map(|(idx, _)| *idx)
+ .max()
+ .map_or(0, |idx| idx + 1);
+ let chunk_size = jobs.len().div_ceil(worker_count);
+ let path = self.path.clone();
+ let options = self.options.clone();
+ let reference_file = reference_file.to_path_buf();
+
+ let mut handles = Vec::new();
+ for chunk in jobs.chunks(chunk_size) {
+ let chunk = chunk.to_vec();
+ let worker = CramBackend {
+ path: path.clone(),
+ options: options.clone(),
+ };
+ let reference_file = reference_file.clone();
+ handles.push(thread::spawn(
+ move || -> Result, RuntimeError> {
+ let repository = alignment::build_reference_repository(&reference_file)?;
+ let mut reader = alignment::build_cram_indexed_reader_from_path(
+ &worker.path,
+ &worker.options,
+ repository,
+ )?;
+ let label = worker.path.display().to_string();
+ let mut observations = Vec::with_capacity(chunk.len());
+
+ for (idx, variant) in chunk {
+ let observation = if let Some((assembly, locus)) =
+ choose_variant_locus(&variant, &reference_file)
+ {
+ worker.observe_with_reader(
+ &mut reader,
+ &label,
+ &variant,
+ assembly,
+ &locus,
+ )?
+ } else {
+ worker.unsupported_locus_observation(&variant, &reference_file)
+ };
+ observations.push((idx, observation));
+ }
+
+ Ok(observations)
+ },
+ ));
+ }
+
+ let mut results = vec![VariantObservation::default(); result_len];
+ for handle in handles {
+ let observations = handle
+ .join()
+ .map_err(|_| RuntimeError::Io("CRAM lookup worker panicked".to_owned()))??;
+ for (idx, observation) in observations {
+ results[idx] = observation;
+ }
+ }
+
+ Ok(results)
+ }
+
fn unsupported_locus_observation(
&self,
variant: &VariantSpec,
@@ -130,357 +209,22 @@ impl CramBackend {
..VariantObservation::default()
}
}
+}
- fn observe_with_reader(
- &self,
- reader: &mut cram::io::indexed_reader::IndexedReader,
- label: &str,
- variant: &VariantSpec,
- assembly: Assembly,
- locus: &GenomicLocus,
- ) -> Result {
- match variant.kind.unwrap_or(VariantKind::Other) {
- VariantKind::Snp => {
- self.observe_snp_with_reader(reader, label, variant, assembly, locus)
- }
- VariantKind::Deletion => {
- self.observe_deletion_with_reader(reader, label, variant, assembly, locus)
- }
- VariantKind::Insertion | VariantKind::Indel => {
- self.observe_indel_with_reader(reader, label, variant, assembly, locus)
- }
- VariantKind::Other => Err(RuntimeError::Unsupported(format!(
- "backend '{}' does not yet support {:?} observation for {}",
- self.backend_name(),
- variant.kind.unwrap_or(VariantKind::Other),
- self.path.display()
- ))),
- }
- }
-
- fn observe_snp(
- &self,
- variant: &VariantSpec,
- assembly: Assembly,
- locus: &GenomicLocus,
- reference_file: &Path,
- ) -> Result {
- let reference = variant
- .reference
- .as_deref()
- .and_then(first_base)
- .ok_or_else(|| {
- RuntimeError::InvalidArguments("SNP variant requires ref/reference".to_owned())
- })?;
- let alternate = variant
- .alternate
- .as_deref()
- .and_then(first_base)
- .ok_or_else(|| {
- RuntimeError::InvalidArguments("SNP variant requires alt/alternate".to_owned())
- })?;
-
- let target_pos = locus.start;
- let pileup = observe_snp_pileup(
- &self.path,
- &self.options,
- reference_file,
- locus,
- reference,
- alternate,
- )?;
- let ref_count = pileup.filtered_ref_count;
- let alt_count = pileup.filtered_alt_count;
- let depth = pileup.filtered_depth;
-
- let evidence = pileup.evidence_lines(&describe_locus(locus), target_pos);
-
- Ok(VariantObservation {
- backend: self.backend_name().to_owned(),
- matched_rsid: variant.rsids.first().cloned(),
- assembly: Some(assembly),
- genotype: infer_snp_genotype(reference, alternate, ref_count, alt_count, depth),
- ref_count: Some(ref_count),
- alt_count: Some(alt_count),
- depth: Some(depth),
- raw_counts: pileup.raw_base_counts,
- decision: Some(describe_snp_decision_rule(
- reference, alternate, ref_count, alt_count, depth,
- )),
- evidence,
- })
- }
-
- fn observe_snp_with_reader(
- &self,
- reader: &mut cram::io::indexed_reader::IndexedReader,
- label: &str,
- variant: &VariantSpec,
- assembly: Assembly,
- locus: &GenomicLocus,
- ) -> Result {
- let reference = variant
- .reference
- .as_deref()
- .and_then(first_base)
- .ok_or_else(|| {
- RuntimeError::InvalidArguments("SNP variant requires ref/reference".to_owned())
- })?;
- let alternate = variant
- .alternate
- .as_deref()
- .and_then(first_base)
- .ok_or_else(|| {
- RuntimeError::InvalidArguments("SNP variant requires alt/alternate".to_owned())
- })?;
-
- let pileup = snp_pileup_with_reader(
- reader,
- label,
- locus,
- reference,
- alternate,
- self.options.allow_reference_md5_mismatch,
- )?;
- let ref_count = pileup.filtered_ref_count;
- let alt_count = pileup.filtered_alt_count;
- let depth = pileup.filtered_depth;
- let evidence = pileup.evidence_lines(&describe_locus(locus), locus.start);
-
- Ok(VariantObservation {
- backend: self.backend_name().to_owned(),
- matched_rsid: variant.rsids.first().cloned(),
- assembly: Some(assembly),
- genotype: infer_snp_genotype(reference, alternate, ref_count, alt_count, depth),
- ref_count: Some(ref_count),
- alt_count: Some(alt_count),
- depth: Some(depth),
- raw_counts: pileup.raw_base_counts,
- decision: Some(describe_snp_decision_rule(
- reference, alternate, ref_count, alt_count, depth,
- )),
- evidence,
- })
- }
-
- fn observe_deletion(
- &self,
- variant: &VariantSpec,
- assembly: Assembly,
- locus: &GenomicLocus,
- reference_file: &Path,
- ) -> Result {
- let deletion_length = variant.deletion_length.ok_or_else(|| {
- RuntimeError::InvalidArguments("deletion variant requires deletion_length".to_owned())
- })?;
- let reference = variant.reference.clone().unwrap_or_else(|| "I".to_owned());
- let alternate = variant.alternate.clone().unwrap_or_else(|| "D".to_owned());
- let anchor_pos = locus.start.saturating_sub(1);
-
- let mut alt_count = 0u32;
- let mut ref_count = 0u32;
- let mut depth = 0u32;
-
- alignment::for_each_cram_record(
- &self.path,
- &self.options,
- reference_file,
- &anchor_window(locus),
- |record| {
- if record.is_unmapped || !spans_position(&record, anchor_pos) {
- return Ok(true);
- }
- depth += 1;
- match indel_at_anchor(&record, anchor_pos) {
- Some((AlignmentOpKind::Deletion, len)) if len == deletion_length => {
- alt_count += 1;
- }
- _ => ref_count += 1,
- }
- Ok(true)
- },
- )?;
-
- Ok(VariantObservation {
- backend: self.backend_name().to_owned(),
- matched_rsid: variant.rsids.first().cloned(),
- assembly: Some(assembly),
- genotype: infer_copy_number_genotype(
- &reference, &alternate, ref_count, alt_count, depth,
- ),
- ref_count: Some(ref_count),
- alt_count: Some(alt_count),
- depth: Some(depth),
- raw_counts: BTreeMap::new(),
- decision: Some(describe_copy_number_decision_rule(
- &reference, &alternate, ref_count, alt_count, depth,
- )),
- evidence: vec![format!(
- "observed deletion anchor {}:{} len={} depth={} ref_count={} alt_count={}",
- locus.chrom, anchor_pos, deletion_length, depth, ref_count, alt_count
- )],
- })
- }
-
- fn observe_deletion_with_reader(
- &self,
- reader: &mut cram::io::indexed_reader::IndexedReader,
- label: &str,
- variant: &VariantSpec,
- assembly: Assembly,
- locus: &GenomicLocus,
- ) -> Result {
- let deletion_length = variant.deletion_length.ok_or_else(|| {
- RuntimeError::InvalidArguments("deletion variant requires deletion_length".to_owned())
- })?;
- let reference = variant.reference.clone().unwrap_or_else(|| "I".to_owned());
- let alternate = variant.alternate.clone().unwrap_or_else(|| "D".to_owned());
- let anchor_pos = locus.start.saturating_sub(1);
-
- let mut alt_count = 0u32;
- let mut ref_count = 0u32;
- let mut depth = 0u32;
-
- alignment::for_each_cram_record_with_reader(
- reader,
- label,
- &anchor_window(locus),
- |record| {
- if record.is_unmapped || !spans_position(&record, anchor_pos) {
- return Ok(true);
- }
- depth += 1;
- match indel_at_anchor(&record, anchor_pos) {
- Some((AlignmentOpKind::Deletion, len)) if len == deletion_length => {
- alt_count += 1;
- }
- _ => ref_count += 1,
- }
- Ok(true)
- },
- )?;
-
- Ok(VariantObservation {
- backend: self.backend_name().to_owned(),
- matched_rsid: variant.rsids.first().cloned(),
- assembly: Some(assembly),
- genotype: infer_copy_number_genotype(
- &reference, &alternate, ref_count, alt_count, depth,
- ),
- ref_count: Some(ref_count),
- alt_count: Some(alt_count),
- depth: Some(depth),
- raw_counts: BTreeMap::new(),
- decision: Some(describe_copy_number_decision_rule(
- &reference, &alternate, ref_count, alt_count, depth,
- )),
- evidence: vec![format!(
- "observed deletion anchor {}:{} len={} depth={} ref_count={} alt_count={}",
- locus.chrom, anchor_pos, deletion_length, depth, ref_count, alt_count
- )],
- })
- }
-
- fn observe_indel(
- &self,
- variant: &VariantSpec,
- assembly: Assembly,
- locus: &GenomicLocus,
- reference_file: &Path,
- ) -> Result {
- let reference = variant.reference.clone().ok_or_else(|| {
- RuntimeError::InvalidArguments("indel variant requires ref/reference".to_owned())
- })?;
- let alternate = variant.alternate.clone().ok_or_else(|| {
- RuntimeError::InvalidArguments("indel variant requires alt/alternate".to_owned())
- })?;
- let records =
- alignment::query_cram_records(&self.path, &self.options, reference_file, locus)?;
-
- let mut alt_count = 0u32;
- let mut ref_count = 0u32;
- let mut depth = 0u32;
- let mut matching_alt_lengths = BTreeSet::new();
-
- for record in records {
- if record.is_unmapped {
- continue;
- }
- if !record_overlaps_locus(&record, locus) {
- continue;
- }
- let classification =
- classify_expected_indel(&record, locus, reference.len(), &alternate)?;
- if !classification.covering {
- continue;
- }
- depth += 1;
- if classification.matches_alt {
- alt_count += 1;
- matching_alt_lengths.insert(classification.observed_len);
- } else if classification.reference_like {
- ref_count += 1;
- }
- }
-
- let evidence_label = if matching_alt_lengths.is_empty() {
- "none".to_owned()
- } else {
- matching_alt_lengths
- .into_iter()
- .map(|len| len.to_string())
- .collect::>()
- .join(",")
- };
+const DEFAULT_MAX_CRAM_WORKERS: usize = 16;
- Ok(VariantObservation {
- backend: self.backend_name().to_owned(),
- matched_rsid: variant.rsids.first().cloned(),
- assembly: Some(assembly),
- genotype: infer_copy_number_genotype(
- &reference, &alternate, ref_count, alt_count, depth,
- ),
- ref_count: Some(ref_count),
- alt_count: Some(alt_count),
- depth: Some(depth),
- raw_counts: BTreeMap::new(),
- decision: Some(describe_copy_number_decision_rule(
- &reference, &alternate, ref_count, alt_count, depth,
- )),
- evidence: vec![format!(
- "observed indel at {} depth={} ref_count={} alt_count={} matching_alt_lengths={}",
- describe_locus(locus),
- depth,
- ref_count,
- alt_count,
- evidence_label
- )],
- })
+fn cram_lookup_worker_count(job_count: usize) -> usize {
+ if job_count <= 1 {
+ return 1;
}
- fn observe_indel_with_reader(
- &self,
- reader: &mut cram::io::indexed_reader::IndexedReader,
- label: &str,
- variant: &VariantSpec,
- assembly: Assembly,
- locus: &GenomicLocus,
- ) -> Result {
- let reference = variant.reference.clone().ok_or_else(|| {
- RuntimeError::InvalidArguments("indel variant requires ref/reference".to_owned())
- })?;
- let alternate = variant.alternate.clone().ok_or_else(|| {
- RuntimeError::InvalidArguments("indel variant requires alt/alternate".to_owned())
- })?;
+ let requested = env::var("BIOSCRIPT_CRAM_THREADS")
+ .ok()
+ .and_then(|value| value.parse::().ok())
+ .filter(|value| *value > 0);
+ let available = thread::available_parallelism().map_or(1, usize::from);
- super::observe_cram_indel_with_reader(
- reader,
- label,
- locus,
- &reference,
- &alternate,
- variant.rsids.first().cloned(),
- Some(assembly),
- )
- }
+ requested
+ .unwrap_or_else(|| available.min(DEFAULT_MAX_CRAM_WORKERS))
+ .min(job_count)
}
diff --git a/rust/bioscript-formats/src/genotype/io.rs b/rust/bioscript-formats/src/genotype/io.rs
index d350fdb..c70622d 100644
--- a/rust/bioscript-formats/src/genotype/io.rs
+++ b/rust/bioscript-formats/src/genotype/io.rs
@@ -129,6 +129,9 @@ pub(crate) fn detect_source_format(
if lower.ends_with(".cram") {
return Ok(GenotypeSourceFormat::Cram);
}
+ if lower.ends_with(".bam") {
+ return Ok(GenotypeSourceFormat::Bam);
+ }
if lower.ends_with(".vcf") || lower.ends_with(".vcf.gz") {
return Ok(GenotypeSourceFormat::Vcf);
}
diff --git a/rust/bioscript-formats/src/genotype/types.rs b/rust/bioscript-formats/src/genotype/types.rs
index aad9abe..310066c 100644
--- a/rust/bioscript-formats/src/genotype/types.rs
+++ b/rust/bioscript-formats/src/genotype/types.rs
@@ -60,6 +60,7 @@ pub enum GenotypeSourceFormat {
Zip,
Vcf,
Cram,
+ Bam,
}
impl FromStr for GenotypeSourceFormat {
@@ -71,6 +72,7 @@ impl FromStr for GenotypeSourceFormat {
"zip" => Ok(Self::Zip),
"vcf" => Ok(Self::Vcf),
"cram" => Ok(Self::Cram),
+ "bam" => Ok(Self::Bam),
other => Err(format!("unsupported input format: {other}")),
}
}
diff --git a/rust/bioscript-formats/src/genotype/vcf/matching.rs b/rust/bioscript-formats/src/genotype/vcf/matching.rs
index 5a65a7e..719ad9b 100644
--- a/rust/bioscript-formats/src/genotype/vcf/matching.rs
+++ b/rust/bioscript-formats/src/genotype/vcf/matching.rs
@@ -37,13 +37,18 @@ pub(super) fn imputed_reference_observation(
let reference = first_single_base_allele(variant.reference.as_deref())?;
first_single_base_allele(variant.alternate.as_deref())?;
let genotype = reference_genotype_for_locus(reference, locus, inferred_sex);
+ let evidence_prefix = if missing_evidence.contains(label) {
+ missing_evidence.to_owned()
+ } else {
+ format!("{label}: {missing_evidence}")
+ };
Some(VariantObservation {
backend: backend_name.to_owned(),
matched_rsid: variant.rsids.first().cloned(),
assembly,
genotype: Some(genotype),
evidence: vec![format!(
- "{label}: {missing_evidence} | imputed reference genotype from absent variant-only VCF record"
+ "{evidence_prefix} | imputed reference genotype from absent variant-only VCF record"
)],
..VariantObservation::default()
})
diff --git a/rust/bioscript-formats/src/prepare.rs b/rust/bioscript-formats/src/prepare.rs
index 3d62a0d..a93238d 100644
--- a/rust/bioscript-formats/src/prepare.rs
+++ b/rust/bioscript-formats/src/prepare.rs
@@ -49,7 +49,7 @@ pub fn prepare_indexes(request: &PrepareRequest) -> Result {
+ (Some(path), Some(GenotypeSourceFormat::Cram | GenotypeSourceFormat::Bam)) => {
Some(ensure_alignment_index(path, &cache_dir)?)
}
(Some(path), None) if detect_alignment_input(path) => {
diff --git a/rust/bioscript-formats/tests/file_formats/basic.rs b/rust/bioscript-formats/tests/file_formats/basic.rs
index 8dca8f8..906dbc9 100644
--- a/rust/bioscript-formats/tests/file_formats/basic.rs
+++ b/rust/bioscript-formats/tests/file_formats/basic.rs
@@ -38,9 +38,13 @@ fn genotype_source_format_parses_supported_values_and_rejects_unknowns() {
"cram".parse::().unwrap(),
GenotypeSourceFormat::Cram
);
+ assert_eq!(
+ "bam".parse::().unwrap(),
+ GenotypeSourceFormat::Bam
+ );
- let err = "bam".parse::().unwrap_err();
- assert_eq!(err, "unsupported input format: bam");
+ let err = "plink".parse::().unwrap_err();
+ assert_eq!(err, "unsupported input format: plink");
}
#[test]