Source code for openvariant.tasks.count

"""
Count  task
====================================
A core functionality to execute count task.
"""
from functools import partial
from multiprocessing import Pool
from os import cpu_count
from typing import Tuple, Union

from tqdm import tqdm

from openvariant.annotation.annotation import Annotation
from openvariant.find_files.find_files import findfiles

from openvariant.variant.variant import Variant


def _count_task(selection: [str, str], group_by: str, where: str, skip_files: bool) -> Tuple[int, Union[dict, None]]:
    """Main functionality for count task"""

    i = 0
    input_file, input_annotations = selection
    annotation = Annotation(input_annotations)
    result = Variant(input_file, annotation, skip_files)

    if group_by is None:
        for _ in result.read(where=where):
            i += 1
        return i, None
    else:
        groups = {}
        for r in result.read(where=where, group_key=group_by):
            try:
                val = r[group_by]
                val_count = groups.get(val, 0)
                groups[val] = val_count + 1
                i += 1
            except (ValueError, KeyError):
                pass
        return i, groups


[docs]def count(base_path: str, annotation_path: str or None, group_by: str = None, where: str = None, cores: int = cpu_count(), quite: bool = False, skip_files: bool = False) -> Tuple[int, Union[None, dict]]: """Print on the stdout the count result. It'll parse the input files with its proper annotation schema, and it'll show the count result on the stdout. Can be grouped by a field and can be added a 'where' expression. Parameters ---------- base_path : srt Base path of input files. annotation_path : str or None Path of annotation file. group_by : str Field to group the result. where : str Conditional statement. quite : bool Discard progress bar. cores : int Number of cores to parallelize the task. skip_files : bool Skip unreadable files and directories. Returns ---------- int The total number of rows. dict A schema with separate groups and the numbers of rows for each. """ selection = [] for k, a in findfiles(base_path, annotation_path): selection += [(k, a.path)] with Pool(cores) as pool: groups = {} task = partial(_count_task, group_by=group_by, where=where, skip_files=skip_files) map_method = pool.imap_unordered if len(selection) > 1 else map total = 0 for c, g in tqdm(map_method(task, selection), total=len(selection), desc="Counting variants".rjust(40), disable=(len(selection) < 2) or quite ): # Update groups if g is not None: for k, v in g.items(): val = groups.get(k, 0) groups[k] = val + v total += c return total, groups