Netty Socks代理服务器源码分析
文中出现的代码片段可于 netty example 中的
socksproxy 找到。
主要配合 rfc1928 讲解 Socks5 的实现。
程序入口
main 函数位于 io.netty.example.socksproxy.SocksServer.
public final class SocksServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "1080"));
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new SocksServerInitializer());
b.bind(PORT).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
这里主要启动了一个默认监听1080端口的服务器, 对于每一个 accept 进来的请求, 将由 SocksServerInitializer
往 pipeline 里添加的 handler 处理。
// io.netty.example.socksproxy.SocksServerInitializer
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
// 开始主要就是这三个handler
new LoggingHandler(LogLevel.DEBUG), // 作用为输出日志,因此不用深究
new SocksPortUnificationServerHandler(),
SocksServerHandler.INSTANCE);
}
客户端发送 Socks5 初始请求
于 RFC 文档中可以得知此请求是这样的:
The client connects to the server, and sends a version identifier/method selection message:
+----+----------+----------+ |VER | NMETHODS | METHODS | +----+----------+----------+ | 1 | 1 | 1 to 255 | +----+----------+----------+
上面的数字表示这个协议包的各字段字节长度。由于是 Socks5 , VER 是 0x05。
简单起见, 一个不需要认证的请求会包含三个字节,分别是 0x05、0x01、0x00。
第二个字节是 METHODS 的长度,第三个字节 NOAUTH 协议规定为 0x00。
接下来看 netty 中如何处理这个请求,前面提到 SocksPortUnificationServerHandler
将负责处理这个请求。处理逻辑在其 decode 方法:
// 篇幅原因,删掉部分代码,具体请参阅io.netty.handler.codec.socksx.SocksPortUnificationServerHandler
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
ChannelPipeline p = ctx.pipeline();
final byte versionVal = in.getByte(readerIndex); // 1
SocksVersion version = SocksVersion.valueOf(versionVal);
switch (version) {
case SOCKS5:
logKnownVersion(ctx, version);
p.addAfter(ctx.name(), null, socks5encoder); // 2
p.addAfter(ctx.name(), null, new Socks5InitialRequestDecoder()); // 3
break;
}
p.remove(this); // 4
}
可见 SocksPortUnificationServerHandler
主要用来判断 socks 版本号。
具体步骤可分为:
- 先获取了一个字节,用来判断 Socks 版本号
- 往 pipeline 加入 Socks5 编码器
- 添加
Socks5InitialRequestDecoder
对该请求解码 SocksPortUnificationServerHandler
已完成任务,从 pipeline 中移除
Socks5InitialRequestDecoder 解码请求
Socks5InitialRequestDecoder
实现了 ReplayingDecoder
可以用一种状态机式的方式解码二进制的请求:
// 仍然删掉了部分异常检测、出错处理的代码
// io.netty.handler.codec.socksx.v5.Socks5InitialRequestDecoder
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
switch (state()) {
case INIT: {
final byte version = in.readByte(); // 1
final int authMethodCnt = in.readUnsignedByte(); // 2
final Socks5AuthMethod[] authMethods = new Socks5AuthMethod[authMethodCnt];
for (int i = 0; i < authMethodCnt; i++) {
authMethods[i] = Socks5AuthMethod.valueOf(in.readByte());
}
out.add(new DefaultSocks5InitialRequest(authMethods)); // 3
checkpoint(State.SUCCESS); // 4
}
case SUCCESS: {
int readableBytes = actualReadableBytes();
if (readableBytes > 0) {
out.add(in.readSlice(readableBytes).retain());
}
break;
}
}
}
具体步骤:
- 读取版本号
- 读取认证方法计数
- 把初始请求的字节包转换为
DefaultSocks5InitialRequest
对象 - 状态变为 SUCCESS , 以后不再解码任何数据
上面第三步的提到的对象将在 SocksServerHandler#messageReceived
方法中接收,
这个方法处理 Socks5InitialRequest
的逻辑为:
在 pipeline 顶端添加了 Socks5CommandRequestDecoder
负责解码接下来会收到的
Command 请求, 并给客户端发送了采用 NOAUTH 的响应。
if (socksRequest instanceof Socks5InitialRequest) {
ctx.pipeline().addFirst(new Socks5CommandRequestDecoder());
ctx.write(new DefaultSocks5InitialResponse(Socks5AuthMethod.NO_AUTH));
}
客户端发送 Socks5 命令请求
一旦方法协商完毕, 客户端继续发送请求:
The SOCKS request is formed as follows:
+----+-----+-------+------+----------+----------+ |VER | CMD | RSV | ATYP | DST.ADDR | DST.PORT | +----+-----+-------+------+----------+----------+ | 1 | 1 | X'00' | 1 | Variable | 2 | +----+-----+-------+------+----------+----------+
netty 的 example 仅实现 CONNECT 命令, 一个简单的 CONNECT 命令请求将由0x05、0x01、0x00、0x01、一个 INT 表示的 IPV4 地址, 一个 SHORT 表示的端口号组成,总共10个字节。
第一个是 socks 协议版本号,第二个字节表示 CONNECT 命令,第三个保留字节必须为0x00,第四个0x01代表接下来的字节表示IPV4地址,IPV4地址可以用整型即四个字节表示,端口号0~65535可以用两个字节表示
Socks5CommandRequestDecoder 解码请求
类似于 Socks5InitialRequestDecoder
, Socks5CommandRequestDecoder
也是一个差不多的解码器。
// io.netty.handler.codec.socksx.v5.Socks5CommandRequestDecoder
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
switch (state()) {
case INIT: {
final byte version = in.readByte();
final Socks5CommandType type = Socks5CommandType.valueOf(in.readByte());
in.skipBytes(1); // RSV
final Socks5AddressType dstAddrType = Socks5AddressType.valueOf(in.readByte());
final String dstAddr = addressDecoder.decodeAddress(dstAddrType, in);
final int dstPort = in.readUnsignedShort();
out.add(new DefaultSocks5CommandRequest(type, dstAddrType, dstAddr, dstPort));
checkpoint(State.SUCCESS);
}
case SUCCESS: {
int readableBytes = actualReadableBytes();
if (readableBytes > 0) {
out.add(in.readSlice(readableBytes).retain());
}
break;
}
}
}
具体步骤仍是依次读取每个字节并把请求转换为 DefaultSocks5CommandRequest
对象。
SocksServerHandler#messageReceived
方法中接收,这个方法处理 Socks5CommandRequest
的逻辑为:
else if (socksRequest instanceof Socks5CommandRequest) {
Socks5CommandRequest socks5CmdRequest = (Socks5CommandRequest) socksRequest;
if (socks5CmdRequest.type() == Socks5CommandType.CONNECT) {
ctx.pipeline().addLast(new SocksServerConnectHandler()); // 1
ctx.pipeline().remove(this); // 2
ctx.fireChannelRead(socksRequest); // 3
} else {
ctx.close();
}
}
对于 CONNECT 命令:
- pipeline 末端加入
SocksServerConnectHandler
- 移除
SocksServerHandler
这个handler - 把这个 commandrequest 对象交给下一个 handler 处理。
此时 pipeline 中的 handler 应该是这样的:
Socks5CommandRequestDecoder
\|/
LoggingHandler
\|/
Socks5ServerEncoder
\|/
Socks5InitialRequestDecoder
\|/
SocksServerConnectHandler
上面提到 LoggingHandler
只是输出日志,
两个 Decoder 此时都处于 SUCCESS 状态,不再处理任何数据。Socks5ServerEncoder
则负责把响应信息编码为字节。
即 commandrequest 最后由 SocksServerConnectHandler
处理。
SocksServerConnectHandler 处理请求
对于 Socks5CommandRequest
对象, SocksServerConnectHandler
处理逻辑如下:
if (message instanceof Socks5CommandRequest) {
final Socks5CommandRequest request = (Socks5CommandRequest) message;
Promise<Channel> promise = ctx.executor().newPromise(); // 1
// ... omitting code
final Channel inboundChannel = ctx.channel();
b.group(inboundChannel.eventLoop())
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new DirectClientHandler(promise)); // 2
// 3
b.connect(request.dstAddr(), request.dstPort()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// Connection established use handler provided results
} else {
// Close the connection if the connection attempt has failed.
ctx.channel().writeAndFlush( // 4
new DefaultSocks5CommandResponse(Socks5CommandStatus.FAILURE, request.dstAddrType()));
SocksServerUtils.closeOnFlush(ctx.channel());
}
}
});
}
具体步骤如下:
- 先获取一个 Promise 对象,并注册了 Listener ,省略了这部分代码稍后分析
- b 是一个
Bootstrap
给它的DirectClientHandler
传递了 promise 对象 - b 连接到指定的 IPV4 地址和端口
- 连接失败时向客户端写出失败响应
DirectClientHandler
逻辑很简单,连接到指定地址成功后,setSuccess
让 Promise 的回调函数执行。
同时在这个 Promise 中放有一个 Channel 的引用。
public void channelActive(ChannelHandlerContext ctx) {
ctx.pipeline().remove(this);
promise.setSuccess(ctx.channel());
}
在这里先明确两个概念:
DirectClientHandler
中的 Channel 是 Socks 服务器与 IPV4 地址建立的,称为 OutboundChannel
客户端与 Socks 服务器之间建立的 Channel 称为 InboundChannel
因此, Promise 持有的是 OutboundChannel
让两个 Channel 对接实现代理
代理个人简单的理解,客户端发送的任何数据经过代理服务器,此处是 Socks,发送到指定的主机。
指定主机发送的任何数据经过代理服务器,再传回客户端。代理服务器相当于一个中继点。
刚才省略了 Promise 上注册的 Listener 代码如下:
promise.addListener(
new FutureListener<Channel>() {
@Override
public void operationComplete(final Future<Channel> future) throws Exception {
final Channel outboundChannel = future.getNow(); // 1
if (future.isSuccess()) {
ChannelFuture responseFuture = // 2
ctx.channel().writeAndFlush(new DefaultSocks5CommandResponse(
Socks5CommandStatus.SUCCESS, request.dstAddrType()));
responseFuture.addListener(new ChannelFutureListener() { // 3
@Override
public void operationComplete(ChannelFuture channelFuture) {
ctx.pipeline().remove(SocksServerConnectHandler.this); // 4
outboundChannel.pipeline().addLast(new RelayHandler(ctx.channel())); // 5
ctx.pipeline().addLast(new RelayHandler(outboundChannel)); // 6
}
});
}
}
});
- Promise 继承了 Future, 这里 future 即上文的 promise, 通过
getNow
获得OutboundChannel
- 向客户端写出成功响应,返回 responseFuture
- 监听写出操作, 写出完成会回调注册
ChannelFutureListener
- 移除
SocksServerConnectHandler
OutboundChannel
的 pipeline 增加持有InboundChannel
的RelayHandler
InboundChannel
的 pipeline 增加持有OutboundChannel
的RelayHandler
Relay 有中继、转达的意思, 其作用不言而喻:
读到的任何信息都往其实例变量 relayChannel 写
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (relayChannel.isActive()) {
relayChannel.writeAndFlush(msg);
} else {
ReferenceCountUtil.release(msg);
}
}
最终效果
client <===inbound===> socks server <===outbound===> host