
一、I/O 复用模型解读
Tomcat 的 NIO 是基于 I/O 复用来实现的。对这点一定要清楚,不然我们的讨论就不在一个逻辑线上。下面这张图学习过 I/O 模型知识的一般都见过,出自《UNIX 网络编程》,I/O 模型一共有阻塞式 I/O,非阻塞式 I/O,I/O 复用 (select/poll/epoll),信号驱动式 I/O 和异步 I/O。这篇文章讲的是 I/O 复用。
IO 复用.png
这里先来说下用户态和内核态,直白来讲,如果线程执行的是用户代码,当前线程处在用户态,如果线程执行的是内核里面的代码,当前线程处在内核态。更深层来讲,操作系统为代码所处的特权级别分了 4 个级别。不过现代操作系统只用到了 0 和 3 两个级别。0 和 3 的切换就是用户态和内核态的切换。更详细的可参照《深入理解计算机操作系统》。I/O 复用模型,是同步非阻塞,这里的非阻塞是指 I/O 读写,对应的是 recvfrom 操作,因为数据报文已经准备好,无需阻塞。说它是同步,是因为,这个执行是在一个线程里面执行的。有时候,还会说它又是阻塞的,实际上是指阻塞在 select 上面,必须等到读就绪、写就绪等网络事件。有时候我们又说 I/O 复用是 多路复用,这里的多路是指 N 个连接,每一个连接对应一个 channel,或者说多路就是多个 channel。复用,是指多个连接复用了一个线程或者少量线程 (在 Tomcat 中是 Math.min(2,Runtime.getRuntime().availableProcessors()))。
上面提到的网络事件有连接就绪,接收就绪,读就绪,写就绪四个网络事件。I/O 复用主要是通过 Selector 复用器来实现的,可以结合下面这个图理解上面的叙述。
Selector 图解.png
二、TOMCAT 对 IO 模型的支持
tomcat 支持 IO 类型图.png
tomcat 从 6 以后开始支持 NIO 模型,实现是基于 JDK 的 java.nio 包。这里可以看到对 read body 和 response body 是 Blocking 的。关于这点在第 6.3 节源代码阅读有重点介绍。
三、TOMCAT 中 NIO 的配置与使用
在 Connector 节点配置 protocol=”org.apache.coyote.http11.Http11NioProtocol”,Http11NioProtocol 协议下默认最大连接数是 10000,也可以重新修改 maxConnections 的值,同时我们可以设置最大线程数 maxThreads,这里设置的最大线程数就是 Excutor 的线程池的大小。在 BIO 模式下实际上是没有 maxConnections,即使配置也不会生效,BIO 模式下的 maxConnections 是保持跟 maxThreads 大小一致,因为它是一请求一线程模式。
四、NioEndpoint 组件关系图解读
tomcatnio 组成.png
我们要理解 tomcat 的 nio 最主要就是对 NioEndpoint 的理解。 它一共包含 LimitLatch、Acceptor、Poller、SocketProcessor、Excutor5 个部分。LimitLatch 是连接控制器,它负责维护连接数的计算,nio 模式下默认是 10000,达到这个阈值后,就会拒绝连接请求。Acceptor 负责接收连接,默认是 1 个线程来执行,将请求的事件注册到事件列表。有 Poller 来负责轮询,Poller 线程数量是 cpu 的核数 Math.min(2,Runtime.getRuntime().availableProcessors())。由 Poller 将就绪的事件生成 SocketProcessor 同时交给 Excutor 去执行。Excutor 线程池的大小就是我们在 Connector 节点配置的 maxThreads 的值。在 Excutor 的线程中,会完成从 socket 中读取 http request,解析成 HttpServletRequest 对象,分派到相应的 servlet 并完成逻辑,然后将 response 通过 socket 发回 client。在从 socket 中读数据和往 socket 中写数据的过程,并没有像典型的非阻塞的 NIO 的那样,注册 OP_READ 或 OP_WRITE 事件到主 Selector,而是 直接通过 socket 完成读写,这时是阻塞完成的 ,但是在 timeout 控制上,使用了 NIO 的 Selector 机制,但是这个 Selector 并不是 Poller 线程维护的主 Selector,而是 BlockPoller 线程中维护的 Selector,称之为辅 Selector。详细源代码可以参照 第 6.3 节。
五、NioEndpoint 执行序列图
tomcatnio 序列图.png
在下一小节 NioEndpoint 源码解读中我们将对步骤 1-步骤 11 依次找到对应的代码来说明。
六、NioEndpoint 源码解读
6.1、初始化
无论是 BIO 还是 NIO,开始都会初始化连接限制,不可能无限增大,NIO 模式下默认是 10000。
public void startInternal() throws Exception {
if (!running) {
//省略代码...
initializeConnectionLatch();
//省略代码...
}
}
protected LimitLatch initializeConnectionLatch() {
if (maxConnections==-1) return null;
if (connectionLimitLatch==null) {
connectionLimitLatch = new LimitLatch(getMaxConnections());
}
return connectionLimitLatch;
}
6.2、步骤解读
下面我们着重叙述跟 NIO 相关的流程,共分为 11 个步骤,分别对应上面序列图中的步骤。
步骤 1: 绑定 IP 地址及端口,将 ServerSocketChannel 设置为阻塞。
这里为什么要设置成阻塞呢,我们一直都在说非阻塞。Tomcat 的设计初衷主要是为了操作方便。这样这里就跟 BIO 模式下一样了。只不过在 BIO 下这里返回的是 Socket,NIO 下这里返回的是 SocketChannel。
public void bind() throws Exception {
//省略代码...
serverSock.socket().bind(addr,getBacklog());
serverSock.configureBlocking(true);
//省略代码...
selectorPool.open();
}
步骤 2: 启动接收线程
public void startInternal() throws Exception {
if (!running) {
//省略代码...
startAcceptorThreads();
}
}
//这个方法实际是在它的超类 AbstractEndpoint 里面
protected final void startAcceptorThreads() {
int count = getAcceptorThreadCount();
acceptors = new Acceptor[count];
for (int i = 0; i < count; i++) {
acceptors[i] = createAcceptor();
Thread t = new Thread(acceptors[i], getName() + "-Acceptor-" + i);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}
步骤 3:ServerSocketChannel.accept() 接收新连接
protected class Acceptor extends AbstractEndpoint.Acceptor {
@Override
public void run() {
while (running) {
try {
//省略代码...
SocketChannel socket = null;
try {
socket = serverSock.accept();//接收新连接
} catch (IOException ioe) {
//省略代码...
throw ioe;
}
//省略代码...
if (running && !paused) {
if (!setSocketOptions(socket)) {
//省略代码...
}
} else {
//省略代码...
}
} catch (SocketTimeoutException sx) {
} catch (IOException x) {
//省略代码...
} catch (OutOfMemoryError oom) {
//省略代码...
} catch (Throwable t) {
//省略代码...
}
}
}
}
步骤 4: 将接收到的链接通道设置为非阻塞
步骤 5: 构造 NioChannel 对象
步骤 6:register 注册到轮询线程
protected boolean setSocketOptions(SocketChannel socket) {
try {
socket.configureBlocking(false);//将连接通道设置为非阻塞
Socket sock = socket.socket();
socketProperties.setProperties(sock);
NioChannel channel = nioChannels.poll();//构造 NioChannel 对象
//省略代码...
getPoller0().register(channel);//register 注册到轮询线程
} catch (Throwable t) {
//省略代码...
}
//省略代码...
}
步骤 7: 构造 PollerEvent,并添加到事件队列
protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();
public void register(final NioChannel socket)
{
//省略代码...
PollerEvent r = eventCache.poll();
//省略代码...
addEvent(r);
}
步骤 8: 启动轮询线程
public void startInternal() throws Exception {
if (!running) {
//省略代码...
// Start poller threads
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
//省略代码...
}
}
步骤 9: 取出队列中新增的 PollerEvent 并注册到 Selector
public static class PollerEvent implements Runnable {
//省略代码...
@Override
public void run() {
if ( interestOps == OP_REGISTER ) {
try {
socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
} catch (Exception x) {
log.error("", x);
}
} else {
//省略代码...
}//end if
}//run
//省略代码...
}
步骤 10:Selector.select()
public void run() {
// Loop until destroy() is called
while (true) {
try {
//省略代码...
try {
if ( !close ) {
if (wakeupCounter.getAndSet(-1) > 0) {
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
//省略代码...
}
//省略代码...
} catch ( NullPointerException x ) {
//省略代码...
} catch ( CancelledKeyException x ) {
//省略代码...
} catch (Throwable x) {
//省略代码...
}
//省略代码...
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
KeyAttachment attachment = (KeyAttachment)sk.attachment();
if (attachment == null) {
iterator.remove();
} else {
attachment.access();
iterator.remove();
processKey(sk, attachment);//此方法跟下去就是把 SocketProcessor 交给 Excutor 去执行
}
}//while
//省略代码...
} catch (OutOfMemoryError oom) {
//省略代码...
}
}//while
//省略代码...
}
步骤 11: 根据选择的 SelectionKey 构造 SocketProcessor 提交到请求处理线程
public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
try {
//省略代码...
SocketProcessor sc = processorCache.poll();
if ( sc == null ) sc = new SocketProcessor(socket,status);
else sc.reset(socket,status);
if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);
else sc.run();
} catch (RejectedExecutionException rx) {
//省略代码...
} catch (Throwable t) {
//省略代码...
}
//省略代码...
}
6.3、NioBlockingSelector 和 BlockPoller 介绍
上面的序列图有个地方我没有描述,就是 NioSelectorPool 这个内部类,是因为在整体理解 tomcat 的 nio 上面在序列图里面不包括它更好理解。在有了上面的基础后,我们在来说下 NioSelectorPool 这个类,对更深层了解 Tomcat 的 NIO 一定要知道它的作用。NioEndpoint 对象中维护了一个 NioSelecPool 对象,这个 NioSelectorPool 中又维护了一个 BlockPoller 线程,这个线程就是基于辅 Selector 进行 NIO 的逻辑。以执行 servlet 后,得到 response,往 socket 中写数据为例,最终写的过程调用 NioBlockingSelector 的 write 方法。代码如下:
public int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger lastWrite) throws IOException {
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
if ( key == null ) throw new IOException("Key no longer registered");
KeyAttachment att = (KeyAttachment) key.attachment();
int written = 0;
boolean timedout = false;
int keycount = 1; //assume we can write
long time = System.currentTimeMillis(); //start the timeout timer
try {
while ( (!timedout) && buf.hasRemaining()) {
if (keycount > 0) { //only write if we were registered for a write
//直接往 socket 中写数据
int cnt = socket.write(buf); //write the data
lastWrite.set(cnt);
if (cnt == -1)
throw new EOFException();
written += cnt;
//写数据成功,直接进入下一次循环,继续写
if (cnt > 0) {
time = System.currentTimeMillis(); //reset our timeout timer
continue; //we successfully wrote, try again without a selector
}
}
//如果写数据返回值 cnt 等于 0,通常是网络不稳定造成的写数据失败
try {
//开始一个倒数计数器
if ( att.getWriteLatch()==null || att.getWriteLatch().getCount()==0) att.startWriteLatch(1);
//将 socket 注册到辅 Selector,这里 poller 就是 BlockSelector 线程
poller.add(att,SelectionKey.OP_WRITE);
//阻塞,直至超时时间唤醒,或者在还没有达到超时时间,在 BlockSelector 中唤醒
att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS);
}catch (InterruptedException ignore) {
Thread.interrupted();
}
if ( att.getWriteLatch()!=null && att.getWriteLatch().getCount()> 0) {
keycount = 0;
}else {
//还没超时就唤醒,说明网络状态恢复,继续下一次循环,完成写 socket
keycount = 1;
att.resetWriteLatch();
}
if (writeTimeout > 0 && (keycount == 0))
timedout = (System.currentTimeMillis() - time) >= writeTimeout;
} //while
if (timedout)
throw new SocketTimeoutException();
} finally {
poller.remove(att,SelectionKey.OP_WRITE);
if (timedout && key != null) {
poller.cancelKey(socket, key);
}
}
return written;
}
也就是说当 socket.write() 返回 0 时,说明网络状态不稳定,这时将 socket 注册 OP_WRITE 事件到辅 Selector,由 BlockPoller 线程不断轮询这个辅 Selector,直到发现这个 socket 的写状态恢复了,通过那个倒数计数器,通知 Worker 线程继续写 socket 动作。看一下 BlockSelector 线程的代码逻辑:
public void run() {
while (run) {
try {
......
Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
while (run && iterator != null && iterator.hasNext()) {
SelectionKey sk = (SelectionKey) iterator.next();
KeyAttachment attachment = (KeyAttachment)sk.attachment();
try {
attachment.access();
iterator.remove(); ;
sk.interestOps(sk.interestOps() & (~sk.readyOps()));
if ( sk.isReadable() ) {
countDown(attachment.getReadLatch());
}
//发现 socket 可写状态恢复,将倒数计数器置位,通知 Worker 线程继续
if (sk.isWritable()) {
countDown(attachment.getWriteLatch());
}
}catch (CancelledKeyException ckx) {
if (sk!=null) sk.cancel();
countDown(attachment.getReadLatch());
countDown(attachment.getWriteLatch());
}
}//while
}catch ( Throwable t ) {
log.error("",t);
}
}
events.clear();
try {
selector.selectNow();//cancel all remaining keys
}catch( Exception ignore ) {
if (log.isDebugEnabled())log.debug("",ignore);
}
}
使用这个辅 Selector 主要是减少线程间的切换,同时还可减轻主 Selector 的负担。
七、关于性能
下面这份报告是我们压测的一个结果,跟想象的是不是不太一样?几乎没有差别,实际上 NIO 优化的是 I/O 的读写,如果瓶颈不在这里的话,比如传输字节数很小的情况下,BIO 和 NIO 实际上是没有差别的。NIO 的优势更在于用少量的线程 hold 住大量的连接。还有一点,我们在压测的过程中,遇到在 NIO 模式下刚开始的一小段时间内容,会有错误,这是因为一般的压测工具是基于一种长连接,也就是说比如模拟 1000 并发,那么同时建立 1000 个连接,下一时刻再发送请求就是基于先前的这 1000 个连接来发送,还有 TOMCAT 的 NIO 处理是有 POLLER 线程来接管的,它的线程数一般等于 CPU 的核数,如果一瞬间有大量并发过来,POLLER 也会顿时处理不过来。
压测 1.jpeg
压测 2.jpeg
八、总结
NIO 只是优化了网络 IO 的读写,如果系统的瓶颈不在这里,比如每次读取的字节说都是 500b,那么 BIO 和 NIO 在性能上没有区别。NIO 模式是最大化压榨 CPU,把时间片都更好利用起来。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源如内存,有关线程资源可参照这篇文章 《一台 java 服务器可以跑多少个线程》。因此,使用的线程越少越好。而 I/O 复用模型正是利用少量的线程来管理大量的连接。在对于维护大量长连接的应用里面更适合用基于 I/O 复用模型 NIO,比如 web qq 这样的应用。所以我们要清楚系统的瓶颈是 I/O 还是 CPU 的计算。
转载请注明作者及出处,并附上链接 http://ift.tt/2AaaTWE
参考资料:
http://ift.tt/1l2xZow
http://ift.tt/2j8jrtS
《Tomcat 内核设计剖析》
《深入理解计算机操作系统》
《UNIX 网络编程》卷 1
The post 深度解读 Tomcat 中的 NIO 模型 appeared first on Linuxeden开源社区.
http://ift.tt/2ArdVXj
没有评论:
发表评论