Source code for fastq2bcl.reader

import logging
import gzip
from fastq2bcl.parser import parse_seqdesc_fields
from Bio import SeqIO
from rich import print
import sys

_logger = logging.getLogger(__name__)


[docs] def read_first_record(fastq_file): """ Validate fastq.gz r1 file and extract first read """ _logger.info(f"Opening gz file {fastq_file}") with gzip.open(fastq_file, "rt") as fastq_fh: return next(SeqIO.parse(fastq_fh, "fastq"))
[docs] def get_file_handlers(r1, r2, i1, i2): """ Return list of FH """ files_fh = [gzip.open(r1, "rt")] if not i1 == None: files_fh.append(gzip.open(i1, "rt")) if not i2 == None: files_fh.append(gzip.open(i2, "rt")) if not r2 == None: files_fh.append(gzip.open(r2, "rt")) return files_fh
[docs] def get_mask_from_files(r1, r2, i1, i2, exclude_umi, exclude_index): """ Build a mask string using seq length. In case of index and/or UMI in R1 sequence description, write this length to the Index mask """ record_1 = read_first_record(r1) seq_fields = parse_seqdesc_fields(record_1.description) index_1_bases = 0 # Write R1 mask mask = f"{len(record_1.seq)}N" # check errors on index for R1 if seq_fields["index"] != "1" and not exclude_index: if i1 != None or i2 != None: raise ValueError( "Usage of index from sequence desc and I1 and I2 files at the same time is not supported" ) # continue and write to index I1 length TODO I2 for double index print(f"LENGTH INDEX {seq_fields['index']}") index_1_bases += len(seq_fields["index"]) # check errors on UMI for R1 if seq_fields["UMI"] != None and not exclude_umi: if i1 != None or i2 != None: raise ValueError( "Usage of UMI from sequence desc and I1 and I2 files at the same time is not supported" ) # continue and write to index I1 index_1_bases += len(seq_fields["UMI"]) # Write index I1 based on UMI and index if index_1_bases > 0: mask += f"{index_1_bases}Y" # Write indexes if i1 != None: index_1 = read_first_record(i1) mask += f"{len(index_1.seq)}Y" if i2 != None: index_2 = read_first_record(i2) mask += f"{len(index_2.seq)}Y" # Write R2 record if r2 != None: record_2 = read_first_record(r2) # finally add R2 to mask mask += f"{len(record_2.seq)}N" return mask
[docs] def read_fastq_files(r1, r2, i1, i2, exclude_umi, exclude_index): """ Read fastq files R1-R2 with I1 and I2 and return only the data we need """ # return a list of tuple with seq, qual # and a list of tuple for pos with x and y # SINGLE R1 # sequences = [('AAAA',1111)] # positions = [(1,1)] # # in case of multiple files R1-R2: # PAIR R1-R2 # sequences = [('AAAABBBB',11111111)] # positions = [(1,1)] # # I need way to handle multiple files and merge them in a single with exitstack # Ref https://docs.python.org/3/library/contextlib.html#contextlib.ExitStack # build a list of iterators file_handlers = get_file_handlers(r1, r2, i1, i2) seq_iterators = [SeqIO.parse(fh, "fastq") for fh in file_handlers] # output Lists sequences = [] positions = [] try: # iterate over the R1 iterator for r1_record in seq_iterators[0]: # call next in additional iterators opt_data = [next(iterator) for iterator in seq_iterators[1:]] # store R1 data record_fields = parse_seqdesc_fields(r1_record.description) record_id = r1_record.id record_seq = str(r1_record.seq) record_qual = r1_record.letter_annotations["phred_quality"] if not exclude_index and record_fields["index"] != "1": _logger.info(f"Reading index field: {record_fields['index']}") record_seq += record_fields["index"] _logger.info(f"New seq len: {len(record_seq)} seq: {record_seq}") record_qual += [40] * len(record_fields["index"]) _logger.info(f"New qual: {record_qual}") # the UMI is quality MAX (40) if not exclude_umi and record_fields["UMI"] != None: _logger.info(f"Reading umi field: {record_fields['UMI']}") record_seq += record_fields["UMI"] _logger.info(f"New seq len: {len(record_seq)} seq: {record_seq}") record_qual += [40] * len(record_fields["UMI"]) _logger.info(f"New qual: {record_qual}") for opt_record in opt_data: if opt_record.id != record_id: raise ValueError( f"Seq ID mismatch for record {opt_record.id} R1 is {record_id}" ) record_seq += str(opt_record.seq) record_qual += opt_record.letter_annotations["phred_quality"] # append cluster position positions.append((record_fields["x_pos"], record_fields["y_pos"])) # append sequence and qual sequences.append((record_seq, record_qual)) finally: # close all files for file_fh in file_handlers: file_fh.close() return (sequences, positions)