Producer-Consumer Problem with GCD and Semaphores using Swift (Part 1)

The producer–consumer problem, also known as the bounded-buffer problem, is a classic example of a multi-process synchronisation problem. It is defined as follows:

  • The Producer generates items in random time intervals and writes them into a bounded buffer.
  • The Consumer consumes these items by removing them from the front of that buffer, in the order they were written.

Both Producer and Consumer run in parallel and produce and consume items at random intervals.

Producer and Consumer with Queue

Uncoordinated Solution

The first case to consider is the one in which a single producer and a single consumer write to and read from a limited buffer asynchronously. The producer either writes into the buffer or discards the data if the buffer is full. In the same way, the consumer either reads from the buffer or skips the read if it finds the buffer empty. This solution has a big drawback: it loses data, which is not acceptable in most cases.

The consumer and producer methods access the buffer in parallel. Without protection this would lead to data races and an inconsistent data structure. To avoid this problem we put the code that adds items to and removes items from the buffer onto a serial GCD queue and run it with sync, so only one of these blocks executes at a time. This is a simple pattern to synchronise data access in Swift with GCD!
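To show the pattern on its own, here is a minimal sketch with a counter instead of a buffer. The class name SynchronizedCounter and the queue label are made up for this illustration and are not part of the implementation in this post; the sketch assumes it is run as a script or main.swift in the default Swift 5 language mode.

```swift
import Foundation

// Hypothetical example type: a counter whose state is guarded by a serial queue.
final class SynchronizedCounter {
    private var value = 0
    // Every read and write of `value` goes through this serial queue.
    private let serialQueue = DispatchQueue(label: "example.counter.serial")

    func increment() {
        // sync blocks the caller until the closure has run on the serial queue.
        serialQueue.sync { value += 1 }
    }

    var current: Int {
        return serialQueue.sync { value }
    }
}

let counter = SynchronizedCounter()

// Run 1000 increments in parallel; concurrentPerform returns once all iterations are done.
DispatchQueue.concurrentPerform(iterations: 1000) { _ in
    counter.increment()
}

print(counter.current) // prints "1000"
```

Without the serial queue the parallel increments could overwrite each other and the final count could come out lower than 1000.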

Here is a Swift implementation of this scenario. I will improve it in the next section to coordinate producer and consumer and avoid data loss.

import Foundation

public class SynchronizedBuffer {

    // bounded write buffer (holds at most maxBufferSize items)
    private var buffer = Array<Int>()
    public let maxBufferSize = 100
    
    // serialization queue to avoid data races on buffer...
    private static let serialQueue = DispatchQueue(label: "ProducerConsumer.serial")
    
    public func push(num: Int) {
        Self.serialQueue.sync {
            if buffer.count < maxBufferSize {
                print("Push \(num) to buffer -> len: \(buffer.count + 1)")
                self.buffer.append(num)
            } else {
                print("Buffer queue is full, could not write \(num)")
            }
        }
    }
    
    public func pop() -> Int? {
        return Self.serialQueue.sync {
            if buffer.count > 0 {
                let i = self.buffer.removeFirst()
                print("Pop \(i) from buffer -> len: \(buffer.count)")
                return i
            } else {
                print("Buffer queue is empty could not read")
            }
            return nil
        }
    }
    
}


let concurrentQueue = DispatchQueue.global(qos: .userInitiated)


var queue = SynchronizedBuffer()

// Producer

func randomTimeInterval(_ range: Range<Int>) -> DispatchTimeInterval {
    return DispatchTimeInterval.milliseconds(Int.random(in: range))
}

func produce(i: Int, n: Int) {
    let nextTime = DispatchTime.now().advanced(by: randomTimeInterval(0..<10))
    concurrentQueue.asyncAfter(deadline: nextTime) {
        queue.push(num: i)
        if i < n {
            produce(i: i + 1, n: n)
        }
    }
}


let dispatchGroup = DispatchGroup()

// Consumer
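func consume(i: Int, n: Int) {
    let nextTime = DispatchTime.now().advanced(by: randomTimeInterval(0..<12))
    // enter the group before scheduling, so leave() can never run first
    dispatchGroup.enter()
    concurrentQueue.asyncAfter(deadline: nextTime) {
        // the popped value is not needed here, so discard it explicitly
        _ = queue.pop()
        if i < n {
            consume(i: i + 1, n: n)
        }
        dispatchGroup.leave()
    }
}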

produce(i: 0, n: 1000)
consume(i: 0, n: 1000)


dispatchGroup.wait()

Semaphore & Mutex Solution

Semaphores solve this problem of lost items. In the solution below I have used two counting semaphores, fillCount and emptyCount. The fillCount is the number of items already in the buffer; the emptyCount is the number of empty slots in the buffer. Before the producer writes to the buffer it waits on the emptyCount semaphore until a free slot is available. Vice versa, the consumer waits on the fillCount semaphore before reading from the buffer. A third, binary semaphore (bufferMutex) acts as a mutex that protects the buffer itself while an item is added or removed.
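The counting behaviour can be tried in isolation before looking at the full buffer. The following is only a sketch with a hypothetical two-slot setup on a single thread; it uses wait(timeout:) so that the "buffer full" case shows up as a timeout instead of blocking forever.

```swift
import Foundation

// Hypothetical two-slot setup: emptyCount starts at the number of free slots.
let emptyCount = DispatchSemaphore(value: 2)
let fillCount = DispatchSemaphore(value: 0)

// A producer claims a free slot with wait() and announces an item with signal().
emptyCount.wait()
fillCount.signal()
emptyCount.wait()
fillCount.signal()

// Both slots are taken now, so a third claim cannot succeed and times out.
let third = emptyCount.wait(timeout: DispatchTime.now() + .milliseconds(50))
print(third == .timedOut) // prints "true"

// A consumer takes an item and frees its slot...
fillCount.wait()
emptyCount.signal()

// ...so the producer can claim a slot again.
let retry = emptyCount.wait(timeout: DispatchTime.now() + .milliseconds(50))
print(retry == .success) // prints "true"

// Hand back the two claimed slots: libdispatch traps if a semaphore is
// deallocated while its value is below the initial value.
emptyCount.signal()
emptyCount.signal()
```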

import Foundation

public class Buffer {

    // bounded buffer (capacity is enforced by bufferEmptyCount)
    private var buffer = Array<Int>()
    private let bufferSizeMax: Int
    
    private let bufferEmptyCount: DispatchSemaphore
    private let bufferFillCount: DispatchSemaphore
    private let bufferMutex: DispatchSemaphore
    
    init(bufferSize: Int) {
        self.bufferSizeMax = bufferSize
        self.bufferEmptyCount = DispatchSemaphore(value: bufferSize)
        self.bufferFillCount = DispatchSemaphore(value: 0)
        self.bufferMutex = DispatchSemaphore(value: 1)
    }
    
    public func push(num: Int) {
        bufferEmptyCount.wait()
        bufferMutex.wait()
        
        self.buffer.append(num)
        print("Push \(num) to buffer(\(buffer.count))")
                
        bufferMutex.signal()
        bufferFillCount.signal()
    }
    
    public func pop() -> Int? {
        bufferFillCount.wait()
        bufferMutex.wait()
        
        let rtn = self.buffer.removeFirst()
        print("Pop \(rtn) from buffer(\(buffer.count))")
           
        bufferMutex.signal()
        bufferEmptyCount.signal()
        
        return rtn
    }
    
}


let concurrentQueue = DispatchQueue.global(qos: .userInitiated)

var buffer = Buffer(bufferSize: 100)

// Producer

func randomTimeInterval(_ range: Range<Int>) -> DispatchTimeInterval {
    return DispatchTimeInterval.milliseconds(Int.random(in: range))
}

func produce(i: Int, n: Int) {
    let nextTime = DispatchTime.now().advanced(by: randomTimeInterval(0..<10))
    concurrentQueue.asyncAfter(deadline: nextTime) {
        buffer.push(num: i)
        if i < n {
            produce(i: i + 1, n: n)
        }
    }
}


let dispatchGroup = DispatchGroup()

// Consumer
func consume(i: Int, n: Int) {
    let nextTime = DispatchTime.now().advanced(by: randomTimeInterval(0..<10))
    // enter the group before scheduling, so leave() can never run first
    dispatchGroup.enter()
    concurrentQueue.asyncAfter(deadline: nextTime) {
        _ = buffer.pop()
        if i < n {
            consume(i: i + 1, n: n)
        }
        dispatchGroup.leave()
    }
}

produce(i: 0, n: 1000)
consume(i: 0, n: 1000)


dispatchGroup.wait()



Note that this solution also works for multiple consumers and producers, as the semaphores take care of counting the filled and free slots.
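As a rough sketch of that multi-producer, multi-consumer case: the BoundedBuffer class below is a shortened, hypothetical stand-in for the Buffer class above (without the print statements), and the numbers of producers, consumers and items are arbitrary values chosen for the example. It assumes a script or main.swift in the default Swift 5 language mode.

```swift
import Foundation

// Hypothetical, shortened stand-in for the Buffer class from this post.
final class BoundedBuffer {
    private var items: [Int] = []
    private let emptyCount: DispatchSemaphore
    private let fillCount = DispatchSemaphore(value: 0)
    private let mutex = DispatchSemaphore(value: 1)

    init(size: Int) {
        emptyCount = DispatchSemaphore(value: size)
    }

    func push(_ item: Int) {
        emptyCount.wait()      // wait for a free slot
        mutex.wait()
        items.append(item)
        mutex.signal()
        fillCount.signal()     // announce one more item
    }

    func pop() -> Int {
        fillCount.wait()       // wait for an item
        mutex.wait()
        let item = items.removeFirst()
        mutex.signal()
        emptyCount.signal()    // announce one more free slot
        return item
    }
}

// Arbitrary example sizes: 3 producers write 20 items each, 2 consumers read 30 each.
let producerCount = 3
let consumerCount = 2
let itemsPerProducer = 20
let itemsPerConsumer = producerCount * itemsPerProducer / consumerCount

let shared = BoundedBuffer(size: 4)
let resultQueue = DispatchQueue(label: "example.results") // guards `consumed`
var consumed: [Int] = []
let group = DispatchGroup()

for p in 0..<producerCount {
    DispatchQueue.global().async(group: group) {
        for k in 0..<itemsPerProducer {
            shared.push(p * itemsPerProducer + k) // every item value is unique
        }
    }
}

for _ in 0..<consumerCount {
    DispatchQueue.global().async(group: group) {
        for _ in 0..<itemsPerConsumer {
            let item = shared.pop()
            resultQueue.sync { consumed.append(item) }
        }
    }
}

group.wait()
print(consumed.count)                        // prints "60"
print(consumed.sorted() == Array(0..<60))    // prints "true"
```

Every item is consumed exactly once, although the order in which the consumers record them is not deterministic. The total number of pops has to match the total number of pushes here, otherwise a consumer would block forever in pop().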

tomkausch
