1.需求背景
在推荐页请求热点数据的请求中,热点数据刷新间隔是半小时一次,因为返回热点数据的服务可承受的请求量较小,并且数据在刷新间隔内是不会变化的,没必要所有的用户请求都透传到后续服务(推荐场景一天将近1.2亿次的pv),这会对后台服务要求太高也没必要,所以需要在node服务层做缓存数据,每次用户的请求从缓存中返回并且每30min node服务更新一次数据。
2.缓存数据分类
目前接触到的数据主要分为两大类吧:
- 有边界数据:这种数据是指请求的数据可以一次性全部返回,后续的请求都可以沿用上次请求的数据,类似的有热点数据等;
- 无边界数据:这种是指数据带有特征性质,上个用户的请求不能给下个用户请求用,类似的有用户画像等数据
先附上老大给写的c++伪码吧,我后面也自己封装了个node版的
/*
* LocalCache.h
*
* Created on: 2020.3.16
* Author: bostinshi
*/
#ifndef LocalCache_H_
#define LocalCache_H_
typedef promise::Future<int> CommonRetFuture;
template<typename T>
typedef CommonRetFuture FuncSyncData(const T &); // Func是函数类型;
template<typename T, typename TKey, typename TVal>
class LocalCache {
public:
LocalCache(size_t iCacheSize, size_t iBaseKey,
size_t iShmSize, size_t iDataMaxSize, size_t iDataMinSize,
int iEffectTime, int iReqIntvalTime, bool bSyncGetData,
FuncSyncData<T> fSyncData
) {
m_iEffectTime = iEffectTime;
m_iReqIntvalTime = iReqIntvalTime;
m_bSyncGetData = bSyncGetData;
m_cache.initDataBlockSize(iDataMinSize, iDataMaxSize, 1.0);
m_cache.initStore(iBaseKey, iShmSize);
m_cache.initLock(iBaseKey);
m_cache.setAutoErase(true);
m_cache.setEraseMode(TC_HashMap::ERASEBYGET);
}
void doInterval()
{
fSyncData();
}
CommonRetFuture get(const TKey& stKey, TVal& stVal)
{
promise::Promise<ECommonRetCode> promise;
//REPORT_COUNT("ProfileCache.get.request", 1);
int shmRet = m_cache.get(stKey, stVal);
//MT_RLOG<<"get : "<<stKey.sMkey<<"|shmRet :"<< shmRet << "|iRefreshTime:" << iRefreshTime <<endl;
if (shmRet != TC_HashMap::RT_OK)
{
//LOGDEBUG<< "get fail : " << stKey.sMkey << "|shmRet :" << shmRet <<endl;
//REPORT_COUNT("ProfileCache.get.retfailed_"+taf::TC_Common::tostr(shmRet), 1);
promise.setValue(shmRet);
return promise.getFuture();
}
long lNowMs = TC_TimeProvider::getInstance()->getNowMs();
int iTimeInterval = lNowMs - stVal.iTimeStamp;
if (iTimeInterval > m_iEffectTime) // refresh time
{
if(m_bSyncGetData)
{
return fSyncData(stKey);
}
else
{
if(!stVal.bHasReq || (lNowMs - stVal.iTimeStamp) > m_iReqIntvalTime)
{
stVal.bHasReq = true;
stVal.iReqTimeStamp = lNowMs;
set(stKey, stVal);
fSyncData(stKey);
}
promise.setValue(shmRet);// time out data
return promise.getFuture();
}
}
return shmRet;
}
int set(const TKey& stKey, TVal& stVal)
{
int shmRet = m_cache.set(stKey, stVal);
if (shmRet != TC_HashMap::RT_OK)
{
//MT_RELOG<<"set fail : "<<stKey.sMkey<<"|shmRet :"<< shmRet <<endl;
//REPORT_COUNT("ProfileCache.set.retfailed_"+taf::TC_Common::tostr(shmRet), 1);
}
//MT_RLOG<<"set : "<<stKey.sMkey<<"|shmRet :"<< shmRet <<endl;
return shmRet;
}
private:
int m_iEffectTime;
int m_iReqIntvalTime;
bool m_bSyncGetData;
FuncSyncData<T> fSyncData;
T m_cache;
};
#endif /* LocalCache_H_ */
3.实现代码
组件主要实现了3个基本方法,get、set和getCacheKeys,主要的逻辑就是始终保持catch里面有数据,就会保证数据每次都是从node缓存里取回,不会对后续服务造成缓存穿透和太大响应压力;无论是打底的数据还是上次取回来的缓存,或者是上上次取回来的过期的缓存,在get函数返回的数据中,都会用code这个字段区分当前数据的性质:100->打底数据 200->正常缓存数据 300->过期缓存数据。
export const Cache : any = {};
Cache.global = {}
Cache.cacheObj = {}
/**
* 获取对应key值的缓存
* @param key 缓存对应的key值
*/
Cache.get = (key) => {
if (Object.keys(Cache.cacheObj).indexOf(key) > -1) {
return Cache.cacheObj[key]
} else {
throw `暂无当前key值为:${key}的缓存`
}
}
/**
* 将取回来的数据存进缓存
* 传回去的数据结构{code: xxx, data: 取回的数据}
* 状态码:100->打底数据 200->正常缓存数据 300->过期缓存数据
* @param asyfun 异步请求数据的func,需要返回data或者error
* @param normal_interval 正常情况下向后台取数据的间隔
* @param error_interval 缓存穿透后调整的取数据间隔
* @param key 缓存对应的key值
* @param backData 打底的数据 {code: 100, data: 打底数据}
*/
Cache.set = async function (asyfun, normal_interval, error_interval, key, backData) {
for (let cacheKey in Cache.cacheObj) {
if (cacheKey === key) {
Monitor.report('cache key duplicate', `key值重复,重复的key值:${key}`);
throw `key值重复,当前已有key值:${Object.keys(Cache.cacheObj)}`
}
}
Cache.global[key + 'timer'] = null
async function intervalFunc() {
try {
let res = await asyfun()
Cache.cacheObj[key] = {code: 200, data: res}
} catch (e) {
console.log('cache error', e)
if (Cache.cacheObj[key].data && Cache.cacheObj[key].code && Cache.cacheObj[key].code === 200) { // 缓存里是上次的缓存,修改状态码是300
Cache.cacheObj[key].code = 300
} else if (Cache.cacheObj[key].data && Cache.cacheObj[key].code && Cache.cacheObj[key].code === 300) { // 缓存里是很久的缓存,修改状态码是300
Cache.cacheObj[key].code = 300
} else {
Cache.cacheObj[key] = {code: 100, data: backData}
}
}
}
try {
let res = await asyfun()
Cache.cacheObj[key] = {code: 200, data: res}
if (Cache.global[key + 'timer']) {
clearInterval(Cache.global[key + 'timer'])
}
Cache.global[key + 'timer'] = setInterval(intervalFunc, normal_interval)
} catch (e) { // 取数据操作异常
Monitor.report('cache get error', e);
if (Cache.cacheObj[key] && Cache.cacheObj[key].data && Cache.cacheObj[key].code && Cache.cacheObj[key].code === 200) { // 缓存里是上次的缓存,修改状态码是300
Cache.cacheObj[key].code = 300
} else if (Cache.cacheObj[key] && Cache.cacheObj[key].data && Cache.cacheObj[key].code && Cache.cacheObj[key].code === 300) { // 缓存里是很久的缓存,修改状态码是300
Cache.cacheObj[key].code = 300
} else {
Cache.cacheObj[key] = {code: 100, data: backData}
}
if (Cache.global[key + 'timer']) {
clearInterval(Cache.global[key + 'timer'])
}
Cache.global[key + 'timer'] = setInterval(intervalFunc, error_interval)
}
}
/**
* 获取当前已有缓存的key值
*/
Cache.getCacheKeys = () => {
return Object.keys(Cache.cacheObj)
}