redis作延迟队列简单实现
使用场景:
1)支付倒计时:超过多少时间未支付,直接取消订单
2)系统默认评论:订单签收后,执行系统默认好评
3)配送超时,推送短信提醒
......
第一种:使用springboot集成redisson实现延迟队列
原博文链接:SpringBoot集成Redisson实现延迟队列_michael's blog-CSDN博客
依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.10.7</version></dependency>
1)redisson配置类
/**
 * Spring configuration that builds the single-server {@link RedissonClient}.
 *
 * <p>Connection settings are read from the {@code spring.redis.*} properties.
 */
@Configuration
public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private int port;

    @Value("${spring.redis.database}")
    private int database;

    /** Optional Redis password; an empty default means no password is sent. */
    @Value("${spring.redis.password:}")
    private String password;

    /**
     * Creates the Redisson client for a single Redis node.
     *
     * @return a client configured for {@code redis://host:port} and the configured database index
     */
    @Bean(destroyMethod = "shutdown")
    public RedissonClient redissonClient() {
        // Treat a blank property as "no password" so unauthenticated setups keep working.
        String effectivePassword = (password == null || password.isEmpty()) ? null : password;

        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://" + host + ":" + port)
                .setDatabase(database)
                .setPassword(effectivePassword);
        return Redisson.create(config);
    }
}
2)类型枚举
RedisDelayQueueEnum:相当于一个topic
/**
 * Delay-queue topics. Each constant carries the Redis key of one delay queue
 * and the id of the Spring bean that handles values taken from it.
 *
 * <p>Fields are final and the accessors are written out explicitly, so the
 * enum is immutable and needs no annotation processing.
 */
public enum RedisDelayQueueEnum {

    ORDER_PAYMENT_TIMEOUT("ORDER_PAYMENT_TIMEOUT", "订单支付超时,自动取消订单", "orderPaymentTimeout"),
    ORDER_TIMEOUT_NOT_EVALUATED("ORDER_TIMEOUT_NOT_EVALUATED", "订单超时未评价,系统默认好评", "orderTimeoutNotEvaluated");

    /** Redis key of the delay queue. */
    private final String code;

    /** Human-readable (Chinese) description. */
    private final String name;

    /** Id of the Spring bean implementing the business logic for this queue. */
    private final String beanId;

    RedisDelayQueueEnum(String code, String name, String beanId) {
        this.code = code;
        this.name = name;
        this.beanId = beanId;
    }

    /** @return the Redis key of the delay queue */
    public String getCode() {
        return code;
    }

    /** @return the human-readable description */
    public String getName() {
        return name;
    }

    /** @return the id of the handler bean, resolvable from the Spring context */
    public String getBeanId() {
        return beanId;
    }
}
3)延迟队列操作类
@Slf4j
@Component
public class RedisDelayQueueUtil {@Autowiredprivate RedissonClient redissonClient;/*** 添加延迟队列* @param value 队列值* @param delay 延迟时间* @param timeUnit 时间单位* @param queueCode 队列键* @param <T>*/public <T> void addDelayQueue(T value, long delay, TimeUnit timeUnit, String queueCode){try {RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);delayedQueue.offer(value, delay, timeUnit);log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}", queueCode, value, timeUnit.toSeconds(delay) + "秒");} catch (Exception e) {log.error("(添加延时队列失败) {}", e.getMessage());throw new RuntimeException("(添加延时队列失败)");}}/*** 获取延迟队列* @param queueCode* @param <T>* @return* @throws InterruptedException*/public <T> T getDelayQueue(String queueCode) throws InterruptedException {RBlockingDeque<Map> blockingDeque = redissonClient.getBlockingDeque(queueCode);T value = (T) blockingDeque.take();return value;}
}
4)延迟队列执行器
/**
 * Callback that processes one value taken from a delay queue.
 *
 * @param <T> type of the value handed to the handler
 */
@FunctionalInterface
public interface RedisDelayQueueHandle<T> {

