Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 138 additions & 89 deletions rust/bioscript-formats/src/genotype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use zip::ZipArchive;
use bioscript_core::{Assembly, GenomicLocus, VariantKind};
use bioscript_core::{RuntimeError, VariantObservation, VariantSpec};

mod backends;
mod common;
mod cram_backend;
mod delimited;
Expand Down Expand Up @@ -51,12 +52,14 @@ pub use types::{
BackendCapabilities, GenotypeLoadOptions, GenotypeSourceFormat, GenotypeStore, QueryKind,
};
use types::{CramBackend, DelimitedBackend, QueryBackend, RsidMapBackend, VcfBackend};
pub use vcf::observe_vcf_snp_with_reader;
#[cfg(test)]
use vcf::{
choose_variant_locus_for_assembly, detect_vcf_assembly, extract_vcf_sample_genotype,
normalize_chromosome_name, parse_vcf_record, vcf_row_matches_variant,
};
pub use vcf::{
imputed_reference_observation, observe_vcf_snp_with_reader, observe_vcf_variant_with_reader,
};
use vcf::{lookup_indexed_vcf_variants, scan_vcf_variants};
use vcf_tokens::genotype_from_vcf_gt;
#[cfg(test)]
Expand All @@ -71,6 +74,38 @@ impl GenotypeStore {
Self::from_file_with_options(path, &GenotypeLoadOptions::default())
}

/// Wrap an existing store with a cache of pre-resolved observations.
/// `lookup_variant`/`lookup_variants` consult `observations` first; on
/// miss they delegate to the underlying store. This is how the report
/// pipeline avoids re-walking the genome inside analysis Python scripts —
/// the variants the panel/assays declared have already been observed in
/// `run_manifest_rows`, so the cache hits and the fallback only fires
/// for novel rsids the script asks about on its own.
pub fn with_cached_observations(
observations: Vec<VariantObservation>,
fallback: GenotypeStore,
) -> Self {
Self {
backend: QueryBackend::Cached {
observations,
fallback: Box::new(fallback.backend),
},
}
}

/// Empty store whose `lookup_*` always returns "no observation". Useful as
/// the wasm-side fallback when paired with `with_cached_observations` —
/// every rsid the panel cares about is already in the cache so the
/// fallback never fires; novel rsids return None gracefully.
pub fn empty() -> Self {
Self {
backend: QueryBackend::RsidMap(RsidMapBackend {
format: GenotypeSourceFormat::Text,
values: HashMap::new(),
}),
}
}

pub fn from_file_with_options(
path: &Path,
options: &GenotypeLoadOptions,
Expand Down Expand Up @@ -288,6 +323,15 @@ impl GenotypeStore {
rsid_lookup: false,
locus_lookup: true,
},
QueryBackend::Cached { .. } => {
// The cache itself answers both rsid and locus queries (we
// match by either), so unioning with the fallback gives the
// strongest set.
BackendCapabilities {
rsid_lookup: true,
locus_lookup: true,
}
}
}
}

Expand All @@ -305,6 +349,7 @@ impl GenotypeStore {
QueryBackend::Delimited(backend) => backend.backend_name(),
QueryBackend::Vcf(backend) => backend.backend_name(),
QueryBackend::Cram(backend) => backend.backend_name(),
QueryBackend::Cached { .. } => "cached",
}
}

Expand All @@ -319,6 +364,21 @@ impl GenotypeStore {
..VariantSpec::default()
})
.map(|obs| obs.genotype),
QueryBackend::Cached {
observations,
fallback,
} => {
if let Some(matched) = observations
.iter()
.find(|obs| obs.matched_rsid.as_deref().is_some_and(|r| r == rsid))
{
return Ok(matched.genotype.clone());
}
let inner = GenotypeStore {
backend: (**fallback).clone(),
};
inner.get(rsid)
}
}
}

Expand All @@ -331,13 +391,55 @@ impl GenotypeStore {
QueryBackend::Delimited(backend) => backend.lookup_variant(variant),
QueryBackend::Vcf(backend) => backend.lookup_variant(variant),
QueryBackend::Cram(backend) => backend.lookup_variant(variant),
QueryBackend::Cached {
observations,
fallback,
} => {
if let Some(hit) = match_cached_observation(observations, variant) {
return Ok(hit.clone());
}
let inner = GenotypeStore {
backend: (**fallback).clone(),
};
inner.lookup_variant(variant)
}
}
}

pub fn lookup_variants(
&self,
variants: &[VariantSpec],
) -> Result<Vec<VariantObservation>, RuntimeError> {
if let QueryBackend::Cached {
observations,
fallback,
} = &self.backend
{
// Resolve cache hits up-front; only round-trip the fallback for
// misses so we don't pay for re-opening a CRAM/VCF when the panel
// already covered every variant the script needs.
let mut results: Vec<Option<VariantObservation>> = vec![None; variants.len()];
let mut miss_indices = Vec::new();
let mut miss_specs = Vec::new();
for (idx, spec) in variants.iter().enumerate() {
if let Some(hit) = match_cached_observation(observations, spec) {
results[idx] = Some(hit.clone());
} else {
miss_indices.push(idx);
miss_specs.push(spec.clone());
}
}
if !miss_specs.is_empty() {
let inner = GenotypeStore {
backend: (**fallback).clone(),
};
let resolved = inner.lookup_variants(&miss_specs)?;
for (idx, observation) in miss_indices.into_iter().zip(resolved) {
results[idx] = Some(observation);
}
}
return Ok(results.into_iter().map(Option::unwrap_or_default).collect());
}
if let QueryBackend::Delimited(backend) = &self.backend {
return backend.lookup_variants(variants);
}
Expand All @@ -358,97 +460,44 @@ impl GenotypeStore {
}
}

