个性化阅读
专注于IT技术分析

带有示例和用例:Apache Spark简介

本文概述

当我对Scala(Spark的编写语言)感兴趣时, 我在2013年底首次听说了Spark。一段时间后, 我做了一个有趣的数据科学项目, 试图预测泰坦尼克号的生存时间。事实证明, 这是进一步介绍Spark概念和编程的好方法。我强烈建议有志向的Spark开发人员寻找入门的地方。

如今, Spark已被亚马逊, eBay和Yahoo!等主要公司采用。许多组织在具有数千个节点的群集上运行Spark。根据Spark FAQ, 最大的已知集群有8000多个节点。的确, Spark是一项值得关注和学习的技术。

本文提供了Spark的介绍, 包括用例和示例。它包含来自Apache Spark网站的信息以及《 Learning Spark-Lightning-Fast Big Data Analysis》一书。

什么是Apache Spark?一个介绍

Spark是一个Apache项目, 被宣传为”闪电般的快速集群计算”。它具有蓬勃发展的开源社区, 并且是当前最活跃的Apache项目。

Spark提供了一个更快, 更通用的数据处理平台。使用Spark, 你可以在内存上运行程序的速度比Hadoop快100倍, 在磁盘上的速度快10倍。去年, Spark接手了Hadoop, 使100 TB Daytona GraySort竞赛的机器速度提高了三倍, 而机器数量却是原来的十分之一, 而且它也成为了对PB级进行分类的最快的开源引擎。

当你拥有80多个高级操作员时, Spark也使更快地编写代码成为可能。为了说明这一点, 让我们看一下” Hello World!” BigData:字数统计示例。用Java编写的MapReduce大约有50行代码, 而在Spark(和Scala)中, 你可以像下面这样简单地完成它:

sparkContext.textFile("hdfs://...")
            .flatMap(line => line.split(" "))
            .map(word => (word, 1)).reduceByKey(_ + _)
            .saveAsTextFile("hdfs://...")

学习如何使用Apache Spark时, 另一个重要方面是它提供的现成的交互式shell(REPL)。使用REPL, 你可以测试每一行代码的结果, 而无需首先编写代码并执行整个作业。因此, 工作代码的路径要短得多, 并且可以进行临时数据分析。

Spark的其他主要功能包括:

  • 目前提供Scala, Java和Python的API, 并在支持其他语言(例如R)
  • 与Hadoop生态系统和数据源(HDFS, Amazon S3, Hive, HBase, Cassandra等)良好集成
  • 可以在Hadoop YARN或Apache Mesos管理的集群上运行, 也可以独立运行

Spark核心由一组功能强大的高级库进行了补充, 这些库可以在同一应用程序中无缝使用。这些库当前包括SparkSQL, Spark Streaming, MLlib(用于机器学习)和GraphX, 本文将对其进行详细说明。目前还正在开发其他Spark库和扩展。

Spark库和扩展

Spark Core

Spark Core是大规模并行和分布式数据处理的基本引擎。它负责:

  • 内存管理和故障恢复
  • 在集群上调度, 分配和监视作业
  • 与存储系统交互

Spark引入了RDD(弹性分布式数据集)的概念, 这是一个不变的容错分布式对象集合, 可以并行操作。 RDD可以包含任何类型的对象, 并且可以通过加载外部数据集或从驱动程序分配分布来创建。

RDD支持两种类型的操作:

  • 转换是在RDD上执行的操作(例如映射, 过滤, 联接, 联合等), 它们会产生一个包含结果的新RDD。
  • 动作是在RDD上运行计算后返回值的操作(例如reduce, count, first等)。

Spark中的转换是”惰性”的, 这意味着它们不会立即计算出结果。相反, 它们只是”记住”要执行的操作以及要对其执行操作的数据集(例如文件)。仅当调用动作并将结果返回到驱动程序时才实际计算转换。这种设计使Spark可以更高效地运行。例如, 如果将一个大文件以各种方式转换并传递给第一个动作, Spark将只处理并返回第一行的结果, 而不是整个文件的工作。

默认情况下, 每次在其上执行操作时, 都可能会重新计算每个转换后的RDD。但是, 你也可以使用persist或cache方法将RDD保留在内存中, 在这种情况下, Spark会将元素保留在群集中, 以便下次查询时可以更快地进行访问。

SparkSQL

SparkSQL是一个Spark组件, 它支持通过SQL或Hive查询语言查询数据。它最初是作为Apache Hive端口运行在Spark之上(代替MapReduce), 现在已与Spark堆栈集成。除了提供对各种数据源的支持之外, 还可以通过代码转换来编织SQL查询, 从而产生了非常强大的工具。以下是Hive兼容查询的示例:

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

Spark Streaming

Spark Streaming支持实时处理流数据, 例如生产Web服务器日志文件(例如Apache Flume和HDFS / S3), 社交媒体(如Twitter)和各种消息队列(如Kafka)。在后台, Spark Streaming接收输入数据流, 并将数据分成批次。接下来, 它们由Spark引擎处理, 并分批生成最终结果流, 如下所示。

