如何学习kafka?

作者:sinxu,腾讯CSIG后台开发

本文主要从Kafka消费、堆积、稳定性、预案、成本控制等角度等最佳实践。

引言:

要确保Kafka在使用过程中的稳定性,需要从kafka在业务中的使用周期进行依次保障。主要可以分为:事先预防(通过规范的使用、开发,预防问题产生)、运行时监控(保障集群稳定,出问题能及时发现)、故障时解决(有完整的应急预案)这三阶段。

一、事先预防

事先预防即通过规范的使用、开发,预防问题产生。主要包含集群/生产端/消费端的一些最佳实践、上线前测试以及一些针对紧急情况(如消息积压等)的临时开关功能。 Kafka调优原则:

  1. 确定优化目标,并且定量给出目标(Kafka 常见的优化目标是吞吐量、延时、持久性和可用性);
  2. 确定了目标之后,需要明确优化的维度:

1. 生产端最佳实践

1.1 参数调优

1.2 开发实践

(1) 做好Topic隔离

根据具体场景(是否允许一定延迟、实时消息、定时周期任务等)区分kafka topic,避免挤占或阻塞实时业务消息的处理。

(2) 做好消息流控

如果下游消息消费存在瓶颈或者集群负载过高等,需要在生产端(或消息网关)实施流量生产速率的控制或者延时/暂定消息发送等策略,避免短时间内发送大量消息。

(3) 做好消息补推

手动去查询丢失的那部分数据,然后将消息重新发送到mq里面,把丢失的数据重新补回来。

(4) 做好消息顺序性保障

如果需要在保证Kafka在分区内严格有序的话(即需要保证两个消息是有严格的先后顺序),需要设置key,让某类消息根据指定规则路由到同一个topic的同一个分区中(能解决大部分消费顺序的问题)。 但是,需要避免分区内消息倾斜的问题(例如,按照店铺Id进行路由,容易导致消息不均衡的问题)。

  1. 生产端:消息发送指定key,确保相同key的消息发送到同一个partition。
  2. 消费端:单线程消费或者写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue ;

(5) 适当提高消息发送效率

(6) 保证消息发送可靠性

2. 消费端最佳实践

2.1 参数调优

2.2 开发实践

(1) 做好消息消费幂等

消息消费的幂等主要根据业务逻辑做调整。

以处理订单消息为例:

  1. 由订单编号+订单状态唯一的幂等key,并存入redis;
  2. 在处理之前,首先会去查Redis是否存在该Key,如果存在,则说明已经处理过了,直接丢掉;
  3. 如果Redis没处理过,则将处理过的数据插入到业务DB上,再到最后把幂等Key插入到Redis上;

简而言之,即通过Redis做前置处理 + DB唯一索引做最终保证来实现幂等性。

(2) 做好Consumer隔离

在消息量非常大的情况下,实时和离线消费者同时消费一个集群,离线数据繁重的磁盘 IO 操作会直接影响实时业务的实时性和集群的稳定性。

根据消费的实时性可以将消息消费者行为划分两类:实时消费者和离线消费者。

(3) 避免消息消费堆积

注:批量拉取处理时,需注意下kafka版本,spring-kafka 2.2.11.RELEASE版本以下,如果配置kafka.batchListener=true,但是将消息接收的元素设置为单个元素(非批量List),可能会导致kafka在拉取一批消息后,仅仅消费了头部的第一个消息。

(4) 避免Rebalance问题

A. 触发条件:

B. 如何避免非必要rebalance(消费者下线、消费者主动退出消费组导致的reblance):

  1. 需要仔细地设置session.timeout.ms(决定了 Consumer 存活性的时间间隔)和heartbeat.interval.ms(控制发送心跳请求频率的参数) 的值。
  2. max.poll.interval.ms参数配置:控制 Consumer 实际消费能力对 Rebalance 的影响,限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。默认值是 5 分钟,表示 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。具体可以统计下历史的时间花费,把最长的时间为参考进行设置。

(5) 保证消息消费可靠性

一般情况下,还是client 消费 broker 丢消息的场景比较多,想client端消费数据不能丢,肯定是不能使用autoCommit的,所以必须是手动提交的。

