Server-Sent Events (SSE) is a technology that uses a single long-lived HTTP/1.1 or HTTP/2 connection to send data (text/string) from a web server to the browser in "real-time". It is similar to WebSockets but follows a one-way model: the client (browser) initiates a connection to the server using the EventSource interface, and the server streams data back over this connection. If the connection breaks for any reason, EventSource automatically tries to reconnect. There is one particular limitation, which is related to the limit on concurrent connections (open tabs per browser). At the time of writing, the common limit for HTTP/1.1 is 6 and for HTTP/2 about 100 (configurable).


In this example, we are going to send requests to a write endpoint to populate the database. Behind the scenes, the read endpoint streams this data back to the browser for listing purposes.



Files


index.html


<!DOCTYPE html>
<html>
<head>
    <title>Audit events</title>
    <script>
        // Open a one-way event stream from the audit read endpoint. EventSource
        // reconnects on its own if the connection drops.
        const endpoint = new EventSource("http://localhost:8888/api/v1/audits");

        // Render every received audit event as its own paragraph in the list.
        endpoint.onmessage = function(audit) {
            const container = document.getElementById("records");
            const record = document.createElement("p");

            // Put the payload inside the paragraph instead of appending it as a
            // bare text node next to an empty <p>. textContent also keeps the
            // payload from ever being interpreted as markup.
            record.textContent = audit.data;
            container.append(record);
        };
    </script>
</head>
<body>
    <div id="records"></div>
</body>
</html>

main.go


package main

import (
"net/http"
"time"

"sse/api"
"sse/storage"
)

// main wires the in-memory audit storage into the HTTP handlers and serves
// the write (POST) and read (GET) audit endpoints on port 8888.
func main() {
	auditStorage := storage.NewAudit()
	auditHandler := api.NewAudit(auditStorage, time.Second)

	rtr := http.NewServeMux()
	rtr.HandleFunc("POST /api/v1/audits", auditHandler.Write)
	rtr.HandleFunc("GET /api/v1/audits", auditHandler.Read)

	// ListenAndServe always returns a non-nil error (for example when the
	// port is already in use). Surface it instead of exiting silently with a
	// zero status.
	if err := http.ListenAndServe(":8888", rtr); err != nil {
		panic(err)
	}
}

api/audit.go


package api

import (
"encoding/json"
"fmt"
"net/http"
"time"

"sse/model"
)

// storer is the storage behaviour the audit handlers depend on. It is declared
// here, next to its consumer, so any type with these two methods can be used.
type storer interface {
// Insert stores a single audit record.
Insert(audit *model.Audit)
// List returns audit records for the given since time; the storage
// implementation in this example returns those created at or after it.
List(since time.Time) []*model.Audit
}

// Audit groups the HTTP handlers for audit records with their dependencies.
type Audit struct {
// storage is where Write inserts records and where Read lists them from.
storage storer
// delay is how long Read waits between two consecutive storage polls.
delay time.Duration
}

// NewAudit builds an Audit handler set that uses str as its storage and delay
// as the pause between storage polls.
func NewAudit(str storer, delay time.Duration) Audit {
	var handler Audit

	handler.storage = str
	handler.delay = delay

	return handler
}

// Write handles requests that create an audit record. It decodes a JSON
// model.Request from the body and inserts a model.Audit carrying the request
// ID and the current time. A body that cannot be decoded is answered with
// 400 Bad Request; on success the default 200 status is sent.
func (a Audit) Write(w http.ResponseWriter, r *http.Request) {
	var req model.Request

	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		fmt.Println(err)
		// An undecodable body is the client's mistake, not a server fault.
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	a.storage.Insert(&model.Audit{
		ID:        req.ID,
		CreatedAt: time.Now(),
	})
}