    /**
     * Processes a single delayed value.
     *
     * @param t the value taken from the queue
     */
    void execute(T t);
}
5)订单支付超时处理类
@Component
@Slf4j
public class OrderPaymentTimeout implements RedisDelayQueueHandle<Map> {@Overridepublic void execute(Map map) {log.info("(收到订单支付超时延迟消息) {}", map);// TODO 订单支付超时,自动取消订单处理业务...}
}
6)订单超时未评价处理类
@Component
@Slf4j
public class OrderTimeoutNotEvaluated implements RedisDelayQueueHandle<Map> {@Overridepublic void execute(Map map) {log.info("(收到订单超时未评价延迟消息) {}", map);// TODO 订单超时未评价,系统默认好评处理业务...}
}
7)启动延迟队列消费者
@Slf4j
@Component
public class RedisDelayQueueRunner implements CommandLineRunner {@Autowiredprivate RedisDelayQueueUtil redisDelayQueueUtil;@Overridepublic void run(String... args) {new Thread(() -> {while (true){try {RedisDelayQueueEnum[] queueEnums = RedisDelayQueueEnum.values();for (RedisDelayQueueEnum queueEnum : queueEnums) {Object value = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode());if (value != null) {RedisDelayQueueHandle redisDelayQueueHandle = SpringUtil.getBean(queueEnum.getBeanId());redisDelayQueueHandle.execute(value);}}} catch (InterruptedException e) {log.error("(Redis延迟队列异常中断) {}", e.getMessage());}}}).start();log.info("(Redis延迟队列启动成功)");}
}
8)测试延迟队列
@RestController
public class RedisDelayQueueController {@Autowiredprivate RedisDelayQueueUtil redisDelayQueueUtil;@PostMapping("/addQueue")public void addQueue() {Map<String, String> map1 = new HashMap<>();map1.put("orderId", "100");map1.put("remark", "订单支付超时,自动取消订单");Map<String, String> map2 = new HashMap<>();map2.put("orderId", "200");map2.put("remark", "订单超时未评价,系统默认好评");// 添加订单支付超时,自动取消订单延迟队列。为了测试效果,延迟10秒钟redisDelayQueueUtil.addDelayQueue(map1, 10, TimeUnit.SECONDS, RedisDelayQueueEnum.ORDER_PAYMENT_TIMEOUT.getCode());// 订单超时未评价,系统默认好评。为了测试效果,延迟20秒钟redisDelayQueueUtil.addDelayQueue(map2, 20, TimeUnit.SECONDS, RedisDelayQueueEnum.ORDER_TIMEOUT_NOT_EVALUATED.getCode());}}
第二种:使用redis实现延迟队列
原博文:【DB系列】借助redis来实现延时队列(应用篇) | 一灰灰Blog (hhui.top)
依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency><!-- 下面这里两个非必须,主要是后面的实现演示使用到了 -->
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId>
</dependency>
<dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId>
</dependency>
配置:
server:
  port: 8081
spring:
  redis:
    port: 6379
    host: 127.0.0.1
    database: 1
1)延迟队列操作类
@Component
public class RedisDelayListWrapper implements ApplicationContextAware {/*** 定义删除成功的状态*/private static final Long DELETE_SUCCESS = 1L;/*** 存放topic值*/private Set<String> topic = new CopyOnWriteArraySet<>();@Autowiredprivate StringRedisTemplate redisTemplate;private ApplicationContext applicationContext;/*** 命中数据* @param key* @return*/public String fetchOne(String key) {Set<String> sets = redisTemplate.opsForZSet().rangeByScore(key, 0, System.currentTimeMillis(), 0, 3);if (CollectionUtils.isEmpty(sets)) {return null;}for (String set : sets) {if (Objects.equals(DELETE_SUCCESS, redisTemplate.opsForZSet().remove(key, set))) {//删除成功,表示在众多实例中拿到该资源return set;}}return null;}/*** 将任务添加进延迟队列* @param key* @param value* @param delayTime*/public void publish(String key, Object value, long delayTime) {topic.add(key);String strVal = value instanceof String ? (String) value : JSONObject.toJSONString(value);redisTemplate.opsForZSet().add(key, strVal, System.currentTimeMillis() + delayTime);}/*** 将任务作为一个事件发布到applicationContext容器中,使用时开启时间监听即可*/@Scheduled(fixedRate = 10_000)public void schedule() {for (String specialTopic : topic) {String cell = fetchOne(specialTopic);if (!ObjectUtils.isEmpty(cell)) {applicationContext.publishEvent(new DelayMsg(this,cell,specialTopic));}}}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}/*** 封装msg和topic*/@ToStringpublic static class DelayMsg extends ApplicationEvent{@Getterprivate String msg;@Getterprivate String topic;public DelayMsg(Object source,String msg,String topic){super(source);this.msg = msg;this.topic = topic;}}
}
2)自定义监听消费注解
/**
 * Marks a method as a delay-queue consumer for one topic.
 *
 * <p>The annotation is meta-annotated with {@link EventListener}, so annotated
 * methods are registered as Spring event listeners.
 */
@Documented
@EventListener
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
public @interface Consumer {

