Source code for PyExpLabSys.drivers.specs_XRC1000

# pylint: disable=C0301,R0904, C0103

"""
Self contained module to run a SPECS sputter gun including fall-back text gui
"""

import serial
import time
import threading
import curses
import socket
import json

EXCEPTION = None
log = open('error_log.txt', 'w')


[docs]class CursesTui(threading.Thread): """ Defines a fallback text-gui for the source control. """
[docs] def __init__(self, sourcecontrol): threading.Thread.__init__(self) self.sc = sourcecontrol self.screen = curses.initscr() curses.noecho() curses.cbreak() curses.curs_set(False) self.screen.keypad(1) self.screen.nodelay(1) self.time = time.time() self.countdown = False self.last_key = None self.running = True
[docs] def run(self): while self.running: self.screen.addstr( 3, 2, 'X-ray Source Control, ID: ' + str(self.sc.status['ID']) ) if self.sc.status['degas']: self.screen.addstr(4, 2, "Degassing") if self.sc.status['remote']: self.screen.addstr(5, 2, "Control mode: Remote") else: self.screen.addstr(5, 2, "Control mode: Local") if self.sc.status['standby']: self.screen.addstr(6, 2, "Device status, Standby: ON ") else: self.screen.addstr(6, 2, "Device status, Standby: OFF ") if self.sc.status['hv']: self.screen.addstr(7, 2, "Device status: HV ON ") else: self.screen.addstr(7, 2, "Device status: HV OFF ") if self.sc.status['operate']: self.screen.addstr(8, 2, "Device status: Operate ON ") else: self.screen.addstr(8, 2, "Device status, Operate: OFF ") # if self.sc.status['error'] != None: # self.screen.addstr(9, 2, "Error: " + str(self.sc.status['error'])) try: self.screen.addstr( 10, 2, "Filament bias: {0:.3f}V ".format( self.sc.status['filament_bias'] ), ) self.screen.addstr( 11, 2, "Filament Current: {0:.2f}A ".format( self.sc.status['filament_current'] ), ) self.screen.addstr( 12, 2, "Filament Power: {0:.2f}W ".format( self.sc.status['filament_power'] ), ) self.screen.addstr( 13, 2, "Emission Current: {0:.4f}A ".format( self.sc.status['emission_current'] ), ) self.screen.addstr( 14, 2, "Anode Voltage: {0:.2f}V ".format( self.sc.status['anode_voltage'] ), ) self.screen.addstr( 15, 2, "Anode Power: {0:.2f}W ".format( self.sc.status['anode_power'] ), ) self.screen.addstr( 16, 2, "Water flow: {0:.2f}L/min ".format( self.sc.status['water_flow'] ), ) except Exception as exception: global EXCEPTION EXCEPTION = exception # self.screen.addstr(10,2, exception.message) self.screen.addstr(10, 2, "Filament bias: - ") self.screen.addstr( 11, 2, "Filament Current: - " ) self.screen.addstr( 12, 2, "Filament Power: - " ) self.screen.addstr( 13, 2, "Emission Current: - " ) self.screen.addstr(14, 2, "Anode Voltage: - ") self.screen.addstr(15, 2, "Anode Power: - ") self.screen.addstr(16, 2, "water flow: - ") if self.sc.status['error'] != None: self.screen.addstr( 18, 2, "Latest error message: " + str(self.sc.status['error']) + " at time: " + str(self.sc.status['error time']), ) self.screen.addstr( 19, 2, "Runtime: {0:.0f}s ".format(time.time() - self.time) ) if self.countdown: self.screen.addstr( 18, 2, "Time until shutdown: {0:.0f}s ".format( self.countdown_end_time - time.time() ), ) if time.time() > self.countdown_end_time: self.sc.goto_off = True self.countdown = False self.screen.addstr( 21, 2, 'q: quit program, s: standby, o: operate, c: cooling, x: shutdown gun', ) self.screen.addstr(22, 2, ' 3: shutdown in 3h, r: change to remote') self.screen.addstr(24, 2, ' Latest key: ' + str(self.last_key)) n = self.screen.getch() if n == ord('q'): self.sc.running = False self.running = False self.last_key = chr(n) elif n == ord('s'): self.sc.goto_standby = True self.last_key = chr(n) elif n == ord('o'): self.sc.goto_operate = True self.last_key = chr(n) elif n == ord('c'): self.sc.goto_cooling = True self.last_key = chr(n) elif n == ord('x'): self.sc.goto_off = True self.last_key = chr(n) elif n == ord('r'): self.sc.goto_remote = True self.last_key = chr(n) elif n == ord('3'): self.countdown = True self.countdown_end_time = float(time.time() + 3 * 3600.0) # second self.last_key = chr(n) # disable s o key # if n == ord('s'): # self.sc.goto_standby = True # if n == ord('o'): # self.sc.goto_operate = True self.screen.refresh() time.sleep(1) time.sleep(5) self.stop() print(EXCEPTION)
[docs] def stop(self): """ Cleanup the terminal """ curses.nocbreak() self.screen.keypad(0) curses.echo() curses.endwin()
[docs]class XRC1000(threading.Thread): """ Driver for X-ray Source Control - XRC 1000"""
[docs] def __init__(self, port=None): """Initialize module Establish serial connection and create status variable to expose the status for the instrument for the various gui's """ threading.Thread.__init__(self) if port == None: port = '/dev/ttyUSB0' self.status = {} # Hold parameters to be accecible by gui self.status['hv'] = None self.status['standby'] = None self.status['operate'] = None self.status['degas'] = None self.status['remote'] = None self.status['error'] = None self.status['error time'] = None self.status['start time'] = time.time() self.status['cooling'] = None self.status['off'] = None self.status['sputter_current'] = None self.status['filament_bias'] = None self.status['filament_current'] = None self.status['filament_power'] = None self.status['emission_current'] = None self.status['anode_voltage'] = None self.status['anode_power'] = None self.status['water_flow'] = None self.status['ID'] = None self.running = True self.goto_standby = False self.goto_operate = False self.goto_off = False self.goto_cooling = False self.goto_remote = False self.simulate = False # self.update_status() self.list_of_errors = [] self.list_of_errors += ['>E251: Remote Locked !\n'] self.list_of_errors += ['>E250: Not in Remote !\n'] self.list_of_errors += ['>E251: Misplaced Query !\n'] self.list_of_errors += ['>E251: Argument missing !\n'] self.list_of_errors += ['>E251: Value to big or to low !\n'] self.list_of_errors += ['>E251: Parameter unknown !\n'] self.list_of_errors += ['>E251: Command not found !\n'] self.list_of_errors += ['>E251: Unexpected Error code !\n'] self.get_commands = [ 'REM?', 'IEM?', 'UAN?', 'IHV?', 'IFI?', 'UFI?', 'PAN?', 'SERNO?', 'ANO?', 'STAT?', 'OPE?', ] # self.simulate = simulate self.f = serial.Serial(port, 9600, timeout=0.25) # baud: 9600, bits: 8, parity: None return_string = self.comm('SERNO?') self.status['ID'] = return_string if self.status['ID'] == '000003AADEBD28': pass else: print('Error SERIAL Number: ' + return_string) print(len(return_string)) print(len('SERNO:000003AADEBD28\n>')) for el in return_string: print(ord(el)) self.get_status() if self.status['remote'] == False: self.remote_enable() self.get_status() self.init_socket()
def init_socket( self, ): self.address_port = ('rasppi00', 9000) self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
[docs] def comm(self, command): """Communication with the instrument Implements the synatx need to send commands to instrument including handling carrige returns and extra lines of 'OK' and other pecularities of the instrument. :param command: The command to send :type command: str :return: The reply to the command striped for protocol technicalities :rtype: str posible comands: REM?, IEM?, UAN?, IHV?, IFI?, UFI?, PAN?, SERNO?, ANO?, STAT?, OPE? REM, LOC, IEM 20e-3, UAN 10e3, OFF, COOL, STAN, UAON, OPE, ANO 1, ANO 2 """ n = self.f.inWaiting() if n > 1: print('Error') reply = self.f.read(n) self.status['error'] = 'n = ' + str(n) + ' ' + str(reply) else: self.f.read(n) self.f.write(command + '\r') time.sleep(0.2) reply = self.f.readline() # if '>' in reply[0]: # sanity character # #print 'Valid command' # pass # else: # print 'None valid command/reply' # print reply if command in self.get_commands and ':' in reply: echo, value = reply.split(':') return_string = value.strip() # get value from space to -2 # posible answer to 'UAN?' true echo # '>UAN: 12.00e3\n' # posible answer to 'OPE?' non true echo # '>OPERATE: 4.000\n' # return_string = reply else: return_string = True return return_string
def direct_comm(self, command): self.f.write(command + '\r') time.sleep(0.2) reply = self.f.readline() return_string = reply return return_string
[docs] def read_water_flow( self, ): """read the water flow from external hardware :return: water flow in L/min :rtype float """ try: self.sock.sendto('stm312_xray_waterflow#json', self.address_port) answer = self.sock.recvfrom(1024) water_flow_time, water_flow = json.loads(answer[0]) except: water_flow = -1.0 return water_flow
[docs] def read_emission_current(self): # need testing """Read the emission current. Unit A :return: The emission current :rtype: float """ reply = self.comm('IEM?') # 'IEM 20e-3\r' # print(reply) try: value = float(reply) / 1.0 except ValueError: self.status['error'] = reply value = None return value
[docs] def read_filament_voltage(self): # need testing """Read the filament voltage. Unit V :return: The filament voltage :rtype: float """ reply = self.comm('UFI?') try: value = float(reply) / 1.0 except ValueError: self.status['error'] = reply value = None return value
[docs] def read_filament_current(self): # need testing """Read the filament current. Unit A :return: The filament current :rtype: float """ reply = self.comm('IFI?') try: value = float(reply) / 1.0 except ValueError: self.status['error'] = reply value = None return value
# def read_emission_current(self): # """ Read the emission current. Unit mA # :return: The emission current # :rtype: float # """ # reply = self.comm('ec?') # try: # value = float(reply) / 1000 # except ValueError: # self.status['error'] = reply # value = None # return(value)
[docs] def read_anode_voltage(self): # need testing """Read the anode voltage. Unit V :return: The anode voltage :rtype: float """ reply = self.comm('UAN?') try: value = float(reply) / 1.0 except ValueError: self.status['error'] = reply value = None return value
[docs] def read_anode_power(self): # need testing """Read the anode voltage. Unit W :return: The anode voltage :rtype: float """ reply = self.comm('PAN?') try: value = float(reply) / 1.0 except ValueError: self.status['error'] = reply value = None return value
[docs] def standby(self): # need testing """Set the device on standby The function is not working entirely as intended. TODO: Implement check to see if the device is alrady in standby :return: The direct reply from the device :rtype: str """ # self.direct_comm('ANO 2') reply = self.comm('STAN') time.sleep(1) self.update_status() return reply
[docs] def operate(self): # need testing """Set the device in operation mode TODO: This function should only be activated from standby!!! :return: The direct reply from the device :rtype: str """ # self.comm('ANO 2') # time.sleep(1) self.comm('UAN 12e3') time.sleep(1) self.comm('IEM 20e-3') time.sleep(1) reply = self.comm('OPE') self.update_status() return reply
[docs] def remote_enable(self, local=False): # need testing """Enable or disable remote mode :param local: If True the device is set to local, otherwise to remote :type local: Boolean :return: The direct reply from the device :rtype: str """ if local: reply = self.comm('LOC') else: reply = self.comm('REM') time.sleep(1) self.update_status() return reply
[docs] def change_control(self): # need testing """Enable or disable remote mode :param local: If True the device is set to local, otherwise to remote :type local: Boolean :return: The direct reply from the device :rtype: str """ if self.status['remote']: reply = self.comm('LOC') else: reply = self.comm('REM') time.sleep(1) self.update_status() return reply
[docs] def cooling(self): """Enable or disable water cooling :type local: Boolean :return: The direct reply from the device :rtype: str """ reply = self.comm('COOL') self.update_status() return reply
def get_status(self): reply = self.comm('STAT?') if reply[0:2] == '00': self.status['remote'] = False if reply[0:2] == '02': self.status['remote'] = True if reply[2:4] == '00': self.status['off'] = True self.status['cooling'] = False self.status['standby'] = False self.status['hv'] = False self.status['operate'] = False if reply[2:4] == '01': self.status['off'] = False self.status['cooling'] = True self.status['standby'] = False self.status['hv'] = False self.status['operate'] = False if reply[2:4] == '02': self.status['off'] = False self.status['cooling'] = False self.status['standby'] = True self.status['hv'] = False self.status['operate'] = False if reply[2:4] == '03': self.status['off'] = False self.status['cooling'] = False self.status['standby'] = False self.status['hv'] = True self.status['operate'] = False if reply[2:4] == '04': self.status['off'] = False self.status['cooling'] = False self.status['standby'] = False self.status['hv'] = False self.status['operate'] = True if reply[4:6] == '00': self.status['error'] = False else: if self.status['error'] == False: self.status['error time'] = time.time() - self.status['start time'] self.status['error'] = reply[4:6] # error_bin = bin(int(reply[4:5])) # if error_bin[0:1] == '' def interlocks(self): self.status['interlocks'] = { 'Failure': True, 'HVLock': True, 'Vacuum': True, 'Water': True, 'ERR_ILIM': True, 'ERR_TOUT': True, } def automated_operate(self): self.direct_comm('ANO 1') self.direct_comm('STAN') self.direct_comm('UAON') self.direct_comm('UAN 12e3') # 12kV self.direct_comm('OPE') wait = True n = 0 while wait: if self.direct_comm('UAN?') == '>UAN: 12.00e3\n': wait = False elif n > 5: wait = False else: n += 1 time.sleep(5) self.direct_comm('IEM 20e-3') # 20mA wait = True n = 0 while wait: if self.direct_comm('IEM?') == '>IEM: 20.06e-3\n': wait = False elif n > 5: wait = False else: n += 1 time.sleep(5) return True def turn_off(self): self.update_status() if self.status['operate']: self.comm('UAON') time.sleep(2) self.update_status() if self.status['hv']: self.comm('STAN') time.sleep(2) self.update_status() if self.status['standby']: self.comm('OFF') self.update_status() # Update key parameters return True
[docs] def update_status(self): # not done """Update the status of the instrument Runs a number of status queries and updates self.status :return: The direct reply from the device :rtype: str """ # self.status['temperature'] = self.read_temperature_energy_module() self.status['filament_bias'] = self.read_filament_voltage() self.status['filament_current'] = self.read_filament_current() self.status['filament_power'] = ( self.status['filament_bias'] * self.status['filament_current'] ) self.status['emission_current'] = self.read_emission_current() self.status['anode_voltage'] = self.read_anode_voltage() self.status['anode_power'] = self.read_anode_power() self.status['water_flow'] = self.read_water_flow() # log.write(str(self.status) + '\n') self.get_status() return True
[docs] def run(self): while self.running: time.sleep(0.5) self.update_status() if self.goto_standby: self.standby() self.goto_operate = False self.goto_standby = False if self.goto_operate: self.operate() self.goto_operate = False if self.goto_off: self.turn_off() self.goto_off = False if self.goto_cooling: self.cooling() self.goto_cooling = False if self.goto_remote: self.remote_enable(local=self.status['remote']) self.goto_remote = False
if __name__ == '__main__': sc = XRC1000(port='/dev/serial/by-id/usb-1a86_USB2.0-Ser_-if00-port0') # print sc.read_emission_current() # print sc.read_filament_voltage() # print sc.read_filament_current() # print sc.read_anode_voltage() # print sc.read_anode_power() # command_list=['REM?', 'IEM?', 'UAN?', 'IHV?', 'IFI?', 'UFI?', 'PAN?', 'SERNO?', 'ANO?', 'STAT?', 'OPE?'] # for command in command_list: # print(str(command) + ' : ' + str(sc.direct_comm(command))) sc.start() tui = CursesTui(sc) tui.daemon = True tui.start() # print('Temperature: ' + str(sputter.read_temperature_energy_module())) # print('Sputter current: ' + str(sputter.read_sputter_current())) # print('Temperature: ' + str(sputter.read_temperature_energy_module())) # print('Filament voltage: ' + str(sc.read_filament_voltage())) # print('Filament current: ' + str(sc.read_filament_current())) # print('Emission current: ' + str(sc.read_emission_current()) + 'A') # print('Anode voltage: ' + str(sc.read_anode_voltage())) # print('Anode power: ' + str(sc.read_anode_power()) + 'W') # sputter.update_status() # print('Enable:') # print(sputter.remote_enable(local=False)) # print('Status:') # print(sputter.status)