0


Jetty-容器介绍与分析-一款开源的HTTP服务器、HTTP客户端和Java Servlet

一、Jetty 介绍

Jetty是一个开源的servlet容器,由Mort Bay Consulting公司创建,主要用于为基于Java的web内容(如JSP和servlet)提供运行环境。

  1. 功能丰富:Jetty不仅可以作为独立的Web服务器使用,还提供了支持JSP和Servlet的运行时环境,允许开发人员在Java应用程序中轻松地提供网络和Web连接。
  2. 设计模块化:Jetty的设计非常模块化,这意味着它可以根据需要进行灵活的配置和定制,从而提高了资源的利用率。
  3. 性能优异:Jetty支持异步Servlet,能够处理更高的并发量,特别适用于需要处理大量长连接的业务场景。它默认采用的NIO模型,使其在这类场景下成为更好的选择。
  4. 易用性高:Jetty注重易用性,其API以一组JAR包的形式发布,便于开发人员快速上手和使用。
  5. 社区活跃:作为一个开源项目,Jetty拥有一个活跃的社区,用户可以参与到社区中,贡献代码或寻求帮助。
  6. 应用广泛:由于其轻量级和灵活性,Jetty被广泛应用于许多知名的产品和项目中,如ActiveMQ、Maven、Spark、Google App Engine、Eclipse和Hadoop等。
  7. 易于集成:Jetty可以嵌入到现有的应用程序中,这使得普通的应用程序能够快速地支持HTTP服务。

综上所述,Jetty以其高性能、易用性和灵活性在Web服务器和Servlet容器领域占有一席之地,是Java开发者在构建Web应用程序时的一个优秀选择。

功能强大、易于使用、高度可定制的servlet容器,适用于各种Java Web应用程序的开发和部署。

二、Jetty 分析

2.1 new Server()

public Server(@Name("port")int port)
{
this((ThreadPool)null);
ServerConnector connector=new ServerConnector(this); 
connector.setPort(port);
setConnectors(new Connector[]{connector});
}

2.1.1 初始化线程池

public Server(@Name("threadpool") ThreadPool pool)
{
_threadPool=pool!=null?pool:new QueuedThreadPool(); 
addBean(_threadPool);
setServer(this);
}

实现了SizedThreadPool

******execute()**方法

@Override

public void execute(Runnable job)

{

if (!isRunning() || !_jobs.offer(job))

{

LOG.warn("{} rejected {}", this, job);

throw new RejectedExecutionException(job.toString());

}

else

{

// Make sure there is at least one thread executing the job.

if (getThreads() == 0)

startThreads(1);

}

}

BlockingQueue

将任务推入

BlockingQueue<Runnable> org.eclipse.jetty.util.thread.QueuedThreadPool._jobs

HTTP connector using NIO ByteChannels and Selectors

继承自 AbstractConnector

2.1.2.1 初始化ScheduledExecutorScheduler****

based on JDK's {@link ScheduledThreadPoolExecutor}.

2.1.2.2 初始化ByteBufferPool****

在数据传输过程中,不可避免需要byte数组

buffer池

默认产生 ArrayByteBufferPool

ByteBufferPool 接口有2个方法:

public ByteBuffer acquire(int size, boolean direct);

public void release(ByteBuffer buffer);

是一个很好的象池范本****

public ArrayByteBufferPool(int minSize, int increment, int maxSize)

public ArrayByteBufferPool()
{

this(0,1024,64*1024);

}

_direct=new Bucket[maxSize/increment];
_indirect=new Bucket[maxSize/increment];

****

Bucket

_direct Bucket数组

_indirect Bucket数组

为每一个大小,新建一个Bucket

但不初始化ByteBuffer

int size=0;

for (int i=0;i<_direct.length;i++)

{

size+=_inc;

_direct[i]=new Bucket(size);

_indirect[i]=new Bucket(size);

}

一个Bucekt存放大小相同的所有的ByteBuffer

_size

bytebuffer大小

_queue

public final Queue<ByteBuffer> _queue= new ConcurrentLinkedQueue<>();

acquire

public ByteBuffer acquire(int size, boolean direct)

