private transient volatile Node head; //同步队列的head节点 private transient volatile Node tail; //同步队列的tail节点 private volatile int state; //同步状态
protected final int getState(); //获取同步状态 protected final void setState(int newState); //设置同步状态 protected final boolean compareAndSetState(int expect, int update); //CAS设置同步状态
private volatile int state; protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
private Node addWaiter(Node mode) { //以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享) Node node = new Node(Thread.currentThread(), mode); //尝试快速方式直接放到队尾。 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //上一步失败则通过enq入队。 enq(node); return node; }
private Node enq(final Node node) { //CAS"自旋",直到成功加入队尾 for (;;) { Node t = tail; if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。 if (compareAndSetHead(new Node())) tail = head; } else {//正常流程,放入队尾 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true;//标记是否成功拿到资源 try { boolean interrupted = false;//标记等待过程中是否被中断过 //又是一个“自旋”! for (;;) { final Node p = node.predecessor();//拿到前驱 //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。 if (p == head && tryAcquire(arg)) { setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。 p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了! failed = false; return interrupted;//返回等待过程中是否被中断过 } //如果自己可以休息了,就进入waiting状态,直到被unpark() if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true } } finally { if (failed) cancelAcquire(node); } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;//拿到前驱的状态 if (ws == Node.SIGNAL) //如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了 return true; if (ws > 0) { /* * 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。 * 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)! */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢! compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private final boolean parkAndCheckInterrupt() { LockSupport.park(this);//调用park()使线程进入waiting状态 return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。 }
public class FutureTask<V> implements RunnableFuture<V> public interface RunnableFuture<V> extends Runnable, Future<V>
public class FutureTaskExample { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() { @Override public Integer call() throws Exception { int result = 0; for (int i = 0; i < 100; i ) { Thread.sleep(10); result = i; } return result; } }); Thread computeThread = new Thread(futureTask); computeThread.start(); Thread otherThread = new Thread(() -> { System.out.println("other task is running..."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); otherThread.start(); System.out.println(futureTask.get()); } }
other task is running... 4950
public class ProductorConsumer { private static BlockingQueue<String> quene = new ArrayBlockingQueue<>(5); private static class Productor extends Thread{ @Override public void run() { try { quene.put("product"); } catch (InterruptedException e) { e.printStackTrace(); } System.out.print("productor..."); } } private static class Consumer extends Thread{ @Override public void run() { try { String product = quene.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.print("consumer..."); } } public static void main(String[] args) { for(int i = 0; i < 2; i ){ Productor productor = new Productor(); productor.start(); } for(int i = 0; i < 5; i ){ Consumer consumer = new Consumer(); consumer.start(); } for(int i = 0; i < 3; i ){ Productor productor = new Productor(); productor.start(); } } }
productor...productor...consumer...consumer...productor...productor...consumer...consumer...productor...consumer...
import java.util.concurrent.RecursiveTask; public class ForkJoinExample extends RecursiveTask<Integer> { private final int threshold = 5; private int first; private int last; public ForkJoinExample(int first, int last) { this.first = first; this.last = last; } @Override protected Integer compute() { int result = 0; if (last - first <= threshold) { // 任务足够小则直接计算 for (int i = first; i <= last; i ) { result = i; } } else { // 拆分成小任务 int middle = first (last - first) / 2; ForkJoinExample leftTask = new ForkJoinExample(first, middle); ForkJoinExample rightTask = new ForkJoinExample(middle 1, last); leftTask.fork(); rightTask.fork(); result = leftTask.join() rightTask.join(); } return result; } }