"""CFNgin plan, plan components, and functions for interacting with a plan."""
from __future__ import annotations
import json
import logging
import threading
import time
import uuid
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, NoReturn, TypeVar, overload
from .._logging import LogLevels, PrefixAdaptor
from ..utils import merge_dicts
from .dag import DAG, DAGValidationError, walk
from .exceptions import CancelExecution, GraphError, PersistentGraphLocked, PlanFailed
from .stack import Stack
from .status import COMPLETE, FAILED, PENDING, SKIPPED, SUBMITTED, FailedStatus, SkippedStatus
from .ui import ui
from .utils import stack_template_key_name
if TYPE_CHECKING:
from collections import OrderedDict
from ..context import CfnginContext
from .providers.aws.default import Provider
from .status import Status
LOGGER = logging.getLogger(__name__)
_T = TypeVar("_T")
@overload
def json_serial(obj: set[_T]) -> list[_T]: ...
@overload
def json_serial(obj: dict[Any, Any] | int | list[Any] | str) -> NoReturn: ...
[docs]
def json_serial(obj: set[Any] | Any) -> Any:
"""Serialize json.
Args:
obj: A python object.
Example:
json.dumps(data, default=json_serial)
"""
if isinstance(obj, set):
return list(obj) # pyright: ignore[reportUnknownArgumentType]
raise TypeError
[docs]
def merge_graphs(graph1: Graph, graph2: Graph) -> Graph:
"""Combine two Graphs into one, retaining steps.
Args:
graph1: Graph that ``graph2`` will be merged into.
graph2: Graph that will be merged into ``graph1``.
"""
merged_graph_dict = merge_dicts(graph1.to_dict().copy(), graph2.to_dict())
steps = [graph1.steps.get(name, graph2.steps.get(name)) for name in merged_graph_dict]
return Graph.from_steps([step for step in steps if step])
[docs]
class Step:
"""State machine for executing generic actions related to stacks.
Attributes:
fn: Function to run to execute the step.
This function will be ran multiple times until the step is "done".
last_updated: Time when the step was last updated.
logger: Logger for logging messages about the step.
stack: the stack associated with this step
status: The status of step.
watch_func: Function that will be called to "tail" the step action.
"""
fn: Callable[..., Any] | None
last_updated: float
logger: PrefixAdaptor
stack: Stack
status: Status
watch_func: Callable[..., Any] | None
[docs]
def __init__(
self,
stack: Stack,
*,
fn: Callable[..., Any] | None = None,
watch_func: Callable[..., Any] | None = None,
) -> None:
"""Instantiate class.
Args:
stack: The stack associated
with this step
fn: Function to run to execute the step.
This function will be ran multiple times until the step is "done".
watch_func: Function that will be called to "tail" the step action.
"""
self.stack = stack
self.status = PENDING
self.last_updated = time.time()
self.logger = PrefixAdaptor(self.stack.name, LOGGER)
self.fn = fn
self.watch_func = watch_func
[docs]
def run(self) -> bool:
"""Run this step until it has completed or been skipped."""
stop_watcher = threading.Event()
watcher = None
if self.watch_func:
watcher = threading.Thread(target=self.watch_func, args=(self.stack, stop_watcher))
watcher.start()
try:
while not self.done:
self._run_once()
finally:
if watcher:
stop_watcher.set()
watcher.join()
return self.ok
def _run_once(self) -> Status:
"""Run a step exactly once."""
if not self.fn:
raise TypeError("Step.fn must be type Callable[..., Status] not None")
try:
status = self.fn(self.stack, status=self.status)
except CancelExecution:
status = SkippedStatus("canceled execution")
except Exception as err:
LOGGER.exception(err)
status = FailedStatus(reason=str(err))
self.set_status(status)
return status
@property
def name(self) -> str:
"""Name of the step.
This is equal to the name of the stack it operates on.
"""
return self.stack.name
@property
def requires(self) -> set[str]:
"""Return a list of step names this step depends on."""
return self.stack.requires
@property
def required_by(self) -> set[str]:
"""Return a list of step names that depend on this step."""
return self.stack.required_by
@property
def completed(self) -> bool:
"""Return True if the step is in a COMPLETE state."""
return self.status == COMPLETE
@property
def skipped(self) -> bool:
"""Return True if the step is in a SKIPPED state."""
return self.status == SKIPPED
@property
def failed(self) -> bool:
"""Return True if the step is in a FAILED state."""
return self.status == FAILED
@property
def done(self) -> bool:
"""Return True if the step is finished.
To be ``True``, status must be either COMPLETE, SKIPPED or FAILED)
"""
return self.completed or self.skipped or self.failed
@property
def ok(self) -> bool:
"""Return True if the step is finished (either COMPLETE or SKIPPED)."""
return self.completed or self.skipped
@property
def submitted(self) -> bool:
"""Return True if the step is SUBMITTED, COMPLETE, or SKIPPED."""
return self.status >= SUBMITTED
[docs]
def set_status(self, status: Status) -> None:
"""Set the current step's status.
Args:
status: The status to set the step to.
"""
if status is not self.status:
LOGGER.debug("setting %s state to %s...", self.stack.name, status.name)
self.status = status
self.last_updated = time.time()
if self.stack.logging:
self.log_step()
[docs]
def complete(self) -> None:
"""Shortcut for ``set_status(COMPLETE)``."""
self.set_status(COMPLETE)
[docs]
def log_step(self) -> None:
"""Construct a log message for a set and log it to the UI."""
msg = self.status.name
if self.status.reason:
msg += f" ({self.status.reason})"
if self.status.code == SUBMITTED.code:
ui.log(LogLevels.NOTICE, msg, logger=self.logger)
elif self.status.code == COMPLETE.code:
ui.log(LogLevels.SUCCESS, msg, logger=self.logger)
elif self.status.code == FAILED.code:
ui.log(LogLevels.ERROR, msg, logger=self.logger)
else:
ui.info(msg, logger=self.logger)
[docs]
def skip(self) -> None:
"""Shortcut for ``set_status(SKIPPED)``."""
self.set_status(SKIPPED)
[docs]
def submit(self) -> None:
"""Shortcut for ``set_status(SUBMITTED)``."""
self.set_status(SUBMITTED)
[docs]
@classmethod
def from_stack_name(
cls,
stack_name: str,
context: CfnginContext,
requires: list[str] | set[str] | None = None,
fn: Callable[..., Status] | None = None,
watch_func: Callable[..., Any] | None = None,
) -> Step:
"""Create a step using only a stack name.
Args:
stack_name: Name of a CloudFormation stack.
context: Context object. Required to initialize a "fake"
:class:`runway.cfngin.stack.Stack`.
requires: Stacks that this stack depends on.
fn: The function to run to execute the step.
This function will be ran multiple times until the step is "done".
watch_func: an optional function that will be called to "tail" the
step action.
"""
from runway.config.models.cfngin import CfnginStackDefinitionModel
stack_def = CfnginStackDefinitionModel.model_construct(
name=stack_name, requires=requires or []
)
stack = Stack(stack_def, context)
return cls(stack, fn=fn, watch_func=watch_func)
[docs]
@classmethod
def from_persistent_graph(
cls,
graph_dict: dict[str, list[str]] | dict[str, set[str]] | OrderedDict[str, set[str]],
context: CfnginContext,
fn: Callable[..., Status] | None = None,
watch_func: Callable[..., Any] | None = None,
) -> list[Step]:
"""Create a steps for a persistent graph dict.
Args:
graph_dict: A graph dict.
context: Context object. Required to initialize a "fake"
:class:`runway.cfngin.stack.Stack`.
requires: Stacks that this stack depends on.
fn: The function to run to execute the step.
This function will be ran multiple times until the step is "done".
watch_func: an optional function that will be called to "tail" the
step action.
"""
return [
cls.from_stack_name(name, context, requires, fn, watch_func)
for name, requires in graph_dict.items()
]
[docs]
def __repr__(self) -> str:
"""Object represented as a string."""
return f"<CFNgin.plan.Step:{self.stack.name}>"
[docs]
def __str__(self) -> str:
"""Object displayed as a string."""
return self.stack.name
[docs]
class Graph:
"""Graph represents a graph of steps.
The :class:`Graph` helps organize the steps needed to execute a particular
action for a set of :class:`runway.cfngin.stack.Stack` objects. When
initialized with a set of steps, it will first build a Directed Acyclic
Graph from the steps and their dependencies.
Example:
>>> dag = DAG()
>>> a = Step("a", fn=deploy)
>>> b = Step("b", fn=deploy)
>>> dag.add_step(a)
>>> dag.add_step(b)
>>> dag.connect(a, b)
"""
dag: DAG
steps: dict[str, Step]
[docs]
def __init__(self, steps: dict[str, Step] | None = None, dag: DAG | None = None) -> None:
"""Instantiate class.
Args:
steps: Dict with key of step name and value of :class:`Step` for
steps to initialize the Graph with. Note that if this is provided,
a pre-configured :class:`runway.cfngin.dag.DAG` that already
includes these steps should also be provided..
dag: An optional :class:`runway.cfngin.dag.DAG` object. If one is
not provided, a new one will be initialized.
"""
self.steps = steps or {}
self.dag = dag or DAG()
[docs]
def add_step(
self, step: Step, add_dependencies: bool = False, add_dependents: bool = False
) -> None:
"""Add a step to the graph.
Args:
step: The step to be added.
add_dependencies: Connect steps that need to be completed before this
step.
add_dependents: Connect steps that require this step.
"""
self.steps[step.name] = step
self.dag.add_node(step.name)
if add_dependencies:
for dep in step.requires:
self.connect(step.name, dep)
if add_dependents:
for parent in step.required_by:
self.connect(parent, step.name)
[docs]
def add_step_if_not_exists(
self, step: Step, add_dependencies: bool = False, add_dependents: bool = False
) -> None:
"""Try to add a step to the graph.
Can be used when failure to add is acceptable.
Args:
step: The step to be added.
add_dependencies: Connect steps that need to be completed before this
step.
add_dependents: Connect steps that require this step.
"""
if self.steps.get(step.name):
return
self.steps[step.name] = step
self.dag.add_node_if_not_exists(step.name)
if add_dependencies:
for dep in step.requires:
try:
self.connect(step.name, dep)
except GraphError:
continue
if add_dependents:
for parent in step.required_by:
try:
self.connect(parent, step.name)
except GraphError:
continue
[docs]
def add_steps(self, steps: list[Step]) -> None:
"""Add a list of steps.
Args:
steps: The step to be added.
"""
for step in steps:
self.add_step(step)
for step in steps:
for dep in step.requires:
self.connect(step.name, dep)
for parent in step.required_by:
self.connect(parent, step.name)
[docs]
def pop(self, step: Step, default: Any = None) -> Any:
"""Remove a step from the graph.
Args:
step: The step to remove from the graph.
default: Returned if the step could not be popped
"""
self.dag.delete_node_if_exists(step.name)
return self.steps.pop(step.name, default)
[docs]
def connect(self, step: str, dep: str) -> None:
"""Connect a dependency to a step.
Args:
step: Step name to add a dependency to.
dep: Name of dependent step.
"""
try:
self.dag.add_edge(step, dep)
except (DAGValidationError, KeyError) as exc:
raise GraphError(exc, step, dep) from None
[docs]
def transitive_reduction(self) -> None:
"""Perform a transitive reduction on the underlying DAG.
The transitive reduction of a graph is a graph with as few edges as
possible with the same reachability as the original graph.
See https://en.wikipedia.org/wiki/Transitive_reduction
"""
self.dag.transitive_reduction()
[docs]
def walk(
self,
walker: Callable[[DAG, Callable[[str], Any]], Any],
walk_func: Callable[[Step], Any],
) -> Any:
"""Walk the steps of the graph.
Args:
walker: Function used to walk the steps.
walk_func: Function called with a :class:`Step` as the only argument
for each step of the plan.
"""
def fn(step_name: str) -> Any:
"""Get a step by step name and execute the ``walk_func`` on it.
Args:
step_name: Name of a step.
"""
step = self.steps[step_name]
return walk_func(step)
return walker(self.dag, fn)
[docs]
def downstream(self, step_name: str) -> list[Step]:
"""Return the direct dependencies of the given step."""
return [self.steps[dep] for dep in self.dag.downstream(step_name)]
[docs]
def transposed(self) -> Graph:
"""Return a "transposed" version of this graph.
Useful for walking in reverse.
"""
return Graph(steps=self.steps, dag=self.dag.transpose())
[docs]
def filtered(self, step_names: list[str]) -> Graph:
"""Return a "filtered" version of this graph.
Args:
step_names: Steps to filter.
"""
return Graph(steps=self.steps, dag=self.dag.filter(step_names))
[docs]
def topological_sort(self) -> list[Step]:
"""Perform a topological sort of the underlying DAG."""
nodes = self.dag.topological_sort()
return [self.steps[step_name] for step_name in nodes]
[docs]
def to_dict(self) -> OrderedDict[str, set[str]]:
"""Return the underlying DAG as a dictionary."""
return self.dag.graph
[docs]
def dumps(self, indent: int | None = None) -> str:
"""Output the graph as a json serialized string for storage.
Args:
indent: Number of spaces for each indentation.
"""
return json.dumps(self.to_dict(), default=json_serial, indent=indent)
[docs]
@classmethod
def from_dict(
cls,
graph_dict: dict[str, list[str]] | dict[str, set[str]] | OrderedDict[str, set[str]],
context: CfnginContext,
) -> Graph:
"""Create a Graph from a graph dict.
Args:
graph_dict: The dictionary used to create the graph.
context: Required to init stacks.
"""
return cls.from_steps(Step.from_persistent_graph(graph_dict, context))
[docs]
@classmethod
def from_steps(cls, steps: list[Step]) -> Graph:
"""Create a Graph from Steps.
Args:
steps: Steps used to create the graph.
"""
graph = cls()
graph.add_steps(steps)
return graph
[docs]
def __str__(self) -> str:
"""Object displayed as a string."""
return self.dumps()
[docs]
class Plan:
"""A convenience class for working on a Graph.
Attributes:
context: Context object.
description: Plan description.
graph: Graph of the plan.
id: UUID for the plan.
reverse: The graph has been transposed for walking in reverse.
require_unlocked: Require the persistent graph to be unlocked before
executing steps.
"""
context: CfnginContext | None
description: str
graph: Graph
id: uuid.UUID
require_unlocked: bool
reverse: bool
[docs]
def __init__(
self,
description: str,
graph: Graph,
context: CfnginContext | None = None,
reverse: bool = False,
require_unlocked: bool = True,
) -> None:
"""Initialize class.
Args:
description: Description of what the plan is going to do.
graph: Local graph used for the plan.
context: Context object.
reverse: Transpose the graph for walking in reverse.
require_unlocked: Require the persistent graph to be
unlocked before executing steps.
"""
self.context = context
self.description = description
self.id = uuid.uuid4()
self.reverse = reverse
self.require_unlocked = require_unlocked
if self.reverse:
graph = graph.transposed()
if self.context:
self.locked = self.context.persistent_graph_locked
if self.context.stack_names:
nodes = [target for target in self.context.stack_names if graph.steps.get(target)]
graph = graph.filtered(nodes)
else:
self.locked = False
self.graph = graph
[docs]
def outline(self, level: int = logging.INFO, message: str = "") -> None:
"""Print an outline of the actions the plan is going to take.
The outline will represent the rough ordering of the steps that will be
taken.
Args:
level: a valid log level that should be used to log
the outline
message: a message that will be logged to
the user after the outline has been logged.
"""
LOGGER.log(level, 'plan "%s":', self.description)
for steps, step in enumerate(self.steps, start=1):
LOGGER.log(
level,
' - step: %s: target: "%s", action: "%s"',
steps,
step.name,
step.fn.__name__ if callable(step.fn) else step.fn,
)
if message:
LOGGER.log(level, message)
[docs]
def dump(
self,
*,
directory: str,
context: CfnginContext,
provider: Provider | None = None,
) -> Any:
"""Output the rendered blueprint for all stacks in the plan.
Args:
directory: Directory where files will be created.
context: Current CFNgin context.
provider: Provider to use when resolving the blueprints.
"""
LOGGER.info('dumping "%s"...', self.description)
dir_path = Path(directory).expanduser()
dir_path.mkdir(exist_ok=True, parents=True)
def walk_func(step: Step) -> bool:
"""Walk function."""
step.stack.resolve(context=context, provider=provider)
blueprint = step.stack.blueprint
filename = stack_template_key_name(blueprint)
path = dir_path / filename
path.parent.mkdir(exist_ok=True, parents=True)
LOGGER.info('writing stack "%s" -> %s', step.name, path)
with Path(path).open("w", encoding="utf-8") as _file:
_file.write(blueprint.rendered)
return True
return self.graph.walk(walk, walk_func)
[docs]
def execute(self, *args: Any, **kwargs: Any) -> None:
"""Walk each step in the underlying graph.
Raises:
PersistentGraphLocked: Raised if the persistent graph is
locked prior to execution and this session did not lock it.
PlanFailed: Raised if any of the steps fail.
"""
if self.locked and self.require_unlocked:
raise PersistentGraphLocked
self.walk(*args, **kwargs)
failed_steps = [step for step in self.steps if step.status == FAILED]
if failed_steps:
raise PlanFailed(failed_steps)
[docs]
def walk(self, walker: Callable[..., Any]) -> Any:
"""Walk each step in the underlying graph, in topological order.
Args:
walker: a walker function to be passed to :class:`runway.cfngin.dag.DAG`
to walk the graph.
"""
def walk_func(step: Step) -> bool:
"""Execute a :class:`Step` wile walking the graph.
Handles updating the persistent graph if one is being used.
Args:
step: :class:`Step` to execute.
"""
# Before we execute the step, we need to ensure that it's
# transitive dependencies are all in an "ok" state. If not, we
# won't execute this step.
for dep in self.graph.downstream(step.name):
if not dep.ok:
step.set_status(FailedStatus("dependency has failed"))
return step.ok
result = step.run()
if not self.context or not self.context.persistent_graph:
return result
if step.completed or (
step.skipped and step.status.reason == ("does not exist in cloudformation")
):
fn_name = step.fn.__name__ if callable(step.fn) else step.fn
if fn_name == "_destroy_stack":
self.context.persistent_graph.pop(step)
LOGGER.debug("removed step '%s' from the persistent graph", step.name)
elif fn_name == "_launch_stack":
self.context.persistent_graph.add_step_if_not_exists(
step, add_dependencies=True, add_dependents=True
)
LOGGER.debug("added step '%s' to the persistent graph", step.name)
else:
return result
self.context.put_persistent_graph(self.lock_code)
return result
return self.graph.walk(walker, walk_func)
@property
def lock_code(self) -> str:
"""Code to lock/unlock the persistent graph."""
return str(self.id)
@property
def steps(self) -> list[Step]:
"""Return a list of all steps in the plan."""
steps = self.graph.topological_sort()
steps.reverse()
return steps
@property
def step_names(self) -> list[str]:
"""Return a list of all step names."""
return [step.name for step in self.steps]
[docs]
def keys(self) -> list[str]:
"""Return a list of all step names."""
return self.step_names