本文最后更新于:4 个月前
破冰
🔥 线程池:
💥 异步编程:
🍖 锁:
线程池 线程池的创建(参数) 线程池的运行机制(理论 + 演示) 线程池的使用(实战 submit + CompatableFuture异步)
精髓所在 线程基础
线程状态总结
Java 创建线程的十种方式
🌮 推荐阅读:大家都说Java有三种创建线程的方式!并发编程中的惊天骗局! - 掘金 (juejin.cn)
继承 Thread 类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class ExtendsThread extends Thread { @Override public void run () { System.out.println("1......" ); } public static void main (String[] args) { new ExtendsThread ().start(); } }
实现 Runnable 接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class ImplementsRunnable implements Runnable { @Override public void run () { System.out.println("2....." ); } public static void main (String[] args) { new Thread (new ImplementsRunnable ()).start(); } }
实现 Callable 接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class ImplementsCallable implements Callable { @Override public Object call () { System.out.println("3...." ); return "hhh" ; } public static void main (String[] args) { FutureTask<String> futureTask = new FutureTask <>(new ImplementsCallable ()); new Thread (futureTask).start(); } }
使用 ExecutorService 线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class UseExecutorsService { public static void main (String[] args) { ThreadPoolExecutor threadPoolB = new ThreadPoolExecutor (3 , 5 , 0 , TimeUnit.SECONDS, new LinkedBlockingDeque <Runnable>(4 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor .AbortPolicy()); threadPoolB.submit(() -> { System.out.println("4B......" ); }); threadPoolB.shutdown(); ExecutorService threadPoolA = Executors.newFixedThreadPool(5 ); while (true ) { threadPoolA.execute(() -> { System.out.println("4.1...." + Thread.currentThread().getName()); }); try { Thread.sleep(1000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } } } }
使用 CompletableFuture 类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class UseCompletableFuture { public static void main (String[] args) { CompletableFuture.supplyAsync(() -> { System.out.println("5.1...." ); return "hhh" ; }); CompletableFuture.runAsync(() -> { System.out.println("5.2...." ); }); } }
基于 ThreadGroup 线程组
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class UseThreadGroup { public static void main (String[] args) { ThreadGroup threadGroup = new ThreadGroup ("groupName" ); new Thread (threadGroup, () -> { System.out.println("6...." + Thread.currentThread().getName()); }).start(); new Thread (threadGroup, () -> { System.out.println("6...." + Thread.currentThread().getName()); },"T2" ).start(); new Thread (threadGroup, () -> { System.out.println("6...." + Thread.currentThread().getName()); }).start(); } }
使用 FutureTask 类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class UseFutureTask { public static void main (String[] args) { FutureTask<String> futureTask = new FutureTask <>(() -> { System.out.println("7...." ); return "hhh" ; }); new Thread (futureTask).start(); } }
使用匿名内部类或 Lambda 表达式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class UseAnonymousClass { public static void main (String[] args) { new Thread (new Runnable () { @Override public void run () { System.out.println("8...." ); } }).start(); new Thread (() -> { System.out.println("8...." ); }).start(); } }
使用 Timer 定时器类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class UseTimer { public static void main (String[] args) { Timer timer = new Timer (); timer.schedule(new TimerTask () { @Override public void run () { System.out.println("9...." ); } }, 2000 , 1000 ); } }
使用 ForkJoin 线程池或 Stream 并行流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class UseForkJoinPool { public static void main (String[] args) { ForkJoinPool joinPool = new ForkJoinPool (); joinPool.execute(() -> { System.out.println("10...." ); }); List<String> list = Arrays.asList("10B......" ); list.parallelStream().forEach(System.out::println); } }
线程池 使用线程池的优势 构造线程池 核心参数介绍
corePoolSize(核心线程数)(2023/10/22晚)
maximumPoolSize(最大线程数)
keepAliveTime(闲置超时时长)
unit(闲置超时单位)
workQueue(任务队列)
threadFactory(线程工厂)
handler(拒绝策略)
线程池的工作原理 图文介绍 实操测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Configuration public class ThreadPoolExecutorConfig { @Bean public ThreadPoolExecutor threadPoolExecutor () { ThreadFactory threadFactory = new ThreadFactory () { private int count = 1 ; @Override public Thread newThread (Runnable r) { Thread thread = new Thread (r); thread.setName("线程" + count); count++; return thread; } }; ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor (2 , 4 , 100 , TimeUnit.MINUTES, new ArrayBlockingQueue <>(4 ), threadFactory); return threadPoolExecutor; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @RestController @RequestMapping("/queue") @Slf4j public class QueueController { @Resource private ThreadPoolExecutor threadPoolExecutor; ............................... }
1 2 3 4 5 6 7 8 9 10 11 12 13 @GetMapping("/get") public String get () { HashMap<String, Object> map = new HashMap <>(); int size = threadPoolExecutor.getQueue().size(); map.put("队列长度" , size); long taskCount = threadPoolExecutor.getTaskCount(); map.put("任务总数" , taskCount); long completedTaskCount = threadPoolExecutor.getCompletedTaskCount(); map.put("已完成任务数" , completedTaskCount); int activeCount = threadPoolExecutor.getActiveCount(); map.put("正在工作的线程数" , activeCount); return JSONUtil.toJsonStr(map); }
1 2 3 4 5 6 7 8 9 10 11 @GetMapping("/add") public void add (String name) { CompletableFuture.runAsync(() -> { log.info(Thread.currentThread().getName() + "正在执行中" ); try { Thread.sleep(600000 ); } catch (InterruptedException e) { e.printStackTrace(); } }, threadPoolExecutor); }
其他参数介绍 阻塞队列 线程工厂与自定义工厂 拒绝策略与自定义策略 线程池的工作机制
这部分内容我计划在新的博文中介绍,可以看这篇文章学习:
🥣 推荐阅读:
简单测试,理解线程池工作流程 自定义线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Configuration public class ThreadPoolExecutorConfig { @Bean public ThreadPoolExecutor threadPoolExecutor () { ThreadFactory threadFactory = new ThreadFactory () { private int count = 1 ; @Override public Thread newThread (@NotNull Runnable r) { Thread thread = new Thread (r); thread.setName("线程" + count); count++; return thread; } }; ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor (2 , 4 , 100 , TimeUnit.MINUTES, new ArrayBlockingQueue <>(4 ), threadFactory); return threadPoolExecutor; } }
在代码中,首先定义了一个ThreadFactory接口的实现类 ,用于创建线程 。(2023/10/15晚)
在newThread方法 中,通过创建线程并设置名称的方式来实现线程的创建 。每次创建线程时,都会使用一个计数器count,用于记录线程的数量,保证线程名称的唯一性 。
然后,使用ThreadPoolExecutor类创建一个线程池对象 。括号内的参数依次为:核心线程数、最大线程数、线程空闲时间、时间单位、任务队列以及线程工厂 。这些参数分别表示线程池的基本配置,如最小/最大线程数、线程空闲时间等。其中,任务队列使用了ArrayBlockingQueue,表示使用有界队列来存储线程任务 。
最后,将创建好的线程池对象返回 ,供其他地方进行调用和使用。
提交任务到线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @RestController @RequestMapping("/queue") @Slf4j public class QueueController { @Resource private ThreadPoolExecutor threadPoolExecutor; @GetMapping("/get") public String get () { HashMap<String, Object> map = new HashMap <>(); int size = threadPoolExecutor.getQueue().size(); map.put("队列长度" , size); long taskCount = threadPoolExecutor.getTaskCount(); map.put("任务总数" , taskCount); long completedTaskCount = threadPoolExecutor.getCompletedTaskCount(); map.put("已完成任务数" , completedTaskCount); int activeCount = threadPoolExecutor.getActiveCount(); map.put("正在工作的线程数" , activeCount); return JSONUtil.toJsonStr(map); } @GetMapping("/add") public void add (String name) { CompletableFuture.runAsync(() -> { log.info(Thread.currentThread().getName() + "正在执行中" ); try { Thread.sleep(600000 ); } catch (InterruptedException e) { e.printStackTrace(); } }, threadPoolExecutor); } }
这段代码是一个简单的示例,在Spring Boot中使用自定义的线程池 。首先通过@Resource注解将之前配置好的线程池对象threadPoolExecutor注入进来,然后定义了两个接口方法:get和add 。
在get方法中,通过调用线程池对象的不同方法,获取了线程池的一些状态信息 ,如队列长度、任务总数、已完成任务数和正在工作的线程数 ,并将这些信息封装进一个HashMap 中,最后使用 JSONUtil.toJsonStr 方法将其转换成JSON格式的字符串 返回。
在add方法中,使用 CompletableFuture.runAsync 方法来在线程池中执行一个任务 。这里的任务是一个简单的代码块,通过log输出当前线程的名称,并休眠10分钟。通过将线程池对象threadPoolExecutor作为参数传递给runAsync方法,使得任务在该线程池中执行。
通过这段代码,我们可以在Spring Boot项目中方便地使用自定义的线程池,并对其进行状态监控和管理 。(2023/10/15晚)
由 ThreadPoolExecutorConfig 配置可知,我们自定义的线程池参数设置如下:
核心线程数:2
最大线程数:4
任务队列数:4
超时等待时间:未设置,默认拒绝
时间单位:秒(SECEND)
我们开始测试,依次添加任务,观察执行的线程数和任务队列数,详情如下:
简单描述一下测试情况吧:
当正在运行的线程数未达到核心线程数阈值 时,优先添加线程处理新任务
当正在运行的线程数达到核心线程数阈值,但任务队列未满 时,优先将任务放入任务队列中
当任务队列放满后,但正在运行的线程数未达到最大线程数阈值 时,优先添加线程处理新任务
当正在运行的线程数达到最大线程数阈值后 ,采用合适的拒绝策略(这里我们采用默认的拒绝策略:直接扔掉这个任务)
测试完成(2023/10/15晚)
真实业务场景下的实战 批量插入用户记录
1 2 3 4 StopWatch stopWatch = new StopWatch (); stopWatch.start();
限制可创建的最大线程数量,限制每个线程最大可插入的用户记录条数:
1 2 3 4 int batchSize = 1000 ;int maxSize = 100 ;
异步条件下,执行批量插入,并使用异步任务集合 futureList
维护所有正在执行中的异步任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 for (int i = 0 , j = 0 ; i < maxSize; i++) { ArrayList<User> userList = new ArrayList <>(); while (true ) { j++; User user = new User (); user.setUserAccount("memory" + "_" + (UUID.randomUUID() + "" ).substring(0 , 8 )); String password = DigestUtils.md5DigestAsHex((SALT + 12345678 ).getBytes()); user.setUserPassword(password); user.setAvatarUrl("https://fastly.jsdelivr.net/npm/@vant/assets/ipad.jpeg" ); user.setGender("0" ); user.setPhone("18535854763" ); user.setEmail("3348407547@qq.com" ); user.setUserStatus(0 ); user.setUserRole(0 ); user.setPlanetCode("17625" ); user.setTags("[\"女\",\"Vue\",\"Python\",\"在校本科\",\"发呆\",\"emo中\"]" ); userList.add(user); if (j % batchSize == 0 ) { break ; } } CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("threadName: " + Thread.currentThread().getName()); userService.saveBatch(userList, batchSize); }); futureList.add(future); }
结束所有异步任务,计时结束,计算批量插入数据所消耗的时间:
1 2 3 4 5 6 7 CompletableFuture<Void> future = CompletableFuture.allOf(futureList.toArray(new CompletableFuture []{})); future.join(); stopWatch.stop(); System.out.println(stopWatch.getTotalTimeMillis());
异步查询图片数据
1 2 3 4 5 6 7 8 9 10 11 CompletableFuture<Page<PostVO>> postTask = CompletableFuture.supplyAsync(() -> postDataSource.search(searchText, pageSize, current)); CompletableFuture<Page<ArticleVO>> articleTask = CompletableFuture.supplyAsync(() -> articleDataSource.search(searchText, pageSize, current)); CompletableFuture<Page<Picture>> pictureTask = CompletableFuture.supplyAsync(() -> pictureDataSource.search(searchText, pageSize, current));
1 CompletableFuture.allOf(postTask, pictureTask, articleTask).join();
1 2 3 4 5 6 7 8 9 10 11 12 13 try { Page<PostVO> postVOPage = postTask.get(); Page<Picture> picturePage = pictureTask.get(); Page<ArticleVO> articlePage = articleTask.get(); searchVO = new SearchVO (); searchVO.setPostVOList(postVOPage.getRecords()); searchVO.setPictureList(picturePage.getRecords()); searchVO.setArticleList(articlePage.getRecords()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException (e); }
AI服务处理请求任务
这个需求场景是我在做 Memory BI智能分析平台 时遇到的:调用第三方AI服务处理用户请求
自定义线程池:(2024/01/09晚)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Configuration public class ThreadPoolExecutorConfig { @Bean public ThreadPoolExecutor threadPoolExecutor () { ThreadFactory threadFactory = new ThreadFactory () { private int count = 1 ; @Override public Thread newThread (@NotNull Runnable r) { Thread thread = new Thread (r); thread.setName("线程" + count); count++; return thread; } }; ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor (2 , 4 , 100 , TimeUnit.MINUTES, new ArrayBlockingQueue <>(4 ), threadFactory); return threadPoolExecutor; } }
1 2 3 String name = genChartByAiRequest.getName();String goal = genChartByAiRequest.getGoal();String chartType = genChartByAiRequest.getChartType();
1 2 User loginUser = userService.getLoginUser(request); ThrowUtils.throwIf(loginUser == null , ErrorCode.NOT_LOGIN_ERROR, "请先登录后再尝试调用接口" );
限流(限制用户的调用次数,以用户id为key,区分各个限流器):
1 2 redisLimiterManager.doRateLimit("genCharByAi_" + loginUser.getId());
提取图表名信息、分析需求(分析目标 图表类型),做好参数校验:
1 2 3 4 5 6 ThrowUtils.throwIf(StringUtils.isNotBlank(name) && name.length() > 100 , ErrorCode.PARAMS_ERROR, "名称过长" ); ThrowUtils.throwIf(StringUtils.isBlank(goal), ErrorCode.PARAMS_ERROR, "分析目标为空" ); ThrowUtils.throwIf(StringUtils.isBlank(chartType), ErrorCode.PARAMS_ERROR, "分析图表类型为空" );
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 long size = multipartFile.getSize(); ThrowUtils.throwIf(size > ONE_MB, ErrorCode.PARAMS_ERROR, "文件超过 1M" );String originalFilename = multipartFile.getOriginalFilename();String suffix = FileUtil.getSuffix(originalFilename); ThrowUtils.throwIf(!VALID_FILE_SUFFIX_LIST.contains(suffix), ErrorCode.PARAMS_ERROR, "文件后缀非法" );StringBuilder userInput = new StringBuilder ();String excelToCsv = ExcelUtils.excelToCsv(multipartFile); userInput.append("\n" ) .append("分析需求:" ).append("\n" ) .append(goal).append(", " ).append("请生成一张" ).append(chartType).append("\n" ) .append("原始数据:" ).append("\n" ) .append(excelToCsv);
1 2 3 4 5 6 7 8 9 Chart chart = new Chart (); chart.setName(name); chart.setGoal(goal); chart.setChartType(chartType); chart.setChartData(excelToCsv); chart.setUserId(loginUser.getId()); chart.setStatus(ChartStatusEnum.WAIT.getValue());boolean save = save(chart); ThrowUtils.throwIf(!save, ErrorCode.OPERATION_DATABASE_ERROR, "插入图表信息失败" );
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 CompletableFuture.runAsync(() -> { Chart updateChart = new Chart (); updateChart.setId(chart.getId()); updateChart.setStatus(ChartStatusEnum.RUNNING.getValue()); boolean updateById = updateById(updateChart); ThrowUtils.throwIf(!updateById, ErrorCode.OPERATION_DATABASE_ERROR, "更新图表执行中状态失败" ); String result = aiManager.doChat(AiConstant.BI_MODEL_ID, userInput.toString()); if (result == null ) { Chart updateChartResult = new Chart (); updateChartResult.setId(chart.getId()); updateChartResult.setStatus(ChartStatusEnum.FAILED.getValue()); boolean updateResultById = updateById(updateChartResult); ThrowUtils.throwIf(!updateResultById, ErrorCode.OPERATION_DATABASE_ERROR, "更新图表失败状态失败" ); } String[] split = result.split("【【【【【" ); String genChart = split[1 ]; String genResult = split[2 ]; Chart updateChartResult = new Chart (); updateChartResult.setId(chart.getId()); updateChartResult.setStatus(ChartStatusEnum.SUCCEED.getValue()); updateChartResult.setGenChart(genChart); updateChartResult.setGenResult(genResult); boolean updateResultById = updateById(updateChartResult); ThrowUtils.throwIf(!updateResultById, ErrorCode.OPERATION_DATABASE_ERROR, "更新图表成功状态失败" ); }, threadPoolExecutor);
1 2 3 4 BiResponse biResponse = new BiResponse (); biResponse.setUserId(loginUser.getId());return biResponse;
异步编程 CompletableFuture 简介 概述 优势与特点 CompletableFuture 基本用法 小试牛刀
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class ThreadTask implements Runnable { public static int index = 0 ; public ThreadTask () { } public void run () { Thread.currentThread().setName("MyThread: " + index++); System.out.println(Thread.currentThread().getName()); try { Thread.sleep(3000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } } }
创建线程池,新增线程并使用 CompletableFuture 对象异步执行线程,实现并发:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class ThreadPool3 { public static void main (String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor (2 , 4 , 1000 , TimeUnit.MILLISECONDS, new ArrayBlockingQueue <Runnable>(4 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor .AbortPolicy()); for (int i = 0 ; i < 8 ; i++) { CompletableFuture.runAsync(new ThreadTask (), threadPoolExecutor); } } }
在上面的代码中,我们还使用了三种方法 来处理异步执行任务 :
threadPoolExecutor.execute(new ThreadTask())
: 这个方法将新的任务提交给线程池来执行。新的任务将创建一个新的线程来执行。如果线程池已满,新提交的任务将会等待现有任务完成后再开始执行。
Future<?> submit = threadPoolExecutor.submit(new ThreadTask())
: 这个方法将新的任务提交给线程池,但不立即执行,而是返回一个Future对象。Future对象表示任务的结果。在等待任务结果时,你还可以对其他任务进行调度,这是使用Future的优点。如果线程池已满,新提交的任务也会等待现有任务完成后再开始执行。
CompletableFuture.runAsync(new ThreadTask(), threadPoolExecutor)
: 这个方法用于提交一个异步任务到线程池中执行。这个方法创建了一个 CompletableFuture 对象,该对象允许你使用回调或future对象来等待任务的完成并获取结果。这种方法的优点是可以并发执行多个任务,无需等待上一个任务完成就开始下一个任务。
先简单了解一下吧,稍后我们会详细说明的(2023/10/22晚)
运行程序,得到执行结果如下:
创建 CompletableFuture 对象 获取任务结果 异步回调 多任务组合回调 异常处理与错误处理 自定义线程池与资源管理 并发任务的调度与控制 IO操作与网络请求 Java 锁机制
🔥 推荐阅读:(2023/10/24晚)
总结
悲观锁:悲观锁总是假设最坏的情况 ,认为在高并发情况下,共享变量每次被访问都会出现问题,所以每个线程在获取资源时都会加锁。激烈的锁竞争会导致线程阻塞和频繁的上下文切换,大大增加系统的性能开销,还可能引发死锁问题
乐观锁:乐观锁总是假设最好的情况 ,认为共享资源被访问时不会出现问题,线程无需加锁也无需等待,不存在锁竞争和线程阻塞。只需在提交修改时确认该资源是否被其他线程修改了,这就是验证线程之间是否发生冲突了,一般采用版本号机制 或 CAS 算法实现 。如果线程提交修改频繁失败、频繁重试,同样会大大增加系统性能开销。
乐观锁还存在 ABA 问题 ,即错误地判断要修改的资源没有被其他线程修改,可以通过追加版本号 或引入时间戳 来解决
总体来说,悲观锁适用于写比较多 的场景,避免频繁失败和频繁重试影响性能;乐观锁适用于写比较少 的情况,避免频繁加锁影响性能(2023/10/27晚)
亮点集锦