分布式缓存

我们的分布式缓存基于 redis + easycaching 来实现。 本文我们会简单介绍 redis 集群相关部署知识和 easycaching 组件的基本使用知识, 以帮助大家快速的使用分布式缓存。
但针对 redis 本身的使用,本文不做详细说明,仅会对一些特殊业务需要使用到如 redis 基本数据类型、特性等给出示例代码,原理将不会阐述。

集群模式

我们的 redis 集群采用了下图的部署方式。 redis 本身采用 redis cluster 方式部署,6 节点集群, 3 个 master 、3 个 salve 。 salve 用于只读查询。 我们又在 redis 集群前面增加了一层 redis 代理 (使用了开源组件 predixy )。

使用 predixy 的好处如下:

  1. 统一管理与 redis 的连接。

  2. 拓展了对缓存 key 的访问控制功能,增加应用对集群中缓存 key 的访问隔离。

  3. 解耦 redis 集群数据分片,让客户端像连接单台 redis 一样操作集群。

  4. 支持多集群、多数据中心,支持读写分离。屏蔽了 redis 扩容对业务的影响。

redis-cluster

注意

redis 集群模式下,只能使用 db0 ,不能使用 select 命令切换 db

缓存 key 的访问权限控制

我们在 redis 的代理层 predixy 中会为不同的应用开通不同的访问账号 (本质是 redis 的密码) 以便来达到对缓存 key 的访问的隔离。

Authority {
    Auth {
        Mode read
        KeyPrefix info
    }

    Auth readonly {
        Mode read
    }

    Auth modify {
        Mode write
        ReadPrefix msg bpm
        WritePrefix bpm
    }
}

代理层会根据不同账号增加读、写访问控制。以上示例的权限如下:

  1. 匿名账号的访问所有以 info 为前缀的 key

  2. readonly 账号可以只读所有的 key

  3. modify 账号可以对 msg bpm 前缀进行访问,对 bpm 前缀进行读写。

easycaching 组件的使用

架构 & 多级缓存

easycaching 组件通过使用 内存缓存 + 分布式缓存 + 发布订阅模式 的方式完成了如下功能:

  1. 多级缓存

  2. 应用分布式部署时,多台机器缓存同时更新

easycaching

注意

目前我们推荐大家使用多级缓存来实施自己的缓存方案,这样可以在保证高效缓存的前提下, 减轻 redis 集群的负载压力。
但需要特别注意,当使用了多级缓存时,上图中黄色部分都是高效的内存缓存。
当缓存数据量非常大时,必然会导致应用程序所占用内存过大。这需要根据自己的业务自行评估。

DotNet 集成

  1. 添加包引用

    # 添加组件
    dotnet add package EasyCaching.HybridCache
    dotnet add package EasyCaching.InMemory
    dotnet add package EasyCaching.CSRedis
    dotnet add package EasyCaching.Bus.CSRedis
    dotnet add package EasyCaching.Serialization.SystemTextJson
    
  2. appsettings.json 增加配置

    {
      "RedisConfig": {
        "Endpoint": "10.0.36.85:32059",
        "Password": "seazen",
        "KeyPrefix": "msg:"
      }
    }
    
  3. Startup.cs 增加启动代码

    public void ConfigureServices(IServiceCollection services)
    {
        // 读取配置
        string redisEndpoint = configuration["RedisConfig:Endpoint"];
        string redisPassword = configuration["RedisConfig:Password"];
        string keyPrefix = configuration["RedisConfig:KeyPrefix"];
        var endpointSplit = redisEndpoint.Split(":");
        string ip = endpointSplit[0];
        int port = Convert.ToInt32(endpointSplit[1]);
        // redis 连接字符串
        string redisConn = $"{redisEndpoint},password={redisPassword},defaultDatabase=0,prefix={keyPrefix}";
        services.AddEasyCaching(options =>
        {
            // 配置序列化组件
            options.WithSystemTextJson("systemJson");
            options.UseInMemory("memory");
            /*
                添加一个名为 redis 的 caching provider 。
                可以多次添加指定不同名称
                在使用时可以通过指定名称,来达到使用多个 redis 集群的作用。
            */
            options.UseCSRedis(config =>
            {
                // 配置 redis 连接字符串
                config.DBConfig = new CSRedisDBOptions
                {
                    ConnectionStrings = new List<string> { redisConn }
                };
                config.EnableLogging = true;
                // 指定使用的序列化组件
                config.SerializerName = "systemJson";
            }, "redis");
    
            // 混合本地缓存和分布式缓存,形成二级缓存
            options.UseHybrid(config =>
            {
                config.TopicName = $"{keyPrefix}topic";
                config.EnableLogging = true;
                config.LocalCacheProviderName = "memory";
                config.DistributedCacheProviderName = "redis";
            }, "hybrid")
            // 配置 event bus
            .WithCSRedisBus(busConf =>
            {
                busConf.ConnectionStrings = new List<string> { redisConn };
                busConf.ReadOnly = false;
            });
        });
    }
    

    小技巧

    添加的时的配置较多,推荐写成单独方法在 ConfigureServices 中调用。

  4. 使用缓存

    public class CacheController : ControllerBase
    {
        private readonly ILogger<CacheController> _logger;
        private readonly IConfiguration _config;
        // 依赖注入混合缓存的Provider
        private readonly IHybridCachingProvider _cache;
    
        public CacheController(ILogger<CacheController> logger
            , IHybridCachingProvider cache
            , IConfiguration config
        )
        {
            _logger = logger;
            _cache = cache;
            _config = config;
        }
    
        [HttpGet("get")]
        public async Task<IActionResult> Get([FromQuery] string key)
        {
            // 查询缓存
            var res = await _cache.GetAsync<string>(key);
            return Ok($"{res.Value}");
        }
    
        [HttpGet("save")]
        public async Task<IActionResult> Save([FromQuery] string key, [FromQuery] string val)
        {
            // 修改缓存
            var res = await _cache.TrySetAsync<string>(key, val, TimeSpan.FromDays(1));
            return Ok($"{res}");
        }
    
        [HttpGet("del")]
        public async Task<IActionResult> Del([FromQuery] string key)
        {
            // 删除缓存
            await _cache.RemoveAsync(key);
            return Ok($"");
        }
    }
    

