Redis实现消息队列及延迟队列
一、介绍
在选择消息中间件的问题上,我们有很多解决方案,具体选择哪一种还是要根据实际的情况来进行确认。
如果直接有成熟的第三方消息中间件,能用就直接用,如rabbitMq
、kafka
等。
再如果,推送的消息比较简单,又恰好有个redis
,那么就选择redis
吧。
下面,将进行介绍,如果使用redis
作为消息队列,我们该如何编写这段程序。
二、消息队列
前置工作,本次使用的工程框架直接是springBoot
,其他maven
依赖就不贴出来了,主要是要有这个redis
的依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
|
有了依赖,记得在application.yml
配置文件中加入对应redis
的配置信息
1 2 3 4 5
| spring: redis: database: 0 host: localhost port: 6379
|
还有一件事,redisTemplate
的这个bean
我们要进行润色一下,虽然用自带的也行,但作为一个强迫症,我还是希望我写入的key
和redis
中的key
一致
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package com.banmoon.test.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration public class RedisConfig {
@Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> redisTemplate = new RedisTemplate(); redisTemplate.setConnectionFactory(factory); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper mapper = new ObjectMapper(); mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(mapper); StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); redisTemplate.setKeySerializer(stringRedisSerializer); redisTemplate.setHashKeySerializer(stringRedisSerializer); redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); redisTemplate.afterPropertiesSet(); return redisTemplate; }
}
|
好的准备工作完成,先来看生产者
1)生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package com.banmoon.test.queue.producer;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component;
@Component public class RedisTestProducer {
public static final String REDIS_TEST_KEY = "test:queue";
@Autowired private RedisTemplate redisTemplate;
public long push (String... params) { Long l = redisTemplate.opsForList().rightPushAll(REDIS_TEST_KEY, params); return l; }
}
|
生产者很简单,就是向redis
的list
中推送数据
主要在于消费者,该如何获取到其中的消息
2)消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package com.banmoon.test.queue.consumer;
import cn.hutool.core.util.StrUtil; import com.banmoon.test.queue.producer.RedisTestProducer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import java.util.concurrent.TimeUnit;
@Slf4j @Component public class RedisTestConsumer {
@Autowired private RedisTemplate redisTemplate;
@PostConstruct public void pop() { new Thread(() -> { while (true) { try { String params = (String) redisTemplate.opsForList().leftPop(RedisTestProducer.REDIS_TEST_KEY, 10, TimeUnit.SECONDS); if (StrUtil.isNotBlank(params)) log.info("模拟消费消息:{}", params); TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { } } }, RedisTestProducer.REDIS_TEST_KEY).start(); }
}
|
上述就是消费者,其中注意几点
-
这里服务启动时,用到了bean
初始化的一个方法,大家也可以使用静态代码块,只要让这个消费线程启动就行
-
线程启动,切记不要让异常导致了线程的退出。因为这样就没有消费者了,要时刻保证消费者的在线
-
在取出队首的消息时,用到了阻塞机制。当没有获取到消息,该线程会进行阻塞,直到有消息入队或者阻塞超时,才会返回消息。避免死循环带来了cpu
高载荷
3)测试
启动该springBoot
项目,同时执行下面这段测试代码,调用三次生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| package com.banmoon.test;
import com.banmoon.test.queue.producer.RedisTestProducer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest class ServiceTest {
@Autowired private RedisTestProducer redisTestProducer;
@Test void insertTest() { redisTestProducer.push("a", "b", "c"); }
}
|
查看springBoot
项目的控制台,消费者有进行消费
三、延迟队列
延迟队列的应用场景还是比较多见的,比如
-
用户下单后,此订单超30分钟后取消
-
用户订阅,指定时间推送订阅消息事件
很多类似的业务场景,我们不再依赖定时,使用消息中间件就可以完成这类功能。
在redis
实现延迟队列之前,我有必要说一下set
和zset
,主要是这个zset
set
大家都很熟悉,与list
不同,set
是无序且内部元素不重复。
那么zset
呢,它结合了set
和list
的特点
zset
中的元素都会关联一个分数score
,内部将通过这个score
对集合元素进行的排序。
虽然zset
集合中元素不会重复,但score
可以重复。如果有两个score
相同的元素,将按照元素的字典序进行排序。
1)生产者
上面描述了这么多,我们该如何使用,先看生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| package com.banmoon.test.queue.producer;
import cn.hutool.core.date.DateField; import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j @Component public class RedisTestDelayProducer {
public static final String REDIS_DELAY_TEST_KEY = "test:delay:queue";
@Autowired private RedisTemplate redisTemplate;
public Boolean push (String params, int offset, DateField dateField) { long score = DateUtil.offset(new Date(), dateField, offset).getTime(); Boolean b = redisTemplate.opsForZSet().addIfAbsent(REDIS_DELAY_TEST_KEY, params, score); log.info("生产消息:{},推送是否成功:{}", params, b); return b; }
}
|
可以看到,这边使用将消费时间点的时间戳,作为了score
,生产的消息
2)消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| package com.banmoon.test.queue.consumer;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.convert.Convert; import cn.hutool.core.util.StrUtil; import com.banmoon.test.queue.producer.RedisTestDelayProducer; import com.banmoon.test.queue.producer.RedisTestProducer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import java.util.Date; import java.util.Set; import java.util.concurrent.TimeUnit;
@Slf4j @Component public class RedisTestDelayConsumer {
@Autowired private RedisTemplate redisTemplate;
@PostConstruct public void pop() { new Thread(() -> { while (true) { try { Set<Object> set = redisTemplate.opsForZSet().rangeByScore(RedisTestDelayProducer.REDIS_DELAY_TEST_KEY, 0, new Date().getTime(), 0, 1); if (CollUtil.isNotEmpty(set)) { String params = (String) set.iterator().next(); Long success = redisTemplate.opsForZSet().remove(RedisTestDelayProducer.REDIS_DELAY_TEST_KEY, params); if (success > 0) { log.info("模拟消费消息:{}", params); } } else { TimeUnit.MILLISECONDS.sleep(1000); } } catch (InterruptedException e) { } } }, RedisTestDelayProducer.REDIS_DELAY_TEST_KEY).start(); }
}
|
消费的逻辑,基本就是,取出当前时间点,要执行的消息。
score
保证了队列中的消息有序性,且作为时间戳,所以可以完成延迟队列的对应功能。
注意事项和上面的普通队列差不多,简单注意一下就好。
3)测试
启动该springBoot
项目,同时执行下面这段测试代码,调用三次生产者,分别在10秒后,30秒后,1分钟后进行消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package com.banmoon.test;
import cn.hutool.core.date.DateField; import com.banmoon.test.queue.producer.RedisTestDelayProducer; import com.banmoon.test.queue.producer.RedisTestProducer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest class ServiceTest {
@Autowired private RedisTestDelayProducer redisTestDelayProducer;
@Test void insertTest() { redisTestDelayProducer.push("a", 10, DateField.SECOND); redisTestDelayProducer.push("b", 30, DateField.SECOND); redisTestDelayProducer.push("c", 1, DateField.MINUTE); }
}
|
查看springBoot
项目的控制台,注意查看消费者打印的日志,主要看看三条日志的时间间隔
四、最后
我还要讲一下,redis
作为消息队列的优缺点
-
优点
- 使用相对简单
- 不用专门维护专业的消息中间件,降低服务和运维成本
-
缺点
- 没有
ack
,消息确认机制,存在消息丢失的可能
- 死循环进行监听队列,消息队列一多,所需要的线程资源也会增多,服务器的负担会增大
所以,如果是简单的日志推送,消息推送等,可以使用redis
队列。相反,如果对消息的可靠性有很大的要求,建议还是不要使用redis
作为消息中间件了。
我是半月,祝你幸福!!!