    /** @return the topic this consumer handles */
    String topic();
}
3)aop拦截自定义注解
@Aspect
@Component
public class ConsumerAspect {@Around("@annotation(consumer)")public Object around(ProceedingJoinPoint joinPoint,Consumer consumer) throws Throwable {Object[] args = joinPoint.getArgs();boolean check = false;for (Object arg : args) {if (arg instanceof RedisDelayListWrapper.DelayMsg){check = consumer.topic().equals(((RedisDelayListWrapper.DelayMsg) arg).getTopic());}}if (!check){//不满足条件,直接忽略return null;}//topic匹配成功,执行return joinPoint.proceed();}
}
4)延迟队列测试接口
/**
 * Test endpoint and sample consumers for the ZSET-based delay queue.
 */
@RestController
public class DelayQueueController {

    private static final String TEST_DELAY_QUEUE = "test";
    private static final String DEMO_DELAY_QUEUE = "demo";

    @Autowired
    private RedisDelayListWrapper redisDelayListWrapper;

    private Random random = new Random();

    /**
     * Publishes a message to a randomly chosen queue ("test" or "demo").
     *
     * @param msg       message text; embedded as {@code queue#msg#dueTimestamp}
     * @param delayTime delay in milliseconds; 10 000 when omitted
     * @return the literal {@code "success!"}
     */
    @GetMapping(path = "publish")
    public String publish(String msg, Long delayTime) {
        long effectiveDelay = (delayTime != null) ? delayTime : 10_000L;

        String queue;
        if (random.nextBoolean()) {
            queue = TEST_DELAY_QUEUE;
        } else {
            queue = DEMO_DELAY_QUEUE;
        }

        long dueAt = System.currentTimeMillis() + effectiveDelay;
        String payload = queue + "#" + msg + "#" + dueAt;
        redisDelayListWrapper.publish(queue, payload, effectiveDelay);

        System.out.println("延时: " + effectiveDelay + "ms后消费: " + payload + " now:" + LocalDateTime.now());
        return "success!";
    }

    /** Prints messages delivered for the "test" topic. */
    @Consumer(topic = TEST_DELAY_QUEUE)
    public void consumer(RedisDelayListWrapper.DelayMsg delayMsg) {
        long receivedAt = System.currentTimeMillis();
        System.out.println("TEST消费延时消息: " + delayMsg + " at:" + receivedAt);
    }

