消息中间件

简介

所谓的消息中间件,一般都是一些基于内存、并且能保证数据可靠持久化的一些软件,目前,主流的消息中间件有 RocketMQRabbitMQKafka。 在应用解耦的手段中,消息中间件是比较常见的方案,通过减少系统之间的相互依赖,使得应用本身变得简单高效。

使用消息中间件的优势

  • 系统解耦

  • 异步处理

  • 提供系统吞吐量(削峰)

  • 降低分布式系统的复杂度

使用消息中间件的场景举例:

在电商下单的业务场景中,需要依赖的中台服务过多,在传统的同步调用方式下,接口响应时间严重依赖下游系统。

create-order-sync

当使用消息中间件解耦异步处理后,创建订单服务在完成自身的业务并将下单消息推送到 MQ 后便返回客户端,相关服务自行消费创建订单事件。

create-order-async

何时需要使用消息中间件?

  1. 系统需要广播一类事件,让关心事件发生的下游系统能感知变化并处理完自身业务,如:员工离职、用户手机号更新等,可以通过定义标准的 eventMsg 数据模型和各个业务系统解耦。

  2. 接口并发非常大,业务处理比较慢,需要使用“内存类”的组件先将请求数据放入队列,后台再慢慢消费。这就是我们所说的流量削峰。

注意

第二类重点是解决入口流量非常大,需要使用内存型的缓存队列才对接使用 MQ 。 如果仅是为了消费性任务、非主干业务异步处理,推荐使用 hangfire 的后台作业能里。
因为上了 MQ ,系统架构中就多个一个分布式组件。他的可靠性将影响系统可靠性。 而 hangfire 的后台作业仅是一个 client 级别的工具包,他只依赖数据库而不依赖其他任何组件。降低系统复杂度。

幂等性

当我们谈论幂等时,一般是指可以重复处理传递的消息,而不会产生意外的结果。
在基于消息的系统中,存在以下三种可能:

  • Exactly Once (仅有一次,在实际场景中,很难达到。)

  • At Most Once (最多一次)

  • At Least Once (最少一次)

我们一般的系统设计,都采用了最少一次的处理方式。 \

重要

严格的业务幂等性需要业务系统自行完成。消息队列本身并不保证幂等。

消息组件-CAP

CAP 是使用消息中间件实现的一个 EventBus 组件。同时,它还借助本地数据库解决了分布式系统中的事务问题。

CAP 组件的优势:

  • 封装了 RabbitMQKafkaRedis Stream 等开源中间件的调用,降低开发难度,代码集成简单。

  • 基于本地消息表,保证了消息的可靠投递,并配合重试等策略以达到各个服务之间的数据最终一致性。

  • 自带有可视化页面,可方便查看消息消费情况。

CAP 架构:

cap

详细的使用文档请参见: 官方文档

