
Effective Concurrency in Go

By Burak Serdar
About this book
The Go language has been gaining momentum due to its treatment of concurrency as a core language feature, making concurrent programming more accessible than ever. However, concurrency is still an inherently difficult skill to master, since it requires developing the right mindset to decompose problems into concurrent components correctly. This book will guide you in deepening your understanding of concurrency and show you how to make the most of its advantages. You’ll start by learning what guarantees the language offers when running concurrent programs. Through multiple examples, you will see how to use this information to develop concurrent algorithms that run without data races and complete successfully. You’ll also find out all you need to know about common concurrency patterns, such as worker pools, asynchronous pipelines, fan-in/fan-out, scheduling periodic or future tasks, and error and panic handling in goroutines. The central theme of this book is to give you, the developer, an understanding of why concurrent programs behave the way they do, and how they can be used to build correct programs that work the same way on all platforms. By the time you finish the final chapter, you’ll be able to develop, analyze, and troubleshoot concurrent algorithms written in Go.
Publication date: April 2023
Publisher: Packt
Pages: 212
ISBN: 9781804619070

 

Go Concurrency Primitives

This chapter is about the fundamental concurrency facilities of the Go language. We will first talk about goroutines and channels, the two concurrency building blocks defined by the language. Then we will look at some of the concurrency utilities included in the standard library. We will cover the following topics:

  • Goroutines
  • Channels and the select statement
  • Mutexes
  • Wait groups
  • Condition variables

By the end of this chapter, you will have enough under your belt to tackle basic concurrency problems using language features and standard library objects.

 

Technical Requirements

The source code for this particular chapter is available on GitHub at https://github.com/PacktPublishing/Effective-Concurrency-in-Go/tree/main/chapter2.

 

Goroutines

First, some basics.

A process is an instance of a program with certain dedicated resources, such as memory space, processor time, file handles (for example, most processes in Linux have stdin, stdout, and stderr), and at least one thread. We call it an instance because the same program can be used to create many processes. In most general-purpose operating systems, every process is isolated from the others, so any two processes that wish to communicate have to do it through well-defined inter-process communication utilities. When a process terminates, all the memory allocated for the process is freed, all open files are closed, and all threads are terminated.

A thread is an execution context that contains all the resources required to run a sequence of instructions. Usually, this includes a stack and the values of processor registers. The stack is necessary to keep track of the sequence of nested function calls within that thread, as well as to store values declared in the functions executing in that thread. A given function may execute in many different threads, so the local variables used when that function runs in a thread are stored in the stack of that thread. A scheduler allocates processor time to threads. Some schedulers are preemptive and can stop a thread at any time to switch to another thread. Some schedulers are cooperative and have to wait for the thread to yield before switching to another one. A thread is usually managed by the operating system.

A goroutine is an execution context that is managed by the Go runtime (as opposed to a thread, which is managed by the operating system). A goroutine usually has a much smaller startup overhead than an operating system thread. A goroutine starts with a small stack that grows as needed. Creating new goroutines is faster and cheaper than creating operating system threads. The Go scheduler assigns operating system threads to run goroutines.

In a Go program, goroutines are created using the go keyword followed by a function call:

go f()
go g(i, j)
go func() {
    ...
}()
go func(i, j int) {
    ...
}(1, 2)

The go keyword starts the given function in a new goroutine. The existing goroutine continues running concurrently with the newly created goroutine. The function running as a goroutine can take parameters, but it cannot return a value. The parameters of the goroutine function are evaluated before the goroutine starts and passed to the function once the goroutine starts running.

You may ask why there was a need to develop a completely new threading system. Just to get lightweight threads? Goroutines are more than just lightweight threads. They are the key to increasing throughput by efficiently sharing processing power among goroutines that are ready to run. Here’s the gist of the idea.

The number of operating system threads used by the Go runtime to run goroutines is equal to the number of processors/cores on the platform (unless you change this by setting the GOMAXPROCS environment variable or by calling the runtime.GOMAXPROCS function). This is the number of things the platform can do in parallel. Anything more than that, and the operating system has to resort to time sharing. With GOMAXPROCS threads running in parallel, there is no context-switching overhead at the operating system level. The Go scheduler assigns goroutines to operating system threads to get more work out of each thread, as opposed to doing less work on many threads. Reduced context switching is not the only reason why the Go scheduler performs better than the operating system scheduler. The Go scheduler also performs better because it knows which goroutines to wake up to get more out of them. The operating system does not know about channel operations or mutexes, which are all managed in user space by the Go runtime.
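
If you want to inspect these values at runtime, the runtime package reports both; a small sketch (passing 0 to runtime.GOMAXPROCS queries the setting without changing it):

fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0)) // Current setting
fmt.Println("NumCPU:", runtime.NumCPU())          // Logical CPUs: the default
                                                  // GOMAXPROCS value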

There are some more subtle differences between threads and goroutines. Threads usually have priorities. When a low-priority thread competes with a high-priority thread for a shared resource, the high-priority thread has a better chance of getting it. Goroutines do not have pre-assigned priorities. That said, the language specification allows for a scheduler that favors certain goroutines. For example, later versions of the Go runtime include scheduling algorithms that will select starving goroutines. In general, though, a correct concurrent Go program should not rely on scheduling behavior. Many languages have facilities such as thread pools with configurable scheduling algorithms. These facilities are developed based on the assumption that thread creation is an expensive operation, which is not the case for Go. Another difference is how goroutine stacks are managed. A goroutine starts with a small stack (Go runtimes after 1.19 use a historical average; earlier versions use 2 KB), and every function call checks whether the remaining stack space is sufficient. If not, the stack is resized. An operating system thread usually starts with a much larger stack (on the order of megabytes) that usually does not grow.

The Go runtime starts several goroutines when a program starts. Exactly how many depends on the implementation and may change between versions. However, there is at least one for the garbage collector and another for the main goroutine. The main goroutine simply calls the main function and terminates the program when it returns. When main returns and the program exits, all running goroutines terminate abruptly, mid-function, without a chance to perform any cleanup.

Let’s look at what happens when we create a goroutine:

func f() {
    fmt.Println("Hello from goroutine")
}
func main() {
    go f()
    fmt.Println("Hello from main")
    time.Sleep(100 * time.Millisecond)
}

This program starts with the main goroutine. When the go f() statement runs, a new goroutine is created. Remember, a goroutine is an execution context, which means the go keyword causes the runtime to allocate a new stack and set it up to run the f() function. Then this goroutine is marked as ready to run. The main goroutine continues running without waiting for f() to be called, and prints Hello from main to the console. Then it waits for 100 milliseconds. During this time, the new goroutine may start running, call f(), and print Hello from goroutine. fmt.Println has mutual exclusion built in to ensure that the two goroutines do not corrupt each other’s output.

This program can produce one of the following outputs:

  • Hello from main, then Hello from goroutine: This is the case when the main goroutine first prints the output, then the goroutine prints it.
  • Hello from goroutine, then Hello from main: This is the case when the goroutine created in main() runs first, and then the main goroutine prints the output.
  • Hello from main: This is the case when the main goroutine continues running, but the new goroutine never finds a chance to run in the given 100 milliseconds, causing main to return. Once main returns, the program terminates without the goroutine ever finding a chance to run. It is unlikely that this case is observable, but it is possible.

Functions that take arguments can run as goroutines:

func f(s string) {
    fmt.Printf("Goroutine %s\n", s)
}
func main() {
    for _, s := range []string{"a", "b", "c"} {
        go f(s)
    }
    time.Sleep(100 * time.Millisecond)
}

Every run of this program is likely to print out a, b, and c in random order. This is because the for loop creates three goroutines, each called with the current value of s, and they can run in any order the scheduler picks them. Of course, if all goroutines do not finish within the given 100 milliseconds, some strings may be missing from the output.

Naturally, this can be done with an anonymous function as well. But now, things get interesting:

func main() {
    for _, s := range []string{"a", "b", "c"} {
        go func() {
            fmt.Printf("Goroutine %s\n", s)
        }()
    }
    time.Sleep(100 * time.Millisecond)
}

Here’s the output:

Goroutine c
Goroutine c
Goroutine c

So, what is going on here?

First, this is a data race, because there is a shared variable that is written by one goroutine and read by three others without any synchronization. This becomes more evident if we unroll the for loop, as follows:

func main() {
    var s string
    s = "a"
    go func() {
        fmt.Printf("Goroutine %s\n", s)
    }()

    s = "b"
    go func() {
        fmt.Printf("Goroutine %s\n", s)
    }()

    s = "c"
    go func() {
        fmt.Printf("Goroutine %s\n", s)
    }()

    time.Sleep(100 * time.Millisecond)
}

In this example, each anonymous function is a closure. We are running three goroutines, each with a closure that captures the s variable from the enclosing scope. Because of that, we have three goroutines that read the shared s variable, and one goroutine (the main goroutine) writing to it concurrently. This is a data race. In the preceding run, all three goroutines ran after the last assignment to s. There are other possible runs. In fact, this program may even run correctly and print the expected output.

That is the danger of data races. A program such as this rarely runs correctly, so it is easy to diagnose and fix before code is deployed in a production environment. The data races that rarely give the wrong output usually make it to production and cause a lot of trouble.

Let’s look at how closures work in more detail. They are the cause of many misunderstandings in Go development because simply refactoring a declared function as an anonymous function may have unexpected consequences.

A closure is a function with a context that includes some variables captured from its enclosing scope. In the preceding example, there are three closures, and each closure captures the s variable from its scope. The scope defines all symbol names accessible at a given point in a program. In Go, the scope is determined syntactically, so where we declare the anonymous function, the scope includes all the exported functions, variables, type names, the main function, and the s variable. The Go compiler analyzes the source code to determine whether a variable defined in a function may be referenced after that function returns. This is the case when, for instance, you pass a pointer to a variable defined in one function to another function, or when you assign the address of a variable defined in a function to a global pointer variable. Once the function declaring that variable returns, the global pointer would be pointing to a stale memory location, because stack locations come and go as functions are entered and return. When such a situation is detected (or even the potential for such a situation, such as creating a goroutine or calling another function), the variable escapes to the heap. That is, instead of allocating that variable on the stack, the compiler allocates it dynamically on the heap, so even if the variable leaves scope, its contents remain accessible. This is exactly what is happening in our example. The s variable escapes to the heap because there are goroutines that can continue running and accessing that variable even after main returns. This situation is depicted in Figure 2.1:

Figure 2.1 – Closures

Closures as goroutines can be a very powerful tool, but they must be used carefully. Most closures running as goroutines share memory, so they are prone to races.

We can fix our program by creating a copy of the s variable at each iteration. The first iteration sets s to "a". We create a copy of it and capture that copy in the closure. Then the next iteration sets s to "b". This is fine because the closure created during the first iteration is still using "a". We create a new copy of s, this time with a value of "b", and this goes on. This is shown in the following code:

for _, s := range []string{"a", "b", "c"} {
    s := s // Redeclare s, creating a copy
    // Here, the redeclared s shadows the loop variable s
    go func() {…}()
}

Another way is to pass it as a parameter:

for _, s := range []string{"a", "b", "c"} {
    go func(s string) {
        fmt.Printf("Goroutine %s\n", s)
    }(s) // This passes a copy of s to the function
}

In either solution, the s loop variable no longer escapes to the heap, because a copy of it is captured. In the first solution using a redeclared variable, the copy escapes to the heap, but the s loop variable doesn’t.

One of the frequently asked questions regarding goroutines is: how do we stop a running goroutine? There is no magic function that will terminate or pause a goroutine. If you want to stop a goroutine, you have to send some message or set a flag shared with the goroutine, and the goroutine either has to respond to the message or read the shared variable and return. If you want to pause it, you have to use one of the synchronization mechanisms to block it. This fact causes some anxiety among developers who cannot find an effective way to terminate their goroutines. However, this is one of the realities of concurrent programming. The ability to create concurrent execution blocks is only one part of the problem. Once created, you have to be mindful of how to terminate them responsibly.
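
As a minimal sketch of this idea (a fuller, select-based version appears later in this chapter), the goroutine below polls a channel that is closed to request a stop; the names are illustrative:

done := make(chan struct{})
go func() {
    for {
        select {
        case <-done:
            return // Stop was requested
        default:
        }
        // Do one unit of work, then check again
    }
}()
// Later, from another goroutine:
close(done) // Request the stop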

A panic can terminate a goroutine. If a panic happens in a goroutine, it is propagated up the call stack until a recover is found, or until the goroutine returns. This is called stack unwinding. If the panic is not recovered before the goroutine’s stack is fully unwound, the runtime prints the panic message and crashes the entire program, regardless of which goroutine panicked.
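
For example, a goroutine can install a deferred function that calls recover to stop the unwinding before it takes down the program; a minimal sketch:

go func() {
    defer func() {
        if r := recover(); r != nil {
            fmt.Println("recovered:", r) // This goroutine ends here,
                                         // but the program survives
        }
    }()
    panic("something went wrong")
}()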

Before closing this topic, it might be helpful to talk about how the Go runtime manages goroutines. Go uses an M:N scheduler that runs M goroutines on N OS threads. Internally, the Go runtime keeps track of the OS threads and the goroutines. When an OS thread is ready to execute a goroutine, the scheduler selects one that is ready to run and assigns it to the thread. The OS thread runs that goroutine until it blocks, yields, or is preempted. There are several ways a goroutine can be blocked. Blocking on channel operations or mutexes is managed by the Go runtime. If the goroutine is blocked because of a synchronous I/O operation, then the thread running that goroutine also blocks (this is managed by the operating system). In this case, the Go runtime starts a new thread or uses one already available and continues operation. When the OS thread unblocks (that is, the I/O operation ends), the thread is put back into use or returned to the thread pool. The Go runtime limits the number of active OS threads running user goroutines to GOMAXPROCS. However, there is no limit on the number of OS threads waiting for I/O operations, so the actual OS thread count a Go program uses can be much higher than GOMAXPROCS. Even so, only GOMAXPROCS of those threads will be executing user goroutines at any given time.

Figure 2.2 illustrates this. Suppose GOMAXPROCS=2. Thread 1 and Thread 2 are operating system threads that are executing goroutines. Goroutine G1, which is running on Thread 1, performs a synchronous I/O operation, blocking Thread 1. Since Thread 1 is no longer operational, the Go runtime allocates Thread 3 and continues running goroutines. Note that even though there are three operating system threads, there are two active threads and one blocked thread. When the system call running on Thread 1 completes, the goroutine G1 becomes runnable again, but there is one extra thread now. The Go runtime continues running with Thread 3 and stops using Thread 1.

Figure 2.2 – System calls block OS threads

A similar process happens for asynchronous I/O operations, such as network operations and some file operations on certain platforms. However, instead of blocking a thread for a system call, the goroutine is blocked, and a netpoller thread is used to wait for asynchronous events. When the netpoller receives events, it wakes up the relevant goroutines.

 

Channels

Channels allow goroutines to share memory by communicating, as opposed to communicating by sharing memory. When you are working with channels, you have to keep in mind that channels are two things combined together: they are synchronization tools, and they are conduits for data.

You can declare a channel by specifying its type and its capacity:

ch := make(chan int, 2)

The preceding declaration creates and initializes a channel that can carry integer values with a capacity of 2. A channel is a first-in, first-out (FIFO) conduit. That is, if you send some values to a channel, the receiver will receive those values in the order they were written. Use the following syntax to send to or receive from channels:

ch <- 1   // Send 1 to the channel
<-ch      // Receive a value from the channel and discard it
x = <-ch  // Receive a value from the channel and assign it to x
x := <-ch // Receive a value from the channel, declare the variable
          // x using the type of the value read (which is int), and
          // assign the value to x

The len() and cap() functions work as expected for channels. The len() function will return the number of items waiting in the channel, and cap() will return the capacity of the channel buffer. The availability of these functions doesn’t mean the following code is correct, though:

// Don't do this!
if len(ch) > 0 {
    x := <-ch
}

This code checks whether the channel has some data in it, and, seeing that it does, reads it. This code has a race condition. Even though the channel may have data in it when its length is checked, another goroutine may receive it by the time this goroutine attempts to do so. In other words, if len(ch) returns a non-zero value, it means that the channel had some values when its length was checked, but it doesn’t mean that it has some values after the len function returns.
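
If you need a receive that does not block on an empty channel, a select statement with a default case (introduced later in this chapter) does this without the race; a sketch:

select {
case x := <-ch:
    fmt.Println("received", x)
default:
    // The channel was empty at the instant of the check
}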

Figure 2.3 illustrates a possible sequence of operations with this channel using two goroutines. The first goroutine sends values 1 and 2 to the channel, which are stored in the channel buffer (len(ch)=2, cap(ch)=2). Then the other goroutine receives 1. At this point, a value of 2 is the next one to be read from the channel, and the channel buffer only has one value in it. The first goroutine sends 3. The channel is full, so the operation to send 4 to the channel blocks. When the second goroutine receives the 2 value from the channel, the send by the first goroutine succeeds, and the first goroutine wakes up.

Figure 2.3 – Possible sequence of operations with a buffered channel of capacity 2

This example shows that a send operation to a channel will block until the channel is ready to accept the value.

Similarly, Figure 2.4 shows a blocking receive operation. The first goroutine sends 1, and the second goroutine receives it. Now len(ch)=0, so the next receive operation by the second goroutine blocks. When the first goroutine sends a value of 2 to the channel, the second goroutine receives that value and wakes up.

Figure 2.4 – Blocking receive operation

So, a receive from a channel will block until the channel is ready to provide a value.

A channel is actually a pointer to a data structure that contains its internal state, so the zero value of a channel variable is nil. Because of that, channels must be initialized using the make keyword. A forgotten initialization leaves a nil channel, which is never ready to accept or provide a value, so sending to or receiving from a nil channel blocks indefinitely.
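
As an illustration, the following sketch never makes progress because the channel is never initialized; the runtime eventually reports that all goroutines are asleep:

var ch chan int // Zero value: nil
go func() {
    ch <- 1 // Blocks forever: a nil channel is never ready
}()
<-ch // Also blocks forever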

The Go garbage collector will collect channels that are no longer in use. If there are no goroutines that directly or indirectly reference a channel, the channel will be garbage collected even if its buffer has elements in it. You do not need to close channels to make them eligible for garbage collection. In fact, closing a channel has more significance than just cleaning up resources.

You may have noticed sending and receiving data using channels is a one-to-one operation: one goroutine sends, and another receives the data. It is not possible to send data that will be received by many goroutines using one channel. However, closing a channel is a one-time broadcast to all receiving goroutines. In fact, that is the only way to notify multiple goroutines at once. This is a very useful feature, especially when developing servers. For example, the net/http package implements a Server type that handles each request in a separate goroutine. An instance of context.Context is passed to each request handler that contains a Done() channel. If, for example, the client closes the connection before the request handler can prepare the response, the handler can check to see whether the Done() channel is closed and terminate processing prematurely. If the request handler creates goroutines to prepare the response, it should pass the same context to these goroutines, and they will all receive the cancellation notice once the Done() channel closes. We will talk about how to use context.Context later in the book.

Receiving from a closed channel is a valid operation. In fact, a receive from a closed channel always succeeds: it returns any values remaining in the buffer and, once the buffer is drained, the zero value of the channel type. Sending to a closed channel, on the other hand, is a bug: it always panics.

Figure 2.5 depicts how closing a channel works. This example starts with one goroutine sending 1 and 2 to the channel and then closing it. After the channel is closed, sending more data to it will cause a panic. The channel records that it is closed behind the values already in its buffer, so receive operations can still continue. The other goroutine receives the 1 and 2 values, and then every subsequent receive returns the zero value for the channel type, in this case, the integer 0.

Figure 2.5 – Closing a channel

For a receiver, it is usually important to know whether the channel was closed when the read happened. Use the following form to test the channel state:

y, ok := <-ch

This form of the channel receive operation returns the received value together with a boolean indicating how it was produced. If ok == true, a value was actually received. If ok == false, the channel is closed and drained, and the value is simply the zero value. A similar syntax does not exist for sending, because sending to a closed channel panics.
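
To make the mechanics concrete, here is a small sketch that fills a buffered channel, closes it, and then drains it using this form:

ch := make(chan int, 2)
ch <- 1
ch <- 2
close(ch)
for {
    y, ok := <-ch
    if !ok {
        break // The channel is closed and drained
    }
    fmt.Println(y) // Prints 1, then 2
}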

What happens when a channel is created without a buffer? Such a channel is called an unbuffered channel, and behaves in the same way as a buffered channel, but with len(ch)=0 and cap(ch)=0. Thus, a send operation will block until another goroutine receives from it. A receive operation will block until another goroutine sends to it. In other words, an unbuffered channel is a way to transfer data between goroutines atomically. Let’s look at how unbuffered channels are used to send messages and to synchronize goroutines using the following snippet:

1: chn := make(chan bool) // Create an unbuffered channel
2: go func() {
3:     chn <- true // Send to channel
4: }()
5: go func() {
6:     var y bool
7:     y = <-chn // Receive from channel
8:     fmt.Println(y)
9: }()

Line 1 creates an unbuffered bool channel.

Line 2 creates the G1 goroutine and line 5 creates the G2 goroutine.

There are two possible runs at this point: G1 attempts to send (line 3) before G2 is ready to receive (line 7), or G2 attempts to receive (line 7) before G1 is ready to send (line 3). The first diagram in Figure 2.6 illustrates the case where G1 runs first. At line 3, G1 attempts to send to the channel. However, at this point, G2 is still not ready to receive. Since the channel is unbuffered and there are no receivers available, G1 blocks.

Figure 2.6 – Two possible runs using an unbuffered channel

After a while, G2 executes line 7. This is a channel receive operation, and there is a goroutine (G1) waiting to send. Because of this, G1 is unblocked and sends the value to the channel, and G2 receives it without blocking. It is now up to the scheduler to decide when G1 can run.

The second possible scenario, where G2 runs first, is shown on the right-hand side of Figure 2.6. Since G1 has not yet sent to the channel, G2 blocks. When G1 is ready to send, G2 is already waiting to receive, so G1 sends the value without blocking, and G2 unblocks and receives it. The scheduler decides when G2 can run again.

Note that an unbuffered channel acts as a synchronization point between two goroutines. Both goroutines must align for the message transfer to happen.

A word of caution is necessary here. Transferring a value from one goroutine to another transfers a copy of the value. So, if a goroutine runs ch <- x to send the value of x, and another goroutine receives it with y = <-ch, then this is equivalent to y = x, with additional synchronization guarantees. The crucial point here is that it does not transfer the ownership of the value. If the transferred value is a pointer, or contains one (such as a map or slice), you end up with a shared memory system. Consider the following program:

type Data struct {
    Values map[string]interface{}
}

func processData(data Data, pipeline chan Data) {
    data.Values = getInitialValues() // Initialize the map
    pipeline <- data                 // Send data to another goroutine
                                     // for processing
    data.Values["status"] = "sent"   // Possible data race!
}

The processData function initializes the Values map and then sends the data to another goroutine for processing. But a map is actually a pointer to a complex map structure. When data is sent through the channel, the receiver receives a copy of the pointer to the same map structure. If the receiving goroutine reads from or writes to the Values map, that operation will be concurrent with the write operation shown in the preceding code snippet. That is a data race.

So, as a convention, it is a good practice to assume that if a value is sent via a channel, the ownership of the value is also transferred, and you should not use a variable after sending it via a channel. You can redeclare it or throw it away. If you have to, include an additional mechanism, such as a mutex, so you can coordinate goroutines after the value becomes shared.
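
For instance, the processData function shown earlier can follow this convention by finishing all of its writes before the send; a minimal sketch of the fix:

func processData(data Data, pipeline chan Data) {
    data.Values = getInitialValues()
    data.Values["status"] = "sent" // Last write happens before the send
    pipeline <- data               // Ownership transfers here; data must
                                   // not be touched after this line
}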

A channel can be declared with a direction. Such channels are useful as function arguments, or as function return values:

var receiveOnly <-chan int // Can receive, cannot send or close
var sendOnly chan<- int    // Can send, cannot receive or close

The benefit of this declaration is type safety: a function that takes a send-only channel as an argument cannot receive from or close that channel. Similarly, a caller that gets a receive-only channel as a return value can only receive data from that channel; it cannot send data or close it. For example:

func streamResults() <-chan Data {
    resultCh := make(chan Data)
    go func() {
        defer close(resultCh)
        results := getResults()
        for _, result := range results {
            resultCh <- result
        }
    }()
    return resultCh
}

This is a typical way of streaming the results of a query to the caller. The function starts by declaring a bidirectional channel but returns it as a directional one. This tells the caller that it is only supposed to read from that channel. The streaming function will write to it and close it when everything is done.
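
A caller would typically consume such a channel with a for-range loop, which receives values until the streaming goroutine closes the channel (this form is discussed shortly); process here is a hypothetical handler:

for data := range streamResults() {
    process(data) // Handle each result as it arrives
}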

So far, we have looked at channels in the context of two goroutines. But channels can be used to communicate with many goroutines. When multiple goroutines attempt to send to a channel or when multiple goroutines attempt to read from a channel, they are scheduled randomly. There are many implications of this simple rule.

You can create many worker goroutines, all receiving from a channel. Another goroutine sends work items to the channel, and each work item will be picked up by an available worker goroutine and processed. This is useful for worker pool patterns where many goroutines work on a list of tasks concurrently. Then, you can have one goroutine reading from a channel that is written by many worker goroutines. The reading goroutine will collect the results of computations performed by those goroutines. The following program illustrates this idea:

 1: workCh := make(chan Work)
 2: resultCh := make(chan Result)
 3: done := make(chan bool)
 4:
 5: // Create 10 worker goroutines
 6: for i := 0; i < 10; i++ {
 7:     go func() {
 8:         for {
 9:             // Get work from the work channel
10:             work := <-workCh
11:             // Compute result
12:             // Send the result via the result channel
13:             resultCh <- result
14:         }
15:     }()
16: }
17: results := make([]Result, 0)
18: go func() {
19:     // Collect all the results
20:     for i := 0; i < len(workQueue); i++ {
21:         results = append(results, <-resultCh)
22:     }
23:     // When all the results are collected, notify the done channel
24:     done <- true
25: }()
26: // Send all the work to the workers
27: for _, work := range workQueue {
28:     workCh <- work
29: }
30: // Wait until everything is done
31: <-done

This is an artificial example that illustrates how multiple channels can be used to coordinate work. Two channels are used for passing data around: workCh for sending work to goroutines, and resultCh for collecting computed results. There is one more channel, the done channel, to control program flow. This is required because we would like to wait until all the results are computed and stored in the slice before proceeding. The program starts by creating the worker goroutines and then a separate goroutine to collect the results. All these goroutines will be blocked, waiting to receive data (lines 10 and 21). The for loop in the main body then iterates through the work queue and sends the work items to the waiting worker goroutines (line 28). Each worker receives a work item (line 10), computes a result, and sends the result to the collector goroutine (line 13), which places it in a slice. The main goroutine sends all the work items and then blocks until it receives a value from the done channel (line 31), which comes after all the results are collected (line 24). As you can see, there is an ordering of channel operations in this program: 28 < 10 < 13 < 21 < 24 < 31. These types of orderings will be crucial in analyzing the concurrent execution of programs.

You may have noticed that in this program, all the worker goroutines leak; that is, they were never stopped. A good way to stop them is to close the work channel once we’re done writing to it. Then we can check whether the channel is closed in the worker:

for _, work := range workQueue {
     workCh <- work
}
close(workCh)

This will notify the workers that the work queue has been exhausted and the work channel is closed. We change the workers to check for this, as shown in the following code:

work, ok := <- workCh
if !ok {         // Is the channel closed?
     return      // Yes. Terminate
}

There is a more idiomatic way of doing this. You can range over a channel in a for loop, which will exit when the channel is closed:

go func() {
    for work := range workCh { // Receive until the channel closes
        // Compute result
        // Send the result via the result channel
        resultCh <- result
    }
}()

With this change, all the running worker goroutines will terminate once the work channel is closed.

We will explore these patterns in greater detail later in the book. However, for now, these patterns bring up another question: how do we work with multiple channels? To answer this, we have to introduce the select statement. The following definition is from the Go language specification:

A select statement chooses which of a set of possible send or receive operations will proceed.

The select statement looks like a switch-case statement:

select {
case x := <-ch1:
    // Received x from ch1
case y := <-ch2:
    // Received y from ch2
case ch3 <- z:
    // Sent z to ch3
default:
    // Optional default, if none of the other
    // operations can proceed
}

At a high level, the select statement chooses one of the send or receive operations that can proceed and then runs the block corresponding to the chosen operation. Note the past tense in the previous comments. The block for the reception of x from ch1 runs only after x is received from ch1. If there are multiple send or receive operations that can proceed, the select statement chooses one randomly. If there are none, the select statement chooses the default option. If a default option does not exist, the select statement blocks until one of the channel operations becomes available.

It follows from the preceding definitions that the following blocks indefinitely:

select {}

Using the default option in a select statement is useful for non-blocking sends and receives. The default option will only be chosen when all other options are not ready. The following is a non-blocking send operation:

select {
case ch <- x:
    sent = true
default:
}

The preceding select statement will test whether the ch channel is ready for sending data. If it is ready, the x value will be sent. If it is not, the execution will continue with the default option. Note that this only means that the ch channel was not ready for sending when it was tested. The moment the default option starts running, send to ch may become available.

Similarly, the following is a non-blocking receive:

select {
case x = <-ch:
    received = true
default:
}

One of the frequently asked questions about goroutines is how to stop them. As explained before, there is no magic function that will stop a goroutine in the middle of its operation. However, using a non-blocking receive and a channel to signal a stop request, you can terminate a long-running goroutine gracefully:

 1: stopCh := make(chan struct{})
 2: requestCh := make(chan Request)
 3: resultCh := make(chan Result)
 4: go func() {
 5:     for { // Loop indefinitely
 6:         var req Request
 7:         select {
 8:         case req = <-requestCh:
 9:             // Received a request to process
10:         case <-stopCh:
11:             // Stop requested, clean up and return
12:             cleanup()
13:             return
14:         }
15:         // Do some processing
16:         someLongProcessing(req)
17:         // Check whether stop was requested before another long task
18:         select {
19:         case <-stopCh:
20:             // Stop requested, clean up and return
21:             cleanup()
22:             return
23:         default:
24:         }
25:         // Do more processing
26:         result := otherLongProcessing(req)
27:         select {
28:         // Wait until resultCh becomes sendable, or stop is requested
29:         case resultCh <- result:
30:             // Result is sent
31:         case <-stopCh:
32:             // Stop requested
33:             cleanup()
34:             return
35:         }
36:     }
37: }()

The preceding function works with three channels: requestCh, from which it receives requests; resultCh, to which it sends results; and stopCh, which notifies the goroutine of a stop request. To send a stop request, the main goroutine simply closes the stop channel, which broadcasts the stop request to all worker goroutines.

The select statement at line 7 blocks until one of the channels, the request channel or the stop channel, has data to receive. If it receives from the stop channel, the goroutine cleans up and returns. If a request is received, the goroutine processes it. The select statement at line 18 is a non-blocking read from the stop channel. If stop is requested during the processing, it is detected here, and the goroutine can clean up and return. Otherwise, the processing continues, and a result is computed. The select statement at line 27 checks whether the listening goroutine is ready to receive the result or whether stop is requested. If the listening goroutine is ready, the result is sent, and the loop restarts. If the listening goroutine is not ready but stop is requested, the goroutine cleans up and returns. This is a blocking select, so it waits until it can either transmit the result or receive the stop request and return. Note that for the select statement at line 27, if both the result channel and the stop channel are enabled, the choice is random. The goroutine may send to the result channel and continue with the loop even if stop is requested. The same applies to the select statement at line 7: if both the request channel and the stop channel are enabled, the select statement may choose to read the request instead of stopping.

This example brings up a good point: in a select statement, all enabled channels have the same likelihood of being chosen; that is, there is no channel priority. Under heavy load, the previous goroutine may process many requests even after a stop is requested. One way to deal with such a situation is to double-check the higher-priority channel:

select {
case req = <-requestCh:
    // Received a request to process.
    // Check whether stop was also requested.
    select {
    case <-stopCh:
        cleanup()
        return
    default:
    }
case <-stopCh:
    // Stop requested, clean up and return
    cleanup()
    return
}

This re-checks for a stop request after a request is received from the request channel, and returns if stop was requested.

Also, note that the preceding implementations will lose the received request if they are stopped. If that side effect is not desired, the cleanup process should put the request back into a queue.

Channels can be used to gracefully terminate a program based on a signal. This is important in a containerized environment where the orchestration platform may terminate a running container using a signal. The following code snippet illustrates this scenario:

var term chan struct{}

func main() {
    term = make(chan struct{})
    sig := make(chan os.Signal, 1)
    go func() {
        <-sig
        close(term)
    }()
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        for {
            select {
            case <-term:
                return
            default:
            }
            // Do work
        }
    }()
    // ...
}

This program handles the interrupt and termination signals coming from the operating system by closing a global term channel. All workers check the term channel and return if the program is terminating. This gives the application the opportunity to perform cleanup operations before the program terminates. The channel that listens for the signals must be buffered, because the runtime uses a non-blocking send to deliver signal messages.
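
Since Go 1.16, the os/signal package also provides signal.NotifyContext, which packages this same pattern behind a context.Context (discussed later in the book); a sketch:

ctx, stop := signal.NotifyContext(context.Background(),
    syscall.SIGINT, syscall.SIGTERM)
defer stop() // Release the signal registration

go func() {
    for {
        select {
        case <-ctx.Done():
            return // Program is terminating
        default:
        }
        // Do work
    }
}()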

Finally, let’s take a closer look at some of the interesting properties of select statements that may cause misunderstandings. For example, the following is a valid select statement. When a value can be received from the channel, the select statement will choose one of the cases randomly:

select {
     case <-ch:
     case <-ch:
}

It is possible that the channel send or receive operation is not the first thing in a case block, for example:

func main() {
    var i int
    f := func() int {
        i++
        return i
    }
    ch1 := make(chan int)
    ch2 := make(chan int)
    select {
    case ch1 <- f():
    case ch2 <- f():
    default:
    }
    fmt.Println(i)
}

The preceding program uses a non-blocking send. There are no other goroutines, so the channel send operations cannot be chosen, but the f() function is still called for both cases. This program will print 2.

A more complicated select statement is as follows:

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go func() {
        ch2 <- 1
    }()
    go func() {
        fmt.Println(<-ch1)
    }()
    select {
    case ch1 <- <-ch2:
        time.Sleep(time.Second)
    default:
    }
}

In this program, there is a goroutine that sends to the ch2 channel, and a goroutine that receives from ch1. Both channels are unbuffered, so both goroutines will block at the channel operation. But the select statement has a case that receives a value from ch2 and sends it to ch1. What exactly is going to happen? Will the select statement make its decision based on the readiness of ch1 or ch2?

The select statement immediately evaluates the arguments of the channel send operation. That means <-ch2 runs right away, without checking whether ch2 has a value available. If ch2 does not have a value to provide, this evaluation blocks, and the select statement waits even though there is a default case. Once the value from ch2 is received, the select statement makes its choice: if ch1 is ready to accept the value, it sends it. If not, the default case is selected.

 

Mutex

Mutex is short for mutual exclusion. It is a synchronization mechanism to ensure that only one goroutine can enter a critical section while others are waiting.

A mutex is ready to be used when declared. Once declared, a mutex offers two basic operations: lock and unlock. A mutex can be locked only once, so if a goroutine locks a mutex, all other goroutines attempting to lock it will block until the mutex is unlocked. This ensures only one goroutine enters a critical section.

Typical uses of mutexes are as follows:

var m sync.Mutex
func f() {
    m.Lock()
    // Critical section
    m.Unlock()
    }
func g() {
    m.Lock()
    defer m.Unlock()
    // Critical section
}

To ensure mutual exclusion for a critical section, the mutex must be a shared object. That is, a mutex defined for a particular critical section must be shared by all the goroutines that enter it to establish mutual exclusion.

We will illustrate the use of mutexes with a realistic example. A common problem that has been solved many times is the caching problem: certain operations, such as expensive computations, I/O operations, or working with databases, are slow, so it makes sense to cache the results once you obtain them. But by definition, a cache is shared among many goroutines, so it must be thread-safe. The following example is a cache implementation that loads objects from a database and puts them in a map. If the object does not exist in the database, the cache also remembers that:

type Cache struct {
    mu sync.Mutex
    m  map[string]*Data
}

func (c *Cache) Get(ID string) (Data, bool) {
    c.mu.Lock()
    data, exists := c.m[ID]
    c.mu.Unlock()
    if exists {
        if data == nil {
            return Data{}, false
        }
        return *data, true
    }
    data, loaded := retrieveData(ID)
    c.mu.Lock()
    defer c.mu.Unlock()
    d, exists := c.m[ID]
    if exists {
        return *d, true
    }
    if !loaded {
        c.m[ID] = nil
        return Data{}, false
    }
    c.m[ID] = data
    return *data, true
}

The Cache structure includes a mutex. The Get method starts with locking the cache. This is because Cache.m is shared between goroutines, and all read or write operations involving Cache.m must be done by only one goroutine. If there are other cache requests ongoing at that moment, this call will block until the other goroutines are done.

The first critical section simply reads the map to see whether the requested object is already in the cache. Note the cache is unlocked as soon as the critical section is completed to allow other goroutines to enter their critical sections. If the requested object is in the cache, or if the nonexistence of that object is recorded in the cache, the method returns. Otherwise, the method retrieves the object from the database. Since the lock is not held during this operation, other goroutines may continue using the cache. This may cause other goroutines to load the same object as well. Once the object is loaded, the cache is locked again because the loaded object must be put in the cache. This time, we can use defer c.mu.Unlock() to ensure the cache is unlocked once the method returns. There is a second check to see whether the object was already placed in the cache by another goroutine. This is possible because multiple goroutines can ask for the object using the same ID at the same time, and many goroutines may proceed to load the object from the database. Checking this again after acquiring the lock will make sure that if another goroutine has already put the object into the cache, it will not be overwritten with a new copy.

An important point to note here is that mutexes must not be copied. When you copy a mutex, you end up with two mutexes, the original and the copy, and locking the original does not prevent other goroutines from locking the copy. The go vet tool catches such mistakes. For instance, declaring the cache Get method with a value receiver instead of a pointer receiver copies the cache struct, including the mutex:

func (c Cache) Get(ID string) (Data, bool) {…}

This copies the mutex at every call, so all concurrent Get calls will enter the critical section without mutual exclusion.

A mutex does not keep track of which goroutine locked it. This has some implications. First, locking a mutex twice from the same goroutine will deadlock that goroutine. This is a common problem with multiple functions that can call each other and also lock the same mutex:

var m sync.Mutex
func f() {
    m.Lock()
    defer m.Unlock()
    // process
}
func g() {
    m.Lock()
    defer m.Unlock()
    f() // Deadlock
}

Here, the g() function calls the f() function, but the m mutex is already locked, so f deadlocks. One way to correct this problem is to declare two versions of f, one with a lock and one without:

func f() {
    m.Lock()
    defer m.Unlock()
    fUnlocked()
}
func fUnlocked() {
    // process
}
func g() {
    m.Lock()
    defer m.Unlock()
    fUnlocked()
}

Second, there is nothing preventing an unrelated goroutine from unlocking a mutex locked by another goroutine. Such bugs tend to appear after refactoring an algorithm and forgetting to update the mutex names in the process, and they can be very subtle.
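A sketch of how such a bug can surface follows; worker and cleanup are hypothetical names, not part of the original example:

var mu sync.Mutex

func worker() {
    mu.Lock()
    defer mu.Unlock()
    // Long critical section
}

// After a careless refactor, cleanup still unlocks mu even
// though the calling goroutine never locked it. The call
// succeeds, releases the lock held by worker, and silently
// breaks mutual exclusion.
func cleanup() {
    mu.Unlock()
}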

The functionality of a mutex can be replicated using a channel with a buffer size of 1:

var mutexCh = make(chan struct{}, 1)
func Lock() {
    mutexCh <- struct{}{}
}
func Unlock() {
    select {
    case <-mutexCh:
    default:
    }
}
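Note that this is not an exact replacement: because of the default case, calling Unlock here when the mutex is not locked is a no-op, whereas unlocking an unlocked sync.Mutex is a fatal runtime error.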

Often, as in the preceding cache example, there are two types of critical sections: one for the readers and one for the writers. The critical section for the readers allows multiple readers to enter concurrently but does not allow a writer in until all readers are done. The critical section for writers excludes all other writers and all readers. This means that there can be many concurrent readers of a structure, but only one writer. For this, an RWMutex can be used. This mutex allows multiple readers or a single writer to hold the lock. The modified cache is shown as follows:

type Cache struct {
    mu sync.RWMutex // Use a read/write mutex
    m  map[string]*Data
}
func (c *Cache) Get(ID string) (Data, bool) {
    c.mu.RLock()
    data, exists := c.m[ID]
    c.mu.RUnlock()
    if exists {
        if data == nil {
            return Data{}, false
        }
        return *data, true
    }
    data, loaded := retrieveData(ID)
    c.mu.Lock()
    defer c.mu.Unlock()
    d, exists := c.m[ID]
    if exists {
        return *d, true
    }
    if !loaded {
        c.m[ID] = nil
        return Data{}, false
    }
    c.m[ID] = data
    return *data, true
}

Note that the first lock is a reader lock. It allows many reader goroutines to execute concurrently. Once it is determined that the cache needs to be updated, a writer lock is used.

 

Wait groups

A wait group waits for a collection of things, usually goroutines, to finish. It is essentially a thread-safe counter that allows you to wait until the counter reaches zero. A common usage pattern is this:

// Create a waitgroup
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
    // Add to the wait group **before** creating the
    // goroutine
    wg.Add(1)
    go func() {
        // Make sure the waitgroup knows about
        // goroutine completion
        defer wg.Done()
        // Do work
    }()
}
// Wait until all goroutines are done
wg.Wait()

When you create a WaitGroup, it is initialized to zero, so a call to Wait will not wait for anything. You therefore have to add the number of things to wait for before calling Wait. To do this, call Add(n), where n is the number of things to add. Calling Add(1) just before creating each thing to wait for, in this case a goroutine, makes the code easier to read. The main goroutine then calls Wait, which blocks until the wait group counter reaches zero. For that to happen, we have to make sure that the Done method is called for each goroutine that returns. Using a defer statement is the easiest way to ensure that.
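When the number of goroutines is known in advance, a single Add call before the loop works just as well; this is a minor variation of the pattern above:

wg := sync.WaitGroup{}
wg.Add(10) // Add all goroutines at once
for i := 0; i < 10; i++ {
    go func() {
        defer wg.Done()
        // Do work
    }()
}
wg.Wait()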

A common use for a WaitGroup is in an orchestrator service that calls multiple services and collects the results. The orchestrator has to wait for all of the services to return before it can continue.

Look at the following example:

func orchestratorService() (Result1, Result2) {
    wg := sync.WaitGroup{} // Create a WaitGroup
    wg.Add(1) // Add the first goroutine
    var result1 Result1
    go func() {
        defer wg.Done() // Make sure the waitgroup
                        // knows about completion
        result1 = callService1() // Call service1
    }()
    wg.Add(1) // Add the second goroutine
    var result2 Result2
    go func() {
        defer wg.Done() // Make sure the waitgroup
                        // knows about completion
        result2 = callService2() // Call service2
    }()
    wg.Wait() // Wait for both services to return
    return result1, result2 // Return results
}

A common mistake when working with a WaitGroup is calling Add or Done at the wrong place. There are two points to keep in mind:

  • Add must be called before the program has a chance to run Wait. That implies that you cannot call Add inside the goroutine you are waiting for using the WaitGroup, because there is no guarantee that the goroutine will run before Wait is called (see the sketch after this list).
  • Done must be called eventually. The safest way to do it is to use a defer statement inside the goroutine, so if the goroutine logic changes in time or it returns in an unexpected way (such as a panic), Done is called.
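
Here is a minimal sketch of the first mistake, with Add moved inside the goroutine:

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
    go func() {
        wg.Add(1) // Wrong: may run after Wait below
        defer wg.Done()
        // Do work
    }()
}
// Wait may find the counter at zero and return before
// some (or all) of the goroutines have called Add
wg.Wait()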

Sometimes, using a wait group and channels together can cause a chicken-and-egg problem: the channel must be closed after Wait returns so that the reading loop can terminate, but Wait never returns because the goroutines are blocked writing to a channel that no one is reading yet. Look at the following program:

 1: func main() {
 2:    ch := make(chan int)
 3:    var wg sync.WaitGroup
 4:    for i := 0; i < 10; i++ {
 5:        wg.Add(1)
 6:        go func(i int) {
 7:            defer wg.Done()
 8:            ch <- i
 9:        }(i)
10:    }
11:    // There is no goroutine reading from ch
12:    // None of the goroutines will return
13:    // so this will deadlock at Wait below
14:    wg.Wait()
15:    close(ch)
16:    for i := range ch {
17:        fmt.Println(i)
18:    }
19: }

One possible solution is to put the for loop at lines 16-18 into a separate goroutine before Wait, so there is a goroutine reading from the channel. Since the channel is being read, all the sending goroutines can terminate, which lets wg.Wait return and close(ch) run, terminating the reader for loop:

go func() {
     for i := range ch {
           fmt.Println(i)
     }
}()
wg.Wait()
close(ch)

Another solution is as follows:

go func() {
     wg.Wait()
     close(ch)
}()
for i := range ch {
     fmt.Println(i)
}

The wait group is now waiting inside another goroutine, and after all the waited-for goroutines return, it closes the channel.

 

Condition variables

Condition variables differ from the previous concurrency primitives in the sense that, for Go, they are not an essential concurrency tool because, in most cases, a condition variable can be replaced with a channel. However, especially for shared memory systems, condition variables are important tools for synchronization. For example, the Java language builds one of its core synchronization features using condition variables.

A well-known problem of concurrent computing is the producer-consumer problem. There are one or more producer threads that produce values, and these values are consumed by one or more consumer threads. Since all producers and consumers run concurrently, sometimes there are not enough values produced to satisfy all the consumers, and sometimes there are not enough consumers to consume the values produced by the producers. There is usually a finite queue into which producers put values and from which consumers retrieve them. There is already an elegant solution to this problem: use a channel. All producers write to the channel, all consumers read from it, and the problem is solved. But in a shared memory system, a condition variable is usually employed for such a situation. A condition variable is a synchronization mechanism where multiple goroutines wait for a condition to occur, and another goroutine announces the occurrence of the condition to the waiting goroutines.
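Before moving on to condition variables, here is a minimal sketch of the channel-based solution just described; the buffer size and the number of producers and consumers are arbitrary choices for illustration:

ch := make(chan int, 10) // The finite queue of values
for i := 0; i < 3; i++ { // Producers
    go func() {
        for {
            ch <- rand.Int() // Sending blocks while the queue is full
        }
    }()
}
for i := 0; i < 3; i++ { // Consumers
    go func() {
        for v := range ch {
            fmt.Println(v) // Receiving blocks while the queue is empty
        }
    }()
}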

A condition variable supports three operations, as follows:

  • Wait: Blocks the current goroutine until a condition happens
  • Signal: Wakes up one of the waiting goroutines when the condition happens
  • Broadcast: Wakes up all of the waiting goroutines when the condition happens

Unlike the other concurrency primitives, a condition variable needs a mutex. The mutex is used to lock the critical sections in the goroutines that modify the condition. It does not matter what the condition is; what matters is that the condition can only be modified in a critical section and that critical section must be entered by locking the mutex used to construct the condition variable, as shown in the following code:

lock := sync.Mutex{}
cond := sync.NewCond(&lock)

Now let’s implement the producers-consumers problem using this condition variable. Our producers will produce integers and place them in a circular queue. The queue has a finite capacity, so the producer must wait until a consumer consumes from the queue if the queue is full. That means we need a condition variable that will cause the producers to wait until a consumer consumes a value. When the consumer consumes a value, the queue will have more space, and the producer can use it, but then the consumer who consumed that value has to signal the waiting producers that there is space available. Similarly, if consumers consume all the values before producers can produce new ones, the consumers have to wait until new values are available. So, we need another condition variable that will cause the consumers to wait until a producer produces a value. When a producer produces a new value, it has to signal to the waiting consumers that a new value is available.

Let’s start with a simple circular queue implementation:

type Queue struct {
    elements    []int
    front, rear int
    len         int
}
// NewQueue initializes an empty circular queue
// with the given capacity
func NewQueue(capacity int) *Queue {
    return &Queue{
        elements: make([]int, capacity),
        front:    0,  // Read from elements[front]
        rear:     -1, // Write to elements[rear]
        len:      0,
    }
}
// Enqueue adds a value to the queue. Returns false
// if the queue is full
func (q *Queue) Enqueue(value int) bool {
    if q.len == len(q.elements) {
        return false
    }
    // Advance the write pointer, going around in a circle
    q.rear = (q.rear + 1) % len(q.elements)
    // Write the value
    q.elements[q.rear] = value
    q.len++
    return true
}
// Dequeue removes a value from the queue. Returns 0, false
// if the queue is empty
func (q *Queue) Dequeue() (int, bool) {
    if q.len == 0 {
        return 0, false
    }
    // Read the value at the read pointer
    data := q.elements[q.front]
    // Advance the read pointer, going around in a circle
    q.front = (q.front + 1) % len(q.elements)
    q.len--
    return data, true
}
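
As a quick illustration of how the queue behaves (this usage snippet is not part of the original example):

q := NewQueue(2)
fmt.Println(q.Enqueue(1)) // true
fmt.Println(q.Enqueue(2)) // true
fmt.Println(q.Enqueue(3)) // false: the queue is full
v, ok := q.Dequeue()
fmt.Println(v, ok)        // 1 true
fmt.Println(q.Enqueue(3)) // true: a slot opened up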

We need a lock, two condition variables, and a circular queue:

func main() {
     lock := sync.Mutex{}
     fullCond := sync.NewCond(&lock)
     emptyCond := sync.NewCond(&lock)
     queue := NewQueue(10)

Here is the producer function. It runs in an infinite loop, producing random integer values:

producer := func() {
    for {
        // Produce value
        value := rand.Int()
        lock.Lock()
        for !queue.Enqueue(value) {
            fmt.Println("Queue is full")
            fullCond.Wait()
        }
        lock.Unlock()
        emptyCond.Signal()
        time.Sleep(time.Millisecond *
          time.Duration(rand.Intn(1000)))
    }
}

The producer generates a random integer, enters into its critical section, and attempts to enqueue the value. If it is successful, it unlocks the mutex and signals one of the consumers, letting it know that a value has been generated. If there are no consumers waiting on the emptyCond variable at that point, the signal is lost. If, however, the queue is full, then the producer starts waiting on the fullCond variable. Note that Wait is called in the critical section, with the mutex locked. When called, Wait atomically unlocks the mutex and suspends the execution of the goroutine. While waiting, the producer is no longer in its critical section, allowing the consumers to go into their own critical sections. When a consumer consumes a value, it will signal fullCond, which will wake one of the waiting producers up. When the producer wakes up, it will lock the mutex again. Waking up and locking the mutex is not atomic, which means, when Wait returns, the condition that woke up the goroutine may no longer hold, so Wait must be called inside a loop to recheck the condition. When the condition is rechecked, the goroutine will be in its critical section again, so no race conditions are possible.

The consumer is as follows:

consumer := func() {
    for {
        lock.Lock()
        var v int
        for {
            var ok bool
            if v, ok = queue.Dequeue(); !ok {
                fmt.Println("Queue is empty")
                emptyCond.Wait()
                continue
            }
            break
        }
        lock.Unlock()
        fullCond.Signal()
        time.Sleep(time.Millisecond *
            time.Duration(rand.Intn(1000)))
        fmt.Println(v)
    }
}

Note the symmetry between the producer and the consumer. The consumer enters its critical section and attempts to dequeue a value inside a for loop. If the queue has a value in it, the value is read, the for loop terminates, and the mutex is unlocked. Then the goroutine notifies any waiting producers that a value was read from the queue, so the queue is likely no longer full. By the time the consumer exits its critical section and signals the producer, another producer may already have filled the queue up again. That's why the producer has to check the condition again when it wakes up. The same logic applies to the consumer: if the consumer cannot read a value, it starts waiting, and when it wakes up, it has to check whether the queue has elements to consume.

The rest of the program is as follows:

    for i := 0; i < 10; i++ {
        go producer()
    }
    for i := 0; i < 10; i++ {
        go consumer()
    }
    select {} // Wait indefinitely
}

You can run this program with different numbers of producers and consumers and see how it behaves. When there are more producers than consumers, you should see more messages saying the queue is full, and when there are more consumers than producers, you should see more messages saying the queue is empty.

 

Summary

In this chapter, we introduced goroutines and channels, the two concurrency primitives supported by the Go language, as well as some of the fundamental synchronization primitives in the Go library. These primitives will be used in the next chapter to solve some of the popular concurrency problems.

 

Questions

  1. Can you implement a mutex using channels? How about an RWMutex mutex?
  2. Most condition variables can be replaced by channels. How would you implement Broadcast using channels?
About the Author

Burak Serdar is a software engineer with over 30 years of experience in designing and developing distributed enterprise applications that scale. He's worked for several start-ups and large corporations, including Thomson and Red Hat, as an engineer and technical lead. He's one of the co-founders of Cloud Privacy Labs, where he works on semantic interoperability and privacy technologies for centralized and decentralized systems. Burak holds BSc and MSc degrees in electrical and electronics engineering, and an MSc degree in computer science.
