「万字长文」Flink cdc源码精讲(推荐收藏)(四)

// -------------------------   SnapshotSplitReader.submitSplit方法  ------------------------------------------
/**
 * Binds this reader to the given snapshot split and starts reading it on the executor.
 *
 * <p>The submitted task first runs the snapshot read task for the split. If the backfill
 * split's ending offset is not after its starting offset, only the high watermark is
 * dispatched and the task ends. Otherwise, when the snapshot completed (or was skipped), a
 * backfill binlog read task is executed for the same split. Failures are recorded in
 * {@code readException} instead of being thrown from the worker.
 *
 * @param mySqlSplit the split to read; it is converted with {@code asSnapshotSplit()}
 */
public void submitSplit(MySqlSplit mySqlSplit) {
    this.currentSnapshotSplit = mySqlSplit.asSnapshotSplit();
    statefulTaskContext.configure(currentSnapshotSplit);
    // Keep a handle on the context's queue (author's note: it is needed later when the
    // records of the split are polled).
    this.queue = statefulTaskContext.getQueue();
    this.nameAdjuster = statefulTaskContext.getSchemaNameAdjuster();
    this.hasNextElement.set(true);
    this.reachEnd.set(false);
    // The main reading logic lives in the read task built here.
    this.splitSnapshotReadTask =
            new MySqlSnapshotSplitReadTask(
                    statefulTaskContext.getConnectorConfig(),
                    statefulTaskContext.getOffsetContext(),
                    statefulTaskContext.getSnapshotChangeEventSourceMetrics(),
                    statefulTaskContext.getDatabaseSchema(),
                    statefulTaskContext.getConnection(),
                    statefulTaskContext.getDispatcher(),
                    statefulTaskContext.getTopicSelector(),
                    StatefulTaskContext.getClock(),
                    currentSnapshotSplit);

    // Worker body: snapshot read first, then (if required) the binlog backfill.
    final Runnable readSplit =
            () -> {
                try {
                    currentTaskRunning = true;
                    // Context implementation that the read task fills with the low and high
                    // watermarks.
                    final SnapshotSplitChangeEventSourceContextImpl watermarkContext =
                            new SnapshotSplitChangeEventSourceContextImpl();
                    // Run the snapshot read task.
                    SnapshotResult result = splitSnapshotReadTask.execute(watermarkContext);
                    final MySqlBinlogSplit backfillSplit =
                            createBackfillBinlogSplit(watermarkContext);

                    // Optimization: skip the binlog read when the low watermark equals the
                    // high watermark. Author's note: splits are snapshotted in parallel, and
                    // identical watermarks mean nothing else happened while this split was
                    // being read, so no binlog read is needed after the snapshot.
                    final boolean endIsAfterStart =
                            backfillSplit
                                    .getEndingOffset()
                                    .isAfter(backfillSplit.getStartingOffset());
                    if (!endIsAfterStart) {
                        dispatchHighWatermark(backfillSplit);
                        currentTaskRunning = false;
                        return;
                    }

                    // The snapshot did not complete: remember the failure and stop here.
                    if (!snapshotResult(result)) {
                        readException =
                                new IllegalStateException(
                                        String.format(
                                                "Read snapshot for mysql split %s fail",
                                                currentSnapshotSplit));
                        return;
                    }

                    // Snapshot finished: build the binlog read task from the backfill split
                    // (which carries the recorded watermarks) and run it.
                    // Author's note: the task presumably uses the snapshot's high watermark
                    // as its end offset and reads and emits the changes before that offset -
                    // confirm against MySqlBinlogSplitReadTask.
                    final MySqlBinlogSplitReadTask backfillTask =
                            createBackfillBinlogReadTask(backfillSplit);
                    backfillTask.execute(new SnapshotBinlogSplitChangeEventSourceContextImpl());
                } catch (Exception e) {
                    currentTaskRunning = false;
                    LOG.error(
                            String.format(
                                    "Execute snapshot read task for mysql split %s fail",
                                    currentSnapshotSplit),
                            e);
                    readException = e;
                }
            };
    executor.submit(readSplit);
}

