RabbitMQ
安装
在MacOS系统下直接使用Homebrew
安装。
brew install rabbitmq
之后将rabbitmq加入环境变量:
export PATH=$PATH:/usr/local/opt/rabbitmq/sbin
只需要使用rabbitmq-server
命令就可以启动rabbitmq了。启动后访问localhost:15672
端口可以访问UI界面,超级管理员账号密码都是guest
。
之后可以在Admin
中添加用户:
刚添加完成的用户在vhost
一栏是没有权限的,所以这个时候需要给其设置一个vhost
,那么这个vhost
就相当于一个数据库(可以理解为mysql
里面的一个db
),我们创建一个用户对其用户授权,他就可以访问了。
创建vhost
过程操作方式如下:name
一般以/
开头
之后点击进入创建的vhost
进行设置
之后就可以使用创建的角色登陆了。
在Java中使用Rabbitmq
首先需要导入以下依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.0-alpha1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>2.0.0-alpha1</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.6.0-M1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.12.1</version>
</dependency>
简单队列
简单队列模型如图:
- P:消息的生产者
- C:消息的消费者
- 红色:队列
生产者将消息发送到队列,消费者从队列中获取信息。
根据以上的模型,可以抽取出 3 个对象:生产者(用户发送消息)、队列(中间件):类似于容器(存储消息)、消费者(获取队列中的消息) 。
获取MQ连接
package com.xm.rabbitmq.conn;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionUtils {
/**
* 获取rabbitmq连接
* @return
* @throws IOException
* @throws TimeoutException
*/
public static Connection getConnection() throws IOException, TimeoutException {
//定义连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置服务地址
connectionFactory.setHost("127.0.0.1");
//设置端口,amqp协议,类似于mysql的3306
connectionFactory.setPort(5672);
//设置账号信息,用户名,密码,vhost
connectionFactory.setVirtualHost("/ming");
connectionFactory.setUsername("ming");
connectionFactory.setPassword("zm19980225");
//通过工厂获取链接
Connection connection = connectionFactory.newConnection();
return connection;
}
}
生产者发送数据到消费者队列
package com.xm.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xm.rabbitmq.conn.ConnectionUtils;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class SendMQ {
private static final String QUEUE_NAME="QUEUE_simple";
/**
* 生产者发送数据到消息队列
* @throws IOException
* @throws TimeoutException
*/
@Test
public void sendMsg() throws IOException, TimeoutException {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
//从连接中创建通道
Channel channel = connection.createChannel();
//创建队列(声明),因为我们要往队列里面发送消息,所以就得知道往哪个队列里面发送
boolean durable = false;
boolean exclusive = false;
boolean autoDelete = false;
channel.queueDeclare(QUEUE_NAME,durable,exclusive,autoDelete,null);
String message = "Hello World!";
//第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,
//因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话
//我们需要将该参数设置成创建的exchange的名字)
//第二个参数是路由键
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("[x] Sent '" + message + "'");
//由于Connection和Channel都实现了java.io.Closeable接口,所以可以不同手动关闭
//channel.close();
//connection.close();
}
}
运行之后可以在UI界面获取到如下内容:
消费者消费
package com.xm.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.DeliverCallback;
import com.xm.rabbitmq.conn.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
private static final String QUEUE_NAME = “QUEUE_simple”;
public static void main(String[] args) throws IOException, TimeoutException {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明一个队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
System.out.println("[*] Waiting for message. To exit press CTRL+C");
//该回调会缓冲消息(阻塞),直到使用它为止
DeliverCallback deliverCallback = (consumerTag,delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received '" + message +"'");
};
//监听队列
channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag-> { });
}
}
简单队列的缺点
耦合性高,生产消费一一对应(如果有多个消费者都想消费这个消息就不行了),队列名称变更时需要同时修改代码。
工作队列
对于简单队列,应用程序在是使用消息系统的时候,一般生产者 P 生产消息是毫不费力的(发送消息即可),而消费者接收完消息后的需要处理,会耗费一定的时间,这时候,就有可能导致很多消息堆积在队列里面,一个消费者有可能不够用。那么怎么让消费者同时处理多个消息呢? 在同一个队列上创建多个消费者,让他们相互竞争,这样消费者就可以同时处理多条消息了。
使用任务队列的优点之一就是可以轻易的并行工作。如果积压了好多工作,可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。
生产者发送消息
//消费者1
package com.xm.rabbitmq.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xm.rabbitmq.conn.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
for (int i = 0; i < 50; i++) {
//消息内容
String message = "." + i;
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("[x] Sent '" + message +"'");
Thread.sleep(i*10);
}
}
}
消费者
package com.xm.rabbitmq.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.xm.rabbitmq.conn.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv1 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws IOException, TimeoutException {
//获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
DeliverCallback deliverCallback = ((consumerTag, delivery) -> {
String message = new String(delivery.getBody(),”UTF-8”);
System.out.println("[x] Received '" + message + "'");
try {
doWork(message);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[x] Done");
}
});
boolean autoAck = true;//设置消息的确认模式为自动应答
channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,consumerTag -> { });
}
private static void doWork(String task) throws InterruptedException {
Thread.sleep(1000);
}
}
//消费者2
package com.xm.rabbitmq.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.xm.rabbitmq.conn.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws IOException, TimeoutException {
//获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = ((consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(“[x] Received ‘” + message + "'");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(“[x] Done”);
}
});
boolean autoAck = true;//设置消息的确认模式为自动应答
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
});
}
}
测试结果:
- 消费者 1 和消费者 2 获取到的消息内容是不同的,同一个消息只能被一个消费者获取
- 消费者 1 和消费者 2 获得的消息数量是一样的,一个奇数,一个偶数。
这种方式称为轮询分发,结果就是不管谁忙或清闲,都不会给谁多一个任务或少一个任务 。
公平分发(Fait dispatch)
上面发生轮询分发这种情况发生是因为RabbitMQ在消息进入队列时才调度消息。它不会查看使用者的未确认消息数。它只是盲目地将每第n条消息发送给第n个使用者。
为了解决这个问题,可以将basicQos
方法与prefetchCount = 1
设置一起使用。这告诉RabbitMQ一次不要给消费者一个以上的消息。换句话说,在处理并确认上一条消息之前,不要将新消息发送给消费者。而是将其分派给不忙的下一个工作程序。
int prefetchCount = 1 ;
channel.basicQos(prefetchCount);
需要注意的是,使用公平分发,必须关闭自动应答,改为手动应答。
生产者:
package com.xm.rabbitmq.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xm.rabbitmq.conn.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//每个消费者发送确认信号之前,消息队列不发送下一个消息过来,一次只处理一个消息
//限制发给同一个消费者不得超过1条消息
int prefetchCount = 1;
channel.basicQos(prefetchCount);
for (int i = 0; i < 50; i++) {
//消息内容
String message = “.” + i;
channel.basicPublish(“”,QUEUE_NAME,null,message.getBytes(“UTF-8”));
System.out.println("[x] Sent '" + message +"'");
Thread.sleep(i*10);
}
}
}
消费者:
//消费者1
package com.xm.rabbitmq.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.xm.rabbitmq.conn.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv1 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws IOException, TimeoutException {
//获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//保证一次只分发一个
channel.basicQos(1);
DeliverCallback deliverCallback = ((consumerTag, delivery) -> {
String message = new String(delivery.getBody(),"UTF-8");
System.out.println("[x] Received '" + message + "'");
try {
doWork(message);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
});
boolean autoAck = false;//设置消息的确认模式为手动确认模式
channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,consumerTag -> { });
}
private static void doWork(String task) throws InterruptedException {
Thread.sleep(1000);
}
}
//消费者2
package com.xm.rabbitmq.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.xm.rabbitmq.conn.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws IOException, TimeoutException {
//获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
DeliverCallback deliverCallback = ((consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received '" + message + "'");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
});
boolean autoAck = false;//设置消息的确认模式为手动确认模式
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
}
}
消息确认
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
boolean autoAck = true
(自动确认模式):一旦 RabbitMQ 将消息分发给了消费者,就会从内存中删除。在这种情况下,如果杀死正在执行任务的消费者,会丢失正在处理的消息,也会丢失已经分发给这个消费者但尚未处理的消息。boolean autoAck = false
(手动确认模式):如果不想丢失任何任务,如果有一个消费者挂掉了,那么应该将分发给它的任务交付给另一个消费者去处理。 为了确保消息不会丢失,RabbitMQ 支持消息应答。消费者发送一个消息应答,告诉 RabbitMQ 这个消息已经接收并且处理完毕了。RabbitMQ才可以删除它。- 默认情况下消息应答是打开的
消息持久化
通过消息确认能够保证即使消费者死亡,任务也不会丢失。但是如果RabbitMQ 服务器停止,任务仍将失去。当 RabbitMQ 退出或者崩溃,将会丢失队列和消息。除非不要队列和消息。两件事儿必须保证消息不被丢失,所以必须把“队列”和“消息”设为持久化。
boolean durable = true;
channel.queueDeclare("test_queue_work",durable,false,false,null);
如果在此之前已经创建了队列,在程序代码中直接修改后再重新运行会抛出异常:
channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'test_queue_work' in vhost '/ming': received 'true' but current is 'false', class-id=50, method-id=10)
原因是rabbitmq不允许使用不同的参数重复设定已经存在的队列。解决方案就是声明一个不同名字的队列,或者在UI界面删除原有队列。
订阅模式
如果希望发送一个消息后能够被多个消费者消费,这时候就需要用到消息中的发布订阅模型。
这种模式需要在生产者和队列之间多加一个交换机,当消费者获取消息时,需要队列绑定到交换机上,交换机把消息发送到队列,消费者就能够获得队列消息。
- 一个生产者,多个消费者
- 每一个消费者都有自己的一个队列
- 生产者没有将消息直接发送到队列,而是发送到了交换机(转发器)
- 每个队列都要绑定到交换机
- 生产者发送的消息,经过交换机,到达队列,实现一个消息被多个消费者获取的目的。
生产者:
package com.xm.rabbitmq.publish_subscribe;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xm.rabbitmq.conn.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明exchange交换机/转发器
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//fanout分裂方式
//消息内容
String message = "Hello!";
//因为是发送到交换机,所以第二个参数队列名留空
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
System.out.println("[x] Sent '" + message + "'");
}
}
运行之后可以在控制台看到交换机,但是消息却丢失了。原因是存储机并不具备存储消息的能力,这是还没有队列绑定到交换机,所以会造成消息丢失。
消费者:
package com.xm.rabbitmq.publish_subscribe;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.xm.rabbitmq.conn.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv {
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
//获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//声明队列名
//声明一个非持久的、排他的、自动删除的队列,具有随机名称,一旦断开使用者的连接,该队列将自动删除。
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("[*] Waiting for messages.To exit press CTRL+C");
DeliverCallback deliverCallback = ((consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received '" + message + "'");
});
channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{ });
}
}
多运行几个之后,可以看到有多个绑定到交换机:
路由模式
如果希望根据特定场景将消息发送给特定队列,而不是直接发送到全部队列中,可以使用路由模式。
路由模式可以使用routingKey
参数创建绑定键,生产者在发送消息时可以通过制定路由键将消息放入到指定队列,同样消费者也需要通过路由键绑定到指定的队列上获取消息。
此时交换机需要采用direct
模式。
生产者:
package com.xm.rabbitmq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xm.rabbitmq.conn.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
private final static String EXCHANGE_NAME = “test_exchange_direct”;
public static void main(String[] args) throws IOException, TimeoutException {
//获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明exchange(需要使用direct模式)
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//消息内容
String message = "Hello!BLACK!";
//指定路由键
channel.basicPublish(EXCHANGE_NAME,"black",null,message.getBytes());
System.out.println("[x] Sent '" + message + "'");
}
}
消费者:
package com.xm.rabbitmq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.xm.rabbitmq.conn.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv {
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,EXCHANGE_NAME,"black");
//同一时刻服务器只会分发一个给当前消费者
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag,delivery) -> {
String message = new String(delivery.getBody(),"UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
boolean autoACK = false;
channel.basicConsume(queueName,autoACK,deliverCallback,consumerTag -> { });
}
}
Topic模型
能够对路由键进行匹配。此时消费者能够根据匹配情况使用队列。
- 符号
#
匹配一个或者多个词 - 符号
*
匹配一个词
路由键设置为quick.orange.rabbit
的消息将传递到两个队列。消息lazy.orange.elephant
也将发送给他们两个。另一方面,quick.orange.fox
只会进入第一个队列,而lazy.brown.fox
只会进入第二个队列。lazy.pink.rabbit
将仅被传递到第二队列一次,即使两个绑定匹配。quick.brown.fox
与任何绑定都不匹配,因此将被丢弃。
生产者:
package com.xm.rabbitmq.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xm.rabbitmq.conn.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
private final static String EXCHANGE_NAME = “test_exchange_topic”;
public static void main(String[] args) throws IOException, TimeoutException {
//获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明exchange
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//消息内容
String message = "Hello,Topic!";
channel.basicPublish(EXCHANGE_NAME,"item.create",null,message.getBytes());
System.out.println("[x] Sent '" + message + "'");
}
}
消费者:
package com.xm.rabbitmq.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.xm.rabbitmq.conn.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Recv {
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
String queueName = channel.queueDeclare().getQueue();
//如果改成item.*则可以收到消息
channel.queueBind(queueName,EXCHANGE_NAME,”item.update”);
channel.queueBind(queueName,EXCHANGE_NAME,"item.delete");
DeliverCallback deliverCallback = (consumerTag,delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
boolean autoAck = false;
channel.basicConsume(queueName,autoAck,deliverCallback,consumerTag -> { });
}
}
Exchange(转发器|交换机)
转发器一方面它接受生产者的消息,另一方面向队列推送消息。主要有以下几种方式:
Nameless exchange(匿名转发)
默认的转发器是空字符串””
,也可以理解为未使用转发器:
channel.basicPublish("","hello”,null,message.getBytes());
Fanout Exchange
不处理路由键。只需要将队列绑定到交换机上,发送消息到交换机就会被转发到与该交换机绑定的所有队列上。
Direct Exchange
处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键是“dog”,则只有被标记为“dog”的消息才会被转发,而“dog.a”不会被转发。
Topic Exchange
将路由键和某模式进行匹配。此时队列需要绑定到一个模式上,符号#
匹配一个或者多个词。符号*
匹配一个词。因此audit.#
能够匹配到audit.irs.corporate
,但是audit.*
只会匹配到audit.irs
。
消息确认机制(事务+Confirm)
在Rabbitmq中可以通过持久化来解决因为服务器异常而导致的数据丢失问题。但是还存在另外一个问题:生产者将消息发送出去之后,消息是否真正到达Rabbitmq服务器是无法确定的(即Rabbitmq不会反馈任何消息给生产者),默认情况下是不知道消息有没有正确到达的。
导致的问题:消息到达服务器之前丢失,那么持久化也不能解决此问题,因为消息根本没有到达Rabbitmq服务器。
Rabbitmq提供了两种解决方式:
- 通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案。
- 通过将
channel
设置成confirm
模式来实现。
事务机制
RabbitMQ中与事务机制有关的方法有三个:txSelect()
,txCommit()
以及txRollback()
。txSelect()
用于将当前channel
设置成transaction
模式,txCommit
用于提交事务,txRollback
用于回滚事务,在通过txSelect()
开启事务之后,便可以发布消息给broker
代理服务器了,如果txCommit()
提交成功了,则表明消息到达了broker
了,如果在txCommit
执行之前broker
异常崩溃或者由于其他原因抛出异常,这个时候便可以捕获异常通过txRollback
回滚事务了。
关键代码:
channel.txSelect();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
channel.txCommmit();
生产者:
package com.xm.rabbitmq.tx;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xm.rabbitmq.conn.ConnectionUtils;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class SendMQ {
private static final String QUEUE_NAME = "QUEUE_simple";
@Test
public void sendMsg() throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String msg = "Hello,Simple QUEUE!";
try {
channel.txSelect();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
int i = 10/0;
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
System.out.println("--msg rollback");
} finally {
System.out.println("--Send msg over:" + msg);
}
}
}
消费者:
package com.xm.rabbitmq.tx;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.xm.rabbitmq.conn.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
private static final String QUEUE_NAME = "QUEUE_simple";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
DeliverCallback deliverCallback = (consumerTag,delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Receive message: " + message);
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag -> { });
}
}
缺点:采用这种模式比较耗时,降低了Rabbitmq的吞吐量
Confirm模式
Confirm模式是相对比较高效的结局方案,分为三种情况:普通Confirm模式,批量Confirm模式以及异步Confirm模式。
producer端confirm模式的实现原理
生产者将信道设置成confirm
模式,一旦信道进入confirm
模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker
就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker
回传给生产者的确认消息中deliver-tag
域包含了确认消息的序列号,此外broker
也可以设置basic.ack
的multiple
域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm
模式最大的好处在于它是异步的,一旦发布了一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack
消息,生产者应用程序同样可以在回调方法中处理该nack
消息。
开启confirm模式
生产者通过调用channel
的confirmSelect
方法将channel
设置为confirm
模式。
Channel channel = connection.createChannel();
channel.confirmSelect();
已经在
transaction
事务模式的channel
是不能设置成confirm
模式的,即这两种模式是不能共存的。
编程模式
- 普通
confirm
模式:每发送一条消息后,调用waitForConfirms
方法,等待服务器端confirm
,实际上是一种串行confirm
了。 - 批量
confirm
模式:每发送一批消息后,调用waitForConfirms
方法,等待服务器端confirm
- 异步
confirm
模式:提供了一个回调方法,服务端confirm
了一条或者多条消息之后Client
端会回调这个方法。
普通Confirm模式
该技术实现相对比较简单,但是有一个主要缺点:由于消息确认会阻止所有后续消息的发布,因此它会大大减慢发布速度。这种方法不会提供每秒超过数百条已发布消息的吞吐量。
package com.xm.rabbitmq.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xm.rabbitmq.conn.ConnectionUtils;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class SendConfirm {
private static final String QUEUE_NAME = "QUEUE_simple_confirm";
@Test
public void sendMsg() throws IOException, TimeoutException, InterruptedException {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//生产者通过调用channel的confirmSelect方法将channel设置为confirm模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
String body = String.valueOf(i);
channel.basicPublish("",QUEUE_NAME,null,body.getBytes());
//通过该消息进行发送确认,也可以通过下面的方式
channel.waitForConfirmsOrDie(5000);
}
//通过该方法阻塞对消息发送进行确认,确认消息后该方法立即返回。
//可以通过设置超时时间内确认该消息是否被确认,如果没有那么发生异常。
// if(!channel.waitForConfirms()) {
// System.out.println("send message failed.");
// } else {
// System.out.println(“send message success!”);
// }
}
}
批量Confirm模式
批量confirm
模式稍微复杂一点,客户端程序需要定期(每隔多少秒)或者定量(达到多少条)或者两则结合起来 publish
消息,然后等待服务器端confirm
, 相比普通confirm
模式,批量极大提升confirm
效率,但是问题在于一旦出现confirm
返回false
或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量confirm
性能应该是不升反降的。
package com.xm.rabbitmq.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xm.rabbitmq.conn.ConnectionUtils;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class SendBatchConfirm {
private static final String QUEUE_NAME = “QUEUE_simple_confirm”;
@Test
public void sendMsg() throws IOException, TimeoutException, InterruptedException {
//获取一个连接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//生产者通过调用channel的confirmSelect方法将channel设置为confirm模式
channel.confirmSelect();
int outstandingMessageCount = 0;
for (int i = 0; i < 10; i++) {
String msg = "hello,batchConfirm";
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
outstandingMessageCount++;
if(outstandingMessageCount == 10) {
channel.waitForConfirmsOrDie(50);
outstandingMessageCount = 0;
}
}
if(outstandingMessageCount > 0) {
channel.waitForConfirmsOrDie(50);
}
}
}
异步confirm模式
Channel对象提供的ConfirmListener()
回调方法只包含deliveryTag
(当前Channel发出的消息序号),我们需要自己为每一个Channel
维护一个unconfirm
的消息序号集合,每publish
一条数据,集合中元素加1,每回调一次handleAck
方法,unconofirm
集合删掉相应的一条(multiple=false
)或者多条(multiple=true
)记录。从程序运行效率上看,这个unconfirm
集合最好采用有序集合SortedSet
存储结构。实际上,SDK中的waitForConfirms()
方法也是通过SortedSet
维护这个消息序号的。
package com.xm.rabbitmq.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.xm.rabbitmq.conn.ConnectionUtils;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
public class SendAync {
private static final String QUEUE_NAME = "QUEUE_simple_confirm_aync";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
//未确认的消息标识
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());
//添加监听通道
channel.addConfirmListener(new ConfirmListener() {
//每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。
//没有问题的handleAck
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if(multiple) {
System.out.println("--multiple—");
//用一个SortedSet, 返回此有序集合中从开始到deliveryTag+1的所有元素。
confirmSet.headSet(deliveryTag+1).clear();
} else {
System.out.println(“—multiple false”);
confirmSet.remove(deliveryTag);
}
}
//失败的回调
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Nack,SeqNo:" + deliveryTag + ",multiple: " + multiple);
if(multiple) {
confirmSet.headSet(deliveryTag+1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});
String msg = "Hello,QUEUE !";
while(true) {
long nextPublishSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
confirmSet.add(nextPublishSeqNo);
}
}
}