Flink写Doris的哪些坑

0. 先看兼容性

Flink作为当下炙手可热的计算引擎之一,Doris自然会提供对其的兼容:

同样,官方文档也给出了对Flink不同版本的兼容情况:

而我当前软件工程里的Flink版本为1.15.2、Java版本也为1.8、对应的Doris版本为1.2.3,于是正好对应Doris的1.2.0版本的connector。

1. 环境准备

官方文档描述了一堆Flink写Doris前的准备工作,但是读完之后,在我看来,大部分其实是没有必要的(原因跟上一篇通过spark写Doris一样)。

因为我是通过idea开发的代码,对于开发环境来说,理论上我只需要额外引入对应的flink maven依赖,doris connector maven依赖就够了。

因为flink的环境我已经具备现成的了,那么暂时就只需要引入connector依赖就可以:

而至于集群运行环境,我有yarn,理论上也就够了,其他一切花里胡哨的配置全都不需要(当然,你最好要清楚为啥)。

2. SQL API第一次试错

从官方文档描述来看,Doris即支持Flink的Table API(SQL API),也支持其普通的DataStream API,这个就有点类似spark的Dataset(DataFrame)和RDD。

由于SQL API的语法更加简洁和便利,于是,我决定先用这个API来试试,代码如下:

package com.anryg.doris

import java.time.Duration
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
* @DESC: 通过SQL API读取kafka数据,写入到Doris
* @Auther: Anryg
* @Date: 2023/8/3 11:29
*/
object FlinkSQLFromKafka2Doris {

def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment

env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/checkpoint/FlinkSQLFromKafka2Doris")
env.getCheckpointConfig.setAlignedCheckpointTimeout(Duration.ofMinutes(1L))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
//env.setRuntimeMode(RuntimeExecutionMode.BATCH)

val tableEnv = StreamTableEnvironment.create(env)

/**第一步:读取kafka数据源*/
tableEnv.executeSql(
"""
Create table dns_logs_from_kafka(
|client_ip STRING,
|domain STRING,
|`time` STRING,
|target_ip STRING,
|rcode STRING,
|query_type STRING,
|authority_record STRING,
|add_msg STRING,
|dns_ip STRING
|)
|with(
|'connector' = 'kafka',
|'topic' = 'test',
|'properties.bootstrap.servers' = '192.168.211.107:6667',
|'properties.group.id' = 'FlinkSQLFromKafka2Doris',
|'scan.startup.mode' = 'latest-offset',
|'value.format'='csv', //确定数据源为文本格式
|'value.csv.field-delimiter'='|' //确定文本数据源的分隔符
|)
""".stripMargin)

//tableEnv.executeSql("select * from dns_logs_from_kafka").print()

/**第二步:创建Doris映射表*/
tableEnv.executeSql(
"""
|CREATE TABLE dns_logs_from_flink02 (
|`client_ip` STRING
|)
| WITH (
| 'connector' = 'doris',
| 'fenodes' = '192.168.221.173:8030',
| 'table.identifier' = 'example_db.dns_logs_from_flink02',
| 'username' = 'root',
| 'password' = '',
| 'sink.properties.format' = 'json',
| 'sink.properties.read_json_by_line' = 'true',
| 'sink.label-prefix' = 'doris_label'
|)
""".stripMargin)

/**第三步:数据写入到Doris表中*/
tableEnv.executeSql(
"""
|INSERT INTO dns_logs_from_flink02
|select
|client_ip
|from
|dns_logs_from_kafka
""".stripMargin)

}
}

首先,我这肯定是严格按照官方文档的要求写的,而且还变着花样,改了多处设置,甚至为了尽可能快速解决问题,我都委屈到把要写入到Doris表的数据,减少到了一个字段。

先后尝试了至少10次以上,日志debug的模式也打开了(需要额外配置log4j2.xml文件),甚至在读取完kafka之后,我还专门把读取结果打印出来,确保了读取kafka这一步没有问题。

但是,即便这样,我还是没法定位出问题,这个奇葩的原因在于,即便我打开了日志的debug设置,程序依然没有任何报错提示,但数据就是写不进去

再次查看官方文档,我看它提到了需要在FE上做这么一个设置,心想,莫非是因为这一步没做吗?

于是赶紧把我的3台FE都增加了这个配置,然后重启集群。

但,然并卵,依然写不进数据,也依然没有任何报错提示。

那我就没有办法了,现在就是想找出问题,都不知道从哪找起,就是这么气人(当然,也可能是因为我水平不行)。

既然这样,那咱就换一棵歪脖子树试试(不想折腾太久了)。

