27/08/2020 - GO, RABBITMQ
Bu örnekte, Golang için bir RabbitMQ paketi oluşturacağız. Daha sonra bunu üretici/yayıncı ve tüketici/abone uygulamalarımızda kullanacağız. Paket, aşağıda listelenen önemli özelliklerle birlikte gelir; bu nedenle bunları kendiniz halletmek zorunda kalmazsınız.
Bağlantılar, uygulamanızın adıyla (veya sizin belirlediğiniz herhangi bir adla) adlandırılır; böylece hangi bağlantının hangi uygulamaya ait olduğunu bilirsiniz. Aynı şekilde tüketici çalışanlarınız (worker) da adlandırılır. Tüm bunları RabbitMQ yönetim arayüzünde (Management Plugin) doğrulayabilirsiniz.
package rabbitmq
import "time"
// ConnectionConfig carries the segments of the AMQP URI, the optional TLS
// material and the reconnection settings used when creating a `Server`.
type ConnectionConfig struct {
	// Name is used to name the connection. It provides visual cues to which
	// connection belongs to which application in Management Plugin. Optional.
	Name string
	// Schema segment of the AMQP URI string. Required.
	Schema string
	// Username segment of the AMQP URI string. Optional.
	Username string
	// Password segment of the AMQP URI string. Optional.
	Password string
	// Host segment of the AMQP URI string. Required.
	Host string
	// Port segment of the AMQP URI string. Required.
	Port string
	// VHost segment of the AMQP URI string. Optional.
	VHost string
	// ReconAttempt is used to define maximum amount of reconnection attempts.
	// If set to `0` attempts will be infinite. Optional.
	ReconAttempt int
	// ReconInterval defines the equal intervals between each reconnection
	// attempts. Required.
	ReconInterval time.Duration
	// CACert represents Certificate Authority (CA) certificate. Optional.
	CACert []byte
	// ClientCert represents Client certificate. Optional.
	ClientCert []byte
	// ClientKey represents Client key. Optional.
	ClientKey []byte
}
package rabbitmq
// ExchangeConfig is used by both producer and consumer applications. It
// describes one exchange to be declared by `Server.Setup()`.
type ExchangeConfig struct {
	// Name defines the name of the exchange. Required.
	Name string
	// Type defines the type of the exchange. Required.
	Type string
}
package rabbitmq
// QueueConfig is dedicated to consumer applications, not the producers. It
// describes one queue to be declared and bound by `Server.Setup()`.
type QueueConfig struct {
	// Name defines the name of the queue. Required.
	Name string
	// Binding defines the relationship between an exchange and a queue.
	// Optional.
	// It is often used to refer to "routing key". Not required for the `fanout`
	// exchange types.
	Binding string
	// Exchange defines the name of the exchange that needs to be used for
	// message consumption. Required.
	// This must match producer config value.
	Exchange string
	// Mode defines what type of queue shall be used. Required.
	// This has an impact on the performance. Prefer `lazy` over `default`
	// unless you have a very reasonable case. Read reference below before
	// taking a decision.
	// Ref: https://www.rabbitmq.com/lazy-queues.html
	Mode string
	// DLX is dedicated to the "dead-lettered" messages and represents the name
	// of the exchange that was declared previously. Optional.
	// If the consumer does not/will never require a DLX feature, skip this
	// option. Late declaration of a DLX and programmatically using for an
	// existing queue is not possible. However, you can manually achieve this
	// which is not always wise as it is very error prone and tedious job.
	// If the consumer requires a DLX feature, setup an exchange and queue
	// beforehand then use its name here. A DLX does not require its own
	// consumer upfront. It can be delivered when you know how to handle
	// "dead-lettered" messages.
	DLX string
}
package rabbitmq
// LogLevel names the severity attached to a `Log` entry.
type LogLevel string

// Severity levels a `Log` entry can carry. Every constant is explicitly typed
// as LogLevel; without the type, all but the first would be untyped string
// constants.
const (
	LevelDebug   LogLevel = "debug"
	LevelInfo    LogLevel = "info"
	LevelWarning LogLevel = "warning"
	LevelError   LogLevel = "error"
)

