Positive example 1: //org.apache.commons.lang3.concurrent.BasicThreadFactory ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build()); Positive example 2: ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(); //Common Thread Pool ExecutorService pool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); pool.execute(()-> System.out.println(Thread.currentThread().getName())); pool.shutdown();//gracefully shutdown Positive example 3: <bean id="userThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="10" /> <property name="maxPoolSize" value="100" /> <property name="queueCapacity" value="2000" /> <property name="threadFactory" value= threadFactory /> <property name="rejectedExecutionHandler"> <ref local="rejectedExecutionHandler" /> </property> </bean> //in code userThreadPool.execute(thread);
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize 核心线程数 会一直存在,除非allowCoreThreadTimeOut设置为true maximumPoolSize 线程池最大线程数 keepAliveTime:除了核心线程数外的线程 如果没有任务多久释放。 unit:超时时间的单位 workQueue:工作队列,保存未执行的Runnable 任务 threadFactory:创建线程的工厂类 handler:当线程已满,工作队列也满了的时候,会被调用。被用来实现各种拒绝策略。
3.5.1.默认直接拒绝抛出ThreadPoolExecutor.AbortPolicy RejectedExecutionException
3.5.2.直接不处理ThreadPoolExecutor.DiscardPolicy()
3.5.3.把加入队列最早的任务删除。ThreadPoolExecutor.DiscardOldestPolicy()
3.5.4.让调用线程池的任务去处理。ThreadPoolExecutor.CallerRunsPolicy()
自定义拒绝策略 实现RejectedExecutionHandler接口,实现抽象方法rejectedExecution方法。 当引用自定义拒绝策略时会初始化自定义拒绝策略类的构造方法。 当线程堵塞触发拒绝策略时会执行rejectedExecution方法。 这几种拒绝策略都是静态内部类实现RejectedExecutionHandler接口。当我们要向队列中添加一个元素时,我们需要调用put()方法。该方法将阻塞,直到其他某个线程调用take()方法,表明它已准备好获取一个元素。
SynchronousQueue并不是真正的队列,而是一种管理直接在线程之间移交信息的机制,但我们应该将其视为两个线程之间单个元素的交换点,其中一个线程正在传递一个元素,另一个线程正在获取该元素。
ThreadLocal<Integer> threadLocalValue = new ThreadLocal<>(); threadLocalValue.set(1); Integer result = threadLocalValue.get();
Lock是一个接口,接口的实现类ReentrantLock, ReentrantReadWriteLock.ReadLock,ReentrantReadWriteLock.WriteLock
lock()方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待。
采用Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。
使用Lock必须在try{}catch{}块中进行,并且将释放锁的操作放在finally块中进行,以保证锁一定被被释放,防止死锁的发生。
tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false,也就说这个方法无论如何都会立即返回,在拿不到锁时也不会一直在那等待。
JUC包中的原子操作类可以分为4类。 1、基本类型: AtomicInteger, AtomicLong, AtomicBoolean ; 2、数组类型: AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray ; 3、引用类型: AtomicReference, AtomicStampedRerence, AtomicMarkableReference ; 4、对象的属性修改类型: AtomicIntegerFieldUpdater,AtomicLongFieldUpdater, AtomicReferenceFieldUpdater 。
import java.util.concurrent.CountDownLatch; public class ThreadRunnableDemo implements Runnable{ private CountDownLatch downLatch; public ThreadRunnableDemo(CountDownLatch downLatch) { this.downLatch = downLatch; } @Override public void run() { System.out.printf("Thread %s start\t",Thread.currentThread().getName()); try { Thread.sleep(300); System.out.printf("Thread %s stop\n",Thread.currentThread().getName()); downLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }
package com.jy.lejutaobao.testDemo; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestCountDownLatchDemo { public static void main(String[] args) { CountDownLatch downLatch=new CountDownLatch(3); ExecutorService executor = Executors.newFixedThreadPool(3); for(int i=0; i < 3; i ) { executor.submit(new ThreadRunnableDemo(downLatch)); } try { downLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } executor.shutdown(); System.out.println("都执行完了."); } }
CountDownLatch: 一个线程(或者多个), 等待另外N个线程完成某个事情之后才能执行。 CyclicBrrier: N个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestCyclicBarrierExample1 { public static void main(String[] args) { CyclicBarrier barrier = new CyclicBarrier(5); ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i ) { final int threadNum = i; executor.execute(() -> { try { System.out.print("线程 = " threadNum " 开始 \t"); if((threadNum 1) % 5==0){ System.out.println("\n"); } Thread.sleep(1000 threadNum); System.out.print("线程 = " threadNum " 已完成\t"); if((threadNum 1) % 5==0){ System.out.println("\n"); } barrier.await(); } catch (Exception e) { } }); } executor.shutdown(); } }
线程池分批执行,一批一批执行. 五个一批 Semaphore semaphore = new Semaphore(5); 获取信号量 semaphore.acquire(); 释放信号量 semaphore.release();
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class TestSemaphoreDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); Semaphore semaphore = new Semaphore(5); for(int i = 0;i<20;i ){ int finalI = i; executorService.execute(new Runnable() { @Override public void run() { try { /*获取信号量*/ semaphore.acquire(); System.out.println("Thread = " finalI " 获取acquire"); Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } semaphore.release(); System.out.println("Thread = " finalI " 释放release"); } }); } } }
Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); public void conditionWait() throws InterruptedException { lock.lock(); try { condition.await(); } finally { lock.unlock(); } } public void conditionSignal() throws InterruptedException { lock.lock(); try { condition.signal(); } finally { lock.unlock(); } }