Types

Note

Even though these types are made for public consumption and usage should be encouraged/easily possible it should be noted that these may be moved out to new libraries at various points in the future. If you are using these types without using the rest of this library it is strongly encouraged that you be a vocal proponent of getting these made into isolated libraries (as using these types in this manner is not the expected and/or desired usage).

Entity

class taskflow.types.entity.Entity(kind, name, metadata)[source]

Bases: object

Entity object that identifies some resource/item/other.

Variables:
  • kindimmutable type/kind that identifies this entity (typically unique to a library/application)
  • nameimmutable name that can be used to uniquely identify this entity among many other entities
  • metadataimmutable dictionary of metadata that is associated with this entity (and typically has keys/values that further describe this entity)

Failure

Graph

class taskflow.types.graph.Graph(data=None, name='')[source]

Bases: networkx.classes.graph.Graph

A graph subclass with useful utility functions.

freeze()[source]

Freezes the graph so that no more mutations can occur.

export_to_dot()[source]

Exports the graph to a dot format (requires pydot library).

pformat()[source]

Pretty formats your graph into a string.

class taskflow.types.graph.DiGraph(data=None, name='')[source]

Bases: networkx.classes.digraph.DiGraph

A directed graph subclass with useful utility functions.

freeze()[source]

Freezes the graph so that no more mutations can occur.

get_edge_data(u, v, default=None)[source]

Returns a copy of the edge attribute dictionary between (u, v).

NOTE(harlowja): this differs from the networkx get_edge_data() as that function does not return a copy (but returns a reference to the actual edge data).

topological_sort()[source]

Return a list of nodes in this graph in topological sort order.

pformat()[source]

Pretty formats your graph into a string.

This pretty formatted string representation includes many useful details about your graph, including; name, type, frozeness, node count, nodes, edge count, edges, graph density and graph cycles (if any).

export_to_dot()[source]

Exports the graph to a dot format (requires pydot library).

is_directed_acyclic()[source]

Returns if this graph is a DAG or not.

no_successors_iter()[source]

Returns an iterator for all nodes with no successors.

no_predecessors_iter()[source]

Returns an iterator for all nodes with no predecessors.

bfs_predecessors_iter(n)[source]

Iterates breadth first over all predecessors of a given node.

This will go through the nodes predecessors, then the predecessor nodes predecessors and so on until no more predecessors are found.

NOTE(harlowja): predecessor cycles (if they exist) will not be iterated over more than once (this prevents infinite iteration).

class taskflow.types.graph.OrderedDiGraph(data=None, name='')[source]

Bases: taskflow.types.graph.DiGraph

A directed graph subclass with useful utility functions.

This derivative retains node, edge, insertation and iteration ordering (so that the iteration order matches the insertation order).

node_dict_factory

alias of OrderedDict

adjlist_dict_factory

alias of OrderedDict

edge_attr_dict_factory

alias of OrderedDict

taskflow.types.graph.merge_graphs(graph, *graphs, **kwargs)[source]

Merges a bunch of graphs into a new graph.

If no additional graphs are provided the first graph is returned unmodified otherwise the merged graph is returned.

Notifier

class taskflow.types.notifier.Listener(callback, args=None, kwargs=None, details_filter=None)[source]

Bases: object

Immutable helper that represents a notification listener/target.

callback

Callback (can not be none) to call with event + details.

details_filter

Callback (may be none) to call to discard events + details.

kwargs

Dictionary of keyword arguments to use in future calls.

args

Tuple of positional arguments to use in future calls.

__call__(event_type, details)[source]

Activate the target callback with the given event + details.

NOTE(harlowja): if a details filter callback exists and it returns a falsey value when called with the provided details, then the target callback will not be called.

is_equivalent(callback, details_filter=None)[source]

Check if the callback is same

Parameters:
  • callback – callback used for comparison
  • details_filter – callback used for comparison
Returns:

false if not the same callback, otherwise true

Return type:

boolean

class taskflow.types.notifier.Notifier[source]

Bases: object

A notification (pub/sub like) helper class.

It is intended to be used to subscribe to notifications of events occurring as well as allow a entity to post said notifications to any associated subscribers without having either entity care about how this notification occurs.

Not thread-safe when a single notifier is mutated at the same time by multiple threads. For example having multiple threads call into register() or reset() at the same time could potentially end badly. It is thread-safe when only notify() calls or other read-only actions (like calling into is_registered()) are occuring at the same time.

RESERVED_KEYS = ('details',)

Keys that can not be used in callbacks arguments

ANY = '*'

Kleene star constant that is used to recieve all notifications

