引言

    最近公司项目中,车辆大数据的推送和接收同步都用到了RabbitMQ消息中间件,对于其中最核心的交换机和队列Exchange、Queue的参数配置和使用,再此简单总结一下,供自己和大家一块儿学习!

1.先来介绍RabbitMQ中的成员

  • Producer(生产者): 将消息发送到Exchange
  • Exchange(交换器):将从生产者接收到的消息路由到Queue
  • Queue(队列):存放供消费者消费的消息
  • BindingKey(绑定键):建立Exchange与Queue之间的关系(个人看作是一种规则,也就是Exchange将什么样的消息路由到Queue)
  • RoutingKey(路由键):Producer发送消息与路由键给Exchange,Exchange将判断RoutingKey是否符合BindingKey,如何则将该消息路由到绑定的Queue
  • Consumer(消费者):从Queue中获取消息

下面是各个成员的作用图解

 

 

 

 

 

 

引入依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>

 

2.先来介绍Exchange

这里将着重于介绍Exchange和Queue的各个参数解释

先来看看Exchange中都有哪些属性

  • exchange:名称
  • type:类型
  • durable:是否持久化,RabbitMQ关闭后,没有持久化的Exchange将被清除
  • autoDelete:是否自动删除,如果没有与之绑定的Queue,直接删除
  • internal:是否内置的,如果为true,只能通过Exchange到Exchange
  • arguments:结构化参数

 

 

 下面这个类用于创建一个与RabbitMQ的Connection(连接),该Connection用于创建Channel(信道),Channel是消息读写的通道,也就是我们的操作都会在Channel的基础之上进行

 

 

 2.1先使用最简单的参数构建Exchange
exchangeDeclare(String exchange, String type)

 

 

 进入RabbitMQ可视化界面可以看到,RabbitMQ已经为我们创建了exchange.0,类型为direct

 

 

具体释意

  name                      名称
  type                        类型
  Features                 特征
  Message rate in       消息速率输入
  Message rate out     消息速率输出

2.2接下来是三个参数,也就是加上了是否持久化,同时保留先前两个参数的exchange.0,之前我们已经创建了exchange.0,那么我们再创建一次会怎样

  exchangeDeclare(String exchange, String type, boolean durable)

 

运行成功,并没有报错,因为只要你设置的的设置是一样的,那么就不会报错,如果设置的不一样,那么就会报错,后面会进行验证

这里我们发现exchange.2多了一个D标识,这个D是durable也就是持久化,而exchange.0没有持久化,也就是默认非持久化

 

 

 接下来验证这个持久化有什么作用
关闭rabbitmq
rabbitmqctl stop_app
启动rabbitmq
rabbitmqctl start_app
重新进入可视化界面,Exchange就只剩下持久化的了

 

 

 2.3接下来是五个参数的

多了两个参数,autoDelete和arguments
exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)

下面创建了两个Exchange
exchange.3自动删除为false
exchange.4自动删除为true
由于这里是没有绑定Queue的,那么exchange.4将在创建后就被删除掉?

 

 

 执行上面的代码

 

exchange.4还活的好好的,这是因为我们必须在绑定Queue之后再失去绑定才会被删除,否则为什么不直接抛异常,接下来进行验证
下面直接通过可视化工具创建一个名称为queue.4的Queue

 

 

 

 

 英文释义

Name         名称
Features     特征
Status        状态
Ready        是否准备好
Unacked     未确认
Total           总计
incoming     进来的
deliver        传送
get             得到
ack             确认

2.5讲解完Exchange的参数,再来看Queue的参数,就会发现只有一个exclusive未讲
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments

exclusive:是否排他,如果未true,则只在第一次创建它的Connection中有效,当Connection关闭,该Queue也会被删除

在执行完下面代码,查看可视化界面,发现queue中并没有exclusive.queue,因为在connection关闭后,该queue也会自动删除

创建实例

package com.tiandy.illegal.util.mq;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.*;
import com.tiandy.illegal.bo.CLS_ManageService;
import com.tiandy.illegal.bo.CLS_ManageServiceImpl;
import com.tiandy.illegal.util.CLS_ILLEGAL_Error;
import com.tiandy.illegal.vo.CLS_VO_Message;
import com.tiandy.illegal.vo.CLS_VO_Record;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.ResourceBundle;

public class RabbitMQSend {
    //rabbitmq连接
    public static Connection connection = null;
    //rabbitmq通道
    public static Channel channel = null;
    //连接状态标识
    public static boolean connectStatus = false;
    // 配置
    static ResourceBundle resourceBundle = ResourceBundle.getBundle("mq/artemisConfig");
    // 交换机  exchangeTemp
    private static String rabbitmq_exchange = resourceBundle.getString("rabbitmq_exchange");
    // 队列名  queue_vbs_vehicle_record
    private static String rabbitmq_queue = resourceBundle.getString("rabbitmq_queue");
    // service
    CLS_ManageService cls_manageService = new CLS_ManageServiceImpl();
    static ConnectionFactory factory = null;

