"""Validate Docker Repository"""
import base64
import re
import urllib.parse
from typing import Optional

import requests
from requests import Response
# Known Docker registry index endpoints, keyed by a short registry name.
ENDPOINT_MAPPING = {
    "dockerhub": "https://registry.hub.docker.com",
    "synapse": "https://docker.synapse.org",
}
class DockerRepository:
    """Form registry request URLs for a Docker image digest and fetch its
    manifest with the ``requests`` package.
    """

    def __init__(self, docker_repo: str, docker_digest: str, index_endpoint: str):
        """
        Args:
            docker_repo: Docker repository without tags or sha
            docker_digest: Docker repository sha digest
            index_endpoint: Docker registry endpoint.
                            Dockerhub - https://registry.hub.docker.com
                            Synapse - https://docker.synapse.org
        """
        self.docker_repo = docker_repo
        self.docker_digest = docker_digest
        self.index_endpoint = index_endpoint

    def __str__(self):
        """Return the ``<repository>@<digest>`` representation."""
        return f"{self.docker_repo}@{self.docker_digest}"

    def get_request_url(self):
        """Return the v2 manifest URL for this repository and digest."""
        url = "/".join(["v2", self.docker_repo, "manifests", self.docker_digest])
        return urllib.parse.urljoin(self.index_endpoint, url)

    @staticmethod
    def _build_bearer_token_url(www_auth: str) -> str:
        """Build the token request URL from a ``Www-Authenticate`` challenge.

        Args:
            www_auth: Header value such as
                ``Bearer realm="...",service="...",scope="..."``

        Returns:
            ``<realm>?service=<service>&scope=<scope>``

        Raises:
            KeyError: If ``realm``, ``service`` or ``scope`` is missing
        """
        # Match key=value pairs one at a time instead of splitting on "," and
        # "=": a quoted scope may contain commas ("pull,push") and any value
        # may contain "=".
        params = {}
        for match in re.finditer(
            r'([\w-]+)\s*=\s*(?:"([^"]*)"|([^\s,]*))', www_auth
        ):
            key, quoted, bare = match.groups()
            params[key] = quoted if quoted is not None else bare
        realm = params["realm"]
        # Percent-encode the values; ":", "/" and "," are kept literal so the
        # usual repository scopes stay readable and unchanged.
        query = urllib.parse.urlencode(
            {"service": params["service"], "scope": params["scope"]},
            safe=":/,",
            quote_via=urllib.parse.quote,
        )
        return f"{realm}?{query}"

    @staticmethod
    def _basic_auth_headers(
        username: Optional[str] = None, password: Optional[str] = None
    ) -> dict:
        """Return Basic auth headers, or no headers when credentials are absent."""
        if username is None or password is None:
            # Without credentials, request an anonymous token rather than
            # sending the literal string "None:None" as Basic credentials.
            return {}
        credentials = base64.b64encode(f"{username}:{password}".encode()).decode()
        return {"Authorization": f"Basic {credentials}"}

    def _get_bearer_token_url(self):
        """Get the bearer token URL.

        Requests the manifest URL and reads the ``Www-Authenticate`` header of
        the response to learn where a token must be requested.

        Raises:
            KeyError: If the response has no ``Www-Authenticate`` header or the
                header lacks ``realm``, ``service`` or ``scope``
        """
        initial_request = requests.get(self.get_request_url())
        return self._build_bearer_token_url(
            initial_request.headers["Www-Authenticate"]
        )

    def _get_bearer_token(
        self, username: Optional[str] = None, password: Optional[str] = None
    ):
        """Get the Docker bearer token.

        Args:
            username: Docker registry username
            password: Docker registry password

        Returns:
            Bearer token

        Raises:
            ValueError: If the token endpoint does not answer with status 200
        """
        bearer_token_url = self._get_bearer_token_url()
        headers = self._basic_auth_headers(username, password)
        bearer_token_request = requests.get(bearer_token_url, headers=headers)
        if bearer_token_request.status_code != 200:
            raise ValueError(bearer_token_request.json().get("details"))
        return bearer_token_request.json()["token"]

    def get(self, **kwargs):
        """Get the docker repository response.

        Args:
            **kwargs: username: Docker registry username
                      password: Docker registry password

        Returns:
            Docker response
        """
        token = self._get_bearer_token(**kwargs)
        return requests.get(
            self.get_request_url(), headers={"Authorization": f"Bearer {token}"}
        )
def check_docker_exists(docker_resp: "Response"):
    """Check if Docker image + sha exists

    Args:
        docker_resp: Docker response; only its ``status_code`` is inspected

    Raises:
        ValueError: If the response status code is not 200, i.e. the docker
            image and sha doesn't exist
    """
    if docker_resp.status_code != 200:
        raise ValueError("Docker image and sha digest must exist.")
def check_docker_size(docker_resp: "Response", size: int = 1000):
    """Check that the Docker image is smaller than the specified size

    The image size is the sum of the ``size`` of every entry under the
    ``layers`` key of the response JSON.

    Args:
        docker_resp: Docker response
        size: Size limit in GB (1 GB is counted as 1,000,000,000 bytes)

    Raises:
        ValueError: Docker image is at or over the specified size
    """
    bytes_per_gb = 1000000000.0
    docker_size = sum(layer["size"] for layer in docker_resp.json()["layers"])
    if docker_size / bytes_per_gb >= size:
        # Keep the historical wording for the default limit, but report the
        # real limit when the caller passes a different one.
        limit = "a terabyte" if size == 1000 else f"{size} GB"
        raise ValueError(f"Docker image must be less than {limit}.")
def validate_docker(
    docker_repo: str,
    docker_digest: str,
    index_endpoint: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
):
    """Validates a Docker Repository

    Fetches the repository response for the given digest, then checks that
    the image exists and that it is under the default size limit.

    Args:
        docker_repo: Docker repository without tags or sha
        docker_digest: Docker repository sha digest
        index_endpoint: Docker registry endpoint.
                        Dockerhub - https://registry.hub.docker.com
                        Synapse - https://docker.synapse.org
        username: Docker registry username
        password: Docker registry password

    Returns:
        True if valid. An invalid repository is reported by raising, never by
        returning False.

    Raises:
        ValueError: If the image + digest does not exist or the image is too
            large
    """
    docker_cls = DockerRepository(
        docker_repo=docker_repo,
        docker_digest=docker_digest,
        index_endpoint=index_endpoint,
    )
    docker_resp = docker_cls.get(username=username, password=password)
    check_docker_exists(docker_resp)
    check_docker_size(docker_resp)
    return True