linuxOS_AP05/debian/test/usr/lib/python3/dist-packages/Onboard/Scanner.py
2025-09-26 09:40:02 +08:00

1081 lines
31 KiB
Python

# -*- coding: utf-8 -*-
# Copyright © 2011-2013 Gerd Kohlberger <lowfi@chello.at>
# Copyright © 2012-2014, 2016 marmuta <marmvta@gmail.com>
#
# This file is part of Onboard.
#
# Onboard is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# Onboard is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division, print_function, unicode_literals
import sys
import logging
from functools import cmp_to_key
from Onboard.Config import Config
from Onboard.XInput import XIDeviceManager, XIEventType, XIEventMask
from Onboard.WindowUtils import show_new_device_dialog
from Onboard.Timer import Timer
logger = logging.getLogger(__name__)
config = Config()
"""
Methods and terminology from:
- Colven, Judge, 2006: Switch access to technology. A comprehensive guide.
- GOK: The GNOME On-screen Keyboard.
"""
class Chunker(object):
"""
Abstract base class for all chunker objects.
Organizes keys into groups and provides methods
to travers and highlight them.
Hierarchy:
Chunker --> FlatChunker --> GroupChunker
--> GridChunker
"""
def __init__(self):
logger.debug("Chunker.__init__()")
""" Hierarchy of keys (list). """
self._chunks = None
""" The index of the active chunk. """
self._index = 0
""" The number of chunks at the current level. """
self._length = 0
""" A stack of (index, len) tuples. """
self._path = []
""" Number of times the current level has been scanned. """
self.cycles = 0
def __del__(self):
logger.debug("Chunker.__del__()")
def chunk(self, layout, layer):
"""
Abstract: Split the keys on a layer into chunks.
"""
raise NotImplementedError()
def get_current_object(self):
"""
Get the list or key the chunker points to.
"""
level = self._chunks
for p in self._path:
index = p[0]
level = level[index]
return level[self._index]
def _highlight_rec(self, obj, hl, keys):
"""
Recursively sets the highlight on all keys below obj.
"""
if isinstance(obj, list):
for o in obj:
self._highlight_rec(o, hl, keys)
else:
if hl != obj.scanned:
obj.scanned = hl
keys.append(obj)
def highlight(self, hl, root=None):
"""
Highlight or clear the current chunk.
"""
keys = []
if not root:
root = self.get_current_object()
self._highlight_rec(root, hl, keys)
return keys
def highlight_all(self, hl):
"""
Highlight or clear all chunks.
"""
return self.highlight(hl, self._chunks)
def next(self):
"""
Move to the next chunk on the current level.
"""
next = (self._index + 1) % self._length
if next < self._index:
self.cycles += 1
self._index = next
def previous(self):
"""
Move to the previous chunk on the current level.
"""
prev = (self._index - 1) % self._length
if prev > self._index:
self.cycles += 1
self._index = prev
def can_ascend(self):
"""
Whether the chunker can move a level up in the hierarchy.
"""
return len(self._path) != 0
def ascend(self):
"""
Move one level up in the hierarchy.
"""
if self.can_ascend():
self._index, self._length = self._path.pop()
self.cycles = 0
return True
return False
def can_descend(self):
"""
Whether the chunker can move a level down in the hierarchy.
"""
return isinstance(self.get_current_object(), list)
def descend(self):
"""
Move one level down in the hierarchy.
- Skips levels that have only one element.
"""
obj = self.get_current_object()
while isinstance(obj, list):
self._path.append((self._index, self._length))
self._index = 0
self._length = len(obj)
self.cycles = 0
if self._length == 1:
obj = obj[0]
continue
return True
return False
def up(self):
"""
Abstract: Move to key above the current.
"""
raise NotImplementedError()
def down(self):
"""
Abstract: Move to key below the current.
"""
raise NotImplementedError()
def get_key(self):
"""
Get the current key.
Returns None if the object is a list.
"""
obj = self.get_current_object()
if not isinstance(obj, list):
return obj
return None
def reset(self):
"""
Set the chunker to its initial state.
"""
self.cycles = 0
self._index = 0
self._length = len(self._chunks)
self._path = []
def is_reset(self):
"""
Is the chunker in its initial state.
"""
return not self._index and \
not self.cycles and \
not len(self._path)
class FlatChunker(Chunker):
"""
Chunks a layer based on key location.
"""
def compare_keys(self, a, b):
"""
Sort keys by y and then x position
"""
rect_a = a.get_border_rect().int()
rect_b = b.get_border_rect().int()
y = rect_a.y - rect_b.y
if y != 0:
return y
return rect_a.x - rect_b.x
def chunk(self, layout, layer):
"""
Create a list of scannable keys and sort it.
"""
self._chunks = [k for k in layout.iter_layer_keys(layer) \
if k.is_path_scannable()]
self._chunks.extend([k for k in layout.iter_layer_keys(None) \
if k.is_path_scannable()])
self._chunks.sort(key=cmp_to_key(self.compare_keys))
self._length = len(self._chunks)
class GroupChunker(FlatChunker):
"""
Chunks a layer based on priority and key location.
"""
def compare_keys(self, a, b):
"""
Sort keys by priority and location.
"""
p = a.get_path_scan_priority() - b.get_path_scan_priority()
if p != 0:
return p
return super(GroupChunker, self).compare_keys(a, b)
def chunk(self, layout, layer):
"""
Create a nested list of keys.
"""
last_priority = None
last_y = None
chunks = []
# populates 'self._chunks' with a flat sorted list of keys
# using the compare_keys method of this class
super(GroupChunker, self).chunk(layout, layer)
# creates a new nested chunk list with the following layout:
# A list of 'priority groups' where each members is a
# list of 'scan rows' in which each member is a key.
for key in self._chunks:
scan_priority = key.get_path_scan_priority()
if scan_priority != last_priority:
last_priority = scan_priority
last_y = None
group = []
chunks.append(group)
rect = key.get_border_rect().int()
if rect.y != last_y:
last_y = rect.y
row = []
group.append(row)
row.append(key)
# if all keys are in the same group, remove the group
if len(chunks) == 1:
chunks = chunks[0]
self._chunks = chunks
self._length = len(self._chunks)
class GridChunker(FlatChunker):
"""
Chunks a layer into rows of keys.
"""
def chunk(self, layout, layer):
"""
Create a nested list of keys.
"""
last_x = sys.maxsize
chunks = []
# populates 'self._chunks' with a flat sorted list of keys
super(GridChunker, self).chunk(layout, layer)
for key in self._chunks:
rect = key.get_border_rect().int()
if rect.x < last_x:
row = []
chunks.append(row)
last_x = rect.x
row.append(key)
self._chunks = chunks
self._length = len(self._chunks)
def _select_neighbour(self, key, direction):
if key is None:
return
kc = key.get_border_rect().get_center()
min_x = sys.float_info.max
self.ascend()
direction()
for idx, obj in enumerate(self.get_current_object()):
oc = obj.get_border_rect().get_center()
dx = abs(kc[0] - oc[0])
if dx < min_x:
min_x = dx
neighbour = idx
self.descend()
self._index = neighbour
def up(self):
self._select_neighbour(self.get_key(), self.previous)
def down(self):
self._select_neighbour(self.get_key(), self.next)
class ScanMode(Timer):
"""
Abstract base class for all scanning modes.
Specifies how the scanner moves between chunks of keys
and when to activate them. Scan mode subclasses define
a set of actions they support and the base class translates
input device events into scan actions.
Hierarchy:
ScanMode --> AutoScan --> UserScan
--> OverScan
--> StepScan
--> DirectScan
"""
""" Scan actions """
ACTION_STEP = 0
ACTION_LEFT = 1
ACTION_RIGHT = 2
ACTION_UP = 3
ACTION_DOWN = 4
ACTION_ACTIVATE = 5
ACTION_STEP_START = 6
ACTION_STEP_STOP = 7
ACTION_UNHANDLED = 8
""" Time between key activation flashes (in sec) """
ACTIVATION_FLASH_INTERVAL = 0.1
""" Number of key activation flashes """
ACTIVATION_FLASH_COUNT = 4
def __init__(self, redraw_callback, activate_callback):
super(ScanMode, self).__init__()
logger.debug("ScanMode.__init__()")
""" Activation timer instance """
self._activation_timer = Timer()
""" Counter for key flash animation """
self._flash = 0
""" Callback for key redraws """
self._redraw_callback = redraw_callback
""" Callback for key activation """
self._activate_callback = activate_callback
""" A Chunker instance """
self.chunker = None
def __del__(self):
logger.debug("ScanMode.__del__()")
def map_actions(self, detail, pressed):
"""
Abstract: Convert input events into scan actions.
"""
raise NotImplementedError()
def do_action(self, action):
"""
Abstract: Handle scan actions.
"""
raise NotImplementedError()
def scan(self):
"""
Abstract: Move between chunks.
"""
raise NotImplementedError()
def create_chunker(self):
"""
Abstract: Create a chunker instance.
"""
raise NotImplementedError()
def init_position(self):
"""
Virtual: Called if a new layer was set or a key activated.
"""
pass
def handle_event(self, event):
"""
Translate device events into scan actions.
"""
# Ignore events during key activation
if self._activation_timer.is_running():
return
event_type = event.xi_type
if event_type == XIEventType.ButtonPress:
button_map = config.scanner.device_button_map
action = self.map_actions(button_map, event.button, True)
elif event_type == XIEventType.ButtonRelease:
button_map = config.scanner.device_button_map
action = self.map_actions(button_map, event.button, False)
elif event_type == XIEventType.KeyPress:
key_map = config.scanner.device_key_map
action = self.map_actions(key_map, event.keyval, True)
elif event_type == XIEventType.KeyRelease:
key_map = config.scanner.device_key_map
action = self.map_actions(key_map, event.keyval, False)
else:
action = self.ACTION_UNHANDLED
if action != self.ACTION_UNHANDLED:
self.do_action(action)
def on_timer(self):
"""
Override: Timer() callback.
"""
return self.scan()
def max_cycles_reached(self):
"""
Check if the maximum number of scan cycles is reached.
"""
return self.chunker.cycles >= config.scanner.cycles
def set_layer(self, layout, layer):
"""
Set the layer that should be scanned.
"""
self.reset()
self.chunker = self.create_chunker()
self.chunker.chunk(layout, layer)
self.init_position()
def _on_activation_timer(self, key):
"""
Timer callback: Flashes the key and finally activates it.
"""
if self._flash > 0:
key.scanned = not key.scanned
self._flash -= 1
self.redraw([key])
return True
else:
self._activate_callback(key)
self.init_position()
return False
def activate(self):
"""
Activates a key and triggers feedback.
"""
key = self.chunker.get_key()
if not key:
return
if config.scanner.feedback_flash:
self._flash = self.ACTIVATION_FLASH_COUNT
self._activation_timer.start(self.ACTIVATION_FLASH_INTERVAL,
self._on_activation_timer,
key)
else:
self._activate_callback(key)
self.init_position()
def reset(self):
"""
Stop scanning and clear all highlights.
"""
if self.is_running():
self.stop()
if self.chunker:
self.redraw(self.chunker.highlight_all(False))
def redraw(self, keys=None):
"""
Update individual keys or the entire keyboard.
"""
self._redraw_callback(keys)
def finalize(self):
"""
Clean up the ScanMode instance.
"""
self.reset()
self._activation_timer = None
class AutoScan(ScanMode):
"""
Automatic scan mode for 1 switch. Starts scanning on
switch press and moves through a hierarchy of chunks.
"""
def create_chunker(self):
return GroupChunker()
def map_actions(self, dev_map, detail, is_press):
if is_press and detail in dev_map:
return self.ACTION_STEP
return self.ACTION_UNHANDLED
def scan(self):
self.redraw(self.chunker.highlight(False))
self.chunker.next()
if self.max_cycles_reached():
self.chunker.reset()
return False
else:
self.redraw(self.chunker.highlight(True))
return True
def do_action(self, action):
if not self.is_running():
# Start scanning
self.redraw(self.chunker.highlight(True))
self.start(config.scanner.interval)
else:
# Subsequent clicks
self.stop()
self.redraw(self.chunker.highlight(False))
if self.chunker.descend():
# Move one level down
self.redraw(self.chunker.highlight(True))
self.start(config.scanner.interval)
else:
# Activate
self.activate()
self.chunker.reset()
class UserScan(AutoScan):
"""
Automatic scan mode for 1 switch. Like AutoScan but
the scanner progresses only during switch press.
"""
def map_actions(self, dev_map, detail, is_press):
if detail in dev_map:
if is_press:
return self.ACTION_STEP_START
else:
return self.ACTION_STEP_STOP
return self.ACTION_UNHANDLED
def do_action(self, action):
if action == self.ACTION_STEP_START:
if not self.chunker.is_reset():
# Every press except the initial
self.redraw(self.chunker.highlight(False))
self.chunker.descend()
self.redraw(self.chunker.highlight(True))
self.start(config.scanner.interval)
elif action == self.ACTION_STEP_STOP:
# Every release
self.stop()
if not self.chunker.can_descend():
# Activate
self.redraw(self.chunker.highlight(False))
self.activate()
self.chunker.reset()
class OverScan(AutoScan):
"""
Automatic scan mode for 1 switch. Does fast forward
scanning in a flat hierarchy with slow backtracking.
"""
def __init__(self, redraw_callback, activate_callback):
super(OverScan, self).__init__(redraw_callback, activate_callback)
self._step = -1
self._fast = True
def create_chunker(self):
return FlatChunker()
def scan(self):
self.redraw(self.chunker.highlight(False))
if self._step > 0:
# Backtrack
self.chunker.previous()
self._step -= 1
self.redraw(self.chunker.highlight(True))
else:
# Fast forward
self.chunker.next()
if self.max_cycles_reached():
# Abort
self.chunker.reset()
return False
self.redraw(self.chunker.highlight(True))
if not self._fast:
self.stop()
self.do_action(None)
return True
def do_action(self, action):
if not self.is_running():
# Start
self._fast = True
self._step = -1
self.redraw(self.chunker.highlight(True))
self.start(config.scanner.interval_fast)
else:
# Subsequent clicks
if self._step >= 0:
# Activate
self.stop()
self.redraw(self.chunker.highlight(False))
self.activate()
self.chunker.reset()
else:
# Backtrack
self._step = config.scanner.backtrack
self._fast = False
self.chunker.cycles = 0
self.start(config.scanner.interval)
class StepScan(ScanMode):
"""
Directed scan mode for 2 switches.
"""
def __init__(self, redraw_callback, activate_callback):
super(StepScan, self).__init__(redraw_callback, activate_callback)
self.swapped = False
def create_chunker(self):
return GroupChunker()
def init_position(self):
self.chunker.reset()
self.redraw(self.chunker.highlight(True))
def map_actions(self, dev_map, detail, is_press):
if is_press and detail in dev_map:
return dev_map[detail]
return self.ACTION_UNHANDLED
def get_alternate(self, action):
if config.scanner.alternate and self.swapped:
if action == self.ACTION_STEP:
return self.ACTION_ACTIVATE
else:
return self.ACTION_STEP
return action
def do_action(self, action):
if action == self.get_alternate(self.ACTION_STEP):
self.redraw(self.chunker.highlight(False))
self.chunker.next()
if self.max_cycles_reached():
self.init_position()
else:
self.redraw(self.chunker.highlight(True))
else:
self.redraw(self.chunker.highlight(False))
self.swapped = not self.swapped
if self.chunker.descend():
self.redraw(self.chunker.highlight(True))
else:
self.activate()
class DirectScan(ScanMode):
"""
Directed scan mode for 3 or 5 switches.
"""
def create_chunker(self):
return GridChunker()
def init_position(self):
self.chunker.descend()
self.redraw(self.chunker.highlight(True))
def map_actions(self, dev_map, detail, is_press):
if is_press and detail in dev_map:
return dev_map[detail]
return self.ACTION_UNHANDLED
def do_action(self, action):
keys = self.chunker.highlight(False)
if action == self.ACTION_LEFT:
self.chunker.previous()
elif action == self.ACTION_RIGHT:
self.chunker.next()
elif action == self.ACTION_UP:
self.chunker.up()
elif action == self.ACTION_DOWN:
self.chunker.down()
else:
self.activate()
keys.extend(self.chunker.highlight(True))
self.redraw(keys)
class Scanner(object):
"""
Main controller class for keyboard scanning. Manages
ScanMode and ScanDevices objects and provides the
public interface for the scanner.
"""
""" Scan modes """
MODE_AUTOSCAN = 0
MODE_OVERSCAN = 1
MODE_STEPSCAN = 2
MODE_DIRECTED3 = 3
MODE_DIRECTED5 = 4
def __init__(self, redraw_callback, activate_callback):
logger.debug("Scanner.__init__()")
""" A scan mode instance """
self.mode = self._get_scan_mode(config.scanner.mode,
redraw_callback,
activate_callback)
""" A scan device instance """
self.device = ScanDevice(self.mode.handle_event)
""" A keyboard layout """
self.layout = None
""" The active layer of the layout """
self.layer = None
config.scanner.mode_notify_add(self._mode_notify)
config.scanner.user_scan_notify_add(self._user_scan_notify)
def __del__(self):
logger.debug("Scanner.__del__()")
def _mode_notify(self, mode):
"""
Callback for scanner.mode configuration changes.
"""
rcb = self.mode._redraw_callback
acb = self.mode._activate_callback
self.mode.finalize()
self.mode = self._get_scan_mode(mode, rcb, acb)
self.mode.set_layer(self.layout, self.layer)
self.device._event_handler = self.mode.handle_event
def _user_scan_notify(self, user_scan):
"""
Callback for scanner.user_scan configuration changes.
"""
if config.scanner.mode == self.MODE_AUTOSCAN:
self._mode_notify(config.scanner.mode)
def _get_scan_mode(self, mode, redraw_callback, activate_callback):
"""
Get the ScanMode instance for the current profile.
"""
profiles = [ AutoScan, OverScan, StepScan, DirectScan ]
if mode == self.MODE_AUTOSCAN and config.scanner.user_scan:
return UserScan(redraw_callback, activate_callback)
return profiles[mode](redraw_callback, activate_callback)
def update_layer(self, layout, layer, force_update = False):
"""
Notify the scanner about layer or layout changes.
"""
changed = False
if self.layout != layout:
self.layout = layout
changed = True
if self.layer != layer:
self.layer = layer
changed = True
if changed or force_update:
self.mode.set_layer(self.layout, self.layer)
def finalize(self):
"""
Clean up all objects related to scanning.
"""
config.scanner.mode_notify_remove(self._mode_notify)
config.scanner.user_scan_notify_remove(self._user_scan_notify)
self.device.finalize()
self.mode.finalize()
class ScanDevice(object):
"""
Input device manager class.
Manages input devices on the system and deals with
PnP related event. The actual press/release events
are forwarded to a ScanMode instance.
"""
""" Default device name (virtual core pointer) """
DEFAULT_NAME = "Default"
""" Device id's of the primary masters """
DEFAULT_VCP_ID = 2
DEFAULT_VCK_ID = 3
""" Device name blacklist """
blacklist = ["Virtual core pointer",
"Virtual core keyboard",
"Virtual core XTEST pointer",
"Virtual core XTEST keyboard",
"Power Button"]
def __init__(self, event_handler):
logger.debug("ScanDevice.__init__()")
""" Selected device tuple (device id, master id) """
self._active_device_ids = None
""" Whether the active device is detached """
self._floating = False
""" Event handler for device events """
self._event_handler = event_handler
""" The manager for osk XInput devices """
self._device_manager = XIDeviceManager() # singleton
self._device_manager.connect("device-event", self._device_event_handler)
config.scanner.device_name_notify_add(self._device_name_notify)
config.scanner.device_detach_notify_add(self._device_detach_notify)
self._device_name_notify(config.scanner.device_name)
def __del__(self):
logger.debug("ScanDevice.__del__()")
def _device_event_handler(self, event):
"""
Handler for XI2 events.
"""
event_type = event.xi_type
device_id = event.device_id
if event_type == XIEventType.DeviceAdded:
device = self._device_manager.lookup_device_id(device_id)
show_new_device_dialog(device.name,
device.get_config_string(),
device.is_pointer(),
self._on_new_device_accepted)
elif event_type == XIEventType.DeviceRemoved:
# If we are currently using this device,
# close it and fall back to 'Default'
if self._active_device_ids and \
self._active_device_ids[0] == device_id:
self._active_device_ids = None
self._floating = False
config.scanner.device_detach = False
config.scanner.device_name = self.DEFAULT_NAME
else:
# Never handle VCK events.
if device_id != self.DEFAULT_VCK_ID:
# Forward VCP events only if 'Default' is selected.
# Else only handle devices we selected.
if (device_id == self.DEFAULT_VCP_ID and \
config.scanner.device_name == self.DEFAULT_NAME) or \
(self._active_device_ids and \
device_id == self._active_device_ids[0]):
self._event_handler(event)
def _on_new_device_accepted(self, config_string):
"""
Callback for the 'New device' dialog.
Called only if 'Use device' was chosen.
"""
config.scanner.device_name = config_string
config.scanner.device_detach = True
def _device_detach_notify(self, detach):
"""
Callback for the scanner.device_detach configuration changes.
"""
if self._active_device_ids is None:
return
if detach:
if not self._floating:
self.detach(self._active_device_ids[0])
else:
if self._floating:
self.attach(*self._active_device_ids)
def _device_name_notify(self, name):
"""
Callback for the scanner.device_name configuration changes.
"""
self.close()
if name == self.DEFAULT_NAME:
return
for device in self._device_manager.get_devices():
if self.is_useable(device) and \
name == device.get_config_string():
self.open(device)
break
if self._active_device_ids is None:
logger.debug("Unknown device-name in configuration.")
config.scanner.device_detach = False
config.scanner.device_name = self.DEFAULT_NAME
def open(self, device):
"""
Select for events and optionally detach the device.
"""
if device.is_pointer():
event_mask = XIEventMask.ButtonPressMask | \
XIEventMask.ButtonReleaseMask
else:
event_mask = XIEventMask.KeyPressMask | \
XIEventMask.KeyReleaseMask
try:
self._device_manager.select_events(None, device, event_mask)
self._active_device_ids = (device.id, device.attachment)
except Exception as ex:
logger.warning("Failed to open device {id}: {ex}"
.format(id = device.id, ex = ex))
if config.scanner.device_detach and not device.is_master():
self.detach(device.id)
def close(self):
"""
Stop using the current device.
"""
if self._floating:
self.attach(*self._active_device_ids)
if self._active_device_ids:
device = self._device_manager.lookup_device_id( \
self._active_device_ids[0])
try:
self._device_manager.unselect_events(None, device)
self._active_device_ids = None
except Exception as ex:
logger.warning("Failed to close device {id}: {ex}"
.format(id = self._active_device_ids[0],
ex = ex))
def attach(self, dev_id, master):
"""
Attach the device to a master.
"""
try:
self._device_manager.attach_device_id(dev_id, master)
self._floating = False
except:
logger.warning("Failed to attach device {id} to {master}"
.format(id = dev_id, master = master))
def detach(self, dev_id):
"""
Detach the device from its master.
"""
try:
self._device_manager.detach_device_id(dev_id)
self._floating = True
except:
logger.warning("Failed to detach device {id}".format(id = dev_id))
def finalize(self):
"""
Clean up the ScanDevice instance.
"""
self._device_manager.disconnect("device-event",
self._device_event_handler)
config.scanner.device_name_notify_remove(self._device_name_notify)
config.scanner.device_detach_notify_remove(self._device_detach_notify)
self.close()
self._event_handler = None
self.devices = None
@staticmethod
def is_useable(device):
"""
Check whether this device is useable for scanning.
"""
return device.name not in ScanDevice.blacklist \
and device.enabled \
and not device.is_floating()