linuxOS_AP05/debian/test/usr/lib/python3/dist-packages/Onboard/TextContext.py
2025-09-26 09:40:02 +08:00

704 lines
23 KiB
Python

# -*- coding: utf-8 -*-
# Copyright © 2012-2017 marmuta <marmvta@gmail.com>
#
# This file is part of Onboard.
#
# Onboard is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# Onboard is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division, print_function, unicode_literals
import unicodedata
import time
import logging
_logger = logging.getLogger(__name__)
from Onboard.Version import require_gi_versions
require_gi_versions()
# Atspi is treated as optional at import time: when the import fails the
# ImportError is ignored and the name Atspi is simply left undefined.
try:
    from gi.repository import Atspi
except ImportError as e:
    pass
from Onboard.AtspiStateTracker import AtspiStateTracker, AtspiStateType
from Onboard.TextDomain import TextDomains
from Onboard.TextChanges import TextChanges, TextSpan
from Onboard.utils import KeyCode, unicode_str
from Onboard.Timer import Timer
from Onboard import KeyCommon
### Config Singleton ###
from Onboard.Config import Config
config = Config()
class TextContext:
    """
    Keep track of the current text context and intercept typed key events.

    Abstract base class: cleanup() and reset() are no-ops, every other
    method raises NotImplementedError until a subclass overrides it.
    """

    def cleanup(self):
        """ Nothing to clean up in the base class. """
        pass

    def reset(self):
        """ Nothing to reset in the base class. """
        pass

    def can_insert_text(self):
        """ Can text be inserted directly? Must be overridden. """
        # Raise instead of returning the exception object: a returned
        # NotImplementedError instance is truthy and would read as "yes".
        raise NotImplementedError()

    def insert_text(self, offset, text):
        """ Insert text at offset. Must be overridden. """
        raise NotImplementedError()

    def insert_text_at_caret(self, text):
        """ Insert text at the caret position. Must be overridden. """
        raise NotImplementedError()

    def delete_text(self, offset, length=1):
        """ Delete length characters at offset. Must be overridden. """
        raise NotImplementedError()

    def delete_text_before_caret(self, length=1):
        """ Delete length characters before the caret. Must be overridden. """
        raise NotImplementedError()

    def get_context(self):
        """ Return the text before the caret. Must be overridden. """
        raise NotImplementedError()

    def get_line(self):
        """ Return the current line. Must be overridden. """
        raise NotImplementedError()

    def get_line_caret_pos(self):
        """ Return the caret position in the line. Must be overridden. """
        raise NotImplementedError()

    def get_changes(self):
        """ Return the recorded text changes. Must be overridden. """
        raise NotImplementedError()

    def clear_changes(self):
        """ Forget the recorded text changes. Must be overridden. """
        raise NotImplementedError()
