In this example we are going to add one or many jobs to a task then start simultaneously process each jobs. If all jobs run without an issue, a task is considered as success, otherwise failure. If the context is cancelled or a job fails, task is considered as failure and the process is terminated immediately.


job.go


package task

import "context"

// Every custom job satisfy this interface.
type JobManager interface {
ID() interface{}
Run(ctx context.Context) error
}

result.go


package task

// Result contains the final outcome of the running task which might have
// multiple jobs within. The running task returns this type to indicate
// whether the task has succeeded or not. If a single job within the task
// fails, the whole task is marked as failed and the execution is terminated.
// The Success field indicates whether the task has successfully processed
// all the jobs or not. The Error field holds the error message that caused
// the job to fail.
type Result struct {
Success bool
Error error
}

task.go


package task

import (
"context"
"fmt"

"golang.org/x/sync/errgroup"
)

// Task provides simultaneous batch job handling functionality.
type Task struct {
id string
jobs []JobManager
}

// New initiates and returns a Task type.
func New(taskID string) *Task {
return &Task{
id: taskID,
}
}

// ID returns the id of the task.
func (t *Task) ID() string {
return t.id
}

// Add adds a new job to the job list. It is called before the Start method.
// This is not thread safe, so do not call it as a goroutine.
func (t *Task) Add(job JobManager) {
t.jobs = append(t.jobs, job)
}

// Start starts running all the jobs in the list simultaneously. It returns
// a Result type to describe the outcome of the task handling. If the context
// is cancelled or a job fails, task is considered as failure and the process
// is terminated immediately.
func (t *Task) Start(ctx context.Context) Result {
erg, ctx := errgroup.WithContext(ctx)

for _, job := range t.jobs {
job := job

erg.Go(func() error {
select {
case <-ctx.Done():
return fmt.Errorf("context: %w", ctx.Err())
default:
}

if err := job.Run(ctx); err != nil {
return fmt.Errorf("job %v has failed: %w", job.ID(), err)
}
return nil
})
}

if err := erg.Wait(); err != nil {
return Result{Error: err}
}

return Result{Success: true}
}

echo.go


This is your custom job. As long as your job implements the JobManager interface, it is fine.


package echo

import (
"context"
"fmt"
"log"
)

type Echo struct {
id int
message string
}

func New(id int, message string) *Echo {
return &Echo{
id: id,
message: message,
}
}

func (e *Echo) ID() interface{} {
return e.id
}

func (e *Echo) Run(ctx context.Context) error {
// Sending it ...

// Error simulation on purpose!
if e.id == 10 {
return fmt.Errorf("failed to echo message %d", e.id)
}

log.Printf(e.message)
//time.Sleep(time.Second)

return nil
}

main.go


package main

import (
"context"
"fmt"
"log"

"github.com/you/client/internal/echo"
"github.com/you/client/internal/task"
)

func main() {
// To simulate context deadline.
//ctx, cancel := context.WithTimeout(context.Background(), time.Microsecond*800)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Add batch jobs in the task.
tsk := task.New("task-1")
for i := 1; i <= 30; i++ {
tsk.Add(echo.New(i, fmt.Sprintf("job %d says hello", i)))
}

// Process jobs in the batch.
log.Printf("task %s started", tsk.ID())
res := tsk.Start(ctx)
if !res.Success {
log.Fatalf("task %s failed: %s", tsk.ID(), res.Error.Error())
}
log.Printf("task %s succeeded", tsk.ID())
}

Test


Success


$ go run -race main.go
2021/08/16 14:39:07 task task-1 started
2021/08/16 14:39:07 job 1 says hello
2021/08/16 14:39:07 job 2 says hello
2021/08/16 14:39:07 job 3 says hello
2021/08/16 14:39:07 job 4 says hello
2021/08/16 14:39:07 job 5 says hello
2021/08/16 14:39:07 job 10 says hello
2021/08/16 14:39:07 job 11 says hello
2021/08/16 14:39:07 job 12 says hello
2021/08/16 14:39:07 job 13 says hello
2021/08/16 14:39:07 job 6 says hello
2021/08/16 14:39:07 job 7 says hello
2021/08/16 14:39:07 job 8 says hello
2021/08/16 14:39:07 job 9 says hello
2021/08/16 14:39:07 job 20 says hello
2021/08/16 14:39:07 job 14 says hello
2021/08/16 14:39:07 job 16 says hello
2021/08/16 14:39:07 job 15 says hello
2021/08/16 14:39:07 job 23 says hello
2021/08/16 14:39:07 job 19 says hello
2021/08/16 14:39:07 job 22 says hello
2021/08/16 14:39:07 job 30 says hello
2021/08/16 14:39:07 job 21 says hello
2021/08/16 14:39:07 job 25 says hello
2021/08/16 14:39:07 job 29 says hello
2021/08/16 14:39:07 job 26 says hello
2021/08/16 14:39:07 job 24 says hello
2021/08/16 14:39:07 job 18 says hello
2021/08/16 14:39:07 job 28 says hello
2021/08/16 14:39:07 job 27 says hello
2021/08/16 14:39:07 job 17 says hello
2021/08/16 14:39:07 task task-1 succeeded

Context timeout


$ go run -race main.go
2021/08/15 20:27:30 task task-1 started
2021/08/15 20:27:30 job 30 says hello
2021/08/15 20:27:30 job 1 says hello
2021/08/15 20:27:30 job 2 says hello
2021/08/15 20:27:30 task task-1 failed: context: context deadline exceeded
exit status 1

Job error


$ go run -race main.go
2021/08/15 19:38:17 task task-1 started
2021/08/15 19:38:17 job 1 says hello
2021/08/15 19:38:17 job 2 says hello
2021/08/15 19:38:17 job 3 says hello
2021/08/15 19:38:17 job 4 says hello
2021/08/15 19:38:17 job 5 says hello
2021/08/15 19:38:17 job 6 says hello
2021/08/15 19:38:17 job 7 says hello
2021/08/15 19:38:17 job 8 says hello
2021/08/15 19:38:17 job 9 says hello
2021/08/15 19:38:17 job 14 says hello
2021/08/15 19:38:17 job 11 says hello
2021/08/15 19:38:17 job 12 says hello
2021/08/15 19:38:17 job 17 says hello
2021/08/15 19:38:17 task task-1 failed: job 10 has failed: failed to echo message 10