diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0ab5823e..a34650f8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -65,8 +65,14 @@ jobs: - name: Run pre-commit run: | - jq '.[]' --raw-output <<< '${{steps.changes.outputs.all_files}}' | - xargs pre-commit run --files + if [ "${{ github.event_name }}" = "push" ]; then + # On main branch, run on all files to catch any formatting drift + pre-commit run --all-files + else + # On PRs, run only on changed files + jq '.[]' --raw-output <<< '${{steps.changes.outputs.all_files}}' | + xargs pre-commit run --files + fi python-client-lint: name: Python Client Lint diff --git a/devenv/sync.py b/devenv/sync.py index a1bb373a..7bdfc7b2 100644 --- a/devenv/sync.py +++ b/devenv/sync.py @@ -1,8 +1,8 @@ +import shutil + from devenv import constants from devenv.lib import brew, config, proc, uv -import shutil - def main(context: dict[str, str]) -> int: reporoot = context["reporoot"] diff --git a/integration_tests/integration_tests/test_consumer_rebalancing.py b/integration_tests/integration_tests/test_consumer_rebalancing.py index b9d21732..50a125e6 100644 --- a/integration_tests/integration_tests/test_consumer_rebalancing.py +++ b/integration_tests/integration_tests/test_consumer_rebalancing.py @@ -14,7 +14,6 @@ TESTS_OUTPUT_ROOT, TaskbrokerConfig, create_topic, - get_available_ports, send_generic_messages_to_topic, ) @@ -86,11 +85,9 @@ def test_tasks_written_once_during_rebalancing() -> None: max_pending_count = 15_000 topic_name = "taskworker" kafka_deadletter_topic = "taskworker-dlq" - grpc_ports = get_available_ports(num_consumers) curr_time = int(time.time()) - print( - f""" + print(f""" Running test with the following configuration: num of consumers: {num_consumers}, num of messages: {num_messages}, @@ -99,10 +96,8 @@ def test_tasks_written_once_during_rebalancing() -> None: min restart duration: {min_restart_duration} seconds, max restart duration: {max_restart_duration} seconds, topic name: {topic_name}, - grpc ports: {grpc_ports}, random seed value: 42 - """ - ) + """) random.seed(42) # Ensure topic exists and has correct number of partitions @@ -123,7 +118,7 @@ def test_tasks_written_once_during_rebalancing() -> None: kafka_deadletter_topic=kafka_deadletter_topic, kafka_consumer_group=topic_name, kafka_auto_offset_reset="earliest", - grpc_port=grpc_ports[i], + grpc_port=0, # Disable gRPC - not needed for this test ) for filename, config in taskbroker_configs.items(): diff --git a/integration_tests/integration_tests/test_failed_tasks.py b/integration_tests/integration_tests/test_failed_tasks.py index e2b481e0..7fedb2dd 100644 --- a/integration_tests/integration_tests/test_failed_tasks.py +++ b/integration_tests/integration_tests/test_failed_tasks.py @@ -254,16 +254,14 @@ def test_failed_tasks() -> None: kafka_deadletter_topic = "taskworker-dlq" curr_time = int(time.time()) - print( - f""" + print(f""" Running test with the following configuration: num of messages: {num_messages}, num of partitions: {num_partitions}, num of workers: {num_workers}, max pending count: {max_pending_count}, topic name: {topic_name} - """ - ) + """) create_topic(topic_name, num_partitions) diff --git a/integration_tests/integration_tests/test_task_worker_processing.py b/integration_tests/integration_tests/test_task_worker_processing.py index b8a9f4ca..6bc6ff27 100644 --- a/integration_tests/integration_tests/test_task_worker_processing.py +++ b/integration_tests/integration_tests/test_task_worker_processing.py @@ -185,16 +185,14 @@ def test_task_worker_processing() -> None: kafka_deadletter_topic = "taskworker-dlq" curr_time = int(time.time()) - print( - f""" + print(f""" Running test with the following configuration: num of messages: {num_messages}, num of partitions: {num_partitions}, num of workers: {num_workers}, max pending count: {max_pending_count}, topic name: {topic_name} - """ - ) + """) create_topic(topic_name, num_partitions) diff --git a/integration_tests/integration_tests/test_upkeep_delay.py b/integration_tests/integration_tests/test_upkeep_delay.py index e1eea1e1..b3434915 100644 --- a/integration_tests/integration_tests/test_upkeep_delay.py +++ b/integration_tests/integration_tests/test_upkeep_delay.py @@ -199,8 +199,7 @@ def test_upkeep_delay() -> None: min_delay = 20 max_delay = 40 - print( - f""" + print(f""" Running test with the following configuration: num of messages: {num_messages}, num of partitions: {num_partitions}, @@ -208,8 +207,7 @@ def test_upkeep_delay() -> None: topic name: {topic_name}, dlq topic name: {dlq_topic_name}, delay range: {min_delay} - {max_delay} - """ - ) + """) create_topic(topic_name, num_partitions) diff --git a/integration_tests/integration_tests/test_upkeep_expiry.py b/integration_tests/integration_tests/test_upkeep_expiry.py index 73ea7195..32b80a7a 100644 --- a/integration_tests/integration_tests/test_upkeep_expiry.py +++ b/integration_tests/integration_tests/test_upkeep_expiry.py @@ -172,16 +172,14 @@ def test_upkeep_expiry() -> None: dlq_topic_name = "taskworker-dlq" curr_time = int(time.time()) - print( - f""" + print(f""" Running test with the following configuration: num of messages: {num_messages}, num of partitions: {num_partitions}, max pending count: {max_pending_count}, topic name: {topic_name}, dlq topic name: {dlq_topic_name} - """ - ) + """) create_topic(topic_name, num_partitions) create_topic(dlq_topic_name, 1) diff --git a/integration_tests/integration_tests/test_upkeep_retry.py b/integration_tests/integration_tests/test_upkeep_retry.py index 4f3892d7..b6756ece 100644 --- a/integration_tests/integration_tests/test_upkeep_retry.py +++ b/integration_tests/integration_tests/test_upkeep_retry.py @@ -248,8 +248,7 @@ def test_upkeep_retry() -> None: kafka_deadletter_topic = "taskworker-dlq" curr_time = int(time.time()) - print( - f""" + print(f""" Running test with the following configuration: num of messages: {num_messages}, attempts per task: {retries_per_task}, @@ -257,8 +256,7 @@ def test_upkeep_retry() -> None: num of workers: {num_workers}, max pending count: {max_pending_count}, topic name: {topic_name} - """ - ) + """) create_topic(topic_name, num_partitions) diff --git a/src/main.rs b/src/main.rs index c3648a6d..5888f521 100644 --- a/src/main.rs +++ b/src/main.rs @@ -213,57 +213,62 @@ async fn main() -> Result<(), Error> { (None, None) }; - // GRPC server - let grpc_server_task = tokio::spawn({ - let grpc_store = store.clone(); - let grpc_config = config.clone(); - let grpc_status_tx = status_update_tx.clone(); + // GRPC server - only start if port is configured (port 0 disables it) + let grpc_server_task = if config.grpc_port > 0 { + Some(tokio::spawn({ + let grpc_store = store.clone(); + let grpc_config = config.clone(); + let grpc_status_tx = status_update_tx.clone(); + + async move { + let addr = format!("{}:{}", grpc_config.grpc_addr, grpc_config.grpc_port) + .parse() + .expect("Failed to parse address"); + + let layers = tower::ServiceBuilder::new() + .layer(MetricsLayer::default()) + .layer(AuthLayer::new(&grpc_config)) + .into_inner(); + + let server = Server::builder() + .layer(layers) + .add_service(ConsumerServiceServer::new(TaskbrokerServer { + store: grpc_store, + config: grpc_config, + update_tx: grpc_status_tx, + })) + .add_service(health_service.clone()) + .serve(addr); + + let guard = elegant_departure::get_shutdown_guard().shutdown_on_drop(); + info!("GRPC server listening on {}", addr); + select! { + biased; - async move { - let addr = format!("{}:{}", grpc_config.grpc_addr, grpc_config.grpc_port) - .parse() - .expect("Failed to parse address"); - - let layers = tower::ServiceBuilder::new() - .layer(MetricsLayer::default()) - .layer(AuthLayer::new(&grpc_config)) - .into_inner(); - - let server = Server::builder() - .layer(layers) - .add_service(ConsumerServiceServer::new(TaskbrokerServer { - store: grpc_store, - config: grpc_config, - update_tx: grpc_status_tx, - })) - .add_service(health_service.clone()) - .serve(addr); - - let guard = elegant_departure::get_shutdown_guard().shutdown_on_drop(); - info!("GRPC server listening on {}", addr); - select! { - biased; - - res = server => { - info!("GRPC server task failed, shutting down"); - - // Wait for any running requests to drain - tokio::time::sleep(Duration::from_secs(5)).await; - match res { - Ok(()) => Ok(()), - Err(e) => Err(anyhow!("GRPC server task failed: {:?}", e)), + res = server => { + info!("GRPC server task failed, shutting down"); + + // Wait for any running requests to drain + tokio::time::sleep(Duration::from_secs(5)).await; + match res { + Ok(()) => Ok(()), + Err(e) => Err(anyhow!("GRPC server task failed: {:?}", e)), + } } - } - _ = guard.wait() => { - info!("Cancellation token received, shutting down GRPC server"); + _ = guard.wait() => { + info!("Cancellation token received, shutting down GRPC server"); - // Wait for any running requests to drain - tokio::time::sleep(Duration::from_secs(5)).await; - Ok(()) + // Wait for any running requests to drain + tokio::time::sleep(Duration::from_secs(5)).await; + Ok(()) + } } } - } - }); + })) + } else { + info!("GRPC server disabled (grpc_port=0)"); + None + }; // Initialize push and fetch pools let push_pool = Arc::new(PushPool::new(config.clone(), store.clone())); @@ -289,10 +294,13 @@ async fn main() -> Result<(), Error> { .on_signal(SignalKind::hangup()) .on_signal(SignalKind::quit()) .on_completion(log_task_completion("consumer", consumer_task)) - .on_completion(log_task_completion("grpc_server", grpc_server_task)) .on_completion(log_task_completion("upkeep_task", upkeep_task)) .on_completion(log_task_completion("maintenance_task", maintenance_task)); + if let Some(task) = grpc_server_task { + departure = departure.on_completion(log_task_completion("grpc_server", task)); + } + if let Some(task) = push_task { departure = departure.on_completion(log_task_completion("push_task", task)); }