博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
<Spark><Programming><Loading and Saving Your Data>
阅读量:5374 次
发布时间:2019-06-15

本文共 3066 字,大约阅读时间需要 10 分钟。

Motivation

  • Spark是基于Hadoop可用的生态系统构建的,因此Spark可以通过Hadoop MapReduce的InputFormat和OutputFormat接口存取数据。
  • Spark所提供的上层接口有这几类:
    • File formats and filesystems: 对于存储在本地或分布式系统的数据,比如NFS,HDFS,Amazon S3。Spark可以访问多种数据格式,包括text,JSON,SequenceFiles,protocol buffers。
    • Structured data sources through Spark SQL: Spark SQL模块提供了结构化数据结构的API,包括JSON和Apache Hive
    • Databases and key/value pairs:包括内置的和第三方的库,可以用来连接Cassandra,HBase,Elasticsearch以及JDBC数据库。

File Formats

  • Spark提供大量的数据格式来使数据加载和存储更加方便。这些包括了非结构化(比如文本),半结构化(比如JSON),结构化(比如序列文件)。
  • 格式名称 结构化 评论
    Text files 非结构化 Plain old text files. Records are assumed to be one per line.
    JSON 半结构化 普通的基于文本的格式,半结构化;大多数库需要每行一个记录
    CSV 结构化 非常常见的基于文本的格式,经常和电子表格应用一起使用
    SequenceFils 结构化 一个常见的Hadoop文件格式,用于key/value数据
    Protocol buffers 结构化 一个快速的,space-efficient多语言格式
    Object files 结构化

    Useful for saving data from a Spark job to be consumed by shared code. Breaks if you change your classes, as it relies on Java Serialization. 

Text Files

  • 在Spark中,加载和存储文本文件十分方便。当你加载一个文本文件作为RDD时,每一行都变成RDD中的一个元素。
  • 也可以一次加载多个text files到一个pair RDD,key就是文件名,value是文件内容

Loading text file

  • val input = sc.textFile("file:///home/holden/repos/spark/README.md")
  • 可以通过制定minPartitions来控制partitions的数量
  • 当我们指定目录而非文件时,有两种处理方式:
    • 使用textFile()方法:那么将会把所有部分加载到RDD
    • 使用wholeTextFiles()方法:当我们需要知道输入来自哪个文件或需要一次处理一个文件时,wholeTextFiles()方法会返回一个pair RDD,其中key是文件名。
    • # get the average value per fileval input = sc.wholeTextFiles("file://home/holden/salesFiles")val result = input.mapValues{ y =>    val nums = y.split(" ".map(x => x.toDouble)    nums.sum / nums.size.toDouble}

Saving text files

  • result.saveAsTextFile(outputfile)

JSON

  • 最简单的加载JSON数据的方式是作为文本加载,然后使用JSON parser来映射值。同样的,也可以使用preferred JSON序列化库来将值作为strings写出。
  • 在JAVA和sacla中我们也可以使用自定义的Hadoop format来操作JSON数据

Loading JSON

  • 上面说的以文本方式加载JSON的方法,只能用于每行一个JSON的情况。如果你想处理多行的JSON,你只能加载整个文件,然后parse每一行,这时候如果构建一个JSON parser是昂贵的的话,你可以使用mapPartitions()方法来reuse the parser.
  • # loading JSON in Scalaimport com.fasterxml.jackson.module.scala.DefaultScalaModuleimport com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapperimport com.fasterxml.jackson.databind.ObjectMapperimport com.fasterxml.jackson.databind.DeserializationFeature...case class Person(name: String, lovesPandas: Boolean) // Must be a top-level class ...// Parse it into a specific case class. We use flatMap to handle errors// by returning an empty list (None) if we encounter an issue and a// list with one element if everything is ok (Some(_)).val result = input.flatMap(record => {try {Some(mapper.readValue(record, classOf[Person]))} catch {case e: Exception => None}})
  • 处理错误格式记录是一个大问题。如果你是简单地skip错误的数据,那么你最好使用accumulators来追踪错误数目

Saving JSON

  • # example of python(data.filter(lambda x: x['lovesPandas']).map(lambda x: json.dumps(x)) .saveAsTextFile(outputFile))# example of scalaresult.filter(p => P.lovesPandas).map(mapper.writeValueAsString(_)) .saveAsTextFile(outputFile)

     

Comma-Seperated Values and Tab-Separated Values

  • Comma-separated value(CSV) are supposed to contain a fixed number of fields per line, and the fields是逗号分隔的(或是tab分隔的TSV文件)。
  • P79   TBD....

 

转载于:https://www.cnblogs.com/wttttt/p/6844085.html

你可能感兴趣的文章
php判断网页是否gzip压缩
查看>>
一个有意思的js实例,你会吗??[原创]
查看>>
sql server中bit字段实现取反操作
查看>>
Part3_lesson2---ARM指令分类学习
查看>>
jQuery拖拽原理实例
查看>>
JavaScript 技巧与高级特性
查看>>
Uva 11729 Commando War
查看>>
增强学习(一) ----- 基本概念
查看>>
ubuntu下USB连接Android手机
查看>>
C# 语句 分支语句 switch----case----.
查看>>
反射获取 obj类 的属性 与对应值
查看>>
表单中的readonly与disable的区别(zhuan)
查看>>
win10下安装配置mysql-8.0.13--实战可用
查看>>
周记2018.8.27~9.2
查看>>
MySQL中 1305-FUNCTION liangshanhero2.getdate does not exit 问题解决
查看>>
python序列化和json
查看>>
mongodb
查看>>
SSH-struts2的异常处理
查看>>
《30天自制操作系统》学习笔记--第14天
查看>>
LGPL协议的理解
查看>>