设计模式

同步模式—保护性暂停

定义

保护性暂停即Guarded Suspension,用在一个线程等待另一个线程的执行结果,因为要等待另一方的结果,因此归类到同步模式。

要点:

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个GuardedObject,JDK中,join的实现、Future的实现,采用的就是此模式。
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)

image-20230406202511942

实现

单个线程相互等待:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class Test01 {
public static void main(String[] args) {
GuardedObject guardedObject = new GuardedObject();
new Thread(() -> {
// 等待结果
Object o = guardedObject.get();
System.out.println(o.toString());
},"t1").start();

new Thread(() -> {
// 传输结果
guardedObject.complete("产生的结果");
},"t2").start();
}
}

class GuardedObject {
// 结果
private Object response;
// 获取结果
public Object get(){
synchronized (this){
// 没有结果,就需要等待
while (response == null){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}

// 限制时间获取结果
public Object get(long timeout){
synchronized (this){
long begin = System.currentTimeMillis();
long passedTime = 0;
// 没有结果,就需要等待
while (response == null){
long waitTime = timeout - passedTime;
// 等待时间超时则退出循环
if(waitTime <= 0){
break;
}
try {
this.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 等待时间
passedTime = System.currentTimeMillis() - begin;
}
return response;
}
}

// 产生结果
public void complete(Object response){
synchronized (this){
this.response = response;
this.notifyAll();
}
}
}

多个线程相互等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class Boxes{
private static Map<Integer,GuardedObject> boxes = new Hashtable<>();

private static int id = 1;

// 产生唯一id
private static synchronized int generateId(){
return id++;
}

public static GuardedObject createGuardedObject(){
GuardedObject guardedObject = new GuardedObject(generateId());
boxes.put(guardedObject.getId(),guardedObject);
return guardedObject;
}

public static Set<Integer> getIds() {
return boxes.keySet();
}

public static GuardedObject getGuarObject(int id){
return boxes.remove(id);
}
}

class GuardedObject {
// 唯一表示
private int id;
// 结果
private Object response;

public GuardedObject() {
}

public GuardedObject(int id) {
this.id = id;
}

public int getId() {
return id;
}

// 获取结果
public Object get(){
synchronized (this){
// 没有结果,就需要等待
while (response == null){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}

// 获取结果
public Object get(long timeout){
synchronized (this){
long begin = System.currentTimeMillis();
long passedTime = 0;
// 没有结果,就需要等待
while (response == null){
long waitTime = timeout - passedTime;
// 等待时间超时则退出循环
if(waitTime <= 0){
break;
}
try {
this.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 等待时间
passedTime = System.currentTimeMillis() - begin;
}
return response;
}
}
// 产生结果
public void complete(Object response){
synchronized (this){
this.response = response;
this.notifyAll();
}
}
}

同步模式—顺序控制

有时候需要控制线程的执行顺序,例如下面的例子要保证t2线程在t1线程前执行

wait/notify方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class 顺序控制 {
static final Object lock = new Object();
// 表示t2是否允许过
static boolean flag = false;
public static void main(String[] args) {
new Thread(() -> {
synchronized (lock){
while (!flag){
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println(1);
},"t1").start();

new Thread(() -> {
synchronized (lock){
System.out.println(2);
flag = true;
lock.notify();
}

},"t2").start();
}
}

这种方式与ReentrantLock的await和signal一致

park/unpark方式

同样例子:要让t2线程在t1线程前执行

1
2
3
4
5
6
7
8
9
10
11
12
13
public class 顺序控制 {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
LockSupport.park();
System.out.println(1);
}, "t1");
t1.start();
new Thread(()->{
System.out.println(2);
LockSupport.unpark(t1);
},"t2").start();
}
}

异步模式—生产者/消费者

定义

与前面的保护性暂停中的GuardObject 不同,不需要产生结果和消费结果的线程一一对应,并且是异步的不会立刻对消息进行处理消费;JDK中各种阻塞队列,采用的就是这种模式。

要点:

  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据

image-20230406214542777

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
public class xfz {
public static void main(String[] args) {
MessageQueue queue = new MessageQueue(2);

for (int i = 0; i < 3; i++) {
int id = i;
new Thread(()->{
queue.put(new Message(id,"值"+id ));
},"生成者"+i).start();
}
new Thread(()->{
while (true){
try {
sleep(1000);
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"消费者").start();
}
}

// 消息队列类,java线程之间通信
class MessageQueue {
// 队列集合
private LinkedList<Message> list = new LinkedList<>();
// 容量
private int capcity;

public MessageQueue(int capcity) {
this.capcity = capcity;
}

// 获取消息
public Message take(){
// 检测队列是否为空
synchronized (list){
while(list.isEmpty()){
System.out.println("队列为空");
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 从头部获取消息
Message message = list.removeFirst();
list.notifyAll();
return message;
}
}

// 存入消息
public void put(Message message){
synchronized (list){
while(list.size() == capcity){
System.out.println("队列已满!!");
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 从尾部加入消息
list.addLast(message);
list.notifyAll();
}
}

}

final class Message{
private int id;
private Object value;

public Message(int id, Object value) {
this.id = id;
this.value = value;
}

public int getId() {
return id;
}

public Object getValue() {
return value;
}

@Override
public String toString() {
return "Message{" +
"id=" + id +
", value=" + value +
'}';
}
}

交替执行

该模式主要是用于让多线程按照一定的顺序交替执行,案例:让a,b,c线程按照顺序交替执行

wait/notify

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class 交替输出01 {
public static void main(String[] args) {
WaitNotify waitNotify = new WaitNotify(1, 3);
new Thread(()->{
waitNotify.print("aaaa",1, 2);
},"a").start();
new Thread(()->{
waitNotify.print("bbbb",2, 3);
},"b").start();
new Thread(()->{
waitNotify.print("cccc",3, 1);
},"c").start();
}
}

class WaitNotify{
// 执行操作
public void print(String str,int waitFlag,int nextFlag){
for (int i = 0; i < loopNumber; i++) {
synchronized (this){
while (flag != waitFlag){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.print(str);
flag = nextFlag;
this.notifyAll();
}
}
}

// 等待标记
private int flag;
// 循环次数
private int loopNumber;

public WaitNotify(int flag, int loopNumber) {
this.flag = flag;
this.loopNumber = loopNumber;
}
}

结果:

image-20230412195258676

await/signal

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class 交替输出02 {
public static void main(String[] args) {
AwaitSignal awaitSignal = new AwaitSignal(5);
Condition conditionA = awaitSignal.newCondition();
Condition conditionB = awaitSignal.newCondition();
Condition conditionC = awaitSignal.newCondition();
new Thread(() -> {
awaitSignal.print("aaaa",conditionA, conditionB);
}).start();
new Thread(() -> {
awaitSignal.print("bbbb",conditionB, conditionC);
}).start();
new Thread(() -> {
awaitSignal.print("cccc",conditionC, conditionA);
}).start();
awaitSignal.lock();
try {
System.out.println("开始...");
conditionA.signal();
}finally {
awaitSignal.unlock();
}
}
}

class AwaitSignal extends ReentrantLock{

// 循环次数
private int loopNumber;

public AwaitSignal(int loopNumber){
this.loopNumber = loopNumber;
}

public void print(String str,Condition current,Condition next){
for (int i = 0; i < loopNumber; i++) {
lock();
try {
current.await();
System.out.print(str);
next.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
unlock();
}
}
}
}

结果:

image-20230412200705388

park/unpark

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class 交替输出03 {
static Thread a;
static Thread b;
static Thread c;
public static void main(String[] args) {
ParkUnpark parkUnpark = new ParkUnpark(5);
a = new Thread(() -> {
parkUnpark.print("aaaa",b);
},"a");

b = new Thread(() -> {
parkUnpark.print("bbbb",c);
},"b");
c = new Thread(() -> {
parkUnpark.print("cccc",a);
},"c");
a.start();
b.start();
c.start();
// 先唤醒a
LockSupport.unpark(a);
}
}

class ParkUnpark{

public void print(String str, Thread next){
for (int i = 0; i < loopNumber; i++) {
LockSupport.park();
System.out.print(str);
LockSupport.unpark(next);
}
}

// 循环次数
private int loopNumber;

public ParkUnpark(int loopNumber) {
this.loopNumber = loopNumber;
}
}

结果:

image-20230412201218785

两阶段终止模式

两阶段终止模式是指能够优雅的结束线程,能够进行完善的善后工作。

终止线程的错误想法:

  • 使用stop(),stop方法会真正杀死线程,如果这时线程锁住了共享资源,那么当它被杀死后就再也没有机会释放锁,其它线程将永远无法获取锁,所以该方法已经被废弃!同样的还有suspend()、resume()
  • 使用System.exit(0):这样会让整个程序都停止

打断标记法实现

利用调用interrupt()方法会让线程抛出InterruptedException异常,这样可以捕获标记打断标记,被标记后会进行善后工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class Termination{
private Thread monitor;

// 启动监控线程
public void start(){
monitor = new Thread(() -> {
while (true){
// 判断是否被打断
Thread current = Thread.currentThread();
if(current.isInterrupted()){
System.out.println("线程结束中...."+current.isInterrupted());
break;
}
try {
Thread.sleep(1000);
System.out.println("执行监控..");
} catch (InterruptedException e) {
e.printStackTrace();
// 如果在sleep中被打断会把标记设置为false,这里重置为true
current.interrupt();
}
}
});
monitor.start();
}
// 停止监控线程
public void stop(){
monitor.interrupt();
}
}

利用volatile优化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class Termination{
private Thread monitor;

private volatile boolean stop = false;

// 启动监控线程
public void start(){
monitor = new Thread(() -> {
while (true){
// 判断是否被打断
if(stop){
System.out.println("线程结束中....");
break;
}
try {
Thread.sleep(1000);
System.out.println("执行监控..");
} catch (InterruptedException e) {

}
}
});
monitor.start();
}
// 停止监控线程
public void stop(){
stop = true;
// 打断睡眠
monitor.interrupt();
}
}

避免共享

如果避免了共享,同样可以解决多线程并发问题,已经知道通过局部变量可以做到避免共享,还可以使用Java 语言提供的线程本地存储(ThreadLocal)

ThreadLocal

ThreadLocal 的目标是让不同的线程有不同的变量 V,所以其内部维护了一个Map,叫做 ThreadLocalMap,不过持有 ThreadLocalMap 的不是 ThreadLocal,而是 Thread。

image-20230618104558134

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class Thread {
//内部持有ThreadLocalMap
ThreadLocal.ThreadLocalMap
threadLocals;
}
class ThreadLocal<T>{
public T get() {
//首先获取线程持有的
//ThreadLocalMap
ThreadLocalMap map =
Thread.currentThread()
.threadLocals;
//在ThreadLocalMap中
//查找变量
Entry e =
map.getEntry(this);
return e.value;
}
static class ThreadLocalMap{
//内部是数组而不是Map
Entry[] table;
//根据ThreadLocal查找Entry
Entry getEntry(ThreadLocal key){
//省略查找逻辑
}
//Entry定义
static class Entry extends
WeakReference<ThreadLocal>{
Object value;
}
}
}

将Map放在每个Thread里面的好处:

  • ThreadLocal 仅仅是一个代理工具类,内部并不持有任何与线程相关的数据,所有和线程相关的数据都存储在 Thread 里面,这样的设计容易理解。

  • 这样不容易造成内存泄漏,防止Map 中的 Thread 对象就永远不会被回收。(但是如果是线程池使用了就有可能会导致内存泄漏;原因就出在线程池中线程的存活时间太长,往往都是和程序同生共死的,这就意味着 Thread 持有的 ThreadLocalMap 一直都不会被回收)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    ExecutorService es;
    ThreadLocal tl;
    es.execute(()->{
    //ThreadLocal增加变量
    tl.set(obj);
    try {
    // 省略业务逻辑代码
    }finally {
    //手动清理ThreadLocal
    tl.remove();
    }
    });

Balking(犹豫)模式

Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回。例如线程安全的单例模式中的双重检查就是利用的这个模式。

例如:尽管start了多次,始终只有一个线程在执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class 犹豫模式 {
public static void main(String[] args) {
Termination termination = new Termination();
termination.start();
termination.start();
}
}
class Termination{
private Thread monitor;

private volatile boolean stop = false;

// 是否正在执行
private boolean starting = false;

// 启动监控线程
public void start(){
synchronized (this){
// 这里需要加锁保证验证的原子性
if(starting){
return;
}
starting = true;
}
monitor = new Thread(() -> {
while (true){
// 判断是否被打断
Thread current = Thread.currentThread();
if(stop){
System.out.println("线程结束中....");
starting = false;
break;
}
try {
Thread.sleep(1000);
System.out.println("执行监控..");
} catch (InterruptedException e) {

}
}
});
monitor.start();
}
// 停止监控线程
public void stop(){
stop = true;
// 打断睡眠
monitor.interrupt();
}
}