3. DataStream第二次试错

除了SQL API,官网还给出了DataStream API的样例,既然这样,那就试一下吧。

我根据这个官网样例,写出来这样的代码:

package com.anryg.doris

import java.util.Properties

import com.alibaba.fastjson.JSONObject
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.doris.flink.cfg.DorisOptions
import org.apache.doris.flink.sink.DorisSink
import org.apache.doris.flink.cfg.DorisExecutionOptions
import org.apache.doris.flink.cfg.DorisReadOptions
import org.apache.doris.flink.sink.writer.{DorisRecordSerializer, SimpleStringSerializer}
import org.apache.flink.api.common.eventtime.WatermarkStrategy

/**
* @DESC: 通过DataStream API读取kafka数据,写入到Doris
* @Auther: Anryg
* @Date: 2023/8/3 17:05
*/
object FlinkDSFromKafka2Doris {

def main(args: Array[String]): Unit = {
//获取流任务的环境变量
val env = StreamExecutionEnvironment.getExecutionEnvironment
.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE) //打开checkpoint功能

env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkDSFromKafka2Doris") //设置checkpoint的hdfs目录
env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //设置checkpoint记录的保留策略

val kafkaSource = KafkaSource.builder() //获取kafka数据源
.setBootstrapServers("192.168.211.107:6667")
.setTopics("test")
.setGroupId("FlinkDSFromKafka2Doris")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build()

import org.apache.flink.streaming.api.scala._ //引入隐式转换函数,需要将从kafka读出来的数据做隐式转换

/**配置Doris相关参数*/
val dorisBuilder = DorisOptions.builder 
dorisBuilder.setFenodes("192.168.221.173:8030")
.setTableIdentifier("example_db.dns_logs_from_flink01")
.setUsername("root")
.setPassword("")

/**确定数据写入到Doris的方式,即stream load方式*/
val executionBuilder = DorisExecutionOptions.builder
executionBuilder.setLabelPrefix("flink-doris-label03") //streamload label prefix

/**确定Doris的Sink数据方式*/
val builder = DorisSink.builder[String] //注意这个数据类型String 需要加上
builder.setDorisReadOptions(DorisReadOptions.builder.build) //确定数据读取策略
.setDorisExecutionOptions(executionBuilder.build) //确定数据执行策略
.setSerializer(new SimpleStringSerializer()) //确定数据数据序列化(写入)类型
.setDorisOptions(dorisBuilder.build) //添加Doris配置

/**读取数据源生成DataStream对象*/
val kafkaDS = env.fromSource[String](kafkaSource, WatermarkStrategy.noWatermarks(),"kafka-data") 

/**对读取的数据进行处理*/
val targetDS = kafkaDS.map(line => {
val array = line.split("|")
array
}).filter(_.length == 9)
.map(array => {
val json = new JSONObject() /**将数据封装为json对象*/
json.put("client_ip",array(0))
json.put("domain",array(1))
json.put("time",array(2))
json.put("target_ip",array(3))
json.put("rcode",array(4))
json.put("query_type",array(5))
json.put("authority_record",array(6))
json.put("add_msg",array(7))
json.put("dns_ip",array(8))
json.toJSONString /**转换为json string*/
})

targetDS.sinkTo(builder.build())

env.execute("FlinkDSFromKafka2Doris")
}
}

而且吸取了上次用spark写Doris的惨痛经历,我把数据格式都转成json string了。

按理说,应该没什么毛病才对,可是一运行呢,又报错了:

不过好消息是,它终于给我错误提示了,对比上面用SQL api时,没有任何报错的一脸懵逼要强多了。

通过给的这个错误连接,我顺利打开了其中的内容,是这样的:

告诉我说,现在这个数据处理格式(json string)还是不符合要求,这下又有点懵逼了。

心想,这个通过计算引擎写Doris的数据格式,还真有点让人捉摸不透,上一篇spark写Doris时,是必须要把数据转为json string格式的,但到了Flink这里,又不行了

那咋办呢?又只能看源码了。

4. 查看源码,定位问题

既然要决定看源码,从哪开始看呢?

那就是找到它的数据写入逻辑,或者写入前配置,因为Flink写Doris的方式是Steam Load,那么就找到这个对应的类,而这个相关的类在这里:

怎么说呢,又是一个几乎0注释的源码。

通读这个类之后呢,发现了一个关键函数:

这里面清清楚楚的定义了数据的格式为json string类型,而且从函数命名来看,说它是一个默认的设置方式(函数名为defaults嘛)。

但是,奇葩的是,官网给的这个获取该执行对象的代码,却并没有读到这些所谓的默认配置

