大数据Flink Transformation

目录

  • ​​1 官网API列​​
  • ​​2 基本操作-略​​
  • ​​2.1 map​​
  • ​​2.2 flatMap​​
  • ​​2.3 keyBy​​
  • ​​2.4 filter​​
  • ​​2.5 sum​​
  • ​​2.6 reduce​​
  • ​​2.7 代码演示​​
  • ​​3 合并-拆分​​
  • ​​flink什么意思3.1 union系统/运维和connect​​
  • ​​3.html个人网页完整代码2 split、select和Side Outputs​​
  • ​​4 分区html5​​
  • ​​4.1 rebala数据恢复nce重平衡分区​​
  • ​​4.2 其他分区​​

1 官网APIflink教程


                                            大数据Flink Transformation

整体来说,html文件怎么打开流式数据上的操作可以分为四类。

第一类是对于单条记录的操作,比如筛除掉不符合要求的记录(Filter 操作),或者将每条记录都做一个转换(Map 操作)

第二类是对多条记录的操作。比如说统计一个小时内的订单总成交量,就需要将一个系统运维包括哪些内容小时内的所有订单记录的成交量flink什么意思加到一起。为了支持这种类型的identification操作,就得通过 Window 将需要的记录关联到一起进行处理

第三类是对多个流进行操作并转换为单个流。例如,多个流可以通flink和kafka区别过 Union、Join 或 Connect等操作合到一起。这些操作合并的逻辑不同,但是它们最终都会产生了一个新windows许可证即将过期怎么办的统一的流,从而可以进行数据漫游是什么意思一些跨流的操作。

最后, DataStream 还支identify持与合并对称的拆windows10分操作,即把一个flink和kafka区别流按一定规则拆分为多个流

(Split 操作),每个流是之前流的一个子集,这样我们就可以对不同的流作不同的处理。

2 基本操作-略

2identification.1 map

⚫ Awindows许可证即将过期怎么办PI

map:将函数作用在集合中的每一个元素上,并返回作用后的结果
                                            大数据Flink Transformation

2.2 flatMap

⚫ API

flatMap:将集合中的每个元素变成一个或多个元素,并返windows7旗舰版回扁平化之后的结果
                                            大数据Flink Transformation

2.3 keyBy

按照指定的key来对流中的数据进行分组windows更新有必要吗,前面入门案例中已经演示过

注意:

流处理中没有groupBy,而是keyBy
                                            大数据Flink Transformation

2.flink和spark对比4 filter

⚫ API

filter:按照指定的条件对集合中的元素进行过滤,过滤出返回true/符合条件的元素
                                            大数据Flink Transformation

2.5 sumflink

⚫ API

sum:按照指定的字段对集合中的元素进行求和

2.6 reduce

⚫ API

reduce:对集合中的元素进行聚合
                                            大数据Flink Transformation

2.html7 代码演示

需求:

对流数据中的单词进行统计,排除敏感词heihei

⚫ 代码演示

package cn.oldlu.transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
* Author oldlu
* Desc
*/
public class TransformationDemo01 {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//2.source
DataStream<String> linesDS = env.socketTextStream("node1", 9999);

//3.处理数据-transformation
DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
//value就是一行行的数据
String[] words = value.split(" ");
for (String word : words) {
out.collect(word);//将切割处理的一个个的单词收集起来并返回
}
}
});
DataStream<String> filtedDS = wordsDS.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return !value.equals("heihei");
}
});
DataStream<Tuple2<String, Integer>> wordAndOnesDS = filtedDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
//value就是进来一个个的单词
return Tuple2.of(value, 1);
}
});
//KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);

DataStream<Tuple2<String, Integer>> result1 = groupedDS.sum(1);
DataStream<Tuple2<String, Integer>> result2 = groupedDS.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return Tuple2.of(value1.f0, value1.f1 + value1.f1);
}
});

//4.输出结果-sink
result1.print("result1");
result2.print("result2");

//5.触发执行-execute
env.execute();
}
}

3 合并-拆分

3html标签属性大全.1 union和connect

⚫ API

union:

union算子可以合并多个同类型的数据流,并生成同类型的数据流,即可flink应用场景以将多个DataStream[T]

合并为一个新的DataStreflink教程am[T]。数据将按照先进先出(Fiflinkrst In First Out)的模式合并windows11有必要升级吗,且不去

重。
                                            大数据Flink Transformation

connect:

conApachenecflink和spark对比t提供了和union类似的功能,用来系统运维工作内容连接两个数据Apache流,它与union的区别在于:connect只能连接两个数据流,union可以连接多个数据流。cflink面试onnect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。

两个DataStream经过connect之后被转化为C数据分析onnectedStreams,ConnectedStreams会对两个流的数据应用不同的windows11有必要升级吗处理方法,且双流之间可以共享状态。
                                            大数据Flink Transformation

需求

将两个String类型的流进行union将一个String类型和一个Long类型的流进行connec系统/运维t

⚫ 代码实现

package cn.oldlu.transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
* Author oldlu
* Desc
*/
public class TransformationDemo02 {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

//2.Source
DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
DataStream<String> ds2 = env.fromElements("hadoop", "spark", "flink");
DataStream<Long> ds3 = env.fromElements(1L, 2L, 3L);

//3.Transformation
ConnectedStreams<String, Long> tempResult = ds1.connect(ds3);
//interface CoMapFunction<IN1, IN2, OUT>
DataStream<String> result2 = tempResult.map(new CoMapFunction<String, Long, String>() {
@Override
public String map1(String value) throws Exception {
return "String->String:" + value;
}

@Override
public String map2(Long value) throws Exception {
return "Long->String:" + value.toString();
}
});

//4.Sink
result1.print();
result2.print();

//5.execute
env.execute();
}
}

3.2 split、select和Side Outputs

⚫ API

Split就是将一个流分成多个流Select就是获取分流后对应的数据

注意:split函数已过期并移除

S数据科学与大数据技术ide Ouflink应用场景tputs:可以使用process方法对流中数据进行处理,并针对不同的处理结果将数据收集到不同的Ouwindows怎么激活tputTag中

⚫ 需求:

对流中的数据按照奇数和偶数进行分流,并获取分流后的数据

⚫ 代码实现:

package cn.oldlu.transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
* Author oldlu
* Desc
*/
public class TransformationDemo03 {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

//2.Source
DataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

//3.Transformation
/*SplitStream<Integer> splitResult = ds.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
//value是进来的数字
if (value % 2 == 0) {
//偶数
ArrayList<String> list = new ArrayList<>();
list.add("偶数");
return list;
} else {
//奇数
ArrayList<String> list = new ArrayList<>();
list.add("奇数");
return list;
}
}
});
DataStream<Integer> evenResult = splitResult.select("偶数");
DataStream<Integer> oddResult = splitResult.select("奇数");*/

//定义两个输出标签
OutputTag<Integer> tag_even = new OutputTag<Integer>("偶数", TypeInformation.of(Integer.class));
OutputTag<Integer> tag_odd = new OutputTag<Integer>("奇数"){};
//对ds中的数据进行处理
SingleOutputStreamOperator<Integer> tagResult = ds.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
if (value % 2 == 0) {
//偶数
ctx.output(tag_even, value);
} else {
//奇数
ctx.output(tag_odd, value);
}
}
});

//取出标记好的数据
DataStream<Integer> evenResult = tagResult.getSideOutput(tag_even);
DataStream<Integer> oddResult = tagResult.getSideOutput(tag_odd);

//4.Sink
evenResult.print("偶数");
oddResult.print("奇数");

//5.execute
env.execute();
}
}

4 分区

4.1 rebalance重平衡分区

⚫ API

类似于Spark中的rep系统运维包括哪些内容artition,但是功能更强大,可以直接解决数据倾斜

Flink也有数据html标签属性大全倾斜apache服务的时候,比如当前有数据量大概10亿条数据需要处理,在处理过程中可能会发生如图所示的状况,出现了数据倾斜,其他3台机器执行完毕也要等待机器1执行完毕后才算整体将任务完成;
                                            大数据Flink Transformation

所以在实际的工作中,出数据透视表现这种情况比较好的解决方案就是rebalance(内部使用round robin方法将数据均匀打散)
                                            大数据Flink Transformation

⚫ 代码演示:

package cn.oldlu.transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
* Author oldlu
* Desc
*/
public class TransformationDemo04 {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC).setParallelism(3);

//2.source
DataStream<Long> longDS = env.fromSequence(0, 100);

//3.Transformation
//下面的操作相当于将数据随机分配一下,有可能出现数据倾斜
DataStream<Long> filterDS = longDS.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long num) throws Exception {
return num > 10;
}
});

//接下来使用map操作,将数据转为(分区编号/子任务编号, 数据)
//Rich表示多功能的,比MapFunction要多一些API可以供我们使用
DataStream<Tuple2<Integer, Integer>> result1 = filterDS
.map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> map(Long value) throws Exception {
//获取分区编号/子任务编号
int id = getRuntimeContext().getIndexOfThisSubtask();
return Tuple2.of(id, 1);
}
}).keyBy(t -> t.f0).sum(1);

DataStream<Tuple2<Integer, Integer>> result2 = filterDS.rebalance()
.map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> map(Long value) throws Exception {
//获取分区编号/子任务编号
int id = getRuntimeContext().getIndexOfThisSubtask();
return Tuple2.of(id, 1);
}
}).keyBy(t -> t.f0).sum(1);

//4.sink
//result1.print();//有可能出现数据倾斜
result2.print();//在输出前进行了rebalance重分区平衡,解决了数据倾斜

//5.execute
env.execute();
}
}

4.2 其他分区

⚫ API

说明:

recale分区。基于上下游Operator的并行度,将记录以循环的方式输出到下游Operahtml是什么意思tor的每个实例。

举例:

上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出flink面试到下游另两个并行度上。若上游windows10关闭自动更新并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录apache是什么意思输出到下游另一个并行度上。

⚫ 需求:

对流中的元素使用各种分区,并输出

⚫ 代码实现

package cn.oldlu.transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
* Author oldlu
* Desc
*/
public class TransformationDemo05 {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

//2.Source
DataStream<String> linesDS = env.readTextFile("data/input/words.txt");
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
});

//3.Transformation
DataStream<Tuple2<String, Integer>> result1 = tupleDS.global();
DataStream<Tuple2<String, Integer>> result2 = tupleDS.broadcast();
DataStream<Tuple2<String, Integer>> result3 = tupleDS.forward();
DataStream<Tuple2<String, Integer>> result4 = tupleDS.shuffle();
DataStream<Tuple2<String, Integer>> result5 = tupleDS.rebalance();
DataStream<Tuple2<String, Integer>> result6 = tupleDS.rescale();
DataStream<Tuple2<String, Integer>> result7 = tupleDS.partitionCustom(new Partitioner<String>() {
@Override
public int partition(String key, int numPartitions) {
return key.equals("hello") ? 0 : 1;
}
}, t -> t.f0);

//4.sink
//result1.print();
//result2.print();
//result3.print();
//result4.print();
//result5.print();
//result6.print();
result7.print();

//5.execute
env.execute();
}
}