千寻

道路很长, 开始了就别停下!

0%

案例-公众号增量消息同步改造

背景

理财社区之前用的是开源的PHP discuz框架,私信也是沿用自带的那套,每次推送活动消息时,会为所有用户插入一条记录,一次就是几百万条记录到表中,db压力很大,而且运营反馈每次任务都要发送很长时间,毕竟是要insert这么多记录,慢也是情理之中。

优化过程:

第一次优化:

当时写代码的同学已线离职,大致过程如下:

采用异步拉取的方式,当用户打开消息列表页时,正常加载数据库数据,同时创建一个异步任务,扔到一个BlockQueue中(会控制长度),同时启动10个消费线程,对任务消费,为当前用户同步最新的公众号增量消息。
为了控制频率,15分钟同步一次

缺点:

1.不实时,用户主动打开消息通知页面,并不会马上获取最新消息,而只是触发同步任务,需要第二次打开页面才能看到最新的消息。体验不好

第二次优化:

公众号消息采用同步拉取的方式,大致过程:

  1. 运营发送全量活动消息,入db表,同时扔到最新的消息cache列表

  2. 当社区用户打开消息通知页时,会查询上一次的同步时间,并从cache列表中加载最新的公众号增量消息,并修改会话列表&提醒数,异步线程将增量消息插入私信分表中。

  3. 按修改时间分页查询会话列表

1
2
3
补充:
用户第一次打开消息通知页面,就能获取最新消息,基本上可以满足用户需求,体验也不错。
但产品又提出新的要求,希望在首页可以看到消息数量提醒(如果有新的消息时),刺激用户点击。原来的逻辑是打开通知页时才会同步,现在触发条件又要升级,今天是推荐首页,明天可能是其它页面,有没有会什么通用的方式来收集所有的触发点。

第三次优化:

这次是对第二次优化的一个补充,扩展增量消息的触发来源!

1.定义并收集“活跃用户”。

改造后的系统(php->java),每多业务功能都要取当前用户信息,因此就涉及token验证。因此将这样的用户定义为“活跃用户”。token校验成功后,发kafka消息。

2.编写kafka消费任务,触发同步操作

3.由于运营并不会经常用公众号推送消息,所以设定一个间隔时间(距上一次同步15分钟后)才扫描一次。

1
2
绝大部分的扫描都是空处理,设定这个时间间隔就显得很重要,如果太小,扫描频率比较高,系统压力大,实时性会好。反之,扫描频率低,系统压力小,实时性会差一些。
经验先设定15分钟,后面根据情况再调整。

4.考虑并发,看过kafka消息日志,确实挺多,为什么?因为一个页面,比如详情页,会同时请求多个api接口。

并发控制是借助于redis来实现的。

加锁—(成功)—处理业务—释放锁

加锁—(失败)—方法结束,不做任何事情。

其中:加锁和解锁是一对操作,如果因为系统的一些未知异常,导致解锁失败,那么后面可能再也无法拿到锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public boolean isLock(Long uid) {
try {
long value = bbsRedisClient.incr(KEY_BBS_PUBLIC_SYNC_LOCK + uid);
if (value == 1) return true;
if (value >= 14) {
// 补偿机制,防止解锁失败时,后面永远无法再拿到锁
releaseLock(uid);
}
} catch (Exception e) {
logger.error("[MessageCacheManager.isLock] invoke error! uid={}", uid);
}
return false;

}
加锁逻辑,里面加了补偿机制,可能会比较猥琐

`
1
2
3
4
5
6
7
8
9
public void releaseLock(Long uid) {
try {
bbsRedisClient.del(KEY_BBS_PUBLIC_SYNC_LOCK + uid);
} catch (Exception e) {
logger.error("[MessageCacheManager.releaseLock] invoke error! uid={}", uid);
}
}

// 释放锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

public void syncPublicMessage(Long uid, boolean isAutoCheck) {
if (uid == null) return;
try {
Long currentTime = System.currentTimeMillis() / 1000;// 当前时间
Long lastSyncTime = messageCacheManager.getSyncTime(uid); // 上一次同步时间
if (lastSyncTime == null) {
lastSyncTime = 0L;
}

// 如果自动同步检测,15分钟同步一次
if (isAutoCheck) {
if (lastSyncTime + 900 > currentTime) {
return;
}
}

// 加锁成功,处理业务
if (messageCacheManager.isLock(uid)) {
handleSyncPublicMessage(uid, lastSyncTime, currentTime);
messageCacheManager.releaseLock(uid);
}
} catch (Exception e) {
messageCacheManager.releaseLock(uid);
}
}

小提醒:

  • 1.原来逻辑是先加锁–加锁成功–15分钟的频率检测–(根据需要,可能会执行业务逻辑)—再释放锁。这种写法其浪费很多,每次任务执行都要执行加锁、解锁,严重浪费系统开销。
  • 2.早上上班时,在路上推敲这个过程时,觉的可以将顺序颠倒一下
    先15分钟的频率检测—加锁—执行业务逻辑—释放锁
    相信第一步的频率检测就会拦截掉很多请求,也将没有必要抢锁了