Guys, my future post on practical multithreading is going to be of epic scale (previous teaser). I wish I could split it up into multiple posts, but that would be too risky: I don’t want readers stumbling on a later part without having first read the earlier caveats.
So here is another teaser in the meantime. Suppose you have this code that works in a synchronous fashion:
// // ObjectCache.swift // AsyncObjectCache // // Created by Pierre Lebeaupin on 10/02/2021. // import Foundation class ObjectCache { class Slot { var data: Double?; init() { data = nil; } func getData(_ basePath: String, _ index: UInt32) -> Double { if let solidData = data { return solidData; // our work here is done. } // Do the dirty work. let path = basePath.appendingFormat("/%lu.sprite", index); path.utf8CString.withUnsafeBufferPointer { ptr in let fd = open(ptr.baseAddress!, O_RDONLY); guard fd >= 0 else { fatalError(); } lseek(fd, off_t(8), SEEK_SET); var accum = ContiguousArray(); accum.reserveCapacity(8); accum.withUnsafeMutableBufferPointer { mPtr in guard read(fd, mPtr.baseAddress!, 8) == 8 else { fatalError(); } }; close(fd); var intermediate = UInt64(0); for byte in accum { // decode from big-endian format intermediate = (intermediate << 8) + UInt64(byte); } // Alleluia! data = Double(bitPattern: intermediate); }; return data!; } } let base: String; let slots: Array; init(base inbase: String, count: UInt32) { base = inbase; var protoslots = Array(); protoslots.reserveCapacity(Int(count)); for _ in 0 .. Double { return slots[Int(index)].getData(base, index); } }
Except for the use of Unix filesystem APIs, seems straightforward enough. But what if I want to make that work asynchronously so as to be safely callable from Grand Central Dispatch tasks? Here’s what you need to do:
// // AsyncObjectCache.swift // AsyncObjectCache // // Created by Pierre Lebeaupin on 10/02/2021. // import Foundation class AsyncObjectCache { class Slot { struct Functor { // silly struct just so I can make a // recursive function definition var invoke: (_ data: Double) -> Functor?; } var data: Double?; var insertNext: ((_ functor: Functor) -> Void)?; let q: DispatchQueue; var initialFunctor: Functor?; var launched: Bool; init() { q = DispatchQueue(label: "AsyncObjectCache slot protection"); launched = false; insertNext = { [self] functor in initialFunctor = functor; } // Note that at this point there is a strong reference cycle // between the class instance and the closure; I see no way // around that. // The manual cleanup of the line, through initialFunctor, // ought to be enough to solve the situation. } func asyncGetData(_ basePath: String, _ index: UInt32, _ continuationQueue: DispatchQueue, _ continuation: @escaping (_ data: Double) -> Void) { q.async { [self] in if let solidData = data { continuationQueue.async { continuation(solidData); }; return; // our work here is done. } // TODO: move allocation of closures off of the // critical section to improve scalability? var next: Functor? insertNext!(Functor(invoke: { incomingData in continuationQueue.async { continuation(incomingData); }; return next; })); insertNext = { functor in next = functor; }; guard !launched else { return; } launched = true; // Do the dirty work. let path = basePath.appendingFormat("/%lu.sprite", index); path.utf8CString.withUnsafeBufferPointer { ptr in let querier = DispatchIO(type: .random, path: ptr.baseAddress!, oflag: O_RDONLY, mode: 0, queue: q, cleanupHandler: { error in guard error == 0 else { fatalError(); } }); var accum = DispatchData.empty; // At this point, launched is true, so newly // launched tasks on our queue that are not the // following one will not enter this clause: // our state is consistent at this point, which // is necessary for us to safely relinquish q. querier!.read(offset: 8, length: 8, queue: q) { (done, rawData, error) in guard error == 0 else { fatalError(); } accum.append(rawData!); guard done else { return; } var intermediate = UInt64(0); for byte in accum { // decode from big-endian format intermediate = (intermediate << 8) + UInt64(byte); } // Alleluia! let incomingData = Double(bitPattern: intermediate); var current = initialFunctor; while let solidCurrent = current { current = solidCurrent.invoke(incomingData); } // clean up after ourselves data = incomingData; insertNext = nil; initialFunctor = nil; }; }; }; } } let base: String; let slots: Array; init(base inbase: String, count: UInt32) { base = inbase; var protoslots = Array(); protoslots.reserveCapacity(Int(count)); for _ in 0 .. Void) { slots[Int(index)].asyncGetData(base, index, continuationQueue, continuation); } }
Much less straightforward now, unfortunately.