MSI v2: mTLS Proof-of-Possession with KeyGuard Attestation#897
MSI v2: mTLS Proof-of-Possession with KeyGuard Attestation#897
Conversation
There was a problem hiding this comment.
Pull request overview
Adds MSI v2 support to MSAL Python for acquiring mTLS Proof-of-Possession (PoP) tokens on Windows Azure VMs with Credential Guard / KeyGuard, plus a companion msal-key-attestation package to provide native KeyGuard attestation (via AttestationClientLib.dll).
Changes:
- Introduces
msal.msi_v2implementing the KeyGuard + CSR + IMDS + SChannel/WinHTTP mTLS token acquisition path with in-memory certificate caching. - Extends
ManagedIdentityClient.acquire_token_for_client()with opt-in flags to dispatch to MSI v2 and raisesMsiV2Erroron v2 failures. - Adds a new
msal-key-attestationpackage implementingAttestationClientLib.dllbindings and an in-memory MAA JWT cache, plus tests and sample/guide content.
Reviewed changes
Copilot reviewed 17 out of 18 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
msal/msi_v2.py |
Implements MSI v2 flow (NCrypt/CSR/IMDS/Crypt32/WinHTTP) plus cert caching and cnf binding helper. |
msal/managed_identity.py |
Adds MsiV2Error, new parameters, and v2 dispatch/gating logic. |
msal/__init__.py |
Exports MsiV2Error at package level. |
tests/test_msi_v2.py |
Unit tests for cnf binding, DER helpers, IMDS helper logic, cert cache, and v2 gating behavior. |
sample/msi_v2_sample.py |
Sample script demonstrating MSI v2 token acquisition and optional cnf binding verification. |
sample/MSI_V2_GUIDE.md |
Setup/usage guide documenting flags, behavior matrix, and environment variables. |
msal-key-attestation/msal_key_attestation/attestation.py |
ctypes bindings to AttestationClientLib.dll + in-memory attestation JWT cache. |
msal-key-attestation/msal_key_attestation/__init__.py |
Exposes create_attestation_provider() / get_attestation_jwt and package metadata. |
msal-key-attestation/setup.cfg |
Packaging metadata and dependency on msal. |
msal-key-attestation/setup.py |
Minimal setuptools entrypoint. |
msal-key-attestation/README.md |
Package readme describing purpose, usage, and env vars. |
msal-key-attestation/tests/test_attestation.py |
Unit tests for JWT parsing, cache behavior, and provider factory wiring. |
msal-key-attestation/tests/__init__.py |
Test package marker. |
msal-key-attestation/msal_key_attestation.egg-info/SOURCES.txt |
Generated packaging artifact added to repo. |
msal-key-attestation/msal_key_attestation.egg-info/PKG-INFO |
Generated packaging artifact added to repo. |
msal-key-attestation/msal_key_attestation.egg-info/requires.txt |
Generated packaging artifact added to repo. |
msal-key-attestation/msal_key_attestation.egg-info/top_level.txt |
Generated packaging artifact added to repo. |
msal-key-attestation/msal_key_attestation.egg-info/dependency_links.txt |
Generated packaging artifact added to repo. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- Fix ManagedIdentityError: use single descriptive message instead of two args - Fix _try_parse_cert_not_after: update return type to float (always returns fallback) - Fix _der_oid: allow second component >= 40 for OIDs starting with 2.x - Fix _cert_cache_key: include ManagedIdentityIdType to prevent cross-identity collisions - Fix setup.cfg: exclude tests from msal-key-attestation wheel - Remove egg-info build artifacts from git, add to .gitignore - Fix remaining CodeQL clear-text logging alerts in sample (use print instead of logger) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 13 out of 14 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| winhttp.WinHttpSendRequest.argtypes = [ | ||
| ctypes.c_void_p, ctypes.c_wchar_p, wintypes.DWORD, ctypes.c_void_p, | ||
| wintypes.DWORD, wintypes.DWORD, ctypes.c_ulonglong] |
There was a problem hiding this comment.
WinHttpSendRequest’s last parameter is a DWORD_PTR/ULONG_PTR (pointer-sized). Using ctypes.c_ulonglong here will break on 32-bit Python. Prefer wintypes.DWORD_PTR (or ctypes.c_size_t) for the final argtype to match the WinHTTP API on both 32/64-bit.
| wintypes.DWORD, wintypes.DWORD, ctypes.c_ulonglong] | |
| wintypes.DWORD, wintypes.DWORD, ctypes.c_size_t] |
ededbe2 to
639917f
Compare
Blocker 1 — Exception contract: RuntimeError from msal-key-attestation (DLL load, attestation call) now gets caught and wrapped as MsiV2Error at both the provider call site in msi_v2.py and the outer boundary in managed_identity.py. Only MsiV2Error (or its subclasses) can escape to the caller. Blocker 2 — Stable attestation cache key: The provider callback signature is expanded from (endpoint, key_handle, client_id) to (endpoint, key_handle, client_id, cache_key). MSAL now passes the stable per-boot key name as cache_key, which get_attestation_jwt() uses for its MAA token cache instead of falling back to the less cache-friendly numeric handle. Tests: 59 passed (44 core + 15 attestation), including new tests for RuntimeError wrapping and cache_key forwarding. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 21 out of 22 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| def _cert_cache_key(managed_identity: Optional[Dict[str, Any]], | ||
| attested: bool) -> str: | ||
| """Build a cache key from managed identity + identifier type + attestation flag.""" | ||
| mi_id_type = "SYSTEM_ASSIGNED" | ||
| mi_id = "SYSTEM_ASSIGNED" | ||
| if isinstance(managed_identity, dict): | ||
| mi_id_type = str( | ||
| managed_identity.get("ManagedIdentityIdType") or "SYSTEM_ASSIGNED") | ||
| mi_id = str(managed_identity.get("Id") or "SYSTEM_ASSIGNED") |
There was a problem hiding this comment.
_cert_cache_key() only treats managed_identity as a dict. ManagedIdentityClient passes a ManagedIdentity instance (subclass of UserDict), so user-assigned identities will be treated as system-assigned and can collide in the cert cache. Consider accepting Mapping/UserDict (e.g., collections.abc.Mapping or any object with .get()) instead of strict isinstance(..., dict).
| def _cert_cache_key(managed_identity: Optional[Dict[str, Any]], | |
| attested: bool) -> str: | |
| """Build a cache key from managed identity + identifier type + attestation flag.""" | |
| mi_id_type = "SYSTEM_ASSIGNED" | |
| mi_id = "SYSTEM_ASSIGNED" | |
| if isinstance(managed_identity, dict): | |
| mi_id_type = str( | |
| managed_identity.get("ManagedIdentityIdType") or "SYSTEM_ASSIGNED") | |
| mi_id = str(managed_identity.get("Id") or "SYSTEM_ASSIGNED") | |
| def _cert_cache_key(managed_identity: Optional[Any], | |
| attested: bool) -> str: | |
| """Build a cache key from managed identity + identifier type + attestation flag.""" | |
| mi_id_type = "SYSTEM_ASSIGNED" | |
| mi_id = "SYSTEM_ASSIGNED" | |
| getter = getattr(managed_identity, "get", None) | |
| if callable(getter): | |
| mi_id_type = str( | |
| getter("ManagedIdentityIdType") or "SYSTEM_ASSIGNED") | |
| mi_id = str(getter("Id") or "SYSTEM_ASSIGNED") |
| def _mi_query_params( | ||
| managed_identity: Optional[Dict[str, Any]], | ||
| ) -> Dict[str, str]: | ||
| """Build IMDS query params: cred-api-version=2.0 + optional UAMI selector.""" | ||
| params: Dict[str, str] = {_API_VERSION_QUERY_PARAM: _IMDS_V2_API_VERSION} | ||
| if not isinstance(managed_identity, dict): | ||
| return params | ||
| id_type = managed_identity.get("ManagedIdentityIdType") | ||
| identifier = managed_identity.get("Id") | ||
| mapping = {"ClientId": "client_id", "ObjectId": "object_id", | ||
| "ResourceId": "msi_res_id"} | ||
| wire = mapping.get(id_type) | ||
| if wire and identifier: | ||
| params[wire] = str(identifier) | ||
| return params | ||
|
|
There was a problem hiding this comment.
_mi_query_params() returns only cred-api-version when managed_identity is not a plain dict. In practice, ManagedIdentityClient passes a ManagedIdentity (UserDict) instance, so user-assigned managed identity selectors (client_id/object_id/msi_res_id) won’t be sent to IMDS, breaking MSI v2 for UAMI. Update the type check to handle Mapping/UserDict (or normalize managed_identity to dict before calling).
| # --------------------------------------------------------------------------- | ||
|
|
||
| def _imds_base() -> str: | ||
| return os.getenv(_IMDS_BASE_ENVVAR, _IMDS_DEFAULT_BASE).strip().rstrip("/") |
There was a problem hiding this comment.
_imds_base() will return an empty string if AZURE_POD_IDENTITY_AUTHORITY_HOST is set but empty/whitespace, producing invalid URLs like '/metadata/identity/...'. Consider falling back to the default base when the env var is unset or empty (e.g., read env, strip, and if empty use _IMDS_DEFAULT_BASE).
| return os.getenv(_IMDS_BASE_ENVVAR, _IMDS_DEFAULT_BASE).strip().rstrip("/") | |
| base = os.getenv(_IMDS_BASE_ENVVAR) | |
| if base is None: | |
| return _IMDS_DEFAULT_BASE.rstrip("/") | |
| base = base.strip().rstrip("/") | |
| return base or _IMDS_DEFAULT_BASE.rstrip("/") |
| def _maybe_add_dll_dirs(): | ||
| """Make DLL resolution more reliable (especially for packaged apps).""" | ||
| if sys.platform != "win32": | ||
| return | ||
| add_dir = getattr(os, "add_dll_directory", None) | ||
| if not add_dir: | ||
| return | ||
| for p in (os.path.dirname(sys.executable), | ||
| os.getcwd(), os.path.dirname(__file__)): | ||
| try: | ||
| if p and os.path.isdir(p): | ||
| add_dir(p) | ||
| except Exception: | ||
| pass |
There was a problem hiding this comment.
_maybe_add_dll_dirs() adds os.getcwd() to the DLL search path. This increases DLL preloading/hijacking risk if the working directory is attacker-controlled. Consider removing the current working directory from the default search list (prefer sys.executable dir + package dir), and rely on ATTESTATION_CLIENTLIB_PATH for custom locations.
| p = _mi_query_params( | ||
| {"ManagedIdentityIdType": "SystemAssigned", "Id": None}) | ||
| self.assertEqual(p["cred-api-version"], "2.0") | ||
| self.assertNotIn("client_id", p) | ||
|
|
||
| def test_mi_query_params_client_id(self): | ||
| p = _mi_query_params( | ||
| {"ManagedIdentityIdType": "ClientId", "Id": "abc"}) | ||
| self.assertEqual(p["client_id"], "abc") | ||
|
|
||
| def test_mi_query_params_object_id(self): | ||
| p = _mi_query_params( | ||
| {"ManagedIdentityIdType": "ObjectId", "Id": "oid"}) | ||
| self.assertEqual(p["object_id"], "oid") | ||
|
|
||
| def test_mi_query_params_resource_id(self): | ||
| p = _mi_query_params( | ||
| {"ManagedIdentityIdType": "ResourceId", "Id": "/sub/..."}) | ||
| self.assertEqual(p["msi_res_id"], "/sub/...") | ||
|
|
There was a problem hiding this comment.
The IMDS helper tests pass plain dicts into _mi_query_params() / _cert_cache_key(), but production code passes ManagedIdentity instances (UserDict). Add unit coverage using msal.UserAssignedManagedIdentity(...) and msal.SystemAssignedManagedIdentity() to ensure the helpers handle the real object types and correctly emit client_id/object_id/msi_res_id selectors.
| p = _mi_query_params( | |
| {"ManagedIdentityIdType": "SystemAssigned", "Id": None}) | |
| self.assertEqual(p["cred-api-version"], "2.0") | |
| self.assertNotIn("client_id", p) | |
| def test_mi_query_params_client_id(self): | |
| p = _mi_query_params( | |
| {"ManagedIdentityIdType": "ClientId", "Id": "abc"}) | |
| self.assertEqual(p["client_id"], "abc") | |
| def test_mi_query_params_object_id(self): | |
| p = _mi_query_params( | |
| {"ManagedIdentityIdType": "ObjectId", "Id": "oid"}) | |
| self.assertEqual(p["object_id"], "oid") | |
| def test_mi_query_params_resource_id(self): | |
| p = _mi_query_params( | |
| {"ManagedIdentityIdType": "ResourceId", "Id": "/sub/..."}) | |
| self.assertEqual(p["msi_res_id"], "/sub/...") | |
| p = _mi_query_params(msal.SystemAssignedManagedIdentity()) | |
| self.assertEqual(p["cred-api-version"], "2.0") | |
| self.assertNotIn("client_id", p) | |
| self.assertNotIn("object_id", p) | |
| self.assertNotIn("msi_res_id", p) | |
| def test_mi_query_params_client_id(self): | |
| p = _mi_query_params(msal.UserAssignedManagedIdentity(client_id="abc")) | |
| self.assertEqual(p["client_id"], "abc") | |
| self.assertNotIn("object_id", p) | |
| self.assertNotIn("msi_res_id", p) | |
| def test_mi_query_params_object_id(self): | |
| p = _mi_query_params(msal.UserAssignedManagedIdentity(object_id="oid")) | |
| self.assertEqual(p["object_id"], "oid") | |
| self.assertNotIn("client_id", p) | |
| self.assertNotIn("msi_res_id", p) | |
| def test_mi_query_params_resource_id(self): | |
| p = _mi_query_params( | |
| msal.UserAssignedManagedIdentity(resource_id="/sub/...")) | |
| self.assertEqual(p["msi_res_id"], "/sub/...") | |
| self.assertNotIn("client_id", p) | |
| self.assertNotIn("object_id", p) | |
| def test_cert_cache_key_system_assigned(self): | |
| key = _cert_cache_key(msal.SystemAssignedManagedIdentity()) | |
| self.assertNotIn("client_id", key) | |
| self.assertNotIn("object_id", key) | |
| self.assertNotIn("msi_res_id", key) | |
| def test_cert_cache_key_client_id(self): | |
| key = _cert_cache_key(msal.UserAssignedManagedIdentity(client_id="abc")) | |
| self.assertEqual(key["client_id"], "abc") | |
| self.assertNotIn("object_id", key) | |
| self.assertNotIn("msi_res_id", key) | |
| def test_cert_cache_key_object_id(self): | |
| key = _cert_cache_key(msal.UserAssignedManagedIdentity(object_id="oid")) | |
| self.assertEqual(key["object_id"], "oid") | |
| self.assertNotIn("client_id", key) | |
| self.assertNotIn("msi_res_id", key) | |
| def test_cert_cache_key_resource_id(self): | |
| key = _cert_cache_key( | |
| msal.UserAssignedManagedIdentity(resource_id="/sub/...")) | |
| self.assertEqual(key["msi_res_id"], "/sub/...") | |
| self.assertNotIn("client_id", key) | |
| self.assertNotIn("object_id", key) |
Summary
Adds MSI v2 support for acquiring mTLS Proof-of-Possession tokens on Windows Azure VMs with Credential Guard / KeyGuard. This mirrors the MSAL .NET implementation
(https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/tree/main/src/client/Microsoft.Identity.Client/ManagedIdentity/V2) using a two-package architecture.
Architecture
msalMicrosoft.Identity.Clientmsal-key-attestationMicrosoft.Identity.Client.KeyAttestationAttestationClientLib.dllnative bindingsWhen
with_attestation_support=True, MSAL auto-discovers themsal-key-attestationpackage via import. If not installed, a clear error with install instructions is raised.API
Flow (7 steps)
GET /getplatformmetadata→clientId,tenantId,cuId, attestation endpointcuIdOIDmsal-key-attestationcallsAttestationClientLib.dll→ MAA JWTPOST /issuecredentialwith CSR + attestation JWT → certificate + mTLS endpointPOST /tokento ESTS →mtls_popaccess tokenKey Features
MsiV2Error(no silent degradation)with_attestation_support=Truewithoutmtls_proof_of_possession=TrueraisesManagedIdentityErrorctypes— No C extension build required (ncrypt.dll,crypt32.dll,winhttp.dll)Files Changed
msal/msi_v2.pymsal/managed_identity.pyMsiV2Error+ new params + v2 dispatch logicmsal/__init__.pyMsiV2Errormsal-key-attestation/AttestationClientLib.dllbindings + MAA token cachetests/test_msi_v2.pymsal-key-attestation/tests/test_attestation.pysample/msi_v2_sample.pysample/MSI_V2_GUIDE.mdTest Results
References