一.为什么选择RocketMQ消息队列?(可跳过看三的整合代码实例)

  • 首先RocketMQ是阿里巴巴自研出来的,也已开源。其性能和稳定性从双11就能看出来,借用阿里的一句官方介绍:历年双 11 购物狂欢节零点千万级 TPS、万亿级数据洪峰,创造了全球最大的业务消息并发以及流转纪录(日志类消息除外); 

  • 在始终保证高性能前提下,支持亿级消息堆积,不影响集群的正常服务,在削峰填谷(蓄洪)、微服务解耦的场景下尤为重要;这,就能说明RocketMQ的强大。


二.RocketMQ的特点和优势(可跳过看三的整合代码实例

  • 削峰填谷(主要解决诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,海量消息堆积能力强)

  • 异步解耦(高可用松耦合架构设计,对高依赖的项目之间进行解耦,当下游系统出现宕机,不会影响上游系统的正常运行,或者雪崩)  

  • 顺序消息(顺序消息即保证消息的先进先出,比如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等)

  • 分布式事务消息(确保数据的最终一致性,大量引入 MQ 的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性,减少系统间的交互)

 

三.SpringBoot 整合RocketMQ(商业云端版)

  • 首先去阿里云控制台创建所需消息队列资源,包括消息队列 RocketMQ 的实例、Topic、Group ID (GID),以及鉴权需要的 AccessKey(AK)。

  • 在springboot项目pom.xml添加需要的依赖 ons-client

    <dependency>
        <groupId>com.aliyun.openservices</groupId>
        <artifactId>ons-client</artifactId>
        <version>1.8.5.Final</version>
    </dependency>  


  • 在对应环境的application-xx.properties文件配置参

  • spring:
      application:
        name: @pom.artifactId@
    ##-------鉴权需要的 AccessKey(AK)(实际项目,这里填写阿里云自己的账号信息)---
    ali:
      mq:
        # 实例key
        accessKey: xxAxxxxxxxxxx
        # 实例密钥
        secretKey: xxxxxxxxxiHxxxxxxxxxxxxxx
        # 实例TCP 协议公网接入地址(实际项目,填写自己阿里云MQ的公网地址)
        nameSrvAddr: http://MQ_INST_***********85_BbM********************yuncs.com:80
        # 消费者组执行者线程数(根据业务指定)
        consumeThreadNums: 20
        # 消息topic(实际项目,填写自己阿里云MQ中的topic名称和groupid)
        topic: topic-dev
        # 服务分组(根据自身业务配置)
        groupId: GID_MESSAGE_TEST
        # 默认tag
        tag: *
        # 定时/延时消息
        timeTopic: time-lapse
        timeGroupId: GID-message
        timeTag: *
        # 业务1tag
        businessOneTag: businessOneTag
        # 业务2tag
        businessTwoTag: businessTwoTag
        # 业务3tag
        businessThreeTag: businessThreeTag


  • 封装MQ配置类:RocketMqProperties

  • package com.xh.demo.config.properties;
    
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.Properties;
    
    /**
     * @author wen
     * @apiNote MQ配置加载
     * @since 2023/1/10
     */
    @Data
    @Configuration
    @ConfigurationProperties(prefix = "ali.mq")
    public class RocketMqProperties {
    
        /**
         * 消息队列公钥
         */
        private String accessKey;
    
        /**
         * 消息队列密钥
         */
        private String secretKey;
    
        /**
         * 消息队列云地址
         */
        private String nameSrvAddr;
    
        /**
         * 消费线程数
         */
        private String consumeThreadNums;
    
        /**
         * 消息主题
         */
        private String topic;
    
        /**
         * 消息分组
         */
        private String groupId;
    
        private String tag;
    
        private String timeTopic;
    
        private String timeGroupId;
    
        private String timeTag;
    
        private String businessOneTag;
    
        private String businessTwoTag;
    
        private String businessThreeTag;
    
    
        /**
         * 获取消息队列基本配置信息
         */
        public Properties getMqProperties() {
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
            properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
            properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
            // 设置发送超时时间,单位毫秒
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "5000");
            return properties;
        }
    
    }


  • 给消息生产者注入配置信息,ProducerBean用于将Producer集成至Spring Bean中

