# Source code for PyExpLabSys.drivers.scpi

""" Implementation of SCPI standard """
from __future__ import print_function
import time
import logging
import telnetlib
import serial

try:
    import usbtmc
except ImportError:
    usbtmc = None
try:
    import Gpib
except ImportError:
    Gpib = None

# Module-level logger, named after this module's import path.
LOGGER = logging.getLogger(__name__)
# Attach a no-op handler so this module never configures logging output by
# itself; records still propagate to whatever handlers the caller has set up.
LOGGER.addHandler(logging.NullHandler())


class SCPI(object):
    """Driver for SCPI communication over one of several transports.

    The transport is selected by the ``interface`` string given to the
    constructor: ``'file'``, ``'serial'``, ``'lan'``, ``'usbtmc'`` or
    ``'gpib'``. Any other value leaves ``comm_dev`` unset and makes
    :meth:`scpi_comm` return an empty string without talking to anything.
    """

    def __init__(
        self,
        interface,
        device='',
        tcp_port=5025,
        hostname='',
        baudrate=9600,
        visa_string='',
        gpib_address=None,
        line_ending='\r',
    ):
        """Open the communication channel for the chosen interface.

        Args:
            interface: Transport name, see the class docstring.
            device: Path of the device node; used by 'file' and 'serial'.
            tcp_port: TCP port used by the 'lan' interface.
            hostname: Host name or address used by the 'lan' interface.
            baudrate: Baud rate used by the 'serial' interface.
            visa_string: Resource string passed to ``usbtmc.Instrument``.
            gpib_address: Primary address passed to ``Gpib.Gpib`` as ``pad``.
            line_ending: Terminator appended to every command on all
                interfaces except 'file'; also the reply terminator on
                'serial'.

        Raises:
            SystemExit: If 'usbtmc' or 'gpib' is requested but the matching
                optional module could not be imported.
        """
        self.device = device
        self.line_ending = line_ending
        self.interface = interface
        if self.interface == 'file':
            # Opening for writing creates/truncates the target up front.
            # comm_dev is left pointing at the (closed) handle.
            with open(self.device, 'w') as handle:
                self.comm_dev = handle
        if self.interface == 'serial':
            self.comm_dev = serial.Serial(
                self.device, baudrate, timeout=2, xonxoff=True
            )
        if self.interface == 'lan':
            self.comm_dev = telnetlib.Telnet(hostname, tcp_port)
        if self.interface == 'usbtmc':
            if usbtmc is None:
                # Raised directly instead of calling the site-provided
                # exit() helper, which is not guaranteed to exist.
                raise SystemExit('usbtmc is not availalbe')
            self.comm_dev = usbtmc.Instrument(visa_string)
        if self.interface == 'gpib':
            if Gpib is None:
                raise SystemExit('gpib is not availalbe')
            self.comm_dev = Gpib.Gpib(0, pad=gpib_address)

    def _read_serial_reply(self):
        """Read one reply from the serial port, without its terminator.

        Bytes are read one at a time until the encoded ``line_ending`` has
        been received. An empty read means the port timed out; whatever has
        arrived so far is returned instead of raising.
        """
        terminator = self.line_ending.encode('ascii')
        reply = b''
        while True:
            next_char = self.comm_dev.read(1)
            if not next_char:
                # Timeout: the instrument sent nothing more.
                break
            reply += next_char
            if terminator and reply.endswith(terminator):
                reply = reply[: -len(terminator)]
                break
        return reply.decode()

    def scpi_comm(self, command, expect_return=False):
        """Send ``command`` to the instrument and return its reply, if any.

        Args:
            command: Command string without line ending.
            expect_return: Force a read-back even when ``command`` does not
                look like a query. Honoured by 'serial', 'lan' and 'gpib'.

        Returns:
            The reply as a string. Commands that trigger no read-back give
            an empty string, except on 'usbtmc' (see the note in the code).
        """
        return_string = ""
        if self.interface == 'file':
            # The file interface sends the bare command, without line ending.
            with open(self.device, 'w') as handle:
                self.comm_dev = handle
                handle.write(command)
                time.sleep(0.02)
            time.sleep(0.05)
            if '?' in command:
                with open(self.device, 'r') as handle:
                    self.comm_dev = handle
                    return_string = handle.readline()

        command_text = command + self.line_ending

        if self.interface == 'serial':
            self.comm_dev.write(command_text.encode('ascii'))
            # NOTE(review): serial only treats a trailing '?' as a query,
            # the other interfaces look for '?' anywhere in the command.
            if command.endswith('?') or (expect_return is True):
                return_string = self._read_serial_reply()

        if self.interface == 'lan':
            lan_time = time.time()
            self.comm_dev.write(command_text.encode('ascii'))
            if (command.find('?') > -1) or (expect_return is True):
                # Read up to the newline, waiting at most 2 seconds.
                return_string = self.comm_dev.read_until(
                    chr(10).encode('ascii'), 2
                ).decode()
            LOGGER.info('Return string length: %s', len(return_string))
            LOGGER.info(
                'lan_time for coomand %s: %s',
                command_text.strip(),
                time.time() - lan_time,
            )

        if self.interface == 'usbtmc':
            if command.find('?') > -1:
                return_string = self.comm_dev.ask(command_text)
            else:
                self.comm_dev.write(command_text)
                # NOTE(review): this returns the literal text 'command_text',
                # not the variable; kept as-is in case callers rely on it.
                return_string = 'command_text'

        if self.interface == 'gpib':
            self.comm_dev.write(command_text)
            if (command.find('?') > -1) or expect_return:
                return_string = self.comm_dev.read().strip().decode()

        return return_string

    def read_software_version(self):
        """Return the stripped reply to the ``*IDN?`` query."""
        version_string = self.scpi_comm("*IDN?")
        return version_string.strip()

    def reset_device(self):
        """Reset device by sending ``*RST``; always returns True."""
        self.scpi_comm("*RST")
        return True

    def device_clear(self):
        """Stop current operation by sending ``*abort``; always returns True."""
        self.scpi_comm("*abort")
        return True

    def clear_error_queue(self):
        """Clear the error queue.

        Queries ``*ESR?`` first, then sends ``*cls``.

        Returns:
            The reply to the ``*ESR?`` query.
        """
        error = self.scpi_comm("*ESR?")
        self.scpi_comm("*cls")
        return error