In this example we are going to have two types of event-driven programming examples with Golang. They are very similar to each other but the main difference between both is the channel usage. The first example doesn't use channels. You are never meant to touch the internal/pkg/event folder so you can consider it as an external library package. The user folder is your code.


This is an opinionated yet very straight forward "even driven" example. Events are dispatched in a certain way however the way they are handled is up to you. Dispatching events can be handled in a blocking or non-blocking manner (as goroutine). It has three simple steps as listed below.


  1. Create an instance of the event dispatcher. (done at application bootstrap)

  2. Register events and listeners. (done at application bootstrap)

  3. Dispatch events.

Properties



Example 1 - without channels


Structure


├── cmd
│   └── client
│   └── main.go
└── internal
├── pkg
│   └── event
│   ├── dispatcher.go
│   ├── event.go
│   └── listener.go
└── user
├── created.go
├── deleted.go
├── listener.go
└── updated.go

Files


cmd/client/main.go

package main

import (
"context"
"log"
"time"

"github.com/inanzzz/client/internal/pkg/event"
"github.com/inanzzz/client/internal/user"
)

func main() {
// ----------------------------------------------------------------------------------------
// Register events with listeners at application boot
dispatcher := event.NewDispatcher()
if err := dispatcher.Register(user.Listener{}, user.Created, user.Updated); err != nil {
log.Fatalln(err)
}

// ----------------------------------------------------------------------------------------
// Dispatch registered events. Valid.
go func() {
err := dispatcher.Dispatch(context.Background(), user.Created, user.CreatedEvent{
Time: time.Now().UTC(),
ID: "111",
})
if err != nil {
log.Println(err)
}
}()

go func() {
err := dispatcher.Dispatch(context.Background(), user.Updated, user.UpdatedEvent{
Time: time.Now().UTC(),
ID: "222",
Key: "name",
OldValue: "inanzzz",
NewValue: "zzznani",
})
if err != nil {
log.Println(err)
}
}()

// ----------------------------------------------------------------------------------------
// Dispatch a valid event type to unregistered event name. Error.
go func() {
err := dispatcher.Dispatch(context.Background(), user.Deleted, user.DeletedEvent{
Time: time.Now().UTC(),
ID: "333",
Who: "admin",
})
if err != nil {
log.Println(err)
}
}()

// ----------------------------------------------------------------------------------------
// Dispatch a wrong event type to registered event name. Error.
go dispatcher.Dispatch(context.Background(), user.Created, nil)
go dispatcher.Dispatch(context.Background(), user.Updated, "hi")
go dispatcher.Dispatch(context.Background(), user.Created, 123)
go dispatcher.Dispatch(context.Background(), user.Updated, struct{}{})
go dispatcher.Dispatch(context.Background(), user.Created, make(chan int))

select {}
}

internal/pkg/event/dispatcher.go

package event

import (
"context"
"fmt"
)

type Dispatcher struct {
events map[Name]Listener
}

func NewDispatcher() *Dispatcher {
return &Dispatcher{
events: make(map[Name]Listener),
}
}

func (d *Dispatcher) Register(listener Listener, names ...Name) error {
for _, name := range names {
if _, ok := d.events[name]; ok {
return fmt.Errorf("the '%s' event is already registered", name)
}

d.events[name] = listener
}

return nil
}

func (d *Dispatcher) Dispatch(ctx context.Context, name Name, event interface{}) error {
if _, ok := d.events[name]; !ok {
return fmt.Errorf("the '%s' event is not registered", name)
}

d.events[name].Listen(ctx, event)

return nil
}

internal/pkg/event/event.go

package event

import (
"context"
)

// All custom events names must be of this type.
type Name string

// All custom event types must satisfy this interface.
type Event interface {
Handle(ctx context.Context)
}

internal/pkg/event/listener.go

package event

import (
"context"
)

// All custom event listeners must satisfy this interface.
type Listener interface {
Listen(ctx context.Context, event interface{})
}

internal/user/created.go

package user

import (
"context"
"log"
"time"

"github.com/inanzzz/client/internal/pkg/event"
)

