分布式锁

锁、信号量

信号量 看起来很像,他们都是为了解决多线程、多进程、分布式集群等争抢资源时的同步问题。
锁和信号量比较显著的区别在于:针对同一个资源的争抢是否是完全互斥关系。

重要

  • 锁:在同一个时间点只允许一个程序操纵资源。程序之间是互斥的关系。

  • 信号量:在同一个时间点允许多个程序操纵资源。程序之间并不互斥,各程序之间通过信号来争抢资源。

另外,两者的操作也不同。

  • 锁:acquire lock / release lock

  • 信号量:wait semaphore / signal semaphore

注意

信号量(Semaphore) 是一种在多线程环境下用于控制访问资源的同步机制。它是一个整数值,代表了同时可以访问某个资源或者执行某个操作的最大线程数量。

典型应用举例:

  1. 锁应用:在我们批量刷数据时,为了增加消费能力,我们会启动很多的进程同时处理。 但,如何保证每个程序所处理的数据不被其他相同程序再次处理?这时候我们需要采用互斥的锁来实现。 各进程抢占互斥锁,拿到锁后标记自己处理的数据,然后释放锁,交替往复消费完所有数据。 d-lock

  2. 信号量应用(流量控制):我们在对接飞书发送消息的 Api 时,飞书的 API 存在频控策略。1 秒内发送次数不能超过 50 次。 但我们的对接的众多业务系统在早高峰上班时,会大量给用户推送通知消息。 那如何做到最大程度高效发送而又不触发下游飞书的频控呢?
    业务系统将大量数据堆积到消息队列,发送程序从队列多线程消费。在发往飞书前,先等待信号量。等待到信号量后,才开始发送。 没有获取到信号量时,继续等待。 d-semaphore

分布式锁使用

在我们使用的组件中, hangfireconsulredis 都提供了分布式锁的能力。
hangfire 的锁实现是基于数据库表的原子操作来实现。(根据不同类型的数据库,实现稍有不同。)
consul 是基于它的 session + kv 存储来实现。详情实现因篇幅有限,请大家去官网了解。

小技巧

出于性能考虑,推荐使用 redis 的分布式锁。
但如果不想过多依赖中间件或对旧项目改造,可以基于db自行实现分布式锁。
经压测在 4c8g 的 mysql 实例,示例代码能做到 300+/s 的 tps
Consul KV 存在并发性能上的瓶颈,并不推荐在高并发场景下使用。

  1. 使用 redis 实现分布式锁

    // Startup.cs
    services.AddEasyCaching(options =>{
        // ...
        options.UseCSRedisLock();
        // ...
    });
    
    public class CacheController : ControllerBase
    {
        private readonly IDistributedLockFactory _lockFactory;
    
        public CacheController(CSRedisLockFactory lockFactory)
        {
            _lockFactory = lockFactory;
        }
    
        [HttpGet]
        public string Lock()
        {
            // lock
            var timeout = 1000;
            var @lock = lockFactory.CreateLock("redis", "lock_key");
            await @lock.LockAsync(timeout);
            try
            {
                // do something;
            }
            finally
            {
                await @lock.ReleaseAsync();
            }
        }
    }
    
  2. 基于 DB 自行实现分布式锁

    -- Create lock table
    CREATE TABLE distributed_lock (
        lock_key VARCHAR(64) NOT NULL,
        lock_value VARCHAR(128) NOT NULL,
        created_at DATETIME NOT NULL,
        expires_at DATETIME NOT NULL,
        PRIMARY KEY (lock_key)
        INDEX idx_distributed_lock_expires (expires_at)
    );
    
    public interface IDistributedLock
    {
        bool Acquire(string lockKey, string value, int timeoutSeconds);
        bool Release(string lockKey, string value);
    }
    
    public class MySQLDistributedLock : IDistributedLock
    {
        private readonly DBContext _context;
    
        public MySQLDistributedLock(DBContext context)
        {
            _context = context;
        }
    
        private async Task<bool> Acquire(string lockKey, string value,int timeoutSeconds)
        {
            var now = DateTime.Now;
            var expiresAt = now.AddSeconds(timeoutSeconds);
    
            using var transaction = await _context.Database.BeginTransactionAsync();
            try
            {
                var lockEntity = await _context.Locks.FirstOrDefaultAsync(x => x.Key == lockKey);
                if (lockEntity == null || lockEntity.ExpiresAt < now)
                {
                    FormattableString sql =
    @"INSERT INTO distributed_lock (lock_key, lock_value, expires_at)
    SELECT {lockKey},{value},{expiresAt}
    WHERE NOT EXISTS (
        SELECT 1 FROM distributed_lock
        WHERE lock_key = {lockKey}
        AND expires_at > NOW(3)
    );";
    
                    var affected = await _context.Database.ExecuteSqlInterpolatedAsync(sql);
                    await transaction.CommitAsync();
                    return affected > 0 ? true : false;
                }
                await transaction.RollbackAsync();
                return false;
            }
            catch
            {
                await transaction.RollbackAsync();
                return false;
            }
        }
    
        private async Task<bool> Release(string lockKey, string lockValue)
        {
            var lockEntity = await _context.Locks.FirstOrDefaultAsync(x => x.Key == lockKey);
            if (lockEntity == null || lockEntity.Value != lockValue) return false;
            _context.Locks.Remove(lockEntity);
            await _context.SaveChangesAsync();
            return true;
        }
    }
    
    // use  distributed lock
    var lockKey = "order:123";
    var value = Guid.NewGuid().ToString();
    var timeoutSeconds = 30;
    
    while (true)
    {
        if (await IDistributedLock.Acquire(lockKey, lockValue, timeoutSeconds))
        {
            // mock business logic
            await Task.Delay(150);
            await IDistributedLock.Release(lockKey, lockValue);
            return Ok();
        }
        await Task.Delay(10);
    }
    
  3. 使用 hangfire 实现分布式锁

    // 1. Startup.cs 中添加正常添加 hangfire 组件
    services.AddHangfire(configuration =>
    {
        configuration
        .SetDataCompatibilityLevel(CompatibilityLevel.Version_170)
        .UseSimpleAssemblyNameTypeSerializer()
        .UseRecommendedSerializerSettings()
        .UseSqlServerStorage(Configuration.GetConnectionString("SqlConnection"),
        new SqlServerStorageOptions()
        {
            CommandBatchMaxTimeout = TimeSpan.FromMinutes(5),
            SlidingInvisibilityTimeout = TimeSpan.FromMinutes(5),
            QueuePollInterval = TimeSpan.Zero,
            UseRecommendedIsolationLevel = true,
            UsePageLocksOnDequeue = true,
            DisableGlobalLocks = true,
            SchemaName = "HangFire"
        });
    });
    
    // 2. 业务代码中使用静态类 JobStorage 来获得锁
    var @lock = JobStorage.Current.GetConnection()
                                  .AcquireDistributedLock("lock_key", TimeSpan.FromSeconds(10));
    try
    {
        // 处理业务逻辑
    }
    catch (System.Exception e)
    {
        _logger.LogWarning(e, $"check poll condition error: {e.Message}");
    }
    finally
    {
        // 释放锁
        @lock.Dispose();
    
    }
    

