Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion clients/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ dependencies = [
"sentry-protos>=0.15.0",
"confluent_kafka>=2.3.0",
"cronsim>=2.6",
"grpcio>=1.67.0",
"grpcio>=1.67.1",
"grpcio-health-checking>=1.67.1",
"msgpack>=1.0.0",
"orjson>=3.10.10",
"protobuf>=5.28.3",
Expand Down
5 changes: 5 additions & 0 deletions clients/python/src/taskbroker_client/worker/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
MAX_ACTIVATION_SIZE = 1024 * 1024 * 10
"""Max payload size we will process."""

GRPC_HEALTH_CHECK_METHOD = "/grpc.health.v1.Health/Check"


def make_broker_hosts(
host_prefix: str,
Expand Down Expand Up @@ -199,6 +201,9 @@ def intercept_service(
if handler is None or not self._secrets:
return handler

if handler_call_details.method == GRPC_HEALTH_CHECK_METHOD:
return handler

if handler.request_streaming or handler.response_streaming or handler.unary_unary is None:
return handler

Expand Down
20 changes: 20 additions & 0 deletions clients/python/src/taskbroker_client/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from typing import TYPE_CHECKING, Any, Callable, List

import grpc
from grpc_health.v1 import health, health_pb2, health_pb2_grpc
from sentry_protos.taskbroker.v1 import taskbroker_pb2_grpc
from sentry_protos.taskbroker.v1.taskbroker_pb2 import (
FetchNextTask,
Expand Down Expand Up @@ -46,6 +47,8 @@

logger = logging.getLogger(__name__)

WORKER_SERVICE_NAME = "sentry_protos.taskbroker.v1.WorkerService"


class WorkerServicer(taskbroker_pb2_grpc.WorkerServiceServicer):
"""
Expand Down Expand Up @@ -241,6 +244,7 @@ def start(self) -> int:
# Running shutdown() within the signal handler can lead to deadlocks

server: grpc.Server | None = None
health_servicer: health.HealthServicer | None = None

def signal_handler(*args: Any) -> None:
if server:
Expand All @@ -265,8 +269,20 @@ def signal_handler(*args: Any) -> None:
taskbroker_pb2_grpc.add_WorkerServiceServicer_to_server(
WorkerServicer(self.worker_pool), server
)

# The health service is used by the K8s readiness check
health_servicer = health.HealthServicer()
health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)
health_servicer.set("", health_pb2.HealthCheckResponse.NOT_SERVING)
health_servicer.set(WORKER_SERVICE_NAME, health_pb2.HealthCheckResponse.NOT_SERVING)

server.add_insecure_port(f"[::]:{self._grpc_port}")
server.start()

# Indicate that the server is ready
health_servicer.set("", health_pb2.HealthCheckResponse.SERVING)
health_servicer.set(WORKER_SERVICE_NAME, health_pb2.HealthCheckResponse.SERVING)

logger.info("taskworker.grpc_server.started", extra={"port": self._grpc_port})

try:
Expand All @@ -276,6 +292,10 @@ def signal_handler(*args: Any) -> None:
pass

finally:
if health_servicer is not None:
health_servicer.set("", health_pb2.HealthCheckResponse.NOT_SERVING)
health_servicer.set(WORKER_SERVICE_NAME, health_pb2.HealthCheckResponse.NOT_SERVING)

if server is not None:
server.stop(grace=5)

Expand Down
14 changes: 14 additions & 0 deletions clients/python/tests/worker/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
from taskbroker_client.metrics import NoOpMetricsBackend
from taskbroker_client.types import ProcessingResult
from taskbroker_client.worker.client import (
GRPC_HEALTH_CHECK_METHOD,
HealthCheckSettings,
HostTemporarilyUnavailable,
RequestSignatureServerInterceptor,
TaskbrokerClient,
grpc_metadata_get,
make_broker_hosts,
Expand Down Expand Up @@ -380,6 +382,18 @@ def test_parse_rpc_secret_list() -> None:
assert parse_rpc_secret_list('["a","b"]') == ["a", "b"]


def test_request_signature_server_interceptor_skips_grpc_health_check() -> None:
handler = grpc.unary_unary_rpc_method_handler(
lambda request, context: None,
request_deserializer=lambda body: body,
response_serializer=lambda response: b"",
)
handler_call_details = Mock(method=GRPC_HEALTH_CHECK_METHOD, invocation_metadata=())
interceptor = RequestSignatureServerInterceptor(["secret"])

assert interceptor.intercept_service(lambda _: handler, handler_call_details) is handler


def test_get_task_with_namespace() -> None:
channel = MockChannel()
channel.add_response(
Expand Down
16 changes: 15 additions & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading