Source code for PyExpLabSys.drivers.omegabus

""" Driver for OmegaBus devices """
from __future__ import print_function
import time
import logging
import serial
from PyExpLabSys.common.supported_versions import python2_and_3

# Configure logger as library logger and set supported python versions
LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.NullHandler())
python2_and_3(__file__)


[docs]class OmegaBus(object): """ Driver for OmegaBus devices """
[docs] def __init__(self, device='/dev/ttyUSB0', model='D5251', baud=300): self.ser = serial.Serial(device, baud) self.setup = {} self.setup['model'] = model self.read_setup() # Read temperature unit, if relevant time.sleep(0.1)
[docs] def comm(self, command): """ Handles serial protocol """ command = command + "\r" command = command.encode('ascii') self.ser.write(command) time.sleep(1) answer = self.ser.read(self.ser.inWaiting()) answer = answer.decode() return answer
[docs] def read_value(self, channel, convert_to_celcius=True): """ Read the measurement value """ value_string = self.comm("$" + str(channel) + "RD") # The value string is after the * if '*' in value_string: value_string = value_string.split('*', 1)[1] value = float(value_string) if convert_to_celcius and self.setup['model'] in [ 'D5311', 'D5321', 'D5331', 'D5431', ]: if self.setup['temp_unit'] == 'F': value = 5 * (value - 32) / 9 return value
[docs] def read_max(self, channel): """ The maximum read-out value """ temp_string = self.comm("$" + str(channel) + "RMX") if temp_string[1] == "*": temp_string = temp_string[3:] return float(temp_string)
[docs] def read_min(self, channel): """ The minimum read-out value """ temp_string = self.comm("$" + str(channel) + "RMN") if temp_string[1] == "*": temp_string = temp_string[2:] return float(temp_string)
[docs] def read_setup(self): """ Read Device setup information """ rs_string = self.comm("$" + "1RS") if '*' in rs_string: rs_string = rs_string.split('*', 1)[1] byte1 = rs_string[0:2] byte2 = rs_string[2:4] byte3 = rs_string[4:6] # byte4 = rs_string[6:8] setupstring = "" setupstring += "Base adress: " + chr(int(byte1, 16)) + "\n" bits_2 = (bin(int(byte2, 16))[2:]).zfill(8) setupstring += "No linefeed\n" if bits_2[0] == '0' else "Linefeed\n" if bits_2[2] == '0': # bits_2[1] will contain the parity if not none setupstring += "Parity: None" + "\n" setupstring += ( "Normal addressing\n" if bits_2[3] == '0' else "Extended addressing\n" ) if bits_2[4:8] == '0010': setupstring += "Baud rate: 9600" + "\n" bits_3 = (bin(int(byte3, 16))[2:]).zfill(8) setupstring += ( "Channel 3 enabled\n" if bits_3[0] == '1' else "Channel 3 disabled\n" ) setupstring += ( "Channel 2 enabled\n" if bits_3[1] == '1' else "Channel 2 disabled\n" ) setupstring += ( "Channel 1 enabled\n" if bits_3[2] == '1' else "Channel 1 disabled\n" ) if bits_3[3] == '1': setupstring += "No cold junction compensation\n" else: setupstring += "Cold junction compensation enabled\n" setupstring += "Unit: Fahrenheit\n" if bits_3[4] == '1' else "Unit: Celsius\n" if bits_3[4] == '1': self.setup['temp_unit'] = 'F' else: self.setup['temp_unit'] = 'C' # print (bin(int(byte4,16))[2:]).zfill(8 return setupstring
if __name__ == "__main__": OMEGA = OmegaBus(model='D5251') print(OMEGA.read_setup()) print(OMEGA.read_value(1)) print(OMEGA.read_value(2)) print(OMEGA.read_value(3)) print(OMEGA.read_value(4)) # print(OMEGA.read_min(1)) # print(OMEGA.read_max(1))