Skip to content

API

helix.utility.unit_prefix.UnitPrefix

Convert a quantity into a more readable form using unit prefixes.

This class is especially useful for very big or very small numbers, which are difficult to read or visualize.

Examples:

>>> UnitPrefix.convert_bytes(1024)
'1 kB'
>>> UnitPrefix.convert(1000)
'1k'
>>> UnitPrefix.convert(0.01)
'10m'
Source code in helix\utility\unit_prefix.py
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
class UnitPrefix:
    """Convert a quantity into a more readable form using unit prefixes.

    This class is expecially useful for very big or very small number, that
    are difficult to read or visualize.

    Examples:
        >>> UnitPrefix.convert_bytes(1024)
        '1kB'
        >>> UnitPrefix.convert(1000)
        '1k'
        >>> UnitPrefix.convert(0.01)
        '10m'
    """

    def convert_bytes(input: int, decimal=2) -> str:
        """Convert bytes using binary prefixes.

        Args:
            input (int): The number to convert.
            decimal (int, optional): Number of decimals to use. Defaults to 2.

        Raises:
            ValueError: The input number is a float
            ValueError: The input number is negative.

        Returns:
            str: Converted number.
        """
        if input is None:
            return None
        if isinstance(input, float):
            raise ValueError("Can't convert a fractional number of bytes")
        elif input < 0:
            raise ValueError("Can't convert a negative number of bytes")
        converted = None
        si_prefixes = OrderedDict(
            [
                (" B", (0, 10, 0)),
                (" kB", (10, 20, 10)),
                (" MB", (20, 30, 20)),
                (" GB", (30, 40, 30)),
                (" TB", (40, None, 40)),
            ]
        )
        if input == 0:
            return "0 B"

        input_log = log(input, 2)
        input_log = int(floor(input_log))

        converted = None
        for letter, interval in si_prefixes.items():
            min_interval = interval[0]
            max_interval = interval[1]
            in_interval = False
            if max_interval is None:
                in_interval = input_log >= min_interval
            elif min_interval is None:
                in_interval = input_log < max_interval
            else:
                in_interval = input_log >= min_interval and input_log < max_interval

            if in_interval:
                converted = f"{(input / 2**interval[2]):.{decimal}f}"
                # Not interested in any .0000 at the end
                converted = converted.rstrip("0").rstrip(".")
                # If converted == "0" will become ""
                if converted == "":
                    converted = "0"
                converted += letter
                break
        return converted

    def convert(input: int, decimal=2) -> str:
        """Convert a quantity using metric prefixes.

        Args:
            input (int): The number to convert.
            decimal (int, optional): Number of decimals to use. Defaults to 2.

        Returns:
            str: Converted number.
        """
        converted = None
        si_prefixes = OrderedDict(
            [
                ("p", (None, -9, -12)),
                ("n", (-9, -6, -9)),
                ("μ", (-6, -3, -6)),
                ("m", (-3, 0, -3)),
                ("", (0, 3, 0)),
                ("k", (3, 6, 3)),
                ("M", (6, 9, 6)),
                ("G", (9, 12, 9)),
                ("T", (12, 15, 12)),
                ("P", (15, None, 15)),
            ]
        )
        if input == 0:
            return "0"

        input_log = log10(input)
        input_log = int(floor(input_log))

        converted = None
        for letter, interval in si_prefixes.items():
            min_interval = interval[0]
            max_interval = interval[1]
            in_interval = False
            if max_interval is None:
                in_interval = input_log >= min_interval
            elif min_interval is None:
                in_interval = input_log < max_interval
            else:
                in_interval = input_log >= min_interval and input_log < max_interval

            if in_interval:
                converted = f"{(input / 10**interval[2]):.{decimal}f}"
                # Not interested in any .0000 at the end
                converted = converted.rstrip("0").rstrip(".")
                # If converted == "0" will become ""
                if converted == "":
                    converted = "0"
                converted += letter
                break
        return converted

convert(input, decimal=2)

Convert a quantity using metric prefixes.

Parameters:

Name Type Description Default
input int

The number to convert.

required
decimal int

Number of decimals to use. Defaults to 2.

2

Returns:

Name Type Description
str str

Converted number.

Source code in helix\utility\unit_prefix.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def convert(input: int, decimal=2) -> str:
    """Convert a quantity using metric prefixes.

    Args:
        input (int): The number to convert.
        decimal (int, optional): Number of decimals to use. Defaults to 2.

    Returns:
        str: Converted number.
    """
    if input == 0:
        return "0"

    # (prefix, lower exponent bound, upper exponent bound, scaling exponent);
    # a None bound means unbounded on that side.
    prefix_table = [
        ("p", None, -9, -12),
        ("n", -9, -6, -9),
        ("μ", -6, -3, -6),
        ("m", -3, 0, -3),
        ("", 0, 3, 0),
        ("k", 3, 6, 3),
        ("M", 6, 9, 6),
        ("G", 9, 12, 9),
        ("T", 12, 15, 12),
        ("P", 15, None, 15),
    ]

    magnitude = int(floor(log10(input)))

    for prefix, lower, upper, scale in prefix_table:
        if lower is not None and magnitude < lower:
            continue
        if upper is not None and magnitude >= upper:
            continue
        text = f"{(input / 10**scale):.{decimal}f}"
        # Drop trailing zeros and a dangling decimal point ("1.00" -> "1");
        # "0.00" collapses to the empty string, hence the `or "0"` fallback.
        text = text.rstrip("0").rstrip(".")
        return (text or "0") + prefix
    return None

convert_bytes(input, decimal=2)

Convert bytes using binary prefixes.

Parameters:

Name Type Description Default
input int

The number to convert.

required
decimal int

Number of decimals to use. Defaults to 2.

2

Raises:

Type Description
ValueError

The input number is a float

ValueError

