主頁 > 知識庫 > 使用Redis實現(xiàn)延時任務的解決方案

使用Redis實現(xiàn)延時任務的解決方案

熱門標簽:南京手機外呼系統(tǒng)廠家 地圖標注工廠入駐 400電話辦理的口碑 四川穩(wěn)定外呼系統(tǒng)軟件 一個地圖標注多少錢 b2b外呼系統(tǒng) 臺灣電銷 廊坊外呼系統(tǒng)在哪買 高碑店市地圖標注app

最近在生產(chǎn)環(huán)境剛好遇到了延時任務的場景,調(diào)研了一下目前主流的方案,分析了一下優(yōu)劣并且敲定了最終的方案。這篇文章記錄了調(diào)研的過程,以及初步方案的實現(xiàn)。

候選方案對比

下面是想到的幾種實現(xiàn)延時任務的方案,總結(jié)了一下相應的優(yōu)勢和劣勢。

方案 優(yōu)勢 劣勢 選用場景
JDK 內(nèi)置的延遲隊列 DelayQueue 實現(xiàn)簡單 數(shù)據(jù)內(nèi)存態(tài),不可靠 一致性相對低的場景
調(diào)度框架和 MySQL 進行短間隔輪詢 實現(xiàn)簡單,可靠性高 存在明顯的性能瓶頸 數(shù)據(jù)量較少實時性相對低的場景
RabbitMQ  DLX  TTL,一般稱為 死信隊列 方案 異步交互可以削峰 延時的時間長度不可控,如果數(shù)據(jù)需要持久化則性能會降低 -
調(diào)度框架和 Redis 進行短間隔輪詢 數(shù)據(jù)持久化,高性能 實現(xiàn)難度大 常見于支付結(jié)果回調(diào)方案
時間輪 實時性高 實現(xiàn)難度大,內(nèi)存消耗大 實時性高的場景

如果應用的數(shù)據(jù)量不高,實時性要求比較低,選用調(diào)度框架和 MySQL 進行短間隔輪詢這個方案是最優(yōu)的方案。但是筆者遇到的場景數(shù)據(jù)量相對比較大,實時性并不高,采用掃庫的方案一定會對 MySQL 實例造成比較大的壓力。記得很早之前,看過一個PPT叫《盒子科技聚合支付系統(tǒng)演進》,其中里面有一張圖片給予筆者一點啟發(fā):

里面剛好用到了調(diào)度框架和 Redis 進行短間隔輪詢實現(xiàn)延時任務的方案,不過為了分攤應用的壓力,圖中的方案還做了分片處理。鑒于筆者當前業(yè)務緊迫,所以在第一期的方案暫時不考慮分片,只做了一個簡化版的實現(xiàn)。

由于PPT中沒有任何的代碼或者框架貼出,有些需要解決的技術點需要自行思考,下面會重現(xiàn)一次整個方案實現(xiàn)的詳細過程。

場景設計

實際的生產(chǎn)場景是筆者負責的某個系統(tǒng)需要對接一個外部的資金方,每一筆資金下單后需要延時30分鐘推送對應的附件。這里簡化為一個訂單信息數(shù)據(jù)延遲處理的場景,就是每一筆下單記錄一條訂單消息(暫時叫做 OrderMessage ),訂單消息需要延遲5到15秒后進行異步處理。

否決的候選方案實現(xiàn)思路

下面介紹一下其它四個不選用的候選方案,結(jié)合一些偽代碼和流程分析一下實現(xiàn)過程。

JDK內(nèi)置延遲隊列

DelayQueue 是一個阻塞隊列的實現(xiàn),它的隊列元素必須是 Delayed 的子類,這里做個簡單的例子:

public class DelayQueueMain {
  private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueMain.class);
  public static void main(String[] args) throws Exception {
    DelayQueueOrderMessage> queue = new DelayQueue>();
    // 默認延遲5秒
    OrderMessage message = new OrderMessage("ORDER_ID_10086");
    queue.add(message);
    // 延遲6秒
    message = new OrderMessage("ORDER_ID_10087", 6);
    queue.add(message);
    // 延遲10秒
    message = new OrderMessage("ORDER_ID_10088", 10);
    queue.add(message);
    ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
      Thread thread = new Thread(r);
      thread.setName("DelayWorker");
      thread.setDaemon(true);
      return thread;
    });
    LOGGER.info("開始執(zhí)行調(diào)度線程...");
    executorService.execute(() -> {
      while (true) {
        try {
          OrderMessage task = queue.take();
          LOGGER.info("延遲處理訂單消息,{}", task.getDescription());
        } catch (Exception e) {
          LOGGER.error(e.getMessage(), e);
        }
      }
    });
    Thread.sleep(Integer.MAX_VALUE);
  }
  private static class OrderMessage implements Delayed {
    private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    /**
     * 默認延遲5000毫秒
     */
    private static final long DELAY_MS = 1000L * 5;
    /**
     * 訂單ID
     */
    private final String orderId;
    /**
     * 創(chuàng)建時間戳
     */
    private final long timest
    /**
     * 過期時間
     */
    private final long expire;
    /**
     * 描述
     */
    private final String description;
    public OrderMessage(String orderId, long expireSeconds) {
      this.orderId = orderId;
      this.timestamp = System.currentTimeMillis();
      this.expire = this.timestamp + expireSeconds * 1000L;
      this.description = String.format("訂單[%s]-創(chuàng)建時間為:%s,超時時間為:%s", orderId,
          LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
          LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
    }
    public OrderMessage(String orderId) {
      this.orderId = orderId;
      this.timestamp = System.currentTimeMillis();
      this.expire = this.timestamp + DELAY_MS;
      this.description = String.format("訂單[%s]-創(chuàng)建時間為:%s,超時時間為:%s", orderId,
          LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F),
          LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F));
    }
    public String getOrderId() {
      return orderId;
    }
    public long getTimestamp() {
      return timest
    }
    public long getExpire() {
      return expire;
    }
    public String getDescription() {
      return description;
    }
    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed o) {
      return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }
  }
}

