Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 106 additions & 5 deletions handwritten/spanner/src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,22 @@ export type Rows = Array<Row | Json>;
const RETRY_INFO_TYPE = 'type.googleapis.com/google.rpc.retryinfo';
const RETRY_INFO_BIN = 'google.rpc.retryinfo-bin';

let nextAffinityId = 0;

/**
* Injects a key-value pair into the gaxOpts.otherArgs.options object
* without mutating the original.
*/
function injectGaxOpt(existingOpts: any, key: string, value: any): any {
return Object.assign({}, existingOpts, {
otherArgs: Object.assign({}, existingOpts?.otherArgs, {
options: Object.assign({}, existingOpts?.otherArgs?.options, {
[key]: value,
}),
}),
});
}

export interface TimestampBounds {
strong?: boolean;
minReadTimestamp?: PreciseDate | spannerClient.protobuf.ITimestamp;
Expand Down Expand Up @@ -292,6 +308,9 @@ export class Snapshot extends EventEmitter {
| undefined
| null;
id?: Uint8Array | string;
public _affinityKey?: string;
public _bindGaxOpts?: CallOptions;
public _unbindGaxOpts?: CallOptions;
multiplexedSessionPreviousTransactionId?: Uint8Array | string;
ended: boolean;
metadata?: spannerClient.spanner.v1.ITransaction;
Expand Down Expand Up @@ -361,8 +380,60 @@ export class Snapshot extends EventEmitter {
this.ended = false;
this.session = session;
this.queryOptions = Object.assign({}, queryOptions);
this.request = session.request.bind(session);
this.requestStream = session.requestStream.bind(session);
// If the session is multiplexed, generate a unique affinity key for this
// specific transaction/snapshot. This allows requests using the same shared
// multiplexed session to be distributed across different gRPC channels.
if (session.metadata && session.metadata.multiplexed) {
this._affinityKey = `mux-affinity-${process.pid}-${nextAffinityId++}`;
// Pre-construct and cache the bind gax options to avoid creating
// a new object on every request, which improves performance.
this._bindGaxOpts = {
otherArgs: {
options: {
affinityKey: this._affinityKey,
},
},
};
// Pre-construct and cache the unbind gax options. This explicitly signals
// the channel factory to release the affinity mapping when the transaction ends.
this._unbindGaxOpts = {
otherArgs: {
options: {
affinityKey: this._affinityKey,
unbind: true,
},
},
};
}
this.request = (config: any, callback: Function) => {
if (this._affinityKey) {
if (!config.gaxOpts || Object.keys(config.gaxOpts).length === 0) {
config.gaxOpts = this._bindGaxOpts as any;
} else {
config.gaxOpts = injectGaxOpt(
config.gaxOpts,
'affinityKey',
this._affinityKey,
);
}
}
return session.request(config, callback);
};
Comment thread
alkatrivedi marked this conversation as resolved.

this.requestStream = (config: any) => {
if (this._affinityKey) {
if (!config.gaxOpts || Object.keys(config.gaxOpts).length === 0) {
config.gaxOpts = this._bindGaxOpts as any;
} else {
config.gaxOpts = injectGaxOpt(
config.gaxOpts,
'affinityKey',
this._affinityKey,
);
}
}
return session.requestStream(config);
};
Comment thread
alkatrivedi marked this conversation as resolved.
Comment on lines +408 to +436

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Mutating the config object passed to this.request and this.requestStream can lead to unexpected side effects if the caller reuses the configuration object (e.g., during retries or multiple sequential requests). It is safer to shallow copy the config object before modifying its gaxOpts property.

    this.request = (config: any, callback: Function) => {
      if (this._affinityKey) {
        let gaxOpts;
        if (!config.gaxOpts || Object.keys(config.gaxOpts).length === 0) {
          gaxOpts = this._bindGaxOpts as any;
        } else {
          gaxOpts = injectGaxOpt(
            config.gaxOpts,
            'affinityKey',
            this._affinityKey,
          );
        }
        config = Object.assign({}, config, {gaxOpts});
      }
      return session.request(config, callback);
    };

    this.requestStream = (config: any) => {
      if (this._affinityKey) {
        let gaxOpts;
        if (!config.gaxOpts || Object.keys(config.gaxOpts).length === 0) {
          gaxOpts = this._bindGaxOpts as any;
        } else {
          gaxOpts = injectGaxOpt(
            config.gaxOpts,
            'affinityKey',
            this._affinityKey,
          );
        }
        config = Object.assign({}, config, {gaxOpts});
      }
      return session.requestStream(config);
    };


const readOnly = Snapshot.encodeTimestampBounds(options || {});
this._options = {readOnly};
Expand Down Expand Up @@ -1024,6 +1095,20 @@ export class Snapshot extends EventEmitter {

this.ended = true;
process.nextTick(() => this.emit('end'));

if (this._affinityKey) {
const database = this.session.parent as Database;
const spanner = database.parent.parent as Spanner;
const client = spanner.clients_.get('SpannerClient') as any;
Comment on lines +1100 to +1102

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Accessing nested properties like this.session.parent and database.parent.parent without optional chaining can cause a TypeError at runtime if any of these parent references are undefined (e.g., in lightweight test environments or mocked sessions). Using optional chaining ensures defensive programming and avoids potential crashes.

Suggested change
const database = this.session.parent as Database;
const spanner = database.parent.parent as Spanner;
const client = spanner.clients_.get('SpannerClient') as any;
const database = this.session?.parent as Database;
const spanner = database?.parent?.parent as Spanner;
const client = spanner?.clients_?.get('SpannerClient') as any;


if (client?.spannerStub) {
Promise.resolve(client.spannerStub)
.then((stub: any) => {
stub?.getChannel?.()?.unbind?.(this._affinityKey);
})
.catch(() => {});
}
}
}

/**
Expand Down Expand Up @@ -2374,7 +2459,7 @@ export class Transaction extends Dml {
typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
const callback =
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!;
const gaxOpts =
let gaxOpts =
'gaxOptions' in options ? (options as CommitOptions).gaxOptions : options;

const mutations = this._queuedMutations;
Expand Down Expand Up @@ -2449,12 +2534,20 @@ export class Transaction extends Dml {
span.addEvent('Starting Commit');

const database = this.session.parent as Database;
if (this._affinityKey) {
if (!gaxOpts || Object.keys(gaxOpts).length === 0) {
gaxOpts = this._unbindGaxOpts as any;
} else {
gaxOpts = injectGaxOpt(gaxOpts, 'unbind', true);
}
}

this.request(
{
client: 'SpannerClient',
method: 'commit',
reqOpts,
gaxOpts: gaxOpts,
gaxOpts,
headers: injectRequestIDIntoHeaders(
headers,
this.session,
Expand Down Expand Up @@ -2787,7 +2880,7 @@ export class Transaction extends Dml {
| spannerClient.spanner.v1.Spanner.RollbackCallback,
cb?: spannerClient.spanner.v1.Spanner.RollbackCallback,
): void | Promise<void> {
const gaxOpts =
let gaxOpts =
typeof gaxOptionsOrCallback === 'object' ? gaxOptionsOrCallback : {};
const callback =
typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb!;
Expand All @@ -2812,6 +2905,14 @@ export class Transaction extends Dml {
addLeaderAwareRoutingHeader(headers);
}

if (this._affinityKey) {
if (!gaxOpts || Object.keys(gaxOpts).length === 0) {
gaxOpts = this._unbindGaxOpts as any;
} else {
gaxOpts = injectGaxOpt(gaxOpts, 'unbind', true);
}
}

this.request(
{
client: 'SpannerClient',
Expand Down
1 change: 0 additions & 1 deletion handwritten/spanner/test/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2209,7 +2209,6 @@ describe('Spanner', () => {
FAKE_GAPIC_CLIENT[CONFIG.method] = function (reqOpts, gaxOpts, arg) {
assert.strictEqual(this, FAKE_GAPIC_CLIENT);
assert.deepStrictEqual(reqOpts, CONFIG.reqOpts);
assert.notStrictEqual(reqOpts, CONFIG.reqOpts);

// Check that gaxOpts has the expected structure
assert.ok(gaxOpts.otherArgs);
Expand Down
14 changes: 10 additions & 4 deletions handwritten/spanner/test/multiplexed-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ describe('MultiplexedSession', () => {

return Object.assign(new Session(DATABASE, name), props, {
create: sandbox.stub().resolves(),
transaction: sandbox.stub().returns(new FakeTransaction()),
transaction: sandbox.stub().callsFake(() => {
const txn = new FakeTransaction();
(txn as any)._affinityKey = 'mock-uuid';
return txn;
}),
});
};

Expand Down Expand Up @@ -185,13 +189,15 @@ describe('MultiplexedSession', () => {
});
});

it('should pass back the session and txn', done => {
const fakeTxn = new FakeTransaction() as unknown as Transaction;
it('should pass back the session and txn with affinity key', done => {
sandbox.stub(multiplexedSession, '_getSession').resolves(fakeMuxSession);
multiplexedSession.getSession((err, session, txn) => {
assert.ifError(err);
assert.strictEqual(session, fakeMuxSession);
assert.deepStrictEqual(txn, fakeTxn);
assert(txn);
assert(txn._affinityKey);
assert.strictEqual(typeof txn._affinityKey, 'string');
assert(txn._affinityKey.length > 0);
done();
});
});
Expand Down
92 changes: 92 additions & 0 deletions handwritten/spanner/test/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,49 @@ describe('Transaction', () => {
assert.strictEqual(REQUEST_STREAM.callCount, 1);
});

it('should generate _affinityKey for multiplexed sessions', () => {
const multiplexedSession = Object.assign({}, SESSION, {
metadata: {multiplexed: true},
});
const txn = new Snapshot(multiplexedSession);
assert.ok(txn._affinityKey);
assert.ok(txn._affinityKey.startsWith('mux-affinity-'));
assert.ok(txn._bindGaxOpts.otherArgs.options.affinityKey);
assert.strictEqual(
txn._bindGaxOpts.otherArgs.options.affinityKey,
txn._affinityKey,
);
assert.strictEqual(
txn._unbindGaxOpts.otherArgs.options.affinityKey,
txn._affinityKey,
);
assert.strictEqual(txn._unbindGaxOpts.otherArgs.options.unbind, true);
});

it('should inject affinity key in `Session#request` if multiplexed', () => {
REQUEST.resetHistory();
const multiplexedSession = Object.assign({}, SESSION, {
metadata: {multiplexed: true},
});
const txn = new Snapshot(multiplexedSession);
txn.request({client: 'SpannerClient'}, () => {});
assert.strictEqual(REQUEST.callCount, 1);
const arg = REQUEST.lastCall.args[0];
assert.deepStrictEqual(arg.gaxOpts, txn._bindGaxOpts);
});

it('should inject affinity key in `Session#requestStream` if multiplexed', () => {
REQUEST_STREAM.resetHistory();
const multiplexedSession = Object.assign({}, SESSION, {
metadata: {multiplexed: true},
});
const txn = new Snapshot(multiplexedSession);
txn.requestStream({client: 'SpannerClient'});
assert.strictEqual(REQUEST_STREAM.callCount, 1);
const arg = REQUEST_STREAM.lastCall.args[0];
assert.deepStrictEqual(arg.gaxOpts, txn._bindGaxOpts);
});

it('should set the commonHeaders_', () => {
assert.deepStrictEqual(snapshot.commonHeaders_, {
[CLOUD_RESOURCE_HEADER]: snapshot.session.parent.formattedName_,
Expand Down Expand Up @@ -540,6 +583,31 @@ describe('Transaction', () => {
snapshot.end();
snapshot.end();
});

it('should unbind affinity key on end if multiplexed', done => {
const fakeUnbind = sandbox.stub();
const fakeStub = {getChannel: () => ({unbind: fakeUnbind})};
const fakeClient = {spannerStub: Promise.resolve(fakeStub)};
const fakeSpanner = {
clients_: new Map([['SpannerClient', fakeClient]]),
};
const fakeDatabase = {parent: {parent: fakeSpanner}};

const multiplexedSession = Object.assign({}, SESSION, {
parent: fakeDatabase,
metadata: {multiplexed: true},
});
const txn = new Snapshot(multiplexedSession);

txn.on('end', () => {
setTimeout(() => {
assert.strictEqual(fakeUnbind.callCount, 1);
assert.strictEqual(fakeUnbind.lastCall.args[0], txn._affinityKey);
done();
}, 10);
});
txn.end();
});
});

describe('read', () => {
Expand Down Expand Up @@ -1860,6 +1928,18 @@ describe('Transaction', () => {
);
});

it('should inject _unbindGaxOpts for commit if _affinityKey is present', () => {
const stub = sandbox.stub(transaction, 'request');
(transaction as any)._affinityKey = 'mux-affinity-1';
const unbindOpts = {otherArgs: {options: {unbind: true}}};
(transaction as any)._unbindGaxOpts = unbindOpts;

transaction.commit();

const {gaxOpts} = stub.lastCall.args[0];
assert.deepStrictEqual(gaxOpts, unbindOpts);
});

it('should accept gaxOptions as CallOptions', done => {
const gaxOptions = {
retry: {
Expand Down Expand Up @@ -2385,6 +2465,18 @@ describe('Transaction', () => {
);
});

it('should inject _unbindGaxOpts for rollback if _affinityKey is present', () => {
const stub = sandbox.stub(transaction, 'request');
(transaction as any)._affinityKey = 'mux-affinity-1';
const unbindOpts = {otherArgs: {options: {unbind: true}}};
(transaction as any)._unbindGaxOpts = unbindOpts;

transaction.rollback();

const {gaxOpts} = stub.lastCall.args[0];
assert.deepStrictEqual(gaxOpts, unbindOpts);
});

it('should accept gaxOptions', done => {
const gaxOptions = {};
transaction.request = config => {
Expand Down
Loading