@ -2,6 +2,7 @@ package getcdv3
import (
"Open_IM/pkg/common/log"
"Open_IM/pkg/utils"
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
@ -38,12 +39,15 @@ func RegisterEtcd4Unique(schema, etcdAddr, myHost string, myPort int, serviceNam
//etcdAddr separated by commas
func RegisterEtcd ( schema , etcdAddr , myHost string , myPort int , serviceName string , ttl int ) error {
operationID := utils . OperationIDGenerator ( )
args := schema + " " + etcdAddr + " " + myHost + " " + serviceName + " " + utils . Int32ToString ( int32 ( myPort ) )
ttl = ttl * 3
cli , err := clientv3 . New ( clientv3 . Config {
Endpoints : strings . Split ( etcdAddr , "," ) , DialTimeout : 5 * time . Second } )
log . Info ( "" , "RegisterEtcd, " , schema , etcdAddr , myHost , myPort , serviceName , ttl )
log . Info ( operationID , "RegisterEtcd args: " , args , ttl )
if err != nil {
log . Error ( operationID , "clientv3.New failed " , args , ttl , err . Error ( ) )
return fmt . Errorf ( "create etcd clientv3 client failed, errmsg:%v, etcd addr:%s" , err , etcdAddr )
}
@ -51,8 +55,10 @@ func RegisterEtcd(schema, etcdAddr, myHost string, myPort int, serviceName strin
ctx , cancel := context . WithCancel ( context . Background ( ) )
resp , err := cli . Grant ( ctx , int64 ( ttl ) )
if err != nil {
log . Error ( operationID , "Grant failed " , err . Error ( ) , ctx , ttl )
return fmt . Errorf ( "grant failed" )
}
log . Info ( operationID , "Grant ok, resp ID " , resp . ID )
// schema:///serviceName/ip:port ->ip:port
serviceValue := net . JoinHostPort ( myHost , strconv . Itoa ( myPort ) )
@ -60,24 +66,26 @@ func RegisterEtcd(schema, etcdAddr, myHost string, myPort int, serviceName strin
//set key->value
if _ , err := cli . Put ( ctx , serviceKey , serviceValue , clientv3 . WithLease ( resp . ID ) ) ; err != nil {
log . Error ( operationID , "cli.Put failed " , err . Error ( ) , ctx , args , resp . ID )
return fmt . Errorf ( "put failed, errmsg:%v, key:%s, value:%s" , err , serviceKey , serviceValue )
}
//keepalive
kresp , err := cli . KeepAlive ( ctx , resp . ID )
if err != nil {
log . Error ( operationID , "KeepAlive failed " , err . Error ( ) , args , resp . ID )
return fmt . Errorf ( "keepalive failed, errmsg:%v, lease id:%d" , err , resp . ID )
}
//log.Info("", "RegisterEtcd ok ")
log . Info ( operationID , "RegisterEtcd ok " , args )
go func ( ) {
for {
select {
case pv , ok := <- kresp :
if ok == true {
log . Debug ( "" , "KeepAlive kresp ok" , pv )
log . Debug ( operationID , "KeepAlive kresp ok" , pv )
} else {
log . Error ( "" , "KeepAlive kresp failed" , pv )
log . Error ( operationID , "KeepAlive kresp failed" , pv )
t := time . NewTicker ( time . Duration ( ttl / 2 ) * time . Second )
for {
select {
@ -86,17 +94,16 @@ func RegisterEtcd(schema, etcdAddr, myHost string, myPort int, serviceName strin
ctx , _ := context . WithCancel ( context . Background ( ) )
resp , err := cli . Grant ( ctx , int64 ( ttl ) )
if err != nil {
log . Error ( "" , "Grant failed " , err . Error ( ) )
log . Error ( operationID , "Grant failed " , err . Error ( ) )
continue
}
if _ , err := cli . Put ( ctx , serviceKey , serviceValue , clientv3 . WithLease ( resp . ID ) ) ; err != nil {
log . Error ( "" , "etcd Put failed " , err . Error ( ) , serviceKey, serviceValue , resp . ID )
log . Error ( operationID , "etcd Put failed " , err . Error ( ) , args, "resp ID: " , resp . ID )
continue
} else {
log . Info ( "" , "etcd Put ok" , serviceKey , serviceValue , resp . ID )
log . Info ( operationID , "etcd Put ok " , args , "resp ID: " , resp . ID )
}
}
}
}