注意一下, OrderMessage 實現(xiàn) Delayed 接口,關鍵是需要實現(xiàn) Delayed#getDelay()Delayed#compareTo() 。運行一下 main() 方法:

10:16:08.240 [main] INFO club.throwable.delay.DelayQueueMain - 開始執(zhí)行調(diào)度線程...
10:16:13.224 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延遲處理訂單消息,訂單[ORDER_ID_10086]-創(chuàng)建時間為:2019-08-20 10:16:08,超時時間為:2019-08-20 10:16:13
10:16:14.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延遲處理訂單消息,訂單[ORDER_ID_10087]-創(chuàng)建時間為:2019-08-20 10:16:08,超時時間為:2019-08-20 10:16:14
10:16:18.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延遲處理訂單消息,訂單[ORDER_ID_10088]-創(chuàng)建時間為:2019-08-20 10:16:08,超時時間為:2019-08-20 10:16:18

調(diào)度框架 + MySQL

使用調(diào)度框架對 MySQL 表進行短間隔輪詢是實現(xiàn)難度比較低的方案,通常服務剛上線,表數(shù)據(jù)不多并且實時性不高的情況下應該首選這個方案。不過要注意以下幾點:

MySQL

引入 Quartz 、 MySQL 的Java驅(qū)動包和 spring-boot-starter-jdbc (這里只是為了方便用相對輕量級的框架實現(xiàn),生產(chǎn)中可以按場景按需選擇其他更合理的框架):

dependency>
  groupId>mysql/groupId>
  artifactId>mysql-connector-java/artifactId>
  version>5.1.48/version>
  scope>test/scope>
/dependency>
dependency>
  groupId>org.springframework.boot/groupId>
  artifactId>spring-boot-starter-jdbc/artifactId>
  version>2.1.7.RELEASE/version>
  scope>test/scope>
/dependency>
dependency>
  groupId>org.quartz-scheduler/groupId>
  artifactId>quartz/artifactId>
  version>2.3.1/version>
  scope>test/scope>
/dependency>

假設表設計如下:

CREATE DATABASE `delayTask` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci;

USE `delayTask`;

CREATE TABLE `t_order_message`
(
  id      BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT,
  order_id   VARCHAR(50) NOT NULL COMMENT '訂單ID',
  create_time DATETIME  NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建日期時間',
  edit_time  DATETIME  NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改日期時間',
  retry_times TINYINT   NOT NULL DEFAULT 0 COMMENT '重試次數(shù)',
  order_status TINYINT   NOT NULL DEFAULT 0 COMMENT '訂單狀態(tài)',
  INDEX idx_order_id (order_id),
  INDEX idx_create_time (create_time)
) COMMENT '訂單信息表';

# 寫入兩條測試數(shù)據(jù)
INSERT INTO t_order_message(order_id) VALUES ('10086'),('10087');

編寫代碼:

// 常量
public class OrderConstants {

  public static final int MAX_RETRY_TIMES = 5;

  public static final int PENDING = 0;

  public static final int SUCCESS = 1;

  public static final int FAIL = -1;

  public static final int LIMIT = 10;
}

// 實體
@Builder
@Data
public class OrderMessage {

  private Long id;
  private String orderId;
  private LocalDateTime createTime;
  private LocalDateTime editTime;
  private Integer retryTimes;
  private Integer orderStatus;
}

// DAO
@RequiredArgsConstructor
public class OrderMessageDao {

  private final JdbcTemplate jdbcTemplate;

  private static final ResultSetExtractorListOrderMessage>> M = r -> {
    ListOrderMessage> list = Lists.newArrayList();
    while (r.next()) {
      list.add(OrderMessage.builder()
          .id(r.getLong("id"))
          .orderId(r.getString("order_id"))
          .createTime(r.getTimestamp("create_time").toLocalDateTime())
          .editTime(r.getTimestamp("edit_time").toLocalDateTime())
          .retryTimes(r.getInt("retry_times"))
          .orderStatus(r.getInt("order_status"))
          .build());
    }
    return list;
  };

  public ListOrderMessage> selectPendingRecords(LocalDateTime start,
                          LocalDateTime end,
                          ListInteger> statusList,
                          int maxRetryTimes,
                          int limit) {
    StringJoiner joiner = new StringJoiner(",");
    statusList.forEach(s -> joiner.add(String.valueOf(s)));
    return jdbcTemplate.query("SELECT * FROM t_order_message WHERE create_time >= ? AND create_time = ? " +
            "AND order_status IN (?) AND retry_times  ? LIMIT ?",
        p -> {
          p.setTimestamp(1, Timestamp.valueOf(start));
          p.setTimestamp(2, Timestamp.valueOf(end));
          p.setString(3, joiner.toString());
          p.setInt(4, maxRetryTimes);
          p.setInt(5, limit);
        }, M);
  }

  public int updateOrderStatus(Long id, int status) {
    return jdbcTemplate.update("UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?",
        p -> {
          p.setInt(1, status);
          p.setTimestamp(2, Timestamp.valueOf(LocalDateTime.now()));
          p.setLong(3, id);
        });
  }
}

// Service
@RequiredArgsConstructor
public class OrderMessageService {