class AtspiTextContext(TextContext):
    """
    Keep track of the current text context with AT-SPI

    Listens to the AtspiStateTracker events "text-entry-activated",
    "text-changed" and "text-caret-moved", records text changes and
    forwards notifications to the object passed in as wp.
    """

    # shared by all instances; created once when the class is defined
    _state_tracker = AtspiStateTracker()

    def __init__(self, wp):
        """
        wp: receiver of the on_text_*() notifications; it is also asked
            is_typing(), commit_changes() and discard_changes().
        """
        self._wp = wp
        self._accessible = None
        self._can_insert_text = False

        self._text_domains = TextDomains()
        self._text_domain = self._text_domains.get_nop_domain()

        self._changes = TextChanges()
        self._entering_text = False
        self._text_changed = False

        self._context = ""
        self._line = ""
        self._line_caret = 0
        self._selection_span = TextSpan()
        self._begin_of_text = False        # context starts at begin of text?
        self._begin_of_text_offset = None  # offset of text begin

        self._pending_separator_span = None
        self._last_text_change_time = 0
        self._last_caret_move_time = 0
        self._last_caret_move_position = 0

        self._last_context = None
        self._last_line = None

        self._update_context_timer = Timer()
        # delay handed to Timer.start(); presumably seconds - TODO confirm
        self._update_context_delay_normal = 0.01
        self._update_context_delay = self._update_context_delay_normal

    def cleanup(self):
        """ Disconnect from the state tracker events. """
        self._register_atspi_listeners(False)

    def enable(self, enable):
        """ Connect to (True) or disconnect from (False) tracker events. """
        self._register_atspi_listeners(enable)

    def get_text_domain(self):
        """ Return the text domain selected for the active accessible. """
        return self._text_domain

    def set_pending_separator(self, separator_span=None):
        """ Remember this separator span for later insertion. """
        if self._pending_separator_span is not separator_span:
            self._pending_separator_span = separator_span

    def get_pending_separator(self):
        """ Return current pending separator span or None """
        return self._pending_separator_span

    def get_context(self):
        """
        Returns the predictions context, i.e. some range of
        text before the caret position.
        """
        if self._accessible is None:
            return ""

        # Don't update suggestions in scrolling terminals
        if self._entering_text or \
           not self._text_changed or \
           self.can_suggest_before_typing():
            return self._context

        return ""

    def get_bot_context(self):
        """
        Returns the predictions context with
        begin of text marker (at text begin).
        """
        context = ""
        if self._accessible:
            context = self.get_context()

            # prepend domain specific begin-of-text marker
            if self._begin_of_text:
                marker = self.get_text_begin_marker()
                if marker:
                    context = marker + " " + context

        return context

    def get_pending_bot_context(self):
        """
        Context including bot marker and pending separator.
        """
        context = self.get_bot_context()
        if self._pending_separator_span is not None:
            context += self._pending_separator_span.get_span_text()
        return context

    def get_line(self):
        """ Return the last read line, "" without active accessible. """
        return self._line \
            if self._accessible else ""

    def get_line_caret_pos(self):
        """ Return the caret position in the line, 0 without accessible. """
        return self._line_caret \
            if self._accessible else 0

    def get_line_past_caret(self):
        """ Return the part of the line from the caret position onward. """
        return self._line[self._line_caret:] \
            if self._accessible else ""

    def get_selection_span(self):
        """ Return the selection span, None without active accessible. """
        return self._selection_span \
            if self._accessible else None

    def get_span_at_caret(self):
        """
        Return a zero-length copy of the selection span,
        None without active accessible.
        """
        if not self._accessible:
            return None

        span = self._selection_span.copy()
        span.length = 0
        return span

    def get_caret(self):
        """ Return the begin of the selection span, 0 without accessible. """
        return self._selection_span.begin() \
            if self._accessible else 0

    def get_character_extents(self, offset):
        """
        Return the accessible's character extents at offset,
        None without active accessible.
        """
        accessible = self._accessible
        if accessible:
            return accessible.get_character_extents(offset)
        else:
            return None

    def get_text_begin_marker(self):
        """ Return the domain's begin-of-text marker, "" without domain. """
        domain = self.get_text_domain()
        if domain:
            return domain.get_text_begin_marker()
        return ""

    def can_record_insertion(self, accessible, pos, length):
        """ Ask the domain if this insertion may be recorded. """
        domain = self.get_text_domain()
        if domain:
            return domain.can_record_insertion(accessible, pos, length)
        return True

    def can_suggest_before_typing(self):
        """ Ask the domain if suggestions are allowed before typing. """
        domain = self.get_text_domain()
        if domain:
            return domain.can_suggest_before_typing()
        return True

    def can_auto_punctuate(self):
        """ Ask the domain if auto-punctuation is allowed here. """
        domain = self.get_text_domain()
        if domain:
            return domain.can_auto_punctuate(self._begin_of_text)
        return False

    def get_begin_of_text_offset(self):
        """ Return the offset of the text begin, None without accessible. """
        return self._begin_of_text_offset \
            if self._accessible else None

    def get_changes(self):
        """ Return the TextChanges object collecting recorded changes. """
        return self._changes

    def has_changes(self):
        """ Are there any changes to learn? """
        return not self._changes.is_empty()

    def clear_changes(self):
        """ Forget all recorded changes. """
        self._changes.clear()

    def can_insert_text(self):
        """
        Can delete or insert text into the accessible?
        """
        # support for inserting is spotty: not in firefox, terminal
        return bool(self._accessible) and self._can_insert_text

    def delete_text(self, offset, length=1):
        """ Delete directly, without going through faking key presses. """
        self._accessible.delete_text(offset, offset + length)

    def delete_text_before_caret(self, length=1):
        """
        Delete directly, without going through faking key presses.

        Removes up to length characters that precede the caret. The
        request is clamped at the begin of the text, so a negative start
        offset is never handed to the accessible.
        """
        try:
            caret_offset = self._accessible.get_caret_offset()
        except Exception as ex:  # gi raises a private GError type here
            _logger.info("TextContext.delete_text_before_caret(): " +
                         unicode_str(ex))
            return

        # Never reach past the begin of the text.
        count = min(length, caret_offset)
        if count > 0:
            self.delete_text(caret_offset - count, count)

    def insert_text(self, offset, text):
        """
        Insert directly, without going through faking key presses.
        """
        self._accessible.insert_text(offset, text)

        # Move the caret after insertion if the accessible itself
        # hasn't done so already. This assumes the insertion begins at
        # the current caret position, which always happens to be the case
        # currently.
        # Only the nautilus rename text entry appears to need this.
        offset_before = offset
        try:
            offset_after = self._accessible.get_caret_offset()
        except Exception as ex:  # gi raises a private GError type here
            _logger.info("TextContext.insert_text(): " +
                         unicode_str(ex))
            return

        if text and offset_before == offset_after:
            self._accessible.set_caret_offset(offset_before + len(text))

    def insert_text_at_caret(self, text):
        """
        Insert directly, without going through faking key presses.
        Fails for terminal and firefox, unfortunately.
        """
        try:
            caret_offset = self._accessible.get_caret_offset()
        except Exception as ex:  # gi raises a private GError type here
            _logger.info("TextContext.insert_text_at_caret(): " +
                         unicode_str(ex))
            return

        self.insert_text(caret_offset, text)

    def _register_atspi_listeners(self, register=True):
        """ Connect (True) or disconnect (False) the tracker callbacks. """
        st = self._state_tracker
        if register:
            st.connect("text-entry-activated", self._on_text_entry_activated)
            st.connect("text-changed", self._on_text_changed)
            st.connect("text-caret-moved", self._on_text_caret_moved)
            # st.connect("key-pressed", self._on_atspi_key_pressed)
        else:
            st.disconnect("text-entry-activated", self._on_text_entry_activated)
            st.disconnect("text-changed", self._on_text_changed)
            st.disconnect("text-caret-moved", self._on_text_caret_moved)
            # st.disconnect("key-pressed", self._on_atspi_key_pressed)

    def get_accessible_capabilities(self, accessible):
        """
        Return True if text can be inserted into accessible directly,
        i.e. it lists the EditableText interface and is a gtk3 widget.
        """
        can_insert_text = False

        if accessible:
            # Can insert text via Atspi?
            # Advantages:
            # - faster, no individual key presses
            # - trouble-free insertion of all unicode characters
            if "EditableText" in accessible.get_interfaces():
                # Support for atspi text insertion is spotty.
                # Firefox, LibreOffice Writer, gnome-terminal don't support it,
                # even if they claim to implement the EditableText interface.

                # Allow direct text insertion for gtk widgets
                if accessible.is_toolkit_gtk3():
                    can_insert_text = True

        return can_insert_text

    def _on_text_entry_activated(self, accessible):
        """
        Tracker callback: a text entry got activated. accessible may be
        falsy, in which case an empty state is used for the domain lookup.
        """
        # old text_domain still valid here
        self._wp.on_text_entry_deactivated()

        # keep track of the active accessible asynchronously
        self._accessible = accessible
        self._entering_text = False
        self._text_changed = False

        # make sure state is filled with essential entries
        if accessible:
            accessible.get_role()
            accessible.get_attributes()
            accessible.get_interfaces()
            accessible.is_urlbar()
            state = accessible.get_state()
        else:
            state = {}

        # select text domain matching this accessible
        self._text_domain = self._text_domains.find_match(**state)
        self._text_domain.init_domain()

        # determine capabilities of this accessible
        self._can_insert_text = \
            self.get_accessible_capabilities(accessible)

        # log accessible info
        if _logger.isEnabledFor(_logger.LEVEL_ATSPI):
            log = _logger.atspi
            log("-" * 70)
            log("Accessible focused: ")
            indent = " " * 4
            if accessible:
                state = accessible.get_all_state()
                for key, value in sorted(state.items()):
                    msg = str(key) + "="
                    if key == "state-set":
                        msg += repr(AtspiStateType.to_strings(value))
                    elif hasattr(value, "value_name"):  # e.g. role
                        msg += value.value_name
                    else:
                        msg += repr(value)
                    log(indent + msg)
                log(indent + "text_domain: {}"
                    .format(self._text_domain and
                            type(self._text_domain).__name__))
                log(indent + "can_insert_text: {}"
                    .format(self._can_insert_text))
            else:
                log(indent + "None")

        self._update_context()

        self._wp.on_text_entry_activated()

    def _on_text_changed(self, event):
        """
        Tracker callback: text was inserted or deleted. Reads event.pos,
        event.length and event.insert.
        """
        insertion_span = self._record_text_change(event.pos,
                                                  event.length,
                                                  event.insert)
        # synchronously notify of text insertion
        if insertion_span:
            try:
                caret_offset = self._accessible.get_caret_offset()
            except Exception as ex:  # gi raises a private GError type here
                _logger.info("TextContext._on_text_changed(): " +
                             unicode_str(ex))
            else:
                self._wp.on_text_inserted(insertion_span, caret_offset)

        self._last_text_change_time = time.time()
        self._update_context()

    def _on_text_caret_moved(self, event):
        """ Tracker callback: the caret moved to event.caret. """
        self._last_caret_move_time = time.time()
        self._last_caret_move_position = event.caret
        self._update_context()
        self._wp.on_text_caret_moved()

    def _on_atspi_key_pressed(self, event):
        """ disabled, Francesco didn't receive any AT-SPI key-strokes. """
        # keycode = event.hw_code  # uh oh, only keycodes...
        #                          # hopefully "c" doesn't move around a lot.
        # modifiers = event.modifiers
        # self._handle_key_press(keycode, modifiers)

    def on_onboard_typing(self, key, mod_mask):
        """
        Called for keys typed with Onboard. Only text-changing keys are
        passed on; Return and the letter "c" get a keycode, the rest 0.
        """
        if key.is_text_changing():
            keycode = 0
            if key.is_return():
                keycode = KeyCode.KP_Enter
            else:
                label = key.get_label()
                if label == "C" or label == "c":
                    keycode = KeyCode.C

            self._handle_key_press(keycode, mod_mask)

    def _handle_key_press(self, keycode, modifiers):
        """
        Let the text domain judge the key press, then commit or discard
        the recorded changes if it reports the end of editing.
        """
        if self._accessible:
            domain = self.get_text_domain()
            if domain:
                self._entering_text, end_of_editing = \
                    domain.handle_key_press(keycode, modifiers)

                # tri-state: True commits, False discards,
                # anything else leaves the changes alone
                if end_of_editing is True:
                    self._wp.commit_changes()
                elif end_of_editing is False:
                    self._wp.discard_changes()

    def _record_text_change(self, pos, length, insert):
        """
        Record an insertion (insert is true) or deletion of length
        characters at pos and refresh the text of all affected spans.

        Returns a TextSpan for a recorded insertion, else None.
        """
        accessible = self._accessible

        insertion_span = None
        char_count = None
        if accessible:
            try:
                char_count = accessible.get_character_count()
            except Exception:  # gi._glib.GError: The application no longer
                               # exists when closing a tab in gnome-terminal.
                char_count = None

        if char_count is not None:
            # record the change
            spans_to_update = []

            if insert:
                if self._entering_text and \
                   self.can_record_insertion(accessible, pos, length):
                    if self._wp.is_typing() or length < 30:
                        # Remember all of the insertion, might have been
                        # a pressed snippet or wordlist button.
                        include_length = -1
                    else:
                        # Remember only the first few characters.
                        # Large inserts can be paste, reload or scroll
                        # operations. Only learn the first word of these.
                        include_length = 2

                    # simple span for current insertion
                    begin = max(pos - 100, 0)
                    end = min(pos + length + 100, char_count)
                    try:
                        text = accessible.get_text(begin, end)
                    except Exception as ex:
                        _logger.info("TextContext._record_text_change() 1: " +
                                     unicode_str(ex))
                    else:
                        insertion_span = TextSpan(pos, length, text, begin)
                else:
                    # Remember nothing, just update existing spans.
                    include_length = None

                spans_to_update = self._changes.insert(pos, length,
                                                       include_length)
            else:
                spans_to_update = self._changes.delete(pos, length,
                                                       self._entering_text)

            # update text of all modified spans
            for span in spans_to_update:
                # Get some more text around the span to hopefully
                # include whole words at beginning and end.
                begin = max(span.begin() - 100, 0)
                end = min(span.end() + 100, char_count)
                try:
                    span.text = accessible.get_text(begin, end)
                except Exception as ex:
                    _logger.info("TextContext._record_text_change() 2: " +
                                 unicode_str(ex))
                    span.text = ""

                span.text_pos = begin

        self._text_changed = True

        return insertion_span

    def set_update_context_delay(self, delay):
        """ Set the delay used for the next context updates. """
        self._update_context_delay = delay

    def reset_update_context_delay(self):
        """ Restore the normal context update delay. """
        self._update_context_delay = self._update_context_delay_normal

    def _update_context(self):
        """ Schedule on_text_context_changed() after the current delay. """
        self._update_context_timer.start(self._update_context_delay,
                                         self.on_text_context_changed)

    def on_text_context_changed(self):
        """
        Timer callback: re-read context, line and selection from the text
        domain and notify wp, telling it whether context or line changed.
        """
        # Clear pending separator when the user clicked to move
        # the cursor away from the separator position.
        if self._pending_separator_span:
            # Lone caret movement, no recent text change?
            if self._last_caret_move_time - self._last_text_change_time > 1.0:
                # Away from the separator?
                if self._last_caret_move_position != \
                   self._pending_separator_span.begin():
                    self.set_pending_separator(None)

        result = self._text_domain.read_context(self._wp, self._accessible)
        if result is not None:
            (self._context,
             self._line,
             self._line_caret,
             self._selection_span,
             self._begin_of_text,
             self._begin_of_text_offset) = result

            # make sure to include bot-markers and pending separator
            context = self.get_pending_bot_context()
            change_detected = (self._last_context != context or
                               self._last_line != self._line)
            if change_detected:
                self._last_context = context
                self._last_line = self._line

            self._wp.on_text_context_changed(change_detected)

        # presumably tells the Timer not to run again - TODO confirm
        return False
