Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions clients/python/src/taskbroker_client/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
The maximum number of bytes before a task parameter is compressed.
"""

DEFAULT_GRPC_MAX_MESSAGE_SIZE = 10 * 1024 * 1024 # 10MB
"""
The maximum size of a gRPC message in bytes.
Can be overridden via TASKBROKER_GRPC_MAX_MESSAGE_SIZE env var.
"""

DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH = 1.0
"""
The number of gRPC requests before touching the health check file
Expand Down
9 changes: 9 additions & 0 deletions clients/python/src/taskbroker_client/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
import multiprocessing
import os
import queue
import signal
import threading
Expand All @@ -23,6 +24,7 @@

from taskbroker_client.app import import_app
from taskbroker_client.constants import (
DEFAULT_GRPC_MAX_MESSAGE_SIZE,
DEFAULT_REBALANCE_AFTER,
DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH,
DEFAULT_WORKER_QUEUE_SIZE,
Expand Down Expand Up @@ -261,9 +263,16 @@ def signal_handler(*args: Any) -> None:
if self._grpc_secrets:
interceptors = [RequestSignatureServerInterceptor(self._grpc_secrets)]

max_message_size = int(
os.environ.get("TASKBROKER_GRPC_MAX_MESSAGE_SIZE", DEFAULT_GRPC_MAX_MESSAGE_SIZE)
)
server = grpc.server(
ThreadPoolExecutor(max_workers=self._concurrency),
interceptors=interceptors,
options=[
("grpc.max_receive_message_length", max_message_size),
("grpc.max_send_message_length", max_message_size),
],
)

taskbroker_pb2_grpc.add_WorkerServiceServicer_to_server(
Expand Down
5 changes: 5 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,10 @@ pub struct Config {
/// If a message is bigger than this then the produce will fail.
pub max_message_size: u64,

/// The maximum size in bytes for gRPC messages sent to workers.
/// Should be at least as large as max_message_size.
pub grpc_max_message_size: usize,

/// The number of pages to vacuum from sqlite when vacuum is run.
/// If None, all pages will be vacuumed.
pub vacuum_page_count: Option<usize>,
Expand Down Expand Up @@ -420,6 +424,7 @@ impl Default for Config {
maintenance_task_interval_ms: 6000,
max_delayed_task_allowed_sec: 3600,
max_message_size: 5000000,
grpc_max_message_size: 10 * 1024 * 1024, // 10MB
vacuum_page_count: None,
full_vacuum_on_start: true,
full_vacuum_on_upkeep: true,
Expand Down
7 changes: 6 additions & 1 deletion src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@ pub struct Worker {

impl Worker {
pub async fn connect(config: Arc<Config>, endpoint: String) -> Result<Self> {
let client = WorkerServiceClient::connect(endpoint).await?;
let channel = tonic::transport::Channel::from_shared(endpoint)?
.connect()
.await?;
let client = WorkerServiceClient::new(channel)
.max_encoding_message_size(config.grpc_max_message_size)
.max_decoding_message_size(config.grpc_max_message_size);

let secrets = config.grpc_shared_secret.clone();
let timeout = Duration::from_millis(config.push_timeout_ms);
Expand Down
Loading