mysql日志bingLog,redoLog,undoLog,java监听binlog

mysql日志bingLog,redoLog,undoLog

Posted by John Doe on 2023-04-17
Words 6.4k and Reading Time 27 Minutes
Viewed Times

binlog日志

binlog用于记录数据库执行的写入性操作(不包括查询)信息,以二进制的形式保存在磁盘中。binlog是mysql的逻辑日志,并且由Server层进行记录,使用任何存储引擎的mysql数据库都会记录binlog日志。

  • 逻辑日志:可以简单理解为记录的就是sql语句。
  • 物理日志:因为mysql数据最终是保存在数据页中的,物理日志记录的就是数据页变更。

binlog是通过追加的方式进行写入的,可以通过max_binlog_size参数设置每个binlog文件的大小,当文件大小达到给定值之后,会生成新的文件来保存日志。

binlog使用场景

在实际应用中,binlog的主要使用场景有两个,分别是主从复制和数据恢复。

  • 主从复制:在Master端开启binlog,然后将binlog发送到各个Slave端,Slave端重放binlog从而达到主从数据一致。
  • 数据恢复:通过使用mysqlbinlog工具来恢复数据。

binlog刷盘时机

对于InnoDB存储引擎而言,只有在事务提交时才会记录biglog,此时记录还在内存中,那么biglog是什么时候刷到磁盘中的呢?mysql通过sync_binlog参数控制biglog的刷盘时机,取值范围是0-N:

  • 0:不去强制要求,由系统自行判断何时写入磁盘;
  • 1:每次commit的时候都要将binlog写入磁盘;
  • N:每N个事务,才会将binlog写入磁盘。

从上面可以看出,sync_binlog最安全的是设置是1,这也是MySQL 5.7.7之后版本的默认值。但是设置一个大一些的值可以提升数据库性能,因此实际情况下也可以将值适当调大,牺牲一定的一致性来获取更好的性能。

binlog日志格式

binlog日志有三种格式,分别为STATMENT、ROW和MIXED。

在 MySQL 5.7.7之前,默认的格式是STATEMENT,MySQL 5.7.7之后,默认值是ROW。日志格式通过binlog-format指定。

STATMENT

基于SQL语句的复制(statement-based replication, SBR),每一条会修改数据的sql语句会记录到binlog中。

  • 优点:不需要记录每一行的变化,减少了binlog日志量,节约了IO, 从而提高了性能;
  • 缺点:在某些情况下会导致主从数据不一致,比如执行sysdate()、sleep()等。

ROW

基于行的复制(row-based replication, RBR),不记录每条sql语句的上下文信息,仅需记录哪条数据被修改了。

  • 优点:不会出现某些特定情况下的存储过程、或function、或trigger的调用和触发无法被正确复制的问题;
  • 缺点:会产生大量的日志,尤其是alter table的时候会让日志暴涨

MIXED

基于STATMENT和ROW两种模式的混合复制(mixed-based replication, MBR),一般的复制使用STATEMENT模式保存binlog,对于STATEMENT模式无法复制的操作使用ROW模式保存binlog

redolog日志

为什么需要redolog

我们都知道,事务的四大特性里面有一个是持久性,具体来说就是只要事务提交成功,那么对数据库做的修改就被永久保存下来了,不可能因为任何原因再回到原来的状态。那么mysql是如何保证一致性的呢?最简单的做法是在每次事务提交的时候,将该事务涉及修改的数据页全部刷新到磁盘中。但是这么做会有严重的性能问题,主要体现在两个方面:

  • 因为Innodb是以页为单位进行磁盘交互的,而一个事务很可能只修改一个数据页里面的几个字节,这个时候将完整的数据页刷到磁盘的话,太浪费资源了!
  • 一个事务可能涉及修改多个数据页,并且这些数据页在物理上并不连续,使用随机IO写入性能太差!

因此mysql设计了redo log,具体来说就是只记录事务对数据页做了哪些修改,这样就能完美地解决性能问题了(相对而言文件更小并且是顺序IO)。

redolog 在磁盘里的结构

redolog 在内存中的存放位置叫 log buffer,我们为了了解 redolog 应该在哪个页的哪个偏移量写,提供了一个叫 buf_free 的全局变量,该变量指明后续写入的 redo 日志应该写到 log buffer 的哪个位置

那 redolog 在磁盘中又是如何存储的呢?

