Spark Core源码精读计划#13:度量系统MetricsSystem的建立

目录

前言

前文讲完序列化管理器SerializerManager和广播管理器BroadcastManager之后,SparkEnv紧接着就会初始化Spark存储及任务调度部分的几个重要组件,如MapOutputTracker、ShuffleManager、MemoryManager、BlockManager等。由于它们的实现都相当复杂,且需要一定的背景知识,因此个人认为之后讲到相应内容时,再对它们进行分析比较自然。现在就暂时略过它们,来看看度量系统MetricsSystem。

我们已经知道,Spark Web UI是直观地展示运行状况、资源状态等监控数据的前端,而MetricsSystem就负责收集、存储和输出度量指标。对一个优秀的框架而言,监控与功能实现同等重要,因此了解MetricsSystem的相关细节很有意义。

度量系统MetricsSystem类

实例化

在代码#7.11中调用了MetricsSystem.createMetricsSystem()方法来实例化MetricsSystem类,逻辑很简单。

代码#13.1 - o.a.s.metrics.MetricsSystem.createMetricsSystem()方法

  def createMetricsSystem(
      instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
    new MetricsSystem(instance, conf, securityMgr)
  }

由此可见,MetricsSystem类有三个主构造方法参数,分别是:

  • instance,表示该度量系统对应的实例名称,可取的值如"driver"、"executor"、"master"、"worker"、"applications"、"*"(表示默认实例)等。
  • conf,即SparkConf配置项。
  • securityMgr,即安全性管理器SecurityManager的实例。

类中的属性成员

代码#13.2 - o.a.s.metrics.MetricsSystem类的属性成员

  private[this] val metricsConfig = new MetricsConfig(conf)

  private val sinks = new mutable.ArrayBuffer[Sink]
  private val sources = new mutable.ArrayBuffer[Source]
  private val registry = new MetricRegistry()

  private var running: Boolean = false

  private var metricsServlet: Option[MetricsServlet] = None

这些属性涉及到了度量系统中的几个基础角色,来看一看。

  • metricsConfig:度量系统的配置,它是MetricsConfig类的实例,MetricsConfig类提供了设置和加载度量配置的基础功能。
  • sources:度量来源的缓存数组。所谓度量来源,就是产生及收集监控指标的组件,都继承自Source特征。
  • sinks:度量目的地的缓存数组。所谓度量目的地,就是输出及表现监控指标的组件,都继承自Sink特征。
  • registry:度量注册中心,是com.codahale.metrics.MetricRegistry类的实例,Source和Sink都是通过它注册到度量仓库的。这里“度量仓库”并不是Spark内部的东西,而是Codahale提供的度量组件Metrics,Spark以它为基础来构建度量系统。
  • running:表示当前MetricsSystem是否在运行。
  • metricsServlet:本质上是一个特殊的Sink,专门供Spark Web UI使用。

关于MetricsConfig、Source和Sink,稍后会讲述。

注册度量来源

MetricsSystem提供了registerSource()方法来注册单个度量来源。

代码#13.3 - o.a.s.metrics.MetricsSystem.registerSource()与buildRegistryName()方法

  def registerSource(source: Source) {
    sources += source
    try {
      val regName = buildRegistryName(source)
      registry.register(regName, source.metricRegistry)
    } catch {
      case e: IllegalArgumentException => logInfo("Metrics already registered", e)
    }
  }

  private[spark] def buildRegistryName(source: Source): String = {
    val metricsNamespace = conf.get(METRICS_NAMESPACE).orElse(conf.getOption("spark.app.id"))
    val executorId = conf.getOption("spark.executor.id")
    val defaultName = MetricRegistry.name(source.sourceName)

    if (instance == "driver" || instance == "executor") {
      if (metricsNamespace.isDefined && executorId.isDefined) {
        MetricRegistry.name(metricsNamespace.get, executorId.get, source.sourceName)
      } else {
        if (metricsNamespace.isEmpty) {
          logWarning(s"Using default name $defaultName for source because neither " +
            s"${METRICS_NAMESPACE.key} nor spark.app.id is set.")
        }
        if (executorId.isEmpty) {
          logWarning(s"Using default name $defaultName for source because spark.executor.id is " +
            s"not set.")
        }
        defaultName
      }
    } else { defaultName }
  }

registerSource()方法首先将度量来源加入缓存数组,调用buildRegistryName()方法来构造Source的注册名称,然后调用MetricRegistry.register()方法注册到度量仓库。Source的注册名称取决于度量的命名空间(由spark.metrics.namespace参数控制,默认值为Application ID),以及Executor ID。其默认注册名称则由MetricRegistry.name()方法来生成。

在MetricsSystem初始化时,会根据MetricsConfig来初始化所有对应的Source。这个方法的实现如下。

