#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
keyboard (Keyboard Module)
==========================

**Author:**

* Dirk Alders

**Description:**

    This module supports functions and classes for collecting keyboard strokes.

**Submodules:**

* :class:`keyboard.keyboard`
* :class:`keyboard.keyboard_csp`

**Unittest:**

    See also the :download:`unittest <../../keyboard/_testresults_/unittest.pdf>` documentation.
"""
__DEPENDENCIES__ = ['stringtools', 'task']

import stringtools
import task

import evdev
import logging
import select
import sys
import time

logger_name = 'KEYBOARD'
logger = logging.getLogger(logger_name)

# NOTE: '\\_' is the explicit spelling of the former invalid escape '\_'; the resulting string is identical.
__DESCRIPTION__ = """The Module {\\tt %s} is designed to fetch data from a keyboatd (e.g. RFID-Reader). For more Information read the sphinx documentation.""" % __name__.replace('_', '\\_')
"""The Module Description"""
__INTERPRETER__ = (2, 3)
"""The Tested Interpreter-Versions"""


class keyboard(object):
    """
    Class to gather keystrokes of one or more evdev input devices.

    :param device_list: The device (path) or a list of devices to listen to. ``None`` entries are ignored.
    :type device_list: str or list

    The received characters are collected in one string buffer per device. They can be fetched by
    :func:`receive` or by a callback registered with :func:`register_callback`.
    """
    LOG_PREFIX = 'KBD:'
    # Markers for the last emitted warning, used to avoid repeating the same warning endlessly
    WARN_MSG_INIT_DEVICES = 1
    WARN_MSG_RUNTIME = 2
    # Scancodes of the keys which need special treatment
    LEFT_SHIFT = 42
    RIGHT_SHIFT = 54
    CAPS_LOCK = 58
    NUM_LOCK = 69
    SCROLL_LOCK = 70
    # Scancode -> bit in the state byte (modifier and lock keys on the left hand side)
    STATE_BITS_LEFT = {
        LEFT_SHIFT: 1,
        CAPS_LOCK: 2,       # CAPS LOCK
        29: 4,              # Left Ctrl
        56: 8,              # Alt
        125: 32,            # Left Windows
        NUM_LOCK: 64,       # Num Lock
        SCROLL_LOCK: 128,   # Scroll Lock
    }
    # Scancode -> bit in the state byte (modifier keys on the right hand side)
    STATE_BITS_RIGHT = {
        RIGHT_SHIFT: 1,
        97: 4,              # Right Ctrl
        100: 16,            # Alt Gr
        126: 32,            # Right Windows
    }
    # Scancode -> character without shift (US layout)
    SCANCODES = {
        2: u'1', 3: u'2', 4: u'3', 5: u'4', 6: u'5', 7: u'6', 8: u'7', 9: u'8', 10: u'9', 11: u'0',
        12: u'-', 13: u'=',
        16: u'q', 17: u'w', 18: u'e', 19: u'r', 20: u't', 21: u'y', 22: u'u', 23: u'i', 24: u'o', 25: u'p',
        26: u'[', 27: u']', 28: u'\n',
        30: u'a', 31: u's', 32: u'd', 33: u'f', 34: u'g', 35: u'h', 36: u'j', 37: u'k', 38: u'l',
        # Scancode 40 is the apostrophe key: plain "'" and shifted '"' (the original table had them swapped)
        39: u';', 40: u"'", 41: u'`', 43: u'\\',
        44: u'z', 45: u'x', 46: u'c', 47: u'v', 48: u'b', 49: u'n', 50: u'm',
        51: u',', 52: u'.', 53: u'/', 57: u' ',
    }
    # Scancode -> character with shift (US layout)
    CAPSCODES = {
        2: u'!', 3: u'@', 4: u'#', 5: u'$', 6: u'%', 7: u'^', 8: u'&', 9: u'*', 10: u'(', 11: u')',
        12: u'_', 13: u'+',
        16: u'Q', 17: u'W', 18: u'E', 19: u'R', 20: u'T', 21: u'Y', 22: u'U', 23: u'I', 24: u'O', 25: u'P',
        26: u'{', 27: u'}',
        30: u'A', 31: u'S', 32: u'D', 33: u'F', 34: u'G', 35: u'H', 36: u'J', 37: u'K', 38: u'L',
        39: u':', 40: u'"', 41: u'~', 43: u'|',
        44: u'Z', 45: u'X', 46: u'C', 47: u'V', 48: u'B', 49: u'N', 50: u'M',
        51: u'<', 52: u'>', 53: u'?', 57: u' ',
    }
    # Scancodes of the letter keys; only these are affected by Caps Lock
    CAPSKEYS = list(range(16, 26)) + list(range(30, 39)) + list(range(44, 51))

    def __init__(self, device_list):
        if sys.version_info >= (3, 0):
            str_types = (str, )
        else:
            str_types = (str, unicode)  # noqa: F821 (python 2 only)
        # A single device name is accepted as well as a list of device names
        if isinstance(device_list, str_types):
            device_list = [device_list]
        self.__device_list__ = [device for device in device_list if device is not None]
        self.__devices__ = None
        self.__last_warn_message__ = None
        #
        self.__init_rx_buffer__()
        #
        self.__data_available_callback__ = {}
        #
        self.__state_byte__ = 0
        self.__state_byte_left__ = 0
        self.__state_byte_right__ = 0
        # Queue executing the (self re-enqueuing) receive task
        self.__rx_queue__ = task.threaded_queue()
        self.__rx_queue__.enqueue(5, self.__receive_task__)
        self.__rx_queue__.run()
        # Queue executing the user callbacks
        self.__cb_queue__ = task.threaded_queue()
        self.__cb_queue__.run()

    def __init_rx_buffer__(self):
        """Create an empty string buffer for each device of the device list."""
        self.__rx_data__ = {}
        for device in self.__device_list__:
            if device is not None:
                self.__rx_data__[device] = ''

    def __calc_keyboard_lock_state__(self, device):
        """
        Update the lock bits (Num, Caps, Scroll) from the LED state reported by the device and refresh the
        combined state byte.

        :param device: The device to read the LED state from (``device.leds()`` is called).
        """
        leds = device.leds()
        led_to_key = (
            (evdev.ecodes.LED_NUML, self.NUM_LOCK),
            (evdev.ecodes.LED_CAPSL, self.CAPS_LOCK),
            (evdev.ecodes.LED_SCROLLL, self.SCROLL_LOCK),
        )
        for led, key in led_to_key:
            if led in leds:
                self.__state_byte_left__ |= self.STATE_BITS_LEFT[key]
            else:
                self.__state_byte_left__ &= ~self.STATE_BITS_LEFT[key]
        self.__state_byte__ = self.__state_byte_left__ | self.__state_byte_right__

    def __calc_keyboard_state__(self, cevent):
        """
        Track the modifier keys (shift, ctrl, alt, ...).

        :param cevent: The categorized key event (``scancode`` and ``keystate`` are read).
        :returns: True, if the event belongs to a modifier or lock key (no character to be generated).
        :rtype: bool

        Lock keys are recognised but not tracked here, their state is taken from the LEDs. A keystate
        other than 1 (set bit) or 0 (clear bit) leaves the state untouched.
        """
        scancode = cevent.scancode
        if scancode in self.STATE_BITS_LEFT:
            if scancode not in (self.CAPS_LOCK, self.NUM_LOCK, self.SCROLL_LOCK):
                if cevent.keystate == 1:
                    self.__state_byte_left__ |= self.STATE_BITS_LEFT[scancode]
                elif cevent.keystate == 0:
                    self.__state_byte_left__ &= ~self.STATE_BITS_LEFT[scancode]
            return True
        if scancode in self.STATE_BITS_RIGHT:
            if cevent.keystate == 1:
                self.__state_byte_right__ |= self.STATE_BITS_RIGHT[scancode]
            elif cevent.keystate == 0:
                self.__state_byte_right__ &= ~self.STATE_BITS_RIGHT[scancode]
            return True
        return False

    def __get_ascii__(self, scancode):
        """
        Translate a scancode to a character with respect to the current shift and Caps Lock state.

        :param scancode: The scancode of the pressed key.
        :type scancode: int
        :returns: The character or None, if the scancode is unknown.

        Caps Lock inverts the shift state for the letter keys (:attr:`CAPSKEYS`) only. All other keys
        (digits, punctuation, enter) depend on shift alone.
        """
        # Left and right shift share the same bit in the combined state byte
        shift = bool(self.__state_byte__ & self.STATE_BITS_LEFT[self.LEFT_SHIFT])
        caps = bool(self.__state_byte__ & self.STATE_BITS_LEFT[self.CAPS_LOCK])
        if caps and scancode in self.CAPSKEYS:
            shift = not shift
        table = self.CAPSCODES if shift else self.SCANCODES
        return table.get(scancode, None)

    def __append_rx_buffer__(self, device, c):
        """
        Append a character to the receive buffer of the device.

        :param device: The device the character was received from (``device.fn`` is used as buffer key).
        :param c: The character(s) to append.
        :returns: True, if data was appended.
        :rtype: bool
        """
        self.__rx_data__[device.fn] += c
        if sys.version_info >= (3, 0):
            logger.debug('%s RX[%s] <- "%s"', self.LOG_PREFIX, device.fn, stringtools.hexlify(bytes(c, 'ascii')))
        else:
            logger.debug('%s RX[%s] <- "%s"', self.LOG_PREFIX, device.fn, stringtools.hexlify(c))
        return len(c) > 0

    def __parse_ascii__(self, cevent, device):
        """
        Process one categorized key event: update the keyboard state and store the resulting character.

        :param cevent: The categorized key event.
        :param device: The device the event was read from.
        """
        self.__calc_keyboard_lock_state__(device)
        if self.__calc_keyboard_state__(cevent):
            # Modifier or lock key: state has been updated, no character to store
            return
        if cevent.keystate == 1:
            c = self.__get_ascii__(cevent.scancode)
            if c is not None:
                self.__append_rx_buffer__(device, c)
            else:
                logger.warning('%s No character in dictionary for scancode %d', self.LOG_PREFIX, cevent.scancode)

    def register_callback(self, callback, device_name):
        """
        :param callback: The callback which will be executed, when data is available.
        :type callback: function
        :param device_name: The device the callback is registered for.
        :type device_name: str

        This method sets the callback which is executed, if data is available for the given device. The data
        can be fetched by calling :func:`receive` of the instance given with the first argument.

        .. note:: The :func:`callback` is executed with these arguments:

            :param self: This communication instance
            :param device_name: The name of the device with available data
        """
        self.__data_available_callback__[device_name] = callback

    def receive(self, device_name, timeout=1, num=None):
        """
        Fetch data from the receive buffer of a device.

        :param device_name: The device to get the data from.
        :type device_name: str
        :param timeout: The maximum time in seconds to wait for the requested amount of data.
        :type timeout: float
        :param num: The number of elements to fetch or None to fetch everything available (at least one).
        :type num: int or None
        :returns: The requested data (removed from the buffer) or None, if the timeout expired.
        """
        required = 1 if num is None else num
        tm = time.time()
        while len(self.__rx_data__.get(device_name, [])) < required:
            if time.time() > tm + timeout:
                # The buffer size is taken from the rx buffer (the formerly used attribute never existed)
                logger.warning('%s TIMEOUT (%ss): Not enough data in buffer. Requested %s and buffer size is %d.', self.LOG_PREFIX, repr(timeout), repr(num or 'all'), len(self.__rx_data__.get(device_name, [])))
                return None
            time.sleep(0.05)
        if num is None:
            rv = self.__rx_data__[device_name]
        else:
            rv = self.__rx_data__[device_name][:num]
        # Remove the fetched data, but keep everything received in the meantime
        self.__rx_data__[device_name] = self.__rx_data__[device_name][len(rv):]
        return rv

    def __receive_task__(self, queue_inst):
        """
        Task to open the devices (if needed), read pending key events and enqueue the callbacks.

        :param queue_inst: The queue instance executing this task. The task re-enqueues itself there.
        """
        if self.__devices__ is None:
            # A mapping of file descriptors (integers) to InputDevice instances.
            try:
                devices = [evdev.InputDevice(name) for name in self.__device_list__]
                self.__devices__ = {dev.fd: dev for dev in devices}
                logger.info('%s Listening to the following Devices: %s', self.LOG_PREFIX, repr(self.__device_list__))
                # Devices are up again, a following error shall be reported again
                self.__last_warn_message__ = None
            except OSError:
                self.__devices__ = None
                if self.__last_warn_message__ != self.WARN_MSG_INIT_DEVICES:
                    logger.warning('%s Error while initialising the devices: %s', self.LOG_PREFIX, repr(self.__device_list__))
                    self.__last_warn_message__ = self.WARN_MSG_INIT_DEVICES
        else:
            # NOTE(review): select is called without timeout and blocks until a device is readable --
            # confirm that stopping the queue in close() does not depend on this task returning.
            r = select.select(self.__devices__, [], [])[0]
            for fd in r:
                try:
                    for event in self.__devices__[fd].read():
                        if event.type == evdev.ecodes.EV_KEY:
                            if self.__devices__[fd].fn in self.__device_list__:
                                self.__parse_ascii__(evdev.categorize(event), self.__devices__[fd])
                except IOError:
                    if self.__last_warn_message__ != self.WARN_MSG_RUNTIME:
                        logger.warning('%s Error reading from device: %s', self.LOG_PREFIX, repr(self.__devices__[fd].fn))
                    # Force a re-initialisation of the devices with the next execution
                    self.__devices__ = None
                    self.__last_warn_message__ = self.WARN_MSG_RUNTIME
                    # The device mapping is gone, so the remaining file descriptors can't be processed
                    break
            # enqueue required callbacks
            for device_name in self.__device_list__:
                if len(self.__rx_data__.get(device_name, [])) > 0:
                    self.__cb_queue__.enqueue(5, self.__callback_execution_queue__, device_name)
        #
        queue_inst.enqueue(5, self.__receive_task__)

    def __callback_execution_queue__(self, queue_inst, device_name):
        """
        Task to execute the registered callback for a device as long as data is available.

        :param queue_inst: The queue instance executing this task.
        :param device_name: The device to execute the callback for.
        """
        cb = self.__data_available_callback__.get(device_name)
        if cb is not None and len(self.__rx_data__.get(device_name, [])) > 0:
            logger.debug("%s Executing callback: %s(self, %s)", self.LOG_PREFIX, cb.__name__, repr(device_name))
            cb(self, device_name)
            # Data left (or received in the meantime): execute the callback again
            if len(self.__rx_data__.get(device_name, [])) > 0:
                queue_inst.enqueue(5, self.__callback_execution_queue__, device_name)

    def close(self):
        """
        This method closes the active keyboard channel, if channel exists.
        """
        # The queues don't exist, if __init__ failed early (close is also called by __del__)
        for queue_name in ('__rx_queue__', '__cb_queue__'):
            queue = getattr(self, queue_name, None)
            if queue is not None:
                queue.stop()
                queue.join()

    def __del__(self):
        self.close()


class keyboard_csp(keyboard):
    """
    Class to gather keystrokes for specific devices. All data till \\\\n will be collected to a frame.

    :param device_list: The device to listen to.
    :type device_list: str or list

    **Example:**

    .. literalinclude:: ../../keyboard/_examples_/keyboard_csp.py

    Will result to the following output (if user scans an rfid-tag and types "12345\\\\n"):

    .. literalinclude:: ../../keyboard/_examples_/keyboard_csp.log
    """
    LOG_PREFIX = 'KBD_CSP:'

    def __init_rx_buffer__(self):
        """Create an empty frame list and a csp instance for each device of the device list."""
        self.__rx_data__ = {}
        self.__csp__ = {}
        for device in self.__device_list__:
            if device is not None:
                self.__rx_data__[device] = []
                self.__csp__[device] = stringtools.csp.csp()

    def __append_rx_buffer__(self, device, c):
        """
        Feed a character to the csp instance of the device and store the completed frames.

        :param device: The device the character was received from (``device.fn`` is used as buffer key).
        :param c: The character(s) to process.
        :returns: True, if at least one frame was completed and stored.
        :rtype: bool
        """
        if sys.version_info >= (3, 0):
            content = self.__csp__[device.fn].process(bytes(c, 'ascii'))
        else:
            content = self.__csp__[device.fn].process(str(c))
        if len(content) > 0:
            for msg in content:
                logger.debug('%s RX[%s] <- "%s"', self.LOG_PREFIX, device.fn, stringtools.hexlify(msg))
            self.__rx_data__[device.fn] += content
            return True
        return False

    def receive(self, device_name, timeout=1):
        """
        Fetch one frame from the receive buffer of a device.

        :param device_name: The device to get the frame from.
        :type device_name: str
        :param timeout: The maximum time in seconds to wait for a frame.
        :type timeout: float
        :returns: The oldest frame (removed from the buffer) or None, if the timeout expired.
        """
        frames = keyboard.receive(self, device_name, timeout=timeout, num=1)
        # The base implementation returns None on timeout, which can't be indexed
        if frames is None:
            return None
        return frames[0]