Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
8f67689
Stop server after processing one message
sujata-m Jan 9, 2026
424a627
Added new core with the changes
susrisha Mar 5, 2026
f1600bd
Merge pull request #74 from TaskarCenterAtUW/main
MashB Mar 10, 2026
e628bb8
Merge pull request #75 from TaskarCenterAtUW/stage
MashB Mar 10, 2026
c249d4a
readme update
MashB Mar 12, 2026
9087a75
Merge pull request #76 from TaskarCenterAtUW/feature-scalar
MashB Mar 12, 2026
2cb81b1
Merge pull request #77 from TaskarCenterAtUW/dev
MashB Mar 12, 2026
7a748ce
Merge branch 'dev' into feature-stop-server-after-processing-message
sujata-m Mar 13, 2026
a86b89d
Fixed unit test cases
sujata-m Mar 13, 2026
3d29882
Merge branch 'dev' into feature-stop-server-after-processing-message
sujata-m Mar 13, 2026
5571825
Fixed unit test cases and updated package from testpypi to pypi
sujata-m Mar 13, 2026
c939b2c
Fixed unit test cases
sujata-m Mar 13, 2026
ebcc834
Updated unit test cases
sujata-m Mar 13, 2026
14efd0d
Merge pull request #78 from TaskarCenterAtUW/feature-stop-server-afte…
sujata-m Mar 13, 2026
192c014
Merge branch 'stage' into dev
sujata-m Mar 13, 2026
eaebde7
Merge pull request #79 from TaskarCenterAtUW/dev
sujata-m Mar 13, 2026
883facf
3246 changes
sujata-m Mar 16, 2026
2c4f9a6
Merge pull request #80 from TaskarCenterAtUW/feature-3246
sujata-m Mar 16, 2026
332f7ac
Merge branch 'stage' into dev
sujata-m Mar 16, 2026
4b44418
Merge pull request #81 from TaskarCenterAtUW/dev
sujata-m Mar 16, 2026
d4737d5
Added multiprocessing
sujata-m Mar 18, 2026
22c70e8
Updated package
sujata-m Mar 19, 2026
771fc2b
Added core version in logs
sujata-m Mar 19, 2026
fac54b2
Merge pull request #82 from TaskarCenterAtUW/feature-stop-at-one-mult…
sujata-m Mar 19, 2026
3e437aa
Merge branch 'stage' into dev
sujata-m Mar 23, 2026
21ab649
Merge pull request #83 from TaskarCenterAtUW/dev
sujata-m Mar 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,6 @@ The format is mentioned in [osw-upload.json](./src/assets/osw-upload.json)
The outgoing messages will be to the `osw-validation` topic.
The format of the message is at [osw-validation.json](./src/assets/osw-validation.json)

### Git Workflow

The Git workflow automates building the Docker image, pushing it to Azure Container Registry (ACR), and updating the Scalar function app with the new image and any application settings from the repository environment variables.
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
fastapi==0.88.0
pydantic==1.10.4
python-ms-core==0.0.23
python-ms-core==0.0.25
uvicorn==0.20.0
html_testRunner==1.2.1
geopandas==0.14.4
python-osw-validation==0.3.4
python-osw-validation==0.3.5
3 changes: 2 additions & 1 deletion src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ class Settings(BaseSettings):
app_name: str = 'python-osw-validation'
event_bus = EventBusSettings()
auth_permission_url: str = os.environ.get('AUTH_PERMISSION_URL', None)
max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 2)
max_concurrent_messages: int = os.environ.get('MAX_CONCURRENT_MESSAGES', 1)
max_receivable_messages: int = os.environ.get('MAX_RECEIVABLE_MESSAGES',-1) # -1 means no limit

@property
def auth_provider(self) -> str:
Expand Down
1 change: 1 addition & 0 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ async def startup_event(settings: Settings = Depends(get_settings)) -> None:
try:
# OSWValidator()
app.validator = OSWValidator()

except:
print('\n\n\x1b[31m Application startup failed due to missing or invalid .env file \x1b[0m')
print('\x1b[31m Please provide the valid .env file and .env file should contains following parameters\x1b[0m')
Expand Down
53 changes: 49 additions & 4 deletions src/osw_validator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import gc
import logging
import os
import signal
import time
import urllib.parse
from typing import List
from python_ms_core import Core
Expand All @@ -21,6 +24,9 @@ class OSWValidator:

def __init__(self):
self.core = Core()

