fix 单容器单进程模式中etcd注册的服务和单容器多进程时冲突问题

pull/141/head
ltf 4 years ago
parent 01b7fa4fa0
commit cc4f1f1f07

@ -58,6 +58,7 @@ kafka:
serverip: 0.0.0.0 serverip: 0.0.0.0
# endpoints 内部组件间访问的端点host名称访问时可以内部直接访问 host:port 来访问 # endpoints 内部组件间访问的端点host名称访问时可以内部直接访问 host:port 来访问
# 如果需要使用ip访问比如在单容器里面运行所有组件时可以直接把下列名称直接改为ip地址比如0.0.0.0
endpoints: endpoints:
api: openim_api api: openim_api
cmsapi: openim_cms_api cmsapi: openim_cms_api

@ -7,11 +7,12 @@ import (
"Open_IM/pkg/common/db/mysql_model/im_mysql_model" "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
http2 "Open_IM/pkg/common/http" http2 "Open_IM/pkg/common/http"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/utils"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/gin-gonic/gin"
"net/http" "net/http"
"github.com/gin-gonic/gin"
"github.com/spf13/viper"
) )
type ParamsLogin struct { type ParamsLogin struct {
@ -46,7 +47,7 @@ func Login(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"errCode": constant.PasswordErr, "errMsg": "Mobile phone number is not registered"}) c.JSON(http.StatusOK, gin.H{"errCode": constant.PasswordErr, "errMsg": "Mobile phone number is not registered"})
return return
} }
url := fmt.Sprintf("http://%s:10000/auth/user_token", utils.ServerIP) url := fmt.Sprintf("http://%s:10000/auth/user_token", viper.GetString("endpoints.api"))
openIMGetUserToken := api.UserTokenReq{} openIMGetUserToken := api.UserTokenReq{}
openIMGetUserToken.OperationID = params.OperationID openIMGetUserToken.OperationID = params.OperationID
openIMGetUserToken.Platform = params.Platform openIMGetUserToken.Platform = params.Platform

@ -8,12 +8,13 @@ import (
"Open_IM/pkg/common/db/mysql_model/im_mysql_model" "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
http2 "Open_IM/pkg/common/http" http2 "Open_IM/pkg/common/http"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/utils"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http"
"github.com/garyburd/redigo/redis" "github.com/garyburd/redigo/redis"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"net/http" "github.com/spf13/viper"
) )
type ParamsSetPassword struct { type ParamsSetPassword struct {
@ -48,7 +49,7 @@ func SetPassword(c *gin.Context) {
return return
} }
} }
url := fmt.Sprintf("http://%s:10000/auth/user_register", utils.ServerIP) url := fmt.Sprintf("http://%s:10000/auth/user_register", viper.GetString("endpoints.api"))
openIMRegisterReq := api.UserRegisterReq{} openIMRegisterReq := api.UserRegisterReq{}
openIMRegisterReq.OperationID = params.OperationID openIMRegisterReq.OperationID = params.OperationID
openIMRegisterReq.Platform = params.Platform openIMRegisterReq.Platform = params.Platform

