java并发

1. java线程状态转换

线程状态转换

新建(New)

创建后未启动

可运行(Runnable)

可能正在运行,也可能正在等待CPU时间片
包含了操作系统线程状态的RunningReady

阻塞(Blocked)

等待获取一个排他锁,如果其线程释放了锁就会结束此状态。

无限期等待(Waiting)

等待其他线程显示的唤醒,否则不会被分配CPU时间片。

进入方法 退出方法
没有设置Timeout参数的Object.wait()方法 Object.notify()/Object.notifyAll()
没有设置Timeout参数的Thread.join()方法 被调用的线程执行完毕
LockSupport.park()方法 LockSupport.unpark(Thread)

限期等待(Timed Waiting)

无需等待其它线程显式地唤醒,在一定时间之后会被系统自动唤醒。

调用Thread.sleep()方法使线程进入限期等待状态时,常常用“使一个线程睡眠”进行描述。

调用Object.wait()方法使线程进入限期等待或者无限期等待时,常常用“挂起一个线程”进行描述。

睡眠和挂起是用来描述行为,而阻塞和等待用来描述状态。

阻塞和等待的区别在于,阻塞是被动的,它是在等待获取一个排它锁。而等待是主动的,通过调用Thread.sleep()Object.wait()等方法进入。

进入方法 退出方法
Thread.sleep()方法 时间结束
设置了Timeout参数的Object.wait()方法 时间结束/Object.notify()/Object.notifyAll()
设置了Timeout参数的Thread.join()方法 时间结束/被调用的线程执行完毕
LockSupport.parkNanos()方法 LockSupport.unpark(Thread)
LockSupport.parkUntil()方法 LockSupport.unpark(Thread)

死亡(Terminated)

可以是线程任务结束之后自己结束,或者产生了异常而结束。

2. 创建线程的三种方式

  • 实现Runnable接口
  • 实现Callable接口
  • 继承Thread

实现Runnable

需要实现run方法
通过Thread调用start()方法来启动线程。

public class MyRunnable implements Runnable {
    public void run(){
        //...
    }
}
public static void main(String[] args){
    MyRunnable instance = new MyRunnable();
    Thread thread = new Thread(instance);
    thread.start();
}

实现Callable接口

Runnable相比,Callable可以有返回值,返回值可以通过FutureTask进行封装。

public class MyCallable implements Callable<Integer> {
    //需要实现call函数
    public Integer call(){
        return 123;
    }
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
    MyCallable mc = new MyCallable();
    FutureTask<Integer> ft = new FutureTask<>(mc);
    Thread thread = new Thread(ft);
    thread.start();
    System.out.println(ft.get());
}

ft.get()方法用于获取返回值,如果线程没有执行完,该方法会一直阻塞到线程执行完毕。另外可以调用ft.isDone()函数检验线程是否执行完毕,该方法不会被阻塞。

继承Thread类

同样也是需要实现run()方法,因为Thread类也实现了Runable接口。

当调用start()方法启动一个线程时,虚拟机会将该线程放入就绪队列中等待被调度,当一个线程被调度时会执行该线程的run()方法。

public class MyThread extends Thread {
    public void run() {
        // ...
    }
}
public static void main(String[] args) {
    MyThread mt = new MyThread();
    mt.start();
}

实现接口 VS 继承 Thread

实现接口会更好一些,因为:

  • Java 不支持多重继承,因此继承了 Thread 类就无法继承其它类,但是可以实现多个接口;
  • 类可能只要求可执行就行,继承整个 Thread 类开销过大。

工具类Executors可以实现Runnable对象和Callable对象之间的相互转换。(Executors.callable(Runnable task)Executors.callable(Runnable task,Object result))。

3. 线程池

线程池的顶级接口是Executor,管理多个异步任务的执行,而无需程序员显示的管理线程的生命周期。

线程池的顶级接口是Executor,线程池接口是ExecutorService

主要有以下几种类型的Executor:

  • CachedThreadPool:创建一个可缓存的线程池,如果线程池的大小超过了处理任务所需的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务
  • FixedThreadPool:创建固定大小的线程池,每次提交一个任务就创建一个线程,直到线程池到达最大大小,一旦达到最大值就保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程
  • SingleThreadExecutor:单线程线程池,此线程池保证所有任务的执行顺序按照任务的提交顺序执行
  • ScheduledThreadPool:创建一个大小无限的线程池,此线程池支持定时以及周期性执行任务的需求

Executor接口:执行已经提交的Runnable任务的对象

ExecutorService接口:提供管理终止的方法,以及跟踪一个或多个异步任务执行状况而生成的Future方法

Executors类:定义了ExecutorExecutorService等的工厂和实用方法

public static void main(String[] args){
   ExecutorService executorService = Executors.newCachedThreadPool();
   for (int i = 0; i < 5; i++) {
        executorService.execute(new MyRunnable());
    }
    executorService.shutdown();
    
    ExecutorService es = Executors.newScheduledThreadPool(3);//大小无限,但仍然需要指定大小
    ((ScheduledExecutorService) es4).schedule(new MyRunnable(),3000, TimeUnit.MILLISECONDS);//延迟执行以及周期
    es4.shutdown();
}

如果创建线程池

《阿里巴巴Java开发手册》中强制线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的人更加明确线程池的运行规则,规避资源耗尽的风险。

Executors返回线程池对象的弊端如下:

  • FixedThreadPoolSingleThreadExecutor:允许请求的队列长度为Integer.MAX_VALUE,可能堆积大量的请求,从而导致OOM。
  • CacheThreadPoolScheduledThreadPool:允许创建的线程数量为Integer.MAX_VALUE,可能会创建大量线程,从而导致OOM

方法一:通过ThreadPoolExecutor构造方法实现

ThreadPoolExecutor构造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

参数说明:

名称 类型 含义
corePoolSize int 核心线程池大小
maximunPoolSize int 最大线程池大小
keepAliveTime long 线程最大空闲时间
unit TimeUnit 时间单位
workQueue BlockingQueue 线程等待队列
threadFactory ThreadFactory 线程创建工厂
handler RejectedExecutionHandler 拒绝策略

当需要任务大于核心线程数时候,就开始把任务往存储任务的队列里,当存储队列满了的话,就开始增加线程池创建的线程数量,如果当线程数量也达到了最大,就开始执行拒绝策略,比如说记录日志,直接丢弃,或者丢弃最老的任务。

方法二 通过Executor框架的工具类Executors来实现(见上面例子)

4. Daemon守护线程

守护线程是程序运行时在后台提供服务的线程,不属于程序中不可或缺的部分。

当所有非守护线程结束时,程序也就终止,同时会杀死所有守护线程

main()属于非守护线程

在线程启动前使用setDaemon方法将一个线程设置为守护线程。

public static void main(String[] args) {
    Thread thread = new Thread(new Runnable());
    thread.setDaemon(true);
}

5. sleep()方法

Thread.sleep(millisec)方法会休眠当前正在执行的线程,millisec单位为毫秒。

该方法可能会有两个异常:

  • IllegalArgumentException:如果参数为负数
  • InterruptedException:如果任何线程中断了当前线程,就会抛出此异常,此时线程的中断状态将被清除。
