使用場景
工作中大家往往會遇到類似的場景:
1.對于紅包場景,賬戶 A 對賬戶 B 發(fā)出紅包通常在 1 天后會自動歸還到原賬戶。
2.對于實時支付場景,如果賬戶 A 對商戶 S 付款 100 元,5秒后沒有收到支付方回調(diào)將自動取消訂單。
解決方案分析
方案一:
采用通過定時任務(wù)采用數(shù)據(jù)庫/非關(guān)系型數(shù)據(jù)庫輪詢方案。
優(yōu)點:
1. 實現(xiàn)簡單,對于項目前期這樣是最容易的解決方案。
缺點:
1. DB 有效使用率低,需要將一部分的數(shù)據(jù)庫的QPS分配給 JOB 的無效輪詢。
2. 服務(wù)資源浪費,因為輪詢需要對所有的數(shù)據(jù)做一次 SCAN 掃描 JOB 服務(wù)的資源開銷很大。
方案二:
采用延遲隊列:
優(yōu)點:
1. 服務(wù)的資源使用率較高,能夠精確的實現(xiàn)超時任務(wù)的執(zhí)行。
2. 減少 DB 的查詢次數(shù),能夠降低數(shù)據(jù)庫的壓力
缺點:
1. 對于延遲隊列來說本身設(shè)計比較復(fù)雜,目前沒有通用的比較好過的方案。
基于 Redis 的延遲隊列實現(xiàn)
基于以上的分析,我決定通過 Redis 來實現(xiàn)分布式隊列。
設(shè)計思路:
1. 第一步將需要發(fā)放的消息發(fā)送到延遲隊列中。
2. 延遲隊列將數(shù)據(jù)存入 Redis 的 ZSet 有序集合中score 為當(dāng)前時間戳,member 存入需要發(fā)送的數(shù)據(jù)。
3. 添加一個 schedule 來進行對 Redis 有序隊列的輪詢。
4. 如果到達(dá)達(dá)到消息的執(zhí)行時間,那么就進行業(yè)務(wù)的執(zhí)行。
5. 如果沒有達(dá)到消息的執(zhí)行是將,那么消息等待下輪執(zhí)行。
實現(xiàn)步驟:
由于本處篇幅有限,所以只列舉部分代碼,完整的代碼可以在本文最后訪問 GitHub 獲取。由于本人閱歷/水平有限,如有建議/或更正歡迎留言或提問。先在此謝謝大家駐足閱讀 👏 👏 👏。
需要注意的問題:
單個 Redis 命令的執(zhí)行是原子性的,但 Redis 沒有在事務(wù)上增加任何維持原子性的機制,所以 Redis 事務(wù)的執(zhí)行并不是原子性的。
事務(wù)可以理解為一個打包的批量執(zhí)行腳本,但批量指令并非原子化的操作,中間某條指令的失敗不會導(dǎo)致前面已做指令的回滾,也不會造成后續(xù)的指令不做。
我們可以通過 Redis 的 eval 命令來執(zhí)行 lua 腳本來保證原子性實現(xiàn)Redis的事務(wù)。
實現(xiàn)步驟如下:
1. 延遲隊列接口
/**
* 延遲隊列
*
* @author zhengsh
* @date 2020-03-27
*/
public interface RedisDelayQueueE extends DelayMessage> {
String META_TOPIC_WAIT = "delay:meta:topic:wait";
String META_TOPIC_ACTIVE = "delay:meta:topic:active";
String TOPIC_ACTIVE = "delay:active:9999";
/**
* 拉取消息
*/
void poll();
/**
* 推送延遲消息
*
* @param e
*/
void push(E e);
}
2. 延遲隊列消息
/**
* 消息體
*
* @author zhengsh
* @date 2020-03-27
*/
@Setter
@Getter
public class DelayMessage {
/**
* 消息唯一標(biāo)識
*/
private String id;
/**
* 消息主題
*/
private String topic = "default";
/**
* 具體消息 json
*/
private String body;
/**
* 延時時間, 格式為時間戳: 當(dāng)前時間戳 + 實際延遲毫秒數(shù)
*/
private Long delayTime = System.currentTimeMillis() + 30000L;
/**
* 消息發(fā)送時間
*/
private LocalDateTime createTime;
}
3. 延遲隊列實現(xiàn)
/**
* 延遲隊列實現(xiàn)
*
* @author zhengsh
* @date 2020-03-27
*/
@Component
public class RedisDelayQueueImplE extends DelayMessage> implements RedisDelayQueueE> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private StringRedisTemplate redisTemplate;
@Override
public void poll() {
// todo
}
/**
* 發(fā)送消息
*
* @param e
*/
@SneakyThrows
@Override
public void push(E e) {
try {
String jsonStr = JSON.toJSONString(e);
String topic = e.getTopic();
String zkey = String.format("delay:wait:%s", topic);
String u =
"redis.call('sadd', KEYS[1], ARGV[1])\n" +
"redis.call('zadd', KEYS[2], ARGV[2], ARGV[3])\n" +
"return 1";
Object[] keys = new Object[]{serialize(META_TOPIC_WAIT), serialize(zkey)};
Object[] values = new Object[]{ serialize(zkey), serialize(String.valueOf(e.getDelayTime())),serialize(jsonStr)};
Long result = redisTemplate.execute((RedisCallbackLong>) connection -> {
Object nativeConnection = connection.getNativeConnection();
if (nativeConnection instanceof RedisAsyncCommands) {
RedisAsyncCommands commands = (RedisAsyncCommands) nativeConnection;
return (Long) commands.getStatefulConnection().sync().eval(u, ScriptOutputType.INTEGER, keys, values);
} else if (nativeConnection instanceof RedisAdvancedClusterAsyncCommands) {
RedisAdvancedClusterAsyncCommands commands = (RedisAdvancedClusterAsyncCommands) nativeConnection;
return (Long) commands.getStatefulConnection().sync().eval(u, ScriptOutputType.INTEGER, keys, values);
}
return 0L;
});
logger.info("延遲隊列[1],消息推送成功進入等待隊列({}), topic: {}", result != null result > 0, e.getTopic());
} catch (Throwable t) {
t.printStackTrace();
}
}
private byte[] serialize(String key) {
RedisSerializerString> stringRedisSerializer =
(RedisSerializerString>) redisTemplate.getKeySerializer();
//lettuce連接包下序列化鍵值,否則無法用默認(rèn)的ByteArrayCodec解析
return stringRedisSerializer.serialize(key);
}
}
4. 定時任務(wù)
/**
* 分發(fā)任務(wù)
*/
@Component
public class DistributeTask {
private static final String LUA_SCRIPT;
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private StringRedisTemplate redisTemplate;
static {
StringBuilder sb = new StringBuilder(128);
sb.append("local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1], 'limit', 0, 1)\n");
sb.append("if(next(val) ~= nil) then\n");
sb.append(" redis.call('sadd', KEYS[2], ARGV[2])\n");
sb.append(" redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)\n");
sb.append(" for i = 1, #val, 100 do\n");
sb.append(" redis.call('rpush', KEYS[3], unpack(val, i, math.min(i+99, #val)))\n");
sb.append(" end\n");
sb.append(" return 1\n");
sb.append("end\n");
sb.append("return 0");
LUA_SCRIPT = sb.toString();
}
/**
* 2秒鐘掃描一次執(zhí)行隊列
*/
@Scheduled(cron = "0/5 * * * * ?")
public void scheduledTaskByCorn() {
try {
SetString> members = redisTemplate.opsForSet().members(META_TOPIC_WAIT);
assert members != null;
for (String k : members) {
if (!redisTemplate.hasKey(k)) {
// 如果 KEY 不存在元數(shù)據(jù)中刪除
redisTemplate.opsForSet().remove(META_TOPIC_WAIT, k);
continue;
}
String lk = k.replace("delay:wait", "delay:active");
Object[] keys = new Object[]{serialize(k), serialize(META_TOPIC_ACTIVE), serialize(lk)};
Object[] values = new Object[]{serialize(String.valueOf(System.currentTimeMillis())), serialize(lk)};
Long result = redisTemplate.execute((RedisCallbackLong>) connection -> {
Object nativeConnection = connection.getNativeConnection();
if (nativeConnection instanceof RedisAsyncCommands) {
RedisAsyncCommands commands = (RedisAsyncCommands) nativeConnection;
return (Long) commands.getStatefulConnection().sync().eval(LUA_SCRIPT, ScriptOutputType.INTEGER, keys, values);
} else if (nativeConnection instanceof RedisAdvancedClusterAsyncCommands) {
RedisAdvancedClusterAsyncCommands commands = (RedisAdvancedClusterAsyncCommands) nativeConnection;
return (Long) commands.getStatefulConnection().sync().eval(LUA_SCRIPT, ScriptOutputType.INTEGER, keys, values);
}
return 0L;
});
logger.info("延遲隊列[2],消息到期進入執(zhí)行隊列({}): {}", result != null result > 0, TOPIC_ACTIVE);
}
} catch (Throwable t) {
t.printStackTrace();
}
}
private byte[] serialize(String key) {
RedisSerializerString> stringRedisSerializer =
(RedisSerializerString>) redisTemplate.getKeySerializer();
//lettuce連接包下序列化鍵值,否則無法用默認(rèn)的ByteArrayCodec解析
return stringRedisSerializer.serialize(key);
}
}
GitHub 地址
https://github.com/zhengsh/redis-delay-queue
參考地址
1.https://www.runoob.com/redis/redis-transactions.html
到此這篇關(guān)于基于Redis延遲隊列的實現(xiàn)代碼的文章就介紹到這了,更多相關(guān)Redis 延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
您可能感興趣的文章:- SpringBoot集成Redisson實現(xiàn)延遲隊列的場景分析
- php使用redis的有序集合zset實現(xiàn)延遲隊列應(yīng)用示例
- Redis延遲隊列和分布式延遲隊列的簡答實現(xiàn)