「并发编程」JUC并发容器类

java.util.concurrent包中,提供了两种类型的并发集合:一种是阻塞式,另一种是非阻塞式。

  • 阻塞式集合:当集合已满或为空时,被调用的添加(满)、移除(空)方法就不能立即被执行,调用这个方法的线程将被阻塞,一直等到该方法可以被成功执行
  • 非阻塞式集合:当集合已满或为空时,被调用的添加(满)、移除(空)方法就不能立即被执行,调用这个方法的线程不会被阻塞,而是直接则返回null或抛出异常。

1. 线程安全相关容器

1.1 线程安全-同步容器:

  1. ArrayList –> Vector,Stack
  2. HashMap –> HashTable(key、value不能为null)
  3. Collections.synchronizedXXX(List/Set/Map) //本质是对相应的容器进行包装,通过在方法中加synchronized同步锁来实现
  • 同步容器的同步原理就是在方法上用synchronized修饰。性能开销大
  • 单独使用里面的方法的时候,可以保证线程安全,但是,复合操作需要额外加锁来保证线程安全。

1.2 线程安全-并发容器:

  1. ArrayList –> **CopyOnWriteArrayList:保证最终一致性,写时复制,适用于读多写少**的并发场景
  2. HashSetTreeSet –> CopyOnWriteArraySetConcurrentSkipListSet
  3. HashMapTreeMap –> **ConcurrentHashMap**、ConcurrentSkipListMap

1.3 安全共享对象策略

  1. 线程限制:一个被线程限制的对象,由线程独占,并且只能被占有者修改
  2. 共享只读:一个共享只读的对象,在没有额外同步的情况下,可以被多个线程并发访问,但不能修改
  3. 线程安全对象:一个线程安全的对象或者容器,在内部通过同步机制来保证线程安全,其他线程无需额外的同步就可以通过公共接口随意访问它
  4. 被守护对象:被守护对象只能通过获取特定的锁来访问

2. CopyOnWrite机制

CopyOnWrite(简称COW),是计算机程序设计领域中的一种优化策略,也是一种思想–即写入时复制思想

  • CopyOnWrite中,对容器的修改操作加锁后,通过copy一个新的容器副本来进行修改,修改完毕后将容器替换为新的容器即可。
    • 这种方式的好处显而易见:通过copy一个新的容器来进行修改,这样读操作就不需要加锁,可以并发读,因为在读的过程中是采用的旧的容器,即使新容器做了修改对旧容器也没有影响,同时也很好的解决了迭代过程中其他线程修改导致的并发问题。
  • 从JDK1.5开始,java.util.concurrent包中提供了两个CopyOnWrite机制容器,分别为**CopyOnWriteArrayListCopyOnWriteArraySet**
  • CopyOnWriteArrayList通过使用**ReentrantLock锁**来实现线程安全:
    • 在添加、获取元素时,使用getArray()获取底层数组对象,获取此时集合中的数组对象;使用setArray()设置底层数组,将原有数组对象指针指向新的数组对象—-实以此来实现CopyOnWrite副本概念
    • 添加元素: 在添加元素之前进行加锁操作,保证数据的原子性。在添加过程中,进行数组复制,修改操作,再将新生成的数组复制给集合中的array属性。最后,释放锁;
      • 由于array属性被volatile修饰,所以当添加完成后,其他线程就可以立刻查看到被修改的内容。
    • 获取元素:在获取元素时,由于array属性被volatile修饰,所以每当获取线程执行时,都会拿到最新的数据。此外,添加线程在进行添加元素时,会将新的数组赋值给array属性,所以在获取线程中并不会因为元素的添加而导致本线程的执行异常。因为获取线程中的array和被添加后的array指向了不同的内存区域。

在执行add()时,为什么还要在加锁的同时又copy了一分新的数组对象?

  • 因为,在add()时候加了锁,首先不会有多个线程同时进到add中去,这一点保证了数组的安全。当在一个线程执行add时,又进行了数组的复制操作,生成了一个新的数组对象,在add后又将新数组对象的指针指向了旧的数组对象指针,注意此时是指针的替换,原来旧的数组对象还存在。这样就实现了,添加方法无论如何操作数组对象,获取方法在获取到集合后,都不会受到其他线程添加元素的影响。
  • CopyOnWrite机制的优缺点
    • 优点: CopyOnWriteArrayList保证了数据在多线程操作时的最终一致性
    • 缺点: 缺点也同样显著,那就是内存空间的浪费:因为在写操作时,进行数组复制,在内存中产生了两份相同的数组。如果数组对象比较大,那么就会造成频繁的GC操作,进而影响到系统的性能;

