体验
安装
1、官网地址 https://www.rabbitmq.com/download.html
2、文件上传到/usr/local/software目录下(software需要自己创建)
3、安装文件(分别按照以下顺序安装)
rpm -ivh erlang-21.3-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
常用命令
添加开机启动RabbitMQ服务(可选)
chkconfig rabbitmq-server on
启动服务
/sbin/service rabbitmq-server start
查看服务状态
/sbin/service rabbitmq-server status
停止服务(选择执行)
/sbin/service rabbitmq-server stop
开启web管理插件
rabbitmq-plugins enable rabbitmq_management
此时用默认账号密码(guest)访问地址 http://IP:15672/ 会出现权限问题
4、添加一个新的用户
创建账号
rabbitmqctl add_user admin 123
设置用户角色
rabbitmqctl set_user_tags admin administrator
设置用户权限
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
用户user_admin具有/vhost1这个virtual host中所有资源的配置、写、读权限
当前用户和角色
rabbitmqctl list_users
5、再次利用admin用户登录
6、重置命令
关闭应用的命令为
rabbitmqctl stop_app
清除的命令为
rabbitmqctl reset
重新启动命令为
rabbitmqctl start_app
Hello World
用Java编写两个程序。发送单个消息的生产者和接收消息并打印出来的消费者。
在下图中:“ P”是我们的生产者,“ C”是我们的消费者。中间的框是一个队列-RabbitMQ代表使用者保留的消息缓冲区
依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build>
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies>
|
消息生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public class Producer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("182.92.234.71"); factory.setUsername("admin"); factory.setPassword("123"); try( Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME,false,false,false,null); String message="hello world";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("消息发送完毕"); } } }
|
消息消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class Consumer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("182.92.234.71"); factory.setUsername("admin"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); System.out.println("等待接收消息...."); DeliverCallback deliverCallback = (consumerTag,delivery)->{ String message = new String(delivery.getBody()); System.out.println(message); }; CancelCallback cancelCallback=(consumerTag)->{ System.out.println("消息消费被中断"); };
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
|
Work Queues
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
轮训分发消息
我们启动两个工作线程,一个消息发送线程,来看看他们两个工作线程是如何工作的。
抽取工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class RabbitMqUtils { public static Channel getChannel() throws Exception{ ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.121.128");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel; } }
|
启动两个工作线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class Worker01 { private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback=(consumerTag,delivery)->{ String receivedMessage = new String(delivery.getBody()); System.out.println("接收到消息:"+receivedMessage); }; CancelCallback cancelCallback=(consumerTag)->{ System.out.println(consumerTag+"消费者取消消费接口回调逻"; }; System.out.println("C2 消费者启动等待消费......"); channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
|
启动一个发送线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class Task01 {
private static final String QUEUE_NAME="hello"; public static void main(String[] args) throws Exception { try(Channel channel=RabbitMqUtils.getChannel();) { channel.queueDeclare(QUEUE_NAME,false,false,false,null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("发送消息完成:"+message); } } } }
|
结果展示
通过程序执行发现生产者总共发送4个消息,消费者1和消费者2分别分得两个消息,并且是按照有序的一个接收一次消息
消息应答
概念
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。 为了保证消息在发送过程中不丢失,rabbitmq引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉rabbitmq它已经处理了,rabbitmq可以把该消息删除了。
自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者channel关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
消息应答的方法
Channel.basicAck(用于肯定确认) : RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
Channel.basicNack(用于否定确认)
Channel.basicReject(用于否定确认):相较于 Channel.basicNack(用于否定确认) 少了一个参数,表示不处理该消息直接拒绝,可以将其丢弃了。
Multiple
(Multiple)手动应答的好处是可以批量应答并且减少网络拥堵
true :代表批量应答 Channel 上未应答的消息,比如说 Channel 上有传输 tag 的消息 [5,6.7.8] 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答
false : 同上面的相比,只会应答 tag = 8 的消息,[5,6,7] 这三个消息依然不会被确认收到消息应答
消息自动重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
消息手动应答代码
默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答,消费者在上面代码的基础上增加下面画红色部分代码。
消息生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class Task02 { private static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { channel.queueDeclare(TASK_QUEUE_NAME, false,false, false, null); Scanner sc = new Scanner(System.in); System.out.println("请输入信息");
while (sc.hasNext()) { String message = sc.nextLine(); channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("生产者发出消息" + message); } } } }
|
消费者 01
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class Work03 { private static final String ACK_QUEUE_NAME="ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("C1等待接收消息处理时间较短"); DeliverCallback deliverCallback=(consumerTag,delivery)->{ String message= new String(delivery.getBody()); SleepUtils.sleep(1); System.out.println("接收到消息:"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; boolean autoAck=false; channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{ System.out.println(consumerTag+"消费者取消消费接口回调逻辑"); }); } }
|
消费者 02
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class Work04 { private static final String ACK_QUEUE_NAME="ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("C2等待接收消息处理时间较长"); DeliverCallback deliverCallback=(consumerTag,delivery)->{ String message= new String(delivery.getBody()); SleepUtils.sleep(30); System.out.println("接收到消息:"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; boolean autoAck=false; channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{ System.out.println(consumerTag+"消费者取消消费接口回调逻辑"); }); } }
|
睡眠工具类
1 2 3 4 5 6 7 8 9 10
| public class SleepUtils { public static void sleep(int second){
try { Thread.sleep(1000*second); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } }
|
手动应答效果演示
正常情况下消息发送方发送两个消息C1和C2分别接收到消息并进行处理
在发送者发送消息dd,发出消息之后的把C2消费者停掉,按理说该C2来处理该消息,但是由于它处理时间较长,在还未处理完,也就是说C2还没有执行ack代码的时候,C2被停掉了,此时会看到消息被C1接收到了,说明消息dd被重新入队,然后分配给能处理消息的C1处理了