## Print the core version
print(f'Core version: {self.core.__version__}')
options = {
'provider': self._settings.auth_provider,
'api_url': self._settings.auth_permission_url
Expand All @@ -31,7 +37,8 @@ def __init__(self):
self.logger = self.core.get_logger()
self.storage_client = self.core.get_storage_client()
self.auth = self.core.get_authorizer(config=options)
self.listener_thread = threading.Thread(target=self.start_listening)
self._shutdown_triggered = threading.Event()
self.listener_thread = threading.Thread(target=self.start_listening, daemon=True)
self.listener_thread.start()

def start_listening(self):
Expand All @@ -41,13 +48,17 @@ def process(message) -> None:
upload_message = Upload.data_from(queue_message)
self.validate(received_message=upload_message)

self.listening_topic.subscribe(subscription=self.subscription_name, callback=process)
self.listening_topic.subscribe(subscription=self.subscription_name, callback=process, max_receivable_messages=self._settings.max_receivable_messages)
if self._settings.max_receivable_messages > 0:
logger.info('Listener finished processing available messages; stopping server/container.')
self._stop_server_and_container(delay_seconds=2)

def validate(self, received_message: Upload):
tdei_record_id: str = ''
status_sent = False
try:
tdei_record_id = received_message.message_id
logger.info(f'Received message for : {tdei_record_id} Message received for OSW validation !')
logger.info(f'Received message for : {tdei_record_id} Message received for OSW validation! Core version: {Core.__version__}')

if received_message.data.file_upload_path is None:
error_msg = 'Request does not have valid file path specified.'
Expand All @@ -66,6 +77,7 @@ def validate(self, received_message: Upload):
validation_result = Validation(file_path=file_upload_path, storage_client=self.storage_client)
result = validation_result.validate()
self.send_status(result=result, upload_message=received_message)
status_sent = True
else:
raise Exception('File entity not found')
except Exception as e:
Expand All @@ -74,6 +86,12 @@ def validate(self, received_message: Upload):
result.is_valid = False
result.validation_message = f'Error occurred while validating OSW request {e}'
self.send_status(result=result, upload_message=received_message)
status_sent = True
finally:
if status_sent:
logger.info('Validation status sent for %s.', tdei_record_id)
else:
logger.warning('Validation status was not sent for %s.', tdei_record_id)

def send_status(self, result: ValidationResult, upload_message: Upload):
upload_message.data.success = result.is_valid
Expand All @@ -90,6 +108,7 @@ def send_status(self, result: ValidationResult, upload_message: Upload):
'data': resp_data
})
try:
logger.info('Sending validation result to response topic.')
self.core.get_topic(topic_name=self._settings.event_bus.validation_topic).publish(data=data)
logger.info(f'Publishing message for : {upload_message.message_id}')
except Exception as e:
Expand All @@ -113,4 +132,30 @@ def has_permission(self, roles: List[str], queue_message: Upload) -> bool:
return False

def stop_listening(self):
self.listener_thread.join(timeout=0) # Stop the thread during shutdown.Its still an attempt. Not sure if this will work.
self._stop_server_and_container()
if hasattr(self, 'listener_thread'):
self.listener_thread.join(timeout=0) # Stop the thread during shutdown.Its still an attempt. Not sure if this will work.

def _stop_server_and_container(self, delay_seconds: float = 0.0):
"""
Attempt to gracefully stop the current process (stopping FastAPI/uvicorn and the Docker container).
"""
logger.info('Gracefully stopping FastAPI/uvicorn and Docker container')
if self._shutdown_triggered.is_set():
logger.info('Server stop already in progress; skipping duplicate trigger.')
return
self._shutdown_triggered.set()
logger.info('Server stop triggered; scheduling shutdown.')
def _terminate():
if delay_seconds:
time.sleep(delay_seconds)
try:
logger.info('Sending SIGTERM to stop server/container.')
os.kill(os.getpid(), signal.SIGTERM)
except Exception as err:
logger.warning(f'Error occurred while sending SIGTERM: {err}')
finally:
logger.info('Forcing process exit to stop server/container.')
os._exit(0)

threading.Thread(target=_terminate, daemon=True).start()
13 changes: 13 additions & 0 deletions tests/unit_tests/interface/test_validator_abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ def validate(self, message: QueueMessage) -> None:
pass


class SuperCallingValidator(ValidatorAbstract):
def validate(self, message: QueueMessage) -> None:
return super().validate(message)


