Bu örnekte Golang içinde mesaj yayınlamak için bir RabbitMQ üreticisi oluşturacağız. Sadece mesaj yayınlayacağız, bu yüzden burada tüketicilerin katılımı yok. Devam etmeden önce bir şeyi netleştireyim. Hem Yayınla/Abone Ol (Publish/Subscribe) hem de Üretici/Tüketici (Producer/Consumer) terimleri mesajlaşma ile ilgilidir, ancak bunlar farklı şeylerdir. Publish/Subscribe, Producer/Consumer tarafından gerçekleştirilen bir mesajlaşma modelidir. Üretici mesajları yayınlar ve tüketici mesajlara abone olur. Her ikisi de RabbitMQ'ya ait değildir. Bunlar uygulamanızın bir parçasıdır. Yalnızca exchange, yönlendirme/bağlama anahtarları ve kuyruklar RabbitMQ'nun bir parçasıdır.


Bu örnek ne iş yapar?


Burada listelenen her nokta, aşağıdaki "Notlar" bölümünde ilgili bilgilere sahiptir.



Notlar


Bağlantı (veri yarışı)


Beklenmedik olsa da, RabbitMQ.reconnect() metodu (her zaman değil) yalnızca aşağıdaki senaryoların her ikisi de aynı anda oluştuğunda terminalde bir veri yarışı uyarısı verebilir.



Ancak, yalnızca ilk senaryo ile karşı karşıyaysanız, uygulamanızı gerçekten kötü bir şekilde programlamadığınız sürece, aslında bir uygulama düzeyi sorunundan çok, bir tür mimari/ağ/sunucu düzeyinde sorun yaşıyor olabilirsiniz. Bu örnekteki kurulum, birinci senaryoya neden olmamayacaktır. RabbitMQ UI ile bağlantıyı manuel olarak kapatsanız bile, uygulama sizin için yeni bir bağlantı kuracaktır.


Kanal hafıza)


Ben RabbitMQ.reconnect() metodu ile olası veri yarış sorunlarını önlemek için her istek/iş parçacığı başına izole bir kanal ayırıyorum. Ancak, bu Higher Channel Churn'a yol açar (bu sayfayı okumanızı şiddetle tavsiye ederim) ama en azından uygulamanın çökmesini önleriz, böylece bir şekilde bir şeyi seçmeniz gerekir. Ayrıca, iş parçacığı başına kanal açma ve kapatma ne nadirdir ne de korkutucudur. RabbitMQ belgelerinde "İşlem için birden çok iş parçacığı/işlem kullanan uygulamalar için, her iş parçacığı/işlem aralarında kanalları paylaşmak yerine, yeni bir kanal açmak çok daha doğaldır" diye geçiyor ki, bu uygulamada biz de tam bunu yapıyoruz. Bununla birlikte, gerçekten az miktarda bellek kullanır. Bu kategori, belirli bir zamanda tüm kanal işlemleri tarafından kullanılan bellek toplamıdır. Kanıt olarak, RabbitMQ sunucusunda rabbitmq-diagnostics memory_breakdown -q --unit mb komutunu çalıştırırsanız, aşağıdaki gibi bir çıktı görürsünüz. connection_channels RabbitMQ dökümantasyonunda "İstemci bağlantıları ne kadar çok kanal kullanırsa, bu kategori tarafından o kadar fazla bellek kullanılır" olarak açıklanmıştır. Bellek kullanımı hakkında daha fazla bilgi için lütfen Reasoning About Memory Use sayfasını okuyun.


allocated_unused: 31.6314 mb (23.3%)
code: 30.1752 mb (22.22%)
other_proc: 24.6923 mb (18.19%)
other_system: 23.9947 mb (17.67%)
plugins: 12.1899 mb (8.98%)
mgmt_db: 6.467 mb (4.76%)
other_ets: 2.9154 mb (2.15%)
atom: 1.5177 mb (1.12%)
binary: 1.324 mb (0.98%)
metrics: 0.3868 mb (0.28%)
connection_other: 0.1771 mb (0.13%)
mnesia: 0.0802 mb (0.06%)
queue_procs: 0.0552 mb (0.04%)
connection_readers: 0.0517 mb (0.04%)
quorum_ets: 0.0474 mb (0.03%)
msg_index: 0.0297 mb (0.02%)
connection_writers: 0.0246 mb (0.02%)
connection_channels: 0.0219 mb (0.02%)
queue_slave_procs: 0.0 mb (0.0%)
quorum_queue_procs: 0.0 mb (0.0%)
reserved_unallocated: 0.0 mb (0.0%)

