Source code for PyExpLabSys.drivers.brooks_s_protocol

""" Driver for Brooks s-protocol """
from __future__ import print_function
import time
import struct
import logging
import serial
from six import b, indexbytes
from PyExpLabSys.common.supported_versions import python2_and_3

# Configure logger as library logger and set supported python versions
LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.NullHandler())
python2_and_3(__file__)


[docs]class Brooks(object): """ Driver for Brooks s-protocol """
[docs] def __init__(self, device, port='/dev/ttyUSB0'): self.ser = serial.Serial(port, 19200) self.ser.parity = serial.PARITY_ODD self.ser.bytesize = serial.EIGHTBITS self.ser.stopbits = serial.STOPBITS_ONE deviceid = self.comm('8280000000000b06' + self.pack(device[-8:])) manufactor_code = '0a' device_type = deviceid[12:14] long_address = manufactor_code + device_type + deviceid[-6:] self.long_address = long_address
[docs] def pack(self, input_string): """ Turns a string in packed-ascii format """ # This function lacks basic error checking.... klaf = '' for s in input_string: klaf += bin((ord(s) % 128) % 64)[2:].zfill(6) result = '' for i in range(0, 6): result = result + hex(int('' + klaf[i * 8 : i * 8 + 8], 2))[2:].zfill(2) return result
[docs] def crc(self, command): """ Calculate crc value of command """ i = 0 while command[i : i + 2] == 'FF': i += 2 command = command[i:] n = len(command) result = 0 for i in range(0, (n // 2)): byte_string = command[i * 2 : i * 2 + 2] byte = int(byte_string, 16) result = byte ^ result return hex(result)
[docs] def comm(self, command): """ Implements low-level details of the s-protocol """ check = str(self.crc(command)) check = check[2:].zfill(2) final_com = 'FFFFFFFF' + command + check bin_comm = '' for i in range(0, len(final_com) // 2): bin_comm += chr(int(final_com[i * 2 : i * 2 + 2], 16)) bin_comm += chr(0) bytes_for_serial = b(bin_comm) error = 1 while (error > 0) and (error < 10): self.ser.write(bytes_for_serial) time.sleep(0.2) s = self.ser.read(self.ser.inWaiting()) st = '' for i in range(0, len(s)): # char = hex(ord(s[i]))[2:].zfill(2) # char = hex(s[i])[2:].zfill(2) char = hex(indexbytes(s, i))[2:].zfill(2) if not char.upper() == 'FF': st = st + char try: # delimiter = st[0:2] # address = st[2:12] command = st[12:14] byte_count = int(st[14:16], 16) response = st[16 : 16 + 2 * byte_count] error = 0 except ValueError: error = error + 1 response = 'Error' return response
[docs] def read_flow(self): """ Read the current flow-rate """ response = self.comm('82' + self.long_address + '0100') try: # TODO: This should be handled be re-sending command # status_code = response[0:4] unit_code = int(response[4:6], 16) flow_code = response[6:] byte0 = chr(int(flow_code[0:2], 16)) byte1 = chr(int(flow_code[2:4], 16)) byte2 = chr(int(flow_code[4:6], 16)) byte3 = chr(int(flow_code[6:8], 16)) flow = struct.unpack('>f', b(byte0 + byte1 + byte2 + byte3)) value = flow[0] except ValueError: value = -1 unit_code = 171 # Satisfy assertion check, we know what is wrong assert unit_code == 171 # Flow unit should always be mL/min return value
[docs] def read_full_range(self): """ Report the full range of the device Apparantly this does not work for SLA-series... """ response = self.comm('82' + self.long_address + '980106') # Command 152 print(response) # Double check what gas-selection code really means... # currently 01 is used # status_code = response[0:4] unit_code = int(response[4:6], 16) assert unit_code == 171 # Flow controller should always be set to mL/min flow_code = response[6:] byte0 = chr(int(flow_code[0:2], 16)) byte1 = chr(int(flow_code[2:4], 16)) byte2 = chr(int(flow_code[4:6], 16)) byte3 = chr(int(flow_code[6:8], 16)) max_flow = struct.unpack('>f', byte0 + byte1 + byte2 + byte3) return max_flow[0]
[docs] def set_flow(self, flowrate): """ Set the setpoint of the flow """ ieee = struct.pack('>f', flowrate) ieee_flowrate = '' for i in range(0, 4): ieee_flowrate += hex(ord(ieee[i]))[2:].zfill(2) # 39 = unit code for percent # FA = unit code for 'same unit as flowrate measurement' # response = self.comm('82' + self.long_address + # 'ec05' + 'FA' + ieee_flowrate) # status_code = response[0:4] # unit_code = int(response[4:6], 16) return True
if __name__ == '__main__': BROOKS = Brooks('3F2320902001') print(BROOKS.long_address) print(BROOKS.read_full_range()) print(BROOKS.read_flow())