取得合适的Bucket****

每个Bucket的大小不同,这里找到最合适的

Bucket bucket = bucketFor(size,direct);

Bucket中取得ByteBuffer****

ByteBuffer buffer = bucket==null?null:bucket._queue.poll();

不存在新建

if (buffer == null)

{

int capacity = bucket==null?size:bucket._size;

buffer = direct ? BufferUtil.allocateDirect(capacity) : BufferUtil.allocate(capacity);

}

release

public void release(ByteBuffer buffer)

{

if (buffer!=null)

{

Bucket bucket = bucketFor(buffer.capacity(),buffer.isDirect());

if (bucket!=null)

{

BufferUtil.clear(buffer);

bucket._queue.offer(buffer);

}

}

}

取得合适的Bucket****

Bucket bucket = bucketFor(buffer.capacity(),buffer.isDirect());

清空Buffer****

BufferUtil.clear(buffer);

归还Pool****

bucket._queue.offer(buffer);

例外

如果申请的ByteBuffer过大或者过小,

无法在POOL中满足,则可以申请成功,但无法归还给POOL。

2.1.2.3 维护ConnectionFactory

HttpConnectionFactory

用于创建连接,

比如Accept后,需要创建一个表示连接的对象

2.1.2.4 取得可用CPU数量

int cores = Runtime.getRuntime().availableProcessors();

2.1.2.5 更新acceptor数量

if (acceptors < 0)
acceptors=Math.max(1, Math.min(4,cores/8));

2.1.2.6 创建acceptor现场组

_acceptors = new Thread[acceptors];

2.1.2.7 初始化ServerConnectorManager****

继承自 SelectorManager

_manager = new ServerConnectorManager(getExecutor(), getScheduler(), selectors>0?selectors:Math.max(1,Math.min(4,Runtime.getRuntime().availableProcessors()/2)));
保存selector线程数量****

Math.min(4,Runtime.getRuntime().availableProcessors()/2))

2.1.3 设置Port

connector.setPort(port);

setConnectors(new Connector[]{connector});

2.2 Server.start()

org.eclipse.jetty.server.Server

启动web服务器

WebAppContext context = new WebAppContext();

context.setContextPath("/"); context.setResourceBase("./web/");

context.setClassLoader(Thread.currentThread().getContextClassLoader()); server.setHandler(context);

server.start();

2.2.1 设置启动状态

// AbstractLifeCycle

private void setStarting()
{
if (LOG.isDebugEnabled()) 
    LOG.debug("starting {}",this);
_state = STARTING;

for (Listener listener : _listeners) {
    listener.lifeCycleStarting(this);
}
}

2.2.2 启动过程doStart()

Server

启动整个server

protected void doStart() throws Exception
{

//If the Server should be stopped when the jvm exits, register
//with the shutdown handler thread. 
if (getStopAtShutdown())
ShutdownThread.register(this);

//Register the Server with the handler thread for receiving
//remote stop commands ShutdownMonitor.register(this);

//Start a thread waiting to receive "stop" commands. ShutdownMonitor.getInstance().start(); // initialize

LOG.info("jetty-" + getVersion()); HttpGenerator.setJettyVersion(HttpConfiguration.SERVER_VERSION); 

MultiException mex=new MultiException();

// check size of thread pool
SizedThreadPool pool = getBean(SizedThreadPool.class); 
int max=pool==null?-1:pool.getMaxThreads();
int selectors=0; 
int acceptors=0;
if (mex.size()==0)
{
for (Connector connector : _connectors)
{
if (connector instanceof AbstractConnector) 
    acceptors+=((AbstractConnector)connector).getAcceptors();

if (connector instanceof ServerConnector) 
    selectors+=((ServerConnector)connector).getSelectorManager().getSelectorCount();
}
}

int needed=1+selectors+acceptors; 
if (max>0 && needed>max)
throw new IllegalStateException(String.format("Insufficient threads: max=%d < needed(acceptors=%d + selectors=%d + request=1)",max,acceptors,selectors));

try
{
super.doStart();

}
catch(Throwable e)
{
mex.add(e);
}

// start connectors last
for (Connector connector : _connectors)
{
try
{
connector.start();
}
catch(Throwable e)
{
mex.add(e);
}
}

if (isDumpAfterStart()) 
    dumpStdErr();

mex.ifExceptionThrow();

LOG.info(String.format("Started @%dms",Uptime.getUptime()));
}
2.2.2.1 注册ShutdownMonitor

