目录
前言
前文讲完序列化管理器SerializerManager和广播管理器BroadcastManager之后,SparkEnv紧接着就会初始化Spark存储及任务调度部分的几个重要组件,如MapOutputTracker、ShuffleManager、MemoryManager、BlockManager等。由于它们的实现都相当复杂,且需要一定的背景知识,因此个人认为之后讲到相应内容时,再对它们进行分析比较自然。现在就暂时略过它们,来看看度量系统MetricsSystem。
我们已经知道,Spark Web UI是直观地展示运行状况、资源状态等监控数据的前端,而MetricsSystem就负责收集、存储和输出度量指标。对一个优秀的框架而言,监控与功能实现同等重要,因此了解MetricsSystem的相关细节很有意义。
度量系统MetricsSystem类
实例化
在代码#7.11中调用了MetricsSystem.createMetricsSystem()方法来实例化MetricsSystem类,逻辑很简单。
代码#13.1 - o.a.s.metrics.MetricsSystem.createMetricsSystem()方法
def createMetricsSystem(
instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
new MetricsSystem(instance, conf, securityMgr)
}
由此可见,MetricsSystem类有三个主构造方法参数,分别是:
- instance,表示该度量系统对应的实例名称,可取的值如"driver"、"executor"、"master"、"worker"、"applications"、"*"(表示默认实例)等。
- conf,即SparkConf配置项。
- securityMgr,即安全性管理器SecurityManager的实例。
类中的属性成员
代码#13.2 - o.a.s.metrics.MetricsSystem类的属性成员
private[this] val metricsConfig = new MetricsConfig(conf)
private val sinks = new mutable.ArrayBuffer[Sink]
private val sources = new mutable.ArrayBuffer[Source]
private val registry = new MetricRegistry()
private var running: Boolean = false
private var metricsServlet: Option[MetricsServlet] = None
这些属性涉及到了度量系统中的几个基础角色,来看一看。
- metricsConfig:度量系统的配置,它是MetricsConfig类的实例,MetricsConfig类提供了设置和加载度量配置的基础功能。
- sources:度量来源的缓存数组。所谓度量来源,就是产生及收集监控指标的组件,都继承自Source特征。
- sinks:度量目的地的缓存数组。所谓度量目的地,就是输出及表现监控指标的组件,都继承自Sink特征。
- registry:度量注册中心,是com.codahale.metrics.MetricRegistry类的实例,Source和Sink都是通过它注册到度量仓库的。这里“度量仓库”并不是Spark内部的东西,而是Codahale提供的度量组件Metrics,Spark以它为基础来构建度量系统。
- running:表示当前MetricsSystem是否在运行。
- metricsServlet:本质上是一个特殊的Sink,专门供Spark Web UI使用。
关于MetricsConfig、Source和Sink,稍后会讲述。
注册度量来源
MetricsSystem提供了registerSource()方法来注册单个度量来源。
代码#13.3 - o.a.s.metrics.MetricsSystem.registerSource()与buildRegistryName()方法
def registerSource(source: Source) {
sources += source
try {
val regName = buildRegistryName(source)
registry.register(regName, source.metricRegistry)
} catch {
case e: IllegalArgumentException => logInfo("Metrics already registered", e)
}
}
private[spark] def buildRegistryName(source: Source): String = {
val metricsNamespace = conf.get(METRICS_NAMESPACE).orElse(conf.getOption("spark.app.id"))
val executorId = conf.getOption("spark.executor.id")
val defaultName = MetricRegistry.name(source.sourceName)
if (instance == "driver" || instance == "executor") {
if (metricsNamespace.isDefined && executorId.isDefined) {
MetricRegistry.name(metricsNamespace.get, executorId.get, source.sourceName)
} else {
if (metricsNamespace.isEmpty) {
logWarning(s"Using default name $defaultName for source because neither " +
s"${METRICS_NAMESPACE.key} nor spark.app.id is set.")
}
if (executorId.isEmpty) {
logWarning(s"Using default name $defaultName for source because spark.executor.id is " +
s"not set.")
}
defaultName
}
} else { defaultName }
}
registerSource()方法首先将度量来源加入缓存数组,调用buildRegistryName()方法来构造Source的注册名称,然后调用MetricRegistry.register()方法注册到度量仓库。Source的注册名称取决于度量的命名空间(由spark.metrics.namespace参数控制,默认值为Application ID),以及Executor ID。其默认注册名称则由MetricRegistry.name()方法来生成。
在MetricsSystem初始化时,会根据MetricsConfig来初始化所有对应的Source。这个方法的实现如下。
代码#13.4 - o.a.s.metrics.MetricsSystem.registerSources()方法
private def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
sourceConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
try {
val source = Utils.classForName(classPath).newInstance()
registerSource(source.asInstanceOf[Source])
} catch {
case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
}
}
}
其具体步骤是:先调用MetricsConfig.getInstance()方法取得实例名称下的配置,然后用MetricsConfig.subProperties()方法,根据正则表达式^source\.(.+)\.(.+)
匹配出该实例所有与Source相关的参数,返回类型为HashMap[String, Properties]。最后,根据配置的class属性,利用反射构造出Source实现类的对象实例,调用代码#13.3中的方法将Source注册到度量仓库。
注册度量目的地
MetricsSystem并没有提供注册单个度量目的地的方法,而只提供了registerSinks()方法在初始化时批量注册度量目的地。
代码#13.5 - o.a.s.metrics.MetricsSystem.registerSinks()方法
private def registerSinks() {
val instConfig = metricsConfig.getInstance(instance)
val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
sinkConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
if (null != classPath) {
try {
val sink = Utils.classForName(classPath)
.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
.newInstance(kv._2, registry, securityMgr)
if (kv._1 == "servlet") {
metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
} else {
sinks += sink.asInstanceOf[Sink]
}
} catch {
case e: Exception =>
logError("Sink class " + classPath + " cannot be instantiated")
throw e
}
}
}
}
它前半部分的处理方式与registerSources()方法一致,不过是改用了正则表达式^sink\.(.+)\.(.+)
匹配出该实例所有与Sink相关的参数而已。然后同样利用反射构造出Sink实现类的对象实例,如果度量实例名称为servlet,说明是Web UI使用的那个Sink,将它赋值给metricsServlet属性。否则,就将其加入sinks缓存数组。在MetricsSystem初始化的最后,会调用Sink.start()方法分别启动每个Sink。
度量配置MetricsConfig类
简单来讲,MetricsConfig主要负责玩“文字游戏”,也就是度量系统配置的设置与解析。我们由它的初始化方法initialize()入手,这个方法在MetricsSystem的构造方法中也有调用。
初始化
代码#13.6 - o.a.s.metrics.MetricsConfig.initialize()方法
def initialize() {
setDefaultProperties(properties)
loadPropertiesFromFile(conf.getOption("spark.metrics.conf"))
val prefix = "spark.metrics.conf."
conf.getAll.foreach {
case (k, v) if k.startsWith(prefix) =>
properties.setProperty(k.substring(prefix.length()), v)
case _ =>
}
perInstanceSubProperties = subProperties(properties, INSTANCE_REGEX)
if (perInstanceSubProperties.contains(DEFAULT_PREFIX)) {
val defaultSubProperties = perInstanceSubProperties(DEFAULT_PREFIX).asScala
for ((instance, prop) <- perInstanceSubProperties if (instance != DEFAULT_PREFIX);
(k, v) <- defaultSubProperties if (prop.get(k) == null)) {
prop.put(k, v)
}
}
}
它初始化的流程是:首先调用setDefaultProperties()方法设置默认配置,调用loadPropertiesFromFile()方法从文件中加载配置。然后,在SparkConf中查找以"spark.metrics.conf."字符串为前缀的配置项,将其键的后缀和值加入度量系统的配置。最后,调用subProperties()方法,通过正则匹配分拆出各个度量实例的配置,并保存在perInstanceSubProperties属性(其数据类型为HashMap[String, Properties])中。
设置默认配置及从文件加载配置
默认属性一共有4个,代码如下。
代码#13.7 - o.a.s.metrics.MetricsConfig.setDefaultProperties()方法
private def setDefaultProperties(prop: Properties) {
prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
prop.setProperty("*.sink.servlet.path", "/metrics/json")
prop.setProperty("master.sink.servlet.path", "/metrics/master/json")
prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json")
}
loadPropertiesFromFile()方法会首先从给定的路径中加载配置文件。如果没有提供配置文件,就会从classpath下的metrics.properties文件中读取。代码比较简单,为节省篇幅,不再贴出。
分拆各实例的配置
代码#13.8 - o.a.s.metrics.MetricsConfig.subProperties()方法
def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
val subProperties = new mutable.HashMap[String, Properties]
prop.asScala.foreach { kv =>
if (regex.findPrefixOf(kv._1.toString).isDefined) {
val regex(prefix, suffix) = kv._1.toString
subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2.toString)
}
}
subProperties
}
度量配置的格式是:[instance].[sink|source].[name].[options]=value
,比如上面默认配置中的master.sink.servlet.path=/metrics/master/json
。这个方法的作用就是将原key的instance部分正则匹配出来作为HashMap的key,原key的其余部分作为Properties的key,原value作为Properties的value,以达到根据instance名称分组的效果。
度量来源Source与目的地Sink
由上面的分析,我们可以知道Spark的度量系统是由Instance、Source、Metrics、Sink四个部分组成的,它们之间的关系可以用下面的框图来表示。
接下来就看一看与Source和Sink相关的细节。
Source实现类与示例
Source是一个非常简单的特征,其中只定义了2个方法,分别用来获取度量来源的名称,以及其对应的注册中心。它有非常多种实现,如下图所示。
其中我们也会发现很多之前耳熟能详的名词,比如DAGScheduler、LiveListenerBus等,这说明度量系统在Spark内部有着广泛的应用。下面我们以ExecutorSource的部分代码为例来简单看看Source的具体实现。
代码#13.9 - o.a.s.executor.ExecutorSource类的部分代码
private[spark]
class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends Source {
override val metricRegistry = new MetricRegistry()
override val sourceName = "executor"
private def registerFileSystemStat[T](
scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = {
metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] {
override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue)
})
}
metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
override def getValue: Int = threadPool.getActiveCount()
})
metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {
override def getValue: Long = threadPool.getCompletedTaskCount()
})
metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] {
override def getValue: Int = threadPool.getPoolSize()
})
metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
override def getValue: Int = threadPool.getMaximumPoolSize()
})
for (scheme <- Array("hdfs", "file")) {
registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)
registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
}
val METRIC_CPU_TIME = metricRegistry.counter(MetricRegistry.name("cpuTime"))
val METRIC_RUN_TIME = metricRegistry.counter(MetricRegistry.name("runTime"))
val METRIC_JVM_GC_TIME = metricRegistry.counter(MetricRegistry.name("jvmGCTime"))
// ......
}
由此可见,ExecutorSource向注册中心中注册了很多指标,包括与线程池(threadpool)相关的Gauge、与文件系统(filesystem)相关的Gauge(Gauge是Metrics体系内提供的估计度量值的工具),以及大量的计数器,如GC、Shuffle、序列化方面的计数值。这些指标覆盖了整个Executor运行期的方方面面,看官也可以寻找其他Source的实现来进一步参考。
Sink实现类与示例
Sink也是一个非常简单的特征,其中定义了3个方法:start()/stop()方法分别用来启动和停止Sink,report()方法用于输出度量值。Sink的实现类如下图所示。
这其中有些名称是可以顾名思义的,比如ConsoleSink输出到控制台,CsvSink输出到CSV文件,Slf4jSink输出到符合SLF4J规范的日志。另外,JmxSink可以将监控数据输出到JMX中,从而通过JVM可视化工具(如VisualVM)进行观察。MetricsServlet在前面已经说过,它可以利用Spark UI内置的Jetty服务将监控数据输出到浏览器页面。
下面以Slf4jSink为例简单看看Sink的具体实现。
代码#13.10 - o.a.s.metrics.sink.Slf4jSink类
private[spark] class Slf4jSink(
val property: Properties,
val registry: MetricRegistry,
securityMgr: SecurityManager)
extends Sink {
val SLF4J_DEFAULT_PERIOD = 10
val SLF4J_DEFAULT_UNIT = "SECONDS"
val SLF4J_KEY_PERIOD = "period"
val SLF4J_KEY_UNIT = "unit"
val pollPeriod = Option(property.getProperty(SLF4J_KEY_PERIOD)) match {
case Some(s) => s.toInt
case None => SLF4J_DEFAULT_PERIOD
}
val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
case None => TimeUnit.valueOf(SLF4J_DEFAULT_UNIT)
}
MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
val reporter: Slf4jReporter = Slf4jReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build()
override def start() {
reporter.start(pollPeriod, pollUnit)
}
override def stop() {
reporter.stop()
}
override def report() {
reporter.report()
}
}
可见,Slf4jSink实际上就是Codehale Metrics中Slf4jReporter类的简单封装。Slf4jReporter启动之后,会按照pollPeriod和pollUnit规定的时间定期去轮询度量值并输出。
总结
本文首先介绍了Spark度量系统的概念,通过阅读MetricsSystem类的相关源码,明确了度量系统是如果运作及发挥作用的。然后对度量配置MetricsConfig做了简单了解,最后简述了度量来源Source及目的地Sink的实现方式。由于度量和监控在Spark各主要功能模块中都是不可或缺的,因此今后在深入阅读Spark Core的其他源码时,我们会非常频繁地见到度量系统相关的方法调用。