Skip to content
Snippets Groups Projects

Dedal Release

Merged Adrian Ciu requested to merge dev into master
2 unresolved threads
19 files
+ 367
91
Compare changes
  • Side-by-side
  • Inline
Files
19
@@ -2,9 +2,9 @@ import os
import oras.client
from pathlib import Path
from dedal.build_cache.BuildCacheManagerInterface import BuildCacheManagerInterface
from dedal.logger.logger_builder import get_logger
from dedal.utils.utils import clean_up
from esd.build_cache.BuildCacheManagerInterface import BuildCacheManagerInterface
from esd.logger.logger_builder import get_logger
from esd.utils.utils import clean_up
class BuildCacheManager(BuildCacheManagerInterface):
"""
This class aims to manage the push/pull/delete of build cache files
"""
def __init__(self, registry_host, registry_project, registry_username, registry_password, cache_version='cache',
auth_backend='basic',
insecure=False):
self._logger = get_logger(__name__, BuildCacheManager.__name__)
self._registry_project = registry_project
def __init__(self, auth_backend='basic', insecure=False):
self.logger = get_logger(__name__, BuildCacheManager.__name__)
self.home_path = Path(os.environ.get("HOME_PATH", os.getcwd()))
self.registry_project = os.environ.get("REGISTRY_PROJECT")
self._registry_username = registry_username
self._registry_password = registry_password
self._registry_username = str(os.environ.get("REGISTRY_USERNAME"))
self._registry_password = str(os.environ.get("REGISTRY_PASSWORD"))
self._registry_host = registry_host
self.registry_host = str(os.environ.get("REGISTRY_HOST"))
# Initialize an OrasClient instance.
# This method utilizes the OCI Registry for container image and artifact management.
# Refer to the official OCI Registry documentation for detailed information on the available authentication methods.
# Supported authentication types may include basic authentication (username/password), token-based authentication,
self._client = oras.client.OrasClient(hostname=self._registry_host, auth_backend=auth_backend,
insecure=insecure)
self._client.login(username=self._registry_username, password=self._registry_password)
self.cache_version = cache_version
self._oci_registry_path = f'{self._registry_host}/{self._registry_project}/{self.cache_version}'
self.client = oras.client.OrasClient(hostname=self.registry_host, auth_backend=auth_backend, insecure=insecure)
self.client.login(username=self._registry_username, password=self._registry_password)
self.oci_registry_path = f'{self.registry_host}/{self.registry_project}/cache'
def upload(self, out_dir: Path):
"""
This method pushed all the files from the build cache folder into the OCI Registry
"""
build_cache_path = out_dir.resolve()
build_cache_path = self.home_path / out_dir
# build cache folder must exist before pushing all the artifacts
if not build_cache_path.exists():
self._logger.error(f"Path {build_cache_path} not found.")
self.logger.error(f"Path {build_cache_path} not found.")
for sub_path in build_cache_path.rglob("*"):
if sub_path.is_file():
rel_path = str(sub_path.relative_to(build_cache_path)).replace(str(sub_path.name), "")
target = f"{self._registry_host}/{self._registry_project}/{self.cache_version}:{str(sub_path.name)}"
rel_path = str(sub_path.relative_to(build_cache_path)).replace(str(sub_path.env_name), "")
target = f"{self.registry_host}/{self.registry_project}/cache:{str(sub_path.env_name)}"
try:
self._logger.info(f"Pushing folder '{sub_path}' to ORAS target '{target}' ...")
self._client.push(
self.logger.info(f"Pushing folder '{sub_path}' to ORAS target '{target}' ...")
self.client.push(
files=[str(sub_path)],
target=target,
# save in manifest the relative path for reconstruction
manifest_annotations={"path": rel_path},
disable_path_validation=True,
)
self._logger.info(f"Successfully pushed {sub_path.name}")
self.logger.info(f"Successfully pushed {sub_path.env_name}")
except Exception as e:
self._logger.error(
self.logger.error(
f"An error occurred while pushing: {e}")
# todo to be discussed hot to delete the build cache after being pushed to the OCI Registry
# clean_up([str(build_cache_path)], self.logger)
@@ -66,38 +63,37 @@ class BuildCacheManager(BuildCacheManagerInterface):
This method retrieves all tags from an OCI Registry
"""
try:
return self._client.get_tags(self._oci_registry_path)
return self.client.get_tags(self.oci_registry_path)
except Exception as e:
self._logger.error(f"Failed to list tags: {e}")
self.logger.error(f"Failed to list tags: {e}")
return None
def download(self, in_dir: Path):
"""
This method pulls all the files from the OCI Registry into the build cache folder
"""
build_cache_path = in_dir.resolve()
build_cache_path = self.home_path / in_dir
# create the buildcache dir if it does not exist
os.makedirs(build_cache_path, exist_ok=True)
tags = self.list_tags()
if tags is not None:
for tag in tags:
ref = f"{self._registry_host}/{self._registry_project}/{self.cache_version}:{tag}"
ref = f"{self.registry_host}/{self.registry_project}/cache:{tag}"
# reconstruct the relative path of each artifact by getting it from the manifest
cache_path = \
self._client.get_manifest(
f'{self._registry_host}/{self._registry_project}/{self.cache_version}:{tag}')[
self.client.get_manifest(f'{self.registry_host}/{self.registry_project}/cache:{tag}')[
'annotations'][
'path']
try:
self._client.pull(
self.client.pull(
ref,
# missing dirs to output dir are created automatically by OrasClient pull method
outdir=str(build_cache_path / cache_path),
overwrite=True
)
self._logger.info(f"Successfully pulled artifact {tag}.")
self.logger.info(f"Successfully pulled artifact {tag}.")
except Exception as e:
self._logger.error(
self.logger.error(
f"Failed to pull artifact {tag} : {e}")
def delete(self):
@@ -110,8 +106,8 @@ class BuildCacheManager(BuildCacheManagerInterface):
tags = self.list_tags()
if tags is not None:
try:
self._client.delete_tags(self._oci_registry_path, tags)
self._logger.info(f"Successfully deleted all artifacts form OCI registry.")
self.client.delete_tags(self.oci_registry_path, tags)
self.logger.info(f"Successfully deleted all artifacts form OCI registry.")
except RuntimeError as e:
self._logger.error(
self.logger.error(
f"Failed to delete artifacts: {e}")