İnternet tabanlı uygulamalar ile mesaj sıralama sistemlerinin bir arada kullanılmalarının tüm amacı, kullanıcıların yaşayacakları bekleme sürelerini ortadan kaldırmaktır. Beklemeler süresi kısa olan kullanıcı HTTP isteklerinin sonuçlarının beklendiği durumlarda oluşur. Örneğin; email gönderme, resimleri manipüle etmek (kesmek, küçültmek vs), CSV dosyalarını işlemek, PDF dosyalarını işlemek vb. işlemler. Kullanıcı yoğunluğu yaşayan siteler bunlara benzer zaman isteyen ve fazla memory kullanan işlemleri anlık olarak yapamazlar, bu nedenle işlemleri daha sonra yapmak için RabbitMQ gibi mesaj sıralama sistemlerini kullanırlar. Bu örneğimizde RabbitMQ'ye odaklanacağız. Daha fazla bilgi için RabbitMQ sitesini ziyaret edin ve mutlaka AMQP Model'i okuyun.


Consumer notları



Faydalı komutlar


# Start server
$ rabbitmq-server

# Check status
$ rabbitmqctl status

# List queues
$ rabbitmqctl list_queue

# Check if messages unacknowledged turned on or off
$ rabbitmqctl list_queues name messages_ready messages_unacknowledged

# List all available exchanges
$ rabbitmqctl list_exchanges

# List all bindings
$ rabbitmqctl list_bindings

# Delete all queues in one go
$ rabbitmqctl stop_app
$ rabbitmqctl reset
$ rabbitmqctl start_app

Sıkça sorulan sorular



RabbitMQ nedir?


RabbitMQ mesajları producerlerden alıp, consumerlere ileten bir mesaj işleme sistemidir. Bu iki işlem arasında, uygulama geliştiricisi tarafından belirlenen kurallara göre, mesajlar bir yerde tutulabilir veya sadece belirlenen sıralara yönlendirilirler.



Örnek: Sistemimizde 5 tane Producer, 5 Queue ve 5 Consumer olduğunu varsayalım. Bir tane resmin boyutlarını değiştirmek istiyoruz. P1 mesajı E'ye gönderir. E mesajı Q1'e iletir. C1 mesajı alır ve boyutları değiştirir. Neden sadece C1? Çünkü biri Exchange içinde binding rule tanımlamış, o nedenle! Eğer binding rule farklı bir şekilde tanımlansaydı, diğer consumerler'de mesajları alabilirlerdi.



RabbitMQ'nün mesajlaşma modelinin ana fikrinde, producerler mesajları direkt olarak sılaralara göndermemeliler. Onun yerine, mesajlar önce exchange'lere iletilmeli ve daha sonra da, uygulama geliştiricisinin belirlediği kurallara uygun olan sıralara gönderilmeliler.


Notlar



Work Queues (İşlem Sıraları)


Sıralama sistemi, kullanıcı tarafından yaratılan ve çok sistem kaynağı gerektiren HTTP isteklerinin, anında işlenmesini engellemek için kullanılır. Bu basit senaryo yerine, isteğin bir kısmını anında yapıp, cevabı kullanıcıya veririz ama, çok sistem kaynağı gerektiren kısmını mesaja dönüştürüp sıraya koyarız. Bu mesaj daha sonradan bir consumer tarafından işlenir. Bu kadar basit!


Round-robin Dispatching


Kısa anlamı ile, işleri consumerler arasında paralel bir şekilde dağıtmaktır. Eğer aynı anda birden fazla consumer çalıştırırsanız, averaj olarak hepsi aynı sayıda mesaj alırlar. Bu RabbitMQ'nun varsayılan özelliğidir.


Örnek: Elimizde 5 mesaj ve 2 consumer var. Consumer 1 mesaj 1, 3 ve 5'i alır. Consumer 2 ise mesaj 2 ve 4'ü alır. Eğer mesaj 6 oluşturulursa, o'da consumer 2'ye gider.


Message Acknowledgement (Mesaj geri bildirimi)


Herhangi bir nedenden dolayı, mesaj işleme sırasında elimizdeki consumer durabilir ve de biz o mesajı kaybedebiliriz. Bu gibi durumlarda, o mesajları sıralara geri koyup, çalışan diğer consumerler tarafından tekrar işlenmelerini sağlamk daha iyi olurdu.


