// ------------------------- SnapshotSplitReader.submitSplit方法 ------------------------------------------
/**
 * Starts reading the given snapshot split asynchronously on {@code executor}.
 *
 * <p>The submitted task first runs the snapshot read task, which records a low and a high
 * watermark in its source context. When the backfill binlog split built from that context has
 * an ending offset after its starting offset, a backfill binlog read task is executed for it;
 * otherwise only the high watermark is dispatched. Any failure (including an incomplete
 * snapshot) is logged, stored in {@code readException}, and clears {@code currentTaskRunning}.
 *
 * @param mySqlSplit the split to read; converted with {@code asSnapshotSplit()}
 */
public void submitSplit(MySqlSplit mySqlSplit) {
    this.currentSnapshotSplit = mySqlSplit.asSnapshotSplit();
    statefulTaskContext.configure(currentSnapshotSplit);
    // The context's queue is kept here because pollSplitRecords reads from it.
    this.queue = statefulTaskContext.getQueue();
    this.nameAdjuster = statefulTaskContext.getSchemaNameAdjuster();
    this.hasNextElement.set(true);
    this.reachEnd.set(false);
    // The main snapshot read logic lives in the read task.
    this.splitSnapshotReadTask =
            new MySqlSnapshotSplitReadTask(
                    statefulTaskContext.getConnectorConfig(),
                    statefulTaskContext.getOffsetContext(),
                    statefulTaskContext.getSnapshotChangeEventSourceMetrics(),
                    statefulTaskContext.getDatabaseSchema(),
                    statefulTaskContext.getConnection(),
                    statefulTaskContext.getDispatcher(),
                    statefulTaskContext.getTopicSelector(),
                    StatefulTaskContext.getClock(),
                    currentSnapshotSplit);
    executor.submit(
            () -> {
                try {
                    currentTaskRunning = true;
                    // Collects the low/high watermarks set by the snapshot read task.
                    final SnapshotSplitChangeEventSourceContextImpl sourceContext =
                            new SnapshotSplitChangeEventSourceContextImpl();
                    SnapshotResult snapshotResult =
                            splitSnapshotReadTask.execute(sourceContext);

                    // Check the snapshot outcome before looking at the watermarks: an
                    // incomplete snapshot must never take the "no backfill needed" shortcut
                    // below and be reported as a finished split. Throwing here lets the catch
                    // block log the failure, record readException and clear
                    // currentTaskRunning.
                    if (!snapshotResult.isCompletedOrSkipped()) {
                        throw new IllegalStateException(
                                String.format(
                                        "Read snapshot for mysql split %s fail",
                                        currentSnapshotSplit));
                    }

                    final MySqlBinlogSplit backfillBinlogSplit =
                            createBackfillBinlogSplit(sourceContext);
                    // Optimization that skips the binlog read when the low watermark equals
                    // the high watermark: splits are snapshotted in parallel, so if the binlog
                    // did not advance while this split was read, no change can have touched its
                    // key range and no backfill is needed.
                    final boolean binlogBackfillRequired =
                            backfillBinlogSplit
                                    .getEndingOffset()
                                    .isAfter(backfillBinlogSplit.getStartingOffset());
                    if (!binlogBackfillRequired) {
                        dispatchHighWatermark(backfillBinlogSplit);
                        currentTaskRunning = false;
                        return;
                    }

                    // Backfill: the binlog read task is built from the backfill split (derived
                    // from the recorded watermarks) and, per the original author's notes, reads
                    // binlog events up to the high watermark and emits them downstream.
                    final MySqlBinlogSplitReadTask backfillBinlogReadTask =
                            createBackfillBinlogReadTask(backfillBinlogSplit);
                    backfillBinlogReadTask.execute(
                            new SnapshotBinlogSplitChangeEventSourceContextImpl());
                } catch (Exception e) {
                    currentTaskRunning = false;
                    LOG.error(
                            String.format(
                                    "Execute snapshot read task for mysql split %s fail",
                                    currentSnapshotSplit),
                            e);
                    readException = e;
                }
            });
}
// ------------------------- MySqlSnapshotSplitReadTask.execute(sourceContext)方法 ------------------------------------------
/**
 * Runs the snapshot read for this task's split.
 *
 * <p>Obtains the snapshotting task for {@code previousOffset}, prepares a snapshot context and
 * delegates to {@code doExecute}. A failure while preparing the context is logged and rethrown
 * as a {@link RuntimeException}; an interruption during the read is logged and propagated
 * unchanged; any other failure is wrapped in a {@code DebeziumException}.
 *
 * @param context the change-event source context, passed through to {@code doExecute}
 * @return the result produced by {@code doExecute}
 * @throws InterruptedException if the snapshot read is interrupted
 */
