Source code for rainbow.agilent.masshunter

""" 
Methods for parsing Agilent Masshunter files. 
 
"""
import os
import struct
import numpy as np
import lzf
from lxml import etree
from rainbow import DataFile


"""
MAIN PARSING METHOD 

"""

[docs] def parse_allfiles(path, prec=0): """ Finds and parses Agilent Masshunter data files. \ Currently, only HRMS data is supported. \ See the documentation on :obj:`parse_msdata` for info on the limitations. If more file formats are added in the future, \ parsing should branch out from this method. Args: path (str): Path to the Agilent .D directory. prec (int, optional): Number of decimals to round ylabels. Returns: List containing a DataFile for each parsed file. """ datafiles = [] acqdata_path = os.path.join(path, "AcqData") if not os.path.isdir(acqdata_path): return datafiles acqdata_files = set(os.listdir(acqdata_path)) if {"MSTS.xml", "MSScan.xsd", "MSScan.bin"} <= acqdata_files: if "MSProfile.bin" in acqdata_files: datafiles.append(parse_msdata(acqdata_path, prec)) # Future work should also parse the MSPeak.bin format. # elif "MSPeak.bin" in acqdata_files: # ... return datafiles
""" MS PARSING METHODS """
[docs] def parse_msdata(path, prec=0): """ Parses Masshunter MS data. IMPORTANT: Masshunter MS data can be either stored in MSProfile.bin or \ MSPeak.bin. This method only supports parsing MSProfile.bin. The following files are used (in order of listing): - MSTS.xml -> Number of retention times. - MSScan.xsd -> File structure of MSScan.bin. - MSScan.bin -> Offsets and compression info for MSProfile.bin. - MSMassCal.bin -> Calibration info for masses. - MSProfile.bin -> Actual data values. Learn more about this file format :ref:`here <hrms>`. Args: path (str): Path to the AcqData subdirectory. prec (int, optional): Number of decimals to round mz values. Returns: DataFile containing Masshunter MS data. """ # MSTS.xml: Extract number of retention times. # In future work, this step could potentially be skipped by reading # MSScan.bin until EOF and counting the offsets. # This would remove reliance on MSTS.xml being in the directory. tree = etree.parse(os.path.join(path, "MSTS.xml")) root = tree.getroot() num_times = 0 for time_seg in root.findall("TimeSegment"): num_times += int(time_seg.find("NumOfScans").text) # MSScan.xsd: Extract the file structure of MSScan.bin. # There are "simple" types can be directly translated into number types. # But there are "complex" types that are made up of other # "simple" and "complex" types. # These are stored in a dictionary to enable recursive parsing. tree = etree.parse(os.path.join(path, "MSScan.xsd")) root = tree.getroot() namespace = tree.xpath('namespace-uri(.)') complextypes_dict = {} for complextype in root.findall(f"{{{namespace}}}complexType"): innertypes = [] for element in complextype[0].findall(f"{{{namespace}}}element"): innertypes.append((element.get('name'), element.get('type'))) complextypes_dict[complextype.get('name')] = innertypes # MSScan.bin: Extract information about MSProfile.bin. # For each retention time, this includes: # - the retention time itself # - starting offset of data in MSProfile.bin # - length in bytes of the compressed data # - length in bytes of the uncompressed data # - number of masses recorded at the retention time # Future work could determine if the data blocks for each retention time # are always contiguous. If that is the case, the offset is unneeded. f = open(os.path.join(path, "MSScan.bin"), 'rb') f.seek(0x58) # start offset f.seek(struct.unpack('<I', f.read(4))[0]) data_info = np.empty(num_times, dtype=object) for i in range(num_times): # "ScanRecordType" is always the name of the root "complex" type. scan_info = read_complextype(f, complextypes_dict, "ScanRecordType") spectrum_info = scan_info['SpectrumParamValues'] data_info[i] = ( scan_info['ScanTime'], spectrum_info['PointCount'], spectrum_info['SpectrumOffset'], spectrum_info['ByteCount'], spectrum_info['UncompressedByteCount'] ) f.close() # MSMassCal.bin: Extract calibration values for masses. # There seem to be 10 doubles stored for each retention time. But this # method only uses the first 2. Future work could determine what the # other 8 doubles are used for. f = open(os.path.join(path, "MSMassCal.bin"), 'rb') f.seek(0x4c) # start offset calib_vals = np.ndarray((num_times, 2), '<d', f.read(), 0, (84, 8)) f.close() # MSProfile.bin: Extract the data values. # The raw bytes are decompressed using the `python-lzf` library. # This code may be slow. We note that LZF decompression seems to not # rely on being decoded in smaller blocks as the code suggests. # Future work could potentially implement numpy speedups by # decompressing all the bytes at once and using clever indexing. f = open(os.path.join(path, "MSProfile.bin"), 'rb') twodoubles_unpack = struct.Struct('<dd').unpack times = np.empty(num_times) num_mz_per_time = np.empty(num_times) mz_list = [] intensity_bytearr = bytearray() for i in range(num_times): time, num_mz, offset, comp_len, decomp_len = data_info[i] times[i] = time num_mz_per_time[i] = num_mz # Decompress the bytes for the current retention time. f.seek(offset) comp_bytes = f.read(comp_len) decomp_bytes = lzf.decompress(comp_bytes, decomp_len) decomp_view = memoryview(decomp_bytes) # Calculate the calibrated mz values. start_mz, delta_mz = twodoubles_unpack(decomp_view[:16]) mzs = np.arange( start_mz, start_mz + delta_mz * (num_mz - 1) + 1e-3, delta_mz) coeff, base = calib_vals[i] mzs -= base mzs *= coeff mzs **= 2 mz_list.extend(mzs.tolist()) intensity_bytearr.extend(decomp_view[16:]) # Process the extracted data values. mz_arr = np.round(np.array(mz_list), prec) intensities = np.ndarray(mz_arr.size, '<I', bytes(intensity_bytearr)) # Make the array of ylabels containing mz values. mz_ylabels = np.unique(mz_arr) mz_ylabels.sort() # Fill the `data` array containing intensities. # Optimized using numpy vectorization. mz_indices = np.searchsorted(mz_ylabels, mz_arr) data = np.zeros((num_times, mz_ylabels.size), dtype=np.uint64) cur_index = 0 for i in range(num_times): stop_index = cur_index + int(num_mz_per_time[i]) np.add.at( data[i], mz_indices[cur_index:stop_index], intensities[cur_index:stop_index]) cur_index = stop_index f.close() return DataFile("MSProfile.bin", 'MS', times, mz_ylabels, data, {})
[docs] def read_complextype(f, complextypes_dict, name): """ Reads a "complex" type from :obj:`f`. Used only for MSScan.bin. Mutually recurs with :obj:`read_type`. Args: f (_io.BufferedReader): File opened in 'rb' mode. complextypes_dict (dict): Dictionary defining all "complex" types. name (str): Name of the "complex" type to parse. Returns: Dictionary mapping subtype names to values. \ If the subtype is "complex", the value is a nested dictionary. \ Otherwise, the value is a number. """ desc_to_value = {} for subname, subtype in complextypes_dict[name]: desc_to_value[subname] = read_type(f, complextypes_dict, subtype) return desc_to_value
[docs] def read_type(f, complextype_dict, name): """ Reads a type from :obj:`f`. Used only for MSScan.bin. Mutually recurs with :obj:`read_complextype`. Args: f (_io.BufferedReader): File opened in 'rb' mode. complextypes_dict (dict): Dictionary defining all "complex" types. name (str): Name of the type to parse. Returns: If the type is "simple", a number value. \ If the type is "complex", a dictionary mapping names to values. """ if name == 'xs:byte': return struct.unpack('c', f.read(1))[0] elif name == 'xs:short': return struct.unpack('<H', f.read(2))[0] elif name == 'xs:int': return struct.unpack('<I', f.read(4))[0] elif name == 'xs:long': return struct.unpack('<Q', f.read(8))[0] elif name == 'xs:float': return struct.unpack('<f', f.read(4))[0] elif name == 'xs:double': return struct.unpack('<d', f.read(8))[0] return read_complextype(f, complextype_dict, name)