万字长文带你吃透这些线程池,面试再也不怕被问了!

原创
2021/08/19 06:59
阅读数 264

池化技术

小伙伴们好久不见呀~  😝

嘿嘿 迎来了自己的第一篇 万字长文!😝 (中间除了看看金色的雨外,还在做其他笔记~ 所以就拖到现在了 ,,ԾㅂԾ,,

这篇长文除了详细介绍线程池这个点以及它的使用场景外,还分享了下几种连接池滴用法以及避开一些坑🕳,(图还挺多的~) 希望对你有所帮助!!冲冲冲!😋


img

池化技术~,不知道小伙伴们对这个词是怎么理解的?

为什么要有这个技术呢?解决什么 痛点 呢?哈哈哈

带着小小的思考和 4ye 一起往下看看叭~

img

池化技术,故名思意,就是 把资源池子化

这么做有啥好处呢?

这得从我们的程序说起啦~  嘿嘿,又回到我们之前提到的那些底层知识了,程序运行起来就会占用一定的系统资源,比如 CPU内存IO 等,而为了优化对这些系统资源的使用,就有了这个池化技术啦~

常见的池化技术

比如,线程池连接池内存池对象池常量池 等等

大概有下面这些叭😝

池化技术

线程池

相信小伙伴们看完上一篇 《深入 JVM 源码之探索线程 Thread start 的奥秘》后,对这个线程的创建和启动有了粗略的一个印象~

比如

线程的创建

  • 最后是调用到了 linuxpthread_create 方法

  • 用户线程和内核线程是1比1的关系

线程的开启

  • 如果是 Java 线程,会先更改用户线程的状态为 Runnable

  • 最后在开启内核线程的时候,也会更改内核线程的状态为 RUNNABLE ,然后再调用  pd_start_thread(thread);

还有每个线程大概占用 1M 的内存,咳咳,总之就是很麻烦~

img

哪里麻烦呢?

可以看到创建线程的步骤是很繁琐的,每次创建都得去创建一个内核线程,操作系统都烦死了😄

要是我是操作系统,我肯定和你说,你就不能先存着吗,等真的不用了再给我销毁,老让我创建销毁,创建销毁,你搁这 nao tai tao 呢?

img

小小总结下线程池的优点:

通过创建一定数量的线程,充分利用系统资源,来减少程序运行过程中频繁创建和销毁线程所带来的开销,进而提高程序的响应速度

那么我们直接来看下这个Java中的线程池叭!😝

创建方式

这里的创建方式主要有两种

  1. 通过 Executors

  2. 通过 ThreadPoolExecutor

这里我们先重点介绍这个 ThreadPoolExecutor  ,这个是非常重要的, 在阿里巴巴的编程规范(《Java开发手册(嵩山版)》)中有提到 如下图:

image-20210518074447721

冲冲冲!😝

ThreadPoolExecutor 详解

参数介绍

image-20210518212342158

如果所示, ThreadPoolExecutor 一共有七个参数,分别是

  1. 核心线程数int corePoolSize

  2. 最大线程数int maximumPoolSize

  3. 非核心线程数最大存活时间long keepAliveTime

  4. 存活时间单位TimeUnit unit

  5. 阻塞队列BlockingQueue<Runnable> workQueue

  6. 线程工厂ThreadFactory threadFactory

  7. 拒绝策略RejectedExecutionHandler handler

核心参数图

image-20210531235742078

下面再展开介绍下 阻塞队列拒绝策略😄

阻塞队列

大概有下面这些,下面简单介绍先啦😝

(后面写 [[集合]] 这类文章再具体介绍下他们啦,有兴趣的小伙伴小小支持下呀👍)

image-20210518222745021

  • ArrayBlockingQueue由数组组成的有界阻塞队列,FIFO

  • DelayQueue一个延时无界阻塞队列,内部包含了 PriorityQueue (一个由优先级堆构成的无界队列)

  • DelayedWorkQueue这个属于 ThreadPoolExecutor 的内部类,专门来储存RunnableScheduledFuture 任务的,也是一个基于堆的延时队列

  • LinkedBlockingDeque由链表结构组成的双向阻塞队列,可选有界

  • LinkedBlockingQueue由链表组成的阻塞队列,可选有界

  • LinkedTransferQueue由链表结构组成的无界阻塞队列 TransferQueue 组成

  • PriorityBlockingQueue由堆构成的无界优先级队列 PriorityQueue 组成的无界阻塞队列,支持排序

  • SynchronousQueue一个大小为0的特殊集合(可以是队列,也可以是栈),插入数据和获取数据要同时存在,才能成功获取或添加数据,公平模式下是队列:由TransferQueue 组成,FIFO;非公平模式下是栈:由 TransferStack 组成,LIFO

拒绝策略

如图,这里提供了四种给我们选择😝

image-20210519072037902

拒绝策略图

拒绝策略

AbortPolicy

默认的拒绝策略

image-20210519072543758

作用:拒绝并抛出异常

源码如下

public static class AbortPolicy implements RejectedExecutionHandler {
       
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

CallerRunsPolicy

作用:除非线程池已经关闭,不然直接调用 run 方法来执行任务

源码如下

public static class CallerRunsPolicy implements RejectedExecutionHandler {
   
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

DiscardPolicy

作用:直接丢弃,什么也不做

源码如下

public static class DiscardPolicy implements RejectedExecutionHandler {
   
    public DiscardPolicy() { }

    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

DiscardOldestPolicy

作用:线程池没关闭的条件下,移除队列中头部的任务,然后再通过 execute 方法来执行任务

源码如下

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    
    public DiscardOldestPolicy() { }

    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

队列小知识补充

这个 poll 是队列的一个删除方法,作用是删除队列的头部,如果删除失败就返回 null

队列 Queue 所有方法如下图

image-20210519074333926

阻塞队列 BlockingQueue 增删查方法

多了 阻塞timeout 这两种类型的方法

image-20210523153012591

image-20210519080015289

execute 执行原理

原理图

image-20210519215911214

源码如下

源码中有非常棒的代码注释,如下

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(nullfalse);
    }
    else if (!addWorker(command, false))
        reject(command);
}

submit 方法

这个是线程池执行任务的另外一种方式,通常是用来执行有返回值的任务

源码如下:

submit 方法在源码中被重载了三次,这里只举其中一个例子。

可以看到实际上还是调用到了 execute  方法

image-20210519221801248

对于如何创建带返回结果的任务(阻塞和异步两种模式),可以参考下博主之前的这篇文章《 面试官:线程有几种创建方式?》,里面提到 CallableFutureTask (阻塞式)和 CompletableFuture (异步式)

shutdown 和 shutdownNow

说完执行,咱们再来说说这个关闭啦~ 😝

线程池的关闭有两种方式,一个是 shutdown , 另一个是 shutdownNow

区别

  • shutdown 会将线程池的状态更改为 SHUTDOWN ,同时将队列中的任务执行完,再关闭线程池

  • shutdownNow 则会将线程池的状态更改为 STOP ,并终止正在运行的任务,同时将队列中的任务导出到一个列表上,再关闭线程池

共同点

  • 调用关闭线程池的方法后,再提交任务,会触发拒绝策略

shutdown 源码如下

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

这里通过 onShutdown 去清楚 ScheduledThreadPoolExecutor 中的延迟任务

shutdownNow 源码如下

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

可以发现和 shutdown 方法的主要不同就是多了这步骤: tasks = drainQueue(); ,去导出队列的任务

img


接下来我们来看看这个 Executors 啦😄

Executors 创建线程池的六种方式

在了解了 ThreadPoolExecutor 后,我们来看这几种方式就简单太多啦,因为他们的核心都是封装了这个  ThreadPoolExecutor

image-20210518073201224

概览图

image-20210601001447926

newFixedThreadPool

创建一个线程数量固定的线程池

源码如下

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

经历了上面 ThreadPoolExecutor 的洗礼,再来看这个源码,是不是一下子就知道的弊端啦~ 嘿嘿

解析

作用:创建一个固定线程数的线程池,

  • 核心线程和最大线程数都是n

  • 非核心线程的最大存活时间为0

  • 使用无界阻塞队列

  • 使用默认的拒绝策略

正如手册中提到的,由于使用到这个无界队列,没有限制这个任务队列长度,可能会堆积太多任务,从而导致 OOM

newSingleThreadExecutor

创建一个单线程的线程池

源码如下

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(11,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

解析

作用:创建一个单线程的线程池,

  • 核心线程和最大线程数都是1

  • 非核心线程的最大存活时间为0

  • 使用无界阻塞队列

  • 使用默认的拒绝策略

正如手册中提到的,由于没有限制这个任务队列,可能会堆积太多任务,从而导致 OOM

newCachedThreadPool

创建一个线程存活时间为60s的可缓存线程池

源码如下

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

解析

作用:创建一个可缓存的线程池,

  • 核心线程数 0

  • 最大线程数 Integer.MAX_VALUE

  • 非核心线程的最大存活时间为 60s

  • 使用无缓存同步队列 SynchronousQueue

  • 使用默认的拒绝策略

正如手册中提到的,由于没有限制这个线程数量(尽管会复用线程,但难免有极端条件~),可能会导致创建过多线程,从而导致 OOM

为什么使用这个 SynchronousQueue  队列呢?

主要是因为它是一个大小为 0 的队列/栈,根据我们上面分析的 execute 执行原理 ,可以知道:

当队列长度为 0 时,相当于直接来到最大线程数的判断,这里会去判断这个当前线程数是否小于最大线程数,是的话会去创建这个非核心线程来执行任务!

为什么我这脑瓜子会想到这个问题呢,当然是因为搜资料时看到有人在提问 哈哈 如果 4ye 的这个答案不正确的话,还望不吝赐教!!

newScheduledThreadPool

创建一个可定时调度的线程池,比如延迟执行或者周期性执行任务

源码如下:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
}

image-20210522161537875

可以看到这里 ScheduledThreadPoolExecutor 去继承了 ThreadPoolExecutor

通过 super 调用父类构造器,所以还是我们熟悉的那几个参数~

解析

作用:创建一个可定时调度的线程池

  • 核心线程数: 指定数量

  • 最大线程数 Integer.MAX_VALUE

  • 非核心线程的最大存活时间为 0

  • 使用延迟队列 DelayedWorkQueue

  • 使用默认的拒绝策略

方法

主要是这里的方法和上面几个不一样,单独介绍下~👇

@Test
public void newScheduledThreadPoolTest() throws InterruptedException {
    ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
    executorService.schedule(run1, 0, TimeUnit.SECONDS);
    executorService.scheduleAtFixedRate(run2, 12, TimeUnit.SECONDS);
    Thread.sleep(1000 * 20);
    executorService.shutdown();
    executorService.awaitTermination(5, TimeUnit.SECONDS);
}

这里newScheduledThreadPool 返回的是 ScheduledExecutorService ,而非ExecutorService

schedule 方法只执行一次 。

scheduleAtFixedRate 以固定周期运行 。

newSingleThreadScheduledExecutor

创建一个单线程的可定时调度的线程池,比如延迟执行或者周期性执行任务

源码如下

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
}

解析

作用:创建一个可定时调度的线程池

  • 核心线程数: 1

  • 最大线程数 Integer.MAX_VALUE

  • 非核心线程的最大存活时间为 0

  • 使用延迟队列 DelayedWorkQueue

  • 使用默认的拒绝策略

newWorkStealingPool

这是最特殊的一个,它可以创建一个抢占式的线程池,默认线程数量为可用的逻辑处理器数量,也可手动指定

源码如下

public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         nulltrue);
}
public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    this(checkParallelism(parallelism),
         checkFactory(factory),
         handler,
         asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}


