Source code for mosaic_orchestrator.pdk

"""this module contains the abstract PDK implementation"""
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List

from semantic_version import Version, NpmSpec

from mosaic_orchestrator.result import Result
from mosaic_orchestrator.protocols import LifeCycleListener


@dataclass
class PdkItem:
    """A single named and versioned entry of a PDK.

    Subclass this to add more functionality to an entry.

    Note that the constructor takes ``(name, version)`` while the fields are
    declared as ``version`` then ``name``. ``@dataclass`` leaves an
    ``__init__`` defined by the class itself untouched, so the field order
    only shapes the generated ``__repr__`` and ``__eq__``.
    """

    version: str
    name: str

    def __init__(self, name: str, version: str) -> None:
        """Store the item's name and version string."""
        self.version = version
        self.name = name

    def _try_init(self) -> bool:
        """Validation hook that is called during initialization.

        Override it to implement custom validation. The base implementation
        accepts every item.

        Returns:
            Always ``True``.
        """
        return True
@dataclass
class FileBasePdkItem(PdkItem):
    """File-backed ``PdkItem``; validation fails if the file does not exist.

    This class defines no ``__init__`` of its own, so ``@dataclass``
    generates one from the field order: ``(version, name, path=None)``.
    That differs from ``PdkItem(name, version)``; prefer keyword arguments
    when constructing instances.
    """

    # Filesystem location backing this item; None means no path was given.
    path: str = None

    def _try_init(self) -> bool:
        """Validate the item: it is valid only when its path exists."""
        return self.exists()

    def exists(self) -> bool:
        """Check whether the item's path exists on the filesystem.

        Returns:
            ``True`` if ``path`` is set and refers to an existing path,
            ``False`` otherwise.
        """
        # os.path.exists(None) raises TypeError; an item without a path
        # simply does not exist, so validation fails cleanly instead.
        if self.path is None:
            return False
        return os.path.exists(self.path)
@dataclass
class Library:
    """Named collection that groups related PDK items."""

    # Name of the library.
    name: str
    # The items combined in this library.
    items: List[PdkItem]
def _get_highest_version(version_matches):
    """Return the entry of ``version_matches`` with the highest version.

    Entries are ordered by ``Version(entry.version)``. The sort is stable,
    so among entries with equal versions the one appearing last in the
    input is returned.

    Raises:
        IndexError: if ``version_matches`` is empty.
    """

    def semantic_key(entry):
        return Version(entry.version)

    ordered = list(version_matches)
    ordered.sort(key=semantic_key)
    return ordered[-1]
class PDK(ABC):
    """Abstract base PDK implementation, use RootPDK for lifecycle dependent PDKs.

    Implementation of the `task_hash` method is mandatory in order to include the
    PDK in the state representation of a `CachableTask`.
    """

    @property
    def name(self):
        """Return the name of the PDK.

        The base implementation only returns a placeholder message stating
        that no name was specified; subclasses are expected to override it.
        """
        return f"There is not a name specified for this PDK {self}."

    def get_item(self, name, versions: str = None) -> Result[PdkItem]:
        """Get an item from the PDK identified by its name and a versions string.

        The versions string follows the NPM range specification scheme
        (https://docs.npmjs.com/about-semantic-versioning).

        Resolution order:

        1. Only items from ``supported_items()`` whose name equals ``name``
           are considered; if there are none the lookup fails.
        2. If ``versions`` is ``None`` the first item with that name is
           returned (in ``supported_items()`` order, not the newest one).
        3. If ``versions`` is not a valid NPM range, a message is printed and
           only an exact string comparison of the version is attempted.
        4. Otherwise all items whose version satisfies the range are
           collected and the highest one is returned. Items whose version
           cannot be parsed are skipped with a printed warning.
        5. If no item satisfies the range, an exact string comparison of the
           version is attempted as a last resort.

        Args:
            name: name of the requested item.
            versions: NPM style version range, an exact version string, or
                ``None`` to accept any version.

        Returns:
            A `Result` object containing either the item or an error.
        """
        name_matches = [item for item in self.supported_items() if item.name == name]
        if not name_matches:
            return Result.Fail(f"PDKItem {name} in version {versions} not found.")

        if versions is None:
            # If no version is specified, then take the first entry
            return Result.Ok(name_matches[0])

        def find_exact_match():
            """Fall back to a plain string comparison of the version."""
            for name_match in name_matches:
                if name_match.version == versions:
                    # found an exact match
                    return Result.Ok(name_match)
            return Result.Fail(f"PDKItem {name} in version {versions} not found.")

        try:
            # Parse the range once and reuse it for every candidate below.
            spec = NpmSpec(versions)
        except ValueError as error:
            # We are likely looking for an exact match
            print(f"PDKItem {name} in version {versions} not found. "
                  f"Semantic format is not correct. Are you sure you do not have a typo? {error}")
            return find_exact_match()

        version_matches = []
        for name_match in name_matches:
            try:
                if Version(name_match.version) in spec:
                    version_matches.append(name_match)
            except ValueError as error:
                # The PDK item has a strange version format: ignore it.
                print(f"[WARN] PDK item {name_match} has a strange version format: {error}")
                continue

        if version_matches:
            return Result.Ok(_get_highest_version(version_matches))

        # It could be that we are still looking for an exact match
        return find_exact_match()

    @abstractmethod
    def supported_items(self) -> List[PdkItem]:
        """Implement this method to provide a list of all items supported by
        your implementation."""
        raise NotImplementedError("supported_items not implemented")

    @abstractmethod
    def get_library(self, name) -> Library:
        """Implement this method to provide a library for the given name."""
        raise NotImplementedError("get_library not implemented")

    @abstractmethod
    def supported_libraries(self) -> List[Library]:
        """Implement this method to provide a list of all libraries supported
        by your implementation."""
        raise NotImplementedError("supported_libraries not implemented")

    def task_hash(self):
        """Represents the state of this PDK.

        The same function will be used for all PDK items in the task
        hierarchy. The base implementation returns ``self.name``.
        """
        return self.name
class RootPDK(PDK, ABC, LifeCycleListener):
    """Abstract base class for a PDK that listens to lifecycle events.

    Both hooks below are no-ops by default; override them as needed.
    """

    def _on_start(self):
        """Hook that is called before run() is executed.

        Can be used, for example, to initialize the PDK or to connect to a
        database.
        """

    def _on_end(self, exception: Exception = None):
        """Hook that is called after run() is executed.

        Can be used to de-initialize the PDK, e.g. to disconnect from a
        database or to close a file.

        Args:
            exception: defaults to ``None``. Presumably the exception that
                ended run(), if any -- confirm against ``LifeCycleListener``.
        """