Source code for PyExpLabSys.drivers.pfeiffer_qmg422

# pylint: disable=C0103

""" This module contains the driver code for the QMG422 control box
for a pfeiffer mass-spectrometer. The code should in principle work
for multiple type of electronics. It has so far been tested with a
qme-125 box and a qme-??? box. The module is ment as a driver and
has very little function in itself. The module is ment to be used
as a sub-module for a large program providing the functionality to
actually use the mass-spectrometer.

Known bugs: Not all code has a proper distinction between the various
electronics. The qme-125 has many limitations compared to the qme-???
and these limitations are not always properly expressed in the code
or the output of the module
"""
from __future__ import print_function
import time
import logging
import serial
from PyExpLabSys.common.supported_versions import python2_and_3

python2_and_3(__file__)

LOGGER = logging.getLogger(__name__)
# Make the logger follow the logging setup from the caller
LOGGER.addHandler(logging.NullHandler())


[docs]class qmg_422(object): """ The actual driver class. """
[docs] def __init__(self, port='/dev/ttyS0', speed=19200): """Initialize the module""" self.serial = serial.Serial(port, speed, timeout=2.0) self.reverse_range = False self.communication_mode(computer_control=True) self.type = '422' if self.comm('SQA') == '1': self.series = '400' else: self.series = '125' self.state = {'emission': 'Unknown', 'sem': 'Unknown'}
[docs] def comm(self, command): """Communicates with Baltzers/Pferiffer Mass Spectrometer Implements the low-level protocol for RS-232 communication with the instrument. High-level protocol can be implemented using this as a helper :param command: The command to send :type command: str :return: The reply associated with the last command :rtype: str """ done = False iterations = 0 while not done: iterations += 1 LOGGER.debug("Command in progress: %s", command) n = self.serial.inWaiting() if n > 0: # Skip characters that are currently waiting in line debug_info = self.serial.read(n).decode() LOGGER.debug( "Elements not read: " + str(n) + ": Contains: " + debug_info ) ret = " " error_counter = 0 while not ret[0] == chr(6): error_counter += 1 command_text = command + '\r' LOGGER.debug("Command text: %s", command_text) self.serial.write(command_text.encode('ascii')) ret = self.serial.readline().decode() LOGGER.debug("Debug: Error counter: %d", error_counter) LOGGER.debug("ret: %d", ord(ret[0])) LOGGER.debug("In waiting: %d", n) if error_counter == 3: LOGGER.warning("Communication error: %d", error_counter) if error_counter == 10: LOGGER.error("Communication error: %d", error_counter) if error_counter > 11: LOGGER.error("Communication error! Quit program!") quit() # We are now quite sure the instrument is ready to give back data self.serial.write(chr(5).encode('ascii')) ret = self.serial.readline().decode() LOGGER.debug("Number in waiting after enq: %d", n) LOGGER.debug("Return value after enq: %s", ret) LOGGER.debug("Ascii value of last char in ret: %d", ord(ret[-1])) if (iterations > 1) and (iterations < 1000): LOGGER.info(iterations) if (ret[-1] == chr(10)) or (ret[-1] == chr(13)): ret_string = ret.strip() done = True else: LOGGER.info("Wrong line termination") LOGGER.info("Ascii value of last char in ret: %s", ord(ret[-1])) LOGGER.info('Value of string: %s', ret) time.sleep(0.5) self.serial.write(chr(5)) ret = ' ' while ret[-1] != '\n': ret += self.serial.read(1) # ret = self.serial.readline() ret_string = ret.strip() LOGGER.info("Ascii value of last char in ret: %d", ord(ret[-1])) LOGGER.info('Value of string: %s', ret) LOGGER.info('Returning: %s', ret_string) done = True return ret_string
[docs] def communication_mode(self, computer_control=False): """Returns and sets the communication mode. :param computer_control: Activates ASCII communication with the device :type computer_control: bool :return: The current communication mode :rtype: str """ if computer_control: ret_string = self.comm('CMO ,1') else: ret_string = self.comm('CMO') comm_mode = ret_string if ret_string == '0': comm_mode = 'Console Keybord' if ret_string == '1': comm_mode = 'ASCII' if ret_string == '2': comm_mode = 'BIN' if ret_string == '3': comm_mode = 'Modem' if ret_string == '4': comm_mode = 'LAN' return comm_mode
[docs] def simulation(self): """Chekcs wheter the instruments returns real or simulated data :return: Message telling whether the device is in simulation mode :rtype: str """ ret_string = self.comm('TSI ,0') if int(ret_string) == 0: sim_state = "Simulation not running" else: sim_state = "Simulation running" return sim_state
[docs] def set_channel(self, channel): """Set the current channel :param channel: The channel number :type channel: integer """ self.comm('SPC ,' + str(channel)) # Select the relevant channel
[docs] def read_sem_voltage(self): """Read the SEM Voltage :return: The SEM voltage :rtype: str """ sem_voltage = self.comm('SHV') return sem_voltage
[docs] def read_preamp_range(self): """Read the preamp range This function is not fully implemented :return: The preamp range :rtype: str """ auto_range = self.comm('AMO') if auto_range in ('1', '2'): preamp_range = '0' # Idicates auto-range in mysql-table else: preamp_range = self.comm('ARA') return preamp_range
[docs] def read_timestep(self): """Reads the integration period of a measurement :return: The integration period in non-physical unit :rtype: str """ # TODO: Make a look-up table to make the number physical timestep = self.comm('MSD') return timestep
[docs] def sem_status(self, voltage=-1, turn_off=False, turn_on=False): """Get or set the SEM status :param voltage: The wanted SEM-voltage :type voltage: integer :param turn_off: If True SEM will be turned on (unless turn_of==True) :type turn_off: boolean :param turn_on: If True SEM will be turned off (unless turn_on==True) :type turn_on: boolean :return: The SEM voltage, The SEM status, True means voltage on :rtype: integer, boolan """ if voltage > -1: ret_string = self.comm('SHV ,' + str(voltage)) else: ret_string = self.comm('SHV') sem_voltage = int(ret_string) if turn_off ^ turn_on: # Only accept self-consistent sem-changes if turn_off: self.comm('SEM ,0') if turn_on: self.comm('SEM ,1') ret_string = self.comm('SEM') sem_on = ret_string == "1" return sem_voltage, sem_on
[docs] def emission_status(self, current=-1, turn_off=False, turn_on=False): """Get or set the emission status. :param current: The wanted emission status. Only works for QME??? :type current: integer :param turn_off: If True emission will be turned on (unless turn_of==True) :type turn_off: boolean :param turn_on: If True emission will be turned off (unless turn_on==True) :type turn_on: boolean :return: The emission value (for QME???), The emission status, True means filament on :rtype: integer, boolan """ if current > -1: ret_string = self.comm('EMI ,' + str(current)) emission_current = float(ret_string.strip()) else: ret_string = self.comm('EMI') emission_current = float(ret_string.strip()) if turn_off ^ turn_on: if turn_off: self.comm('FIE ,0') if turn_on: self.comm('FIE ,1') ret_string = self.comm('FIE') filament_on = ret_string == '1' return emission_current, filament_on
[docs] def detector_status(self, SEM=False, faraday_cup=False): """ Choose between SEM and Faraday cup measurements""" if SEM ^ faraday_cup: if SEM: ret_string = self.comm('SDT ,1') if faraday_cup: ret_string = self.comm('SDT ,0') else: ret_string = self.comm('SDT') if int(ret_string) > 0: detector = "SEM" else: detector = "Faraday Cup" return detector
[docs] def read_voltages(self): """ Read the qme-voltages """ print("V01: " + self.comm('VO1')) # 0..150, 1V steps print("V02: " + self.comm('VO2')) # 0..125, 0.5V steps print("V03: " + self.comm('VO3')) # -30..30, 0.25V steps print("V04: " + self.comm('VO4')) # 0..60, 0.25V steps print("V05: " + self.comm('VO5')) # 0..450, 2V steps print("V06: " + self.comm('VO6')) # 0..450, 2V steps print("V07: " + self.comm('VO7')) # 0..250, 1V steps print("V08: " + self.comm('VO8')) # -125..125,1V steps print("V09: " + self.comm('VO9')) # 0..60 ,0.25V steps
[docs] def update_state(self): """ Update the knowledge of the internal knowledge of the instrument """ raw_state = self.comm('ESQ') LOGGER.error('QMS State, ESQ: %s', raw_state) state = {} if self.series == '125': state['emission_state'] = raw_state.split(',')[1] else: # Emission state is 125 specific state['emission_state'] = 'Unknown' raw_state = int(raw_state[: raw_state.find(',')]) raw_state = bin(raw_state)[2:].zfill(16) state['running'] = 'Not running' if raw_state[15] == '0' else 'Running' state['mode'] = 'Mono' if raw_state[14] == '0' else 'Multi' state['emission'] = 'Off' if raw_state[13] == '0' else 'On' state['sem'] = 'Off' if raw_state[12] == '0' else 'On' state['4'] = '0' if raw_state[11] == '0' else '1' state['5'] = '0' if raw_state[10] == '0' else '1' state['6'] = '0' if raw_state[9] == '0' else '1' state['7'] = '0' if raw_state[8] == '0' else '1' state['8'] = '0' if raw_state[7] == '0' else '1' state['9'] = '0' if raw_state[6] == '0' else '1' state['10'] = '0' if raw_state[5] == '0' else '1' state['11'] = '0' if raw_state[4] == '0' else '1' state['12'] = '0' if raw_state[3] == '0' else '1' state['13'] = '0' if raw_state[2] == '0' else '1' state['ringbuffer'] = 'Partly filled' if raw_state[1] == '0' else 'Empty' state['ringbuffer'] = state['ringbuffer'] if raw_state[0] == '0' else 'Overflow' self.state = state
[docs] def start_measurement(self): """ Start the measurement """ start = time.time() LOGGER.error('QMS Errors, ERR: %s', self.comm('ERR')) LOGGER.error('QMS Warnings, EWN: %s', self.comm('EWN')) LOGGER.error('Start time: %f', time.time() - start) self.update_state() LOGGER.error('Start time: %f', time.time() - start) self.comm('CRU ,2')
[docs] def actual_range(self, amp_range): """ Returns the range that should be send to achieve the desired range """ real_range = amp_range if self.reverse_range is True: if amp_range == -9: real_range = -11 if amp_range == -10: real_range = -12 if amp_range == -11: real_range = -9 if amp_range == -12: real_range = -10 return real_range
[docs] def get_single_sample(self): """ Read a single sample from the device """ samples = 0 while samples == 0: try: status = self.comm('MBH') except: samples = samples - 1 status = 'Error' LOGGER.error('Serial timeout, continuing measurement') LOGGER.info('Status: %s', status) try: status = status.split(',') # Sometimes an error occurs in this response (most often "526,0" instead of # "1,1,9,1,0". The correct response seems to lie unread in the machine # Solved now by just resending the command 'MBH'. if len(status) != 5: # try again LOGGER.warning('Could not read status properly - trying again') LOGGER.warning(status) continue samples = int(status[3]) except: LOGGER.warning('Could not read status, continuing measurement') LOGGER.warning(status) samples = samples - 1 if ( samples < -30 ): # This will only be invoked if status.split(',')[3] returns -30 ? usefull_value = False value = -1 break if samples > 0: try: value = self.comm('MDB') usefull_value = True except: LOGGER.error('Error in MDB command') value = -1 usefull_value = False else: value = -1 usefull_value = False return value, usefull_value
[docs] def get_multiple_samples(self, number): """ Read multiple samples from the device """ values = [0] * number for i in range(0, number): values[i] = self.comm('MDB') return values
[docs] def config_channel(self, channel, mass=-1, speed=-1, enable="", amp_range=0): """ Config a MS channel for measurement """ self.comm('SPC ,' + str(channel)) # SPC: Select current parameter channel if mass > -1: self.comm('MFM ,' + str(mass)) if speed > -1: self.comm('MSD ,' + str(speed)) if enable == "yes": self.comm('AST ,0') if enable == "no": self.comm('AST ,1') if amp_range == 0: self.comm('AMO, 1') # Auto range with lower limit # TODO: Lower limit should be read from config file self.comm('ARL, -11') # Lower auto range level else: self.comm('AMO, 0') # Fix range self.comm('ARA, ' + str(self.actual_range(amp_range))) # Default values, not currently choosable from function parameters self.comm('DSE ,0') # Use default SEM voltage self.comm('DTY ,1') # Use SEM for ion detection self.comm('SDT ,1') # Use SEM for ion detection # self.comm('DTY ,0') #Use Faraday cup for ion detection # self.comm('SDT ,0') #Use Faraday cup for ion detection self.comm('MMO ,3') # Single mass measurement (opposed to mass-scan) self.comm('MRE ,15') # Peak resolution
[docs] def measurement_running(self): """ Check if a measurement is running """ error = 0 while error < 10: status = self.comm('MBH') status = status.split(',') try: running = int(status[0]) break except ValueError: error = error + 1 if error < 9: return_val = running == 0 else: return_val = False return return_val
[docs] def waiting_samples(self): """ Return number of waiting samples """ header = self.comm('MBH') header = header.split(',') number_of_samples = int(header[3]) return number_of_samples
[docs] def mass_scan(self, first_mass, scan_width, amp_range=0, speed=9): """ Setup the mass spec for a mass scan """ speed_list = { 0: 0.0005, 1: 0.001, 2: 0.002, 3: 0.005, 4: 0.010, 5: 0.020, 6: 0.050, 7: 0.1, 8: 0.2, 9: 0.5, 10: 1, 11: 2, 12: 5, 13: 10, 14: 20, 15: 60, } # unit: [s/amu] try: total_time = scan_width * speed_list[speed] except: total_time = -1 if amp_range == 0: self.comm('AMO, 1') # Auto range with lower limit # TODO: Lower limit should be read from config file self.comm('ARL, -11') # Lower auto range level else: self.comm('AMO, 0') # Fix range self.comm('ARA, ' + str(self.actual_range(amp_range))) self.comm('CYM, 0') # 0, single. 1, multi self.comm('SMC, 0') # Channel 0 self.comm('DSE ,0') # Use default SEM voltage self.comm('DTY ,1') # Use SEM for ion detection self.comm('SDT ,1') # Use SEM for ion detection self.comm('MRE ,1') # Resolve peak self.comm('MMO, 0') # Mass scan, to enable FIR filter, set value to 1 self.comm('MST, 0') # Steps 0: 1: 2: 64/amu self.comm('MSD, ' + str(speed)) # Speed self.comm('MFM, ' + str(first_mass)) # First mass self.comm('MWI, ' + str(scan_width)) # Scan width return total_time
[docs] def mass_time(self, ns): """ Setup the mass spec for a mass-time measurement """ self.comm('CYM ,1') # 0, single. 1, multi self.comm('CTR ,0') # Trigger mode, 0=auto trigger self.comm('CYS ,1') # Number of repetitions self.comm('CBE ,1') # First measurement channel in multi mode self.comm('CEN ,' + str(ns)) # Last measurement channel in multi mod
if __name__ == '__main__': qmg = qmg_422(port='/dev/ttyUSB0') print(qmg.communication_mode(computer_control=True)) print(qmg.read_voltages()) print(qmg.detector_status()) print(qmg.comm('SMR')) print('---') print('DTY: ' + qmg.comm('DTY')) # Signal source, 0: Faraday, 1: SEM print('DSE: ' + qmg.comm('SHV')) # SEM Voltage print('ECU: ' + qmg.comm('ECU')) print('SEM: ' + qmg.comm('SEM')) # SEM Voltage print('SQA: ' + qmg.comm('SQA')) # Type of analyzer, 0: 125, 1: 400, 4:200 print('SMR: ' + qmg.comm('SMR')) # Mass-range, this needs to go in a config-file print('SDT: ' + qmg.comm('SDT')) # Detector type print('SIT: ' + qmg.comm('SIT')) # Ion source print('AIN: ' + qmg.comm('AIN')) # Analog in print(qmg.state) qmg.update_state() print(qmg.state)