# Source code for openvariant.annotation.annotation

"""
Annotation
====================================
A core class that represents the schema used to parse files.
"""
import logging
import re

from typing import List
from yaml import safe_load, YAMLError

from openvariant.utils.utils import import_class_from_module
from openvariant.annotation.config_annotation import (AnnotationGeneralKeys, AnnotationKeys, AnnotationTypes,
                                                      ExcludesKeys, DEFAULT_FORMAT, DEFAULT_DELIMITER,
                                                      AnnotationFormat, AnnotationDelimiter)


def _check_general_keys(annot: dict) -> None:
    """Check that the general (top-level) annotation keys are well formed.

    Parameters
    ----------
    annot : dict
        Raw annotation schema as loaded from the annotation file.

    Raises
    ------
    KeyError
        If the pattern key is missing or is neither a string nor a list of
        strings, or if any optional general key that is present has an
        invalid type or value.
    """
    # Pattern key (mandatory): a single string or a list of strings.
    pattern_key = AnnotationGeneralKeys.PATTERN.value
    pattern_error = f"'{pattern_key}' key not found or is not a str."
    if pattern_key not in annot:
        raise KeyError(pattern_error)
    patterns = annot[pattern_key]
    # Each alternative is tested explicitly so that a list holding
    # non-strings, or a non-iterable scalar, is rejected with KeyError.
    is_str_list = isinstance(patterns, list) and all(isinstance(x, str) for x in patterns)
    if not isinstance(patterns, str) and not is_str_list:
        raise KeyError(pattern_error)

    # Recursive key
    recursive_key = AnnotationGeneralKeys.RECURSIVE.value
    if recursive_key in annot and not isinstance(annot[recursive_key], bool):
        raise KeyError(f"'{recursive_key}' key is not a boolean.")

    # Format key: must be a string naming a member of AnnotationFormat
    # (compared case-insensitively against the member names).
    format_key = AnnotationGeneralKeys.FORMAT.value
    if format_key in annot and \
            (not isinstance(annot[format_key], str) or
             annot[format_key].upper() not in [e.name for e in AnnotationFormat]):
        raise KeyError(f"'{format_key}' key is not a string.")

    # Delimiter key: must be a string naming a member of AnnotationDelimiter.
    delimiter_key = AnnotationGeneralKeys.DELIMITER.value
    if delimiter_key in annot and \
            (not isinstance(annot[delimiter_key], str) or
             annot[delimiter_key].upper() not in [e.name for e in AnnotationDelimiter]):
        raise KeyError(f"'{delimiter_key}' key is not valid or is not a string.")

    # Columns key
    columns_key = AnnotationGeneralKeys.COLUMNS.value
    if columns_key in annot and not isinstance(annot[columns_key], list):
        raise KeyError(f"'{columns_key}' key is not a list.")

    # Annotations key
    annotation_key = AnnotationGeneralKeys.ANNOTATION.value
    if annotation_key in annot and not isinstance(annot[annotation_key], list):
        raise KeyError(f"'{annotation_key}' key is not a list.")

    # Excludes key: a list of mappings, each with a field and a value entry.
    exclude_key = AnnotationGeneralKeys.EXCLUDE.value
    if exclude_key in annot:
        excludes = annot[exclude_key]
        well_formed = isinstance(excludes, list) and all(
            isinstance(x, dict) and ExcludesKeys.FIELD.value in x and ExcludesKeys.VALUE.value in x
            for x in excludes)
        if not well_formed:
            raise KeyError(f"'{exclude_key}' key in bad format.")


