Worker and main thread in wxPython

All of the GUI toolkits that I have used only allowed updates to GUI elements from the main thread. In order to affect the GUI from another thread, the toolkit provides a safe way to post a message or event from another thread which will be handled by the main thread. The first version of Watcher was single-threaded. Watcher uses timers to determine when to do things like fetch the next image for display or refresh its view of the RSS feed. That first version of Watcher did all of its processing in those event handlers. This meant image and feed fetching happened in the main thread.

While the fetch was happening, the main thread was not processing events, so it ignored repaint and mouse events. This kind of thing makes the app frustrating to use, since it just stops responding. Most of the time it wasn’t annoying enough for me to want to fix: I associate a pretty high cognitive cost with adding more threads to my applications, and the network I/O was usually fast enough that I didn’t notice the hanging.

Eventually, though, if you use the Web long enough you will hit a server that is not responding, and in that situation the hanging was really, really annoying.

I needed to move the fetches out of the main thread. The basic pattern is the same for any GUI toolkit: an event handler starts a worker thread on some task, and when the worker finishes or needs to send a status update to the GUI, it posts an event back to the GUI’s event loop. For wxPython, there is an example of this in the demo application.
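Stripped of any toolkit, the pattern looks roughly like the sketch below. It is only an illustration: the `events` queue stands in for the GUI’s event loop, and `slow_fetch`, `worker`, and `run_main_loop` are made-up names, not anything from Watcher or wxPython.

```python
import threading
import queue  # this module is named Queue on Python 2

# Stand-in for the GUI event queue. A real toolkit owns this;
# in wxPython, wx.PostEvent plays the role of events.put.
events = queue.Queue()

def slow_fetch(url):
    # Placeholder for blocking network I/O.
    return "contents of " + url

def worker(url):
    # Runs in the worker thread: do the slow part, then hand the
    # result back as an event instead of touching the GUI directly.
    result = slow_fetch(url)
    events.put(("fetch-done", result))

def run_main_loop():
    # Runs in the main thread: handle events until the fetch reports back.
    handled = []
    while True:
        name, payload = events.get()
        handled.append((name, payload))
        if name == "fetch-done":
            return handled

t = threading.Thread(target=worker, args=("http://example.com/feed",))
t.start()
handled = run_main_loop()
t.join()
```

The important part is that the worker never touches GUI state. It only puts a message on a thread-safe queue, and the main thread decides what to do with it.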

I wrote a little mix-in class to make it easier to add this functionality (and, more importantly, to hide some of the threading goo) in the two places in Watcher that perform potentially long-running operations: the RSS feed refresh and the image fetch and display code.

import threading
try:
    from Queue import Queue   # Python 2
except ImportError:
    from queue import Queue   # Python 3 renamed the module
import wx
import wx.lib.newevent  # importing wx.lib alone does not load the newevent submodule
import traceback
(ThreadedResultEvent, EVT_THREAD_RESULT) = wx.lib.newevent.NewEvent()
class AsyncIOWindow:
    def __init__(self):
        self.Bind(EVT_THREAD_RESULT, self.OnThreadedResultEvent)
        self._ioQueue = Queue()
        self._ioThreadRun = True
        self._ioThread = threading.Thread(target=self.ioThreadRun)
        self._ioThread.start()

    def OnThreadedResultEvent(self, event):
        event.func(*event.args, **event.kwargs)
    def ioReport(self, func, *args, **kwargs):
        """Call func with args/kwargs in the GUI thread by posting
           an event for OnThreadedResultEvent to handle and dispatch."""
        wx.PostEvent(self,
                     ThreadedResultEvent(func=func,
                                         args=args,
                                         kwargs=kwargs))
    def ioThreadRun(self):
        while self._ioThreadRun:
            work = self._ioQueue.get(True)
            if not work:
                self._ioThreadRun = False
            else:
                func, args, kwargs, resultfunc = work
                success = False
                try:
                    try:                    
                        result = func(*args, **kwargs)
                        success = True
                        self.ioReport(resultfunc, result, *args, **kwargs)
                    except:
                        traceback.print_exc()  # FIXME
                finally:
                    if not success:
                        self.ioReport(resultfunc, None, *args, **kwargs)

    def ioCall(self, resultfunc, func, *args, **kwargs):
        """Ask the worker thread to work on func(*args, **kwargs) and call
           resultfunc(rval, *args, **kwargs) when func returns."""        
        self._ioQueue.put((func, args, kwargs, resultfunc))
    def ioFinish(self):
        """Call this to shut down the worker thread, e.g. when your window is closing."""
        self._ioQueue.put(None)  # signal the io thread to quit 
        self._ioThread.join(5.0) # wait a little while for things to finish

If this were fully correct, ioFinish would have some mechanism for dealing with the case where the I/O thread has not in fact exited after the 5.0 seconds it is given. Oh well.
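One possible way to deal with that, sketched here without wx so it runs on its own: mark the thread as a daemon so a stuck fetch cannot keep the process alive, and have the finish call report whether the thread actually exited. `Worker`, `call`, and `finish` are hypothetical names for this sketch, not part of Watcher.

```python
import threading
import queue  # this module is named Queue on Python 2

class Worker(object):
    """Hypothetical, wx-free stand-in for the worker half of the mix-in."""

    def __init__(self):
        self._queue = queue.Queue()
        self._thread = threading.Thread(target=self._run)
        # A daemon thread cannot keep the interpreter alive at exit.
        self._thread.daemon = True
        self._thread.start()

    def _run(self):
        while True:
            work = self._queue.get()
            if work is None:  # same "None means quit" signal as ioFinish
                return
            work()

    def call(self, func):
        self._queue.put(func)

    def finish(self, timeout=5.0):
        """Ask the thread to quit; return True if it exited in time."""
        self._queue.put(None)
        self._thread.join(timeout)
        return not self._thread.is_alive()

# Usage: queue one piece of work, then shut down and check the outcome.
w = Worker()
done = threading.Event()
w.call(done.set)
stopped = w.finish()
```

The caller can then log or warn when `finish` returns False. The trade-off is that a daemon thread is simply abandoned at interpreter exit, so whatever it was doing at that moment is cut off mid-operation.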

So how is this used?

Let’s look at the relevant bits of a method in watcher.py’s ControlFrame, which implements the main window of Watcher. This method is called by a timer event when it’s time to refresh the feed.

    def DoFetchFeed(self):
        [ .. make sure only one fetch is ever in progress at once .. ]
        # we're still in the main thread, so it's safe to call statusMsg directly
        # which will update the status bar
        self.statusMsg("Fetching feed...") 
        # note the def here -- we're defining a function in the scope of this 
        # DoFetchFeed method
        def doFetchFeed():
            [ .. prepare lastmodified and etag info for use .. ]
            rss = feedparser.parse(self.preferences.feedUrl,
                                       etag=p.rssEtag,
                                       modified=lastmod)
            [ .... handle the feed fetch failing or not being different since last fetch .. ]
            # note this will be in the worker thread, so we have to use ioReport to 
            # call statusMsg -- this will call self.statusMsg("Processing feed...") in 
            # the main thread
            self.ioReport(self.statusMsg, "Processing feed...")
            [ .. parse the feed, look for new photos, etc .. ]
            self.ioReport(self.statusMsg, 'Processing complete.')
            [ .. update last modified info .. ]
            return result   # this will be a list of the new photos (may be empty)
        # now define a function to be called in the main thread once the worker
        # is finished
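        def handleFetch(result):
            if result is None:
                # ioThreadRun passes None when doFetchFeed raised an exception
                self.statusMsg("Fetch failed")
                return
            self.statusMsg("Done (%d new)" % len(result))
            [ ... handle displaying the new photos .. ]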
        # now use ioCall to ask the worker thread to run doFetchFeed and have it
        # call back to handleFetch (in the main thread) when doFetchFeed returns
        self.ioCall(handleFetch, doFetchFeed)