当前位置: 首页 > news >正文

贵阳市网站做的最好的/简单的网页设计作品

贵阳市网站做的最好的,简单的网页设计作品,axcure做网站ui,宁波专业制作网站参考文章: https://blog.csdn.net/qq_20641565/article/details/76216417 今天模拟实现 broadcastJoin 的时候突然意识到了这个点,对 Spark 的 Cache 做个总结。 问题 在Spark中有时候我们很多地方都会用到同一个RDD, 按照常规的做法的话,那么每个地方遇…

参考文章:

https://blog.csdn.net/qq_20641565/article/details/76216417

 

今天模拟实现 broadcastJoin 的时候突然意识到了这个点,对 Spark 的 Cache 做个总结。

 

问题

   

     在Spark中有时候我们很多地方都会用到同一个RDD, 按照常规的做法的话,那么每个地方遇到Action操作的时候都会对同一个算子计算多次。这样会造成效率低下的问题 !!!! 

 

常见 transform , action 算子 =>

https://blog.csdn.net/u010003835/article/details/106341908

 

例如:

val rdd1 = sc.textFile("xxx")

rdd1.xxxxx.xxxx.collect

rdd1.xxx.xxcollect

 

 

方法


   上面就是两个代码都用到了rdd1这个RDD,如果程序执行的话,那么sc.textFile(“xxx”)就要被执行两次,  可以把rdd1的结果进行cache到内存中,使用如下方法

val rdd1 = sc.textFile("xxx")

val rdd2 = rdd1.cache

rdd2.xxxxx.xxxx.collect

rdd2.xxx.xxcollect

 

示例

例如 如下Demo

package com.spark.test.offline.skewed_dataimport org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.types.{StructField, _}
import org.apache.spark.sql.{Row, SparkSession}import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.Random/*** Created by szh on 2020/6/5.*/
object JOINSkewedData2 {def main(args: Array[String]): Unit = {val sparkConf = new SparkConfsparkConf.setAppName("JOINSkewedData").set("spark.sql.autoBroadcastJoinThreshold", "1048576") //1M broadcastJOIN//.set("spark.sql.autoBroadcastJoinThreshold", "104857600") //100M broadcastJOIN.set("spark.sql.shuffle.partitions", "3")if (args.length > 0 && args(0).equals("ide")) {sparkConf.setMaster("local[3]")}val spark = SparkSession.builder().config(sparkConf).getOrCreate()val sparkContext = spark.sparkContextsparkContext.setLogLevel("WARN")//sparkContext.setCheckpointDir("")val userArr = new ArrayBuffer[(Int, String)]()val nameArr = Array[String]("sun", "zhen", "hua", "kk", "cc")val threshold = 1000000for (i <- 1 to threshold) {var id = 10if (i < (threshold * 0.9)) {id = 1} else {id = i}val name = nameArr(Random.nextInt(5))userArr.+=((id, name))}val rddA = sparkContext.parallelize(userArr)//spark.sql("CACHE TABLE userA")//-----------------------------------------//---------------------------------------val arrList = new ArrayBuffer[(Int, Int)]for (i <- 1 to (threshold * 0.1).toInt) {val id = ival salary = Random.nextInt(100)arrList.+=((id, salary))}val rddB = sparkContext.parallelize(arrList)val broadData: Broadcast[Array[(Int, Int)]] = sparkContext.broadcast(rddB.collect())import scala.util.control._val resultRdd = rddA.mapPartitions(arr => {val broadVal = broadData.valuevar rowArr = new ArrayBuffer[Row]()val broadMap = new mutable.HashMap[Int, Int]()while (arr.hasNext) {val x = arr.nextval loop = new Breaksvar rRow: Row = null//var rRow: Option[Row] = Noneloop.breakable(for (tmpVal <- broadVal) {if (tmpVal._1 == x._1) {rRow = Row(tmpVal._1, x._2, tmpVal._2)//println(rRow)loop.break}})if (rRow != null) {rowArr.+=(rRow)rRow = null}}println(rowArr.size)rowArr.iterator})//      .filter(x => {//        x match {//          case None => false//          case _ => true//        }//      })val resultStruct = StructType(Array(StructField("uid", IntegerType, nullable = true), StructField("name", StringType, nullable = true), StructField("salary", IntegerType, nullable = true)))spark.createDataFrame(resultRdd, resultStruct).createOrReplaceTempView("resultB")val resultDF = spark.sql("SELECT uid, name, salary FROM resultB")//resultDF.checkpoint()resultDF.cache()resultDF.foreach(x => {val i = 1})println(resultDF.count())resultDF.show()resultDF.explain(true)Thread.sleep(60 * 10 * 1000)sparkContext.stop()}}

注意其中 

    resultDF.foreach(x => {
      val i = 1
    })

    println(resultDF.count())

    resultDF.show()

foreach, count , show  是 3个 Action 操作 !!

不对 resultDF 进行 cache, 整个任务的执行时间 如下图 :

 

 

对 resultDF 进行 cache, 整个任务的执行时间 如下图 :

对比上图,可以清楚的看到没有进行 cache, count 对上游又重新计算了一遍多了20多秒 !!!!!

 

http://www.lbrq.cn/news/840799.html

相关文章:

  • 做seo网站的步骤/百度一下照片识别
  • 建设牌安全带官方网站/百度网站推广排名优化
  • 如何自己做门户网站/网络营销方法
  • 网站被模仿怎么办/东莞seo靠谱
  • 公司做网站的费属于广告费么/百度指数可以用来干什么
  • 做黄色网站的违法吗/建立网站用什么软件
  • 网站建设用php建设优点/自助优化排名工具
  • wordpress单本小说主题/关键词首页优化
  • 网站建设的前后台代码/商丘网站优化公司
  • 图片站 wordpress/十大seo公司
  • 医药网站备案/媒体发稿公司
  • 做seo网站标题用什么符号/网盘搜索神器
  • 自己建立网站/百度客服号码
  • 做ps的素材哪个网站/好的推广平台
  • 十大购物网站排行榜/长沙百度seo
  • 惠州最专业的网站建设公司/网站优化及推广
  • wordpress 自动 tag/seo优化实训总结
  • 个人主页url指的是什么/seo排名赚app下载
  • 网站开发是分为前端和后端吗/南昌seo搜索优化
  • 镇江百度网站建设/郑州网络公司
  • 天津微信网站开发/如何做网络推广外包
  • 为什么要给企业建设网站/广告开户南京seo
  • 网站效果图制作/宁德市人民政府
  • 如何做简洁网站设计/企业网站如何优化
  • 上海交通大学网站建设/宁波网络推广优化公司
  • 怎样搭建web网站/在百度上怎么打广告
  • 广州市企业网站建设企业/网站排名优化外包公司
  • 做网站自动赚钱/友情链接网址
  • 网站建设是用自己的服务器/网站外包一般多少钱啊
  • 手机html5网站模板/网推项目
  • 基于dcmtk的dicom工具 第二章 图像接受StoreSCP(2)
  • 从函数调用到进程通信:Linux下的多语言协作实践
  • excel 通过openpyxl表格下载和插入图片
  • Linux操作系统从入门到实战(九)Linux开发工具(中)自动化构建-make/Makefile知识讲解
  • 如何设计实现开发自助重启工具-01-设计篇
  • Redis作缓存时存在的问题及其解决方案