The input number is negative.

Returns:

Name Type Description
str str

Converted number.

Source code in helix\utility\unit_prefix.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def convert_bytes(input: int, decimal=2) -> str:
    """Convert bytes using binary prefixes.

    Args:
        input (int): The number to convert.
        decimal (int, optional): Number of decimals to use. Defaults to 2.

    Raises:
        ValueError: The input number is a float
        ValueError: The input number is negative.

    Returns:
        str: Converted number.
    """
    if input is None:
        return None
    if isinstance(input, float):
        raise ValueError("Can't convert a fractional number of bytes")
    if input < 0:
        raise ValueError("Can't convert a negative number of bytes")
    if input == 0:
        return "0 B"

    # (unit, lower exponent bound, upper exponent bound, scaling exponent);
    # a None upper bound means unbounded.
    units = [
        (" B", 0, 10, 0),
        (" kB", 10, 20, 10),
        (" MB", 20, 30, 20),
        (" GB", 30, 40, 30),
        (" TB", 40, None, 40),
    ]

    magnitude = int(floor(log(input, 2)))

    for unit, lower, upper, scale in units:
        if magnitude < lower:
            continue
        if upper is not None and magnitude >= upper:
            continue
        text = f"{(input / 2**scale):.{decimal}f}"
        # Drop trailing zeros and a dangling decimal point ("1.00" -> "1");
        # "0.00" collapses to the empty string, hence the `or "0"` fallback.
        text = text.rstrip("0").rstrip(".")
        return (text or "0") + unit
    return None

helix.alignment_map.variant_caller.VariantCaller

Bases: SimpleWorker

This class is responsible for calling variants for a given alignment-map file.

Parameters:

Name Type Description Default
input AlignmentMapFile

File that contains aligned reads.

required
calling_type VariantCallingType

Type of variant calling to perform. Defaults to VariantCallingType.Both.

Both
repo_config RepositoryConfig

Configuration for reference genome repository. Defaults to MANAGER_CFG.REPOSITORY.

REPOSITORY
ext_config ExternalConfig

Configuration for external tools. Defaults to MANAGER_CFG.EXTERNAL.

EXTERNAL
external External

Object that contains functions to call external tools. Defaults to External().

