|
15 | 15 | .. specific language governing permissions and limitations |
16 | 16 | .. under the License. |
17 | 17 |
|
18 | | -Distributing DataFusion work |
19 | | -============================ |
20 | | - |
21 | | -Splitting a DataFusion workload across multiple processes — for |
22 | | -throughput, isolation, or to use a worker pool — comes in a few |
23 | | -different shapes depending on what is being split. |
24 | | - |
25 | | -* **Expression-level distribution** ✅ *supported today*. The driver |
26 | | - builds a DataFusion :py:class:`~datafusion.Expr`, sends it to |
27 | | - worker processes, and each worker evaluates the expression against |
28 | | - its own slice of data. Suits embarrassingly-parallel workloads |
29 | | - where the driver decides up front how to partition. |
30 | | -* **Query-level distribution via datafusion-distributed** 🚧 *work in |
31 | | - progress upstream*. A single logical / physical plan is split into |
32 | | - stages and run across worker nodes. The driver writes one SQL or |
33 | | - DataFrame query; the runtime decides partitioning. |
34 | | -* **Query-level distribution via Apache Ballista** 🚧 *work in |
35 | | - progress upstream*. Similar query-level model, with a more |
36 | | - cluster-management-oriented runtime. |
37 | | - |
38 | | -Only the first option is ready for use from datafusion-python today. |
39 | | -The other two are documented below so the surrounding story is in |
40 | | -one place; integration details will land here as those projects |
41 | | -become usable from datafusion-python. |
| 18 | +Distributing work |
| 19 | +================= |
| 20 | + |
| 21 | +DataFusion supports splitting work across processes by shipping |
| 22 | +serialized expressions to workers: the driver builds an |
| 23 | +:py:class:`~datafusion.Expr`, and each worker evaluates it against its
| 24 | +own slice of data. This pattern suits embarrassingly-parallel |
| 25 | +workloads where the driver decides partitioning up front. |
| 26 | + |
| 27 | +Query-level distribution — where the runtime partitions a single |
| 28 | +logical or physical plan across worker nodes — is in progress |
| 29 | +upstream via `datafusion-distributed |
| 30 | +<https://github.com/apache/datafusion-distributed>`_ and `Apache |
| 31 | +Ballista <https://github.com/apache/datafusion-ballista>`_. Both |
| 32 | +have short sections at the end of this page; integration details |
| 33 | +will land as those projects become usable from datafusion-python. |
42 | 34 |
|
43 | 35 | Expression-level distribution |
44 | 36 | ----------------------------- |
@@ -123,116 +115,26 @@ What travels with the expression |
123 | 115 | raises an error. |
124 | 116 |
|
125 | 117 | Portability requirements for inline Python UDFs |
126 | | -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| 118 | +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
127 | 119 |
|
128 | 120 | Inline Python UDFs ride on `cloudpickle |
129 | 121 | <https://github.com/cloudpipe/cloudpickle>`_, which imposes two |
130 | 122 | requirements on the worker environment: |
131 | 123 |
|
132 | | -* **Matching Python minor version.** A cloudpickle payload serializes |
133 | | - Python bytecode, which is not stable across Python minor versions. A |
134 | | - UDF pickled on Python 3.12 cannot be reconstructed on a 3.11 or 3.13 |
135 | | - worker. The wire format stamps the sender's ``(major, minor)``; a |
136 | | - mismatch raises a clear error naming both versions rather than |
137 | | - failing obscurely deep inside ``cloudpickle.loads``. Align the Python |
138 | | - version on driver and workers. |
| 124 | +* **Matching Python minor version.** cloudpickle serializes Python |
| 125 | + bytecode, which is not stable across minor versions. A UDF pickled |
| 126 | + on 3.12 cannot be reconstructed on 3.11 or 3.13. The wire format |
| 127 | + stamps the sender's ``(major, minor)``; mismatches raise a clear |
| 128 | + error naming both versions. Align the Python version on driver and |
| 129 | + workers. |
139 | 130 | * **Imported modules must be importable on the worker.** cloudpickle |
140 | | - captures the UDF callable *by value* — bytecode and closure cells are |
141 | | - inlined, so locally-defined functions and lambdas travel whole. But |
142 | | - any name the callable resolves through ``import`` is captured *by |
143 | | - reference* (module path only). If a UDF body does |
144 | | - ``from mylib import transform`` and calls ``transform(...)``, the |
145 | | - worker reconstructs the reference by importing ``mylib`` — which must |
146 | | - therefore be installed on the worker. The same applies to bound |
147 | | - methods of imported classes. Self-contained UDFs (no imports beyond |
148 | | - what the worker already has, e.g. ``pyarrow``) avoid this entirely. |
149 | | - |
150 | | -Session contexts at a glance |
151 | | -~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
152 | | - |
153 | | -There is only one type — :py:class:`SessionContext`. It can occupy |
154 | | -up to four *slots* in a running program: |
155 | | - |
156 | | -.. list-table:: |
157 | | - :header-rows: 1 |
158 | | - :widths: 12 18 40 30 |
159 | | - |
160 | | - * - Slot |
161 | | - - Lifetime |
162 | | - - Purpose |
163 | | - - Set how |
164 | | - * - User-held |
165 | | - - Local variable / attribute |
166 | | - - Build and run queries |
167 | | - - ``ctx = SessionContext(...)`` |
168 | | - * - Global |
169 | | - - Process singleton (lazy-init) |
170 | | - - Backs module-level |
171 | | - :py:func:`~datafusion.io.read_parquet`, |
172 | | - :py:func:`~datafusion.io.read_csv`, |
173 | | - :py:func:`~datafusion.io.read_json`, |
174 | | - :py:func:`~datafusion.io.read_avro`; final fallback for |
175 | | - :py:meth:`Expr.from_bytes` |
176 | | - - Implicit; access via |
177 | | - :py:meth:`SessionContext.global_ctx` |
178 | | - * - Sender |
179 | | - - Thread-local on the driver |
180 | | - - Codec settings for outbound :py:func:`pickle.dumps` / |
181 | | - :py:meth:`Expr.to_bytes` without ``ctx`` |
182 | | - - :py:func:`~datafusion.ipc.set_sender_ctx` |
183 | | - * - Worker |
184 | | - - Thread-local on the worker |
185 | | - - Function registry for inbound :py:func:`pickle.loads` / |
186 | | - :py:meth:`Expr.from_bytes` without ``ctx`` |
187 | | - - :py:func:`~datafusion.ipc.set_worker_ctx` |
188 | | - |
189 | | -The same :py:class:`SessionContext` object may occupy more than one |
190 | | -slot simultaneously — installing it into a slot is a reference, not |
191 | | -a copy. |
192 | | - |
193 | | -**Non-distributed user.** One user-held context. The global slot is |
194 | | -invisible unless you call top-level ``read_*`` helpers. Sender and |
195 | | -worker slots are unused. |
196 | | - |
197 | | -**Distributed user.** Two questions to answer: |
198 | | - |
199 | | -1. *Driver side — what wire format do I want?* The default (Python UDF |
200 | | - inlining on) is self-contained; you do not need a sender context. |
201 | | - To opt into the strict format, |
202 | | - :py:func:`~datafusion.ipc.set_sender_ctx` |
203 | | - with a session built via |
204 | | - :py:meth:`SessionContext.with_python_udf_inlining(enabled=False) |
205 | | - <datafusion.SessionContext.with_python_udf_inlining>`. |
206 | | - |
207 | | -2. *Worker side — what registrations does decode need?* For built-ins |
208 | | - and inline Python UDFs, nothing. For FFI-capsule UDFs (or |
209 | | - strict-mode round-trips that travel by name), call |
210 | | - :py:func:`~datafusion.ipc.set_worker_ctx` once per worker with a |
211 | | - context that has the relevant registrations. |
212 | | - |
213 | | -Resolution order on the worker side is *explicit argument → |
214 | | -worker context → global context.* Explicit ``ctx=`` on |
215 | | -:py:meth:`Expr.from_bytes` always wins; the sender slot is ignored |
216 | | -on decode and the worker slot is ignored on encode. |
217 | | - |
218 | | -Sharp edges: |
219 | | - |
220 | | -* Sender and worker slots are **thread-local**. Background threads |
221 | | - on either side see ``None`` until they install their own. |
222 | | -* Under the ``fork`` start method, the parent's ``threading.local()`` |
223 | | - values are copied into the child by copy-on-write — a forked |
224 | | - worker initially observes whatever sender / worker slot the parent |
225 | | - had set, until the worker writes its own value (or calls the |
226 | | - matching ``clear_*_ctx``). ``spawn`` and ``forkserver`` workers |
227 | | - start with empty thread-local slots. Treat the slot as |
228 | | - uninitialized on worker entry and install (or clear) it explicitly |
229 | | - in the worker initializer; do not rely on inherited state. |
230 | | -* The global slot persists across ``fork`` workers (copy-on-write |
231 | | - memory inherit) but not across ``spawn`` / ``forkserver`` workers |
232 | | - (fresh process — register or install a worker context on |
233 | | - start-up). |
234 | | -* The inlining toggle is per-context state, not a global switch. |
235 | | - Two contexts with different toggles can coexist in one process. |
| 131 | + captures the callable *by value* (bytecode and closure cells travel |
| 132 | + whole), but names resolved through ``import`` are captured *by |
| 133 | + reference* — module path only. A UDF doing |
| 134 | + ``from mylib import transform`` requires ``mylib`` installed on the |
| 135 | + worker. The same applies to bound methods of imported classes.
| 136 | + Self-contained UDFs (no imports beyond what the worker already has, |
| 137 | + e.g. ``pyarrow``) avoid this entirely. |
236 | 138 |
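As a rough illustration of what *by reference* means, the sketch below uses only the standard-library ``pickle`` module. It is not cloudpickle and not DataFusion's wire format; ``json.dumps`` simply stands in for the ``mylib.transform`` example above. Pickling an imported function records its module and name rather than its bytecode, so the receiving process has to be able to import that module.

```python
import json
import pickle

# Pickling an imported function stores a reference (module + name),
# not the function's bytecode.
payload = pickle.dumps(json.dumps)

# The payload is tiny and literally contains the lookup path.
payload_size = len(payload)
has_module_name = b"json" in payload
has_attr_name = b"dumps" in payload

# Loading re-imports the module and fetches the attribute, so the result
# is the very same object. On a machine without the module, this step
# would fail with an import error instead.
restored = pickle.loads(payload)
same_object = restored is json.dumps
```

cloudpickle applies the same by-reference treatment to names that come from importable modules, which is why a module used inside a UDF body needs to be installed on every worker.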
|
237 | 139 | Registering shared UDFs on workers |
238 | 140 | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
@@ -361,6 +263,75 @@ Security |
361 | 263 | functions and pre-registered Rust-side UDFs, and avoid |
362 | 264 | :py:func:`pickle.loads` on externally supplied bytes entirely. |
363 | 265 |
|
| 266 | +Reference: session context slots |
| 267 | +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| 268 | + |
| 269 | +There is only one type — :py:class:`SessionContext`. It can occupy |
| 270 | +up to four *slots* in a running program: |
| 271 | + |
| 272 | +.. list-table:: |
| 273 | + :header-rows: 1 |
| 274 | + :widths: 12 18 40 30 |
| 275 | + |
| 276 | + * - Slot |
| 277 | + - Lifetime |
| 278 | + - Purpose |
| 279 | + - Set how |
| 280 | + * - User-held |
| 281 | + - Local variable / attribute |
| 282 | + - Build and run queries |
| 283 | + - ``ctx = SessionContext(...)`` |
| 284 | + * - Global |
| 285 | + - Process singleton (lazy-init) |
| 286 | + - Backs module-level |
| 287 | + :py:func:`~datafusion.io.read_parquet`, |
| 288 | + :py:func:`~datafusion.io.read_csv`, |
| 289 | + :py:func:`~datafusion.io.read_json`, |
| 290 | + :py:func:`~datafusion.io.read_avro`; final fallback for |
| 291 | + :py:meth:`Expr.from_bytes` |
| 292 | + - Implicit; access via |
| 293 | + :py:meth:`SessionContext.global_ctx` |
| 294 | + * - Sender |
| 295 | + - Thread-local on the driver |
| 296 | + - Codec settings for outbound :py:func:`pickle.dumps` / |
| 297 | + :py:meth:`Expr.to_bytes` without ``ctx`` |
| 298 | + - :py:func:`~datafusion.ipc.set_sender_ctx` |
| 299 | + * - Worker |
| 300 | + - Thread-local on the worker |
| 301 | + - Function registry for inbound :py:func:`pickle.loads` / |
| 302 | + :py:meth:`Expr.from_bytes` without ``ctx`` |
| 303 | + - :py:func:`~datafusion.ipc.set_worker_ctx` |
| 304 | + |
| 305 | +The same :py:class:`SessionContext` object may occupy more than one |
| 306 | +slot simultaneously — installing it into a slot is a reference, not |
| 307 | +a copy. A non-distributed program only ever uses the user-held slot; |
| 308 | +the global slot is invisible unless you call top-level ``read_*`` |
| 309 | +helpers. |
| 310 | + |
| 311 | +Resolution order on the worker side is *explicit argument → |
| 312 | +worker context → global context.* Explicit ``ctx=`` on |
| 313 | +:py:meth:`Expr.from_bytes` always wins; the sender slot is ignored |
| 314 | +on decode and the worker slot is ignored on encode. |
| 315 | + |
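The resolution order can be pictured with a small stand-alone sketch. ``resolve_ctx`` and its parameters are hypothetical names introduced here for illustration; they are not part of datafusion-python, and the library's internal lookup may be structured differently.

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def resolve_ctx(
    explicit: Optional[T],
    worker: Optional[T],
    get_global: Callable[[], T],
) -> T:
    """Hypothetical helper: explicit argument, then worker slot, then global."""
    if explicit is not None:
        return explicit  # an explicit ctx= always wins
    if worker is not None:
        return worker  # thread-local worker slot, if one was installed
    return get_global()  # lazily initialised process-wide fallback


# Strings stand in for SessionContext objects.
picked_explicit = resolve_ctx("explicit", "worker", lambda: "global")
picked_worker = resolve_ctx(None, "worker", lambda: "global")
picked_global = resolve_ctx(None, None, lambda: "global")
```

Taking the global context as a callable mirrors its lazy initialisation: it is only touched when neither of the other two sources is set.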
| 316 | +Sharp edges: |
| 317 | + |
| 318 | +* Sender and worker slots are **thread-local**. Background threads |
| 319 | + on either side see ``None`` until they install their own. |
| 320 | +* Under the ``fork`` start method, the parent's ``threading.local()`` |
| 321 | + values are copied into the child by copy-on-write — a forked |
| 322 | + worker initially observes whatever sender / worker slot the parent |
| 323 | + had set, until the worker writes its own value (or calls the |
| 324 | + matching ``clear_*_ctx``). ``spawn`` and ``forkserver`` workers |
| 325 | + start with empty thread-local slots. Treat the slot as |
| 326 | + uninitialized on worker entry and install (or clear) it explicitly |
| 327 | + in the worker initializer; do not rely on inherited state. |
| 328 | +* The global slot persists across ``fork`` workers (the child
| 329 | + inherits the parent's memory copy-on-write) but not across
| 330 | + ``spawn`` / ``forkserver`` workers (fresh process; register or
| 331 | + install a worker context on start-up).
| 332 | +* The inlining toggle is per-context state, not a global switch. |
| 333 | + Two contexts with different toggles can coexist in one process. |
| 334 | + |
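The first sharp edge is ordinary ``threading.local()`` behaviour and can be reproduced without DataFusion. In the sketch below, ``set_slot`` and ``get_slot`` are hypothetical stand-ins for the sender / worker slot accessors, and strings stand in for contexts; how the library actually stores its slots is an assumption.

```python
import threading

# Stand-in for a thread-local slot (assumed storage, for illustration only).
_slot = threading.local()


def set_slot(ctx):
    _slot.ctx = ctx


def get_slot():
    # A thread that never installed a value sees None.
    return getattr(_slot, "ctx", None)


set_slot("main-ctx")  # installed on the main thread only

seen = []


def background():
    seen.append(get_slot())  # nothing installed on this thread yet
    set_slot("background-ctx")
    seen.append(get_slot())  # now this thread has its own value


t = threading.Thread(target=background)
t.start()
t.join()

main_value = get_slot()  # the main thread's value is untouched
```

In a thread pool, this means each worker thread needs to install its own slot value, for example in the pool's initializer, before decoding anything.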
364 | 335 | Query-level distribution via datafusion-distributed |
365 | 336 | --------------------------------------------------- |
366 | 337 |
|
|