In this example we split a list of jobs into batches and pass those batches to 2 goroutines, which process them concurrently. Each goroutine finishes every job in the batch it received before it picks up the next batch.


Example


package main

import (
"fmt"
"math/rand"
"sync"
"time"
)

// Package-level state used by main and by the processor goroutines.
var (
	// jobs is the complete list of work items; main slices it into batches.
	jobs = []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
	// batchSize is the maximum number of jobs in a single batch. The final
	// batch is smaller when len(jobs) is not a multiple of batchSize.
	batchSize = 3
	// batchInChan hands batches to the processors. It is unbuffered
	// (capacity 0), so every send blocks until a processor receives it.
	batchInChan = make(chan []string)
	// waiters backs the simulated processing time: a processor derives a
	// random sleep duration, in seconds, from this slice for every job.
	waiters = []int{0, 1, 2, 3, 4}
	// wg lets main block until every processor goroutine has returned.
	wg sync.WaitGroup
)

// main starts two processor goroutines, feeds them the contents of jobs in
// batches of at most batchSize items through batchInChan, and then waits for
// both goroutines to return before printing "END".
func main() {
	// Seed the global generator so the simulated waits differ between runs.
	rand.Seed(time.Now().UnixNano())

	fmt.Println("BEGIN")

	// Launch the processors, numbered 1 and 2, registering each with wg.
	for id := 1; id <= 2; id++ {
		wg.Add(1)
		go processor(id, batchInChan, &wg)
	}

	// Peel one batch at a time off the front of the remaining jobs. Each send
	// blocks until one of the processors is ready to receive the batch.
	remaining := jobs
	for len(remaining) > 0 {
		n := batchSize
		if n > len(remaining) {
			n = len(remaining)
		}

		batchInChan <- remaining[:n]
		remaining = remaining[n:]
	}

	// No more batches will be sent: closing the channel lets the processors'
	// range loops terminate once they have received everything.
	close(batchInChan)

	// Hold the program open until both processors have called Done.
	wg.Wait()

	fmt.Println("END")
}

// processor receives job batches from batchInChan until the channel is closed
// and handles every job of a batch, in order, before receiving the next batch.
//
// id only labels the log lines. wg.Done is called when the function returns,
// so the caller must have registered this goroutine with wg.Add beforehand.
//
// Each job is simulated by sleeping for a number of seconds picked at random
// from the package-level waiters slice; an empty waiters means no sleep.
func processor(id int, batchInChan <-chan []string, wg *sync.WaitGroup) {
	// Signal completion on every exit path.
	defer wg.Done()

	// b counts the batches handled by this processor (1-based); it is not a
	// global batch number.
	b := 1

	// The range loop ends once the channel is closed and fully received.
	for batch := range batchInChan {
		for _, job := range batch {
			fmt.Println("processor:", id, "batch:", b, "job:", job, "- started")

			// Use the selected element rather than its index so that changing
			// the values in waiters actually changes the simulated durations.
			// The length guard avoids the panic rand.Intn raises for n <= 0.
			wait := 0
			if len(waiters) > 0 {
				wait = waiters[rand.Intn(len(waiters))]
			}

			time.Sleep(time.Duration(wait) * time.Second)
			fmt.Println("processor:", id, "batch:", b, "job:", job, "- finished in", wait)
		}

		b++
	}
}

Output


BEGIN
processor: 2 batch: 1 job: d - started
processor: 1 batch: 1 job: a - started
processor: 1 batch: 1 job: a - finished in 0
processor: 1 batch: 1 job: b - started
processor: 1 batch: 1 job: b - finished in 1
processor: 1 batch: 1 job: c - started
processor: 2 batch: 1 job: d - finished in 3
processor: 2 batch: 1 job: e - started
processor: 2 batch: 1 job: e - finished in 0
processor: 2 batch: 1 job: f - started
processor: 1 batch: 1 job: c - finished in 2
processor: 1 batch: 2 job: g - started
processor: 1 batch: 2 job: g - finished in 2
processor: 1 batch: 2 job: h - started
processor: 2 batch: 1 job: f - finished in 4
processor: 2 batch: 2 job: j - started
processor: 1 batch: 2 job: h - finished in 4
processor: 1 batch: 2 job: i - started
processor: 2 batch: 2 job: j - finished in 3
processor: 1 batch: 2 job: i - finished in 3
END