Third Teaser: Practical Multithreading

Guys, my future post on practical multithreading is going to be of epic scale (previous teaser). I wish I could split it up into multiple posts, but that would be too risky: I don’t want readers stumbling on a later part without having first read the earlier caveats.

So here is another teaser in the meantime. Suppose you have this code that works in a synchronous fashion:

//
//  ObjectCache.swift
//  AsyncObjectCache
//
//  Created by Pierre Lebeaupin on 10/02/2021.
//

import Foundation

class ObjectCache {
    /// One cache entry: lazily loads and memoizes a single Double
    /// read from an on-disk ".sprite" file.
    class Slot {
        // Cached value; nil until the first successful load.
        var data: Double?;
        
        init() {
            data = nil;
        }
        
        /// Returns the cached value, loading it on first use from
        /// "<basePath>/<index>.sprite": 8 big-endian bytes at file
        /// offset 8, reinterpreted as a Double bit pattern.
        /// Any filesystem failure is treated as fatal.
        func getData(_ basePath: String,
                     _ index: UInt32) -> Double {
            if let solidData = data {
                return solidData; // our work here is done.
            }
            
            // Do the dirty work.
            let path = basePath.appendingFormat("/%lu.sprite", index);
            
            path.utf8CString.withUnsafeBufferPointer { ptr in
                let fd = open(ptr.baseAddress!, O_RDONLY);
                
                guard fd >= 0 else {
                    fatalError();
                }
                
                lseek(fd, off_t(8), SEEK_SET);
                
                // The buffer must have *count* 8, not merely capacity 8:
                // reserveCapacity() would leave count == 0, so
                // withUnsafeMutableBufferPointer would expose a
                // zero-length buffer and read(2) would have nowhere
                // valid to write.
                var accum = ContiguousArray<UInt8>(repeating: 0, count: 8);
                
                accum.withUnsafeMutableBufferPointer { mPtr in
                    guard read(fd, mPtr.baseAddress!, 8) == 8 else {
                        fatalError();
                    }
                };
                
                close(fd);
                
                var intermediate = UInt64(0);
                
                for byte in accum {
                    // decode from big-endian format
                    intermediate = (intermediate << 8) + UInt64(byte);
                }
                
                // Alleluia!
                data = Double(bitPattern: intermediate);
            };
            
            return data!;
        }
    }

    
    // Directory containing the ".sprite" files.
    let base: String;
    // One slot per index, created eagerly; loaded lazily.
    let slots: Array<Slot>;
    
    init(base inbase: String, count: UInt32) {
        base = inbase;
        var protoslots = Array<Slot>();
        protoslots.reserveCapacity(Int(count));
        
        for _ in 0 ..< count {
            protoslots.append(Slot());
        }
        
        slots = protoslots;
    }
    
    /// Synchronously fetches (and caches) the value for `index`.
    func getData(_ index: UInt32) -> Double {
        return slots[Int(index)].getData(base, index);
    }
}

Except for the use of Unix filesystem APIs, it seems straightforward enough. But what if I want to make that work asynchronously, so as to be safely callable from Grand Central Dispatch tasks? Here’s what you need to do:

//
//  AsyncObjectCache.swift
//  AsyncObjectCache
//
//  Created by Pierre Lebeaupin on 10/02/2021.
//

import Foundation

class AsyncObjectCache {
    /// One cache entry. All mutable state is confined to the private
    /// serial queue `q`; callers interact only via asyncGetData.
    class Slot {
        struct Functor {
            // silly struct just so I can make a
            // recursive function definition
            var invoke: (_ data: Double) -> Functor?;
        }

        // Cached value; nil until the load completes (set on q).
        var data: Double?;
        // Appends a waiter to the tail of the pending-continuation
        // list; nil once the load has completed and the list is gone.
        var insertNext: ((_ functor: Functor) -> Void)?;
        // Serial queue protecting all of the above.
        let q: DispatchQueue;
        // Head of the pending-continuation list.
        var initialFunctor: Functor?;
        // True once the file read has been started, so only the
        // first caller launches it.
        var launched: Bool;
        
        init() {
            q = DispatchQueue(label: "AsyncObjectCache slot protection");
            launched = false;

            insertNext = { [self] functor in
                initialFunctor = functor;
            }
            // Note that at this point there is a strong reference cycle
            // between the class instance and the closure; I see no way
            // around that.
            // The manual cleanup of the line, through initialFunctor,
            // ought to be enough to solve the situation.
        }
        
        /// Asynchronously obtains the slot value, invoking
        /// `continuation` on `continuationQueue` once it is available.
        /// Concurrent callers while the load is in flight are queued
        /// up in a linked list of Functors and all resumed when the
        /// DispatchIO read completes.
        func asyncGetData(_ basePath: String,
                          _ index: UInt32,
                          _ continuationQueue: DispatchQueue,
                          _ continuation: @escaping (_ data: Double) -> Void) {
            q.async { [self] in
                if let solidData = data {
                    continuationQueue.async {
                        continuation(solidData);
                    };
                    return; // our work here is done.
                }

                // TODO: move allocation of closures off of the
                // critical section to improve scalability?
                var next: Functor?

                insertNext!(Functor(invoke: { incomingData in
                    continuationQueue.async {
                        continuation(incomingData);
                    };

                    return next;
                }));
                
                insertNext = { functor in
                    next = functor;
                };
                
                guard !launched else {
                    return;
                }
                launched = true;
                
                // Do the dirty work.
                let path = basePath.appendingFormat("/%lu.sprite", index);
                
                path.utf8CString.withUnsafeBufferPointer { ptr in
                    let querier = DispatchIO(type: .random,
                                             path: ptr.baseAddress!,
                                             oflag: O_RDONLY,
                                             mode: 0,
                                             queue: q,
                                             cleanupHandler: {
                                                error in guard error == 0 else {
                                                    fatalError();
                                                }
                                             });
                    
                    var accum = DispatchData.empty;
                    
                    // At this point, launched is true, so newly
                    // launched tasks on our queue that are not the
                    // following one will not enter this clause:
                    // our state is consistent at this point, which
                    // is necessary for us to safely relinquish q.
                    querier!.read(offset: 8, length: 8, queue: q) {
                        (done, rawData, error)
                        in
                        guard error == 0 else {
                            fatalError();
                        }
                        
                        accum.append(rawData!);
                        
                        guard done else {
                            return;
                        }
                        
                        var intermediate = UInt64(0);
                        
                        for byte in accum {
                            // decode from big-endian format
                            intermediate = (intermediate << 8) + UInt64(byte);
                        }
                        
                        // Alleluia!
                        let incomingData = Double(bitPattern: intermediate);
                        var current = initialFunctor;
                        while let solidCurrent = current {
                            current = solidCurrent.invoke(incomingData);
                        }
                        
                        // clean up after ourselves
                        data = incomingData;
                        insertNext = nil;
                        initialFunctor = nil;
                    };
                };
            };
        }
    }

    
    // Directory containing the ".sprite" files.
    let base: String;
    // One slot per index, created eagerly; loaded lazily.
    let slots: Array<Slot>;
    
    init(base inbase: String, count: UInt32) {
        base = inbase;
        var protoslots = Array<Slot>();
        protoslots.reserveCapacity(Int(count));
        
        for _ in 0 ..< count {
            protoslots.append(Slot());
        }
        
        slots = protoslots;
    }
    
    /// Asynchronously fetches the value for `index`; `continuation`
    /// is invoked on `continuationQueue` once the value is available.
    func asyncGetData(_ index: UInt32,
                      _ continuationQueue: DispatchQueue,
                      _ continuation: @escaping (_ data: Double) -> Void) {
        slots[Int(index)].asyncGetData(base, index, continuationQueue, continuation);
    }
}

Much less straightforward now, unfortunately.

Leave a Reply

Name *
Email *
Website