Passing values from and to tasks

Note

Full source located at simple_linear_pass.

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow import task

# INTRO: This example shows how a task (in a linear/serial workflow) can
# produce an output that can then be consumed/used by a downstream task.


class TaskA(task.Task):
    default_provides = ['a']

    def execute(self):
        print("Executing '%s'" % (self.name))
        return 'a'


class TaskB(task.Task):
    def execute(self, a):
        print("Executing '%s'" % (self.name))
        print("Got input '%s'" % (a))


print("Constructing...")
wf = linear_flow.Flow("pass-from-to")
wf.add(TaskA('a'), TaskB('b'))

print("Loading...")
e = engines.load(wf)

print("Compiling...")
e.compile()

print("Preparing...")
e.prepare()

print("Running...")
e.run()

print("Done...")

Making phone calls

Note

Full source located at simple_linear.

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: In this example we create two tasks, each of which ~calls~ a given
# ~phone~ number (provided as a function input) in a linear fashion (one after
# the other). For a workflow which is serial this shows an extremely simple way
# of structuring your tasks (the code that does the work) into a linear
# sequence (the flow) and then passing the work off to an engine, with some
# initial data, to be run in a reliable manner.
#
# NOTE(harlowja): This example shows a basic usage of the taskflow structures
# without involving the complexity of persistence. Using the structures that
# taskflow provides via tasks and flows makes it possible for you to easily
# hook in a persistence layer at a later time (and then gain the functionality
# that it offers) when you decide the complexity of adding that layer is
# 'worth it' for your application's usage pattern (certain applications
# may not need it).


class CallJim(task.Task):
    def execute(self, jim_number, *args, **kwargs):
        print("Calling jim %s." % jim_number)


class CallJoe(task.Task):
    def execute(self, joe_number, *args, **kwargs):
        print("Calling joe %s." % joe_number)


# Create your flow and associated tasks (the work to be done).
flow = lf.Flow('simple-linear').add(
    CallJim(),
    CallJoe()
)

# Now run that flow using the provided initial data (store below).
taskflow.engines.run(flow, store=dict(joe_number=444,
                                      jim_number=555))
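
Since a linear flow runs its tasks strictly in the order they were added,
running this should print something like (wording taken from the tasks above):

Calling jim 555.
Calling joe 444.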

Making phone calls (automatically reverting)

Note

Full source located at reverting_linear.

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: In this example we create three tasks, each of which ~calls~ a given
# number (provided as a function input); one of those tasks *fails* to call its
# given number (the suzzie call), which causes the workflow to enter the
# reverting process and activates the revert methods of the previous two
# phone ~calls~.
#
# This simulated calling makes it appear like all three calls occur or all
# three don't occur (transaction-like capabilities). No persistence layer is
# used here so reverting and executing will *not* be tolerant of process
# failure.


class CallJim(task.Task):
    def execute(self, jim_number, *args, **kwargs):
        print("Calling jim %s." % jim_number)

    def revert(self, jim_number, *args, **kwargs):
        print("Calling %s and apologizing." % jim_number)


class CallJoe(task.Task):
    def execute(self, joe_number, *args, **kwargs):
        print("Calling joe %s." % joe_number)

    def revert(self, joe_number, *args, **kwargs):
        print("Calling %s and apologizing." % joe_number)


class CallSuzzie(task.Task):
    def execute(self, suzzie_number, *args, **kwargs):
        raise IOError("Suzzie not home right now.")


# Create your flow and associated tasks (the work to be done).
flow = lf.Flow('simple-linear').add(
    CallJim(),
    CallJoe(),
    CallSuzzie()
)

try:
    # Now run that flow using the provided initial data (store below).
    taskflow.engines.run(flow, store=dict(joe_number=444,
                                          jim_number=555,
                                          suzzie_number=666))
except Exception as e:
    # NOTE(harlowja): This exception will be the exception that came out of the
    # 'CallSuzzie' task instead of a different exception; this is useful since
    # typically surrounding code wants to handle the original exception and not
    # a wrapped or altered one.
    #
    # *WARNING* If this flow was multi-threaded and multiple active tasks threw
    # exceptions then the above exception would be wrapped into a combined
    # exception (the object has methods to iterate over the contained
    # exceptions). See: exceptions.py and the class 'WrappedFailure' to look at
    # how to deal with multiple tasks failing while running.
    #
    # You will also note that this is not a problem in this case since no
    # parallelism is involved; this is ensured by the usage of a linear flow
    # and the default engine type which is 'serial' vs being 'parallel'.
    print("Flow failed: %s" % e)

Building a car

Note

Full source located at build_a_car.

import logging
import os
import sys


logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)


import taskflow.engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task

import example_utils as eu  # noqa


# INTRO: This example shows how a graph flow and linear flow can be used
# together to execute dependent & non-dependent tasks by going through the
# steps required to build a simplistic car (an assembly line if you will). It
# also shows how raw functions can be wrapped into a task object instead of
# being forced to use the more *heavy* task base class. This is useful in
# scenarios where pre-existing code has functions that you want to easily
# plug in to taskflow, without requiring a large amount of code changes.


def build_frame():
    return 'steel'


def build_engine():
    return 'honda'


def build_doors():
    return '2'


def build_wheels():
    return '4'


# These just return true to indicate success; in the real world they would
# do more than just that.

def install_engine(frame, engine):
    return True


def install_doors(frame, windows_installed, doors):
    return True


def install_windows(frame, doors):
    return True


def install_wheels(frame, engine, engine_installed, wheels):
    return True


def trash(**kwargs):
    eu.print_wrapped("Throwing away pieces of car!")


def startup(**kwargs):
    # If you want to see the rollback function being activated try uncommenting
    # the following line.
    #
    # raise ValueError("Car not verified")
    return True


def verify(spec, **kwargs):
    # If the car is not what we ordered throw away the car (trigger reversion).
    for key, value in kwargs.items():
        if spec[key] != value:
            raise Exception("Car doesn't match spec!")
    return True


# These two functions connect into the state transition notification emission
# points that the engine outputs, they can be used to log state transitions
# that are occurring, or they can be used to suspend the engine (or perform
# other useful activities).
def flow_watch(state, details):
    print('Flow => %s' % state)


def task_watch(state, details):
    print('Task %s => %s' % (details.get('task_name'), state))


flow = lf.Flow("make-auto").add(
    task.FunctorTask(startup, revert=trash, provides='ran'),
    # A graph flow allows automatic dependency-based ordering; the ordering
    # is determined by analyzing the symbols required and provided by each task
    # and ordering execution so that a working order (if one exists) is used.
    gf.Flow("install-parts").add(
        task.FunctorTask(build_frame, provides='frame'),
        task.FunctorTask(build_engine, provides='engine'),
        task.FunctorTask(build_doors, provides='doors'),
        task.FunctorTask(build_wheels, provides='wheels'),
        # These *_installed outputs allow for other tasks to depend on certain
        # actions being performed (aka the components were installed); another
        # way to do this is to link() the tasks manually instead of creating
        # an 'artificial' data dependency that accomplishes the same goal that
        # the manual linking would (a sketch of this is shown after this
        # example).
        task.FunctorTask(install_engine, provides='engine_installed'),
        task.FunctorTask(install_doors, provides='doors_installed'),
        task.FunctorTask(install_windows, provides='windows_installed'),
        task.FunctorTask(install_wheels, provides='wheels_installed')),
    task.FunctorTask(verify, requires=['frame',
                                       'engine',
                                       'doors',
                                       'wheels',
                                       'engine_installed',
                                       'doors_installed',
                                       'windows_installed',
                                       'wheels_installed']))

# This dictionary will be provided to the tasks as a specification for what
# the tasks should produce, in this example this specification will influence
# what those tasks do and what output they create. Different tasks depend on
# different information from this specification, all of which will be provided
# automatically by the engine to those tasks.
spec = {
    "frame": 'steel',
    "engine": 'honda',
    "doors": '2',
    "wheels": '4',
    # These are used to compare the result product, a car without the pieces
    # installed is not a car after all.
    "engine_installed": True,
    "doors_installed": True,
    "windows_installed": True,
    "wheels_installed": True,
}


engine = taskflow.engines.load(flow, store={'spec': spec.copy()})

# This registers all (*) state transitions to trigger a call to the flow_watch
# function for flow state transitions, and registers the same all (*) state
# transitions for task state transitions.
engine.notifier.register('*', flow_watch)
engine.task_notifier.register('*', task_watch)

eu.print_wrapped("Building a car")
engine.run()

# Alter the specification and ensure that the reverting logic gets triggered
# since the car that will be built by the build_doors function will have
# 2 doors only (not 5); this will cause the verification
# task to mark the car that is produced as not matching the desired spec.
spec['doors'] = 5

engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
engine.notifier.register('*', flow_watch)
engine.task_notifier.register('*', task_watch)

eu.print_wrapped("Building a wrong car that doesn't match specification")
try:
    engine.run()
except Exception as e:
    eu.print_wrapped("Flow failed: %s" % e)

Linear equation solver (explicit dependencies)

Note

Full source located at calculate_linear.

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


# INTRO: In this example a linear flow is used to group four tasks to calculate
# a value. A single task class (Adder) is added twice, showing how this can be
# done and how the two instances take in different bound values. In the first
# case it uses the default parameters ('x' and 'y') and in the second case the
# arguments are bound to the ('z', 'd') keys from the engine's internal storage
# mechanism.
#
# A multiplier task uses a binding that another task also provides, but this
# example explicitly shows that its 'z' parameter is bound to the 'a' key.
# This shows that if a task depends on a key named the same as a key provided
# by another task the name can be remapped to take the desired key from a
# different origin.


# This task provides some values as a result of execution; this can be
# useful when you want to provide values from a static set to other tasks that
# depend on those values existing before those tasks can run.
#
# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
# that just provides those values on engine running by prepopulating the
# storage backend before your tasks are run (which accomplishes a similar goal
# in a more uniform manner).
class Provider(task.Task):

    def __init__(self, name, *args, **kwargs):
        super(Provider, self).__init__(name=name, **kwargs)
        self._provide = args

    def execute(self):
        return self._provide


# This task adds two input variables and returns the result.
#
# Note that since this task does not have a revert() function (since addition
# is a stateless operation) there are no side-effects that this function needs
# to undo if some later operation fails.
class Adder(task.Task):
    def execute(self, x, y):
        return x + y


# This task multiplies an input variable by a multiplier and returns the
# result.
#
# Note that since this task does not have a revert() function (since
# multiplication is a stateless operation) there are no side-effects that
# this function needs to undo if some later operation fails.
class Multiplier(task.Task):
    def __init__(self, name, multiplier, provides=None, rebind=None):
        super(Multiplier, self).__init__(name=name, provides=provides,
                                         rebind=rebind)
        self._multiplier = multiplier

    def execute(self, z):
        return z * self._multiplier


# Note here that the ordering is established so that the correct sequence
# of operations occurs where the adding and multiplying is done according
# to the expected and typical mathematical model. A graph flow could also be
# used here to automatically infer & ensure the correct ordering.
flow = lf.Flow('root').add(
    # Provide the initial values for other tasks to depend on.
    #
    # x = 2, y = 3, d = 5
    Provider("provide-adder", 2, 3, 5, provides=('x', 'y', 'd')),
    # z = x+y = 5
    Adder("add-1", provides='z'),
    # a = z+d = 10
    Adder("add-2", provides='a', rebind=['z', 'd']),
    # Calculate 'r = a*3 = 30'
    #
    # Note here that the 'z' argument of the execute() function will not be
    # bound to the 'z' variable provided from the above 'provider' object but
    # instead the 'z' argument will be taken from the 'a' variable provided
    # by the second add-2 listed above.
    Multiplier("multi", 3, provides='r', rebind={'z': 'a'})
)

# The result here will be all results (from all tasks) which is stored in an
# in-memory storage location that backs this engine since it is not configured
# with persistence storage.
results = taskflow.engines.run(flow)
print(results)
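
Since no specific symbols are requested from the run, the returned dictionary
should (roughly, ignoring ordering) contain every named value produced along
the way, i.e. something like:

{'x': 2, 'y': 3, 'd': 5, 'z': 5, 'a': 10, 'r': 30}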

Linear equation solver (inferred dependencies)

Source: graph_flow.py

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task


# In this example there are complex *inferred* dependencies between tasks that
# are used to solve a simple set of linear equations.
#
# As you will see below the tasks just define what they require as input
# and produce as output (named values). Then the user doesn't care about
# ordering the tasks (in this case the tasks calculate pieces of the overall
# equation).
#
# As you will notice a graph flow resolves dependencies automatically using the
# tasks' symbol requirements and provided symbol values, and no ordering
# dependency has to be manually created.
#
# Also notice that flows of any type can be nested into a graph flow, showing
# that subflow dependencies (and associated ordering) will be inferred too.


class Adder(task.Task):

    def execute(self, x, y):
        return x + y


flow = gf.Flow('root').add(
    lf.Flow('nested_linear').add(
        # x2 = y3+y4 = 12
        Adder("add2", provides='x2', rebind=['y3', 'y4']),
        # x1 = y1+y2 = 4
        Adder("add1", provides='x1', rebind=['y1', 'y2'])
    ),
    # x5 = x1+x3 = 20
    Adder("add5", provides='x5', rebind=['x1', 'x3']),
    # x3 = x1+x2 = 16
    Adder("add3", provides='x3', rebind=['x1', 'x2']),
    # x4 = x2+y5 = 21
    Adder("add4", provides='x4', rebind=['x2', 'y5']),
    # x6 = x5+x4 = 41
    Adder("add6", provides='x6', rebind=['x5', 'x4']),
    # x7 = x6+x6 = 82
    Adder("add7", provides='x7', rebind=['x6', 'x6']))

# Provide the initial variable inputs using a storage dictionary.
store = {
    "y1": 1,
    "y2": 3,
    "y3": 5,
    "y4": 7,
    "y5": 9,
}

result = taskflow.engines.run(
    flow, engine_conf='serial', store=store)

print("Single threaded engine result %s" % result)

result = taskflow.engines.run(
    flow, engine_conf='parallel', store=store)

print("Multi threaded engine result %s" % result)

Linear equation solver (in parallel)

Note

Full source located at calculate_in_parallel.

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow import task

# INTRO: This example shows how a linear flow and an unordered flow can be
# used together to execute calculations in parallel and then use the
# results in the next task(s). The adder task is used for all calculations
# and argument bindings are used to set the correct parameters for each task.


# This task provides some values as a result of execution; this can be
# useful when you want to provide values from a static set to other tasks that
# depend on those values existing before those tasks can run.
#
# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
# that provides those values on engine running by prepopulating the storage
# backend before your tasks are run (which accomplishes a similar goal in a
# more uniform manner).
class Provider(task.Task):
    def __init__(self, name, *args, **kwargs):
        super(Provider, self).__init__(name=name, **kwargs)
        self._provide = args

    def execute(self):
        return self._provide


# This task adds two input variables and returns the result of that addition.
#
# Note that since this task does not have a revert() function (since addition
# is a stateless operation) there are no side-effects that this function needs
# to undo if some later operation fails.
class Adder(task.Task):
    def execute(self, x, y):
        return x + y


flow = lf.Flow('root').add(
    # Provide the initial values for other tasks to depend on.
    #
    # x1 = 2, y1 = 3, x2 = 5, y2 = 8
    Provider("provide-adder", 2, 3, 5, 8,
             provides=('x1', 'y1', 'x2', 'y2')),
    # Note here that we define the flow that contains the 2 adders to be an
    # unordered flow since the order in which these execute does not matter,
    # another way to solve this would be to use a graph_flow pattern, which
    # also can run in parallel (since they have no ordering dependencies).
    uf.Flow('adders').add(
        # Calculate 'z1 = x1+y1 = 5'
        #
        # Rebind here means that the execute() function x argument will be
        # satisfied from a previous output named 'x1', and the y argument
        # of execute() will be populated from the previous output named 'y1'
        #
        # The output (result of adding) will be mapped into a variable named
        # 'z1' which can then be referred to and depended on by other tasks.
        Adder(name="add", provides='z1', rebind=['x1', 'y1']),
        # z2 = x2+y2 = 13
        Adder(name="add-2", provides='z2', rebind=['x2', 'y2']),
    ),
    # r = z1+z2 = 18
    Adder(name="sum-1", provides='r', rebind=['z1', 'z2']))


# The result here will be all results (from all tasks) which is stored in an
# in-memory storage location that backs this engine since it is not configured
# with persistence storage.
result = taskflow.engines.run(flow, engine_conf='parallel')
print(result)

Creating a volume (in parallel)

Note

Full source located at create_parallel_volume.

import contextlib
import logging
import os
import random
import sys
import time

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.listeners import printing
from taskflow.patterns import unordered_flow as uf
from taskflow import task
from taskflow.utils import reflection

# INTRO: This example shows how unordered_flow can be used to create a large
# number of fake volumes in parallel (or serially, depending on a constant that
# can be easily changed).


@contextlib.contextmanager
def show_time(name):
    start = time.time()
    yield
    end = time.time()
    print(" -- %s took %0.3f seconds" % (name, end - start))


# This affects how many volumes to create and how much time to *simulate*
# passing for that volume to be created.
MAX_CREATE_TIME = 3
VOLUME_COUNT = 5

# This will be used to determine if all the volumes are created in parallel
# or whether the volumes are created serially (in an undefined order since
# an unordered flow is used). Note that there is a disconnection between the
# ordering and the concept of parallelism (since unordered items can still be
# run in a serial order). A typical use-case for offering both is to allow
# for debugging using a serial approach, while when running at a larger scale
# one would likely want to use the parallel approach.
#
# If you switch this flag from serial to parallel you can see the overall
# time difference that this causes.
SERIAL = False
if SERIAL:
    engine_conf = {
        'engine': 'serial',
    }
else:
    engine_conf = {
        'engine': 'parallel',
    }


class VolumeCreator(task.Task):
    def __init__(self, volume_id):
        # Note here that the volume name is composed of the name of the class
        # along with the volume id that is being created. Since a name of a
        # task uniquely identifies that task in storage it is important that
        # the name be relevant and identifiable if the task is recreated for
        # subsequent resumption (if applicable).
        #
        # UUIDs are *not* used as they can not be tied back to a previous
        # task's state on resumption (since they are unique and will vary for
        # each task that is created). A name based off the volume id that is to
        # be created is more easily tied back to the original task so that the
        # volume create can be resumed/reverted, and is much easier to use for
        # audit and tracking purposes.
        base_name = reflection.get_callable_name(self)
        super(VolumeCreator, self).__init__(name="%s-%s" % (base_name,
                                                            volume_id))
        self._volume_id = volume_id

    def execute(self):
        print("Making volume %s" % (self._volume_id))
        time.sleep(random.random() * MAX_CREATE_TIME)
        print("Finished making volume %s" % (self._volume_id))


# Assume there is no ordering dependency between volumes.
flow = uf.Flow("volume-maker")
for i in range(0, VOLUME_COUNT):
    flow.add(VolumeCreator(volume_id="vol-%s" % (i)))


# Show how much time the overall engine loading and running takes.
with show_time(name=flow.name.title()):
    eng = engines.load(flow, engine_conf=engine_conf)
    # This context manager automatically adds (and automatically removes) a
    # helpful set of state transition notification printing helper utilities
    # that show you exactly what transitions the engine is going through
    # while running the various volume create tasks.
    with printing.PrintingListener(eng):
        eng.run()
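
A rough back-of-the-envelope expectation (not measured output): each volume
creation sleeps for up to MAX_CREATE_TIME (3) seconds, so with SERIAL = True
the total time approaches VOLUME_COUNT * MAX_CREATE_TIME (up to ~15 seconds),
while with SERIAL = False it approaches the single slowest create (up to ~3
seconds), which is the difference the show_time() wrapper makes visible.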

Storing & emitting a bill

Note

Full source located at fake_billing.

import json
import logging
import os
import sys
import time

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)


from taskflow import engines
from taskflow.listeners import printing
from taskflow.openstack.common import uuidutils
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.utils import misc

# INTRO: This example walks through a miniature workflow which simulates
# the reception of an API request, creation of a database entry, driver
# activation (which invokes a 'fake' webservice) and final completion.
#
# This example also shows how a function/object (in this case the url sending)
# that occurs during driver activation can update the progress of a task
# without being aware of the internals of how to do this by associating a
# callback that the url sending can update as the sending progresses from 0.0%
# complete to 100% complete.


class DB(object):
    def query(self, sql):
        print("Querying with: %s" % (sql))


class UrlCaller(object):
    def __init__(self):
        self._send_time = 0.5
        self._chunks = 25

    def send(self, url, data, status_cb=None):
        sleep_time = float(self._send_time) / self._chunks
        for i in range(0, len(data)):
            time.sleep(sleep_time)
            # As we send the data, each chunk we 'fake' send will progress
            # the sending progress that much further to 100%.
            if status_cb:
                status_cb(float(i) / len(data))


# Since engines save the output of tasks to an optional persistent storage
# backend, resources have to be dealt with in a slightly different manner since
# resources are transient and can *not* be persisted (or serialized). For tasks
# that require access to a set of resources it is a common pattern to provide
# an object (in this case this object) on construction of those tasks via the
# task constructor.
class ResourceFetcher(object):
    def __init__(self):
        self._db_handle = None
        self._url_handle = None

    @property
    def db_handle(self):
        if self._db_handle is None:
            self._db_handle = DB()
        return self._db_handle

    @property
    def url_handle(self):
        if self._url_handle is None:
            self._url_handle = UrlCaller()
        return self._url_handle


class ExtractInputRequest(task.Task):
    def __init__(self, resources):
        super(ExtractInputRequest, self).__init__(provides="parsed_request")
        self._resources = resources

    def execute(self, request):
        return {
            'user': request.user,
            'user_id': misc.as_int(request.id),
            'request_id': uuidutils.generate_uuid(),
        }


class MakeDBEntry(task.Task):
    def __init__(self, resources):
        super(MakeDBEntry, self).__init__()
        self._resources = resources

    def execute(self, parsed_request):
        db_handle = self._resources.db_handle
        db_handle.query("INSERT %s INTO mydb" % (parsed_request))

    def revert(self, result, parsed_request):
        db_handle = self._resources.db_handle
        db_handle.query("DELETE %s FROM mydb IF EXISTS" % (parsed_request))


class ActivateDriver(task.Task):
    def __init__(self, resources):
        super(ActivateDriver, self).__init__(provides='sent_to')
        self._resources = resources
        self._url = "http://blahblah.com"

    def execute(self, parsed_request):
        print("Sending billing data to %s" % (self._url))
        url_sender = self._resources.url_handle
        # Note that here we attach our update_progress function (which is a
        # function that the engine also 'binds' to) to the progress function
        # that the url sending helper class uses. This allows the task progress
        # to be tied to the url sending progress, which is very useful for
        # downstream systems to be aware of what a task is doing at any time.
        url_sender.send(self._url, json.dumps(parsed_request),
                        status_cb=self.update_progress)
        return self._url

    def update_progress(self, progress, **kwargs):
        # Override the parent method to also print out the status.
        super(ActivateDriver, self).update_progress(progress, **kwargs)
        print("%s is %0.2f%% done" % (self.name, progress * 100))


class DeclareSuccess(task.Task):
    def execute(self, sent_to):
        print("Done!")
        print("All data processed and sent to %s" % (sent_to))


# Resources (db handles and similar) of course can *not* be persisted so we
# need to make sure that we pass this resource fetcher to the tasks constructor
# so that the tasks have access to any needed resources (the resources are
# lazily loaded so that they are only created when they are used).
resources = ResourceFetcher()
flow = lf.Flow("initialize-me")

# 1. First we extract the api request into a usable format.
# 2. Then we go ahead and make a database entry for our request.
flow.add(ExtractInputRequest(resources), MakeDBEntry(resources))

# 3. Then we activate our payment method and finally declare success.
sub_flow = gf.Flow("after-initialize")
sub_flow.add(ActivateDriver(resources), DeclareSuccess())
flow.add(sub_flow)

# Initially populate the storage with the following request object;
# prepopulating this allows the tasks that depend on the 'request' variable
# to start processing (in this case this is the ExtractInputRequest task).
store = {
    'request': misc.AttrDict(user="bob", id="1.35"),
}
eng = engines.load(flow, engine_conf='serial', store=store)

# This context manager automatically adds (and automatically removes) a
# helpful set of state transition notification printing helper utilities
# that show you exactly what transitions the engine is going through
# while running the various billing related tasks.
with printing.PrintingListener(eng):
    eng.run()

Suspending a workflow & resuming

Note

Full source located at resume_from_backend.

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.utils import persistence_utils as p_utils

import example_utils as eu  # noqa

# INTRO: In this example linear_flow is used to group three tasks, one of which
# will suspend the future work the engine may do. This suspended engine is then
# discarded and the workflow is reloaded from the persisted data and then
# resumed from where it was suspended. This allows you to see how to start an
# engine, have a task stop the engine from doing future work (if a
# multi-threaded engine is being used, then the currently active work is not
# preempted) and then resume the work later.
#
# Usage:
#
#   With a filesystem directory as backend
#
#     python taskflow/examples/resume_from_backend.py
#
#   With ZooKeeper as backend
#
#     python taskflow/examples/resume_from_backend.py \
#       zookeeper://127.0.0.1:2181/taskflow/resume_from_backend/


# UTILITY FUNCTIONS #########################################


def print_task_states(flowdetail, msg):
    eu.print_wrapped(msg)
    print("Flow '%s' state: %s" % (flowdetail.name, flowdetail.state))
    # Sort by these so that our test validation doesn't get confused by the
    # order in which the items in the flow detail appear.
    items = sorted((td.name, td.version, td.state, td.results)
                   for td in flowdetail)
    for item in items:
        print(" %s==%s: %s, result=%s" % item)


def find_flow_detail(backend, lb_id, fd_id):
    conn = backend.get_connection()
    lb = conn.get_logbook(lb_id)
    return lb.find(fd_id)


# CREATE FLOW ###############################################


class InterruptTask(task.Task):
    def execute(self):
        # DO NOT TRY THIS AT HOME
        engine.suspend()


class TestTask(task.Task):
    def execute(self):
        print('executing %s' % self)
        return 'ok'


def flow_factory():
    return lf.Flow('resume from backend example').add(
        TestTask(name='first'),
        InterruptTask(name='boom'),
        TestTask(name='second'))


# INITIALIZE PERSISTENCE ####################################

with eu.get_backend() as backend:
    logbook = p_utils.temporary_log_book(backend)

    # CREATE AND RUN THE FLOW: FIRST ATTEMPT ####################

    flow = flow_factory()
    flowdetail = p_utils.create_flow_detail(flow, logbook, backend)
    engine = taskflow.engines.load(flow, flow_detail=flowdetail,
                                   backend=backend)

    print_task_states(flowdetail, "At the beginning, there is no state")
    eu.print_wrapped("Running")
    engine.run()
    print_task_states(flowdetail, "After running")

    # RE-CREATE, RESUME, RUN ####################################

    eu.print_wrapped("Resuming and running again")

    # NOTE(harlowja): reload the flow detail from the backend; this will allow
    # us to resume the flow from its suspended state, but first we need to
    # search for the right flow detail in the correct logbook where things are
    # stored.
    #
    # We could avoid re-loading the engine and just do engine.run() again, but
    # this example shows how another process may unsuspend a given flow and
    # start it again for situations where this is useful to do (say the process
    # running the above flow crashes).
    flow2 = flow_factory()
    flowdetail2 = find_flow_detail(backend, logbook.uuid, flowdetail.uuid)
    engine2 = taskflow.engines.load(flow2,
                                    flow_detail=flowdetail2,
                                    backend=backend)
    engine2.run()
    print_task_states(flowdetail2, "At the end")
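
Roughly what to expect (exact formatting aside): after the first run the
printed flow detail should show 'first' and 'boom' as having succeeded, with
the flow left in a suspended state and 'second' not yet run; after the second,
resumed run only 'second' executes and the flow completes successfully.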

Creating a virtual machine (resumable)

Note

Full source located at resume_vm_boot.

import contextlib
import hashlib
import logging
import os
import random
import sys
import time

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from taskflow import engines
from taskflow import exceptions as exc
from taskflow.openstack.common import uuidutils
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.utils import eventlet_utils as e_utils
from taskflow.utils import persistence_utils as p_utils

import example_utils as eu  # noqa

# INTRO: This example shows how a hierarchy of flows can be used to create a
# vm in a reliable & resumable manner using taskflow + a miniature version of
# what nova does while booting a vm.


@contextlib.contextmanager
def slow_down(how_long=0.5):
    try:
        yield how_long
    finally:
        if len(sys.argv) > 1:
            # Only bother to do this if user input was provided.
            print("** Ctrl-c me please!!! **")
            time.sleep(how_long)


class PrintText(task.Task):
    """Just inserts some text print outs in a workflow."""
    def __init__(self, print_what, no_slow=False):
        content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
        super(PrintText, self).__init__(name="Print: %s" % (content_hash))
        self._text = print_what
        self._no_slow = no_slow

    def execute(self):
        if self._no_slow:
            eu.print_wrapped(self._text)
        else:
            with slow_down():
                eu.print_wrapped(self._text)


class DefineVMSpec(task.Task):
    """Defines a vm specification to be."""
    def __init__(self, name):
        super(DefineVMSpec, self).__init__(provides='vm_spec', name=name)

    def execute(self):
        return {
            'type': 'kvm',
            'disks': 2,
            'vcpu': 1,
            'ips': 1,
            'volumes': 3,
        }


class LocateImages(task.Task):
    """Locates where the vm images are."""
    def __init__(self, name):
        super(LocateImages, self).__init__(provides='image_locations',
                                           name=name)

    def execute(self, vm_spec):
        image_locations = {}
        for i in range(0, vm_spec['disks']):
            url = "http://www.yahoo.com/images/%s" % (i)
            image_locations[url] = "/tmp/%s.img" % (i)
        return image_locations


class DownloadImages(task.Task):
    """Downloads all the vm images."""
    def __init__(self, name):
        super(DownloadImages, self).__init__(provides='download_paths',
                                             name=name)

    def execute(self, image_locations):
        for src, loc in image_locations.items():
            with slow_down(1):
                print("Downloading from %s => %s" % (src, loc))
        return sorted(image_locations.values())


class CreateNetworkTpl(task.Task):
    """Generates the network settings file to be placed in the images."""
    SYSCONFIG_CONTENTS = """DEVICE=eth%s
BOOTPROTO=static
IPADDR=%s
ONBOOT=yes"""

    def __init__(self, name):
        super(CreateNetworkTpl, self).__init__(provides='network_settings',
                                               name=name)

    def execute(self, ips):
        settings = []
        for i, ip in enumerate(ips):
            settings.append(self.SYSCONFIG_CONTENTS % (i, ip))
        return settings


class AllocateIP(task.Task):
    """Allocates the ips for the given vm."""
    def __init__(self, name):
        super(AllocateIP, self).__init__(provides='ips', name=name)

    def execute(self, vm_spec):
        ips = []
        for i in range(0, vm_spec.get('ips', 0)):
            ips.append("192.168.0.%s" % (random.randint(1, 254)))
        return ips


class WriteNetworkSettings(task.Task):
    """Writes all the network settings into the downloaded images."""
    def execute(self, download_paths, network_settings):
        for j, path in enumerate(download_paths):
            with slow_down(1):
                print("Mounting %s to /tmp/%s" % (path, j))
            for i, setting in enumerate(network_settings):
                filename = ("/tmp/etc/sysconfig/network-scripts/"
                            "ifcfg-eth%s" % (i))
                with slow_down(1):
                    print("Writing to %s" % (filename))
                    print(setting)


class BootVM(task.Task):
    """Fires off the vm boot operation."""
    def execute(self, vm_spec):
        print("Starting vm!")
        with slow_down(1):
            print("Created: %s" % (vm_spec))


class AllocateVolumes(task.Task):
    """Allocates the volumes for the vm."""
    def execute(self, vm_spec):
        volumes = []
        for i in range(0, vm_spec['volumes']):
            with slow_down(1):
                volumes.append("/dev/vda%s" % (i + 1))
                print("Allocated volume %s" % volumes[-1])
        return volumes


class FormatVolumes(task.Task):
    """Formats the volumes for the vm."""
    def execute(self, volumes):
        for v in volumes:
            print("Formatting volume %s" % v)
            with slow_down(1):
                pass
            print("Formatted volume %s" % v)


def create_flow():
    # Setup the set of things to do (mini-nova).
    flow = lf.Flow("root").add(
        PrintText("Starting vm creation.", no_slow=True),
        lf.Flow('vm-maker').add(
            # First create a specification for the final vm to-be.
            DefineVMSpec("define_spec"),
            # This does all the image stuff.
            gf.Flow("img-maker").add(
                LocateImages("locate_images"),
                DownloadImages("download_images"),
            ),
            # This does all the network stuff.
            gf.Flow("net-maker").add(
                AllocateIP("get_my_ips"),
                CreateNetworkTpl("fetch_net_settings"),
                WriteNetworkSettings("write_net_settings"),
            ),
            # This does all the volume stuff.
            gf.Flow("volume-maker").add(
                AllocateVolumes("allocate_my_volumes", provides='volumes'),
                FormatVolumes("volume_formatter"),
            ),
            # Finally boot it all.
            BootVM("boot-it"),
        ),
        # Ya it worked!
        PrintText("Finished vm create.", no_slow=True),
        PrintText("Instance is running!", no_slow=True))
    return flow

eu.print_wrapped("Initializing")

# Setup the persistence & resumption layer.
with eu.get_backend() as backend:
    try:
        book_id, flow_id = sys.argv[2].split("+", 1)
        if not uuidutils.is_uuid_like(book_id):
            book_id = None
        if not uuidutils.is_uuid_like(flow_id):
            flow_id = None
    except (IndexError, ValueError):
        book_id = None
        flow_id = None

    # Set up how we want our engine to run, serial, parallel...
    engine_conf = {
        'engine': 'parallel',
    }
    if e_utils.EVENTLET_AVAILABLE:
        engine_conf['executor'] = e_utils.GreenExecutor(5)

    # Create/fetch a logbook that will track the workflows work.
    book = None
    flow_detail = None
    if all([book_id, flow_id]):
        with contextlib.closing(backend.get_connection()) as conn:
            try:
                book = conn.get_logbook(book_id)
                flow_detail = book.find(flow_id)
            except exc.NotFound:
                pass
    if book is None and flow_detail is None:
        book = p_utils.temporary_log_book(backend)
        engine = engines.load_from_factory(create_flow,
                                           backend=backend, book=book,
                                           engine_conf=engine_conf)
        print("!! Your tracking id is: '%s+%s'" % (book.uuid,
                                                   engine.storage.flow_uuid))
        print("!! Please submit this on later runs for tracking purposes")
    else:
        # Attempt to load from a previously partially completed flow.
        engine = engines.load_from_detail(flow_detail,
                                          backend=backend,
                                          engine_conf=engine_conf)

    # Make me my vm please!
    eu.print_wrapped('Running')
    engine.run()

# How to use.
#
# 1. $ python me.py "sqlite:////tmp/nova.db"
# 2. ctrl-c before this finishes
# 3. Find the tracking id (search for 'Your tracking id is')
# 4. $ python me.py "sqlite:////tmp/nova.db" "$tracking_id"
# 5. Watch it pick up where it left off.
# 6. Profit!

Creating a volume (resumable)

Note

Full source located at resume_volume_create.

import contextlib
import hashlib
import logging
import os
import random
import sys
import time

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from taskflow import engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.utils import persistence_utils as p_utils

import example_utils  # noqa

# INTRO: This example shows how a hierarchy of flows can be used to create a
# pseudo-volume in a reliable & resumable manner using taskflow + a miniature
# version of what cinder does while creating a volume (very miniature).


@contextlib.contextmanager
def slow_down(how_long=0.5):
    try:
        yield how_long
    finally:
        print("** Ctrl-c me please!!! **")
        time.sleep(how_long)


def find_flow_detail(backend, book_id, flow_id):
    # NOTE(harlowja): this is used to attempt to find a given logbook with
    # a given id and a given flow detail inside that logbook; we need this
    # reference so that we can resume the correct flow (as a logbook tracks
    # flows and a flow detail tracks an individual flow).
    #
    # Without a reference to the logbook and the flow details in that logbook
    # we will not know exactly what we should resume and that would mean we
    # can't resume what we don't know.
    with contextlib.closing(backend.get_connection()) as conn:
        lb = conn.get_logbook(book_id)
        return lb.find(flow_id)


class PrintText(task.Task):
    def __init__(self, print_what, no_slow=False):
        content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
        super(PrintText, self).__init__(name="Print: %s" % (content_hash))
        self._text = print_what
        self._no_slow = no_slow

    def execute(self):
        if self._no_slow:
            print("-" * (len(self._text)))
            print(self._text)
            print("-" * (len(self._text)))
        else:
            with slow_down():
                print("-" * (len(self._text)))
                print(self._text)
                print("-" * (len(self._text)))


class CreateSpecForVolumes(task.Task):
    def execute(self):
        volumes = []
        for i in range(0, random.randint(1, 10)):
            volumes.append({
                'type': 'disk',
                'location': "/dev/vda%s" % (i + 1),
            })
        return volumes


class PrepareVolumes(task.Task):
    def execute(self, volume_specs):
        for v in volume_specs:
            with slow_down():
                print("Dusting off your hard drive %s" % (v))
            with slow_down():
                print("Taking a well deserved break.")
            print("Your drive %s has been certified." % (v))


# Setup the set of things to do (mini-cinder).
flow = lf.Flow("root").add(
    PrintText("Starting volume create", no_slow=True),
    gf.Flow('maker').add(
        CreateSpecForVolumes("volume_specs", provides='volume_specs'),
        PrintText("I need a nap, it took me a while to build those specs."),
        PrepareVolumes(),
    ),
    PrintText("Finished volume create", no_slow=True))

# Setup the persistence & resumption layer.
with example_utils.get_backend() as backend:
    try:
        book_id, flow_id = sys.argv[2].split("+", 1)
    except (IndexError, ValueError):
        book_id = None
        flow_id = None

    if not all([book_id, flow_id]):
        # If no 'tracking id' (think a fedex or ups tracking id) is provided
        # then we create one by creating a logbook (where flow details are
        # stored) and creating a flow detail (where flow and task state is
        # stored). The combination of these 2 objects' unique ids (uuids)
        # allows the users of taskflow to reassociate the workflows that were
        # potentially running (and which may have partially completed) back
        # with taskflow so that those workflows can be resumed (or reverted)
        # after a process/thread/engine has failed in some way.
        logbook = p_utils.temporary_log_book(backend)
        flow_detail = p_utils.create_flow_detail(flow, logbook, backend)
        print("!! Your tracking id is: '%s+%s'" % (logbook.uuid,
                                                   flow_detail.uuid))
        print("!! Please submit this on later runs for tracking purposes")
    else:
        flow_detail = find_flow_detail(backend, book_id, flow_id)

    # Load and run.
    engine_conf = {
        'engine': 'serial',
    }
    engine = engines.load(flow,
                          flow_detail=flow_detail,
                          backend=backend,
                          engine_conf=engine_conf)
    engine.run()

# How to use.
#
# 1. $ python me.py "sqlite:////tmp/cinder.db"
# 2. ctrl-c before this finishes
# 3. Find the tracking id (search for 'Your tracking id is')
# 4. $ python me.py "sqlite:////tmp/cinder.db" "$tracking_id"
# 5. Profit!

Running engines via iteration

Note

Full source located at run_by_iter.

import logging
import os
import sys

import six

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)


from taskflow.engines.action_engine import engine
from taskflow.patterns import linear_flow as lf
from taskflow.persistence.backends import impl_memory
from taskflow import task
from taskflow.utils import persistence_utils


# INTRO: This example shows how to run a set of engines at the same time, using
# a single thread of control to iterate over each engine (which causes that
# engine to advance to its next state during each iteration).


class EchoTask(task.Task):
    def execute(self, value):
        print(value)
        return chr(ord(value) + 1)


def make_alphabet_flow(i):
    f = lf.Flow("alphabet_%s" % (i))
    start_value = 'A'
    end_value = 'Z'
    curr_value = start_value
    while ord(curr_value) <= ord(end_value):
        next_value = chr(ord(curr_value) + 1)
        if curr_value != end_value:
            f.add(EchoTask(name="echoer_%s" % curr_value,
                           rebind={'value': curr_value},
                           provides=next_value))
        else:
            f.add(EchoTask(name="echoer_%s" % curr_value,
                           rebind={'value': curr_value}))
        curr_value = next_value
    return f


# Adjust this number to change how many engines/flows run at once.
flow_count = 1
flows = []
for i in range(0, flow_count):
    flows.append(make_alphabet_flow(i + 1))
be = impl_memory.MemoryBackend({})
book = persistence_utils.temporary_log_book(be)
engines = []
for f in flows:
    fd = persistence_utils.create_flow_detail(f, book, be)
    e = engine.SingleThreadedActionEngine(f, fd, be, {})
    e.compile()
    e.storage.inject({'A': 'A'})
    e.prepare()
    engines.append(e)
engine_iters = []
for e in engines:
    engine_iters.append(e.run_iter())
while engine_iters:
    for it in list(engine_iters):
        try:
            print(six.next(it))
        except StopIteration:
            engine_iters.remove(it)
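
The essential building block above is the engine's run_iter() method. A
minimal single-engine sketch (reusing the same load/compile/prepare sequence
seen in the other examples) that shows what iterating one engine looks like on
its own:

from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow import task


class Hello(task.Task):
    def execute(self):
        print("hello")


eng = engines.load(linear_flow.Flow("minimal").add(Hello()))
eng.compile()
eng.prepare()
# run_iter() yields one value per engine state transition instead of blocking
# until completion, which is what lets the loop above interleave many engines
# within a single thread of control.
for transition in eng.run_iter():
    print(transition)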

Controlling retries using a retry controller

Note

Full source located at retry_flow

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import retry
from taskflow import task

# INTRO: In this example we create a retry controller that receives a phone
# directory and tries different phone numbers. The next task tries to call Jim
# using the given number. If it is not Jim's number, the task raises an
# exception and the retry controller takes the next number from the phone
# directory and retries the call.
#
# This example shows a basic usage of retry controllers in a flow.
# Retry controllers allow reverting and retrying a failed subflow with new
# parameters.


class CallJim(task.Task):
    def execute(self, jim_number):
        print ("Calling jim %s." % jim_number)
        if jim_number != 555:
            raise Exception("Wrong number!")
        else:
            print ("Hello Jim!")

    def revert(self, jim_number, **kwargs):
        print ("Wrong number, apologizing.")


# Create your flow and associated tasks (the work to be done).
flow = lf.Flow('retrying-linear',
               retry=retry.ParameterizedForEach(
                   rebind=['phone_directory'],
                   provides='jim_number')).add(CallJim())

# Now run that flow using the provided initial data (store below).
taskflow.engines.run(flow, store={'phone_directory': [333, 444, 555, 666]})
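
Since the retry controller guards the whole subflow it is attached to, every
task added to that flow is reverted and re-run together when any one of them
fails. A hedged sketch (the second task and both task names below are
hypothetical, added only for illustration) of what a multi-task retrying
subflow would look like:

flow = lf.Flow('retrying-linear',
               retry=retry.ParameterizedForEach(
                   rebind=['phone_directory'],
                   provides='jim_number')).add(
    CallJim('call-jim'),
    # Hypothetical second task; it receives the same 'jim_number' value and is
    # reverted together with the first one whenever a wrong number is tried.
    CallJim('call-jim-again'),
)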

Distributed execution (simple)

Note

Full source located at wbe_simple_linear

import json
import logging
import os
import sys
import tempfile
import threading

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.engines.worker_based import worker
from taskflow.patterns import linear_flow as lf
from taskflow.tests import utils

import example_utils  # noqa

# INTRO: This example walks through a miniature workflow which shows how to
# start up a number of workers (these workers will process task execution and
# reversion requests using any provided input data), build a flow out of tasks
# that those workers are *capable* of running (the engine can not request
# tasks the workers are unable to run; doing so will end in failure), and then
# execute that workflow seamlessly by having the workers perform the actual
# execution.
#
# NOTE(harlowja): this example simulates the expected larger number of workers
# by using a set of threads (which in this example simulate the remote workers
# that would typically be running on other external machines).

# A filesystem can also be used as the queue transport (useful as a simple
# transport type that does not involve setting up a larger mq system). If this
# is false then the memory transport is used instead; both work in standalone
# setups.
USE_FILESYSTEM = False
BASE_SHARED_CONF = {
    'exchange': 'taskflow',
}

# Until https://github.com/celery/kombu/issues/398 is resolved it is not
# recommended to run many worker threads in this example due to the types
# of errors mentioned in that issue.
MEMORY_WORKERS = 2
FILE_WORKERS = 1
WORKER_CONF = {
    # These are the tasks the worker can execute; they *must* be importable.
    # Typically this list is used to restrict what workers may execute to
    # a smaller set of *allowed* tasks that are known to be safe (one would
    # not want to allow arbitrary python code to be executed).
    'tasks': [
        'taskflow.tests.utils:TaskOneArgOneReturn',
        'taskflow.tests.utils:TaskMultiArgOneReturn'
    ],
}
ENGINE_CONF = {
    'engine': 'worker-based',
}


def run(engine_conf):
    flow = lf.Flow('simple-linear').add(
        utils.TaskOneArgOneReturn(provides='result1'),
        utils.TaskMultiArgOneReturn(provides='result2')
    )
    eng = engines.load(flow,
                       store=dict(x=111, y=222, z=333),
                       engine_conf=engine_conf)
    eng.run()
    return eng.storage.fetch_all()


if __name__ == "__main__":
    logging.basicConfig(level=logging.ERROR)

    # Setup our transport configuration and merge it into the worker and
    # engine configuration so that both of those use it correctly.
    shared_conf = dict(BASE_SHARED_CONF)

    tmp_path = None
    if USE_FILESYSTEM:
        worker_count = FILE_WORKERS
        tmp_path = tempfile.mkdtemp(prefix='wbe-example-')
        shared_conf.update({
            'transport': 'filesystem',
            'transport_options': {
                'data_folder_in': tmp_path,
                'data_folder_out': tmp_path,
                'polling_interval': 0.1,
            },
        })
    else:
        worker_count = MEMORY_WORKERS
        shared_conf.update({
            'transport': 'memory',
            'transport_options': {
                'polling_interval': 0.1,
            },
        })
    worker_conf = dict(WORKER_CONF)
    worker_conf.update(shared_conf)
    engine_conf = dict(ENGINE_CONF)
    engine_conf.update(shared_conf)
    workers = []
    worker_topics = []

    try:
        # Create a set of workers to simulate actual remote workers.
        print('Running %s workers.' % (worker_count))
        for i in range(0, worker_count):
            worker_conf['topic'] = 'worker-%s' % (i + 1)
            worker_topics.append(worker_conf['topic'])
            w = worker.Worker(**worker_conf)
            runner = threading.Thread(target=w.run)
            runner.daemon = True
            runner.start()
            w.wait()
            workers.append((runner, w.stop))

        # Now use those workers to do something.
        print('Executing some work.')
        engine_conf['topics'] = worker_topics
        result = run(engine_conf)
        print('Execution finished.')
        # This is done so that the test examples can work correctly
        # even when the keys change order (which will happen in various
        # python versions).
        print("Result = %s" % json.dumps(result, sort_keys=True))
    finally:
        # And cleanup.
        print('Stopping workers.')
        while workers:
            r, stopper = workers.pop()
            stopper()
            r.join()
        if tmp_path:
            example_utils.rm_path(tmp_path)
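
On the worker side, all of the configuration merging above boils down to
constructing a Worker with an exchange, a topic, the list of importable tasks
it may run and a transport. A minimal sketch of a single in-memory worker (the
topic name below is hypothetical):

import threading

from taskflow.engines.worker_based import worker

w = worker.Worker(exchange='taskflow',
                  topic='worker-1',
                  tasks=['taskflow.tests.utils:TaskOneArgOneReturn'],
                  transport='memory',
                  transport_options={'polling_interval': 0.1})
runner = threading.Thread(target=w.run)
runner.daemon = True
runner.start()
w.wait()  # as in the example above, wait for the worker to come up

# ... point a worker-based engine at the 'worker-1' topic here ...

w.stop()
runner.join()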

Distributed mandelbrot (complex)

Note

Full source located at wbe_mandelbrot

Output

Generated mandelbrot fractal

Code

import logging
import math
import os
import sys
import threading

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from six.moves import range as compat_range

from taskflow import engines
from taskflow.engines.worker_based import worker
from taskflow.patterns import unordered_flow as uf
from taskflow import task

# INTRO: This example walks through a workflow that will in parallel compute
# a mandelbrot result set (using X 'remote' workers) and then combine their
# results together to form a final mandelbrot fractal image. It shows taskflow
# being used on a well-known embarrassingly parallel problem that has the
# added benefit of also producing an elegant visualization.
#
# NOTE(harlowja): this example simulates the expected larger number of workers
# by using a set of threads (which in this example simulate the remote workers
# that would typically be running on other external machines).
#
# NOTE(harlowja): to have it produce an image run (after installing pillow):
#
# $ python taskflow/examples/wbe_mandelbrot.py output.png

BASE_SHARED_CONF = {
    'exchange': 'taskflow',
}
WORKERS = 2
WORKER_CONF = {
    # These are the tasks the worker can execute; they *must* be importable.
    # Typically this list is used to restrict what workers may execute to
    # a smaller set of *allowed* tasks that are known to be safe (one would
    # not want to allow arbitrary python code to be executed).
    'tasks': [
        '%s:MandelCalculator' % (__name__),
    ],
}
ENGINE_CONF = {
    'engine': 'worker-based',
}

# Mandelbrot & image settings...
IMAGE_SIZE = (512, 512)
CHUNK_COUNT = 8
MAX_ITERATIONS = 25


class MandelCalculator(task.Task):
    def execute(self, image_config, mandelbrot_config, chunk):
        """Returns the number of iterations before the computation "escapes".

        Given the real and imaginary parts of a complex number, determine if it
        is a candidate for membership in the mandelbrot set given a fixed
        number of iterations.
        """

        # Parts borrowed from (credit to mark harris and benoît mandelbrot).
        #
        # http://nbviewer.ipython.org/gist/harrism/f5707335f40af9463c43
        def mandelbrot(x, y, max_iters):
            c = complex(x, y)
            z = 0.0j
            for i in compat_range(max_iters):
                z = z * z + c
                if (z.real * z.real + z.imag * z.imag) >= 4:
                    return i
            return max_iters

        min_x, max_x, min_y, max_y, max_iters = mandelbrot_config
        height, width = image_config['size']
        pixel_size_x = (max_x - min_x) / width
        pixel_size_y = (max_y - min_y) / height
        block = []
        for y in compat_range(chunk[0], chunk[1]):
            row = []
            imag = min_y + y * pixel_size_y
            for x in compat_range(0, width):
                real = min_x + x * pixel_size_x
                row.append(mandelbrot(real, imag, max_iters))
            block.append(row)
        return block


def calculate(engine_conf):
    # Subdivide the work into X pieces, then request each worker to calculate
    # one of those chunks and then later we will write these chunks out to
    # an image bitmap file.

    # An unordered flow is used here since the mandelbrot calculation is an
    # example of an embarrassingly parallel computation that we can scatter
    # across as many workers as possible.
    flow = uf.Flow("mandelbrot")

    # These symbols will be automatically given to tasks as input to their
    # execute method, in this case these are constants used in the mandelbrot
    # calculation.
    store = {
        'mandelbrot_config': [-2.0, 1.0, -1.0, 1.0, MAX_ITERATIONS],
        'image_config': {
            'size': IMAGE_SIZE,
        }
    }

    # We need the task names to be in the right order so that we can extract
    # the final results in the right order (we don't care about the order when
    # executing).
    task_names = []

    # Compose our workflow.
    height, width = IMAGE_SIZE
    chunk_size = int(math.ceil(height / float(CHUNK_COUNT)))
    for i in compat_range(0, CHUNK_COUNT):
        chunk_name = 'chunk_%s' % i
        task_name = "calculation_%s" % i
        # Break the calculation up into chunk size pieces.
        rows = [i * chunk_size, i * chunk_size + chunk_size]
        flow.add(
            MandelCalculator(task_name,
                             # This ensures the storage symbol with name
                             # 'chunk_name' is sent into the task's local
                             # symbol 'chunk'. This is how we give each
                             # calculator its own correct sequence of rows
                             # to work on.
                             rebind={'chunk': chunk_name}))
        store[chunk_name] = rows
        task_names.append(task_name)

    # Now execute it.
    eng = engines.load(flow, store=store, engine_conf=engine_conf)
    eng.run()

    # Gather all the results and order them for further processing.
    gather = []
    for name in task_names:
        gather.extend(eng.storage.get(name))
    points = []
    for y, row in enumerate(gather):
        for x, color in enumerate(row):
            points.append(((x, y), color))
    return points


def write_image(results, output_filename=None):
    print("Gathered %s results that represents a mandelbrot"
          " image (using %s chunks that are computed jointly"
          " by %s workers)." % (len(results), CHUNK_COUNT, WORKERS))
    if not output_filename:
        return

    # Pillow (the PIL fork) saves us from writing our own image writer...
    try:
        from PIL import Image
    except ImportError as e:
        # To currently get this (may change in the future),
        # $ pip install Pillow
        raise RuntimeError("Pillow is required to write image files: %s" % e)

    # Limit to 255, find the max and normalize to that...
    color_max = 0
    for _point, color in results:
        color_max = max(color, color_max)

    # Use gray scale since we don't really have other colors.
    img = Image.new('L', IMAGE_SIZE, "black")
    pixels = img.load()
    for (x, y), color in results:
        if color_max == 0:
            color = 0
        else:
            color = int((float(color) / color_max) * 255.0)
        pixels[x, y] = color
    img.save(output_filename)


def create_fractal():
    logging.basicConfig(level=logging.ERROR)

    # Setup our transport configuration and merge it into the worker and
    # engine configuration so that both of those use it correctly.
    shared_conf = dict(BASE_SHARED_CONF)
    shared_conf.update({
        'transport': 'memory',
        'transport_options': {
            'polling_interval': 0.1,
        },
    })

    if len(sys.argv) >= 2:
        output_filename = sys.argv[1]
    else:
        output_filename = None

    worker_conf = dict(WORKER_CONF)
    worker_conf.update(shared_conf)
    engine_conf = dict(ENGINE_CONF)
    engine_conf.update(shared_conf)
    workers = []
    worker_topics = []

    print('Calculating your mandelbrot fractal of size %sx%s.' % IMAGE_SIZE)
    try:
        # Create a set of workers to simulate actual remote workers.
        print('Running %s workers.' % (WORKERS))
        for i in compat_range(0, WORKERS):
            worker_conf['topic'] = 'calculator_%s' % (i + 1)
            worker_topics.append(worker_conf['topic'])
            w = worker.Worker(**worker_conf)
            runner = threading.Thread(target=w.run)
            runner.daemon = True
            runner.start()
            w.wait()
            workers.append((runner, w.stop))

        # Now use those workers to do something.
        engine_conf['topics'] = worker_topics
        results = calculate(engine_conf)
        print('Execution finished.')
    finally:
        # And cleanup.
        print('Stopping workers.')
        while workers:
            r, stopper = workers.pop()
            stopper()
            r.join()
    print("Writing image...")
    write_image(results, output_filename=output_filename)


if __name__ == "__main__":
    create_fractal()
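
With the defaults above (a 512x512 image split into 8 chunks) the chunking
arithmetic in calculate() hands each worker task a 64-row band of the image.
A quick standalone check of those row ranges:

import math

height = 512       # IMAGE_SIZE height
chunk_count = 8    # CHUNK_COUNT
chunk_size = int(math.ceil(height / float(chunk_count)))  # 64 rows per chunk
for i in range(chunk_count):
    print("chunk_%s -> rows %s" % (i, [i * chunk_size, i * chunk_size + chunk_size]))
# chunk_0 covers rows [0, 64) and chunk_7 covers rows [448, 512).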

Jobboard producer/consumer (simple)

Note

Full source located at jobboard_produce_consume_colors

import collections
import contextlib
import logging
import os
import random
import sys
import threading
import time

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from six.moves import range as compat_range
from zake import fake_client

from taskflow import exceptions as excp
from taskflow.jobs import backends

# In this example we show how a jobboard can be used to post work for other
# entities to work on. This example creates a set of jobs using one producer
# thread (typically this would be split across many machines) and then has
# other worker threads, each with their own jobboard, select work using a
# given filter [red/blue] and then perform that work (consuming or abandoning
# the job after it has been completed or failed).

# Things to note:
# - No persistence layer (or logbook) is used; just the job details are used
#   to determine whether a job should be selected by a worker or not.
# - This example runs in a single process (this is expected to be atypical
#   but this example shows that it can be done if needed, for testing...)
# - The iterjobs(), claim(), consume()/abandon() worker workflow.
# - The post() producer workflow.

SHARED_CONF = {
    'path': "/taskflow/jobs",
    'board': 'zookeeper',
}

# How many workers and producers of work will be created (as threads).
PRODUCERS = 3
WORKERS = 5

# How many units of work each producer will create.
PRODUCER_UNITS = 10

# How many units of work are expected to be produced (used so workers can
# know when to stop running and shutdown; typically this would not be a
# known value but we have to limit this example's execution time to be less
# than infinity).
EXPECTED_UNITS = PRODUCER_UNITS * PRODUCERS

# Delay between producing/consuming more work.
WORKER_DELAY, PRODUCER_DELAY = (0.5, 0.5)

# To ensure threads don't trample other threads output.
STDOUT_LOCK = threading.Lock()


def dispatch_work(job):
    # This is where the job's contained work *would* be done
    time.sleep(1.0)


def safe_print(name, message, prefix=""):
    with STDOUT_LOCK:
        if prefix:
            print("%s %s: %s" % (prefix, name, message))
        else:
            print("%s: %s" % (name, message))


def worker(ident, client, consumed):
    # Create a personal board (using the same client so that it works in
    # the same process) and start looking for jobs on the board that we want
    # to perform.
    name = "W-%s" % (ident)
    safe_print(name, "started")
    claimed_jobs = 0
    consumed_jobs = 0
    abandoned_jobs = 0
    with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
        while len(consumed) != EXPECTED_UNITS:
            favorite_color = random.choice(['blue', 'red'])
            for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
                # See if we should even bother with it...
                if job.details.get('color') != favorite_color:
                    continue
                safe_print(name, "'%s' [attempting claim]" % (job))
                try:
                    board.claim(job, name)
                    claimed_jobs += 1
                    safe_print(name, "'%s' [claimed]" % (job))
                except (excp.NotFound, excp.UnclaimableJob):
                    safe_print(name, "'%s' [claim unsuccessful]" % (job))
                else:
                    try:
                        dispatch_work(job)
                        board.consume(job, name)
                        safe_print(name, "'%s' [consumed]" % (job))
                        consumed_jobs += 1
                        consumed.append(job)
                    except Exception:
                        board.abandon(job, name)
                        abandoned_jobs += 1
                        safe_print(name, "'%s' [abandoned]" % (job))
            time.sleep(WORKER_DELAY)
    safe_print(name,
               "finished (claimed %s jobs, consumed %s jobs,"
               " abandoned %s jobs)" % (claimed_jobs, consumed_jobs,
                                        abandoned_jobs), prefix=">>>")


def producer(ident, client):
    # Create a personal board (using the same client so that it works in
    # the same process) and start posting jobs on the board that we want
    # some entity to perform.
    name = "P-%s" % (ident)
    safe_print(name, "started")
    with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
        for i in compat_range(0, PRODUCER_UNITS):
            job_name = "%s-%s" % (name, i)
            details = {
                'color': random.choice(['red', 'blue']),
            }
            job = board.post(job_name, book=None, details=details)
            safe_print(name, "'%s' [posted]" % (job))
            time.sleep(PRODUCER_DELAY)
    safe_print(name, "finished", prefix=">>>")


def main():
    with contextlib.closing(fake_client.FakeClient()) as c:
        created = []
        for i in compat_range(0, PRODUCERS):
            p = threading.Thread(target=producer, args=(i + 1, c))
            p.daemon = True
            created.append(p)
            p.start()
        consumed = collections.deque()
        for i in compat_range(0, WORKERS):
            w = threading.Thread(target=worker, args=(i + 1, c, consumed))
            w.daemon = True
            created.append(w)
            w.start()
        while created:
            t = created.pop()
            t.join()
        # At the end there should be nothing leftover, let's verify that.
        board = backends.fetch('verifier', SHARED_CONF.copy(), client=c)
        board.connect()
        with contextlib.closing(board):
            if board.job_count != 0 or len(consumed) != EXPECTED_UNITS:
                return 1
            return 0


if __name__ == "__main__":
    sys.exit(main())
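
Stripped of the threads and the bookkeeping, the jobboard interaction above
reduces to the post/iterjobs/claim/consume calls. A minimal single-process
sketch reusing the same configuration and fake zookeeper client:

import contextlib

from zake import fake_client

from taskflow.jobs import backends

conf = {'path': "/taskflow/jobs", 'board': 'zookeeper'}
with contextlib.closing(fake_client.FakeClient()) as client:
    # Post a single job on one board...
    with backends.backend('poster', conf.copy(), client=client) as board:
        board.post('example-job', book=None, details={'color': 'red'})
    # ...then claim and consume it from another board on the same client.
    with backends.backend('consumer', conf.copy(), client=client) as board:
        for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
            board.claim(job, 'consumer')
            board.consume(job, 'consumer')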