public void run() {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

6. yield()方法

对静态方法Thread.yield()的调用声明了当前线程已经完成了生命周期中最重要的部分,可以切换给其它线程来执行。该方法只是对线程调度器的一个建议,而且也只是建议具有相同优先级的其它线程可以运行。

与 sleep() 方法的区别

  • sleep()方法给其他进程运行时不会考虑进程的优先级问题,因此会给低优先级的线程以运行的机会;yield()方法只会给相同优先级或者更高优先级的线程以运行的机会。
  • sleep()方法声明会抛出InterruptedException异常,而yield()方法没有声明任何异常。
  • sleep()方法比yield()方法(跟操作系统CPU调度相关)具有更好的可移植性。

7. 线程中断的InterruptedException,interrupted,interrupt区别

一个线程执行完毕后会自动结束,如果在运行过程中发生异常会提前结束。

interrupt()

该方法用于标记中断一个线程,从而使线程发生提前中断。

InterruptedException

通过调用一个线程的interrupt()方法来中断该线程,如果线程处于阻塞、无期限等待或有期限等待状态,就会抛出InterruptedException,从而提前结束该线程。但是不能中断I/O阻塞和synchronized锁阻塞。

interrupted()

该方法用于判断线程是否设置了中断标记,同时会清除中断标记。

如果一个线程的run()方法执行了一个无限循环,并且没有执行sleep()等会抛出InterruptedException的操作,那么调用线程的interrupt()方法就无法使线程提前结束。但是调用interrupt()方法会设置线程中的中断标记,此时调用interrupted()方法会返回true,因此可以在循环体中使用interrupted()方法来判断线程是否处于中断状态,从而提前结束线程。

线程池中的中断操作

调用Executorshutdown()方法会等待线程都执行完毕之后再关闭,但是如果调用的是shutdownNow()方法,则相当于调用了每个线程的interrupt()方法。

public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> {
        try {
            Thread.sleep(2000);
            System.out.println("Thread run");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    executorService.shutdownNow();
    System.out.println("Main run");
}

如果只想中断Executor中的一个线程,可以通过使用 submit()方法来提交一个线程,它会返回一个Future<?>对象,通过调用该对象的cancel(true)方法就可以中断线程。

Future<?> future = executorService.submit(() -> {
    // ..
});
future.cancel(true);

8. (锁)Java提供了哪些机制控制多个线程对共享资源的互斥访问?

Java提供了两种锁机制来控制多个线程对共享资源的互斥访问,第一个是JVM实现的synchronized,第二个是JDK实现的ReentrantLock

synchronized

说一说对Synchronized关键字的理解

每一个对象有一把锁,线程可以使用 synchronized 关键字来获取对象上的锁。synchronized 关键字可应用在方法级别(粗粒度锁)或者是代码块级别(细粒度锁)。synchronized关键字主要用于解决多个线程之间访问同步资源的同步性,synchronized关键字可以保证被它修饰的方法或者代码块在同一时刻只能有一个线程执行。

早期的synchronized属于重量级锁,效率低下,因为监视器锁(monitor)是依赖于底层的操作系统的Mutex Lock来实现的,Java的线程都是映射到操作系统的原生线程上。如果要挂起或者唤醒一个线程,都需要操作系统帮忙完成,而操作系统实现线程之间的切换需要从用户态转换到内核态,这个状态之间的转换需要相对比较长的时间,时间成本相对较高,这也是早期synchronized效率低的原因。java6之后从JVM层面对synchronized较大优化,所以现在synchronized锁效率也优化的跟ReentrantLock差不多了,JDK1.6引入了偏向锁,轻量级锁,自旋锁,自适应自旋锁,锁消除,锁粗化等技术来减少锁操作的开销。

说一说自己怎么使用synchronized关键字

同步一个代码块

public void fun(){
    synchronized(this) {
        // ...
    }
}

同步一个方法

public synchronized void fun() {
    // ...
}

这两个方法都只能作用于同一个对象,如果调用两个对象上的同步代码块,就不会进行同步。如果是同时调用一个对象上的同步代码块或同步方法,两个线程就会进行同步,当一个进入语句块时,另一个必须等待。

同步一个类

public void func() {
    synchronized(SynchronizedExample.class) {
        // ...
    }
}

作用于整个类,也就是说两个线程调用同一个类的不同对象上的这种同步语句时,也会进行同步。

public class SynchronizedExample {
    public void func2() {
        synchronized (SynchronizedExample.class) {
            for(int i=0; i<10; i++){
                // ...
            }
        }
    }
}

同步一个静态方法

public synchronized static void fun() {
    // ...
}

作用于整个类。因为静态成员不属于任何一个实例对象,是类成员(static表明这是该类的一个静态资源,不管new了多少个对象,只有一份)。所以如果一个线程A调用一个实例对象的非静态synchronized方法,而线程B需要调用这个实例对象所属类的静态synchronized 方法,是允许的,不会发生互斥现象,因为访问静态synchronized方法占用的锁是当前类的锁,而访问非静态synchronized方法占用的锁是当前实例对象锁。

双重校验锁实现单例对象(线程安全)

public class Singleton {

    private volatile static Singleton uniqueInstance; //注意要使用volatile关键字修饰

    private Singleton() {
    }

    public static Singleton getUniqueInstance() {
       //先判断对象是否已经实例过,没有实例化过才进入加锁代码
        if (uniqueInstance == null) {
            //类对象加锁
            synchronized (Singleton.class) {
                if (uniqueInstance == null) {
                    uniqueInstance = new Singleton();
                }
            }
        }
        return uniqueInstance;
    }
}

在上面uniqueInstance = new Singleton()这段代码中,其实是分为三步执行:

  1. uniqueInstance分配内存空间
  2. 初始化uniqueInstance
  3. uniqueInstance指向分配的内存地址。

但是由于JVM具有指令重排的特性,执行顺序有可能是1->3->2。指令重排在单线程下不会出现问题,但是在多线程环境下会导致一个线程获得还没有初始化的实例。例如,线程T1执行了1和3,此时T2调用getUniqueInstance()后发现uniqueInstance不为空,因此返回uniqueInstance,但此时uniqueInstance还未被初始化。

使用volatile禁止JVM指令重排,保证多线程环境下也能正常运行。

讲一下synchronized关键字底层原理

synchronized关键字的原理属于JVM层面,通过 JDK 自带的javap命令查看SynchronizedDemo类的相关字节码信息:首先切换到类的对应目录执行javac SynchronizedDemo.java命令生成编译后的.class文件,然后执行javap -c -s -v -l SynchronizedDemo.class

同步语句块的情况

synchronized同步语句块的实现使用的是monitorentermonitorexit指令,其中monitorenter指令指向同步代码块开始位置,monitorexit指令则指明同步代码块的结束位置。当执行monitorenter指令时,线程试图获取锁也就是monitor的持有权(monitor对象存在于每个Java对象的对象头中,synchronized锁便是通过这种方式获得锁的,也是为什么java中任意对象可以作为锁的原因)。当计数器为0则表示成功获取,获取后将锁计数器设为1也就就是加1。相应的在执行monitorexit指令后,将锁计数器设为0,表明锁被释放。如果获取对象锁失败,那么当前线程就要阻塞等待,直到锁被另外一个线程释放为止。

synchronized修饰方法的情况

synchronized修饰方法并没有monitorentermonitorexit指令,取而代之的是ACC_SYNCHRONIZED标识,该标识指明了该方法是一个同步方法,JVM通过该ACC_SYNCHRONIZED访问标志来辨别一个方法是否声明为同步方法,从而执行相应的同步调用。

Synchronized 是否一定线程安全

从上面的实现原理可以知道,synchronized 针对方法和同步语句块的加锁对象不一致,所以如果使用 synchronized 修饰的地方不同,就有可能引起并发访问一个变量这种线程不安全的情况。

ReentrantLock

ReentrantLockjava.util.concurrent(J.U.C)包中的锁。

public class LockExample {

    private Lock lock = new ReentrantLock();

    public void func() {
        lock.lock();
        try {
            for (int i = 0; i < 10; i++) {
                System.out.print(i + " ");
            }
        } finally {
            lock.unlock(); // 确保释放锁,从而避免发生死锁。
        }
    }
}

synchronized和ReentrantLock区别

  1. 两者都是可重入锁,“可重入锁”概念是:自己可以再次获取自己的内部锁。比如一个线程获得了某个对象的锁,此时这个对象锁还没有释放,当其再次想要获取这个对象的锁时还可以获取,如果是不可重入锁的话,会造成死锁。同一个线程每次获取锁,锁的计数器都自增1,所以要等到锁的计数器下降为0才能释放锁。
  2. synchronized是JVM实现的,ReentrantLock是JDK实现的
  3. 新版本的synchronized进行了很多优化,例如自旋锁,synchronizedReentrantLock大致相同。
  4. 当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情。 ReentrantLock可中断(通过lock.lockInterruptibly()来实现),而synchronized不行
  5. 公平锁是指多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁。
    synchronized中的锁是非公平的,ReentrantLock默认情况下也是非公平的,但是也可以是公平的(通过ReentrantLock类的ReentrantLock(boolean fair)
  6. 一个ReentrantLock可以同时绑定多个Condition对象(绑定多个条件),从而可以有选择性的进行线程通知,在调度线程上更加灵活。在使用notify()/notifyAll()方法进行通知时,被通知的线程是由 JVM 选择的,用ReentrantLock类结合Condition实例可以实现“选择性通知”。

使用选择:除非需要使用ReentrantLock的高级功能,否则优先使用synchronized。这是因为synchronized是 JVM 实现的一种锁机制,JVM 原生地支持它,而ReentrantLock不是所有的 JDK 版本都支持。并且使用synchronized不用担心没有释放锁而导致死锁问题,因为 JVM 会确保锁的释放。

(ReentrantLock 实现公平锁和非公平锁见35题)

9. 线程之间协作方式/线程同步和调度的相关方法

当多个线程可以一起工作去解决某个问题时,如果某些部分必须在其它部分之前完成,那么就需要对线程进行协调。

join()方法

在一个线程中调用另一个线程的join()方法,会将当前线程挂起,等待另一个线程执行完毕之后再继续往下执行。

public class JoinExample {

    private class A extends Thread {
        @Override
        public void run() {
            System.out.println("A");
        }
    }

    private class B extends Thread {

        private A a;

        B(A a) {
            this.a = a;
        }

        @Override
        public void run() {
            try {
                a.join();   //调用a方法,会先让a方法执行
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("B");
        }
    }

    public void test() {
        A a = new A();
        B b = new B(a);
        b.start();
        a.start();
    }
}

wait(),notify(),notifyAll()

wait()方法会将当前线程挂起,等待某个条件满足。当其他线程运行使得这个条件满足了,其他线程会调用notify()或者notifyAll()方法。

它们都属于Object的一部分,而不属于Thread

只能用在同步方法或者同步控制块中使用,获得当前对象的锁资源,否则会在运行时抛出IllegalMonitorStateException

使用wait()挂起期间,线程会释放锁。这是因为,如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行notify()或者notifyAll()来唤醒挂起的线程,造成死锁。

public class WaitNotifyExample {

    public synchronized void before() {
        System.out.println("before");
        notifyAll();
    }

    public synchronized void after() {
        try {
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("after");
    }
}
public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    WaitNotifyExample example = new WaitNotifyExample();
    executorService.execute(() -> example.after());
    executorService.execute(() -> example.before());
}

wait()sleep()的区别

  • wait()Object类下的方法,而sleep()Thread类下的方法
  • wait()会释放锁,而sleep()不会

await(),signal(),signalAll()

java.util.concurrent类库中提供了Condition类来实现线程之间的协调,可以在Condition上调用await()方法使线程等待,其它线程调用signal()signalAll()方法唤醒等待的线程。

相比于wait()这种等待方式,await()可以指定等待的条件,因此更加灵活。

使用Lock来获取一个Condition对象。

public class AwaitSignalExample {

    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void before() {
        lock.lock();
        try {
            System.out.println("before");
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void after() {
        lock.lock();
        try {
            condition.await();
            System.out.println("after");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    AwaitSignalExample example = new AwaitSignalExample();
    executorService.execute(() -> example.after());
    executorService.execute(() -> example.before());
}

此外还有 JUC 下的 AQS 队列同步器(第 10 题)。

10. JUC下的AQS

AQS,即AbstractQueuedSynchronizer,队列同步器,是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLockSemaphore,其他的诸如ReentrantReadWriteLockSynchronizerQueueFutureTask等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。其主要有以下几个组件:

  • ReentrantLock
  • Condition
  • CountDownLatch
  • CyclicBarrier
  • Semaphore
  • FutureTask

原理概念

AQS的核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。

AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改。

private volatile int state;//共享变量,使用volatile修饰保证线程可见性

状态信息通过protected类型的getState,setState,compareAndSetState进行操作。

//返回同步状态的当前值
protected final int getState() {  
        return state;
}
 // 设置同步状态的值
protected final void setState(int newState) { 
        state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update(如果当前同步状态的值等于expect(期望值))
protected final boolean compareAndSetState(int expect, int update) {
    return STATE.compareAndSet(this, expect, update);
}

AQS对资源的两种共享方式

  1. Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:
  • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
  • 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
  1. Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch

独占锁的实现

AQS 独占锁的实现

AQS自定义同步器

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。

AQS的底层使用了模版方法模式,同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):

  1. 使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放)
  2. AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:

isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

默认情况下,每个方法都抛出UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是final,所以无法被其他类使用,只有这几个方法可以被其他类使用。

ReentrantLock中,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock

CountDownLatch

用来控制一个或者多个线程等待多个线程。

CountDownLatch中,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,stateCAS(Compare and Swap)减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

内部维护了一个计数器cnt,每次调用countDown()方法会让计数器的值减1,减到0的时候,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务,那些之前调用await()方法而在等待的线程就会被唤醒。

CountDownLatch

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountdownLatchExample {
    public static void main(String[] args) {
        final int totalCount = 10;
        CountDownLatch countDownLatch = new CountDownLatch(totalCount);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < totalCount; i++) {
            executorService.execute(()->{
                System.out.println("run...");
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await(); //main线程会阻塞在这里
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("end");
        executorService.shutdown();
    }
}
run..run..run..run..run..run..run..run..run..run..end

CountDownLatch的三种典型用法

  1. 某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch 的计数器初始化为n(new CountDownLatch(n)),每当一个任务线程执行完毕,就将计数器减1(countdownlatch.countDown()),当计数器的值变为0时,在CountDownLatchawait()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
  2. 实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch对象,将其计数器初始化为 1(new CountDownLatch(1)) ,多个线程在开始执行任务前首先coundownlatch.await(),当主线程调用 countDown()时,计数器变为0,多个线程同时被唤醒。
  3. 死锁检测:一个非常方便的使用场景是,你可以使用n个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁。

CountDownLatch的不足

CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。

CyclicBarrier

用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。

CountdownLatch相似,都是通过维护计数器来实现的。线程执行await()方法之后计数器会减1,并进行等待,直到计数器为0,所有调用await()方法而在等待的线程才能继续执行。

CyclicBarrierCountdownLatch的一个区别是,CyclicBarrier的计数器通过调用reset()方法可以循环使用,所以它才叫做循环屏障。

CyclicBarrier有两个构造函数,其中parties指示计数器的初始值,barrierAction在所有线程都到达屏障的时候会执行一次。

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}

CyclicBarrier

public class CyclicBarrierExample {

    public static void main(String[] args) {
        final int totalThread = 10;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < totalThread; i++) {
            executorService.execute(() -> {
                System.out.print("before..");   //阻塞在这里等待totalThread变为0
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.print("after..");
            });
        }
        executorService.shutdown();
    }
}

CyclicBarrier和CountDownLatch的区别

  • CountDownLatch是计数器,只能使用一次,而CyclicBarrier计数器提供了reset方法,可以多次使用。
  • CountDownLatch的设计思想强调的是一个(多个)线程等待其他线程执行完成后才能执行,而CyclicBarrier则是强调要多个线程都到达某一个条件,才能一起执行后续步骤。

Semaphore

Semaphore类似于操作系统中的信号量,可以控制对互斥资源的访问线程数。

以下代码模拟了对某个服务的并发请求,每次只能有 3 个客户端同时访问,请求总数为 10。

public class SemaphoreExample {

    public static void main(String[] args) {
        final int clientCount = 3;
        final int totalRequestCount = 10;
        Semaphore semaphore = new Semaphore(clientCount);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < totalRequestCount; i++) {
            executorService.execute(()->{
                try {
                    semaphore.acquire(); //默认一次只用掉一个信号量,也可以参数传多个  
                    //检查是否有权限,没有就一直阻塞到有或者被中断
                    System.out.print(semaphore.availablePermits() + " ");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            });
        }
        executorService.shutdown();
    }
}

除了acquire方法之外,另一个比较常用的与之对应的方法是tryAcquire方法,该方法如果获取不到许可就立即返回false

Semaphore 有两种模式,公平模式和非公平模式。

  • 公平模式: 调用acquire的顺序就是获取许可证的顺序,遵循FIFO
  • 非公平模式: 抢占式的。

Semaphore对应的两个构造方法如下:

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

这两个构造方法,都必须提供许可的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式。

FutureTask

Callable接口它可以有返回值,返回值通过Future进行封装。FutureTask实现了RunnableFuture接口,该接口继承自RunnableFuture接口,这使得FutureTask既可以当做一个任务执行,也可以有返回值。

public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>

FutureTask适用于异步获取执行结果或取消执行任务的场景。当一个任务需要执行很久时,就可以使用FutureTask进行封装,主线程在完成自己的任务后再回来获取结果。

public class FutureTaskExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int result = 0;
                for (int i = 0; i < 100; i++) {
                    Thread.sleep(10);
                    result += i;
                }
                return result;
            }
        });

        Thread computeThread = new Thread(futureTask);
        computeThread.start();

        Thread otherThread = new Thread(() -> {
            System.out.println("other task is running...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        otherThread.start();
        System.out.println(futureTask.get());
    }
}

11. Java内存模型

Java 内存模型试图屏蔽各种硬件和操作系统的内存访问差异,以实现让 Java 程序在各种平台下都能达到一致的内存访问效果。

主内存和工作内存

处理器上的寄存器读写速度比内存快几个数量级,为了解决这种速度矛盾,在它们之间加入了高速缓存。

加入高速缓存带来了一个新的问题:缓存一致性。如果多个缓存共享同一块主内存区域,那么多个缓存的数据可能会不一致,需要一些协议来解决这个问题。
java内存模型

所有的变量都存储在主内存中,每个线程都有自己的工作内存,工作内存存储在高速缓存或者寄存器中,保存了该线程使用的变量的主内存副本拷贝。

线程只能操作自己工作内存中的变量,不同线程之间的变量值传递需要通过主内存来完成。
工作内存和主内存

12. 内存间交互操作

Java 内存模型定义了 8 个操作来完成主内存和工作内存的交互操作:
内存交互操作

  • read:把一个变量的值从主内存传输到工作内存中
  • load:在read之后执行,把read得到的值放入工作内存的变量副本中
  • use:把工作内存中一个变量的值传递给执行引擎
  • assign:把一个从执行引擎接收到的值赋给工作内存的变量
  • store:把工作内存的一个变量的值传送到主内存中
  • write:在store之后执行,把store得到的值放入主内存的变量中
  • lock:作用于主内存的变量
  • unlock

13. 内存模型的三大特性

原子性

Java内存模型保证了上面的交互操作都具有原子性。例如对一个int类型的变量执行assign赋值操作,这个操作就是原子性的。但是Java内存模型允许虚拟机将没有被volatile修饰的 64 位数据(longdouble)的读写操作划分为两次 32 位的操作来进行,即loadstorereadwrite 操作可以不具备原子性

注意,int等原子性的类型在多线程环境中虽然具有原子性,但是还是会出现线程安全问题。如下代码:最后输出结果有可能小于1000.

public class ThreadUnsafeExample {
    private int cnt = 0;

    public void add(){
        cnt++;
    }

    public int getCnt(){
        return this.cnt;
    }

    public static void main(String[] args) throws InterruptedException {
        final int threadSize = 1000;
        ThreadUnsafeExample threadUnsafeExample = new ThreadUnsafeExample();
        final CountDownLatch countDownLatch = new CountDownLatch(1000);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < threadSize; i++) {
            executorService.execute(()->{
                threadUnsafeExample.add();
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        System.out.println(threadUnsafeExample.getCnt());
    }
}

两个线程同时对cnt进行操作,load,assignstore等一系列操作整体上看不具有原子性,如下图所示,在T1线程修改cnt并且还没有将修改后的值写入主内存,T2依然可以读入旧值。可以看出,这两个线程虽然执行了两次自增操作,但是主内存中cnt的值最后为1而不是2。因此对int类型读写操作满足原子性只是说明load,assign,store这些单个操作具备原子性

多线程下单个操作原子性

可以使用AtomicInteger重写之前线程不安全的代码之后得到线程安全的实现:

//其他不变
private AtomicInteger cnt = new AtomicInteger();

public void add(){
    cnt.incrementAndGet();
}

public int getCnt(){
    return cnt.get();
}

也可以使用synchronized互斥锁来保证操作的原子性,它对应内存间交互操作为lockunlock,在虚拟机实现上对应的字节码指令为monitorentermonitorexit.

private int cnt = 0 ;
public synchronized void add(){
    cnt++;
}
public synchronized int getCnt(){
    return this.cnt;
}

可见性

可见性指当一个线程修改了共享变量的值,其它线程能够立即得知这个修改。Java内存模型是通过在变量修改后将新值同步回主内存,在变量读取前从主内存刷新变量值来实现可见性的。
主要有三种实现可见性的方式:

  • volatile关键字:当一个变量被volatile修饰后,表示着线程本地内存无效,当一个线程修改共享变量后他会立即被更新到主内存中,其他线程读取共享变量时,会直接从主内存中读取。
  • synchronized,对一个变量执行unlock操作之前,必须把变量值同步回主内存。
  • final:被final关键字修饰的字段在构造器中一旦初始化完成,并且没有发生this逃逸(其他线程通过this引用访问到初始化一半的对象),那么其他线程就能看见final字段的值。

    对上面的线程不安全实例的变量cnt使用volatile修饰并不能解决线程安全问题,因为volatile并不能保证操作的原子性,还是有可能发生两个线程获取到同一个主内存值的情况。

有序性

有序性指的是在本线程内观察,所有操作都是有序的。在一个线程观察另一个线程,所有的操作都是无序的,前半句是指“线程内表现为串行语义”,后半句是指“指令重排序”现象和“工作内存主主内存同步延迟”现象。无序是因为发生了指令重排序。在java内存模型中,允许编译器和处理器对指令进行重排序,重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性。
volatile关键字通过添加内存屏障的方式来禁止指令重排,即重排序时不能把后面的指令放到内存屏障之前。

也可以通过synchronized来保证有序性,它保证每个时刻只有一个线程执行同步代码,相当于是让线程顺序执行同步代码。典型的应用是双重检查锁实现的单例模式。

volatile关键字的主要作用一个是保证变量的可见性,另一个是防止指令重排序

14. 先行发生原则

volatilesynchronized可以保证有序性,除此之外,JVM还规定了先行发生原则,让一个操作无需控制就能先于另一个操作完成。

单一线程原则

Single Thread rule

在一个线程内,在程序前面的操作先行发生于后面的操作。
单一线程原则

管程锁定规则

Monitor Lock Rule

一个unlock操作先行发生于后面对同一个锁的lock操作
管程锁定规则

volatile变量规则

Volatile Variable Rule

对一个volatile变量的写操作先行发生于后面对这个变量的读操作。
volatile变量规则

线程启动规则

Thread Start Rule

Thread对象的start()方法调用先行发生于此线程的每一个动作。
线程启动规则

线程加入规则

Thread Join Rule

Thread对象的结束先行于发生于join()方法的返回
线程加入规则

线程中断规则

Thread Interruption Rule

对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通过interrupted()方法检测是否有发生中断。

对象终结规则

Finalizer Rule

一个对象的初始化完成(构造函数执行结束)先行发生于它的finalize()方法的开始。

传递性

Transitivity

如果操作A先行发生于操作B,操作B先行发生于操作C,那么操作A先行发生于操作C。

15. 线程安全

线程安全是指多个线程不管以何种方式访问某个类,并且在主调代码不需要进行同步,都能表现正确的行为。
线程安全有以下几种实现方式:

不可变

不可变的对象一定是线程安全的,不需要再采取任何的线程安全保障措施。只要一个不可变的对象被正确地构建出来,永远也不会看到它在多个线程之中处于不一致的状态。多线程环境下,应当尽量使对象成为不可变的,来满足线程安全。

不可变的类型:

  • final关键字修饰的基本数据类型
  • String
  • 枚举类型
  • Number部分子类,如Long,Double等数值包装类型,BigInteger,BigDecimal等大数据类型。但同为Number的原子类AtomicIntegerAtomicLong则是可变的。

对于集合类型,可以使用Collections.unmodifiableXXX()方法来获取一个不可变的集合。

public class ImmutableExample {
    public static void main(String[] args) {
        Map<String,Integer> map = new HashMap<>();
        Map<String,Integer> unmodifiableMap = Collections.unmodifiableMap(map);
        unmodifiableMap.put("a",1);//会报错
    }
}

Collections.unmodifiableXXX()先对原始的集合进行拷贝,需要对集合进行修改的方法都直接抛出异常:

public V put(K key, V value) {
    throw new UnsupportedOperationException();
}

互斥同步

synchronizedReentrantLock

非阻塞同步

互斥同步最主要的问题就是线程阻塞和唤醒所带来的性能问题,因此这种同步也称为阻塞同步。

互斥同步属于一种悲观的并发策略,总是认为只要不去做正确的同步措施,那就肯定会出现问题。无论共享数据是否真的会出现竞争,它都要进行加锁(这里讨论的是概念模型,实际上虚拟机会优化掉很大一部分不必要的加锁)、用户态核心态转换、维护锁计数器和检查是否有被阻塞的线程需要唤醒等操作。

随着硬件指令集的发展,我们可以使用基于冲突检测的乐观并发策略:先进行操作,如果没有其它线程争用共享数据,那操作就成功了,否则采取补偿措施(不断地重试,直到成功为止)。这种乐观的并发策略的许多实现都不需要将线程阻塞,因此这种同步操作称为非阻塞同步。

CAS

乐观锁需要操作和冲突检测这两个步骤具有原子性,这里就不能再使用互斥同步来保证了,只能靠硬件来完成。硬件支持的原子性操作最典型的是:比较并交换(Compare-and-Swap,CAS)。CAS指令需要3个操作数,分别是内存地址V,旧的预期值A和新值B。当执行操作时,只有当V的值等于A,才将V的值更新为B

AtomicInteger

详细使用见第20题

ABA

如果一个变量初次读取的时候是 A 值,它的值被改成了 B,后来又被改回为 A,那CAS操作就会误认为它从来没有被改变过。

J.U.C包提供了一个带有标记的原子引用类AtomicStampedReference来解决这个问题,它可以通过控制变量值的版本来保证CAS的正确性。大部分情况下ABA问题不会影响程序并发的正确性,如果需要解决ABA问题,改用传统的互斥同步可能会比原子类跟高效。

无同步方案

要保证线程安全,并不是一定要进行同步。如果一个方法本来就不涉及共享数据,那么它就无须任何同步措施去保证正确性。

栈封闭

多个线程访问同一个方法的局部变量时,不会出现线程安全问题,因为局部变量存储在虚拟机栈中,属于线程私有的

public class StackClosedExample {
    public void add100() {
        int cnt = 0;
        for(int i = 0; i < 100; i++) {
            cnt++;
        }
        System.out.println(cnt);
    }
}
public static void main(String[] args) {
    StackClosedExample example = new StackClosedExample();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> example.add100());  //100
    executorService.execute(() -> example.add100());  //100
    executorService.shutdown();
}

线程本地存储

如果一段代码中所需要的数据必须与其他代码共享,那就看看这些共享数据的代码是否能保证在同一个线程中执行。如果能保证,就可以把共享数据的可见范围限制在一个线程之内,这样,无需同步也能保证线程之间不出现数据争用的问题。

符合这种特点的应用并不少见,大部分使用消费队列的架构模式(如“生产者-消费者”模式)都会将产品的消费过程尽量在一个线程中消费完。其中最重要的一个应用实例就是经典 Web 交互模型中的“一个请求对应一个服务器线程”(Thread-per-Request)的处理方式,这种处理方式的广泛应用使得很多 Web 服务端应用都可以使用线程本地存储来解决线程安全问题。

可以使用java.lang.ThreadLocal类来实现线程本地存储功能。

以下代码中,thread1设置了threadLocal为1,而thread2设置threadLocal为2,过了一段时间之后,thread1读取threadLocal依然是1,不受thread2的影响。

public class ThreadLocalExample {
    public static void main(String[] args) {
        ThreadLocal threadLocal = new ThreadLocal();
        Thread thread1 = new Thread(() -> {
            threadLocal.set(1);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(threadLocal.get());   //输出1
            threadLocal.remove();
        });
        Thread thread2 = new Thread(() -> {
            threadLocal.set(2);
            threadLocal.remove();
        });
        thread1.start();
        thread2.start();
    }
}

关于ThreadLocal详细分析见后面19题

可重入代码(Reentrant Code)

这些代码也叫做纯代码(Pure Code),可以在代码执行的任何时刻中断它,转而去执行另外一段代码(包括递归调用它本身),而在控制权返回后,原来的程序不会出现任何错误。

可重入代码有一些共同的特征,例如不依赖存储在堆上的数据和公用的系统资源,用到的状态量都由参数中传入、不调用非可重入的方法等。

16. 锁优化

锁优化主要是指JVM对synchronized的优化。

自旋锁

互斥同步进入阻塞状态的开销都很大,应该尽量避免。在许多应用中,共享数据的锁定状态只会持续很短的一段时间。自旋锁的思想是让一个线程在请求一个共享数据的锁时执行忙循环(自旋)一段时间,如果在这段时间内能获得锁,就可以避免进入阻塞状态。

自旋锁虽然能避免进入阻塞状态从而减少开销,但是它需要进行忙循环操作占用 CPU 时间,它只适用于共享数据的锁定状态很短的场景。

在 JDK 1.6 中引入了自适应的自旋锁。自适应意味着自旋的次数不再固定了,而是由前一次在同一个锁上的自旋次数及锁的拥有者的状态来决定。

锁消除

锁消除是指对于被检测出不可能存在竞争的共享数据的锁进行消除。

锁消除主要是通过逃逸分析来支持,如果堆上的共享数据不可能逃逸出去被其它线程访问到,那么就可以把它们当成私有数据对待,也就可以将它们的锁进行消除。

对于一些看起来没有加锁的代码,其实隐式的加了很多锁。例如下面的字符串拼接代码就隐式加了锁:

public static String concatString(String s1, String s2, String s3) {
    return s1 + s2 + s3;
}

String是一个不可变的类,编译器会对String的拼接自动优化。在JDK1.5之前,会转化为StringBuffer对象的连续append()操作。

现在使用StringBuilder,线程不安全,效率高

每个append()方法中都有一个同步块。虚拟机观察变量sb,很快就会发现它的动态作用域被限制在concatString()方法内部。也就是说,sb的所有引用永远不会逃逸到concatString()方法之外,其他线程无法访问到它,因此可以进行消除。

锁粗化

如果一系列的连续操作都对同一个对象反复加锁和解锁,频繁的加锁操作就会导致性能损耗。

上一节的示例代码中连续的append()方法就属于这类情况。如果虚拟机探测到由这样的一串零碎的操作都对同一个对象加锁,将会把加锁的范围扩展(粗化)到整个操作序列的外部。对于上一节的示例代码就是扩展到第一个append()操作之前直至最后一个append()操作之后,这样只需要加锁一次就可以了。

轻量级锁

JDK 1.6 引入了偏向锁和轻量级锁,从而让锁拥有了四个状态:无锁状态(unlocked)、偏向锁状态(biasble)、轻量级锁状态(lightweight locked)和重量级锁状态(inflated)。

以下是 HotSpot 虚拟机对象头的内存布局,这些数据被称为Mark Word。其中tag bits对应了五个状态,这些状态在右侧的state表格中给出。

HotSpot虚拟机对象头的内存布局

MarkWord

下图左侧是一个线程的虚拟机栈,其中有一部分称为Lock Record的区域,这是在轻量级锁运行过程创建的,用于存放锁对象的Mark Word。而右侧就是一个锁对象,包含了Mark Word和其它信息。

虚拟机栈

轻量级锁是相对于传统的重量级锁而言,它使用CAS操作来避免重量级锁使用互斥量的开销。对于绝大部分的锁,在整个同步周期内都是不存在竞争的,因此也就不需要都使用互斥量进行同步,可以先采用CAS操作进行同步,如果CAS失败了再改用互斥量进行同步。

当尝试获取一个锁对象时,如果锁对象标记为 0 01,说明锁对象的锁未锁定(unlocked)状态。此时虚拟机在当前线程的虚拟机栈中创建 Lock Record,然后使用CAS操作将对象的Mark Word更新为Lock Record指针。如果CAS操作成功了,那么线程就获取了该对象上的锁,并且对象的Mark Word的锁标记变为 00,表示该对象处于轻量级锁状态。

如果CAS操作失败了,虚拟机首先会检查对象的Mark Word是否指向当前线程的虚拟机栈,如果是的话说明当前线程已经拥有了这个锁对象,那就可以直接进入同步块继续执行,否则说明这个锁对象已经被其他线程线程抢占了。如果有两条以上的线程争用同一个锁,那轻量级锁就不再有效,要膨胀为重量级锁。

偏向锁

偏向锁的思想是偏向于让第一个获取锁对象的线程,这个线程在之后获取该锁就不再需要进行同步操作,甚至连 CAS 操作也不再需要。

也就是说,偏向锁会偏向于第一个获得它的线程,如果在接下来的执行中,该锁没有被其他线程获取,那么持有偏向锁的线程就不需要进行同步。

当锁对象第一次被线程获得的时候,进入偏向状态,标记为 1 01。同时使用 CAS 操作将线程 ID 记录到 Mark Word 中,如果 CAS 操作成功,这个线程以后每次进入这个锁相关的同步块就不需要再进行任何同步操作。

当有另外一个线程去尝试获取这个锁对象时,偏向状态就宣告结束,此时撤销偏向(Revoke Bias)后恢复到未锁定状态或者轻量级锁状态(不会立即膨胀为重量级锁)。

锁膨胀顺序:偏向锁->轻量级锁->自旋锁->重量级锁

17. 多线层开发的良好实践

  • 给线程起个有意义的名字,这样可以方便找 Bug。

  • 缩小同步范围,从而减少锁争用。例如对于 synchronized,应该尽量使用同步块而不是同步方法。

  • 多用同步工具少用 wait() 和 notify()。首先,CountDownLatch, CyclicBarrier, Semaphore 和 Exchanger 这些同步类简化了编码操作,而用 wait() 和 notify() 很难实现复杂控制流;其次,这些同步类是由最好的企业编写和维护,在后续的 JDK 中还会不断优化和完善。

  • 使用 BlockingQueue 实现生产者消费者问题。

  • 多用并发集合少用同步集合,例如应该使用 ConcurrentHashMap 而不是 Hashtable。

  • 使用本地变量和不可变类来保证线程安全。

  • 使用线程池而不是直接创建线程,这是因为创建线程代价很高,线程池可以有效地利用有限的线程来启动任务

18. synchronized关键字和volatile关键字的区别

  • volatile关键字是线程同步的轻量级实现,所以volatile性能比synchronized性能要好。但是volatile关键字只能用于变量而synchronized可以修饰方法和代码块。实际开发中synchronized比较常见。
  • 多线程使用volatile关键字不会发生阻塞,而synchronized可能会发生阻塞。
  • volatile关键字保证数据的可见性,但是不能保证数据的原子性。synchronized关键字两者都能保证。
  • volatile关键字主要用于解决变量在多个线程之间的可见性,而synchronized关键字解决的是多个线程之间访问资源的同步性。

19. ThreadLocal

ThreadLocal类主要解决的就是让每个线程绑定自己的值,可以将ThreadLocal类形象的比喻成存放数据的盒子,盒子中可以存储每个线程的私有数据。

如果创建了一个ThreadLocal变量,那么访问这个变量的每个线程都会有这个变量的本地副本,这也是ThreadLocal变量名的由来。他们可以使用get()set()方法来获取默认值或将其值更改为当前线程所存的副本的值,从而避免了线程安全问题。

public class ThreadLocalExample1 {
    public static void main(String[] args) {
        ThreadLocal threadLocal1 = new ThreadLocal();
        ThreadLocal threadLocal2 = new ThreadLocal();
        Thread thread1 = new Thread(() -> {
            threadLocal1.set(1);
            threadLocal2.set(1);
        });
        Thread thread2 = new Thread(() -> {
            threadLocal1.set(2);
            threadLocal2.set(2);
        });
        thread1.start();
        thread2.start();
    }
}

以上代码对应的底层结构图为:
ThreadLocal结构图

每个Thread都有一个ThreadLocal.ThreadLocalMap对象

/* ThreadLocal values pertaining to this thread. This map is maintained
 * by the ThreadLocal class. */
 //与此线程有关的ThreadLocal值。由ThreadLocal类维护
ThreadLocal.ThreadLocalMap threadLocals = null;

//与此线程有关的InheritableThreadLocal值。由InheritableThreadLocal类维护
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

可以把ThreadLocalMap理解为ThreadLocal类实现的定制化HashMap。默认情况下这两个变量都是null,只有当当前线程调用ThreadLocal类的setget方法时才创建它们,实际上调用这两个方法的时候,调用的是ThreadLocalMap类对应的get(),set()方法。

当调用一个ThreadLocalset(T value)方法时,先得到当前线程的ThreadLocalMap对象,然后将ThreadLocal->value键值对插入到该Map中。

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        map.set(this, value);
    } else {
        createMap(t, value);
    }
}

get()方法类似:

public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}

每个Thread中都具备一个ThreadLocalMap,而ThreadLocalMap可以存储以ThreadLocalkey的键值对。ThreadLocalMapkey就是ThreadLocal对象,value就是ThreadLocal对象调用set方法设置的值

ThreadLocal从理论上讲并不是用来解决多线程并发问题的,因为根本不存在多线程竞争。

ThreadLocal 应用场景

  1. 比如线程中处理一个非常复杂的业务,可能方法有很多,那么,使用 ThreadLocal 可以代替一些参数的显式传递;
  2. 比如用来存储用户 Session。Session 的特性很适合 ThreadLocal ,因为 Session 之前当前会话周期内有效,会话结束便销毁。
  3. 在一些多线程的情况下,如果用线程同步的方式,当并发比较高的时候会影响性能,可以改为 ThreadLocal 的方式,例如高性能序列化框架 Kyro 就要用 ThreadLocal 来保证高性能和线程安全;
  4. 还有像线程内上线文管理器、数据库连接等可以用到 ThreadLocal;

在一些场景 (尤其是使用线程池) 下,由于ThreadLocal.ThreadLocalMap的底层数据结构导致ThreadLocal有内存泄漏的情况,应该尽可能在每次使用ThreadLocal后手动调用remove(),以避免出现ThreadLocal经典的内存泄漏甚至是造成自身业务混乱的风险。

ThreadLocal内存泄漏问题

ThreadLocalMap中使用的keyThreadLocal的弱引用,所以,如果ThreadLocal没有被外部强引用的情况下,在垃圾回收的时候key会被清理掉,而value不会被清理掉。这样依赖,ThreadLocalMap中就会出现keynullEntry。如果不采取任何措施的话,value永远无法被GC回收,这个时候可能会产生内存泄漏。ThreadLocalMap实现中考虑到了这种情况,在调用set(),get(),remove()方法的时候,会清理掉keynull的记录。在使用完ThreadLocal方法之后,最好手动使用remove()方法。原因:

  • 使用staticThreadLocal,延长了ThreadLocal生命周期,可能导致内存泄漏
  • 分配使用了ThreadLocal又不调用set(),get(),remove()方法,那么就会导致内存泄漏。

弱引用:如果一个对象只具有弱引用,那就类似于可有可无的生活用品。弱引用与软引用的区别在于:只具有弱引用的对象拥有更短暂的生命周期。在垃圾回收器线程扫描它所管辖的内存区域的过程中,一旦发现了只具有弱引用的对象,不管当前内存空间足够与否,都会回收它的内存。不过,由于垃圾回收器是一个优先级很低的线程, 因此不一定会很快发现那些只具有弱引用的对象。

弱引用可以和一个引用队列(ReferenceQueue)联合使用,如果弱引用所引用的对象被垃圾回收,Java虚拟机就会把这个弱引用加入到与之关联的引用队列中。

为什么使用弱引用?

分析两种情况:

  • key使用强引用:引用的ThreadLocal的对象被回收了,但是ThreadLocalMap还持有ThreadLocal的强引用,如果没有手动删除,ThreadLocal不会被回收,导致Entry内存泄漏。
  • key使用弱引用:引用的ThreadLocal的对象被回收了,由于ThreadLocalMap持有ThreadLocal的弱引用,即使没有手动删除,ThreadLocal也会被回收。value在下一次ThreadLocalMap调用set,getremove的时候会被清除。

比较两种情况,可以发现:由于ThreadLocalMap的生命周期跟Thread一样长,如果都没有手动删除对应key,都会导致内存泄漏,但是使用弱引用可以多一层保障:弱引用ThreadLocal不会内存泄漏,对应的value在下一次ThreadLocalMap调用set,get,remove的时候会被清除。

因此,ThreadLocal内存泄漏的根源是:由于ThreadLocalMap的生命周期跟Thread一样长,如果没有手动删除对应keyvalue就会导致内存泄漏,而不是因为弱引用。

20. JUC下Atomic原子类

Atomic是指一个操作是不可中断的,即使是在多个线程一起执行任的时候,一个操作一旦开始,就不会被其他线程干扰。

并发包java.util.concurrent的原子类都存放在java.util.concurrent.atomic下,主要包括以下4类:

基本类型

  • AtomicInteger:整形原子类
  • AtomicLong:长整型原子类
  • AtomicBoolean:布尔型原子类

数组类型

  • AtomicIntegerArray:整形数组原子类
  • AtomicLongArray:长整型数组原子类
  • AtomicReferenceArray:引用类型数组原子类

引用类型

  • AtomicReference:引用类型原子类
  • AtomicStampedReference:原子更新引用类型里的字段原子类
  • AtomicMarkableReference:原子更新带有标记位的引用类型

对象的属性修改类型

  • AtomicIntegerFieldUpdater:原子更新整形字段的更新器
  • AtomicLongFieldUpdater:原子更新长整型字段的更新器
  • AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用CAS进行原子更新可能出现的ABA问题。

AtomicInteger的使用

JUC包里的整数原子类AtomicInteger的方法调用了Unsafe类中的CAS操作。

常用方法:

public final int get() //获取当前的值
public final int getAndSet(int newValue)//获取当前的值,并设置新的值
public final int getAndIncrement()//获取当前的值,并自增
public final int getAndDecrement() //获取当前的值,并自减
public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
public final void lazySet(int newValue)//最终设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。

以下代码使用了AtomicInteger执行了自增的操作:

private AtomicInteger cnt = new AtomicInteger();
public void add(){
    cnt.incrementAndGet();
}

原理

下面是incrementAndGet源码,它调用了UnsafegetAndAddInt()方法

private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");
private volatile int value;

public final int incrementAndGet() {
    return U.getAndAddInt(this, VALUE, 1) + 1;
}

getAndAddInt方法的源码如下:

offset指示该字段相对对象内存地址的偏移,delta指示操作需要加的数值,这里为1。通过getIntVolatile方法得到旧的预期值,通过调用weakCompareAndSetInt来进行CAS比较,如果该字段内存地址中的值等于v,那么就更新内存地址o+offset的值为v+delta

在发生冲突时,其做法是不断的进行重试。

public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!weakCompareAndSetInt(o, offset, v, v + delta));
    return v;
}

其中CAS比较的源码如下:

@HotSpotIntrinsicCandidate
public final boolean weakCompareAndSetInt(Object o, long offset,
                                          int expected,
                                          int x) {
    return compareAndSetInt(o, offset, expected, x);
}

@HotSpotIntrinsicCandidate
public final native boolean compareAndSetInt(Object o, long offset,
                                             int expected,
                                             int x);

综上,AtomicInteger主要利用 CAS+volatile+native 方法来保证原子操作,从而避免synchronized的高开销。

CAS的原理是拿期望的值和原本的一个值作比较,如果相同则更新成新的值。UnSafe类的objectFieldOffset()方法是一个本地方法,这个方法是用来拿到“原来的值”的内存地址,返回值是valueOffset。另外value是一个volatile变量,在内存中可见,因此 JVM 可以保证任何时刻任何线程总能拿到该变量的最新值。

数组类型原子类

AtomicIntegerArray为例

AtomicIntegerArray类常用方法

public final int get(int i) //获取 index=i 位置元素的值
public final int getAndSet(int i, int newValue)//返回 index=i 位置的当前的值,并将其设置为新值:newValue
public final int getAndIncrement(int i)//获取 index=i 位置元素的值,并让该位置的元素自增
public final int getAndDecrement(int i) //获取 index=i 位置元素的值,并让该位置的元素自减
public final int getAndAdd(int i,int delta) //获取 index=i 位置元素的值,并加上预期的值
boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将 index=i 位置的元素值设置为输入值(update)
public final void lazySet(int i, int newValue)//最终 将index=i 位置的元素设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。

常见方法使用

import java.util.concurrent.atomic.AtomicIntegerArray;

public class AtomicIntegerArrayTest {
    public static void main(String[] args) {
        int temvalue = 0;
        int[] nums = {0,1,2,3,4,5,6,7,8,9};
        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(nums);
        for (int i = 0; i < 10; i++) {
            System.out.println(atomicIntegerArray.get(i));
        }
        temvalue = atomicIntegerArray.getAndSet(0,2);
        System.out.println("temvalue: "+temvalue+";atomicIntegerArray: "+atomicIntegerArray);
        temvalue = atomicIntegerArray.getAndIncrement(0);
        System.out.println("temvalue: "+temvalue+";atomicIntegerArray:"+atomicIntegerArray);
        temvalue = atomicIntegerArray.getAndAdd(0,5);
        System.out.println("temvalue: "+temvalue+";atomicIntegerArray:"+atomicIntegerArray);
    }
}

引用类型原子类

基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用引用类型原子类。

AtomicReference使用示例

import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceTest {
    public static void main(String[] args) {
        AtomicReference<Person> ar = new AtomicReference<>();
        Person person = new Person("xm",18);
        ar.set(person);
        Person updatePerson = new Person("xxm",16);
        ar.compareAndSet(person,updatePerson);
        System.out.println(ar.get().getName());
        System.out.println(ar.get().getAge());
    }
}

class Person {

    private String name;
    private int age;

    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}

上述代码首先创建了一个Person对象,然后把Person对象设置进AtomicReference对象中,然后调用compareAndSet方法,该方法就是通过通过CAS操作设置ar。如果ar的值为person的话,则将其设置为updatePerson。实现原理与AtomicInteger类中的compareAndSet方法相同。

AtomicStampedReference类使用示例

import java.util.concurrent.atomic.AtomicStampedReference;

public class AtomicStampedReferenceDemo {

    public static void main(String[] args) {
    
        //实例化,获取当前引用值和版本戳的值
        final Integer initialRef = 0,initialStamp = 0;
        final AtomicStampedReference<Integer> asr = new AtomicStampedReference<>(initialRef,initialStamp);
        System.out.println("currentValue=" +asr.getReference() + ",currentStamp="+asr.getStamp());

        //compare and set
        final Integer newReference = 666,newStamp = 999;
        //比较当前引用和版本戳是否和预期相同,相同就替换新的值
        final boolean casResult = asr.compareAndSet(initialRef, newReference, initialStamp, newStamp);
        System.out.println("currentValue=" + asr.getReference()
                + ", currentStamp=" + asr.getStamp()
                + ", casResult=" + casResult);

        //获取当前的引用值和版本戳值
        int[] arr = new int[1];
        final Integer currentValue = asr.get(arr);
        final int currentStamp = arr[0];
        System.out.println("currentValue="+currentValue+",currentStamp="+currentStamp);

        //单独设置版本戳的值
        final boolean attemptStampResult = asr.attemptStamp(newStamp,88);//期望值以及希望修改的新值
        System.out.println("currentValue=" + asr.getReference()
                + ", currentStamp=" + asr.getStamp()
                + ", attemptStampResult=" + attemptStampResult);

        //重新设置当前值和版本戳值
        asr.set(initialRef,initialStamp);
        System.out.println("currentValue="+asr.getReference()+"currentStamp="+asr.getStamp());
    }
}

AtomicMarkableReference使用示例

import java.util.concurrent.atomic.AtomicMarkableReference;

public class AtomicMarkableReferenceDemo {

    public static void main(String[] args) {

        //实例化,设置当前值和标记值(布尔类型)
        final Boolean initialRef = null,initialMark = false;
        final AtomicMarkableReference<Boolean> amr = new AtomicMarkableReference<>(initialRef,initialMark);
        System.out.println("currentValue=" + amr.getReference() + ",currentMark=" + amr.isMarked());

        // compare and set
        final Boolean newReference1 = true, newMark1 = true;
        final boolean casResult = amr.compareAndSet(initialRef, newReference1, initialMark, newMark1);
        System.out.println("currentValue=" + amr.getReference()
                + ", currentMark=" + amr.isMarked()
                + ", casResult=" + casResult);

        // 获取当前的值和当前的 mark 值
        boolean[] arr = new boolean[1];
        final Boolean currentValue = amr.get(arr);
        final boolean currentMark = arr[0];
        System.out.println("currentValue=" + currentValue + ", currentMark=" + currentMark);

        // 单独设置 mark 值
        final boolean attemptMarkResult = amr.attemptMark(newReference1, false);
        System.out.println("currentValue=" + amr.getReference()
                + ", currentMark=" + amr.isMarked()
                + ", attemptMarkResult=" + attemptMarkResult);

        // 重新设置当前值和 mark 值
        amr.set(initialRef, initialMark);
        System.out.println("currentValue=" + amr.getReference() + ", currentMark=" + amr.isMarked());

    }
}

对象的属性修改类型原子类

要想原子地更新对象的属性需要两步。第一步,因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。第二步,更新的对象属性必须使用public volatile修饰符。

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class AtomicIntegerFieldUpdaterTest {
	public static void main(String[] args) {
		AtomicIntegerFieldUpdater<User> a = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");

		User user = new User("Java", 22);
		System.out.println(a.getAndIncrement(user));// 22
		System.out.println(a.get(user));// 23
	}
}

class User {
	private String name;
	public volatile int age;

	public User(String name, int age) {
		super();
		this.name = name;
		this.age = age;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public int getAge() {
		return age;
	}

	public void setAge(int age) {
		this.age = age;
	}

}

21. 简要描述线程与进程的关系,区别以及优缺点

java运行时内存区域

从上图可以看出,一个进程中可以有多个线程,多个线程共享进程的方法区资源(JDK1.8之后使用元空间),但是每个线程有自己的程序计数器,虚拟机栈和本地方法栈

堆和方法区是所有线程共享的资源,其中堆是进程中最大的一块内存,主要用于存放新创建的对象 (所有对象都在这里分配内存),方法区主要用于存放已被加载的类信息、常量、静态变量、即时编译器编译后的代码等数据。

线程是进程划分成更小的运行单位。线程和进程最大的不同在于基本上各个进程都是独立的,而线程则不一定。同一进程中的不同线程极有可能相互影响。线程执行开销小,但不利于资源的管理和保护;进程则相反。

22. 并发与并行的区别

  • 并发:同一时间段,多个任务都在执行(单位时间内不一定同时执行)
  • 并行:单位时间内,多个任务同时执行

23. 说说sleep()方法和wait()方法的区别和共同点?

  • 两者最主要的区别在于:sleep方法没有释放锁,wait()方法释放了锁。
  • 两者都可用用于暂停线程的执行
  • wait()通常被用于线程间交互/通信sleep通常被用于暂行执行
  • wait()方法被调用后,线程不会自动苏醒,需要别的线程调用同一个对象上的notify()或者notifyAll()方法。sleep()方法执行完成后,线程会自动苏醒。或者可以使用wait(long timeout)超时后线程自动苏醒。

24. 调用start()方法时会执行run()方法,为什么不能直接调用run方法?

new一个Thread,线程进入了新建状态,然后调用start()方法,会启动一个线程并使线程进入了就绪状态,当分配到时间片后就可以开始运行了。start()会执行线程的相应准备工作,然后自动去执行run()方法的内容,这时真正的多线程工作。而直接执行run()方法,会把run方法当成一个main()线程下的普通方法执行,并不会在某个线程中执行它,所以这并不是多线程工作。

也就是说,调用start()方法可启动线程并使线程进入就绪状态,而run方法只是thread的一个普通方法调用,还是在主线程里执行。

25. ConcurrentSkipListMap

ConcurrentSkipListMap是并发容器的一种,其运用了跳表的数据结构。

跳表

对于一个单链表,即使链表是有序的,如果我们想要在其中查找某个数据,也只能从头到尾遍历链表,这样效率自然就会很低,跳表就不一样了。跳表是一种可以用来快速查找的数据结构,有点类似于平衡树。它们都可以对元素进行快速的查找。但一个重要的区别是:对平衡树的插入和删除往往很可能导致平衡树进行一次全局的调整。而对跳表的插入和删除只需要对整个数据结构的局部进行操作即可。这样带来的好处是:在高并发的情况下,你会需要一个全局锁来保证整个平衡树的线程安全。而对于跳表,你只需要部分锁即可。这样,在高并发环境下,你就可以拥有更好的性能。而就查询的性能而言,跳表的时间复杂度也是O(logn),所以在并发数据结构中,JDK 使用跳表来实现一个Map

跳表的本质是同时维护了多个链表,并且链表是分层的,最低层的链表维护了跳表内所有的元素,每上面一层链表都是下面一层的子集。

跳表结构

跳表内的所有链表的元素都是排序的。查找时,可以从顶级链表开始找。一旦发现被查找的元素大于当前链表中的取值,就会转入下一层链表继续找。这也就是说在查找过程中,搜索是跳跃式的。如上图所示,在跳表中查找元素18。

跳表遍历

查找18的时候原来需要遍历18次,现在只需要7次即可。针对链表长度比较大的时候,构建索引查找效率的提升就会非常明显。

从上面很容易看出,跳表是一种利用空间换时间的算法。

使用跳表实现Map和使用哈希算法实现Map的另外一个不同之处是:哈希并不会保存元素的顺序,而跳表内所有的元素都是排序的。因此在对跳表进行遍历时,你会得到一个有序的结果。所以,如果你的应用需要有序性,那么跳表就是你不二的选择。JDK中实现这一数据结构的类是ConcurrentSkipListMap

26. 什么是乐观锁和悲观锁,使用场景分别是什么?

乐观锁:总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号机制CAS算法实现。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库提供的类似于write_condition机制,其实都是提供的乐观锁。在Java中java.util.concurrent.atomic包下面的原子变量类就是使用了乐观锁的一种实现方式CAS实现的。

悲观锁:总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程)。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。Java中synchronizedReentrantLock等独占锁就是悲观锁思想的实现。

两种锁的使用场景

从上面对两种锁的介绍,我们知道两种锁各有优缺点,不可认为一种好于另一种,像乐观锁适用于写比较少的情况下(多读场景),即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。但如果是多写的情况,一般会经常产生冲突,这就会导致上层应用会不断的进行retry,这样反倒是降低了性能,所以一般多写的场景下用悲观锁就比较合适

27. 乐观锁常见的两种实现方式是什么?

乐观锁常见的两种实现方式是版本号机制CAS算法

版本号机制

一般是在数据表中加上一个数据版本号version字段,表示数据被修改的次数,当数据被修改时,version值会加一。当线程A要更新数据值时,在读取数据的同时也会读取version值,在提交更新时,若刚才读取到的version值为当前数据库中的version值相等时才更新,否则重试更新操作,直到更新成功。

CAS算法

compare and swap(比较与交换),是一种有名的无锁算法。无锁编程,即不使用锁的情况下实现多线程之间的变量同步,也就是在没有线程被阻塞的情况下实现变量的同步,所以也叫非阻塞同步(Non-blocking Synchronization)。CAS算法涉及到三个操作数

  • 需要读写的内存值 V
  • 进行比较的值 A
  • 拟写入的新值 B

当且仅当 V 的值等于 A 时,CAS 通过原子方式用新值 B 来更新 V 的值,否则不会执行任何操作(比较和替换是一个原子操作)。一般情况下是一个自旋操作,即不断的重试。

28. 乐观锁的缺点

乐观锁主要有以下几个问题:

  • ABA问题
  • 循环时间开销大
  • 只能保证一个共享变量的原子操作

ABA问题

如果一个变量V初次读取的时候是A值,并且在准备赋值的时候检查到它仍然是A值,那就能说明它的值没有被其他线程修改过了吗?很明显是不能的,因为在这段时间它的值可能被改为其他值,然后又改回A,那CAS操作就会误认为它从来没有被修改过。这个问题被称为CAS操作的 “ABA” 问题。

JDK 1.5 以后的AtomicStampedReference类就提供了检测ABA问题的能力,其中的compareAndSet方法就是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值

循环时间开销

自旋CAS(也就是不成功就一直循环执行直到成功)如果长时间不成功,会给CPU带来非常大的执行开销。 如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。

只能保证一个共享变量的原子操作

CAS只对单个共享变量有效,当操作涉及跨多个共享变量时CAS无效。但是从JDK 1.5开始,提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。所以我们可以使用锁或者利用AtomicReference类把多个共享变量合并成一个共享变量来操作。

29. CAS和Synchronized的使用场景

简单的来说CAS适用于写比较少的情况下(多读场景,冲突一般较少),synchronized适用于写比较多的情况下(多写场景,冲突一般较多)

对于资源竞争较少(线程冲突较轻)的情况,使用synchronized同步锁进行线程阻塞和唤醒切换以及用户态内核态间的切换操作额外浪费消耗cpu资源;而CAS基于硬件实现,不需要进入内核,不需要切换线程,操作自旋几率较少,因此可以获得更高的性能。

对于资源竞争严重(线程冲突严重)的情况,CAS自旋的概率会比较大,从而浪费更多的CPU资源,效率低于synchronized

补充: Java并发编程这个领域中synchronized关键字一直都是元老级的角色,很久之前很多人都会称它为 “重量级锁” 。但是,在JavaSE 1.6之后进行了主要包括为了减少获得锁和释放锁带来的性能消耗而引入的偏向锁轻量级锁以及其它各种优化之后变得在某些情况下并不是那么重了。synchronized的底层实现主要依靠Lock-Free的队列,基本思路是自旋后阻塞,竞争切换后继续竞争锁,稍微牺牲了公平性,但获得了高吞吐量。在线程冲突较少的情况下,可以获得和CAS类似的性能;而线程冲突严重的情况下,性能远高于CAS。

30. 如何正确的停止一个线程?/为什么不推荐使用 stop 和 suspend 方法?

初始的Java版本中定义了一个 stop 方法来终止一个线程,还定义了一个 suspend 方法来阻塞一个线程,直到另一个线程调用 resume 方法。在1.2之后就被弃用了。

反对使用 stop(),是因为它不安全。它会解除由线程获取的所有锁定,而且如果对象处于一种不连贯状态,那么其他线程能在那种状态下检查和修改它们。结果很难检查出真正的问题所在。

suspend() 方法容易发生死锁。调用 suspend() 的时候,目标线程会停下来,但却仍然持有在这之前获得的锁定。此时,其他任何线程都不能访问锁定的资源,除非被”挂起”的线程恢复运行。对任何线程来说,如果它们想恢复目标线程,同时又试图使用任何一个锁定的资源,就会造成死锁。所以不应该使用 suspend()

要正确停止一个线程,应该使用 interrupt() 方法。在自己的 Thread 类中置入一个标志,指出线程应该活动还是挂起。若标志指出线程应该挂起,便用 wait() 命其进入等待状态。若标志指出线程应当恢复,则用一个 notify() 重新启动线程。interrupt() 其本身并不是一个强制打断线程的方法,其仅仅会修改线程的 interrupt 标志位,然后让线程自行去读标志位,自行判断是否需要中断。

interrupt()是一个“很软”的操作,也就是提醒线程应该结束了,至于如何结束,什么时候结束,以及是否需要结束,都是由线程自行处理。所以,interrupt()的使用,会让开发做更多的事,但这是有必要的,因为只有线程自己,才知道如何合适的结束自己。

31. 使用内部类实现线程设计4个线程,其中两个线程每次对j增加1,另外两个线程对j每次减少1。

package com.xm.thread;

public class MultiThread {
    private int j;

    public static void main(String[] args) {
        MultiThread multiThread = new MultiThread();
        Inc inc = multiThread.new Inc();
        Dec dec = multiThread.new Dec();

        for (int i = 0; i < 2; i++) {
            Thread thread = new Thread(inc);
            thread.start();
            thread = new Thread(dec);
            thread.start();
        }
    }


    class Inc implements Runnable{

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                j++;
                System.out.println(Thread.currentThread().getName() + "-inc:" +j);
            }
        }
    }

    class Dec implements Runnable{
        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                j--;
                System.out.println(Thread.currentThread().getName() + "-dec:" +j);
            }
        }
    }
}

