《Java并发编程实战》读书笔记四:常用的并发基础构建模块

《Java并发编程实战》读书笔记四:常用的并发基础构建模块

常用的并发基础构建模块

同步容器类

是一类将对应容器的状态都封装起来,并对每个共有方法都进行同步(加synchronized关键字修饰)的类,相当于让所有对容器的状态的访问串行化,虽然安全但是并发性差。

  • Vector
  • HashTable

对容器进行迭代操作时,我们要考虑它是不是会被其他的线程修改,如果是我们自己写代码,可以考虑通过如下方式对容器的迭代操作加锁:

synchronized (vector) {
    for (int i = 0; i < vector.size(); i++)
        doSomething(vector.get(i));
}

不过 Java 自己的同步容器类并没有考虑并发修改的问题,它主要采用了一种及时失败的方法,即一旦容器被其他线程修改,它就会抛出异常,例如 Vector 类,它的内部实现是这样的:

Vector#iterator()会返回一个 Vector 的内部类Itr implement Iterator<E>,在 Itr 的next()remove()方法中有如下代码:

synchronized (Vector.this) { // 类名.this:在内部类中,要用到外围类的this对象,使用“外围类名.this”
    checkForComodification();	// 在进行next和remove操作前,会先检查以下容器是否被修改
    ...
}

/* checkForComodification()方法 */
final void checkForComodification() {
    if (modCount != expectedModCount) // 在Itr的成员变量中有一个:int exceptedModCount = modCount;
        throw new ConcurrentModificationException(); // 如果容器被修改了,modCount会变
}

因此,我们在调用 Vector 的如下方法时,要小心,因为它们会隐式的调用 Vector 的迭代操作。

  • toString
  • hashCode
  • equals
  • containsAll
  • removeAll
  • retainAll
  • 容器作为参数的构造函数

并发容器类

通过上一小节的分析,我们发现同步容器类的性能实在太差,所以我们可以通过并发容器类代替同步容器类,来提高系统的可伸缩性。我们主要介绍ConcurrentHashMapCopyOnWriteArrayList

ConcurrentHashMap

特点

  • ConcorrentHashMap 实现了 ConcorrentMap 接口,能在并发环境实现更高的吞吐量,而在单线程环境中只损失很小的性能;
  • 采用分段锁,使得任意数量的读取线程可以并发地访问 Map,一定数量的写入线程可以并发地修改 Map;
  • 不会抛出 ConcorrentModificationException,它返回迭代器具有“弱一致性”,即可以容忍并发修改,但不保证将修改操作反映给容器;
  • size() 的返回结果可能已经过期,只是一个估计值,不过 size() 和 isEmpty() 方法在并发环境中用的也不多;
  • 提供了许多原子的复合操作:
    • V putIfAbsent(K key, V value);:K 没有相应映射才插入
    • boolean remove(K key, V value);:K 被映射到 V 才移除
    • boolean replace(K key, V oldValue, V newValue);:K 被映射到 oldValue 时才替换为 newValue

ConcurrentHashMap 内部结构(1.7及之前):
image.png

  • 在构造的时候,Segment 的数量由所谓的 concurrentcyLevel 决定,默认是 16;
  • Segment 是基于 ReentrantLock 的扩展实现的,在 put 的时候,会对修改的区域加锁。

锁分段实现原理

锁分段:
不同线程在同一数据的不同部分上不会互相干扰,例如,ConcurrentHashMap 支持 16 个并发的写入器,是用 16 个锁来实现的。它的实现原理如下:

  • 使用了一个包含 16 个锁的数组,每个锁保护所有散列桶的 1/16,其中第 N 个散列桶由第(N % 16)个锁来保护;

  • 这大约能把对于锁的请求减少到原来的 1/16,也是 ConcurrentHashMap 最多能支持 16 个线程同时写入的原因;

  • 对于 ConcurrentHashMap 的 size() 操作,为了避免枚举每个元素,ConcurrentHashMap 为每个分段都维护了一个独立的计数,并通过每个分段的锁来维护这个值,而不是维护一个全局计数;

  • 代码示例:

    	public class StripedMap {
    	    // 同步策略:buckets[n]由locks[n % N_LOCKS]保护
    	    private static final int N_LOCKS = 16;
    	    private final Node[] buckets;
    	    private final Object[] locks; // N_LOCKS个锁
    	    private static class Node {
    	        Node next;
    	        Object key;
    	        Object value;
    	    }
    	    public StripedMap(int numBuckets) {
    	        buckets = new Node[numBuckets];
    	        locks = new Object[N_LOCKS];
    	        for (int i = 0; i < N_LOCKS; i++)
    	            locks[i] = new Object();
    	    }
    	    private final int hash(Object key) {
    	        return Math.abs(key.hashCode() % buckets.length);
    	    }
    	    public Object get(Object key) {
    	        int hash = hash(key);
    	        synchronized (locks[hash % N_LOCKS]) { // 分段加锁
    	            for (Node m = buckets[hash]; m != null; m = m.next)
    	                if (m.key.equals(key))
    	                    return m.value;
    	        }
    	        return null;
    	    }
    	    public void clear() {
    	        for (int i = 0; i < buckets.length; i++) {
    	            synchronized (locks[i % N_LOCKS]) { // 分段加锁
    	                buckets[i] = null;
    	            }	
    	        }
    	    }
    	}
    

