Flink 读写Kafka总结

前言

总结Flink读写Kafka

Flink 版本

1.15.4

Table API

本文主要总结Table API的使用(SQL),官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/kafka/

kerberos认证相关配置

官方文档:

修改conf/flink-conf.yaml

security.kerberos.login.keytab: /etc/security/keytabs/hive.service.keytab
security.kerberos.login.principal: hive/indata-192-168-44-128.indata.com@INDATA.COM

java.security.auth.login.config: /usr/hdp/3.1.0.0-78/kafka/conf/kafka_jaas.conf
security.kerberos.login.contexts: Client,KafkaClient

kafka_jaas.conf

        KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useTicketCache=true
  security.protocol=SASL_PLAINTEXT
      renewTicket=true
        serviceName="kafka";
        };
        Client {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
    security.protocol=SASL_PLAINTEXT
    keyTab="/etc/security/keytabs/kafka.service.keytab"
        storeKey=true
        useTicketCache=false
        serviceName="zookeeper"
        principal="kafka/indata-192-168-44-128.indata.com@INDATA.COM";
        };

jar包

flink-sql-connector-kafka-1.15.4.jar拷贝到lib路径下,下载地址:
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.15.4/flink-sql-connector-kafka-1.15.4.jar

我最开始是将flink-connector-kafka-1.15.4.jarkafka-clients-2.8.1.jar拷贝到lib路径下,下载地址:
https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.15.4/flink-connector-kafka-1.15.4.jar
https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.8.1/kafka-clients-2.8.1.jar

原因是官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/kafka/中提到的依赖就是flink-connector-kafka,而不是flink-sql-connector-kafka,至于为啥还需要kafka-clients,是因为如果不放kafka-clients包,会报异常:java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.OffsetResetStrategy
kafka-clients的版本可以在flink源码的pom中找到

其实flink-sql-connector-kafka就是将flink-connector-kafka和kafka相关的依赖一块打包的

对于其他类型的connector的jar包,都可以直接下载flink-sql-connector-*.jar,名字带sql的jar包含了相关的依赖,不用单独配置其他依赖jar包,比较方便

sql-client

以sql-client提交SQL的方式进行测试验证。

读写Kafka

CREATE TABLE if not exists test_flink_kafka (
  id int,
  name string,
  price double,
  ts int,
  dt string
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_flink_kafka',
  'properties.bootstrap.servers' = 'indata-192-168-44-128:6667',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'GSSAPI',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.group.id' = 'dkl',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'json'
);

写Kafka

insert into test_flink_kafka values (1,'hudi1',1.1,1000,'20230619'),(2,'hudi2',2.2,2000,'20230619');

读Kafka

select * from test_flink_kafka;

Kafka2Hudi

Hudi 0.13.0 需要将calcite-core-1.10.0.jar放到Flink lib下,同步Hive时需要

set yarn.application.name=kafka2hudi;

set parallelism.default=1;
set taskmanager.memory.process.size=3g;

set execution.checkpointing.interval=10000;
set state.checkpoints.dir=hdfs:///flink/checkpoints/kafka2hudi;
set execution.checkpointing.externalized-checkpoint-retention= RETAIN_ON_CANCELLATION;


CREATE TABLE if not exists kafka_source (
  id int,
  name string,
  price double,
  ts int,
  dt string
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_flink_kafka',
  'properties.bootstrap.servers' = 'indata-192-168-44-128:6667',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'GSSAPI',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.group.id' = 'dkl',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'json'
);

CREATE TABLE hudi_sink (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/cdc/hudi_sink',
  'write.operation'='insert', --写类型,可选
  'write.tasks'='1', --并行度,可选,需要传参
  'table.type'='COPY_ON_WRITE', --表类型,可选
  'precombine.field' = 'ts', --可选,预合并字段和历史比较字段,当新来的数据该字段大于历史值时才会更新,默认为ts(如果有这个ts字段的话),需要传参,没有可不填,建议将该值设置为update_time
  'hoodie.datasource.write.recordkey.field' = 'id', -- 可选,和primary key效果一样,二者至少选一个
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator', --该参数目前版本有bug
  'index.type' =  'BUCKET', -- flink只支持两种index,默认FLINK_STATE;默认的state index对于数据量比较大的情况会因为tm内存不足导致GC OOM,并且如果不是用checkpoint恢复的话会导致数据重复
  'hoodie.bucket.index.num.buckets' = '16', -- 桶数
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.conf.dir'='/usr/hdp/3.1.0.0-78/hive/conf',
  'hive_sync.db' = 'cdc',
  'hive_sync.table' = 'hudi_sink',
  'hoodie.datasource.hive_sync.create_managed_table' = 'true' --是否为内部表,0.13.0版本开始支持
);

