进程是操作系统资源分配的基本单位,而线程是任务调度和执行的基本单位。
操作系统在分配资源时是把资源分配给进程的,但是 CPU 资源比较特殊,它是被分配到线程的,因为真正要占用CPU运行的是线程,所以也说线程是 CPU分配的基本单位。
比如在Java中,当我们启动 main 函数其实就启动了一个JVM进程,而 main 函数在的线程就是这个进程中的一个线程,也称主线程。Java程序天生就是多线程程序。
// 通过 JMX 来看一下一个普通的 Java 程序有哪些线程public class MultiThread { public static void main(String[] args) { // 获取 Java 线程管理 MXBean ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); // 不需要获取同步的 monitor 和 synchronizer 信息,仅获取线程和线程堆栈信息 ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false); // 遍历线程信息,仅打印线程 ID 和线程名称信息 for (ThreadInfo threadInfo : threadInfos) { System.out.println("[" + threadInfo.getThreadId() + "] " + threadInfo.getThreadName()); } }}// 结果表明:一个 Java 程序的运行是 main 线程和多个其他线程同时运行。
并发(concurrency)指的是程序的结构,并行(parallelism)指的是程序运行时的状态。并发设计让并发执行成为可能,而并行是并发执行的一种模式。
从操作系统的角度来看,线程是CPU分配的最小单位。
多线程一定快吗?下面演示串行和并发执行并累加操作的时间,请分析:下面的代码并发执行一定比串行执行快吗?答案:不一定。
public class ConcurrencyTest { private static final long count = 10000l; public static void main(String[] args) throws InterruptedException { concurrency(); serial(); } private static void concurrency() throws InterruptedException { long start = System.currentTimeMillis(); Thread thread = new Thread(new Runnable() { @Override public void run() { int a = 0; for (long i = 0; i < count; i++) { a += 5; } } }); thread.start(); int b = 0; for (long i = 0; i < count; i++) { b--; } long time = System.currentTimeMillis() - start; thread.join(); System.out.println("concurrency :" + time + "ms,b=" + b); } private static void serial() { long start = System.currentTimeMillis(); int a = 0; for (long i = 0; i < count; i++) { a += 5; } int b = 0; for (long i = 0; i < count; i++) { b--; } long time = System.currentTimeMillis() - start; System.out.println("serial:" + time + "ms,b=" + b + ",a=" + a); }}
从表可以发现,当并发执行累加操作不超过百万次时,速度会比串行执行累加操作要慢。那么,为什么并发执行的速度会比串行慢呢?这是因为线程有创建和上下文切换的开销。
1.继承 Thread 类,并重写 run 方法,创建该类对象,调用 start 方法开启线程。
public class ThreadTest { /** * 继承Thread类 */ public static class MyThread extends Thread { @Override public void run() { System.out.println("This is child thread"); } } public static void main(String[] args) { MyThread thread = new MyThread(); thread.start(); }}
2.实现 Runnable 接口,重写 run 方法,创建 Thread 类对象,将 Runnable 子类对象传递给 Thread 类对象。调用 start 方法开启线程。
// 实现 Runnable 接口public class RunnableTask implements Runnable { public void run() { System.out.println("Runnable!"); } public static void main(String[] args) { RunnableTask task = new RunnableTask(); new Thread(task).start(); }}
3.创建 FutureTask 对象,创建 Callable 子类对象,重写 call(相当于 run)方法,将其传递给 FutureTask 对象(相当于一个 Runnable)。创建 Thread 类对象,将 FutureTask 对象传递给 Thread 对象。调用 start 方法开启线程。这种方式可以获得线程执行完之后的返回值。
// 实现Callable接口public class CallerTask implements Callable<String> { public String call() throws Exception { return "Hello,i am running!"; } public static void main(String[] args) { //创建异步任务 FutureTask<String> task=new FutureTask<String>(new CallerTask()); //启动线程 new Thread(task).start(); try { //等待执行完成,并获取返回结果 String result=task.get(); System.out.println(result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }}
Runnable、Callable、Future、FutureTask
Runnable接口,它只有一个run()函数,用于将耗时操作写在其中,该函数没有返回值。然后使用某个线程去执行该runnable即可实现多线程,Thread类在调用start()函数后就是执行的是Runnable的run()函数。
Callable与Runnable的功能大致相似,Callable中有一个call()函数,但是call()函数有返回值,而Runnable的run()函数不能将结果返回给客户程序。
Executor就是Runnable和Callable的调度容器,Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果、设置结果操作。get方法会阻塞,直到任务返回结果。
Future(jdk1.5的产物)是一个接口,是无法生成一个实例的,所以又有了FutureTask。FutureTask实现了RunnableFuture接口,RunnableFuture接口又实现了Runnable接口和Future接口。所以FutureTask既可以被当做Runnable来执行,也可以被当做Future来获取Callable的返回结果。
FutureTask实现了Runnable,因此它既可以通过Thread包装来直接执行,也可以提交给ExecuteService来执行。并且还可以直接通过get()函数获取执行结果,该函数会阻塞,直到结果返回。因此FutureTask既是Future、Runnable,又是包装了Callable( 如果是Runnable最终也会被转换为Callable ), 它是这两者的合体。
CompletableFuture(jdk1.8)是Future的加强版。Future 是一种阉割版的异步模式。
4.基于线程池构建线程
为什么我们调用start()方法时会执行run()方法,为什么我们不能直接调用run()方法?
JVM执行start方法,会另起一条线程执行thread的run方法,这才起到多线程的效果,如果直接调用Thread的run()方法,其方法还是运行在主线程中,相当于顺序执行,没有起到多线程效果。
在Object类中有一些函数可以用于线程的等待与通知。
wait():当一个线程A调用一个共享变量的 wait()方法时, 线程A会被阻塞挂起, 发生下面几种情况才会返回 :(1) 线程A调用了共享对象 notify()或者 notifyAll()方法;(2)其他线程调用了线程A的 interrupt() 方法,线程A抛出InterruptedException异常返回。
wait(long timeout) :这个方法相比 wait() 方法多了一个超时参数,它的不同之处在于,如果线程A调用共享对象的wait(long timeout)方法后,没有在指定的 timeout ms时间内被其它线程唤醒,那么这个方法还是会因为超时而返回。
wait(long timeout, int nanos),其内部调用的是 wait(long timout)函数。
Thread类也提供了一个方法用于等待的方法:
join():如果一个线程A执行了thread.join()语句,其含义是:当前线程A等待thread线程终止之后才从thread.join()返回。
上面是线程等待的方法,而唤醒线程主要是下面两个方法:
notify() : 一个线程A调用共享对象的 notify() 方法后,会唤醒一个在这个共享变量上调用 wait 系列方法后被挂起的线程。 一个共享变量上可能会有多个线程在等待,具体唤醒哪个等待的线程是随机的。
notifyAll() :不同于在共享变量上调用 notify() 函数会唤醒被阻塞到该共享变量上的一个线程,notifyAll()方法则会唤醒所有在该共享变量上由于调用 wait 系列方法而被挂起的线程
yield() :Thread类中的静态方法,当一个线程调用 yield 方法时,实际就是在暗示线程调度器当前线程请求让出自己的CPU ,但是线程调度器可以无条件忽略这个暗示。
Java 中的线程中断是一种线程间的协作模式,通过设置线程的中断标志并不能直接终止该线程的执行,而是被中断的线程根据中断状态自行处理。
void interrupt() :中断线程,例如,当线程A运行时,线程B可以调用钱程interrupt() 方法来设置线程的中断标志为true 并立即返回。设置标志仅仅是设置标志, 线程A实际并没有被中断, 会继续往下执行。
boolean isInterrupted() 方法: 检测当前线程是否被中断。
boolean interrupted() 方法: 检测当前线程是否被中断,与 isInterrupted 不同的是,该方法如果发现当前线程被中断,则会清除中断标志。
sleep(long millis) :Thread类中的静态方法,当一个执行中的线程A调用了Thread 的sleep方法后,线程A会暂时让出指定时间的执行权,但是线程A所拥有的监视器资源,比如锁还是持有不让出的。指定的睡眠时间到了后该函数会正常返回,接着参与 CPU 的调度,获取到 CPU 资源后就可以继续运行。
在Java中,线程共有六种状态:
状态 | 说明 |
NEW | 初始状态:线程被创建,但还没有调用start()方法 |
RUNNABLE | 运行状态:Java线程将操作系统中的就绪和运行两种状态笼统的称作“运行” |
BLOCKED | 阻塞状态:表示线程阻塞于锁 |
WAITING | 等待状态:表示线程进入等待状态,进入该状态表示当前线程需要等待其他线程做出一些特定动作(通知或中断) |
TIME_WAITING | 超时等待状态:该状态不同于 WAITIND,它是可以在指定的时间自行返回的 |
TERMINATED | 终止状态:表示当前线程已经执行完毕 |
两个线程交换数据通过 JDK 中的
java.util.concurrent.Exchanger 类来实现的,Exchanger 就是线程之间的数据交换器,只能用于两个线程之间的数据交换。一个线程开启数据交换之后,会阻塞直到其他任意线程同样开启数据交换达到交换点。如果没有对应的线程交换就会一直阻塞,可设置超时,可以中断。
// 示例1:简单数据交换exchange(V);private static void test1() { Exchanger exchanger = new Exchanger(); new Thread(() -> { try { Object data = "-公众号Java技术栈AAA"; System.out.println(Thread.currentThread().getName() + data); // 开始交换数据 data = exchanger.exchange(data); System.out.println(Thread.currentThread().getName() + data); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { Object data = "-公众号Java技术栈BBB"; System.out.println(Thread.currentThread().getName() + data); // 开始交换数据 data = exchanger.exchange(data); System.out.println(Thread.currentThread().getName() + data); } catch (InterruptedException e) { e.printStackTrace(); } }).start();}// 示例2:exchange(data, 3000L, TimeUnit.MILLISECONDS);// 启动3秒后,并没有其他线程和它交换数据,所以抛出了超时异常,最后线程结束运行。private static void test2() { Exchanger exchanger = new Exchanger(); new Thread(() -> { try { Object data = "-公众号Java技术栈AAA"; System.out.println(Thread.currentThread().getName() + data); // 开始交换数据 data = exchanger.exchange(data, 3000L, TimeUnit.MILLISECONDS); System.out.println(Thread.currentThread().getName() + data); } catch (Exception e) { e.printStackTrace(); } }).start();}// 示例3:默认情况下不带超时设置会一直阻塞运行中……设置中断。private static void test3() throws InterruptedException { Exchanger exchanger = new Exchanger(); Thread thread = new Thread(() -> { try { Object data = "-公众号Java技术栈AAA"; System.out.println(Thread.currentThread().getName() + data); // 开始交换数据 data = exchanger.exchange(data); System.out.println(Thread.currentThread().getName() + data); } catch (Exception e) { e.printStackTrace(); } }); thread.start(); // 线程中断 Thread.sleep(3000L); thread.interrupt();}
使用多线程的目的是为了充分利用CPU,但是我们知道,并发其实是一个CPU来应付多个线程。为了让用户感觉多个线程是在同时执行的,CPU 资源的分配采用了时间片轮转也就是给每个线程分配一个时间片,线程在时间片内占用 CPU 执行任务。当线程使用完时间片后,就会处于就绪状态并让出 CPU 让其他线程占用,这就是上下文切换。
Java中的线程分为两类,分别为 daemon 线程(守护线程)和 user 线程(用户线程)。
在JVM 启动时会调用 main 函数,main函数所在的钱程就是一个用户线程。其实在 JVM 内部同时还启动了很多守护线程, 比如垃圾回收线程。
那么守护线程和用户线程有什么区别呢?区别之一是当最后一个非守护线程束时, JVM会正常退出,而不管当前是否存在守护线程,也就是说守护线程是否结束并不影响 JVM退出。换而言之,只要有一个用户线程还没结束,正常情况下JVM就不会退出。
关键字volatile可以用来修饰字段(成员变量),就是告知程序任何对该变量的访问均需要从共享内存中获取,而对它的改变必须同步刷新回共享内存,它能保证所有线程对变量访问的可见性。
关键字synchronized可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性。
可以通过Java内置的等待/通知机制(wait()/notify())实现一个线程修改一个对象的值,而另一个线程感知到了变化,然后进行相应的操作。
管道输入/输出流和普通的文件输入/输出流或者网络输入/输出流不同之处在于,它主要用于线程之间的数据传输,而传输的媒介为内存。
管道输入/输出流主要包括了如下4种具体实现:PipedOutputStream、PipedInputStream、 PipedReader和PipedWriter,前两种面向字节,而后两种面向字符。
如果一个线程A执行了thread.join()语句,其含义是:当前线程A等待thread线程终止之后才从thread.join()返回。线程Thread除了提供join()方法之外,还提供了join(long millis)和join(long millis,int nanos)两个具备超时特性的方法。保证线程的顺序执行的方法:①可使用Thread.join();②使用JDK自带的Excutors类的newSingleThreadExecutor方法,创建一个单线程的线程池,使用LinkedBlockingQueue作为队列,而此队列按 FIFO(先进先出)排序元素。③使用CountDownLatch工具类,它是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。
public class ThreadTest {public static void main(String[] args) throws InterruptedException {join();newSingleThreadExecutor();countDownLatch();}// Thread.join()private static void join() throws InterruptedException {Thread thread1 = new Thread(() -> System.out.println("a"));Thread thread2 = new Thread(() -> System.out.println("b"));Thread thread3 = new Thread(() -> System.out.println("c"));thread1.start();thread1.join();thread2.start();thread2.join();thread3.start();}// newSingleThreadExecutorprivate static void newSingleThreadExecutor() {ExecutorService executorService = Executors.newSingleThreadExecutor();Thread thread1 = new Thread(() -> System.out.println("a"));Thread thread2 = new Thread(() -> System.out.println("b"));Thread thread3 = new Thread(() -> System.out.println("c"));executorService.submit(thread1);executorService.submit(thread2);executorService.submit(thread3);executorService.shutdown();}//countDownLatchprivate static void countDownLatch() throws InterruptedException {CountDownLatch latch1 = new CountDownLatch(0);CountDownLatch latch2 = new CountDownLatch(1);CountDownLatch latch3 = new CountDownLatch(1);Thread thread1 = new Thread(new TestRunnable(latch1, latch2, "a"));Thread thread2 = new Thread(new TestRunnable(latch2, latch3, "b"));Thread thread3 = new Thread(new TestRunnable(latch3, latch3, "c"));thread1.start();thread2.start();thread3.start();}}class TestRunnable implements Runnable {private CountDownLatch latch1;private CountDownLatch latch2;private String message;TestRunnable(CountDownLatch latch1, CountDownLatch latch2, String message) {this.latch1 = latch1;this.latch2 = latch2;this.message = message;}@Overridepublic void run() {try {latch1.await();System.out.println(message);} catch (InterruptedException e) {e.printStackTrace();}latch2.countDown();}}
ThreadLocal,也就是线程本地变量。如果你创建了一个ThreadLocal变量,那么访问这个变量的每个线程都会有这个变量的一个本地拷贝,多个线程操作这个变量的时候,实际是操作自己本地内存里面的变量,从而起到线程隔离的作用,避免了线程安全问题。
//创建一个ThreadLocal变量static ThreadLocal<String> localVariable = new ThreadLocal<>();
使用Condition控制线程通信: jdk1.5中,提供了多线程的升级解决方案为:(1)将同步synchronized替换为显式的Lock操作;(2)将Object类中的wait(), notify(),notifyAll()替换成了Condition对象,该对象可以通过Lock锁对象获取;(3)一个Lock对象上可以绑定多个Condition对象,这样实现了本方线程只唤醒对方线程,而jdk1.5之前,一个同步只能有一个锁,不同的同步只能用锁来区分,且锁嵌套时容易死锁。
BlockingQueue是一个接口,也是Queue的子接口。BlockingQueue具有一个特征:当生产者线程试图向BlockingQueue中放入元素时,如果该队列已满,则线程被阻塞;但消费者线程试图从BlockingQueue中取出元素时,如果队列已空,则该线程阻塞。程序的两个线程通过交替向BlockingQueue中放入元素、取出元素,即可很好地控制线程的通信。
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueTest{ public static void main(String[] args)throws Exception{ //创建一个容量为1的BlockingQueue BlockingQueue b=new ArrayBlockingQueue(1); //启动3个生产者线程 new Producer(b).start(); new Producer(b).start(); new Producer(b).start(); //启动一个消费者线程 new Consumer(b).start(); }}class Producer extends Thread{ private BlockingQueue b; public Producer(BlockingQueue b){ this.b=b; } public synchronized void run(){ String [] str=new String[]{"java","struts","Spring"}; for(int i=0;i< str.length;i++){ System.out.println(getName()+"生产者准备生产集合元素!"); try{ b.put(str[i%3]); sleep(1000); //尝试放入元素,如果队列已满,则线程被阻塞 }catch(Exception e){System.out.println(e);} System.out.println(getName()+"生产完成:"+b); } }}class Consumer extends Thread{ private BlockingQueue b; public Consumer(BlockingQueue b){ this.b=b; } public synchronized void run(){ while(true){ System.out.println(getName()+"消费者准备消费集合元素!"); try{ sleep(1000); //尝试取出元素,如果队列已空,则线程被阻塞 b.take(); }catch(Exception e){System.out.println(e);} System.out.println(getName()+"消费完:"+b); } }}
线程同步
java允许多线程并发控制,当多个线程同时操作一个可共享的资源变量时(如数据的增删改查),将会导致数据不准确,相互之间产生冲突,因此加入同步锁以避免在该线程没有完成操作之前,被其他线程的调用,从而保证了该变量的唯一性和准确性。
Java同步机制有4种实现方式: ① ThreadLocal ② synchronized( ) ③ wait() 与 notify() ④ volatile 目的:都是为了解决多线程中的对同一变量的访问冲突。
ThreadLocal 类的目的是为每个线程单独维护一个变量的值,避免线程间对同一变量的竞争访问,适用于一个变量在每个线程中需要有自己独立的值的场合。
如果把 ThreadLocal 声明为非静态,则在含有 ThreadLocal 变量的的每个实例中都会产生一个新对象,这是毫无意义的,只是增加了内存消耗。(ThreadLocal 类型变量为何声明为 static?)
ThreadLocal和Synchonized都用于解决多线程并发访问。可是ThreadLocal与synchronized有本质的差别。synchronized是利用锁的机制,使变量或代码块在某一时该仅仅能被一个线程访问。而ThreadLocal为每个线程都提供了变量的副本,使得每个线程在某一时间访问到的并非同一个对象,这样就隔离了多个线程对数据的数据共享。而Synchronized却正好相反,它用于在多个线程间通信时可以获得数据共享。
Synchronized用于线程间的数据共享,而ThreadLocal则用于线程间的数据隔离。
线程隔离特性,只有在线程内才能获取到对应的值,线程外不能访问。
(1)Synchronized是通过线程等待,牺牲时间来解决访问冲突。
(1)ThreadLocal是通过每个线程单独一份存储空间,牺牲空间来解决冲突。
ThreadLocalMap 是每个线程私有的。ThreadLocal 是线程 Thread 中属性 threadLocals 的管理者(
ThreadLocal.ThreadLocalMap threadLocals = null;),key 值就是指向 ThreadLocal 的弱引用,而 value 值这是我们设置的值。
1.set 给 ThreadLocalMap 设置值。
2.get 获取 ThreadLocalMap。
3.remove 删除 ThreadLocalMap 类型的对象。
用途:数据库连接,数据源的切换、Session 管理、Spring 提供的声明式事务。
原理:
ThreadLocalMap 内部实际上是一个 Entry 数组,Entry 的 key 使用的是 ThreadLocal 对象的弱引用,在没有其他地方对 ThreadLocal 依赖,ThreadLocalMap 中的 ThreadLocal 对象就会被回收掉,但是对应的 value 不会被回收,这个时候 Map 中就可能存在 key 为null但是 value 不为null的项,这需要实际的时候使用完毕及时调用 remove 方法避免内存泄漏。(Memory Leak--内存泄漏是指程序中已动态分配的堆内存由于某种原因程序未释放或者无法是否,造成系统内存的浪费,导致程序运行速度减慢甚至系统崩溃等严重后果。)
简单来说,就是因为 ThreadLocalMap 的 key 是弱引用,当 ThreadLocal 外部没有强引用时,就被回收,此时会出现 ThreadLocalMap<null,value> 的情况,而线程没有结束的情况下,导致这个 null 对应的 value 一直无法回收,可能导致泄漏。
ThreadLocal 内存泄漏的根源是:由于 ThreadLocalMap 的生命周期跟 Thread 一样长,如果没有手动删除对应 key 就会导致内存泄漏,而不是因为弱引用。
解决方法:
①ThreadLocal申明为private static final,private与final 尽可能不让他人修改变更引用,Static 表示为类属性,只有在程序结束才会被回收。
②ThreadLocal使用后务必调用remove方法。
思考:为什么key还要设计成弱引用?
key设计成弱引用同样是为了防止内存泄漏。假如key被设计成强引用,如果ThreadLocal Reference被销毁,此时它指向ThreadLoca的强引用就没有了,但是此时key还强引用指向ThreadLoca,就会导致ThreadLocal不能被回收,这时候就发生了内存泄漏的问题。
ThreadLocalMap虽被叫做Map,但它没有实现Map接口。其结构和HashMap比较类似,只是HashMap由数组+链表实现,而ThreadLocalMap中并没链表结构。主要关注的是两个要素:元素数组和散列方法。
这里的threadLocalHashCode计算有点东西,每创建一个ThreadLocal对象,它就会新增0x61c88647,这个值很特殊,它是斐波那契数也叫黄金分割数。hash增量为这个数字,带来的好处就是 hash 分布非常均匀。
HashMap使用了链表来解决冲突,也就是所谓的链地址法。
ThreadLocalMap没有使用链表,自然也不是用链地址法来解决冲突了,它用的是另外一种方式开放定址法。简单来说,就是这个坑被人占了,那就接着去找空着的坑。开放地址法下不同的解决冲突方案:线性探测法、平方探测法、双散列。
如上图所示,如果我们插入一个value=27的数据,通过 hash计算后应该落入第 4 个槽位中,而槽位 4 已经有了 Entry数据,而且Entry数据的key和当前不相等。此时就会线性向后查找,一直找到 Entry为 null的槽位才会停止查找,把元素放到空的槽中。
在get的时候,也会根据ThreadLocal对象的hash值,定位到table中的位置,然后判断该槽位Entry对象中的key是否和get的key一致,如果不一致,就判断下一个位置。
ThreadLocalMap 处理哈希冲突时使用的是线性探测法,因此删除 key 的时候不能直接简单把 entry 置为 null; 它采用的方法是把后续每个不为 null 的 entry 进行 rehash,放在合适的位置,保证不会因为删除导致线性探测失效中断。
ThreadLocalMap 在进行扩容之前会先进行清理工作,有两种清除方式:
探测式清理:是以当前遇到的 GC 元素开始,向后不断的清理。直到遇到 null 为止。
private int expungeStaleEntry(int staleSlot) { Entry[] tab = table; int len = tab.length; // 首先将 tab[staleSlot] 槽位的数据清空 // 然后设置 然后设置 size-- tab[staleSlot].value = null; tab[staleSlot] = null; size--; // Rehash until we encounter null Entry e; int i; // 以 staleSlot 位置往后迭代 for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); // 如果遇到 key == null 的 过期数据,也是清空该槽位数据,然后 size-- if (k == null) { e.value = null; tab[i] = null; size--; } else { // 如果 key != null 表示 key 没有过期,重新计算当前 key 的下标位置是不是当前槽位下标位置 // 如果不是 h != i ,那么说明产生了 hash 冲突 ,此时以新计算出来正确的槽位位置往后迭代 // 找到最后一个存放 entry 的位置 int h = k.threadLocalHashCode & (len - 1); if (h != i) { tab[i] = null; // Unlike Knuth 6.4 Algorithm R, we must scan until // null because multiple entries could have been stale. /** * 这段话提及了 Knuth 的 R 算法 我们和 R 算法的不同 * 我们必须扫描到 null,因为可能多个条目可能过期 * ThreadLocal 使用了弱引用,即有多种状态,(已回收、未回收)所以不能安全按照 R 算法实现 */ while (tab[h] != null) h = nextIndex(h, len); tab[h] = e; } } } return i;}
测式清理结束后,数组中过期的元素应该会被部分清除,而且之前发生 Hash 冲突 的 Entry 元素的位置应该更接近真实 hash 出来的位置。提升了查找的效率,这里探测式清理并不能全部清除数组中的过期元素,而是从传入的下标清理到第一个 Entry==null 为止。部分清除。其余的部分,需要通过启发式清除。
private boolean cleanSomeSlots(int i, int n) { boolean removed = false; Entry[] tab = table; int len = tab.length; // do while 循环 循环中不断的右移进行寻找被清理的过期元素 // 最终都会使用 expungeStaleEntry 进行处理 do { i = nextIndex(i, len); Entry e = tab[i]; if (e != null && e.get() == null) { n = len; removed = true; i = expungeStaleEntry(i); } } while ( (n >>>= 1) != 0); return removed;}
注:ThreadLocal 调用 set(),get(),remove() 都会对 key = null 进行清除 value 操作。
在ThreadLocalMap.set()方法的最后,如果执行完启发式清理工作后,未清理到任何数据,且当前散列数组中Entry的数量已经达到了列表的扩容阈值(len*2/3),就开始执行rehash()逻辑:
if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash();// ThreadLocalMap 的初始容量是 16private static final int INITIAL_CAPACITY = 16;private void setThreshold(int len) { threshold = len * 2 / 3;}/*** 先去清理过期的Entry,* 然后还要根据条件判断size >= threshold - threshold / 4 * 也就是size >= threshold* 3/4来决定是否需要扩容。 */private void rehash() { //清理过期Entry expungeStaleEntries(); //扩容 if (size >= threshold - threshold / 4) resize();}//清理过期Entryprivate void expungeStaleEntries() { Entry[] tab = table; int len = tab.length; for (int j = 0; j < len; j++) { Entry e = tab[j]; if (e != null && e.get() == null) expungeStaleEntry(j); }}
全面清理结束之后,会进一步判断数组的长度是否满足 size >= threshold - threshold / 4,也就是说,扩容前真正的阈值判断是 len * 2/3 * 3/4,也就是阈值真正的值是数组长度的 1/2。
resize()方法,扩容后的newTab的大小为老数组的两倍,然后遍历老的table数组,散列方法重新计算位置,开放地址解决冲突,然后放到新的newTab,遍历完成之后,oldTab中所有的entry数据都已经放入到newTab中了,然后table引用指向newTab。
private void resize() { Entry[] oldTab = table; int oldLen = oldTab.length; int newLen = oldLen * 2; Entry[] newTab = new Entry[newLen]; int count = 0; for (int j = 0; j < oldLen; ++j) { Entry e = oldTab[j]; if (e != null) { ThreadLocal<?> k = e.get(); if (k == null) { e.value = null; // Help the GC } else { int h = k.threadLocalHashCode & (newLen - 1); while (newTab[h] != null) h = nextIndex(h, newLen); newTab[h] = e; count++; } } } setThreshold(newLen); size = count; table = newTab;}
ThreadLocalMap 的 set 方法通过调用 replaceStaleEntry 方法(其内部也是调用了启发式清除 和 探测式清除)回收键为 null 的 Entry 对象的值(即为具体实例)以及 Entry 对象本身从而防止内存泄漏
private void set(ThreadLocal<?> key, Object value) { // We don't use a fast path as with get() because it is at // least as common to use set() to create new entries as // it is to replace existing ones, in which case, a fast // path would fail more often than not. Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { ThreadLocal<?> k = e.get(); if (k == key) { e.value = value; return; } if (k == null) { replaceStaleEntry(key, value, i); return; } } ...
应该在我们不使用的时候,主动调用 remove 方法进行清理。
try { // 其它业务逻辑} finally { threadLocal 对象。remove();}
ThreadLocal 固然很好,但是子线程并不能取到父线程的 ThreadLocal 的变量,父线程不能用ThreadLocal来给子线程传值,这时候可以用到另外一个类——InheritableThreadLocal 。JDK的 InheritableThreadLocal 类可以完成父线程到子线程的值传递。但对于使用线程池等会池化复用线程的执行组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;这时父子线程关系的ThreadLocal值传递已经没有意义,应用需要的实际上是把任务提交给线程池时的ThreadLocal值传递到任务执行时。
public class InheritableThreadLocalTest { public static void main(String[] args) { final ThreadLocal threadLocal = new InheritableThreadLocal(); // 主线程 threadLocal.set("不擅技术"); //子线程 Thread t = new Thread() { @Override public void run() { super.run(); System.out.println("鄙人三某 ," + threadLocal.get()); } }; t.start(); }}/** 原理很简单,在Thread类里还有另外一个变量:* ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;* 在Thread.init的时候,如果父线程的inheritableThreadLocals不为空,* 就把它赋给当前线程(子线程)的inheritableThreadLocals 。*/if (inheritThreadLocals && parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
线程池满了会怎么样
jdk1.6之前,synchronized是基于底层操作系统的Mutex Lock实现的,每次获取和释放锁都会带来用户态和内核态的切换,从而增加系统的性能开销。在锁竞争激烈的情况下,synchronized同步锁的性能很糟糕。JDK 1.6,Java对synchronized同步锁做了充分的优化,引入了偏向锁和轻量级锁。锁一共有4中状态:无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态,这几种状态会随着竞争情况逐渐升级,但不能降级,目的是为了提高锁和释放锁的效率。优化后在某些场景下性能已经超越了Lock同步锁。
synchronized的实现原理
ObjectMonitor() { _header = NULL; _count = 0; // 记录线程获取锁的次数 _waiters = 0, _recursions = 0; //锁的重入次数 _object = NULL; _owner = NULL; // 指向持有ObjectMonitor对象的线程 _WaitSet = NULL; // 处于wait状态的线程,会被加入到_WaitSet _WaitSetLock = 0 ; _Responsible = NULL ; _succ = NULL ; _cxq = NULL ; FreeNext = NULL ; _EntryList = NULL ; // 处于等待锁block状态的线程,会被加入到该列表 _SpinFreq = 0 ; _SpinClock = 0 ; OwnerIsThread = 0 ; }
synchronized 底层对应的 JVM 模型为 objectMonitor,使用了3个双向链表来存放被阻塞的线程:_cxq(Contention queue)、_EntryList(EntryList)、_WaitSet(WaitSet)。
当线程获取锁失败进入阻塞后,首先会被加入到 _cxq 链表,_cxq 链表的节点会在某个时刻被进一步转移到 _EntryList 链表。
当持有锁的线程释放锁后,_EntryList 链表头结点的线程会被唤醒,该线程称为 successor(假定继承者),然后该线程会尝试抢占锁。(被唤醒的线程只是有机会去竞争锁,需要竞争但可能会失败,所以该线程并不一定会成为锁的"继承者",所以称为假定继承者,这也是synchronized 为非公平锁的一个原因)
当我们调用 wait() 时,线程会被放入 _WaitSet,直到调用了 notify()/notifyAll() 后,线程才被重新放入 _cxq 或 _EntryList,默认放入 _cxq 链表头部。
objectMonitor 的整体流程如下图:
synchronized的加锁场景:
1)作用于非静态方法,锁住的是对象实例(this),每一个对象实例有一个锁。
public synchronized void method() {}
2)作用于静态方法,锁住的是类的 Class 对象,Class 对象全局只有一份,因此静态方法锁相当于类的一个全局锁,会锁所有调用该方法的线程。
public static synchronized void method() {}
3)作用于 Lock.class,锁住的是 Lock 的 Class 对象,也是全局只有一个。
synchronized (Lock.class) {}
4)作用于 this,锁住的是对象实例,每一个对象实例有一个锁。
synchronized (this) {}
5)作用于静态成员变量,锁住的是该静态成员变量对象,由于是静态变量,因此全局只有一个。
public static Object monitor = new Object(); synchronized (monitor) {}
实例:
/***从底层看,当线程进入synchronized时,需要获取lock锁,但在调用lock.wait()的时候,*此时虽然线程还在synchronized块里,但是其实已经释放掉了lock锁。*所以,其他线程此时可以获取lock锁进入到synchronized块,从而去执行lock.notify()。*/public class SynchronizedTest { private static final Object lock = new Object(); public static void testWait() throws InterruptedException { synchronized (lock) { // 阻塞住,被唤醒之前不会输出aa,也就是还没离开synchronized lock.wait(); System.out.println("aa"); } } public static void testNotify() throws InterruptedException { synchronized (lock) { lock.notify(); System.out.println("bb"); } }}
1、如果有多个线程都进入 wait 状态,那某个线程调用 notify 唤醒线程时是否按照进入 wait 的顺序去唤醒?
synchronized 是非公平锁,不会按照顺序去唤醒。调用 wait 时,节点进入_WaitSet 链表的尾部。调用 notify 时,根据不同的策略,节点可能被移动到 cxq 头部、cxq 尾部、EntryList 头部、EntryList 尾部等多种情况。所以,唤醒的顺序并不一定是进入 wait 时的顺序。
2、notifyAll 是怎么实现全唤起的?
nofity 是获取 WaitSet 的头结点,执行唤起操作。nofityAll 的流程,可以简单的理解为就是循环遍历 WaitSet 的所有节点,对每个节点执行 notify 操作。
3、synchronized 锁能降级吗?
答案是可以的。具体的触发时机:在全局安全点(safepoint)中,执行清理任务的时候会触发尝试降级锁。当锁降级时,主要进行了以下操作:
1)恢复锁对象的 markword 对象头;
2)重置 ObjectMonitor,然后将该 ObjectMonitor 放入全局空闲列表,等待后续使用。
synchronized和ReentrantLock的区别
ReentrantLock提供了一种能够中断等待锁的线程的机制,通过lock.lockInterruptibly()来实现这个机制。synchronized与wait()和notify()/notifyAll()方法结合实现等待/通知机制,ReentrantLock类借助Condition接口与newCondition()方法实现。
sychronized不支持中断,即调用thread.interrupt()方法无效果。为了解决sychronized无法中断问题,AQS实现了中断支持;sychronized不支持超时。
接口: Executor
Executor接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。
ExecutorService
ExecutorService继承自Executor接口,ExecutorService提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。 可以关闭 ExecutorService,这将导致其停止接受新任务。关闭后,执行程序将最后终止,这时没有任务在执行,也没有任务在等待执行,并且无法提交新任务。
ScheduledExecutorService
ScheduledExecutorService继承自ExecutorService接口,可安排在给定的延迟后运行或定期执行的命令。
AbstractExecutorService
AbstractExecutorService继承自ExecutorService接口,其提供 ExecutorService 执行方法的默认实现。此类使用 newTaskFor 返回的 RunnableFuture 实现 submit、invokeAny 和 invokeAll 方法,默认情况下,RunnableFuture 是此包中提供的 FutureTask 类。
FutureTask
FutureTask 为 Future 提供了基础实现,如获取任务执行结果(get)和取消任务(cancel)等。如果任务尚未完成,获取任务执行结果时将会阻塞。一旦执行结束,任务就不能被重启或取消(除非使用runAndReset执行计算)。FutureTask 常用来封装 Callable 和 Runnable,也可以作为一个任务提交到线程池中执行。除了作为一个独立的类之外,此类也提供了一些功能性函数供我们创建自定义 task 类使用。FutureTask 的线程安全由CAS来保证。
核心: ThreadPoolExecutor
ThreadPoolExecutor实现了AbstractExecutorService接口,也是一个 ExecutorService,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置。 线程池可以解决两个不同问题: 由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。
核心: ScheduledThreadExecutor
ScheduledThreadPoolExecutor实现ScheduledExecutorService接口,可安排在给定的延迟后运行命令,或者定期执行命令。需要多个辅助线程时,或者要求 ThreadPoolExecutor 具有额外的灵活性或功能时,此类要优于 Timer。
核心: Fork/Join框架
ForkJoinPool 是JDK 7加入的一个线程池类。Fork/Join 技术是分治算法(Divide-and-Conquer)的并行实现,它是一项可以获得良好的并行性能的简单且高效的设计技术。目的是为了帮助我们更好地利用多处理器带来的好处,使用所有可用的运算能力来提升应用的性能。
工具类: Executors
Executors是一个工具类,用其可以创建ExecutorService、ScheduledExecutorService、ThreadFactory、Callable等对象。它的使用融入到了ThreadPoolExecutor, ScheduledThreadExecutor和ForkJoinPool中。
核心线程大小(corePoolSize)、线程池最大线程数量(maximumPoolSize),空闲线程存活时间(keepAliveTime)、空间线程存活时间单位(unit)、工作队列(workQueue)、拒绝策略(handler)、线程工厂(ThreadFactory)这7个参数。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);}
判断线程池的状态,如果不是RUNNING状态,直接执行拒绝策略
如果当前线程数 < corePoolSize,则新建一个线程来处理提交的任务
如果当前线程数 >= corePoolSize 且任务队列没满,则将任务放入阻塞队列(BlockingQueue)等待执行
如果corePoolSize < 当前线程池数 < maximumPoolSize,且任务队列已满,则创建新的线程执行提交的任务
如果当前线程数 >= maximumPoolSize,且队列已满,则执行拒绝策略拒绝该任务。
这个执行流程是 JUC 标准线程池提供的执行流程,主要用在 CPU 密集型场景下。
像 Tomcat、Dubbo 这类框架,他们内部的线程池主要用来处理网络 IO 任务的,所以他们都对 JUC 线程池的执行流程进行了调整来支持 IO 密集型场景使用。他们提供了阻塞队列 TaskQueue,该队列继承 LinkedBlockingQueue,重写了 offer() 方法来实现执行流程的调整。
Tomcat并没有使用 JUC 原生线程池,利用 TaskQueue 的 offer() 方法巧妙的修改了 JUC 线程池的执行流程,改写后 Tomcat 线程池执行流程如下:
判断如果当前线程数小于核心线程池,则新建一个线程来处理提交的任务;
如果当前当前线程池数大于核心线程池,小于最大线程数,则创建新的线程执行提交的任务;
如果当前线程数等于最大线程数,则将任务放入任务队列等待执行;
如果队列已满,则执行拒绝策略;
ThreadPoolExecutor.AbortPolicy:默认拒绝策略,丢弃任务并抛出
RejectedExecutionException
异常;
ThreadPoolExecutor.DiscardPolicy:丢弃任务但不抛出异常;
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交倍拒绝的任务;
ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务;
不允许使用 Executors 创建线程池,而是通过 ThreadPoolExecutor 显示指定参数去创建。
Executors.newFixedThreadPool 和
Executors.SingleThreadPool 创建的线程池内部使用的是无界(Integer.MAX_VALUE)的 LinkedBlockingQueue 队列,可能会堆积大量请求,导致 OOM。
Executors.newCachedThreadPool 和
Executors.scheduledThreadPool 创建的线程池最大线程数是用的Integer.MAX_VALUE,可能会创建大量线程,导致 OOM。
Spring 环境中使用线程池,直接使用 JUC 原生 ThreadPoolExecutor,Spring 容器关闭的时候可能任务队列里的任务还没处理完,有丢失任务的风险。所以最好不要直接使用 ThreadPoolExecutor 在 Spring 环境中,可以使用 Spring 提供的 ThreadPoolTaskExecutor,或者 DynamicTp 框架提供的 DtpExecutor 线程池实现。