1111import urllib .error
1212import urllib .request
1313import uuid
14- from typing import Any , Callable , Dict , Literal , Optional , TypedDict , Union
14+ from typing import Any , Callable , Dict , Iterable , List , Literal , Optional , Sequence , TypedDict , Union
1515from cryptography .exceptions import InvalidSignature
1616from cryptography .hazmat .primitives .asymmetric .ed25519 import Ed25519PublicKey
1717
1818
19+ PublicKeyArg = Union [str , Sequence [str ]]
20+
21+
1922DEFAULT_API_BASE_URL = "https://auth.authforge.cc"
2023RATE_LIMIT_RETRY_DELAYS = (2 , 5 )
2124NETWORK_RETRY_DELAY = 2
@@ -62,7 +65,7 @@ def __init__(
6265 self ,
6366 app_id : str ,
6467 app_secret : str ,
65- public_key : str ,
68+ public_key : PublicKeyArg ,
6669 heartbeat_mode : str ,
6770 heartbeat_interval : int = 900 ,
6871 api_base_url : str = DEFAULT_API_BASE_URL ,
@@ -75,8 +78,11 @@ def __init__(
7578 raise ValueError ("app_id must be a non-empty string" )
7679 if not app_secret or not isinstance (app_secret , str ):
7780 raise ValueError ("app_secret must be a non-empty string" )
78- if not public_key or not isinstance (public_key , str ):
79- raise ValueError ("public_key must be a non-empty base64 string" )
81+ public_key_list = self ._normalize_public_key_list (public_key )
82+ if not public_key_list :
83+ raise ValueError (
84+ "public_key must be a non-empty base64 string or list of base64 strings"
85+ )
8086 mode = (heartbeat_mode or "" ).upper ()
8187 if mode not in {"LOCAL" , "SERVER" }:
8288 raise ValueError ("heartbeat_mode must be LOCAL or SERVER" )
@@ -85,7 +91,11 @@ def __init__(
8591
8692 self .app_id = app_id
8793 self .app_secret = app_secret
88- self .public_key = public_key
94+ # `public_key` is the historical public attribute. We now hold the
95+ # full trust list to support key rotation, but expose the first entry
96+ # as `public_key` for callers that read it directly.
97+ self .public_keys : List [str ] = public_key_list
98+ self .public_key = public_key_list [0 ]
8999 self .heartbeat_mode = mode
90100 self .heartbeat_interval = int (heartbeat_interval )
91101 self .api_base_url = api_base_url .rstrip ("/" )
@@ -114,7 +124,9 @@ def __init__(
114124 self ._license_variables : Optional [Dict [str , Any ]] = None
115125 self ._authenticated = False
116126 self ._hwid = self ._resolve_hwid (hwid_override )
117- self ._ed25519_public_key = self ._load_public_key (public_key )
127+ self ._ed25519_public_keys : List [Ed25519PublicKey ] = [
128+ self ._load_public_key (k ) for k in public_key_list
129+ ]
118130
119131 def login (self , license_key : str ) -> bool :
120132 if not license_key or not isinstance (license_key , str ):
@@ -554,13 +566,42 @@ def _verify_signature(self, raw_payload_b64: str, signature: str) -> None:
554566 )
555567 except Exception as exc :
556568 raise ValueError ("invalid_signature_encoding" ) from exc
557- try :
558- self ._ed25519_public_key .verify (
559- signature_bytes ,
560- raw_payload_b64 .encode ("utf-8" ),
561- )
562- except InvalidSignature as exc :
563- raise ValueError ("signature_mismatch" ) from exc
569+ # During a key rotation the SDK may be pinned to the previous key
570+ # while a new server-side key signs responses (or vice-versa). Trust
571+ # any key in the configured list.
572+ payload_bytes = raw_payload_b64 .encode ("utf-8" )
573+ for key in self ._ed25519_public_keys :
574+ try :
575+ key .verify (signature_bytes , payload_bytes )
576+ return
577+ except InvalidSignature :
578+ continue
579+ raise ValueError ("signature_mismatch" )
580+
581+ @staticmethod
582+ def _normalize_public_key_list (value : PublicKeyArg ) -> List [str ]:
583+ """Coerce the public_key constructor arg to a list of base64 strings.
584+
585+ Accepts:
586+ - "abc..." single-key historical contract
587+ - ["abc...", "def..."] current first, previous after
588+ - "abc...,def..." env-var convenience form
589+ """
590+ keys : List [str ] = []
591+ candidates : Iterable [Any ]
592+ if isinstance (value , str ):
593+ candidates = value .split ("," ) if "," in value else [value ]
594+ elif isinstance (value , Sequence ):
595+ candidates = value
596+ else :
597+ return []
598+ for entry in candidates :
599+ if not isinstance (entry , str ):
600+ continue
601+ trimmed = entry .strip ()
602+ if trimmed :
603+ keys .append (trimmed )
604+ return keys
564605
565606 def _generate_nonce (self ) -> str :
566607 return secrets .token_hex (16 )
0 commit comments