Redis发布订阅机制

在开发进程缓存系统的时候,在更新缓存值的时候需要有一个通知机制。之前使用的是spring的监听机制,在涉及更新操作后,通过ApplicationContext发布ApplicationEvent,由@EventListener监听事件并更新缓存。但是spring监听机制只能在单机使用,涉及到集群部署,只能更新当前机器的缓存。如果使用消息队列又太重、太繁琐,后来发现redis本身提供订阅发布机制,可有效解决此类问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* redis发布-订阅机制中的订阅者,相当于监听器
*/
@Service
public class RedisTopicSubscriber {

private Logger logger = LoggerFactory.getLogger(getClass());

@Resource
private RedisMessageListenerContainer redisMessageListenerContainer;

private MessageListenerAdapter adapter;

@Resource
private CacheDao cacheDao;

@PostConstruct
private void init() {
adapter = new MessageListenerAdapter(this);
adapter.afterPropertiesSet();

redisMessageListenerContainer.addMessageListener(adapter, new ChannelTopic(RedisConstant.TOPIC));
}

/**
* 处理监听到的消息
*
* @param channel
* @param message
*/
public <T extends EntityHasNCM> void handleMessage(String message, String channel) {
logger.info("接收到redis订阅消息,渠道{},信息{}", channel, message);

RedisEvent redisEvent = JSON.parseObject(message, RedisEvent.class);
JSONObject enitity = (JSONObject)redisEvent.getItem();
Object item = JSONObject.parseObject(enitity.toJSONString(), redisEvent.gettClass());

switch (redisEvent.getType()) {
case 0:
cacheDao.updateCache((T)item);
break;
case 1:
cacheDao.updateCache((T)item);
break;
case 2:
cacheDao.updateCache((T)item);
break;
case 3:
cacheDao.deleteCache((T)item);
break;
default:
return;
}
}

RedisMessageListenerContainer为redis监听者容器,redis中的监听者在启动时注册到容器中,MessageListenerAdapter为redis监听者适配,可指定适配对象与适配方法,默认是handleMessage方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* redis发布-订阅中的发布者
*/
@Service
public class RedisTopicPublisher {

private Logger logger = LoggerFactory.getLogger(getClass());

@Resource
private RedisUtil redisUtil;

public <T extends EntityHasNCM> void publishSearchEvent(T item, String message) {
logger.info("发布查询事件开始:" + message);

publishRedisEvent(JSON.toJSONString(new CacheSearchEvent<>(this, item, message)));

logger.info("发布查询事件结束:" + message);
}

public <T extends EntityHasNCM> void publishInsertEvent(T item, String message) {
logger.info("发布插入事件开始:" + message);

publishRedisEvent(JSON.toJSONString(new CacheInsertEvent<>(this, item, message)));

logger.info("发布插入事件结束:" + message);
}

public <T extends EntityHasNCM> void publishUpdateEvent(T item, String message) {
logger.info("发布更新事件开始:" + message);

publishRedisEvent(JSON.toJSONString(new CacheUpdateEvent<>(this, item, message)));

logger.info("发布更新事件结束:" + message);
}

public <T extends EntityHasNCM> void publishDeleteEvent(T item, String message) {
logger.info("发布删除事件开始:" + message);

publishRedisEvent(JSON.toJSONString(new CacheDeleteEvent<>(this, item, message)));

logger.info("发布删除事件结束:" + message);
}

private <T extends EntityHasNCM> void publishRedisEvent(String message) {
redisUtil.call(jedis -> jedis.publish(RedisConstant.TOPIC, message));
}
}