From 8f6768948fdf07a83d0ad6c96034c33d8da78dea Mon Sep 17 00:00:00 2001 From: sujata-m Date: Fri, 9 Jan 2026 13:13:49 +0530 Subject: [PATCH 01/11] Stop server after processing one message --- src/config.py | 2 +- src/osw_validator.py | 44 ++++++++++++++++++++++++++++++-- tests/unit_tests/test_service.py | 8 ++++-- 3 files changed, 49 insertions(+), 5 deletions(-) diff --git a/src/config.py b/src/config.py index 7c92892..d1e324c 100644 --- a/src/config.py +++ b/src/config.py @@ -17,7 +17,7 @@ class Settings(BaseSettings): app_name: str = 'python-osw-validation' event_bus = EventBusSettings() auth_permission_url: str = os.environ.get('AUTH_PERMISSION_URL', None) - max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 2) + max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 1) @property def auth_provider(self) -> str: diff --git a/src/osw_validator.py b/src/osw_validator.py index cfcc5f7..9e5019c 100644 --- a/src/osw_validator.py +++ b/src/osw_validator.py @@ -1,5 +1,8 @@ import gc import logging +import os +import signal +import time import urllib.parse from typing import List from python_ms_core import Core @@ -31,7 +34,8 @@ def __init__(self): self.logger = self.core.get_logger() self.storage_client = self.core.get_storage_client() self.auth = self.core.get_authorizer(config=options) - self.listener_thread = threading.Thread(target=self.start_listening) + self._shutdown_triggered = threading.Event() + self.listener_thread = threading.Thread(target=self.start_listening, daemon=True) self.listener_thread.start() def start_listening(self): @@ -45,6 +49,7 @@ def process(message) -> None: def validate(self, received_message: Upload): tdei_record_id: str = '' + status_sent = False try: tdei_record_id = received_message.message_id logger.info(f'Received message for : {tdei_record_id} Message received for OSW validation !') @@ -66,6 +71,7 @@ def validate(self, received_message: Upload): validation_result = Validation(file_path=file_upload_path, storage_client=self.storage_client) result = validation_result.validate() self.send_status(result=result, upload_message=received_message) + status_sent = True else: raise Exception('File entity not found') except Exception as e: @@ -74,6 +80,13 @@ def validate(self, received_message: Upload): result.is_valid = False result.validation_message = f'Error occurred while validating OSW request {e}' self.send_status(result=result, upload_message=received_message) + status_sent = True + finally: + if status_sent: + logger.info('Triggering server shutdown after status send.') + else: + logger.warning('Server shutdown skipped because status was not sent.') + self._stop_server_and_container(delay_seconds=2) def send_status(self, result: ValidationResult, upload_message: Upload): upload_message.data.success = result.is_valid @@ -90,6 +103,7 @@ def send_status(self, result: ValidationResult, upload_message: Upload): 'data': resp_data }) try: + logger.info('Sending validation result to response topic.') self.core.get_topic(topic_name=self._settings.event_bus.validation_topic).publish(data=data) logger.info(f'Publishing message for : {upload_message.message_id}') except Exception as e: @@ -113,4 +127,30 @@ def has_permission(self, roles: List[str], queue_message: Upload) -> bool: return False def stop_listening(self): - self.listener_thread.join(timeout=0) # Stop the thread during shutdown.Its still an attempt. Not sure if this will work. \ No newline at end of file + self._stop_server_and_container() + if hasattr(self, 'listener_thread'): + self.listener_thread.join(timeout=0) # Stop the thread during shutdown.Its still an attempt. Not sure if this will work. + + def _stop_server_and_container(self, delay_seconds: float = 0.0): + """ + Attempt to gracefully stop the current process (stopping FastAPI/uvicorn and the Docker container). + """ + logger.info('Gracefully stopping FastAPI/uvicorn and Docker container') + if self._shutdown_triggered.is_set(): + logger.info('Server stop already in progress; skipping duplicate trigger.') + return + self._shutdown_triggered.set() + logger.info('Server stop triggered; scheduling shutdown.') + def _terminate(): + if delay_seconds: + time.sleep(delay_seconds) + try: + logger.info('Sending SIGTERM to stop server/container.') + os.kill(os.getpid(), signal.SIGTERM) + except Exception as err: + logger.warning(f'Error occurred while sending SIGTERM: {err}') + finally: + logger.info('Forcing process exit to stop server/container.') + os._exit(0) + + threading.Thread(target=_terminate, daemon=True).start() diff --git a/tests/unit_tests/test_service.py b/tests/unit_tests/test_service.py index 1c16397..0ae4261 100644 --- a/tests/unit_tests/test_service.py +++ b/tests/unit_tests/test_service.py @@ -39,9 +39,10 @@ def setUp(self, mock_core, mock_settings): } } + @patch.object(OSWValidator, '_stop_server_and_container') @patch('src.osw_validator.QueueMessage') @patch('src.osw_validator.Upload') - def test_subscribe_with_valid_message(self, mock_request_message, mock_queue_message): + def test_subscribe_with_valid_message(self, mock_stop_server, mock_queue_message, mock_request_message): # Arrange mock_message = MagicMock() mock_queue_message.to_dict.return_value = self.sample_message @@ -55,6 +56,7 @@ def test_subscribe_with_valid_message(self, mock_request_message, mock_queue_mes # Assert self.service.validate.assert_called_once_with(received_message=mock_request_message.data_from()) + mock_stop_server.assert_called_once() @patch('src.osw_validator.Validation') def test_validate_with_valid_file_path(self, mock_validation): @@ -164,8 +166,9 @@ def test_validate_with_validation_only_in_message_type(self, mock_has_permission self.assertTrue(actual_result.is_valid) self.assertEqual(actual_upload_message, mock_request_message) + @patch.object(OSWValidator, '_stop_server_and_container') @patch('src.osw_validator.threading.Thread') - def test_stop_listening(self, mock_thread): + def test_stop_listening(self, mock_stop, mock_thread): # Arrange mock_thread_instance = MagicMock() mock_thread.return_value = mock_thread_instance @@ -177,6 +180,7 @@ def test_stop_listening(self, mock_thread): # Assert mock_thread_instance.join.assert_called_once_with(timeout=0) + mock_stop.assert_called_once() self.assertIsNone(result) def test_has_permission_success(self): From 424a62706d3129d564d3ee458752feb4493da358 Mon Sep 17 00:00:00 2001 From: Naresh Kumar D Date: Thu, 5 Mar 2026 11:22:44 +0530 Subject: [PATCH 02/11] Added new core with the changes - Maximum receivable messages env to be added - Defaults to -1 --- requirements.txt | 3 ++- src/config.py | 1 + src/main.py | 1 + src/osw_validator.py | 5 ++++- 4 files changed, 8 insertions(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index aa22a5a..8e58d15 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ +--extra-index-url https://test.pypi.org/simple/ fastapi==0.88.0 pydantic==1.10.4 -python-ms-core==0.0.23 +python-ms-core==0.0.24 uvicorn==0.20.0 html_testRunner==1.2.1 geopandas==0.14.4 diff --git a/src/config.py b/src/config.py index d1e324c..f9d84ad 100644 --- a/src/config.py +++ b/src/config.py @@ -18,6 +18,7 @@ class Settings(BaseSettings): event_bus = EventBusSettings() auth_permission_url: str = os.environ.get('AUTH_PERMISSION_URL', None) max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 1) + max_receivable_messages: int = os.environ.get('MAX_RECEIVABLE_MESSAGES',-1) # -1 means no limit @property def auth_provider(self) -> str: diff --git a/src/main.py b/src/main.py index b8cfc2e..f60b321 100644 --- a/src/main.py +++ b/src/main.py @@ -22,6 +22,7 @@ async def startup_event(settings: Settings = Depends(get_settings)) -> None: try: # OSWValidator() app.validator = OSWValidator() + except: print('\n\n\x1b[31m Application startup failed due to missing or invalid .env file \x1b[0m') print('\x1b[31m Please provide the valid .env file and .env file should contains following parameters\x1b[0m') diff --git a/src/osw_validator.py b/src/osw_validator.py index 9e5019c..86d41b4 100644 --- a/src/osw_validator.py +++ b/src/osw_validator.py @@ -24,6 +24,9 @@ class OSWValidator: def __init__(self): self.core = Core() + + ## Print the core version + print(f'Core version: {self.core.__version__}') options = { 'provider': self._settings.auth_provider, 'api_url': self._settings.auth_permission_url @@ -45,7 +48,7 @@ def process(message) -> None: upload_message = Upload.data_from(queue_message) self.validate(received_message=upload_message) - self.listening_topic.subscribe(subscription=self.subscription_name, callback=process) + self.listening_topic.subscribe(subscription=self.subscription_name, callback=process, max_receivable_messages=self._settings.max_receivable_messages) def validate(self, received_message: Upload): tdei_record_id: str = '' From c249d4a5a7e8507fdd3140eba1a2b5339ccfe37d Mon Sep 17 00:00:00 2001 From: MashB Date: Thu, 12 Mar 2026 10:13:21 +0530 Subject: [PATCH 03/11] readme update --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index a2b29ac..4a39ea8 100644 --- a/README.md +++ b/README.md @@ -151,4 +151,6 @@ The format is mentioned in [osw-upload.json](./src/assets/osw-upload.json) The outgoing messages will be to the `osw-validation` topic. The format of the message is at [osw-validation.json](./src/assets/osw-validation.json) +### Git Workflow +The Git workflow automates building the Docker image, pushing it to Azure Container Registry (ACR), and updating the Scalar function app with the new image and any application settings from the repository environment variables. From a86b89dc37854951e7effb310a6548275fce7cdb Mon Sep 17 00:00:00 2001 From: sujata-m Date: Fri, 13 Mar 2026 14:06:32 +0530 Subject: [PATCH 04/11] Fixed unit test cases --- tests/unit_tests/test_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit_tests/test_config.py b/tests/unit_tests/test_config.py index 65f9bd4..fe2659f 100644 --- a/tests/unit_tests/test_config.py +++ b/tests/unit_tests/test_config.py @@ -44,7 +44,7 @@ def test_default_settings(self): self.assertEqual(settings.app_name, 'python-osw-validation') self.assertEqual(settings.event_bus.container_name, 'osw') self.assertIsNone(settings.auth_permission_url) - self.assertEqual(settings.max_concurrent_messages, 2) + self.assertEqual(settings.max_concurrent_messages, 1) if __name__ == '__main__': From 55718252c8a19a72da51d61f655ba9364fe186f3 Mon Sep 17 00:00:00 2001 From: sujata-m Date: Fri, 13 Mar 2026 14:23:07 +0530 Subject: [PATCH 05/11] Fixed unit test cases and updated package from testpypi to pypi --- requirements.txt | 1 - .../models/test_queue_message_content.py | 4 ++-- tests/unit_tests/test_osw_validator.py | 4 ++-- tests/unit_tests/test_service.py | 18 +++++++++--------- tests/unit_tests/test_validation.py | 9 +++++++-- 5 files changed, 20 insertions(+), 16 deletions(-) diff --git a/requirements.txt b/requirements.txt index 17bdf07..640c433 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ ---extra-index-url https://test.pypi.org/simple/ fastapi==0.88.0 pydantic==1.10.4 python-ms-core==0.0.24 diff --git a/tests/unit_tests/models/test_queue_message_content.py b/tests/unit_tests/models/test_queue_message_content.py index 4ac1e2c..3316f63 100644 --- a/tests/unit_tests/models/test_queue_message_content.py +++ b/tests/unit_tests/models/test_queue_message_content.py @@ -9,8 +9,8 @@ TEST_JSON_FILE = os.path.join(parent_dir, 'src/assets/osw-upload.json') -TEST_FILE = open(TEST_JSON_FILE) -TEST_DATA = json.loads(TEST_FILE.read()) +with open(TEST_JSON_FILE) as test_file: + TEST_DATA = json.load(test_file) class TestUpload(unittest.TestCase): diff --git a/tests/unit_tests/test_osw_validator.py b/tests/unit_tests/test_osw_validator.py index 626b749..ed00980 100644 --- a/tests/unit_tests/test_osw_validator.py +++ b/tests/unit_tests/test_osw_validator.py @@ -10,8 +10,8 @@ TEST_JSON_FILE = os.path.join(parent_dir, 'src/assets/osw-upload.json') -TEST_FILE = open(TEST_JSON_FILE) -TEST_DATA = json.loads(TEST_FILE.read()) +with open(TEST_JSON_FILE) as test_file: + TEST_DATA = json.load(test_file) class PermissionResponse: diff --git a/tests/unit_tests/test_service.py b/tests/unit_tests/test_service.py index 0ae4261..3c2e15a 100644 --- a/tests/unit_tests/test_service.py +++ b/tests/unit_tests/test_service.py @@ -19,6 +19,8 @@ def setUp(self, mock_core, mock_settings): mock_settings.return_value.event_bus.container_name = 'test_container' # Mock Core + mock_core.__version__ = 'test-core-version' + mock_core.return_value.__version__ = 'test-core-version' mock_core.return_value.get_topic.return_value = MagicMock() mock_core.return_value.get_storage_client.return_value = MagicMock() @@ -42,11 +44,12 @@ def setUp(self, mock_core, mock_settings): @patch.object(OSWValidator, '_stop_server_and_container') @patch('src.osw_validator.QueueMessage') @patch('src.osw_validator.Upload') - def test_subscribe_with_valid_message(self, mock_stop_server, mock_queue_message, mock_request_message): + def test_subscribe_with_valid_message(self, mock_upload, mock_queue_message, mock_stop_server): # Arrange mock_message = MagicMock() mock_queue_message.to_dict.return_value = self.sample_message - mock_request_message.from_dict.return_value = mock_request_message + mock_upload_message = MagicMock() + mock_upload.data_from.return_value = mock_upload_message self.service.validate = MagicMock() # Act @@ -55,8 +58,8 @@ def test_subscribe_with_valid_message(self, mock_stop_server, mock_queue_message callback(mock_message) # Assert - self.service.validate.assert_called_once_with(received_message=mock_request_message.data_from()) - mock_stop_server.assert_called_once() + self.service.validate.assert_called_once_with(received_message=mock_upload_message) + mock_stop_server.assert_not_called() @patch('src.osw_validator.Validation') def test_validate_with_valid_file_path(self, mock_validation): @@ -167,12 +170,9 @@ def test_validate_with_validation_only_in_message_type(self, mock_has_permission self.assertEqual(actual_upload_message, mock_request_message) @patch.object(OSWValidator, '_stop_server_and_container') - @patch('src.osw_validator.threading.Thread') - def test_stop_listening(self, mock_stop, mock_thread): + def test_stop_listening(self, mock_stop_server): # Arrange mock_thread_instance = MagicMock() - mock_thread.return_value = mock_thread_instance - self.service.listener_thread = mock_thread_instance # Act @@ -180,7 +180,7 @@ def test_stop_listening(self, mock_stop, mock_thread): # Assert mock_thread_instance.join.assert_called_once_with(timeout=0) - mock_stop.assert_called_once() + mock_stop_server.assert_called_once() self.assertIsNone(result) def test_has_permission_success(self): diff --git a/tests/unit_tests/test_validation.py b/tests/unit_tests/test_validation.py index 2238151..493adc8 100644 --- a/tests/unit_tests/test_validation.py +++ b/tests/unit_tests/test_validation.py @@ -122,13 +122,18 @@ def test_validate_invalid_file_with_errors(self, mock_download_file, mock_clean_ for expected, error in zip(expected_errors, errors): self.assertEqual(error['filename'], error_in_file) self.assertEqual(error['feature_index'], expected['feature_index']) - self.assertEqual(error['error_message'][0], expected['error_message']) + self.assertTrue( + error['error_message'][0].startswith( + "Additional properties are not allowed ('crossing' was unexpected)" + ) + ) # Ensure clean_up is called twice (once for the file, once for the folder) self.assertEqual(mock_clean_up.call_count, 2) + @patch('src.validation.Validation.download_single_file', return_value=None) @patch('src.validation.OSWValidation') @patch('src.validation.Validation.clean_up') - def test_validate_invalid_zip(self, mock_clean_up, mock_osw_validation): + def test_validate_invalid_zip(self, mock_clean_up, mock_osw_validation, mock_download_file): """Test validate method for invalid zip file with errors.""" # Mock the OSWValidation validate method to return errors mock_validation_result = MagicMock() From c939b2ceefa47050aa6222c5a23f6cf49d3767a1 Mon Sep 17 00:00:00 2001 From: sujata-m Date: Fri, 13 Mar 2026 14:30:43 +0530 Subject: [PATCH 06/11] Fixed unit test cases --- tests/unit_tests/test_service.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/unit_tests/test_service.py b/tests/unit_tests/test_service.py index 3c2e15a..13d2791 100644 --- a/tests/unit_tests/test_service.py +++ b/tests/unit_tests/test_service.py @@ -7,9 +7,10 @@ class TestOSWValidatorService(unittest.TestCase): + @patch('src.osw_validator.threading.Thread') @patch('src.osw_validator.Settings') @patch('src.osw_validator.Core') - def setUp(self, mock_core, mock_settings): + def setUp(self, mock_core, mock_settings, mock_thread): # Mock Settings mock_settings.return_value.event_bus.upload_subscription = 'test_subscription' mock_settings.return_value.event_bus.upload_topic = 'test_request_topic' @@ -23,12 +24,15 @@ def setUp(self, mock_core, mock_settings): mock_core.return_value.__version__ = 'test-core-version' mock_core.return_value.get_topic.return_value = MagicMock() mock_core.return_value.get_storage_client.return_value = MagicMock() + self.mock_listener_thread = MagicMock() + mock_thread.return_value = self.mock_listener_thread # Initialize OSWValidator with mocked dependencies self.service = OSWValidator() self.service.storage_client = MagicMock() self.service.container_name = 'test_container' self.service.auth = MagicMock() + self.service._stop_server_and_container = MagicMock() # Define a sample message with proper strings self.sample_message = { @@ -41,10 +45,9 @@ def setUp(self, mock_core, mock_settings): } } - @patch.object(OSWValidator, '_stop_server_and_container') @patch('src.osw_validator.QueueMessage') @patch('src.osw_validator.Upload') - def test_subscribe_with_valid_message(self, mock_upload, mock_queue_message, mock_stop_server): + def test_subscribe_with_valid_message(self, mock_upload, mock_queue_message): # Arrange mock_message = MagicMock() mock_queue_message.to_dict.return_value = self.sample_message @@ -59,7 +62,6 @@ def test_subscribe_with_valid_message(self, mock_upload, mock_queue_message, moc # Assert self.service.validate.assert_called_once_with(received_message=mock_upload_message) - mock_stop_server.assert_not_called() @patch('src.osw_validator.Validation') def test_validate_with_valid_file_path(self, mock_validation): @@ -169,18 +171,16 @@ def test_validate_with_validation_only_in_message_type(self, mock_has_permission self.assertTrue(actual_result.is_valid) self.assertEqual(actual_upload_message, mock_request_message) - @patch.object(OSWValidator, '_stop_server_and_container') - def test_stop_listening(self, mock_stop_server): + def test_stop_listening(self): # Arrange - mock_thread_instance = MagicMock() - self.service.listener_thread = mock_thread_instance + self.service.listener_thread = self.mock_listener_thread # Act result = self.service.stop_listening() # Assert - mock_thread_instance.join.assert_called_once_with(timeout=0) - mock_stop_server.assert_called_once() + self.mock_listener_thread.join.assert_called_once_with(timeout=0) + self.service._stop_server_and_container.assert_called_once() self.assertIsNone(result) def test_has_permission_success(self): From ebcc83472bb87bcb7ac5e1f94fc52cd87c923b54 Mon Sep 17 00:00:00 2001 From: sujata-m Date: Fri, 13 Mar 2026 14:33:35 +0530 Subject: [PATCH 07/11] Updated unit test cases --- .../interface/test_validator_abstract.py | 13 ++++++ tests/unit_tests/test_main.py | 40 +++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/tests/unit_tests/interface/test_validator_abstract.py b/tests/unit_tests/interface/test_validator_abstract.py index 5f95071..1f22573 100644 --- a/tests/unit_tests/interface/test_validator_abstract.py +++ b/tests/unit_tests/interface/test_validator_abstract.py @@ -12,6 +12,11 @@ def validate(self, message: QueueMessage) -> None: pass +class SuperCallingValidator(ValidatorAbstract): + def validate(self, message: QueueMessage) -> None: + return super().validate(message) + + class TestValidatorAbstract(unittest.TestCase): def test_abstract_method_enforcement(self): @@ -37,6 +42,14 @@ def test_validate_method_called(self): # Assert that the mocked message object is a valid argument self.assertTrue(hasattr(message, '__class__')) + def test_abstract_base_method_body_returns_none(self): + message = MagicMock(spec=QueueMessage) + validator = SuperCallingValidator() + + result = validator.validate(message) + + self.assertIsNone(result) + if __name__ == '__main__': unittest.main() diff --git a/tests/unit_tests/test_main.py b/tests/unit_tests/test_main.py index 6beac7a..33d6201 100644 --- a/tests/unit_tests/test_main.py +++ b/tests/unit_tests/test_main.py @@ -1,6 +1,9 @@ import unittest +import asyncio +from unittest.mock import MagicMock, patch from fastapi import status from fastapi.testclient import TestClient +import src.main as main from src.main import app, get_settings @@ -22,6 +25,43 @@ def test_get_settings(self): settings = get_settings() self.assertIsNotNone(settings) + @patch('src.main.OSWValidator') + def test_startup_event_sets_validator(self, mock_validator): + validator = MagicMock() + mock_validator.return_value = validator + main.app.validator = None + + asyncio.run(main.startup_event()) + + self.assertIs(main.app.validator, validator) + + @patch('builtins.print') + @patch('src.main.psutil.Process') + @patch('src.main.os.getpid', return_value=123) + @patch('src.main.OSWValidator', side_effect=Exception('boom')) + def test_startup_event_handles_validator_init_failure(self, mock_validator, mock_getpid, mock_process, mock_print): + child_one = MagicMock() + child_two = MagicMock() + parent = MagicMock() + parent.children.return_value = [child_one, child_two] + mock_process.return_value = parent + + asyncio.run(main.startup_event()) + + parent.children.assert_called_once_with(recursive=True) + child_one.kill.assert_called_once() + child_two.kill.assert_called_once() + parent.kill.assert_called_once() + self.assertGreaterEqual(mock_print.call_count, 6) + + def test_shutdown_event_stops_validator(self): + validator = MagicMock() + main.app.validator = validator + + asyncio.run(main.shutdown_event()) + + validator.stop_listening.assert_called_once() + if __name__ == '__main__': unittest.main() From 883facfc75d8bf13dd03dc323891345ddfbd7c9d Mon Sep 17 00:00:00 2001 From: sujata-m Date: Mon, 16 Mar 2026 17:01:41 +0530 Subject: [PATCH 08/11] 3246 changes ## DevBoard Task https://dev.azure.com/TDEI-UW/TDEI/_workitems/edit/3246 ## Changes Introduced - Updated the package version ## Impacted Areas for Testing - Test dev workflow --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 640c433..6c7b451 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,4 @@ python-ms-core==0.0.24 uvicorn==0.20.0 html_testRunner==1.2.1 geopandas==0.14.4 -python-osw-validation==0.3.4 +python-osw-validation==0.3.5 From d4737d5cb059a9703074f8845d2fadcaecc445b6 Mon Sep 17 00:00:00 2001 From: sujata-m Date: Wed, 18 Mar 2026 21:20:07 +0530 Subject: [PATCH 09/11] Added multiprocessing --- requirements.txt | 3 ++- src/osw_validator.py | 8 +++++--- tests/unit_tests/test_service.py | 15 ++++++++++++++- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/requirements.txt b/requirements.txt index 6c7b451..ff95b27 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ +--extra-index-url https://test.pypi.org/simple/ fastapi==0.88.0 pydantic==1.10.4 -python-ms-core==0.0.24 +python-ms-core==0.2.5.2 uvicorn==0.20.0 html_testRunner==1.2.1 geopandas==0.14.4 diff --git a/src/osw_validator.py b/src/osw_validator.py index 86d41b4..34083f4 100644 --- a/src/osw_validator.py +++ b/src/osw_validator.py @@ -49,6 +49,9 @@ def process(message) -> None: self.validate(received_message=upload_message) self.listening_topic.subscribe(subscription=self.subscription_name, callback=process, max_receivable_messages=self._settings.max_receivable_messages) + if self._settings.max_receivable_messages > 0: + logger.info('Listener finished processing available messages; stopping server/container.') + self._stop_server_and_container(delay_seconds=2) def validate(self, received_message: Upload): tdei_record_id: str = '' @@ -86,10 +89,9 @@ def validate(self, received_message: Upload): status_sent = True finally: if status_sent: - logger.info('Triggering server shutdown after status send.') + logger.info('Validation status sent for %s.', tdei_record_id) else: - logger.warning('Server shutdown skipped because status was not sent.') - self._stop_server_and_container(delay_seconds=2) + logger.warning('Validation status was not sent for %s.', tdei_record_id) def send_status(self, result: ValidationResult, upload_message: Upload): upload_message.data.success = result.is_valid diff --git a/tests/unit_tests/test_service.py b/tests/unit_tests/test_service.py index 13d2791..29c16fa 100644 --- a/tests/unit_tests/test_service.py +++ b/tests/unit_tests/test_service.py @@ -1,5 +1,5 @@ import unittest -from unittest.mock import patch, MagicMock +from unittest.mock import patch, MagicMock, ANY from src.osw_validator import OSWValidator from src.models.queue_message_content import Upload from src.models.queue_message_content import ValidationResult @@ -16,6 +16,7 @@ def setUp(self, mock_core, mock_settings, mock_thread): mock_settings.return_value.event_bus.upload_topic = 'test_request_topic' mock_settings.return_value.event_bus.validation_topic = 'test_response_topic' mock_settings.return_value.max_concurrent_messages = 10 + mock_settings.return_value.max_receivable_messages = -1 mock_settings.return_value.get_download_directory.return_value = '/tmp' mock_settings.return_value.event_bus.container_name = 'test_container' @@ -63,6 +64,18 @@ def test_subscribe_with_valid_message(self, mock_upload, mock_queue_message): # Assert self.service.validate.assert_called_once_with(received_message=mock_upload_message) + def test_start_listening_stops_container_after_subscribe_returns(self): + self.service._settings.max_receivable_messages = 1 + + self.service.start_listening() + + self.service.listening_topic.subscribe.assert_called_once_with( + subscription=self.service.subscription_name, + callback=ANY, + max_receivable_messages=1, + ) + self.service._stop_server_and_container.assert_called_once_with(delay_seconds=2) + @patch('src.osw_validator.Validation') def test_validate_with_valid_file_path(self, mock_validation): # Arrange From 22c70e8cd7c01d3f0af240ddb15f1e09f50f0842 Mon Sep 17 00:00:00 2001 From: sujata-m Date: Thu, 19 Mar 2026 15:54:11 +0530 Subject: [PATCH 10/11] Updated package --- requirements.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index ff95b27..d708be6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,6 @@ ---extra-index-url https://test.pypi.org/simple/ fastapi==0.88.0 pydantic==1.10.4 -python-ms-core==0.2.5.2 +python-ms-core==0.0.25 uvicorn==0.20.0 html_testRunner==1.2.1 geopandas==0.14.4 From 771fc2b1f3df75d471c74457cfcfbb8a83ffbdc2 Mon Sep 17 00:00:00 2001 From: sujata-m Date: Thu, 19 Mar 2026 15:57:10 +0530 Subject: [PATCH 11/11] Added core version in logs --- src/osw_validator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/osw_validator.py b/src/osw_validator.py index 34083f4..6b8397e 100644 --- a/src/osw_validator.py +++ b/src/osw_validator.py @@ -58,7 +58,7 @@ def validate(self, received_message: Upload): status_sent = False try: tdei_record_id = received_message.message_id - logger.info(f'Received message for : {tdei_record_id} Message received for OSW validation !') + logger.info(f'Received message for : {tdei_record_id} Message received for OSW validation! Core version: {Core.__version__}') if received_message.data.file_upload_path is None: error_msg = 'Request does not have valid file path specified.'