Consul 服务注册和发现

安装 Consul

安装 consul-server 和 consul-client

1
2
3
git clone https://github.com/hashicorp/learn-consul-docker.git
cd learn-consul-docker/datacenter-deploy-service-discovery
docker-compose up -d

访问 http://127.0.0.1:8500 即可看到界面

Consul 注册,注销

完整代码示例:https://github.com/rexyan/Go-Microservice/tree/main/consul

新建 consul.go 文件,新增 NewConsul,RegisterServer,Deregister,GetOutboundIP 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package main

import (
"fmt"
"github.com/hashicorp/consul/api"
"net"
)

type consul struct {
client *api.Client
}

// RegisterServer 服务注册
func (c *consul) RegisterServer(serverName string, ip string, port int) error {
// 健康检查
healthCheck := &api.AgentServiceCheck{
GRPC: fmt.Sprintf("%s:%d", ip, port), // 这里一定是外部可以访问的地址
Timeout: "10s", // 超时时间
Interval: "10s", // 运行检查的频率
// 指定时间后自动注销不健康的服务节点
// 最小超时时间为1分钟,收获不健康服务的进程每30秒运行一次,因此触发注销的时间可能略长于配置的超时时间。
DeregisterCriticalServiceAfter: "1m",
}

srv := &api.AgentServiceRegistration{
ID: fmt.Sprintf("%s-%s-%d", serverName, ip, port), // 服务唯一ID
Name: serverName, // 服务名称
Tags: []string{"goland-grpc"}, // 为服务打标签
Address: ip,
Port: port,
Check: healthCheck,
}
return c.client.Agent().ServiceRegister(srv)
}

// Deregister 服务注销
func (c *consul) Deregister(serviceID string) error {
return c.client.Agent().ServiceDeregister(serviceID)
}

// NewConsul 获取 Consul 实例
func NewConsul(addr string) (*consul, error) {
config := api.DefaultConfig()
config.Address = addr
client, err := api.NewClient(config)
if err != nil {
return nil, err
}
return &consul{client: client}, nil
}

//GetOutboundIP 获取出口 IP
func GetOutboundIP() net.IP {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
return nil
}
defer conn.Close()

addr := conn.LocalAddr().(*net.UDPAddr)
return addr.IP
}

服务端实现

服务端在以前的基础上,需新增 grpc 健康检查,并将提供服务的地址注册到 consul,最后如果检测到 syscall.SIGTERM 或者 syscall.SIGINT 操作则在 Consul 注销注册的地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package main

import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"net"
"os"
"os/signal"
"server/pb"
"syscall"
)

type server struct {
pb.UnimplementedGreeterServer
}

func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
return &pb.HelloResponse{Reply: "Hi: " + in.Name}, nil
}

func main() {
// 创建 tcp
listen, err := net.Listen("tcp", ":8888")
if err != nil {
return
}
// 创建 grpc 服务
s := grpc.NewServer()
// 注册服务
pb.RegisterGreeterServer(s, &server{})
// 注册 grpc 健康检查
healthcheck := health.NewServer()
healthpb.RegisterHealthServer(s, healthcheck)

// 注册到 consul
// 1. 获取 consul 实例
consul, err := NewConsul("127.0.0.1:8500")
if err != nil {
fmt.Printf("NewConsul Error: %v\n", err)
return
}
// 2. 注册服务
ip := GetOutboundIP()
serverId := "test-server"
err = consul.RegisterServer(serverId, ip.String(), 8888)
if err != nil {
fmt.Printf("RegisterServer Error: %v\n", err)
return
}
// 启动服务
go func() {
err := s.Serve(listen)
if err != nil {
fmt.Printf("gRPC Serve Error: %v\n", err)
return
}
}()
// 3. 销毁服务
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGTERM, syscall.SIGINT)
<-quit
err = consul.Deregister(serverId)
if err != nil {
fmt.Printf("Deregister Error: %v\n", err)
return
}
}

启动成功后,可以在 Consul 控台看到服务已经注册

客户端实现

客户端只需更改 grpc.Dial 地址即可, 使用时候需要提前导入 import _ "github.com/mbobakov/grpc-consul-resolver"

grpc.Dial时直接使用类似 consul://[user:password@]127.0.0.127:8555/my-service?[healthy=]&[wait=]&[near=]&[insecure=]&[limit=]&[tag=]&[token=]的连接字符串来指定连接目标。

目前支持的参数:

Name 格式 介绍
tag string 根据标签筛选
healthy true/false 只返回通过所有健康检查的端点。默认值:false
wait time.ParseDuration 监控变更的等待时间。在这个时间段内,端点将强制刷新。默认值:继承agent的配置
insecure true/false 允许与consul进行不安全的通信。默认值:true
near string 按响应持续时间对端点排序。可与“limit”参数有效结合。默认值:”_agent”
limit int 限制服务的端点数。默认值:无限制
timeout time.ParseDuration Http-client超时。默认值:60s
max-backoff time.ParseDuration 重新连接到consul的最大后退时间。重连从10ms开始,成倍增长,直到到max-backoff。默认值:1s
token string Consul token
dc string consul数据中心。可选
allow-stale true/false 允许agent返回过期读的结果
require-consistent true/false 强制读取完全一致。这比较昂贵,但可以防止执行过期读操作

客户端实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"client/pb"
"context"
"fmt"
_ "github.com/mbobakov/grpc-consul-resolver"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"time"
)

func main() {
// 连接 server 端
conn, err := grpc.Dial(
"consul://127.0.0.1:8500/test-server?wait=14s&health=true",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
)

if err != nil {
fmt.Printf("grpc.Dial Error: %v\n", err)
return
}
defer conn.Close()

// 创建客户端
c := pb.NewGreeterClient(conn)

// 创建一个带超时时间的 ctx
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()

// 调用方法,返回值为 stream
response, err := c.SayHello(ctx, &pb.HelloRequest{Name: "张"})
if err != nil {
fmt.Printf("SayHello Error: %v\n", err)
return
}
fmt.Println(response.GetReply())
}