@Override
public SnapshotResult execute(ChangeEventSourceContext context) throws InterruptedException {
    final SnapshottingTask task = getSnapshottingTask(previousOffset);
    final SnapshotContext preparedContext = prepareContextOrFail(context);
    try {
        return doExecute(context, preparedContext, task);
    } catch (InterruptedException interrupted) {
        LOG.warn("Snapshot was interrupted before completion");
        throw interrupted;
    } catch (Exception failure) {
        throw new DebeziumException(failure);
    }
}

/** Calls {@code prepare(context)}, logging any failure and rethrowing it unchecked. */
private SnapshotContext prepareContextOrFail(ChangeEventSourceContext context) {
    try {
        return prepare(context);
    } catch (Exception e) {
        LOG.error("Failed to initialize snapshot context.", e);
        throw new RuntimeException(e);
    }
}
// ------------------------- MySqlSnapshotSplitReadTask.doExecute(sourceContext)方法 ------------------------------------------
/**
 * Snapshots the current split between a low and a high binlog watermark.
 *
 * <p>Step 1 reads the current binlog offset as the low watermark, stores it in the split
 * context and dispatches a LOW watermark event. Step 2 reads the split's rows. Step 3 reads the
 * current binlog offset again as the high watermark, dispatches a HIGH watermark event and
 * stores it in the split context. Watermark events go to the dispatcher's queue.
 *
 * @param context must be a {@code SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl}
 * @param snapshotContext must be a {@code RelationalSnapshotContext}; its offset is set to
 *     {@code offsetContext}
 * @param snapshottingTask unused here
 * @return a completed result carrying the context's offset
 */
