Dwi Wahyudi

Senior Software Engineer (Ruby, Golang, Java)


Concurrency, i.e. processing code asynchronously, is easy to set up and use in Golang. In this post I will write some code that demonstrates Goroutines and Channels in Golang.

Overview

  • Let's say our code needs to send 600 emails per minute; these emails are invoices or marketing emails to our customers.
  • Each email may take up to 1 second to send: it might involve querying data from the database, calculating some numbers, composing the email, etc., and finally sending the email to the customer.
  • If we send them sequentially, one by one (synchronously), 600 emails might need up to 600 seconds (10 minutes) to complete, far beyond our one-minute budget. This is too slow for any business/product; it's a bottlenecked queue.

Go is designed around concurrency (as opposed to parallelism): tasks can run interleaved, with the runtime context switching between them even on a single CPU, and this is fast enough that it looks as if multiple things are executed at the same time. Go adopts CSP (Communicating Sequential Processes) for its concurrency model: https://en.wikipedia.org/wiki/Communicating_sequential_processes

So in Go, we can imagine a goroutine as a cashier who processes things at high speed. However, Go can also achieve parallelism through GOMAXPROCS, which by default is set to the number of logical CPUs available. So we get parallelism for those concurrent goroutines for free: if we have 8 cores in our machine, Go will run goroutines on all 8 cores by default (of course we can lower the number if we want).
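To check what the runtime picked on a given machine, a small sketch like the following can be used. runtime.NumCPU reports the number of logical CPUs usable by the process, and runtime.GOMAXPROCS(0) only queries the current setting; passing a positive number changes it and returns the previous value. The limit of 2 and the helper currentMaxProcs are illustrative assumptions, not something the post prescribes.

```go
package main

import (
	"fmt"
	"runtime"
)

// currentMaxProcs is a hypothetical helper: passing 0 to GOMAXPROCS
// queries the current setting without changing it.
func currentMaxProcs() int {
	return runtime.GOMAXPROCS(0)
}

func main() {
	fmt.Println("logical CPUs:", runtime.NumCPU())
	fmt.Println("GOMAXPROCS:", currentMaxProcs())

	// Lower the limit to 2 (an arbitrary example value).
	// GOMAXPROCS returns the previous setting.
	previous := runtime.GOMAXPROCS(2)
	fmt.Println("previous:", previous, "now:", currentMaxProcs())

	// Restore the original setting.
	runtime.GOMAXPROCS(previous)
}
```

The same limit can also be set without code changes through the GOMAXPROCS environment variable, e.g. GOMAXPROCS=2 go run .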

  • The same applies to many other cases in software development. Imagine a report email that requires these sequential tasks:
    1. interacting with the database,
    2. fetching data from a 3rd party API,
    3. processing some images or files,
    4. processing some reports,
    5. processing some JSON data,
    6. etc., anything that can be processed concurrently or in batches.
  • With concurrency, 600 emails (all handled at once) can be sent in only 1 to 3 seconds, assuming we have enough resources: bandwidth, CPU, memory, etc.
  • In general, we need to hand those processes off (to run concurrently) to another process/program/service/thread without blocking the main code/program. In Java we can use Thread; in Ruby on Rails we usually use Sidekiq.
  • We can also use RabbitMQ (or any message broker or queue service). Let's say the main program in Ruby on Rails sends a "send email" message to RabbitMQ, while a separate, running Java program waits for this message, then processes and sends the email. The Rails app doesn't need to wait for the email sending to complete; it just sends the message and continues with other tasks.

In Golang, we can also use RabbitMQ or any other message broker or queue service, but Golang has its own built-in tools for concurrency: Goroutines and Channels. A Goroutine is a simple, lightweight thread managed by the Go runtime, while a Channel is a pipe that we can send data to and receive data from.
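As a rough in-process analogue of the queue-plus-worker setup above, a channel can play the role of the queue and a few goroutines the role of workers. This is a minimal sketch, not a replacement for a real broker: sendEmail is a hypothetical stand-in that only sleeps 10 ms instead of sending mail, and the 50 workers are an arbitrary assumption.

```go
package main

import (
	"fmt"
	"time"
)

// sendEmail is a hypothetical placeholder for real email sending:
// it only sleeps to simulate the work and returns a log line.
func sendEmail(customerID int) string {
	time.Sleep(10 * time.Millisecond)
	return fmt.Sprintf("email sent to customer %d", customerID)
}

// processQueue (hypothetical helper) uses the jobs channel as an in-process
// queue consumed by workerCount goroutines and returns how many emails were sent.
// workerCount must be at least 1, otherwise nothing drains the queue.
func processQueue(customerIDs []int, workerCount int) int {
	jobs := make(chan int)
	results := make(chan string)

	// Workers: each keeps taking jobs until the jobs channel is closed.
	for w := 0; w < workerCount; w++ {
		go func() {
			for id := range jobs {
				results <- sendEmail(id)
			}
		}()
	}

	// Producer: enqueue every job, then close the queue so workers can exit.
	go func() {
		for _, id := range customerIDs {
			jobs <- id
		}
		close(jobs)
	}()

	// Exactly one result arrives per job.
	sent := 0
	for range customerIDs {
		<-results
		sent++
	}
	return sent
}

func main() {
	customerIDs := make([]int, 600)
	for i := range customerIDs {
		customerIDs[i] = i + 1
	}

	start := time.Now()
	sent := processQueue(customerIDs, 50) // 50 workers is an arbitrary choice
	fmt.Println(sent, "emails processed in", time.Since(start))
}
```

With 50 workers and a 10 ms simulated send, the 600 jobs should take on the order of 600 / 50 * 10 ms, rather than 600 * 10 ms sequentially.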

Our Sample Code

Now, let's get to the code. First, initialize a new Golang module:

go mod init github.com/dwahyudi/golang-goroutine-channel

Just pretend that the triple function here is another function for sending an email, calculating some numbers, etc. that needs some time (let's say 2 seconds) to complete. We simulate the long process with time.Sleep.

For debugging purposes we also create a calcTripleAndPrint function.

package main

import (
	"fmt"
	"strconv"
	"time"
)

func triple(num int) int {
	// Simulate long time processing, example: sending email, calculating some numbers,
	// inserting into database, etc anything that can be processed concurrently.
	time.Sleep(2 * time.Second)

	return num * 3
}

func calcTripleAndPrint(num int) {
	tripledNum := triple(num)
	fmt.Println(strconv.Itoa(num) + " tripled: " + strconv.Itoa(tripledNum))
}

func otherTasks() {
	fmt.Println("Another important task")
}

Blocking Example

Here is our blocking code. In total, blockingDemo() requires 2 seconds * 20 = 40 seconds to complete.

  • otherTasks() will not run until blockingDemo() is completed.
  • calcTripleAndPrint will run one call after another, in sequence.
  • time.Sleep(22 * time.Second) is there so that the app's main function won't terminate early (when main returns, the program exits without waiting for other goroutines). In a real-world app, a web server might run there instead. Or we may use sync.WaitGroup; more on that later.
func main() {
	// Blocking
	blockingDemo()

	otherTasks()
	time.Sleep(22 * time.Second)
}

func blockingDemo() {
	for i := 1; i <= 20; i++ {
		calcTripleAndPrint(i)
	}
}
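To check the 2 seconds * 20 = 40 seconds arithmetic without actually waiting 40 seconds, the same sequential loop can be timed with a scaled-down delay. In this sketch, tripleFast and the 20 ms delay are hypothetical stand-ins for triple and its 2-second sleep, so the loop should take at least 20 * 20 ms = 400 ms.

```go
package main

import (
	"fmt"
	"time"
)

const delay = 20 * time.Millisecond // scaled-down stand-in for 2 seconds

// tripleFast is a hypothetical version of triple() with a shorter delay.
func tripleFast(num int) int {
	time.Sleep(delay)
	return num * 3
}

// sequentialTriples (hypothetical helper) calls tripleFast for 1..n one after
// another and returns the results together with the elapsed time.
func sequentialTriples(n int) ([]int, time.Duration) {
	start := time.Now()
	results := make([]int, 0, n)
	for i := 1; i <= n; i++ {
		results = append(results, tripleFast(i)) // each call blocks for delay
	}
	return results, time.Since(start)
}

func main() {
	results, elapsed := sequentialTriples(20)
	fmt.Println(results)
	fmt.Println("elapsed:", elapsed) // at least 20 * delay
}
```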

Goroutine

A Goroutine is a simple, lightweight thread managed by the Go runtime. When we start one, its code runs concurrently without blocking the calling code. For example, we can start 5000 Goroutines and continue with other tasks without being blocked. The main goroutine doesn't need to know whether each of them succeeded or failed, as long as it doesn't need to wait for their results.
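To illustrate how cheap goroutines are, the sketch below starts 5000 of them, each sleeping 100 ms as an arbitrary stand-in for real work, and counts completions through a channel. Because they all sleep concurrently, the whole run should take roughly 100 ms rather than 5000 * 100 ms; exact timing depends on the machine. runMany is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"time"
)

// runMany (hypothetical helper) starts n goroutines that each sleep for d,
// then waits until every one of them has reported back on the done channel.
func runMany(n int, d time.Duration) (int, time.Duration) {
	start := time.Now()
	done := make(chan struct{})

	for i := 0; i < n; i++ {
		go func() {
			time.Sleep(d) // simulated work
			done <- struct{}{}
		}()
	}

	// Receive exactly n signals, one per goroutine.
	completed := 0
	for completed < n {
		<-done
		completed++
	}
	return completed, time.Since(start)
}

func main() {
	completed, elapsed := runMany(5000, 100*time.Millisecond)
	fmt.Println(completed, "goroutines finished in", elapsed)
}
```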

So instead of processing the numbers one by one, we can process them concurrently with Goroutines. The syntax is pretty simple: just add go before the function call, like go calcTripleAndPrint(i).

Let’s use this code instead of the blocking version.

  • otherTasks() will run immediately without waiting for goroutineDemo().
  • All 20 calls to calcTripleAndPrint run concurrently, so together they need only about 2 seconds to complete.
func main() {
	// Blocking
	//blockingDemo()

	// Goroutine
	goroutineDemo()

	otherTasks()
	time.Sleep(22 * time.Second)
}

func goroutineDemo() {
	for i := 1; i <= 20; i++ {
		go calcTripleAndPrint(i)
	}
}

Here the Goroutines complete in a random (nondeterministic) order, because each Goroutine runs independently without blocking or waiting for the others.
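Since main only sleeps, nothing in goroutineDemo records the order in which goroutines finish. The sketch below makes that order visible by having each goroutine send its number to a channel when it is done; finishOrder is a hypothetical helper, and the random 0 to 49 ms delay is an assumption added to exaggerate the effect.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// finishOrder (hypothetical helper) starts n goroutines and returns
// the order in which they completed.
func finishOrder(n int) []int {
	finished := make(chan int)
	for i := 1; i <= n; i++ {
		go func(num int) {
			// Random simulated work time between 0 and 49 ms.
			time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
			finished <- num
		}(i)
	}

	order := make([]int, 0, n)
	for len(order) < n {
		order = append(order, <-finished)
	}
	return order
}

func main() {
	// A permutation of 1..10; the order usually varies between runs.
	fmt.Println(finishOrder(10))
}
```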

Channel

A Channel, on the other hand, is like a pipe that data flows in and out of. For example, we can pass a channel to function A, let function A send data to that channel, and finally let other code receive that data from the channel.

Channel syntax is fairly simple:

intChannel := make(chan int)       // Create a channel with int data type
stringChannel := make(chan string) // Create a channel with string data type

intChannel <- var1   // Send var1 to intChannel.
var2 := <-intChannel // Receive from intChannel, and
                     // assign the value to var2.
                     // var1 and var2 must be int.

<-intChannel         // Also a valid way of receiving;
                     // the received value is discarded.
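The snippet above is a syntax summary rather than a complete program. A minimal runnable version could look like the sketch below (roundTrip is a hypothetical helper). The send has to happen in another goroutine, because a send on an unbuffered channel waits until someone receives, as the "Channel is Blocking !!" section explains.

```go
package main

import "fmt"

// roundTrip (hypothetical helper) sends value through a new unbuffered
// channel from another goroutine and receives it in the calling goroutine.
func roundTrip(value int) int {
	channel := make(chan int) // create a channel of int values

	go func() {
		channel <- value // send; waits until the receive below is ready
	}()

	received := <-channel // receive and assign to a variable
	return received
}

func main() {
	fmt.Println(roundTrip(42)) // prints 42

	messages := make(chan string) // create a channel of string values
	go func() {
		messages <- "ignored"
	}()
	<-messages // also valid: receive and discard the value
	fmt.Println("received and discarded one string")
}
```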

With this in mind, let's create and use a Channel.

Here are the sending functions:

func calcTripleToChannel(num int, channel chan int) {
	tripledNum := triple(num)
	channel <- tripledNum
}

func sendToChannel(channel chan int) {
	for i := 1; i <= 10; i++ {
		fmt.Println("Emitting " + strconv.Itoa(i))
		go calcTripleToChannel(i, channel)
	}
}

Basically we pass a number and a channel to calcTripleToChannel, which calls triple() to triple the number and then sends the result to the channel. sendToChannel starts 10 such Goroutines. Sending alone isn't enough, though; we must also receive those values from the channel.

func receiveFromChannel(channel chan int) {
	for i := 1; i <= 10; i++ {
		tripledNum, ok := <-channel
		if ok {
			fmt.Println("Receiving from channel: " + strconv.Itoa(tripledNum))
		} else {
			fmt.Println("Channel Closed")
		}
	}
}

Here, with tripledNum, ok := <-channel, we receive a value from the channel into the variable tripledNum and print it to the console.

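We could also receive with just tripledNum := <-channel; ok is there to tell us whether the channel is still open. ok is true for every value that was actually sent, and false only once the channel has been closed and emptied, in which case tripledNum gets the zero value (0 for int).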
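To make the role of ok concrete, here is a small sketch using a buffered channel (buffered channels are covered further below): a value sent before close is still delivered with ok true, and only the next receive reports ok false with the zero value. drainAfterClose is a hypothetical helper.

```go
package main

import "fmt"

// drainAfterClose (hypothetical helper) sends one value into a buffered
// channel, closes it, and then performs two comma-ok receives.
func drainAfterClose() (first int, firstOK bool, second int, secondOK bool) {
	channel := make(chan int, 1)
	channel <- 9
	close(channel)

	first, firstOK = <-channel   // the buffered value is still delivered
	second, secondOK = <-channel // closed and empty: zero value, ok is false
	return
}

func main() {
	a, aOK, b, bOK := drainAfterClose()
	fmt.Println(a, aOK) // 9 true
	fmt.Println(b, bOK) // 0 false
}
```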

Now run these from the main function.

func main() {
	// With channel
	channel := make(chan int)
	go sendToChannel(channel)
	go receiveFromChannel(channel)

	otherTasks()
	time.Sleep(22 * time.Second)
}

Channel and Range

For convenience, Golang also lets us receive data from a Channel with range.

func receiveWithRange(channel chan int) {
	for tripledNum := range channel {
		fmt.Println(strconv.Itoa(tripledNum))
	}
}

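The result should be as expected. Note that a range loop over a channel only ends once the channel is closed; if nothing closes it, receiveWithRange keeps waiting after the last value.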
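In the sketch below, the sending goroutine closes the channel after its last send, so the range loop ends on its own. produce and collect are hypothetical helpers; the chan<- int and <-chan int parameter types are Go's send-only and receive-only channel types, and a regular chan int can be passed to either.

```go
package main

import "fmt"

// produce (hypothetical helper) sends the triples of 1..n and then closes
// the channel, which is what allows a range loop on the receiving side to end.
func produce(n int, channel chan<- int) {
	for i := 1; i <= n; i++ {
		channel <- i * 3
	}
	close(channel)
}

// collect (hypothetical helper) receives with range until the channel is closed.
func collect(channel <-chan int) []int {
	var received []int
	for tripledNum := range channel {
		received = append(received, tripledNum)
	}
	return received
}

func main() {
	channel := make(chan int)
	go produce(5, channel)
	fmt.Println(collect(channel)) // [3 6 9 12 15]
}
```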

Channel Close

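Golang also lets developers close a Channel with the built-in close function. We don't have to close channels, but when we do, it signals that no more values will be sent. After a channel is closed, sending to it panics, while receiving still returns any buffered values and then the zero value with ok set to false.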

close(channel)
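The harmful side of closing is on the sender: a send on a closed channel panics with "send on closed channel". This sketch uses recover purely to observe that panic; trySend is a hypothetical helper, and in real code the usual fix is to let only the sender close the channel, after its last send.

```go
package main

import "fmt"

// trySend (hypothetical helper) attempts to send value on channel and
// returns the panic message, if any, instead of crashing the program.
func trySend(channel chan int, value int) (panicMsg string) {
	defer func() {
		if r := recover(); r != nil {
			panicMsg = fmt.Sprint(r)
		}
	}()
	channel <- value
	return ""
}

func main() {
	channel := make(chan int, 1)
	fmt.Printf("%q\n", trySend(channel, 1)) // "" (the value fits in the buffer)

	close(channel)
	fmt.Println(trySend(channel, 2)) // send on closed channel
}
```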

Here is a simple example:

func closeChannelDemo() {
	channel := make(chan int)

	go calcTripleToChannel(3, channel)
	// int3 is 9 (3 tripled), ok is true
	int3, ok := <-channel
	fmt.Println(int3)

	go calcTripleToChannel(5, channel)
	// int5 is 15 (5 tripled), ok is true
	int5, ok := <-channel
	fmt.Println(int5)

	close(channel)

	// Panics with "send on closed channel" once the goroutine tries to send, do not uncomment this
	//go calcTripleToChannel(10, channel)

	// Channel already closed, ok is false
	_, ok = <-channel
	fmt.Println(strconv.FormatBool(ok))
}

Call this code from the main function, simply with go closeChannelDemo().

Channel Select

Golang has another feature, select, that we can use to wait on multiple Channels and receive a value from whichever one is ready, much like a switch case (if several are ready at once, one of them is chosen at random).

Here is some sample code; we add another function, aVeryLongTimeProcess, to simulate code with a longer processing time.

func aVeryLongTimeProcess(channel chan string) {
	time.Sleep(8 * time.Second)
	channel <- "After 8 seconds"
}

func selectDemo() {
	channel3 := make(chan int)
	go calcTripleToChannel(3, channel3)
	channel4 := make(chan string)
	go aVeryLongTimeProcess(channel4)

	select {
	case int1 := <-channel3:
		fmt.Println("Received from channel3, value: " + strconv.Itoa(int1))
	case text := <-channel4:
		fmt.Println(text)
	}
}

Notice the different Channel types: one Channel is int, the other is string.

The select statement above waits until a Channel in one of its cases has data available, then executes that case.

calcTripleToChannel needs 2 seconds to send its data, so when we run this, select will receive from channel3 after 2 seconds. Once one case has run, the select statement is done, just like a switch case statement.

channel4 won't be received from here (it takes 8 seconds to send); we would need to run the select statement again, or receive from that Channel in another way.
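One way to collect both results is to run select in a loop, once per expected value. The sketch below scales the delays down (20 ms and 80 ms instead of 2 and 8 seconds) and uses hypothetical helper names; under normal conditions the faster int channel is received first.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// fastTriple is a hypothetical stand-in for calcTripleToChannel (2 s -> 20 ms).
func fastTriple(num int, channel chan int) {
	time.Sleep(20 * time.Millisecond)
	channel <- num * 3
}

// slowMessage is a hypothetical stand-in for aVeryLongTimeProcess (8 s -> 80 ms).
func slowMessage(channel chan string) {
	time.Sleep(80 * time.Millisecond)
	channel <- "after 80 ms"
}

// selectBoth (hypothetical helper) runs select twice so that both channels
// are received from, returning the results in the order they arrived.
func selectBoth() []string {
	intChannel := make(chan int)
	stringChannel := make(chan string)
	go fastTriple(3, intChannel)
	go slowMessage(stringChannel)

	var arrived []string
	for i := 0; i < 2; i++ {
		select {
		case n := <-intChannel:
			arrived = append(arrived, "int: "+strconv.Itoa(n))
		case text := <-stringChannel:
			arrived = append(arrived, "string: "+text)
		}
	}
	return arrived
}

func main() {
	fmt.Println(selectBoth())
}
```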

Channel is Blocking !!

As we saw earlier, a Channel is a great tool for sending values between Goroutines, but be aware: sending to and receiving from an unbuffered channel are blocking operations. A send waits until a receiver is ready, and a receive waits until a value is sent.

func blockingChannelDemo() {
	channel := make(chan int)
	calcTripleToChannel(3, channel)
	calcTripleToChannel(30, channel)

	num1, num2 := <-channel, <-channel
	fmt.Println(strconv.Itoa(num1) + " " + strconv.Itoa(num2))
}

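Do not run this code: calcTripleToChannel(3, channel) is called synchronously, so its send blocks forever because no receiver is running yet. If no other goroutine can make progress, the runtime aborts with "fatal error: all goroutines are asleep - deadlock!".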

We must run the blocking parts in Goroutines, i.e. call go calcTripleToChannel(...).

If we still want to call it synchronously, we can give the Channel a buffer. By default, a Channel has a buffer size of 0 (unbuffered), so a send blocks right away until a receiver takes the value.
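Following the first suggestion, a corrected version of blockingChannelDemo starts each send in its own goroutine, so the calling goroutine is free to reach the receives. In this sketch tripleToChannel is a hypothetical, scaled-down stand-in for calcTripleToChannel (20 ms instead of 2 seconds), and nonBlockingChannelDemo is a hypothetical name. Because the two goroutines race, the values may arrive in either order.

```go
package main

import (
	"fmt"
	"time"
)

// tripleToChannel mimics calcTripleToChannel with a shorter delay.
func tripleToChannel(num int, channel chan int) {
	time.Sleep(20 * time.Millisecond)
	channel <- num * 3
}

// nonBlockingChannelDemo sends from goroutines, so each unbuffered send
// can complete as soon as one of the receives below is ready.
func nonBlockingChannelDemo() (int, int) {
	channel := make(chan int)
	go tripleToChannel(3, channel)
	go tripleToChannel(30, channel)

	num1, num2 := <-channel, <-channel
	return num1, num2
}

func main() {
	num1, num2 := nonBlockingChannelDemo()
	fmt.Println(num1, num2) // 9 90 or 90 9, depending on which goroutine sends first
}
```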

Buffered Channel

When we give a Channel a buffer, sends don't block as long as the buffer still has free space, so the previous deadlock doesn't happen.

func bufferedChannelDemo() {
	channel := make(chan int, 2)
	calcTripleToChannel(3, channel) // Take 1 buffer, 1 buffer remaining
	calcTripleToChannel(30, channel) // Take another 1 buffer, no buffer remaining

	// Blocks forever (the buffer is full and nobody is receiving yet), do not uncomment this
	//calcTripleToChannel(50, channel)

	num1, num2 := <-channel, <-channel // Receiving from Channel, 2 buffers remaining
	fmt.Println(strconv.Itoa(num1) + " " + strconv.Itoa(num2))
}

Now this code runs safely. Still, we should put the calls to calcTripleToChannel in separate Goroutines. Why? Because the code above takes 4 seconds to complete (the calls to calcTripleToChannel are synchronous, i.e. blocking), versus about 2 seconds when each call runs in its own Goroutine.
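The buffer bookkeeping in the comments above can be observed directly with the built-in len (values currently queued) and cap (buffer size) functions, which also work on channels. bufferUsage is a hypothetical helper for this sketch.

```go
package main

import "fmt"

// bufferUsage (hypothetical helper) fills a channel of capacity 2,
// empties it again, and records len(channel) after each step.
func bufferUsage() []int {
	channel := make(chan int, 2)
	lengths := []int{len(channel)} // nothing queued yet: 0

	channel <- 9 // takes 1 slot, 1 free
	lengths = append(lengths, len(channel))

	channel <- 90 // takes the last slot, buffer full
	lengths = append(lengths, len(channel))

	<-channel
	<-channel // both values received, buffer empty again
	lengths = append(lengths, len(channel))

	return lengths
}

func main() {
	fmt.Println(bufferUsage())          // [0 1 2 0]
	fmt.Println(cap(make(chan int, 2))) // 2
}
```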

sync.WaitGroup

Another option, and often the recommended one, is sync.WaitGroup. It is essentially a way to wait for multiple Goroutines to finish: a counter, where Wait blocks until the counter reaches 0.

	var waitGroups sync.WaitGroup

	waitGroups.Add(1) // counter is 1
	waitGroups.Add(1) // counter is 2
	waitGroups.Add(1) // counter is 3
	waitGroups.Done() // counter is 2
	waitGroups.Done() // counter is 1
	go func() {
		time.Sleep(2 * time.Second)
		waitGroups.Done() // counter is back to 0
	}()

	waitGroups.Wait() // blocks for about 2 seconds, until the counter is 0
	fmt.Println("Marco")

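We can use sync.WaitGroup with multiple Goroutines (remember to import sync): call waitGroups.Add(1) before each Goroutine launch and waitGroup.Done() when each Goroutine completes. Pass the WaitGroup by pointer, since a WaitGroup must not be copied after first use.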

func calcTripleAndPrintWithWaitGroup(num int, waitGroup *sync.WaitGroup) {
	defer waitGroup.Done()
	calcTripleAndPrint(num)
}

func waitGroupsDemo() {
    // ....

	for i := 1; i <= 10; i++ {
		waitGroups.Add(1)
		go calcTripleAndPrintWithWaitGroup(i, &waitGroups)
	}

	waitGroups.Wait()
	fmt.Println("Polo")
}
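sync.WaitGroup also combines well with the channel sections above: when several goroutines send results on one channel, a separate goroutine can wait on the group and close the channel afterwards, so the receiver can safely use range. A sketch, with tripleAll as a hypothetical helper and the 2-second sleep omitted:

```go
package main

import (
	"fmt"
	"sync"
)

// tripleAll (hypothetical helper) triples every number in its own goroutine
// and returns the results as they arrive (order is not guaranteed).
func tripleAll(nums []int) []int {
	results := make(chan int)
	var wg sync.WaitGroup

	for _, num := range nums {
		wg.Add(1) // call Add before starting the goroutine
		go func(n int) {
			defer wg.Done()
			results <- n * 3
		}(num)
	}

	// Close results only after every sender has called Done,
	// so the range loop below knows when to stop.
	go func() {
		wg.Wait()
		close(results)
	}()

	var tripled []int
	for r := range results {
		tripled = append(tripled, r)
	}
	return tripled
}

func main() {
	fmt.Println(tripleAll([]int{1, 2, 3, 4, 5}))
	fmt.Println("Polo")
}
```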

Here is the repository for all the samples: https://github.com/dwahyudi/golang-goroutine-channel