For example, suppose you want to process at most 3 jobs at a time and wait for all jobs to finish before the program exits. We will use a buffered channel for this. The first example (the one I recommend) uses "sync.WaitGroup" for coordination, while the second uses multiple channels. In both examples, at most 3 jobs are processed at any one time.


A buffered channel limits how many values can be held at once, which makes it a convenient way to cap how many resources are in use. When the buffer is full, a send blocks the sending goroutine until a slot is freed. A buffered channel is created like this: make(chan TYPE, n).
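The blocking behaviour described above can be observed with a small sketch. It is only an illustration: `trySend` is a hypothetical helper (not part of the examples in this post) that uses `select` with a `default` case, so that a full buffer is reported instead of blocking the program.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the value fit
// into the channel's buffer. (Hypothetical helper, for illustration only.)
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		// A plain `ch <- v` would block here until a slot is freed.
		return false
	}
}

func main() {
	ch := make(chan int, 3) // the buffer holds at most 3 values

	// The first three sends fit into the buffer; the fourth does not.
	for i := 1; i <= 4; i++ {
		fmt.Println("send", i, "accepted:", trySend(ch, i))
	}
	fmt.Println("len:", len(ch), "cap:", cap(ch))

	<-ch // receive one value, which frees one slot
	fmt.Println("send 5 accepted:", trySend(ch, 5))
}
```

With a plain send instead of `trySend`, the fourth send would simply wait until some other goroutine received from the channel, which is exactly the property the examples rely on.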


Example 1


A WaitGroup waits for all registered goroutines to finish. The call to Wait blocks until each of them has called Done; after that, the application continues normally.
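As a minimal sketch of the WaitGroup pattern on its own, without any concurrency limit: `runAll` is a hypothetical helper that starts `n` goroutines and returns only after every one of them has called `Done`. The mutex-protected counter exists only to make the result checkable.

```go
package main

import (
	"fmt"
	"sync"
)

// runAll starts n goroutines and returns the number that completed.
// It returns only after all of them have finished. (Hypothetical helper.)
func runAll(n int) int {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		done int
	)

	wg.Add(n) // register n goroutines before starting them
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done() // signal completion, even on early return

			mu.Lock()
			done++
			mu.Unlock()
		}()
	}

	wg.Wait() // blocks until Done has been called n times
	return done
}

func main() {
	fmt.Println("finished:", runAll(5)) // prints "finished: 5"
}
```

Calling `Add` before the goroutines are started avoids a race in which `Wait` could return before a goroutine has registered itself.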


package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

var (
	// wg makes the application wait for all goroutines to finish before exiting.
	wg sync.WaitGroup
	// ch is a buffered channel with a capacity of 3 slots; it acts as a semaphore.
	ch = make(chan bool, 3)
	// waiters only defines the sleep range (0 to len(waiters)-1 seconds) used to simulate processing time.
	waiters = []int{0, 1, 2, 3, 4}
	// jobs are the jobs we are going to process.
	jobs = [10]string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
)

func main() {
	// Seed the generator so that the sleep durations differ between runs.
	// Since Go 1.20 the global generator is seeded automatically and rand.Seed is deprecated.
	rand.Seed(time.Now().UnixNano())

	fmt.Println("BEGIN")

	// Register how many jobs must be processed in total before exiting the app.
	wg.Add(len(jobs))

	// Start one goroutine per job; the channel limits how many of them work at once.
	for _, job := range jobs {
		go process(job, ch, &wg)
	}

	// Block exiting until all the goroutines are finished.
	wg.Wait()

	fmt.Println("END")
}

func process(job string, ch chan bool, wg *sync.WaitGroup) {
	// As soon as the current goroutine finishes (job done!), notify the WaitGroup.
	defer wg.Done()

	// Acquire 1 slot in the channel buffer. Once the buffer is full, this send
	// blocks the current goroutine (not the `range` loop) until a slot is released.
	ch <- true

	fmt.Println(job, "in")
	time.Sleep(time.Duration(rand.Intn(len(waiters))) * time.Second)
	fmt.Println(job, "out")

	// Release 1 slot to free up the channel buffer.
	<-ch
}

Output


BEGIN
a in
a out
b in
b out
c in
c out
e in
d in
f in
f out
g in
g out
h in
e out
i in
i out
j in
j out
d out
h out
END

Proof


As shown below, at most 3 jobs hold a slot in the channel at any one time.
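The same claim can also be checked programmatically rather than by reading the output. The following is a sketch under stated assumptions: `peakConcurrency` is a hypothetical helper, the jobs are simulated with a fixed short sleep, and an atomic counter records the highest number of jobs that were in flight at once. It uses the same acquire/release pattern as Example 1, with `struct{}` values instead of `bool`.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// peakConcurrency runs `jobs` goroutines gated by a buffered channel of size
// `limit` and returns the highest number of jobs observed in flight at once.
// (Hypothetical helper, for illustration only.)
func peakConcurrency(jobs, limit int) int64 {
	var (
		wg       sync.WaitGroup
		inFlight int64
		peak     int64
	)
	sem := make(chan struct{}, limit)

	wg.Add(jobs)
	for i := 0; i < jobs; i++ {
		go func() {
			defer wg.Done()

			sem <- struct{}{}        // acquire a slot; blocks while `limit` jobs are running
			defer func() { <-sem }() // release the slot when the job is done

			n := atomic.AddInt64(&inFlight, 1)
			// Raise the recorded peak if this job pushed the count higher.
			for {
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}

			time.Sleep(10 * time.Millisecond) // simulated work
			atomic.AddInt64(&inFlight, -1)
		}()
	}

	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println("peak concurrency:", peakConcurrency(10, 3))
}
```

Because the counter is incremented only after a slot is acquired and decremented before the slot is released, the reported peak can never exceed `limit`.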



Example 2


package main

import (
	"fmt"
	"math/rand"
	"time"
)

var (
	// concurrency defines the maximum number of concurrent job processors (goroutines).
	concurrency = 3
	// jobInChan is a buffered channel with a capacity of 3 slots.
	jobInChan = make(chan string, 3)
	// jobOutChan is a buffered channel with capacity for the total number of jobs.
	jobOutChan = make(chan bool, len(jobs))
	// waiters only defines the sleep range (0 to len(waiters)-1 seconds) used to simulate processing time.
	waiters = []int{0, 1, 2, 3, 4}
	// jobs contains the resources we are going to process.
	jobs = []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
)

func main() {
	// Seed the generator so that the sleep durations differ between runs.
	// Since Go 1.20 the global generator is seeded automatically and rand.Seed is deprecated.
	rand.Seed(time.Now().UnixNano())

	fmt.Println("BEGIN")

	// Create 3 concurrent processor goroutines.
	for i := 1; i <= concurrency; i++ {
		go processor(jobInChan, jobOutChan)
	}

	// Feed the buffered channel one job at a time; the send blocks while the buffer is full.
	for _, job := range jobs {
		jobInChan <- job
	}
	// No more jobs: closing the channel lets the processors leave their `range` loops.
	close(jobInChan)

	// Block until every job has reported completion.
	// Receiving here replaces a busy-wait on len(jobOutChan), which would burn a CPU core.
	for range jobs {
		<-jobOutChan
	}

	fmt.Println("END")
}

func processor(jobInChan <-chan string, jobOutChan chan<- bool) {
	// Wait for jobs to come in.
	for job := range jobInChan {
		fmt.Println(job, "in")
		time.Sleep(time.Duration(rand.Intn(len(waiters))) * time.Second)
		fmt.Println(job, "out")

		// Report one completed job on `jobOutChan`.
		jobOutChan <- true
	}
}

Output


BEGIN
b in
a in
c in
b out
d in
a out
c out
e in
f in
e out
g in
d out
h in
h out
i in
f out
j in
g out
i out
j out
END

Proof


As shown below, at most 3 jobs are in progress at any one time.
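For the channel-only approach, it may help to see that the limit comes from the number of processor goroutines rather than from the input channel's buffer size. The sketch below is an illustration under assumptions: `poolPeak` is a hypothetical helper, work is simulated with a fixed short sleep, and the peak is tracked with an atomic counter.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// poolPeak feeds `jobs` items to `workers` goroutines through a channel with
// buffer size `buffer` and returns the highest number of jobs seen in flight.
// (Hypothetical helper, for illustration only.)
func poolPeak(jobs, workers, buffer int) int64 {
	var inFlight, peak int64
	in := make(chan int, buffer)
	out := make(chan struct{}, jobs)

	for w := 0; w < workers; w++ {
		go func() {
			for range in {
				n := atomic.AddInt64(&inFlight, 1)
				// Raise the recorded peak if this job pushed the count higher.
				for {
					p := atomic.LoadInt64(&peak)
					if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
						break
					}
				}
				time.Sleep(5 * time.Millisecond) // simulated work
				atomic.AddInt64(&inFlight, -1)
				out <- struct{}{} // report completion
			}
		}()
	}

	for j := 0; j < jobs; j++ {
		in <- j
	}
	close(in) // lets the workers exit their range loops

	// Wait for one completion signal per job.
	for j := 0; j < jobs; j++ {
		<-out
	}
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println("peak with buffer 3: ", poolPeak(10, 3, 3))
	fmt.Println("peak with buffer 10:", poolPeak(10, 3, 10))
}
```

In both calls the peak stays at or below 3, because only 3 goroutines ever take jobs off the channel; a larger buffer only changes how many jobs can queue up while waiting.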