分布式缓存
我们的分布式缓存基于 redis + easycaching 来实现。
本文我们会简单介绍 redis 集群相关部署知识和 easycaching 组件的基本使用知识,
以帮助大家快速的使用分布式缓存。
但针对 redis 本身的使用,本文不做详细说明,仅会对一些特殊业务需要使用到如 redis 基本数据类型、特性等给出示例代码,原理将不会阐述。
集群模式
我们的 redis 集群采用了下图的部署方式。 redis 本身采用 redis cluster 方式部署,6 节点集群, 3 个 master 、3 个 salve 。 salve 用于只读查询。 我们又在 redis 集群前面增加了一层 redis 代理 (使用了开源组件 predixy )。
使用 predixy 的好处如下:
统一管理与 redis 的连接。
拓展了对缓存 key 的访问控制功能,增加应用对集群中缓存 key 的访问隔离。
解耦 redis 集群数据分片,让客户端像连接单台 redis 一样操作集群。
支持多集群、多数据中心,支持读写分离。屏蔽了 redis 扩容对业务的影响。

注意
redis 集群模式下,只能使用 db0 ,不能使用 select 命令切换 db
缓存 key 的访问权限控制
我们在 redis 的代理层 predixy 中会为不同的应用开通不同的访问账号 (本质是 redis 的密码) 以便来达到对缓存 key 的访问的隔离。
Authority {
Auth {
Mode read
KeyPrefix info
}
Auth readonly {
Mode read
}
Auth modify {
Mode write
ReadPrefix msg bpm
WritePrefix bpm
}
}
代理层会根据不同账号增加读、写访问控制。以上示例的权限如下:
匿名账号的访问所有以
info为前缀的 keyreadonly 账号可以只读所有的 key
modify 账号可以对
msg bpm前缀进行访问,对bpm前缀进行读写。
easycaching 组件的使用
架构 & 多级缓存
easycaching 组件通过使用 内存缓存 + 分布式缓存 + 发布订阅模式 的方式完成了如下功能:
多级缓存
应用分布式部署时,多台机器缓存同时更新

