gRPC中拦截器的使用详解

 更新时间:2023年10月08日 09:05:05   作者:童话ing  
这篇文章主要介绍了gRPC中拦截器的使用详解,本次主要介绍在gRPC中使用拦截器,包括一元拦截器和流式拦截器,在拦截器中添加JWT认证,客户端登录之后会获得token,请求特定的API时候需要带上token才能访问,需要的朋友可以参考下

前言

本次主要介绍在gRPC中使用拦截器,包括一元拦截器和流式拦截器,在拦截器中添加JWT认证,客户端登录之后会获得token,请求特定的API时候需要带上token才能访问。由于代码中我们使用了grpc-gateway提供http服务,因此需要安装gateway的一些依赖

在本文中就简单介绍一下拦截器的使用,RPC请求分为一元RPC请求和流式RPC请求,所谓一元RPC指的就是请求和响应都是一次完成的,gRPC是基于HTTP2.0的,因此,一元RPC就可以看成客户端请求一次,服务端就响应一次。而流式RPC则是像流一样多次进行响应或者请求传输的。

相应地,拦截器分为服务端拦截和客户端拦截器,根据功能的不同又分为一元拦截器和流式拦截器。

服务端拦截器

1、一元拦截器:UnaryInterceptor

源码中写得比较清楚了, UnaryServerInterceptor 提供了一个钩子来拦截服务器上一元RPC的执行。 info 参数包含了这个RPC拦截器能操作的所有信息, handler 是服务方法实现的一个包装器,用于供拦截器中调用来处理RPC请求的逻辑。

// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
// server. Only one unary interceptor can be installed. The construction of multiple
// interceptors (e.g., chaining) can be implemented at the caller.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.unaryInt != nil {
			panic("The unary server interceptor was already set and may not be reset.")
		}
		o.unaryInt = i
	})
}

主要看其中包含的参数 UnaryServerInterceptor :

// UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server. info
// contains all the information of this RPC the interceptor can operate on. And handler // is the wrapper of the service method implementation. It is the responsibility of the // interceptor to invoke handler to complete the RPC.
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

其中几个参数解释为:

  • ctx context.Context:请求上下文
  • req interface{}:RPC 方法的请求参数
  • info *UnaryServerInfo:包含了RPC 方法的所有信息
  • handler UnaryHandler:RPC 方法真正执行逻辑

2、流式拦截器:StreamInterceptor

StreamServerInterceptor 提供了一个钩子来拦截服务器上流式RPC的执行。 info 包含拦截器可以操作的RPC的所有信息。 handler 是服务方法实现。拦截器负责调用 handler 来完成RPC。

// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
// server. Only one stream interceptor can be installed.
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		if o.streamInt != nil {
			panic("The stream server interceptor was already set and may not be reset.")
		}
		o.streamInt = i
	})
}

StreamServerInterceptor:

// StreamServerInterceptor provides a hook to intercept the execution of a streaming RPC //on the server.info contains all the information of this RPC the interceptor can operate //on. And handler is the service method implementation. It is the responsibility of the // interceptor to invoke handler to complete the RPC.
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

3、实现服务端拦截器

下面我们进行简单的实现:

// 一元拦截器
func Unary() grpc.UnaryServerInterceptor {
	return func(
		ctx context.Context,
		req interface{},
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler) (interface{}, error) {
		log.Print("---------> the unaryServerInterceptor: ", info.FullMethod)
		err := interceptor.authorize(ctx, info.FullMethod)//实现拦截验证的逻辑,自行实现,我这里是截取的完整代码中的,也可以参考我的gitee上的代码
		if err != nil {
			return nil, err
		}
		return handler(ctx, req)
	}
}
//stream拦截器
func Stream() grpc.StreamServerInterceptor {
	return func(
		srv interface{},
		stream grpc.ServerStream,
		info *grpc.StreamServerInfo,
		handler grpc.StreamHandler) error {
		log.Print("--------->the streamServerInterceptor: ", info.FullMethod)
		err := interceptor.authorize(stream.Context(), info.FullMethod)//实现拦截验证的逻辑,自行实现,我这里是截取的完整代码中的,也可以参考我的gitee上的代码
		if err != nil {
			return err
		}
		return handler(srv, stream)
	}
}