32. Java多线程回调的理解?

所谓回调,是指客户程序C调用服务程序S中的某个方法A,然后S又在某个时候反过来调用C中的某个方法B,对于C来说,这个B便叫做回调方法。

实例:设置一个提问者(客户程序C),一个回答者(服务程序S),而回答者需要回答提问者一个很深奥的问题时,这时需要很多时间去查找,提问者又开始做其他的事情,等回答者找到答案后,再把答案告诉提问者。

回答者的类

package com.xm.thread;

public class Answer extends Thread {

    //回答1+1,很简单不需要线程
    public int answerAdd(int num1,int num2) {
        return num1 + num2;
    }

    //重写run方法
    @Override
    public void run() {
        //回答地球为什么是圆的
        askQuestion();
        super.run();
    }

    //设置回调接口,这个接口中有回调方法,在提问者中实现
    public interface CallPhone {
        void call(String question);
    }
    //回调接口的对象
    CallPhone callPhone;

    //回答地球为什么是圆的,比较费时间,使用sleep
    private void askQuestion() {
        System.out.println("开始查找资料...");
        try {
            //花了3天
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //把答案返回到回调接口的call方法中
        if(callPhone != null) {
            //提问者实例化callPhone对象,相当于提问者已经告诉回答者用什么方式回答答案
            //该接口方法的实现在提问者的类里面
            callPhone.call("回答者知道答案了!!!");
        }
    }
}

提问者的类

package com.xm.thread;

public class Asker {

