Reader small image

You're reading from  Effective Concurrency in Go

Product typeBook
Published inApr 2023
PublisherPackt
ISBN-139781804619070
Edition1st Edition
Concepts
Right arrow
Author (1)
Burak Serdar
Burak Serdar
author image
Burak Serdar

Burak Serdar is a software engineer with over 30 years of experience in designing and developing distributed enterprise applications that scale. He's worked for several start-ups and large corporations, including Thomson and Red Hat, as an engineer and technical lead. He's one of the co-founders of Cloud Privacy Labs where he works on semantic interoperability and privacy technologies for centralized and decentralized systems. Burak holds BSc and MSc degrees in electrical and electronics engineering, and an MSc degree in computer science.
Read more about Burak Serdar

Right arrow

Condition variables

Condition variables differ from the previous concurrency primitives in the sense that, for Go, they are not an essential concurrency tool because, in most cases, a condition variable can be replaced with a channel. However, especially for shared memory systems, condition variables are important tools for synchronization. For example, the Java language builds one of its core synchronization features using condition variables.

A well-known problem of concurrent computing is the producer-consumer problem. There are one or more producer threads that produce a value. These values are consumed by one or more consumer threads. Since all producers and consumers are running concurrently, sometimes there are not enough values produced to satisfy all the consumers, and sometimes there are not enough consumers to consume values produced by the producers. There is usually a finite queue of values into which producers put, and from which consumers retrieve. There is already an elegant solution to this problem: use a channel. All producers write to the channel, and all consumers read from it, and the problem is solved. But in a shared memory system, a condition variable is usually employed for such a situation. A condition variable is a synchronization mechanism where multiple goroutines wait for a condition to occur, and another goroutine announces the occurrence of the condition to the waiting goroutines.

A condition variable supports three operations, as follows:

  • Wait: Blocks the current goroutine until a condition happens
  • Signal: Wakes up one of the waiting goroutines when the condition happens
  • Broadcast: Wakes up all of the waiting goroutines when the condition happens

Unlike the other concurrency primitives, a condition variable needs a mutex. The mutex is used to lock the critical sections in the goroutines that modify the condition. It does not matter what the condition is; what matters is that the condition can only be modified in a critical section and that critical section must be entered by locking the mutex used to construct the condition variable, as shown in the following code:

lock := sync.Mutex{}
cond := sync.NewCond(&lock)

Now let’s implement the producers-consumers problem using this condition variable. Our producers will produce integers and place them in a circular queue. The queue has a finite capacity, so the producer must wait until a consumer consumes from the queue if the queue is full. That means we need a condition variable that will cause the producers to wait until a consumer consumes a value. When the consumer consumes a value, the queue will have more space, and the producer can use it, but then the consumer who consumed that value has to signal the waiting producers that there is space available. Similarly, if consumers consume all the values before producers can produce new ones, the consumers have to wait until new values are available. So, we need another condition variable that will cause the consumers to wait until a producer produces a value. When a producer produces a new value, it has to signal to the waiting consumers that a new value is available.

Let’s start with a simple circular queue implementation:

type Queue struct {
    elements    []int
    front, rear int
    len         int
}
// NewQueue initializes an empty circular queue 
//with the given capacity
func NewQueue(capacity int) *Queue {
    return &Queue{
    elements: make([]int, capacity),
    front:    0,  // Read from elements[front]
    rear:     -1, // Write to elements[rear]
    len:      0,
    }
}
// Enqueue adds a value to the queue. Returns false 
// if queue is full
func (q *Queue) Enqueue(value int) bool {
    if q.len == len(q.elements) {
         return false
    }
    // Advance the write pointer, go around in a circle
    q.rear = (q.rear + 1) % len(q.elements)
    // Write the value
    q.elements[q.rear] = value
    q.len++
    return true
}
// Dequeue removes a value from the queue. Returns 0,false 
// if queue is empty
func (q *Queue) Dequeue() (int, bool) {
    if q.len == 0 {
          return 0, false
    }
    // Read the value at the read pointer
    data := q.elements[q.front]
    // Advance the read pointer, go around in a circle
    q.front = (q.front + 1) % len(q.elements)
    q.len--
    return data, true
}

