当前位置: 首页 > news >正文

叮当快药网站谁做的/品牌推广

叮当快药网站谁做的,品牌推广,安卓开发,wordpress聊天系统案例介绍与编程实现1. 案例介绍该案例中,我们假设某论坛需要根据用户对站内网页的点击量,停留时间,以及是否点赞,来近实时的计算网页热度,进而动态的更新网站的今日热点模块,把最热话题的链接显示其中。2. …
案例介绍与编程实现

1. 案例介绍

该案例中,我们假设某论坛需要根据用户对站内网页的点击量,停留时间,以及是否点赞,来近实时的计算网页热度,进而动态的更新网站的今日热点模块,把最热话题的链接显示其中。

2. 案例分析

对于某一个访问论坛的用户,我们需要对他的行为数据做一个抽象,以便于解释网页话题热度的计算过程。

首先,我们通过一个向量来定义用户对于某个网页的行为即点击的网页,停留时间,以及是否点赞,可以表示如下:

(page001.html, 1, 0.5, 1)

向量的第一项表示网页的 ID,第二项表示从进入网站到离开对该网页的点击次数,第三项表示停留时间,以分钟为单位,第四项是代表是否点赞,1 为赞,-1 表示踩,0 表示中立。

其次,我们再按照各个行为对计算网页话题热度的贡献,给其设定一个权重,在本文中,我们假设点击次数权重是 0.8,因为用户可能是由于没有其他更好的话题,所以再次浏览这个话题。停留时间权重是 0.8,因为用户可能同时打开多个 tab 页,但他真正关注的只是其中一个话题。是否点赞权重是 1,因为这一般表示用户对该网页的话题很有兴趣。

最后,我们定义用下列公式计算某条行为数据对于该网页热度的贡献值。

f(x,y,z)=0.8x+0.8y+z

那么对于上面的行为数据 (page001.html, 1, 0.5, 1),利用公式可得:

H(page001)=f(x,y,z)= 0.8x+0.8y+z=0.8*1+0.8*0.5+1*1=2.2

读者可以留意到,在这个过程中,我们忽略了用户本身,也就是说我们不关注用户是谁,而只关注它对于网页热度所做的贡献。

3. 生产行为数据消息

在本案例中我们将使用一段程序来模拟用户行为,该程序每隔 5 秒钟会随机的向 user-behavior-topic 主题推送 0 到 50 条行为数据消息,显然,这个程序扮演消息生产者的角色,在实际应用中,这个功能一般会由一个系统来提供。为了简化消息处理,我们定义消息的格式如下:

网页 ID|点击次数|停留时间 (分钟)|是否点赞

并假设该网站只有 100 个网页。以下是该类的 Scala 实现源码。

UserBehaviorMsgProducer 类源码

import scala.util.Random
import java.util.Properties
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig
import kafka.producer.Producerclass UserBehaviorMsgProducer(brokers: String, topic: String) extends Runnable {private val brokerList = brokersprivate val targetTopic = topicprivate val props = new Properties()props.put("metadata.broker.list", this.brokerList)props.put("serializer.class", "kafka.serializer.StringEncoder")props.put("producer.type", "async")private val config = new ProducerConfig(this.props)private val producer = new Producer[String, String](this.config)private val PAGE_NUM = 100private val MAX_MSG_NUM = 3private val MAX_CLICK_TIME = 5private val MAX_STAY_TIME = 10//Like,1;Dislike -1;No Feeling 0private val LIKE_OR_NOT = Array[Int](1, 0, -1)def run(): Unit = {val rand = new Random()while (true) {//how many user behavior messages will be producedval msgNum = rand.nextInt(MAX_MSG_NUM) + 1try {//generate the message with format like page1|2|7.123|1for (i <- 0 to msgNum) {var msg = new StringBuilder()msg.append("page" + (rand.nextInt(PAGE_NUM) + 1))msg.append("|")msg.append(rand.nextInt(MAX_CLICK_TIME) + 1)msg.append("|")msg.append(rand.nextInt(MAX_CLICK_TIME) + rand.nextFloat())msg.append("|")msg.append(LIKE_OR_NOT(rand.nextInt(3)))println(msg.toString())//send the generated message to brokersendMessage(msg.toString())}println("%d user behavior messages produced.".format(msgNum+1))} catch {case e: Exception => println(e)}try {//sleep for 5 seconds after send a micro batch of messageThread.sleep(5000)} catch {case e: Exception => println(e)}}}def sendMessage(message: String) = {try {val data = new KeyedMessage[String, String](this.topic, message);producer.send(data);} catch {case e:Exception => println(e)}}
}
object UserBehaviorMsgProducerClient {def main(args: Array[String]) {if (args.length < 2) {println("Usage:UserBehaviorMsgProducerClient 192.168.1.1:9092 user-behavior-topic")System.exit(1)}//start the message producer threadnew Thread(new UserBehaviorMsgProducer(args(0), args(1))).start()}
}

4. 编写 Spark Streaming 程序消费消息
在弄清楚了要解决的问题之后,就可以开始编码实现了。对于本案例中的问题,在实现上的基本步骤如下:

    构建 Spark 的 StreamingContext 实例,并且开启 checkpoint 功能。因为我们需要使用 updateStateByKey 原语去累计的更新网页话题的热度值。
    利用 Spark 提供的 KafkaUtils.createStream 方法消费消息主题,这个方法会返回 ReceiverInputDStream 对象实例。
    对于每一条消息,利用上文的公式计算网页话题的热度值。
    定义一个匿名函数去把网页热度上一次的计算结果值和新计算的值相加,得到最新的热度值。
    调用 updateStateByKey 原语并传入上面定义的匿名函数更新网页热度值。
    最后得到最新结果后,需要对结果进行排序,最后打印热度值最高的 10 个网页。

源代码如下。
WebPagePopularityValueCalculator 类源码

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.Durationobject WebPagePopularityValueCalculator {private val checkpointDir = "popularity-data-checkpoint"private val msgConsumerGroup = "user-behavior-topic-message-consumer-group"def main(args: Array[String]) {if (args.length < 2) {println("Usage:WebPagePopularityValueCalculator zkserver1:2181,zkserver2:2181,zkserver3:2181 consumeMsgDataTimeInterval(secs)")System.exit(1)}val Array(zkServers,processingInterval) = argsval conf = new SparkConf().setAppName("Web Page Popularity Value Calculator")val ssc = new StreamingContext(conf, Seconds(processingInterval.toInt)) //using updateStateByKey asks for enabling checkpointssc.checkpoint(checkpointDir)val kafkaStream = KafkaUtils.createStream(//Spark streaming contextssc,//zookeeper quorum. e.g zkserver1:2181,zkserver2:2181,...zkServers,//kafka message consumer group IDmsgConsumerGroup,//Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own threadMap("user-behavior-topic" -> 3))val msgDataRDD = kafkaStream.map(_._2)//for debug use only//println("Coming data in this interval...")//msgDataRDD.print()// e.g page37|5|1.5119122|-1val popularityData = msgDataRDD.map { msgLine =>{val dataArr: Array[String] = msgLine.split("\\|")val pageID = dataArr(0)//calculate the popularity valueval popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1(pageID, popValue)}}//sum the previous popularity value and current valueval updatePopularityValue = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => {iterator.flatMap(t => {val newValue:Double = t._2.sumval stateValue:Double = t._3.getOrElse(0);Some(newValue + stateValue)}.map(sumedValue => (t._1, sumedValue)))}val initialRDD = ssc.sparkContext.parallelize(List(("page1", 0.00)))val stateDstream = popularityData.updateStateByKey[Double](updatePopularityValue,new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)//set the checkpoint interval to avoid too frequently data checkpoint which may//may significantly reduce operation throughputstateDstream.checkpoint(Duration(8*processingInterval.toInt*1000))//after calculation, we need to sort the result and only show the top 10 hot pagesstateDstream.foreachRDD { rdd => {val sortedData = rdd.map{ case (k,v) => (v,k) }.sortByKey(false)val topKData = sortedData.take(10).map{ case (v,k) => (k,v) }topKData.foreach(x => {println(x)}) }}ssc.start()ssc.awaitTermination()}
}

WebPagePopularityValueCalculator 类启动命令

bin/spark-submit \
--jars $SPARK_HOME/lib/spark-streaming-kafka_2.10-1.3.1.jar, \
$SPARK_HOME/lib/spark-streaming-kafka-assembly_2.10-1.3.1.jar, \
$SPARK_HOME/lib/kafka_2.10-0.8.2.1.jar, \
$SPARK_HOME/lib/kafka-clients-0.8.2.1.jar \ 
--class com.ibm.spark.exercise.streaming.WebPagePopularityValueCalculator 
--master spark://<spark_master_ip>:7077 \
--num-executors 4 \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181 2

由于程序中我们要用到或者间接调用 Kafka 的 API,并且需要调用 Spark Streaming 集成 Kafka 的 API(KafkaUtils.createStream), 所以需要提前将启动命令中的 jar 包上传到 Spark 集群的每个机器上 (本例中我们将它们上传到 Spark 安装目录的 lib 目录下,即$SPARK_HOME/lib),并在启动命令中引用它们。

http://www.lbrq.cn/news/1314145.html

相关文章:

  • 新网站怎么做权重/中国十大互联网公司排名
  • 建设网站的技术难点/危机舆情公关公司
  • 派出所web网站建设策划案/微信引流推广
  • 网站被入侵后需做的检测 1/无锡网站优化公司
  • 家庭路由器建个人网站/怎么优化网站
  • wordpress怎么改后台/京东关键词优化技巧
  • 做动画 的 网站有哪些内容/百度怎么联系客服
  • 网站备案手机号/深圳电子网络推广查询
  • 新网站怎么做排名/seo查询系统
  • 一个做网站编程的条件/爱站工具下载
  • 郴州网站设计公司/win10优化大师好用吗
  • 网络公司网站模版/奉化网站关键词优化费用
  • 大型建站网站/百度免费推广平台
  • 全球云邮登陆网站/网页设计与制作作业成品
  • 线上网站制作/培训平台有哪些
  • 贵州住房和城乡建设委员会网站/山东建站
  • 哪些网站是做采购的/竞价关键词排名软件
  • 根据网站日志做seo/做推广网络
  • 电脑有固定IP 做网站/关键词推广营销
  • 江苏专业网站建设/建站推广
  • 本地最好的网站开发建设公司/nba排名最新赛程
  • 网络管理专业/魔方优化大师官网
  • 免费网站安全/优化大师官方网站
  • 宁波建网站报价/企业网站优化方案
  • 网站建设军成/百度云网盘资源搜索引擎入口
  • 用flask做网站/合肥seo优化公司
  • 山东手机网站建设公司/莱阳seo排名
  • 建设厅安全员证书查询网站/开淘宝店铺怎么运营推广
  • 在统计局网站上如何做图表/企业培训课程视频
  • 网络营销网站建设知识/深圳seo公司
  • 从12kW到800V,AI服务器电源架构变革下,功率器件如何解题?
  • 分支战略论:Git版本森林中的生存法则
  • Mysql数据库学习--多表查询
  • [Nagios Core] 通知系统 | 事件代理 | NEB模块,事件,回调
  • 代码随想录算法训练营第四十九天|单调栈part2
  • 深度学习-循环神经网络RNN