From 6d32cf5c7adf6a5bd182933c9a53741ae40be4bc Mon Sep 17 00:00:00 2001 From: piexian <64474352+piexian@users.noreply.github.com> Date: Sun, 28 Jun 2026 15:45:00 +0800 Subject: [PATCH 1/2] fix(qqofficial): resilient websocket reconnect and send retry - ManagedBotWebSocket: heartbeat (op10/7/11), reconnect backoff, rate-limit delay, session-invalid close codes (4006/4007/4009) - retry startup when gateway metadata unavailable - drop expired reply msg_id and retry proactive send - add tests --- .../qqofficial/qqofficial_message_event.py | 1 + .../qqofficial/qqofficial_platform_adapter.py | 305 +++++++++++++++++- tests/test_qqofficial_group_message_create.py | 193 ++++++++++- 3 files changed, 478 insertions(+), 21 deletions(-) diff --git a/astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py b/astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py index 68a93b09b9..b35aae0b16 100644 --- a/astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py +++ b/astrbot/core/platform/sources/qqofficial/qqofficial_message_event.py @@ -494,6 +494,7 @@ async def _send_with_markdown_fallback( logger.info("[QQOfficial] 回复消息失败: %s, 尝试使用主动发送接口。", err) if payload.get("msg_id"): fallback_payload = payload.copy() + fallback_payload.pop("msg_id", None) try: ret = await send_func(fallback_payload) logger.info("[QQOfficial] 使用主动发送接口发送成功。") diff --git a/astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py b/astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py index fc96d44040..171c743e58 100644 --- a/astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py +++ b/astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import json import logging import os import random @@ -12,8 +13,9 @@ import botpy import botpy.message from botpy import Client -from botpy.connection import ConnectionState +from botpy.connection import ConnectionSession, ConnectionState from botpy.gateway import BotWebSocket +from botpy.robot import Robot, Token from astrbot import logger from astrbot.api.event import MessageChain @@ -36,6 +38,16 @@ for handler in logging.root.handlers[:]: logging.root.removeHandler(handler) +_RECONNECT_DELAYS_SECONDS = (1, 2, 5, 10, 30, 60) +_RATE_LIMIT_RECONNECT_DELAY_SECONDS = 60 +_MAX_QUICK_DISCONNECTS = 3 +_QUICK_DISCONNECT_THRESHOLD_SECONDS = 5 +_SESSION_INVALID_CLOSE_CODES = {4006, 4007, 4009} + + +class QQOfficialGatewayUnavailableError(RuntimeError): + """Raised when qq-botpy returns unusable gateway metadata.""" + class PatchedGroupMessage(botpy.message.GroupMessage): class _User: @@ -74,13 +86,100 @@ class ManagedBotWebSocket(BotWebSocket): def __init__(self, session, connection: Any, client: botClient): super().__init__(session, connection) self._client = client + self._heartbeat_interval_seconds: float | None = None + self._last_heartbeat_ack_at = 0.0 + + async def on_connected(self, ws) -> None: + self._client.mark_websocket_connected() + await super().on_connected(ws) + + async def on_message(self, ws, message) -> None: + event = None + try: + payload = json.loads(message) + event = payload.get("t") + except Exception: + pass + + await super().on_message(ws, message) + + if event in {"READY", "RESUMED"}: + self._client.reset_reconnect_backoff() + + async def _is_system_event(self, message_event, ws): + event_op = message_event["op"] + if event_op == self.WS_HELLO: + interval_ms = (message_event.get("d") or {}).get("heartbeat_interval") + if isinstance(interval_ms, int | float) and interval_ms > 0: + self._heartbeat_interval_seconds = interval_ms / 1000 + logger.info(f"[QQOfficial] Gateway heartbeat interval: {interval_ms}ms") + return await super()._is_system_event(message_event, ws) + if event_op == self.WS_HEARTBEAT_ACK: + self._last_heartbeat_ack_at = time.monotonic() + return True + if event_op == self.WS_RECONNECT: + logger.info("[QQOfficial] Gateway requested reconnect.") + self._client.schedule_reconnect_delay("server requested reconnect") + self._connection.add(self._session) + await ws.close() + return True + if event_op == self.WS_INVALID_SESSION: + can_resume = bool(message_event.get("d")) + logger.warning( + f"[QQOfficial] Gateway reported invalid session, can_resume={can_resume}." + ) + if not can_resume: + self._session["session_id"] = "" + self._session["last_seq"] = 0 + self._client.schedule_reconnect_delay("invalid session", custom_delay=3) + self._connection.add(self._session) + await ws.close() + return True + return await super()._is_system_event(message_event, ws) + + async def _send_heart(self, interval): + """Send gateway heartbeat using the interval announced by QQ.""" + + heartbeat_interval = self._heartbeat_interval_seconds or interval + logger.info( + f"[QQOfficial] Heartbeat loop started, interval={heartbeat_interval}s." + ) + while True: + payload = { + "op": self.WS_HEARTBEAT, + "d": self._session["last_seq"], + } + + if self._conn is None: + logger.debug("[QQOfficial] Websocket is closed, stop heartbeat.") + return + if self._conn.closed: + logger.debug("[QQOfficial] Websocket closed, stop heartbeat.") + return + + await self.send_msg(json.dumps(payload)) + await asyncio.sleep(heartbeat_interval) async def on_closed(self, close_status_code, close_msg): if self._client.is_shutting_down: logger.debug("[QQOfficial] Ignore websocket reconnect during shutdown.") return + rate_limited = close_status_code == 4008 + if close_status_code in _SESSION_INVALID_CLOSE_CODES or ( + isinstance(close_status_code, int) and 4900 <= close_status_code <= 4913 + ): + self._can_reconnect = False + self._client.schedule_reconnect_delay( + f"websocket closed: {close_status_code} {close_msg}", + rate_limited=rate_limited, + ) await super().on_closed(close_status_code, close_msg) + async def on_error(self, exception: BaseException): + if not self._client.is_shutting_down: + self._client.schedule_reconnect_delay(f"websocket error: {exception}") + await super().on_error(exception) + async def close(self) -> None: self._can_reconnect = False if self._conn is not None and not self._conn.closed: @@ -93,6 +192,10 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self._shutting_down = False self._active_websockets: set[ManagedBotWebSocket] = set() + self._next_connect_at = 0.0 + self._reconnect_attempts = 0 + self._last_connect_at = 0.0 + self._quick_disconnect_count = 0 def set_platform(self, platform: QQOfficialPlatformAdapter) -> None: self.platform = platform @@ -101,6 +204,97 @@ def set_platform(self, platform: QQOfficialPlatformAdapter) -> None: def is_shutting_down(self) -> bool: return self._shutting_down or self.is_closed() + async def _bot_login(self, token: Token) -> None: + logger.info("[QQOfficial] 登录机器人账号中...") + + user = await self.http.login(token) + self._ws_ap = await self.api.get_ws_url() + session_limit = ( + self._ws_ap.get("session_start_limit") + if isinstance(self._ws_ap, dict) + else None + ) + max_concurrency = ( + session_limit.get("max_concurrency") + if isinstance(session_limit, dict) + else None + ) + if not isinstance(max_concurrency, int): + raise QQOfficialGatewayUnavailableError( + "gateway metadata unavailable during qq_official startup" + ) + + self._connection = ConnectionSession( + max_async=max_concurrency, + connect=self.bot_connect, + dispatch=self.ws_dispatch, + loop=self.loop, + api=self.api, + ) + self._connection.state.robot = Robot(user) + + def mark_websocket_connected(self) -> None: + self._last_connect_at = time.monotonic() + + def reset_reconnect_backoff(self) -> None: + if self._reconnect_attempts or self._quick_disconnect_count: + logger.info("[QQOfficial] Websocket session resumed, reset backoff.") + self._next_connect_at = 0.0 + self._reconnect_attempts = 0 + self._quick_disconnect_count = 0 + + def schedule_reconnect_delay( + self, + reason: str, + *, + custom_delay: float | None = None, + rate_limited: bool = False, + ) -> None: + """Schedule the next websocket connection attempt. + + Args: + reason: Human-readable reason for logging. + custom_delay: Explicit reconnect delay in seconds. + rate_limited: Whether QQ reported gateway rate limiting. + """ + + if self.is_shutting_down: + return + + delay = custom_delay + if delay is None and rate_limited: + delay = _RATE_LIMIT_RECONNECT_DELAY_SECONDS + if delay is None: + if self._last_connect_at: + duration = time.monotonic() - self._last_connect_at + if duration < _QUICK_DISCONNECT_THRESHOLD_SECONDS: + self._quick_disconnect_count += 1 + else: + self._quick_disconnect_count = 0 + if self._quick_disconnect_count >= _MAX_QUICK_DISCONNECTS: + delay = _RATE_LIMIT_RECONNECT_DELAY_SECONDS + self._quick_disconnect_count = 0 + logger.warning( + "[QQOfficial] Too many quick disconnects; delaying reconnect." + ) + if delay is None: + idx = min( + self._reconnect_attempts, + len(_RECONNECT_DELAYS_SECONDS) - 1, + ) + delay = _RECONNECT_DELAYS_SECONDS[idx] + self._reconnect_attempts += 1 + + self._next_connect_at = max(self._next_connect_at, time.monotonic() + delay) + logger.info(f"[QQOfficial] Reconnect scheduled in {delay}s, reason: {reason}") + + async def wait_for_reconnect_delay(self) -> None: + delay = self._next_connect_at - time.monotonic() + if delay <= 0: + return + logger.info(f"[QQOfficial] Waiting {delay:.1f}s before reconnect.") + await asyncio.sleep(delay) + # 收到群消息 async def on_group_at_message_create( self, message: botpy.message.GroupMessage @@ -165,6 +359,9 @@ def _commit(self, abm: AstrBotMessage) -> None: self.platform.commit_event(self.platform.create_event(abm)) async def bot_connect(self, session) -> None: + await self.wait_for_reconnect_delay() + if self.is_shutting_down: + return logger.info("[QQOfficial] Websocket session starting.") websocket = ManagedBotWebSocket(session, self._connection, self) @@ -191,6 +388,8 @@ async def shutdown(self) -> None: @register_platform_adapter("qq_official", "QQ 机器人官方 API 适配器") class QQOfficialPlatformAdapter(Platform): + STARTUP_RETRY_DELAYS_SECONDS = (5, 10, 30, 60) + def __init__( self, platform_config: dict, @@ -215,13 +414,9 @@ def __init__( public_guild_messages=True, direct_message=guild_dm, ) - self.client = botClient( - intents=self.intents, - bot_log=False, - timeout=20, - ) - - self.client.set_platform(self) + self._shutdown_event = asyncio.Event() + self._startup_retry_attempts = 0 + self.client = self._create_client() _ensure_group_message_create_parser() @@ -231,6 +426,67 @@ def __init__( self.test_mode = os.environ.get("TEST_MODE", "off") == "on" + def _create_client(self) -> botClient: + client = botClient( + intents=self.intents, + bot_log=False, + timeout=20, + ) + client.set_platform(self) + return client + + @staticmethod + def _should_retry_startup_error(error: Exception) -> bool: + if isinstance( + error, + ( + asyncio.TimeoutError, + ConnectionError, + OSError, + QQOfficialGatewayUnavailableError, + ), + ): + return True + if isinstance(error, botpy.errors.ServerError): + error_msg = str(error) + return any( + marker in error_msg + for marker in ("100017", "频率限制", "Too many requests") + ) + return False + + def _next_startup_retry_delay(self, error: Exception | None = None) -> int: + if isinstance(error, botpy.errors.ServerError): + error_msg = str(error) + if any( + marker in error_msg + for marker in ("100017", "频率限制", "Too many requests") + ): + return _RATE_LIMIT_RECONNECT_DELAY_SECONDS + + idx = min( + self._startup_retry_attempts, + len(self.STARTUP_RETRY_DELAYS_SECONDS) - 1, + ) + self._startup_retry_attempts += 1 + return self.STARTUP_RETRY_DELAYS_SECONDS[idx] + + async def _restart_client(self) -> None: + try: + await self.client.shutdown() + except asyncio.CancelledError: + raise + except Exception as e: + logger.warning(f"[QQOfficial] Close client failed during recovery: {e}") + self.client = self._create_client() + + async def _sleep_until_retry_or_shutdown(self, delay: float) -> bool: + try: + await asyncio.wait_for(self._shutdown_event.wait(), timeout=delay) + return False + except asyncio.TimeoutError: + return True + async def send_by_session( self, session: MessageSesion, @@ -696,12 +952,41 @@ async def _parse_from_qqofficial( abm.self_id = "qq_official" return abm - def run(self): - return self.client.start(appid=self.appid, secret=self.secret) + async def run(self) -> None: + while not self._shutdown_event.is_set(): + try: + await self.client.start(appid=self.appid, secret=self.secret) + self._startup_retry_attempts = 0 + if self._shutdown_event.is_set(): + break + logger.warning( + f"[QQOfficial] Client stopped unexpectedly, restarting in " + f"{self.STARTUP_RETRY_DELAYS_SECONDS[0]}s." + ) + await self._restart_client() + if not await self._sleep_until_retry_or_shutdown( + self.STARTUP_RETRY_DELAYS_SECONDS[0] + ): + break + except asyncio.CancelledError: + raise + except Exception as e: + if self._shutdown_event.is_set(): + break + if not self._should_retry_startup_error(e): + raise + delay = self._next_startup_retry_delay(e) + logger.warning( + f"[QQOfficial] Startup failed, retrying in {delay}s: {e}" + ) + await self._restart_client() + if not await self._sleep_until_retry_or_shutdown(delay): + break def get_client(self) -> botClient: return self.client async def terminate(self) -> None: + self._shutdown_event.set() await self.client.shutdown() logger.info("QQ 官方机器人接口 适配器已被关闭") diff --git a/tests/test_qqofficial_group_message_create.py b/tests/test_qqofficial_group_message_create.py index 46d688d158..0ce25c3311 100644 --- a/tests/test_qqofficial_group_message_create.py +++ b/tests/test_qqofficial_group_message_create.py @@ -1,5 +1,7 @@ import asyncio +import json import re +import time from types import SimpleNamespace from typing import Any, cast from unittest.mock import AsyncMock @@ -19,7 +21,12 @@ from astrbot.core.pipeline.result_decorate.stage import ResultDecorateStage from astrbot.core.platform.message_session import MessageSession from astrbot.core.platform.message_type import MessageType +from astrbot.core.platform.sources.qqofficial.qqofficial_message_event import ( + QQOfficialMessageEvent, +) from astrbot.core.platform.sources.qqofficial.qqofficial_platform_adapter import ( + ManagedBotWebSocket, + QQOfficialGatewayUnavailableError, QQOfficialPlatformAdapter, _ensure_group_message_create_parser, ) @@ -66,19 +73,19 @@ def _dispatch_group_message(payload: dict) -> tuple[str, botpy.message.GroupMess return dispatched[0] +def _platform_config() -> dict: + return { + "id": "qq-official-test", + "appid": "123", + "secret": "secret", + "enable_group_c2c": True, + "enable_guild_direct_message": False, + } + + @pytest.mark.asyncio async def test_group_message_create_parser_is_registered_and_dispatches_group_message(): - QQOfficialPlatformAdapter( - { - "id": "qq-official-test", - "appid": "123", - "secret": "secret", - "enable_group_c2c": True, - "enable_guild_direct_message": False, - }, - {}, - asyncio.Queue(), - ) + QQOfficialPlatformAdapter(_platform_config(), {}, asyncio.Queue()) event_name, message = _dispatch_group_message(_make_group_payload()) @@ -87,6 +94,141 @@ async def test_group_message_create_parser_is_registered_and_dispatches_group_me assert message.group_openid == "group-1" +@pytest.mark.asyncio +async def test_qqofficial_bot_login_raises_retryable_error_when_gateway_metadata_missing(): + adapter = QQOfficialPlatformAdapter(_platform_config(), {}, asyncio.Queue()) + adapter.client.http = SimpleNamespace( + login=AsyncMock(return_value=SimpleNamespace()), + close=AsyncMock(), + ) + adapter.client.api = SimpleNamespace(get_ws_url=AsyncMock(return_value=None)) + + with pytest.raises( + QQOfficialGatewayUnavailableError, + match="gateway metadata unavailable", + ): + await adapter.client._bot_login(SimpleNamespace()) + + await adapter.terminate() + + +@pytest.mark.asyncio +async def test_managed_websocket_uses_qq_hello_heartbeat_interval(monkeypatch): + sent_payloads: list[dict] = [] + + class ConnectionStub: + parser = {} + + def add(self, _session): + raise AssertionError("unexpected reconnect") + + class ClientStub: + def mark_websocket_connected(self): + pass + + def reset_reconnect_backoff(self): + pass + + @property + def is_shutting_down(self): + return False + + class WebSocketStub: + closed = False + + async def send_str(self, data: str): + sent_payloads.append(json.loads(data)) + + async def close(self): + self.closed = True + + class TokenStub: + async def check_token(self): + pass + + def get_string(self): + return "QQBot token" + + async def fake_send_msg(data: str): + sent_payloads.append(json.loads(data)) + + async def fake_sleep(delay: float): + assert delay == 41.25 + raise asyncio.CancelledError + + websocket = ManagedBotWebSocket( + { + "session_id": "", + "last_seq": 7, + "intent": 1, + "token": TokenStub(), + "shards": {"shard_id": 0, "shard_count": 1}, + }, + cast(Any, ConnectionStub()), + cast(Any, ClientStub()), + ) + websocket._conn = cast(Any, WebSocketStub()) + monkeypatch.setattr(websocket, "send_msg", fake_send_msg) + + assert await websocket._is_system_event( + {"op": 10, "d": {"heartbeat_interval": 41250}}, + websocket._conn, + ) + monkeypatch.setattr(asyncio, "sleep", fake_sleep) + sent_payloads.clear() + + with pytest.raises(asyncio.CancelledError): + await websocket._send_heart(30) + + assert sent_payloads == [{"op": 1, "d": 7}] + + +def test_bot_client_schedules_rate_limit_reconnect_delay(): + client = QQOfficialBotClient( + intents=botpy.Intents(public_messages=True), + bot_log=False, + ) + now = time.monotonic() + + client.schedule_reconnect_delay("rate limit", rate_limited=True) + + delay = client._next_connect_at - now + assert 59 <= delay <= 61 + + +@pytest.mark.asyncio +async def test_managed_websocket_requeues_on_gateway_reconnect_request(): + queued_sessions: list[dict] = [] + + class ConnectionStub: + parser = {} + + def add(self, session): + queued_sessions.append(session) + + class ClientStub: + @property + def is_shutting_down(self): + return False + + def schedule_reconnect_delay(self, reason: str, **_kwargs): + assert reason == "server requested reconnect" + + class WebSocketStub: + async def close(self): + pass + + session = {"session_id": "sid", "last_seq": 3} + websocket = ManagedBotWebSocket( + session, + cast(Any, ConnectionStub()), + cast(Any, ClientStub()), + ) + + assert await websocket._is_system_event({"op": 7}, cast(Any, WebSocketStub())) + assert queued_sessions == [session] + + @pytest.mark.asyncio async def test_parse_group_message_create_plain_message_has_no_at_component(): _, message = _dispatch_group_message( @@ -285,6 +427,35 @@ async def test_webhook_group_send_by_session_without_cached_msg_id_omits_msg_id( assert adapter._session_last_message_id["group-1"] == "sent-1" +@pytest.mark.asyncio +async def test_qqofficial_reply_fallback_removes_expired_msg_id_for_proactive_send(): + event = object.__new__(QQOfficialMessageEvent) + sent_payloads: list[dict] = [] + + async def send_func(payload: dict): + sent_payloads.append(payload.copy()) + if len(sent_payloads) == 1: + raise botpy.errors.ServerError("回复消息msg_id已过期") + return {"id": "sent-1"} + + ret = await event._send_with_markdown_fallback( + send_func, + { + "content": "hello", + "msg_type": 0, + "msg_id": "expired-msg-id", + "msg_seq": 1, + }, + "hello", + ) + + assert ret == {"id": "sent-1"} + assert sent_payloads[0]["msg_id"] == "expired-msg-id" + assert "msg_id" not in sent_payloads[1] + assert sent_payloads[1]["content"] == "hello" + assert sent_payloads[1]["msg_seq"] == 1 + + def test_qqofficial_ws_is_not_excluded_from_segmented_reply(): stage = RespondStage() stage.enable_seg = True From 85c45c69b4e64970b78debb74acda0432d6a16ed Mon Sep 17 00:00:00 2001 From: piexian <64474352+piexian@users.noreply.github.com> Date: Sun, 28 Jun 2026 16:15:57 +0800 Subject: [PATCH 2/2] fix(qqofficial): prevent duplicate session enqueue + fix async test - WS_RECONNECT/WS_INVALID_SESSION: set _can_reconnect=False before ws.close() so botpy on_closed no longer re-adds the session (review: duplicate enqueue caused concurrent websockets) - mark rate_limit reconnect test async so botpy Client.__init__ finds a running event loop (fixes CI RuntimeError) --- .../platform/sources/qqofficial/qqofficial_platform_adapter.py | 2 ++ tests/test_qqofficial_group_message_create.py | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py b/astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py index 171c743e58..01445557cd 100644 --- a/astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py +++ b/astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py @@ -121,6 +121,7 @@ async def _is_system_event(self, message_event, ws): logger.info("[QQOfficial] Gateway requested reconnect.") self._client.schedule_reconnect_delay("server requested reconnect") self._connection.add(self._session) + self._can_reconnect = False await ws.close() return True if event_op == self.WS_INVALID_SESSION: @@ -133,6 +134,7 @@ async def _is_system_event(self, message_event, ws): self._session["last_seq"] = 0 self._client.schedule_reconnect_delay("invalid session", custom_delay=3) self._connection.add(self._session) + self._can_reconnect = False await ws.close() return True return await super()._is_system_event(message_event, ws) diff --git a/tests/test_qqofficial_group_message_create.py b/tests/test_qqofficial_group_message_create.py index 0ce25c3311..bdf6f93fed 100644 --- a/tests/test_qqofficial_group_message_create.py +++ b/tests/test_qqofficial_group_message_create.py @@ -183,7 +183,8 @@ async def fake_sleep(delay: float): assert sent_payloads == [{"op": 1, "d": 7}] -def test_bot_client_schedules_rate_limit_reconnect_delay(): +@pytest.mark.asyncio +async def test_bot_client_schedules_rate_limit_reconnect_delay(): client = QQOfficialBotClient( intents=botpy.Intents(public_messages=True), bot_log=False,