RabbitMQ 是一个消息中间件:它接收和转发消息。
生产 无非就是发送。发送消息的程序是生产者;
队列 尽管消息流经 RabbitMQ
和您的应用程序,但它们只能存储在队列中。一个队列仅由主机的存储器&磁盘限制约束,它本质上是一个大的消息缓冲器。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这是我们表示队列的方式;
消费与接收具有相似的含义。一个消费者是一个程序,主要是等待接收信息;
Hello,World
https://www.nuget.org/packages/RabbitMQ.Client
RabbitMQ.Client
RabbitMQ 的 默认端口为:5672 ,默认的账号密码为 guest
生产者发送:
using System.Text;
using RabbitMQ.Client;
// 创建与服务器的连接
// 创建一个ConnectionFactory对象
var factory = new ConnectionFactory { HostName = "localhost" };
// 使用ConnectionFactory对象创建一个Connection,使用using语句将连接包装在一个作用域中。这样可以确保连接在使用完毕后会被正确关闭和释放资源。
using var connection = factory.CreateConnection();
// 使用Connection对象创建一个通道channel,这是大多数用于完成工作的 API 所在的位置
using var channel = connection.CreateModel();
// 必须使用通道channel对象调用QueueDeclare方法来 声明一个队列
channel.QueueDeclare(queue: "hello", // 队列的名称为"hello"
durable: false, // 队列是否持久化,这里false表示队列不持久化
exclusive: false, // 队列是否为当前连接的专用队列
autoDelete: false, // 队列在所有消费者断开连接后是否自动删除,false表示不会自动删除
arguments: null); // 指定队列的其他属性
// 要传递的消息
var message = "Hello World!";
// 消息内容是一个字节数组
var body = Encoding.UTF8.GetBytes(message);
// 向队列发布一条消息
channel.BasicPublish(exchange: string.Empty,
routingKey: "hello",
basicProperties: null,
body: body);
Console.WriteLine($" [x] Sent {message}");
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
// 当上面的代码完成运行时,通道channel和连接connection将被释放。
声明队列是幂等的 - 仅当队列尚不存在时才会创建队列。
消费者接收:
消费者正在监听来自 RabbitMQ 的消息。因此,与发布单个消息的发布者不同,我们将让消费者持续运行以侦听消息并将其打印出来。
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
// 与发布者相同,打开一个连接和一个通道
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// 注意:我们在此处也声明了队列,这是由于我们可能会在生产者之前启动消费者,因此在我们试图从中消费消息之前,要确定队列是存在的。
// 声明我们要从中使用的队列。请注意,这与“发送”发布到的队列匹配
channel.QueueDeclare(queue: "hello", // 队列的名称
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
Console.WriteLine(" [x] 等待消息.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Received {message}");
};
// 使用BasicConsume方法来订阅名为"hello"的队列,并将消息传递给之前创建的EventingBasicConsumer对象。
channel.BasicConsume(queue: "hello", // 订阅名为"hello"的队列
autoAck: true, // 是否自动确认消息的接收。true表示一旦消费者接收到消息,就自动确认消息的接收。如果设置为false,则需要在处理完消息后手动确认消息的接收。
consumer: consumer); // 要处理接收到的消息的消费者对象。
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
工作队列 Work Queues
生产者发送消息到队列中,消费者从对立中抢占这个消息去消费;
工作队列(也叫任务队列),我们封装一个任务为一条消息并且发送它到一个队列。一个在后台运行的工作进程将立即获取这个任务并最终执行它。当你运行多个工作进程时,任务将在它们之间被分配。