RabbitMQ mesaj kaybını önlemek için "message acknowledgments" yöntemini kullanır. Bir consumer herhangi bir mesajı başarılı bir biçimde işlediğinde, RabbitMQ'ya olumlu bilgi verir, RabbitMQ'da bu mesajı siler. Eğer olumlu bilgi verme işlemi gerçekleşmez ise, RabbitMQ birşeylerin yolunda gitmediğini anlar ve o mesajı tekrardan sıraya koyar. Daha sonra aktif olan bir consumer o mesajı tekrar alır ve işler.


Message acknowledgment varsayılan olarak aktif halde değildir, bu nedenle kendiniz aktif hale getirmek zorundasınız.


Message Durability (Mesaj devamlılığı)


Herhangi bir nedenden dolayı eğer RabbitMQ sunucusu durursa, mesaj ve sıralar ile ilgili olan bilgileri unutulur. Bu başımıza gelmesini istemediğimiz bir senaryo ama bu gibi durumlarla başa çıkmak için mesaj ve sıraları "durable" olarak tanımlayabiliriz. Mesaj ve sıralar varsayılan olarak "durable" olarak tanımlanmazlar, bu nedenle tanımlamayı siz yapmak zorundasınız.


Öncelikle sırayı "durable" olarak tanımlamalısınız ama aklınızda bulunsun, eğer sıra zaten mevcut ise, onu "durable" olarak değiştiremezsiniz, bu nedenle yeni bir tane sıra tanımlamalısınız. Durable tanımlaması producer ve consumer kodlarında yapılır. Daha sonra mesajı delivery_mode = 2 ayarı ile "persistent" olarak ayarlamalısınız.


Persistent mesajlar hakkında not: Bu ayar RabbitMQ'ye mesajı önce diske kaydetmesini söyler ama bazen mesajlar diske kaydedilmeden önce işlenebilirler, bu nedenle mesajların kaybolma ihtimali halen vardır. Eğer bu ihtimali tamamen ortadan kaldırmak istiyorsanız, publisher confirms (Publisher Acknowledgements) kullanın.


Fair Dispatch (Adil sevk)


Round-robin dispatching özelliğinin mesajları consumerler arasında eşit olarak paylaştırdığını biliyoruz ama şöyle bir senaryo düşünelim. Consumer 1 mesaj 1, 3 ve 5'i işler. Consumer 2 mesaj 2 ve 4'ü işler. Şans olsa gerek, mesaj 1, 3 ve 5 çok az efor isteyen işler iken, mesaj 2 ve 4 ise tam tersi, yani çok efor gerektiren işlerdir, bu nedenle consumer 2'ye biraz acımamız gerekir.


Bu haksızlığı gidermek için, RabbitMQ'ye bir mesajın işlenmesi bitmeden, kesinlikle ikinci mesajı consumerlere vermemesini söyleyebiliriz. Yani bir consumer işlemini bitirmeden diğerlerine beklemeleri gerektiğini söyler. Bu işlemi basic_qos() methodunda prefetch_count = 1 ayarı yaparak aktif hale getiririz.


Publishers/Subscribers


Mesaj sıralama mantığına göre, bir mesaj işlenmesi için sadece bir consumere verilir. Bir mesajı birden fazla consumere verme işlemine "publish/subscribe" denir.



Exchange


Exchange publisher ve sıralar arasında bulunur ve tanımlanan kurallara göre, mesajı producerden alıp uygun olan sıraya yerleştirir. Kurallar exchange modellerine göre değişir. Mesajlar sadece bir veya birden fazla sıraya verilir. Verilecek sıra yoksa, silinirler. Kısa anlamı ile, exchange ona mesajlarla ilgili ne derseniz onu yapar.


Dört çeşit exchange vardır: direct, topic, headers ve fanout.


Fanout Exchange


Gelen mesajları keyfine göre sıralara verir, yani kural tanımazdır. Elinizde ona dikte etme yetkiniz yoktur. Mesajlar sıralara kullanıcı tarafından isimleri routing_key ile tanımlandıkları zaman iletilirler ama bu fanout exchange için geçerli değildir.