Kuyruk (hafıza)


Belleğin aşırı yüklenmesini önlemek için Lazy Queues (Tembel) kullanıyoruz. Tembel kuyruklar iletileri mümkün olduğunca erken diske taşır ve yalnızca tüketiciler tarafından istendiğinde RAM'e yükler. Bu özelliğin birçok avantajı vardır, bu nedenle daha fazla bilgi için lütfen belgeleri okuyun.


Kaynak kullanımı (test)


1000 sürekli istek gönderdiğinizi varsayın. Bu yapılırken, her 5. istekte bağlantı kasıtlı olarak kapatılır. Bu durumda, uygulamanız en fazla 2 ek goroutin (geçici olarak) çalıştırır ve sistem kaynakları üzerinde çok hafif olarak 0,2 ila 2 MB bellek tüketir. Aynı anda ek olarak 1000 istek daha gönderirseniz, yalnızca bellek kullanımı 3MB'a kadar çıkar ve bu hala çok fazla bir şey değildir. Unutmayın, bu kullanım sürekli olmayacak, zaman zaman daha da az olacaktır. Ben 2000 devamlı istek ile zar zor 3MB'ı gördüm.


Yukarıdaki sonuç uygulama ile ilgilidir. Şimdi RabbitMQ'nun nasıl davrandığına bir göz atalım. 100 kullanıcı 10 saniye boyunca sürekli olarak eşzamanlı istek gönderiyor. RabbitMQ kanal kullanımı ortalama 1.5MB ve maksimum ~ 4MB idi. Yine de çok verimli. RabbitMQ sunucusunda rabbitmq-plugins enable rabbitmq_top komutunu kullanarak, bellek ve CPU kullanımlarını analiz etmeye yardımcı olan rabbitmq-top ekini etkinleştirmenizi tavsiye ederim. Bu, "Admin" sekmesinin altına "Top Processes" menü öğesini ekler.


Tekrar Bağlanma


RabbitMQ kullanıcı arayüzüne gidip bir ağ hatasını simüle etmek için bağlantıyı kapatırsanız, uygulamanızın yeniden bağlandığını göreceksiniz. Günlükler aşağıdaki gibi olmalıdır.


2020/05/05 20:48:35 CRITICAL: Connection dropped, reconnecting
2020/05/05 20:48:35 INFO: Recovered connected

Normal bir işlemi simüle etmek için uygulamanızda manuel olarak kapatırsanız, uygulamanızın yeniden bağlanmayacağını görürsünüz. Günlükler aşağıdaki gibi olmalıdır.


2020/05/05 20:46:34 INFO: Connection dropped normally, will not reconnect
2020/05/05 20:46:34 failed to create 5: failed to open channel: Exception (504) Reason: "channel/connection is not open"

Mesaj teslimi onay testleri


Bu bölüm size Channel.NotifyPublish ve Channel.NotifyReturn'un mesaj yayınlamayı nasıl ele aldığını gösterir. İletiyi RabbitMQ'da yayınladıktan sonra kullanıcıya doğru bir yanıt döndürmek istiyorsunuz. Dinleyici olmadan yayınlama özelliği, iletinin iletildiği anlamına gelmez. Bu nedenle, onayı beklememiz gerekiyor.


Onay kodu


...
select {
case ntf := <-channel.NotifyPublish(make(chan amqp.Confirmation, 1)):
if !ntf.Ack {
return errors.New("failed to deliver message to exchange/queue")
}
case <-channel.NotifyReturn(make(chan amqp.Return)):
return errors.New("failed to deliver message to exchange/queue")
case <-time.After(c.rabbitmq.ChannelNotifyTimeout):
log.Println("message delivery confirmation to exchange/queue timed out")
}

return nil
}

Test vakaları


Aşağıdaki liste hangi dinleyicilerin/işleyicilerin hangi durumları yakaladığını göstermektedir.


# EXCHANGE QUEUE CONSUMER RESPONSE DELAY HANDLER       ERROR                               
1 + + + 200 - NotifyPublish n/a
2 - + + 500 - NotifyPublish server failed to receive the message
3 + - + 500 - NotifyReturn server failed to receive the message
4 - - + 500 - NotifyPublish server failed to receive the message
5 + + - 200 - NotifyPublish n/a
6 - + - 500 - NotifyPublish server failed to receive the message
7 + - - 500 - NotifyReturn server failed to receive the message
8 - - - 500 - NotifyReturn server failed to receive the message

