Tomcat - 线程池的设计与实现:StandardThreadExecutor
Tomcat - 线程池的设计与实现:StandardThreadExecutor
上文中我们研究了下Service的设计和实现,StandardService中包含Executor的调用;这个比较好理解,Tomcat需要并发处理用户的请求,自然而言就想到线程池,那么Tomcat中线程池(Executor)具体是如何实现的?本文带你继续深度解析。@pdai
理解思路
我们如下几个方面开始引入线程池的,这里主要从上文Service引入,保持上下文之间的衔接,会很好的构筑你的知识体系。@pdai
- 上文中我们了解到,Executor是包含在Service中的,Service中关于Executor的配置和相关代码如下:
server.xml中service里包含Executor的配置
<Service name="Catalina">
<!-- 1. 属性说明 name:Service的名称 -->
<!--2. 一个或多个excecutors --> // 看这里
<!-- <Executor name="tomcatThreadPool" namePrefix="catalina-exec-" maxThreads="150" minSpareThreads="4"/> -->
</Service>
Service中executors相关方法
/** * Adds a named executor to the service * @param ex Executor */
@Override
public void addExecutor(Executor ex) {
synchronized (executors) {
if (!executors.contains(ex)) {
executors.add(ex);
if (getState().isAvailable()) {
try {
ex.start(); // 启动
} catch (LifecycleException x) {
log.error(sm.getString("standardService.executor.start"), x);
}
}
}
}
}
/** * Retrieves all executors * @return Executor[] */
@Override
public Executor[] findExecutors() {
synchronized (executors) {
Executor[] arr = new Executor[executors.size()];
executors.toArray(arr);
return arr;
}
}
/** * Retrieves executor by name, null if not found * @param executorName String * @return Executor */
@Override
public Executor getExecutor(String executorName) {
synchronized (executors) {
for (Executor executor: executors) {
if (executorName.equals(executor.getName()))
return executor;
}
}
return null;
}
/** * Removes an executor from the service * @param ex Executor */
@Override
public void removeExecutor(Executor ex) {
synchronized (executors) {
if ( executors.remove(ex) && getState().isAvailable() ) {
try {
ex.stop(); // 停止
} catch (LifecycleException e) {
log.error(sm.getString("standardService.executor.stop"), e);
}
}
}
}
- 和Server、Service实现一样,StandardThreadExecutor也是继承LifecycleMBeanBase;然后实现Executor的接口。
- Tomcat关于Executor相关的配置文档
http://tomcat.apache.org/tomcat-9.0-doc/config/executor.html
准备知识
在理解Tomcat的线程池时,需要有一定的基础,这里推荐学习下JDK关于线程池的设计和实现。
并发基础
多线程的出现是要解决什么问题的?
线程不安全是指什么? 举例说明
并发出现线程不安全的本质什么? 可见性,原子性和有序性。
Java是怎么解决并发问题的? 3个关键字,JMM和8个Happens-Before
线程安全是不是非真即假? 不是
线程安全有哪些实现思路?
如何理解并发和并行的区别?
线程有哪几种状态? 分别说明从一种状态到另一种状态转变有哪些方式?
通常线程有哪几种使用方式?
基础线程机制有哪些?
线程的中断方式有哪些?
线程的互斥同步方式有哪些? 如何比较和选择?
线程之间有哪些协作方式?
JUC BlockingQueue 和 ThreadPoolExecutor
什么是BlockingDeque?
BlockingQueue大家族有哪些? ArrayBlockingQueue, DelayQueue, LinkedBlockingQueue, SynchronousQueue...
BlockingQueue适合用在什么样的场景?
BlockingQueue常用的方法?
BlockingQueue插入方法有哪些? 这些方法(
add(o)
,offer(o)
,put(o)
,offer(o, timeout, timeunit)
)的区别是什么?BlockingDeque 与BlockingQueue有何关系,请对比下它们的方法?
BlockingDeque适合用在什么样的场景?
BlockingDeque大家族有哪些?
BlockingDeque 与BlockingQueue实现例子?
为什么要有线程池?
Java是实现和管理线程池有哪些方式? 请简单举例如何使用。
为什么很多公司不允许使用Executors去创建线程池? 那么推荐怎么使用呢?
ThreadPoolExecutor有哪些核心的配置参数? 请简要说明
ThreadPoolExecutor可以创建哪是哪三种线程池呢?
当队列满了并且worker的数量达到maxSize的时候,会怎么样?
说说ThreadPoolExecutor有哪些RejectedExecutionHandler策略? 默认是什么策略?
简要说下线程池的任务执行机制? execute –> addWorker –>runworker (getTask)
线程池中任务是如何提交的?
线程池中任务是如何关闭的?
在配置线程池的时候需要考虑哪些配置因素?
如何监控线程池的状态?
Executor接口设计
Executor的设计很简单,在理解的时候需要理解两点:
- 1.Tomcat希望将Executor也纳入Lifecycle生命周期管理,所以让它实现了Lifecycle接口
- 2.引入超时机制:也就是说当work queue满时,会等待指定的时间,如果超时将抛出RejectedExecutionException,所以这里增加了一个
void execute(Runnable command, long timeout, TimeUnit unit)
方法; 其实本质上,它构造了JUC中ThreadPoolExecutor,通过它调用ThreadPoolExecutor的void execute(Runnable command, long timeout, TimeUnit unit)
方法。
public interface Executor extends java.util.concurrent.Executor, Lifecycle {
public String getName();
/** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the <code>Executor</code> implementation. * If no threads are available, it will be added to the work queue. * If the work queue is full, the system will wait for the specified * time until it throws a RejectedExecutionException * * @param command the runnable task * @param timeout the length of time to wait for the task to complete * @param unit the units in which timeout is expressed * * @throws java.util.concurrent.RejectedExecutionException if this task * cannot be accepted for execution - the queue is full * @throws NullPointerException if command or unit is null */
void execute(Runnable command, long timeout, TimeUnit unit);
}
找到Executor的实现类
StandardThreadExecutor的实现
接下来我们看下具体的实现类StandardThreadExecutor。
理解相关配置参数
- 公共属性
Executor的所有实现都 支持以下属性:
属性描述className实现的类。实现必须实现 org.apache.catalina.Executor接口。此接口确保可以通过其name属性引用对象并实现Lifecycle,以便可以使用容器启动和停止对象。className的默认值是org.apache.catalina.core.StandardThreadExecutorname用于在server.xml中的其他位置引用此池的名称。该名称是必需的,必须是唯一的。* StandardThreadExecutor属性
默认实现支持以下属性:
属性描述threadPriority(int)执行程序中线程的线程优先级,默认为 5(Thread.NORM_PRIORITY常量的值)daemon(boolean)线程是否应该是守护程序线程,默认为 truenamePrefix(字符串)执行程序创建的每个线程的名称前缀。单个线程的线程名称将是namePrefix+threadNumbermaxThreads(int)此池中活动线程的最大数量,默认为 200minSpareThreads(int)最小线程数(空闲和活动)始终保持活动状态,默认为 25maxIdleTime(int)空闲线程关闭之前的毫秒数,除非活动线程数小于或等于minSpareThreads。默认值为60000(1分钟)maxQueueSize(int)在我们拒绝之前可以排队等待执行的可运行任务的最大数量。默认值是Integer.MAX_VALUEprestartminSpareThreads(boolean)是否应该在启动Executor时启动minSpareThreads,默认值为 falsethreadRenewalDelay(long)如果配置了ThreadLocalLeakPreventionListener,它将通知此执行程序有关已停止的上下文。上下文停止后,池中的线程将被更新。为避免同时更新所有线程,此选项在任意2个线程的续订之间设置延迟。该值以ms为单位,默认值为1000ms。如果值为负,则不会续订线程。
Lifecycle模板方法
先看核心变量:
// 任务队列
private TaskQueue taskqueue = null;
// 包装了一个ThreadPoolExecutor
protected ThreadPoolExecutor executor = null;
- initInternal和destroyInternal默认父类实现
@Override
protected void initInternal() throws LifecycleException {
super.initInternal();
}
@Override
protected void destroyInternal() throws LifecycleException {
super.destroyInternal();
}
- startInternal方法
这个方法中,我们不难看出,就是初始化taskqueue,同时构造ThreadPoolExecutor的实例,后面Tomcat的StandardThreadExecutor的实现本质上通过ThreadPoolExecutor实现的。
/** * Start the component and implement the requirements * of {@link org.apache.catalina.util.LifecycleBase#startInternal()}. * * @exception LifecycleException if this component detects a fatal error * that prevents this component from being used */
@Override
protected void startInternal() throws LifecycleException {
taskqueue = new TaskQueue(maxQueueSize);
TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
executor.setThreadRenewalDelay(threadRenewalDelay);
if (prestartminSpareThreads) {
executor.prestartAllCoreThreads();
}
taskqueue.setParent(executor);
setState(LifecycleState.STARTING);
}
- stopInternal方法
代码很简单,关闭线程池后置null, 方便GC回收。
/** * Stop the component and implement the requirements * of {@link org.apache.catalina.util.LifecycleBase#stopInternal()}. * * @exception LifecycleException if this component detects a fatal error * that needs to be reported */
@Override
protected void stopInternal() throws LifecycleException {
setState(LifecycleState.STOPPING);
if (executor != null) {
executor.shutdownNow();
}
executor = null;
taskqueue = null;
}
核心executor方法
本质上就是调用ThreadPoolExecutor的实例的相关方法。
@Override
public void execute(Runnable command, long timeout, TimeUnit unit) {
if (executor != null) {
executor.execute(command,timeout,unit);
} else {
throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));
}
}
@Override
public void execute(Runnable command) {
if (executor != null) {
try {
executor.execute(command);
} catch (RejectedExecutionException rx) {
//there could have been contention around the queue
if (!((TaskQueue) executor.getQueue()).force(command)) {
throw new RejectedExecutionException(sm.getString("standardThreadExecutor.queueFull"));
}
}
} else {
throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));
}
}
动态调整线程池
我们还注意到StandardThreadExecutor还实现了ResizeableExecutor,从名称上我们就可知道它是希望实现对线程池的动态调整,所以呢,它封装了一个ResizeableExecutor的接口,看下接口。
public interface ResizableExecutor extends Executor {
/** * Returns the current number of threads in the pool. * * @return the number of threads */
public int getPoolSize();
public int getMaxThreads();
/** * Returns the approximate number of threads that are actively executing * tasks. * * @return the number of threads */
public int getActiveCount();
public boolean resizePool(int corePoolSize, int maximumPoolSize);
public boolean resizeQueue(int capacity);
}
前三个方法比较简单,我们看下后两个方法是如何实现的, 其实也很简单。
@Override
public boolean resizePool(int corePoolSize, int maximumPoolSize) {
if (executor == null)
return false;
executor.setCorePoolSize(corePoolSize);
executor.setMaximumPoolSize(maximumPoolSize);
return true;
}
// 默认没有实现
@Override
public boolean resizeQueue(int capacity) {
return false;
}
补充TaskQueue
我们知道工作队列是有TaskQueue保障的,它集成自LinkedBlockingQueue(一个阻塞的链表队列),来看下源代码吧。
/** * As task queue specifically designed to run with a thread pool executor. The * task queue is optimised to properly utilize threads within a thread pool * executor. If you use a normal queue, the executor will spawn threads when * there are idle threads and you wont be able to force items onto the queue * itself. */
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
private static final long serialVersionUID = 1L;
protected static final StringManager sm = StringManager
.getManager("org.apache.tomcat.util.threads.res");
private static final int DEFAULT_FORCED_REMAINING_CAPACITY = -1;
private transient volatile ThreadPoolExecutor parent = null;
// No need to be volatile. This is written and read in a single thread
// (when stopping a context and firing the listeners)
private int forcedRemainingCapacity = -1;
public TaskQueue() {
super();
}
public TaskQueue(int capacity) {
super(capacity);
}
public TaskQueue(Collection<? extends Runnable> c) {
super(c);
}
public void setParent(ThreadPoolExecutor tp) {
parent = tp;
}
public boolean force(Runnable o) {
if (parent == null || parent.isShutdown()) throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
}
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if (parent == null || parent.isShutdown()) throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
}
@Override
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue
if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
//if we reached here, we need to add it to the queue
return super.offer(o);
}
@Override
public Runnable poll(long timeout, TimeUnit unit)
throws InterruptedException {
Runnable runnable = super.poll(timeout, unit);
if (runnable == null && parent != null) {
// the poll timed out, it gives an opportunity to stop the current
// thread if needed to avoid memory leaks.
parent.stopCurrentThreadIfNeeded();
}
return runnable;
}
@Override
public Runnable take() throws InterruptedException {
if (parent != null && parent.currentThreadShouldBeStopped()) {
return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
// yes, this may return null (in case of timeout) which normally
// does not occur with take()
// but the ThreadPoolExecutor implementation allows this
}
return super.take();
}
@Override
public int remainingCapacity() {
if (forcedRemainingCapacity > DEFAULT_FORCED_REMAINING_CAPACITY) {
// ThreadPoolExecutor.setCorePoolSize checks that
// remainingCapacity==0 to allow to interrupt idle threads
// I don't see why, but this hack allows to conform to this
// "requirement"
return forcedRemainingCapacity;
}
return super.remainingCapacity();
}
public void setForcedRemainingCapacity(int forcedRemainingCapacity) {
this.forcedRemainingCapacity = forcedRemainingCapacity;
}
void resetForcedRemainingCapacity() {
this.forcedRemainingCapacity = DEFAULT_FORCED_REMAINING_CAPACITY;
}
}
TaskQueue这个任务队列是专门为线程池而设计的。优化任务队列以适当地利用线程池执行器内的线程。
如果你使用一个普通的队列,当有空闲线程executor将产生线程并且你不能强制将任务添加到队列。
为什么不是直接使用ThreadPoolExecutor
这里你是否考虑过一个问题,为什么Tomcat会自己构造一个StandardThreadExecutor而不是直接使用ThreadPoolExecutor?
从上面的代码,你会发现这里只是使用executor只是使用了execute的两个主要方法,它希望让调用层屏蔽掉ThreadPoolExecutor的其它方法:
它体现的原则:最少知识原则: 只和你的密友谈话。也就是说客户对象所需要交互的对象应当尽可能少
它体现的设计模式:结构型 - 外观(Facade)
外观模式(Facade pattern),它提供了一个统一的接口,用来访问子系统中的一群接口,从而让子系统更容易使用