Почему публикация на разветвленной бирже и публикация на прямой обмен?

2

Я пытаюсь настроить RabbitMQ, где я могу публиковать сообщения в сервисах как в виде разветвления, так и напрямую. Тем не менее, когда я публикую публикацию на обмене фанатов, я вижу сообщение, доставленное всем службам, но также и в режиме круговой проверки. Следовательно, одна из служб всегда видит одно и то же сообщение дважды.

Вот полное воспроизведение:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace rabbitmq_exchanges_repro
{
    class Program
    {
        static void Main(string[] args)
        {
            var hostName = "localhost";
            var factory = new ConnectionFactory
            {
                AutomaticRecoveryEnabled = true,
                HostName = hostName,
            };

            var connection = factory.CreateConnection();
            var model = connection.CreateModel();

            var serviceName = "service1";

            // This queue is for round-robin messages distributed to instances of the service with the specified service name.
            var directExchangeName = $"{serviceName}-direct";
            model.QueueDeclare(
                serviceName,
                durable: true,
                exclusive: false,
                autoDelete: false);
            model.ExchangeDeclare(
                exchange: directExchangeName,
                type: "direct",
                durable: true,
                autoDelete: false);
            model.QueueBind(
                queue: serviceName,
                exchange: directExchangeName,
                routingKey: string.Empty);

            // This is for fanout messages distributed to all services with the specified service name.
            var fanoutExchangeName = $"{serviceName}-fanout";
            model.ExchangeDeclare(
                exchange: fanoutExchangeName,
                type: "fanout",
                durable: true,
                autoDelete: false);
            var fanoutQueueName = model
                .QueueDeclare()
                .QueueName;
            model.QueueBind(
                queue: fanoutQueueName,
                exchange: fanoutExchangeName,
                routingKey: string.Empty);

            var directConsumer = new EventingBasicConsumer(model);
            var fanoutConsumer = new EventingBasicConsumer(model);
            var workItemConsumerTag = model.BasicConsume(
                queue: serviceName,
                autoAck: true,
                consumer: directConsumer);
            var fanoutConsumerTag = model.BasicConsume(
                queue: fanoutQueueName,
                autoAck: true,
                consumer: fanoutConsumer);

            directConsumer.Received += (o, e) =>
            {
                Console.WriteLine("Received message (direct)");
            };
            fanoutConsumer.Received += (o, e) =>
            {
                Console.WriteLine("Received message (fanout)");
            };

            Console.WriteLine("[P]ublish");
            Console.WriteLine("E[x]it");
            var exit = false;

            while (!exit)
            {
                var key = Console.ReadKey();

                switch (key.Key)
                {
                    case ConsoleKey.P:
                        model
                            .BasicPublish(
                                exchange: fanoutExchangeName,
                                routingKey: string.Empty,
                                body: new byte[] { 1, 2, 3 });
                        break;
                    case ConsoleKey.X:
                        exit = true;
                        break;
                }
            }

            model.BasicCancel(workItemConsumerTag);
            model.BasicCancel(fanoutConsumerTag);

            model.Close();
            model.Dispose();

            connection.Close();
            connection.Dispose();
        }
    }
}

Запустите приведенный выше код в двух отдельных окнах консоли. Если вы нажмете P в одном окне, вы увидите, что один экземпляр выводит то, что я ожидал:

Received message (fanout)

Но другое окно выводит это:

Received message (fanout)
Received message (direct)

И это несмотря на то, что в вызове PublishBasic указывается имя разветвленного обмена. Что здесь происходит? Как я могу убедиться, что прямой обмен не вовлечен в этот случай?

  • 0
    Я не могу воспроизвести, используя RabbitMQ 3.7.14 и ваш код. Я получаю только сообщение "Received message (fanout)" в каждом окне терминала. Может быть, в RabbitMQ есть старые привязки? Не могли бы вы сбросить свой экземпляр и повторить попытку?
  • 0
    @LukeBakken Действительно, я удалил свой докер-контейнер и создал заново, и теперь он работает. Если вы добавите это как ответ, я с радостью приму. Спасибо!
Теги:
.net-core
rabbitmq

1 ответ

0

Я не могу воспроизвести, используя RabbitMQ 3.7.14 и ваш код. Я получаю только сообщение "Received message (fanout)" в каждом окне терминала. Может быть, в RabbitMQ есть старые привязки? Вы должны сбросить свой экземпляр и повторить попытку.


ПРИМЕЧАНИЕ: команда RabbitMQ отслеживает список рассылки rabbitmq-users и только иногда отвечает на вопросы о StackOverflow.

Ещё вопросы

Сообщество Overcoder
Наверх
Меню