Skip to content

Commit d0249f3

Browse files
add before_started, after_completed and after_failed functionality
1 parent 9966ea0 commit d0249f3

File tree

11 files changed

+641
-41
lines changed

11 files changed

+641
-41
lines changed

README.md

+39-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ end
113113
process = MyProcess.create_process Date.today, :option_1 => true
114114
```
115115

116-
_NOTE:_ The current implementation performs a naive check on the count of arguments.
116+
_NOTE:_ The current implementation performs a naïve check on the count of arguments.
117117

118118
Next, specify the tasks with their corresponding implementation methods, that make up the
119119
process, using the `task` method and providing the `method` to execute for the task.
@@ -291,6 +291,44 @@ module MyProcess
291291
end
292292
```
293293

294+
#### Before Process Started and After Process Completion or Failure
295+
296+
You may want to run further tasks asynchrously before or after a process has completed
297+
or failed. These tasks provide a way to execute logic independently of the process.
298+
299+
Specify these tasks using the `before_started`, `after_completed` or `after_failed` methods.
300+
301+
For example, using `after_completed` to set off another business process or `after_failed` to
302+
send an email to an operator.
303+
304+
```ruby
305+
module MyProcess
306+
extend Taskinator::Definition
307+
308+
# defines a process
309+
define_process do
310+
311+
# tasks, sub-process, etc.
312+
313+
# define task to execute on completion
314+
after_completed :further_process
315+
316+
# define task to execute on failure
317+
after_failed :email_operations
318+
319+
end
320+
321+
def further_process
322+
# ...
323+
end
324+
325+
def email_operations
326+
# ...
327+
end
328+
329+
end
330+
```
331+
294332
#### Complex Process Definitions
295333

296334
Any combination or nesting of `task`, `sequential`, `concurrent` and `for_each` steps are

lib/taskinator/builder.rb

+48-7
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,32 @@ def job(job, options={})
8383
nil
8484
end
8585

86-
# TODO: add mailer
87-
# TODO: add complete!
88-
# TODO: add fail!
86+
# defines a task which executes the given @method before the process has started
87+
def before_started(method, options={})
88+
raise ArgumentError, 'method' if method.nil?
89+
raise NoMethodError, method unless @executor.respond_to?(method)
90+
91+
define_before_started_task(@process, method, @args, options)
92+
nil
93+
end
94+
95+
# defines a task which executes the given @method after the process has completed
96+
def after_completed(method, options={})
97+
raise ArgumentError, 'method' if method.nil?
98+
raise NoMethodError, method unless @executor.respond_to?(method)
99+
100+
define_after_completed_task(@process, method, @args, options)
101+
nil
102+
end
103+
104+
# defines a task which executes the given @method after the process has failed
105+
def after_failed(method, options={})
106+
raise ArgumentError, 'method' if method.nil?
107+
raise NoMethodError, method unless @executor.respond_to?(method)
108+
109+
define_after_failed_task(@process, method, @args, options)
110+
nil
111+
end
89112

90113
# defines a sub process task, for the given @definition
91114
# the definition specified must have input compatible arguments
@@ -104,13 +127,31 @@ def sub_process(definition, options={})
104127
private
105128