insert into hudi_sink select * from kafka_source;

cdc2Kafka

对于cdc数据写kafka,普通的formats是不支持的,需要使用cdc格式的formats,比如debezium-json,此外还有canal-jsonmaxwell-jsonogg-json,Flink有哪些formats以及支持的connectors可以参考官网:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/formats/overview/
除了使用cdc格式的formats,还可以将cdc数据写到upsert-kafka中('connector' = 'upsert-kafka'),upsert-kafka对应的文档地址:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/upsert-kafka/

普通的formats会报如下异常:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.kafka_sink' doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, mysql_cdc_source]], fields=[id, name, price, ts, dt])

使用debezium-json,cdc_mysql2kafka示例SQL

set yarn.application.name=cdc_mysql2kafka;

set execution.checkpointing.interval=1000;
set state.checkpoints.dir=hdfs:///flink/checkpoints/cdc_mysql2kafka;
set execution.checkpointing.externalized-checkpoint-retention= RETAIN_ON_CANCELLATION;

set state.backend=rocksdb;

CREATE TABLE mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED, --主键必填,且要求源表必须有主键,flink主键可以和mysql主键不一致
  name string,
  price double,
  ts bigint,
  dt string
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.44.128',
    'port' = '3306',
    'username' = 'root',
    'password' = 'password',
    'database-name' = 'cdc',
    'table-name' = 'mysql_cdc_source'
);

