13 min to read
Java
学习Java线程
线程的概念
百度百科的解释
- 线程,有时被称为轻量级进程(Lightweight Process,LWP),是程序执行流的最小单元。一个标准的线程由线程ID,当前指令指针(PC),寄存器集合和堆栈组成。另外,线程是进程中的一个实体,是被系统独立调度和分派的基本单位,线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其它线程共享进程所拥有的全部资源。一个线程可以创建和撤消另一个线程,同一进程中的多个线程之间可以并发执行。由于线程之间的相互制约,致使线程在运行中呈现出间断性。线程也有就绪、阻塞和运行三种基本状态。就绪状态是指线程具备运行的所有条件,逻辑上可以运行,在等待处理机;运行状态是指线程占有处理机正在运行;阻塞状态是指线程在等待一个事件(如某个信号量),逻辑上不可执行。每一个程序都至少有一个线程,若程序只有一个线程,那就是程序本身。 线程是程序中一个单一的顺序控制流程。进程内有一个相对独立的、可调度的执行单元,是系统独立调度和分派CPU的基本单位指令运行时的程序的调度单位。在单个程序中同时运行多个线程完成不同的工作,称为多线程。
维基百科的解释
- 线程(英语:thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。在Unix System V及SunOS中也被称为轻量进程(lightweight processes),但轻量进程更多指内核线程(kernel thread),而把用户线程(user thread)称为线程。 线程是独立调度和分派的基本单位。线程可以为操作系统内核调度的内核线程,如Win32线程;由用户进程自行调度的用户线程,如Linux平台的POSIX Thread;或者由内核与用户进程,如Windows 7的线程,进行混合调度。 同一进程中的多条线程将共享该进程中的全部系统资源,如虚拟地址空间,文件描述符和信号处理等等。但同一进程中的多个线程有各自的调用栈(call stack),自己的寄存器环境(register context),自己的线程本地存储(thread-local storage)。一个进程可以有很多线程,每条线程并行执行不同的任务。在多核或多CPU,或支持Hyper-threading的CPU上使用多线程程序设计的好处是显而易见,即提高了程序的执行吞吐率。在单CPU单核的计算机上,使用多线程技术,也可以把进程中负责I/O处理、人机交互而常被阻塞的部分与密集计算的部分分开来执行,编写专门的workhorse线程执行密集计算,从而提高了程序的执行效率。
线程状态
- 创建(new)状态: 准备好了一个多线程的对象
- 就绪(runnable)状态: 调用了start()方法, 等待CPU进行调度
- 运行(running)状态: 执行run()方法
- 阻塞(blocked)状态: 暂时停止执行, 可能将资源交给其它线程使用
- 终止(dead)状态: 线程销毁
疑问
- 为什么 wait(),notify(),notifyAll()等方法在Object类中,不在Thread中?
- synchronized和Lock区别?
- sleep()与wait()?
- CyclicBarrier 和 CountDownLatch有什么不同?
- 线程池调用shutdown(),线程池所有的线程都会关闭吗?
- 线程池中submit() 和 execute()方法有什么区别?
- Runnable与Callable区别?
- 怎么样结束线程池中的线程?
- 如何在两个线程间共享数据?
- 什么是ThreadLocal变量?
- ConcurrentHashMap原理?
- 什么是CAS,AQS?
- volatile 变量和 atomic 变量有什么不同?
volatile 禁止进行指令重排序,保证了不同线程对这个变量进行操作时的可见性,不保证原子性,如果是写操作,它会导致其他CPU中对应的缓存行无效,它会强制将对缓存的修改操作立即写入主存
- 单例模式的双检锁是什么?
- 如何实现线程的超时中断?
- Servlet属于线程安全的吗?
- 为什么阿里巴巴的Java开发中是不建议甚至禁止使用Java预置线程池?
补充
- 可返回值的任务必须实现Callable接口,类似的,无返回值的任务必须Runnable接口
- synchronized在jdk1.6之后,已经改进优化。synchronized的底层实现主要依靠Lock-Free的队列,基本思路是自旋后阻塞,竞争切换后继续竞争锁,稍微牺牲了公平性,但获得了高吞吐量。在线程冲突较少的情况下,可以获得和CAS类似的性能;而线程冲突严重的情况下,性能远高于CAS
线程创建的方式
继承Thread类
package com.thread.demo;
/**
* @Author yudong
* @Date 2018年06月20日 上午10:47
*/
public class ThreadTest extends Thread{
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
System.out.println("线程完成启动");
}
public static void main(String[] args) {
ThreadTest threadTest = new ThreadTest();
threadTest.start();
}
}
- 运行结果
Thread-0
线程完成启动
Process finished with exit code 0
实现Runnable接口
package com.thread.demo;
/**
* @Author yudong
* @Date 2018年06月20日 上午10:54
*/
public class RunnableTest implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
System.out.println("线程完成启动");
}
public static void main(String[] args) {
RunnableTest runnableTest = new RunnableTest();
Thread thread = new Thread(runnableTest);
thread.start();
}
}
- 运行结果
Thread-0
线程完成启动
Process finished with exit code 0
查看Thread源码,会发现Thread是实现了Runnable接口的(JDK1.8源码),如下是Runnable片段
public
class Thread implements Runnable {
/* Make sure registerNatives is the first thing <clinit> does. */
private static native void registerNatives();
static {
registerNatives();
}
private volatile String name;
private int priority;
private Thread threadQ;
private long eetop;
......
}
查看Thread源码,会发现Thread的多种构造方法(JDK1.8源码),如下是Thread片段
/**
* Allocates a new {@code Thread} object. This constructor has the same
* effect as {@linkplain #Thread(ThreadGroup,Runnable,String) Thread}
* {@code (null, null, gname)}, where {@code gname} is a newly generated
* name. Automatically generated names are of the form
* {@code "Thread-"+}<i>n</i>, where <i>n</i> is an integer.
*/
public Thread() {
init(null, null, "Thread-" + nextThreadNum(), 0);
}
/**
* Allocates a new {@code Thread} object. This constructor has the same
* effect as {@linkplain #Thread(ThreadGroup,Runnable,String) Thread}
* {@code (null, target, gname)}, where {@code gname} is a newly generated
* name. Automatically generated names are of the form
* {@code "Thread-"+}<i>n</i>, where <i>n</i> is an integer.
*
* @param target
* the object whose {@code run} method is invoked when this thread
* is started. If {@code null}, this classes {@code run} method does
* nothing.
*/
public Thread(Runnable target) {
init(null, target, "Thread-" + nextThreadNum(), 0);
}
/**
* Creates a new Thread that inherits the given AccessControlContext.
* This is not a public constructor.
*/
Thread(Runnable target, AccessControlContext acc) {
init(null, target, "Thread-" + nextThreadNum(), 0, acc, false);
}
/**
* Allocates a new {@code Thread} object. This constructor has the same
* effect as {@linkplain #Thread(ThreadGroup,Runnable,String) Thread}
* {@code (group, target, gname)} ,where {@code gname} is a newly generated
* name. Automatically generated names are of the form
* {@code "Thread-"+}<i>n</i>, where <i>n</i> is an integer.
*
* @param group
* the thread group. If {@code null} and there is a security
* manager, the group is determined by {@linkplain
* SecurityManager#getThreadGroup SecurityManager.getThreadGroup()}.
* If there is not a security manager or {@code
* SecurityManager.getThreadGroup()} returns {@code null}, the group
* is set to the current thread's thread group.
*
* @param target
* the object whose {@code run} method is invoked when this thread
* is started. If {@code null}, this thread's run method is invoked.
*
* @throws SecurityException
* if the current thread cannot create a thread in the specified
* thread group
*/
public Thread(ThreadGroup group, Runnable target) {
init(group, target, "Thread-" + nextThreadNum(), 0);
}
Thread类实例化后不允许克隆(JDK1.8源码),如下是Thread片段
/**
* Throws CloneNotSupportedException as a Thread can not be meaningfully
* cloned. Construct a new Thread instead.
*
* @throws CloneNotSupportedException
* always
*/
@Override
protected Object clone() throws CloneNotSupportedException {
throw new CloneNotSupportedException();
}
Thread 多次调用 thread.start() 方法会有什么结果
/**
* @Author yudong
* @Date 2018年06月20日 上午10:54
*/
public class RunnableTest implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
System.out.println("线程完成启动");
}
public static void main(String[] args) {
RunnableTest runnableTest = new RunnableTest();
Thread thread = new Thread(runnableTest);
thread.start();
thread.start();
}
}
- 结果
objc[70590]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home/bin/java (0x10e3994c0) and /Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home/jre/lib/libinstrument.dylib (0x10f3c44e0). One of the two will be used. Which one is undefined.
Exception in thread "main" java.lang.IllegalThreadStateException
at java.lang.Thread.start(Thread.java:708)
at com.thread.demo.RunnableTest.main(RunnableTest.java:19)
Thread-0
线程完成启动
- 为什么会有这样的结果,查看Thread类中的start()源码 (JDK1.8源码),如下是Thread片段
/**
* Causes this thread to begin execution; the Java Virtual Machine
* calls the <code>run</code> method of this thread.
* <p>
* The result is that two threads are running concurrently: the
* current thread (which returns from the call to the
* <code>start</code> method) and the other thread (which executes its
* <code>run</code> method).
* <p>
* It is never legal to start a thread more than once.
* In particular, a thread may not be restarted once it has completed
* execution.
*
* @exception IllegalThreadStateException if the thread was already
* started.
* @see #run()
* @see #stop()
*/
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
-
会发现当再次调用的时候会判断这个线程状态,如果这个线程状态不等于0就抛出IllegalThreadStateException异常,如果 threadStatus = 0 表示还未start
-
可以根据自己业务的需要选不同的实现方式,但是现实的开发一般都使用线程池,或者自定义线程池,创建一个线程的代价是十分昂贵的,需要给它分配内存、列入调度,同时在线程切换的时候还要执行内存换页,CPU 的缓存被 清空,切换回来的时候还要重新从内存中读取信息,破坏了数据的局部性
线程池
ThreadPoolExecutor Executors
- Executors提供了一系列工厂方法用于创先线程池
package com.thread.demo;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @Author yudong
* @Date 2018年06月20日 下午1:45
*/
public class ExecutorServiceTest {
public static void main(String[] args) {
//创建指定数量的线程池
ExecutorService es = Executors.newFixedThreadPool(10);
//创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
ExecutorService es1 = Executors.newCachedThreadPool();
//创建一个单线程化的Executor,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
ExecutorService es2 = Executors.newSingleThreadExecutor();
//创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类,配置核心线程数量
ExecutorService es3 = Executors.newScheduledThreadPool(10);
//创建持有足够线程的线程池来支持给定的并行级别,并通过使用多个队列,减少竞争,它需要穿一个并行级别的参数,如果不传,则被设定为默认的CPU数量
ExecutorService es4 = Executors.newWorkStealingPool();
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
System.out.println("线程完成启动");
}
};
es.execute(task);
}
}
ForkJoinPool
-
ForkJoinPool这个类是JDK7后新增的线程池,很适合在单机多核的PC上部署多线程程序,ForkJoinPool使用的分而治之的思想,ForkJoinPool适合在一台PC多核CPU上运行, Fork/Join 模式有自己的适用范围。如果一个应用能被分解成多个子任务,并且组合多个子任务的结果就能够获得最终的答案,那么这个应用就适合用 Fork/Join 模式来解决.
-
没有返回值
package com.thread.demo;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
/**
* 带返回值的
* @Author yudong
* @Date 2018年06月20日 下午2:37
*/
public class ForkJoinPoolTest extends RecursiveAction {
//任务容器,可以为一个对象
private int task = 0;
//构造方法用于传入数据对象
public ForkJoinPoolTest(int task) {
this.task = task;
}
@Override
protected void compute() {
//是否分发的条件,这里是如果任务量大于20,就分解成两个小任务执行
if (task <=20){
System.out.println(Thread.currentThread().getName()+"承担了"+task+"份工作");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}else {
Random m = new Random();
int x = 20;
ForkJoinPoolTest n1 = new ForkJoinPoolTest(x);
ForkJoinPoolTest n2 = new ForkJoinPoolTest(task - x);
n1.fork();
n2.fork();
}
}
public static void main(String[] args) {
//创建一个支持分解任务的线程池ForkJoinPool
ForkJoinPool pool=new ForkJoinPool();
ForkJoinPoolTest n1 = new ForkJoinPoolTest(52);
pool.submit(n1);
pool.awaitQuiescence(20, TimeUnit.SECONDS);
pool.shutdown();
}
}
- 结果
ForkJoinPool-1-worker-3承担了20份工作
ForkJoinPool-1-worker-2承担了20份工作
ForkJoinPool-1-worker-1承担了12份工作
Process finished with exit code 0
- 有返回值
package com.thread.demo;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
* 计算一组数的总和
* @Author yudong
* @Date 2018年06月20日 下午3:26
*/
public class ForkJoinPool2Test extends RecursiveTask<Integer> {
private List<Integer> number = new ArrayList<>();
public ForkJoinPool2Test(List<Integer> number) {
this.number = number;
}
@Override
protected Integer compute() {
//如果这一组数超过了5个,分发下去
int sum=0;
int size = number.size();
if (size<5){
for (int i =0;i<size;i++){
sum=sum+number.get(i);
}
try {
Thread.sleep(200L);
}catch (Exception e){
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"求和完了:"+sum);
return sum;
}else{
ForkJoinPool2Test f1 = new ForkJoinPool2Test(number.subList(0,4));
ForkJoinPool2Test f2 = new ForkJoinPool2Test(number.subList(4,number.size()));
f1.fork();
f2.fork();
return f1.join()+f2.join();
}
}
public static void main(String[] args) throws Exception{
//初始化一个list,让里面包含1-100的值
List<Integer> number = new ArrayList<>();
for (int i = 1;i<=100;i++){
number.add(i);
}
Long time = System.currentTimeMillis();
ForkJoinPool2Test f = new ForkJoinPool2Test(number);
//这里面是求和处理
ForkJoinPool pool=new ForkJoinPool(50);
Integer sum = 0;
sum = pool.submit(f).get();
System.out.println("求和:"+sum+" 使用时间"+(System.currentTimeMillis()-time));
pool.shutdown();
}
}
- 结果
ForkJoinPool-1-worker-50求和完了:10
ForkJoinPool-1-worker-8求和完了:58
ForkJoinPool-1-worker-36求和完了:26
ForkJoinPool-1-worker-58求和完了:74
ForkJoinPool-1-worker-44求和完了:90
ForkJoinPool-1-worker-22求和完了:42
ForkJoinPool-1-worker-23求和完了:106
ForkJoinPool-1-worker-37求和完了:122
ForkJoinPool-1-worker-2求和完了:138
ForkJoinPool-1-worker-52求和完了:154
ForkJoinPool-1-worker-38求和完了:170
ForkJoinPool-1-worker-24求和完了:186
ForkJoinPool-1-worker-10求和完了:202
ForkJoinPool-1-worker-60求和完了:218
ForkJoinPool-1-worker-46求和完了:234
ForkJoinPool-1-worker-18求和完了:266
ForkJoinPool-1-worker-32求和完了:250
ForkJoinPool-1-worker-4求和完了:282
ForkJoinPool-1-worker-54求和完了:298
ForkJoinPool-1-worker-40求和完了:314
ForkJoinPool-1-worker-26求和完了:330
ForkJoinPool-1-worker-41求和完了:378
ForkJoinPool-1-worker-50求和完了:394
ForkJoinPool-1-worker-48求和完了:362
ForkJoinPool-1-worker-12求和完了:346
求和:5050 使用时间232
CompletionService
Callable