const Created event.Name = "user.created"

type CreatedEvent struct {
Time time.Time
ID string
}

func (e CreatedEvent) Handle(ctx context.Context) {
log.Printf("creating: %+v\n", e)
}

internal/user/deleted.go

package user

import (
"context"
"log"
"time"

"github.com/inanzzz/client/internal/pkg/event"
)

const Deleted event.Name = "user.deleted"

type DeletedEvent struct {
Time time.Time
ID string
Who string
}

func (e DeletedEvent) Handle(ctx context.Context) {
log.Printf("deleting: %+v\n", e)
}

internal/user/updated.go

package user

import (
"context"
"log"
"time"

"github.com/inanzzz/client/internal/pkg/event"
)

const Updated event.Name = "user.updated"

type UpdatedEvent struct {
Time time.Time
ID string
Key string
OldValue string
NewValue string
}

func (e UpdatedEvent) Handle(ctx context.Context) {
log.Printf("updating: %+v\n", e)
}

internal/user/listener.go

package user

import (
"context"
"log"
)

type Listener struct{}

func (u Listener) Listen(ctx context.Context, event interface{}) {
switch event := event.(type) {
case CreatedEvent:
event.Handle(ctx)
case UpdatedEvent:
event.Handle(ctx)
case DeletedEvent:
event.Handle(ctx)
default:
log.Printf("registered an invalid user event: %T\n", event)
}
}

Test


2020/05/27 21:08:53 creating: {Time:2020-05-27 20:08:53.272564 +0000 UTC ID:111}
2020/05/27 21:08:53 registered an invalid user event:
2020/05/27 21:08:53 the 'user.deleted' event is not registered
2020/05/27 21:08:53 registered an invalid user event: string
2020/05/27 21:08:53 updating: {Time:2020-05-27 20:08:53.273203 +0000 UTC ID:222 Key:name OldValue:inanzzz NewValue:zzznani}
2020/05/27 21:08:53 registered an invalid user event: int
2020/05/27 21:08:53 registered an invalid user event: struct {}
2020/05/27 21:08:53 registered an invalid user event: chan int

Example 2 - with channels (preferred)


If you wish to implement channel priority option, you can look into the previous blog post I wrote. Note: Check the third example (late addition) below.


Structure


├── cmd
│   └── client
│   └── main.go
└── internal
├── pkg
│   └── event
│   ├── dispatcher.go
│   ├── event.go
│   ├── job.go
│   └── listener.go
└── user
├── created.go
├── deleted.go
├── listener.go
└── updated.go

Files


cmd/client/main.go

package main

import (
"log"
"time"

"github.com/inanzzz/client/internal/pkg/event"
"github.com/inanzzz/client/internal/user"
)

func main() {
// ----------------------------------------------------------------------------------------
// Register events with listeners at application boot
dispatcher := event.NewDispatcher()
if err := dispatcher.Register(user.Listener{}, user.Created, user.Updated); err != nil {
log.Fatalln(err)
}

// ----------------------------------------------------------------------------------------
// Dispatch registered events. Valid.
go func() {
err := dispatcher.Dispatch(user.Created, user.CreatedEvent{
Time: time.Now().UTC(),
ID: "111",
})
if err != nil {
log.Println(err)
}
}()

go func() {
err := dispatcher.Dispatch(user.Updated, user.UpdatedEvent{
Time: time.Now().UTC(),
ID: "222",
Key: "name",
OldValue: "inanzzz",
NewValue: "zzznani",
})
if err != nil {
log.Println(err)
}
}()

// ----------------------------------------------------------------------------------------
// Dispatch a valid event type to unregistered event name. Error.
go func() {
err := dispatcher.Dispatch(user.Deleted, user.DeletedEvent{
Time: time.Now().UTC(),
ID: "333",
Who: "admin",
})
if err != nil {
log.Println(err)
}
}()

// ----------------------------------------------------------------------------------------
// Dispatch a wrong event type to registered event name. Error.
go dispatcher.Dispatch(user.Created, nil)
go dispatcher.Dispatch(user.Updated, "hi")
go dispatcher.Dispatch(user.Created, 123)
go dispatcher.Dispatch(user.Updated, struct{}{})
go dispatcher.Dispatch(user.Created, make(chan int))

select {}
}

