/**
* Handler applied to rejected tasks by default: a single shared
* {@code AbortPolicy} instance held in a static final field.
* NOTE(review): presumably used whenever no explicit handler is supplied
* by the caller - confirm against the enclosing class.
*/
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
/**
 * Demo that floods a fixed-size thread pool until a submission is rejected.
 *
 * <p>The pool is built with 5 core / 5 maximum threads and a bounded queue of
 * capacity 100, and no explicit {@code RejectedExecutionHandler} is passed to
 * the constructor. Per the JDK documentation the executor then uses its default
 * {@code AbortPolicy}, so once all workers are busy and the queue is full the
 * next {@code submit} throws {@code RejectedExecutionException}, which ends the
 * otherwise endless submission loop.
 */
public class ThreadPoolTest {
    /**
     * Submits tasks in an endless loop; each task prints the current queue size
     * and then sleeps for 10 seconds.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
        // Every worker thread gets the same name, "test-thread-pool".
        ThreadFactory factory = r -> new Thread(r, "test-thread-pool");
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.SECONDS, queue, factory);
        try {
            while (true) {
                executor.submit(() -> {
                    try {
                        System.out.println(queue.size());
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        // Restore the interrupt status so the worker can observe it.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    }
                });
            }
        } finally {
            // The loop only ends by exception. Without an orderly shutdown the
            // non-daemon core threads would keep the JVM alive forever; already
            // accepted tasks still run to completion after shutdown().
            executor.shutdown();
        }
    }
}
/**
 * Spring configuration that supplies the executor returned from
 * {@link #getAsyncExecutor()} by implementing {@code AsyncConfigurer}.
 */
@Configuration
public class TaskExecutorConfig implements AsyncConfigurer {
    /**
     * Core pool size of the ThreadPoolExecutor.
     */
    private static final int CORE_POOL_SIZE = 5;
    /**
     * Maximum pool size of the ThreadPoolExecutor.
     */
    private static final int MAX_POOL_SIZE = 5;
    /**
     * Capacity of the ThreadPoolExecutor's BlockingQueue.
     */
    private static final int QUEUE_CAPACITY = 1000;

    /**
     * Overrides {@code getAsyncExecutor} so that default task execution uses the
     * executor produced by this method.
     * <p>
     * The configuration class implements the {@code AsyncConfigurer} interface,
     * overrides {@code getAsyncExecutor} and returns a {@code ThreadPoolTaskExecutor},
     * which gives us a thread-pool based TaskExecutor.
     *
     * @return an initialized {@code ThreadPoolTaskExecutor} configured with the
     *         pool sizes and queue capacity defined by this class's constants
     *         and an {@code AbortPolicy} rejection handler
     */
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
        taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
        taskExecutor.setQueueCapacity(QUEUE_CAPACITY);
        // The handler must be set BEFORE initialize(): initialize() builds the
        // underlying ThreadPoolExecutor from the properties configured so far,
        // so a handler set afterwards never reaches the running pool.
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // The executor is created by hand here rather than declared as a bean,
        // so it has to be initialized explicitly.
        taskExecutor.initialize();
        return taskExecutor;
    }
}
public static void main(String[] args) {
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10);
ThreadFactory factory = r -> new Thread(r, "test-thread-pool");
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5,
0L, TimeUnit.SECONDS, queue, factory, new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 1000; i ) {
executor.submit(() -> {
try {
System.out.println(Thread.currentThread().getName() ":执行任务");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}