Introduction to Flink

What is Flink

  1. Flink: in German, the word means "quick" or "nimble".
  2. Apache Flink: a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
  3. Flink's goals: low latency, high throughput, accurate results, good fault tolerance.
  4. Flink's users: e-commerce and marketing, IoT, telecom, banking and finance.
  5. Difference from Spark Streaming: Spark Streaming uses micro-batch processing, so its latency is relatively high.
  6. Evolution of stream-processing architecture: from the Lambda architecture (two systems: a Batch Layer for accurate results and a Speed Layer for low-latency stream processing, together providing both low latency and correctness) to the Flink architecture.
  7. Flink's main features:
    • Event-driven
    • A stream-based world view
    • Layered APIs: the higher the layer, the more abstract and concise to use; the lower the layer, the more concrete and flexible.
  8. Flink's other features:
    • Low latency, high availability, dynamic scaling, exactly-once semantics
A streaming WordCount example:

```scala
import org.apache.flink.streaming.api.scala._

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // read a text stream from a socket
    val textDataStream = env.socketTextStream("localhost", 7777)

    // split lines into words and count occurrences per word
    val wordCountDataStream = textDataStream.flatMap(_.split("\\s"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    wordCountDataStream.print()

    env.execute("stream word count")
  }
}
```

Flink's default parallelism equals the number of virtual CPU cores; it can also be set per operator, e.g. wordCountDataStream.print().setParallelism(1).
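A minimal sketch of the two levels, reusing the WordCount pipeline above; the job-wide value of 4 is arbitrary:

```scala
import org.apache.flink.streaming.api.scala._

object ParallelismDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)                      // job-wide default parallelism (arbitrary value)

    env.socketTextStream("localhost", 7777)    // socket sources always run with parallelism 1
      .flatMap(_.split("\\s"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)
      .print()
      .setParallelism(1)                       // operator-level setting overrides the job-wide default

    env.execute("parallelism demo")
  }
}
```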

flink-conf.yaml

  • taskmanager.numberOfTaskSlots: the maximum parallelism a TaskManager can provide (its capacity)
  • parallelism.default: the default parallelism jobs actually run with

Flink deployment modes

  1. Standalone mode: start the standalone cluster with ./bin/start-cluster.sh
  2. YARN mode (common in production): ./bin/yarn-session.sh -n 2 -s 2 -jm 1024 -nm test -d, where -n is the number of TaskManagers, -s the number of slots per TaskManager, -jm the JobManager memory (MB), -nm the application name, and -d runs the session detached (in the background).
  3. Kubernetes deployment: containerized deployment based on Docker images; the JobManager, TaskManager, and JobManagerServices need to be started.

Flink runtime architecture

Components of the Flink runtime

  • JobManager: the master process that controls the execution of a streaming application
  • TaskManager: each TaskManager holds a certain number of slots; the number of slots limits how many tasks the TaskManager can execute
  • ResourceManager: manages slots; different environments (standalone, YARN, Kubernetes) provide different resource managers
  • Dispatcher: accepts job submissions from users and provides a REST interface to the application

Flink streaming API

The main flow of a streaming program: Environment > Source > Transform > Sink
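A minimal skeleton of the four steps (the socket address is a placeholder):

```scala
import org.apache.flink.streaming.api.scala._

object PipelineSkeleton {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment   // 1. Environment
    val source = env.socketTextStream("localhost", 7777)           // 2. Source
    val transformed = source.map(line => (line, 1))                // 3. Transform
    transformed.print()                                            // 4. Sink (print is a built-in sink)
    env.execute("pipeline skeleton")
  }
}
```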

Environment

  • The DataSet API is for batch processing and the DataStream API is for stream processing.
  • The usual way to create an environment: StreamExecutionEnvironment.getExecutionEnvironment
  • Lower-level ways to create an environment:
    • createLocalEnvironment: StreamExecutionEnvironment.createLocalEnvironment(1) // 1 is the parallelism
    • createRemoteEnvironment: StreamExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", port, "yourpath/wordcount.jar")

These lower-level methods are rarely used: getExecutionEnvironment detects the runtime context automatically and returns the appropriate environment.

Source

Demo code for the four kinds of sources:

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaConsumerBase}

import scala.util.Random

// Case class for a temperature sensor reading
case class SensorReading(id: String, time: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 1. Read from a collection
    val stream1 = env.fromCollection(List(
      SensorReading("sensor_1", 123212312, 12321.21312),
      SensorReading("sensor_2", 12312312, 12321.21312),
      SensorReading("sensor_3", 1211113, 12321.21312),
      SensorReading("sensor_4", 123, 12321.21312)
    ))
    //    stream1.print("stream1").setParallelism(6)
    //    env.execute("source test")

    // 2. Read from a file
    val stream2 = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\learn\\src\\main\\java\\ink\\cyz\\learn\\scala\\hello.txt")
    //    stream2.print("stream2").setParallelism(6)
    //    env.execute("source test")

    // 3. Read from Kafka
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "1205")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
    //    stream3.print("stream3").setParallelism(1)
    //    env.execute("source test")

    // 4. Custom source
    val stream4 = env.addSource(new SensorSource())
    stream4.print("stream4").setParallelism(1)
    env.execute("source test")
  }

  // A custom SourceFunction that emits random sensor readings
  class SensorSource() extends SourceFunction[SensorReading] {
    var running: Boolean = true

    override def cancel(): Unit = {
      running = false
    }

    override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
      val rand = new Random()
      var curTemp = 1.to(10).map(
        i => ("sensor_" + i, 60 + rand.nextGaussian() * 20)
      )
      // generate the data stream
      while (running) {
        curTemp = curTemp.map(
          t => (t._1, t._2 + rand.nextGaussian())
        )
        // current timestamp
        val curTime = System.currentTimeMillis()
        curTemp.foreach(
          t => ctx.collect(SensorReading(t._1, curTime, t._2))
        )
        Thread.sleep(500)
      }
    }
  }
}
```

Transform

Transformation operators include:

  1. map
  2. flatMap
  3. Filter
  4. KeyBy: DataStream > KeyedStream
  5. Rolling Aggregation: applies to a KeyedStream
  6. Reduce
  7. Split, Select: split a DataStream into multiple DataStreams with split, then pick the ones to work on with select
  8. Connect, CoMap: combine two streams whose element types may differ
  9. Union: merge multiple streams of the same element type

Demo code:

```scala
import org.apache.flink.streaming.api.scala._

object TransformTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val streamFromFile = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\learn\\src\\main\\java\\ink\\cyz\\learn\\scala\\hello.txt")

    val dataStream: DataStream[SensorReading] = streamFromFile.map(data => {
      val dataArray = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })
    //      .keyBy(0)
    //      .sum(2)
    //      .keyBy(_.id)
    //      .sum("temperature")

    // rolling aggregation / reduce on a KeyedStream
    val aggregateStream = dataStream
      .keyBy("id")
      .reduce((x, y) => SensorReading(x.id, x.time + 1, y.temperature + 10))

    // split / select
    val splitStream = dataStream.split(data => {
      if (data.temperature > 30) Seq("high")
      else Seq("low")
    })

    //    splitStream.select("high").print()
    val low = splitStream.select("low")
    val high = splitStream.select("high")
    val all = splitStream.select("high", "low")
    //    low.print("low")

    //    dataStream.print()
    //    env.execute("transform test")

    // ConnectedStreams: only two streams can be connected, and each keeps its own type
    val warning = high.map(data => (data.id, data.temperature))
    val connectedStream = warning.connect(low)

    val coMapDataStream = connectedStream.map(
      warningStream => (warningStream._1, warningStream._2, "warning"),
      lowData => (lowData.id, "healthy")
    )

    //    coMapDataStream.print()
    //    env.execute("coMapData")

    // union can merge multiple streams, but their element types must be identical
    val unionStream = coMapDataStream.union(coMapDataStream, coMapDataStream)
    unionStream.print("union")
    env.execute("union")
  }
}
```

## Supported data types

Flink supports all Java and Scala basic data types, as well as POJOs (including Scala case classes and tuples), Arrays, Lists, Maps, and Enums.
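A small illustrative sketch (names and values are made up; SensorReading is the case class from the Source section) showing that type information is inferred automatically for tuples, case classes and collection types:

```scala
import org.apache.flink.streaming.api.scala._

object TypesDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val tuples: DataStream[(String, Int)]      = env.fromElements(("a", 1), ("b", 2))
    val caseClasses: DataStream[SensorReading] = env.fromElements(SensorReading("sensor_1", 1L, 35.8))
    val lists: DataStream[List[Int]]           = env.fromElements(List(1, 2), List(3))

    tuples.print("tuples")
    caseClasses.print("case classes")
    lists.print("lists")
    env.execute("types demo")
  }
}
```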

## Implementing UDF functions for finer-grained control of the stream
Flink exposes interfaces for all of its UDF functions, e.g. MapFunction, FilterFunction, ProcessFunction.
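For instance, a minimal FilterFunction sketch; the class name TempFilter and the threshold are illustrative, and SensorReading is the case class defined in the Source section:

```scala
import org.apache.flink.api.common.functions.FilterFunction

// keep only readings whose temperature exceeds a threshold (illustrative UDF)
class TempFilter(threshold: Double) extends FilterFunction[SensorReading] {
  override def filter(value: SensorReading): Boolean = value.temperature > threshold
}

// usage: dataStream.filter(new TempFilter(30.0))
```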

## Sink
All output to external systems goes through a sink:
stream.addSink(new MySink(xxx))

```scala
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

object KafkaSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val inputStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\learn\\src\\main\\java\\ink\\cyz\\learn\\scala\\hello.txt")
    val dataStream = inputStream
        .map(
          data=>{
            val dataArray = data.split(",")
            SensorReading( dataArray(0).trim,dataArray(1).trim.toLong, dataArray(2).trim.toDouble ).toString
          }
        )
    dataStream.addSink( new FlinkKafkaProducer011[String]("localhost:9092","sinkTest",new SimpleStringSchema()) )
    dataStream.print("sink test")
    env.execute(" kafka sink test")
  }
}
```

JDBC Sink

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class MyJdbcSink() extends RichSinkFunction[SensorReading] {
  // JDBC connection and prepared statements
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    // the JDBC URL and SQL statements are left empty here as placeholders
    conn = DriverManager.getConnection("")
    insertStmt = conn.prepareStatement("")
    updateStmt = conn.prepareStatement("")
  }

  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // upsert: try the update first; insert if no row was updated
    updateStmt.setDouble(1, value.temperature)
    updateStmt.execute()
    if (updateStmt.getUpdateCount == 0) {
      insertStmt.setDouble(1, value.temperature)
      insertStmt.execute()
    }
  }
}
```

Window

  • How do we process an unbounded stream? By distributing the stream's data into buckets (windows) of finite size and analyzing each bucket.

Window types

  1. Time windows
    • Tumbling: equal-sized, time-aligned, non-overlapping windows
      • Example: sales totals over a fixed time period
    • Sliding: a fixed window length plus a slide interval; windows may overlap
      • Example: tracking a stock's rise and fall
    • Session: windows separated by a fixed gap of inactivity
      • Example: a customer-service chat session
  2. Count windows
    • Tumbling
    • Sliding

Window functions

  1. Incremental aggregation functions
    • compute as each element arrives, keeping only a simple intermediate state
    • ReduceFunction, AggregateFunction (see the sketch after the diagram below)
  2. Full window functions
    • collect all of the window's elements first, then iterate over them
    • ProcessWindowFunction
```mermaid
graph LR
  DataStream -- keyBy --> KeyedStream
  KeyedStream -- window --> WindowedStream
```
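A minimal sketch (not from the original notes) of an incremental ReduceFunction on a tumbling time window, assuming the SensorReading case class from the Source section and a socket source; the 15-second window size and port 7777 are arbitrary:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // parse "id,timestamp,temperature" lines into SensorReading (case class defined earlier)
    val dataStream = env.socketTextStream("localhost", 7777)
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })

    // minimum temperature per sensor in 15-second tumbling windows (incremental aggregation)
    val minTempPerWindow = dataStream
      .map(r => (r.id, r.temperature))
      .keyBy(_._1)                   // DataStream -> KeyedStream
      .timeWindow(Time.seconds(15))  // KeyedStream -> WindowedStream
      .reduce((a, b) => (a._1, a._2.min(b._2)))

    minTempPerWindow.print("min temp")
    env.execute("window test")
  }
}
```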

Watermark

  • Watermarks prevent incorrect results caused by out-of-order data.
  • A watermark asserts that all data with timestamps smaller than the watermark have already arrived.
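A minimal sketch (not from the original notes) of assigning event timestamps and watermarks with a bounded-out-of-orderness extractor, assuming the SensorReading case class from above and that SensorReading.time holds an event timestamp in milliseconds; the 1-second bound and the socket address are arbitrary:

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WatermarkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)   // use event time instead of processing time

    val dataStream = env.socketTextStream("localhost", 7777)
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })
      // watermark = max event timestamp seen so far minus 1 second of allowed out-of-orderness
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
          override def extractTimestamp(element: SensorReading): Long = element.time
        })

    dataStream.print("with watermarks")
    env.execute("watermark test")
  }
}
```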