diff --git a/benchmarks/micro/bench_isinstance_dispatch.py b/benchmarks/micro/bench_isinstance_dispatch.py new file mode 100644 index 0000000000..c6e6d414d7 --- /dev/null +++ b/benchmarks/micro/bench_isinstance_dispatch.py @@ -0,0 +1,107 @@ +# Copyright ScyllaDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Micro-benchmark: isinstance dispatch order in _create_response_future. + +Measures the cost of checking BoundStatement first vs SimpleStatement first +in the isinstance chain that dispatches query types to message constructors. + +For prepared-statement workloads (the perf-critical case), BoundStatement is +the most common type. Checking it first saves one wasted isinstance call. + +Run: + python benchmarks/bench_isinstance_dispatch.py +""" + +import sys +import timeit + +from cassandra.query import SimpleStatement, BoundStatement, BatchStatement, Statement + + +class _FakeGraphStatement(Statement): + """Stand-in for GraphStatement to avoid importing DSE dependencies.""" + pass + + +def make_bound_statement(): + """Create a minimal BoundStatement-like object for benchmarking.""" + # We only need isinstance() to work; no actual prepared statement needed. + bs = object.__new__(BoundStatement) + return bs + + +def make_simple_statement(): + return SimpleStatement("SELECT * FROM t") + + +def make_batch_statement(): + return BatchStatement() + + +def bench(): + bound = make_bound_statement() + simple = make_simple_statement() + batch = make_batch_statement() + graph = _FakeGraphStatement() + + # Simulate typical workload mix: ~80% BoundStatement, ~15% SimpleStatement, + # ~4% BatchStatement, ~1% GraphStatement + queries = ([bound] * 80 + [simple] * 15 + [batch] * 4 + [graph] * 1) + + def dispatch_simple_first(): + """Original order: SimpleStatement checked first.""" + for q in queries: + if isinstance(q, SimpleStatement): + pass + elif isinstance(q, BoundStatement): + pass + elif isinstance(q, BatchStatement): + pass + elif isinstance(q, _FakeGraphStatement): + pass + + def dispatch_bound_first(): + """Optimized order: BoundStatement checked first.""" + for q in queries: + if isinstance(q, BoundStatement): + pass + elif isinstance(q, SimpleStatement): + pass + elif isinstance(q, BatchStatement): + pass + elif isinstance(q, _FakeGraphStatement): + pass + + n = 200_000 + t_simple_first = timeit.timeit(dispatch_simple_first, number=n) + t_bound_first = timeit.timeit(dispatch_bound_first, number=n) + + total_calls = n * len(queries) + print(f"=== isinstance dispatch order (100 queries x {n} iters = {total_calls:,} dispatches) ===") + print(f"SimpleStatement first: {t_simple_first:.3f}s ({t_simple_first / total_calls * 1e9:.1f} ns/dispatch)") + print(f"BoundStatement first: {t_bound_first:.3f}s ({t_bound_first / total_calls * 1e9:.1f} ns/dispatch)") + + if t_bound_first < t_simple_first: + speedup = t_simple_first / t_bound_first + saving_ns = (t_simple_first - t_bound_first) / total_calls * 1e9 + print(f"Speedup: {speedup:.2f}x ({saving_ns:.1f} ns/dispatch saved)") + else: + print(f"No improvement (ratio: {t_simple_first / t_bound_first:.2f}x)") + + +if __name__ == "__main__": + print(f"Python {sys.version}") + bench() diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 9eace8810d..fa620d1bd7 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -2984,7 +2984,17 @@ def _create_response_future(self, query, parameters, trace, custom_payload, else: timestamp = None - if isinstance(query, SimpleStatement): + if isinstance(query, BoundStatement): + # Check BoundStatement first: prepared-statement execution is the + # most common hot-path case, saving one isinstance() call (~15 ns). + prepared_statement = query.prepared_statement + message = ExecuteMessage( + prepared_statement.query_id, query.values, cl, + serial_cl, fetch_size, paging_state, timestamp, + skip_meta=bool(prepared_statement.result_metadata), + continuous_paging_options=continuous_paging_options, + result_metadata_id=prepared_statement.result_metadata_id) + elif isinstance(query, SimpleStatement): query_string = query.query_string statement_keyspace = query.keyspace if ProtocolVersion.uses_keyspace_flag(self._protocol_version) else None if parameters: @@ -2993,14 +3003,6 @@ def _create_response_future(self, query, parameters, trace, custom_payload, query_string, cl, serial_cl, fetch_size, paging_state, timestamp, continuous_paging_options, statement_keyspace) - elif isinstance(query, BoundStatement): - prepared_statement = query.prepared_statement - message = ExecuteMessage( - prepared_statement.query_id, query.values, cl, - serial_cl, fetch_size, paging_state, timestamp, - skip_meta=bool(prepared_statement.result_metadata), - continuous_paging_options=continuous_paging_options, - result_metadata_id=prepared_statement.result_metadata_id) elif isinstance(query, BatchStatement): if self._protocol_version < 2: raise UnsupportedOperation( @@ -4413,10 +4415,9 @@ class ResponseFuture(object): session = None row_factory = None message = None - default_timeout = None + prepared_statement = None _retry_policy = None - _profile_manager = None _req_id = None _final_result = _NOT_SET @@ -4439,10 +4440,9 @@ class ResponseFuture(object): _spec_execution_plan = NoSpeculativeExecutionPlan() _continuous_paging_session = None _host = None + _continuous_paging_state = None _TABLET_ROUTING_CTYPE = None - _warned_timeout = False - def __init__(self, session, message, query, timeout, metrics=None, prepared_statement=None, retry_policy=RetryPolicy(), row_factory=None, load_balancer=None, start_time=None, speculative_execution_plan=None, continuous_paging_state=None, host=None): @@ -4454,11 +4454,14 @@ def __init__(self, session, message, query, timeout, metrics=None, prepared_stat self.query = query self.timeout = timeout self._retry_policy = retry_policy - self._metrics = metrics - self.prepared_statement = prepared_statement + if metrics is not None: + self._metrics = metrics + if prepared_statement is not None: + self.prepared_statement = prepared_statement self._callback_lock = Lock() self._start_time = start_time or time.time() - self._host = host + if host is not None: + self._host = host self._spec_execution_plan = speculative_execution_plan or self._spec_execution_plan self._make_query_plan() self._event = Event() @@ -4467,7 +4470,8 @@ def __init__(self, session, message, query, timeout, metrics=None, prepared_stat self._errbacks = [] self.attempted_hosts = [] self._start_timer() - self._continuous_paging_state = continuous_paging_state + if continuous_paging_state is not None: + self._continuous_paging_state = continuous_paging_state @property def _time_remaining(self):