Source code for msl.qt.threading

"""
Base classes for starting a process in a new :class:`~QtCore.QThread`.
"""
import sys
import traceback as tb

from . import QtCore
from . import Signal
from . import prompt


class Worker(QtCore.QObject):

    # Emitted after `process` returns without raising.
    finished = Signal()

    # Emitted when `process` raises; carries (exception, traceback).
    error = Signal(BaseException, object)

    def __init__(self, *args, **kwargs):
        """Run an expensive or blocking operation outside of the main thread.

        The attributes of a :class:`~msl.qt.threading.Worker` can be accessed
        as though they are attributes of the :class:`~msl.qt.threading.Thread`
        that owns it.

        The ``*args`` and ``**kwargs`` that are given to
        :meth:`~msl.qt.threading.Thread.start` are forwarded to the
        constructor of the :class:`~msl.qt.threading.Worker`. This base
        class accepts them but does not use them.

        Example
        -------
        See :class:`~msl.qt.sleep.SleepWorker` for an example of a
        :class:`~msl.qt.threading.Worker`.
        """
        super().__init__()

    def process(self):
        """The expensive or blocking operation to process.

        .. attention::
           You must override this method.

        Raises
        ------
        NotImplementedError
            Always, unless a subclass overrides this method.
        """
        raise NotImplementedError("You must override the 'process' method.")

    def _process(self):
        """Call :meth:`process` and report the outcome via a signal.

        Emits ``error`` with the exception instance and its traceback if
        :meth:`process` raises anything, otherwise emits ``finished``.
        """
        try:
            self.process()
        except BaseException:  # noqa: deliberately catch everything
            _, exception, traceback = sys.exc_info()
            self.error.emit(exception, traceback)
            return
        self.finished.emit()
