Trustkeeper 技术分享:Gokit gRPC Consul 实现微服务注册发现
Trustkeeper使用 Gokit 实现微服务化。在开发环境中我们使用 docker compose 管理服务。本篇文章讲分享使用 gokit 开发 Golang 微服务过程中 grpc 往 consul 数据中心注册和发现服务。
依赖 Docker 环境,Consul 单机集群架设请参考 wenweih/docker-vault-consul-cockroach-redis-example
系统中的微服务采用客户端发现模式 (Pattern: client-side service discovery),consul 作为服务注册发现组件。Gokit 内置支持 etcd, consul 和 zookeeper 。
gRPC 服务注册
consul 支持多种网络传输协议进行服务健康检查,如果你的服务是 http transport 暴露给外部,那么在 consul AgentServiceCheck 需选用 HPPT,例子。如果你使用 gRPC ,AgentServiceCheck 就要写 GRPC,例子。
首先把服务注册函数抽象出来:
package consul
import (
"os"
"fmt"
"time"
consulsd "github.com/go-kit/kit/sd/consul"
"github.com/hashicorp/consul/api"
"github.com/go-kit/kit/log"
)
type ConsulRegister struct {
ConsulAddress string // consul address
ServiceName string // service name
ServiceIP string
Tags []string // consul tags
ServicePort int //service port
DeregisterCriticalServiceAfter time.Duration
Interval time.Duration
}
func NewConsulRegister(consulAddress, serviceName, serviceIP string, servicePort int, tags []string ) *ConsulRegister {
return &ConsulRegister {
ConsulAddress: consulAddress,
ServiceName: serviceName,
ServiceIP: serviceIP,
Tags: tags,
ServicePort: servicePort,
DeregisterCriticalServiceAfter: time.Duration(1) * time.Minute,
Interval: time.Duration(10) * time.Second,
}
}
// https://github.com/ru-rocker/gokit-playground/blob/master/lorem-consul/register.go
// https://github.com/hatlonely/hellogolang/blob/master/sample/addservice/internal/grpcsr/consul_register.go
func (r *ConsulRegister) NewConsulGRPCRegister() (*consulsd.Registrar, error) {
var logger log.Logger
{
logger = log.NewLogfmtLogger(os.Stderr)
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
logger = log.With(logger, "caller", log.DefaultCaller)
}
consulConfig := api.DefaultConfig()
consulConfig.Address = r.ConsulAddress
consulClient, err := api.NewClient(consulConfig)
if err != nil {
return nil, err
}
client := consulsd.NewClient(consulClient)
reg := &api.AgentServiceRegistration{
ID: fmt.Sprintf("%v-%v-%v", r.ServiceName, r.ServiceIP, r.ServicePort),
Name: fmt.Sprintf("grpc.health.v1.%v", r.ServiceName),
Tags: r.Tags,
Port: r.ServicePort,
Address: r.ServiceIP,
Check: &api.AgentServiceCheck{
// 健康检查间隔
Interval: r.Interval.String(),
//grpc 支持,执行健康检查的地址,service 会传到 Health.Check 函数中
GRPC: fmt.Sprintf("%v:%v/%v", r.ServiceIP, r.ServicePort, r.ServiceName),
// 注销时间,相当于过期时间
DeregisterCriticalServiceAfter: r.DeregisterCriticalServiceAfter.String(),
},
}
return consulsd.NewRegistrar(client, reg, logger), nil
}
Consul 对 gokit gRPC 服务健康检查
根据 GRPC 健康检查协议:gRPC golang 官方提供的 grpc_health_v1 中 HealthServer interface 有两个函数:
Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error)
Watch(*HealthCheckRequest, Health_WatchServer) error
同样 Consul 对 grpc 做服务健康检查时也遵循该协议,我们在使用 gokit grpc 时,service 层要实现 HealthServer
接口中的声明两个函数。
// HealthImpl grpc 健康检查
// https://studygolang.com/articles/18737
type HealthImpl struct{}
// Check 实现健康检查接口,这里直接返回健康状态,这里也可以有更复杂的健康检查策略,比如根据服务器负载来返回
// https://github.com/hashicorp/consul/blob/master/agent/checks/grpc.go
// consul 检查服务器的健康状态,consul 用 google.golang.org/grpc/health/grpc_health_v1.HealthServer 接口,实现了对 grpc健康检查的支持,所以我们只需要实现先这个接口,consul 就能利用这个接口作健康检查了
func (h *HealthImpl) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
return &grpc_health_v1.HealthCheckResponse{
Status: grpc_health_v1.HealthCheckResponse_SERVING,
}, nil
}
// Watch HealthServer interface 有两个方法
// Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error)
// Watch(*HealthCheckRequest, Health_WatchServer) error
// 所以在 HealthImpl 结构提不仅要实现 Check 方法,还要实现 Watch 方法
func (h *HealthImpl) Watch(req *grpc_health_v1.HealthCheckRequest, w grpc_health_v1.Health_WatchServer) error {
return nil
}
Gokit Consul gRPC 微服务注册
func initGRPCHandler(endpoints endpoint.Endpoints, g *group.Group) {
options := defaultGRPCOptions(logger, tracer)
// Add your GRPC options here
grpcServer := grpc.NewGRPCServer(endpoints, options)
// call grpc listen should not use Loopback address
// https://stackoverflow.com/questions/43911793/cannot-connect-to-go-grpc-server-running-in-local-docker-container
grpcListener, err := net.Listen("tcp", common.LocalIP() + ":0")
if err != nil {
logger.Log("transport", "gRPC", "during", "Listen", "err", err)
os.Exit(1)
}
port := grpcListener.Addr().(*net.TCPAddr).Port
consulReg := consul.NewConsulRegister(conf.ConsulAddress, common.WalletManagementSrv, common.LocalIP(), port, []string{"wallet"})
register, err := consulReg.NewConsulGRPCRegister()
if err != nil {
logger.Log("Get consul grpc register error: ", err.Error())
os.Exit(1)
}
g.Add(func() error {
logger.Log("transport", "gRPC", "addr", grpcListener.Addr().String())
baseServer := grpc1.NewServer()
pb.RegisterWalletManagementServer(baseServer, grpcServer)
grpc_health_v1.RegisterHealthServer(baseServer, &service.HealthImpl{})
register.Register()
return baseServer.Serve(grpcListener)
}, func(error) {
register.Deregister()
grpcListener.Close()
})
}
在这里需要注意的是起 grpc listen 时不要用 [::1]:0 ,ip 要用机器的 Loopback IP ,在 Docker 跑你就会遇到这个问题。另外有一点是 consul 做服务健康检查时如果是 localhost,会直接 dial 127.0.0.1 地址。我们对同一个服务起两个实例,你会在 consul 服务注册状态:
tk_consul_server_bootstrap | 2019/06/15 10:01:27 [INFO] parsed scheme: ""
tk_consul_server_bootstrap | 2019/06/15 10:01:27 [INFO] scheme "" not registered, fallback to default scheme
tk_consul_server_bootstrap | 2019/06/15 10:01:27 [INFO] ccResolverWrapper: sending new addresses to cc: [{192.168.124.5:50495 0 <nil>}]
tk_consul_server_bootstrap | 2019/06/15 10:01:27 [INFO] ClientConn switching balancer to "pick_first"
tk_consul_server_bootstrap | 2019/06/15 10:01:27 [INFO] pickfirstBalancer: HandleSubConnStateChange: 0xc0003b7e40, CONNECTING
tk_consul_server_bootstrap | 2019/06/15 10:01:27 [INFO] pickfirstBalancer: HandleSubConnStateChange: 0xc0003b7e40, READY
tk_consul_server_bootstrap | 2019/06/15 10:01:27 [INFO] agent: Synced check "service:/services/walletmanagement/-192.168.124.5-50495"
...
通过 consul 的 Web UI 可以看到我们的两个服务注册成功了:
Gokit 服务发现和负载均衡
微服务集群灵活横向扩展,客户端调用时需要服务发现和通过负载均衡分发请求到微服务的实例。通常有两种方式:
- 在客户端和服务端之间增加一层代理(ngjix grpc 获 traefik grpc)客户端请求代理,然后请求被转发到服务端。
- 服务主动注册到 key/value (etcd, consul 或 zk) 服务,客户端通过数据中心拿到服务端连接信息,然后直连服务端。
回到 gikit ,我提倡每个独立的微服务都封装服务对外的客户端。
// https://github.com/go-kit/kit/blob/master/examples/apigateway/main.go
// https://github.com/go-kit/kit/blob/master/examples/profilesvc/client/client.go
// https://github.com/go-kit/kit/blob/e2d71a06a40aa95cb82ccd72e854893612c02db7/sd/consul/integration_test.go
func New(consulAddr string, logger log.Logger) (service.WalletManagementService, error) {
var (
consulTags = []string{"wallet"}
passingOnly = true
retryMax = 3
retryTimeout = 500 * time.Millisecond
)
consulClient, err := libconsule.NewClient(consulAddr)
if err != nil {
return nil, err
}
var (
sdClient = sdconsul.NewClient(consulClient)
registerSrvName = fmt.Sprintf("grpc.health.v1.%v", common.WalletManagementSrv)
instancer = sdconsul.NewInstancer(sdClient, logger, registerSrvName, consulTags, passingOnly)
endpoints = walletmanagementEndpoint.Endpoints{}
)
{
factory := factoryFor(walletmanagementEndpoint.MakeCreateChainEndpoint)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.CreateChainEndpoint = retry
}
return endpoints, nil
}
func factoryFor(makeEndpoint func(service.WalletManagementService) endpoint.Endpoint) sd.Factory {
return func(instance string) (endpoint.Endpoint, io.Closer, error) {
conn, err := grpc.Dial(instance, grpc.WithInsecure())
if err != nil {
return nil, nil, err
}
srv, err := newGRPCClient(conn, []grpctransport.ClientOption{})
if err != nil {
return nil, nil, err
}
endpoints := makeEndpoint(srv)
return endpoints, conn, err
}
}
...
func newGRPCClient(conn *grpc.ClientConn, options []grpc1.ClientOption) (service.WalletManagementService, error) {
var createChainEndpoint endpoint.Endpoint
{
createChainEndpoint = grpc1.NewClient(conn, "pb.WalletManagement", "CreateChain", encodeCreateChainRequest, decodeCreateChainResponse, pb.CreateChainReply{}, options...).Endpoint()
}
return endpoint1.Endpoints{CreateChainEndpoint: createChainEndpoint}, nil
}
// encodeCreateChainRequest is a transport/grpc.EncodeRequestFunc that converts a
// user-domain CreateChain request to a gRPC request.
func encodeCreateChainRequest(_ context.Context, request interface{}) (interface{}, error) {
r, ok := request.(endpoint1.CreateChainRequest)
if !ok {
return nil, fmt.Errorf("request interface to endpoint.CreateChainRequest type assertion error")
}
return &pb.CreateChainRequest{
Symbol: r.Symbol,
Bitid: r.Bit44ID,
Status: r.Status}, nil
}
// decodeCreateChainResponse is a transport/grpc.DecodeResponseFunc that converts
// a gRPC concat reply to a user-domain concat response.
func decodeCreateChainResponse(_ context.Context, reply interface{}) (interface{}, error) {
_, ok := reply.(*pb.CreateChainReply)
if !ok{
e := fmt.Errorf("pb CreateChainReply type assertion error")
return &endpoint1.CreateChainResponse{Err: e}, e
}
return &endpoint1.CreateChainResponse{}, nil
}
封装好了之后,客户端增加少许代码就能调用 grpc 服务端,调用代码片段:
package main
import (
"os"
"context"
log "github.com/go-kit/kit/log"
"github.com/caarlos0/env"
"trustkeeper-go/app/service/wallet_management/client"
)
type config struct {
ConsulAddr string `env:"consuladdr"`
}
func main() {
var logger log.Logger
{
logger = log.NewLogfmtLogger(os.Stderr)
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
logger = log.With(logger, "caller", log.DefaultCaller)
}
cfg := config{}
if err := env.Parse(&cfg); err != nil {
logger.Log("fail to parse env: ", err.Error())
os.Exit(1)
}
s, err := client.New(cfg.ConsulAddr, logger)
if err != nil {
logger.Log("service client error: ", err.Error())
}
ctx := context.Background()
for _, str := range []string{"aa", "bb", "ccc", "ee", "dd", "ff", "gg", "hh"} {
if err := s.CreateChain(ctx, str, "BIP44", true); err != nil {
logger.Log("CreateChain BTC error: ", err.Error())
}
}
}
多次请求,通过负载均衡把请求分发到了不同的实例,请看截图: