Extract Cloud Logging labels from AF3 log path when task_instance is missing in supervisor context #68246
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide
# Parse labels from the structured log path as additional fallback.
path_labels = _labels_from_path(str(relative))
labels.update(path_labels)
It overrides event-derived labels rather than acting as a "fallback"
Fixed. Path labels now only fill in missing keys rather than calling update() on the whole dict.
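For readers following the thread, the difference between overriding and filling in can be sketched as below. This is only an illustration, not the code from the PR: the helper name `merge_fallback_labels` is hypothetical, and it assumes "missing" means the key is absent from the dict (a key present with an empty value would be left alone).

```python
def merge_fallback_labels(
    labels: dict[str, str], path_labels: dict[str, str]
) -> dict[str, str]:
    """Return a copy of labels with path_labels filling only absent keys.

    Hypothetical helper for illustration; unlike dict.update(), values that
    are already present in labels are never overwritten.
    """
    merged = dict(labels)
    for key, value in path_labels.items():
        # setdefault only writes when the key is not already present.
        merged.setdefault(key, value)
    return merged


event_labels = {"dag_id": "from_event"}
path_labels = {"dag_id": "from_path", "task_id": "from_path"}
merged = merge_fallback_labels(event_labels, path_labels)
```

With `labels.update(path_labels)` the event-derived `dag_id` would have been replaced; here it is kept and only `task_id` is added.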
return "\n".join(messages), page.next_page_token


def _labels_from_path(relative_path: str) -> dict[str, str]:
should be renamed to _extract_labels_from_path
Renamed to _extract_labels_from_path as requested.
elif key == "run_id":
    # run_id is NOT a standard Stackdriver label yet, but it is used
    # on the write side via the log path template. Store it so the
    # read path can filter on it (Bug 2 will wire this up).
    pass
Currently it seems like a no-op, so I'd rather drop this branch
Removed the no-op pass branch. run_id is now stored in the labels dict like the other keys.
In Airflow 3 the supervisor process runs REMOTE_TASK_LOG handlers, but record.task_instance is never set in supervisor context (it is a task-subprocess concept). When task_instance is missing the proc() closure shipped log entries with empty labels, making Cloud Logging entries unsearchable by dag_id / task_id.

Parse dag_id, task_id, and try_number from the structured AF3 log path (dag_id=<x>/run_id=<x>/task_id=<x>/attempt=<N>.log) instead. This requires zero DB access and works regardless of whether the handler runs in a task subprocess or the supervisor.

relates to apache#68240
- Rename _labels_from_path to _extract_labels_from_path
- Use path labels as fallback instead of overriding event-derived labels
- Actually store run_id instead of no-op pass branch
Ran into this while debugging a Cloud Logging setup on GKE — log entries were landing in Stackdriver with empty labels, making it impossible to filter by dag_id or task_id.
Turns out the StackdriverRemoteLogIO.processors proc() closure reads record.task_instance to populate labels, but in AF3's supervisor model the REMOTE_TASK_LOG handler runs in the supervisor process, where that attribute is never set. So every log entry from the supervisor just gets empty labels.

This grabs dag_id, task_id, and try_number from the log path instead. AF3's log path template is dag_id=<x>/run_id=<x>/task_id=<x>/attempt=<N>.log, so all four fields are already in the path with zero DB access needed.

The fallback only kicks in when task_instance is genuinely missing, so the task-subprocess code path (where task_instance is available) is untouched.

Not sure if run_id should also be turned into a label here. I left it out for now since the existing label set doesn't include it and the read-side filtering (bug 2) will need its own fix anyway. Happy to add it if maintainers think it belongs.

relates to #68240
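To make the path-based fallback concrete, here is a minimal standalone sketch of the parsing described above. It is not the PR's implementation: the function name `extract_labels_sketch`, the `include_run_id` flag, and the choice to store the attempt number under a `try_number` key as a string are all assumptions made for illustration. It only assumes the path follows the dag_id=<x>/run_id=<x>/task_id=<x>/attempt=<N>.log template.

```python
import re

# Hypothetical patterns for the key=value segments of the AF3 log path.
_SEGMENT_RE = re.compile(r"^(dag_id|run_id|task_id)=(.+)$")
_ATTEMPT_RE = re.compile(r"^attempt=(\d+)\.log$")


def extract_labels_sketch(
    relative_path: str, include_run_id: bool = False
) -> dict[str, str]:
    """Parse label candidates out of a structured log path (illustrative only)."""
    labels: dict[str, str] = {}
    for segment in relative_path.split("/"):
        match = _SEGMENT_RE.match(segment)
        if match:
            key, value = match.group(1), match.group(2)
            # run_id is skipped unless explicitly requested (assumption:
            # whether it belongs in the label set is left to the caller).
            if key == "run_id" and not include_run_id:
                continue
            labels[key] = value
            continue
        match = _ATTEMPT_RE.match(segment)
        if match:
            # Assumption: the attempt number maps to a "try_number" label.
            labels["try_number"] = match.group(1)
    return labels


example_path = "dag_id=my_dag/run_id=manual_run/task_id=my_task/attempt=3.log"
parsed = extract_labels_sketch(example_path)
parsed_with_run = extract_labels_sketch(example_path, include_run_id=True)
```

A path that does not follow the template simply yields an empty dict, so the caller can keep whatever labels it already has.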