Source code for runway.cfngin.hooks.docker.image._push
"""Docker image push hook.Replicates the functionality of the ``docker image push`` CLI command."""from__future__importannotationsimportloggingfromtypingimportTYPE_CHECKING,Annotated,AnyfrompydanticimportConfigDict,Field,field_validatorfrom.....contextimportCfnginContextfrom.....utilsimportBaseModelfrom..data_modelsimport(DockerImage,ElasticContainerRegistry,ElasticContainerRegistryRepository,)from..hook_dataimportDockerHookDataifTYPE_CHECKING:frompydanticimportValidationInfoLOGGER=logging.getLogger(__name__.replace("._","."))
class ImagePushArgs(BaseModel):
    """Args passed to image.push.

    Several validators below read previously validated fields from
    ``info.data``, so the declaration order of the fields matters
    (``ctx`` -> ``ecr_repo`` -> ``image`` -> ``repo`` -> ``tags``).

    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Populated from the ``context`` key of the input (alias) and excluded
    # from serialization.
    ctx: Annotated[CfnginContext | None, Field(alias="context", exclude=True)] = None

    ecr_repo: ElasticContainerRegistryRepository | None = None  # depends on ctx
    """AWS Elastic Container Registry repository information.
    Providing this will automatically construct the repo URI.
    If provided, do not provide ``repo``.

    If using a private registry, only ``repo_name`` is required.
    If using a public registry, ``repo_name`` and ``registry_alias``.

    """

    image: DockerImage | None = None
    """Image to push."""

    repo: Annotated[str | None, Field(validate_default=True)] = None  # depends on ecr_repo & image
    """URI of a non Docker Hub repository where the image will be stored."""

    tags: Annotated[list[str], Field(validate_default=True)] = []  # depends on image
    """List of tags to push."""

    @field_validator("ecr_repo", mode="before")
    @classmethod
    def _set_ecr_repo(cls, v: Any, info: ValidationInfo) -> Any:
        """Set the value of ``ecr_repo``.

        A non-empty ``dict`` is expanded into an
        :class:`ElasticContainerRegistryRepository` whose nested registry is
        built from the ``account_id``, ``registry_alias`` and ``aws_region``
        keys plus the already validated context. Any other value is returned
        unchanged for regular validation.

        """
        if v and isinstance(v, dict):
            # ``info.data`` is keyed by *field name*, not by alias, so the
            # context passed in as ``context`` must be looked up as ``ctx``.
            # Looking up ``"context"`` here would always yield ``None``.
            return ElasticContainerRegistryRepository.model_validate(
                {
                    "repo_name": v.get("repo_name"),
                    "registry": ElasticContainerRegistry.model_validate(
                        {
                            "account_id": v.get("account_id"),
                            "alias": v.get("registry_alias"),
                            "aws_region": v.get("aws_region"),
                            "context": info.data.get("ctx"),
                        }
                    ),
                }
            )
        return v

    @field_validator("repo", mode="before")
    @classmethod
    def _set_repo(cls, v: str | None, info: ValidationInfo) -> str | None:
        """Set the value of ``repo``.

        Precedence: an explicitly provided (truthy) value, then ``image.repo``,
        then ``ecr_repo.fqn``. Returns ``None`` when none of these is
        available.

        """
        if v:
            return v

        image: DockerImage | None = info.data.get("image")
        if image:
            return image.repo

        ecr_repo: ElasticContainerRegistryRepository | None = info.data.get("ecr_repo")
        if ecr_repo:
            return ecr_repo.fqn

        return None

    @field_validator("tags", mode="before")
    @classmethod
    def _set_tags(cls, v: list[str], info: ValidationInfo) -> list[str]:
        """Set the value of ``tags``.

        Precedence: an explicitly provided (non-empty) value, then
        ``image.tags``, then ``["latest"]``.

        """
        if v:
            return v

        image: DockerImage | None = info.data.get("image")
        if image:
            return image.tags

        return ["latest"]
def push(*, context: CfnginContext, **kwargs: Any) -> DockerHookData:
    """Docker image push hook.

    Replicates the functionality of ``docker image push`` CLI command.

    kwargs are parsed by :class:`~runway.cfngin.hooks.docker.image.ImagePushArgs`.

    Args:
        context: CFNgin context object; also supplied to the args model under
            the ``context`` key.
        **kwargs: Hook arguments validated by ``ImagePushArgs``.

    Returns:
        The value returned by ``DockerHookData.update_context(context)``.

    """
    # Same merge order as a ``{"context": context, **kwargs}`` literal.
    raw_args: dict[str, Any] = {"context": context}
    raw_args.update(kwargs)
    parsed = ImagePushArgs.model_validate(raw_args)

    hook_data = DockerHookData.from_cfngin_context(context)

    repository = parsed.repo
    LOGGER.info("pushing image %s...", repository)
    for image_tag in parsed.tags:
        # NOTE(review): the return value of ``images.push`` is discarded here;
        # confirm whether the docker SDK can report a failed push in that
        # output rather than by raising.
        hook_data.client.images.push(repository=repository, tag=image_tag)
        LOGGER.info("successfully pushed image %s:%s", repository, image_tag)

    return hook_data.update_context(context)