package com.xh.demo.config;

import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.xh.demo.config.properties.RocketMqProperties;
import com.xh.demo.utils.MqProducerUtil;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author wen
 * @apiNote MQ配置注入生成消息实例
 * @since 2023/1/10
 */
@Configuration
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class RocketMqProducerConfig {

    private final RocketMqProperties rocketMqProperties;

    public static final int CORE_POOL_SIZE = 1;
    public static final int MAXIMUM_POOL_SIZE = 8;
    public static final long KEEP_ALIVE_SECONDS = 30;

    /**
     * MQ生产者异步回调线程池
     */
    @Bean(name = "producerCallbackExecutor")
    public ThreadPoolExecutor producerCallbackExecutor() {
        return new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAXIMUM_POOL_SIZE,
                KEEP_ALIVE_SECONDS,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(MAXIMUM_POOL_SIZE),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean buildProducer() {
        ProducerBean producerBean = new ProducerBean();
        producerBean.setProperties(rocketMqProperties.getMqProperties());
        producerBean.setCallbackExecutor(producerCallbackExecutor());
        MqProducerUtil.init(rocketMqProperties, producerBean);
        return producerBean;
    }

}


  • MQ消息发送类型

package com.xh.demo.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * @author wen
 * @apiNote MqSendTypeEnum
 * @since 2023-01-11
 */
@Getter
@AllArgsConstructor
public enum MqSendTypeEnum {

    /**
     * 单向消息
     */
    ONE_WAY("单向消息"),

    /**
     * 同步消息
     */
    SYNC("同步消息"),

    /**
     * 异步消息
     */
    ASYNC("异步消息"),

    ;

    private final String desc;

}


  • 为了方便使用,封装了一个发送消息的工具类

package com.xh.demo.utils;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
import com.xh.demo.config.properties.RocketMqProperties;
import com.xh.demo.enums.MqSendTypeEnum;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

/**
 * @author wen
 * @apiNote MQ发送消息生产者工具
 * @since 2023/1/11
 */
@Slf4j
public class MqProducerUtil {

    private static RocketMqProperties ROCKET_MQ_PROPERTIES;

    private static ProducerBean PRODUCER_BEAN;

    public static void init(RocketMqProperties rocketMqProperties, ProducerBean producerBean) {
        MqProducerUtil.ROCKET_MQ_PROPERTIES = rocketMqProperties;
        MqProducerUtil.PRODUCER_BEAN = producerBean;
        log.info("【MQ发送消息生产者工具-init-初始化成功】");
    }

    /**
     * 异步发送普通消息
     *
     * @param tag        标签 默认 *
     * @param messageKey 消息key 不设置也不会影响消息正常收发
     * @param body       消息内容
     */
    public void sendAsyncMsg(String tag, String messageKey, String body) {
        sendAsyncMsg(tag, messageKey, body, null);
    }

    /**
     * 异步发送普通消息
     *
     * @param tag        标签 默认 *
     * @param messageKey 消息key 不设置也不会影响消息正常收发
     * @param body       消息内容
     * @param delayTime  消息接收时间(Unix 时间戳)
     */
    public void sendAsyncMsg(String tag, String messageKey, String body, Long delayTime) {
        send(tag, messageKey, body, MqSendTypeEnum.ASYNC, delayTime);
    }

    /**
     * 发送单向消息
     *
     * @param tag        标签 默认 *
     * @param messageKey 消息key 不设置也不会影响消息正常收发
     * @param body       消息内容
     */
    public void sendOneWayMsg(String tag, String messageKey, String body) {
        sendOneWayMsg(tag, messageKey, body, null);
    }

    /**
     * 发送单向消息
     *
     * @param tag        标签 默认 *
     * @param messageKey 消息key 不设置也不会影响消息正常收发
     * @param body       消息内容
     * @param delayTime  消息接收时间(Unix 时间戳)
     */
    public void sendOneWayMsg(String tag, String messageKey, String body, Long delayTime) {
        send(tag, messageKey, body, MqSendTypeEnum.ONE_WAY, delayTime);
    }

    /**
     * 同步发送消息
     *
     * @param tag        标签,可用于消息小分类标注
     * @param body       消息body内容,生产者自定义内容
     * @param messageKey 消息key值,建议设置全局唯一,可不传,不影响消息投递
     * @return SendResult 发送结果
     */
    public SendResult sendMsg(String tag, String messageKey, String body) {
        return sendMsg(tag, messageKey, body, null);
    }

    /**
     * 同步发送消息
     *
     * @param tag        标签,可用于消息小分类标注
     * @param body       消息body内容,生产者自定义内容
     * @param messageKey 消息key值,建议设置全局唯一,可不传,不影响消息投递
     * @param delayTime  消息接收时间(Unix 时间戳)
     * @return SendResult 发送结果
     */
    public SendResult sendMsg(String tag, String messageKey, String body, Long delayTime) {
        return send(tag, messageKey, body, MqSendTypeEnum.SYNC, delayTime);
    }

    /**
     * 发送消息
     *
     * @param tag          标签 默认 *
     * @param messageKey   消息key 不设置也不会影响消息正常收发
     * @param body         消息内容
     * @param sendTypeEnum 发送消息类型 {@link MqSendTypeEnum}
     * @param delayTime    消息接收时间(Unix 时间戳)
     * @return SendResult 发送结果
     */
    public SendResult send(String tag, String messageKey, String body, MqSendTypeEnum sendTypeEnum, Long delayTime) {
        log.info("[发送MQ消息-send] tag:{}, messageKey:{}, body:{}, isOneWay:{}, delayTime:{}",
                tag, messageKey, body, sendTypeEnum.name(), delayTime);
        Message msg = new Message(
                // Message所属的Topic
                ROCKET_MQ_PROPERTIES.getTopic(),
                // Message Tag 对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
                StringUtils.isEmpty(tag) ? "*" : tag,
                // Message Body 可以是任何二进制形式的数据, MQ不做任何干预
                // 需要Producer与Consumer协商好一致的序列化和反序列化方式
                body.getBytes(StandardCharsets.UTF_8));
        // 设置代表消息的业务关键属性,请尽可能全局唯一
        // 以方便您在无法正常收到消息情况下,可通过MQ 控制台查询消息并补发
        // 注意:不设置也不会影响消息正常收发
        msg.setKey(messageKey);
        // 发送消息,只要不抛异常就是成功
        try {
            if (delayTime != null) {
                msg.setStartDeliverTime(delayTime);
            }
            if (MqSendTypeEnum.ONE_WAY == sendTypeEnum) {
                PRODUCER_BEAN.sendOneway(msg);
                log.info("[消息队列] 消息发送成功-send 单向消息 单向消息MsgId不返回");
                return null;
            } else if (MqSendTypeEnum.SYNC == sendTypeEnum) {
                SendResult sendResult = PRODUCER_BEAN.send(msg);
                assert sendResult != null;
                log.info("[消息队列] 消息发送成功-send 同步消息 SendResult:{}", sendResult);
                return sendResult;
            } else {
                PRODUCER_BEAN.sendAsync(msg, new SendCallback() {
                    @Override
                    public void onSuccess(final SendResult sendResult) {
                        assert sendResult != null;
                        log.info("[消息队列] 消息发送成功-send 异步消息回调 SendResult:{}", sendResult);
                    }

                    // 出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。
                    @Override
                    public void onException(final OnExceptionContext context) {
                        log.error("[消息队列] 消息发送失败-send 异步消息回调:{}", context.getException().getMessage());
                    }
                });
                return null;
            }
        } catch (ONSClientException e) {
            log.warn("[消息队列] 消息发送失败 ONSClientException", e);
            // 出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。
            throw new IllegalArgumentException("消息队列发送异常");
        } catch (Exception e) {
            log.warn("[消息队列] 消息发送失败 Exception", e);
            throw new IllegalArgumentException("消息队列发送异常");
        }
    }

}


  • 接下来是消息消费者的配置和接收消息(一般在下游系统或者相关联的系统),接收消息的项目照旧,添加依赖jar包 ons-client v1.8.0.Final 、配置mq参数链接

  • 注入配置、订阅消息、添加消息处理的方法

package com.xh.demo.config;

import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.PropertyValueConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.xh.demo.config.properties.RocketMqProperties;
import com.xh.demo.mq.MqMessageListener;
import com.xh.demo.mq.MqTimeMessageListener;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * @author wen
 * @apiNote RocketMqConsumerConfig
 * @since 2023/1/10
 */
@Configuration
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class RocketMqConsumerConfig {

    /**
     * rocketMqProperties
     */
    private final RocketMqProperties rocketMqProperties;

    /**
     * 普通消息监听器,Consumer注册消息监听器来订阅消息.
     */
    private final MqMessageListener messageListener;

    /**
     * 定时消息监听器,Consumer注册消息监听器来订阅消息.
     */
    private final MqTimeMessageListener timeMessageListener;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        // 配置文件
        Properties properties = rocketMqProperties.getMqProperties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, rocketMqProperties.getGroupId());
        // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)
        properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        // 消费者线程数
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, rocketMqProperties.getConsumeThreadNums());
        consumerBean.setProperties(properties);
        // 订阅消息
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
        // 订阅普通消息
        Subscription subscription = new Subscription();
        subscription.setTopic(rocketMqProperties.getTopic());
        subscription.setExpression(rocketMqProperties.getTag());
        subscriptionTable.put(subscription, messageListener);
        // 订阅定时/延时消息
        Subscription subscriptionTime = new Subscription();
        subscriptionTime.setTopic(rocketMqProperties.getTimeTopic());
        subscriptionTime.setExpression(rocketMqProperties.getTimeTag());
        subscriptionTable.put(subscriptionTime, timeMessageListener);
        // 订阅多个topic如上面设置
        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }

}



  • 对消息监听类进行实现,处理接收到的消息