private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

参数介绍

  • 线程数: int parallelism

  • 线程工厂: ForkJoinWorkerThreadFactory factory

  • 未捕获到异常时的处理器:  UncaughtExceptionHandler handler

  • 异步模式: int mode

  • 工作线程名称前缀: workerNamePrefix

可以看到内部是封装了 ForkJoinPool 线程池 ,那么简单介绍下这个来自 jdk1.7 的大家伙叭😝

ForkJoinPool

正如它的名字一样,它的作用就是 fork  和 join , 采用了分治思想

不断递归,将大任务拆分成很多小任务,然后将他们结合起来,最后就得到这个结果啦,类似于 MapReduce

fork/join图

image-20210527215618945

再来看一眼它的类图,可以发现它和 ThreadPoolExecutor 是兄dei~

那么很明显,Fork/Join 是它区别于这个 ThreadPoolExecutor 的最大特点,它可以拆分任务,

那么拆分任务后还有哪些小秘密呢?嘿嘿 这里就不卖关子啦,让我们来看看这个 工作窃取 叭😝

类图如下:

image-20210524075329831

work-stealing

工作窃取

正如它的名称一样,它会去偷走其他线程的工作(居然有这等好事!😝 )哈哈哈

当他自己的任务队列空了的时候,他就会去看看别的线程的任务队列是否为空,不为空的话,就从该线程的任务队列中偷走任务,然后去执行。