注意
目前我们推荐大家使用多级缓存来实施自己的缓存方案,这样可以在保证高效缓存的前提下,
减轻 redis 集群的负载压力。
但需要特别注意,当使用了多级缓存时,上图中黄色部分都是高效的内存缓存。
当缓存数据量非常大时,必然会导致应用程序所占用内存过大。这需要根据自己的业务自行评估。
DotNet 集成
添加包引用
# 添加组件 dotnet add package EasyCaching.HybridCache dotnet add package EasyCaching.InMemory dotnet add package EasyCaching.CSRedis dotnet add package EasyCaching.Bus.CSRedis dotnet add package EasyCaching.Serialization.SystemTextJson
appsettings.json增加配置{ "RedisConfig": { "Endpoint": "10.0.36.85:32059", "Password": "seazen", "KeyPrefix": "msg:" } }
Startup.cs增加启动代码public void ConfigureServices(IServiceCollection services) { // 读取配置 string redisEndpoint = configuration["RedisConfig:Endpoint"]; string redisPassword = configuration["RedisConfig:Password"]; string keyPrefix = configuration["RedisConfig:KeyPrefix"]; var endpointSplit = redisEndpoint.Split(":"); string ip = endpointSplit[0]; int port = Convert.ToInt32(endpointSplit[1]); // redis 连接字符串 string redisConn = $"{redisEndpoint},password={redisPassword},defaultDatabase=0,prefix={keyPrefix}"; services.AddEasyCaching(options => { // 配置序列化组件 options.WithSystemTextJson("systemJson"); options.UseInMemory("memory"); /* 添加一个名为 redis 的 caching provider 。 可以多次添加指定不同名称 在使用时可以通过指定名称,来达到使用多个 redis 集群的作用。 */ options.UseCSRedis(config => { // 配置 redis 连接字符串 config.DBConfig = new CSRedisDBOptions { ConnectionStrings = new List<string> { redisConn } }; config.EnableLogging = true; // 指定使用的序列化组件 config.SerializerName = "systemJson"; }, "redis"); // 混合本地缓存和分布式缓存,形成二级缓存 options.UseHybrid(config => { config.TopicName = $"{keyPrefix}topic"; config.EnableLogging = true; config.LocalCacheProviderName = "memory"; config.DistributedCacheProviderName = "redis"; }, "hybrid") // 配置 event bus .WithCSRedisBus(busConf => { busConf.ConnectionStrings = new List<string> { redisConn }; busConf.ReadOnly = false; }); }); }
小技巧
添加的时的配置较多,推荐写成单独方法在 ConfigureServices 中调用。
使用缓存
public class CacheController : ControllerBase { private readonly ILogger<CacheController> _logger; private readonly IConfiguration _config; // 依赖注入混合缓存的Provider private readonly IHybridCachingProvider _cache; public CacheController(ILogger<CacheController> logger , IHybridCachingProvider cache , IConfiguration config ) { _logger = logger; _cache = cache; _config = config; } [HttpGet("get")] public async Task<IActionResult> Get([FromQuery] string key) { // 查询缓存 var res = await _cache.GetAsync<string>(key); return Ok($"{res.Value}"); } [HttpGet("save")] public async Task<IActionResult> Save([FromQuery] string key, [FromQuery] string val) { // 修改缓存 var res = await _cache.TrySetAsync<string>(key, val, TimeSpan.FromDays(1)); return Ok($"{res}"); } [HttpGet("del")] public async Task<IActionResult> Del([FromQuery] string key) { // 删除缓存 await _cache.RemoveAsync(key); return Ok($""); } }
业务代码使用举例
public Product GetProduct(int id)
{
string cacheKey = $"product:{id}";
var res = _cache.Get<Product>(cacheKey);
if(res.HasValue)
return res.Value;
val = _db.GetProduct(id);
if(val != null)
_cache.Set<Product>(cacheKey, val, expiration);
return val;
}
重要
easycaching 只有 IHybridCachingProvider 接口对错误进行了封装。
其他类型的 provider 并没有对访问 redis 的错误进行封装,如下游 redis 宕机,Get 方法会报错。
业务系统需要自行 try catch 异常。
如何同时使用多个 redis 集群
当我们在 ConfigureServices 中配置 easycaching 时,options.UseXXX 等方法都可以多次执行。 每次执行时都需要指定一个名称。例如:
services.AddEasyCaching(options =>
{
options.UseInMemory("memory1");
options.UseInMemory("memory2");
options.UseCSRedis(config1,"redis1");
options.UseCSRedis(config1,"redis2");
options.UseHybrid(config =>
{
config.TopicName = $"{keyPrefix}topic";
config.EnableLogging = true;
config.LocalCacheProviderName = "memory1";
config.DistributedCacheProviderName = "redis1";
}, "hybrid1");
options.UseHybrid(config =>
{
config.TopicName = $"{keyPrefix}topic";
config.EnableLogging = true;
config.LocalCacheProviderName = "memory2";
config.DistributedCacheProviderName = "redis2";
}, "hybrid2");
});
以上的配置,我们就添加了 2 组 redis 集群。接下来,我们就可以使用工厂方法通过指定名称来获取不同 redis 集群的数据了。
public class CacheController : ControllerBase
{
private readonly IHybridCachingProvider _cache1;
private readonly IHybridCachingProvider _cache2;
public CacheController(IHybridProviderFactory cacheFactory)
{
_cache1 = cacheFactory.GetHybridCachingProvider("hybrid1");
_cache2 = cacheFactory.GetHybridCachingProvider("hybrid2");
}
}
使用 redis 的其他功能
public class CacheController : ControllerBase
{
private readonly IRedisCachingProvider _redis;
public CacheController(IEasyCachingProviderFactory factory)
{
_redis = factory.GetRedisProvider("redis");
}
[HttpGet]
public string Get()
{
// HMSet
var res = _redis.HMSet(cacheKey, new Dictionary<string, string>
{
{"a1","v1"},{"a2","v2"}
});
//others ...
}
}
redis 示例
分布式锁
// Startup.cs
services.AddEasyCaching(options => {
// ...
options.UseCSRedisLock();
// ...
});
public class CacheController : ControllerBase
{
private readonly IDistributedLockFactory _lockFactory;
public CacheController(CSRedisLockFactory lockFactory)
{
_lockFactory = lockFactory;
}
[HttpGet]
public string Get()
{
// lock
var @lock = lockFactory.CreateLock("redis", "lock_key");
await @lock.LockAsync(timeout);
try
{
// do something;
}
finally
{
await @lock.ReleaseAsync();
}
//others ...
}
}
限流计数器
// 简单示例
public async Task Invoke(HttpContext context)
{
var userId = "00001";
var requestsPerMinuteThreshold = 100;
var cacheKey = $"consumer.throttle:{userId}";
// 直接计数
var cnt = await _redis.IncrByAsync(cacheKey, 1);
if (cnt == 1)
{
// 第一个计数的线程设置缓存
_redis.KeyExpire(cacheKey, 1 * 60);
}
else if (cnt > requestsPerMinuteThreshold)
{
// 当用户的请求超过了限制,限流
context.Response.StatusCode = 429;
using (var writer = new StreamWriter(context.Response.Body))
{
await writer.WriteAsync("You are making too many requests.");
}
return;
}
await next(context);
}
// 带 timeout 的示例
using EasyCaching.Core;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Seazen.Services
{
public interface ICounterService
{
Task GetCounter(string name, int cnt , int seconds , int timeoutSeconds);
}
public class CounterService : ICounterService
{
readonly IRedisCachingProvider _redis;
private const string CounterKeyTmpl = "counter:{0}";
public CounterService(IRedisCachingProvider redis)
{
_redis = redis ?? throw new ArgumentNullException(nameof(redis));
}
public async Task GetCounter(string name, int cnt , int seconds , int timeoutSeconds)
{
DateTime startTime = DateTime.Now;
do
{
string key = string.Format(CounterKeyTmpl, name);
// 计数
var counter = await _redis.IncrByAsync(key, 1);
if (counter == 1)
{
_redis.KeyExpire(key, seconds);
}
if (counter <= cnt)
{
return;
}
// 已经超过限流,如果等待超时,触发 timeout 异常
if ((DateTime.Now - startTime).TotalSeconds > timeoutSeconds)
{
// 防止多线程导致的设置 ttl 失败问题
if (_redis.TTL(key) < 0)
{
_redis.KeyDel(key);
}
throw new TimeoutException($" {name} get counter {timeoutSeconds}s timeout");
}
Thread.Sleep(50);
}
while (true);
}
}
}
乐观锁
乐观锁一般使用在如:秒杀、抢单等场景下。以下为伪代码。
bool success;
do {
var someVal = redis.HashGet(...);
var tran = redis.CreateTransaction();
tran.AddCondition(Condition.HashEquals(...));
tran.HashSetAsync(....);
success = tran.Execute();
} while (!success);
bitmap & 布隆过滤器
bitmap 最大支持 512 MB, 2^32 不同位。
布隆过滤器应用场景
缓存击穿
黑名单过滤,把黑名单全部放在布隆过滤器中

布隆过滤器的基础数据结构是一个比特向量,大致的实现方案是:将同一个 key ,通过不同的 hash 算法,多次运算,得到标志位,并将二进制数组中对应位的标记成 1。当查询时,使用同样的算法得到结果,再判断 redis 中是否对应位上是否是 1。
Bloom Filter 是一个基于概率的数据结构:它只能告诉我们一个元素绝对不在集合内或可能在集合内
优点:优点很明显,二进制组成的数组,占用内存极少,并且插入和查询速度都足够快。
缺点:随着数据的增加,误判率会增加;还有无法判断数据一定存在;另外还有一个重要缺点,无法删除数据。
小技巧
下面展示的基本原理。实际业务中,可以使用开源项目实现。 如:BloomFilter.NetCore
Install Redis Client
dotnet add package StackExchange.Redis
Connect to Redis
using StackExchange.Redis; var redis = ConnectionMultiplexer.Connect("localhost"); var db = redis.GetDatabase();
Hash Functions
public static int Hash1(string input) { return input.GetHashCode(); } public static int Hash2(string input) { return input.GetHashCode() * 31; }
Add to Bloom Filter
public void AddToBloomFilter(string key, string value) { int hash1 = Hash1(value); int hash2 = Hash2(value); db.StringSetBit(key, Math.Abs(hash1 % 1000), true); db.StringSetBit(key, Math.Abs(hash2 % 1000), true); }
Check Bloom Filter
public bool MightContain(string key, string value) { int hash1 = Hash1(value); int hash2 = Hash2(value); bool bit1 = db.StringGetBit(key, Math.Abs(hash1 % 1000)); bool bit2 = db.StringGetBit(key, Math.Abs(hash2 % 1000)); return bit1 && bit2; }