diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index badf9ce4352ad..d497d2bcdb2df 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -677,7 +677,7 @@ run_tpch() { echo "Running tpch benchmark..." FORMAT=$2 - debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format ${FORMAT} -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --scale-factor "${SCALE_FACTOR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format ${FORMAT} -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } # Runs the tpch in memory (needs tpch parquet data) @@ -693,7 +693,7 @@ run_tpch_mem() { echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running tpch_mem benchmark..." # -m means in memory - debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --scale-factor "${SCALE_FACTOR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } # Runs the tpcds benchmark diff --git a/benchmarks/queries/q10.sql b/benchmarks/queries/q10.sql index 8613fd4962837..8ac2fd90798c9 100644 --- a/benchmarks/queries/q10.sql +++ b/benchmarks/queries/q10.sql @@ -16,7 +16,7 @@ where c_custkey = o_custkey and l_orderkey = o_orderkey and o_orderdate >= date '1993-10-01' - and o_orderdate < date '1994-01-01' + and o_orderdate < date '1993-10-01' + interval '3' month and l_returnflag = 'R' and c_nationkey = n_nationkey group by diff --git a/benchmarks/queries/q11.sql b/benchmarks/queries/q11.sql index c23ed1c71bfb3..9a9710d09ec35 100644 --- a/benchmarks/queries/q11.sql +++ b/benchmarks/queries/q11.sql @@ -13,7 +13,7 @@ group by ps_partkey having sum(ps_supplycost * ps_availqty) > ( select - sum(ps_supplycost * ps_availqty) * 0.0001 + sum(ps_supplycost * ps_availqty) * 0.0001 /* __TPCH_Q11_FRACTION__ */ from partsupp, supplier, @@ -24,4 +24,4 @@ group by and n_name = 'GERMANY' ) order by - value desc; \ No newline at end of file + value desc; diff --git a/benchmarks/queries/q12.sql b/benchmarks/queries/q12.sql index f8e6d960c8420..c3f4d62344701 100644 --- a/benchmarks/queries/q12.sql +++ b/benchmarks/queries/q12.sql @@ -23,8 +23,8 @@ where and l_commitdate < l_receiptdate and l_shipdate < l_commitdate and l_receiptdate >= date '1994-01-01' - and l_receiptdate < date '1995-01-01' + and l_receiptdate < date '1994-01-01' + interval '1' year group by l_shipmode order by - l_shipmode; \ No newline at end of file + l_shipmode; diff --git a/benchmarks/queries/q14.sql b/benchmarks/queries/q14.sql index d8ef6afaca9bb..6fe88c42662d0 100644 --- a/benchmarks/queries/q14.sql +++ b/benchmarks/queries/q14.sql @@ -10,4 +10,4 @@ from where l_partkey = p_partkey and l_shipdate >= date '1995-09-01' - and l_shipdate < date '1995-10-01'; \ No newline at end of file + and l_shipdate < date '1995-09-01' + interval '1' month; diff --git a/benchmarks/queries/q5.sql b/benchmarks/queries/q5.sql index 5a336b231184b..146980ccd6f76 100644 --- a/benchmarks/queries/q5.sql +++ b/benchmarks/queries/q5.sql @@ -17,8 +17,8 @@ where and n_regionkey = r_regionkey and r_name = 'ASIA' and o_orderdate >= date '1994-01-01' - and o_orderdate < date '1995-01-01' + and o_orderdate < date '1994-01-01' + interval '1' year group by n_name order by - revenue desc; \ No newline at end of file + revenue desc; diff --git a/benchmarks/queries/q6.sql b/benchmarks/queries/q6.sql index 5806f980f8088..5a13fe7df765a 100644 --- a/benchmarks/queries/q6.sql +++ b/benchmarks/queries/q6.sql @@ -4,6 +4,6 @@ from lineitem where l_shipdate >= date '1994-01-01' - and l_shipdate < date '1995-01-01' + and l_shipdate < date '1994-01-01' + interval '1' year and l_discount between 0.06 - 0.01 and 0.06 + 0.01 - and l_quantity < 24; \ No newline at end of file + and l_quantity < 24; diff --git a/benchmarks/src/tpch/mod.rs b/benchmarks/src/tpch/mod.rs index 681aa0a403ee1..08cedc0e5b4c3 100644 --- a/benchmarks/src/tpch/mod.rs +++ b/benchmarks/src/tpch/mod.rs @@ -33,6 +33,7 @@ pub const TPCH_TABLES: &[&str] = &[ pub const TPCH_QUERY_START_ID: usize = 1; pub const TPCH_QUERY_END_ID: usize = 22; +const TPCH_Q11_FRACTION_SENTINEL: &str = "0.0001 /* __TPCH_Q11_FRACTION__ */"; /// The `.tbl` file contains a trailing column pub fn get_tbl_tpch_table_schema(table: &str) -> Schema { @@ -139,6 +140,21 @@ pub fn get_tpch_table_schema(table: &str) -> Schema { /// Get the SQL statements from the specified query file pub fn get_query_sql(query: usize) -> Result> { + get_query_sql_for_scale_factor(query, 1.0) +} + +/// Get the SQL statements from the specified query file using the provided scale factor for +/// TPC-H substitutions such as Q11 FRACTION. +pub fn get_query_sql_for_scale_factor( + query: usize, + scale_factor: f64, +) -> Result> { + if !(scale_factor.is_finite() && scale_factor > 0.0) { + return plan_err!( + "invalid scale factor. Expected a positive finite value, got {scale_factor}" + ); + } + if query > 0 && query < 23 { let possibilities = vec![ format!("queries/q{query}.sql"), @@ -148,6 +164,7 @@ pub fn get_query_sql(query: usize) -> Result> { for filename in possibilities { match fs::read_to_string(&filename) { Ok(contents) => { + let contents = customize_query_sql(query, contents, scale_factor)?; return Ok(contents .split(';') .map(|s| s.trim()) @@ -164,6 +181,27 @@ pub fn get_query_sql(query: usize) -> Result> { } } +fn customize_query_sql( + query: usize, + contents: String, + scale_factor: f64, +) -> Result { + if query != 11 { + return Ok(contents); + } + + if !contents.contains(TPCH_Q11_FRACTION_SENTINEL) { + return plan_err!( + "invalid query 11. Missing fraction marker {TPCH_Q11_FRACTION_SENTINEL}" + ); + } + + Ok(contents.replace( + TPCH_Q11_FRACTION_SENTINEL, + &format!("(0.0001 / {scale_factor})"), + )) +} + pub const QUERY_LIMIT: [Option; 22] = [ None, Some(100), @@ -188,3 +226,51 @@ pub const QUERY_LIMIT: [Option; 22] = [ Some(100), None, ]; + +#[cfg(test)] +mod tests { + use super::{get_query_sql, get_query_sql_for_scale_factor}; + use datafusion::error::Result; + + fn get_single_query(query: usize) -> Result { + let mut queries = get_query_sql(query)?; + assert_eq!(queries.len(), 1); + Ok(queries.remove(0)) + } + + fn get_single_query_for_scale_factor( + query: usize, + scale_factor: f64, + ) -> Result { + let mut queries = get_query_sql_for_scale_factor(query, scale_factor)?; + assert_eq!(queries.len(), 1); + Ok(queries.remove(0)) + } + + #[test] + fn q11_uses_scale_factor_substitution() -> Result<()> { + let sf1_sql = get_single_query(11)?; + assert!(sf1_sql.contains("(0.0001 / 1)")); + + let sf01_sql = get_single_query_for_scale_factor(11, 0.1)?; + assert!(sf01_sql.contains("(0.0001 / 0.1)")); + + let sf10_sql = get_single_query_for_scale_factor(11, 10.0)?; + assert!(sf10_sql.contains("(0.0001 / 10)")); + + let sf30_sql = get_single_query_for_scale_factor(11, 30.0)?; + assert!(sf30_sql.contains("(0.0001 / 30)")); + assert!(!sf10_sql.contains("__TPCH_Q11_FRACTION__")); + Ok(()) + } + + #[test] + fn interval_queries_use_interval_arithmetic() -> Result<()> { + assert!(get_single_query(5)?.contains("date '1994-01-01' + interval '1' year")); + assert!(get_single_query(6)?.contains("date '1994-01-01' + interval '1' year")); + assert!(get_single_query(10)?.contains("date '1993-10-01' + interval '3' month")); + assert!(get_single_query(12)?.contains("date '1994-01-01' + interval '1' year")); + assert!(get_single_query(14)?.contains("date '1995-09-01' + interval '1' month")); + Ok(()) + } +} diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index 0d1268013c168..ec7aa8c554a28 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -15,17 +15,18 @@ // specific language governing permissions and limitations // under the License. -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; use super::{ - TPCH_QUERY_END_ID, TPCH_QUERY_START_ID, TPCH_TABLES, get_query_sql, + TPCH_QUERY_END_ID, TPCH_QUERY_START_ID, TPCH_TABLES, get_query_sql_for_scale_factor, get_tbl_tpch_table_schema, get_tpch_table_schema, }; use crate::util::{BenchmarkRun, CommonOpt, QueryResult, print_memory_stats}; use arrow::record_batch::RecordBatch; use arrow::util::pretty::{self, pretty_format_batches}; +use datafusion::common::exec_err; use datafusion::datasource::file_format::FileFormat; use datafusion::datasource::file_format::csv::CsvFormat; use datafusion::datasource::file_format::parquet::ParquetFormat; @@ -71,6 +72,11 @@ pub struct RunOpt { #[arg(required = true, short = 'p', long = "path")] path: PathBuf, + /// TPC-H scale factor used for query substitutions such as Q11 FRACTION. + /// If omitted, the benchmark tries to infer it from paths like `.../tpch_sf10/...`. + #[arg(long)] + scale_factor: Option, + /// File format: `csv` or `parquet` #[arg(short = 'f', long = "format", default_value = "csv")] file_format: String, @@ -133,10 +139,11 @@ impl RunOpt { let ctx = SessionContext::new_with_config_rt(config, rt); // register tables self.register_tables(&ctx).await?; + let scale_factor = self.scale_factor()?; for query_id in query_range { benchmark_run.start_new_case(&format!("Query {query_id}")); - let query_run = self.benchmark_query(query_id, &ctx).await; + let query_run = self.benchmark_query(query_id, scale_factor, &ctx).await; match query_run { Ok(query_results) => { for iter in query_results { @@ -157,13 +164,14 @@ impl RunOpt { async fn benchmark_query( &self, query_id: usize, + scale_factor: f64, ctx: &SessionContext, ) -> Result> { let mut millis = vec![]; // run benchmark let mut query_results = vec![]; - let sql = &get_query_sql(query_id)?; + let sql = &get_query_sql_for_scale_factor(query_id, scale_factor)?; for i in 0..self.iterations() { let start = Instant::now(); @@ -346,6 +354,82 @@ impl RunOpt { .partitions .unwrap_or_else(get_available_parallelism) } + + fn scale_factor(&self) -> Result { + resolve_scale_factor(self.scale_factor, &self.path) + } +} + +fn resolve_scale_factor(scale_factor: Option, path: &Path) -> Result { + let scale_factor = scale_factor + .or_else(|| infer_scale_factor_from_path(path)) + .unwrap_or(1.0); + + if scale_factor.is_finite() && scale_factor > 0.0 { + Ok(scale_factor) + } else { + exec_err!( + "Invalid TPC-H scale factor {scale_factor}. Expected a positive finite value" + ) + } +} + +fn infer_scale_factor_from_path(path: &Path) -> Option { + path.iter().find_map(|component| { + component + .to_str()? + .strip_prefix("tpch_sf")? + .parse::() + .ok() + }) +} + +#[cfg(test)] +mod scale_factor_tests { + use std::path::Path; + + use super::{infer_scale_factor_from_path, resolve_scale_factor}; + use datafusion::error::Result; + + #[test] + fn uses_explicit_scale_factor_when_provided() -> Result<()> { + let scale_factor = + resolve_scale_factor(Some(30.0), Path::new("benchmarks/data/tpch_sf10"))?; + assert_eq!(scale_factor, 30.0); + Ok(()) + } + + #[test] + fn infers_scale_factor_from_standard_tpch_path() -> Result<()> { + let scale_factor = + resolve_scale_factor(None, Path::new("benchmarks/data/tpch_sf10"))?; + assert_eq!(scale_factor, 10.0); + assert_eq!( + infer_scale_factor_from_path(Path::new("benchmarks/data/tpch_sf0.1")), + Some(0.1) + ); + Ok(()) + } + + #[test] + fn defaults_to_sf1_when_path_has_no_scale_factor() -> Result<()> { + let scale_factor = resolve_scale_factor(None, Path::new("benchmarks/data"))?; + assert_eq!(scale_factor, 1.0); + Ok(()) + } + + #[test] + fn rejects_invalid_scale_factors() { + assert!(resolve_scale_factor(Some(0.0), Path::new("benchmarks/data")).is_err()); + assert!(resolve_scale_factor(Some(-1.0), Path::new("benchmarks/data")).is_err()); + assert!( + resolve_scale_factor(Some(f64::NAN), Path::new("benchmarks/data")).is_err() + ); + assert!( + resolve_scale_factor(Some(f64::INFINITY), Path::new("benchmarks/data")) + .is_err() + ); + } } #[cfg(test)] @@ -392,6 +476,7 @@ mod tests { query: Some(query), common, path: PathBuf::from(path.to_string()), + scale_factor: Some(1.0), file_format: "tbl".to_string(), mem_table: false, output_path: None, @@ -402,7 +487,7 @@ mod tests { hash_join_buffering_capacity: 0, }; opt.register_tables(&ctx).await?; - let queries = get_query_sql(query)?; + let queries = crate::tpch::get_query_sql(query)?; for query in queries { let plan = ctx.sql(&query).await?; let plan = plan.into_optimized_plan()?; @@ -432,6 +517,7 @@ mod tests { query: Some(query), common, path: PathBuf::from(path.to_string()), + scale_factor: Some(1.0), file_format: "tbl".to_string(), mem_table: false, output_path: None, @@ -442,7 +528,7 @@ mod tests { hash_join_buffering_capacity: 0, }; opt.register_tables(&ctx).await?; - let queries = get_query_sql(query)?; + let queries = crate::tpch::get_query_sql(query)?; for query in queries { let plan = ctx.sql(&query).await?; let plan = plan.create_physical_plan().await?;