Consumer自动提交的机制是根据一定的时间间隔,将收到的消息进行commit。commit过程和消费消息的过程是异步的。也就是说,可能存在消费过程未成功(比如抛出异常),commit消息已经提交了,则此时消息就丢失了。

(6) 保证消息消费顺序性

  1. 不同topic(乱序消息):如果支付与订单生成对应不同的topic,只能在consumer层面去处理了。
  2. 同一个topic(乱序消息):一个topic可以对应多个分区,分别对应了多个consumer,与“不同topic”没什么本质上的差别。(可以理解为我们的服务有多个pod,生产者顺序发送消息,但被路由到不同分区,就可能变得乱序了,服务消费的就是无序的消息)
  3. 同一个topic,同一个分区(顺序消息):Kafka的消息在分区内是严格有序的,例如把同一笔订单的所有消息,按照生成的顺序一个个发送到同一个topic的同一个分区。

针对乱序消息:

例如:订单和支付分别封装了各自的消息,但是消费端的业务场景需要按订单消息->支付消息的顺序依次消费消息。

针对顺序消息:

两者都是通过将消息绑定到定向的分区或者队列来保证顺序性,通过增加分区或者线程来提升消费能力。

A. Consumer单线程顺序消费

生产者在发送消息时,已保证消息在分区内有序,一个分区对应了一个消费者,保证了消息消费的顺序性。

B. Consumer多线程顺序消费(具体策略在后面章节)

单线程顺序消费的扩展能力很差。为了提升消费者的处理速度,除了横向扩展分区数,增加消费者外,还可以使用多线程顺序消费。

将接收到的kafka数据进行hash取模(注意:如果kafka分区接受消息已经是取模的了,这里一定要对id做一次hash再取模)发送到不同的队列,然后开启多个线程去消费对应队列里面的数据。

此外,这里通过配置中心进行开关、动态扩容/缩容线程池。

(7) 处理Consumer的事务

通过事务消息,可以很好的保证一些业务场景的事务逻辑,不会因为网络不可用等原因出现系统之间状态不一致。

当更新任何一个服务出现故障时就抛出异常,事务消息不会被提交或回滚,消息服务器会回调发送端的事务查询接口,确定事务状态,发送端程序可以根据消息的内容对未做完的任务重新执行,然后告诉消息服务器该事务的状态。

3. 集群配置最佳实践

3.1 集群配置

3.2 Topic 评估

3.3 分区配置

设置多个分区在一定程度上是可以提高消费者消费的并发度,但是分区数量过多时可能会带来:句柄开销过大、生产端占用内存过大、可能增加端到端的延迟、影响系统可用性、故障恢复时间较长等问题。

根据吞吐量的要求设置 partition 数:

  1. 假设 Producer 单 partition 的吞吐量为 P
  2. consumer 消费一个 partition 的吞吐量为 C
  3. 而要求的吞吐量为 T
  4. 那么 partition 数至少应该大于 T/P、T/c 的最大值

4. 性能调优

调优目标:高吞吐量、低延时。

4.1 分层调优

自上而下分为应用程序层、框架层、JVM层和操作系统层,层级越靠上,调优的效果越明显。

4.2 吞吐量(TPS)调优

4.3 延时调优

5. 稳定性测试

kafka的稳定性测试主要在业务上线前针对Kafka实例/集群健康性、高可用性的测试。

5.1 健康性检查

(1) 检查实例:查看Kafka 实例对象中拿到所有的信息(例如 IP、端口等);

(2) 测试可用性:访问生产者和消费者,测试连接。

5.2 高可用测试

A. 单节点异常测试:重启Leader副本或Follower副本所在Pod

步骤:

  1. 查看topic的副本信息
  2. 删除相应pod
  3. 脚本检测Kafka的可用性

预期:对生产者和消费者的可用性均无影响。

B. 集群异常测试:重启所有pod

步骤:

  1. 删除所有pod
  2. 脚本检测Kafka的可用性

预期:所有broker ready后服务正常。

二、运行时监控

运行时监控主要包含集群稳定性配置与Kafka监控的最佳实践,旨在及时发现Kafka在运行时产生的相关问题与异常。

