在本章节中,我们将之前介绍HTTP Client&Server的示例修改为GRPC微服务,并演示如何使用GoFrame框架开发一个简单的GRPC服务端和客户端,并且为GRPC微服务增加链路跟踪特性。

本章节的示例代码位于:https://github.com/gogf/gf/tree/master/example/trace/grpc_with_db

目录结构

Protobuf

syntax = "proto3";

package user;

option go_package = "protobuf/user";

// User service for tracing demo.
service User {
  rpc Insert(InsertReq) returns (InsertRes) {}
  rpc Query(QueryReq) returns (QueryRes) {}
  rpc Delete(DeleteReq) returns (DeleteRes) {}
}

message InsertReq {
  string Name = 1; // v: required#Please input user name.
}
message InsertRes {
  int32 Id = 1;
}

message QueryReq {
  int32 Id = 1; // v: min:1#User id is required for querying.
}
message QueryRes {
  int32  Id = 1;
  string Name = 2;
}

message DeleteReq {
  int32 Id = 1; // v:min:1#User id is required for deleting.
}
message DeleteRes {}

使用gf gen pb命令编译该proto文件,将会生成对应的grpc接口文件和数据结构文件。

GRPC Server

package main

import (
	_ "github.com/gogf/gf/contrib/drivers/mysql/v2"
	_ "github.com/gogf/gf/contrib/nosql/redis/v2"
	"github.com/gogf/gf/contrib/registry/etcd/v2"
	"github.com/gogf/gf/example/trace/grpc_with_db/protobuf/user"

	"context"
	"fmt"
	"time"

	"github.com/gogf/gf/contrib/rpc/grpcx/v2"
	"github.com/gogf/gf/contrib/trace/otlpgrpc/v2"
	"github.com/gogf/gf/v2/database/gdb"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/os/gcache"
	"github.com/gogf/gf/v2/os/gctx"
)

type Controller struct {
	user.UnimplementedUserServer
}

const ( 
    serviceName = "otlp-grpc-server"
	endpoint    = "tracing-analysis-dc-bj.aliyuncs.com:8090"
	traceToken  = "******_******" 
)

func main() {
	grpcx.Resolver.Register(etcd.New("127.0.0.1:2379"))

	var ctx = gctx.New()
    shutdown, err := otlpgrpc.Init(serviceName, endpoint, traceToken)
	if err != nil {
		g.Log().Fatal(ctx, err)
	}
	defer shutdown()

    // Set ORM cache adapter with redis.
	g.DB().GetCache().SetAdapter(gcache.NewAdapterRedis(g.Redis()))

	s := grpcx.Server.New()
	user.RegisterUserServer(s.Server, &Controller{})
	s.Run()
}

// Insert is a route handler for inserting user info into database.
func (s *Controller) Insert(ctx context.Context, req *user.InsertReq) (res *user.InsertRes, err error) {
	result, err := g.Model("user").Ctx(ctx).Insert(g.Map{
		"name": req.Name,
	})
	if err != nil {
		return nil, err
	}
	id, _ := result.LastInsertId()
	res = &user.InsertRes{
		Id: int32(id),
	}
	return
}

// Query is a route handler for querying user info. It firstly retrieves the info from redis,
// if there's nothing in the redis, it then does db select.
func (s *Controller) Query(ctx context.Context, req *user.QueryReq) (res *user.QueryRes, err error) {
	err = g.Model("user").Ctx(ctx).Cache(gdb.CacheOption{
		Duration: 5 * time.Second,
		Name:     s.userCacheKey(req.Id),
		Force:    false,
	}).WherePri(req.Id).Scan(&res)
	if err != nil {
		return nil, err
	}
	return
}

// Delete is a route handler for deleting specified user info.
func (s *Controller) Delete(ctx context.Context, req *user.DeleteReq) (res *user.DeleteRes, err error) {
	err = g.Model("user").Ctx(ctx).Cache(gdb.CacheOption{
		Duration: -1,
		Name:     s.userCacheKey(req.Id),
		Force:    false,
	}).WherePri(req.Id).Scan(&res)
	return
}

func (s *Controller) userCacheKey(id int32) string {
	return fmt.Sprintf(`userInfo:%d`, id)
}

服务端代码简要说明:

1、首先,服务端需要通过jaeger.Init方法初始化Jaeger

2、可以看到,业务逻辑和之前HTTP示例项目完全一致,只是接入层修改为了GRPC协议。

3、我们仍然通过缓存适配器的方式注入Redis缓存:

g.DB().GetCache().SetAdapter(gcache.NewAdapterRedis(g.Redis()))

5、这里也是通过Cache方法启用ORM的缓存特性,之前已经做过介绍,这里不再赘述。

GRPC Client

package main