@ -12,10 +12,12 @@ import (
"context" "context"
"encoding/gob" "encoding/gob"
"fmt" "fmt"
"github.com/golang/protobuf/proto"
"net" "net"
"strings" "strings"
"github.com/golang/protobuf/proto"
"github.com/spf13/viper"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -45,7 +47,8 @@ func (r *RPCServer) run() {
srv := grpc.NewServer() srv := grpc.NewServer()
defer srv.GracefulStop() defer srv.GracefulStop()
pbRelay.RegisterOnlineMessageRelayServiceServer(srv, r) pbRelay.RegisterOnlineMessageRelayServiceServer(srv, r)
err = getcdv3.RegisterEtcd4Unique(r.etcdSchema, strings.Join(r.etcdAddr, ","), ip, r.rpcPort, r.rpcRegisterName, 10) host := viper.GetString("endpoints.msg_gateway")
err = getcdv3.RegisterEtcd4Unique(r.etcdSchema, strings.Join(r.etcdAddr, ","), host, r.rpcPort, r.rpcRegisterName, 10)
if err != nil { if err != nil {
log.ErrorByKv("register push message rpc to etcd err", "", "err", err.Error()) log.ErrorByKv("register push message rpc to etcd err", "", "err", err.Error())
} }

@ -4,12 +4,14 @@ import (
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3" "Open_IM/pkg/grpc-etcdv3/getcdv3"
"Open_IM/pkg/proto/push" pbPush "Open_IM/pkg/proto/push"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"context" "context"
"google.golang.org/grpc"
"net" "net"
"strings" "strings"
"github.com/spf13/viper"
"google.golang.org/grpc"
) )
type RPCServer struct { type RPCServer struct {
@ -37,7 +39,8 @@ func (r *RPCServer) run() {
srv := grpc.NewServer() srv := grpc.NewServer()
defer srv.GracefulStop() defer srv.GracefulStop()
pbPush.RegisterPushMsgServiceServer(srv, r) pbPush.RegisterPushMsgServiceServer(srv, r)
err = getcdv3.RegisterEtcd(r.etcdSchema, strings.Join(r.etcdAddr, ","), ip, r.rpcPort, r.rpcRegisterName, 10) host := viper.GetString("endpoints.push")
err = getcdv3.RegisterEtcd(r.etcdSchema, strings.Join(r.etcdAddr, ","), host, r.rpcPort, r.rpcRegisterName, 10)
if err != nil { if err != nil {
log.ErrorByKv("register push module rpc to etcd err", "", "err", err.Error()) log.ErrorByKv("register push module rpc to etcd err", "", "err", err.Error())
} }

@ -16,6 +16,7 @@ import (
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"github.com/spf13/viper"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -88,10 +89,11 @@ func (rpc *rpcAuth) Run() {
//service registers with etcd //service registers with etcd
pbAuth.RegisterAuthServer(srv, rpc) pbAuth.RegisterAuthServer(srv, rpc)
err = getcdv3.RegisterEtcd(rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), utils.ServerIP, rpc.rpcPort, rpc.rpcRegisterName, 10) host := viper.GetString("endpoints.rpc_auth")
err = getcdv3.RegisterEtcd(rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), host, rpc.rpcPort, rpc.rpcRegisterName, 10)
if err != nil { if err != nil {
log.NewError("0", "RegisterEtcd failed ", err.Error(), log.NewError("0", "RegisterEtcd failed ", err.Error(),
rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), utils.ServerIP, rpc.rpcPort, rpc.rpcRegisterName) rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), host, rpc.rpcPort, rpc.rpcRegisterName)
return return
} }
log.NewInfo("0", "RegisterAuthServer ok ", rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), utils.ServerIP, rpc.rpcPort, rpc.rpcRegisterName) log.NewInfo("0", "RegisterAuthServer ok ", rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), utils.ServerIP, rpc.rpcPort, rpc.rpcRegisterName)

