Source code for fastq2bcl.cli

"""
CLI for fastq2bcl app

``[options.entry_points]`` section in ``setup.cfg``::

    console_scripts =
        fastq2bcl = fastq2bcl.cli:run

Then run ``pip install .`` (or ``pip install -e .`` for editable mode)
which will install the command ``fastq2bcl`` inside your current environment.

References:
    - https://setuptools.pypa.io/en/latest/userguide/entry_point.html
    - https://pip.pypa.io/en/stable/reference/pip_install
"""
import signal
import argparse
import logging
import sys
import os
import re
import textwrap
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

from pathlib import Path
from rich import print, pretty
from rich.progress import (
    Progress,
    TextColumn,
    BarColumn,
    TaskProgressColumn,
    TimeRemainingColumn,
    TimeElapsedColumn,
)
from rich.progress import track
from fastq2bcl import __version__
from fastq2bcl.parser import parse_seqdesc_fields
from fastq2bcl.reader import read_first_record, read_fastq_files, get_mask_from_files
from fastq2bcl.writer import (
    write_run_info_xml,
    write_filter,
    write_control,
    write_locs,
    write_bcl_and_stats,
    write_cycle,
)

__author__ = "Davide Rambaldi"
__copyright__ = "Davide Rambaldi"
__license__ = "MIT"

_logger = logging.getLogger(__name__)

# ---- Python API ----
# The functions defined in this section can be imported by users in their
# Python scripts/interactive interpreter, e.g. via
# `from fastq2bcl.skeleton import fib`,
# when using this Python module as a library.