NET CORE 集成 CAP

  1. 添加包引用

    dotnet add package DotNetCore.CAP
    dotnet add package DotNetCore.CAP.Dashboard
    dotnet add package DotNetCore.CAP.RabbitMQ
    dotnet add package DotNetCore.CAP.SqlServer
    
  2. appsettings.json添加配置文件

    {
      "RabbitMQConfig": {
        "HostName": "10.0.0.85",
        "Port": "35672",
        "UserName": "seazen",
        "Password": "seazen"
      }
    }
    

    小技巧

    在集群内,我们使用 k8s 的服务方式配置 rabbitmq 集群访问。

    {
        "RabbitMQConfig":
        {
            "HostName": "rabbitmq-0.rabbitmq-headless,rabbitmq-1.rabbitmq-headless,rabbitmq-2.rabbitmq-headless",
            "Port": "5672",
            "UserName": "seazen",
            "Password": "seazen"
        }
    }
    
  3. 配置 CAP

     // Startup.cs
     public void ConfigureServices(IServiceCollection services)
     {
         services.AddCap(config =>
         {
             // 根据不同环境,在 topic 、 group 上增加前缀,解决多个环境消费相同队列问题。
             var env = RuntimeInfo.Environment.ToLower();
             config.GroupNamePrefix
                 = config.TopicNamePrefix
                 = env;
    
             // 配置数据库持久化存储
             config.UseSqlServer(opts =>
             {
                 opts.ConnectionString = configuration.GetConnectionString("SqlConnection");
             });
             // 配置消息中间件
             config.UseRabbitMQ(opt =>
             {
                 opt.HostName = configuration["RabbitMQConfig:HostName"];
                 opt.Port = Convert.ToInt32(configuration["RabbitMQConfig:Port"]);
                 opt.UserName = configuration["RabbitMQConfig:UserName"];
                 opt.Password = configuration["RabbitMQConfig:Password"];
                 opt.ExchangeName = $"{env}.msg.feishu.router";
             });
             // 配置可视化 dashboard
             config.UseDashboard(opts =>
             {
                 opts.PathBase = string.IsNullOrEmpty(RuntimeInfo.ServiceName)
                 ? ""
                 : $"/{RuntimeInfo.ServiceName}";
             });
    
             //失败重试次数
             config.FailedRetryCount = 3;
         });
         // 注入消息订阅者
         services.AddTransient<FeishuNormalMsgSubscribe>();
     }
    
  4. 增加消费者

    public class FeishuNormalMsgSubscribe : ICapSubscribe
    {
        readonly ILogger<FeishuNormalMsgSubscribe> _logger;
        readonly IFeiShuMessageService _feiShuMessageService;
    
        public FeishuNormalMsgSubscribe(
            ILogger<FeishuNormalMsgSubscribe> logger
            , IFeiShuMessageService feiShuMessageService
        )
        {
            _logger = logger;
            _feiShuMessageService = feiShuMessageService;
        }
    
        // 指定消费队列和分组。同一个分组的多个实例,消息只会被1个处理。
        [CapSubscribe("msg.feishu.normal", Group = "msg.feishu.normal")]
        public void HandleNormalMessage(FeiShuSendingMsgBody msg,[FromCap]CapHeader header)
        {
            var capId = header["cap-msg-id"];
            var sendTime = header["cap-senttime"];
            var customerHeader = header["my.customer.name"];
            try
            {
                // 处理业务逻辑
                _feiShuMessageService.HandleSendMessage(msg);
            }
            catch (System.Exception e)
            {
                _logger.LogError(e, $"capId:{capId},time:{sendTime},consumer error:{e.Message}");
                // 当发生异常,CAP 会自动重试!
                throw;
            }
        }
    }
    

    注意

    消息订阅的方法 HandleNormalMessage 需要是同步方法。

    消费者分组: mq-group

  5. 发送消息

    public class FeiShuMessageService : IFeiShuMessageService
    {
        private readonly ICapPublisher _capPublisher;
    
        public FeiShuMessageService( ICapPublisher capPublisher)
        {
            _capPublisher = capPublisher;
        }
    
        private void PublishToMQ(FeiShuSendingMsgBody msg)
        {
            var header = new Dictionary<string, string>()
            {
                ["my.customer.name"] = "seazen",
            };
    
            // 开启本地事务,完成业务数据及 mq 发送。
            using (var connection = new SqlConnection("ConnectionString"))
            {
                using (var transaction = connection.BeginTransaction(_capBus, autoCommit: false))
                {
                    // 业务代码
                    connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction);
    
                    _capPublisher.Publish("msg.feishu.normal", msg, header);
                    transaction.Commit();
                }
            }
        }
    }
    

    CAPRabbitmq 中创建的队列

    rabbitmq

  6. Dashboard 查看消息消费 消息体具体内容可以在 Dashboard 消费详情中查看。

    cap-dashboard-1

    cap-dashboard-2

Skywalking 集成 CAP 追踪

错误

在目前架构中,链路追踪我们使用的是 skywalking ,但鉴于目前开源社区活跃度后续我们将逐步迁移到 opentelemetry 技术栈。

按照链路追踪文档,我们使用了环境变量启动 skywalking 的追踪服务。 默认下, skywalking 是不支持 cap 的异步追踪。 我们需要添加单独的 package 来支持 skywalking 追踪 cap 。

  1. 添加包

    # 添加 1.3.0 的 cap 追踪支持 package
    dotnet add package SkyAPM.Diagnostics.CAP --version 1.3.0
    
  2. startup 时启动 skywalking 及 cap

    services.AddCap();
    // 增加 skywalking core 和 cap 追踪
    services.AddSkyAPM(exts =>
    {
        exts.AddAspNetCoreHosting();
        exts.AddCap();
    });
    
  3. 移除 Pod 的 yaml 文件中的 skywalking 启动环境变量 ASPNETCORE_HOSTINGSTARTUPASSEMBLIES

    apiVersion: apps/v1
    kind: Deployment
    metadata:
    name: order
    spec:
    replicas: 1
    template:
        metadata:
        labels:
            name: order
        spec:
        containers:
            - name: order
            env:
                - name: ASPNETCORE_HOSTINGSTARTUPASSEMBLIES
                value: SkyAPM.Agent.AspNetCore
    