// Read streams audit records to the client as Server-Sent Events. It polls the
// storage every a.delay for records listed since the previous poll and writes
// each one as a "data: <json>" event, flushing after every event. The handler
// returns when the request context is cancelled (for example when the client
// disconnects) or when writing to the client fails. If the ResponseWriter
// cannot flush, it answers 500 Internal Server Error.
func (a Audit) Read(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	w.Header().Set("Access-Control-Allow-Origin", "*") // In prod, only allow known origins!
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Content-Type", "text/event-stream")

	// Send the headers right away so the client sees the stream as open even
	// if no audit record shows up for a while.
	flusher.Flush()

	ctx := r.Context()
	since := time.Now()

	for {
		// NOTE(review): a record whose CreatedAt falls between this List call
		// and the time.Now() below is never delivered; closing that window
		// needs support from the storage layer - confirm whether it matters.
		audits := a.storage.List(since)
		since = time.Now()

		for _, audit := range audits {
			res, err := json.Marshal(model.Response{
				ID:        audit.ID,
				CreatedAt: audit.CreatedAt,
			})
			if err != nil {
				fmt.Println(err)
				continue
			}

			// A failed write means the client is gone; stop streaming.
			if _, err := fmt.Fprintf(w, "data: %s\n\n", res); err != nil {
				return
			}
			flusher.Flush()
		}

		// Wait for the next poll, but leave as soon as the request is
		// cancelled so the handler goroutine does not outlive the connection.
		select {
		case <-ctx.Done():
			return
		case <-time.After(a.delay):
		}
	}
}

model/api.go


package model

import "time"

// Request is the JSON payload decoded by the audit write endpoint.
type Request struct {
// ID identifies the audit record to create.
ID string `json:"id"`
}

// Response is the JSON payload of a single event sent by the audit read
// endpoint.
type Response struct {
// ID is the identifier of the audit record.
ID string `json:"id"`
// CreatedAt is the time recorded for the audit record.
CreatedAt time.Time `json:"created_at"`
}

model/storage.go


package model

import "time"

// Audit is the storage representation of an audit record.
type Audit struct {
// ID identifies the record.
ID string
// CreatedAt is the time recorded for the record; the storage in this example
// filters on it when listing.
CreatedAt time.Time
}

storage/audit.go


package storage

import (
"slices"
"sync"
"time"

"sse/model"
)

// Audit is an in-memory audit store built on a sync.Map.
type Audit struct {
// records holds the stored audits; Insert keys each *model.Audit by its ID.
records *sync.Map
}

// NewAudit returns an empty in-memory audit store that is ready for use.
func NewAudit() Audit {
	records := new(sync.Map)

	return Audit{records: records}
}

// Insert saves audit under its ID. A record already stored with the same ID
// is replaced.
func (a Audit) Insert(audit *model.Audit) {
	key := audit.ID

	a.records.Store(key, audit)
}

// List returns every stored audit whose CreatedAt is at or after since,
// ordered from oldest to newest. It returns a nil slice when nothing matches.
func (a Audit) List(since time.Time) []*model.Audit {
	var list []*model.Audit

	a.records.Range(func(_, v any) bool {
		// Skip anything that is not an audit instead of panicking mid-scan.
		audit, ok := v.(*model.Audit)
		if !ok {
			return true
		}

		// "Not before" is the same as "equal or after".
		if !audit.CreatedAt.Before(since) {
			list = append(list, audit)
		}

		return true
	})

	// Range visits entries in no particular order; sort so that callers get
	// the records in chronological order.
	slices.SortFunc(list, func(x, y *model.Audit) int {
		return x.CreatedAt.Compare(y.CreatedAt)
	})

	return list
}

Test


Open index.html in your browser(s) in as many tabs as you wish. Run the application with the go run -race main.go command and use the command below to send requests to the write endpoint.


$ for i in `seq 1 10`; do curl -i -X POST -d '{"id": "'$i'"}' http://localhost:8888/api/v1/audits ; done

If you look at the browser, you should see output like below.


{"id":"1","created_at":"2024-06-09T16:01:57.770701+01:00"}
{"id":"2","created_at":"2024-06-09T16:01:57.801793+01:00"}
{"id":"3","created_at":"2024-06-09T16:01:58.803549+01:00"}
{"id":"4","created_at":"2024-06-09T16:01:59.800784+01:00"}
{"id":"5","created_at":"2024-06-09T16:02:00.798023+01:00"}
{"id":"6","created_at":"2024-06-09T16:02:01.797871+01:00"}
{"id":"7","created_at":"2024-06-09T16:02:02.799884+01:00"}
{"id":"8","created_at":"2024-06-09T16:02:03.801665+01:00"}
{"id":"9","created_at":"2024-06-09T16:02:04.809286+01:00"}
{"id":"10","created_at":"2024-06-09T16:02:05.803576+01:00"}