Redis实现消息队列及延迟队列

一、介绍

在选择消息中间件的问题上,我们有很多解决方案,具体选择哪一种还是要根据实际的情况来进行确认。

如果直接有成熟的第三方消息中间件,能用就直接用,如rabbitMqkafka等。

再如果,推送的消息比较简单,又恰好有个redis,那么就选择redis吧。

下面,将进行介绍,如果使用redis作为消息队列,我们该如何编写这段程序。

二、消息队列

前置工作,本次使用的工程框架直接是springBoot,其他maven依赖就不贴出来了,主要是要有这个redis的依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

有了依赖,记得在application.yml配置文件中加入对应redis的配置信息

1
2
3
4
5
spring:
redis:
database: 0
host: localhost
port: 6379

还有一件事,redisTemplate的这个bean我们要进行润色一下,虽然用自带的也行,但作为一个强迫症,我还是希望我写入的keyredis中的key一致

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.banmoon.test.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(factory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(mapper);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key采用String的序列化方式
redisTemplate.setKeySerializer(stringRedisSerializer);
// hash的key也采用String的序列化方式
redisTemplate.setHashKeySerializer(stringRedisSerializer);
// value序列化方式采用jackson
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
// hash的value序列化方式采用jackson
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}

}

好的准备工作完成,先来看生产者

1)生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.banmoon.test.queue.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class RedisTestProducer {

public static final String REDIS_TEST_KEY = "test:queue";

@Autowired
private RedisTemplate redisTemplate;

public long push (String... params) {
Long l = redisTemplate.opsForList().rightPushAll(REDIS_TEST_KEY, params);
return l;
}

}

生产者很简单,就是向redislist中推送数据

主要在于消费者,该如何获取到其中的消息

2)消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.banmoon.test.queue.consumer;

import cn.hutool.core.util.StrUtil;
import com.banmoon.test.queue.producer.RedisTestProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class RedisTestConsumer {

@Autowired
private RedisTemplate redisTemplate;

@PostConstruct
public void pop() {
new Thread(() -> {
while (true) {
try {
// 阻塞取出队首
String params = (String) redisTemplate.opsForList().leftPop(RedisTestProducer.REDIS_TEST_KEY, 10, TimeUnit.SECONDS);
if (StrUtil.isNotBlank(params))
log.info("模拟消费消息:{}", params);
// 避免高频轮循,添加休眠
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
// 不做任何处理,切记不要因为异常导致了消费线程的退出
}
}
}, RedisTestProducer.REDIS_TEST_KEY).start();
}

}

上述就是消费者,其中注意几点

  • 这里服务启动时,用到了bean初始化的一个方法,大家也可以使用静态代码块,只要让这个消费线程启动就行

  • 线程启动,切记不要让异常导致了线程的退出。因为这样就没有消费者了,要时刻保证消费者的在线

  • 在取出队首的消息时,用到了阻塞机制。当没有获取到消息,该线程会进行阻塞,直到有消息入队或者阻塞超时,才会返回消息。避免死循环带来了cpu高载荷

3)测试

启动该springBoot项目,同时执行下面这段测试代码,调用三次生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.banmoon.test;

import com.banmoon.test.queue.producer.RedisTestProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class ServiceTest {

@Autowired
private RedisTestProducer redisTestProducer;

@Test
void insertTest() {
redisTestProducer.push("a", "b", "c");
}

}

查看springBoot项目的控制台,消费者有进行消费

image-20220517140449171

三、延迟队列

延迟队列的应用场景还是比较多见的,比如

  • 用户下单后,此订单超30分钟后取消

  • 用户订阅,指定时间推送订阅消息事件

很多类似的业务场景,我们不再依赖定时,使用消息中间件就可以完成这类功能。

redis实现延迟队列之前,我有必要说一下setzset,主要是这个zset

set大家都很熟悉,与list不同,set是无序且内部元素不重复。

那么zset呢,它结合了setlist的特点

  • 集合内元素不会重复

  • 元素以有序的方式排列

