From 79b43a32097a7c4fa72f103fa19177491ad537b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Mon, 22 Jun 2026 21:54:02 -0700 Subject: [PATCH 1/6] fix: revert JobsProgress to in-memory set, add PingJobMirror JobsProgress persisted in-progress jobs to .runpod_jobs.pkl under os.getcwd(); on endpoints with a network volume every worker shared one file, so occupancy accounting cross-contaminated and jobs funneled onto a single worker (#432). Restore the 1.7.10 in-memory set and feed the separate ping process via a per-worker shared-memory mirror instead. Refs SLS-314, fixes runpod/runpod-python#432 --- requirements.txt | 1 - runpod/serverless/modules/worker_state.py | 141 ++++--- .../test_integration_worker_state.py | 349 ++++++------------ .../test_modules/test_state.py | 83 ++--- 4 files changed, 204 insertions(+), 370 deletions(-) diff --git a/requirements.txt b/requirements.txt index 60674a47..4f30655e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,7 +7,6 @@ click >= 8.1.7 colorama >= 0.4.6, < 0.4.7 cryptography >= 48.0.1 fastapi[all] >= 0.94.0 -filelock >= 3.19.1 paramiko >= 3.3.1 prettytable >= 3.16.0 psutil >= 5.9.0 diff --git a/runpod/serverless/modules/worker_state.py b/runpod/serverless/modules/worker_state.py index b546ce02..41d730e6 100644 --- a/runpod/serverless/modules/worker_state.py +++ b/runpod/serverless/modules/worker_state.py @@ -2,15 +2,12 @@ Handles getting stuff from environment variables and updating the global state like job id. """ +import multiprocessing import os import time import uuid -import pickle -import tempfile from typing import Any, Dict, Optional, Set -from filelock import FileLock - from .rp_logger import RunPodLogger @@ -20,6 +17,8 @@ WORKER_ID = os.environ.get("RUNPOD_POD_ID", str(uuid.uuid4())) +PING_MIRROR_CAPACITY = 65536 # bytes; ample headroom for a job-id snapshot + # ----------------------------------- Flags ---------------------------------- # IS_LOCAL_TEST = os.environ.get("RUNPOD_WEBHOOK_GET_JOB", None) is None @@ -63,87 +62,32 @@ def __str__(self) -> str: # ---------------------------------------------------------------------------- # -# Tracker # +# Tracker # # ---------------------------------------------------------------------------- # class JobsProgress(Set[Job]): - """Track the state of current jobs in progress with persistent state.""" + """Track the state of current jobs in progress (in-memory, per process).""" _instance = None - _STATE_DIR = os.getcwd() - _STATE_FILE = os.path.join(_STATE_DIR, ".runpod_jobs.pkl") def __new__(cls): if JobsProgress._instance is None: - os.makedirs(cls._STATE_DIR, exist_ok=True) JobsProgress._instance = set.__new__(cls) - # Initialize as empty set before loading state set.__init__(JobsProgress._instance) - JobsProgress._instance._load_state() return JobsProgress._instance def __init__(self): - # This should never clear data in a singleton - # Don't call parent __init__ as it would clear the set + # Singleton: never re-initialize, it would clear the set. pass - + def __repr__(self) -> str: return f"<{self.__class__.__name__}>: {self.get_job_list()}" - def _load_state(self): - """Load jobs state from pickle file with file locking.""" - try: - if ( - os.path.exists(self._STATE_FILE) - and os.path.getsize(self._STATE_FILE) > 0 - ): - with FileLock(self._STATE_FILE + '.lock'): - with open(self._STATE_FILE, "rb") as f: - try: - loaded_jobs = pickle.load(f) - # Clear current state and add loaded jobs - super().clear() - for job in loaded_jobs: - set.add( - self, job - ) # Use set.add to avoid triggering _save_state - - except (EOFError, pickle.UnpicklingError): - # Handle empty or corrupted file - log.debug( - "JobsProgress: Failed to load state file, starting with empty state" - ) - pass - - except FileNotFoundError: - log.debug("JobsProgress: No state file found, starting with empty state") - pass - - def _save_state(self): - """Save jobs state to pickle file with atomic write and file locking.""" - try: - # Use temporary file for atomic write - with FileLock(self._STATE_FILE + '.lock'): - with tempfile.NamedTemporaryFile( - dir=self._STATE_DIR, delete=False, mode="wb" - ) as temp_f: - pickle.dump(set(self), temp_f) - - # Atomically replace the state file - os.replace(temp_f.name, self._STATE_FILE) - except Exception as e: - log.error(f"Failed to save job state: {e}") - - def clear(self) -> None: - super().clear() - self._save_state() - def add(self, element: Any): """ Adds a Job object to the set. - If the added element is a string, then `Job(id=element)` is added - - If the added element is a dict, that `Job(**element)` is added + If the added element is a string, then `Job(id=element)` is added. + If the added element is a dict, then `Job(**element)` is added. """ if isinstance(element, str): element = Job(id=element) @@ -154,17 +98,14 @@ def add(self, element: Any): if not isinstance(element, Job): raise TypeError("Only Job objects can be added to JobsProgress.") - result = super().add(element) - self._save_state() - return result + return super().add(element) def remove(self, element: Any): """ Removes a Job object from the set. - If the element is a string, then `Job(id=element)` is removed - - If the element is a dict, then `Job(**element)` is removed + If the element is a string, then `Job(id=element)` is removed. + If the element is a dict, then `Job(**element)` is removed. """ if isinstance(element, str): element = Job(id=element) @@ -175,9 +116,7 @@ def remove(self, element: Any): if not isinstance(element, Job): raise TypeError("Only Job objects can be removed from JobsProgress.") - result = super().discard(element) - self._save_state() - return result + return super().discard(element) def get(self, element: Any) -> Optional[Job]: if isinstance(element, str): @@ -193,10 +132,8 @@ def get(self, element: Any) -> Optional[Job]: def get_job_list(self) -> Optional[str]: """ - Returns the list of job IDs as comma-separated string. + Returns the list of job IDs as a comma-separated string, or None if empty. """ - self._load_state() - if not len(self): return None @@ -207,3 +144,53 @@ def get_job_count(self) -> int: Returns the number of jobs. """ return len(self) + + +# ---------------------------------------------------------------------------- # +# Ping Job Mirror # +# ---------------------------------------------------------------------------- # +class PingJobMirror: + """ + One-way snapshot of in-progress job ids from the worker (main) process to + the separate ping process. + + Backed by a fixed-size shared-memory buffer created in the main process and + passed to the ping process via ``Process(args=...)``. It lives only in this + worker's own process tree, so it cannot be shared across workers and never + touches the filesystem. All operations are best-effort and never raise into + the caller (a failure here must not break job processing or kill the ping). + """ + + def __init__(self, capacity: int = PING_MIRROR_CAPACITY, ctx=None): + ctx = ctx or multiprocessing + self._capacity = capacity + self._buffer = ctx.Array("c", capacity) # SynchronizedString with .get_lock() + + def set(self, job_ids: Optional[str]) -> None: + """Write the current job-id snapshot. Best-effort; never raises.""" + try: + data = (job_ids or "").encode("utf-8") + limit = self._capacity - 1 # reserve a byte for the NUL terminator + if len(data) > limit: + data = data[:limit] + cut = data.rfind(b",") + if cut != -1: + data = data[:cut] + log.warn( + f"PingJobMirror: job-id snapshot exceeded {limit} bytes; truncated" + ) + with self._buffer.get_lock(): + self._buffer.value = data + except Exception as err: # never break job processing + log.error(f"PingJobMirror.set failed: {err}") + + def get(self) -> Optional[str]: + """Read the current job-id snapshot. Best-effort; never raises.""" + try: + with self._buffer.get_lock(): + data = self._buffer.value + text = data.decode("utf-8") + return text or None + except Exception as err: # never kill the ping loop + log.debug(f"PingJobMirror.get failed: {err}") + return None diff --git a/tests/test_serverless/test_integration_worker_state.py b/tests/test_serverless/test_integration_worker_state.py index 11a7cd53..62c5f9f2 100644 --- a/tests/test_serverless/test_integration_worker_state.py +++ b/tests/test_serverless/test_integration_worker_state.py @@ -1,249 +1,118 @@ """ -Integration test for worker state persistence between job_scaler and heartbeat. -This test mimics the runpod.serverless.worker.run_worker path. +Unit tests for the in-memory JobsProgress tracker and the cross-process +PingJobMirror. Regression coverage for issue #432 (workers must not share +job state via a file on a network volume). """ -import asyncio -import os -import tempfile -from unittest.mock import AsyncMock, MagicMock, patch +import multiprocessing + import pytest -from runpod.serverless.modules.rp_ping import Heartbeat -from runpod.serverless.modules.rp_scale import JobScaler -from runpod.serverless.modules.worker_state import JobsProgress +from runpod.serverless.modules.worker_state import ( + JobsProgress, + PingJobMirror, + PING_MIRROR_CAPACITY, +) + + +def _reset_singleton(): + JobsProgress._instance = None -class TestWorkerStateIntegration: - """Test the integration between JobScaler and Heartbeat for state persistence.""" - +class TestJobsProgressInMemory: def setup_method(self): - """Setup test environment.""" - # Clear any existing singleton instance - JobsProgress._instance = None - - # Create a temporary directory for state files - self.temp_dir = tempfile.mkdtemp() - - # Mock environment variables for testing - self.env_patcher = patch.dict(os.environ, { - 'RUNPOD_AI_API_KEY': 'test_key', - 'RUNPOD_POD_ID': 'test_pod_id', - 'RUNPOD_WEBHOOK_PING': 'http://test.com/ping', - 'RUNPOD_PING_INTERVAL': '5000' - }) - self.env_patcher.start() - - # Mock the state directory to use our temp directory - self.state_dir_patcher = patch.object(JobsProgress, '_STATE_DIR', self.temp_dir) - self.state_file_patcher = patch.object(JobsProgress, '_STATE_FILE', - os.path.join(self.temp_dir, '.runpod_jobs.pkl')) - self.state_dir_patcher.start() - self.state_file_patcher.start() - + _reset_singleton() + def teardown_method(self): - """Cleanup test environment.""" - self.env_patcher.stop() - self.state_dir_patcher.stop() - self.state_file_patcher.stop() - - # Clean up temp directory - import shutil - shutil.rmtree(self.temp_dir, ignore_errors=True) - - # Reset singleton - JobsProgress._instance = None - - def test_jobs_progress_singleton_persistence(self): - """Test that JobsProgress maintains singleton behavior across processes.""" - jobs1 = JobsProgress() - jobs2 = JobsProgress() - - assert jobs1 is jobs2 - - # Add a job and verify it's visible in both instances - jobs1.add("test_job_1") - assert "test_job_1" in jobs2.get_job_list() - - def test_file_based_state_persistence(self): - """Test that job state persists to file and can be loaded.""" - # Create initial instance and add jobs - jobs1 = JobsProgress() - jobs1.add("job_1") - jobs1.add("job_2") - - # Verify state is saved to file - assert os.path.exists(jobs1._STATE_FILE) - - # Reset singleton to simulate new process - JobsProgress._instance = None - - # Create new instance and verify state is loaded - jobs2 = JobsProgress() - job_list = jobs2.get_job_list() - - assert "job_1" in job_list - assert "job_2" in job_list - - def test_jobs_progress_add_and_remove_jobs(self): - """Test JobsProgress job tracking functionality.""" - # Reset JobsProgress singleton - JobsProgress._instance = None - - # Get JobsProgress instance - jobs_progress = JobsProgress() - - # Test adding jobs - test_jobs = [ - {"id": "job_1", "input": {"test": "data1"}}, - {"id": "job_2", "input": {"test": "data2"}} - ] - - # Add jobs - for job in test_jobs: - jobs_progress.add(job) - - # Verify initial state - assert len(jobs_progress) == 2 - job_list = jobs_progress.get_job_list() - assert job_list is not None - assert "job_1" in job_list - assert "job_2" in job_list - - # Test removing jobs - jobs_progress.remove(test_jobs[0]) - - # Verify removal - assert len(jobs_progress) == 1 - - # Get remaining job - remaining_list = jobs_progress.get_job_list() - assert remaining_list is not None - assert "job_1" not in remaining_list - assert "job_2" in remaining_list - - # Test clearing jobs - jobs_progress.clear() - - # Verify clearing - assert len(jobs_progress) == 0 - assert jobs_progress.get_job_list() is None - - def test_heartbeat_reads_job_progress(self): - """Test that Heartbeat can read jobs from JobsProgress.""" - # Add jobs to progress - jobs_progress = JobsProgress() - jobs_progress.add("job_1") - jobs_progress.add("job_2") - - # Create heartbeat instance - heartbeat = Heartbeat() - - # Mock the session.get method to capture the ping parameters - with patch.object(heartbeat, '_session') as mock_session: - mock_response = MagicMock() - mock_response.url = "http://test.com/ping" - mock_response.status_code = 200 - mock_session.get.return_value = mock_response - - # Send a ping - heartbeat._send_ping() - - # Verify the ping was sent with job_ids - mock_session.get.assert_called_once() - call_args = mock_session.get.call_args - - # Check that job_id parameter contains our jobs - params = call_args[1]['params'] - assert 'job_id' in params - job_ids = params['job_id'] - assert "job_1" in job_ids - assert "job_2" in job_ids - - def test_multiprocess_heartbeat_state_access(self): - """Test that heartbeat process can access job state from main process.""" - # Add jobs in main process - main_jobs = JobsProgress() - main_jobs.add("main_job_1") - main_jobs.add("main_job_2") - - # Simulate what happens in the heartbeat process - # The process_loop creates a new Heartbeat instance - heartbeat = Heartbeat() - - # Mock the session to capture ping data - with patch.object(heartbeat, '_session') as mock_session: - mock_response = MagicMock() - mock_response.url = "http://test.com/ping" - mock_response.status_code = 200 - mock_session.get.return_value = mock_response - - # Send ping - this should read from the persisted state - heartbeat._send_ping() - - # Verify the ping includes jobs from main process - call_args = mock_session.get.call_args - params = call_args[1]['params'] - job_ids = params['job_id'] - - assert "main_job_1" in job_ids - assert "main_job_2" in job_ids - - @pytest.mark.asyncio - async def test_end_to_end_job_lifecycle(self): - """Test complete job lifecycle: add -> process -> remove -> ping.""" - # Mock job data - test_jobs = [{"id": "lifecycle_job", "input": {"test": "data"}}] - - async def mock_jobs_fetcher(session, count): - return test_jobs[:count] - - async def mock_job_handler(session, config, job): - await asyncio.sleep(0.1) # Simulate processing - - config = { - "handler": lambda x: x, - "jobs_fetcher": mock_jobs_fetcher, - "jobs_handler": mock_job_handler, - "jobs_fetcher_timeout": 1 - } - - # Create instances - job_scaler = JobScaler(config) - heartbeat = Heartbeat() - jobs_progress = JobsProgress() - - # Track ping calls - ping_calls = [] - - def capture_ping(*args, **kwargs): - job_ids = kwargs.get('params', {}).get('job_id', '') - ping_calls.append(job_ids) - mock_response = MagicMock() - mock_response.url = "http://test.com/ping" - mock_response.status_code = 200 - return mock_response - - with patch.object(heartbeat, '_session') as mock_session: - mock_session.get.side_effect = capture_ping - - # Start job processing - session = AsyncMock() - - # Add job - await job_scaler.jobs_queue.put(test_jobs[0]) - jobs_progress.add(test_jobs[0]["id"]) - - # Send ping with job active - heartbeat._send_ping() - - # Process job (this should remove it from progress) - await job_scaler.handle_job(session, test_jobs[0]) - - # Send ping after job completion - heartbeat._send_ping() - - # Verify ping behavior - assert len(ping_calls) == 2 - assert "lifecycle_job" in ping_calls[0] # Job was active - assert ping_calls[1] is None # Job completed, no active jobs \ No newline at end of file + _reset_singleton() + + def test_singleton_identity(self): + assert JobsProgress() is JobsProgress() + + def test_add_remove_clear_and_list(self): + jp = JobsProgress() + jp.add({"id": "job_1", "input": {"a": 1}}) + jp.add("job_2") + + assert jp.get_job_count() == 2 + job_list = jp.get_job_list() + assert "job_1" in job_list and "job_2" in job_list + + jp.remove("job_1") + assert jp.get_job_count() == 1 + assert "job_1" not in jp.get_job_list() + + jp.clear() + assert jp.get_job_count() == 0 + assert jp.get_job_list() is None + + def test_writes_nothing_to_disk(self, tmp_path, monkeypatch): + # Regression: JobsProgress must not persist to cwd (network-volume bug). + monkeypatch.chdir(tmp_path) + jp = JobsProgress() + jp.add("job_1") + jp.remove("job_1") + jp.add({"id": "job_2"}) + assert list(tmp_path.iterdir()) == [] + + def test_two_workers_do_not_share_jobs(self, tmp_path, monkeypatch): + # Regression: a second worker (new singleton, same cwd) sees nothing. + monkeypatch.chdir(tmp_path) + worker_a = JobsProgress() + worker_a.add("a_job") + + _reset_singleton() + worker_b = JobsProgress() + assert worker_b.get_job_count() == 0 + assert worker_b.get_job_list() is None + + +class TestPingJobMirror: + def test_set_get_roundtrip(self): + mirror = PingJobMirror() + mirror.set("job_a,job_b") + assert mirror.get() == "job_a,job_b" + + def test_empty_is_none(self): + mirror = PingJobMirror() + assert mirror.get() is None + mirror.set(None) + assert mirror.get() is None + mirror.set("") + assert mirror.get() is None + + def test_overflow_truncates_without_raising(self): + mirror = PingJobMirror(capacity=32) + # 10 ten-char ids joined by commas far exceeds 32 bytes. + ids = ",".join(f"id{n:08d}" for n in range(10)) + mirror.set(ids) # must not raise + stored = mirror.get() + assert stored is not None + assert len(stored.encode("utf-8")) <= 32 + # Truncation happens at a comma boundary: no partial id. + assert not stored.endswith(",") + + +def _child_read(mirror, queue): + queue.put(mirror.get()) + + +@pytest.mark.parametrize("method", ["fork", "spawn"]) +def test_mirror_roundtrip_across_process(method): + if method not in multiprocessing.get_all_start_methods(): + pytest.skip(f"start method {method} not available on this platform") + ctx = multiprocessing.get_context(method) + mirror = PingJobMirror(ctx=ctx) + mirror.set("job_a,job_b") + + queue = ctx.Queue() + proc = ctx.Process(target=_child_read, args=(mirror, queue)) + proc.start() + result = queue.get(timeout=15) + proc.join(timeout=15) + + assert result == "job_a,job_b" + + +def test_capacity_default(): + assert PING_MIRROR_CAPACITY == 65536 diff --git a/tests/test_serverless/test_modules/test_state.py b/tests/test_serverless/test_modules/test_state.py index 94772bde..4f13a301 100644 --- a/tests/test_serverless/test_modules/test_state.py +++ b/tests/test_serverless/test_modules/test_state.py @@ -160,66 +160,45 @@ async def test_get_job_count(self): # test job count contention when adding and removing jobs in parallel pass - async def test_state_persistence(self): - """Test state persistence across multiple JobsProgress instances""" - # First instance: add some jobs + async def test_state_not_shared_across_instances(self): + """Job state is in-memory and per-process: a fresh instance (a new + worker) must not inherit another instance's jobs. Regression guard for + #432, where shared-file persistence funneled all jobs to one worker.""" jobs1 = JobsProgress() jobs1.clear() # Ensure clean state - - job1 = {"id": "test_persistent_1"} - job2 = {"id": "test_persistent_2"} - - jobs1.add(job1) - jobs1.add(job2) - - # Reset singleton to simulate process restart + + jobs1.add({"id": "test_persistent_1"}) + jobs1.add({"id": "test_persistent_2"}) + + # Reset singleton to simulate a separate worker process. JobsProgress._instance = None jobs2 = JobsProgress() - - # Debug: check jobs2 right after creation - print(f"DEBUG: jobs2 length right after creation: {len(jobs2)}") - print(f"DEBUG: jobs2 contents right after creation: {list(jobs2)}") - - # Check that jobs were persisted - assert jobs2.get_job_count() == 2, "Jobs should be persisted across instances" - - # Verify specific jobs are present - assert jobs2.get("test_persistent_1") is not None, "First job should be retrievable" - assert jobs2.get("test_persistent_2") is not None, "Second job should be retrievable" - async def test_state_persistence_empty(self): - """Test state persistence when no jobs are present""" - # Clear any existing state + # The new instance starts empty; nothing is persisted across instances. + assert jobs2.get_job_count() == 0, "Jobs must not persist across instances" + assert jobs2.get_job_list() is None, "Job list must be None for a fresh instance" + + async def test_fresh_instance_is_empty(self): + """A fresh JobsProgress instance has no jobs.""" jobs1 = JobsProgress() jobs1.clear() - - # Reset singleton to simulate process restart + + # Reset singleton to simulate a separate worker process. JobsProgress._instance = None jobs2 = JobsProgress() - - # Check that no jobs are present - assert jobs2.get_job_count() == 0, "No jobs should be present after clear" + + assert jobs2.get_job_count() == 0, "No jobs should be present" assert jobs2.get_job_list() is None, "Job list should be None when empty" - async def test_file_persistence_after_clear(self): - """Verify that clearing the jobs results in an empty persistent state""" - # Add some jobs - jobs1 = JobsProgress() - jobs1.clear() # Ensure clean state - - job1 = {"id": "to_be_cleared_1"} - job2 = {"id": "to_be_cleared_2"} - - jobs1.add(job1) - jobs1.add(job2) - - # Clear the jobs - jobs1.clear() - - # Reset singleton to simulate process restart - JobsProgress._instance = None - jobs2 = JobsProgress() - - # Verify that no jobs remain - assert jobs2.get_job_count() == 0, "Jobs should be cleared in persistent state" - assert jobs2.get_job_list() is None, "Job list should be None after clear" \ No newline at end of file + async def test_clear_empties_the_set(self): + """Clearing removes all tracked jobs.""" + jobs = JobsProgress() + jobs.clear() + + jobs.add({"id": "to_be_cleared_1"}) + jobs.add({"id": "to_be_cleared_2"}) + + jobs.clear() + + assert jobs.get_job_count() == 0, "Jobs should be cleared" + assert jobs.get_job_list() is None, "Job list should be None after clear" \ No newline at end of file From b4800f95fcab0b2d72148bfeedab8b98ca8e2ecb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Mon, 22 Jun 2026 21:57:10 -0700 Subject: [PATCH 2/6] fix: ping reads in-progress job ids from injected mirror The ping process no longer touches JobsProgress; it reads the job-id snapshot from the per-worker PingJobMirror passed in at process start. Refs SLS-314 --- runpod/serverless/modules/rp_ping.py | 15 +++-- .../test_serverless/test_modules/test_ping.py | 61 +++++++++---------- 2 files changed, 39 insertions(+), 37 deletions(-) diff --git a/runpod/serverless/modules/rp_ping.py b/runpod/serverless/modules/rp_ping.py index 838030b2..1d1c9a44 100644 --- a/runpod/serverless/modules/rp_ping.py +++ b/runpod/serverless/modules/rp_ping.py @@ -11,7 +11,7 @@ from runpod.http_client import SyncClientSession from runpod.serverless.modules.rp_logger import RunPodLogger -from runpod.serverless.modules.worker_state import WORKER_ID, JobsProgress +from runpod.serverless.modules.worker_state import WORKER_ID from runpod.version import __version__ as runpod_version log = RunPodLogger() @@ -30,6 +30,9 @@ def __init__(self, pool_connections=10, retries=3) -> None: self.PING_URL = self.PING_URL.replace("$RUNPOD_POD_ID", WORKER_ID) self.PING_INTERVAL = int(os.environ.get("RUNPOD_PING_INTERVAL", 10000)) // 1000 + # In-progress job-id snapshot, injected by the main process at start. + self._mirror = None + # Create a new HTTP session self._session = SyncClientSession() self._session.headers.update( @@ -52,15 +55,16 @@ def __init__(self, pool_connections=10, retries=3) -> None: self._session.mount("https://", adapter) @staticmethod - def process_loop(test=False): + def process_loop(mirror=None, test=False): """ Static helper to run the ping loop in a separate process. Creates a new Heartbeat instance to avoid pickling issues. """ hb = Heartbeat() + hb._mirror = mirror hb.ping_loop(test) - def start_ping(self, test=False): + def start_ping(self, mirror=None, test=False): """ Sends heartbeat pings to the Runpod server in a separate process. """ @@ -77,7 +81,7 @@ def start_ping(self, test=False): return if not Heartbeat._process_started: - process = Process(target=Heartbeat.process_loop, args=(test,)) + process = Process(target=Heartbeat.process_loop, args=(mirror, test)) process.daemon = True process.start() Heartbeat._process_started = True @@ -96,8 +100,7 @@ def _send_ping(self): """ Sends a heartbeat to the Runpod server. """ - jobs = JobsProgress() # Get the singleton instance - job_ids = jobs.get_job_list() + job_ids = self._mirror.get() if self._mirror is not None else None ping_params = {"job_id": job_ids, "runpod_version": runpod_version} try: diff --git a/tests/test_serverless/test_modules/test_ping.py b/tests/test_serverless/test_modules/test_ping.py index c40870f8..c014d401 100644 --- a/tests/test_serverless/test_modules/test_ping.py +++ b/tests/test_serverless/test_modules/test_ping.py @@ -50,12 +50,11 @@ def mock_session(self): yield session_instance @pytest.fixture - def mock_jobs(self): - """Mock the JobsProgress instance""" - with patch("runpod.serverless.modules.rp_ping.JobsProgress") as mock: - instance = mock.return_value - instance.get_job_list.return_value = "job1,job2,job3" - yield mock + def mock_mirror(self): + """A PingJobMirror stand-in returning a fixed in-progress job-id list.""" + mirror = MagicMock() + mirror.get.return_value = "job1,job2,job3" + yield mirror @pytest.fixture def mock_logger(self): @@ -152,10 +151,10 @@ def test_start_ping_success(self, mock_session_class, mock_process_class): heartbeat = Heartbeat() heartbeat.start_ping(test=True) - # Verify process was created correctly + # Verify process was created correctly (mirror defaults to None here) mock_process_class.assert_called_once_with( target=Heartbeat.process_loop, - args=(True,) + args=(None, True) ) # Verify daemon and start @@ -214,10 +213,11 @@ def side_effect(interval): # Should have sent 3 pings assert mock_send.call_count == 3 - def test_send_ping_success(self, mock_env, mock_worker_id, mock_session, mock_jobs, mock_logger): + def test_send_ping_success(self, mock_env, mock_worker_id, mock_session, mock_mirror, mock_logger): """Test successful ping send""" heartbeat = Heartbeat() - + heartbeat._mirror = mock_mirror + # Mock successful response mock_response = MagicMock() mock_response.url = "https://test.com/ping/test_worker_123" @@ -239,30 +239,29 @@ def test_send_ping_success(self, mock_env, mock_worker_id, mock_session, mock_jo mock_logger.debug.assert_called_once() def test_send_ping_no_jobs(self, mock_env, mock_worker_id, mock_session, mock_logger): - """Test ping send with no jobs""" - heartbeat = Heartbeat() - - # Mock no jobs - with patch("runpod.serverless.modules.rp_ping.JobsProgress.get_job_list", return_value=None): - mock_response = MagicMock() - mock_response.url = "https://test.com/ping/test_worker_123" - mock_response.status_code = 200 - mock_session.get.return_value = mock_response - - with patch("runpod.serverless.modules.rp_ping.runpod_version", "1.0.0"): - heartbeat._send_ping() - - # Verify request params - mock_session.get.assert_called_once_with( - "https://test.com/ping/test_worker_123", - params={"job_id": None, "runpod_version": "1.0.0"}, - timeout=10 - ) + """Test ping send with no mirror / no jobs in progress.""" + heartbeat = Heartbeat() # no mirror injected => job_id is None + + mock_response = MagicMock() + mock_response.url = "https://test.com/ping/test_worker_123" + mock_response.status_code = 200 + mock_session.get.return_value = mock_response + + with patch("runpod.serverless.modules.rp_ping.runpod_version", "1.0.0"): + heartbeat._send_ping() - def test_send_ping_request_exception(self, mock_env, mock_worker_id, mock_session, mock_jobs, mock_logger): + # Verify request params + mock_session.get.assert_called_once_with( + "https://test.com/ping/test_worker_123", + params={"job_id": None, "runpod_version": "1.0.0"}, + timeout=10 + ) + + def test_send_ping_request_exception(self, mock_env, mock_worker_id, mock_session, mock_mirror, mock_logger): """Test ping send with request exception""" heartbeat = Heartbeat() - + heartbeat._mirror = mock_mirror + # Mock request exception mock_session.get.side_effect = requests.RequestException("Connection error") From 786bf4108cb813f8b00290db1d8a16ab6c6bb1a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Mon, 22 Jun 2026 21:58:56 -0700 Subject: [PATCH 3/6] fix: JobScaler pushes job-id snapshot to ping mirror JobScaler updates the per-worker PingJobMirror after each job is acquired or finished, so the separate ping process always sees the current in-progress job ids without shared-file state. Refs SLS-314 --- runpod/serverless/modules/rp_scale.py | 10 +++- tests/test_serverless/test_scale_mirror.py | 53 ++++++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 tests/test_serverless/test_scale_mirror.py diff --git a/runpod/serverless/modules/rp_scale.py b/runpod/serverless/modules/rp_scale.py index 8c27543e..5412206e 100644 --- a/runpod/serverless/modules/rp_scale.py +++ b/runpod/serverless/modules/rp_scale.py @@ -42,11 +42,12 @@ class JobScaler: Job Scaler. This class is responsible for scaling the number of concurrent requests. """ - def __init__(self, config: Dict[str, Any]): + def __init__(self, config: Dict[str, Any], job_mirror=None): self._shutdown_event = asyncio.Event() self.current_concurrency = 1 self.config = config self.job_progress = JobsProgress() # Cache the singleton instance + self.job_mirror = job_mirror # one-way snapshot to the ping process self.jobs_queue = asyncio.Queue(maxsize=self.current_concurrency) @@ -156,6 +157,11 @@ def current_occupancy(self) -> int: ) return current_progress_count + current_queue_count + def _sync_mirror(self) -> None: + """Push the current in-progress job-id snapshot to the ping process.""" + if self.job_mirror is not None: + self.job_mirror.set(self.job_progress.get_job_list()) + async def get_jobs(self, session: ClientSession): """ Retrieve multiple jobs from the server in batches using blocking requests. @@ -189,6 +195,7 @@ async def get_jobs(self, session: ClientSession): for job in acquired_jobs: await self.jobs_queue.put(job) self.job_progress.add(job) + self._sync_mirror() log.debug("Job Queued", job["id"]) log.info(f"Jobs in queue: {self.jobs_queue.qsize()}") @@ -274,5 +281,6 @@ async def handle_job(self, session: ClientSession, job: dict): # Job is no longer in progress self.job_progress.remove(job) + self._sync_mirror() log.debug("Finished Job", job["id"]) diff --git a/tests/test_serverless/test_scale_mirror.py b/tests/test_serverless/test_scale_mirror.py new file mode 100644 index 00000000..2a38bad2 --- /dev/null +++ b/tests/test_serverless/test_scale_mirror.py @@ -0,0 +1,53 @@ +"""JobScaler keeps the PingJobMirror in sync and reports correct occupancy.""" + +import pytest + +from runpod.serverless.modules.rp_scale import JobScaler +from runpod.serverless.modules.worker_state import JobsProgress, PingJobMirror + + +@pytest.fixture(autouse=True) +def _reset_singleton(): + JobsProgress._instance = None + yield + JobsProgress._instance = None + + +def _make_scaler(mirror): + return JobScaler({"handler": lambda x: x}, job_mirror=mirror) + + +def test_mirror_syncs_on_add(): + mirror = PingJobMirror() + scaler = _make_scaler(mirror) + + scaler.job_progress.add("job_1") + scaler._sync_mirror() + + assert "job_1" in mirror.get() + + +def test_mirror_syncs_on_remove(): + mirror = PingJobMirror() + scaler = _make_scaler(mirror) + + scaler.job_progress.add("job_1") + scaler._sync_mirror() + scaler.job_progress.remove("job_1") + scaler._sync_mirror() + + assert mirror.get() is None + + +def test_sync_mirror_is_noop_without_mirror(): + scaler = _make_scaler(None) + scaler.job_progress.add("job_1") + scaler._sync_mirror() # must not raise + + +def test_occupancy_counts_single_in_flight_job(): + # Concurrency 1 + one in-progress job => occupancy 1, so jobs_needed == 0. + scaler = _make_scaler(None) + scaler.job_progress.add("job_1") + assert scaler.current_occupancy() == 1 + assert scaler.current_concurrency - scaler.current_occupancy() == 0 From 8c0fedb7565a138f600d74fce77dfacd7a36a9ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Mon, 22 Jun 2026 22:00:46 -0700 Subject: [PATCH 4/6] fix: create and share one PingJobMirror per worker run_worker constructs a single mirror in the main process and passes it to both the ping process and the JobScaler, completing the #432 fix. Refs SLS-314 --- runpod/serverless/worker.py | 8 +++++-- tests/test_serverless/test_scale_mirror.py | 26 ++++++++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/runpod/serverless/worker.py b/runpod/serverless/worker.py index 5fee79b4..626564ac 100644 --- a/runpod/serverless/worker.py +++ b/runpod/serverless/worker.py @@ -39,11 +39,15 @@ def run_worker(config: Dict[str, Any]) -> None: # Run fitness checks before starting worker (production only) asyncio.run(run_fitness_checks()) + # One per-worker mirror, shared by the ping process and the scaler. + from runpod.serverless.modules.worker_state import PingJobMirror + mirror = PingJobMirror() + # Start pinging Runpod to show that the worker is alive. - heartbeat.start_ping() + heartbeat.start_ping(mirror) # Create a JobScaler responsible for adjusting the concurrency - job_scaler = rp_scale.JobScaler(config) + job_scaler = rp_scale.JobScaler(config, job_mirror=mirror) job_scaler.start() diff --git a/tests/test_serverless/test_scale_mirror.py b/tests/test_serverless/test_scale_mirror.py index 2a38bad2..edcc7e64 100644 --- a/tests/test_serverless/test_scale_mirror.py +++ b/tests/test_serverless/test_scale_mirror.py @@ -51,3 +51,29 @@ def test_occupancy_counts_single_in_flight_job(): scaler.job_progress.add("job_1") assert scaler.current_occupancy() == 1 assert scaler.current_concurrency - scaler.current_occupancy() == 0 + + +def test_run_worker_shares_one_mirror(): + """run_worker creates one mirror and passes it to both ping and scaler.""" + import runpod.serverless.worker as worker_mod + from unittest.mock import MagicMock, patch + + captured = {} + + def fake_job_scaler(config, job_mirror=None): + captured["scaler_mirror"] = job_mirror + scaler = MagicMock() + scaler.start = MagicMock() + return scaler + + def fake_start_ping(mirror=None, test=False): + captured["ping_mirror"] = mirror + + with patch.object(worker_mod, "run_fitness_checks", return_value=None), \ + patch("asyncio.run", lambda *a, **k: None), \ + patch.object(worker_mod.heartbeat, "start_ping", side_effect=fake_start_ping), \ + patch.object(worker_mod.rp_scale, "JobScaler", side_effect=fake_job_scaler): + worker_mod.run_worker({"handler": lambda x: x}) + + assert captured["ping_mirror"] is not None + assert captured["ping_mirror"] is captured["scaler_mirror"] From cd8005bf698ad2b3dd9f6e51c01bbd0d18032c41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Mon, 22 Jun 2026 22:03:04 -0700 Subject: [PATCH 5/6] chore: drop removed job-state-file reference from local_sim The .runpod_jobs.pkl state file no longer exists; remove its cleanup from the local_sim Makefile. Refs SLS-314 --- tests/test_serverless/local_sim/Makefile | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_serverless/local_sim/Makefile b/tests/test_serverless/local_sim/Makefile index 72475872..75d880f8 100644 --- a/tests/test_serverless/local_sim/Makefile +++ b/tests/test_serverless/local_sim/Makefile @@ -9,6 +9,5 @@ worker: python worker.py clean: - find . -type f -name ".runpod_jobs.pkl" -delete find . -type f -name "*.pyc" -delete find . -type d -name "__pycache__" -delete From 68c8a95f9b940523d42b34176ea29620debb428a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Mon, 22 Jun 2026 23:15:50 -0700 Subject: [PATCH 6/6] fix: sync ping mirror inside JobsProgress, cover API mode PR #517 review (capy-ai): the API/realtime path (rp_fastapi WorkerAPI) started the ping without a mirror while tracking jobs in JobsProgress, so heartbeats sent job_id=None there. Move mirror propagation into JobsProgress.add/remove/clear via an attached mirror, so every writer path (JobScaler and rp_fastapi) stays in sync from a single place. Attach the mirror in run_worker and WorkerAPI; drop JobScaler's now-redundant job_mirror plumbing. Refs SLS-314 --- runpod/serverless/modules/rp_fastapi.py | 12 +++-- runpod/serverless/modules/rp_scale.py | 10 +--- runpod/serverless/modules/worker_state.py | 26 +++++++++- runpod/serverless/worker.py | 8 +-- .../test_integration_worker_state.py | 44 +++++++++++++++++ .../test_modules/test_fastapi.py | 32 ++++++++++++ tests/test_serverless/test_scale_mirror.py | 49 ++++--------------- 7 files changed, 124 insertions(+), 57 deletions(-) diff --git a/runpod/serverless/modules/rp_fastapi.py b/runpod/serverless/modules/rp_fastapi.py index f4aff225..5451ae40 100644 --- a/runpod/serverless/modules/rp_fastapi.py +++ b/runpod/serverless/modules/rp_fastapi.py @@ -17,7 +17,7 @@ from .rp_handler import is_generator from .rp_job import run_job, run_job_generator from .rp_ping import Heartbeat -from .worker_state import JobsProgress +from .worker_state import JobsProgress, PingJobMirror RUNPOD_ENDPOINT_ID = os.environ.get("RUNPOD_ENDPOINT_ID", None) @@ -184,8 +184,14 @@ def __init__(self, config: Dict[str, Any]): 2. Initializes the FastAPI web server. 3. Sets the handler for processing jobs. """ - # Start the heartbeat thread. - heartbeat.start_ping() + # One per-worker mirror so the separate ping process reports the + # in-progress job ids tracked here. Attaching to job_list means every + # add/remove syncs it automatically. + mirror = PingJobMirror() + job_list.set_mirror(mirror) + + # Start the heartbeat process. + heartbeat.start_ping(mirror) self.config = config diff --git a/runpod/serverless/modules/rp_scale.py b/runpod/serverless/modules/rp_scale.py index 5412206e..8c27543e 100644 --- a/runpod/serverless/modules/rp_scale.py +++ b/runpod/serverless/modules/rp_scale.py @@ -42,12 +42,11 @@ class JobScaler: Job Scaler. This class is responsible for scaling the number of concurrent requests. """ - def __init__(self, config: Dict[str, Any], job_mirror=None): + def __init__(self, config: Dict[str, Any]): self._shutdown_event = asyncio.Event() self.current_concurrency = 1 self.config = config self.job_progress = JobsProgress() # Cache the singleton instance - self.job_mirror = job_mirror # one-way snapshot to the ping process self.jobs_queue = asyncio.Queue(maxsize=self.current_concurrency) @@ -157,11 +156,6 @@ def current_occupancy(self) -> int: ) return current_progress_count + current_queue_count - def _sync_mirror(self) -> None: - """Push the current in-progress job-id snapshot to the ping process.""" - if self.job_mirror is not None: - self.job_mirror.set(self.job_progress.get_job_list()) - async def get_jobs(self, session: ClientSession): """ Retrieve multiple jobs from the server in batches using blocking requests. @@ -195,7 +189,6 @@ async def get_jobs(self, session: ClientSession): for job in acquired_jobs: await self.jobs_queue.put(job) self.job_progress.add(job) - self._sync_mirror() log.debug("Job Queued", job["id"]) log.info(f"Jobs in queue: {self.jobs_queue.qsize()}") @@ -281,6 +274,5 @@ async def handle_job(self, session: ClientSession, job: dict): # Job is no longer in progress self.job_progress.remove(job) - self._sync_mirror() log.debug("Finished Job", job["id"]) diff --git a/runpod/serverless/modules/worker_state.py b/runpod/serverless/modules/worker_state.py index 41d730e6..75a3d660 100644 --- a/runpod/serverless/modules/worker_state.py +++ b/runpod/serverless/modules/worker_state.py @@ -73,6 +73,9 @@ def __new__(cls): if JobsProgress._instance is None: JobsProgress._instance = set.__new__(cls) set.__init__(JobsProgress._instance) + # One-way snapshot to the ping process; attached in the main + # process via set_mirror(). Stays None off-Runpod and in tests. + JobsProgress._instance._mirror = None return JobsProgress._instance def __init__(self): @@ -82,6 +85,21 @@ def __init__(self): def __repr__(self) -> str: return f"<{self.__class__.__name__}>: {self.get_job_list()}" + def set_mirror(self, mirror) -> None: + """Attach a PingJobMirror that mirrors the in-progress job ids to the + ping process. Every add/remove/clear then pushes the snapshot to it.""" + self._mirror = mirror + self._notify_mirror() + + def _notify_mirror(self) -> None: + """Push the current job-id snapshot to the attached mirror, if any.""" + if self._mirror is not None: + self._mirror.set(self.get_job_list()) + + def clear(self) -> None: + super().clear() + self._notify_mirror() + def add(self, element: Any): """ Adds a Job object to the set. @@ -98,7 +116,9 @@ def add(self, element: Any): if not isinstance(element, Job): raise TypeError("Only Job objects can be added to JobsProgress.") - return super().add(element) + result = super().add(element) + self._notify_mirror() + return result def remove(self, element: Any): """ @@ -116,7 +136,9 @@ def remove(self, element: Any): if not isinstance(element, Job): raise TypeError("Only Job objects can be removed from JobsProgress.") - return super().discard(element) + result = super().discard(element) + self._notify_mirror() + return result def get(self, element: Any) -> Optional[Job]: if isinstance(element, str): diff --git a/runpod/serverless/worker.py b/runpod/serverless/worker.py index 626564ac..90053ec7 100644 --- a/runpod/serverless/worker.py +++ b/runpod/serverless/worker.py @@ -39,15 +39,17 @@ def run_worker(config: Dict[str, Any]) -> None: # Run fitness checks before starting worker (production only) asyncio.run(run_fitness_checks()) - # One per-worker mirror, shared by the ping process and the scaler. - from runpod.serverless.modules.worker_state import PingJobMirror + # One per-worker mirror: the job tracker writes it, the ping process reads + # it. Attaching to JobsProgress means every add/remove syncs automatically. + from runpod.serverless.modules.worker_state import JobsProgress, PingJobMirror mirror = PingJobMirror() + JobsProgress().set_mirror(mirror) # Start pinging Runpod to show that the worker is alive. heartbeat.start_ping(mirror) # Create a JobScaler responsible for adjusting the concurrency - job_scaler = rp_scale.JobScaler(config, job_mirror=mirror) + job_scaler = rp_scale.JobScaler(config) job_scaler.start() diff --git a/tests/test_serverless/test_integration_worker_state.py b/tests/test_serverless/test_integration_worker_state.py index 62c5f9f2..2bca1ac8 100644 --- a/tests/test_serverless/test_integration_worker_state.py +++ b/tests/test_serverless/test_integration_worker_state.py @@ -67,6 +67,50 @@ def test_two_workers_do_not_share_jobs(self, tmp_path, monkeypatch): assert worker_b.get_job_list() is None +class TestJobsProgressMirrorSync: + """An attached mirror tracks the in-progress set on every mutation.""" + + def setup_method(self): + _reset_singleton() + + def teardown_method(self): + _reset_singleton() + + def test_set_mirror_pushes_current_state(self): + jp = JobsProgress() + jp.add("job_1") + + mirror = PingJobMirror() + jp.set_mirror(mirror) # initial push of existing state + assert "job_1" in mirror.get() + + def test_add_and_remove_sync_the_mirror(self): + jp = JobsProgress() + mirror = PingJobMirror() + jp.set_mirror(mirror) + + jp.add("job_1") + assert "job_1" in mirror.get() + + jp.remove("job_1") + assert mirror.get() is None + + def test_clear_syncs_the_mirror(self): + jp = JobsProgress() + mirror = PingJobMirror() + jp.set_mirror(mirror) + + jp.add("job_1") + jp.clear() + assert mirror.get() is None + + def test_mutation_without_mirror_does_not_raise(self): + jp = JobsProgress() # no mirror attached + jp.add("job_1") + jp.remove("job_1") + jp.clear() + + class TestPingJobMirror: def test_set_get_roundtrip(self): mirror = PingJobMirror() diff --git a/tests/test_serverless/test_modules/test_fastapi.py b/tests/test_serverless/test_modules/test_fastapi.py index 9b27139f..4830b9df 100644 --- a/tests/test_serverless/test_modules/test_fastapi.py +++ b/tests/test_serverless/test_modules/test_fastapi.py @@ -60,6 +60,38 @@ def test_start_serverless_with_realtime(self): os.environ.pop("RUNPOD_REALTIME_PORT") os.environ.pop("RUNPOD_ENDPOINT_ID") + def test_worker_api_attaches_ping_mirror(self): + """WorkerAPI attaches a PingJobMirror to job_list and passes the same + instance to the ping; job tracking then syncs it. Regression guard for + the API-mode ping reporting job_id=None (PR #517 review).""" + module_location = "runpod.serverless.modules.rp_fastapi" + with patch( + f"{module_location}.Heartbeat.start_ping", Mock() + ) as mock_ping, patch( + f"{module_location}.FastAPI", Mock() + ), patch( + f"{module_location}.APIRouter", return_value=Mock() + ), patch( + f"{module_location}.uvicorn", Mock() + ): + rp_fastapi.job_list.clear() + rp_fastapi.job_list._mirror = None + + rp_fastapi.WorkerAPI({"handler": self.handler}) + + # The ping received a mirror, and it is the one attached to job_list. + self.assertTrue(mock_ping.called) + passed_mirror = mock_ping.call_args[0][0] + self.assertIsNotNone(passed_mirror) + self.assertIs(rp_fastapi.job_list._mirror, passed_mirror) + + # Tracking a job in API mode now syncs into the mirror. + rp_fastapi.job_list.add("api_job_1") + self.assertIn("api_job_1", passed_mirror.get()) + + rp_fastapi.job_list.clear() + rp_fastapi.job_list._mirror = None + def test_webhook_sender_success(self): """Test the webhook sender when the request is successful.""" module_location = "runpod.serverless.modules.rp_fastapi.requests.Session.post" diff --git a/tests/test_serverless/test_scale_mirror.py b/tests/test_serverless/test_scale_mirror.py index edcc7e64..19205c31 100644 --- a/tests/test_serverless/test_scale_mirror.py +++ b/tests/test_serverless/test_scale_mirror.py @@ -1,9 +1,9 @@ -"""JobScaler keeps the PingJobMirror in sync and reports correct occupancy.""" +"""JobScaler occupancy and run_worker mirror wiring.""" import pytest from runpod.serverless.modules.rp_scale import JobScaler -from runpod.serverless.modules.worker_state import JobsProgress, PingJobMirror +from runpod.serverless.modules.worker_state import JobsProgress @pytest.fixture(autouse=True) @@ -13,55 +13,23 @@ def _reset_singleton(): JobsProgress._instance = None -def _make_scaler(mirror): - return JobScaler({"handler": lambda x: x}, job_mirror=mirror) - - -def test_mirror_syncs_on_add(): - mirror = PingJobMirror() - scaler = _make_scaler(mirror) - - scaler.job_progress.add("job_1") - scaler._sync_mirror() - - assert "job_1" in mirror.get() - - -def test_mirror_syncs_on_remove(): - mirror = PingJobMirror() - scaler = _make_scaler(mirror) - - scaler.job_progress.add("job_1") - scaler._sync_mirror() - scaler.job_progress.remove("job_1") - scaler._sync_mirror() - - assert mirror.get() is None - - -def test_sync_mirror_is_noop_without_mirror(): - scaler = _make_scaler(None) - scaler.job_progress.add("job_1") - scaler._sync_mirror() # must not raise - - def test_occupancy_counts_single_in_flight_job(): # Concurrency 1 + one in-progress job => occupancy 1, so jobs_needed == 0. - scaler = _make_scaler(None) + scaler = JobScaler({"handler": lambda x: x}) scaler.job_progress.add("job_1") assert scaler.current_occupancy() == 1 assert scaler.current_concurrency - scaler.current_occupancy() == 0 -def test_run_worker_shares_one_mirror(): - """run_worker creates one mirror and passes it to both ping and scaler.""" +def test_run_worker_attaches_one_shared_mirror(): + """run_worker attaches one mirror to JobsProgress and passes the same + instance to the ping process.""" import runpod.serverless.worker as worker_mod from unittest.mock import MagicMock, patch captured = {} - def fake_job_scaler(config, job_mirror=None): - captured["scaler_mirror"] = job_mirror + def fake_job_scaler(config): scaler = MagicMock() scaler.start = MagicMock() return scaler @@ -75,5 +43,6 @@ def fake_start_ping(mirror=None, test=False): patch.object(worker_mod.rp_scale, "JobScaler", side_effect=fake_job_scaler): worker_mod.run_worker({"handler": lambda x: x}) + # The ping got a mirror, and it is the same instance the job tracker writes. assert captured["ping_mirror"] is not None - assert captured["ping_mirror"] is captured["scaler_mirror"] + assert JobsProgress()._mirror is captured["ping_mirror"]