Derinlemesine yazılım eğitimleri için kanalımı takip edebilirsiniz...

RabbitMQ – Fanout Exchange

Merhaba,

RabbitMQ kendisine gönderilen mesajları direkt olarak oluşturulan kuyruğa eklemekte ve consumerlar ise bu mesajları sırasıyla tüketmektedir. Süreç bu şekilde ilerlerken birden fazla consumerın olduğu durumlarda mesaj tüketiminin nasıl bir algoritma tarafından işleneceğini ve hangi consumerın ne kadar mesajı işleyeceğini bildirmemiz gerekmektedir. İşte bu durumda devreye exchangeler girmekte ve türüne göre exchange ile kuyruğa gönderilen mesajlar türün getirdiği işleme mantığına göre tüketicilere gönderilerek işletilmektedir. Bizler bu içeriğimizde ilk olarak tüm consumerlara mesajların iletilmesini sağlayan Fanout Exchange yapılanmasını inceleyeceğiz.

Exchange Nedir?

Bu içeriğimiz exchangelerle ilgili ilk makalemiz olacağından dolayı bu soruyu cevaplandırarak konuya girmek istiyorum 🙂
RabbitMQ Nedir?
Yukarıdaki şemayı incelerseniz eğer RabbitMQ servislerine gönderilen mesajlar exchange tarafından karşılanmakta ve exchange’in türüne göre ilgili kuyruğa iletilerek, dahil edilmekte ve bu kuyruğu takip eden tüm consumerlar tarafından mesajlar tüketilmekte bir başka deyişle işlenmektedir..

Burada consumerlar tarafından kuyrukların takip edilmesi durumunu, mesajı yayınlayan(publisher) bu yayınlara abone/subscribe olan consumerların olmasından dolayı Publish/Subscribe Pattern olarak ifade etmekteyiz.

Fanout Exchange İşleyişi Nasıldır?

v
Fanout exchange yukarıdaki görselden de anlaşılacağı gibi aldığı mesajı direkt olarak tüm kuyruklara dağıtmakta ve böylece kuyrukları dinleyen tüm consumerlara işletmektedir.
Genellikle aşağıdaki durumlara benzer senaryolarda tercih edilebilir;

  • Güncel skorların tüm oyunculara bildirilmesi,
  • Hava durumu bilgisinin tüm kanallarda yayınlanması,
  • Tüm birimlere %n’lik bir ikramiyede bulunulması

vs…
Yani anlayacağınız üzere umumi bir operasyon yahut bildirim söz konusuysa Fanout Exchange kullanılabilir.

Kullanımı

Publisher

Artık bir exchange kullanılacağından dolayı publisher tarafında bir kuyruk oluşturmak yerine exchange oluşturulmalı ve ardından mesajlar ilgili exchange’e gönderilmelidir.

        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.Uri = new Uri("amqp://hkhjerrt:hcqiavAqll6-co4abXnSqUBh_hHifz-Z@hornet.rmq.cloudamqp.com/hkhjerrt");

            using (IConnection connection = factory.CreateConnection())
            using (IModel channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("iskuyrugu", type: ExchangeType.Fanout);
                for (int i = 1; i <= 100; i++)
                {
                    byte[] bytemessage = Encoding.UTF8.GetBytes($"is - {i}");

                    IBasicProperties properties = channel.CreateBasicProperties();
                    properties.Persistent = true;

                    channel.BasicPublish(exchange: "iskuyrugu", routingKey: "", basicProperties: properties, body: bytemessage);
                }
            }
        }

Yukarıdaki publisher kaynağını temsil eden kod bloğuna göz atarsanız eğer 9. satırda “ExchangeDeclare” metodu ile bir exchange oluşturulmuş ve ardından ‘type’ parametresine verilen ‘ExchangeType.Fanout’ komutuylada türü belirtilmiştir. Ayrıca 17. satırda ise oluşturulan exchange’in “BasicPublish” metoduna belirtildiğinide gözden kaçırmayınız…

Consumer

Exchange kullanmadığımız önceki makalelerimizde manuel bir queue declare ediyor ve ardından tüm mesajlarımızı o kuyruğa gönderiyorduk. Bu içeriğimizde biraz önce(yukarıda) exchange oluşturduk ve o exchange üzerinden kuyruğa mesajlarımızı iletileceğini belirttik. Lakin sizden hoca kuyruk oluşturmadın ki mesajlar iletilsin! şeklinde bir duyum aldım gibiyim… Evet, exchange declare edildikten sonra bir kuyruk oluşturmadık çünkü ne zaman ki bir consumer herhangi bir exchange üzerinden bağlantı sağlayarak mesajlarını iletir o zaman otomatik bir queue oluşturulacak ve ona bağlanacaktır. İşte bu yüzden ekstradan bir kuyruk oluşturma gereği duymamış bulunmaktayım.

Bir exchange’e birden fazla consumer bağlanıp ayağa kalkarsa eğer bu consumerların her biri için ayrı ayrı kuyruk oluşturulacaktır.

Aşağıdaki örnek consumer kodunu ele alırsak eğer;

        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.Uri = new Uri("amqp://hkhjerrt:hcqiavAqll6-co4abXnSqUBh_hHifz-Z@hornet.rmq.cloudamqp.com/hkhjerrt");

            using (IConnection connection = factory.CreateConnection())
            using (IModel channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("iskuyrugu", type: ExchangeType.Fanout);

                #region Her Consumer İçin Oluşturulacak Kuyruklara Random İsim Oluşturma
                string queueName = channel.QueueDeclare().QueueName; //Random isim oluşturuyoruz.
                channel.QueueBind(queue: queueName, exchange: "iskuyrugu", routingKey: ""); //Oluşturulan Random ismi ilgili exchange'e bind ediyoruz.
                #endregion

                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                channel.BasicConsume(queueName, false, consumer);
                consumer.Received += (sender, e) =>
                {
                    Thread.Sleep(int.Parse(args[0]));
                    Console.WriteLine(Encoding.UTF8.GetString(e.Body) + " alındı");
                    channel.BasicAck(e.DeliveryTag, false);
                };
                Console.Read();
            }

9. satırda publisher’da ki ile birebir aynı bir exchange declare etmiş bulunmaktayız. 1114. satırlar arasında her consumer için oluşturulacak queue’nin adını random oluşturabilmek için bir isim üretmekteyiz ve bu ismi ilgili declare edilen exchange ile bind etmekteyiz. 19. satırda ise artık consumer’ın doğru kuyruğu tüketmesi için “BasicConsume” metodunda random oluşturulan kuyruk ismini vermekteyiz.

Test

Yapılan çalışmaları derleyip çalıştırdıktan sonra hornet.rmq.cloudamqp.com adresindeki cloud servise göz attığımızda ilgili exchange’in oluşturulduğunu görmekteyiz.
RabbitMQ – Fanout Exchange

v
Evet… Görüldüğü üzere RabbitMQ – Fanout Exchange ile abone olan tüm consumerların kuyruklarına eşit şekilde mesajlarımız iletilmekte ve biryandan da consumerlar tarafından tüketilmektedir.

İlgilenenlerin faydalanması dileğiyle…
Sonraki yazılarımda görüşmek üzere…
İyi çalışmalar…

Not : Örnek uygulamayı indirmek için buraya tıklayınız.

Bunlar da hoşunuza gidebilir...

3 Cevaplar

  1. Mesut dedi ki:

    Hocam şu fanout olayında bir türlü kuyruk olayını cozememistim, tek açıklamanız yetti. Teşekkürler

  1. 01 Mart 2020

    […] RabbitMQ – Fanout Exchange […]

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

*