线程池中真正做事的是 Worker:
/** * Worker类主要维护了要执行的任务和执行任务的线程。继承AbstractQueuedSynchronizer主要是因为能更方便地获取锁。 */private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ /** * 序列化ID */ private static final long serialVersionUID = 6138294804551838833L; /** 执行任务的线程 */ final Thread thread; /** 被执行的任务 */ Runnable firstTask; /** 记录已完成的任务数 */ volatile long completedTasks; /** * 构造方法 */ Worker(Runnable firstTask) { // 禁止中断,直到启动Worker。 // 此操作与void interruptIfStarted()相呼应。 setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** 线程启动以后,执行run方法 */ public void run() { // 内部调用runWorker方法执行任务 runWorker(this); } // 锁 // 0:解锁。 // 1:加锁。 // 判断当前Worker是否未解锁 // true:未解锁 // false:解锁 protected boolean isHeldExclusively() { return getState() != 0; } // 尝试加锁 protected boolean tryAcquire(int unused) { // compareAndSetState(0, 1):预期值是0,即解锁;新值是1,即加锁。 // true:加锁成功。 // false:加锁失败。 if (compareAndSetState(0, 1)) { // 设置当前线程独占此锁 setExclusiveOwnerThread(Thread.currentThread()); // 加锁成功 return true; } // 加锁失败 return false; } // 尝试解锁 protected boolean tryRelease(int unused) { // 设置没有如何线程独占此锁 setExclusiveOwnerThread(null); // 解锁 setState(0); // 解锁成功 return true; } // 加锁,即将锁的状态设置为1 public void lock() { acquire(1); } // 尝试加锁,即将锁的状态设置为1 public boolean tryLock() { return tryAcquire(1); } // 解锁,即将锁的状态设置为0 public void unlock() { release(1); } // 是否有线程已拥有此锁?true:有线程独占此锁。false:没有任何线程拥有此锁。 // 当此方法返回true时,表示你可以获取此锁。 public boolean isLocked() { return isHeldExclusively(); } // 中断Worker的线程 void interruptIfStarted() { // 记录即将被中断线程 Thread t; // getState() >= 0:锁的状态要正常。与构造方法中的setState(-1); 操作相呼应。 // t = thread) != null:Worker的线程不能为null。 // !t.isInterrupted():Worker的线程没有被中断过。 // 综上所述,满足以下条件才可以: // 锁的状态要正常,即要么为加锁状态,要么为解锁状态。且线程不能为null,还没有被中断过。 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { // 中断Worker的线程。 t.interrupt(); } catch (SecurityException ignore) { } } }}
阅读 Worker 类源码时要搞懂 1 个问题和 3 个方法:
1 个问题:
为什么要继承
AbstractQueuedSynchronizer 类?
3 个方法:
答案就一点:为了方便获取锁。
如果不继承
AbstractQueuedSynchronizer 类,Worker 自身没有锁的能力,需要在类中定义 Lock 属性来获取锁。
如果继承
AbstractQueuedSynchronizer 类,Worker 自身拥有锁的能力,不需要再在类中定义 Lock 属性来获取锁。
void interruptIfStarted() 方法是用来中断 Worker 的线程:
// 中断Worker的线程void interruptIfStarted() { // 记录即将被中断线程 Thread t; // getState() >= 0:锁的状态要正常。与构造方法中的setState(-1); 操作相呼应。 // t = thread) != null:Worker的线程不能为null。 // !t.isInterrupted():Worker的线程没有被中断过。 // 综上所述,满足以下条件才可以: // 锁的状态要正常,即要么为加锁状态,要么为解锁状态。且线程不能为null,还没有被中断过。 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { // 中断Worker的线程。 t.interrupt(); } catch (SecurityException ignore) { } }}
此方法在什么时候被使用过呢?
/** * 中断所有线程。 */private void interruptWorkers() { // 遍历Worker集合 for (Worker w : workers) // 中断Worker的线程 w.interruptIfStarted();}
在调用 List<Runnable> shutdownNow() 方法关闭线程池的时候,内部就会调用 interruptWorkers() 方法,interruptWorkers() 方法内部再去调用 Worker 的 void interruptIfStarted() 方法中断 Worker 的线程。
public List<Runnable> shutdownNow() { ... try { ... // 中断所有线程。 interruptWorkers(); ... } finally { ... } ...}
Worker(Runnable firstTask) 是 Worker 的构造方法:
/** * 构造方法 */Worker(Runnable firstTask) { // 禁止中断,直到启动Worker。 // 此操作与void interruptIfStarted()相呼应。 setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this);}
setState(-1); 这行代码是为了防止线程刚创建就被中断。
即呼应了 void interruptIfStarted() 方法中 if 语句的 getState() >= 0 这个条件。
this.firstTask = firstTask; 这一步在记录 Worker 要执行的任务。
this.thread = getThreadFactory().newThread(this); 到这一步线程池中的线程可以说才是真正被创建出来。
void run() 方法是线程启动后执行的方法:
/** 线程启动以后,执行run方法 */public void run() { // 内部调用runWorker方法执行任务 runWorker(this);}
runWorker(this); 这一步是去执行任务。
runWorker 在下一章分析。
如果大家有任何疑问,请在下方留言或评论。
Java线程池核心(十四):线程池是如何拒绝任务的?
Java线程池核心(十六):任务是如何被执行的?
加入同步学习小组,共同交流与进步。
欢迎加入“人人都是程序员”编程圈子,与圈友一起交流讨论。
原创不易,未经允许不得转载!