docker启动rabbitmq以及使用方式详解

搜索rabbitmq镜像

docker search rabbitmq:management

下载镜像

docker pull rabbitmq:management

启动容器

docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management

查看容器日志

docker logs rabbitmq

访问RabbitMQ Management

http://localhost:15672

默认账户和密码均为:guest

编写生产者类

package com.xun.rabbitmqdemo.example;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Minimal publisher example: declares the {@code hello} queue and sends one text
 * message to it through the default exchange.
 */
public class Producer {
    private final static String QUEUE_NAME = "hello";

    /**
     * Connects to the broker at localhost:5672 (virtual host "/", user guest/guest),
     * publishes a single message and closes the channel and connection.
     *
     * @param args unused
     * @throws IOException      if connecting, declaring, publishing or closing fails
     * @throws TimeoutException if connecting or closing times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // try-with-resources closes the channel and then the connection even when
        // declaring or publishing throws, so no connection is left open.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            /*
             * Declare the queue:
             * 1. queue name (QUEUE_NAME)
             * 2. durable    - whether the queue survives a broker restart
             * 3. exclusive  - true restricts the queue to the declaring connection;
             *                 false lets other connections consume from it
             * 4. autoDelete - whether the queue is deleted once its last consumer disconnects
             * 5. additional arguments
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello world!";
            /*
             * Publish one message:
             * 1. exchange to publish to ("" is the default exchange)
             * 2. routing key
             * 3. additional message properties
             * 4. message body
             */
            // Encode explicitly as UTF-8 so the bytes do not depend on the platform
            // default charset; Receiver decodes the body as UTF-8.
            channel.basicPublish("", QUEUE_NAME, null,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

运行该方法,可以看到控制台的打印

name=hello的队列收到Message

消费者

package com.xun.rabbitmqdemo.example;

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Minimal consumer example: subscribes to the {@code hello} queue with automatic
 * acknowledgement and prints every message body it receives.
 */
public class Receiver {
    private final static String QUEUE_NAME = "hello";

    /**
     * Configures the connection factory, opens a connection and channel, declares the
     * queue and registers a consumer on it.
     *
     * @param args unused
     * @throws IOException      if connecting, declaring or subscribing fails
     * @throws TimeoutException if connecting times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // Broker endpoint
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // Credentials
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // Connection tuning
        connectionFactory.setConnectionTimeout(600000); // milliseconds
        connectionFactory.setHandshakeTimeout(6000);    // milliseconds
        connectionFactory.setRequestedHeartbeat(60);    // seconds
        connectionFactory.setRequestedChannelMax(5);
        connectionFactory.setNetworkRecoveryInterval(500);

        Channel consumerChannel = connectionFactory.newConnection().createChannel();

        consumerChannel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("Waiting for messages. ");

        // Second argument true = automatic acknowledgement.
        consumerChannel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(consumerChannel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(" [x] Received '" + new String(body, "UTF-8") + "'");
            }
        });
    }
}

工作队列

RabbitMqUtils工具类

package com.xun.rabbitmqdemo.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Helper that opens a connection to the local broker (host localhost, user guest/guest)
 * and hands out a channel on it.
 */
public class RabbitMqUtils {

    /** Static helpers only; not meant to be instantiated. */
    private RabbitMqUtils() {
    }

    /**
     * Opens a new connection and creates a channel on it.
     *
     * <p>Each call opens its own connection. The caller owns both the channel and that
     * connection; the connection can be reached through {@code channel.getConnection()}
     * and should be closed when the caller is done.
     *
     * @return a newly created channel, never {@code null}
     * @throws Exception if the connection cannot be opened or no channel can be created;
     *                   in the latter case the connection is closed before rethrowing
     */
    public static Channel getChannel() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            if (channel == null) {
                // createChannel() is documented to return null when no channel is available.
                throw new IllegalStateException("No channel available on the new connection");
            }
            return channel;
        } catch (Exception e) {
            // Do not leak the connection when the channel could not be created.
            try {
                connection.close();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }
}

启动2个工作线程(即分别运行 Work01 和 Work02 两个消费者程序)

package com.xun.rabbitmqdemo.workQueue;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;

/**
 * Work-queue consumer "C1": receives messages from the {@code hello} queue with
 * automatic acknowledgement and prints them.
 */
public class Work01 {
    private static final String QUEUE_NAME = "hello";

    /**
     * Declares the queue and starts consuming from it.
     *
     * @param args unused
     * @throws Exception if the channel cannot be obtained or the broker rejects a call
     */
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // Declare the queue here too (same arguments as the publishers in this demo) so
        // the worker can be started before any publisher has created it.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            String receivedMessage =
                    new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("接收消息:" + receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        };
        System.out.println("C1 消费者启动等待消费....");
        /*
         * Start consuming:
         * 1. queue to consume from
         * 2. whether deliveries are acknowledged automatically
         * 3. callback invoked for each delivered message
         * 4. callback invoked when the consumer is cancelled
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}
package com.xun.rabbitmqdemo.workQueue;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;

/**
 * Work-queue consumer "C2": receives messages from the {@code hello} queue with
 * automatic acknowledgement and prints them.
 */
public class Work02 {
    private static final String QUEUE_NAME = "hello";

    /**
     * Declares the queue and starts consuming from it.
     *
     * @param args unused
     * @throws Exception if the channel cannot be obtained or the broker rejects a call
     */
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // Declare the queue here too (same arguments as the publishers in this demo) so
        // the worker can be started before any publisher has created it.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            String receivedMessage =
                    new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("接收消息:" + receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        };
        System.out.println("C2 消费者启动等待消费....");
        // Arguments: queue, automatic acknowledgement, delivery callback, cancel callback.
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

启动工作线程

启动发送线程

package com.xun.rabbitmqdemo.workQueue;

import com.rabbitmq.client.Channel;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import java.util.Scanner;

public class Task01 {
 private static final String QUEUE_NAME = "hello";
 public static void main(String[] args) throws Exception{
 try(Channel channel= RabbitMqUtils.getChannel();){
 channel.queueDeclare(QUEUE_NAME,false,false,false,null);
 //从控制台接收消息
 Scanner scanner = new Scanner(System.in);
 while(scanner.hasNext()){
 String message = scanner.next();
 channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
 System.out.println("发送消息完成:"+message);
 }
 }
 }
}

启动发送线程,此时发送线程等待键盘输入

发送4个消息

可以看到2个工作线程按照轮询(round-robin)的顺序依次接收message,每条message只会被其中一个工作线程消费。

消息应答机制

在自动应答(autoAck=true)模式下,rabbitmq将message发送给消费者后,就会立即将该消息标记为删除。

但消费者在处理message过程中宕机,会导致消息的丢失。

因此需要设置手动应答。

生产者

import com.rabbitmq.client.Channel;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import java.util.Scanner;

public class Task02 {
 private static final String TASK_QUEUE_NAME = "ack_queue";
 public static void main(String[] args) throws Exception{
 try(Channel channel = RabbitMqUtils.getChannel()){
 channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
 Scanner scanner = new Scanner(System.in);
 System.out.println("请输入信息");
 while(scanner.hasNext()){
 String message = scanner.nextLine();
 channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());
 System.out.println("生产者task02发出消息"+ message);
 }
 }
 }
}

消费者

package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import com.xun.rabbitmqdemo.utils.SleepUtils;

/**
 * Manual-acknowledgement consumer with a short processing time (1 second per message)
 * on the {@code ack_queue} queue.
 */
public class Work03 {
    private static final String ACK_QUEUE_NAME = "ack_queue";

    /**
     * Declares the queue and starts consuming from it with manual acknowledgement.
     *
     * @param args unused
     * @throws Exception if the channel cannot be obtained or the broker rejects a call
     */
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // Declare the queue here too (same arguments as Task02) so the worker can be
        // started before the publisher has created it.
        channel.queueDeclare(ACK_QUEUE_NAME, false, false, false, null);
        System.out.println("Work03 等待接收消息处理时间较短");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            String message =
                    new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            // Simulate one second of work before acknowledging.
            SleepUtils.sleep(1);
            System.out.println("接收到消息:" + message);
            /*
             * Acknowledge the delivery:
             * 1. delivery tag of the message
             * 2. whether to acknowledge multiple deliveries at once (false = only this one)
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        };
        // Use manual acknowledgement
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import com.xun.rabbitmqdemo.utils.SleepUtils;

/**
 * Manual-acknowledgement consumer with a long processing time (30 seconds per message)
 * on the {@code ack_queue} queue.
 */
public class Work04 {
    private static final String ACK_QUEUE_NAME = "ack_queue";

    /**
     * Declares the queue and starts consuming from it with manual acknowledgement.
     *
     * @param args unused
     * @throws Exception if the channel cannot be obtained or the broker rejects a call
     */
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // Declare the queue here too (same arguments as Task02) so the worker can be
        // started before the publisher has created it.
        channel.queueDeclare(ACK_QUEUE_NAME, false, false, false, null);
        System.out.println("Work04 等待接收消息处理时间较长");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            String message =
                    new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            // Simulate thirty seconds of work before acknowledging.
            SleepUtils.sleep(30);
            System.out.println("接收到消息:" + message);
            // Acknowledge only this delivery (second argument false = not multiple).
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        };
        // Use manual acknowledgement
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}

工具类SleepUtils

package com.xun.rabbitmqdemo.utils;
/** Helper for pausing the current thread, used by the demo workers to simulate work. */
public class SleepUtils {

    /** Static helpers only; not meant to be instantiated. */
    private SleepUtils() {
    }

    /**
     * Pauses the current thread for the given number of seconds.
     *
     * <p>If the thread is interrupted while sleeping, the method returns early and
     * leaves the thread's interrupt flag set.
     *
     * @param second number of seconds to sleep
     * @throws IllegalArgumentException if {@code second} is negative (thrown by {@link Thread#sleep(long)})
     */
    public static void sleep(int second) {
        try {
            // Multiply as long: the int product 1000 * second overflows for large values
            // and would hand Thread.sleep a negative duration.
            Thread.sleep(1000L * second);
        } catch (InterruptedException _ignored) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}

模拟

work04等待30s后发出ack

在work04处理message时手动停止work04,由于该message尚未被应答(ack),rabbitmq会将其重新入队,可以看到message:dd被rabbitmq交给了work03

不公平分发

上面的轮询分发中,rabbitmq依次按顺序把消息分发给各个消费者;但当消费者A处理速度很快,而消费者B处理速度很慢时,这种分发策略显然是不合理的。
不公平分发:

// Prefetch count of 1 passed to basicQos; per the surrounding article, this makes the
// broker hand messages to idle consumers instead of ones still busy with a message.
// NOTE(review): presumably this must run on the consumer's channel before
// basicConsume, with manual acknowledgement enabled - confirm.
int prefetchCount = 1;
channel.basicQos(prefetchCount);

通过此配置(需配合手动应答),每个消费者同一时刻最多只持有1条未应答的消息;当某个消费者尚未处理完当前消息时,rabbitmq会把后续的message分发给空闲的消费者。

总结 

作者:Maackia原文地址:https://blog.csdn.net/qq_44402069/article/details/124303944

%s 个评论

要回复文章请先登录注册