Spark Streaming

Spark Streaming API与Spark Core的API紧密匹配, 这使程序员可以轻松地在批处理和流数据领域工作。

开发库

MLlib是一个机器学习库, 提供了各种算法, 这些算法旨在在集群上进行扩展以进行分类, 回归, 聚类, 协作过滤等(有关该主题的更多信息, 请查看srcmini的机器学习文章)。这些算法中的一些还可以用于流数据, 例如使用普通最小二乘法或k均值聚类的线性回归(以及更多)。 Apache Mahout(用于Hadoop的机器学习库)已经远离MapReduce并在Spark MLlib上联合起来。

GraphX

graphx

GraphX是用于处理图形和执行图形并行操作的库。它为ETL, 探索性分析和迭代图计算提供了一个统一的工具。除了用于图形操作的内置操作外, 它还提供了一个常见的图形算法库, 例如PageRank。

如何使用Apache Spark:事件检测用例

既然我们已经回答了”什么是Apache Spark?”这个问题, 让我们考虑一下它可以最有效地使用哪种问题或挑战。

最近, 我碰到一篇有关通过分析Twitter流检测地震的实验的文章。有趣的是, 事实证明, 这种技术可能比日本气象厅更快地通知你日本发生地震。尽管他们在文章中使用了不同的技术, 但我认为这是一个很好的例子, 看看我们如何可以在简化的代码片段和无胶合代码的情况下使用Spark。

首先, 我们必须过滤看起来很相关的推文, 例如”地震”或”震动”。为此, 我们可以轻松地使用Spark Streaming:

TwitterUtils.createStream(...)
            .filter(_.getText.contains("earthquake") || _.getText.contains("shaking"))

然后, 我们将不得不对这些推文进行一些语义分析, 以确定它们是否似乎在引用当前的地震事件。像”地震!”这样的推文例如, “现在正在发抖”或”现在正在发抖”将被视为正面匹配, 而”参加地震会议”或”昨天的地震令人恐惧”之类的推文则不会。为此, 本文的作者使用了支持向量机(SVM)。我们将在此处执行相同的操作, 但也可以尝试使用流式版本。 MLlib产生的代码示例如下所示:

// We would prepare some earthquake tweet data and load it in LIBSVM format.
val data = MLUtils.loadLibSVMFile(sc, "sample_earthquate_tweets.txt")

// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

// Run training algorithm to build the model
val numIterations = 100
val model = SVMWithSGD.train(training, numIterations)

// Clear the default threshold.
model.clearThreshold()

// Compute raw scores on the test set. 
val scoreAndLabels = test.map { point =>
  val score = model.predict(point.features)
  (score, point.label)
}

// Get evaluation metrics.
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()

println("Area under ROC = " + auROC)

如果我们对模型的预测率感到满意, 则可以进入下一阶段, 并在发现地震时做出反应。为了检测一个, 我们需要在定义的时间窗口内(如文章中所述)一定数量(即密度)的正推文。请注意, 对于启用了Twitter位置服务的推文, 我们还将提取地震的位置。有了这些知识, 我们可以使用SparkSQL并查询现有的Hive表(存储对接收地震通知感兴趣的用户)以检索其电子邮件地址并向其发送个性化的警告电子邮件, 如下所示:

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
// sendEmail is a custom function
sqlContext.sql("FROM earthquake_warning_users SELECT firstName, lastName, city, email")
          .collect().foreach(sendEmail)

其他Apache Spark用例

当然, Spark的潜在用例远远超出了检测地震的范围。

以下是一些其他快速处理(但肯定远远不能穷尽!)示例, 这些示例需要处理大数据的速度, 多样性和数量, Spark非常适合:

在游戏行业中, 处理和发现实时游戏内潜在事件的模式并能够立即对其做出响应是一项可以产生利润丰厚的业务的功能, 例如玩家保留, 针对性广告, 自动-调整复杂程度等。

在电子商务行业中, 可以将实时交易信息传递给像k-means这样的流聚类算法, 或者像ALS这样的协作过滤。然后甚至可以将结果与其他非结构化数据源(例如客户评论或产品评论)组合在一起, 并用于随着时间的推移不断改进和适应新趋势的建议。

在金融或安全行业中, Spark堆栈可以应用于欺诈或入侵检测系统或基于风险的身份验证。它可以通过收集大量的存档日志, 将其与外部数据源(例如有关数据泄露和被盗帐户的信息(例如, 参见https://haveibeenpwned.com/)以及来自连接/ IP地理位置或时间等请求。

总结

总而言之, Spark有助于简化处理大量实时数据或存档数据(结构化和非结构化)的挑战性和计算量大的任务, 并无缝集成相关的复杂功能, 例如机器学习和图形算法。 Spark将大数据处理带入了大众。看看这个!

赞(0)
未经允许不得转载:srcmini » 带有示例和用例:Apache Spark简介

评论 抢沙发

评论前必须登录!