"""
Variant
====================================
A core class to represent the files that will be parsed by an Annotation file.
"""
import csv
import ctypes
import gzip
import lzma
import warnings
from fnmatch import fnmatch
from functools import lru_cache
import mmap
from os.path import isdir, isfile
import re
from typing import Generator, List, Callable, Any
from openvariant.annotation.annotation import Annotation
from openvariant.annotation.builder import MappingBuilder
from openvariant.annotation.config_annotation import AnnotationFormat, AnnotationTypes, AnnotationDelimiter
from openvariant.utils.utils import check_extension, import_class_from_module
from openvariant.variant.where import skip, parse_where
def _open_file(file_path: str, mode='r+b'):
"""Open raw files or compressed files"""
if file_path.endswith('xz'):
open_method = lzma.open
file = open_method(file_path, mode)
mm = file
else:
open_method = open
file = open_method(file_path, mode)
mm: mmap = mmap.mmap(file.fileno(), length=0, access=mmap.ACCESS_READ)
return mm, file
def _base_parser(mm_obj: mmap, file_path: str, delimiter: str, skip_files: bool) -> Generator[int, str, None]:
"""Cleaning comments and irrelevant data"""
try:
if file_path.endswith('gz') or file_path.endswith('bgz'):
mm_obj = gzip.GzipFile(mode="r+b", fileobj=mm_obj)
except KeyError:
raise KeyError(f"'{delimiter}' key not found.")
try:
for l_num, line in enumerate(iter(mm_obj.readline, b'')):
line = line.decode('utf-8')
row_line = line.split(AnnotationDelimiter[delimiter].value)
row_line = list(map(lambda w: w.rstrip("\r\n"), row_line))
if len(row_line) == 0:
continue
# Skip comments
if (row_line[0].startswith('#') or row_line[0].startswith('##') or row_line[0].startswith('browser') or
row_line[0].startswith('track')) and not row_line[0].startswith('#CHROM'):
continue
yield l_num, row_line
except Exception as e:
if skip_files:
warnings.warn(f"Warning: unable to parse {file_path}: {e}.", UserWarning)
else:
raise e
def _exclude(line: dict, excludes: dict) -> bool:
"""Excludes values described on the annotation file"""
for k, v in excludes.items():
for val in v:
if val is not None and val.startswith("!"):
valp = val.replace("!", "")
if line[k] != valp:
return True
elif line[k] == val:
return True
return False
def _extract_header(file_path: str, original_header: list, annotation: Annotation):
"""Extract header to parse the entire file, and create a reference for each field"""
header_schema = {}
mapping_fields = []
for field, ann in annotation.annotations.items():
ann_type = ann[0]
if ann_type == AnnotationTypes.MAPPING.value:
mapping_fields.append((field, ann))
else:
class_name = ann_type
module_name = "openvariant.annotation.process"
ClassAnnotation = import_class_from_module(module_name, class_name)
instance = ClassAnnotation()
header_schema.update({field: instance(ann, original_header, file_path, header_schema)})
for field, ann in mapping_fields:
ann_type = ann[0]
class_name = ann_type
module_name = "openvariant.annotation.process"
ClassAnnotation = import_class_from_module(module_name, class_name)
instance = ClassAnnotation()
header_schema.update({field: instance(ann, original_header, file_path, header_schema)})
return header_schema, annotation.columns
@lru_cache(maxsize=256)
def _parse_field(value: float | int | str, func: Callable) -> str:
"""Getting the value of a specific annotation field. Cached with LRU policy"""
result = func(value)
return result if result is not None else str(float('nan'))
def _parse_plugin_field(row: dict, field_name: str, file_path: str, value: Any, func: Callable) -> str:
"""Getting the value of a specific plugin annotation. No cached"""
ctxt = value(row, field_name, file_path)
return func(ctxt)
def _parse_mapping_field(x: MappingBuilder, row: dict, func: Callable):
"""Getting the value of a specific mapping annotation. No cached"""
if x[1] is None:
raise ValueError(f'Wrong source fields on {x[0]} annotation')
value = None
for source in x[1]:
try:
map_key = row[source]
value = x[2].get(map_key, None)
except KeyError:
pass
return str(value) if value is not None else str(float('nan'))
def _check_extension(ext: str, path: str) -> bool:
"""Check if file matches with the annotation pattern"""
if ext[0] == '*':
match = fnmatch(path, ext)
else:
reg_apply = re.compile(ext + '$')
match = len(reg_apply.findall(path)) != 0
return match
[docs]class Variant:
"""A representation of parsed files
Methods
-------
read(group_key: str or None = None)
Read the parsed files with its proper annotation.
save(file_path: str, display_header: bool = True)
Save parsed files on specified location.
"""
def __init__(self, path: str, annotation: Annotation, skip_files: bool = False) -> None:
"""
Inits Variant with files path and Annotation object
Parameters
---------
path : str
A string path where files to parse are located (could be directory or a single file).
annotation : Annotation
Object to describe the schema of parsed files.
skip_files : bool
Skip unreadable files and directories.
"""
if path is None or path == '' or not isfile(path):
raise ValueError('Invalid path, must be a file')
if annotation is None:
raise ValueError('Invalid annotation')
csv.field_size_limit(int(ctypes.c_ulong(-1).value // 2))
self._path: str = path
self._annotation: Annotation = annotation
self._header: List[str] = list(annotation.annotations.keys()) if len(annotation.columns) == 0 \
else annotation.columns
self.skip_files = skip_files
def _unify(self, base_path: str, annotation: Annotation, group_by: str = None, display_header: bool = True) \
-> Generator[dict, None, None]:
"""Parse all the files thought the annotation schema and generated yields to iterate"""
for x in self._parser(base_path, annotation, group_by, display_header):
yield x
def _parser(self, file_path: str, annotation: Annotation, group_by: str, display_header: bool) \
-> Generator[dict, None, None]:
"""Parsing of an entire file with annotation schema"""
header, row, row_header = None, {}, []
matches = [check_extension(ext, file_path) for ext in annotation.patterns]
if not any(matches):
raise NameError("Annotation patterns don't match with input file.")
try:
self.mm, self.file = _open_file(file_path, "rb")
for lnum, line in _base_parser(self.mm, file_path, annotation.delimiter, self.skip_files):
try:
if header is None:
header, row_header = _extract_header(file_path, line, annotation)
if not display_header:
continue
row = row_header
yield row
else:
line_dict = {}
row, plugin_values, mapping_values = {}, {}, {}
for head in annotation.annotations.keys():
type_ann, value, func = header[head]
if type_ann == AnnotationTypes.PLUGIN.name:
plugin_values[head] = header[head]
elif type_ann == AnnotationTypes.MAPPING.name:
mapping_values[head] = header[head]
elif type_ann == AnnotationTypes.INTERNAL.name:
if len(value[0]) == 1:
pos = list(value[0].values())[0]
value = line[pos] if value is not None else None
elif len(value[0]) == 0:
value = line[pos] if value[1] is not None else None
else:
pos = {}
for val, position in value[0].items():
pos.update({val: line[position]})
value = value[1].format(**pos)
line_dict[head] = _parse_field(value, func)
else:
line_dict[head] = _parse_field(value, func)
for head, mapping in mapping_values.items():
_, builder_mapping, func = mapping
line_dict[head] = _parse_mapping_field(builder_mapping, line_dict, func)
for head, plug in plugin_values.items():
_, ctxt_plugin, func_plugin = plug
line_dict[head] = _parse_plugin_field(line_dict, head, file_path, ctxt_plugin, func_plugin)
for k in annotation.columns:
row[k] = line_dict[k].format(**line_dict)
if group_by is not None and group_by not in annotation.columns:
try:
row[group_by] = line_dict[group_by].format(**line_dict)
except KeyError as e:
raise KeyError(f"Unable to find group by: {e}. Check annotation for {file_path} file")
if row and not _exclude(line_dict, annotation.excludes):
yield row
except IndexError as e:
if line == ['']:
warnings.warn(f"Warning: empty line {lnum} on {file_path}.", UserWarning)
pass
else:
self.mm.close()
self.file.close()
raise ValueError(f"Error parsing line: {lnum} {file_path}: {e}")
except (ValueError, KeyError) as e:
self.mm.close()
self.file.close()
raise ValueError(f"Error parsing line: {lnum} {file_path}: {e}")
self.mm.close()
self.file.close()
except PermissionError:
if self.skip_files:
warnings.warn(f"Permission denied on {file_path}", UserWarning)
else:
raise PermissionError(f"Permission denied on {file_path}")
@property
def path(self) -> str:
"""str: Path where parsed files are located"""
return self._path
@property
def header(self) -> List[str]:
"""List[str]: Header of the corresponding parsed files"""
return self._header
@property
def annotation(self) -> Annotation:
"""Annotation: Annotation object which files were parsed"""
return self._annotation
[docs] def read(self, where: dict or str = None, group_key: str or None = None) -> Generator[dict, None, None]:
"""
Read parsed files and generated an iterator for each row
Parameters
---------
where : dict or str
A conditional where structure (optional).
group_key : str or None
A string that indicates how rows will be grouped (optional).
Yields
------
dict
Representation of a parsed row.
"""
where_clauses = parse_where(where)
for i, line in enumerate(self._unify(self._path, self._annotation, group_by=group_key)):
if i != 0:
if skip(line, where_clauses):
continue
yield line
[docs] def save(self, file_path: str, mode: str = 'w', display_header: bool = True) -> None:
"""
Save parsed files in an indicated location.
Parameters
---------
file_path : str or None
A string that indicates the location to store the output file.
mode : string
Two modes for writing (optional):
'w' the cursor starts at the begging of the file.
'a' the cursor starts at the end of the file.
display_header : bool
A bool that indicates if the output will have header or not (optional).
"""
if file_path is None or isdir(file_path):
raise ValueError("The path must be a file.")
with open(file_path, mode) as file:
writer = csv.writer(file, delimiter=AnnotationFormat[self._annotation.format.upper()].value)
for i, line in enumerate(self._unify(self._path, self._annotation)):
if display_header and i == 0:
writer.writerow(line)
elif i != 0:
writer.writerow(line.values())