import (
	"github.com/gogf/gf/contrib/registry/etcd/v2"
	"github.com/gogf/gf/contrib/rpc/grpcx/v2"
	"github.com/gogf/gf/contrib/trace/otlpgrpc/v2"
	"github.com/gogf/gf/example/trace/grpc_with_db/protobuf/user"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/net/gtrace"
	"github.com/gogf/gf/v2/os/gctx"
)

const (
 	serviceName = "otlp-grpc-client"
	endpoint    = "tracing-analysis-dc-bj.aliyuncs.com:8090"
	traceToken  = "******_******" 
)

func main() {
	grpcx.Resolver.Register(etcd.New("127.0.0.1:2379"))

	var ctx = gctx.New()
    shutdown, err := otlpgrpc.Init(serviceName, endpoint, traceToken)
	if err != nil {
		g.Log().Fatal(ctx, err)
	}
	defer shutdown()

    StartRequests()
}

func StartRequests() {
	ctx, span := gtrace.NewSpan(gctx.New(), "StartRequests")
	defer span.End()

	client := user.NewUserClient(grpcx.Client.MustNewGrpcClientConn("demo"))

	// Baggage.
	ctx = gtrace.SetBaggageValue(ctx, "uid", 100)

	// Insert.
	insertRes, err := client.Insert(ctx, &user.InsertReq{
		Name: "john",
	})
	if err != nil {
		g.Log().Fatalf(ctx, `%+v`, err)
	}
	g.Log().Info(ctx, "insert id:", insertRes.Id)

	// Query.
	queryRes, err := client.Query(ctx, &user.QueryReq{
		Id: insertRes.Id,
	})
	if err != nil {
		g.Log().Errorf(ctx, `%+v`, err)
		return
	}
	g.Log().Info(ctx, "query result:", queryRes)

	// Delete.
	_, err = client.Delete(ctx, &user.DeleteReq{
		Id: insertRes.Id,
	})
	if err != nil {
		g.Log().Errorf(ctx, `%+v`, err)
		return
	}
	g.Log().Info(ctx, "delete id:", insertRes.Id)

	// Delete with error.
	_, err = client.Delete(ctx, &user.DeleteReq{
		Id: -1,
	})
	if err != nil {
		g.Log().Errorf(ctx, `%+v`, err)
		return
	}
	g.Log().Info(ctx, "delete id:", -1)
}

客户端代码简要说明:

1、首先,客户端也是需要通过jaeger.Init方法初始化Jaeger

2、客户端非常简单,内部初始化以及默认拦截器的设置已经由Katyusha框架封装好了,开发者只需要关心业务逻辑实现即可,

效果查看

启动服务端:

启动客户端:

这里客户端的执行最后报了一个错误,那是我们故意为之,目的是演示GRPC报错时的链路信息展示。我们打开jaeger查看一下链路跟踪信息:

可以看到本次请求涉及到两个服务:tracing-grpc-clienttracing-grpc-server,即客户端和服务端。整个请求链路涉及到17span,客户端5span,服务端12span,并且产生了2个错误。我们点击查看详情:

我们点击查看一下最后接口调用错误的span情况:

看起来像个参数校验错误,点击查看Events/Logs中的请求参数:

查看Process中的Log信息可以看到,是由于传递的参数为-1,不满足校验规则,因此在数据校验的时候报错返回了。

GRPC Client

由于ormredislogging组件在之前的章节中已经介绍过链路信息,因此我们这里主要介绍GRPC Client&Server的链路信息。

Attributes

Attribute/Tag说明
net.peer.ip请求的目标IP。
net.peer.port请求的目标端口。
rpc.grpc.status_codeGRPC的内部状态码,0表示成功,非0表示失败。
rpc.serviceRPC的服务名称,注意这里是RPC而不是GRPC,因为这里是通用定义,客户端支持多种RPC通信协议,GRPC只是其中一种。
rpc.methodRPC的方法名称。
rpc.systemRPC协议类型,如:grpc, thrift等。

Events/Logs

Event/Log说明
grpc.metadata.outgoingGRPC客户端请求提交的Metadata信息,可能会比较大。
grpc.request.baggageGRPC客户端请求提交的Baggage信息,用于服务间链路信息传递。
grpc.request.message

GRPC客户端请求提交的Message数据,可能会比较大,最大只记录512KB,如果超过该大小则忽略。仅对Unary请求类型有效。

grpc.response.messageGRPC客户端请求接收返回的的Message信息,可能会比较大。仅对Unary请求类型有效。

GRPC Server

Attributes

GRPC Server端的Attributes含义同GRPC Client,在同一请求中,打印的数据基本一致。

Events

GRPC Server端的EventsGRPC Client不同的是,在同一请求中,服务端接收到的metadatagrpc.metadata.incoming,其他同GRPC Client





Content Menu

  • No labels