分布式信号量使用

以下通过使用 mysql 和 consul 简单实例分布式信号量。

危险

consul 的信号量基于 KV 实现,本身 KV 的写入存在性能瓶颈,不建议在高并发下使用。
经测试 4C/8G 的云 mysql 实例能支持 1000+/s tps 的并发,能满足大部分的场景。
如果是超高并发的场景需要使用 redis 计数的方式来实现并发线程、限流等控制。

  1. 使用 mysql 自行实现分布式信号量

    小技巧

    以下基于 mysql 的实现简单对信号量概念做了扩展。实现可以按时间配置的限流控制。如: 50/s 1000/m

    -- semaphore.sql
    CREATE TABLE semaphore (
        limit_key VARCHAR(100) NOT NULL,
        window_start DATETIME NOT NULL,
        request_count INT NOT NULL,
        PRIMARY KEY (limit_key, window_start),
        INDEX idx_window (window_start)
    );
    
    public async Task<IActionResult> Acquire([FromQuery] int second = 1, [FromQuery] int maxCount = 100)
    {
        var current = DateTime.Now;
        var key = "limit_key";
    
        if (await AttemptAcquireAsync(key, second, maxCount, current))
        {
            // do something
    
            return Ok();
        }
    
        return Conflict();
    }
    
    private async Task<bool> AttemptAcquireAsync(string key, int window_seconds, int max, DateTime current)
    {
        using var transaction = await _dbContext.Database.BeginTransactionAsync();
        try
        {
            var window = current.AddSeconds(0 - window_seconds);
    
            FormattableString sumSql = 
    @"
    DELETE FROM rate_limits WHERE limit_key = {key} AND window_start < {window};
    
    SELECT COALESCE(SUM(request_count), 0)
    FROM rate_limits
    WHERE limit_key = {key}
    AND window_start >= {window}
    FOR UPDATE;
    ";
    
            var cnt = (await _dbContext.Database.SqlQuery<int>(sumSql).ToListAsync()).FirstOrDefault();
            if (cnt < max)
            {
                FormattableString increaseSql =
    @"
    INSERT INTO rate_limits (limit_key, window_start, request_count)
    SELECT {key}, {current}, 1
    ON DUPLICATE KEY UPDATE request_count = request_count + 1;
    ";
                await _dbContext.Database.ExecuteSqlInterpolatedAsync(increaseSql);
                await transaction.CommitAsync();
                return true;
            }
    
            await transaction.RollbackAsync();
            return false;
        }
        catch
        {
            await transaction.RollbackAsync();
            return false;
        }
    }
    
  2. 使用 consul 来实现分布式信号量

    // 此段代码是控制调用飞书的同时执行的线程数量。
    public async Task SendSingleMessage() {
        // ...
        // 创建一个分布信号量,此信号量可以同时最多被4个程序获取。
        IDistributedSemaphore semaphore = _consulClient.Semaphore("semaphore/key:4", 4);
        // 等待信号量,如果目前已经有4个程序获取到信号量,则在这里挂起。
        await semaphore.Acquire(CancellationToken.None);
        // 获取到信号量,执行业务代码
        var token = await _tokenService.GetFeishuToken("{ak}");
        var res = await _feiShuApi.SendMessage("account", input, token);
        // 业务执行完成,
        if (semaphore != null && semaphore.IsHeld)
        {
            await semaphore.Release();
        }
    }