websoket
心跳机制建立 和 轮训垫片
powershell
import Ajax from "@/common/http";
import api, { ENV } from '@/common/api/apimix';
class MessMonitor {
constructor(connectUrl, userInfo) {
this.connectUrl = connectUrl; // ws链接地址
this.userInfo = userInfo; // 用户数据
this.ws = this.initWebsocket(); // ws对象
this.textMsgPool = []; // 文本消息池
this.audioMsgPool = []; // 语音消息池
this.subscribers = []; // 订阅消息池的回调函数
this.checkTimer = 5000; // 轮询输出消息池的间隔毫秒数
this.delayRetry = 0; // 链接断开后步进重连的延迟时间
this.wsKey = null; // 链接的wsKey,在open的时候new一个时间戳生成
this.delayUnit = 30 * 1000; // 每次重连失败后的步进值单位
this.lastMsg = null; // WS降级后,轮询每次将getMsg中createTime最大的缓存,用于下次对比
this.underDegrade = false; // 是否开始降级
// 每5S去取一次消息池中的消息,并且调用订阅者回调
setInterval(() => {
this.outPutMsg();
}, this.checkTimer);
}
// 初始化所有内部数据
initParam() {
this.textMsgPool = []; // 文本消息池
this.audioMsgPool = []; // 语音消息池
this.ws = this.initWebsocket();
this.wsKey = null;
}
// 建立websocket链接
initWebsocket() {
if (window.WebSocket) {
const ws = new WebSocket(this.connectUrl);
ws.onopen = (res) => {
// 通知后端已经建立成功
this.confirmReceived({ ws, isInit: true });
// 如果在降级中停止降级
if (this.underDegrade) {
this.stopDegrade();
}
// 开启心跳链接
this.startHeartbeat();
// 停止尝试重连,并重置步进值
if (this.delayRetryTimer) {
clearTimeout(this.delayRetryTimer);
this.delayRetry = 0;
}
};
this.bindWebsocket(ws);
return ws;
}
console.error("当前浏览器不支持websocket");
return {};
}
// 收到消息后后发送ack包通知后端收到成功
confirmReceived({
ws, pushType, msgDataId, isInit,
}) {
console.log(this.wsKey, "..........");
if (ws.readyState === 1) {
ws.send(
JSON.stringify({
wsType: isInit ? 2 : 3,
userId: this.userInfo.userId,
tenantId: this.userInfo.tenantId,
wsKey: this.wsKey ? this.wsKey : (this.wsKey = Date.now()),
pushType,
msgDataId,
}),
);
}
}
// 绑定websocket各生命周期事件
bindWebsocket(ws) {
// wsType : 1-心跳请求;2-新用户上线;3-ack请求;4-后台推送;
if (window.WebSocket) {
// 后端推送消息触发
ws.onmessage = (res) => {
if (res && res.data) {
const {
pushType, title, actionUrl, voiceUrl, wsType, msgDataId, importantStatus, groupId,
} = JSON.parse(res.data);
if (wsType === 4) {
this.confirmReceived({
ws, msgDataId, pushType, isInit: false,
});
if (this.heartTimer) {
clearInterval(this.heartTimer);
this.startHeartbeat();
}
}
const textMsgItem = {
title, actionUrl, importantStatus, groupId,
};
switch (pushType) {
// 1-文本推送;2-语音推送;3-文本和语音推送
case 1:
this.addMsg("text", textMsgItem);
break;
case 2:
this.addMsg("audio", voiceUrl);
break;
case 3:
this.addMsg("text", textMsgItem);
this.addMsg("audio", voiceUrl);
break;
default:
break;
}
}
};
// 链接关闭的时候触发
ws.onclose = (e) => {
if (this.heartTimer) {
clearInterval(this.heartTimer);
}
this.reConnect();
};
// 链接建立错误的时候触发
ws.onerror = (err) => {};
} else {
console.error("该浏览器暂不支持websocket!");
}
}
// 心跳包开始
startHeartbeat() {
this.heartTimer = setInterval(() => {
this.heartRequest();
}, 1 * 60 * 1000);
}
// 心跳请求
heartRequest() {
if (this.ws.readyState === 1) {
this.ws.send(
JSON.stringify({
wsType: 1,
wsKey: this.wsKey,
userId: this.userInfo.userId,
tenantId: this.userInfo.tenantId,
}),
);
}
}
// 断开重连
reConnect() {
this.delayRetryTimer = setTimeout(() => {
this.initParam();
this.delayRetry += this.delayUnit;
}, this.delayRetry);
// 当重连失败3次后,进入到降级方案,采用轮询
if (this.delayRetry === this.delayUnit * 2) {
this.startDegrade();
}
}
/**
*
* 开始降级方案,轮询逻辑
* @memberof MessMonitor
*/
startDegrade() {
console.warn("开始降级!");
this.underDegrade = true;
this.getMsg();
}
getMsg() {
Ajax.post("getMsg", { requestType: 2 })
.then((res) => {
if (res && res.code === "0000") {
const { ordinaryDatas, voiceUrls } = res.data;
const combineData = [...ordinaryDatas];
// 如果this.lastMsg不为空的话,说明已经进入了轮询降级,需要判断新的getMsg中的消息是否有晚于上次缓存的
if (this.lastMsg) {
// 将本次取到的数据和缓存的最新的消息对比,筛选出创建时间大于缓存消息的数据
const newMsgArr = combineData.filter(
(item) => item.createTime > this.lastMsg.createTime,
);
if (newMsgArr.length > 0) {
newMsgArr.forEach((item) => {
const { title, actionUrl } = item;
this.addMsg("text", { title, actionUrl });
});
if (voiceUrls && voiceUrls.length > 0) {
voiceUrls.forEach((item) => {
this.addMsg("voice", item);
});
}
}
}
let lastMsg = this.lastMsg || { createTime: 0 };
combineData.forEach((item) => {
if (item.createTime > lastMsg.createTime) {
lastMsg = item;
}
});
this.lastMsg = lastMsg;
}
})
.finally((res) => {
if (this.underDegrade) {
this.getMsgTimer = setTimeout(() => {
this.getMsg();
}, 30 * 1000);
}
});
}
/**
*
* 结束降级方案
* @memberof MessMonitor
*/
stopDegrade() {
console.warn("停止降级!");
this.underDegrade = false;
clearTimeout(this.getMsgTimer);
}
/**
*
*往消息池中增加消息
* @param {string:'text'||'audio'}
* @param {object:{}} msg
* @memberof MessMonitor
*/
addMsg(type, msg) {
if (type === "text") {
this.textMsgPool.push(msg);
} else {
this.audioMsgPool.push(msg);
}
}
/**
*
* 输出消息池中的消息
* @param {string:'text'||'audio'}
* @memberof MessMonitor
*/
outPutMsg() {
if (this.textMsgPool.length + this.audioMsgPool.length === 0) {
return;
}
const msgPool = {
textMsgPool: this.textMsgPool,
audioMsgPool: this.audioMsgPool,
};
this.subscribers.forEach((cb) => {
cb(msgPool);
});
this.textMsgPool = [];
this.audioMsgPool = [];
}
/**
*
* 暴露给外界订阅消息池数据,如果消息池有数据每3秒会统一执行一次订阅函数
* @param {function} cb
* @return {null}
* @memberof MessMonitor
*/
subscribeMsg(cb) {
if (!(cb instanceof Function)) {
console.log("订阅者非函数!");
return;
}
this.subscribers.push(cb);
}
/**
*
* 消息池的消息变化了触发,通知所有订阅者更新视图,
* @memberof MessMonitor
*/
msgChanged() {
this.subscribers.forEach((cb) => {
cb();
});
}
}
// 获取用户信息
function getInstance() {
Ajax.get("getUserInfo").then((res) => {
if (res && res.code === "0000" && res.data) {
const userInfo = {
...res.data,
cryptoPhone: res.data.mobile.replace(
/(\d{3})(\d+)(\d{4})/,
(a, b, c, d) => b + c.replace(/\d/g, "*") + d,
),
};
window.xPartner_msIns = new MessMonitor(`wss://${ENV}xiaoxi.fit.dmall.com:8106/wsServer?token=${userInfo.token}`, userInfo);
}
});
}
getInstance();