django-prometheus和prometheus_client源码分析(二)

背景

Prometheus是最近流行的监控报警系统,具体大家可以搜网上的文章来了解,而由于我司目前的应用使用了Django框架来做为后端应用,因此需要研究如何将Prometheus与Django结合在一起使用,因此有了接下来的源码研究。

在分析源代码之前,先要知道为什么需要分析源代码,对于我来说,有几个问题是我想要搞明白的:

  1. django-prometheus是如何注册/metrics uri并通过接口提供服务的?
  2. django-prometheus到底是怎样将数据从不同的接口收集上来的?
  3. django-prometheus收集上来Metrics后是否需要存储,如果需要,那么存储在什么地方了?
    而在搞清楚这些问题的时候,发现django-prometheus又调用了prometheus_client,又不可避免的有了针对prometheus_client的问题,所以又不得不去看prometheus_client的源码,也因此有了本文。

第一篇我们已经基本回答了第一个问题,即django-prometheus究竟是如何通过/metrics提供接口服务的。这一篇我们就接着探寻其它问题的答案。


源码分析

Collector

首先,我们需要知道Collector在应用程序中具体是如何采集数据的,先看几个例子:

from prometheus_client import Counter, Gauge, Histogram

c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint'])  # 此为parent对象
c.labels('get', '/').inc()  # 注意labels的使用
c.labels('post', '/submit').inc()


g = Gauge('gg', 'A gauge')
h = Histogram('hh', 'A histogram', buckets=(-5, 0, 5))
s = Summary('ss', 'A summary', ['label1', 'label2'])  # metrics名字, metrics说明, metrics支持的label

# Gauge 有三种方法可以用来改变其记录值
g.inc()   # 加1操作
g.set(5)  # 设定记录值为5
g.dec(2)  # 减2操作

# Histogram 使用observe()方法来记录值
h.observe(5)

我们以Counterinc()方法为例,看下它是如何记录数值的。

class Counter(MetricWrapperBase):
    ....
    def _metric_init(self):
        self._value = values.ValueClass(self._type, self._name, self._name + '_total', self._labelnames,
                                        self._labelvalues)
        self._created = time.time()
        
    def inc(self, amount=1):
        """Increment counter by the given amount."""
        if amount < 0:
            raise ValueError('Counters can only be incremented by non-negative amounts.')
        self._value.inc(amount) # 这里的self._value是在_metric_init中定义
    ...
    
  • 我们在使用Counter对象的inc()方法时本质上是调用了ValueClassinc()方法
  • self._value是在_metric_init()方法中初始化的,而_metric_init()是在Collector初始化的时候被调用的。
  • _metric_init()是在每个Collector具体实现的类中必须要实现的方法,这个方法会被__init__()初始化方法所调用。

而这里ValueClass具体又是什么呢?

# prometheus_client/values.py

class MutexValue(object):
    """A float protected by a mutex."""

    _multiprocess = False

    def __init__(self, typ, metric_name, name, labelnames, labelvalues, **kwargs):
        self._value = 0.0   # 定义了一个浮点数
        self._lock = Lock() # 初始化一个线程锁,用于保证线程安全

    def inc(self, amount):  # 真正的inc操作实现
        with self._lock:
            self._value += amount

    def set(self, value):
        with self._lock:
            self._value = value

    def get(self):
        with self._lock:
            return self._value
            
...

def get_value_class():
    # Should we enable multi-process mode?
    # This needs to be chosen before the first metric is constructed,
    # and as that may be in some arbitrary library the user/admin has
    # no control over we use an environment variable.
    if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
        return MultiProcessValue()
    else:
        return MutexValue # 重点关注这里,返回的是MutexValue类


