Source code for PyExpLabSys.drivers.pfeiffer_turbo_pump

"""
Self contained module to run a Pfeiffer turbo pump including fall-back
text gui and data logging.
"""
import time
import curses
import threading
import logging
import serial
from PyExpLabSys.common.supported_versions import python2_and_3

# Configure logger as library logger and set supported python versions
LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.NullHandler())
python2_and_3(__file__)


[docs]class CursesTui(threading.Thread): """ Text gui for controlling the pump """
[docs] def __init__(self, turbo_instance): # TODO: Add support for several pumps in one gui threading.Thread.__init__(self) self.quit = False self.turbo = turbo_instance self.screen = curses.initscr() curses.noecho() curses.cbreak() curses.curs_set(False) self.screen.keypad(1) self.screen.nodelay(1)
[docs] def run(self): while not self.quit: self.screen.addstr(3, 2, 'Turbo controller running') # if self.turbo.status['pump_accelerating']: self.screen.addstr( 3, 41, '| Vent mode: ' + self.turbo.status['vent_mode'] + ' ' ) # self.screen.clrtoeol() # else: # self.screen.addstr(3, 30, 'Pump at constant speed') self.screen.addstr( 4, 2, 'Gas mode: ' + self.turbo.status['gas_mode'] + ' ' ) tmp = '| Venting setpoint: {:d}% ' self.screen.addstr(4, 41, tmp.format(self.turbo.status['vent_freq'])) self.screen.addstr(5, 2, 'Acc A1: ' + self.turbo.status['A1'] + ' ') tmp = '| Venting time: {0:.2f} minutes ' self.screen.addstr(5, 41, tmp.format(self.turbo.status['vent_time'])) self.screen.addstr( 6, 2, 'Sealing gas: ' + self.turbo.status['sealing_gas'] + ' ' ) tmp = "Rotation speed: {0:.2f}Hz " self.screen.addstr(8, 2, tmp.format(self.turbo.status['rotation_speed'])) tmp = "Setpoint speed: {0:.2f}Hz " self.screen.addstr( 8, 28, tmp.format(self.turbo.status['set_rotation_speed']) ) tmp = "Drive current: {0:.2f}A " self.screen.addstr(9, 2, tmp.format(self.turbo.status['drive_current'])) tmp = "Drive power: {0:.0f}W " self.screen.addstr(10, 2, tmp.format(self.turbo.status['drive_power'])) tmp = "Temperature, Electronics: {0:.0f}C " self.screen.addstr(12, 2, tmp.format(self.turbo.status['temp_electronics'])) tmp = "Temperature, Bottom: {0:.0f}C " self.screen.addstr(13, 2, tmp.format(self.turbo.status['temp_bottom'])) tmp = "Temperature, Bearings: {0:.0f}C " self.screen.addstr(14, 2, tmp.format(self.turbo.status['temp_bearings'])) tmp = "Temperature, Motor: {0:.0f}C " self.screen.addstr(15, 2, tmp.format(self.turbo.status['temp_motor'])) tmp = "Operating hours: {0:.0f} ({1:.1f}days) " hours = self.turbo.status['operating_hours'] self.screen.addstr(18, 2, tmp.format(hours, hours / 24.0)) tmp = "Driver runtime: {0:.1f}s " self.screen.addstr(19, 2, tmp.format(self.turbo.status['runtime'])) self.screen.addstr(20, 2, 'Port: ' + self.turbo.serial.port) self.screen.addstr(21, 2, 'q: quit, u: spin up, d: spin down') char_num = self.screen.getch() if char_num == ord('q'): self.turbo.running = False self.quit = True self.screen.addstr(2, 2, 'Quitting...') if char_num == ord('d'): self.turbo.status['spin_down'] = True if char_num == ord('u'): self.turbo.status['spin_up'] = True self.screen.refresh() time.sleep(0.2) LOGGER.info('TUI ended')
[docs] def stop(self): """ Cleanup terminal """ curses.nocbreak() self.screen.keypad(False) curses.echo() curses.endwin()
[docs]class TurboReader(threading.Thread): """ Keeps track of all data from a turbo pump with the intend of logging them """
[docs] def __init__(self, turbo_instance): # TODO: Add support for several pumps threading.Thread.__init__(self) self.mal = 20 # Moving average length self.turbo = turbo_instance self.log = {} self.log['rotation_speed'] = {} self.log['rotation_speed']['time'] = 600 self.log['rotation_speed']['change'] = 1.03 self.log['rotation_speed']['mean'] = [0] * self.mal self.log['rotation_speed']['last_recorded_value'] = 0 self.log['rotation_speed']['last_recorded_time'] = 0 self.log['drive_current'] = {} self.log['drive_current']['time'] = 600 self.log['drive_current']['change'] = 1.05 self.log['drive_current']['mean'] = [0] * self.mal self.log['drive_current']['last_recorded_value'] = 0 self.log['drive_current']['last_recorded_time'] = 0 self.log['drive_power'] = {} self.log['drive_power']['time'] = 600 self.log['drive_power']['change'] = 1.05 self.log['drive_power']['mean'] = [0] * self.mal self.log['drive_power']['last_recorded_value'] = 0 self.log['drive_power']['last_recorded_time'] = 0 self.log['temp_motor'] = {} self.log['temp_motor']['time'] = 600 self.log['temp_motor']['change'] = 1.05 self.log['temp_motor']['mean'] = [0] * self.mal self.log['temp_motor']['last_recorded_value'] = 0 self.log['temp_motor']['last_recorded_time'] = 0 self.log['temp_electronics'] = {} self.log['temp_electronics']['time'] = 600 self.log['temp_electronics']['change'] = 1.05 self.log['temp_electronics']['mean'] = [0] * self.mal self.log['temp_electronics']['last_recorded_value'] = 0 self.log['temp_electronics']['last_recorded_time'] = 0 self.log['temp_bottom'] = {} self.log['temp_bottom']['time'] = 600 self.log['temp_bottom']['change'] = 1.05 self.log['temp_bottom']['mean'] = [0] * self.mal self.log['temp_bottom']['last_recorded_value'] = 0 self.log['temp_bottom']['last_recorded_time'] = 0 self.log['temp_bearings'] = {} self.log['temp_bearings']['time'] = 600 self.log['temp_bearings']['change'] = 1.05 self.log['temp_bearings']['mean'] = [0] * self.mal self.log['temp_bearings']['last_recorded_value'] = 0 self.log['temp_bearings']['last_recorded_time'] = 0
[docs] def run(self): for i in range(0, self.mal): time.sleep(0.5) for param in self.log: self.log[param]['mean'][i] = self.turbo.status[param] # Mean values now populated with meaningfull data while True: for i in range(0, self.mal): time.sleep(0.5) for param in self.log: log = self.log[param] log['mean'][i] = self.turbo.status[param]
[docs]class TurboLogger(threading.Thread): """ Read a specific value and determine whether it should be logged """
[docs] def __init__(self, turboreader, parameter, maximumtime=600): threading.Thread.__init__(self) self.turboreader = turboreader self.parameter = parameter self.value = None self.maximumtime = maximumtime self.quit = False self.last_recorded_time = 0 self.last_recorded_value = 0 self.trigged = False
[docs] def read_value(self): """ Read the value of the logger """ return self.value
[docs] def run(self): while not self.quit: time.sleep(2.5) log = self.turboreader.log[self.parameter] mean = sum(log['mean']) / float(len(log['mean'])) self.value = mean time_trigged = (time.time() - self.last_recorded_time) > self.maximumtime val_trigged = not ( self.last_recorded_value * 0.9 < self.value < self.last_recorded_value * 1.1 ) if time_trigged or val_trigged: self.trigged = True self.last_recorded_time = time.time() self.last_recorded_value = self.value
[docs]class TurboDriver(threading.Thread): """ The actual driver that will communicate with the pump """
[docs] def __init__(self, adress=1, port='/dev/ttyUSB0'): threading.Thread.__init__(self) with open('turbo.txt', 'w'): pass logging.basicConfig(filename="turbo.txt", level=logging.INFO) logging.info('Program started.') logging.basicConfig(level=logging.INFO) self.serial = serial.Serial(port, 9600) self.serial.stopbits = 2 self.serial.timeout = 0.1 self.adress = adress self.status = {} # Hold parameters to be accessible by gui self.status['starttime'] = time.time() self.status['runtime'] = 0 self.status['rotation_speed'] = 0 self.status['set_rotation_speed'] = 0 self.status['operating_hours'] = 0 self.status['pump_accelerating'] = False self.status['gas_mode'] = '' self.status['vent_mode'] = '' self.status['A1'] = '' self.status['vent_freq'] = 50 self.status['vent_time'] = 0 self.status['sealing_gas'] = '' self.status['drive_current'] = 0 self.status['drive_power'] = 0 self.status['temp_electronics'] = 0 self.status['temp_bottom'] = 0 self.status['temp_bearings'] = 0 self.status['temp_motor'] = 0 self.status['spin_down'] = False self.status['spin_up'] = False self.running = True
[docs] def comm(self, command, read=True): """Implementaion of the communication protocol with the pump. The function deals with common syntax need for all commands. :param command: The command to send to the pump :type command: str :param read: If True, read only not action performed :type read: Boolean :return: The reply from the pump :rtype: Str """ adress_string = str(self.adress).zfill(3) if read: action = '00' datatype = '=?' length = str(len(datatype)).zfill(2) command = action + command + length + datatype crc = self.crc_calc(adress_string + command) command = adress_string + command + crc + '\r' LOGGER.debug(command) self.serial.write(command.encode('ascii')) response = self.serial.readline() try: length = int(response[8:10]) reply = response[10 : 10 + length] crc = response[10 + length : 10 + length + 3] except ValueError: logging.warn('Value error, unreadable reply') reply = -1 # TODO: Implement real crc check except serial.SerialException: logging.warn('Serial connection problem') reply = -1 return reply
[docs] def crc_calc(self, command): """Helper function to calculate crc for commands :param command: The command for which to calculate crc :type command: str :return: The crc value :rtype: Str """ crc = 0 for character in command: crc += ord(character) crc = crc % 256 crc_string = str(crc).zfill(3) return crc_string
[docs] def read_rotation_speed(self): """Read the rotational speed of the pump :return: The rotaional speed in Hz :rtype: Float """ command = '398' reply = self.comm(command, True) val = int(reply) / 60.0 return val
[docs] def read_set_rotation_speed(self): """Read the intended rotational speed of the pump :return: The intended rotaional speed in Hz :rtype: Int """ command = '308' reply = self.comm(command, True) val = int(reply) return val
[docs] def read_operating_hours(self): """Read the number of operating hours :return: Number of operating hours :rtype: Int """ command = '311' reply = self.comm(command, True) val = int(reply) return val
[docs] def read_gas_mode(self): """Read the gas mode :return: The gas mode :rtype: Str """ command = '027' reply = self.comm(command, True) mode = int(reply) mode_string = '' if mode == 0: mode_string = 'Heavy gasses' if mode == 1: mode_string = 'Light gasses' if mode == 2: mode_string = 'Helium' return mode_string
[docs] def read_vent_mode(self): """Read the venting mode :return: The venting mode :rtype: Str """ command = '030' reply = self.comm(command, True) mode = int(reply) mode_string = '' if mode == 0: mode_string = 'Delayed Venting' if mode == 1: mode_string = 'No Venting' if mode == 2: mode_string = 'Direct Venting' return mode_string
[docs] def read_vent_rotation(self): """Adjust the rotation speed below which the turbo starts venting """ command = '720' reply = self.comm(command, True) val = int(reply) return val
[docs] def read_vent_time(self): """Read the time the venting valve is kept open""" command = '721' reply = self.comm(command, True) val = int(reply) / 60.0 return val
[docs] def read_acc_a1(self): """Read the status of accessory A1""" command = '035' reply = self.comm(command, True) mode = int(reply) mode_string = '' if mode == 0: mode_string = "Fan (continous)" elif mode == 1: mode_string = "Venting valve, normally closed" elif mode == 4: mode_string = "Fan (temp controlled)" else: mode_string = "Mode is: " + str(mode) return mode_string
[docs] def read_sealing_gas(self): """Read whether sealing gas is applied :return: The sealing gas mode :rtype: Str """ command = '050' reply = self.comm(command, True) mode = int(reply) mode_string = '' if mode == 0: mode_string = 'No sealing gas' if mode == 1: mode_string = 'Sealing gas on' return mode_string
[docs] def is_pump_accelerating(self): """Read if pump is accelerating :return: True if pump is accelerating, false if not :rtype: Boolean """ command = '307' reply = self.comm(command, True) return int(reply) == 1
[docs] def turn_pump_on(self, off=False): """Spin the pump up or down :param off: If True the pump will spin down :type off: Boolean :return: Always returns True :rtype: Boolean """ if not off: command = '1001006111111' else: command = '1001006000000' self.comm(command, False) return True
[docs] def read_temperature(self): """Read the various measured temperatures of the pump :return: Dictionary with temperatures :rtype: Dict """ command = '326' reply = self.comm(command, True) elec = int(reply) command = '330' reply = self.comm(command, True) bottom = int(reply) command = '342' reply = self.comm(command, True) bearings = int(reply) command = '346' reply = self.comm(command, True) motor = int(reply) return_val = {} return_val['elec'] = elec return_val['bottom'] = bottom return_val['bearings'] = bearings return_val['motor'] = motor return return_val
[docs] def read_drive_power(self): """Read the current power consumption of the pump :return: Dictionary containing voltage, current and power :rtype: Dict """ command = '310' reply = self.comm(command, True) current = int(reply) / 100.0 command = '313' reply = self.comm(command, True) voltage = int(reply) / 100.0 command = '316' reply = self.comm(command, True) power = int(reply) return_val = {} return_val['voltage'] = voltage return_val['current'] = current return_val['power'] = power return return_val
[docs] def run(self): round_robin_counter = 0 while self.running: # time.sleep(0.1) self.status['runtime'] = time.time() - self.status['starttime'] self.status['rotation_speed'] = self.read_rotation_speed() power = self.read_drive_power() self.status['drive_current'] = power['current'] self.status['drive_power'] = power['power'] if round_robin_counter == 0: temp = self.read_temperature() self.status['temp_electronics'] = temp['elec'] self.status['temp_bottom'] = temp['bottom'] self.status['temp_bearings'] = temp['bearings'] self.status['temp_motor'] = temp['motor'] if round_robin_counter == 1: self.status['pump_accelerating'] = self.is_pump_accelerating() self.status['set_rotation_speed'] = self.read_set_rotation_speed() self.status['gas_mode'] = self.read_gas_mode() self.status['vent_mode'] = self.read_vent_mode() self.status['A1'] = self.read_acc_a1() self.status['vent_freq'] = self.read_vent_rotation() self.status['vent_time'] = self.read_vent_time() self.status['sealing_gas'] = self.read_sealing_gas() self.status['operating_hours'] = self.read_operating_hours() round_robin_counter += 1 round_robin_counter = round_robin_counter % 2 if self.status['spin_up']: self.turn_pump_on() self.status['spin_up'] = False if self.status['spin_down']: self.turn_pump_on(off=True) self.status['spin_down'] = False
if __name__ == '__main__': # Initialize communication with the turbo TURBO = TurboDriver() TURBO.start() # Start the user interface TUI = CursesTui(TURBO) TUI.start() try: while not TUI.quit: time.sleep(1) except KeyboardInterrupt: LOGGER.info("Program interrupted by user") finally: TUI.stop()