+ The component exists/running.
- The component does not exist or not running.

Her iki dinleyici için de ntf değişkenleri dökerseniz, aşağıdaki sonuçları elde edersiniz.


# 1
{DeliveryTag:1 Ack:true}

# 2
{DeliveryTag:0 Ack:false}

# 3
{ReplyCode:312 ReplyText:NO_ROUTE Exchange:user RoutingKey:create ContentType:text/plain ContentEncoding: Headers:map[]
DeliveryMode:2 Priority:0 CorrelationId: ReplyTo: Expiration: MessageId:UUID Timestamp:0001-01-01 00:00:00 +0000 UTC
Type: UserId: AppId: Body:[49]}

# 4
{DeliveryTag:0 Ack:false}

# 5
{DeliveryTag:1 Ack:true}

# 6
{DeliveryTag:0 Ack:false}

# 7
{ReplyCode:312 ReplyText:NO_ROUTE Exchange:user RoutingKey:create ContentType:text/plain ContentEncoding: Headers:map[]
DeliveryMode:2 Priority:0 CorrelationId: ReplyTo: Expiration: MessageId:UUID Timestamp:0001-01-01 00:00:00 +0000 UTC
Type: UserId: AppId: Body:[49]}

# 8
{ReplyCode:0 ReplyText: Exchange: RoutingKey: ContentType: ContentEncoding: Headers:map[] DeliveryMode:0 Priority:0
CorrelationId: ReplyTo: Expiration: MessageId: Timestamp:0001-01-01 00:00:00 +0000 UTC Type: UserId: AppId: Body:[]}

RabbiqMQ sunucusu


İsterseniz aşağıdaki compose dosyasını kullanabilir ve docker-compose up komutuyla çalıştırabilirsiniz. Daha sonra kullanıcı arayüzüne http://0.0.0.0:15672/ adresinden erişebilir ve guest:guest kimlik bilgileriyle giriş yapabilirsiniz.


version: "3.4"

services:
rabbit:
image: "rabbitmq:3.8.3-management-alpine"
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_ERLANG_COOKIE: "a-secret-hash"
RABBITMQ_DEFAULT_VHOST: "my_app"
RABBITMQ_DEFAULT_USER: "inanzzz"
RABBITMQ_DEFAULT_PASS: "123123"

Uygulama


Şimdi uygulamaya geçiyoruz. Önce RabbitMQ istemcisini kurmak için go get github.com/streadway/amqp komutunu kullanın. Örneğimizde aşağıdaki özellikleri kullanılmaktadır.


ExchangeType: direct
ExchangeName: user
RoutingKey: create
QueueName: user_create

Elimizde kullanmak için http://localhost:8080/users/create adresi var. İsterseniz, daha fazla adres ve yukarıdakine benzer kendi RabbitMQ özelliklerini ekleyebilirsiniz.


Yapı


├── cmd
│   └── client
│   └── main.go
└── internal
├── app
│   └── app.go
├── config
│   └── config.go
├── pkg
│   ├── http
│   │   ├── router.go
│   │   └── server.go
│   └── rabbitmq
│   └── rabbitmq.go
└── user
├── amqp.go
└── create.go

Dosyalar


main.go


package main

import (
"log"

"github.com/inanzzz/client/internal/app"
"github.com/inanzzz/client/internal/config"
"github.com/inanzzz/client/internal/pkg/http"
"github.com/inanzzz/client/internal/pkg/rabbitmq"
"github.com/inanzzz/client/internal/user"
)

func main() {
// Config
cnf := config.New()
//

// RabbitMQ
rbt := rabbitmq.New(cnf.RabbitMQ)
if err := rbt.Connect(); err != nil {
log.Fatalln(err)
}
defer rbt.Shutdown()
//

// AMQP setup
userAMQP := user.NewAMQP(cnf.UserAMQP, rbt)
if err := userAMQP.Setup(); err != nil {
log.Fatalln(err)
}
//

// HTTP router
rtr := http.NewRouter()
rtr.RegisterUsers(rbt)
//

// HTTP server
srv := http.NewServer(cnf.HTTPAddress, rtr)
//

// Run
if err := app.New(srv).Run(); err != nil {
log.Fatalln(err)
}
//
}

app.go


package app

import (
"net/http"
)

type App struct {
server *http.Server
}

func New(server *http.Server) App {
return App{
server: server,
}
}