@ -14,11 +14,13 @@ import (
sdkws "Open_IM/pkg/proto/sdk_ws" sdkws "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"context" "context"
"google.golang.org/grpc"
"net" "net"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/spf13/viper"
"google.golang.org/grpc"
) )
type friendServer struct { type friendServer struct {
@ -56,7 +58,8 @@ func (s *friendServer) Run() {
defer srv.GracefulStop() defer srv.GracefulStop()
//User friend related services register to etcd //User friend related services register to etcd
pbFriend.RegisterFriendServer(srv, s) pbFriend.RegisterFriendServer(srv, s)
err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), ip, s.rpcPort, s.rpcRegisterName, 10) host := viper.GetString("endpoints.rpc_friend")
err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), host, s.rpcPort, s.rpcRegisterName, 10)
if err != nil { if err != nil {
log.NewError("0", "RegisterEtcd failed ", err.Error(), s.etcdSchema, strings.Join(s.etcdAddr, ","), ip, s.rpcPort, s.rpcRegisterName) log.NewError("0", "RegisterEtcd failed ", err.Error(), s.etcdSchema, strings.Join(s.etcdAddr, ","), ip, s.rpcPort, s.rpcRegisterName)
return return

@ -20,6 +20,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/spf13/viper"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -57,7 +58,8 @@ func (s *groupServer) Run() {
defer srv.GracefulStop() defer srv.GracefulStop()
//Service registers with etcd //Service registers with etcd
pbGroup.RegisterGroupServer(srv, s) pbGroup.RegisterGroupServer(srv, s)
err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), ip, s.rpcPort, s.rpcRegisterName, 10) host := viper.GetString("endpoints.rpc_group")
err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), host, s.rpcPort, s.rpcRegisterName, 10)
if err != nil { if err != nil {
log.NewError("0", "RegisterEtcd failed ", err.Error()) log.NewError("0", "RegisterEtcd failed ", err.Error())
return return
@ -642,9 +644,9 @@ func (s *groupServer) TransferGroupOwner(_ context.Context, req *pbGroup.Transfe
} }
func (s *groupServer) GetGroupById(_ context.Context, req *pbGroup.GetGroupByIdReq) (*pbGroup.GetGroupByIdResp, error) { func (s *groupServer) GetGroupById(_ context.Context, req *pbGroup.GetGroupByIdReq) (*pbGroup.GetGroupByIdResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String()) log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp := &pbGroup.GetGroupByIdResp{CMSGroup: &pbGroup.CMSGroup{ resp := &pbGroup.GetGroupByIdResp{CMSGroup: &pbGroup.CMSGroup{
GroupInfo: &open_im_sdk.GroupInfo{}, GroupInfo: &open_im_sdk.GroupInfo{},
}} }}
group, err := imdb.GetGroupById(req.GroupId) group, err := imdb.GetGroupById(req.GroupId)
if err != nil { if err != nil {
@ -682,7 +684,7 @@ func (s *groupServer) GetGroup(_ context.Context, req *pbGroup.GetGroupReq) (*pb
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupsByName error", req.String()) log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupsByName error", req.String())
return resp, http.WrapError(constant.ErrDB) return resp, http.WrapError(constant.ErrDB)
} }
nums, err := imdb.GetGroupsCountNum(db.Group{GroupName:req.GroupName}) nums, err := imdb.GetGroupsCountNum(db.Group{GroupName: req.GroupName})
if err != nil { if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupsCountNum error", err.Error()) log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupsCountNum error", err.Error())
return resp, http.WrapError(constant.ErrDB) return resp, http.WrapError(constant.ErrDB)
@ -707,7 +709,7 @@ func (s *groupServer) GetGroup(_ context.Context, req *pbGroup.GetGroupReq) (*pb
CreatorUserID: v.CreatorUserID, CreatorUserID: v.CreatorUserID,
}, },
GroupMasterName: groupMember.Nickname, GroupMasterName: groupMember.Nickname,
GroupMasterId: groupMember.UserID, GroupMasterId: groupMember.UserID,
}) })
} }
return resp, nil return resp, nil
@ -747,7 +749,7 @@ func (s *groupServer) GetGroups(_ context.Context, req *pbGroup.GetGroupsReq) (*
Status: v.Status, Status: v.Status,
CreatorUserID: v.CreatorUserID, CreatorUserID: v.CreatorUserID,
}, },
GroupMasterId: groupMember.UserID, GroupMasterId: groupMember.UserID,
GroupMasterName: groupMember.Nickname, GroupMasterName: groupMember.Nickname,
}) })
} }
@ -786,11 +788,11 @@ func (s *groupServer) OperateUserRole(_ context.Context, req *pbGroup.OperateUse
} }
func (s *groupServer) GetGroupMembersCMS(_ context.Context, req *pbGroup.GetGroupMembersCMSReq) (*pbGroup.GetGroupMembersCMSResp, error) { func (s *groupServer) GetGroupMembersCMS(_ context.Context, req *pbGroup.GetGroupMembersCMSReq) (*pbGroup.GetGroupMembersCMSResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "args:", req.String()) log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "args:", req.String())
resp := &pbGroup.GetGroupMembersCMSResp{} resp := &pbGroup.GetGroupMembersCMSResp{}
groupMembers, err := imdb.GetGroupMembersByGroupIdCMS(req.GroupId, req.UserName, req.Pagination.ShowNumber, req.Pagination.PageNumber) groupMembers, err := imdb.GetGroupMembersByGroupIdCMS(req.GroupId, req.UserName, req.Pagination.ShowNumber, req.Pagination.PageNumber)
if err != nil { if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(),"GetGroupMembersByGroupIdCMS Error", err.Error()) log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupMembersByGroupIdCMS Error", err.Error())
return resp, http.WrapError(constant.ErrDB) return resp, http.WrapError(constant.ErrDB)
} }
groupMembersCount, err := imdb.GetGroupMembersCount(req.GroupId, req.UserName) groupMembersCount, err := imdb.GetGroupMembersCount(req.GroupId, req.UserName)
@ -802,13 +804,13 @@ func (s *groupServer) GetGroupMembersCMS(_ context.Context, req *pbGroup.GetGrou
resp.MemberNums = groupMembersCount resp.MemberNums = groupMembersCount
for _, groupMember := range groupMembers { for _, groupMember := range groupMembers {
resp.Members = append(resp.Members, &open_im_sdk.GroupMemberFullInfo{ resp.Members = append(resp.Members, &open_im_sdk.GroupMemberFullInfo{
GroupID: req.GroupId, GroupID: req.GroupId,
UserID: groupMember.UserID, UserID: groupMember.UserID,
RoleLevel: groupMember.RoleLevel, RoleLevel: groupMember.RoleLevel,
JoinTime: groupMember.JoinTime.Unix(), JoinTime: groupMember.JoinTime.Unix(),
Nickname: groupMember.Nickname, Nickname: groupMember.Nickname,
FaceURL: groupMember.FaceURL, FaceURL: groupMember.FaceURL,
JoinSource: groupMember.JoinSource, JoinSource: groupMember.JoinSource,
}) })
} }
resp.Pagination = &open_im_sdk.ResponsePagination{ resp.Pagination = &open_im_sdk.ResponsePagination{
@ -870,7 +872,7 @@ func (s *groupServer) AddGroupMembersCMS(_ context.Context, req *pbGroup.AddGrou
if err := imdb.InsertIntoGroupMember(groupMember); err != nil { if err := imdb.InsertIntoGroupMember(groupMember); err != nil {
log.NewError(req.OperationId, utils.GetSelfFuncName(), "InsertIntoGroupMember failed", req.String()) log.NewError(req.OperationId, utils.GetSelfFuncName(), "InsertIntoGroupMember failed", req.String())
resp.Failed = append(resp.Failed, userId) resp.Failed = append(resp.Failed, userId)
} else { } else {
resp.Success = append(resp.Success, userId) resp.Success = append(resp.Success, userId)
chat.MemberInvitedNotification(req.OperationId, req.GroupId, req.OpUserId, "admin add", resp.Success) chat.MemberInvitedNotification(req.OperationId, req.GroupId, req.OpUserId, "admin add", resp.Success)
} }
@ -879,9 +881,8 @@ func (s *groupServer) AddGroupMembersCMS(_ context.Context, req *pbGroup.AddGrou
return resp, nil return resp, nil
} }
func (s *groupServer) GetUserReqApplicationList(_ context.Context, req *pbGroup.GetUserReqApplicationListReq) (*pbGroup.GetUserReqApplicationListResp, error) { func (s *groupServer) GetUserReqApplicationList(_ context.Context, req *pbGroup.GetUserReqApplicationListReq) (*pbGroup.GetUserReqApplicationListResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String()) log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp := &pbGroup.GetUserReqApplicationListResp{} resp := &pbGroup.GetUserReqApplicationListResp{}
groupRequests, err := imdb.GetUserReqGroupByUserID(req.UserID) groupRequests, err := imdb.GetUserReqGroupByUserID(req.UserID)
if err != nil { if err != nil {

@ -7,10 +7,12 @@ import (
"Open_IM/pkg/grpc-etcdv3/getcdv3" "Open_IM/pkg/grpc-etcdv3/getcdv3"
pbChat "Open_IM/pkg/proto/chat" pbChat "Open_IM/pkg/proto/chat"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"google.golang.org/grpc"
"net" "net"
"strconv" "strconv"
"strings" "strings"
"github.com/spf13/viper"
"google.golang.org/grpc"
) )
type rpcChat struct { type rpcChat struct {
@ -51,7 +53,8 @@ func (rpc *rpcChat) Run() {
//service registers with etcd //service registers with etcd
pbChat.RegisterChatServer(srv, rpc) pbChat.RegisterChatServer(srv, rpc)
err = getcdv3.RegisterEtcd(rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), utils.ServerIP, rpc.rpcPort, rpc.rpcRegisterName, 10) host := viper.GetString("endpoints.rpc_msg")
err = getcdv3.RegisterEtcd(rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), host, rpc.rpcPort, rpc.rpcRegisterName, 10)
if err != nil { if err != nil {
log.Error("", "", "register rpc get_token to etcd failed, err = %s", err.Error()) log.Error("", "", "register rpc get_token to etcd failed, err = %s", err.Error())
return return

@ -21,6 +21,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"github.com/spf13/viper"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -59,7 +60,8 @@ func (s *userServer) Run() {
defer srv.GracefulStop() defer srv.GracefulStop()
//Service registers with etcd //Service registers with etcd
pbUser.RegisterUserServer(srv, s) pbUser.RegisterUserServer(srv, s)
err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), ip, s.rpcPort, s.rpcRegisterName, 10) host := viper.GetString("endpoints.rpc_user")
err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), host, s.rpcPort, s.rpcRegisterName, 10)
if err != nil { if err != nil {
log.NewError("0", "RegisterEtcd failed ", err.Error(), s.etcdSchema, strings.Join(s.etcdAddr, ","), ip, s.rpcPort, s.rpcRegisterName) log.NewError("0", "RegisterEtcd failed ", err.Error(), s.etcdSchema, strings.Join(s.etcdAddr, ","), ip, s.rpcPort, s.rpcRegisterName)
return return

@ -5,6 +5,12 @@ import (
"net" "net"
) )
// Deprecated: This value is no longer recommended.
// 不在建议使用该值主要因为该值在每个组件部署时无法表示各自的实际ip建议使用viper读取目标配置
//
// 比如:
//
// 需要读取rpc_auth地址时viper.GetString("endpoints.rpc_auth")
var ServerIP = "" var ServerIP = ""
func init() { func init() {

Loading…
Cancel
Save