Source code for runway.cfngin.hooks.docker.data_models
"""Hook data models.These are makeshift data models for use until Runway v2 is released and pydanticcan be used."""from__future__importannotationsfromtypingimportTYPE_CHECKING,Annotated,Any,ClassVar,castfromdocker.models.imagesimportImagefrompydanticimportConfigDict,Field,PrivateAttr,model_validatorfrom....core.providers.awsimportAccountDetailsfrom....utilsimportBaseModel,MutableMapifTYPE_CHECKING:from....contextimportCfnginContextECR_REPO_FQN_TEMPLATE="{aws_account_id}.dkr.ecr.{aws_region}.amazonaws.com/{repo_name}"
class ElasticContainerRegistry(BaseModel):
    """AWS Elastic Container Registry.

    A registry is either public (addressed by ``alias``) or private (addressed
    by ``account_id`` and ``region``). For a private registry, values that were
    not provided are resolved from an optional ``context`` input value.

    """

    PUBLIC_URI_TEMPLATE: ClassVar[str] = "public.ecr.aws/{registry_alias}/"
    URI_TEMPLATE: ClassVar[str] = "{aws_account_id}.dkr.ecr.{aws_region}.amazonaws.com/"

    model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True)

    account_id: str | None = None
    """AWS account ID that owns the registry being logged into."""

    alias: str | None = None
    """If it is a public repository, the alias of the repository."""

    public: bool = True
    """Whether the repository is public.

    When omitted from the input, it is inferred from whether ``alias`` is set.

    """

    region: str | None = Field(default=None, alias="aws_region")
    """AWS region where the registry is located."""

    @property
    def fqn(self) -> str:
        """Fully qualified ECR name.

        Returns:
            The public URI (built from ``alias``) when the registry is public,
            otherwise the private URI built from ``account_id`` and ``region``.

        """
        if self.public:
            return self.PUBLIC_URI_TEMPLATE.format(registry_alias=self.alias)
        return self.URI_TEMPLATE.format(aws_account_id=self.account_id, aws_region=self.region)

    @model_validator(mode="before")
    @classmethod
    def _set_defaults(cls, values: Any) -> Any:
        """Set default values based on other values.

        Args:
            values: Raw input handed to the model before field validation.

        Returns:
            A copy of the input with ``public`` and, for private registries
            with a context, ``account_id`` / ``aws_region`` filled in.
            Non-dict input is returned untouched.

        Raises:
            ValueError: The registry is private, no context was provided, and
                the account ID or the region is missing.

        """
        # A "before" validator receives the raw input, which need not be a dict.
        if not isinstance(values, dict):
            return values

        # Work on a shallow copy so the caller's mapping is never mutated.
        values = dict(values)
        values.setdefault("public", bool(values.get("alias")))
        if values["public"]:
            return values

        ctx: CfnginContext | None = values.get("context")
        account_id = values.get("account_id")
        # ``populate_by_name`` lets callers use the field name instead of the
        # alias; honor both so an explicit ``region`` is not overridden below.
        aws_region = values.get("aws_region") or values.get("region")

        if not ctx:
            # Without a context nothing can be looked up, so BOTH values must
            # have been provided; otherwise ``fqn`` would contain ``None``.
            if not (account_id and aws_region):
                raise ValueError("context is required to resolve values")
            return values

        if not account_id:
            values["account_id"] = AccountDetails(ctx).id
        if not aws_region:
            values["aws_region"] = ctx.env.aws_region or "us-east-1"
        return values
class DockerImage(BaseModel):
    """Thin wrapper around :class:`docker.models.images.Image`."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    image: Image
    _repo: str | None = PrivateAttr(default=None)

    @property
    def id(self) -> str:
        """ID of the wrapped image."""
        return self.image.id

    @property
    def repo(self) -> str:
        """Repository URI.

        Derived from the first entry of the image's ``RepoTags`` attribute with
        the tag stripped; the result is cached on the instance.

        """
        if self._repo:
            return cast("str", self._repo)
        first_repo_tag = self.image.attrs["RepoTags"][0]
        self._repo = first_repo_tag.rsplit(":", 1)[0]
        return cast("str", self._repo)

    @property
    def short_id(self) -> str:
        """Short form of the image ID, as reported by the wrapped image."""
        return self.image.short_id

    @property
    def tags(self) -> list[str]:
        """List of image tags (the image is reloaded before they are read)."""
        self.image.reload()
        # The tag is whatever follows the last ":" of each image URI.
        return [image_uri.rpartition(":")[2] for image_uri in self.image.tags]

    @property
    def uri(self) -> MutableMap:
        """Return a mapping of tag to image URI."""
        uri_by_tag = {image_uri.rpartition(":")[2]: image_uri for image_uri in self.image.tags}
        return MutableMap(**uri_by_tag)

    def __bool__(self) -> bool:
        """Evaluate the boolean value of the object instance (always ``True``)."""
        return True
class ElasticContainerRegistryRepository(BaseModel):
    """Repository within an AWS Elastic Container Registry (ECR)."""

    model_config = ConfigDict(populate_by_name=True)

    name: Annotated[str, Field(alias="repo_name")]
    """Name of the repository (input alias: ``repo_name``)."""

    registry: ElasticContainerRegistry
    """The ECR registry that holds the repository."""

    @property
    def fqn(self) -> str:
        """Fully qualified ECR repo name: the registry FQN followed by the repo name."""
        registry_prefix = self.registry.fqn
        return f"{registry_prefix}{self.name}"