那怎么偷呢?

先来瞄一眼它的源码叭~😝  卖个关子先 嘿嘿~

偷窃源码如下:

👇

/**
 * Steals and runs a task in the same CC computation as the
 * given task if one exists and can be taken without
 * contention. Otherwise returns a checksum/control value for
 * use by method helpComplete.
 *
 * @return 1 if successful, 2 if retryable (lost to another
 * stealer), -1 if non-empty but no matching task found, else
 * the base index, forced negative.
 */
final int pollAndExecCC(CountedCompleter<?> task) {
    int b, h; ForkJoinTask<?>[] a; Object o;
    if ((b = base) - top >= 0 || (a = array) == null)
        h = b | Integer.MIN_VALUE;  // to sense movement on re-poll
    else {
        long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
        if ((o = U.getObjectVolatile(a, j)) == null)
            h = 2;                  // retryable
        else if (!(o instanceof CountedCompleter))
            h = -1;                 // unmatchable
        else {
            CountedCompleter<?> t = (CountedCompleter<?>)o;
            for (CountedCompleter<?> r = t;;) {
                if (r == task) {
                    if (base == b &&
                        U.compareAndSwapObject(a, j, t, null)) {
                        base = b + 1;
                        t.doExec();
                        h = 1;      // success
                    }
                    else
                        h = 2;      // lost CAS
                    break;
                }
                else if ((r = r.completer) == null) {
                    h = -1;         // unmatched
                    break;
                }
            }
        }
    }
    return h;
}

从源码中我们可以发现这里又涉及到 ForkJoin 的另一个要点,工作任务 ForkJoinTask

其中CountedCompleter 是该任务的实现类 。

主要通过执行代码中的 t.doExec(); 去执行任务的🐷

可以看到这个 CountedCompleter 的身影出现这个 JUCJUS ,不过除了 ConcurrentHashMap 有点眼熟外, stream 包下的几乎不认识,平时用到的 Stream 却没用到它 🐷 (告辞~) 有兴趣的小伙伴们可以去研究下 哈哈~

image-20210527234238311

WorkQueue 源码 (重点)

重点来啦!!

其实上面这段 偷窃源码 是位于 WorkQueue 工作队列中的!

源码图

image-20210529093013490

这里主要介绍上面两个框框的内容,嘿嘿 (之前重新刷的数据结构知识又派上用场了😝)

很容易看出它使用数组去实现了这个双端队列的,basetop 是它的两个指针。

ForkJoinTask 数组则是存放了我们的工作任务。

任务队列图

image-20210529094852743

这时候,细心的小伙伴们应该已经嗅到问题的答案了!

img

在上面的偷窃源码中有这么一段逻辑 , base+1

而且在 WorkQueuebase 使用 volatile 修饰!!,这也保证了它的可见性,使得其他线程来偷的时候能获取到这个最新的值~🐷

image-20210529095338345

那么,到此怎么偷的问题也有了答案啦~ 😝

:从任务队列的头部开始偷的,也就是我们画出来的数组底部开始~

WorkQueue 的源码中,还有这么一段 final ForkJoinWorkerThread owner;

我们可以发现 ForkJoinPool 又一个和其他线程池(ThreadPoolExecutor)不同的点:

每个线程都有自己的工作队列

那么我们可以画出偷窃图如下:

工作偷窃图

image-20210529124317132

知识小结

ForkJoinPool 也是一个线程池,线程数量默认由 Runtime.getRuntime().availableProcessors() 获取。

如果有线程没任务干了,它就会来尝试偷取其他线程的任务队列中的任务,采用 FIFO 的方式。

而线程从自己的任务队列中获取任务时

  • 如果是异步模式,则采用 FIFO 的方式 ,对应代码中的 poll 方法

  • 非异步的话是 LIFO 模式,对应代码中的 pop 方法。

说到这里,才发现这次的主角不是介绍它,哈哈 而是介绍 Executors 的方法,那到此小伙伴们是不是觉得这个 newWorkStealingPool 的名称取得挺贴切的 哈哈😄

有机会再好好聊聊这个 [[ForJoinPool]] ,初略统计了下面这些内容🐷

毕竟这个 work-stealing 是其中的一大特色!(远没有 4ye 写得那么简单) 哈哈

不仅仅在这个框架中用到, 在 [[Netty]] 的事件轮询 EventLoopGroup 中也有它的影子~  (SingleThreadEventExecutor),嘿嘿 后面分享网络通信的内容时,再来分享下咱们的这个 netty 😝

image-20210529154220194

线程池的使用场景

Springboot异步接口

这里就举举实际应用中的场景叭~

不知道小伙伴们使用过 Springboot异步接口没有,比如在下面的场景

  1. 大文件的下载,这种比较慢的 ,特别是涉及到一些复杂的统计时(如 excel )

  2. 发邮件,短信 等

  3. 异步日志

Springboot 也提供了一个异步任务的线程池给我们使用,前提是得加上

@EnableAsync 注解,才能生效,有下面两种配置方式。

可以通过 yml 文件直接配置,也可以直接通过 @Bean 注入

yaml

spring:
    task:
        execution:
            pool:
                core-size: 2
                max-size: 4
                queue-capacity: 10
                allow-core-thread-timeout: false
                keep-alive: 15

@Bean注入

@Configuration
@EnableAsync
public class ThreadPoolConfig {

    @Bean("taskExecutor")
    public Executor asyncServiceExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 设置核心线程数
        executor.setCorePoolSize(5);
        // 设置最大线程数
        executor.setMaxPoolSize(20);
        //配置队列大小
        executor.setQueueCapacity(1000);
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(60);
        // 设置默认线程名称
        executor.setThreadNamePrefix("[4ye]-task");
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        //执行初始化
        executor.initialize();
        return executor;
    }

}

在源码中发现它的底层也是我们的 ThreadPoolExecutor ,这里可以看到它使用的阻塞队列如下~

其他的小伙伴们可以自己发掘下啦 嘿嘿😝

image-20210529173423323

Hystrix

线程隔离