    /**
     * java回调方法的使用:
     * 1. 在回答者的类内部创建回调的接口
     * 2. 在回答者的类内部创建回调接口的对象
     * 3. 在提问者类里面实例化接口对象,重写接口方法
     * 2-3 这个点很重要,回调对象的实例化,要在提问者的类内实例化,然后重写接口的方法
     * 相当于提问者先把一个联络方式给回答者,回答者找到答案后,通过固定的联络方式,来告诉提问者答案
     * 4. 调用开始新线程的start方法
     * 5. 原来的提问者还可以做自己的事
     */
    public static void main(String[] args) {

        //问1+1,线程同步
        Answer answer = new Answer();
        int add = answer.answerAdd(1, 1);//回答1+1答案

        //问地球为什么是圆的,回调方法的使用
        //相当于先定好一个返回答案的方式,再来执行实际操作

        //实例化回调接口的对象
        Answer.CallPhone callPhone = question -> {
            //回答者回答问题后才能输出答案
            System.out.println(question);
        };
        answer.callPhone = callPhone;
        System.out.println("交代完毕!");

        //相关交代完毕后再执行查询操作
        answer.start();

        System.out.println("提问者做自己的事");
    }
}

设计模式中模版方法模式也是使用了回调

33. 非公平锁和公平锁在 ReentrantLock 里的实现是怎么样的?

ReentrantLock UML图

ReentrantLock 属于 AQS 的一个子类,AQS 依赖于内部的 FIFO 同步队列(CLH)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成一个 Node 对象并将其加入到同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

ReentrantLock 内部有一个抽象类 Sync,继承了 AQS。而公平锁的实现就是 FairSync,非公平锁的实现就是 NodFairSync

两把锁的区别在于 lock 方法的实现。

公平锁 lock方法实现

public void lock() {
    sync.acquire(1);
}

调用的是 AQS 的 acquire方法,而 AQS 会回调子类的 tryAcquire方法。

公平锁的情况是通过在构造方法中指定使用公平锁,此时内部类的 sync 为子类 FairSync

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

FairSynctryAcquire 的方法实现如下:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    //获取state变量,如果是0,说明锁可以获取
    int c = getState();
    if (c == 0) {
        //判断AQS队列中是否有等待的线程,如果没有,就是用CAS尝试获取。获取成功后,将CLH的持有线程改成当前线程
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //重入锁逻辑
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)  //overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

上面判断AQS队列中是否有等待的线程便是公平的体现。

非公平锁的实现

ReentrantLock 默认采用的是非公平锁,除非在构造方法中传入参数 true

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //直接就是用CAS进行操作获取锁
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

ReentrantLock 中的公平锁和非公平锁的区别就在于:

调用 lock 方法获取锁的时候要不要判断 AQS 队列中是否有等待的线程,公平锁为了让每一个线程都均衡的使用锁,就需要判断,如果有,让给他,非公平锁很霸道,不让不让就不让。

但如果失败了,进入队列了,进会按照 AQS 的逻辑来,整体顺序就是公平的。

还有个注意的地方就是:ReentrantLock 的 tryLock(无超时机制) 方法使用的非公平策略。符合他的设计。
tryLock(long timeout, TimeUnit unit) 方法则会根据 Sync 的具体实现来调用。不会直接的调用 nonfairTryAcquire 方法。

34. LongAdder 和 AtomicLong 的区别?