External()
progress Callable[[str, int]

Function that accept a status message and a percentage, for progress tracking. Defaults to None.

None
logger Logger

Logger object. Defaults to logging.getLogger(name).

getLogger(__name__)

Raises:

Type Description
RuntimeError

If the index stats inside input are not available.

RuntimeError

If the reference genome for input is not available.

Source code in helix\alignment_map\variant_caller.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
class VariantCaller(SimpleWorker):
    """This class is responsible for calling variants for a given alignment-map file.

    Args:
        input (AlignmentMapFile):
            File that contains aligned reads.
        calling_type (VariantCallingType, optional):
            Type of variant calling to perform. Defaults to VariantCallingType.Both.
        repo_config (RepositoryConfig, optional):
            Configuration for reference genome repository. Defaults to
            MANAGER_CFG.REPOSITORY.
        ext_config (ExternalConfig, optional):
            Configuration for external tools. Defaults to MANAGER_CFG.EXTERNAL.
        external (External, optional):
            Object that contains functions to call external tools.
            Defaults to External().
        progress (Callable[[str, int], None], optional):
            Function that accept a status message and a percentage,
            for progress tracking. Defaults to None.
        logger (Logger, optional):
            Logger object. Defaults to logging.getLogger(__name__).

    Raises:
        RuntimeError:
            If the index stats inside `input` are not available.
        RuntimeError:
            If the reference genome for `input` is not available.
    """

    def __init__(
        self,
        input: AlignmentMapFile,
        calling_type: VariantCallingType = VariantCallingType.Both,
        repo_config: RepositoryConfig = MANAGER_CFG.REPOSITORY,
        ext_config: ExternalConfig = MANAGER_CFG.EXTERNAL,
        external: External = External(),
        progress: Callable[[str, int], None] = None,
        logger: logging.Logger = logging.getLogger(__name__),
    ) -> None:

        super().__init__(None)
        self._external = external
        self._ext_config = ext_config
        self._ploidy = str(repo_config.metadata.joinpath("ploidy.txt"))
        # Flags used to coordinate with kill() (set/read across threads).
        self._is_quitting = False
        self._quitting_ack = False
        self._current_operation = None
        self._progress = progress
        self._logger = logger
        self._input_file = input
        self._calling_type = calling_type

        if self._input_file.file_info.index_stats is None:
            raise RuntimeError("Index stats cannot be None for variant calling")
        if self._input_file.file_info.reference_genome.ready_reference is None:
            raise RuntimeError("Reference cannot be None for variant calling")

    def call(self):
        """Run variant calling synchronously (delegates to run())."""
        return self.run()

    def run(self):
        """Execute the pipeline: bcftools mpileup | bcftools call, then
        index the resulting compressed VCF with tabix."""
        self.current_file = self._input_file
        reference = str(
            self._input_file.file_info.reference_genome.ready_reference.fasta
        )
        self._logger.info(
            f"Calling variants with {self._input_file.file_info.reference_genome.ready_reference}"
        )
        input_file = self._input_file.path

        skip_variant_opt = ""
        if self._calling_type == VariantCallingType.InDel:
            skip_variant_opt = "-V snps"
        elif self._calling_type == VariantCallingType.SNP:
            skip_variant_opt = "-V indels"

        output_file = self._input_file.path.with_name(
            f"{self._input_file.path.stem}_{self._calling_type.name}.vcf.gz"
        )

        # NOTE: the trailing space before each closing quote is required.
        # Adjacent f-strings are concatenated, and without it
        # "--threads N" would fuse with "-f" (and "-mv" with "-P") into a
        # single invalid token after shlex.split.
        pileup_opt = (
            f"mpileup -B -I -C 50 --threads {self._ext_config.threads} "
            f'-f "{reference}" -Ou "{input_file}"'
        )
        call_opt = (
            f'call --ploidy-file "{self._ploidy}" {skip_variant_opt} -mv '
            f'-P 0 --threads {self._ext_config.threads} -Oz -o "{output_file}"'
        )
        tabix_opt = f'-p vcf "{output_file}"'

        pileup_opt = shlex.split(pileup_opt)
        call_opt = shlex.split(call_opt)
        tabix_opt = shlex.split(tabix_opt)

        progress = None
        if self._progress is not None:
            mapped_sequences = [
                x.mapped for x in self.current_file.file_info.index_stats
            ]
            mapped_sequences = sum(mapped_sequences)

            # 112 bytes seems to be the average bytes written for a single base
            total_bytes = (
                mapped_sequences
                * self.current_file.file_info.alignment_stats.average_length
                * 112
            )

            progress = BaseProgressCalculator(
                self._progress, total_bytes, "[1/2] Calling"
            )
            progress = progress.compute_on_write_bytes

        self._current_operation = self._external.bcftools(
            pileup_opt,
            stdout=subprocess.PIPE,
            io=progress,
        )

        # Pipe mpileup output straight into bcftools call.
        call = self._external.bcftools(call_opt, stdin=self._current_operation.stdout)
        call.communicate()
        if self._is_quitting:
            self._quitting_ack = True
            return

        progress = BaseProgressCalculator(
            self._progress, output_file.stat().st_size, "[2/2] Indexing"
        )
        self._current_operation = self._external.tabix(
            tabix_opt, io=progress.compute_on_read_bytes
        )
        self._current_operation.wait()
        self._quitting_ack = True

    def kill(self):
        """Kill a variant calling operation.

        Repeatedly kills the current external process until the worker
        acknowledges it is quitting (up to 10 attempts, 0.5s apart).
        Errors are logged rather than raised.
        """
        self._is_quitting = True
        try:
            # The retry loop is just to prevent a (unlikely IMO but who knows)
            # situation where _is_quitting is seen as False by the other thread
            # after Popen.kill() has been called.
            for _ in range(10):
                operation = self._current_operation
                if self._quitting_ack:
                    break
                if operation is not None:
                    operation.kill()
                self._current_operation = None
                if self._quitting_ack:
                    break
                sleep(0.5)
            if not self._quitting_ack:
                raise RuntimeError(
                    f"Failed 10 attempts to kill {self._current_operation}."
                )
        except Exception as e:
            self._logger.error(f"Error while killing variant calling: {e!s}")

call()

Run variant calling synchronously (delegates to run()).

Source code in helix\alignment_map\variant_caller.py
78
79
80
def call(self):
    """Start variant calling by delegating to run().

    Returns:
        Whatever run() returns.
    """
    return self.run()

kill()

Kill a variant calling operation.

Source code in helix\alignment_map\variant_caller.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def kill(self):
    """Kill a variant calling operation.

    Sets the quitting flag, then repeatedly kills the currently running
    external process until the worker acknowledges it is quitting
    (up to 10 attempts, 0.5 seconds apart). Any error is logged rather
    than propagated to the caller.
    """
    self._is_quitting = True
    try:
        # The retry loop is just to prevent a (unlikely IMO but who knows)
        # situation where _is_quitting is seen as False by the other thread
        # after Popen.kill() has been called.
        for _ in range(10):
            # Snapshot the operation: the worker thread may swap it out.
            operation = self._current_operation
            if self._quitting_ack:
                break
            if operation is not None:
                operation.kill()
            self._current_operation = None
            # Re-check after killing, before sleeping another half second.
            if self._quitting_ack:
                break
            sleep(0.5)
        if not self._quitting_ack:
            # Caught below on purpose: kill() must never raise.
            raise RuntimeError(
                f"Failed 10 attempts to kill {self._current_operation}."
            )
    except Exception as e:
        self._logger.error(f"Error while killing variant calling: {e!s}")

helix.files.file_type_checker.FileTypeChecker

Gets the type of a file, trying not to rely on its extension (if possible)

This class uses the utility htsfile to get the type of a file. If it fails to get the type of a file using htsfile, it will try to guess its type based on its extension.

Examples:

>>> input = Path("file.fasta")
>>> checker = FileTypeChecker()
>>> assert checker.get_type(input) == FileType.DECOMPRESSED
>>> input = input.rename("file")
>>> assert checker.get_extension(input) == ".fa"
Source code in helix\files\file_type_checker.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class FileTypeChecker:
    """Gets the type of a file trying not to rely on its extension (if possible).

    This class uses the utility `htsfile` to get the type of a file. If it fails to
    get the type of a file using `htsfile`, it will try to guess its type based on its
    extension.

    Examples:
        >>> input = Path("file.fasta")
        >>> checker = FileTypeChecker()
        >>> assert checker.get_type(input) == FileType.DECOMPRESSED
        >>> input = input.rename("file")
        >>> assert checker.get_extension(input) == ".fa"
    """

    # Fallback: guess the type from the file extension.
    _EXT_TO_TYPE = {
        ".7z": FileType.SEVENZIP,
        ".zip": FileType.ZIP,
        ".bz2": FileType.BZIP,
        ".bz": FileType.BZIP,
        ".gz": FileType.RAZF_GZIP,
        ".fa": FileType.DECOMPRESSED,
        ".fasta": FileType.DECOMPRESSED,
        ".fna": FileType.DECOMPRESSED,
    }

    # Canonical extension for each type.
    _TYPE_TO_EXT = {
        FileType.SEVENZIP: ".7z",
        FileType.ZIP: ".zip",
        FileType.BZIP: ".bz2",
        FileType.RAZF_GZIP: ".fa.gz",
        FileType.GZIP: ".fa.gz",
        FileType.DECOMPRESSED: ".fa",
    }

    # Markers searched for in htsfile's textual output.
    _HTSFILE_TO_TYPE = {
        "BGZF": FileType.BGZIP,
        "gzip": FileType.RAZF_GZIP,
        "RAZF": FileType.RAZF_GZIP,
        "FASTA": FileType.DECOMPRESSED,
    }

    def __init__(self, external: External = External()) -> None:
        self._external = external

    def get_type(self, file: Path) -> FileType:
        """Get a Type starting from a file path.

        Args:
            file (Path): Path of the file to analyze.

        Returns:
            Type | None: Type or None if the type is unknown.
        """
        # Extensions can be wrong or misleading; use htsfile and
        # eventually fall back on extension.

        description = self._external.htsfile([str(file)], wait=True, text=True)
        for marker, detected_type in FileTypeChecker._HTSFILE_TO_TYPE.items():
            if marker in description:
                return detected_type

        # TODO: starting from .gz in _EXT_TO_TYPE, htsfile should really
        # be able to give something useful. If it isn't the case, it could
        # mean the file is corrupted. Unsure what's the best logic here.
        return FileTypeChecker._EXT_TO_TYPE.get(file.suffix)

    def get_extension(self, file: Path) -> str:
        """Get an extension based on the content of the file.

        NOTE: if the input file is gzipped/bgzipped, this function
        will assume the extension is .fa.gz

        Args:
            file (Path): Input file

        Returns:
            str: Extension, or None if the type is unknown.
        """
        detected = self.get_type(file)
        # First insertion-order match wins (e.g. BZIP maps to ".bz2").
        for extension, candidate in FileTypeChecker._EXT_TO_TYPE.items():
            if candidate == detected:
                return extension
        return None

get_extension(file)

Get an extension based on the content of the file.

NOTE: if the input file is gzipped/bgzipped, this function will assume the extension is .fa.gz

Parameters:

Name Type Description Default
file Path

Input file

required

Returns:

Name Type Description
str str

Extension

Source code in helix\files\file_type_checker.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def get_extension(self, file: Path) -> str:
    """Get an extension based on the content of the file.

    NOTE: if the input file is gzipped/bgzipped, this function
    will assume the extension is .fa.gz

    Args:
        file (Path): Input file

    Returns:
        str: Extension
    """
    detected = self.get_type(file)
    # Return the first extension (in insertion order) mapped to the
    # detected type, or None when no mapping exists.
    for extension, candidate in FileTypeChecker._EXT_TO_TYPE.items():
        if candidate == detected:
            return extension
    return None

get_type(file)

Get a Type starting from a file path.

Parameters:

Name Type Description Default
file Path

Path of the file to analyze.

required

Returns:

Type Description
FileType

Type | None: Type or None if the type is unknown.

Source code in helix\files\file_type_checker.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def get_type(self, file: Path) -> FileType:
    """Get a Type starting from a file path.

    Args:
        file (Path): Path of the file to analyze.

    Returns:
        Type | None: Type or None if the type is unknown.
    """
    # Extensions can be wrong or misleading; use htsfile first and
    # eventually fall back on the extension.
    description = self._external.htsfile([str(file)], wait=True, text=True)
    detected = next(
        (
            candidate
            for marker, candidate in FileTypeChecker._HTSFILE_TO_TYPE.items()
            if marker in description
        ),
        None,
    )
    if detected is not None:
        return detected

    # TODO: starting from .gz in _EXT_TO_TYPE, htsfile should really
    # be able to give something useful. If it isn't the case, it could
    # mean the file is corrupted. Unsure what's the best logic here.
    return FileTypeChecker._EXT_TO_TYPE.get(file.suffix)

helix.naming.converter.Converter

Source code in helix\naming\converter.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class Converter:
    """Convert chromosome/sequence names between naming conventions."""

    @staticmethod
    def canonicalize(sequence_name: str) -> str:
        """Convert a sequence name into a "canonical" form,
            which is essentially the Number format:

            - Digits only for autosome
            - X/Y for sexual
            - M for mitochondrial
            - Other sequences are not touched

        Args:
            sequence_name (str): Sequence to convert

        Returns:
            str: Converted name sequence.
        """
        return Converter.convert(sequence_name, ChromosomeNameType.Number)

    @staticmethod
    def get_type(sequence_name: str) -> SequenceType:
        """Classify a sequence name by its canonical form.

        Args:
            sequence_name (str): Sequence name in any supported convention.

        Returns:
            SequenceType: Autosome, X, Y, Mitochondrial, Unmapped or Other.
        """
        canonical_name = Converter.canonicalize(sequence_name)
        if canonical_name.isnumeric():
            return SequenceType.Autosome
        elif canonical_name == "X":
            return SequenceType.X
        elif canonical_name == "Y":
            return SequenceType.Y
        elif canonical_name == "M":
            return SequenceType.Mitochondrial
        elif canonical_name == "*":
            return SequenceType.Unmapped
        return SequenceType.Other

    @staticmethod
    def sort(sequence_names: Iterable[str], others: bool = True) -> list[str]:
        """Sort sequence names: autosomes numerically, then X, Y, M and
        (optionally) other/unmapped sequences.

        Args:
            sequence_names (Iterable[str]): Names to sort.
            others (bool, optional): Whether to append Other/Unmapped names.
                Defaults to True.

        Returns:
            list[str]: Sorted names.
        """
        name_type_map = {x: [] for x in SequenceType}
        for name in sequence_names:
            name_type_map[Converter.get_type(name)].append(name)

        ordered = [x for x in name_type_map[SequenceType.Autosome]]
        ordered.sort(key=lambda x: int(Converter.canonicalize(x)))
        ordered.extend(name_type_map[SequenceType.X])
        ordered.extend(name_type_map[SequenceType.Y])
        ordered.extend(name_type_map[SequenceType.Mitochondrial])
        if others:
            ordered.extend(name_type_map[SequenceType.Other])
            ordered.extend(name_type_map[SequenceType.Unmapped])
        return ordered

    @staticmethod
    def _find_in_table(input, table):
        """Return the value of the first table key that is a prefix of
        `input`, or `input` unchanged when no key matches."""
        normalized = input
        for key in table:
            if input.startswith(key):
                normalized = table[key]
                break
        return normalized

    @staticmethod
    def convert(input: str, target: ChromosomeNameType) -> str:
        """Convert a sequence name to the requested naming convention.

        Args:
            input (str): Sequence name in any supported convention.
            target (ChromosomeNameType): Desired output convention.

        Raises:
            ValueError: If `target` is not a recognized format.

        Returns:
            str: Converted name; None when an accession lookup has no entry.
        """
        # Chr to Number (e.g., chr1 -> 1; chrMT->MT)
        normalized = input.upper()
        if normalized.startswith("CHR"):
            normalized = normalized.replace("CHR", "", 1)

        # MT->M. This accounts also for the translation chrMT -> M,
        # since it's executed after the previous if.
        if normalized.startswith("MT"):
            normalized = normalized.replace("MT", "M", 1)

        #  Accession to Number (e.g., NC_000001 -> 1)
        if input.startswith("NC_"):
            normalized = Converter._find_in_table(normalized, REFSEQ_TO_NUMBER)
            normalized = Converter._find_in_table(normalized, REFSEQ_T2T_TO_NUMBER)
        elif input.startswith("CM") or input.startswith("J"):
            normalized = Converter._find_in_table(normalized, GENBANK_TO_NUMBER)
        elif input.startswith("CP"):
            normalized = Converter._find_in_table(normalized, GENBANK_T2T_TO_NUMBER)

        # Convert from Number format to target.
        # NOTE(review): second upper() suggests table values may not be
        # uppercase — confirm against the lookup tables.
        normalized = normalized.upper()
        if target == ChromosomeNameType.Chr:
            if normalized.isnumeric() or normalized in ["M", "X", "Y"]:
                return "chr" + normalized.upper()
            return normalized
        elif target == ChromosomeNameType.Number:
            return normalized
        elif target == ChromosomeNameType.GenBank:
            return NUMBER_TO_GENBANK.get(normalized, None)
        elif target == ChromosomeNameType.RefSeq:
            return NUMBER_TO_REFSEQ.get(normalized, None)
        elif target == ChromosomeNameType.RefSeqT2T:
            return NUMBER_TO_REFSEQ_T2T.get(normalized, None)
        elif target == ChromosomeNameType.GenBankT2T:
            return NUMBER_TO_GENBANK_T2T.get(normalized, None)
        raise ValueError(f"Converting to unrecognized target format: {target.name}")

canonicalize(sequence_name)

Convert a sequence name into a "canonical" form, which is essentially the Number format:

- Digits only for autosome
- X/Y for sexual
- M for mitochondrial
- Other sequences are not touched

Parameters:

Name Type Description Default
sequence_name str

Sequence to convert

required

Returns:

Name Type Description
str str

Converted name sequence.

Source code in helix\naming\converter.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def canonicalize(sequence_name: str) -> str:
    """Normalize a sequence name to the Number format, which uses:

    - Digits only for autosome
    - X/Y for sexual
    - M for mitochondrial
    - Other sequences are not touched

    Args:
        sequence_name (str): Sequence to convert

    Returns:
        str: Converted name sequence.
    """
    return Converter.convert(sequence_name, ChromosomeNameType.Number)

helix.reference.reference.Reference

This class represent a reference genome.

Source code in helix\reference\reference.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class Reference:
    """This class represents a reference genome.

    Given a map between input sequences and the known sequences of candidate
    genomes, it determines which genomes (if any) match the input and whether
    a usable reference is already available on disk, downloadable, or could
    be built from the metadata.
    """

    def __init__(
        self,
        reference_map: OrderedDict[Sequence, list[Sequence]],
        logger=logging.getLogger(__name__),
    ):
        self._logger = logger
        self.reference_map = reference_map
        self._genome_map = self._index_by_genome()
        self.matching: list[Genome] = self._get_matching_genomes()
        # Rank candidate genomes by how many of their sequences were matched
        # by an input sequence; the last element is the most plausible guess.
        # (Renamed from "sorted" to avoid shadowing the builtin.)
        ranked = [x for x in self._genome_map.keys() if x is not None]
        ranked.sort(
            key=lambda x: len(
                [y for y in self._genome_map[x].values() if y is not None]
            )
        )
        self.build = []
        if len(self.matching) > 0:
            self.build = set(x.build for x in self.matching)
        elif len(ranked) > 0:
            self.build = [f"Likely {ranked[-1].build}"]

        if len(self.build) > 1:
            # Logger.warning() replaces the deprecated Logger.warn() alias.
            self._logger.warning(
                "Found more than one valid builds for the reference of the file. "
                + "This is likely an issue with the reference genome metadata. "
                + "Please open a bug report."
            )
        self.build = " ".join(self.build)
        self.ready_reference = None
        for match in self.matching:
            if match.fasta.exists():
                self.ready_reference = match
                break
        if self.ready_reference is not None:
            self.status = ReferenceStatus.Available
        elif len(self.matching) > 0:
            self.status = ReferenceStatus.Downloadable
        # There's a possibility to build it only if every sequence has a
        # match in the metadata.
        elif all(len(x) > 0 for x in self.reference_map.values()):
            self.status = ReferenceStatus.Buildable
        else:
            self.status = ReferenceStatus.Unknown

    def _md5_available(self):
        """Return True if MD5 is available for every input sequence.

        Returns:
            bool: True if MD5 available.
        """
        return all(x.md5 is not None for x in self.reference_map.keys())

    def _index_by_genome(self) -> dict[Genome, dict[Sequence, Sequence]]:
        """For every genome that has at least one match in the input sequence,
        return a map between its sequences and input sequences.

        Returns:
            dict[Genome, dict[Sequence, Sequence]]: Map Genome <-> Sequences.
        """
        parents: set[Genome] = set(
            x.parent for y in self.reference_map.values() for x in y
        )
        genome_map = {x: {y: None for y in x.sequences} for x in parents}
        for parent in parents:
            # None is a special key collecting input sequences that matched
            # nothing in this genome (see _get_matching_genomes).
            genome_map[parent][None] = []
            for sequence in parent.sequences:
                for query_sequence, match_sequences in self.reference_map.items():
                    if sequence in match_sequences:
                        genome_map[parent][sequence] = query_sequence
                        break
            for query_sequence in self.reference_map.keys():
                if query_sequence not in genome_map[parent].values():
                    genome_map[parent][None].append(query_sequence)
        return genome_map

    def _get_matching_genomes(self):
        """Return a list of genomes that are guaranteed to match the input sequence
        by name, length or MD5 (if available in the input sequence).

        Returns:
            list[Genome]: list of matching genomes.
        """
        match_list = []
        # None is a special key for all the sequences
        # that have 0 matches among all the known references.
        # If None is in _genome_map means we didn't find a
        # reference.
        for genome, matches in self._genome_map.items():
            if len(matches[None]) > 0:
                continue
            matching = True
            for genome_sequence, query_sequence in matches.items():
                if genome_sequence is None:
                    continue
                if query_sequence is None:
                    matching = False
                    break
                if genome_sequence.name != query_sequence.name:
                    matching = False
                    break
                # MD5 is compared only when the query provides one.
                if query_sequence.md5 is not None:
                    if genome_sequence.md5 != query_sequence.md5:
                        matching = False
                        break
                if genome_sequence.length != query_sequence.length:
                    matching = False
                    break
            if matching:
                self._logger.info(f"{genome!s} is a perfect match.")
                match_list.append(genome)
        return match_list

    def __str__(self) -> str:
        return f"{self.build} ({self.status.name})"

    def __repr__(self) -> str:
        return self.__str__()

helix.reference.repository.Repository

Manages the reference genomes and their metadata.

Examples:

>>> repository = Repository()
>>> reference = repository.find(file.file_info.sequences)
Source code in helix\reference\repository.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
class Repository:
    """
    Manages the reference genomes and their metadata.

    Examples:
        >>> repository = Repository()
        >>> reference = repository.find(file.file_info.sequences)
    """

    def __init__(
        self,
        loader: MetadataLoader = MetadataLoader(),
        downloader: Downloader = Downloader(),
        mtdna: MtDNA = MtDNA(),
        config: RepositoryConfig = RepositoryConfig(),
    ) -> None:
        # NOTE(review): the mutable default collaborators are created once at
        # import time and shared across Repository instances — confirm they
        # are stateless before adding state to them.
        self._loader = loader
        self._downloader = downloader
        self._mtdna = mtdna
        self.genomes = self._loader.load()
        # Index individual sequences by their MD5 and length
        self._sequences_by_md5: dict[str, list[Sequence]] = self._group_sequences(
            lambda s: s.md5, self.genomes
        )
        self._sequences_by_length: dict[int, list[Sequence]] = self._group_sequences(
            lambda s: s.length, self.genomes
        )
        self._config = config

    def analyze_references(self) -> None:
        """Log diagnostics about ambiguous sequences across known genomes."""
        # Analyzing sequence by sequence: collect lengths whose sequences
        # have more than one distinct MD5.
        different_md5_same_length = []
        for length, seqs in self._sequences_by_length.items():
            unique_seq_set = set()
            name = None
            for seq in seqs:
                unique_seq_set.add(seq.md5)
                name = seq.name
            # 57227414 filters out short sequences — presumably to skip
            # small contigs; TODO confirm the rationale for this threshold.
            if len(unique_seq_set) > 1 and length > 57227414:
                different_md5_same_length.append(length)
                logging.debug(
                    f"There are {len(unique_seq_set)} different MD5 for length {length} ({name})."
                )

        # Analyzing the whole genome, looking for identical length sequences
        # with different MD5 sequences
        different_md5s_same_lengths = {}
        for genome in self.genomes:
            if genome.sequences is None:
                continue
            seq_lengths = ",".join([str(x.length) for x in genome.sequences])
            seq_md5 = ",".join([x.md5 for x in genome.sequences])
            if seq_lengths not in different_md5s_same_lengths:
                different_md5s_same_lengths[seq_lengths] = []
            if seq_md5 not in [x[0] for x in different_md5s_same_lengths[seq_lengths]]:
                different_md5s_same_lengths[seq_lengths].append((seq_md5, genome))

        ambiguous = len(
            [x for x, y in different_md5s_same_lengths.items() if len(y) > 1]
        )
        logging.debug(f"Found {ambiguous} possible ambiguous sequences of lengths:")
        index = 0
        for _, candidates in different_md5s_same_lengths.items():
            if len(candidates) > 1:
                # index numbers the ambiguous group; every genome in the same
                # group shares the same number.
                for _, genome in candidates:
                    logging.debug(f"{index+1}) {genome.fasta_url!s}")
                index += 1

    def find(self, sequences: list[Sequence]):
        """Find a Reference for the given input sequences.

        Matches by MD5 when every input sequence has one; otherwise falls
        back to matching by sequence length.

        Args:
            sequences (list[Sequence]): Input sequences to match.

        Returns:
            Reference: Candidate reference wrapping the matches.
        """
        md5_available = all([x.md5 is not None for x in sequences])
        matching = OrderedDict([(x, []) for x in sequences])
        if md5_available:
            for sequence in sequences:
                if sequence.md5 in self._sequences_by_md5:
                    matching[sequence] = self._sequences_by_md5[sequence.md5]
        else:
            for sequence in sequences:
                if sequence.length in self._sequences_by_length:
                    matching[sequence] = self._sequences_by_length[sequence.length]
        return Reference(matching)

    def _group_sequences(self, criteria, genomes: list[Genome]):
        """Group every sequence of every genome by the given key function.

        Args:
            criteria: Callable mapping a Sequence to its grouping key.
            genomes (list[Genome]): Genomes whose sequences are grouped.

        Returns:
            dict: Key -> list of sequences sharing that key.
        """
        grouped = {}
        for genome in genomes:
            if genome.sequences is None:
                continue
            for sequence in genome.sequences:
                evaluated_criteria = criteria(sequence)
                if evaluated_criteria not in grouped:
                    grouped[evaluated_criteria] = []
                grouped[evaluated_criteria].append(sequence)
        return grouped

    def _get_sizes(self, genome: Genome):
        """Probe and store the remote sizes/URLs of a genome's files.

        Raises:
            RuntimeError: If the FASTA file size cannot be determined.
        """
        if genome.fai_url is None:
            fai_url = genome.fasta_url + ".fai"
            fai_size = self._downloader.get_file_size(fai_url)
            if fai_size is not None:
                genome.fai_url = fai_url

        if genome.gzi_url is None:
            gzi_url = genome.fasta_url + ".gzi"
            gzi_size = self._downloader.get_file_size(gzi_url)
            if gzi_size is not None:
                genome.gzi_url = gzi_url

        if genome.download_size is None:
            size = self._downloader.get_file_size(genome.fasta_url)
            if size is not None:
                genome.download_size = size
            else:
                raise RuntimeError(
                    f"Unable to get the size of the fasta file for {genome.fasta_url}"
                )

    def _get_sequences(self, genome: Genome):
        """Load the sequences of a genome from its dictionary file.

        Returns:
            list[Sequence]: Sequences described by the dictionary.
        """
        dictionary: AlignmentMapHeader = AlignmentMapHeader.load_from_file(genome.dict)
        sequences = []
        for name, entry in dictionary.sequences.items():
            sequence = Sequence(name, entry.length, entry.md5)
            sequences.append(sequence)
        return sequences

    def _create_companion_files(
        self, genome: Genome, force=False, progress=None, action: str = None
    ):
        """Create the bgzip index and dictionary files for a genome if missing."""
        samtools = Samtools()
        bgzip = BGzip()
        logging.info(f"{genome}: Starting post-download tasks.")
        if not genome.gzi.exists():
            logging.info(f"{genome}: Generating bgzip index.")
            bgzip.bgzip_wrapper(genome.fasta, genome.gzi, BgzipAction.Reindex)
        else:
            logging.info(f"{genome}: bgzip index exists.")

        if not genome.dict.exists():
            logging.info(f"{genome}: Generating dictionary file.")
            samtools.make_dictionary(genome.fasta, genome.dict)
        else:
            logging.info(f"{genome}: Dictionary file exists.")

    def self_test(self):
        """Re-ingest every genome that is missing its sequence metadata."""
        for index, reference in enumerate(self.genomes):
            if reference.sequences is None:
                try:
                    logging.info(f"Processing {reference} as it has no sequences.")
                    genome = self.ingest(
                        reference.fasta_url, reference.source, reference.build
                    )
                    self.genomes[index] = genome
                    self._loader.save(self.genomes)
                except Exception as e:
                    # Best-effort: log and continue with the next genome.
                    logging.critical(e)

    def acquire(
        self, genome: Genome, progress: Callable[[str, int], None] = None, force=False
    ) -> Genome:
        """
        Download and convert to BGZip format (if necessary) a reference genome. Create
        additional files that are needed by DoubleHelix.

        Args:
            genome (Genome): Genome to acquire
            progress (Callable[[str, int], None]): Callback function that takes two
                arguments: a string indicating the progress message and an integer
                indicating the progress percentage.
            force (bool): Determines whether to overwrite existing files.

        Returns:
            Genome: The reference genome.
        """
        downloader = Downloader()
        decompressor = Decompressor()
        compressor = BGzip()

        if genome.fasta.exists() and not force:
            logging.info(f"File {genome.fasta.name} already exist. Re-using it.")
            self._create_companion_files(genome, force)
            return genome
        elif genome.fasta.exists() and force:
            # Forced re-acquisition: remove the file and reset every cached
            # checksum/size so the pipeline recomputes them.
            genome.fasta.unlink()
            genome.bgzip_md5 = None
            genome.downloaded_md5 = None
            genome.decompressed_md5 = None
            genome.bgzip_size = None
            genome.download_size = None
            genome.decompressed_size = None

        logging.info(f"Start Downloading from: {genome.fasta_url}.")
        download_output = downloader.perform(
            genome, progress, f"[1/4] Downloading from: {genome.fasta_url}"
        )
        logging.info(f"Start Decompressing: {download_output.name}.")
        decompressor_output = decompressor.perform(
            genome,
            progress,
            download_output,
            f"[2/4] Decompressing: {download_output.name}",
        )
        logging.info(f"Start compressing to BGZip: {decompressor_output.name}.")
        compressor_output = compressor.perform(
            genome,
            progress,
            decompressor_output,
            f"[3/4] Compressing to BGZip: {decompressor_output.name}",
        )
        logging.info(f"Creating companion files: {compressor_output.name}.")
        self._create_companion_files(
            genome,
            force,
            progress,
            f"[4/4] Creating companion files: {compressor_output.name}",
        )
        return genome

    def ingest(self, url, source, build, force=False):
        """Add a genome to the repository.

        Build a new Genome object with the input parameters and call acquire() on it

        Args:
            url (str): Url for the FASTA file.
            source (str): Source of the Genome (i.e., "Ensembl")
            build (str): Build of the Genome (i.e., "38").
            force (bool, optional): True to force the ingestion even if the files exist.
                Defaults to False.

        Returns:
            Genome: Genome after ingestion

        Examples:
            >>> manager = RepositoryManager()
            >>> manager.genomes.append(
            >>>     manager.ingest(
            >>>         "https://source/reference.fa",
            >>>         "NIH", # Should match an entry in sources.json
            >>>         "38",  # Only 38 or 37
            >>>     )
            >>> )
            >>> GenomeMetadataLoader().save(manager.genomes)
        """
        genome = Genome(url, source=source, build=build)
        genome.parent_folder = self._config.genomes
        logging.info(f"Ingesting {genome}.")
        # BUGFIX: force must be passed by keyword; positionally it would be
        # bound to acquire()'s `progress` parameter and silently ignored.
        genome = self.acquire(genome, force=force)
        genome.sequences = self._get_sequences(genome)
        return genome

acquire(genome, progress=None, force=False)

Download and convert to BGZip format (if necessary) a reference genome. Create additional files that are needed by DoubleHelix.

Parameters:

Name Type Description Default
genome Genome

Genome to acquire

required
progress Callable[[str, int], None]

Callback function that takes two arguments: a string indicating the progress message and an integer indicating the progress percentage.

None
force bool

Determines whether to overwrite existing files.

False

Returns:

Name Type Description
Genome Genome

The reference genome.

Source code in helix\reference\repository.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def acquire(
    self, genome: Genome, progress: Callable[[str, int], None] = None, force=False
) -> Genome:
    """
    Download and convert to BGZip format (if necessary) a reference genome. Create
    additional files that are needed by DoubleHelix.

    Args:
        genome (Genome): Genome to acquire
        progress (Callable[[str, int], None]): Callback function that takes two
            arguments: a string indicating the progress message and an integer
            indicating the progress percentage.
        force (bool): Determines whether to overwrite existing files.

    Returns:
        Genome: The reference genome.
    """
    downloader = Downloader()
    decompressor = Decompressor()
    compressor = BGzip()

    if genome.fasta.exists():
        if not force:
            # Nothing to download: reuse the file and only make sure the
            # companion files are present.
            logging.info(f"File {genome.fasta.name} already exist. Re-using it.")
            self._create_companion_files(genome, force)
            return genome
        # Forced re-acquisition: drop the file and clear the cached
        # checksums/sizes so the pipeline recomputes everything.
        genome.fasta.unlink()
        genome.bgzip_md5 = None
        genome.downloaded_md5 = None
        genome.decompressed_md5 = None
        genome.bgzip_size = None
        genome.download_size = None
        genome.decompressed_size = None

    # Four-stage pipeline: download, decompress, recompress as BGZip,
    # then build the companion files.
    logging.info(f"Start Downloading from: {genome.fasta_url}.")
    downloaded = downloader.perform(
        genome, progress, f"[1/4] Downloading from: {genome.fasta_url}"
    )
    logging.info(f"Start Decompressing: {downloaded.name}.")
    decompressed = decompressor.perform(
        genome,
        progress,
        downloaded,
        f"[2/4] Decompressing: {downloaded.name}",
    )
    logging.info(f"Start compressing to BGZip: {decompressed.name}.")
    recompressed = compressor.perform(
        genome,
        progress,
        decompressed,
        f"[3/4] Compressing to BGZip: {decompressed.name}",
    )
    logging.info(f"Creating companion files: {recompressed.name}.")
    self._create_companion_files(
        genome,
        force,
        progress,
        f"[4/4] Creating companion files: {recompressed.name}",
    )
    return genome

ingest(url, source, build, force=False)

Add a genome to the repository.

Build a new Genome object with the input parameters and call acquire() on it

Parameters:

Name Type Description Default
url str

Url for the FASTA file.

required
source str

Source of the Genome (i.e., "Ensembl")

required
build str

Build of the Genome (i.e., "38").

required
force bool

True to force the ingestion even if the files exist. Defaults to False.

False

Returns:

Name Type Description
Genome

Genome after ingestion

Examples:

>>> manager = RepositoryManager()
>>> manager.genomes.append(
>>>     manager.ingest(
>>>         "https://source/reference.fa",
>>>         "NIH", # Should match an entry in sources.json
>>>         "38",  # Only 38 or 37
>>>     )
>>> )
>>> GenomeMetadataLoader().save(manager.genomes)
Source code in helix\reference\repository.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def ingest(self, url, source, build, force=False):
    """Add a genome to the repository.

    Build a new Genome object with the input parameters and call acquire() on it

    Args:
        url (str): Url for the FASTA file.
        source (str): Source of the Genome (i.e., "Ensembl")
        build (str): Build of the Genome (i.e., "38").
        force (bool, optional): True to force the ingestion even if the files exist.
            Defaults to False.

    Returns:
        Genome: Genome after ingestion

    Examples:
        >>> manager = RepositoryManager()
        >>> manager.genomes.append(
        >>>     manager.ingest(
        >>>         "https://source/reference.fa",
        >>>         "NIH", # Should match an entry in sources.json
        >>>         "38",  # Only 38 or 37
        >>>     )
        >>> )
        >>> GenomeMetadataLoader().save(manager.genomes)
    """
    genome = Genome(url, source=source, build=build)
    genome.parent_folder = self._config.genomes
    logging.info(f"Ingesting {genome}.")
    # BUGFIX: force must be passed by keyword; positionally it would be
    # bound to acquire()'s `progress` parameter and silently ignored.
    genome = self.acquire(genome, force=force)
    genome.sequences = self._get_sequences(genome)
    return genome