    /** Prints messages delivered for the "demo" topic. */
    @Consumer(topic = DEMO_DELAY_QUEUE)
    public void consumerDemo(RedisDelayListWrapper.DelayMsg delayMsg) {
        long receivedAt = System.currentTimeMillis();
        System.out.println("DEMO消费延时消息: " + delayMsg + " at:" + receivedAt);
    }
}
5)启动类开启定时任务注解
/**
 * Application entry point; scheduling is enabled here.
 */
@EnableScheduling
@SpringBootApplication
public class UserApplication {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class, args);
    }
}
仅做学习参考
redis作延迟队列简单实现
使用场景:
1)支付倒计时:超过多少时间未支付,直接取消订单
2)系统默认评论:订单签收后,执行系统默认好评
3)配送超时,推送短信提醒
......
第一种:使用springboot集成redisson实现延迟队列
原博文链接:SpringBoot集成Redisson实现延迟队列_michael's blog-CSDN博客
依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.10.7</version></dependency>
1)redisson配置类
/**
 * Spring configuration that builds the single-server {@link RedissonClient}.
 *
 * <p>Connection settings are read from the {@code spring.redis.*} properties.
 */
@Configuration
public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private int port;

    @Value("${spring.redis.database}")
    private int database;

    /** Optional Redis password; an empty default means no password is sent. */
    @Value("${spring.redis.password:}")
    private String password;

    /**
     * Creates the Redisson client for a single Redis node.
     *
     * @return a client configured for {@code redis://host:port} and the configured database index
     */
    @Bean(destroyMethod = "shutdown")
    public RedissonClient redissonClient() {
        // Treat a blank property as "no password" so unauthenticated setups keep working.
        String effectivePassword = (password == null || password.isEmpty()) ? null : password;

        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://" + host + ":" + port)
                .setDatabase(database)
                .setPassword(effectivePassword);
        return Redisson.create(config);
    }
}
2)类型枚举
RedisDelayQueueEnum:相当于一个topic
/**
 * Delay-queue topics. Each constant carries the Redis key of one delay queue
 * and the id of the Spring bean that handles values taken from it.
 *
 * <p>Fields are final and the accessors are written out explicitly, so the
 * enum is immutable and needs no annotation processing.
 */
public enum RedisDelayQueueEnum {

    ORDER_PAYMENT_TIMEOUT("ORDER_PAYMENT_TIMEOUT", "订单支付超时,自动取消订单", "orderPaymentTimeout"),
    ORDER_TIMEOUT_NOT_EVALUATED("ORDER_TIMEOUT_NOT_EVALUATED", "订单超时未评价,系统默认好评", "orderTimeoutNotEvaluated");

    /** Redis key of the delay queue. */
    private final String code;

    /** Human-readable (Chinese) description. */
    private final String name;

    /** Id of the Spring bean implementing the business logic for this queue. */
    private final String beanId;

    RedisDelayQueueEnum(String code, String name, String beanId) {
        this.code = code;
        this.name = name;
        this.beanId = beanId;
    }

    /** @return the Redis key of the delay queue */
    public String getCode() {
        return code;
    }

    /** @return the human-readable description */
    public String getName() {
        return name;
    }

    /** @return the id of the handler bean, resolvable from the Spring context */
    public String getBeanId() {
        return beanId;
    }
}
3)延迟队列操作类
@Slf4j
@Component
public class RedisDelayQueueUtil {@Autowiredprivate RedissonClient redissonClient;/*** 添加延迟队列* @param value 队列值* @param delay 延迟时间* @param timeUnit 时间单位* @param queueCode 队列键* @param <T>*/public <T> void addDelayQueue(T value, long delay, TimeUnit timeUnit, String queueCode){try {RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);delayedQueue.offer(value, delay, timeUnit);log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}", queueCode, value, timeUnit.toSeconds(delay) + "秒");} catch (Exception e) {log.error("(添加延时队列失败) {}", e.getMessage());throw new RuntimeException("(添加延时队列失败)");}}/*** 获取延迟队列* @param queueCode* @param <T>* @return* @throws InterruptedException*/public <T> T getDelayQueue(String queueCode) throws InterruptedException {RBlockingDeque<Map> blockingDeque = redissonClient.getBlockingDeque(queueCode);T value = (T) blockingDeque.take();return value;}
}
4)延迟队列执行器
/**
 * Callback that processes one value taken from a delay queue.
 *
 * @param <T> type of the value handed to the handler
 */
@FunctionalInterface
public interface RedisDelayQueueHandle<T> {