class TestValidatorAbstract(unittest.TestCase):

def test_abstract_method_enforcement(self):
Expand All @@ -37,6 +42,14 @@ def test_validate_method_called(self):
# Assert that the mocked message object is a valid argument
self.assertTrue(hasattr(message, '__class__'))

def test_abstract_base_method_body_returns_none(self):
message = MagicMock(spec=QueueMessage)
validator = SuperCallingValidator()

result = validator.validate(message)

self.assertIsNone(result)


if __name__ == '__main__':
unittest.main()
4 changes: 2 additions & 2 deletions tests/unit_tests/models/test_queue_message_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

TEST_JSON_FILE = os.path.join(parent_dir, 'src/assets/osw-upload.json')

TEST_FILE = open(TEST_JSON_FILE)
TEST_DATA = json.loads(TEST_FILE.read())
with open(TEST_JSON_FILE) as test_file:
TEST_DATA = json.load(test_file)


class TestUpload(unittest.TestCase):
Expand Down
2 changes: 1 addition & 1 deletion tests/unit_tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def test_default_settings(self):
self.assertEqual(settings.app_name, 'python-osw-validation')
self.assertEqual(settings.event_bus.container_name, 'osw')
self.assertIsNone(settings.auth_permission_url)
self.assertEqual(settings.max_concurrent_messages, 2)
self.assertEqual(settings.max_concurrent_messages, 1)


if __name__ == '__main__':
Expand Down
40 changes: 40 additions & 0 deletions tests/unit_tests/test_main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import unittest
import asyncio
from unittest.mock import MagicMock, patch
from fastapi import status
from fastapi.testclient import TestClient
import src.main as main
from src.main import app, get_settings


Expand All @@ -22,6 +25,43 @@ def test_get_settings(self):
settings = get_settings()
self.assertIsNotNone(settings)

@patch('src.main.OSWValidator')
def test_startup_event_sets_validator(self, mock_validator):
validator = MagicMock()
mock_validator.return_value = validator
main.app.validator = None

asyncio.run(main.startup_event())

self.assertIs(main.app.validator, validator)

@patch('builtins.print')
@patch('src.main.psutil.Process')
@patch('src.main.os.getpid', return_value=123)
@patch('src.main.OSWValidator', side_effect=Exception('boom'))
def test_startup_event_handles_validator_init_failure(self, mock_validator, mock_getpid, mock_process, mock_print):
child_one = MagicMock()
child_two = MagicMock()
parent = MagicMock()
parent.children.return_value = [child_one, child_two]
mock_process.return_value = parent

asyncio.run(main.startup_event())

parent.children.assert_called_once_with(recursive=True)
child_one.kill.assert_called_once()
child_two.kill.assert_called_once()
parent.kill.assert_called_once()
self.assertGreaterEqual(mock_print.call_count, 6)

def test_shutdown_event_stops_validator(self):
validator = MagicMock()
main.app.validator = validator

asyncio.run(main.shutdown_event())

validator.stop_listening.assert_called_once()


if __name__ == '__main__':
unittest.main()
4 changes: 2 additions & 2 deletions tests/unit_tests/test_osw_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

TEST_JSON_FILE = os.path.join(parent_dir, 'src/assets/osw-upload.json')

TEST_FILE = open(TEST_JSON_FILE)
TEST_DATA = json.loads(TEST_FILE.read())
with open(TEST_JSON_FILE) as test_file:
TEST_DATA = json.load(test_file)


class PermissionResponse:
Expand Down
41 changes: 29 additions & 12 deletions tests/unit_tests/test_service.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,39 @@
import unittest
from unittest.mock import patch, MagicMock
from unittest.mock import patch, MagicMock, ANY
from src.osw_validator import OSWValidator
from src.models.queue_message_content import Upload
from src.models.queue_message_content import ValidationResult


class TestOSWValidatorService(unittest.TestCase):

@patch('src.osw_validator.threading.Thread')
@patch('src.osw_validator.Settings')
@patch('src.osw_validator.Core')
def setUp(self, mock_core, mock_settings):
def setUp(self, mock_core, mock_settings, mock_thread):
# Mock Settings
mock_settings.return_value.event_bus.upload_subscription = 'test_subscription'
mock_settings.return_value.event_bus.upload_topic = 'test_request_topic'
mock_settings.return_value.event_bus.validation_topic = 'test_response_topic'
mock_settings.return_value.max_concurrent_messages = 10
mock_settings.return_value.max_receivable_messages = -1
mock_settings.return_value.get_download_directory.return_value = '/tmp'
mock_settings.return_value.event_bus.container_name = 'test_container'

