如何实现一个基于WebSocket的RPC组件

在基于WebSocket的应用中,数据通信往往是分离式的,也就是说,客户端发送请求后,服务器数据响应是完全脱离请求的,当然也允许服务器不做任何响应。RPC(Remote Procedure Call),即远程过程调用。这种模式下,请求与响应是强关联的,一个请求的响应结果只能是正确的期望结果、异常失败或超时,那么它在Cocos Creator中应用场景有哪些呢?当首屏加载进度需要依赖用户登录时,最好的方式是不是同步通信呢?当获取排行榜数据时,是不是严格满足请求和响应的这种一对一应答模式呢?……
下面实现一个伪RPC调用,之所以有个“伪”,是因为本人知识面有限,未发现JS/TS同步阻塞方式,伪同步也仅发现 async/await。

        private readonly invokeMap: Map<string, object> = new Map<string, object>();  //缓存RPC请求
        private readonly invokeOpCode: number = -999999;  //RPC调用前后端通信协议号
        /**
         * 异步RPC调用
         * @param invokeName 远程函数唯一标识
         * @param timeout 调用超时时间,毫秒
         * @param params 远程函数参数,其数组元素仅支持number、string、Uint8Array、ByteArray、Array<number|string|Uint8Array|ByteArray>
         * @returns 远程函数调用结果,仅支持number、string、ByteArray、Array<number|string>
         */
        public async request(invokeName: number | string, params: any[], timeout: number = 3000): Promise<any> {
            const id: string = StringUtils.randomString(16, true); //RPC调用唯一标识,须保证单个连接内为唯一
            let _this = this;
            let invokeObj = { response: { data: null, err: null }, cmd: invokeName, startTime: DateUtils.localTimeMillis };
            let promise: Promise<any> = new Promise<any>((resolve, reject) => {
                const timeoutId = setTimeout(() => {
                    _this.invokeMap.delete(id);
                    reject(new RemotingError(1, '操作超时'));
                }, timeout);
                invokeObj['proxy'] = new Proxy(invokeObj['response'], {
                    set(target, property, value, receiver) {
                        target[property] = value;
                        clearTimeout(timeoutId);
                        if (property === 'data') {
                            resolve(value);
                        } else {
                            reject(new RemotingError(2, value));
                        }
                        return true;
                    }
                });
            });
            this.invokeMap.set(id, invokeObj);
            let ba: ByteArray = new ByteArray();
            ba.writeInt(this.invokeOpCode); // 协议号
            ba.writeUTF(id); // 调用唯一标识
            if (Utils.isNumber(invokeName)) { // 远程方法名是数字
                ba.writeByte(1);
                ba.writeInt(Number(invokeName));
            } else { // 远程方法名是字符串
                ba.writeByte(2);
                ba.writeUTF(String(invokeName));
            }
            // 远程方法参数
            if (!params || params.length == 0) {
                ba.writeShort(0);
            } else {
                ba.writeShort(params.length);
                for (let i = 0; i < params.length; i++) {
                    this.writeValue(ba, params[i]); // 自定义数据序列化
                }
            }
            ba.position = 0;
            this.ws.send(ba.buffer);
            return await promise;
        }

WebSocket onmessage中做如下处理

    let ba: ByteArray = new ByteArray(ev.data);
    ba.position = 0;
    if (ba.readAvailable >= 4) {
        let opcode = ba.readInt(); //协议号
        let bytes: Uint8Array = ba.bytes.slice(4); //去除协议号后的数据
        if (opcode === this.invokeOpCode) { // 处理RPC响应
            let id: string = ba.readUTF();
            let retType: number = ba.readByte(); //返回成功与否,1或0为成功,-1为失败
            let obj: object = this.invokeMap.get(id);
            this.invokeMap.delete(id);
            if (obj) {
                //let cmd = obj['cmd'];
                //let startTime: number = obj['startTime'];
                //let endTime: number = DateUtils.localTimeMillis;
                //console.log('调用rpc方法[' + cmd + ']耗时:', (endTime - startTime));
                let proxy = obj['proxy'];
                if (proxy) {
                    if (retType == 0 || retType == 1) {
                        proxy.data = this.readValue(ba);
                    } else {
                        let err = this.readValue(ba);
                        if (Utils.isString(err)) {
                            proxy.err = err;
                        } else if (retType === -2 && err instanceof ByteArray) {
                            //处理错误码
                            let gameProxy: GameProxy = this.facade.retrieveProxy(GameProxy);
                            gameProxy.processError(err.bytes);
                        }
                    }
                }
            }
        } else if (opcode === pb.protos.MsgId.heart_beat) {
            let sc: pb.protos.ScHeartBeat = pb.protos.ScHeartBeat.decode(bytes);
            if (Utils.isNumber(sc.time)) {
                DateUtils.init(Number(sc.time));
            }
        } else {
            this.facade.sendMessage(new Message(opcode, bytes), true); //派发异步事件
        }
    }

相信聪明的大家都发现了,以上代码的核心原理是为通信协议分配一个连接内的唯一标识,哈哈哈,简单吧。下面是简单使用示例:

// 请求登录
ws.request(pb.protos.MsgId.login, [pb.protos.CsLogin.encode(param).finish()]).then(res => {
    let byteArr: ByteArray = res as ByteArray;
    let sc: pb.protos.ScLogin = pb.protos.ScLogin.decode(byteArr.bytes);
    console.log('登录成功');
    ……
}).catch(err => {
    console.log(`登录失败, ${err}`);
});

// RPC获取排行数据
// 第一个参数为远程方法名
// 第二个参数为远程方法参数,此处仅有一个参数,为proto序列化后的字节数据
ws.request(pb.protos.MsgId.rank_get_list, [pb.protos.CsRankList.encode(param).finish()]).then(res => {
    let byteArr: ByteArray = res as ByteArray;
    let sc: pb.protos.ScRankList = pb.protos.ScRankList.decode(byteArr.bytes);
    ……
}).catch(err => {
    console.log(err);
});

以上实现仍支持分离式请求响应,当然也支持服务器主动推送。