1. 集群稳定性监控

1.1 腾讯云CKafka集群配置

合理进行kafka实例配,主要关注这几个数据:

  1. 磁盘容量和峰值带宽
  2. 消息保留时长;
  3. 动态保留策略;

A. 磁盘容量和峰值带宽

可根据实际业务的消息内容大小、发送消息qps等进行预估,可以尽量设置大点;具体数值可根据实例监控查看,如果短时间内磁盘使用百分比就达到较高值,则需扩容。

峰值带宽=最大生产流量*副本数

B. 消息保留时长

消息即使被消费,也会持久化到磁盘存储保留时长的时间。该设置会占用磁盘空间,如果每天消息量很大的话,可适当缩短保留时间。

C. 动态保留策略

推荐开启动态保留设置。当磁盘容量达到阈值,则删除最早的消息,最多删除到保底时长范围外的消息(淘汰策略),可以很大程度避免磁盘被打满的情况。

但有调整时不会主动通知,但我们可以通过配置告警感知磁盘容量的变化。

1.2 自建Kafka集群配置

1.3 资源隔离

A. Broker级别物理隔离

如果不同业务线的 topic 会共享一块磁盘,若某个consumer 出现问题而导致消费产生 lag,进而导致频繁读盘,会影响在同一块磁盘的其他业务线 TP 的写入。

解决:Broker级别物理隔离:创建Topic、迁移Topic、宕机恢复流程

B. RPC队列隔离

Kafka RPC 队列缺少隔离,一旦某个 topic 处理慢,会导致所有请求 hang 住。

解决:需要按照控制流、数据流分离,且数据流要能够按照 topic 做隔离。

  1. 将 call 队列按照拆解成多个,并且为每个 call 队列都分配一个线程池。
  2. 一个队列单独处理 controller 请求的队列(隔离控制流),其余多个队列按照 topic 做 hash 的分散开(数据流之间隔离)。

如果一个 topic 出现问题,则只会阻塞其中的一个 RPC 处理线程池,以及 call 队列,可以保障其他的处理链路是畅通的。

1.4 智能限速

整个限速逻辑实现在 RPC 工作线程处理的末端,一旦 RPC 处理完毕,则通过限速控制模块进行限速检测。

  1. 配置等待时间,之后放入到 delayed queue 中,否则放到 response queue 中。
  2. 放入到 delayed queue 中的请求,等待时间达到后,会被 delayed 线程放入到 response queue 中。
  3. 最终在 response queue 中的请求被返回给 consumer。

2. Kafka监控

2.1 腾讯云CKafka告警

针对CKafka,需要配置告警(此类告警一般为消息积压、可用性、集群/机器健康性等检查)。

A. 指标

如:实例健康状态、节点数量、健康节点数量、问题分区数、生产消息数、消费请求数、jvm内存利用率、平均生产响应时间、分区消费偏移量等。

具体指标可以参考:https://cloud.tencent.com/document/product/597/54514

B. 配置

配置文档:https://cloud.tencent.com/document/product/597/57244

选择监控实例,配置告警内容和阈值。

一般会对当前服务自身的kafka集群做告警配置,但是如果是依赖自身消息的下游服务出现消费问题,我们是感知不到了;而且针对消费端服务不共用同一个集群的情况,出现消息重复发送的问题,服务自身是很难发现的。

C. 预案

在业务上线前,最好梳理下自身服务所涉及的topic消息(上游生产端和下游消费端),并细化告警配置,如果出现上游kafka异常或者下游kafka消息堆积可以及时感知。特别需要把可能有瞬时大量消息的场景(如批量数据导入、定时全量数据同步等)做一定的告警或者预案,避免服务不可用或者影响正常业务消息。

2.2 自建告警平台

通过自建告警平台配置对服务自身的异常告警,其中包括对框架在使用kafka组件时抛出与kafka消费逻辑过程中抛出的业务异常。

其中,可能需要异常升级的情况(由于)单独做下处理(针对spring kafka):

  1. 自定义kafka异常处理器:实现KafkaListenerErrorHandler接口的方法,注册自定义异常监听器,区分业务异常并抛出;
  2. 消费Kafka消息时,将@KafkaListener的errorHandler参数设置为定义的Kafka异常处理器;
  3. 此后,指定的业务异常会被抛出,而不会被封装成Spring kafka的框架异常,导致不能清晰地了解具体异常信息。

