Say, for example, you want to process at most 3 jobs at a time and wait for all of them to finish before exiting the program. For this we are going to use "buffered channels". The first example (preferred) relies on "sync.WaitGroup" to control the process, whereas the second one uses multiple channels. In both examples there will be at most 3 goroutines running at a time.


A buffered channel limits how many slots are available in the channel for resources. Once the buffer is full, further sends block until a slot is released from the buffer. It is created like this: make(chan TYPE, n).
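
For instance, here is a quick sketch of the blocking behaviour (the channel name, capacity and values below are arbitrary):

package main

import "fmt"

func main() {
	// A buffered channel with 2 slots: the first two sends do not block.
	ch := make(chan string, 2)
	ch <- "a"
	ch <- "b"

	// A third send at this point (ch <- "c") would block until a slot is
	// freed; in this single-goroutine program it would deadlock.

	// Receiving frees a slot, so another send can go through.
	fmt.Println(<-ch) // "a"
	ch <- "c"

	fmt.Println(<-ch) // "b"
	fmt.Println(<-ch) // "c"
}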


Example 1


A WaitGroup waits for a collection of goroutines to finish. Once they are all done, it stops waiting and the application moves on to its next stage.
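
Before the full example, here is a minimal sketch of the WaitGroup mechanics on their own (the worker count and messages are arbitrary): you Add the number of goroutines up front, each goroutine calls Done when it finishes, and Wait blocks until the counter reaches zero.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup

	// Register 3 goroutines up front.
	wg.Add(3)

	for i := 1; i <= 3; i++ {
		go func(n int) {
			// Signal completion when this goroutine returns.
			defer wg.Done()
			fmt.Println("worker", n, "done")
		}(i)
	}

	// Block until all 3 goroutines have called Done.
	wg.Wait()
	fmt.Println("all workers finished")
}

The full example below combines the WaitGroup with the buffered channel to cap concurrency at 3.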


package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

var (
	// wg is used to make the application wait for all goroutines to finish before exiting.
	wg sync.WaitGroup
	// ch is a buffered channel with a capacity of 3 resource slots.
	ch = make(chan bool, 3)
	// waiters is used to make goroutines sleep in order to simulate the time it takes to process a job.
	waiters = []int{0, 1, 2, 3, 4}
	// jobs are the jobs we are going to process as a batch.
	jobs = [10]string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
)

func main() {
	// Prevent picking up the same random numbers for sleeping on every run.
	rand.Seed(time.Now().UnixNano())

	fmt.Println("BEGIN")

	// Tell the WaitGroup how many jobs will be processed in total before exiting the app.
	wg.Add(len(jobs))

	for _, job := range jobs {
		go process(job, ch, &wg)
	}

	// Block exiting until all the goroutines are finished.
	wg.Wait()

	fmt.Println("END")
}

func process(job string, ch chan bool, wg *sync.WaitGroup) {
	// As soon as the current goroutine finishes (job done!), notify the WaitGroup.
	defer wg.Done()

	// Acquire 1 resource slot in the channel buffer. Once the buffer is full,
	// this send blocks the goroutine until another job releases a slot.
	ch <- true

	fmt.Println(job, "in")
	time.Sleep(time.Duration(rand.Intn(len(waiters))) * time.Second)
	fmt.Println(job, "out")

	// Release 1 resource to free up the channel buffer slot.
	<-ch
}

Output


BEGIN
a in
a out
b in
b out
c in
c out
e in
d in
f in
f out
g in
g out
h in
e out
i in
i out
j in
j out
d out
h out
END

Proof


As seen below, there are at most 3 jobs in the channel at a time.
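
One way to observe this yourself is to log the channel occupancy from inside the goroutine; len on a buffered channel reports how many slots are currently in use, so it can never exceed the capacity of 3. A minimal sketch of that check (using a fixed sleep instead of the random one above):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	ch := make(chan bool, 3)
	jobs := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}

	wg.Add(len(jobs))
	for _, job := range jobs {
		go func(job string) {
			defer wg.Done()

			ch <- true
			// len(ch) is the number of occupied buffer slots; it is always <= 3 here.
			fmt.Println(job, "in, slots in use:", len(ch))
			time.Sleep(100 * time.Millisecond)
			<-ch
		}(job)
	}
	wg.Wait()
}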



Example 2


package main

import (
	"fmt"
	"math/rand"
	"time"
)

var (
	// concurrency defines the maximum number of concurrent job processors (goroutines) to run.
	concurrency = 3
	// jobInChan is a buffered channel with a capacity of 3 resource slots.
	jobInChan = make(chan string, 3)
	// jobOutChan is a buffered channel with enough capacity for the total number of jobs.
	jobOutChan = make(chan bool, len(jobs))
	// waiters is used to make goroutines sleep in order to simulate the time it takes to process a job.
	waiters = []int{0, 1, 2, 3, 4}
	// jobs contains the resources we are going to process.
	jobs = []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
)

func main() {
	// Prevent picking up the same random numbers for sleeping on every run.
	rand.Seed(time.Now().UnixNano())

	fmt.Println("BEGIN")

	// Create 3 concurrent processor goroutines.
	for i := 1; i <= concurrency; i++ {
		go processor(jobInChan, jobOutChan)
	}

	// Fill in the buffered channel one by one whenever a slot becomes available.
	for _, job := range jobs {
		jobInChan <- job
	}

	// Block until every job has reported back on `jobOutChan`.
	for range jobs {
		<-jobOutChan
	}

	fmt.Println("END")
}

func processor(jobInChan <-chan string, jobOutChan chan<- bool) {
	// Wait for resources to come in.
	for job := range jobInChan {
		fmt.Println(job, "in")
		time.Sleep(time.Duration(rand.Intn(len(waiters))) * time.Second)
		fmt.Println(job, "out")

		// Report back on `jobOutChan` as soon as the resource has been processed.
		jobOutChan <- true
	}
}

Output


BEGIN
b in
a in
c in
b out
d in
a out
c out
e in
f in
e out
g in
d out
h in
h out
i in
f out
j in
g out
i out
j out
END

Proof


As seen below, there are at most 3 jobs in the channel at a time.