Skip to content

Commit cfffddf

Browse files
timsaucerclaude
andcommitted
feat: expose SessionContext.write_csv, write_json, write_parquet
Adds three plan-level writers on SessionContext that mirror the upstream datafusion::execution::context API. Each takes an ExecutionPlan and an output directory path; the plan is executed and its results are written one partition per file inside that directory. These complement the existing DataFrame.write_* methods, which are the right choice when callers need finer control (CSV header, Parquet compression, write options). The new SessionContext methods are the right choice when a caller already holds a physical ExecutionPlan (for example after custom physical optimizer rules or hand-built plans) and just wants the rows materialized. Related to #462. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d021e6a commit cfffddf

2 files changed

Lines changed: 94 additions & 0 deletions

File tree

crates/core/src/context.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use datafusion::execution::options::{ArrowReadOptions, ReadOptions};
4444
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
4545
use datafusion::execution::session_state::SessionStateBuilder;
4646
use datafusion::execution::{FunctionRegistry, TaskContextProvider};
47+
use datafusion::physical_plan::ExecutionPlan;
4748
use datafusion::prelude::{
4849
AvroReadOptions, CsvReadOptions, DataFrame, JsonReadOptions, ParquetReadOptions,
4950
};
@@ -1410,6 +1411,42 @@ impl PySessionContext {
14101411
Ok(PyRecordBatchStream::new(stream))
14111412
}
14121413

1414+
/// Execute an `ExecutionPlan` and write the results to a partitioned CSV file at `path`.
1415+
pub fn write_csv(
1416+
&self,
1417+
plan: PyExecutionPlan,
1418+
path: &str,
1419+
py: Python,
1420+
) -> PyDataFusionResult<()> {
1421+
let plan: Arc<dyn ExecutionPlan> = plan.into();
1422+
wait_for_future(py, self.ctx.write_csv(plan, path))??;
1423+
Ok(())
1424+
}
1425+
1426+
/// Execute an `ExecutionPlan` and write the results to a partitioned newline-delimited JSON file at `path`.
1427+
pub fn write_json(
1428+
&self,
1429+
plan: PyExecutionPlan,
1430+
path: &str,
1431+
py: Python,
1432+
) -> PyDataFusionResult<()> {
1433+
let plan: Arc<dyn ExecutionPlan> = plan.into();
1434+
wait_for_future(py, self.ctx.write_json(plan, path))??;
1435+
Ok(())
1436+
}
1437+
1438+
/// Execute an `ExecutionPlan` and write the results to a partitioned Parquet file at `path`.
1439+
pub fn write_parquet(
1440+
&self,
1441+
plan: PyExecutionPlan,
1442+
path: &str,
1443+
py: Python,
1444+
) -> PyDataFusionResult<()> {
1445+
let plan: Arc<dyn ExecutionPlan> = plan.into();
1446+
wait_for_future(py, self.ctx.write_parquet(plan, path, None))??;
1447+
Ok(())
1448+
}
1449+
14131450
pub fn __datafusion_task_context_provider__<'py>(
14141451
&self,
14151452
py: Python<'py>,

python/datafusion/context.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1902,6 +1902,63 @@ def execute(self, plan: ExecutionPlan, partitions: int) -> RecordBatchStream:
19021902
"""Execute the ``plan`` and return the results."""
19031903
return RecordBatchStream(self.ctx.execute(plan._raw_plan, partitions))
19041904

1905+
def write_csv(self, plan: ExecutionPlan, path: str | pathlib.Path) -> None:
1906+
"""Execute ``plan`` and write the results to a partitioned CSV file.
1907+
1908+
``path`` is treated as a directory; one file per partition is written
1909+
inside it. For per-DataFrame writes with options (header control,
1910+
write options), use :py:meth:`DataFrame.write_csv` instead.
1911+
1912+
Examples:
1913+
>>> import tempfile, pathlib
1914+
>>> ctx = dfn.SessionContext()
1915+
>>> df = ctx.from_pydict({"a": [1, 2, 3]})
1916+
>>> with tempfile.TemporaryDirectory() as tmp:
1917+
... out = pathlib.Path(tmp) / "out"
1918+
... ctx.write_csv(df.execution_plan(), str(out))
1919+
... sorted(p.suffix for p in out.iterdir())
1920+
['.csv']
1921+
"""
1922+
self.ctx.write_csv(plan._raw_plan, str(path))
1923+
1924+
def write_json(self, plan: ExecutionPlan, path: str | pathlib.Path) -> None:
1925+
"""Execute ``plan`` and write the results to a partitioned NDJSON file.
1926+
1927+
``path`` is treated as a directory; one newline-delimited JSON file
1928+
per partition is written inside it. For per-DataFrame writes with
1929+
options, use :py:meth:`DataFrame.write_json` instead.
1930+
1931+
Examples:
1932+
>>> import tempfile, pathlib
1933+
>>> ctx = dfn.SessionContext()
1934+
>>> df = ctx.from_pydict({"a": [1, 2, 3]})
1935+
>>> with tempfile.TemporaryDirectory() as tmp:
1936+
... out = pathlib.Path(tmp) / "out"
1937+
... ctx.write_json(df.execution_plan(), str(out))
1938+
... sorted(p.suffix for p in out.iterdir())
1939+
['.json']
1940+
"""
1941+
self.ctx.write_json(plan._raw_plan, str(path))
1942+
1943+
def write_parquet(self, plan: ExecutionPlan, path: str | pathlib.Path) -> None:
1944+
"""Execute ``plan`` and write the results to a partitioned Parquet file.
1945+
1946+
``path`` is treated as a directory; one Parquet file per partition is
1947+
written inside it. For per-DataFrame writes with compression and
1948+
writer options, use :py:meth:`DataFrame.write_parquet` instead.
1949+
1950+
Examples:
1951+
>>> import tempfile, pathlib
1952+
>>> ctx = dfn.SessionContext()
1953+
>>> df = ctx.from_pydict({"a": [1, 2, 3]})
1954+
>>> with tempfile.TemporaryDirectory() as tmp:
1955+
... out = pathlib.Path(tmp) / "out"
1956+
... ctx.write_parquet(df.execution_plan(), str(out))
1957+
... sorted(p.suffix for p in out.iterdir())
1958+
['.parquet']
1959+
"""
1960+
self.ctx.write_parquet(plan._raw_plan, str(path))
1961+
19051962
@staticmethod
19061963
def _convert_file_sort_order(
19071964
file_sort_order: Sequence[Sequence[SortKey]] | None,

0 commit comments

Comments
 (0)