zset中的元素都会关联一个分数score,内部将通过这个score对集合元素进行的排序。

虽然zset集合中元素不会重复,但score可以重复。如果有两个score相同的元素,将按照元素的字典序进行排序。

1)生产者

上面描述了这么多,我们该如何使用,先看生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.banmoon.test.queue.producer;

import cn.hutool.core.date.DateField;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;

@Slf4j
@Component
public class RedisTestDelayProducer {

public static final String REDIS_DELAY_TEST_KEY = "test:delay:queue";

@Autowired
private RedisTemplate redisTemplate;

public Boolean push (String params, int offset, DateField dateField) {
long score = DateUtil.offset(new Date(), dateField, offset).getTime();
Boolean b = redisTemplate.opsForZSet().addIfAbsent(REDIS_DELAY_TEST_KEY, params, score);
log.info("生产消息:{},推送是否成功:{}", params, b);
return b;
}

}

可以看到,这边使用将消费时间点的时间戳,作为了score,生产的消息

2)消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.banmoon.test.queue.consumer;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.StrUtil;
import com.banmoon.test.queue.producer.RedisTestDelayProducer;
import com.banmoon.test.queue.producer.RedisTestProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class RedisTestDelayConsumer {

@Autowired
private RedisTemplate redisTemplate;

@PostConstruct
public void pop() {
new Thread(() -> {
while (true) {
try {
// 查看范围中的消息
Set<Object> set = redisTemplate.opsForZSet().rangeByScore(RedisTestDelayProducer.REDIS_DELAY_TEST_KEY, 0, new Date().getTime(), 0, 1);
// 判断是否为空
if (CollUtil.isNotEmpty(set)) {
String params = (String) set.iterator().next();
// 删除范围中的消息
Long success = redisTemplate.opsForZSet().remove(RedisTestDelayProducer.REDIS_DELAY_TEST_KEY, params);
if (success > 0) {
log.info("模拟消费消息:{}", params);
}
} else {
// 避免高频轮循,添加休眠
TimeUnit.MILLISECONDS.sleep(1000);
}
} catch (InterruptedException e) {
// 不做任何处理,切记不要因为异常导致了消费线程的退出
}
}
}, RedisTestDelayProducer.REDIS_DELAY_TEST_KEY).start();
}

}

消费的逻辑,基本就是,取出当前时间点,要执行的消息。

score保证了队列中的消息有序性,且作为时间戳,所以可以完成延迟队列的对应功能。

注意事项和上面的普通队列差不多,简单注意一下就好。

3)测试

启动该springBoot项目,同时执行下面这段测试代码,调用三次生产者,分别在10秒后,30秒后,1分钟后进行消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.banmoon.test;

import cn.hutool.core.date.DateField;
import com.banmoon.test.queue.producer.RedisTestDelayProducer;
import com.banmoon.test.queue.producer.RedisTestProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class ServiceTest {

@Autowired
private RedisTestDelayProducer redisTestDelayProducer;

@Test
void insertTest() {
redisTestDelayProducer.push("a", 10, DateField.SECOND);
redisTestDelayProducer.push("b", 30, DateField.SECOND);
redisTestDelayProducer.push("c", 1, DateField.MINUTE);
}

}

查看springBoot项目的控制台,注意查看消费者打印的日志,主要看看三条日志的时间间隔

image-20220517171656067

四、最后

我还要讲一下,redis作为消息队列的优缺点

  • 优点

    • 使用相对简单
    • 不用专门维护专业的消息中间件,降低服务和运维成本
  • 缺点

    • 没有ack,消息确认机制,存在消息丢失的可能
    • 死循环进行监听队列,消息队列一多,所需要的线程资源也会增多,服务器的负担会增大

所以,如果是简单的日志推送,消息推送等,可以使用redis队列。相反,如果对消息的可靠性有很大的要求,建议还是不要使用redis作为消息中间件了。

我是半月,祝你幸福!!!