From f4419bf9c3195d5f7a6aae4ed3aa947a08015c29 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Mon, 20 Apr 2026 13:18:00 +0200 Subject: [PATCH 1/3] Log Kafka consumer.close errors --- packages/kafka/lib/AbstractKafkaConsumer.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 320e704b..367c16fa 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -2,7 +2,7 @@ import { randomUUID } from 'node:crypto' import { pipeline } from 'node:stream/promises' import { setTimeout } from 'node:timers/promises' import { - InternalError, + InternalError, isError, resolveGlobalErrorLogObject, stringValueSerializer, type TransactionObservabilityManager, @@ -239,8 +239,12 @@ export abstract class AbstractKafkaConsumer< try { await this.consumer.close() - } catch { - // Ignoring errors at this stage + } catch (err) { + // Reporting error but not throwing further + this.logger.warn(resolveGlobalErrorLogObject(err), 'Error while closing Kafka consumer') + this.errorReporter.report({ + error: isError(err) ? err : new Error('Unknown error while closing Kafka consumer') + }) } this.consumer = undefined } From 147316bdb167a8d966547ce22502671b63584f8a Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Mon, 20 Apr 2026 13:18:45 +0200 Subject: [PATCH 2/3] linting --- packages/kafka/lib/AbstractKafkaConsumer.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 367c16fa..b0710436 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -2,7 +2,8 @@ import { randomUUID } from 'node:crypto' import { pipeline } from 'node:stream/promises' import { setTimeout } from 'node:timers/promises' import { - InternalError, isError, + InternalError, + isError, resolveGlobalErrorLogObject, stringValueSerializer, type TransactionObservabilityManager, @@ -243,7 +244,7 @@ export abstract class AbstractKafkaConsumer< // Reporting error but not throwing further this.logger.warn(resolveGlobalErrorLogObject(err), 'Error while closing Kafka consumer') this.errorReporter.report({ - error: isError(err) ? err : new Error('Unknown error while closing Kafka consumer') + error: isError(err) ? err : new Error('Unknown error while closing Kafka consumer'), }) } this.consumer = undefined From 378f87c4466cfef0653a1cf3035393d348204250 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Mon, 20 Apr 2026 13:24:19 +0200 Subject: [PATCH 3/3] Including resolveErrorLog in error report context --- packages/kafka/lib/AbstractKafkaConsumer.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index b0710436..14972f7b 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -242,9 +242,11 @@ export abstract class AbstractKafkaConsumer< await this.consumer.close() } catch (err) { // Reporting error but not throwing further - this.logger.warn(resolveGlobalErrorLogObject(err), 'Error while closing Kafka consumer') + const resolvedErrorLog = resolveGlobalErrorLogObject(err) + this.logger.warn(resolvedErrorLog, 'Error while closing Kafka consumer') this.errorReporter.report({ error: isError(err) ? err : new Error('Unknown error while closing Kafka consumer'), + context: resolvedErrorLog, }) } this.consumer = undefined