class InputLine(TextContext):
    """
    Track key presses ourselves.

    Advantage: Doesn't require AT-SPI
    Problems:  Misses key repeats,
               Doesn't know about keymap translations before events are
               delivered to their destination, i.e records wrong key
               strokes when changing keymaps.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """ Forget the tracked line and mark the state valid again. """
        self.line = ""
        self.caret = 0
        self.valid = True

        self.word_infos = {}

    def is_valid(self):
        """ False once the caret was moved outside the known line. """
        return self.valid

    def is_empty(self):
        """ True if no text has been tracked. """
        return len(self.line) == 0

    def insert(self, s):
        """ Insert s at the caret and move the caret behind it. """
        self.line = self.line[:self.caret] + s + self.line[self.caret:]
        self.move_caret(len(s))

    def delete_left(self, n=1):  # backspace
        """
        Delete up to n characters before the caret.

        Deleting more than there is leaves the text behind the caret
        untouched; move_caret() then marks the line as invalid.
        """
        # Clamp at the line start. A negative slice bound would count
        # from the end of the line and duplicate text instead.
        begin = max(self.caret - n, 0)
        self.line = self.line[:begin] + self.line[self.caret:]
        self.move_caret(-n)

    def delete_right(self, n=1):  # delete
        """ Delete up to n characters behind the caret. """
        self.line = self.line[:self.caret] + self.line[self.caret + n:]

    def move_caret(self, n):
        """ Move the caret by n, clamped to the bounds of the line. """
        self.caret += n

        # moving into unknown territory -> suggest reset
        if self.caret < 0:
            self.caret = 0
            self.valid = False
        if self.caret > len(self.line):
            self.caret = len(self.line)
            self.valid = False

    def get_context(self):
        """ Return the text before the caret. """
        return self.line[:self.caret]

    def get_line(self):
        """ Return the whole tracked line. """
        return self.line

    def get_line_caret_pos(self):
        """ Return the caret position in the line. """
        return self.caret

    @staticmethod
    def is_printable(char):
        """
        True for printable keys including whitespace as defined for isprint().

        Returns False for anything that isn't exactly one character,
        e.g. the empty string of keys without a single-character label.
        """
        if char == "\t":
            return True
        # unicodedata.category() raises TypeError for strings that
        # aren't exactly one character long.
        if len(char) != 1:
            return False
        return unicodedata.category(char) not in ('Cc', 'Cf', 'Cs', 'Co',
                                                  'Cn', 'Zl', 'Zp')

    def track_sent_key(self, key, mods):
        """
        Sync input_line with single key presses.
        WORD_ACTION and MACRO_ACTION do this in press_key_string.

        Returns True if the key press ends editing of the current line.
        """
        end_editing = False

        if config.wp.stealth_mode:
            return True

        key_id = key.id.upper()
        char = key.get_label()
        if char is None or len(char) > 1:
            char = ""

        if key.action_type == KeyCommon.WORD_ACTION:
            pass  # don't reset input on word insertion

        elif key.action_type == KeyCommon.MODIFIER_ACTION:
            pass  # simply pressing a modifier shouldn't stop the word

        elif key.action_type == KeyCommon.BUTTON_ACTION:
            pass

        elif key.action_type == KeyCommon.KEYSYM_ACTION:
            if key_id == 'ESC':
                self.reset()
            end_editing = True

        elif key.action_type == KeyCommon.KEYPRESS_NAME_ACTION:
            if key_id == 'DELE':
                self.delete_right()
            elif key_id == 'LEFT':
                self.move_caret(-1)
            elif key_id == 'RGHT':
                self.move_caret(1)
            else:
                end_editing = True

        elif key.action_type == KeyCommon.KEYCODE_ACTION:
            if key_id == 'RTRN':
                char = "\n"
            elif key_id == 'SPCE':
                char = " "
            elif key_id == 'TAB':
                char = "\t"

            if key_id == 'BKSP':
                self.delete_left()
            elif self.is_printable(char):
                if mods[4]:  # ctrl+key press?
                    end_editing = True
                else:
                    self.insert(char)
            else:
                end_editing = True
        else:
            end_editing = True

        if not self.is_valid():  # caret moved outside known range?
            end_editing = True

        return end_editing