Binding


Binding, exchange ve sıra arasındaki ilişkinin tanımıdır. Exchange, tanımlanan binding kuralını takip ederek mesajı uygun olan sıraya verir. Bu yöntem binding_key ile gerçekleştirilir ama fanout exchange modelinin bu özelliği yoktur.


Routing


Routing, binding key kullanıp mesajları filtre eder ve exchange'den aldıkları mesajları kurallara uyan sıralara verir, ama fanout exchange bunun dışındadır. Binding key queue_bind methodunun üçüncü parametresidir.


Direct Exchange


Mesajlar, mesajın "binding key" değeri ile sıranın "routing key" değerleri arasında eşitliğin olduğu sıraya verilir. Elinizde hangi mesajın hangi sıraya gideceğini belirleme seçeneği vardır. Eğer direct exchange kullanırken binding_key ve routing_key belirlenmemiş ise, direct exchange fanout gibi hareket edip, mesajları ilişkide olduğu tüm sıralara gönderecektir.



Yukarıdaki direct exchange örneğinde, kendisine bağlı olan iki tane sıra vardır. Q1 jpg binding key değeri ile, Q2 ise gif ve png binding key değerleri ile bağlıdır. Routing key değeri jpg olan mesaj önce Q1'e gider ve oradan da C1'e gider. Routing key değerleri gif ve png olan mesajlar ise önce Q2'e giderler ve oradan da C2'ye giderler.


Multiple Bindings


Aynı binding key değeri kullanılarak, mesajlar bir exchange'den birden fazla sıraya verilebilirler. Aşağıda da gördüğümüz gibi routing key değeri jpg olan mesajlar Q1 ve Q2'ye iletilirler.



Topic Exchange


Direct exchange'de olduğu gibi mesajlar, mesajın "binding key" değeri ile sıranın "routing key" değerleri arasında eşitliğin olduğu sıraya verilir. Aralarındaki fark, binding key değerleri birden fazla kelimeden oluşur ve nokta işareti ile ayrılırlar. Ayrıca * ve # simgeleri içerirler. Yıldız tam olarak bir kelime, hash ise sıfır veya birden fazla kelime anlamına gelir. Binding key değerinin uzunluğu 255 byte ile sınırlıdır. Örnek: create.pdf, delete.csv, uefa.europa.league, *.europa.*, champion.# vs.



Yukarıda gösterilen binding key değerlerine göre sonuç şu olur:



Topic exchange kullanma durumunda, eğer binding key sadece # işaretinden oluşuyorsa, topic exchange fanout exchange gibi hareket edip, mesajları sıralara keyfiyen yönlendirir. Eğer binding key içinde herhangi bir işaret yok ise, topic exchange direct exchange gibi hareket edip, mesajları uygun olan sıralara yönlendirir.


RPC


Eğer client sunucudan mesaj işleminin sonucunun ne olduğuna dair cevap ister ise, Remote procedure call (RPC) yöntemi kullanılır. Sunucunun cevap alabilmek için, client "callback queue" adresini isteğin parçası olarak göndermelidir. RPC işlemlerinde mesaj özelliği olarak, reply_to (callback queue'nin ismi) ve correlation_id (RPC isteğinin ve sunucu cevabının bağıdır) kullanılır.


Atanan correlation_id değeri her istek için eşsiz olmalıdır. Sunucudan "callback queue" ile cevabı aldığımız zaman, geri gelen correlation_id değerinin isteğimize uyup uymadığını kontrol ederiz. Bazı durumlarda kopya cevaplar alabiliriz, bu nedenle kopyayı gerektiği şekilde işlemek size kalmış bir şey.



RPC şu şekilde çalışır:


  1. Client isteği başlattığında anonim bir callback queue yaratır.

  2. Client mesajı reply_to ve correlation_id bilgileri ile birlikte gönderir.

  3. Mesaj RPC sırasına gönderilir.

  4. RPC worker isteği yakalayıp sonlandırır ve cevabı reply_to alanında tanımlanan sıraya verir.

  5. Client cevabı yakalar ve correlation_id değerini kontrol eder. Eğer eşitlik var ise gerekeni yapar, aksi taktirde önemsenmez.