From 620395cf02865819166142eb24009f6456cdf649 Mon Sep 17 00:00:00 2001 From: Madhava Jay Date: Tue, 23 Jun 2026 14:53:06 +1000 Subject: [PATCH] Support gzipped genotype text in zip inputs --- rust/bioscript-formats/src/genotype.rs | 471 +++-------------- .../src/genotype/delimited/scan.rs | 55 +- rust/bioscript-formats/src/genotype/io.rs | 11 + rust/bioscript-formats/src/genotype/load.rs | 487 ++++++++++++++++++ rust/bioscript-formats/src/inspect/io.rs | 47 ++ 5 files changed, 644 insertions(+), 427 deletions(-) create mode 100644 rust/bioscript-formats/src/genotype/load.rs diff --git a/rust/bioscript-formats/src/genotype.rs b/rust/bioscript-formats/src/genotype.rs index c4ba9d5..d647469 100644 --- a/rust/bioscript-formats/src/genotype.rs +++ b/rust/bioscript-formats/src/genotype.rs @@ -1,14 +1,6 @@ -use std::{ - collections::HashMap, - fs::File, - io::{BufRead, BufReader, Cursor}, - path::Path, -}; - -use zip::ZipArchive; +use std::collections::HashMap; -use bioscript_core::{RuntimeError, VariantObservation}; -use noodles::bgzf; +use bioscript_core::VariantObservation; mod alignment_bytes; mod backends; @@ -19,6 +11,7 @@ mod common; mod cram_backend; mod delimited; mod io; +mod load; mod loaders; mod query; mod types; @@ -36,17 +29,11 @@ pub(crate) use delimited::{ lines_look_like_gsgt, parse_streaming_row, }; use delimited::{RowParser, is_no_call as gsgt_is_no_call, scan_delimited_variants}; -use io::{ - detect_source_format, is_bgzf_path, looks_like_vcf_lines, read_lines_from_reader, - select_zip_entry, -}; -use types::{ - AlignmentBytesBackend, BamBackend, BcfBackend, BcfSource, CramBackend, DelimitedBackend, - QueryBackend, RsidMapBackend, VcfBackend, -}; +pub(crate) use io::is_bgzf_path; pub use types::{ BackendCapabilities, GenotypeLoadOptions, GenotypeSourceFormat, GenotypeStore, QueryKind, }; +use types::{QueryBackend, RsidMapBackend}; pub use vcf::{ choose_variant_locus_for_assembly, detect_vcf_assembly, imputed_reference_observation, observe_vcf_snp_with_reader, observe_vcf_variant_with_reader, @@ -55,10 +42,6 @@ use vcf::{lookup_indexed_vcf_variants, scan_vcf_variants}; pub(crate) use vcf_tokens::genotype_from_vcf_gt; impl GenotypeStore { - pub fn from_file(path: &Path) -> Result { - Self::from_file_with_options(path, &GenotypeLoadOptions::default()) - } - /// Wrap an existing store with a cache of pre-resolved observations. /// `lookup_variant`/`lookup_variants` consult `observations` first; on /// miss they delegate to the underlying store. This is how the report @@ -111,404 +94,21 @@ impl GenotypeStore { }), } } - - pub fn from_file_with_options( - path: &Path, - options: &GenotypeLoadOptions, - ) -> Result { - match detect_source_format(path, options.format)? { - GenotypeSourceFormat::Text => Ok(Self::from_delimited_file( - path, - GenotypeSourceFormat::Text, - None, - options, - )), - GenotypeSourceFormat::Zip => Self::from_zip_file(path, options), - GenotypeSourceFormat::Vcf => Ok(Self::from_vcf_file(path, options)), - GenotypeSourceFormat::Bcf => { - if path - .to_string_lossy() - .to_ascii_lowercase() - .ends_with(".zip") - { - Self::from_zip_file(path, options) - } else { - Ok(Self::from_bcf_file(path, options)) - } - } - GenotypeSourceFormat::Cram => Self::from_cram_file(path, options), - GenotypeSourceFormat::Bam => Ok(Self::from_bam_file(path, options)), - } - } - - pub fn from_bytes(name: &str, bytes: &[u8]) -> Result { - Self::from_bytes_with_options(name, bytes, &GenotypeLoadOptions::default()) - } - - pub fn from_bytes_with_options( - name: &str, - bytes: &[u8], - options: &GenotypeLoadOptions, - ) -> Result { - if let Some(format) = options.format { - return match format { - GenotypeSourceFormat::Text => { - let reader = BufReader::new(Cursor::new(bytes)); - Self::from_delimited_reader(GenotypeSourceFormat::Text, reader, name) - } - GenotypeSourceFormat::Zip => Self::from_zip_bytes(name, bytes), - GenotypeSourceFormat::Vcf => Self::from_vcf_bytes(name, bytes), - GenotypeSourceFormat::Bcf => Ok(Self::from_bcf_bytes(name, bytes, options)), - GenotypeSourceFormat::Cram | GenotypeSourceFormat::Bam => { - Err(RuntimeError::Unsupported(format!( - "{format:?} input requires alignment byte loading" - ))) - } - }; - } - - // The report pipeline hands us a fixed virtual path (`/input/genotypes`) - // with no extension, so we cannot rely on `name` alone for format - // detection the way `from_file_with_options` can. Sniff the leading - // bytes so a zip/VCF payload is recognised regardless of the name. - let lower = name.to_ascii_lowercase(); - if lower.ends_with(".zip") || bytes_look_like_zip(bytes) { - return Self::from_zip_bytes(name, bytes); - } - if lower.ends_with(".bcf") { - return Ok(Self::from_bcf_bytes( - name, - bytes, - &GenotypeLoadOptions::default(), - )); - } - if lower.ends_with(".vcf") || lower.ends_with(".vcf.gz") || bytes_look_like_vcf(bytes) { - return Self::from_vcf_bytes(name, bytes); - } - let reader = BufReader::new(Cursor::new(bytes)); - Self::from_delimited_reader(GenotypeSourceFormat::Text, reader, name) - } - - fn from_zip_bytes(name: &str, bytes: &[u8]) -> Result { - let mut archive = ZipArchive::new(Cursor::new(bytes)).map_err(|err| { - RuntimeError::Io(format!("failed to read genotype zip {name}: {err}")) - })?; - let bcf_entries = collect_bcf_zip_entries_from_archive(&mut archive, name)?; - if !bcf_entries.is_empty() { - let mut entries = Vec::with_capacity(bcf_entries.len()); - for entry_name in bcf_entries { - let mut entry = archive.by_name(&entry_name).map_err(|err| { - RuntimeError::Io(format!( - "failed to open genotype entry {entry_name} in {name}: {err}" - )) - })?; - let mut data = Vec::new(); - std::io::Read::read_to_end(&mut entry, &mut data).map_err(|err| { - RuntimeError::Io(format!( - "failed to read genotype entry {entry_name} in {name}: {err}" - )) - })?; - entries.push((entry_name, data)); - } - return Ok(Self { - backend: QueryBackend::Bcf(BcfBackend { - source: BcfSource::ZipBytes { - name: name.to_owned(), - entries, - }, - options: GenotypeLoadOptions::default(), - }), - }); - } - let mut selected = None; - for idx in 0..archive.len() { - let entry = archive.by_index(idx).map_err(|err| { - RuntimeError::Io(format!("failed to inspect genotype zip {name}: {err}")) - })?; - if entry.is_dir() { - continue; - } - let entry_name = entry.name().to_owned(); - let lower = entry_name.to_ascii_lowercase(); - if lower.ends_with(".vcf") - || lower.ends_with(".vcf.gz") - || lower.ends_with(".bcf") - || lower.ends_with(".txt") - || lower.ends_with(".tsv") - || lower.ends_with(".csv") - { - selected = Some(entry_name); - break; - } - } - let selected = selected.ok_or_else(|| { - RuntimeError::Unsupported(format!( - "zip archive {name} does not contain a supported genotype file" - )) - })?; - let entry = archive.by_name(&selected).map_err(|err| { - RuntimeError::Io(format!( - "failed to open genotype entry {selected} in {name}: {err}" - )) - })?; - let label = format!("genotype entry {selected} in {name}"); - // Stream-decompress directly off the zip reader so we never have to - // materialize the entire decompressed entry in memory. GenesForGood - // exports decompress to >128MB which used to trip the old - // `read_zip_entry_limited` cap; the cap is gone because the streaming - // parser keeps memory bounded to the rsid map itself. - if selected.to_ascii_lowercase().ends_with(".vcf.gz") { - return Self::from_vcf_reader(BufReader::new(bgzf::io::Reader::new(entry)), &label); - } - let reader = BufReader::new(entry); - if selected.to_ascii_lowercase().ends_with(".vcf") { - return Self::from_vcf_reader(reader, &label); - } - if selected.to_ascii_lowercase().ends_with(".bcf") { - let mut entry = reader.into_inner(); - let mut data = Vec::new(); - std::io::Read::read_to_end(&mut entry, &mut data).map_err(|err| { - RuntimeError::Io(format!( - "failed to read genotype entry {selected} in {name}: {err}" - )) - })?; - return Ok(Self::from_bcf_bytes( - &label, - &data, - &GenotypeLoadOptions::default(), - )); - } - Self::from_delimited_reader(GenotypeSourceFormat::Zip, reader, &label) - } - - fn from_vcf_file(path: &Path, options: &GenotypeLoadOptions) -> Self { - Self { - backend: QueryBackend::Vcf(VcfBackend { - path: path.to_path_buf(), - options: options.clone(), - }), - } - } - - fn from_bcf_file(path: &Path, options: &GenotypeLoadOptions) -> Self { - Self { - backend: QueryBackend::Bcf(BcfBackend { - source: BcfSource::File(path.to_path_buf()), - options: options.clone(), - }), - } - } - - fn from_bcf_bytes(name: &str, bytes: &[u8], options: &GenotypeLoadOptions) -> Self { - Self { - backend: QueryBackend::Bcf(BcfBackend { - source: BcfSource::Bytes { - name: name.to_owned(), - data: bytes.to_vec(), - }, - options: options.clone(), - }), - } - } - - fn from_vcf_bytes(name: &str, bytes: &[u8]) -> Result { - let lower = name.to_ascii_lowercase(); - if lower.ends_with(".vcf.gz") || bytes_look_like_gzip(bytes) { - return Self::from_vcf_reader( - BufReader::new(bgzf::io::Reader::new(Cursor::new(bytes))), - name, - ); - } - Self::from_vcf_reader(BufReader::new(Cursor::new(bytes)), name) - } - - fn from_zip_file(path: &Path, options: &GenotypeLoadOptions) -> Result { - let bcf_entries = collect_bcf_zip_entries(path)?; - if !bcf_entries.is_empty() { - return Ok(Self { - backend: QueryBackend::Bcf(BcfBackend { - source: BcfSource::ZipFile { - path: path.to_path_buf(), - entries: bcf_entries, - }, - options: options.clone(), - }), - }); - } - let selected = select_zip_entry(path)?; - let lower = selected.to_ascii_lowercase(); - if lower.ends_with(".vcf") || lower.ends_with(".vcf.gz") { - let file = File::open(path).map_err(|err| { - RuntimeError::Io(format!( - "failed to open genotype zip {}: {err}", - path.display() - )) - })?; - let mut archive = ZipArchive::new(file).map_err(|err| { - RuntimeError::Io(format!( - "failed to read genotype zip {}: {err}", - path.display() - )) - })?; - let entry = archive.by_name(&selected).map_err(|err| { - RuntimeError::Io(format!( - "failed to open genotype entry {selected} in {}: {err}", - path.display() - )) - })?; - let lines = read_lines_from_reader(BufReader::new(entry), path)?; - return Self::from_vcf_lines(lines); - } - Ok(Self::from_delimited_file( - path, - GenotypeSourceFormat::Zip, - Some(selected), - options, - )) - } - - fn from_cram_file(path: &Path, options: &GenotypeLoadOptions) -> Result { - Ok(Self { - backend: QueryBackend::Cram(CramBackend { - path: path.to_path_buf(), - options: options.clone(), - }), - }) - } - - fn from_bam_file(path: &Path, options: &GenotypeLoadOptions) -> Self { - Self { - backend: QueryBackend::Bam(BamBackend { - path: path.to_path_buf(), - options: options.clone(), - }), - } - } - - /// Build an in-memory CRAM/BAM store. `kind` is `Cram` or `Bam`; `index` - /// is the `.crai`/`.bai` bytes; `reference`/`reference_index` are the - /// FASTA + `.fai` bytes (CRAM only — pass empty for BAM). Used by the - /// report pipeline, which virtualizes the genotype input. - #[must_use] - pub fn from_alignment_bytes( - kind: GenotypeSourceFormat, - data: Vec, - index: Vec, - reference: Vec, - reference_index: Vec, - options: &GenotypeLoadOptions, - ) -> Self { - Self { - backend: QueryBackend::AlignmentBytes(AlignmentBytesBackend { - kind, - data, - index, - reference, - reference_index, - options: options.clone(), - }), - } - } - - fn from_vcf_reader(reader: R, label: &str) -> Result { - loaders::from_vcf_reader(reader, label) - } - - fn from_delimited_reader( - format: GenotypeSourceFormat, - reader: R, - label: &str, - ) -> Result { - loaders::from_delimited_reader(format, reader, label) - } - - fn from_vcf_lines(lines: Vec) -> Result { - loaders::from_vcf_lines(lines) - } - - fn from_delimited_file( - path: &Path, - format: GenotypeSourceFormat, - zip_entry_name: Option, - options: &GenotypeLoadOptions, - ) -> Self { - Self { - backend: QueryBackend::Delimited(DelimitedBackend { - format, - path: path.to_path_buf(), - zip_entry_name, - options: options.clone(), - }), - } - } -} - -fn collect_bcf_zip_entries(path: &Path) -> Result, RuntimeError> { - let file = File::open(path).map_err(|err| { - RuntimeError::Io(format!( - "failed to open genotype zip {}: {err}", - path.display() - )) - })?; - let mut archive = ZipArchive::new(file).map_err(|err| { - RuntimeError::Io(format!( - "failed to read genotype zip {}: {err}", - path.display() - )) - })?; - collect_bcf_zip_entries_from_archive(&mut archive, &path.display().to_string()) -} - -fn collect_bcf_zip_entries_from_archive( - archive: &mut ZipArchive, - label: &str, -) -> Result, RuntimeError> { - let mut entries = Vec::new(); - for idx in 0..archive.len() { - let entry = archive.by_index(idx).map_err(|err| { - RuntimeError::Io(format!("failed to inspect genotype zip {label}: {err}")) - })?; - if entry.is_dir() { - continue; - } - let name = entry.name().to_owned(); - if name.to_ascii_lowercase().ends_with(".bcf") { - entries.push(name); - } - } - entries.sort(); - Ok(entries) -} - -fn bytes_look_like_zip(bytes: &[u8]) -> bool { - bytes.starts_with(b"PK\x03\x04") - || bytes.starts_with(b"PK\x05\x06") - || bytes.starts_with(b"PK\x07\x08") -} - -fn bytes_look_like_gzip(bytes: &[u8]) -> bool { - bytes.starts_with(&[0x1f, 0x8b]) -} - -fn bytes_look_like_vcf(bytes: &[u8]) -> bool { - let prefix = &bytes[..bytes.len().min(8192)]; - let text = String::from_utf8_lossy(prefix); - let lines: Vec = text.lines().map(str::to_owned).collect(); - looks_like_vcf_lines(&lines) } #[cfg(test)] mod tests { use super::*; use std::{ - fs, + fs::{self, File}, io::Write, - path::PathBuf, + path::{Path, PathBuf}, str::FromStr, time::{SystemTime, UNIX_EPOCH}, }; use bioscript_core::{Assembly, GenomicLocus, VariantKind, VariantSpec}; + use flate2::{Compression, write::GzEncoder}; use noodles::bgzf; use noodles::csi; use zip::write::SimpleFileOptions; @@ -528,7 +128,11 @@ mod tests { looks_like_header_fields, normalize_name, split_csv_line, strip_bom, strip_inline_comment, }, - io::{looks_like_vcf_lines, read_plain_lines}, + io::{ + detect_source_format, is_bgzf_path, looks_like_vcf_lines, read_plain_lines, + select_zip_entry, + }, + types::{DelimitedBackend, VcfBackend}, vcf::{ detect_vcf_assembly, detect_vcf_assembly_from_path, extract_vcf_sample_genotype, normalize_chromosome_name, parse_vcf_record, vcf_row_matches_variant, @@ -579,6 +183,12 @@ mod tests { } } + fn gzip_bytes(contents: &[u8]) -> Vec { + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(contents).unwrap(); + encoder.finish().unwrap() + } + #[test] fn genotype_public_cache_wrappers_and_empty_store_cover_lookup_contracts() { let fallback = @@ -636,6 +246,49 @@ mod tests { assert_eq!(empty.backend_name(), "text"); } + #[test] + fn genotype_loads_gzipped_text_from_path_and_bytes() { + let text = b"rsid\tgenotype\nrs1\tAG\n"; + let compressed = gzip_bytes(text); + + let dir = temp_dir("gzipped-text"); + let path = dir.join("sample.txt.gz"); + fs::write(&path, &compressed).unwrap(); + let from_path = GenotypeStore::from_file(&path).unwrap(); + assert_eq!(from_path.get("rs1").unwrap().as_deref(), Some("AG")); + + let from_bytes = GenotypeStore::from_bytes("sample.txt.gz", &compressed).unwrap(); + assert_eq!(from_bytes.get("rs1").unwrap().as_deref(), Some("AG")); + } + + #[test] + fn genotype_loads_vcf_gz_entry_from_zip_path() { + let dir = temp_dir("zip-vcf-gz"); + let zip_path = dir.join("sample.zip"); + let mut vcf_gz = Vec::new(); + { + let mut writer = bgzf::io::Writer::new(&mut vcf_gz); + writer + .write_all( + b"##fileformat=VCFv4.2\n#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\tsample\n1\t10\trs1\tA\tG\t.\tPASS\t.\tGT\t0/1\n", + ) + .unwrap(); + writer.finish().unwrap(); + } + { + let file = File::create(&zip_path).unwrap(); + let mut archive = zip::ZipWriter::new(file); + archive + .start_file("nested/sample.vcf.gz", SimpleFileOptions::default()) + .unwrap(); + archive.write_all(&vcf_gz).unwrap(); + archive.finish().unwrap(); + } + + let store = GenotypeStore::from_file(&zip_path).unwrap(); + assert_eq!(store.get("rs1").unwrap().as_deref(), Some("AG")); + } + #[test] fn genotype_private_helpers_cover_assembly_sorting_and_decision_rules() { let variant = variant_with_loci(); diff --git a/rust/bioscript-formats/src/genotype/delimited/scan.rs b/rust/bioscript-formats/src/genotype/delimited/scan.rs index bb7f3a6..d7c8c51 100644 --- a/rust/bioscript-formats/src/genotype/delimited/scan.rs +++ b/rust/bioscript-formats/src/genotype/delimited/scan.rs @@ -4,6 +4,7 @@ use std::{ io::{BufRead, BufReader}, }; +use flate2::read::MultiGzDecoder; use zip::ZipArchive; use crate::inspect::{AssemblyAnchorScorer, detect_assembly}; @@ -127,15 +128,7 @@ fn prescan_assembly_anchors( } match backend.format { - GenotypeSourceFormat::Text => { - let file = File::open(&backend.path).map_err(|err| { - RuntimeError::Io(format!( - "failed to open genotype file {}: {err}", - backend.path.display() - )) - })?; - vote(BufReader::new(file), is_gsgt) - } + GenotypeSourceFormat::Text => vote(open_text_reader(backend)?, is_gsgt), GenotypeSourceFormat::Zip => { let entry_name = backend.zip_entry_name.as_ref().ok_or_else(|| { RuntimeError::Unsupported(format!( @@ -161,7 +154,11 @@ fn prescan_assembly_anchors( backend.path.display() )) })?; - vote(BufReader::new(entry), is_gsgt) + if is_gzip_text_name(&entry_name.to_ascii_lowercase()) { + vote(BufReader::new(MultiGzDecoder::new(entry)), is_gsgt) + } else { + vote(BufReader::new(entry), is_gsgt) + } } _ => Ok(None), } @@ -337,13 +334,7 @@ pub(crate) fn scan_delimited_variants( match backend.format { GenotypeSourceFormat::Text => { - let file = File::open(&backend.path).map_err(|err| { - RuntimeError::Io(format!( - "failed to open genotype file {}: {err}", - backend.path.display() - )) - })?; - let mut reader = BufReader::new(file); + let mut reader = open_text_reader(backend)?; scan_reader(&mut reader)?; } GenotypeSourceFormat::Zip => { @@ -371,7 +362,12 @@ pub(crate) fn scan_delimited_variants( backend.path.display() )) })?; - let mut reader = BufReader::new(entry); + let mut reader: Box = + if is_gzip_text_name(&entry_name.to_ascii_lowercase()) { + Box::new(BufReader::new(MultiGzDecoder::new(entry))) + } else { + Box::new(BufReader::new(entry)) + }; scan_reader(&mut reader)?; } _ => { @@ -401,3 +397,26 @@ pub(crate) fn scan_delimited_variants( Ok(results) } + +fn open_text_reader(backend: &DelimitedBackend) -> Result, RuntimeError> { + let file = File::open(&backend.path).map_err(|err| { + RuntimeError::Io(format!( + "failed to open genotype file {}: {err}", + backend.path.display() + )) + })?; + let lower = backend.path.to_string_lossy().to_ascii_lowercase(); + if lower.ends_with(".gz") || lower.ends_with(".bgz") { + return Ok(Box::new(BufReader::new(MultiGzDecoder::new(file)))); + } + Ok(Box::new(BufReader::new(file))) +} + +fn is_gzip_text_name(lower_name: &str) -> bool { + lower_name.ends_with(".txt.gz") + || lower_name.ends_with(".txt.bgz") + || lower_name.ends_with(".tsv.gz") + || lower_name.ends_with(".tsv.bgz") + || lower_name.ends_with(".csv.gz") + || lower_name.ends_with(".csv.bgz") +} diff --git a/rust/bioscript-formats/src/genotype/io.rs b/rust/bioscript-formats/src/genotype/io.rs index 8b35553..fc8302d 100644 --- a/rust/bioscript-formats/src/genotype/io.rs +++ b/rust/bioscript-formats/src/genotype/io.rs @@ -4,6 +4,7 @@ use std::{ path::Path, }; +use flate2::read::MultiGzDecoder; use zip::ZipArchive; use bioscript_core::RuntimeError; @@ -23,6 +24,10 @@ pub(crate) fn read_plain_lines(path: &Path) -> Result, RuntimeError> path.display() )) })?; + let lower = path.to_string_lossy().to_ascii_lowercase(); + if lower.ends_with(".gz") || lower.ends_with(".bgz") { + return read_lines_from_reader(BufReader::new(MultiGzDecoder::new(file)), path); + } read_lines_from_reader(BufReader::new(file), path) } @@ -54,8 +59,14 @@ pub(crate) fn select_zip_entry(path: &Path) -> Result { let name = entry.name().to_owned(); let lower = name.to_ascii_lowercase(); if lower.ends_with(".txt") + || lower.ends_with(".txt.gz") + || lower.ends_with(".txt.bgz") || lower.ends_with(".csv") + || lower.ends_with(".csv.gz") + || lower.ends_with(".csv.bgz") || lower.ends_with(".tsv") + || lower.ends_with(".tsv.gz") + || lower.ends_with(".tsv.bgz") || lower.ends_with(".vcf") || lower.ends_with(".vcf.gz") || lower.ends_with(".bcf") diff --git a/rust/bioscript-formats/src/genotype/load.rs b/rust/bioscript-formats/src/genotype/load.rs new file mode 100644 index 0000000..235999d --- /dev/null +++ b/rust/bioscript-formats/src/genotype/load.rs @@ -0,0 +1,487 @@ +use std::{ + fs::File, + io::{BufRead, BufReader, Cursor, Read}, + path::Path, +}; + +use flate2::read::MultiGzDecoder; +use zip::ZipArchive; + +use bioscript_core::RuntimeError; + +use super::{ + io::{detect_source_format, looks_like_vcf_lines, read_lines_from_reader, select_zip_entry}, + loaders, + types::{ + AlignmentBytesBackend, BamBackend, BcfBackend, BcfSource, CramBackend, DelimitedBackend, + GenotypeLoadOptions, GenotypeSourceFormat, GenotypeStore, QueryBackend, VcfBackend, + }, +}; + +impl GenotypeStore { + pub fn from_file(path: &Path) -> Result { + Self::from_file_with_options(path, &GenotypeLoadOptions::default()) + } + + pub fn from_file_with_options( + path: &Path, + options: &GenotypeLoadOptions, + ) -> Result { + match detect_source_format(path, options.format)? { + GenotypeSourceFormat::Text => Ok(Self::from_delimited_file( + path, + GenotypeSourceFormat::Text, + None, + options, + )), + GenotypeSourceFormat::Zip => Self::from_zip_file(path, options), + GenotypeSourceFormat::Vcf => Ok(Self::from_vcf_file(path, options)), + GenotypeSourceFormat::Bcf => { + if path + .to_string_lossy() + .to_ascii_lowercase() + .ends_with(".zip") + { + Self::from_zip_file(path, options) + } else { + Ok(Self::from_bcf_file(path, options)) + } + } + GenotypeSourceFormat::Cram => Self::from_cram_file(path, options), + GenotypeSourceFormat::Bam => Ok(Self::from_bam_file(path, options)), + } + } + + pub fn from_bytes(name: &str, bytes: &[u8]) -> Result { + Self::from_bytes_with_options(name, bytes, &GenotypeLoadOptions::default()) + } + + pub fn from_bytes_with_options( + name: &str, + bytes: &[u8], + options: &GenotypeLoadOptions, + ) -> Result { + if let Some(format) = options.format { + return match format { + GenotypeSourceFormat::Text => { + let reader = BufReader::new(Cursor::new(bytes)); + Self::from_delimited_reader(GenotypeSourceFormat::Text, reader, name) + } + GenotypeSourceFormat::Zip => Self::from_zip_bytes(name, bytes), + GenotypeSourceFormat::Vcf => Self::from_vcf_bytes(name, bytes), + GenotypeSourceFormat::Bcf => Ok(Self::from_bcf_bytes(name, bytes, options)), + GenotypeSourceFormat::Cram | GenotypeSourceFormat::Bam => { + Err(RuntimeError::Unsupported(format!( + "{format:?} input requires alignment byte loading" + ))) + } + }; + } + + // The report pipeline hands us a fixed virtual path (`/input/genotypes`) + // with no extension, so we cannot rely on `name` alone for format + // detection the way `from_file_with_options` can. Sniff the leading + // bytes so a zip/VCF payload is recognised regardless of the name. + let lower = name.to_ascii_lowercase(); + if lower.ends_with(".zip") || bytes_look_like_zip(bytes) { + return Self::from_zip_bytes(name, bytes); + } + if lower.ends_with(".bcf") { + return Ok(Self::from_bcf_bytes( + name, + bytes, + &GenotypeLoadOptions::default(), + )); + } + if lower.ends_with(".vcf") || lower.ends_with(".vcf.gz") || bytes_look_like_vcf(bytes) { + return Self::from_vcf_bytes(name, bytes); + } + if bytes_look_like_gzip(bytes) { + if gzip_bytes_look_like_vcf(bytes)? { + return Self::from_vcf_bytes(name, bytes); + } + return Self::from_delimited_reader( + GenotypeSourceFormat::Text, + BufReader::new(MultiGzDecoder::new(Cursor::new(bytes))), + name, + ); + } + let reader = BufReader::new(Cursor::new(bytes)); + Self::from_delimited_reader(GenotypeSourceFormat::Text, reader, name) + } + + fn from_zip_bytes(name: &str, bytes: &[u8]) -> Result { + let mut archive = ZipArchive::new(Cursor::new(bytes)).map_err(|err| { + RuntimeError::Io(format!("failed to read genotype zip {name}: {err}")) + })?; + let bcf_entries = collect_bcf_zip_entries_from_archive(&mut archive, name)?; + if !bcf_entries.is_empty() { + let mut entries = Vec::with_capacity(bcf_entries.len()); + for entry_name in bcf_entries { + let mut entry = archive.by_name(&entry_name).map_err(|err| { + RuntimeError::Io(format!( + "failed to open genotype entry {entry_name} in {name}: {err}" + )) + })?; + let mut data = Vec::new(); + std::io::Read::read_to_end(&mut entry, &mut data).map_err(|err| { + RuntimeError::Io(format!( + "failed to read genotype entry {entry_name} in {name}: {err}" + )) + })?; + entries.push((entry_name, data)); + } + return Ok(Self { + backend: QueryBackend::Bcf(BcfBackend { + source: BcfSource::ZipBytes { + name: name.to_owned(), + entries, + }, + options: GenotypeLoadOptions::default(), + }), + }); + } + let mut selected = None; + for idx in 0..archive.len() { + let entry = archive.by_index(idx).map_err(|err| { + RuntimeError::Io(format!("failed to inspect genotype zip {name}: {err}")) + })?; + if entry.is_dir() { + continue; + } + let entry_name = entry.name().to_owned(); + let lower = entry_name.to_ascii_lowercase(); + if lower.ends_with(".vcf") + || lower.ends_with(".vcf.gz") + || lower.ends_with(".bcf") + || lower.ends_with(".txt") + || lower.ends_with(".txt.gz") + || lower.ends_with(".txt.bgz") + || lower.ends_with(".tsv") + || lower.ends_with(".tsv.gz") + || lower.ends_with(".tsv.bgz") + || lower.ends_with(".csv") + || lower.ends_with(".csv.gz") + || lower.ends_with(".csv.bgz") + { + selected = Some(entry_name); + break; + } + } + let selected = selected.ok_or_else(|| { + RuntimeError::Unsupported(format!( + "zip archive {name} does not contain a supported genotype file" + )) + })?; + let entry = archive.by_name(&selected).map_err(|err| { + RuntimeError::Io(format!( + "failed to open genotype entry {selected} in {name}: {err}" + )) + })?; + let label = format!("genotype entry {selected} in {name}"); + // Stream-decompress directly off the zip reader so we never have to + // materialize the entire decompressed entry in memory. GenesForGood + // exports decompress to >128MB which used to trip the old + // `read_zip_entry_limited` cap; the cap is gone because the streaming + // parser keeps memory bounded to the rsid map itself. + if selected.to_ascii_lowercase().ends_with(".vcf.gz") { + return Self::from_vcf_zip_entry(entry, &label); + } + if is_gzip_text_name(&selected.to_ascii_lowercase()) { + return Self::from_delimited_reader( + GenotypeSourceFormat::Zip, + BufReader::new(MultiGzDecoder::new(entry)), + &label, + ); + } + let reader = BufReader::new(entry); + if selected.to_ascii_lowercase().ends_with(".vcf") { + return Self::from_vcf_reader(reader, &label); + } + if selected.to_ascii_lowercase().ends_with(".bcf") { + let mut entry = reader.into_inner(); + let mut data = Vec::new(); + std::io::Read::read_to_end(&mut entry, &mut data).map_err(|err| { + RuntimeError::Io(format!( + "failed to read genotype entry {selected} in {name}: {err}" + )) + })?; + return Ok(Self::from_bcf_bytes( + &label, + &data, + &GenotypeLoadOptions::default(), + )); + } + Self::from_delimited_reader(GenotypeSourceFormat::Zip, reader, &label) + } + + fn from_vcf_file(path: &Path, options: &GenotypeLoadOptions) -> Self { + Self { + backend: QueryBackend::Vcf(VcfBackend { + path: path.to_path_buf(), + options: options.clone(), + }), + } + } + + fn from_bcf_file(path: &Path, options: &GenotypeLoadOptions) -> Self { + Self { + backend: QueryBackend::Bcf(BcfBackend { + source: BcfSource::File(path.to_path_buf()), + options: options.clone(), + }), + } + } + + fn from_bcf_bytes(name: &str, bytes: &[u8], options: &GenotypeLoadOptions) -> Self { + Self { + backend: QueryBackend::Bcf(BcfBackend { + source: BcfSource::Bytes { + name: name.to_owned(), + data: bytes.to_vec(), + }, + options: options.clone(), + }), + } + } + + fn from_vcf_bytes(name: &str, bytes: &[u8]) -> Result { + let lower = name.to_ascii_lowercase(); + if lower.ends_with(".vcf.gz") || bytes_look_like_gzip(bytes) { + return Self::from_vcf_reader( + BufReader::new(MultiGzDecoder::new(Cursor::new(bytes))), + name, + ); + } + Self::from_vcf_reader(BufReader::new(Cursor::new(bytes)), name) + } + + fn from_zip_file(path: &Path, options: &GenotypeLoadOptions) -> Result { + let bcf_entries = collect_bcf_zip_entries(path)?; + if !bcf_entries.is_empty() { + return Ok(Self { + backend: QueryBackend::Bcf(BcfBackend { + source: BcfSource::ZipFile { + path: path.to_path_buf(), + entries: bcf_entries, + }, + options: options.clone(), + }), + }); + } + let selected = select_zip_entry(path)?; + let lower = selected.to_ascii_lowercase(); + if lower.ends_with(".vcf.gz") { + let file = File::open(path).map_err(|err| { + RuntimeError::Io(format!( + "failed to open genotype zip {}: {err}", + path.display() + )) + })?; + let mut archive = ZipArchive::new(file).map_err(|err| { + RuntimeError::Io(format!( + "failed to read genotype zip {}: {err}", + path.display() + )) + })?; + let entry = archive.by_name(&selected).map_err(|err| { + RuntimeError::Io(format!( + "failed to open genotype entry {selected} in {}: {err}", + path.display() + )) + })?; + let label = format!("genotype entry {selected} in {}", path.display()); + return Self::from_vcf_zip_entry(entry, &label); + } + if lower.ends_with(".vcf") { + let file = File::open(path).map_err(|err| { + RuntimeError::Io(format!( + "failed to open genotype zip {}: {err}", + path.display() + )) + })?; + let mut archive = ZipArchive::new(file).map_err(|err| { + RuntimeError::Io(format!( + "failed to read genotype zip {}: {err}", + path.display() + )) + })?; + let entry = archive.by_name(&selected).map_err(|err| { + RuntimeError::Io(format!( + "failed to open genotype entry {selected} in {}: {err}", + path.display() + )) + })?; + let lines = read_lines_from_reader(BufReader::new(entry), path)?; + return Self::from_vcf_lines(lines); + } + Ok(Self::from_delimited_file( + path, + GenotypeSourceFormat::Zip, + Some(selected), + options, + )) + } + + fn from_cram_file(path: &Path, options: &GenotypeLoadOptions) -> Result { + Ok(Self { + backend: QueryBackend::Cram(CramBackend { + path: path.to_path_buf(), + options: options.clone(), + }), + }) + } + + fn from_bam_file(path: &Path, options: &GenotypeLoadOptions) -> Self { + Self { + backend: QueryBackend::Bam(BamBackend { + path: path.to_path_buf(), + options: options.clone(), + }), + } + } + + /// Build an in-memory CRAM/BAM store. `kind` is `Cram` or `Bam`; `index` + /// is the `.crai`/`.bai` bytes; `reference`/`reference_index` are the + /// FASTA + `.fai` bytes (CRAM only -- pass empty for BAM). Used by the + /// report pipeline, which virtualizes the genotype input. + #[must_use] + pub fn from_alignment_bytes( + kind: GenotypeSourceFormat, + data: Vec, + index: Vec, + reference: Vec, + reference_index: Vec, + options: &GenotypeLoadOptions, + ) -> Self { + Self { + backend: QueryBackend::AlignmentBytes(AlignmentBytesBackend { + kind, + data, + index, + reference, + reference_index, + options: options.clone(), + }), + } + } + + fn from_vcf_reader(reader: R, label: &str) -> Result { + loaders::from_vcf_reader(reader, label) + } + + fn from_vcf_zip_entry(entry: R, label: &str) -> Result { + let mut reader = BufReader::new(entry); + let is_gzip = bytes_look_like_gzip( + reader + .fill_buf() + .map_err(|err| RuntimeError::Io(format!("failed to inspect {label}: {err}")))?, + ); + if is_gzip { + return Self::from_vcf_reader(BufReader::new(MultiGzDecoder::new(reader)), label); + } + Self::from_vcf_reader(reader, label) + } + + fn from_delimited_reader( + format: GenotypeSourceFormat, + reader: R, + label: &str, + ) -> Result { + loaders::from_delimited_reader(format, reader, label) + } + + fn from_vcf_lines(lines: Vec) -> Result { + loaders::from_vcf_lines(lines) + } + + fn from_delimited_file( + path: &Path, + format: GenotypeSourceFormat, + zip_entry_name: Option, + options: &GenotypeLoadOptions, + ) -> Self { + Self { + backend: QueryBackend::Delimited(DelimitedBackend { + format, + path: path.to_path_buf(), + zip_entry_name, + options: options.clone(), + }), + } + } +} + +fn collect_bcf_zip_entries(path: &Path) -> Result, RuntimeError> { + let file = File::open(path).map_err(|err| { + RuntimeError::Io(format!( + "failed to open genotype zip {}: {err}", + path.display() + )) + })?; + let mut archive = ZipArchive::new(file).map_err(|err| { + RuntimeError::Io(format!( + "failed to read genotype zip {}: {err}", + path.display() + )) + })?; + collect_bcf_zip_entries_from_archive(&mut archive, &path.display().to_string()) +} + +fn collect_bcf_zip_entries_from_archive( + archive: &mut ZipArchive, + label: &str, +) -> Result, RuntimeError> { + let mut entries = Vec::new(); + for idx in 0..archive.len() { + let entry = archive.by_index(idx).map_err(|err| { + RuntimeError::Io(format!("failed to inspect genotype zip {label}: {err}")) + })?; + if entry.is_dir() { + continue; + } + let name = entry.name().to_owned(); + if name.to_ascii_lowercase().ends_with(".bcf") { + entries.push(name); + } + } + entries.sort(); + Ok(entries) +} + +fn bytes_look_like_zip(bytes: &[u8]) -> bool { + bytes.starts_with(b"PK\x03\x04") + || bytes.starts_with(b"PK\x05\x06") + || bytes.starts_with(b"PK\x07\x08") +} + +fn bytes_look_like_gzip(bytes: &[u8]) -> bool { + bytes.starts_with(&[0x1f, 0x8b]) +} + +fn gzip_bytes_look_like_vcf(bytes: &[u8]) -> Result { + let mut decoder = MultiGzDecoder::new(Cursor::new(bytes)); + let mut prefix = Vec::new(); + decoder + .by_ref() + .take(8192) + .read_to_end(&mut prefix) + .map_err(|err| RuntimeError::Io(format!("failed to read gzip genotype bytes: {err}")))?; + Ok(bytes_look_like_vcf(&prefix)) +} + +fn bytes_look_like_vcf(bytes: &[u8]) -> bool { + let prefix = &bytes[..bytes.len().min(8192)]; + let text = String::from_utf8_lossy(prefix); + let lines: Vec = text.lines().map(str::to_owned).collect(); + looks_like_vcf_lines(&lines) +} + +fn is_gzip_text_name(lower_name: &str) -> bool { + lower_name.ends_with(".txt.gz") + || lower_name.ends_with(".txt.bgz") + || lower_name.ends_with(".tsv.gz") + || lower_name.ends_with(".tsv.bgz") + || lower_name.ends_with(".csv.gz") + || lower_name.ends_with(".csv.bgz") +} diff --git a/rust/bioscript-formats/src/inspect/io.rs b/rust/bioscript-formats/src/inspect/io.rs index efca6c0..8970446 100644 --- a/rust/bioscript-formats/src/inspect/io.rs +++ b/rust/bioscript-formats/src/inspect/io.rs @@ -5,6 +5,7 @@ use std::{ }; use bioscript_core::RuntimeError; +use flate2::read::MultiGzDecoder; use noodles::bgzf; use zip::ZipArchive; @@ -19,6 +20,11 @@ pub(crate) fn read_plain_sample_lines_from_bytes( bytes, )))); } + if is_gzip_text_name(lower_name) { + return read_sample_lines_from_reader(BufReader::new(MultiGzDecoder::new(Cursor::new( + bytes, + )))); + } read_sample_lines_from_reader(BufReader::new(Cursor::new(bytes))) } @@ -42,6 +48,15 @@ pub(crate) fn read_zip_sample_lines_from_bytes( let reader = bgzf::io::Reader::new(Cursor::new(inner)); return read_sample_lines_from_reader(BufReader::new(reader)); } + if is_gzip_text_name(&selected_entry.to_ascii_lowercase()) { + let inner = read_entry_limited( + &mut entry, + MAX_ZIP_SAMPLE_ENTRY_BYTES, + &format!("compressed zip entry {selected_entry}"), + )?; + let reader = MultiGzDecoder::new(Cursor::new(inner)); + return read_sample_lines_from_reader(BufReader::new(reader)); + } read_sample_lines_from_reader(BufReader::new(entry)) } @@ -65,8 +80,11 @@ pub(crate) fn select_zip_entry_from_bytes(bytes: &[u8]) -> Result Result, Runtim if lower.ends_with(".vcf.gz") { return read_sample_lines_from_reader(BufReader::new(bgzf::io::Reader::new(file))); } + if is_gzip_text_name(&lower) { + return read_sample_lines_from_reader(BufReader::new(MultiGzDecoder::new(file))); + } read_sample_lines_from_reader(BufReader::new(file)) } @@ -116,10 +137,33 @@ pub(crate) fn read_zip_sample_lines( let reader = bgzf::io::Reader::new(Cursor::new(bytes)); return read_sample_lines_from_reader(BufReader::new(reader)); } + if is_gzip_text_name(&selected_entry.to_ascii_lowercase()) { + let bytes = read_entry_limited( + &mut entry, + MAX_ZIP_SAMPLE_ENTRY_BYTES, + &format!( + "compressed zip entry {selected_entry} in {}", + path.display() + ), + )?; + let reader = MultiGzDecoder::new(Cursor::new(bytes)); + return read_sample_lines_from_reader(BufReader::new(reader)); + } read_sample_lines_from_reader(BufReader::new(entry)) } +fn is_gzip_text_name(lower_name: &str) -> bool { + lower_name.ends_with(".txt.gz") + || lower_name.ends_with(".txt.bgz") + || lower_name.ends_with(".tsv.gz") + || lower_name.ends_with(".tsv.bgz") + || lower_name.ends_with(".csv.gz") + || lower_name.ends_with(".csv.bgz") + || (lower_name.ends_with(".gz") && !lower_name.ends_with(".vcf.gz")) + || (lower_name.ends_with(".bgz") && !lower_name.ends_with(".vcf.bgz")) +} + pub(crate) fn read_entry_limited( reader: &mut R, max_bytes: u64, @@ -176,8 +220,11 @@ pub(crate) fn select_zip_entry(path: &Path) -> Result { || lower.ends_with(".vcf.gz") || lower.ends_with(".bcf") || lower.ends_with(".txt") + || lower.ends_with(".txt.gz") || lower.ends_with(".tsv") + || lower.ends_with(".tsv.gz") || lower.ends_with(".csv") + || lower.ends_with(".csv.gz") { return Ok(name); }