Skip to content
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Python SDK now honors the `disableCounterMetrics`, `disableStringSetMetrics`, and `disableBoundedTrieMetrics` pipeline experiments to opt out of emitting the corresponding user metric kinds, matching Java SDK behavior ([#38746](https://github.com/apache/beam/issues/38746)).

## Breaking Changes

Expand Down
56 changes: 52 additions & 4 deletions sdks/python/apache_beam/metrics/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,51 @@
from apache_beam.metrics.metricbase import Histogram
from apache_beam.metrics.metricbase import MetricName
from apache_beam.metrics.metricbase import StringSet
from apache_beam.options.pipeline_options import DebugOptions

if TYPE_CHECKING:
from apache_beam.internal.metrics.metric import MetricLogger
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.metricbase import Metric
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.utils.histogram import BucketType

__all__ = ['Metrics', 'MetricsFilter', 'Lineage']

_LOGGER = logging.getLogger(__name__)


class MetricsFlag(object):
  """Process-wide switches that suppress individual user metric kinds.

  Every switch starts out ``False``. The switches are raised from pipeline
  experiments by the first call to ``set_default_pipeline_options``; further
  calls are ignored until ``reset`` is invoked.
  """
  counter_disabled = False
  string_set_disabled = False
  bounded_trie_disabled = False
  # True once the experiments have been applied; cleared again by reset().
  _initialized = False

  @classmethod
  def set_default_pipeline_options(cls, options: 'PipelineOptions') -> None:
    """Raises the switches whose disable experiment is present in ``options``.

    Only the first call after class creation (or after ``reset``) has any
    effect. A switch is only ever set to ``True`` here, never back to
    ``False``.

    Args:
      options: pipeline options; viewed as ``DebugOptions`` to look up the
        experiments.
    """
    if not cls._initialized:
      experiments = options.view_as(DebugOptions)
      # (experiment name, switch attribute, log line), in lookup order.
      switches = (
          (
              'disableCounterMetrics',
              'counter_disabled',
              'Counter metrics are disabled.'),
          (
              'disableStringSetMetrics',
              'string_set_disabled',
              'StringSet metrics are disabled.'),
          (
              'disableBoundedTrieMetrics',
              'bounded_trie_disabled',
              'BoundedTrie metrics are disabled.'),
      )
      for experiment, attribute, message in switches:
        if experiments.lookup_experiment(experiment):
          setattr(cls, attribute, True)
          _LOGGER.info(message)
      cls._initialized = True

  @classmethod
  def reset(cls) -> None:
    """Restores every switch to ``False`` and allows re-initialization."""
    cls._initialized = False
    cls.counter_disabled = False
    cls.string_set_disabled = False
    cls.bounded_trie_disabled = False


class Metrics(object):
"""Lets users create/access metric objects during pipeline execution."""
@staticmethod
Expand Down Expand Up @@ -204,12 +237,17 @@ class DelegatingCounter(Counter):
def __init__(
    self, metric_name: MetricName, process_wide: bool = False) -> None:
  """Creates the updater that ``inc`` forwards to.

  Args:
    metric_name: name handed to the base class and to the MetricUpdater.
    process_wide: forwarded unchanged to the MetricUpdater.
  """
  super().__init__(metric_name)
  # The updater lives in a private attribute on purpose: binding it to
  # ``self.inc`` would place an instance attribute in front of the ``inc``
  # method, so the MetricsFlag.counter_disabled check would never run.
  self._updater = MetricUpdater(
      cells.CounterCell,
      metric_name,
      default_value=1,
      process_wide=process_wide)

def inc(self, n: int = 1) -> None:
  """Forwards ``n`` to the counter updater unless counters are disabled."""
  if not MetricsFlag.counter_disabled:
    self._updater(n)
Comment on lines +246 to +249
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Access the counter_disabled class attribute directly instead of calling a classmethod getter to avoid method call overhead on the hot path.

Suggested change
def inc(self, n: int = 1) -> None:
if MetricsFlag.counter_disabled():
return
self._updater(n)
def inc(self, n: int = 1) -> None:
if MetricsFlag.counter_disabled:
return
self._updater(n)


class DelegatingDistribution(Distribution):
"""Metrics Distribution Delegates functionality to MetricsEnvironment."""
def __init__(
Expand All @@ -231,13 +269,23 @@ class DelegatingStringSet(StringSet):
"""Metrics StringSet that Delegates functionality to MetricsEnvironment."""
def __init__(self, metric_name: MetricName) -> None:
  """Creates the updater that ``add`` forwards to.

  Args:
    metric_name: name handed to the base class and to the MetricUpdater.
  """
  super().__init__(metric_name)
  # Do not bind the updater to ``self.add``: an instance attribute of that
  # name would shadow the ``add`` method and bypass the
  # MetricsFlag.string_set_disabled check.
  self._updater = MetricUpdater(cells.StringSetCell, metric_name)

def add(self, value: str) -> None:
  """Forwards ``value`` to the updater unless StringSets are disabled."""
  if not MetricsFlag.string_set_disabled:
    self._updater(value)

class DelegatingBoundedTrie(BoundedTrie):
  """Metrics BoundedTrie that Delegates functionality to MetricsEnvironment."""
  def __init__(self, metric_name: MetricName) -> None:
    """Creates the updater that ``add`` forwards to.

    Args:
      metric_name: name handed to the base class and to the MetricUpdater.
    """
    super().__init__(metric_name)
    # Only the private updater is stored. Assigning a MetricUpdater to
    # ``self.add`` as well would shadow the ``add`` method below, so the
    # MetricsFlag.bounded_trie_disabled check would never be reached.
    self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name)

  def add(self, value) -> None:
    """Forwards ``value`` to the updater unless BoundedTries are disabled."""
    if MetricsFlag.bounded_trie_disabled:
      return
    self._updater(value)
Comment on lines +274 to +288
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Access the string_set_disabled and bounded_trie_disabled class attributes directly instead of calling classmethod getters to avoid method call overhead on the hot path.

Suggested change
def add(self, value: str) -> None:
if MetricsFlag.string_set_disabled():
return
self._updater(value)
class DelegatingBoundedTrie(BoundedTrie):
"""Metrics StringSet that Delegates functionality to MetricsEnvironment."""
"""Metrics BoundedTrie that Delegates functionality to MetricsEnvironment."""
def __init__(self, metric_name: MetricName) -> None:
super().__init__(metric_name)
self.add = MetricUpdater(cells.BoundedTrieCell, metric_name) # type: ignore[method-assign]
self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name)
def add(self, value) -> None:
if MetricsFlag.bounded_trie_disabled():
return
self._updater(value)
def add(self, value: str) -> None:
if MetricsFlag.string_set_disabled:
return
self._updater(value)
class DelegatingBoundedTrie(BoundedTrie):
"""Metrics BoundedTrie that Delegates functionality to MetricsEnvironment."""
def __init__(self, metric_name: MetricName) -> None:
super().__init__(metric_name)
self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name)
def add(self, value) -> None:
if MetricsFlag.bounded_trie_disabled:
return
self._updater(value)



