模擬一個(gè)電商里面下單減庫(kù)存的場(chǎng)景。
1.首先在redis里加入商品庫(kù)存數(shù)量。
2.新建一個(gè)Spring Boot項(xiàng)目,在pom里面引入相關(guān)的依賴。
dependency>
groupId>org.springframework.boot/groupId>
artifactId>spring-boot-starter-web/artifactId>
/dependency>
dependency>
groupId>org.springframework.boot/groupId>
artifactId>spring-boot-starter-data-redis/artifactId>
/dependency>
3.接下來(lái),在application.yml配置redis屬性和指定應(yīng)用的端口號(hào):
server:
port: 8090
spring:
redis:
host: 192.168.0.60
port: 6379
4.新建一個(gè)Controller類,扣減庫(kù)存第一版代碼:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Objects;
@RestController
public class StockController {
private static final Logger logger = LoggerFactory.getLogger(StockController.class);
@Resource
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/reduceStock")
public String reduceStock() {
// 從redis中獲取庫(kù)存數(shù)量
int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
if (stock > 0) {
// 減庫(kù)存
int restStock = stock - 1;
// 剩余庫(kù)存再重新設(shè)置到redis中
stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
logger.info("扣減成功,剩余庫(kù)存:{}", restStock);
} else {
logger.info("庫(kù)存不足,扣減失敗。");
}
return "success";
}
}
上面第一版的代碼存在什么問(wèn)題:超賣。假如多個(gè)線程同時(shí)調(diào)用獲取庫(kù)存數(shù)量的代碼,那么每個(gè)線程拿到的都是100,判斷庫(kù)存都大于0,都可以執(zhí)行減庫(kù)存的操作。假如兩個(gè)線程都做減庫(kù)存更新緩存,那么緩存的庫(kù)存變成99,但實(shí)際上,應(yīng)該是減掉2個(gè)庫(kù)存。
那么很多人的第一個(gè)想法是加synchronized同步代碼塊,因?yàn)楂@取數(shù)量和減庫(kù)存不是原子性操作,有多個(gè)線程來(lái)執(zhí)行代碼的時(shí)候,只允許一個(gè)線程執(zhí)行代碼塊里的代碼。那么改完的第二版的代碼如下:
@RequestMapping("/reduceStock")
public String reduceStock() {
synchronized (this) {
// 從redis中獲取庫(kù)存數(shù)量
int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
if (stock > 0) {
// 減庫(kù)存
int restStock = stock - 1;
// 剩余庫(kù)存再重新設(shè)置到redis中
stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
logger.info("扣減成功,剩余庫(kù)存:{}", restStock);
} else {
logger.info("庫(kù)存不足,扣減失敗。");
}
}
return "success";
}
但使用synchronize存在的問(wèn)題,就是只能保證單機(jī)環(huán)境運(yùn)行時(shí)沒(méi)有問(wèn)題的。但現(xiàn)在的軟件公司里,基本上都是集群架構(gòu),是多實(shí)例,前面使用Nginx做負(fù)載均衡,大概架構(gòu)如下:
Nginx分發(fā)請(qǐng)求,把請(qǐng)求發(fā)送到不同的Tomcat容器,而synchronize只能保證一個(gè)應(yīng)用是沒(méi)有問(wèn)題的。
那么代碼改進(jìn)第三版,就是引入redis分布式鎖,具體代碼如下:
@RequestMapping("/reduceStock")
public String reduceStock() {
String lockKey = "stockKey";
try {
boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1");
if (!result) {
return "errorCode";
}
// 從redis中獲取庫(kù)存數(shù)量
int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
if (stock > 0) {
// 減庫(kù)存
int restStock = stock - 1;
// 剩余庫(kù)存再重新設(shè)置到redis中
stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
logger.info("扣減成功,剩余庫(kù)存:{}", restStock);
} else {
logger.info("庫(kù)存不足,扣減失敗。");
}
} finally {
stringRedisTemplate.delete(lockKey)
}
return "success";
}
如果有一個(gè)線程拿到鎖,那么其他的線程就會(huì)等待。一定要記得在finally里面把使用完的鎖要?jiǎng)h除掉。否則一旦拋出異常,只有一個(gè)線程會(huì)一直持有鎖,其他線程沒(méi)有機(jī)會(huì)獲取。
但如果在執(zhí)行if (stock > 0) {
代碼塊里的代碼,因?yàn)殄礄C(jī)或重啟沒(méi)有執(zhí)行完,也會(huì)一直持有鎖,所以,這里需要把鎖加一個(gè)超時(shí)時(shí)間:
boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1");
stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
但如果上面兩行代碼在中間執(zhí)行出問(wèn)題了,設(shè)置超時(shí)時(shí)間的代碼還沒(méi)執(zhí)行,也會(huì)出現(xiàn)鎖不能釋放的問(wèn)題。好在有對(duì)應(yīng)的方法:就是把上面兩行代碼設(shè)置成一個(gè)原子操作:
// 這里默認(rèn)設(shè)置超時(shí)時(shí)間為10秒
boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
到此為止,如果并發(fā)量不是很大的話,基本上是沒(méi)有問(wèn)題的。
但是,如果請(qǐng)求的并發(fā)量很大,就會(huì)出現(xiàn)新的問(wèn)題:有種比較特殊的情況,第一個(gè)線程執(zhí)行了15秒,但是執(zhí)行到10秒鐘的時(shí)候,鎖已經(jīng)失效釋放了,那么在高并發(fā)場(chǎng)景下,第二個(gè)線程發(fā)現(xiàn)鎖已經(jīng)失效,那么它就可以拿到這把鎖進(jìn)行加鎖,
假設(shè)第二個(gè)線程執(zhí)行需要8秒,它執(zhí)行到5秒鐘后,此時(shí)第一個(gè)線程已經(jīng)執(zhí)行完了,執(zhí)行完那一刻,進(jìn)行了刪除key的操作,但是此時(shí)的鎖是第二個(gè)線程加的,這樣第一個(gè)線程把第二個(gè)線程加的鎖刪掉了。
那意味著第三個(gè)線程又可以拿到鎖,第三個(gè)線程執(zhí)行了3秒鐘,此時(shí)第二個(gè)線程執(zhí)行完畢,那么第二個(gè)線程把第三個(gè)線程的鎖又刪除了。導(dǎo)致鎖失效。
那么解決的思路就是,我自己加的鎖,不要被別人刪掉。那么可以為每個(gè)進(jìn)來(lái)的請(qǐng)求生成一個(gè)唯一的id,作為分布式鎖的值,然后在釋放時(shí),判斷一下當(dāng)前線程的id,是不是和緩存里的id是否相等。
@RequestMapping("/reduceStock")
public String reduceStock() {
String lockKey = "stockKey";
String id = UUID.randomUUID().toString();
try {
// 這里默認(rèn)設(shè)置超時(shí)時(shí)間為30秒
boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, id, 30, TimeUnit.SECONDS);
if (!result) {
return "errorCode";
}
// 從redis中獲取庫(kù)存數(shù)量
int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
if (stock > 0) {
// 減庫(kù)存
int restStock = stock - 1;
// 剩余庫(kù)存再重新設(shè)置到redis中
stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
logger.info("扣減成功,剩余庫(kù)存:{}", restStock);
} else {
logger.info("庫(kù)存不足,扣減失敗。");
}
} finally {
if (id.contentEquals(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(lockKey)))) {
stringRedisTemplate.delete(lockKey);
}
}
return "success";
}
到此為止,一個(gè)比較完善的鎖就實(shí)現(xiàn)了,可以應(yīng)付大部分場(chǎng)景。
當(dāng)然,上面的代碼還有一個(gè)問(wèn)題,就是一個(gè)線程執(zhí)行時(shí)間超過(guò)了過(guò)期時(shí)間,后面的代碼還沒(méi)有執(zhí)行完,鎖就已經(jīng)刪除了,還是會(huì)有些bug存在。解決的方法是給鎖續(xù)命的操作。
在當(dāng)前主線程獲取到鎖以后,可以fork出一個(gè)線程,執(zhí)行Timer定時(shí)器操作,假如默認(rèn)超時(shí)時(shí)間為30秒,那么定時(shí)器每隔10秒去看下這把鎖還是否存在,存在就說(shuō)明這個(gè)鎖里的邏輯還沒(méi)有執(zhí)行完,那么就可以把當(dāng)前主線程的超時(shí)時(shí)間重新設(shè)置為30秒;如果不存在,就直接結(jié)束掉。
但是上面的邏輯,在高并發(fā)場(chǎng)景下,實(shí)現(xiàn)比較完善還是比較困難的。好在現(xiàn)在已經(jīng)有比較成熟的框架,那就是Redisson。官方地址https://redisson.org。
下面用Redisson來(lái)實(shí)現(xiàn)分布式鎖。
首先引入依賴包:
dependency>
groupId>org.redisson/groupId>
artifactId>redisson/artifactId>
version>3.6.5/version>
/dependency>
配置類:
@Configuration
public class RedissonConfig {
@Bean
public Redisson redisson() {
// 單機(jī)模式
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.0.60:6379").setDatabase(0);
return (Redisson) Redisson.create(config);
}
}
接下來(lái)用redisson重寫(xiě)上面的減庫(kù)存操作:
@Resource
private Redisson redisson;
@RequestMapping("/reduceStock")
public String reduceStock() {
String lockKey = "stockKey";
RLock redissonLock = redisson.getLock(lockKey);
try {
// 加鎖,鎖續(xù)命
redissonLock.lock();
// 從redis中獲取庫(kù)存數(shù)量
int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
if (stock > 0) {
// 減庫(kù)存
int restStock = stock - 1;
// 剩余庫(kù)存再重新設(shè)置到redis中
stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
logger.info("扣減成功,剩余庫(kù)存:{}", restStock);
} else {
logger.info("庫(kù)存不足,扣減失敗。");
}
} finally {
redissonLock.unlock();
}
return "success";
}
其實(shí)就是三個(gè)步驟:獲取鎖,加鎖,釋放鎖。
先簡(jiǎn)單看下Redisson的實(shí)現(xiàn)原理:
這里先說(shuō)一下Redis很多操作使用Lua腳本來(lái)實(shí)現(xiàn)原子性操作,關(guān)于Lua語(yǔ)法,可以去網(wǎng)上找下相關(guān)教程。
使用Lua腳本的好處有:
1.減少網(wǎng)絡(luò)開(kāi)銷,多個(gè)命令可以使用一次請(qǐng)求完成;
2.實(shí)現(xiàn)了原子性操作,Redis會(huì)把Lua腳本作為一個(gè)整體去執(zhí)行;
3.實(shí)現(xiàn)事務(wù),Redis自帶的事務(wù)功能有限,而Lua腳本實(shí)現(xiàn)了事務(wù)的常規(guī)操作,而且還支持回滾。
但是Lua實(shí)際上不會(huì)使用很多,如果Lua腳本執(zhí)行時(shí)間過(guò)長(zhǎng),因?yàn)镽edis是單線程,因此會(huì)導(dǎo)致堵塞。
最后,說(shuō)下Redisson分布式鎖的代碼實(shí)現(xiàn),
找到上面的redissonLock.lock();
lock方法點(diǎn)進(jìn)去,一直點(diǎn)到RedissonLock類里面的lockInterruptibly方法:
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 獲取線程id
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFutureRedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
重點(diǎn)看下tryAcquire方法,把線程id作為一個(gè)參數(shù)傳遞進(jìn)來(lái),在這個(gè)方法里面,找到tryLockInnerAsync方法點(diǎn)進(jìn)去,
T> RFutureT> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommandT> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
這里就是一堆Lua腳本,先看第一個(gè)if命令,先去判斷 KEYS[1](就是對(duì)應(yīng)的鎖key的名字),如果不存在,在hashmap里,設(shè)置一個(gè)屬性為線程id,值為1,再把map的過(guò)期時(shí)間設(shè)置為internalLockLeaseTime,這個(gè)值默認(rèn)是30秒,
上面的操作對(duì)應(yīng)的命令是:
hset keyname id:thread 1
pexpire keyname 30
然后返回nil,相當(dāng)于null,那程序return了。
另外,Redisson還支持重入鎖,那第二個(gè)if就是執(zhí)行重入鎖的操作,會(huì)判斷鎖是否存在,并且傳入的線程id是否是當(dāng)前線程的id,若果是,支持重復(fù)加鎖進(jìn)行自增操作;
如果是其他線程調(diào)用lock方法,上面兩個(gè)if判斷不會(huì)走,會(huì)返回鎖剩余過(guò)期時(shí)間。
接著返回到tryAcquireAsync方法里面往下看:
實(shí)際上是加了一個(gè)監(jiān)聽(tīng)器,在監(jiān)聽(tīng)器里面有個(gè)很重要的方法scheduleExpirationRenewal,一看這個(gè)名字就能大概猜出是什么功能,
里面有個(gè)定時(shí)任務(wù)的輪詢,
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 判斷傳遞進(jìn)來(lái)的線程id是否是我們之前主線程設(shè)置的id,如果是,則增加續(xù)命,增加30秒。
RFutureBoolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListenerBoolean>() {
@Override
public void operationComplete(FutureBoolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
接著推遲10秒鐘(internalLockLeaseTime / 3),再執(zhí)行續(xù)命操作邏輯。
到最后,再回到lockInterruptibly方法,如果ttl 為null,說(shuō)明加鎖成功了,就返回null,那如果其他線程的話,就會(huì)返回剩余過(guò)期時(shí)間,那么就會(huì)進(jìn)入到while死循環(huán)里,一直嘗試加鎖,調(diào)用tryAcquire方法,在瑣失效以后,再會(huì)嘗試獲取加鎖。
到此為止,分析完畢。
總結(jié)
到此這篇關(guān)于Redis分布式鎖的使用和實(shí)現(xiàn)原理的文章就介紹到這了,更多相關(guān)Redis分布式鎖的使用和原理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
您可能感興趣的文章:- redis分布式鎖之可重入鎖的實(shí)現(xiàn)代碼
- 詳解redis分布式鎖的這些坑
- Java基于redis實(shí)現(xiàn)分布式鎖
- 詳解Redis 分布式鎖遇到的序列化問(wèn)題
- php基于redis的分布式鎖實(shí)例詳解
- Redis分布式鎖升級(jí)版RedLock及SpringBoot實(shí)現(xiàn)方法
- redis分布式鎖的go-redis實(shí)現(xiàn)方法詳解
- redission分布式鎖防止重復(fù)初始化問(wèn)題
- Redis如何實(shí)現(xiàn)分布式鎖詳解