业务代码使用举例

public Product GetProduct(int id)
{
    string cacheKey = $"product:{id}";

    var res = _cache.Get<Product>(cacheKey);

    if(res.HasValue)
        return res.Value;

    val = _db.GetProduct(id);

    if(val != null)
        _cache.Set<Product>(cacheKey, val, expiration);

    return val;
}

重要

easycaching 只有 IHybridCachingProvider 接口对错误进行了封装。
其他类型的 provider 并没有对访问 redis 的错误进行封装,如下游 redis 宕机,Get 方法会报错。 业务系统需要自行 try catch 异常。

如何同时使用多个 redis 集群

当我们在 ConfigureServices 中配置 easycaching 时,options.UseXXX 等方法都可以多次执行。 每次执行时都需要指定一个名称。例如:

services.AddEasyCaching(options =>
{
    options.UseInMemory("memory1");
    options.UseInMemory("memory2");

     options.UseCSRedis(config1,"redis1");
     options.UseCSRedis(config1,"redis2");

    options.UseHybrid(config =>
    {
        config.TopicName = $"{keyPrefix}topic";
        config.EnableLogging = true;
        config.LocalCacheProviderName = "memory1";
        config.DistributedCacheProviderName = "redis1";
    }, "hybrid1");

    options.UseHybrid(config =>
    {
        config.TopicName = $"{keyPrefix}topic";
        config.EnableLogging = true;
        config.LocalCacheProviderName = "memory2";
        config.DistributedCacheProviderName = "redis2";
    }, "hybrid2");
});

以上的配置,我们就添加了 2 组 redis 集群。接下来,我们就可以使用工厂方法通过指定名称来获取不同 redis 集群的数据了。

public class CacheController : ControllerBase
{
    private readonly IHybridCachingProvider _cache1;
    private readonly IHybridCachingProvider _cache2;

    public CacheController(IHybridProviderFactory cacheFactory)
    {
        _cache1 = cacheFactory.GetHybridCachingProvider("hybrid1");
        _cache2 = cacheFactory.GetHybridCachingProvider("hybrid2");
    }
}

使用 redis 的其他功能

public class CacheController : ControllerBase
{
    private readonly IRedisCachingProvider _redis;

    public CacheController(IEasyCachingProviderFactory factory)
    {
         _redis = factory.GetRedisProvider("redis");
    }

    [HttpGet]
    public string Get()
    {
        // HMSet
        var res = _redis.HMSet(cacheKey, new Dictionary<string, string>
        {
            {"a1","v1"},{"a2","v2"}
        });

        //others ...
    }
}

redis 示例

分布式锁

// Startup.cs
services.AddEasyCaching(options => {
    // ...
    options.UseCSRedisLock();
    // ...
});

public class CacheController : ControllerBase
{
    private readonly IDistributedLockFactory _lockFactory;

    public CacheController(CSRedisLockFactory lockFactory)
    {
         _lockFactory = lockFactory;
    }

    [HttpGet]
    public string Get()
    {
        // lock
        var @lock = lockFactory.CreateLock("redis", "lock_key");
        await @lock.LockAsync(timeout);
        try
        {
            // do something;
        }
        finally
        {
            await @lock.ReleaseAsync();
        }

        //others ...
    }
}

