In this example we are going to create a RabbitMQ producer that publishes messages from a Golang application. We will only be publishing messages, so no consumers are involved. Let me make one thing clear before going ahead. The Publish/Subscribe and Producer/Consumer terms are both relevant to messaging, but they are different things. Publish/Subscribe is a messaging pattern which is carried out by a Producer and a Consumer: the Producer publishes messages and the Consumer subscribes to them. Neither of them belongs to RabbitMQ; they are part of your application. Only the exchange, routing/binding keys and queues are part of RabbitMQ.


What does it do?


Each point listed here has relevant information in the "Notes" section below.



Notes


Connection (data race)


Although it is unlikely, the RabbitMQ.reconnect() method might (not always) print a data race warning in the terminal, but only if both scenarios below are satisfied at the same time.



However, if you are facing the first scenario alone, you probably have an architectural/network/server level problem rather than an application level problem, unless you really programmed your application badly! The setup in this example shouldn't cause scenario one at all. Even if you manually close the connection in the RabbitMQ UI, the application will establish a new one for you.


Channel (memory)


I am dedicating an isolated channel to each request/thread to prevent possible data race issues within the RabbitMQ.reconnect() method. This leads to High Channel Churn (I highly advise you to read that page), but at least it prevents the application from going down, so you have to sacrifice one thing or another. Also, opening and closing channels per thread is neither uncommon nor scary. The RabbitMQ documentation reads "For applications that use multiple threads/processes for processing, it is very common to open a new channel per thread/process and not share channels between them", which is what this application is doing. Along with that, it uses a really small amount of memory. As proof, if you run the rabbitmq-diagnostics memory_breakdown -q --unit mb command on the RabbitMQ server, you will see something like the output below. The connection_channels category is the sum of memory used by all channel processes at any given time, and its official description is "The more channels client connections use, the more memory will be used by this category". For more information about memory usage please read Reasoning About Memory Use.


allocated_unused: 31.6314 mb (23.3%)
code: 30.1752 mb (22.22%)
other_proc: 24.6923 mb (18.19%)
other_system: 23.9947 mb (17.67%)
plugins: 12.1899 mb (8.98%)
mgmt_db: 6.467 mb (4.76%)
other_ets: 2.9154 mb (2.15%)
atom: 1.5177 mb (1.12%)
binary: 1.324 mb (0.98%)
metrics: 0.3868 mb (0.28%)
connection_other: 0.1771 mb (0.13%)
mnesia: 0.0802 mb (0.06%)
queue_procs: 0.0552 mb (0.04%)
connection_readers: 0.0517 mb (0.04%)
quorum_ets: 0.0474 mb (0.03%)
msg_index: 0.0297 mb (0.02%)
connection_writers: 0.0246 mb (0.02%)
connection_channels: 0.0219 mb (0.02%)
queue_slave_procs: 0.0 mb (0.0%)
quorum_queue_procs: 0.0 mb (0.0%)
reserved_unallocated: 0.0 mb (0.0%)

Queue (memory)


We are using Lazy Queues to avoid overloading the memory. Lazy queues move messages to disk as early as possible and only load them in RAM when requested by consumers. This feature comes with a lot of benefits so please read the documentation for more information.


Resource usage (test)


Assume that you send 1000 continuous requests and that the connection is closed on every 5th request on purpose. In that case your application will bring up a maximum of 2 additional goroutines (temporarily) and consume between 0.2 and 2MB of memory, which is very light on system resources. If you send another 1000 continuous requests at the same time, only memory usage goes up, to 3MB, which is still not much. Remember, this usage won't be constant, so from time to time it will be even less. I barely reached 3MB for 2000 continuous requests.
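A test like the one above can be driven with a few lines of Go. The sketch below is only an illustration, not the tool used for the numbers in this post: sendSequential and newStubServer are hypothetical names, the POST method is an assumption (the handler does not check the method), and the stub server merely stands in for the application so that the snippet runs on its own.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strconv"
)

// sendSequential issues n requests one after another, sets the "ID"
// header that the handler reads, and counts responses per status code.
func sendSequential(client *http.Client, url string, n int) (map[int]int, error) {
	statuses := make(map[int]int)
	for i := 1; i <= n; i++ {
		req, err := http.NewRequest(http.MethodPost, url, nil)
		if err != nil {
			return statuses, err
		}
		req.Header.Set("ID", strconv.Itoa(i))

		res, err := client.Do(req)
		if err != nil {
			return statuses, err
		}
		res.Body.Close()
		statuses[res.StatusCode]++
	}
	return statuses, nil
}

// newStubServer is a hypothetical stand-in for the application. It
// answers 202 Accepted whenever an "ID" header is present.
func newStubServer() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Header.Get("ID") == "" {
			w.WriteHeader(http.StatusBadRequest)
			return
		}
		w.WriteHeader(http.StatusAccepted)
	}))
}

func main() {
	srv := newStubServer()
	defer srv.Close()

	// Swap srv.URL for "http://localhost:8080/users/create" to hit the real application.
	statuses, err := sendSequential(srv.Client(), srv.URL, 1000)
	fmt.Println(statuses, err) // map[202:1000] <nil>
}
```

Counting by status code makes it easy to spot the requests that failed while the connection was being closed on purpose.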


The outcome above was related to the application. Now let's have a look at how RabbitMQ behaves. I had 100 users sending concurrent requests continuously for 10 seconds. RabbitMQ channel memory usage was 1.5MB on average and ~4MB at maximum. Still very efficient. I highly recommend running the rabbitmq-plugins enable rabbitmq_top command on the RabbitMQ server to enable the rabbitmq-top plugin, which helps analyse the runtime processes that consume the most memory or CPU time. This will add a "Top Processes" menu item under the "Admin" tab.


Reconnection


If you go to RabbitMQ UI and close the connection to simulate a network error, you will see that your application reconnects. The logs should look like below.


2020/05/05 20:48:35 CRITICAL: Connection dropped, reconnecting
2020/05/05 20:48:35 INFO: Recovered connected

If you manually close it in your application to simulate a normal operation, you will see that your application won't reconnect. The logs should look like below.


2020/05/05 20:46:34 INFO: Connection dropped normally, will not reconnect
2020/05/05 20:46:34 failed to create 5: failed to open channel: Exception (504) Reason: "channel/connection is not open"
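The retry policy behind the reconnection (try up to MaxAttempt times and sleep Interval between tries) can be looked at in isolation. This is a minimal sketch under assumptions: retry and failNTimes are hypothetical helpers, and failNTimes only stands in for amqp.DialConfig so that no broker is needed.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retry calls dial until it succeeds or maxAttempt tries have been
// used, sleeping interval after every failed try. It returns the
// number of attempts made and the last error (nil on success).
func retry(dial func() error, maxAttempt int, interval time.Duration) (int, error) {
	var err error
	for i := 1; i <= maxAttempt; i++ {
		if err = dial(); err == nil {
			return i, nil
		}
		time.Sleep(interval)
	}
	return maxAttempt, err
}

// failNTimes returns a hypothetical dial function that fails n times
// and then succeeds. It stands in for amqp.DialConfig.
func failNTimes(n int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return errors.New("connection refused")
		}
		return nil
	}
}

func main() {
	attempts, err := retry(failNTimes(2), 5, 10*time.Millisecond)
	fmt.Println(attempts, err) // 3 <nil>

	// Upper bound of the retry window with this example's settings.
	fmt.Println(7200 * 500 * time.Millisecond) // 1h0m0s
}
```

With 7200 attempts and a 500 millisecond interval the application keeps trying for roughly one hour before it gives up.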

Message delivery confirmation tests


This part shows how Channel.NotifyPublish and Channel.NotifyReturn handle message publishing. You want to return a correct response to the user after publishing the message to RabbitMQ. Publishing without a listener doesn't necessarily mean that the message has been delivered. For this reason, we need to wait for the confirmation.


Confirmation code


...
	select {
	case ntf := <-channel.NotifyPublish(make(chan amqp.Confirmation, 1)):
		if !ntf.Ack {
			return errors.New("failed to deliver message to exchange/queue")
		}
	case <-channel.NotifyReturn(make(chan amqp.Return)):
		return errors.New("failed to deliver message to exchange/queue")
	case <-time.After(c.rabbitmq.ChannelNotifyTimeout):
		log.Println("message delivery confirmation to exchange/queue timed out")
	}

	return nil
}
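To see the three branches of that select without a broker, here is a small stand-alone sketch that uses plain Go channels instead of the amqp listeners. The confirmation type, the outcome values and the await function are hypothetical names made up for this illustration. Unlike the snippet above, it reports a timeout as its own outcome instead of only logging it.

```go
package main

import (
	"fmt"
	"time"
)

// confirmation mirrors the two fields of amqp.Confirmation that
// appear in the dumps of this post.
type confirmation struct {
	DeliveryTag uint64
	Ack         bool
}

type outcome string

const (
	acked    outcome = "acked"
	nacked   outcome = "nacked"
	returned outcome = "returned"
	timedOut outcome = "timed out"
)

// await blocks until a confirmation arrives, a message is returned,
// or the timeout expires, whichever happens first.
func await(confirms <-chan confirmation, returns <-chan struct{}, timeout time.Duration) outcome {
	select {
	case c := <-confirms:
		if c.Ack {
			return acked
		}
		return nacked
	case <-returns:
		return returned
	case <-time.After(timeout):
		return timedOut
	}
}

func main() {
	confirms := make(chan confirmation, 1)
	returns := make(chan struct{}, 1)

	confirms <- confirmation{DeliveryTag: 1, Ack: true}
	fmt.Println(await(confirms, returns, 100*time.Millisecond)) // acked

	returns <- struct{}{}
	fmt.Println(await(confirms, returns, 100*time.Millisecond)) // returned

	// Nothing is sent this time, so the timeout branch wins.
	fmt.Println(await(confirms, returns, 10*time.Millisecond)) // timed out
}
```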

Test cases


The list below shows which listeners/handlers catch what cases.


#  EXCHANGE  QUEUE  CONSUMER  RESPONSE  DELAY  HANDLER        ERROR
1  +         +      +         200       -      NotifyPublish  n/a
2  -         +      +         500       -      NotifyPublish  server failed to receive the message
3  +         -      +         500       -      NotifyReturn   server failed to receive the message
4  -         -      +         500       -      NotifyPublish  server failed to receive the message
5  +         +      -         200       -      NotifyPublish  n/a
6  -         +      -         500       -      NotifyPublish  server failed to receive the message
7  +         -      -         500       -      NotifyReturn   server failed to receive the message
8  -         -      -         500       -      NotifyReturn   server failed to receive the message

+ The component exists/running.
- The component does not exist or not running.

If you dumped ntf variables for both listeners, you would get results below.


# 1
{DeliveryTag:1 Ack:true}

# 2
{DeliveryTag:0 Ack:false}

# 3
{ReplyCode:312 ReplyText:NO_ROUTE Exchange:user RoutingKey:create ContentType:text/plain ContentEncoding: Headers:map[]
DeliveryMode:2 Priority:0 CorrelationId: ReplyTo: Expiration: MessageId:UUID Timestamp:0001-01-01 00:00:00 +0000 UTC
Type: UserId: AppId: Body:[49]}

# 4
{DeliveryTag:0 Ack:false}

# 5
{DeliveryTag:1 Ack:true}

# 6
{DeliveryTag:0 Ack:false}

# 7
{ReplyCode:312 ReplyText:NO_ROUTE Exchange:user RoutingKey:create ContentType:text/plain ContentEncoding: Headers:map[]
DeliveryMode:2 Priority:0 CorrelationId: ReplyTo: Expiration: MessageId:UUID Timestamp:0001-01-01 00:00:00 +0000 UTC
Type: UserId: AppId: Body:[49]}

# 8
{ReplyCode:0 ReplyText: Exchange: RoutingKey: ContentType: ContentEncoding: Headers:map[] DeliveryMode:0 Priority:0
CorrelationId: ReplyTo: Expiration: MessageId: Timestamp:0001-01-01 00:00:00 +0000 UTC Type: UserId: AppId: Body:[]}

RabbitMQ server


You can use the compose file below if you wish and run it with the docker-compose up command. You can then access the UI via the http://0.0.0.0:15672/ address and log in with the inanzzz:123123 credentials defined in the file.


version: "3.4"

services:
  rabbit:
    image: "rabbitmq:3.8.3-management-alpine"
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_ERLANG_COOKIE: "a-secret-hash"
      RABBITMQ_DEFAULT_VHOST: "my_app"
      RABBITMQ_DEFAULT_USER: "inanzzz"
      RABBITMQ_DEFAULT_PASS: "123123"

Application


We are moving on to the application now. Use go get github.com/streadway/amqp to install the RabbitMQ client first. Our example uses the properties below.


ExchangeType: direct
ExchangeName: user
RoutingKey: create
QueueName: user_create
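How these four properties fit together can be shown with a toy in-memory model of a direct exchange: a message reaches a queue only when its routing key matches the binding key exactly. The directExchange type and its methods are hypothetical and exist only for this illustration; they are not part of RabbitMQ or of the amqp package.

```go
package main

import "fmt"

// directExchange is a toy model of a direct exchange. A message goes
// to every queue that was bound with exactly the same routing key.
type directExchange struct {
	name     string
	bindings map[string][]string // routing key -> queue names
}

func newDirectExchange(name string) *directExchange {
	return &directExchange{name: name, bindings: make(map[string][]string)}
}

// bind attaches a queue to the exchange under the given routing key.
func (e *directExchange) bind(queue, routingKey string) {
	e.bindings[routingKey] = append(e.bindings[routingKey], queue)
}

// route returns the queues that would receive a message with the
// given routing key and whether the message is routable at all.
func (e *directExchange) route(routingKey string) ([]string, bool) {
	queues := e.bindings[routingKey]
	return queues, len(queues) > 0
}

func main() {
	ex := newDirectExchange("user")
	ex.bind("user_create", "create")

	fmt.Println(ex.route("create")) // [user_create] true
	fmt.Println(ex.route("delete")) // [] false
}
```

The second call corresponds to the NO_ROUTE situation from the test cases: the exchange exists but no queue is bound for that key.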

We have an endpoint http://localhost:8080/users/create which will be used. If you wanted, you could add more endpoints and their own RabbitMQ properties similar to above.


Structure


├── cmd
│   └── client
│       └── main.go
└── internal
    ├── app
    │   └── app.go
    ├── config
    │   └── config.go
    ├── pkg
    │   ├── http
    │   │   ├── router.go
    │   │   └── server.go
    │   └── rabbitmq
    │       └── rabbitmq.go
    └── user
        ├── amqp.go
        └── create.go

Files


main.go


package main

import (
	"log"

	"github.com/inanzzz/client/internal/app"
	"github.com/inanzzz/client/internal/config"
	"github.com/inanzzz/client/internal/pkg/http"
	"github.com/inanzzz/client/internal/pkg/rabbitmq"
	"github.com/inanzzz/client/internal/user"
)

func main() {
	// Config
	cnf := config.New()
	//

	// RabbitMQ
	rbt := rabbitmq.New(cnf.RabbitMQ)
	if err := rbt.Connect(); err != nil {
		log.Fatalln(err)
	}
	defer rbt.Shutdown()
	//

	// AMQP setup
	userAMQP := user.NewAMQP(cnf.UserAMQP, rbt)
	if err := userAMQP.Setup(); err != nil {
		log.Fatalln(err)
	}
	//

	// HTTP router
	rtr := http.NewRouter()
	rtr.RegisterUsers(rbt)
	//

	// HTTP server
	srv := http.NewServer(cnf.HTTPAddress, rtr)
	//

	// Run
	if err := app.New(srv).Run(); err != nil {
		log.Fatalln(err)
	}
	//
}

app.go


package app

import (
	"net/http"
)

type App struct {
	server *http.Server
}

func New(server *http.Server) App {
	return App{
		server: server,
	}
}

func (a App) Run() error {
	return a.server.ListenAndServe()
}

config.go


package config

import (
	"time"

	"github.com/inanzzz/client/internal/pkg/rabbitmq"
	"github.com/inanzzz/client/internal/user"
)

type Config struct {
	HTTPAddress string
	RabbitMQ    rabbitmq.Config
	UserAMQP    user.AMQPConfig
}

func New() Config {
	var cnf Config

	cnf.HTTPAddress = ":8080"

	cnf.RabbitMQ.Schema = "amqp"
	cnf.RabbitMQ.Username = "inanzzz"
	cnf.RabbitMQ.Password = "123123"
	cnf.RabbitMQ.Host = "0.0.0.0"
	cnf.RabbitMQ.Port = "5672"
	cnf.RabbitMQ.Vhost = "my_app"
	cnf.RabbitMQ.ConnectionName = "MY_APP"
	cnf.RabbitMQ.ChannelNotifyTimeout = 100 * time.Millisecond
	cnf.RabbitMQ.Reconnect.Interval = 500 * time.Millisecond
	cnf.RabbitMQ.Reconnect.MaxAttempt = 7200

	cnf.UserAMQP.Create.ExchangeName = "user"
	cnf.UserAMQP.Create.ExchangeType = "direct"
	cnf.UserAMQP.Create.RoutingKey = "create"
	cnf.UserAMQP.Create.QueueName = "user_create"

	return cnf
}

router.go


package http

import (
	"net/http"

	"github.com/inanzzz/client/internal/pkg/rabbitmq"
	"github.com/inanzzz/client/internal/user"
)

type Router struct {
	*http.ServeMux
}

func NewRouter() *Router {
	return &Router{http.NewServeMux()}
}

func (r *Router) RegisterUsers(rabbitmq *rabbitmq.RabbitMQ) {
	create := user.NewCreate(rabbitmq)

	r.HandleFunc("/users/create", create.Handle)
}

server.go


package http

import (
	"net/http"
)

func NewServer(address string, router *Router) *http.Server {
	return &http.Server{
		Addr:    address,
		Handler: router,
	}
}

rabbitmq.go


package rabbitmq

import (
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/pkg/errors"
	"github.com/streadway/amqp"
)

type Config struct {
	Schema               string
	Username             string
	Password             string
	Host                 string
	Port                 string
	Vhost                string
	ConnectionName       string
	ChannelNotifyTimeout time.Duration
	Reconnect            struct {
		Interval   time.Duration
		MaxAttempt int
	}
}

type RabbitMQ struct {
	mux                  sync.RWMutex
	config               Config
	dialConfig           amqp.Config
	connection           *amqp.Connection
	ChannelNotifyTimeout time.Duration
}

func New(config Config) *RabbitMQ {
	return &RabbitMQ{
		config:               config,
		dialConfig:           amqp.Config{Properties: amqp.Table{"connection_name": config.ConnectionName}},
		ChannelNotifyTimeout: config.ChannelNotifyTimeout,
	}
}

// Connect creates a new connection. Use once at application
// startup.
func (r *RabbitMQ) Connect() error {
	con, err := amqp.DialConfig(fmt.Sprintf(
		"%s://%s:%s@%s:%s/%s",
		r.config.Schema,
		r.config.Username,
		r.config.Password,
		r.config.Host,
		r.config.Port,
		r.config.Vhost,
	), r.dialConfig)
	if err != nil {
		return err
	}

	r.connection = con

	go r.reconnect()

	return nil
}

// Channel returns a new `*amqp.Channel` instance. You must
// call `defer channel.Close()` as soon as you obtain one.
// Sometimes the connection might be closed unintentionally so
// as a graceful handling, try to connect only once.
func (r *RabbitMQ) Channel() (*amqp.Channel, error) {
	if r.connection == nil {
		if err := r.Connect(); err != nil {
			return nil, errors.New("connection is not open")
		}
	}

	channel, err := r.connection.Channel()
	if err != nil {
		return nil, err
	}

	return channel, nil
}

// Connection exposes the essentials of the current connection.
// You should not normally use this but it is there for special
// use cases.
func (r *RabbitMQ) Connection() *amqp.Connection {
	return r.connection
}

// Shutdown triggers a normal shutdown. Use this when you wish
// to shutdown your current connection or if you are shutting
// down the application.
func (r *RabbitMQ) Shutdown() error {
	if r.connection != nil {
		return r.connection.Close()
	}

	return nil
}

// reconnect reconnects to the server if the connection is closed
// unexpectedly. Normal shutdown is ignored. With the configuration
// in this example it tries a maximum of 7200 times and sleeps half
// a second in between each try, which adds up to 1 hour.
func (r *RabbitMQ) reconnect() {
WATCH:

	conErr := <-r.connection.NotifyClose(make(chan *amqp.Error))
	if conErr != nil {
		log.Println("CRITICAL: Connection dropped, reconnecting")

		var err error

		for i := 1; i <= r.config.Reconnect.MaxAttempt; i++ {
			// The connection is being replaced, so take the write lock
			// rather than the read lock.
			r.mux.Lock()
			r.connection, err = amqp.DialConfig(fmt.Sprintf(
				"%s://%s:%s@%s:%s/%s",
				r.config.Schema,
				r.config.Username,
				r.config.Password,
				r.config.Host,
				r.config.Port,
				r.config.Vhost,
			), r.dialConfig)
			r.mux.Unlock()

			if err == nil {
				log.Println("INFO: Reconnected")

				goto WATCH
			}

			time.Sleep(r.config.Reconnect.Interval)
		}

		log.Println(errors.Wrap(err, "CRITICAL: Failed to reconnect"))
	} else {
		log.Println("INFO: Connection dropped normally, will not reconnect")
	}
}
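The connection string above is assembled with fmt.Sprintf, which works for the credentials used here but would produce a broken URI if the password contained characters such as @ or /. As an optional alternative, the sketch below builds the same URI with net/url so that the credentials get escaped. The amqpURL helper is a hypothetical name, and whether the client decodes the escaped password the way you expect is an assumption you should verify against your setup.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// amqpURL assembles the connection URI from the same parts that the
// Config struct holds, escaping the credentials where needed.
func amqpURL(schema, username, password, host, port, vhost string) string {
	u := url.URL{
		Scheme: schema,
		User:   url.UserPassword(username, password),
		Host:   net.JoinHostPort(host, port),
		Path:   "/" + vhost,
	}
	return u.String()
}

func main() {
	fmt.Println(amqpURL("amqp", "inanzzz", "123123", "0.0.0.0", "5672", "my_app"))
	// amqp://inanzzz:123123@0.0.0.0:5672/my_app

	// A made-up password with reserved characters gets percent-encoded.
	fmt.Println(amqpURL("amqp", "inanzzz", "p@ss/word", "0.0.0.0", "5672", "my_app"))
	// amqp://inanzzz:p%40ss%2Fword@0.0.0.0:5672/my_app
}
```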

amqp.go


package user

import (
	"github.com/inanzzz/client/internal/pkg/rabbitmq"
	"github.com/pkg/errors"
	"github.com/streadway/amqp"
)

type AMQPConfig struct {
	Create struct {
		ExchangeName string
		ExchangeType string
		RoutingKey   string
		QueueName    string
	}
}

type AMQP struct {
	config   AMQPConfig
	rabbitmq *rabbitmq.RabbitMQ
}

func NewAMQP(config AMQPConfig, rabbitmq *rabbitmq.RabbitMQ) AMQP {
	return AMQP{
		config:   config,
		rabbitmq: rabbitmq,
	}
}

// Setup declares the exchange, the queue and the binding on a
// short-lived channel.
func (a AMQP) Setup() error {
	channel, err := a.rabbitmq.Channel()
	if err != nil {
		return errors.Wrap(err, "failed to open channel")
	}
	defer channel.Close()

	if err := a.declareCreate(channel); err != nil {
		return err
	}

	return nil
}

func (a AMQP) declareCreate(channel *amqp.Channel) error {
	if err := channel.ExchangeDeclare(
		a.config.Create.ExchangeName,
		a.config.Create.ExchangeType,
		true,  // durable
		false, // autoDelete
		false, // internal
		false, // noWait
		nil,
	); err != nil {
		return errors.Wrap(err, "failed to declare exchange")
	}

	if _, err := channel.QueueDeclare(
		a.config.Create.QueueName,
		true,  // durable
		false, // autoDelete
		false, // exclusive
		false, // noWait
		amqp.Table{"x-queue-mode": "lazy"},
	); err != nil {
		return errors.Wrap(err, "failed to declare queue")
	}

	if err := channel.QueueBind(
		a.config.Create.QueueName,
		a.config.Create.RoutingKey,
		a.config.Create.ExchangeName,
		false, // noWait
		nil,
	); err != nil {
		return errors.Wrap(err, "failed to bind queue")
	}

	return nil
}

create.go


package user

import (
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/inanzzz/client/internal/pkg/rabbitmq"
	"github.com/pkg/errors"
	"github.com/streadway/amqp"
)

type Create struct {
	rabbitmq *rabbitmq.RabbitMQ
}

func NewCreate(rabbitmq *rabbitmq.RabbitMQ) Create {
	return Create{
		rabbitmq: rabbitmq,
	}
}

// Handle publishes the value of the "ID" request header as the
// message body.
func (c Create) Handle(w http.ResponseWriter, r *http.Request) {
	id := r.Header.Get("ID")

	if err := c.publish(id); err != nil {
		log.Println(errors.Wrap(err, fmt.Sprintf("failed to create %s", id)))
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	log.Println("created", id)
	w.WriteHeader(http.StatusAccepted)
}

func (c Create) publish(message string) error {
	channel, err := c.rabbitmq.Channel()
	if err != nil {
		return errors.Wrap(err, "failed to open channel")
	}
	defer channel.Close()

	if err := channel.Confirm(false); err != nil {
		return errors.Wrap(err, "failed to put channel in confirmation mode")
	}

	if err := channel.Publish(
		"user",   // exchange
		"create", // routing key
		true,     // mandatory
		false,    // immediate
		amqp.Publishing{
			DeliveryMode: amqp.Persistent,
			MessageId:    "A-UNIQUE-ID",
			ContentType:  "text/plain",
			Body:         []byte(message),
		},
	); err != nil {
		return errors.Wrap(err, "failed to publish message")
	}

	select {
	case ntf := <-channel.NotifyPublish(make(chan amqp.Confirmation, 1)):
		if !ntf.Ack {
			return errors.New("failed to deliver message to exchange/queue")
		}
	case <-channel.NotifyReturn(make(chan amqp.Return)):
		return errors.New("failed to deliver message to exchange/queue")
	case <-time.After(c.rabbitmq.ChannelNotifyTimeout):
		log.Println("message delivery confirmation to exchange/queue timed out")
	}

	return nil
}

Examples