集成后,cap 消费的调用链路如下图:

oap-cap

RabbitMQ 传输层

cap 支持多种中间件作为消息传递的传输者。为了屏蔽各个中间件实现的细节,在发布订阅时通过指定的“名称”,来映射的各个中间件中的不同概念。
目前我们使用 RabbitMQ 作为消息传输中间件, RabbitMQ 内部消息投递和路由过程大致如下:

rabbitmq-router

rabbitmq-router

当我们在程序中如下初始化 cap 时,

config.UseRabbitMQ(opt =>
{
    // ...
    opt.ExchangeName = $"{env}.msg.feishu.router";
});

// 发送
_capPublisher.Publish("msg.feishu.normal", msg);

// 订阅
[CapSubscribe("msg.feishu.normal", Group = "msg.feishu.normal")]
public void Handler(MsgBody msg)
{
    // 业务逻辑
}

所指定的 ExchangeNamenamegroup 等概念的具体映射如下:
opt.ExchangeName -> ExchangeName
Publish(name,msg) 其中的 name -> RoutingKey
[CapSubscribe(name, Group = group)] 其中 name -> RoutingKey, group -> Queue.name

RabbitMQ 的高可用

  1. 集群高可用

    单台机器的 RabbitMQ 容易引发单点故障,在生产环境中,将部署 3 台机器的 RabbitMQ 集群,来保证 RabbitMQ 的吞吐量。 具体的安装文档可参见 官方文档

    大致过程是安装 RabbitMQ 后,复制 .erlang.cookie 到每台机器。然后逐台机器执行 rabbitmqctl join_cluster <node> 来加入集群,详情不再赘述。

    rabbitmq-cluster

  2. 队列高可用

    RabbitMQ 的集群仅会存储一些 exchange 、 queue 的元数据。队列的消息数据,将会存储在创建队列的节点上。 所有,当节点宕机,消息发送将出现问题。为了解决消息不丢失,需要结合 cap + Publisher Confirms 机制或者镜像队列来解决。

    1. 普通队列高可用

      当普通队列所在节点宕机后,因为 exchange 还存在,默认情况下消息发送端 client 并不会收到任何错误。
      所以,默认我们使用 cap 看到的现象是:当 queue 被删除或所在节点宕机, cap 的发送记录中还全部是成功,不会出现在失败队列中。 但 RabbitMQ 因无法路由消息,会将消息丢弃,造成消息丢失问题。

      小技巧

      AMQP 0-9-1

      Publisher confirms is the publisher acknowledgement mechanism.

      There are several common types of publisher errors that are handled using different protocol features:

      Publishing to a non-existent exchange results in a channel error, which closes the channel so that no further publishing (or any other operation) is allowed on it. When a published message cannot be routed to any queue (e.g. because there are no bindings defined for the target exchange), and the publisher set the mandatory message property to false (this is the default), the message is discarded or republished to an alternate exchange, if any. When a published message cannot be routed to any queue, and the publisher set the mandatory message property to true, the message will be returned to it. The publisher must have a returned message handler set up in order to handle the return (e.g. by logging an error or retrying with a different exchange)

      警告

      目前版本的 cap 并未支持 RabbitMQ 提供的 client 确认机制。建议采用镜像队列的方式来解决。
      或,可以通过自定义 cap 的 DotNetCore.CAP.Transport.ITransport 来自己实现。

      public class CRabbitMQTransport : DotNetCore.CAP.Transport.ITransport
      {
          public Task<OperateResult> SendAsync(TransportMessage message)
          {
              IModel? channel = null;
              try
              {
                  channel = _connectionChannelPool.Rent();
      
                  var props = channel.CreateBasicProperties();
      
                  channel.BasicReturn += (sender, args) =>
                  {
                      // warn: Message 'development.test.mq.publish' with internal id '1604920418632937473' has been returned with reason 'NO_ROUTE'
                      _logger.LogWarning("Message '{0}' with internal id '{1}' has been returned with reason '{2}'",
                          message.GetName(), message.GetId(), args.ReplyText);
                  };
      
                  channel.BasicAcks += (sender, args) =>
                  {
                      _logger.LogInformation("Message '{0}' with internal id '{1}' has been ack-ed",
                          message.GetName(), message.GetId());
                  };
      
                  channel.BasicNacks += (sender, args) =>
                  {
                      _logger.LogInformation("Message '{0}' with internal id '{1}' has been nack-ed",
                          message.GetName(), message.GetId());
                  };
      
                  channel.BasicPublish(_exchange, message.GetName(), mandatory:true, props, message.Body);
      
                  // Enable publish confirms
                  if (channel.NextPublishSeqNo > 0)
                  {
                      channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
                  }
      
                  return Task.FromResult(OperateResult.Success);
              }
              catch (Exception ex)
              {
                  // ...
              }
              finally
              {
                  // ...
              }
          }
      }
      
    2. 镜像队列(quorum queue)

      RabbitMQ 集群为了解决队列的高可用问题,增加了镜像队列的支持。
      镜像队列会将队列的消息数据同步到多个节点上,当 master 队列不可用后,会从其他的 Mirrors 节点上选择启动最早的节点变成新的 master 以实现队列的高可用。

      rabbitmq-mirror-queue

      一般,镜像队列的配置,都是由运维同学通过策略配置完成。如下图,集体参数配置不再赘述。

      rabbitmq-add-policy

      注意

      镜像队列会将队列的消息数据同步到多个节点上,所以在效率和吞吐量上会比普通队列低。

