"""Terraform version management."""
from __future__ import annotations
import hashlib
import json
import locale
import logging
import os
import platform
import re
import shutil
import subprocess
import sys
import tempfile
import zipfile
from typing import TYPE_CHECKING, Any, Final, cast, overload
from urllib.error import URLError
from urllib.request import urlretrieve
import hcl
import hcl2
import requests
from packaging.version import InvalidVersion
from ..compat import cached_property
from ..exceptions import HclParserError
from ..utils import FileHash, Version, get_hash_for_filename, merge_dicts
from . import EnvManager, handle_bin_download_error
if TYPE_CHECKING:
from collections.abc import Generator
from pathlib import Path
from types import ModuleType
from .._logging import RunwayLogger
LOGGER = cast("RunwayLogger", logging.getLogger(__name__))
TF_VERSION_FILENAME = ".terraform-version"
[docs]
def download_tf_release(
version: str,
versions_dir: Path,
command_suffix: str,
tf_platform: str | None = None,
arch: str | None = None,
) -> None:
"""Download Terraform archive and return path to it."""
version_dir = versions_dir / version
if arch is None:
arch = os.getenv("TFENV_ARCH", "amd64")
if tf_platform:
tfver_os = tf_platform + "_" + arch
elif platform.system().startswith("Darwin"):
tfver_os = f"darwin_{arch}"
elif platform.system().startswith("Windows") or (
platform.system().startswith("MINGW64")
or (platform.system().startswith("MSYS_NT") or platform.system().startswith("CYGWIN_NT"))
):
tfver_os = f"windows_{arch}"
else:
tfver_os = f"linux_{arch}"
download_dir = tempfile.mkdtemp()
filename = f"terraform_{version}_{tfver_os}.zip"
shasums_name = f"terraform_{version}_SHA256SUMS"
tf_url = "https://releases.hashicorp.com/terraform/" + version
try:
LOGGER.verbose("downloading Terraform from %s...", tf_url)
for i in [filename, shasums_name]:
urlretrieve(tf_url + "/" + i, os.path.join(download_dir, i)) # noqa: PTH118, S310
except URLError as exc:
handle_bin_download_error(exc, "Terraform")
tf_hash = get_hash_for_filename(
filename,
os.path.join(download_dir, shasums_name), # noqa: PTH118
)
checksum = FileHash(hashlib.sha256())
checksum.add_file(os.path.join(download_dir, filename)) # noqa: PTH118
if tf_hash != checksum.hexdigest:
LOGGER.error("downloaded Terraform %s does not match sha256 %s", filename, tf_hash)
sys.exit(1)
with zipfile.ZipFile(os.path.join(download_dir, filename)) as tf_zipfile: # noqa: PTH118
version_dir.mkdir(parents=True, exist_ok=True)
tf_zipfile.extractall(str(version_dir))
shutil.rmtree(download_dir)
result = version_dir / ("terraform" + command_suffix)
result.chmod(result.stat().st_mode | 0o0111) # ensure it is executable
[docs]
def get_available_tf_versions(include_prerelease: bool = False) -> list[str]:
"""Return available Terraform versions."""
tf_releases = json.loads(
requests.get("https://releases.hashicorp.com/index.json", timeout=30).text
)["terraform"]
# Remove versions that don't align with
# PEP440 (https://peps.python.org/pep-0440/)
for k, _v in list(tf_releases["versions"].items()):
try:
Version(k)
except InvalidVersion:
LOGGER.debug("InvalidVersion found, skipping %s. ", k)
del tf_releases["versions"][k]
tf_versions = sorted(
[k for k, _v in tf_releases["versions"].items()], # descending
key=Version,
reverse=True,
)
if include_prerelease:
return [i for i in tf_versions if i]
return [i for i in tf_versions if i and "-" not in i]
[docs]
def get_latest_tf_version(include_prerelease: bool = False) -> str:
"""Return latest Terraform version."""
return get_available_tf_versions(include_prerelease)[0]
[docs]
class TFEnvManager(EnvManager):
"""Terraform version management.
Designed to be compatible with https://github.com/tfutils/tfenv.
"""
VERSION_REGEX: Final[str] = r"^(Terraform v)?(?P<version>[0-9]+\.[0-9]+\.[0-9]+\S*)"
VERSION_OUTPUT_REGEX: Final[str] = (
r"^Terraform v(?P<version>[0-9]*\.[0-9]*\.[0-9]*)(?P<suffix>-.*)?"
)
[docs]
def __init__(self, path: Path | None = None) -> None:
"""Initialize class."""
super().__init__("terraform", "tfenv", path)
@cached_property
def backend(self) -> dict[str, Any]:
"""Backend config of the Terraform module."""
# Terraform can only have one backend configured; this formats the
# data to make it easier to work with
return next(
{"type": k, "config": v}
for k, v in self.terraform_block.get(
"backend", {None: cast("dict[str, str]", {})}
).items()
)
@cached_property
def terraform_block(self) -> dict[str, Any]: # noqa: C901
"""Collect Terraform configuration blocks from a Terraform module."""
@overload
def _flatten_lists(data: dict[str, Any]) -> dict[str, Any]: ...
@overload
def _flatten_lists(data: list[Any]) -> list[Any]: ...
@overload
def _flatten_lists(data: str) -> str: ...
def _flatten_lists(data: dict[str, Any] | list[Any] | Any) -> dict[str, Any] | Any:
"""Flatten HCL2 list attributes until its fixed.
python-hcl2 incorrectly turns all attributes into lists so we need
to flatten them so they are more similar to HCL.
https://github.com/amplify-education/python-hcl2/issues/6
Args:
data: Dict with lists to flatten.
"""
if not isinstance(data, dict): # TODO (kyle): improve with `typing.TypeIs` narrowing
return data
copy_data = cast("dict[str, Any]", data).copy()
for attr, val in copy_data.items():
if isinstance(val, list):
if len(cast("list[Any]", val)) == 1:
# pull single values out of lists
data[attr] = _flatten_lists(cast("Any", val[0]))
else:
data[attr] = [_flatten_lists(v) for v in cast("list[Any]", val)]
elif isinstance(val, dict):
data[attr] = _flatten_lists(cast("dict[str, Any]", val))
return cast("dict[str, Any]", data)
try:
result: dict[str, Any] | list[dict[str, Any]] = load_terraform_module(
hcl2, self.path
).get("terraform", cast("dict[str, Any]", {}))
except HclParserError as exc:
LOGGER.warning(exc)
LOGGER.warning("failed to parse as HCL2; trying HCL...")
try:
result = load_terraform_module(hcl, self.path).get(
"terraform", cast("dict[str, Any]", {})
)
except HclParserError as exc2:
LOGGER.warning(exc2)
# return an empty dict if we can't parse HCL
# let Terraform decide if it's actually valid
result = {}
# python-hcl2 turns all blocks into lists in v0.3.0. this flattens it.
if isinstance(result, list):
return _flatten_lists({k: v for i in result for k, v in i.items()})
return _flatten_lists(result)
@cached_property
def version(self) -> Version | None:
"""Terraform version."""
version_requested = self.current_version or self.get_version_from_file()
if not version_requested:
return None
if re.match(r"^min-required$", version_requested):
LOGGER.debug("tfenv: detecting minimal required version")
version_requested = self.get_min_required()
if re.match(r"^latest:.*$", version_requested):
regex = re.search(r"latest:(.*)", version_requested).group(1) # type: ignore
include_prerelease_versions = False
elif re.match(r"^latest$", version_requested):
regex = r"^[0-9]+\.[0-9]+\.[0-9]+$"
include_prerelease_versions = False
else:
regex = f"^{version_requested}$"
include_prerelease_versions = True
# Return early (i.e before reaching out to the internet) if the
# matching version is already installed
if (self.versions_dir / version_requested).is_dir():
self.current_version = version_requested
return self.parse_version_string(self.current_version)
try:
version = next(
i
for i in get_available_tf_versions(include_prerelease_versions)
if re.match(regex, i)
)
except StopIteration:
LOGGER.error("unable to find a Terraform version matching regex: %s", regex)
sys.exit(1)
self.current_version = version
return self.parse_version_string(self.current_version)
@cached_property
def version_file(self) -> Path | None:
"""Find and return a ".terraform-version" file if one is present.
Returns:
Path to the Terraform version file.
"""
for path in [self.path, self.path.parent]:
test_path = path / TF_VERSION_FILENAME
if test_path.is_file():
LOGGER.debug("using version file: %s", test_path)
return test_path
return None
[docs]
def get_min_required(self) -> str:
"""Get the defined minimum required version of Terraform.
Returns:
The minimum required version as defined in the module.
"""
version = self.terraform_block.get("required_version")
if version:
if re.match(r"^!=.+", version):
LOGGER.error(
"min required Terraform version is a negation (%s) "
"- unable to determine required version",
version,
)
sys.exit(1)
else:
version = re.search(r"[0-9]*\.[0-9]*(?:\.[0-9]*)?", version)
if version:
LOGGER.debug("detected minimum Terraform version is %s", version)
return version.group(0)
LOGGER.error(
"Terraform version specified as min-required, but unable to "
"find a specified version requirement in this module's tf files"
)
sys.exit(1)
[docs]
def get_version_from_file(self, file_path: Path | None = None) -> str | None:
"""Get Terraform version from a file.
Args:
file_path: Path to file that will be read.
"""
file_path = file_path or self.version_file
if file_path and file_path.is_file():
return file_path.read_text(
encoding=locale.getpreferredencoding(do_setlocale=False)
).strip()
LOGGER.debug("file path not provided and version file could not be found")
return None
[docs]
def install(self, version_requested: str | None = None) -> str:
"""Ensure Terraform is available."""
if version_requested:
self.set_version(version_requested)
if not self.version:
raise ValueError(
f"version not provided and unable to find a {TF_VERSION_FILENAME} file"
)
# Now that a version has been selected, skip downloading if it's
# already been downloaded
if (self.versions_dir / str(self.version)).is_dir():
LOGGER.verbose("Terraform version %s already installed; using it...", self.version)
return str(self.bin)
LOGGER.info("downloading and using Terraform version %s ...", self.version)
download_tf_release(str(self.version), self.versions_dir, self.command_suffix)
LOGGER.verbose("downloaded Terraform %s successfully", self.version)
return str(self.bin)
[docs]
def list_installed(self) -> Generator[Path, None, None]:
"""List installed versions of Terraform.
Only lists versions of Terraform that have been installed by an instance
if this class or by tfenv.
"""
LOGGER.verbose("checking %s for Terraform versions...", self.versions_dir)
return self.versions_dir.rglob("*.*.*")
[docs]
def set_version(self, version: str) -> None:
"""Set current version.
Clears cached values as needed.
Args:
version: Version string. Must be in the format of
``<major>.<minor>.<patch>`` with an optional ``-<prerelease>``.
"""
if self.current_version == version:
return
self.current_version = version
self._del_cached_property("version")
[docs]
@classmethod
def get_version_from_executable(
cls,
bin_path: Path | str,
*,
cwd: Path | str | None = None,
env: dict[str, str] | None = None,
) -> Version | None:
"""Get Terraform version from an executable.
Args:
bin_path: Path to the Terraform binary to retrieve the version from.
cwd: Current working directory to use when calling the executable.
env: Environment variable overrides.
"""
output = subprocess.check_output([str(bin_path), "-version"], cwd=cwd, env=env).decode()
match = re.search(cls.VERSION_OUTPUT_REGEX, output)
if not match:
return None
return cls.parse_version_string(output)
[docs]
@classmethod
def parse_version_string(cls, version: str) -> Version:
"""Parse version string into a :class:`Version`.
Args:
version: Version string to parse. Must be in the format of
``<major>.<minor>.<patch>`` with an optional ``-<prerelease>``.
"""
match = re.search(cls.VERSION_REGEX, version)
if not match:
raise ValueError(f"provided version doesn't conform to regex: {cls.VERSION_REGEX}")
return Version(match.group("version"))