Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 66 additions & 8 deletions objectstore-service/src/backend/tiered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ impl Backend for TieredStorage {
usecase = id.usecase().to_owned(),
backend_choice = backend_choice.as_str(),
backend_type = backend_ty,
upload_type = "direct",
);

Ok(())
Expand Down Expand Up @@ -604,17 +605,23 @@ impl MultipartUploadBackend for TieredStorage {
id: &ObjectId,
metadata: &Metadata,
) -> Result<InitiateMultipartResponse> {
let start = Instant::now();
let physical = new_long_term_revision(id);

let id = self
let upload_id = self
.inner
.long_term
.initiate_multipart(&physical, metadata)
.await?;

objectstore_metrics::record!(
"multipart.initiate.latency" = start.elapsed(),
usecase = id.usecase().to_owned(),
);

let id = TieredUploadId {
revision: physical.key,
upload_id: id,
upload_id,
};
id.try_into()
}
Expand All @@ -628,14 +635,16 @@ impl MultipartUploadBackend for TieredStorage {
content_md5: Option<&str>,
body: ClientStream,
) -> Result<UploadPartResponse> {
let start = Instant::now();
let tiered: TieredUploadId = upload_id.try_into()?;

let physical = ObjectId {
context: id.context.clone(),
key: tiered.revision,
};

self.inner
let etag = self
.inner
.long_term
.upload_part(
&physical,
Expand All @@ -645,7 +654,18 @@ impl MultipartUploadBackend for TieredStorage {
content_md5,
body,
)
.await
.await?;

objectstore_metrics::record!(
"multipart.upload_part.latency" = start.elapsed(),
usecase = id.usecase().to_owned(),
);
objectstore_metrics::record!(
"multipart.upload_part.size" = content_length,
usecase = id.usecase().to_owned(),
);
Comment thread
cursor[bot] marked this conversation as resolved.

Ok(etag)
}

async fn list_parts(
Expand All @@ -655,35 +675,53 @@ impl MultipartUploadBackend for TieredStorage {
max_parts: Option<u32>,
part_number_marker: Option<PartNumber>,
) -> Result<ListPartsResponse> {
let start = Instant::now();
let tiered: TieredUploadId = upload_id.try_into()?;

let physical = ObjectId {
context: id.context.clone(),
key: tiered.revision,
};

self.inner
let result = self
.inner
.long_term
.list_parts(&physical, &tiered.upload_id, max_parts, part_number_marker)
.await
.await;

objectstore_metrics::record!(
"multipart.list_parts.latency" = start.elapsed(),
usecase = id.usecase().to_owned(),
);

result
}

async fn abort_multipart(
&self,
id: &ObjectId,
upload_id: &UploadId,
) -> Result<AbortMultipartResponse> {
let start = Instant::now();
let tiered: TieredUploadId = upload_id.try_into()?;

let physical = ObjectId {
context: id.context.clone(),
key: tiered.revision,
};

self.inner
let result = self
.inner
.long_term
.abort_multipart(&physical, &tiered.upload_id)
.await
.await;

objectstore_metrics::record!(
"multipart.abort.latency" = start.elapsed(),
usecase = id.usecase().to_owned(),
);

result
}

async fn complete_multipart(
Expand All @@ -692,6 +730,8 @@ impl MultipartUploadBackend for TieredStorage {
upload_id: &UploadId,
parts: Vec<CompletedPart>,
) -> Result<CompleteMultipartResponse> {
let start = Instant::now();
let part_count = parts.len();
let tiered: TieredUploadId = upload_id.try_into()?;

let physical = ObjectId {
Expand Down Expand Up @@ -790,6 +830,24 @@ impl MultipartUploadBackend for TieredStorage {
// Update guard and let it schedule cleanup in the background.
guard.advance(ChangePhase::compare_and_write(written));

objectstore_metrics::record!(
"multipart.complete.latency" = start.elapsed(),
usecase = id.usecase().to_owned(),
);
objectstore_metrics::record!(
"multipart.complete.part_count" = part_count as u64,
usecase = id.usecase().to_owned(),
);
if let Some(size) = metadata.size {
objectstore_metrics::record!(
"put.size" = size as u64,
usecase = id.usecase().to_owned(),
backend_choice = BackendChoice::LongTerm.as_str(),
backend_type = self.backend_type(&BackendChoice::LongTerm),
upload_type = "multipart",
);
}
Comment thread
lcian marked this conversation as resolved.
Comment thread
lcian marked this conversation as resolved.

Ok(None)
}
}
Expand Down
Loading