  • AutomaticLong的底层是通过 CAS(compareAndSwap) 来实现线程的同步,是在一个死循环内不断的尝试修改目标的值,直到修改成功。如果在竞争不激烈的情况下,它修改成功的概率很高,否则的话修改失败的概率就会很高, 在大量修改失败的时候这些原子操作就会多次循环尝试, 因此性能就会受到影响。对于普通类型的 longdubble 变量 JVM 允许将64位的读或者写操作拆分成2个32位的读或者写操作(使用 volatile 修饰)

  • LongAdder 的内部实现思想是: 将热点数据分离,将 AutomaticLong 的内部核心数据 value 分割成一个数组,每个线程访问时,通过 Hash 等算法映射到其中一个数字进行计数,最终的计数结果为这个数组的求和累加,其中热点数据 value 它会被分割成多个单元的 cell,每个 cell 独立维护内部的值,当前对象的实际值由所有的 cell 累计合成,这样热点就进行了有效的分离,提高了并行度。即 LongAdder 是在 AtomicLong 的基础上将单点的更新压力分散到各个节点上。在低并发的时候通过对 base 的值直接更新,可以很好地保证和 AutomaticLong 性能基本一致,而在高并发的时候则通过分散提高了性能。

  • LongAdder 缺点:在统计的时候如果有并发更新,可能会导致统计的数据有些误差。在实际处理高并发中,我们根据实际业务场景优先考虑使用 LongAdder 而不是继续使用 AtomicLong,当然在线程竞争很低的情况下 AutomaticLong 才是最佳选择(序列号生成等要求准确且全局唯一)。

35. volatile底层实现

关键字 volatile 主要有两个作用:

