diff --git a/clients/python/src/taskbroker_client/constants.py b/clients/python/src/taskbroker_client/constants.py index 6655c199..383a2b0f 100644 --- a/clients/python/src/taskbroker_client/constants.py +++ b/clients/python/src/taskbroker_client/constants.py @@ -46,6 +46,12 @@ The maximum number of bytes before a task parameter is compressed. """ +DEFAULT_GRPC_MAX_MESSAGE_SIZE = 10 * 1024 * 1024 # 10MB +""" +The maximum size of a gRPC message in bytes. +Can be overridden via TASKBROKER_GRPC_MAX_MESSAGE_SIZE env var. +""" + DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH = 1.0 """ The number of gRPC requests before touching the health check file diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index 86327dec..357d4708 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -2,6 +2,7 @@ import logging import multiprocessing +import os import queue import signal import threading @@ -23,6 +24,7 @@ from taskbroker_client.app import import_app from taskbroker_client.constants import ( + DEFAULT_GRPC_MAX_MESSAGE_SIZE, DEFAULT_REBALANCE_AFTER, DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH, DEFAULT_WORKER_QUEUE_SIZE, @@ -261,9 +263,16 @@ def signal_handler(*args: Any) -> None: if self._grpc_secrets: interceptors = [RequestSignatureServerInterceptor(self._grpc_secrets)] + max_message_size = int( + os.environ.get("TASKBROKER_GRPC_MAX_MESSAGE_SIZE", DEFAULT_GRPC_MAX_MESSAGE_SIZE) + ) server = grpc.server( ThreadPoolExecutor(max_workers=self._concurrency), interceptors=interceptors, + options=[ + ("grpc.max_receive_message_length", max_message_size), + ("grpc.max_send_message_length", max_message_size), + ], ) taskbroker_pb2_grpc.add_WorkerServiceServicer_to_server( diff --git a/src/config.rs b/src/config.rs index a5fedf3a..17705933 100644 --- a/src/config.rs +++ b/src/config.rs @@ -268,6 +268,10 @@ pub struct Config { /// If a message is bigger than this then the produce will fail. pub max_message_size: u64, + /// The maximum size in bytes for gRPC messages sent to workers. + /// Should be at least as large as max_message_size. + pub grpc_max_message_size: usize, + /// The number of pages to vacuum from sqlite when vacuum is run. /// If None, all pages will be vacuumed. pub vacuum_page_count: Option, @@ -420,6 +424,7 @@ impl Default for Config { maintenance_task_interval_ms: 6000, max_delayed_task_allowed_sec: 3600, max_message_size: 5000000, + grpc_max_message_size: 10 * 1024 * 1024, // 10MB vacuum_page_count: None, full_vacuum_on_start: true, full_vacuum_on_upkeep: true, diff --git a/src/worker.rs b/src/worker.rs index 29250ff7..2942ce41 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -57,7 +57,12 @@ pub struct Worker { impl Worker { pub async fn connect(config: Arc, endpoint: String) -> Result { - let client = WorkerServiceClient::connect(endpoint).await?; + let channel = tonic::transport::Channel::from_shared(endpoint)? + .connect() + .await?; + let client = WorkerServiceClient::new(channel) + .max_encoding_message_size(config.grpc_max_message_size) + .max_decoding_message_size(config.grpc_max_message_size); let secrets = config.grpc_shared_secret.clone(); let timeout = Duration::from_millis(config.push_timeout_ms);