From caaec9ea1ed5c60f41b543d6019ff8c3ac4da4d3 Mon Sep 17 00:00:00 2001 From: Gijs Molenaar Date: Sat, 9 May 2026 21:21:36 +0200 Subject: [PATCH] fix: tunnel TLS inside COTP frames instead of wrapping the TCP socket MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit S7CommPlus transports TLS records as the payload of COTP DT frames — TPKT and COTP headers stay unencrypted on the wire. We were wrapping the raw TCP socket with ssl.wrap_socket(), which encrypted TPKT/COTP headers too; the PLC couldn't parse the frame boundaries and dropped. Switch to ssl.MemoryBIO + ssl.SSLObject: TLS encrypt/decrypt happens in memory, and TLS records are sent/received through the existing ISOTCPConnection.send_data/receive_data which adds the TPKT+COTP framing. The TLS handshake itself (ClientHello, ServerHello, etc.) is tunneled through COTP frames, matching what the reference C# driver (thomas-v2/S7CommPlusDriver) does. Verified against kosh271's pcaps (#722): the C# driver sends `TPKT → COTP → [16 03 01 TLS ClientHello]` while we were sending `[16 03 01 TLS ClientHello]` directly on TCP. All existing tests pass; the emulated server tests continue to work because the test server also uses ISOTCPConnection framing. Fixes #722. Co-Authored-By: Claude Opus 4.7 (1M context) --- s7/connection.py | 111 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 83 insertions(+), 28 deletions(-) diff --git a/s7/connection.py b/s7/connection.py index 878890bd..3b7bbddb 100644 --- a/s7/connection.py +++ b/s7/connection.py @@ -105,7 +105,9 @@ def __init__( ) self._ssl_context: Optional[ssl.SSLContext] = None - self._ssl_socket: Optional[ssl.SSLSocket] = None + self._ssl_object: Optional[ssl.SSLObject] = None + self._incoming_bio: Optional[ssl.MemoryBIO] = None + self._outgoing_bio: Optional[ssl.MemoryBIO] = None self._session_id: int = 0 self._sequence_number: int = 0 self._protocol_version: int = 0 # Detected from PLC response @@ -418,7 +420,9 @@ def disconnect(self) -> None: self._connected = False self._session_setup_ok = False self._tls_active = False - self._ssl_socket = None + self._ssl_object = None + self._incoming_bio = None + self._outgoing_bio = None self._oms_secret = None self._session_id = 0 self._sequence_number = 0 @@ -489,7 +493,7 @@ def send_request(self, function_code: int, payload: bytes = b"") -> bytes: frame += struct.pack(">BBH", 0x72, frame_version, 0x0000) logger.debug(f" Full frame ({len(frame)} bytes): {frame.hex(' ')}") - self._iso_conn.send_data(frame) + self._send_s7_data(frame) # Increment the appropriate IntegrityId counter after sending if self._with_integrity_id and self._protocol_version >= ProtocolVersion.V2: @@ -499,7 +503,7 @@ def send_request(self, function_code: int, payload: bytes = b"") -> bytes: self._integrity_id_write = (self._integrity_id_write + 1) & 0xFFFFFFFF # Receive response - response_frame = self._iso_conn.receive_data() + response_frame = self._recv_s7_data() logger.debug(f"=== RECV RESPONSE === raw frame ({len(response_frame)} bytes): {response_frame.hex(' ')}") # Parse frame header, use data_length to exclude trailer @@ -575,10 +579,10 @@ def _init_ssl(self) -> None: frame += struct.pack(">BBH", 0x72, ProtocolVersion.V1, 0x0000) logger.debug(f"=== InitSSL === sending ({len(frame)} bytes): {frame.hex(' ')}") - self._iso_conn.send_data(frame) + self._send_s7_data(frame) # Receive InitSSL response - response_frame = self._iso_conn.receive_data() + response_frame = self._recv_s7_data() logger.debug(f"=== InitSSL === received ({len(response_frame)} bytes): {response_frame.hex(' ')}") # Parse S7CommPlus frame header @@ -661,10 +665,10 @@ def _create_session(self) -> None: frame += struct.pack(">BBH", 0x72, ProtocolVersion.V1, 0x0000) logger.debug(f"=== CreateObject === sending ({len(frame)} bytes): {frame.hex(' ')}") - self._iso_conn.send_data(frame) + self._send_s7_data(frame) # Receive response - response_frame = self._iso_conn.receive_data() + response_frame = self._recv_s7_data() logger.debug(f"=== CreateObject === received ({len(response_frame)} bytes): {response_frame.hex(' ')}") # Parse S7CommPlus frame header @@ -901,10 +905,10 @@ def _setup_session(self) -> bool: frame += struct.pack(">BBH", 0x72, self._protocol_version, 0x0000) logger.debug(f"=== SetupSession === sending ({len(frame)} bytes): {frame.hex(' ')}") - self._iso_conn.send_data(frame) + self._send_s7_data(frame) # Receive response - response_frame = self._iso_conn.receive_data() + response_frame = self._recv_s7_data() logger.debug(f"=== SetupSession === received ({len(response_frame)} bytes): {response_frame.hex(' ')}") version, data_length, consumed = decode_header(response_frame) @@ -948,11 +952,11 @@ def _delete_session(self) -> None: frame = encode_header(self._protocol_version, len(request)) + request frame += struct.pack(">BBH", 0x72, self._protocol_version, 0x0000) - self._iso_conn.send_data(frame) + self._send_s7_data(frame) # Best-effort receive try: - self._iso_conn.receive_data() + self._recv_s7_data() except Exception: pass @@ -962,17 +966,55 @@ def _next_sequence_number(self) -> int: self._sequence_number = (self._sequence_number + 1) & 0xFFFF return seq + def _send_s7_data(self, data: bytes) -> None: + """Send an S7CommPlus frame, routing through TLS when active.""" + if self._tls_active: + self._ssl_object.write(data) # type: ignore[union-attr] + self._tls_flush_outgoing() + else: + self._iso_conn.send_data(data) + + def _recv_s7_data(self) -> bytes: + """Receive an S7CommPlus frame, routing through TLS when active.""" + if self._tls_active: + while True: + try: + return self._ssl_object.read(65536) # type: ignore[union-attr] + except ssl.SSLWantReadError: + self._tls_read_incoming() + else: + return self._iso_conn.receive_data() + + def _tls_flush_outgoing(self) -> None: + """Send all pending TLS records through COTP framing.""" + data = self._outgoing_bio.read() # type: ignore[union-attr] + if data: + self._iso_conn.send_data(data) + + def _tls_read_incoming(self) -> None: + """Read a COTP frame and feed its payload to the TLS BIO.""" + data = self._iso_conn.receive_data() + self._incoming_bio.write(data) # type: ignore[union-attr] + def _activate_tls( self, tls_cert: Optional[str] = None, tls_key: Optional[str] = None, tls_ca: Optional[str] = None, ) -> None: - """Activate TLS 1.3 over the COTP connection. + """Activate TLS 1.3 tunneled inside COTP data frames. + + The S7CommPlus protocol transports TLS records as the payload + of COTP DT frames — TPKT and COTP headers stay unencrypted on + the wire. This differs from a standard ``ssl.wrap_socket`` call + which would encrypt everything (including TPKT/COTP), which + Siemens PLCs reject. - Called after InitSSL and before CreateObject. Wraps the underlying - TCP socket with TLS and extracts the OMS exporter secret for - legitimation key derivation. + Uses ``ssl.MemoryBIO`` + ``ssl.SSLObject`` to encrypt/decrypt + without wrapping the TCP socket, keeping TPKT/COTP framing + intact via ``ISOTCPConnection.send_data/receive_data``. + + Called after InitSSL and before CreateObject. Args: tls_cert: Path to client TLS certificate (PEM) @@ -985,29 +1027,42 @@ def _activate_tls( ca_path=tls_ca, ) - # Wrap the raw TCP socket used by ISOTCPConnection - raw_socket = self._iso_conn.socket - if raw_socket is None: - from snap7.error import S7ConnectionError - - raise S7ConnectionError("Cannot activate TLS: no TCP socket") + # BIO-based TLS: encrypt/decrypt bytes without touching the + # TCP socket, so TPKT/COTP framing stays unencrypted. + self._incoming_bio = ssl.MemoryBIO() + self._outgoing_bio = ssl.MemoryBIO() + self._ssl_object = ctx.wrap_bio( + self._incoming_bio, + self._outgoing_bio, + server_side=False, + server_hostname=self.host if ctx.check_hostname else None, + ) - self._ssl_socket = ctx.wrap_socket(raw_socket, server_hostname=self.host) + # TLS handshake — records tunnel through COTP frames + self._do_tls_handshake() - # Replace the socket in ISOTCPConnection so all subsequent - # send_data/receive_data calls go through TLS - self._iso_conn.socket = self._ssl_socket self._tls_active = True # Extract OMS exporter secret for legitimation key derivation try: - self._oms_secret = self._ssl_socket.export_keying_material("EXPERIMENTAL_OMS", 32, None) + self._oms_secret = self._ssl_object.export_keying_material("EXPERIMENTAL_OMS", 32, None) logger.debug("OMS exporter secret extracted from TLS session") except (AttributeError, ssl.SSLError) as e: logger.warning(f"Could not extract OMS exporter secret: {e}") self._oms_secret = None - logger.info("TLS 1.3 activated on COTP connection") + logger.info("TLS 1.3 activated (tunneled inside COTP frames)") + + def _do_tls_handshake(self) -> None: + """Perform TLS handshake, tunneling records through COTP.""" + while True: + try: + self._ssl_object.do_handshake() # type: ignore[union-attr] + break + except ssl.SSLWantReadError: + self._tls_flush_outgoing() + self._tls_read_incoming() + self._tls_flush_outgoing() def _setup_ssl_context( self,