并发工具

线程池

概述

对于线程池与数据库连接池一样,都利用了池化技术来优化频繁创建线程/连接所带来的性能损耗。并且可以控制线程数来减小上下文切换的性能消耗。

线程池有两个比较重要的参数:coreThreadCountmaxThreadCount

  • coreThreadCount:线程池中的核心线程数。
  • maxThreadCount:最大线程数。

分配策略:

  • 如果线程池中的线程数少于 coreThreadCount 时,会创建新的线程处理,并将线程放入线程池;
  • 如果线程数大于 coreThreadCount 则把任务丢到一个队列里面,由当前空闲的线程执行;
  • 当队列中的任务堆积满了的时候,则继续创建线程,直到达到 maxThreadCount;
  • 当线程数达到 maxTheadCount 时还有新的任务提交,那么我们就不得不将它们丢弃了。

可以看出JDK 原生的线程池当线程数大于 coreThreadCount 不会立马创建线程,而是放入队列;因此只需要创建和 CPU 核心数相当的线程就好了,多了反而会造成线程上下文切换,降低任务执行效率。(但是这是对于CPU 密集型的任务,对于IO操作多的任务来说立马创建线程分配可以大大提高系统的吞吐量)

注意事项:

  • coreThreadCount 和 maxThreadCount 不宜设置得比较小,会导致任务在线程池里面大量的堆积
  • 一定记住不要使用无界队列(即没有设置固定大小的队列),大量的任务堆积会占用大量的内存空间,一旦内存空间被占满就会频繁地触发 Full GC,造成服务不可用

问题:

但是,我们平时开发的 Web 系统通常都有大量的 IO 操作,比方说查询数据库、查询缓存等等。任务在执行 IO 操作的时候 CPU 就空闲了下来,这时如果增加执行任务的线程数而不是把任务暂存在队列中,就可以在单位时间内执行更多的任务,大大提高了任务执行的吞吐量。

所以Tomcat 使用的线程池就不是 JDK 原生的线程池,而是做了一些改造,当线程数超过 coreThreadCount 之后会优先创建线程,直到线程数到达 maxThreadCount,这样就比较适合于 Web 系统大量 IO 操作的场景了

自定义线程池

基本结构:

image-20230420212402982

  • 阻塞队列:任务队列放置任务等待空闲线程执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    class BlockingQueue<T> {

    /**
    * 1.用双端链表定义任务队列
    */
    private Deque<T> queue = new ArrayDeque<>();

    /**
    * 2.锁,防止多个线程取走/放置同一个任务
    */
    private ReentrantLock lock = new ReentrantLock();

    /**
    * 3.生产者条件变量
    */
    private Condition fullWaitSet = lock.newCondition();
    /**
    * 3.消费者条件变量
    */
    private Condition emptyWaitSet = lock.newCondition();

    /**
    * 容量
    */
    private int capacity;

    /**
    * 阻塞获取
    */
    public T take(){
    lock.lock();
    try {
    // 队列为空,等待
    while (queue.isEmpty()){
    try {
    emptyWaitSet.await();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    fullWaitSet.signal();
    return queue.removeFirst();
    }finally {
    lock.unlock();
    }
    }
    /**
    * 带超时的阻塞获取
    */
    public T poll(long timeout, TimeUnit unit){
    lock.lock();
    try {
    long nanos = unit.toNanos(timeout);
    // 队列为空,等待
    while (queue.isEmpty()){
    try {
    // 防止虚假唤醒
    if(nanos <= 0 ){
    return null;
    }
    nanos = emptyWaitSet.awaitNanos(nanos);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    fullWaitSet.signal();
    return queue.removeFirst();
    }finally {
    lock.unlock();
    }
    }

    /**
    * 阻塞添加
    */
    public void put(T element){
    lock.lock();
    try {
    // 队列满了,等待
    while (queue.size() == capacity){
    try {
    fullWaitSet.await();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    queue.addLast(element);
    emptyWaitSet.signal();
    }finally {
    lock.unlock();
    }
    }

    // 带超时时间阻塞添加
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
    lock.lock();
    try {
    long nanos = timeUnit.toNanos(timeout);
    while (queue.size() == capacity) {
    try {
    if(nanos <= 0) {
    return false;
    }
    nanos = fullWaitSet.awaitNanos(nanos);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    queue.addLast(task);
    emptyWaitSet.signal();
    return true;
    } finally {
    lock.unlock();
    }
    }

    /**
    * 获取大小
    * @return
    */
    public int size() {
    lock.lock();
    try {
    return queue.size();
    }finally {
    lock.unlock();
    }
    }
    }
  • 线程池:用于存放线程,并消费阻塞队列中的任务。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    class ThreadPool{
    // 任务队列
    private BlockingQueue<Runnable> taskQueue;

    // 线程集合
    private HashSet<Worker> workers = new HashSet();

    // 核心线程数
    private int coreSize;

    // 获取任务的超时时间
    private long timeout;
    // 超时时间单位
    private TimeUnit timeUnit;

    // 执行任务
    public void execute(Runnable task){
    // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
    // 如果任务数超过 coreSize 时,加入任务队列暂存
    synchronized (workers){
    // 需要保证workers的线程安全
    if(workers.size() < coreSize){
    // 创建线程进行处理
    Worker worker = new Worker(task);
    // 并且添加到线程池中
    workers.add(worker);
    // 执行任务
    worker.start();
    }else{
    taskQueue.put(task);
    }
    }
    }


    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapacity) {
    this.coreSize = coreSize;
    this.timeout = timeout;
    this.timeUnit = timeUnit;
    this.taskQueue = new BlockingQueue<>(queueCapacity);
    }

    class Worker extends Thread{
    private Runnable task;

    public Worker(Runnable task) {
    this.task = task;
    }

    @Override
    public void run() {
    // 执行任务
    // 当task不为空,执行该任务
    // 当task执行完毕,从任务队列获取任务
    while (task != null || (task = taskQueue.take()) != null){
    // 如果需要加线程空闲时间while (task != null || (task = taskQueue.poll(timeout,timeUit)) != null){
    try {
    task.run();
    }catch (Exception e){
    e.printStackTrace();
    }finally {
    task = null;
    }
    }
    synchronized (workers){
    workers.remove(this);
    }
    }
    }
    }
  • 测试:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public static void main(String[] args) {
    ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);
    for (int i = 0; i < 5; i++) {
    int j = i;
    threadPool.execute(() -> {
    System.out.println(j);
    });
    }
    }

改进:当任务队列满了的时候,会有很多方案,例如:死等、超时等待、放弃执行、自己开线程执行等;如果放在线程池中一一实现就不灵活。不如给调用者提供一个拒绝策略接口让其自己选择方案。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
@FunctionalInterface
interface RejectPolicy<T>{
void reject(BlockingQueue<T> queue,T task);
}

class ThreadPool{
// 任务队列
private BlockingQueue<Runnable> taskQueue;

// 线程集合
private HashSet<Worker> workers = new HashSet();

// 核心线程数
private int coreSize;

// 获取任务的超时时间
private long timeout;
// 超时时间单位
private TimeUnit timeUnit;
// 拒绝策略
private RejectPolicy<Runnable> rejectPolicy;
// 执行任务
public void execute(Runnable task){
// 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
// 如果任务数超过 coreSize 时,加入任务队列暂存
synchronized (workers){
// 需要保证workers的线程安全
if(workers.size() < coreSize){
// 创建线程进行处理
Worker worker = new Worker(task);
// 并且添加到线程池中
workers.add(worker);
// 执行任务
worker.start();
}else{
// taskQueue.put(task);
// 尝试获取,如果满了执行相应拒绝策略
taskQueue.tryPut(rejectPolicy,task);
}
}
}


public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapacity,RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}

class Worker extends Thread{
private Runnable task;

public Worker(Runnable task) {
this.task = task;
}

@Override
public void run() {
// 执行任务
// 当task不为空,执行该任务
// 当task执行完毕,从任务队列获取任务
while (task != null || (task = taskQueue.take()) != null){
try {
task.run();
}catch (Exception e){
e.printStackTrace();
}finally {
task = null;
}
}
synchronized (workers){
workers.remove(this);
}
}
}
}

class BlockingQueue<T> {

/**
* 1.用双端链表定义任务队列
*/
private Deque<T> queue = new ArrayDeque<>();

/**
* 2.锁,防止多个线程取走/放置同一个任务
*/
private ReentrantLock lock = new ReentrantLock();

/**
* 3.生产者条件变量
*/
private Condition fullWaitSet = lock.newCondition();
/**
* 3.消费者条件变量
*/
private Condition emptyWaitSet = lock.newCondition();

/**
* 容量
*/
private int capacity;

public BlockingQueue(int queueCapacity) {
this.capacity = queueCapacity;
}

/**
* 阻塞获取
*/
public T take(){
lock.lock();
try {
// 队列为空,等待
while (queue.isEmpty()){
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
fullWaitSet.signal();
return queue.removeFirst();
}finally {
lock.unlock();
}
}
/**
* 带超时的阻塞获取
*/
public T poll(long timeout, TimeUnit unit){
lock.lock();
try {
long nanos = unit.toNanos(timeout);
// 队列为空,等待
while (queue.isEmpty()){
try {
// 防止虚假唤醒
if(nanos <= 0 ){
return null;
}
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
fullWaitSet.signal();
return queue.removeFirst();
}finally {
lock.unlock();
}
}

/**
* 阻塞添加
*/
public void put(T element){
lock.lock();
try {
// 队列满了,等待
while (queue.size() == capacity){
try {
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(element);
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}

// 带超时时间阻塞添加
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capacity) {
try {
if(nanos <= 0) {
return false;
}
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}

/**
* 获取大小
* @return
*/
public int size() {
lock.lock();
try {
return queue.size();
}finally {
lock.unlock();
}
}

public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
// 判断队列是否已经满了
if(queue.size() == capacity){
rejectPolicy.reject(this,task);
}else{
// 有空闲
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}

测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class TestPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10,(queue,task) -> {
// 死等
// queue.put(task);
// 超时等待
// queue.offer(task,500,TimeUnit.MILLISECONDS);
// 放弃执行即不调用任何处理方法
// System.out.println("放弃");
// 抛异常
// throw new RuntimeException("执行失败");
// 自己执行
task.run();
});
for (int i = 0; i < 15; i++) {
int j = i;
threadPool.execute(() -> {
System.out.println(j);
});
}
}
}

ThreadPoolExecutor

ThreadPoolExecutor是一个ExecutorService,是JDK提供的线程池,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置。

线程池状态:

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量

状态名 高 3位 接收新任务 处理阻塞队列任务 说明
RUNNING 111 Y Y 正常执行
SHUTDOWN 000 N Y 不会接收新任务,但会处理阻塞队列剩余任务
STOP 001 N N 会中断正在执行的任务,并抛弃阻塞队列任务
TIDYING 010 - - 任务全执行完毕,活动线程为 0 即将进入终结
TERMINATED 011 - - 终结状态

构造方法:

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
)
  • corePoolSize:核心线程数目 (最多保留的线程数)
  • maximumPoolSize:最大线程数目 = 核心线程数+救急线程数
  • keepAliveTime:生存时间 - 针对救急线程的存活时间
  • unit:时间单位 -生存时间的时间单位
  • workQueue: 阻塞队列,即任务队列
  • threadFactory: 线程工厂 - 用于线程的创建
  • handler: 拒绝策略

拒绝策略:

拒绝策略jdk提供了4种实现

image-20230424194254094

  • DiscardPolicy:放弃本次任务
  • DiscardOldestPolicy:放弃队列中最早的任务,本任务取而代之
  • AbortPolicy:让调用者抛出RejectedExecutionException异常,这是默认策略
  • CallerRunsPolicy:让调用者运行任务

提交任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 执行任务
void execute(Runnable command);

// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);

// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

// 提交 tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;

关闭线程池:

  • shutdown:将线程池状态变为 SHUTDOWN,不会接收新任务,但已提交任务会执行完,此方法不会阻塞调用线程的执行。
  • shutdownNow:将线程池状态变为 STOP,不会接收新任务,会将队列中的任务返回,用 interrupt 的方式中断正在执行的任务。

其他方法:

  • isShutdown():如果状态不是RUNNING,此方法就返回 true。
  • isTerminated():判断线程池状态是否是 TERMINATED。
  • awaitTermination:调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待。

Executor创建线程池

  1. newFixedThreadPool:

    实现代码:

    1
    2
    3
    4
    5
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }
    • 特点:
      • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
      • 阻塞队列是无界的,可以放任意数量的任务
    • 适用场景:适用于任务量已知,相对耗时的任务
  2. newCachedThreadPool

    实现代码:

    1
    2
    3
    4
    5
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }
    • 特点:
      • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,并且可以无线创建
      • 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的
    • 适用场景:适合任务数比较密集,但每个任务执行时间较短的情况
  3. newSingleThreadExecutor

    实现代码:

    1
    2
    3
    4
    5
    6
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }
    • 特点:
      • 线程数固定为 1,任务数多于 1 时,会放入无界队列排队。
      • 任务执行完毕,这唯一的线程也不会被释放。
    • 适用场景:希望多个任务排队执行。
  4. ScheduledThreadPoolExecutor

    实现代码:

    1
    2
    3
    4
    5
    public static ScheduledExecutorService newScheduledThreadPool(
    int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
    // 利用schedule方法提交任务
    • 特点:

      • 一个任务调度的线程池,继承于ThreadPoolExecutor,实现了ScheduledExecutorService 接口。
    • 适用场景:用于处理延时/定时任务。

    • 使用

      • 提交延时任务:

        1
        2
        3
        4
        5
        executor.schedule(() -> {
        System.out.println("任务1,执行时间:" + new Date());
        // 这里的异常还可以返回Future对象来获取异常
        try { Thread.sleep(2000); } catch (InterruptedException e) {}
        }, 1000, TimeUnit.MILLISECONDS);
      • 提交定时任务:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        // 第一个参数任务、第二个参数延迟时间、第三个参数间隔时间、第四个时间单位
        // 这个方法第三个参数可能会被任务里面的延迟吞没比如sleep了2秒,这样就不会再推迟一秒执行而是直接执行
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        log.debug("start...");
        pool.scheduleAtFixedRate(() -> {
        log.debug("running...");
        }, 1, 1, TimeUnit.SECONDS);
        // 这个方法第三个参数间隔时间就不会被任务的sleep等时间所吞没
        pool.scheduleWithFixedRate(() -> {
        log.debug("running...");
        sleep(2);
        }, 1, 1, TimeUnit.SECONDS);

        线程池大小问题:

        • 过小会导致程序不能充分地利用系统资源、容易导致饥饿
        • 过大会导致更多的线程上下文切换,占用更多内存

        计算方法:

        • CPU密集型运算:

          CPU核数+1

        • I/O密集型运算:

          线程数 = 核数 * 期望CPU利用率 * 总时间(CPU计算时间 + 等待时间) / CPU计算时间

Fork/Join

Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 CPU 密集型
运算。Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率(Fork/Join 默认会创建与 CPU核心数大小相同的线程池)。

基本使用:

  • 创建任务:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    // 需要返回值就继承RecursiveTask<T>,不会要就继承RecursiveAction
    class MyTask extends RecursiveTask<Integer> {

    private int n;

    public MyTask(int n) {
    this.n = n;
    }

    @Override
    protected Integer compute() {
    // 需要将任务分解为递归任务
    if(n == 1){
    return 1;
    }
    // 拆分为子任务
    MyTask t1 = new MyTask(n - 1);
    // fork让线程去执行子任务
    t1.fork();
    // join获取结果
    return n + t1.join();
    }
    }
  • 利用线程池来执行任务

    1
    2
    3
    4
    5
    6
    7
    8
    public class ForkJoin {
    public static void main(String[] args) {
    // 默认线程数和CPU核心数一样
    ForkJoinPool pool = new ForkJoinPool();
    // 执行任务,求和
    System.out.println(pool.invoke(new MyTask(100)));
    }
    }

    JUC

AQS

AQS全称是AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架,后面很多并发工具类都是基于AQS来实现的。

特点:

  • 用state属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
    • getState:获取state状态
    • setState:设置state状态
    • compareAndSetState:利用乐观锁机制设置state状态
    • 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
  • 提供了基于FIFO的等待队列,类似于Monitor的 EntryList
  • 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于Monitor的WaitSet

使用:

使用子类继承它并按需实现下列方法:

  • tryAcquire:获取独占锁
  • tryRelease:释放独占锁
  • tryAcquireShared:获取共享锁
  • tryReleaseShared:获取独占锁
  • isHeldExclusively:释放持有锁

具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// 自定义锁(不可重入锁)
class MyLock implements Lock{

// 定义同步器,用来给这个自定义锁进行所有的锁操作
// 独占锁
class MySync extends AbstractQueuedSynchronizer {
// 获取锁
@Override
protected boolean tryAcquire(int arg) {
// 利用cas来修改state
if(compareAndSetState(0,1)){
// 设置owner持有线程为当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放锁
@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
// 因为这个state加了volatile写屏障,所以要放后面
setState(0);
return true;
}
// 是否持有独占锁
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}

//创建条件变量
public Condition newCondition(){
return new ConditionObject();
}
}

private MySync sync = new MySync();

// 加锁(不成功进入等待队列)
@Override
public void lock() {
sync.acquire(1);
}
// 加锁可打断
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
// 尝试加锁(一次)
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
// 尝试加锁带超时时间
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}

// 释放锁
@Override
public void unlock() {
// 会释放锁并且唤醒等待线程
sync.release(0);
}
// 创建条件变量
@Override
public Condition newCondition() {
return sync.newCondition();
}
}

ReentrantReadWriteLock

ReentrantReadWriteLock是一种读写锁,类似于数据库的共享锁/排他锁,当读操作远远高于写操作时,这时候使用 读写锁让读-读 可以并发,提高性能。

特点:

  • 读锁不支持条件变量,写锁支持条件变量。
  • 重入时不支持升级:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待。
  • 重入时降级支持:即持有写锁的情况下去获取读锁。

简单使用:

编写一个数据容器类,内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class DataContainer {
private Object data;
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock r = rw.readLock();
private ReentrantReadWriteLock.WriteLock w = rw.writeLock();

public Object read() {
System.out.println("获取读锁...");
r.lock();
try {
System.out.println("读取");
sleep(1000);
return data;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("释放读锁...");
r.unlock();
}
return null;
}

public void write() {
System.out.println("获取写锁...");
w.lock();
try {
System.out.println("写入");
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("释放写锁...");
w.unlock();
}
}
}

StampedLock

该类自JDK 8加入,是为了进一步优化读性能(因为每次读读并发的时候都需要利用CAS去修改状态),它的特点是在使用读锁、写锁时都必须配合【戳】使用。

使用

  • 加解读锁

    1
    2
    long stamp = lock.readLock();
    lock.unlockRead(stamp) ;
  • 加解写锁

    1
    2
    long stamp = lock.writeLock();
    lock.unlockwrite(stamp) ;
  • 乐观读锁,StampedLock支持tryoptimisticRead()方法(乐观读),读取完毕后需要做一次戳校验如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。

    1
    2
    3
    4
    5
    long stamp = lock.tryoptimisticRead();
    //验戳
    if(!lock.validate(stamp)){
    // 锁升级 加读锁
    }

缺点:

  • StampedLock不支持条件变量
  • StampedLock不支持可重入

Semaphore

Semaphore是JDK提供的一种信号量的实现,用来限制能同时访问共享资源的线程上限;例如对有限共享资源的使用限制。

使用Semaphore限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数,除了资源就是由一个线程持有的例如数据库连接池。

简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class SemaphoreDemo {
public static void main(String[] args) {
// 1.创建 semaphore对象, 参数一限制的线程数,第二个参数是否公平性
Semaphore semaphore = new Semaphore(3);
// 2. 让10个线程同时运行
for (int i = 0; i < 10; i++) {
new Thread(() -> {
// 获取信号量
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println(Thread.currentThread().getName()+"running");
Thread.sleep(10);
System.out.println(Thread.currentThread().getName()+"end...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放信号量
semaphore.release();
}
}).start();
}
}
}

image-20230511210254682

CountdownLatch

用来进行线程同步协作,等待所有线程完成倒计时。其中构造参数用来初始化等待计数值,等待的线程就调用await()用来等待计数归零,其他线程调用countDown()用来让计数减一,计数为零时等待线程开始执行。类似于Join()但是比其更加高级,并且适用于线程池中的线程。

简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class CountdownLatchDemo {
public static void main(String[] args) {
// 创建countDownLatch,设置等待计数
CountDownLatch latch = new CountDownLatch(2);
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+" begin..");
try {
Thread.sleep(1000);
// 让计数减一
latch.countDown();
System.out.println(Thread.currentThread().getName()+" end..");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+" begin..");
try {
Thread.sleep(1000);
// 让计数减一
latch.countDown();
System.out.println(Thread.currentThread().getName()+" end..");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
System.out.println(Thread.currentThread().getName()+" watting...");
try {
// 等待计数减为零
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" watting end...");
}
}

Cyclicbarrier

循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用await()方法进行等待,当等待的线程数满足『计数个数』时,继续执行。与CountdownLatch的区别在于这个工具类是可以重用的,可以重置为最开始的值。

简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class CyclicbarrierDemo {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(2);
// 创建循环栅栏,参数二为当计数为0时执行的动作
CyclicBarrier barrier = new CyclicBarrier(2,() -> {
System.out.println("task1 task2 end....");
});
// 循环执行3次,CyclicBarrier计数会自动重置为初始值
for (int i = 0; i < 3; i++) {
service.submit(() ->{
System.out.println("task1 begin....");
try {
Thread.sleep(1000);
// 将计数减一,若不为0则阻塞
barrier.await();
System.out.println("task1 end...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
service.submit(() ->{
System.out.println("task2 begin....");
try {
Thread.sleep(2000);
// 将计数减一,若不为0则阻塞
barrier.await();
System.out.println("task2 end...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
}
service.shutdown();
}
}

线程安全集合类

image-20230516194049286

线程安全集合类可以分为三大类:

  • 遗留的线程安全集合如:

    • Hashtable
    • Vector
    • Stack

    都是用synchronized修饰的线程安全方法,所以性能不好,目前已经不推荐使用了。

  • 使用 Collections 装饰的线程安全集合:

    • Collections.synchronizedCollection
    • Collections.synchronizedList
    • Collections.synchronizedMap
    • Collections.synchronizedSet
    • Collections.synchronizedNavigableMap
    • Collections.synchronizedNavigableSet
    • Collections.synchronizedSortedMap
    • Collections.synchronizedSortedSet

    利用装饰器模式的思想,通过Collections中的同步包装方法来包装成线程安全的集合,其实底层就是用synchronized修饰原来集合的方法。

    注意单次操作时线程安全的,但是如果是组合操作很有可能会发生竞态条件

  • java.util.concurrent.*下的线程安全集合类:

    这些都是基于JUC实现的安全集合类,重点推荐使用的线程安全集合。

    image-20230605224311912

    • CopyOnWrite类:利用写时复制来避免多线程带来的并发安全问题,适用于读多写特别少的场景,因为修改/写开销相对较重,并且会有短暂的数据不一致问题

      image-20230605224521561

      CopyOnWrite类遍历操作一直都是基于原 array 执行,而写操作则是基于新 array 进行。

    • Concurrent类:带Concurrent的集合类,内部很多操作使用 cas 优化,一般可以提供较高吞吐量,但是会有弱一致性问题。

      • ConcurrentHashMap:无序线程安全的Map,不支持key和value为空。
      • ConcurrentSkipListMap:有序线程安全的Map,不支持key和value为空,底层使用了跳表,插入、删除、查询操作平均的时间复杂度是 O(log n)。
      • ConcurrentSkipListSet:有序线程安全的Set。
    • Queue:以下两个维度来分类。一个维度是阻塞与非阻塞,另一个维度是单端与双端

      • 单端阻塞队列
        • ArrayBlockingQueue:以数组为队列的实现的阻塞队列。
        • LinkedBlockingQueue:以链表为队列的实现的阻塞队列。
        • SynchronousQueue:同步队列,每一个插入操作必须等待一个相应的删除操作,用于同步的数据交换。
        • LinkedTransferQueue:融合 LinkedBlockingQueue 和 SynchronousQueue 的功能,性能比 LinkedBlockingQueue 更好。
        • PriorityBlockingQueue:支持按照优先级出队。
        • DelayQueue:可以支持延迟出队。
      • 双端阻塞队列
        • LinkedBlockingDeque:双端阻塞,以双向链表为队列实现。
      • 单端非阻塞队列
        • ConcurrentLinkedQueue
      • 双端非阻塞队列
        • ConcurrentLinkedDeque

      注意点:

      • Java 并发包里阻塞队列都用 Blocking 关键字标识,单端队列使用 Queue 标识,双端队列使用 Deque 标识

      • 类似阻塞队列之类的,大部分实现基于,并提供用来阻塞的方法。而非阻塞队列是基于CAS来是实现的无锁机制。

      • 上面我们提到的这些 Queue 中,只有 ArrayBlockingQueue 和 LinkedBlockingQueue 是支持有界的,所以在使用其他无界队列时,一定要充分考虑是否存在导致 OOM 的隐患

      • Java 容器有一个快速失败机制(Fail-Fast):是指在多线程环境下,当多个线程同时对同一个容器进行修改操作时,如果检测到容器的结构发生了变化就会抛出ConcurrentModificationException 异常,从而快速失败。

        在编写多线程程序时,仍然需要通过其他手段来保证线程安全,例如使用同步机制(如 synchronized)或并发容器(如 ConcurrentHashMap)等。

CompletableFuture

CompletableFuture是Java 在 1.8 版本提供的来支持异步编程的强大工具类。

优点:

  • 无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注。
  • 代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。
  • 语义更清晰。

主要方法

CompletableFuture如果构造的时候不指定线程池,默认使用公共的ForkJoinPool 线程池,线程数是CPU核心数;但是这个线程池适用于CPU密集型计算,所以很多时候需要自己传入线程池,根据不同的业务类型创建不同的线程池,以避免互相干扰

创建CompletableFuture对象:

  • runAsync(Runnable runnable)/runAsync(Runnable runnable, Executor executor):

    这个方法是不需要有返回值的,可以直接传一个Runnable进去异步执行。

  • supplyAsync(Supplier supplier)/supplyAsync(Supplier supplier, Executor executor) :

    这个方法支持有返回值的异步任务。

1
2
3
4
5
6
//使用默认线程池
static CompletableFuture<Void> runAsync(Runnable runnable)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
//可以指定线程池
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

描述串行方法关系:

  • thenApply():这个方法既能接收参数也支持返回值。
  • thenAccept():这个方法虽然支持参数,但却不支持回值。
  • thenRun():方法里 action 的参数是 Runnable,所以 action 既不能接收参数也不支持返回值。
  • thenCompose():这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系列是相同的。
1
2
3
4
5
6
7
8
9
// 加了Async表示异步去执行,不加则是同步
CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);

描述AND汇聚关系方法:

  • thenCombine():描述的就是一种and汇聚关系,该任务依赖于f1和f2
  • thenAcceptBoth():不支持返回值,支持传参
  • runAfterBoth:不支持返回值,不支持传参
1
2
3
4
5
6
7
// 加了Async表示异步去执行,不加则是同步
CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);

描述 OR 汇聚关系方法:

  • applyToEither():这个方法既能接收参数也支持返回值。
  • acceptEither():这个方法虽然支持参数,但却不支持回值。
  • runAfterEither():既不能接收参数也不支持返回值。
1
2
3
4
5
6
7
// 加了Async表示异步去执行,不加则是同步
CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);

异常处理:

  • exceptionally():非常类似于 try{}catch{}中的 catch{}
  • whenComplete():类似于 try{}finally{}中的 finally{},无论是否发生异常都会执行,支持传参
  • handle():类似于 try{}finally{}中的 finally{},无论是否发生异常都会执行,但不支持传参
1
2
3
4
5
CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);

简单使用

同样使用烧水泡茶的例子:

image-20230614113847499

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//任务1:洗水壶->烧开水
CompletableFuture<Void> f1 =
CompletableFuture.runAsync(()->{
System.out.println("T1:洗水壶...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T1:烧开水...");
sleep(15, TimeUnit.SECONDS);
});
//任务2:洗茶壶->洗茶杯->拿茶叶
CompletableFuture<String> f2 =
CompletableFuture.supplyAsync(()->{
System.out.println("T2:洗茶壶...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T2:洗茶杯...");
sleep(2, TimeUnit.SECONDS);
System.out.println("T2:拿茶叶...");
sleep(1, TimeUnit.SECONDS);
return "龙井";
});
//任务3:任务1和任务2完成后执行:泡茶
CompletableFuture<String> f3 =
f1.thenCombine(f2, (__, tf)->{
System.out.println("T1:拿到茶叶:" + tf);
System.out.println("T1:泡茶...");
return "上茶:" + tf;
});
//等待任务3执行结果
System.out.println(f3.join());
void sleep(int t, TimeUnit u) {
try {
u.sleep(t);
}catch(InterruptedException e){}
}

CompletionService

CompletionService(完成服务)是Java中的一个接口,它可以用于管理异步任务的执行和获取结果。CompletionService接口是Executor框架的一部分,提供了一种方便的方式来处理一批异步任务,并按照它们完成的顺序获取结果

CompletionService 的实现原理也是内部维护了一个阻塞队列,当任务执行结束就把任务的执行结果加入到阻塞队列中,不同的是 CompletionService 是把任务执行结果的 Future 对象加入到阻塞队列中。

相关方法

构造函数:

  • ExecutorCompletionService(Executor executor):需要传入一个线程池,默认使用无界的 LinkedBlockingQueue。
  • ExecutorCompletionService(Executor executor, BlockingQueue<Future> completionQueue):支持传入线程池和阻塞队列。

提交任务:

  • Future submit(Runnable task):传入需要执行的任务

  • Future submit(Runnable task, T result):传入需要执行的任务,以及返回的值

队列相关方法:

  • take():从队列中获取并移除元素,队列为空则阻塞
  • poll():从队列中获取并移除元素,队列为空则返回null
  • poll(long timeout, TimeUnit unit):支持超时时间的获取方法

简单使用

模拟电商询价问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 创建线程池
ExecutorService executor =
Executors.newFixedThreadPool(3);
// 创建CompletionService
CompletionService<Integer> cs = new
ExecutorCompletionService<>(executor);
// 异步向电商S1询价
cs.submit(()->getPriceByS1());
// 异步向电商S2询价
cs.submit(()->getPriceByS2());
// 异步向电商S3询价
cs.submit(()->getPriceByS3());
// 将询价结果异步保存到数据库
for (int i=0; i<3; i++) {
Integer r = cs.take().get();
executor.execute(()->save(r));
}