From cbe564a3c9ea9d354cade1bad28108c14ea7c015 Mon Sep 17 00:00:00 2001 From: Tomasz Swierszcz Date: Fri, 12 Jun 2026 15:28:32 +0200 Subject: [PATCH 1/2] Add NGTS (Strata Cloud Manager) support Add NGTSConnection for Palo Alto Networks Next-Gen Trust Security, a VaaS-derived backend that reuses CloudConnection's outagedetection/v1 REST endpoints but differs in authentication and zone format: - Auth: Strata Cloud Manager OAuth2 client-credentials via a service account (client_id/client_secret/tsg_id), sent as a Bearer token and auto-refreshed ahead of the ~15-min expiry, instead of the api-key header. - Zone: a Certificate Issuing Template alias only (no Application\CIT split); requests omit applicationId. - Base and token URLs are environment-specific and must be supplied (no hardcoded default). Wire NGTSConnection into venafi_connection (auto-detected from token_url + client_id + client_secret, or platform=VenafiPlatform.NGTS) and add the VenafiPlatform.NGTS enum and NGTS fields on Authentication. Add a usage example, live tests (tests/test_ngts.py, gated on NGTS_* env vars) and offline unit tests, and ignore generated cert material. --- .gitignore | 10 +- examples/ngts/get_cert_ngts.py | 68 ++++++ tests/test_env.py | 9 + tests/test_local_methods.py | 98 ++++++++- tests/test_ngts.py | 87 ++++++++ vcert/__init__.py | 24 ++- vcert/common.py | 11 +- vcert/connection_ngts.py | 383 +++++++++++++++++++++++++++++++++ 8 files changed, 684 insertions(+), 6 deletions(-) create mode 100644 examples/ngts/get_cert_ngts.py create mode 100644 tests/test_ngts.py create mode 100644 vcert/connection_ngts.py diff --git a/.gitignore b/.gitignore index 43877db..a3c10b3 100644 --- a/.gitignore +++ b/.gitignore @@ -162,5 +162,13 @@ secrets.json venv venv3 venv27 +.venv vcert/example_cert.py -credentials \ No newline at end of file +credentials + +# Generated certificate material from example scripts / live tests (never commit) +/cert.pem +/cert.key +/key.pem +/chain.pem +/renewed_cert.pem \ No newline at end of file diff --git a/examples/ngts/get_cert_ngts.py b/examples/ngts/get_cert_ngts.py new file mode 100644 index 0000000..699b75d --- /dev/null +++ b/examples/ngts/get_cert_ngts.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 +# +# Copyright Venafi, Inc. and CyberArk Software Ltd. ("CyberArk") +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from vcert import (CertificateRequest, venafi_connection, VenafiPlatform) +import string +import random +import logging +from os import environ + +logging.basicConfig(level=logging.INFO) +logging.getLogger("urllib3").setLevel(logging.ERROR) + + +def main(): + # Get credentials from environment variables. + # NGTS (Palo Alto Networks Next-Gen Trust Security) authenticates with Strata Cloud Manager + # OAuth2 client credentials issued by a service account. Both the API base URL and the token + # URL differ per environment (dev/prod), so both must be supplied. + url = environ.get('NGTS_URL') # NGTS API base URL (e.g. https://api.sase.paloaltonetworks.com/ngts) + token_url = environ.get('NGTS_TOKEN_URL') # OAuth2 token endpoint (different FQDN, env-specific) + client_id = environ.get('NGTS_CLIENT_ID') # Service-account client id + client_secret = environ.get('NGTS_CLIENT_SECRET') # Service-account client secret + tsg_id = environ.get('NGTS_TSG_ID') # Tenant service group id (used to build the scope) + scope = environ.get('NGTS_SCOPE') # Optional: a ready "tsg_id:" scope + zone = environ.get('NGTS_ZONE') # Certificate Issuing Template alias (CIT-only) + + # The connection is chosen automatically: when token_url + client_id + client_secret are + # present, an NGTS connection is built. The platform can also be set explicitly: + # conn = venafi_connection(platform=VenafiPlatform.NGTS, ...) + conn = venafi_connection(url=url, token_url=token_url, client_id=client_id, client_secret=client_secret, + tsg_id=tsg_id, scope=scope) + + # Build a Certificate request + request = CertificateRequest(common_name=f"{random_word(10)}.venafi.example.com") + request.san_dns = ["www.dns.venafi.example.com", "ww1.dns.venafi.example.com"] + + # Request the certificate. + conn.request_cert(request, zone) + # Wait for the certificate to be retrieved (until ISSUED or timeout, 180s by default). + cert = conn.retrieve_cert(request) + + # Print the certificate + print(cert.full_chain) + # Save it into a file + with open("./cert.pem", "w") as f: + f.write(cert.full_chain) + + +def random_word(length): + letters = string.ascii_lowercase + return ''.join(random.choice(letters) for i in range(length)) + + +if __name__ == '__main__': + main() diff --git a/tests/test_env.py b/tests/test_env.py index e9f5aab..393b014 100644 --- a/tests/test_env.py +++ b/tests/test_env.py @@ -40,5 +40,14 @@ TPP_SSH_CADN = environ.get('TPP_SSH_CADN') +# NGTS (Palo Alto Networks Next-Gen Trust Security) +NGTS_URL = environ.get('NGTS_URL') +NGTS_TOKEN_URL = environ.get('NGTS_TOKEN_URL') +NGTS_CLIENT_ID = environ.get('NGTS_CLIENT_ID') +NGTS_CLIENT_SECRET = environ.get('NGTS_CLIENT_SECRET') +NGTS_TSG_ID = environ.get('NGTS_TSG_ID') +NGTS_SCOPE = environ.get('NGTS_SCOPE') +NGTS_ZONE = environ.get('NGTS_ZONE') + if RANDOM_DOMAIN and not isinstance(RANDOM_DOMAIN, text_type): RANDOM_DOMAIN = RANDOM_DOMAIN.decode() diff --git a/tests/test_local_methods.py b/tests/test_local_methods.py index 2bc8cd1..1490a3b 100644 --- a/tests/test_local_methods.py +++ b/tests/test_local_methods.py @@ -16,13 +16,16 @@ # import json import unittest +from unittest import mock from cryptography import x509 from cryptography.hazmat.backends import default_backend from assets import POLICY_CLOUD1, POLICY_TPP1, EXAMPLE_CSR, EXAMPLE_CHAIN from vcert import (CloudConnection, KeyType, TPPConnection, CertificateRequest, ZoneConfig, CertField, FakeConnection, - logger) + NGTSConnection, logger) +from vcert.connection_ngts import _parse_ngts_zone +from vcert.errors import ClientBadData, ServerUnexptedBehavior from vcert.pem import parse_pem, Certificate pkcs12_enc_cert = """-----BEGIN CERTIFICATE----- @@ -346,3 +349,96 @@ def test_pkcs12_plain_pk(self): cert = Certificate(cert=pkcs12_plain_cert, chain=chain, key=pkcs12_plain_pk) output = cert.as_pkcs12() log.info(f"PKCS12 created successfully:\n{output}") + + # -- NGTS (offline) ----------------------------------------------------------------------- + + @staticmethod + def _ngts_conn(**kwargs): + defaults = dict( + client_id="cid", + client_secret="csecret", + token_url="https://auth.example.com/oauth2/token", + tsg_id="123", + url="https://api.sase.paloaltonetworks.com/ngts", + ) + defaults.update(kwargs) + return NGTSConnection(**defaults) + + def test_parse_ngts_zone(self): + # CIT-only: the whole (trimmed) string is the template alias, no backslash split. + self.assertEqual(_parse_ngts_zone("MyTemplate"), "MyTemplate") + self.assertEqual(_parse_ngts_zone(" MyTemplate "), "MyTemplate") + # A backslash is NOT a separator for NGTS - it is part of the alias. + self.assertEqual(_parse_ngts_zone("App\\CIT"), "App\\CIT") + with self.assertRaises(ClientBadData): + _parse_ngts_zone("") + with self.assertRaises(ClientBadData): + _parse_ngts_zone(None) + + def test_ngts_scope_from_tsg_id(self): + conn = self._ngts_conn() + self.assertEqual(conn._scope, "tsg_id:123") + + def test_ngts_explicit_scope_wins(self): + conn = self._ngts_conn(scope="tsg_id:999", tsg_id=None) + self.assertEqual(conn._scope, "tsg_id:999") + + def test_ngts_requires_url(self): + with self.assertRaises(ClientBadData): + self._ngts_conn(url=None) + + def test_ngts_requires_token_url_without_access_token(self): + with self.assertRaises(ClientBadData): + self._ngts_conn(token_url=None) + + def test_ngts_requires_scope_or_tsg_id(self): + with self.assertRaises(ClientBadData): + self._ngts_conn(tsg_id=None, scope=None) + + def test_ngts_get_access_token(self): + conn = self._ngts_conn() + fake_resp = mock.MagicMock() + fake_resp.status_code = 200 + fake_resp.json.return_value = { + 'access_token': 'abc.def.ghi', + 'token_type': 'Bearer', + 'expires_in': 900, + 'scope': 'tsg_id:123', + } + with mock.patch('vcert.connection_ngts.requests.post', return_value=fake_resp) as post: + token = conn._get_access_token() + + self.assertEqual(token, 'abc.def.ghi') + self.assertEqual(conn._access_token, 'abc.def.ghi') + self.assertIsNotNone(conn._token_expires) + # client_id/client_secret go through HTTP Basic auth; the body carries grant_type + scope. + _, kwargs = post.call_args + self.assertEqual(kwargs['auth'], ('cid', 'csecret')) + self.assertEqual(kwargs['data']['grant_type'], 'client_credentials') + self.assertEqual(kwargs['data']['scope'], 'tsg_id:123') + + def test_ngts_access_token_rejects_non_bearer(self): + conn = self._ngts_conn() + fake_resp = mock.MagicMock() + fake_resp.status_code = 200 + fake_resp.json.return_value = {'access_token': 'x', 'token_type': 'mac', 'expires_in': 900} + with mock.patch('vcert.connection_ngts.requests.post', return_value=fake_resp): + with self.assertRaises(ServerUnexptedBehavior): + conn._get_access_token() + + def test_ngts_auth_header_is_bearer(self): + conn = self._ngts_conn(access_token='pre.issued.token', token_url=None) + headers = conn._auth_headers('application/json') + self.assertEqual(headers['Authorization'], 'Bearer pre.issued.token') + self.assertNotIn('tppl-api-key', headers) + + def test_ngts_get_sends_bearer_header(self): + conn = self._ngts_conn(access_token='pre.issued.token', token_url=None) + fake_resp = mock.MagicMock() + fake_resp.status_code = 200 + fake_resp.headers = {'content-type': 'application/json'} + fake_resp.json.return_value = {} + with mock.patch('vcert.connection_ngts.requests.get', return_value=fake_resp) as get: + conn._get("v1/certificateissuingtemplates") + _, kwargs = get.call_args + self.assertEqual(kwargs['headers']['Authorization'], 'Bearer pre.issued.token') diff --git a/tests/test_ngts.py b/tests/test_ngts.py new file mode 100644 index 0000000..b215e5c --- /dev/null +++ b/tests/test_ngts.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 +# +# Copyright Venafi, Inc. and CyberArk Software Ltd. ("CyberArk") +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Live tests for the NGTS (Palo Alto Networks Next-Gen Trust Security) connector. +# These hit a live backend and are skipped unless the NGTS_* credentials are present in the +# environment (see tests/test_env.py). +# +import binascii +import time +import unittest + +from cryptography.hazmat.primitives import hashes + +from test_env import (NGTS_URL, NGTS_TOKEN_URL, NGTS_CLIENT_ID, NGTS_CLIENT_SECRET, NGTS_TSG_ID, NGTS_SCOPE, + NGTS_ZONE) +from test_utils import random_word, enroll, renew, renew_by_thumbprint +from vcert import NGTSConnection, KeyType, logger +from vcert.common import RetireRequest + +log = logger.get_child("test-ngts") + +_HAS_CREDS = all([NGTS_URL, NGTS_TOKEN_URL, NGTS_CLIENT_ID, NGTS_CLIENT_SECRET, NGTS_ZONE]) \ + and (NGTS_TSG_ID or NGTS_SCOPE) + + +@unittest.skipUnless(_HAS_CREDS, "NGTS_* credentials are not set; skipping live NGTS tests") +class TestNGTSMethods(unittest.TestCase): + def setUp(self): + # Built in setUp (not __init__) so collecting this module without NGTS_* creds does not + # try to construct a connection - the class is skipped before setUp runs. + self.ngts_zone = NGTS_ZONE + self.ngts_conn = NGTSConnection(client_id=NGTS_CLIENT_ID, client_secret=NGTS_CLIENT_SECRET, + token_url=NGTS_TOKEN_URL, scope=NGTS_SCOPE, tsg_id=NGTS_TSG_ID, url=NGTS_URL) + + def test_ngts_auth(self): + token = self.ngts_conn.auth() + self.assertTrue(token) + self.assertIsNotNone(self.ngts_conn._token_expires) + + def test_ngts_enroll(self): + cn = f"{random_word(10)}.venafi.example.com" + enroll(self.ngts_conn, self.ngts_zone, cn) + + def test_ngts_renew(self): + cn = f"{random_word(10)}.venafi.example.com" + cert_id, pkey, cert, _, _ = enroll(self.ngts_conn, self.ngts_zone, cn) + time.sleep(5) + renew(self.ngts_conn, cert_id, pkey, cert.serial_number, cn) + + def test_ngts_renew_by_thumbprint(self): + cn = f"{random_word(10)}.venafi.example.com" + cert_id, pkey, cert, _, _ = enroll(self.ngts_conn, self.ngts_zone, cn) + time.sleep(5) + renew_by_thumbprint(self.ngts_conn, cert) + + def test_ngts_retire_by_thumbprint(self): + cn = f"{random_word(10)}.venafi.example.com" + cert_id, pkey, cert, _, _ = enroll(self.ngts_conn, self.ngts_zone, cn) + fingerprint = binascii.hexlify(cert.fingerprint(hashes.SHA1())).decode() + ret_request = RetireRequest(thumbprint=fingerprint) + self.assertTrue(self.ngts_conn.retire_cert(ret_request)) + + def test_ngts_read_zone_config(self): + zone = self.ngts_conn.read_zone_conf(self.ngts_zone) + self.assertIsNotNone(zone.policy) + self.assertTrue(len(zone.policy.key_types) > 0) + + def test_ngts_read_zone_invalid_zone(self): + with self.assertRaises(Exception): + self.ngts_conn.read_zone_conf(f"non-existent-cit-{random_word(8)}") + + +if __name__ == '__main__': + unittest.main() diff --git a/vcert/__init__.py b/vcert/__init__.py index 2ef1acc..02e5bfc 100644 --- a/vcert/__init__.py +++ b/vcert/__init__.py @@ -17,6 +17,7 @@ CustomField, Authentication, SCOPE_CM, SCOPE_PM, SCOPE_SSH, CSR_ORIGIN_LOCAL, CSR_ORIGIN_PROVIDED, CSR_ORIGIN_SERVICE, CHAIN_OPTION_FIRST, CHAIN_OPTION_IGNORE, CHAIN_OPTION_LAST, VenafiPlatform) from .connection_cloud import CloudConnection +from .connection_ngts import NGTSConnection from .connection_tpp import TPPConnection from .connection_tpp_token import TPPTokenConnection from .connection_fake import FakeConnection @@ -54,21 +55,28 @@ def Connection(url=None, token=None, user=None, password=None, fake=False, http_ def venafi_connection(url=None, api_key=None, user=None, password=None, access_token=None, refresh_token=None, - fake=False, http_request_kwargs=None, platform=None): + fake=False, http_request_kwargs=None, platform=None, client_id=None, client_secret=None, + token_url=None, scope=None, tsg_id=None): """ Return connection based on credentials list. CyberArk Platform (CyberArk Certificate Manager, Self-Hosted) requires URL and access_token (or user and password for getting a new access_token) Cloud requires api_key and optional URL + NGTS (Palo Alto Networks Next-Gen Trust Security) requires URL, token_url and OAuth2 service-account credentials (client_id, client_secret, tsg_id/scope) Fake requires no parameters - :param str url: CyberArk Certificate Manager, Self-Hosted or CyberArk Certificate Manager, SaaS URL (for Cloud is optional) + :param str url: CyberArk Certificate Manager, Self-Hosted / SaaS / NGTS URL (for Cloud is optional, required for NGTS) :param str api_key: CyberArk Certificate Manager, SaaS API Key :param str user: CyberArk Certificate Manager, Self-Hosted username for getting new tokens :param str password: CyberArk Certificate Manager, Self-Hosted password for getting new tokens - :param str access_token: CyberArk Certificate Manager, Self-Hosted access token + :param str access_token: CyberArk Certificate Manager, Self-Hosted access token (or a pre-issued NGTS access token) :param str refresh_token: CyberArk Certificate Manager, Self-Hosted refresh token (optional) :param bool fake: Use fake connection :param dict[str, Any] http_request_kwargs: Option for specifying trust bundle or to operate insecurely. :param VenafiPlatform platform: The platform to be used with the Connector + :param str client_id: NGTS OAuth2 service-account client id + :param str client_secret: NGTS OAuth2 service-account client secret + :param str token_url: NGTS OAuth2 token endpoint (differs per environment) + :param str scope: NGTS OAuth2 scope (``tsg_id:``); derived from tsg_id when omitted + :param str tsg_id: NGTS tenant service group id :rtype CommonConnection: """ if platform: @@ -79,11 +87,21 @@ def venafi_connection(url=None, api_key=None, user=None, password=None, access_t refresh_token=refresh_token, http_request_kwargs=http_request_kwargs) elif platform == VenafiPlatform.VAAS: return CloudConnection(token=api_key, url=url, http_request_kwargs=http_request_kwargs) + elif platform == VenafiPlatform.NGTS: + return NGTSConnection(client_id=client_id, client_secret=client_secret, token_url=token_url, scope=scope, + tsg_id=tsg_id, access_token=access_token, url=url, + http_request_kwargs=http_request_kwargs) else: raise VenafiError(f"Invalid Platform: {platform}. Cannot instantiate a Connector.") else: if fake: return FakeConnection() + # NGTS is detected before the TPP/Cloud branches so its OAuth service-account credentials + # are not shadowed by them. + if token_url and client_id and client_secret: + return NGTSConnection(client_id=client_id, client_secret=client_secret, token_url=token_url, scope=scope, + tsg_id=tsg_id, access_token=access_token, url=url, + http_request_kwargs=http_request_kwargs) if url and (access_token or refresh_token or (user and password)): return TPPTokenConnection(url=url, user=user, password=password, access_token=access_token, refresh_token=refresh_token, http_request_kwargs=http_request_kwargs) diff --git a/vcert/common.py b/vcert/common.py index 3b55d4c..eb8742c 100644 --- a/vcert/common.py +++ b/vcert/common.py @@ -609,7 +609,8 @@ def __init__(self, req_id=None, thumbprint=None, guid=None, description=None): class Authentication: def __init__(self, user=None, password=None, access_token=None, refresh_token=None, api_key=None, state=None, - token_expires=None, client_id=CLIENT_ID, scope=SCOPE_CM): + token_expires=None, client_id=CLIENT_ID, scope=SCOPE_CM, client_secret=None, token_url=None, + tsg_id=None): self.user = user self.password = password self.access_token = access_token @@ -617,6 +618,13 @@ def __init__(self, user=None, password=None, access_token=None, refresh_token=No self.token_expires = token_expires self.api_key = api_key self.client_id = client_id + self.client_secret = client_secret + self.token_url = token_url + self.tsg_id = tsg_id + # NGTS OAuth scope is structured as "tsg_id:" (tenant service group id). + # Derive it from tsg_id when an explicit scope was not supplied. + if tsg_id and scope == SCOPE_CM: + scope = f"tsg_id:{tsg_id}" self.scope = scope self.state = state @@ -772,4 +780,5 @@ def __new__(cls, value, description): FAKE = 100, "Connector for testing purposes" TPP = 200, "Trust Protection Platform" + NGTS = 300, "Next-Gen Trust Security" VAAS = 400, "Venafi as a Service" diff --git a/vcert/connection_ngts.py b/vcert/connection_ngts.py new file mode 100644 index 0000000..2800ffa --- /dev/null +++ b/vcert/connection_ngts.py @@ -0,0 +1,383 @@ +# +# Copyright Venafi, Inc. and CyberArk Software Ltd. ("CyberArk") +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import re +from datetime import datetime, timedelta + +import requests + +from .common import (ZoneConfig, CertField, CertificateRequest, KeyType, get_ip_address, MIME_JSON, MIME_ANY, + CSR_ORIGIN_SERVICE) +from .connection_cloud import CloudConnection, URLS, APPLICATION_SERVER_TYPE_ID +from .errors import (VenafiConnectionError, ServerUnexptedBehavior, ClientBadData, CertificateRequestError, + CertificateRenewError, VenafiError) +from .http_status import HTTPStatus +from .logger import get_child + +# OAuth2 access tokens issued by Strata Cloud Manager live ~15 minutes. Refresh a little +# ahead of expiry so in-flight calls never race the boundary (mirrors Go's +# tokenBufferToExpiryWindow). +TOKEN_EXPIRY_BUFFER_SECONDS = 120 +OAUTH_TOKEN_TYPE = "Bearer" # nosec B105 +DEFAULT_TOKEN_LIFESPAN_SECONDS = 900 + +log = get_child("connection-ngts") + + +def _parse_ngts_zone(zone): + """ + NGTS zones are a Certificate Issuing Template alias only - the entire zone string is the + template name. Unlike Cloud/VaaS there is no ``Application\\CIT`` split and no Applications API. + + :param str zone: + :rtype: str + """ + if not zone: + log.error("Invalid Zone. It is empty") + raise ClientBadData("You need to specify a zone") + return zone.strip() + + +class NGTSConnection(CloudConnection): + """ + Connector for Palo Alto Networks Next-Gen Trust Security (NGTS). + + NGTS is VaaS-derived: it reuses the same ``outagedetection/v1/*`` REST endpoints as + :class:`CloudConnection`. Only authentication and zone format differ: + + - Auth is Strata Cloud Manager OAuth2 client-credentials via a service account + (Client ID, Client Secret, TSG ID). Resource calls carry ``Authorization: Bearer `` + instead of the ``tppl-api-key`` header. + - Zones are a Certificate Issuing Template alias only (no ``Application\\CIT`` split), and + request payloads omit ``applicationId``. + """ + + def __init__(self, client_id, client_secret, token_url, scope=None, tsg_id=None, access_token=None, url=None, + http_request_kwargs=None): + # The NGTS API base URL and token URL both differ per environment (dev/prod), including the + # path, so neither can be hardcoded - both must be supplied by the caller. + if not url: + raise ClientBadData("NGTS requires the API base URL (it differs per environment)") + if not access_token and not token_url: + raise ClientBadData("NGTS requires the token URL (it differs per environment) " + "when no access_token is supplied") + + if not scope: + if not tsg_id: + raise ClientBadData("NGTS requires either a scope or a tsg_id") + scope = f"tsg_id:{tsg_id}" + + # CloudConnection.__init__ normalizes/verifies the base URL and sets up + # self._http_request_kwargs. The Bearer token replaces the api-key token entirely. + super().__init__(token=None, url=url, http_request_kwargs=http_request_kwargs) + + self._client_id = client_id + self._client_secret = client_secret + self._token_url = token_url + self._scope = scope + self._tsg_id = tsg_id + self._access_token = access_token + self._token_expires = None + + def __str__(self): + return f"[NGTS] {self._base_url}" + + def _normalize_and_verify_base_url(self): + # Unlike Cloud (host-only), NGTS base URLs carry an environment-specific path + # (e.g. https://api.sase.paloaltonetworks.com/ngts), so path segments must be allowed. + u = self._base_url + if u.startswith('http://'): + u = f"https://{u[7:]}" + elif not u.startswith('https://'): + u = f"https://{u}" + if not u.endswith("/"): + u += "/" + if not re.match(r"^https://[a-z\d]+[-a-z\d.]+[a-z\d][:\d]*(/[-a-zA-Z\d._~]+)*/$", u): + raise ClientBadData + self._base_url = u + + # -- Authentication -------------------------------------------------------------------------- + + def _get_access_token(self): + """ + Fetch an OAuth2 access token via the client-credentials grant. ``client_id``/ + ``client_secret`` are sent through HTTP Basic auth; the body carries only ``grant_type`` + and the structured ``scope`` (``tsg_id:``). + + :rtype: str + """ + if not self._client_id or not self._client_secret: + raise ClientBadData("client_id and client_secret are required to fetch an access token") + + headers = {'Content-Type': 'application/x-www-form-urlencoded'} + data = { + 'grant_type': 'client_credentials', + 'scope': self._scope, + } + r = requests.post(self._token_url, data=data, auth=(self._client_id, self._client_secret), + headers=headers, **self._http_request_kwargs) # nosec B113 + if r.status_code != HTTPStatus.OK: + log.error(f"Failed to obtain access token. Server status: {r.status_code}") + raise VenafiConnectionError(f"Failed to obtain access token. Server status: {r.status_code}") + + response = r.json() + token_type = response.get('token_type') + if token_type != OAUTH_TOKEN_TYPE: + log.error(f"Unexpected token type: {token_type}") + raise ServerUnexptedBehavior(f"Unexpected token type: {token_type}") + + self._access_token = response.get('access_token') + if not self._access_token: + raise ServerUnexptedBehavior("Access token missing from token response") + + expires_in = response.get('expires_in', DEFAULT_TOKEN_LIFESPAN_SECONDS) + self._token_expires = datetime.now() + timedelta(seconds=expires_in - TOKEN_EXPIRY_BUFFER_SECONDS) + return self._access_token + + def auth(self): + """ + Use a valid supplied access token, otherwise fetch a new one. + """ + if not (self._access_token and self._token_is_valid()): + self._get_access_token() + return self._access_token + + def _token_is_valid(self): + """ + :rtype: bool + """ + if not self._access_token: + return False + # A supplied token without a known expiry is taken at face value. + if self._token_expires is None: + return True + return datetime.now() < self._token_expires + + def _ensure_token(self): + """ + Lazily (re-)fetch the access token when it is missing or near expiry and client + credentials are available. Simpler than Go's background renewal goroutine and adequate + for an SDK that authenticates at call time. + """ + if self._token_is_valid(): + return + if self._client_id and self._client_secret: + self._get_access_token() + elif not self._access_token: + raise ClientBadData("No valid access token and no client credentials to obtain one") + + def _auth_headers(self, accept): + """ + :param str accept: + :rtype: dict[str, str] + """ + return { + 'Authorization': f"Bearer {self._access_token}", + 'accept': accept, + 'cache-control': "no-cache", + } + + # -- HTTP verbs (Bearer auth instead of tppl-api-key) -------------------------------------- + + def _get(self, url, params=None): + self._ensure_token() + headers = self._auth_headers(MIME_ANY) + r = requests.get(self._base_url + url, params=params, headers=headers, + **self._http_request_kwargs) # nosec B113 + return self.process_server_response(r) + + def _post(self, url, data=None): + self._ensure_token() + headers = self._auth_headers(MIME_JSON) + if isinstance(data, dict): + r = requests.post(self._base_url + url, json=data, headers=headers, + **self._http_request_kwargs) # nosec B113 + else: + log.error(f"Unexpected client data type: {type(data)} for {url}") + raise ClientBadData + return self.process_server_response(r) + + def _put(self, url, data=None): + self._ensure_token() + headers = self._auth_headers(MIME_JSON) + if isinstance(data, dict): + r = requests.put(self._base_url + url, json=data, headers=headers, + **self._http_request_kwargs) # nosec B113 + else: + log.error(f"Unexpected client data type: {type(data)} for {url}") + raise ClientBadData + return self.process_server_response(r) + + # -- Certificate lifecycle (deltas vs Cloud) ---------------------------------------------- + + def _get_cit_or_fail(self, zone): + """ + Resolve the Certificate Issuing Template for an NGTS (CIT-only) zone via the global + template list. + + :param str zone: + :rtype: dict + """ + cit = self._get_cit(_parse_ngts_zone(zone)) + if not cit: + log.error(f"Certificate issuing template not found for zone [{zone}]") + raise VenafiError(f"Certificate issuing template not found for zone [{zone}]") + return cit + + def request_cert(self, request, zone): + cit = self._get_cit_or_fail(zone) + cit_id = cit['id'] + + ip_address = get_ip_address() + request_data = { + 'certificateIssuingTemplateId': cit_id, + 'apiClientInformation': { + 'type': request.origin, + 'identifier': ip_address + } + } + zone_config = self.read_zone_conf(zone) + request.update_from_zone_config(zone_config) + + if request.csr_origin != CSR_ORIGIN_SERVICE: + if not request.csr: + request.build_csr() + request_data['certificateSigningRequest'] = request.csr + else: + request_data['isVaaSGenerated'] = True + request_data['applicationServerTypeId'] = APPLICATION_SERVER_TYPE_ID + request_data['csrAttributes'] = self._get_service_generated_csr_attr(request, zone) + + if request.validity_hours is not None: + request_data['validityPeriod'] = f"PT{request.validity_hours}H" + + status, data = self._post(URLS.CERTIFICATE_REQUESTS, data=request_data) + if status == HTTPStatus.CREATED: + request.id = data['certificateRequests'][0]['id'] + if 'certificateIds' in data['certificateRequests'][0] \ + and len(data['certificateRequests'][0]['certificateIds']) > 0: + request.cert_guid = data['certificateRequests'][0]['certificateIds'][0] + return True + else: + log.error(f"unexpected server response {status}: {data}") + raise CertificateRequestError + + def renew_cert(self, request, reuse_key=False): + cert_request_id = None + if not request.id and not request.thumbprint: + log.error("prev_cert_id or thumbprint or manage_id must be specified for renewing certificate") + raise ClientBadData + + if request.thumbprint: + response = self.search_by_thumbprint(request.thumbprint) + cert_request_id = response.csrId + + if request.id: + cert_request_id = request.id + + prev_request = self._get_cert_status(CertificateRequest(cert_id=cert_request_id)) + certificate_id = prev_request.certificateIds[0] + cit_id = prev_request.citId + + if not certificate_id or not cit_id: + log.error("Can't find certificate_id") + raise ClientBadData + + status, data = self._get(URLS.CERTIFICATE_BY_ID.format(certificate_id)) + if status == HTTPStatus.OK: + request.id = data['certificateRequestId'] + else: + raise ServerUnexptedBehavior + + ip_address = get_ip_address() + d = {'existingCertificateId': certificate_id, + 'certificateIssuingTemplateId': cit_id, + 'apiClientInformation': { + 'type': request.origin, + 'identifier': ip_address + }} + + if reuse_key: + if request.csr: + d['certificateSigningRequest'] = request.csr + d['reuseCSR'] = False + else: + log.error("Certificate renew by reusing the CSR is not supported right now. " + "Set [reuse_key] to False or just remove it") + raise VenafiError + else: + c = data + if c.get('subjectCN'): + request.common_name = c['subjectCN'][0] + if c.get('subjectC'): + request.country = c['subjectC'] + if c.get('subjectO'): + request.organization = c['subjectO'] + if c.get('subjectOU'): + request.organizational_unit = c['subjectOU'] + if c.get('subjectL'): + request.locality = c['subjectL'] + if c.get('subjectAlternativeNameDns'): + request.san_dns = c['subjectAlternativeNameDns'] + request.key_type = KeyType(KeyType.RSA, c['keyStrength']) + request.build_csr() + d['certificateSigningRequest'] = request.csr + d['reuseCSR'] = False + + status, data = self._post(URLS.CERTIFICATE_REQUESTS, data=d) + if status == HTTPStatus.CREATED: + request.id = data['certificateRequests'][0]['id'] + return True + else: + log.error(f"server unexpected status {status}") + raise CertificateRenewError + + def read_zone_conf(self, zone): + cit = self._get_cit_or_fail(zone) + policy = self._parse_policy_response_to_object(cit) + rs = policy.recommended_settings + org = CertField("") + org_unit = CertField("") + locality = CertField("") + state = CertField("") + country = CertField("") + if rs: + org = CertField(rs.subjectOValue) + org_unit = CertField(rs.subjectOUValue) + locality = CertField(rs.subjectLValue) + state = CertField(rs.subjectSTValue) + country = CertField(rs.subjectCValue) + + z = ZoneConfig( + organization=org, + organizational_unit=org_unit, + country=country, + province=state, + locality=locality, + policy=policy, + key_type=policy.key_types[0] if policy.key_types else None, + ) + return z + + # -- Out of scope for NGTS ---------------------------------------------------------------- + + def get_policy(self, zone): + raise NotImplementedError + + def set_policy(self, zone, policy_spec): + raise NotImplementedError + + def get_version(self): + raise NotImplementedError From f0596f6d68cf62d2869a52576572152b1fd5b063 Mon Sep 17 00:00:00 2001 From: Tomasz Swierszcz Date: Tue, 16 Jun 2026 15:30:24 +0200 Subject: [PATCH 2/2] feat(ngts): default url/token_url and harden the token_url credential sink Make url and token_url optional (default to Palo Alto production), so NGTS works with service-account credentials alone. Since token_url is where credentials are exchanged, add safeguards: force HTTPS, warn on default fallback, and warn when the host is outside .paloaltonetworks.com. Also anchor scope validation, simplify NGTS auto-detection (client_id + client_secret), and add README-NGTS.md. --- README-NGTS.md | 206 +++++++++++++++++++++++++++++++++ examples/ngts/get_cert_ngts.py | 13 ++- tests/test_local_methods.py | 91 ++++++++++++--- vcert/__init__.py | 11 +- vcert/connection_ngts.py | 103 +++++++++++++++-- 5 files changed, 390 insertions(+), 34 deletions(-) create mode 100644 README-NGTS.md diff --git a/README-NGTS.md b/README-NGTS.md new file mode 100644 index 0000000..3c5578a --- /dev/null +++ b/README-NGTS.md @@ -0,0 +1,206 @@ +[![Apache 2.0 License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) +![Community Supported](https://img.shields.io/badge/Support%20Level-Community-brightgreen) +![Compatible with Palo Alto NGTS](https://img.shields.io/badge/Compatibility-Palo_Alto_NGTS-f9a90c) + +_**This open source project is community-supported.** To report a problem or share an idea, use +**[Issues](https://github.com/Venafi/vcert-python/issues)**; and if you have a suggestion for fixing the issue, please include those details, too. +In addition, use **[Pull Requests](https://github.com/Venafi/vcert-python/pulls)** to contribute actual bug fixes or proposed enhancements. +We welcome and appreciate all contributions. Got questions or want to discuss something with our team? +**[Join us on Slack](https://join.slack.com/t/venafi-integrations/shared_invite/zt-i8fwc379-kDJlmzU8OiIQOJFSwiA~dg)**!_ + +# VCert Python SDK for Palo Alto Networks Next-Gen Trust Security (NGTS) + +VCert Python is a library and SDK designed to simplify key generation and certificate +enrollment. This guide covers using it against **Palo Alto Networks Next-Gen Trust Security +(NGTS)**, also known as Strata Cloud Manager. + +> 📌 **NOTE:** Unlike the [Go VCert](https://github.com/Venafi/vcert) project, vcert-python is +> **SDK-only** — there is no CLI, playbook, or certificate provisioning. NGTS support in this +> SDK is **certificate-lifecycle only**: `get_policy`/`set_policy`, SSH, and `get_version` +> raise `NotImplementedError`. + +## Quick Links + +- [Prerequisites](#prerequisites) +- [Connecting](#connecting) + - [Connection Parameters](#connection-parameters) + - [API URL Default and Token URL](#api-url-default-and-token-url) +- [Zone Format](#zone-format) +- [Examples](#examples) + - [Connect with service-account credentials](#connect-with-service-account-credentials) + - [Connect with a pre-issued access token](#connect-with-a-pre-issued-access-token) + - [Request and retrieve a certificate](#request-and-retrieve-a-certificate) + - [Renew a certificate](#renew-a-certificate) + +## Prerequisites + +1. The Palo Alto Networks NGTS API is reachable from where your code runs. The production + endpoint is `https://api.strata.paloaltonetworks.com/ngts`. +2. A Palo Alto Networks NGTS service account has been registered and granted the permissions + needed for the operations you use. See the + [Palo Alto Networks service account documentation](https://pan.dev/scm/docs/service-accounts/). + Minimum permissions per SDK operation: + + | SDK operation | Required NGTS permissions | + |---|---| + | `request_cert` (enroll) | `ngts.certificate_issuing_template.get`, `ngts.certificate_request.create`, `ngts.certificate_request.get`, `ngts.certificate_content.get` | + | `retrieve_cert` (pickup) | `ngts.certificate_request.get`, `ngts.certificate.get`, `ngts.edge_encryption_key.get`, `ngts.certificate_content.get` | + | `renew_cert` | `ngts.certificate.search`, `ngts.certificate_content.get`, `ngts.certificate_request.get`, `ngts.certificate.get`, `ngts.certificate_request.create` | + | `revoke_cert` | _(see your NGTS CA Account configuration)_ | + | `retire_cert` | `ngts.certificate.search`, `ngts.certificate_request.get`, `ngts.certificate.retire` | + +3. You have **either** an OAuth access token, **or** service-account credentials (Client ID, + Client Secret, and a TSG ID or scope) the SDK can use to obtain one. +4. A CA Account and an Issuing Template (CIT) exist and are configured with Recommended + Settings (OU, O, L, ST, C) and appropriate Issuing Rules. You know the Issuing Template's + **API alias**. + +## Connecting + +Use `venafi_connection`. NGTS is selected either explicitly via +`platform=VenafiPlatform.NGTS`, or automatically when `client_id` and `client_secret` are +supplied (these are NGTS-specific, so they aren't shadowed by the TPP/Cloud branches). + +```python +from vcert import venafi_connection, VenafiPlatform + +# Explicit platform selection (token_url defaults to production; override for non-prod) +conn = venafi_connection( + platform=VenafiPlatform.NGTS, + client_id="", + client_secret="", + tsg_id="", +) + +# Auto-detection (client_id + client_secret present) +conn = venafi_connection( + client_id="", + client_secret="", + tsg_id="", +) +``` + +### Connection Parameters + +| Parameter | Required | Description | +|---|---|---| +| `client_id` | yes¹ | Service-account Client ID used to obtain an access token. | +| `client_secret` | yes¹ | Service-account Client Secret used to obtain an access token. | +| `tsg_id` | yes² | Tenant Service Group ID. Used to derive the OAuth scope as `tsg_id:`. | +| `scope` | yes² | OAuth scope (e.g. `tsg_id:`). Takes precedence over `tsg_id` when supplied. | +| `access_token` | no¹ | A pre-issued OAuth access token. When supplied, `client_id`/`client_secret` become optional (but are still used to refresh the token if present). | +| `token_url` | no | OAuth token endpoint. Defaults to the Palo Alto production endpoint (see below); override it for non-production environments. | +| `url` | no | NGTS API base URL. Defaults to the Palo Alto production endpoint (see below). | +| `http_request_kwargs` | no | Passed through to `requests` (e.g. a trust bundle via `verify`). | + +¹ Provide **either** `access_token`, **or** `client_id` + `client_secret`. +² Provide **either** `scope`, **or** `tsg_id` (used to derive the scope). + +### API URL and Token URL Defaults + +Both `url` (API base URL) and `token_url` (OAuth token endpoint) are optional. When omitted +they default to the published Palo Alto **production** endpoints — supply them only for +non-production environments: + +| Parameter | Default | +|---|---| +| `url` | `https://api.strata.paloaltonetworks.com/ngts` | +| `token_url` | `https://auth.apps.paloaltonetworks.com/auth/v1/oauth2/access_token` | + +The production token endpoint is taken from the Palo Alto SASE auth API +([reference](https://pan.dev/sase/api/auth/post-auth-v-1-oauth-2-access-token/)). + +> **Note:** Defaulting `token_url` is a deliberate divergence from the Go VCert +> implementation, which still requires the token URL. It is a planned-but-not-yet-upstreamed +> change. + +#### Safeguards around `token_url` + +`token_url` is the **credential sink** — your service-account `client_id`/`client_secret` are +exchanged there via HTTP Basic auth — so the SDK guards it without giving up the default: + +- **HTTPS is enforced.** An `http://` `token_url` is upgraded to `https://` (with a warning) and + a scheme-less value is assumed to be `https://`, so credentials never travel in cleartext. +- **Defaulting is logged.** Omitting `token_url` logs a **WARNING** before falling back to the + production endpoint. This is intentional: if you target a **non-production** tenant but forget to + set `token_url`, your non-prod credentials would otherwise be sent to the **production** token + endpoint silently. Always set `token_url` explicitly for non-production. +- **Untrusted hosts are flagged.** A `token_url` whose host is outside `.paloaltonetworks.com` + (which covers both production and the documented dev endpoints) logs a **WARNING**, surfacing a + typo'd or hostile endpoint that would leak credentials. + +The last two **warn rather than block**, so a future legitimate endpoint on another domain still +works — but you should treat these warnings as a prompt to double-check your configuration. + +## Zone Format + +For NGTS, the **zone is the Issuing Template (CIT) API alias only**. There is no +`Application\IssuingTemplate` split as there is for CyberArk Certificate Manager, SaaS — the +entire string (trimmed) is the template alias, and a backslash is part of the alias, not a +separator. + +```python +zone = "PublicTrust" # the Issuing Template API alias +``` + +## Examples + +For the examples below, assume the Issuing Template has an API alias of `PublicTrust`. + +### Connect with service-account credentials + +```python +from vcert import venafi_connection, VenafiPlatform + +conn = venafi_connection( + platform=VenafiPlatform.NGTS, + token_url="", + client_id="", + client_secret="", + tsg_id="", +) +``` + +### Connect with a pre-issued access token + +```python +from vcert import venafi_connection, VenafiPlatform + +conn = venafi_connection( + platform=VenafiPlatform.NGTS, + access_token="eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...", +) +``` + +### Request and retrieve a certificate + +```python +from vcert import CertificateRequest + +zone = "PublicTrust" +request = CertificateRequest(common_name="first-time.venafi.example") +request.san_dns = ["first-san.venafi.example", "second-san.venafi.example"] + +conn.request_cert(request, zone) +cert = conn.retrieve_cert(request) # polls until the certificate is issued + +print(cert.cert) # end-entity certificate (PEM) +print(cert.chain) # chain certificates (PEM) +print(request.private_key_pem) +``` + +### Renew a certificate + +```python +from vcert import CertificateRequest + +# Renew by the enrollment (pickup) id of the existing certificate +request = CertificateRequest(cert_id="{7428fac3-d0e8-4679-9f48-d9e867a326ca}") +conn.renew_cert(request) +cert = conn.retrieve_cert(request) +``` + +--- + +For backend-neutral SDK usage (request/retrieve/renew/revoke data objects, output formats), +see the main [README](README.md). diff --git a/examples/ngts/get_cert_ngts.py b/examples/ngts/get_cert_ngts.py index 699b75d..e23cf87 100644 --- a/examples/ngts/get_cert_ngts.py +++ b/examples/ngts/get_cert_ngts.py @@ -27,18 +27,19 @@ def main(): # Get credentials from environment variables. # NGTS (Palo Alto Networks Next-Gen Trust Security) authenticates with Strata Cloud Manager - # OAuth2 client credentials issued by a service account. Both the API base URL and the token - # URL differ per environment (dev/prod), so both must be supplied. - url = environ.get('NGTS_URL') # NGTS API base URL (e.g. https://api.sase.paloaltonetworks.com/ngts) - token_url = environ.get('NGTS_TOKEN_URL') # OAuth2 token endpoint (different FQDN, env-specific) + # OAuth2 client credentials issued by a service account. The API base URL and the token URL + # both default to the Palo Alto production endpoints; supply them only for non-production + # environments (unset env vars fall back to None -> the production defaults). + url = environ.get('NGTS_URL') # Optional NGTS API base URL (defaults to production) + token_url = environ.get('NGTS_TOKEN_URL') # Optional OAuth2 token endpoint (defaults to production) client_id = environ.get('NGTS_CLIENT_ID') # Service-account client id client_secret = environ.get('NGTS_CLIENT_SECRET') # Service-account client secret tsg_id = environ.get('NGTS_TSG_ID') # Tenant service group id (used to build the scope) scope = environ.get('NGTS_SCOPE') # Optional: a ready "tsg_id:" scope zone = environ.get('NGTS_ZONE') # Certificate Issuing Template alias (CIT-only) - # The connection is chosen automatically: when token_url + client_id + client_secret are - # present, an NGTS connection is built. The platform can also be set explicitly: + # The connection is chosen automatically: when client_id + client_secret are present, an + # NGTS connection is built. The platform can also be set explicitly: # conn = venafi_connection(platform=VenafiPlatform.NGTS, ...) conn = venafi_connection(url=url, token_url=token_url, client_id=client_id, client_secret=client_secret, tsg_id=tsg_id, scope=scope) diff --git a/tests/test_local_methods.py b/tests/test_local_methods.py index 1490a3b..a2d41b6 100644 --- a/tests/test_local_methods.py +++ b/tests/test_local_methods.py @@ -15,6 +15,7 @@ # limitations under the License. # import json +import logging import unittest from unittest import mock @@ -24,7 +25,8 @@ from assets import POLICY_CLOUD1, POLICY_TPP1, EXAMPLE_CSR, EXAMPLE_CHAIN from vcert import (CloudConnection, KeyType, TPPConnection, CertificateRequest, ZoneConfig, CertField, FakeConnection, NGTSConnection, logger) -from vcert.connection_ngts import _parse_ngts_zone +from vcert.connection_ngts import (_parse_ngts_zone, DEFAULT_API_URL, DEFAULT_TOKEN_URL, + TRUSTED_TOKEN_HOST_SUFFIX) from vcert.errors import ClientBadData, ServerUnexptedBehavior from vcert.pem import parse_pem, Certificate @@ -358,8 +360,8 @@ def _ngts_conn(**kwargs): client_id="cid", client_secret="csecret", token_url="https://auth.example.com/oauth2/token", - tsg_id="123", - url="https://api.sase.paloaltonetworks.com/ngts", + tsg_id="1000000001", + url="https://api.strata.paloaltonetworks.com/ngts", ) defaults.update(kwargs) return NGTSConnection(**defaults) @@ -377,24 +379,85 @@ def test_parse_ngts_zone(self): def test_ngts_scope_from_tsg_id(self): conn = self._ngts_conn() - self.assertEqual(conn._scope, "tsg_id:123") + self.assertEqual(conn._scope, "tsg_id:1000000001") def test_ngts_explicit_scope_wins(self): - conn = self._ngts_conn(scope="tsg_id:999", tsg_id=None) - self.assertEqual(conn._scope, "tsg_id:999") + conn = self._ngts_conn(scope="tsg_id:9999999999", tsg_id=None) + self.assertEqual(conn._scope, "tsg_id:9999999999") - def test_ngts_requires_url(self): - with self.assertRaises(ClientBadData): - self._ngts_conn(url=None) + def test_ngts_url_defaults_to_palo_alto_production(self): + # Omitting url falls back to the published production endpoint (matches Go's normalizeURL); + # CloudConnection's normalizer appends a trailing slash. + conn = self._ngts_conn(url=None) + self.assertEqual(conn._base_url, DEFAULT_API_URL + "/") - def test_ngts_requires_token_url_without_access_token(self): - with self.assertRaises(ClientBadData): - self._ngts_conn(token_url=None) + def test_ngts_token_url_defaults_to_palo_alto_production(self): + # Omitting token_url falls back to the published production OAuth2 endpoint. (Deliberate + # divergence from Go, which still requires the token URL.) + conn = self._ngts_conn(token_url=None) + self.assertEqual(conn._token_url, DEFAULT_TOKEN_URL) def test_ngts_requires_scope_or_tsg_id(self): with self.assertRaises(ClientBadData): self._ngts_conn(tsg_id=None, scope=None) + def test_ngts_rejects_non_10_digit_tsg_id(self): + # Palo Alto TSG IDs are 10-digit integers (matches Go's validateScope regex). + with self.assertRaises(ClientBadData): + self._ngts_conn(tsg_id="123") + + def test_ngts_rejects_malformed_scope(self): + with self.assertRaises(ClientBadData): + self._ngts_conn(scope="1000000001", tsg_id=None) # missing tsg_id: prefix + with self.assertRaises(ClientBadData): + self._ngts_conn(scope="tsg_id:notdigits", tsg_id=None) + + def test_ngts_scope_is_anchored(self): + # fullmatch (not search): too-many digits or surrounding garbage must be rejected. + with self.assertRaises(ClientBadData): + self._ngts_conn(scope="tsg_id:10000000011", tsg_id=None) # 11 digits + with self.assertRaises(ClientBadData): + self._ngts_conn(scope="x tsg_id:1000000001 x", tsg_id=None) # embedded + + def test_ngts_token_url_forced_to_https(self): + # Credentials are sent to token_url via Basic auth, so an http:// URL is upgraded. + conn = self._ngts_conn(token_url="http://auth.example.com/oauth2/token") + self.assertEqual(conn._token_url, "https://auth.example.com/oauth2/token") + # A scheme-less URL is assumed to be https. + conn = self._ngts_conn(token_url="auth.example.com/oauth2/token") + self.assertEqual(conn._token_url, "https://auth.example.com/oauth2/token") + + def test_ngts_token_url_fallback_warns(self): + # Omitting token_url silently used to be impossible (it errored); now it defaults, so the + # fallback must be logged - a non-prod tenant whose token_url is unset would otherwise send + # its credentials to production unnoticed. + with self.assertLogs("vcert.connection-ngts", level="WARNING") as cm: + self._ngts_conn(token_url=None) + self.assertTrue(any("defaulting to the production endpoint" in m for m in cm.output)) + + def test_ngts_token_url_untrusted_host_warns(self): + # token_url is a credential sink; a host outside the trusted Palo Alto domain is flagged. + with self.assertLogs("vcert.connection-ngts", level="WARNING") as cm: + self._ngts_conn(token_url="https://auth.evil.example.com/oauth2/token") + self.assertTrue(any("is outside" in m and TRUSTED_TOKEN_HOST_SUFFIX in m for m in cm.output)) + + def test_ngts_trusted_token_host_does_not_warn(self): + # A token_url inside paloaltonetworks.com (prod or documented non-prod) is not flagged. + # (assertNoLogs is 3.10+; collect records manually so this also runs on 3.9.) + records = [] + handler = logging.Handler() + handler.setLevel(logging.WARNING) + handler.emit = records.append + ngts_log = logging.getLogger("vcert.connection-ngts") + ngts_log.addHandler(handler) + try: + for url in ("https://auth.apps.paloaltonetworks.com/auth/v1/oauth2/access_token", + "https://auth.dev.appsvc.paloaltonetworks.com/auth/v1/oauth2/access_token"): + self._ngts_conn(token_url=url) + finally: + ngts_log.removeHandler(handler) + self.assertEqual([], [r.getMessage() for r in records]) + def test_ngts_get_access_token(self): conn = self._ngts_conn() fake_resp = mock.MagicMock() @@ -403,7 +466,7 @@ def test_ngts_get_access_token(self): 'access_token': 'abc.def.ghi', 'token_type': 'Bearer', 'expires_in': 900, - 'scope': 'tsg_id:123', + 'scope': 'tsg_id:1000000001', } with mock.patch('vcert.connection_ngts.requests.post', return_value=fake_resp) as post: token = conn._get_access_token() @@ -415,7 +478,7 @@ def test_ngts_get_access_token(self): _, kwargs = post.call_args self.assertEqual(kwargs['auth'], ('cid', 'csecret')) self.assertEqual(kwargs['data']['grant_type'], 'client_credentials') - self.assertEqual(kwargs['data']['scope'], 'tsg_id:123') + self.assertEqual(kwargs['data']['scope'], 'tsg_id:1000000001') def test_ngts_access_token_rejects_non_bearer(self): conn = self._ngts_conn() diff --git a/vcert/__init__.py b/vcert/__init__.py index 02e5bfc..02e096f 100644 --- a/vcert/__init__.py +++ b/vcert/__init__.py @@ -61,9 +61,9 @@ def venafi_connection(url=None, api_key=None, user=None, password=None, access_t Return connection based on credentials list. CyberArk Platform (CyberArk Certificate Manager, Self-Hosted) requires URL and access_token (or user and password for getting a new access_token) Cloud requires api_key and optional URL - NGTS (Palo Alto Networks Next-Gen Trust Security) requires URL, token_url and OAuth2 service-account credentials (client_id, client_secret, tsg_id/scope) + NGTS (Palo Alto Networks Next-Gen Trust Security) requires OAuth2 service-account credentials (client_id, client_secret, tsg_id/scope); url and token_url are optional and default to the Palo Alto production endpoints Fake requires no parameters - :param str url: CyberArk Certificate Manager, Self-Hosted / SaaS / NGTS URL (for Cloud is optional, required for NGTS) + :param str url: CyberArk Certificate Manager, Self-Hosted / SaaS / NGTS URL (optional for Cloud and NGTS) :param str api_key: CyberArk Certificate Manager, SaaS API Key :param str user: CyberArk Certificate Manager, Self-Hosted username for getting new tokens :param str password: CyberArk Certificate Manager, Self-Hosted password for getting new tokens @@ -74,7 +74,7 @@ def venafi_connection(url=None, api_key=None, user=None, password=None, access_t :param VenafiPlatform platform: The platform to be used with the Connector :param str client_id: NGTS OAuth2 service-account client id :param str client_secret: NGTS OAuth2 service-account client secret - :param str token_url: NGTS OAuth2 token endpoint (differs per environment) + :param str token_url: NGTS OAuth2 token endpoint (optional; defaults to the Palo Alto production endpoint, override for non-production environments) :param str scope: NGTS OAuth2 scope (``tsg_id:``); derived from tsg_id when omitted :param str tsg_id: NGTS tenant service group id :rtype CommonConnection: @@ -97,8 +97,9 @@ def venafi_connection(url=None, api_key=None, user=None, password=None, access_t if fake: return FakeConnection() # NGTS is detected before the TPP/Cloud branches so its OAuth service-account credentials - # are not shadowed by them. - if token_url and client_id and client_secret: + # are not shadowed by them. client_id + client_secret are NGTS-specific (TPP/Cloud use + # neither); token_url is optional now that it defaults to the production endpoint. + if client_id and client_secret: return NGTSConnection(client_id=client_id, client_secret=client_secret, token_url=token_url, scope=scope, tsg_id=tsg_id, access_token=access_token, url=url, http_request_kwargs=http_request_kwargs) diff --git a/vcert/connection_ngts.py b/vcert/connection_ngts.py index 2800ffa..6e7baf9 100644 --- a/vcert/connection_ngts.py +++ b/vcert/connection_ngts.py @@ -15,6 +15,7 @@ # import re from datetime import datetime, timedelta +from urllib.parse import urlsplit import requests @@ -33,9 +34,68 @@ OAUTH_TOKEN_TYPE = "Bearer" # nosec B105 DEFAULT_TOKEN_LIFESPAN_SECONDS = 900 +# Palo Alto Networks NGTS production API base URL. Matches Go's normalizeURL fallback behavior +# (vcert/pkg/venafi/ngts/connector.go), but uses the current production host: Go's apiURL constant +# is the stale "api.sase..." host; production is "api.strata..." - do NOT "fix" this back to sase. +DEFAULT_API_URL = "https://api.strata.paloaltonetworks.com/ngts" + +# Palo Alto Networks NGTS production OAuth2 token endpoint. Defaulted (like DEFAULT_API_URL) so the +# production path needs no token URL. Non-production environments (e.g. dev) use a different FQDN and +# must override it. NOTE: Go does not default the token URL yet (validateTokenUrl requires it); this +# default is a planned-but-not-yet-upstreamed divergence we add deliberately, accepting that pointing +# the credential exchange at a fixed endpoint is a known security trade-off. +DEFAULT_TOKEN_URL = "https://auth.apps.paloaltonetworks.com/auth/v1/oauth2/access_token" # nosec B105 + +# Service-account credentials (client_id/client_secret) are sent to token_url via HTTP Basic auth, so +# its host is a credential sink. Every known NGTS token/API host - production and the documented +# non-production (dev) endpoints alike - lives under paloaltonetworks.com. We warn (not block) when a +# supplied token_url falls outside this suffix: it surfaces typo'd or hostile overrides that would +# leak the service-account secrets, without breaking a future legitimate host on another domain. +TRUSTED_TOKEN_HOST_SUFFIX = ".paloaltonetworks.com" # nosec B105 + +# Service-account scope format. Palo Alto TSG IDs are 10-digit integers, so the scope must be +# exactly "tsg_id:<10 digits>" (mirrors Go's validateScope regex; https://pan.dev/scm/docs/scope/). +# Matched with fullmatch so the whole scope must conform - no extra digits, prefixes, or trailing +# garbage. +SCOPE_PATTERN = re.compile(r"tsg_id:[0-9]{10}") + log = get_child("connection-ngts") +def _ensure_https(url): + """ + Force an HTTPS scheme on the OAuth token endpoint. Service-account credentials are sent to + this URL via HTTP Basic auth, so they must never travel over cleartext: an ``http://`` URL is + upgraded to ``https://`` (with a warning) and a scheme-less URL is assumed to be ``https://``. + + :param str url: + :rtype: str + """ + if url.startswith("http://"): + log.warning("token_url uses http://; upgrading to https:// to protect service-account credentials") + url = f"https://{url[7:]}" + elif not url.startswith("https://"): + url = f"https://{url}" + _warn_if_untrusted_token_host(url) + return url + + +def _warn_if_untrusted_token_host(url): + """ + Warn when ``token_url`` points outside the trusted Palo Alto domain. ``token_url`` is where the + service-account ``client_id``/``client_secret`` are exchanged via HTTP Basic auth, so a host + outside :data:`TRUSTED_TOKEN_HOST_SUFFIX` (a typo'd or hostile override) would leak those + credentials. This warns rather than blocks so a future legitimate host on another domain still + works; the warning makes the credential sink auditable. + + :param str url: + """ + host = (urlsplit(url).hostname or "").lower() + if not host.endswith(TRUSTED_TOKEN_HOST_SUFFIX): + log.warning("token_url host [%s] is outside [%s]; service-account credentials will be sent " + "there - verify this endpoint is trusted", host, TRUSTED_TOKEN_HOST_SUFFIX) + + def _parse_ngts_zone(zone): """ NGTS zones are a Certificate Issuing Template alias only - the entire zone string is the @@ -62,22 +122,47 @@ class NGTSConnection(CloudConnection): instead of the ``tppl-api-key`` header. - Zones are a Certificate Issuing Template alias only (no ``Application\\CIT`` split), and request payloads omit ``applicationId``. + + ``url`` is optional: when omitted it defaults to the published Palo Alto production API + endpoint (:data:`DEFAULT_API_URL`), matching Go's ``normalizeURL`` fallback. ``token_url`` is + likewise optional and defaults to the production OAuth2 token endpoint + (:data:`DEFAULT_TOKEN_URL`); non-production environments must supply their own. (Go still + requires the token URL - this default is a deliberate planned divergence.) + + Because ``token_url`` is the credential sink (service-account ``client_id``/``client_secret`` + are exchanged there via HTTP Basic auth), two safeguards guard misconfiguration without giving + up the default: falling back to the production ``token_url`` is logged at WARNING (so a non-prod + tenant with an unset ``token_url`` doesn't silently leak its credentials to production), and a + ``token_url`` whose host falls outside :data:`TRUSTED_TOKEN_HOST_SUFFIX` is flagged at WARNING + (typo'd or hostile override). Both warn rather than block. """ - def __init__(self, client_id, client_secret, token_url, scope=None, tsg_id=None, access_token=None, url=None, + def __init__(self, client_id, client_secret, token_url=None, scope=None, tsg_id=None, access_token=None, url=None, http_request_kwargs=None): - # The NGTS API base URL and token URL both differ per environment (dev/prod), including the - # path, so neither can be hardcoded - both must be supplied by the caller. - if not url: - raise ClientBadData("NGTS requires the API base URL (it differs per environment)") - if not access_token and not token_url: - raise ClientBadData("NGTS requires the token URL (it differs per environment) " - "when no access_token is supplied") + # url defaults to the published Palo Alto production endpoint (Go defaults the base URL + # too); it must be defaulted before the super().__init__ call, which normalizes whatever + # base URL it receives. token_url likewise defaults to the production OAuth2 endpoint; + # non-production environments must override it. (Go requires the token URL - defaulting it + # is a deliberate planned divergence.) + url = url or DEFAULT_API_URL + # Service-account credentials are exchanged at token_url via HTTP Basic auth, so force + # HTTPS - never let a misconfigured http:// endpoint leak them in cleartext. Falling back to + # the production default is logged: an unset/typo'd token_url against a non-prod tenant would + # otherwise silently send that tenant's credentials to the production endpoint. + if not token_url: + log.warning("token_url not supplied; defaulting to the production endpoint [%s]. Set " + "token_url explicitly for non-production environments", DEFAULT_TOKEN_URL) + token_url = _ensure_https(token_url or DEFAULT_TOKEN_URL) if not scope: if not tsg_id: raise ClientBadData("NGTS requires either a scope or a tsg_id") scope = f"tsg_id:{tsg_id}" + # Validate the scope format (mirrors Go's validateScope); also covers tsg_id, since the + # scope is derived from it above. + if not SCOPE_PATTERN.fullmatch(scope): + raise ClientBadData('scope should be in the format "tsg_id:" ' + "(TSG IDs are 10-digit integers)") # CloudConnection.__init__ normalizes/verifies the base URL and sets up # self._http_request_kwargs. The Bearer token replaces the api-key token entirely. @@ -96,7 +181,7 @@ def __str__(self): def _normalize_and_verify_base_url(self): # Unlike Cloud (host-only), NGTS base URLs carry an environment-specific path - # (e.g. https://api.sase.paloaltonetworks.com/ngts), so path segments must be allowed. + # (e.g. https://api.strata.paloaltonetworks.com/ngts), so path segments must be allowed. u = self._base_url if u.startswith('http://'): u = f"https://{u[7:]}"