|
|
package discovery
|
|
|
|
|
|
import (
|
|
|
"context"
|
|
|
"errors"
|
|
|
"github.com/google/uuid"
|
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
|
"log"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
// 服务注册的通用接口
|
|
|
type Registrar interface {
|
|
|
// 注册服务
|
|
|
Register(service Service) error
|
|
|
|
|
|
// 注销服务
|
|
|
DeRegister() error
|
|
|
}
|
|
|
|
|
|
// 基于Etcd的服务发现中间件,实现Registrar
|
|
|
type RegistrarEtcd struct {
|
|
|
// 客户端信息
|
|
|
cli *clientv3.Client
|
|
|
// 租约信息,基于租约来做健康监测
|
|
|
leaseID clientv3.LeaseID
|
|
|
leaseTTL int64
|
|
|
// 续约响应channel
|
|
|
leaseKeepAliveRespCh <-chan *clientv3.LeaseKeepAliveResponse
|
|
|
}
|
|
|
|
|
|
const leaseTTL = 5
|
|
|
|
|
|
// 初始化方法
|
|
|
// 实例化一个初始化了必要字段的数据
|
|
|
func NewRegistrarEtcd(endpoints []string) (*RegistrarEtcd, error) {
|
|
|
// 一,没有etcd的地址,直接返回
|
|
|
if len(endpoints) == 0 {
|
|
|
return nil, errors.New("endpoints is empty")
|
|
|
}
|
|
|
|
|
|
// 二,连接etcd
|
|
|
cli, err := clientv3.New(clientv3.Config{
|
|
|
Endpoints: endpoints,
|
|
|
DialTimeout: 3 * time.Second,
|
|
|
})
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
// 三,初始化RegistrarEtcd
|
|
|
// 设置必要的属性
|
|
|
re := &RegistrarEtcd{
|
|
|
cli: cli,
|
|
|
leaseTTL: leaseTTL,
|
|
|
}
|
|
|
|
|
|
// 返回对象
|
|
|
return re, nil
|
|
|
}
|
|
|
|
|
|
// 实现接口方法
|
|
|
func (re *RegistrarEtcd) Register(service Service) error {
|
|
|
//一,租约的申请 lease grant
|
|
|
grantResp, err := re.cli.Grant(context.Background(), re.leaseTTL)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
//grantResp.ID 就是租约ID
|
|
|
re.leaseID = grantResp.ID
|
|
|
|
|
|
//二,将服务标识:服务地址 put到etcd中,同时绑定租约
|
|
|
key := service.Name() + "-" + uuid.New().String()
|
|
|
_, err = re.cli.Put(context.Background(), key, service.Addr(), clientv3.WithLease(re.leaseID))
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
//三,租约续约
|
|
|
re.leaseKeepAliveRespCh, err = re.cli.KeepAlive(context.Background(), re.leaseID)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
// 并发的接收续约的响应
|
|
|
go re.HandleKeepAliveResp()
|
|
|
// log
|
|
|
log.Printf("service %s(%s) was registed \n", service.Name(), service.Addr())
|
|
|
return nil
|
|
|
}
|
|
|
func (re *RegistrarEtcd) HandleKeepAliveResp() {
|
|
|
// 从ch中接收内容即可
|
|
|
for resp := range re.leaseKeepAliveRespCh {
|
|
|
log.Printf("service was Keepalive with %x\n", resp.ID)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// 注销
|
|
|
// 撤销租约
|
|
|
func (re *RegistrarEtcd) DeRegister() error {
|
|
|
// lease revoke
|
|
|
if _, err := re.cli.Revoke(context.Background(), re.leaseID); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
// close etcd client
|
|
|
if err := re.cli.Close(); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
}
|