博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Mark : spark streaming去重
阅读量:4216 次
发布时间:2019-05-26

本文共 2624 字,大约阅读时间需要 8 分钟。

spark体系除其spark core外,还有spark streaming实时计算、spark SQL结构化数据、MLib机器学习、GraphX图计算四大组件。其中spark streaming用对数据按时间分片的方式,来实现近似的流计算,我们在项目中用来对日志数据进行处理。

问题场景

由于客观原因,后台系统在记录的日志中有重复数据,重复数据的生成时间一般间隔在1s之内,在spark对日志数据进行消费处理时,需要对数据去重,减少对HBase的写入。

问题的难点在于一个batch的数据,除了自身内要去重外,还要和相邻时间分片的batch数据进行去重。更具体一点:当前时间片的一条数据,若上一时间片存在,则丢弃;若上一时间片不存在,则存储。

本文给出两种思路,均已验证能稳定运行:

  1. window + time方式
  2. remember+ subtract(或leftOuterJoin)方式

window+time方式

和相邻时间片的数据进行计算,spark提供了window的方式。常用的如groupByKeyAndWindow、reduceByKeyAndWindow等。

在该处运用window时,稍稍复杂一点,就是需要区分出待计算的数据是上一时间片的还是当前时间片的,这就需要用到spark提供的time机制了。常用的带time的操作有transform、transformToPair等。

因此,思路是在对数据进行转换时,用transform方式对数据赋予time属性,在window操作中根据time属性对数据进行取舍。

示例代码如下:

kafkaStream.transformToPair((rdd, time) -> {    return rdd.mapToPair((putInfo -> {        putInfo.setTime(time); // 设置time        return new Tuple2<>(putInfo.getKey(), putInfo);    }));}).reduceByKeyAndWindow((putInfo1, putInfo2) -> {    return putInfo1.getTime().less(putInfo2.getTime()) ? putInfo1 : putInfo2; // 保留time小的数据} , Durations.seconds(Constants.DURATION * 2), Durations.seconds(Constants.DURATION)// format).transformToPair((rdd, time) -> {    return rdd.filter(tuple -> {        return tuple._2().getTime().greaterEq(time);// time大于等于当前时间分片,说明是新数据    });}).foreachRDD(rdd -> {// 存入hbase});

remember+join方式

“当前时间片的一条数据,若上一时间片存在,则丢弃;若上一时间片不存在,则存储”,这样看来,只要保存了上一时间片的数据,然后通过取当前时间片与上一时间片的差集就可以了。

保存上一时间片的数据,spark提供了remember机制。如:

ssc.remember(Durations.seconds(2)); //保存2s的数据,包含当前interval
  • 1

在进行取差集之前,应先进行一次时间片内的reduce操作:

stream.reduceByKey((putInfo1, putInfo2) -> {    return putInfo1;})

取当前时间片与上一时间片的差集,有两种方式:

  • 集合操作subtractByKey,如:
rdd.subtractByKey(lastRdd);
  • 通过左连接的方式,如:
rdd.leftOuterJoin(lastRdd).filter(tuple -> {    Optional.empty().equals(tuple._2()._2()); // 说明lastRdd中不存在});

但无论哪种方式,都需要在开始初始化一个空的lastRdd,并且需要在当前task处理完成后,将当前rdd保存为lastRdd。代码整体大致如下:

ssc.remember(Durations.seconds(2)); //保存2s的数据,包含当前intervalfinal JavaPairRDD
[] lastRdd = new JavaPairRDD[] { JavaPairRDD.fromJavaRDD(sc.emptyRDD()) }; // 为使用final且使得lastRdd允许赋值,采用了数组形式。stream.reduceByKey((putInfo1, putInfo2) -> { return putInfo1;}).transform(tuple -> { JavaRDD
temp = tuple.leftOuterJoin(lastRdd[0]).filter(tuple2 -> { return Optional.empty().equals(tuple2._2._2);// 说明lastRdd中不存在 }).map(tuple2 -> { return tuple2._2._1; });// 或如下subtractByKey方式:// JavaRDD
temp = tuple.subtractByKey(oldData[0]).map(t1 -> {// return t1._2;// }); lastRdd[0] = tuple; //当前rdd保存为lastRdd return temp;})

开始担心lastRdd这一全局变量的多线程同步问题,经验证后知道多虑了,spark对rdd数据妥善进行了处理。另外leftOuterJoin方式要比subtract方式耗时要高,长时间运行时加上GC时间,会不太稳定

转载地址:http://gkvmi.baihongyu.com/

你可能感兴趣的文章
MySQL MyISAM引擎的读锁与写锁
查看>>
面向对象与面向过程的本质的区别
查看>>
Java语言有哪些特点?
查看>>
idea创建maven项目并关联gitee
查看>>
HashMap和Hashtable的区别
查看>>
JVM 对 Java 的原生锁做了哪些优化?
查看>>
JAVA实现简单的阻塞队列
查看>>
我的2020
查看>>
idea快捷键使用
查看>>
2.1MAC协议概述
查看>>
2.3 WSN的MAC协议
查看>>
图解后缀表达式的计算过程
查看>>
栈与队列的应用——计算表达式的值
查看>>
静态链表——sharing
查看>>
静态链表——sorting
查看>>
DFS——背包问题
查看>>
DFS——选数问题
查看>>
BFS——求矩阵中“块”的个数
查看>>
BFS——走迷宫的最小步数
查看>>
并查集——好朋友
查看>>