class MetricResults(object):
Expand Down
104 changes: 104 additions & 0 deletions sdks/python/apache_beam/metrics/metric_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
from apache_beam.metrics.metric import MetricResults
from apache_beam.metrics.metric import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.metrics.metric import MetricsFlag
from apache_beam.metrics.metricbase import MetricName
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.direct.direct_runner import BundleBasedDirectRunner
from apache_beam.runners.worker import statesampler
from apache_beam.testing.metric_result_matchers import DistributionMatcher
Expand Down Expand Up @@ -121,6 +123,108 @@ def test_get_namespace_error(self):
with self.assertRaises(ValueError):
Metrics.get_namespace(object())

def test_metrics_flag(self):
  """Each disable experiment raises exactly its own MetricsFlag switch."""
  # (pipeline args, expected counter / string set / bounded trie switches)
  scenarios = [
      (['--experiments=disableCounterMetrics'], (True, False, False)),
      (['--experiments=disableStringSetMetrics'], (False, True, False)),
      (['--experiments=disableBoundedTrieMetrics'], (False, False, True)),
      (
          [
              '--experiments=disableCounterMetrics',
              '--experiments=disableStringSetMetrics',
              '--experiments=disableBoundedTrieMetrics',
          ],
          (True, True, True)),
  ]

  def current_switches():
    return (
        MetricsFlag.counter_disabled,
        MetricsFlag.string_set_disabled,
        MetricsFlag.bounded_trie_disabled)

  MetricsFlag.reset()
  try:
    # Nothing is disabled before any options have been applied.
    self.assertEqual(current_switches(), (False, False, False))

    for args, expected in scenarios:
      # Start each scenario from a clean slate so that the options are
      # actually re-read instead of being ignored as a repeat call.
      MetricsFlag.reset()
      MetricsFlag.set_default_pipeline_options(PipelineOptions(args))
      self.assertEqual(current_switches(), expected)
  finally:
    MetricsFlag.reset()

