Source code for PyExpLabSys.file_parsers.chemstation

# pylint: disable=too-few-public-methods,no-member
"""File parser for Chemstation files

Copyright (C) 2015-2018 CINF team on GitHub: https://github.com/CINF

PyExpLabSys is free software: you can
redistribute it and/or modify it under the terms of the GNU
General Public License as published by the Free Software
Foundation, either version 3 of the License, or
(at your option) any later version.

PyExpLabSys is distributed in the hope
that it will be useful, but WITHOUT ANY WARRANTY; without even
the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more
details.

You should have received a copy of the GNU General Public License
along with PyExpLabSys.  If not, see
<http://www.gnu.org/licenses/>.

.. note:: This file parser went through a large re-write on ??? which
   changed the data structures of the resulting objects. This means
   that upon upgrading it *will* be necessary to update code. The
   re-write was done to fix some serious errors from the first
   version, like relying on the Report.TXT file for injection
   summaries. These are now fetched from the more ordered CSV files.

"""

from __future__ import print_function, unicode_literals, division

from collections import defaultdict
import codecs
import os
from itertools import islice
from io import BytesIO
import time
import struct
from struct import unpack

# The standard library csv module is no good for encoded CSV, which is
# kind of annoying
import unicodecsv as csv
import numpy

from PyExpLabSys.thirdparty.cached_property import cached_property
from PyExpLabSys.common.supported_versions import python2_and_3

python2_and_3(__file__)


class NoInjections(Exception):
    """Exception raised when there are no injections in the sequence"""
class Sequence(object):
    """The Sequence class for the Chemstation data format

    Parameters:
        injections (list): List of :class:`~Injection`'s in this sequence
        sequence_dir_path (str): The path of this sequence directory
        metadata (dict): Dict of metadata
    """
    def __init__(self, sequence_dir_path):
        """Instantiate object properties

        Args:
            sequence_dir_path (str): The path of the sequence
        """
        self.injections = []
        self.sequence_dir_path = sequence_dir_path
        self.metadata = {}
        self._parse()
        if not self.injections:
            msg = 'No injections in sequence: {}'.format(self.sequence_dir_path)
            raise NoInjections(msg)
        self._parse_metadata()
    def _parse_metadata(self):
        """Parse metadata"""
        # Add metadata from first injection to sequence metadata
        first_injection = self.injections[0]
        self.metadata['sample_name'] = first_injection.metadata['sample_name']
        self.metadata['sequence_start'] = first_injection.metadata['injection_date']
        self.metadata['sequence_start_timestruct'] = first_injection.metadata[
            'injection_date_timestruct'
        ]
        self.metadata['acq_method'] = first_injection.metadata['acq_method']

    def _parse(self):
        """Parse the sequence"""
        sequence_dircontent = os.listdir(self.sequence_dir_path)
        # Put the injection folders in order
        sequence_dircontent.sort()
        for filename in sequence_dircontent:
            injection_fullpath = os.path.join(self.sequence_dir_path, filename)
            if not (filename.startswith("NV-") or filename.endswith(".D")):
                continue
            if "Report.TXT" not in os.listdir(injection_fullpath):
                continue
            self.injections.append(Injection(injection_fullpath))

    def __repr__(self):
        """Return Sequence object representation"""
        return "<Sequence object at {}>".format(self.sequence_dir_path)
    def full_sequence_dataset(self, column_names=None):
        """Generate a peak-name-specific dataset

        This will collect area values for named peaks as a function of time
        over the different injections.

        Args:
            column_names (dict): A dict of the column names needed from the
                report lines. The dict should hold the keys: 'peak_name',
                'retention_time' and 'area'. It defaults to::

                    column_names = {'peak_name': 'Compound Name',
                                    'retention_time': 'Retention Time / min',
                                    'area': 'Area'}

        Returns:
            dict: Mapping of signal_and_peak names and the values
        """
        # Set the column names default values
        if column_names is None:
            column_names = {
                'peak_name': 'Compound Name',
                'retention_time': 'Retention Time / min',
                'area': 'Area',
            }

        # Initialize the start time and data collection objects
        data = defaultdict(list)
        start_time = self.injections[0].metadata['injection_date_unixtime']

        # Loop over injections and collect data
        for injection in self.injections:
            elapsed_time = injection.metadata['injection_date_unixtime'] - start_time
            # Unknowns is used to sum up unknown values for a detector
            unknowns = defaultdict(float)
            # Loop over signal reports
            for signal, report in injection.reports.items():
                # Loop over report lines
                for report_line in report:
                    label = self._generate_label(
                        data, signal, report_line, column_names
                    )
                    # If it is an unknown peak, add the area to the sum
                    area = report_line[column_names['area']]
                    if label.endswith('?'):
                        unknowns[label] += area
                    else:
                        data[label].append([elapsed_time, area])

            # Add the summed unknown values for this injection
            for key, value in unknowns.items():
                data[key].append([elapsed_time, value])

        return dict(data)  # Convert the defaultdict back to dict
    @staticmethod
    def _generate_label(data, signal, report_line, column_names):
        """str: Return a label

        Args:
            data (dict): The data collected so far
            signal (str): The name of the signal
            report_line (dict): The current report line as a dict
            column_names (dict): column_names dict, see
                :meth:`~full_sequence_dataset`
        """
        # Base label e.g: "FID1 A - CH4" or "TCD3 C - ?"
        peak_name = report_line[column_names['peak_name']]
        label = '{} - {}'.format(signal, peak_name)
        if peak_name == '?':
            return label

        # Check whether we already have a label for this detector, molecule pair
        for existing_label in data:
            # An existing label is something like: "FID2 B - CO2 (12.071)";
            # extract the base label part from that
            existing_base_label = existing_label.split('(')[0].rstrip()
            if existing_base_label == label:
                return existing_label

        # For a known peak that we do not already know about, add the
        # retention time to the label
        return '{} ({})'.format(label, report_line[column_names['retention_time']])
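
# Usage sketch (illustration only, not part of the parser): load a sequence
# directory and collect the named-peak datasets. The directory path is
# hypothetical.
def _example_sequence_usage():
    """Minimal sketch of loading a sequence and its peak datasets"""
    sequence = Sequence('/path/to/sequence_dir')  # hypothetical path
    print(sequence.metadata['sample_name'], sequence.metadata['sequence_start'])
    # Keys are labels like "FID1 A - CH4 (5.81)"; values are lists of
    # [elapsed_time_in_seconds, area] pairs, one per injection
    dataset = sequence.full_sequence_dataset()
    for label, points in dataset.items():
        print(label, points)
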
class Injection(object):
    """The Injection class for the Chemstation data format

    Parameters:
        injection_dirpath (str): The path of the directory of this injection
        reports (defaultdict): Signal -> list_of_report_lines dict. Each
            report line is a dict of column headers to type converted column
            content. E.g::

                {u'Area': 22.81,
                 u'Area %': 0.24,
                 u'Height': 12.66,
                 u'Peak Number': 1,
                 u'Peak Type': u'BB',
                 u'Peak Width / min': 0.027,
                 u'Retention Time / min': 5.81}

            The column headers are also stored in :attr:`~metadata` under the
            `columns` key.
        reports_raw (defaultdict): Same as :attr:`~reports` except the content
            is not type converted.
        metadata (dict): Dict of metadata
        raw_files (dict): Mapping of ch_file_name -> :class:`~CHFile` objects
        report_txt (str or None): The content of the Report.TXT file from the
            injection folder, if any
    """

    # This is scary. I don't know how many standard formats exist, or
    # if it is customizable !!!
    datetime_formats = (
        '%m/%d/%Y %I:%M:%S %p',  # '11/24/2017 12:11:42 PM'
        '%d-%b-%y %I:%M:%S %p',  # '24-Nov-17 12:10:07 PM'
        '%d-%b-%y, %H:%M:%S',  # '13-Jan-15, 11:16:49'
    )
    def __init__(self, injection_dirpath, load_raw_spectra=True, read_report_txt=True):
        """Instantiate Injection object

        Args:
            injection_dirpath (str): The path of the injection directory
            load_raw_spectra (bool): Whether to load raw spectra or not
            read_report_txt (bool): Whether to read and save the Report.TXT file
        """
        self.injection_dirpath = injection_dirpath
        self.reports = defaultdict(list)
        self.reports_raw = defaultdict(list)
        self.metadata = {}

        # Parse the Report00.CSV file
        self._parse_header()
        # Parse the table CSV files
        self._parse_tables()

        # Parse the raw files if requested
        self.raw_files = {}
        if load_raw_spectra:
            self._load_raw_spectra(injection_dirpath)

        # Read and save the Report.TXT file if requested
        self.report_txt = None
        if read_report_txt:
            report_path = os.path.join(self.injection_dirpath, 'Report.TXT')
            if os.path.isfile(report_path):
                with codecs.open(report_path, encoding='UTF16') as file_:
                    self.report_txt = file_.read()
    def _parse_date(self, date_part):
        """timestruct: Parse a date string in one of the formats in
        self.datetime_formats"""
        for datetime_format in self.datetime_formats:
            try:
                timestamp = time.strptime(date_part, datetime_format)
                break
            except ValueError:
                pass
        else:
            msg = "None of the date formats {} match the datestring {}"
            raise ValueError(msg.format(self.datetime_formats, date_part))
        return timestamp

    @staticmethod
    def _read_csv_data(filepath):
        """Return a list of rows from a csv file"""
        bytes_io = BytesIO()
        with codecs.open(filepath, encoding='UTF-16LE') as file_:
            # Get rid of the 2-byte byte order mark (one decoded character)
            content = file_.read()[1:]
            bytes_io.write(content.encode('utf-8'))
        bytes_io.seek(0)
        csv_reader = csv.reader(bytes_io, encoding='utf-8')
        return list(csv_reader)

    def _add_value_unit_to_metadata(self, name, value, unit):
        """Add value or value / unit to metadata under name"""
        if unit.strip() != "":
            self.metadata[name] = value + ' / ' + unit
        else:
            self.metadata[name] = value

    def _parse_header(self):  # pylint: disable=too-many-branches
        """Parse injection metadata from the Report00.CSV file

        Extract information about: sample name, injection date and sequence
        start
        """
        csv_rows = self._read_csv_data(
            os.path.join(self.injection_dirpath, 'Report00.CSV')
        )

        # Convert names and types
        type_functions = {
            'number_of_signals': int,
            'seq_line': int,
            'inj': int,
            'number_of_columns': int,
        }
        for row in csv_rows:
            # row is [name, value, other]
            name, value, _ = row
            name = name.strip().lower().replace('. ', '_').replace(' ', '_')
            row[0] = name
            if name in type_functions:
                row[1] = type_functions[name](value)

        # Parse first section of metadata
        row_iter = iter(csv_rows)  # Use an iterator to flexibly move through the rows
        for row in row_iter:
            name, value, unit = row
            if name == 'number_of_signals':
                self.metadata[name] = value
                break
            self._add_value_unit_to_metadata(name, value, unit)
            if name in ("data_file", "analysis_method", "sequence_file"):
                self.metadata[name + '_filename'] = unit

        # Deal with signals
        self.metadata['signals'] = []
        for name, value, _ in islice(row_iter, self.metadata['number_of_signals']):
            self.metadata[name] = value
            self.metadata['signals'].append(value)

        # More metadata
        for row in row_iter:
            name, value, unit = row
            if name == 'number_of_columns':
                self.metadata[name] = value
                break
            self._add_value_unit_to_metadata(name, value, unit)

        # Deal with columns
        self.metadata['columns'] = []
        for name, value, unit in islice(row_iter, self.metadata['number_of_columns']):
            self._add_value_unit_to_metadata(name, value, unit)
            self.metadata[name] = self.metadata[name].strip()
            self.metadata['columns'].append(self.metadata[name])

        # Confirm that there are no more lines left
        try:
            next(row_iter)
        except StopIteration:
            pass
        else:
            raise RuntimeError('Still items left in metadata CSV')

        # Add a few extra fields for time structs
        for name in ("injection_date", "results_created"):
            if name in self.metadata:
                self.metadata[name + '_timestruct'] = self._parse_date(
                    self.metadata[name]
                )
                self.metadata[name + '_unixtime'] = time.mktime(
                    self.metadata[name + '_timestruct']
                )

    def _parse_tables(self):
        """Parse the report tables from CSV files"""
        # Guess types for columns
        types = {}
        for column_name in self.metadata['columns']:
            if 'peak number' in column_name.lower():
                types[column_name] = int
            elif 'peak type' in column_name.lower() or 'name' in column_name.lower():
                types[column_name] = str
            else:
                types[column_name] = float

        # Iterate over signals
        for signal_number in range(1, self.metadata['number_of_signals'] + 1):
            self._parse_table(signal_number, types)

    def _parse_table(self, signal_number, types):
        """Parse a single report table from a CSV file"""
        report_filename = 'REPORT{:0>2}.CSV'.format(signal_number)
        report_path = os.path.join(self.injection_dirpath, report_filename)
        csv_data = self._read_csv_data(report_path)
        signal = self.metadata['signal_{}'.format(signal_number)]
        for row in csv_data:
            row_dict = {}
            row_dict_raw = {}
            for column_name, value_str in zip(self.metadata['columns'], row):
                row_dict_raw[column_name] = value_str.strip()
                type_function = types[column_name]
                if type_function is str:
                    row_dict[column_name] = value_str.strip()
                else:
                    row_dict[column_name] = type_function(value_str)
            self.reports_raw[signal].append(row_dict_raw)
            self.reports[signal].append(row_dict)

    def _load_raw_spectra(self, injection_dirpath):
        """Load all the raw spectra (.ch-files) associated with this injection"""
        for file_ in os.listdir(injection_dirpath):
            if os.path.splitext(file_)[1] == '.ch':
                filepath = os.path.join(injection_dirpath, file_)
                self.raw_files[file_] = CHFile(filepath)

    def __repr__(self):
        """Return object representation"""
        return "<Injection object at {}>".format(self.injection_dirpath)
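
# Usage sketch (illustration only, not part of the parser): load a single
# injection folder and walk its report lines. The folder path is hypothetical
# and the exact column names depend on the report configuration.
def _example_injection_usage():
    """Minimal sketch of reading the reports of one injection"""
    injection = Injection('/path/to/sequence_dir/NV-0001.D')  # hypothetical path
    print(injection.metadata['sample_name'])
    # reports maps signal name -> list of report line dicts with type
    # converted values; reports_raw holds the same lines as raw strings
    for signal, report_lines in injection.reports.items():
        for line in report_lines:
            print(signal, line.get('Compound Name'), line.get('Area'))
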
# Constants used for binary file parsing
ENDIAN = '>'
STRING = ENDIAN + '{}s'
UINT8 = ENDIAN + 'B'
UINT16 = ENDIAN + 'H'
INT16 = ENDIAN + 'h'
INT32 = ENDIAN + 'i'
def parse_utf16_string(file_, encoding='UTF16'):
    """Parse a pascal type UTF16 encoded string from a binary file object"""
    # First read the expected number of CHARACTERS
    string_length = unpack(UINT8, file_.read(1))[0]
    # Then read and decode
    parsed = unpack(STRING.format(2 * string_length), file_.read(2 * string_length))
    return parsed[0].decode(encoding)
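
# Illustration (not part of the parser): the pascal type layout is a one byte
# CHARACTER count followed by that many UTF16 code units. A minimal in-memory
# round trip, using an explicit little endian encoding:
def _example_parse_utf16_string():
    """Round trip a pascal type string through parse_utf16_string"""
    text = 'FID1 A'
    payload = struct.pack(UINT8, len(text)) + text.encode('UTF-16-LE')
    assert parse_utf16_string(BytesIO(payload), encoding='UTF-16-LE') == text
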
class CHFile(object):
    """Class that implements the Agilent .ch file format version 179

    .. warning:: Not all aspects of the file header are understood, so there
       may be, and probably is, information that is not parsed. See the method
       :meth:`._parse_header_status` for an overview of which parts of the
       header are understood.

    .. note:: Although the fundamental storage of the actual data has changed,
       lots of inspiration for the parsing of the header has been drawn from
       the parser in the `ImportAgilent.m file
       <https://github.com/chemplexity/chromatography/blob/dev/Methods/Import/ImportAgilent.m>`_
       in the `chemplexity/chromatography
       <https://github.com/chemplexity/chromatography>`_ project. All credit
       for the parts of the header parsing that could be reused goes to the
       author of that project.

    Attributes:
        values (numpy.array): The intensity values (y-values) of the spectrum.
            The unit for the values is given in `metadata['units']`
        metadata (dict): The extracted metadata
        filepath (str): The filepath this object was loaded from
    """

    # Fields is a table of name, offset and type. The types 'x-time' and
    # 'utf16' are specially handled, the rest are format arguments for
    # struct unpack
    fields = (
        ('sequence_line_or_injection', 252, UINT16),
        ('injection_or_sequence_line', 256, UINT16),
        ('start_time', 282, 'x-time'),
        ('end_time', 286, 'x-time'),
        ('version_string', 326, 'utf16'),
        ('description', 347, 'utf16'),
        ('sample', 858, 'utf16'),
        ('operator', 1880, 'utf16'),
        ('date', 2391, 'utf16'),
        ('inlet', 2492, 'utf16'),
        ('instrument', 2533, 'utf16'),
        ('method', 2574, 'utf16'),
        ('software version', 3601, 'utf16'),
        ('software name', 3089, 'utf16'),
        ('software revision', 3802, 'utf16'),
        ('units', 4172, 'utf16'),
        ('detector', 4213, 'utf16'),
        ('yscaling', 4732, ENDIAN + 'd'),
    )
    # The start position of the data
    data_start = 6144
    # The versions of the file format supported by this implementation
    supported_versions = {179}
    def __init__(self, filepath):
        """Instantiate object

        Args:
            filepath (str): The path of the data file
        """
        self.filepath = filepath
        self.metadata = {}
        with open(self.filepath, 'rb') as file_:
            self._parse_header(file_)
            self.values = self._parse_data(file_)
    def _parse_header(self, file_):
        """Parse the header"""
        # Parse and check version
        length = unpack(UINT8, file_.read(1))[0]
        parsed = unpack(STRING.format(length), file_.read(length))
        version = int(parsed[0])
        if version not in self.supported_versions:
            raise ValueError('Unsupported file version {}'.format(version))
        self.metadata['magic_number_version'] = version

        # Parse all metadata fields
        for name, offset, type_ in self.fields:
            file_.seek(offset)
            if type_ == 'utf16':
                self.metadata[name] = parse_utf16_string(file_)
            elif type_ == 'x-time':
                self.metadata[name] = unpack(ENDIAN + 'f', file_.read(4))[0] / 60000
            else:
                self.metadata[name] = unpack(
                    type_, file_.read(struct.calcsize(type_))
                )[0]

        # Convert date
        self.metadata['datetime'] = time.strptime(
            self.metadata['date'], '%d-%b-%y, %H:%M:%S'
        )

    def _parse_header_status(self):
        """Print known and unknown parts of the header"""
        file_ = open(self.filepath, 'rb')

        # Map positions to fields for all the known fields
        knowns = {item[1]: item for item in self.fields}

        # A couple of places have a \x01 byte before a string; these we simply skip
        skips = {325, 3600}

        # Jump to after the magic number version
        file_.seek(4)

        # Initialize variables for unknown bytes
        unknown_start = None
        unknown_bytes = b''

        # While we have not yet reached the data
        while file_.tell() < self.data_start:
            current_position = file_.tell()
            # Just continue on skip bytes
            if current_position in skips:
                file_.read(1)
                continue

            # If we know about a data field that starts at this point
            if current_position in knowns:
                # If we have collected unknown bytes, print them out and reset
                if unknown_bytes != b'':
                    print(
                        'Unknown at', unknown_start, repr(unknown_bytes.rstrip(b'\x00'))
                    )
                    unknown_bytes = b''
                    unknown_start = None

                # Print out the position, type, name and value of the known value
                print('Known field at {: >4},'.format(current_position), end=' ')
                name, _, type_ = knowns[current_position]
                if type_ == 'x-time':
                    print(
                        'x-time, "{: <19}'.format(name + '"'),
                        unpack(ENDIAN + 'f', file_.read(4))[0] / 60000,
                    )
                elif type_ == 'utf16':
                    print(
                        ' utf16, "{: <19}'.format(name + '"'),
                        parse_utf16_string(file_),
                    )
                else:
                    size = struct.calcsize(type_)
                    print(
                        '{: >6}, "{: <19}'.format(type_, name + '"'),
                        unpack(type_, file_.read(size))[0],
                    )
            else:
                # We do not know about a data field at this position. If we
                # have already collected 4 zero bytes, assume that we are done
                # with this unknown field, print and reset
                if unknown_bytes[-4:] == b'\x00\x00\x00\x00':
                    print(
                        'Unknown at', unknown_start, repr(unknown_bytes.rstrip(b'\x00'))
                    )
                    unknown_bytes = b''
                    unknown_start = None

                # Read one byte and save it
                one_byte = file_.read(1)
                if unknown_bytes == b'':
                    # Only start a new collection of unknown bytes if this
                    # byte is not a zero byte
                    if one_byte != b'\x00':
                        unknown_bytes = one_byte
                        unknown_start = file_.tell() - 1
                else:
                    unknown_bytes += one_byte

        file_.close()

    def _parse_data(self, file_):
        """Parse the data"""
        # Go to the end of the file and calculate how many 8-byte double
        # precision points there are
        file_.seek(0, 2)
        n_points = (file_.tell() - self.data_start) // 8

        # Read the data into a numpy array
        file_.seek(self.data_start)
        return (
            numpy.fromfile(file_, dtype='<d', count=n_points)
            * self.metadata['yscaling']
        )

    @cached_property
    def times(self):
        """The time values (x-value) for the data set in minutes"""
        return numpy.linspace(
            self.metadata['start_time'],
            self.metadata['end_time'],
            len(self.values),
        )
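
# Usage sketch (illustration only, not part of the parser): load a raw .ch
# file directly and pair the intensities with their time axis. The file path
# is hypothetical.
def _example_chfile_usage():
    """Minimal sketch of reading a version 179 .ch file"""
    ch_file = CHFile('/path/to/injection.D/FID1A.ch')  # hypothetical path
    print(ch_file.metadata['sample'], ch_file.metadata['units'])
    # times are in minutes; values carry the unit from metadata['units']
    for minutes, value in zip(ch_file.times, ch_file.values):
        print(minutes, value)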