Source code for PyExpLabSys.drivers.omega_D6400

""" Driver for Omega D6400 daq card """
from __future__ import print_function
import time
import logging
import minimalmodbus
from PyExpLabSys.common.supported_versions import python2_and_3

python2_and_3(__file__)

LOGGER = logging.getLogger(__name__)
# Make the logger follow the logging setup from the caller
LOGGER.addHandler(logging.NullHandler())


[docs]class OmegaD6400(object): """ Driver for Omega D6400 daq card """
[docs] def __init__(self, address=1, port='/dev/ttyUSB0'): self.instrument = minimalmodbus.Instrument(port, address) self.instrument.serial.baudrate = 9600 self.instrument.serial.timeout = 1.0 # Default setting leads to comm-errors print(self.instrument.serial) self.ranges = [0] * 8 for i in range(1, 8): self.ranges[i] = {} self.ranges[i]['action'] = 'disable' self.ranges[i]['fullrange'] = '0' self.ranges[1]['action'] = 'voltage' self.ranges[1]['fullrange'] = '10' for i in range(1, 8): print(i) self.update_range_and_function( i, fullrange=self.ranges[i]['fullrange'], action=self.ranges[i]['action'], ) print('!')
[docs] def comm(self, command, value=None): """ Communicates with the device """ reply = None error = True while error is True: try: if value is None: reply = self.instrument.read_register(command) else: self.instrument.write_register(command, value) error = False except ValueError: LOGGER.warning('D6400 driver: Value Error') self.instrument.serial.read(self.instrument.serial.inWaiting()) time.sleep(0.1) error = True except IOError: LOGGER.warning('D6400 driver: IOError') self.instrument.serial.read(self.instrument.serial.inWaiting()) error = True time.sleep(0.1) return reply
[docs] def read_value(self, channel): """ Read a measurement value from a channel """ value = None reply = self.comm(47 + channel) if self.ranges[channel]['action'] == 'voltage': num_value = reply - 2 ** 15 scale = 1.0 * 2 ** 15 / float(self.ranges[channel]['fullrange']) value = num_value / scale if self.ranges[channel]['action'] == 'tc': scale = 1.0 * 2 ** 16 / 1400 value = (reply / scale) - 150 return value
[docs] def read_address(self): """ Read the RS485 address of the device """ old_address = self.comm(0) return old_address
[docs] def write_enable(self): """ Enable changes to setup values """ self.comm(240, 2) time.sleep(0.8) return True
[docs] def range_codes(self, fullrange=0, action=None): """Returns the code corresponding to a given range""" codes = {} codes['tc'] = {} codes['tc']['J'] = 21 codes['tc']['K'] = 34 codes['tc']['T'] = 23 codes['tc']['E'] = 24 codes['tc']['R'] = 25 codes['tc']['S'] = 26 codes['tc']['B'] = 27 codes['tc']['C'] = 28 codes['voltage'] = {} codes['voltage']['10'] = 1 codes['voltage']['5'] = 2 codes['voltage']['1'] = 3 codes['voltage']['0.1'] = 4 codes['voltage']['0.05'] = 5 codes['voltage']['0.025'] = 6 codes['disable'] = 0 codes['current'] = 3 if action in ('tc', 'voltage'): code = codes[action][fullrange] if action in ('disable', 'current'): code = codes[action] return code
[docs] def update_range_and_function(self, channel, fullrange=None, action=None): """ Set the range and measurement type for a channel """ if not action is None: self.write_enable() code = self.range_codes(fullrange, action) self.comm(95 + channel, code) print('##') time.sleep(0.1) self.ranges[channel]['action'] = action self.ranges[channel]['fullrange'] = fullrange return self.comm(95 + channel)
if __name__ == '__main__': OMEGA = OmegaD6400(1, port='/dev/ttyUSB0') OMEGA.update_range_and_function(1, action='voltage', fullrange='10') OMEGA.update_range_and_function(2, action='voltage', fullrange='10') print('***') print(OMEGA.read_value(1)) print(OMEGA.read_value(2))