diff --git a/sagemaker-serve/tests/integ/test_model_customization_deployment.py b/sagemaker-serve/tests/integ/test_model_customization_deployment.py index b38ca249c7..aa4f882949 100644 --- a/sagemaker-serve/tests/integ/test_model_customization_deployment.py +++ b/sagemaker-serve/tests/integ/test_model_customization_deployment.py @@ -13,14 +13,24 @@ """Integration tests for ModelBuilder model customization deployment.""" from __future__ import absolute_import +import os +import json import boto3 +import time import pytest import random +import logging +from botocore.config import Config +from datetime import datetime, timezone, timedelta + + +logger = logging.getLogger(__name__) from sagemaker.core.helper.session_helper import Session # This test relies on resources in a specific region AWS_REGION = "us-west-2" +os.environ.setdefault("AWS_DEFAULT_REGION", AWS_REGION) @pytest.fixture(scope="module") @@ -135,6 +145,38 @@ def test_deploy_from_training_job(self, training_job_name, endpoint_name, cleanu adapter_ic = InferenceComponent.get(inference_component_name=adapter_name, region=AWS_REGION) assert adapter_ic is not None + # Invoke verification + time.sleep(10) # brief buffer for IC readiness + + invoke_ic_name = adapter_name if peft_type == "LORA" else f"{endpoint_name}-inference-component" + + test_payload = { + "inputs": "What is machine learning?", + "parameters": {"max_new_tokens": 32}, + } + + invoke_response = endpoint.invoke( + body=json.dumps(test_payload), + content_type="application/json", + accept="application/json", + inference_component_name=invoke_ic_name, + ) + + response_body = json.loads(invoke_response.body.read()) + + # Validate response structure + assert response_body is not None, f"Empty response from invoke on {invoke_ic_name}" + if isinstance(response_body, list): + assert len(response_body) > 0 + assert "generated_text" in response_body[0] or "generation" in response_body[0] + elif isinstance(response_body, dict): + assert ( + "generated_text" in response_body + or "generation" in response_body + or "outputs" in response_body + ) + + def test_fetch_endpoint_names_for_base_model(self, training_job_name, sagemaker_session): """Test fetching endpoint names for base model.""" from sagemaker.core.resources import TrainingJob @@ -300,9 +342,6 @@ def test_dpo_trainer_build(self, training_job_name, sagemaker_session): - Improved test assertions to work with new object structures """ -import json -import time -import pytest from sagemaker.core.resources import TrainingJob, ModelPackage from sagemaker.serve.bedrock_model_builder import BedrockModelBuilder @@ -316,7 +355,7 @@ def setup_config(self, training_job_name): from sagemaker.core.helper.session_helper import get_execution_role return { "training_job_name": training_job_name, - "region": "us-west-2", + "region": AWS_REGION, "bucket": "models-sdk-testing-pdx", "role_arn": get_execution_role() } @@ -336,29 +375,48 @@ def s3_client(self, setup_config): @pytest.fixture(scope="class") def bedrock_client(self, setup_config): - """Create Bedrock client.""" + """Create Bedrock client. Eagerly cleans up test import jobs older than 24h.""" + client = boto3.client('bedrock', region_name=setup_config["region"]) - # Cleanup existing import jobs + try: + cutoff = datetime.now(timezone.utc) - timedelta(hours=24) jobs = client.list_model_import_jobs() for job in jobs.get('modelImportJobSummaries', []): - if job['jobName'].startswith('test-bedrock-'): + if not job['jobName'].startswith('test-bedrock-'): + continue + created = job.get('creationTime') or job.get('lastModifiedTime') + if created and created < cutoff: try: - client.stop_model_import_job(jobIdentifier=job['jobArn']) - except Exception: - pass - except Exception: - pass + status = job.get('status') + if status in ('InProgress', 'Pending'): + client.stop_model_import_job(jobIdentifier=job['jobArn']) + elif status == 'Completed' and job.get('importedModelArn'): + client.delete_imported_model( + modelIdentifier=job['importedModelArn'] + ) + except Exception as e: + logger.warning(f"Eager cleanup failed for {job['jobName']}: {e}") + except Exception as e: + logger.warning(f"Failed to list import jobs for eager cleanup: {e}") + return client @pytest.fixture(scope="class") def bedrock_runtime(self, setup_config): """Create Bedrock runtime client.""" - return boto3.client('bedrock-runtime', region_name=setup_config["region"]) + # Adding config based on: https://docs.aws.amazon.com/bedrock/latest/userguide/invoke-imported-model.html#handle-model-not-ready-exception + config = Config( + retries={ + 'total_max_attempts': 10, + 'mode': 'standard' + } + ) + return boto3.client('bedrock-runtime', region_name=setup_config["region"], config=config) @pytest.fixture(scope="class") def deployed_model_arn(self, training_job, bedrock_client, s3_client, setup_config): - """Deploy model and return ARN.""" + """Deploy model and return ARN. Cleans up the imported model after tests.""" self._setup_model_files(training_job, s3_client, setup_config) job_name = f"test-bedrock-{random.randint(1000, 9999)}-{int(time.time())}" @@ -373,21 +431,37 @@ def deployed_model_arn(self, training_job, bedrock_client, s3_client, setup_conf job_arn = deployment_result['jobArn'] - # Wait for completion - while True: + # Wait for completion (max 1 hour wait) + max_wait = 60 * 60 # 60 minutes + start = time.time() + while time.time() - start < max_wait: response = bedrock_client.get_model_import_job(jobIdentifier=job_arn) status = response['status'] if status in ['Completed', 'Failed']: break time.sleep(30) + else: + pytest.fail(f"Model import job timed out after {max_wait}s") - model_arn = response['importedModelName'] - return model_arn + if status == 'Failed': + pytest.fail( + f"Model import job failed: {response.get('failureMessage', 'unknown reason')}") + + model_arn = response['importedModelArn'] + + yield model_arn + + # Cleanup: delete the imported model + try: + logger.info(f"Cleaning up imported model: {model_arn}") + bedrock_client.delete_imported_model(modelIdentifier=model_arn) + logger.info(f"Successfully deleted imported model: {model_arn}") + except Exception as e: + logger.warning(f"Failed to delete imported model {model_arn}: {e}") except Exception as e: - # If there's an issue with the new sagemaker-core integration, provide helpful error info pytest.fail( - f"Deployment failed with error: {str(e)}.") + f"Bedrock deployment failed with error: {str(e)}.") def _setup_model_files(self, training_job, s3_client, setup_config): """Setup required model files for Bedrock deployment.""" @@ -504,24 +578,79 @@ def test_bedrock_job_created(self, deployed_model_arn): """Test that Bedrock import job was created successfully.""" assert deployed_model_arn is not None - def test_zzz_cleanup_deployed_model(self, bedrock_client): - """Cleanup deployed model and import jobs (runs last due to zzz prefix).""" - if hasattr(self, 'model_arn_for_cleanup'): + # Note: Below test is flaky and fails due to model not ready exception. + # Documentation recommends retries: https://docs.aws.amazon.com/bedrock/latest/userguide/invoke-imported-model.html#handle-model-not-ready-exception. + # TODO: Fix using provisioned throughput or better wait mechanism + @pytest.mark.slow + def test_bedrock_model_invoke(self, deployed_model_arn, bedrock_runtime): + logger.warning( + "This test is known to be flaky due to 'model not ready' exceptions from Bedrock. " + "See: https://docs.aws.amazon.com/bedrock/latest/userguide/invoke-imported-model.html" + "#handle-model-not-ready-exception" + ) + """Test invoking the imported Bedrock model to ensure it works end-to-end. + + Retries on failure since models can take several minutes + to become ready after import. + """ + max_retries = 2 + base_delay = 10 + + for attempt in range(max_retries): try: - bedrock_client.delete_imported_model(modelIdentifier=self.model_arn_for_cleanup) - except Exception: - pass - # Cleanup all test import jobs + response = bedrock_runtime.invoke_model( + modelId=deployed_model_arn, + body=json.dumps({ + "prompt": "What is the capital of France?", + "max_gen_len": 100, + "temperature": 0.7, + "top_p": 0.9 + }) + ) + + result = json.loads(response['body'].read().decode()) + + # Validate response structure + assert "generation" in result, "Response missing 'generation' field" + assert isinstance(result["generation"], str), "'generation' should be a string" + assert len(result["generation"]) > 0, "'generation' should not be empty" + return # Success + + except Exception as e: + if attempt < max_retries - 1: + logger.info( + f"Invoke failed (attempt {attempt + 1}/{max_retries}): {e}. " + f"Retrying in {base_delay}s..." + ) + time.sleep(base_delay) + else: + pytest.fail( + f"Invoke failed after {max_retries} attempts. " + f"Last error: {e}" + ) + + + @pytest.fixture(scope="class", autouse=True) + def cleanup_import_jobs(self, bedrock_client): + """Cleanup any leftover test import jobs after all tests in this class.""" + yield try: jobs = bedrock_client.list_model_import_jobs() for job in jobs.get('modelImportJobSummaries', []): if job['jobName'].startswith('test-bedrock-'): try: - bedrock_client.stop_model_import_job(jobIdentifier=job['jobArn']) - except Exception: - pass - except Exception: - pass + # Stop in-progress jobs + if job.get('status') in ('InProgress', 'Pending'): + bedrock_client.stop_model_import_job(jobIdentifier=job['jobArn']) + # Delete completed imported models + elif job.get('status') == 'Completed' and job.get('importedModelArn'): + bedrock_client.delete_imported_model( + modelIdentifier=job['importedModelArn'] + ) + except Exception as e: + logger.warning(f"Cleanup failed for job {job['jobName']}: {e}") + except Exception as e: + logger.warning(f"Failed to list/cleanup import jobs: {e}") def test_model_customization_workflow(training_job_name): diff --git a/sagemaker-serve/tests/integ/test_nova_model_customization_deployment.py b/sagemaker-serve/tests/integ/test_nova_model_customization_deployment.py new file mode 100644 index 0000000000..c6b59dfc10 --- /dev/null +++ b/sagemaker-serve/tests/integ/test_nova_model_customization_deployment.py @@ -0,0 +1,596 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +"""Integration tests for ModelBuilder model customization deployment.""" +from __future__ import absolute_import + +import boto3 +import json +import logging +import os +import time +import pytest +import random +from sagemaker.serve import ModelBuilder +from sagemaker.core.resources import TrainingJob + +logger = logging.getLogger(__name__) + +from sagemaker.core.helper.session_helper import Session + +# This test relies on resources in a specific region +AWS_REGION = "us-east-1" +os.environ.setdefault("AWS_DEFAULT_REGION", AWS_REGION) + + +@pytest.fixture(scope="module") +def sagemaker_session(): + """Create a SageMaker session with explicit region.""" + boto_session = boto3.Session(region_name=AWS_REGION) + return Session(boto_session=boto_session) + + +@pytest.fixture(scope="module") +def training_job_name(): + """Training job name for testing.""" + return "nova-textgeneration-lite-sft-integ-test-reusable-model-20260531" + +@pytest.fixture(scope="module") +def model_package_arn(): + """Model package ARN for testing.""" + return "arn:aws:sagemaker:us-west-2:729646638167:model-package/sdk-test-finetuned-models/1" + + +@pytest.fixture +def endpoint_name(): + """Generate unique endpoint name.""" + import time + return f"e2e-nova-{int(time.time())}-{random.randint(100, 10000)}" + + +@pytest.fixture(scope="module") +def cleanup_endpoints(): + """Track endpoints to cleanup after tests.""" + endpoints_to_cleanup = [] + yield endpoints_to_cleanup + + for ep_name in endpoints_to_cleanup: + try: + from sagemaker.core.resources import Endpoint + endpoint = Endpoint.get(endpoint_name=ep_name, region=AWS_REGION) + endpoint.delete() + except Exception: + pass + + +class TestModelCustomizationFromTrainingJob: + """Test model customization deployment from TrainingJob.""" + + # def test_build_from_training_job(self, training_job_name, sagemaker_session): + # """Test building model from training job.""" + # from sagemaker.core.resources import TrainingJob + # from sagemaker.serve import ModelBuilder + # import time + + # training_job = TrainingJob.get(training_job_name=training_job_name, region=AWS_REGION) + # model_builder = ModelBuilder(model=training_job, sagemaker_session=sagemaker_session) + # model_builder.accept_eula = True + # model = model_builder.build(model_name=f"test-model-{int(time.time())}-{random.randint(100, 10000)}", region=AWS_REGION) + + # assert model is not None + # assert model.model_arn is not None + # assert model_builder.image_uri is not None + # assert model_builder.instance_type is not None + + def test_deploy_from_training_job(self, training_job_name, endpoint_name, cleanup_endpoints, sagemaker_session): + """Test deploying model from training job. + """ + + training_job = TrainingJob.get(training_job_name=training_job_name, region=AWS_REGION) + model_builder = ModelBuilder(model=training_job, instance_type="ml.g6.48xlarge", sagemaker_session=sagemaker_session) + model_builder.accept_eula = True + model_builder.build(model_name=f"test-model-{int(time.time())}-{random.randint(100, 10000)}", region=AWS_REGION) + + # peft_type = model_builder._fetch_peft() + # adapter_name = f"{endpoint_name}-adapter" + + endpoint = model_builder.deploy( + endpoint_name=endpoint_name, + ) + + cleanup_endpoints.append(endpoint_name) + + assert endpoint is not None + assert endpoint.endpoint_arn is not None + assert endpoint.endpoint_status == "InService" + + # Invoke verification + time.sleep(10) # brief buffer for IC readiness + + invoke_response = endpoint.invoke( + body=json.dumps({ + "messages": [ + {"role": "user", "content": [{"type": "text", "text": "What is 7+7?"}]} + ] + }), + content_type="application/json", + accept="application/json" + ) + + response_body = json.loads(invoke_response.body.read()) + + # Validate response structure + assert response_body is not None, f"Empty response from invoke on {endpoint_name}" + assert isinstance(response_body, dict) + + + def test_fetch_endpoint_names_for_base_model(self, training_job_name, sagemaker_session): + """Test fetching endpoint names for base model.""" + from sagemaker.core.resources import TrainingJob + from sagemaker.serve import ModelBuilder + + training_job = TrainingJob.get(training_job_name=training_job_name, region=AWS_REGION) + model_builder = ModelBuilder(model=training_job, sagemaker_session=sagemaker_session) + endpoint_names = model_builder.fetch_endpoint_names_for_base_model() + + assert isinstance(endpoint_names, set) + + +# class TestModelCustomizationFromModelPackage: + +# def test_build_from_model_package(self, model_package_arn, sagemaker_session): +# """Test building model from model package.""" +# from sagemaker.core.resources import ModelPackage +# from sagemaker.serve import ModelBuilder + +# model_package = ModelPackage.get(model_package_name=model_package_arn, region=AWS_REGION) +# model_builder = ModelBuilder(model=model_package, sagemaker_session=sagemaker_session) +# model_builder.accept_eula = True +# model = model_builder.build(region=AWS_REGION) + +# assert model is not None +# assert model.model_arn is not None + +# def test_deploy_from_model_package(self, model_package_arn, cleanup_endpoints, sagemaker_session): +# """Test deploying model from model package.""" +# from sagemaker.core.resources import ModelPackage +# from sagemaker.serve import ModelBuilder +# import time + +# model_package = ModelPackage.get(model_package_name=model_package_arn, region=AWS_REGION) +# endpoint_name = f"e2e-{int(time.time())}-{random.randint(100, 10000)}" +# model_builder = ModelBuilder(model=model_package, sagemaker_session=sagemaker_session) +# model_builder.accept_eula = True +# model_builder.build(region=AWS_REGION) +# endpoint = model_builder.deploy(endpoint_name=endpoint_name) + +# cleanup_endpoints.append(endpoint_name) + +# assert endpoint is not None +# assert endpoint.endpoint_arn is not None + + +# class TestInstanceTypeAutoDetection: +# """Test automatic instance type detection.""" + +# def test_instance_type_from_recipe(self, training_job_name, sagemaker_session): +# """Test instance type auto-detection from recipe.""" +# from sagemaker.core.resources import TrainingJob +# from sagemaker.serve import ModelBuilder + +# training_job = TrainingJob.get(training_job_name=training_job_name, region=AWS_REGION) +# model_builder = ModelBuilder(model=training_job, sagemaker_session=sagemaker_session) +# model_builder.accept_eula = True +# model_builder.build(region=AWS_REGION) + +# assert model_builder.instance_type is not None +# assert "ml." in model_builder.instance_type + + +# class TestModelCustomizationDetection: +# """Test model customization detection logic.""" + +# def test_is_model_customization_training_job(self, training_job_name, sagemaker_session): +# """Test detection from training job.""" +# from sagemaker.core.resources import TrainingJob +# from sagemaker.serve import ModelBuilder + +# training_job = TrainingJob.get(training_job_name=training_job_name, region=AWS_REGION) +# model_builder = ModelBuilder(model=training_job, sagemaker_session=sagemaker_session) + +# assert model_builder._is_model_customization() is True + +# def test_is_model_customization_model_package(self, model_package_arn, sagemaker_session): +# """Test detection from model package.""" +# from sagemaker.core.resources import ModelPackage +# from sagemaker.serve import ModelBuilder + +# model_package = ModelPackage.get(model_package_name=model_package_arn, region=AWS_REGION) +# model_builder = ModelBuilder(model=model_package, sagemaker_session=sagemaker_session) + +# assert model_builder._is_model_customization() is True + +# def test_fetch_model_package_arn(self, training_job_name, sagemaker_session): +# """Test fetching model package ARN.""" +# from sagemaker.core.resources import TrainingJob +# from sagemaker.serve import ModelBuilder + +# training_job = TrainingJob.get(training_job_name=training_job_name, region=AWS_REGION) +# model_builder = ModelBuilder(model=training_job, sagemaker_session=sagemaker_session) + +# arn = model_builder._fetch_model_package_arn() + +# assert arn is not None +# assert "model-package" in arn + + +# class TestTrainerIntegration: +# """Test ModelBuilder integration with SFTTrainer and DPOTrainer.""" + +# def test_sft_trainer_build(self, training_job_name, sagemaker_session): +# """Test building model from SFTTrainer.""" +# from sagemaker.core.resources import TrainingJob +# from sagemaker.train.sft_trainer import SFTTrainer +# from sagemaker.serve import ModelBuilder + +# training_job = TrainingJob.get( +# training_job_name=training_job_name, region=AWS_REGION +# ) + +# trainer = SFTTrainer( +# model="meta-textgeneration-llama-3-2-1b-instruct", +# training_dataset="s3://dummy/data.jsonl", +# accept_eula=True, +# model_package_group="test-group" +# ) +# trainer._latest_training_job = training_job + +# model_builder = ModelBuilder(model=trainer, sagemaker_session=sagemaker_session) +# model = model_builder.build(region=AWS_REGION) + +# assert model is not None +# assert model.model_arn is not None + +# def test_dpo_trainer_build(self, training_job_name, sagemaker_session): +# """Test building model from DPOTrainer.""" +# from sagemaker.core.resources import TrainingJob +# from sagemaker.train.dpo_trainer import DPOTrainer +# from sagemaker.serve import ModelBuilder +# from unittest.mock import patch + +# training_job = TrainingJob.get( +# training_job_name=training_job_name, region=AWS_REGION +# ) + +# with patch('sagemaker.train.common_utils.finetune_utils._get_fine_tuning_options_and_model_arn', +# return_value=(None, None)): +# trainer = DPOTrainer( +# model="meta-textgeneration-llama-3-2-1b-instruct", +# training_dataset="s3://dummy/data.jsonl", +# accept_eula=True, +# model_package_group="test-group" +# ) +# trainer._latest_training_job = training_job + +# model_builder = ModelBuilder(model=trainer, sagemaker_session=sagemaker_session) +# model = model_builder.build(region=AWS_REGION) + +# assert model is not None +# assert model.model_arn is not None + + +# """Integration tests for model customization deployment to Bedrock. + +# Updated for sagemaker-core integration: +# - Added ModelPackage import for new model handling +# - Enhanced error handling for sagemaker-core compatibility issues +# - Updated model artifacts access to handle both old and new patterns +# - Added fallback logic for different model artifact locations +# - Improved test assertions to work with new object structures +# """ + +# import json +# import time +# import pytest +# from sagemaker.core.resources import TrainingJob, ModelPackage +# from sagemaker.serve.bedrock_model_builder import BedrockModelBuilder + + +# class TestModelCustomizationDeployment: +# """Test suite for deploying fine-tuned models to Bedrock.""" + +# @pytest.fixture(scope="class") +# def setup_config(self, training_job_name): +# """Setup test configuration.""" +# from sagemaker.core.helper.session_helper import get_execution_role +# return { +# "training_job_name": training_job_name, +# "region": AWS_REGION, +# "bucket": "models-sdk-testing-pdx", +# "role_arn": get_execution_role() +# } + +# @pytest.fixture(scope="class") +# def training_job(self, setup_config): +# """Get the training job.""" +# return TrainingJob.get( +# training_job_name=setup_config["training_job_name"], +# region=setup_config["region"], +# ) + +# @pytest.fixture(scope="class") +# def s3_client(self, setup_config): +# """Create S3 client.""" +# return boto3.client('s3', region_name=setup_config["region"]) + +# @pytest.fixture(scope="class") +# def bedrock_client(self, setup_config): +# """Create Bedrock client.""" +# client = boto3.client('bedrock', region_name=setup_config["region"]) +# # Cleanup existing import jobs +# try: +# jobs = client.list_model_import_jobs() +# for job in jobs.get('modelImportJobSummaries', []): +# if job['jobName'].startswith('test-bedrock-'): +# try: +# client.stop_model_import_job(jobIdentifier=job['jobArn']) +# except Exception: +# pass +# except Exception: +# pass +# return client + +# @pytest.fixture(scope="class") +# def bedrock_runtime(self, setup_config): +# """Create Bedrock runtime client.""" +# return boto3.client('bedrock-runtime', region_name=setup_config["region"]) + +# @pytest.fixture(scope="class") +# def deployed_model_arn(self, training_job, bedrock_client, s3_client, setup_config): +# """Deploy model and return ARN.""" +# self._setup_model_files(training_job, s3_client, setup_config) + +# job_name = f"test-bedrock-{random.randint(1000, 9999)}-{int(time.time())}" +# bedrock_builder = BedrockModelBuilder(model=training_job) + +# try: +# deployment_result = bedrock_builder.deploy( +# job_name=job_name, +# imported_model_name=job_name, +# role_arn=setup_config["role_arn"] +# ) + +# job_arn = deployment_result['jobArn'] + +# # Wait for completion +# while True: +# response = bedrock_client.get_model_import_job(jobIdentifier=job_arn) +# status = response['status'] +# if status in ['Completed', 'Failed']: +# break +# time.sleep(30) + +# model_arn = response['importedModelArn'] +# return model_arn + +# except Exception as e: +# # If there's an issue with the new sagemaker-core integration, provide helpful error info +# pytest.fail( +# f"Deployment failed with error: {str(e)}.") + +# def _setup_model_files(self, training_job, s3_client, setup_config): +# """Setup required model files for Bedrock deployment.""" +# # Get S3 model artifacts path from training job +# try: +# # Try to access model artifacts from training job +# if hasattr(training_job, 'model_artifacts') and hasattr(training_job.model_artifacts, 's3_model_artifacts'): +# base_s3_path = training_job.model_artifacts.s3_model_artifacts +# elif hasattr(training_job, 'output_model_package_arn'): +# # If training job has model package ARN, get artifacts from model package +# model_package = ModelPackage.get(training_job.output_model_package_arn, region=AWS_REGION) +# if hasattr(model_package, +# 'inference_specification') and model_package.inference_specification.containers: +# container = model_package.inference_specification.containers[0] +# if hasattr(container, 'model_data_source') and container.model_data_source: +# # Access s3_uri from the s3_data_source attribute +# if hasattr(container.model_data_source, +# 's3_data_source') and container.model_data_source.s3_data_source: +# base_s3_path = container.model_data_source.s3_data_source.s3_uri +# else: +# # Fallback to model_data_url if available +# base_s3_path = getattr(container, 'model_data_url', None) +# else: +# # Fallback to model_data_url if available +# base_s3_path = getattr(container, 'model_data_url', None) +# else: +# raise AttributeError("Cannot find model artifacts in model package") +# else: +# raise AttributeError("Cannot find model artifacts in training job") + +# if not base_s3_path: +# raise ValueError("Model artifacts S3 path is empty") + +# except Exception as e: +# pytest.fail( +# f"Failed to get model artifacts path: {str(e)}. This might be due to sagemaker-core integration changes.") + +# bucket = setup_config["bucket"] + +# # Create bucket if it doesn't exist +# try: +# s3_client.head_bucket(Bucket=bucket) +# except Exception: +# try: +# s3_client.create_bucket( +# Bucket=bucket, +# CreateBucketConfiguration={'LocationConstraint': setup_config["region"]} +# ) +# except Exception: +# pass + +# # Copy files from hf_merged to root +# hf_merged_prefix = base_s3_path.replace(f's3://{bucket}/', '') + 'checkpoints/hf_merged/' +# root_prefix = base_s3_path.replace(f's3://{bucket}/', '') + '/' + +# files_to_copy = ['config.json', 'tokenizer.json', 'tokenizer_config.json', 'model.safetensors'] + +# for file in files_to_copy: +# try: +# s3_client.head_object(Bucket=bucket, Key=root_prefix + file) +# except Exception: +# try: +# s3_client.copy_object( +# Bucket=bucket, +# CopySource={'Bucket': bucket, 'Key': hf_merged_prefix + file}, +# Key=root_prefix + file +# ) +# except Exception as e: +# print(f"Warning: Could not copy {file}: {str(e)}") + +# # Create added_tokens.json if missing +# try: +# s3_client.head_object(Bucket=bucket, Key=root_prefix + 'added_tokens.json') +# except Exception: +# try: +# s3_client.put_object( +# Bucket=bucket, +# Key=root_prefix + 'added_tokens.json', +# Body=json.dumps({}), +# ContentType='application/json' +# ) +# except Exception as e: +# print(f"Warning: Could not create added_tokens.json: {str(e)}") + +# def test_training_job_exists(self, training_job): +# """Test that the training job exists and is completed.""" +# assert training_job is not None +# assert training_job.training_job_status == "Completed" +# # Check for model artifacts in different possible locations due to sagemaker-core changes +# has_artifacts = ( +# hasattr(training_job, 'model_artifacts') or +# hasattr(training_job, 'output_model_package_arn') +# ) +# assert has_artifacts, "Training job should have model artifacts or model package ARN" + +# def test_bedrock_model_builder_creation(self, training_job): +# """Test BedrockModelBuilder creation.""" +# try: +# bedrock_builder = BedrockModelBuilder(model=training_job) +# assert bedrock_builder is not None +# assert bedrock_builder.model == training_job + +# # Test that the builder can fetch model package if needed +# if hasattr(bedrock_builder, 'model_package'): +# # This tests the new sagemaker-core integration +# assert bedrock_builder.model_package is not None or bedrock_builder.model_package is None + +# except Exception as e: +# pytest.fail( +# f"BedrockModelBuilder creation failed: {str(e)}. This might be due to sagemaker-core integration issues.") + +# @pytest.mark.slow +# def test_bedrock_job_created(self, deployed_model_arn): +# """Test that Bedrock import job was created successfully.""" +# assert deployed_model_arn is not None + +# @pytest.mark.slow +# def test_bedrock_model_invoke(self, deployed_model_arn, bedrock_runtime): +# """Test invoking the imported Bedrock model to ensure it works end-to-end. + +# Retries on failure since models can take several minutes +# to become ready after import. +# """ +# max_retries = 5 +# base_delay = 10 + +# for attempt in range(max_retries): +# try: +# response = bedrock_runtime.invoke_model( +# modelId=deployed_model_arn, +# body=json.dumps({ +# "prompt": "What is the capital of France?", +# "max_gen_len": 100, +# "temperature": 0.7, +# "top_p": 0.9 +# }) +# ) + +# result = json.loads(response['body'].read().decode()) + +# # Validate response structure +# assert "generation" in result, "Response missing 'generation' field" +# assert isinstance(result["generation"], str), "'generation' should be a string" +# assert len(result["generation"]) > 0, "'generation' should not be empty" +# return # Success + +# except Exception as e: +# if attempt < max_retries - 1: +# logger.info( +# f"Invoke failed (attempt {attempt + 1}/{max_retries}): {e}. " +# f"Retrying in {base_delay}s..." +# ) +# time.sleep(base_delay) +# else: +# pytest.fail( +# f"Invoke failed after {max_retries} attempts. " +# f"Last error: {e}" +# ) + + +# def test_zzz_cleanup_deployed_model(self, bedrock_client): +# """Cleanup deployed model and import jobs (runs last due to zzz prefix).""" +# if hasattr(self, 'model_arn_for_cleanup'): +# try: +# bedrock_client.delete_imported_model(modelIdentifier=self.model_arn_for_cleanup) +# except Exception: +# pass +# # Cleanup all test import jobs +# try: +# jobs = bedrock_client.list_model_import_jobs() +# for job in jobs.get('modelImportJobSummaries', []): +# if job['jobName'].startswith('test-bedrock-'): +# try: +# bedrock_client.stop_model_import_job(jobIdentifier=job['jobArn']) +# except Exception: +# pass +# except Exception: +# pass + + +# def test_model_customization_workflow(training_job_name): +# """Standalone test function for pytest discovery. + +# Uses explicit region parameter for all SDK calls. +# """ +# config = { +# "training_job_name": training_job_name, +# "region": "us-west-2", +# "bucket": "open-models-testing-pdx" +# } + +# try: +# s3_client = boto3.client('s3', region_name=config["region"]) +# training_job = TrainingJob.get(training_job_name=config["training_job_name"], region=config["region"]) + +# test_class = TestModelCustomizationDeployment() +# test_class.test_training_job_exists(training_job) +# test_class.test_bedrock_model_builder_creation(training_job) + +# except Exception as e: +# print(f"Standalone test failed: {str(e)}") +# print("This might be due to sagemaker-core integration issues. Please check:") +# print("1. TrainingJob.get() method compatibility") +# print("2. Model artifacts access patterns") +# print("3. BedrockModelBuilder initialization with new sagemaker-core objects") +# raise + +