
While using the window functions in org.apache.spark.sql.functions, I ran into several tricky problems. After a lot of searching and repeated experiments, I finally found solutions for them.

Let's start with an example:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, Row}
import org.apache.spark.sql.types._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object WindowQueryTest {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("WndFnc_demo").setMaster("local"))
    val hiveContext = new HiveContext(sc)

    val data = Seq(("A", 4), ("C", 1), ("D", 1), ("B", 2), ("B", 2), ("D", 4), ("A", 1), ("B", 4))
    val withRowNumbers: Seq[(String, Int, Int)] = data.zipWithIndex.map(e => (e._1._1, e._1._2, e._2))
    val rdd: RDD[Row] = sc.parallelize(withRowNumbers).map(triplet => Row(triplet._1, triplet._2, triplet._3))

    hiveContext.sql("DROP TABLE IF EXISTS delme")
    hiveContext.sql("""CREATE TABLE `delme`(
                        `key` string, `val` int, `ord` int)""")

    val schema = StructType(Seq(StructField("key", StringType),
      StructField("val", IntegerType), StructField("ord", IntegerType)))
    hiveContext.createDataFrame(rdd, schema).write.mode(SaveMode.Append).saveAsTable("delme")

    val qRes = hiveContext.sql(
      """SELECT key, val
               ,MAX(val) OVER(PARTITION BY key) mx
               ,MIN(val) OVER(PARTITION BY key) mn
               ,row_number() OVER(ORDER BY ord DESC) revord
               ,rank() OVER(ORDER BY val) rnk
         FROM delme""")
    qRes.collect().foreach(println)
  }
}

 

1. Initialization must use a HiveContext

If you instead initialize a SQLContext instance:

val sqlContext = new SQLContext(sc)

the program fails with an error saying a HiveContext is required:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;
……

HiveContext is a subclass of SQLContext:

class HiveContext(sc : org.apache.spark.SparkContext) extends org.apache.spark.sql.SQLContext with org.apache.spark.Logging
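
Because HiveContext extends SQLContext, switching usually means changing nothing but the construction of the context; everything that worked against the SQLContext keeps working, and the window functions become available. A minimal sketch (my own illustration, not code from the original post):

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object HiveContextDemo {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local"))
    // was: val sqlContext = new SQLContext(sc) -- this is the only line that changes
    val hiveContext = new HiveContext(sc)
    import hiveContext.implicits._

    // The same row_number() query that fails under SQLContext now runs.
    val df = sc.parallelize(Seq(("A", 1), ("A", 2), ("B", 3))).toDF("key", "val")
    df.registerTempTable("t")
    hiveContext.sql("SELECT key, val, row_number() OVER (ORDER BY val) rn FROM t").show()
    sc.stop()
  }
}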

 

2. External Libraries must include three jar dependencies from spark/lib

The project's External Libraries must include the following three jar files: datanucleus-api-jdo, datanucleus-core and datanucleus-rdbms.


When the program runs, a metastore_db folder and a derby.log file are generated automatically in the project directory.

Otherwise, error messages like the following appear:

16/01/18 15:40:07 WARN Persistence: Error creating validator of type org.datanucleus.properties.CorePropertyValidator
ClassLoaderResolver for class "" gave error on creation : {1}
org.datanucleus.exceptions.NucleusUserException: ClassLoaderResolver for class "" gave error on creation : {1}
……
16/01/18 15:40:07 WARN HiveMetaStore: Retrying creating default database after error: Unexpected exception caught.
javax.jdo.JDOFatalInternalException: Unexpected exception caught.
……
16/01/18 15:40:07 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
16/01/18 15:40:07 INFO ObjectStore: ObjectStore, initialize called
16/01/18 15:40:07 WARN Persistence: Error creating validator of type org.datanucleus.properties.CorePropertyValidator
ClassLoaderResolver for class "" gave error on creation : {1}
org.datanucleus.exceptions.NucleusUserException: ClassLoaderResolver for class "" gave error on creation : {1}
……
16/01/18 15:40:07 WARN Hive: Failed to access metastore. This class should not accessed in runtime.
org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
……
16/01/18 15:40:07 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
16/01/18 15:40:07 INFO ObjectStore: ObjectStore, initialize called
16/01/18 15:40:07 WARN Persistence: Error creating validator of type org.datanucleus.properties.CorePropertyValidator
ClassLoaderResolver for class "" gave error on creation : {1}
org.datanucleus.exceptions.NucleusUserException: ClassLoaderResolver for class "" gave error on creation : {1}
……
16/01/18 15:40:07 WARN HiveMetaStore: Retrying creating default database after error: Unexpected exception caught.
javax.jdo.JDOFatalInternalException: Unexpected exception caught.
……
16/01/18 15:40:07 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
16/01/18 15:40:07 INFO ObjectStore: ObjectStore, initialize called
16/01/18 15:40:07 WARN Persistence: Error creating validator of type org.datanucleus.properties.CorePropertyValidator
ClassLoaderResolver for class "" gave error on creation : {1}
org.datanucleus.exceptions.NucleusUserException: ClassLoaderResolver for class "" gave error on creation : {1}
……
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
……

These three jar files can be found in spark/lib.
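
How the jars end up on the classpath depends on the build setup. As one possibility (a hypothetical build.sbt fragment of mine, with the Spark installation path as an assumption), sbt's unmanaged jars can be pointed at spark/lib:

// Hypothetical build.sbt snippet: pull the DataNucleus jars from spark/lib
// into the compile/run classpath. Adjust "/opt/spark/lib" to your installation.
unmanagedJars in Compile ++= {
  val sparkLib = file("/opt/spark/lib")
  (sparkLib * "datanucleus-*.jar").classpath
}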

 

3. Set the JVM options in the run configuration (FATAL!)

At this point everything looks OK. But when the program is compiled and run, it exits abnormally; only an exception in the main thread is reported at the very end, with no ERROR output, so it is hard to see what the cause actually is.

……
Exception in thread "main" 
Process finished with exit code 1

After running it several times, the following exception appears; the key point is PermGen space (the permanent generation).

Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183)
    at org.apache.spark.sql.hive.client.IsolatedClientLoader.<init>(IsolatedClientLoader.scala:179)
    at org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226)
    at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185)
    at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392)
    at org.apache.spark.sql.hive.HiveContext.defaultOverrides(HiveContext.scala:174)
    at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:177)
    at WindowQueryTest$.main(WindowQueryTest.scala:14)
    at WindowQueryTest.main(WindowQueryTest.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.OutOfMemoryError: PermGen space
    at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1236)
    at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:174)
    at org.apache.hadoop.hive.ql.metadata.Hive.<clinit>(Hive.java:166)
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
    at org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:171)
    ... 18 more

Process finished with exit code 1

Compiling and running again may produce even longer exception traces, and the exact errors may vary, but the root cause stays the same: the size of the PermGen space.

Solution: add the following JVM options in the Run Configuration: -server -Xms512M -Xmx1024M -XX:PermSize=256M -XX:MaxNewSize=512M -XX:MaxPermSize=512M

Each parameter can be adjusted to match the machine's configuration.
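
If the program is launched through sbt rather than from the IDE's run configuration, the equivalent settings can go into the build file (a hypothetical build.sbt fragment of mine; forking is needed so the options apply to a separate JVM):

// Hypothetical build.sbt equivalent of the IDE run-configuration options.
fork in run := true
javaOptions in run ++= Seq(
  "-server", "-Xms512M", "-Xmx1024M",
  "-XX:PermSize=256M", "-XX:MaxNewSize=512M", "-XX:MaxPermSize=512M"
)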

 

4. Specifying the window with WindowSpec

Now look at this example:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object WindowFunctions {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Window Functions").setMaster("local")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)
    import hiveContext.implicits._

    val l = (1997, 1) :: (1997, 4) :: (1998, 2) :: (1998, 3) :: (1999, 9) :: Nil
    val df = sc.parallelize(l).toDF("k", "v")

    val w = Window.orderBy($"k")
    val df1 = df.withColumn("No", rowNumber().over(w))

    val rowW = w.rowsBetween(-2, 0)
    val rangeW = w.rangeBetween(-1, 0)
    df1.withColumn("row", avg($"v").over(rowW))
       .withColumn("range", avg($"v").over(rangeW))
       .show

    sc.stop()
  }
}

The query prints a table with the columns k, v, No (the row number), row (the average of v over the two preceding rows and the current row) and range (the average of v over rows whose k lies between the current k minus 1 and the current k).

 

The org.apache.spark.sql.expressions.Window object creates a WindowSpec, specifying the partitioning and/or the ordering:

@org.apache.spark.annotation.Experimental
object Window extends scala.AnyRef {
  @scala.annotation.varargs
  def partitionBy(colName : scala.Predef.String, colNames : scala.Predef.String*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  @scala.annotation.varargs
  def partitionBy(cols : org.apache.spark.sql.Column*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  @scala.annotation.varargs
  def orderBy(colName : scala.Predef.String, colNames : scala.Predef.String*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  @scala.annotation.varargs
  def orderBy(cols : org.apache.spark.sql.Column*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
}

On the resulting WindowSpec you can call rowsBetween or rangeBetween to set the offsets that define the window frame; you can even redefine the partitioning and the ordering:

@org.apache.spark.annotation.Experimental
class WindowSpec private[sql] (
    partitionSpec : scala.Seq[org.apache.spark.sql.catalyst.expressions.Expression],
    orderSpec : scala.Seq[org.apache.spark.sql.catalyst.expressions.SortOrder],
    frame : org.apache.spark.sql.catalyst.expressions.WindowFrame) extends scala.AnyRef {
  @scala.annotation.varargs
  def partitionBy(colName : scala.Predef.String, colNames : scala.Predef.String*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  @scala.annotation.varargs
  def partitionBy(cols : org.apache.spark.sql.Column*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  @scala.annotation.varargs
  def orderBy(colName : scala.Predef.String, colNames : scala.Predef.String*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  @scala.annotation.varargs
  def orderBy(cols : org.apache.spark.sql.Column*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  def rowsBetween(start : scala.Long, end : scala.Long) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  def rangeBetween(start : scala.Long, end : scala.Long) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  private[sql] def withAggregate(aggregate : org.apache.spark.sql.Column) : org.apache.spark.sql.Column = { /* compiled code */ }
}

Finally, applying a concrete window function over the WindowSpec yields the desired column.
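
For instance, the SQL query from the first example could also be expressed through the DataFrame API. The following sketch is my own illustration (assuming a DataFrame df with the key, val and ord columns from the first example, and the same imports as in the example in this section):

// Hypothetical DataFrame-API version of the earlier SQL query (illustration only).
val byKey = Window.partitionBy($"key")          // frame spans the whole partition
df.withColumn("mx", max($"val").over(byKey))
  .withColumn("mn", min($"val").over(byKey))
  .withColumn("revord", rowNumber().over(Window.orderBy($"ord".desc)))
  .withColumn("rnk", rank().over(Window.orderBy($"val")))
  .show()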

 


 

END

Reposted from: https://www.cnblogs.com/kevingu/p/5140242.html