远程控制接口

//Register the Server with the handler thread for receiving

//remote stop commands

ShutdownMonitor.register(this);

//Start a thread waiting to receive "stop" commands.

ShutdownMonitor.getInstance().start(); // initialize

2.2.2.2 获取线程池

// check size of thread pool

SizedThreadPool pool = getBean(SizedThreadPool.class);

QueuedThreadPool

2.2.2.3 设置selector数量

根据Connector数量进行累计

大部分情况下,只有一个ServerConnector

for (Connector connector : _connectors)

{

if (connector instanceof AbstractConnector)

acceptors+=((AbstractConnector)connector).getAcceptors();

if (connector instanceof ServerConnector)

selectors+=((ServerConnector)connector).getSelectorManager().getSelectorCount();

}

累计所有Connector的需求

2.2.2.4 计算所需的所有线程数量

nt needed=1+selectors+acceptors;

如果大于默200中断程序****

if (max>0 && needed>max)

throw new IllegalStateException(String.format("Insufficient threads: max=%d < needed(acceptors=%d

  • selectors=%d + request=1)",max,acceptors,selectors));
2.2.2.5 维护Bean

**QueuedThreadPool **

doStart()

startThreads()

建立需要的线程

创建线程

Thread thread = newThread(_runnable);

_runnable

_jobs中取任务并执行

设置线程的属性

thread.setDaemon(isDaemon());

thread.setPriority(getThreadsPriority());

thread.setName(_name + "-" + thread.getId());

_threads.add(thread);

启动线程
thread.start();

WebAppContext

如果需要使用,在此处启动

2.2.2.6 启动Connector

取得****ConnectionFactory

_defaultConnectionFactory = getConnectionFactory(_defaultProtocol);

selector线程并启****

for (int i = 0; i < _selectors.length; i++)

{

ManagedSelector selector = newSelector(i);

_selectors[i] = selector;

selector.start();

execute(new NonBlockingThread(selector));

}

newSelector()

protected ManagedSelector newSelector(int id)

{

return new ManagedSelector(id);

}

Acceptor线

_stopping=new CountDownLatch(_acceptors.length);

for (int i = 0; i < _acceptors.length; i++)

{

Acceptor a = new Acceptor(i);

addBean(a); getExecutor().execute(a);

}

Acceptor

线程名字****

final Thread thread = Thread.currentThread();

String name=thread.getName();

_name=String.format("%s-acceptor-%d@%x-%s",name,_acceptor,hashCode(),AbstractConnector.this.toString()); thread.setName(_name);

将自己放入_acceptors

synchronized (AbstractConnector.this)

{

_acceptors[_acceptor] = thread;

}

听端口****

try

{

while (isAccepting())

{

try

{

accept(_acceptor);

}

catch (Throwable e)

{

if (isAccepting()) LOG.warn(e);

else

LOG.ignore(e);

}

}

}

finally

{

thread.setName(name);

if (_acceptorPriorityDelta!=0)

thread.setPriority(priority);

synchronized (AbstractConnector.this)

{

_acceptors[_acceptor] = null;

}

CountDownLatch stopping=_stopping;

if (stopping!=null)

stopping.countDown();

}

ServerConnector.accept()

public void accept(int acceptorID) throws IOException

{

ServerSocketChannel serverChannel = _acceptChannel;

if (serverChannel != null && serverChannel.isOpen())

{

SocketChannel channel = serverChannel.accept();

    accepted(channel);

}

}

在accept的地方等待

没有Acceptor的情况

channle默认是blocking的

如果acceptor数量为0,没有安排线程专门进行accept,则设置为非阻塞模式若是非0,有专门线程进行accept,因此,为阻塞模式

protected void doStart() throws Exception

{

super.doStart();

if (getAcceptors()==0)

{

_acceptChannel.configureBlocking(false);

_manager.acceptor(_acceptChannel);

}

}

2.3 启动完毕

AbstractLifeCycle

private void setStarted()

{

_state = STARTED;

if (LOG.isDebugEnabled())

LOG.debug(STARTED+" @{}ms {}",Uptime.getUptime(),this);

for (Listener listener : _listeners)

listener.lifeCycleStarted(this);

}

2.3 Http 请求

2.3.1 Accept成功

private void accepted(SocketChannel channel) throws IOException
{
channel.configureBlocking(false); 
Socket socket = channel.socket(); 
configure(socket);
_manager.accept(channel);

}

2.3.2 设置为非阻塞模式

channel.configureBlocking(false);

2.3.3 配置Socket****

Socket socket = channel.socket(); configure(socket);

2.3.4 正式

SelectorManager _manager;

_manager.accept(channel);

选择可用的ManagedSelector线

private ManagedSelector chooseSelector()
{
// The ++ increment here is not atomic, but it does not matter,
// so long as the value changes sometimes, then connections will
// be distributed over the available selectors. 
    long s = _selectorIndex++;
    int index = (int)(s % getSelectorCount());
    return _selectors[index];
}

ManagedSelector

ManagedSelector 是一个线程封装了Selector 的使用

提交任务

selector.submit(selector.new Accept(channel, attachment));

提交这个处理任务到ManagedSelector:

private final Queue<Runnable> _changes = new ConcurrentArrayQueue<>();

_changes.offer(change);

ConcurrentArrayQueue

与ConcurrentLinkedQueue相似的性能,但直接保存元素而不是node,因此需要更少的对象,更少的GC

2.4 请求处理

2.4.1 ManagedSelector.run()

while (isRunning()) select();

****** select()******

发现有任务就执行

runChanges();

private void runChanges()

{

Runnable change;

while ((change = _changes.poll()) != null)

    runChange(change);

}

runChange()

change.run();

Accept.run

SelectionKey key = channel.register(_selector, 0, attachment);

EndPoint endpoint = createEndPoint(channel, key);

key.attach(endpoint);

select()

int selected = _selector.select();

SelectionKey

Set<SelectionKey> selectedKeys = _selector.selectedKeys();

for (SelectionKey key : selectedKeys)

{

if (key.isValid())

{

processKey(key);

}

else

{

if (debug)

LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());

Object attachment = key.attachment();

if (attachment instanceof EndPoint)

((EndPoint)attachment).close();

}

}