def _check_annotation_keys(annot: dict) -> None:
    """Check that a single annotation entry is well formed.

    Parameters
    ----------
    annot : dict
        One entry of the annotation list of the schema.

    Raises
    ------
    KeyError
        If a key required by the annotation type is missing or has the
        wrong type.
    ValueError
        If the annotation type is unknown, the function is not a string
        matching the expected lambda form, or the plugin is not a string.
    """
    # Type key
    type_key = AnnotationKeys.TYPE.value
    if type_key not in annot or not isinstance(annot[type_key], str):
        raise KeyError(f"'{type_key}' key not found or is not a str.")
    annot_type = annot[type_key]
    if annot_type not in [e.value for e in AnnotationTypes]:
        raise ValueError(f"'{type_key}' value is wrong.")

    # Field key
    field_key = AnnotationKeys.FIELD.value
    if field_key not in annot or not isinstance(annot[field_key], str):
        raise KeyError(f"'{field_key}' key not found or is not a str.")

    # Value key: .get() makes a missing key fail with the descriptive
    # message below rather than a bare KeyError from the lookup.
    value_key = AnnotationKeys.VALUE.value
    if annot_type == AnnotationTypes.STATIC.value and not isinstance(annot.get(value_key), str):
        raise KeyError(f"'{value_key}' key not found or is not a str.")

    # Field source key
    source_key = AnnotationKeys.FIELD_SOURCE.value
    source_types = (AnnotationTypes.INTERNAL.value,
                    AnnotationTypes.PLUGIN.value,
                    AnnotationTypes.MAPPING.value)
    if annot_type in source_types and source_key in annot and not isinstance(annot[source_key], list):
        raise KeyError(f"'{source_key}' key not found or is not a list.")

    # Dirname and filename key
    function_key = AnnotationKeys.FUNCTION.value
    function_types = (AnnotationTypes.DIRNAME.value,
                      AnnotationTypes.FILENAME.value,
                      AnnotationTypes.INTERNAL.value)
    if annot_type in function_types and function_key in annot:
        function = annot[function_key]
        # A non-string function is reported as ValueError instead of letting
        # re.search fail with TypeError.
        if not isinstance(function, str) or \
                re.compile("lambda[' ']+[a-zA-Z0-9]+[' ']*:[' ']*.*").search(function) is None:
            raise ValueError(f"'{function_key}' value is not an appropriated lambda function.")

    # Plugin key
    plugin_key = AnnotationKeys.PLUGIN.value
    if annot_type == AnnotationTypes.PLUGIN.value:
        if plugin_key not in annot:
            raise KeyError(f"'{plugin_key}' key not found.")
        if not isinstance(annot[plugin_key], str):
            raise ValueError(f"'{plugin_key}' is not a str.")

    # Mapping keys: all four must be present before their types are checked;
    # the short-circuiting `or` chain guarantees that order.
    if annot_type == AnnotationTypes.MAPPING.value and \
            (AnnotationKeys.FIELD_SOURCE.value not in annot or
             AnnotationKeys.FIELD_MAPPING.value not in annot or
             AnnotationKeys.FILE_MAPPING.value not in annot or
             AnnotationKeys.FIELD_VALUE.value not in annot or
             not isinstance(annot[AnnotationKeys.FIELD_SOURCE.value], list) or
             not isinstance(annot[AnnotationKeys.FIELD_MAPPING.value], str) or
             not isinstance(annot[AnnotationKeys.FILE_MAPPING.value], str) or
             not isinstance(annot[AnnotationKeys.FIELD_VALUE.value], str)):
        raise KeyError(f"'{AnnotationTypes.MAPPING.value}' not annotated well.")


