diff --git a/internal/api/init.go b/internal/api/init.go index fe8ac1cd0..e5338fc57 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -26,7 +26,6 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/system/program" - "github.com/openimsdk/tools/utils/jsonutil" ) type Config struct { @@ -96,10 +95,7 @@ func Start(ctx context.Context, index int, cfg *Config) error { etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() - _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) - if err != nil { - return errs.WrapMsg(err, "etcd put err") - } + prommetrics.Register(ctx, etcdClient, prommetrics.APIKeyName, registerIP, prometheusPort) } else { prometheusPort, err = datautil.GetElemByIndex(cfg.API.Prometheus.Ports, index) if err != nil { diff --git a/internal/api/prometheus_discovery.go b/internal/api/prometheus_discovery.go index 6f17953ae..2bf9d495d 100644 --- a/internal/api/prometheus_discovery.go +++ b/internal/api/prometheus_discovery.go @@ -38,7 +38,7 @@ func (p *PrometheusDiscoveryApi) Enable(c *gin.Context) { } func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { - eResp, err := p.client.Get(c, prommetrics.BuildDiscoveryKey(key)) + eResp, err := p.client.Get(c, prommetrics.BuildDiscoveryKeyPrefix(key)) if err != nil { // Log and respond with an error if preparation fails. apiresp.GinError(c, errs.WrapMsg(err, "etcd get err")) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 1ac97eeb1..6507fa3be 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -26,7 +26,6 @@ import ( "syscall" "github.com/openimsdk/tools/discovery/etcd" - "github.com/openimsdk/tools/utils/jsonutil" "github.com/openimsdk/tools/utils/network" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" @@ -178,10 +177,7 @@ func (m *MsgTransfer) Start(index int, cfg *Config) error { etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() - _, err = etcdClient.Put(context.TODO(), prommetrics.BuildDiscoveryKey(prommetrics.MessageTransferKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) - if err != nil { - return errs.WrapMsg(err, "etcd put err") - } + prommetrics.Register(context.TODO(), etcdClient, prommetrics.MessageTransferKeyName, registerIP, prometheusPort) } else { prometheusPort, err = datautil.GetElemByIndex(cfg.MsgTransfer.Prometheus.Ports, index) if err != nil { diff --git a/pkg/common/prommetrics/discovery.go b/pkg/common/prommetrics/discovery.go index 8f03bc2ae..b45902d63 100644 --- a/pkg/common/prommetrics/discovery.go +++ b/pkg/common/prommetrics/discovery.go @@ -1,6 +1,13 @@ package prommetrics -import "fmt" +import ( + "context" + "fmt" + + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/utils/jsonutil" + clientv3 "go.etcd.io/etcd/client/v3" +) const ( APIKeyName = "api" @@ -17,8 +24,12 @@ type RespTarget struct { Labels map[string]string `json:"labels"` } -func BuildDiscoveryKey(name string) string { - return fmt.Sprintf("%s/%s/%s", "openim", "prometheus_discovery", name) +func BuildDiscoveryKeyPrefix(name string) string { + return fmt.Sprintf("%s/%s/%s/", "openim", "prometheus_discovery", name) +} + +func BuildDiscoveryKey(name string, host string, port int) string { + return fmt.Sprintf("%s/%s/%s/%s:%d", "openim", "prometheus_discovery", name, host, port) } func BuildDefaultTarget(host string, ip int) Target { @@ -29,3 +40,30 @@ func BuildDefaultTarget(host string, ip int) Target { }, } } + +func Register(ctx context.Context, etcdClient *clientv3.Client, rpcRegisterName string, registerIP string, prometheusPort int) error { + // create lease + leaseResp, err := etcdClient.Grant(ctx, 30) + if err != nil { + return errs.WrapMsg(err, "failed to create lease in etcd") + } + // release + keepAliveChan, err := etcdClient.KeepAlive(ctx, leaseResp.ID) + if err != nil { + return errs.WrapMsg(err, "failed to keep alive lease") + } + // release resp + go func() { + for range keepAliveChan { + } + }() + putOpts := []clientv3.OpOption{} + if leaseResp != nil { + putOpts = append(putOpts, clientv3.WithLease(leaseResp.ID)) + } + _, err = etcdClient.Put(ctx, BuildDiscoveryKey(rpcRegisterName, registerIP, prometheusPort), jsonutil.StructToJsonString(BuildDefaultTarget(registerIP, prometheusPort)), putOpts...) + if err != nil { + return errs.WrapMsg(err, "etcd put err") + } + return nil +} diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 3d4394c51..c6aaacc2f 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -29,7 +29,6 @@ import ( conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/tools/utils/jsonutil" "google.golang.org/grpc/status" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" @@ -118,10 +117,7 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() - _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) - if err != nil { - return errs.WrapMsg(err, "etcd put err") - } + prommetrics.Register(ctx, etcdClient, rpcRegisterName, registerIP, prometheusPort) } else { prometheusPort, err = datautil.GetElemByIndex(prometheusConfig.Ports, index) if err != nil {