Flink实时消费Kafka中的binlog日志乱码(反序列化失败)

业务场景:

Canal将mysql的binlog日志上报到Kafka,通过Flink实时程序消费该binlog信息进行实时展示。

异常信息:

数据消费正常,但是显示乱码。

初步诊断:

Canal上报binlog到Kafka时做了序列化,而代码中反序列化失败。

源码如下:

import org.apache.flink.api.common.serialization.{DeserializationSchema, SimpleStringEncoder, SimpleStringSchema}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import java.util.Properties

object MyFlinkDemo2 {
  def main(args: Array[String]): Unit = {

    println("**************BEGIN**************")
    //创建流处理执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

//并行度设置
env.setParallelism(1)


    // ===============================================
    // ==============     Kafka配置信息    ============
    // ===============================================
    //kafka中读取数据
val properties = new Properties()

    //Kafka开发集群
properties.setProperty("bootstrap.servers", "bigdata:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("auto.offset.reset", "latest")
    properties.setProperty("deserializer.encoding", "utf-8")

    // ===============================================
    // ==============   Flink连接Kafka   ==============
    // ===============================================
    // .Net binlog日志
val stream = env.addSource(new FlinkKafkaConsumer[String]("TestTopic", new SimpleStringSchema(), properties))

    stream.print()

    // ==============================================
    // ==============   程序执行入口   ================
    // ==============================================
env.execute("MyFlinkDemo2")

    println("***************END*********************")
  }
}

结果返回:

Flink实时消费Kafka中的binlog日志乱码(反序列化失败)

回答

请用byte[]接收Kafka,再处理字符集

没有地方指定编码格式吗