// Conditional update of this.value: when it currently equals A it is replaced
// by B and true is returned; otherwise it is left untouched and false is
// returned. A and B are not defined in this fragment.
// NOTE(review): this looks like pseudo-code for compare-and-swap semantics;
// as plain Java statements the check and the write are two separate steps.
if (this.value == A) {
    this.value = B;
    return true;
} else {
    return false;
}
// AtomicInteger.java private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value;
// AtomicInteger.java public final int addAndGet(int delta) { return unsafe.getAndAddInt(this, valueOffset, delta) delta; } // Unsafe.java // compareAndSwapInt(var1, var2, var5, var5 var4)其实换成 compareAndSwapInt(obj, offset, expect, update)比较清楚,意思就是如果 obj 内的 value 和 expect 相等,就证明没有其他线程改变过这个变量,那么就更新它为 update,如果这一步的 CAS 没有成功,那就采用自旋的方式继续进行 CAS 操作,取出乍一看这也是两个步骤了啊,其实在 JNI 里是借助于一个 CPU 指令完成的。所以还是原子操作。 public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 var4)); return var5; } // 该方法为本地方法,有四个参数,分别代表:对象、对象的地址、预期值、修改值 public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
/**
 * Builds a {@link ThreadPoolExecutor} whose core and maximum pool sizes are
 * both {@code nThreads}, with a keep-alive time of 0 milliseconds and a
 * {@link LinkedBlockingQueue} (created without a capacity argument) as its
 * work queue.
 *
 * @param nThreads the number used for both the core and the maximum pool size
 * @return the newly created executor
 */
public static ExecutorService newFixedThreadPool(int nThreads) {
    final LinkedBlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<>();
    final long keepAliveMillis = 0L;
    return new ThreadPoolExecutor(
            nThreads,
            nThreads,
            keepAliveMillis,
            TimeUnit.MILLISECONDS,
            pendingTasks);
}
/**
 * Builds a {@link ThreadPoolExecutor} with core and maximum pool size 1, a
 * keep-alive time of 0 milliseconds and a {@link LinkedBlockingQueue}
 * (created without a capacity argument), and returns it wrapped in a
 * {@code FinalizableDelegatedExecutorService}.
 *
 * @return the wrapped single-thread executor
 */
public static ExecutorService newSingleThreadExecutor() {
    final LinkedBlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<>();
    final ThreadPoolExecutor soleWorkerPool =
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, pendingTasks);
    return new FinalizableDelegatedExecutorService(soleWorkerPool);
}
/**
 * Builds a {@link ThreadPoolExecutor} with a core pool size of 0, a maximum
 * pool size of {@code Integer.MAX_VALUE}, a keep-alive time of 60 seconds and
 * a {@link SynchronousQueue} as its work queue.
 *
 * @return the newly created executor
 */
public static ExecutorService newCachedThreadPool() {
    final SynchronousQueue<Runnable> handoff = new SynchronousQueue<>();
    final int corePoolSize = 0;
    final int maximumPoolSize = Integer.MAX_VALUE;
    final long keepAliveSeconds = 60L;
    return new ThreadPoolExecutor(
            corePoolSize,
            maximumPoolSize,
            keepAliveSeconds,
            TimeUnit.SECONDS,
            handoff);
}
/**
 * Producer/consumer demo built on a shared {@link Queue} that is guarded by
 * its own monitor and coordinated with {@code wait()} / {@code notifyAll()}.
 *
 * <p>Workers run until their thread is interrupted; on interruption they
 * restore the interrupt flag and return.
 */
public class ProducerConsumer {

    /** Pause, in milliseconds, a worker takes after each queue operation. */
    private static final long PAUSE_MILLIS = 1000L;

    /**
     * Starts one producer thread and two consumer threads that share a single
     * buffer limited to five elements.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ProducerConsumer main = new ProducerConsumer();
        Queue<Integer> buffer = new LinkedList<>();
        int maxSize = 5;
        new Thread(main.new Producer(buffer, maxSize), "Producer1").start();
        new Thread(main.new Consumer(buffer, maxSize), "Comsumer1").start();
        new Thread(main.new Consumer(buffer, maxSize), "Comsumer2").start();
    }

    /** Adds random integers to the shared queue, waiting while it is full. */
    class Producer implements Runnable {
        private final Queue<Integer> queue;
        private final int maxSize;
        // One generator per producer instead of a new one for every value.
        private final Random random = new Random();

        /**
         * @param queue   the shared buffer; also used as the lock and wait/notify monitor
         * @param maxSize the queue size at which the producer waits instead of adding
         */
        Producer(Queue<Integer> queue, int maxSize) {
            this.queue = queue;
            this.maxSize = maxSize;
        }

        /** Produces values until the running thread is interrupted. */
        @Override
        public void run() {
            try {
                while (true) {
                    synchronized (queue) {
                        // Loop (not if) so the condition is re-checked after every wake-up.
                        while (queue.size() == maxSize) {
                            System.out.println("Queue is full");
                            queue.wait();
                        }
                        int i = random.nextInt();
                        System.out.println(Thread.currentThread().getName() + " Producing value : " + i);
                        queue.add(i);
                        queue.notifyAll();
                    }
                    // Pause outside the synchronized block so the monitor is not
                    // held while sleeping and other workers can make progress.
                    Thread.sleep(PAUSE_MILLIS);
                }
            } catch (InterruptedException e) {
                // Treat interruption as a stop request: restore the flag and exit.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Removes integers from the shared queue, waiting while it is empty. */
    class Consumer implements Runnable {
        private final Queue<Integer> queue;
        // Kept for constructor compatibility; the consumer logic does not read it.
        private final int maxSize;

        /**
         * @param queue   the shared buffer; also used as the lock and wait/notify monitor
         * @param maxSize stored but not used by the consumer
         */
        public Consumer(Queue<Integer> queue, int maxSize) {
            super();
            this.queue = queue;
            this.maxSize = maxSize;
        }

        /** Consumes values until the running thread is interrupted. */
        @Override
        public void run() {
            try {
                while (true) {
                    synchronized (queue) {
                        // Loop (not if) so the condition is re-checked after every wake-up.
                        while (queue.isEmpty()) {
                            System.out.println("Queue is empty");
                            queue.wait();
                        }
                        System.out.println(Thread.currentThread().getName() + " Consuming value : " + queue.remove());
                        queue.notifyAll();
                    }
                    // Pause outside the synchronized block so the monitor is not
                    // held while sleeping and other workers can make progress.
                    Thread.sleep(PAUSE_MILLIS);
                }
            } catch (InterruptedException e) {
                // Treat interruption as a stop request: restore the flag and exit.
                Thread.currentThread().interrupt();
            }
        }
    }
}