将上面实现的拦截器加入到Server中即可:

serverOptions := []grpc.ServerOption{
	grpc.UnaryInterceptor(Unary()),
	grpc.StreamInterceptor(Stream()),
}
grpcServer := grpc.NewServer(serverOptions...)

客户端拦截器

1、一元拦截器:WithUnaryInterceptor

// WithUnaryInterceptor returns a DialOption that specifies the interceptor for
// unary RPCs.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
	return newFuncDialOption(func(o *dialOptions) {
		o.unaryInt = f
	})
}

UnaryClientInterceptor

// UnaryClientInterceptor intercepts the execution of a unary RPC on the client.
// Unary interceptors can be specified as a DialOption, using
// WithUnaryInterceptor() or WithChainUnaryInterceptor(), when creating a
// ClientConn. When a unary interceptor(s) is set on a ClientConn, gRPC
// delegates all unary RPC invocations to the interceptor, and it is the
// responsibility of the interceptor to call invoker to complete the processing
// of the RPC.
//
// method is the RPC name. req and reply are the corresponding request and
// response messages. cc is the ClientConn on which the RPC was invoked. invoker
// is the handler to complete the RPC and it is the responsibility of the
// interceptor to call it. opts contain all applicable call options, including
// defaults from the ClientConn as well as per-call options.
//
// The returned error must be compatible with the status package.
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

2、流式拦截器:WithStreamInterceptor

// WithStreamInterceptor returns a DialOption that specifies the interceptor for
// streaming RPCs.
func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
	return newFuncDialOption(func(o *dialOptions) {
		o.streamInt = f
	})
}

StreamClientInterceptor 拦截客户端流 ClientStream 的创建,流式拦截器可以指定为一个 Dial 选项。当创建一个客户端连接时,使用 WithStreamInterceptor() 或者 WithChainStreamInterceptor() 。当一个流拦截器被设置在客户端连接中的时候,gRPC将所有流的创建都交给拦截器,拦截器调用streamer。

// StreamClientInterceptor intercepts the creation of a ClientStream. Stream
// interceptors can be specified as a DialOption, using WithStreamInterceptor()
// or WithChainStreamInterceptor(), when creating a ClientConn. When a stream
// interceptor(s) is set on the ClientConn, gRPC delegates all stream creations
// to the interceptor, and it is the responsibility of the interceptor to call
// streamer.
//
// desc contains a description of the stream. cc is the ClientConn on which the
// RPC was invoked. streamer is the handler to create a ClientStream and it is
// the responsibility of the interceptor to call it. opts contain all applicable
// call options, including defaults from the ClientConn as well as per-call
// options.
//
// StreamClientInterceptor may return a custom ClientStream to intercept all I/O
// operations. The returned error must be compatible with the status package.
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

参数解释:

  • ctx context.Context是请求上下文
  • desc *StreamDesc包含了流中描述的信息
  • cc *ClientConn是调用RPC的客户端连接
  • method string是请求的方法名
  • streamer Streamer是一个创建客户端流的处理器,流式拦截器中需要调用它。
  • opts ...CallOption包含了所有适用呼叫选项,包括来自于客户端连接的默认选项和所有的呼叫。

3、实现客户端拦截器

下面是具体实现:

func Unary() grpc.UnaryClientInterceptor {
	return func(
		ctx context.Context,
		method string,
		req, reply interface{},
		conn *grpc.ClientConn,
		invoker grpc.UnaryInvoker, //回调函数
		opts ...grpc.CallOption,
	) error {
		log.Printf("-------> unary interceptor: %s", method)
		if authMethods[method] { //如果是拦截的方法,在调用实际的rpc方法之前将token添加到context中,这个存储需要拦截的方法
			return invoker(attachToken(ctx), method, req, reply, conn, opts...) //attachToken为自己实现的客户端拦截请求之后附加token的方法
		}
		return invoker(ctx, method, req, reply, conn, opts...)
	}
}
// Stream returns a client interceptor to authenticate stream RPC
func Stream() grpc.StreamClientInterceptor {
	return func(
		ctx context.Context,
		desc *grpc.StreamDesc,
		conn *grpc.ClientConn,
		method string,
		streamer grpc.Streamer,
		opts ...grpc.CallOption,
	) (grpc.ClientStream, error) {
		log.Printf("-------> stream interceptor: %s", method)
		if interceptor.authMethods[method] {
			return streamer(attachToken(ctx), desc, conn, method, opts...)
		}
		return streamer(ctx, desc, conn, method, opts...)
	}
}