2.3 Kafka监控组件

目前业界并没有公认的解决方案,各家都有各自的监控之道。

2.4 Kafka Monitor

其中,Kafka Monitor通过模拟客户端行为,生产和消费数据并采集消息的延迟、错误率和重复率等性能和可用性指标,可以很好地发现下游的消息消费情况进而可以动态地调整消息的发送。(使用过程中需注意对样本覆盖率、功能覆盖率、流量、数据隔离、时延的控制)

Kakfa Monitor 优势:

  1. 通过为每个 Partition 启动单独的生产任务,确保监控覆盖所有 Partition。
  2. 在生产的消息中包含了时间戳、序列号,Kafka Monitor 可以依据这些数据对消息的延迟、丢失率和重复率进行统计。
  3. 通过设定消息生成的频率,来达到控制流量的目的。
  4. 生产的消息在序列化时指定为一个可配置的大小(验证对不同大小数据的处理能力、相同消息大小的性能比较)
  5. 通过设定单独的 Topic 和 Producer ID 来操作 Kafka 集群,可避免污染线上数据,做到一定程度上的数据隔离。

基于Kafka Monitor的设计思想,可以针对业务特点引入对消息的延迟、错误率和重复率等性能的监控告警指标。

三、故障时解决

防微杜渐,遇到问题/故障时有完整的应急预案,以快速定位并解决问题。

1. Kafka消息堆积紧急预案

问题描述:消费端产生消息积压,导致依赖该消息的服务不能及时感知业务变化,导致一些业务逻辑、数据处理出现延迟,容易产生业务阻塞和数据一致性问题。

方案:问题排查、扩容升配策略、消息Topic转换策略、可配置多线程的消费策略。

1.1 问题排查

遇到消息积压时,具体可以从以下几个角度去定位问题原因:

  1. 消息生产端数据量是否存在陡升的情况
  2. 消息消费端消费能力是否有下降
  3. 消息积压是发生在所有的partition还是所有的partition都有积压情况

对于第1、2点导致的消息积压:为暂时性的消息积压,通过扩分区、扩容升配、多线程消费、批量消费等方式提高消费速度能在一定程度上解决这类问题。

对于第3点导致的消息积压:可以采用消息Topic中转策略。

1.2 扩容升配策略

  1. 检查生产端消费发送情况(主要检查是否继续有消息产生、是否存在逻辑缺陷、是否有重复消息发送)
  2. 观察消费端的消费情况(预估下堆积消息的处理清理以及是否有降低趋势)
  3. 若为生产端问题,则评估是否可以通过增加分区数、调整偏移量、删除topic(需要评估影响面)等解决
  4. 消费端新增机器及依赖资源,提高消费能力;
  5. 如果涉及数据一致性问题,需要通过数据比对、对账等功能进行校验。

1.3 配置多线程的消费策略

简而言之,即线程池消费+动态线程池配置策略:将接收到的kafka数据进行hash取模(如果kafka分区接受消息已经是取模的了,这里一定要对id做一次hash再取模)发送到不同的队列,然后开启多个线程去消费对应队列里面的数据。

设计思路:

  1. 在应用启动时初始化对应业务的顺序消费线程池(demo中为订单消费线程池)
  2. 订单监听类拉取消息提交任务至线程池中对应的队列
  3. 线程池的线程处理绑定队列中的任务数据
  4. 每个线程处理完任务后增加待提交的offsets标识数
  5. 监听类中校验待提交的offsets数与拉取到的记录数是否相等,如果相等则
  6. 手动提交offset(关闭kafka的自动提交,待本次拉取到的任务处理完成之后再提交位移)

另外,可以根据业务流量调整的线程配置与pod的配置,如高峰期设置一个相对较高的并发级别数用来快速处理消息,平峰期设置一个较小的并发级别数来让出系统资源。这里,可以参考美团提供的一种配置中心修改配置动态设置线程池参数的思路,实现动态的扩容或者缩容。

