From 9840ffabe1b084d90960a7732cad46a3e1823d5d Mon Sep 17 00:00:00 2001 From: sylvain senechal Date: Thu, 28 May 2026 19:32:59 +0200 Subject: [PATCH] Implement CRR Cascaded feature Issue: CLDSRV-897 --- lib/routes/routeBackbeat.js | 137 ++++++++- package.json | 12 +- scripts/crr-cascade-test.ts | 281 +++++++++++++++++++ tests/functional/backbeat/crrCascade.js | 354 ++++++++++++++++++++++++ tests/unit/routes/routeBackbeat.js | 42 +++ yarn.lock | 25 +- 6 files changed, 824 insertions(+), 27 deletions(-) create mode 100644 scripts/crr-cascade-test.ts create mode 100644 tests/functional/backbeat/crrCascade.js diff --git a/lib/routes/routeBackbeat.js b/lib/routes/routeBackbeat.js index cde9c63a50..8074016cc6 100644 --- a/lib/routes/routeBackbeat.js +++ b/lib/routes/routeBackbeat.js @@ -7,7 +7,13 @@ const joi = require('@hapi/joi'); const backbeatProxy = httpProxy.createProxyServer({ ignorePath: true, }); -const { auth, errors, errorInstances, s3middleware, s3routes, models, storage } = require('arsenal'); +const { auth, errors, errorInstances, s3middleware, s3routes, models, storage, versioning } = require('arsenal'); +const { decode, encode, compare: compareMicroVersionId, Ordering } = versioning.VersionID; +const { + VersionIdCollisionException, + StaleMicroVersionIdException, + MicroVersionIdAlreadyStoredException, +} = require('@scality/cloudserverclient'); const { responseJSONBody } = s3routes.routesUtils; const { getSubPartIds } = s3middleware.azureHelper.mpuUtils; @@ -21,6 +27,7 @@ const locationStorageCheck = require('../api/apiUtils/object/locationStorageChec const { dataStore } = require('../api/apiUtils/object/storeObject'); const prepareRequestContexts = require('../api/apiUtils/authorization/prepareRequestContexts'); const { decodeVersionId } = require('../api/apiUtils/object/versioning'); +const getReplicationInfo = require('../api/apiUtils/object/getReplicationInfo'); const locationKeysHaveChanged = require('../api/apiUtils/object/locationKeysHaveChanged'); const { standardMetadataValidateBucketAndObj, metadataGetObject } = require('../metadata/metadataUtils'); const { config } = require('../Config'); @@ -32,6 +39,7 @@ const { } = require('../api/apiUtils/integrity/validateChecksums'); const { BackendInfo } = models; const { pushReplicationMetric } = require('./utilities/pushReplicationMetric'); +const writeContinue = require('../utilities/writeContinue'); const kms = require('../kms/wrapper'); const { listLifecycleCurrents } = require('../api/backbeat/listLifecycleCurrents'); const { listLifecycleNonCurrents } = require('../api/backbeat/listLifecycleNonCurrents'); @@ -93,7 +101,7 @@ function _isObjectRequest(req) { return ['data', 'metadata', 'multiplebackenddata', 'multiplebackendmetadata'].includes(req.resourceType); } -function _respondWithHeaders(response, payload, extraHeaders, log, callback) { +function _respondWithHeaders(response, payload, extraHeaders, log, callback, statusCode = 200) { let body = ''; if (typeof payload === 'string') { body = payload; @@ -115,10 +123,10 @@ function _respondWithHeaders(response, payload, extraHeaders, log, callback) { // eslint-disable-next-line no-param-reassign response.serverAccessLog.endTurnAroundTime = process.hrtime.bigint(); } - response.writeHead(200, httpHeaders); + response.writeHead(statusCode, httpHeaders); response.end(body, 'utf8', () => { log.end().info('responded with payload', { - httpCode: 200, + httpCode: statusCode, contentLength: Buffer.byteLength(body), }); callback(); @@ -414,6 +422,32 @@ function putData(request, response, bucketInfo, objMd, log, callback) { log.error(errMessage); return callback(errorInstances.BadRequest.customizeDescription(errMessage)); } + + const incomingVersionIdEncoded = request.headers['x-scal-version-id']; + const decoded = incomingVersionIdEncoded ? decode(incomingVersionIdEncoded) : null; + const incomingVersionIdDecoded = decoded instanceof Error ? null : decoded; + if (incomingVersionIdDecoded && objMd && objMd.versionId === incomingVersionIdDecoded) { + // Skip the write if data is already at destination for this version id + // Return 409 with the existing microVersionId so backbeat can + // decide if putMetadata is still needed + log.debug('crr cascade putData: version already at destination', { + method: 'putData', + bucketName: request.bucketName, + objectKey: request.objectKey, + hasMicroVersionId: !!objMd.microVersionId, + }); + request.resume(); + return _respondWithHeaders( + response, + { code: VersionIdCollisionException.name, message: 'version id already at destination' }, + { 'x-scal-micro-version-id': objMd.microVersionId ? encode(objMd.microVersionId) : '' }, + log, + callback, + 409, + ); + } + + writeContinue(request, response); const context = { bucketName: request.bucketName, owner: canonicalID, @@ -539,6 +573,48 @@ function getCanonicalIdsByAccountId(accountId, log, cb) { } function putMetadata(request, response, bucketInfo, objMd, log, callback) { + const { bucketName, objectKey } = request; + + const encodedMicroVersionId = request.headers['x-scal-micro-version-id']; + const decoded = encodedMicroVersionId ? decode(encodedMicroVersionId) : null; + const incomingMicroVersionId = decoded instanceof Error ? null : decoded; + if (incomingMicroVersionId) { + const cmp = compareMicroVersionId(incomingMicroVersionId, objMd?.microVersionId); + if (cmp === Ordering.EQUAL) { + log.debug('crr cascade putMetadata: loop detected, skipping write', { + method: 'putMetadata', + bucketName, + objectKey, + }); + request.resume(); + return _respondWithHeaders( + response, + { code: MicroVersionIdAlreadyStoredException.name, + message: 'incoming microVersionId already at destination' }, + {}, + log, + callback, + 409, + ); + } + if (cmp === Ordering.OLDER) { + log.debug('crr cascade putMetadata: stale event, rejecting', { + method: 'putMetadata', + bucketName, + objectKey, + }); + request.resume(); + return _respondWithHeaders( + response, + { code: StaleMicroVersionIdException.name, message: 'incoming revision is older than destination' }, + { 'x-scal-micro-version-id': objMd?.microVersionId ? encode(objMd.microVersionId) : '' }, + log, + callback, + 409, + ); + } + } + return _getRequestPayload(request, (err, payload) => { if (err) { return callback(err); @@ -552,7 +628,7 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) { return callback(errors.MalformedPOSTRequest); } - const { headers, bucketName, objectKey } = request; + const { headers } = request; // Destination-side delete-marker replication. // We need the REPLICA status to distinguish from @@ -560,7 +636,7 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) { if ( omVal.isDeleteMarker && omVal.replicationInfo && - omVal.replicationInfo.status === 'REPLICA' && + (omVal.replicationInfo.isReplica === true || omVal.replicationInfo.status === 'REPLICA') && request.serverAccessLog ) { // eslint-disable-next-line no-param-reassign @@ -576,7 +652,7 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) { // The REPLICA status excludes source-side replication-status updates. if ( omVal.replicationInfo && - omVal.replicationInfo.status === 'REPLICA' && + (omVal.replicationInfo.isReplica === true || omVal.replicationInfo.status === 'REPLICA') && (omVal.originOp === 's3:ObjectTagging:Put' || omVal.originOp === 's3:ObjectTagging:Delete') && request.serverAccessLog ) { @@ -593,7 +669,7 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) { // The REPLICA status excludes source-side replication-status updates. if ( omVal.replicationInfo && - omVal.replicationInfo.status === 'REPLICA' && + (omVal.replicationInfo.isReplica === true || omVal.replicationInfo.status === 'REPLICA') && omVal.originOp === 's3:ObjectAcl:Put' && request.serverAccessLog ) { @@ -672,7 +748,8 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) { // then we want to create a version for the replica object even though // none was provided in the object metadata value. if (omVal.replicationInfo.isNFS) { - const { isReplica } = omVal.replicationInfo; + const isReplica = omVal.replicationInfo.isReplica === true + || omVal.replicationInfo.status === 'REPLICA'; versioning = isReplica; omVal.replicationInfo.isNFS = !isReplica; } @@ -724,6 +801,48 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) { options.isNull = isNull; } + // Cascade triggering + // If the bucket receiving this replica has its own CRR rules, set + // status to PENDING so the queue populator here picks it up for the + // next hop. If not, clear the source-side replicationInfo fields + // Always mark isReplica=true. + if (incomingMicroVersionId) { + const isMDOnly = headers['x-scal-replication-content'] === 'METADATA'; + const objSize = omVal['content-length'] || 0; + + // These S3-compatible Scality locations are excluded + // as cascade targets because they use the MultiBackend S3 path which + // bypasses the putData/putMetadata routes, so loop detection cannot fire + // on those destinations. + const BLOCKED_LOCATION_TYPES = ['location-scality-ring-s3-v1', 'location-scality-artesca-s3-v1']; + + const nextReplInfo = getReplicationInfo(config, objectKey, bucketInfo, isMDOnly, objSize, null, null, null); + + if (nextReplInfo) { + nextReplInfo.backends = nextReplInfo.backends.filter(b => { + const loc = config.locationConstraints[b.site]; + return !loc || !BLOCKED_LOCATION_TYPES.includes(loc.type); + }); + } + + if (nextReplInfo && nextReplInfo.backends.length > 0) { + omVal.replicationInfo = nextReplInfo; + } else { + omVal.replicationInfo = { + status: '', + backends: [], + content: [], + destination: '', + storageClass: '', + role: '', + storageType: '', + dataStoreVersionId: '', + }; + } + + omVal.replicationInfo.isReplica = true; + } + return async.series( [ // Zenko's CRR delegates replacing the account diff --git a/package.json b/package.json index ce60f45e2d..d6bedd5679 100644 --- a/package.json +++ b/package.json @@ -19,17 +19,17 @@ }, "homepage": "https://github.com/scality/S3#readme", "dependencies": { + "@aws-crypto/crc32": "^5.2.0", + "@aws-crypto/crc32c": "^5.2.0", "@aws-sdk/client-iam": "^3.930.0", "@aws-sdk/client-s3": "^3.1013.0", "@aws-sdk/client-sts": "^3.930.0", + "@aws-sdk/crc64-nvme-crt": "^3.989.0", "@aws-sdk/credential-providers": "^3.864.0", "@aws-sdk/middleware-retry": "^3.374.0", "@aws-sdk/protocol-http": "^3.374.0", "@aws-sdk/s3-request-presigner": "^3.901.0", "@aws-sdk/signature-v4": "^3.374.0", - "@aws-crypto/crc32": "^5.2.0", - "@aws-crypto/crc32c": "^5.2.0", - "@aws-sdk/crc64-nvme-crt": "^3.989.0", "@azure/storage-blob": "^12.28.0", "@hapi/joi": "^17.1.1", "@opentelemetry/api": "^1.9.0", @@ -65,11 +65,11 @@ "vaultclient": "scality/vaultclient#8.5.3", "werelogs": "scality/werelogs#8.2.2", "ws": "^8.18.0", + "@scality/cloudserverclient": "1.0.9", "xml2js": "^0.6.2" }, "devDependencies": { "@eslint/compat": "^1.2.2", - "@scality/cloudserverclient": "1.0.7", "@scality/eslint-config-scality": "scality/Guidelines#8.3.1", "eslint": "^9.14.0", "eslint-plugin-import": "^2.31.0", @@ -88,10 +88,10 @@ "nodemon": "^3.1.10", "nyc": "^15.1.0", "pino-pretty": "^13.1.3", + "prettier": "^3.4.2", "sinon": "^13.0.1", "ts-morph": "^28.0.0", - "tv4": "^1.3.0", - "prettier": "^3.4.2" + "tv4": "^1.3.0" }, "resolutions": { "jsonwebtoken": "^9.0.0", diff --git a/scripts/crr-cascade-test.ts b/scripts/crr-cascade-test.ts new file mode 100644 index 0000000000..1c4b6fa389 --- /dev/null +++ b/scripts/crr-cascade-test.ts @@ -0,0 +1,281 @@ +import { + S3Client, + CreateBucketCommand, + PutBucketVersioningCommand, + PutBucketReplicationCommand, + PutObjectCommand, + GetObjectCommand, + ListObjectsV2Command, +} from '@aws-sdk/client-s3'; +import { + IAMClient, + CreateRoleCommand, + GetRoleCommand, + CreatePolicyCommand, + AttachUserPolicyCommand, + AttachRolePolicyCommand, + ListPoliciesCommand, +} from '@aws-sdk/client-iam'; +import { NodeHttpHandler } from '@smithy/node-http-handler'; +import https from 'https'; + +// ─── location names as configured in the Artesca UI ────────────────────────── +// https://s3.workloadplane.scality.local +// 7MB2SGXF3LMZMCFZOD22 / zDKH+EKDlOKG6Ivmdewg9veQJ6FvqUzd+j5yKbT= +const LOCATION_B = 'cascade-b'; +// 2H6QL1SOWZBF0KGEBS1B / sODDxuUoeW+R3w4pKoZ5CyZGFGXhVv/cBfL98jUD +const LOCATION_C = 'cascade-c'; +// ───────────────────────────────────────────────────────────────────────────── + +const requestHandler = new NodeHttpHandler({ + httpsAgent: new https.Agent({ rejectUnauthorized: false }), +}); + +const clientOptions = { + endpoint: 'https://s3.workloadplane.scality.local', + region: 'us-east-1', + forcePathStyle: true, + requestHandler, +}; + +// cascade-user-a : existing admin user +// ak : 5I4ACS7S2AEJQ5F1EQ64 +// sk : NrCd1=tmjvmOKSJXC7C3xXWiX5KGzxh=NR07aY1e +const s3a = new S3Client({ + ...clientOptions, + credentials: { + accessKeyId: '5I4ACS7S2AEJQ5F1EQ64', + secretAccessKey: 'NrCd1=tmjvmOKSJXC7C3xXWiX5KGzxh=NR07aY1e', + }, +}); + +// cascade-user-b : ak=7MB2SGXF3LMZMCFZOD22, sk=zDKH+EKDlOKG6Ivmdewg9veQJ6FvqUzd+j5yKbT= +// cascade-user-c : ak=2H6QL1SOWZBF0KGEBS1B, sk=sODDxuUoeW+R3w4pKoZ5CyZGFGXhVv/cBfL98jUD + +const iam = new IAMClient({ + endpoint: 'https://iam.workloadplane.scality.local', + region: 'us-east-1', + credentials: { + accessKeyId: '5I4ACS7S2AEJQ5F1EQ64', + secretAccessKey: 'NrCd1=tmjvmOKSJXC7C3xXWiX5KGzxh=NR07aY1e', + }, + requestHandler, +}); + +// Strip headers that Vault doesn't support +iam.middlewareStack.add( + (next: any) => async (args: any) => { + const headers = args.request?.headers ?? {}; + for (const key of Object.keys(headers)) { + const lower = key.toLowerCase(); + if (lower.startsWith('x-amz-checksum') || + lower === 'x-amz-sdk-checksum-algorithm' || + lower === 'x-amz-trailer' || + lower === 'transfer-encoding') { + delete headers[key]; + } + } + if (args.request?.body && typeof args.request.body !== 'string') { + // Force body to string to avoid chunked encoding + const chunks: Buffer[] = []; + for await (const chunk of args.request.body) chunks.push(chunk); + args.request.body = Buffer.concat(chunks).toString(); + } + return next(args); + }, + { step: 'finalizeRequest', priority: 'low', name: 'stripProblematicHeaders' }, +); + +const REPLICATION_POLICY_DOC = JSON.stringify({ + Version: '2012-10-17', + Statement: [{ + Effect: 'Allow', + Action: ['s3:ReplicateObject'], + Resource: ['arn:aws:s3:::*/*'], + }], +}); + +function getErrorCode(e: any): string { + return e.name ?? e.Code ?? e.Error?.Code ?? ''; +} + +async function ensureRolePolicy(): Promise { + const policyName = 's3-replication-admin'; + let policyArn: string; + try { + const res = await iam.send(new CreatePolicyCommand({ + PolicyName: policyName, + PolicyDocument: JSON.stringify({ + Version: '2012-10-17', + Statement: [{ Effect: 'Allow', Action: 's3:*', Resource: '*' }], + }), + })); + policyArn = res.Policy!.Arn!; + console.log(` created policy ${policyName} (${policyArn})`); + } catch (e: any) { + if (!getErrorCode(e).includes('EntityAlreadyExists')) throw e; + const list = await iam.send(new ListPoliciesCommand({ Scope: 'Local' })); + const found = list.Policies?.find(p => p.PolicyName === policyName); + if (!found?.Arn) throw new Error(`policy ${policyName} exists but ARN not found`); + policyArn = found.Arn; + console.log(` exists policy ${policyName} (${policyArn})`); + } + return policyArn; +} + +async function ensureReplicationRole(): Promise { + const label = 'role s3-replication-role'; + let roleArn: string; + try { + const res = await iam.send(new CreateRoleCommand({ + RoleName: 's3-replication-role', + AssumeRolePolicyDocument: JSON.stringify({ + Version: '2012-10-17', + Statement: [{ Effect: 'Allow', Principal: { AWS: '*' }, Action: 'sts:AssumeRole' }], + }), + })); + roleArn = res.Role!.Arn!; + console.log(` created ${label} (${roleArn})`); + } catch (e: any) { + if (!getErrorCode(e).includes('EntityAlreadyExists')) throw e; + const res = await iam.send(new GetRoleCommand({ RoleName: 's3-replication-role' })); + roleArn = res.Role!.Arn!; + console.log(` exists ${label} (${roleArn})`); + } + const policyArn = await ensureRolePolicy(); + try { + await iam.send(new AttachRolePolicyCommand({ RoleName: 's3-replication-role', PolicyArn: policyArn })); + console.log(` attached policy s3-replication-admin → s3-replication-role`); + } catch (e: any) { + if (!getErrorCode(e).includes('EntityAlreadyExists')) throw e; + console.log(` exists policy s3-replication-admin → s3-replication-role`); + } + return roleArn; +} + +async function ensureUserPolicy(username: string, policyName: string) { + let policyArn: string; + try { + const res = await iam.send(new CreatePolicyCommand({ + PolicyName: policyName, + PolicyDocument: REPLICATION_POLICY_DOC, + })); + policyArn = res.Policy!.Arn!; + console.log(` created policy ${policyName}`); + } catch (e: any) { + if (!getErrorCode(e).includes('EntityAlreadyExists')) throw e; + const list = await iam.send(new ListPoliciesCommand({ Scope: 'Local' })); + const found = list.Policies?.find(p => p.PolicyName === policyName); + if (!found?.Arn) throw new Error(`policy ${policyName} exists but ARN not found`); + policyArn = found.Arn; + console.log(` exists policy ${policyName} (${policyArn})`); + } + try { + await iam.send(new AttachUserPolicyCommand({ UserName: username, PolicyArn: policyArn })); + console.log(` attached policy ${policyName} → ${username}`); + } catch (e: any) { + if (!getErrorCode(e).includes('EntityAlreadyExists')) throw e; + console.log(` exists policy ${policyName} → ${username}`); + } +} + +async function ensureBucket(bucket: string) { + try { + await s3a.send(new CreateBucketCommand({ Bucket: bucket })); + console.log(` created bucket ${bucket}`); + } catch (e: any) { + const code = getErrorCode(e); + if (!code.includes('BucketAlreadyOwnedByYou') && !code.includes('BucketAlreadyExists')) throw e; + console.log(` exists bucket ${bucket}`); + } + await s3a.send(new PutBucketVersioningCommand({ + Bucket: bucket, + VersioningConfiguration: { Status: 'Enabled' }, + })); +} + +async function putReplication(srcBucket: string, dstBucket: string, replicationLocation: string, roleArn: string) { + await s3a.send(new PutBucketReplicationCommand({ + Bucket: srcBucket, + ReplicationConfiguration: { + Role: `${roleArn},${roleArn}`, + Rules: [{ + Prefix: '', + Status: 'Enabled', + Destination: { + Bucket: `arn:aws:s3:::${dstBucket}`, + StorageClass: replicationLocation as never, + }, + }], + }, + })); + console.log(` set replication ${srcBucket} → ${dstBucket} (${replicationLocation})`); +} + +async function waitForObject(client: S3Client, bucket: string, key: string, timeoutMs = 60_000) { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + try { + await client.send(new GetObjectCommand({ Bucket: bucket, Key: key })); + console.log(`✓ object found in ${bucket}`); + return; + } catch { + await new Promise(r => setTimeout(r, 2000)); + } + } + throw new Error(`timeout: object not found in ${bucket} after ${timeoutMs / 1000}s`); +} + +async function setup() { + console.log('[IAM]'); + const roleArn = await ensureReplicationRole(); + await ensureUserPolicy('cascade-user-b', 'crr-cascade-b'); + await ensureUserPolicy('cascade-user-c', 'crr-cascade-c'); + + console.log('[buckets]'); + await ensureBucket('crr-cascade-a'); + await ensureBucket('crr-cascade-b'); + await ensureBucket('crr-cascade-c'); + + console.log('[replication]'); + await putReplication('crr-cascade-a', 'crr-cascade-b', LOCATION_B, roleArn); + await putReplication('crr-cascade-b', 'crr-cascade-c', LOCATION_C, roleArn); + + console.log('\nsetup complete — run: bun crr-cascade-test.ts test'); +} + +async function test() { + const key = `test-object-${Date.now()}`; + + await s3a.send(new PutObjectCommand({ + Bucket: 'crr-cascade-a', + Key: key, + Body: 'hello from crr-cascade test', + ContentType: 'text/plain', + })); + console.log(`uploaded ${key} to crr-cascade-a, waiting for cascade...`); + + await waitForObject(s3a, 'crr-cascade-b', key); + await waitForObject(s3a, 'crr-cascade-c', key); + + console.log('cascade verified: a → b → c'); +} + +async function status() { + for (const bucket of ['crr-cascade-a', 'crr-cascade-b', 'crr-cascade-c']) { + const client = s3a; + const res = await client.send(new ListObjectsV2Command({ Bucket: bucket })); + const objects = res.Contents ?? []; + console.log(`${bucket}: ${objects.length} object(s)`); + for (const o of objects) console.log(` ${o.Key} (${o.LastModified?.toISOString()})`); + } +} + +const cmd = process.argv[2]; +if (cmd === 'test') { + test().catch(console.error); +} else if (cmd === 'status') { + status().catch(console.error); +} else { + setup().catch(console.error); +} diff --git a/tests/functional/backbeat/crrCascade.js b/tests/functional/backbeat/crrCascade.js new file mode 100644 index 0000000000..8182995cb2 --- /dev/null +++ b/tests/functional/backbeat/crrCascade.js @@ -0,0 +1,354 @@ +'use strict'; + +const assert = require('assert'); +const { createHash } = require('crypto'); +const { v4: uuidv4 } = require('uuid'); +const { + CreateBucketCommand, + PutBucketReplicationCommand, + PutBucketVersioningCommand, + PutObjectCommand, + PutObjectTaggingCommand, +} = require('@aws-sdk/client-s3'); + +const { versioning, models } = require('arsenal'); +const { ObjectMD } = models; +const BucketUtility = require('../aws-node-sdk/lib/utility/bucket-util'); + +const { + BackbeatRoutesClient, + GetMetadataCommand, + PutMetadataCommand, + PutDataCommand, + VersionIdCollisionException, + StaleMicroVersionIdException, + MicroVersionIdAlreadyStoredException, +} = require('@scality/cloudserverclient'); + +const { generateVersionId, encode: encodeVersionId } = versioning.VersionID; + +const TEST_BUCKET = `bucket-crr-cascade-${uuidv4().split('-')[0]}`; +const TEST_BUCKET_CRR = `bucket-crr-cascade-crr-${uuidv4().split('-')[0]}`; +const DEST_BUCKET = `bucket-crr-cascade-dest-${uuidv4().split('-')[0]}`; +const OBJECT_BODY = 'imAboutToBeCascadedWitNoParachuteInMyBack'; +const OBJECT_MD5_HEX = createHash('md5').update(OBJECT_BODY).digest('hex'); +const CANONICAL_ID = '79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be'; +const bucketUtil = new BucketUtility('default', {}); +const s3 = bucketUtil.s3; + +let backbeatClient; + +function makeMicroVersionId() { + const raw = generateVersionId('test-instance', 'RG001'); + return { raw, encoded: encodeVersionId(raw) }; +} + +function buildMetadataBody(overrides) { + const obj = Object.assign({ + 'content-length': Buffer.byteLength(OBJECT_BODY), + 'content-type': 'text/plain', + 'last-modified': new Date().toISOString(), + 'x-amz-version-id': 'null', + 'owner-id': CANONICAL_ID, + 'owner-display-name': 'test', + 'content-md5': OBJECT_MD5_HEX, + replicationInfo: { + status: 'REPLICA', + isReplica: true, + backends: [], + content: [], + destination: '', + storageClass: '', + role: '', + storageType: '', + dataStoreVersionId: '', + }, + }, overrides || {}); + return Buffer.from(JSON.stringify(obj)); +} + +async function putMetadata(key, mvId) { + const bodyOverrides = mvId + ? { microVersionId: mvId.raw } + : {}; + return backbeatClient.send(new PutMetadataCommand({ + Bucket: TEST_BUCKET, + Key: key, + MicroVersionId: mvId ? mvId.encoded : undefined, + Body: buildMetadataBody(bodyOverrides), + })); +} + +async function putData(key, { versionId } = {}) { + return backbeatClient.send(new PutDataCommand({ + Bucket: TEST_BUCKET, + Key: key, + ContentMD5: OBJECT_MD5_HEX, + CanonicalID: CANONICAL_ID, + VersioningRequired: true, + VersionId: versionId || undefined, + Body: Buffer.from(OBJECT_BODY), + })); +} + +before(async () => { + const creds = await s3.config.credentials(); + backbeatClient = new BackbeatRoutesClient({ + endpoint: `http://${process.env.IP || '127.0.0.1'}:8000`, + region: 'us-east-1', + credentials: { + accessKeyId: creds.accessKeyId, + secretAccessKey: creds.secretAccessKey, + }, + forcePathStyle: true, + }); + + await s3.send(new CreateBucketCommand({ Bucket: TEST_BUCKET })); + await s3.send(new PutBucketVersioningCommand({ + Bucket: TEST_BUCKET, + VersioningConfiguration: { Status: 'Enabled' }, + })); + + await s3.send(new CreateBucketCommand({ Bucket: DEST_BUCKET })); + await s3.send(new PutBucketVersioningCommand({ + Bucket: DEST_BUCKET, + VersioningConfiguration: { Status: 'Enabled' }, + })); + await s3.send(new CreateBucketCommand({ Bucket: TEST_BUCKET_CRR })); + await s3.send(new PutBucketVersioningCommand({ + Bucket: TEST_BUCKET_CRR, + VersioningConfiguration: { Status: 'Enabled' }, + })); + await s3.send(new PutBucketReplicationCommand({ + Bucket: TEST_BUCKET_CRR, + ReplicationConfiguration: { + Role: 'arn:aws:iam::account-id:role/src-resource,' + + 'arn:aws:iam::account-id:role/dest-resource', + Rules: [{ + Status: 'Enabled', + Prefix: '', + Destination: { + Bucket: `arn:aws:s3:::${DEST_BUCKET}`, + StorageClass: 'zenko', + }, + }], + }, + })); +}); + +describe('CRR cascade : putMetadata', () => { + it('should succeed on first write when destination has no microVersionId', async () => { + const key = 'crr-cascade-md-first-write'; + const mvId = makeMicroVersionId(); + await putMetadata(key, mvId); + const { Body } = await backbeatClient.send(new GetMetadataCommand({ Bucket: TEST_BUCKET, Key: key })); + assert.strictEqual(new ObjectMD(JSON.parse(Body)).getMicroVersionId(), mvId.raw); + }); + + it('should throw MicroVersionIdAlreadyStoredException on second write with the same microVersionId', async () => { + const key = 'crr-cascade-md-loop'; + const mvId = makeMicroVersionId(); + await putMetadata(key, mvId); + await assert.rejects( + () => putMetadata(key, mvId), + err => { + assert.ok(err instanceof MicroVersionIdAlreadyStoredException, + 'second write with same id should throw MicroVersionIdAlreadyStoredException'); + return true; + }, + ); + }); + + it('should return 409 when writing with an older microVersionId', async () => { + const key = 'crr-cascade-md-stale'; + const olderMvId = makeMicroVersionId(); + const newerMvId = makeMicroVersionId(); + await putMetadata(key, newerMvId); + try { + await putMetadata(key, olderMvId); + assert.fail('expected StaleMicroVersionIdException'); + } catch (err) { + assert.ok(err instanceof StaleMicroVersionIdException, + `expected StaleMicroVersionIdException, got ${err.constructor.name}`); + } + }); + + it('should return 409 when writing with an older microVersionId after objectPutTagging bumped it', async () => { + const key = 'crr-cascade-md-stale-tagging'; + + await s3.send(new PutObjectCommand({ + Bucket: TEST_BUCKET_CRR, + Key: key, + Body: Buffer.from(OBJECT_BODY), + ContentType: 'text/plain', + })); + + const olderMvId = makeMicroVersionId(); + + // ObjectPutTagging calls bumpMicroVersionId which should set a newer microVersionId + await s3.send(new PutObjectTaggingCommand({ + Bucket: TEST_BUCKET_CRR, + Key: key, + Tagging: { TagSet: [{ Key: 'crr', Value: 'cascade' }] }, + })); + + const { Body } = await backbeatClient.send(new GetMetadataCommand({ + Bucket: TEST_BUCKET_CRR, + Key: key, + })); + const storedMd = new ObjectMD(JSON.parse(Body)); + assert.ok(storedMd.getMicroVersionId(), + 'objectPutTagging should have set a microVersionId on an object with replicationInfo'); + + try { + await backbeatClient.send(new PutMetadataCommand({ + Bucket: TEST_BUCKET_CRR, + Key: key, + MicroVersionId: olderMvId.encoded, + Body: buildMetadataBody({ microVersionId: olderMvId.raw }), + })); + assert.fail('expected StaleMicroVersionIdException'); + } catch (err) { + assert.ok(err instanceof StaleMicroVersionIdException, + `expected StaleMicroVersionIdException, got ${err.constructor.name}`); + } + }); +}); + +describe('CRR cascade : putData', () => { + it('should throw VersionIdCollisionException with MicroVersionId when versionId matches current master', + async () => { + const key = 'crr-cascade-data-version-collision'; + const putResult = await s3.send(new PutObjectCommand({ + Bucket: TEST_BUCKET, + Key: key, + Body: Buffer.from(OBJECT_BODY), + ContentType: 'text/plain', + })); + const encodedVersionId = putResult.VersionId; + assert.ok(encodedVersionId, 'PutObject should return a VersionId'); + + // Set a microVersionId on that version via backbeat putMetadata. + const mvId = makeMicroVersionId(); + await backbeatClient.send(new PutMetadataCommand({ + Bucket: TEST_BUCKET, + Key: key, + VersionId: encodedVersionId, + MicroVersionId: mvId.encoded, + Body: buildMetadataBody({ microVersionId: mvId.raw }), + })); + + try { + await putData(key, { versionId: encodedVersionId }); + assert.fail('expected VersionIdCollisionException'); + } catch (err) { + assert.ok(err instanceof VersionIdCollisionException, + `expected VersionIdCollisionException, got ${err.constructor.name}`); + + const { Body } = await backbeatClient.send(new GetMetadataCommand({ + Bucket: TEST_BUCKET, + Key: key, + })); + const storedMd = new ObjectMD(JSON.parse(Body)); + const { decode } = versioning.VersionID; + const decoded = decode(err.microVersionId); + assert.ok(!(decoded instanceof Error), + 'microVersionId in exception should be a decodable versionId'); + assert.strictEqual(decoded, storedMd.getMicroVersionId(), + 'decoded microVersionId should match the stored microVersionId'); + } + }); + + it('should write data normally when VersionId does not match the current master', async () => { + const key = 'crr-cascade-data-version-no-match'; + + const putResult = await s3.send(new PutObjectCommand({ + Bucket: TEST_BUCKET, + Key: key, + Body: Buffer.from(OBJECT_BODY), + ContentType: 'text/plain', + })); + assert.ok(putResult.VersionId, 'PutObject should return a VersionId'); + + const output = await putData(key, { versionId: 'totally-different-version-id' }); + assert.ok(output.Location, + 'should return a Location because data was written normally'); + }); +}); + +describe('CRR cascade : no errors', () => { + it('should succeed and store updated metadata when putMetadata microVersionId is newer', async () => { + const key = 'crr-cascade-md-forward'; + const olderMvId = makeMicroVersionId(); + const newerMvId = makeMicroVersionId(); + + await putMetadata(key, olderMvId); + await putMetadata(key, newerMvId); + + const { Body } = await backbeatClient.send(new GetMetadataCommand({ + Bucket: TEST_BUCKET, + Key: key, + })); + const storedMd = new ObjectMD(JSON.parse(Body)); + assert.strictEqual(storedMd.getMicroVersionId(), newerMvId.raw, + 'stored microVersionId should be the newer one'); + + assert.strictEqual(storedMd.getReplicationStatus(), '', + 'replication status should be cleared when no further replication is configured'); + assert.deepStrictEqual(storedMd.getReplicationBackends(), [], + 'replication backends should be empty when no further replication is configured'); + assert.strictEqual(storedMd.getReplicationIsReplica(), true, + 'isReplica should be preserved regardless of further replication config'); + }); + +}); + +describe('CRR cascade : cascade setup for next location', () => { + it('should set replication status to PENDING and preserve isReplica when bucket has CRR rules', async () => { + const key = 'crr-cascade-next-hop'; + const olderMvId = makeMicroVersionId(); + const newerMvId = makeMicroVersionId(); + + await backbeatClient.send(new PutMetadataCommand({ + Bucket: TEST_BUCKET_CRR, + Key: key, + MicroVersionId: olderMvId.encoded, + Body: buildMetadataBody({ microVersionId: olderMvId.raw }), + })); + await backbeatClient.send(new PutMetadataCommand({ + Bucket: TEST_BUCKET_CRR, + Key: key, + MicroVersionId: newerMvId.encoded, + Body: buildMetadataBody({ microVersionId: newerMvId.raw }), + })); + + const { Body } = await backbeatClient.send(new GetMetadataCommand({ + Bucket: TEST_BUCKET_CRR, + Key: key, + })); + const storedMd = new ObjectMD(JSON.parse(Body)); + assert.strictEqual(storedMd.getMicroVersionId(), newerMvId.raw, + 'stored microVersionId should be the newer one'); + + assert.strictEqual(storedMd.getReplicationStatus(), 'PENDING', + 'replication status should be PENDING when a CRR rule matches'); + assert.ok(storedMd.getReplicationBackends().length > 0, + 'replication backends should be populated when a CRR rule matches'); + assert.strictEqual(storedMd.getReplicationIsReplica(), true, + 'isReplica should be preserved regardless of further replication config'); + }); +}); + +describe('CRR cascade : baseline (no cascade headers)', () => { + it('should succeed normally when putData has no VersionId header', async () => { + const key = 'crr-cascade-baseline-data'; + const output = await putData(key); + assert.ok(output.Location, + 'putData without VersionId should return a Location'); + }); + + it('should succeed normally when putMetadata has no MicroVersionId header', async () => { + const key = 'crr-cascade-baseline-md'; + await putMetadata(key, null); + }); +}); diff --git a/tests/unit/routes/routeBackbeat.js b/tests/unit/routes/routeBackbeat.js index 6a581bfb82..b119e3f369 100644 --- a/tests/unit/routes/routeBackbeat.js +++ b/tests/unit/routes/routeBackbeat.js @@ -20,6 +20,7 @@ const bucketPutVersioning = require('../../../lib/api/bucketPutVersioning'); const objectPut = require('../../../lib/api/objectPut'); const { objectDelete } = require('../../../lib/api/objectDelete'); const bucketPutPolicy = require('../../../lib/api/bucketPutPolicy'); +const { versioning } = require('arsenal'); const log = new DummyRequestLogger(); @@ -180,6 +181,47 @@ describe('routeBackbeat', () => { assert.deepStrictEqual(mockResponse.body, [{}]); }); + it('should return 409 VersionIdCollisionException when versionId matches master, no microVersionId', async () => { + // Old objects without microVersionId: data is already at destination. + // Cloudserver returns 409 VersionIdCollisionException with empty x-scal-micro-version-id. + // Backbeat sees empty MicroVersionId and proceeds to putMetadata (partAlreadyAtDest). + const rawVersionId = versioning.VersionID.generateVersionId('test', 'RG001'); + const encodedVersionId = versioning.VersionID.encode(rawVersionId); + + mockRequest = prepareDummyRequest({ + 'x-scal-canonical-id': 'id', + 'content-md5': '1234', + 'content-length': '0', + 'x-scal-versioning-required': 'true', + 'x-scal-version-id': encodedVersionId, + }); + mockRequest.method = 'PUT'; + mockRequest.url = '/_/backbeat/data/bucket0/key0'; + mockRequest.destroy = () => {}; + + metadataUtils.standardMetadataValidateBucketAndObj.callsFake( + (params, denies, log, callback) => { + callback(null, { + getVersioningConfiguration: () => ({ Status: 'Enabled' }), + isVersioningEnabled: () => true, + getLocationConstraint: () => undefined, + }, { + versionId: rawVersionId, + // no microVersionId — old-format object + }); + }); + + routeBackbeat('127.0.0.1', mockRequest, mockResponse, log); + void await endPromise; + + sinon.assert.notCalled(storeObject.dataStore); + assert.strictEqual(mockResponse.statusCode, 409); + assert.strictEqual(mockResponse.body.code, 'VersionIdCollisionException'); + const [, responseHeaders] = mockResponse.writeHead.firstCall.args; + assert.strictEqual(responseHeaders['x-scal-micro-version-id'], '', + 'should have empty x-scal-micro-version-id header for old-format objects'); + }); + describe('putMetadata', () => { const bucketInfo = { getVersioningConfiguration: () => ({ Status: 'Enabled' }), diff --git a/yarn.lock b/yarn.lock index 2cedf76401..eaea38970b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3662,14 +3662,15 @@ resolved "https://registry.yarnpkg.com/@rtsao/scc/-/scc-1.1.0.tgz#927dd2fae9bc3361403ac2c7a00c32ddce9ad7e8" integrity sha512-zt6OdqaDoOnJ1ZYsCYGt9YmWzDXl4vQdKTyJev62gFhRGKdx7mcT54V9KIjg+d2wi9EXsPvAPKe7i7WjfVWB8g== -"@scality/cloudserverclient@1.0.7": - version "1.0.7" - resolved "https://registry.yarnpkg.com/@scality/cloudserverclient/-/cloudserverclient-1.0.7.tgz#ee9eed09cc7da5e97d5ad8359f429218a0a30859" - integrity sha512-gMDtI/ufRDVqWJlYkvqxXRfZOChBCw9uXt2lsaIvMuiqx/pDTNZMVLfPyRN5vx0tb6HP+5ZAe2FyPdtKXG24ng== +"@scality/cloudserverclient@1.0.9": + version "1.0.9" + resolved "https://registry.yarnpkg.com/@scality/cloudserverclient/-/cloudserverclient-1.0.9.tgz#0a333e39436e1f1e74279c3b5a11645be1fffbc1" + integrity sha512-ZGqH4G535opDAEP2PxdYDFG7kjZ/eBrzP3ZmO49goFVjRhyDCZ1gT0Pask3/wZcyysZ2Au1qylorZSLPiZmB2A== dependencies: "@aws-sdk/client-s3" "^3.1009.0" + "@aws-sdk/middleware-expect-continue" "^3.972.8" JSONStream "^1.3.5" - fast-xml-parser "^4.3.2" + fast-xml-parser "^5.5.7" "@scality/eslint-config-scality@scality/Guidelines#8.3.1": version "8.3.1" @@ -6174,7 +6175,7 @@ resolved "https://registry.yarnpkg.com/@types/node/-/node-25.9.3.tgz#11dfe7a33e68fa5c560f0aa76cc5595621ef26b9" integrity sha512-603BddQMv3pUcr4U2dhujk83N2tTDVr/34wII2B6bJy6g+8WD6yUb11jszNs0gdi4PesVWl7ABt8nYMVpnLUcg== dependencies: - undici-types ">=7.24.0 <7.24.7" + undici-types "~8.3.0" "@types/triple-beam@^1.3.2": version "1.3.5" @@ -8312,7 +8313,7 @@ fast-xml-builder@^1.1.4: dependencies: path-expression-matcher "^1.1.3" -fast-xml-parser@5.2.5, fast-xml-parser@5.3.6, fast-xml-parser@5.5.6, fast-xml-parser@^4.3.2, fast-xml-parser@^5.0.7, fast-xml-parser@^5.5.6: +fast-xml-parser@5.2.5, fast-xml-parser@5.3.6, fast-xml-parser@5.5.6, fast-xml-parser@^5.0.7, fast-xml-parser@^5.5.6, fast-xml-parser@^5.5.7: version "5.5.7" resolved "https://registry.yarnpkg.com/fast-xml-parser/-/fast-xml-parser-5.5.7.tgz#e1ddc86662d808450a19cf2fb6ccc9c3c9933c5d" integrity sha512-LteOsISQ2GEiDHZch6L9hB0+MLoYVLToR7xotrzU0opCICBkxOPgHAy1HxAvtxfJNXDJpgAsQN30mkrfpO2Prg== @@ -12649,16 +12650,16 @@ undefsafe@^2.0.5: resolved "https://registry.yarnpkg.com/undefsafe/-/undefsafe-2.0.5.tgz#38733b9327bdcd226db889fb723a6efd162e6e2c" integrity sha512-WxONCrssBM8TSPRqN5EmsjVrsv4A8X12J4ArBiiayv3DyyG3ZlIg6yysuuSYdZsVz3TKcTg2fd//Ujd4CHV1iA== -"undici-types@>=7.24.0 <7.24.7": - version "7.24.6" - resolved "https://registry.yarnpkg.com/undici-types/-/undici-types-7.24.6.tgz#61275b485d7fd4e9d269c7cf04ec2873c9cc0f91" - integrity sha512-WRNW+sJgj5OBN4/0JpHFqtqzhpbnV0GuB+OozA9gCL7a993SmU+1JBZCzLNxYsbMfIeDL+lTsphD5jN5N+n0zg== - undici-types@~6.20.0: version "6.20.0" resolved "https://registry.yarnpkg.com/undici-types/-/undici-types-6.20.0.tgz#8171bf22c1f588d1554d55bf204bc624af388433" integrity sha512-Ny6QZ2Nju20vw1SRHe3d9jVu6gJ+4e3+MMpqu7pqE5HT6WsTSlce++GQmK5UXS8mzV8DSYHrQH+Xrf2jVcuKNg== +undici-types@~8.3.0: + version "8.3.0" + resolved "https://registry.yarnpkg.com/undici-types/-/undici-types-8.3.0.tgz#44e9fc9f3244648cdea35e4f9bb2d681e9410809" + integrity sha512-j375ScV60dom+YkPFIfTLcOiPxkN/buHz5GobjLhixFuANaNs3C9l4GmrWqejgXWJ7BbJcFYpTEUkS1Ge8bpZQ== + unique-filename@^4.0.0: version "4.0.0" resolved "https://registry.yarnpkg.com/unique-filename/-/unique-filename-4.0.0.tgz#a06534d370e7c977a939cd1d11f7f0ab8f1fed13"