package discovery import ( "context" "errors" "github.com/google/uuid" clientv3 "go.etcd.io/etcd/client/v3" "log" "time" ) // 服务注册的通用接口 type Registrar interface { // 注册服务 Register(service Service) error // 注销服务 DeRegister() error } // 基于Etcd的服务发现中间件,实现Registrar type RegistrarEtcd struct { // 客户端信息 cli *clientv3.Client // 租约信息,基于租约来做健康监测 leaseID clientv3.LeaseID leaseTTL int64 // 续约响应channel leaseKeepAliveRespCh <-chan *clientv3.LeaseKeepAliveResponse } const leaseTTL = 5 // 初始化方法 // 实例化一个初始化了必要字段的数据 func NewRegistrarEtcd(endpoints []string) (*RegistrarEtcd, error) { // 一,没有etcd的地址,直接返回 if len(endpoints) == 0 { return nil, errors.New("endpoints is empty") } // 二,连接etcd cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: 3 * time.Second, }) if err != nil { return nil, err } // 三,初始化RegistrarEtcd // 设置必要的属性 re := &RegistrarEtcd{ cli: cli, leaseTTL: leaseTTL, } // 返回对象 return re, nil } // 实现接口方法 func (re *RegistrarEtcd) Register(service Service) error { //一,租约的申请 lease grant grantResp, err := re.cli.Grant(context.Background(), re.leaseTTL) if err != nil { return err } //grantResp.ID 就是租约ID re.leaseID = grantResp.ID //二,将服务标识:服务地址 put到etcd中,同时绑定租约 key := service.Name() + "-" + uuid.New().String() _, err = re.cli.Put(context.Background(), key, service.Addr(), clientv3.WithLease(re.leaseID)) if err != nil { return err } //三,租约续约 re.leaseKeepAliveRespCh, err = re.cli.KeepAlive(context.Background(), re.leaseID) if err != nil { return err } // 并发的接收续约的响应 go re.HandleKeepAliveResp() // log log.Printf("service %s(%s) was registed \n", service.Name(), service.Addr()) return nil } func (re *RegistrarEtcd) HandleKeepAliveResp() { // 从ch中接收内容即可 for resp := range re.leaseKeepAliveRespCh { log.Printf("service was Keepalive with %x\n", resp.ID) } } // 注销 // 撤销租约 func (re *RegistrarEtcd) DeRegister() error { // lease revoke if _, err := re.cli.Revoke(context.Background(), re.leaseID); err != nil { return err } // close etcd client if err := re.cli.Close(); err != nil { return err } return nil }