Netty is an asynchronous event-driven network application frameworkfor rapid development of maintainable high performance protocol servers & clients.
前言
Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
Netty
Netty的介绍:
1)Netty 是由 JBOSS 提供的一个 Java 开源框架,现为 Github上的独立项目;
2)Netty 是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序;
3)Netty主要针对在TCP协议下,面向Clients端的高并发应用,或者Peer-to-Peer场景下的大量数据持续传输的应用;
4)Netty本质是一个NIO框架,适用于服务器通讯相关的多种应用场景;
5)要透彻理解Netty , 需要先学习 NIO , 这样我们才能阅读 Netty 的源码。
Netty的优点:
1)设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池;
2)使用方便:详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了;
3)高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制;
4)安全:完整的 SSL/TLS 和 StartTLS 支持;
5)社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入。
Netty的组成:
Netty的工作架构:
Netty的工作原理:
1) Netty抽象出两组线程池 BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写;
2) BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup;
3) NioEventLoopGroup 相当于一个事件循环组, 这个组中含有多个事件循环 ,每一个事件循环是 NioEventLoop;
4) NioEventLoop 表示一个不断循环的执行处理任务的线程, 每个NioEventLoop 都有一个selector , 用于监听绑定在其上的socket的网络通讯;
5) NioEventLoopGroup 可以有多个线程, 即可以含有多个NioEventLoop;
6) 每个Boss NioEventLoop 循环执行的步骤有3步:
1.轮询accept 事件;
2.处理accept 事件 , 与client建立连接 , 生成NioScocketChannel , 并将其注册到某个worker NIOEventLoop 上的 selector;
3.处理任务队列的任务 , 即 runAllTasks。
7) 每个 Worker NIOEventLoop 循环执行的步骤:
1.轮询read, write 事件;
2.处理i/o事件, 即read , write 事件,在对应NioScocketChannel 处理;
3.处理任务队列的任务 , 即 runAllTasks。
8) 每个Worker NIOEventLoop 处理业务时,会使用pipeline(管道), pipeline 中包含了 channel , 即通过pipeline 可以获取到对应通道, 管道中维护了很多的 处理器。
netty服务端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
| import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer { public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();
try { ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现 .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数 .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态 // .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup .childHandler(new NettyServerChannelInitializer());
System.out.println(".....服务器 is ready...");
ChannelFuture cf = bootstrap.bind(6668).sync();
cf.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { System.out.println("监听端口 6668 成功"); } else { System.out.println("监听端口 6668 失败"); } } });
cf.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
}
}
import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer;
public class NettyServerChannelInitializer extends ChannelInitializer { @Override protected void initChannel(Channel ch) throws Exception { System.out.println("客户socketchannel hashcode=" + ch.hashCode()); ch.pipeline().addLast(new NettyServerHandler()); } }
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.util.CharsetUtil;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel()); System.out.println("server ctx =" + ctx); System.out.println("看看channel 和 pipeline的关系"); Channel channel = ctx.channel(); ChannelPipeline pipeline = ctx.pipeline();
ByteBuf buf = (ByteBuf) msg; System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("客户端地址:" + channel.remoteAddress()); }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8)); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
|
netty客户端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient { public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try { Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group) .channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射) .handler(new NettyClientChannelInitializer());
System.out.println("客户端 ok..");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync(); channelFuture.channel().closeFuture().sync(); } finally {
group.shutdownGracefully();
} } }
import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer;
public class NettyClientChannelInitializer extends ChannelInitializer { protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new NettyClientHandler()); } }
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client " + ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8)); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg; System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("服务器的地址: "+ ctx.channel().remoteAddress()); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
|
延伸
Netty
彻底理解Netty
Netty-百度百科
尚硅谷韩顺平Netty视频教程(2019发布)