Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,14 @@ jobs:

- name: Run pre-commit
run: |
jq '.[]' --raw-output <<< '${{steps.changes.outputs.all_files}}' |
xargs pre-commit run --files
if [ "${{ github.event_name }}" = "push" ]; then
# On push events (e.g. pushes to the main branch), run on all files to catch any formatting drift
pre-commit run --all-files
else
# On all other events (e.g. pull requests), run only on changed files
jq '.[]' --raw-output <<< '${{steps.changes.outputs.all_files}}' |
xargs pre-commit run --files
fi

python-client-lint:
name: Python Client Lint
Expand Down
4 changes: 2 additions & 2 deletions devenv/sync.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import shutil

from devenv import constants
from devenv.lib import brew, config, proc, uv

import shutil


def main(context: dict[str, str]) -> int:
reporoot = context["reporoot"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
TESTS_OUTPUT_ROOT,
TaskbrokerConfig,
create_topic,
get_available_ports,
send_generic_messages_to_topic,
)

Expand Down Expand Up @@ -86,11 +85,9 @@ def test_tasks_written_once_during_rebalancing() -> None:
max_pending_count = 15_000
topic_name = "taskworker"
kafka_deadletter_topic = "taskworker-dlq"
grpc_ports = get_available_ports(num_consumers)
curr_time = int(time.time())

print(
f"""
print(f"""
Running test with the following configuration:
num of consumers: {num_consumers},
num of messages: {num_messages},
Expand All @@ -99,10 +96,8 @@ def test_tasks_written_once_during_rebalancing() -> None:
min restart duration: {min_restart_duration} seconds,
max restart duration: {max_restart_duration} seconds,
topic name: {topic_name},
grpc ports: {grpc_ports},
random seed value: 42
"""
)
""")
random.seed(42)

# Ensure topic exists and has correct number of partitions
Expand All @@ -123,7 +118,7 @@ def test_tasks_written_once_during_rebalancing() -> None:
kafka_deadletter_topic=kafka_deadletter_topic,
kafka_consumer_group=topic_name,
kafka_auto_offset_reset="earliest",
grpc_port=grpc_ports[i],
grpc_port=0, # Disable gRPC - not needed for this test
)

for filename, config in taskbroker_configs.items():
Expand Down
6 changes: 2 additions & 4 deletions integration_tests/integration_tests/test_failed_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,16 +254,14 @@ def test_failed_tasks() -> None:
kafka_deadletter_topic = "taskworker-dlq"
curr_time = int(time.time())

print(
f"""
print(f"""
Running test with the following configuration:
num of messages: {num_messages},
num of partitions: {num_partitions},
num of workers: {num_workers},
max pending count: {max_pending_count},
topic name: {topic_name}
"""
)
""")

create_topic(topic_name, num_partitions)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,14 @@ def test_task_worker_processing() -> None:
kafka_deadletter_topic = "taskworker-dlq"
curr_time = int(time.time())

print(
f"""
print(f"""
Running test with the following configuration:
num of messages: {num_messages},
num of partitions: {num_partitions},
num of workers: {num_workers},
max pending count: {max_pending_count},
topic name: {topic_name}
"""
)
""")

create_topic(topic_name, num_partitions)

Expand Down
6 changes: 2 additions & 4 deletions integration_tests/integration_tests/test_upkeep_delay.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,17 +199,15 @@ def test_upkeep_delay() -> None:
min_delay = 20
max_delay = 40

print(
f"""
print(f"""
Running test with the following configuration:
num of messages: {num_messages},
num of partitions: {num_partitions},
max pending count: {max_pending_count},
topic name: {topic_name},
dlq topic name: {dlq_topic_name},
delay range: {min_delay} - {max_delay}
"""
)
""")

create_topic(topic_name, num_partitions)

Expand Down
6 changes: 2 additions & 4 deletions integration_tests/integration_tests/test_upkeep_expiry.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,16 +172,14 @@ def test_upkeep_expiry() -> None:
dlq_topic_name = "taskworker-dlq"
curr_time = int(time.time())

print(
f"""
print(f"""
Running test with the following configuration:
num of messages: {num_messages},
num of partitions: {num_partitions},
max pending count: {max_pending_count},
topic name: {topic_name},
dlq topic name: {dlq_topic_name}
"""
)
""")

