Source code for PyExpLabSys.drivers.mks_g_series

""" Driver for MKS g-series flow controller """
from __future__ import print_function
import time
import logging
import serial
from PyExpLabSys.common.supported_versions import python2_and_3

LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.NullHandler())
python2_and_3(__file__)


[docs]class MksGSeries: """ Driver for G-series flow controllers from MKS """
[docs] def __init__(self, port='/dev/ttyUSB0'): # TODO: Auto-check all possible baud-rates self.ser = serial.Serial(port, 9600) self.ser.parity = serial.PARITY_NONE self.ser.bytesize = serial.EIGHTBITS self.ser.stopbits = serial.STOPBITS_ONE
[docs] def checksum(self, command, reply=False): """ Calculate checksum of command """ if not reply: com_string = '@' + command else: com_string = command total = 0 for i in range(0, len(com_string)): total = total + ord(com_string[i]) return (hex(total)[-2:]).upper()
[docs] def comm(self, command, addr): """ Implements communication protocol """ com_string = str(addr).zfill(3) + command + ';' checksum = self.checksum(com_string) com_string = '@@@@' + com_string + checksum com_string = com_string.encode('ascii') self.ser.write(com_string) time.sleep(0.1) reply = self.ser.read(self.ser.inWaiting()) try: reply = reply.decode('ascii') except UnicodeDecodeError: reply = reply.decode('ascii', 'ignore') reply = reply.strip('\x00') reply = '@' + reply if len(reply) == 0: LOGGER.warning('No such device') else: if reply[-3:] == self.checksum(reply[1:-3], reply=True): reply = reply[6:-3] # Cut away master address and checksum else: LOGGER.error('Checksum error in reply') reply = '' if reply[1:4] == 'ACK': reply = reply[4:-3] else: LOGGER.warning('Error in command') return reply
[docs] def read_full_scale_range(self, addr): """ Read back the current full scale range from the instrument """ command = 'U?' unit = self.comm(command, addr) command = 'FS?' value = self.comm(command, addr) return value + unit
[docs] def read_device_address(self, address=254): """ Read the device address """ command = 'CA?' return self.comm(command, address)
[docs] def set_device_address(self, old_addr, new_addr): """ Set the device address """ if (new_addr > 0) and (new_addr < 254): addr_string = str(new_addr).zfill(3) command = 'CA!' + addr_string self.comm(command, old_addr)
[docs] def read_current_gas_type(self, addr): """ Read the current default gas type """ command = 'PG?' reply = self.comm(command, addr) return reply
[docs] def read_run_hours(self, addr): """ Return number of running hours of mfc """ command = 'RH?' return self.comm(command, addr)
[docs] def read_setpoint(self, addr): """ Read current setpoint """ command = 'SX?' value = float(self.comm(command, addr)) return value
[docs] def set_flow(self, value, addr=254): """ Set the flow setpoint """ command = 'SX!' + str(round(value, 1)) self.comm(command, addr) return True
[docs] def purge(self, t=1, addr=254): """ purge for t seconds, default is 1 second """ command1 = 'VO!PURGE' command2 = 'VO!NORMAL' self.comm(command1, addr) print('PURGING') time.sleep(abs(t)) self.comm(command2, addr) print('DONE PURGING')
[docs] def read_flow(self, addr=254): """ Read the flow """ command = 'FX?' error = 1 while (error > 0) and (error < 50): try: flow = float(self.comm(command, addr)) error = -1 except ValueError: error = error + 1 flow = -1 return flow
[docs] def read_serial_number(self, addr=254): """ Read the serial number of the device """ command = 'SN?' return self.comm(command, addr)
if __name__ == '__main__': MKS = MksGSeries() print(MKS.read_serial_number(1)) print(MKS.read_full_scale_range(1)) # print(MKS.set_device_address(254,005))