internal/pkg/event/dispatcher.go

package event

import (
"fmt"
)

type Dispatcher struct {
jobs chan job
events map[Name]Listener
}

func NewDispatcher() *Dispatcher {
d := &Dispatcher{
jobs: make(chan job),
events: make(map[Name]Listener),
}

go d.consume()

return d
}

func (d *Dispatcher) Register(listener Listener, names ...Name) error {
for _, name := range names {
if _, ok := d.events[name]; ok {
return fmt.Errorf("the '%s' event is already registered", name)
}

d.events[name] = listener
}

return nil
}

func (d *Dispatcher) Dispatch(name Name, event interface{}) error {
if _, ok := d.events[name]; !ok {
return fmt.Errorf("the '%s' event is not registered", name)
}

d.jobs <- job{eventName: name, eventType: event}

return nil
}

func (d *Dispatcher) consume() {
for job := range d.jobs {
d.events[job.eventName].Listen(job.eventType)
}
}

internal/pkg/event/event.go

package event

// All custom events names must be of this type.
type Name string

// All custom event types must satisfy this interface.
type Event interface {
Handle()
}

internal/pkg/event/job.go

package event

// job represents events. When a new event is dispatched, it
// gets tuned into a job and put into `Dispatcher.jobs` channel.
type job struct {
eventName Name
eventType interface{}
}

internal/pkg/event/listener.go

package event

// All custom event listeners must satisfy this interface.
type Listener interface {
Listen(event interface{})
}

internal/user/created.go

package user

import (
"log"
"time"

"github.com/inanzzz/client/internal/pkg/event"
)

const Created event.Name = "user.created"

type CreatedEvent struct {
Time time.Time
ID string
}

func (e CreatedEvent) Handle() {
log.Printf("creating: %+v\n", e)
}

internal/user/deleted.go

package user

import (
"log"
"time"

"github.com/inanzzz/client/internal/pkg/event"
)

const Deleted event.Name = "user.deleted"

type DeletedEvent struct {
Time time.Time
ID string
Who string
}

func (e DeletedEvent) Handle() {
log.Printf("deleting: %+v\n", e)
}

internal/user/updated.go

package user

import (
"log"
"time"

"github.com/inanzzz/client/internal/pkg/event"
)

const Updated event.Name = "user.updated"

type UpdatedEvent struct {
Time time.Time
ID string
Key string
OldValue string
NewValue string
}

func (e UpdatedEvent) Handle() {
log.Printf("updating: %+v\n", e)
}

internal/user/listener.go

package user

import (
"log"
)

type Listener struct{}

func (u Listener) Listen(event interface{}) {
switch event := event.(type) {
case CreatedEvent:
event.Handle()
case UpdatedEvent:
event.Handle()
case DeletedEvent:
event.Handle()
default:
log.Printf("registered an invalid user event: %T\n", event)
}
}

Test


2020/05/27 21:12:01 creating: {Time:2020-05-27 20:12:01.556947 +0000 UTC ID:111}
2020/05/27 21:12:01 the 'user.deleted' event is not registered
2020/05/27 21:12:01 updating: {Time:2020-05-27 20:12:01.557059 +0000 UTC ID:222 Key:name OldValue:inanzzz NewValue:zzznani}
2020/05/27 21:12:01 registered an invalid user event:
2020/05/27 21:12:01 registered an invalid user event: string
2020/05/27 21:12:01 registered an invalid user event: int
2020/05/27 21:12:01 registered an invalid user event: struct {}
2020/05/27 21:12:01 registered an invalid user event: chan int

Example 3 - without channels


Just to let you know, this is a late addition to this post. It is simplified version compared to other two above. You don't have to implement your listener. All you have to do is, create your event and register as usual. That's all. If you added the channel feature to this example, you could consider it to be the preferred version! Either way, this is my preferred choice.


Structure


