Source code for PyExpLabSys.drivers.vivo_technologies

"""Driver for a Vivo Technologies LS-689A barcode scanner"""

import glob
import evdev
import threading
import time

try:
    import Queue
except ImportError:
    import queue as Queue


def detect_barcode_device():
    """Find the barcode scanner among the evdev input devices

    Every path matching ``/dev/input/event*`` is opened in turn and the
    string representation of the opened device is searched for the text
    'Barcode Reader'. Paths that raise OSError when opened are skipped.

    Returns:
        str: The path of the first matching device, or None if no device
        matched
    """
    for candidate in glob.glob('/dev/input/event*'):
        try:
            probe = evdev.InputDevice(candidate)
        except OSError:
            continue

        # Only the textual description is needed, so the device is closed
        # again before the description is inspected
        description = str(probe)
        probe.close()
        if 'Barcode Reader' in description:
            return candidate

    return None
# pylint: disable=too-few-public-methods
class BlockingBarcodeReader(object):
    """Blocking Barcode Reader

    Reads key events from an evdev input device and assembles them into a
    barcode string by way of the module level SCANCODES table.
    """

    def __init__(self, device_path):
        """Open the input device

        Args:
            device_path (str): Path of the input device, handed on to
                evdev.InputDevice
        """
        self.dev = evdev.InputDevice(device_path)

    def read_barcode(self):
        """Wait for a barcode and return it

        Key-down events are translated one by one with SCANCODES and
        concatenated until a key that translates to 'CRLF' is seen. Keys
        without a usable translation contribute a '?'. Every other
        translation is appended as it stands in the table, which includes
        multi character names such as 'LSHFT'.

        Returns:
            str: The characters collected before the 'CRLF' key (the 'CRLF'
            itself is not included)
        """
        out = ''
        for event in self.dev.read_loop():
            if event.type != evdev.ecodes.EV_KEY:
                continue

            # Categorize the event to get at its keystate and scancode
            data = evdev.categorize(event)
            if data.keystate != 1:  # Down events only
                continue

            # SCANCODES maps scancode 0 to None, which cannot be appended to
            # a string, so None is replaced by '?' just like unknown codes
            key_lookup = SCANCODES.get(data.scancode) or '?'
            if key_lookup == 'CRLF':
                break
            out += key_lookup
        return out

    def close(self):
        """Close the device"""
        self.dev.close()
class ThreadedBarcodeReader(threading.Thread):
    """Threaded Barcode Scanner

    A daemon thread that reads key events from an evdev input device,
    assembles them into barcodes by way of the module level SCANCODES table
    and puts every completed barcode on an internal queue. The queue is read
    through the properties of this class.
    """

    def __init__(self, device_path):
        """Open the input device and prepare the barcode queue

        Args:
            device_path (str): Path of the input device, handed on to
                evdev.InputDevice
        """
        super(ThreadedBarcodeReader, self).__init__()
        self.dev = evdev.InputDevice(device_path)
        self.daemon = True
        self._barcode_queue = Queue.Queue()

    def run(self):
        """The threaded run method

        Translates key-down events with SCANCODES and collects them until a
        key that translates to 'CRLF' is seen, at which point the collected
        string is put on the queue and collection starts over.
        """
        read_so_far = ''
        for event in self.dev.read_loop():
            if event.type != evdev.ecodes.EV_KEY:
                continue

            # Categorize the event to get at its keystate and scancode
            data = evdev.categorize(event)
            if data.keystate != 1:  # Down events only
                continue

            # SCANCODES maps scancode 0 to None, which cannot be appended to
            # a string; the resulting TypeError would end this thread. None
            # is therefore replaced by '?' just like unknown codes
            key_lookup = SCANCODES.get(data.scancode) or '?'
            if key_lookup == 'CRLF':
                self._barcode_queue.put(read_so_far)
                read_so_far = ''
            else:
                read_so_far += key_lookup

    @property
    def last_barcode_in_queue(self):
        """Last barcode in the queue

        Empties the queue and returns the item that was taken out last, or
        None if the queue was already empty.
        """
        last = None
        while True:
            try:
                last = self._barcode_queue.get_nowait()
            except Queue.Empty:
                break
        return last

    @property
    def wait_for_barcode(self):
        """Block until a barcode is available and return it

        Returns the next (oldest) item from the queue.
        """
        # We need to timeout every once in a while to allow interrupts to work
        while True:
            try:
                return self._barcode_queue.get(timeout=1)
            except Queue.Empty:
                pass

    @property
    def oldest_barcode_from_queue(self):
        """Get one barcode from the queue if there is one, otherwise None"""
        try:
            return self._barcode_queue.get_nowait()
        except Queue.Empty:
            return None

    def close(self):
        """Close the device"""
        self.dev.close()
# Translation table from key scancode to the text used for that key when a
# barcode is assembled. 'CRLF' is the value the readers treat as the end of a
# barcode.
SCANCODES = {
    # Scancode: ASCIICode
    0: None, 1: u'ESC',
    2: u'1', 3: u'2', 4: u'3', 5: u'4', 6: u'5',
    7: u'6', 8: u'7', 9: u'8', 10: u'9', 11: u'0',
    12: u'-', 13: u'=', 14: u'BKSP', 15: u'TAB',
    16: u'Q', 17: u'W', 18: u'E', 19: u'R', 20: u'T',
    21: u'Y', 22: u'U', 23: u'I', 24: u'O', 25: u'P',
    26: u'[', 27: u']', 28: u'CRLF', 29: u'LCTRL',
    30: u'A', 31: u'S', 32: u'D', 33: u'F', 34: u'G',
    35: u'H', 36: u'J', 37: u'K', 38: u'L',
    39: u';', 40: u'"', 41: u'`', 42: u'LSHFT', 43: u'\\',
    44: u'Z', 45: u'X', 46: u'C', 47: u'V', 48: u'B', 49: u'N', 50: u'M',
    51: u',', 52: u'.', 53: u'/', 54: u'RSHFT',
    56: u'LALT', 100: u'RALT',
}


if __name__ == '__main__':
    # Small demo: print the most recently scanned barcode once per second
    dev_ = detect_barcode_device()
    print(dev_)
    if dev_ is None:
        # Without this check the None would be passed on as a device path
        raise SystemExit('No barcode scanner found')
    tbs = ThreadedBarcodeReader(dev_)
    tbs.start()
    try:
        while True:
            print('Last barcode: {}'.format(tbs.last_barcode_in_queue))
            time.sleep(1)
    except KeyboardInterrupt:
        tbs.close()