Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 19 additions & 31 deletions src/api/SignalClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,8 @@ export class SignalClient {
private useV0SignalPath = false;

constructor(useJSON: boolean = false, loggerOptions: LoggerOptions = {}) {
this.log = getLogger(loggerOptions.loggerName ?? LoggerNames.Signal);
this.loggerContextCb = loggerOptions.loggerContextCb;
this.log = getLogger(loggerOptions.loggerName ?? LoggerNames.Signal, () => this.logContext);
this.useJSON = useJSON;
this.requestQueue = new AsyncQueue();
this.queuedRequests = [];
Expand Down Expand Up @@ -284,10 +284,7 @@ export class SignalClient {
reason?: ReconnectReason,
): Promise<ReconnectResponse | undefined> {
if (!this.options) {
this.log.warn(
'attempted to reconnect without signal options being set, ignoring',
this.logContext,
);
this.log.warn('attempted to reconnect without signal options being set, ignoring');
return;
}
this.state = SignalConnectionState.RECONNECTING;
Expand Down Expand Up @@ -377,10 +374,9 @@ export class SignalClient {
if (redactedUrl.searchParams.has('access_token')) {
redactedUrl.searchParams.set('access_token', '<redacted>');
}
this.log.debug(`connecting to ${redactedUrl}`, {
this.log.info(`signal connecting to ${redactedUrl}`, {
reconnect: opts.reconnect,
reconnectReason: opts.reconnectReason,
...this.logContext,
});
if (this.ws) {
await this.close(false);
Expand All @@ -399,7 +395,6 @@ export class SignalClient {
}
if (closeInfo.closeCode !== 1000) {
this.log.warn(`websocket closed`, {
...this.logContext,
reason: closeInfo.reason,
code: closeInfo.closeCode,
wasClean: closeInfo.closeCode === 1000,
Expand Down Expand Up @@ -466,7 +461,6 @@ export class SignalClient {

if (this.pingTimeoutDuration && this.pingTimeoutDuration > 0) {
this.log.debug('ping config', {
...this.logContext,
timeout: this.pingTimeoutDuration,
interval: this.pingIntervalDuration,
});
Expand Down Expand Up @@ -555,7 +549,7 @@ export class SignalClient {
await Promise.race([closePromise, sleep(MAX_WS_CLOSE_TIME)]);
}
} catch (e) {
this.log.debug('websocket error while closing', { ...this.logContext, error: e });
this.log.debug('websocket error while closing', { error: e });
} finally {
if (updateState) {
this.state = SignalConnectionState.DISCONNECTED;
Expand All @@ -566,7 +560,7 @@ export class SignalClient {

// initial offer after joining
sendOffer(offer: RTCSessionDescriptionInit, offerId: number) {
this.log.debug('sending offer', { ...this.logContext, offerSdp: offer.sdp });
this.log.debug('sending offer', { offerSdp: offer.sdp });
this.sendRequest({
case: 'offer',
value: toProtoSessionDescription(offer, offerId),
Expand All @@ -575,15 +569,15 @@ export class SignalClient {

// answer a server-initiated offer
sendAnswer(answer: RTCSessionDescriptionInit, offerId: number) {
this.log.debug('sending answer', { ...this.logContext, answerSdp: answer.sdp });
this.log.debug('sending answer', { answerSdp: answer.sdp });
return this.sendRequest({
case: 'answer',
value: toProtoSessionDescription(answer, offerId),
});
}

sendIceCandidate(candidate: RTCIceCandidateInit, target: SignalTarget) {
this.log.debug('sending ice candidate', { ...this.logContext, candidate });
this.log.debug('sending ice candidate', { candidate });
return this.sendRequest({
case: 'trickle',
value: new TrickleRequest({
Expand Down Expand Up @@ -768,10 +762,7 @@ export class SignalClient {
return;
}
if (!this.streamWriter) {
this.log.error(
`cannot send signal request before connected, type: ${message?.case}`,
this.logContext,
);
this.log.error(`cannot send signal request before connected, type: ${message?.case}`);
return;
}
const req = new SignalRequest({ message });
Expand All @@ -783,14 +774,14 @@ export class SignalClient {
await this.streamWriter.write(req.toBinary());
}
} catch (e) {
this.log.error('error sending signal message', { ...this.logContext, error: e });
this.log.error('error sending signal message', { error: e });
}
}

private handleSignalResponse(res: SignalResponse) {
const msg = res.message;
if (msg == undefined) {
this.log.debug('received unsupported message', this.logContext);
this.log.debug('received unsupported message');
return;
}

Expand Down Expand Up @@ -899,7 +890,7 @@ export class SignalClient {
this.onDataTrackSubscriberHandles(msg.value);
}
} else {
this.log.debug('unsupported message', { ...this.logContext, msgCase: msg.case });
this.log.debug('unsupported message', { msgCase: msg.case });
}

if (!pingHandled) {
Expand All @@ -920,14 +911,14 @@ export class SignalClient {
if (this.state === SignalConnectionState.DISCONNECTED) return;
const onCloseCallback = this.onClose;
await this.close(undefined, reason);
this.log.debug(`websocket connection closed: ${reason}`, { ...this.logContext, reason });
this.log.info(`websocket connection closed: ${reason}`, { reason });
if (onCloseCallback) {
onCloseCallback(reason);
}
}

private handleWSError(error: unknown) {
this.log.error('websocket error', { ...this.logContext, error });
this.log.error('websocket error', { error });
}

/**
Expand All @@ -937,15 +928,14 @@ export class SignalClient {
private resetPingTimeout() {
this.clearPingTimeout();
if (!this.pingTimeoutDuration) {
this.log.warn('ping timeout duration not set', this.logContext);
this.log.warn('ping timeout duration not set');
return;
}
this.pingTimeout = CriticalTimers.setTimeout(() => {
this.log.warn(
`ping timeout triggered. last pong received at: ${new Date(
Date.now() - this.pingTimeoutDuration! * 1000,
).toUTCString()}`,
this.logContext,
);
this.handleOnClose('ping timeout');
}, this.pingTimeoutDuration * 1000);
Expand All @@ -964,17 +954,17 @@ export class SignalClient {
this.clearPingInterval();
this.resetPingTimeout();
if (!this.pingIntervalDuration) {
this.log.warn('ping interval duration not set', this.logContext);
this.log.warn('ping interval duration not set');
return;
}
this.log.debug('start ping interval', this.logContext);
this.log.debug('start ping interval');
this.pingInterval = CriticalTimers.setInterval(() => {
this.sendPing();
}, this.pingIntervalDuration * 1000);
}

private clearPingInterval() {
this.log.debug('clearing ping interval', this.logContext);
this.log.debug('clearing ping interval');
this.clearPingTimeout();
if (this.pingInterval) {
CriticalTimers.clearInterval(this.pingInterval);
Expand All @@ -994,6 +984,7 @@ export class SignalClient {
firstMessage?: SignalResponse,
) {
this.state = SignalConnectionState.CONNECTED;
this.log.info('signal connected');
clearTimeout(timeoutHandle);
this.startPingInterval();
this.startReadingLoop(connection.readable.getReader(), firstMessage);
Expand Down Expand Up @@ -1031,10 +1022,7 @@ export class SignalClient {
};
} else {
// in reconnecting, any message received means signal reconnected and we still need to process it
this.log.debug(
'declaring signal reconnected without reconnect response received',
this.logContext,
);
this.log.debug('declaring signal reconnected without reconnect response received');
return {
isValid: true,
response: undefined,
Expand Down
Loading