如果遇到高耗时操作肯定首先想到的就是通过多线程来实现,但是如果再加上一层事务控制就比较难办。之前根据二阶段提交的思路实现了多线程事务的控制,感觉好像没问题了。
但是最近遇到一个问题,看起来很无解。如果任务量少,需要使用的线程数量也少,这种场景通过二阶段提交实现多线程事务控制是没有问题的。
但是如果任务量很多的情况下:线程池的池子内部线程数量是有限的,所以任务量再大,同时在运行的也只能是线程池容量大小的任务数。在这个前提下,二阶段提交是需要等所有线程一阶段执行结束后,进入二阶段,统一回滚或者提交,回滚或者提交都是需要在各自线程内部进行。
比如任务量是 20 个,线程池容量是 10 ,一次性只能有 10 个任务在执行,这个限制下,仅仅能实现 10 个任务的统一提交或者回滚。描述大概就这样,需要代码的话再贴
看有没有别的思路,我卡住了。
1
BuffDog 336 天前
资源限制,1.要么排队处理,要么直接抛弃 2.扩大资源限制
菜鸟想法 |
2
janwarlen 336 天前
通过 sql 手动开启事务和提交就可以了
|
3
RichardX2023 OP @janwarlen 确实是手动提交的,但是问题还是存在的
|
4
RichardX2023 OP @BuffDog 1 、排队,20 个任务,线程池只能处理 10 个处理结束后就得考虑提交或者回滚了,然后 10 个任务结束,继续下面 10 个任务。假如前 10 个任务提交,后 10 个需要回滚,前面 10 个任务的提交与后面的无法做到事务的一致性
|
5
RichardX2023 OP |
6
MakHoCheung 336 天前 1
你是一个任务内部开启一个事务,为啥不是一个事务内执行这 20 个任务呢?
另外,看代码,线程池处理完 10 个任务后 CountDownLatch 不会放行啊 |
7
BuffDog 336 天前
@RichardX2023 那你这堆有关联的任务,是不是得走在一个事务里
|
8
ppto 336 天前
按我的理解,多线程事务 等同于 分布式事务 最后就是最终一致性。
|
9
BQsummer 336 天前
@MakHoCheung spring 的事务信息是在 threadlocal 里的, 前 10 个任务不放行, 后 10 个任务就没线程取执行了; 释放了, 事务就没法提交或回滚了
|
10
RichardX2023 OP @BQsummer 是滴,目前我的观点看,二阶段提交无解
|
11
RichardX2023 OP @MakHoCheung 问题不是因为耗时吗,需要节省时间就需要多线程。多线程就得多事务
|
12
RichardX2023 OP @BuffDog 多个事务,只是需要协同控制一起提交和回滚
|
13
RichardX2023 OP @ppto 对的,我这个实现的思路就是来自 seata ,阿里的那个分布式事务服务。
|
14
BQsummer 336 天前
不要用声明式事务, 自己管理, 比如 https://developer.aliyun.com/article/1203834
|
15
RichardX2023 OP |
16
RichardX2023 OP 好,我研究下,忘记还有这种方式了
|
17
RichardX2023 OP @BQsummer 好的,忘记还有这种方式了
|
18
MakHoCheung 336 天前
@RichardX2023 哦,懂了,那就是相当于分布式事务
|
19
ZZ74 336 天前
这和线程多少有什么关系?也不需要分布式事务。这一批任务无论多少,开多少线程,从头到尾只是用一个数据库连接。 开始时关闭自动提交,所有任务跑完 commit 一下就好了
|
20
jli100 336 天前
@RichardX2023 问一下,你贴的这个完整代码,不是跟你上面贴的有问题的代码差不多嘛。具体差别是 ?
|
21
monmon 336 天前
我理解你想要的可能是下面这两种方法
普通多线程处理的方法 ``` Java TaskExecutor taskExecutor = SpringUtil.getTaskExecutor(); CompletableFuture<Integer>[] tasks = new CompletableFuture[100]; for (int i = 0; i < 100; i++) { int fi = i; tasks[i] = CompletableFuture.supplyAsync(() -> fi / 2, taskExecutor); } // 等待所有任务完成 CompletableFuture.allOf(tasks); for (CompletableFuture<Integer> task : tasks) { Integer result = null; try { result = task.get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } System.out.println(result); } ``` 如果使用了 JDK21 可以尝试结构化并发 Shutdown policies 处理 https://openjdk.org/jeps/453#:~:text=custom%20shutdown%20policies.-,Shutdown%20policies,-When%20dealing%20with |
22
RichardX2023 OP @ZZ74 当然有关系,所有任务跑完 commit ,线程池只有 10 个线程一次只能执行 10 个任务怎么全跑完,10 个任务执行完了是 commit 还是 rollback ,后边还有任务呢
|
23
RichardX2023 OP @jli100 差不多,能看得清晰一点
|
24
RichardX2023 OP @monmon 老项目升级 jdk 比较困难
|
25
Plutooo 333 天前
好问题,但是我能想到的跟大伙想到的差不多,无非就是扩大线程池或者使用无边界线程池,在业务上控制提交任务的数量避免资源耗尽,楼主要是有好的解决办法了圈一下
|
26
RichardX2023 OP @BQsummer https://developer.aliyun.com/article/1203834
这种方式的确是可行的,它实实在在的解决了事务的提交回滚在各个子线程处理的问题,但是,它也有自己的局限性, ```sqlSession.getMapper() ```问题就在这行代码,就是说只有通过这个方法获取的 Mapper 才能进行事务控制,通过 spring 周期管理的 Mapper 还是不受这个事务控制的,这个方法获取的 Mapper 是通过代理又创建了一个新的实例。 那么它的实现逻辑也就不能实现我的诉求,我希望是这个多线程工具对业务代码没有侵入性,也就是业务代码只要实现一个 Runnable 或者 Callable 就可以直接使用,这个显然需要对业务代码进行改动的 |
29
RichardX2023 OP |
30
RichardX2023 OP 多线程操作适合用于查询、插入、删除,可能不适合更新。
更新操作数据库会自动上行锁,假如两个任务操作了同一行数据。 任务一拿到行锁-完成任务-等待任务二完成一起 commit 并释放锁, 任务二等待行锁占用者释放行锁,它得拿到锁才能进行更新。 所以任务一 一直等待事务提交释放行锁 任务二一直处于行锁等待状态。 这就是一个死锁状态。 |
31
ppto 332 天前
我理解不是这样,例如一个 mysql 连接,被 A 线程 和 B 线程共享,A 线程发起一个查询请求,mysql 服务返回查询结果。这时,可能是 B 线程因为 fd 可读事件被唤醒,也可能是 A ,或者 A 和 B 都会被唤醒。我记不太清了但是倾向于 AB 都会因为 fd 可读事件被唤醒。
|
32
ZZ74 332 天前
@ppto
OP 和我们说的不在同一个线~~~~~ 所以我才说上锁 lock conn exec select & read result unlock 这样就不会了。 @RichardX2023 共享同一个数据连接的两个任务 是不会死锁的。 |
33
Aresxue 332 天前
@RichardX2023 核心原因是 java.sql.Connection 不是线程安全的,所以每个线程只能独占一个链接,对于你说的 20 个任务 10 个 work 线程的情况可以对线程池加以改造,提交任务(必须是批量提交)的时候根据剩余可用连接分配一定数目的链接(这个算法决定了这个策略的健壮性),比如 20 个任务可以分配 5 个链接,然后每 4 个任务共享一个链接,当然这四个之间就要排队了对耗时的优化肯定没有一个任务一个线程效果这么好,但并发度确实也从 1 变成了 5 。
|
34
siweipancc 332 天前 via iPhone
@monmon 昨天在看虚拟线程,第三方库甚至 spring 对这个的兼容也不是很好,瞅了一下结构化这个孵化项目,我觉得会难产,除非 JPA4.0 规范大改。
当然结构化对楼主这种需求确实是符合的(可以实现快速失败),还有个局部作用域值(预览版本)可以使用。 |
35
RichardX2023 OP |
36
ZZ74 332 天前
@RichardX2023
trx_id 说明了一切.... |
37
RichardX2023 OP @ZZ74 说明什么,我从头到尾写的多线程,本来就是多事务
|