典型应用举例

下面我们以人员离职作为典型案例来为大家演示 cap 消息组件的使用。
下图是目前我们一个同事离职后,在关键的单点登录、BPM 等系统的流转过程。 用户的离职信息,依赖于定时任务在固定时间点通过数据库层全量推送来完成。
离职消息通过层层数据推送,当下游系统能感知消息时,已经距消息触发延迟了很久。

hr-1

下面我们改用消息队列来解耦各系统依赖数据库同步传递人员离职数据。流程如下:

hr-2

完整的示例代码如下:

# 项目结构
tree -L 2
├── bpm
│   ├── Controllers
│   ├── Program.cs
│   ├── Startup.cs
│   ├── Subscriber.cs
│   ├── appsettings.Development.json
│   ├── bpm.csproj
├── hr
│   ├── Controllers
│   ├── Program.cs
│   ├── Startup.cs
│   ├── appsettings.Development.json
│   ├── hr.csproj
├── sso
│   ├── Controllers
│   ├── Program.cs
│   ├── Startup.cs
│   ├── Subscriber.cs
│   ├── appsettings.Development.json
│   └── sso.csproj
namespace hr.Controllers
{
    [ApiController]
    [Route("/api/employees")]
    public class EmployeesController : ControllerBase
    {
        private readonly ICapPublisher _capPublisher;
        public EmployeesController(ICapPublisher capPublisher)
        {
            _capPublisher = capPublisher;
        }

        // 业务人员通过此接口完成了在hr系统内的离职操作
        [HttpGet]
        [Route("resign")]
        public async Task<IActionResult> Resign([FromQuery] string account)
        {
            await Task.CompletedTask;
            // 触发人员离职。
            _capPublisher.Publish("seazen.employees.resign", new { account = account });
            return Ok();
        }
    }
}
namespace bpm
{
    public class Subscriber : ICapSubscribe
    {
        public Subscriber(){}

        // 指定消费队列和分组。同一个分组的多个实例,消息只会被1个处理。
        [CapSubscribe("seazen.employees.resign", Group = "seazen.bpm")]
        public void HandleResign(object model,[FromCap]CapHeader header)
        {
            System.Console.WriteLine($"bpm receive");
            System.Console.WriteLine($"msg:{JsonSerializer.Serialize(model)}");
            System.Console.WriteLine($"header:{JsonSerializer.Serialize(header)}");
        }
    }
}
namespace sso
{
    public class Subscriber : ICapSubscribe
    {
        public Subscriber(){}

        // 指定消费队列和分组。同一个分组的多个实例,消息只会被1个处理。
        [CapSubscribe("seazen.employees.resign", Group = "seazen.sso")]
        public void HandleResign(object model,[FromCap]CapHeader header)
        {
            System.Console.WriteLine($"sso receive");
            System.Console.WriteLine($"msg:{JsonSerializer.Serialize(model)}");
            System.Console.WriteLine($"header:{JsonSerializer.Serialize(header)}");
        }
    }
}

顺序消费队列

