diff --git a/configs/README.md b/configs/README.md index 46c17f4c..64970fe8 100644 --- a/configs/README.md +++ b/configs/README.md @@ -77,11 +77,13 @@ benchmarks: - name: "Benchmark Name" description: "What this benchmark tests" variables: - - type: payload|node_type|num_blocks|gas_limit + - type: payload|node_type|num_blocks|gas_limit|target_gps|consensus_timing value: single-value values: [array, of, values] # for matrix testing ``` +`consensus_timing` can be `prevent-late-fcu` or `base-consensus`. Snapshot load-test runs default to `base-consensus`; other benchmark runs default to `prevent-late-fcu`. + ## 🎯 Choosing the Right Configuration - **Development/Testing**: Use `examples/` configurations for focused testing diff --git a/configs/examples/load-test-config.yml b/configs/examples/load-test-config.yml new file mode 100644 index 00000000..77074cdd --- /dev/null +++ b/configs/examples/load-test-config.yml @@ -0,0 +1,20 @@ +transaction_submission_rpcs: + - "http://localhost:8545" +query_rpc: "http://localhost:8545" +txpool_nodes: [] +flashblocks_ws: "ws://localhost:7111" + +sender_count: 10 +target_gps: 500000000 +duration: "60s" +funding_amount: "10000000000000000000" + +transactions: + - weight: 70 + type: transfer + - weight: 20 + type: calldata + max_size: 256 + - weight: 10 + type: precompile + target: sha256 diff --git a/configs/examples/load-test.yml b/configs/examples/load-test.yml index 6743ca60..143b9774 100644 --- a/configs/examples/load-test.yml +++ b/configs/examples/load-test.yml @@ -4,16 +4,8 @@ payloads: - name: Load Test type: load-test id: load-test - sender_count: 10 - transactions: - - weight: 70 - type: transfer - - weight: 20 - type: calldata - max_size: 256 - - weight: 10 - type: precompile - target: sha256 + network: devnet + config_file: ./load-test-config.yml benchmarks: - variables: @@ -27,3 +19,5 @@ benchmarks: value: 10 - type: gas_limit value: 1000000000 + - type: target_gps + value: 500000000 diff --git a/report/src/components/ChartGrid.tsx b/report/src/components/ChartGrid.tsx index bd525329..fe4596e5 100644 --- a/report/src/components/ChartGrid.tsx +++ b/report/src/components/ChartGrid.tsx @@ -20,6 +20,22 @@ function resolveMetricKey( return key; } } + + const metricKeys = chartData.flatMap((d) => Object.keys(d.ExecutionMetrics)); + for (const key of keys) { + const quantileSuffix = key.match(/(_quantile_\d+(?:_\d+)?)$/)?.[1] ?? ""; + const metricPrefix = quantileSuffix + ? key.slice(0, -quantileSuffix.length) + : key; + const labeledMetricKeys = metricKeys.filter( + (metricKey) => + metricKey.startsWith(`${metricPrefix}_`) && + (!quantileSuffix || metricKey.endsWith(quantileSuffix)), + ); + if (labeledMetricKeys.length === 1) { + return labeledMetricKeys[0]; + } + } return primaryKey; } diff --git a/report/src/components/ConfigurationTags.tsx b/report/src/components/ConfigurationTags.tsx index 5d3591b7..5568120b 100644 --- a/report/src/components/ConfigurationTags.tsx +++ b/report/src/components/ConfigurationTags.tsx @@ -7,6 +7,61 @@ interface ConfigurationTagsProps { className?: string; } +const CONFIG_LABELS: Record = { + BlockTimeMilliseconds: "Block Time", + ConsensusTimingMode: "Consensus Timing", + GasLimit: "Gas Limit", + NodeType: "Node Type", + TargetGPS: "Target Gas/s", + TransactionPayload: "Transaction Payload", + ValidatorNodeType: "Validator Node Type", +}; + +const CONFIG_ORDER = [ + "TargetGPS", + "GasLimit", + "BlockTimeMilliseconds", + "ConsensusTimingMode", + "NodeType", + "ValidatorNodeType", + "TransactionPayload", + "Roles", +]; + +const configLabel = (key: string): string => + CONFIG_LABELS[key] ?? camelToTitleCase(key); + +const configValue = (key: string, value: unknown): string => { + if (key === "GasLimit") { + return formatValue(Number(value), "gas"); + } + if (key === "TargetGPS") { + return formatValue(Number(value), "gas/s"); + } + if (key === "BlockTimeMilliseconds") { + return formatValue(Number(value), "ms"); + } + return String(formatLabel(`${value}`)); +}; + +const configEntries = (testConfig: Record) => + Object.entries(testConfig || {}) + .filter(([key, value]) => key !== "BenchmarkRun" && value !== "") + .sort(([a], [b]) => { + const aIndex = CONFIG_ORDER.indexOf(a); + const bIndex = CONFIG_ORDER.indexOf(b); + if (aIndex === -1 && bIndex === -1) { + return a.localeCompare(b); + } + if (aIndex === -1) { + return 1; + } + if (bIndex === -1) { + return -1; + } + return aIndex - bIndex; + }); + const ConfigurationTags = ({ testConfig, clientVersion, @@ -25,28 +80,18 @@ const ConfigurationTags = ({ {clientVersion} )} - {Object.entries(testConfig || {}) - .filter(([k]) => k !== "BenchmarkRun" && k !== "GasLimit") - .map(([key, value]) => ( - - - {camelToTitleCase(key)}: - - {key === "GasLimit" ? ( - - {formatValue(Number(value), "gas")} - - ) : ( - - {String(formatLabel(`${value}`))} - - )} + {configEntries(testConfig).map(([key, value]) => ( + + + {configLabel(key)}: - ))} + {configValue(key, value)} + + ))} ); }; diff --git a/report/src/components/RunList.tsx b/report/src/components/RunList.tsx index 0265129b..6c485ba4 100644 --- a/report/src/components/RunList.tsx +++ b/report/src/components/RunList.tsx @@ -291,9 +291,13 @@ const RunList = ({ const statusCounts = groupBy(section.runs, "status"); const sortedRuns = isExpanded ? sortRuns(section.runs) : section.runs; const gasLimit = Number(section.runs?.[0]?.testConfig?.GasLimit); + const targetGasPerSecond = Number( + section.runs?.[0]?.testConfig?.TargetGPS, + ); const blockTimeMilliseconds = Number(section.runs?.[0]?.testConfig?.BlockTimeMilliseconds) || 2000; - const gasPerSecond = gasLimit / (blockTimeMilliseconds / 1000); + const gasPerSecond = + targetGasPerSecond || gasLimit / (blockTimeMilliseconds / 1000); return (
diff --git a/report/src/metricDefinitions.ts b/report/src/metricDefinitions.ts index 58085ebd..52a36b2b 100644 --- a/report/src/metricDefinitions.ts +++ b/report/src/metricDefinitions.ts @@ -192,6 +192,29 @@ export const CHART_CONFIG = { unit: "s", aliases: ["reth_op_rbuilder_state_root_calculation_duration"], }, + reth_base_builder_state_root_calculation_duration_quantile_0_5: { + type: "line", + title: "Builder State Root Calculation Duration p50", + description: "p50 time taken to calculate the state root", + unit: "s", + }, + reth_base_builder_state_root_calculation_duration_quantile_0_9: { + type: "line", + title: "Builder State Root Calculation Duration p90", + description: "p90 time taken to calculate the state root", + unit: "s", + }, + reth_base_builder_state_root_calculation_duration_quantile_0_99: { + type: "line", + title: "Builder State Root Calculation Duration p99", + description: "p99 time taken to calculate the state root", + unit: "s", + }, + reth_base_builder_state_root_time_per_gas_ratio_quantile_0_9: { + type: "line", + title: "Builder State Root Time per Gas p90", + description: "p90 state-root calculation time divided by gas processed", + }, reth_base_builder_sequencer_tx_duration_avg: { type: "line", title: "Builder Sequencer Tx Duration", @@ -224,6 +247,135 @@ export const CHART_CONFIG = { description: "Average gas headroom percentage across flashblocks", unit: "count", }, + reth_storage_providers_database_save_blocks_total_quantile_0_9: { + type: "line", + title: "Save Blocks Total p90", + description: "p90 total database save-blocks duration", + unit: "s", + }, + reth_storage_providers_database_save_blocks_block_count_last: { + type: "line", + title: "Save Blocks Block Count", + description: + "Number of blocks included in the most recent save-blocks operation", + unit: "blocks", + }, + reth_storage_providers_database_save_blocks_commit_sf_quantile_0_9: { + type: "line", + title: "Save Blocks Static File Commit p90", + description: "p90 static-file commit duration during save-blocks", + unit: "s", + }, + reth_storage_providers_database_save_blocks_commit_mdbx_quantile_0_9: { + type: "line", + title: "Save Blocks MDBX Commit p90", + description: "p90 MDBX commit duration during save-blocks", + unit: "s", + }, + reth_storage_providers_database_save_blocks_write_state_quantile_0_9: { + type: "line", + title: "Save Blocks Write State p90", + description: "p90 state write duration during save-blocks", + unit: "s", + }, + reth_storage_providers_database_save_blocks_write_hashed_state_quantile_0_9: { + type: "line", + title: "Save Blocks Write Hashed State p90", + description: "p90 hashed-state write duration during save-blocks", + unit: "s", + }, + reth_storage_providers_database_save_blocks_write_trie_updates_quantile_0_9: { + type: "line", + title: "Save Blocks Write Trie Updates p90", + description: "p90 trie-update write duration during save-blocks", + unit: "s", + }, + reth_storage_providers_database_save_blocks_sf_quantile_0_9: { + type: "line", + title: "Save Blocks Static Files p90", + description: "p90 static-file save-blocks duration", + unit: "s", + }, + reth_trie_leaves_added_quantile_0_9: { + type: "line", + title: "Trie Leaves Added p90", + description: "p90 trie leaves added", + unit: "count", + }, + reth_trie_branches_added_quantile_0_9: { + type: "line", + title: "Trie Branches Added p90", + description: "p90 trie branches added", + unit: "count", + }, + reth_tree_root_sparse_trie_total_duration_histogram_quantile_0_9: { + type: "line", + title: "Sparse Trie Total Duration p90", + description: "p90 sparse-trie total duration", + unit: "s", + aliases: ["reth_tree_root_sparse_trie_total_duration_histogram"], + }, + reth_tree_root_sparse_trie_final_update_duration_histogram_quantile_0_9: { + type: "line", + title: "Sparse Trie Final Update Duration p90", + description: "p90 sparse-trie final update duration", + unit: "s", + aliases: ["reth_tree_root_sparse_trie_final_update_duration_histogram"], + }, + reth_parallel_sparse_trie_subtrie_hash_update_latency_quantile_0_9: { + type: "line", + title: "Sparse Trie Subtrie Hash Update p90", + description: "p90 subtrie hash update latency", + unit: "s", + }, + reth_parallel_sparse_trie_subtrie_upper_hash_latency_quantile_0_9: { + type: "line", + title: "Sparse Trie Subtrie Upper Hash p90", + description: "p90 subtrie upper-hash latency", + unit: "s", + }, + reth_trie_proof_task_storage_worker_idle_time_seconds_quantile_0_9: { + type: "line", + title: "Trie Proof Storage Worker Idle p90", + description: "p90 trie-proof storage worker idle time", + unit: "s", + }, + reth_trie_proof_task_account_worker_idle_time_seconds_quantile_0_9: { + type: "line", + title: "Trie Proof Account Worker Idle p90", + description: "p90 trie-proof account worker idle time", + unit: "s", + }, + reth_trie_proof_task_blinded_storage_nodes_quantile_0_9: { + type: "line", + title: "Trie Proof Blinded Storage Nodes p90", + description: "p90 blinded storage nodes handled by trie proof tasks", + unit: "count", + }, + reth_trie_proof_task_blinded_account_nodes_quantile_0_9: { + type: "line", + title: "Trie Proof Blinded Account Nodes p90", + description: "p90 blinded account nodes handled by trie proof tasks", + unit: "count", + }, + reth_trie_cursor_overall_duration_quantile_0_9: { + type: "line", + title: "Trie Cursor Overall Duration p90", + description: "p90 trie cursor overall duration", + unit: "s", + }, + reth_trie_hashed_cursor_overall_duration_quantile_0_9: { + type: "line", + title: "Trie Hashed Cursor Overall Duration p90", + description: "p90 hashed trie cursor overall duration", + unit: "s", + }, + reth_db_freelist: { + type: "line", + title: "MDBX Freelist", + description: "MDBX freelist size", + unit: "count", + }, reth_sync_state_provider_total_storage_fetch_latency_avg: { type: "line", title: "Validator Storage Load Latency", @@ -268,6 +420,31 @@ const CHART_CONFIG_ORDER: (keyof typeof CHART_CONFIG)[] = [ "chain/storage/commits.50-percentile", "chain/snapshot/commits.50-percentile", "chain/triedb/commits.50-percentile", + "reth_base_builder_state_root_calculation_duration_quantile_0_5", + "reth_base_builder_state_root_calculation_duration_quantile_0_9", + "reth_base_builder_state_root_calculation_duration_quantile_0_99", + "reth_base_builder_state_root_time_per_gas_ratio_quantile_0_9", + "reth_storage_providers_database_save_blocks_total_quantile_0_9", + "reth_storage_providers_database_save_blocks_block_count_last", + "reth_storage_providers_database_save_blocks_commit_sf_quantile_0_9", + "reth_storage_providers_database_save_blocks_commit_mdbx_quantile_0_9", + "reth_storage_providers_database_save_blocks_write_state_quantile_0_9", + "reth_storage_providers_database_save_blocks_write_hashed_state_quantile_0_9", + "reth_storage_providers_database_save_blocks_write_trie_updates_quantile_0_9", + "reth_storage_providers_database_save_blocks_sf_quantile_0_9", + "reth_trie_leaves_added_quantile_0_9", + "reth_trie_branches_added_quantile_0_9", + "reth_tree_root_sparse_trie_total_duration_histogram_quantile_0_9", + "reth_tree_root_sparse_trie_final_update_duration_histogram_quantile_0_9", + "reth_parallel_sparse_trie_subtrie_hash_update_latency_quantile_0_9", + "reth_parallel_sparse_trie_subtrie_upper_hash_latency_quantile_0_9", + "reth_trie_proof_task_storage_worker_idle_time_seconds_quantile_0_9", + "reth_trie_proof_task_account_worker_idle_time_seconds_quantile_0_9", + "reth_trie_proof_task_blinded_storage_nodes_quantile_0_9", + "reth_trie_proof_task_blinded_account_nodes_quantile_0_9", + "reth_trie_cursor_overall_duration_quantile_0_9", + "reth_trie_hashed_cursor_overall_duration_quantile_0_9", + "reth_db_freelist", ]; export const SORTED_CHART_CONFIG: [string, ChartConfig][] = Object.entries( diff --git a/report/src/pages/BenchmarkLoadTestDetail.tsx b/report/src/pages/BenchmarkLoadTestDetail.tsx new file mode 100644 index 00000000..bfbba762 --- /dev/null +++ b/report/src/pages/BenchmarkLoadTestDetail.tsx @@ -0,0 +1,50 @@ +import { useParams } from "react-router-dom"; +import Navbar from "../components/Navbar"; +import { useBenchmarkLoadTestResult } from "../utils/useDataSeries"; +import { LoadTestReportContent } from "./LoadTestDetail"; + +const BenchmarkLoadTestDetail = () => { + const { outputDir } = useParams(); + const { + data: result, + isLoading, + error, + } = useBenchmarkLoadTestResult(outputDir); + + return ( +
+ +
+ {isLoading && ( +
+ Loading benchmark load test… +
+ )} + + {error && ( +
+ Failed to load benchmark load test result: {String(error)} +
+ )} + + {result && ( + + Output dir: {outputDir} + + } + backLink={{ + to: "/latest", + label: "View benchmark runs →", + }} + /> + )} +
+
+ ); +}; + +export default BenchmarkLoadTestDetail; diff --git a/report/src/pages/LoadTestDetail.tsx b/report/src/pages/LoadTestDetail.tsx index fd5050c8..d902355f 100644 --- a/report/src/pages/LoadTestDetail.tsx +++ b/report/src/pages/LoadTestDetail.tsx @@ -1,5 +1,5 @@ import { Link, useParams } from "react-router-dom"; -import { useMemo } from "react"; +import { type ReactNode, useMemo } from "react"; import Navbar from "../components/Navbar"; import StatCard, { Stat, StatGrid } from "../components/StatCard"; import PercentileBarChart, { @@ -149,50 +149,127 @@ const SummarySection = ({ result }: { result: LoadTestResult }) => { ); }; -const LoadTestDetail = () => { - const { network, timestamp } = useParams(); - const { - data: result, - isLoading, - error, - } = useLoadTestResult(network, timestamp); +interface LoadTestReportContentProps { + result: LoadTestResult; + title: string; + subtitle: ReactNode; + backLink?: { + to: string; + label: string; + }; +} +export const LoadTestReportContent = ({ + result, + title, + subtitle, + backLink, +}: LoadTestReportContentProps) => { const blockLatencyRows = useMemo( - () => (result ? buildLatencyRows(result.block_latency) : []), + () => buildLatencyRows(result.block_latency), [result], ); const flashblocksLatencyRows = useMemo( - () => (result ? buildLatencyRows(result.flashblocks_latency) : []), + () => buildLatencyRows(result.flashblocks_latency), [result], ); return ( -
- -
-
-
+ <> +
+
+ {backLink && ( - View all runs → + {backLink.label} -

- {timestamp ? formatLoadTestTimestamp(timestamp) : "Load test"} -

-

- Network: {network} - {timestamp && ( - <> - {" · "} - {timestamp} - - )} -

-
-
+ )} +

+ {title} +

+

{subtitle}

+
+
+ + + + {result.throughput_timeseries && + result.throughput_timeseries.length > 1 && ( + + + + )} + + {result.config && } + + + + + + + + + + + + + {(() => { + const reverted = result.throughput.total_reverted; + const reasons: [string, number][] = [ + ...(reverted > 0 + ? [["reverted", reverted] as [string, number]] + : []), + ...result.top_failure_reasons, + ]; + if (reasons.length === 0) { + return ( +
+ No failures recorded. +
+ ); + } + return ( +
    + {reasons.map(([reason, count]) => ( +
  • + {reason} + {count.toLocaleString()} +
  • + ))} +
+ ); + })()} +
+ + ); +}; +const LoadTestDetail = () => { + const { network, timestamp } = useParams(); + const { + data: result, + isLoading, + error, + } = useLoadTestResult(network, timestamp); + + return ( +
+ +
{isLoading && (
Loading load test…
)} @@ -204,74 +281,27 @@ const LoadTestDetail = () => { )} {result && ( - <> - - - {result.throughput_timeseries && - result.throughput_timeseries.length > 1 && ( - - - - )} - - {result.config && } - - - - - - - - - - - - - {(() => { - const reverted = result.throughput.total_reverted; - const reasons: [string, number][] = [ - ...(reverted > 0 - ? [["reverted", reverted] as [string, number]] - : []), - ...result.top_failure_reasons, - ]; - if (reasons.length === 0) { - return ( -
- No failures recorded. -
- ); - } - return ( -
    - {reasons.map(([reason, count]) => ( -
  • - {reason} - - {count.toLocaleString()} - -
  • - ))} -
- ); - })()} -
- + + Network: {network} + {timestamp && ( + <> + {" · "} + + {timestamp} + + + )} + + } + backLink={{ + to: `/load-tests/${network ?? "sepolia"}/all`, + label: "View all runs →", + }} + /> )}
diff --git a/report/src/pages/RunIndex.tsx b/report/src/pages/RunIndex.tsx index 30caa7e2..f20d03e8 100644 --- a/report/src/pages/RunIndex.tsx +++ b/report/src/pages/RunIndex.tsx @@ -51,7 +51,12 @@ const RunIndexInner = ({ benchmarkRuns }: { benchmarkRuns: BenchmarkRuns }) => { } }); - const groups = groupBy(matchedRuns, "testConfig.GasLimit"); + const groups = groupBy(matchedRuns, (run) => { + if (run.testConfig.TargetGPS) { + return `target-gps-${run.testConfig.TargetGPS}`; + } + return `gas-limit-${run.testConfig.GasLimit}`; + }); // Build sections array with diffKeyStart const sections: { diff --git a/runner/benchmark/benchmark.go b/runner/benchmark/benchmark.go index f1614af3..48de407b 100644 --- a/runner/benchmark/benchmark.go +++ b/runner/benchmark/benchmark.go @@ -70,6 +70,15 @@ func NewParamsFromValues(assignments map[string]interface{}) (*types.RunParams, } else { return nil, fmt.Errorf("invalid gas limit %s", v) } + case "consensus_timing": + if vStr, ok := v.(string); ok { + if vStr != "" && vStr != types.ConsensusTimingModePreventLateFCU && vStr != types.ConsensusTimingModeBaseConsensus { + return nil, fmt.Errorf("invalid consensus timing %s", v) + } + params.ConsensusTimingMode = vStr + } else { + return nil, fmt.Errorf("invalid consensus timing %s", v) + } case "env": if vStr, ok := v.(string); ok { entries := strings.Split(vStr, ";") diff --git a/runner/benchmark/matrix.go b/runner/benchmark/matrix.go index 180f78f1..46832457 100644 --- a/runner/benchmark/matrix.go +++ b/runner/benchmark/matrix.go @@ -3,6 +3,8 @@ package benchmark import ( "fmt" "time" + + "github.com/base/base-bench/runner/network/types" ) type ThresholdConfig struct { @@ -134,6 +136,7 @@ func ResolveTestRunsFromMatrix(c TestDefinition, testFileName string, config *Be if c.Tags != nil { params.Tags = *c.Tags } + params.ConsensusTimingMode = consensusTimingMode(params, c, config) testParams[i] = TestRun{ ID: id, @@ -166,3 +169,25 @@ func ResolveTestRunsFromMatrix(c TestDefinition, testFileName string, config *Be return testParams, nil } + +func consensusTimingMode(params *types.RunParams, definition TestDefinition, config *BenchmarkConfig) string { + if params.ConsensusTimingMode != "" { + return params.ConsensusTimingMode + } + if isSnapshotLoadTest(params.PayloadID, definition, config) { + return types.ConsensusTimingModeBaseConsensus + } + return types.ConsensusTimingModePreventLateFCU +} + +func isSnapshotLoadTest(payloadID string, definition TestDefinition, config *BenchmarkConfig) bool { + if definition.Snapshot == nil || definition.Snapshot.Command == "" { + return false + } + for _, transactionPayload := range config.TransactionPayloads { + if transactionPayload.ID == payloadID && transactionPayload.Type == "load-test" { + return true + } + } + return false +} diff --git a/runner/benchmark/matrix_test.go b/runner/benchmark/matrix_test.go index ec3b05e4..35d592e3 100644 --- a/runner/benchmark/matrix_test.go +++ b/runner/benchmark/matrix_test.go @@ -6,6 +6,7 @@ import ( "github.com/base/base-bench/runner/benchmark" "github.com/base/base-bench/runner/network/types" + "github.com/base/base-bench/runner/payload" "github.com/stretchr/testify/require" ) @@ -29,10 +30,11 @@ func TestResolveTestRunsFromMatrix(t *testing.T) { want: []benchmark.TestRun{ { Params: types.RunParams{ - NodeType: "geth", - PayloadID: "simple", - GasLimit: benchmark.DefaultParams.GasLimit, - BlockTime: 1 * time.Second, + NodeType: "geth", + PayloadID: "simple", + GasLimit: benchmark.DefaultParams.GasLimit, + BlockTime: 1 * time.Second, + ConsensusTimingMode: types.ConsensusTimingModePreventLateFCU, }, }, }, @@ -55,34 +57,38 @@ func TestResolveTestRunsFromMatrix(t *testing.T) { want: []benchmark.TestRun{ { Params: types.RunParams{ - NodeType: "geth", - GasLimit: benchmark.DefaultParams.GasLimit, - PayloadID: "simple", - BlockTime: 1 * time.Second, + NodeType: "geth", + GasLimit: benchmark.DefaultParams.GasLimit, + PayloadID: "simple", + BlockTime: 1 * time.Second, + ConsensusTimingMode: types.ConsensusTimingModePreventLateFCU, }, }, { Params: types.RunParams{ - NodeType: "erigon", - GasLimit: benchmark.DefaultParams.GasLimit, - PayloadID: "simple", - BlockTime: 1 * time.Second, + NodeType: "erigon", + GasLimit: benchmark.DefaultParams.GasLimit, + PayloadID: "simple", + BlockTime: 1 * time.Second, + ConsensusTimingMode: types.ConsensusTimingModePreventLateFCU, }, }, { Params: types.RunParams{ - NodeType: "geth", - GasLimit: benchmark.DefaultParams.GasLimit, - PayloadID: "complex", - BlockTime: 1 * time.Second, + NodeType: "geth", + GasLimit: benchmark.DefaultParams.GasLimit, + PayloadID: "complex", + BlockTime: 1 * time.Second, + ConsensusTimingMode: types.ConsensusTimingModePreventLateFCU, }, }, { Params: types.RunParams{ - NodeType: "erigon", - GasLimit: benchmark.DefaultParams.GasLimit, - PayloadID: "complex", - BlockTime: 1 * time.Second, + NodeType: "erigon", + GasLimit: benchmark.DefaultParams.GasLimit, + PayloadID: "complex", + BlockTime: 1 * time.Second, + ConsensusTimingMode: types.ConsensusTimingModePreventLateFCU, }, }, }, @@ -182,6 +188,7 @@ func TestNewTestPlanFromConfigRoles(t *testing.T) { func TestNewTestPlanFromConfigDefaultsToBothRoles(t *testing.T) { config := &benchmark.BenchmarkConfig{Name: "test"} + definition := benchmark.TestDefinition{ Variables: []benchmark.Param{ { @@ -300,6 +307,81 @@ func TestNewTestPlanFromConfigAllowsSequencerThresholdsWithoutValidator(t *testi require.False(t, plan.Mode.RunValidator) } +func TestResolveTestRunsFromMatrixDefaultsSnapshotLoadTestsToBaseConsensusTiming(t *testing.T) { + config := &benchmark.BenchmarkConfig{ + Name: "snapshot load test", + TransactionPayloads: []payload.Definition{ + { + ID: "mainnet-snapshot-load-test", + Type: "load-test", + }, + }, + } + definition := benchmark.TestDefinition{ + Snapshot: &benchmark.SnapshotDefinition{ + Command: "./setup-initial-snapshot.sh --network mainnet", + }, + Variables: []benchmark.Param{ + { + ParamType: "payload", + Value: "mainnet-snapshot-load-test", + }, + }, + } + + runs, err := benchmark.ResolveTestRunsFromMatrix(definition, "snapshot-load-test.yml", config) + require.NoError(t, err) + require.Len(t, runs, 1) + require.Equal(t, types.ConsensusTimingModeBaseConsensus, runs[0].Params.ConsensusTimingMode) +} + +func TestResolveTestRunsFromMatrixAllowsSnapshotLoadTestsToOverrideConsensusTiming(t *testing.T) { + config := &benchmark.BenchmarkConfig{ + Name: "snapshot load test", + TransactionPayloads: []payload.Definition{ + { + ID: "mainnet-snapshot-load-test", + Type: "load-test", + }, + }, + } + definition := benchmark.TestDefinition{ + Snapshot: &benchmark.SnapshotDefinition{ + Command: "./setup-initial-snapshot.sh --network mainnet", + }, + Variables: []benchmark.Param{ + { + ParamType: "payload", + Value: "mainnet-snapshot-load-test", + }, + { + ParamType: "consensus_timing", + Value: types.ConsensusTimingModePreventLateFCU, + }, + }, + } + + runs, err := benchmark.ResolveTestRunsFromMatrix(definition, "snapshot-load-test.yml", config) + require.NoError(t, err) + require.Len(t, runs, 1) + require.Equal(t, types.ConsensusTimingModePreventLateFCU, runs[0].Params.ConsensusTimingMode) +} + +func TestResolveTestRunsFromMatrixRejectsInvalidConsensusTiming(t *testing.T) { + config := &benchmark.BenchmarkConfig{Name: "benchmark"} + definition := benchmark.TestDefinition{ + Variables: []benchmark.Param{ + { + ParamType: "consensus_timing", + Value: "aligned", + }, + }, + } + + _, err := benchmark.ResolveTestRunsFromMatrix(definition, "benchmark.yml", config) + require.ErrorContains(t, err, "invalid consensus timing") +} + func stringPtr(s string) *string { return &s } diff --git a/runner/benchmark/result_metadata.go b/runner/benchmark/result_metadata.go index 22f79e6f..f0e2c5ce 100644 --- a/runner/benchmark/result_metadata.go +++ b/runner/benchmark/result_metadata.go @@ -12,6 +12,7 @@ type RunResult struct { SequencerMetrics *types.SequencerKeyMetrics `json:"sequencerMetrics,omitempty"` ValidatorMetrics *types.ValidatorKeyMetrics `json:"validatorMetrics,omitempty"` ClientVersion string `json:"clientVersion,omitempty"` + Artifacts map[string]string `json:"artifacts,omitempty"` } // MachineInfo contains information about the machine running the benchmark @@ -51,7 +52,11 @@ func (runs *RunGroup) AddResult(testIdx int, runResult RunResult) { } const ( - BenchmarkRunTag = "BenchmarkRun" + BenchmarkRunTag = "BenchmarkRun" + LoadTestResultArtifactKey = "loadTestResult" + LoadTestResultFileName = "load-test-result.json" + LoadTestResultsDir = "load-tests" + LoadTestTimestampLayout = "2006-01-02-15-04-05" ) func RunGroupFromTestPlans(testPlans []TestPlan, machineInfo *MachineInfo) RunGroup { diff --git a/runner/clients/baserethnode/client.go b/runner/clients/baserethnode/client.go index 844d33b3..3a84cff4 100644 --- a/runner/clients/baserethnode/client.go +++ b/runner/clients/baserethnode/client.go @@ -276,3 +276,7 @@ func (r *BaseRethNodeClient) FlashblocksClient() types.FlashblocksClient { func (r *BaseRethNodeClient) SupportsFlashblocks() bool { return true } + +func (r *BaseRethNodeClient) FlashblocksWsURL() string { + return "" +} diff --git a/runner/clients/builder/client.go b/runner/clients/builder/client.go index b229905c..605f93e1 100644 --- a/runner/clients/builder/client.go +++ b/runner/clients/builder/client.go @@ -128,3 +128,12 @@ func (r *BuilderClient) FlashblocksClient() types.FlashblocksClient { func (r *BuilderClient) SupportsFlashblocks() bool { return false } + +// FlashblocksWsURL returns the local WebSocket URL of the flashblocks server +// hosted by the builder. +func (r *BuilderClient) FlashblocksWsURL() string { + if r.websocketPort == 0 { + return "" + } + return fmt.Sprintf("ws://localhost:%d", r.websocketPort) +} diff --git a/runner/clients/builder/metrics.go b/runner/clients/builder/metrics.go index d6f83366..5c0152ae 100644 --- a/runner/clients/builder/metrics.go +++ b/runner/clients/builder/metrics.go @@ -5,11 +5,17 @@ import ( "context" "fmt" "io" + "math" "net/http" + "sort" + "strconv" + "strings" + "unicode" "github.com/base/base-bench/runner/metrics" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" + io_prometheus_client "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" ) @@ -19,6 +25,7 @@ type metricsCollector struct { client *ethclient.Client metrics []metrics.BlockMetrics metricsPort int + prevMetrics map[string]*io_prometheus_client.Metric } func newMetricsCollector(log log.Logger, client *ethclient.Client, metricsPort int) metrics.Collector { @@ -27,6 +34,7 @@ func newMetricsCollector(log log.Logger, client *ethclient.Client, metricsPort i client: client, metricsPort: metricsPort, metrics: make([]metrics.BlockMetrics, 0), + prevMetrics: make(map[string]*io_prometheus_client.Metric), } } @@ -40,15 +48,102 @@ func (r *metricsCollector) GetMetrics() []metrics.BlockMetrics { func (r *metricsCollector) GetMetricTypes() map[string]bool { return map[string]bool{ - "reth_sync_execution_execution_duration": true, - "reth_sync_block_validation_state_root_duration": true, - "reth_op_rbuilder_block_built_success": true, - "reth_op_rbuilder_flashblock_count": true, - "reth_op_rbuilder_total_block_built_duration": true, - "reth_op_rbuilder_flashblock_build_duration": true, - "reth_op_rbuilder_state_root_calculation_duration": true, - "reth_op_rbuilder_sequencer_tx_duration": true, - "reth_op_rbuilder_payload_tx_simulation_duration": true, + "reth_sync_execution_execution_duration": true, + "reth_sync_block_validation_state_root_duration": true, + "reth_op_rbuilder_block_built_success": true, + "reth_op_rbuilder_flashblock_count": true, + "reth_op_rbuilder_total_block_built_duration": true, + "reth_op_rbuilder_flashblock_build_duration": true, + "reth_op_rbuilder_state_root_calculation_duration": true, + "reth_op_rbuilder_sequencer_tx_duration": true, + "reth_op_rbuilder_payload_tx_simulation_duration": true, + "reth_base_builder_block_built_success": true, + "reth_base_builder_flashblock_count": true, + "reth_base_builder_total_block_built_duration": true, + "reth_base_builder_flashblock_build_duration": true, + "reth_base_builder_state_root_calculation_duration": true, + "reth_base_builder_state_root_time_per_gas_ratio": true, + "reth_base_builder_sequencer_tx_duration": true, + "reth_base_builder_payload_transaction_simulation_duration": true, + "reth_base_builder_payload_tx_simulation_duration": true, + "reth_base_builder_tx_simulation_duration": true, + "reth_base_builder_transaction_pool_fetch_duration": true, + "reth_base_builder_transaction_pool_fetch_gauge": true, + "reth_base_builder_state_transition_merge_duration": true, + "reth_base_builder_state_transition_merge_gauge": true, + "reth_base_builder_payload_num_tx_considered": true, + "reth_base_builder_payload_num_tx_considered_gauge": true, + "reth_base_builder_payload_num_tx": true, + "reth_base_builder_payload_num_tx_gauge": true, + "reth_base_builder_payload_num_tx_simulated": true, + "reth_base_builder_payload_num_tx_simulated_gauge": true, + "reth_base_builder_payload_num_tx_simulated_success": true, + "reth_base_builder_payload_num_tx_simulated_success_gauge": true, + "reth_base_builder_payload_num_tx_simulated_fail": true, + "reth_base_builder_payload_num_tx_simulated_fail_gauge": true, + "reth_base_builder_payload_reverted_tx_gas_used": true, + "reth_base_builder_reverted_tx_gas_used": true, + "reth_base_builder_successful_tx_gas_used": true, + "reth_base_builder_tx_accounts_modified": true, + "reth_base_builder_tx_storage_slots_modified": true, + "reth_base_builder_rejection_cache_hits": true, + "reth_base_builder_rejection_cache_insertions": true, + "reth_base_builder_rejection_cache_size": true, + "reth_base_builder_metering_data_pending_skip": true, + "reth_base_builder_gas_limit_exceeded_total": true, + "reth_base_builder_tx_da_size_exceeded_total": true, + "reth_base_builder_block_da_size_exceeded_total": true, + "reth_base_builder_da_footprint_exceeded_total": true, + "reth_base_builder_block_uncompressed_size_exceeded_total": true, + "reth_base_builder_block_uncompressed_size": true, + "reth_base_builder_resource_limit_would_reject_total": true, + "reth_base_builder_tx_execution_time_exceeded_total": true, + "reth_base_builder_flashblock_execution_time_exceeded_total": true, + "reth_base_builder_block_state_root_gas_exceeded_total": true, + "reth_base_builder_flashblock_txs_considered": true, + "reth_base_builder_flashblock_txs_included": true, + "reth_base_builder_flashblock_txs_rejected": true, + "reth_base_builder_flashblock_selection_total": true, + "reth_base_builder_flashblock_rejections_total": true, + "reth_base_builder_flashblock_gas_headroom": true, + "reth_base_builder_flashblock_gas_headroom_pct": true, + "reth_base_builder_flashblock_da_bytes_used": true, + "reth_base_builder_flashblock_da_headroom_bytes": true, + "reth_base_builder_flashblock_execution_time_used_us": true, + "reth_base_builder_flashblock_execution_time_headroom_us": true, + "reth_base_builder_flashblock_state_root_gas_used": true, + "reth_base_builder_flashblock_state_root_gas_headroom": true, + "reth_base_builder_flashblock_byte_size_histogram": true, + "reth_base_builder_flashblock_num_tx_histogram": true, + "reth_base_builder_payload_byte_size": true, + "reth_base_builder_payload_byte_size_gauge": true, + "reth_base_builder_tx_byte_size": true, + "reth_base_builder_block_state_root_gas": true, + "reth_base_builder_flashblocks_time_drift": true, + "reth_base_builder_first_flashblock_time_offset": true, + "reth_base_builder_reduced_flashblocks_number": true, + "reth_base_builder_missing_flashblocks_count": true, + "reth_storage_providers_database_save_blocks_total": true, + "reth_storage_providers_database_save_blocks_block_count_last": true, + "reth_storage_providers_database_save_blocks_commit_sf": true, + "reth_storage_providers_database_save_blocks_commit_mdbx": true, + "reth_storage_providers_database_save_blocks_write_state": true, + "reth_storage_providers_database_save_blocks_write_hashed_state": true, + "reth_storage_providers_database_save_blocks_write_trie_updates": true, + "reth_storage_providers_database_save_blocks_sf": true, + "reth_trie_leaves_added": true, + "reth_trie_branches_added": true, + "reth_tree_root_sparse_trie_total_duration_histogram": true, + "reth_tree_root_sparse_trie_final_update_duration_histogram": true, + "reth_parallel_sparse_trie_subtrie_hash_update_latency": true, + "reth_parallel_sparse_trie_subtrie_upper_hash_latency": true, + "reth_trie_proof_task_storage_worker_idle_time_seconds": true, + "reth_trie_proof_task_account_worker_idle_time_seconds": true, + "reth_trie_proof_task_blinded_storage_nodes": true, + "reth_trie_proof_task_blinded_account_nodes": true, + "reth_trie_cursor_overall_duration": true, + "reth_trie_hashed_cursor_overall_duration": true, + "reth_db_freelist": true, } } @@ -73,21 +168,260 @@ func (r *metricsCollector) Collect(ctx context.Context, m *metrics.BlockMetrics) } metricTypes := r.GetMetricTypes() + m.SetPreviousPrometheusMetrics(r.prevMetrics) for _, metric := range metrics { name := metric.GetName() if metricTypes[name] { metricVal := metric.GetMetric() - if len(metricVal) != 1 { - r.log.Warn("expected 1 metric, got %d for metric %s", len(metricVal), name) - } - err = m.UpdatePrometheusMetric(name, metricVal[0]) - if err != nil { - r.log.Warn("failed to add metric %s: %s", name, err) + for _, value := range metricVal { + metricName := r.prometheusMetricName(name, value) + r.addPrometheusMetric(m, name, metricName, value, true) + + if metricName != name && len(metricVal) == 1 { + r.addPrometheusMetric(m, name, name, value, false) + } } } } + r.prevMetrics = m.PreviousPrometheusMetrics() r.metrics = append(r.metrics, *m.Copy()) return nil } + +func (r *metricsCollector) addPrometheusMetric(m *metrics.BlockMetrics, rawName string, name string, metric *io_prometheus_client.Metric, recordSample bool) { + prevMetric := m.PreviousPrometheusMetric(name) + r.addHistogramQuantiles(m, name, metric, prevMetric) + if recordSample { + m.AddPrometheusMetricSample(r.prometheusMetricSample(rawName, name, metric, prevMetric)) + } + + err := m.UpdatePrometheusMetric(name, metric) + if err != nil { + r.log.Warn("failed to add metric %s: %s", name, err) + } + r.addSummaryQuantiles(m, name, metric) +} + +func (r *metricsCollector) prometheusMetricSample(rawName string, key string, metric *io_prometheus_client.Metric, prevMetric *io_prometheus_client.Metric) metrics.PrometheusMetricSample { + sample := metrics.PrometheusMetricSample{ + Name: rawName, + Key: key, + Labels: r.prometheusLabels(metric), + } + + if metric.Gauge != nil { + sample.Type = "gauge" + if metric.Gauge.Value != nil && !math.IsNaN(*metric.Gauge.Value) { + sample.Value = prometheusFloat64Ptr(*metric.Gauge.Value) + } + return sample + } + + if metric.Counter != nil { + sample.Type = "counter" + if metric.Counter.Value != nil && !math.IsNaN(*metric.Counter.Value) { + value := *metric.Counter.Value + sample.Value = prometheusFloat64Ptr(value) + if prevMetric != nil && prevMetric.Counter != nil && prevMetric.Counter.Value != nil { + sample.Delta = prometheusFloat64Ptr(r.deltaFloat64(value, *prevMetric.Counter.Value)) + } else { + sample.Delta = prometheusFloat64Ptr(value) + } + } + return sample + } + + if metric.Histogram != nil { + sample.Type = "histogram" + histogram := metric.Histogram + if histogram.SampleSum != nil && !math.IsNaN(*histogram.SampleSum) { + sum := *histogram.SampleSum + sample.Sum = prometheusFloat64Ptr(sum) + if prevMetric != nil && prevMetric.Histogram != nil && prevMetric.Histogram.SampleSum != nil { + sample.SumDelta = prometheusFloat64Ptr(r.deltaFloat64(sum, *prevMetric.Histogram.SampleSum)) + } else { + sample.SumDelta = prometheusFloat64Ptr(sum) + } + } + if histogram.SampleCount != nil { + count := *histogram.SampleCount + sample.Count = prometheusUint64Ptr(count) + if prevMetric != nil && prevMetric.Histogram != nil && prevMetric.Histogram.SampleCount != nil { + countDelta := r.deltaUint64(count, *prevMetric.Histogram.SampleCount) + sample.CountDelta = &countDelta + } else { + sample.CountDelta = prometheusUint64Ptr(count) + } + } + return sample + } + + if metric.Summary != nil { + sample.Type = "summary" + summary := metric.Summary + if summary.SampleSum != nil && !math.IsNaN(*summary.SampleSum) { + sum := *summary.SampleSum + sample.Sum = prometheusFloat64Ptr(sum) + if prevMetric != nil && prevMetric.Summary != nil && prevMetric.Summary.SampleSum != nil { + sample.SumDelta = prometheusFloat64Ptr(r.deltaFloat64(sum, *prevMetric.Summary.SampleSum)) + } else { + sample.SumDelta = prometheusFloat64Ptr(sum) + } + } + if summary.SampleCount != nil { + count := *summary.SampleCount + sample.Count = prometheusUint64Ptr(count) + if prevMetric != nil && prevMetric.Summary != nil && prevMetric.Summary.SampleCount != nil { + countDelta := r.deltaUint64(count, *prevMetric.Summary.SampleCount) + sample.CountDelta = &countDelta + } else { + sample.CountDelta = prometheusUint64Ptr(count) + } + } + if len(summary.Quantile) > 0 { + sample.Quantiles = make(map[string]float64, len(summary.Quantile)) + for _, quantile := range summary.Quantile { + sample.Quantiles[r.formatQuantile(quantile.GetQuantile())] = quantile.GetValue() + } + } + return sample + } + + sample.Type = "unknown" + return sample +} + +func (r *metricsCollector) prometheusMetricName(name string, metric *io_prometheus_client.Metric) string { + labels := metric.GetLabel() + if len(labels) == 0 { + return name + } + + parts := make([]string, 0, len(labels)) + for _, label := range labels { + parts = append(parts, r.sanitizeMetricPart(label.GetName())+"_"+r.sanitizeMetricPart(label.GetValue())) + } + sort.Strings(parts) + return name + "_" + strings.Join(parts, "_") +} + +func (r *metricsCollector) prometheusLabels(metric *io_prometheus_client.Metric) map[string]string { + labels := metric.GetLabel() + if len(labels) == 0 { + return nil + } + + result := make(map[string]string, len(labels)) + for _, label := range labels { + result[label.GetName()] = label.GetValue() + } + return result +} + +func (r *metricsCollector) addSummaryQuantiles(m *metrics.BlockMetrics, name string, metric *io_prometheus_client.Metric) { + if metric.Summary == nil { + return + } + + for _, quantile := range metric.Summary.GetQuantile() { + m.AddExecutionMetric(name+"_quantile_"+r.formatQuantile(quantile.GetQuantile()), quantile.GetValue()) + } +} + +func (r *metricsCollector) addHistogramQuantiles(m *metrics.BlockMetrics, name string, metric *io_prometheus_client.Metric, prevMetric *io_prometheus_client.Metric) { + if metric.Histogram == nil { + return + } + + var prevHistogram *io_prometheus_client.Histogram + if prevMetric != nil { + prevHistogram = prevMetric.Histogram + } + + for _, quantile := range []float64{0.5, 0.9, 0.99} { + value, ok := r.histogramQuantile(quantile, metric.Histogram, prevHistogram) + if ok { + m.AddExecutionMetric(name+"_quantile_"+r.formatQuantile(quantile), value) + } + } +} + +func (r *metricsCollector) histogramQuantile(quantile float64, histogram *io_prometheus_client.Histogram, prevHistogram *io_prometheus_client.Histogram) (float64, bool) { + if histogram == nil || histogram.SampleCount == nil || len(histogram.Bucket) == 0 { + return 0, false + } + + prevCount := uint64(0) + if prevHistogram != nil && prevHistogram.SampleCount != nil { + prevCount = *prevHistogram.SampleCount + } + count := r.deltaUint64(*histogram.SampleCount, prevCount) + if count == 0 { + return 0, false + } + + rank := quantile * float64(count) + lastFiniteUpperBound := 0.0 + hasFiniteUpperBound := false + for i, bucket := range histogram.Bucket { + if bucket.CumulativeCount == nil || bucket.UpperBound == nil { + continue + } + if !math.IsInf(*bucket.UpperBound, 0) { + lastFiniteUpperBound = *bucket.UpperBound + hasFiniteUpperBound = true + } + + prevBucketCount := uint64(0) + if prevHistogram != nil && i < len(prevHistogram.Bucket) && prevHistogram.Bucket[i].CumulativeCount != nil { + prevBucketCount = *prevHistogram.Bucket[i].CumulativeCount + } + + bucketCount := r.deltaUint64(*bucket.CumulativeCount, prevBucketCount) + if float64(bucketCount) >= rank { + if math.IsInf(*bucket.UpperBound, 0) { + return lastFiniteUpperBound, hasFiniteUpperBound + } + return *bucket.UpperBound, true + } + } + + return 0, false +} + +func (r *metricsCollector) deltaUint64(current uint64, previous uint64) uint64 { + if current < previous { + return current + } + return current - previous +} + +func (r *metricsCollector) deltaFloat64(current float64, previous float64) float64 { + if current < previous { + return current + } + return current - previous +} + +func (r *metricsCollector) formatQuantile(quantile float64) string { + return strings.ReplaceAll(strconv.FormatFloat(quantile, 'f', -1, 64), ".", "_") +} + +func prometheusFloat64Ptr(value float64) *float64 { + return &value +} + +func prometheusUint64Ptr(value uint64) *uint64 { + return &value +} + +func (r *metricsCollector) sanitizeMetricPart(value string) string { + return strings.Map(func(r rune) rune { + if unicode.IsLetter(r) || unicode.IsDigit(r) { + return r + } + return '_' + }, value) +} diff --git a/runner/clients/builder/metrics_test.go b/runner/clients/builder/metrics_test.go new file mode 100644 index 00000000..eb0310d4 --- /dev/null +++ b/runner/clients/builder/metrics_test.go @@ -0,0 +1,171 @@ +package builder + +import ( + "math" + "testing" + + io_prometheus_client "github.com/prometheus/client_model/go" +) + +func TestPrometheusMetricNameSortsAndSanitizesLabels(t *testing.T) { + collector := &metricsCollector{} + metric := &io_prometheus_client.Metric{ + Label: []*io_prometheus_client.LabelPair{ + {Name: stringPtr("type"), Value: stringPtr("account.worker")}, + {Name: stringPtr("configname"), Value: stringPtr("mainnet/snapshot")}, + }, + } + + got := collector.prometheusMetricName("reth_example_metric", metric) + want := "reth_example_metric_configname_mainnet_snapshot_type_account_worker" + if got != want { + t.Fatalf("prometheusMetricName() = %q, want %q", got, want) + } +} + +func TestHistogramQuantileUsesIntervalBuckets(t *testing.T) { + collector := &metricsCollector{} + prev := histogramMetric( + 10, + bucket(1, 5), + bucket(2, 9), + bucket(3, 10), + ) + current := histogramMetric( + 20, + bucket(1, 8), + bucket(2, 17), + bucket(3, 20), + ) + + got, ok := collector.histogramQuantile(0.5, current.Histogram, prev.Histogram) + if !ok { + t.Fatal("histogramQuantile() did not return a value") + } + if got != 2 { + t.Fatalf("histogramQuantile(0.5) = %f, want 2", got) + } + + got, ok = collector.histogramQuantile(0.9, current.Histogram, prev.Histogram) + if !ok { + t.Fatal("histogramQuantile() did not return a value") + } + if got != 3 { + t.Fatalf("histogramQuantile(0.9) = %f, want 3", got) + } +} + +func TestHistogramQuantileAvoidsInfiniteUpperBound(t *testing.T) { + collector := &metricsCollector{} + current := histogramMetric( + 10, + bucket(1, 5), + bucket(math.Inf(1), 10), + ) + + got, ok := collector.histogramQuantile(0.9, current.Histogram, nil) + if !ok { + t.Fatal("histogramQuantile() did not return a value") + } + if got != 1 { + t.Fatalf("histogramQuantile(0.9) = %f, want 1", got) + } +} + +func TestPrometheusMetricSamplePreservesLabelsAndHistogramDeltas(t *testing.T) { + collector := &metricsCollector{} + prev := histogramMetric( + 10, + bucket(1, 5), + bucket(2, 10), + ) + *prev.Histogram.SampleSum = 100 + + current := histogramMetric( + 16, + bucket(1, 8), + bucket(2, 16), + ) + *current.Histogram.SampleSum = 172 + current.Label = []*io_prometheus_client.LabelPair{ + {Name: stringPtr("flashblock_index"), Value: stringPtr("7")}, + } + + sample := collector.prometheusMetricSample( + "reth_base_builder_flashblock_txs_considered", + "reth_base_builder_flashblock_txs_considered_flashblock_index_7", + current, + prev, + ) + + if sample.Name != "reth_base_builder_flashblock_txs_considered" { + t.Fatalf("Name = %q", sample.Name) + } + if sample.Key != "reth_base_builder_flashblock_txs_considered_flashblock_index_7" { + t.Fatalf("Key = %q", sample.Key) + } + if sample.Labels["flashblock_index"] != "7" { + t.Fatalf("flashblock_index label = %q", sample.Labels["flashblock_index"]) + } + if sample.Count == nil || *sample.Count != 16 { + t.Fatalf("Count = %v, want 16", sample.Count) + } + if sample.CountDelta == nil || *sample.CountDelta != 6 { + t.Fatalf("CountDelta = %v, want 6", sample.CountDelta) + } + if sample.Sum == nil || *sample.Sum != 172 { + t.Fatalf("Sum = %v, want 172", sample.Sum) + } + if sample.SumDelta == nil || *sample.SumDelta != 72 { + t.Fatalf("SumDelta = %v, want 72", sample.SumDelta) + } +} + +func TestBuilderMetricTypesIncludeFlashblockDiagnostics(t *testing.T) { + collector := &metricsCollector{} + metricTypes := collector.GetMetricTypes() + + for _, name := range []string{ + "reth_base_builder_flashblock_txs_considered", + "reth_base_builder_flashblock_txs_included", + "reth_base_builder_flashblock_txs_rejected", + "reth_base_builder_flashblock_selection_total", + "reth_base_builder_flashblock_rejections_total", + "reth_base_builder_transaction_pool_fetch_duration", + "reth_base_builder_state_transition_merge_duration", + "reth_base_builder_flashblock_count", + } { + if !metricTypes[name] { + t.Fatalf("GetMetricTypes()[%q] = false, want true", name) + } + } +} + +func histogramMetric(count uint64, buckets ...*io_prometheus_client.Bucket) *io_prometheus_client.Metric { + return &io_prometheus_client.Metric{ + Histogram: &io_prometheus_client.Histogram{ + SampleCount: uint64Ptr(count), + SampleSum: float64Ptr(0), + Bucket: buckets, + }, + } +} + +func bucket(upperBound float64, count uint64) *io_prometheus_client.Bucket { + return &io_prometheus_client.Bucket{ + UpperBound: float64Ptr(upperBound), + CumulativeCount: uint64Ptr(count), + } +} + +func stringPtr(value string) *string { + return &value +} + +func float64Ptr(value float64) *float64 { + return &value +} + +func uint64Ptr(value uint64) *uint64 { + return &value +} diff --git a/runner/clients/common/proxy/proxy.go b/runner/clients/common/proxy/proxy.go index f261c39e..b3697198 100644 --- a/runner/clients/common/proxy/proxy.go +++ b/runner/clients/common/proxy/proxy.go @@ -17,9 +17,11 @@ import ( "fmt" "io" "net/http" + "sync" "github.com/base/base-bench/runner/network/mempool" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" ethTypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" ) @@ -31,6 +33,22 @@ type ProxyServer struct { pendingTxs []*ethTypes.Transaction clientURL string mempool *mempool.StaticWorkloadMempool + nextNonce map[common.Address]uint64 + mu sync.Mutex +} + +type rpcRequest struct { + Method string `json:"method"` + Params json.RawMessage `json:"params"` + ID interface{} `json:"id"` + JSONRPC string `json:"jsonrpc"` +} + +type rpcResponse struct { + JSONRPC string `json:"jsonrpc"` + ID interface{} `json:"id"` + Result json.RawMessage `json:"result,omitempty"` + Error interface{} `json:"error,omitempty"` } func NewProxyServer(clientURL string, log log.Logger, port int, mempool *mempool.StaticWorkloadMempool) *ProxyServer { @@ -39,6 +57,7 @@ func NewProxyServer(clientURL string, log log.Logger, port int, mempool *mempool log: log, port: port, mempool: mempool, + nextNonce: make(map[common.Address]uint64), } } @@ -61,12 +80,13 @@ func (p *ProxyServer) Run(ctx context.Context) error { return nil } -func (p *ProxyServer) PendingTxs() []*ethTypes.Transaction { - return p.pendingTxs -} +func (p *ProxyServer) DrainPendingTxs() []*ethTypes.Transaction { + p.mu.Lock() + defer p.mu.Unlock() -func (p *ProxyServer) ClearPendingTxs() { + txs := p.pendingTxs p.pendingTxs = make([]*ethTypes.Transaction, 0) + return txs } // Stop stops both the proxy server and the underlying client @@ -89,13 +109,13 @@ func (p *ProxyServer) handleRequest(w http.ResponseWriter, r *http.Request) { return } - var request struct { - Method string `json:"method"` - Params json.RawMessage `json:"params"` - ID interface{} `json:"id"` - JSONRPC string `json:"jsonrpc"` + if len(body) > 0 && body[0] == '[' { + p.handleBatchRequest(w, body) + return } + var request rpcRequest + if err := json.Unmarshal(body, &request); err != nil { http.Error(w, "Error parsing request", http.StatusBadRequest) return @@ -108,11 +128,7 @@ func (p *ProxyServer) handleRequest(w http.ResponseWriter, r *http.Request) { } if handled { - resp := struct { - JSONRPC string `json:"jsonrpc"` - ID interface{} `json:"id"` - Result json.RawMessage `json:"result"` - }{ + resp := rpcResponse{ JSONRPC: request.JSONRPC, ID: request.ID, Result: response, @@ -164,6 +180,78 @@ func (p *ProxyServer) handleRequest(w http.ResponseWriter, r *http.Request) { p.DebugResponse(request.Method, request.Params, respBody) } +func (p *ProxyServer) handleBatchRequest(w http.ResponseWriter, body []byte) { + var requests []rpcRequest + if err := json.Unmarshal(body, &requests); err != nil { + http.Error(w, "Error parsing batch request", http.StatusBadRequest) + return + } + + responses := make([]rpcResponse, 0, len(requests)) + for _, request := range requests { + handled, result, err := p.OverrideRequest(request.Method, request.Params) + response := rpcResponse{ + JSONRPC: "2.0", + ID: request.ID, + } + if err != nil { + response.Error = map[string]interface{}{"code": -32000, "message": err.Error()} + } else if handled { + response.Result = result + } else { + forwardedResponse, err := p.forwardRPCRequest(request) + if err != nil { + response.Error = map[string]interface{}{"code": -32000, "message": err.Error()} + } else { + response = forwardedResponse + } + } + responses = append(responses, response) + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(responses); err != nil { + p.log.Error("Error encoding batch response", "err", err) + } +} + +func (p *ProxyServer) forwardRPCRequest(request rpcRequest) (rpcResponse, error) { + body, err := json.Marshal(request) + if err != nil { + return rpcResponse{}, fmt.Errorf("failed to marshal upstream request: %w", err) + } + + resp, err := http.Post(p.clientURL, "application/json", bytes.NewReader(body)) + if err != nil { + return rpcResponse{}, fmt.Errorf("failed to forward request: %w", err) + } + defer func() { + if err := resp.Body.Close(); err != nil { + p.log.Error("Error closing response body", "err", err) + } + }() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return rpcResponse{}, fmt.Errorf("failed to read upstream response: %w", err) + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return rpcResponse{}, fmt.Errorf("upstream request returned status %d: %s", resp.StatusCode, string(respBody)) + } + + var forwardedResponse rpcResponse + if err := json.Unmarshal(respBody, &forwardedResponse); err != nil { + return rpcResponse{}, fmt.Errorf("failed to decode upstream response: %w", err) + } + if forwardedResponse.JSONRPC == "" { + forwardedResponse.JSONRPC = request.JSONRPC + } + if forwardedResponse.ID == nil { + forwardedResponse.ID = request.ID + } + return forwardedResponse, nil +} + func (p *ProxyServer) OverrideRequest(method string, rawParams json.RawMessage) (bool, json.RawMessage, error) { switch method { case "eth_getTransactionCount": @@ -171,8 +259,22 @@ func (p *ProxyServer) OverrideRequest(method string, rawParams json.RawMessage) if err := json.Unmarshal(rawParams, ¶ms); err != nil { return false, nil, fmt.Errorf("failed to unmarshal params: %w", err) } + if len(params) == 0 { + return false, nil, fmt.Errorf("no params found") + } - nonce := p.mempool.GetTransactionCount(common.HexToAddress(params[0])) + address := common.HexToAddress(params[0]) + nonce, err := p.upstreamTransactionCount(rawParams) + if err != nil { + if observedNonce, ok := p.observedTransactionCount(address); ok { + jsonResponse, _ := json.Marshal(fmt.Sprintf("0x%x", observedNonce)) + return true, jsonResponse, nil + } + return false, nil, err + } + if observedNonce, ok := p.observedTransactionCount(address); ok && observedNonce > nonce { + nonce = observedNonce + } jsonResponse, _ := json.Marshal(fmt.Sprintf("0x%x", nonce)) return true, jsonResponse, nil @@ -204,7 +306,7 @@ func (p *ProxyServer) OverrideRequest(method string, rawParams json.RawMessage) return false, nil, fmt.Errorf("failed to decode transaction: %w", err) } - p.pendingTxs = append(p.pendingTxs, &tx) + p.recordPendingTransaction(&tx) txHash := tx.Hash().Hex() jsonResponse, _ := json.Marshal(txHash) @@ -214,10 +316,101 @@ func (p *ProxyServer) OverrideRequest(method string, rawParams json.RawMessage) } } +func (p *ProxyServer) upstreamTransactionCount(rawParams json.RawMessage) (uint64, error) { + body, err := json.Marshal(struct { + JSONRPC string `json:"jsonrpc"` + ID int `json:"id"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` + }{ + JSONRPC: "2.0", + ID: 1, + Method: "eth_getTransactionCount", + Params: rawParams, + }) + if err != nil { + return 0, fmt.Errorf("failed to marshal upstream nonce request: %w", err) + } + + resp, err := http.Post(p.clientURL, "application/json", bytes.NewReader(body)) + if err != nil { + return 0, fmt.Errorf("failed to fetch upstream transaction count: %w", err) + } + defer func() { + if err := resp.Body.Close(); err != nil { + p.log.Error("Error closing response body", "err", err) + } + }() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return 0, fmt.Errorf("failed to read upstream transaction count response: %w", err) + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return 0, fmt.Errorf("upstream transaction count request returned status %d: %s", resp.StatusCode, string(respBody)) + } + + var rpcResp struct { + Result json.RawMessage `json:"result"` + Error *struct { + Code int `json:"code"` + Message string `json:"message"` + } `json:"error"` + } + if err := json.Unmarshal(respBody, &rpcResp); err != nil { + return 0, fmt.Errorf("failed to decode upstream transaction count response: %w", err) + } + if rpcResp.Error != nil { + return 0, fmt.Errorf("upstream transaction count error %d: %s", rpcResp.Error.Code, rpcResp.Error.Message) + } + + var nonceHex string + if err := json.Unmarshal(rpcResp.Result, &nonceHex); err != nil { + return 0, fmt.Errorf("failed to decode upstream transaction count result: %w", err) + } + nonce, err := hexutil.DecodeUint64(nonceHex) + if err != nil { + return 0, fmt.Errorf("failed to parse upstream transaction count %q: %w", nonceHex, err) + } + return nonce, nil +} + +func (p *ProxyServer) observedTransactionCount(address common.Address) (uint64, bool) { + p.mu.Lock() + defer p.mu.Unlock() + + nonce, ok := p.nextNonce[address] + return nonce, ok +} + +func (p *ProxyServer) recordPendingTransaction(tx *ethTypes.Transaction) { + from, err := ethTypes.Sender(ethTypes.LatestSignerForChainID(tx.ChainId()), tx) + if err != nil { + p.log.Warn("failed to recover sender for observed transaction", "err", err, "hash", tx.Hash()) + p.mu.Lock() + p.pendingTxs = append(p.pendingTxs, tx) + p.mu.Unlock() + return + } + + nextNonce := tx.Nonce() + 1 + p.mu.Lock() + p.pendingTxs = append(p.pendingTxs, tx) + if nextNonce > p.nextNonce[from] { + p.nextNonce[from] = nextNonce + } + p.mu.Unlock() +} + func (p *ProxyServer) DebugResponse(method string, params json.RawMessage, respBody []byte) { p.log.Debug("method", "method", method) p.log.Debug("params", "params", params) + if !bytes.HasPrefix(respBody, []byte{0x1f, 0x8b}) { + p.log.Debug("Response body", "body", string(respBody)) + return + } + gzipReader, err := gzip.NewReader(bytes.NewReader(respBody)) if err != nil { p.log.Error("Error creating gzip reader", "err", err) diff --git a/runner/clients/common/proxy/proxy_test.go b/runner/clients/common/proxy/proxy_test.go new file mode 100644 index 00000000..013f3ebe --- /dev/null +++ b/runner/clients/common/proxy/proxy_test.go @@ -0,0 +1,368 @@ +package proxy + +import ( + "bytes" + "encoding/json" + "math/big" + "net/http" + "net/http/httptest" + "testing" + + "github.com/base/base-bench/runner/network/mempool" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" +) + +func TestHandleBatchRequestCapturesRawTransactions(t *testing.T) { + chainID := big.NewInt(8453) + tx := signedTestTx(t, chainID) + rawTx, err := tx.MarshalBinary() + if err != nil { + t.Fatalf("marshal tx: %v", err) + } + + server := NewProxyServer( + "http://127.0.0.1:8545", + log.New(), + 0, + mempool.NewStaticWorkloadMempool(log.New(), chainID), + ) + + body, err := json.Marshal([]map[string]any{ + { + "jsonrpc": "2.0", + "id": 0, + "method": "eth_sendRawTransaction", + "params": []string{hexutil.Encode(rawTx)}, + }, + }) + if err != nil { + t.Fatalf("marshal request: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + server.handleRequest(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", rec.Code, rec.Body.String()) + } + + var responses []struct { + Result string `json:"result"` + Error map[string]any `json:"error"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &responses); err != nil { + t.Fatalf("unmarshal response: %v", err) + } + if len(responses) != 1 { + t.Fatalf("expected 1 response, got %d", len(responses)) + } + if responses[0].Error != nil { + t.Fatalf("expected successful response, got error %v", responses[0].Error) + } + if responses[0].Result != tx.Hash().Hex() { + t.Fatalf("expected tx hash %s, got %s", tx.Hash().Hex(), responses[0].Result) + } + + pending := server.DrainPendingTxs() + if len(pending) != 1 { + t.Fatalf("expected 1 pending tx, got %d", len(pending)) + } + if pending[0].Hash() != tx.Hash() { + t.Fatalf("expected pending tx %s, got %s", tx.Hash(), pending[0].Hash()) + } +} + +func TestHandleBatchRequestForwardsPassThroughMethods(t *testing.T) { + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var req rpcRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + t.Fatalf("decode upstream request: %v", err) + } + if req.Method != "eth_chainId" { + t.Fatalf("expected eth_chainId, got %s", req.Method) + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(map[string]any{ + "jsonrpc": "2.0", + "id": req.ID, + "result": "0x2105", + }); err != nil { + t.Fatalf("encode upstream response: %v", err) + } + })) + defer upstream.Close() + + server := NewProxyServer( + upstream.URL, + log.New(), + 0, + mempool.NewStaticWorkloadMempool(log.New(), big.NewInt(8453)), + ) + + body, err := json.Marshal([]map[string]any{ + { + "jsonrpc": "2.0", + "id": 7, + "method": "eth_chainId", + "params": []string{}, + }, + }) + if err != nil { + t.Fatalf("marshal request: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + server.handleRequest(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", rec.Code, rec.Body.String()) + } + + var responses []struct { + Result string `json:"result"` + Error map[string]any `json:"error"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &responses); err != nil { + t.Fatalf("unmarshal response: %v", err) + } + if len(responses) != 1 { + t.Fatalf("expected 1 response, got %d", len(responses)) + } + if responses[0].Error != nil { + t.Fatalf("expected successful response, got error %v", responses[0].Error) + } + if responses[0].Result != "0x2105" { + t.Fatalf("expected forwarded result 0x2105, got %s", responses[0].Result) + } +} + +func TestHandleBatchRequestSupportsMixedForwardAndCapture(t *testing.T) { + chainID := big.NewInt(8453) + tx := signedTestTx(t, chainID) + rawTx, err := tx.MarshalBinary() + if err != nil { + t.Fatalf("marshal tx: %v", err) + } + + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var req rpcRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + t.Fatalf("decode upstream request: %v", err) + } + if req.Method != "eth_gasPrice" { + t.Fatalf("expected eth_gasPrice, got %s", req.Method) + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(map[string]any{ + "jsonrpc": "2.0", + "id": req.ID, + "result": "0x3b9aca00", + }); err != nil { + t.Fatalf("encode upstream response: %v", err) + } + })) + defer upstream.Close() + + server := NewProxyServer( + upstream.URL, + log.New(), + 0, + mempool.NewStaticWorkloadMempool(log.New(), chainID), + ) + + body, err := json.Marshal([]map[string]any{ + { + "jsonrpc": "2.0", + "id": 0, + "method": "eth_gasPrice", + "params": []string{}, + }, + { + "jsonrpc": "2.0", + "id": 1, + "method": "eth_sendRawTransaction", + "params": []string{hexutil.Encode(rawTx)}, + }, + }) + if err != nil { + t.Fatalf("marshal request: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body)) + rec := httptest.NewRecorder() + + server.handleRequest(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", rec.Code, rec.Body.String()) + } + + var responses []struct { + Result string `json:"result"` + Error map[string]any `json:"error"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &responses); err != nil { + t.Fatalf("unmarshal response: %v", err) + } + if len(responses) != 2 { + t.Fatalf("expected 2 responses, got %d", len(responses)) + } + if responses[0].Error != nil { + t.Fatalf("expected successful forwarded response, got error %v", responses[0].Error) + } + if responses[0].Result != "0x3b9aca00" { + t.Fatalf("expected forwarded gas price, got %s", responses[0].Result) + } + if responses[1].Error != nil { + t.Fatalf("expected successful captured tx response, got error %v", responses[1].Error) + } + if responses[1].Result != tx.Hash().Hex() { + t.Fatalf("expected tx hash %s, got %s", tx.Hash().Hex(), responses[1].Result) + } + + pending := server.DrainPendingTxs() + if len(pending) != 1 { + t.Fatalf("expected 1 pending tx, got %d", len(pending)) + } + if pending[0].Hash() != tx.Hash() { + t.Fatalf("expected pending tx %s, got %s", tx.Hash(), pending[0].Hash()) + } +} + +func TestGetTransactionCountForwardsUpstreamNonce(t *testing.T) { + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(map[string]any{ + "jsonrpc": "2.0", + "id": 1, + "result": "0xfa", + }); err != nil { + t.Fatalf("encode upstream response: %v", err) + } + })) + defer upstream.Close() + + server := NewProxyServer( + upstream.URL, + log.New(), + 0, + mempool.NewStaticWorkloadMempool(log.New(), big.NewInt(8453)), + ) + + result := callProxyRPC(t, server, "eth_getTransactionCount", []string{common.Address{1}.Hex(), "pending"}) + if result != "0xfa" { + t.Fatalf("expected upstream nonce 0xfa, got %s", result) + } +} + +func TestGetTransactionCountIncludesObservedPendingTransactions(t *testing.T) { + chainID := big.NewInt(8453) + tx := signedTestTxWithNonce(t, chainID, 250) + from, err := types.Sender(types.LatestSignerForChainID(chainID), tx) + if err != nil { + t.Fatalf("recover sender: %v", err) + } + + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(map[string]any{ + "jsonrpc": "2.0", + "id": 1, + "result": "0xfa", + }); err != nil { + t.Fatalf("encode upstream response: %v", err) + } + })) + defer upstream.Close() + + server := NewProxyServer( + upstream.URL, + log.New(), + 0, + mempool.NewStaticWorkloadMempool(log.New(), chainID), + ) + + rawTx, err := tx.MarshalBinary() + if err != nil { + t.Fatalf("marshal tx: %v", err) + } + callProxyRPC(t, server, "eth_sendRawTransaction", []string{hexutil.Encode(rawTx)}) + + result := callProxyRPC(t, server, "eth_getTransactionCount", []string{from.Hex(), "pending"}) + if result != "0xfb" { + t.Fatalf("expected observed nonce 0xfb, got %s", result) + } +} + +func callProxyRPC(t *testing.T, server *ProxyServer, method string, params any) string { + t.Helper() + + body, err := json.Marshal(map[string]any{ + "jsonrpc": "2.0", + "id": 1, + "method": method, + "params": params, + }) + if err != nil { + t.Fatalf("marshal request: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body)) + rec := httptest.NewRecorder() + server.handleRequest(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", rec.Code, rec.Body.String()) + } + + var response struct { + Result string `json:"result"` + Error map[string]any `json:"error"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &response); err != nil { + t.Fatalf("unmarshal response: %v", err) + } + if response.Error != nil { + t.Fatalf("expected successful response, got error %v", response.Error) + } + return response.Result +} + +func signedTestTx(t *testing.T, chainID *big.Int) *types.Transaction { + return signedTestTxWithNonce(t, chainID, 0) +} + +func signedTestTxWithNonce(t *testing.T, chainID *big.Int, nonce uint64) *types.Transaction { + t.Helper() + + key, err := crypto.GenerateKey() + if err != nil { + t.Fatalf("generate key: %v", err) + } + + tx := types.NewTx(&types.DynamicFeeTx{ + ChainID: chainID, + Nonce: nonce, + GasTipCap: big.NewInt(1), + GasFeeCap: big.NewInt(1), + Gas: 21_000, + To: &common.Address{1}, + Value: big.NewInt(1), + }) + + signed, err := types.SignTx(tx, types.NewIsthmusSigner(chainID), key) + if err != nil { + t.Fatalf("sign tx: %v", err) + } + return signed +} diff --git a/runner/clients/geth/client.go b/runner/clients/geth/client.go index 89e40b67..a4efd25b 100644 --- a/runner/clients/geth/client.go +++ b/runner/clients/geth/client.go @@ -280,3 +280,7 @@ func (g *GethClient) FlashblocksClient() types.FlashblocksClient { func (g *GethClient) SupportsFlashblocks() bool { return false } + +func (g *GethClient) FlashblocksWsURL() string { + return "" +} diff --git a/runner/clients/reth/client.go b/runner/clients/reth/client.go index 7a4a25c8..35739d4e 100644 --- a/runner/clients/reth/client.go +++ b/runner/clients/reth/client.go @@ -296,3 +296,7 @@ func (r *RethClient) FlashblocksClient() types.FlashblocksClient { func (r *RethClient) SupportsFlashblocks() bool { return true } + +func (r *RethClient) FlashblocksWsURL() string { + return "" +} diff --git a/runner/clients/types/types.go b/runner/clients/types/types.go index a0be0080..c88b6212 100644 --- a/runner/clients/types/types.go +++ b/runner/clients/types/types.go @@ -32,4 +32,7 @@ type ExecutionClient interface { SetHead(ctx context.Context, blockNumber uint64) error FlashblocksClient() FlashblocksClient // returns nil for clients that don't support flashblocks SupportsFlashblocks() bool // returns true if the client supports receiving flashblock payloads + // FlashblocksWsURL returns the local WebSocket URL hosted by this client, + // or an empty string when the client does not expose one. + FlashblocksWsURL() string } diff --git a/runner/metrics/metrics_interface.go b/runner/metrics/metrics_interface.go index 00c0811f..59f56f59 100644 --- a/runner/metrics/metrics_interface.go +++ b/runner/metrics/metrics_interface.go @@ -19,18 +19,36 @@ type Collector interface { } type BlockMetrics struct { - BlockNumber uint64 - Timestamp time.Time - prevMetrics map[string]*io_prometheus_client.Metric - ExecutionMetrics map[string]interface{} + BlockNumber uint64 + Timestamp time.Time + prevMetrics map[string]*io_prometheus_client.Metric + ExecutionMetrics map[string]interface{} + PrometheusMetrics []PrometheusMetricSample `json:",omitempty"` +} + +// PrometheusMetricSample preserves a raw Prometheus sample alongside the +// flattened ExecutionMetrics values used by the report UI. +type PrometheusMetricSample struct { + Name string `json:"name"` + Key string `json:"key,omitempty"` + Labels map[string]string `json:"labels,omitempty"` + Type string `json:"type"` + Value *float64 `json:"value,omitempty"` + Delta *float64 `json:"delta,omitempty"` + Sum *float64 `json:"sum,omitempty"` + Count *uint64 `json:"count,omitempty"` + SumDelta *float64 `json:"sumDelta,omitempty"` + CountDelta *uint64 `json:"countDelta,omitempty"` + Quantiles map[string]float64 `json:"quantiles,omitempty"` } func NewBlockMetrics() *BlockMetrics { return &BlockMetrics{ - BlockNumber: 0, - prevMetrics: make(map[string]*io_prometheus_client.Metric), - ExecutionMetrics: make(map[string]interface{}), - Timestamp: time.Now(), + BlockNumber: 0, + prevMetrics: make(map[string]*io_prometheus_client.Metric), + ExecutionMetrics: make(map[string]interface{}), + PrometheusMetrics: make([]PrometheusMetricSample, 0), + Timestamp: time.Now(), } } @@ -43,12 +61,45 @@ func (m *BlockMetrics) Copy() *BlockMetrics { newPrevMetrics := make(map[string]*io_prometheus_client.Metric) maps.Copy(newMetrics, m.ExecutionMetrics) maps.Copy(newPrevMetrics, m.prevMetrics) + newPrometheusMetrics := make([]PrometheusMetricSample, len(m.PrometheusMetrics)) + for i, sample := range m.PrometheusMetrics { + newPrometheusMetrics[i] = sample.Copy() + } return &BlockMetrics{ - BlockNumber: m.BlockNumber, - prevMetrics: newPrevMetrics, - ExecutionMetrics: newMetrics, - Timestamp: m.Timestamp, + BlockNumber: m.BlockNumber, + prevMetrics: newPrevMetrics, + ExecutionMetrics: newMetrics, + PrometheusMetrics: newPrometheusMetrics, + Timestamp: m.Timestamp, + } +} + +func (s PrometheusMetricSample) Copy() PrometheusMetricSample { + copied := s + if s.Labels != nil { + copied.Labels = make(map[string]string, len(s.Labels)) + maps.Copy(copied.Labels, s.Labels) } + if s.Quantiles != nil { + copied.Quantiles = make(map[string]float64, len(s.Quantiles)) + maps.Copy(copied.Quantiles, s.Quantiles) + } + return copied +} + +func (m *BlockMetrics) SetPreviousPrometheusMetrics(prev map[string]*io_prometheus_client.Metric) { + m.prevMetrics = make(map[string]*io_prometheus_client.Metric, len(prev)) + maps.Copy(m.prevMetrics, prev) +} + +func (m *BlockMetrics) PreviousPrometheusMetrics() map[string]*io_prometheus_client.Metric { + prevMetrics := make(map[string]*io_prometheus_client.Metric, len(m.prevMetrics)) + maps.Copy(prevMetrics, m.prevMetrics) + return prevMetrics +} + +func (m *BlockMetrics) PreviousPrometheusMetric(name string) *io_prometheus_client.Metric { + return m.prevMetrics[name] } func (m *BlockMetrics) UpdatePrometheusMetric(name string, value *io_prometheus_client.Metric) error { @@ -143,6 +194,10 @@ func (m *BlockMetrics) AddExecutionMetric(name string, value interface{}) { m.ExecutionMetrics[name] = value } +func (m *BlockMetrics) AddPrometheusMetricSample(sample PrometheusMetricSample) { + m.PrometheusMetrics = append(m.PrometheusMetrics, sample) +} + func (m *BlockMetrics) GetMetricTypes() map[string]bool { return map[string]bool{ "execution": true, diff --git a/runner/network/consensus/client.go b/runner/network/consensus/client.go index a2308230..76cb5d8b 100644 --- a/runner/network/consensus/client.go +++ b/runner/network/consensus/client.go @@ -23,6 +23,8 @@ type ConsensusClientOptions struct { GasLimitSetup uint64 // ParallelTxBatches is the number of parallel batches for sending transactions ParallelTxBatches int + // ConsensusTimingMode controls how FCU and getPayload calls are scheduled. + ConsensusTimingMode string } // BaseConsensusClient contains common functionality shared between different consensus client implementations. diff --git a/runner/network/consensus/sequencer_consensus.go b/runner/network/consensus/sequencer_consensus.go index 2e2ed3c9..81faaed4 100644 --- a/runner/network/consensus/sequencer_consensus.go +++ b/runner/network/consensus/sequencer_consensus.go @@ -112,7 +112,7 @@ func marshalBinaryWithSignature(info *derive.L1BlockInfo, signature []byte) ([]b return w.Bytes(), nil } -func (f *SequencerConsensusClient) generatePayloadAttributes(sequencerTxs [][]byte, isSetupPayload bool, nextBoundary time.Time, blockTime time.Duration) (*eth.PayloadAttributes, *common.Hash, error) { +func (f *SequencerConsensusClient) generatePayloadAttributes(sequencerTxs [][]byte, isSetupPayload bool, timestamp uint64) (*eth.PayloadAttributes, *common.Hash, error) { gasLimit := eth.Uint64Quantity(f.options.GasLimit) if isSetupPayload { gasLimit = eth.Uint64Quantity(f.options.GasLimitSetup) @@ -121,12 +121,6 @@ func (f *SequencerConsensusClient) generatePayloadAttributes(sequencerTxs [][]by var b8 eth.Bytes8 copy(b8[:], eip1559.EncodeHolocene1559Params(50, 1)) - // Use nextBoundary (the block-time-aligned wall-clock boundary we slept to) plus - // one block time as the block timestamp. This guarantees the FCU always arrives - // at the same point relative to the block deadline, eliminating the jitter that - // causes "FCU arrived too late" and empty blocks when sendTxs takes variable time. - timestamp := uint64(nextBoundary.Add(blockTime).Unix()) - number := uint64(0) time := uint64(0) baseFee := big.NewInt(1) @@ -210,6 +204,29 @@ func (f *SequencerConsensusClient) generatePayloadAttributes(sequencerTxs [][]by return payloadAttrs, &root, nil } +func nextPayloadTimestamp(lastTimestamp uint64, now time.Time, blockTime time.Duration) uint64 { + blockTimeSeconds := uint64(blockTime / time.Second) + if blockTimeSeconds == 0 { + blockTimeSeconds = 1 + } + + // Match the sequencer cadence when possible: the next payload timestamp is + // one block time after the parent. If transaction draining made that slot + // too close to wall clock, skip ahead so the builder still has time to work. + timestamp := lastTimestamp + blockTimeSeconds + minLead := blockTime / 2 + if minLead <= 0 { + minLead = time.Second + } + + minDeadline := now.Add(minLead) + for time.Unix(int64(timestamp), 0).Before(minDeadline) { + timestamp += blockTimeSeconds + } + + return timestamp +} + // Propose starts block generation, waits BlockTime, and generates a block. func (f *SequencerConsensusClient) Propose(ctx context.Context, blockMetrics *metrics.BlockMetrics, isSetupPayload bool) (*engine.ExecutableData, error) { startTime := time.Now() @@ -265,16 +282,32 @@ func (f *SequencerConsensusClient) Propose(ctx context.Context, blockMetrics *me f.log.Info("Sent transactions", "duration", duration, "num_txs", len(sendTxs)) blockMetrics.AddExecutionMetric(networktypes.SendTxsLatencyMetric, duration) - now := time.Now() - nextBoundary := now.Truncate(f.options.BlockTime).Add(f.options.BlockTime) - sleepDuration := time.Until(nextBoundary) - f.log.Info("Aligning to next block time boundary before FCU", "sleep", sleepDuration, "block_time", f.options.BlockTime) - time.Sleep(sleepDuration) - startBlockBuildingTime := time.Now() + var payloadTimestamp uint64 + var blockDeadline time.Time + if f.options.ConsensusTimingMode == networktypes.ConsensusTimingModeBaseConsensus { + payloadTimestamp = nextPayloadTimestamp(f.lastTimestamp, startBlockBuildingTime, f.options.BlockTime) + blockDeadline = time.Unix(int64(payloadTimestamp), 0) + } else { + nextBoundary := startBlockBuildingTime.Truncate(f.options.BlockTime).Add(f.options.BlockTime) + sleepDuration := time.Until(nextBoundary) + f.log.Info("Aligning to next block time boundary before FCU", "sleep", sleepDuration, "block_time", f.options.BlockTime) + if sleepDuration > 0 { + time.Sleep(sleepDuration) + } + startBlockBuildingTime = time.Now() + + // Use nextBoundary (the block-time-aligned wall-clock boundary we slept to) plus + // one block time as the block timestamp. This guarantees the FCU always arrives + // at the same point relative to the block deadline, eliminating the jitter that + // causes "FCU arrived too late" and empty blocks when sendTxs takes variable time. + blockDeadline = nextBoundary.Add(f.options.BlockTime) + payloadTimestamp = uint64(blockDeadline.Unix()) + } + f.log.Info("Starting block building") - payloadAttrs, beaconRoot, err := f.generatePayloadAttributes(sequencerTxs, isSetupPayload, nextBoundary, f.options.BlockTime) + payloadAttrs, beaconRoot, err := f.generatePayloadAttributes(sequencerTxs, isSetupPayload, payloadTimestamp) if err != nil { return nil, errors.Wrap(err, "failed to generate payload attributes") } @@ -292,10 +325,11 @@ func (f *SequencerConsensusClient) Propose(ctx context.Context, blockMetrics *me blockMetrics.AddExecutionMetric(networktypes.UpdateForkChoiceLatencyMetric, duration) f.currentPayloadID = payloadID - blockDeadline := nextBoundary.Add(f.options.BlockTime) waitDuration := time.Until(blockDeadline) f.log.Info("Waiting for block deadline", "wait", waitDuration) - time.Sleep(waitDuration) + if waitDuration > 0 { + time.Sleep(waitDuration) + } startTime = time.Now() diff --git a/runner/network/consensus/sequencer_consensus_test.go b/runner/network/consensus/sequencer_consensus_test.go new file mode 100644 index 00000000..54c8421f --- /dev/null +++ b/runner/network/consensus/sequencer_consensus_test.go @@ -0,0 +1,36 @@ +package consensus + +import ( + "testing" + "time" +) + +func TestNextPayloadTimestampUsesNextBlockTime(t *testing.T) { + now := time.Unix(100, int64(100*time.Millisecond)) + + timestamp := nextPayloadTimestamp(100, now, 2*time.Second) + + if timestamp != 102 { + t.Fatalf("expected next payload timestamp 102, got %d", timestamp) + } +} + +func TestNextPayloadTimestampSkipsTooCloseSlot(t *testing.T) { + now := time.Unix(101, int64(250*time.Millisecond)) + + timestamp := nextPayloadTimestamp(100, now, 2*time.Second) + + if timestamp != 104 { + t.Fatalf("expected next payload timestamp 104, got %d", timestamp) + } +} + +func TestNextPayloadTimestampCatchesUpFromWallClock(t *testing.T) { + now := time.Unix(120, 0) + + timestamp := nextPayloadTimestamp(100, now, 2*time.Second) + + if timestamp != 122 { + t.Fatalf("expected next payload timestamp 122, got %d", timestamp) + } +} diff --git a/runner/network/network_benchmark.go b/runner/network/network_benchmark.go index 37b336c7..a9b41ee7 100644 --- a/runner/network/network_benchmark.go +++ b/runner/network/network_benchmark.go @@ -279,6 +279,17 @@ func (nb *NetworkBenchmark) GetResult() (*benchmark.RunResult, error) { result.ValidatorMetrics = nb.collectedValidatorMetrics } + artifacts := make(map[string]string) + if nb.testConfig.LoadTestOutputPath != "" { + if _, err := os.Stat(nb.testConfig.LoadTestOutputPath); err == nil { + artifacts[benchmark.LoadTestResultArtifactKey] = benchmark.LoadTestResultFileName + } + } + if len(artifacts) == 0 { + artifacts = nil + } + result.Artifacts = artifacts + return result, nil } diff --git a/runner/network/sequencer_benchmark.go b/runner/network/sequencer_benchmark.go index 29aa8c28..5562df2c 100644 --- a/runner/network/sequencer_benchmark.go +++ b/runner/network/sequencer_benchmark.go @@ -15,6 +15,7 @@ import ( "github.com/base/base-bench/runner/network/proofprogram/fakel1" benchtypes "github.com/base/base-bench/runner/network/types" "github.com/base/base-bench/runner/payload" + payloadworker "github.com/base/base-bench/runner/payload/worker" "github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" @@ -24,6 +25,37 @@ import ( "github.com/pkg/errors" ) +const gracefulWorkerShutdownTimeout = 90 * time.Second + +type benchmarkRunController struct { + maxBlocks int + completion payloadworker.CompletionWorker +} + +func newBenchmarkRunController(transactionWorker payloadworker.Worker, params benchtypes.RunParams) benchmarkRunController { + completion, ok := transactionWorker.(payloadworker.CompletionWorker) + if ok { + return benchmarkRunController{completion: completion} + } + return benchmarkRunController{maxBlocks: params.NumBlocks} +} + +func (c benchmarkRunController) shouldStop(nextBlockIndex uint64) (bool, error) { + if c.completion != nil { + select { + case <-c.completion.Done(): + return true, c.completion.Err() + default: + return false, nil + } + } + return int(nextBlockIndex) > c.maxBlocks, nil +} + +func (c benchmarkRunController) usesWorkerCompletion() bool { + return c.completion != nil +} + type sequencerBenchmark struct { log log.Logger sequencerClient types.ExecutionClient @@ -204,10 +236,11 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics. go func() { consensusClient := consensus.NewSequencerConsensusClient(nb.log, sequencerClient.Client(), sequencerClient.AuthClient(), mempool, consensus.ConsensusClientOptions{ - BlockTime: params.BlockTime, - GasLimit: params.GasLimit, - GasLimitSetup: 1e9, // 1G gas - ParallelTxBatches: nb.config.Config.ParallelTxBatches(), + BlockTime: params.BlockTime, + GasLimit: params.GasLimit, + GasLimitSetup: 1e9, // 1G gas + ParallelTxBatches: nb.config.Config.ParallelTxBatches(), + ConsensusTimingMode: params.ConsensusTimingMode, }, headBlockHash, headBlockNumber, l1Chain, nb.config.BatcherAddr()) payloads := make([]engine.ExecutableData, 0) @@ -236,54 +269,53 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics. payloads = append(payloads, *lastSetupPayload) - blockMetrics := metrics.NewBlockMetrics() - pendingTxs := 0 + runController := newBenchmarkRunController(transactionWorker, params) + if runController.usesWorkerCompletion() { + nb.log.Info("Running benchmark blocks until payload worker completes") + } - // run for a few blocks - for i := 0; i < params.NumBlocks; i++ { - blockMetrics.SetBlockNumber(uint64(i) + 1) - txsSent, err := transactionWorker.SendTxs(benchmarkCtx, pendingTxs) + blockIndex := uint64(1) + for { + stop, err := runController.shouldStop(blockIndex) if err != nil { - nb.log.Warn("failed to send transactions", "err", err) - errChan <- err + errChan <- errors.Wrap(err, "payload worker failed") return } + if stop { + if runController.usesWorkerCompletion() { + nb.log.Info("Payload worker completed", "blocks", blockIndex-1) + } + break + } - payload, err := consensusClient.Propose(benchmarkCtx, blockMetrics, false) + payload, updatedPendingTxs, err := nb.proposeBlock( + benchmarkCtx, + transactionWorker, + consensusClient, + metricsCollector, + blockIndex, + pendingTxs, + false, + true, + ) if err != nil { errChan <- err return } + pendingTxs = updatedPendingTxs + payloads = append(payloads, *payload) + blockIndex++ + } - if payload == nil { - errChan <- errors.New("received nil payload from consensus client") + if !runController.usesWorkerCompletion() { + if err := nb.settleGracefulWorkerShutdown(benchmarkCtx, transactionWorker, consensusClient, pendingTxs); err != nil { + errChan <- err return } - - // Track how many user txs are still pending in the node's mempool. - // payload.Transactions includes the L1 info deposit tx, so user txs = total - 1. - userTxsIncluded := len(payload.Transactions) - 1 - if userTxsIncluded < 0 { - userTxsIncluded = 0 - } - pendingTxs = pendingTxs + txsSent - userTxsIncluded - if pendingTxs < 0 { - pendingTxs = 0 - } - - log.Info("Sleeping for block time", "block_time", params.BlockTime) - time.Sleep(params.BlockTime) - - err = metricsCollector.Collect(benchmarkCtx, blockMetrics) - if err != nil { - nb.log.Error("Failed to collect metrics", "error", err) - } - payloads = append(payloads, *payload) } - err = consensusClient.Stop(benchmarkCtx) - if err != nil { + if err := consensusClient.Stop(benchmarkCtx); err != nil { nb.log.Warn("failed to stop consensus client", "err", err) } @@ -309,6 +341,102 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics. } } +func (nb *sequencerBenchmark) proposeBlock( + ctx context.Context, + transactionWorker payloadworker.Worker, + consensusClient *consensus.SequencerConsensusClient, + metricsCollector metrics.Collector, + blockIndex uint64, + pendingTxs int, + isSetupPayload bool, + collectMetrics bool, +) (*engine.ExecutableData, int, error) { + blockMetrics := metrics.NewBlockMetrics() + blockMetrics.SetBlockNumber(blockIndex) + + txsSent, err := transactionWorker.SendTxs(ctx, pendingTxs) + if err != nil { + nb.log.Warn("failed to send transactions", "err", err) + return nil, pendingTxs, err + } + + payload, err := consensusClient.Propose(ctx, blockMetrics, isSetupPayload) + if err != nil { + return nil, pendingTxs, err + } + if payload == nil { + return nil, pendingTxs, errors.New("received nil payload from consensus client") + } + + // Track how many user txs are still pending in the node's mempool. + // payload.Transactions includes the L1 info deposit tx, so user txs = total - 1. + userTxsIncluded := len(payload.Transactions) - 1 + if userTxsIncluded < 0 { + userTxsIncluded = 0 + } + updatedPendingTxs := pendingTxs + txsSent - userTxsIncluded + if updatedPendingTxs < 0 { + updatedPendingTxs = 0 + } + + if !nb.config.Params.UseBaseConsensusTiming() { + log.Info("Sleeping for block time", "block_time", nb.config.Params.BlockTime) + time.Sleep(nb.config.Params.BlockTime) + } + + if collectMetrics { + if err := metricsCollector.Collect(ctx, blockMetrics); err != nil { + nb.log.Error("Failed to collect metrics", "error", err) + } + } + + return payload, updatedPendingTxs, nil +} + +func (nb *sequencerBenchmark) settleGracefulWorkerShutdown( + ctx context.Context, + transactionWorker payloadworker.Worker, + consensusClient *consensus.SequencerConsensusClient, + pendingTxs int, +) error { + gracefulWorker, ok := transactionWorker.(payloadworker.GracefulShutdownWorker) + if !ok { + return nil + } + + if err := gracefulWorker.BeginGracefulShutdown(ctx); err != nil { + return errors.Wrap(err, "failed to begin graceful payload worker shutdown") + } + + timeout := time.NewTimer(gracefulWorkerShutdownTimeout) + defer timeout.Stop() + + settlementBlock := 0 + for { + select { + case <-gracefulWorker.Done(): + nb.log.Info("Payload worker stopped gracefully", "settlement_blocks", settlementBlock) + return nil + case <-timeout.C: + nb.log.Warn("Timed out waiting for payload worker to stop gracefully", "settlement_blocks", settlementBlock) + return nil + default: + } + + var payload *engine.ExecutableData + var err error + payload, pendingTxs, err = nb.proposeBlock(ctx, transactionWorker, consensusClient, nil, uint64(settlementBlock+1), pendingTxs, true, false) + if err != nil { + return errors.Wrap(err, "failed to propose settlement block") + } + if payload == nil { + return errors.New("received nil settlement payload from consensus client") + } + + settlementBlock++ + } +} + // flashblockCollector implements FlashblockListener to collect flashblocks. type flashblockCollector struct { log log.Logger diff --git a/runner/network/types/types.go b/runner/network/types/types.go index c219c72a..ee684344 100644 --- a/runner/network/types/types.go +++ b/runner/network/types/types.go @@ -40,6 +40,10 @@ type TestConfig struct { PrefundPrivateKey ecdsa.PrivateKey PrefundAmount big.Int + + // LoadTestOutputPath is the optional normal load-test report JSON path used + // by the load-test payload worker. + LoadTestOutputPath string } // BatcherAddr returns the batcher address, computing it if necessary @@ -77,6 +81,9 @@ type RunParams struct { // BlockTime is the time between blocks in the benchmark run. BlockTime time.Duration + // ConsensusTimingMode controls how the fake consensus client schedules FCU/getPayload calls. + ConsensusTimingMode string + // Env is the environment variables for the benchmark run. Env map[string]string @@ -93,6 +100,15 @@ type RunParams struct { ClientBinPath string } +const ( + ConsensusTimingModePreventLateFCU = "prevent-late-fcu" + ConsensusTimingModeBaseConsensus = "base-consensus" +) + +func (p RunParams) UseBaseConsensusTiming() bool { + return p.ConsensusTimingMode == ConsensusTimingModeBaseConsensus +} + func (p RunParams) ToConfig() map[string]interface{} { params := map[string]interface{}{ "NodeType": p.NodeType, @@ -108,6 +124,10 @@ func (p RunParams) ToConfig() map[string]interface{} { params["ValidatorNodeType"] = p.ValidatorNodeType } + if p.ConsensusTimingMode != "" { + params["ConsensusTimingMode"] = p.ConsensusTimingMode + } + for k, v := range p.Tags { params[k] = v } diff --git a/runner/payload/factory.go b/runner/payload/factory.go index b8e7176e..7894563f 100644 --- a/runner/payload/factory.go +++ b/runner/payload/factory.go @@ -38,7 +38,7 @@ func NewPayloadWorker(ctx context.Context, log log.Logger, testConfig *benchtype def = &loadtest.LoadTestPayloadDefinition{} } worker, err = loadtest.NewLoadTestPayloadWorker( - log, sequencerClient.ClientURL(), params, privateKey, amount, config, genesis.Config.ChainID, *def) + log, sequencerClient.ClientURL(), sequencerClient.FlashblocksWsURL(), params, privateKey, amount, config, genesis.Config.ChainID, *def, testConfig.LoadTestOutputPath) case "transfer-only": worker, err = transferonly.NewTransferPayloadWorker( ctx, log, sequencerClient.ClientURL(), params, privateKey, amount, &genesis, definition.Params) diff --git a/runner/payload/loadtest/load_test_worker.go b/runner/payload/loadtest/load_test_worker.go index 5b6d62ff..2158ddde 100644 --- a/runner/payload/loadtest/load_test_worker.go +++ b/runner/payload/loadtest/load_test_worker.go @@ -3,14 +3,16 @@ package loadtest import ( "context" "crypto/ecdsa" - cryptorand "crypto/rand" "encoding/hex" "fmt" "math/big" "os" "os/exec" + "path/filepath" + "strconv" + "sync" + "time" - "github.com/base/base-bench/runner/clients/common/proxy" "github.com/base/base-bench/runner/config" "github.com/base/base-bench/runner/network/mempool" "github.com/base/base-bench/runner/network/types" @@ -21,70 +23,69 @@ import ( ) // LoadTestPayloadDefinition is the YAML payload params for the load-test type. -// Fields map directly to the Rust base-load-test config format. -// The `transactions` field is passed through as raw YAML to support the full -// Rust config schema (transfer, calldata, precompile, erc20, etc.). +// The load-test workload itself lives in a native base-load-tester config file; +// benchmark mode overlays the RPC fields it must control and overlays target_gps +// only when the benchmark matrix specifies one. type LoadTestPayloadDefinition struct { - SenderCount uint64 `yaml:"sender_count"` - FundingAmount string `yaml:"funding_amount"` - Transactions yaml.Node `yaml:"transactions"` -} - -// loadTestConfig is the YAML config written to a temp file for the load-test binary. -type loadTestConfig struct { - RPC string `yaml:"rpc"` - SenderCount uint64 `yaml:"sender_count"` - TargetGPS uint64 `yaml:"target_gps"` - Duration string `yaml:"duration"` - Seed uint64 `yaml:"seed"` - FundingAmount string `yaml:"funding_amount"` - Transactions yaml.Node `yaml:"transactions"` + ConfigFile string `yaml:"config_file"` + Network string `yaml:"network"` } type loadTestPayloadWorker struct { - log log.Logger - prefundSK string - loadTestBin string - elRPCURL string - gasLimit uint64 - blockTimeSec uint64 - params LoadTestPayloadDefinition - mempool *mempool.StaticWorkloadMempool - proxyServer *proxy.ProxyServer - cmd *exec.Cmd - configFilePath string + log log.Logger + prefundSK string + loadTestBin string + elRPCURL string + flashblocksURL string + gasLimit uint64 + blockTime time.Duration + params LoadTestPayloadDefinition + mempool *mempool.StaticWorkloadMempool + cmd *exec.Cmd + done chan struct{} + startOnce sync.Once + shutdownOnce sync.Once + waitErrMu sync.Mutex + waitErr error + sourceConfigPath string + renderedConfigPath string + outputPath string } // NewLoadTestPayloadWorker creates a worker that runs the base-load-test binary -// as an external transaction generator, capturing transactions via a proxy server. +// as an external transaction generator against the benchmark node's RPC. func NewLoadTestPayloadWorker( log log.Logger, elRPCURL string, + flashblocksURL string, params types.RunParams, prefundedPrivateKey ecdsa.PrivateKey, prefundAmount *big.Int, cfg config.Config, chainID *big.Int, definition LoadTestPayloadDefinition, + outputPath string, ) (worker.Worker, error) { mp := mempool.NewStaticWorkloadMempool(log, chainID) - ps := proxy.NewProxyServer(elRPCURL, log, cfg.ProxyPort(), mp) - blockTimeSec := uint64(params.BlockTime.Seconds()) - if blockTimeSec == 0 { - blockTimeSec = 1 + sourceConfigPath, err := resolveConfigFilePath(cfg.ConfigPath(), definition.ConfigFile) + if err != nil { + return nil, err } w := &loadTestPayloadWorker{ - log: log, - prefundSK: hex.EncodeToString(prefundedPrivateKey.D.Bytes()), - loadTestBin: cfg.LoadTestBinary(), - elRPCURL: elRPCURL, - gasLimit: params.GasLimit, - blockTimeSec: blockTimeSec, - params: definition, - mempool: mp, - proxyServer: ps, + log: log, + prefundSK: hex.EncodeToString(prefundedPrivateKey.D.Bytes()), + loadTestBin: cfg.LoadTestBinary(), + elRPCURL: elRPCURL, + flashblocksURL: flashblocksURL, + gasLimit: params.GasLimit, + blockTime: params.BlockTime, + params: definition, + mempool: mp, + done: make(chan struct{}), + sourceConfigPath: sourceConfigPath, + outputPath: outputPath, } return w, nil @@ -95,47 +96,119 @@ func (w *loadTestPayloadWorker) Mempool() mempool.FakeMempool { } func (w *loadTestPayloadWorker) Setup(ctx context.Context) error { - if err := w.proxyServer.Run(ctx); err != nil { - return errors.Wrap(err, "failed to run proxy server") - } - configPath, err := w.writeConfig() if err != nil { return errors.Wrap(err, "failed to write load-test config") } - w.configFilePath = configPath + w.renderedConfigPath = configPath - w.log.Info("Starting load test", "binary", w.loadTestBin, "config", configPath) + w.log.Info("Prepared load test", "binary", w.loadTestBin, "config", configPath) + return nil +} - cmd := exec.CommandContext(ctx, w.loadTestBin, configPath) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stdout - cmd.Env = append(os.Environ(), fmt.Sprintf("FUNDER_KEY=%s", w.prefundSK)) +func (w *loadTestPayloadWorker) start(ctx context.Context) error { + w.startOnce.Do(func() { + if w.renderedConfigPath == "" { + w.finish(errors.New("load-test config has not been prepared")) + return + } - if err := cmd.Start(); err != nil { - return errors.Wrap(err, "failed to start load test binary") + w.log.Info("Starting load test", "binary", w.loadTestBin, "config", w.renderedConfigPath) + + cmd := exec.CommandContext(ctx, w.loadTestBin, w.renderedConfigPath) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stdout + cmd.Env = append(os.Environ(), fmt.Sprintf("FUNDER_KEY=%s", w.prefundSK)) + if w.outputPath != "" { + if err := os.MkdirAll(filepath.Dir(w.outputPath), 0755); err != nil { + w.finish(errors.Wrap(err, "failed to create load-test output directory")) + return + } + cmd.Env = append(cmd.Env, fmt.Sprintf("LOAD_TEST_OUTPUT=%s", w.outputPath)) + } + + if err := cmd.Start(); err != nil { + w.finish(errors.Wrap(err, "failed to start load test binary")) + return + } + w.cmd = cmd + go func() { + w.finish(cmd.Wait()) + }() + }) + + return w.Err() +} + +func (w *loadTestPayloadWorker) BeginGracefulShutdown(ctx context.Context) error { + if w.cmd == nil || w.cmd.Process == nil { + return nil } - w.cmd = cmd - return nil + select { + case <-ctx.Done(): + return ctx.Err() + case <-w.Done(): + return nil + default: + } + + var signalErr error + w.shutdownOnce.Do(func() { + w.log.Info("Stopping load test process gracefully", "pid", w.cmd.Process.Pid, "output", w.outputPath) + signalErr = w.cmd.Process.Signal(os.Interrupt) + }) + if signalErr != nil { + select { + case <-w.Done(): + return nil + default: + } + } + return signalErr +} + +func (w *loadTestPayloadWorker) Done() <-chan struct{} { + return w.done +} + +func (w *loadTestPayloadWorker) Err() error { + w.waitErrMu.Lock() + defer w.waitErrMu.Unlock() + return w.waitErr +} + +func (w *loadTestPayloadWorker) finish(err error) { + w.waitErrMu.Lock() + w.waitErr = err + w.waitErrMu.Unlock() + close(w.done) } func (w *loadTestPayloadWorker) Stop(ctx context.Context) error { if w.cmd != nil && w.cmd.Process != nil { - w.log.Info("Stopping load test process", "pid", w.cmd.Process.Pid) - if err := w.cmd.Process.Kill(); err != nil { - w.log.Warn("failed to kill load test process", "err", err) - } else { - // Reap the process to avoid zombies. - _, _ = w.cmd.Process.Wait() + if err := w.BeginGracefulShutdown(ctx); err != nil { + w.log.Warn("failed to signal load test process", "err", err) } - } - w.proxyServer.Stop() + select { + case <-w.Done(): + case <-time.After(10 * time.Second): + w.log.Warn("load test process did not stop gracefully, killing", "pid", w.cmd.Process.Pid) + if err := w.cmd.Process.Kill(); err != nil { + w.log.Warn("failed to kill load test process", "err", err) + } + select { + case <-w.Done(): + case <-time.After(5 * time.Second): + w.log.Warn("timed out waiting for killed load test process") + } + } + } - if w.configFilePath != "" { - if err := os.Remove(w.configFilePath); err != nil { - w.log.Warn("failed to remove load-test config", "path", w.configFilePath, "err", err) + if w.renderedConfigPath != "" { + if err := os.Remove(w.renderedConfigPath); err != nil { + w.log.Warn("failed to remove load-test config", "path", w.renderedConfigPath, "err", err) } } @@ -143,80 +216,103 @@ func (w *loadTestPayloadWorker) Stop(ctx context.Context) error { } func (w *loadTestPayloadWorker) SendTxs(ctx context.Context, _ int) (int, error) { - w.log.Info("Collecting txs from load test") - pendingTxs := w.proxyServer.PendingTxs() - w.proxyServer.ClearPendingTxs() - - w.mempool.AddTransactions(pendingTxs) - return len(pendingTxs), nil -} - -// defaultTransactions returns the default transaction mix as a yaml.Node. -func defaultTransactions() yaml.Node { - var node yaml.Node - // Default: 70% transfer, 20% calldata, 10% precompile - defaultYAML := ` -- weight: 70 - type: transfer -- weight: 20 - type: calldata - max_size: 256 -- weight: 10 - type: precompile - target: sha256 -` - if err := yaml.Unmarshal([]byte(defaultYAML), &node); err != nil { - panic(fmt.Sprintf("failed to parse default transactions YAML: %v", err)) - } - // yaml.Unmarshal wraps in a document node; return the inner sequence - if node.Kind == yaml.DocumentNode && len(node.Content) > 0 { - return *node.Content[0] + if err := w.start(ctx); err != nil { + return 0, err } - return node + return 0, nil } -// randomSeed returns a cryptographically random uint64 seed. -func randomSeed() uint64 { - var b [8]byte - if _, err := cryptorand.Read(b[:]); err != nil { - return 42 +func resolveConfigFilePath(benchmarkConfigPath string, loadTestConfigPath string) (string, error) { + if loadTestConfigPath == "" { + return "", errors.New("load-test payload requires config_file") + } + if filepath.IsAbs(loadTestConfigPath) { + return loadTestConfigPath, nil } - return uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16 | uint64(b[3])<<24 | - uint64(b[4])<<32 | uint64(b[5])<<40 | uint64(b[6])<<48 | uint64(b[7])<<56 + return filepath.Join(filepath.Dir(benchmarkConfigPath), loadTestConfigPath), nil } -// writeConfig generates a temporary YAML config file for the load-test binary -// with the RPC URL pointing to the proxy server. -func (w *loadTestPayloadWorker) writeConfig() (string, error) { - senderCount := w.params.SenderCount - if senderCount == 0 { - senderCount = 10 +func (w *loadTestPayloadWorker) buildConfig() (*yaml.Node, error) { + data, err := os.ReadFile(w.sourceConfigPath) + if err != nil { + return nil, errors.Wrap(err, "failed to read load-test config file") + } + + var doc yaml.Node + if err := yaml.Unmarshal(data, &doc); err != nil { + return nil, errors.Wrap(err, "failed to parse load-test config file") } - fundingAmount := w.params.FundingAmount - if fundingAmount == "" { - fundingAmount = "10000000000000000000" + config, err := mappingRoot(&doc) + if err != nil { + return nil, err } - // Compute target GPS from gas limit and block time - targetGPS := w.gasLimit / w.blockTimeSec + setMappingValue(config, "transaction_submission_rpcs", stringSequenceNode(w.elRPCURL)) + setMappingValue(config, "query_rpc", stringNode(w.elRPCURL)) - transactions := w.params.Transactions - if transactions.Kind == 0 { - transactions = defaultTransactions() + flashblocksURL := w.flashblocksURL + if flashblocksURL == "" { + flashblocksURL = "ws://localhost:7111" + } + setMappingValue(config, "flashblocks_ws", stringNode(flashblocksURL)) + if w.blockTime > 0 && w.gasLimit > 0 { + targetGPS := w.gasLimit / uint64(w.blockTime.Seconds()) + setMappingValue(config, "target_gps", uintNode(targetGPS)) } - config := loadTestConfig{ - RPC: w.proxyServer.ClientURL(), - SenderCount: senderCount, - TargetGPS: targetGPS, - Duration: "99999s", - Seed: randomSeed(), - FundingAmount: fundingAmount, - Transactions: transactions, + return config, nil +} + +func mappingRoot(doc *yaml.Node) (*yaml.Node, error) { + root := doc + if doc.Kind == yaml.DocumentNode { + if len(doc.Content) == 0 { + return nil, errors.New("load-test config file is empty") + } + root = doc.Content[0] } - data, err := yaml.Marshal(&config) + if root.Kind != yaml.MappingNode { + return nil, fmt.Errorf("load-test config file must be a YAML mapping, got kind %d", root.Kind) + } + return root, nil +} + +func setMappingValue(mapping *yaml.Node, key string, value *yaml.Node) { + for i := 0; i < len(mapping.Content)-1; i += 2 { + if mapping.Content[i].Value == key { + mapping.Content[i+1] = value + return + } + } + mapping.Content = append(mapping.Content, stringNode(key), value) +} + +func stringNode(value string) *yaml.Node { + return &yaml.Node{Kind: yaml.ScalarNode, Tag: "!!str", Value: value} +} + +func uintNode(value uint64) *yaml.Node { + return &yaml.Node{Kind: yaml.ScalarNode, Tag: "!!int", Value: strconv.FormatUint(value, 10)} +} + +func stringSequenceNode(values ...string) *yaml.Node { + node := &yaml.Node{Kind: yaml.SequenceNode, Tag: "!!seq"} + for _, value := range values { + node.Content = append(node.Content, stringNode(value)) + } + return node +} + +// writeConfig generates a temporary YAML config file for the load-test binary +// with benchmark-controlled RPC, timing, and report fields. +func (w *loadTestPayloadWorker) writeConfig() (string, error) { + config, err := w.buildConfig() + if err != nil { + return "", err + } + data, err := yaml.Marshal(config) if err != nil { return "", errors.Wrap(err, "failed to marshal load-test config") } @@ -236,10 +332,9 @@ func (w *loadTestPayloadWorker) writeConfig() (string, error) { } w.log.Info("Generated load-test config", - "sender_count", senderCount, - "target_gps", targetGPS, + "source_config", w.sourceConfigPath, "gas_limit", w.gasLimit, - "block_time_sec", w.blockTimeSec, + "block_time", w.blockTime, ) return tmpFile.Name(), nil diff --git a/runner/payload/loadtest/load_test_worker_test.go b/runner/payload/loadtest/load_test_worker_test.go new file mode 100644 index 00000000..a0f706d4 --- /dev/null +++ b/runner/payload/loadtest/load_test_worker_test.go @@ -0,0 +1,194 @@ +package loadtest + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v3" +) + +func TestBuildConfigOverlaysBenchmarkFieldsAndPreservesLoadTestConfig(t *testing.T) { + configPath := filepath.Join(t.TempDir(), "mainnet-state-weth-usdc-swaps.yaml") + err := os.WriteFile(configPath, []byte(` +transaction_submission_rpcs: + - "http://standalone-submitter.invalid" +query_rpc: "http://standalone-query.invalid" +flashblocks_ws: "ws://standalone-flashblocks.invalid" +target_gps: 123 +duration: "60s" +chain_id: 8453 +sender_count: 250 +in_flight_per_sender: 64 +batch_size: 20 +batch_timeout: "10ms" +seed: 654789 +funding_amount: "200000000000000000" +real_token_setup: + enabled: true + allow_chain_id_8453: true + weth: "0x4200000000000000000000000000000000000006" + weth_amount_per_sender: "50000000000000000" + pair_token: + token: "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913" + amount_per_sender: "10000000" + acquisition: + type: uniswap_v3_exact_input + router: "0x2626664c2603336E57B271c5C0b26F421741e481" + fee: 500 + amount_in: "10000000000000000" + min_amount_out: "0" +transactions: + - weight: 50 + type: uniswap_v3 + router: "0x2626664c2603336E57B271c5C0b26F421741e481" + token_in: "0x4200000000000000000000000000000000000006" + token_out: "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913" + fee: 500 + min_amount: "10000000000000" + max_amount: "100000000000000" + reverse_min_amount: "100000" + reverse_max_amount: "1000000" + - weight: 50 + type: aerodrome_cl + router: "0xBE6D8f0d05cC4be24d5167a3eF062215bE6D18a5" + token_in: "0x4200000000000000000000000000000000000006" + token_out: "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913" + tick_spacing: 100 + min_amount: "10000000000000" + max_amount: "100000000000000" + reverse_min_amount: "100000" + reverse_max_amount: "1000000" +`), 0644) + require.NoError(t, err) + + worker := &loadTestPayloadWorker{ + flashblocksURL: "ws://benchmark-flashblocks.example", + gasLimit: 150_000_000, + blockTime: 2 * time.Second, + elRPCURL: "http://sequencer.example", + sourceConfigPath: configPath, + } + + config, err := worker.buildConfig() + require.NoError(t, err) + + encoded, err := yaml.Marshal(config) + require.NoError(t, err) + output := string(encoded) + + for _, want := range []string{ + "transaction_submission_rpcs:\n - http://sequencer.example", + "query_rpc: http://sequencer.example", + "flashblocks_ws: ws://benchmark-flashblocks.example", + "target_gps: 75000000", + "duration: \"60s\"", + "chain_id: 8453", + "sender_count: 250", + "in_flight_per_sender: 64", + "batch_size: 20", + "batch_timeout: \"10ms\"", + "seed: 654789", + "real_token_setup:", + "allow_chain_id_8453: true", + "type: uniswap_v3", + "type: aerodrome_cl", + "reverse_min_amount: \"100000\"", + } { + require.Contains(t, output, want) + } + for _, oldValue := range []string{ + "standalone-submitter.invalid", + "standalone-query.invalid", + "standalone-flashblocks.invalid", + "target_gps: 123", + } { + require.NotContains(t, output, oldValue) + } +} + +func TestBuildConfigPreservesNativeTargetGPSWhenGasLimitOrBlockTimeIsZero(t *testing.T) { + configPath := filepath.Join(t.TempDir(), "load-test.yaml") + err := os.WriteFile(configPath, []byte(` +transaction_submission_rpcs: + - "http://standalone-submitter.invalid" +query_rpc: "http://standalone-query.invalid" +flashblocks_ws: "ws://standalone-flashblocks.invalid" +target_gps: 123 +duration: "60s" +transactions: + - weight: 100 + type: transfer +`), 0644) + require.NoError(t, err) + + worker := &loadTestPayloadWorker{ + flashblocksURL: "ws://benchmark-flashblocks.example", + elRPCURL: "http://sequencer.example", + sourceConfigPath: configPath, + } + + config, err := worker.buildConfig() + require.NoError(t, err) + + encoded, err := yaml.Marshal(config) + require.NoError(t, err) + output := string(encoded) + + require.Contains(t, output, "target_gps: 123") + require.Contains(t, output, "duration: \"60s\"") +} + +func TestSetupPreparesConfigWithoutStartingProcess(t *testing.T) { + configPath := filepath.Join(t.TempDir(), "load-test.yaml") + err := os.WriteFile(configPath, []byte(` +transaction_submission_rpcs: + - "http://standalone-submitter.invalid" +query_rpc: "http://standalone-query.invalid" +duration: "60s" +transactions: + - weight: 100 + type: transfer +`), 0644) + require.NoError(t, err) + + worker := &loadTestPayloadWorker{ + log: log.New(), + elRPCURL: "http://sequencer.example", + sourceConfigPath: configPath, + done: make(chan struct{}), + } + t.Cleanup(func() { + if worker.renderedConfigPath != "" { + require.NoError(t, os.Remove(worker.renderedConfigPath)) + } + }) + + require.NoError(t, worker.Setup(context.Background())) + require.NotEmpty(t, worker.renderedConfigPath) + require.Nil(t, worker.cmd) + + select { + case <-worker.Done(): + t.Fatal("load-test worker should not be done before it starts") + default: + } +} + +func TestResolveConfigFilePath(t *testing.T) { + resolved, err := resolveConfigFilePath("/tmp/configs/benchmark.yml", "load-tests/mainnet.yaml") + require.NoError(t, err) + require.Equal(t, "/tmp/configs/load-tests/mainnet.yaml", resolved) + + resolved, err = resolveConfigFilePath("/tmp/configs/benchmark.yml", "/var/load-tests/mainnet.yaml") + require.NoError(t, err) + require.Equal(t, "/var/load-tests/mainnet.yaml", resolved) + + _, err = resolveConfigFilePath("/tmp/configs/benchmark.yml", "") + require.Error(t, err) + require.Contains(t, err.Error(), "config_file") +} diff --git a/runner/payload/txfuzz/tx_fuzz_worker.go b/runner/payload/txfuzz/tx_fuzz_worker.go index b5167d42..87fd5010 100644 --- a/runner/payload/txfuzz/tx_fuzz_worker.go +++ b/runner/payload/txfuzz/tx_fuzz_worker.go @@ -84,8 +84,7 @@ func (t *txFuzzPayloadWorker) Stop(ctx context.Context) error { func (t *txFuzzPayloadWorker) SendTxs(ctx context.Context, _ int) (int, error) { t.log.Info("Sending txs in tx-fuzz mode") - pending := t.proxyServer.PendingTxs() - t.proxyServer.ClearPendingTxs() + pending := t.proxyServer.DrainPendingTxs() t.mempool.AddTransactions(pending) return len(pending), nil diff --git a/runner/payload/worker/types.go b/runner/payload/worker/types.go index 69906ffe..942df001 100644 --- a/runner/payload/worker/types.go +++ b/runner/payload/worker/types.go @@ -18,3 +18,17 @@ type Worker interface { Stop(ctx context.Context) error Mempool() mempool.FakeMempool } + +// GracefulShutdownWorker can stop generating transactions while the benchmark +// sequencer keeps producing settlement blocks. +type GracefulShutdownWorker interface { + BeginGracefulShutdown(ctx context.Context) error + Done() <-chan struct{} +} + +// CompletionWorker owns its own run duration. The benchmark sequencer keeps +// producing blocks until Done closes, then treats Err as the worker result. +type CompletionWorker interface { + Done() <-chan struct{} + Err() error +} diff --git a/runner/service.go b/runner/service.go index 2d7f64fa..2d9e1e31 100644 --- a/runner/service.go +++ b/runner/service.go @@ -26,6 +26,7 @@ import ( "github.com/base/base-bench/runner/network" "github.com/base/base-bench/runner/network/types" "github.com/base/base-bench/runner/payload" + "github.com/base/base-bench/runner/payload/loadtest" "github.com/base/base-bench/runner/utils" "github.com/ethereum/go-ethereum/core" ethparams "github.com/ethereum/go-ethereum/params" @@ -373,6 +374,54 @@ func (s *service) setupBlobsDir(workingDir string) error { return nil } +func loadTestNetwork(genesis *core.Genesis, transactionPayload payload.Definition) string { + if envNetwork := os.Getenv("BASE_BENCH_LOAD_TEST_NETWORK"); envNetwork != "" { + return envNetwork + } + + if transactionPayload.Type == "load-test" { + if def, ok := transactionPayload.Params.(*loadtest.LoadTestPayloadDefinition); ok && def.Network != "" { + return def.Network + } + } + + if genesis == nil || genesis.Config == nil || genesis.Config.ChainID == nil { + return "unknown" + } + + switch genesis.Config.ChainID.Uint64() { + case 8453: + return "mainnet" + case 84532: + return "sepolia" + case 13371337: + return "devnet" + default: + return fmt.Sprintf("chain-%s", genesis.Config.ChainID.String()) + } +} + +func (s *service) loadTestOutputPath(genesis *core.Genesis, transactionPayload payload.Definition) string { + if transactionPayload.Type != "load-test" { + return "" + } + + network := loadTestNetwork(genesis, transactionPayload) + baseTime := time.Now().UTC() + for i := 0; ; i++ { + timestamp := baseTime.Add(time.Duration(i) * time.Second).Format(benchmark.LoadTestTimestampLayout) + outputPath := path.Join( + s.config.OutputDir(), + benchmark.LoadTestResultsDir, + network, + fmt.Sprintf("%s.json", timestamp), + ) + if _, err := os.Stat(outputPath); err != nil { + return outputPath + } + } +} + func (s *service) runTest(ctx context.Context, params types.RunParams, workingDir string, outputDir string, snapshotConfig *benchmark.SnapshotDefinition, proofConfig *benchmark.ProofProgramOptions, transactionPayload payload.Definition, datadirsConfig *benchmark.DatadirConfig, mode benchmark.BenchmarkExecutionMode, flashblocksBlockTime string) (*benchmark.RunResult, error) { s.log.Info(fmt.Sprintf("Running benchmark with params: %+v", params)) @@ -428,12 +477,13 @@ func (s *service) runTest(ctx context.Context, params types.RunParams, workingDi prefundAmount := new(big.Int).Mul(big.NewInt(1e6), big.NewInt(ethparams.Ether)) config := &types.TestConfig{ - Params: params, - Config: s.config, - Genesis: *genesis, - BatcherKey: *batcherKey, - PrefundPrivateKey: *prefundKey, - PrefundAmount: *prefundAmount, + Params: params, + Config: s.config, + Genesis: *genesis, + BatcherKey: *batcherKey, + PrefundPrivateKey: *prefundKey, + PrefundAmount: *prefundAmount, + LoadTestOutputPath: s.loadTestOutputPath(genesis, transactionPayload), } // Run benchmark