本文共 12587 字,大约阅读时间需要 41 分钟。
《深入理解Spark:核心思想与源码分析》一书前言的内容请看链接
《深入理解Spark:核心思想与源码分析》一书第一章的内容请看链接
《深入理解Spark:核心思想与源码分析》一书第二章的内容请看链接
由于本书的第3章内容较多,所以打算分别开辟四篇随笔分别展现。
《深入理解Spark:核心思想与源码分析》一书第三章第一部分的内容请看链接
《深入理解Spark:核心思想与源码分析》一书第三章第一部分的内容请看链接《深入理解Spark:核心思想与源码分析》一书第三章第三部分的内容请看链接本文展现第3章第四部分的内容:
MetricsSystem使用codahale提供的第三方测量仓库Metrics,有关Metrics的具体信息可以参考附录D。MetricsSystem中有三个概念:
val metricsSystem = env.metricsSystem metricsSystem.start()
MetricsSystem的启动过程包括以下步骤:
1) 注册Sources;2) 注册Sinks;3) 给Sinks增加Jetty的ServletContextHandler。MetricsSystem启动完毕后,会遍历与Sinks有关的ServletContextHandler,并调用attachHandler将它们绑定到SparkUI上。metricsSystem.getServletHandlers.foreach(handler=> ui.foreach(_.attachHandler(handler)))
registerSources方法用于注册Sources,它的实现见代码清单3-44。注册Sources的过程分为以下步骤:
1) 从metricsConfig获取Driver的Properties,默认为创建MetricsSystem的过程中解析的{sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet,sink.servlet.path=/metrics/json}。2) 从Driver的Properties中用正则匹配以source.开头的属性。然后将属性中的Source反射得到的实例,加入ArrayBuffer[Source]。3) 将每个Source的metricRegistry(也是MetricSet的子类型)注册到ConcurrentMap metrics。这里的registerSource方法已在3.8.2节讲解过。代码清单3-44 MetricsSystemprivate def registerSources() { val instConfig =metricsConfig.getInstance(instance) val sourceConfigs =metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX) //Register all the sources related to instance sourceConfigs.foreach{ kv => val classPath = kv._2.getProperty("class") try { val source =Class.forName(classPath).newInstance() registerSource(source.asInstanceOf[Source]) } catch { case e: Exception =>logError("Source class" + classPath + " cannot beinstantiated", e) } } }
registerSinks方法用于注册Sinks,它的实现见代码清单3-45。注册Sinks的步骤如下:
1) 从Driver的Properties中用正则匹配以sink.开头的属性,如:{sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet,sink.servlet.path=/metrics/json}。将其转换为Map(servlet-> {class=org.apache.spark.metrics.sink.MetricsServlet, path=/metrics/json})。2) 将子属性class对应的类metricsServlet反射得到MetricsServlet实例。如果属性的key是servlet,将其设置为metricsServlet;如果是Sink,则加入到ArrayBuffer[Sink]中。代码清单3-45 MetricsSystem注册Sinks的实现private def registerSinks() { val instConfig = metricsConfig.getInstance(instance) val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX) sinkConfigs.foreach { kv => val classPath = kv._2.getProperty("class") if (null != classPath) { try { val sink = Class.forName(classPath) .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) .newInstance(kv._2, registry, securityMgr) if (kv._1 == "servlet") { metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) } else { sinks += sink.asInstanceOf[Sink] } } catch { case e: Exception => logError("Sink class "+ classPath + " cannot be instantialized",e) } } } }
MetricsSystem的getServletHandlers方法,实现如下。
def getServletHandlers = { require(running, "Canonly call getServletHandlers on a running MetricsSystem") metricsServlet.map(_.getHandlers).getOrElse(Array()) }
可以看到调用了metricsServlet的getHandlers,其实现如下。
def getHandlers = Array[ServletContextHandler]( createServletHandler(servletPath, newServletParams(request => getMetricsSnapshot(request), "text/json"), securityMgr) )
最终生成处理/metrics/json请求的ServletContextHandler,而请求的真正处理由getMetricsSnapshot方法,利用fastjson解析。生成的ServletContextHandler通过SparkUI的attachHandler方法,也被绑定到SparkUI。createServletHandler与attachHandler方法都已经在3.4.4节详细阐述。最终我们可以使用以下这些地址来访问测量数据。
ExecutorAllocationManager用于动态分配executor,创建和启动ExecutorAllocationManager的代码如下。
private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] = if (conf.getBoolean("spark.dynamicAllocation.enabled",false)) { Some(newExecutorAllocationManager(this, listenerBus, conf)) } else { None } executorAllocationManager.foreach(_.start())
默认情况下不会创建ExecutorAllocationManager,可以修改属性spark.dynamicAllocation.enabled为true来创建。ExecutorAllocationManager可以设置动态分配最小Executor数量、动态分配最大Executor数量、每个Executor可以运行的Task数量等配置信息,并对配置信息进行校验。start方法将ExecutorAllocationListener加入到listenerBus中,ExecutorAllocationListener通过监听listenerBus里的事件,动态添加删除executor。并且通过Thread不断的添加executor,并且遍历executor,将超时的executor杀掉并且移除。ExecutorAllocationListener的实现与其他SparkListener类似,不再赘述。ExecutorAllocationManager的关键代码见代码清单3-46。
代码清单3-46 ExecutorAllocationManagerr的关键代码private valintervalMillis: Long = 100 private var clock: Clock = new RealClock private val listener = newExecutorAllocationListener def start():Unit = { listenerBus.addListener(listener) startPolling() } private defstartPolling(): Unit = { val t = new Thread { override def run(): Unit= { while (true) { try { schedule() } catch { case e: Exception =>logError("Exception in dynamic executor allocation thread!", e) } Thread.sleep(intervalMillis) } } } t.setName("spark-dynamic-executor-allocation") t.setDaemon(true) t.start() }
根据3.4.1节的内容,我们知道listenerBus内置了线程listenerThread,此线程不断从eventQueue中拉出事件对象,调用监听器的监听方法。要启动此线程,需要调用listenerBus的start方法,代码如下。
listenerBus.start()
由于配置属性spark.cleaner.referenceTracking默认是true,所以会构造并启动ContextCleaner,代码如下。
private[spark] val cleaner:Option[ContextCleaner] = { if (conf.getBoolean("spark.cleaner.referenceTracking",true)) { Some(newContextCleaner(this)) } else { None } } cleaner.foreach(_.start())
ContextCleaner用于清理那些超出应用范围的RDD、ShuffleDependency和Broadcast对象。ContextCleaner的组成如下:
代码清单3-47 ContextCleaner的实现
private defkeepCleaning(): Unit = Utils.logUncaughtExceptions { while (!stopped) { try { val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT)) .map(_.asInstanceOf[CleanupTaskWeakReference]) //Synchronize here to avoid being interrupted on stop() synchronized { reference.map(_.task).foreach { task=> logDebug("Gotcleaning task " + task) referenceBuffer -= reference.get task match { case CleanRDD(rddId) => doCleanupRDD(rddId, blocking = blockOnCleanupTasks) case CleanShuffle(shuffleId) => doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks) case CleanBroadcast(broadcastId) => doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks) } } } } catch { case ie:InterruptedException if stopped => // ignore case e: Exception =>logError("Error in cleaningthread", e) } } }
在SparkContext的初始化过程中,可能对其环境造成影响,所以需要更新环境,代码如下。
postEnvironmentUpdate() postApplicationStart()
SparkContext初始化过程中,如果设置了spark.jars属性, spark.jars指定的jar包将由addJar方法加入到httpFileServer的jarDir变量指定的路径下。spark.files指定的文件将由addFile方法加入到httpFileServer的fileDir变量指定的路径下。见代码清单3-48。
代码清单3-48 依赖文件处理val jars: Seq[String] = conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size!= 0)).toSeq.flatten val files: Seq[String] = conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size!= 0)).toSeq.flatten // Addeach JAR given through the constructor if (jars != null) { jars.foreach(addJar) } if (files != null) { files.foreach(addFile) }
httpFileServer的addFile和addJar方法,见代码清单3-49。
代码清单3-49 HttpFileServer提供对依赖文件的访问def addFile(file: File) : String = { addFileToDir(file, fileDir) serverUri + "/files/"+ file.getName } def addJar(file:File) : String = { addFileToDir(file, jarDir) serverUri + "/jars/" +file.getName } def addFileToDir(file: File, dir: File) : String = { if (file.isDirectory) { throw newIllegalArgumentException(s"$filecannot be a directory.") } Files.copy(file, new File(dir, file.getName)) dir + "/" + file.getName }
postEnvironmentUpdate的实现见代码清单3-50,其处理步骤如下:
1) 通过调用SparkEnv的方法environmentDetails最终影响环境的JVM参数、Spark 属性、系统属性、classPath等,参见代码清单3-51。2) 生成事件SparkListenerEnvironmentUpdate,并post到listenerBus,此事件被EnvironmentListener监听,最终影响EnvironmentPage页面中的输出内容。代码清单3-50 SparkContext环境更新private def postEnvironmentUpdate() { if (taskScheduler != null) { val schedulingMode =getSchedulingMode.toString val addedJarPaths = addedJars.keys.toSeq val addedFilePaths = addedFiles.keys.toSeq val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths) val environmentUpdate =SparkListenerEnvironmentUpdate(environmentDetails) listenerBus.post(environmentUpdate) } }
代码清单3-51 environmentDetails的实现
val jvmInformation = Seq( ("JavaVersion", s"$javaVersion ($javaVendor)"), ("Java Home", javaHome), ("Scala Version", versionString) ).sorted val schedulerMode = if (!conf.contains("spark.scheduler.mode")) { Seq(("spark.scheduler.mode", schedulingMode)) } else { Seq[(String, String)]() } val sparkProperties = (conf.getAll ++ schedulerMode).sorted // System properties that are not java classpaths val systemProperties = Utils.getSystemProperties.toSeq val otherProperties = systemProperties.filter { case (k, _) => k != "java.class.path" && !k.startsWith("spark.") }.sorted // Class paths including all added jars andfiles val classPathEntries = javaClassPath .split(File.pathSeparator) .filterNot(_.isEmpty) .map((_, "System Classpath")) val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "AddedBy User")) val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted Map[String, Seq[(String, String)]]( "JVMInformation" -> jvmInformation, "Spark Properties" ->sparkProperties, "System Properties"-> otherProperties, "Classpath Entries"-> classPaths)
postApplicationStart方法很简单,只是向listenerBus发送了SparkListenerApplicationStart事件,代码如下。
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId), startTime, sparkUser))
在创建DAGSchedulerSource、BlockManagerSource之前首先调用taskScheduler的postStartHook方法,其目的是为了等待backend就绪,见代码清单3-52。postStartHook的实现见代码清单3-53。
创建DAGSchedulerSource和BlockManagerSource的过程类似于ExecutorSource,只不过DAGSchedulerSource测量的信息是stage. failedStages、stage.runningStages、stage. waitingStages、stage. allJobs、stage.activeJobs,BlockManagerSource测量的信息是memory. maxMem_MB、memory.remainingMem_MB、memory. memUsed_MB、memory. diskSpaceUsed_MB。代码清单3-52 创建DAGSchedulerSource和BlockManagerSourcetaskScheduler.postStartHook() private val dagSchedulerSource =new DAGSchedulerSource(this.dagScheduler) private val blockManagerSource =new BlockManagerSource(SparkEnv.get.blockManager) ivate def initDriverMetrics() { SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource) SparkEnv.get.metricsSystem.registerSource(blockManagerSource) } initDriverMetrics()
代码清单3-53 等待backend就绪的实现
override def postStartHook() { waitBackendReady() } private def waitBackendReady(): Unit = { if (backend.isReady) { return } while (!backend.isReady) { synchronized { this.wait(100) } } }
SparkContext初始化的最后将当前SparkContext的状态从contextBeingConstructed(正在构建中)改为activeContext(已激活),代码如下。
[java] view plain copy 在CODE上查看代码片派生到我的代码片SparkContext.setActiveContext(this, allowMultipleContexts) setActiveContext方法的实现如下。private[spark] defsetActiveContext( sc:SparkContext, allowMultipleContexts: Boolean): Unit = { SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { assertNoOtherContextIsRunning(sc, allowMultipleContexts) contextBeingConstructed = None activeContext =Some(sc) } }
回顾本章, Scala与Akka基于Actor的并发编程模型给人带来深刻的印象,改变了我本人每当需要提升性能时就想到使用多线程的传统观念,Actor与事件模型有类似之处,通过异步处理,减少线程切换开销,值得开发人员借鉴。listenerBus对于监听器模式的经典应用将处理转化为事件并交给统一的线程处理,减少了线程阻塞与切换,提升了性能,希望读者朋友能应用到自己的产品开发中去。此外,使用Netty所提供的异步网络框架构建的Block传输服务,基于Jetty构建的内嵌web服务、HTTP文件服务器和SparkUI,基于codahale提供的第三方测量仓库创建的测量系统,Executor中的心跳实现等内容,都值得借鉴。
转载地址:http://vmhtl.baihongyu.com/