Bu örnekte, eşzamansız iş işleme için kullanılacak bir çalışan havuzu (worker pool) paketi oluşturacağız. Her havuz aynı anda sınırlı sayıda görev, her görev de aynı anda sınırlı sayıda iş çalıştırabilir. Bu sayede sistem kaynakları tüketilmez. Hem görev hem de iş sayısı için uygun bir değer bulmak size kalmıştır.


Yapı


├── job
│   └── greeting.go
├── main.go
└── task
    ├── pool.go
    └── task.go

Dosyalar


pool.go


package task

// Pool limits how many tasks may hold a slot at the same time. It is a
// counting semaphore built on a buffered channel: Allocate takes a slot and
// Release gives one back.
type Pool struct {
	// taskChan holds one empty struct per allocated slot. Its capacity is the
	// maximum number of tasks allowed to run at any given time, so that system
	// resources are not exhausted.
	taskChan chan struct{}
}

// NewPool returns a pool with room for maxTasks concurrently allocated slots.
//
// A maxTasks below 1 is raised to 1. A capacity of zero would create an
// unbuffered channel, on which Allocate blocks until another goroutine calls
// Release (a deadlock under the usual Allocate/defer Release pairing), and a
// negative capacity makes the channel allocation panic.
func NewPool(maxTasks int) *Pool {
	if maxTasks < 1 {
		maxTasks = 1
	}

	return &Pool{
		taskChan: make(chan struct{}, maxTasks),
	}
}

// Allocate claims a free slot in the pool for a new task. It blocks while all
// slots are taken and returns once Release has freed one.
func (p *Pool) Allocate() {
	p.taskChan <- struct{}{}
}

// Release returns a previously allocated slot to the pool. Each Allocate
// should be paired with exactly one Release; calling Release while no slot is
// allocated blocks until some goroutine allocates one.
func (p *Pool) Release() {
	<-p.taskChan
}

task.go


package task

import (
"context"
"fmt"
"sync"
)

// jobber is the contract custom job types must satisfy to be run by a Task.
type jobber interface {
	// ID returns the job identifier. It is used to label recorded errors.
	ID() string

	// Run executes the job. A non-nil error is recorded on the owning task.
	Run(ctx context.Context) error
}

// Task runs one or many jobs, each in its own goroutine, with an upper bound
// on how many run at the same time. Depending on the `earlyExit` setting, a
// task either stops accepting jobs as soon as one job has failed, or carries
// on running every job it is given even if some of them fail.
//
// A Task contains synchronisation primitives and must not be copied after
// first use; always handle it through the pointer returned by New.
type Task struct {
	// id represents an identifier for the task.
	id string

	// jobChan acts as a semaphore: every running job occupies one buffer
	// element, so the channel capacity caps the number of concurrent jobs.
	jobChan chan jobber

	// jobGroup is used to wait for already running jobs to finish.
	jobGroup sync.WaitGroup

	// errMu guards err.
	errMu sync.Mutex

	// err is the most recently recorded job error, or nil if no job failed.
	err error

	// earlyExit defines whether the task stops accepting jobs once an error
	// has been recorded.
	earlyExit bool
}

// New creates a task identified by id that runs at most maxJobs jobs at the
// same time. When earlyExit is true, jobs added after a failure are dropped.
//
// A maxJobs below 1 is raised to 1. With a capacity of zero the very first Add
// would block forever (the slot is only ever released by a job goroutine, and
// none could start), and a negative capacity makes the channel allocation
// panic.
func New(id string, maxJobs int, earlyExit bool) *Task {
	if maxJobs < 1 {
		maxJobs = 1
	}

	return &Task{
		id:        id,
		jobChan:   make(chan jobber, maxJobs),
		earlyExit: earlyExit,
	}
}

// ID returns task identifier.
func (t *Task) ID() string {
	return t.id
}

// Add starts job in its own goroutine as soon as a slot is free, blocking the
// caller until then.
//
// The job is dropped without running when:
//   - the task is an early-exit task and a failure has already been recorded,
//     including a failure recorded while Add was waiting for a slot;
//   - ctx ends before a slot becomes free or before the job starts. In that
//     case the context error is recorded for the job.
//
// Add must not be called concurrently with Wait.
func (t *Task) Add(ctx context.Context, job jobber) {
	if t.aborted() {
		return
	}

	// Wait for a free slot, but give up when the context ends so a cancelled
	// caller is not held up behind long-running jobs.
	select {
	case t.jobChan <- job:
	case <-ctx.Done():
		t.fail(job, ctx.Err())

		return
	}

	// A failure may have been recorded while this call was blocked above.
	// Hand the slot back instead of starting a job that should be skipped.
	if t.aborted() {
		<-t.jobChan

		return
	}

	t.jobGroup.Add(1)

	go func() {
		defer func() {
			t.jobGroup.Done()

			// Free the slot for the next job.
			<-t.jobChan
		}()

		// Do not start work for a context that has already ended.
		if err := ctx.Err(); err != nil {
			t.fail(job, err)

			return
		}

		if err := job.Run(ctx); err != nil {
			t.fail(job, err)
		}
	}()
}