  private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageService.class);

  private final OrderMessageDao orderMessageDao;

  private static final ListInteger> STATUS = Lists.newArrayList();

  static {
    STATUS.add(OrderConstants.PENDING);
    STATUS.add(OrderConstants.FAIL);
  }

  public void executeDelayJob() {
    LOGGER.info("訂單處理定時任務開始執(zhí)行......");
    LocalDateTime end = LocalDateTime.now();
    // 一天前
    LocalDateTime start = end.minusDays(1);
    ListOrderMessage> list = orderMessageDao.selectPendingRecords(start, end, STATUS, OrderConstants.MAX_RETRY_TIMES, OrderConstants.LIMIT);
    if (!list.isEmpty()) {
      for (OrderMessage m : list) {
        LOGGER.info("處理訂單[{}],狀態(tài)由{}更新為{}", m.getOrderId(), m.getOrderStatus(), OrderConstants.SUCCESS);
        // 這里其實可以優(yōu)化為批量更新
        orderMessageDao.updateOrderStatus(m.getId(), OrderConstants.SUCCESS);
      }
    }
    LOGGER.info("訂單處理定時任務開始完畢......");
  }
}

// Job
@DisallowConcurrentExecution
public class OrderMessageDelayJob implements Job {

  @Override
  public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
    OrderMessageService service = (OrderMessageService) jobExecutionContext.getMergedJobDataMap().get("orderMessageService");
    service.executeDelayJob();
  }

  public static void main(String[] args) throws Exception {
    HikariConfig config = new HikariConfig();
    config.setJdbcUrl("jdbc:mysql://localhost:3306/delayTask?useSSL=falsecharacterEncoding=utf8");
    config.setDriverClassName(Driver.class.getName());
    config.setUsername("root");
    config.setPassword("root");
    HikariDataSource dataSource = new HikariDataSource(config);
    OrderMessageDao orderMessageDao = new OrderMessageDao(new JdbcTemplate(dataSource));
    OrderMessageService service = new OrderMessageService(orderMessageDao);
    // 內(nèi)存模式的調(diào)度器
    StdSchedulerFactory factory = new StdSchedulerFactory();
    Scheduler scheduler = factory.getScheduler();
    // 這里沒有用到IOC容器,直接用Quartz數(shù)據(jù)集合傳遞服務引用
    JobDataMap jobDataMap = new JobDataMap();
    jobDataMap.put("orderMessageService", service);
    // 新建Job
    JobDetail job = JobBuilder.newJob(OrderMessageDelayJob.class)
        .withIdentity("orderMessageDelayJob", "delayJob")
        .usingJobData(jobDataMap)
        .build();
    // 新建觸發(fā)器,10秒執(zhí)行一次
    Trigger trigger = TriggerBuilder.newTrigger()
        .withIdentity("orderMessageDelayTrigger", "delayJob")
        .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
        .build();
    scheduler.scheduleJob(job, trigger);
    // 啟動調(diào)度器
    scheduler.start();
    Thread.sleep(Integer.MAX_VALUE);
  }
}

這個例子里面用了 create_time 做輪詢,實際上可以添加一個調(diào)度時間 schedule_time 列做輪詢,這樣子才能更容易定制空閑時和忙碌時候的調(diào)度策略。上面的示例的運行效果如下:

11:58:27.202 [main] INFO org.quartz.core.QuartzScheduler - Scheduler meta-data: Quartz Scheduler (v2.3.1) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED'
 Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
 NOT STARTED.
 Currently in standby mode.
 Number of jobs executed: 0
 Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
 Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.

11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler version: 2.3.1
11:58:27.209 [main] INFO org.quartz.core.QuartzScheduler - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
11:58:27.212 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:27.217 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:27.219 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@10eb8c53
11:58:27.220 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers
11:58:27.221 [DefaultQuartzScheduler_Worker-1] DEBUG org.quartz.core.JobRunShell - Calling execute on job delayJob.orderMessageDelayJob
11:58:34.440 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 訂單處理定時任務開始執(zhí)行......
11:58:34.451 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@3d27ece4
11:58:34.459 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@64e808af
11:58:34.470 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@79c8c2b7
11:58:34.477 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@19a62369
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@1673d017
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - After adding stats (total=10, active=0, idle=10, waiting=0)
11:58:34.559 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL query
11:58:34.565 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [SELECT * FROM t_order_message WHERE create_time >= ? AND create_time = ? AND order_status IN (?) AND retry_times  ? LIMIT ?]
11:58:34.645 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.210 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - SQLWarning ignored: SQL state '22007', error code '1292', message [Truncated incorrect DOUBLE value: '0,-1']
11:58:35.335 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 處理訂單[10086],狀態(tài)由0更新為1
11:58:35.342 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.346 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.347 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.354 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 處理訂單[10087],狀態(tài)由0更新為1
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.361 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 訂單處理定時任務開始完畢......
11:58:35.363 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob', class=club.throwable.jdbc.OrderMessageDelayJob
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers

RabbitMQ死信隊列

使用 RabbitMQ 死信隊列依賴于 RabbitMQ 的兩個特性: TTLDLX

TTLTime To Live ,消息存活時間,包括兩個維度:隊列消息存活時間和消息本身的存活時間。

DLXDead Letter Exchange ,死信交換器。

畫個圖描述一下這兩個特性:

下面為了簡單起見, TTL 使用了針對隊列的維度。引入 RabbitMQ 的Java驅(qū)動:

dependency>
  groupId>com.rabbitmq/groupId>
  artifactId>amqp-client/artifactId>
  version>5.7.3/version>
  scope>test/scope>
/dependency>

代碼如下:

public class DlxMain {

