• 线程池优势:

    线程的创建和销毁浪费着系统资源。

1
2
3
4
public interface JobRun {
// Job的具体工作内容
void concreteJob();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
import java.util.concurrent.TimeUnit;

public class Job implements JobRun {
@Override
public void concreteJob() {
System.out.println("执行某一个工作,耗时1s");
try {
TimeUnit.SECONDS.sleep(1);
} catch(InterruptedException e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
public interface ThreadPool<Job extends JobRun> {
// 执行一个Job,这个Job需要实现Runnable接口
void execute(Job job);
// 关闭线程池
void shutdown();
// 增加工作线程
void addWorkers(int num);
// 减少工作线程
void removeWorker(int num);
// 得到正在等待执行的任务数量
int getJobSize();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class DefaultThreadPool<Job extends JobRun> implements ThreadPool<Job> {
// 线程池最大限制数
private static final int MAX_WORKER_NUMBERS = 10;
// 线程池默认线程数
private static final int DEFAULT_WORKER_NUMBERS = 5;
// 线程池最小线程数
private static final int MIN_WORKER_NUMBERS = 2;
// 一个工作列表,将会向里面插入工作
private final LinkedList<Job> jobs = new LinkedList<Job>();
// 工作者列表
private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
// 工作者线程数
private int workerNum = DEFAULT_WORKER_NUMBERS;
// 线程编号生成
private AtomicLong threadNum = new AtomicLong();

public DefaultThreadPool(){
initializeWorkers(DEFAULT_WORKER_NUMBERS);
}

public DefaultThreadPool(int num){
if(num > MAX_WORKER_NUMBERS){
workerNum = MAX_WORKER_NUMBERS;
}
if(num<MIN_WORKER_NUMBERS){
workerNum = MIN_WORKER_NUMBERS;
}
initializeWorkers(workerNum);
}

public void execute(Job job){
if(job != null){
synchronized (jobs){
// 增加一个工作,然后进行通知
jobs.addLast(job);
jobs.notify();
}
}
}

public void shutdown(){
for(Worker worker : workers){
worker.shutdown();
}
}

public void addWorkers(int num){
synchronized (jobs){
// 限制新增的Worker数量不能超过最大值
if(num + this.workerNum > MAX_WORKER_NUMBERS){
num = MAX_WORKER_NUMBERS - this.workerNum;
}
initializeWorkers(num);
this.workerNum += num;
}
}

public void removeWorker(int num){
synchronized (jobs){
if(num >= this.workerNum){
throw new IllegalArgumentException("beyond workNum");
}
// 按照给定的数量停止Worker
int count = 0;
while(count < num){
Worker worker = workers.get(0);
if(workers.remove(worker)){
worker.shutdown();
++count;
}
}
this.workerNum -= count;
}
}

public int getJobSize(){
return jobs.size();
}

// 初始化线程工作者
private void initializeWorkers(int num){
for(int i =0;i<num;++i){
Worker worker = new Worker();
workers.add(worker);
Thread thread = new Thread(worker,"ThreadPool-worker-" + threadNum.incrementAndGet());
thread.start();
}
}

// 工作者,负责消费任务
class Worker implements Runnable{
// 是否工作
private volatile boolean running = true;
@Override
public void run(){
while (running){
Job job = null;
synchronized (jobs){
// 如果工作者列表为空,那么wait
while(jobs.isEmpty()){
try{
jobs.wait();
}catch(InterruptedException ex){
// 感知到外部对WorkerThread的中断操作,返回
Thread.currentThread().interrupt();
return;
}
}
// 取出一个job
job = jobs.removeFirst();
}
if(job != null){
try{
job.concreteJob();
}catch(Exception ex){
System.out.println(ex.getMessage());
}
}
}
}

public void shutdown(){
running = false;
}
}
}
1
2
3
4
5
6
7
8
9
10
public class ApplicationDemo {
public static void main(String[] args){
DefaultThreadPool<Job> defaultThreadPool = new DefaultThreadPool<Job>();
defaultThreadPool.removeWorker(3);
for(int i = 0;i<20;++i){
Job job = new Job();
defaultThreadPool.execute(job);
}
}
}
  • 关于Collects.synchronizedList

    • Arraylist是非线程安全的,多线程下向list插入数据可能会造成数据丢失。并且一个线程遍历list,另一个线程修改list,会报ConcurrentModificationException(并发修改异常)错误。

    • Vector是线程安全的list,但是它的线程安全实现方式是对所有操作都加上synchronized关键字,这种方式严重影响效率,不推荐使用。

    推荐使用:

1
2
3
4
5
6
7
8
9
10
11
List<String> list = Collections.synchronizedList(new ArrayList<String>());
list.add("1");
list.add("2");
list.add("3");

synchronized(list) {
Iterator iter = list.iterator();
while(iter.hasNext()) {
System.out.println(iter.next());
}
}

Collections.synchronizedList会对add方法加锁。

遍历的时候,应避免数据被其他线程篡改,应加锁保护。

参考:Java并发编程的艺术

摘自/参考链接