[docs] def fastq2bcl( outdir, r1, r2=None, i1=None, i2=None, mask_string=None, exclude_umi=False, exclude_index=False, threads=1, ): """fastq2bcl function call :param outdir: output directory to create run flowcell fake dir :param r1: R1 fastq.gz :param r2: R2 fastq.gz :param i1: I1 fastq.gz :param i2: I2 fastq.gz Content of returned tuple: rundir: final absolute path od created rundir run_id: generated mock run_id seq_fields: fields parsed and validated from first R1 record mask_string: mask used to generate RunInfo.xml :rtype: tuple """ # First validate outdir outdir = Path(outdir).absolute() assert outdir.is_dir() assert os.access(outdir, os.W_OK) _logger.info(f"Output directory: {outdir}") # Validate R1 and extract first read r1 = Path(r1) assert r1.is_file() first_record = read_first_record(r1) seqdesc_fields = parse_seqdesc_fields(first_record.description) _logger.info(f"first record seq length: {len(first_record.seq)}") _logger.info(f"first record sequence: {str(first_record.seq)}") _logger.info(f"first record seqdesc fields: {seqdesc_fields}") print(f"[green]First sequence length[/green]: {len(first_record.seq)}") print(f"[green]First sequence[/green]:") print(textwrap.fill(str(first_record.seq), 50)) # SEE LSO docs/flow.drawio.png # CHECK INDEX if seqdesc_fields["index"] != "1": if exclude_index: print( f"[red]Founded INDEX sequence in first record[/red] {seqdesc_fields['index']}", f"[red]index sequences will NOT be included in the cycles[/red]", ) else: print( f"[green]Founded INDEX sequence in first record[/green]: {seqdesc_fields['index']}", f"[green]index sequences will be included in the cycles[/green]", ) # CHECK UMI if seqdesc_fields["UMI"] != None: if exclude_umi: print( f"[red]Founded UMI sequence in first record[/red] {seqdesc_fields['UMI']}", f"[red]umi sequences will NOT be included in the cycles[/red]", ) else: print( f"[green]Founded UMI sequence in first record[/green]: {seqdesc_fields['UMI']}", f"[green]umi sequences will be included in the cycles[/green]", ) # RUNDIR run_id = mock_run_id(seqdesc_fields) rundir = Path.joinpath(outdir, run_id) print(f"[green]RUNDIR[/green]: {rundir}") if not mask_string: # get cycles string from files mask_string = get_mask_from_files(r1, r2, i1, i2, exclude_umi, exclude_index) _logger.info(f"mask string from files: {mask_string}") print(f"[green]MASK[/green]: {mask_string}") # READ SEQUENCES sequences, positions = read_fastq_files(r1, r2, i1, i2, exclude_umi, exclude_index) # SET MASK FROM STRING mask = set_mask(mask_string) # WRITE RUN INFO _logger.info(f"Writing RunInfo.mxl to dir: {rundir}") run_info = write_run_info_xml( rundir, run_id, seqdesc_fields["run_number"], seqdesc_fields["flowcell_id"], seqdesc_fields["instrument"], mask, ) print(f"[green]RunInfo.xml:[/green]:\n", run_info) # WRITE FILTER print(f"[bold magenta]Writing filter file [/bold magenta]") _logger.info( f"Writing filter file to dir: {rundir} with cluster count: {len(sequences)}" ) write_filter(rundir, len(sequences)) # WRITE CONTROL print(f"[bold magenta]Writing control file [/bold magenta]") _logger.info( f"Writing control file to dir: {rundir} with cluster count: {len(sequences)}" ) write_control(rundir, len(sequences)) # WRITE LOCATIONS print(f"[bold magenta]Writing location file [/bold magenta]") _logger.info(f"Writing {len(positions)} locations to dir: {rundir}") write_locs(rundir, positions) # WRITE BCL AND STATS with threadss print(f"[bold magenta]Writing cycles files with {threads} threads[/bold magenta]") _logger.info(f"Writing {len(sequences)} sequences bcl and stats to dir: {rundir}") # count cycles and clusters cycles = len(sequences[0][0]) cluster_count = len(sequences) # PARALLEL WRITE OF BCL FILES # Using workers to write files. # Each worker should write a cycle file with clusters init # 1. init bcl file with cluster_count # 2. write the cycle: all clusters data for the position (cycle) # 3. write stat file # # For each cycle pass to the read the data in a tuple (cycle, clusters_base, clusters_quality) if threads > 1: with Progress( TextColumn("[progress.description]{task.description}"), BarColumn(), TaskProgressColumn("[progress.percentage]{task.percentage:>3.0f}%"), TimeRemainingColumn(), TimeElapsedColumn(), refresh_per_second=1, # bit slower updates ) as progress: futures = [] # keep track of the jobs with multiprocessing.Manager() as manager: # this is the key - we share some state between our # main process and our worker functions _progress = manager.dict() overall_progress_task = progress.add_task( "[green]All jobs progress:[/green]" ) with ProcessPoolExecutor(max_workers=threads) as executor: # iterate over the jobs we need to run for cycle in range(cycles): # set visible false so we don't have a lot of bars all at once: task_id = progress.add_task(f"cycle {cycle+1}", visible=False) # build the data required by write_cycle # TODO optimize cycle_data = [] for basecalls, qualscores in sequences: if cycle >= len(basecalls): cycle_data.append(("N", 0)) else: cycle_data.append((basecalls[cycle], qualscores[cycle])) context = (cycle, cluster_count, rundir, cycle_data) futures.append( executor.submit(write_cycle, context, _progress, task_id) ) # monitor the progress: while ( n_finished := sum([future.done() for future in futures]) ) < len(futures): progress.update( overall_progress_task, completed=n_finished, total=len(futures), ) for task_id, update_data in _progress.items(): latest = update_data["progress"] total = update_data["total"] # update the progress bar for this task: progress.update( task_id, completed=latest, total=total, visible=latest < total, ) # wait for all tasks to complete by getting all results for future in futures: future.result() else: # single thread mode for cycle in track( range(cycles), description="[bold magenta]Initialize bcl files with cluster counts ...[/bold magenta]", ): _logger.info( f"Creating bcl file for cycle #{cycle+1} with {cluster_count} clusters" ) write_bcl_and_stats(cycle, cluster_count, rundir, sequences) return run_id, rundir, seqdesc_fields, mask_string
[docs] def mock_run_id(fields): """ Mock the run directory id and Path """ run_id = ( "YYMMDD_" + fields["instrument"] + "_" + fields["run_number"].zfill(4) + "_" + fields["flowcell_id"] ) return run_id
[docs] def set_mask(mask_string): if mask_string: mask = [] regexp_mask = r"([0-9]+[NY])([0-9]+[NY])?([0-9]+[NY])?([0-9]+[NY])?" m = re.match(regexp_mask, mask_string) if not m: raise ValueError(f"Incorrect mask parse: {mask_string}") reads = [g for g in m.groups() if g != None] for g_idx in range(len(reads)): read = re.match(r"([0-9]+)([YN])", reads[g_idx]) mask.append( { "cycles": read.groups()[0], "index": read.groups()[1], "id": str(g_idx + 1), } ) return mask else: raise ValueError(f"Incorrect mask string: {mask_string}")
# ---- CLI ---- # The functions defined in this section are wrappers around the main Python # API allowing them to be called directly from the terminal as a CLI # executable/script.
[docs] def parse_args(args): """Parse command line parameters Args: args (List[str]): command line parameters as list of strings (for example ``["--help"]``). Returns: :obj:`argparse.Namespace`: command line parameters namespace """ parser = argparse.ArgumentParser( description="Convert fastq.gz reads and metadata in a bcl2fastq-able run directory" ) parser.add_argument( "--version", action="version", version=f"fastq2bcl {__version__}", ) parser.add_argument( "-v", "--verbose", dest="loglevel", help="set loglevel to INFO", action="store_const", const=logging.INFO, ) parser.add_argument( "-vv", "--very-verbose", dest="loglevel", help="set loglevel to DEBUG", action="store_const", const=logging.DEBUG, ) parser.add_argument( "-m", "--mask", dest="mask", help="define mask in format 110N10Y10Y110N" ) parser.add_argument( "-r1", "--read-1", dest="r1", help="fastq.gz with R1 reads", metavar="R1", required=True, ) parser.add_argument( "-r2", "--read-2", dest="r2", help="fastq.gz with R2 reads (optional)", metavar="R2", ) parser.add_argument( "-i1", "--index-1", dest="i1", help="fastq.gz with I1 reads (optional)", metavar="I1", ) parser.add_argument( "-i2", "--index-2", dest="i2", help="fastq.gz with I2 reads (optional)", metavar="I2", ) parser.add_argument( "-o", "--outdir", dest="outdir", help="Set the output directory for mocked run. default: cwd", default=os.getcwd(), ) parser.add_argument( "--exclude-umi", dest="exclude_umi", help="Do not write UMI from the R1 and R2 fastq reads to the cycles", action="store_true", ) parser.add_argument( "--exclude-index", dest="exclude_index", help="Do not write Index from the R1 and R2 fastq reads to the cycles", action="store_true", ) parser.add_argument( "-T", "--threads", help="Number of threads to use to write bcls. Default 1", type=int, default=1, dest="threads", ) return parser.parse_args(args)
[docs] def setup_logging(loglevel): """Setup basic logging Args: loglevel (int): minimum loglevel for emitting messages """ logformat = "[%(asctime)s] %(levelname)s:%(name)s:%(message)s" logging.basicConfig( level=loglevel, stream=sys.stdout, format=logformat, datefmt="%Y-%m-%d %H:%M:%S" )
[docs] def main(args): """Wrapper allowing :func:`fastq2bcl` to be called with string arguments in a CLI fashion Instead of returning the value from :func:`fastq2bcl`, it prints the result to the ``stdout`` in a nicely formatted message. Args: args (List[str]): command line parameters as list of strings (for example ``["--verbose", "42"]``). """ pretty.install() args = parse_args(args) setup_logging(args.loglevel) _logger.info("Starting application...") _logger.info(f"User defined mask: {args.mask}") _logger.info(f"Input files: R1={args.r1} R2={args.r2} I1={args.i1} I2={args.i2}") print("[bold green]fastq2bcl[/bold green]") print("Args:", args) # call fastq2bcl run_id, rundir, seqdesc_fields, mask_string = fastq2bcl( args.outdir, args.r1, args.r2, args.i1, args.i2, args.mask, args.exclude_umi, args.exclude_index, args.threads, ) _logger.info("Script ends here")
[docs] def run(): """Calls :func:`main` passing the CLI arguments extracted from :obj:`sys.argv` This function can be used as entry point to create console scripts with setuptools. """ main(sys.argv[1:])
if __name__ == "__main__": # ^ This is a guard statement that will prevent the following code from # being executed in the case someone imports this file instead of # executing it as a script. # https://docs.python.org/3/library/__main__.html # After installing your project with pip, users can also run your Python # modules as scripts via the ``-m`` flag, as defined in PEP 338:: # # python -m fastq2bcl.skeleton 42 # run()