当前位置: 首页 > news >正文

RabbitMQ 工作模式

        RabbitMQ 的工作模式共有七种,分别为简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式、PRPC 通信、发布确认模式,下面一一进行介绍。

1. Simple(简单模式)

只有一个生产者和消费者,所以也称为 “点对点” 模式,如下图所示:

生产者发送消息到达交换机后,由交换机中的队列发送给消费者;

生产者代码如下:

public class ProducerDemo {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("139.199.198.28"); //ip 地址connectionFactory.setPort(5672); //端口号connectionFactory.setUsername("admin"); //用户名connectionFactory.setPassword("admin"); //密码connectionFactory.setVirtualHost("/"); //虚拟主机Connection connection = connectionFactory.newConnection();//开启信道Channel channel = connection.createChannel();//声明交换机,此处使用内置交换机//声明队列/*** queueDeclare(* String queue,  队列名称* boolean durable,  可持久化* boolean exclusive,  独占* boolean autoDelete,  自动删除* Map<String, Object> arguments 参数* )*/channel.queueDeclare("hello", true, false, false, null);//发送消息/*** basicPublish(* String exchange, 交换机名称* String routingKey,  与要使用的队列名保持一直* BasicProperties props,  属性配置* byte[] body 消息内容* )*/String messageInfo = "hello mq";channel.basicPublish("", "hello", null, messageInfo.getBytes(StandardCharsets.UTF_8));//资源释放//先释放 channel ,后释放 connection,顺序反了会报错channel.close();connection.close(); // connection 关闭后,包含的 channel 也就关闭了}
}

1、首先我们根据自己的服务器 ip 地址和端口号、RabbitMQ 的用户名和密码建立连接;

2、建立连接后,需要开启信道,用于消息的发送;

3、再声明交换机和队列;

4、发送消息;

5、关闭资源,这里需要注意 channel 和 connection 关闭的顺序,可以先关闭 channel,再关闭 connection;若先关闭了 connection,就不需要再关闭 channel,因为 channel 包含于 connection,若 connection关闭了,那么 channel 也就不存在了。

消费者代码如下:

package simple;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerDemo {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("139.199.198.28"); //ip 地址connectionFactory.setPort(5672); //端口号connectionFactory.setUsername("admin"); //用户名connectionFactory.setPassword("admin"); //密码connectionFactory.setVirtualHost("/"); //虚拟主机Connection connection = connectionFactory.newConnection();//开启信道Channel channel = connection.createChannel();//声明交换机,此处使用内置交换机//声明队列(若生产者已经声明,可以省略)//在消费者启动时,若队列不存在,就会报错/*** queueDeclare(* String queue,  队列名称* boolean durable,  可持久化* boolean exclusive,  独占* boolean autoDelete,  自动删除* Map<String, Object> arguments 参数* )*/channel.queueDeclare("hello", true, false, false, null);//消费消息/*** basicConsume(* String queue, 队列名* boolean autoAck,  自动确认* Consumer callback //收到消息后,执行的逻辑* )*/DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//可能还没有来得及打印,资源就已经释放了String messageInfo = new String(body);System.out.println(messageInfo);}};channel.basicConsume("hello", true, consumer);Thread.sleep(1000);//资源释放//先释放 channel ,后释放 connectionchannel.close();connection.close(); // connection 关闭后,包含的 channel 也就关闭了}
}

准备工作与生产者一致,消费者消费消息时需要自定义流程,即定义 DefaultConsumer 对象,重写的方法就代表对接收到的消息的操作。

为了保证将消息消耗后才释放资源,于是让线程休眠一段时间,结束后才释放资源。

消费者代码运行结果如下:

2. Work Queue(工作队列模式)

在该模式下,由一个生产者,多个消费者,这些消费者共同消费消息,且消费消息的总和为生产者发送消息的数量。消费者之间消费的消息不重复。如下图所示:

生产者代码如下:

public class ProducerDemo {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明交换机,使用内置交换机//声明队列channel.queueDeclare(Constants.QUEUE, true, false, false, null);//发送消息for (int i = 0; i < 10; i++) {String message = "hello" + i;channel.basicPublish("", Constants.QUEUE, null, message.getBytes(StandardCharsets.UTF_8));}//关闭连接channel.close();connection.close();}
}

 与简单模式下的生产者代码相同。