    /**
     * Processes a single delayed value.
     *
     * @param t the value taken from the queue
     */
    void execute(T t);
}
5)订单支付超时处理类
@Component
@Slf4j
public class OrderPaymentTimeout implements RedisDelayQueueHandle<Map> {@Overridepublic void execute(Map map) {log.info("(收到订单支付超时延迟消息) {}", map);// TODO 订单支付超时,自动取消订单处理业务...}
}
6)订单超时未评价处理类
@Component
@Slf4j
public class OrderTimeoutNotEvaluated implements RedisDelayQueueHandle<Map> {@Overridepublic void execute(Map map) {log.info("(收到订单超时未评价延迟消息) {}", map);// TODO 订单超时未评价,系统默认好评处理业务...}
}
7)启动延迟队列消费者
@Slf4j
@Component
public class RedisDelayQueueRunner implements CommandLineRunner {@Autowiredprivate RedisDelayQueueUtil redisDelayQueueUtil;@Overridepublic void run(String... args) {new Thread(() -> {while (true){try {RedisDelayQueueEnum[] queueEnums = RedisDelayQueueEnum.values();for (RedisDelayQueueEnum queueEnum : queueEnums) {Object value = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode());if (value != null) {RedisDelayQueueHandle redisDelayQueueHandle = SpringUtil.getBean(queueEnum.getBeanId());redisDelayQueueHandle.execute(value);}}} catch (InterruptedException e) {log.error("(Redis延迟队列异常中断) {}", e.getMessage());}}}).start();log.info("(Redis延迟队列启动成功)");}
}
8)测试延迟队列
@RestController
public class RedisDelayQueueController {@Autowiredprivate RedisDelayQueueUtil redisDelayQueueUtil;@PostMapping("/addQueue")public void addQueue() {Map<String, String> map1 = new HashMap<>();map1.put("orderId", "100");map1.put("remark", "订单支付超时,自动取消订单");Map<String, String> map2 = new HashMap<>();map2.put("orderId", "200");map2.put("remark", "订单超时未评价,系统默认好评");// 添加订单支付超时,自动取消订单延迟队列。为了测试效果,延迟10秒钟redisDelayQueueUtil.addDelayQueue(map1, 10, TimeUnit.SECONDS, RedisDelayQueueEnum.ORDER_PAYMENT_TIMEOUT.getCode());// 订单超时未评价,系统默认好评。为了测试效果,延迟20秒钟redisDelayQueueUtil.addDelayQueue(map2, 20, TimeUnit.SECONDS, RedisDelayQueueEnum.ORDER_TIMEOUT_NOT_EVALUATED.getCode());}}
第二种:使用redis实现延迟队列
原博文:【DB系列】借助redis来实现延时队列(应用篇) | 一灰灰Blog (hhui.top)
依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency><!-- 下面这里两个非必须,主要是后面的实现演示使用到了 -->
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId>
</dependency>
<dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId>
</dependency>
配置:
server:
  port: 8081
spring:
  redis:
    port: 6379
    host: 127.0.0.1
    database: 1
1)延迟队列操作类
@Component
public class RedisDelayListWrapper implements ApplicationContextAware {/*** 定义删除成功的状态*/private static final Long DELETE_SUCCESS = 1L;/*** 存放topic值*/private Set<String> topic = new CopyOnWriteArraySet<>();@Autowiredprivate StringRedisTemplate redisTemplate;private ApplicationContext applicationContext;/*** 命中数据* @param key* @return*/public String fetchOne(String key) {Set<String> sets = redisTemplate.opsForZSet().rangeByScore(key, 0, System.currentTimeMillis(), 0, 3);if (CollectionUtils.isEmpty(sets)) {return null;}for (String set : sets) {if (Objects.equals(DELETE_SUCCESS, redisTemplate.opsForZSet().remove(key, set))) {//删除成功,表示在众多实例中拿到该资源return set;}}return null;}/*** 将任务添加进延迟队列* @param key* @param value* @param delayTime*/public void publish(String key, Object value, long delayTime) {topic.add(key);String strVal = value instanceof String ? (String) value : JSONObject.toJSONString(value);redisTemplate.opsForZSet().add(key, strVal, System.currentTimeMillis() + delayTime);}/*** 将任务作为一个事件发布到applicationContext容器中,使用时开启时间监听即可*/@Scheduled(fixedRate = 10_000)public void schedule() {for (String specialTopic : topic) {String cell = fetchOne(specialTopic);if (!ObjectUtils.isEmpty(cell)) {applicationContext.publishEvent(new DelayMsg(this,cell,specialTopic));}}}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}/*** 封装msg和topic*/@ToStringpublic static class DelayMsg extends ApplicationEvent{@Getterprivate String msg;@Getterprivate String topic;public DelayMsg(Object source,String msg,String topic){super(source);this.msg = msg;this.topic = topic;}}
}
2)自定义监听消费注解
/**
 * Marks a method as a delay-queue consumer for one topic.
 *
 * <p>The annotation is meta-annotated with {@link EventListener}, so annotated
 * methods are registered as Spring event listeners.
 */
@Documented
@EventListener
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
public @interface Consumer {

