diff --git a/backend/app/services/dingtalk_stream.py b/backend/app/services/dingtalk_stream.py index 28a8ba8e2..3f9a0f14c 100644 --- a/backend/app/services/dingtalk_stream.py +++ b/backend/app/services/dingtalk_stream.py @@ -67,98 +67,135 @@ def _run_client_thread( app_secret: str, stop_event: threading.Event, ): - """Run the DingTalk Stream client in a blocking thread.""" + """Run the DingTalk Stream client with auto-reconnect.""" try: import dingtalk_stream + except ImportError: + logger.warning( + "[DingTalk Stream] dingtalk-stream package not installed. " + "Install with: pip install dingtalk-stream" + ) + self._threads.pop(agent_id, None) + self._stop_events.pop(agent_id, None) + return - # Reference to manager's main loop for async dispatch - main_loop = self._main_loop - - class ClawithChatbotHandler(dingtalk_stream.ChatbotHandler): - """Custom handler that dispatches messages to the Clawith LLM pipeline.""" - - async def process(self, callback: dingtalk_stream.CallbackMessage): - """Handle incoming bot message from DingTalk Stream. + MAX_RETRIES = 5 + RETRY_DELAYS = [2, 5, 15, 30, 60] # exponential backoff, seconds + + # Reference to manager's main loop for async dispatch + main_loop = self._main_loop + retries = 0 + + class ClawithChatbotHandler(dingtalk_stream.ChatbotHandler): + """Custom handler that dispatches messages to the Clawith LLM pipeline.""" + + async def process(self, callback: dingtalk_stream.CallbackMessage): + """Handle incoming bot message from DingTalk Stream. + + NOTE: The SDK invokes this method in the thread's own asyncio loop, + so we must dispatch to the main FastAPI loop for DB + LLM work. + """ + try: + # Parse the raw data into a ChatbotMessage via class method + incoming = dingtalk_stream.ChatbotMessage.from_dict(callback.data) + + # Extract text content + text_list = incoming.get_text_list() + user_text = " ".join(text_list).strip() if text_list else "" + + if not user_text: + return dingtalk_stream.AckMessage.STATUS_OK, "empty message" + + sender_staff_id = incoming.sender_staff_id or incoming.sender_id or "" + conversation_id = incoming.conversation_id or "" + conversation_type = incoming.conversation_type or "1" + session_webhook = incoming.session_webhook or "" + + logger.info( + f"[DingTalk Stream] Message from [{incoming.sender_nick}]{sender_staff_id}: {user_text[:80]}" + ) + + # Dispatch to the main FastAPI event loop for DB + LLM processing + from app.api.dingtalk import process_dingtalk_message + + if main_loop and main_loop.is_running(): + future = asyncio.run_coroutine_threadsafe( + process_dingtalk_message( + agent_id=agent_id, + sender_staff_id=sender_staff_id, + user_text=user_text, + conversation_id=conversation_id, + conversation_type=conversation_type, + session_webhook=session_webhook, + ), + main_loop, + ) + # Wait for result (with timeout) + try: + future.result(timeout=120) + except Exception as e: + logger.error(f"[DingTalk Stream] LLM processing error: {e}") + import traceback + traceback.print_exc() + else: + logger.warning("[DingTalk Stream] Main loop not available for dispatch") + + return dingtalk_stream.AckMessage.STATUS_OK, "ok" + except Exception as e: + logger.error(f"[DingTalk Stream] Error in message handler: {e}") + import traceback + traceback.print_exc() + return dingtalk_stream.AckMessage.STATUS_SYSTEM_EXCEPTION, str(e) + + while not stop_event.is_set() and retries <= MAX_RETRIES: + try: + credential = dingtalk_stream.Credential(client_id=app_key, client_secret=app_secret) + client = dingtalk_stream.DingTalkStreamClient(credential=credential) + client.register_callback_handler( + dingtalk_stream.chatbot.ChatbotMessage.TOPIC, + ClawithChatbotHandler(), + ) - NOTE: The SDK invokes this method in the thread's own asyncio loop, - so we must dispatch to the main FastAPI loop for DB + LLM work. - """ - try: - # Parse the raw data into a ChatbotMessage via class method - incoming = dingtalk_stream.ChatbotMessage.from_dict(callback.data) + logger.info( + f"[DingTalk Stream] Connecting for agent {agent_id}... " + f"(attempt {retries + 1}/{MAX_RETRIES + 1})" + ) + # start_forever() blocks until disconnected + client.start_forever() - # Extract text content - text_list = incoming.get_text_list() - user_text = " ".join(text_list).strip() if text_list else "" + # start_forever returned — connection dropped + if stop_event.is_set(): + break # intentional stop, no retry - if not user_text: - return dingtalk_stream.AckMessage.STATUS_OK, "empty message" + retries += 1 + logger.warning( + f"[DingTalk Stream] Connection lost for agent {agent_id}, will retry..." + ) - sender_staff_id = incoming.sender_staff_id or incoming.sender_id or "" - conversation_id = incoming.conversation_id or "" - conversation_type = incoming.conversation_type or "1" - session_webhook = incoming.session_webhook or "" + except Exception as e: + retries += 1 + logger.error( + f"[DingTalk Stream] Connection error for {agent_id} " + f"(attempt {retries}/{MAX_RETRIES + 1}): {e}" + ) - logger.info( - f"[DingTalk Stream] Message from [{incoming.sender_nick}]{sender_staff_id}: {user_text[:80]}" - ) + if retries > MAX_RETRIES: + logger.error( + f"[DingTalk Stream] Agent {agent_id} exhausted all {MAX_RETRIES} retries, giving up" + ) + break - # Dispatch to the main FastAPI event loop for DB + LLM processing - from app.api.dingtalk import process_dingtalk_message - - if main_loop and main_loop.is_running(): - future = asyncio.run_coroutine_threadsafe( - process_dingtalk_message( - agent_id=agent_id, - sender_staff_id=sender_staff_id, - user_text=user_text, - conversation_id=conversation_id, - conversation_type=conversation_type, - session_webhook=session_webhook, - ), - main_loop, - ) - # Wait for result (with timeout) - try: - future.result(timeout=120) - except Exception as e: - logger.error(f"[DingTalk Stream] LLM processing error: {e}") - import traceback - traceback.print_exc() - else: - logger.warning("[DingTalk Stream] Main loop not available for dispatch") - - return dingtalk_stream.AckMessage.STATUS_OK, "ok" - except Exception as e: - logger.error(f"[DingTalk Stream] Error in message handler: {e}") - import traceback - traceback.print_exc() - return dingtalk_stream.AckMessage.STATUS_SYSTEM_EXCEPTION, str(e) - - credential = dingtalk_stream.Credential(client_id=app_key, client_secret=app_secret) - client = dingtalk_stream.DingTalkStreamClient(credential=credential) - client.register_callback_handler( - dingtalk_stream.chatbot.ChatbotMessage.TOPIC, - ClawithChatbotHandler(), + delay = RETRY_DELAYS[min(retries - 1, len(RETRY_DELAYS) - 1)] + logger.info( + f"[DingTalk Stream] Retrying in {delay}s for agent {agent_id}..." ) + # Use stop_event.wait so we exit immediately if stopped + if stop_event.wait(timeout=delay): + break # stop was requested during wait - logger.info(f"[DingTalk Stream] Connecting for agent {agent_id}...") - # start_forever() blocks until disconnected - client.start_forever() - - except ImportError: - logger.warning( - "[DingTalk Stream] dingtalk-stream package not installed. " - "Install with: pip install dingtalk-stream" - ) - except Exception as e: - logger.error(f"[DingTalk Stream] Client error for {agent_id}: {e}") - import traceback - traceback.print_exc() - finally: - self._threads.pop(agent_id, None) - self._stop_events.pop(agent_id, None) - logger.info(f"[DingTalk Stream] Client stopped for agent {agent_id}") + self._threads.pop(agent_id, None) + self._stop_events.pop(agent_id, None) + logger.info(f"[DingTalk Stream] Client stopped for agent {agent_id}") async def stop_client(self, agent_id: uuid.UUID): """Stop a running Stream client for an agent."""