硬盘上存储的 redo log 日志文件不只一个,而是以一个日志文件组的形式出现的,每个的 redo 日志文件的大小都是一样的

比如可以配置为一组4个文件,每个文件的大小是 1GB,整个 redo log 日志文件组可以记录 4G 的内容

每个文件中都包含了多个页,并且这些页负责的内容可能不一样,每个文件的前2048个字节(也就是前四页)用来存储一些管理信息,之后的页就是用来存储 redolog 的普通页了

日志文件组采用的是环形数组形式,从头开始写,写到末尾又回到头循环写.为了管理这个日志文件组,我们显然需要一些全局变量,比如记录了当前写在哪里的偏移量,一共写了多少数据等等内容

redolog 相关全局变量

lsn,也就是 log sequence number 这个全局变量是记录当前总共写入的 redo 日志量的。在 mysql 开机时,这个值为 8704(一条数据都没写入 lsn 就是8704)。每次写入 x 字节的数据,这个值就加 x。同时,他还会将遇到的 log block header 和 log block trailer 占用的字节数加上(也就是页头和页尾占用的字节数)

而 buf_free 是下一次写入的记录的偏移量,一边写一边后移,它用来记录当前事务产生的 redo log 文件

flushed_to_disk_lsn 是用来标记当前 log buffer 中已经有那些日志被刷新到磁盘中了,该值表达的是内存中该值一开始也是8704,因此 lsn >= flushed_to_disk_lsn。log buffer 就是在磁盘中存放 redolog 的地方,log buffer 比之日志文件组就类似于 buffer pool 比之磁盘中的 Innodb

checkpoint 指的是一次刷新全局变量 checkpoint_lsn 的操作,MySQL 中可以使用 lsn 来唯一确定 redolog 位置,而 checkpoint_lsn 就指向当前可以被擦除的位置。日志文件组的大小是有限的,我们不得不循环使用日志文件组中的文件,但是如果某些日志在该日志代表的数据被刷入磁盘之前就被清理掉了,日志就没有意义了。因此可以被擦除的地方就是数据已经刷入磁盘的地方,一边擦一边往后推移,MySQL 加载日志文件组恢复数据时,会清空加载过的 redo log 记录。如果 MySQL 一直不崩溃,redolog 记录满了的话,MySQL会自动刷盘并且删除一些 redo log 记录,让 checkpoint_lsn 向后推

注意,执行一次 checkpoint 与将脏页刷新到磁盘中是不同步的,因此 checkpoint_lsn 不能代表刷新到磁盘中的数据的最新位置

redolog 恢复数据库的过程

至此,我们对数据库有了一个完整的日志记录,那如何使用这个日志记录功能呢,我们如何将数据从这个日志文件组中取出来并且用于恢复数据库呢

由于之前的全局变量与日志文件组配合记录,在 MySQL 中已经有了充足的信息。我们可以拿到需要恢复的起点(全局变量 checkpoint_lsn),和恢复的终点(可以用 lsn 来表示)。这两个值都能唯一确定一个日志文件组中的位置,并且里面的数据都保证正确性(每一条数据的末尾都有唯一标识),不会出现没有被刷入磁盘的数据对应的日志被刷掉的情况(checkpoint)

获取到日志后,我们可以一条条读取数据并且恢复,但是 MySQL 的设计者有更加快速的方法,将每个日志的 spaceID 与 page number 计算出哈希值并且存入 hash 表中,如果有多个 spaceID 与 page number 都相同的日志,将它们放入一个槽中。这样遍历槽,就可以一次性将一个页面恢复好,从而避免很多随机 IO,加快了恢复速度

这么恢复还有一点要注意,我们需要根据时间来恢复,不然最终的数据不保证正确性。同时,由于 checkpoint_lsn 与刷入磁盘的机制不一样,因此可能出现数据已经刷入了,但是日志还需要重新操作一遍的情况。重新操作一遍不会导致任何错误,但是可以优化掉这些过程,在每个页面中的文件头有个 FIL_PAGE_LSN 的属性,该属性记录了最新一次对该页面修改的日志的 lsn。我们只需要简单的判断就可以略过重新写一遍的过程了

undolog日志

数据库事务四大特性中有一个是原子性,具体来说就是 原子性是指对数据库的一系列操作,要么全部成功,要么全部失败,不可能出现部分成功的情况。