package com.xh.demo.mq;

import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.xh.demo.config.properties.RocketMqProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

/**
 * @author wen
 * @apiNote MQ消息监听消费
 * @since 2023/1/10
 */
@Slf4j
@Component
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class MqMessageListener implements MessageListener, InitializingBean {

    private final RocketMqProperties rocketMqProperties;

    private final static Map<String, Consumer<Message>> CONSUMER_MAP = new HashMap<>();

    @Override
    public Action consume(Message message, ConsumeContext consumeContext) {
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("[消息队列] receivedTime:{}, topic:{}, tag:{}, messageKey:{}, body:{}", message.getTopic(),
                DateUtil.format(LocalDateTime.now(), DatePattern.NORM_DATETIME_PATTERN), message.getTag(), message.getKey(), body);
        try {
            Consumer<Message> messageConsumer = CONSUMER_MAP.get(message.getTag());
            if (Objects.isNull(messageConsumer)) {
                log.warn("[消息队列] 消息消费-consume-未找到对应消费者执行函数: topic:{}, tag:{}, messageKey:{}, body:{}",
                        message.getTopic(), message.getTag(), message.getKey(), body);
            } else {
                messageConsumer.accept(message);
            }
            return Action.CommitMessage;
        } catch (Exception e) {
            log.warn("[消息队列] 消息消费异常:topic:{}, tag:{}, messageKey:{}, body:{}", message.getTopic(), message.getTag(), message.getKey(), body, e);
            return Action.ReconsumeLater;
        }

    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // 消费者业务1
        CONSUMER_MAP.put(rocketMqProperties.getBusinessOneTag(), message -> {
            // TODO do something
        });
        // 消费者业务2
        CONSUMER_MAP.put(rocketMqProperties.getBusinessTwoTag(), message -> {
            // TODO do something
        });
        // 消费者业务3
        CONSUMER_MAP.put(rocketMqProperties.getBusinessThreeTag(), message -> {
            // TODO do something
        });
    }

}