Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,35 @@
import io.dapr.durabletask.TaskFailedException;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowStub;
import io.dapr.workflows.WorkflowTaskOptions;
import io.dapr.workflows.WorkflowTaskRetryPolicy;

import java.util.List;
import java.util.ArrayList;
import java.util.Collections;
import java.time.Duration;

public class BookTripWorkflow implements Workflow {
@Override
public WorkflowStub create() {
return ctx -> {
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
List<String> compensations = new ArrayList<>();

// Define retry policy for compensation activities
WorkflowTaskRetryPolicy compensationRetryPolicy = WorkflowTaskRetryPolicy.newBuilder()
.setFirstRetryInterval(Duration.ofSeconds(1))
.setMaxNumberOfAttempts(3)
.build();

WorkflowTaskOptions compensationOptions = new WorkflowTaskOptions(compensationRetryPolicy);
CompensationHelper compensationHelper = new CompensationHelper();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@siri-varma this is weird, because each Workflow instance will create its own CompensationHelper() so the helper is not durable itself.


try {
// Book flight
String flightResult = ctx.callActivity(BookFlightActivity.class.getName(), null, String.class).await();
String flightResult = ctx.callActivity(
BookFlightActivity.class.getName(), null, String.class).await();
ctx.getLogger().info("Flight booking completed: {}", flightResult);
compensations.add("CancelFlight");
compensationHelper.addCompensation("CancelFlight", () ->
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@siri-varma one of the main ideas of having a helper is to be able to store the input payload for the activity, so the same input can be used to call the compensation. In this case you are setting the compensation after the activity is called, so the compensation will never be called if the BookFlightActivity crashes, right?

ctx.callActivity(CancelFlightActivity.class.getName(), null, String.class).await());

// Book hotel
String hotelResult = ctx.callActivity(BookHotelActivity.class.getName(), null, String.class).await();
String hotelResult = ctx.callActivity(
BookHotelActivity.class.getName(), null, String.class).await();
ctx.getLogger().info("Hotel booking completed: {}", hotelResult);
compensations.add("CancelHotel");
compensationHelper.addCompensation("CancelHotel", () ->
ctx.callActivity(CancelHotelActivity.class.getName(), null, String.class).await());

// Book car
String carResult = ctx.callActivity(BookCarActivity.class.getName(), null, String.class).await();
String carResult = ctx.callActivity(
BookCarActivity.class.getName(), null, String.class).await();
ctx.getLogger().info("Car booking completed: {}", carResult);
compensations.add("CancelCar");
compensationHelper.addCompensation("CancelCar", () ->
ctx.callActivity(CancelCarActivity.class.getName(), null, String.class).await());
Comment thread
siri-varma marked this conversation as resolved.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@siri-varma this example fails to show the use of the the helper to send the same input to the compensation activity.


String result = String.format("%s, %s, %s", flightResult, hotelResult, carResult);
ctx.getLogger().info("Trip booked successfully: {}", result);
Expand All @@ -62,44 +53,7 @@ public WorkflowStub create() {
} catch (TaskFailedException e) {
ctx.getLogger().info("******** executing compensation logic ********");
ctx.getLogger().error("Activity failed: {}", e.getMessage());

// Execute compensations in reverse order
Collections.reverse(compensations);
for (String compensation : compensations) {
try {
switch (compensation) {
case "CancelCar":
String carCancelResult = ctx.callActivity(
CancelCarActivity.class.getName(),
null,
compensationOptions,
String.class).await();
ctx.getLogger().info("Car cancellation completed: {}", carCancelResult);
break;

case "CancelHotel":
String hotelCancelResult = ctx.callActivity(
CancelHotelActivity.class.getName(),
null,
compensationOptions,
String.class).await();
ctx.getLogger().info("Hotel cancellation completed: {}", hotelCancelResult);
break;

case "CancelFlight":
String flightCancelResult = ctx.callActivity(
CancelFlightActivity.class.getName(),
null,
compensationOptions,
String.class).await();
ctx.getLogger().info("Flight cancellation completed: {}", flightCancelResult);
break;
}
} catch (TaskFailedException ex) {
// Only catch TaskFailedException for actual activity failures
ctx.getLogger().error("Activity failed during compensation: {}", ex.getMessage());
}
}
compensationHelper.compensate();
Comment thread
siri-varma marked this conversation as resolved.
ctx.complete("Workflow failed, compensation applied");
}
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.examples.workflows.compensation;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompensationHelper {

private final Map<String, Runnable> compensations = new LinkedHashMap<>();

public void addCompensation(String name, Runnable compensation) {
compensations.put(name, compensation);
}

public void compensate() {
List<String> keys = new ArrayList<>(compensations.keySet());
Collections.reverse(keys);
for (String key : keys) {
compensations.get(key).run();
}
Comment thread
siri-varma marked this conversation as resolved.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.dapr.spring.workflows.config.EnableDaprWorkflows;
import io.dapr.springboot.examples.wfp.chain.ChainWorkflow;
import io.dapr.springboot.examples.wfp.compensation.BookTripWorkflow;
import io.dapr.springboot.examples.wfp.child.ParentWorkflow;
import io.dapr.springboot.examples.wfp.continueasnew.CleanUpLog;
import io.dapr.springboot.examples.wfp.continueasnew.ContinueAsNewWorkflow;
Expand Down Expand Up @@ -191,6 +192,19 @@ public Decision suspendResumeContinue(@RequestParam("orderId") String orderId, @
return workflowInstanceStatus.readOutputAs(Decision.class);
}

/**
* Run Compensation Demo Workflow (Book Trip with Saga pattern).
* @return the output of the BookTripWorkflow execution
*/
@PostMapping("wfp/compensation")
public String compensation() throws TimeoutException {
String instanceId = daprWorkflowClient.scheduleNewWorkflow(BookTripWorkflow.class);
logger.info("Workflow instance " + instanceId + " started");
return daprWorkflowClient
.waitForWorkflowCompletion(instanceId, Duration.ofSeconds(30), true)
.readOutputAs(String.class);
}
Comment thread
siri-varma marked this conversation as resolved.
Comment thread
siri-varma marked this conversation as resolved.

@PostMapping("wfp/durationtimer")
public String durationTimerWorkflow() {
return daprWorkflowClient.scheduleNewWorkflow(DurationTimerWorkflow.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.springboot.examples.wfp.compensation;

import io.dapr.workflows.WorkflowActivity;
import io.dapr.workflows.WorkflowActivityContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

import org.springframework.stereotype.Component;

@Component
public class BookCarActivity implements WorkflowActivity {
private static final Logger logger = LoggerFactory.getLogger(BookCarActivity.class);

@Override
public Object run(WorkflowActivityContext ctx) {
logger.info("Starting Activity: " + ctx.getName());

try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
Comment thread
siri-varma marked this conversation as resolved.
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}

logger.info("Forcing Failure to trigger compensation for activity: " + ctx.getName());
throw new RuntimeException("Failed to book car");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.springboot.examples.wfp.compensation;

import io.dapr.workflows.WorkflowActivity;
import io.dapr.workflows.WorkflowActivityContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Component
public class BookFlightActivity implements WorkflowActivity {
private static final Logger logger = LoggerFactory.getLogger(BookFlightActivity.class);

@Override
public Object run(WorkflowActivityContext ctx) {
logger.info("Starting Activity: " + ctx.getName());

try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
Comment thread
siri-varma marked this conversation as resolved.
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}

String result = "Flight booked successfully";
logger.info("Activity completed with result: " + result);
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.springboot.examples.wfp.compensation;

import io.dapr.workflows.WorkflowActivity;
import io.dapr.workflows.WorkflowActivityContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class BookHotelActivity implements WorkflowActivity {
private static final Logger logger = LoggerFactory.getLogger(BookHotelActivity.class);

@Override
public Object run(WorkflowActivityContext ctx) {
logger.info("Starting Activity: " + ctx.getName());

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Comment thread
siri-varma marked this conversation as resolved.
logger.warn("Activity '{}' was interrupted.", ctx.getName(), e);
throw new RuntimeException("Activity was interrupted", e);
}

String result = "Hotel booked successfully";
logger.info("Activity completed with result: " + result);
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.springboot.examples.wfp.compensation;

import io.dapr.durabletask.TaskFailedException;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowStub;
import io.dapr.workflows.WorkflowTaskOptions;
import io.dapr.workflows.WorkflowTaskRetryPolicy;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@Component
public class BookTripWorkflow implements Workflow {
@Override
public WorkflowStub create() {
return ctx -> {
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
List<String> compensations = new ArrayList<>();

WorkflowTaskRetryPolicy compensationRetryPolicy = WorkflowTaskRetryPolicy.newBuilder()
.setFirstRetryInterval(Duration.ofSeconds(1))
.setMaxNumberOfAttempts(3)
.build();

WorkflowTaskOptions compensationOptions = new WorkflowTaskOptions(compensationRetryPolicy);

try {
String flightResult = ctx.callActivity(
BookFlightActivity.class.getName(), null, String.class).await();
ctx.getLogger().info("Flight booking completed: {}", flightResult);
compensations.add("CancelFlight");

String hotelResult = ctx.callActivity(
BookHotelActivity.class.getName(), null, String.class).await();
ctx.getLogger().info("Hotel booking completed: {}", hotelResult);
Comment thread
siri-varma marked this conversation as resolved.
compensations.add("CancelHotel");

String carResult = ctx.callActivity(
BookCarActivity.class.getName(), null, String.class).await();
ctx.getLogger().info("Car booking completed: {}", carResult);
compensations.add("CancelCar");

String result = String.format("%s, %s, %s", flightResult, hotelResult, carResult);
ctx.getLogger().info("Trip booked successfully: {}", result);
ctx.complete(result);

} catch (TaskFailedException e) {
ctx.getLogger().info("******** executing compensation logic ********");
ctx.getLogger().error("Activity failed", e);

Comment thread
siri-varma marked this conversation as resolved.
Collections.reverse(compensations);
for (String compensation : compensations) {
try {
switch (compensation) {
case "CancelCar":
String carCancelResult = ctx.callActivity(
CancelCarActivity.class.getName(), null, compensationOptions, String.class).await();
ctx.getLogger().info("Car cancellation completed: {}", carCancelResult);
break;
case "CancelHotel":
String hotelCancelResult = ctx.callActivity(
CancelHotelActivity.class.getName(), null, compensationOptions, String.class).await();
ctx.getLogger().info("Hotel cancellation completed: {}", hotelCancelResult);
break;
case "CancelFlight":
String flightCancelResult = ctx.callActivity(
CancelFlightActivity.class.getName(), null, compensationOptions, String.class).await();
ctx.getLogger().info("Flight cancellation completed: {}", flightCancelResult);
break;
default:
break;
}
} catch (TaskFailedException ex) {
ctx.getLogger().error("Activity failed during compensation", ex);
}
Comment thread
siri-varma marked this conversation as resolved.
}
ctx.complete("Workflow failed, compensation applied");
}
};
}
}
Loading
Loading