From 9ec363192edc44adc106bab96e62f62be27d591a Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Fri, 29 May 2026 22:32:24 +0200 Subject: [PATCH] fix(grpc): Increase max message size to 10MB The default gRPC max message size is 4MB. Tasks with payloads larger than this fail to be pushed to workers and get stuck in an infinite retry loop, causing the pending_activation_max_lag metric to flap. Broker side: Add grpc_max_message_size config (default 10MB) and use it when creating the WorkerServiceClient. Worker side: Add TASKBROKER_GRPC_MAX_MESSAGE_SIZE env var (default 10MB) and apply it to the gRPC server options. ref INC-2209 Co-Authored-By: Claude Opus 4.5 --- clients/python/src/taskbroker_client/constants.py | 6 ++++++ clients/python/src/taskbroker_client/worker/worker.py | 9 +++++++++ src/config.rs | 5 +++++ src/worker.rs | 7 ++++++- 4 files changed, 26 insertions(+), 1 deletion(-) diff --git a/clients/python/src/taskbroker_client/constants.py b/clients/python/src/taskbroker_client/constants.py index 6655c199..383a2b0f 100644 --- a/clients/python/src/taskbroker_client/constants.py +++ b/clients/python/src/taskbroker_client/constants.py @@ -46,6 +46,12 @@ The maximum number of bytes before a task parameter is compressed. """ +DEFAULT_GRPC_MAX_MESSAGE_SIZE = 10 * 1024 * 1024 # 10MB +""" +The maximum size of a gRPC message in bytes. +Can be overridden via TASKBROKER_GRPC_MAX_MESSAGE_SIZE env var. +""" + DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH = 1.0 """ The number of gRPC requests before touching the health check file diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index 86327dec..357d4708 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -2,6 +2,7 @@ import logging import multiprocessing +import os import queue import signal import threading @@ -23,6 +24,7 @@ from taskbroker_client.app import import_app from taskbroker_client.constants import ( + DEFAULT_GRPC_MAX_MESSAGE_SIZE, DEFAULT_REBALANCE_AFTER, DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH, DEFAULT_WORKER_QUEUE_SIZE, @@ -261,9 +263,16 @@ def signal_handler(*args: Any) -> None: if self._grpc_secrets: interceptors = [RequestSignatureServerInterceptor(self._grpc_secrets)] + max_message_size = int( + os.environ.get("TASKBROKER_GRPC_MAX_MESSAGE_SIZE", DEFAULT_GRPC_MAX_MESSAGE_SIZE) + ) server = grpc.server( ThreadPoolExecutor(max_workers=self._concurrency), interceptors=interceptors, + options=[ + ("grpc.max_receive_message_length", max_message_size), + ("grpc.max_send_message_length", max_message_size), + ], ) taskbroker_pb2_grpc.add_WorkerServiceServicer_to_server( diff --git a/src/config.rs b/src/config.rs index a5fedf3a..17705933 100644 --- a/src/config.rs +++ b/src/config.rs @@ -268,6 +268,10 @@ pub struct Config { /// If a message is bigger than this then the produce will fail. pub max_message_size: u64, + /// The maximum size in bytes for gRPC messages sent to workers. + /// Should be at least as large as max_message_size. + pub grpc_max_message_size: usize, + /// The number of pages to vacuum from sqlite when vacuum is run. /// If None, all pages will be vacuumed. pub vacuum_page_count: Option, @@ -420,6 +424,7 @@ impl Default for Config { maintenance_task_interval_ms: 6000, max_delayed_task_allowed_sec: 3600, max_message_size: 5000000, + grpc_max_message_size: 10 * 1024 * 1024, // 10MB vacuum_page_count: None, full_vacuum_on_start: true, full_vacuum_on_upkeep: true, diff --git a/src/worker.rs b/src/worker.rs index 29250ff7..2942ce41 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -57,7 +57,12 @@ pub struct Worker { impl Worker { pub async fn connect(config: Arc, endpoint: String) -> Result { - let client = WorkerServiceClient::connect(endpoint).await?; + let channel = tonic::transport::Channel::from_shared(endpoint)? + .connect() + .await?; + let client = WorkerServiceClient::new(channel) + .max_encoding_message_size(config.grpc_max_message_size) + .max_decoding_message_size(config.grpc_max_message_size); let secrets = config.grpc_shared_secret.clone(); let timeout = Duration::from_millis(config.push_timeout_ms);