impl RsidMapBackend {
fn backend_name(&self) -> &'static str {
match self.format {
GenotypeSourceFormat::Text => "text",
GenotypeSourceFormat::Zip => "zip",
GenotypeSourceFormat::Vcf => "vcf",
GenotypeSourceFormat::Cram => "cram",
GenotypeSourceFormat::Bam => "bam",
}
/// Match a `VariantSpec` against a pre-resolved observation list. Tries rsid
/// equality first (most common case for `PGx` panels), then falls back to a
/// chrom+pos+ref+alt match against either `GRCh37` or `GRCh38` loci so cached
/// observations from a CRAM lookup (which may have been done on one assembly)
/// can satisfy a script that supplies the spec on the other.
fn match_cached_observation<'a>(
observations: &'a [VariantObservation],
spec: &VariantSpec,
) -> Option<&'a VariantObservation> {
if let Some(matched) = observations.iter().find(|obs| {
obs.matched_rsid
.as_deref()
.is_some_and(|rsid| spec.rsids.iter().any(|target| target == rsid))
}) {
return Some(matched);
}

fn lookup_variant(&self, variant: &VariantSpec) -> Result<VariantObservation, RuntimeError> {
for rsid in &variant.rsids {
if let Some(value) = self.values.get(rsid) {
return Ok(VariantObservation {
backend: self.backend_name().to_owned(),
matched_rsid: Some(rsid.clone()),
genotype: Some(value.clone()),
evidence: vec![format!("resolved by rsid {rsid}")],
..VariantObservation::default()
});
}
}

Ok(VariantObservation {
backend: self.backend_name().to_owned(),
evidence: vec!["no matching rsid found".to_owned()],
..VariantObservation::default()
})
}
}

impl DelimitedBackend {
fn backend_name(&self) -> &'static str {
match self.format {
GenotypeSourceFormat::Text => "text",
GenotypeSourceFormat::Zip => "zip",
GenotypeSourceFormat::Vcf => "vcf",
GenotypeSourceFormat::Cram => "cram",
GenotypeSourceFormat::Bam => "bam",
let assembly_loci = [spec.grch37.as_ref(), spec.grch38.as_ref()];
let target_ref = spec.reference.as_deref();
let target_alt = spec.alternate.as_deref();
observations.iter().find(|obs| {
let Some(loci) = assembly_loci.iter().find_map(|l| *l) else {
return false;
};
let evidence_match = obs
.evidence
.iter()
.any(|line| line.contains(&loci.chrom) && line.contains(&loci.start.to_string()));
if !evidence_match {
return false;
}
}

fn get(&self, rsid: &str) -> Result<Option<String>, RuntimeError> {
let results = self.lookup_variants(&[VariantSpec {
rsids: vec![rsid.to_owned()],
..VariantSpec::default()
}])?;
Ok(results.into_iter().next().and_then(|obs| obs.genotype))
}

fn lookup_variant(&self, variant: &VariantSpec) -> Result<VariantObservation, RuntimeError> {
let mut results = self.lookup_variants(std::slice::from_ref(variant))?;
Ok(results.pop().unwrap_or_default())
}

fn lookup_variants(
&self,
variants: &[VariantSpec],
) -> Result<Vec<VariantObservation>, RuntimeError> {
scan_delimited_variants(self, variants)
}
}

impl VcfBackend {
fn backend_name(&self) -> &'static str {
"vcf"
}

fn get(&self, rsid: &str) -> Result<Option<String>, RuntimeError> {
let results = self.lookup_variants(&[VariantSpec {
rsids: vec![rsid.to_owned()],
..VariantSpec::default()
}])?;
Ok(results.into_iter().next().and_then(|obs| obs.genotype))
}

fn lookup_variant(&self, variant: &VariantSpec) -> Result<VariantObservation, RuntimeError> {
let mut results = self.lookup_variants(std::slice::from_ref(variant))?;
Ok(results.pop().unwrap_or_default())
}

fn lookup_variants(
&self,
variants: &[VariantSpec],
) -> Result<Vec<VariantObservation>, RuntimeError> {
if let Some(results) = lookup_indexed_vcf_variants(self, variants)? {
return Ok(results);
match (target_ref, target_alt) {
(Some(r), Some(a)) => obs
.evidence
.iter()
.any(|line| line.contains(r) && line.contains(a)),
_ => true,
}
scan_vcf_variants(self, variants)
}
})
}

#[cfg(test)]
Expand Down
Loading
Loading