    public void initialize() {
        try {
            //连接工厂
            if (null == factory) {
                factory = new ConnectionFactory();
                factory= RabbitMQUtil.getRabbitMQConnectionFactory();
                // 关闭通道与连接
                closeConnection();
                connection = factory.newConnection();
                channel = connection.createChannel();
                // 声明交换机
                // channel.exchangeDeclare(rabbitmq_exchange, BuiltinExchangeType.DIRECT ,true);
                connectStatus = true;

            }
        } catch (Exception e) {
            connectStatus = false;
            e.printStackTrace();
           // log.error("RabbitMQSend method initialize:" + e.getMessage(), e);
        }
    }

    //关闭连接
    public void closeConnection() {
        try {
            if (channel != null) {
                if (channel.isOpen()) {
                    channel.close();
                    channel = null;
                }
            }
        } catch (Exception e) {
            //log.error("RabbitMQSend closeChannel  error  " + e);
            e.printStackTrace();
        }
        try {
            if (connection != null) {
                if (connection.isOpen()) {
                    connection.close();
                    connection = null;
                }
            }
        } catch (Exception e) {
           // log.error("RabbitMQSend closeConnection  error  " + e);
            e.printStackTrace();
        }
    }
/**
     * 监听消息队列,获取数据
     */
    public void queueDeclareExchange() {
        //声明交换机
        try {
            Map<String, Object> args = new HashMap<String, Object>();
            args.put("x-max-length", 100000); // 设置最大存储消息数
            // 声明交换机  (交换机参数)
            channel.exchangeDeclare(rabbitmq_exchange, BuiltinExchangeType.FANOUT, true);
            // 消息持久化  (队列参数)
            channel.queueDeclare(rabbitmq_queue, true, false, false, args);
            // 交换机与队列绑定
            channel.queueBind(rabbitmq_queue, rabbitmq_exchange, "");
            // 消费者限制
            //channel.basicQos(1);
            Consumer consumer = new DefaultConsumer(channel) {
                int inRecord=0; // 插入记录数量
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //接收到的消息
                    String msg = new String(body, "UTF-8");
                    // 判断数据是否允许接入
                    int check = checkMessage(msg);
                    if (check == CLS_ILLEGAL_Error.ERROR_OK) {
                        // 消息转换至VO
                        CLS_VO_Message msgVo = cls_manageService.getMessageVo(msg);
                        // 判断数据,分开处理白车牌数据与其他数据,每次新增一条
                        int count = cls_manageService.decideData(msgVo);
                        if(count>0){
                            inRecord+=count;
                            System.out.println("  已消费消息:"+envelope.getDeliveryTag()+"  插入记录数:" + inRecord);
                        }
                    }
                    // 单条消息确认(第几条,是否多条)
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            // 设置消息手动确认 (队列名,是否自动确认,consumer)
            channel.basicConsume(rabbitmq_queue, false, consumer);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    /**
     * 方法说明:监测接收信息
     *
     * @param message
     * @return @修改人及日期: @修改描述: @其他:
     */
    public int checkMessage(String message) {
        // TODO 监测数据格式及是否允许接入
        int check = 0;
        CLS_VO_Message vo_Message = null;
        try {
            vo_Message = JSONObject.parseObject(message, CLS_VO_Message.class);
        } catch (Exception e) {
            return CLS_ILLEGAL_Error.ERROR_PARAM;
        }
        if (vo_Message.getStorage_id() == null || "".equals(vo_Message.getStorage_id())) {
            return CLS_ILLEGAL_Error.ERROR_PARAM;
        }
        if (vo_Message.getCap_pic() == null || vo_Message.getCap_pic().size() == 0) {
            return CLS_ILLEGAL_Error.ERROR_PARAM;
        }
        if (vo_Message.getTotal_info() == null) {
            return CLS_ILLEGAL_Error.ERROR_PARAM;
        }
        CLS_VO_Record total_info = vo_Message.getTotal_info();
        if (total_info.getTollgateID() == null || "".equals(total_info.getTollgateID())) {
            return CLS_ILLEGAL_Error.ERROR_PARAM;
        }
        return check;
    }

}

 至此,简单的参数讲解和应用就总结完了!


    

  
内容来源于网络如有侵权请私信删除

文章来源: 博客园

原文链接: https://www.cnblogs.com/zhaosq/p/13719585.html

你还没有登录,请先登录注册
  • 还没有人评论,欢迎说说您的想法!