diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3a67097a..f2c7c4bc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -87,13 +87,17 @@ jobs: google/cloud-sdk \ /run-bigtable.sh + - name: Build GCS Emulator + run: | + docker build -t gcs-emulator-local https://github.com/getsentry/google-cloud-storage-testbench.git#main + - name: Start GCS Emulator run: | docker run -d \ --name gcs-emulator \ -p 8087:9000 \ -e GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME=test-bucket \ - gcr.io/cloud-devrel-public-resources/storage-testbench:latest + gcs-emulator-local - name: Start MinIO run: | diff --git a/devservices/config.yml b/devservices/config.yml index eef642e1..ce6cf959 100644 --- a/devservices/config.yml +++ b/devservices/config.yml @@ -63,7 +63,7 @@ services: - orchestrator=devservices restart: unless-stopped gcs: - image: gcr.io/cloud-devrel-public-resources/storage-testbench:latest + build: https://github.com/getsentry/google-cloud-storage-testbench.git#main environment: GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME: test-bucket healthcheck: diff --git a/objectstore-service/src/backend/gcs.rs b/objectstore-service/src/backend/gcs.rs index fb77585d..eddef040 100644 --- a/objectstore-service/src/backend/gcs.rs +++ b/objectstore-service/src/backend/gcs.rs @@ -820,6 +820,9 @@ impl From for crate::multipart::CompleteMultipartError { } } +/// XXX: Any change that affects this implementation should be manually tested against real GCS. +/// That's because the fork of [storage-testbench](https://github.com/googleapis/storage-testbench) +/// that we test against has an incomplete implementation of the XML multipart API that likely doesn't match GCS's behavior in many cases. 
#[async_trait::async_trait] impl MultipartUploadBackend for GcsBackend { #[tracing::instrument(level = "trace", fields(?id), skip_all)] @@ -1017,6 +1020,7 @@ mod tests { use super::*; use crate::id::ObjectContext; + use crate::multipart::CompletedPart; use crate::stream; // NB: Not run any of these tests, you need to have a GCS emulator running. This is done @@ -1333,4 +1337,390 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_multipart_single_part() -> Result<()> { + let backend = create_test_backend().await?; + let id = make_id(); + let metadata = Metadata { + content_type: "text/plain".into(), + expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_mins(33)), + origin: Some("203.0.113.42".into()), + custom: BTreeMap::from_iter([("hello".into(), "world".into())]), + ..Default::default() + }; + + let upload_id = backend.initiate_multipart(&id, &metadata).await?; + + let data = b"hello, multipart world!"; + let etag = backend + .upload_part( + &id, + &upload_id, + 1, + data.len() as u64, + None, + stream::single(data.to_vec()), + ) + .await?; + + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await?; + assert!(result.is_none(), "expected no error on complete"); + + let (meta, stream) = backend.get_object(&id).await?.unwrap(); + let payload = stream::read_to_vec(stream).await?; + assert_eq!(payload, data); + assert_eq!(meta.content_type, "text/plain".to_string()); + assert_eq!( + meta.expiration_policy, + ExpirationPolicy::TimeToLive(Duration::from_mins(33)) + ); + assert_eq!(meta.origin, Some("203.0.113.42".into())); + assert_eq!( + meta.custom, + BTreeMap::from_iter([("hello".into(), "world".into())]) + ); + + Ok(()) + } + + #[tokio::test] + async fn test_multipart_multiple_parts() -> Result<()> { + let backend = create_test_backend().await?; + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, 
&metadata).await?; + + // Non-final parts must be >= 5 MiB. + const MIN_PART: usize = 5 * 1024 * 1024; + let part1 = vec![b'a'; MIN_PART]; + let part2 = vec![b'b'; MIN_PART]; + let part3 = b"cccc".to_vec(); + + let etag1 = backend + .upload_part( + &id, + &upload_id, + 1, + part1.len() as u64, + None, + stream::single(part1.clone()), + ) + .await?; + let etag2 = backend + .upload_part( + &id, + &upload_id, + 2, + part2.len() as u64, + None, + stream::single(part2.clone()), + ) + .await?; + let etag3 = backend + .upload_part( + &id, + &upload_id, + 3, + part3.len() as u64, + None, + stream::single(part3.clone()), + ) + .await?; + + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![ + CompletedPart { + part_number: 1, + etag: etag1, + }, + CompletedPart { + part_number: 2, + etag: etag2, + }, + CompletedPart { + part_number: 3, + etag: etag3, + }, + ], + ) + .await?; + assert!(result.is_none(), "expected no error on complete"); + + // Object exists after complete + let (_meta, stream) = backend.get_object(&id).await?.unwrap(); + let payload = stream::read_to_vec(stream).await?; + let mut expected = Vec::new(); + expected.extend_from_slice(&part1); + expected.extend_from_slice(&part2); + expected.extend_from_slice(&part3); + assert_eq!(payload, expected); + + Ok(()) + } + + #[tokio::test] + async fn test_multipart_out_of_order_upload() -> Result<()> { + let backend = create_test_backend().await?; + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await?; + + // Non-final parts must be >= 5 MiB. + const MIN_PART: usize = 5 * 1024 * 1024; + let part1 = vec![b'a'; MIN_PART]; + let part2 = vec![b'b'; MIN_PART]; + let part3 = b"cccc".to_vec(); + + // Upload parts out of order: 2, 3, 1. 
+ let etag2 = backend + .upload_part( + &id, + &upload_id, + 2, + part2.len() as u64, + None, + stream::single(part2.clone()), + ) + .await?; + let etag3 = backend + .upload_part( + &id, + &upload_id, + 3, + part3.len() as u64, + None, + stream::single(part3.clone()), + ) + .await?; + let etag1 = backend + .upload_part( + &id, + &upload_id, + 1, + part1.len() as u64, + None, + stream::single(part1.clone()), + ) + .await?; + + // Complete with parts listed in order. + let result = backend + .complete_multipart( + &id, + &upload_id, + vec![ + CompletedPart { + part_number: 1, + etag: etag1, + }, + CompletedPart { + part_number: 2, + etag: etag2, + }, + CompletedPart { + part_number: 3, + etag: etag3, + }, + ], + ) + .await?; + assert!(result.is_none(), "expected no error on complete"); + + // Verify reassembly order matches part numbers, not upload order. + let (_meta, stream) = backend.get_object(&id).await?.unwrap(); + let payload = stream::read_to_vec(stream).await?; + let mut expected = Vec::new(); + expected.extend_from_slice(&part1); + expected.extend_from_slice(&part2); + expected.extend_from_slice(&part3); + assert_eq!(payload, expected); + + Ok(()) + } + + #[tokio::test] + async fn test_multipart_list_parts() -> Result<()> { + let backend = create_test_backend().await?; + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await?; + + let etag1 = backend + .upload_part(&id, &upload_id, 1, 3, None, stream::single(b"aaa".to_vec())) + .await?; + let etag2 = backend + .upload_part(&id, &upload_id, 2, 3, None, stream::single(b"bbb".to_vec())) + .await?; + + // List all parts. 
+ let list = backend.list_parts(&id, &upload_id, None, None).await?; + assert_eq!(list.parts.len(), 2); + assert_eq!(list.parts[0].part_number, 1); + assert_eq!(list.parts[0].etag, etag1); + assert_eq!(list.parts[0].size, 3); + assert_eq!(list.parts[1].part_number, 2); + assert_eq!(list.parts[1].etag, etag2); + assert_eq!(list.parts[1].size, 3); + + // List with max_parts=1 to test pagination. + let page1 = backend.list_parts(&id, &upload_id, Some(1), None).await?; + assert_eq!(page1.parts.len(), 1); + assert_eq!(page1.parts[0].part_number, 1); + assert!(page1.is_truncated); + assert!(page1.next_part_number_marker.is_some()); + + let page2 = backend + .list_parts(&id, &upload_id, Some(1), page1.next_part_number_marker) + .await?; + assert_eq!(page2.parts.len(), 1); + assert_eq!(page2.parts[0].part_number, 2); + + // Clean up. + backend.abort_multipart(&id, &upload_id).await?; + + Ok(()) + } + + #[tokio::test] + async fn test_multipart_abort() -> Result<()> { + let backend = create_test_backend().await?; + let id = make_id(); + let metadata = Metadata::default(); + + let upload_id = backend.initiate_multipart(&id, &metadata).await?; + + backend + .upload_part( + &id, + &upload_id, + 1, + 5, + None, + stream::single(b"hello".to_vec()), + ) + .await?; + + backend.abort_multipart(&id, &upload_id).await?; + + // Object should not exist after abort. 
+ let result = backend.get_object(&id).await?; + assert!(result.is_none(), "object should not exist after abort"); + + Ok(()) + } + + async fn multipart_put( + backend: &GcsBackend, + id: &ObjectId, + metadata: &Metadata, + payload: impl Into<bytes::Bytes>, + ) -> Result<()> { + let payload: bytes::Bytes = payload.into(); + let upload_id = backend.initiate_multipart(id, metadata).await?; + let etag = backend + .upload_part( + id, + &upload_id, + 1, + payload.len() as u64, + None, + stream::single(payload), + ) + .await?; + let error = backend + .complete_multipart( + id, + &upload_id, + vec![CompletedPart { + part_number: 1, + etag, + }], + ) + .await?; + assert!( + error.is_none(), + "complete_multipart returned error: {error:?}" + ); + Ok(()) + } + + #[tokio::test] + async fn test_multipart_ttl_immediate() -> Result<()> { + let backend = create_test_backend().await?; + let id = make_id(); + let metadata = Metadata { + expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)), + ..Default::default() + }; + + multipart_put(&backend, &id, &metadata, "hello, world").await?; + + let result = backend.get_object(&id).await?; + assert!(result.is_none()); + + Ok(()) + } + + #[tokio::test] + async fn test_multipart_tti_immediate() -> Result<()> { + let backend = create_test_backend().await?; + let id = make_id(); + let metadata = Metadata { + expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)), + ..Default::default() + }; + + multipart_put(&backend, &id, &metadata, "hello, world").await?; + + let result = backend.get_object(&id).await?; + assert!(result.is_none()); + + Ok(()) + } + + #[tokio::test] + async fn test_multipart_compressed_payload_roundtrip() -> Result<()> { + use objectstore_types::metadata::Compression; + + let backend = create_test_backend().await?; + + let plaintext = b"hello, world (but compressed with zstd)"; + let compressed = zstd::encode_all(&plaintext[..], 3)?; + + let id = make_id(); + let metadata = Metadata { + content_type: 
"text/plain".into(), + compression: Some(Compression::Zstd), + ..Default::default() + }; + + multipart_put(&backend, &id, &metadata, compressed.clone()).await?; + + let (meta, stream) = backend.get_object(&id).await?.unwrap(); + let payload = stream::read_to_vec(stream).await?; + + assert_eq!(meta.compression, Some(Compression::Zstd)); + assert_eq!( + payload, compressed, + "Payload should be returned still compressed, not auto-decompressed" + ); + + Ok(()) + } }