ValueClass = get_value_class() # 在这里定义ValueClass
  • 不考虑多进程的情况,那么ValueClass实际就是MutexValue
  • 之所以使用MutexValue类,而不是直接使用原生的float,是由于增加了一个线程锁作为信号量,保证数值的更改是线程安全的。
  • 至此,我们知道所有的数据本质上都是在内存中的,并没有做持久化,理论上当我们调用collect() 去收集metrics的时候也是从内存中获取的(即从存于内存的对象中获?。?/li>

那接下来就让我们看下具体collect()做了什么。

class MetricWrapperBase(object):
    ...
    def _get_metric(self):
        return Metric(self._name, self._documentation, self._type, self._unit)
        
    
    def collect(self):
        metric = self._get_metric()
        for suffix, labels, value in self._samples():
            metric.add_sample(self._name + suffix, labels, value)
        return [metric]
    ...

collect()主要做了什么事呢?就是获取到Metric对象(命名为metric),然后将samples加入到metric中,然后再将metric返回.
这里我们又会遇到以下几个问题:

  • Metric究竟是个啥?
  • self._samples是个啥?
  • add_sample干了啥?

Metric

为了回答上边的问题,我们先来看下Metric的源码:

# prometheus_client/metrics_core.py

class Metric(object):
    """A single metric family and its samples.
    This is intended only for internal use by the instrumentation client.
    Custom collectors should use GaugeMetricFamily, CounterMetricFamily
    and SummaryMetricFamily instead.
    """

    def __init__(self, name, documentation, typ, unit=''):
        if unit and not name.endswith("_" + unit):
            name += "_" + unit
        if not METRIC_NAME_RE.match(name):
            raise ValueError('Invalid metric name: ' + name)
        self.name = name
        self.documentation = documentation
        self.unit = unit
        if typ == 'untyped':
            typ = 'unknown'
        if typ not in METRIC_TYPES:
            raise ValueError('Invalid metric type: ' + typ)
        self.type = typ  # 标明是什么类型的Metric,比如gauge, 还是counter
        self.samples = [] # 注意这里samples是一个list

    def add_sample(self, name, labels, value, timestamp=None, exemplar=None):
        """Add a sample to the metric.
        Internal-only, do not use."""
        self.samples.append(Sample(name, labels, value, timestamp, exemplar))
        ...

从这段代码可以看出Metric维护了一个成员变量samples, 当调用Metric对象的方法add_sample()时,会初始化一个Sample对象,并将该对象加入到samples list当中。而Sample是一个namedtuple,具体如下。

Sample

Sample = namedtuple('Sample', ['name', 'labels', 'value', 'timestamp', 'exemplar'])
Sample.__new__.__defaults__ = (None, None) # 设置最右两个字段的默认值,即设置timestamp和exemplar的默认值为None

Exemplar = namedtuple('Exemplar', ['labels', 'value', 'timestamp'])
Exemplar.__new__.__defaults__ = (None,)

从这部分源码我们可以看出Sample本质上是一个namedtuple。需要注意的这里有个较为特别的语法__new__.__defaults__,这个语法用于为namedtuple设置默认值。

labels

之前还有一个问题就是self._samples是个啥?
看如下代码,会发现_samplesMetricWrapperBase的一个method。


class MetricWrapperBase(object):
    ...
    
    def _samples(self):
        if self._is_parent():
            return self._multi_samples()
        else:
            return self._child_samples()

    def _multi_samples(self):
        with self._lock:
            metrics = self._metrics.copy()
        for labels, metric in metrics.items():
            # 这里labels实际上是lablevalues tuple
            # series_labels大致是这样的:[('method', 'post'), ('path', '/submit')]
            series_labels = list(zip(self._labelnames, labels))
            
            # 这里的metric是child metric,所以_samples()调用的是_child_samples(), 也就是返回实际metric记录的数字
            for suffix, sample_labels, value in metric._samples():
                # 最终返回的结果大致是如下样子:
                # ('total', {'method': 'post', 'path': '/submit'}, 5)
                yield (suffix, dict(series_labels + list(sample_labels.items())), value)

    def _child_samples(self):  # pragma: no cover
        raise NotImplementedError('_child_samples() must be implemented by %r' % self)
    
    ...

刚开始看这段代码有点懵逼,为啥还有pareent, child,到底是什么意思呢?
后来经过仔细研读代码和分析,发现是由于metric的存储结构导致的。

我们以Counter为例,当我们的metric没有label的时候,那么存储时候只需要返回当前的数据即可,比如:

{"_total": 5, "_created": 1619692360.740}

但是当我们的metric有lable的时候,就需要分层存储了。先来看下我们是怎么使用Counter的

c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint'])

注意这里初始化完成之后,对象c只有label name,没有label value,这时候就会被认为是parent,这时_metrics会被初始化为一个dict

...
        if self._is_parent():
            # Prepare the fields needed for child metrics.
            self._lock = Lock()
            self._metrics = {}
...

然后在使用lables方法的时候,实际会返回一个全新的Collector对象

c.labels('get', '/').inc()
c.labels('post', '/submit').inc()

关键看这个labels方法的代码:

class MetricWrapperBase(object):
...

    def labels(self, *labelvalues, **labelkwargs)
        ...
        with self._lock:
            if labelvalues not in self._metrics:
                # 注意这里以labelvalues这个tuple作为key,以新生成的Collector作为value
                self._metrics[labelvalues] = self.__class__(
                    self._name,
                    documentation=self._documentation,
                    labelnames=self._labelnames,
                    unit=self._unit,
                    labelvalues=labelvalues,
                    **self._kwargs
                )
            return self._metrics[labelvalues]
        ...
...

关键点就在于使用label value的tuple做为key,然后生成了一个新的Collector对象作为value,存储在了_metric字典当中,需要注意的是,这个新的Collector对象,它的labelvalues不再是None,而是有实际的值。所以这时,这个新的Collector就是child。

至此,我们已经基本清楚了,Collector究竟是如何记录数据的,而上层调用collect()方法时,又是如何将数据收集和整理出来的。

最后上个图也许更加清晰


Screen Shot 2021-04-30 at 2.55.29 PM.png

References

?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351

推荐阅读更多精彩内容