适用场景:读多写少的并发场景

3. ConcurrentHashMap

ConcurrentHashMap容器相较于CopyOnWrite容器在并发加锁粒度上有了更大一步的优化,它通过修改对单个hash桶元素加锁的达到了更细粒度的并发控制。

  • 在底层数据结构上,ConcurrentHashMapHashMap都使用了数组+链表+红黑树的方式,只是在HashMap的基础上添加了并发相关的一些控制。
  • JDK1.8中取消了segment分段锁,而采用CAS和synchronized来保证并发安全。synchronized只锁定当前链表或红黑二叉树的首节点,这样只要hash不冲突,就不会产生并发,效率又提升N倍。并且初始化操作大大简化,修改为lazy-load形式。

3.1 put方法过程

put方法内部是一个 putVal 的调用:

  1. 判断键值是否为null,为null抛出异常
  2. 调用spread()方法计算key的hashCode()获得哈希地址
  3. 判断Node[]数组(table)是否为空,若空则进行初始化操作
    • 需要注意的是这里并没有加synchronized,也就是允许多个线程去**尝试**初始化table,但是在初始化函数里面使用了CAS保证只有一个线程去执行初始化过程
  4. 使用(容量大小-1 & 哈希地址)计算下标,如果没有碰撞,使用CAS原子性操作放入桶中;插入失败(被别的线程抢先插入了)则进入下次循环。
  5. 如果该下标上的节点(头节点)的哈希地址为-1,代表需要扩容,该线程执行helpTransfer()方法协助扩容。
  6. 如果碰撞了(bucket不为空)且又不需要扩容,则进入到bucket中,且锁住该bucket,其他bucket不影响。
  7. 进入到bucket里面,首先判断这个bucket存储的是红黑树(哈希地址小于0)还是链表。
  8. 如果是链表,则遍历链表,若节点已经存在(key相同)就覆盖旧值,没有找到相同的节点就将新增的节点插入到链表尾部。如果是红黑树,则将节点插入。到这里释放锁。
  9. 判断该bucket上的链表长度是否链表长度超过阀值(TREEIFY_THRESHOLD==8),大于则调用treeifyBin()方法将链表转成红黑树。
  10. 调用addCount()方法,作用是将ConcurrentHashMap的键值对数量+1,还有另一个作用是检查ConcurrentHashMap是否需要扩容。

总结:

  • JDK8中的实现也是锁分离的思想,它把锁分的比segment(JDK1.5)更细一些,只要hash不冲突,就不会出现并发获得锁的情况。它首先使用无锁操作CAS插入头结点,如果插入失败,说明已经有别的线程插入头结点了,再次循环进行操作。如果头结点已经存在,则通过synchronized获得头结点锁,进行后续的操作。性能比segment分段锁又再次提升

