In this example we are going to create a worker package that can be used for both synchronous and asynchronous job processing.


Package


Although this package does what it promises, it needs three more bonus features.



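This example uses "unbuffered" channels for jobs and results, so Add() blocks until a worker is ready to receive the job. If you want a fixed number of jobs to be able to queue up without blocking the sender, you can update the NewWorker() function to use "buffered" channels, as shown below.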


func NewWorker(workerTotal, jobTotal int) Worker {
	return Worker{
		total:      workerTotal,
		jobChan:    make(chan Job, jobTotal),
		resultChan: make(chan interface{}, jobTotal),
	}
}
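
To make the difference between the two channel kinds concrete, here is a minimal sketch that is independent of the worker package. The helper name `trySend` is hypothetical and exists only for this illustration: it attempts a send and reports whether the channel accepted the value without blocking.

```go
package main

import "fmt"

// trySend is a hypothetical helper (not part of the worker package). It
// attempts a send and reports whether the value was accepted immediately.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		// The send would block, so give up instead of waiting.
		return false
	}
}

func main() {
	unbuffered := make(chan int)  // capacity 0: a send needs a waiting receiver
	buffered := make(chan int, 2) // capacity 2: two sends succeed with no receiver

	fmt.Println(cap(unbuffered), trySend(unbuffered, 1))
	// prints: 0 false
	fmt.Println(cap(buffered), trySend(buffered, 1), trySend(buffered, 2), trySend(buffered, 3))
	// prints: 2 true true false
}
```

With a buffered jobChan, Add() behaves like the buffered case here: it returns immediately until jobTotal jobs are queued, and only blocks after that.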

package worker

import (
"fmt"
"math/rand"
"time"
)

// -----------------------------------------------------------------------------

// Job represents a single unit of work.
type Job struct {
	// id represents the job identifier.
	id interface{}
}

// NewJob returns a `Job` instance.
func NewJob(id interface{}) Job {
	return Job{
		id: id,
	}
}

// -----------------------------------------------------------------------------

// Worker represents a pool of workers that share one job channel and one
// result channel.
type Worker struct {
	// total represents the amount of workers to be run at startup.
	total int
	// jobChan represents a two-way "unbuffered" channel for the jobs. It has
	// no capacity, so a send blocks until a worker receives the job.
	jobChan chan Job
	// resultChan represents a two-way "unbuffered" channel for the job
	// results. It has no capacity, so a worker blocks until its result is read.
	resultChan chan interface{}
}

// NewWorker returns a `Worker` instance.
func NewWorker(workerTotal int) Worker {
	return Worker{
		total:      workerTotal,
		jobChan:    make(chan Job),
		resultChan: make(chan interface{}),
	}
}

// Start brings up the configured number of worker(s) so that they can pick up
// and work on the job(s).
func (w Worker) Start() {
	for i := 1; i <= w.total; i++ {
		go w.run(i)
	}
}

// Add adds a job to a channel so that it can be picked up and worked on by
// the running worker(s).
func (w Worker) Add(job Job) {
	w.jobChan <- job
}

// Result returns a receive-only channel that can be ranged over in order to
// fetch job results from the running worker(s).
func (w Worker) Result() <-chan interface{} {
	return w.resultChan
}

// run runs a worker and works on the job(s).
func (w Worker) run(id int) {
	fmt.Println(id, "running...")

	for {
		select {
		case job := <-w.jobChan:
			fmt.Printf("%d picked up job %v @ %s\n", id, job.id, time.Now().UTC())

			// Pretend to do some work by sleeping for 0 to 4 seconds.
			// Note: rand.Seed is deprecated as of Go 1.20, where the global
			// generator is seeded automatically.
			rand.Seed(time.Now().UnixNano())
			time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
			// Done.

			fmt.Printf("%d completed job %v @ %s\n", id, job.id, time.Now().UTC())
			w.resultChan <- job.id
		default:
			// No job is ready: wait a second before polling again.
			time.Sleep(1 * time.Second)
			fmt.Println(id, "waiting...")
		}
	}
}

Usage


Asynchronous


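This is an asynchronous (non-blocking) example, so the jobs are handled concurrently and complete in no particular order. The program never exits, because the result channel is never closed and the range loop in main() keeps waiting for more results. Job results are printed independently of each other. Having multiple workers is important here, since it allows several jobs to run at the same time.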


package main

import (
	"fmt"

	"internal/worker"
)

func main() {
	// Create new worker(s) and start.
	w := worker.NewWorker(3)
	w.Start()

	go func() {
		// Add jobs.
		for i := 1; i <= 5; i++ {
			w.Add(worker.NewJob(i))
		}
	}()

	// Print results.
	for v := range w.Result() {
		fmt.Println("Result:", v)
	}
}

1 running...
2 running...
3 running...
1 picked up job 1 @ 2020-04-11 18:47:12.496663 +0000 UTC
1 completed job 1 @ 2020-04-11 18:47:12.496757 +0000 UTC
Result: 1
1 picked up job 2 @ 2020-04-11 18:47:12.49679 +0000 UTC
2 waiting...
3 waiting...
2 picked up job 3 @ 2020-04-11 18:47:13.496786 +0000 UTC
2 completed job 3 @ 2020-04-11 18:47:13.496839 +0000 UTC
3 picked up job 4 @ 2020-04-11 18:47:13.496813 +0000 UTC
Result: 3
1 completed job 2 @ 2020-04-11 18:47:14.497218 +0000 UTC
1 picked up job 5 @ 2020-04-11 18:47:14.497286 +0000 UTC
Result: 2
2 waiting...
2 waiting...
1 completed job 5 @ 2020-04-11 18:47:16.497519 +0000 UTC
Result: 5
2 waiting...
1 waiting...
3 completed job 4 @ 2020-04-11 18:47:17.501962 +0000 UTC
Result: 4
2 waiting...
1 waiting...
3 waiting...
2 waiting...
1 waiting...
3 waiting...
2 waiting...
1 waiting...

Synchronous


This is a synchronous (blocking) example, so the jobs are handled one after another, in order. The program exits once all jobs are completed. Job results are printed one after another, in order. Having multiple workers is pointless here, because only one job is processed at a time.


package main

import (
	"fmt"

	"internal/worker"
)

func main() {
	// Create new worker(s) and start.
	w := worker.NewWorker(3)
	w.Start()

	// Add jobs.
	w.Add(worker.NewJob(1))
	// Print results.
	v := <-w.Result()
	fmt.Println("Result:", v)

	// Add jobs.
	w.Add(worker.NewJob(2))
	// Print results.
	v = <-w.Result()
	fmt.Println("Result:", v)

	// Add jobs.
	w.Add(worker.NewJob(3))
	// Print results.
	v = <-w.Result()
	fmt.Println("Result:", v)

	// Add jobs.
	w.Add(worker.NewJob(4))
	// Print results.
	v = <-w.Result()
	fmt.Println("Result:", v)

	// Add jobs.
	w.Add(worker.NewJob(5))
	// Print results.
	v = <-w.Result()
	fmt.Println("Result:", v)
}

1 running...
3 running...
2 running...
1 picked up job 1 @ 2020-04-11 18:52:14.135716 +0000 UTC
3 waiting...
2 waiting...
3 waiting...
2 waiting...
2 waiting...
3 waiting...
1 completed job 1 @ 2020-04-11 18:52:18.137946 +0000 UTC
Result: 1
3 waiting...
2 waiting...
3 picked up job 2 @ 2020-04-11 18:52:18.149216 +0000 UTC
1 waiting...
2 waiting...
1 waiting...
3 completed job 2 @ 2020-04-11 18:52:21.150342 +0000 UTC
Result: 2
2 waiting...
2 picked up job 3 @ 2020-04-11 18:52:21.153484 +0000 UTC
2 completed job 3 @ 2020-04-11 18:52:21.153534 +0000 UTC
Result: 3
1 waiting...
1 picked up job 4 @ 2020-04-11 18:52:22.144573 +0000 UTC
3 waiting...
2 waiting...
2 waiting...
3 waiting...
2 waiting...
1 completed job 4 @ 2020-04-11 18:52:26.146923 +0000 UTC
Result: 4
3 waiting...
2 waiting...
3 picked up job 5 @ 2020-04-11 18:52:26.164251 +0000 UTC
1 waiting...
2 waiting...
3 completed job 5 @ 2020-04-11 18:52:27.167167 +0000 UTC
Result: 5