is_registered(event_type, callback, details_filter=None)[source]

Check if a callback is registered.

Returns:checks if the callback is registered
Return type:boolean
reset()[source]

Forget all previously registered callbacks.

notify(event_type, details)[source]

Notify about event occurrence.

All callbacks registered to receive notifications about given event type will be called. If the provided event type can not be used to emit notifications (this is checked via the can_be_registered() method) then it will silently be dropped (notification failures are not allowed to cause or raise exceptions).

Parameters:
  • event_type – event type that occurred
  • details (dictionary) – additional event details dictionary passed to callback keyword argument with the same name
register(event_type, callback, args=None, kwargs=None, details_filter=None)[source]

Register a callback to be called when event of a given type occurs.

Callback will be called with provided args and kwargs and when event type occurs (or on any event if event_type equals to ANY). It will also get additional keyword argument, details, that will hold event details provided to the notify() method (if a details filter callback is provided then the target callback will only be triggered if the details filter callback returns a truthy value).

Parameters:
  • event_type – event type input
  • callback – function callback to be registered.
  • args (list) – non-keyworded arguments
  • kwargs (dictionary) – key-value pair arguments
deregister(event_type, callback, details_filter=None)[source]

Remove a single listener bound to event event_type.

Parameters:event_type – deregister listener bound to event_type
deregister_event(event_type)[source]

Remove a group of listeners bound to event event_type.

Parameters:event_type – deregister listeners bound to event_type
listeners_iter()[source]

Return an iterator over the mapping of event => listeners bound.

NOTE(harlowja): Each listener in the yielded (event, listeners) tuple is an instance of the Listener type, which itself wraps a provided callback (and its details filter callback, if any).

can_be_registered(event_type)[source]

Checks if the event can be registered/subscribed to.

can_trigger_notification(event_type)[source]

Checks if the event can trigger a notification.

Parameters:event_type – event that needs to be verified
Returns:whether the event can trigger a notification
Return type:boolean
class taskflow.types.notifier.RestrictedNotifier(watchable_events, allow_any=True)[source]

Bases: taskflow.types.notifier.Notifier

A notification class that restricts events registered/triggered.

NOTE(harlowja): This class unlike Notifier restricts and disallows registering callbacks for event types that are not declared when constructing the notifier.

events_iter()[source]

Returns iterator of events that can be registered/subscribed to.

NOTE(harlowja): does not include back the ANY event type as that meta-type is not a specific event but is a capture-all that does not imply the same meaning as specific event types.

can_be_registered(event_type)[source]

Checks if the event can be registered/subscribed to.

Parameters:event_type – event that needs to be verified
Returns:whether the event can be registered/subscribed to
Return type:boolean
taskflow.types.notifier.register_deregister(*args, **kwds)[source]

Context manager that registers a callback, then deregisters on exit.

NOTE(harlowja): if the callback is none, then this registers nothing, which
is different from the behavior of the register method which will not accept none as it is not callable...

Sets

class taskflow.types.sets.OrderedSet(iterable=None)[source]

Bases: _abcoll.Set, _abcoll.Hashable

A read-only hashable set that retains insertion/initial ordering.

It should work in all existing places that frozenset is used.

See: https://mail.python.org/pipermail/python-ideas/2009-May/004567.html for an idea thread that may eventually (someday) result in this (or similar) code being included in the mainline python codebase (although the end result of that thread is somewhat discouraging in that regard).

copy()[source]

Return a shallow copy of a set.

intersection(*sets)[source]

Return the intersection of two or more sets as a new set.

(i.e. elements that are common to all of the sets.)

issuperset(other)[source]

Report whether this set contains another set.

issubset(other)[source]

Report whether another set contains this set.

difference(*sets)[source]

Return the difference of two or more sets as a new set.

(i.e. all elements that are in this set but not the others.)

union(*sets)[source]

Return the union of sets as a new set.

(i.e. all elements that are in either set.)

Timing

class taskflow.types.timing.Timeout(value, event_factory=<function Event>)[source]

Bases: object

An object which represents a timeout.

This object has the ability to be interrupted before the actual timeout is reached.

value

Immutable value of the internally used timeout.

interrupt()[source]

Forcefully set the timeout (releases any waiters).

is_stopped()[source]

Returns if the timeout has been interrupted.

wait()[source]

Block current thread (up to timeout) and wait until interrupted.

reset()[source]

Reset so that interruption (and waiting) can happen again.

taskflow.types.timing.convert_to_timeout(value=None, default_value=None, event_factory=<function Event>)[source]

Converts a given value to a timeout instance (and returns it).

Does nothing if the value provided is already a timeout instance.

Tree