Я пытаюсь настроить RabbitMQ, где я могу публиковать сообщения в сервисах как в виде разветвления, так и напрямую. Тем не менее, когда я публикую публикацию на обмене фанатов, я вижу сообщение, доставленное всем службам, но также и в режиме круговой проверки. Следовательно, одна из служб всегда видит одно и то же сообщение дважды.
Вот полное воспроизведение:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace rabbitmq_exchanges_repro
{
class Program
{
static void Main(string[] args)
{
var hostName = "localhost";
var factory = new ConnectionFactory
{
AutomaticRecoveryEnabled = true,
HostName = hostName,
};
var connection = factory.CreateConnection();
var model = connection.CreateModel();
var serviceName = "service1";
// This queue is for round-robin messages distributed to instances of the service with the specified service name.
var directExchangeName = $"{serviceName}-direct";
model.QueueDeclare(
serviceName,
durable: true,
exclusive: false,
autoDelete: false);
model.ExchangeDeclare(
exchange: directExchangeName,
type: "direct",
durable: true,
autoDelete: false);
model.QueueBind(
queue: serviceName,
exchange: directExchangeName,
routingKey: string.Empty);
// This is for fanout messages distributed to all services with the specified service name.
var fanoutExchangeName = $"{serviceName}-fanout";
model.ExchangeDeclare(
exchange: fanoutExchangeName,
type: "fanout",
durable: true,
autoDelete: false);
var fanoutQueueName = model
.QueueDeclare()
.QueueName;
model.QueueBind(
queue: fanoutQueueName,
exchange: fanoutExchangeName,
routingKey: string.Empty);
var directConsumer = new EventingBasicConsumer(model);
var fanoutConsumer = new EventingBasicConsumer(model);
var workItemConsumerTag = model.BasicConsume(
queue: serviceName,
autoAck: true,
consumer: directConsumer);
var fanoutConsumerTag = model.BasicConsume(
queue: fanoutQueueName,
autoAck: true,
consumer: fanoutConsumer);
directConsumer.Received += (o, e) =>
{
Console.WriteLine("Received message (direct)");
};
fanoutConsumer.Received += (o, e) =>
{
Console.WriteLine("Received message (fanout)");
};
Console.WriteLine("[P]ublish");
Console.WriteLine("E[x]it");
var exit = false;
while (!exit)
{
var key = Console.ReadKey();
switch (key.Key)
{
case ConsoleKey.P:
model
.BasicPublish(
exchange: fanoutExchangeName,
routingKey: string.Empty,
body: new byte[] { 1, 2, 3 });
break;
case ConsoleKey.X:
exit = true;
break;
}
}
model.BasicCancel(workItemConsumerTag);
model.BasicCancel(fanoutConsumerTag);
model.Close();
model.Dispose();
connection.Close();
connection.Dispose();
}
}
}
Запустите приведенный выше код в двух отдельных окнах консоли. Если вы нажмете P
в одном окне, вы увидите, что один экземпляр выводит то, что я ожидал:
Received message (fanout)
Но другое окно выводит это:
Received message (fanout)
Received message (direct)
И это несмотря на то, что в вызове PublishBasic
указывается имя разветвленного обмена. Что здесь происходит? Как я могу убедиться, что прямой обмен не вовлечен в этот случай?
Я не могу воспроизвести, используя RabbitMQ 3.7.14 и ваш код. Я получаю только сообщение "Received message (fanout)" в каждом окне терминала. Может быть, в RabbitMQ есть старые привязки? Вы должны сбросить свой экземпляр и повторить попытку.
ПРИМЕЧАНИЕ: команда RabbitMQ отслеживает список рассылки rabbitmq-users
и только иногда отвечает на вопросы о StackOverflow.