Source code for PyExpLabSys.drivers.dataq_binary

# pylint: disable=no-member

"""DataQ Binary protocol driver

To get started using one of the supported DataQ data cards, use one of the
sub-classes of DataQBinary e.g. DI1110, set scan list and start:

.. code-block:: python

   # Instantiate dataq object
   dataq = DI1110('/dev/ttyUSB0')

   # Get all available information from the data cards
   print(repr(dataq.info()))

   # Set sample rate frequency
   dataq.sample_rate(1000)

   # Set scan list (measure on channel 1 first, then 0 and finally 2)
   dataq.scan_list([1, 0, 2])

   dataq.clear_buffer()
   dataq.start()
   sleep(0.1)
   from pprint import pprint
   try:
       while True:
           pprint(dataq.read())
           sleep(0.5)
   except KeyboardInterrupt:
       dataq.stop()
   else:
       dataq.stop()
   dataq.clear_buffer()

If the data card is being used on the limit of emptying the data buffer before it
overflows, it might be useful to put the calls to ``read`` in a try-except:

.. code-block:: python

   while True:
       try:
           dataq.read()
       except dataq_binary.BufferOverflow:
           # Re-start i.e. stop and start

or, to simply start and stop the card for a short amount of time, if slow measurements, which would otherwise fill the buffer, are required:

.. code-block:: python

   while True:
       dataq.start()
       sleep(0.1)
       dataq.read()
       dataq.stop()
       # We really only need to measure every 10s
       sleep(10)

On some Linux systems, at the end of 2017, some models, e.g. the DI-1110, weren't
automatically mounted. In that case, they can be manually mounted with a command
like this one:

.. code-block:: bash

   sudo modprobe usbserial vendor=0x0683 product=0x1110

It should be possible, on Debian based Linux systems, to add an automatic mount
rule along the lines of this thread:
https://askubuntu.com/questions/525016/cant-open-port-dev-ttyusb0

So, add a new udev rule /etc/udev/rules.d/99-dataq_di1110.rules

with this content:

.. code-block:: text

   # /etc/udev/rules.d/99-dataq_di1110.rules
   # contains DataQ DI-1110 udev rule to patch default
   # rules
   SYSFS{idProduct}=="1110",
   SYSFS{idVendor}=="0683",
   RUN+="/sbin/modprobe -q usbserial product=0x1110 vendor=0x0683"

and afterwards run: sudo udevadm control --reload-rules

"""

from __future__ import print_function

from time import sleep
import logging

import serial
import numpy

from PyExpLabSys.common.supported_versions import python2_and_3

# Configure the module logger library-style: the only handler attached here is
# a NullHandler, no other logging configuration is done at import time
LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.NullHandler())
# Declare the supported Python versions for this file (see
# PyExpLabSys.common.supported_versions.python2_and_3 for what that entails)
python2_and_3(__file__)


class BufferOverflow(Exception):
    """Custom exception to indicate a buffer overflow

    Raised by the ``read`` method of the driver when the number of bytes read
    from the serial buffer equals the driver's ``buffer_overflow_size``.
    """
