service discovery

main
han-joker 2 years ago
parent 849ec4a2e7
commit 85229f5342

@ -0,0 +1,62 @@
package discovery
import (
"context"
"errors"
clientv3 "go.etcd.io/etcd/client/v3"
"math/rand"
"time"
)
type Discovery interface {
// 获取某个服务器的一个实例地址
GetServiceAddr(serviceName string) (string, error)
// 监控某个服务的地址变化
WatchService(serviceName string) error
}
type DiscoveryEtcd struct {
cli *clientv3.Client
}
// 实例化的方法
func NewDiscoveryEtcd(endpoints []string) (*DiscoveryEtcd, error) {
// 一没有etcd的地址直接返回
if len(endpoints) == 0 {
return nil, errors.New("endpoints is empty")
}
// 二连接etcd
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 3 * time.Second,
})
if err != nil {
return nil, err
}
// 实例化
de := &DiscoveryEtcd{cli: cli}
return de, nil
}
func (de *DiscoveryEtcd) GetServiceAddr(serviceName string) (string, error) {
// 执行 Get 操作
getResp, err := de.cli.Get(context.Background(), serviceName, clientv3.WithPrefix())
if err != nil {
return "", err
}
// 处理查询到的结果
if len(getResp.Kvs) == 0 {
return "", errors.New("service not found")
}
// 采用随机的LBLoadBalance 算法
randIndex := rand.Intn(len(getResp.Kvs)) // [0, n)
addr := string(getResp.Kvs[randIndex].Value)
// 返回地址
return addr, nil
}

@ -15,7 +15,7 @@ type Registrar interface {
Register(service Service) error
// 注销服务
DeRegister(service Service) error
DeRegister() error
}
// 基于Etcd的服务发现中间件实现Registrar
@ -25,6 +25,8 @@ type RegistrarEtcd struct {
// 租约信息,基于租约来做健康监测
leaseID clientv3.LeaseID
leaseTTL int64
// 续约响应channel
leaseKeepAliveRespCh <-chan *clientv3.LeaseKeepAliveResponse
}
const leaseTTL = 5
@ -75,17 +77,35 @@ func (re *RegistrarEtcd) Register(service Service) error {
}
//三,租约续约
_, err = re.cli.KeepAlive(context.Background(), re.leaseID)
re.leaseKeepAliveRespCh, err = re.cli.KeepAlive(context.Background(), re.leaseID)
if err != nil {
return err
}
// 并发的接收续约的响应
go re.HandleKeepAliveResp()
// log
log.Printf("service %s(%s) was registed \n", service.Name(), service.Addr())
return nil
}
func (re *RegistrarEtcd) HandleKeepAliveResp() {
// 从ch中接收内容即可
for resp := range re.leaseKeepAliveRespCh {
log.Printf("service was Keepalive with %x\n", resp.ID)
}
}
// 注销
// 撤销租约
func (re *RegistrarEtcd) DeRegister() error {
// lease revoke
if _, err := re.cli.Revoke(context.Background(), re.leaseID); err != nil {
return err
}
// close etcd client
if err := re.cli.Close(); err != nil {
return err
}
func (re *RegistrarEtcd) DeRegister(service Service) error {
return nil
}

@ -1,6 +1,10 @@
package discovery
import "log"
import (
"log"
"os"
"os/signal"
)
// 服务的类型接口
type Service interface {
@ -42,5 +46,35 @@ func ServiceOrder(addr string) {
// 四:阻塞执行
log.Printf("service %s(%s) is running", orderService.Name(), orderService.Addr())
select {}
// 监控系统的终止信号,来撤销租约
chInt := make(chan os.Signal, 1)
signal.Notify(chInt, os.Interrupt) // 监控终止ctrl+c(interrupt)的信号
// 多路复用channel监控语句
select {
case <-chInt:
if err := re.DeRegister(); err != nil {
log.Fatalln(err)
}
log.Printf("service %s(%s) was deregisted", orderService.Name(), orderService.Addr())
}
}
func ServiceDriver() {
// 初始化服务发现中间件
de, err := NewDiscoveryEtcd([]string{"localhost:2379"})
if err != nil {
log.Fatalln(err)
}
// 发现order服务
servcieName := "order"
addr, err := de.GetServiceAddr(servcieName)
if err != nil {
log.Fatalln(err)
}
log.Printf("servie %s was discoveried on %s\n", servcieName, addr)
// 连接到目标服务在微服务架构中是grpc
// 调用(请求)目标服务的资源
}

@ -2,12 +2,16 @@ package discovery
import "testing"
func TestServiceOrder1(t *testing.T) {
func TestServiceOrder0(t *testing.T) {
ServiceOrder("localhost:8080")
}
func TestServiceOrder2(t *testing.T) {
func TestServiceOrder1(t *testing.T) {
ServiceOrder("localhost:8081")
}
func TestServiceOrder3(t *testing.T) {
func TestServiceOrder2(t *testing.T) {
ServiceOrder("localhost:8082")
}
func TestServiceDriver(t *testing.T) {
ServiceDriver()
}

Loading…
Cancel
Save