消费者代码如下(创建了两个消费者):

消费者1:

public class ConsumerDemo1 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明交换机,使用内置交换机//声明队列channel.queueDeclare(Constants.QUEUE, true, false, false, null);//消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume(Constants.QUEUE, true, consumer);}
}

消费者2:

public class ConsumerDemo2 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明交换机,使用内置交换机//声明队列channel.queueDeclare(Constants.QUEUE, true, false, false, null);//消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume(Constants.QUEUE, true, consumer);}
}

生产者发送了10条数据,之后被两个消费者消耗,在进行测试时,我们需要先将消费者的代码运行起来,然后再运行生产者代码,若先运行了生产者代码,那么者10条消息就已经存放到了队列中,当运行第一个消费者代码时,就会将这10条消息全部消耗,当启动第二个消费者时,此时队列中就已经没有消息了。

消费者代码运行如下:

3. Publish/Subscribe(发布订阅模式)

如下图所示:

生产者将消息发送给交换机后,会将消息全部发送给与之绑定的所有的队列,这些队列的消息与生产者的一致。

生产者代码如下:

public class ProducerDemo {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明交换机//durable true:交换机在服务器重启后依然存在channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);//声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);//将交换机与队列进行绑定//发布、订阅模式,需要将交换机中的信息发送至所有与之绑定的队列,不需要 routingKeychannel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");//发布消息for (int i = 0; i < 10; i++) {String message = "hello" + i;channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, message.getBytes(StandardCharsets.UTF_8));}//关闭连接channel.close();connection.close();}
}

当声明完交换机与队列后,需要将交换机与队列进行绑定。

消费者代码如下:

消费者1:

public class ConsumerDemo1 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);//声明队列//如果队列不存在,就创建channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);//消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);}
}

消费者2:

public class ConsumerDemo2 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);//声明队列channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);//消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);}
}

消费者代码运行结果如下:

 

从上面的结果可以看出,两个消费者都消耗了10条数据。

 4. RoutingKey(路由模式)

如图所示:

载路由模式中,交换机通过 BindingKey 与队列进行绑定, 当我们发送消息时,指定的交换机为 direct,BindingKey 为 black,那么就会将消息发送到 Q2 队列中。

生产者代码如下:

public class ProducerDemo {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);//声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);/*** 绑定交换机与队列*/channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");/*** 发送消息*/String message1 = "routingKey is a";channel.basicPublish(Constants.DIRECT_EXCHANGE, "a", null, message1.getBytes(StandardCharsets.UTF_8));String message2 = "routingKey is b";channel.basicPublish(Constants.DIRECT_EXCHANGE, "b", null, message2.getBytes(StandardCharsets.UTF_8));String message3 = "routingKey is c";channel.basicPublish(Constants.DIRECT_EXCHANGE, "c", null, message3.getBytes(StandardCharsets.UTF_8));/*** 关闭连接*/channel.close();connection.close();}
}

在这里,将 Queue1 的 BindingKey 设置为 a,将 Queue2 的 BindingKey 设置为 a、b、c,那么 message1 就会发送到 Queue1 和 Queue2 中,message2 和 message3 只会发送到 Queue2 中。

消费者代码如下:

消费者1:

public class ConsumerDemo1 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);//声明队列//如果队列不存在,就创建channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);//消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);}
}

消费者2:

public class ConsumerDemo2 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);//声明队列channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);//消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE2, true, consumer);}
}

代码运行结果如下:

 

5. Topics(通配符模式) 

通配符模式与路由模式相似,都是交换机与队列进行绑定,但使用的不仅仅是 BindingKey,再 BindingKey 中添加了通配符。

这里的通配符有两种:

*    匹配一个单词

#   匹配多个单词

如 *.a.* 可以匹配 c.a.f;a.# 可以匹配 a.bbb 等。

生产者代码如下:

public class ProducerDemo {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Constants.TOPICS_EXCHANGE, BuiltinExchangeType.TOPIC, true);//声明队列channel.queueDeclare(Constants.TOPICS_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.TOPICS_QUEUE2, true, false, false, null);//绑定交换机与队列channel.queueBind(Constants.TOPICS_QUEUE1, Constants.TOPICS_EXCHANGE, "*.a.*");channel.queueBind(Constants.TOPICS_QUEUE2, Constants.TOPICS_EXCHANGE, "*.*.b");channel.queueBind(Constants.TOPICS_QUEUE2, Constants.TOPICS_EXCHANGE, "c.#");//发送消息String message1 = "routingKey is fff.a.bbbb";channel.basicPublish(Constants.TOPICS_EXCHANGE, "fff.a.bbbb", null, message1.getBytes(StandardCharsets.UTF_8));String message2 = "routingKey is fff.a.b";channel.basicPublish(Constants.TOPICS_EXCHANGE, "fff.a.b", null, message2.getBytes(StandardCharsets.UTF_8));String message3 = "routingKey is c.aaa";channel.basicPublish(Constants.TOPICS_EXCHANGE, "c.aaa", null, message3.getBytes(StandardCharsets.UTF_8));/*** 关闭连接*/channel.close();connection.close();}
}

通过上面的通配符可以看出,message1 会发送到 Queue1 中,message2 会发送到 Queue1 和 Queue2 中,message3 会发送到 Queue3 中。

消费者代码如下:

消费者1:

public class ConsumerDemo1 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);//声明队列//如果队列不存在,就创建channel.queueDeclare(Constants.TOPICS_QUEUE1, true, false, false, null);//消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume(Constants.TOPICS_QUEUE1, true, consumer);}
}

消费者2:

public class ConsumerDemo2 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);//声明队列//如果队列不存在,就创建channel.queueDeclare(Constants.TOPICS_QUEUE2, true, false, false, null);//消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume(Constants.TOPICS_QUEUE2, true, consumer);}
}

代码运行结果如下:

 

6. RPC(RPC 通信) 

如图所示:

RPC 通信是在客户端与服务器之间进行的, 在客户端发起 RPC 通信时,会在发送的请求中包含 reply_to 和 correlation_id,reply_to 指定了服务器在发送响应时,使用的是哪一个队列;correlation_id 代表了这次会话的 id。

服务器给客户端发送响应时,也会包含 correlation_id,客户端拿到消息后,会将自己发送出去 correlation_id 与 接收到的进行比较,若相同,就表示这个响应与客户端发送的请求时匹配的,然后就能进行后续操作,反之则不匹配。

客户端代码如下:

public class RPCClient {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();/*** 声明队列*/channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);//发送消息String message = "hello rpc";//配置参数String correlationId = UUID.randomUUID().toString();AMQP.BasicProperties props = new AMQP.BasicProperties().builder().correlationId(correlationId).replyTo(Constants.RPC_RESPONSE_QUEUE) //服务器通过哪个队列给客户端发送消息.build();channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, message.getBytes(StandardCharsets.UTF_8));//接收消息//使用阻塞队列BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);//消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String resultMessage = new String(body);System.out.println("接收到的消息: " + resultMessage);//判断客户端的 correlation 与 服务器发送过来的 correlation 是否相同//相同,就表示这条消息是响应if (correlationId.equals(properties.getCorrelationId())) {queue.offer(resultMessage);}}};channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);String response = queue.take();System.out.println("客户端接收结果: " + response);}
}

建立连接、开启信道、声明队列操作与之前的都一致。

当客户端在发送消息时,需要配置参数,参数中包含 reply_to 和 correlation_id,并将这个参数连同 message 发送给服务器。

当客户端接收消息时,使用了一个阻塞队列 queue,queue 的容量为1,当客户端没有接收到响应或响应的 correlation_id 与 自己发送出去的不匹配,那么此时 queue 为空,就会阻塞在 queue.take(),只有当客户端接收到响应并且响应的 correlation_id 与 自己所发送的相同,才会将响应入队列,这是 queue 就不会阻塞,整个程序也就结束了。

服务器代码如下:

public class RPCService {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//接收消息channel.basicQos(1);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String resultMessage = new String(body, StandardCharsets.UTF_8);System.out.println("服务器接收请求: " + resultMessage);//返回响应String response = "服务器发送响应";//构造参数AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()) //与客户端发送的 correlation 一致.build();channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes(StandardCharsets.UTF_8));channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);}
}