  • 内存可见性:当一个变量被 volatile 修饰后,表示着线程本地内存无效,当一个线程修改共享变量后它会立即被更新到主内存中,其他线程读取共享变量时,会直接从主内存中读取(不能保证数据的原子性)。
  • 禁止指令重排序:通过添加内存屏障的方式来禁止指令重排,即重排序时不能把后面的指令放到内存屏障之前。

其实现主要涉及到两个 CPU 术语:

  • 内存屏障(memory barriers):一组处理器指令,用于实现对内存操作的顺序限制。
  • 缓存行(cache line):CPU高速缓存中可以分配的最小存储单位。处理器填写缓存行时会加载整个缓存行。

可见性实现原理

通过打印 volatile 汇编指令如下:

volatile底层汇编实现

加入 volatile 关键字和没有加入 volatile 关键字时所生成的汇编代码对比发现,加入 volatile 关键字时,会多出一个 lock 前缀指令。volatile 变量在字节码级别没有任何区别,在汇编级别使用了 lock 指令前缀。

lock 指令在多核处理器下会引发下面的事件:
将当前处理器的缓存行的数据写回到系统内存,同时使其他 CPU 里缓存了该内存地址的数据置为无效
为了提高处理速度,处理器一般不直接和内存通信,而是先将系统内存的数据读到内部缓存后再进行操作,但操作完成后并不知道处理器何时将缓存数据写回到内存。但如果对加了 volatile 修饰的变量进行写操作,JVM 就会向处理器发送一条 lock 前缀的指令,将这个变量在缓存行的数据写回到系统内存。这时只是写回到系统内存,但其他处理器的缓存行中的数据还是旧的,要使其他处理器缓存行的数据也是新写回的系统内存的数据,就需要实现缓存一致性协议。即在一个处理器将自己缓存行的数据写回到系统内存后,其他的每个处理器就会通过嗅探在总线上传播的数据来检查自己缓存的数据是否已过期,当处理器发现自己缓存行对应的内存地址的数据被修改后,就会将自己缓存行缓存的数据设置为无效,当处理器要对这个数据进行修改操作的时候,会重新从系统内存中把数据读取到自己的缓存行,重新缓存。

总结:volatile 可见性的实现就是借助了 CPU 的 lock 指令,通过在写 volatile 的机器指令前加上 lock 前缀,使写 volatile 具有以下两个原则:

