diff --git a/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json b/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json index 541dc4ea8e87..8ed972c9f579 100644 --- a/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "revision": 2 + "revision": 3 } diff --git a/.github/workflows/beam_PostCommit_Yaml_Xlang_Direct.yml b/.github/workflows/beam_PostCommit_Yaml_Xlang_Direct.yml index ea1c255f7cc9..afa437b64de1 100644 --- a/.github/workflows/beam_PostCommit_Yaml_Xlang_Direct.yml +++ b/.github/workflows/beam_PostCommit_Yaml_Xlang_Direct.yml @@ -80,7 +80,7 @@ jobs: - name: run PostCommit Yaml Xlang Direct script uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :sdks:python:postCommitYamlIntegrationTests -PyamlTestSet=${{ matrix.test_set }} -PbeamPythonExtra=ml_test,yaml + gradle-command: :sdks:python:postCommitYamlIntegrationTests -PyamlTestSet=${{ matrix.test_set }} - name: Archive Python Test Results uses: actions/upload-artifact@v7 if: failure() diff --git a/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml b/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml index 34a1af741f74..bde75568101b 100644 --- a/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml +++ b/.github/workflows/beam_PreCommit_Yaml_Xlang_Direct.yml @@ -91,7 +91,7 @@ jobs: - name: run PreCommit Yaml Xlang Direct script uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :sdks:python:yamlIntegrationTests -PbeamPythonExtra=ml_test,yaml + gradle-command: :sdks:python:yamlIntegrationTests - name: Archive Python Test Results uses: actions/upload-artifact@v7 if: failure() diff --git a/sdks/python/apache_beam/yaml/tests/runinference_huggingface.yaml b/sdks/python/apache_beam/yaml/tests/runinference_huggingface.yaml new file mode 100644 index 000000000000..8728a6f544ad --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/runinference_huggingface.yaml @@ -0,0 +1,62 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +pipelines: + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - text: "I love Apache Beam!" + - text: "I hate this error." + - type: RunInference + config: + model_handler: + type: "HuggingFacePipeline" + config: + task: "text-classification" + inference_fn: + callable: | + def real_inference(batch, pipeline, inference_args): + predictions = pipeline(batch, **inference_args) + + # If it's a single dictionary (batch size of 1), wrap it in a list + if isinstance(predictions, dict): + predictions = [predictions] + + return { + 'label': [p['label'] for p in predictions], + 'score': [p['score'] for p in predictions] + } + preprocess: + callable: 'lambda x: x.text' + - type: MapToFields + config: + language: python + fields: + text: text + sentiment: + callable: 'lambda x: x.inference.inference["label"]' + - type: AssertEqual + config: + elements: + - text: "I love Apache Beam!" + sentiment: "POSITIVE" + - text: "I hate this error." + sentiment: "NEGATIVE" + + options: + yaml_experimental_features: ['ML'] diff --git a/sdks/python/apache_beam/yaml/tests/runinference.yaml b/sdks/python/apache_beam/yaml/tests/runinference_vertexai.yaml similarity index 100% rename from sdks/python/apache_beam/yaml/tests/runinference.yaml rename to sdks/python/apache_beam/yaml/tests/runinference_vertexai.yaml diff --git a/sdks/python/apache_beam/yaml/yaml_ml.py b/sdks/python/apache_beam/yaml/yaml_ml.py index 51f18c733046..188530180c46 100644 --- a/sdks/python/apache_beam/yaml/yaml_ml.py +++ b/sdks/python/apache_beam/yaml/yaml_ml.py @@ -282,6 +282,55 @@ def inference_output_type(self): ('model_id', Optional[str])]) +@ModelHandlerProvider.register_handler_type('HuggingFacePipeline') +class HuggingFacePipelineProvider(ModelHandlerProvider): + def __init__( + self, + task: Optional[str] = None, + model: Optional[str] = None, + preprocess: Optional[dict[str, str]] = None, + postprocess: Optional[dict[str, str]] = None, + device: Optional[Any] = None, + inference_fn: Optional[dict[str, str]] = None, + load_pipeline_args: Optional[dict[str, Any]] = None, + **kwargs): + try: + from apache_beam.ml.inference.huggingface_inference import HuggingFacePipelineModelHandler + except ImportError: + raise ValueError( + 'Unable to import HuggingFacePipelineModelHandler. Please ' + 'install transformers dependencies.') + + kwargs = {k: v for k, v in kwargs.items() if not k.startswith('_')} + + inference_fn_obj = self.parse_processing_transform( + inference_fn, 'inference_fn') if inference_fn else None + + handler_kwargs = {} + if inference_fn_obj: + handler_kwargs['inference_fn'] = inference_fn_obj + + _handler = HuggingFacePipelineModelHandler( + task=task, + model=model, + device=device, + load_pipeline_args=load_pipeline_args, + **handler_kwargs, + **kwargs) + + super().__init__(_handler, preprocess, postprocess) + + @staticmethod + def validate(config): + if not config or (not config.get('task') and not config.get('model')): + raise ValueError( + "HuggingFacePipeline requires either 'task' or " + "'model' to be specified.") + + def inference_output_type(self): + return Any + + @beam.ptransform.ptransform_fn def run_inference( pcoll, diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle index 5f09dff57e8f..9a2728f2077f 100644 --- a/sdks/python/build.gradle +++ b/sdks/python/build.gradle @@ -124,10 +124,35 @@ tasks.register("generateYamlDocs") { outputs.file "${buildDir}/yaml-examples.html" } +tasks.register("installYamlIntegrationTestDeps") { + dependsOn installGcpTest + doLast { + exec { + executable 'sh' + args '-c', ". ${envdir}/bin/activate && " + + "py_ver=\$(python -c 'import sys; print(f\"{sys.version_info.major}{sys.version_info.minor}\")') && " + + "ml_extra=\"p\${py_ver}_ml_test\" && " + + "constraint_file=\"\" && " + + "if [ -f \"container/ml/py\${py_ver}/base_image_requirements.txt\" ]; then " + + " constraint_file=\"container/ml/py\${py_ver}/base_image_requirements.txt\"; " + + "elif [ -f \"container/py\${py_ver}/base_image_requirements.txt\" ]; then " + + " constraint_file=\"container/py\${py_ver}/base_image_requirements.txt\"; " + + "fi && " + + "if [ -n \"\$constraint_file\" ]; then " + + " echo \"Installing with constraint file: \$constraint_file\" && " + + " pip install --pre --retries 10 -c \"\$constraint_file\" ${buildDir}/apache-beam.tar.gz[\$ml_extra,yaml,transformers] --extra-index-url https://download.pytorch.org/whl/cpu; " + + "else " + + " echo \"No constraint file found, installing without constraints\" && " + + " pip install --pre --retries 10 ${buildDir}/apache-beam.tar.gz[\$ml_extra,yaml,transformers] --extra-index-url https://download.pytorch.org/whl/cpu; " + + "fi" + } + } +} + tasks.register("yamlIntegrationTests") { description "Runs precommit integration tests for yaml pipelines." - dependsOn installGcpTest + dependsOn installYamlIntegrationTestDeps // Need to build all expansion services referenced in apache_beam/yaml/*.* // grep -oh 'sdk.*Jar' sdks/python/apache_beam/yaml/*.yaml | sort | uniq dependsOn ":sdks:java:extensions:schemaio-expansion-service:shadowJar" @@ -146,7 +171,7 @@ tasks.register("yamlIntegrationTests") { tasks.register("postCommitYamlIntegrationTests") { description "Runs postcommit integration tests for yaml pipelines - parameterized by yamlTestSet." - dependsOn installGcpTest + dependsOn installYamlIntegrationTestDeps // Need to build all expansion services referenced in apache_beam/yaml/*.* // grep -oh 'sdk.*Jar' sdks/python/apache_beam/yaml/*.yaml | sort | uniq dependsOn ":sdks:java:extensions:schemaio-expansion-service:shadowJar" diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 4c1384c31517..44d74b1d9d1d 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -211,9 +211,9 @@ def cythonize(*args, **kwargs): # proto-plus<1.24 caps protobuf<5; opentelemetry-proto (via ADK) needs # protobuf>=5. Scoped here so the main dependency list stays broader. 'proto-plus>=1.26.1,<2', - 'opentelemetry-api==1.37.0', - 'opentelemetry-sdk==1.37.0', - 'opentelemetry-exporter-otlp-proto-http==1.37.0', + 'opentelemetry-api>=1.37.0,<2', + 'opentelemetry-sdk>=1.37.0,<2', + 'opentelemetry-exporter-otlp-proto-http>=1.37.0,<2', # protobuf>=5 (ADK/OTel); tf2onnx 1.16.x pins protobuf~=3.20 only. 'tf2onnx>=1.17.0,<1.18', ] @@ -609,9 +609,8 @@ def get_portability_package_data(): 'p310_ml_test': [ 'datatable', ] + ml_base, - 'p312_ml_test': [ - 'datatable', - ] + ml_base, + 'p311_ml_test': ml_base, + 'p312_ml_test': ml_base, # maintainer: milvus tests only run with this extension. Make sure it # is covered by docker-in-docker test when changing py version 'p313_ml_test': ml_base + milvus_dependency,