Source code for PyExpLabSys.drivers.deltaco_TB_298

"""Driver with line reader for the Deltaco TB-298 Keypad"""

import glob
import evdev
import select
from time import sleep, time
from threading import Thread
from queue import Queue, Empty, Full

from PyExpLabSys.common.supported_versions import python3_only

python3_only(__file__)


KEYS_TO_CHARS = {'KEY_KP{}'.format(n): str(n) for n in range(10)}
KEYS_TO_CHARS.update(
    {
        'KEY_KPSLASH': '/',
        'KEY_KPASTERISK': '*',
        'KEY_KPMINUS': '-',
        'KEY_KPPLUS': '+',
        'KEY_KPDOT': '.',
    }
)


[docs]def detect_keypad_device(): """Return the input device path of the Deltaco TB-298 Keypad Iterates over all devices in /dev/input/event?? and looks for one that has the Deltaco Keypad vendor and product id and has a "1" key Returns: str: The Keypad device path """ barcode_device = None for device_string in glob.glob('/dev/input/event*'): try: tmp_dev = evdev.InputDevice(device_string) except OSError: continue else: device_information = tmp_dev.info # Check for vendor and product ids if not ( device_information.vendor == 0x04D9 and device_information.product == 0x1203 ): continue available_keys = tmp_dev.capabilities(verbose=True)[('EV_KEY', 1)] tmp_dev.close() if ('KEY_1', 2) in available_keys: return device_string
STOP = object()
[docs]class MaxSizeQueue(Queue): """MaxSizeQueue: A subclassed queue.Queue object that pops the first item if raising a queue.Full exception when adding an item. Otherwise the queue will block until an item is removed from the queue. """
[docs] def put(self, item): while True: try: super().put(item, block=False) break except Full: super().get()
[docs]class ThreadedKeypad(Thread): """Threaded keypad reader Attributes: line_queue (MaxSizeQueue): Queue of enter separated lines entered event_queue (MaxSizeQueue): Queue of key pad events key_pressed_queue (MaxSizeQueue): Queue of pressed keys (only one entry will be added if the key is held down) device (evdev.InputDevice): The input device """
[docs] def __init__(self, device_path, line_chars='0123456789', max_queue_sizes=None): """Instantiate local variables Holds 3 non-blocking queues that record inputs. If a queue is full, the first item is popped and the newest item is put in the end of the queue. When starting a function in a program that makes use of another queue, make sure to flush the queue before the main loop so you get the newest value first. Alternatively, if you want to keep an unused queue for history purposes, you may want to increase the max_queue_size of that queue. Args: device_path (str): Path of the inpu device to use e.g: '/dev/input/event0' line_chars (str): String of accepted characters in a line. Default value is '0123456789' max_queue_sizes: Dict used to overwrite the default max queue size of 1024 elements. The keys are 'line', 'event' and 'key_pressed' and the values are positive ints (0 is infinite). """ super().__init__() self._continue_reading = True self.device = evdev.InputDevice(device_path) self._gathered_line = '' self.line_chars = line_chars self.led_on = False for (name, value) in self.device.leds(verbose=True): if name == 'LED_NUML': self.led_on = True # Create queues _max_queue_sizes = {'line': 1024, 'event': 1024, 'key_pressed': 1024} if max_queue_sizes: _max_queue_sizes.update(max_queue_sizes) self.line_queue = MaxSizeQueue(maxsize=_max_queue_sizes['line']) self.event_queue = MaxSizeQueue(maxsize=_max_queue_sizes['event']) self.key_pressed_queue = MaxSizeQueue(maxsize=_max_queue_sizes['key_pressed'])
[docs] def run(self): """Main run method""" while self._continue_reading: # Check if there is anything to read on the device try: read_list, _, _ = select.select( [self.device.fd], [], [], 0.5 ) # previous timeout 0.2 if read_list: for event in self.device.read(): if event.type == evdev.ecodes.EV_KEY: self.handle_event(event) # sleep(0.01) # Read LED Num Lock state for (name, value) in self.device.leds(verbose=True): # Set True if LED is on if name == 'LED_NUML': self.led_on = True break else: self.led_on = False except OSError as e: print('\n"OSError" encountered\n{}'.format(e)) self.stop() except Exception as e: print('\nOther "Exception" encountered\n{}'.format(e)) self.stop()
[docs] def handle_event(self, event): """Handle an event""" categorized_event = evdev.categorize(event) self.event_queue.put(categorized_event) if categorized_event.keystate == 0: # Key up # Insert into key_pressed_queue key_code = categorized_event.keycode self.key_pressed_queue.put(key_code) # Insert chars in line if key_code in KEYS_TO_CHARS: char = KEYS_TO_CHARS[key_code] if char in self.line_chars: self._gathered_line += char elif key_code == 'KEY_BACKSPACE': self._gathered_line = self._gathered_line[:-1] elif key_code == 'KEY_KPENTER': self.line_queue.put(self._gathered_line) self._gathered_line = ''
[docs] def stop(self): """Stop the thread and put deltaco_TP_298.STOP in queues""" self._continue_reading = False # while self.is_alive(): # Commented as thread cannot be closed from # sleep(0.1) # within otherwise self.device.close() self.line_queue.put(STOP) self.event_queue.put(STOP) self.key_pressed_queue.put(STOP)
[docs]def module_demo(): """Simple module demo""" # Get reader device_path = detect_keypad_device() reader = ThreadedKeypad(device_path) reader.start() try: while True: # Get pending lines while True: try: line = reader.line_queue.get(timeout=0) print("LINE", line) except Empty: break # Get pending events while True: try: event = reader.event_queue.get(timeout=0) print("EVENT", event) except Empty: break # Get pending key presses while True: try: key_pressed = reader.key_pressed_queue.get(timeout=0) print("KEY PRESSED", key_pressed) except Empty: break sleep(0.1) except KeyboardInterrupt: reader.stop()
if __name__ == '__main__': module_demo()