Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
"interaction": {
"drilldown": false
},
"integrations": {
"posthog": {
"apiKey": "phc_neTVVD8S3p76S25BXYXYf7TadgWK6ri4pg8sLqb4TVhJ"
}
},
"navigation": {
"tabs": [
{
Expand Down
13 changes: 9 additions & 4 deletions guides/workflows/multi-language.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ class ScheduleImageCapture(Task):

def execute(self, context: ExecutionContext) -> None:
# Here you can implement your task logic, submit subtasks, etc.
print(f"Image captured for {self.location} with {self.resolution_m}m resolution and bands {self.spectral_bands}")
context.logger.info(
"Image captured",
location=self.location,
resolution_m=self.resolution_m,
spectral_bands=self.spectral_bands,
)

@staticmethod
def identifier() -> tuple[str, str]:
Expand Down Expand Up @@ -197,10 +202,10 @@ Submit a job to the Go server.
curl -g "http://localhost:8080/submit?lat=40.75&lon=-73.98&resolution=30&bands[]=489.0,560.6,666.5"
```

Check the Python runner output, it should print the following line:
Check the Python runner logs for the following entry:

```plaintext Output
Image captured for [40.75, -73.98] with 30m resolution and bands [489, 560.6, 666.5]
```plaintext Logs
Image captured location=[40.75, -73.98] resolution_m=30 spectral_bands=[489, 560.6, 666.5]
```

## Next Steps
Expand Down
14 changes: 7 additions & 7 deletions workflows/caches.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,13 @@ The following snippet illustrates storing and retrieving data from the cache.
class ConsumerTask(Task):
def execute(self, context: ExecutionContext) -> None:
data = context.job_cache["data"]
print(f"Read {data} from cache")
context.logger.info("Read data from cache", data=data.decode())
```
</CodeGroup>

In this example, data stored under the key `"data"` can be any size that fits the cache backend constraints. Ensure the key remains unique within the job's scope to avoid conflicts.

To test the workflow, you can start a local task runner using the `InMemoryCache` backend. Then, submit a job to execute the `ProducerTask` and observe the output of the `ConsumerTask`.
To test the workflow, you can start a local task runner using the `InMemoryCache` backend. Then, submit a job to execute the `ProducerTask` and inspect the logs emitted by the `ConsumerTask`.

<CodeGroup>
```python Python
Expand All @@ -185,8 +185,8 @@ To test the workflow, you can start a local task runner using the `InMemoryCache
```
</CodeGroup>

```plaintext Output
Read b'my_binary_data_to_store' from cache
```plaintext Logs
Read data from cache data=my_binary_data_to_store
```

## Groups and Hierarchical Keys
Expand Down Expand Up @@ -244,7 +244,7 @@ class PrintSum(Task):
number = group[key] # read data from cache
numbers.append(int(number.decode())) # convert bytes back to int

print(f"Sum of all numbers: {sum(numbers)}")
context.logger.info("Computed sum of all numbers", total=sum(numbers))
```
</CodeGroup>

Expand All @@ -265,6 +265,6 @@ Submitting a job of the `CacheGroupDemo` and running it with a task runner can b
```
</CodeGroup>

```plaintext Output
Sum of all numbers: 284
```plaintext Logs
Computed sum of all numbers total=284
```
40 changes: 26 additions & 14 deletions workflows/concepts/jobs.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -414,10 +414,16 @@ class PrintMovieStats(Task):
def execute(self, context: ExecutionContext) -> None:
params = {"t": self.title, "apikey": "<OMDB API Key>"}
url = "http://www.omdbapi.com/?" + urlencode(params)
response = httpx.get(url).json()
with context.tracer.span("fetch-movie-stats") as span:
span.set_attribute("movie.title", self.title)
response = httpx.get(url).json()
# set the display name of the task to the title of the movie:
context.current_task.display = response["Title"]
print(f"{response['Title']} was released on {response['Released']}")
context.logger.info(
"Movie release date fetched",
title=response["Title"],
released=response["Released"],
)
```
```go Go
package movie
Expand Down Expand Up @@ -521,11 +527,11 @@ job, err := client.Jobs.Submit(ctx, "movies-stats",

One of the `PrintMovieStats` tasks fails with a `KeyError`. This error occurs when a movie title is not found by the [OMDb API](http://www.omdbapi.com/), leading to a response without the `Title` and `Released` fields.

Console output from the task runners confirms this:
Task logs from the runners confirm this:

```plaintext Output
The Matrix was released on 31 Mar 1999
Shrek 2 was released on 19 May 2004
```plaintext Logs
Movie release date fetched title="The Matrix" released="31 Mar 1999"
Movie release date fetched title="Shrek 2" released="19 May 2004"
ERROR: Task PrintMovieStats failed with exception: KeyError('Title')
```

Expand All @@ -539,13 +545,19 @@ class PrintMovieStats(Task):
def execute(self, context: ExecutionContext) -> None:
params = {"t": self.title, "apikey": "<OMDB API Key>"}
url = "http://www.omdbapi.com/?" + urlencode(params)
response = httpx.get(url).json()
with context.tracer.span("fetch-movie-stats") as span:
span.set_attribute("movie.title", self.title)
response = httpx.get(url).json()
if "Title" in response and "Released" in response:
context.current_task.display = response["Title"]
print(f"{response['Title']} was released on {response['Released']}")
context.logger.info(
"Movie release date fetched",
title=response["Title"],
released=response["Released"],
)
else:
context.current_task.display = f"NotFound: {self.title}"
print(f"Could not find the release date for {self.title}")
context.logger.info("Movie release date not found", title=self.title)
```
```go Go
type PrintMovieStats struct {
Expand Down Expand Up @@ -607,15 +619,15 @@ _, err := client.Jobs.Retry(ctx, job.ID)
<img src="/assets/workflows/diagrams/svg/movies-retried.dark.svg" alt="Job retried successfully" className="hidden dark:block" />
</Frame>

Now the console output shows:
Now the task logs show:

```plaintext Output
Could not find the release date for Tilebox - The Movie
The Avengers was released on 04 May 2012
```plaintext Logs
Movie release date not found title="Tilebox - The Movie"
Movie release date fetched title="The Avengers" released="04 May 2012"
```

<Note>
The output confirms that only two tasks were executed, resuming from the point of failure instead of re-executing all tasks.
The logs confirm that only two tasks were executed, resuming from the point of failure instead of re-executing all tasks.
</Note>

The job was retried and succeeded. The two tasks that completed before the failure were not re-executed.
74 changes: 43 additions & 31 deletions workflows/concepts/tasks.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ For python, the key components of this task are:
The `execute` method is the entry point for executing the task. This is where the task's logic is defined. It's invoked by a [task runner](/workflows/concepts/task-runners) when the task runs and performs the task's operation.
</Accordion>
<Accordion title="context: ExecutionContext">
The `context` argument is an `ExecutionContext` instance that provides access to an [API for submitting new tasks](/api-reference/python/tilebox.workflows/ExecutionContext.submit_subtask) as part of the same job and features like [shared caching](/api-reference/python/tilebox.workflows/ExecutionContext.job_cache).
The `context` argument is an `ExecutionContext` instance that provides access to an [API for submitting new tasks](/api-reference/python/tilebox.workflows/ExecutionContext.submit_subtask) as part of the same job, [task logging](/api-reference/python/tilebox.workflows/ExecutionContext.logger), [custom tracing](/api-reference/python/tilebox.workflows/ExecutionContext.tracer), and features like [shared caching](/api-reference/python/tilebox.workflows/ExecutionContext.job_cache).
</Accordion>
</AccordionGroup>

Expand Down Expand Up @@ -129,7 +129,7 @@ class ChildTask(Task):
index: int

def execute(self, context: ExecutionContext) -> None:
print(f"Executing ChildTask {self.index}")
context.logger.info("Executing child task", index=self.index)

# after submitting this task, a task runner may pick it up and execute it
# which will result in 5 ChildTasks being submitted and executed as well
Expand Down Expand Up @@ -425,7 +425,7 @@ class Sum(Task): # The reduce step
for key in squares:
result += int(squares[key].decode())

print("Sum of squares is:", result)
context.logger.info("Computed sum of squares", result=result)
```
</CodeGroup>

Expand All @@ -449,8 +449,8 @@ jobs.display(job)
```
</CodeGroup>

```plaintext Output
Sum of squares is: 336448
```plaintext Logs
Computed sum of squares result=336448
```

<Frame>
Expand Down Expand Up @@ -479,7 +479,7 @@ For example, the `RecursiveTask` below is a valid task that submits smaller inst
num: int

def execute(self, context: ExecutionContext) -> None:
print(f"Executing RecursiveTask with num={self.num}")
context.logger.info("Executing recursive task", num=self.num)
# if num < 2, we reached the base case and stop recursion
if self.num >= 2:
context.submit_subtask(RecursiveTask(self.num // 2))
Expand Down Expand Up @@ -613,7 +613,7 @@ class RootTask(Task):

class FlakyTask(Task):
def execute(self, context: ExecutionContext) -> None:
print(f"Executing FlakyTask")
context.logger.info("Executing flaky task")

if random.random() < 0.1:
raise Exception("FlakyTask failed randomly")
Expand Down Expand Up @@ -683,7 +683,7 @@ class PrintTask(Task):
message: str

def execute(self, context: ExecutionContext) -> None:
print(self.message)
context.logger.info("Print task executed", message=self.message)
```
```go Go
type RootTask struct{}
Expand Down Expand Up @@ -761,23 +761,35 @@ A practical example is a workflow that fetches news articles from an API and pro

def execute(self, context: ExecutionContext) -> None:
url = f"https://newsapi.org/v2/top-headlines?category={self.category}&pageSize={self.max_articles}&country=us&apiKey=API_KEY"
news = httpx.get(url).json()
with context.tracer.span("fetch-news") as span:
span.set_attribute("category", self.category)
span.set_attribute("max_articles", self.max_articles)
news = httpx.get(url).json()
# check out our documentation page on caches to learn
# about a better way of passing data between tasks
Path("news.json").write_text(json.dumps(news))
context.logger.info(
"Fetched news articles",
category=self.category,
article_count=len(news["articles"]),
)

class PrintHeadlines(Task):
def execute(self, context: ExecutionContext) -> None:
news = json.loads(Path("news.json").read_text())
for article in news["articles"]:
print(f"{article['publishedAt'][:10]}: {article['title']}")
context.logger.info(
"News headline",
published_at=article["publishedAt"][:10],
title=article["title"],
)

class MostFrequentAuthors(Task):
def execute(self, context: ExecutionContext) -> None:
news = json.loads(Path("news.json").read_text())
authors = [article["author"] for article in news["articles"]]
for author, count in Counter(authors).most_common():
print(f"Author {author} has written {count} articles")
context.logger.info("Author article count", author=author, count=count)

# now submit a job, and then visualize it
job = job_client.submit("process-news",
Expand Down Expand Up @@ -935,17 +947,17 @@ job, err := client.Jobs.Submit(ctx, "process-news",
```
</CodeGroup>

```plaintext Output
2024-02-15: NASA selects ultraviolet astronomy mission but delays its launch two years - SpaceNews
2024-02-15: SpaceX launches Space Force mission from Cape Canaveral - Orlando Sentinel
2024-02-14: Saturn's largest moon most likely uninhabitable - Phys.org
2024-02-14: AI Unveils Mysteries of Unknown Proteins' Functions - Neuroscience News
2024-02-14: Anthropologists' research unveils early stone plaza in the Andes - Phys.org
Author Jeff Foust has written 1 articles
Author Richard Tribou has written 1 articles
Author Jeff Renaud has written 1 articles
Author Neuroscience News has written 1 articles
Author Science X has written 1 articles
```plaintext Logs
News headline published_at=2024-02-15 title="NASA selects ultraviolet astronomy mission but delays its launch two years - SpaceNews"
News headline published_at=2024-02-15 title="SpaceX launches Space Force mission from Cape Canaveral - Orlando Sentinel"
News headline published_at=2024-02-14 title="Saturn's largest moon most likely uninhabitable - Phys.org"
News headline published_at=2024-02-14 title="AI Unveils Mysteries of Unknown Proteins' Functions - Neuroscience News"
News headline published_at=2024-02-14 title="Anthropologists' research unveils early stone plaza in the Andes - Phys.org"
Author article count author="Jeff Foust" count=1
Author article count author="Richard Tribou" count=1
Author article count author="Jeff Renaud" count=1
Author article count author="Neuroscience News" count=1
Author article count author="Science X" count=1
```

<Frame>
Expand All @@ -963,8 +975,8 @@ This workflow consists of four tasks:
| ------------------- | ------------ | -------------------------------------------------------------------------------------------------------------------------- |
| NewsWorkflow | - | The root task of the workflow. It spawns the other tasks and sets up the dependencies between them. |
| FetchNews | - | A task that fetches news articles from the API and writes the results to a file, which is then read by dependent tasks. |
| PrintHeadlines | FetchNews | A task that prints the headlines of the news articles to the console. |
| MostFrequentAuthors | FetchNews | A task that counts the number of articles each author has written and prints the result to the console. |
| PrintHeadlines | FetchNews | A task that logs the headlines of the news articles. |
| MostFrequentAuthors | FetchNews | A task that counts the number of articles each author has written and logs the result. |

An important aspect is that there is no dependency between the `PrintHeadlines` and `MostFrequentAuthors` tasks. This means they can execute in parallel, which the Tilebox Workflow Orchestrator will do, provided multiple task runners are available.

Expand Down Expand Up @@ -1017,17 +1029,17 @@ class RootTask(Task):
class RequiredTask(Task):
def execute(self, context: ExecutionContext) -> None:
# this task must succeed; if it fails, the job stops
print("This task is required to succeed, otherwise the job would stop")
context.logger.info("Required task completed")

class FlakyTask(Task):
def execute(self, context: ExecutionContext) -> None:
# this task may fail, but the job will continue regardless
print("Attempting flaky operation...")
context.logger.info("Attempting flaky operation")

class FinalTask(Task):
def execute(self, context: ExecutionContext) -> None:
# this task runs even if FlakyTask failed
print("Running final step")
context.logger.info("Running final step")
```
```go Go
type RootTask struct{}
Expand Down Expand Up @@ -1111,23 +1123,23 @@ class Step1(Task):

class Step1A(Task):
def execute(self, context: ExecutionContext) -> None:
print("Step1A executed successfully")
context.logger.info("Step1A executed successfully")

class Step1B(Task):
def execute(self, context: ExecutionContext) -> None:
raise ValueError("something went wrong")

class Step1C(Task):
def execute(self, context: ExecutionContext) -> None:
print("This will be skipped because Step1B failed")
context.logger.info("This will be skipped because Step1B failed")

class Step2(Task):
def execute(self, context: ExecutionContext) -> None:
print("This will be skipped because Step1B failed")
context.logger.info("This will be skipped because Step1B failed")

class AlwaysRuns(Task):
def execute(self, context: ExecutionContext) -> None:
print("This runs regardless")
context.logger.info("This runs regardless")
```
```go Go
type Pipeline struct{}
Expand Down
Loading