注意

  • 关于 put 操作:
    • 是否需要扩容
      • 在插入元素前判断是否需要扩容,
      • 比 HashMap 的插入元素后判断是否需要扩容要好,因为可以插入元素后,Map 扩容,之后不再有新的元素插入,Map就进行了一次无效的扩容
    • 如何扩容
      • 先创建一个容量是原来的2倍的数组,然后将原数组中的元素进行再散列后插入新数组中
      • 为了高效,ConcurrentHashMap 只对某个 segment 进行扩容
  • 关于 size 操作:
    • 存在问题:如果不进行同步,只是计算所有 Segment 维护区域的 size 总和,那么在计算的过程中,可能有新的元素 put 进来,导致结果不准确,但如果对所有的 Segment 加锁,代价又过高。
    • 解决方法:重试机制,通过获取两次来试图获取 size 的可靠值,如果没有监控到发生变化,即 Segment.modCount 没有变化,就直接返回,否则获取锁进行操作。

CopyOnWriteArrayList

  • 只要正确发布了这个 list,它就是不可变的了,所以随便并发访问,当需要修改时,就创建一个新的容器副本替代原来的,以实现可变性;
  • 应用于迭代操作远多于修改操作的情形,如:事件通知系统,分发通知时需要迭代已注册监听器链表,并调用每一个监听器,一般注册和注销事件监听器的操作远少于接收事件通知的操作。

并发工具类

可以根据自身状态协调线程的控制流:

  • 生产者消费者模式:阻塞队列(BlockingQueue)
  • 并发流程控制:
    • 闭锁(CountDownLatch)
    • 栅栏(Barrier)
    • 信号量(Semaphore)
  • 线程间的数据交换:交换者(Exchanger)

BlockingQueue

BlockingQueue 提供了可阻塞的 put 和 take 方法:(都是阻塞方法,会抛出 InterruptException 异常)

  • 如果队列为空,take 方法一直被阻塞,直到队列中出现一个可用元素
  • 如果队列已满,put 方法一直被阻塞,直到队列中出现可用空间

是设计 “生产者 -- 消费者模式” 的利器!

Java 中支持的阻塞队列

阻塞队列类结构特点
ArrayBlockingQueue数组FIFO
LinkedBlockingQueue链表FIFO
PriorityBlockingQueue优先队列按优先级先后出队
DelayQueue使用优先队列实现向队列中 put 元素时指定多久才能从队列中获取当前元素,只有当延时时间到了,才能从队列中获取该元素,队列元素要实现 Delayed 接口,可以用来设计缓存系统
SynchronousQueue不存储元素的阻塞队列每一个 put 操作等待一个 take 操作,否则无法继续添加元素
LinkedTransferQueue链表transfer():如果当前有在等待接收元素的消费者,可以把新元素直接给消费者,没有则阻塞;tryTransfer():如果没有消费者等待会返回 false;它们的区别就在于会不会立即返回
LinkedBlockDeque链表(双向队列)双向队列可用来实现工作密取模式,即如果一个消费者完成了自己的 Deque 中的全部任务,它可以偷偷的去其他消费者的 Deque 的尾部获取工作,以保证所有线程都处于忙碌状态,可应用于爬虫。

阻塞队列的实现原理

对于阻塞队列的实现原理,我们最关注的是其通知模式的实现,即 BlockingQueue 是如何在队列满时通知 put 操作等待,和如何在队列空时通知 take 操作等待的。

我们可以通过阅读 ArrayBlockingQueue 的源码得知:

  • ArrayBlockingQueue 中有一个 ReentrantLock lock
  • 这个 lock 给我们提供了两个 Condition:notEmpty 和 notFull
  • take 操作中,会以 while 循环的方式轮询 count == items.length,如果为 true,就 notFull.await(),这个阻塞状态需要通过 dequeue 方法中notFull.signal() 来解除;
  • put 操作中,会以 while 循环的方式轮询 count == 0,如果为 true,就 notEmpty.await(),这个阻塞状态需要通过 enqueue 方法中notEmpty.signal() 来解除。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
		implements BlockingQueue<E>, java.io.Serializable {
	int count;  // 队列中元素的个数
	final ReentrantLock lock;  // 下面的两个Condition绑定在这个锁上
	private final Condition notEmpty;  // 用来等待take的条件
	private final Condition notFull;  // 用来等待put的条件
	
	public ArrayBlockingQueue(int capacity, boolean fair) {
        // 省略...
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();  // 加可中断锁
        try {
            while (count == items.length)
                notFull.await();  // 轮询count值,等待count < items.length
            enqueue(e);  // 包含notFull.signal();
        } finally {
            lock.unlock();
        }
    }
    
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();  // 轮询count值,等待count > 0
            return dequeue();  // 包含notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }
    
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();  // 会唤醒在等待的take操作
    }
    
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();  // 会唤醒在等待的put操作
        return x;
    }
}

