Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions lib/api/apiUtils/object/sourceChecksum.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
const { PassThrough } = require('stream');
const async = require('async');
const { jsutil } = require('arsenal');

const { data } = require('../../../data/wrapper');
const ChecksumWritable = require('../../../auth/streamingV4/ChecksumWritable');

/**
* Sequentially GET the ordered source `dataLocator` parts into a single
* readable stream, reading them in order through `data.get`. Returns the
* PassThrough immediately; consumers should pipe it onward and observe
* `error` for any read failure along the way.
*
* @param {Array} dataLocator - ordered source parts
* @param {object} log - request logger
* @return {PassThrough}
*/
function buildSourcePartsStream(dataLocator, log) {
const passthrough = new PassThrough();
const wrapErr = (err, part) =>
Object.assign(err, {
copyPart: { key: part.key, dataStoreName: part.dataStoreName, dataStoreType: part.dataStoreType },
});
async.eachSeries(
dataLocator,
(part, cb) => {
const done = jsutil.once(cb);
if (part.dataStoreType === 'azure') {
// Azure's data.get writes part bytes into the provided writable
// instead of returning a Readable. Pipe a per-part PassThrough
// into the master passthrough and use its 'end' as the completion
// signal — same pattern arsenal's data.copyObject uses.
const perPart = new PassThrough();
perPart.once('error', err => done(wrapErr(err, part)));
perPart.once('end', () => done());
perPart.pipe(passthrough, { end: false });
return data.get(part, perPart, log, err => {
if (err) {
perPart.destroy(err);
done(wrapErr(err, part));
}
});
}
return data.get(part, null, log, (err, partStream) => {
if (err) {
return done(wrapErr(err, part));
}
partStream.once('error', err => done(wrapErr(err, part)));
partStream.once('end', () => done());
partStream.pipe(passthrough, { end: false });
return undefined;
});
},

Check notice

Code scanning / CodeQL

Callback-style function (async migration) Note

This function uses a callback parameter ('cb'). Refactor to async/await.
Comment on lines +26 to +53
err => {
if (err) {
passthrough.destroy(err);
} else {
passthrough.end();
}
},
);
return passthrough;
}

/**
* Compute the checksum of the (range-adjusted) source bytes by streaming them
* through a ChecksumWritable sink. An empty `dataLocator` ends the stream
* immediately, yielding the empty-input digest.
*
* @param {Array} dataLocator - ordered source parts
* @param {string} algorithm - lowercase checksum algorithm name
* @param {object} log - request logger
* @param {function} cb - cb(err, { algorithm, value })
* @return {undefined}
*/
function computeChecksumFromDataLocator(dataLocator, algorithm, log, cb) {
const onceCb = jsutil.once(cb);
const sourceStream = buildSourcePartsStream(dataLocator || [], log);
const checksumSink = new ChecksumWritable(algorithm, log);
sourceStream.once('error', err => {
checksumSink.destroy(err);
onceCb(err);
});
checksumSink.once('error', onceCb);
checksumSink.once('finish', () => onceCb(null, { algorithm, value: checksumSink.digest }));
sourceStream.pipe(checksumSink);
}

Check notice

Code scanning / CodeQL

Callback-style function (async migration) Note

This function uses a callback parameter ('cb'). Refactor to async/await.
Comment on lines +76 to +87

module.exports = { buildSourcePartsStream, computeChecksumFromDataLocator };
68 changes: 7 additions & 61 deletions lib/api/objectCopy.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
const async = require('async');
const { PassThrough } = require('stream');

const { errors, errorInstances, jsutil, versioning, s3middleware, s3routes } = require('arsenal');
const { validateObjectKeyLength } = s3routes.routesUtils;
Expand Down Expand Up @@ -33,6 +32,7 @@ const {
} = require('./apiUtils/integrity/validateChecksums');
const ChecksumTransform = require('../auth/streamingV4/ChecksumTransform');
const ChecksumWritable = require('../auth/streamingV4/ChecksumWritable');
const { buildSourcePartsStream } = require('./apiUtils/object/sourceChecksum');
const kms = require('../kms/wrapper');

