分布式锁
锁、信号量
锁 和 信号量 看起来很像,他们都是为了解决多线程、多进程、分布式集群等争抢资源时的同步问题。
锁和信号量比较显著的区别在于:针对同一个资源的争抢是否是完全互斥关系。
重要
锁:在同一个时间点只允许一个程序操纵资源。程序之间是互斥的关系。
信号量:在同一个时间点允许多个程序操纵资源。程序之间并不互斥,各程序之间通过信号来争抢资源。
另外,两者的操作也不同。
锁:
acquire lock/release lock信号量:
wait semaphore/signal semaphore
注意
信号量(Semaphore) 是一种在多线程环境下用于控制访问资源的同步机制。它是一个整数值,代表了同时可以访问某个资源或者执行某个操作的最大线程数量。
典型应用举例:
锁应用:在我们批量刷数据时,为了增加消费能力,我们会启动很多的进程同时处理。 但,如何保证每个程序所处理的数据不被其他相同程序再次处理?这时候我们需要采用互斥的锁来实现。 各进程抢占互斥锁,拿到锁后标记自己处理的数据,然后释放锁,交替往复消费完所有数据。

信号量应用(流量控制):我们在对接飞书发送消息的 Api 时,飞书的 API 存在频控策略。1 秒内发送次数不能超过 50 次。 但我们的对接的众多业务系统在早高峰上班时,会大量给用户推送通知消息。 那如何做到最大程度高效发送而又不触发下游飞书的频控呢?
业务系统将大量数据堆积到消息队列,发送程序从队列多线程消费。在发往飞书前,先等待信号量。等待到信号量后,才开始发送。 没有获取到信号量时,继续等待。
分布式锁使用
在我们使用的组件中, hangfire 、 consul 、 redis 都提供了分布式锁的能力。
hangfire 的锁实现是基于数据库表的原子操作来实现。(根据不同类型的数据库,实现稍有不同。)
consul 是基于它的 session + kv 存储来实现。详情实现因篇幅有限,请大家去官网了解。
小技巧
出于性能考虑,推荐使用 redis 的分布式锁。
但如果不想过多依赖中间件或对旧项目改造,可以基于db自行实现分布式锁。
经压测在 4c8g 的 mysql 实例,示例代码能做到 300+/s 的 tps
Consul KV 存在并发性能上的瓶颈,并不推荐在高并发场景下使用。
使用 redis 实现分布式锁
// Startup.cs services.AddEasyCaching(options =>{ // ... options.UseCSRedisLock(); // ... }); public class CacheController : ControllerBase { private readonly IDistributedLockFactory _lockFactory; public CacheController(CSRedisLockFactory lockFactory) { _lockFactory = lockFactory; } [HttpGet] public string Lock() { // lock var timeout = 1000; var @lock = lockFactory.CreateLock("redis", "lock_key"); await @lock.LockAsync(timeout); try { // do something; } finally { await @lock.ReleaseAsync(); } } }
基于 DB 自行实现分布式锁
-- Create lock table CREATE TABLE distributed_lock ( lock_key VARCHAR(64) NOT NULL, lock_value VARCHAR(128) NOT NULL, created_at DATETIME NOT NULL, expires_at DATETIME NOT NULL, PRIMARY KEY (lock_key) INDEX idx_distributed_lock_expires (expires_at) );
public interface IDistributedLock { bool Acquire(string lockKey, string value, int timeoutSeconds); bool Release(string lockKey, string value); } public class MySQLDistributedLock : IDistributedLock { private readonly DBContext _context; public MySQLDistributedLock(DBContext context) { _context = context; } private async Task<bool> Acquire(string lockKey, string value,int timeoutSeconds) { var now = DateTime.Now; var expiresAt = now.AddSeconds(timeoutSeconds); using var transaction = await _context.Database.BeginTransactionAsync(); try { var lockEntity = await _context.Locks.FirstOrDefaultAsync(x => x.Key == lockKey); if (lockEntity == null || lockEntity.ExpiresAt < now) { FormattableString sql = @"INSERT INTO distributed_lock (lock_key, lock_value, expires_at) SELECT {lockKey},{value},{expiresAt} WHERE NOT EXISTS ( SELECT 1 FROM distributed_lock WHERE lock_key = {lockKey} AND expires_at > NOW(3) );"; var affected = await _context.Database.ExecuteSqlInterpolatedAsync(sql); await transaction.CommitAsync(); return affected > 0 ? true : false; } await transaction.RollbackAsync(); return false; } catch { await transaction.RollbackAsync(); return false; } } private async Task<bool> Release(string lockKey, string lockValue) { var lockEntity = await _context.Locks.FirstOrDefaultAsync(x => x.Key == lockKey); if (lockEntity == null || lockEntity.Value != lockValue) return false; _context.Locks.Remove(lockEntity); await _context.SaveChangesAsync(); return true; } }
// use distributed lock var lockKey = "order:123"; var value = Guid.NewGuid().ToString(); var timeoutSeconds = 30; while (true) { if (await IDistributedLock.Acquire(lockKey, lockValue, timeoutSeconds)) { // mock business logic await Task.Delay(150); await IDistributedLock.Release(lockKey, lockValue); return Ok(); } await Task.Delay(10); }
使用 hangfire 实现分布式锁
// 1. Startup.cs 中添加正常添加 hangfire 组件 services.AddHangfire(configuration => { configuration .SetDataCompatibilityLevel(CompatibilityLevel.Version_170) .UseSimpleAssemblyNameTypeSerializer() .UseRecommendedSerializerSettings() .UseSqlServerStorage(Configuration.GetConnectionString("SqlConnection"), new SqlServerStorageOptions() { CommandBatchMaxTimeout = TimeSpan.FromMinutes(5), SlidingInvisibilityTimeout = TimeSpan.FromMinutes(5), QueuePollInterval = TimeSpan.Zero, UseRecommendedIsolationLevel = true, UsePageLocksOnDequeue = true, DisableGlobalLocks = true, SchemaName = "HangFire" }); }); // 2. 业务代码中使用静态类 JobStorage 来获得锁 var @lock = JobStorage.Current.GetConnection() .AcquireDistributedLock("lock_key", TimeSpan.FromSeconds(10)); try { // 处理业务逻辑 } catch (System.Exception e) { _logger.LogWarning(e, $"check poll condition error: {e.Message}"); } finally { // 释放锁 @lock.Dispose(); }
分布式信号量使用
以下通过使用 mysql 和 consul 简单实例分布式信号量。
危险
consul 的信号量基于 KV 实现,本身 KV 的写入存在性能瓶颈,不建议在高并发下使用。
经测试 4C/8G 的云 mysql 实例能支持 1000+/s tps 的并发,能满足大部分的场景。
如果是超高并发的场景需要使用 redis 计数的方式来实现并发线程、限流等控制。
使用 mysql 自行实现分布式信号量
小技巧
以下基于 mysql 的实现简单对信号量概念做了扩展。实现可以按时间配置的限流控制。如: 50/s 1000/m
-- semaphore.sql CREATE TABLE semaphore ( limit_key VARCHAR(100) NOT NULL, window_start DATETIME NOT NULL, request_count INT NOT NULL, PRIMARY KEY (limit_key, window_start), INDEX idx_window (window_start) );
public async Task<IActionResult> Acquire([FromQuery] int second = 1, [FromQuery] int maxCount = 100) { var current = DateTime.Now; var key = "limit_key"; if (await AttemptAcquireAsync(key, second, maxCount, current)) { // do something return Ok(); } return Conflict(); } private async Task<bool> AttemptAcquireAsync(string key, int window_seconds, int max, DateTime current) { using var transaction = await _dbContext.Database.BeginTransactionAsync(); try { var window = current.AddSeconds(0 - window_seconds); FormattableString sumSql = @" DELETE FROM rate_limits WHERE limit_key = {key} AND window_start < {window}; SELECT COALESCE(SUM(request_count), 0) FROM rate_limits WHERE limit_key = {key} AND window_start >= {window} FOR UPDATE; "; var cnt = (await _dbContext.Database.SqlQuery<int>(sumSql).ToListAsync()).FirstOrDefault(); if (cnt < max) { FormattableString increaseSql = @" INSERT INTO rate_limits (limit_key, window_start, request_count) SELECT {key}, {current}, 1 ON DUPLICATE KEY UPDATE request_count = request_count + 1; "; await _dbContext.Database.ExecuteSqlInterpolatedAsync(increaseSql); await transaction.CommitAsync(); return true; } await transaction.RollbackAsync(); return false; } catch { await transaction.RollbackAsync(); return false; } }
使用 consul 来实现分布式信号量
// 此段代码是控制调用飞书的同时执行的线程数量。 public async Task SendSingleMessage() { // ... // 创建一个分布信号量,此信号量可以同时最多被4个程序获取。 IDistributedSemaphore semaphore = _consulClient.Semaphore("semaphore/key:4", 4); // 等待信号量,如果目前已经有4个程序获取到信号量,则在这里挂起。 await semaphore.Acquire(CancellationToken.None); // 获取到信号量,执行业务代码 var token = await _tokenService.GetFeishuToken("{ak}"); var res = await _feiShuApi.SendMessage("account", input, token); // 业务执行完成, if (semaphore != null && semaphore.IsHeld) { await semaphore.Release(); } }