From 85229f534232697124b3afd1505aeb1495c3b4ed Mon Sep 17 00:00:00 2001 From: han-joker Date: Fri, 14 Apr 2023 21:25:56 +0800 Subject: [PATCH] service discovery --- discovery/discovery.go | 62 ++++++++++++++++++++++++++++++++++++++ discovery/register.go | 30 +++++++++++++++--- discovery/services.go | 38 +++++++++++++++++++++-- discovery/services_test.go | 10 ++++-- 4 files changed, 130 insertions(+), 10 deletions(-) create mode 100644 discovery/discovery.go diff --git a/discovery/discovery.go b/discovery/discovery.go new file mode 100644 index 0000000..35bf02d --- /dev/null +++ b/discovery/discovery.go @@ -0,0 +1,62 @@ +package discovery + +import ( + "context" + "errors" + clientv3 "go.etcd.io/etcd/client/v3" + "math/rand" + "time" +) + +type Discovery interface { + // 获取某个服务器的一个实例地址 + GetServiceAddr(serviceName string) (string, error) + // 监控某个服务的地址变化 + WatchService(serviceName string) error +} + +type DiscoveryEtcd struct { + cli *clientv3.Client +} + +// 实例化的方法 +func NewDiscoveryEtcd(endpoints []string) (*DiscoveryEtcd, error) { + // 一,没有etcd的地址,直接返回 + if len(endpoints) == 0 { + return nil, errors.New("endpoints is empty") + } + + // 二,连接etcd + cli, err := clientv3.New(clientv3.Config{ + Endpoints: endpoints, + DialTimeout: 3 * time.Second, + }) + if err != nil { + return nil, err + } + + // 实例化 + de := &DiscoveryEtcd{cli: cli} + + return de, nil +} + +func (de *DiscoveryEtcd) GetServiceAddr(serviceName string) (string, error) { + // 执行 Get 操作 + getResp, err := de.cli.Get(context.Background(), serviceName, clientv3.WithPrefix()) + if err != nil { + return "", err + } + + // 处理查询到的结果 + if len(getResp.Kvs) == 0 { + return "", errors.New("service not found") + } + + // 采用随机的LB,LoadBalance 算法 + randIndex := rand.Intn(len(getResp.Kvs)) // [0, n) + addr := string(getResp.Kvs[randIndex].Value) + + // 返回地址 + return addr, nil +} diff --git a/discovery/register.go b/discovery/register.go index b064d2d..00689ee 100644 --- a/discovery/register.go +++ b/discovery/register.go @@ -15,7 +15,7 @@ type Registrar interface { Register(service Service) error // 注销服务 - DeRegister(service Service) error + DeRegister() error } // 基于Etcd的服务发现中间件,实现Registrar @@ -25,6 +25,8 @@ type RegistrarEtcd struct { // 租约信息,基于租约来做健康监测 leaseID clientv3.LeaseID leaseTTL int64 + // 续约响应channel + leaseKeepAliveRespCh <-chan *clientv3.LeaseKeepAliveResponse } const leaseTTL = 5 @@ -75,17 +77,35 @@ func (re *RegistrarEtcd) Register(service Service) error { } //三,租约续约 - _, err = re.cli.KeepAlive(context.Background(), re.leaseID) + re.leaseKeepAliveRespCh, err = re.cli.KeepAlive(context.Background(), re.leaseID) if err != nil { return err } - + // 并发的接收续约的响应 + go re.HandleKeepAliveResp() // log log.Printf("service %s(%s) was registed \n", service.Name(), service.Addr()) - return nil } +func (re *RegistrarEtcd) HandleKeepAliveResp() { + // 从ch中接收内容即可 + for resp := range re.leaseKeepAliveRespCh { + log.Printf("service was Keepalive with %x\n", resp.ID) + } +} + +// 注销 +// 撤销租约 +func (re *RegistrarEtcd) DeRegister() error { + // lease revoke + if _, err := re.cli.Revoke(context.Background(), re.leaseID); err != nil { + return err + } + + // close etcd client + if err := re.cli.Close(); err != nil { + return err + } -func (re *RegistrarEtcd) DeRegister(service Service) error { return nil } diff --git a/discovery/services.go b/discovery/services.go index 6a343ae..02ab158 100644 --- a/discovery/services.go +++ b/discovery/services.go @@ -1,6 +1,10 @@ package discovery -import "log" +import ( + "log" + "os" + "os/signal" +) // 服务的类型接口 type Service interface { @@ -42,5 +46,35 @@ func ServiceOrder(addr string) { // 四:阻塞执行 log.Printf("service %s(%s) is running", orderService.Name(), orderService.Addr()) - select {} + // 监控系统的终止信号,来撤销租约 + chInt := make(chan os.Signal, 1) + signal.Notify(chInt, os.Interrupt) // 监控终止ctrl+c(interrupt)的信号 + // 多路复用channel监控语句 + select { + case <-chInt: + if err := re.DeRegister(); err != nil { + log.Fatalln(err) + } + log.Printf("service %s(%s) was deregisted", orderService.Name(), orderService.Addr()) + } +} + +func ServiceDriver() { + // 初始化服务发现中间件 + de, err := NewDiscoveryEtcd([]string{"localhost:2379"}) + if err != nil { + log.Fatalln(err) + } + + // 发现order服务 + servcieName := "order" + addr, err := de.GetServiceAddr(servcieName) + if err != nil { + log.Fatalln(err) + } + + log.Printf("servie %s was discoveried on %s\n", servcieName, addr) + + // 连接到目标服务,在微服务架构中,是grpc + // 调用(请求)目标服务的资源 } diff --git a/discovery/services_test.go b/discovery/services_test.go index da7501c..f76b87f 100644 --- a/discovery/services_test.go +++ b/discovery/services_test.go @@ -2,12 +2,16 @@ package discovery import "testing" -func TestServiceOrder1(t *testing.T) { +func TestServiceOrder0(t *testing.T) { ServiceOrder("localhost:8080") } -func TestServiceOrder2(t *testing.T) { +func TestServiceOrder1(t *testing.T) { ServiceOrder("localhost:8081") } -func TestServiceOrder3(t *testing.T) { +func TestServiceOrder2(t *testing.T) { ServiceOrder("localhost:8082") } + +func TestServiceDriver(t *testing.T) { + ServiceDriver() +}