[SOLVED] OperationQueue with custom `maxConcurrentOperationCount` does not pick up / execute all operations in the queue, after finishing first operation

Issue

I’m sure there’s something wrong with my logic, I just can’t figure out what it is.

There’s a "Service" class, which has an operation queue:

class Service {

    let queue: OperationQueue = {

        let queue = OperationQueue()
        queue.name = "my.operationQueue"
        queue.maxConcurrentOperationCount = 1
        return queue
    }()

    func add(operation: Operation) {
        queue.addOperation(operation)
    }
}

The operation is asynchronous, so it overrides the state properties and the start() function:

class MyOp: Operation {

    private var state: State = .ready
    private var id: Int

    init(id: Int) {
        self.id = id
    }

    override var isAsynchronous: Bool {
        return true
    }

    override var isReady: Bool {
        return state == .ready
    }

    override var isExecuting: Bool {
        return state == .started
    }

    /// See: `Operation`
    override var isFinished: Bool {
        return state == .finished || state == .cancelled
    }

    /// See: `Operation`
    override var isCancelled: Bool {
        return state == .cancelled
    }

    override func start() {

        guard state == .ready else {
            return
        }

        state = .started
        print("\(Date()) started \(id)")

        DispatchQueue.global().asyncAfter(deadline: .now() + 2) {
            self.state = .finished
            print("\(Date()) finished \(self.id)")
        }
    }
}

private extension MyOp {

    enum State {

        case ready
        case started
        case cancelled
        case finished
    }
}

I am adding multiple operations to the queue (using concurrentPerform for testing purposes; the real code is different):

let iterations = 20
let service = Service()

DispatchQueue.concurrentPerform(iterations: iterations) { iteration in

    let operation = MyOp(id: iteration)
    service.add(operation: operation)
}
DispatchQueue.global().asyncAfter(deadline: .now() + 40) {
    print("\(Date()) after run \(String(describing: service.queue.operations))")
}

What I expect

  • 20 operations are added to the queue (because let iterations = 20)
  • 1 operation starts to run immediately, others wait in the queue (because queue.maxConcurrentOperationCount = 1)
  • once the first operation completes, second starts, and so on.
  • the final block, which prints the queue contents, should show no items, or at most 1-2 remaining items.

What actually happens

The operations appear to be added to the queue as expected.

However, only 1 operation starts and finishes; the remaining operations never start. The final block, which prints the queue contents 40 seconds after all operations were added (roughly enough time to complete all, or almost all, operations), shows that the remaining operations are still in the queue, not running. Here’s an example:

<NSOperationQueue: 0x7fd477f09460>{name = 'my.operationQueue'}
2022-03-23 21:05:51 +0000 started 11
2022-03-23 21:05:53 +0000 finished 11
2022-03-23 21:06:31 +0000 after run [
  <__lldb_expr_25.MyOp 0x7fd479406660 isFinished=YES isReady=NO isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd477f04080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd479206a70 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd460904190 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd479004080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd479406550 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd460804080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd470904480 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd460904080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd460804190 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd460a04080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd4793068c0 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd460b04080 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd477f0a160 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd460a04190 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd479406770 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd4608042a0 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd4792092f0 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd47910a360 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>, 
  <__lldb_expr_25.MyOp 0x7fd4609042a0 isFinished=NO isReady=YES isCancelled=NO isExecuting=NO>
 ]

So what am I doing wrong?

Note:

  • This is not a problem with the print being wrong, as in actual code I am not using it
  • Also in actual code there’s no DispatchQueue.global().asyncAfter(deadline: .now() + 2) – this is just to simulate a running asynchronous operation.

Update: I narrowed the problem down to maxConcurrentOperationCount: if I remove the line queue.maxConcurrentOperationCount = 1, the queue works as expected. Setting it to any other value creates a similar problem.

I still don’t understand why it is wrong.

Solution

The issue is that the overridden properties are not KVC/KVO compliant. As the Operation documentation says:

The NSOperation class is key-value coding (KVC) and key-value observing (KVO) compliant for several of its properties.

If you provide custom implementations for any of the preceding properties, your implementations must maintain KVC and KVO compliance.

Constraints on the degree of concurrency (e.g., both maxConcurrentOperationCount and addDependency(_:)) rely upon KVO to know when the prior operation is complete. If you fail to perform the required KVO notifications, the queue will not know when subsequent operations may proceed.

See the latter part of “Trying to Understand Asynchronous Operation Subclass” for an example implementation.
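To make the requirement concrete, here is a minimal sketch of the question’s operation with the notifications posted manually via willChangeValue(forKey:) and didChangeValue(forKey:). The class name `NotifyingOp`, the `transition(to:)` helper, and the 0.1-second delay are assumptions made for illustration, not part of the original code. `isReady` and `isCancelled` are deliberately left to `Operation`.

```swift
import Foundation

/// Sketch only: `NotifyingOp` and `transition(to:)` are hypothetical names.
final class NotifyingOp: Operation {
    private enum State { case ready, executing, finished }

    private let id: Int
    private let stateLock = NSLock()
    private var _state = State.ready

    init(id: Int) {
        self.id = id
        super.init()
    }

    /// Thread-safe read of the current state.
    private var state: State {
        stateLock.lock()
        defer { stateLock.unlock() }
        return _state
    }

    /// Updates the state and posts the KVO notifications that the queue observes.
    private func transition(to newState: State) {
        // Entering `.executing` changes only `isExecuting`; finishing may change both.
        let keys = newState == .finished ? ["isExecuting", "isFinished"] : ["isExecuting"]
        keys.forEach { willChangeValue(forKey: $0) }
        stateLock.lock()
        _state = newState
        stateLock.unlock()
        keys.reversed().forEach { didChangeValue(forKey: $0) }
    }

    override var isAsynchronous: Bool { true }
    override var isExecuting: Bool { state == .executing }
    override var isFinished: Bool { state == .finished }

    override func start() {
        // A cancelled operation must still reach the finished state.
        if isCancelled {
            transition(to: .finished)
            return
        }
        transition(to: .executing)
        print("started \(id)")

        // Simulated asynchronous work (assumed delay of 0.1 s).
        DispatchQueue.global().asyncAfter(deadline: .now() + 0.1) {
            print("finished \(self.id)")
            self.transition(to: .finished)
        }
    }
}

let queue = OperationQueue()
queue.maxConcurrentOperationCount = 1
for i in 0..<5 {
    queue.addOperation(NotifyingOp(id: i))
}
queue.waitUntilAllOperationsAreFinished()
print("remaining operations: \(queue.operationCount)")
```

The call to waitUntilAllOperationsAreFinished() is there only so the snippet can run as a script; in an app you would not block the main thread like this.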


FWIW, here is an asynchronous operation implementation:

public class AsynchronousOperation: Operation {

    @Atomic @objc private dynamic var state: OperationState = .ready

    // MARK: - Various `Operation` properties

    open         override var isReady:        Bool { state == .ready && super.isReady }
    public final override var isExecuting:    Bool { state == .executing }
    public final override var isFinished:     Bool { state == .finished }
    public final override var isAsynchronous: Bool { true }

    // KVO for dependent properties

    open override class func keyPathsForValuesAffectingValue(forKey key: String) -> Set<String> {
        if [#keyPath(isReady), #keyPath(isFinished), #keyPath(isExecuting)].contains(key) {
            return [#keyPath(state)]
        }

        return super.keyPathsForValuesAffectingValue(forKey: key)
    }

    // Start

    public final override func start() {
        if isCancelled {
            state = .finished
            return
        }

        state = .executing

        main()
    }

    /// Subclasses must implement this to perform their work and they must not call `super`. The default implementation of this function throws an exception.

    open override func main() {
        fatalError("Subclasses must implement `main`.")
    }

    /// Call this function to finish an operation that is currently executing

    public final func finish() {
        if !isFinished { state = .finished }
    }
}

private extension AsynchronousOperation {
    /// State for this operation.

    @objc enum OperationState: Int {
        case ready
        case executing
        case finished
    }
}
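The `keyPathsForValuesAffectingValue(forKey:)` override is what lets a change to `state` produce notifications for `isReady`, `isExecuting`, and `isFinished`. As a stand-alone sketch of that mechanism (the `Download` and `Tally` types and their properties are hypothetical, invented only for this illustration):

```swift
import Foundation

/// Hypothetical model object: `isDone` is derived from `bytesReceived`.
final class Download: NSObject {
    @objc dynamic var bytesReceived = 0

    @objc dynamic var isDone: Bool { bytesReceived >= 100 }

    // Tell KVO that observers of `isDone` should be notified whenever `bytesReceived` changes.
    override class func keyPathsForValuesAffectingValue(forKey key: String) -> Set<String> {
        if key == #keyPath(isDone) {
            return [#keyPath(bytesReceived)]
        }
        return super.keyPathsForValuesAffectingValue(forKey: key)
    }
}

/// Hypothetical helper that counts notifications.
final class Tally {
    var count = 0
}

let download = Download()
let tally = Tally()

// Observe only the derived property; `bytesReceived` is never observed directly.
let observation = download.observe(\.isDone) { _, _ in
    tally.count += 1
}

download.bytesReceived = 50
download.bytesReceived = 100
observation.invalidate()

print("isDone: \(download.isDone), notifications: \(tally.count)")
```

The observer is attached to `isDone` alone, yet it fires when `bytesReceived` is set, which is the same relationship the operation sets up between `state` and the three `Operation` properties.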

The `@Atomic` property wrapper used above is defined as follows:

@propertyWrapper
public class Atomic<T> {
    private var _wrappedValue: T
    private let lock = NSLock()

    public var wrappedValue: T {
        get { lock.synchronized { _wrappedValue } }
        set { lock.synchronized { _wrappedValue = newValue } }
    }

    public init(wrappedValue: T) {
        _wrappedValue = wrappedValue
    }
}

extension NSLocking {
    func synchronized<T>(block: () throws -> T) rethrows -> T {
        lock()
        defer { unlock() }
        return try block()
    }
}
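Note that `@Atomic` guards each individual read or write of the wrapped value; a read-modify-write such as `+=` is two separate locked accesses. For a compound update, the `synchronized` helper can wrap the whole operation instead. A small sketch, assuming a hypothetical `Counter` type (the extension is repeated only so that the snippet stands alone):

```swift
import Foundation

extension NSLocking {
    /// Runs `block` while holding the lock and returns its result.
    func synchronized<T>(block: () throws -> T) rethrows -> T {
        lock()
        defer { unlock() }
        return try block()
    }
}

/// Hypothetical example type; not part of the original answer.
final class Counter {
    private let lock = NSLock()
    private var value = 0

    /// The read and the write happen under a single lock acquisition.
    func increment() {
        lock.synchronized { value += 1 }
    }

    var current: Int {
        lock.synchronized { value }
    }
}

/// Increments a shared counter from many threads and returns the final value.
func runCounterDemo(iterations: Int) -> Int {
    let counter = Counter()
    DispatchQueue.concurrentPerform(iterations: iterations) { _ in
        counter.increment()
    }
    return counter.current
}

let total = runCounterDemo(iterations: 1_000)
print("total: \(total)")
```

Because every increment holds the lock for the whole update, no increments are lost even though they run concurrently.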

With `AsynchronousOperation` defined as above, the asynchronous Operation code is abstracted into a base class that I can subclass to inherit the asynchronous behavior. E.g., here is an operation that performs the same asyncAfter as your example (but with some extra OSLog signposts so I can see the operations visually in Instruments):

import os.log

private let log = OSLog(subsystem: "Op", category: .pointsOfInterest)

class MyOperation: AsynchronousOperation {
    var value: Int

    init(value: Int) {
        self.value = value
        super.init()
    }

    override func main() {
        let id = OSSignpostID(log: log)
        os_signpost(.begin, log: log, name: "Operation", signpostID: id, "%d", value)

        DispatchQueue.main.asyncAfter(deadline: .now() + 1) { [self] in
            finish()
            os_signpost(.end, log: log, name: "Operation", signpostID: id, "%d", value)
        }
    }
}

Then …

let queue = OperationQueue()
queue.maxConcurrentOperationCount = 1

for i in 0..<5 {
    queue.addOperation(MyOperation(value: i))
}

… yields a timeline of the operations like so:

[Image: timeline of the operations in Instruments]

Answered By – Rob

Answer Checked By – Jay B. (BugsFixing Admin)
