发现问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| Semaphore semaphore = new Semaphore(nThread);
ExecutorService executorService =new ThreadPoolExecutor(1, nThread, 1000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1)); for (T obj : list) { try { semaphore.acquire(); executorService.execute(() -> { try { func.accept(obj); } catch (Exception ex) { Logger.error("startWithMultiThread 出错!"+ex.getMessage()); } finally { semaphore.release(); } }); } catch (InterruptedException e) { Logger.error("startWithMultiThread 出错!"+e.getMessage()); } }
public static void main(String[] args) { ArrayList<Object> objects = Lists.newArrayList(); for (int i = 0; i < 1000; i++) { objects.add(i); } startWithMultiThread(objects,5, obj -> { Threads.sleep(1000); System.out.println(obj); }); }
|
在执行上面的代码的时候,出现了 reject 的异常,按道理说有semaphore.acquire();
拦截,不应该会出现 reject 的异常。
但是如果把 keepAliveTime 时间改成 0 ,就可以正常执行。
分析原因
1 2 3 4
| new ThreadPoolExecutor(1, nThread,1000L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1)); new ThreadPoolExecutor(1, nThread,0, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1)); new ThreadPoolExecutor(nThread, nThread,0, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1)); new ThreadPoolExecutor(nThread, nThread,1000, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1));
|
对于上面的 4 种创建线程池的方法:
第一种和第二种的不同只是 keeplive 的事件不一样,在运行的时候运行的线程会大于核心线程数的,这样就在线程池的逻辑中会自动采用非核心线程超时策略,
在拉取的队列的任务时,采用的的 poll(n)的方式,如果 n>0是,线程池满了再次分配任务的时候会导致执行拒绝策略。poll(0),不会阻塞,可以正常分配。
如果采用第三种和第四种方式,则不会开启空闲线程超时释放策略,在拉取的任务的时候后采用了的 take()方法,一直阻塞,直到新的数据过来(从入队列到出队列,也会出现延迟),这样也会导致线程池执行拒绝策略。