DRMacIver's Notebook
Temporary processing loops as a sometimes replacement for background threads
There’s a trick I’ve used twice now, and I figure any trick worth using twice is worth writing up. I’ve never seen anyone else using it, possibly because it’s not actually very useful, or is secretly a bad idea for reasons that I’m unaware of. It’s obvious enough that I’d be surprised if it was original to me, but I also expect most other people haven’t seen it either.
The basic idea is this: Suppose you have a bunch of tasks running in different threads, and you need some sort of background thread running to keep them happy. For example:
- In shrinkray the tasks are attempts to apply patches to a test case, and the background thread is a sort of “merge queue” which is responsible for trying to combine successful patches together.
- In a recent project, I have a number of communication channels, and messages coming in from them on a single connection, and the background thread is reading those messages, figuring out where they need to go, and dispatching them to a queue for the right channel.
…those are actually the only two examples I have right now. I could probably imagine more, but those are the ones I’ve concretely tried this in.
In any case, in both of these the background thread was sort of a pain in the ass. In the shrinkray case, there was a bunch of lifecycle management I had to worry about (the “thread” was actually a trio task, and in some of these use cases it was annoying to scope it to a nursery). In the messages case, it was viable, but it was difficult to debug and I wanted it to work in a language with kinda shit threading, so I’d rather not use a background thread if I didn’t have to.
Anyway, there turns out to be a common trick: In both of these cases, we are doing a thing in the calling thread, and that thing will return back to us only once the background thread has got to processing our particular need.
- In shrinkray, we only care that the merge queue has run for long enough to either accept or reject the patch.
- In the message processing case, we only care that the dispatcher has run for long enough that there is at least one message in our inbox.
As a result, in both of these cases, we are essentially blocking until the dispatcher thread has got to our particular need. Which means there doesn’t need to be a dispatcher thread at all - we can just temporarily become it. Look at me, I’m the dispatcher thread now.
Let’s look at some pseudocode for this. Here is how our message dispatcher might work with a background thread:
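
    from queue import SimpleQueue
    from threading import Thread

    channels: dict[str, SimpleQueue] = {}

    def run_dispatcher():
        # get_message() is assumed to block until the next message
        # arrives on the shared connection.
        while True:
            msg = get_message()
            channels[msg.id].put(msg)

    Thread(target=run_dispatcher).start()

    def get_channel_message(id: str):
        return channels[id].get()

And with inline processing: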

    from queue import SimpleQueue
    from threading import Lock

    channels: dict[str, SimpleQueue] = {}
    lock = Lock()

    def get_channel_message(id: str):
        queue = channels[id]
        while queue.empty():
            # Whoever holds the lock is the dispatcher for now.
            with lock:
                # Someone else may have delivered our message while we
                # were waiting for the lock, so check again before reading.
                if queue.empty():
                    msg = get_message()
                    channels[msg.id].put(msg)
        return queue.get()

The shrinkray one looks a bit different because the merge queue operates on many patches at a time rather than a single message at a time like the dispatcher case, but it is basically the same principle: Check if we need to become the merge thread. If we do, start doing that until our patch is merged or rejected, and if not, just wait on the (guaranteed to be running) merge thread until we get to that point.
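To make the batch version a bit more concrete, here is a rough sketch of what "become the merge thread if nobody else is" could look like with plain Python threads. This is not shrinkray's actual code (which uses trio tasks rather than threads). The names `InlineMergeQueue`, `PendingPatch`, `try_merge` and `accept_even` are all made up for illustration, and patches are just integers standing in for real patch objects.

```python
import threading
from dataclasses import dataclass


@dataclass
class PendingPatch:
    patch: int              # stand-in for a real patch object (must be hashable)
    done: bool = False      # has the merge queue decided on this patch yet?
    accepted: bool = False


class InlineMergeQueue:
    """Hypothetical merge queue with no dedicated merge thread."""

    def __init__(self, try_merge):
        # try_merge(patches) is an assumed callback that returns the set of
        # patches that merged successfully.
        self._try_merge = try_merge
        self._cond = threading.Condition()
        self._pending: list[PendingPatch] = []
        self._merger_active = False

    def submit(self, patch: int) -> bool:
        item = PendingPatch(patch)
        with self._cond:
            self._pending.append(item)
        while True:
            with self._cond:
                # Someone else is the merge thread: wait for them to finish a batch.
                while self._merger_active and not item.done:
                    self._cond.wait()
                if item.done:
                    return item.accepted
                # Nobody is merging and our patch is undecided: become the merge thread.
                self._merger_active = True
                batch, self._pending = self._pending, []
            accepted = set()
            try:
                # Run the (possibly slow) merge outside the lock so others can submit.
                accepted = self._try_merge([p.patch for p in batch])
            finally:
                with self._cond:
                    # If try_merge raised, the whole batch counts as rejected.
                    for p in batch:
                        p.accepted = p.patch in accepted
                        p.done = True
                    self._merger_active = False
                    self._cond.notify_all()


def accept_even(patches):
    # Toy merge rule for illustration only: even-numbered patches "merge".
    return {p for p in patches if p % 2 == 0}
```

Calling `InlineMergeQueue(accept_even).submit(patch)` from any number of threads blocks each caller until its own patch has been accepted or rejected. A caller that finds no merger running takes the whole pending batch itself, and everyone else waits on the condition variable. When the current merger finishes a batch, any caller whose patch is still undecided wakes up and one of them takes over, which is what keeps the "guaranteed to be running" part true.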
Do you need this trick? No, probably not. But it has solved a genuine need for me twice, so maybe you’ll be the third time it was useful.