代码#13.4 - o.a.s.metrics.MetricsSystem.registerSources()方法

  private def registerSources() {
    val instConfig = metricsConfig.getInstance(instance)
    val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)

    sourceConfigs.foreach { kv =>
      val classPath = kv._2.getProperty("class")
      try {
        val source = Utils.classForName(classPath).newInstance()
        registerSource(source.asInstanceOf[Source])
      } catch {
        case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
      }
    }
  }

其具体步骤是:先调用MetricsConfig.getInstance()方法取得实例名称下的配置,然后用MetricsConfig.subProperties()方法,根据正则表达式^source\.(.+)\.(.+)匹配出该实例所有与Source相关的参数,返回类型为HashMap[String, Properties]。最后,根据配置的class属性,利用反射构造出Source实现类的对象实例,调用代码#13.3中的方法将Source注册到度量仓库。

注册度量目的地

MetricsSystem并没有提供注册单个度量目的地的方法,而只提供了registerSinks()方法在初始化时批量注册度量目的地。

代码#13.5 - o.a.s.metrics.MetricsSystem.registerSinks()方法

  private def registerSinks() {
    val instConfig = metricsConfig.getInstance(instance)
    val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)

    sinkConfigs.foreach { kv =>
      val classPath = kv._2.getProperty("class")
      if (null != classPath) {
        try {
          val sink = Utils.classForName(classPath)
            .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
            .newInstance(kv._2, registry, securityMgr)
          if (kv._1 == "servlet") {
            metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
          } else {
            sinks += sink.asInstanceOf[Sink]
          }
        } catch {
          case e: Exception =>
            logError("Sink class " + classPath + " cannot be instantiated")
            throw e
        }
      }
    }
  }

它前半部分的处理方式与registerSources()方法一致,不过是改用了正则表达式^sink\.(.+)\.(.+)匹配出该实例所有与Sink相关的参数而已。然后同样利用反射构造出Sink实现类的对象实例,如果度量实例名称为servlet,说明是Web UI使用的那个Sink,将它赋值给metricsServlet属性。否则,就将其加入sinks缓存数组。在MetricsSystem初始化的最后,会调用Sink.start()方法分别启动每个Sink。

度量配置MetricsConfig类

简单来讲,MetricsConfig主要负责玩“文字游戏”,也就是度量系统配置的设置与解析。我们由它的初始化方法initialize()入手,这个方法在MetricsSystem的构造方法中也有调用。

初始化

代码#13.6 - o.a.s.metrics.MetricsConfig.initialize()方法

  def initialize() {
    setDefaultProperties(properties)
    loadPropertiesFromFile(conf.getOption("spark.metrics.conf"))

    val prefix = "spark.metrics.conf."
    conf.getAll.foreach {
      case (k, v) if k.startsWith(prefix) =>
        properties.setProperty(k.substring(prefix.length()), v)
      case _ =>
    }

    perInstanceSubProperties = subProperties(properties, INSTANCE_REGEX)
    if (perInstanceSubProperties.contains(DEFAULT_PREFIX)) {
      val defaultSubProperties = perInstanceSubProperties(DEFAULT_PREFIX).asScala
      for ((instance, prop) <- perInstanceSubProperties if (instance != DEFAULT_PREFIX);
           (k, v) <- defaultSubProperties if (prop.get(k) == null)) {
        prop.put(k, v)
      }
    }
  }

它初始化的流程是:首先调用setDefaultProperties()方法设置默认配置,调用loadPropertiesFromFile()方法从文件中加载配置。然后,在SparkConf中查找以"spark.metrics.conf."字符串为前缀的配置项,将其键的后缀和值加入度量系统的配置。最后,调用subProperties()方法,通过正则匹配分拆出各个度量实例的配置,并保存在perInstanceSubProperties属性(其数据类型为HashMap[String, Properties])中。

设置默认配置及从文件加载配置

默认属性一共有4个,代码如下。

代码#13.7 - o.a.s.metrics.MetricsConfig.setDefaultProperties()方法

  private def setDefaultProperties(prop: Properties) {
    prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
    prop.setProperty("*.sink.servlet.path", "/metrics/json")
    prop.setProperty("master.sink.servlet.path", "/metrics/master/json")
    prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json")
  }

loadPropertiesFromFile()方法会首先从给定的路径中加载配置文件。如果没有提供配置文件,就会从classpath下的metrics.properties文件中读取。代码比较简单,为节省篇幅,不再贴出。

分拆各实例的配置

代码#13.8 - o.a.s.metrics.MetricsConfig.subProperties()方法

  def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
    val subProperties = new mutable.HashMap[String, Properties]
    prop.asScala.foreach { kv =>
      if (regex.findPrefixOf(kv._1.toString).isDefined) {
        val regex(prefix, suffix) = kv._1.toString
        subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2.toString)
      }
    }
    subProperties
  }

度量配置的格式是:[instance].[sink|source].[name].[options]=value,比如上面默认配置中的master.sink.servlet.path=/metrics/master/json。这个方法的作用就是将原key的instance部分正则匹配出来作为HashMap的key,原key的其余部分作为Properties的key,原value作为Properties的value,以达到根据instance名称分组的效果。