const versionIdUtils = versioning.VersionID;
Expand Down Expand Up @@ -66,63 +66,6 @@ function _orphanedDataLocations(dataToDelete, newDataGetInfo) {
return orphans.length > 0 ? orphans : null;
}

/**
* Concatenate the source object's parts into a single Readable stream by
* reading them sequentially through `data.get`. Returns the PassThrough
* immediately; consumers should pipe it to the next stage and observe
* `error` on this stream for any read failure along the way.
*
* @param {Array} dataLocator - ordered source parts
* @param {object} log - request logger
* @return {PassThrough}
*/
function _pipeSourcePartsThrough(dataLocator, log) {
const passthrough = new PassThrough();
const wrapErr = (err, part) =>
Object.assign(err, {
copyPart: { key: part.key, dataStoreName: part.dataStoreName, dataStoreType: part.dataStoreType },
});
async.eachSeries(
dataLocator,
(part, cb) => {
const done = jsutil.once(cb);
if (part.dataStoreType === 'azure') {
// Azure's data.get writes part bytes into the provided writable
// instead of returning a Readable. Pipe a per-part PassThrough
// into the master passthrough and use its 'end' as the completion
// signal — same pattern arsenal's data.copyObject uses.
const perPart = new PassThrough();
perPart.once('error', err => done(wrapErr(err, part)));
perPart.once('end', () => done());
perPart.pipe(passthrough, { end: false });
return data.get(part, perPart, log, err => {
if (err) {
perPart.destroy(err);
done(wrapErr(err, part));
}
});
}
return data.get(part, null, log, (err, partStream) => {
if (err) {
return done(wrapErr(err, part));
}
partStream.once('error', err => done(wrapErr(err, part)));
partStream.once('end', () => done());
partStream.pipe(passthrough, { end: false });
return undefined;
});
},
err => {
if (err) {
passthrough.destroy(err);
} else {
passthrough.end();
}
},
);
return passthrough;
}

/**
* Decide whether the destination's checksum needs to be recomputed by
* streaming the source bytes through a ChecksumTransform.
Expand Down Expand Up @@ -207,7 +150,7 @@ function _recomputeChecksumAndStore(
algorithm: algoName,
size: storeMetadataParams.size,
});
const sourceStream = _pipeSourcePartsThrough(dataLocator, log);
const sourceStream = buildSourcePartsStream(dataLocator, log);
const checksumSink = new ChecksumWritable(algoName, log);
const finish = jsutil.once(err => {
if (err) {
Expand Down Expand Up @@ -236,7 +179,7 @@ function _recomputeChecksumAndStore(

// Stream source bytes through a ChecksumTransform and write them out as a single put.
log.debug('recomputing checksum on CopyObject', { algorithm: algoName, size: storeMetadataParams.size });
const sourceStream = _pipeSourcePartsThrough(dataLocator, log);
const sourceStream = buildSourcePartsStream(dataLocator, log);
const checksumStream = new ChecksumTransform(algoName, undefined, false, log);
const done = jsutil.once((err, results) => {
if (err) {
Expand Down Expand Up @@ -362,11 +305,14 @@ function _prepMetadata(
};
}
// Cannot copy from same source and destination if no MD
// changed and no source version id
// changed and no source version id. A requested checksum algorithm counts
// as a change (it (re)computes the object's checksum in place), matching
// AWS, so it is excluded from this guard.
if (
sourceIsDestination &&
whichMetadata === 'COPY' &&
Object.keys(overrideMetadata).length === 0 &&
!headers['x-amz-checksum-algorithm'] &&
!sourceVersionId
) {
return {
Expand Down
Loading
Loading