Spark Streaming parameter configuration
Set spark.streaming.receiver.writeAheadLog.enable to true to prevent loss of received data.
Then persist the stream (cache it with StorageLevel.MEMORY_AND_DISK).
conf.set("spark.streaming.stopGracefullyOnShutdown", "true") // shut down gracefully
conf.set("spark.streaming.receiver.writeAheadLog.enable", "true") // prevent data loss
conf.set("spark.streaming.backpressure.enabled", "true") // enable backpressure to smooth out load spikes
conf.set("spark.streaming.backpressure.initialRate", firstCount.toString) // maximum rate for the very first batch
conf.set("spark.streaming.kafka.maxRatePerPartition", threadCount.toString) // maximum records per second read from each Kafka partition
conf.set("spark.mongodb.input.uri", Property.getProperty("bigScreenInUri1"))
conf.set("spark.mongodb.output.uri", Property.getProperty("bigScreenOutUri1"))
conf.set("spark.streaming.kafka.consumer.poll.ms", "10000") // timeout for polling data from Kafka
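A minimal sketch of how these settings might be wired into a job, assuming a Kafka direct stream; the broker list, group id, topic name and batch interval below are placeholders, while firstCount and threadCount come from the surrounding code.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val conf = new SparkConf().setAppName("bigScreen")
// ... the conf.set(...) calls listed above go here ...
val ssc = new StreamingContext(conf, Seconds(10)) // batch interval is a placeholder

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092,broker2:9092", // placeholder
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "big_screen_group", // placeholder
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean))

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("trade_topic"), kafkaParams))

// Cache the parsed batch (MEMORY_AND_DISK) so repeated actions do not re-pull it from Kafka
val lines = stream.map(_.value())
lines.persist(StorageLevel.MEMORY_AND_DISK)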
Batch writes to MongoDB ($inc adds the given amount on top of the value already stored).
// requires: import scala.collection.JavaConverters._ (for .asJava)
val mongoConnector = MongoConnector(writeAllTrade.asOptions)
allTrades.foreachPartition(iter => if (iter.nonEmpty) {
  val writeConfig = WriteConfig(Map(
    "database" -> Property.getProperty("resultDatabase"),
    "collection" -> Property.getProperty("bigScreenAllTradeTable"),
    "writeConcern.w" -> "1",
    "spark.mongodb.output.uri" -> Property.getProperty("uri")))
  mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[BsonDocument] =>
    iter.grouped(writeConfig.maxBatchSize).foreach(batch => {
      val updateOptions = new UpdateOptions().upsert(true)
      val requests = batch.map(doc => {
        // use _id as the query key and $inc the remaining numeric fields
        val queryDocument = new BsonDocument()
        queryDocument.append("_id", doc.get("_id"))
        doc.remove("_id")
        new UpdateOneModel[BsonDocument](queryDocument, new BsonDocument("$inc", doc), updateOptions)
      })
      collection.bulkWrite(requests.toList.asJava)
    })
  })
})
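For context, a minimal sketch of how the allTrades documents fed into this bulk write might be built; tradeCounts and the field names here are assumptions, the only requirement being that every field other than _id is numeric so that $inc can accumulate it across batches.

import org.apache.spark.rdd.RDD
import org.bson.{BsonDocument, BsonInt64, BsonString}

// assume tradeCounts: RDD[(String, Long)] holds (merchantId, tradeCount) for the current batch
val allTrades: RDD[BsonDocument] = tradeCounts.map { case (mercId, cnt) =>
  val doc = new BsonDocument()
  doc.append("_id", new BsonString(mercId))     // upsert key
  doc.append("TRADE_COUNT", new BsonInt64(cnt)) // field accumulated with $inc
  doc
}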
StreamUtils.stopByMarkFile(server, ssc, hdfs_file_path) // Option 2: shut down gracefully by scanning for a marker file on HDFS
/**
 * Periodically checks a marker file to decide whether the streaming job should be shut down.
 * @param server server handle that is stopped before the StreamingContext
 * @param ssc StreamingContext
 * @param hdfs_file_path path of the HDFS marker file
 */
def stopByMarkFile(server: Server, ssc: StreamingContext, hdfs_file_path: String): Unit = {
  val intervalMills = 10 * 1000 // check for the marker file every 10 seconds
  var isStop = false
  while (!isStop) {
    isStop = ssc.awaitTerminationOrTimeout(intervalMills)
    if (!isStop && isExistsMarkFile(hdfs_file_path)) {
      server.stop()
      log.warn("Shutting down the Spark Streaming job in 2 seconds.....")
      Thread.sleep(2000)
      ssc.stop(true, true)
    }
  }
}
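The helper isExistsMarkFile is not shown in the original snippet; a minimal sketch under that assumption, using the Hadoop FileSystem API to test whether the marker file exists on HDFS:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// returns true once the shutdown marker file has been created on HDFS
def isExistsMarkFile(hdfs_file_path: String): Boolean = {
  val fs = FileSystem.get(new Configuration())
  fs.exists(new Path(hdfs_file_path))
}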
// Updating with $set and upsert: inserts a new document if none matches, otherwise updates the existing one
mercTradesByDay.foreachPartition(iter => if (iter.nonEmpty) {
  val writeConfig = WriteConfig(Map(
    "database" -> Property.getProperty("resultDatabase"),
    "collection" -> Property.getProperty("bigScreenByMercTable"),
    "writeConcern.w" -> "1",
    "spark.mongodb.output.uri" -> Property.getProperty("uri")))
  mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[BsonDocument] =>
    iter.grouped(writeConfig.maxBatchSize).foreach(batch => {
      val updateOptions = new UpdateOptions().upsert(true)
      val requests = batch.map(doc => {
        val queryDocument = new BsonDocument()
        queryDocument.append("_id", doc.get("_id"))
        // note: this BasicDBObject is never used below; the whole document is passed to $set instead
        var value = new BasicDBObject("$set", new BasicDBObject("MERC_NAME", doc.get("MERC_NAME")))
        new UpdateOneModel[BsonDocument](queryDocument, new BsonDocument("$set", doc), updateOptions)
      })
      collection.bulkWrite(requests.toList.asJava)
    })
  })
})

package com.eptok.scala.offline.testm

import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.Document
/*
 * @author: ljx
 * @date: 2019/11/13
 * @description:
 */
object Demo {
  def main(args: Array[String]): Unit = {
    // Getting started with Spark reading from and writing to MongoDB
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("ConnAppTest")
      .config("spark.mongodb.input.uri", "mongodb://interface_manager:2wsxCDE#@10.213.:50000,10.213.32.85:50000/xzq_test.collection?authSource=admin") // MongoDB input
      .config("spark.mongodb.output.uri", "mongodb://interface_manager:2wsxCDE#@10.213.32.:50000,10.213.35:50000/xzq_test.collection?authSource=admin") // MongoDB output
      .getOrCreate()
    // Generate test data
    val documents = spark.sparkContext.parallelize((8 to 10).map(i => Document.parse(s"{test: $i}")))
    // Save the data to MongoDB
    MongoSpark.save(documents)
    // Load the data back
    val rdd: DataFrame = MongoSpark.load(spark)
    // Print the result
    rdd.show
  }
}
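If this example is built standalone, the MongoDB Spark connector has to be on the classpath; a possible sbt dependency is shown below, where the version is an assumption and should be matched to the Spark version actually in use.

libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.1" // version is an assumption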