def test_disabled_counter_is_noop(self):
  """Counter updates issued after the disable experiment add no metric."""
  tracker = statesampler.StateSampler('', counters.CounterFactory())
  statesampler.set_current_tracker(tracker)
  scoped = tracker.scoped_state(
      'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
  MetricsFlag.reset()
  try:
    tracker.start()
    with scoped:
      active = MetricsEnvironment.current_container()

      # Baseline: with the flag off, one inc registers one metric.
      Metrics.counter('ns', 'baseline').inc()
      self.assertEqual(len(active.metrics), 1)

      # Turn counters off, then exercise inc (default and explicit) and dec.
      MetricsFlag.set_default_pipeline_options(
          PipelineOptions(['--experiments=disableCounterMetrics']))
      Metrics.counter('ns', 'after_disable').inc()
      Metrics.counter('ns', 'after_disable').inc(5)
      Metrics.counter('ns', 'after_disable').dec()

      # Still only the baseline metric.
      self.assertEqual(len(active.metrics), 1)
  finally:
    tracker.stop()
    MetricsFlag.reset()

def test_disabled_string_set_is_noop(self):
  """StringSet adds issued after the disable experiment add no metric."""
  tracker = statesampler.StateSampler('', counters.CounterFactory())
  statesampler.set_current_tracker(tracker)
  scoped = tracker.scoped_state(
      'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
  MetricsFlag.reset()
  try:
    tracker.start()
    with scoped:
      active = MetricsEnvironment.current_container()

      # Baseline: with the flag off, one add registers one metric.
      Metrics.string_set('ns', 'baseline').add('seed')
      self.assertEqual(len(active.metrics), 1)

      # Turn StringSets off and add to a not-yet-seen metric name.
      MetricsFlag.set_default_pipeline_options(
          PipelineOptions(['--experiments=disableStringSetMetrics']))
      Metrics.string_set('ns', 'after_disable').add('value')

      # Still only the baseline metric.
      self.assertEqual(len(active.metrics), 1)
  finally:
    tracker.stop()
    MetricsFlag.reset()

def test_disabled_bounded_trie_is_noop(self):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These ..._is_noop tests aren't checking anything.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 859715d. Each test now sets up a StateSampler with a MetricsContainer, runs a baseline inc/add to confirm the container grows to 1, then enables the disable experiment and runs another inc/add and asserts the count is still 1, proving no MetricCell was created.

sampler = statesampler.StateSampler('', counters.CounterFactory())
statesampler.set_current_tracker(sampler)
state = sampler.scoped_state(
'mystep', 'myState', metrics_container=MetricsContainer('mystep'))
MetricsFlag.reset()
try:
sampler.start()
with state:
container = MetricsEnvironment.current_container()
Metrics.bounded_trie('ns', 'baseline').add(['a'])
self.assertEqual(len(container.metrics), 1)
options = PipelineOptions(['--experiments=disableBoundedTrieMetrics'])
MetricsFlag.set_default_pipeline_options(options)
Metrics.bounded_trie('ns', 'after_disable').add(['a', 'b'])
self.assertEqual(len(container.metrics), 1)
finally:
sampler.stop()
MetricsFlag.reset()

def test_counter_empty_name(self):
with self.assertRaises(ValueError):
Metrics.counter("namespace", "")
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
from apache_beam.coders import typecoders
from apache_beam.internal import pickler
from apache_beam.io.filesystems import FileSystems
from apache_beam.metrics.metric import MetricsFlag
from apache_beam.options.pipeline_options import CrossLanguageOptions
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions
Expand Down Expand Up @@ -192,6 +193,7 @@ def __init__(
self._options = PipelineOptions([])

FileSystems.set_options(self._options)
MetricsFlag.set_default_pipeline_options(self._options)

if runner is None:
runner = self._options.view_as(StandardOptions).runner
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/runners/worker/sdk_worker_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

from apache_beam.internal import pickler
from apache_beam.io import filesystems
from apache_beam.metrics import metric
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
Expand Down Expand Up @@ -123,6 +124,7 @@ def create_harness(environment, dry_run=False):
RuntimeValueProvider.set_runtime_options(pipeline_options_dict)
sdk_pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict)
filesystems.FileSystems.set_options(sdk_pipeline_options)
metric.MetricsFlag.set_default_pipeline_options(sdk_pipeline_options)
pickle_library = sdk_pipeline_options.view_as(SetupOptions).pickle_library
pickler.set_library(pickle_library)

Expand Down
Loading