We need a lock, two condition variables, and a circular queue:

func main() {
     lock := sync.Mutex{}
     fullCond := sync.NewCond(&lock)
     emptyCond := sync.NewCond(&lock)
     queue := NewQueue(10)

Here is the producer function. It runs in an infinite loop, producing random integer values:

producer := func() {
    for {
        // Produce value
        value := rand.Int()
        lock.Lock()
        for !queue.Enqueue(value) {
            fmt.Println("Queue is full")
            fullCond.Wait()
        }
        lock.Unlock()
        emptyCond.Signal()
        time.Sleep(time.Millisecond *
          time.Duration(rand.Intn(1000)))
    }
}

The producer generates a random integer, enters into its critical section, and attempts to enqueue the value. If it is successful, it unlocks the mutex and signals one of the consumers, letting it know that a value has been generated. If there are no consumers waiting on the emptyCond variable at that point, the signal is lost. If, however, the queue is full, then the producer starts waiting on the fullCond variable. Note that Wait is called in the critical section, with the mutex locked. When called, Wait atomically unlocks the mutex and suspends the execution of the goroutine. While waiting, the producer is no longer in its critical section, allowing the consumers to go into their own critical sections. When a consumer consumes a value, it will signal fullCond, which will wake one of the waiting producers up. When the producer wakes up, it will lock the mutex again. Waking up and locking the mutex is not atomic, which means, when Wait returns, the condition that woke up the goroutine may no longer hold, so Wait must be called inside a loop to recheck the condition. When the condition is rechecked, the goroutine will be in its critical section again, so no race conditions are possible.

The consumer is as follows:

consumer := func() {
for {
     lock.Lock()
     var v int
     for {
           var ok bool
           if v, ok = queue.Dequeue(); !ok {
                fmt.Println("Queue is empty")
                emptyCond.Wait()
                continue
           }
           break
}
     lock.Unlock()
     fullCond.Signal()
     time.Sleep(time.Millisecond * 
       time.Duration(rand.Intn(1000)))
     fmt.Println(v)
     }
}

Note the symmetry between the producer and the consumer. The consumer enters into its critical section and attempts to dequeue a value inside a for loop. If the queue has a value in it, it is read, the for loop terminates, and the mutex is unlocked. Then the goroutine notifies any potential producers that a value is read from the queue, so it is likely that the queue is not full. By the time the consumer exists in its critical section and signals the producer, it is possible that another producer produced values to fill up the queue. That’s why the producer has to check the condition again when it wakes up. The same logic applies to the consumer: if the consumer cannot read a value, it starts waiting, and when it wakes up, it has to check whether the queue has elements in it to be consumed.

The rest of the program is as follows:

for i := 0; i < 10; i++ {
     go producer()
}
for i := 0; i < 10; i++ {
     go consumer()
}
select {} // Wait indefinitely

You can run this program with different numbers of producers and consumers, and see how it behaves. When there are more producers than consumers, you should see more messages on the queue being full, and when there are more consumers than producers, you should see more messages on the queue being empty.

Previous PageNext Page
You have been reading a chapter from
Effective Concurrency in Go
Published in: Apr 2023Publisher: PacktISBN-13: 9781804619070
Register for a free Packt account to unlock a world of extra content!
A free Packt account unlocks extra newsletters, articles, discounted offers, and much more. Start advancing your knowledge today.
undefined
Unlock this book and the full library FREE for 7 days
Get unlimited access to 7000+ expert-authored eBooks and videos courses covering every tech area you can think of
Renews at €14.99/month. Cancel anytime

Author (1)

author image
Burak Serdar

Burak Serdar is a software engineer with over 30 years of experience in designing and developing distributed enterprise applications that scale. He's worked for several start-ups and large corporations, including Thomson and Red Hat, as an engineer and technical lead. He's one of the co-founders of Cloud Privacy Labs where he works on semantic interoperability and privacy technologies for centralized and decentralized systems. Burak holds BSc and MSc degrees in electrical and electronics engineering, and an MSc degree in computer science.
Read more about Burak Serdar