Skip to content

Commit 91d04fd

Browse files
authored
overriding microbatch macro for capturing the compiled code of the microbatch models (#1000)
1 parent 907e181 commit 91d04fd

5 files changed

Lines changed: 207 additions & 5 deletions

File tree

integration_tests/dbt_project/dbt_project.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,6 @@ models:
2929
+schema: elementary
3030
+enabled: "{{ var('elementary_enabled', True) }}"
3131
+file_format: "{{ 'delta' if target.type in ['spark', 'fabricspark'] else none }}"
32+
33+
flags:
34+
require_batched_execution_for_custom_microbatch_strategy: True
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
from contextlib import contextmanager
2+
3+
import pytest
4+
5+
from dbt_project import DbtProject
6+
7+
8+
def _microbatch_source_model_sql() -> str:
9+
return """
10+
{{ config(event_time='order_date') }}
11+
{% set event_time_data_type = 'datetime2' if target.type == 'sqlserver' else 'timestamp' %}
12+
13+
select
14+
1 as order_id,
15+
1 as customer_id,
16+
42 as amount,
17+
cast('2024-01-01 00:00:00' as {{ event_time_data_type }}) as order_date
18+
union all
19+
select
20+
2 as order_id,
21+
2 as customer_id,
22+
84 as amount,
23+
cast('2025-01-01 00:00:00' as {{ event_time_data_type }}) as order_date
24+
"""
25+
26+
27+
def _microbatch_model_sql(source_model_name: str) -> str:
28+
return """
29+
{% set model_config = {
30+
"materialized": "incremental",
31+
"incremental_strategy": "microbatch",
32+
"event_time": "order_date",
33+
"batch_size": "year",
34+
"begin": "2024-01-01"
35+
} %}
36+
{% if target.type != "duckdb" %}
37+
{% do model_config.update({"unique_key": "order_id"}) %}
38+
{% endif %}
39+
{{ config(**model_config) }}
40+
41+
select
42+
order_id,
43+
customer_id,
44+
amount,
45+
order_date
46+
from {{ ref('__MICROBATCH_SOURCE_MODEL__') }}
47+
""".replace("__MICROBATCH_SOURCE_MODEL__", source_model_name)
48+
49+
50+
@contextmanager
def _with_microbatch_test_models(dbt_project: DbtProject, model_suffix: str):
    """Create a temporary source model + microbatch model pair in the project.

    Yields a tuple of (source model path, target model path, target model
    name), with the paths relative to the project directory. Both files are
    deleted on exit, whether or not the body raised. *model_suffix* keeps
    parallel parametrized runs from colliding on file names.
    """
    source_model_name = f"mb_src_{model_suffix}"
    target_model_name = f"mb_tgt_{model_suffix}"
    source_model_path = dbt_project.tmp_models_dir_path.joinpath(f"{source_model_name}.sql")
    target_model_path = dbt_project.tmp_models_dir_path.joinpath(f"{target_model_name}.sql")

    source_model_path.write_text(_microbatch_source_model_sql())
    target_model_path.write_text(_microbatch_model_sql(source_model_name))
    relative_source_model_path = source_model_path.relative_to(dbt_project.project_dir_path)
    relative_target_model_path = target_model_path.relative_to(dbt_project.project_dir_path)
    try:
        yield relative_source_model_path, relative_target_model_path, target_model_name
    finally:
        # Best-effort cleanup: remove whichever temp model files still exist.
        for model_path in (source_model_path, target_model_path):
            if model_path.exists():
                model_path.unlink()
68+
69+
70+
def _run_microbatch_model_and_get_latest_success_result(
    dbt_project: DbtProject, model_suffix: str
):
    """Run the temporary microbatch model pair and return its newest success row.

    Materializes the source + target models (see
    `_with_microbatch_test_models`), runs both via dbt, then queries the
    `dbt_run_results` table for the most recent successful result of the
    target model. Returns at most one row.
    """
    with _with_microbatch_test_models(dbt_project, model_suffix) as (
        source_model_path,
        model_path,
        target_model_name,
    ):
        dbt_project.dbt_runner.run(select=f"{source_model_path} {model_path}")

        # NOTE(review): the project name is assumed to be `elementary_tests`
        # when building the node's unique_id — confirm against dbt_project.yml.
        unique_id = f"model.elementary_tests.{target_model_name}"
        run_results = dbt_project.read_table(
            "dbt_run_results",
            where=f"unique_id = '{unique_id}' and status = 'success'",
            order_by="generated_at desc",
            limit=1,
        )
        return run_results
90+
91+
92+
@contextmanager
def _with_microbatch_macro_file(dbt_project: DbtProject, macro_name: str):
    """Write a project-level macro named *macro_name* that delegates to
    `elementary.get_incremental_microbatch_sql`, and delete it on exit.

    The macro is written to macros/microbatch.sql in the project directory.
    Raises FileExistsError if that file already exists, so a leftover file
    from an earlier run cannot be silently clobbered.
    """
    macro_path = dbt_project.project_dir_path / "macros" / "microbatch.sql"
    macro_sql = """
{% macro __MACRO_NAME__(arg_dict) %}
    {{ return(elementary.get_incremental_microbatch_sql(arg_dict)) }}
{% endmacro %}
""".replace("__MACRO_NAME__", macro_name)
    if macro_path.exists():
        raise FileExistsError(f"Expected no macro file at {macro_path}")

    macro_path.write_text(macro_sql)
    try:
        yield
    finally:
        if macro_path.exists():
            macro_path.unlink()
111+
112+
113+
@pytest.mark.skip_targets(["spark", "vertica", "bigquery", "athena", "clickhouse", "dremio"])
@pytest.mark.skip_for_dbt_fusion
@pytest.mark.parametrize(
    "macro_name,expected_compiled_code,model_suffix",
    [
        ("get_incremental_microbatch_sql", True, "with_override"),
        ("get_incremental_microbatch_sql_not_used", False, "without_override"),
    ],
    ids=["with_override", "without_override"],
)
def test_microbatch_run_results_compiled_code_behavior(
    dbt_project: DbtProject,
    macro_name: str,
    expected_compiled_code: bool,
    model_suffix: str,
):
    """compiled_code in dbt_run_results is populated only when the project
    overrides `get_incremental_microbatch_sql` to delegate to elementary.

    The "without_override" case installs a macro under a name dbt never
    dispatches to, so elementary's capture hook never runs and the column
    stays empty.
    """
    # run-results collection must be on for the dbt_run_results table to fill.
    dbt_project.dbt_runner.vars["disable_run_results"] = False

    with _with_microbatch_macro_file(dbt_project, macro_name):
        run_results = _run_microbatch_model_and_get_latest_success_result(
            dbt_project, model_suffix
        )
        assert run_results, "Expected a successful run result row for microbatch model"
        if expected_compiled_code:
            assert run_results[0]["compiled_code"], (
                "Expected compiled_code to be populated when override macro is present"
            )
        else:
            assert not run_results[0]["compiled_code"], (
                "Expected compiled_code to stay empty when override macro is absent"
            )
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
{#-
  NOTE FOR PACKAGE CONSUMERS:
  This package macro is not guaranteed to be picked up automatically by dbt's
  incremental strategy resolution in all projects.
  To apply this behavior, users should:
  1) Override `get_incremental_microbatch_sql` in their own project and delegate to
     `elementary.get_incremental_microbatch_sql(arg_dict)`.
  2) Enable the dbt behavior flag `require_batched_execution_for_custom_microbatch_strategy`.

  This flow is currently not supported for these adapters:
  - spark
  - bigquery
  - athena
  - clickhouse
  - dremio
  - vertica

  This flow is currently not supported for dbt Fusion.