看到这里,我似乎一下子就找到了原因。

5. 解决问题

原因既然找到了,是因为源码那个数据格式的设置没有生效。

那么咱就只能手动把这个设置给加上,怎么加呢,这样:

加完之后,整个代码逻辑就是这样的:

package com.anryg.doris

import java.util.Properties

import com.alibaba.fastjson.JSONObject
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.doris.flink.cfg.DorisOptions
import org.apache.doris.flink.sink.DorisSink
import org.apache.doris.flink.cfg.DorisExecutionOptions
import org.apache.doris.flink.cfg.DorisReadOptions
import org.apache.doris.flink.sink.writer.{DorisRecordSerializer, SimpleStringSerializer}
import org.apache.flink.api.common.eventtime.WatermarkStrategy


/**
* @DESC: 通过DataStream API读取kafka数据,写入到Doris
* @Auther: Anryg
* @Date: 2023/8/3 17:05
*/
object FlinkDSFromKafka2Doris {

def main(args: Array[String]): Unit = {
//获取流任务的环境变量
val env = StreamExecutionEnvironment.getExecutionEnvironment
.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE) //打开checkpoint功能

env.getCheckpointConfig.setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkDSFromKafka2Doris") //设置checkpoint的hdfs目录
env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //设置checkpoint记录的保留策略

val kafkaSource = KafkaSource.builder() //获取kafka数据源
.setBootstrapServers("192.168.211.107:6667")
.setTopics("test")
.setGroupId("FlinkDSFromKafka2Doris")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build()

import org.apache.flink.streaming.api.scala._ //引入隐式转换函数,需要将从kafka读出来的数据做隐式转换

/**配置Doris相关参数*/
val dorisBuilder = DorisOptions.builder
dorisBuilder.setFenodes("192.168.221.173:8030")
.setTableIdentifier("example_db.dns_logs_from_flink01")
.setUsername("root")
.setPassword("")

/**确定数据写入到Doris的方式,即stream load方式*/
val executionBuilder = DorisExecutionOptions.builder
executionBuilder.setLabelPrefix("flink-doris-label03") //streamload label prefix

/**添加额外的数据格式设置,否则写不进去*/
val properties = new Properties
properties.setProperty("format", "json")
properties.setProperty("read_json_by_line", "true")

executionBuilder.setStreamLoadProp(properties)

/**确定Doris的Sink数据方式*/
val builder = DorisSink.builder[String] //注意这个数据类型String 需要加上
builder.setDorisReadOptions(DorisReadOptions.builder.build) //确定数据读取策略
.setDorisExecutionOptions(executionBuilder.build) //确定数据执行策略
.setSerializer(new SimpleStringSerializer()) //确定数据数据序列化(写入)类型
.setDorisOptions(dorisBuilder.build) //添加Doris配置

/**读取数据源生成DataStream对象*/
val kafkaDS = env.fromSource[String](kafkaSource, WatermarkStrategy.noWatermarks(),"kafka-data")


/**对读取的数据进行处理*/
val targetDS = kafkaDS.map(line => {
val array = line.split("|")
array
}).filter(_.length == 9)
.map(array => {
val json = new JSONObject() /**将数据封装为json对象*/
json.put("client_ip",array(0))
json.put("domain",array(1))
json.put("time",array(2))
json.put("target_ip",array(3))
json.put("rcode",array(4))
json.put("query_type",array(5))
json.put("authority_record",array(6))
json.put("add_msg",array(7))
json.put("dns_ip",array(8))
json.toJSONString /**转换为json string*/
})

targetDS.sinkTo(builder.build())

env.execute("FlinkDSFromKafka2Doris")
}
}

官网的例子给的是batch模式,我这里是流模式

再执行,之前一直显示数据量为0的表记录,一下子就蹭蹭往上涨了。

最后

跟上次用spark写Doris一样,这次用Flink写Doris,也一样不顺利,磕磕碰碰一路,好在汲取了上次的教训,最终解决问题的效率提高了不少。

这里我想表达一点的就是,官方文档的详尽、严谨程度,源码注释的详细程度,在很大程度上决定了使用者对一项新技术的学习成本

其次就是,只要对出现的问题,框架能够抛出足够详细(一般详细也成)的错误提示,使用者通过对逻辑关系的梳理,也能找到问题的原因并解决。

但是,就怕那种虽然有问题,但是偏偏还不告诉你问题原因的(不报错,不提示),这就让人很惆怅。

展开阅读全文

页面更新:2024-06-04

标签:数据源   函数   源码   对象   策略   原因   方式   文档   官方   数据

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top