diff --git a/clients/python/pyproject.toml b/clients/python/pyproject.toml index ce91e320..bf51ac7a 100644 --- a/clients/python/pyproject.toml +++ b/clients/python/pyproject.toml @@ -9,7 +9,8 @@ dependencies = [ "sentry-protos>=0.15.0", "confluent_kafka>=2.3.0", "cronsim>=2.6", - "grpcio>=1.67.0", + "grpcio>=1.67.1", + "grpcio-health-checking>=1.67.1", "msgpack>=1.0.0", "orjson>=3.10.10", "protobuf>=5.28.3", diff --git a/clients/python/src/taskbroker_client/worker/client.py b/clients/python/src/taskbroker_client/worker/client.py index 6b9e1ae0..14d2112e 100644 --- a/clients/python/src/taskbroker_client/worker/client.py +++ b/clients/python/src/taskbroker_client/worker/client.py @@ -41,6 +41,8 @@ MAX_ACTIVATION_SIZE = 1024 * 1024 * 10 """Max payload size we will process.""" +GRPC_HEALTH_CHECK_METHOD = "/grpc.health.v1.Health/Check" + def make_broker_hosts( host_prefix: str, @@ -199,6 +201,9 @@ def intercept_service( if handler is None or not self._secrets: return handler + if handler_call_details.method == GRPC_HEALTH_CHECK_METHOD: + return handler + if handler.request_streaming or handler.response_streaming or handler.unary_unary is None: return handler diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index 12360032..86327dec 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -13,6 +13,7 @@ from typing import TYPE_CHECKING, Any, Callable, List import grpc +from grpc_health.v1 import health, health_pb2, health_pb2_grpc from sentry_protos.taskbroker.v1 import taskbroker_pb2_grpc from sentry_protos.taskbroker.v1.taskbroker_pb2 import ( FetchNextTask, @@ -46,6 +47,8 @@ logger = logging.getLogger(__name__) +WORKER_SERVICE_NAME = "sentry_protos.taskbroker.v1.WorkerService" + class WorkerServicer(taskbroker_pb2_grpc.WorkerServiceServicer): """ @@ -241,6 +244,7 @@ def start(self) -> int: # Running shutdown() within the signal handler can lead to deadlocks server: grpc.Server | None = None + health_servicer: health.HealthServicer | None = None def signal_handler(*args: Any) -> None: if server: @@ -265,8 +269,20 @@ def signal_handler(*args: Any) -> None: taskbroker_pb2_grpc.add_WorkerServiceServicer_to_server( WorkerServicer(self.worker_pool), server ) + + # The health service is used by the K8s readiness check + health_servicer = health.HealthServicer() + health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server) + health_servicer.set("", health_pb2.HealthCheckResponse.NOT_SERVING) + health_servicer.set(WORKER_SERVICE_NAME, health_pb2.HealthCheckResponse.NOT_SERVING) + server.add_insecure_port(f"[::]:{self._grpc_port}") server.start() + + # Indicate that the server is ready + health_servicer.set("", health_pb2.HealthCheckResponse.SERVING) + health_servicer.set(WORKER_SERVICE_NAME, health_pb2.HealthCheckResponse.SERVING) + logger.info("taskworker.grpc_server.started", extra={"port": self._grpc_port}) try: @@ -276,6 +292,10 @@ def signal_handler(*args: Any) -> None: pass finally: + if health_servicer is not None: + health_servicer.set("", health_pb2.HealthCheckResponse.NOT_SERVING) + health_servicer.set(WORKER_SERVICE_NAME, health_pb2.HealthCheckResponse.NOT_SERVING) + if server is not None: server.stop(grace=5) diff --git a/clients/python/tests/worker/test_client.py b/clients/python/tests/worker/test_client.py index f2509cd2..33d9083a 100644 --- a/clients/python/tests/worker/test_client.py +++ b/clients/python/tests/worker/test_client.py @@ -29,8 +29,10 @@ from taskbroker_client.metrics import NoOpMetricsBackend from taskbroker_client.types import ProcessingResult from taskbroker_client.worker.client import ( + GRPC_HEALTH_CHECK_METHOD, HealthCheckSettings, HostTemporarilyUnavailable, + RequestSignatureServerInterceptor, TaskbrokerClient, grpc_metadata_get, make_broker_hosts, @@ -380,6 +382,18 @@ def test_parse_rpc_secret_list() -> None: assert parse_rpc_secret_list('["a","b"]') == ["a", "b"] +def test_request_signature_server_interceptor_skips_grpc_health_check() -> None: + handler = grpc.unary_unary_rpc_method_handler( + lambda request, context: None, + request_deserializer=lambda body: body, + response_serializer=lambda response: b"", + ) + handler_call_details = Mock(method=GRPC_HEALTH_CHECK_METHOD, invocation_metadata=()) + interceptor = RequestSignatureServerInterceptor(["secret"]) + + assert interceptor.intercept_service(lambda _: handler, handler_call_details) is handler + + def test_get_task_with_namespace() -> None: channel = MockChannel() channel.add_response( diff --git a/uv.lock b/uv.lock index bc7f4966..702c5651 100644 --- a/uv.lock +++ b/uv.lock @@ -219,6 +219,18 @@ wheels = [ { url = "https://pypi.devinfra.sentry.io/wheels/grpcio-1.75.1-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:07a554fa31c668cf0e7a188678ceeca3cb8fead29bbe455352e712ec33ca701c" }, ] +[[package]] +name = "grpcio-health-checking" +version = "1.67.1" +source = { registry = "https://pypi.devinfra.sentry.io/simple" } +dependencies = [ + { name = "grpcio", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "protobuf", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, +] +wheels = [ + { url = "https://pypi.devinfra.sentry.io/wheels/grpcio_health_checking-1.67.1-py3-none-any.whl", hash = "sha256:93753da5062152660aef2286c9b261e07dd87124a65e4dc9fbd47d1ce966b39d" }, +] + [[package]] name = "h11" version = "0.16.0" @@ -709,6 +721,7 @@ dependencies = [ { name = "confluent-kafka", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "cronsim", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "grpcio", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "grpcio-health-checking", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "msgpack", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "orjson", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "protobuf", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -746,7 +759,8 @@ requires-dist = [ { name = "click", marker = "extra == 'examples'", specifier = ">=8.3" }, { name = "confluent-kafka", specifier = ">=2.3.0" }, { name = "cronsim", specifier = ">=2.6" }, - { name = "grpcio", specifier = ">=1.67.0" }, + { name = "grpcio", specifier = ">=1.67.1" }, + { name = "grpcio-health-checking", specifier = ">=1.67.1" }, { name = "msgpack", specifier = ">=1.0.0" }, { name = "orjson", specifier = ">=3.10.10" }, { name = "protobuf", specifier = ">=5.28.3" },