基于protobuf序列化协议实现Netty RPC分布式通信

Protobuf序列化

在RPC通信过程需要在网络上传递数据,那么数据大小会影响RPC通信的性能,通过数据序列化(编码)减少数据体积,并且数据反序列化(解码)也要足够高,才能保证RPC通信的整体性能。不同的协议在性能方面表现不同,而且有是否支持跨语言平台的特点。

常见的序列化协议有JDK、XML、JSON、Protobuf、Thrift等。各种序列化协议之间的差异和性能对比,请参见《序列化和反序列化》。Protobuf和Thrift都是高性能的、跨平台的序列化协议,Protobuf可以与任何一款RPC框架绑定使用,是一款纯粹的数据展示层的协议;而Thrift既包括序列化协议,也包括RPC框架。

本文主要使用Protobuf作为序列化协议,Netty实现RPC通信框架。

Protobuf编译环境安装

Protobuf协议需要安装编译环境,编译环境可以将同一份.protobuf文件编译成不同语言平台支持的客户端和服务端代码。

https://github.com/protocolbuffers/protobuf/releases下载Protobuf编译环境安装包,主要注意操作系统版本,本文以mac os系统为例。

1
wget https://github.com/protocolbuffers/protobuf/releases/download/v3.6.1/protoc-3.6.1-osx-x86_64.zip

解压缩后,得到protoc-3.6.1-osx-x86_64的绝对路径,放进环境变量中:

1
2
3
4
5
6
7
$ vi ~/.bashrc

export PROTOBUF_HOME=/soft/protoc-3.6.1-osx-x86_64
export PATH=$PATH:$PROTOBUF_HOME/bin
:wq

$ source ~/.bashrc

查看protobuf版本,验证环境变量配置成功。

1
2
$ protoc --version
libprotoc 3.6.1

添加JAVA依赖包

通过protoc命令编译生成的JAVA客户端和服务端JAVA代码需要依赖google的jar包,所以在JAVA开发环境中需要添加依赖。依赖包版本要与编译环境的版本相同,Maven管理的项目添加如下依赖:

1
2
3
4
5
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.6.1</version>
</dependency>

编写.proto文件

在磁盘上创建一个rpc_serialize_data.protobuf文件,.proto文件定义要生成的客户端和服务端代码规则,同一份文件可以编译生成不同语言的客户端和服务端代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//protobuf协议版本
syntax = "proto2";
//protobuf工作的命名空间
package com.hledu.ns.rpc.protobuf.vo;
//优化选项
//SPEED生成的代码运行效率高,但是生成的代码会占用更多空间
//CODE_SIZE与SPEED正好相反
//LITE_RUNTIME适合mobile app,缺少某些特性,例如反射
option optimize_for = SPEED;
//生成的java代码命名空间
option java_package = "com.hledu.ns.rpc.protobuf.vo";
//生成的java类名称
option java_outer_classname = "RpcVO";

//定义RpcVO中的内部类作为消息体
message RequestVO {
enum ParaType {
P1_TYPE=1;
P2_TYPE=2;
}
optional string serviceName =1;
optional string param = 2;
oneof para {
Para1 p1=3;
Para2 p2=4;
}
optional ParaType paraTye=5;
}

message Para1 {
optional string p1=1;
optional string p2=2;
}

message Para2 {
optional string a1=1;
optional string a2=2;
}

//定义RpcVO中的内部类作为消息体
message ResponseVO {
optional string status = 1;
optional string response = 2;
}

message中可以使用的字段标识

optional 数据传输非必填项,官方建议使用。

required 数据传输必填项,官方强烈不建议使用,因为生成并分发客户端代码后,再改成optional,客户端无法感知。

repeated 数据传输定义一个数组

oneof 数据传输需要选择其中一项

message中可以使用的数据类型

常用数据类型:string、bool、int32、int64、float、double、enum等数据类型,更多数据类型看这

生成Protobuf客户端和服务端代码RpcVO

在命令行执行:

1
protoc --java_out=src/java/main src/protobuf/rpc_serialize_data.protobuf

在项目包目录com.hledu.ns.rpc.protobuf.vo中生成了一个RpcVO.java文件,RpcVO代码中包含内部类RequestVO、Para1、Para2、ResponseVO和枚举ParaType。下一步将是在netty项目中使用RpcVO。

Netty集成Spring EL实现RPC通信