class DataQBinary(object):
    """Base class for DataQBinary driver

    The class wraps a ``serial.Serial`` connection to the data card. Commands
    are sent as text terminated by :attr:`end_char` and, while an acquisition
    is running, the measurement data is read back as a stream of 2 byte words.

    Sub-classes are expected to adjust the class attributes (e.g.
    :attr:`buffer_overflow_size`) to the specific model.
    """

    #: Serial communication end character
    end_char = '\r'
    #: Wait time between serial write and read
    read_wait_time = 0.001
    #: Information items that can be retrieved along with their code number
    infos = {
        'vendor': 0,
        'device name': 1,
        'firmware revision': 2,
        'serial number': 6,
        'sample rate divisor': 9,
    }
    #: Supported LED colors along with the code number
    led_colors = {
        'black': 0,
        'blue': 1,
        'green': 2,
        'cyan': 3,
        'red': 4,
        'magenta': 5,
        'yellow': 6,
        'white': 7,
    }
    #: Buffer overflow size, may be overwritten in sub classes
    buffer_overflow_size = 4095
    #: Packet sizes and code numbers
    packet_sizes = {
        16: 0,
        32: 1,
        64: 2,
        128: 3,
        256: 4,
        512: 5,
        1024: 6,
        2048: 7,
    }

    def __init__(self, device, serial_kwargs=None):
        """Initialize local variables

        Opens the serial connection, stops any acquisition that might be
        running, clears the buffer and reads the sample rate divisor from the
        device.

        Args:
            device (str): The device e.g: '/dev/ttyUSB0' or 'COM1'
            serial_kwargs (dict): dict of keyword arguments for serial.Serial
        """
        if serial_kwargs is None:
            serial_kwargs = {}
        self.serial = serial.Serial(device, **serial_kwargs)

        # Initialize the state before the first command is sent, so that the
        # object is consistent from the very first method call
        self.expect_echo = True
        self._scan_list = [0]
        self._position_in_scanlist = 0

        # Stop any active acquisition and clear buffer
        self.stop()
        self.clear_buffer()

        self._sample_rate_divisor = int(self.info('sample rate divisor'))

    def clear_buffer(self):
        """Clear the buffer

        Waits 0.5 s and then reads and discards whatever is waiting on the
        serial connection, until nothing more is waiting.
        """
        sleep(0.5)
        while self.serial.inWaiting() > 0:
            self.serial.read(self.serial.inWaiting())

    def _comm(self, arg, read_reply_policy='read'):
        """Execute command and return reply

        Args:
            arg (str): The command, without the end character
            read_reply_policy (str): How to treat the reply. With 'read'
                (default) the reply is read until it ends with the end
                character and is returned. With 'dont_read' nothing is read
                and an empty string is returned. With 'dont_interpret' the
                bytes waiting after one read wait time are read and discarded
                and an empty string is returned.

        Returns:
            str: The reply stripped of echo (if expected and present) and end
            character, or an empty string depending on ``read_reply_policy``
        """
        # Form command and send
        command = arg + self.end_char
        self.serial.write(command.encode())
        LOGGER.debug("cmd: %s", command)
        if read_reply_policy == 'dont_read':
            return ''

        # Read until reply end string
        return_string = ''
        while return_string == '' or not return_string.endswith(self.end_char):
            sleep(self.read_wait_time)
            bytes_waiting = self.serial.inWaiting()
            bytes_read = self.serial.read(bytes_waiting)
            # For the stop command, the reply might have a bit a data
            # prefixed, so just don't interpret the reply
            if read_reply_policy == 'dont_interpret':
                return ''
            return_string += bytes_read.decode().rstrip('\x00')
        LOGGER.debug("reply: %s", return_string)

        # Strip echo if present
        if self.expect_echo and return_string.startswith(arg):
            return_string = return_string.replace(arg + ' ', '', 1)

        # And finally end_char when we return the result
        return return_string.strip(self.end_char)

    def info(self, info_name='all'):
        """Return information about the device

        Args:
            info_name (str): Name of the requested information item(s). If
                info_name is one the specific info names, (the keys in
                DataQBinary.infos), a string will be returned with the value.
                If info_name is 'all', all values will be returned in a dict

        Returns:
            str or dict: Information items

        Raises:
            ValueError: If ``info_name`` is neither 'all' nor a key in
                ``infos``
        """
        if info_name == 'all':
            out = {}
            for name, number in self.infos.items():
                out[name] = self._comm('info {}'.format(number))
            return out

        if info_name not in self.infos:
            msg = 'Invalid info_name. Valid value are: {} and "all"'
            # sorted() gives a plain, deterministic list on Python 2 and 3
            raise ValueError(msg.format(sorted(self.infos)))

        return self._comm('info {}'.format(self.infos[info_name]))

    def start(self):
        """Start data acquisition

        The start command is sent without reading a reply and echo stripping
        is turned off for subsequent commands.
        """
        self.expect_echo = False
        # A fresh acquisition delivers its first sample for the first slot of
        # the scan list, so forget the position reached in a previous run
        self._position_in_scanlist = 0
        self._comm('start 0', read_reply_policy='dont_read')

    def stop(self):
        """Stop data acquisition

        This also implies clearing the buffer of any remaining data
        """
        self.expect_echo = True
        self._comm('stop', read_reply_policy='dont_interpret')
        # This shouldn't be necessary
        self.clear_buffer()

    def scan_list(self, scan_list):
        """Set the scan list

        The scan list is the list of inputs to acquire from on the data
        card. The scan list can hold up to 11 items, since there are a total
        of 11 inputs and each element can only be there once. The analogue
        input channels are numbered 0-7, 8 is the counter channel, 9 is the
        rate channel and 10 is the general purpose input channel. 0-7 are
        specified only by their number, 8, 9 and 10 are configured specially,
        which is not described here yet.

        Args:
            scan_list (list): E.g. [3, 5, 0] for analogue input channel 3, 5
                and 0. NOTE: The numbers are integers, not strings.

        Raises:
            ValueError: If the scan list is empty, has duplicate entries or
                contains anything but the analogue input channels 0-7
        """
        # Work on a private copy, so later changes to the caller's list cannot
        # silently change how incoming data is interpreted
        scan_list = list(scan_list)

        # An empty scan list would make the data parsing divide by zero
        if not scan_list:
            msg = 'The scan list must contain at least one entry'
            raise ValueError(msg)

        # Check for valid scan list configuration
        for scan_list_configuration in scan_list:
            if scan_list_configuration not in range(8):
                msg = 'Only scan list arguments 0-7 are supported'
                raise ValueError(msg)

        # Check for duplicate entries
        if len(set(scan_list)) != len(scan_list):
            msg = 'The scan list is not allowed to have duplicate entries'
            raise ValueError(msg)

        LOGGER.debug("Set scan list: %s", scan_list)

        # Send the configuration
        for slot_number, configuration in enumerate(scan_list):
            command = 'slist {} {}'.format(slot_number, configuration)
            self._comm(command)

        self._scan_list = scan_list
        # The old position may be out of range for the new scan list
        self._position_in_scanlist = 0

    def sample_rate(self, rate):
        """Set the sample rate

        The valid values are calculated as being in the range of
        sample rate divisor / 65535 to sample rate divisor / 375

        So e.g. for the DI-1110 product, with a sample rate divisor of
        60,000,000, the valid inputs are in range from 915.5413 to 160000.
        Rates outside of the valid range are coerced into it.

        Args:
            rate (float): The sample rate given in number of elements in the
                scan list sampled per second (i.e. in Hz). Valid values depend
                on the model and is given by the "sample rate divisor"
                information item (see the info method). See information about
                how to calculate the valid input range above.

        Raises:
            ValueError: If ``rate`` is not a positive number
        """
        rate = float(rate)
        # "not >" instead of "<=" so that NaN is rejected as well
        if not rate > 0:
            raise ValueError('The sample rate must be a positive number')

        rate_for_command = int(self._sample_rate_divisor / rate)
        # Coerce in valid range
        rate_for_command = min(max(375, rate_for_command), 65535)
        self._comm('srate {}'.format(rate_for_command))

    def packet_size(self, size):
        """Set the packet size

        The packet size is the amount of data acquired before it is placed in
        the read buffer. The available packet sizes are the keys in
        packet_sizes class variable.

        Args:
            size (int): The requested packet size

        Raises:
            ValueError: If ``size`` is not a key in ``packet_sizes``
        """
        if size not in self.packet_sizes:
            msg = 'Invalid packet size. Available values are: {}'
            raise ValueError(msg.format(sorted(self.packet_sizes)))

        packet_size_number = self.packet_sizes[size]
        self._comm('ps {}'.format(packet_size_number))

    def led_color(self, color):
        """Set the LED color

        Args:
            color (str): The available colors are the keys in the led_colors
                class variable

        Raises:
            ValueError: If ``color`` is not a key in ``led_colors``
        """
        if color not in self.led_colors:
            msg = 'Invalid color. Available colors are: {}'
            raise ValueError(msg.format(sorted(self.led_colors)))

        color_number = self.led_colors[color]
        self._comm('led {}'.format(color_number))

    def read(self):
        """Read all values waiting

        This method reads all available samples from the data card and
        returns for every channel the mean of those samples.

        The returned data is on the form of a list with one item for each
        item in the scan list and in the same order. Each of the items is in
        them selves a dict, which holds the mean value of the samples, the
        number of samples in the mean, the channel number (0-based) and
        information about how full the data buffer was, at the time when it
        was read out. An item is None if no samples were available for it.

        An example could be:

        .. code-block:: python

           [{'buffer_status': '3040/4095 bytes',
             'channel': 1,
             'samples': 506,
             'value': -1.6224543756175889},
            {'buffer_status': '3040/4095 bytes',
             'channel': 0,
             'samples': 507,
             'value': -0.0044494267751479287},
            {'buffer_status': '3040/4095 bytes',
             'channel': 2,
             'samples': 507,
             'value': 1.6192735299556213}]

        where the scan list was set to [1, 0, 2].

        Returns:
            list: A list of values for each of the items in the scan-list and
            in the same order. For details of returns values see above.

        Raises:
            BufferOverflow: If the buffer was full at the time of
                reading. The behavior in this case is ill-defined, so it is
                better to re-start the measurement if that happens.
        """
        # Form the output list and check for data
        n_input_channels = len(self._scan_list)
        list_out = [None] * n_input_channels
        waiting = self.serial.inWaiting()

        # Check for buffer overflow; the buffer is emptied before raising
        if waiting == self.buffer_overflow_size:
            self.serial.read(waiting)
            msg = (
                "Buffer flowed over. Adjust sampling parameters and restart "
                "measurement."
            )
            raise BufferOverflow(msg)

        # The samples are 2 byte words. Leave a trailing half sample in the
        # buffer for the next read, since an odd number of bytes cannot be
        # parsed as int16
        waiting -= waiting % 2
        if waiting == 0:
            return list_out

        bytes_read = self.serial.read(waiting)
        buffer_message = '{}/{} bytes'.format(
            len(bytes_read), self.buffer_overflow_size,
        )

        # Parse and interpret the data
        list_out = self._parse_data_packet(bytes_read, list_out)

        # Add buffer status
        for item in list_out:
            if item is not None:
                item['buffer_status'] = buffer_message

        return list_out

    def _parse_data_packet(self, bytes_read, list_out):
        """Parse a data packet

        Args:
            bytes_read (bytes): The raw data, a whole number of 2 byte words
            list_out (list): List with one element per scan list item, which
                is filled in (in place) with the result dicts

        Returns:
            list: ``list_out`` where the element for every scan list item that
            had at least one sample in the packet is a dict with the keys
            'value' (mean of the samples), 'samples' (number of samples) and
            'channel'. Elements without samples are left untouched.

        Raises:
            RuntimeError: If the scan list holds anything but analogue inputs
        """
        n_input_channels = len(self._scan_list)

        # If any of the items in the scan list are analogue inputs,
        # calculate, parse the 2 byte words as int16 and convert to
        # float voltage values
        if any(n in range(0, 8) for n in self._scan_list):
            ints = numpy.frombuffer(bytes_read, dtype='int16')
            # NOTE on shift FIXME
            values = numpy.right_shift(ints, 4) * 10.0 / 2048

        # Start in the scan list at the saved position
        position_in_scanlist = self._position_in_scanlist

        # Loop over the offsets in the raw values, to calculate average
        for offset in range(n_input_channels):
            # Extract the actual channel number from the scan list
            channel = self._scan_list[position_in_scanlist]

            # Only analogue inputs can be interpreted
            if channel not in range(0, 8):
                msg = "Only analogue input channel supported so far"
                raise RuntimeError(msg)

            # Extracts every n_input_channels items at offset
            values_this_channel = values[offset::n_input_channels]
            # A short packet may hold no sample for this channel; leave the
            # output slot as is instead of reporting the mean of nothing
            if values_this_channel.size > 0:
                list_out[position_in_scanlist] = {
                    'value': values_this_channel.mean(),
                    'samples': values_this_channel.size,
                    'channel': channel,
                }

            position_in_scanlist = (position_in_scanlist + 1) % n_input_channels

        # Calculate scan list position to start from next time
        words_read = len(bytes_read) // 2
        self._position_in_scanlist = (
            self._position_in_scanlist + words_read
        ) % n_input_channels

        return list_out