  private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  private static final Logger LOGGER = LoggerFactory.getLogger(DlxMain.class);

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    Connection connection = factory.newConnection();
    Channel producerChannel = connection.createChannel();
    Channel consumerChannel = connection.createChannel();
    // dlx交換器名稱為dlx.exchange,類型是direct,綁定鍵為dlx.key,隊列名為dlx.queue
    producerChannel.exchangeDeclare("dlx.exchange", "direct");
    producerChannel.queueDeclare("dlx.queue", false, false, false, null);
    producerChannel.queueBind("dlx.queue", "dlx.exchange", "dlx.key");
    MapString, Object> queueArgs = new HashMap>();
    // 設置隊列消息過期時間,5秒
    queueArgs.put("x-message-ttl", 5000);
    // 指定DLX相關參數(shù)
    queueArgs.put("x-dead-letter-exchange", "dlx.exchange");
    queueArgs.put("x-dead-letter-routing-key", "dlx.key");
    // 聲明業(yè)務隊列
    producerChannel.queueDeclare("business.queue", false, false, false, queueArgs);
    ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
      Thread thread = new Thread(r);
      thread.setDaemon(true);
      thread.setName("DlxConsumer");
      return thread;
    });
    // 啟動消費者
    executorService.execute(() -> {
      try {
        consumerChannel.basicConsume("dlx.queue", true, new DlxConsumer(consumerChannel));
      } catch (IOException e) {
        LOGGER.error(e.getMessage(), e);
      }
    });
    OrderMessage message = new OrderMessage("10086");
    producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
        message.getDescription().getBytes(StandardCharsets.UTF_8));
    LOGGER.info("發(fā)送消息成功,訂單ID:{}", message.getOrderId());

    message = new OrderMessage("10087");
    producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
        message.getDescription().getBytes(StandardCharsets.UTF_8));
    LOGGER.info("發(fā)送消息成功,訂單ID:{}", message.getOrderId());

    message = new OrderMessage("10088");
    producerChannel.basicPublish("", "business.queue", MessageProperties.TEXT_PLAIN,
        message.getDescription().getBytes(StandardCharsets.UTF_8));
    LOGGER.info("發(fā)送消息成功,訂單ID:{}", message.getOrderId());

    Thread.sleep(Integer.MAX_VALUE);
  }

  private static class DlxConsumer extends DefaultConsumer {

    DlxConsumer(Channel channel) {
      super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag,
                  Envelope envelope,
                  AMQP.BasicProperties properties,
                  byte[] body) throws IOException {
      LOGGER.info("處理消息成功:{}", new String(body, StandardCharsets.UTF_8));
    }
  }

  private static class OrderMessage {

    private final String orderId;
    private final long timest
    private final String description;

    OrderMessage(String orderId) {
      this.orderId = orderId;
      this.timestamp = System.currentTimeMillis();
      this.description = String.format("訂單[%s],訂單創(chuàng)建時間為:%s", orderId,
          LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F));
    }

    public String getOrderId() {
      return orderId;
    }

    public long getTimestamp() {
      return timest
    }

    public String getDescription() {
      return description;
    }
  }
}

運行 main() 方法結(jié)果如下:

16:35:58.638 [main] INFO club.throwable.dlx.DlxMain - 發(fā)送消息成功,訂單ID:10086
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 發(fā)送消息成功,訂單ID:10087
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 發(fā)送消息成功,訂單ID:10088
16:36:03.646 [pool-1-thread-4] INFO club.throwable.dlx.DlxMain - 處理消息成功:訂單[10086],訂單創(chuàng)建時間為:2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-5] INFO club.throwable.dlx.DlxMain - 處理消息成功:訂單[10087],訂單創(chuàng)建時間為:2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-6] INFO club.throwable.dlx.DlxMain - 處理消息成功:訂單[10088],訂單創(chuàng)建時間為:2019-08-20 16:35:58

時間輪

時間輪 TimingWheel 是一種高效、低延遲的調(diào)度數(shù)據(jù)結(jié)構,底層采用數(shù)組實現(xiàn)存儲任務列表的環(huán)形隊列,示意圖如下:

這里暫時不對時間輪和其實現(xiàn)作分析,只簡單舉例說明怎么使用時間輪實現(xiàn)延時任務。這里使用 Netty 提供的 HashedWheelTimer ,引入依賴:

dependency>
  groupId>io.netty/groupId>
  artifactId>netty-common/artifactId>
  version>4.1.39.Final/version>
/dependency>

代碼如下:

public class HashedWheelTimerMain {

  private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

  public static void main(String[] args) throws Exception {
    AtomicInteger counter = new AtomicInteger();
    ThreadFactory factory = r -> {
      Thread thread = new Thread(r);
      thread.setDaemon(true);
      thread.setName("HashedWheelTimerWorker-" + counter.getAndIncrement());
      return thread;
    };
    // tickDuration - 每tick一次的時間間隔, 每tick一次就會到達下一個槽位
    // unit - tickDuration的時間單位
    // ticksPerWhee - 時間輪中的槽位數(shù)
    Timer timer = new HashedWheelTimer(factory, 1, TimeUnit.SECONDS, 60);
    TimerTask timerTask = new DefaultTimerTask("10086");
    timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);
    timerTask = new DefaultTimerTask("10087");
    timer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
    timerTask = new DefaultTimerTask("10088");
    timer.newTimeout(timerTask, 15, TimeUnit.SECONDS);
    Thread.sleep(Integer.MAX_VALUE);
  }

  private static class DefaultTimerTask implements TimerTask {

    private final String orderId;
    private final long timest

    public DefaultTimerTask(String orderId) {
      this.orderId = orderId;
      this.timestamp = System.currentTimeMillis();
    }

    @Override
    public void run(Timeout timeout) throws Exception {
      System.out.println(String.format("任務執(zhí)行時間:%s,訂單創(chuàng)建時間:%s,訂單ID:%s",
          LocalDateTime.now().format(F), LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F), orderId));
    }
  }
}

運行結(jié)果:

任務執(zhí)行時間:2019-08-20 17:19:49.310,訂單創(chuàng)建時間:2019-08-20 17:19:43.294,訂單ID:10086
任務執(zhí)行時間:2019-08-20 17:19:54.297,訂單創(chuàng)建時間:2019-08-20 17:19:43.301,訂單ID:10087
任務執(zhí)行時間:2019-08-20 17:19:59.297,訂單創(chuàng)建時間:2019-08-20 17:19:43.301,訂單ID:10088

一般來說,任務執(zhí)行的時候應該使用另外的業(yè)務線程池,以免阻塞時間輪本身的運動。

選用的方案實現(xiàn)過程

最終選用了基于 Redis 的有序集合 Sorted SetQuartz 短輪詢進行實現(xiàn)。具體方案是:

  • 訂單創(chuàng)建的時候,訂單ID和當前時間戳分別作為 Sorted Set 的member和score添加到訂單隊列 Sorted Set 中。
  • 訂單創(chuàng)建的時候,訂單ID和推送內(nèi)容 JSON 字符串分別作為field和value添加到訂單隊列內(nèi)容 Hash 中。
  • 第1步和第2步操作的時候用 Lua 腳本保證原子性。
  • 使用一個異步線程通過 Sorted Set 的命令 ZREVRANGEBYSCORE 彈出指定數(shù)量的訂單ID對應的訂單隊列內(nèi)容 Hash 中的訂單推送內(nèi)容數(shù)據(jù)進行處理。

對于第4點處理有兩種方案:

  • 方案一:彈出訂單內(nèi)容數(shù)據(jù)的同時進行數(shù)據(jù)刪除,也就是 ZREVRANGEBYSCORE 、 ZREMHDEL 命令要在同一個 Lua 腳本中執(zhí)行,這樣的話 Lua 腳本的編寫難度大,并且由于彈出數(shù)據(jù)已經(jīng)在 Redis 中刪除,如果數(shù)據(jù)處理失敗則可能需要從數(shù)據(jù)庫重新查詢補償。
  • 方案二:彈出訂單內(nèi)容數(shù)據(jù)之后,在數(shù)據(jù)處理完成的時候再主動刪除訂單隊列 Sorted Set 和訂單隊列內(nèi)容 Hash 中對應的數(shù)據(jù),這樣的話需要控制并發(fā),有重復執(zhí)行的可能性。

最終暫時選用了方案一,也就是從 Sorted Set 彈出訂單ID并且從 Hash 中獲取完推送數(shù)據(jù)之后馬上刪除這兩個集合中對應的數(shù)據(jù)。方案的流程圖大概是這樣:

這里先詳細說明一下用到的 Redis 命令。

Sorted Set相關命令

ZADD 命令 - 將一個或多個成員元素及其分數(shù)值加入到有序集當中。

ZADD KEY SCORE1 VALUE1.. SCOREN VALUEN

ZREVRANGEBYSCORE 命令 - 返回有序集中指定分數(shù)區(qū)間內(nèi)的所有的成員。有序集成員按分數(shù)值遞減(從大到小)的次序排列。

ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]

max:分數(shù)區(qū)間 - 最大分數(shù)。 min:分數(shù)區(qū)間 - 最小分數(shù)。 WITHSCORES:可選參數(shù),是否返回分數(shù)值,指定則會返回得分值。 LIMIT:可選參數(shù),offset和count原理和 MySQLLIMIT offset,size 一致,如果不指定此參數(shù)則返回整個集合的數(shù)據(jù)。 ZREM 命令 - 用于移除有序集中的一個或多個成員,不存在的成員將被忽略。

ZREM key member [member ...]

Hash相關命令 HMSET 命令 - 同時將多個field-value(字段-值)對設置到哈希表中。

HMSET KEY_NAME FIELD1 VALUE1 ...FIELDN VALUEN

HDEL 命令 - 刪除哈希表key中的一個或多個指定字段,不存在的字段將被忽略。

HDEL KEY_NAME FIELD1.. FIELDN

Lua相關 加載 Lua 腳本并且返回腳本的 SHA-1 字符串: SCRIPT LOAD script 。 執(zhí)行已經(jīng)加載的 Lua 腳本: EVALSHA sha1 numkeys key [key ...] arg [arg ...] 。 unpack 函數(shù)可以把 table 類型的參數(shù)轉(zhuǎn)化為可變參數(shù),不過需要注意的是 unpack 函數(shù)必須使用在非變量定義的函數(shù)調(diào)用的最后一個參數(shù),否則會失效,詳細見 Stackoverflow 的提問 table.unpack() only returns the first element 。

PS:如果不熟悉Lua語言,建議系統(tǒng)學習一下,因為想用好Redis,一定離不開Lua。

引入依賴:

dependencyManagement>
  dependencies>
    dependency>
      groupId>org.springframework.boot/groupId>
      artifactId>spring-boot-dependencies/artifactId>
      version>2.1.7.RELEASE/version>
      type>pom/type>
      scope>import/scope>
    /dependency>
  /dependencies>
/dependencyManagement>

dependencies>
  dependency>
    groupId>org.quartz-scheduler/groupId>
    artifactId>quartz/artifactId>
    version>2.3.1/version>
  /dependency>
  dependency>
    groupId>redis.clients/groupId>
    artifactId>jedis/artifactId>
    version>3.1.0/version>
  /dependency>
  dependency>
    groupId>org.springframework.boot/groupId>
    artifactId>spring-boot-starter-web/artifactId>
  /dependency>
  dependency>
    groupId>org.springframework.boot/groupId>
    artifactId>spring-boot-starter-jdbc/artifactId>
  /dependency>  
  dependency>
    groupId>org.springframework/groupId>
    artifactId>spring-context-support/artifactId>
    version>5.1.9.RELEASE/version>
  /dependency> 
  dependency>
    groupId>org.projectlombok/groupId>
    artifactId>lombok/artifactId>
    version>1.18.8/version>
    scope>provided/scope>
  /dependency>
  dependency>
    groupId>com.alibaba/groupId>
    artifactId>fastjson/artifactId>
    version>1.2.59/version>
  /dependency>    