使用也比较简单,在Dial中添加即可:

conn2, err := grpc.Dial(
	Address, //地址
	transportOption, //SSL/TLS证书选择
	grpc.WithUnaryInterceptor(Unary()),
	grpc.WithStreamInterceptor(Stream()))
if err != nil {
	log.Fatal("cannot dial server: ", err)
}

到此这篇关于gRPC中拦截器的使用详解的文章就介绍到这了,更多相关gRPC中的拦截器内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • golang中数组与切片的区别详析

    golang中数组与切片的区别详析

    数组是固定长度,常量,切片长度是可以改变,所以是一个可变的数组,下面这篇文章主要给大家介绍了关于golang中数组与切片区别的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-11-11
  • Golang依赖注入工具digo的使用详解

    Golang依赖注入工具digo的使用详解

    这篇文章主要为大家详细介绍了Golang中依赖注入工具digo的使用,文中的示例代码讲解详细,具有一定的学习价值,感兴趣的小伙伴可以跟随小编一起学习一下
    2023-06-06
  • 一文带你熟悉Go语言中的分支结构

    一文带你熟悉Go语言中的分支结构

    这篇文章主要和大家分享一下Go语言中的分支结构(if - else-if - else、switch),文中的示例代码讲解详细,对我们学习Go语言有一定的帮助,需要的可以参考一下
    2022-11-11
  • 阿里云go开发环境搭建过程

    阿里云go开发环境搭建过程

    这篇文章主要介绍了阿里云go开发环境搭建过程,非常不错,具有参考借鉴价值,需要的朋友可以参考下
    2018-02-02
  • 一文带你了解Go语言如何解析JSON

    一文带你了解Go语言如何解析JSON

    本文将说明如何利用 Go 语言将 JSON 解析为结构体和数组,如果解析 JSON 的嵌入对象,如何将 JSON 的自定义属性名称映射到结构体,如何解析非结构化的 JSON 字符串
    2023-01-01
  • Go 数据结构之二叉树详情

    Go 数据结构之二叉树详情

    这篇文章主要介绍了 Go 数据结构之二叉树详情,二叉树是一种数据结构,在每个节点下面最多存在两个其他节点。即一个节点要么连接至一个、两个节点或不连接其他节点,下文基于GO语言展开二叉树结构详情,需要的朋友可以参考一下
    2022-05-05
  • Go错误和异常CGO fallthrough处理教程详解

    Go错误和异常CGO fallthrough处理教程详解

    这篇文章主要为大家介绍了Go错误和异常CGO fallthrough使用教程详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-08-08
  • 详解在Go语言单元测试中如何解决文件依赖问题

    详解在Go语言单元测试中如何解决文件依赖问题

    现如今的 Web 应用程序往往采用 RESTful API 接口形式对外提供服务,后端接口直接向前端返回 HTML 文件的情况越来越少,所以在程序中操作文件的场景也变少了,在编写单元测试时,文件就成了被测试代码的外部依赖,本文就来讲解下测试过程中如何解决文件外部依赖问题
    2023-08-08
  • golang实现并发数控制的方法

    golang实现并发数控制的方法

    下面小编就为大家分享一篇golang实现并发数控制的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2017-12-12
  • Golang Type关键字的使用

    Golang Type关键字的使用

    Type关键字在Go语言中作用很重要,比如定义结构体,接口,还可以自定义类型,定义类型别名等,具有一定的参考价值,感兴趣的可以了解一下
    2023-11-11

最新评论