redis发布订阅

Posted by Clear Blog on September 7, 2017

在日常开发中,由于redis是内存型db,成本会比较高,只有在要求实时性数据或者对查询响应要求很高的情况下才会选用。 像实时变动的排行榜这类数据,我就会选用redis中的zset数据结构,可以根据score值去进行排序。 再者,可以将redis当成一个消息队列,发布订阅这种方式来消耗队列中的任务。

接下来我就详细说明下我的消息队列的实现。 由于消息队列中不止存在一种消息,所以就根据topic字段来区分消息类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Message {
    /**
     * 消息主题
     */
    private String topic;

    /**
     * 消息内容
     */
    private String data;

    /**
     * 消息延时时间(ms) 为0 立即发送,
     */
    private long delay;
}

有了消息体后自然需要一个负责订阅和发布的对象

1
2
3
4
5
6
7
public interface Messager {

    void register(MessageSubscriber subscriber, String topic);

    void publish(Message message);

}

具体看下发布订阅方法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@Component
public class RedisMessager implements Messager, InitializingBean {
    private Logger logger = LoggerFactory.getLogger(RedisMessager.class);
    /**
     * 存储消息和消息的订阅者
     */
    private Map<String, Set<MessageSubscriber>> subscribes = Maps.newHashMap();
    @Resource
    private MessageAsyncProcess messageAsyncProcess;
    @Resource
    private RedisManager redisManager;

    public Thread daemon = new Thread("RedisMessager_Thread") {
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(500);

                    int batch = 500;
                    long maxScore = Instant.now().toEpochMilli();
                    List<String> messages = redisManager.findAndRem(RedisKey.QUEUE.getKey(), maxScore, 0, batch);
                    if (CollectionUtils.isEmpty(messages)) {
                        continue;
                    }

                    while (CollectionUtils.isNotEmpty(messages)) {
                        for (String message : messages) {
                            Message msg = JSONUtils.stringToObjectNonEx(message, Message.class);
                            if (msg != null) {
                                messageAsyncProcess.processMessage(subscribes.get(msg.getTopic()), msg.getData());
                            }
                        }
                        messages = redisManager.findAndRem(RedisKey.QUEUE.getKey(), maxScore, 0, batch);
                    }

                } catch (Exception e) {
                    logger.error("QueueThread run exception", e);
                }
            }
        }
    };

    @Override
    public void register(MessageSubscriber subscriber, String topic) {
        Set<MessageSubscriber> subscribers = subscribes.get(topic);
        if (subscribers == null) {
            subscribers = Sets.newHashSet();
            subscribes.put(topic, subscribers);
        }
        subscribers.add(subscriber);
    }

    @Override
    public void publish(Message message) {
        if (message.getDelay() == 0) {
            messageAsyncProcess.processMessage(subscribes.get(message.getTopic()), message.getData());
        } else {
            long now = Instant.now().getEpochSecond();
            redisManager.zAdd(RedisKey.QUEUE.getKey(), now, message);//延时任务
        }

    }

    @Override
    public void afterPropertiesSet() throws Exception {
        daemon.setDaemon(true);
        daemon.start();
    }
}

之后就是实现消息订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Component
public class TestSubcriber implements MessageSubscriber, InitializingBean {
    private Logger logger = LoggerFactory.getLogger(TestSubcriber.class);
    @Resource
    private RedisMessager redisMessager;

    @Override
    public void receive(String content) {
        if (isBlank(content)) {
            return;
        }

        System.out.println(content);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        redisMessager.register(this, "test");
    }
}

在业务中需要部分直接发布消息即可。