3.2 ConcurrentHashMap多线程环境下扩容

  • transfer()方法为ConcurrentHashMap扩容操作的核心方法。由于ConcurrentHashMap支持多线程扩容,而且也没有进行加锁,所以实现会变得有点儿复杂。整个扩容操作分为两步:
    • 构建一个nextTable,其大小为原来大小的两倍,这个步骤是在单线程环境下完成的
    • 将原来table里面的内容复制到nextTable中,这个步骤是允许多线程操作的,所以性能得到提升,减少了扩容的时间消耗。
  • 扩容的时机:
    1. 如果新增节点之后,所在链表的元素个数达到了阈值 8,则会调用treeifyBin方法把链表转换成红黑树,不过在结构转换之前,会对数组长度进行判断:
      • 如果数组长度n小于阈值MIN_TREEIFY_CAPACITY,默认是64,则会调用tryPresize方法把数组长度扩大到原来的两倍,并触发transfer方法,重新调整节点的位置。
    2. 新增节点之后,会调用addCount方法记录元素个数,并检查是否需要进行扩容,当数组元素个数达到阈值时,会触发transfer方法,重新调整节点的位置。
  • JDK8的源码里面就引入了一个**ForwardingNode**类,在一个线程发起扩容的时候,就会改变sizeCtl这个值,其含义如下:
    • sizeCtl :默认为0,用来控制table的初始化和扩容操作,具体应用在后续会体现出来。
    • -1 代表table正在初始化
    • -N 表示有N-1个线程正在进行扩容操作
    • 其余情况:
      1. 如果table未初始化,表示table需要初始化的大小。
      2. 如果table初始化完成,表示table的容量,默认是table大小的0.75
  • 扩容时候会判断sizeCtl的值,如果超过阈值就要扩容,首先根据运算得到需要遍历的次数i,然后利用tabAt方法获得i位置的元素f,初始化一个forwardNode实例fwd,如果f == null,则在table中的i位置放入fwd,否则采用头插法的方式把当前旧table数组的指定任务范围的数据给迁移到新的数组中,然后给旧table原位置赋值fwd。直到遍历过所有的节点以后就完成了复制工作,把table指向nextTable,并更新sizeCtl为新数组大小的0.75倍 ,扩容完成。在此期间如果其他线程的有读写操作都会判断head节点是否为forwardNode节点,如果是就帮助扩容
  • 扩容时读写操作如何进行
    1. 对于get读操作,如果当前节点有数据,还没迁移完成,此时不影响读,能够正常进行。如果当前链表已经迁移完成,那么头节点会被设置成fwd节点,此时get线程会帮助扩容。
    2. 对于put/remove写操作,如果当前链表已经迁移完成,那么头节点会被设置成fwd节点,此时写线程会帮助扩容,如果扩容没有完成,当前链表的头节点会被锁住,所以写线程会被阻塞,直到扩容完成。

总结: ConcurrentHashMap扩容的原理是新生成原来2倍的数组,然后拷贝旧数组数据到新的数组里面,在多线程情况下,这里面如果注意线程安全问题,在解决安全问题的同时,我们也要关注其效率,这才是并发容器类的最出色的地方。

3.3 size、mappingCount方法

  • sizemappingCount方法都是用来统计table的size
  • 这两者不同的地方在size返回的是一个int类型,即可以表示size的范围是[-2^31,2^31-1],超过这个范围就返回int能表示的最大值
  • mappingCount返回的是一个long类型,即可以表示size的范围是[-2^63,2^63-1]
  • 这两个方法都是调用的sumCount()方法实现统计。
  • 对于size和迭代器是弱一致性
    • volatile修饰的数组引用是强可见的,但是其元素却不一定,所以,这导致size的根据sumCount的方法并不准确
    • 同理Iteritor的迭代器也一样,并不能准确反映最新的实际情况

4. ConcurrentSkipListMap

ConcurrentSkipListMap内部使用跳表(SkipList这种数据结构来实现,他的结构相对红黑树来说非常简单理解,实现起来也相对简单,而且在理论上它的查找、插入、删除时间复杂度都为log(n)。在并发上,ConcurrentSkipListMap采用无锁的**CAS+自旋**来控制。

  • 跳表简单来说就是一个多层的链表,底层是一个普通的链表,然后逐层减少,通常通过一个简单的算法实现每一层元素是下一层的元素的二分之一,这样当搜索元素时从最顶层开始搜索,可以说是另一种形式的二分查找。
  • ConcurrentSkipListMap的**put**(插入):
    • 调用doPut()方法,可以分为3大步来理解:
    • 第一步获取前继节点后通过CAS来插入节点;
    • 第二步对level层数进行判断,如果大于最大层数,则插入一层;
    • 第三步插入对应层的数据。整个插入过程全部通过CAS自旋的方式保证并发情况下的数据正确性。

5. volatile & Atmoic & UnSafe

  • **volatile**作用:①多线程间的可见性、②阻止指令重排序
  • **Atmoic系列类**提供了原子性操作,保障多线程下的安全
  • **UnSafe类**的作用:①内存操作、②字段的定位与修改(底层)、③线程挂起与恢复、④CAS操作(乐观锁)