Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 118 additions & 81 deletions backend/app/services/dingtalk_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,98 +67,135 @@ def _run_client_thread(
app_secret: str,
stop_event: threading.Event,
):
"""Run the DingTalk Stream client in a blocking thread."""
"""Run the DingTalk Stream client with auto-reconnect."""
try:
import dingtalk_stream
except ImportError:
logger.warning(
"[DingTalk Stream] dingtalk-stream package not installed. "
"Install with: pip install dingtalk-stream"
)
self._threads.pop(agent_id, None)
self._stop_events.pop(agent_id, None)
return

# Reference to manager's main loop for async dispatch
main_loop = self._main_loop

class ClawithChatbotHandler(dingtalk_stream.ChatbotHandler):
"""Custom handler that dispatches messages to the Clawith LLM pipeline."""

async def process(self, callback: dingtalk_stream.CallbackMessage):
"""Handle incoming bot message from DingTalk Stream.
MAX_RETRIES = 5
RETRY_DELAYS = [2, 5, 15, 30, 60] # exponential backoff, seconds

# Reference to manager's main loop for async dispatch
main_loop = self._main_loop
retries = 0

class ClawithChatbotHandler(dingtalk_stream.ChatbotHandler):
"""Custom handler that dispatches messages to the Clawith LLM pipeline."""

async def process(self, callback: dingtalk_stream.CallbackMessage):
"""Handle incoming bot message from DingTalk Stream.

NOTE: The SDK invokes this method in the thread's own asyncio loop,
so we must dispatch to the main FastAPI loop for DB + LLM work.
"""
try:
# Parse the raw data into a ChatbotMessage via class method
incoming = dingtalk_stream.ChatbotMessage.from_dict(callback.data)

# Extract text content
text_list = incoming.get_text_list()
user_text = " ".join(text_list).strip() if text_list else ""

if not user_text:
return dingtalk_stream.AckMessage.STATUS_OK, "empty message"

sender_staff_id = incoming.sender_staff_id or incoming.sender_id or ""
conversation_id = incoming.conversation_id or ""
conversation_type = incoming.conversation_type or "1"
session_webhook = incoming.session_webhook or ""

logger.info(
f"[DingTalk Stream] Message from [{incoming.sender_nick}]{sender_staff_id}: {user_text[:80]}"
)

# Dispatch to the main FastAPI event loop for DB + LLM processing
from app.api.dingtalk import process_dingtalk_message

if main_loop and main_loop.is_running():
future = asyncio.run_coroutine_threadsafe(
process_dingtalk_message(
agent_id=agent_id,
sender_staff_id=sender_staff_id,
user_text=user_text,
conversation_id=conversation_id,
conversation_type=conversation_type,
session_webhook=session_webhook,
),
main_loop,
)
# Wait for result (with timeout)
try:
future.result(timeout=120)
except Exception as e:
logger.error(f"[DingTalk Stream] LLM processing error: {e}")
import traceback
traceback.print_exc()
else:
logger.warning("[DingTalk Stream] Main loop not available for dispatch")

return dingtalk_stream.AckMessage.STATUS_OK, "ok"
except Exception as e:
logger.error(f"[DingTalk Stream] Error in message handler: {e}")
import traceback
traceback.print_exc()
return dingtalk_stream.AckMessage.STATUS_SYSTEM_EXCEPTION, str(e)

while not stop_event.is_set() and retries <= MAX_RETRIES:
try:
credential = dingtalk_stream.Credential(client_id=app_key, client_secret=app_secret)
client = dingtalk_stream.DingTalkStreamClient(credential=credential)
client.register_callback_handler(
dingtalk_stream.chatbot.ChatbotMessage.TOPIC,
ClawithChatbotHandler(),
)

NOTE: The SDK invokes this method in the thread's own asyncio loop,
so we must dispatch to the main FastAPI loop for DB + LLM work.
"""
try:
# Parse the raw data into a ChatbotMessage via class method
incoming = dingtalk_stream.ChatbotMessage.from_dict(callback.data)
logger.info(
f"[DingTalk Stream] Connecting for agent {agent_id}... "
f"(attempt {retries + 1}/{MAX_RETRIES + 1})"
)
# start_forever() blocks until disconnected
client.start_forever()

