From 396729fc24fad1606f125ee5b2416752b564944c Mon Sep 17 00:00:00 2001
From: saeipi
Date: Sun, 24 Apr 2022 19:14:11 +0800
Subject: [PATCH] Watch and update services through a Watcher to avoid
 re-fetching the OnlineMessageRelay service on every push, reducing the
 number of requests to ETCD and resolving the warn below
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

{"level":"warn","ts":"2022-04-22T17:37:26.375+0800","logger":"etcd-client","caller":"v3@v3.5.2/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"etcd-endpoints://0xc00cf55c00/127.0.0.1:2379","attempt":0,"error":"rpc error: code = DeadlineExceeded desc = latest balancer error: last connection error: connection closed before server preface received"}
---
 internal/push/logic/init.go           |   7 ++++++-
 internal/push/logic/push_to_client.go |   5 ++---
 pkg/grpc-etcdv3/getcdv3/watcher.go    | 175 ++++++++++++++++++++++++++
 3 files changed, 183 insertions(+), 4 deletions(-)
 create mode 100644 pkg/grpc-etcdv3/getcdv3/watcher.go

diff --git a/internal/push/logic/init.go b/internal/push/logic/init.go
index d2d127091..596eb6ea1 100644
--- a/internal/push/logic/init.go
+++ b/internal/push/logic/init.go
@@ -10,11 +10,13 @@ import (
 	"Open_IM/pkg/common/config"
 	"Open_IM/pkg/common/constant"
 	"Open_IM/pkg/common/kafka"
+	"Open_IM/pkg/grpc-etcdv3/getcdv3"
 	"Open_IM/pkg/statistics"
 	"fmt"
 )
 
 var (
+	watcher      *getcdv3.Watcher
 	rpcServer    RPCServer
 	pushCh       PushConsumerHandler
 	pushTerminal []int32
@@ -23,7 +25,10 @@ var (
 )
 
 func Init(rpcPort int) {
-
+	watcher = getcdv3.NewWatcher()
+	if err := watcher.Run(); err != nil {
+		panic(err)
+	}
 	rpcServer.Init(rpcPort)
 	pushCh.Init()
 	pushTerminal = []int32{constant.IOSPlatformID, constant.AndroidPlatformID}
diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go
index 607d36db0..4580b4136 100644
--- a/internal/push/logic/push_to_client.go
+++ b/internal/push/logic/push_to_client.go
@@ -13,13 +13,11 @@ import (
 	"Open_IM/pkg/common/config"
 	"Open_IM/pkg/common/constant"
 	"Open_IM/pkg/common/log"
-	"Open_IM/pkg/grpc-etcdv3/getcdv3"
 	pbPush "Open_IM/pkg/proto/push"
 	pbRelay "Open_IM/pkg/proto/relay"
 	"Open_IM/pkg/utils"
 	"context"
 	"encoding/json"
-	"strings"
 )
 
 type OpenIMContent struct {
@@ -38,7 +36,8 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) {
 	var wsResult []*pbRelay.SingleMsgToUser
 	isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush)
 	log.Debug("Get msg from msg_transfer And push msg", pushMsg.OperationID, "PushData", pushMsg.String())
-	grpcCons := getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName)
+	//grpcCons := getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName)
+	grpcCons := watcher.GetAllConns()
 	//Online push message
 	log.Debug("test", pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String())
 	for _, v := range grpcCons {
diff --git a/pkg/grpc-etcdv3/getcdv3/watcher.go b/pkg/grpc-etcdv3/getcdv3/watcher.go
new file mode 100644
index 000000000..7abac7c2a
--- /dev/null
+++ b/pkg/grpc-etcdv3/getcdv3/watcher.go
@@ -0,0 +1,175 @@
+package getcdv3
+
+import (
+	cfg "Open_IM/pkg/common/config"
+	"context"
+ "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/grpc" + "strings" + "sync" + "time" +) + +type Watcher struct { + rwLock sync.RWMutex + client *clientv3.Client + kv clientv3.KV + watcher clientv3.Watcher + catalog string + kvs map[string]string + allService []string + schema string + address []string +} + +func NewWatcher() (w *Watcher) { + var ( + catalog string + config clientv3.Config + client *clientv3.Client + kv clientv3.KV + watcher clientv3.Watcher + err error + ) + catalog = cfg.Config.Etcd.EtcdSchema + ":///" + cfg.Config.RpcRegisterName.OpenImOnlineMessageRelayName + + config = clientv3.Config{ + Endpoints: cfg.Config.Etcd.EtcdAddr, // 集群地址 + DialTimeout: time.Duration(5000) * time.Millisecond, // 连接超时 + } + // 1、建立连接 + if client, err = clientv3.New(config); err != nil { + panic(err.Error()) + return + } + // 2、得到KV和观察者 + kv = clientv3.NewKV(client) + watcher = clientv3.NewWatcher(client) + + w = &Watcher{ + client: client, + kv: kv, + watcher: watcher, + catalog: catalog, + kvs: make(map[string]string), + allService: make([]string, 0), + schema: cfg.Config.Etcd.EtcdSchema, + address: cfg.Config.Etcd.EtcdAddr, + } + return +} + +// 监听变化 +func (w *Watcher) Run() (err error) { + var ( + resp *clientv3.GetResponse + kvpair *mvccpb.KeyValue + watchStartRevision int64 + watchChan clientv3.WatchChan + watchResp clientv3.WatchResponse + watchEvent *clientv3.Event + key string + value string + ) + + // 1、get目录下的所有键值对,并且获知当前集群的revision + if resp, err = w.kv.Get(context.TODO(), w.catalog, clientv3.WithPrefix()); err != nil { + return + } + for _, kvpair = range resp.Kvs { + key = string(kvpair.Key) + value = string(kvpair.Value) + w.kvs[key] = value + } + w.updateServices() + + // 2、从该revision向后监听变化事件 + go func() { + // 从GET时刻的后续版本开始监听变化 + watchStartRevision = resp.Header.Revision + 1 + // 监听目录的后续变化 + watchChan = w.watcher.Watch(context.TODO(), w.catalog, clientv3.WithRev(watchStartRevision), clientv3.WithPrefix()) + // 处理监听事件 + for watchResp = range watchChan { + for _, watchEvent = range watchResp.Events { + switch watchEvent.Type { + case mvccpb.PUT: // 任务保存事件 + w.rwLock.Lock() + + key = string(watchEvent.Kv.Key) + value = string(watchEvent.Kv.Value) + w.kvs[key] = value + w.updateServices() + + w.rwLock.Unlock() + case mvccpb.DELETE: // 任务被删除了 + w.rwLock.Lock() + + key = string(watchEvent.Kv.Key) + delete(w.kvs, key) + w.updateServices() + + w.rwLock.Unlock() + } + } + } + }() + return +} + +func (w *Watcher) updateServices() { + var ( + maps map[string]string + key string + serviceName string + ) + w.allService = make([]string, 0) + maps = make(map[string]string) + for key, _ = range w.kvs { + serviceName = getServiceName(key) + if _, ok := maps[serviceName]; ok == true { + continue + } + maps[serviceName] = serviceName + w.allService = append(w.allService, serviceName) + } +} + +func getServiceName(key string) (name string) { + var ( + index int + str string + ) + index = strings.LastIndex(key, "///") + str = key[index+len("///"):] + index = strings.Index(str, "/") + name = str[:index] + return +} + +func (w *Watcher) GetAllConns() (conns []*grpc.ClientConn) { + var ( + services []string + service string + clientConn *grpc.ClientConn + ) + w.rwLock.RLock() + services = w.allService + w.rwLock.RUnlock() + + conns = make([]*grpc.ClientConn, 0) + for _, service = range services { + clientConn = GetConn(w.schema, strings.Join(w.address, ","), service) + if clientConn == nil { + continue + } + conns = append(conns, clientConn) + } + return +}