Source code for PyExpLabSys.drivers.bronkhorst

""" Driver for Bronkhorst flow controllers, including simple test case """
from __future__ import print_function
import time
import sys
import serial
from PyExpLabSys.common.supported_versions import python2_and_3

python2_and_3(__file__)


[docs]class Bronkhorst(object): """ Driver for Bronkhorst flow controllers """
[docs] def __init__(self, port, max_flow): self.ser = serial.Serial(port, 38400, timeout=2) self.max_setting = max_flow time.sleep(0.25) ser = self.read_serial() if len(ser) < 3: raise Exception('MfcNotFound')
[docs] def comm(self, command): """ Send commands to device and recieve reply """ self.ser.write(command.encode('ascii')) time.sleep(0.1) return_string = self.ser.read(self.ser.inWaiting()) return_string = return_string.decode() return return_string
[docs] def read_setpoint(self): """ Read the current setpoint """ read_setpoint = ':06800401210121\r\n' # Read setpoint response = self.comm(read_setpoint) response = int(response[11:], 16) response = (response / 32000.0) * self.max_setting return response
[docs] def read_flow(self): """ Read the actual flow """ error = 0 while error < 10: read_pressure = ':06800401210120\r\n' # Read pressure val = self.comm(read_pressure) try: val = val[-6:] num = int(val, 16) pressure = (1.0 * num / 32000) * self.max_setting break except ValueError: pressure = -99 error = error + 1 return pressure
[docs] def set_flow(self, setpoint): """ Set the desired setpoint, which could be a pressure """ setpoint = float(setpoint) if setpoint > 0: setpoint = (1.0 * setpoint / self.max_setting) * 32000 setpoint = hex(int(setpoint)) setpoint = setpoint.upper() setpoint = setpoint[2:].rstrip('L') if len(setpoint) == 3: setpoint = '0' + setpoint else: setpoint = '0000' set_setpoint = ':0680010121' + setpoint + '\r\n' # Set setpoint response = self.comm(set_setpoint) response_check = response[5:].strip() if response_check == '000005': response = 'ok' else: response = 'error' return response
[docs] def read_counter_value(self): """ Read valve counter. Not fully implemented """ read_counter = ':06030401210141\r\n' response = self.comm(read_counter) return str(response)
[docs] def set_control_mode(self): """ Set the control mode to accept rs232 setpoint """ set_control = ':058001010412\r\n' response = self.comm(set_control) return str(response)
[docs] def read_serial(self): """ Read the serial number of device """ read_serial = ':1A8004F1EC7163006D71660001AE0120CF014DF0017F077101710A\r\n' error = 0 while error < 10: response = self.comm(read_serial) response = response[13:-84] if sys.version_info[0] < 3: # Python2 try: response = response.decode('hex') except TypeError: response = '' else: # Python 3 try: response = bytes.fromhex(response).decode('utf-8') except ValueError: response = '' if response == '': error = error + 1 else: error = 10 return str(response)
[docs] def read_unit(self): """ Read the flow unit """ read_capacity = ':1A8004F1EC7163006D71660001AE0120CF014DF0017F077101710A\r\n' response = self.comm(read_capacity) response = response[77:-26] try: response = bytes.fromhex(response).decode('utf-8') except AttributeError: # Python2 response = response.decode('hex') return str(response)
[docs] def read_capacity(self): """ Read ?? from device """ read_capacity = ':1A8004F1EC7163006D71660001AE0120CF014DF0017F077101710A\r\n' response = self.comm(read_capacity) response = response[65:-44] # response = response.decode('hex') return str(response)
if __name__ == '__main__': bh = Bronkhorst('/dev/ttyUSB3', 5) # print bh.set_setpoint(1.0) # time.sleep(1) print(bh.read_serial()) print(bh.read_flow()) print(bh.read_unit()) print(bh.read_capacity()) print(bh.read_counter_value())