Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 90 additions & 15 deletions vortex-io/src/filesystem/glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,21 @@ impl dyn FileSystem + '_ {
validate_glob(pattern)?;

// If there are no glob characters, the pattern is an exact file path.
// Return it directly without listing the filesystem.
// Use [`list`](FileSystem::list) with the full path as the prefix so that
// the filesystem confirms the file exists and populates its size, then
// filter to the exact match to avoid yielding prefix-collisions such as
// `foo.vortex.backup` when the caller asked for `foo.vortex`.
if !pattern.contains(['*', '?', '[']) {
let listing = FileListing {
path: pattern.to_string(),
size: None,
};
return Ok(futures::stream::once(async { Ok(listing) }).boxed());
let pattern = pattern.to_string();
let stream = self
.list(&pattern)
.try_filter(move |listing| {
let matches = listing.path == pattern;
async move { matches }
})
.into_stream()
.boxed();
return Ok(stream);
}

let glob_pattern = glob::Pattern::new(pattern)
Expand Down Expand Up @@ -87,20 +95,40 @@ mod tests {

use async_trait::async_trait;
use futures::TryStreamExt;
use futures::stream;
use parking_lot::Mutex;
use vortex_error::vortex_panic;

use super::*;
use crate::VortexReadAt;
use crate::filesystem::FileSystem;

/// A mock filesystem that panics if `list` is called.
/// A mock filesystem whose `list` records the requested prefix and returns
/// a preconfigured set of listings.
#[derive(Debug)]
struct NoListFileSystem;
struct MockFileSystem {
listings: Vec<FileListing>,
last_prefix: Mutex<Option<String>>,
}

impl MockFileSystem {
fn new(listings: Vec<FileListing>) -> Self {
Self {
listings,
last_prefix: Mutex::new(None),
}
}

fn last_prefix(&self) -> Option<String> {
self.last_prefix.lock().clone()
}
}

#[async_trait]
impl FileSystem for NoListFileSystem {
fn list(&self, _prefix: &str) -> BoxStream<'_, VortexResult<FileListing>> {
vortex_panic!("list() should not be called for exact paths")
impl FileSystem for MockFileSystem {
fn list(&self, prefix: &str) -> BoxStream<'_, VortexResult<FileListing>> {
*self.last_prefix.lock() = Some(prefix.to_string());
stream::iter(self.listings.clone().into_iter().map(Ok)).boxed()
}

async fn open_read(&self, _path: &str) -> VortexResult<Arc<dyn VortexReadAt>> {
Expand All @@ -109,12 +137,59 @@ mod tests {
}

#[tokio::test]
async fn test_glob_exact_path_skips_list() -> VortexResult<()> {
let fs: &dyn FileSystem = &NoListFileSystem;
let results: Vec<FileListing> = fs.glob("data/file.vortex")?.try_collect().await?;
async fn test_glob_exact_path_existing_returns_listing_with_size() -> VortexResult<()> {
let fs = MockFileSystem::new(vec![FileListing {
path: "data/file.vortex".to_string(),
size: Some(1024),
}]);
let fs_dyn: &dyn FileSystem = &fs;
let results: Vec<FileListing> = fs_dyn.glob("data/file.vortex")?.try_collect().await?;
assert_eq!(results.len(), 1);
assert_eq!(results[0].path, "data/file.vortex");
assert_eq!(results[0].size, None);
assert_eq!(
results[0].size,
Some(1024),
"exact-path glob should propagate the size reported by list"
);
assert_eq!(
fs.last_prefix().as_deref(),
Some("data/file.vortex"),
"exact path should be passed as the list prefix"
);
Ok(())
}

#[tokio::test]
async fn test_glob_exact_path_missing_returns_empty_stream() -> VortexResult<()> {
let fs = MockFileSystem::new(vec![]);
let fs_dyn: &dyn FileSystem = &fs;
let results: Vec<FileListing> = fs_dyn.glob("data/missing.vortex")?.try_collect().await?;
assert!(
results.is_empty(),
"missing exact path should yield an empty stream"
);
Ok(())
}

#[tokio::test]
async fn test_glob_exact_path_filters_prefix_collisions() -> VortexResult<()> {
// Object stores list by prefix, so `list("foo.vortex")` can return `foo.vortex.backup`.
// The exact-path branch must filter to an equality match.
let fs = MockFileSystem::new(vec![
FileListing {
path: "foo.vortex".to_string(),
size: Some(10),
},
FileListing {
path: "foo.vortex.backup".to_string(),
size: Some(20),
},
]);
let fs_dyn: &dyn FileSystem = &fs;
let results: Vec<FileListing> = fs_dyn.glob("foo.vortex")?.try_collect().await?;
assert_eq!(results.len(), 1);
assert_eq!(results[0].path, "foo.vortex");
assert_eq!(results[0].size, Some(10));
Ok(())
}

Expand Down
Loading