背景
Prometheus是最近流行的监控报警系统,具体大家可以搜网上的文章来了解,而由于我司目前的应用使用了Django框架来做为后端应用,因此需要研究如何将Prometheus与Django结合在一起使用,因此有了接下来的源码研究。
在分析源代码之前,先要知道为什么需要分析源代码,对于我来说,有几个问题是我想要搞明白的:
- django-prometheus是如何注册/metrics uri并通过接口提供服务的?
- django-prometheus到底是怎样将数据从不同的接口收集上来的?
- django-prometheus收集上来Metrics后是否需要存储,如果需要,那么存储在什么地方了?
而在搞清楚这些问题的时候,发现django-prometheus又调用了prometheus_client,又不可避免的有了针对prometheus_client的问题,所以又不得不去看prometheus_client的源码,也因此有了本文。
第一篇我们已经基本回答了第一个问题,即django-prometheus
究竟是如何通过/metrics
提供接口服务的。这一篇我们就接着探寻其它问题的答案。
源码分析
Collector
首先,我们需要知道Collector在应用程序中具体是如何采集数据的,先看几个例子:
from prometheus_client import Counter, Gauge, Histogram
c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint']) # 此为parent对象
c.labels('get', '/').inc() # 注意labels的使用
c.labels('post', '/submit').inc()
g = Gauge('gg', 'A gauge')
h = Histogram('hh', 'A histogram', buckets=(-5, 0, 5))
s = Summary('ss', 'A summary', ['label1', 'label2']) # metrics名字, metrics说明, metrics支持的label
# Gauge 有三种方法可以用来改变其记录值
g.inc() # 加1操作
g.set(5) # 设定记录值为5
g.dec(2) # 减2操作
# Histogram 使用observe()方法来记录值
h.observe(5)
我们以Counter
的inc()
方法为例,看下它是如何记录数值的。
class Counter(MetricWrapperBase):
....
def _metric_init(self):
self._value = values.ValueClass(self._type, self._name, self._name + '_total', self._labelnames,
self._labelvalues)
self._created = time.time()
def inc(self, amount=1):
"""Increment counter by the given amount."""
if amount < 0:
raise ValueError('Counters can only be incremented by non-negative amounts.')
self._value.inc(amount) # 这里的self._value是在_metric_init中定义
...
- 我们在使用
Counter
对象的inc()
方法时本质上是调用了ValueClass
的inc()
方法 -
self._value
是在_metric_init()
方法中初始化的,而_metric_init()
是在Collector初始化的时候被调用的。 -
_metric_init()
是在每个Collector具体实现的类中必须要实现的方法,这个方法会被__init__()
初始化方法所调用。
而这里ValueClass
具体又是什么呢?
# prometheus_client/values.py
class MutexValue(object):
"""A float protected by a mutex."""
_multiprocess = False
def __init__(self, typ, metric_name, name, labelnames, labelvalues, **kwargs):
self._value = 0.0 # 定义了一个浮点数
self._lock = Lock() # 初始化一个线程锁,用于保证线程安全
def inc(self, amount): # 真正的inc操作实现
with self._lock:
self._value += amount
def set(self, value):
with self._lock:
self._value = value
def get(self):
with self._lock:
return self._value
...
def get_value_class():
# Should we enable multi-process mode?
# This needs to be chosen before the first metric is constructed,
# and as that may be in some arbitrary library the user/admin has
# no control over we use an environment variable.
if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
return MultiProcessValue()
else:
return MutexValue # 重点关注这里,返回的是MutexValue类
ValueClass = get_value_class() # 在这里定义ValueClass
- 不考虑多进程的情况,那么
ValueClass
实际就是MutexValue
- 之所以使用
MutexValue
类,而不是直接使用原生的float,是由于增加了一个线程锁作为信号量,保证数值的更改是线程安全的。 - 至此,我们知道所有的数据本质上都是在内存中的,并没有做持久化,理论上当我们调用
collect()
去收集metrics的时候也是从内存中获取的(即从存于内存的对象中获?。?/li>
那接下来就让我们看下具体collect()
做了什么。
class MetricWrapperBase(object):
...
def _get_metric(self):
return Metric(self._name, self._documentation, self._type, self._unit)
def collect(self):
metric = self._get_metric()
for suffix, labels, value in self._samples():
metric.add_sample(self._name + suffix, labels, value)
return [metric]
...
collect()
主要做了什么事呢?就是获取到Metric
对象(命名为metric
),然后将samples加入到metric
中,然后再将metric
返回.
这里我们又会遇到以下几个问题:
- Metric究竟是个啥?
-
self._samples
是个啥? -
add_sample
干了啥?
Metric
为了回答上边的问题,我们先来看下Metric
的源码:
# prometheus_client/metrics_core.py
class Metric(object):
"""A single metric family and its samples.
This is intended only for internal use by the instrumentation client.
Custom collectors should use GaugeMetricFamily, CounterMetricFamily
and SummaryMetricFamily instead.
"""
def __init__(self, name, documentation, typ, unit=''):
if unit and not name.endswith("_" + unit):
name += "_" + unit
if not METRIC_NAME_RE.match(name):
raise ValueError('Invalid metric name: ' + name)
self.name = name
self.documentation = documentation
self.unit = unit
if typ == 'untyped':
typ = 'unknown'
if typ not in METRIC_TYPES:
raise ValueError('Invalid metric type: ' + typ)
self.type = typ # 标明是什么类型的Metric,比如gauge, 还是counter
self.samples = [] # 注意这里samples是一个list
def add_sample(self, name, labels, value, timestamp=None, exemplar=None):
"""Add a sample to the metric.
Internal-only, do not use."""
self.samples.append(Sample(name, labels, value, timestamp, exemplar))
...
从这段代码可以看出Metric
维护了一个成员变量samples
, 当调用Metric
对象的方法add_sample()
时,会初始化一个Sample
对象,并将该对象加入到samples
list当中。而Sample
是一个namedtuple,具体如下。
Sample
Sample = namedtuple('Sample', ['name', 'labels', 'value', 'timestamp', 'exemplar'])
Sample.__new__.__defaults__ = (None, None) # 设置最右两个字段的默认值,即设置timestamp和exemplar的默认值为None
Exemplar = namedtuple('Exemplar', ['labels', 'value', 'timestamp'])
Exemplar.__new__.__defaults__ = (None,)
从这部分源码我们可以看出Sample本质上是一个namedtuple。需要注意的这里有个较为特别的语法__new__.__defaults__
,这个语法用于为namedtuple设置默认值。
labels
之前还有一个问题就是self._samples
是个啥?
看如下代码,会发现_samples
是MetricWrapperBase
的一个method。
class MetricWrapperBase(object):
...
def _samples(self):
if self._is_parent():
return self._multi_samples()
else:
return self._child_samples()
def _multi_samples(self):
with self._lock:
metrics = self._metrics.copy()
for labels, metric in metrics.items():
# 这里labels实际上是lablevalues tuple
# series_labels大致是这样的:[('method', 'post'), ('path', '/submit')]
series_labels = list(zip(self._labelnames, labels))
# 这里的metric是child metric,所以_samples()调用的是_child_samples(), 也就是返回实际metric记录的数字
for suffix, sample_labels, value in metric._samples():
# 最终返回的结果大致是如下样子:
# ('total', {'method': 'post', 'path': '/submit'}, 5)
yield (suffix, dict(series_labels + list(sample_labels.items())), value)
def _child_samples(self): # pragma: no cover
raise NotImplementedError('_child_samples() must be implemented by %r' % self)
...
刚开始看这段代码有点懵逼,为啥还有pareent
, child
,到底是什么意思呢?
后来经过仔细研读代码和分析,发现是由于metric的存储结构导致的。
我们以Counter
为例,当我们的metric没有label的时候,那么存储时候只需要返回当前的数据即可,比如:
{"_total": 5, "_created": 1619692360.740}
但是当我们的metric有lable的时候,就需要分层存储了。先来看下我们是怎么使用Counter的
c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint'])
注意这里初始化完成之后,对象c
只有label name,没有label value,这时候就会被认为是parent
,这时_metrics
会被初始化为一个dict
...
if self._is_parent():
# Prepare the fields needed for child metrics.
self._lock = Lock()
self._metrics = {}
...
然后在使用lables
方法的时候,实际会返回一个全新的Collector对象
c.labels('get', '/').inc()
c.labels('post', '/submit').inc()
关键看这个labels方法的代码:
class MetricWrapperBase(object):
...
def labels(self, *labelvalues, **labelkwargs)
...
with self._lock:
if labelvalues not in self._metrics:
# 注意这里以labelvalues这个tuple作为key,以新生成的Collector作为value
self._metrics[labelvalues] = self.__class__(
self._name,
documentation=self._documentation,
labelnames=self._labelnames,
unit=self._unit,
labelvalues=labelvalues,
**self._kwargs
)
return self._metrics[labelvalues]
...
...
关键点就在于使用label value的tuple做为key,然后生成了一个新的Collector对象作为value,存储在了_metric
字典当中,需要注意的是,这个新的Collector对象,它的labelvalues不再是None,而是有实际的值。所以这时,这个新的Collector就是child。
至此,我们已经基本清楚了,Collector究竟是如何记录数据的,而上层调用collect()
方法时,又是如何将数据收集和整理出来的。
最后上个图也许更加清晰