106129
def define_step_task(process, method, args, options={})
107-
define_task(process) {
130+
add_task(process.tasks) {
108131
Task.define_step_task(process, method, args, combine_options(options))
109132
}
110133
end
111134

135+
def define_before_started_task(process, method, args, options={})
136+
add_task(process.before_started_tasks) {
137+
Task.define_hook_task(process, method, args, combine_options(options))
138+
}
139+
end
140+
141+
def define_after_completed_task(process, method, args, options={})
142+
add_task(process.after_completed_tasks) {
143+
Task.define_hook_task(process, method, args, combine_options(options))
144+
}
145+
end
146+
147+
def define_after_failed_task(process, method, args, options={})
148+
add_task(process.after_failed_tasks) {
149+
Task.define_hook_task(process, method, args, combine_options(options))
150+
}
151+
end
152+
112153
def define_job_task(process, job, args, options={})
113-
define_task(process) {
154+
add_task(process.tasks) {
114155
Task.define_job_task(process, job, args, combine_options(options))
115156
}
116157
end
@@ -119,8 +160,8 @@ def define_sub_process_task(process, sub_process, options={})
119160
Task.define_sub_process_task(process, sub_process, combine_options(options))
120161
end
121162

122-
def define_task(process)
123-
process.tasks << task = yield
163+
def add_task(list)
164+
list << task = yield
124165
task
125166
end
126167

lib/taskinator/persistence.rb

+99-30
Original file line numberDiff line numberDiff line change
@@ -274,16 +274,19 @@ def visit_process(attribute)
274274
end
275275

276276
def visit_tasks(tasks)
277-
tasks.each do |task|
278-
RedisSerializationVisitor.new(@conn, task, @base_visitor).visit
279-
@conn.rpush "#{@key}:tasks", task.uuid
280-
unless task.is_a?(Task::SubProcess)
281-
incr_task_count unless self == @base_visitor
282-
@base_visitor.incr_task_count
283-
end
284-
end
285-
@conn.set("#{@key}.count", tasks.count)
286-
@conn.set("#{@key}.pending", tasks.count)
277+
_visit_tasks(tasks)
278+
end
279+
280+
def visit_before_started_tasks(tasks)
281+
_visit_tasks(tasks, ':before_started')
282+
end
283+
284+
def visit_after_completed_tasks(tasks)
285+
_visit_tasks(tasks, ':after_completed')
286+
end
287+
288+
def visit_after_failed_tasks(tasks)
289+
_visit_tasks(tasks, ':after_failed')
287290
end
288291

289292
def visit_attribute(attribute)
@@ -333,6 +336,21 @@ def task_count
333336
def incr_task_count
334337
@task_count += 1
335338
end
339+
340+
private
341+
342+
def _visit_tasks(tasks, list='')
343+
tasks.each do |task|
344+
RedisSerializationVisitor.new(@conn, task, @base_visitor).visit
345+
@conn.rpush "#{@key}#{list}:tasks", task.uuid
346+
unless task.is_a?(Task::SubProcess)
347+
incr_task_count unless self == @base_visitor
348+
@base_visitor.incr_task_count
349+
end
350+
end
351+
@conn.set("#{@key}#{list}.count", tasks.count)
352+
@conn.set("#{@key}#{list}.pending", tasks.count)
353+
end
336354
end
337355

338356
class XmlSerializationVisitor < Taskinator::Visitor::Base
@@ -385,17 +403,19 @@ def visit_process(attribute)
385403
end
386404

387405
def visit_tasks(tasks)
388-
builder.tag!('tasks', :count => tasks.count) do |xml|
389-
tasks.each do |task|
390-
xml.tag!('task', :key => task.key) do |xml2|
391-
XmlSerializationVisitor.new(xml2, task, @base_visitor).visit
392-
unless task.is_a?(Task::SubProcess)
393-
incr_task_count unless self == @base_visitor
394-
@base_visitor.incr_task_count
395-
end
396-
end
397-
end
398-
end
406+
_visit_tasks(tasks)
407+
end
408+
409+
def visit_before_started_tasks(tasks)
410+
_visit_tasks(tasks, 'before_started')
411+
end
412+
413+
def visit_after_completed_tasks(tasks)
414+
_visit_tasks(tasks, 'after_completed')
415+
end
416+
417+
def visit_after_failed_tasks(tasks)
418+
_visit_tasks(tasks, 'after_failed')
399419
end
400420

401421
def visit_attribute(attribute)
@@ -445,6 +465,22 @@ def task_count
445465
def incr_task_count
446466
@task_count += 1
447467
end
468+
469+
private
470+
471+
def _visit_tasks(tasks, list='tasks')
472+
builder.tag!(list, :count => tasks.count) do |xml|
473+
tasks.each do |task|
474+
xml.tag!('task', :key => task.key) do |xml2|
475+
XmlSerializationVisitor.new(xml2, task, @base_visitor).visit
476+
unless task.is_a?(Task::SubProcess)
477+
incr_task_count unless self == @base_visitor
478+
@base_visitor.incr_task_count
479+
end
480+
end
481+
end
482+
end
483+
end
448484
end
449485

450486
class UnknownTypeError < StandardError
@@ -540,11 +576,19 @@ def visit_process(attribute)
540576
end
541577

542578
def visit_tasks(tasks)
543-
# tasks are a linked list, so just get the first one
544-
Taskinator.redis do |conn|
545-
uuid = conn.lindex("#{@key}:tasks", 0)
546-
tasks.attach(lazy_instance_for(Task, uuid), conn.get("#{@key}.count").to_i) if uuid
547-
end
579+
_visit_tasks(tasks)
580+
end
581+
582+
def visit_before_started_tasks(tasks)
583+
_visit_tasks(tasks, ':before_started')
584+
end
585+
586+
def visit_after_completed_tasks(tasks)
587+
_visit_tasks(tasks, ':after_completed')
588+
end
589+
590+
def visit_after_failed_tasks(tasks)
591+
_visit_tasks(tasks, ':after_failed')
548592
end
549593

550594
def visit_process_reference(attribute)
@@ -606,6 +650,14 @@ def visit_args(attribute)
606650

607651
private
608652

653+
def _visit_tasks(tasks, list='')
654+
# tasks are a linked list, so just get the first one
655+
Taskinator.redis do |conn|
656+
uuid = conn.lindex("#{@key}#{list}:tasks", 0)
657+
tasks.attach(lazy_instance_for(Task, uuid), conn.get("#{@key}#{list}.count").to_i) if uuid
658+
end
659+
end
660+
609661
#
610662
# creates a proxy for the instance which
611663
# will only fetch the instance when used
@@ -648,14 +700,31 @@ def visit_process(attribute)
648700
end
649701

650702
def visit_tasks(tasks)
651-
@conn.expire "#{@key}:tasks", expire_in
652-
@conn.expire "#{@key}.count", expire_in
653-
@conn.expire "#{@key}.pending", expire_in
703+
_visit_tasks(tasks)
704+
end
705+
706+
def visit_before_started_tasks(tasks)
707+
_visit_tasks(tasks, ':before_started')
708+
end
709+
710+
def visit_after_completed_tasks(tasks)
711+
_visit_tasks(tasks, ':after_completed')
712+
end
713+
714+
def visit_after_failed_tasks(tasks)
715+
_visit_tasks(tasks, ':after_failed')
716+
end
717+
718+
private
719+
720+
def _visit_tasks(tasks, list='')
721+
@conn.expire "#{@key}#{list}:tasks", expire_in
722+
@conn.expire "#{@key}#{list}.count", expire_in
723+
@conn.expire "#{@key}#{list}.pending", expire_in
654724
tasks.each do |task|
655725
RedisCleanupVisitor.new(@conn, task, expire_in).visit
656726
end
657727
end
658-
659728
end
660729

661730
# lazily loads the object specified by the type and uuid

lib/taskinator/process.rb

+24
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,18 @@ def tasks
5555
@tasks ||= Tasks.new
5656
end
5757

58+
def before_started_tasks
59+
@before_started_tasks ||= Tasks.new
60+
end
61+
62+
def after_completed_tasks
63+
@after_completed_tasks ||= Tasks.new
64+
end
65+
66+
def after_failed_tasks
67+
@after_failed_tasks ||= Tasks.new
68+
end
69+
5870
def no_tasks_defined?
5971
tasks.empty?
6072
end
@@ -64,6 +76,9 @@ def accept(visitor)
6476
visitor.visit_task_reference(:parent)
6577
visitor.visit_type(:definition)
6678
visitor.visit_tasks(tasks)
79+
visitor.visit_before_started_tasks(before_started_tasks)
80+
visitor.visit_after_completed_tasks(after_completed_tasks)
81+
visitor.visit_after_failed_tasks(after_failed_tasks)
6782
visitor.visit_args(:options)
6883
visitor.visit_attribute(:scope)
6984
visitor.visit_attribute(:queue)
@@ -92,6 +107,9 @@ def enqueue!
92107
def start!
93108
return if paused? || cancelled?
94109

110+
# enqueue before started tasks independently
111+
before_started_tasks.each(&:enqueue!)
112+
95113
transition(:processing) do
96114
instrument('taskinator.process.processing', processing_payload) do
97115
start
@@ -132,6 +150,9 @@ def complete!
132150
end
133151
end
134152
end
153+
154+
# enqueue completion tasks independently
155+
after_completed_tasks.each(&:enqueue!)
135156
end
136157

137158
# TODO: add retry method - to pick up from a failed task
@@ -159,6 +180,9 @@ def fail!(error)
159180
parent.fail!(error) unless parent.nil?
160181
end
161182
end
183+
184+
# enqueue completion tasks independently
185+
after_failed_tasks.each(&:enqueue!)
162186
end
163187

164188
def task_failed(task, error)

0 commit comments

Comments
 (0)