
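Redis Distributed Locks

A distributed lock serializes requests that would otherwise run concurrently or in parallel. In a distributed system, some business scenarios require that only one thread on one node handles a given kind of message at any moment; that is the job of a distributed lock. Common implementations are based on a database, on Redis, or on ZooKeeper.

Redis can serve as a distributed lock because it executes commands on a single thread: commands run one at a time, so an operation such as SET with the NX option (set only if the key does not exist) is atomic, and only one of several competing clients can succeed.

Implementing a Redis distributed lock

The walkthrough below uses a stock-deduction service as the running example.

Without a distributed lock, the stock-deduction logic might look like this: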

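public String deductStock(@PathVariable("id") String productId) {
    String stockKey = "stock:" + productId;
    // Read the current stock (assumes the key exists and holds an integer)
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(stockKey));
    if (stock <= 0) {
        System.out.println("Deduction failed: out of stock");
        return "fail";
    }
    // Write back stock - 1; the read above and this write are two separate Redis commands
    int currentStock = stock - 1;
    stringRedisTemplate.opsForValue().set(stockKey, String.valueOf(currentStock));
    System.out.println("Deduction succeeded, remaining stock: " + currentStock);
    return "success";
}

This code is broken even on a single machine: reading the stock and writing it back are two separate steps, so two threads can read the same value and both deduct from it. Wrapping the logic in a synchronized block makes it correct within a single JVM: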

public String deductStock(@PathVariable("id") String productId) {
    synchronized (this) {  // only one thread of this JVM at a time
        String stockKey = "stock:" + productId;
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(stockKey));
        if (stock <= 0) {
            System.out.println("Deduction failed: out of stock");
            return "fail";
        }
        int currentStock = stock - 1;
        stringRedisTemplate.opsForValue().set(stockKey, String.valueOf(currentStock));
        System.out.println("Deduction succeeded, remaining stock: " + currentStock);
        return "success";
    }
}
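To make the failure of the unsynchronized version concrete, the sketch below replays one bad interleaving deterministically. It is only an illustration: a plain HashMap stands in for Redis, the class name LostUpdateReplay and the product id are made up, and the two "requests" are replayed in a single thread rather than run concurrently.

```java
import java.util.HashMap;
import java.util.Map;

// Deterministic replay of the read-check-write race in the unsynchronized deductStock().
final class LostUpdateReplay {
    // Stand-in for Redis string keys; a plain map suffices because the replay is single-threaded.
    static final Map<String, String> store = new HashMap<>();

    // Step 1 of deductStock(): read the current stock.
    static int read(String key) {
        return Integer.parseInt(store.get(key));
    }

    // Steps 2 and 3: check the value read earlier, then write back (value - 1).
    static boolean checkAndWrite(String key, int stockSeen) {
        if (stockSeen <= 0) {
            return false;
        }
        store.put(key, String.valueOf(stockSeen - 1));
        return true;
    }

    // Both requests read before either one writes; returns {items sold, stock left}.
    static int[] replay() {
        String key = "stock:1001"; // hypothetical product id
        store.put(key, "1");       // exactly one item left
        int seenByA = read(key);
        int seenByB = read(key);
        int sold = 0;
        if (checkAndWrite(key, seenByA)) sold++;
        if (checkAndWrite(key, seenByB)) sold++;
        return new int[] {sold, read(key)};
    }

    public static void main(String[] args) {
        int[] result = replay();
        System.out.println("items sold: " + result[0] + ", stock left: " + result[1]);
    }
}
```

With one item in stock, both requests report success: two items are sold and the stored stock ends at 0, which is exactly the overselling the lock is meant to prevent.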

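Internet companies, however, rarely run a single instance any more; a service is deployed as a cluster at the very least. A synchronized block has no effect across JVMs, so a distributed lock is needed. Here is the simplest possible use of a Redis distributed lock: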

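public String deductStock(@PathVariable("id") String productId) {
    String lockKey = "lock:product:" + productId;
    Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "lock");  // acquire the lock (SET NX)
    if (!Boolean.TRUE.equals(success)) {  // null-safe check: the Boolean result may be null
        System.out.println("Failed to acquire the lock");
        return "fail";
    }

    String stockKey = "stock:" + productId;
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(stockKey));
    if (stock <= 0) {
        System.out.println("Deduction failed: out of stock");
        return "fail";
    }
    int currentStock = stock - 1;
    stringRedisTemplate.opsForValue().set(stockKey, String.valueOf(currentStock));
    System.out.println("Deduction succeeded, remaining stock: " + currentStock);

    stringRedisTemplate.delete(lockKey);  // release the lock
    return "success";
}

The code above does release the lock, but only on the happy path. If the business code throws an exception, or returns early as in the out-of-stock branch, the release is never reached and the lock is held forever. The release belongs in a finally block: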

public String deductStock(@PathVariable("id") String productId) {
    String lockKey = "lock:product:" + productId;
    Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "lock");
    if (!Boolean.TRUE.equals(success)) {
        System.out.println("Failed to acquire the lock");
        return "fail";
    }

    try {
        String stockKey = "stock:" + productId;
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(stockKey));
        if (stock <= 0) {
            System.out.println("Deduction failed: out of stock");
            return "fail";
        }
        int currentStock = stock - 1;
        stringRedisTemplate.opsForValue().set(stockKey, String.valueOf(currentStock));
        System.out.println("Deduction succeeded, remaining stock: " + currentStock);
    } finally {
        stringRedisTemplate.delete(lockKey);  // release in finally so exceptions and early returns still unlock
    }

    return "success";
}

Even finally is not guaranteed to run. If the machine crashes (or the process is killed) before the finally block executes, the lock is never released. The lock therefore needs an expiry time:

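public String deductStock(@PathVariable("id") String productId) {
    String lockKey = "lock:product:" + productId;
    Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "lock", 10, TimeUnit.SECONDS);  // acquire the lock and set its expiry in one command
    if (!Boolean.TRUE.equals(success)) {
        System.out.println("Failed to acquire the lock");
        return "fail";
    }

    try {
        String stockKey = "stock:" + productId;
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(stockKey));
        if (stock <= 0) {
            System.out.println("Deduction failed: out of stock");
            return "fail";
        }
        int currentStock = stock - 1;
        stringRedisTemplate.opsForValue().set(stockKey, String.valueOf(currentStock));
        System.out.println("Deduction succeeded, remaining stock: " + currentStock);
    } finally {
        stringRedisTemplate.delete(lockKey);
    }

    return "success";
}

Does that settle it? Not quite: under high concurrency this code can still go wrong. Suppose thread A holds the lock and its business logic takes longer than the lock's expiry time. The lock expires, and a new thread B acquires it. When A finally finishes, it deletes the lock that B now holds, which lets yet another thread C acquire the lock while B is still running. B and C then deduct stock concurrently, and the result is overselling. So before releasing the lock, a thread should check that the lock is its own: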

public String deductStock(@PathVariable("id") String productId) {
    String lockKey = "lock:product:" + productId;
    String clientId = UUID.randomUUID() + ":" + Thread.currentThread().getId();  // unique token identifying this lock holder
    Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);
    if (!Boolean.TRUE.equals(success)) {
        System.out.println("Failed to acquire the lock");
        return "fail";
    }

    try {
        String stockKey = "stock:" + productId;
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get(stockKey));
        if (stock <= 0) {
            System.out.println("Deduction failed: out of stock");
            return "fail";
        }
        int currentStock = stock - 1;
        stringRedisTemplate.opsForValue().set(stockKey, String.valueOf(currentStock));
        System.out.println("Deduction succeeded, remaining stock: " + currentStock);
    } finally {
        if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {  // release only if this thread still owns the lock
            stringRedisTemplate.delete(lockKey);
        }
    }

    return "success";
}
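The effect of the ownership check can be replayed without a Redis server. The following is a minimal sketch under stated assumptions: LockOwnerDemo is a made-up in-memory model, expiry is driven by a hand-advanced clock field (now) rather than real time, and the tokens "A" and "B" stand in for the UUID-based clientId.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of a Redis lock key with a TTL, driven by a manual clock.
final class LockOwnerDemo {
    private final Map<String, String> owners = new HashMap<>();   // lock key -> owner token
    private final Map<String, Long> expiresAt = new HashMap<>();  // lock key -> expiry time (ms)
    long now = 0; // logical clock in milliseconds, advanced by hand

    private void evictIfExpired(String key) {
        Long deadline = expiresAt.get(key);
        if (deadline != null && deadline <= now) {
            owners.remove(key);
            expiresAt.remove(key);
        }
    }

    // Models SET key token NX PX ttlMillis.
    synchronized boolean tryLock(String key, String token, long ttlMillis) {
        evictIfExpired(key);
        if (owners.containsKey(key)) return false;
        owners.put(key, token);
        expiresAt.put(key, now + ttlMillis);
        return true;
    }

    // Models an unconditional DEL key.
    synchronized void unlockBlindly(String key) {
        owners.remove(key);
        expiresAt.remove(key);
    }

    // Models "delete only if the stored token is mine", done as one atomic step.
    synchronized boolean unlockIfOwner(String key, String token) {
        evictIfExpired(key);
        if (!token.equals(owners.get(key))) return false;
        unlockBlindly(key);
        return true;
    }

    synchronized String ownerOf(String key) {
        evictIfExpired(key);
        return owners.get(key);
    }

    // Thread A overruns its 10 s lease, B takes the lock, then A releases late.
    static String ownerAfterLateRelease(boolean checkOwner) {
        LockOwnerDemo redis = new LockOwnerDemo();
        String key = "lock:product:1001"; // hypothetical key
        redis.tryLock(key, "A", 10_000);
        redis.now = 12_000;               // A is still working, but its lease has expired
        redis.tryLock(key, "B", 10_000);  // B acquires the lock legitimately
        if (checkOwner) {
            redis.unlockIfOwner(key, "A");
        } else {
            redis.unlockBlindly(key);
        }
        return redis.ownerOf(key);
    }

    public static void main(String[] args) {
        System.out.println("blind delete   -> owner: " + ownerAfterLateRelease(false));
        System.out.println("checked delete -> owner: " + ownerAfterLateRelease(true));
    }
}
```

With the blind delete the lock ends up free while B is still working; with the checked delete B keeps it. Note that in this model the check and the delete sit inside one synchronized method, whereas the GET followed by DEL in the Java code above are two separate round trips to Redis.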

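At this point the lock is in reasonable shape, but overselling is still possible in extreme cases. The ownership check and the delete are two separate Redis commands: thread A may confirm that the lock is its own, the lock may expire right after that check, another thread may acquire it, and A then deletes the other thread's lock anyway. Closing this gap requires doing the check and the delete as one atomic step, for example in a Lua script, which is what Redisson's unlock script (shown later) does.

The underlying problem is that the lock can expire while its holder is still working. A partial fix is to lengthen the expiry, say from 10 seconds to 30 seconds, so that only very unusual delays cause trouble. A more robust fix is lock renewal: after a thread acquires the lock, a background task periodically checks whether that thread is still running and, if so, resets the lock's expiry, so the lock does not expire while its holder is working.

There is no need to reinvent the wheel, since mature Redis distributed lock implementations already exist. Redisson's lock, for example, uses lock renewal.

Redisson, like Jedis, is a Java client library for Redis; the difference is that Redisson is aimed mainly at distributed scenarios.

Here is the stock deduction done properly with a Redisson distributed lock: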
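The renewal idea can be sketched without Redis or real timers. The model below is an illustration only, not Redisson's code: LeaseRenewalDemo is a made-up class, the clock is advanced by hand, and the 30-second lease with renewal every third of the lease mirrors the defaults noted in the source comments further down.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of lock renewal, driven by a manual clock instead of real timers.
final class LeaseRenewalDemo {
    static final long LEASE_MILLIS = 30_000;           // lease length (30 s)
    static final long RENEW_EVERY = LEASE_MILLIS / 3;  // renewal period (10 s)

    private final Map<String, String> owners = new HashMap<>();
    private final Map<String, Long> expiresAt = new HashMap<>();
    long now = 0; // logical clock in milliseconds

    String ownerOf(String key) {
        Long deadline = expiresAt.get(key);
        if (deadline != null && deadline <= now) { // lazily drop an expired lock
            owners.remove(key);
            expiresAt.remove(key);
        }
        return owners.get(key);
    }

    boolean tryLock(String key, String token) {
        if (ownerOf(key) != null) return false;
        owners.put(key, token);
        expiresAt.put(key, now + LEASE_MILLIS);
        return true;
    }

    // One renewal tick: extend the lease only if the token still owns the lock.
    boolean renew(String key, String token) {
        if (!token.equals(ownerOf(key))) return false;
        expiresAt.put(key, now + LEASE_MILLIS);
        return true;
    }

    // Simulates a job that runs for workMillis and reports whether it still owns the lock at the end.
    static boolean stillOwnerAfter(long workMillis, boolean renewalEnabled) {
        LeaseRenewalDemo redis = new LeaseRenewalDemo();
        String key = "lock:product:1001"; // hypothetical key
        redis.tryLock(key, "A");
        for (long t = RENEW_EVERY; t <= workMillis; t += RENEW_EVERY) {
            redis.now = t;                // the renewal task fires every RENEW_EVERY ms
            if (renewalEnabled) redis.renew(key, "A");
        }
        redis.now = workMillis;
        return "A".equals(redis.ownerOf(key));
    }

    public static void main(String[] args) {
        System.out.println("45 s job without renewal keeps lock: " + stillOwnerAfter(45_000, false));
        System.out.println("45 s job with renewal keeps lock:    " + stillOwnerAfter(45_000, true));
    }
}
```

A 45-second job loses a 30-second lease without renewal and keeps it with renewal. Renewing at a third of the lease leaves room for a couple of missed ticks before the lock actually expires.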

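public String deductStock(@PathVariable("id") String productId) {
    String lockKey = "lock:product:" + productId;
    RLock lock = redissonClient.getLock(lockKey); // obtain the lock object
    lock.lock();                                  // blocks until the lock is acquired
    try {
        String stockKey = "stock:" + productId;
        RBucket<Object> stockBucket = redissonClient.getBucket(stockKey);
        int stock = Integer.parseInt(String.valueOf(stockBucket.get()));
        if (stock <= 0) {
            System.out.println("Deduction failed: out of stock");
            return "fail";
        }
        int currentStock = stock - 1;
        stockBucket.set(currentStock);
        System.out.println("Deduction succeeded, remaining stock: " + currentStock);
    } finally {
        lock.unlock();
    }
    return "success";
}

Besides lock(), Redisson also provides tryLock(). Whereas lock() blocks until the lock is acquired, the no-argument tryLock() returns immediately: true if the lock was acquired, false otherwise. If acquisition fails, execution simply continues, so the caller must check the return value before entering the critical section and must only call unlock() when the lock was actually obtained.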
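The calling pattern for tryLock() can be shown with the JDK alone, since Redisson's RLock extends java.util.concurrent.locks.Lock. In the sketch below a local ReentrantLock stands in for the distributed lock, and TryLockDemo and its methods are made-up names; with Redisson, the Lock argument would be the RLock obtained from redissonClient.getLock(...).

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Calling pattern for tryLock(): check the result, and unlock only if the lock was obtained.
final class TryLockDemo {
    // Deducts one unit if the lock is free; gives up immediately otherwise.
    static String deductWithTryLock(Lock lock, int[] stock) {
        if (!lock.tryLock()) {
            return "busy"; // somebody else holds the lock; do NOT touch the stock, do NOT unlock
        }
        try {
            if (stock[0] <= 0) return "fail";
            stock[0]--;
            return "success";
        } finally {
            lock.unlock(); // reached only when tryLock() returned true
        }
    }

    // Calls deductWithTryLock from a second thread while this thread holds the lock.
    static String deductWhileHeldElsewhere() {
        Lock lock = new ReentrantLock(); // local stand-in for an RLock
        int[] stock = {5};
        String[] result = new String[1];
        lock.lock();
        try {
            Thread other = new Thread(() -> result[0] = deductWithTryLock(lock, stock));
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println("lock free: " + deductWithTryLock(new ReentrantLock(), new int[] {5}));
        System.out.println("lock held: " + deductWhileHeldElsewhere());
    }
}
```

Returning early on a failed tryLock() suits requests that can be rejected or retried by the caller; lock() is the better fit when the work must eventually run.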

Redisson distributed lock source code

Source code behind lock()

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // try to acquire the lock
    // lock acquired
    if (ttl == null) { // a null ttl means the lock was acquired
        return;
    }

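    // From here on: the lock is held by someone else
    CompletableFuture<RedissonLockEntry> future = subscribe(threadId); // subscribe to the lock-release channel so a waiter wakes up as soon as the holder unlocks, instead of sleeping for the full ttl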
    pubSub.timeout(future);
    RedissonLockEntry entry;
    if (interruptibly) {
        entry = commandExecutor.getInterrupted(future);
    } else {
        entry = commandExecutor.get(future);
    }

    try {
        while (true) { // retry loop
            ttl = tryAcquire(-1, leaseTime, unit, threadId); // retry and refresh ttl
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            if (ttl >= 0) {
                try {
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); // block for up to ttl ms; a release message ends the wait early, then the loop retries
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    entry.getLatch().acquire();
                } else {
                    entry.getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        unsubscribe(entry, threadId); // unsubscribe from the lock-release channel
    }
}

public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
    @Override
    protected void onMessage(RedissonLockEntry value, Long message) { // runs when a message arrives on the lock's channel
        if (message.equals(UNLOCK_MESSAGE)) { // is it a lock-release message?
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute != null) {
                runnableToExecute.run();
            }
            value.getLatch().release(); // release one permit so a thread blocked in tryAcquire() on the latch can resume
        } else if (message.equals(READ_UNLOCK_MESSAGE)) {
            while (true) {
                Runnable runnableToExecute = value.getListeners().poll();
                if (runnableToExecute == null) {
                    break;
                }
                runnableToExecute.run();
            }
            value.getLatch().release(value.getLatch().getQueueLength());
        }
    }
}
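The wait-and-wake mechanism in lock() and onMessage() boils down to a semaphore with zero permits: a waiter blocks on it for at most the lock's remaining TTL, and the release message adds a permit. The sketch below isolates that idea with java.util.concurrent.Semaphore; UnlockLatchDemo and its methods are made-up names, and the direct call to release() stands in for the pub/sub message.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// A zero-permit semaphore used as a "lock released" latch.
final class UnlockLatchDemo {
    // Waits up to ttlMillis; true means woken by a release, false means the wait timed out.
    static boolean waitForUnlock(Semaphore latch, long ttlMillis) {
        try {
            return latch.tryAcquire(ttlMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // A waiter prepared to wait 30 s returns as soon as the "unlock message" arrives.
    static boolean wokenByMessage() {
        Semaphore latch = new Semaphore(0);
        boolean[] woken = new boolean[1];
        Thread waiter = new Thread(() -> woken[0] = waitForUnlock(latch, 30_000));
        waiter.start();
        latch.release(); // what onMessage() does for UNLOCK_MESSAGE
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return woken[0];
    }

    // Without a message the waiter gives up after the TTL and would retry the acquire.
    static boolean timesOutWithoutMessage() {
        return !waitForUnlock(new Semaphore(0), 50);
    }

    public static void main(String[] args) {
        System.out.println("woken by message: " + wokenByMessage());
        System.out.println("timed out without message: " + timesOutWithoutMessage());
    }
}
```

Because the permit is stored even if release() runs before the waiter starts waiting, the wake-up cannot be missed; the TTL-bounded wait is the fallback for the case where no message arrives at all.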

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime > 0) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); // lock() passes leaseTime = -1, so this branch runs and uses the default lease
    }
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // lock acquired
        if (ttlRemaining == null) { // the lock was acquired
            if (leaseTime > 0) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                scheduleExpirationRenewal(threadId);  // lock() passes leaseTime = -1, so this branch runs and schedules the background renewal task
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +           // the lock does not exist: take this branch
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +  // record the current thread as the holder (count = 1)
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +      // set the lock's expiry
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +  // the current thread already holds the lock: take this branch
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                    // increment this thread's reentry count (this makes the lock reentrant)
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +                        // reset the lock's expiry
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",  // held by someone else: return the remaining time to live
            Collections.singletonList(getRawName()),  // KEYS[1]: the lock key
            unit.toMillis(leaseTime),                                 // ARGV[1]: the lock's expiry in milliseconds
            getLockName(threadId)                                // ARGV[2]: id identifying the current thread as lock holder
    );
}
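The three outcomes of the acquire script (fresh acquire, reentrant acquire, or remaining TTL) can be mirrored in a few lines of plain Java. This is a model for reading the Lua code, not Redisson's implementation: ReentrantLockScriptModel is a made-up class, a HashMap plays the role of the Redis hash, and the clock is advanced by hand.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java mirror of the acquire script's logic for a single lock key.
final class ReentrantLockScriptModel {
    private final Map<String, Integer> holdCounts = new HashMap<>(); // the hash: owner id -> reentry count
    private long expiresAt = 0; // absolute expiry of the lock key (ms)
    long now = 0;               // logical clock (ms), advanced by hand

    private void evictIfExpired() {
        if (!holdCounts.isEmpty() && expiresAt <= now) {
            holdCounts.clear(); // Redis would have deleted the expired key
        }
    }

    // Returns null when the lock is acquired, otherwise the remaining TTL in ms.
    synchronized Long tryAcquire(String ownerId, long leaseMillis) {
        evictIfExpired();
        if (holdCounts.isEmpty() || holdCounts.containsKey(ownerId)) {
            holdCounts.merge(ownerId, 1, Integer::sum); // hincrby KEYS[1] ARGV[2] 1
            expiresAt = now + leaseMillis;              // pexpire KEYS[1] ARGV[1]
            return null;
        }
        return expiresAt - now;                         // pttl KEYS[1]
    }

    synchronized int holdCount(String ownerId) {
        evictIfExpired();
        return holdCounts.getOrDefault(ownerId, 0);
    }

    public static void main(String[] args) {
        ReentrantLockScriptModel lock = new ReentrantLockScriptModel();
        System.out.println("first acquire:   " + lock.tryAcquire("uuid:1", 30_000));
        System.out.println("reentrant:       " + lock.tryAcquire("uuid:1", 30_000));
        lock.now = 10_000;
        System.out.println("other thread:    " + lock.tryAcquire("uuid:2", 30_000));
    }
}
```

The owner ids "uuid:1" and "uuid:2" imitate the shape of getLockName(threadId). A non-null return value is what lock() uses as the upper bound for its wait before retrying.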

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            
            CompletionStage<Boolean> future = renewExpirationAsync(threadId); // reset the lock's expiry
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                
                if (res) {
                    // reschedule itself
                    renewExpiration(); // renewal succeeded: schedule the next one
                } else {
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // delay before run() fires: one third of the lease, 10 s by default
    
    ee.setTimeout(task);
}

protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +  // only if the current thread still holds the lock
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +                // reset the lock's expiry (30 s by default)
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

Source code behind unlock()

@Override
public RFuture<Void> unlockAsync(long threadId) {
    RFuture<Boolean> future = unlockInnerAsync(threadId); // release the lock

    CompletionStage<Void> f = future.handle((opStatus, e) -> {
        cancelExpirationRenewal(threadId); // stop the background renewal for this thread

        if (e != null) {
            throw new CompletionException(e);
        }
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            throw new CompletionException(cause);
        }

        return null;
    });

    return new CompletableFutureWrapper<>(f);
}

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +  // the current thread does not hold the lock: return nil
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +  // decrement this thread's reentry count
                    "if (counter > 0) then " +  // still held reentrantly: refresh the expiry
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +  // count reached zero: delete the key, releasing the lock
                    "redis.call('publish', KEYS[2], ARGV[1]); " +  // publish the lock-release message
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
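The unlock script's three return values map directly onto the handling in unlockAsync(): nil (not the owner) becomes the IllegalMonitorStateException, 0 means the lock is still held reentrantly, and 1 means it was fully released and waiters were notified. A plain-Java model of that logic, with made-up names (UnlockScriptModel, the "UNLOCK" string standing in for UNLOCK_MESSAGE) and without TTL handling, might look like this:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java mirror of the unlock script's logic for a single lock key.
final class UnlockScriptModel {
    private final Map<String, Integer> holdCounts = new HashMap<>(); // the hash: owner id -> reentry count
    final List<String> published = new ArrayList<>();                // stand-in for the pub/sub channel

    // Simplified acquire: no contention handling and no TTL, just the reentry count.
    synchronized void lock(String ownerId) {
        holdCounts.merge(ownerId, 1, Integer::sum);
    }

    // null = caller is not the owner, false = still held reentrantly, true = fully released.
    synchronized Boolean unlock(String ownerId) {
        if (!holdCounts.containsKey(ownerId)) {
            return null;                              // hexists == 0 -> return nil
        }
        int counter = holdCounts.merge(ownerId, -1, Integer::sum); // hincrby ... -1
        if (counter > 0) {
            return false;                             // the real script also refreshes the TTL here
        }
        holdCounts.clear();                           // del KEYS[1]
        published.add("UNLOCK");                      // publish KEYS[2] ARGV[1]
        return true;
    }

    public static void main(String[] args) {
        UnlockScriptModel lock = new UnlockScriptModel();
        lock.lock("uuid:1");
        lock.lock("uuid:1");                          // reentrant: count is now 2
        System.out.println("stranger unlocks: " + lock.unlock("uuid:2"));
        System.out.println("first unlock:     " + lock.unlock("uuid:1"));
        System.out.println("second unlock:    " + lock.unlock("uuid:1"));
        System.out.println("messages sent:    " + lock.published.size());
    }
}
```

Because the ownership test, the decrement, and the delete all happen inside one script, no other client can slip in between the check and the delete.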

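Lock loss in a Redis master-replica setup

By default Redis replicates data to its replicas asynchronously. The master may therefore crash right after it sets a lock, before the lock has been replicated. The promoted replica knows nothing about the lock, so another client can acquire it: this is the lock-loss problem.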
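The failover scenario can be replayed in a few lines. This is a toy model under stated assumptions: two HashMaps stand in for the master and its replica, FailoverLockLoss and the key are made-up names, and replication is reduced to a single copy step that either happens before the crash or does not.

```java
import java.util.HashMap;
import java.util.Map;

// Toy replay of losing a lock when the master fails before replicating it.
final class FailoverLockLoss {
    // Returns true if client B can take a lock that client A still believes it holds.
    static boolean secondClientAcquires(boolean replicatedBeforeCrash) {
        Map<String, String> master = new HashMap<>();
        Map<String, String> replica = new HashMap<>();
        String key = "lock:product:1001"; // hypothetical key

        master.putIfAbsent(key, "A");     // A's SET NX succeeds on the master and is acknowledged
        if (replicatedBeforeCrash) {
            replica.putAll(master);       // asynchronous replication caught up in time
        }
        // The master crashes here; the replica is promoted and serves all further requests.
        Map<String, String> promoted = replica;
        return promoted.putIfAbsent(key, "B") == null; // B's SET NX against the new master
    }

    public static void main(String[] args) {
        System.out.println("lock replicated in time, B acquires: " + secondClientAcquires(true));
        System.out.println("lock not replicated,     B acquires: " + secondClientAcquires(false));
    }
}
```

In the second case A and B both believe they hold the lock, which is the same double-holder situation that leads to overselling.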

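ZooKeeper also has a leader and followers, but it avoids this problem with majority writes: a write succeeds only once more than half of the nodes have accepted it. ZooKeeper leans toward CP, however, and does not perform as well as Redis, which leans toward AP, so internet companies generally choose Redis. How, then, can lock loss in Redis be addressed?

(1) One candidate is RedLock. RedLock acquires the lock on several mutually independent Redis instances at the same time and treats it as held once a majority of them grant it. If none of these instances has a replica, the lock is lost only in extreme cases. For higher availability, however, each instance needs replicas of its own, and that brings the lock-loss problem right back. RedLock therefore does not really solve the problem and is not recommended.

(2) If RedLock is out, what else is there?

A workable option is to use ZooKeeper instead of Redis for the distributed lock.

When data is written to the ZooKeeper leader, the client is not told right away that the write succeeded. The leader first replicates the write to the followers, and only after more than half of the nodes have it does the client receive a success response. If the leader then fails, ZooKeeper's ZAB consensus protocol ensures that the node elected as the new leader already has the replicated data, so the lock is not lost.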
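The majority rule itself is simple arithmetic. The helper below is a minimal sketch (MajorityWrite is a made-up name, not a ZooKeeper API) of the condition under which a write counts as successful.

```java
// The majority-write condition: strictly more than half of the ensemble must have the write.
final class MajorityWrite {
    static boolean committed(int acks, int ensembleSize) {
        return acks > ensembleSize / 2;
    }

    public static void main(String[] args) {
        System.out.println("5 nodes, 3 acks: " + committed(3, 5));
        System.out.println("5 nodes, 2 acks: " + committed(2, 5));
        System.out.println("4 nodes, 2 acks: " + committed(2, 4));
    }
}
```

Exactly half is not enough: with 4 nodes, 2 acknowledgements do not form a majority. Requiring a strict majority means any two majorities share at least one node, which is why a newly elected leader backed by a majority can be expected to have every acknowledged write.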

License:  CC BY 4.0