  • 写 volatile 时处理器会将缓存写回到主内存。
  • 一个处理器的缓存写回到内存会导致其他处理器的缓存失效。

防止指令重排实现原理

lock 指令后就是一个原子操作。原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (上下文切换,切换到另一个线程)。

当使用 LOCK 指令前缀时,它会使 CPU 宣告一个 LOCK# 信号,这样就能确保在多处理器系统或多线程竞争的环境下互斥地使用这个内存地址。当指令执行完毕,这个锁定动作也就会消失。

lock 前缀指令其实就相当于一个内存屏障。内存屏障是一组CPU处理指令,用来实现对内存操作的顺序限制。volatile的底层就是通过内存屏障来实现的。

硬件层的内存屏障分为两种:Load BarrierStore Barrier,即读屏障和写屏障。

内存屏障有两个作用:

  • 阻止屏障两侧的指令重排序;
  • 强制把写缓冲区/高速缓存中的脏数据等写回主内存,让缓存中相应的数据失效。

对于 Load Barrier 来说,在指令前插入 Load Barrier,可以让高速缓存中的数据失效,强制从新从主内存加载数据;

对于 Store Barrier 来说,在指令后插入 Store Barrier,能让写入缓存中的最新数据更新写入主内存,让其他线程可见。

java 的内存屏障通常所谓的四种即 LoadLoad,StoreStore,LoadStore,StoreLoad,实际上也是上述两种的组合,完成一系列的屏障和数据同步功能:

