diff --git a/handwritten/storage/src/bucket.ts b/handwritten/storage/src/bucket.ts index 7f80f1c43603..3dfad4204916 100644 --- a/handwritten/storage/src/bucket.ts +++ b/handwritten/storage/src/bucket.ts @@ -29,6 +29,7 @@ import * as http from 'http'; import * as path from 'path'; import {promisify} from 'util'; import AsyncRetry from 'async-retry'; +import {randomUUID} from 'crypto'; import {convertObjKeysToSnakeCase, handleContextValidation} from './util.js'; import {Acl, AclMetadata} from './acl.js'; @@ -4511,6 +4512,7 @@ class Bucket extends ServiceObject { optionsOrCallback?: UploadOptions | UploadCallback, callback?: UploadCallback, ): Promise | void { + const persistentInvocationId = randomUUID(); const upload = (numberOfRetries: number | undefined) => { const returnValue = AsyncRetry( async (bail: (err: GaxiosError | Error) => void) => { @@ -4521,7 +4523,10 @@ class Bucket extends ServiceObject { ) { newFile.storage.retryOptions.autoRetry = false; } - const writable = newFile.createWriteStream(options); + const writable = newFile.createWriteStream({ + ...options, + invocationId: persistentInvocationId, + }); if (options.onUploadProgress) { writable.on('progress', options.onUploadProgress); } diff --git a/handwritten/storage/src/file.ts b/handwritten/storage/src/file.ts index aec3cd9129e6..4498d9d393b5 100644 --- a/handwritten/storage/src/file.ts +++ b/handwritten/storage/src/file.ts @@ -27,6 +27,7 @@ import * as resumableUpload from './resumable-upload.js'; import {Writable, Readable, pipeline, Transform, PipelineSource} from 'stream'; import * as zlib from 'zlib'; import * as http from 'http'; +import {randomUUID} from 'crypto'; import { ExceptionMessages, @@ -248,6 +249,7 @@ export interface CreateResumableUploadOptions * @see {@link CRC32C.from} for possible values. */ resumeCRC32C?: Parameters<(typeof CRC32C)['from']>[0]; + invocationId?: string; preconditionOpts?: PreconditionOptions; [GCCL_GCS_CMD_KEY]?: resumableUpload.UploadConfig[typeof GCCL_GCS_CMD_KEY]; } @@ -4218,13 +4220,17 @@ class File extends ServiceObject { ) { maxRetries = 0; } + const persistentInvocationId = randomUUID(); const returnValue = AsyncRetry( async (bail: (err: Error) => void) => { return new Promise((resolve, reject) => { if (maxRetries === 0) { this.storage.retryOptions.autoRetry = false; } - const writable = this.createWriteStream(options); + const writable = this.createWriteStream({ + ...options, + invocationId: persistentInvocationId, + }); if (options.onUploadProgress) { writable.on('progress', options.onUploadProgress); @@ -4486,6 +4492,7 @@ class File extends ServiceObject { chunkSize: options?.chunkSize, highWaterMark: options?.highWaterMark, universeDomain: this.bucket.storage.universeDomain, + invocationId: options.invocationId, [GCCL_GCS_CMD_KEY]: options[GCCL_GCS_CMD_KEY], }; @@ -4545,6 +4552,7 @@ class File extends ServiceObject { uploadType: 'multipart', }, url, + invocationId: options.invocationId, [GCCL_GCS_CMD_KEY]: options[GCCL_GCS_CMD_KEY], method: 'POST', responseType: 'json', diff --git a/handwritten/storage/src/storage-transport.ts b/handwritten/storage/src/storage-transport.ts index 49226013218c..6ac3d7d24026 100644 --- a/handwritten/storage/src/storage-transport.ts +++ b/handwritten/storage/src/storage-transport.ts @@ -49,6 +49,7 @@ export interface StorageQueryParameters extends StandardStorageQueryParams { export interface StorageRequestOptions extends GaxiosOptions { [GCCL_GCS_CMD_KEY]?: string; + invocationId?: string; interceptors?: GaxiosInterceptor[]; autoPaginate?: boolean; autoPaginateVal?: boolean; @@ -254,7 +255,7 @@ export class StorageTransport { } #prepareHeaders(reqOpts: StorageRequestOptions): Record { - const headersObj = this.#buildRequestHeaders(reqOpts.headers); + const headersObj = this.#buildRequestHeaders(reqOpts); if (reqOpts[GCCL_GCS_CMD_KEY]) { const current = headersObj.get('x-goog-api-client') || ''; @@ -299,12 +300,13 @@ export class StorageTransport { return searchParams.toString(); }; - #buildRequestHeaders(requestHeaders = {}) { - const headers = new Headers(requestHeaders); + #buildRequestHeaders(reqOpts: StorageRequestOptions) { + const headers = new Headers(reqOpts.headers); headers.set('User-Agent', this.#getUserAgentString()); + const invocationId = reqOpts.invocationId || randomUUID(); headers.set( 'x-goog-api-client', - `${getRuntimeTrackingString()} gccl/${this.packageJson.version}-${getModuleFormat()} gccl-invocation-id/${randomUUID()}`, + `${getRuntimeTrackingString()} gccl/${this.packageJson.version}-${getModuleFormat()} gccl-invocation-id/${invocationId}`, ); return headers; } diff --git a/handwritten/storage/system-test/storage.ts b/handwritten/storage/system-test/storage.ts index 3ab297a15fc2..38f578987198 100644 --- a/handwritten/storage/system-test/storage.ts +++ b/handwritten/storage/system-test/storage.ts @@ -3247,6 +3247,42 @@ describe('storage', function () { assert.strictEqual(called, true); }); + + it('should maintain the same invocationId across the upload lifecycle', async () => { + const invocationIds: string[] = []; + + const originalRequest = bucket.storageTransport.authClient.request.bind( + bucket.storageTransport.authClient, + ); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + bucket.storageTransport.authClient.request = async (config: any) => { + const headers = config.headers || {}; + const apiHeaderKey = Object.keys(headers).find( + key => key.toLowerCase() === 'x-goog-api-client', + ); + + if (apiHeaderKey) { + const val = headers[apiHeaderKey]; + const match = val.match(/gccl-invocation-id\/([a-f0-9-]+)/); + if (match) { + invocationIds.push(match[1]); + } + } + return originalRequest(config); + }; + + try { + const destination = `test-id-${Date.now()}.txt`; + await bucket.upload(FILES.big.path, {destination, resumable: false}); + + assert.ok(invocationIds.length >= 1); + const uniqueIds = [...new Set(invocationIds)]; + assert.strictEqual(uniqueIds.length, 1); + } finally { + bucket.storageTransport.authClient.request = originalRequest; + } + }); }); describe('channels', () => { diff --git a/handwritten/storage/test/bucket.ts b/handwritten/storage/test/bucket.ts index b3ee5ac830b5..d60aaf06dc80 100644 --- a/handwritten/storage/test/bucket.ts +++ b/handwritten/storage/test/bucket.ts @@ -2880,6 +2880,55 @@ describe('Bucket', () => { done(); }); }); + + it('should use the same invocationId across retries in a multipart upload', done => { + const fakeFile = new File(bucket, 'file-name'); + const options = { + destination: fakeFile, + resumable: false, + preconditionOpts: {ifGenerationMatch: 123}, + }; + let retryCount = 0; + let firstInvocationId: string | undefined; + + bucket.storage.retryOptions.autoRetry = true; + bucket.storage.retryOptions.maxRetries = 2; + bucket.storage.retryOptions.idempotencyStrategy = 1; + bucket.storage.retryOptions.retryableErrorFn = () => true; + + fakeFile.createWriteStream = (options_: CreateWriteStreamOptions) => { + retryCount++; + const currentId = options_.invocationId; + + if (retryCount === 1) { + firstInvocationId = currentId; + } else { + assert.strictEqual(currentId, firstInvocationId); + } + + const ws = new stream.PassThrough(); + ws.resume(); + + setImmediate(() => { + if (retryCount === 1) { + const error = new Error('Retryable failure') as GaxiosError; + error.code = 500; + error.status = 500; + ws.destroy(error); + } else { + ws.emit('metadata', {}); + } + }); + + return ws as any; + }; + + bucket.upload(filepath, options, err => { + assert.ifError(err); + assert.strictEqual(retryCount, 2); + done(); + }); + }); }); it('should allow overriding content type', done => { diff --git a/handwritten/storage/test/file.ts b/handwritten/storage/test/file.ts index 3bd0e5f865af..f3279e9f176d 100644 --- a/handwritten/storage/test/file.ts +++ b/handwritten/storage/test/file.ts @@ -4583,26 +4583,29 @@ describe('File', () => { }); }); - it('should accept an options object', done => { - const options = {}; + it('should accept an options object', async () => { + const options = {resumable: false}; sandbox.stub(file, 'createWriteStream').callsFake(options_ => { - assert.strictEqual(options_, options); - setImmediate(done); - return new PassThrough(); + assert.strictEqual(options_?.resumable, options.resumable); + assert.ok(options_?.invocationId); + const ws = new PassThrough(); + setImmediate(() => ws.emit('finish')); + return ws; }); - file.save(DATA, options, assert.ifError); + await file.save(DATA, options, assert.ifError); }); - it('should not require options', done => { + it('should not require options', async () => { sandbox.stub(file, 'createWriteStream').callsFake(options_ => { - assert.deepStrictEqual(options_, {}); - setImmediate(done); - return new PassThrough(); + assert.ok(options_?.invocationId); + const ws = new PassThrough(); + setImmediate(() => ws.emit('finish')); + return ws; }); - file.save(DATA, assert.ifError); + await file.save(DATA, assert.ifError); }); it('should register the error listener', done => { @@ -4655,6 +4658,22 @@ describe('File', () => { file.save(DATA, assert.ifError); }); + + it('should generate a single invocationId and pass it to createWriteStream', async () => { + const options = {resumable: false}; + const createWriteStreamStub = sandbox + .stub(file, 'createWriteStream') + .callsFake(() => { + return new DelayedStreamNoError(); + }); + + await file.save(DATA, options); + + // Verify createWriteStream was called with an invocationId + const calledOptions = createWriteStreamStub.firstCall.args[0]; + assert.ok(calledOptions?.invocationId); + assert.strictEqual(typeof calledOptions?.invocationId, 'string'); + }); }); describe('setMetadata', () => { @@ -5219,6 +5238,22 @@ describe('File', () => { }); assert.strictEqual(file.storage.retryOptions.autoRetry, true); }); + + it('should pass the invocationId to the resumable upload configuration', done => { + const options = { + invocationId: 'resumable-persistent-id', + }; + + const resumableUpload = require('../src/resumable-upload'); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + sandbox.stub(resumableUpload, 'upload').callsFake((cfg: any) => { + assert.strictEqual(cfg.invocationId, options.invocationId); + setImmediate(done); + return new PassThrough(); + }); + + file.startResumableUpload_(duplexify(), options); + }); }); }); @@ -5335,6 +5370,25 @@ describe('File', () => { await file.startSimpleUpload_(duplexify(), options); }); + it('should pass the invocationId to the storageTransport', async () => { + const options = { + invocationId: 'test-uuid-1234', + userProject: 'user-project-id', + }; + file.storageTransport.makeRequest = sandbox + .stub() + .callsFake((options_: StorageRequestOptions) => { + assert.strictEqual( + options_.queryParameters?.userProject, + options.userProject, + ); + assert.strictEqual(options_.invocationId, options.invocationId); + }) + .resolves({}); + + await file.startSimpleUpload_(duplexify(), options); + }); + describe('request', () => { describe('error', () => { const error = new Error('Error.'); diff --git a/handwritten/storage/test/storage-transport.ts b/handwritten/storage/test/storage-transport.ts index d1282eec13bd..52c7e4ab6b69 100644 --- a/handwritten/storage/test/storage-transport.ts +++ b/handwritten/storage/test/storage-transport.ts @@ -22,7 +22,7 @@ import sinon from 'sinon'; import assert from 'assert'; import {GCCL_GCS_CMD_KEY} from '../src/nodejs-common/util'; import {RETRYABLE_ERR_FN_DEFAULT} from '../src/storage'; -import {Gaxios} from 'gaxios'; +import {Gaxios, GaxiosResponse} from 'gaxios'; describe('Storage Transport', () => { let sandbox: sinon.SinonSandbox; @@ -189,6 +189,53 @@ describe('Storage Transport', () => { assert.ok(transport.authClient instanceof GoogleAuth); }); + it('should use the provided invocationId in x-goog-api-client header', async () => { + const invocationId = 'manual-id-5678'; + const mockResponse = { + config: {}, + data: {}, + headers: {}, + status: 200, + statusText: 'OK', + request: {}, + } as unknown as GaxiosResponse; + + const requestStub = transport.authClient.request as sinon.SinonStub; + requestStub.resolves(mockResponse); + + await transport.makeRequest({ + url: 'http://test', + invocationId: invocationId, + }); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const headers = requestStub.firstCall.args[0].headers as any; + const apiClientHeader = headers['x-goog-api-client']; + + assert.ok(apiClientHeader.includes(`gccl-invocation-id/${invocationId}`)); + }); + + it('should generate a new random ID if none is provided', async () => { + const mockResponse = { + config: {}, + data: {}, + headers: {}, + status: 200, + statusText: 'OK', + } as GaxiosResponse; + const requestStub = transport.authClient.request as sinon.SinonStub; + requestStub.resolves(mockResponse); + + await transport.makeRequest({url: 'http://test'}); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const headers = requestStub.firstCall.args[0].headers as any; + const apiClientHeader = headers['x-goog-api-client']; + + assert.ok(apiClientHeader.includes('gccl-invocation-id/')); + const id = apiClientHeader.split('gccl-invocation-id/')[1]; + assert.strictEqual(id.length, 36); + }); + it('should handle absolute URLs and project validation', async () => { const requestStub = authClientStub.request as sinon.SinonStub; requestStub.resolves({data: {}, headers: new Map()});