From 2c6ffa6dc1e56faf3f7d16517130dbe1df36d97f Mon Sep 17 00:00:00 2001 From: Jordan Waters <1waterrj@users.noreply.github.com> Date: Mon, 22 Jun 2026 22:52:01 -0400 Subject: [PATCH] Resolve queues by exact name and accept all credential providers Fixes three related issues with queue discovery and credential handling. Queue discovery (#61, #43): queues were located via `list_queues(QueueNamePrefix=...)` using the main queue's name as the prefix. An error queue whose name does not share that prefix (or a CloudFormation-generated name) was never matched, so the listener tried to create it and hit `QueueNameExists` when it already existed with different attributes. Both SqsListener and SqsLauncher now resolve each queue by its exact name with `get_queue_url`, creating it only when it genuinely does not exist. The resolved error-queue URL is now stored and reused when forwarding failed messages. Credentials (#62): the AWS_ACCOUNT_ID gate only accepted a hard-coded set of role-based credential methods, rejecting valid providers such as a shared credentials file, env vars, or a config-file profile (and raising AttributeError instead of EnvironmentError when no credentials resolved). The gate now accepts credentials from any provider boto3 can resolve and only falls back to requiring AWS_ACCOUNT_ID when none are found. Adds a moto-based test suite covering all three cases. Co-Authored-By: Claude Opus 4.8 --- requirements-test.txt | 3 + sqs_launcher/__init__.py | 28 ++++----- sqs_listener/__init__.py | 110 +++++++++++++++------------------ tests/__init__.py | 0 tests/conftest.py | 22 +++++++ tests/test_credentials.py | 67 ++++++++++++++++++++ tests/test_error_queue_push.py | 55 +++++++++++++++++ tests/test_queue_discovery.py | 85 +++++++++++++++++++++++++ 8 files changed, 297 insertions(+), 73 deletions(-) create mode 100644 requirements-test.txt create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py create mode 100644 tests/test_credentials.py create mode 100644 tests/test_error_queue_push.py create mode 100644 tests/test_queue_discovery.py diff --git a/requirements-test.txt b/requirements-test.txt new file mode 100644 index 0000000..87a1405 --- /dev/null +++ b/requirements-test.txt @@ -0,0 +1,3 @@ +boto3 +pytest +moto[sqs] diff --git a/sqs_launcher/__init__.py b/sqs_launcher/__init__.py index d81407b..53739f6 100644 --- a/sqs_launcher/__init__.py +++ b/sqs_launcher/__init__.py @@ -41,11 +41,14 @@ def __init__(self, queue=None, queue_url=None, create_queue=False, visibility_ti if not any([queue, queue_url]): raise ValueError('Either `queue` or `queue_url` should be provided.') - if ( - not os.environ.get('AWS_ACCOUNT_ID', None) and - not (boto3.Session().get_credentials().method in ['iam-role', 'assume-role', 'assume-role-with-web-identity']) - ): - raise EnvironmentError('Environment variable `AWS_ACCOUNT_ID` not set and no role found.') + # Accept credentials from any provider boto3 can resolve (env vars, + # shared credentials file, config file, container/instance roles, SSO, + # assumed roles, ...). AWS_ACCOUNT_ID is only needed as a fallback when + # no credentials are discoverable. See issue #62. + if not os.environ.get('AWS_ACCOUNT_ID', None) and boto3.Session().get_credentials() is None: + raise EnvironmentError( + 'No AWS credentials found and environment variable `AWS_ACCOUNT_ID` is not set.' + ) # new session for each instantiation self._session = boto3.session.Session() @@ -56,15 +59,12 @@ def __init__(self, queue=None, queue_url=None, create_queue=False, visibility_ti self._serializer = serializer if not queue_url: - queues = self._client.list_queues(QueueNamePrefix=self._queue_name) - exists = False - for q in queues.get('QueueUrls', []): - qname = q.split('/')[-1] - if qname == self._queue_name: - exists = True - self._queue_url = q - - if not exists: + # Resolve the queue by its exact name instead of a name prefix, so a + # queue is not confused with another whose name it is a prefix of. + # See issues #43 and #61. + try: + self._queue_url = self._client.get_queue_url(QueueName=self._queue_name)['QueueUrl'] + except self._client.exceptions.QueueDoesNotExist: if create_queue: q = self._client.create_queue( QueueName=self._queue_name, diff --git a/sqs_listener/__init__.py b/sqs_listener/__init__.py index 9fc9e36..c48b1ed 100644 --- a/sqs_listener/__init__.py +++ b/sqs_listener/__init__.py @@ -48,11 +48,14 @@ def __init__(self, queue, **kwargs): ) else: boto3_session = None - if ( - not os.environ.get('AWS_ACCOUNT_ID', None) and - not (boto3.Session().get_credentials().method in ['sso', 'iam-role', 'assume-role', 'assume-role-with-web-identity']) - ): - raise EnvironmentError('Environment variable `AWS_ACCOUNT_ID` not set and no role found.') + # Accept credentials from any provider boto3 can resolve (env vars, + # shared credentials file, config file, container/instance roles, + # SSO, assumed roles, ...). AWS_ACCOUNT_ID is only needed as a + # fallback when no credentials are discoverable. See issue #62. + if not os.environ.get('AWS_ACCOUNT_ID', None) and boto3.Session().get_credentials() is None: + raise EnvironmentError( + 'No AWS credentials found and environment variable `AWS_ACCOUNT_ID` is not set.' + ) self._queue_name = queue self._poll_interval = kwargs.get("interval", 60) @@ -60,6 +63,7 @@ def __init__(self, queue, **kwargs): self._error_queue_name = kwargs.get('error_queue', None) self._error_queue_visibility_timeout = kwargs.get('error_visibility_timeout', '600') self._queue_url = kwargs.get('queue_url', None) + self._error_queue_url = None self._message_attribute_names = kwargs.get('message_attribute_names', []) self._attribute_names = kwargs.get('attribute_names', []) self._force_delete = kwargs.get('force_delete', False) @@ -83,65 +87,50 @@ def _initialize_client(self): ssl = False sqs = self._session.client('sqs', region_name=self._region_name, endpoint_url=self._endpoint_name, use_ssl=ssl) - try: - queues = sqs.list_queues(QueueNamePrefix=self._queue_name) - except SSOTokenLoadError: - raise EnvironmentError('Error loading SSO Token. Reauthenticate via aws sso login.') - main_queue_exists = False - error_queue_exists = False - if 'QueueUrls' in queues: - for q in queues['QueueUrls']: - qname = q.split('/')[-1] - if qname == self._queue_name: - main_queue_exists = True - if self._error_queue_name and qname == self._error_queue_name: - error_queue_exists = True - - # create queue if necessary. - # creation is idempotent, no harm in calling on a queue if it already exists. + # Resolve each queue by its exact name rather than by a shared name + # prefix. The old prefix-based lookup misidentified error queues that + # don't share the main queue's prefix (and CloudFormation-generated + # names), causing spurious queue creation and QueueNameExists errors. + # See issues #43 and #61. if self._queue_url is None: - if not main_queue_exists: - sqs_logger.warning("main queue not found, creating now") - - # is this a fifo queue? - if self._queue_name.endswith(".fifo"): - fifo_queue = "true" - q = sqs.create_queue( - QueueName=self._queue_name, - Attributes={ - 'VisibilityTimeout': self._queue_visibility_timeout, # 10 minutes - 'FifoQueue': fifo_queue - } - ) - else: - # need to avoid FifoQueue property for normal non-fifo queues - q = sqs.create_queue( - QueueName=self._queue_name, - Attributes={ - 'VisibilityTimeout': self._queue_visibility_timeout, # 10 minutes - } - ) - self._queue_url = q['QueueUrl'] - - if self._error_queue_name and not error_queue_exists: - sqs_logger.warning("error queue not found, creating now") - q = sqs.create_queue( - QueueName=self._error_queue_name, - Attributes={ - 'VisibilityTimeout': self._queue_visibility_timeout # 10 minutes - } + self._queue_url = self._get_or_create_queue_url( + sqs, self._queue_name, self._queue_visibility_timeout + ) + + if self._error_queue_name: + self._error_queue_url = self._get_or_create_queue_url( + sqs, self._error_queue_name, self._error_queue_visibility_timeout ) - if self._queue_url is None: - if os.environ.get('AWS_ACCOUNT_ID', None): - qs = sqs.get_queue_url(QueueName=self._queue_name, - QueueOwnerAWSAccountId=os.environ.get('AWS_ACCOUNT_ID', None)) - else: - qs = sqs.get_queue_url(QueueName=self._queue_name) - self._queue_url = qs['QueueUrl'] return sqs + def _get_or_create_queue_url(self, sqs, queue_name, visibility_timeout): + """Return the URL of ``queue_name``, creating the queue only if absent. + + Creation is skipped when the queue already exists, so pre-created + queues (including ones whose names don't share a common prefix) are + used as-is instead of triggering a QueueNameExists error. + """ + try: + account_id = os.environ.get('AWS_ACCOUNT_ID', None) + if account_id: + response = sqs.get_queue_url(QueueName=queue_name, QueueOwnerAWSAccountId=account_id) + else: + response = sqs.get_queue_url(QueueName=queue_name) + return response['QueueUrl'] + except sqs.exceptions.QueueDoesNotExist: + sqs_logger.warning("queue %s not found, creating now", queue_name) + except SSOTokenLoadError: + raise EnvironmentError('Error loading SSO Token. Reauthenticate via aws sso login.') + + attributes = {'VisibilityTimeout': visibility_timeout} + # FIFO queues must be created with the FifoQueue attribute; it must be + # omitted for standard queues. + if queue_name.endswith(".fifo"): + attributes['FifoQueue'] = "true" + return sqs.create_queue(QueueName=queue_name, Attributes=attributes)['QueueUrl'] + def _start_listening(self): # TODO consider incorporating output processing from here: https://github.com/debrouwere/sqs-antenna/blob/master/antenna/__init__.py while True: @@ -193,7 +182,10 @@ def _start_listening(self): exc_type, exc_obj, exc_tb = sys.exc_info() sqs_logger.info("Pushing exception to error queue") - error_launcher = SqsLauncher(queue=self._error_queue_name, create_queue=True) + # The error queue was already resolved/created during + # initialization, so reuse its URL instead of looking + # it up again on every failure. + error_launcher = SqsLauncher(queue_url=self._error_queue_url) error_launcher.launch_message( { 'exception_type': str(exc_type), diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..92a351d --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,22 @@ +"""Shared pytest fixtures for the SQS listener/launcher test-suite.""" + +import os + +import pytest + + +@pytest.fixture(autouse=True) +def aws_environment(monkeypatch): + """Provide a clean, predictable AWS environment for every test. + + - A default region so boto3 clients can be created under moto. + - Dummy static credentials so ``get_credentials()`` resolves. + - ``AWS_ACCOUNT_ID`` removed so the credential gate is actually exercised + (several issues are specifically about behaviour when it is unset). + """ + monkeypatch.setenv("AWS_DEFAULT_REGION", "us-east-1") + monkeypatch.setenv("AWS_ACCESS_KEY_ID", "testing") + monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "testing") + monkeypatch.setenv("AWS_SECURITY_TOKEN", "testing") + monkeypatch.setenv("AWS_SESSION_TOKEN", "testing") + monkeypatch.delenv("AWS_ACCOUNT_ID", raising=False) diff --git a/tests/test_credentials.py b/tests/test_credentials.py new file mode 100644 index 0000000..b4fde81 --- /dev/null +++ b/tests/test_credentials.py @@ -0,0 +1,67 @@ +"""Tests for the credential / AWS_ACCOUNT_ID gate. + +Regression coverage for issue #62 (support of shared-credentials-file): +credentials resolved from a shared credentials file (or any other provider) +must be accepted even when ``AWS_ACCOUNT_ID`` is not set. +""" + +import boto3 +import pytest +from moto import mock_aws + +from sqs_launcher import SqsLauncher +from sqs_listener import SqsListener + + +class _Listener(SqsListener): + def handle_message(self, body, attributes, messages_attributes): + pass + + +@mock_aws +def test_listener_accepts_shared_credentials_file(monkeypatch): + """A non-role credential method (e.g. shared-credentials-file) must work.""" + real_get_credentials = boto3.Session.get_credentials + + def fake_get_credentials(self): + creds = real_get_credentials(self) + if creds is not None: + creds.method = "shared-credentials-file" + return creds + + monkeypatch.setattr(boto3.Session, "get_credentials", fake_get_credentials) + + # Should not raise EnvironmentError just because the method isn't a role. + listener = _Listener("some-queue", region_name="us-east-1") + assert listener._queue_url is not None + + +@mock_aws +def test_launcher_accepts_shared_credentials_file(monkeypatch): + real_get_credentials = boto3.Session.get_credentials + + def fake_get_credentials(self): + creds = real_get_credentials(self) + if creds is not None: + creds.method = "shared-credentials-file" + return creds + + monkeypatch.setattr(boto3.Session, "get_credentials", fake_get_credentials) + + launcher = SqsLauncher("some-queue", create_queue=True) + assert launcher._queue_url is not None + + +def test_listener_raises_when_no_credentials_and_no_account_id(monkeypatch): + """The safety net stays: no credentials AND no account id is still an error.""" + monkeypatch.setattr(boto3.Session, "get_credentials", lambda self: None) + + with pytest.raises(EnvironmentError): + _Listener("some-queue", region_name="us-east-1") + + +def test_launcher_raises_when_no_credentials_and_no_account_id(monkeypatch): + monkeypatch.setattr(boto3.Session, "get_credentials", lambda self: None) + + with pytest.raises(EnvironmentError): + SqsLauncher("some-queue", create_queue=True) diff --git a/tests/test_error_queue_push.py b/tests/test_error_queue_push.py new file mode 100644 index 0000000..c039908 --- /dev/null +++ b/tests/test_error_queue_push.py @@ -0,0 +1,55 @@ +"""When a handler raises, the failing message must be forwarded to the error +queue that was resolved during initialization.""" + +import json + +import boto3 +from moto import mock_aws + +from sqs_listener import SqsListener + + +class _Stop(Exception): + """Sentinel used to break out of the otherwise-infinite poll loop.""" + + +@mock_aws +def test_handler_exception_is_pushed_to_error_queue(): + sqs = boto3.client("sqs", region_name="us-east-1") + sqs.create_queue(QueueName="work-queue") + error_url = sqs.create_queue(QueueName="dead-letters")["QueueUrl"] + + class _Listener(SqsListener): + def handle_message(self, body, attributes, messages_attributes): + raise ValueError("boom") + + listener = _Listener( + "work-queue", error_queue="dead-letters", region_name="us-east-1" + ) + + # First poll yields one message; the second poll aborts the loop so the + # test doesn't spin forever. + calls = {"n": 0} + messages = [ + {"Messages": [{"ReceiptHandle": "rh", "Body": json.dumps({"hi": 1})}]}, + ] + + def fake_receive(**kwargs): + if calls["n"] < len(messages): + msg = messages[calls["n"]] + calls["n"] += 1 + return msg + raise _Stop() + + listener._client.receive_message = fake_receive + + try: + listener._start_listening() + except _Stop: + pass + + received = sqs.receive_message(QueueUrl=error_url, WaitTimeSeconds=0) + assert "Messages" in received + payload = json.loads(received["Messages"][0]["Body"]) + assert "boom" in payload["error_message"] + assert "ValueError" in payload["exception_type"] diff --git a/tests/test_queue_discovery.py b/tests/test_queue_discovery.py new file mode 100644 index 0000000..669534d --- /dev/null +++ b/tests/test_queue_discovery.py @@ -0,0 +1,85 @@ +"""Tests for queue discovery. + +Regression coverage for: +- issue #61: an error queue whose name does not share the main queue's prefix + is excluded from the ``list_queues`` prefix check and wrongly treated as + missing. +- issue #43: ``botocore.errorfactory.QueueNameExists`` when the error queue + already exists but does not share a prefix with the main queue (e.g. queue + names auto-generated by CloudFormation). +""" + +import boto3 +import pytest +from moto import mock_aws + +from sqs_launcher import SqsLauncher +from sqs_listener import SqsListener + + +class _Listener(SqsListener): + def handle_message(self, body, attributes, messages_attributes): + pass + + +@mock_aws +def test_existing_error_queue_without_shared_prefix_is_discovered(): + """Pre-existing error queue with a different prefix must be detected, + not recreated (which would raise QueueNameExists when attrs differ).""" + sqs = boto3.client("sqs", region_name="us-east-1") + sqs.create_queue(QueueName="process-queue") + # Distinct attributes so a re-create attempt raises QueueNameExists, + # exactly reproducing issue #43. + sqs.create_queue( + QueueName="error-queue", Attributes={"VisibilityTimeout": "1234"} + ) + + # Must not raise. + listener = _Listener( + "process-queue", error_queue="error-queue", region_name="us-east-1" + ) + + assert listener._queue_url.endswith("/process-queue") + assert listener._error_queue_url.endswith("/error-queue") + + +@mock_aws +def test_main_and_error_queue_created_when_missing(): + """When neither queue exists they should both be created.""" + listener = _Listener( + "main-q", error_queue="totally-different-error-q", region_name="us-east-1" + ) + + sqs = boto3.client("sqs", region_name="us-east-1") + urls = sqs.list_queues().get("QueueUrls", []) + names = {u.split("/")[-1] for u in urls} + assert "main-q" in names + assert "totally-different-error-q" in names + assert listener._error_queue_url.endswith("/totally-different-error-q") + + +@mock_aws +def test_listener_discovers_main_queue_that_is_prefix_of_another(): + """A queue whose name is a prefix of another must resolve to itself.""" + sqs = boto3.client("sqs", region_name="us-east-1") + sqs.create_queue(QueueName="orders") + sqs.create_queue(QueueName="orders-archive") + + listener = _Listener("orders", region_name="us-east-1") + assert listener._queue_url.endswith("/orders") + + +@mock_aws +def test_launcher_resolves_exact_queue_when_prefix_of_another(): + sqs = boto3.client("sqs", region_name="us-east-1") + sqs.create_queue(QueueName="orders") + sqs.create_queue(QueueName="orders-archive") + + launcher = SqsLauncher("orders") + assert launcher._queue_url.endswith("/orders") + + +@mock_aws +def test_launcher_raises_when_queue_missing_and_not_creating(): + with pytest.raises(ValueError): + SqsLauncher("no-such-queue", create_queue=False)