  • LoadLoad 屏障:对于这样的语句 Load1; LoadLoad; Load2,在 Load2 及后续读取操作要读取的数据被访问前,保证 Load1 要读取的数据被读取完毕。
  • StoreStore 屏障:对于这样的语句 Store1; StoreStore; Store2,在 Store2 及后续写入操作执行前,保证 Store1 的写入操作对其它处理器可见。
  • LoadStore 屏障:对于这样的语句 Load1; LoadStore; Store2,在 Store2 及后续写入操作被刷出前,保证 Load1 要读取的数据被读取完毕。
  • StoreLoad 屏障:对于这样的语句 Store1; StoreLoad; Load2,在 Load2 及后续所有读取操作执行前,保证 Store1 的写入对所有处理器可见。它的开销是四种屏障中最大的。在大多数处理器的实现中,这个屏障是个万能屏障,兼具其它三种内存屏障的功能

编译器和执行器可以在保证输出结果一样的情况下对指令重排序,使性能得到优化。插入一个内存屏障,相当于告诉 CPU 和编译器先于这个命令的必须先执行,后于这个命令的必须后执行。

JMM 为 volatile 加内存屏障有以下4种情况:

  • 在每个 volatile 写操作的前面插入一个 StoreStore 屏障,防止写 volatile 与后面的写操作重排序。
  • 在每个 volatile 写操作的后面插入一个 StoreLoad 屏障,防止写 volatile 与后面的读操作重排序。
  • 在每个 volatile 读操作的前面插入一个 LoadLoad 屏障,防止读 volatile 与后面的读操作重排序。
  • 在每个 volatile 读操作的后面插入一个 LoadStore 屏障,防止读 volatile 与后面的写操作重排序。

内存屏障另一个作用是强制更新一次不同 CPU 的缓存。例如,一个写屏障会把这个屏障前写入的数据刷新到缓存,这样任何试图读取该数据的线程将得到最新值,而不用考虑到底是被哪个CPU核心或者哪个CPU执行的。这正是 volatile 实现内存可见性的基础。

volatile 修饰数组和对象

volatile 修饰对象或数组时,只能保证他们的引用地址的可见性,而不能保证其内部元素具有可见性。

但是如果对于同一个数组,用 volatile 修饰后,在一个线程改编数组中的值,在另一个线程中还是可以马上读取到更新的新值,原因是:

当另一个线程读取 array 时,因为 array 的引用被 volatile 修饰,所以线程对所有变量都会从主内存去获取,当然也就包括数组的内部值(例如 array[0])。 所以会让人产生误解,以为是volatile修饰的数组保证了其数组的可见性,其实不然。

36. 为什么要使用多线程

先从总体上来说:

  • 从计算机底层来说: 线程可以比作是轻量级的进程,是程序执行的最小单位,线程间的切换和调度的成本远远小于进程。另外,多核 CPU 时代意味着多个线程可以同时运行,这减少了线程上下文切换的开销。
  • 从当代互联网发展趋势来说: 现在的系统动不动就要求百万级甚至千万级的并发量,而多线程并发编程正是开发高并发系统的基础,利用好多线程机制可以大大提高系统整体的并发能力以及性能。

再深入到计算机底层来探讨:

  • 单核时代: 在单核时代多线程主要是为了提高 CPU 和 IO 设备的综合利用率。举个例子:当只有一个线程的时候会导致 CPU 计算时,IO 设备空闲;进行 IO 操作时,CPU 空闲。我们可以简单地说这两者的利用率目前都是 50%左右。但是当有两个线程的时候就不一样了,当一个线程执行 CPU 计算时,另外一个线程可以进行 IO 操作,这样两个的利用率就可以在理想情况下达到 100%了。
  • 多核时代: 多核时代多线程主要是为了提高 CPU 利用率。举个例子:假如我们要计算一个复杂的任务,我们只用一个线程的话,CPU 只会一个 CPU 核心被利用到,而创建多个线程就可以让多个 CPU 核心被利用到,这样就提高了 CPU 的利用率。

37. BlockingQueue

java.util.concurrent.BlockingQueue接口有以下阻塞队列的实现:

  • FIFO队列 :LinkedBlockingQueueArrayBlockingQueue(固定长度)
  • 优先级队列 :PriorityBlockingQueue

提供了阻塞的take()put()方法:如果队列为空take()将阻塞,直到队列中有内容;如果队列为满put()将阻塞,直到队列有空闲位置。

使用BlockingQueue实现生产者,消费者案例

public class ProducerConsumer {
    private static BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5);

    private static class Producer extends Thread {
        @Override
        public void run() {
            try {
                blockingQueue.put("product");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("product...");
        }
    }

    private static class Consumer extends Thread {
        @Override
        public void run() {
            try {
                String take = blockingQueue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("consume...");
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 2; i++) {
            Producer producer  = new Producer();
            producer.start();
        }

        for (int i = 0; i < 5; i++) {
            Consumer consumer = new Consumer();
            consumer.start();
        }

        for (int i = 0; i < 3; i++) {
            Producer producer = new Producer();
            producer.start();
        }
    }
}

ArrayBlockingQueue

ArrayBlockingQueueBlockingQueue接口的有界队列实现类,底层采用数组来实现。ArrayBlockingQueue一旦创建,容量不能改变。其并发控制采用可重入锁来控制,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞;尝试从一个空队列中取一个元素也会同样阻塞。

ArrayBlockingQueue默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到ArrayBlockingQueue。而非公平性则是指访问ArrayBlockingQueue的顺序不是遵守严格的时间顺序,有可能存在,当ArrayBlockingQueue可以被访问时,长时间阻塞的线程依然无法访问到ArrayBlockingQueue。如果保证公平性,通常会降低吞吐量。如果需要获得公平性的ArrayBlockingQueue,可采用如下代码:

private static ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10,true);

LinkedBlockingQueue

LinkedBlockingQueue底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用,同样满足FIFO的特性,与ArrayBlockingQueue相比起来具有更高的吞吐量,为了防止LinkedBlockingQueue容量迅速增,损耗大量内存。通常在创建LinkedBlockingQueue对象时,会指定其大小,如果未指定,容量等于Integer.MAX_VALUE

有界无界是根据容量是否为Interger.MAX_VALUE来判别。

相关构造方法

//某种意义上的无界队列
public LinkedBlockingDeque() {
    this(Integer.MAX_VALUE);
}

//指定初始容量
public LinkedBlockingDeque(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
}

//将集合加入队列
public LinkedBlockingDeque(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    addAll(c);
}

PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现compareTo()方法来指定元素排序规则,或者初始化时通过构造器参数Comparator来指定排序规则。

PriorityBlockingQueue并发控制采用的是ReentrantLock,队列为无界队列(ArrayBlockingQueue是有界队列,LinkedBlockingQueue也可以通过在构造函数中传入capacity指定队列最大的容量,但是PriorityBlockingQueue只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。

简单地说,它就是PriorityQueue的线程安全版本。不可以插入null值,同时,插入队列的对象必须是可比较大小的(comparable),否则报ClassCastException异常。它的插入操作put方法不会block,因为它是无界队列(take方法在队列为空的时候会阻塞)。

ConcurrentLinkedQueue

Java提供的线程安全的Queue可以分为阻塞队列非阻塞队列,其中阻塞队列的典型例子就是BlockingQueue,而非阻塞队列的典型例子就是ConcurrentLinkedQueue,在实际应用中要根据需要选用阻塞队列或者非阻塞队列。阻塞队列可以通过加锁来实现,非阻塞队列可以通过CAS操作实现

从名字可以看出,ConcurrentLinkedQueue这个队列使用链表作为其数据结构,通过使用CAS非阻塞算法来实现线程安全。ConcurrentLinkedQueue应该算是在高并发环境中性能最好的队列了。它之所有能有很好的性能,是因为其内部复杂的实现。

ConcurrentLinkedQueue适合在对性能要求相对较高,同时对队列的读写存在多个线程同时进行的场景,即如果对队列加锁的成本较高则适合使用无锁的ConcurrentLinkedQueue来替代。

38. Fork/Join

主要用于并行计算中,就是把大的计算任务拆分成多个小任务并行计算。

public class ForkJoinExample extends RecursiveTask<Integer> {

    private final int threshold = 5; //设置分割门槛
    private int start;
    private int end;

    public ForkJoinExample(int start, int end) {
        this.start = start;
        this.end = end;
    }

    /**
     * 重写的方法,在任务执行后将根据这里进行判断是否继续划分更小的任务
     * @return
     */
    @Override
    protected Integer compute() {
        int result = 0;
        if (end - start <= threshold) {
            //任务足够小则计算
            for(int i=start;i<end;i++){
                result += i;
            }
        }else {
            //分割为更小的任务
            int middle = (start+end)/2;
            ForkJoinExample leftTask = new ForkJoinExample(start,middle);
            ForkJoinExample rightTask = new ForkJoinExample(middle + 1, end);
            //分割执行
            leftTask.fork();
            rightTask.fork();
            //合并结果
            result = leftTask.join() + rightTask.join();
        }
        return result;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinExample forkJoinExample = new ForkJoinExample(1,10000);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        Future result  = forkJoinPool.submit(forkJoinExample);
        System.out.println(result.get());
    }
}

ForkJoin使用ForkJoinPool来启动,它是一个特殊的线程池,线程数量取决于 CPU 核数。

public class ForkJoinPool extends AbstractExecutorService

ForkJoinPool实现了工作窃取算法来提高CPU的利用率。每个线程都维护了一个双端链表,用来存储需要执行的任务。工作窃取算法允许空闲的线程从其它线程的双端队列中窃取一个任务来执行。窃取的任务必须是最晚的任务,避免和队列所属线程发生竞争。例如下图中,Thread2Thread1的队列中拿出最晚的Task1任务,Thread1会拿出Task2来执行,这样就避免发生竞争。但是如果队列中只有一个任务时还是会发生竞争。
工作窃取算法

参考内容

主要参考以来两篇博客以及相关博客推荐,因找的博客比较多,没注意记录,最后好多忘了在哪2333,如果有侵权,请及时联系我,非常抱歉。
https://github.com/Snailclimb/JavaGuide
https://github.com/CyC2018/CS-Notes
如何正确的停止一个线程?
volatile修饰数组或引用对象的问题
单线程下StringBuffer与StringBuilder有区别吗
图解AQS系列(上)–独占锁
ThreadLocal 原理和使用场景分析
用synchronized就一定线程安全吗?