-#}
20+
{% macro get_incremental_microbatch_sql(arg_dict) %}
    {#- Record the model's compiled code in elementary's cache before handing
        off to dbt's builtin microbatch strategy via adapter dispatch. The
        `execute` / `model is defined` guard skips parse-time evaluation. -#}
    {% if execute and model is defined %}
        {% do elementary.capture_microbatch_compiled_code_for_model() %}
    {% endif %}

    {{ return(adapter.dispatch("get_incremental_microbatch_sql", "dbt")(arg_dict)) }}
{% endmacro %}
27+
28+
29+
{% macro capture_microbatch_compiled_code_for_model() %}
    {#- Cache the current model's compiled code under its unique_id in the
        "microbatch_compiled_code_by_unique_id" graph cache, first writer
        wins. No-ops (returns none) when the unique_id or compiled code is
        unavailable, or when an entry for this model already exists. The
        `model` context variable may be a mapping or a node object, so both
        access styles are handled. -#}
    {% set model_unique_id = (
        model.get("unique_id") if model is mapping else model.unique_id
    ) | default(none, true) %}
    {% set model_compiled_code = (
        model.get("compiled_code") if model is mapping else model.compiled_code
    ) | default(none, true) %}
    {% if model_unique_id is none %}
        {{ return(none) }}
    {% endif %}
    {% if not model_compiled_code %}
        {{ return(none) }}
    {% endif %}

    {% set compiled_code_by_unique_id = elementary.get_cache(
        "microbatch_compiled_code_by_unique_id"
    ) %}
    {% if model_unique_id in compiled_code_by_unique_id %}
        {{ return(none) }}
    {% endif %}
    {% do compiled_code_by_unique_id.update({model_unique_id: model_compiled_code}) %}
{% endmacro %}

macros/edr/tests/on_run_start/init_elementary_graph.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
},
1212
"temp_test_table_relations_map": {},
1313
"duration_context_stack": {},
14+
"microbatch_compiled_code_by_unique_id": {},
1415
},
1516
) %}
1617
{% endmacro %}

macros/utils/graph/get_compiled_code.sql

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
{% macro get_compiled_code(node, as_column_value=false) %}
2-
{% set compiled_code = adapter.dispatch("get_compiled_code", "elementary")(node) %}
2+
{% set compiled_code = node.get("compiled_code") or node.get("compiled_sql") %}
3+
{% if not compiled_code and node and node.get("unique_id") %}
4+
{% set compiled_code = elementary.get_cache(
5+
"microbatch_compiled_code_by_unique_id", {}
6+
).get(node.get("unique_id")) %}
7+
{% endif %}
8+
{% set compiled_code = adapter.dispatch("format_compiled_code", "elementary")(compiled_code) %}
39

410
{% set max_column_size = elementary.get_column_size() %}
511
{% if as_column_value and max_column_size and compiled_code and compiled_code | length > max_column_size %}
@@ -9,12 +15,11 @@
915
{% do return(compiled_code) %}
1016
{% endmacro %}
1117

12-
{% macro default__get_compiled_code(node) %}
13-
{% do return(node.get("compiled_code") or node.get("compiled_sql")) %}
18+
{% macro default__format_compiled_code(compiled_code) %}
19+
{% do return(compiled_code) %}
1420
{% endmacro %}
1521

16-
{% macro redshift__get_compiled_code(node) %}
17-
{% set compiled_code = node.get("compiled_code") or node.get("compiled_sql") %}
22+
{% macro redshift__format_compiled_code(compiled_code) %}
1823
{% if not compiled_code %} {% do return(none) %}
1924
{% else %} {% do return(compiled_code.replace("%", "%%")) %}
2025
{% endif %}

0 commit comments

Comments
 (0)