diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 91226bd08ee3..931ae76b69d5 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", "pr": "38069", - "modification": 41 + "modification": 42 } diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 136c320da009..542a44edd702 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -88,6 +88,16 @@ # One of the choices for user to use for requirements cache during staging SKIP_REQUIREMENTS_CACHE = 'skip' +# Ordered list of manylinux tags from newest (strictest) to oldest (most compatible) +# paired with the minimum pip version required to support the tag. +# See https://github.com/pypa/manylinux. +_MANYLINUX_PLATFORMS = [ + ('manylinux_2_28_x86_64', '20.3'), + ('manylinux2014_x86_64', '19.3'), # equivalent to manylinux_2_17 + ('manylinux2010_x86_64', + '0.0'), # equivalent to manylinux_2_12, the fallback if pip is too old +] + _LOGGER = logging.getLogger(__name__) @@ -717,30 +727,6 @@ def _extract_local_packages(requirements_file): else: return [], requirements_file - @staticmethod - def _get_platform_for_default_sdk_container(): - """ - Get the platform for apache beam SDK container based on Pip version. - - Note: pip is still expected to download compatible wheel of a package - with platform tag manylinux1 if the package on PyPI doesn't - have (manylinux2014) or (manylinux2010) wheels. - Reference: https://www.python.org/dev/peps/pep-0599/#id21 - """ - - # TODO(anandinguva): When https://github.com/pypa/pip/issues/10760 is - # addressed, download wheel based on glibc version in Beam's Python - # Base image - pip_version = distribution('pip').version - # See more information about manylinux at - # https://github.com/pypa/manylinux - if version.parse(pip_version) >= version.parse('20.3'): - return 'manylinux_2_28_x86_64' - elif version.parse(pip_version) >= version.parse('19.3'): - return 'manylinux2014_x86_64' - else: - return 'manylinux2010_x86_64' - @staticmethod @retry.with_exponential_backoff( num_retries=4, retry_filter=retry_on_non_zero_exit) @@ -764,40 +750,44 @@ def _populate_requirements_cache( # requirements file. download_dir = tempfile.mkdtemp(dir=temp_directory) - cmd_args = [ - Stager._get_python_executable(), - '-m', - 'pip', - 'download', - '--dest', - download_dir, - '--find-links', - cache_dir, - '-r', - tmp_requirements_filepath, - '--exists-action', - 'i', - '--no-deps' - ] + # Read packages from the requirements file + requirements = [] + with open(tmp_requirements_filepath, 'r') as f: + for line in f: + line = line.strip() + if line and not line.startswith('#'): + requirements.append(line) + + for req in requirements: + cmd_args = [ + Stager._get_python_executable(), + '-m', + 'pip', + 'download', + '--find-links', + cache_dir, + req, + '--exists-action', + 'i', + '--no-deps' + ] + + if populate_cache_with_sdists: + attempt_download_dir = tempfile.mkdtemp(dir=temp_directory) + cmd_args.extend( + ['--dest', attempt_download_dir, '--no-binary', ':all:']) + _LOGGER.info('Executing command: %s', cmd_args) + processes.check_output(cmd_args, stderr=processes.STDOUT) + else: + attempt_download_dir = Stager._download_pypi_packages( + cmd_args, temp_directory) - if populate_cache_with_sdists: - cmd_args.extend(['--no-binary', ':all:']) - else: - language_implementation_tag = 'cp' - abi_suffix = 'm' if sys.version_info < (3, 8) else '' - abi_tag = 'cp%d%d%s' % ( - sys.version_info[0], sys.version_info[1], abi_suffix) - platform_tag = Stager._get_platform_for_default_sdk_container() - cmd_args.extend([ - '--implementation', - language_implementation_tag, - '--abi', - abi_tag, - '--platform', - platform_tag - ]) - _LOGGER.info('Executing command: %s', cmd_args) - processes.check_output(cmd_args, stderr=processes.STDOUT) + # Move downloaded packages to our common download directory + for pkg_file in os.listdir(attempt_download_dir): + src_path = os.path.join(attempt_download_dir, pkg_file) + dest_path = os.path.join(download_dir, pkg_file) + if not os.path.exists(dest_path): + shutil.move(src_path, dest_path) # Get list of downloaded packages and copy them to the cache downloaded_packages = set() @@ -811,6 +801,53 @@ def _populate_requirements_cache( return downloaded_packages + @staticmethod + def _download_pypi_packages(cmd_args, temp_directory): + language_implementation_tag = 'cp' + abi_suffix = 'm' if sys.version_info < (3, 8) else '' + abi_tag = 'cp%d%d%s' % ( + sys.version_info[0], sys.version_info[1], abi_suffix) + pip_version = distribution('pip').version + platforms = [ + platform for platform, min_pip_version in _MANYLINUX_PLATFORMS + if version.parse(pip_version) >= version.parse(min_pip_version) + ] + + last_exception = None + for idx, platform in enumerate(platforms): + attempt_download_dir = tempfile.mkdtemp(dir=temp_directory) + attempt_cmd_args = cmd_args + [ + '--dest', + attempt_download_dir, + '--implementation', + language_implementation_tag, + '--abi', + abi_tag, + '--platform', + platform + ] + # Force binary wheel only if we have more platform fallbacks to try. + # For the last platform, we omit this flag so it can natively fall back + # to downloading a source distribution (sdist) if no matching wheel is found. + if idx < len(platforms) - 1: + attempt_cmd_args.extend(['--only-binary', ':all:']) + + _LOGGER.info('Executing command: %s', attempt_cmd_args) + try: + processes.check_output(attempt_cmd_args, stderr=processes.STDOUT) + last_exception = None + return attempt_download_dir + except Exception as e: + _LOGGER.warning( + 'Pip download failed with platform %s, trying fallback: %s', + platform, + e) + shutil.rmtree(attempt_download_dir) + last_exception = e + + if last_exception: + raise last_exception + @staticmethod def _build_setup_package( setup_file: str,