# linuxOS_AP05/debian/test/usr/lib/python3/dist-packages/Onboard/WPEngine.py
# -*- coding: utf-8 -*-
# Copyright © 2013-2017 marmuta <marmvta@gmail.com>
#
# This file is part of Onboard.
#
# Onboard is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# Onboard is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division, print_function, unicode_literals
import os
import time
import logging
from Onboard.utils import unicode_str, XDGDirs
from Onboard.Timer import Timer
from Onboard.Config import Config
import Onboard.pypredict as pypredict
config = Config()
_logger = logging.getLogger(__name__)
class WPLocalEngine(object):
"""
Singleton class for low-level word prediction, local in-process engine.
"""
def __new__(cls, *args, **kwargs):
"""
Singleton magic.
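        Repeated construction returns the same instance:
        >>> WPLocalEngine() is WPLocalEngine()
        True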
"""
        if not hasattr(cls, "self"):
            # object.__new__() rejects extra arguments when __new__ is
            # overridden; construct() takes none anyway.
            cls.self = object.__new__(cls)
cls.self.construct()
return cls.self
def __init__(self):
"""
Called multiple times, do not use.
"""
pass
def construct(self):
"""
Singleton constructor, runs only once.
"""
self._model_cache = ModelCache()
self._auto_save_timer = AutoSaveTimer(self._model_cache)
self.models = []
self.persistent_models = []
self.auto_learn_models = []
self.scratch_models = []
def cleanup(self):
self._auto_save_timer.stop()
self._model_cache.save_models()
def set_models(self, persistent_models, auto_learn_models, scratch_models):
""" Fixme: rename to "set_model_ids" """
self.models = persistent_models + scratch_models
self.persistent_models = persistent_models
        self.auto_learn_models = auto_learn_models
self.scratch_models = scratch_models
def load_models(self):
"""
Pre-load models set with set_models. If this isn't called,
language models are lazy-loaded on demand.
"""
self._model_cache.get_models(self.models)
def postpone_autosave(self):
self._auto_save_timer.postpone()
def pause_autosave(self):
        # Pause for one minute at most, because resume_autosave isn't
        # reliably called, e.g. when dragging and leaving the window.
self._auto_save_timer.pause(60)
def resume_autosave(self):
self._auto_save_timer.resume()
def predict(self, context_line, limit=20,
case_insensitive=False,
case_insensitive_smart=False,
accent_insensitive=False,
accent_insensitive_smart=False,
ignore_capitalized=False,
ignore_non_capitalized=False):
""" Find completion/prediction choices. """
LanguageModel = pypredict.LanguageModel
options = 0
if case_insensitive:
options |= LanguageModel.CASE_INSENSITIVE
if case_insensitive_smart:
options |= LanguageModel.CASE_INSENSITIVE_SMART
if accent_insensitive:
options |= LanguageModel.ACCENT_INSENSITIVE
if accent_insensitive_smart:
options |= LanguageModel.ACCENT_INSENSITIVE_SMART
if ignore_capitalized:
options |= LanguageModel.IGNORE_CAPITALIZED
if ignore_non_capitalized:
options |= LanguageModel.IGNORE_NON_CAPITALIZED
context, spans = pypredict.tokenize_context(context_line)
choices = self._get_prediction(self.models, context, limit, options)
_logger.debug("context=" + repr(context))
_logger.debug("choices=" + repr(choices[:5]))
return [x[0] for x in choices]
def learn_text(self, text, allow_new_words):
""" Count n-grams and add words to the auto-learn models. """
if self.auto_learn_models:
tokens, spans = pypredict.tokenize_text(text)
            # There are too many false positives with trailing
            # single quotes; remove them.
            # Do this here, rather than in the tokenizer, because we
            # still want words like "it's" to provide completions
            # while they are being typed.
for i, token in enumerate(tokens):
if token.endswith("'"):
token = token[:-1]
if not token: # shouldn't happen
token = "<unk>"
tokens[i] = token
# if requested, drop unknown words
if allow_new_words:
token_sections = [tokens]
else:
token_sections = self._drop_new_words(tokens, spans,
self.persistent_models)
models = self._model_cache.get_models(self.auto_learn_models)
for model in models:
for tokens in token_sections:
model.learn_tokens(tokens)
_logger.info("learn_text: tokens=" + repr(token_sections))
# debug: save all learned text for later parameter optimization
if config.log_learn:
fn = os.path.join(config.user_dir, "learned_text.txt")
with open(fn, "a") as f:
f.write(text + "\n")
def _drop_new_words(self, tokens, spans, lmids):
""" Remove tokens that don't already exist in any active model. """
tokspans, counts = self.lookup_tokens(tokens, spans, lmids)
split_indices = [i for i, model_counts in enumerate(counts)
if all(n != 1 for n in model_counts)]
return pypredict.split_tokens_at(tokens, split_indices)
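    # Illustrative sketch (hypothetical tokens): with
    # tokens = ["foo", "xyzzy", "bar"] and "xyzzy" unknown to every
    # persistent model, split_indices becomes [1] and the tokens are
    # split into sections roughly like [["foo"], ["bar"]], so the
    # unknown word is never passed to learn_tokens().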
def learn_scratch_text(self, text):
""" Count n-grams and add words to the scratch models. """
tokens, spans = pypredict.tokenize_text(text)
models = self._model_cache.get_models(self.scratch_models)
for model in models:
# print("scratch learn", model, tokens)
model.learn_tokens(tokens, True)
def clear_scratch_models(self):
""" Count n-grams and add words to the scratch models. """
models = self._model_cache.get_models(self.scratch_models)
for model in models:
model.clear()
def lookup_text(self, text, lmids):
"""
Split <text> into tokens and lookup the individual tokens in each
of the given language models. See lookup_tokens() for more information.
"""
tokens, spans = pypredict.tokenize_sentence(text)
return self.lookup_tokens(tokens, spans, lmids)
def lookup_tokens(self, tokens, spans, lmids):
"""
Lookup the individual tokens in each of the given language models.
This method is meant to be a basis for highlighting (partially)
unknown words in a display for recently typed text.
        The return value is a tuple of two arrays: first an array of
        tuples (start, end, token), one per token, with start and end
        indices pointing into <text>, and second a two-dimensional
        array of lookup results, one per token and language model.
        Each lookup result is either 0 for no match, 1 for an exact
        match, or -n for n partial (prefix) matches.
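        Illustrative shape (hypothetical counts for two models A, B):
            tokspans = [(0, 4, "helo"), (5, 10, "world")]
            counts   = [[0, -2],  # "helo": no match in A,
                                  # two prefix matches in B
                        [1,  1]]  # "world": exact match in both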
"""
tokspans = [(spans[i][0], spans[i][1], t)
for i, t in enumerate(tokens)]
counts = [[0 for lmid in lmids] for t in tokspans]
for i, lmid in enumerate(lmids):
model = self._model_cache.get_model(lmid)
if model:
for j, t in enumerate(tokspans):
counts[j][i] = model.lookup_word(t[2])
_logger.debug("lookup_tokens: tokens=%s counts=%s" %
(repr(tokens), repr(counts)))
# Counts are 0 for no match, 1 for exact match or
# -n for partial matches
        return tokspans, counts
def word_exists(self, word):
"""
Does word exist in any of the non-scratch models?
"""
exists = False
lmids = self.persistent_models
for i, lmid in enumerate(lmids):
model = self._model_cache.get_model(lmid)
if model:
count = model.lookup_word(word)
if count > 0:
exists = True
break
return exists
def tokenize_text(self, text):
"""
Let the service find the words in text.
"""
tokens, spans = pypredict.tokenize_text(text)
return tokens, spans
def tokenize_text_pythonic(self, text):
"""
Let the service find the words in text.
Return python types instead of dbus.Array/String/... .
Doctests:
        # whitespace has to be respected in spans
>>> p = WPLocalEngine()
>>> p.tokenize_text_pythonic("abc def")
(['abc', 'def'], [[0, 3], [5, 8]])
"""
return self.tokenize_text(text)
def tokenize_context(self, text):
""" let the service find the words in text """
return pypredict.tokenize_context(text)
def get_model_names(self, _class):
""" Return the names of the available models. """
names = self._model_cache.find_available_model_names(_class)
return names
def get_last_context_fragment(self, text):
"""
Return the very last (partial) word in text.
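        E.g. for text ending in "but on-" the fragment is "on-";
        unlike the bare token, it keeps the trailing dash.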
"""
text = text[-1024:]
tokens, spans = self.tokenize_context(text)
if len(spans):
            # Don't return the token itself, as it wouldn't include
            # trailing dashes. Return the text from the token's start
            # to the very end instead.
begin = spans[-1][0]
return text[begin:]
else:
return ""
def _get_prediction(self, lmdesc, context, limit, options):
lmids, weights = self._model_cache.parse_lmdesc(lmdesc)
models = self._model_cache.get_models(lmids)
for m in models:
            # Kneser-Ney performs best in entropy and keystroke-saving
            # (ksr) measures, but failed in practice for anything but
            # natural language, e.g. shell commands.
            # -> use the second best available: absolute discounting
# m.smoothing = "kneser-ney"
m.smoothing = "abs-disc"
# setup recency caching
if hasattr(m, "recency_ratio"):
# Values found with
# $ pypredict/optimize caching models/en.lm learned_text.txt
# based on multilingual text actually typed (--log-learning)
# with onboard over ~3 months.
# How valid those settings are under different conditions
# remains to be seen, but for now this is the best I have.
m.recency_ratio = 0.811
m.recency_halflife = 96
m.recency_smoothing = "jelinek-mercer"
m.recency_lambdas = [0.404, 0.831, 0.444]
model = pypredict.overlay(models)
# model = pypredict.linint(models, weights)
# model = pypredict.loglinint(models, weights)
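        # Note: with overlay() the weights parsed from lmdesc go
        # unused; they would only matter for the interpolating
        # combiners commented out above.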
choices = model.predictp(context, limit, options=options)
return choices
def remove_context(self, context):
"""
        Remove the last word of the given context from the auto-learn
        models. If len(context) == 1, all occurrences of the word are
        removed.
"""
lmids, weights = self._model_cache.parse_lmdesc(self.auto_learn_models)
models = self._model_cache.get_models(lmids)
for i, m in enumerate(models):
changes = m.remove_context(context)
# debug output
_logger.debug("removing {} from '{}': {} n-grams affected"
.format(context, lmids[i], len(changes)))
if _logger.isEnabledFor(logging.DEBUG):
changes = sorted(sorted(changes.items()),
key=lambda x: -len(x[0]))
for ng in changes:
_logger.debug(" remove: {}, count {}"
.format(ng[0], ng[1]))
class ModelCache:
""" Loads and caches language models """
def __init__(self):
self._language_models = {}
def clear(self):
self._language_models = {}
def get_models(self, lmids):
models = []
for lmid in lmids:
model = self.get_model(lmid)
if model:
models.append(model)
return models
def get_model(self, lmid):
""" get language model from cache or load it from disk"""
lmid = self.canonicalize_lmid(lmid)
if lmid in self._language_models:
model = self._language_models[lmid]
else:
model = self.load_model(lmid)
if model:
self._language_models[lmid] = model
return model
def find_available_model_names(self, _class):
names = []
models = self._find_models(_class)
for model in models:
name = os.path.basename(model)
name, ext = os.path.splitext(name)
names.append(name)
return names
@staticmethod
def _find_models(_class):
models = []
if _class == "system":
path = config.get_system_model_dir()
else:
path = config.get_user_model_dir()
try:
files = os.listdir(path)
extension = "lm"
for filename in files:
if filename.endswith("." + extension):
models.append(os.path.join(path, filename))
except OSError as e:
_logger.warning("Failed to find language models in '{}': {} ({})"
.format(path, os.strerror(e.errno), e.errno))
return models
@staticmethod
def parse_lmdesc(lmdesc):
"""
Extract language model ids and interpolation weights from
the language model description.
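        Weights are optional and default to 1.0:
        >>> ModelCache.parse_lmdesc(["lm:system:en,0.5", "lm:user:en"])
        (['lm:system:en', 'lm:user:en'], [0.5, 1.0])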
"""
lmids = []
weights = []
for entry in lmdesc:
fields = entry.split(",")
lmids.append(fields[0])
weight = 1.0
if len(fields) >= 2: # weight is optional
try:
weight = float(fields[1])
                except ValueError:
pass
weights.append(weight)
return lmids, weights
@staticmethod
def canonicalize_lmid(lmid):
"""
Fully qualifies and unifies language model ids.
Fills in missing fields with default values.
The result is of the format "type:class:name".
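        Missing fields are filled in from the defaults below:
        >>> ModelCache.canonicalize_lmid("lm:user:en_US")
        'lm:user:en_US'
        >>> ModelCache.canonicalize_lmid("lm:user")
        'lm:user:en'
        >>> ModelCache.canonicalize_lmid("lm")
        'lm:system:en'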
"""
# default values
result = ["lm", "system", "en"]
for i, field in enumerate(lmid.split(":")[:3]):
result[i] = field
return ":".join(result)
@staticmethod
def split_lmid(lmid):
lmid = ModelCache.canonicalize_lmid(lmid)
return lmid.split(":")
@staticmethod
def is_user_lmid(lmid):
type_, class_, name = ModelCache.split_lmid(lmid)
return class_ == "user"
def load_model(self, lmid):
type_, class_, name = lmid.split(":")
filename = self.get_filename(lmid)
if type_ == "lm":
if class_ == "system":
if pypredict.read_order(filename) == 1:
model = pypredict.UnigramModel()
else:
model = pypredict.DynamicModel()
elif class_ == "user":
model = pypredict.CachedDynamicModel()
elif class_ == "mem":
model = pypredict.DynamicModel()
else:
_logger.error("Unknown class component '{}' in lmid '{}'"
.format(class_, lmid))
return None
else:
_logger.error("Unknown type component '{}' in lmid '{}'"
.format(type_, lmid))
return None
if filename:
self.do_load_model(model, filename, class_)
return model
@staticmethod
def do_load_model(model, filename, class_):
_logger.info("Loading language model '{}'.".format(filename))
if not os.path.exists(filename):
if class_ == "system":
_logger.warning("System language model '{}' "
"doesn't exist, skipping."
.format(filename))
else:
try:
model.load(filename)
except IOError as ex:
if ex.errno is not None: # not n-gram count mismatch
errno = ex.errno
errstr = os.strerror(errno)
msg = _format(
"Failed to load language model '{}': {} ({})",
filename, errstr, errno)
else:
msg = unicode_str(ex)
_logger.error(msg)
model.load_error_msg = msg
if class_ == "user":
_logger.error("Saving word suggestions disabled "
"to prevent further data loss.")
def save_models(self):
for lmid, model in list(self._language_models.items()):
if self.can_save(lmid):
self.save_model(model, lmid)
@staticmethod
def can_save(lmid):
type_, class_, name = lmid.split(":")
return class_ == "user"
def save_model(self, model, lmid):
type_, class_, name = lmid.split(":")
filename = self.get_filename(lmid)
backup_filename = self.get_backup_filename(filename)
        if filename and model.modified:
if model.load_error:
_logger.warning("Not saving modified language model '{}' "
"due to previous error on load."
.format(filename))
else:
_logger.info("Saving language model '{}'".format(filename))
                try:
                    # make sure the target directory exists
                    path = os.path.dirname(filename)
                    XDGDirs.assure_user_dir_exists(path)
                    # save to a temporary file first
                    basename, ext = os.path.splitext(filename)
                    tempfile = basename + ".tmp"
                    model.save(tempfile)
                    # then move it into place, keeping the previous
                    # file as a backup
                    if os.path.exists(filename):
                        os.rename(filename, backup_filename)
                    os.rename(tempfile, filename)
                    model.modified = False
except (IOError, OSError) as e:
_logger.warning(
"Failed to save language model '{}': {} ({})"
.format(filename, os.strerror(e.errno), e.errno))
@staticmethod
def get_filename(lmid):
type_, class_, name = lmid.split(":")
if class_ == "mem":
filename = ""
else:
if class_ == "system":
path = config.get_system_model_dir()
else: # if class_ == "user":
path = config.get_user_model_dir()
ext = type_
filename = os.path.join(path, name + "." + ext)
return filename
@staticmethod
def get_backup_filename(filename):
return filename + ".bak"
@staticmethod
def get_broken_filename(filename):
"""
Filename broken files are renamed to.
Doctests:
>>> import tempfile
>>> import subprocess
>>> from os.path import basename
>>> td = tempfile.TemporaryDirectory(prefix="test_onboard_")
>>> dir = td.name
>>> fn = os.path.join(dir, "en_US.lm")
>>>
>>> def test(fn):
... bfn = ModelCache.get_broken_filename(fn)
... print(repr(basename(bfn)))
... _ignore = subprocess.call(["touch", bfn])
>>> test(fn) # doctest: +ELLIPSIS
'en_US.lm.broken-..._001'
>>> test(fn) # doctest: +ELLIPSIS
'en_US.lm.broken-..._002'
>>> test(fn) # doctest: +ELLIPSIS
'en_US.lm.broken-..._003'
"""
count = 1
while True:
fn = "{}.broken-{}_{:03}".format(filename,
time.strftime("%Y-%m-%d"),
count)
if not os.path.exists(fn):
break
count += 1
return fn
class AutoSaveTimer(Timer):
""" Auto-save modified language models periodically """
    def __init__(self, model_cache,
interval_min=10 * 60,
interval_max=30 * 60,
postpone_delay=10):
        self._model_cache = model_cache
self._interval_min = interval_min # in seconds
self._interval_max = interval_max # in seconds
self._postpone_delay = postpone_delay
self._interval = self._interval_min # in seconds
self._last_save_time = time.time()
self._pause = 0
self._timer_interval = 5
self.start(self._timer_interval, self._on_timer)
def pause(self, duration=None):
"""
No auto-saving while paused, e.g. during key-press.
"""
self._pause = duration
def resume(self):
"""
Allow auto-saving again.
"""
self._pause = 0
def postpone(self):
"""
Postpone saving a little while the user is still typing.
Helps to mask the delay when saving large models, during which
Onboard briefly becomes unresponsive.
"""
elapsed = time.time() - self._last_save_time
if self._interval < elapsed + self._postpone_delay:
self._interval = elapsed + self._postpone_delay
if self._interval > self._interval_max:
self._interval = self._interval_max
_logger.debug("postponing autosave: current interval {}, "
"elapsed since last save {}"
.format(self._interval, elapsed))
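    # Worked example with the defaults above (interval_min=600 s,
    # postpone_delay=10 s): if 595 s have elapsed since the last save,
    # postpone() raises the interval to 605 s, pushing the save back
    # to ~10 s from now. Repeated calls can push it no further than
    # interval_max (1800 s).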
def _on_timer(self):
now = time.time()
elapsed = now - self._last_save_time
        if self._pause == 0 and self._interval < elapsed:
            _logger.debug("auto-saving language models; "
                          "interval {}, elapsed time {}"
                          .format(self._interval, elapsed))
            self._last_save_time = now
            self._interval = self._interval_min
            self._model_cache.save_models()
if self._pause:
self._pause = max(0, self._pause - self._timer_interval)
return True # run again
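# Illustrative usage sketch (comment only, not executed by Onboard).
# The model ids are examples; which ids are valid depends on the
# installed models, and the predictions depend on their contents.
#
#   engine = WPLocalEngine()
#   engine.set_models(persistent_models=["lm:system:en"],
#                     auto_learn_models=["lm:user:en"],
#                     scratch_models=["lm:mem"])
#   engine.load_models()
#   choices = engine.predict("Good mor", limit=5)
#   engine.learn_text("Good morning!", allow_new_words=True)
#   engine.cleanup()   # stop auto-save and write modified user models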