func (a App) Run() error {
return a.server.ListenAndServe()
}

config.go


package config

import (
"time"

"github.com/inanzzz/client/internal/pkg/rabbitmq"
"github.com/inanzzz/client/internal/user"
)

type Config struct {
HTTPAddress string
RabbitMQ rabbitmq.Config
UserAMQP user.AMQPConfig
}

func New() Config {
var cnf Config

cnf.HTTPAddress = ":8080"

cnf.RabbitMQ.Schema = "amqp"
cnf.RabbitMQ.Username = "inanzzz"
cnf.RabbitMQ.Password = "123123"
cnf.RabbitMQ.Host = "0.0.0.0"
cnf.RabbitMQ.Port = "5672"
cnf.RabbitMQ.Vhost = "my_app"
cnf.RabbitMQ.ConnectionName = "MY_APP"
cnf.RabbitMQ.ChannelNotifyTimeout = 100 * time.Millisecond
cnf.RabbitMQ.Reconnect.Interval = 500 * time.Millisecond
cnf.RabbitMQ.Reconnect.MaxAttempt = 7200

cnf.UserAMQP.Create.ExchangeName = "user"
cnf.UserAMQP.Create.ExchangeType = "direct"
cnf.UserAMQP.Create.RoutingKey = "create"
cnf.UserAMQP.Create.QueueName = "user_create"

return cnf
}

router.go


package http

import (
"net/http"

"github.com/inanzzz/client/internal/pkg/rabbitmq"
"github.com/inanzzz/client/internal/user"
)

type Router struct {
*http.ServeMux
}

func NewRouter() *Router {
return &Router{http.NewServeMux()}
}

func (r *Router) RegisterUsers(rabbitmq *rabbitmq.RabbitMQ) {
create := user.NewCreate(rabbitmq)

r.HandleFunc("/users/create", create.Handle)
}

server.go


package http

import (
"net/http"
)

func NewServer(address string, router *Router) *http.Server {
return &http.Server{
Addr: address,
Handler: router,
}
}

rabbitmq.go


package rabbitmq

import (
"fmt"
"log"
"sync"
"time"

"github.com/pkg/errors"
"github.com/streadway/amqp"
)

type Config struct {
Schema string
Username string
Password string
Host string
Port string
Vhost string
ConnectionName string
ChannelNotifyTimeout time.Duration
Reconnect struct {
Interval time.Duration
MaxAttempt int
}
}

type RabbitMQ struct {
mux sync.RWMutex
config Config
dialConfig amqp.Config
connection *amqp.Connection
ChannelNotifyTimeout time.Duration
}

func New(config Config) *RabbitMQ {
return &RabbitMQ{
config: config,
dialConfig: amqp.Config{Properties: amqp.Table{"connection_name": config.ConnectionName}},
ChannelNotifyTimeout: config.ChannelNotifyTimeout,
}
}

// Connect creates a new connection. Use once at application
// startup.
func (r *RabbitMQ) Connect() error {
con, err := amqp.DialConfig(fmt.Sprintf(
"%s://%s:%s@%s:%s/%s",
r.config.Schema,
r.config.Username,
r.config.Password,
r.config.Host,
r.config.Port,
r.config.Vhost,
), r.dialConfig)
if err != nil {
return err
}

r.connection = con

go r.reconnect()

return nil
}

// Channel returns a new `*amqp.Channel` instance. You must
// call `defer channel.Close()` as soon as you obtain one.
// Sometimes the connection might be closed unintentionally so
// as a graceful handling, try to connect only once.
func (r *RabbitMQ) Channel() (*amqp.Channel, error) {
if r.connection == nil {
if err := r.Connect(); err != nil {
return nil, errors.New("connection is not open")
}
}

channel, err := r.connection.Channel()
if err != nil {
return nil, err
}

return channel, nil
}

// Connection exposes the essentials of the current connection.
// You should not normally use this but it is there for special
// use cases.
func (r *RabbitMQ) Connection() *amqp.Connection {
return r.connection
}

// Shutdown triggers a normal shutdown. Use this when you wish
// to shutdown your current connection or if you are shutting
// down the application.
func (r *RabbitMQ) Shutdown() error {
if r.connection != nil {
return r.connection.Close()
}

return nil
}

