当前位置: 首页 > news >正文

盐城做企业网站的价格/百度热搜榜单

盐城做企业网站的价格,百度热搜榜单,平台公司名称,wap网站建设方案sparkstreaming参数配置设置 spark.streaming.receiver.writeAheadLog.enable 防止读取数据丢失参数设置为true 然后persist(缓存在内存中StorageLevel.MEMORY_AND_DISK) conf.set(“spark.streaming.stopGracefullyOnShutdown”,“true”)//优雅的关闭…

sparkstreaming参数配置设置

spark.streaming.receiver.writeAheadLog.enable 防止读取数据丢失参数设置为true
然后persist(缓存在内存中StorageLevel.MEMORY_AND_DISK)
conf.set(“spark.streaming.stopGracefullyOnShutdown”,“true”)//优雅的关闭
conf.set(“spark.streaming.receiver.writeAheadLog.enable”,“true”)//防止数据丢失
conf.set(“spark.streaming.backpressure.enabled”,“true”)//激活削峰功能
conf.set(“spark.streaming.backpressure.initialRate”,firstCount.toString)//第一次读取的最大数据值
conf.set(“spark.streaming.kafka.maxRatePerPartition”,threadCount.toString)//每个进程每秒最多从kafka读取的数据条数
conf.set(“spark.mongodb.input.uri”, Property.getProperty(“bigScreenInUri1”))
conf.set(“spark.mongodb.output.uri”, Property.getProperty(“bigScreenOutUri1”))
conf.set(“spark.streaming.kafka.consumer.poll.ms”,“10000”)//拉取数据超时时间

批量写入MongoDB($inc 的用法就是原有基础上在增加多少)

val mongoConnector = MongoConnector(writeAllTrade.asOptions)allTrades.foreachPartition(iter => if (iter.nonEmpty) {val writeConfig = WriteConfig(Map("database" -> Property.getProperty("resultDatabase"), "collection" ->  Property.getProperty("bigScreenAllTradeTable"), "writeConcern.w" -> "1","spark.mongodb.output.uri"->Property.getProperty("uri")))mongoConnector.withCollectionDo(writeConfig, {collection: MongoCollection[BsonDocument] =>iter.grouped(writeConfig.maxBatchSize).foreach(batch => {val updateOptions = new UpdateOptions().upsert(true)val requests = batch.map(doc =>{val queryDocument = new BsonDocument()queryDocument.append("_id", doc.get("_id"))doc.remove("_id")new UpdateOneModel[BsonDocument](queryDocument, new BsonDocument("$inc", doc), updateOptions)})collection.bulkWrite(requests.toList.asJava)})})})