度量来源Source与目的地Sink

由上面的分析,我们可以知道Spark的度量系统是由Instance、Source、Metrics、Sink四个部分组成的,它们之间的关系可以用下面的框图来表示。

图#13.1 - 度量系统主要组件的关系

接下来就看一看与Source和Sink相关的细节。

Source实现类与示例

Source是一个非常简单的特征,其中只定义了2个方法,分别用来获取度量来源的名称,以及其对应的注册中心。它有非常多种实现,如下图所示。

图#13.2 - Source继承体系

其中我们也会发现很多之前耳熟能详的名词,比如DAGScheduler、LiveListenerBus等,这说明度量系统在Spark内部有着广泛的应用。下面我们以ExecutorSource的部分代码为例来简单看看Source的具体实现。

代码#13.9 - o.a.s.executor.ExecutorSource类的部分代码

private[spark]
class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends Source {
  override val metricRegistry = new MetricRegistry()
  override val sourceName = "executor"

  private def registerFileSystemStat[T](
        scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = {
    metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] {
      override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue)
    })
  }

  metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
    override def getValue: Int = threadPool.getActiveCount()
  })
  metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {
    override def getValue: Long = threadPool.getCompletedTaskCount()
  })
  metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] {
    override def getValue: Int = threadPool.getPoolSize()
  })
  metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
    override def getValue: Int = threadPool.getMaximumPoolSize()
  })

  for (scheme <- Array("hdfs", "file")) {
    registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
    registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
    registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
    registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)
    registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
  }

  val METRIC_CPU_TIME = metricRegistry.counter(MetricRegistry.name("cpuTime"))
  val METRIC_RUN_TIME = metricRegistry.counter(MetricRegistry.name("runTime"))
  val METRIC_JVM_GC_TIME = metricRegistry.counter(MetricRegistry.name("jvmGCTime"))
  // ......
}

由此可见,ExecutorSource向注册中心中注册了很多指标,包括与线程池(threadpool)相关的Gauge、与文件系统(filesystem)相关的Gauge(Gauge是Metrics体系内提供的估计度量值的工具),以及大量的计数器,如GC、Shuffle、序列化方面的计数值。这些指标覆盖了整个Executor运行期的方方面面,看官也可以寻找其他Source的实现来进一步参考。

Sink实现类与示例

Sink也是一个非常简单的特征,其中定义了3个方法:start()/stop()方法分别用来启动和停止Sink,report()方法用于输出度量值。Sink的实现类如下图所示。

图#13.3 - Sink继承体系

这其中有些名称是可以顾名思义的,比如ConsoleSink输出到控制台,CsvSink输出到CSV文件,Slf4jSink输出到符合SLF4J规范的日志。另外,JmxSink可以将监控数据输出到JMX中,从而通过JVM可视化工具(如VisualVM)进行观察。MetricsServlet在前面已经说过,它可以利用Spark UI内置的Jetty服务将监控数据输出到浏览器页面。

下面以Slf4jSink为例简单看看Sink的具体实现。

代码#13.10 - o.a.s.metrics.sink.Slf4jSink类

private[spark] class Slf4jSink(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager)
  extends Sink {
  val SLF4J_DEFAULT_PERIOD = 10
  val SLF4J_DEFAULT_UNIT = "SECONDS"
  val SLF4J_KEY_PERIOD = "period"
  val SLF4J_KEY_UNIT = "unit"

  val pollPeriod = Option(property.getProperty(SLF4J_KEY_PERIOD)) match {
    case Some(s) => s.toInt
    case None => SLF4J_DEFAULT_PERIOD
  }

  val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match {
    case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
    case None => TimeUnit.valueOf(SLF4J_DEFAULT_UNIT)
  }

  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)

  val reporter: Slf4jReporter = Slf4jReporter.forRegistry(registry)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .convertRatesTo(TimeUnit.SECONDS)
    .build()

  override def start() {
    reporter.start(pollPeriod, pollUnit)
  }

  override def stop() {
    reporter.stop()
  }

  override def report() {
    reporter.report()
  }
}

可见,Slf4jSink实际上就是Codehale Metrics中Slf4jReporter类的简单封装。Slf4jReporter启动之后,会按照pollPeriod和pollUnit规定的时间定期去轮询度量值并输出。

总结

本文首先介绍了Spark度量系统的概念,通过阅读MetricsSystem类的相关源码,明确了度量系统是如果运作及发挥作用的。然后对度量配置MetricsConfig做了简单了解,最后简述了度量来源Source及目的地Sink的实现方式。由于度量和监控在Spark各主要功能模块中都是不可或缺的,因此今后在深入阅读Spark Core的其他源码时,我们会非常频繁地见到度量系统相关的方法调用。

最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351

推荐阅读更多精彩内容