Grand Central Dispatch is a douchebag

I’ve always had trouble with writing code for multithreading; I could get threading well enough for other purposes (e.g. real-time applications), but introducing parallelism for the specific purpose of exploiting execution resources changed the problem space enough that I had little handle on it. Now thanks to my recent completion of a project that was amenable to multithreading, I have… less trouble with this code.

So here is what I can share of my learnings; this is centered on Apple platforms, but also has wider applications.

A word of warning: technically, not much has changed in Swift’s support for concurrency since 2014: the behavior of concurrently executing Swift code is not specified at all, and that is only barely starting to change with structured concurrency. That being said, when it comes to providing sample code Swift is the better-suited language, and it is more future-proof anyway, allowing us to glimpse possible futures; so this article will use Swift.

But if there is no specification, what are the rules, then? It would appear that, as long as you don’t access variables declared var, or class instances, from multiple threads concurrently, you will be fine. Note that the prohibition includes concurrent access through struct copies that end up referring to the same class instance, unless the struct is a well-known type (like Array) that has special handling to support this use case.
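To make that rule concrete, here is a minimal sketch (the Counter class and the values are made up for illustration): concurrently reading an immutable value is fine, while concurrently mutating a shared class instance is exactly the kind of thing the rule prohibits.

import Foundation;

final class Counter { // reference type: a shared, mutable class instance
    var count = 0;
}

let primes = [2, 3, 5, 7, 11]; // immutable value: safe to read from any thread
let shared = Counter();

DispatchQueue.concurrentPerform(iterations: 4) { _ in
    // Fine: concurrent reads of a constant need no synchronization.
    _ = primes.reduce(0, +);

    // Not fine: every iteration mutates the same class instance with no
    // synchronization whatsoever; the behavior here is not specified (data race).
    shared.count += 1;
}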

The commandments for efficient multithreading

Even though I did not create it for the day job, I don’t want to share my project just yet either; instead I synthesized an example for the purposes of this post. Let us say I have this processing, here in a completely serial form, that I intend to refactor to run in a multithreaded fashion:

//
//  Created by Pierre Lebeaupin on 13/12/2020.
//  Copyright © 2020 Pierre Lebeaupin. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//

import Foundation;

enum Op: String {
    case additive = "+", multiplicative = "*";

    func neutralValue() -> UInt32 {
        return ((self == .additive) ? 0 : 1);
    }

    func combine(_ a: UInt32, _ b: UInt32) -> UInt32 {
        return ((self == .additive) ? (a + b) : (a * b));
    }

    func uncombine(_ a: UInt32, _ b: UInt32) -> UInt32 {
        return ((self == .additive) ? (a - b) : (a / b));
    }
}


struct ReadyValue {
    let value: UInt32;
    let op: Op?;
    let description: () -> String;
    // Note that as a closure, it can depend on more than
    // the stored properties, contrary to a computed
    // property.

    init(_ inValue: UInt32) {
        value = inValue;
        op = nil;
        description = { return inValue.description;};
    }

    init(value inValue: UInt32,
         op inOp: Op,
         description indescription: @escaping () -> String) {
        value = inValue;
        op = inOp;
        description = indescription;
    }
}

func resolve(_ startGoal: UInt32,
             _ incb: @escaping (_ result: String) -> Void,
             _ primitives: UInt32...)
{
    var l = [ReadyValue]();
    var composedCount = 0;
    var otherComposedCount = 0;

    for element in primitives {
        l.append(ReadyValue(element));
    }

    {
    exploreAdditionOfRightNode(kind: .additive);
    exploreAdditionOfRightNode(kind: .multiplicative);
    }(); // https://bugs.swift.org/browse/SR-12243

    func exploreAdditionOfRightNode(kind: Op) {
        if (l.count != 0) && (l[l.count - 1].op != kind) {
            guard otherComposedCount == 0 else {
                return;
            }

            otherComposedCount = composedCount;
            composedCount = 0;
        }
        defer {
            if (l.count != 0) && (l[l.count - 1].op != kind) {
                composedCount = otherComposedCount;
                otherComposedCount = 0;
            }
        }

        var decomposedValue = [false: kind.neutralValue(),
                               true: kind.neutralValue()];

        {
        iteratePossibleLeftNodes(startingFrom: 0, { _ in return;});
        }(); // https://bugs.swift.org/browse/SR-12243

        func iteratePossibleLeftNodes(startingFrom: Int,
                                      _ walkOperands: @escaping
                                        (_ action: (_ value: ReadyValue,
                                                    _ reverse: Bool)
                                            -> Void)
                                        -> Void) {
            for candidateOffset in
                startingFrom ..< (l.count - composedCount) {
                let rightChild = l.remove(at: candidateOffset);
                if let _ = rightChild.op {
                    otherComposedCount -= 1;
                }
                defer {
                    if let _ = rightChild.op {
                        otherComposedCount += 1;
                    }
                    l.insert(rightChild, at: candidateOffset);
                }

                for phase in 0...1 {
                    let reverse = (phase == 1);
                    { (_ valueComponent: inout UInt32) in
                        valueComponent = kind.combine(valueComponent,
                                                      rightChild.value);
                    }(&decomposedValue[reverse]!);
                    defer {
                        { (_ valueComponent: inout UInt32) in
                            valueComponent = kind.uncombine(valueComponent,
                                                            rightChild.value);
                        }(&decomposedValue[reverse]!);
                    }

                    let selfNode = {(_ action: (_ value: ReadyValue,
                                                _ reverse: Bool)
                                        -> Void)
                        -> Void in
                        action(rightChild, reverse);
                        walkOperands(action);
                    };

                    iteratePossibleLeftNodes(startingFrom: candidateOffset,
                                             selfNode);

                    // close current composition
                    guard ({
                        var num = 0;
                        selfNode({_,_ in num += 1;});
                        return num;
                    }() > 1) && ( (kind == .additive)
                                    ? decomposedValue[false]!
                                       > decomposedValue[true]!
                                    : ((decomposedValue[false]!
                                       % decomposedValue[true]!) == 0) ) else {
                        continue;
                    }

                    let realizedValue = kind.uncombine(decomposedValue[false]!,
                                                       decomposedValue[true]!);
                    let description = { () -> String in
                        var current = "(";
                        selfNode({(_ value: ReadyValue,
                                   _ freverse: Bool) -> Void in
                            current += " ";
                            current += (freverse
                                            ? (kind == .additive ? "-" : "/")
                                            : kind.rawValue);
                            current += " ";
                            current += value.description();
                        });
                        current += ")";

                        return current;
                    };

                    guard l.count > 0 else {
                        if realizedValue == startGoal {
                            incb(description());
                        }
                        continue;
                    }

                    composedCount += 1;
                    l.append(ReadyValue(value: realizedValue,
                                        op: kind,
                                        description: description));
                    defer {
                        l.remove(at: l.count - 1);
                        composedCount -= 1;
                    }

                    exploreAdditionOfRightNode(kind: .additive);
                    exploreAdditionOfRightNode(kind: .multiplicative);
                }
            }
        }

    }
}

(Try it out, e.g. with resolve(70, {print($0);}, 5, 4, 3, 2, 2). You’ll note it changed a bit from the version in the teasers: since then I discovered nested functions could work just as well as anonymous closures to capture variables, while avoiding any reference cycle issue).

What is it going to take to make it work in a multithreaded fashion?

  • First and foremost, most of the processing has to occur as part of a function that performs a chunk of the job, and then returns, ready to be called to perform the next chunk. Just say no to processing that is a single function call that only returns when the whole task is done: that is unacceptable for many, many reasons as we’ll soon see.

While recommendations for parallel processing have varied, shall we say, this one has remained useful for decades: for instance, it was already useful when you wanted your processing to occur asynchronously in traditional MacOS (where your processing had to occur in what was known as “interrupt time”). And parallel code is, more than anything else, asynchronous code, of which multiple instances just happen to be executing at the same time.

That being established, how big should that individual chunk of work be?

  • Aim for a single chunk of processing to complete in about a tick (1/60th of a second) on a single instance of the slowest target core: the aim is for the overhead of pausing the work on the chunk (by returning), then resuming, to be amortized over useful processing that is as long as possible while still appearing instant to the user when cancelling.
  • Don’t bother making chunks bigger on faster machines just because you can fit more work on the same time period: with a machine-independent chunk size, what is a good enough amortization on the slowest target machine will also be good enough on other machines.
  • All that implies being able to roughly estimate the workload for a given amount of data, e.g. if work is coming in as packets of data received from the network. In fact, if a piece of work is short enough, it might not be worth sending to the thread pool at all: better to process it synchronously, or keep it around and aggregate it with later work. But the latter may not always be possible (e.g. if your server is user-interactive, like a telnet server or another situation where you would legitimately disable Nagle’s algorithm).
  • Do not overcommit chunks of work. Some parallel processing APIs seem to tell you “just expose all your parallelism to us, we’ll sort it out”, but what do you do if and when you need to cancel the processing? You could tell the scheduling object to stop launching previously enqueued tasks, assuming that’s even possible, but that requires the tasks to have been scheduled on a specific object rather than the global one, adding to the complexity. And don’t even think about checking at the start of the chunk whether processing has been canceled: all your neatly parallel chunks would need to start with entering a bottleneck (a mutex or some such) just to make that determination… that makes no sense.
  • Do not start “ongoing” tasks either. Again, they are unacceptable when it comes to cancellation, unless you check for cancellation at regular intervals, which again introduces bottlenecks into your otherwise neatly parallel tasks and will confuse the heck out of the parallel processing API. More on that later.
  • And in a related fashion, do not pre-partition the work at the start to have only as many partitions as there are local processing resources, expecting them to complete simultaneously: besides the traditional issues with that (what happens if there is a high-priority task taking up a core in the background? One of the partitions will not get a chance to start until another one completes, and you’ll have to wait for that latecomer, doubling processing time), there is now the issue of heterogeneous multiprocessing on the Apple silicon Macs released so far, where some cores are more sober than others.
  • Instead, arrange for a single object to be able to dole out chunks of work to perform on request, also known as work stealing; no problem if that object has to be protected by a mutex itself or if the doling out has to otherwise be performed serially: it’s presumably not the part that is going to be a performance concern, and we will see how to have that bit of serialization without confusing GCD. Also, no problem if extracting and spawning such a chunk of work takes far less than a tick: this is not the part of the job where you want to amortize overhead.
  • For now, our best bet is to start off by scheduling only as many chunks as there are local processing resources (on the Mac, use [NS]ProcessInfo.processInfo.activeProcessorCount); let us call that N. Then, whenever a chunk completes, it returns (as discussed), but not before having scheduled, on a serial queue (which I call the reference queue), a request for the “doling out” object to provide a new chunk of work and schedule it. In fact, the initial scheduling is best performed by scheduling N such requests on the serial queue.
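In the abstract, that arrangement might look something like the following sketch; WorkSource, processChunk and run are hypothetical names of mine, not code from the example project:

import Foundation;

// Hypothetical "doling out" object: it alone knows how to cut the job into
// roughly tick-sized chunks, and it is only ever touched from the reference
// (serial) queue, so it needs no lock of its own.
final class WorkSource {
    private var remaining: [Int]; // stand-in for whatever describes pending work

    init(_ chunks: [Int]) {
        remaining = chunks;
    }

    func nextChunk() -> Int? {
        return remaining.isEmpty ? nil : remaining.removeFirst();
    }
}

// Stand-in for roughly one tick's worth of actual processing.
func processChunk(_ chunk: Int) -> Int {
    return chunk &* chunk;
}

func run(_ source: WorkSource, completion: @escaping () -> Void) {
    let referenceQueue = DispatchQueue(label: "reference"); // serial by default
    let workQueue = DispatchQueue.global(qos: .userInitiated);
    let group = DispatchGroup();

    // Only ever runs on the reference queue: dole out one chunk, schedule it.
    func scheduleOneChunk() {
        guard let chunk = source.nextChunk() else {
            return;
        }

        workQueue.async(group: group) {
            _ = processChunk(chunk);
            // Request more work asynchronously; never block inside the task.
            referenceQueue.async(group: group) { scheduleOneChunk(); };
        }
    }

    // Initial scheduling: N requests on the reference queue,
    // N being the number of local processing resources.
    for _ in 0 ..< ProcessInfo.processInfo.activeProcessorCount {
        referenceQueue.async(group: group) { scheduleOneChunk(); };
    }

    group.notify(queue: referenceQueue, execute: completion);
}

(Cancellation then only needs to make nextChunk start answering nil, which can be triggered by scheduling one more block on the reference queue; and in a command-line harness you would keep the process alive, for instance with dispatchMain(), until completion has run.)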

Here is how I apply that to our example:

//
//  Created by Pierre Lebeaupin on 13/12/2020.
//  Copyright © 2020 Pierre Lebeaupin. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//

import Foundation;

enum Op: String {
    case additive = "+", multiplicative = "*";

    func neutralValue() -> UInt32 {
        return ((self == .additive) ? 0 : 1);
    }

    func combine(_ a: UInt32, _ b: UInt32) -> UInt32 {
        return ((self == .additive) ? (a + b) : (a * b));
    }

    func uncombine(_ a: UInt32, _ b: UInt32) -> UInt32 {
        return ((self == .additive) ? (a - b) : (a / b));
    }
}


struct ReadyValue {
    let value: UInt32;
    let op: Op?;
    let description: () -> String;
    // Note that as a closure, it can depend on more than
    // the stored properties, contrary to a computed
    // property.

    init(_ inValue: UInt32) {
        value = inValue;
        op = nil;
        description = { return inValue.description;};
    }

    init(value inValue: UInt32,
         op inOp: Op,
         description indescription: @escaping () -> String) {
        value = inValue;
        op = inOp;
        description = indescription;
    }
}

class ResolveObject {
    let semaphoreWaitingHost: Thread;
    // Using a thread, as the function is going to spend
    // its time blocked in a semaphore, so better directly
    // use a thread than gum up the libdispatch machinery.

    init(_ startGoal: UInt32,
         _ inqueue: DispatchQueue,
         _ incb: @escaping (_ result: String) -> Void,
         _ completionCb: @escaping () -> Void,
         _ primitives: UInt32...) {
        semaphoreWaitingHost = Thread(block:{
            let /* 's */ avoidOvercommitment =
                DispatchSemaphore(value:
                ProcessInfo.processInfo.activeProcessorCount - 1);
            // We take one from the get go

            let completion = DispatchGroup();
            completion.enter();
            // Compensated when the thread exits

            completion.notify(queue: inqueue,
                              execute: completionCb);

            {
                var referenceL = [ReadyValue]();
                var referenceComposedCount = 0;
                var referenceOtherComposedCount = 0;


                for element in primitives {
                    referenceL.append(ReadyValue(element));
                }

                exploreAdditionOfRightNode(&referenceL,
                                           &referenceComposedCount,
                                           &referenceOtherComposedCount,
                                           isParent: true,
                                           kind: .additive);
                exploreAdditionOfRightNode(&referenceL,
                                           &referenceComposedCount,
                                           &referenceOtherComposedCount,
                                           isParent: true,
                                           kind: .multiplicative);
            }(); // https://bugs.swift.org/browse/SR-12243


            func exploreAdditionOfRightNode(_ l: inout [ReadyValue],
                                            _ composedCount: inout Int,
                                            _ otherComposedCount: inout Int,
                                            isParent: Bool,
                                            kind: Op) {
                if (l.count != 0) && (l[l.count - 1].op != kind) {
                    guard otherComposedCount == 0 else {
                        return;
                    }

                    otherComposedCount = composedCount;
                    composedCount = 0;
                }
                defer {
                    if (l.count != 0) && (l[l.count - 1].op != kind) {
                        composedCount = otherComposedCount;
                        otherComposedCount = 0;
                    }
                }

                var referenceDecomposedValue = [false: kind.neutralValue(),
                                                true: kind.neutralValue()];

                iteratePossibleLeftNodesDispatch(&l,
                                                 &composedCount,
                                                 &otherComposedCount,
                                                 &referenceDecomposedValue,
                                                 isParent: isParent,
                                                 kind: kind,
                                                 startingFrom: 0,
                                                 { _ in return;});
            }

            func iteratePossibleLeftNodesDispatch(_ l: inout [ReadyValue],
                                                  _ composedCount: inout Int,
                                                  _ otherComposedCount: inout Int,
                                                  _ decomposedValue: inout [Bool:UInt32],
                                                  isParent: Bool,
                                                  kind: Op,
                                                  startingFrom: Int,
                                                  _ walkOperands: @escaping
                                                    (_ action:
                                                        (_ value: ReadyValue,
                                                         _ reverse: Bool)
                                                        -> Void)
                                                    -> Void) {
                guard isParent else {
                    return iteratePossibleLeftNodes(&l,
                                                    &composedCount,
                                                    &otherComposedCount,
                                                    &decomposedValue,
                                                    isParent: isParent,
                                                    kind: kind,
                                                    startingFrom: startingFrom,
                                                    walkOperands);
                }

                var imminentlyViableOrBetter = false;
                walkOperands({_,_ in imminentlyViableOrBetter = true;});

                let workloadEstimator = l.count + (imminentlyViableOrBetter ? 1 : 0);
                /* Among other properties, this estimator value is monotonic. */

                // 6 may be too many to fit in a tick (1/60th of a second)
                // 4 already means too few possibilities to explore
                // 3 is right out
                if workloadEstimator == 5 {
                    // reseat this divergence over a copy of the whole state
                    var childL = l;
                    var childComposedCount = composedCount;
                    var childOtherComposedCount = otherComposedCount;
                    var childDecomposedValue = decomposedValue;

                    DispatchQueue.global(qos:.userInitiated).async(group: completion) {
                        iteratePossibleLeftNodes(&childL,
                                                 &childComposedCount,
                                                 &childOtherComposedCount,
                                                 &childDecomposedValue,
                                                 isParent: false,
                                                 kind: kind,
                                                 startingFrom: startingFrom,
                                                 walkOperands);

                        avoidOvercommitment.signal();
                    }

                    avoidOvercommitment.wait();
                } else {
                    return iteratePossibleLeftNodes(&l,
                                                    &composedCount,
                                                    &otherComposedCount,
                                                    &decomposedValue,
                                                    isParent: isParent,
                                                    kind: kind,
                                                    startingFrom: startingFrom,
                                                    walkOperands);
                }
            }

            func iteratePossibleLeftNodes(_ l: inout [ReadyValue],
                                          _ composedCount: inout Int,
                                          _ otherComposedCount: inout Int,
                                          _ decomposedValue: inout [Bool:UInt32],
                                          isParent: Bool,
                                          kind: Op,
                                          startingFrom: Int,
                                          _ walkOperands: @escaping
                                            (_ action:
                                                (_ value: ReadyValue,
                                                 _ reverse: Bool)
                                                -> Void)
                                            -> Void) {
                for candidateOffset in
                    startingFrom ..< (l.count - composedCount) {
                    let rightChild = l.remove(at: candidateOffset);
                    if let _ = rightChild.op {
                        otherComposedCount -= 1;
                    }
                    defer {
                        if let _ = rightChild.op {
                            otherComposedCount += 1;
                        }
                        l.insert(rightChild, at: candidateOffset);
                    }

                    for phase in 0...1 {
                        let reverse = (phase == 1);
                        { (_ valueComponent: inout UInt32) in
                            valueComponent = kind.combine(valueComponent,
                                                          rightChild.value);
                        }(&decomposedValue[reverse]!);
                        defer {
                            { (_ valueComponent: inout UInt32) in
                                valueComponent = kind.uncombine(valueComponent,
                                                                rightChild.value);
                            }(&decomposedValue[reverse]!);
                        }

                        let selfNode = {(_ action:
                                            (_ value: ReadyValue,
                                             _ reverse: Bool)
                                            -> Void)
                                         -> Void in
                            action(rightChild, reverse);
                            walkOperands(action);
                        };

                        iteratePossibleLeftNodesDispatch(&l,
                                                         &composedCount,
                                                         &otherComposedCount,
                                                         &decomposedValue,
                                                         isParent: isParent,
                                                         kind: kind,
                                                         startingFrom: candidateOffset,
                                                         selfNode);

                        // close current composition
                        guard ({
                            var num = 0;
                            selfNode({_,_ in num += 1;});
                            return num;
                        }() > 1) && ( (kind == .additive)
                                        ? decomposedValue[false]!
                                           > decomposedValue[true]!
                                        : ((decomposedValue[false]!
                                            % decomposedValue[true]!) == 0) ) else {
                            continue;
                        }

                        let realizedValue = kind.uncombine(decomposedValue[false]!,
                                                           decomposedValue[true]!);
                        let description = { () -> String in
                            var current = "(";
                            selfNode({(_ value: ReadyValue, _ freverse: Bool) -> Void in
                                current += " ";
                                current += (freverse
                                                ? (kind == .additive ? "-" : "/")
                                                : kind.rawValue);
                                current += " ";
                                current += value.description();
                            });
                            current += ")";

                            return current;
                        };

                        guard l.count > 0 else {
                            if realizedValue == startGoal {
                                let solution = description();
                                inqueue.async { incb(solution); };
                            }
                            continue;
                        }

                        composedCount += 1;
                        l.append(ReadyValue(value: realizedValue,
                                            op: kind,
                                            description: description));
                        defer {
                            l.remove(at: l.count - 1);
                            composedCount -= 1;
                        }

                        exploreAdditionOfRightNode(&l,
                                                   &composedCount,
                                                   &otherComposedCount,
                                                   isParent: isParent,
                                                   kind: .additive);
                        exploreAdditionOfRightNode(&l,
                                                   &composedCount,
                                                   &otherComposedCount,
                                                   isParent: isParent,
                                                   kind: .multiplicative);
                    }
                }
            }

            completion.leave();
        });
    }

    func start() {
        semaphoreWaitingHost.start();
    }
}

Cancellation has been left as an exercise for the reader. But the important part is that the bulk of the processing does happen in these tasks sent to Grand Central Dispatch that do work by processing a chunk then returning.

You will note that I couldn’t manage to cut the main function into bits that process chunks in the same way, returning at the end of each chunk: due to the recursion there is no simple way to do that; but let me assure you that I did follow my own rules in my own project. So instead I put that main function on its own thread and throttled it with a semaphore: it is through the semaphore that completed chunks “request” that the central function dispatch a new chunk.

This provides the following benefits:

  • At any given time, there are always at least N bits of work either actively executing or waiting to execute, making sure processing resources can be exploited.
  • Even if there turn out to be fewer available processing resources (other tasks executing, etc.), the small granularity of work chunks means they will be evenly scheduled on the remaining resources, avoiding any lopsided execution, such as spending a large portion of the time with a single task left executing while the other cores remain idle.
  • Cancellation can now be centralized on the “doling out” object and the reference queue: the processing being cancelled simply translates to the “doling out” object refusing to provide more work to schedule, and that can be triggered by scheduling a simple request to the reference queue. As a bonus, the dispatch group object that is normally set up to tell when processing is complete can now instead tell when cancellation is complete.
  • There is now no risk of thread explosion, as libdispatch (the library that implements Grand Central Dispatch, or GCD) is “aware” of the bottleneck: if the tasks instead synchronously scheduled work on the serial queue, they would risk blocking waiting for the kernel, leading libdispatch to spawn more threads to keep processing resources busy.

…Oh, yes, I’d forgotten to tell you about the most important commandment:

  • Do not, under any circumstance, synchronously wait from within a scheduled task (any kind of wait: select, dispatch_sync, sem_wait, pthread_mutex_lock, even read(2), even on a local filesystem) unless you know for a fact what you’re waiting on can arbitrarily scale. Not on penalty of you inheriting the lack of scaling: on penalty of thread explosion.

This is sufficiently important to deserve further development.

~ end of chapter 1 ~

The trouble with libdispatch

blocking on the kernel

You probably ought to get up to speed with a proper computer science curriculum if that’s not already the case, but for the purposes of this post I’ll give you the CliffsNotes version.

On a reasonably modern multitasking OS (which includes everything less obsolete than MacOS 9), your program is given direct access to the processor core to perform its own computations on its own data structures, but any other activity has to go through the kernel. This includes access to shared resources such as memory allocation, filesystem access, interprocess I/O (including process management), peripheral I/O including mouse, keyboard, and screen or touchscreen, other I/O such as the network, etc. In 99% of cases, the kernel provides the service in a way that resembles a subroutine call: it is given control, and processing resumes at the point where you gave it control once its job is done.

In some cases, the kernel call actually does behave like a subroutine call: after switching to the kernel environment (its own protected memory, in particular), it will perform its own processing while still running off the same core, before transferring control back to your program (though not before having exited the kernel environment).

But that is the exception.

In the general case, the kernel may need more than just processor time to fulfill your request. The typical example is reading from a file, where the kernel will try to fetch the information from cache, and if it can’t, will have to request it from the physical drive by sending it a request. And then it takes no further action: the physical drive will raise an interrupt once the data is ready. This was originally the case for spinning drives; it is still the case today, even for SSDs, and even for NVMe (flash accessed through PCIe)! So it would be pointless for the kernel to twiddle its thumbs on the processor in the meantime; instead, it will mark the thread as “not ready to execute”, or more simply “blocked”, and enter the scheduler as if the thread’s time slice were up, to give another thread the opportunity to execute. If another thread is available to run, this allows the processor core to keep performing useful work; otherwise, well, we tried.

Blocking can also happen when writing to a file: think of when write buffers are full and the OS needs your program to slow down its writing. It obviously happens when waiting on (and possibly sending) interprocess I/O, on peripheral I/O, and most famously on network I/O. But it can also happen when taking a lock: your thread may need to wait in the kernel in the contended case.

The important thing to note is that a blocked thread still takes up all the structures of a thread (stack memory along with its virtual memory address range, in particular, but also a slot in the scheduler data structures) while simultaneously being unable to contribute work to a processor core. So for heavily I/O-based workloads you can imagine the overhead of blocked threads becoming prohibitive, and it would, if ways to improve the situation hadn’t been developed while still keeping the base model. The most famous of these is select (and its refinements: poll, epoll, kevent/kqueue, etc.), which takes multiple I/O objects and multiplexes any blocking over them on a single thread: as soon as any of these objects (called file descriptors, even if they are for non-file I/O) would be done blocking, the select call returns, allowing the operation to be performed on that object and to succeed immediately. You can even build great asynchronous APIs above that, and thus offload the burden of blocking on network events to a single background thread in your process.

libdispatch as a feedback loop

That being established, we can now model the transformations that get you from raw tasks to useful work being performed.

At the end you have the desired production: useful work being performed. Yes, we’ll start from the end.

[Figure: circles on four conveyor belts, meant to evoke the output of four assembly lines]

That is produced by the third stage: execution proper. Prior to that stage, you only have a bunch of threads in various states, depending on the execution time they previously got, but also on the makeup of the code in each thread. The execution stage is mostly characterized by the performance and number of the cores.

[Figure: thick wavy lines, meant to evoke literal threads]

Since we’re focusing on libdispatch, that bunch of threads is, in turn, the product of the second stage: task launching.

[Figure: a sequence of squares with a large arrow pointing to the rightmost one, meant to evoke a priority queue]

This is where the tasks are fired off, and this stage is fed from data structures that together amount to a priority queue of tasks. This stage has a dial/input port for the number of threads, which is its main characteristic.

Finally, the priority queue of tasks is produced by the first stage: dispatch proper. It is fed raw tasks in the way you defined them for your processing and submitted them to dispatch_async().

[Figure: squares reading "^{}" floating around, meant to evoke C language blocks]

Out of these, task launching is pretty dumb. Don’t get me wrong, there are plenty of clever algorithms, lock-free data structures, and other such engineering to minimize overhead and scale well; but for the purposes of our model it consists of picking up the top item from a priority queue whenever needed.

Where libdispatch starts displaying interesting behavior is in the feedback/control of that stage. Indeed, libdispatch also has a feedback stage, which looks at core occupation (as a proxy for the amount of useful work being performed), and if it falls short of the goal, increases the number of threads in the task launching stage: that “number of threads” input is driven by the feedback stage.

[Figure: all of the above, plus a feedback stage that goes from the execution stage to the launching stage]

The goal being, of course, all cores 100% occupied. Decreasing the number of threads is not perfectly symmetrical, if only because you can’t have more core occupation than there are cores in the system, but the principle is the same. And there are subtleties: the number of threads won’t increase if there is no ready task left in the priority queue, for instance.

But on the whole, this can be analyzed with the tools of control theory. And this is where libdispatch breaks down.

where does libdispatch go wrong?

The fundamental assumption of this feedback system is that increasing the number of threads will increase core occupation, as long as there are tasks ready to be dequeued. And I have to admit it’s rare (but possible) for additional threads to end up decreasing core occupation. But there are many common ways for additional threads to instead have no or marginal impact on processor occupation.

Imagine for instance that the tasks being scheduled all begin by attempting to acquire the same mutex, and perform all their processing while holding it. Obviously, on a dual-core machine you will never get more than 50% core occupation, since only a single task ever gets to run at any given time, the others being stuck in the kernel, excluded from the code by the mutex. That, of course, is contrived, but imagine code that frequently accesses a common data structure that is very coarsely locked: much of the processing time of each task could end up having to occur while under the lock, and since other tasks are excluded from taking the lock at the same time, any new thread will not help much with core occupancy since that bottleneck remains. This is already much more realistic: coarse locking is common when creating the first iteration of thread-safe data structures, and significant reengineering may be required to obtain finer locking.

But wait, there’s worse.

In such a simple scenario (a constant fraction of each task’s processing time having to occur under a common bottleneck), it can be proven that there is a maximum core occupation that can be reached; it can never be higher (that value, interestingly, is the inverse of that fraction: if a quarter of each task’s time must be spent under the lock, occupation can never exceed the equivalent of four cores, no matter how many threads are added). So it’s not just that additional threads help less while still helping: the first threads get you to that maximum, and additional threads after that do not help at all. So if libdispatch is not satisfied with core occupation even at that maximum, it will create new threads in vain since they won’t improve the situation, but it will keep doing so anyway, until an internal maximum is reached. And you get thread explosion.

But wait, there’s worse.

Even in more realistic setups (finer or hierarchical locking, or blocking on external limitations such as disk IOPS or bandwidth rather than just locks, etc.) there is almost always such a maximum for processor parallelism, which is what constitutes your system’s ability to scale. So it’s not just that additional threads will bring increasingly diminishing returns on core occupation, it’s that this core occupation will tend towards a limit, and if that limit (which libdispatch is not aware of) is not to the satisfaction of libdispatch, you will get thread explosion by the same logic as above.

But wait, there’s worse.

You could have customers report thread explosion even though you never observed it on your machine, and upon closer examination realize this is because their hardware has more cores than yours: on their hardware, libdispatch desperately tries to fill up these cores while on yours libdispatch is satisfied, because it turns out your system’s ability to scale is sufficient to occupy all cores on your machine, but insufficient to occupy all cores on your customer’s machine. The result? You will have to acquire hardware at least as wide (in terms of number of cores if nothing else) as any of your customers’ in order to reproduce their thread explosion problems. And stay on top of your customers as new hardware gets released.

But wait, there’s worse.

Your customer (or you) could reproduce thread explosions at some point, then fail to reproduce the issue, only because of slightly different conditions: other tasks running in the background (which interestingly enough would decrease the odds of the issue occurring), or different filesystem performance because of different disk occupation, or different network server performance! With libdispatch if you aren’t careful you get exposed to numerous potential sources of non-reproducibility. And the only thing worse than a showstopper bug is a hard-to-reproduce showstopper bug.

Here’s how I view Grand Central Dispatch as a result. It’s akin to a manager that claims not to micromanage you, but indirectly forces you to stay at your desk: if you don’t, GCD will put someone else to work at your desk in your absence, and when you come back, everything will be messed up because these computers don’t support multiple user sessions. So you take it as a hint to instead sleep at your desk when you’re waiting to hear from the outside to resume working, but in that case GCD becomes frantic with worry that people aren’t looking busy enough, and tries to acquire additional desks, floor space, whole buildings, and hire the corresponding workers, in an attempt to reach a threshold in the number of visibly busy workers. With the issues that entails: more difficulty reaching your desk, natural resources and land use, etc.

And you know how we call such a person? We call them douchebags.

~ end of chapter 2 ~

Dealing with libdispatch

Now that we know that Grand Central Dispatch is a douchebag, how do we work with it anyway? I haven’t found any better way to avoid these scenarios than by not blocking in the kernel unless you know for a fact that doing so will arbitrarily scale; and consequently, you need only launch a limited number of tasks: with rare exceptions you will not need libdispatch to schedule an unpredictable number of them to reach full core occupation. Any bottleneck, then, has instead to be expressed as tasks on a serial queue that are never waited on by dispatch_sync, but instead work asynchronously: they schedule a new task, as if calling a callback, once they are done (more on that later).
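Concretely, such a serialized bottleneck might look like the following minimal sketch; HitCounter and its method names are hypothetical, not an existing API. Callers never wait on the guarding queue: they hand over a completion, along with the queue it should be scheduled on, and get called back once the serialized work is done.

import Foundation;

// State guarded by a serial queue instead of a mutex: only code running on
// guardQueue ever touches the counter, and nothing ever waits on that queue.
final class HitCounter {
    private let guardQueue = DispatchQueue(label: "hitcounter.guard"); // serial
    private var hits = 0;

    func record(then queue: DispatchQueue,
                _ completion: @escaping (Int) -> Void) {
        guardQueue.async {
            self.hits += 1;
            let snapshot = self.hits;
            // Continue asynchronously, as if calling a callback, rather than
            // having the caller dispatch_sync onto guardQueue and block.
            queue.async { completion(snapshot); };
        }
    }
}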

But how does this translate into what you can and can’t call from a queue? It’s time for a round of “Will It Scale?”:

Allocating memory (big and small)
yes
Nowadays memory allocators have gotten pretty smart, in particular maintaining per-thread pools of memory to allocate from, so as to reduce contention to a minimum. Even if a round trip to the kernel is necessary, such as for big allocations, the kernel too is meant to scale well in this instance.

In fact, memory allocation nowadays has to scale since multithreading libraries rely on it in the first place as part of most operations (e.g. memory allocation occurs as part of Block_copy(), in the case of libdispatch).

Making non-blocking calls on a socket
yes
Note that it would be mostly pointless to make such a call without any indication that the socket is ready for it (e.g. read(2) when no data has been received), so it only makes sense to do this from a function registered to be called when the socket is ready for this operation. By “socket” I mean any file descriptor that is not a regular file or block device; that includes sockets in local domains (AF_UNIX), TTYs, pipes, etc.

Making non-blocking calls to libdispatch
yes
You kind of have to make such calls at some point, e.g. dispatch_async to access data protected by a serial queue.

Making non-blocking async-signal safe calls
yes
These include posting to a semaphore for instance.

Making synchronous calls to access files that could reside on a remote filesystem
no
Regardless of how it is performed, regardless of whether it is for reading or writing, accessing a remote file not only causes blocking in the kernel but also limits how busy your program can be to what the network and the remote server can provide (or absorb, in case you’re writing to it); and if libdispatch catches your code being less busy than the available cores allow, thread explosion will result. Usage of non-blocking flags is pointless: they have no effect for regular files or block devices, even when located on networked filesystems.

Making synchronous calls to access files known to be local
no
If your code is at the point where you are multithreading it, then even the fastest mass storage interface won’t be able to keep up with your code running on all processor cores, so these calls will prevent you from scaling up to that point, and thread explosion will result.

I know this because I tested it: this project reads files as fast as it can while still processing every single byte, and you can configure the number of tasks that are allowed to be simultaneously in flight (note: if you use an M1 Mac as your daily driver, I welcome any patch to add native ARM support). On my 4-core (2 physical ones) machine with integrated flash storage, there are benefits to having more than 4 threads, and even more than 8, thus showing the impressive capabilities of the flash controller, enabled by the NVMe interface. However, performance won’t improve past 12 or so threads, but that doesn’t matter to libdispatch: if I allow 16 in-flight tasks, it will dutifully create 16 threads; if I allow 32, 32 threads will be created for still no performance improvement; and it seems libdispatch will get up to 64 threads or so before backing off, at which point performance has actually degraded from the 32-thread case.

Making synchronous calls to browse a local filesystem
no
You can’t count on that to scale, either. And with libdispatch, “not scaling with symptoms being threads blocking in the kernel” translates to “thread explosion”.

Making blocking calls on a socket
heck no
Synchronous networking is bad already, do I need to explain how worse it is when done from a dispatch queue?

Acquiring a mutex
scaling now becomes your responsibility, on penalty of thread explosion if you fail. Are you sure you want it?
The very principle of mutual exclusion means the chunks of your tasks under the lock are serialized, and this limits how your tasks can scale. That limit may be unattainable in practice (e.g. allow for thousand-fold parallelism), but you won’t know that limit until you reach it. And a mutex works by blocking your thread in the kernel until the light is green, as opposed to a serial queue (as long as you never call dispatch_sync on the latter, of course).

So you can’t rule out your mutex being responsible for thread explosion, short of owning the most parallel machine in the world to perform your tests on. Unless your intent was to ensure your ability to scale being always sufficient by way of a formal proof? In that case, don’t forget to update your proof whenever you make a change, no matter how insignificant. Easy.

Calling purely computational libraries
yes
These include system libraries such as Accelerate.

Calling into other third-party code
no
You have no idea of what it could be doing, so you can’t exclude it calling one of the kernel services that don’t scale at some point. In the end, you’d expose yourself to thread explosion.

Calling into other system libraries
no
Same as for other third-party code: you have no idea of what it could be doing. For instance, typesetting can easily become the biggest consumer of CPU time of an iOS app, so why not try and run these jobs concurrently? Unfortunately, what do you think happens when your string contains a character outside the repertoire of the font you’ve specified? Font substitution kicks in so that character can be rendered, which means loading the font from storage, and since Core Text APIs are synchronous, this loading is going to happen synchronously, too (I have seen this take on the order of 600 ms, though that was a long time ago). Congratulations, you are now exposed to thread explosion issues.

So you might be wondering, wait, what is libdispatch good for, anyway? And that is a very good question. Here are the situations where I would reach for it, from most suitable to least.

  • Where it shines most of all is for code you can factor as a factory of micro-batches (again, about one tick of processing time) that can be executed independently, where the inside of micro-batches is computational but not regular at all (involving data structures, allocations, branches, etc.), such that you’d have a hard time parallelizing using other methods, such as with vector-oriented APIs.
  • Since it can do that, it is also good at more regular cases: big vectors, striped image processing, some cases of offline audio or video processing, etc. (see the sketch below).
  • Combined with Dispatch I/O (more on that in a bit), it’s also good at file processing, though you won’t be able to reuse libraries that access the file formats synchronously, at least not from dispatched tasks, plus you have to contend with asynchronous code (more on that in a bit).
  • The same way, combined with Dispatch Source (more on that in a bit) it’s also good at the initial request processing of a server or daemon, though again you’ll probably have to write that from scratch.
  • And in theory it’s going to be suitable for filesystem indexing (thumbnail generation for instance), though one difficulty would be obtaining directory listings solely using Dispatch I/O.

(But in these cases, why not use threads directly? First, to avoid the overhead of spawning a thread per such micro-batch, and second, to share threads with other such activity in the same process while still optimally exploiting processing resources)
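For the more regular cases in that list, the simplest entry point is often DispatchQueue.concurrentPerform, which runs the iterations on the existing worker threads rather than spawning one per stripe. A minimal sketch, where the buffer, the stripe size and the brightness computation are all made up for illustration:

import Foundation;

let pixels = [UInt8](repeating: 128, count: 4_000_000); // stand-in for image data
let stripeCount = ProcessInfo.processInfo.activeProcessorCount * 4;
let stripeLength = (pixels.count + stripeCount - 1) / stripeCount;
var stripeSums = [Int](repeating: 0, count: stripeCount);

stripeSums.withUnsafeMutableBufferPointer { sums in
    DispatchQueue.concurrentPerform(iterations: stripeCount) { stripe in
        // Each iteration is a regular, purely computational chunk.
        let start = min(stripe * stripeLength, pixels.count);
        let end = min(start + stripeLength, pixels.count);
        var total = 0;

        for i in start ..< end {
            total += Int(pixels[i]);
        }

        sums[stripe] = total; // each stripe writes only to its own slot
    }
};

print("total brightness: \(stripeSums.reduce(0, +))");

Note that concurrentPerform does not return until every iteration has completed, so it is best called from code that is allowed to wait (your own thread, say) rather than from within a dispatched task.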

~ end of chapter 3 ~

Working asynchronously

The designers of libdispatch did much the same analysis and reached the same conclusions as to the suitability of kernel entry points for being called from within dispatched tasks, and that is why they created Dispatch I/O, for instance, since without it you would be left with crap when it comes to asynchronous file APIs on Mac OS X (interestingly, classic MacOS did have first-class support for asynchronous filesystem I/O, owing to the fact it originally had to run off of floppies, but that neatly applied to networked filesystems as well).
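As a sketch of what using it looks like (the path and the handling here are placeholders of mine, not from the example project): the read is handed to the system as a whole, and our handler only runs, on the queue we designate, once the data is available, so no worker thread ever blocks on it.

import Foundation;

let fd = open("/tmp/example.dat", O_RDONLY); // hypothetical file
precondition(fd >= 0);

let ioQueue = DispatchQueue(label: "file.reader"); // where the handler will run

DispatchIO.read(fromFileDescriptor: fd,
                maxLength: 1 << 20,
                runningHandlerOn: ioQueue) { data, error in
    defer { close(fd); }

    guard error == 0 else {
        return;
    }

    // data is a DispatchData containing whatever could be read.
    print("read \(data.count) bytes without blocking any worker thread");
}

dispatchMain(); // command-line sketch: keep the process alive for the handler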

By the same token, they created Dispatch Source to replace any select call: Dispatch Source allows you to make a non-blocking call to a socket from within your dispatched task with the confidence that the socket is ready for it: the only requirement is that your task be launched from the dispatch source.
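Here is the shape of that, as a sketch under the assumption that sock is a connected, non-blocking socket (the watch function and its handling are mine, not a given API): the event handler is only launched when the socket is readable, so the read(2) inside it returns immediately.

import Foundation;

func watch(_ sock: Int32, on queue: DispatchQueue) -> DispatchSourceRead {
    let source = DispatchSource.makeReadSource(fileDescriptor: sock,
                                               queue: queue);

    source.setEventHandler {
        // Only runs when the socket has data, so this read(2) does not block.
        var buffer = [UInt8](repeating: 0, count: 4096);
        let count = read(sock, &buffer, buffer.count);

        guard count > 0 else { // 0 is end of stream; negative is an error
            source.cancel();
            return;
        }

        // ... hand buffer[0 ..< count] over to tick-sized processing chunks ...
    }

    source.setCancelHandler {
        close(sock);
    }

    source.resume(); // sources start out suspended
    return source;
}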

As for mutexes, they propose serial queues as an alternative for mutual exclusion, with the intent that the task you submit to that queue itself submits a callback function to run once the transaction is done.

All of these work asynchronously: they schedule a new task, as if calling a callback, once they are done. This way of working has two main drawbacks; let us see them in order.

poor language support

Factoring the continuation of your work as a new function to be provided to an asynchronous API requires you to explicitly move your state off of the stack into some sort of object in order to be able to recover it. This is, it has to be said, a pain in the ass. Even the addition of blocks to the C language has only alleviated this issue a little: in the best case, you get arrow code; in the worst case, code that is error-prone and impossible to follow.

As that last link shows, this is set to improve soonish in Swift, but at the time of this writing it is still experimental (e.g. I haven’t managed to get my example processing to work with it). In the meantime, nothing insurmountable as long as you don’t have too much code that needs to work that way; so my advice for the poor language support issue would be: focus on the small chunks of processing that would most benefit from parallelism; everything else should be run the traditional way: in the thread you already have, sequentially, using locks and synchronous calls, etc.
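To picture the “arrow code” shape mentioned above, here is a sketch where step1, step2 and step3 stand in for hypothetical asynchronous APIs (stubbed out only so the example is self-contained):

import Foundation;

func step1(_ completion: @escaping (Int) -> Void) {
    DispatchQueue.global().async { completion(6); };
}

func step2(_ input: Int, _ completion: @escaping (Int) -> Void) {
    DispatchQueue.global().async { completion(input * 7); };
}

func step3(_ input: Int, _ completion: @escaping (String) -> Void) {
    DispatchQueue.global().async { completion("result: \(input)"); };
}

func pipeline(_ done: @escaping (String) -> Void) {
    // Three sequential steps, and the "arrow" is already forming: every value
    // that used to live on the stack now rides along inside the closures.
    step1 { first in
        step2(first) { second in
            step3(second) { text in
                done(text);
            }
        }
    }
}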

the fake dilemma

Note that these asynchronous libdispatch APIs take a queue, which is where the continuation task will be scheduled; the intent being that you will pass the queue you are running on, so the continuation will run under the same conditions.

If that queue is a concurrent queue, no problem. You weren’t counting on it to protect the consistency of your state anyway, so it’s not a problem to leave your state as-is until you get called back.

If that queue is a serial queue, however, you have a real problem: as soon as your task has “returned” after calling the asynchronous API, there is nothing that prevents another unrelated task on the serial queue from launching, and finding the state not as it was when you usually start a task, but as you left it at the point you needed to call the async API.

So that leaves you with a rotten choice: either keep the synchronous call instead and risk thread explosion, or factor your code so your state is consistent at the point you hand over to the asynchronous API. But, wait. I don’t call that a choice: I call that a fake dilemma. Presumably, you were using the serial queue for the purpose of state protection, and if that queue can’t fulfill its purpose, then the problem is with the queue. I haven’t found any way to “reserve” the queue until the completion has run; that does not seem to be possible with libdispatch. There is no simple solution here, to put it bluntly. If you have few enough serial queues in that situation, then I give you special dispensation to perform blocking calls from them, but in compensation every single such serial queue you create has to be considered equivalent to creating a thread, resource-wise.

(You will note that in my example I directly used a thread instead, but that is because the only inter-task communication mechanism I needed in that simple example was a semaphore; in the general case, better standardize on dispatch queues so you can just use dispatch_async for everything.)

If there are too many such serial queues for this to be reasonable, you can also try reworking your code to have a consistent state at any point you make an asynchronous call. Unfortunately, if that is too complex, then you’re out of luck: it is best to sidestep Grand Central Dispatch altogether.

case study: having a consistent state whenever a serial queue is relinquished

As an aside in this discussion, let us see how it could work in practice. Suppose you have some code that requires serialization, and currently works synchronously. By way of example, I will take this code where a value is lazily loaded from a file (synchronous operation) at the first request, and kept around to satisfy future requests. As a corollary, it is necessary to protect this state management with some serialization, but this happens automatically in the single-threaded, synchronous case:

//
//  ObjectCache.swift
//  AsyncObjectCache
//
//  Created by Pierre Lebeaupin on 10/02/2021.
//

import Foundation

class ObjectCache {
    class Slot {
        var data: Double?;

        init() {
            data = nil;
        }

        func getData(_ basePath: String,
                     _ index: UInt32) -> Double {
            if let solidData = data {
                return solidData; // our work here is done.
            }

            // Do the dirty work.
            let path = basePath.appendingFormat("/%lu.sprite", index);

            path.utf8CString.withUnsafeBufferPointer { ptr in
                let fd = open(ptr.baseAddress!, O_RDONLY);

                guard fd >= 0 else {
                    fatalError();
                }

                lseek(fd, off_t(8), SEEK_SET);

                // allocate the 8 bytes we are about to read into
                var accum = ContiguousArray<Int8>(repeating: 0, count: 8);

                accum.withUnsafeMutableBufferPointer { mPtr in
                    guard read(fd, mPtr.baseAddress!, 8) == 8 else {
                        fatalError();
                    }
                };

                close(fd);

                var intermediate = UInt64(0);

                for byte in accum {
                    // decode from big-endian format
                    intermediate = (intermediate << 8) + UInt64(UInt8(bitPattern: byte));
                }

                // Alleluia!
                data = Double(bitPattern: intermediate);
            };

            return data!;
        }
    }


    let base: String;
    let slots: Array<Slot>;

    init(base inbase: String, count: UInt32) {
        base = inbase;
        var protoslots = Array<Slot>();
        protoslots.reserveCapacity(Int(count));

        for _ in 0 ..< count {
            protoslots.append(Slot());
        }

        slots = protoslots;
    }

    func getData(_ index: UInt32) -> Double {
        return slots[Int(index)].getData(base, index);
    }
}

Except for the use of Unix filesystem APIs, this seems straightforward enough. So how can we make it work asynchronously, so as to be safely callable from libdispatch tasks? Indeed, even if the synchronous filesystem call happens only once per cache slot, that is enough for the first phase of processing to be dominated by filesystem activity, and therefore enough for thread explosion to happen. The feedback mechanism may not even get around to cleaning up these threads once the processing settles into a compute-bound phase! To avoid that, the name of the game here is to explicitly mark when you’re in a transition state, and if so, hold off incoming requests:

//
//  AsyncObjectCache.swift
//  AsyncObjectCache
//
//  Created by Pierre Lebeaupin on 10/02/2021.
//

import Foundation

class AsyncObjectCache {
    class Slot {
        struct Functor {
            // silly struct just so I can make a
            // recursive function definition
            var invoke: (_ data: Double) -> Functor?;
        }

        var data: Double?;
        var insertNext: ((_ functor: Functor) -> Void)?;
        let q: DispatchQueue;
        var initialFunctor: Functor?;
        var launched: Bool;

        init() {
            q = DispatchQueue(label: "AsyncObjectCache slot protection");
            launched = false;

            insertNext = { [self] functor in
                initialFunctor = functor;
            }
            // Note that at this point there is a strong reference cycle
            // between the class instance and the closure; I see no way
            // around that.
            // The manual cleanup of the line of held-off tasks, through
            // initialFunctor and insertNext, ought to be enough to resolve
            // the situation.
        }

        func asyncGetData(_ basePath: String,
                          _ index: UInt32,
                          _ continuationQueue: DispatchQueue,
                          _ continuation: @escaping (_ data: Double) -> Void) {
            q.async { [self] in
                if let solidData = data {
                    continuationQueue.async {
                        continuation(solidData);
                    };
                    return; // our work here is done.
                }

                // TODO: move allocation of closures off of the
                // critical section to improve scalability?
                var next: Functor?

                insertNext!(Functor(invoke: { incomingData in
                    continuationQueue.async {
                        continuation(incomingData);
                    };

                    return next;
                }));

                insertNext = { functor in
                    next = functor;
                };

                guard !launched else {
                    return;
                }
                launched = true;

                // Do the dirty work.
                let path = basePath.appendingFormat("/%lu.sprite", index);

                path.utf8CString.withUnsafeBufferPointer { ptr in
                    let querier = DispatchIO(type: .random,
                                             path: ptr.baseAddress!,
                                             oflag: O_RDONLY,
                                             mode: 0,
                                             queue: q,
                                             cleanupHandler: {
                                                error in guard error == 0 else {
                                                    fatalError();
                                                }
                                             });

                    var accum = DispatchData.empty;

                    // At this point, launched is true, so newly
                    // launched tasks on our queue that are not the
                    // following one will not enter this clause:
                    // our state is consistent at this point, which
                    // is necessary for us to safely relinquish q.
                    querier!.read(offset: 8, length: 8, queue: q) {
                        (done, rawData, error)
                        in
                        guard error == 0 else {
                            fatalError();
                        }

                        accum.append(rawData!);

                        guard done else {
                            return;
                        }

                        var intermediate = UInt64(0);

                        for byte in accum {
                            // decode from big-endian format
                            intermediate = (intermediate << 8) + UInt64(byte);
                        }

                        // Alleluia!
                        let incomingData = Double(bitPattern: intermediate);
                        var current = initialFunctor;
                        while let solidCurrent = current {
                            current = solidCurrent.invoke(incomingData);
                        }

                        // clean up after ourselves
                        data = incomingData;
                        insertNext = nil;
                        initialFunctor = nil;
                    };
                };
            };
        }
    }


    let base: String;
    let slots: Array<Slot>;

    init(base inbase: String, count: UInt32) {
        base = inbase;
        var protoslots = Array<Slot>();
        protoslots.reserveCapacity(Int(count));

        for _ in 0 ..< count {
            protoslots.append(Slot());
        }

        slots = protoslots;
    }

    func asyncGetData(_ index: UInt32,
                      _ continuationQueue: DispatchQueue,
                      _ continuation: @escaping (_ data: Double) -> Void) {
        slots[Int(index)].asyncGetData(base, index, continuationQueue, continuation);
    }
}

Not only is this more complex, it is also pretty error-prone, between remembering to enqueue the tasks that are held off and the temporary strong reference cycle that must be broken manually. Not to mention the temptations to cut corners: calling the held-off tasks in backwards order and/or doing so recursively would make the code easier, but could easily trip you up at some point (e.g. imagine having to fire off a hundred held-off tasks).

So I don’t think such an operation is viable except for similarly simple cases.

~ end of chapter 4 ~

Future directions

Can Grand Central Dispatch be fixed? What can be learned of it for the purposes of designing concurrent APIs? Here are my humble contributions.

generic thread pool

What if you could have a thread pool that would also be useful for at least some blocking operations? Indeed, asynchronous code brings unavoidable complexity, with more opportunities for bugs, inefficiencies, etc. In the case of local filesystems, you can imagine a way: the kernel is aware of how busy local storage is; or at the very least, it knows when it starts being saturated: either when dirty data grows for writes, or when requests start having to be queued instead of being able to be issued immediately for reads. That information could then be exposed in some way to a thread pool, which could then use that as a basis for the feedback mechanism for increasing or decreasing the number of threads, just like libdispatch uses processor occupation as a basis for the same purpose.

I don’t think we will see that added to libdispatch any time soon, though. For one, there can be multiple local drives, each hosting an independent filesystem, so you would need tasks to somehow declare which filesystems they would be accessing, and how; but the API would still eventually map these tasks to the same thread pool. And internally, the launching stage would need to pick the next task to execute depending on which is busier: the processors, or the local drives (and which one), which is dynamic information, as opposed to just picking the head of the priority queue.

Architecturally and API-wise, this would mean a very different beast. So if you need a thread pool for local filesystem access, better roll your own.
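
In case it helps, here is a deliberately naive sketch of what rolling your own could look like: a fixed number of dedicated threads (sized by hand, with no shutdown or error handling) that are allowed to block on filesystem calls:

import Foundation

class BlockingIOPool {
    private let condition = NSCondition();
    private var pending = [() -> Void]();

    init(threadCount: Int) {
        for _ in 0 ..< threadCount {
            // dedicated threads: blocking in read()/write() here starves no
            // one else, since the pool size was chosen for that workload
            let worker = Thread { [self] in
                while true {
                    condition.lock();
                    while pending.isEmpty {
                        condition.wait();
                    }
                    let work = pending.removeFirst();
                    condition.unlock();

                    work();
                }
            };
            worker.start();
        }
    }

    func submit(_ work: @escaping () -> Void) {
        condition.lock();
        pending.append(work);
        condition.signal();
        condition.unlock();
    }
}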

I will note that, in theory, the existing feedback system of libdispatch could use more advanced control functions, which could end up detecting when adding a thread has no effect on core occupation, without having to obtain information about local storage. Such control functions do exist, but they take a long time to converge, which would be fine for batch processing but is completely incompatible with an API meant for use on interactive systems.

(As for creating threads to deal with network access, Java famously explored that, and it turned out not to scale remotely well, since you need one thread per connection; and you can’t add a feedback mechanism similar to that of libdispatch to limit the number of threads in the pool: how would you know how busy the remote device or devices are? Forget about it: I am under the impression this is an open research problem.)

solutions for poor language support

Swift is going to start providing solutions to this soonish, but I think they don’t go far enough. Let us see that with our example.

Some parts of the refactor will forever remain my responsibility, first among them the need to roughly estimate workload for a potential task so as to keep each of them under a tick.

But some parts ought to be managed by the language, with the help of the API: I shouldn’t need to limit the number of dispatched tasks myself. Either the API should allow me to un-dispatch tasks as long as they haven’t been launched (admittedly, easier said than done), or it should run my tasks in such a way that code that ends up dispatching a new task is not run unless that new task can reasonably be expected to launch soon. Otherwise, you risk accumulating tasks that are not launched, but whose launching cannot be cancelled either, adding to cancellation latency; and limiting the number of in-flight tasks (whether dispatched or launched) to [NS]ProcessInfo.processInfo.activeProcessorCount is insufficient in the case where your tasks have to contend for cores with other system activity.
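
For reference, here is a sketch of the kind of manual bookkeeping I mean (the LimitedDispatcher type is mine, for illustration): a counting semaphore holds back the submitter once the cap is reached, which is exactly the job I would rather the language and API handled:

import Foundation

class LimitedDispatcher {
    private let gate: DispatchSemaphore;
    private let workQueue = DispatchQueue(label: "work", attributes: .concurrent);

    init(limit: Int = ProcessInfo.processInfo.activeProcessorCount) {
        gate = DispatchSemaphore(value: limit);
    }

    func dispatch(_ task: @escaping () -> Void) {
        gate.wait(); // blocks the submitting thread once the cap is reached
        workQueue.async { [self] in
            defer { gate.signal(); }
            task();
        };
    }
}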

Another part that the language ought to take care of is state duplication: it is obviously unsafe by default to access the same state from independent tasks. Swift closures, like blocks in C, help in that regard, but not enough: variables are captured by reference, not value. So you end up having to copy them yourself, as can be seen in our example, and even that may not be sufficient (more on that later).
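
Here is a trivial illustration (the names are mine) of capture by reference, and of the explicit copy you end up making by hand:

import Foundation

func captureDemo() {
    var progress = 0;

    DispatchQueue.global().async {
        // captured by reference: this read races with the increment below
        print("shared:", progress);
    };

    let snapshot = progress; // the explicit copy you end up making yourself
    DispatchQueue.global().async {
        print("copied:", snapshot); // always the value at submission time
    };

    progress += 1;
}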

So I propose a construct that provides the basis to solve both issues: await fork. Here is how it would be added to our example processing:

//
//  Created by Pierre Lebeaupin on 13/12/2020.
//  Copyright © 2020 Pierre Lebeaupin. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//

import Foundation;

enum Op: String {
    case additive = "+", multiplicative = "*";

    func neutralValue() -> UInt32 {
        return ((self == .additive) ? 0 : 1);
    }

    func combine(_ a: UInt32, _ b: UInt32) -> UInt32 {
        return ((self == .additive) ? (a + b) : (a * b));
    }

    func uncombine(_ a: UInt32, _ b: UInt32) -> UInt32 {
        return ((self == .additive) ? (a - b) : (a / b));
    }
}


struct ReadyValue {
    let value: UInt32;
    let op: Op?;
    let description: () -> String;
    // Note that as a closure, it can depend on more than
    // the stored properties, contrary to a computed
    // property.

    init(_ inValue: UInt32) {
        value = inValue;
        op = nil;
        description = { return inValue.description;};
    }

    init(value inValue: UInt32,
         op inOp: Op,
         description indescription: @escaping () -> String) {
        value = inValue;
        op = inOp;
        description = indescription;
    }
}

func resolve(_ startGoal: UInt32,
             _ queue: DispatchQueue,
             _ incb: @escaping (_ result: String) -> Void,
             _ primitives: UInt32...)
{
    var l = [ReadyValue]();
    var composedCount = 0;
    var otherComposedCount = 0;

    var isParent = true;

    for element in primitives {
        l.append(ReadyValue(element));
    }

    {
    await exploreAdditionOfRightNode(kind: .additive);
    await exploreAdditionOfRightNode(kind: .multiplicative);
    }(); // https://bugs.swift.org/browse/SR-12243

    func exploreAdditionOfRightNode(kind: Op) async {
        if (l.count != 0) && (l[l.count - 1].op != kind) {
            guard otherComposedCount == 0 else {
                return;
            }

            otherComposedCount = composedCount;
            composedCount = 0;
        }
        defer {
            if (l.count != 0) && (l[l.count - 1].op != kind) {
                composedCount = otherComposedCount;
                otherComposedCount = 0;
            }
        }

        var decomposedValue = [false: kind.neutralValue(),
                               true: kind.neutralValue()];

        {
        await iteratePossibleLeftNodes(startingFrom: 0, { _ in return;});
        }(); // https://bugs.swift.org/browse/SR-12243

        func iteratePossibleLeftNodes(startingFrom: Int,
                                      _ walkOperands: @escaping
                                        (_ action: (_ value: ReadyValue,
                                                    _ reverse: Bool)
                                            -> Void)
                                        -> Void) async {
            var isChildStart = false;

            if isParent {
                let cost = operationsToExplore(l.count, composedCount, otherComposedCount, startingFrom, walkOperands);

                if cost < maxCost && cost > minCost {
                    guard await fork == .child else {
                        return;
                    }

                    isParent = false;
                    isChildStart = true;
                }
            }
            defer {
                if isChildStart {
                    async exit;
                }
            }

            for candidateOffset in
                startingFrom ..< (l.count - composedCount) {
                let rightChild = l.remove(at: candidateOffset);
                if let _ = rightChild.op {
                    otherComposedCount -= 1;
                }
                defer {
                    if let _ = rightChild.op {
                        otherComposedCount += 1;
                    }
                    l.insert(rightChild, at: candidateOffset);
                }

                for phase in 0...1 {
                    let reverse = (phase == 1);
                    { (_ valueComponent: inout UInt32) in
                        valueComponent = kind.combine(valueComponent,
                                                      rightChild.value);
                    }(&decomposedValue[reverse]!);
                    defer {
                        { (_ valueComponent: inout UInt32) in
                            valueComponent = kind.uncombine(valueComponent,
                                                            rightChild.value);
                        }(&decomposedValue[reverse]!);
                    }

                    let selfNode = {(_ action: (_ value: ReadyValue,
                                                _ reverse: Bool)
                                        -> Void)
                        -> Void in
                        action(rightChild, reverse);
                        walkOperands(action);
                    };

                    await iteratePossibleLeftNodes(startingFrom: candidateOffset,
                                             selfNode);

                    // close current composition
                    guard ({
                        var num = 0;
                        selfNode({_,_ in num += 1;});
                        return num;
                    }() > 1) && ( (kind == .additive)
                                    ? decomposedValue[false]!
                                       > decomposedValue[true]!
                                    : ((decomposedValue[false]!
                                       % decomposedValue[true]!) == 0) ) else {
                        continue;
                    }

                    let realizedValue = kind.uncombine(decomposedValue[false]!,
                                                       decomposedValue[true]!);
                    let description = { () -> String in
                        var current = "(";
                        selfNode({(_ value: ReadyValue,
                                   _ freverse: Bool) -> Void in
                            current += " ";
                            current += (freverse
                                            ? (kind == .additive ? "-" : "/")
                                            : kind.rawValue);
                            current += " ";
                            current += value.description();
                        });
                        current += ")";

                        return current;
                    };

                    guard l.count > 0 else {
                        if realizedValue == startGoal {
                            let solution = description();
                            queue.async { incb(solution); };
                        }
                        continue;
                    }

                    composedCount += 1;
                    l.append(ReadyValue(value: realizedValue,
                                        op: kind,
                                        description: description));
                    defer {
                        l.remove(at: l.count - 1);
                        composedCount -= 1;
                    }

                    await exploreAdditionOfRightNode(kind: .additive);
                    await exploreAdditionOfRightNode(kind: .multiplicative);
                }
            }
        }

    }
}

(While I figure out how to style monospaced block elements without breaking my publishing pipeline, here’s the diff so you can focus on the differences:)

--- AlternateSolver.swift   2021-02-19 23:32:13.000000000 +0100
+++ ImaginarySolver.swift   2021-02-21 22:30:33.000000000 +0100
@@ -56,6 +56,7 @@
 }

 func resolve(_ startGoal: UInt32,
+             _ queue: DispatchQueue,
              _ incb: @escaping (_ result: String) -> Void,
              _ primitives: UInt32...)
 {
@@ -63,16 +64,18 @@
     var composedCount = 0;
     var otherComposedCount = 0;

+    var isParent = true;
+
     for element in primitives {
         l.append(ReadyValue(element));
     }

     {
-    exploreAdditionOfRightNode(kind: .additive);
-    exploreAdditionOfRightNode(kind: .multiplicative);
+    await exploreAdditionOfRightNode(kind: .additive);
+    await exploreAdditionOfRightNode(kind: .multiplicative);
     }(); // https://bugs.swift.org/browse/SR-12243

-    func exploreAdditionOfRightNode(kind: Op) {
+    func exploreAdditionOfRightNode(kind: Op) async {
         if (l.count != 0) && (l[l.count - 1].op != kind) {
             guard otherComposedCount == 0 else {
                 return;
@@ -92,7 +95,7 @@
                                true: kind.neutralValue()];

         {
-        iteratePossibleLeftNodes(startingFrom: 0, { _ in return;});
+        await iteratePossibleLeftNodes(startingFrom: 0, { _ in return;});
         }(); // https://bugs.swift.org/browse/SR-12243

         func iteratePossibleLeftNodes(startingFrom: Int,
@@ -100,7 +103,27 @@
                                         (_ action: (_ value: ReadyValue,
                                                     _ reverse: Bool)
                                             -> Void)
-                                        -> Void) {
+                                        -> Void) async {
+            var isChildStart = false;
+
+            if isParent {
+                let cost = operationsToExplore(l.count, composedCount, otherComposedCount, startingFrom, walkOperands);
+
+                if cost < maxCost && cost > minCost {
+                    guard await fork == .child else {
+                        return;
+                    }
+
+                    isParent = false;
+                    isChildStart = true;
+                }
+            }
+            defer {
+                if isChildStart {
+                    async exit;
+                }
+            }
+
             for candidateOffset in
                 startingFrom ..< (l.count - composedCount) {
                 let rightChild = l.remove(at: candidateOffset);
@@ -135,7 +158,7 @@
                         walkOperands(action);
                     };

-                    iteratePossibleLeftNodes(startingFrom: candidateOffset,
+                    await iteratePossibleLeftNodes(startingFrom: candidateOffset,
                                              selfNode);

                     // close current composition
@@ -171,7 +194,8 @@

                     guard l.count > 0 else {
                         if realizedValue == startGoal {
-                            incb(description());
+                            let solution = description();
+                            queue.async { incb(solution); };
                         }
                         continue;
                     }
@@ -185,8 +209,8 @@
                         composedCount -= 1;
                     }

-                    exploreAdditionOfRightNode(kind: .additive);
-                    exploreAdditionOfRightNode(kind: .multiplicative);
+                    await exploreAdditionOfRightNode(kind: .additive);
+                    await exploreAdditionOfRightNode(kind: .multiplicative);
                 }
             }
         }

The principle of await fork is that, just like the fork syscall, the code after it is run two times: once as a child task, and once as a continuation of the parent task; you know which is which from the return value of the await fork statement. Note how, as a result, the code here is close to the serial version.

await fork can form the basis of the task limitation feature: the system could assign a higher priority to child tasks so that a parent task is never launched as long as one of its own child tasks is available to launch instead. It is not unheard of for a task launcher to be aware of task groups: for instance, in most operating systems the scheduler groups together threads from the same user session, so as to avoid one user taking cycles away from the other users just because that user has many processes running. The outcome would be that the parent does not get to run until all of its extant child tasks have been launched and a new launching opportunity occurs, at which point the parent performs its own processing and ends up calling await fork again; at that point, the API could run either the continuation of the parent task or the new child task, and will choose the latter, by application of the priority rule.

So the principle of await fork as being simultaneously a suspension point and the submission of a subtask works well in that case, but it is also resilient in other situations: in the possible case where the parent task cannot provide subtasks fast enough to satisfy core occupation (e.g. when there is a godawful number of cores), the submitted subtask will be run on an available core, and the core from which await fork was called will simply resume the parent task.

And await fork obviously takes care of state duplication. In our example processing, we were lucky: the description closures referenced by ChildL only ever access let variables, which they are never going to mutate and which are never going to be mutated from under them. But that may not always be the case: I believe it is possible to remove the @escaping qualifier from most closure parameters here, but that can only happen if I can prove to the compiler that they never escape, and that would require replacing l by closures that link to one another in a linked list. At that point the closures that are passed around couldn’t just be shallow copies, otherwise the parent task would end up messing with the child task whenever an element is inserted back into the linked list…

So now you need (or rather you need the language) to make a structured copy, JavaScript-like, of the data that the child task will access. But wait, you can’t just make a structured copy of each closure or structure independently, as some of those could be meant to access common data through a variable capture, and with independent structured copies they would access independent instances. So you need to make a structured copy of the whole data as a single item.

At this point, wouldn’t it be easier to make a structured copy of the whole coroutine stack in the first place? Hence the concept of await fork.

Note that the pervasiveness of copy-on-write behaviors in Swift means some data (data buffers, kernel handles such as file descriptors, etc.) does not need to be copied at that point, and may never need to be, so await fork should reasonably be “pay-as-you-go”. Still, you may object that there is usually little point in the subtask being able to keep accessing its own snapshotted copy of lower stack frames; on the contrary, I think this is almost always useful. For instance, if you’re implementing a multithreaded indexer, at one point or another you’re going to run into filesystem access issues; this is basically unavoidable. A subtask would raise an exception in that case, but if it only has errno to work with, diagnostic information is going to be limited. However, if it does have access to the snapshot of earlier frames corresponding to the directory hierarchy leading to the file being processed, it can query these directories and provide good diagnostic information. Note that it would be unsafe for this querying to occur as a preflight, prior to the subtask launching: this would raise time-of-check/time-of-use issues. Better to try, then handle failure, and you need everything you can get your hands on when handling failure.

Of course, the point is not to implement await fork using fork(2): the two tasks are meant to be running in the same address space. And here we get to what I admit is the most contentious part of the proposal: how to internally remap references so they point to the child task copies of the data?

The first way would be to copy the coroutine stack wholesale, as a memory block, and have all references, including in data structures and references to captured variables, be relative to a stack base that is passed to the task whenever it is launched (or relaunched, in case of an asynchronous continuation): these relative references would not need to be updated. That does raise some thorny ABI issues as to how to recover that base when going through code that is not aware of this. That also means suboptimal codegen, somewhat compensated by the fact most modern processors implement relative addressing, but suboptimal nevertheless. So this is probably not the right way; I only mention this since it’s by far the simplest and least error-prone way to simulate await fork without language support.

The second way would be to have all allocations on the coroutine stack be made from a specific memory pool, and when await fork occurs, copy that whole pool, then perform the in-place equivalent of a structured copy on the second pool: adjusting references and reference counts on the copied pool until all allocations have been visited. That way the copy is still relatively fast and codegen does not need to change, and a child task terminating with async exit (the counterpart of await fork) ought not to fragment memory too much: its coroutine stack pool ought to be empty as a result of that exit. Even if it is only almost empty, for instance because a result was exfiltrated and is still retained, it will likely soon become empty.

The third way would be to just have a structured copy starting from all the references the code has access to at the point of the await fork, without any pool.

Finally, note that classes, since they would not belong to the coroutine stack by any measure, would not be copied (or allocated in the coroutine stack pool, if applicable): the two stacks would have two references to the same instance. That is consistent with classes being reference types. However, closures would have to be copied at least when the references they capture need to be adjusted themselves, since those references could point to structure variables or other value-type variables on the coroutine stack.

solutions for the fake dilemma

That one seems rather obvious: the serial queue or equivalent “asynchronous lock” should keep being held across any asynchronous API; that way, you would no longer have to choose between risking thread explosion and reworking your code in a complex and error-prone fashion. And that would be a start; but what happens if your task has .userInteractive quality of service class, but its filesystem request has to wait behind a flurry of background filesystem activity? Your task could end up being delayed by other system activity of lower priority: if the priority only applies to processor access, it won’t get you very far in the end. This is important for concurrent queues too, but doubly so for serial queues given their status as a potential bottleneck.

So there is a need for the priority/QoS/etc. of the queue to be applied across all asynchronous APIs it depends on, instead of the async API “just” maintaining the queue in the state where it was prior to the call. Many APIs have some support for that already, and the matter would be integration with the existing priority levels of these APIs: APFS for instance has support for I/O prioritization. I believe that the future structured concurrency proposals in Swift will support such priority inheritance. That includes the case where you submit a task to a serial queue along with a completion closure, of course: that whole serial queue needs to inherit at least the priority of the submitting queue until that task is complete.
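
For what can already be done by hand today, here is a sketch of propagating the submitter’s QoS onto work submitted to a shared serial queue; note that it only covers processor scheduling, which is precisely the limitation discussed above:

import Foundation

let protection = DispatchQueue(label: "state protection"); // serial

func asyncTransaction(from qos: DispatchQoS,
                      _ body: @escaping () -> Void) {
    // .enforceQoS asks for the work item's QoS to take precedence over the
    // queue's own; any I/O issued by body() gets no such treatment, though.
    protection.async(qos: qos, flags: .enforceQoS) {
        body();
    };
}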

~ end of chapter 5 ~

Other takes

I read with interest Thomas Clement’s take on Grand Central Dispatch. Funnily enough, I had completed preparatory work (task workload estimation, switching internal state to Swift structs so as to be able to duplicate state for a pseudo-fork, etc.) and was right in the middle of integrating with libdispatch when Michael J. Tsai linked to it, so I had already made my design choices by the time I read it; however, like him I had benefitted from similar collective hindsight on libdispatch’s… flaws.

Still, I would quibble with a few things. Not so much with his recommendations, which amount to the specific case of my “special dispensation”.

First, I would quibble with what he finds hard: what he describes is not multithreading being hard, it’s asynchronous code being hard! Indeed, asynchronous code is hard even when it all happens on the main thread. Of my whole career, the achievement I’m most proud of is still scrubbing in CineXPlayer. And the media player I reused having to run on its own thread (more on that in a bit) wasn’t a problem: for callbacks run from that thread, I would just call [performSelectorOnMainThread: withObject: waitUntilDone:NO] and be done with it; in some cases such as progress indication I would coalesce the updates at the callback level so as to avoid processing redundant progress updates from the main thread, but that was basically it for code that had to be thread-safe. The remainder could be as thread-unsafe as it wanted… but it was hard nevertheless, because events, whether coming from user input or these player callbacks (and a few coming from the system, such as network status), could come in and be interleaved in any order, and I had to handle them anyway. I ended up setting up a state machine that would model the desired state of the player and call the next player API to get closer to it, unless the player was already handling a prior request, in which case I (asynchronously) waited for it to no longer be busy. So my message would be: don’t worry about code that you spin off to a separate thread/task/etc.; do worry about code that has to work asynchronously, regardless of where it runs.
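
In outline, such a state machine looks roughly like this (a sketch, not the actual code: the Player protocol is a stand-in for the real media player API):

import Foundation

enum PlaybackState { case stopped, playing, paused }

protocol Player: AnyObject {
    var current: PlaybackState { get }
    // asynchronous: the completion is a callback forwarded to the main thread
    func step(toward: PlaybackState, completion: @escaping () -> Void)
}

class PlayerController {
    private let player: Player;
    private var desired = PlaybackState.stopped;
    private var busy = false;

    init(_ inPlayer: Player) { player = inPlayer; }

    // Events (user input, player callbacks, network status…) all funnel here
    // on the main thread, in whatever order they happen to arrive.
    func request(_ state: PlaybackState) {
        desired = state;
        pump();
    }

    private func pump() {
        guard !busy, player.current != desired else { return; }

        busy = true;
        player.step(toward: desired) { [weak self] in
            self?.busy = false;
            self?.pump(); // desired may have changed in the meantime
        };
    }
}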

Second, I would quibble with his dismissal of actors. Just like concurrent queues (you have seen earlier how I used them extensively, the condition being that their contents be free of kernel calls that aren’t guaranteed to scale), actors have their role: when your processing has to be built around its own specific thread. When does that happen? Mostly when your processing has real-time constraints. One example would be a media player, but a more useful one would be an animation engine like CoreAnimation. The thing about an animation engine is that, when you call an API on it, it could be:

  • Busy with something right now, in which case whatever you want it to do needs to be queued until such time as the animation engine is ready to handle it;
  • In the middle of something but not currently busy, in which case it needs to know about your API call right now, in case that call would impact its future plans (e.g. for animation retargeting);
  • Not doing anything at all, in which case it needs to be woken up.

You could attempt to implement all that yourself. And by the time you were done, you would realize you had just reimplemented an actor. I should know, because multiple times in my career I have said “OK, this thing that needs its own long-lived thread, I now happen to have the opportunity to do it from scratch since we can’t use an existing implementation, and I know enough to do it properly from the mistakes of the implementations I saw previously,” and once I was done… I realized I had reimplemented an actor.
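
To spell out what that hand-rolled actor amounts to, here is a bare-bones sketch (the Command cases are placeholders, and the “needs to know right now” case, e.g. retargeting, is left out): a long-lived thread, a mailbox that queues messages while it is busy, and a wake-up when it goes idle:

import Foundation

enum Command {
    case animate(to: Double), cancelAnimations;
}

class HandRolledActor {
    private let condition = NSCondition();
    private var mailbox = [Command]();

    init() {
        let thread = Thread { [self] in
            while true {
                condition.lock();
                while mailbox.isEmpty {
                    condition.wait(); // "not doing anything at all": sleep
                }
                let command = mailbox.removeFirst();
                condition.unlock();

                handle(command); // "busy": further sends pile up in the mailbox
            }
        };
        thread.start();
    }

    func send(_ command: Command) {
        condition.lock();
        mailbox.append(command);
        condition.signal(); // wake the thread up if it was idle
        condition.unlock();
    }

    private func handle(_ command: Command) {
        // the real-time work: render a frame, retarget an animation, etc.
    }
}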

So the important thing here is not to see actors as the new framework for concurrency, but just for what they are: a replacement for threads when they are used for real-time purposes. That is the general lesson here: concurrent queues should similarly be seen as the way to parallelize tasks that we can already guarantee to be compute-bound, etc. The pitfall here is deeming anything to be “the general framework for concurrency”, and that goes doubly for its vendor. As long as we avoid that pitfall, and do not let any particular abstraction become our universal savior, it will have no power over us and will not be able to hurt us.
