线程池问题探究

发现问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
    Semaphore semaphore = new Semaphore(nThread);//定义几个许可
//这里
ExecutorService executorService =new ThreadPoolExecutor(1, nThread,
1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1));//创建一个固定的线程池
for (T obj : list) {
try {
semaphore.acquire();
executorService.execute(() -> {
try {
func.accept(obj);
} catch (Exception ex) {
Logger.error("startWithMultiThread 出错!"+ex.getMessage());
} finally {
semaphore.release();
}
});
} catch (InterruptedException e) {
Logger.error("startWithMultiThread 出错!"+e.getMessage());
}
}


public static void main(String[] args) {
ArrayList<Object> objects = Lists.newArrayList();
for (int i = 0; i < 1000; i++) {
objects.add(i);
}
startWithMultiThread(objects,5, obj -> {
Threads.sleep(1000);
System.out.println(obj);
});
}

在执行上面的代码的时候,出现了 reject 的异常,按道理说有semaphore.acquire(); 拦截,不应该会出现 reject 的异常。

但是如果把 keepAliveTime 时间改成 0 ,就可以正常执行。

分析原因

1
2
3
4
new ThreadPoolExecutor(1, nThread,1000L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1));
new ThreadPoolExecutor(1, nThread,0, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1));
new ThreadPoolExecutor(nThread, nThread,0, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1));
new ThreadPoolExecutor(nThread, nThread,1000, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1));

对于上面的 4 种创建线程池的方法:

第一种和第二种的不同只是 keeplive 的事件不一样,在运行的时候运行的线程会大于核心线程数的,这样就在线程池的逻辑中会自动采用非核心线程超时策略,

在拉取的队列的任务时,采用的的 poll(n)的方式,如果 n>0是,线程池满了再次分配任务的时候会导致执行拒绝策略。poll(0),不会阻塞,可以正常分配。

如果采用第三种和第四种方式,则不会开启空闲线程超时释放策略,在拉取的任务的时候后采用了的 take()方法,一直阻塞,直到新的数据过来(从入队列到出队列,也会出现延迟),这样也会导致线程池执行拒绝策略。