From 722ada3f0e814f2e8295f79270454bc4e8427cae Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Fri, 10 Jun 2022 18:55:21 +0800 Subject: [PATCH 01/15] batch build --- script/batch_start_all.sh | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/script/batch_start_all.sh b/script/batch_start_all.sh index aaa6177ae..6ccf74281 100644 --- a/script/batch_start_all.sh +++ b/script/batch_start_all.sh @@ -31,15 +31,7 @@ done echo "wait all start finish....." -for i in ${need_to_start_server_shell[*]}; do - chmod +x $i - ./$i & - if [ $? -ne 0 ]; then - exit -1 - fi -done - - +exit 0 success_num=0 for ((i = 0; i < ${#need_to_start_server_shell[*]}; i++)); do @@ -49,11 +41,11 @@ for ((i = 0; i < ${#need_to_start_server_shell[*]}; i++)); do echo ${build_pid_array[i]} " " $stat if [ $stat == 0 ] then - echo -e "${GREEN_PREFIX}${need_to_start_server_shell[$i]} successfully be built ${COLOR_SUFFIX}\n" + # echo -e "${GREEN_PREFIX}${need_to_start_server_shell[$i]} successfully be built ${COLOR_SUFFIX}\n" let success_num=$success_num+1 else - echo -e "${RED_PREFIX}${need_to_start_server_shell[$i]} build failed ${COLOR_SUFFIX}\n" + #echo -e "${RED_PREFIX}${need_to_start_server_shell[$i]} build failed ${COLOR_SUFFIX}\n" exit -1 fi done From a6a80a8b936fb6887cea2c72f09463cbe7992f50 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Fri, 10 Jun 2022 21:37:48 +0800 Subject: [PATCH 02/15] batch build --- script/batch_start_all.sh | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/script/batch_start_all.sh b/script/batch_start_all.sh index 6ccf74281..c2d78c509 100644 --- a/script/batch_start_all.sh +++ b/script/batch_start_all.sh @@ -21,12 +21,14 @@ echo "==========================================================">>../logs/openI echo "==========================================================">>../logs/openIM.log 2>&1 & build_pid_array=() - +idx=0 for i in ${need_to_start_server_shell[*]}; do chmod +x $i ./$i & build_pid=$! - build_pid_array[i]=$build_pid + echo "build_pid " $build_pid + build_pid_array[idx]=$build_pid + let idx=idx+1 done echo "wait all start finish....." From dc10c7e3e76ff1450b9b9158dfabe8b89454d3ee Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Sat, 11 Jun 2022 09:23:51 +0800 Subject: [PATCH 03/15] batch push --- internal/push/logic/push_to_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index 81fb024ca..a04bc9f8e 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -47,7 +47,7 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) { grpcCons = getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName) } //Online push message - log.Debug("test", pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String()) + log.Debug(pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String()) for _, v := range grpcCons { msgClient := pbRelay.NewOnlineMessageRelayServiceClient(v) reply, err := msgClient.OnlineBatchPushOneMsg(context.Background(), &pbRelay.OnlineBatchPushOneMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserIDList: []string{pushMsg.PushToUserID}}) From 952033f409427353319e4a3be9b7a44039f9f973 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Sat, 11 Jun 2022 09:26:49 +0800 Subject: [PATCH 04/15] compile --- internal/demo/register/onboarding_process.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/demo/register/onboarding_process.go b/internal/demo/register/onboarding_process.go index a41552962..56be5bc78 100644 --- a/internal/demo/register/onboarding_process.go +++ b/internal/demo/register/onboarding_process.go @@ -12,7 +12,6 @@ import ( commonPb "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" "context" - "encoding/json" "errors" "fmt" "github.com/golang/protobuf/proto" From f232ffb5cc46227f53aefada0c7932ff8d489186 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Sat, 11 Jun 2022 09:50:24 +0800 Subject: [PATCH 05/15] batch build --- script/batch_build_all_service.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/script/batch_build_all_service.sh b/script/batch_build_all_service.sh index 741da029d..bf780008f 100644 --- a/script/batch_build_all_service.sh +++ b/script/batch_build_all_service.sh @@ -41,7 +41,7 @@ for ((i = 0; i < ${#service_source_root[*]}; i++)); do echo "wait pid: " ${build_pid_array[i]} ${service_names[$i]} wait ${build_pid_array[i]} stat=$? - echo ${build_pid_array[i]} " " $stat + echo ${service_names[$i]} "pid: " ${build_pid_array[i]} "stat: " $stat if [ $stat == 0 ] then echo -e "${GREEN_PREFIX}${service_names[$i]} successfully be built ${COLOR_SUFFIX}\n" From 9216a085cf0f45cacc9432b3505625c5f9c3e07e Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Sat, 11 Jun 2022 10:42:26 +0800 Subject: [PATCH 06/15] log --- internal/msg_gateway/gate/batch_push.go | 4 ++++ internal/msg_gateway/gate/rpc_server.go | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/msg_gateway/gate/batch_push.go b/internal/msg_gateway/gate/batch_push.go index 8f556e4d5..ebb2fa05f 100644 --- a/internal/msg_gateway/gate/batch_push.go +++ b/internal/msg_gateway/gate/batch_push.go @@ -46,15 +46,18 @@ func (r *RPCServer) GetSingleUserMsgForPushPlatforms(operationID string, msgData func (r *RPCServer) GetSingleUserMsgForPush(operationID string, msgData *sdk_ws.MsgData, pushToUserID string, platformID int) []*sdk_ws.MsgData { userConn := ws.getUserConn(pushToUserID, platformID) if userConn == nil { + log.Debug(operationID, "userConn == nil") return []*sdk_ws.MsgData{msgData} } if msgData.Seq <= userConn.PushedMaxSeq { + log.Debug(operationID, "msgData.Seq <= userConn.PushedMaxSeq", msgData.Seq, userConn.PushedMaxSeq) return nil } msgList := r.GetSingleUserMsg(operationID, msgData.Seq, pushToUserID) if msgList == nil { + log.Debug(operationID, "GetSingleUserMsg msgList == nil", msgData.Seq, userConn.PushedMaxSeq) userConn.PushedMaxSeq = msgData.Seq return []*sdk_ws.MsgData{msgData} } @@ -65,6 +68,7 @@ func (r *RPCServer) GetSingleUserMsgForPush(operationID string, msgData *sdk_ws. userConn.PushedMaxSeq = v.Seq } } + log.Debug(operationID, "GetSingleUserMsg msgList len ", len(msgList), userConn.PushedMaxSeq) return msgList } diff --git a/internal/msg_gateway/gate/rpc_server.go b/internal/msg_gateway/gate/rpc_server.go index 86a25850d..0a8754257 100644 --- a/internal/msg_gateway/gate/rpc_server.go +++ b/internal/msg_gateway/gate/rpc_server.go @@ -207,10 +207,10 @@ func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.Online } log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms begin", req.MsgData.Seq, v, platformList) needPushMapList := r.GetSingleUserMsgForPushPlatforms(req.OperationID, req.MsgData, v, platformList) - log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms ", req.MsgData.Seq, v, platformList, len(needPushMapList)) + log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms end", req.MsgData.Seq, v, platformList, len(needPushMapList)) for platform, list := range needPushMapList { if list != nil { - log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms ", "userID: ", v, "platform: ", platform, "push msg num:", len(list)) + log.Debug(req.OperationID, "needPushMapList ", "userID: ", v, "platform: ", platform, "push msg num:", len(list)) for _, v := range list { req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v) } From dad40d60b38af79f46d8a938025ebce0937d7faa Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Sat, 11 Jun 2022 10:54:17 +0800 Subject: [PATCH 07/15] debug --- internal/msg_gateway/gate/batch_push.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/msg_gateway/gate/batch_push.go b/internal/msg_gateway/gate/batch_push.go index ebb2fa05f..34d7c0792 100644 --- a/internal/msg_gateway/gate/batch_push.go +++ b/internal/msg_gateway/gate/batch_push.go @@ -36,10 +36,10 @@ func (r *RPCServer) GenPullSeqList(currentSeq uint32, operationID string, userID func (r *RPCServer) GetSingleUserMsgForPushPlatforms(operationID string, msgData *sdk_ws.MsgData, pushToUserID string, platformIDList []int) map[int][]*sdk_ws.MsgData { user2PushMsg := make(map[int][]*sdk_ws.MsgData, 0) - for _, v := range platformIDList { - user2PushMsg[v] = r.GetSingleUserMsgForPush(operationID, msgData, pushToUserID, v) - log.Info(operationID, "GetSingleUserMsgForPush", msgData.Seq, pushToUserID, v, "len:", len(user2PushMsg[v])) - } + //for _, v := range platformIDList { + // user2PushMsg[v] = r.GetSingleUserMsgForPush(operationID, msgData, pushToUserID, v) + // log.Info(operationID, "GetSingleUserMsgForPush", msgData.Seq, pushToUserID, v, "len:", len(user2PushMsg[v])) + //} return user2PushMsg } From 9619584de563832e3f9681007865d503c66372ee Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Sat, 11 Jun 2022 11:27:58 +0800 Subject: [PATCH 08/15] debug --- internal/msg_gateway/gate/batch_push.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/internal/msg_gateway/gate/batch_push.go b/internal/msg_gateway/gate/batch_push.go index 34d7c0792..d166a4159 100644 --- a/internal/msg_gateway/gate/batch_push.go +++ b/internal/msg_gateway/gate/batch_push.go @@ -36,14 +36,16 @@ func (r *RPCServer) GenPullSeqList(currentSeq uint32, operationID string, userID func (r *RPCServer) GetSingleUserMsgForPushPlatforms(operationID string, msgData *sdk_ws.MsgData, pushToUserID string, platformIDList []int) map[int][]*sdk_ws.MsgData { user2PushMsg := make(map[int][]*sdk_ws.MsgData, 0) - //for _, v := range platformIDList { - // user2PushMsg[v] = r.GetSingleUserMsgForPush(operationID, msgData, pushToUserID, v) - // log.Info(operationID, "GetSingleUserMsgForPush", msgData.Seq, pushToUserID, v, "len:", len(user2PushMsg[v])) - //} + for _, v := range platformIDList { + user2PushMsg[v] = r.GetSingleUserMsgForPush(operationID, msgData, pushToUserID, v) + log.Info(operationID, "GetSingleUserMsgForPush", msgData.Seq, pushToUserID, v, "len:", len(user2PushMsg[v])) + } return user2PushMsg } func (r *RPCServer) GetSingleUserMsgForPush(operationID string, msgData *sdk_ws.MsgData, pushToUserID string, platformID int) []*sdk_ws.MsgData { + return []*sdk_ws.MsgData{msgData} + userConn := ws.getUserConn(pushToUserID, platformID) if userConn == nil { log.Debug(operationID, "userConn == nil") From ae6c09aae2854d35fa41ad7e0498356b5d3c7f58 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Sat, 11 Jun 2022 11:53:04 +0800 Subject: [PATCH 09/15] debug log --- internal/msg_gateway/gate/rpc_server.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/msg_gateway/gate/rpc_server.go b/internal/msg_gateway/gate/rpc_server.go index 0a8754257..5f5d3646c 100644 --- a/internal/msg_gateway/gate/rpc_server.go +++ b/internal/msg_gateway/gate/rpc_server.go @@ -212,13 +212,17 @@ func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.Online if list != nil { log.Debug(req.OperationID, "needPushMapList ", "userID: ", v, "platform: ", platform, "push msg num:", len(list)) for _, v := range list { + log.Debug(req.OperationID, "req.MsgData.MsgDataList begin", "len: ", len(req.MsgData.MsgDataList)) req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v) + log.Debug(req.OperationID, "req.MsgData.MsgDataList end", "len: ", len(req.MsgData.MsgDataList)) } + replyBytes, err := r.encodeWsData(req.MsgData, req.OperationID) if err != nil { log.Error(req.OperationID, "encodeWsData failed ", req.MsgData.String()) continue } + log.Debug(req.OperationID, "encodeWsData", "len: ", replyBytes.Len()) resultCode := sendMsgBatchToUser(userConnMap[platform], replyBytes.Bytes(), req, platform, v) if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) { tempT.OnlinePush = true From 8269e61163d5f85ec31e7adaa898896ffb96a902 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Sat, 11 Jun 2022 12:03:12 +0800 Subject: [PATCH 10/15] debug log --- internal/msg_gateway/gate/rpc_server.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/internal/msg_gateway/gate/rpc_server.go b/internal/msg_gateway/gate/rpc_server.go index 5f5d3646c..af6e62e16 100644 --- a/internal/msg_gateway/gate/rpc_server.go +++ b/internal/msg_gateway/gate/rpc_server.go @@ -254,7 +254,13 @@ func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.Online }, nil } func (r *RPCServer) encodeWsData(wsData *sdk_ws.MsgData, operationID string) (bytes.Buffer, error) { - msgBytes, _ := proto.Marshal(wsData) + log.Debug(operationID, "encodeWsData begin", wsData.String()) + msgBytes, err := proto.Marshal(wsData) + if err != nil { + log.NewError(operationID, "Marshal", err.Error()) + return bytes.Buffer{}, utils.Wrap(err, "") + } + log.Debug(operationID, "encodeWsData begin", wsData.String()) mReply := Resp{ ReqIdentifier: constant.WSPushMsg, OperationID: operationID, @@ -262,10 +268,10 @@ func (r *RPCServer) encodeWsData(wsData *sdk_ws.MsgData, operationID string) (by } var replyBytes bytes.Buffer enc := gob.NewEncoder(&replyBytes) - err := enc.Encode(mReply) + err = enc.Encode(mReply) if err != nil { log.NewError(operationID, "data encode err", err.Error()) - return bytes.Buffer{}, err + return bytes.Buffer{}, utils.Wrap(err, "") } return replyBytes, nil } From 85c0e1f330254e910746902d504d190fd1a431be Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Sat, 11 Jun 2022 12:09:50 +0800 Subject: [PATCH 11/15] debug log --- internal/msg_gateway/gate/rpc_server.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/msg_gateway/gate/rpc_server.go b/internal/msg_gateway/gate/rpc_server.go index af6e62e16..582b38e47 100644 --- a/internal/msg_gateway/gate/rpc_server.go +++ b/internal/msg_gateway/gate/rpc_server.go @@ -216,7 +216,8 @@ func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.Online req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v) log.Debug(req.OperationID, "req.MsgData.MsgDataList end", "len: ", len(req.MsgData.MsgDataList)) } - + log.Debug(req.OperationID, "r.encodeWsData no string") + log.Debug(req.OperationID, "r.encodeWsData ", req.MsgData.String()) replyBytes, err := r.encodeWsData(req.MsgData, req.OperationID) if err != nil { log.Error(req.OperationID, "encodeWsData failed ", req.MsgData.String()) From c171728749305d8662affb16de0f1601067ac851 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Sat, 11 Jun 2022 12:16:32 +0800 Subject: [PATCH 12/15] fix bug --- internal/msg_gateway/gate/rpc_server.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/internal/msg_gateway/gate/rpc_server.go b/internal/msg_gateway/gate/rpc_server.go index 582b38e47..2f2dbb7f3 100644 --- a/internal/msg_gateway/gate/rpc_server.go +++ b/internal/msg_gateway/gate/rpc_server.go @@ -211,11 +211,13 @@ func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.Online for platform, list := range needPushMapList { if list != nil { log.Debug(req.OperationID, "needPushMapList ", "userID: ", v, "platform: ", platform, "push msg num:", len(list)) - for _, v := range list { - log.Debug(req.OperationID, "req.MsgData.MsgDataList begin", "len: ", len(req.MsgData.MsgDataList)) - req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v) - log.Debug(req.OperationID, "req.MsgData.MsgDataList end", "len: ", len(req.MsgData.MsgDataList)) - } + //for _, v := range list { + // log.Debug(req.OperationID, "req.MsgData.MsgDataList begin", "len: ", len(req.MsgData.MsgDataList)) + // req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v) + // log.Debug(req.OperationID, "req.MsgData.MsgDataList end", "len: ", len(req.MsgData.MsgDataList)) + //} + req.MsgData.MsgDataList = list + log.Debug(req.OperationID, "r.encodeWsData no string") log.Debug(req.OperationID, "r.encodeWsData ", req.MsgData.String()) replyBytes, err := r.encodeWsData(req.MsgData, req.OperationID) From 5f813c6f704563763c2fd52964525fb10b4e3ac6 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Sat, 11 Jun 2022 12:43:54 +0800 Subject: [PATCH 13/15] fix bug --- internal/msg_gateway/gate/batch_push.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/msg_gateway/gate/batch_push.go b/internal/msg_gateway/gate/batch_push.go index d166a4159..9e08c4ef4 100644 --- a/internal/msg_gateway/gate/batch_push.go +++ b/internal/msg_gateway/gate/batch_push.go @@ -44,6 +44,7 @@ func (r *RPCServer) GetSingleUserMsgForPushPlatforms(operationID string, msgData } func (r *RPCServer) GetSingleUserMsgForPush(operationID string, msgData *sdk_ws.MsgData, pushToUserID string, platformID int) []*sdk_ws.MsgData { + msgData.MsgDataList = nil return []*sdk_ws.MsgData{msgData} userConn := ws.getUserConn(pushToUserID, platformID) From ef23fba147f8a057ca21eaf857991b649280123a Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Sat, 11 Jun 2022 12:50:20 +0800 Subject: [PATCH 14/15] log --- internal/msg_gateway/gate/rpc_server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/msg_gateway/gate/rpc_server.go b/internal/msg_gateway/gate/rpc_server.go index 2f2dbb7f3..119afa0dd 100644 --- a/internal/msg_gateway/gate/rpc_server.go +++ b/internal/msg_gateway/gate/rpc_server.go @@ -205,7 +205,7 @@ func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.Online for k, _ := range userConnMap { platformList = append(platformList, k) } - log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms begin", req.MsgData.Seq, v, platformList) + log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms begin", req.MsgData.Seq, v, platformList, req.MsgData.String()) needPushMapList := r.GetSingleUserMsgForPushPlatforms(req.OperationID, req.MsgData, v, platformList) log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms end", req.MsgData.Seq, v, platformList, len(needPushMapList)) for platform, list := range needPushMapList { From af612e4b9237d99f4343a7264f5b7ea8e2f943c9 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Sat, 11 Jun 2022 13:25:32 +0800 Subject: [PATCH 15/15] log --- internal/msg_gateway/gate/rpc_server.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/internal/msg_gateway/gate/rpc_server.go b/internal/msg_gateway/gate/rpc_server.go index 119afa0dd..a3a137f0e 100644 --- a/internal/msg_gateway/gate/rpc_server.go +++ b/internal/msg_gateway/gate/rpc_server.go @@ -211,14 +211,15 @@ func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.Online for platform, list := range needPushMapList { if list != nil { log.Debug(req.OperationID, "needPushMapList ", "userID: ", v, "platform: ", platform, "push msg num:", len(list)) - //for _, v := range list { - // log.Debug(req.OperationID, "req.MsgData.MsgDataList begin", "len: ", len(req.MsgData.MsgDataList)) - // req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v) - // log.Debug(req.OperationID, "req.MsgData.MsgDataList end", "len: ", len(req.MsgData.MsgDataList)) - //} - req.MsgData.MsgDataList = list + for _, v := range list { + log.Debug(req.OperationID, "req.MsgData.MsgDataList begin", "len: ", len(req.MsgData.MsgDataList), v.String()) + req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v) + log.Debug(req.OperationID, "req.MsgData.MsgDataList end", "len: ", len(req.MsgData.MsgDataList)) + } log.Debug(req.OperationID, "r.encodeWsData no string") + log.Debug(req.OperationID, "r.encodeWsData data0 list ", req.MsgData.MsgDataList[0].String()) + log.Debug(req.OperationID, "r.encodeWsData ", req.MsgData.String()) replyBytes, err := r.encodeWsData(req.MsgData, req.OperationID) if err != nil {