CREATE TABLE if not exists kafka_sink (
  id int,
  name string,
  price double,
  ts bigint,
  dt string
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafka_sink',
  'properties.bootstrap.servers' = 'indata-192-168-44-128:6667',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'GSSAPI',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.group.id' = 'dkl',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);

insert into kafka_sink select * from mysql_cdc_source;

使用kafka命令行查看一下topic里面的内容,看看debezium-json格式的数据长什么样

/usr/hdp/3.1.0.0-78/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.44.128:6667 --from-beginning --topic kafka_sink --group dkl --consumer-property security.protocol=SASL_PLAINTEXT

{"before":null,"after":{"id":1,"name":"my_catalog","price":11.11,"ts":1000,"dt":"2023-04-12"},"op":"c"}
{"before":null,"after":{"id":2,"name":"my_catalog","price":11.11,"ts":1000,"dt":"2023-04-12"},"op":"c"}
{"before":null,"after":{"id":3,"name":"my_catalog","price":123.0,"ts":1000,"dt":"2023-04-12"},"op":"c"}
{"before":null,"after":{"id":4,"name":"my_catalog","price":123.0,"ts":1000,"dt":"2023-04-12"},"op":"c"}

upsert-kafka SQL示例

CREATE TABLE if not exists upsert_kafka_sink (
  id int,
  name string,
  price double,
  ts bigint,
  dt string,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'upsert_kafka_sink',
  'properties.bootstrap.servers' = 'indata-192-168-44-128:6667',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'GSSAPI',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.group.id' = 'dkl',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json'
);

upsert-kafka参数和kafka参数有以下不同,多了两个必选:

PRIMARY KEY

key.format

不支持参数:

scan.startup.mode

消费CDC格式的Kafka

Flink消费CDC格式的kafa数据时,有一个问题:checkpoint会随着消费数据量的增加越来越大,和sink端无关

比如sink端是print('connector' = 'print')的截图

根据我个人理解,checkpoint一般只需要保存kafka offset信息就可以了,但是offset对应的文件不可能有那么大,将checkpoint文件查看一下里面的内容,发现保存的也不是kafka的数据。
可能和SourceSplit有关,这块我也不太懂,这个问题我还没有解决,下面是checkpoint文件内容截图:





Sink kafka exactly once

官方文档地址:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/kafka/#%E4%B8%80%E8%87%B4%E6%80%A7%E4%BF%9D%E8%AF%81
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/datastream/kafka/#%E5%AE%B9%E9%94%99

文档一:
默认情况下,如果查询在 启用 checkpoint 模式下执行时,Kafka sink 按照至少一次(at-lease-once)语义保证将数据写入到 Kafka topic 中。
当 Flink checkpoint 启用时,kafka 连接器可以提供精确一次(exactly-once)的语义保证。
除了启用 Flink checkpoint,还可以通过传入对应的 sink.semantic 选项来选择三种不同的运行模式:

文档二:

KafkaSink 总共支持三种不同的语义保证(DeliveryGuarantee)。对于 DeliveryGuarantee.AT_LEAST_ONCE 和 DeliveryGuarantee.EXACTLY_ONCE,Flink checkpoint 必须启用。默认情况下 KafkaSink 使用 DeliveryGuarantee.NONE。 以下是对不同语义保证的解释:

DeliveryGuarantee.NONE 不提供任何保证:消息有可能会因 Kafka broker 的原因发生丢失或因 Flink 的故障发生重复。
DeliveryGuarantee.AT_LEAST_ONCE: sink 在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。消息不会因 Kafka broker 端发生的事件而丢失,但可能会在Flink重启时重复,因为Flink会重新处理旧数据。
DeliveryGuarantee.EXACTLY_ONCE: 该模式下,Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。因此,如果 consumer 只读取已提交的数据(参见 Kafka consumer 配置 isolation.level),在 Flink 发生重启时不会发生数据重复。然而这会使数据在checkpoint完成时才会可见,因此请按需调整 checkpoint 的间隔。请确认事务ID的前缀(transactionIdPrefix)对不同的应用是唯一的,以保证不同作业的事务不会互相影响!此外,强烈建议将Kafka的事务超时时间调整至远大于checkpoint最大间隔 + 最大重启时间,否则Kafka对未提交事务的过期处理会导致数据丢失。

前提:开启checkpoint;exactly-once其实是通过kafka事务实现的,kafka 0.11+开始支持事务,所以还需要kafka的版本大于0.11;将Kafka的事务超时时间(producer transaction.timeout.ms)调整至远大于checkpoint最大间隔 + 最大重启时间,否则Kafka对未提交事务的过期处理会导致数据丢失。

文档一为Table API的文档,文档二为DataStream的文档,两者看起来差不太多,但是参数又不太一样,这很令人费解,加上后面验证sink cdc格式exactly once正好碰到了问题,用文档二的参数才解决,所以我看了源码一探究竟,两者的区别如下:
sink.semantic为弃用参数,新的参数为sink.delivery-guarantee,另外可能还需要结合sink.transactional-id-prefix一起使用,下面是相关源码:

    private static DeliveryGuarantee validateDeprecatedSemantic(ReadableConfig tableOptions) {
        // 如果配置了sink.semantic
        if (tableOptions.getOptional(SINK_SEMANTIC).isPresent()) {
            // 警告sink.semantic已经弃用,其使用新的参数sink.delivery-guarantee
            LOG.warn(
                    "{} is deprecated and will be removed. Please use {} instead.",
                    SINK_SEMANTIC.key(),
                    DELIVERY_GUARANTEE.key());
            // 根据sink.semantic的值构建DeliveryGuarantee并返回     
            return DeliveryGuarantee.valueOf(
                    tableOptions.get(SINK_SEMANTIC).toUpperCase().replace("-", "_"));
        }
        // 否则直接返回sink.delivery-guarantee对应的值
        return tableOptions.get(DELIVERY_GUARANTEE);
    }

官方文档不及时更新或者不一致也挺耽误事儿的,之前总结checkpoint时也发现了官方文档很多配置参数都是过时弃用的。。

配置参数:

'properties.transaction.timeout.ms'='900000', --可选,后面异常解决中有解释
'sink.delivery-guarantee'='exactly-once' 
-- 'sink.semantic' = 'exactly-once' -- 已经弃用           
-- 'sink.transactional-id-prefix'='dkl',     --可选,默认值:kafka-sink;事务前缀,需要保证对不同的应用是唯一的,以保证不同作业的事务不会互相影响

先说结论:sink.delivery-guarantee和sink.transactional-id-prefix最好一起使用,因为不设置sink.transactional-id-prefix也就是使用默认值的情况会出现问题(后面有解释)

一致性验证

首先启动sink kafka的任务,在生成了一个ckp之后(且还有数据没有写完),停止kafka服务,等一段时间后启动kafka服务,然后等待数据跑完,验证数据量。

假如source数据量有200万条数据,用Flink SQL count验证

默认参数:数据量大于200万
配置consumer读事务参数:'properties.isolation.level'='read_committed' (Flink SQL建表语句中添加),数据量正好等于200万
isolation.level默认值为read_uncommitted,默认读所有的消息,commit和没commit都会读出来,所以数据可能会重复。而read_committed只读commit的记录,数据没有重复,可以保证 exactly once。

事务和topic是否分区无关,也就是对于多个分区的topic也支持事务

CDC格式 exactly once

对于sink普通格式,我最开始通过参数'sink.semantic' = 'exactly-once'就可以了(开始不知道有新的参数),但是对于sink cdc格式,只使用sink.semantic参数,会有问题,需要通过参数sink.transactional-id-prefixsink.delivery-guarantee来解决。

'sink.transactional-id-prefix'='dkl',     --事务前缀,需要保证对不同的应用是唯一的,以保证不同作业的事务不会互相影响
'sink.delivery-guarantee'='exactly-once'  

仅使用sink.semantic参数时也就是不设置事务前缀(默认值kafka-sink)的问题:有一个任务一直处于INITIALIZING,很长时间(现在测试的是卡住8分钟左右,之前测试的是卡住一个半小时左右,不知道什么原因导致时间减少了)之后才会变为running,导致任务卡住很长时间。



小结

我最开始仅设置了sink.semantic参数来验证sink普通格式是没有问题的,后来同事提出问题说sink cdc格式时任务会卡住。最后用了新的参数:sink.delivery-guarantee和sink.transactional-id-prefix解决了这个问题,当时以为对于cdc格式的必须用新参数才行,当时就感觉这个结论不是很合理。

后面因为没有时间管这个问题就没有去深究,现在再进行总结,有了新的认知:
1、sink.semantic和sink.delivery-guarantee是一样的,只是一个是弃用的旧参数,一个新参数
2、仅设置sink.semantic时和同时设置sink.delivery-guarantee和sink.transactional-id-prefix参数的唯一区别是前缀不一样,一个是使用默认前缀kafka-sink(当时不知道有默认值),另一个是自己配置

那么问题来了,其实两种设置没有本质区别,都有前缀,那为啥表现不一样,后者会卡住呢?后来我再验证发现,对于普通格式,使用默认前缀也会卡住;对于所有的格式,我自己配置了一个前缀有的也会卡住,也就是问题和格式无关。
那原因是啥呢,我想的是卡住的原因是不同的任务前缀不能一样,如果一样了就会卡住。其实我停止了前面的所有任务,再使用相同的前缀起一个新的任务也有可能会卡住(有概率),我想这个可能是因为缓存的原因,也就是前面的任务虽然停了,但是不知道哪里还有缓存,导致新的任务认为有其他任务在使用这个相同的前缀,所以卡住了(结论不一定正确)。所以对于每一个任务我们都最好生成一个新的唯一的前缀。

默认前缀:

异常解决

异常一

org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TransactionalIdAuthorizationException: Transactional Id authorization failed.

这个异常,网上没有找到对应的资料,是我自己偶然发现的,解决方案:在ranger kafka策略里有一条策略单独控制Transactional Id,改一下这个策略加上对应的用户,比如我是用的hive用户

异常二

org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1386)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1290)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:417)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:315)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
    at java.lang.Thread.run(Thread.java:748)