CountDownLatch

可以让一个或多个线程等待其他线程操作完成在继续执行,不可以循环使用,只能使用一次。

API

public CountDownLatch(int count);  // 参数count为计数值

// 调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行,或等待中线程中断
public void await() throws InterruptedException;

// 和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException;

public void countDown();  // 将count值减1

使用 CountDownLatch 替代 join()

public class CountDownLatchAndJoin {
    static CountDownLatch countDownLatch = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        new Thread() {
            @Override
            public void run() {
                System.out.println(1);
                countDownLatch.countDown();
                System.out.println(2);
                countDownLatch.countDown();
            }
        }.start();
        countDownLatch.await();
        System.out.println("Main Finished");
    }
}
/*
输出:
1
2
Main Finished  // main线程会等待main启动的线程执行完再结束
*/

CyclicBarrier

可以让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,让所有线程通过,并且这个屏障可以循环使用(这点和 CountDownLatch 很不同)。

API

/**
 * parties指让多少个线程或者任务等待至barrier状态
 * barrierAction为当这些线程都达到barrier状态时会执行的内容
 */
public CyclicBarrier(int parties, Runnable barrierAction);  // 常用
public CyclicBarrier(int parties);

public int await()
        throws InterruptedException, BrokenBarrierException;
public int await(long timeout, TimeUnit unit)
        throws InterruptedException, BrokenBarrierException, TimeoutException;

Demo

public class CyclicBarrierDemo2 {
    static CyclicBarrier barrier = new CyclicBarrier(2, new After());

    public static void main(String[] args) {
        new Thread() {
            @Override
            public void run() {
                System.out.println("In thread");
                try {
                    barrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();

        System.out.println("In main");
        try {
            barrier.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Finish.");
    }

    static class After implements Runnable {
        @Override
        public void run() {
            System.out.println("All reach barrier.");
        }
    }
}
/*
输出:
In main  // main线程到达屏障之后会被阻塞
In thread
All reach barrier.  // thread到达屏障之后会执行After的run
Main finish  // 然后被阻塞的main线程和thread线程才会继续执行下去
Thread finish
*/

Semaphore

用来控制同时访问特定资源的线程数量。

API

// 参数permits表示许可数目,即同时可以允许多少线程进行访问,默认是非公平的
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

// 这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可
public Semaphore(int permits, boolean fair) {
    sync = (fair) ? new FairSync(permits) : new NonfairSync(permits);
}

/* 会阻塞等待的acquire方法 */
public void acquire() throws InterruptedException;  // 获取一个许可
public void acquire(int permits) throws InterruptedException;  // 获取permits个许可
public void release();  // 释放一个许可
public void release(int permits);  // 释放permits个许可

/* 会阻塞但不等待,立即返回的acquire方法 */
// 尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire() { }

// 尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException { }

// 尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire(int permits) { }

// 尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException { }

Demo

public class SemaphoreDemo2 {
    private static final int THREAD_COUNT = 10;

    private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

    private static Semaphore semaphore = new Semaphore(2);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        System.out.println("save data");
                        Thread.sleep(1000);
                        semaphore.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        threadPool.shutdown();
    }
}
/*
结果会两个两个的蹦出:save data,说明同时只有两个线程能拿到资源
*/

Exchanger

一个用于两个线程间交换数据的工具类。如果第一个线程先执行了exchange(V)方法,它会阻塞在那里,等待第二个线程执行exchange(V)方法,exchange(V)会返回另一个线程传入的数据。

API

public Exchanger();

public V exchange(V x)
           throws InterruptedException;

public V exchange(V x, long timeout, TimeUnit unit)
           throws InterruptedException, TimeoutException;

Demo

public class ExchangeDemo {
    private static Exchanger<String> exch = new Exchanger<>();

    private static ExecutorService pool = Executors.newFixedThreadPool(2);

    // 用来保证线程池在两个线程执行完之后再关闭
    private static CountDownLatch latch = new CountDownLatch(2);

    public static void main(String[] args) {
        pool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String data = "第一个线程的结果";
                    Thread.sleep(100);
                    String res = exch.exchange(data);
                    System.out.println("我是第一个线程,我收到另一个线程的结果为:" + res);
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        pool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String data = "第二个线程的结果";
                    Thread.sleep(1000);
                    String res = exch.exchange(data);
                    System.out.println("我是第二个线程,我收到另一个线程的结果为:" + res);
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        try {
            latch.await();  // 等待两线程执行完,然后关闭线程池
        } catch (Exception e) {
            e.printStackTrace();
        }
        pool.shutdown();
    }
}