├── cmd
│   └── client
│   └── main.go
├── go.mod
└── internal
├── pkg
│   └── event
│   ├── dispatcher.go
│   └── event.go
└── user
├── create.go
├── delete.go
└── update.go

Files


cmd/client/main.go

package main

import (
"context"
"log"
"time"

"github.com/inanzzz/client/internal/pkg/event"
"github.com/inanzzz/client/internal/user"
)

func main() {
// ----------------------------------------------------------------------------------------
// Register events with dispatcher at application boot
dispatcher := event.NewDispatcher()
dispatcher.Register(user.AfterCreate{}, user.BeforeUpdate{}, user.AfterUpdate{})

// ----------------------------------------------------------------------------------------
// Dispatch registered events. Valid.
go func() {
err := dispatcher.Dispatch(context.Background(), user.AfterCreate{
Time: time.Now().UTC(),
ID: "111",
})
if err != nil {
log.Println(err)
}
}()

go func() {
err := dispatcher.Dispatch(context.Background(), user.BeforeUpdate{
Time: time.Now().UTC(),
ID: "222",
Key: "name",
OldValue: "inanzzz",
})
if err != nil {
log.Println(err)
}
}()

go func() {
err := dispatcher.Dispatch(context.Background(), user.AfterUpdate{
Time: time.Now().UTC(),
ID: "222",
Key: "name",
NewValue: "zzznani",
})
if err != nil {
log.Println(err)
}
}()

// ----------------------------------------------------------------------------------------
// Dispatch unregistered event. Error.
go func() {
err := dispatcher.Dispatch(context.Background(), user.AfterDelete{
Time: time.Now().UTC(),
ID: "333",
Who: "admin",
})
if err != nil {
log.Println(err)
}
}()

select {}
}

internal/pkg/event/dispatcher.go

package event

import (
"context"
"fmt"
"reflect"
)

type Dispatcher struct {
events []string
}

func NewDispatcher() *Dispatcher {
return &Dispatcher{}
}

func (d *Dispatcher) Register(events ...Event) {
for _, v := range events {
d.events = append(d.events, reflect.TypeOf(v).String())
}
}

func (d *Dispatcher) Dispatch(ctx context.Context, event Event) error {
name := reflect.TypeOf(event).String()

for _, v := range d.events {
if v == name {
return event.Handle(ctx)
}
}

return fmt.Errorf("%s is not a registered event", name)
}

internal/pkg/event/event.go

package event

import (
"context"
)

// All custom events must satisfy this interface.
type Event interface {
Handle(ctx context.Context) error
}

internal/user/create.go

package user

import (
"context"
"log"
"time"
)

type AfterCreate struct {
Time time.Time
ID string
}

func (event AfterCreate) Handle(ctx context.Context) error {
log.Printf("after create: %+v\n", event)
return nil
}

internal/user/delete.go

package user

import (
"context"
"log"
"time"
)

type AfterDelete struct {
Time time.Time
ID string
Who string
}

func (event AfterDelete) Handle(ctx context.Context) error {
log.Printf("after delete: %+v\n", event)
return nil
}

internal/user/update.go

package user

import (
"context"
"log"
"time"
)

type BeforeUpdate struct {
Time time.Time
ID string
Key string
OldValue string
}

func (event BeforeUpdate) Handle(ctx context.Context) error {
log.Printf("before update: %+v\n", event)
return nil
}

type AfterUpdate struct {
Time time.Time
ID string
Key string
NewValue string
}

func (event AfterUpdate) Handle(ctx context.Context) error {
log.Printf("after update: %+v\n", event)
return nil
}

Test


2020/07/08 15:40:10 after create: {Time:2020-07-08 14:40:10.024471 +0000 UTC ID:111}
2020/07/08 15:40:10 before update: {Time:2020-07-08 14:40:10.024654 +0000 UTC ID:222 Key:name OldValue:inanzzz}
2020/07/08 15:40:10 after update: {Time:2020-07-08 14:40:10.024816 +0000 UTC ID:222 Key:name NewValue:zzznani}
2020/07/08 15:40:10 the 'user.AfterDelete' event is not registered