限流计数器

// 简单示例
public async Task Invoke(HttpContext context)
{
    var userId = "00001";
    var requestsPerMinuteThreshold = 100;
    var cacheKey = $"consumer.throttle:{userId}";

    // 直接计数
    var cnt = await _redis.IncrByAsync(cacheKey, 1);
    if (cnt == 1)
    {
        // 第一个计数的线程设置缓存
        _redis.KeyExpire(cacheKey, 1 * 60);
    }
    else if (cnt > requestsPerMinuteThreshold)
    {
        // 当用户的请求超过了限制,限流
        context.Response.StatusCode = 429;

        using (var writer = new StreamWriter(context.Response.Body))
        {
            await writer.WriteAsync("You are making too many requests.");
        }

        return;
    }

    await next(context);
}
// 带 timeout 的示例
using EasyCaching.Core;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Seazen.Services
{
    public interface ICounterService
    {
        Task GetCounter(string name, int cnt , int seconds , int timeoutSeconds);
    }

    public class CounterService : ICounterService
    {
        readonly IRedisCachingProvider _redis;

        private const string CounterKeyTmpl = "counter:{0}";

        public CounterService(IRedisCachingProvider redis)
        {
            _redis = redis ?? throw new ArgumentNullException(nameof(redis));
        }

        public async Task GetCounter(string name, int cnt , int seconds , int timeoutSeconds)
        {
            DateTime startTime = DateTime.Now;

            do
            {
                string key = string.Format(CounterKeyTmpl, name);
                // 计数
                var counter = await _redis.IncrByAsync(key, 1);
                if (counter == 1)
                {
                    _redis.KeyExpire(key, seconds);
                }

                if (counter <= cnt)
                {
                    return;
                }

                // 已经超过限流,如果等待超时,触发 timeout 异常
                if ((DateTime.Now - startTime).TotalSeconds > timeoutSeconds)
                {
                    // 防止多线程导致的设置 ttl 失败问题
                    if (_redis.TTL(key) < 0)
                    {
                        _redis.KeyDel(key);
                    }

                    throw new TimeoutException($" {name} get counter {timeoutSeconds}s timeout");
                }

                Thread.Sleep(50);
            }
            while (true);
        }
    }
}

乐观锁

乐观锁一般使用在如:秒杀、抢单等场景下。以下为伪代码。

bool success;
do {
  var someVal = redis.HashGet(...);
  var tran = redis.CreateTransaction();
  tran.AddCondition(Condition.HashEquals(...));
  tran.HashSetAsync(....);
  success = tran.Execute();
} while (!success);

bitmap & 布隆过滤器

bitmap 最大支持 512 MB, 2^32 不同位。

布隆过滤器应用场景

  1. 缓存击穿

  2. 黑名单过滤,把黑名单全部放在布隆过滤器中

bloom-filter

布隆过滤器的基础数据结构是一个比特向量,大致的实现方案是:将同一个 key ,通过不同的 hash 算法,多次运算,得到标志位,并将二进制数组中对应位的标记成 1。当查询时,使用同样的算法得到结果,再判断 redis 中是否对应位上是否是 1。

Bloom Filter 是一个基于概率的数据结构:它只能告诉我们一个元素绝对不在集合内或可能在集合内

  • 优点:优点很明显,二进制组成的数组,占用内存极少,并且插入和查询速度都足够快。

  • 缺点:随着数据的增加,误判率会增加;还有无法判断数据一定存在;另外还有一个重要缺点,无法删除数据。

小技巧

下面展示的基本原理。实际业务中,可以使用开源项目实现。 如:BloomFilter.NetCore

  1. Install Redis Client

    dotnet add package StackExchange.Redis
    
  2. Connect to Redis

    using StackExchange.Redis;
    
    var redis = ConnectionMultiplexer.Connect("localhost");
    var db = redis.GetDatabase();
    
  3. Hash Functions

    public static int Hash1(string input)
    {
        return input.GetHashCode();
    }
    
    public static int Hash2(string input)
    {
        return input.GetHashCode() * 31;
    }
    
  4. Add to Bloom Filter

    public void AddToBloomFilter(string key, string value)
    {
        int hash1 = Hash1(value);
        int hash2 = Hash2(value);
    
        db.StringSetBit(key, Math.Abs(hash1 % 1000), true);
        db.StringSetBit(key, Math.Abs(hash2 % 1000), true);
    }
    
  5. Check Bloom Filter

    public bool MightContain(string key, string value)
    {
    int hash1 = Hash1(value);
    int hash2 = Hash2(value);
    
        bool bit1 = db.StringGetBit(key, Math.Abs(hash1 % 1000));
        bool bit2 = db.StringGetBit(key, Math.Abs(hash2 % 1000));
    
        return bit1 && bit2;
    
    }