class Thread(QtCore.QObject):

    finished = Signal()
    """A :class:`~QtCore.Signal` that is emitted when the thread is finished
    (i.e., when :meth:`~msl.qt.threading.Worker.process` finishes)."""

    def __init__(self, worker):
        """Moves a :class:`~msl.qt.threading.Worker` to a new :class:`~QtCore.QThread`.

        Parameters
        ----------
        worker
            A :class:`~msl.qt.threading.Worker` subclass that has *not* been
            instantiated.

        Raises
        ------
        TypeError
            If `worker` is not callable (e.g., it is already an instance) or
            if it is not a subclass of :class:`~msl.qt.threading.Worker`.

        Example
        -------
        See :class:`~msl.qt.sleep.Sleep` for an example of a
        :class:`~msl.qt.threading.Thread`.
        """
        super(Thread, self).__init__()
        self._thread = None
        self._worker = None
        self._finished = False
        self._signals_slots = []

        if not callable(worker):
            raise TypeError('The Worker for the Thread must not be instantiated')
        # issubclass() itself raises an unhelpful TypeError for a callable
        # that is not a class (e.g., a function), so check for a class first.
        if not (isinstance(worker, type) and issubclass(worker, Worker)):
            raise TypeError('The Worker for the Thread is not a Worker subclass')
        self._worker_class = worker

    def __getattr__(self, item):
        """All other attributes are assumed to be those of the
        :class:`~msl.qt.threading.Worker`.

        Raises
        ------
        AttributeError
            If the worker has not been created yet (i.e., :meth:`start` has
            not been called) or if the worker does not have the attribute.
        """
        # Bypass __getattr__ when reading _worker. If _worker has not been
        # assigned yet, a plain ``self._worker`` would re-enter this method
        # and recurse until a RecursionError is raised.
        try:
            worker = object.__getattribute__(self, '_worker')
        except AttributeError:
            worker = None

        if worker is not None:
            return getattr(worker, item)
        raise AttributeError('You must start the Thread before accessing an attribute of the Worker')

    def error_handler(self, exception, traceback):
        """If an exception is raised by the :class:`~msl.qt.threading.Worker`
        then the default behaviour is to show the error message in a
        :func:`~msl.qt.prompt.critical` dialog window.

        You can override this method to implement your own error handler.

        Parameters
        ----------
        exception : :exc:`BaseException`
            The exception instance
        traceback : :mod:`traceback`
            A traceback object.
        """
        prompt.critical(''.join(tb.format_exception(type(exception), exception, traceback)))

    def is_finished(self):
        """Whether the thread is finished.

        Returns
        -------
        :class:`bool`
            :data:`True` if the thread finished otherwise :data:`False`.
        """
        if self._thread is not None:
            finished = self._check(self._thread.isFinished)
            # _check() resets self._thread to None if the underlying C++
            # object was already deleted. In that case fall through to the
            # cached flag so that this call agrees with all later calls.
            if self._thread is not None:
                return bool(finished)
        return self._finished

    def is_running(self):
        """Whether the thread is running.

        Returns
        -------
        :class:`bool`
            :data:`True` if the thread is running otherwise :data:`False`.
        """
        if self._thread is None:
            return False
        return bool(self._check(self._thread.isRunning))

    def quit(self):
        """Tells the thread's event loop to exit."""
        if self._thread is not None:
            self._check(self._thread.quit)

    def start(self, *args, **kwargs):
        """Start processing the :class:`~msl.qt.threading.Worker` in a
        :class:`~QtCore.QThread`.

        The ``*args`` and ``**kwargs`` are passed to the constructor of the
        :class:`~msl.qt.threading.Worker` class.
        """
        # Forget the outcome of a previous run so that a restarted thread
        # does not report a stale "finished" state.
        self._finished = False

        self._thread = QtCore.QThread()
        self._worker = self._worker_class(*args, **kwargs)
        self._worker.moveToThread(self._thread)

        # Apply the connections that were requested via worker_connect()
        # before the worker existed.
        for signal, slot in self._signals_slots:
            getattr(self._worker, signal).connect(slot)

        # On error: stop the event loop with a non-zero code, then report.
        self._worker.error.connect(lambda *ignore: self._thread.exit(-1))
        self._worker.error.connect(self.error_handler)

        # On success: quit the event loop and schedule the worker for deletion.
        self._worker.finished.connect(self._worker_finished)
        self._worker.finished.connect(self._worker.deleteLater)

        self._thread.started.connect(self._worker._process)  # noqa
        self._thread.finished.connect(self._thread.deleteLater)  # noqa: finished.connect
        self._thread.finished.connect(self.finished)  # noqa: finished.connect
        self._thread.start()

    def stop(self, milliseconds=None):
        """Calls :meth:`.quit` then :meth:`.wait`.

        Parameters
        ----------
        milliseconds : :class:`int`, optional
            Passed to :meth:`.wait`.

        Returns
        -------
        :class:`bool` or :data:`None`
            The value returned by :meth:`.wait`.
        """
        self.quit()
        return self.wait(milliseconds=milliseconds)

    def wait(self, milliseconds=None):
        """Wait for the thread to either finish or timeout.

        Parameters
        ----------
        milliseconds : :class:`int`, optional
            The number of milliseconds to wait before a timeout occurs.
            If :data:`None` then this method will never time out and the
            thread must return from its `run` method.

        Returns
        -------
        :class:`bool` or :data:`None`
            :data:`True` if the thread finished, :data:`False` if the
            thread timed out or :data:`None` if the thread is not running.
        """
        if self._thread is None:
            return None
        if milliseconds is None:
            return self._check(self._thread.wait)
        return self._check(self._thread.wait, int(milliseconds))

    def worker_connect(self, signal, slot):
        """Connect a :class:`~QtCore.Signal` from the
        :class:`~msl.qt.threading.Worker` to a :class:`~QtCore.Slot`.

        This method is intended to be called *before* a worker thread starts.
        Although, you can still call this method when a worker thread is
        running, it is easier (fewer characters to type) to access the
        attributes of a :class:`~msl.qt.threading.Worker` as though they are
        attributes of the :class:`~msl.qt.threading.Thread`.

        Parameters
        ----------
        signal : :class:`str`, :class:`~QtCore.Signal` or :class:`~PySide6.QtCore.SignalInstance`
            The `signal` to connect the `slot` to. If a :class:`str`, then
            either the name of a class attribute of the
            :class:`~msl.qt.threading.Worker` or the `name` parameter that
            was used in the :class:`~QtCore.Signal` constructor.
        slot
            A callable function to use as the :class:`~QtCore.Slot`.

        Raises
        ------
        TypeError
            If `slot` is not callable or if `signal` has an unsupported type.
        """
        signal, slot = Thread._check_signal_slot(signal, slot)
        if self.is_running():
            getattr(self._worker, signal).connect(slot)
        else:
            # No running worker, remember the request; start() applies it.
            self._signals_slots.append((signal, slot))

    def worker_disconnect(self, signal, slot):
        """Disconnect a :class:`~QtCore.Slot` from a :class:`~QtCore.Signal`
        of the :class:`~msl.qt.threading.Worker`.

        This method is intended to be called *before* a worker thread is
        restarted. Although, you can still call this method when a worker
        thread is running, it is easier (fewer characters to type) to access
        the attributes of a :class:`~msl.qt.threading.Worker` as though they
        are attributes of the :class:`~msl.qt.threading.Thread`.

        Parameters
        ----------
        signal : :class:`str`, :class:`~QtCore.Signal` or :class:`~PySide6.QtCore.SignalInstance`
            The `signal` to disconnect the `slot` from. Must be the same
            value that was used in :meth:`worker_connect`.
        slot
            Must be the same callable that was used in :meth:`worker_connect`.

        Raises
        ------
        TypeError
            If `slot` is not callable or if `signal` has an unsupported type.
        ValueError
            If the thread is not running and the (`signal`, `slot`) pair was
            not previously registered with :meth:`worker_connect`.
        """
        signal, slot = Thread._check_signal_slot(signal, slot)
        if self.is_running():
            getattr(self._worker, signal).disconnect(slot)
        else:
            try:
                self._signals_slots.remove((signal, slot))
            except ValueError:
                # List what is currently registered to help the caller.
                options = '\n'.join(f' {a!r} {b}' for a, b in self._signals_slots)
                if not options:
                    raise ValueError(
                        'No Worker signals were connected to slots') from None
                raise ValueError(
                    f'The slot {slot} is not connected to the signal '
                    f'{signal!r}.\nOptions\n{options}') from None

    @staticmethod
    def _check_signal_slot(signal, slot):
        """Check the input types and returns the appropriate types.

        Returns
        -------
        :class:`tuple`
            The signal name (a :class:`str`) and the `slot`.

        Raises
        ------
        TypeError
            If `slot` is not callable, if `signal` is neither a
            :class:`str` nor a :class:`~QtCore.Signal`, or if the name of
            the signal cannot be determined.
        """
        if not callable(slot):
            raise TypeError('slot must be callable')

        if isinstance(signal, str):
            return signal, slot

        if isinstance(signal, Signal):
            if hasattr(signal, 'signatures'):  # PyQt
                signal = signal.signatures[0]
                if '(' not in signal:
                    raise TypeError(
                        'Cannot determine the Signal name. Either pass in '
                        'the Signal name as a string or define a name '
                        'parameter in the Signal constructor.')
            # The name is the text that precedes the argument list.
            return str(signal).split('(')[0], slot

        raise TypeError('signal must be a QtCore.Signal or string')

    def _check(self, method, *args):
        """Wrap all calls to the QThread in a try-except block to silently
        ignore the following error:

        RuntimeError: Internal C++ object (*.QtCore.QThread) already deleted.

        Returns the value returned by `method`, or :data:`None` if the
        error occurred (in which case the reference to the QThread is also
        dropped).
        """
        try:
            return method(*args)
        except RuntimeError:
            self._thread = None

    def _worker_finished(self):
        """Slot -> Called when the :class:`~msl.qt.threading.Worker`
        finished successfully."""
        # Record the success first so that it is never lost, then ask the
        # event loop to exit using the same guards as every other call
        # that is made to the QThread.
        self._finished = True
        if self._thread is not None:
            self._check(self._thread.quit)