Flink简介
Flink是什么
- Flink: 在德语中含义是快, 灵活.
- Apache Flink: 是一个框架和分布式处理引擎, 用于对无界和有界数据流进行状态计算.
- Flink的目标: 低延迟, 高吞吐, 结果准确, 容错性好
- Flink的用户: 电商和市场营销, 物联网, 电信, 银行和金融
- 与spark streaming的区别: spark streaming是微批处理, 延迟相对高
- 流处理的架构演变: 从lambda架构(使用两套系统:Batch Layer(准确性)与Speed Layer(流式处理, 低延迟), 同时保证低延迟和结果准确)到Flink架构
- Flink的主要特点:
- 事件驱动
- 基于流的世界观
- 分层API: 提供了不同层级的API, 越顶层越抽象, 表达简明使用方便; 越底层越具体, 表达丰富使用灵活.
- Flink的其他特点:
- 低延迟, 高可用, 动态扩展, exactly-once
Flink stream小试
import org.apache.flink.streaming.api.scala._
object StreamWordCount {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//socket stream
val textDataStream = env.socketTextStream("localhost",7777)
//read data from stream
val wordCountDataStream = textDataStream.flatMap(_.split("\\s"))
.filter(_.nonEmpty)
.map((_,1))
.keyBy(0)
.sum(1)
wordCountDataStream.print()
env.execute("stream word count")
}
}
Flink的并行度默认等于CPU虚拟处理器数, 可以通过wordCountDataStream.print().setParallelism(1)
设置.
Flink.conf
taskmanager.numberOfTaskSlots: 能力上能达到的并行度 parallelism.default: 实际并行度
Flink部署模式
- Standalone模式: 启动Flink standalone客户端:
./bin/start-cluster.sh
- Yarn模式(生产中常用): ./yarn-session -n(taskmanager的数量) 2 -s(每个TaskManager的slot数量) 2 -jm(jobManager的内存) 1024 -nm(appName) test -d(后台执行)
- Kubernetes部署: 容器化部署, 基于docker镜像, 需要启动JobManager, TaskManager, JobManagerServices
Flink运行架构
Flink运行时的组件
JobManager: 控制流处理应用程序的主进程 TaskManager: 一个TaskManager包含一定数量的插槽(slot), slot限制了TaskManager能执行的任务数量 ResourceManager: 管理slot, 不同环境有不同的资源管理器 Dispatcher: 接受用户操作请求, 为应用提供了REST接口
Flink流处理API
流式处理的主要流程: Environment > Source > Transform > Sink
Environment
- dataset/datastream分别为微批/流处理处理方式 普通创建方式: StreamExecutionEnvironment.getExecutionEnvironment 更底层的创建方式:
- createLocalEnvironment: StreamExecutionEnvironment.createLocalEnvironment(1) //1为并行度
- createRemoteEnvironment: StreamExecutionEnvironment.createRemoteEnvironment(“jobmanager-hostname”,port,”yourpath//wordcound.jar”)
但是这种方式不常用, getExecutionEnvironment会自动判断坏境而给出对应的environment.
Source
以下是4种Source的Demo code
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaConsumerBase}
import scala.util.Random
//温度传感器样例类
case class SensorReading(id: String, time: Long, temperature: Double)
object SourceTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//1. 从自定义集合读取数据
val stream1 = env.fromCollection(List(
SensorReading("sensor_1", 123212312, 12321.21312),
SensorReading("sensor_2", 12312312, 12321.21312),
SensorReading("sensor_3", 1211113, 12321.21312),
SensorReading("sensor_4", 123, 12321.21312)
))
// stream1.print("stream1").setParallelism(6)
// env.execute("source test")
//2. 从文件读取数据
val stream2 = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\learn\\src\\main\\java\\ink\\cyz\\learn\\scala\\hello.txt")
// stream2.print("stream2").setParallelism(6)
// env.execute("source Test")
//3. 从kafka中读取数据
var properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "1205")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
// stream3.print("stream3").setParallelism(1)
// env.execute("source test")
//自定义Source
val stream4 = env.addSource(new SensorSource())
stream4.print("stream4").setParallelism(1)
env.execute(" source test ")
}
class SensorSource() extends SourceFunction[SensorReading] {
var running: Boolean = true
override def cancel(): Unit = {
running = false
}
override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
val rand = new Random()
var curTemp = 1.to(10).map(
i => ("sensor_" + i, 60 + rand.nextGaussian() * 20)
)
//产生数据流
while (running) {
curTemp = curTemp.map(
t => (t._1, t._2 + rand.nextGaussian())
)
//获取当前时间
val curTime = System.currentTimeMillis()
curTemp.foreach(
t => ctx.collect(SensorReading(t._1, curTime, t._2))
)
Thread.sleep(500)
}
}
}
}
Transform
转换算子包括:
- map
- flatMap
- Fliter
- KeyBy: DataStream > KeyedStream
- Rolling Aggregation: 针对于KeyedStream
- Reduce
- Split, Select: 将dataStream用split拆分成多个datqStream, 然后通过select选取dataStream进行操作
- Connect, CoMap: 合并两条类型不同的流
- Union: 合并多条类型相同的流 ```scala import org.apache.flink.streaming.api.scala._
object TransformTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1)
val streamFromFile = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\learn\\src\\main\\java\\ink\\cyz\\learn\\scala\\hello.txt");
val dataStream: DataStream[SensorReading] = streamFromFile.map(data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
}) // .keyBy(0) // .sum(2)
// .keyBy(_.id) // .sum(“temperature”) val aggregateStream = dataStream .keyBy(“id”) .reduce((x,y)=>SensorReading(x.id,x.time+1,y.temperature+10))
//split, select操作
val splitStream = dataStream.split(data=>{
if(data.temperature>30)Seq("high")
else Seq("low")
})
// splitStream.select(“high”).print() val low = splitStream.select(“low”) val high = splitStream.select(“high”) val all = splitStream.select(“high”,”low”) // low.print(“low”)
// dataStream.print() // env.execute(“transform test”)
//ConnectedStream: 一国两制, 只能合并两条流
val warning = high.map(data=>(data.id,data.temperature))
val connectedStream = warning.connect(low)
val coMapDataStream = connectedStream.map(
warningStream => (warningStream._1, warningStream._2, "warning"),
lowData =>(lowData.id, "healthy")
)
// coMapDataStream.print() // env.execute(“coMapData”)
// Union可以合并多条流, 数据类型必须一致 val unionStream = coMapDataStream.union(coMapDataStream,coMapDataStream) unionStream.print(“union”) env.execute(“union”) } }
## 支持的数据类型
Flink支持所有的Java和Scala基础数据类型, Pojo, Arryas, List, Maps, Enums
## 实现UDF函数, 更细粒度的控制流
Flink暴露了所有的udf函数的接口, 例如MapFunction, FilterFunction, ProcessFunction
## Sink
对外的输出操作都要利用Sink完成
stream.addSink(new MySink(xxx))
```scala
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
object KafkaSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputStream = env.readTextFile("C:\\Users\\Administrator\\IdeaProjects\\learn\\src\\main\\java\\ink\\cyz\\learn\\scala\\hello.txt")
val dataStream = inputStream
.map(
data=>{
val dataArray = data.split(",")
SensorReading( dataArray(0).trim,dataArray(1).trim.toLong, dataArray(2).trim.toDouble ).toString
}
)
dataStream.addSink( new FlinkKafkaProducer011[String]("localhost:9092","sinkTest",new SimpleStringSchema()) )
dataStream.print("sink test")
env.execute(" kafka sink test")
}
}
JDBC Sink
class MyJdbcSink() extends RichSinkFunction[SensorReading]{
// 定义SQL连接 预编译器
var conn: Connection = _
var insertStmt: PreparedStatement = _
var updateStmt: PreparedStatement = _
override def open(parameters: Configuration): Unit = {
super.open(parameters)
conn = DriverManager.getConnection("")
insertStmt = conn.prepareStatement("")
updateStmt = conn.prepareStatement("")
}
override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit ={
updateStmt.setDouble(1,value.temperature)
updateStmt.execute()
}
}
Window
- 怎么处理无界流? 将流数据分发到有限大小的桶中进行分析
Window类型
- Time Window
- 滚动: 等间隔划分窗口, 时间对齐, 没有重叠
- 应用: 计算固定时间段的销售额
- 滑动: 由固定的窗口长度和滑动间隔组成, 可以重叠
- 应用: 股票的涨跌
- 会话窗口: 固定间隙
- 应用: 客服对话窗口
- Count Window
- 应用: 客服对话窗口
- 滚动
- 滑动
窗口函数
- 增量聚合函数
- 每条数据到来就进行计算, 保持一个简单的状态
- ReduceFunction, AggregateFunction
- 全窗口函数
- 等所有数据收集完毕再遍历数据
- ProcessWindowFunction
Watermark
- 避免乱序数据带来计算不正确
- watermark表示timestamp小于watermark的数据均已经到达