Source code for runway.cfngin.hooks.utils

"""Hook utils."""

from __future__ import annotations

import collections.abc
import logging
import sys
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Any

import pydantic

from ...exceptions import FailedVariableLookup
from ...utils import BaseModel, load_object_from_string
from ...variables import Variable, resolve_variables
from ..blueprints.base import Blueprint

if TYPE_CHECKING:
    from ...config.models.cfngin import CfnginHookDefinitionModel
    from ...context import CfnginContext
    from ..providers.aws.default import Provider

LOGGER = logging.getLogger(__name__)


class BlankBlueprint(Blueprint):
    """Blueprint whose template is assembled by the caller rather than by a subclass.

    The only thing this class adds to :class:`Blueprint` is a concrete,
    do-nothing :meth:`create_template`.

    """

    def create_template(self) -> None:
        """Do nothing so that no ``NotImplementedError`` is raised.

        The template is expected to be populated programmatically by whoever
        instantiates the blueprint.

        """
# TODO (kyle): BREAKING move to runway.providers.aws.models.TagModel
class TagDataModel(BaseModel):
    """Data model for a single AWS resource tag.

    Each field declares a capitalized alias (``Key``/``Value``) alongside its
    lowercase attribute name.

    Attributes:
        key: Tag key (alias ``Key``).
        value: Tag value (alias ``Value``).

    """

    # ``extra="forbid"`` rejects undeclared fields; ``populate_by_name`` lets
    # callers supply either the alias or the attribute name.
    model_config = pydantic.ConfigDict(populate_by_name=True, extra="forbid")

    key: Annotated[str, pydantic.Field(alias="Key")]
    value: Annotated[str, pydantic.Field(alias="Value")]
def full_path(path: str) -> str:
    """Convert ``path`` into an absolute path string.

    Args:
        path: Absolute or relative filesystem path.

    Returns:
        The absolute form of ``path`` as produced by
        :meth:`pathlib.Path.absolute` (relative paths are anchored to the
        current working directory).

    """
    absolute = Path(path).absolute()
    return str(absolute)
def _collect_hook_paths(stage: str, hooks: list[CfnginHookDefinitionModel]) -> list[str]:
    """Return the ``path`` of every hook, in definition order.

    Args:
        stage: Name of the stage the hooks belong to; only used in the error message.
        hooks: Hook definitions to read the path from.

    Raises:
        ValueError: Reading a hook's ``path`` raised :exc:`KeyError`.

    """
    hook_paths: list[str] = []
    for i, hook in enumerate(hooks):
        # NOTE(review): attribute access on a model is not expected to raise
        # KeyError; the handler is kept so the ValueError contract is unchanged.
        try:
            hook_paths.append(hook.path)
        except KeyError as exc:
            raise ValueError(f"{stage} hook #{i} missing path.") from exc
    return hook_paths


def _resolve_hook_kwargs(
    stage: str,
    hook: CfnginHookDefinitionModel,
    provider: Provider,
    context: CfnginContext,
) -> dict[str, Any]:
    """Resolve the variables in ``hook.args`` into keyword arguments for the hook.

    Args:
        stage: Name of the stage being run; selects whether the extra hint is logged.
        hook: Hook definition whose ``args`` are resolved.
        provider: Provider instance passed to variable resolution.
        context: Context instance passed to variable resolution.

    Returns:
        Mapping of argument name to resolved value; empty when the hook has no args.

    Raises:
        FailedVariableLookup: A lookup could not be resolved. Always re-raised.

    """
    if not hook.args:
        return {}

    args = [Variable(k, v) for k, v in hook.args.items()]
    try:  # handling for output or similar being used in pre_deploy
        resolve_variables(args, context, provider)
    except FailedVariableLookup:
        if "pre" in stage:
            LOGGER.error(
                "lookups that change the order of execution, like "
                '"output", can only be used in "post_*" hooks; '
                "please ensure that the hook being used does "
                "not rely on a stack, hook_data, or context that "
                "does not exist yet"
            )
        raise
    return {v.name: v.value for v in args}


def _record_hook_result(
    hook: CfnginHookDefinitionModel,
    result: Any,
    context: CfnginContext,
) -> None:
    """Act on the value returned by a hook.

    A falsy result is treated as a failure: it terminates the process with
    exit code 1 when the hook is required and is logged as a warning
    otherwise. A truthy mapping or pydantic model is stored in the context
    under ``hook.data_key`` when one is set. Any other truthy value is ignored.

    Args:
        hook: Definition of the hook that produced ``result``.
        result: Value returned by the hook.
        context: Context instance that receives the hook data.

    Raises:
        SystemExit: A required hook returned a falsy value.

    """
    if not result:
        if hook.required:
            LOGGER.error("required hook %s failed; return value: %s", hook.path, result)
            sys.exit(1)
        LOGGER.warning("non-required hook %s failed; return value: %s", hook.path, result)
        return

    if not isinstance(result, (collections.abc.Mapping, pydantic.BaseModel)):
        return

    if hook.data_key:
        LOGGER.debug(
            "adding result for hook %s to context in data_key %s",
            hook.path,
            hook.data_key,
        )
        context.set_hook_data(hook.data_key, result)
    else:
        LOGGER.debug(
            "hook %s returned result data but no data key set; ignoring",
            hook.path,
        )


def _execute_hook(
    stage: str,
    hook: CfnginHookDefinitionModel,
    provider: Provider,
    context: CfnginContext,
) -> None:
    """Load, invoke, and process the result of a single enabled hook.

    Failures to load or run a non-required hook are logged and swallowed;
    for a required hook the original exception propagates.

    Args:
        stage: Name of the stage being run. For class-based hooks this is
            also the name of the method that gets called on the instance.
        hook: Definition of the hook to execute.
        provider: Provider instance passed to the hook.
        context: Context instance passed to the hook.

    """
    try:
        method = load_object_from_string(hook.path, try_reload=True)
    except (AttributeError, ImportError):
        LOGGER.exception("unable to load method at %s", hook.path)
        if hook.required:
            raise
        return

    kwargs = _resolve_hook_kwargs(stage, hook, provider, context)

    try:
        if isinstance(method, type):
            # class-based hook: instantiate it, then call the method named after the stage
            result: Any = getattr(method(context=context, provider=provider, **kwargs), stage)()
        else:
            result = method(context=context, provider=provider, **kwargs)
    except Exception:
        LOGGER.exception("hook %s threw an exception", hook.path)
        if hook.required:
            raise
        return

    _record_hook_result(hook, result, context)


def handle_hooks(
    stage: str,
    hooks: list[CfnginHookDefinitionModel],
    provider: Provider,
    context: CfnginContext,
) -> None:
    """Handle pre/post_deploy hooks.

    These are pieces of code that we want to run before/after deploying
    stacks. Hooks are executed sequentially in the order they are defined;
    disabled hooks are skipped.

    Args:
        stage: The current stage (e.g. ``pre_deploy``, ``post_deploy``).
        hooks: Hooks to execute.
        provider: Provider instance.
        context: Context instance.

    Raises:
        ValueError: A hook definition is missing its path.
        FailedVariableLookup: A variable in a hook's args could not be resolved.
        SystemExit: A required hook returned a falsy value.

    """
    if not hooks:
        LOGGER.debug("no %s hooks defined", stage)
        return

    LOGGER.info("executing %s hooks: %s", stage, ", ".join(_collect_hook_paths(stage, hooks)))

    for hook in hooks:
        if not hook.enabled:
            LOGGER.debug("hook with method %s is disabled; skipping", hook.path)
            continue
        _execute_hook(stage, hook, provider, context)