runway.cfngin.dag package

CFNgin directed acyclic graph (DAG) implementation.

class runway.cfngin.dag.DAG[source]

Bases: object

Directed acyclic graph implementation.

__init__() None[source]

Instantiate a new DAG with no nodes or edges.

__len__() int[source]

How the length of a DAG is calculated.

add_edge(ind_node: str, dep_node: str) None[source]

Add an edge (dependency) between the specified nodes.

Parameters:
  • ind_node – The independent node to add an edge to.

  • dep_node – The dependent node that has a dependency on the ind_node.

Raises:
add_node(node_name: str) None[source]

Add a node if it does not exist yet, or error out.

Parameters:

node_name – The unique name of the node to add.

Raises:

KeyError – Raised if a node with the same name already exist in the graph

add_node_if_not_exists(node_name: str) None[source]

Add a node if it does not exist yet, ignoring duplicates.

Parameters:

node_name – The name of the node to add.

all_downstreams(node: str) list[str][source]

Return a list of all nodes downstream in topological order.

Parameters:

node – The node whose downstream nodes you want to find.

Returns:

A list of nodes that are downstream from the node.

all_leaves() list[str][source]

Return a list of all leaves (nodes with no downstreams).

delete_edge(ind_node: str, dep_node: str) None[source]

Delete an edge from the graph.

Parameters:
  • ind_node – The independent node to delete an edge from.

  • dep_node – The dependent node that has a dependency on the ind_node.

Raises:

KeyError – Raised when the edge doesn’t already exist.

delete_node(node_name: str) None[source]

Delete this node and all edges referencing it.

Parameters:

node_name – The name of the node to delete.

Raises:

KeyError – Raised if the node does not exist in the graph.

delete_node_if_exists(node_name: str) None[source]

Delete this node and all edges referencing it.

Ignores any node that is not in the graph, rather than throwing an exception.

Parameters:

node_name – The name of the node to delete.

downstream(node: str) list[str][source]

Return a list of all nodes this node has edges towards.

Parameters:

node – The node whose downstream nodes you want to find.

Returns:

A list of nodes that are immediately downstream from the node.

filter(nodes: list[str]) DAG[source]

Return a new DAG with only the given nodes and their dependencies.

Parameters:

nodes – The nodes you are interested in.

from_dict(graph_dict: dict[str, Iterable[str] | Any]) None[source]

Reset the graph and build it from the passed dictionary.

The dictionary takes the form of {node_name: [directed edges]}

Parameters:

graph_dict – The dictionary used to create the graph.

Raises:

TypeError – Raised if the value of items in the dict are not lists.

ind_nodes() list[str][source]

Return a list of all nodes in the graph with no dependencies.

predecessors(node: str) list[str][source]

Return a list of all immediate predecessors of the given node.

Parameters:

node (str) – The node whose predecessors you want to find.

Returns:

A list of nodes that are immediate predecessors to node.

Return type:

list[str]

rename_edges(old_node_name: str, new_node_name: str) None[source]

Change references to a node in existing edges.

Parameters:
  • old_node_name – The old name for the node.

  • new_node_name – The new name for the node.

reset_graph() None[source]

Restore the graph to an empty state.

size() int[source]

Count of nodes in the graph.

topological_sort() list[str][source]

Return a topological ordering of the DAG.

Raises:

ValueError – Raised if the graph is not acyclic.

transitive_reduction() None[source]

Perform a transitive reduction on the DAG.

The transitive reduction of a graph is a graph with as few edges as possible with the same reachability as the original graph.

See https://en.wikipedia.org/wiki/Transitive_reduction

transpose() DAG[source]

Build a new graph with the edges reversed.

validate() tuple[bool, str][source]

Return (Boolean, message) of whether DAG is valid.

walk(walk_func: Callable[[str], Any]) None[source]

Walk each node of the graph in reverse topological order.

This can be used to perform a set of operations, where the next operation depends on the previous operation. It’s important to note that walking happens serially, and is not parallelized.

Parameters:

walk_func – The function to be called on each node of the graph.

exception runway.cfngin.dag.DAGValidationError[source]

Bases: Exception

Raised when DAG validation fails.

class runway.cfngin.dag.ThreadedWalker[source]

Bases: object

Walk a DAG as quickly as the graph topology allows, using threads.

__init__(semaphore: threading.Semaphore | UnlimitedSemaphore) None[source]

Instantiate class.

Parameters:

semaphore – A semaphore object which can be used to control how many steps are executed in parallel.

walk(dag: DAG, walk_func: Callable[[str], Any]) None[source]

Walk each node of the graph, in parallel if it can.

The walk_func is only called when the nodes dependencies have been satisfied.

class runway.cfngin.dag.UnlimitedSemaphore[source]

Bases: object

threading.Semaphore, but acquire always succeeds.

acquire(*args: Any) Any[source]

Do nothing.

release() Any[source]

Do nothing.

runway.cfngin.dag.walk(dag: DAG, walk_func: Callable[[str], Any]) None[source]

Walk a DAG.