/dependencies>

編寫 Lua 腳本 /lua/enqueue.lua/lua/dequeue.lua

-- /lua/enqueue.lua
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local zset_value = ARGV[1]
local zset_score = ARGV[2]
local hash_field = ARGV[3]
local hash_value = ARGV[4]
redis.call('ZADD', zset_key, zset_score, zset_value)
redis.call('HSET', hash_key, hash_field, hash_value)
return nil

-- /lua/dequeue.lua
-- 參考jesque的部分Lua腳本實現(xiàn)
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- TYPE命令的返回結(jié)果是{'ok':'zset'}這樣子,這里利用next做一輪迭代
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
  if type == 'zset' then
    local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
    if list ~= nil and #list > 0 then
      -- unpack函數(shù)能把table轉(zhuǎn)化為可變參數(shù)
      redis.call('ZREM', zset_key, unpack(list))
      local result = redis.call('HMGET', hash_key, unpack(list))
      redis.call('HDEL', hash_key, unpack(list))
      return result
    end
  end
end
return nil

編寫核心API代碼:

// Jedis提供者
@Component
public class JedisProvider implements InitializingBean {

  private JedisPool jedisPool;

  @Override
  public void afterPropertiesSet() throws Exception {
    jedisPool = new JedisPool();
  }

  public Jedis provide(){
    return jedisPool.getResource();
  }
}

// OrderMessage
@Data
public class OrderMessage {

  private String orderId;
  private BigDecimal amount;
  private Long userId;
}

// 延遲隊列接口
public interface OrderDelayQueue {

  void enqueue(OrderMessage message);

  ListOrderMessage> dequeue(String min, String max, String offset, String limit);

  ListOrderMessage> dequeue();

  String enqueueSha();

  String dequeueSha();
}

// 延遲隊列實現(xiàn)類
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean {

  private static final String MIN_SCORE = "0";
  private static final String OFFSET = "0";
  private static final String LIMIT = "10";
  private static final String ORDER_QUEUE = "ORDER_QUEUE";
  private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE";
  private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
  private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
  private static final AtomicReferenceString> ENQUEUE_LUA_SHA = new AtomicReference>();
  private static final AtomicReferenceString> DEQUEUE_LUA_SHA = new AtomicReference>();
  private static final ListString> KEYS = Lists.newArrayList();

  private final JedisProvider jedisProvider;

  static {
    KEYS.add(ORDER_QUEUE);
    KEYS.add(ORDER_DETAIL_QUEUE);
  }

  @Override
  public void enqueue(OrderMessage message) {
    ListString> args = Lists.newArrayList();
    args.add(message.getOrderId());
    args.add(String.valueOf(System.currentTimeMillis()));
    args.add(message.getOrderId());
    args.add(JSON.toJSONString(message));
    try (Jedis jedis = jedisProvider.provide()) {
      jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args);
    }
  }

  @Override
  public ListOrderMessage> dequeue() {
    // 30分鐘之前
    String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
    return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT);
  }

  @SuppressWarnings("unchecked")
  @Override
  public ListOrderMessage> dequeue(String min, String max, String offset, String limit) {
    ListString> args = new ArrayList>();
    args.add(max);
    args.add(min);
    args.add(offset);
    args.add(limit);
    ListOrderMessage> result = Lists.newArrayList();
    try (Jedis jedis = jedisProvider.provide()) {
      ListString> eval = (ListString>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), KEYS, args);
      if (null != eval) {
        for (String e : eval) {
          result.add(JSON.parseObject(e, OrderMessage.class));
        }
      }
    }
    return result;
  }

  @Override
  public String enqueueSha() {
    return ENQUEUE_LUA_SHA.get();
  }

  @Override
  public String dequeueSha() {
    return DEQUEUE_LUA_SHA.get();
  }

  @Override
  public void afterPropertiesSet() throws Exception {
    // 加載Lua腳本
    loadLuaScript();
  }

  private void loadLuaScript() throws Exception {
    try (Jedis jedis = jedisProvider.provide()) {
      ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
      String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
      String sha = jedis.scriptLoad(luaContent);
      ENQUEUE_LUA_SHA.compareAndSet(null, sha);
      resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
      luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
      sha = jedis.scriptLoad(luaContent);
      DEQUEUE_LUA_SHA.compareAndSet(null, sha);
    }
  }

  public static void main(String[] as) throws Exception {
    DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    JedisProvider jedisProvider = new JedisProvider();
    jedisProvider.afterPropertiesSet();
    RedisOrderDelayQueue queue = new RedisOrderDelayQueue(jedisProvider);
    queue.afterPropertiesSet();
    // 寫入測試數(shù)據(jù)
    OrderMessage message = new OrderMessage();
    message.setAmount(BigDecimal.valueOf(10086));
    message.setOrderId("ORDER_ID_10086");
    message.setUserId(10086L);
    message.setTimestamp(LocalDateTime.now().format(f));
    ListString> args = Lists.newArrayList();
    args.add(message.getOrderId());
    // 測試需要,score設置為30分鐘之前
    args.add(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
    args.add(message.getOrderId());
    args.add(JSON.toJSONString(message));
    try (Jedis jedis = jedisProvider.provide()) {
      jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args);
    }
    ListOrderMessage> dequeue = queue.dequeue();
    System.out.println(dequeue);
  }
}

這里先執(zhí)行一次 main() 方法驗證一下延遲隊列是否生效:

[OrderMessage(orderId=ORDER_ID_10086, amount=10086, userId=10086, timestamp=2019-08-21 08:32:22.885)]

確定延遲隊列的代碼沒有問題,接著編寫一個 QuartzJob 類型的消費者 OrderMessageConsumer

