Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/workos/user_management/_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -2197,6 +2197,9 @@ def authenticate_with_code_pkce(
code: str,
code_verifier: str,
client_id: Optional[str] = None,
ip_address: Optional[str] = None,
device_id: Optional[str] = None,
user_agent: Optional[str] = None,
request_options: Optional[RequestOptions] = None,
) -> AuthenticateResponse:
"""Exchange an authorization code using a PKCE code_verifier.
Expand All @@ -2205,6 +2208,9 @@ def authenticate_with_code_pkce(
code: The authorization code received from the redirect.
code_verifier: The PKCE code verifier generated alongside the code challenge.
client_id: The WorkOS client ID. Defaults to the client's configured ID.
ip_address: The IP address of the user's request.
device_id: A unique identifier for the device.
user_agent: The user agent string from the user's browser.
request_options: Per-request options for headers, timeout, retries, or base URL.

Returns:
Expand All @@ -2219,6 +2225,12 @@ def authenticate_with_code_pkce(
}
if self._client._api_key:
body["client_secret"] = self._client._api_key
if ip_address is not None:
body["ip_address"] = ip_address
if device_id is not None:
body["device_id"] = device_id
if user_agent is not None:
body["user_agent"] = user_agent

return cast(
AuthenticateResponse,
Expand Down Expand Up @@ -4361,6 +4373,9 @@ async def authenticate_with_code_pkce(
code: str,
code_verifier: str,
client_id: Optional[str] = None,
ip_address: Optional[str] = None,
device_id: Optional[str] = None,
user_agent: Optional[str] = None,
request_options: Optional[RequestOptions] = None,
) -> AuthenticateResponse:
"""Exchange an authorization code using a PKCE code_verifier.
Expand All @@ -4369,6 +4384,9 @@ async def authenticate_with_code_pkce(
code: The authorization code received from the redirect.
code_verifier: The PKCE code verifier generated alongside the code challenge.
client_id: The WorkOS client ID. Defaults to the client's configured ID.
ip_address: The IP address of the user's request.
device_id: A unique identifier for the device.
user_agent: The user agent string from the user's browser.
request_options: Per-request options for headers, timeout, retries, or base URL.

Returns:
Expand All @@ -4383,6 +4401,12 @@ async def authenticate_with_code_pkce(
}
if self._client._api_key:
body["client_secret"] = self._client._api_key
if ip_address is not None:
body["ip_address"] = ip_address
if device_id is not None:
body["device_id"] = device_id
if user_agent is not None:
body["user_agent"] = user_agent

return cast(
AuthenticateResponse,
Expand Down
54 changes: 54 additions & 0 deletions tests/test_inline_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,32 @@ def test_includes_client_secret_when_api_key_present(self, workos, httpx_mock):
body = json.loads(request.content)
assert "client_secret" in body

def test_forwards_radar_context(self, workos, httpx_mock):
httpx_mock.add_response(json=load_fixture("authenticate_response.json"))
workos.user_management.authenticate_with_code_pkce(
code="auth_code_123",
code_verifier="test_verifier_abc",
ip_address="203.0.113.42",
device_id="device_01HXYZ",
user_agent="Mozilla/5.0",
)
request = httpx_mock.get_request()
body = json.loads(request.content)
assert body["ip_address"] == "203.0.113.42"
assert body["device_id"] == "device_01HXYZ"
assert body["user_agent"] == "Mozilla/5.0"

def test_omits_radar_context_when_not_provided(self, workos, httpx_mock):
httpx_mock.add_response(json=load_fixture("authenticate_response.json"))
workos.user_management.authenticate_with_code_pkce(
code="auth_code_123", code_verifier="test_verifier_abc"
)
request = httpx_mock.get_request()
body = json.loads(request.content)
assert "ip_address" not in body
assert "device_id" not in body
assert "user_agent" not in body


@pytest.mark.asyncio
class TestAsyncAuthKitPKCECodeExchange:
Expand All @@ -148,6 +174,34 @@ async def test_sends_code_verifier(self, async_workos, httpx_mock):
body = json.loads(request.content)
assert body["code_verifier"] == "test_verifier_abc"

async def test_forwards_radar_context(self, async_workos, httpx_mock):
httpx_mock.add_response(json=load_fixture("authenticate_response.json"))
await async_workos.user_management.authenticate_with_code_pkce(
code="auth_code_123",
code_verifier="test_verifier_abc",
ip_address="203.0.113.42",
device_id="device_01HXYZ",
user_agent="Mozilla/5.0",
)
request = httpx_mock.get_request()
body = json.loads(request.content)
assert body["ip_address"] == "203.0.113.42"
assert body["device_id"] == "device_01HXYZ"
assert body["user_agent"] == "Mozilla/5.0"
Comment thread
greptile-apps[bot] marked this conversation as resolved.

async def test_omits_radar_context_when_not_provided(
self, async_workos, httpx_mock
):
httpx_mock.add_response(json=load_fixture("authenticate_response.json"))
await async_workos.user_management.authenticate_with_code_pkce(
code="auth_code_123", code_verifier="test_verifier_abc"
)
request = httpx_mock.get_request()
body = json.loads(request.content)
assert "ip_address" not in body
assert "device_id" not in body
assert "user_agent" not in body


class TestSSOPKCEAuthorizationUrl:
def test_returns_required_keys(self, workos):
Expand Down
Loading