From 47b019ce8178fd0be533e019208f1b4a87a78b5a Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 25 May 2026 20:36:09 -0400 Subject: [PATCH 1/7] Add fallback platform for stager. --- .../apache_beam/runners/portability/stager.py | 67 ++++++++++++++----- 1 file changed, 52 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 136c320da009..729ed00899eb 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -88,6 +88,14 @@ # One of the choices for user to use for requirements cache during staging SKIP_REQUIREMENTS_CACHE = 'skip' +# Ordered list of manylinux tags from newest (strictest) to oldest (most compatible) +# used for cross-platform binary dependency downloads. +_MANYLINUX_PLATFORMS = [ + 'manylinux_2_28_x86_64', + 'manylinux2014_x86_64', # equivalent to manylinux_2_17 + 'manylinux2010_x86_64', # equivalent to manylinux_2_12 +] + _LOGGER = logging.getLogger(__name__) @@ -762,15 +770,13 @@ def _populate_requirements_cache( # Download to a temporary directory first, then copy to cache. # This allows us to track exactly which packages are needed for this # requirements file. - download_dir = tempfile.mkdtemp(dir=temp_directory) + download_dir = None cmd_args = [ Stager._get_python_executable(), '-m', 'pip', 'download', - '--dest', - download_dir, '--find-links', cache_dir, '-r', @@ -781,23 +787,54 @@ def _populate_requirements_cache( ] if populate_cache_with_sdists: - cmd_args.extend(['--no-binary', ':all:']) + download_dir = tempfile.mkdtemp(dir=temp_directory) + cmd_args.extend(['--dest', download_dir, '--no-binary', ':all:']) + _LOGGER.info('Executing command: %s', cmd_args) + processes.check_output(cmd_args, stderr=processes.STDOUT) else: language_implementation_tag = 'cp' abi_suffix = 'm' if sys.version_info < (3, 8) else '' abi_tag = 'cp%d%d%s' % ( sys.version_info[0], sys.version_info[1], abi_suffix) - platform_tag = Stager._get_platform_for_default_sdk_container() - cmd_args.extend([ - '--implementation', - language_implementation_tag, - '--abi', - abi_tag, - '--platform', - platform_tag - ]) - _LOGGER.info('Executing command: %s', cmd_args) - processes.check_output(cmd_args, stderr=processes.STDOUT) + preferred_platform = Stager._get_platform_for_default_sdk_container() + + # Fallback platform tags in case the preferred modern tag is too strict + # for some dependencies on PyPI. + try: + start_idx = _MANYLINUX_PLATFORMS.index(preferred_platform) + platforms = _MANYLINUX_PLATFORMS[start_idx:] + except ValueError: + platforms = [preferred_platform] + + last_exception = None + for platform in platforms: + attempt_download_dir = tempfile.mkdtemp(dir=temp_directory) + attempt_cmd_args = cmd_args + [ + '--dest', + attempt_download_dir, + '--implementation', + language_implementation_tag, + '--abi', + abi_tag, + '--platform', + platform + ] + _LOGGER.info('Executing command: %s', attempt_cmd_args) + try: + processes.check_output(attempt_cmd_args, stderr=processes.STDOUT) + download_dir = attempt_download_dir + last_exception = None + break + except Exception as e: + _LOGGER.warning( + 'Pip download failed with platform %s, trying fallback: %s', + platform, + e) + shutil.rmtree(attempt_download_dir) + last_exception = e + + if last_exception: + raise last_exception # Get list of downloaded packages and copy them to the cache downloaded_packages = set() From 9e422d196965bcd658706fdbf4ac6f32751dc084 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 26 May 2026 10:20:38 -0400 Subject: [PATCH 2/7] Force wheel if we have other platform to try --- sdks/python/apache_beam/runners/portability/stager.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 729ed00899eb..b587d6a8a1d1 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -807,7 +807,7 @@ def _populate_requirements_cache( platforms = [preferred_platform] last_exception = None - for platform in platforms: + for idx, platform in enumerate(platforms): attempt_download_dir = tempfile.mkdtemp(dir=temp_directory) attempt_cmd_args = cmd_args + [ '--dest', @@ -819,6 +819,10 @@ def _populate_requirements_cache( '--platform', platform ] + # Force binary wheel only if we have more platform fallbacks to try + if idx < len(platforms) - 1: + attempt_cmd_args.extend(['--only-binary', ':all:']) + _LOGGER.info('Executing command: %s', attempt_cmd_args) try: processes.check_output(attempt_cmd_args, stderr=processes.STDOUT) From bb20201d69601aaa2d6cef92cd4b48e6352e5109 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 26 May 2026 10:45:25 -0400 Subject: [PATCH 3/7] More comments. --- sdks/python/apache_beam/runners/portability/stager.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index b587d6a8a1d1..1cbde2d72f80 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -819,7 +819,9 @@ def _populate_requirements_cache( '--platform', platform ] - # Force binary wheel only if we have more platform fallbacks to try + # Force binary wheel only if we have more platform fallbacks to try. + # For the last platform, we omit this flag so it can natively fall back + # to downloading a source distribution (sdist) if no matching wheel is found. if idx < len(platforms) - 1: attempt_cmd_args.extend(['--only-binary', ':all:']) From 09c1dfd49ed7bd64b2142da9171c3a1eaa4bb690 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 26 May 2026 11:22:28 -0400 Subject: [PATCH 4/7] Trigger PostCommit Python --- .github/trigger_files/beam_PostCommit_Python.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 91226bd08ee3..931ae76b69d5 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", "pr": "38069", - "modification": 41 + "modification": 42 } From 5ecec1bfc966d56edda46cf45268c091c6590504 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 26 May 2026 15:40:18 -0400 Subject: [PATCH 5/7] Refactor code. --- .../apache_beam/runners/portability/stager.py | 48 +++++-------------- 1 file changed, 11 insertions(+), 37 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 1cbde2d72f80..846e7fc0b20e 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -89,11 +89,13 @@ SKIP_REQUIREMENTS_CACHE = 'skip' # Ordered list of manylinux tags from newest (strictest) to oldest (most compatible) -# used for cross-platform binary dependency downloads. +# paired with the minimum pip version required to support the tag. +# See https://github.com/pypa/manylinux. _MANYLINUX_PLATFORMS = [ - 'manylinux_2_28_x86_64', - 'manylinux2014_x86_64', # equivalent to manylinux_2_17 - 'manylinux2010_x86_64', # equivalent to manylinux_2_12 + ('manylinux_2_28_x86_64', '20.3'), + ('manylinux2014_x86_64', '19.3'), # equivalent to manylinux_2_17 + ('manylinux2010_x86_64', + '0.0'), # equivalent to manylinux_2_12, the fallback if pip is too old ] _LOGGER = logging.getLogger(__name__) @@ -725,30 +727,6 @@ def _extract_local_packages(requirements_file): else: return [], requirements_file - @staticmethod - def _get_platform_for_default_sdk_container(): - """ - Get the platform for apache beam SDK container based on Pip version. - - Note: pip is still expected to download compatible wheel of a package - with platform tag manylinux1 if the package on PyPI doesn't - have (manylinux2014) or (manylinux2010) wheels. - Reference: https://www.python.org/dev/peps/pep-0599/#id21 - """ - - # TODO(anandinguva): When https://github.com/pypa/pip/issues/10760 is - # addressed, download wheel based on glibc version in Beam's Python - # Base image - pip_version = distribution('pip').version - # See more information about manylinux at - # https://github.com/pypa/manylinux - if version.parse(pip_version) >= version.parse('20.3'): - return 'manylinux_2_28_x86_64' - elif version.parse(pip_version) >= version.parse('19.3'): - return 'manylinux2014_x86_64' - else: - return 'manylinux2010_x86_64' - @staticmethod @retry.with_exponential_backoff( num_retries=4, retry_filter=retry_on_non_zero_exit) @@ -796,15 +774,11 @@ def _populate_requirements_cache( abi_suffix = 'm' if sys.version_info < (3, 8) else '' abi_tag = 'cp%d%d%s' % ( sys.version_info[0], sys.version_info[1], abi_suffix) - preferred_platform = Stager._get_platform_for_default_sdk_container() - - # Fallback platform tags in case the preferred modern tag is too strict - # for some dependencies on PyPI. - try: - start_idx = _MANYLINUX_PLATFORMS.index(preferred_platform) - platforms = _MANYLINUX_PLATFORMS[start_idx:] - except ValueError: - platforms = [preferred_platform] + pip_version = distribution('pip').version + platforms = [ + platform for platform, min_pip_version in _MANYLINUX_PLATFORMS + if version.parse(pip_version) >= version.parse(min_pip_version) + ] last_exception = None for idx, platform in enumerate(platforms): From a435fbd81daef2d2c598e7a22887f07b1d9d3e52 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 26 May 2026 15:49:16 -0400 Subject: [PATCH 6/7] Refactor more. --- .../apache_beam/runners/portability/stager.py | 93 ++++++++++--------- 1 file changed, 48 insertions(+), 45 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 846e7fc0b20e..ade7cd81e113 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -770,51 +770,7 @@ def _populate_requirements_cache( _LOGGER.info('Executing command: %s', cmd_args) processes.check_output(cmd_args, stderr=processes.STDOUT) else: - language_implementation_tag = 'cp' - abi_suffix = 'm' if sys.version_info < (3, 8) else '' - abi_tag = 'cp%d%d%s' % ( - sys.version_info[0], sys.version_info[1], abi_suffix) - pip_version = distribution('pip').version - platforms = [ - platform for platform, min_pip_version in _MANYLINUX_PLATFORMS - if version.parse(pip_version) >= version.parse(min_pip_version) - ] - - last_exception = None - for idx, platform in enumerate(platforms): - attempt_download_dir = tempfile.mkdtemp(dir=temp_directory) - attempt_cmd_args = cmd_args + [ - '--dest', - attempt_download_dir, - '--implementation', - language_implementation_tag, - '--abi', - abi_tag, - '--platform', - platform - ] - # Force binary wheel only if we have more platform fallbacks to try. - # For the last platform, we omit this flag so it can natively fall back - # to downloading a source distribution (sdist) if no matching wheel is found. - if idx < len(platforms) - 1: - attempt_cmd_args.extend(['--only-binary', ':all:']) - - _LOGGER.info('Executing command: %s', attempt_cmd_args) - try: - processes.check_output(attempt_cmd_args, stderr=processes.STDOUT) - download_dir = attempt_download_dir - last_exception = None - break - except Exception as e: - _LOGGER.warning( - 'Pip download failed with platform %s, trying fallback: %s', - platform, - e) - shutil.rmtree(attempt_download_dir) - last_exception = e - - if last_exception: - raise last_exception + download_dir = Stager._download_pypi_packages(cmd_args, temp_directory) # Get list of downloaded packages and copy them to the cache downloaded_packages = set() @@ -828,6 +784,53 @@ def _populate_requirements_cache( return downloaded_packages + @staticmethod + def _download_pypi_packages(cmd_args, temp_directory): + language_implementation_tag = 'cp' + abi_suffix = 'm' if sys.version_info < (3, 8) else '' + abi_tag = 'cp%d%d%s' % ( + sys.version_info[0], sys.version_info[1], abi_suffix) + pip_version = distribution('pip').version + platforms = [ + platform for platform, min_pip_version in _MANYLINUX_PLATFORMS + if version.parse(pip_version) >= version.parse(min_pip_version) + ] + + last_exception = None + for idx, platform in enumerate(platforms): + attempt_download_dir = tempfile.mkdtemp(dir=temp_directory) + attempt_cmd_args = cmd_args + [ + '--dest', + attempt_download_dir, + '--implementation', + language_implementation_tag, + '--abi', + abi_tag, + '--platform', + platform + ] + # Force binary wheel only if we have more platform fallbacks to try. + # For the last platform, we omit this flag so it can natively fall back + # to downloading a source distribution (sdist) if no matching wheel is found. + if idx < len(platforms) - 1: + attempt_cmd_args.extend(['--only-binary', ':all:']) + + _LOGGER.info('Executing command: %s', attempt_cmd_args) + try: + processes.check_output(attempt_cmd_args, stderr=processes.STDOUT) + last_exception = None + return attempt_download_dir + except Exception as e: + _LOGGER.warning( + 'Pip download failed with platform %s, trying fallback: %s', + platform, + e) + shutil.rmtree(attempt_download_dir) + last_exception = e + + if last_exception: + raise last_exception + @staticmethod def _build_setup_package( setup_file: str, From b132da2efec31c5da45163715e29f39fbba6e210 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 27 May 2026 15:41:59 -0400 Subject: [PATCH 7/7] pip download dep in requirement one by one. --- .../apache_beam/runners/portability/stager.py | 63 ++++++++++++------- 1 file changed, 40 insertions(+), 23 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index ade7cd81e113..542a44edd702 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -748,29 +748,46 @@ def _populate_requirements_cache( # Download to a temporary directory first, then copy to cache. # This allows us to track exactly which packages are needed for this # requirements file. - download_dir = None - - cmd_args = [ - Stager._get_python_executable(), - '-m', - 'pip', - 'download', - '--find-links', - cache_dir, - '-r', - tmp_requirements_filepath, - '--exists-action', - 'i', - '--no-deps' - ] - - if populate_cache_with_sdists: - download_dir = tempfile.mkdtemp(dir=temp_directory) - cmd_args.extend(['--dest', download_dir, '--no-binary', ':all:']) - _LOGGER.info('Executing command: %s', cmd_args) - processes.check_output(cmd_args, stderr=processes.STDOUT) - else: - download_dir = Stager._download_pypi_packages(cmd_args, temp_directory) + download_dir = tempfile.mkdtemp(dir=temp_directory) + + # Read packages from the requirements file + requirements = [] + with open(tmp_requirements_filepath, 'r') as f: + for line in f: + line = line.strip() + if line and not line.startswith('#'): + requirements.append(line) + + for req in requirements: + cmd_args = [ + Stager._get_python_executable(), + '-m', + 'pip', + 'download', + '--find-links', + cache_dir, + req, + '--exists-action', + 'i', + '--no-deps' + ] + + if populate_cache_with_sdists: + attempt_download_dir = tempfile.mkdtemp(dir=temp_directory) + cmd_args.extend( + ['--dest', attempt_download_dir, '--no-binary', ':all:']) + _LOGGER.info('Executing command: %s', cmd_args) + processes.check_output(cmd_args, stderr=processes.STDOUT) + else: + attempt_download_dir = Stager._download_pypi_packages( + cmd_args, temp_directory) + + # Move downloaded packages to our common download directory + for pkg_file in os.listdir(attempt_download_dir): + src_path = os.path.join(attempt_download_dir, pkg_file) + dest_path = os.path.join(download_dir, pkg_file) + if not os.path.exists(dest_path): + shutil.move(src_path, dest_path) # Get list of downloaded packages and copy them to the cache downloaded_packages = set()