    /** @return the topic this consumer handles */
    String topic();
}
3)aop拦截自定义注解
@Aspect
@Component
public class ConsumerAspect {@Around("@annotation(consumer)")public Object around(ProceedingJoinPoint joinPoint,Consumer consumer) throws Throwable {Object[] args = joinPoint.getArgs();boolean check = false;for (Object arg : args) {if (arg instanceof RedisDelayListWrapper.DelayMsg){check = consumer.topic().equals(((RedisDelayListWrapper.DelayMsg) arg).getTopic());}}if (!check){//不满足条件,直接忽略return null;}//topic匹配成功,执行return joinPoint.proceed();}
}
4)延迟队列测试接口
/**
 * Test endpoint and sample consumers for the ZSET-based delay queue.
 */
@RestController
public class DelayQueueController {

    private static final String TEST_DELAY_QUEUE = "test";
    private static final String DEMO_DELAY_QUEUE = "demo";

    @Autowired
    private RedisDelayListWrapper redisDelayListWrapper;

    private Random random = new Random();

    /**
     * Publishes a message to a randomly chosen queue ("test" or "demo").
     *
     * @param msg       message text; embedded as {@code queue#msg#dueTimestamp}
     * @param delayTime delay in milliseconds; 10 000 when omitted
     * @return the literal {@code "success!"}
     */
    @GetMapping(path = "publish")
    public String publish(String msg, Long delayTime) {
        long effectiveDelay = (delayTime != null) ? delayTime : 10_000L;

        String queue;
        if (random.nextBoolean()) {
            queue = TEST_DELAY_QUEUE;
        } else {
            queue = DEMO_DELAY_QUEUE;
        }

        long dueAt = System.currentTimeMillis() + effectiveDelay;
        String payload = queue + "#" + msg + "#" + dueAt;
        redisDelayListWrapper.publish(queue, payload, effectiveDelay);

        System.out.println("延时: " + effectiveDelay + "ms后消费: " + payload + " now:" + LocalDateTime.now());
        return "success!";
    }

    /** Prints messages delivered for the "test" topic. */
    @Consumer(topic = TEST_DELAY_QUEUE)
    public void consumer(RedisDelayListWrapper.DelayMsg delayMsg) {
        long receivedAt = System.currentTimeMillis();
        System.out.println("TEST消费延时消息: " + delayMsg + " at:" + receivedAt);
    }

    /** Prints messages delivered for the "demo" topic. */
    @Consumer(topic = DEMO_DELAY_QUEUE)
    public void consumerDemo(RedisDelayListWrapper.DelayMsg delayMsg) {
        long receivedAt = System.currentTimeMillis();
        System.out.println("DEMO消费延时消息: " + delayMsg + " at:" + receivedAt);
    }
}
5)启动类开启定时任务注解
/**
 * Application entry point; scheduling is enabled here.
 */
@EnableScheduling
@SpringBootApplication
public class UserApplication {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class, args);
    }
}
仅做学习参考