编写RpcServer代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* @program: netty_springboot
* @description: 基于protobuf实现rpc server
* @author: Forwardlee
* @create: 2018-10-27
**/
@Slf4j
@Component("rpcServerProtobuf")
public class RpcServer {

@Resource
private RpcServerConfig rpcServerConfig;

@PostConstruct
public void startRpcServer(){
new Thread(()->{
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childOption(ChannelOption.TCP_NODELAY,true)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new RpcServerChannelInitionalizer());
SocketAddress adress = new InetSocketAddress(rpcServerConfig.getHost(),rpcServerConfig.getPort2());
ChannelFuture future = bootstrap.bind(adress).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("RPC Server 启动出错",e);
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}).start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* @program: netty_springboot
* @description: 服务端channel pipeline初始化
* @author: Forwardlee
* @create: 2018-10-27
**/
public class RpcServerChannelInitionalizer extends ChannelInitializer<SocketChannel> {


@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//处理的拆包、粘包的解、编码器
pipeline.addLast("frameDecoder",new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4,0,4));
pipeline.addLast("frameEncoder",new LengthFieldPrepender(4));

//protobuf序列化编解码,使用RpcVO创建RPC通信传输的序列化对象,并使用protobuf协议进行编解码
pipeline.addLast(new ProtobufDecoder(RpcVO.RequestVO.getDefaultInstance()));
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new RpcServerChannelHandler());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* @program: netty_springboot
* @description: 接收rpc请求,spel接口处理protobuf序列数据,返回rpc序列化结果
* @author: Forwardlee
* @create: 2018-10-27
**/
@Slf4j
public class RpcServerChannelHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("RPC Server 收到消息:{}",msg);
try {
if (msg instanceof RpcVO.RequestVO) {
RpcVO.RequestVO requestVO = (RpcVO.RequestVO) msg;
String serviceName = requestVO.getServiceName();
Serializable param = requestVO.getParam();
SpelExpressionParser parser = new SpelExpressionParser();
StandardEvaluationContext evaluationContext = new StandardEvaluationContext();
evaluationContext.setBeanResolver(new BeanFactoryResolver(SpringContext.getContext()));
evaluationContext.setVariable("param", param);
Object responseVo = parser.parseExpression(serviceName).getValue(evaluationContext);
log.info(ctx.channel().remoteAddress()+"-RPC请求执行结果:{}",responseVo);
RpcVO.ResponseVO responseVO = (RpcVO.ResponseVO) responseVo;
ctx.writeAndFlush(responseVO);
}
} finally {
ReferenceCountUtil.release(msg);
ctx.close();
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

编写RpcClient代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* @program: netty_springboot
* @description: 使用protobuf实现rpc client --rpc请求
* @author: Forwardlee
* @create: 2018-10-27
**/
@Slf4j
@Component("rpcClientProtobuf")
public class RpcClient {

@Resource
private RpcClientConfig rpcClientConfig;

public RpcVO.ResponseVO submit(RpcVO.RequestVO requestVO){
EventLoopGroup clientGroup = new NioEventLoopGroup();
RpcClientChannelHandler rpcClientChannelHandler = new RpcClientChannelHandler(requestVO);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(clientGroup)
.option(ChannelOption.TCP_NODELAY,true)
.channel(NioSocketChannel.class)
.handler(new RpcClientChannelHandlerInitializer(rpcClientChannelHandler));
SocketAddress address = new InetSocketAddress(rpcClientConfig.getHost(),rpcClientConfig.getPort2());
ChannelFuture future = bootstrap.connect(address).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("RPC Client 连接 RPC Server 出错",e);
} finally {
clientGroup.shutdownGracefully();
}
return rpcClientChannelHandler.getResponseVO();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* @program: netty_springboot
* @description: channel初始化
* @author: Forwardlee
* @create: 2018-10-27
**/
public class RpcClientChannelHandlerInitializer extends ChannelInitializer<SocketChannel> {

private ChannelInboundHandlerAdapter channelInboundHandlerAdapter;

public RpcClientChannelHandlerInitializer(ChannelInboundHandlerAdapter channelInboundHandlerAdapter) {
this.channelInboundHandlerAdapter = channelInboundHandlerAdapter;
}

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

//处理的拆包、粘包的解、编码器
pipeline.addLast("frameDecoder",new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4,0,4));
pipeline.addLast("frameEncoder",new LengthFieldPrepender(4));

//protobuf序列化编解码
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new ProtobufDecoder(RpcVO.ResponseVO.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(channelInboundHandlerAdapter);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* @program: netty_springboot
* @description: 处理protobuf序列化数据的请求和相应
* @author: Forwardlee
* @create: 2018-10-27
**/
@Slf4j
public class RpcClientChannelHandler extends ChannelInboundHandlerAdapter {

private RpcVO.ResponseVO responseVO;

private RpcVO.RequestVO requestVO;

public RpcClientChannelHandler(RpcVO.RequestVO requestVO) {
this.requestVO = requestVO;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("收到RPC Server返回消息:{}",msg);
if (msg instanceof RpcVO.ResponseVO) {
this.responseVO = (RpcVO.ResponseVO) msg;
}
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("RPC Client 发送消息:{}",requestVO);
ctx.writeAndFlush(requestVO);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

public RpcVO.ResponseVO getResponseVO() {
return responseVO;
}
}
谢谢你请我吃糖果