class Annotation:
    """A representation of the schema that files will be parsed with.

    The schema is read from an annotation file, validated, and exposed
    through read-only properties.
    """

    def _read_annotation_file(self) -> dict:
        """Read the annotation file with the YAML package.

        Returns
        -------
        dict
            Parsed content of the annotation file (expected to be a mapping).

        Raises
        ------
        YAMLError
            If the file cannot be parsed. The error is logged and re-raised
            rather than swallowed, so the caller does not receive ``None``.
        """
        # The context manager closes the stream on every exit path, so no
        # explicit close() is needed.
        with open(self._path, 'r') as stream:
            try:
                return safe_load(stream)
            except YAMLError as exc:
                logging.error(exc)
                raise

    def _check_columns(self) -> None:
        """Check that every output column exists as an annotation field.

        Raises
        ------
        KeyError
            If a column is not a key of the annotations dictionary.
        """
        for col in self._columns:
            if col not in self._annotations:
                raise KeyError(f"'{col}' column unable to find.")

    def __init__(self, annotation_path: str) -> None:
        """
        Inits Annotation with annotation file path.

        Parameters
        ---------
        annotation_path : str
            A string path where Annotation file is located.
        """
        self._path = annotation_path
        raw_annotation = self._read_annotation_file()

        # Validate the schema before reading anything from it.
        _check_general_keys(raw_annotation)
        for annot in raw_annotation.get(AnnotationGeneralKeys.ANNOTATION.value, []):
            _check_annotation_keys(annot)

        # A single pattern is normalised to a one-element list.
        patterns = raw_annotation[AnnotationGeneralKeys.PATTERN.value]
        self._patterns = patterns if isinstance(patterns, list) else [patterns]
        self._recursive = raw_annotation.get(AnnotationGeneralKeys.RECURSIVE.value, True)
        self._delimiter = raw_annotation.get(AnnotationGeneralKeys.DELIMITER.value, DEFAULT_DELIMITER).upper()
        self._format = raw_annotation.get(AnnotationGeneralKeys.FORMAT.value, DEFAULT_FORMAT).replace('.', '')

        # Group the excluded values by field name.
        self._excludes: dict = {}
        for k in raw_annotation.get(AnnotationGeneralKeys.EXCLUDE.value, []):
            key_exclude = k[AnnotationKeys.FIELD.value]
            value_exclude = k[AnnotationKeys.VALUE.value]
            self._excludes.setdefault(key_exclude, []).append(value_exclude)

        # Build one annotation per entry: the builder class is looked up by
        # the upper-cased annotation type, instantiated, and the instance is
        # then called with the raw entry and the annotation file path.
        self._annotations: dict = {}
        module_name = "openvariant.annotation.builder"
        for k in raw_annotation.get(AnnotationGeneralKeys.ANNOTATION.value, []):
            class_name = k[AnnotationKeys.TYPE.value].upper()
            ClassAnnotation = import_class_from_module(module_name, class_name)
            instance = ClassAnnotation()
            self._annotations[k[AnnotationKeys.FIELD.value]] = instance(k, self._path)

        # Without an explicit column list, every annotation field is a column.
        self._columns = raw_annotation.get(AnnotationGeneralKeys.COLUMNS.value, list(self.annotations.keys()))
        self._check_columns()

    @property
    def path(self) -> str:
        """str: path where annotation file is located"""
        return self._path

    @property
    def patterns(self) -> List[str]:
        """List[str]: files patterns that annotation will match"""
        return self._patterns

    @property
    def format(self) -> str:
        """str: output format that will have parsed files"""
        return self._format

    @property
    def delimiter(self) -> str:
        """str: delimiter that annotation will read on files"""
        return self._delimiter

    @property
    def columns(self) -> List:
        """List: columns that will appear on parsed output files"""
        return self._columns

    @property
    def annotations(self) -> dict:
        """dict: annotation that will cover Annotation object"""
        return self._annotations

    @property
    def excludes(self) -> dict:
        """dict: values, grouped by field, that will be excluded after the parsing"""
        return self._excludes

    @property
    def structure(self) -> dict:
        """dict: general structure of Annotation schema"""
        # Every pattern maps to the same structure dictionary object.
        structure_aux = {AnnotationGeneralKeys.ANNOTATION.name: self._annotations,
                         AnnotationGeneralKeys.EXCLUDE.name: self._excludes}
        return {e: structure_aux for e in self._patterns}