Source code for PyExpLabSys.drivers.omega_cni
# -*- coding: utf-8 -*-
""" This module contains drivers for equipment from Omega. Specifically it
contains a driver for the ??? thermo couple read out unit.
"""
from __future__ import print_function
import time
import logging
import serial
from PyExpLabSys.common.supported_versions import python2_and_3
# Configure logger as library logger and set supported python versions
LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.NullHandler())
python2_and_3(__file__)
[docs]class ISeries(object):
"""Driver for the iSeries omega temperature controllers"""
pre_string = chr(42)
end_string = chr(13)
[docs] def __init__(self, port, baudrate=19200, comm_stnd='rs232'):
"""Initialize internal parameters
:param port: A serial port designation as understood by `pySerial
<http://pyserial.sourceforge.net/pyserial_api.html#native-ports>`_
"""
LOGGER.debug('Initialize driver')
self.serial = serial.Serial(
port,
baudrate,
bytesize=serial.SEVENBITS,
parity=serial.PARITY_ODD,
stopbits=serial.STOPBITS_ONE,
timeout=2,
)
self.comm_stnd = comm_stnd
time.sleep(0.1)
LOGGER.info('Driver initialized')
[docs] def command(self, command, response_length=None, address=None):
"""Run a command and return the result
:param command: The command to execute
:type command: str
:param response_length: The expected legth of the response. Will force
the driver to wait untill this many characters is ready as a
response from the device.
:type response_length: int
"""
if address is None and self.comm_stnd == 'rs485':
address = '1'
LOGGER.debug('command called with {}, {}'.format(command, response_length))
if self.comm_stnd == 'rs485':
command = '0' + str(address) + command
comm_string = (self.pre_string + command + self.end_string).encode('ascii')
# print('Comm string: ' + comm_string)
self.serial.write(comm_string)
if response_length is not None:
while self.serial.inWaiting() < response_length + 1:
# If faster replies are needed this can be lowered to e.g. 0.05
time.sleep(0.17)
else:
# If a response length is not given, assume that the command can be
# executed in 0.5 seconds
time.sleep(0.5)
response = self.serial.read(self.serial.inWaiting())
response = response.decode()
if len(response) > 15:
response2 = []
for el in response:
response2.append(ord(el))
# print('RES2: ' + str(response2))
# Strip \r from responseRemove the echo response from the device
LOGGER.debug('comand return {}'.format(response[:-1]))
if response[0 : len(command)] == command:
response = response[len(command) :]
return response[:-1]
[docs] def reset_device(self, address=None):
"""Reset the device"""
command = 'Z02'
print('Reseting device')
return self.command(command, address=address)
[docs] def identify_device(self, address=None):
"""Return the identity of the device"""
command = 'R26'
return self.command(command, address=address)
[docs] def read_temperature(self, address=None):
"""Return the temperature"""
LOGGER.debug('read_temperature called')
command = 'X01'
error = 1
while (error > 0) and (error < 10):
try:
response = float(self.command(command, address=address))
error = 0
except ValueError:
error = error + 1
# print('AAA')
response = None
LOGGER.debug('read_temperature return {}'.format(response))
return response
[docs] def close(self):
"""Close the connection to the device"""
LOGGER.debug('Driver asked to close')
self.serial.close()
LOGGER.info('Driver closed')
[docs]class CNi3244_C24(ISeries):
"""Driver for the CNi3244_C24 device"""
[docs] def __init__(self, port):
"""Initialize internal parameters
:param port: A serial port designation as understood by `pySerial
<http://pyserial.sourceforge.net/pyserial_api.html#native-ports>`_
"""
super(CNi3244_C24, self).__init__(port)
if __name__ == '__main__':
# This port name should be chages to a local port to do a local test
OMEGA = ISeries('/dev/ttyUSB0', 9600, comm_stnd='rs485')
# print(OMEGA.identify_device(1))
# print(OMEGA.identify_device(2))
print(OMEGA.read_temperature())
print(OMEGA.read_temperature(2))