# Extract text content
text_list = incoming.get_text_list()
user_text = " ".join(text_list).strip() if text_list else ""
# start_forever returned — connection dropped
if stop_event.is_set():
break # intentional stop, no retry

if not user_text:
return dingtalk_stream.AckMessage.STATUS_OK, "empty message"
retries += 1
logger.warning(
f"[DingTalk Stream] Connection lost for agent {agent_id}, will retry..."
)

sender_staff_id = incoming.sender_staff_id or incoming.sender_id or ""
conversation_id = incoming.conversation_id or ""
conversation_type = incoming.conversation_type or "1"
session_webhook = incoming.session_webhook or ""
except Exception as e:
retries += 1
logger.error(
f"[DingTalk Stream] Connection error for {agent_id} "
f"(attempt {retries}/{MAX_RETRIES + 1}): {e}"
)

logger.info(
f"[DingTalk Stream] Message from [{incoming.sender_nick}]{sender_staff_id}: {user_text[:80]}"
)
if retries > MAX_RETRIES:
logger.error(
f"[DingTalk Stream] Agent {agent_id} exhausted all {MAX_RETRIES} retries, giving up"
)
break

# Dispatch to the main FastAPI event loop for DB + LLM processing
from app.api.dingtalk import process_dingtalk_message

if main_loop and main_loop.is_running():
future = asyncio.run_coroutine_threadsafe(
process_dingtalk_message(
agent_id=agent_id,
sender_staff_id=sender_staff_id,
user_text=user_text,
conversation_id=conversation_id,
conversation_type=conversation_type,
session_webhook=session_webhook,
),
main_loop,
)
# Wait for result (with timeout)
try:
future.result(timeout=120)
except Exception as e:
logger.error(f"[DingTalk Stream] LLM processing error: {e}")
import traceback
traceback.print_exc()
else:
logger.warning("[DingTalk Stream] Main loop not available for dispatch")

return dingtalk_stream.AckMessage.STATUS_OK, "ok"
except Exception as e:
logger.error(f"[DingTalk Stream] Error in message handler: {e}")
import traceback
traceback.print_exc()
return dingtalk_stream.AckMessage.STATUS_SYSTEM_EXCEPTION, str(e)

credential = dingtalk_stream.Credential(client_id=app_key, client_secret=app_secret)
client = dingtalk_stream.DingTalkStreamClient(credential=credential)
client.register_callback_handler(
dingtalk_stream.chatbot.ChatbotMessage.TOPIC,
ClawithChatbotHandler(),
delay = RETRY_DELAYS[min(retries - 1, len(RETRY_DELAYS) - 1)]
logger.info(
f"[DingTalk Stream] Retrying in {delay}s for agent {agent_id}..."
)
# Use stop_event.wait so we exit immediately if stopped
if stop_event.wait(timeout=delay):
break # stop was requested during wait

logger.info(f"[DingTalk Stream] Connecting for agent {agent_id}...")
# start_forever() blocks until disconnected
client.start_forever()

except ImportError:
logger.warning(
"[DingTalk Stream] dingtalk-stream package not installed. "
"Install with: pip install dingtalk-stream"
)
except Exception as e:
logger.error(f"[DingTalk Stream] Client error for {agent_id}: {e}")
import traceback
traceback.print_exc()
finally:
self._threads.pop(agent_id, None)
self._stop_events.pop(agent_id, None)
logger.info(f"[DingTalk Stream] Client stopped for agent {agent_id}")
self._threads.pop(agent_id, None)
self._stop_events.pop(agent_id, None)
logger.info(f"[DingTalk Stream] Client stopped for agent {agent_id}")

async def stop_client(self, agent_id: uuid.UUID):
"""Stop a running Stream client for an agent."""
Expand Down