runway.cfngin.dag package
CFNgin directed acyclic graph (DAG) implementation.
- class runway.cfngin.dag.DAG
Bases:
object
Directed acyclic graph implementation.
- add_edge(ind_node: str, dep_node: str) -> None
Add an edge (dependency) between the specified nodes.
- Parameters:
ind_node – The independent node to add an edge to.
dep_node – The dependent node that has a dependency on the ind_node.
- Raises:
KeyError – Raised if either ind_node or dep_node does not exist.
DAGValidationError – Raised if the resulting graph is invalid.
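The validation step described above can be illustrated with a small standalone sketch. It is not runway's implementation: the dict-of-sets representation, the `CycleError` class (a stand-in for `DAGValidationError`), and both helper functions are assumptions made for illustration. The edge is added first, the graph is checked for cycles, and the edge is rolled back if the check fails.

```python
class CycleError(Exception):
    """Hypothetical stand-in for DAGValidationError."""


def has_cycle(graph):
    """Return True if the dict-of-sets graph contains a cycle (DFS colouring)."""
    unvisited, in_progress, done = 0, 1, 2
    state = {node: unvisited for node in graph}

    def visit(node):
        state[node] = in_progress
        for nxt in graph[node]:
            if state[nxt] == in_progress:
                return True  # back edge found
            if state[nxt] == unvisited and visit(nxt):
                return True
        state[node] = done
        return False

    return any(state[n] == unvisited and visit(n) for n in graph)


def add_edge(graph, ind_node, dep_node):
    """Add the edge ind_node -> dep_node, undoing it if a cycle appears."""
    if ind_node not in graph or dep_node not in graph:
        raise KeyError("one or more nodes do not exist in the graph")
    graph[ind_node].add(dep_node)
    if has_cycle(graph):
        graph[ind_node].discard(dep_node)
        raise CycleError(f"edge {ind_node} -> {dep_node} creates a cycle")
```

Rolling back on failure keeps the graph valid even when the caller catches the exception and continues.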
- add_node(node_name: str) -> None
Add a node to the graph, raising an error if it already exists.
- Parameters:
node_name – The unique name of the node to add.
- Raises:
KeyError – Raised if a node with the same name already exists in the graph.
- add_node_if_not_exists(node_name: str) -> None
Add a node if it does not exist yet, ignoring duplicates.
- Parameters:
node_name – The name of the node to add.
- all_downstreams(node: str) -> list[str]
Return a list of all nodes downstream in topological order.
- Parameters:
node – The node whose downstream nodes you want to find.
- Returns:
A list of nodes that are downstream from the node.
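As a rough illustration of what "all nodes downstream" means, the sketch below collects every node reachable from a starting node by following edges. It assumes a plain dict-of-sets graph and a hypothetical helper name; it returns a set and leaves out the topological ordering step that the documented method performs.

```python
def all_downstreams(graph, node):
    """Collect every node reachable from `node` by following edges."""
    seen = set()
    stack = [node]
    while stack:
        current = stack.pop()
        for nxt in graph[current]:
            if nxt not in seen:
                seen.add(nxt)
                stack.append(nxt)
    return seen


graph = {"a": {"b"}, "b": {"c"}, "c": set(), "x": {"c"}}
reachable_from_a = all_downstreams(graph, "a")  # contains "b" and "c", not "x"
```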
- delete_edge(ind_node: str, dep_node: str) -> None
Delete an edge from the graph.
- Parameters:
ind_node – The independent node to delete an edge from.
dep_node – The dependent node that has a dependency on the ind_node.
- Raises:
KeyError – Raised if the edge does not exist.
- delete_node(node_name: str) -> None
Delete this node and all edges referencing it.
- Parameters:
node_name – The name of the node to delete.
- Raises:
KeyError – Raised if the node does not exist in the graph.
- delete_node_if_exists(node_name: str) -> None
Delete this node and all edges referencing it.
Ignores any node that is not in the graph, rather than throwing an exception.
- Parameters:
node_name – The name of the node to delete.
- downstream(node: str) -> list[str]
Return a list of all nodes this node has edges towards.
- Parameters:
node – The node whose downstream nodes you want to find.
- Returns:
A list of nodes that are immediately downstream from the node.
- filter(nodes: list[str]) -> DAG
Return a new DAG with only the given nodes and their dependencies.
- Parameters:
nodes – The nodes you are interested in.
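One way to picture filtering is shown below. The sketch assumes a dict-of-sets graph and reads "their dependencies" as every node reachable from the requested nodes by following edges. The function name `filter_graph` is hypothetical, and the result is a plain dict rather than a `DAG` instance.

```python
def filter_graph(graph, nodes):
    """Keep the given nodes plus everything reachable from them."""
    keep = set()
    stack = list(nodes)
    while stack:
        current = stack.pop()
        if current in keep:
            continue
        keep.add(current)
        stack.extend(graph[current])
    # Copy the edge sets so the result is independent of the input graph.
    return {name: set(graph[name]) for name in graph if name in keep}
```

Because every kept node's edges point only at reachable nodes, the filtered graph never references a node that was dropped.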
- from_dict(graph_dict: dict[str, Iterable[str] | Any]) -> None
Reset the graph and build it from the passed dictionary.
The dictionary takes the form of {node_name: [directed edges]}.
- Parameters:
graph_dict – The dictionary used to create the graph.
- Raises:
TypeError – Raised if the values in the dict are not lists.
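The dictionary form can be illustrated with a standalone sketch. The `build_graph` helper and the internal dict-of-sets representation are assumptions for illustration, not the library's code. All nodes are registered first so that edges may refer to nodes that appear later in the dictionary.

```python
def build_graph(graph_dict):
    """Build a dict-of-sets graph from {node_name: [directed edges]}."""
    graph = {name: set() for name in graph_dict}  # register every node first
    for ind_node, dep_nodes in graph_dict.items():
        if not isinstance(dep_nodes, (list, tuple, set)):
            raise TypeError(f"{ind_node}: dict values must be lists")
        for dep_node in dep_nodes:
            if dep_node not in graph:
                raise KeyError(f"{dep_node} is not a node in the graph")
            graph[ind_node].add(dep_node)
    return graph


graph = build_graph({"a": ["b", "c"], "b": ["c"], "c": []})
```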
- predecessors(node: str) -> list[str]
Return a list of all immediate predecessors of the given node.
- rename_edges(old_node_name: str, new_node_name: str) -> None
Change references to a node in existing edges.
- Parameters:
old_node_name – The old name for the node.
new_node_name – The new name for the node.
- topological_sort() -> list[str]
Return a topological ordering of the DAG.
- Raises:
ValueError – Raised if the graph is not acyclic.
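For readers unfamiliar with topological ordering, here is a minimal sketch using Kahn's algorithm on a dict-of-sets graph. The algorithm choice and representation are assumptions and the library may compute the ordering differently. A cycle is detected when some nodes never reach an in-degree of zero.

```python
from collections import deque


def topological_sort(graph):
    """Order nodes so that every edge a -> b places a before b."""
    in_degree = {node: 0 for node in graph}
    for targets in graph.values():
        for target in targets:
            in_degree[target] += 1

    # Start with nodes that have no incoming edges; sort for stable output.
    queue = deque(sorted(n for n, degree in in_degree.items() if degree == 0))
    order = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for target in sorted(graph[node]):
            in_degree[target] -= 1
            if in_degree[target] == 0:
                queue.append(target)

    if len(order) != len(graph):
        raise ValueError("graph is not acyclic")
    return order
```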
- transitive_reduction() -> None
Perform a transitive reduction on the DAG.
The transitive reduction of a graph is a graph with as few edges as possible with the same reachability as the original graph.
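A small sketch may make the definition concrete. It assumes an acyclic dict-of-sets graph and uses hypothetical helper names. An edge a -> b is dropped whenever b can still be reached from a along some other path, so reachability is unchanged. Unlike the documented method, this version returns a new graph instead of modifying one in place.

```python
def reachable(graph, start, skip_edge):
    """Nodes reachable from `start` without traversing `skip_edge`."""
    seen = set()
    stack = [start]
    while stack:
        current = stack.pop()
        for nxt in graph[current]:
            if (current, nxt) == skip_edge or nxt in seen:
                continue
            seen.add(nxt)
            stack.append(nxt)
    return seen


def transitive_reduction(graph):
    """Return a copy of an acyclic graph with redundant edges removed."""
    reduced = {node: set(targets) for node, targets in graph.items()}
    for node in graph:
        for target in sorted(graph[node]):
            # The edge is redundant if another path still reaches the target.
            if target in reachable(reduced, node, (node, target)):
                reduced[node].discard(target)
    return reduced
```

For example, with edges a -> b, b -> c, and a -> c, the edge a -> c is removed because c remains reachable from a through b.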
- walk(walk_func: Callable[[str], Any]) -> None
Walk each node of the graph in reverse topological order.
This can be used to perform a set of operations where each operation depends on the one before it. Walking happens serially; it is not parallelized.
- Parameters:
walk_func – The function to be called on each node of the graph.
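A serial walk in reverse topological order can be sketched with the standard library's `graphlib` module (Python 3.9+). This is an illustration under assumptions, not the library's code: the graph is a dict of sets in which `graph[a]` holds the nodes that `a` has edges towards. `TopologicalSorter` treats each mapped value as a set of predecessors, so passing the out-edge mapping directly yields each node only after all nodes it points at.

```python
from graphlib import TopologicalSorter


def walk(graph, walk_func):
    """Call walk_func on each node, visiting edge targets before sources."""
    for node in TopologicalSorter(graph).static_order():
        walk_func(node)


# Hypothetical example graph: "app" points at "db" and "vpc"; "db" points at "vpc".
graph = {"app": {"db", "vpc"}, "db": {"vpc"}, "vpc": set()}
visited = []
walk(graph, visited.append)
```

In this example `visited` ends up as `["vpc", "db", "app"]`, the only ordering that visits every edge target before its source.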
- exception runway.cfngin.dag.DAGValidationError
Bases:
Exception
Raised when DAG validation fails.
- class runway.cfngin.dag.ThreadedWalker
Bases:
object
Walk a DAG as quickly as the graph topology allows, using threads.
- __init__(semaphore: threading.Semaphore | UnlimitedSemaphore) -> None
Instantiate class.
- Parameters:
semaphore – A semaphore object which can be used to control how many steps are executed in parallel.
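The idea of a semaphore-bounded threaded walk can be sketched as follows. This is a standalone illustration, not the `ThreadedWalker` implementation: the `threaded_walk` function, the dict-of-sets graph (where `graph[a]` holds the nodes `a` points at, and every node is a key), and the one-thread-per-node design are all assumptions. Each thread first waits for the threads of the nodes it points at, then acquires the semaphore before running the callback.

```python
import threading
from graphlib import TopologicalSorter


def threaded_walk(graph, walk_func, semaphore):
    """Run walk_func per node in threads, edge targets before sources."""
    threads = {}

    def run(node, waits_for):
        for thread in waits_for:
            thread.join()  # block until prerequisite nodes finish
        with semaphore:  # limit how many callbacks run at once
            walk_func(node)

    # static_order yields edge targets first, so their threads already exist.
    for node in TopologicalSorter(graph).static_order():
        waits_for = [threads[target] for target in graph[node]]
        thread = threading.Thread(target=run, args=(node, waits_for))
        threads[node] = thread
        thread.start()

    for thread in threads.values():
        thread.join()
```

Passing `threading.Semaphore(2)` would allow at most two callbacks to run concurrently, while independent branches of the graph can still proceed in parallel.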