// Log is used to help select what level logs the application wants to log or
// ignore. Logs are streamed via the `Server.logChan` field which is an optional
// argument. It is provided when calling the `NewServer()` method.
type Log struct {
	// Level is the severity of the entry.
	Level LogLevel
	// Message is the human readable log text.
	Message string
}
package rabbitmq
// ProducerConfig tells the `Server` publish methods which exchange and routing
// key a producer publishes with, and under which name its channel is reserved.
type ProducerConfig struct {
	// Name is used to reserve channels per producers. Hence the reason, each
	// producer must have an unique name. Required.
	Name string
	// ExchangeName defines the name of the exchange that needs to be used for
	// message publishing. Required.
	// This must match consumer config value.
	ExchangeName string
	// RoutingKey is defined on the message. Optional.
	// When the message is published, it ends up in a queue whose binding key
	// matches to the routing key. Not required for the `fanout` exchange types.
	RoutingKey string
}
// Producer is the contract all producers must implement.
type Producer interface {
	// Produce publishes `message` identified by `messageID`. The `data`
	// argument is free-form and its use is left to the implementation.
	Produce(messageID string, message []byte, data interface{}) error
}
package rabbitmq
import "github.com/streadway/amqp"
// ConsumerConfig describes how a consumer is named, how many workers it runs
// and how many unacknowledged messages the broker may deliver to it.
type ConsumerConfig struct {
	// Name is used to name the consumer workers. It provides visual cues to
	// which channel is used by which consumer/worker in Management Plugin. It
	// also appears in the logs. Required.
	Name string
	// WorkerCount helps running given amount of workers for the consumer.
	// Required.
	// This has a high impact on the performance. The performance also has a
	// direct relationship with the `PrefetchCount` option. If you have a fairly
	// busy queue, avoid setting it to `1`. Also avoid setting it to very high
	// because the more workers, the more work the broker has to do to keep
	// track of them. Read reference below before taking a decision.
	// Ref: https://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
	// 1 -> n receiving rate vs consumer count / prefetch count
	WorkerCount int
	// PrefetchCount helps defining how many messages should be delivered to a
	// consumer before acknowledgments are received. Optional.
	// This has a high impact on the performance. The performance also has a
	// direct relationship with the `WorkerCount` option above. Unless you have
	// a fairly quiet queue, avoid setting it to `1`. Read reference below
	// before taking a decision.
	// Ref: https://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
	// n -> 0 sending bytes rate vs number of producers, for various message sizes
	// 1 -> n receiving rate vs consumer count / prefetch count
	PrefetchCount int
}
// All consumers must implement this interface.
type Consumer interface {
Config() ConsumerConfig
Consume(messages <-chan amqp.Delivery, workerID int)
package rabbitmq
import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/streadway/amqp"
)
type Server struct {
mutex *sync.RWMutex
conn *amqp.Connection
config ConnectionConfig
logChan chan Log
consumers []Consumer
channels map[string]*amqp.Channel
// New returns `Server` pointer type with an live AMQP connection attached to
// it.
// The optional `logChan` argument helps you get back package level logs. If it
// is to be utilised, you must use an unbuffered `Log` channel and read from it
// right after creating it. Failing to read will prevent the reconnection
// feature from establishing a new connection and possible unexpected issues.
func NewServer(config ConnectionConfig, logChan chan Log) (*Server, error) {
if config.ReconInterval == 0 {
return nil, fmt.Errorf("reconnection interval must be above 0")
srv := &Server{
mutex: &sync.RWMutex{},
config: config,
logChan: logChan,
channels: make(map[string]*amqp.Channel),
if err := srv.connect(); err != nil {
return nil, err
return srv, nil
// Shutdown closes the AMQP connection.
func (s *Server) Shutdown() error {
if s.conn != nil {
if err := s.conn.Close(); err != nil {
return fmt.Errorf("shutdown: %w", err)
return nil
// Setup declares all the necessary components of the broker that is needed for
// producers and consumers.
// It opens a dedicated channel on the current connection, declares every
// exchange in `exchanges`, then declares and binds every queue in `queues`.
// The first failure is returned wrapped with a "setup: ..." prefix.
// NOTE(review): the closing braces and the argument lists of the
// ExchangeDeclare/QueueDeclare/QueueBind calls are not present in this
// listing; they need restoring from the original source before this compiles.
func (s *Server) Setup(exchanges []ExchangeConfig, queues []QueueConfig) error {
chn, err := s.conn.Channel()
if err != nil {
return fmt.Errorf("setup: %w", err)
// The channel is only needed for the declarations below.
defer chn.Close()
for _, exchange := range exchanges {
if err := chn.ExchangeDeclare(
); err != nil {
return fmt.Errorf("setup: exchange declare: %w", err)
for _, queue := range queues {
// The queue mode is always sent; the dead-letter exchange only when set.
args := amqp.Table{"x-queue-mode": queue.Mode}
if queue.DLX != "" {
args["x-dead-letter-exchange"] = queue.DLX
if _, err := chn.QueueDeclare(
); err != nil {
return fmt.Errorf("setup: queue declare: %w", err)
if err := chn.QueueBind(
); err != nil {
return fmt.Errorf("setup: queue bind: %w", err)
return nil
// RegisterConsumers first registers all consumers and then runs their workers.
func (s *Server) RegisterConsumers(consumers []Consumer) error {
s.consumers = consumers
if err := s.runConsumerWorkers(s.consumers); err != nil {
return fmt.Errorf("register consumers: %w", err)
return nil
// PublishOnNewChannel publishes a message on a new channel.
// Every time this method is called a new channel is opened and closed right
// after the use. This has a negative impact on the application performance.
// The advantage of using a new channel for each publishing is that, it allows
// message delivery confirmation. It is possible for published message to not
// reach the exchange, queue or the server for any reason. The lack of an error
// on the publishing does not necessarily mean that the server has received the
// published message either.
// The disadvantage is obviously a new channel is created for each publishing
// and closed right after the use. This will result in considerably slower
// operations and higher usage of system resources such as high channel churn.
// The disadvantage becomes reality if it was used by fairly busy producers.
// If the message delivery confirmation is a "must have" feature for your use
// case you have no other choice but use this method. Otherwise always prefer
// the `PublishOnReservedChannel()` method.
// Ref: https://www.rabbitmq.com/channels.html
// High Channel Churn
func (s *Server) PublishOnNewChannel(publishing amqp.Publishing, config ProducerConfig) error {
defer s.mutex.RUnlock()
chn, err := s.conn.Channel()
if err != nil {
return fmt.Errorf("publish on new channel: get channel: %w", err)
defer chn.Close()
if err := chn.Confirm(false); err != nil {
return fmt.Errorf("publish on new channel: confirm mode: %w", err)
err = chn.Publish(config.ExchangeName, config.RoutingKey, true, false, publishing)
if err != nil {
return fmt.Errorf("publish on new channel: publish: %w", err)
select {
case ntf := <-chn.NotifyPublish(make(chan amqp.Confirmation, 1)):
if !ntf.Ack {
return errors.New("publish on new channel: failed to confirm publishing")
case <-chn.NotifyReturn(make(chan amqp.Return)):
return errors.New("publish on new channel: failed to route publishing")
return nil
// PublishOnReservedChannel publishes a message on previously reserved channel
// on behalf of the producers.
// The reserved channels are not closed as they are meant to be long-lived and
// reused for multiple publishing. This has a positive impact on the application
// performance.
// The advantage of using a reserved channel is that, each producer uses its own
// reserved channel for each publishing. This will result in considerably
// faster operations and less usage of system resources such as low channel
// churn.
// The disadvantage is that, it will not allow message delivery confirmation. If
// you want to what we mean by the message delivery confirmation, please read
// the `PublishOnNewChannel()` method.
// If the message delivery confirmation is not important for your use case,
// always prefer this method over `PublishOnNewChannel()` method.
// Ref: https://www.rabbitmq.com/channels.html
// High Channel Churn
func (s *Server) PublishOnReservedChannel(publishing amqp.Publishing, config ProducerConfig) error {
chn, err := s.reservedChannel(config.Name)
if err != nil {
return fmt.Errorf("publish on reserved channel: %w", err)
err = chn.Publish(config.ExchangeName, config.RoutingKey, false, false, publishing)
if err != nil {
return fmt.Errorf("publish on reserved channel: publish: %w", err)
return nil
// reservedChannel returns an existing channel for a producer.
// If the given producer name does not yet have an channel exist in the reserved
// channel pool, a new channel is created and reserved for later use.
// All the reserved channels have a channel listeners `producerChannelListener`
// attached to them so if the channel is closed for any given reason, the
// listener calls this method in order to recreate one.
func (s *Server) reservedChannel(producerName string) (*amqp.Channel, error) {
defer s.mutex.Unlock()
if chn, ok := s.channels[producerName]; ok {
return chn, nil
chn, err := s.conn.Channel()
if err != nil {
return nil, fmt.Errorf("reserved channel: get channel: %w", err)
s.channels[producerName] = chn
go s.producerChannelListener(chn, producerName)
return chn, nil
// runConsumerWorkers runs all the workers that are linked to the given
// consumers.
// Each individual consumer gets its own dedicated channel and this channel is
// shared between all its workers. e.g., there are two consumers and each have
// two workers attached to them. We would have a total of two open channels in
// the broker. Given that the workers are run as goroutines and the goroutines
// are not threads, there is no point of using a channel per worker.
// All the channels have a channel listeners `consumerChannelListener`
// attached to them so if the channel is closed for any given reason, the
// listener calls `runConsumerWorkers()` method in order to rerun all the
// workers of the consumer.
// An error is returned as soon as a channel cannot be opened, its QoS cannot
// be set, or a consumer declares fewer than one worker.
// Ref: https://www.rabbitmq.com/api-guide.html
// Channels and Concurrency Considerations (Thread Safety)
// NOTE(review): the closing braces, the remaining `chn.Consume` arguments and
// the statements wrapping the `Level`/`Message` pairs (presumably `s.log`
// calls - confirm) are not present in this listing.
func (s *Server) runConsumerWorkers(consumers []Consumer) error {
op := "run consumer workers"
for _, consumer := range consumers {
// Shadowed so the goroutines below capture this iteration's consumer.
consumer := consumer
config := consumer.Config()
chn, err := s.conn.Channel()
if err != nil {
return fmt.Errorf("%s: get channel: %s: %w", op, config.Name, err)
// Prefetch count is applied per channel (global flag is false).
if err := chn.Qos(config.PrefetchCount, 0, false); err != nil {
return fmt.Errorf("%s: qos channel: %s: %w", op, config.Name, err)
if config.WorkerCount < 1 {
return fmt.Errorf("%s: insufficient worker count: %s-%d/0", op, config.Name, config.WorkerCount)
go s.consumerChannelListener(chn, consumer)
// Workers are numbered from 1 to WorkerCount.
for i := 1; i <= config.WorkerCount; i++ {
// Shadowed so each goroutine sees its own worker number.
i := i
go func() {
messages, err := chn.Consume(
// Builds a per-worker label in the form "name (i/total)".
fmt.Sprintf("%s (%d/%d)", config.Name, i, config.WorkerCount),
if err != nil {
Level: LevelError,
Message: fmt.Sprintf(
"%s: consume channel: %s-%d/%d: %v", op, config.Name, i, config.WorkerCount, err,
Level: LevelInfo,
Message: fmt.Sprintf("%s: run %s-%d/%d", op, config.Name, i, config.WorkerCount),
// Hands the delivery stream and the worker number to the consumer.
consumer.Consume(messages, i)
Level: LevelInfo,
Message: fmt.Sprintf("%s: stop %s-%d/%d", op, config.Name, i, config.WorkerCount),
return nil
// connect establishes a new connection based on the required schema.
func (s *Server) connect() error {
if s.config.Schema == "amqp" {
return s.connectAMQP()
return s.connectAMQPS()
// connectAMQP establishes a new AMQP connection only if there is not one at the
// application bootstrap.
// However, this method will also be called by the `connectionListener()` method
// behind the scene as many times as required when the connection goes down.
// Hence the reason why it is also responsible for rerunning the consumer
// workers. Otherwise, workers would not be up and running after reconnection.
// On success a new `connectionListener()` goroutine is started for the
// connection.
// NOTE(review): the matching `s.mutex.Lock()` call, the closing braces/parens,
// the `fmt.Sprintf` format strings and arguments, and the opening of the
// dial config literal are not present in this listing.
func (s *Server) connectAMQP() error {
defer s.mutex.Unlock()
var (
err error
url string
// Two URL forms are built, depending on whether credentials are configured.
if s.config.Username != "" || s.config.Password != "" {
url = fmt.Sprintf(
} else {
url = fmt.Sprintf(
s.conn, err = amqp.DialConfig(url,
// The connection name is passed to the broker as a client property.
Properties: amqp.Table{"connection_name": s.config.Name},
if err != nil {
return fmt.Errorf("connect amqp: %w", err)
// Restart the workers of the registered consumers on the new connection.
if err := s.runConsumerWorkers(s.consumers); err != nil {
return fmt.Errorf("connect amqp: %w", err)
go s.connectionListener()
return nil
// connectAMQPS establishes a new AMQPS connection only if there is not one at
// the application bootstrap.
// However, this method will also be called by the `connectionListener()` method
// behind the scene as many times as required when the connection goes down.
// Hence the reason why it is also responsible for rerunning the consumer
// workers. Otherwise, workers would not be up and running after reconnection.
// On success a new `connectionListener()` goroutine is started for the
// connection.
// NOTE(review): the matching `s.mutex.Lock()` call, the closing braces/parens,
// the statement that adds `CACert` to the new pool, the `fmt.Sprintf` format
// strings and arguments, and the opening of the dial config literal are not
// present in this listing.
func (s *Server) connectAMQPS() error {
defer s.mutex.Unlock()
tlsCnf := &tls.Config{}
// A dedicated root pool is created only when a CA certificate is configured.
if s.config.CACert != nil {
tlsCnf.RootCAs = x509.NewCertPool()
// The client certificate is attached only if the pair parses; a parse error
// is silently ignored here.
if cert, err := tls.X509KeyPair(s.config.ClientCert, s.config.ClientKey); err == nil {
tlsCnf.Certificates = append(tlsCnf.Certificates, cert)
var (
err error
url string
// Two URL forms are built, depending on whether credentials are configured.
if s.config.Username != "" || s.config.Password != "" {
url = fmt.Sprintf(
} else {
url = fmt.Sprintf(
s.conn, err = amqp.DialConfig(url,
// The connection name is passed to the broker as a client property.
Properties: amqp.Table{"connection_name": s.config.Name},
TLSClientConfig: tlsCnf,
if err != nil {
return fmt.Errorf("connect amqps: %w", err)
// Restart the workers of the registered consumers on the new connection.
if err := s.runConsumerWorkers(s.consumers); err != nil {
return fmt.Errorf("connect amqps: %w", err)
go s.connectionListener()
return nil
// connectionListener listens on the closed connection notifications and
// attempts to reestablish a new connection by calling the `connect()` method.
// However, if the connection was closed explicitly, nothing shall be done.
// Total reconnection attempts and intervals are configured within the
// `ConnectionConfig` struct. For the infinite attempts, the `ReconAttempt`
// option must be set to `0`.
// NOTE(review): the closing braces, the statements wrapping the
// `Level`/`Message` pairs (presumably `s.log` calls - confirm), the increment
// of `i` and the statements that leave the loop are not present in this
// listing.
func (s *Server) connectionListener() {
// Blocks until the connection is closed; a non-nil error is treated as an
// unexpected close.
err := <-s.conn.NotifyClose(make(chan *amqp.Error))
if err != nil {
op := "connection listener"
Level: LevelWarning,
Message: fmt.Sprintf("%s: closed: %v", op, err),
// One reconnection attempt is made per tick of the configured interval.
ticker := time.NewTicker(s.config.ReconInterval)
defer ticker.Stop()
// i counts the attempts made so far.
var i int
for range ticker.C {
Level: LevelDebug,
Message: fmt.Sprintf("%s: reconnection attempt: %d/%d", op, i, s.config.ReconAttempt),
if err := s.connect(); err == nil {
Level: LevelInfo,
Message: fmt.Sprintf("%s: reconnected: %d/%d", op, i, s.config.ReconAttempt),
// Reaching the configured maximum is reported as a failure.
if i == s.config.ReconAttempt {
Level: LevelError,
Message: fmt.Sprintf("%s: reconnection failed: %d/%d", op, i, s.config.ReconAttempt),
Level: LevelInfo,
Message: fmt.Sprintf("connection listener: explicetly closed the connection"),
// producerChannelListener listens on the closed reserved channel notifications
// and removes the channel from the pool.
// Once removed, the very first call to the `PublishOnReservedChannel()` method
// will help recreate a new channel.
func (s *Server) producerChannelListener(chn *amqp.Channel, producerName string) {
err := <-chn.NotifyClose(make(chan *amqp.Error))
if err != nil {
Level: LevelWarning,
Message: fmt.Sprintf("producer channel listener: closed: %s: %v", producerName, err),
} else {
Level: LevelWarning,
Message: fmt.Sprintf("producer channel listener: closed: %s", producerName),
delete(s.channels, producerName)
// consumerChannelListener listens on the closed consumer channel notifications
// and reruns the consumer workers with `runConsumerWorkers()` method. However,
// if the connection was closed explicitly, nothing shall be done.
func (s *Server) consumerChannelListener(chn *amqp.Channel, consumer Consumer) {
err := <-chn.NotifyClose(make(chan *amqp.Error))
if err != nil && err.Code == amqp.ConnectionForced {
if err != nil {
Level: LevelWarning,
Message: fmt.Sprintf("consumer channel listener: closed: %s: %v", consumer.Config().Name, err),
} else {
Level: LevelWarning,
Message: fmt.Sprintf("consumer channel listener: closed: %s", consumer.Config().Name),
if err := s.runConsumerWorkers([]Consumer{consumer}); err != nil {
Level: LevelError,
Message: fmt.Sprintf("consumer channel listener: %v", err),
// log sends log messages to the log channel if not nil.
func (s *Server) log(log Log) {
if s.logChan == nil {
s.logChan <- log
version: "3.4"

services:
  rabbitmq:
    image: "rabbitmq:3.8.3-management-alpine"
    ports:
      - "5672:5672"
      - "15672:15672"
Golang istemcisinde AMQP bağlantısını bu şekilde kullanacaksınız.
// Plain AMQP usage: connects to localhost:5672 under the connection name
// "striker", with a one second reconnection interval and at most 300
// reconnection attempts. No TLS material is supplied.
// NOTE(review): the end of this call (closing of the struct literal and the
// second `logChan` argument of NewServer) is not present in this listing.
srv, err := rabbitmq.NewServer(rabbitmq.ConnectionConfig{
Name: "striker",
Schema: "amqp",
Host: "localhost",
Port: "5672",
ReconAttempt: 300,
ReconInterval: time.Second,
CACert: nil,
ClientCert: nil,
ClientKey: nil,
Sertifikalarınızı oluşturmak için "Kendinden imzalı SSL sertifikaları ile Golang'dan RabbitMQ sunucusuna bağlanma" başlıklı blog yazısını okuyun.
version: "3.4"

services:
  rabbitmq:
    image: "rabbitmq:3.8.3-management-alpine"
    ports:
      - "5671:5671"
      - "15671:15671"
    environment:
      RABBITMQ_SSL_CACERTFILE: "/cert/ca_certificate.pem"
      RABBITMQ_SSL_CERTFILE: "/cert/server_certificate.pem"
      RABBITMQ_SSL_KEYFILE: "/cert/server_key.pem"
      RABBITMQ_SSL_VERIFY: "verify_peer"
    volumes:
      - "./cert:/cert"
Golang istemcisinde AMQPS bağlantısını bu şekilde kullanacaksınız.
// AMQPS usage: the CA certificate, client certificate and client key are read
// from the ./cert directory and handed to the connection config; the schema
// is "amqps" and the port is 5671.
// NOTE(review): the bodies of the `if err != nil` checks and the end of the
// NewServer call are not present in this listing.
// NOTE(review): `ioutil.ReadFile` is deprecated since Go 1.16; `os.ReadFile`
// is the drop-in replacement.
caCert, err := ioutil.ReadFile("./cert/ca_certificate.pem")
if err != nil {
clientCert, err := ioutil.ReadFile("./cert/client_certificate.pem")
if err != nil {
clientKey, err := ioutil.ReadFile("./cert/client_key.pem")
if err != nil {
srv, err := rabbitmq.NewServer(rabbitmq.ConnectionConfig{
Name: "striker",
Schema: "amqps",
Host: "localhost",
Port: "5671",
ReconAttempt: 300,
ReconInterval: time.Second,
CACert: caCert,
ClientCert: clientCert,
ClientKey: clientKey,
Kanalları nasıl kullanmak istediğinize bağlı olarak, üreticileri kullanırken yalnızca bir satır değişir. Bunun dışında üretici kodunun tüm üreticiler için her zaman aynı olduğunu söylemek yanlış olmaz.
package producer
import (
type Morientes struct {
rabbitmq *rabbitmq.Server
config rabbitmq.ProducerConfig
func NewMorientes(rabbitmq *rabbitmq.Server, config rabbitmq.ProducerConfig) Morientes {
return Morientes{rabbitmq: rabbitmq, config: config}
func (m Morientes) Produce(messageID string, message []byte, data interface{}) error {
// if err := m.rabbitmq.PublishOnReservedChannel(amqp.Publishing{
if err := m.rabbitmq.PublishOnNewChannel(amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
ContentEncoding: "utf-8",
MessageId: messageID,
Body: message,
}, m.config); err != nil {
return err
return nil
Üretici uygulamasının kurulumu şu şekilde yapılır:
package main
import (
// main wires up the "striker" producer application: it declares the "penalty"
// direct exchange, starts a goroutine that prints package logs, connects to
// RabbitMQ and creates the "morientes" producer.
// NOTE(review): the closing braces, the bodies of the error checks and the
// rest of the function (marked with "....") are not present in this listing.
func main() {
log.Println("running striker ...")
// RabbitMQ ----------------------------------------------------------------
exchanges := []rabbitmq.ExchangeConfig{
Name: "penalty",
Type: "direct",
// Unbuffered log channel; it is drained by the goroutine below.
logChan := make(chan rabbitmq.Log)
go func() {
log.Println("watching logs ...")
for l := range logChan {
log.Printf("%+v\n", l)
srv, err := rabbitmq.NewServer(rabbitmq.ConnectionConfig{
Name: "striker",
Schema: "amqp",
Username: "",
Password: "",
Host: "localhost",
Port: "5672",
VHost: "",
ReconAttempt: 300,
ReconInterval: time.Second,
CACert: nil,
ClientCert: nil,
ClientKey: nil,
}, logChan)
if err != nil {
defer srv.Shutdown()
// Producers only need the exchange; no queues are declared here.
if err := srv.Setup(exchanges, nil); err != nil {
// Producers ---------------------------------------------------------------
morientes := producer.NewMorientes(srv, rabbitmq.ProducerConfig{
Name: "morientes",
ExchangeName: "penalty",
RoutingKey: "spain",
// ....
Tüketici kodu, tüm tüketiciler için her zaman aynıdır.
package consumer
import (
type Casillas struct {
config rabbitmq.ConsumerConfig
func NewCasillas(config rabbitmq.ConsumerConfig) Casillas {
return Casillas{config: config}
func (c Casillas) Config() rabbitmq.ConsumerConfig {
return c.config
func (c Casillas) Consume(messages <-chan amqp.Delivery, id int) {
for message := range messages {
// Do the work ...
log.Printf("[%d] %s consumed: %s\n", id, c.config.Name, string(message.Body))
if err := message.Ack(false); err != nil {
log.Printf("consume: ack message: %v\n", err)
Tüketici uygulamasının kurulumu şu şekilde yapılır:
package main
import (
// main wires up the "keeper" consumer application: it creates the "casillas"
// consumer, declares the "penalty" direct exchange and the "casillas" lazy
// queue bound with the "spain" key, starts a goroutine that prints package
// logs, connects to RabbitMQ, registers the consumer and then blocks forever.
// NOTE(review): the closing braces and the bodies of the error checks are not
// present in this listing.
func main() {
log.Println("running keeper ...")
// Consumers ---------------------------------------------------------------
casillas := consumer.NewCasillas(rabbitmq.ConsumerConfig{
Name: "casillas",
WorkerCount: 1,
PrefetchCount: 3,
// RabbitMQ ----------------------------------------------------------------
exchanges := []rabbitmq.ExchangeConfig{
Name: "penalty",
Type: "direct",
queues := []rabbitmq.QueueConfig{
Name: "casillas",
Binding: "spain",
Exchange: "penalty",
Mode: "lazy",
// Unbuffered log channel; it is drained by the goroutine below.
logChan := make(chan rabbitmq.Log)
go func() {
log.Println("watching logs ...")
for l := range logChan {
log.Printf("%+v\n", l)
srv, err := rabbitmq.NewServer(rabbitmq.ConnectionConfig{
Name: "keeper",
Schema: "amqp",
Username: "",
Password: "",
Host: "localhost",
Port: "5672",
VHost: "",
ReconAttempt: 300,
ReconInterval: time.Second,
CACert: nil,
ClientCert: nil,
ClientKey: nil,
}, logChan)
if err != nil {
defer srv.Shutdown()
if err := srv.Setup(exchanges, queues); err != nil {
if err := srv.RegisterConsumers([]rabbitmq.Consumer{casillas}); err != nil {
// Keeps the process alive while the worker goroutines consume.
select {}