@DisallowConcurrentExecution
@Component
public class OrderMessageConsumer implements Job {

  private static final AtomicInteger COUNTER = new AtomicInteger();
  private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), r -> {
    Thread thread = new Thread(r);
    thread.setDaemon(true);
    thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
    return thread;
  });
  private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);

  @Autowired
  private OrderDelayQueue orderDelayQueue;

  @Override
  public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
    StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    LOGGER.info("訂單消息處理定時任務開始執(zhí)行......");
    ListOrderMessage> messages = orderDelayQueue.dequeue();
    if (!messages.isEmpty()) {
      // 簡單的列表等分放到線程池中執(zhí)行
      ListListOrderMessage>> partition = Lists.partition(messages, 2);
      int size = partition.size();
      final CountDownLatch latch = new CountDownLatch(size);
      for (ListOrderMessage> p : partition) {
        BUSINESS_WORKER_POOL.execute(new ConsumeTask(p, latch));
      }
      try {
        latch.await();
      } catch (InterruptedException ignore) {
        //ignore
      }
    }
    stopWatch.stop();
    LOGGER.info("訂單消息處理定時任務執(zhí)行完畢,耗時:{} ms......", stopWatch.getTotalTimeMillis());
  }

  @RequiredArgsConstructor
  private static class ConsumeTask implements Runnable {

    private final ListOrderMessage> messages;
    private final CountDownLatch latch;

    @Override
    public void run() {
      try {
        // 實際上這里應該單條捕獲異常
        for (OrderMessage message : messages) {
          LOGGER.info("處理訂單信息,內(nèi)容:{}", message);
        }
      } finally {
        latch.countDown();
      }
    }
  }
}

上面的消費者設計的時候需要有以下考量:

  • 使用 @DisallowConcurrentExecution 注解不允許 Job 并發(fā)執(zhí)行,其實多個 Job 并發(fā)執(zhí)行意義不大,因為我們采用的是短間隔的輪詢,而 Redis 是單線程處理命令,在客戶端做多線程其實效果不佳。
  • 線程池 BUSINESS_WORKER_POOL 的線程容量或者隊列應該綜合 LIMIT 值、等分訂單信息列表中使用的 size 值以及 ConsumeTask 里面具體的執(zhí)行時間進行考慮,這里只是為了方便使用了固定容量的線程池。
  • ConsumeTask 中應該對每一條訂單信息的處理單獨捕獲異常和吞并異常,或者把處理單個訂單信息的邏輯封裝成一個不拋出異常的方法。

其他 Quartz 相關的代碼:

// Quartz配置類
@Configuration
public class QuartzAutoConfiguration {

  @Bean
  public SchedulerFactoryBean schedulerFactoryBean(QuartzAutowiredJobFactory quartzAutowiredJobFactory) {
    SchedulerFactoryBean factory = new SchedulerFactoryBean();
    factory.setAutoStartup(true);
    factory.setJobFactory(quartzAutowiredJobFactory);
    return factory;
  }

  @Bean
  public QuartzAutowiredJobFactory quartzAutowiredJobFactory() {
    return new QuartzAutowiredJobFactory();
  }

  public static class QuartzAutowiredJobFactory extends AdaptableJobFactory implements BeanFactoryAware {

    private AutowireCapableBeanFactory autowireCapableBeanFactory;

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
      this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory;
    }

    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
      Object jobInstance = super.createJobInstance(bundle);
      // 這里利用AutowireCapableBeanFactory從新建的Job實例做一次自動裝配,得到一個原型(prototype)的JobBean實例
      autowireCapableBeanFactory.autowireBean(jobInstance);
      return jobInstance;
    }
  }
}

這里暫時使用了內(nèi)存態(tài)的 RAMJobStore 去存放任務和觸發(fā)器的相關信息,如果在生產(chǎn)環(huán)境最好替換成基于 MySQL 也就是 JobStoreTX 進行集群化,最后是啟動函數(shù)和 CommandLineRunner 的實現(xiàn):

@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, TransactionAutoConfiguration.class})
public class Application implements CommandLineRunner {

  @Autowired
  private Scheduler scheduler;

  @Autowired
  private JedisProvider jedisProvider;

  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }

  @Override
  public void run(String... args) throws Exception {
    // 準備一些測試數(shù)據(jù)
    prepareOrderMessageData();
    JobDetail job = JobBuilder.newJob(OrderMessageConsumer.class)
        .withIdentity("OrderMessageConsumer", "DelayTask")
        .build();
    // 觸發(fā)器5秒觸發(fā)一次
    Trigger trigger = TriggerBuilder.newTrigger()
        .withIdentity("OrderMessageConsumerTrigger", "DelayTask")
        .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever())
        .build();
    scheduler.scheduleJob(job, trigger);
  }

  private void prepareOrderMessageData() throws Exception {
    DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    try (Jedis jedis = jedisProvider.provide()) {
      ListOrderMessage> messages = Lists.newArrayList();
      for (int i = 0; i  100; i++) {
        OrderMessage message = new OrderMessage();
        message.setAmount(BigDecimal.valueOf(i));
        message.setOrderId("ORDER_ID_" + i);
        message.setUserId((long) i);
        message.setTimestamp(LocalDateTime.now().format(f));
        messages.add(message);
      }
      // 這里暫時不使用Lua
      MapString, Double> map = Maps.newHashMap();
      MapString, String> hash = Maps.newHashMap();
      for (OrderMessage message : messages) {
        // 故意把score設計成30分鐘前
        map.put(message.getOrderId(), Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)));
        hash.put(message.getOrderId(), JSON.toJSONString(message));
      }
      jedis.zadd("ORDER_QUEUE", map);
      jedis.hmset("ORDER_DETAIL_QUEUE", hash);
    }
  }
}