/** Returns whether the given snapshot result reports completion (or a skipped snapshot). */
private static boolean snapshotResult(SnapshotResult result) {
    return result.isCompletedOrSkipped();
}
// -------------------------   MySqlSnapshotSplitReadTask.execute(sourceContext)方法  ------------------------------------------
 
 @Override
    /**
     * Runs the snapshot for the current split.
     *
     * <p>Obtains the snapshotting task for {@code previousOffset}, prepares a snapshot context
     * and then delegates the real work to {@code doExecute}.
     *
     * @param context the change event source context handed in by the caller
     * @return whatever {@code doExecute} returns
     * @throws InterruptedException if the snapshot is interrupted; logged and rethrown as is
     * @throws RuntimeException if preparing the snapshot context fails
     * @throws DebeziumException wrapping any other exception raised by {@code doExecute}
     */
    public SnapshotResult execute(ChangeEventSourceContext context) throws InterruptedException {
        final SnapshottingTask task = getSnapshottingTask(previousOffset);

        // Phase 1: build the snapshot context; any failure here is logged and wrapped.
        final SnapshotContext preparedContext;
        try {
            preparedContext = prepare(context);
        } catch (Exception prepareFailure) {
            LOG.error("Failed to initialize snapshot context.", prepareFailure);
            throw new RuntimeException(prepareFailure);
        }

        // Phase 2: the main logic is in doExecute.
        try {
            return doExecute(context, preparedContext, task);
        } catch (InterruptedException interrupted) {
            LOG.warn("Snapshot was interrupted before completion");
            throw interrupted;
        } catch (Exception failure) {
            throw new DebeziumException(failure);
        }
    }