实现了动态扩容与缩容:

  1. 通过配置中心刷新OrderKafkaListener监听类中的配置concurrent的值,
  2. 通过set方法修改concurrent的值时,先修改stopped的值去停止当前正在执行的线程池。
  3. 执行完毕后通过新的并发级别数新建一个新的线程池,实现了动态扩容与缩容。

此外,还可以新增开关,它设置为true是可以中断启动中的线程池,故障时进行功能开关。

注意: 如果涉及数据一致性问题,需要通过数据比对、对账等功能进行校验。

1.4 Topic中转策略

当消息积压是发生在所有的partition还是所有的partition都有积压情况时,只能操作临时扩容,以更快的速度去消费数据了。

设计思路:

  1. 临时建立好原先10倍或者20倍的queue数量(新建一个topic,partition是原来的10倍)。
  2. 然后写一个临时分发消息的consumer程序,这个程序部署上去消费积压的消息,消费之后不做耗时处理,直接均匀轮询写入临时建好分10数量的queue里面。
  3. 紧接着征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的消息。
  4. 这种做法相当于临时将queue资源和consumer资源扩大10倍,以正常速度的10倍来消费消息。
  5. 等快速消费完了之后,恢复原来的部署架构,重新用原来的consumer机器来消费消息。

改进:

  1. consumer程序可以写在服务里面;
  2. 指定一个“预案topic”,在服务中预先写好对“预案topic”
  3. 采用策略模式进行”业务topic“->“预案topic”的转换;

注意:

  1. 如果涉及数据一致性问题,需要通过数据比对、对账等功能进行校验。
  2. 需要有个单独的topic转换服务,或修改服务代码,或在事前将多线程逻辑写好或者

2. Kafka消费异常导致消费阻塞

问题描述:某个消息消费异常或者某个操作较为耗时,导致单个pod的消费能力下降,甚至产生阻塞。

方案:设置偏移量;开关多线程的消费策略;

2.1 设置偏移量

  1. 调整偏移量:联系运维,将offset后移一位;
  2. 消息补推:针对跳过的消息或某个时间段内的数据进行消息补推;
  3. 如果涉及数据一致性问题,需要通过数据比对、对账等功能进行校验。

2.2 开关多线程的消费策略

参考上面的“可配置多线程的消费策略”,在发生阻塞时开启多线程消费开关。

注:需要修改代码或者在事前将多线程逻辑写好

3. Kafka消息丢失预案

问题描述:服务没有按照预期消费到kafka消息,导致业务产生问题

方案:根因分析;消息补推;

3.1 根因分析

(1) 生产端是否成功发送消费(源头丢失)

解决:需要检查生产端与集群健康性;消息补发。

(2) 是否被成功消费

Consumer自动提交的机制是根据一定的时间间隔,将收到的消息进行commit。commit过程和消费消息的过程是异步的。也就是说,可能存在消费过程未成功(比如抛出异常),commit消息已经提交了。

此外,如果消费逻辑有bug,也导致消息丢失的假象。

解决:修复问题,视情况修改消费确认机制。

(3) 是否有其他服务共用了同一个消费组

多服务误用同一个消费组会导致消息一定比率或规律性丢失。

例如,创建用户的kafka消息,可能价格中心和促销服务误用了一个消费组,导致每个服务都是消费了部分消息,导致一些问题出现偶现的情况。

解决:修改配置,重启服务,各种建立的消费组;事前需要有检查是否有多个服务共用一个消费的情况(检测+比对);

3.2 消息补推

  1. 通过业务影响查询影响的数据信息;
  2. 构建kafka消息,进行消息补偿;
  3. 如果涉及数据一致性问题,需要通过数据比对、对账等功能进行校验。

针对每个对外发送的服务,生产端一般都需要有较为完善的消息补推接口,并且消费端也需要保障消息消费的幂等)

四、其他

1. Kafka成本控制

机器、存储和网络

1.1 机器

需要重新评估你的实例类型决策:你的集群是否饱和?在什么情况下饱和?是否存在其他实例类型,可能比你第一次创建集群时选择的类型更合适?EBS 优化实例与 GP2/3 或 IO2 驱动器的混合是否真的比 i3 或 i3en 机器(及其带来的优势)有更好的性价比?

