diff --git a/handwritten/spanner/src/transaction.ts b/handwritten/spanner/src/transaction.ts index e0bdfb54ebd8..e2e30acea34b 100644 --- a/handwritten/spanner/src/transaction.ts +++ b/handwritten/spanner/src/transaction.ts @@ -56,6 +56,22 @@ export type Rows = Array; const RETRY_INFO_TYPE = 'type.googleapis.com/google.rpc.retryinfo'; const RETRY_INFO_BIN = 'google.rpc.retryinfo-bin'; +let nextAffinityId = 0; + +/** + * Injects a key-value pair into the gaxOpts.otherArgs.options object + * without mutating the original. + */ +function injectGaxOpt(existingOpts: any, key: string, value: any): any { + return Object.assign({}, existingOpts, { + otherArgs: Object.assign({}, existingOpts?.otherArgs, { + options: Object.assign({}, existingOpts?.otherArgs?.options, { + [key]: value, + }), + }), + }); +} + export interface TimestampBounds { strong?: boolean; minReadTimestamp?: PreciseDate | spannerClient.protobuf.ITimestamp; @@ -292,6 +308,9 @@ export class Snapshot extends EventEmitter { | undefined | null; id?: Uint8Array | string; + public _affinityKey?: string; + public _bindGaxOpts?: CallOptions; + public _unbindGaxOpts?: CallOptions; multiplexedSessionPreviousTransactionId?: Uint8Array | string; ended: boolean; metadata?: spannerClient.spanner.v1.ITransaction; @@ -361,8 +380,60 @@ export class Snapshot extends EventEmitter { this.ended = false; this.session = session; this.queryOptions = Object.assign({}, queryOptions); - this.request = session.request.bind(session); - this.requestStream = session.requestStream.bind(session); + // If the session is multiplexed, generate a unique affinity key for this + // specific transaction/snapshot. This allows requests using the same shared + // multiplexed session to be distributed across different gRPC channels. + if (session.metadata && session.metadata.multiplexed) { + this._affinityKey = `mux-affinity-${process.pid}-${nextAffinityId++}`; + // Pre-construct and cache the bind gax options to avoid creating + // a new object on every request, which improves performance. + this._bindGaxOpts = { + otherArgs: { + options: { + affinityKey: this._affinityKey, + }, + }, + }; + // Pre-construct and cache the unbind gax options. This explicitly signals + // the channel factory to release the affinity mapping when the transaction ends. + this._unbindGaxOpts = { + otherArgs: { + options: { + affinityKey: this._affinityKey, + unbind: true, + }, + }, + }; + } + this.request = (config: any, callback: Function) => { + if (this._affinityKey) { + if (!config.gaxOpts || Object.keys(config.gaxOpts).length === 0) { + config.gaxOpts = this._bindGaxOpts as any; + } else { + config.gaxOpts = injectGaxOpt( + config.gaxOpts, + 'affinityKey', + this._affinityKey, + ); + } + } + return session.request(config, callback); + }; + + this.requestStream = (config: any) => { + if (this._affinityKey) { + if (!config.gaxOpts || Object.keys(config.gaxOpts).length === 0) { + config.gaxOpts = this._bindGaxOpts as any; + } else { + config.gaxOpts = injectGaxOpt( + config.gaxOpts, + 'affinityKey', + this._affinityKey, + ); + } + } + return session.requestStream(config); + }; const readOnly = Snapshot.encodeTimestampBounds(options || {}); this._options = {readOnly}; @@ -1024,6 +1095,20 @@ export class Snapshot extends EventEmitter { this.ended = true; process.nextTick(() => this.emit('end')); + + if (this._affinityKey) { + const database = this.session.parent as Database; + const spanner = database.parent.parent as Spanner; + const client = spanner.clients_.get('SpannerClient') as any; + + if (client?.spannerStub) { + Promise.resolve(client.spannerStub) + .then((stub: any) => { + stub?.getChannel?.()?.unbind?.(this._affinityKey); + }) + .catch(() => {}); + } + } } /** @@ -2374,7 +2459,7 @@ export class Transaction extends Dml { typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; - const gaxOpts = + let gaxOpts = 'gaxOptions' in options ? (options as CommitOptions).gaxOptions : options; const mutations = this._queuedMutations; @@ -2449,12 +2534,20 @@ export class Transaction extends Dml { span.addEvent('Starting Commit'); const database = this.session.parent as Database; + if (this._affinityKey) { + if (!gaxOpts || Object.keys(gaxOpts).length === 0) { + gaxOpts = this._unbindGaxOpts as any; + } else { + gaxOpts = injectGaxOpt(gaxOpts, 'unbind', true); + } + } + this.request( { client: 'SpannerClient', method: 'commit', reqOpts, - gaxOpts: gaxOpts, + gaxOpts, headers: injectRequestIDIntoHeaders( headers, this.session, @@ -2787,7 +2880,7 @@ export class Transaction extends Dml { | spannerClient.spanner.v1.Spanner.RollbackCallback, cb?: spannerClient.spanner.v1.Spanner.RollbackCallback, ): void | Promise { - const gaxOpts = + let gaxOpts = typeof gaxOptionsOrCallback === 'object' ? gaxOptionsOrCallback : {}; const callback = typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb!; @@ -2812,6 +2905,14 @@ export class Transaction extends Dml { addLeaderAwareRoutingHeader(headers); } + if (this._affinityKey) { + if (!gaxOpts || Object.keys(gaxOpts).length === 0) { + gaxOpts = this._unbindGaxOpts as any; + } else { + gaxOpts = injectGaxOpt(gaxOpts, 'unbind', true); + } + } + this.request( { client: 'SpannerClient', diff --git a/handwritten/spanner/test/index.ts b/handwritten/spanner/test/index.ts index 6a9d96325b47..68c377d98334 100644 --- a/handwritten/spanner/test/index.ts +++ b/handwritten/spanner/test/index.ts @@ -2209,7 +2209,6 @@ describe('Spanner', () => { FAKE_GAPIC_CLIENT[CONFIG.method] = function (reqOpts, gaxOpts, arg) { assert.strictEqual(this, FAKE_GAPIC_CLIENT); assert.deepStrictEqual(reqOpts, CONFIG.reqOpts); - assert.notStrictEqual(reqOpts, CONFIG.reqOpts); // Check that gaxOpts has the expected structure assert.ok(gaxOpts.otherArgs); diff --git a/handwritten/spanner/test/multiplexed-session.ts b/handwritten/spanner/test/multiplexed-session.ts index d21f912f5e8f..ba934dd25d5a 100644 --- a/handwritten/spanner/test/multiplexed-session.ts +++ b/handwritten/spanner/test/multiplexed-session.ts @@ -43,7 +43,11 @@ describe('MultiplexedSession', () => { return Object.assign(new Session(DATABASE, name), props, { create: sandbox.stub().resolves(), - transaction: sandbox.stub().returns(new FakeTransaction()), + transaction: sandbox.stub().callsFake(() => { + const txn = new FakeTransaction(); + (txn as any)._affinityKey = 'mock-uuid'; + return txn; + }), }); }; @@ -185,13 +189,15 @@ describe('MultiplexedSession', () => { }); }); - it('should pass back the session and txn', done => { - const fakeTxn = new FakeTransaction() as unknown as Transaction; + it('should pass back the session and txn with affinity key', done => { sandbox.stub(multiplexedSession, '_getSession').resolves(fakeMuxSession); multiplexedSession.getSession((err, session, txn) => { assert.ifError(err); assert.strictEqual(session, fakeMuxSession); - assert.deepStrictEqual(txn, fakeTxn); + assert(txn); + assert(txn._affinityKey); + assert.strictEqual(typeof txn._affinityKey, 'string'); + assert(txn._affinityKey.length > 0); done(); }); }); diff --git a/handwritten/spanner/test/transaction.ts b/handwritten/spanner/test/transaction.ts index 45b1d24a840d..4b61833f1f5d 100644 --- a/handwritten/spanner/test/transaction.ts +++ b/handwritten/spanner/test/transaction.ts @@ -163,6 +163,49 @@ describe('Transaction', () => { assert.strictEqual(REQUEST_STREAM.callCount, 1); }); + it('should generate _affinityKey for multiplexed sessions', () => { + const multiplexedSession = Object.assign({}, SESSION, { + metadata: {multiplexed: true}, + }); + const txn = new Snapshot(multiplexedSession); + assert.ok(txn._affinityKey); + assert.ok(txn._affinityKey.startsWith('mux-affinity-')); + assert.ok(txn._bindGaxOpts.otherArgs.options.affinityKey); + assert.strictEqual( + txn._bindGaxOpts.otherArgs.options.affinityKey, + txn._affinityKey, + ); + assert.strictEqual( + txn._unbindGaxOpts.otherArgs.options.affinityKey, + txn._affinityKey, + ); + assert.strictEqual(txn._unbindGaxOpts.otherArgs.options.unbind, true); + }); + + it('should inject affinity key in `Session#request` if multiplexed', () => { + REQUEST.resetHistory(); + const multiplexedSession = Object.assign({}, SESSION, { + metadata: {multiplexed: true}, + }); + const txn = new Snapshot(multiplexedSession); + txn.request({client: 'SpannerClient'}, () => {}); + assert.strictEqual(REQUEST.callCount, 1); + const arg = REQUEST.lastCall.args[0]; + assert.deepStrictEqual(arg.gaxOpts, txn._bindGaxOpts); + }); + + it('should inject affinity key in `Session#requestStream` if multiplexed', () => { + REQUEST_STREAM.resetHistory(); + const multiplexedSession = Object.assign({}, SESSION, { + metadata: {multiplexed: true}, + }); + const txn = new Snapshot(multiplexedSession); + txn.requestStream({client: 'SpannerClient'}); + assert.strictEqual(REQUEST_STREAM.callCount, 1); + const arg = REQUEST_STREAM.lastCall.args[0]; + assert.deepStrictEqual(arg.gaxOpts, txn._bindGaxOpts); + }); + it('should set the commonHeaders_', () => { assert.deepStrictEqual(snapshot.commonHeaders_, { [CLOUD_RESOURCE_HEADER]: snapshot.session.parent.formattedName_, @@ -540,6 +583,31 @@ describe('Transaction', () => { snapshot.end(); snapshot.end(); }); + + it('should unbind affinity key on end if multiplexed', done => { + const fakeUnbind = sandbox.stub(); + const fakeStub = {getChannel: () => ({unbind: fakeUnbind})}; + const fakeClient = {spannerStub: Promise.resolve(fakeStub)}; + const fakeSpanner = { + clients_: new Map([['SpannerClient', fakeClient]]), + }; + const fakeDatabase = {parent: {parent: fakeSpanner}}; + + const multiplexedSession = Object.assign({}, SESSION, { + parent: fakeDatabase, + metadata: {multiplexed: true}, + }); + const txn = new Snapshot(multiplexedSession); + + txn.on('end', () => { + setTimeout(() => { + assert.strictEqual(fakeUnbind.callCount, 1); + assert.strictEqual(fakeUnbind.lastCall.args[0], txn._affinityKey); + done(); + }, 10); + }); + txn.end(); + }); }); describe('read', () => { @@ -1860,6 +1928,18 @@ describe('Transaction', () => { ); }); + it('should inject _unbindGaxOpts for commit if _affinityKey is present', () => { + const stub = sandbox.stub(transaction, 'request'); + (transaction as any)._affinityKey = 'mux-affinity-1'; + const unbindOpts = {otherArgs: {options: {unbind: true}}}; + (transaction as any)._unbindGaxOpts = unbindOpts; + + transaction.commit(); + + const {gaxOpts} = stub.lastCall.args[0]; + assert.deepStrictEqual(gaxOpts, unbindOpts); + }); + it('should accept gaxOptions as CallOptions', done => { const gaxOptions = { retry: { @@ -2385,6 +2465,18 @@ describe('Transaction', () => { ); }); + it('should inject _unbindGaxOpts for rollback if _affinityKey is present', () => { + const stub = sandbox.stub(transaction, 'request'); + (transaction as any)._affinityKey = 'mux-affinity-1'; + const unbindOpts = {otherArgs: {options: {unbind: true}}}; + (transaction as any)._unbindGaxOpts = unbindOpts; + + transaction.rollback(); + + const {gaxOpts} = stub.lastCall.args[0]; + assert.deepStrictEqual(gaxOpts, unbindOpts); + }); + it('should accept gaxOptions', done => { const gaxOptions = {}; transaction.request = config => {