Herkese merhaba!

Uzun yıllardır bol miktarda kişisel zaman ve enerji harcayarak bilgimizi hepinizle paylaşıyoruz. Ancak şu andan itibaren bu blogu çalışır durumda tutabilmek için yardımınıza ihtiyacımız var. Yapmanız gereken tek şey, sitedeki reklamlardan birine tıklamak olacaktır, aksi takdirde hosting vb. masraflar nedeniyle maalesef yayından kaldırılacaktır. Teşekkürler.

Bu örnekte, senkronize ve asenkronize iş işleme için kullanılacak bir paket oluşturacağız.


Paket


Bu paket vaat ettiği şeyi yapsa da, üç bonus özelliğe daha ihtiyacı var.



Bu örnek, işler için "arabelleksiz" kanalları kullanır. Ancak, işleri sınırlamak isterseniz, NewWorker() fonksiyonunu aşağıda gösterildiği gibi "arabellekli" kanalları kullanacak şekilde güncelleyebilirsiniz.


func NewWorker(workerTotal, jobTotal int) Worker {
return Worker{
total: workerTotal,
jobChan: make(chan Job, jobTotal),
resultChan: make(chan interface{}, jobTotal),
}
}

package worker

import (
"fmt"
"math/rand"
"time"
)

// -----------------------------------------------------------------------------

type Job struct {
// id represents the job identifier.
id interface{}
}

// NewJob returns a `Job` instance.
func NewJob(id interface{}) Job {
return Job{
id: id,
}
}

// -----------------------------------------------------------------------------

type Worker struct {
// total represents the amount of workers to be run at startup.
total int
// jobChan represents a two-way "unbuffered" channel that has unlimited
// capacity for the jobs.
jobChan chan Job
// resultChan represents a two-way "unbuffered" channel that has unlimited
// capacity for the job results.
resultChan chan interface{}
}

// NewWorker returns a `Worker` instance.
func NewWorker(workerTotal int) Worker {
return Worker{
total: workerTotal,
jobChan: make(chan Job),
resultChan: make(chan interface{}),
}
}

// Start brings up certain amount of worker(s) so that they can pick up and work
// on the job(s).
func (w Worker) Start() {
for i := 1; i <= w.total; i++ {
go w.run(i)
}
}

// Add adds a job to a channel so that it could be picked up and worked on by
// the running worker(s).
func (w Worker) Add(job Job) {
w.jobChan <- job
}

// Result returns a channel so that it could be ranged over in order to fetch
// job results from the running worker(s).
func (w Worker) Result() <-chan interface{} {
return w.resultChan
}

// run runs a worker and works on the job(s).
func (w Worker) run(id int) {
fmt.Println(id, "running...")

for {
select {
case job := <- w.jobChan:
fmt.Printf("%d picked up job %v @ %s\n", id, job.id, time.Now().UTC())

// Pretend like doing something.
rand.Seed(time.Now().UnixNano())
time.Sleep(time.Duration(rand.Intn(len([]int{0, 1, 2, 3, 4}))) * time.Second)
// Done.

fmt.Printf("%d completed job %v @ %s\n", id, job.id, time.Now().UTC())
w.resultChan <- job.id
default:
time.Sleep(1 * time.Second)
fmt.Println(id, "waiting...")
}
}
}

Kullanım


Asenkronize


Bu, eşzamansız (engellemesiz) bir örnektir, bu nedenle işler rastgele sırada işlenir. Program asla çıkmaz. İş sonuçları birbirinden bağımsız olarak yazdırılır. Birden fazla işçi bulundurmak önemlidir.


package main

import (
"fmt"

"internal/worker"
)

func main() {
// Create new worker(s) and start.
w := worker.NewWorker(3)
w.Start()

go func() {
// Add jobs.
for i := 1; i <= 5; i++ {
w.Add(worker.NewJob(i))
}
}()

// Print results.
for v := range w.Result() {
fmt.Println("Result:", v)
}
}

1 running...
2 running...
3 running...
1 picked up job 1 @ 2020-04-11 18:47:12.496663 +0000 UTC
1 completed job 1 @ 2020-04-11 18:47:12.496757 +0000 UTC
Result: 1
1 picked up job 2 @ 2020-04-11 18:47:12.49679 +0000 UTC
2 waiting...
3 waiting...
2 picked up job 3 @ 2020-04-11 18:47:13.496786 +0000 UTC
2 completed job 3 @ 2020-04-11 18:47:13.496839 +0000 UTC
3 picked up job 4 @ 2020-04-11 18:47:13.496813 +0000 UTC
Result: 3
1 completed job 2 @ 2020-04-11 18:47:14.497218 +0000 UTC
1 picked up job 5 @ 2020-04-11 18:47:14.497286 +0000 UTC
Result: 2
2 waiting...
2 waiting...
1 completed job 5 @ 2020-04-11 18:47:16.497519 +0000 UTC
Result: 5
2 waiting...
1 waiting...
3 completed job 4 @ 2020-04-11 18:47:17.501962 +0000 UTC
Result: 4
2 waiting...
1 waiting...
3 waiting...
2 waiting...
1 waiting...
3 waiting...
2 waiting...
1 waiting...

Senkronize


Bu eşzamanlı (engellemeli) bir örnektir, bu nedenle işler sırayla işlenir. İşler tamamlandığında program kapanır. İş sonuçları sırayla yazdırılır. Bir seferde bir iş işlendiğinden birden fazla işleyicinin olması anlamsızdır.


package main

import (
"fmt"

"internal/worker"
)

func main() {
// Create new worker(s) and start.
w := worker.NewWorker(3)
w.Start()

// Add jobs.
w.Add(worker.NewJob(1))
// Print results.
v := <- w.Result()
fmt.Println("Result:", v)

// Add jobs.
w.Add(worker.NewJob(2))
// Print results.
v = <- w.Result()
fmt.Println("Result:", v)

// Add jobs.
w.Add(worker.NewJob(3))
// Print results.
v = <- w.Result()
fmt.Println("Result:", v)

// Add jobs.
w.Add(worker.NewJob(4))
// Print results.
v = <- w.Result()
fmt.Println("Result:", v)

// Add jobs.
w.Add(worker.NewJob(5))
// Print results.
v = <- w.Result()
fmt.Println("Result:", v)
}

1 running...
3 running...
2 running...
1 picked up job 1 @ 2020-04-11 18:52:14.135716 +0000 UTC
3 waiting...
2 waiting...
3 waiting...
2 waiting...
2 waiting...
3 waiting...
1 completed job 1 @ 2020-04-11 18:52:18.137946 +0000 UTC
Result: 1
3 waiting...
2 waiting...
3 picked up job 2 @ 2020-04-11 18:52:18.149216 +0000 UTC
1 waiting...
2 waiting...
1 waiting...
3 completed job 2 @ 2020-04-11 18:52:21.150342 +0000 UTC
Result: 2
2 waiting...
2 picked up job 3 @ 2020-04-11 18:52:21.153484 +0000 UTC
2 completed job 3 @ 2020-04-11 18:52:21.153534 +0000 UTC
Result: 3
1 waiting...
1 picked up job 4 @ 2020-04-11 18:52:22.144573 +0000 UTC
3 waiting...
2 waiting...
2 waiting...
3 waiting...
2 waiting...
1 completed job 4 @ 2020-04-11 18:52:26.146923 +0000 UTC
Result: 4
3 waiting...
2 waiting...
3 picked up job 5 @ 2020-04-11 18:52:26.164251 +0000 UTC
1 waiting...
2 waiting...
3 completed job 5 @ 2020-04-11 18:52:27.167167 +0000 UTC
Result: 5