/**
 * The default rejected execution handler.
 * <p>
 * Initialized once to a new {@code AbortPolicy} instance and never reassigned
 * (the field is {@code static final}).
 * <p>
 * NOTE(review): presumably this is the handler applied when a caller does not
 * supply one of its own; the code that reads this field is not shown here, so
 * confirm against the constructors.
 */
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
/**
 * Demonstrates what happens when a bounded thread pool is flooded with tasks
 * and no rejected-execution handler is supplied.
 * <p>
 * The pool has 5 core / 5 max threads and a 100-slot {@link ArrayBlockingQueue}.
 * Each task prints the current queue size and then sleeps for ten seconds, so
 * the endless submit loop fills the workers and the queue almost immediately.
 * The next {@code submit} is then rejected by the executor's default handler,
 * which surfaces as a {@link java.util.concurrent.RejectedExecutionException}
 * thrown out of {@code main}.
 */
public class ThreadPoolTest {

    /**
     * Floods the pool until a submission is rejected.
     *
     * @param args unused
     * @throws java.util.concurrent.RejectedExecutionException once the workers are
     *         busy and the queue is full; this is the outcome being demonstrated
     */
    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
        ThreadFactory factory = r -> new Thread(r, "test-thread-pool");
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS, queue, factory);
        try {
            while (true) {
                executor.submit(() -> {
                    try {
                        System.out.println(queue.size());
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        // Interruption is the expected shutdown path (see finally below):
                        // restore the flag so the worker thread can observe it and exit.
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } finally {
            // The loop only ends by exception. Without this the non-daemon worker
            // threads would keep the JVM alive forever after the rejection.
            executor.shutdownNow();
        }
    }
}
/**
 * Configuration class that supplies the executor used for asynchronous task execution.
 * <p>
 * It implements {@link AsyncConfigurer} and overrides {@link #getAsyncExecutor()} to
 * return a thread-pool-backed {@code ThreadPoolTaskExecutor}.
 */
@Configuration
public class TaskExecutorConfig implements AsyncConfigurer {

    /**
     * Core pool size passed to the task executor.
     */
    private static final int CORE_POOL_SIZE = 5;

    /**
     * Maximum pool size passed to the task executor.
     */
    private static final int MAX_POOL_SIZE = 5;

    /**
     * Capacity of the task executor's work queue.
     */
    private static final int QUEUE_CAPACITY = 1000;

    /**
     * Builds the executor that asynchronous tasks run on by default.
     * <p>
     * Overriding {@code getAsyncExecutor} makes the returned
     * {@code ThreadPoolTaskExecutor} the default, giving a thread-pool-based
     * {@code TaskExecutor}.
     *
     * @return an initialized {@code ThreadPoolTaskExecutor} configured with the
     *         pool sizes and queue capacity declared in this class and an
     *         {@link ThreadPoolExecutor.AbortPolicy} rejection handler
     */
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
        taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
        taskExecutor.setQueueCapacity(QUEUE_CAPACITY);
        // The rejection handler must be set BEFORE initialize(): the underlying
        // ThreadPoolExecutor is constructed there with whatever handler is configured
        // at that moment, and a later setter call does not reach the built pool.
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }
}
public static void main(String[] args) { BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10); ThreadFactory factory = r -> new Thread(r, "test-thread-pool"); ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS, queue, factory, new ThreadPoolExecutor.CallerRunsPolicy()); for (int i = 0; i < 1000; i ) { executor.submit(() -> { try { System.out.println(Thread.currentThread().getName() ":执行任务"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); } }