在日常开发中,由于redis是内存型db,成本会比较高,只有在要求实时性数据或者对查询响应要求很高的情况下才会选用。 像实时变动的排行榜这类数据,我就会选用redis中的zset数据结构,可以根据score值去进行排序。 再者,可以将redis当成一个消息队列,发布订阅这种方式来消耗队列中的任务。
接下来我就详细说明下我的消息队列的实现。 由于消息队列中不止存在一种消息,所以就根据topic字段来区分消息类型。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Message {
/**
* 消息主题
*/
private String topic;
/**
* 消息内容
*/
private String data;
/**
* 消息延时时间(ms) 为0 立即发送,
*/
private long delay;
}
有了消息体后自然需要一个负责订阅和发布的对象
1
2
3
4
5
6
7
public interface Messager {
void register(MessageSubscriber subscriber, String topic);
void publish(Message message);
}
具体看下发布订阅方法实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@Component
public class RedisMessager implements Messager, InitializingBean {
private Logger logger = LoggerFactory.getLogger(RedisMessager.class);
/**
* 存储消息和消息的订阅者
*/
private Map<String, Set<MessageSubscriber>> subscribes = Maps.newHashMap();
@Resource
private MessageAsyncProcess messageAsyncProcess;
@Resource
private RedisManager redisManager;
public Thread daemon = new Thread("RedisMessager_Thread") {
@Override
public void run() {
while (true) {
try {
Thread.sleep(500);
int batch = 500;
long maxScore = Instant.now().toEpochMilli();
List<String> messages = redisManager.findAndRem(RedisKey.QUEUE.getKey(), maxScore, 0, batch);
if (CollectionUtils.isEmpty(messages)) {
continue;
}
while (CollectionUtils.isNotEmpty(messages)) {
for (String message : messages) {
Message msg = JSONUtils.stringToObjectNonEx(message, Message.class);
if (msg != null) {
messageAsyncProcess.processMessage(subscribes.get(msg.getTopic()), msg.getData());
}
}
messages = redisManager.findAndRem(RedisKey.QUEUE.getKey(), maxScore, 0, batch);
}
} catch (Exception e) {
logger.error("QueueThread run exception", e);
}
}
}
};
@Override
public void register(MessageSubscriber subscriber, String topic) {
Set<MessageSubscriber> subscribers = subscribes.get(topic);
if (subscribers == null) {
subscribers = Sets.newHashSet();
subscribes.put(topic, subscribers);
}
subscribers.add(subscriber);
}
@Override
public void publish(Message message) {
if (message.getDelay() == 0) {
messageAsyncProcess.processMessage(subscribes.get(message.getTopic()), message.getData());
} else {
long now = Instant.now().getEpochSecond();
redisManager.zAdd(RedisKey.QUEUE.getKey(), now, message);//延时任务
}
}
@Override
public void afterPropertiesSet() throws Exception {
daemon.setDaemon(true);
daemon.start();
}
}
之后就是实现消息订阅者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Component
public class TestSubcriber implements MessageSubscriber, InitializingBean {
private Logger logger = LoggerFactory.getLogger(TestSubcriber.class);
@Resource
private RedisMessager redisMessager;
@Override
public void receive(String content) {
if (isBlank(content)) {
return;
}
System.out.println(content);
}
@Override
public void afterPropertiesSet() throws Exception {
redisMessager.register(this, "test");
}
}
在业务中需要部分直接发布消息即可。