Source code for fastq2bcl.writer

import logging
import struct
from pathlib import Path

_logger = logging.getLogger(__name__)


[docs] def write_run_info_xml(rundir, run_id, run_number, flowcell_id, instrument, mask): """ Write RunInfo.xml """ runinfo = generate_run_info_xml(run_id, run_number, flowcell_id, instrument, mask) _logger.info(f"RunInfo.xml:\n{runinfo}") # Create directory and write file xmlout = Path.joinpath(rundir, "RunInfo.xml") xmlout.parent.mkdir(exist_ok=True, parents=True) with open(xmlout, "wt") as f_out: f_out.write(runinfo) return runinfo
[docs] def generate_run_info_xml(run_id, run_number, flowcell_id, instrument, mask): """ Generate a valid Runinfo xml file. """ # check mask and write mask xml_mask = "" for m in mask: xml_mask += f"""<Read NumCycles="{m['cycles']}" Number="{m['id']}" IsIndexedRead="{m['index']}" />""" xml = f"""<?xml version="1.0"?> <RunInfo xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" Version="2"> <Run Id="{run_id}" Number="{run_number}"> <Flowcell>{flowcell_id}</Flowcell> <Instrument>{instrument}</Instrument> <Date>YYMMDD</Date> <Reads> { xml_mask } </Reads> <FlowcellLayout LaneCount="1" SurfaceCount="1" SwathCount="1" TileCount="1" /> </Run> </RunInfo> """ return xml
[docs] def write_filter(rundir, cluster_count): """ Write filter """ path = rundir / "Data/Intensities/BaseCalls/L001/s_1_1101.filter" path.parent.mkdir(exist_ok=True, parents=True) with open(path, "wb") as f_out: f_out.write(bytes([0, 0, 0, 0])) f_out.write(bytes([3, 0, 0, 0])) f_out.write(struct.pack("<I", cluster_count)) f_out.write(bytes([1] * cluster_count))
[docs] def write_control(rundir, cluster_count): """ Write control file """ path = rundir / "Data/Intensities/BaseCalls/L001/s_1_1101.control" path.parent.mkdir(exist_ok=True, parents=True) with open(path, "wb") as f_out: f_out.write(bytes([0, 0, 0, 0])) # "Zero value (for backwards compatibility)" f_out.write(bytes([2, 0, 0, 0])) # "Format version number" f_out.write(struct.pack("<I", cluster_count)) # "Number of clusters" f_out.write(bytes([0, 0] * cluster_count)) # two bytes for each cluster
[docs] def write_locs(outdir, positions): """ Write locations. Args: Positions (List(tuple)): is a List of tuple with x and y values """ # From mkdata.sh of bcl2fastq # printf '0: 010000000000803f' | xxd -r -g0 > "$locs_filename" # printf '0: %.8x' $clusters_count | sed -E 's/0: (..)(..)(..)(..)/0: \4\3\2\1/' | xxd -r -g0 >> "$locs_filename" # So with 1 cluster count should be # 01 00 00 00 00 00 80 3f # 01 00 00 00 CDCC8C3F 9A99993F # Source of this is bcl2fastq/src/cxx/lib/data # struct Record # { # /// \brief X-coordinate. # float x_; # /// \brief y-coordinate. # float y_; # } path = Path(outdir) / "Data/Intensities/L001/s_1_1101.locs" path.parent.mkdir(exist_ok=True, parents=True) with open(path, "wb") as f_out: f_out.write(bytes([1, 0, 0, 0, 0, 0, 0x80, 0x3F])) f_out.write(struct.pack("<I", len(positions))) for position in positions: f_out.write(encode_loc_bytes(position[0], position[1]))
[docs] def encode_loc_bytes(x_pos, y_pos): """ Encode x and y positon. FIXME this is not the correct formul according to the bcl2fastq source code. """ x_bytes = struct.pack("<f", (int(x_pos) - 1000) / 10) y_bytes = struct.pack("<f", (int(y_pos) - 1000) / 10) return x_bytes + y_bytes
[docs] def encode_cluster_byte(base, qual): """ Encode cluster byte. Bits 0-1 are the bases, respectively [A, C, G, T] for [0, 1, 2, 3]: bits 2-7 are shifted by two bits and contain the quality score. All bits 0 in a byte is reserved for no-call. """ if base == "N": return bytes([0]) # no call qual = qual << 2 base = ["A", "C", "G", "T"].index(base) return bytes([qual | base])
[docs] def init_bcl_and_write_cluster_counts(cycledir, cluster_count, filename="s_1_1101.bcl"): """ Create bcl file and write cluster count """ with open(cycledir / filename, "wb") as f_out: f_out.write(struct.pack("<I", cluster_count))
[docs] def write_cycle(context, progress, task_id): """ Write a cycle file with a thread. with progress, task_id and exit event context: tuple with (cycle, cluster_count, outdir, data) data: tuple (base, quality) for a cluster """ cycle, cluster_count, outdir, data = context cycledir = get_cycle_dir(outdir, cycle) _logger.info( f"Writing {cluster_count} clusters for cycle: {cycle+1} to dir {cycledir}" ) init_bcl_and_write_cluster_counts(cycledir, cluster_count) # write data sequences_written = 0 for base, quality in data: _logger.debug(f"Appending seq: {base}") filename = cycledir / "s_1_1101.bcl" append_data_to_bcl(base, quality, filename) sequences_written += 1 progress[task_id] = {"progress": sequences_written, "total": len(data)} # write stats write_stat_file(cycledir / "s_1_1101.stats")
[docs] def write_bcl_and_stats(cycle, cluster_count, outdir, sequences): """ Single process mode to write bcls """ cycledir = get_cycle_dir(outdir, cycle) filename = cycledir / "s_1_1101.bcl" init_bcl_and_write_cluster_counts(cycledir, cluster_count) # write data for basecalls, qualscores in sequences: if cycle >= len(basecalls): _logger.info(f"Sequence is shorter than expected, adding N") append_data_to_bcl("N", 0, filename) else: append_data_to_bcl(basecalls[cycle], qualscores[cycle], filename) _logger.debug( f"Appending basecall: {basecalls[cycle]} to bcl for cycle {cycle+1} lenght sequence {len(basecalls)}" ) # write stats write_stat_file(cycledir / "s_1_1101.stats")
[docs] def write_stat_file(filename): with open(filename, "wb") as f_out: # can I get away with this? f_out.write(bytes([0] * 108))
[docs] def append_data_to_bcl(base, quality, filename): bcl_byte = encode_cluster_byte(base, quality) with open(filename, "ab") as f_out: f_out.write(bcl_byte)
[docs] def get_cycle_dir(outdir, cycle, lane="L001"): cycledir = outdir / f"Data/Intensities/BaseCalls/{lane}/C{cycle+1}.1" cycledir.mkdir(exist_ok=True, parents=True) return cycledir