@Override
protected SnapshotResult doExecute(
        ChangeEventSourceContext context,
        SnapshotContext snapshotContext,
        SnapshottingTask snapshottingTask)
        throws Exception {
    final RelationalSnapshotChangeEventSource.RelationalSnapshotContext relationalContext =
            (RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
    relationalContext.offset = offsetContext;

    // Watermark signals are written to the same queue the dispatcher uses for data events.
    final SignalEventDispatcher watermarkDispatcher =
            new SignalEventDispatcher(
                    offsetContext.getPartition(),
                    topicSelector.topicNameFor(snapshotSplit.getTableId()),
                    dispatcher.getQueue());

    // Step 1: low watermark, taken before any row of the split is read.
    final BinlogOffset low = currentBinlogOffset(jdbcConnection);
    LOG.info("Snapshot step 1 - Determining low watermark {} for split {}", low, snapshotSplit);
    final SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl splitContext =
            (SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) context;
    splitContext.setLowWatermark(low);
    watermarkDispatcher.dispatchWatermarkEvent(
            snapshotSplit, low, SignalEventDispatcher.WatermarkKind.LOW);

    // Step 2: read the split's rows.
    LOG.info("Snapshot step 2 - Snapshotting data");
    createDataEvents(relationalContext, snapshotSplit.getTableId());

    // Step 3: high watermark, taken after all rows of the split were read.
    final BinlogOffset high = currentBinlogOffset(jdbcConnection);
    LOG.info(
            "Snapshot step 3 - Determining high watermark {} for split {}", high, snapshotSplit);
    watermarkDispatcher.dispatchWatermarkEvent(
            snapshotSplit, high, SignalEventDispatcher.WatermarkKind.HIGH);
    splitContext.setHighWatermark(high);

    return SnapshotResult.completed(relationalContext.offset);
}
// 我们看看createDataEvents 调用过程
/**
 * Reads the rows of {@code tableId} covered by the current split and then completes the
 * snapshot receiver.
 *
 * <p>{@code completeSnapshot()} is essential: the receiver enqueues a record only when the
 * next one arrives, so the most recent record stays buffered until completion. Skipping the
 * call would drop the split's last row.
 *
 * @param snapshotContext relational snapshot context used to build the row emitters
 * @param tableId table whose split is being read
 */
private void createDataEvents(
        RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
        TableId tableId)
        throws Exception {
    final EventDispatcher.SnapshotReceiver receiver = dispatcher.getSnapshotChangeEventReceiver();
    LOG.debug("Snapshotting table {}", tableId);
    createDataEventsForTable(snapshotContext, receiver, databaseSchema.tableFor(tableId));
    // Flush the record the receiver is still holding back.
    receiver.completeSnapshot();
}
// createDataEvents中调用到本类的createDataEventsForTable,也就是开始了具体读取逻辑
/**
 * Reads every row of the current snapshot split and dispatches it as a snapshot event.
 *
 * <p>A split-scan SELECT is built from the split's key type and bounds. A {@code null} split
 * start or end is passed to {@link StatementUtils} as a flag. The query is executed with the
 * connector's query fetch size. Each result row is converted into an {@code Object[]} indexed
 * by column position and handed to the dispatcher. Progress is logged whenever the table-scan
 * log timer expires.
 *
 * @param snapshotContext context used to build the change-record emitter for each row
 * @param snapshotReceiver receiver the dispatcher forwards the emitted records to
 * @param table the table the split belongs to
 * @throws InterruptedException if dispatching a row is interrupted
 * @throws ConnectException (unchecked) wrapping any {@link SQLException} raised by the scan
 */
private void createDataEventsForTable(
        RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
        EventDispatcher.SnapshotReceiver snapshotReceiver,
        Table table)
        throws InterruptedException {
    long exportStart = clock.currentTimeInMillis();
    LOG.info("Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id());
    final String selectSql =
            StatementUtils.buildSplitScanQuery(
                    snapshotSplit.getTableId(),
                    snapshotSplit.getSplitKeyType(),
                    snapshotSplit.getSplitStart() == null,
                    snapshotSplit.getSplitEnd() == null);
    LOG.info(
            "For split '{}' of table {} using select statement: '{}'",
            snapshotSplit.splitId(),
            table.id(),
            selectSql);
    try (PreparedStatement selectStatement =
                    StatementUtils.readTableSplitDataStatement(
                            jdbcConnection,
                            selectSql,
                            snapshotSplit.getSplitStart() == null,
                            snapshotSplit.getSplitEnd() == null,
                            snapshotSplit.getSplitStart(),
                            snapshotSplit.getSplitEnd(),
                            snapshotSplit.getSplitKeyType().getFieldCount(),
                            connectorConfig.getQueryFetchSize());
            ResultSet rs = selectStatement.executeQuery()) {
        ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);
        // Presumably entry i describes result-set column i + 1 (ColumnUtils.toArray builds it
        // from the result set) - the placement below already relies on that alignment.
        final Column[] columns = columnArray.getColumns();
        long rows = 0;
        Threads.Timer logTimer = getTableScanLogTimer();
        while (rs.next()) {
            rows++;
            final Object[] row = new Object[columnArray.getGreatestColumnPosition()];
            for (int i = 0; i < columns.length; i++) {
                // Decode with the same column whose position() picks the slot. Using
                // table.columns().get(i) instead can pick a different column (wrong JDBC type
                // handling) or overrun when the table's column list and the result set differ
                // in order or count.
                final Column column = columns[i];
                row[column.position() - 1] = readField(rs, i + 1, column, table);
            }
            if (logTimer.expired()) {
                long stop = clock.currentTimeInMillis();
                LOG.info(
                        "Exported {} records for split '{}' after {}",
                        rows,
                        snapshotSplit.splitId(),
                        Strings.duration(stop - exportStart));
                snapshotProgressListener.rowsScanned(table.id(), rows);
                logTimer = getTableScanLogTimer();
            }
            // Emits the row through the dispatcher; the receiver buffers/enqueues the record.
            dispatcher.dispatchSnapshotEvent(
                    table.id(),
                    getChangeRecordEmitter(snapshotContext, table.id(), row),
                    snapshotReceiver);
        }
        LOG.info(
                "Finished exporting {} records for split '{}', total duration '{}'",
                rows,
                snapshotSplit.splitId(),
                Strings.duration(clock.currentTimeInMillis() - exportStart));
    } catch (SQLException e) {
        throw new ConnectException("Snapshotting of table " + table.id() + " failed", e);
    }
}
// ------------------------- dispatcher.dispatchSnapshotEvent方法之后的流程 ----------------------------------
// 进入EventDispatcher.dispatchSnapshotEvent方法
/**
 * Emits the change records of one snapshot row.
 *
 * <p>Looks up the schema of {@code dataCollectionId} (calling {@code errorOnMissingSchema} when
 * there is none). It then lets {@code changeRecordEmitter} produce the records, notifying
 * {@code eventListener} for each before forwarding it to {@code receiver}.
 *
 * @param dataCollectionId id of the table the row belongs to
 * @param changeRecordEmitter produces the change record(s) for the row
 * @param receiver snapshot receiver that each record is forwarded to
 * @throws InterruptedException if forwarding a record is interrupted
 */
public void dispatchSnapshotEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter, SnapshotReceiver receiver) throws InterruptedException {
    final DataCollectionSchema collectionSchema = schema.schemaFor(dataCollectionId);
    if (collectionSchema == null) {
        // NOTE(review): presumably throws; otherwise emitChangeRecords receives a null schema.
        errorOnMissingSchema(dataCollectionId, changeRecordEmitter);
    }

    // Forwards every emitted record: listener notification first, then the snapshot receiver
    // (which, for snapshots, is expected to be the buffering receiver).
    final Receiver forwarder = new Receiver() {
        @Override
        public void changeRecord(DataCollectionSchema ignoredSchema,
                                 Operation operation,
                                 Object key, Struct value,
                                 OffsetContext offset,
                                 ConnectHeaders headers)
                throws InterruptedException {
            eventListener.onEvent(collectionSchema.id(), offset, key, value);
            receiver.changeRecord(collectionSchema, operation, key, value, offset, headers);
        }
    };
    changeRecordEmitter.emitChangeRecords(collectionSchema, forwarder);
}
// BufferingSnapshotChangeRecordReceiver的changeRecord方法
// 前面简单介绍过他的处理逻辑了,就不必多做介绍了
/**
 * Buffers one snapshot record after first enqueueing the record buffered before it.
 *
 * <p>At most one record is held back at a time. Its {@link SourceRecord} is produced lazily:
 * the partition, offset and envelope schema are read only when the buffered supplier is
 * invoked. This way the offset is correct with respect to the pre/post completion callbacks.
 * The key schema and topic name are resolved eagerly, here.
 *
 * @throws NullPointerException if {@code value} is {@code null}
 * @throws InterruptedException if enqueueing the previous record is interrupted
 */
@Override
public void changeRecord(DataCollectionSchema dataCollectionSchema,
                         Operation operation,
                         Object key, Struct value,
                         OffsetContext offsetContext,
                         ConnectHeaders headers)
        throws InterruptedException {
    Objects.requireNonNull(value, "value must not be null");
    LOGGER.trace("Received change record for {} operation on key {}", operation, key);

    // Release the record held back by the previous call, if any.
    if (bufferedEvent != null) {
        queue.enqueue(bufferedEvent.get());
    }

    final Schema recordKeySchema = dataCollectionSchema.keySchema();
    final String topic = topicSelector.topicNameFor((T) dataCollectionSchema.id());

    // Deliberately lazy: offsetContext is consulted only when this supplier runs.
    bufferedEvent = () -> {
        final SourceRecord sourceRecord = new SourceRecord(
                offsetContext.getPartition(),
                offsetContext.getOffset(),
                topic,
                null,
                recordKeySchema,
                key,
                dataCollectionSchema.getEnvelopeSchema().schema(),
                value,
                null,
                headers);
        return changeEventCreator.createDataChangeEvent(sourceRecord);
    };
}
页面更新:2024-04-05
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号