// reconnect reconnects to server if the connection or a channel
// is closed unexpectedly. Normal shutdown is ignored. It tries
// maximum of 7200 times and sleeps half a second in between
// each try which equals to 1 hour.
func (r *RabbitMQ) reconnect() {
WATCH:

conErr := <-r.connection.NotifyClose(make(chan *amqp.Error))
if conErr != nil {
log.Println("CRITICAL: Connection dropped, reconnecting")

var err error

for i := 1; i <= r.config.Reconnect.MaxAttempt; i++ {
r.mux.RLock()
r.connection, err = amqp.DialConfig(fmt.Sprintf(
"%s://%s:%s@%s:%s/%s",
r.config.Schema,
r.config.Username,
r.config.Password,
r.config.Host,
r.config.Port,
r.config.Vhost,
), r.dialConfig)
r.mux.RUnlock()

if err == nil {
log.Println("INFO: Reconnected")

goto WATCH
}

time.Sleep(r.config.Reconnect.Interval)
}

log.Println(errors.Wrap(err, "CRITICAL: Failed to reconnect"))
} else {
log.Println("INFO: Connection dropped normally, will not reconnect")
}
}

amqp.go


package user

import (
"github.com/inanzzz/client/internal/pkg/rabbitmq"
"github.com/pkg/errors"
"github.com/streadway/amqp"
)

type AMQPConfig struct {
Create struct {
ExchangeName string
ExchangeType string
RoutingKey string
QueueName string
}
}

type AMQP struct {
config AMQPConfig
rabbitmq *rabbitmq.RabbitMQ
}

func NewAMQP(config AMQPConfig, rabbitmq *rabbitmq.RabbitMQ) AMQP {
return AMQP{
config: config,
rabbitmq: rabbitmq,
}
}

func (a AMQP) Setup() error {
channel, err := a.rabbitmq.Channel()
if err != nil {
return errors.Wrap(err, "failed to open channel")
}
defer channel.Close()

if err := a.declareCreate(channel); err != nil {
return err
}

return nil
}

func (a AMQP) declareCreate(channel *amqp.Channel) error {
if err := channel.ExchangeDeclare(
a.config.Create.ExchangeName,
a.config.Create.ExchangeType,
true,
false,
false,
false,
nil,
); err != nil {
return errors.Wrap(err, "failed to declare exchange")
}

if _, err := channel.QueueDeclare(
a.config.Create.QueueName,
true,
false,
false,
false,
amqp.Table{"x-queue-mode": "lazy"},
); err != nil {
return errors.Wrap(err, "failed to declare queue")
}

if err := channel.QueueBind(
a.config.Create.QueueName,
a.config.Create.RoutingKey,
a.config.Create.ExchangeName,
false,
nil,
); err != nil {
return errors.Wrap(err, "failed to bind queue")
}

return nil
}

create.go


package user

import (
"fmt"
"log"
"net/http"
"time"

"github.com/inanzzz/client/internal/pkg/rabbitmq"
"github.com/pkg/errors"
"github.com/streadway/amqp"
)

type Create struct {
rabbitmq *rabbitmq.RabbitMQ
}

func NewCreate(rabbitmq *rabbitmq.RabbitMQ) Create {
return Create{
rabbitmq: rabbitmq,
}
}

func (c Create) Handle(w http.ResponseWriter, r *http.Request) {
id := r.Header.Get("ID")

if err := c.publish(id); err != nil {
log.Println(errors.Wrap(err, fmt.Sprintf("failed to create %s", id)))
w.WriteHeader(http.StatusInternalServerError)
return
}

log.Println("created", id)
w.WriteHeader(http.StatusAccepted)
}

func (c Create) publish(message string) error {
channel, err := c.rabbitmq.Channel()
if err != nil {
return errors.Wrap(err, "failed to open channel")
}
defer channel.Close()

if err := channel.Confirm(false); err != nil {
return errors.Wrap(err, "failed to put channel in confirmation mode")
}

if err := channel.Publish(
"user",
"create",
true,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
MessageId: "A-UNIQUE-ID",
ContentType: "text/plain",
Body: []byte(message),
},
); err != nil {
return errors.Wrap(err, "failed to publish message")
}

select {
case ntf := <-channel.NotifyPublish(make(chan amqp.Confirmation, 1)):
if !ntf.Ack {
return errors.New("failed to deliver message to exchange/queue")
}
case <-channel.NotifyReturn(make(chan amqp.Return)):
return errors.New("failed to deliver message to exchange/queue")
case <-time.After(c.rabbitmq.ChannelNotifyTimeout):
log.Println("message delivery confirmation to exchange/queue timed out")
}

return nil
}

Örnekler