Bu örnekte bir göreve bir veya daha fazla iş ekleyeceğiz ve ardından her işi aynı anda işlemeye başlayacağız. Tüm işler sorunsuz çalışırsa, bu görev başarı, aksi takdirde başarısızlık olarak kabul edilir. Bağlam (context) iptal edilirse veya bir iş başarısız olursa, görev başarısızlık olarak kabul edilir ve süreç derhal sonlandırılır.


job.go


package task

import "context"

// Every custom job satisfy this interface.
type JobManager interface {
ID() interface{}
Run(ctx context.Context) error
}

result.go


package task

// Result contains the final outcome of the running task which might have
// multiple jobs within. The running task returns this type to indicate
// whether the task has succeeded or not. If a single job within the task
// fails, the whole task is marked as failed and the execution is terminated.
// The Success field indicates whether the task has successfully processed
// all the jobs or not. The Error field holds the error message that caused
// the job to fail.
type Result struct {
Success bool
Error error
}

task.go


package task

import (
"context"
"fmt"

"golang.org/x/sync/errgroup"
)

// Task provides simultaneous batch job handling functionality.
type Task struct {
id string
jobs []JobManager
}

// New initiates and returns a Task type.
func New(taskID string) *Task {
return &Task{
id: taskID,
}
}

// ID returns the id of the task.
func (t *Task) ID() string {
return t.id
}

// Add adds a new job to the job list. It is called before the Start method.
// This is not thread safe, so do not call it as a goroutine.
func (t *Task) Add(job JobManager) {
t.jobs = append(t.jobs, job)
}

// Start starts running all the jobs in the list simultaneously. It returns
// a Result type to describe the outcome of the task handling. If the context
// is cancelled or a job fails, task is considered as failure and the process
// is terminated immediately.
func (t *Task) Start(ctx context.Context) Result {
erg, ctx := errgroup.WithContext(ctx)

for _, job := range t.jobs {
job := job

erg.Go(func() error {
select {
case <-ctx.Done():
return fmt.Errorf("context: %w", ctx.Err())
default:
}

if err := job.Run(ctx); err != nil {
return fmt.Errorf("job %v has failed: %w", job.ID(), err)
}
return nil
})
}

if err := erg.Wait(); err != nil {
return Result{Error: err}
}

return Result{Success: true}
}

echo.go


Bu sizin özel işinizdir. İşiniz JobManager arayüzünü uyguladığı sürece sorun yok.


package echo

import (
"context"
"fmt"
"log"
)

type Echo struct {
id int
message string
}

func New(id int, message string) *Echo {
return &Echo{
id: id,
message: message,
}
}

func (e *Echo) ID() interface{} {
return e.id
}

func (e *Echo) Run(ctx context.Context) error {
// Sending it ...

// Error simulation on purpose!
if e.id == 10 {
return fmt.Errorf("failed to echo message %d", e.id)
}

log.Printf(e.message)
//time.Sleep(time.Second)

return nil
}

main.go


package main

import (
"context"
"fmt"
"log"

"github.com/you/client/internal/echo"
"github.com/you/client/internal/task"
)

func main() {
// To simulate context deadline.
//ctx, cancel := context.WithTimeout(context.Background(), time.Microsecond*800)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Add batch jobs in the task.
tsk := task.New("task-1")
for i := 1; i <= 30; i++ {
tsk.Add(echo.New(i, fmt.Sprintf("job %d says hello", i)))
}

// Process jobs in the batch.
log.Printf("task %s started", tsk.ID())
res := tsk.Start(ctx)
if !res.Success {
log.Fatalf("task %s failed: %s", tsk.ID(), res.Error.Error())
}
log.Printf("task %s succeeded", tsk.ID())
}

Test


Başarı


$ go run -race main.go
2021/08/16 14:39:07 task task-1 started
2021/08/16 14:39:07 job 1 says hello
2021/08/16 14:39:07 job 2 says hello
2021/08/16 14:39:07 job 3 says hello
2021/08/16 14:39:07 job 4 says hello
2021/08/16 14:39:07 job 5 says hello
2021/08/16 14:39:07 job 10 says hello
2021/08/16 14:39:07 job 11 says hello
2021/08/16 14:39:07 job 12 says hello
2021/08/16 14:39:07 job 13 says hello
2021/08/16 14:39:07 job 6 says hello
2021/08/16 14:39:07 job 7 says hello
2021/08/16 14:39:07 job 8 says hello
2021/08/16 14:39:07 job 9 says hello
2021/08/16 14:39:07 job 20 says hello
2021/08/16 14:39:07 job 14 says hello
2021/08/16 14:39:07 job 16 says hello
2021/08/16 14:39:07 job 15 says hello
2021/08/16 14:39:07 job 23 says hello
2021/08/16 14:39:07 job 19 says hello
2021/08/16 14:39:07 job 22 says hello
2021/08/16 14:39:07 job 30 says hello
2021/08/16 14:39:07 job 21 says hello
2021/08/16 14:39:07 job 25 says hello
2021/08/16 14:39:07 job 29 says hello
2021/08/16 14:39:07 job 26 says hello
2021/08/16 14:39:07 job 24 says hello
2021/08/16 14:39:07 job 18 says hello
2021/08/16 14:39:07 job 28 says hello
2021/08/16 14:39:07 job 27 says hello
2021/08/16 14:39:07 job 17 says hello
2021/08/16 14:39:07 task task-1 succeeded

Bağlam zaman aşımı


$ go run -race main.go
2021/08/15 20:27:30 task task-1 started
2021/08/15 20:27:30 job 30 says hello
2021/08/15 20:27:30 job 1 says hello
2021/08/15 20:27:30 job 2 says hello
2021/08/15 20:27:30 task task-1 failed: context: context deadline exceeded
exit status 1

İş hatası


$ go run -race main.go
2021/08/15 19:38:17 task task-1 started
2021/08/15 19:38:17 job 1 says hello
2021/08/15 19:38:17 job 2 says hello
2021/08/15 19:38:17 job 3 says hello
2021/08/15 19:38:17 job 4 says hello
2021/08/15 19:38:17 job 5 says hello
2021/08/15 19:38:17 job 6 says hello
2021/08/15 19:38:17 job 7 says hello
2021/08/15 19:38:17 job 8 says hello
2021/08/15 19:38:17 job 9 says hello
2021/08/15 19:38:17 job 14 says hello
2021/08/15 19:38:17 job 11 says hello
2021/08/15 19:38:17 job 12 says hello
2021/08/15 19:38:17 job 17 says hello
2021/08/15 19:38:17 task task-1 failed: job 10 has failed: failed to echo message 10