在处理服务雪崩时,除了常听到的

服务熔断,服务降级 ,还有 请求缓存,请求合并,线程隔离 这几种办法

那显然,我们这里要介绍的就是 线程隔离 。😋 冲冲冲!

嘿嘿 这里就先不介绍这个信号量隔离啦 ,后面再写写[[Sentital]]的时候可以做个对比~

注解

@HystrixCommand(fallbackMethod = "errorFallback",
        threadPoolKey = "java4ye",
        commandProperties = {
                // 默认是线程隔离
                @HystrixProperty(name = "execution.isolation.strategy", value = "THREAD"),
                // 超时时间,默认 1000 ms
                @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "1000"),
                // 启动熔断器
                @HystrixProperty(name = "circuitBreaker.enabled", value = "true"),
                // 10s 内达到熔断要求的线程数
                @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "2")},
        threadPoolProperties = {
                @HystrixProperty(name = "coreSize", value = "5"),
                @HystrixProperty(name = "maximumSize", value = "5"),
                @HystrixProperty(name = "maxQueueSize", value = "10")
        })

还有很多参数可以直接参考下面的 👇

配置文档[1]

也可以直接查看源码 HystrixCommandProperties

image-20210530234657372

最后

疯狂暗示😝 哈哈哈

image-20210519232009510

img

连接池

这里主要介绍这些知识点~😝

image-20210601002212731

数据库连接池

回忆下 JDBC 的写法 ✍ ,就能感受到连接池的好处了~

void jdbcTest() throws ClassNotFoundException {
    String url = "jdbc:mysql://localhost:3306/db";
    String username = "";
    String password = "";

    Class.forName("com.mysql.jdbc.Driver");
    Connection connection = null;
    PreparedStatement preparedStatement = null;
    try {
        connection = DriverManager.getConnection(url, username, password);
        preparedStatement= connection.prepareStatement("select * from User where name=?");
        preparedStatement.setString(1"Java4ye");
        preparedStatement.executeQuery();
    } catch (SQLException throwables) {
        throwables.printStackTrace();
    } finally {
        try {
            if (preparedStatement!=null){
                preparedStatement.close();
            }
            if (connection != null){
                connection.close();
            }
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
    }

}

可以发现在这种模式下,每次都要去创建和销毁连接,造成了很大的浪费。我们可以尝试去优化下,

比如,参考上面 线程池的思想

  • 用完不销毁,先存起来;

  • 一开始的时候,就初始化多个连接;

  • 给连接加上最大空闲时间限定;

  • 加上最大连接数限定;

......

最后,你可以发现,你封装了一个简单的连接池了😄

img

数据库连接池原理图

数据库连接池原理图

那数据库连接池有啥作用呢?

负责连接的管理,分配和释放,当连接的空闲时间超过最大空闲时间时,会释放连接,避免连接泄露

常见的数据库连接池

这里就主要说说这个 HikariCPDruid 了,其他的如  C3P0DBCP 等就不说啦(主要是没用上 哈哈哈)

HikariCP

HikariCPSpringboot2 开始默认使用的连接池(官方认证~🐷),速度最快

优点

  1. 字节码精简 :优化代码,编译后的字节码最少,这样,CPU缓存可以加载更多的程序代码;

  2. 优化代理和拦截器 :减少代码,例如  HikariCP 的 Statement proxy 只有100行代码,只有 BoneCP 的十分之一;

  3. 自定义数组类型(FastStatementList)代替ArrayList :避免每次get()调用都要进行range check,避免调用remove()时的从头到尾的扫描;

  4. 自定义集合类型(ConcurrentBag) :提高并发读写的效率;

  5. 其他针对 BoneCP 缺陷的优化。

源码

有机会可以研究看看 ~

下面这段代码是 HikariCPgetConnection() 的源码~ 给大家感受下👇 ([[HikariCP 源码]])

   // ***********************************************************************
   //                          DataSource methods
   // ***********************************************************************

   /** {@inheritDoc} */
   @Override
   public Connection getConnection() throws SQLException
   {
      if (isClosed()) {
         throw new SQLException("HikariDataSource " + this + " has been closed.");
      }

      if (fastPathPool != null) {
         return fastPathPool.getConnection();
      }

      // See http://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java
      HikariPool result = pool;
      if (result == null) {
         synchronized (this) {
            result = pool;
            if (result == null) {
               validate();
               LOGGER.info("{} - Starting...", getPoolName());
               try {
                  pool = result = new HikariPool(this);
                  this.seal();
               }
               catch (PoolInitializationException pie) {
                  if (pie.getCause() instanceof SQLException) {
                     throw (SQLException) pie.getCause();
                  }
                  else {
                     throw pie;
                  }
               }
               LOGGER.info("{} - Start completed.", getPoolName());
            }
         }
      }

      return result.getConnection();
   }

看下来就捕捉到单例设计模式的影子~

小伙伴们可以看看之前这篇 《一文带你看遍单例模式的八个例子,面试再也不怕被问了

是不是用到了单例的 双重检查锁模式  😄

配置属性

有这么多!🐷

private static final char[] ID_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();
private static final long CONNECTION_TIMEOUT = SECONDS.toMillis(30);
private static final long VALIDATION_TIMEOUT = SECONDS.toMillis(5);
private static final long IDLE_TIMEOUT = MINUTES.toMillis(10);
private static final long MAX_LIFETIME = MINUTES.toMillis(30);
private static final int DEFAULT_POOL_SIZE = 10;

private static boolean unitTest = false;

// Properties changeable at runtime through the HikariConfigMXBean
//
private volatile String catalog;
private volatile long connectionTimeout;
private volatile long validationTimeout;
private volatile long idleTimeout;
private volatile long leakDetectionThreshold;
private volatile long maxLifetime;
private volatile int maxPoolSize;
private volatile int minIdle;
private volatile String username;
private volatile String password;

// Properties NOT changeable at runtime
//
private long initializationFailTimeout;
private String connectionInitSql;
private String connectionTestQuery;
private String dataSourceClassName;
private String dataSourceJndiName;
private String driverClassName;
private String exceptionOverrideClassName;
private String jdbcUrl;
private String poolName;
private String schema;
private String transactionIsolationName;
private boolean isAutoCommit;
private boolean isReadOnly;
private boolean isIsolateInternalQueries;
private boolean isRegisterMbeans;
private boolean isAllowPoolSuspension;
private DataSource dataSource;
private Properties dataSourceProperties;
private ThreadFactory threadFactory;
private ScheduledExecutorService scheduledExecutor;
private MetricsTrackerFactory metricsTrackerFactory;
private Object metricRegistry;
private Object healthCheckRegistry;
private Properties healthCheckProperties;

private volatile boolean sealed;

默认值👉

image-20210515200950942

简单介绍下上面红框中的配置:

CONNECTION_TIMEOUT = SECONDS.toMillis(30); // 连接超时,默认 30 秒
VALIDATION_TIMEOUT = SECONDS.toMillis(5); // 验证连接有效性的超时时间,默认 5 秒
IDLE_TIMEOUT = MINUTES.toMillis(10); // 空闲连接存活最大时间 , 默认 10 分钟
MAX_LIFETIME = MINUTES.toMillis(30); // 连接最大存活时间 , 默认 30 分钟
DEFAULT_POOL_SIZE = 10;             // 默认连接池大小 , 10

一些常见的配置

spring:
  datasource:
    hikari:
      # 最小空闲连接数量
      minimum-idle: 5
      # 连接超时,默认 30 秒
      connection-timeout: 30
      # 空闲连接存活最大时间 , 默认 10 分钟
      idle-timeout: 3
      # 连接池最大连接数,默认是10
      maximum-pool-size: 20
      # 连接池名称
      pool-name: 4yePool_HikariCP
      # 自动提交事务,默认 true
      auto-commit: true
      # 连接最长的生命周期,默认 30 分钟
      max-lifetime: 1800

魔力转转转

嘿嘿 再来介绍下阿里的 Druid 🐂

Druid

阿里云计算平台团队出品,为监控而生的数据库连接池

Alibaba-Druid

下面那个是 ApacheDruid ,两个是不同的! 一个是连接池,一个是数据库~

Apache Druid是一个高性能的实时分析型数据库

Druid连接池介绍

主要特点是监控,提供了web页面给我们监控 SQL,Session,Spring 等等~ 而且据说监控对性能的影响很小🐷

小伙伴们点击下面连接就可以看到啦~👇

介绍[2]

常见问题

Druid 常见问题 可以在 githubwiki 中查看😃

常见问题[3]

很多问题都可以在上面找到答案~ 比如

  • 数据库密码加密 [4]

  • 连接泄漏监测 (RemoveAbandanded功能不建议) [5]

  • 如何配置多数据源 [6]

  • 如何配置 Filter [7]

  • 如何获取 Druid 的监控数据 [8]

......

功能:

如图~ (原来这么多!)

功能

优点:

经历了阿里大规模验证!😄

是 阿里巴巴内部唯一使用的连接池,在内部数据库相关中间件 TDDL/DRDS[9] 都内置使用强依赖了Druid连接池

Springboot 集成 druid 👇

集成文档[10]

这里配置超级全,大家就直接看文档就好啦 😄

img

HttpClient连接池

http请求过程

如果所示,这是一个 Http 请求的大致过程

http请求过程

HTTP keep-alive 机制

上面的请求也是一个短链接的过程,每次请求结束就关闭连接了,很浪费资源,而长连接和短链接的最主要区别就是这个 keep-alive 机制。

这里简单说下这个 长连接  keep-alive

http1.0 中 ,请求头中是默认没有这个 Connection: keep-alive

而在 http1.1 中 , 请求头中默认是带上这个的。

TCP  keep-alive (保活)机制

除了应用层的 HTTP 有这个机制外,传输层的 TCP 也有。

TCP 保活机制的主要特点是 每隔一段时间会通过心跳包检测对端是否存活

这里参考下 linux

我们可以通过 sysctl -a | grep keepalive 查看内核参数

net.ipv4.tcp_keepalive_intvl = 75 
net.ipv4.tcp_keepalive_probes = 9
net.ipv4.tcp_keepalive_time = 7200
  • 保活时间(tcp_keepalive_time)默认:7200 秒

  • 保活时间间隔(tcp_keepalive_intvl)默认:75 秒

  • 探测循环次数(tcp_keepalive_probes)默认:9 次

这里的意思是,在两个钟(7200秒)内没有收到报文的话,会每隔75秒发送一个 保活探测包 ,重复9 次,直到收到响应,到达9次的话,如果还是没有响应,就会关闭连接。

保活机制的区别

HTTP 是为了复用连接 , 而 TCP 是为了保证对端存活,如果对端挂了的话,会关闭 TCP 连接。

连接池

这里和大家扯这么多主要是为了简单说说这个 Http 建立连接的麻烦 ,哈哈哈 频繁的创建和销毁连接很不友好。(说来说去都是这句话😝)

img

而且在 HTTP 的背后,还有着这么一个保活机制,这也意味着我们的连接池在实现这个连接的复用时,还要考虑这个Keep-alive 机制~

所以。。说了这么多,还是来看看这个 apache 的  httpcomponents 中的 httpclient 有啥秘密叭 😝

HttpClient 介绍[11]

总体上分为下面七大块

  1. Fundamentals [12]

  2. Connection management [13]

  3. HTTP state management [14]

  4. HTTP authentication [15]

  5. Fluent API [16]

  6. HTTP Caching [17]

  7. Advanced topics [18]

Keep Alive 策略

Keep Alive 策略

官网介绍

The HTTP specification does not specify how long a persistent connection may be and should be kept alive. Some HTTP servers use a non-standard Keep-Alive header to communicate to the client the period of time in seconds they intend to keep the connection alive on the server side. HttpClient makes use of this information if available.

If the Keep-Alive header is not present in the response, HttpClient assumes the connection can be kept alive indefinitely.

可以看到如果返回头中没有设置这个 Keep-Avlie 的话,HttpClient 会认为它是无限期存活的!

However, many HTTP servers in general use are configured to drop persistent connections after a certain period of inactivity in order to conserve system resources, quite often without informing the client. In case the default strategy turns out to be too optimistic, one may want to provide a custom keep-alive strategy.

这里直接用官网的简单例子运行下,从debug日志中可以看到,没配置的话,确实输出了  kept alive indefinitely

debug日志

代码 demo

 public static void main(String[] args) throws IOException {
        CloseableHttpClient httpclient = HttpClients.createDefault();
        HttpGet httpGet = new HttpGet("http://www.baidu.com");
        CloseableHttpResponse response1 = httpclient.execute(httpGet);
// The underlying HTTP connection is still held by the response object
// to allow the response content to be streamed directly from the network socket.
// In order to ensure correct deallocation of system resources
// the user MUST call CloseableHttpResponse#close() from a finally clause.
// Please note that if response content is not fully consumed the underlying
// connection cannot be safely re-used and will be shut down and discarded
// by the connection manager.
        try {
            System.out.println(response1.getStatusLine());
            HttpEntity entity1 = response1.getEntity();
            // do something useful with the response body
            // and ensure it is fully consumed
            EntityUtils.consume(entity1);
        } finally {
            response1.close();
        }

        HttpPost httpPost = new HttpPost("http://www.baidu.com");
        List <NameValuePair> nvps = new ArrayList <NameValuePair>();
        nvps.add(new BasicNameValuePair("username""vip"));
        nvps.add(new BasicNameValuePair("password""secret"));
        httpPost.setEntity(new UrlEncodedFormEntity(nvps));
        CloseableHttpResponse response2 = httpclient.execute(httpPost);

        try {
            System.out.println(response2.getStatusLine());
            HttpEntity entity2 = response2.getEntity();
            // do something useful with the response body
            // and ensure it is fully consumed
            EntityUtils.consume(entity2);
        } finally {
            response2.close();
        }
    }

源码分析

嘿嘿  ,来都来了,就顺手 debug 分析下上面这个 execute 方法~

从下图中我们可以发现,这里会去调用 reuseStrategy.keepAlive() 做判断,接着通过keepAliveStrategy.getKeepAliveDuration 去获取该连接的存活时间 🐷

源码

这两个策略分别为 重用策略 ConnectionReuseStrategy 和 保活策略 ConnectionKeepAliveStrategy

源码

省去debug步骤~ 🐷

我们直接来到这个默认的重用策略 DefaultConnectionReuseStrategy  ,来看看这里是怎么去判断这个连接可以不可以重用叭~

keepAlive 源码

public boolean keepAlive(HttpResponse response, HttpContext context) {
    Args.notNull(response, "HTTP response");
    Args.notNull(context, "HTTP context");
    if (response.getStatusLine().getStatusCode() == 204) {
        Header clh = response.getFirstHeader("Content-Length");
        if (clh != null) {
            try {
                int contentLen = Integer.parseInt(clh.getValue());
                if (contentLen > 0) {
                    return false;
                }
            } catch (NumberFormatException var11) {
            }
        }

        Header teh = response.getFirstHeader("Transfer-Encoding");
        if (teh != null) {
            return false;
        }
    }

    HttpRequest request = (HttpRequest)context.getAttribute("http.request");
    if (request != null) {
        try {
            BasicTokenIterator ti = new BasicTokenIterator(request.headerIterator("Connection"));

            while(ti.hasNext()) {
                String token = ti.nextToken();
                if ("Close".equalsIgnoreCase(token)) {
                    return false;
                }
            }
        } catch (ParseException var13) {
            return false;
        }
    }

    ProtocolVersion ver = response.getStatusLine().getProtocolVersion();
    Header teh = response.getFirstHeader("Transfer-Encoding");
    if (teh != null) {
        if (!"chunked".equalsIgnoreCase(teh.getValue())) {
            return false;
        }
    } else if (this.canResponseHaveBody(request, response)) {
        Header[] clhs = response.getHeaders("Content-Length");
        if (clhs.length != 1) {
            return false;
        }

        Header clh = clhs[0];

        try {
            long contentLen = Long.parseLong(clh.getValue());
            if (contentLen < 0L) {
                return false;
            }
        } catch (NumberFormatException var10) {
            return false;
        }
    }

    HeaderIterator headerIterator = response.headerIterator("Connection");
    if (!headerIterator.hasNext()) {
        headerIterator = response.headerIterator("Proxy-Connection");
    }

    if (headerIterator.hasNext()) {
        try {
            TokenIterator ti = new BasicTokenIterator(headerIterator);
            boolean keepalive = false;

            while(ti.hasNext()) {
                String token = ti.nextToken();
                if ("Close".equalsIgnoreCase(token)) {
                    return false;
                }

                if ("Keep-Alive".equalsIgnoreCase(token)) {
                    keepalive = true;
                }
            }

            if (keepalive) {
                return true;
            }
        } catch (ParseException var12) {
            return false;
        }
    }

    return !ver.lessEquals(HttpVersion.HTTP_1_0);
}

哈哈哈 不想看分析的话 往下滑动一点点,有图~ 😝

分析

  1. 判断 HTTP  响应头中的状态码是不是 204 ,是的话进入下面的判断

    • 响应头中是否有 Content-Length  ,有的话看它的值是否大于0,大于的时候 不重用

    • 响应头 中是否有 Transfer-Encoding ,有的话 不重用

      ( Tip:  204 表示 No Content:服务器成功处理了请求,但没返回任何内容 ,但是上面两种都表示还有内容,是错误的❌,所以不重用)

  2. 判断 请求头中是否有 Connection:Close ,有的话也 不重用

  3. 判断 响应头 中是否有 Transfer-Encoding,有的话看它的值,如果值 不等于 chunked不重用

  4. 判断 响应头中是否有 Content-Length   , 有的话看它的值,如果值 小于 0 , 不重用

  5. 判断  响应头中是否有 Connection 或者 Proxy-Connection 其中的一个 ,有的话看它的值,如果是 Close不重用,如果是 Keep-Alive  重用

  6. 最后,如果上面的判断条件都不成立,会判断 http 的版本是不是 小于 1.0,是的话也返回 false

贴心的 4ye 居然画了这么详细的 流程图😝  (感谢老板的 一键三连 😝)

HttpClient重用策略

这里要注意,连接池中有两个默认的参数很重要(下图👇),而且官网的 demo 肯定是不能直接用在生产环境下的, 不然... 等监控报警的时候,就有得难受了 哈哈

下面整理了一些配置 给小伙伴们参考下~ 😝

如下图

这两个分别是

路由最大连接数defaultMaxPerRoute  ,默认值 2 ,表示对某个 ip / 路由 一次能处理的最大并发数  。例如,如果我去请求百度,则同一时刻,最多能处理两个请求,所以 别忘了修改它!🐷 ,不然你的连接池配多少连接都没用~

 

总连接数: maxTotal  , 默认值 20 , 这个表示总的连接数,即 连接的最大并发数是20

HttpClientConnectionManager

PoolingHttpClientConnectionManager 设置

PoolingHttpClientConnectionManager poolingHttpClientConnectionManager = new PoolingHttpClientConnectionManager();
poolingHttpClientConnectionManager.setMaxTotal(6);
poolingHttpClientConnectionManager.setDefaultMaxPerRoute(1);
poolingHttpClientConnectionManager.setMaxPerRoute(new HttpRoute(new HttpHost("www.baidu.com")),4);

如上所示,还可以自定义某个 route 的最大值,效果如下~

DEBUG org.apache.http.impl.conn.PoolingHttpClientConnectionManager - Connection leased: [id: 41][route: {}->http://www.baidu.com:80][total available: 1; route allocated: 2 of 4; total allocated: 6 of 6]

其他参数 如 SocketConfigRequestConfig  等也要进行相应的配置,设置 等待数据的超时时间 和 请求超时时间 等,还有 重发策略 serviceUnavailStrategyKeep-Alive  策略 ConnectionKeepAliveStrategy

img

RequestConfig 设置

  1. connectionRequestTimout:从连接池获取连接超时

  2. connetionTimeout:连接服务器超时

  3. socketTimeout :等待数据超时

RequestConfig requestConfig = RequestConfig.custom()
        .setConnectionRequestTimeout(1000) 
        .setConnectTimeout(1000)
        .setSocketTimeout(1000).build();

ConnectionKeepAliveStrategy  (保活策略)设置

代码如下

ConnectionKeepAliveStrategy myStrategy = (response, context) -> {
            // Honor 'keep-alive' header
            HeaderElementIterator it = new BasicHeaderElementIterator(
                    response.headerIterator(HTTP.CONN_KEEP_ALIVE));
            while (it.hasNext()) {
                HeaderElement he = it.nextElement();
                String param = he.getName();
                String value = he.getValue();
                if (value != null && param.equalsIgnoreCase("timeout")) {
                    try {
                        return Long.parseLong(value) * 1000;
                    } catch (NumberFormatException ignore) {
                    }
                }
            }
            HttpHost target = (HttpHost) context.getAttribute(
                    HttpClientContext.HTTP_TARGET_HOST);
            if ("www.baidu.com".equalsIgnoreCase(target.getHostName())) {
                // Keep alive for 5 seconds only
                return 5 * 1000;
            } else {
                // otherwise keep alive for 30 seconds
                return 30 * 1000;
            }
        };

okhttp 就不展开啦,嘿嘿,这里的连接池使用场面也挺多的,比如我们接着要来讲的  RestTemplate ,还有这个 Feign (这个就先记着啦 嘿嘿 有时间再补上)

现在来简单说下怎么在 RestTemplate  中使用这个连接池~ 🐷

RestTemplate

它提供了一个简单的 SimpleClientHttpRequestFactory , 该类里面主要有  connetionTimeoutreadTimeout  这两个超时设置,额 实在是太简单了...  大部分时候还是不能满足的,所以我们还是要选择其他的连接池呀~ !

image-20210501203538974

可以看到上面中还有 OkHttp ,还有 Netty 等等,小伙伴们可以根据需要选择~  (๑•̀ㅂ•́)و✧

不过为啥 Netty 会标记为已过期了呢?

嘿嘿,这里下载了 Netty4ClientHttpRequestFactory  源码 ,可以看到 第一个红框里说,每次 http 请求的连接都会被关闭 。

这显然不能重用连接以及保持长连接了~ 😄

image-20210509224325366

配置 HttpClient

我们也可以这样配置,就可以使用到这个 HttpClient 了 。

@Configuration
public class RestTemplateConfig {
    @Bean
    public RestTemplate restTemplate(ClientHttpRequestFactory factory) {
        RestTemplate restTemplate = new RestTemplate(factory);
        // 支持中文编码
        restTemplate.getMessageConverters().set(1new StringHttpMessageConverter(Charset.forName("UTF-8")));
        return restTemplate;
    }

    @Bean
    public ClientHttpRequestFactory httpComponentsClientHttpRequestFactory() {
        return  new HttpComponentsClientHttpRequestFactory(httpClient());
    }
    @Bean
    public HttpClient httpClient() {

        PoolingHttpClientConnectionManager poolingHttpClientConnectionManager = new PoolingHttpClientConnectionManager();
        poolingHttpClientConnectionManager.setMaxTotal(6);
        poolingHttpClientConnectionManager.setDefaultMaxPerRoute(1);
        poolingHttpClientConnectionManager.setMaxPerRoute(new HttpRoute(new HttpHost("www.baidu.com")), 4);

        ConnectionKeepAliveStrategy myStrategy = (response, context) -> {
            // Honor 'keep-alive' header
            HeaderElementIterator it = new BasicHeaderElementIterator(
                    response.headerIterator(HTTP.CONN_KEEP_ALIVE));
            while (it.hasNext()) {
                HeaderElement he = it.nextElement();
                String param = he.getName();
                String value = he.getValue();
                if (value != null && param.equalsIgnoreCase("timeout")) {
                    try {
                        return Long.parseLong(value) * 1000;
                    } catch (NumberFormatException ignore) {
                    }
                }
            }
            HttpHost target = (HttpHost) context.getAttribute(
                    HttpClientContext.HTTP_TARGET_HOST);
            if ("www.baidu.com".equalsIgnoreCase(target.getHostName())) {
                // Keep alive for 5 seconds only
                return 5 * 1000;
            } else {
                // otherwise keep alive for 30 seconds
                return 30 * 1000;
            }
        };

        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectionRequestTimeout(5000)
                .setConnectTimeout(10000)
                .setSocketTimeout(5000).build();

       return HttpClients.custom()
                .setConnectionManager(poolingHttpClientConnectionManager)
                .setKeepAliveStrategy(myStrategy)
                .setDefaultRequestConfig(requestConfig)
                .build();
    }

}

img

前往下一站,Redis 连接池

Redis 连接池

redis 的官网中 ,我们可以发现有下面这些客户端

客户端地址[19]

image-20210531085114518

这里我们主要介绍这个 lettuce

Jedis 和 Lettuce

好久以前,在花菜🥦 lettuce 还没有出现以前,Springboot 默认使用的是Jedis

为啥现在默认使用的是这个 lettuce 呢?

作者的回答[20]

image-20210510080302046

嘿嘿,翻译下作者的原话  😝

  1. Jedis 是一个 直连Redis 客户端,在多线程环境下共享同一个 Jedis 实例,这是 线程不安全的。

  2. 在多线程环境中使用 Jedis 的方法是使用 连接池。每个使用 Jedis 的并发线程在 Jedis 交互期间获得自己的 Jedis 实例。连接池是以每个 Jedis 实例的物理连接为代价的,这增加了 Redis 连接的数量。

  3. lettuce 是建立在 netty 和连接实例(StatefulRedisConnection)之上,可以跨多个线程共享。因此,多线程应用程序可以使用单个连接而不考虑与 Lettuce 交互的并发线程数。

配置

<!-- spring boot redis 缓存引入 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- lettuce pool 缓存连接池 -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>
@Bean
public LettuceConnectionFactory lettuceConnectionFactory() {
    GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
    genericObjectPoolConfig.setMaxIdle(maxIdle);
    genericObjectPoolConfig.setMinIdle(minIdle);
    genericObjectPoolConfig.setMaxTotal(maxActive);
    genericObjectPoolConfig.setMaxWaitMillis(maxWait);
    genericObjectPoolConfig.setTimeBetweenEvictionRunsMillis(100);
    RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
    redisStandaloneConfiguration.setDatabase(database);
    redisStandaloneConfiguration.setHostName(host);
    redisStandaloneConfiguration.setPort(port);
    redisStandaloneConfiguration.setPassword(RedisPassword.of(password));
    LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
            .commandTimeout(Duration.ofMillis(timeout))
            .shutdownTimeout(Duration.ofMillis(shutDownTimeout))
            .poolConfig(genericObjectPoolConfig)
            .build();

    LettuceConnectionFactory factory = new LettuceConnectionFactory(redisStandaloneConfiguration, clientConfig);
    factory.setShareNativeConnection(false);
    return factory;
}

注意这里,如果想要连接池生效的话,必须设置配置这句话,不然默认只用一条连接的🐖

lettuceConnectionFactory.setShareNativeConnection(false);

特意去官网看了下,发现真是这样操作的 (⊙﹏⊙)

image-20210531220518918

而且当你配置了最小连接数时,你会发现在 redis 中,查到的连接数是比你配置的多一个,额 目前还不知道是啥连接,先记录下 嘿嘿~

img

最后

这篇文章就分享到这里啦,至于 常量池 又要夹着 jvm ,还有字节码这一块来说,所以还是另外再写✍ 啦!

嘿嘿,还记得第一个最后的内容吗~  就是那个在线程池的“最后”目录来 ,快去看看叭!

我是4ye 咱们下期再见!╰(°▽°)╯   

谢谢可爱又帅气的大佬们的观看!祝您 天天开心!😄  

 

 

感谢您的关注!您的每个关注,都是博主 肝肝肝的动力 😝

         

 

往期推荐

 

 

Java中的锁居然有这么多!

图解线程生命周期

站了5个小时换来的书单

面试官:线程有几种创建方式?

终于来到 ConcurrentHashMap 了~

fail-safe 和 fail-fast 硬核解析,让你和面试官多聊十分钟!

面试, HashMap,看?

 

 

 

 

参考资料

[1]

配置文档: https://gitee.com/nilera/Hystrix/wikis/Configuration

[2]

介绍: https://github.com/alibaba/druid/wiki/Druid%E8%BF%9E%E6%8E%A5%E6%B1%A0%E4%BB%8B%E7%BB%8D

[3]

常见问题: https://github.com/alibaba/druid/wiki/Druid%E8%BF%9E%E6%8E%A5%E6%B1%A0%E4%BB%8B%E7%BB%8D

[4]

数据库密码加密: https://github.com/alibaba/druid/wiki/%E5%A6%82%E4%BD%95%E5%9C%A8Spring-Boot%E4%B8%AD%E9%85%8D%E7%BD%AE%E6%95%B0%E6%8D%AE%E5%BA%93%E5%AF%86%E7%A0%81%E5%8A%A0%E5%AF%86%EF%BC%9F

[5]

连接泄漏监测 (RemoveAbandanded功能不建议): https://github.com/alibaba/druid/wiki/%E8%BF%9E%E6%8E%A5%E6%B3%84%E6%BC%8F%E7%9B%91%E6%B5%8B

[6]

如何配置多数据源: https://github.com/alibaba/druid/tree/master/druid-spring-boot-starter#%E5%A6%82%E4%BD%95%E9%85%8D%E7%BD%AE%E5%A4%9A%E6%95%B0%E6%8D%AE%E6%BA%90

[7]

如何配置 Filter: https://github.com/alibaba/druid/tree/master/druid-spring-boot-starter#%E5%A6%82%E4%BD%95%E9%85%8D%E7%BD%AE%E5%A4%9A%E6%95%B0%E6%8D%AE%E6%BA%90

[8]

如何获取 Druid 的监控数据: https://github.com/alibaba/druid/tree/master/druid-spring-boot-starter#%E5%A6%82%E4%BD%95%E9%85%8D%E7%BD%AE%E5%A4%9A%E6%95%B0%E6%8D%AE%E6%BA%90

[9]

DRDS: https://www.aliyun.com/product/drds

[10]

集成文档: https://github.com/alibaba/druid/tree/master/druid-spring-boot-starter

[11]

HttpClient 介绍: http://hc.apache.org/httpcomponents-client-4.5.x/current/tutorial/html/

[12]

Fundamentals: http://hc.apache.org/httpcomponents-client-4.5.x/current/tutorial/html/fundamentals.html

[13]

Connection management: http://hc.apache.org/httpcomponents-client-4.5.x/current/tutorial/html/connmgmt.html

[14]

HTTP state management: http://hc.apache.org/httpcomponents-client-4.5.x/current/tutorial/html/statemgmt.html

[15]

HTTP authentication: http://hc.apache.org/httpcomponents-client-4.5.x/current/tutorial/html/authentication.html

[16]

Fluent API: http://hc.apache.org/httpcomponents-client-4.5.x/current/tutorial/html/fluent.html

[17]

HTTP Caching: http://hc.apache.org/httpcomponents-client-4.5.x/current/tutorial/html/caching.html

[18]

Advanced topics: http://hc.apache.org/httpcomponents-client-4.5.x/current/tutorial/html/advanced.html

[19]

客户端地址: https://redis.io/clients#java

[20]

作者的回答: https://github.com/spring-projects/spring-session/issues/789

 

本文分享自微信公众号 - Java4ye(Java4ye)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部