近些年,Redis憑借在性能、穩定性和高可擴展性上的卓越表現,基本上已經成了互聯網行業緩存中間件的標配,甚至很多傳統行業也在使用Redis。那么我們在使用Redis等緩存中間件時,要注意哪些問題呢?本文咱們就來聊聊,我們使用緩存中間件過程中曾經遇到的坑!
緩存穿透
先看一個常見的緩存使用方式。請求來了,先查緩存,緩存有值就直接返回;緩存沒值,查數據庫,然后把數據庫的值存到緩存,再返回。
假如緩存沒查到某個值,查數據庫也沒這個值,也就是說要查的值根本不存在,這樣就會導致每次對這個值的查詢請求都會穿透到數據庫。這就是所謂的“緩存穿透”。
如何避免緩存穿透?
如果從數據庫中沒查到值,可以在緩存中記錄一個空值,來避免“緩存穿透”。并且要給這個空值設置一個較短的過期時間。
比如說,我們經常會把用戶信息緩存到Redis。如果調用方傳了一個不存在的UserID,在緩存中就查不到這個用戶信息,然后去DB也查不到。這樣就會導致,每次根據這個UserID查用戶信息,都會穿透到數據庫,給數據庫造成了壓力。為了避免緩存穿透,當數據庫查不到時,我們可以在緩存中記錄一條空數據,比如userID做為key,空json做為值,如果程序獲得這個空json,就按用戶不存在處理。再給這個key設置一個很短的過期時間,比如30秒。
緩存雪崩
我們經常會遇到需要初始化緩存的情況。比如說用戶系統重構,表結構發生了變化,緩存信息也要變,上線前需要初始化緩存,將用戶信息批量存入緩存。假如我們給這些用戶信息設置相同的過期時間,到過期時間點所有用戶信息的緩存記錄就會同時集中失效,導致大量請求瞬間打到數據庫,數據庫很可能會被搞掛。這種緩存集中失效,導致大量請求同時穿透到數據庫的情況,就是所謂的“雪崩效應”。
所以,當我們向緩存初始化數據時,要保證每個緩存記錄過期時間的離散性??梢圆捎靡粋€較大的固定值加上一個較小的隨機值。比如過期時間可以是:10小時 + 0到3600秒的隨機值。
緩存并發
當系統并發很高,緩存數據尤其是熱點數據過期后,可能會出現多個請求同時訪問數據庫并設置緩存的情況,不但給數據庫帶來壓力,而且會有緩存頻繁更新的問題。
我們可以通過加鎖來避免緩存并發問題。如果從緩存查不到數據,對查詢數據加分布式鎖,然后查數據庫并把數據庫查詢結果放入緩存。其他線程等待鎖釋放后,直接從緩存取值。
比如,電商系統會緩存商品SKU價格,一些熱點商品的并發訪問會非常高。當緩存過期失效后,訪問請求從緩存查不到記錄,此時可以用商品SKU ID為Key加分布式鎖,然后從數據庫查詢價格并把價格放入緩存,最后解鎖。解鎖后其他請求就可以從緩存直接取值了。從而避免了數據庫的壓力。
分布式鎖
以我們之前做過的5人拼團為例。如果有用戶參加團購,我們需要先校驗參團人數是否達到了上限5人。如果沒達到5人,用戶才可以參團。偽代碼如下:
//根據拼團ID獲取目前參團成員數量
int numOfMembers = pinTuanService.getNumOfMembersById(pinTuanID);
if(numOfMembers < 5) {
pinTuanService.pintuan();//執行,加入拼團,生單等邏輯
}
高并發場景下,上面的代碼會有很嚴重的問題。如果某個團當前的參團人數是4,這時有兩個用戶同時參團,用戶A和用戶B的請求同時進入上面的代碼塊,A和B的請求同時執行到第2行代碼,獲取的numOfMembers都是4,表達式 numOfMembers < 5 成立,所以兩個用戶都能執行到第4行代碼,就是說A用戶和B用戶都能成功參加拼團。于是,參團人數就超過了5人的上限。所以我們就需要加鎖來避免這個問題。synchronized行嗎?不行。因為我們的服務是多節點部署的,所以要加分布式鎖。代碼如下:
boolean aquired = distributedLock.aquireLock(pinTuanID, 3000);
if(aquired == true) {
try{
//根據拼團ID獲取目前參團成員數量
int numOfMembers = pinTuanService.getNumOfMembersById(pinTuanID);
if(numOfMembers < 5) {
pinTuanService.pintuan();//執行,加入拼團,生單等邏輯
}
} finally {
distributedLock.releaseLock(pinTuanID);
}
}
這樣就好多啦!接下來我們看看基于Redis分布式鎖的實現,以及特別要注意的問題。一般我們會基于setnx實現Redis分布式鎖。setnx命令可以檢查key是否存在,如果key不存在,就在Redis中創建一個鍵值對(操作成功),如果key已經存在就放棄執行(操作失敗)。
先看一段基于Springboot實現的加鎖和釋放鎖的代碼:
@Component
public class DistributedLock {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 加鎖
* lockKey,redis的key
* expireTime,過期時間,單位是毫秒
* 注:setIfAbsent方法就使用了redis的setnx
*/
public boolean aquireLock(String lockKey, long expireTime) {
long waitTime = 0;
boolean success = redisTemplate.opsForValue().setIfAbsent(lockKey, "distributedLock",
expireTime, TimeUnit.MILLISECONDS);
if(success == true){
return success;
} else {
//如果加鎖失敗,循環重試加鎖
while(success != true && waitTime < 5000L ) {
success = redisTemplate.opsForValue().setIfAbsent(lockKey, "distributedLock",
expireTime, TimeUnit.MILLISECONDS);
sleep 100毫秒;
waitTime += 100L;
}
}
return success;
}
/**
* 釋放鎖
* lockKey,redis的key
*/
public void releaseLock(String lockKey) {
redisTemplate.delete(lockKey);
}
}
上面的代碼。乍一看,好像沒什么問題!加鎖失敗有循環重試加鎖,過期時間設置了,而且也保證了創建Key-Value鍵值對和設置過期時間的原子性,這樣當程序沒有正常釋放鎖時,也能保證過期后鎖自動釋放(注意:redis較老的版本不支持 setnx 和設置過期時間的原子操作,不過可以利用Lua腳本來保證原子性)。
我們再仔細思考一下,一般場景我們會對Key設置一個很短的過期時間,當一次操作因為網絡等原因耗費了較長時間,操作還沒完成key就過期失效了。這樣會產生什么問題呢?我們還是以拼團為例加以說明,先看看下面這張圖:
如上圖,用戶A和用戶B同時參加同一團,團ID為 001,我們以團ID作為分布式鎖的Key,"distributedLock" 作為固定的Value,過期時間是5秒。A先獲取分布式鎖,但是由于網絡等原因A的拼團操作在5秒內沒完成,這時Key過期并從Redis清除掉,A的分布式鎖失效。此時用戶B拿到分布式鎖,Key也同樣是團ID 001。在用戶B的拼團邏輯執行完之前,用戶A的邏輯先執行完了,緊接著A就把鎖給釋放了。不過A的鎖早已經過期失效了,B持有鎖的Key和A又完全一樣,所以此時A釋放的其實是B的鎖。這樣一來整個拼團還是有可能會超員。怎么解決呢?
我們可以把分布式鎖的Value設成可以區分的值,比如拼團的場景Value可以設置為userID,在釋放鎖的時候根據key和value來判斷當前的鎖是不是自己的,只有Redis中userID和自己的userID相同才釋放鎖。
改進后的代碼如下:
@Component
public class DistributedLock {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 加鎖
* lockKey,redis的key
* expireTime,過期時間,單位是毫秒
* 注:setIfAbsent方法就使用了redis的setnx
*/
public boolean aquireLock(String lockKey, String userID, long expireTime) {
long waitTime = 0;
boolean success = redisTemplate.opsForValue().setIfAbsent(lockKey, userID,
expireTime, TimeUnit.MILLISECONDS);
if(success == true){
return success;
} else {
//如果加鎖失敗,循環重試加鎖
while(success != true && waitTime < 5000L ) {
success = redisTemplate.opsForValue().setIfAbsent(lockKey, userID,
expireTime, TimeUnit.MILLISECONDS);
sleep 100毫秒;
waitTime += 100L;
}
}
return success;
}
/**
* 釋放鎖
* lockKey,redis的key
*/
public void releaseLock(String lockKey, String userID) {
String userIDFromRedis = redisTemplate.get(lockKey);
if( userID.equals(userIDFromRedis) ) {
redisTemplate.delete(lockKey);
}
}
}
還有一種場景需要考慮。當Redis master發生故障,主備切換時往往會造成數據丟失,包括分布式鎖的Key-Value 也可能丟失。這樣就會導致操作還沒執行完,鎖就被其他請求拿到了。Redis官方提供了Redlock算法,以及相應的開源實現 Redisson。用到分布式鎖的場景,大家可以直接使用 Redisson,非常方便。如果系統對可靠性要求很高,如需用到分布式鎖,建議使用 Zookeeper,etcd 等。