Spark: Parsing JSON Array Data with PySpark SQL
Published: 2019-06-24


Sample data

12341234123412342|asefr-3423|[{"name":"spark","score":"65"},{"name":"airlow","score":"70"},{"name":"flume","score":"55"},{"name":"python","score":"33"},{"name":"scala","score":"44"},{"name":"java","score":"70"},{"name":"hdfs","score":"66"},{"name":"hbase","score":"77"},{"name":"qq","score":"70"},{"name":"sun","score":"88"},{"name":"mysql","score":"96"},{"name":"php","score":"88"},{"name":"hive","score":"97"},{"name":"oozie","score":"45"},{"name":"meizu","score":"70"},{"name":"hw","score":"32"},{"name":"sql","score":"75"},{"name":"r","score":"64"},{"name":"mr","score":"83"},{"name":"kafka","score":"64"},{"name":"mo","score":"75"},{"name":"apple","score":"70"},{"name":"jquery","score":"86"},{"name":"js","score":"95"},{"name":"pig","score":"70"}]
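Each record is a single line with three pipe-separated fields, and the last field is a JSON array of name/score objects. The following is a minimal plain-Python sketch (no Spark needed) of how one such line breaks down. The shortened `sample_line`, the helper name `split_record`, and the labels `record_id` and `code` for the first two fields are illustrative assumptions; the original post does not name those fields.

```python
import json

# Shortened, illustrative record in the same shape as the sample data.
sample_line = '12341234123412342|asefr-3423|[{"name":"spark","score":"65"},{"name":"hive","score":"97"}]'


def split_record(line):
    """Hypothetical helper: split one record into (id, code, list of dicts)."""
    # Split on the first two "|" only, so the JSON payload stays intact.
    record_id, code, payload = line.split("|", 2)
    return record_id, code, json.loads(payload)


record_id, code, items = split_record(sample_line)
print(record_id, code, len(items))
# Scores are quoted strings in the data, so convert before comparing.
print(items[1]["name"], int(items[1]["score"]))
```

Note that every score arrives as a string such as "65", which matters when choosing the column type later.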

The code:

# -*- coding: utf-8 -*-
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StructField, StructType, StringType

if __name__ == "__main__":
    sc = SparkContext(appName="PythonSQL")
    sc.setLogLevel("WARN")
    sqlContext = SQLContext(sc)

    # sys.argv[1]: input text file; sys.argv[2]: output directory
    fileName = sys.argv[1]
    lines = sc.textFile(fileName)

    def parse_line(line):
        # Each line has three "|"-separated fields; the third is the JSON array.
        fields = line.split("|", -1)
        keyword = fields[2]
        return keyword

    def parse_json(keyword):
        # Drop the array brackets and put "|" between the JSON objects.
        return keyword.replace("[", "").replace("]", "").replace("},{", "}|{")

    keywordRDD = lines.map(parse_line)
    # print(keywordRDD.take(1))
    # print("---------------")
    jsonlistRDD = keywordRDD.map(parse_json)
    # print(jsonlistRDD.take(1))
    # Split on "|" so that each RDD element is one JSON object string.
    jsonRDD = jsonlistRDD.flatMap(lambda jsonlist: jsonlist.split("|"))

    # The scores are quoted in the data ("65"), so read them as strings and cast in SQL.
    schema = StructType([StructField("name", StringType()), StructField("score", StringType())])
    df = sqlContext.read.schema(schema).json(jsonRDD)
    # df.printSchema()
    # df.show()

    df.createOrReplaceTempView("json")
    df_result = sqlContext.sql(
        "SELECT name, CAST(score AS INT) AS score FROM json WHERE CAST(score AS INT) > 70")
    df_result.coalesce(1).write.json(sys.argv[2])
    sc.stop()

Submitting the job

spark-submit .\demo2.py "C:\\Users\\txdyl\\Desktop\\test.txt" "c:\\users\\txdyl\\Desktop\\output"

Results

 

Reposted from: https://www.cnblogs.com/RzCong/p/11094784.html