create_topic(topic_name, num_partitions)
create_topic(dlq_topic_name, 1)
Expand Down
6 changes: 2 additions & 4 deletions integration_tests/integration_tests/test_upkeep_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,17 +248,15 @@ def test_upkeep_retry() -> None:
kafka_deadletter_topic = "taskworker-dlq"
curr_time = int(time.time())

print(
f"""
print(f"""
Running test with the following configuration:
num of messages: {num_messages},
attempts per task: {retries_per_task},
num of partitions: {num_partitions},
num of workers: {num_workers},
max pending count: {max_pending_count},
topic name: {topic_name}
"""
)
""")

create_topic(topic_name, num_partitions)

Expand Down
102 changes: 55 additions & 47 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,57 +213,62 @@ async fn main() -> Result<(), Error> {
(None, None)
};

// GRPC server
let grpc_server_task = tokio::spawn({
let grpc_store = store.clone();
let grpc_config = config.clone();
let grpc_status_tx = status_update_tx.clone();
// GRPC server - only start if port is configured (port 0 disables it)
let grpc_server_task = if config.grpc_port > 0 {
Some(tokio::spawn({
let grpc_store = store.clone();
let grpc_config = config.clone();
let grpc_status_tx = status_update_tx.clone();

async move {
let addr = format!("{}:{}", grpc_config.grpc_addr, grpc_config.grpc_port)
.parse()
.expect("Failed to parse address");

let layers = tower::ServiceBuilder::new()
.layer(MetricsLayer::default())
.layer(AuthLayer::new(&grpc_config))
.into_inner();

let server = Server::builder()
.layer(layers)
.add_service(ConsumerServiceServer::new(TaskbrokerServer {
store: grpc_store,
config: grpc_config,
update_tx: grpc_status_tx,
}))
.add_service(health_service.clone())
.serve(addr);

let guard = elegant_departure::get_shutdown_guard().shutdown_on_drop();
info!("GRPC server listening on {}", addr);
select! {
biased;

async move {
let addr = format!("{}:{}", grpc_config.grpc_addr, grpc_config.grpc_port)
.parse()
.expect("Failed to parse address");

let layers = tower::ServiceBuilder::new()
.layer(MetricsLayer::default())
.layer(AuthLayer::new(&grpc_config))
.into_inner();

let server = Server::builder()
.layer(layers)
.add_service(ConsumerServiceServer::new(TaskbrokerServer {
store: grpc_store,
config: grpc_config,
update_tx: grpc_status_tx,
}))
.add_service(health_service.clone())
.serve(addr);

let guard = elegant_departure::get_shutdown_guard().shutdown_on_drop();
info!("GRPC server listening on {}", addr);
select! {
biased;

res = server => {
info!("GRPC server task failed, shutting down");

// Wait for any running requests to drain
tokio::time::sleep(Duration::from_secs(5)).await;
match res {
Ok(()) => Ok(()),
Err(e) => Err(anyhow!("GRPC server task failed: {:?}", e)),
res = server => {
info!("GRPC server task failed, shutting down");

// Wait for any running requests to drain
tokio::time::sleep(Duration::from_secs(5)).await;
match res {
Ok(()) => Ok(()),
Err(e) => Err(anyhow!("GRPC server task failed: {:?}", e)),
}
}
}
_ = guard.wait() => {
info!("Cancellation token received, shutting down GRPC server");
_ = guard.wait() => {
info!("Cancellation token received, shutting down GRPC server");

// Wait for any running requests to drain
tokio::time::sleep(Duration::from_secs(5)).await;
Ok(())
// Wait for any running requests to drain
tokio::time::sleep(Duration::from_secs(5)).await;
Ok(())
}
}
}
}
});
}))
} else {
info!("GRPC server disabled (grpc_port=0)");
None
};

// Initialize push and fetch pools
let push_pool = Arc::new(PushPool::new(config.clone(), store.clone()));
Expand All @@ -289,10 +294,13 @@ async fn main() -> Result<(), Error> {
.on_signal(SignalKind::hangup())
.on_signal(SignalKind::quit())
.on_completion(log_task_completion("consumer", consumer_task))
.on_completion(log_task_completion("grpc_server", grpc_server_task))
.on_completion(log_task_completion("upkeep_task", upkeep_task))
.on_completion(log_task_completion("maintenance_task", maintenance_task));

if let Some(task) = grpc_server_task {
departure = departure.on_completion(log_task_completion("grpc_server", task));
}

if let Some(task) = push_task {
departure = departure.on_completion(log_task_completion("push_task", task));
}
Expand Down
Loading