輸出結(jié)果如下:

2019-08-21 22:45:59.518  INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer      : 訂單消息處理定時任務開始執(zhí)行......
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer      : 處理訂單信息,內(nèi)容:OrderMessage(orderId=ORDER_ID_91, amount=91, userId=91, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 處理訂單信息,內(nèi)容:OrderMessage(orderId=ORDER_ID_95, amount=95, userId=95, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 處理訂單信息,內(nèi)容:OrderMessage(orderId=ORDER_ID_97, amount=97, userId=97, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer      : 處理訂單信息,內(nèi)容:OrderMessage(orderId=ORDER_ID_99, amount=99, userId=99, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer      : 處理訂單信息,內(nèi)容:OrderMessage(orderId=ORDER_ID_93, amount=93, userId=93, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 處理訂單信息,內(nèi)容:OrderMessage(orderId=ORDER_ID_94, amount=94, userId=94, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 處理訂單信息,內(nèi)容:OrderMessage(orderId=ORDER_ID_96, amount=96, userId=96, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer      : 處理訂單信息,內(nèi)容:OrderMessage(orderId=ORDER_ID_92, amount=92, userId=92, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer      : 處理訂單信息,內(nèi)容:OrderMessage(orderId=ORDER_ID_98, amount=98, userId=98, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer      : 處理訂單信息,內(nèi)容:OrderMessage(orderId=ORDER_ID_90, amount=90, userId=90, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.540  INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer      : 訂單消息處理定時任務執(zhí)行完畢,耗時:22 ms......
2019-08-21 22:46:04.515  INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer      : 訂單消息處理定時任務開始執(zhí)行......
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer      : 處理訂單信息,內(nèi)容:OrderMessage(orderId=ORDER_ID_89, amount=89, userId=89, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer      : 處理訂單信息,內(nèi)容:OrderMessage(orderId=ORDER_ID_87, amount=87, userId=87, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer      : 處理訂單信息,內(nèi)容:OrderMessage(orderId=ORDER_ID_85, amount=85, userId=85, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer      : 處理訂單信息,內(nèi)容:OrderMessage(orderId=ORDER_ID_88, amount=88, userId=88, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 處理訂單信息,內(nèi)容:OrderMessage(orderId=ORDER_ID_83, amount=83, userId=83, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 處理訂單信息,內(nèi)容:OrderMessage(orderId=ORDER_ID_81, amount=81, userId=81, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer      : 處理訂單信息,內(nèi)容:OrderMessage(orderId=ORDER_ID_86, amount=86, userId=86, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 處理訂單信息,內(nèi)容:OrderMessage(orderId=ORDER_ID_82, amount=82, userId=82, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer      : 處理訂單信息,內(nèi)容:OrderMessage(orderId=ORDER_ID_84, amount=84, userId=84, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 處理訂單信息,內(nèi)容:OrderMessage(orderId=ORDER_ID_80, amount=80, userId=80, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer      : 訂單消息處理定時任務執(zhí)行完畢,耗時:1 ms......
......

首次執(zhí)行的時候涉及到一些組件的初始化,會比較慢,后面看到由于我們只是簡單打印訂單信息,所以定時任務執(zhí)行比較快。如果在不調(diào)整當前架構的情況下,生產(chǎn)中需要注意:

  • 切換 JobStoreJDBC 模式, Quartz 官方有完整教程,或者看筆者之前翻譯的 Quartz 文檔。
  • 需要監(jiān)控或者收集任務的執(zhí)行狀態(tài),添加預警等等。

這里其實有一個性能隱患,命令 ZREVRANGEBYSCORE 的時間復雜度可以視為為 O(N) , N 是集合的元素個數(shù),由于這里把所有的訂單信息都放進了同一個 Sorted Set ( ORDER_QUEUE )中,所以在一直有新增數(shù)據(jù)的時候, dequeue 腳本的時間復雜度一直比較高,后續(xù)訂單量升高之后會此處一定會成為性能瓶頸,后面會給出解決的方案。

小結(jié)

這篇文章主要從一個實際生產(chǎn)案例的仿真例子入手,分析了當前延時任務的一些實現(xiàn)方案,還基于 RedisQuartz 給出了一個完整的示例。當前的示例只是處于可運行的狀態(tài),有些問題尚未解決。下一篇文章會著眼于解決兩個方面的問題:

  1. 分片。
  2. 監(jiān)控。

還有一點, 架構是基于業(yè)務形態(tài)演進出來的,很多東西需要結(jié)合場景進行方案設計和改進,思路僅供參考,切勿照搬代碼 。

以上所述是小編給大家介紹的使用Redis實現(xiàn)延時任務的解決方案,非常不錯,具有一定的參考借鑒價值,需要的朋友參考下吧!

您可能感興趣的文章:
  • golang實現(xiàn)redis的延時消息隊列功能示例
  • 利用Redis實現(xiàn)延時處理的方法實例
  • redis實現(xiàn)延時隊列的兩種方式(小結(jié))

標簽:伊春 甘南 泰州 南寧 河源 定州 畢節(jié) 拉薩

巨人網(wǎng)絡通訊聲明:本文標題《使用Redis實現(xiàn)延時任務的解決方案》,本文關鍵詞  使用,Redis,實現(xiàn),延時,任務,;如發(fā)現(xiàn)本文內(nèi)容存在版權問題,煩請?zhí)峁┫嚓P信息告之我們,我們將及時溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡,涉及言論、版權與本站無關。
  • 相關文章
  • 下面列出與本文章《使用Redis實現(xiàn)延時任務的解決方案》相關的同類信息!
  • 本頁收集關于使用Redis實現(xiàn)延時任務的解決方案的相關信息資訊供網(wǎng)民參考!
  • 推薦文章