10 Comments

  1. Katyusha微服务框架何时开放体验?

  2. neo

    server启动报错,需要别的依赖吗?

    2023-01-09 22:30:21.230 [DEBU] set default registry using etcd service, address: 127.0.0.1:2379
    2023-01-09 22:30:21.233 [DEBU] service register: &{Head: Deployment: Namespace: Name:demo Version: Endpoints:192.168.10.216:8805 Metadata:map[protocol:grpc]}
    {"level":"warn","ts":"2023-01-09T22:30:26.242+0800","logger":"etcd-client","caller":"v3@v3.5.4/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"etcd-endpoints://0xc00052e8c0/127.0.0.1:2379","attempt":0,"error":"rpc error: code = DeadlineExceeded desc = latest balancer error: last connection error: connection error: desc = \"transport: Error while dialing dial tcp 127.0.0.1:2379: connectex: No connection could be made because the target machine actively refused it.\""}
    2023-01-09 22:30:26.242 [FATA] etcd grant failed with keepalive ttl "10s": context deadline exceeded
    1. etcd grant failed with keepalive ttl "10s"
       1).  github.com/gogf/gf/contrib/registry/etcd/v2.(*Registry).Register
            D:/code/demo/gf-master/contrib/registry/etcd/etcd_registrar.go:22
       2).  github.com/gogf/gf/v2/net/gsvc.Register
            D:/code/demo/gf-master/net/gsvc/gsvc_registry.go:24
       3).  main.main
            D:/code/demo/gf-master/example/trace/grpc_with_db/server/main.go:40
    2. context deadline exceeded


    郭强 魏金虎 智刚  大佬们有没有空帮看下?

    1. etcd 服务没有启动

      1. neo

        现在了解了,不太明白demo中为啥要加入ectd,直接grpc 客户端连接grpc 服务端不是更直接明了,不依赖etcd就可以跑起来

        1. 用ECTD可以多个相同服务,直接用IP连只能连一个。简单来说可以负载

  3. 请教一个问题,数据类型是整数的时候。

    dao 层生成的是 uint 类型。protobuf 生成的是 uint32。不能统一起来,这个大家有遇到吗

    1. 可以用cli的dao配置typemapping.目前还没有发版需要自己clone master编译一个

    2. 因为grpc对类型的长度比较严格,所以生成的数据结构类型会不太一样,有两个方案解决:一个是统一使用固定长度类型如uint32,通过后续版本提供的TypeMapping特性在工具中配置,一种是在使用时手动将uint转换为uint32赋值给grpc的数据结构。

  4. lin

    在实例的仓库代码执行make pb 直接报错, 代码都没改过

    /tmp/gf-demo-grpc on  main! ⌚ 18:30:05
    $ make pb
    /opt/homebrew/bin/protoc --proto_path=/private/tmp/gf-demo-grpc/manifest/protobuf --go_out=paths=source_relative:/tmp/gf-demo-grpc/api --go-grpc_out=paths=source_relative:/tmp/gf-demo-grpc/api /tmp/gf-demo-grpc/manifest/protobuf/pb-test/aaa/aaa.proto
    /tmp/gf-demo-grpc/manifest/protobuf/pb-test/aaa/aaa.proto: File does not reside within any path specified using --proto_path (or -I).  You must specify a --proto_path which encompasses this file.  Note that the proto_path must be an exact prefix of the .proto file names -- protoc is too dumb to figure out when two paths (e.g. absolute and relative) are equivalent (it's harder than you think).
    exit status 1
    make: *** [pb] Error 1
    
    /tmp/gf-demo-grpc on  main! ⌚ 18:30:59
    $ cat manifest/protobuf/user/v1/user.proto
    // protoc --go_out=plugins=grpc:. *.proto
    
    syntax = "proto3";
    
    package user;
    
    option go_package = "github.com/gogf/gf-demo-grpc/api/user/v1";
    
    import "pbentity/user.proto";
    
    service User{
        rpc Create(CreateReq) returns (CreateRes) {}
        rpc GetOne(GetOneReq) returns (GetOneRes) {}
        rpc GetList(GetListReq) returns (GetListRes) {}
        rpc Delete(DeleteReq) returns (DeleteRes) {}
    }
    
    message CreateReq {
        string Passport = 1; // v: required
        string Password = 2; // v: required
        string Nickname = 3; // v: required
    }
    message CreateRes {}
    
    message GetOneReq {
        uint64 Id = 1; // v: required
    }
    message GetOneRes {
        pbentity.User User = 1;
    }
    
    message GetListReq {
        int32 Page = 1;
        int32 Size = 2;
    }
    message GetListRes {
        repeated pbentity.User Users = 1;
    }
    
    message DeleteReq {
        // v: min:1#
        // v: Please select the user to be deleted.
        uint64 Id = 1;
    }
    message DeleteRes {}
    1. lin

      郭强 大佬能不能看看怎么破,是bug还是我操作有问题