实际上,原子性底层就是通过undo log实现的。undo log主要记录了数据的逻辑变化,比如一条INSERT语句,对应一条DELETE的undo log,对于每个UPDATE语句,对应一条相反的UPDATE的undo log,这样在发生错误时,就能回滚到事务之前的数据状态。

undolog 的作用

为什么有了 redolog 还需要 undolog?这就要从磁盘性能的优化说起

Commit Logging 有一个巨大的先天缺陷:所有对数据的真实修改都必须发生在事务提交以后,即日志写入了 Commit Record 之后。在此之前,即使磁盘有足够空闲,即使某个事务修改的数据量非常庞大,占用了大量的内存缓冲区,无论何种理由,都决不允许在事务提交之前就修改磁盘上的数据,这一点是 Commit Logging 成立的前提(因为我们我们不能使用 redolog 删除错误的记录,只能用它重做正确的记录),却对提升数据库的性能十分不利。为此,ARIES 提出了“提前写入日志”(Write-Ahead Logging)的日志改进方案,所谓“提前写入”(Write-Ahead),就是允许在事务提交之前写入变动数据的意思

Write-Ahead Logging 按照事务提交时点,将何时写入变动数据划分为 FORCE 和 STEAL 两类情况
·FORCE:当事务提交后,要求变动数据必须同时完成写入则称为 FORCE,如果不强制变动数据必须同时完成写入则称为 NO-FORCE
·STEAL:在事务提交前,允许变动数据提前写入则称为 STEAL,不允许则称为 NO-STEAL

Write-Ahead Logging 允许 NO-FORCE,也允许 STEAL,它给出的解决办法是增加了另一种被称为 Undo Log 的日志类型,当变动数据写入磁盘前,必须先记录 Undo Log,注明修改了哪个位置的数据、从什么值改成什么值等,以便在事务回滚或者崩溃恢复时根据 Undo Log 对提前写入的数据变动进行擦除

由于 Undo Log 的加入,Write-Ahead Logging 在崩溃恢复时会经历以下三个阶段

分析阶段(Analysis):该阶段从最后一次检查点(Checkpoint,可理解为在这个点之前所有应该持久化的变动都已安全落盘)开始扫描日志,找出所有没有 End Record 的事务,组成待恢复的事务集合,这个集合至少会包括事务表(Transaction Table)和脏页表(Dirty Page Table)两个组成部分
重做阶段(Redo):该阶段依据分析阶段中产生的待恢复的事务集合来重演历史(Repeat History),具体操作是找出所有包含 Commit Record 的日志,将这些日志修改的数据写入磁盘,写入完成后在日志中增加一条 End Record,然后移出待恢复事务集合
回滚阶段(Undo):该阶段处理经过分析、重做阶段后剩余的恢复事务集合,此时剩下的都是需要回滚的事务,它们被称为 Loser,根据 Undo Log 中的信息,将已经提前写入磁盘的信息重新改写回去,以达到回滚这些 Loser 事务的目的

undolog 的链表

一个页面只能存储一种类型的 undo 日志,不可以混合存储,之所以做出区分,是因为 insert 类型的日志可以在事务提交后直接删除,而 update 类型的由于需要为 MVCC 服务,因此要区别对待

trx_undo_page_start:第一条undo日志在本页面中的起始偏移量
trx_undo_page_free:最后一条undo日志结束时偏移量
trx_undo_page_node:链表节点结构
从页面的角度上来说,需要注意的点就这些了,但是从事务的角度上说,知识点还没结束。一个事务可能产生很多日志,这些日志在一个页面中可能放不下,那么就需要放到更多的页面中,这些页面就通过 trx_undo_page_node 形成了一个链表

链表中的第一个 undo 页面称为 first undo page,其余称为 normal undo page,因为第一个页面除了 undo page header 还有一些其他的管理信息,即 undo log segment header

一个事务的执行过程中,增删改的操作都会有,因为一个 undo 页面只能存放一种类型,所以一个事务的执行过程中可能有两种链表,一个是 insert undo 链表,一个是 update undo 链表

此外,Innodb 还规定,普通表和临时表的 undo 日志也要分别记录,所以一个事务中如果同时对临时表,普通表进行增删改操作,就会有4个链表

undo log segment header 拥有的部分属性如下:

1,trx_undo_state:本 undo 页面链表处于什么状态,我们可以用该属性了解事务是否结束,可能的状态有下面几种:

trx_undo_active:活跃状态,即一个活跃的事务正在向这个Undo页面链表中写入Undo日志。
trx_undo_cached:被缓存状态,该状态的Undo页面链表等待被其他事务重用。
trx_undo_to_free:等到被释放的状态,对于insert undo类型,在其对应的事务提交后,该链表不会被重用,就是这种状态
trx_undo_purge:等待被purge的状态,对于update undo类型,如果在其对应的事物提交后,该链表不能被重用,则处于这种状态
2,trx_undo_fseg_header:本 Undo 页面链表对应的段的 Segment Header 信息

3,trx_undo_page_list:Undo 页面链表的基节点,用于串联起其他页面的 trx_undo_page_node 属性,形成一个链表

因此,一个事务的 undolog 不是由页为单位来管理的,而是由链表来管理的,那应该需要一个整合的地方来管理这些链表吧,我们提出了回滚段的概念。回滚段就是被称为 Rollback Segment Header 的页面,这个页面中存放了各个 Undo 页面链表的 first undo page 的页号,这些页号被称为 undo slot。这样我们就可以提供 slot 来找到对应的链表头了

一个事务在执行过程中最多分配4个undo页面链表,一个回滚段中只有1024个undo slot,意味着同时只支持1024个事务的并发,

为了支持更多的事务执行,Innodb定义了128个回滚段,因此可以支持更多的事务,这些回滚段的页面存在系统表空间的第五个页面的一个区域中

事务分配 undolog 的过程

事务首次修改普通表的记录时,先去系统表空间的5号页面中分配到一个回滚段,之后该事务再修改记录时,不会重复分配,多个回滚段的分配方式使用 round-robin 来分配,从第一大类中循环分配回滚段给多个事务

分配到回滚段后,查看回滚段的两个 cached 链表是否有缓存的 undo slot,不同的操作看不同的链表,insert 类的看 insert undo cached,update 类型的看 update undo cached

如果在缓存中没找到,就从回滚段中分配一个可用的 undo slot

找到可用的 undo slot,如果该 slot 是从缓存链表中获取的,其 Undo Log Segment 已经分配,否则就需要重新分配一个 Undo Log Segment,然后从该 Segment 中申请一个页面作为 Undo 页面链表的 first undo page,并把该页填入 undo slot 中

事务开始写入日志到 Undo 页面链表中

shyiko监听解析binlog日志

shyiko项目地址

binlog监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import com.alibaba.nacos.api.config.annotation.NacosValue;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.google.common.collect.Maps;
import com.pwc.sdc.FPA.fms.event.AssembleCostEvent;
import com.pwc.sdc.FPA.fms.event.AssembleHeadcountEvent;
import com.pwc.sdc.FPA.utils.FPAStringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;


@Slf4j
@Component
public class BinLogListener implements ApplicationRunner {

//主库地址
@NacosValue("${spring.datasource.master.url}")
private String url;

//主库username
@NacosValue("${spring.datasource.master.username}")
private String username;

//主库密码
@NacosValue("${spring.datasource.master.password}")
private String password;

@Autowired
@Qualifier("threadPoolTaskExecutor")
private ThreadPoolTaskExecutor executor;

@Autowired
private AssembleEvent assembleEvent;

private final HashMap<String, String> tableInfo = Maps.newHashMap();


@Override
public void run(ApplicationArguments args) {
assembleEvent.doUpdate(executor);
CompletableFuture.runAsync(this::connectMysqlBinLog, executor);
}

/**
* 连接mysqlBinLog
*/
public void connectMysqlBinLog() {
log.info("监控BinLog服务已启动");
Map<String, String> jdbcInfo = FPAStringUtils.getJdbcInfo(url);
BinaryLogClient client = new BinaryLogClient(jdbcInfo.get("host"), Integer.parseInt(jdbcInfo.get("port")), username, password);
client.setServerId(1);
client.setKeepAlive(true);
client.registerEventListener(event -> event(jdbcInfo, event));
try {
client.connect();
} catch (IOException e) {
e.printStackTrace();
}

}

private void event(Map<String, String> jdbcInfo, Event event) {
EventData data = event.getData();
TableMapEventData tableMapEventData;
EventType eventType = event.getHeader().getEventType();
if (eventType == EventType.TABLE_MAP) {
tableMapEventData = (TableMapEventData) data;
String database = tableMapEventData.getDatabase();
String table = tableMapEventData.getTable();
tableInfo.put("database", database);
tableInfo.put("table", table);
}
assembleHeadcountEvent.assembleHeadcount(jdbcInfo, tableInfo, event, executor);
assembleCostEvent.assembleCost(jdbcInfo, tableInfo, event, executor);
}

}

