Dwi Wahyudi
Senior Software Engineer (Ruby, Golang, Java)
Concurrency (processing code asynchronously) is easy to set up and use in Golang. In this post I will write some code to explore Goroutines and Channels in Golang.
Overview
- Let's say our code needs to send 600 emails per minute; these emails are invoices or marketing emails to our customers.
- Each email may take up to 1 second to send: querying data from the database, calculating some numbers, composing the email, and finally sending it to the customer.
- If we send all 600 emails sequentially, one by one (synchronously), we might need up to 600 seconds. That is too expensive for any business or product: every email waits behind the previous one, like a bottlenecked queue.
Go is built around concurrency, which is not the same as parallelism: concurrent tasks can be interleaved (context-switched) on a single CPU so quickly that it looks as if they run at the same time. Go adopts CSP (Communicating Sequential Processes) as its concurrency model: https://en.wikipedia.org/wiki/Communicating_sequential_processes
In Go, we can imagine a goroutine as a very fast cashier that processes things at high speed. Go also gives us parallelism through GOMAXPROCS, which by default is set to the number of CPU cores available. So if our machine has 8 cores, the Go scheduler can run goroutines on all 8 cores at the same time, and we get parallelism for those concurrent goroutines for free (of course we can lower the number if we want).
- The same idea applies to many other sequential tasks in software development, for example:
- interacting with a database,
- fetching data from a 3rd-party API,
- processing images or files,
- processing reports,
- processing JSON data,
- anything else that can be processed concurrently or in batches.
- With concurrency, 600 emails (handled all at once) can be sent in only 1 to 3 seconds, assuming we have enough resources: bandwidth, CPU, memory, etc.
- In general, we hand such work off (concurrently) to another process, program, service, or thread without blocking the main program. In Java we can use Thread; in Ruby on Rails we usually use Sidekiq.
- We can also use RabbitMQ (or any message broker or queue service). Say the main Ruby on Rails program publishes a "send email" message to RabbitMQ, while a separately running Java program waits for such messages, processes them, and sends the emails. The Rails app doesn't need to wait for the email to be sent; it just publishes the message and continues with other tasks.
In Golang, we could also use RabbitMQ or another message broker or queue service, but Golang has its own built-in tools for concurrency: Goroutines and Channels. A Goroutine is a lightweight thread managed by the Go runtime, while a Channel is a pipe that we can send data to and receive data from.
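Since a Channel can play the role of a message queue inside a single Go program, here is a minimal sketch of the 600-email scenario using a Channel as an in-process job queue consumed by a fixed pool of worker Goroutines (commonly called a worker pool). sendEmail is hypothetical and only sleeps 10 milliseconds instead of doing real work, the count of 100 workers is an arbitrary assumption, and the sketch uses sync.WaitGroup, which is covered later in this post.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// sendEmail is a hypothetical stand-in for the real work
// (query data, compose, send), scaled down from ~1 second.
func sendEmail(id int) string {
	time.Sleep(10 * time.Millisecond)
	return fmt.Sprintf("email %d sent", id)
}

// processEmails pushes job IDs into a channel (our in-process queue) and lets
// `workers` goroutines consume them, much like consumers reading from a broker.
func processEmails(total, workers int) []string {
	jobs := make(chan int)
	results := make(chan string)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range jobs { // take jobs until the queue is closed
				results <- sendEmail(id)
			}
		}()
	}

	// Producer: enqueue every job, then close the queue so the workers' loops end.
	go func() {
		for id := 1; id <= total; id++ {
			jobs <- id
		}
		close(jobs)
	}()

	// Close results once every worker is done, so the collecting loop below ends.
	go func() {
		wg.Wait()
		close(results)
	}()

	var sent []string
	for r := range results {
		sent = append(sent, r)
	}
	return sent
}

func main() {
	start := time.Now()
	sent := processEmails(600, 100)
	fmt.Println(len(sent), "emails in", time.Since(start).Round(10*time.Millisecond))
}
```

Unlike RabbitMQ, this queue lives only in memory: if the process crashes, jobs still in the Channel are lost, so it complements a message broker rather than replacing it.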
Sample Code
Now, let's get to the code. First, initialize a new Go module:
go mod init github.com/dwahyudi/golang-goroutine-channel
Just pretend that the triple function here is another function for sending an email, calculating some numbers, etc., which needs some time (let's say 2 seconds) to complete. We simulate the long process with time.Sleep.
For debugging purposes we also create a calcTripleAndPrint function.
package main

import (
	"fmt"
	"strconv"
	"time"
)

func triple(num int) int {
	// Simulate a long-running task, e.g. sending an email, calculating some numbers,
	// inserting into a database: anything that can be processed concurrently.
	time.Sleep(2 * time.Second)
	return num * 3
}

func calcTripleAndPrint(num int) {
	tripledNum := triple(num)
	fmt.Println(strconv.Itoa(num) + " tripled: " + strconv.Itoa(tripledNum))
}

func otherTasks() {
	fmt.Println("Another important tasks")
}
Blocking Example
Here is our blocking code. In total, blockingDemo() needs 2 seconds * 20 = 40 seconds to complete. otherTasks() won't run until blockingDemo() has completed, because calcTripleAndPrint runs 20 times, one call after another. time.Sleep(22 * time.Second) is there so that the main function doesn't terminate before the Goroutines in the later examples finish (the blocking version doesn't strictly need it). In a real-world program we might run a web server there instead, or we may use sync.WaitGroup, more on that later.
func main() {
	// Blocking
	blockingDemo()
	otherTasks()
	time.Sleep(22 * time.Second)
}

func blockingDemo() {
	for i := 1; i <= 20; i++ {
		calcTripleAndPrint(i)
	}
}
Goroutine
A Goroutine is a lightweight thread managed by the Go runtime. When we start one, its code runs concurrently with the caller without blocking it. For example, we can start 5,000 Goroutines and continue with other tasks without being blocked. The main Goroutine doesn't need to know whether each of them succeeded or failed, as long as it doesn't need to wait for their results.
So instead of processing the numbers one by one, we can process them concurrently with Goroutines. The syntax is pretty simple: just add go before the function call, like go calcTripleAndPrint(i).
Let's use this code instead of the blocking version. otherTasks() runs immediately without waiting for goroutineDemo(). calcTripleAndPrint runs for all 20 iterations concurrently, so all of them complete after about 2 seconds in total.
func main() {
	// Blocking
	//blockingDemo()

	// Goroutine
	goroutineDemo()
	otherTasks()
	time.Sleep(22 * time.Second)
}

func goroutineDemo() {
	for i := 1; i <= 20; i++ {
		go calcTripleAndPrint(i)
	}
}
When we run it, the Goroutines complete in no particular order, because each Goroutine runs independently without blocking or waiting for the others.
Channel
A Channel, on the other hand, is like a pipe that data goes into and comes out of. For example, we can pass a channel to function A, let function A send data into that channel, and let other code receive that data from the channel.
Channel syntax is fairly simple:
channel := make(chan int)       // Create a channel of int values
strChannel := make(chan string) // Create a channel of string values
channel <- var1                 // Send var1 into channel (var1 must be an int)
var2 := <-channel               // Receive from channel and assign the value to var2 (an int)
<-channel                       // Also valid: receive a value and discard it
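The syntax above isn't a complete program, so here is a minimal runnable sketch (sendAndReceive is a hypothetical name). Note that the send happens in a separate Goroutine: on an unbuffered Channel, a send and a receive in the same Goroutine would block forever.

```go
package main

import "fmt"

// sendAndReceive sends value through an unbuffered channel from a second
// goroutine and receives it in the calling goroutine.
func sendAndReceive(value int) int {
	channel := make(chan int)
	go func() {
		// On an unbuffered channel this send blocks until the receive below runs.
		channel <- value
	}()
	return <-channel // blocks until the goroutine above has sent
}

func main() {
	fmt.Println(sendAndReceive(42)) // prints 42

	strChannel := make(chan string)
	go func() { strChannel <- "ignored" }()
	<-strChannel // a bare receive: wait for a value and discard it
}
```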
With this in mind, let's create and use a Channel.
Here is the sending function:
func calcTripleToChannel(num int, channel chan int) {
	tripledNum := triple(num)
	channel <- tripledNum
}

func sendToChannel(channel chan int) {
	for i := 1; i <= 10; i++ {
		fmt.Println("Emitting " + strconv.Itoa(i))
		go calcTripleToChannel(i, channel)
	}
}
Basically, we pass a number and a channel to calcTripleToChannel. calcTripleToChannel just calls the triple() function to triple the value and sends the result into the channel, and sendToChannel starts one such Goroutine for each of the numbers 1 to 10. Sending alone isn't enough, though: we must also receive those values from the channel.
func receiveFromChannel(channel chan int) {
	for i := 1; i <= 10; i++ {
		tripledNum, ok := <-channel
		if ok {
			fmt.Println("Receiving from channel: " + strconv.Itoa(tripledNum))
		} else {
			fmt.Println("Channel Closed")
		}
	}
}
With tripledNum, ok := <-channel, we receive a value from the channel into the variable tripledNum and print it to the console.
We could also receive with just tripledNum := <-channel. The extra ok value tells us whether the receive got a real value: it is false only when the channel has been closed and every value sent before the close has already been received.
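Here is a small sketch of the comma-ok form on a closed Channel. It uses a buffered Channel and close, both covered later in this post; drain is a hypothetical helper name.

```go
package main

import "fmt"

// drain receives with the comma-ok form until the channel reports it is closed.
func drain(channel chan int) (values []int, lastValue int, lastOK bool) {
	for {
		v, ok := <-channel
		if !ok {
			// Closed and empty: v is the zero value for int (0) and ok is false.
			return values, v, ok
		}
		values = append(values, v)
	}
}

func main() {
	channel := make(chan int, 3) // buffered, so we can send without a receiver
	channel <- 3
	channel <- 6
	close(channel) // values already in the buffer can still be received

	values, last, ok := drain(channel)
	fmt.Println(values, last, ok) // prints [3 6] 0 false
}
```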
Now run these from the main function.
func main() {
	// With channel
	channel := make(chan int)
	go sendToChannel(channel)
	go receiveFromChannel(channel)
	otherTasks()
	time.Sleep(22 * time.Second)
}
Channel and Range
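For convenience, Golang also lets us receive from a Channel with range. The loop receives values one by one until the Channel is closed; if nothing ever closes the Channel, the loop keeps waiting for more values.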
func receiveWithRange(channel chan int) {
	for tripledNum := range channel {
		fmt.Println(strconv.Itoa(tripledNum))
	}
}
The result should be as expected: each tripled value is printed as soon as it arrives from the Channel.
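Because range only stops once the Channel is closed, receiveWithRange never returns on its own when paired with sendToChannel, which never closes the Channel. A minimal sketch, assuming a hypothetical produce function that sends sequentially and then closes the Channel, lets the range loop end cleanly:

```go
package main

import "fmt"

// produce sends the tripled values of 1..n and then closes the channel.
// Closing is what lets a `for range` loop on the receiving side finish.
func produce(n int, channel chan<- int) {
	for i := 1; i <= n; i++ {
		channel <- i * 3
	}
	close(channel)
}

// collectWithRange gathers every value until the channel is closed.
func collectWithRange(channel <-chan int) []int {
	var got []int
	for tripledNum := range channel { // exits after close, once all values are received
		got = append(got, tripledNum)
	}
	return got
}

func main() {
	channel := make(chan int)
	go produce(5, channel)
	fmt.Println(collectWithRange(channel)) // prints [3 6 9 12 15]
}
```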
Channel Close
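Golang also lets us developers close a Channel. We don't have to, but if we need to, there is the built-in close function:
close(channel)
After a Channel is closed, we can no longer send to it (a send panics), but we can still receive from it: values already in the Channel are delivered first, and after that every receive returns immediately with the zero value and ok set to false.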
Here is a simple example:
func closeChannelDemo() {
	channel := make(chan int)

	go calcTripleToChannel(3, channel)
	// int3 is 9 (3 tripled), ok should be true
	int3, ok := <-channel
	fmt.Println(int3)

	go calcTripleToChannel(5, channel)
	// int5 is 15 (5 tripled), ok should be true
	int5, ok := <-channel
	fmt.Println(int5)

	close(channel)

	// Panics with "send on closed channel", do not uncomment
	//go calcTripleToChannel(10, channel)

	// Channel already closed and empty, so ok is false
	_, ok = <-channel
	fmt.Println(strconv.FormatBool(ok))
}
Call this code from the main function, simply with go closeChannelDemo().
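The commented-out send in closeChannelDemo would panic rather than fail quietly. The sketch below makes that visible by recovering from the panic; sendOrError is a hypothetical helper for illustration only. In real code the usual rule is that only the sender closes a Channel, so this situation shouldn't arise in the first place.

```go
package main

import "fmt"

// sendOrError tries to send and turns a panic (such as sending on a closed
// channel) into an error via recover. For illustration only.
func sendOrError(channel chan int, value int) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered: %v", r)
		}
	}()
	channel <- value
	return nil
}

func main() {
	channel := make(chan int, 1) // buffered, so the first send needs no receiver
	fmt.Println(sendOrError(channel, 1)) // prints <nil>
	close(channel)
	fmt.Println(sendOrError(channel, 2)) // prints recovered: send on closed channel
}
```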
Channel Select
Golang has another feature, select, that we can use to wait on multiple Channels and receive a value from whichever one is ready first, much like a switch statement.
Here is some sample code. We add another function, aVeryLongTimeProcess, to simulate code with a longer processing time.
func aVeryLongTimeProcess(channel chan string) {
	time.Sleep(8 * time.Second)
	channel <- "After 8 seconds"
}

func selectDemo() {
	channel3 := make(chan int)
	go calcTripleToChannel(3, channel3)

	channel4 := make(chan string)
	go aVeryLongTimeProcess(channel4)

	select {
	case int1 := <-channel3:
		fmt.Println("Received from channel3, value: " + strconv.Itoa(int1))
	case text := <-channel4:
		fmt.Println(text)
	}
}
Notice the different Channel types: one Channel is int, the other is string.
The select statement above waits until one of the Channels in its cases has data available, and then executes the case for that Channel.
calcTripleToChannel needs 2 seconds to send its data, so about 2 seconds after we run this, the select statement will receive from channel3. Once one case has executed, the select statement is done, just like a switch statement.
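channel4 won't be received from here (its value only arrives after 8 seconds). We would need to run the select statement again, or receive from that Channel some other way. Until then, the Goroutine running aVeryLongTimeProcess stays blocked on its send, because channel4 is unbuffered.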
Channel is Blocking !!
As we saw earlier, a Channel is a great tool for passing values between Goroutines, but be aware: sending to and receiving from an unbuffered Channel are blocking operations.
func blockingChannelDemo() {
	channel := make(chan int)
	calcTripleToChannel(3, channel)
	calcTripleToChannel(30, channel)
	num1, num2 := <-channel, <-channel
	fmt.Println(strconv.Itoa(num1) + " " + strconv.Itoa(num2))
}
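Do not run this code: the first send blocks forever because nothing is receiving from the Channel yet, and when no Goroutine can make progress, the Go runtime aborts with "fatal error: all goroutines are asleep - deadlock!".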
To fix this, we must run the blocking parts in Goroutines by calling go calcTripleToChannel(...) instead.
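Here is a sketch of blockingChannelDemo rewritten that way. To keep it quick to run, triple is a scaled-down copy that sleeps 200 milliseconds instead of 2 seconds, and fixedChannelDemo is a hypothetical name.

```go
package main

import (
	"fmt"
	"time"
)

// triple is a scaled-down copy of the post's triple(): 200ms instead of 2s.
func triple(num int) int {
	time.Sleep(200 * time.Millisecond)
	return num * 3
}

func calcTripleToChannel(num int, channel chan int) {
	channel <- triple(num)
}

func fixedChannelDemo() (int, int) {
	channel := make(chan int)
	// Each send now runs in its own goroutine, so this goroutine reaches the receives.
	go calcTripleToChannel(3, channel)
	go calcTripleToChannel(30, channel)
	num1, num2 := <-channel, <-channel // order depends on which goroutine finishes first
	return num1, num2
}

func main() {
	num1, num2 := fixedChannelDemo()
	fmt.Println(num1 + num2) // prints 99 (9 + 90) whatever the arrival order
}
```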
If we still want to call calcTripleToChannel synchronously, we can give the Channel a buffer. By default, a Channel has a buffer size of 0, so a send blocks right away until a receiver takes the value.
Buffered Channel
When we give a Channel a buffer, a send doesn't block as long as the Channel still has free buffer space, so we no longer get the previous deadlock error.
func bufferedChannelDemo() {
	channel := make(chan int, 2)
	calcTripleToChannel(3, channel)  // Take 1 buffer, 1 buffer remaining
	calcTripleToChannel(30, channel) // Take another 1 buffer, no buffer remaining

	// Would block forever (deadlock): buffer is full and nobody is receiving. Do not uncomment.
	//calcTripleToChannel(50, channel)

	num1, num2 := <-channel, <-channel // Receiving from Channel, 2 buffers remaining
	fmt.Println(strconv.Itoa(num1) + " " + strconv.Itoa(num2))
}
Now this code runs safely. Still, we should rather call calcTripleToChannel in separate Goroutines. Why? Because the code above takes 4 seconds to complete (the calls to calcTripleToChannel are synchronous, i.e. blocking, again), compared with about 2 seconds if we run calcTripleToChannel in separate Goroutines.
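To see the buffer at work, here is a small sketch using the built-in len and cap on a buffered Channel, plus a select with a default case, which attempts a send without blocking and reports whether there was room. sendIfRoom is a hypothetical helper name, and the buffer size of 2 mirrors bufferedChannelDemo.

```go
package main

import "fmt"

// sendIfRoom sends without blocking: if the buffer is full,
// select takes the default branch instead of waiting.
func sendIfRoom(channel chan int, value int) bool {
	select {
	case channel <- value:
		return true
	default:
		return false
	}
}

func main() {
	channel := make(chan int, 2)
	for _, v := range []int{9, 90, 150} {
		sent := sendIfRoom(channel, v)
		fmt.Println(v, "sent:", sent, "len:", len(channel), "cap:", cap(channel))
	}
	// Output:
	// 9 sent: true len: 1 cap: 2
	// 90 sent: true len: 2 cap: 2
	// 150 sent: false len: 2 cap: 2
}
```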
sync.WaitGroup
Another option, and often the recommended one, is sync.WaitGroup. It is essentially a way to wait for multiple Goroutines to finish: an await mechanism based on a counter.
var waitGroups sync.WaitGroup
waitGroups.Add(1) // counter is 1
waitGroups.Add(1) // counter is 2
waitGroups.Add(1) // counter is 3
waitGroups.Done() // counter is 2
waitGroups.Done() // counter is 1
go func() {
	time.Sleep(2 * time.Second)
	waitGroups.Done() // counter is back to 0
}()
waitGroups.Wait() // blocks for about 2 seconds, until the counter is 0
fmt.Println("Marco")
We can use sync.WaitGroup for multiple Goroutines: just make sure to call waitGroups.Add(1) for each Goroutine we launch and waitGroup.Done() when each Goroutine completes.
func calcTripleAndPrintWithWaitGroup(num int, waitGroup *sync.WaitGroup) {
	defer waitGroup.Done()
	calcTripleAndPrint(num)
}

func waitGroupsDemo() {
	// ....
	for i := 1; i <= 10; i++ {
		waitGroups.Add(1)
		go calcTripleAndPrintWithWaitGroup(i, &waitGroups)
	}
	waitGroups.Wait()
	fmt.Println("Polo")
}
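When each Goroutine produces a result, one channel-free option is to give every Goroutine its own slot in a pre-sized slice and read the slice only after Wait returns. A minimal sketch, with tripleAll as a hypothetical name and a 50-millisecond sleep standing in for triple's 2 seconds:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// tripleAll runs one goroutine per number and writes each result into its
// own slot of a pre-sized slice, then waits for all of them.
func tripleAll(nums []int) []int {
	results := make([]int, len(nums))
	var wg sync.WaitGroup
	for i, num := range nums {
		wg.Add(1) // call Add before starting the goroutine, not inside it
		go func(i, num int) {
			defer wg.Done()
			time.Sleep(50 * time.Millisecond) // simulated work, scaled down
			results[i] = num * 3
		}(i, num)
	}
	wg.Wait() // all writes are finished once Wait returns
	return results
}

func main() {
	fmt.Println(tripleAll([]int{1, 2, 3, 4, 5})) // prints [3 6 9 12 15]
}
```

Because each Goroutine writes to a different index and the slice is only read after wg.Wait(), there is no data race here; if Goroutines needed to update shared state, we would need a Channel or a sync.Mutex instead.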
Here is the repository for the all samples: https://github.com/dwahyudi/golang-goroutine-channel