From 5464900d4c109c1d3a493e7cd99b8d5d89cac7ae Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 10:55:39 +0100 Subject: [PATCH 01/27] Fix worker stop, timestamp handling, and imports Preserve explicit timestamp values and strengthen worker shutdown behavior. Key changes: - recording_manager: use timestamp if timestamp is not None else time.time() so falsy timestamps (e.g. 0) are preserved instead of being overridden. - services/dlc_processor: import DLCLive at module top and enable profiling; make _stop_worker return a boolean and have reset() check that return value to avoid resetting the DLCLive instance while the worker thread is still alive (with warnings/logging). Adjusted comments about thread termination. - gui/main_window: use a truthiness check for dlc_cam_id to avoid treating empty values as valid IDs. These updates prevent spurious timestamp overrides, avoid unsafe DLCLive resets when threads are still running, and clarify import/profiling intent. --- dlclivegui/gui/main_window.py | 2 +- dlclivegui/gui/recording_manager.py | 2 +- dlclivegui/services/dlc_processor.py | 25 +++++++++++++------------ 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/dlclivegui/gui/main_window.py b/dlclivegui/gui/main_window.py index b1ee0f1..caf0937 100644 --- a/dlclivegui/gui/main_window.py +++ b/dlclivegui/gui/main_window.py @@ -1376,7 +1376,7 @@ def _on_multi_frame_ready(self, frame_data: MultiFrameData) -> None: dlc_cam_id = selected_id else: dlc_cam_id = available_ids[0] if available_ids else "" - if dlc_cam_id is not None: + if dlc_cam_id: self._inference_camera_id = dlc_cam_id self._set_dlc_combo_to_id(dlc_cam_id) self.statusBar().showMessage( diff --git a/dlclivegui/gui/recording_manager.py b/dlclivegui/gui/recording_manager.py index 22491c6..49ac993 100644 --- a/dlclivegui/gui/recording_manager.py +++ b/dlclivegui/gui/recording_manager.py @@ -139,7 +139,7 @@ def write_frame(self, cam_id: str, frame: np.ndarray, timestamp: float | None = if not rec or not rec.is_running: return try: - rec.write(frame, timestamp=timestamp or time.time()) + rec.write(frame, timestamp=timestamp if timestamp is not None else time.time()) except Exception as exc: log.warning("Failed to write frame for %s: %s", cam_id, exc) try: diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index 74d61bc..2ebe8ed 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -14,6 +14,7 @@ from typing import Any import numpy as np +from dlclive import DLCLive from PySide6.QtCore import QObject, Signal from dlclivegui.config import DLCProcessorSettings, ModelType @@ -22,17 +23,9 @@ logger = logging.getLogger(__name__) -# Enable profiling +# Enable profiling to get more detailed timing metrics for debugging and optimization. ENABLE_PROFILING = True -try: # pragma: no cover - optional dependency - from dlclive import ( - DLCLive, # type: ignore - ) -except Exception as e: # pragma: no cover - handled gracefully - logger.error(f"dlclive package could not be imported: {e}") - DLCLive = None # type: ignore[assignment] - class PoseBackends(Enum): DLC_LIVE = auto() @@ -174,7 +167,12 @@ def configure(self, settings: DLCProcessorSettings, processor: Any | None = None def reset(self) -> None: """Stop the worker thread and drop the current DLCLive instance.""" - self._stop_worker() + stopped = self._stop_worker() + if not stopped: + logger.warning( + "Reset requested but worker thread is still alive; skipping DLCLive reset to avoid potential issues." + ) + return self._dlc = None self._initialized = False with self._stats_lock: @@ -280,17 +278,20 @@ def _start_worker(self, init_frame: np.ndarray, init_timestamp: float) -> None: def _stop_worker(self) -> None: if self._worker_thread is None: - return + return True self._stop_event.set() - # Just wait for the timed get() loop to observe the flag and drain + # Wait for timed get() loop to observe the flag and drain self._worker_thread.join(timeout=2.0) if self._worker_thread.is_alive(): logger.warning("DLC worker thread did not terminate cleanly") + # IMPORTANT: do not clear references; thread may still be using them + return False self._worker_thread = None self._queue = None + return True @contextmanager def _timed_processor(self): From 13bc76f7d68d49ad125bec275e52e0c5c001a0c5 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 10:58:37 +0100 Subject: [PATCH 02/27] Add WorkerState enum for worker lifecycle Introduce a WorkerState enum and import Enum/auto in dlc_processor.py to represent worker lifecycle states (STOPPED, STARTING, RUNNING, STOPPING, FAULTED). This centralizes state representation for processing workers and improves clarity for future state management. --- dlclivegui/services/dlc_processor.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index 2ebe8ed..098bd74 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -29,6 +29,12 @@ class PoseBackends(Enum): DLC_LIVE = auto() +class WorkerState(Enum): + STOPPED = auto() + STARTING = auto() + RUNNING = auto() + STOPPING = auto() + FAULTED = auto() @dataclass From 5b849ddfc8c86aed0794ce7b139defad20b9b27f Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 11:18:17 +0100 Subject: [PATCH 03/27] Improve DLCLive processor lifecycle and safety Add explicit worker lifecycle management and synchronization to DLCLive processor: introduce WorkerState, a lifecycle Lock, and stop/start state transitions. Make _start/_stop behavior safer by snapshotting the queue, returning boolean on stop, joining the thread with fault handling, and avoiding races by using a local queue reference in the worker loop. Harden enqueue_frame to respect lifecycle/state/stop flags and reduce copies by reusing a single frame copy and enqueuing enqueue timestamp. Improve DLCLive initialization error handling (set FAULTED on failure) and ensure caller-visible errors when DLCLive is not initialized. Prevent configuring the processor while running and add logging/warnings for shutdown/termination issues. Minor cleanup: adjust where initialized/state flags are set and remove redundant checks. --- dlclivegui/services/dlc_processor.py | 107 ++++++++++++++++++--------- 1 file changed, 74 insertions(+), 33 deletions(-) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index 098bd74..d751fab 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -29,6 +29,8 @@ class PoseBackends(Enum): DLC_LIVE = auto() + + class WorkerState(Enum): STOPPED = auto() STARTING = auto() @@ -139,11 +141,15 @@ class DLCLiveProcessor(QObject): def __init__(self) -> None: super().__init__() + # DLCLive instance and config self._settings = DLCProcessorSettings() self._dlc: Any | None = None self._processor: Any | None = None + # Worker thread and queue self._queue: queue.Queue[Any] | None = None self._worker_thread: threading.Thread | None = None + self._state = WorkerState.STOPPED + self._lifecycle_lock = threading.Lock() self._stop_event = threading.Event() self._initialized = False @@ -195,22 +201,29 @@ def reset(self) -> None: self._processor_overhead_times.clear() def shutdown(self) -> None: - self._stop_worker() + stopped = self._stop_worker() + if not stopped: + logger.warning( + "Shutdown requested but worker thread is still alive; DLCLive instance may not be fully released." + ) + return self._dlc = None self._initialized = False def enqueue_frame(self, frame: np.ndarray, timestamp: float) -> None: - # Start worker on first frame - if self._worker_thread is None: - self._start_worker(frame.copy(), timestamp) - return + frame_c = frame.copy() + enq_time = time.perf_counter() + + with self._lifecycle_lock: + if self._state in (WorkerState.STOPPING, WorkerState.FAULTED) or self._stop_event.is_set(): + return + q = self._queue # snapshot under lock - # As long as worker and queue are ready, ALWAYS enqueue - if self._queue is None: + if q is None: return try: - self._queue.put_nowait((frame.copy(), timestamp, time.perf_counter())) + q.put_nowait((frame_c, timestamp, enq_time)) with self._stats_lock: self._frames_enqueued += 1 except queue.Full: @@ -268,12 +281,13 @@ def get_stats(self) -> ProcessorStats: avg_processor_overhead=avg_proc_overhead, ) - def _start_worker(self, init_frame: np.ndarray, init_timestamp: float) -> None: + def _start_worker_locked(self, init_frame: np.ndarray, init_timestamp: float) -> None: + # lifecycle_lock must already be held if self._worker_thread is not None and self._worker_thread.is_alive(): return - self._queue = queue.Queue(maxsize=1) self._stop_event.clear() + self._state = WorkerState.STARTING self._worker_thread = threading.Thread( target=self._worker_loop, args=(init_frame, init_timestamp), @@ -282,21 +296,31 @@ def _start_worker(self, init_frame: np.ndarray, init_timestamp: float) -> None: ) self._worker_thread.start() - def _stop_worker(self) -> None: - if self._worker_thread is None: - return True - - self._stop_event.set() - - # Wait for timed get() loop to observe the flag and drain - self._worker_thread.join(timeout=2.0) - if self._worker_thread.is_alive(): - logger.warning("DLC worker thread did not terminate cleanly") - # IMPORTANT: do not clear references; thread may still be using them + def _start_worker(self, init_frame: np.ndarray, init_timestamp: float) -> None: + with self._lifecycle_lock: + self._start_worker_locked(init_frame, init_timestamp) + + def _stop_worker(self) -> bool: + with self._lifecycle_lock: + t = self._worker_thread + if t is None: + self._state = WorkerState.STOPPED + return True + self._state = WorkerState.STOPPING + self._stop_event.set() + + t.join(timeout=2.0) + if t.is_alive(): + qsize = self._queue.qsize() if self._queue is not None else -1 + logger.warning("DLC worker thread did not terminate cleanly (qsize=%s)", qsize) + with self._lifecycle_lock: + self._state = WorkerState.FAULTED return False - self._worker_thread = None - self._queue = None + with self._lifecycle_lock: + self._worker_thread = None + self._queue = None + self._state = WorkerState.STOPPED return True @contextmanager @@ -340,6 +364,8 @@ def _process_frame( Single source of truth for: inference -> (optional) processor timing -> signal emit -> stats. Updates: frames_processed, latency, processing timeline, profiling metrics. """ + if self._dlc is None: + raise RuntimeError("DLCLive instance is not initialized.") # Time GPU inference (and processor overhead when present) with self._timed_processor() as proc_holder: inference_start = time.perf_counter() @@ -389,8 +415,6 @@ def _process_frame( def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: try: # -------- Initialization (unchanged) -------- - if DLCLive is None: - raise RuntimeError("The 'dlclive' package is required for pose estimation.") if not self._settings.model_path: raise RuntimeError("No DLCLive model path configured.") @@ -415,7 +439,14 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: if self._settings.device is not None: options["device"] = self._settings.device - self._dlc = DLCLive(**options) + try: + self._dlc = DLCLive(**options) + except Exception as exc: + with self._lifecycle_lock: + self._state = WorkerState.FAULTED + raise RuntimeError( + f"Failed to initialize DLCLive with model '{self._settings.model_path}': {exc}" + ) from exc # First inference to initialize init_inference_start = time.perf_counter() @@ -428,6 +459,8 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: self._initialized = True self.initialized.emit(True) + with self._lifecycle_lock: + self._state = WorkerState.RUNNING total_init_time = time.perf_counter() - init_start logger.info( @@ -447,14 +480,20 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: self.initialized.emit(False) return + q = ( + self._queue + ) # Assign to local to avoid issues if self._queue is set to None during shutdown while loop is still running. + if q is None: + logger.warning("Worker loop exiting because queue was unexpectedly None") + return # -------- Main processing loop: stop-flag + timed get + drain -------- # NOTE: We never exit early unless _stop_event is set. while True: # If stop requested, only exit when queue is empty if self._stop_event.is_set(): - if self._queue is not None: + if q is not None: try: - frame, ts, enq = self._queue.get_nowait() + frame, ts, enq = q.get_nowait() except queue.Empty: # NOW it is safe to exit break @@ -467,7 +506,7 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: self.error.emit(str(exc)) finally: try: - self._queue.task_done() + q.task_done() except ValueError: pass continue # check stop_event again WITHOUT breaking @@ -475,7 +514,7 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: # Normal operation: timed get try: wait_start = time.perf_counter() - item = self._queue.get(timeout=0.05) + item = q.get(timeout=0.05) queue_wait_time = time.perf_counter() - wait_start except queue.Empty: continue @@ -488,7 +527,7 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: self.error.emit(str(exc)) finally: try: - self._queue.task_done() + q.task_done() except ValueError: pass @@ -525,6 +564,10 @@ def enqueue(self, frame, ts): self._proc.enqueue_frame(frame, ts) def configure(self, settings: DLCProcessorSettings, scanned_processors: dict, selected_key) -> bool: + with self._proc._lifecycle_lock: + if self._proc._state != WorkerState.STOPPED: + raise RuntimeError("Cannot configure DLCLiveProcessor while it is running. Please stop it first.") + processor = None if selected_key is not None and scanned_processors: try: @@ -538,11 +581,9 @@ def configure(self, settings: DLCProcessorSettings, scanned_processors: dict, se def start(self): self._proc.reset() self.active = True - self.initialized = False def stop(self): self.active = False - self.initialized = False self._proc.reset() self._last_pose = None From 88473f4cf8b3ef9612fcbd200490b432776efb1a Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 11:24:12 +0100 Subject: [PATCH 04/27] Restart worker if dead and clear stop event When queuing work, ensure the background worker is started if the worker thread is None or not alive by calling _start_worker_locked(frame_c, timestamp). Also clear the _stop_event in both the early-return (no thread) and final cleanup paths when stopping so a stale stop flag doesn't prevent future restarts. --- dlclivegui/services/dlc_processor.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index d751fab..70eb7ca 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -217,6 +217,11 @@ def enqueue_frame(self, frame: np.ndarray, timestamp: float) -> None: with self._lifecycle_lock: if self._state in (WorkerState.STOPPING, WorkerState.FAULTED) or self._stop_event.is_set(): return + t = self._worker_thread + if t is None or not t.is_alive(): + self._start_worker_locked(frame_c, timestamp) + return + q = self._queue # snapshot under lock if q is None: @@ -305,6 +310,7 @@ def _stop_worker(self) -> bool: t = self._worker_thread if t is None: self._state = WorkerState.STOPPED + self._stop_event.clear() return True self._state = WorkerState.STOPPING self._stop_event.set() @@ -321,6 +327,7 @@ def _stop_worker(self) -> bool: self._worker_thread = None self._queue = None self._state = WorkerState.STOPPED + self._stop_event.clear() return True @contextmanager From c4cc84267a2b54bb5e3dfeae769214283593d9e8 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 11:30:47 +0100 Subject: [PATCH 05/27] Mark worker FAULTED on queue errors Improve error handling in the worker loop when the queue is missing or queue.get raises unexpected exceptions. When q is None the code now logs a clearer message, sets the worker state to WorkerState.FAULTED under the lifecycle lock, and emits an error signal. Also adds a broad except around queue.get to log the exception, set FAULTED state, emit the error message, and break the loop to avoid silent failures. --- dlclivegui/services/dlc_processor.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index 70eb7ca..185ddc4 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -491,8 +491,12 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: self._queue ) # Assign to local to avoid issues if self._queue is set to None during shutdown while loop is still running. if q is None: - logger.warning("Worker loop exiting because queue was unexpectedly None") + logger.warning("Worker started without a queue; exiting") + with self._lifecycle_lock: + self._state = WorkerState.FAULTED + self.error.emit("Worker started without a queue") return + # -------- Main processing loop: stop-flag + timed get + drain -------- # NOTE: We never exit early unless _stop_event is set. while True: @@ -525,6 +529,12 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: queue_wait_time = time.perf_counter() - wait_start except queue.Empty: continue + except Exception as exc: + logger.exception("Error getting item from queue", exc_info=exc) + with self._lifecycle_lock: + self._state = WorkerState.FAULTED + self.error.emit(str(exc)) + break try: frame, ts, enq = item From a7cf9e6cb2d76fc409e7d646b7da42ecd11da3f5 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 11:31:04 +0100 Subject: [PATCH 06/27] Force-terminate frozen camera threads on stop Make multi-camera shutdown more robust by iterating threads with their camera IDs, skipping non-running threads, and quitting each thread. If a thread fails to stop within 5000ms, log an error and call terminate() then wait an additional 1000ms to ensure it stops. Also simplify the start() docstring. --- dlclivegui/services/multi_camera_controller.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index 8db469f..b579c86 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -143,7 +143,7 @@ def get_active_count(self) -> int: return len(self._started_cameras) def start(self, camera_settings: list[CameraSettings]) -> None: - """Start multiple cameras; accepts dataclasses, pydantic models, or dicts.""" + """Start multiple cameras.""" if self._running: LOGGER.warning("Multi-camera controller already running") return @@ -201,10 +201,15 @@ def stop(self, wait: bool = True) -> None: # Wait for threads to finish if wait: - for thread in self._threads.values(): - if thread.isRunning(): - thread.quit() - thread.wait(5000) + for cam_id, thread in list(self._threads.items()): + if not thread.isRunning(): + continue + + thread.quit() + if not thread.wait(5000): + LOGGER.error("Frozen camera thread %s; Forcing terminate()", cam_id) + thread.terminate() + thread.wait(1000) self._workers.clear() self._threads.clear() From 6deb0051942ad4c05c1fa4c5521f7080f57d5e17 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 11:32:37 +0100 Subject: [PATCH 07/27] Count unique cameras when starting multi-camera Previously _expected_cameras was set to len(active_settings), which could overcount when multiple CameraSettings referred to the same physical camera. This change computes a set of unique camera IDs (via get_camera_id) and sets _expected_cameras to its length, ensuring duplicates aren't double-counted. --- dlclivegui/services/multi_camera_controller.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index b579c86..1375ba2 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -158,7 +158,9 @@ def start(self, camera_settings: list[CameraSettings]) -> None: self._timestamps.clear() self._started_cameras.clear() self._failed_cameras.clear() - self._expected_cameras = len(active_settings) + + unique_ids = {get_camera_id(s) for s in active_settings} + self._expected_cameras = len(unique_ids) for settings in active_settings: self._start_camera(settings) From b2d911b7005c5e507034d15483fa000ff23a1f40 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 11:42:20 +0100 Subject: [PATCH 08/27] Improve VideoRecorder shutdown and writer loop Add robust shutdown and error handling for the video recorder. Prevent writes after stop by checking the stop event early; record and raise encoding errors as before. Refactor stop() to use local vars, signal the writer via a sentinel, join the writer thread safely, and avoid saving timestamps prematurely. Rework _writer_loop to handle a missing queue, broaden exception handling when pulling/writing items, ensure q.task_done() is always called, properly handle the sentinel, update encoding stats and logging, and perform final cleanup (save timestamps, clear queue and thread) when exiting. --- dlclivegui/services/video_recorder.py | 116 ++++++++++++++++++-------- 1 file changed, 80 insertions(+), 36 deletions(-) diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index 1addbca..7a6967c 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -118,11 +118,13 @@ def configure_stream(self, frame_size: tuple[int, int], frame_rate: float | None self._frame_rate = frame_rate def write(self, frame: np.ndarray, timestamp: float | None = None) -> bool: - if not self.is_running or self._queue is None: - return False error = self._current_error() if error is not None: raise RuntimeError(f"Video encoding failed: {error}") from error + if not self.is_running or self._queue is None: + return False + if self._stop_event.is_set(): + return False # Capture timestamp now, but only record it if frame is successfully enqueued if timestamp is None: @@ -181,25 +183,32 @@ def write(self, frame: np.ndarray, timestamp: float | None = None) -> bool: def stop(self) -> None: if self._writer is None and not self.is_running: return + self._stop_event.set() - if self._queue is not None: + + q = self._queue + if q is not None: try: - self._queue.put_nowait(_SENTINEL) + q.put_nowait(_SENTINEL) except queue.Full: pass - # self._queue.put(_SENTINEL) - if self._writer_thread is not None: - self._writer_thread.join(timeout=5.0) - if self._writer_thread.is_alive(): + + t = self._writer_thread + if t is not None: + t.join(timeout=5.0) + if t.is_alive(): logger.warning("Video recorder thread did not terminate cleanly") + return + if self._writer is not None: try: self._writer.close() except Exception: logger.exception("Failed to close WriteGear cleanly") - # Save timestamps to JSON file - self._save_timestamps() + if self._writer_thread is None: + # Save timestamps to JSON file + self._save_timestamps() self._writer = None self._writer_thread = None @@ -236,45 +245,80 @@ def get_stats(self) -> RecorderStats | None: ) def _writer_loop(self) -> None: - assert self._queue is not None + q = self._queue + if q is None: + with self._stats_lock: + self._encode_error = RuntimeError("Writer loop started without a queue") + logger.error("Writer loop started without a queue; exiting") + return + try: while True: try: - item = self._queue.get(timeout=0.1) + item = q.get(timeout=0.1) except queue.Empty: if self._stop_event.is_set(): break continue - if item is _SENTINEL: - self._queue.task_done() - break - frame, timestamp = item - start = time.perf_counter() - try: - assert self._writer is not None - self._writer.write(frame) - except OSError as exc: + except Exception as exc: with self._stats_lock: self._encode_error = exc - logger.exception("Video encoding failed while writing frame") - self._queue.task_done() + logger.exception("Could not retrieve item from queue", exc_info=exc) self._stop_event.set() break - elapsed = time.perf_counter() - start - now = time.perf_counter() - with self._stats_lock: - self._frames_written += 1 - self._total_latency += elapsed - self._last_latency = elapsed - self._written_times.append(now) - self._frame_timestamps.append(timestamp) - if now - self._last_log_time >= 1.0: - self._compute_write_fps_locked() - self._queue.qsize() - self._last_log_time = now - self._queue.task_done() + + stop_now = False + try: + if item is _SENTINEL: + stop_now = True + continue + + frame, timestamp = item + start = time.perf_counter() + + try: + writer = self._writer + if writer is None: + raise RuntimeError("WriteGear writer is not initialized") + writer.write(frame) + except Exception as exc: # <- broader than OSError + with self._stats_lock: + self._encode_error = exc + logger.exception("Video encoding failed while writing frame", exc_info=exc) + self._stop_event.set() + stop_now = True + continue + + elapsed = time.perf_counter() - start + now = time.perf_counter() + with self._stats_lock: + self._frames_written += 1 + self._total_latency += elapsed + self._last_latency = elapsed + self._written_times.append(now) + self._frame_timestamps.append(timestamp) + if now - self._last_log_time >= 1.0: + self._compute_write_fps_locked() + self._last_log_time = now + + finally: + # Ensure queue accounting is correct for every item pulled from q + try: + q.task_done() + except ValueError: + pass + + if stop_now: + break + finally: self._finalize_writer() + self._save_timestamps() + + # Safe cleanup only once the thread is actually exiting + self._queue = None + if self._writer_thread is threading.current_thread(): + self._writer_thread = None def _finalize_writer(self) -> None: writer = self._writer From 188694a09c49689f7441a089705c36c31c946206 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 11:46:00 +0100 Subject: [PATCH 09/27] Refactor writer thread error handling and stats Consolidate the writer-thread loop and tighten up exception handling. Removed the conditional _save_timestamps path during teardown. In the writer loop sentinel handling and frame processing are unified; on write errors the exception is stored under _stats_lock, logged, and stop_event/stop_now are set. The stats updates (frames_written, total_latency, last_latency, written_times, frame_timestamps and periodic FPS computation) were moved into the same control flow to ensure consistent accounting and termination behavior. --- dlclivegui/services/video_recorder.py | 58 ++++++++++++--------------- 1 file changed, 26 insertions(+), 32 deletions(-) diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index 7a6967c..91d400a 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -206,10 +206,6 @@ def stop(self) -> None: except Exception: logger.exception("Failed to close WriteGear cleanly") - if self._writer_thread is None: - # Save timestamps to JSON file - self._save_timestamps() - self._writer = None self._writer_thread = None self._queue = None @@ -271,35 +267,33 @@ def _writer_loop(self) -> None: try: if item is _SENTINEL: stop_now = True - continue - - frame, timestamp = item - start = time.perf_counter() - - try: - writer = self._writer - if writer is None: - raise RuntimeError("WriteGear writer is not initialized") - writer.write(frame) - except Exception as exc: # <- broader than OSError + else: + frame, timestamp = item + start = time.perf_counter() + + try: + writer = self._writer + if writer is None: + raise RuntimeError("WriteGear writer is not initialized") + writer.write(frame) + except Exception as exc: + with self._stats_lock: + self._encode_error = exc + logger.exception("Video encoding failed while writing frame", exc_info=exc) + self._stop_event.set() + stop_now = True + + elapsed = time.perf_counter() - start + now = time.perf_counter() with self._stats_lock: - self._encode_error = exc - logger.exception("Video encoding failed while writing frame", exc_info=exc) - self._stop_event.set() - stop_now = True - continue - - elapsed = time.perf_counter() - start - now = time.perf_counter() - with self._stats_lock: - self._frames_written += 1 - self._total_latency += elapsed - self._last_latency = elapsed - self._written_times.append(now) - self._frame_timestamps.append(timestamp) - if now - self._last_log_time >= 1.0: - self._compute_write_fps_locked() - self._last_log_time = now + self._frames_written += 1 + self._total_latency += elapsed + self._last_latency = elapsed + self._written_times.append(now) + self._frame_timestamps.append(timestamp) + if now - self._last_log_time >= 1.0: + self._compute_write_fps_locked() + self._last_log_time = now finally: # Ensure queue accounting is correct for every item pulled from q From 3fbaeeb3d4daa75499aad9d6d8bb020f4da4ce72 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 11:57:30 +0100 Subject: [PATCH 10/27] Revert DLCLive imports due to unguarded torch --- dlclivegui/services/dlc_processor.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index 185ddc4..51fbbe9 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -14,7 +14,8 @@ from typing import Any import numpy as np -from dlclive import DLCLive + +# from dlclive import DLCLive from PySide6.QtCore import QObject, Signal from dlclivegui.config import DLCProcessorSettings, ModelType @@ -23,6 +24,15 @@ logger = logging.getLogger(__name__) +try: # pragma: no cover - optional dependency + from dlclive import ( + DLCLive, # type: ignore + ) +except Exception as e: # pragma: no cover - handled gracefully + logger.error(f"dlclive package could not be imported: {e}") + DLCLive = None # type: ignore[assignment] + + # Enable profiling to get more detailed timing metrics for debugging and optimization. ENABLE_PROFILING = True From d18dd53bfcaa39d45a640a0821ab298d2627f9cb Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 13:48:54 +0100 Subject: [PATCH 11/27] Improve camera thread cleanup and recorder finalization Add cooperative shutdown timeouts and robust cleanup for camera threads: introduce QUIT_WAIT_MS and TERMINATE_WAIT_MS, connect thread.finished to a new _cleanup_camera that removes frame data and deletes worker/thread QObjects, and enhance stop() to wait, terminate, and log/retain references if threads refuse to terminate (avoids use-after-free/segfaults). Adjust per-camera shutdown checks and ensure frames/timestamps are cleared only after safe termination. In video_recorder, remove premature clearing of the queue and _writer_thread during finalization to avoid unsafe cleanup while writer thread may still be exiting. --- .../services/multi_camera_controller.py | 73 +++++++++++++++---- dlclivegui/services/video_recorder.py | 5 -- 2 files changed, 57 insertions(+), 21 deletions(-) diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index 1375ba2..16b6ccb 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -5,6 +5,7 @@ import logging import time from dataclasses import dataclass +from functools import partial from threading import Event, Lock import cv2 @@ -20,6 +21,9 @@ LOGGER = logging.getLogger(__name__) +QUIT_WAIT_MS = 5000 # wait for cooperative quit (5s) +TERMINATE_WAIT_MS = 1000 # wait after terminate (1s) + @dataclass class MultiFrameData: @@ -188,8 +192,25 @@ def _start_camera(self, settings: CameraSettings) -> None: self._workers[cam_id] = worker self._threads[cam_id] = thread + thread.finished.connect(partial(self._cleanup_camera, cam_id)) + worker.stopped.connect(thread.quit) thread.start() + def _cleanup_camera(self, camera_id: str) -> None: + # remove stored frame data + with self._frame_lock: + self._frames.pop(camera_id, None) + self._timestamps.pop(camera_id, None) + + worker = self._workers.pop(camera_id, None) + thread = self._threads.pop(camera_id, None) + self._settings.pop(camera_id, None) + + if worker is not None: + worker.deleteLater() + if thread is not None: + thread.deleteLater() + def stop(self, wait: bool = True) -> None: """Stop all cameras.""" if not self._running: @@ -203,22 +224,47 @@ def stop(self, wait: bool = True) -> None: # Wait for threads to finish if wait: + still_running: list[str] = [] for cam_id, thread in list(self._threads.items()): + if thread is None: + continue if not thread.isRunning(): continue thread.quit() - if not thread.wait(5000): - LOGGER.error("Frozen camera thread %s; Forcing terminate()", cam_id) - thread.terminate() - thread.wait(1000) - - self._workers.clear() - self._threads.clear() - self._settings.clear() + if thread.wait(QUIT_WAIT_MS): + continue # Clean exit + + LOGGER.error( + "Camera thread %s did not quit within %dms; forcing terminate()", + cam_id, + QUIT_WAIT_MS, + ) + + thread.terminate() + if thread.wait(TERMINATE_WAIT_MS): + continue # Terminated successfully + + LOGGER.critical( + "Camera thread %s refused to terminate after terminate()+wait(%dms). " + "Keeping references to avoid use-after-free/segfaults. " + "Application restart may be required.", + cam_id, + TERMINATE_WAIT_MS, + ) + still_running.append(cam_id) + + if still_running: + self._started_cameras.clear() + return + self._started_cameras.clear() self._failed_cameras.clear() + with self._frame_lock: + self._frames.clear() + self._timestamps.clear() self._expected_cameras = 0 + self.all_stopped.emit() def _on_frame_captured(self, camera_id: str, frame: np.ndarray, timestamp: float) -> None: @@ -437,14 +483,9 @@ def _on_camera_stopped(self, camera_id: str) -> None: # Cleanup thread if camera_id in self._threads: - thread = self._threads[camera_id] - if thread.isRunning(): + thread = self._threads.get(camera_id) + if thread is not None and thread.isRunning(): thread.quit() - thread.wait(1000) - del self._threads[camera_id] - - if camera_id in self._workers: - del self._workers[camera_id] # Remove frame data with self._frame_lock: @@ -463,7 +504,7 @@ def _on_camera_stopped(self, camera_id: str) -> None: return # Check if all running cameras have stopped (normal shutdown) - if not self._started_cameras and self._running and not self._workers: + if not self._started_cameras and all(not t.isRunning() for t in self._threads.values() if t is not None): self._running = False self.all_stopped.emit() diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index 91d400a..e376c56 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -309,11 +309,6 @@ def _writer_loop(self) -> None: self._finalize_writer() self._save_timestamps() - # Safe cleanup only once the thread is actually exiting - self._queue = None - if self._writer_thread is threading.current_thread(): - self._writer_thread = None - def _finalize_writer(self) -> None: writer = self._writer self._writer = None From b646190b8048a85e0aee4dd83bd95da4db544a88 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 13:49:58 +0100 Subject: [PATCH 12/27] Update dlc_processor.py --- dlclivegui/services/dlc_processor.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index 51fbbe9..fff79a9 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -457,6 +457,10 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: options["device"] = self._settings.device try: + if DLCLive is None: + raise RuntimeError( + "DLCLive class is not available. Ensure the dlclive package is installed and can be imported." + ) self._dlc = DLCLive(**options) except Exception as exc: with self._lifecycle_lock: From af0ca391ecc9db1d277f23648baa1570ca154a16 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 14:02:25 +0100 Subject: [PATCH 13/27] Add lifecycle management & robust writer shutdown Introduce lifecycle locking and an "abandoned" state to VideoRecorder to prevent unsafe restarts and improve shutdown handling. start() now acquires a lifecycle lock, refuses to restart when a leftover thread is marked abandoned, and initializes/reset writer state inside the lock. stop() now marks the recorder as abandoned and records an encode error if the writer thread fails to terminate within the timeout, and ensures timestamps are saved. The writer loop was simplified to break directly on sentinel or encode errors and now closes WriteGear during finalization with exception handling. Miscellaneous reordering and small bookkeeping fixes were made to improve safety and logging during shutdown. --- dlclivegui/services/video_recorder.py | 132 +++++++++++++++----------- 1 file changed, 79 insertions(+), 53 deletions(-) diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index e376c56..0ae735a 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -53,6 +53,7 @@ def __init__( crf: int = 23, buffer_size: int = 240, ): + # Config self._output = Path(output) self._writer: Any | None = None self._frame_size = frame_size @@ -60,10 +61,14 @@ def __init__( self._codec = codec self._crf = int(crf) self._buffer_size = max(1, int(buffer_size)) + # Worker state self._queue: queue.Queue[Any] | None = None self._writer_thread: threading.Thread | None = None self._stop_event = threading.Event() self._stats_lock = threading.Lock() + self._lifecycle_lock = threading.Lock() + self._abandoned = False + # Stats self._frames_enqueued = 0 self._frames_written = 0 self._dropped_frames = 0 @@ -81,37 +86,46 @@ def is_running(self) -> bool: def start(self) -> None: if WriteGear is None: raise RuntimeError("vidgear is required for video recording. Install it with 'pip install vidgear'.") - if self._writer is not None: - return - fps_value = float(self._frame_rate) if self._frame_rate else 30.0 - - writer_kwargs: dict[str, Any] = { - "compression_mode": True, - "logging": False, - "-input_framerate": fps_value, - "-vcodec": (self._codec or "libx264").strip() or "libx264", - "-crf": int(self._crf), - } - # TODO deal with pixel format - - self._output.parent.mkdir(parents=True, exist_ok=True) - self._writer = WriteGear(output=str(self._output), **writer_kwargs) - self._queue = queue.Queue(maxsize=self._buffer_size) - self._frames_enqueued = 0 - self._frames_written = 0 - self._dropped_frames = 0 - self._total_latency = 0.0 - self._last_latency = 0.0 - self._written_times.clear() - self._frame_timestamps.clear() - self._encode_error = None - self._stop_event.clear() - self._writer_thread = threading.Thread( - target=self._writer_loop, - name="VideoRecorderWriter", - daemon=True, - ) - self._writer_thread.start() + + with self._lifecycle_lock: + if self.is_running: + return + if self._abandoned: + raise RuntimeError("Cannot restart VideoRecorder, as a leftover thread is still running.") + if self._writer is not None: + self._writer = None + self._queue = None + self._writer_thread = None + + fps_value = float(self._frame_rate) if self._frame_rate else 30.0 + + writer_kwargs: dict[str, Any] = { + "compression_mode": True, + "logging": False, + "-input_framerate": fps_value, + "-vcodec": (self._codec or "libx264").strip() or "libx264", + "-crf": int(self._crf), + } + # TODO deal with pixel format + + self._output.parent.mkdir(parents=True, exist_ok=True) + self._writer = WriteGear(output=str(self._output), **writer_kwargs) + self._queue = queue.Queue(maxsize=self._buffer_size) + self._frames_enqueued = 0 + self._frames_written = 0 + self._dropped_frames = 0 + self._total_latency = 0.0 + self._last_latency = 0.0 + self._written_times.clear() + self._frame_timestamps.clear() + self._encode_error = None + self._stop_event.clear() + self._writer_thread = threading.Thread( + target=self._writer_loop, + name="VideoRecorderWriter", + daemon=True, + ) + self._writer_thread.start() def configure_stream(self, frame_size: tuple[int, int], frame_rate: float | None) -> None: self._frame_size = frame_size @@ -197,8 +211,16 @@ def stop(self) -> None: if t is not None: t.join(timeout=5.0) if t.is_alive(): - logger.warning("Video recorder thread did not terminate cleanly") - return + with self._stats_lock: + self._encode_error = RuntimeError( + "Failed to stop VideoRecorder within timeout; thread is still alive." + ) + self._abandoned = True + logger.critical( + "Failed to stop VideoRecorder within timeout; thread is still alive. " + "Marking recorder as abandoned to prevent restart." + ) + return if self._writer is not None: try: @@ -206,9 +228,13 @@ def stop(self) -> None: except Exception: logger.exception("Failed to close WriteGear cleanly") + self._save_timestamps() + self._writer = None self._writer_thread = None self._queue = None + self._stop_event.clear() + self._abandoned = False def get_stats(self) -> RecorderStats | None: if ( @@ -263,10 +289,9 @@ def _writer_loop(self) -> None: self._stop_event.set() break - stop_now = False try: if item is _SENTINEL: - stop_now = True + break else: frame, timestamp = item start = time.perf_counter() @@ -281,19 +306,19 @@ def _writer_loop(self) -> None: self._encode_error = exc logger.exception("Video encoding failed while writing frame", exc_info=exc) self._stop_event.set() - stop_now = True - - elapsed = time.perf_counter() - start - now = time.perf_counter() - with self._stats_lock: - self._frames_written += 1 - self._total_latency += elapsed - self._last_latency = elapsed - self._written_times.append(now) - self._frame_timestamps.append(timestamp) - if now - self._last_log_time >= 1.0: - self._compute_write_fps_locked() - self._last_log_time = now + break + else: + elapsed = time.perf_counter() - start + now = time.perf_counter() + with self._stats_lock: + self._frames_written += 1 + self._total_latency += elapsed + self._last_latency = elapsed + self._written_times.append(now) + self._frame_timestamps.append(timestamp) + if now - self._last_log_time >= 1.0: + self._compute_write_fps_locked() + self._last_log_time = now finally: # Ensure queue accounting is correct for every item pulled from q @@ -302,12 +327,13 @@ def _writer_loop(self) -> None: except ValueError: pass - if stop_now: - break - finally: - self._finalize_writer() - self._save_timestamps() + writer = self._writer + if writer is not None: + try: + writer.close() + except Exception: + logger.exception("Failed to close WriteGear during writer loop finalization") def _finalize_writer(self) -> None: writer = self._writer From 260ef2f6f55b4aa2572400da2bfc19938e48212f Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 14:33:41 +0100 Subject: [PATCH 14/27] Improve lifecycle handling and stale-writer cleanup dlclivegui/services/dlc_processor.py: Move the frame copy and enqueue timestamp into the lifecycle lock to avoid unnecessary copying when the processor is stopped; on initialization errors set WorkerState.FAULTED under the lifecycle lock before emitting the error. dlclivegui/services/multi_camera_controller.py: Reformat the running-cameras shutdown check into a multi-line condition (keeps the same intent of detecting normal shutdown when no cameras are started and all threads are stopped). dlclivegui/services/video_recorder.py: Add best-effort cleanup for a stale video writer in start() by attempting to call its close() and logging any errors before clearing writer/queue/thread references. Wrap stop() logic with the lifecycle lock to avoid races when stopping, preserve timeout/abandon behavior for a non-terminating writer thread, and ensure writer closure and timestamp saving are handled with proper error logging. --- dlclivegui/services/dlc_processor.py | 8 +- .../services/multi_camera_controller.py | 6 +- dlclivegui/services/video_recorder.py | 89 +++++++++++-------- 3 files changed, 60 insertions(+), 43 deletions(-) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index fff79a9..e8730d8 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -221,17 +221,15 @@ def shutdown(self) -> None: self._initialized = False def enqueue_frame(self, frame: np.ndarray, timestamp: float) -> None: - frame_c = frame.copy() - enq_time = time.perf_counter() - with self._lifecycle_lock: if self._state in (WorkerState.STOPPING, WorkerState.FAULTED) or self._stop_event.is_set(): return + frame_c = frame.copy() + enq_time = time.perf_counter() t = self._worker_thread if t is None or not t.is_alive(): self._start_worker_locked(frame_c, timestamp) return - q = self._queue # snapshot under lock if q is None: @@ -497,6 +495,8 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: except Exception as exc: logger.exception("Failed to initialize DLCLive", exc_info=exc) + with self._lifecycle_lock: + self._state = WorkerState.FAULTED self.error.emit(str(exc)) self.initialized.emit(False) return diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index 16b6ccb..4ecf8a0 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -504,7 +504,11 @@ def _on_camera_stopped(self, camera_id: str) -> None: return # Check if all running cameras have stopped (normal shutdown) - if not self._started_cameras and all(not t.isRunning() for t in self._threads.values() if t is not None): + if ( + not self._started_cameras + and not self._started_cameras + and all(not t.isRunning() for t in self._threads.values() if t is not None) + ): self._running = False self.all_stopped.emit() diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index 0ae735a..2552a8a 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -93,9 +93,21 @@ def start(self) -> None: if self._abandoned: raise RuntimeError("Cannot restart VideoRecorder, as a leftover thread is still running.") if self._writer is not None: - self._writer = None - self._queue = None - self._writer_thread = None + # Best-effort cleanup of a stale writer to avoid leaking resources. + logger.warning( + "VideoRecorder.start() found an existing writer while not running; " + "attempting to close the stale writer before restarting." + ) + try: + close_method = getattr(self._writer, "close", None) + if callable(close_method): + close_method() + except Exception: + logger.exception("Error while closing stale video writer in start().") + finally: + self._writer = None + self._queue = None + self._writer_thread = None fps_value = float(self._frame_rate) if self._frame_rate else 30.0 @@ -195,46 +207,47 @@ def write(self, frame: np.ndarray, timestamp: float | None = None) -> bool: return True def stop(self) -> None: - if self._writer is None and not self.is_running: - return + with self._lifecycle_lock: + if self._writer is None and not self.is_running: + return - self._stop_event.set() + self._stop_event.set() - q = self._queue - if q is not None: - try: - q.put_nowait(_SENTINEL) - except queue.Full: - pass - - t = self._writer_thread - if t is not None: - t.join(timeout=5.0) - if t.is_alive(): - with self._stats_lock: - self._encode_error = RuntimeError( - "Failed to stop VideoRecorder within timeout; thread is still alive." - ) - self._abandoned = True - logger.critical( - "Failed to stop VideoRecorder within timeout; thread is still alive. " - "Marking recorder as abandoned to prevent restart." - ) - return + q = self._queue + if q is not None: + try: + q.put_nowait(_SENTINEL) + except queue.Full: + pass + + t = self._writer_thread + if t is not None: + t.join(timeout=5.0) + if t.is_alive(): + with self._stats_lock: + self._encode_error = RuntimeError( + "Failed to stop VideoRecorder within timeout; thread is still alive." + ) + self._abandoned = True + logger.critical( + "Failed to stop VideoRecorder within timeout; thread is still alive. " + "Marking recorder as abandoned to prevent restart." + ) + return - if self._writer is not None: - try: - self._writer.close() - except Exception: - logger.exception("Failed to close WriteGear cleanly") + if self._writer is not None: + try: + self._writer.close() + except Exception: + logger.exception("Failed to close WriteGear cleanly") - self._save_timestamps() + self._save_timestamps() - self._writer = None - self._writer_thread = None - self._queue = None - self._stop_event.clear() - self._abandoned = False + self._writer = None + self._writer_thread = None + self._queue = None + self._stop_event.clear() + self._abandoned = False def get_stats(self) -> RecorderStats | None: if ( From aef03b5bcf0e306aeb7a2c7269441392ec78cc02 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 14:58:09 +0100 Subject: [PATCH 15/27] Cleanup stale camera workers on start/shutdown Detect and remove stale camera worker/thread state when starting cameras and during shutdown. _start_camera now checks the existing thread for a cam_id and calls _cleanup_camera if the worker exists but the thread is missing or not running, and updates the log message to indicate the camera is already running. The shutdown loop likewise invokes _cleanup_camera for None or not-running threads and after successful quit/terminate waits to ensure worker/thread entries are cleared. These changes prevent stale entries from blocking restarts and ensure cleaner shutdown behavior. --- dlclivegui/services/multi_camera_controller.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index 4ecf8a0..075673e 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -172,8 +172,12 @@ def start(self, camera_settings: list[CameraSettings]) -> None: def _start_camera(self, settings: CameraSettings) -> None: """Start a single camera.""" cam_id = get_camera_id(settings) + existing_thread = self._threads.get(cam_id) + if cam_id in self._workers and (existing_thread is None or not existing_thread.isRunning()): + LOGGER.warning(f"Stale camera worker/thread found for {cam_id}, cleaning up") + self._cleanup_camera(cam_id) if cam_id in self._workers: - LOGGER.warning(f"Camera {cam_id} already has a worker") + LOGGER.warning(f"Camera {cam_id} is already running, skipping start") return # Normalize and store the dataclass once @@ -227,12 +231,15 @@ def stop(self, wait: bool = True) -> None: still_running: list[str] = [] for cam_id, thread in list(self._threads.items()): if thread is None: + self._cleanup_camera(cam_id) continue if not thread.isRunning(): + self._cleanup_camera(cam_id) continue thread.quit() if thread.wait(QUIT_WAIT_MS): + self._cleanup_camera(cam_id) continue # Clean exit LOGGER.error( @@ -243,6 +250,7 @@ def stop(self, wait: bool = True) -> None: thread.terminate() if thread.wait(TERMINATE_WAIT_MS): + self._cleanup_camera(cam_id) continue # Terminated successfully LOGGER.critical( From 981e59b9de0714aff01674da96060003ca6140a2 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 15:04:33 +0100 Subject: [PATCH 16/27] Make stop join timeout configurable and add test Introduce STOP_JOIN_TIMEOUT (5s) and use it for the writer thread join timeout instead of a hardcoded value. Reorder lifecycle checks in VideoRecorder.start to raise if the recorder is marked _abandoned before short-circuiting on is_running, preventing accidental restarts of abandoned recorders. Add a BlockingWriteGear fake, fixture, and a test (test_stop_timeout_marks_abandoned_and_prevents_restart) that forces a stop timeout, verifies the recorder is marked abandoned, prevents restart, and performs cleanup of the blocked writer thread. --- dlclivegui/services/video_recorder.py | 8 +-- tests/services/test_video_recorder.py | 72 +++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index 2552a8a..74a33f9 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -23,6 +23,8 @@ logger = logging.getLogger(__name__) +STOP_JOIN_TIMEOUT = 5.0 # seconds + @dataclass class RecorderStats: @@ -88,10 +90,10 @@ def start(self) -> None: raise RuntimeError("vidgear is required for video recording. Install it with 'pip install vidgear'.") with self._lifecycle_lock: - if self.is_running: - return if self._abandoned: raise RuntimeError("Cannot restart VideoRecorder, as a leftover thread is still running.") + if self.is_running: + return if self._writer is not None: # Best-effort cleanup of a stale writer to avoid leaking resources. logger.warning( @@ -222,7 +224,7 @@ def stop(self) -> None: t = self._writer_thread if t is not None: - t.join(timeout=5.0) + t.join(timeout=STOP_JOIN_TIMEOUT) if t.is_alive(): with self._stats_lock: self._encode_error = RuntimeError( diff --git a/tests/services/test_video_recorder.py b/tests/services/test_video_recorder.py index 7665ae3..1ab0c10 100644 --- a/tests/services/test_video_recorder.py +++ b/tests/services/test_video_recorder.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +import threading import time from pathlib import Path @@ -75,6 +76,38 @@ def gray_frame(): return np.zeros((48, 64), dtype=np.uint8) +class BlockingWriteGear(FakeWriteGear): + """ + Fake WriteGear that blocks inside write() to simulate a hung encoder/IO stall. + """ + + instances = [] + + def __init__(self, output: str, **kwargs): + super().__init__(output, **kwargs) + self.entered_write = threading.Event() + self.release_write = threading.Event() + BlockingWriteGear.instances.append(self) + + def write(self, frame): + # Signal that the writer thread is now stuck inside write() + self.entered_write.set() + # Block until the test releases us (long timeout as safety, but test releases explicitly) + self.release_write.wait(timeout=60.0) + # If released, behave like normal FakeWriteGear + return super().write(frame) + + +@pytest.fixture +def patch_blocking_writegear(monkeypatch): + """ + Patch module-level WriteGear to BlockingWriteGear for the hung-thread test. + """ + BlockingWriteGear.instances.clear() + monkeypatch.setattr(vr_mod, "WriteGear", BlockingWriteGear) + return BlockingWriteGear + + # ---------------------------- # Tests # ---------------------------- @@ -295,3 +328,42 @@ def test_overlay_frame_size_mismatch_still_detected(patch_writegear, output_path rec.write(np.zeros((48, 64, 3), dtype=np.uint8), timestamp=2.0) rec.stop() + + +def test_stop_timeout_marks_abandoned_and_prevents_restart( + patch_blocking_writegear, monkeypatch, output_path, rgb_frame +): + # Make stop timeout tiny so the test is fast. + monkeypatch.setattr(vr_mod, "STOP_JOIN_TIMEOUT", 0.01) + + rec = vr_mod.VideoRecorder(output_path, frame_size=(48, 64), buffer_size=10) + rec.start() + assert rec.is_running is True + + # Enqueue one frame so writer thread enters BlockingWriteGear.write() and blocks. + assert rec.write(rgb_frame, timestamp=1.0) is True + + wg = patch_blocking_writegear.instances[0] + wait_until(lambda: wg.entered_write.is_set(), timeout=1.0) + + # Now stop: should time out and mark abandoned + rec.stop() + + assert rec._abandoned is True + err = rec._current_error() + assert err is not None + assert "Failed to stop VideoRecorder within timeout" in str(err) + + # Thread should still be alive (blocked in write) + assert rec.is_running is True + + # Restart should be prevented while abandoned + with pytest.raises(RuntimeError, match="Cannot restart VideoRecorder"): + rec.start() + + # ---- Cleanup to avoid leaking a blocked daemon thread into other tests ---- + wg.release_write.set() # let write() return + wait_until(lambda: not rec.is_running, timeout=2.0) + + # Call stop again to clear resources / reset flags (now it can join cleanly) + rec.stop() From 0056d6a7a410f695d5a73b194465f79b336b1c00 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 15:42:42 +0100 Subject: [PATCH 17/27] Fix locking, camera check, and writer finalization Minimize time holding the DLCLiveProcessor lifecycle lock by moving expensive operations (frame.copy and timestamp handling) outside the lock and re-checking state before starting the worker to avoid races. Correct a duplicated condition in MultiCameraController so normal shutdown is detected using self._running. Move writer cleanup into a dedicated _finalize_writer(), add a warning when Queue.task_done() is called too many times, and centralize writer close/exception handling. --- dlclivegui/services/dlc_processor.py | 28 +++++++++++++------ .../services/multi_camera_controller.py | 2 +- dlclivegui/services/video_recorder.py | 8 ++---- 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index e8730d8..05fc2be 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -14,8 +14,6 @@ from typing import Any import numpy as np - -# from dlclive import DLCLive from PySide6.QtCore import QObject, Signal from dlclivegui.config import DLCProcessorSettings, ModelType @@ -221,16 +219,30 @@ def shutdown(self) -> None: self._initialized = False def enqueue_frame(self, frame: np.ndarray, timestamp: float) -> None: + # Keep lifecycle lock held only for quick state checks and snapshots. with self._lifecycle_lock: if self._state in (WorkerState.STOPPING, WorkerState.FAULTED) or self._stop_event.is_set(): return - frame_c = frame.copy() - enq_time = time.perf_counter() t = self._worker_thread - if t is None or not t.is_alive(): - self._start_worker_locked(frame_c, timestamp) - return - q = self._queue # snapshot under lock + q = self._queue + should_start = t is None or not t.is_alive() + + frame_c = frame.copy() + enq_time = time.perf_counter() + + if should_start: + # Re-acquire the lifecycle lock to safely (re)start the worker if needed. + with self._lifecycle_lock: + # Re-check state in case it changed while we were copying the frame. + if self._state in (WorkerState.STOPPING, WorkerState.FAULTED) or self._stop_event.is_set(): + return + t = self._worker_thread + if t is None or not t.is_alive(): + # _start_worker_locked expects the lifecycle lock to be held. + self._start_worker_locked(frame_c, timestamp) + return + # Worker is now running; refresh queue snapshot. + q = self._queue if q is None: return diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index 075673e..e72d618 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -514,7 +514,7 @@ def _on_camera_stopped(self, camera_id: str) -> None: # Check if all running cameras have stopped (normal shutdown) if ( not self._started_cameras - and not self._started_cameras + and self._running and all(not t.isRunning() for t in self._threads.values() if t is not None) ): self._running = False diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index 74a33f9..f346975 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -340,15 +340,11 @@ def _writer_loop(self) -> None: try: q.task_done() except ValueError: + logger.warning("Queue task_done() called too many times in writer loop") pass finally: - writer = self._writer - if writer is not None: - try: - writer.close() - except Exception: - logger.exception("Failed to close WriteGear during writer loop finalization") + self._finalize_writer() def _finalize_writer(self) -> None: writer = self._writer From b5359a3acb50689c53b3de5aaf56c6c4a53a26de Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 16:17:49 +0100 Subject: [PATCH 18/27] Add background reaper for stalled DLC worker Introduce STOP_WORKER_TIMEOUT and a background reaper to handle worker threads that don't terminate within the stop timeout. Increase join timeout to STOP_WORKER_TIMEOUT, set _pending_reset when a reset is requested while the worker is alive, and schedule _schedule_reap to join the thread in the background instead of immediately marking the processor as FAULTED. The reaper performs final cleanup (clearing thread, queue, state, stop event) and applies a pending DLCLive reset once the thread is reaped, with guards to ensure only one reaper runs at a time. --- dlclivegui/services/dlc_processor.py | 41 ++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index 05fc2be..e5481d0 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -21,6 +21,7 @@ from dlclivegui.temp import Engine # type: ignore # TODO use main package enum when released logger = logging.getLogger(__name__) +STOP_WORKER_TIMEOUT = 10.0 # seconds to wait for worker thread to stop before marking as faulted try: # pragma: no cover - optional dependency from dlclive import ( @@ -160,6 +161,9 @@ def __init__(self) -> None: self._lifecycle_lock = threading.Lock() self._stop_event = threading.Event() self._initialized = False + ## Worker cleanup + self._reaping = False + self._pending_reset = False # Statistics tracking self._frames_enqueued = 0 @@ -189,6 +193,8 @@ def reset(self) -> None: """Stop the worker thread and drop the current DLCLive instance.""" stopped = self._stop_worker() if not stopped: + with self._lifecycle_lock: + self._pending_reset = True logger.warning( "Reset requested but worker thread is still alive; skipping DLCLive reset to avoid potential issues." ) @@ -335,14 +341,14 @@ def _stop_worker(self) -> bool: self._state = WorkerState.STOPPING self._stop_event.set() - t.join(timeout=2.0) + t.join(timeout=STOP_WORKER_TIMEOUT) if t.is_alive(): qsize = self._queue.qsize() if self._queue is not None else -1 logger.warning("DLC worker thread did not terminate cleanly (qsize=%s)", qsize) - with self._lifecycle_lock: - self._state = WorkerState.FAULTED + self._schedule_reap(t) return False + # Normal cleanup with self._lifecycle_lock: self._worker_thread = None self._queue = None @@ -350,6 +356,35 @@ def _stop_worker(self) -> bool: self._stop_event.clear() return True + def _schedule_reap(self, t: threading.Thread) -> None: + with self._lifecycle_lock: + if self._reaping: + return + self._reaping = True + + # ensure only one reaper + def reap(): + try: + t.join() # wait without timeout in background + with self._lifecycle_lock: + # only clean if we're still stopping this thread + if self._worker_thread is t: + self._worker_thread = None + self._queue = None + self._state = WorkerState.STOPPED + self._stop_event.clear() + + if self._pending_reset: + self._dlc = None + self._initialized = False + self._pending_reset = False + finally: + with self._lifecycle_lock: + self._reaping = False + logger.debug("[Stop worker] DLC worker thread reaped; processor is STOPPED again") + + threading.Thread(target=reap, name="DLCLiveReaper", daemon=True).start() + @contextmanager def _timed_processor(self): """ From d834a7bd4a1e5b8df8fe1092b781c39e43714ef4 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 16:23:10 +0100 Subject: [PATCH 19/27] Cache _queue to local variable in VideoRecorder Cache self._queue to a local variable (q) and use it for subsequent operations to avoid races and repeated attribute access. Remove the redundant assert and simplify qsize lookup by relying on the cached reference. This makes queue checks and put/qsize calls consistent in a multithreaded context and prevents potential None/attribute changes between checks and use. --- dlclivegui/services/video_recorder.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index f346975..20f7453 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -149,7 +149,9 @@ def write(self, frame: np.ndarray, timestamp: float | None = None) -> bool: error = self._current_error() if error is not None: raise RuntimeError(f"Video encoding failed: {error}") from error - if not self.is_running or self._queue is None: + + q = self._queue + if not self.is_running or q is None: return False if self._stop_event.is_set(): return False @@ -192,12 +194,11 @@ def write(self, frame: np.ndarray, timestamp: float | None = None) -> bool: return False try: - assert self._queue is not None - self._queue.put((frame, timestamp), block=False) + q.put((frame, timestamp), block=False) except queue.Full: with self._stats_lock: self._dropped_frames += 1 - queue_size = self._queue.qsize() if self._queue is not None else -1 + queue_size = q.qsize() logger.warning( "Video recorder queue full; dropping frame. queue=%d buffer=%d", queue_size, From 6570904ef568a4ff7559f65ef6b04fc577b67125 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 16:25:39 +0100 Subject: [PATCH 20/27] Prevent configure while processor running Add a lifecycle lock and guard in DLCLiveProcessor.configure to ensure the processor is stopped before reconfiguration. The method now acquires self._lifecycle_lock and raises a RuntimeError if the processor state is not WorkerState.STOPPED, preventing concurrent/race-condition updates to settings or processor while running. --- dlclivegui/services/dlc_processor.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index e5481d0..93beabc 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -186,6 +186,9 @@ def get_model_backend(model_path: str) -> Engine: return Engine.from_model_path(model_path) def configure(self, settings: DLCProcessorSettings, processor: Any | None = None) -> None: + with self._lifecycle_lock: + if self._state != WorkerState.STOPPED: + raise RuntimeError("Cannot configure DLCLiveProcessor while it is running. Please stop it first.") self._settings = settings self._processor = processor From 1c5b5524979c50ee90d1e1f22f3e5a1e4a6317f8 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 16:35:18 +0100 Subject: [PATCH 21/27] Cleanup abandoned recorder and set pending reset Ensure proper cleanup and restartability when writer threads time out: VideoRecorder.stop now clears abandoned state and associated resources if the writer thread has already exited, allowing the recorder to be fully stopped and restarted. DLCLiveProcessor: moved settings/processor assignment under the lifecycle lock, and mark _pending_reset when shutdown cannot stop the worker thread so the instance can be reaped later. Minor comment tweak for STOP_WORKER_TIMEOUT semantics. Tests updated to assert abandoned is cleared and that recorder can be restarted after the writer exits. --- dlclivegui/services/dlc_processor.py | 8 +++++--- dlclivegui/services/video_recorder.py | 11 ++++++++++- tests/services/test_video_recorder.py | 5 +++++ 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index 93beabc..bf11b9f 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -21,7 +21,7 @@ from dlclivegui.temp import Engine # type: ignore # TODO use main package enum when released logger = logging.getLogger(__name__) -STOP_WORKER_TIMEOUT = 10.0 # seconds to wait for worker thread to stop before marking as faulted +STOP_WORKER_TIMEOUT = 10.0 # seconds to wait for worker thread to stop before STOPPING state and reaping try: # pragma: no cover - optional dependency from dlclive import ( @@ -189,8 +189,8 @@ def configure(self, settings: DLCProcessorSettings, processor: Any | None = None with self._lifecycle_lock: if self._state != WorkerState.STOPPED: raise RuntimeError("Cannot configure DLCLiveProcessor while it is running. Please stop it first.") - self._settings = settings - self._processor = processor + self._settings = settings + self._processor = processor def reset(self) -> None: """Stop the worker thread and drop the current DLCLive instance.""" @@ -220,6 +220,8 @@ def reset(self) -> None: def shutdown(self) -> None: stopped = self._stop_worker() if not stopped: + with self._lifecycle_lock: + self._pending_reset = True logger.warning( "Shutdown requested but worker thread is still alive; DLCLive instance may not be fully released." ) diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index 20f7453..c3ba68e 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -212,7 +212,16 @@ def write(self, frame: np.ndarray, timestamp: float | None = None) -> bool: def stop(self) -> None: with self._lifecycle_lock: if self._writer is None and not self.is_running: - return + # If the recorder was previously marked as abandoned because the + # writer thread did not stop in time, but the thread has since + # exited, perform cleanup so the recorder can become fully stopped + # and restartable. + t = self._writer_thread + if self._abandoned and (t is None or not t.is_alive()): + self._writer_thread = None + self._queue = None + self._stop_event.clear() + self._abandoned = False self._stop_event.set() diff --git a/tests/services/test_video_recorder.py b/tests/services/test_video_recorder.py index 1ab0c10..28bb856 100644 --- a/tests/services/test_video_recorder.py +++ b/tests/services/test_video_recorder.py @@ -367,3 +367,8 @@ def test_stop_timeout_marks_abandoned_and_prevents_restart( # Call stop again to clear resources / reset flags (now it can join cleanly) rec.stop() + # After the writer thread has exited, a second stop() should fully reset state + # so that the recorder is no longer marked as abandoned and can be restarted. + assert rec._abandoned is False + rec.start() + rec.stop() From ce97ad109da5076be86150b8e5c93838db1f3173 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 17:01:28 +0100 Subject: [PATCH 22/27] Refactor VideoRecorder.stop lifecycle handling Detect and early-return when recorder is already stopped or recovered from an abandoned state, and reduce the time holding the lifecycle lock by moving queue/thread/writer shutdown work outside the lock. Ensure sentinel is enqueued, writer thread is joined with timeout (marking recorder as abandoned and recording an encode_error if it remains alive), close the writer cleanly, save timestamps, and finally clear lifecycle fields under the lock. Improves concurrency, cleanup correctness, and prevents restarting a recorder that was marked abandoned. --- dlclivegui/services/video_recorder.py | 57 +++++++++++++++------------ 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index c3ba68e..b31c8a8 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -211,7 +211,8 @@ def write(self, frame: np.ndarray, timestamp: float | None = None) -> bool: def stop(self) -> None: with self._lifecycle_lock: - if self._writer is None and not self.is_running: + already_stopped = (self._writer is None) and (not self.is_running) + if already_stopped: # If the recorder was previously marked as abandoned because the # writer thread did not stop in time, but the thread has since # exited, perform cleanup so the recorder can become fully stopped @@ -222,39 +223,43 @@ def stop(self) -> None: self._queue = None self._stop_event.clear() self._abandoned = False + return self._stop_event.set() - q = self._queue - if q is not None: - try: - q.put_nowait(_SENTINEL) - except queue.Full: - pass - t = self._writer_thread - if t is not None: - t.join(timeout=STOP_JOIN_TIMEOUT) - if t.is_alive(): - with self._stats_lock: - self._encode_error = RuntimeError( - "Failed to stop VideoRecorder within timeout; thread is still alive." - ) + writer = self._writer + + if q is not None: + try: + q.put_nowait(_SENTINEL) + except queue.Full: + pass + + if t is not None: + t.join(timeout=STOP_JOIN_TIMEOUT) + if t.is_alive(): + with self._stats_lock: + self._encode_error = RuntimeError( + "Failed to stop VideoRecorder within timeout; thread is still alive." + ) + with self._lifecycle_lock: self._abandoned = True - logger.critical( - "Failed to stop VideoRecorder within timeout; thread is still alive. " - "Marking recorder as abandoned to prevent restart." - ) - return + logger.critical( + "Failed to stop VideoRecorder within timeout; thread is still alive. " + "Marking recorder as abandoned to prevent restart." + ) + return - if self._writer is not None: - try: - self._writer.close() - except Exception: - logger.exception("Failed to close WriteGear cleanly") + if writer is not None: + try: + writer.close() + except Exception: + logger.exception("Failed to close WriteGear cleanly") - self._save_timestamps() + self._save_timestamps() + with self._lifecycle_lock: self._writer = None self._writer_thread = None self._queue = None From 9e58bf3d57370cf68ce01902eead6805b9c34a37 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 1 Jun 2026 10:56:11 +0200 Subject: [PATCH 23/27] Disable profiling for now --- dlclivegui/services/dlc_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index bf11b9f..7bb1ab4 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -33,7 +33,7 @@ # Enable profiling to get more detailed timing metrics for debugging and optimization. -ENABLE_PROFILING = True +ENABLE_PROFILING = False class PoseBackends(Enum): From dc579dfad87b82ef08e31bc5e59e1e3a204317a2 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 1 Jun 2026 16:07:54 +0200 Subject: [PATCH 24/27] Log late DLC cleanup; save recorder timestamps Add a warning when the DLC worker thread is cleaned up after a timeout to make late reaping visible in logs. When VideoRecorder fails to stop within the timeout, ensure the recorder is marked abandoned under the lifecycle lock and save timestamps before returning; expand the critical log to note that timestamps were saved (but may be incomplete). These changes improve observability and help preserve timing data for debugging. --- dlclivegui/services/dlc_processor.py | 4 ++++ dlclivegui/services/video_recorder.py | 19 ++++++++++++------- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index 7bb1ab4..f0bd8c1 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -383,9 +383,13 @@ def reap(): self._dlc = None self._initialized = False self._pending_reset = False + + logger.warning("DLC worker thread stopped after the timeout and was cleaned up late.") + finally: with self._lifecycle_lock: self._reaping = False + logger.debug("[Stop worker] DLC worker thread reaped; processor is STOPPED again") threading.Thread(target=reap, name="DLCLiveReaper", daemon=True).start() diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index b31c8a8..4ceec10 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -243,13 +243,18 @@ def stop(self) -> None: self._encode_error = RuntimeError( "Failed to stop VideoRecorder within timeout; thread is still alive." ) - with self._lifecycle_lock: - self._abandoned = True - logger.critical( - "Failed to stop VideoRecorder within timeout; thread is still alive. " - "Marking recorder as abandoned to prevent restart." - ) - return + + with self._lifecycle_lock: + self._abandoned = True + + self._save_timestamps() + + logger.critical( + "Failed to stop VideoRecorder within timeout; thread is still alive. " + "Marking recorder as abandoned to prevent restart. " + "Timestamps were saved, but may be incomplete." + ) + return if writer is not None: try: From a8b1a6a7f718bdb4320acbdc16150509fdf71523 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 1 Jun 2026 16:10:40 +0200 Subject: [PATCH 25/27] Handle force-stop with leftover camera threads When a forced stop finds camera threads still running, also clear failed camera state, buffered frames and timestamps (under the frame lock), and reset expected cameras to 0. Add a critical log warning that background camera threads may still be active and emit all_stopped so the GUI can leave the running state; warn that an application restart may be required. This prevents stale state from blocking future starts while making the risk visible in logs. --- dlclivegui/services/multi_camera_controller.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index e72d618..e158142 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -264,6 +264,20 @@ def stop(self, wait: bool = True) -> None: if still_running: self._started_cameras.clear() + self._failed_cameras.clear() + with self._frame_lock: + self._frames.clear() + self._timestamps.clear() + self._expected_cameras = 0 + + LOGGER.critical( + "Force stop failed for camera threads: %s." + "Sending all_stopped to leave GUI in running state," + "but background camera threads may still be active and interfere with future camera starts." + "Application restart may be required.", + still_running, + ) + self.all_stopped.emit() return self._started_cameras.clear() From 9c0443d9a07683a8b96ec84000197d75870f9287 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 1 Jun 2026 16:14:55 +0200 Subject: [PATCH 26/27] Remove direct writer.close() call Stop capturing and closing the WriteGear writer from the shutdown path. The change removes the local `writer` assignment and the try/except block that called `writer.close()`, avoiding potential double-close/race-condition exceptions during shutdown. The recorder closure is handled by the finally block calling _finalize_writer --- dlclivegui/services/video_recorder.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index 4ceec10..e2ae15c 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -228,7 +228,6 @@ def stop(self) -> None: self._stop_event.set() q = self._queue t = self._writer_thread - writer = self._writer if q is not None: try: @@ -256,12 +255,6 @@ def stop(self) -> None: ) return - if writer is not None: - try: - writer.close() - except Exception: - logger.exception("Failed to close WriteGear cleanly") - self._save_timestamps() with self._lifecycle_lock: From d3dc4ced42a2f9ee84e4fb5c5254dd9c231cf399 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 1 Jun 2026 16:23:30 +0200 Subject: [PATCH 27/27] Clarify timeout comment and camera log message Tweak wording for clarity in two places: update STOP_WORKER_TIMEOUT comment to better describe waiting in the STOPPING state before scheduling background reaping, and reflow the MultiCameraController critical log message into a single, spaced string to avoid awkward concatenation. These are documentation/logging refinements only and do not change runtime behavior. Files modified: - dlclivegui/services/dlc_processor.py: adjust timeout comment - dlclivegui/services/multi_camera_controller.py: reformat and clarify critical log message --- dlclivegui/services/dlc_processor.py | 2 +- dlclivegui/services/multi_camera_controller.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index f0bd8c1..b4476e1 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -21,7 +21,7 @@ from dlclivegui.temp import Engine # type: ignore # TODO use main package enum when released logger = logging.getLogger(__name__) -STOP_WORKER_TIMEOUT = 10.0 # seconds to wait for worker thread to stop before STOPPING state and reaping +STOP_WORKER_TIMEOUT = 10.0 # # seconds to wait in STOPPING state before scheduling background reaping try: # pragma: no cover - optional dependency from dlclive import ( diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index e158142..18b2883 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -271,9 +271,9 @@ def stop(self, wait: bool = True) -> None: self._expected_cameras = 0 LOGGER.critical( - "Force stop failed for camera threads: %s." - "Sending all_stopped to leave GUI in running state," - "but background camera threads may still be active and interfere with future camera starts." + "Force stop failed for camera threads: %s. " + "Sending all_stopped to leave GUI in running state, " + "but background camera threads may still be active and interfere with future camera starts. " "Application restart may be required.", still_running, )