服务器与客户端的流程正好相反,客户端是先发送请求,在获取响应;服务器则是先接收请求,在发送响应。

代码运行结果如下:

 

在客户端这里打印了两条日志,第一条是接收到了服务器发来的响应打印日志,第二条是经过比较 correlation_id 后判断出这一条响应与自己之前的请求是匹配的而打印的日志。

7. Publisher Confirms(发布确认模式)

RabbitMQ 提供了三种确认方式,分别为单独确认、批量确认、异步确认。

在这种模式下,生产者发送消息后,会等待 broker 的确认,以保证消息被服务器接收和处理。

生产者将 channel 设置为 confirm 模式后,自己发送的每一条消息,都会会的一个唯一的 id,生产者可以根据这些 id 跟踪消息的状态。

当消息被 broker 接收并处理后,会异步地给生产者发送 ack,表明消息已经到达,若失败,就会发送 nack,于是生产者可以根据 broker 返回地是 ack 还是 nack 来判断消息是否被成功处理。

7.1 单独确认

在单独确认中,生产者每发送一条消息,就会等待服务器返回 ack 会 nack,代码如下:

    /*** 单独确认*/private static void publishMessagesIndividually() {try (Connection connection = createConnection()) {//开启信道Channel channel = connection.createChannel();//将信道设置为 confirm 模式channel.confirmSelect();//使用默认的交换机, routingKey 为队列名//声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE1, true, false, false, null);//发布消息long startTime = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "Individual confirmation " + i;channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE1, null, message.getBytes(StandardCharsets.UTF_8));//等待 broker 确认//最长等待时间 5000mschannel.waitForConfirmsOrDie(5000);}long endTime = System.currentTimeMillis();System.out.println("单独确认模式耗时: " + (endTime - startTime) + "ms");} catch (IOException | TimeoutException | InterruptedException e) {throw new RuntimeException(e);}}

使用 waitForConfirmsOrDie 就可以指定生产者对多等待服务器返回 ack 或 nack 的最长等待时间。

7.2 批量确认

在单独确认的基础上,我们设置了一次可以确认的最大个数 batchMessageCount ,当发送消息的个数达到了 batchMessageCount,就会进行批量确认,最后将还没有被确认的消息进行统一确认。代码如下:

/*** 批量确认*/private static void publishMessagesInBatch() {try (Connection connection = createConnection()) {//开启信道Channel channel = connection.createChannel();//将信道设置为 confirm 模式channel.confirmSelect();//使用默认的交换机, routingKey 为队列名//声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE2, true, false, false, null);int batchMessageCount = 100;int sendMessageCount = 0;//发布消息long startTime = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = "Batch Confirmation " + i;channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE2, null, message.getBytes(StandardCharsets.UTF_8));sendMessageCount++;if (sendMessageCount == batchMessageCount) {channel.waitForConfirmsOrDie(5000);sendMessageCount = 0;}}//还有没有确认的消息if (sendMessageCount > 0) {channel.waitForConfirmsOrDie(5000);}long endTime = System.currentTimeMillis();System.out.println("批量确认模式耗时: " + (endTime - startTime) + "ms");} catch (IOException | TimeoutException | InterruptedException e) {throw new RuntimeException(e);}}

7.3 异步确认

在异步确认中,我们需要实现 ConfirmListener 接口中的 handleAck 和 handleNack,就表示服务器返回了 ack 或 nack 时的操作。 

此外还需要一个 ConcurrentNavigableMap 用来存储 deliveryTag(生产者发送消息的唯一标识)和消息内容。

当服务器返回 ack 时,就将 deliveryTag 从 map 中删除,若返回 nack,就将 deliveryTag 从 map 中删除后进行重新发送。

最后通过 map 中是否为空来判断所有的消息是否被服务器成功消费了。

代码如下:

    /*** 异步确认*/private static void handlePublishConfirmsAsynchronously() {try (Connection connection = createConnection()) {//开启信道Channel channel = connection.createChannel();//声明交换机//设置信道为 confirm 模式channel.confirmSelect();//声明队列channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE3, true, false, false, null);long startTime = System.currentTimeMillis();//存放 deliveryTag 和 message 的集合//key 为 deliveryTag,value 为对应的 message/*** 选择 ConcurrentNavigableMap 而不是 ConcurrentHashMap:* 基于跳表实现,清除所有 <= deliveryTag 的时间复杂度为 o(logn)* 线程安全* 保证了原子性*/ConcurrentNavigableMap<Long, String> map = new ConcurrentSkipListMap<>();channel.addConfirmListener(new ConfirmListener() {/*** 消息成功确认* @param deliveryTag* @param multiple true 表示批量删除;false 表示值删除当前 deliveryTag*/@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {//将 deliveryTag 及以下的从 sortedSet 中去除if (multiple) {//批量删除//true 表示包含 deliveryTagmap.headMap(deliveryTag, true).clear();} else {//单独删除map.remove(deliveryTag);}}/*** 消息确认失败* @param deliveryTag* @param multiple* @throws IOException*/@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {String message = (String) map.get(deliveryTag);if (multiple) {//批量删除map.headMap(deliveryTag, true //包含 deliveryTag).clear();} else {//单独删除map.remove(deliveryTag);}//消息确认失败,重新发送该消息deliveryTag = channel.getNextPublishSeqNo(); //每次生成的 deliveryTag 都不一样channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE3, null, message.getBytes(StandardCharsets.UTF_8));map.put(deliveryTag, message);}});//发送消息int maxUnConfirmed = 100; //未处理消息的阈值for (int i = 0; i < MESSAGE_COUNT; i++) {//将未处理的消息数量降到阈值以下while (map.size() >= maxUnConfirmed) {Thread.sleep(10);}long deliveryTag = channel.getNextPublishSeqNo(); //每次生成的 deliveryTag 都不一样String message = "Asynchronous Confirmation " + i;channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE3, null, message.getBytes(StandardCharsets.UTF_8));map.put(deliveryTag, message);}//保证所有消息都已经确认while (!map.isEmpty()) {Thread.sleep(10);}long endTime = System.currentTimeMillis();System.out.println("异步确认模式耗时: " + (endTime - startTime) + "ms");} catch (IOException | TimeoutException | InterruptedException e) {throw new RuntimeException(e);}}

在上面三种确认模式下,异步确认的时间消耗是最少的。

http://www.lqws.cn/news/563941.html

相关文章:

  • 海量数据存储与分析:HBase、ClickHouse、Doris三款数据库对比
  • 用celery作为信息中间件
  • AlpineLinux安装部署MariaDB
  • 如何撰写有价值的项目复盘报告
  • 将iso镜像文件格式转换为云平台支持的镜像文件格式
  • lv_font_conv转换自定义symbol
  • 志愿填报深度解析与专业导向推荐-AI生成
  • SATA信号基础介绍
  • python基础23(2025.6.29)分布式爬虫(增量式爬虫去重)redis应用_(未完成!)
  • DOP数据开放平台(真实线上项目)
  • c++ 学习(二、结构体)
  • 非阻塞 IO
  • 卸载Modelsim/Qustasim方法
  • matplotlib 绘制水平柱状图
  • 买卖股票的最佳时机 II
  • 开源3D 动态银河系特效:Vue 与 THREE.JS 的奇幻之旅
  • 【面板数据】上市公司企业代理成本数据(四项代理成本) 2000-2024年
  • 设备树引入
  • kubectl exec 原理
  • Python 数据分析:numpy,抽提,整数数组索引。听故事学知识点怎么这么容易?
  • AD22以上的基础操作
  • 基于WOA鲸鱼优化算法的圆柱体容器最大体积优化设计matlab仿真
  • 星际争霸数据集指南
  • 数据结构与算法总概
  • Rust代码规范之蛇形命名法和驼峰命名法
  • AUTOSAR图解==>AUTOSAR_AP_EXP_SOVD
  • 关于ubuntu 20.04系统安装分区和重复登录无法加载桌面的问题解决
  • 力扣 刷题(第七十一天)
  • 可观测性的哲学
  • 学习使用dotnet-dump工具分析.net内存转储文件(2)