0%

Atomic原子类

Atomic原子类的学习

介绍

  • 原子类说简单点就是具有原子/原子操作特征的类

    • 所谓原子操作就是指多线程环境下,一旦开始操作就不会被其他线程干扰的操作
    • 使用原子类的优势在于在对对应的数据进行更新的时候不需要加锁,避免使用synchronized的相对较高的开销,也可以实现线程安全(在没有Atomic原子类时,直接使用synchronized包裹来实现数据更新时的原子性,但是开销比较大,而Atomic的出现就是直接在类的基础上进行改造实现更改的原子性
  • 原子类的位置在java.util.concurrent.atomic

    JUC原子类概览
    • 这个包提供了对于基本类型,数组,引用类型以及对象属性的原子操作操作方法,以保证线程安全,根据可以操作的类型,将原子类分为4类
      • 基本类型
        • 使用原子的方式更新基本类型
          • AtomicInteger:整型原子类
          • AtomicLong:长整型原子类
          • AtomicBoolean :布尔型原子类
      • 数组类型
        • 使用原子的方式更新数组里的某个元素
          • AtomicIntegerArray:整型数组原子类
          • AtomicLongArray:长整型数组原子类
          • AtomicReferenceArray :引用类型数组原子类
      • 引用类型基本类型原子类只能更新一个变量,如果需要原子更新多个变量(或者说是多个属性),需要使用引用类型原子类
        • AtomicReference:引用类型原子类
        • AtomicMarkableReference:原子更新带有标记的引用类型。该类将 boolean 标记与引用关联起来,但是注意不能使用此类型解决ABA问题
        • AtomicStampedReference :原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题
      • 对象的属性修改类型(其使用场景在于原子的更改一个已有的类的属性,该类的属性并没有设计成原子类,但是又不至于使用加锁的方式更新,为了折中,就有了FieldUpdater这个原子类
        • AtomicIntegerFieldUpdater:原子更新整型字段的更新器
        • AtomicLongFieldUpdater:原子更新长整型字段的更新器
        • AtomicReferenceFieldUpdater:原子更新引用类型里的字段

使用方法

  1. 基本类型的原子类

    • 三种类型的用法基本一致

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      public final int get() //获取当前的值
      public final int getAndSet(int newValue)//获取当前的值,并设置新的值
      public final int getAndIncrement()//获取当前的值,并自增,相当于返回旧的值
      public final int incrementAndGet()//自增并返回当前的值,相当于返回新的值
      public final int getAndDecrement() //获取当前的值,并自减,相当于返回旧的值
      public final int decrementAndGet() //自减并获取当前的值,相当于返回新的值
      public final int getAndAdd(int delta) //获取当前的值,并加上预期的值 旧值
      public final int addAndGet(int delta) //先加上预期的值再获取当前的值,新值
      boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
      public final void lazySet(int newValue)//最终设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值,但最终的结果还是新的值。
      • compareAndSet方法内部直接调用native方法public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
  2. 数组类型的原子类

    • 用法基本一致

      1
      2
      3
      4
      5
      6
      7
      public final int get(int i) //获取 index=i 位置元素的值
      public final int getAndSet(int i, int newValue)//返回 index=i 位置的当前的值,并将其设置为新值:newValue
      public final int getAndIncrement(int i)//获取 index=i 位置元素的值,并让该位置的元素自增
      public final int getAndDecrement(int i) //获取 index=i 位置元素的值,并让该位置的元素自减
      public final int getAndAdd(int i, int delta) //获取 index=i 位置元素的值,并加上设定的值
      boolean compareAndSet(int i, int expect, int update) //如果输入的数值等于预期值,则以原子方式将 index=i 位置的元素值设置为输入值(update)
      public final void lazySet(int i, int newValue)//最终 将index=i 位置的元素设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
      • 上述都是获取旧值,类似的也可以获取新的值
  3. 引用类型的原子类

    • 基本类型原子类只能更新一个变量,如果需要原子更新多个变量(或者说是多个属性),需要使用引用类型原子类

    • 用法基本一致:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      		/**
      * 引用类型原子类
      */
      static void AtomicReferenceTest() {

      Person person = new Person("李佳", 12);
      AtomicReference<Person> atomicReference = new AtomicReference();
      atomicReference.set(person);

      Person newPerson = new Person("宋丹丹", 23);
      atomicReference.compareAndSet(person, newPerson);

      logger.info("name: {}, age: {}", atomicReference.get().getAge(), atomicReference.get().getName());

      person.setAge(10);
      // 引用对象属性的更改会被发现,不属于ABA问题
      logger.info("update success? [{}]", atomicReference.compareAndSet(person, person));

      }


      /**
      * 带更新版本的引用原子类型
      */
      static void AtomicStampedReferenceTest() {

      final Integer initialRef = 0, initialStamp = 0;
      // 初始化时应携带初始化的版本
      final AtomicStampedReference<Integer> asr = new AtomicStampedReference<>(initialRef, initialStamp);

      // compare and set
      final Integer updateRef = 233, updateStamp = 999;

      // 注意: 不是使用get方法而是使用getReference和getStamp
      logger.info("更新状态: [{}], 新的值: [{}],新的标签: [{}]", asr.compareAndSet(initialRef, updateRef, initialStamp, updateStamp), asr.getReference(), asr.getStamp());

      // get的使用方法,只需提供一个长度为1的数组即可
      int[] holder = new int[1];
      logger.info("当前值:{} 当前的stamp: {}", asr.get(holder), holder[0]);

      // 单独更新版本而不更新引用
      // 1. 注意更新版本时也要提供预期的索引对象,也就是说更新版本也应该是线程安全的,毕竟版本是与引用对应的
      logger.info("更新结果:{}, 当前的引用:{}, 当前的版本: {}", asr.attemptStamp(updateRef, 88), asr.getReference(), asr.getStamp());

      // 无条件的重新设置当前值与版本号
      // 1. 注意是无条件的设置,所以不需要提供预期值,但是也不是线程安全的
      // 与compareAndSet方法是对应的
      asr.set(initialRef, initialStamp);
      logger.info("当前的值:{}, 当前的版本: {}", asr.getReference(), asr.getStamp());

      // weakCompareAndSet
      // 1. 其内部实际上就是完全调用了compareSet方法,但是这个方法不保证立即更新,推测可能是jvm通过方法名在native中做了转发
      logger.info("更新结果:{}, 当前的引用:{}, 当前的版本: {}", asr.weakCompareAndSet(initialRef, updateRef, initialStamp, updateStamp), asr.getReference(), asr.getStamp());

      }
      /**
      * 只有两个更新版本的带版本引用类型原子类
      */
      static void AtomicMarkableReferenceTest() {

      final Boolean initialRef = null, initialStamp = false;
      // 初始化时应携带初始化的版本
      final AtomicMarkableReference<Boolean> asr = new AtomicMarkableReference<>(initialRef, initialStamp);

      // compare and set
      final Boolean updateRef = true, updateStamp = true;

      // 注意: 不是使用get方法而是使用getReference和isMarked
      logger.info("更新状态: [{}], 新的值: [{}],新的标签: [{}]", asr.compareAndSet(initialRef, updateRef, initialStamp, updateStamp), asr.getReference(), asr.isMarked());

      // get的使用方法,只需提供一个长度为1的数组即可
      boolean[] holder = new boolean[1];
      logger.info("当前值:{} 当前的stamp: {}", asr.get(holder), holder[0]);

      // 单独更新版本而不更新引用
      // 1. 注意更新版本时也要提供预期的索引对象,也就是说更新版本也应该是线程安全的,毕竟版本是与引用对应的
      logger.info("更新结果:{}, 当前的引用:{}, 当前的版本: {}", asr.attemptMark(updateRef, false), asr.getReference(), asr.isMarked());

      // 无条件的重新设置当前值与版本号
      // 1. 注意是无条件的设置,所以不需要提供预期值,但是也不是线程安全的
      // 与compareAndSet方法是对应的
      asr.set(initialRef, initialStamp);
      logger.info("当前的值:{}, 当前的版本: {}", asr.getReference(), asr.isMarked());

      // weakCompareAndSet
      // 1. 其内部实际上就是完全调用了compareSet方法,但是这个方法不保证立即更新,推测可能是jvm通过方法名在native中做了转发
      logger.info("更新结果:{}, 当前的引用:{}, 当前的版本: {}", asr.weakCompareAndSet(initialRef, updateRef, initialStamp, updateStamp), asr.getReference(), asr.isMarked());
      }
  4. 对象属性修改类型原子类

    1. 其作用在于原子更新对象的属性

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      /**
      * 类属性更改类型原子类的测试
      */
      static void AtomicIntegerFieldUpdaterTest() {
      // 使用方法相当于使用包装类
      AtomicIntegerFieldUpdater<Person> a = AtomicIntegerFieldUpdater.newUpdater(Person.class, "age");
      Person person = new Person("李佳", 13);
      logger.info("原有的年龄:{}", a.getAndIncrement(person));
      logger.info("更新后的年龄: {}", a.get(person));
      }
      • **注意:被改造成原子更新的属性应当被volatile修饰,否则会抛出java.lang.IllegalArgumentException: Must be volatile type异常 **
      • 原子的更改一个已有的类的属性,该类的属性并没有设计成原子类,但是又不至于使用加锁的方式更新,为了折中,就有了FieldUpdater这个原子类

原子类实现线程安全的方式(以AtomicInteger为例)

  • AtomicInteger 类主要利用 CAS (compare and swap) 自旋+ volatile来保证原子操作,从而避免 synchronized 的高开销,执行效率大为提升

    • CAS的原理是拿期望的值和原本的一个值作比较,如果相同则更新成新的值
      • CAS是一种原子操作,它是一种系统原语,是一条CPU的原子指令,从CPU层面保证它的原子性
        • CAS起作用的过程包括了旧值(期待值)的判断,与新值的赋予,看起来是两个步骤,但实际上这整个的过程都是原子操作,也就是说不可能出现,某线程判断旧值之后,在准备更新新值前,其余的线程更改了值
          • 但是确实在获取期望值与比较期望值之间出现ABA问题
      • 与Synchronized相比,CAS更新值失败后可以重试(自旋)或放弃,而不是一直挂起(话说JDK1.7之后的sync也会自旋,使用的也是CAS
    • 关于volatile的使用,AtomicInteger中存储数据的属性valuevolatile修饰,保证其可见性,这使得Unsafe类可以在JVM层面随时获取该属性的最新的值
      • Unsafe中的CAS方法在获取期待值的时候,也是使用Volatile语义来获取指定偏移位置的字段(value属性)的值
    • Atomic相关类中的CAS方法以及获取特定字段的值的方法来自于Unsafe类,关于Unsafe类详见对应的文章
  • 源码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // Unsafe类为AtomicInteger类提供CAS方法
    // Unsafe是单例的并且只能在BootStrap Class Loader加载的源码类中调用getUnsafe方法
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
    try {
    // objectFieldOffset() 方法是一个本地方法,这个方法是用来拿到“原来的值”的内存地址
    valueOffset = unsafe.objectFieldOffset
    (AtomicInteger.class.getDeclaredField("value"));
    } catch (Exception ex) { throw new Error(ex); }
    }
    // 数据存储在value变量中,使用volatile修饰,保证任何时刻线程都能获取最新的值
    private volatile int value;
    • objectFieldOffset方法获得对象的实例字段的相对偏移位置因为是相对偏移量,所以它其实跟某个具体对象又没什么太大关系,跟class的定义和虚拟机的内存模型的实现细节更相关
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // 以getAndSet方法的内部源码举例
    public final int getAndSet(int newValue) {
    return unsafe.getAndSetInt(this, valueOffset, newValue);
    }

    public final int getAndSetInt(Object var1, long var2, int var4) {

    // CAS 的典型用法,使用do-while循环
    int var5;
    do {
    // 获取旧的值,也就是期望的值
    var5 = this.getIntVolatile(var1, var2);
    // compareAndSwapInt为true表示更新成功,跳出循环,否则一直循环
    // 只有判断期望的值就是现在的值,也就是现在没有其他线程正在操作此值时,才会更新值,并且更新的过程是原子的,这个期望值可以通过getIntVolatile获取,也可以自己提供,比如compareAndSet方法
    } while(!this.compareAndSwapInt(var1, var2, var5, var4));

    return var5;
    }

    public native int getIntVolatile(Object var1, long var2);
    • CAS自旋的经典用法就是do-while循环,注意要将获取旧值的操作放到循环体内,因为一旦失败就需要更新旧值才行
      • 经典CAS自旋步骤(do-while循环)
        • 首先根据value属性的偏移值与实例地址获取value属性的最新值(以volatile语义进行加载)(当前值)
        • 使用cas方法设置为新的值,如果设置成功退出do-while循环,否则重新进入do-while循环继续获得最新的值,再尝试更新
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    // 使用AtominIntegr的compareAndSet方法自己实现的incrementAndGet方法
    // 自己实现AtomicInteger的incrementAndGet方法
    public static int incrementAndGet (AtomicInteger var) {
    // 局部变量不用担心线程安全
    int prev;
    int next;

    // 使用CAS时的经典用法就是使用do while 循环(或者说是自旋)
    do {
    // 获取当前的值
    prev = var.get();
    // 进行加1操作,注意这个操作是线程安全的,因为是使用的局部变量进行操作的,而不是共享变量
    next = prev + 1;
    // 每次操作完之后检查现在的值还是不是之前获取的值,如果是则没有其他线程进行写干扰,返回true,并进行更新,如果值更改了,说明其他线程
    // 进行了写操作,则不进行更新并返回false,继续循环尝试更新
    }while (!var.compareAndSet(prev, next));
    // 跳出循环后 表示更新成功则返回新值
    return next;

    }

    // AtomicInteger的compareAndSet方法
    public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
  • Java中实现CAS操作的底层方法是sun.misc.Unsafe这个类下的几个native方法,底层使用c++实现的

    1
    2
    3
    4
    5
    public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

    public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
    • Unsafe类里面还有其它方法用于不同的用途。比如支持线程挂起和恢复的parkunpark, LockSupport类底层就是调用了这两个方法。还有支持反射操作的allocateInstance()方法等

    • 关于Unsafe类的解析,参考Unsafe类介绍这篇文章

CAS实现原子操作的三大问题

  1. ABA问题

    1. 概念

      1. 所谓ABA问题,就是一个值原来是A,变成了B,又变回了A。这个时候使用CAS是检查不出变化的,但实际上却被更新了两次

        一个小偷,把别人家的钱偷了之后又还了回来,还是原来的钱吗,你老婆出轨之后又回来,还是原来的老婆嘛?ABA问题也一样,如果不好好解决就会带来大量的问题。最常见的就是资金问题,也就是别人如果挪用了你的钱,在你发现之前又还了回来。但是别人却已经触犯了法律

        ABA的场景出现时,预期值和旧值相同会让线程以为这个值没有被改变过,然而“值相同=没有被改变过”无法成立,即使第二个线程确确实实改过这个值,只不过又改回来了。“值曾经发生过改动”这个事件就无法被观测到。对于需要跟踪值的改变过程(比如记录值改变的次数)的场景来说ABA问题就是致命的

    2. ABA问题实际上分两类(对于Java只需关心第二类即可)第一种情况,针对没有GC机制的语言:

      1. 场景描述
        1. 在多线程的环境中,线程a从共享的地址X中读取到了对象A(期望值)。
        2. 在线程a准备对地址X进行更新之前,线程b将地址X中的值修改为了B。
        3. 接着线程b将地址X中的值又修改回了A。
        4. 最新线程a对地址X执行CAS,发现X中存储的还是对象A,对象匹配,CAS成功。
      2. 以上步骤发生在CAS原子操作的过程中,显然已经破坏了原子性,但是在拥有自动GC环境的编程语言,比如说java中,2,3的情况是不可能出现的,因为在java中,只要两个对象的地址一致,就表示这两个对象是相等的
      3. 2,3两步可能出现的情况就在像C++这种,不存在自动GC环境的编程语言中。因为可以自己控制对象的生命周期,如果我们从一个list中删除掉了一个对象,然后又重新分配了一个对象,并将其add back到list中去,那么根据 MRU memory allocation算法,这个新的对象很有可能和之前删除对象的内存地址是一样的。这样就会导致ABA的问题
    3. 第二种情况,针对Java来说

      1. 场景:

        考虑下面的情况,有一个链表里面的数据是A->B->C,我们希望执行一个CAS操作,将A替换成D,生成链表D->B->C。考虑下面的步骤:

        1. 线程a读取链表头部节点A。
        2. 线程b将链表中的B节点删掉,链表变成了A->C
        3. 线程a执行CAS操作,将A替换从D。
        4. 最后我们的到的链表是D->C,而不是D->B->C。
      2. 显然 CAS只关心了链表头部的元素是否是元素A(地址比较),而对于元素A的内容(链表指针的指向)的变动并不关心,看下边的例子证实此结论:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        /**
        * ABA 问题的模拟
        */
        static void ABATest () {
        List<Integer> a = new ArrayList<>(Arrays.asList(1,2,3,4));
        List<Integer> b = new ArrayList<>(Arrays.asList(5,6,7,4));
        List<Integer> c = new ArrayList<>(Arrays.asList(6,7,3,4));

        // 使用AtomicReference对列表实现原子操作
        AtomicReference<List<Integer>> atomicReference= new AtomicReference<>(a);
        System.out.println(atomicReference.compareAndSet(a,b));
        System.out.println(atomicReference.compareAndSet(b,a));
        a.add(133);
        // 无视数组内容的更改
        System.out.println(atomicReference.compareAndSet(a,c));
        }
        • 此类ABA问题可能会导致某些集合类的操作并不是原子性的,因为你并不能保证在CAS的过程中,有没有其他的节点发送变化
        • 上边提出的场景应该只是一个很明显易懂的场景,应该也有其他ABA问题的场景
  2. 解决方法

    1. 每一次对集合类型进行CAS更新成功后都应提交一个版本号,使用CAS更新前要提供预期版本号,如果版本号并非预期说明引用的内容被其他线程更新了(相当于除了CAS之外,手动添加了一个约束),版本号由程序员提供

      1. 从JDK 1.5开始,JDK的atomic包里提供了一个类AtomicStampedReference类来解决ABA问题

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        /**
        * ABA 问题的解决
        */
        static void ABASolutionTest () {

        Logger logger = LogManager.getLogger(AtomicDemo.class.getName());


        List<Integer> a = new ArrayList<>(Arrays.asList(1,2,3,4));
        List<Integer> b = new ArrayList<>(Arrays.asList(5,6,7,4));
        List<Integer> c = new ArrayList<>(Arrays.asList(6,7,3,4));

        AtomicStampedReference<List<Integer>> atomicReference= new AtomicStampedReference<List<Integer>>(a, 0);


        logger.info(atomicReference.compareAndSet(a,b,0, 1));

        logger.info(atomicReference.compareAndSet(b,a,1,2));
        a.add(133); // 执行更新时应当提交新的版本
        logger.info(atomicReference.compareAndSet(a,c,3,4));
        }
        • AtomicMarkableReference 不能解决ABA问题,AtomicMarkableReference是将一个boolean值作是否有更改的标记,本质就是它的版本号只有两个,true和false,修改的时候在这两个版本号之间来回切换,这样做并不能解决ABA的问题,只是会降低ABA问题发生的几率而已
        • 并不是其他的Atomic类型可以解决ABA问题,而是只能通过AtomicStampedReference来解决
    2. 使用自旋的开销比较大

      1. 如果长时间自旋还是不成功会导致开销较大,会占用大量的CPU资源

      2. 解决方法:

        1. 让JVM支持处理器提供的pause指令

          • pause指令能让自旋失败时cpu睡眠一小段时间再继续自旋,从而使得读操作的频率低很多,为解决内存顺序冲突而导致的CPU流水线重排的代价也会小很多

            自旋 CAS(也就是不成功就一直循环执行直到成功)如果长时间不成功,会给 CPU 带来非常大的执行开销。如果 JVM 能支持处理器提供的 pause 指令那么效率会有一定的提升,pause 指令有两个作用,第一:它可以延迟流水线执行指令(de-pipeline),使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二:它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起 CPU 流水线被清空(CPU pipeline flush),从而提高 CPU 的执行效率

        2. 通过读并发编程的代码,自己得出的结论:在自旋的代码中加入判断是否需要挂起,如果能挂起的话就直接挂起

        3. synchronized中的自适应自旋

    3. CAS本身只能保证一个共享变量的原子操作

      1. 使用AtomicReference,将多个变量放在一个类中
      2. 使用锁(废话)

参考