// -------------------------   MySqlSnapshotSplitReadTask.doExecute(sourceContext)方法  ------------------------------------------
 @Override
    /**
     * Snapshots one split in three logged steps: record the low watermark, read the split's
     * rows, record the high watermark.
     *
     * <p>Both watermarks are taken from {@code currentBinlogOffset(jdbcConnection)}, stored on
     * the caller's context and dispatched as watermark events through a
     * {@code SignalEventDispatcher} built on the dispatcher's queue.
     *
     * @param context must be a {@code SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl};
     *     it receives the low and high watermarks
     * @param snapshotContext must be a {@code RelationalSnapshotContext}; its offset is set to
     *     {@code offsetContext}
     * @param snapshottingTask not used by this method
     * @return a completed snapshot result carrying the context's offset
     * @throws Exception if any step fails
     */
    protected SnapshotResult doExecute(
            ChangeEventSourceContext context,
            SnapshotContext snapshotContext,
            SnapshottingTask snapshottingTask)
            throws Exception {
        final RelationalSnapshotChangeEventSource.RelationalSnapshotContext relationalContext =
                (RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
        relationalContext.offset = offsetContext;

        // Dispatcher used to emit the low/high watermark events for this split.
        final SignalEventDispatcher watermarkDispatcher =
                new SignalEventDispatcher(
                        offsetContext.getPartition(),
                        topicSelector.topicNameFor(snapshotSplit.getTableId()),
                        dispatcher.getQueue());

        // Step 1: low watermark - store it on the context, then dispatch it.
        final BinlogOffset low = currentBinlogOffset(jdbcConnection);
        LOG.info(
                "Snapshot step 1 - Determining low watermark {} for split {}",
                low,
                snapshotSplit);
        final SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl watermarkHolder =
                (SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) context;
        watermarkHolder.setLowWatermark(low);
        watermarkDispatcher.dispatchWatermarkEvent(
                snapshotSplit, low, SignalEventDispatcher.WatermarkKind.LOW);

        // Step 2: read the rows of the split.
        LOG.info("Snapshot step 2 - Snapshotting data");
        createDataEvents(relationalContext, snapshotSplit.getTableId());

        // Step 3: high watermark - dispatch it, then store it on the context.
        final BinlogOffset high = currentBinlogOffset(jdbcConnection);
        LOG.info(
                "Snapshot step 3 - Determining high watermark {} for split {}",
                high,
                snapshotSplit);
        watermarkDispatcher.dispatchWatermarkEvent(
                snapshotSplit, high, SignalEventDispatcher.WatermarkKind.HIGH);
        watermarkHolder.setHighWatermark(high);

        return SnapshotResult.completed(relationalContext.offset);
    }
 
// 我们看看createDataEvents 调用过程
/**
 * Reads the current split of the given table and then completes the snapshot on the receiver.
 *
 * @param snapshotContext the relational snapshot context passed on to the table read
 * @param tableId id of the table to snapshot; resolved through {@code databaseSchema}
 * @throws Exception if reading the table or completing the snapshot fails
 */
private void createDataEvents(
        RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
        TableId tableId)
        throws Exception {
    final EventDispatcher.SnapshotReceiver receiver =
            dispatcher.getSnapshotChangeEventReceiver();
    LOG.debug("Snapshotting table {}", tableId);

    createDataEventsForTable(snapshotContext, receiver, databaseSchema.tableFor(tableId));

    // Author's note: the receiver keeps the most recent record in a member (bufferedEvent) and
    // only enqueues it when the next record arrives, so after the last row one record is still
    // buffered. completeSnapshot() presumably finalizes and enqueues that last record; without
    // this call the split's snapshot would be one row short - confirm against the receiver
    // implementation.
    receiver.completeSnapshot();
}
// createDataEvents中调用到本类的createDataEventsForTable,也就是开始了具体读取逻辑
    /**
     * Scans the rows of the current split and dispatches each one as a snapshot event.
     *
     * <p>Builds the split scan query, executes it with the split's start/end bounds, converts
     * every result row into an {@code Object[]} and hands it to the dispatcher together with
     * the given receiver. Progress is logged whenever the table-scan log timer expires.
     *
     * @param snapshotContext context used when creating the change record emitter for each row
     * @param snapshotReceiver receiver that the dispatcher forwards each row to
     * @param table the table the split belongs to
     * @throws InterruptedException if dispatching a row is interrupted
     * @throws ConnectException wrapping any {@code SQLException} raised while scanning
     */
    private void createDataEventsForTable(
            RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
            EventDispatcher.SnapshotReceiver snapshotReceiver,
            Table table)
            throws InterruptedException {
        final long exportStartMillis = clock.currentTimeInMillis();
        LOG.info("Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id());

        // Build the scan query for this split; a null start/end marks an unbounded side.
        final String splitScanSql =
                StatementUtils.buildSplitScanQuery(
                        snapshotSplit.getTableId(),
                        snapshotSplit.getSplitKeyType(),
                        snapshotSplit.getSplitStart() == null,
                        snapshotSplit.getSplitEnd() == null);
        LOG.info(
                "For split '{}' of table {} using select statement: '{}'",
                snapshotSplit.splitId(),
                table.id(),
                splitScanSql);

        try (
                // Create the statement for the split and run the query.
                PreparedStatement scanStatement =
                        StatementUtils.readTableSplitDataStatement(
                                jdbcConnection,
                                splitScanSql,
                                snapshotSplit.getSplitStart() == null,
                                snapshotSplit.getSplitEnd() == null,
                                snapshotSplit.getSplitStart(),
                                snapshotSplit.getSplitEnd(),
                                snapshotSplit.getSplitKeyType().getFieldCount(),
                                connectorConfig.getQueryFetchSize());
                ResultSet resultSet = scanStatement.executeQuery()) {
            ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(resultSet, table);
            long exportedRows = 0;
            Threads.Timer progressTimer = getTableScanLogTimer();

            // Turn every result row into a record and send it downstream.
            while (resultSet.next()) {
                exportedRows++;

                final Object[] rowValues = new Object[columnArray.getGreatestColumnPosition()];
                for (int columnIndex = 0;
                        columnIndex < columnArray.getColumns().length;
                        columnIndex++) {
                    Column actualColumn = table.columns().get(columnIndex);
                    // Result set columns are 1-based; the row array is indexed by the
                    // column's declared position minus one.
                    final int targetSlot = columnArray.getColumns()[columnIndex].position() - 1;
                    final Object cell = readField(resultSet, columnIndex + 1, actualColumn, table);
                    rowValues[targetSlot] = cell;
                }

                if (progressTimer.expired()) {
                    long nowMillis = clock.currentTimeInMillis();
                    LOG.info(
                            "Exported {} records for split '{}' after {}",
                            exportedRows,
                            snapshotSplit.splitId(),
                            Strings.duration(nowMillis - exportStartMillis));
                    snapshotProgressListener.rowsScanned(table.id(), exportedRows);
                    progressTimer = getTableScanLogTimer();
                }

                // Hand the row to the dispatcher, which forwards it to the receiver; the
                // receiver is what eventually puts the record on the queue.
                dispatcher.dispatchSnapshotEvent(
                        table.id(),
                        getChangeRecordEmitter(snapshotContext, table.id(), rowValues),
                        snapshotReceiver);
            }

            LOG.info(
                    "Finished exporting {} records for split '{}', total duration '{}'",
                    exportedRows,
                    snapshotSplit.splitId(),
                    Strings.duration(clock.currentTimeInMillis() - exportStartMillis));
        } catch (SQLException sqlFailure) {
            throw new ConnectException(
                    "Snapshotting of table " + table.id() + " failed", sqlFailure);
        }
    }
// -------------------------   dispatcher.dispatchSnapshotEvent方法之后的流程  ----------------------------------
 // 进入EventDispatcher.dispatchSnapshotEvent方法
   /**
    * Emits the change records of one snapshot row and forwards each of them to the given
    * snapshot receiver.
    *
    * <p>Looks up the schema for the data collection, reports a missing schema through
    * {@code errorOnMissingSchema}, and then lets the emitter produce its records. Every record
    * is first announced to {@code eventListener} and then passed to {@code receiver}.
    *
    * @param dataCollectionId id of the data collection the row belongs to
    * @param changeRecordEmitter emitter that produces the change records for the row
    * @param receiver snapshot receiver that finally accepts each record
    * @throws InterruptedException if emitting or receiving a record is interrupted
    */
   public void dispatchSnapshotEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter, SnapshotReceiver receiver) throws InterruptedException {
        final DataCollectionSchema dataCollectionSchema = schema.schemaFor(dataCollectionId);
        if (dataCollectionSchema == null) {
            errorOnMissingSchema(dataCollectionId, changeRecordEmitter);
        }

        // Adapter between the emitter and the caller-supplied snapshot receiver.
        final Receiver forwardingReceiver = new Receiver() {
            @Override
            public void changeRecord(DataCollectionSchema emittedSchema,
                                     Operation emittedOperation,
                                     Object recordKey, Struct recordValue,
                                     OffsetContext recordOffset,
                                     ConnectHeaders recordHeaders)
                    throws InterruptedException {
                eventListener.onEvent(dataCollectionSchema.id(), recordOffset, recordKey, recordValue);
                // The record is actually handed over for queueing here. The receiver is the
                // one passed in by the caller (author's note: in the snapshot path this is
                // BufferingSnapshotChangeRecordReceiver).
                receiver.changeRecord(
                        dataCollectionSchema,
                        emittedOperation,
                        recordKey,
                        recordValue,
                        recordOffset,
                        recordHeaders);
            }
        };
        changeRecordEmitter.emitChangeRecords(dataCollectionSchema, forwardingReceiver);
    }
  // BufferingSnapshotChangeRecordReceiver的changeRecord方法
 // 前面已经简单介绍过它的处理逻辑,这里不再赘述
  @Override
        /**
         * Buffers the incoming change record and enqueues the previously buffered one, if any.
         *
         * <p>The new record is not built immediately: a supplier is stored in
         * {@code bufferedEvent} and the {@code SourceRecord} is only created when that supplier
         * is invoked.
         *
         * @param dataCollectionSchema schema providing the key schema, envelope schema and id
         * @param operation the operation of the change; only used for trace logging here
         * @param key the record key
         * @param value the record value; must not be null
         * @param offsetContext source of the partition and offset of the record
         * @param headers headers attached to the record
         * @throws InterruptedException if enqueueing the previously buffered event is interrupted
         * @throws NullPointerException if {@code value} is null
         */
        public void changeRecord(DataCollectionSchema dataCollectionSchema,
                                 Operation operation,
                                 Object key, Struct value,
                                 OffsetContext offsetContext,
                                 ConnectHeaders headers)
                throws InterruptedException {
            Objects.requireNonNull(value, "value must not be null");
            LOGGER.trace("Received change record for {} operation on key {}", operation, key);

            // Flush the record buffered by the previous call before replacing it.
            if (bufferedEvent != null) {
                queue.enqueue(bufferedEvent.get());
            }

            final Schema recordKeySchema = dataCollectionSchema.keySchema();
            final String destinationTopic =
                    topicSelector.topicNameFor((T) dataCollectionSchema.id());

            // The record is produced lazily so that it picks up the correct offset as per the
            // pre/post completion callbacks.
            bufferedEvent = () -> {
                SourceRecord lazyRecord = new SourceRecord(
                        offsetContext.getPartition(),
                        offsetContext.getOffset(),
                        destinationTopic, null,
                        recordKeySchema, key,
                        dataCollectionSchema.getEnvelopeSchema().schema(), value,
                        null, headers);
                return changeEventCreator.createDataChangeEvent(lazyRecord);
            };
        }
展开阅读全文

页面更新:2024-04-05

标签:队列   水位   下游   变量   源码   逻辑   成员   阶段   操作   收藏   方法   数据

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top