Responsive GUI

Now for the hard part: making the widget responsive. We do this by offloading the learner evaluations to a separate thread.

First, read up on threading basics in Qt, in particular on threads and QObjects and how they interact with Qt’s event loop.

We must also take special care that we can cancel/interrupt our task when the user changes algorithm parameters or removes the widget from the canvas. For that we use a strategy known as cooperative cancellation: in the GUI thread we ‘ask’ the pending task to stop executing, and in the worker thread we periodically check (at known, predetermined points) whether we should continue. If not, we return early (in our case by raising an exception).
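
The idea can be sketched without Qt, using only the standard library. This is a minimal illustration, not the widget's code: `CancelFlag`, `Cancelled` and `long_running` are hypothetical names, and the `time.sleep` call stands in for one unit of real work.

```python
import concurrent.futures
import threading
import time


class Cancelled(Exception):
    """Raised inside the worker once cancellation has been requested."""


class CancelFlag:
    """Hypothetical holder for the flag shared by the two threads."""
    cancelled = False


def long_running(flag, started):
    """Work in small steps, checking the flag at a known point in each one."""
    started.set()                  # tell the caller that work has begun
    steps = 0
    while True:
        if flag.cancelled:         # the predetermined check point
            raise Cancelled(steps)
        time.sleep(0.001)          # stand-in for one unit of real work
        steps += 1


flag = CancelFlag()
started = threading.Event()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(long_running, flag, started)
    started.wait()                 # the task is running by now
    flag.cancelled = True          # 'ask' the task to stop
    concurrent.futures.wait([future])

# The exception raised in the worker is stored on the future.
was_cancelled = isinstance(future.exception(), Cancelled)
```

The worker is never forcibly stopped; it ends only because it checks the flag itself, which is why the check points must be reached regularly.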

Setting up

The snippets below use the standard library’s concurrent.futures.ThreadPoolExecutor for thread allocation/management; orangewidget.utils.concurrent.ThreadExecutor provides the same submit interface and could be used instead.

import concurrent.futures
import logging
from functools import partial

from AnyQt.QtCore import QThread, pyqtSlot
from orangewidget.utils.concurrent import (
    FutureWatcher, methodinvoke
)

We will reorganize our code to make the learner evaluation an explicit task as we will need to track its progress and state. For this we define a Task class.

class Task:
    """
    A class that will hold the state for a learner evaluation.
    """
    #: A concurrent.futures.Future with our (eventual) results.
    #: The OWLearningCurveC class must fill this field
    future = ...       # type: concurrent.futures.Future

    #: FutureWatcher. Likewise this will be filled by OWLearningCurveC
    watcher = ...      # type: FutureWatcher

    #: True if this evaluation has been cancelled. The OWLearningCurveC
    #: will set up the task execution environment in such a way that this
    #: field will be checked periodically in the worker thread and cancel
    #: the computation if so required. In a sense this is the only
    #: communication channel in the direction from the OWLearningCurveC to
    #: the worker thread
    cancelled = False  # type: bool

    def cancel(self):
        """
        Cancel the task.

        Set the `cancelled` field to True and block until the future is done.
        """
        # set cancelled state
        self.cancelled = True
        # cancel the future. Note this succeeds only if the execution has
        # not yet started (see `concurrent.futures.Future.cancel`) ..
        self.future.cancel()
        # ... and wait until computation finishes
        concurrent.futures.wait([self.future])
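
The comment in Task.cancel points out that Future.cancel() only succeeds for work that has not started yet, which is why the `cancelled` flag is needed as well. The following standard-library sketch shows the difference; the job functions and event names are made up for the illustration, and a single worker thread is assumed so that the second job stays queued.

```python
import concurrent.futures
import threading

running = threading.Event()   # set once the first job has started
release = threading.Event()   # lets the first job finish


def blocking_job():
    running.set()
    release.wait()
    return "first"


executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
first = executor.submit(blocking_job)
running.wait()                               # first job occupies the only worker
second = executor.submit(lambda: "second")   # queued behind it, not started

cancelled_running = first.cancel()    # False: the job is already executing
cancelled_queued = second.cancel()    # True: the job never started

release.set()                         # allow the first job to complete
first_result = first.result()
executor.shutdown(wait=True)
```

A running job therefore has to be stopped from the inside, by checking a flag such as `Task.cancelled`.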

In the widget’s __init__ we create an instance of ThreadPoolExecutor and initialize the task field.

        #: The current evaluating task (if any)
        self._task = None   # type: Optional[Task]
        #: An executor we use to submit learner evaluations into a thread pool
        self._executor = concurrent.futures.ThreadPoolExecutor()

All code snippets are from OWLearningCurveC.py.

Starting a task in a thread

In handleNewSignals we call _update.

    def handleNewSignals(self):
        if len(self.learners):
            self.infob.setText("%d learners on input." % len(self.learners))
        else:
            self.infob.setText("No learners.")

        # setEnabled expects a bool, not a count
        self.commitBtn.setEnabled(bool(self.learners))
        self._update()

Finally, the _update function (from OWLearningCurveC.py) starts/schedules all updates.

    def _update(self):
        if self._task is not None:
            # First make sure any pending tasks are cancelled.
            self.cancel()
        assert self._task is None

        if self.data is None:
            return
        # collect all learners for which results have not yet been computed
        need_update = [(i, item) for (i, item) in enumerate(self.learners)
                       if item.results is None]
        if not need_update:
            self._update_curve_points()
            self._update_table()
            return

At the start we cancel any pending task that has not yet completed. This is important: we cannot allow the widget to schedule tasks and then just forget about them. Next we make some checks and return early if there is nothing to be done.

Continue by setting up the learner evaluations as a partial function capturing the necessary arguments:

        learners = [item.learner for _, item in need_update]
        # setup the learner evaluations as partial function capturing
        # the necessary arguments.
        if self.testdata is None:
            learning_curve_func = partial(
                learning_curve,
                learners, self.data, folds=self.folds,
                proportions=self.curvePoints,
            )
        else:
            learning_curve_func = partial(
                learning_curve_with_test_data,
                learners, self.data, self.testdata, times=self.folds,
                proportions=self.curvePoints,
            )

Set up the task state and the communication between the main and worker thread. The only state flowing from the GUI to the worker thread is the task.cancelled field, a simple trip wire that causes the learning_curve’s callback argument to raise an exception. In the other direction we report the percentage of work done.

        # setup the task state
        self._task = task = Task()
        # The learning_curve[_with_test_data] also takes a callback function
        # to report the progress. We instrument this callback to both invoke
        # the appropriate slots on this widget for reporting the progress
        # (in a thread safe manner) and to implement cooperative cancellation.
        set_progress = methodinvoke(self, "setProgressValue", (float,))

        def callback(finished):
            # check if the task has been cancelled and raise an exception
            # from within. This 'strategy' can only be used with code that
            # properly cleans up after itself in the case of an exception
            # (does not leave any global locks, opened file descriptors, ...)
            if task.cancelled:
                raise KeyboardInterrupt()
            set_progress(finished * 100)

        # capture the callback in the partial function
        learning_curve_func = partial(learning_curve_func, callback=callback)
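
The two-step use of partial (first the data arguments, then the callback) can be tried in isolation. In this sketch `evaluate` is a hypothetical stand-in for learning_curve; it is only assumed to accept a `callback` keyword that receives the fraction of work finished.

```python
from functools import partial


def evaluate(items, scale, callback=None):
    """Hypothetical stand-in for learning_curve: scale items, report progress."""
    out = []
    for i, item in enumerate(items, start=1):
        out.append(item * scale)
        if callback is not None:
            callback(i / len(items))   # fraction of work finished so far
    return out


progress = []
# First capture the data arguments ...
func = partial(evaluate, [1, 2, 3, 4], scale=10)
# ... then layer the callback on top, as done with learning_curve_func.
func = partial(func, callback=progress.append)

# The result is a zero-argument callable, ready for Executor.submit.
result = func()
```

Because everything is bound up front, the executor does not need to know anything about the arguments of the function it runs.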

See also

progressBarInit(), progressBarSet(), progressBarFinished()

Next, we submit the function to be run in a worker thread and instrument a FutureWatcher instance to notify us when the task completes (via a _task_finished slot).

        self.progressBarInit()
        # Submit the evaluation function to the executor and fill in the
        # task with the resultant Future.
        task.future = self._executor.submit(learning_curve_func)
        # Setup the FutureWatcher to notify us of completion
        task.watcher = FutureWatcher(task.future)
        # by using FutureWatcher we ensure `_task_finished` slot will be
        # called from the main GUI thread by the Qt's event loop
        task.watcher.done.connect(self._task_finished)
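
To see why a watcher is used rather than a plain callback, consider what the standard library does on its own. A function registered with Future.add_done_callback while the future is still pending is called by the thread that completes the future, that is, the worker. The sketch below records which thread ran the callback; `job` and `on_done` are illustrative names.

```python
import concurrent.futures
import threading

callback_registered = threading.Event()
callback_threads = []


def job():
    # Do not finish before the callback has been registered.
    callback_registered.wait()
    return 42


def on_done(future):
    # Remember which thread invoked the callback.
    callback_threads.append(threading.current_thread())


with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(job)
    future.add_done_callback(on_done)
    callback_registered.set()
# Leaving the `with` block waits for the worker to finish.

ran_in_calling_thread = callback_threads[0] is threading.current_thread()
```

Here `ran_in_calling_thread` ends up False. Touching widgets from such a callback would not be safe, so the completion has to be routed through Qt's event loop, which is the role FutureWatcher plays above.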

For the above code to work, setProgressValue needs to be defined as a pyqtSlot.

    @pyqtSlot(float)
    def setProgressValue(self, value):
        assert self.thread() is QThread.currentThread()
        self.progressBarSet(value)

Collecting results

In _task_finished (from OWLearningCurveC.py) we handle the completed task (either success or failure) and then update the displayed score table.

    @pyqtSlot(concurrent.futures.Future)
    def _task_finished(self, f):
        """
        Parameters
        ----------
        f : Future
            The future instance holding the result of learner evaluation.
        """
        assert self.thread() is QThread.currentThread()
        assert self._task is not None
        assert self._task.future is f
        assert f.done()

        self._task = None
        self.progressBarFinished()

        try:
            results = f.result()  # type: List[Results]
        except Exception as ex:
            # Log the exception with a traceback
            log = logging.getLogger()
            log.exception(__name__, exc_info=True)
            self.error("Exception occurred during evaluation: {!r}"
                       .format(ex))
            # clear all results
            for item in self.learners:
                item.results = None
        else:
            # split the combined result into per learner/model results ...
            results = [list(Results.split_by_model(p_results))
                       for p_results in results]  # type: List[List[Results]]
            assert all(len(r.learners) == 1 for r1 in results for r in r1)
            assert len(results) == len(self.curvePoints)

            learners = [r.learners[0] for r in results[0]]
            # map learner back to LearnerData instance
            data_by_learner = {item.learner: item for item in self.learners}
            # ... and update self.results
            for i, learner in enumerate(learners):
                item = data_by_learner[learner]
                item.results = [p_results[i] for p_results in results]
        # update the display
        self._update_curve_points()
        self._update_table()
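
The regrouping in the else branch turns a "per curve point, then per learner" nesting into a "per learner, then per curve point" one. A small sketch with placeholder strings instead of real Results objects (the labels are invented for the illustration):

```python
# One inner list per curve point, one entry per learner.
per_point = [
    ["A@0.5", "B@0.5"],
    ["A@1.0", "B@1.0"],
]

# Same indexing pattern as `item.results = [p_results[i] for ...]` above.
per_learner = [[p_results[i] for p_results in per_point]
               for i in range(len(per_point[0]))]
```

After this step `per_learner[0]` holds every curve point for the first learner, which is the shape each LearnerData item stores.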

Stopping

Also of interest is the cancel method. Note that we also disconnect the _task_finished slot so that _task_finished does not receive stale results.

    def cancel(self):
        """
        Cancel the current task (if any).
        """
        if self._task is not None:
            self._task.cancel()
            assert self._task.future.done()
            # disconnect the `_task_finished` slot
            self._task.watcher.done.disconnect(self._task_finished)
            self._task = None
            self.progressBarFinished()

We also use cancel in onDeleteWidget() to stop if/when the widget is removed from the canvas.

    def onDeleteWidget(self):
        self.cancel()
        super().onDeleteWidget()