binlog事件处理类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Slf4j
@Component
public class AssembleEvent {
private final List<String> TABLE_NAME = Lists.newArrayList("LISTENER_DATABASE");
@Autowired
private DatabaseUpdateService databaseUpdateService;

public void assembleCost(Map<String, String> jdbcInfo, HashMap<String, String> map, Event event, Executor executor) {
// 当LISTENER_DATABASE表发生变化
if (StringUtils.equals(jdbcInfo.get("database"), map.get("database")) && TABLE_NAME.contains(map.get("table"))) {
// XID表示事务提交,即当LISTENER_DATABASE发生事务提交走下面分支
if (event.getHeader().getEventType() == EventType.XID) {
log.info("监听到变化,进行关联更新");
try {
this.doUpdate(executor);
} finally {
map.clear();
}
}
}
}

public void doUpdate(Executor executor) {
//调用mapper更新sql
CompletableFuture.runAsync(() -> databaseUpdateService.update(), executor);
}

}

binlog数据转换器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
@Component
@Slf4j
public class BaseConverter implements Converter<BinlogData<List<Map<String, Object>>>> {
@Autowired
private ResourceLoader resourceLoader;
@Autowired
private JdbcTemplate jdbcTemplate;
@Value("${binglog.convert.model.package}")
private String packageName;
@Autowired
private CodeUtil codeUtil;
@Autowired
private DbProperties dbProperties;
/**
* 需要转成的model对象
*/
private List<Class> classes;
/**
* 拼接sql常量
*/
private static final String SQL_STATEMENT_SELECT = "SELECT ";
private static final String SQL_STATEMENT_FROM = " FROM ";
private static final String SQL_STATEMENT_WHERE = " WHERE ";
private static final String SQL_STATEMENT_LIMIT = " LIMIT 1";
@PostConstruct
public void init() {
classes = getClass(packageName, resourceLoader);
}
@Override
public List convert(BinlogData<List<Map<String, Object>>> data) throws Exception {
for (Class clazz : classes) {
Table tableAnnotation = (Table) clazz.getAnnotation(Table.class);
if (tableAnnotation != null && StringUtils.isNotBlank(tableAnnotation.name())) {
String dbName = dbProperties.getValue(tableAnnotation.catalog());//获取实际数据库名称
if (data.getDatabase().equals(dbName) && tableAnnotation.name().equals(data.getTable())) {
// 找到库 需要处理的表(data.getTable())对应的model
return setBeanProperties(data.getData(), clazz);
}
}
}
return null;
}
private List setBeanProperties(List<Map<String, Object>> list, Class clazz) {
List target = new ArrayList();
try {
Field[] fields = clazz.getDeclaredFields();
for (Object object : list) {
target.add(clazz.newInstance());
}
List<Field> fieldList = new ArrayList<>();
for (Field field : fields) {
Column column = field.getAnnotation(Column.class);
if (column != null) {
// 该字段需要通过配置的库表查询
if ( StringUtils.isNotBlank(column.table())) {
// 需要查询该字段的值,本数据未包含该字段值,查询可能以来本对象其他字段值
// 先设置不需要查库的字段(查库字段可能以来这些字段得值)
// setFieldFromTable(field, target, column);
fieldList.add(field);
} else {
setFieldFromBinlog(field, target, list, column);
}
}
}
// cachedMap 用于缓存,多个字段值依赖同一条数据库数据
Map<String, Map<String, Object>> cachedMap = new HashMap<>();
for (Field field : fieldList) {
Column column = field.getAnnotation(Column.class);
if (column != null) {
// 该字段需要通过配置的库表查询
if ( StringUtils.isNotBlank(column.table()) && StringUtils.isNotBlank(column.whereCause())) {
// 需要查询该字段的值,本数据未包含该字段值
setFieldFromTable(field, cachedMap, list, target, column);
}
}
}
} catch (Exception e) {
log.info("", e);
}
return target;
}
/**
* 类型转换
*
* @param field
* @param target
* @param list
* @param column
*/
private void setFieldFromBinlog(Field field, List target, List<Map<String, Object>> list, Column column) {
if (StringUtils.isNotBlank(column.name())) {
for (int i = 0; i < list.size(); i++) {
try {
Object value = list.get(i).get(column.name());
if (value != null) {
field.setAccessible(true);
log.debug("field:{},value:{}", field.getName(), column.name());
BinLogDict binLogDict = field.getAnnotation(BinLogDict.class);//判断是否需要翻译
if (binLogDict != null) {
Object companyCode = codeUtil.getCountryCode(binLogDict.codeType(), value, field.getType());
log.info("get companyCode fail value:{} companyCode:{}", value, companyCode);
field.set(target.get(i), companyCode);
} else {
field.set(target.get(i), value);
}
field.setAccessible(false);
}
} catch (Exception e) {
log.info("", e);
}
}
}
}
/**
* 数据库类型转换
* 该字段从数据库查询设置
* ${id} 参数取本对象的id 值
* #{id} 参数取binlog 过来的id 值
* <p>
* <p>
* // 本对象缓存(db.table.columnName.value)
* // 本对象缓存(db.table.id.3)
*
* @param field
* @param list
* @param column
*/
private void setFieldFromTable(Field field, Map<String, Map<String, Object>> cachedMap, List<Map<String, Object>> datas, List list, Column column) {
try {
field.setAccessible(true);
String dbName = dbProperties.getValue(column.catalog());//获取实际数据库名称
if (column.whereCause().contains("#")) {
// 需要依赖对象binlog字段值
int startLocation = column.whereCause().indexOf("#{") + 2;
int endLocation = column.whereCause().indexOf("}");
String refFieldName = column.whereCause().substring(startLocation, endLocation);
for (int i = 0; i < list.size(); i++) {
Object target = list.get(i);
StringBuilder sql = new StringBuilder();
sql.append(SQL_STATEMENT_SELECT).append("*").append(SQL_STATEMENT_FROM).append(dbName).append(".").append(column.table()).append(SQL_STATEMENT_WHERE);
Object refFieldValue = datas.get(i).get(refFieldName);
if (refFieldValue == null) {
continue;
}
// 本对象缓存(db.table.columnName.value)
// 本对象缓存(db.table.id.3)
String cacheKey = dbName + "." + column.table() + "." + refFieldName + "." + refFieldValue;
Map<String, Object> dataMap;
if (cachedMap.containsKey(cacheKey)) {
dataMap = cachedMap.get(cacheKey);
log.debug("命中缓存:cache_key:{}", cacheKey);
} else {
log.debug("cache_key:{}", cacheKey);
if (refFieldValue instanceof String) {
refFieldValue = "'" + refFieldValue + "'";
}
String whereCause = column.whereCause().replace("#{" + refFieldName + "}", String.valueOf(refFieldValue));
sql.append(whereCause).append(SQL_STATEMENT_LIMIT);
log.debug("查询字段:{},sql:{}", field.getName(), sql);
dataMap = jdbcTemplate.queryForMap(sql.toString());
if (dataMap != null) {
cachedMap.put(cacheKey, dataMap);
}
}
if (dataMap != null) {
log.debug("field:{},value:{}", field.getName(), column.name());
Object value = dataMap.get(column.name());
BinLogDict binLogDict = field.getAnnotation(BinLogDict.class);//判断是否需要翻译
if (binLogDict != null) {
Object countryCode = codeUtil.getCountryCode(binLogDict.codeType(), value, field.getType());
log.info("get companyCode fail value:{} companyCode:{}", value, countryCode);
field.set(target, countryCode);
} else {
field.set(target, value);
}
}
}
} else if (column.whereCause().contains("$")) {
// 需要依赖对象自身字段值
int startLocation = column.whereCause().indexOf("${") + 2;
int endLocation = column.whereCause().indexOf("}");
String refFieldName = column.whereCause().substring(startLocation, endLocation);
for (Object target : list) {
StringBuilder sql = new StringBuilder();
sql.append(SQL_STATEMENT_SELECT).append("*").append(SQL_STATEMENT_FROM).append(dbName).append(".").append(column.table()).append(SQL_STATEMENT_WHERE);
Field refField = target.getClass().getDeclaredField(refFieldName);
refField.setAccessible(true);
Object refFieldValue = refField.get(target);
if (refFieldValue == null) {
continue;
}
// 本对象缓存(db.table.columnName.value)
// 本对象缓存(db.table.id.3)
String cacheKey = dbName + "." + column.table() + "." + refFieldName + "." + refFieldValue;
Map<String, Object> dataMap;
if (cachedMap.containsKey(cacheKey)) {
dataMap = cachedMap.get(cacheKey);
log.debug("命中缓存:cache_key:{}", cacheKey);
} else {
if (refFieldValue instanceof String) {
refFieldValue = "'" + refFieldValue + "'";
}
String whereCause = column.whereCause().replace("${" + refFieldName + "}", String.valueOf(refFieldValue));
sql.append(whereCause).append(SQL_STATEMENT_LIMIT);
log.debug("查询字段:{},sql:{}", field.getName(), sql);
dataMap = jdbcTemplate.queryForMap(sql.toString());
if (dataMap != null) {
cachedMap.put(cacheKey, dataMap);
}
}
if (dataMap != null) {
log.debug("field:{},value:{}", field.getName(), column.name());
Object value = dataMap.get(column.name());
BinLogDict binLogDict = field.getAnnotation(BinLogDict.class);//判断是否需要翻译
if (binLogDict != null) {
Object countryCode = codeUtil.getCountryCode(binLogDict.codeType(), value, field.getType());
log.info("get companyCode fail value:{} companyCode:{}", value, countryCode);
field.set(target, countryCode);
} else {
field.set(target, value);
}
}
}
} else {
// 不需要依赖对象自身字段值
StringBuilder sql = new StringBuilder();
sql.append(SQL_STATEMENT_SELECT).append(column.name()).append(SQL_STATEMENT_FROM).append(dbName).append(".").append(column.table()).append(SQL_STATEMENT_WHERE);
sql.append(column.whereCause()).append(SQL_STATEMENT_LIMIT);
log.debug("查询字段:{},sql:{}", field.getName(), sql);
Object object = jdbcTemplate.queryForObject(sql.toString(), field.getType());
for (Object target : list) {
log.debug("field:{},value:{}", field.getName(), column.name());
BinLogDict binLogDict = field.getAnnotation(BinLogDict.class);//判断是否需要翻译
if (binLogDict != null) {
Object countryCode = codeUtil.getCountryCode(binLogDict.codeType(), object, field.getType());
log.info("get companyCode fail value:{} companyCode:{}", object, countryCode);
field.set(target, countryCode);
} else {
field.set(target, object);
}
}
}
} catch (Exception e) {
log.info("{}", e);
} finally {
field.setAccessible(false);
}
}
/**
* 根据报名取得改包下所有类
*
* @param packageName
* @param resourceLoader
* @return
*/
private List<Class> getClass(String packageName, ResourceLoader resourceLoader) {
List<Class> classList = new ArrayList<>();
try {
ResourcePatternResolver resolver = ResourcePatternUtils.getResourcePatternResolver(resourceLoader);
MetadataReaderFactory metaReader = new CachingMetadataReaderFactory(resourceLoader);
Resource[] resources = resolver.getResources("classpath*:" + packageName.replaceAll("\\.", "/") + "/*.class");
for (Resource r : resources) {
MetadataReader reader = metaReader.getMetadataReader(r);
classList.add(Class.forName(reader.getClassMetadata().getClassName()));
log.debug("----->binlog会自动转化的model类型:{}", reader.getClassMetadata().getClassName());
}
} catch (Exception e) {
log.info("", e);
}
return classList;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public enum EventType {
UNKNOWN,
START_V3,
QUERY,
STOP,
ROTATE,
INTVAR,
LOAD,
SLAVE,
CREATE_FILE,
APPEND_BLOCK,
EXEC_LOAD,
DELETE_FILE,
NEW_LOAD,
RAND,
USER_VAR,
FORMAT_DESCRIPTION,
XID,
BEGIN_LOAD_QUERY,
EXECUTE_LOAD_QUERY,
TABLE_MAP,
PRE_GA_WRITE_ROWS,
PRE_GA_UPDATE_ROWS,
PRE_GA_DELETE_ROWS,
WRITE_ROWS,
UPDATE_ROWS,
DELETE_ROWS,
INCIDENT,
HEARTBEAT,
IGNORABLE,
ROWS_QUERY,
EXT_WRITE_ROWS,
EXT_UPDATE_ROWS,
EXT_DELETE_ROWS,
GTID,
ANONYMOUS_GTID,
PREVIOUS_GTIDS,
TRANSACTION_CONTEXT,
VIEW_CHANGE,
XA_PREPARE;

private EventType() {
}
}

This is copyright.

...

...

00:00
00:00