Bu örnekte, başarısız istekleri yeniden deneyebilen ve denemeler arasındaki süreyi yönetmek için değişken bekleme (backoff) süreleri kullanabilen bir HTTP istemcisi oluşturacağız. İstemci ayrıca bekleme sürelerine rastgelelik katmak için jitter (rastgele sapma) değerleri kullanır. Tüm bu seçenekler yapılandırılabilir ve istemci, kodunuzda kullandığınız standart HTTP istemcisinin yerine doğrudan kullanılabilir; böylece kodun yeniden düzenlenmesine gerek kalmaz. Bu örneği daha da geliştirmek isterseniz Circuit breaker özelliğini ekleyebilirsiniz; ancak örnek bu haliyle de kullanılabilir.


Örnekte, gerçek akışı göstermek için bir istemci uygulaması ve bir sunucu kullanılmaktadır. Referans olması için günlük (log) çıktıları da basılır. Bu çıktılar bekleme (backoff) ve jitter sürelerini, ayrıca ardışık denemeler arasında beklenen toplam süreyi içerir.


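İsteğe bağlı olarak yapılandırabilecekleriniz:

- Bekleme (backoff) süreleri: `WithFixedBackoffs` veya `WithIncrementalBackoffs`
- Jitter süreleri: `WithFixedJitters` veya `WithRandomJitters`
- Yeniden denenecek HTTP durum kodları: `WithRetryableStatusCodes`
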
Bunları açıkça ayarlamazsanız istemci varsayılan ayarlara geri döner.


Client


package client

import (
"fmt"
"io"
"log"
"math/rand"
"net/http"
"time"
)

// Option configures the retrying transport. New applies every supplied Option,
// in the order given, to a fresh transport before attaching it to the client.
type Option func(*transport)

// WithFixedBackoffs sets the wait durations used between attempts. The number
// of durations is also the maximum number of retries: the n-th duration is
// waited (plus a jitter) before the n-th retry.
// Example:
//
//	WithFixedBackoffs(time.Second*1, time.Second*2, time.Second*3, time.Second*4)
//
// This equals to `4, 1, time.Second` with the `WithIncrementalBackoffs` option.
//
// The durations are copied when the option is applied, so the caller may
// reuse or modify its own slice afterwards without changing the schedule.
func WithFixedBackoffs(durations ...time.Duration) Option {
	return func(tr *transport) {
		// Detach from the caller's backing array: a variadic call made with
		// `slice...` passes the very same array, and a later append on
		// tr.backoffs could otherwise write into it.
		tr.backoffs = append([]time.Duration(nil), durations...)
	}
}

// WithIncrementalBackoffs generates `limit` wait durations that grow linearly:
// the n-th one (counting from 1) is `duration * interval * n`. The generated
// durations are appended to any backoffs that are already configured.
// Example:
//
//	WithIncrementalBackoffs(3, 2, time.Second)
//
// This equals to `time.Second * 2, time.Second * 4, time.Second * 6` with the
// `WithFixedBackoffs` option.
//
// A zero or negative limit adds nothing.
func WithIncrementalBackoffs(limit int, interval int, duration time.Duration) Option {
	return func(tr *transport) {
		// Without this guard a negative limit would never reach the stop
		// condition and the slice would grow until memory runs out.
		if limit <= 0 {
			return
		}

		// Build into a fresh slice so that appending never writes into a
		// backing array shared with whoever supplied the existing backoffs.
		steps := make([]time.Duration, 0, len(tr.backoffs)+limit)
		steps = append(steps, tr.backoffs...)

		for n := 1; n <= limit; n++ {
			steps = append(steps, duration*time.Duration(n*interval))
		}

		tr.backoffs = steps
	}
}

// WithFixedJitters sets the pool of jitter durations. Before every retry one
// entry is picked at random from the pool and added to that retry's backoff,
// which adds randomness to the wait.
// Example:
//
//	WithFixedJitters(time.Millisecond*100, time.Millisecond*200, time.Millisecond*300)
//
// NOTE(review): the earlier comment equated this with `3, 100, time.Millisecond`
// passed to a `WithIncrementalJitters` option; no such option is visible in
// this file, so confirm it exists before relying on that equivalence.
//
// The durations are copied when the option is applied, so the caller may
// reuse or modify its own slice afterwards without changing the pool.
func WithFixedJitters(durations ...time.Duration) Option {
	return func(tr *transport) {
		// Detach from the caller's backing array (see the note above).
		tr.jitters = append([]time.Duration(nil), durations...)
	}
}

// WithRandomJitters replaces the jitter pool with `limit` randomly chosen
// durations. Each one is `duration * k`, where k is a random integer in the
// half-open range [min, max).
// Example:
//
//	WithRandomJitters(100, 900, 10, time.Millisecond)
//
// This will pick 10 numbers between `100` and `900` and use them as jitters.
//
// If min and max are given in the wrong order they are swapped, and if they
// are equal every jitter is exactly `duration * min`. A zero or negative limit
// leaves the pool empty, in which case New falls back to a zero jitter.
func WithRandomJitters(min int, max int, limit int, duration time.Duration) Option {
	return func(tr *transport) {
		tr.jitters = nil
		if limit <= 0 {
			return
		}

		lo, hi := min, max
		if hi < lo {
			lo, hi = hi, lo
		}

		// rand.Intn panics for n <= 0, so only draw when the range is not
		// empty (this also covers an overflowing hi-lo).
		span := hi - lo

		pool := make([]time.Duration, 0, limit)
		for i := 0; i < limit; i++ {
			jit := lo
			if span > 0 {
				jit += rand.Intn(span)
			}
			pool = append(pool, duration*time.Duration(jit))
		}

		tr.jitters = pool
	}
}

// WithRetryableStatusCodes replaces the set of HTTP status codes that trigger
// a retry with the given codes. When no codes are configured New falls back to
// the common ones: `408`, `425`, `429`, `500`, `502`, `503` and `504`.
func WithRetryableStatusCodes(codes ...int) Option {
	return func(tr *transport) {
		// Build the whole set first, then swap it in with one assignment.
		set := map[int]struct{}{}
		for i := range codes {
			set[codes[i]] = struct{}{}
		}

		tr.statuses = set
	}
}

// New wraps the transport attached to client with the retry logic configured
// by options and returns the same client, which can then be used as a drop-in
// replacement.
//
// The client is modified in place: its Transport is replaced with the retrying
// transport, which forwards every attempt to the transport the client had
// before. If the client had no transport, http.DefaultTransport is used. If the
// client was already wrapped by New, the previous retry layer is replaced
// rather than stacked, so retries do not multiply.
//
// Defaults: no backoffs (hence no retries), a single zero jitter, and the
// retryable status codes 408, 425, 429, 500, 502, 503 and 504.
//
// It returns an error if client is nil.
func New(client *http.Client, options ...Option) (*http.Client, error) {
	if client == nil {
		return nil, fmt.Errorf("new retry client: nil http client")
	}

	tr := &transport{}

	for _, option := range options {
		if option == nil {
			continue
		}
		option(tr)
	}

	if len(tr.jitters) == 0 {
		tr.jitters = []time.Duration{0}
	}

	if len(tr.statuses) == 0 {
		tr.statuses = map[int]struct{}{
			http.StatusRequestTimeout:      {},
			http.StatusTooEarly:            {},
			http.StatusTooManyRequests:     {},
			http.StatusInternalServerError: {},
			http.StatusBadGateway:          {},
			http.StatusServiceUnavailable:  {},
			http.StatusGatewayTimeout:      {},
		}
	}

	if tr.client == nil {
		// Keep the caller's own transport (proxy, TLS and pool settings)
		// instead of silently swapping it for the default one.
		base := client.Transport
		if prev, ok := base.(*transport); ok {
			// Already wrapped by an earlier call: reuse what it wrapped.
			base = prev.client
		}
		if base == nil {
			base = http.DefaultTransport
		}
		tr.client = base
	}

	client.Transport = tr

	return client, nil
}

// transport is the retrying http.RoundTripper that New installs on the client.
type transport struct {
// client is the underlying round tripper that performs each individual
// attempt.
client http.RoundTripper

// backoffs holds the wait before each retry. Its length is the maximum number
// of retries, so when it is empty a request is attempted once and never
// retried.
backoffs []time.Duration

// jitters is a pool of extra durations; one entry is picked at random and
// added to the backoff before every retry. New falls back to a single 0
// duration when none are configured. It has no effect when `backoffs` is
// empty.
jitters []time.Duration

// statuses is the set of HTTP status codes that trigger a retry. New falls
// back to a default set when none are configured. It has no effect when
// `backoffs` is empty.
statuses map[int]struct{}
}

// RoundTrip sends req through the underlying round tripper and retries it
// while the attempt fails with an error or with one of the retryable status
// codes, waiting backoff plus a random jitter between attempts. At most
// len(backoffs) retries are made after the first attempt.
//
// The response or error of the last attempt is returned as is; the bodies of
// all earlier responses are drained and closed. If the request context is
// cancelled while waiting, the context's error is returned.
//
// A request that carries a body is only retried when req.GetBody is set,
// because the body has been consumed by the first attempt and must be
// recreated. Each retry is then sent as a clone of req with a fresh body, so
// req itself is never modified.
//
// Progress is written to the standard logger for demonstration purposes.
func (t *transport) RoundTrip(req *http.Request) (*http.Response, error) {
	log.Println("|||||||||||||||||||")
	log.Println("> req", req.Header.Get("id"))

	hasBody := req.Body != nil && req.Body != http.NoBody
	replayable := !hasBody || req.GetBody != nil

	var (
		res *http.Response
		err error
	)

	for i := 0; ; i++ {
		attempt := req
		if i > 0 && hasBody {
			// The previous attempt consumed and closed the body; obtain a
			// fresh copy and send it on a clone of the request.
			body, bodyErr := req.GetBody()
			if bodyErr != nil {
				return nil, fmt.Errorf("rewind body: %w", bodyErr)
			}

			attempt = req.Clone(req.Context())
			attempt.Body = body
		}

		// If there was no error in response and status code was not defined
		// as retriable, break immediately to return response.
		res, err = t.client.RoundTrip(attempt)
		if err == nil {
			if _, retry := t.statuses[res.StatusCode]; !retry {
				break
			}
		}

		// If the single request plus all configured backoffs are used up, or
		// the body cannot be sent again, return what we have.
		if i == len(t.backoffs) || !replayable {
			break
		}

		// Free up the memory to prepare for the next try. In case of an
		// error, halt operation.
		log.Println("> clr", i, req.Header.Get("id"))
		log.Println("-------------------")
		if err := t.discard(res); err != nil {
			return nil, err
		}

		// Before retrying a request, check if the context was cancelled. If
		// so, halt operation, otherwise wait for the next attempt.
		timer := t.wait(i)
		select {
		case <-req.Context().Done():
			// Release the timer right away instead of letting it run out.
			timer.Stop()
			return nil, req.Context().Err()
		case <-timer.C:
		}

		log.Println("> try", i+1, req.Header.Get("id"))
	}

	log.Println("-------------------")

	return res, err
}

// discard drains and closes the body of res so that its resources are
// released before the next attempt. A nil response (or one without a body) is
// ignored.
//
// The body is closed even when draining fails, so a broken response never
// leaks its connection. The drain error takes precedence over the close error.
func (t *transport) discard(res *http.Response) error {
	if res == nil || res.Body == nil {
		return nil
	}

	_, drainErr := io.Copy(io.Discard, res.Body)
	closeErr := res.Body.Close()

	if drainErr != nil {
		return fmt.Errorf("discard body: %w", drainErr)
	}

	if closeErr != nil {
		return fmt.Errorf("close body: %w", closeErr)
	}

	return nil
}

// wait calculates the sleep time before the next attempt and returns a timer
// that fires once it has elapsed. The sleep time is the i-th backoff plus one
// jitter picked at random from the configured pool; all three values are
// logged.
func (t *transport) wait(i int) *time.Timer {
	backoff := t.backoffs[i]
	jitter := t.jitters[rand.Intn(len(t.jitters))]
	total := backoff + jitter

	log.Println("backof (sec):", backoff.Seconds())
	log.Println("jitter (mil):", jitter.Milliseconds())
	log.Println("waiter (sec):", total.Seconds())

	return time.NewTimer(total)
}

package main

import (
"context"
"fmt"
"io"
"log"
"net"
"net/http"
"time"

"clientserver/client/client"
)

// baseClient is the plain HTTP client that main hands to client.New to be
// wrapped with retry logic. According to the original author's note, its
// Transport settings originate from http.DefaultTransport.
var baseClient = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
// Time limit for requests made through this client.
Timeout: 30 * time.Second,
}

// -----------------------------------------------------------------------------

// main wraps baseClient with retry logic (four incremental backoffs of 1s to
// 4s and ten random jitters between 100ms and 900ms) and serves the demo
// handler on port 5000.
func main() {
	fmt.Println("client")

	// The result is named `retrying` so that it does not shadow the imported
	// `client` package.
	retrying, err := client.New(
		baseClient,
		client.WithIncrementalBackoffs(4, 1, time.Second),
		client.WithRandomJitters(100, 900, 10, time.Millisecond),
	)
	if err != nil {
		log.Fatalln(err)
	}

	h := handler{
		client:  retrying,
		tripper: retrying.Transport,
	}

	mux := http.NewServeMux()
	mux.HandleFunc("/", h.home)

	serveErr := http.ListenAndServe(":5000", mux)
	log.Fatalln(serveErr)
}

// -----------------------------------------------------------------------------

// handler holds what the HTTP handler of this program needs to call the
// upstream server.
type handler struct {
// client is the HTTP client returned by client.New.
client *http.Client
// tripper is that client's Transport; home calls it directly.
tripper http.RoundTripper
}

// home forwards every incoming request as a POST to the upstream server at
// http://0.0.0.0:5001/ through the retrying transport, copying the `id`
// header, and logs the status code and body of the final response.
//
// On success nothing is written to w, so the caller receives an empty 200
// response. If the upstream call fails, the caller receives a 500 with the
// error text; if reading the upstream body fails, a 500 without body.
func (h handler) home(w http.ResponseWriter, r *http.Request) {
	// NOTE(review): the upstream call uses a background context, so it is not
	// cancelled when the incoming request is aborted; consider r.Context().
	req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, "http://0.0.0.0:5001/", nil)
	if err != nil {
		// The status must be set before the body: once Write is called the
		// header is sent with 200 and a later WriteHeader is ignored.
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(err.Error()))
		return
	}

	req.Header.Add("id", r.Header.Get("id"))

	// The transport is called directly instead of going through the client.
	// res, err := h.client.Do(req)
	res, err := h.tripper.RoundTrip(req)
	if err != nil {
		log.Println(err)
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(err.Error()))
		return
	}
	defer res.Body.Close()

	data, err := io.ReadAll(res.Body)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		log.Print("unable to read client response")
		return
	}

	log.Println("res.StatusCode", res.StatusCode)
	log.Println("res.Body", string(data))
}

Server


package main

import (
"fmt"
"log"
"math/rand"
"net/http"
)

// funcRandomStatusCode returns one status code picked uniformly at random
// from a fixed list of four non-retriable and seven retriable codes.
var funcRandomStatusCode = func() int {
	nonRetriable := []int{200, 201, 202, 204}
	retriable := []int{408, 425, 429, 500, 502, 503, 504}

	// Same order as before: non-retriable codes first, then retriable ones.
	pool := make([]int, 0, len(nonRetriable)+len(retriable))
	pool = append(pool, nonRetriable...)
	pool = append(pool, retriable...)

	pick := rand.Intn(len(pool))

	return pool[pick]
}

// main serves the random-status handler on port 5001 and exits with a fatal
// log entry once the server stops.
func main() {
	fmt.Println("server")

	mux := http.NewServeMux()
	mux.HandleFunc("/", home)

	serveErr := http.ListenAndServe(":5001", mux)
	log.Fatalln(serveErr)
}

// home answers every request with the status code returned by
// funcRandomStatusCode, logs that code, and writes it as the response body.
func home(w http.ResponseWriter, r *http.Request) {
	status := funcRandomStatusCode()
	log.Println(status)

	w.WriteHeader(status)
	fmt.Fprint(w, status)
}

Test


İstemci, ilk isteğin ardından en fazla 4 kez yeniden deneyecek şekilde yapılandırılmıştır. Son deneme de başarısız olursa onun yanıt gövdesi atılmaz ve çağırana döndürülür; önceki denemelerin yanıt gövdeleri ise atılır.


$ curl http://0.0.0.0:5000/

Başarılı yanıtlı ilk deneme

2024/01/23 17:43:46 |||||||||||||||||||
2024/01/23 17:43:46 > req
2024/01/23 17:43:46 -------------------
2024/01/23 17:43:46 res.StatusCode 204
2024/01/23 17:43:46 res.Body

Üçüncü denemede başarı

2024/01/23 17:44:56 |||||||||||||||||||
2024/01/23 17:44:56 > req
2024/01/23 17:44:56 > clr 0
2024/01/23 17:44:56 -------------------
2024/01/23 17:44:56 backof (sec): 1
2024/01/23 17:44:56 jitter (mil): 789
2024/01/23 17:44:56 waiter (sec): 1.7890000000000001
2024/01/23 17:44:58 > try 1
2024/01/23 17:44:58 > clr 1
2024/01/23 17:44:58 -------------------
2024/01/23 17:44:58 backof (sec): 2
2024/01/23 17:44:58 jitter (mil): 292
2024/01/23 17:44:58 waiter (sec): 2.292
2024/01/23 17:45:00 > try 2
2024/01/23 17:45:00 > clr 2
2024/01/23 17:45:00 -------------------
2024/01/23 17:45:00 backof (sec): 3
2024/01/23 17:45:00 jitter (mil): 671
2024/01/23 17:45:00 waiter (sec): 3.6710000000000003
2024/01/23 17:45:04 > try 3
2024/01/23 17:45:04 -------------------
2024/01/23 17:45:04 res.StatusCode 201
2024/01/23 17:45:04 res.Body

Son denemeden sonra başarısızlık

2024/01/23 17:47:05 |||||||||||||||||||
2024/01/23 17:47:05 > req
2024/01/23 17:47:05 > clr 0
2024/01/23 17:47:05 -------------------
2024/01/23 17:47:05 backof (sec): 1
2024/01/23 17:47:05 jitter (mil): 745
2024/01/23 17:47:05 waiter (sec): 1.745
2024/01/23 17:47:06 > try 1
2024/01/23 17:47:06 > clr 1
2024/01/23 17:47:06 -------------------
2024/01/23 17:47:06 backof (sec): 2
2024/01/23 17:47:06 jitter (mil): 459
2024/01/23 17:47:06 waiter (sec): 2.459
2024/01/23 17:47:09 > try 2
2024/01/23 17:47:09 > clr 2
2024/01/23 17:47:09 -------------------
2024/01/23 17:47:09 backof (sec): 3
2024/01/23 17:47:09 jitter (mil): 359
2024/01/23 17:47:09 waiter (sec): 3.359
2024/01/23 17:47:12 > try 3
2024/01/23 17:47:12 > clr 3
2024/01/23 17:47:12 -------------------
2024/01/23 17:47:12 backof (sec): 4
2024/01/23 17:47:12 jitter (mil): 459
2024/01/23 17:47:12 waiter (sec): 4.459
2024/01/23 17:47:17 > try 4
2024/01/23 17:47:17 -------------------
2024/01/23 17:47:17 res.StatusCode 429
2024/01/23 17:47:17 res.Body 429