// Wait waits for all active jobs to finish before exiting the task. No matter
// how many jobs have failed, always the last registered error is returned. Also
// no matter how the task was created (`earlyExit` true/false), this will always
// return registered error if there was any.
func (t *Task) Wait() error {
	t.jobGroup.Wait()

	return t.lastErr()
}

// fail records err for job, replacing any previously recorded error.
func (t *Task) fail(job jobber, err error) {
	wrapped := fmt.Errorf("job: %s - err: %w", job.ID(), err)

	t.errMu.Lock()
	t.err = wrapped
	t.errMu.Unlock()
}

// lastErr returns the most recently recorded error, or nil.
func (t *Task) lastErr() error {
	t.errMu.Lock()
	defer t.errMu.Unlock()

	return t.err
}

// aborted reports whether an early-exit task has already recorded a failure.
func (t *Task) aborted() bool {
	return t.earlyExit && t.lastErr() != nil
}

greeting.go


Bu, gösterim amacıyla rastgele geciken veya başarısız olan özel işinizdir.


package job

import (
"context"
"fmt"
"log/slog"
"math/rand/v2"
"time"
)

type Greeting struct {
Identifier string
}

func (g Greeting) ID() string {
return g.Identifier
}

func (g Greeting) Run(ctx context.Context) error {
var s int

if g.Identifier == "A 3" || g.Identifier == "B 3" {
s = 5
time.Sleep(time.Second * time.Duration(s))
} else if g.Identifier == "A 5" || g.Identifier == "B 5" {
s = 3
time.Sleep(time.Second * time.Duration(s))
} else if g.Identifier == "A 6" || g.Identifier == "B 6" {
return fmt.Errorf("purposely failed")
} else {
s = rand.IntN(900)

time.Sleep(time.Millisecond * time.Duration(s))
}

slog.Info("hello...", slog.String("id", g.Identifier), slog.Int("ttl", s))

return nil
}

main.go


Tek görev.


package main

import (
"context"
"fmt"
"log/slog"
"time"

"random/job"
"random/task"
)

// main runs a single task: it reserves one pool slot, feeds 250 greeting jobs
// into the task, waits for them to finish and logs the task error, if any.
func main() {
	slog.Info("MAIN START")

	// Every job shares this deadline.
	ctx, cancel := context.WithTimeout(context.Background(), 59*time.Second)
	defer cancel()

	// Hold one of the pool's two slots for as long as main is running.
	pool := task.NewPool(2)
	pool.Allocate()
	defer pool.Release()

	// runner := task.New("A", 5, true) // Early exit task
	runner := task.New("B", 10, false) // Not an early exit task

	// The same attribute is attached to every log line about this task.
	label := slog.String("task", runner.ID())

	slog.Info("LOOP START", label)

	for n := 0; n < 250; n++ {
		greeting := job.Greeting{Identifier: fmt.Sprintf("%s %d", runner.ID(), n)}
		runner.Add(ctx, greeting)
	}

	slog.Info("LOOP END", label)

	err := runner.Wait()
	if err != nil {
		slog.Error(err.Error(), label)
	}

	slog.Info("MAIN EXIT")
}

Çoklu görev.


package main

import (
"context"
"fmt"
"log"
"log/slog"
"runtime"
"sync"
"time"

"random/job"
"random/task"
)

// main runs several tasks side by side. Each task takes a pool slot, feeds its
// jobs into the task, waits for them and logs the task error, if any. A
// background goroutine prints runtime statistics while the program runs.
func main() {
	// -------------------------------------------------------------------------
	// Debug aid: print goroutine count, CPU count and allocated bytes every
	// 10 milliseconds. The goroutine is never stopped; it ends with the process.
	go func() {
		var stats runtime.MemStats

		for {
			cpus := runtime.NumCPU()
			routines := runtime.NumGoroutine()
			runtime.ReadMemStats(&stats)

			log.Printf(">>> R (%d) - C (%d) - M (%d)\n", routines, cpus, stats.Alloc)

			time.Sleep(10 * time.Millisecond)
		}
	}()
	// -------------------------------------------------------------------------

	// Each task is mapped to the number of jobs it will be given.
	tasks := map[*task.Task]int{
		task.New("A", 5, true):   250, // Early exit task
		task.New("B", 10, false): 500, // Not an early exit task
	}

	pool := task.NewPool(2)

	// Every job of every task shares this deadline.
	ctx, cancel := context.WithTimeout(context.Background(), 59*time.Second)
	defer cancel()

	slog.Info("MAIN START")

	var wg sync.WaitGroup

	for tk, total := range tasks {
		wg.Add(1)

		// The loop values are passed as arguments so each goroutine works on
		// its own copy.
		go func(tk *task.Task, total int) {
			defer wg.Done()

			// Hold a pool slot for the whole lifetime of this task.
			pool.Allocate()
			defer pool.Release()

			label := slog.String("task", tk.ID())

			slog.Info("LOOP START", label)

			for n := 0; n < total; n++ {
				greeting := job.Greeting{Identifier: fmt.Sprintf("%s %d", tk.ID(), n)}
				tk.Add(ctx, greeting)
			}

			slog.Info("LOOP END", label)

			err := tk.Wait()
			if err != nil {
				slog.Error(err.Error(), label)
			}
		}(tk, total)
	}

	wg.Wait()
	slog.Info("MAIN EXIT")
}