1.2 存储与网络

压缩在 Kafka 中并不新鲜,大多数用户已经知道了自己可以在 GZIP、Snappy 和 LZ4 之间做出选择。但自从KIP-110被合并进 Kafka,并添加了用于 Zstandard 压缩的压缩器后,它已实现了显著的性能改进,并且是降低网络成本的完美方式。

以生产者端略高的 CPU 使用率为代价,你将获得更高的压缩率并在线上“挤进”更多信息。

Amplitude在他们的帖子中介绍,在切换到 Zstandard 后,他们的带宽使用量减少了三分之二,仅在处理管道上就可以节省每月数万美元的数据传输成本。

1.3 集群

不平衡的集群可能会损害集群性能,导致某些 borker 比其他 broker 的负载更大,让响应延迟更高,并且在某些情况下会导致这些 broker 的资源饱和,从而导致不必要的扩容,进而会影响集群成本。

此外,不平衡集群还面临一个风险:在一个 broker 出故障后出现更高的 MTTR(例如当该 broker 不必要地持有更多分区时),以及更高的数据丢失风险(想象一个复制因子为 2 的主题,其中一个节点由于启动时要加载的 segment 过多,于是难以启动)。

2. 消息消费的幂等

定义:

所谓幂等性,数学概念就是: f(f(x)) = f(x) 。f函数表示对消息的处理。通俗点来讲就是,在消费者收到重复消息进行重复处理时,也要保证最终结果的一致性。

比如,银行转账、下单等,不管重试多少次,都要保证最终结果一定是一致的。

2.1 利用数据库的唯一约束

将数据库中的多个字段联合,创建一个唯一约束,即使多次操作也能保证表里至多存在一条记录(如创建订单、创建账单、创建流水等)。

此外,只要是支持类似“INSERT IF NOT EXIST”语义的存储类系统(如Redis的SETNX)都可以用于实现幂等消费。

2.2 设置前置条件

  1. 给数据变更设置一个前置条件(版本号version、updateTime);
  2. 如果满足条件就更新数据,否则拒绝更新数据;
  3. 在更新数据的时候,同时变更前置条件中的数据(版本号+1、更新updateTime)。

2.3 记录并检查操作

  1. 给每条消息都记录一个全局唯一 ID;
  2. 消费时,先根据这个全局唯一 ID 检查这条消息是否有被消费过;
  3. 如果没有消费过,则更新数据,并将消费状态置为“已消费”状态。

其中,在“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性。

参考:

[1]https://www.infoq.cn/article/ucSru1uKkSswLXPcjQgC?source=app_share

[2]https://blog.csdn.net/qq_32179907/article/details/122599769

[3]https://blog.csdn.net/qq_32179907/article/details/122599769

[4]https://zhuanlan.zhihu.com/p/513559802?utm_source=wechat_session&utm_medium=social&utm_oi=689250073002930176&utm_campaign=shareopn

[5]https://blog.csdn.net/philip502/article/details/118997899?utm_medium=distribute.wap_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-118997899-blog-125192952.wap_relevant_multi_platform_whitelistv1&spm=1001.2101.3001.4242.1&utm_relevant_index=1

[6]https://www.zhihu.com/question/483747691/answer/2392949203?utm_source=wechat_session&utm_medium=social&utm_oi=689250073002930176&utm_content=group3_Answer&utm_campaign=shareopn

[7]https://zhuanlan.zhihu.com/p/354772550?utm_source=wechat_session&utm_medium=social&utm_oi=689250073002930176&utm_campaign=shareopn

[8]https://www.infoq.cn/article/contrast-with-kafka-and-jingdong-jmq?source=app_share

[9]https://www.infoq.cn/article/BF3mm9haDs-cdHCXOLlf?source=app_share

[10]https://www.infoq.cn/article/wmM8WXzLEgfGMKYpbF0N?source=app_share

[11]https://www.infoq.cn/article/Q0o*QzLQiay31MWiOBJH?source=app_share

展开阅读全文

页面更新:2024-02-28

标签:队列   集群   线程   磁盘   分区   顺序   消费者   消息   业务   数据

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top