selectedKeys.clear();

processKey()

private void processKey(SelectionKey key)

{

Object attachment = key.attachment();

try

{

if (attachment instanceof SelectableEndPoint)

{

((SelectableEndPoint)attachment).onSelected();

}

else if (key.isConnectable())

{

processConnect(key, (Connect)attachment);

}

else if (key.isAcceptable())

{

processAccept(key);

}

else

{

throw new IllegalStateException();

}

}

catch (CancelledKeyException x)

{

LOG.debug("Ignoring cancelled key for channel {}", key.channel());

if (attachment instanceof EndPoint)

closeNoExceptions((EndPoint)attachment);

}

catch (Throwable x)

{

LOG.warn("Could not process key for channel " + key.channel(), x);

if (attachment instanceof EndPoint)

closeNoExceptions((EndPoint)attachment);

}

}

onSelected()

@Override

public void onSelected()

{

assert _selector.isSelectorThread();

int oldInterestOps = _key.interestOps();

int readyOps = _key.readyOps();

int newInterestOps = oldInterestOps & ~readyOps;

setKeyInterests(oldInterestOps, newInterestOps);

updateLocalInterests(readyOps, false);

if (_key.isReadable())

getFillInterest().fillable();

if (_key.isWritable())

getWriteFlusher().completeWrite();

}

会使用新的线程进行HTTP业务处理 (提交到线程池)

标签: jetty http 服务器

本文转载自: https://blog.csdn.net/ly_7956/article/details/136089880
版权归原作者 纵然间 所有, 如有侵权,请联系我们删除。

“Jetty-容器介绍与分析-一款开源的HTTP服务器、HTTP客户端和Java Servlet”的评论:

还没有评论