Published: 2019-06-07


PySparkSQL: Parsing JSON Array Data with PySpark

Data sample

12341234123412342|asefr-3423|[{"name":"spark","score":"65"},{"name":"airlow","score":"70"},{"name":"flume","score":"55"},{"name":"python","score":"33"},{"name":"scala","score":"44"},{"name":"java","score":"70"},{"name":"hdfs","score":"66"},{"name":"hbase","score":"77"},{"name":"qq","score":"70"},{"name":"sun","score":"88"},{"name":"mysql","score":"96"},{"name":"php","score":"88"},{"name":"hive","score":"97"},{"name":"oozie","score":"45"},{"name":"meizu","score":"70"},{"name":"hw","score":"32"},{"name":"sql","score":"75"},{"name":"r","score":"64"},{"name":"mr","score":"83"},{"name":"kafka","score":"64"},{"name":"mo","score":"75"},{"name":"apple","score":"70"},{"name":"jquery","score":"86"},{"name":"js","score":"95"},{"name":"pig","score":"70"}]

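Each record is a single line with three pipe-separated fields; the third field is a JSON array of {"name", "score"} objects in which the scores are quoted strings. As a quick sanity check before involving Spark, that field can be parsed locally with the standard json module (a minimal sketch; the local file name test.txt is an assumption, not part of the original post):

import json

# Hypothetical local check of the sample record; test.txt is an assumed file name.
with open("test.txt") as f:
    line = f.readline().strip()

fields = line.split("|", 2)        # id, code, JSON array (split at most twice)
records = json.loads(fields[2])    # the third field is valid JSON as-is
print(len(records), records[0])    # e.g. 25 {'name': 'spark', 'score': '65'}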
Now for the main course:

# -*- coding: utf-8 -*-
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

if __name__ == "__main__":
    sc = SparkContext(appName="PythonSQL")
    sqlContext = SQLContext(sc)
    sc.setLogLevel("WARN")

    fileName = sys.argv[1]
    lines = sc.textFile(fileName)

    def parse_line(line):
        # Keep only the third pipe-separated field, i.e. the JSON array.
        fields = line.split("|", -1)
        keyword = fields[2]
        return keyword

    def parse_json(keyword):
        # Strip the surrounding [ ] and rewrite every },{ boundary as }|{
        # so the array can later be split into individual JSON objects.
        return keyword.replace("[", "").replace("]", "").replace("},{", "}|{")

    keywordRDD = lines.map(parse_line)
    # print(keywordRDD.take(1))
    jsonlistRDD = keywordRDD.map(parse_json)
    # print(jsonlistRDD.take(1))

    # One JSON object string per record.
    jsonRDD = jsonlistRDD.flatMap(lambda jsonlist: jsonlist.split("|"))

    # Parse each JSON string into a row with the declared schema.
    schema = StructType([StructField("name", StringType()),
                         StructField("score", IntegerType())])
    df = sqlContext.read.schema(schema).json(jsonRDD)
    # df.printSchema()
    # df.show()

    df.registerTempTable("json")
    df_result = sqlContext.sql("SELECT name, score FROM json WHERE score > 70")
    df_result.coalesce(1).write.json(sys.argv[2])
    sc.stop()

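The script above does the parsing with plain string surgery: parse_line keeps only the third pipe-separated field, parse_json strips the [ ] brackets and rewrites every },{ boundary as }|{, and flatMap then splits on | so each JSON object becomes its own record for sqlContext.read.json. On newer Spark versions (roughly 2.2 and later) the same result can be reached without that surgery by combining from_json and explode. The sketch below is an alternative, not the original author's code; the SparkSession setup, input path, and output path are assumptions:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, from_json, split
from pyspark.sql.types import ArrayType, StructField, StructType, StringType

spark = SparkSession.builder.appName("PythonSQL").getOrCreate()

# Element type of the JSON array; score is quoted in the data, so read it as a
# string and cast it to int afterwards.
element = StructType([StructField("name", StringType()),
                      StructField("score", StringType())])

df = (spark.read.text("test.txt")                                   # one row per input line
      .select(split(col("value"), "\\|").getItem(2).alias("arr"))   # third pipe-separated field
      .select(explode(from_json(col("arr"), ArrayType(element))).alias("kv"))
      .select(col("kv.name").alias("name"),
              col("kv.score").cast("int").alias("score"))
      .where(col("score") > 70))

df.coalesce(1).write.json("output")   # same filter and output shape as the original job
spark.stop()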
Submitting the job

spark-submit .\demo2.py "C:\\Users\\txdyl\\Desktop\\test.txt" "c:\\users\\txdyl\\Desktop\\output"

Results

Reposted from: https://www.cnblogs.com/RzCong/p/11094784.html
