Source code for runway.cfngin.hooks.ecr._purge_repository
"""Purge all images from an ECR repository."""from__future__importannotationsimportloggingfromtypingimportTYPE_CHECKING,Anyfrom....utilsimportBaseModelifTYPE_CHECKING:frommypy_boto3_ecr.clientimportECRClientfrommypy_boto3_ecr.type_defsimportImageIdentifierTypeDeffrom....contextimportCfnginContextLOGGER=logging.getLogger(__name__.replace("._","."))classHookArgs(BaseModel):"""Hook arguments for ``purge_repository``."""repository_name:str"""Name of the repository to purge."""defdelete_ecr_images(client:ECRClient,image_ids:list[ImageIdentifierTypeDef],repository_name:str,)->None:"""Delete images from an ECR repository."""response=client.batch_delete_image(repositoryName=repository_name,imageIds=image_ids)ifresponse.get("failures"):formsginresponse["failures"]:LOGGER.info("failed to delete image %s: (%s) %s",msg.get("imageId",{}).get("imageDigest")ormsg.get("imageId",{}).get("imageTag"),msg.get("failureCode"),msg.get("failureReason"),)raiseValueError("failures present in response")deflist_ecr_images(client:ECRClient,repository_name:str)->list[ImageIdentifierTypeDef]:"""List all images in an ECR repository."""image_ids:list[ImageIdentifierTypeDef]=[]try:response=client.list_images(repositoryName=repository_name,filter={"tagStatus":"ANY"})image_ids.extend(response["imageIds"])while"nextToken"inresponse:response=client.list_images(filter={"tagStatus":"ANY"},nextToken=response["nextToken"],repositoryName=repository_name,)image_ids.extend(response["imageIds"])return[{"imageDigest":digest}fordigestin{image.get("imageDigest")forimageinimage_ids}ifdigest]exceptclient.exceptions.RepositoryNotFoundException:LOGGER.info("repository %s does not exist",repository_name)return[]
[docs]defpurge_repository(context:CfnginContext,*_args:Any,**kwargs:Any)->dict[str,str]:"""Purge all images from an ECR repository. Args: context: CFNgin context object. **kwargs: Arbitrary keyword arguments. """args=HookArgs.model_validate(kwargs)client=context.get_session().client("ecr")image_ids=list_ecr_images(client,repository_name=args.repository_name)ifnotimage_ids:LOGGER.info("no images found in repository %s",args.repository_name)return{"status":"skipped"}delete_ecr_images(client,image_ids=image_ids,repository_name=args.repository_name)LOGGER.info("purged all images from repository")return{"status":"success"}