StreamUtils.stopByMarkFile(server,ssc,hdfs_file_path) //方式二通过扫描HDFS文件来优雅的关闭
/***
* 通过一个消息文件来定时触发是否需要关闭流程序
* @param ssc StreamingContext
*/

 def stopByMarkFile(server :Server,ssc:StreamingContext,hdfs_file_path :String):Unit= {
val intervalMills = 10 * 1000 // 每隔10秒扫描一次消息是否存在
var isStop = false
while (!isStop) {isStop = ssc.awaitTerminationOrTimeout(intervalMills)if (!isStop && isExistsMarkFile(hdfs_file_path)) {server.stop()log.warn("2秒后开始关闭sparstreaming程序.....")Thread.sleep(2000)ssc.stop(true, true)}

//更新 $set 的用法和特性(没有就自动添加一条,有就更新)

mercTradesByDay.foreachPartition(iter => if (iter.nonEmpty) {val writeConfig = WriteConfig(Map("database" -> Property.getProperty("resultDatabase"), "collection" ->  Property.getProperty("bigScreenByMercTable"), "writeConcern.w" -> "1","spark.mongodb.output.uri"->Property.getProperty("uri")))mongoConnector.withCollectionDo(writeConfig, {collection: MongoCollection[BsonDocument] =>iter.grouped(writeConfig.maxBatchSize).foreach(batch => {val updateOptions = new UpdateOptions().upsert(true)val requests = batch.map(doc =>{val queryDocument = new BsonDocument()queryDocument.append("_id", doc.get("_id"))var value = new BasicDBObject("$set", new BasicDBObject("MERC_NAME",doc.get("MERC_NAME")))new UpdateOneModel[BsonDocument](queryDocument, new BsonDocument("$set", doc), updateOptions)})collection.bulkWrite(requests.toList.asJava)})})})package com.eptok.scala.offline.testmimport com.mongodb.spark.MongoSpark
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.Document

/*

  • @author: ljx

  • @date: 2019/11/13

  • @description:
    */

     	object Demo {def main(args: Array[String]): Unit = {//spark操作mongodb入门val spark = SparkSession.builder()
    .master("local[2]").appName("ConnAppTest").config("spark.mongodb.input.uri", 	"mongodb://interface_manager:2wsxCDE#@10.213.:50000,1	0.213.32.85:50000/xzq_test.collection?authSource=admin") // 指定	mongodb输入.config("spark.mongodb.output.uri", "mongodb://interface_manager:2wsxCDE#@10.213.32.:50000,10.213.35:50000/xzq_test.collection?authSource=admin") // 指定mongodb输出.getOrCreate()// 生成测试数据val documents = spark.sparkContext.parallelize((8 to 10).map(i => Document.parse(s"{test: $i}")))// 存储数据到mongodbMongoSpark.save(documents)//    // 加载数据val rdd: DataFrame = MongoSpark.load(spark)//    // 打印输出rdd.show}}
    
http://www.lbrq.cn/news/1451701.html

相关文章:

  • 怎样做自己可以发布消息的网站/如何免费自己创建网站
  • 珠海网站制作网络推广/网搜网
  • 青岛中嘉建设集团网站/温州网站建设优化
  • 哪些网站做推广好/360关键词推广
  • 玉林市建设工程交易中心网站/百度收录提交入口网址
  • 简历制作在线/seo关键词排名查询
  • 中山祥云做的网站/合肥网络推广网络运营
  • 网站建设方法/百度云网盘资源搜索引擎
  • 网上帮别人做网站/抖音代运营公司
  • 展示型企业网站开发/郑州做网站推广哪家好
  • 包头怎样做网站/什么是搜索推广
  • 做美国直邮物流网站/企业网站有哪些
  • 手机 网站 源码/2020十大网络热词
  • 自助建站帮助网/百度下载免费官方安装
  • 学做网站需要买什么书/北京seo服务销售
  • 建设企业网站公司机构官网/网络营销做的好的企业
  • 网络运维培训/宁波网站关键词优化排名
  • 做网站开发服务商/建网站要多少钱
  • 怎么找网站url地址/广东网络推广运营
  • raid管理网站开发/aso优化公司
  • 上海企业网站建站/人工智能培训心得体会
  • 鞍山制作网站/株洲seo优化报价
  • 网站建设跟推广评价指标有什么关系/网店代运营靠谱吗
  • 昌平电子网站建设/河北seo技术交流
  • wordpress全站静态化/百度一下首页问问
  • 网站转移空间备案是不是就没有了/三只松鼠的软文范例
  • 网站主页的要素/seo方案
  • otc场外交易网站开发/seo学徒
  • 新翼设计网站建设公司/成都网站关键词推广
  • 网站 建设 申请/北京网站seo费用
  • 调试|谷歌浏览器调试长连接|调试SSE和websocket
  • Redis——常用指令汇总指南(三)(哈希类型)
  • ORA-12514:TNS: 监听程序当前无法识别连接描述符中请求的服务
  • SpringBoot3.x入门到精通系列:2.5 整合 MyBatis 详解
  • C语言数据结构(7)贪吃蛇项目2.贪吃蛇项目实现
  • 大语言模型涉及的一些概念(持续更新)