京东一面之CountDownLatch用法及底层原理解析

(39) 2024-02-13 11:12

Hi,大家好,我是编程小6,很荣幸遇见你,我把这些年在开发过程中遇到的问题或想法写出来,今天说一说京东一面之CountDownLatch用法及底层原理解析,希望能够帮助你!!!。

本文通过在京东面试遇到的问题总结而出,如有不对地方,请及时批评指正。篇幅较长,请耐心阅读

简介

CountDownLatch是Java并发编程中的一个同步辅助工具。用来协调不同线程程之间的任务同步。一般用于将一个复杂任务按照不同的执行顺序拆分成多个相互独立的子任务执行,其中某个线程需要等待其他线程执行完成后才能执行。

京东一面之CountDownLatch用法及底层原理解析_https://bianchenghao6.com/blog__第1张

使用步骤

1.定义两个CountDownLatch对象,分别作用于不同的主线程和子线程。

京东一面之CountDownLatch用法及底层原理解析_https://bianchenghao6.com/blog__第2张

2.创建多个子线程任务并执行。

京东一面之CountDownLatch用法及底层原理解析_https://bianchenghao6.com/blog__第3张

3.分别将两个CountDownLatch对象的实例传入子线程任务中。

京东一面之CountDownLatch用法及底层原理解析_https://bianchenghao6.com/blog__第4张

4.主线程中先后调用主线程任务执行。

京东一面之CountDownLatch用法及底层原理解析_https://bianchenghao6.com/blog__第5张

京东一面之CountDownLatch用法及底层原理解析_https://bianchenghao6.com/blog__第6张

5.根据任务执行,打印结果。

京东一面之CountDownLatch用法及底层原理解析_https://bianchenghao6.com/blog__第7张

6.执行过程详解

先开启子线程执行任务,但是在子线程Work的run方法中startSignal.await();挂起等待。主线程执行完第一个doSomethingElse()任务,然后调用startSignal.countDown(),释放同步锁,此时子线程得到了执行的机会。主线程调用doneSignal.await()方法阻塞当前线程,直到所有子线程执行完成。调用downSignal.countDown(),将同步锁计数减到0,主线程恢复执行。执行最后一个doSomethingElse()任务。

源码分析

实现原理

CountDownLatch内部定义了一个静态类Sync继承于队列同步器AbstractQueuedSynchronizer(简称AQS),AQS是用来构建同步锁的基本框架,它内部使用一个Int类型的state表示同步状态、,通过内置的FIFO队列来完成资源获取的线程的排队工作。

京东一面之CountDownLatch用法及底层原理解析_https://bianchenghao6.com/blog__第8张

设计模式

AQS的设计基于模板方法的设计模式。模板方法是一种行为设计模式,在父类中定义了一些算法框架,子类在实现时不能修改算法结构,只能重写特定的步骤

核心方法1-countDown方法

    public void countDown() {
        sync.releaseShared(1);
    }

作用:减少锁存器的计数,如果计数为0,则释放所有等待的线程,如果当前计数大于0,则依次递减。

CountDown内部调用sync的releaseShared方法进行锁的释放

  public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

sync.releaseShared方法其实是AQS的方法,在AQS的releaseShared方法中,tryReleaseShared作为一个抽象方法有子类具体实现,然后调用 doReleaseShared()。

由于Sync继承与AQS,所有内部实现了tryReleaseShared方法

protected boolean tryReleaseShared(int releases) {  
    //通过循环遍历,释放锁资源
    for (;;) {
        int c = getState();
         if (c == 0)
            return false;
         int nextc = c-1;
        if (compareAndSetState(c, nextc))
         return nextc == 0;
            }
        }

Sync内部通过循环遍历,对锁的同步状态进行比较。

doReleaseShared()的具体实现在AQS中

 private void doReleaseShared() {
       
         //循环
        for (;;) {
            Node h = head;
            if (h != null && h != tail) { //如果队链表不为null
                int ws = h.waitStatus;//获取节点的等待状态
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            //CAS操作失败,继续
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                //当前状态为0,且CAS失败,则继续
            }
            if (h == head)                   //如果只剩头节点,则跳出循环
                break;
        }
    }

总结await的执行过程如下:

京东一面之CountDownLatch用法及底层原理解析_https://bianchenghao6.com/blog__第9张

核心方法2-await方法

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

await方法调用sync的acquireSharedInterruptibly(1)。acquireSharedInterruptibly方法在AQS内部

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        //如果线程中断
        if (Thread.interrupted())
            //抛出异常
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)//状态为-1,
            //进入等待队列
            doAcquireSharedInterruptibly(arg);
    }

当前线程如果没有中断,则通过tryAcquireShared判断状态。

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1; 
}

当前线程如果未获取到锁资源,进入等待队列,执行doAcquireSharedInterruptibly方法。

 private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) { //如果是头节点
                    int r = tryAcquireShared(arg); //获取状态
                    if (r >= 0) {
                        setHeadAndPropagate(node, r); //设置头节点状态
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted()) //获取失败,中断线程
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

在AQS中调用 setHeadAndPropagate(node, r),设置头节点状态。

 private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; 
        setHead(node);//设置头节点
    
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;//获取下一个节点
            if (s == null || s.isShared()) //如果后一个节点为null,或为共享状态,则释放
                doReleaseShared();
        }
    }

如果当前只有一个头节点,则执行AQS中的doReleaseShared

 private void doReleaseShared() {
       
         //循环
        for (;;) {
            Node h = head;
            if (h != null && h != tail) { //如果队链表不为null
                int ws = h.waitStatus;//获取节点的等待状态
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            //CAS操作失败,继续
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                //当前状态为0,且CAS失败,则继续
            }
            if (h == head)                   //如果只剩头节点,则跳出循环
                break;
        }
    }

总结await的执行过程如下:

京东一面之CountDownLatch用法及底层原理解析_https://bianchenghao6.com/blog__第10张

总结

CountDownLatch作为并发同步工具,其底层使用AQS框架实现的,一般用于对多个线程之间任务的交替执行或者任务之间具体关联性。用于控制多个线程的执行顺序。

喜欢本篇文章的小伙伴,可以点个关注哦!

今天的分享到此就结束了,感谢您的阅读,如果确实帮到您,您可以动动手指转发给其他人。

上一篇

已是最后文章

下一篇

已是最新文章

发表回复