当在一些特殊的业务场景下,我们需要严格的保证消息队列的顺序消费。否则下游业务会出现和上游发布消息业务数据不一致问题。
例如:管理员在对用户的业务权限频繁操作时。依次触发了对某个权限的增、删、改。 当下游消费系统是多实例消费时,消息可能同时在多个进程中执行,由于各操作的效率不同,最终的实际消费顺序可能就变成了:改、删、增

no-order

为了解决上述问题,我们需要在消费端处理顺序消费问题。我们可以有 2 种做法:

  1. 消费者变成单实例、单线程,一个一个处理消息内容。(此方式会出现消费能力不足问题。)

  2. 结合持久化存储,给每条消息添加顺序标记,消费者在处理消息时根据标记顺序处理。

下面,我们针对方法 2 来阐述我们的顺序队列方案。

order-queue

  1. 首先,我们需要在数据库中创建一个暂存队列消息的数据表。

    Sequence

    Type

    Partition

    Data

    IsConsumed

    Added

    ExpiresAt

    序号

    队列类型

    分区

    消息数据

    是否创建

    添加时间

    超时时间

    重要

    Type 字段是为了给同一个应用的多个顺序队列增加一个区分。这样在多线程消费时,不同线程可以以更小的颗粒度增加全局锁。 Partition 字段是为了给同一个顺序队列增加消费能力。一般,我们使用业务的唯一 key 进行哈希取模来分区,相同 key 的消息都进入同一个分区。每个分区仅一个线程处理(多线程可以通过分布式锁模拟一个线程消费)。

  2. 在生产者中增加顺序字段,一般使用时间戳或雪花片算法得到的 id。并将顺序数据写入到持久化表中。

    [HttpGet("publish")]
    public async Task<IActionResult> Publish()
    {
        await Task.CompletedTask;
    
        // 随机模拟同一个用户的数据变化通知
        var userId = (new Random()).Next(1, 10);
        // 为了增加并发性,我们需要对同一个类型的队列进行分区处理。这样,后续的多线程可以每个形成针对不同的分区进行消费。
        // 这样既提升了性能,还能保证同一个分区同时只有1个线程消费。
        var partition = (int)((long)msg.UserId / 3);
    
        // cap 的 msgId 使用雪花片算法,可以保证越大越靠后,我们直接使用 capId 做为消息的序号
        var snowflakeId = DotNetCore.CAP.Internal.SnowflakeId.Default().NextId();
    
        string queueType = "user_auth.change";
        // insert db
        var data = new OrderedQueue()
        {
            Sequence = snowflakeId,
            Partition = partition,
            Type = queueType,
            Data = System.Text.Json.JsonSerializer.Serialize(new UserInfo { UserId = userId, Time = DateTime.Now })
        };
    
        InMemoryDB.OrderedQueue.Add(data);
        InMemoryDB.OrderedQueue.SaveChange();
    
        _capPublisher.Publish("auth.user_auth.change", data);
        return Ok($"ok");
    }
    
  3. 消费者按照分区和类型,消费消息。

    [DotNetCore.CAP.CapSubscribe("auth.user_auth.change", Group = "auth.user_auth")]
    public void Handle(OrderedQueue data, [FromCap] CapHeader header)
    {
        DoSomethingOrdered(data.Partition,data.Type);
    }
    
  4. 对用一个类型、同一个分区的消费者加锁,进行顺序消费。

    public async Task DoSomethingOrdered(int partition,string type)
    {
        // 这里需要加锁来保证消费顺序,可是使用类似于 mysql 的 select for update 的数据库锁。
        // 我们这里使用 redis 的分布式锁。
        var @lock = _lockFactory.CreateLock("redis", $"orderedMQ_{type}_{partition}");
        if (await @lock.LockAsync(10000))
        {
            try
            {
                // 仅处理本分区数据
                var orderedMsg = InMemoryDB.OrderedQueue
                        .Where(d => !d.IsConsumed && d.Partition == partition && d.Type == type)
                        .OrderBy(d => d.Sequence)
                        .FirstOrDefault();
                if(orderedMsg is not null)
                {
                    UserInfo data = System.Text.Json.JsonSerializer.Deserialize<UserInfo>(orderedMsg.DataJson);
                    orderedMsg.IsConsumed = true;
                    InMemoryDB.OrderedQueue.SaveChange();
                    _logger.LogInformation($"OrderConsume:{data.UserId}-{orderedMsg.Sequence}-{data.Time.ToString("o")}");
                }
            }
            finally
            {
                await @lock.ReleaseAsync();
            }
        }
    }
    

    测试结果如下:

    order-queue-result