diff --git a/roborock/data/b01_q10/b01_q10_code_mappings.py b/roborock/data/b01_q10/b01_q10_code_mappings.py index 8020894d..848f7c86 100644 --- a/roborock/data/b01_q10/b01_q10_code_mappings.py +++ b/roborock/data/b01_q10/b01_q10_code_mappings.py @@ -1,4 +1,5 @@ from ..code_mappings import RoborockModeEnum +from enum import Enum class B01_Q10_DP(RoborockModeEnum): @@ -217,3 +218,11 @@ class YXDeviceDustCollectionFrequency(RoborockModeEnum): INTERVAL_30 = "interval_30", 30 INTERVAL_45 = "interval_45", 45 INTERVAL_60 = "interval_60", 60 + +class RemoteCommand(Enum): + FORWARD = 0 + LEFT = 2 + RIGHT = 3 + STOP = 4 + EXIT = 5 + diff --git a/roborock/devices/traits/b01/q10/__init__.py b/roborock/devices/traits/b01/q10/__init__.py index 1cd89bd8..ea415ef8 100644 --- a/roborock/devices/traits/b01/q10/__init__.py +++ b/roborock/devices/traits/b01/q10/__init__.py @@ -12,6 +12,7 @@ from .command import CommandTrait from .status import StatusTrait from .vacuum import VacuumTrait +from .remote import RemoteTrait __all__ = [ "Q10PropertiesApi", @@ -32,11 +33,15 @@ class Q10PropertiesApi(Trait): vacuum: VacuumTrait """Trait for sending vacuum related commands to Q10 devices.""" + remote: RemoteTrait + """Trait for sending remote control related commands to Q10 devices.""" + def __init__(self, channel: MqttChannel) -> None: """Initialize the B01Props API.""" self._channel = channel self.command = CommandTrait(channel) self.vacuum = VacuumTrait(self.command) + self.remote = RemoteTrait(self.command) self.status = StatusTrait() self._subscribe_task: asyncio.Task[None] | None = None diff --git a/roborock/devices/traits/b01/q10/remote.py b/roborock/devices/traits/b01/q10/remote.py new file mode 100644 index 00000000..cfa43c0e --- /dev/null +++ b/roborock/devices/traits/b01/q10/remote.py @@ -0,0 +1,44 @@ +"""Traits for Q10 B01 devices.""" + +from roborock.data.b01_q10.b01_q10_code_mappings import ( + B01_Q10_DP, + RemoteCommand, +) +from typing import Any + +from .command import CommandTrait + + +class RemoteTrait: + """Trait for sending vacuum commands. + + This is a wrapper around the CommandTrait for sending remote related + commands to Q10 devices. + """ + + def __init__(self, command: CommandTrait) -> None: + """Initialize the RemoteTrait.""" + self._command = command + + async def _send_remote(self, action: RemoteCommand) -> None: + await self._command.send(B01_Q10_DP.COMMON, params={B01_Q10_DP.REMOTE: action.value}) + + async def forward(self) -> None: + """Move forward.""" + await self._send_remote(RemoteCommand.FORWARD) + + async def left(self) -> None: + """Turn left.""" + await self._send_remote(RemoteCommand.LEFT) + + async def right(self) -> None: + """Turn right.""" + await self._send_remote(RemoteCommand.RIGHT) + + async def stop(self) -> None: + """Stop last moving command or start remote control.""" + await self._send_remote(RemoteCommand.STOP) + + async def exit(self) -> None: + """Exit remote control.""" + await self._send_remote(RemoteCommand.EXIT) diff --git a/tests/devices/traits/b01/q10/test_remote.py b/tests/devices/traits/b01/q10/test_remote.py new file mode 100644 index 00000000..240c0e7d --- /dev/null +++ b/tests/devices/traits/b01/q10/test_remote.py @@ -0,0 +1,50 @@ +import json +from collections.abc import Awaitable, Callable +from typing import Any + +import pytest + +from roborock.devices.traits.b01.q10 import Q10PropertiesApi +from roborock.devices.traits.b01.q10.remote import RemoteTrait +from tests.fixtures.channel_fixtures import FakeChannel + + +@pytest.fixture(name="fake_channel") +def fake_channel_fixture() -> FakeChannel: + return FakeChannel() + + +@pytest.fixture(name="q10_api") +def q10_api_fixture(fake_channel: FakeChannel) -> Q10PropertiesApi: + return Q10PropertiesApi(fake_channel) # type: ignore[arg-type] + + +@pytest.fixture(name="remote") +def remote_fixture(q10_api: Q10PropertiesApi) -> RemoteTrait: + return q10_api.remote + + +@pytest.mark.parametrize( + ("command_fn", "expected_payload"), + [ + (lambda x: x.forward(), {"101": {"12": 0}}), + (lambda x: x.left(), {"101": {"12": 2}}), + (lambda x: x.right(), {"101": {"12": 3}}), + (lambda x: x.stop(), {"101": {"12": 4}}), + (lambda x: x.exit(), {"101": {"12": 5}}), + ], +) +async def test_remote_commands( + remote: RemoteTrait, + fake_channel: FakeChannel, + command_fn: Callable[[RemoteTrait], Awaitable[None]], + expected_payload: dict[str, Any], +) -> None: + """Test sending a remote start command.""" + await command_fn(remote) + + assert len(fake_channel.published_messages) == 1 + message = fake_channel.published_messages[0] + assert message.payload + payload_data = json.loads(message.payload.decode()) + assert payload_data == {"dps": expected_payload}