class DI1110(DataQBinary):
    """Driver for the DataQ DI-1110 data card

    All functionality is inherited from DataQBinary, only the model specific
    settings are defined here.
    """

    #: Number of bytes read in one go that is taken to mean a buffer overflow
    buffer_overflow_size = 4095
def module_test():
    """Run primitive module tests

    Requires a DI-1110 connected on '/dev/ttyUSB0'. Prints the device
    information, configures sample rate, scan list and LED color and then
    repeatedly starts the acquisition, prints what was read after 0.1 s,
    stops and waits 3 s, until interrupted with Ctrl-C.
    """
    global LOGGER  # pylint: disable=global-statement
    from pprint import pprint

    logging.basicConfig(level=logging.DEBUG)
    LOGGER = logging.getLogger('module_test')

    # Instantiate driver and print info
    dataq = DI1110('/dev/ttyUSB0')
    print(repr(dataq.info()))

    # Set sample rate, scan list and LED color
    dataq.sample_rate(1000)
    dataq.scan_list([1, 0, 2])
    dataq.led_color('red')

    # Measure in short bursts until interrupted
    try:
        while True:
            dataq.start()
            sleep(0.1)
            pprint(dataq.read())
            dataq.stop()
            sleep(3)
    except KeyboardInterrupt:
        pass
    finally:
        # Always leave the card stopped, also if the loop failed with an
        # exception other than KeyboardInterrupt
        dataq.stop()
    dataq.clear_buffer()
if __name__ == '__main__':
    # Run the hardware test loop when the module is executed as a script
    module_test()