原因是因为flink kafka connector sink kafka时将事务默认的超时时间(transaction.timeout.ms)设置为一小时,超过了kafka默认的最大超时时间(transaction.max.timeout.ms)15分钟,解决方法有两个,一个是在建表语句中将transaction.timeout.ms设置小一点,比如设置为15分钟

'properties.transaction.timeout.ms'='900000'

另一种方法是修改kafka服务端的配置,增加transaction.max.timeout.ms,比如设置为一小时,transaction.max.timeout.ms='3600000',两种方法可以结合使用,个人建议最好将transaction.max.timeout.ms设置大一点。因为上面有提到:强烈建议将Kafka的事务超时时间(producer transaction.timeout.ms)调整至远大于checkpoint最大间隔 + 最大重启时间,否则Kafka对未提交事务的过期处理会导致数据丢失。如果checkpoint的时间间隔比,较大超过15分钟了,大于了事务时间,那么就可能会丢失数据,所以不如直接将transaction.max.timeout.ms设置为一小时,properties.transaction.timeout.ms不用设置,也就是按照默认的一小时。

ambari修改kafka broker的事务最大超时时间

flink sink kafka默认一小时(transaction.timeout.ms)


kafka默认15分钟(transaction.max.timeout.ms)

其他异常

有几个可以忽略的异常,具体表现为停止kafka一段时间,再重启动kafka时会抛出一个异常,只有一次,不会一直报,具体的异常信息就不贴了

默认配置

默认值是AT_LEAST_ONCE,也就是默认情况下数据可能会重复但不会少,但是我之前测试的是数据会少,不过没有研究原因,现在想要研究一下之前问题少的原因,结果又正常了,无法复现之前的问题了,不知道啥原因~

参考文章

https://www.cnblogs.com/xijiu/p/16917741.html

展开阅读全文

页面更新:2024-04-28

标签:可能会   前缀   异常   参数   原因   事务   文档   格式   时间   数据

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top