# Mock Core
mock_core.__version__ = 'test-core-version'
mock_core.return_value.__version__ = 'test-core-version'
mock_core.return_value.get_topic.return_value = MagicMock()
mock_core.return_value.get_storage_client.return_value = MagicMock()
self.mock_listener_thread = MagicMock()
mock_thread.return_value = self.mock_listener_thread

# Initialize OSWValidator with mocked dependencies
self.service = OSWValidator()
self.service.storage_client = MagicMock()
self.service.container_name = 'test_container'
self.service.auth = MagicMock()
self.service._stop_server_and_container = MagicMock()

# Define a sample message with proper strings
self.sample_message = {
Expand All @@ -41,11 +48,12 @@ def setUp(self, mock_core, mock_settings):

@patch('src.osw_validator.QueueMessage')
@patch('src.osw_validator.Upload')
def test_subscribe_with_valid_message(self, mock_request_message, mock_queue_message):
def test_subscribe_with_valid_message(self, mock_upload, mock_queue_message):
# Arrange
mock_message = MagicMock()
mock_queue_message.to_dict.return_value = self.sample_message
mock_request_message.from_dict.return_value = mock_request_message
mock_upload_message = MagicMock()
mock_upload.data_from.return_value = mock_upload_message
self.service.validate = MagicMock()

# Act
Expand All @@ -54,7 +62,19 @@ def test_subscribe_with_valid_message(self, mock_request_message, mock_queue_mes
callback(mock_message)

# Assert
self.service.validate.assert_called_once_with(received_message=mock_request_message.data_from())
self.service.validate.assert_called_once_with(received_message=mock_upload_message)

def test_start_listening_stops_container_after_subscribe_returns(self):
self.service._settings.max_receivable_messages = 1

self.service.start_listening()

self.service.listening_topic.subscribe.assert_called_once_with(
subscription=self.service.subscription_name,
callback=ANY,
max_receivable_messages=1,
)
self.service._stop_server_and_container.assert_called_once_with(delay_seconds=2)

@patch('src.osw_validator.Validation')
def test_validate_with_valid_file_path(self, mock_validation):
Expand Down Expand Up @@ -164,19 +184,16 @@ def test_validate_with_validation_only_in_message_type(self, mock_has_permission
self.assertTrue(actual_result.is_valid)
self.assertEqual(actual_upload_message, mock_request_message)

@patch('src.osw_validator.threading.Thread')
def test_stop_listening(self, mock_thread):
def test_stop_listening(self):
# Arrange
mock_thread_instance = MagicMock()
mock_thread.return_value = mock_thread_instance

self.service.listener_thread = mock_thread_instance
self.service.listener_thread = self.mock_listener_thread

# Act
result = self.service.stop_listening()

# Assert
mock_thread_instance.join.assert_called_once_with(timeout=0)
self.mock_listener_thread.join.assert_called_once_with(timeout=0)
self.service._stop_server_and_container.assert_called_once()
self.assertIsNone(result)

def test_has_permission_success(self):
Expand Down
9 changes: 7 additions & 2 deletions tests/unit_tests/test_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,18 @@ def test_validate_invalid_file_with_errors(self, mock_download_file, mock_clean_
for expected, error in zip(expected_errors, errors):
self.assertEqual(error['filename'], error_in_file)
self.assertEqual(error['feature_index'], expected['feature_index'])
self.assertEqual(error['error_message'][0], expected['error_message'])
self.assertTrue(
error['error_message'][0].startswith(
"Additional properties are not allowed ('crossing' was unexpected)"
)
)
# Ensure clean_up is called twice (once for the file, once for the folder)
self.assertEqual(mock_clean_up.call_count, 2)

@patch('src.validation.Validation.download_single_file', return_value=None)
@patch('src.validation.OSWValidation')
@patch('src.validation.Validation.clean_up')
def test_validate_invalid_zip(self, mock_clean_up, mock_osw_validation):
def test_validate_invalid_zip(self, mock_clean_up, mock_osw_validation, mock_download_file):
"""Test validate method for invalid zip file with errors."""
# Mock the OSWValidation validate method to return errors
mock_validation_result = MagicMock()
Expand Down
Loading