- node_exporter主流程源码追踪
- mem模块采集的流程
- 自定义一个模块的二开方法
- 自定义一个 errLog 模块,统计 /var/log/messages 中的错误日志
node_exporter主流程源码追踪
采集器的初始化
- 初始化handler
- 源码位置
node_exporter-release-1.2/node_exporter.go
http.Handle(*metricsPath, newHandler(!*disableExporterMetrics, *maxRequests, logger))
- 调用 newHandler,其中最关键一句是 innerHandler
if innerHandler, err := h.innerHandler(); err != nil {
panic(fmt.Sprintf("Couldn't create metrics handler: %s", err))
- 调用 innerHandler,它主要做了以下几件事
- 根据过滤器初始化node_collector nc
- 把 nc注册到 prometheus的 registry 上
r.Register(nc)
- 源码如下
// innerHandler builds the HTTP handler that serves the metrics endpoint for
// the given collector filters (no filters means every enabled collector).
//
// It creates a NodeCollector, registers it together with the version
// collector on a fresh registry, and serves that registry merged with
// h.exporterMetricsRegistry. When h.includeExporterMetrics is set, the handler
// is additionally wrapped with promhttp's request instrumentation.
//
// Errors are wrapped with %w so callers can still inspect the underlying
// cause with errors.Is / errors.As.
func (h *handler) innerHandler(filters ...string) (http.Handler, error) {
	nc, err := collector.NewNodeCollector(h.logger, filters...)
	if err != nil {
		return nil, fmt.Errorf("couldn't create collector: %w", err)
	}

	// Only log the creation of an unfiltered handler, which should happen
	// only once upon startup.
	if len(filters) == 0 {
		level.Info(h.logger).Log("msg", "Enabled collectors")
		collectors := make([]string, 0, len(nc.Collectors))
		for n := range nc.Collectors {
			collectors = append(collectors, n)
		}
		sort.Strings(collectors)
		for _, c := range collectors {
			level.Info(h.logger).Log("collector", c)
		}
	}

	r := prometheus.NewRegistry()
	r.MustRegister(version.NewCollector("node_exporter"))
	if err := r.Register(nc); err != nil {
		return nil, fmt.Errorf("couldn't register node collector: %w", err)
	}
	handler := promhttp.HandlerFor(
		prometheus.Gatherers{h.exporterMetricsRegistry, r},
		promhttp.HandlerOpts{
			ErrorLog:            stdlog.New(log.NewStdlibAdapter(level.Error(h.logger)), "", 0),
			ErrorHandling:       promhttp.ContinueOnError,
			MaxRequestsInFlight: h.maxRequests,
			Registry:            h.exporterMetricsRegistry,
		},
	)
	if h.includeExporterMetrics {
		// Note that we have to use h.exporterMetricsRegistry here to
		// use the same promhttp metrics for all expositions.
		handler = promhttp.InstrumentMetricHandler(
			h.exporterMetricsRegistry, handler,
		)
	}
	return handler, nil
}
NewNodeCollector 初始化nc
- 源码位置
node_exporter-release-1.2/collector/collector.go
- 根据各个模块注册的 collectorState(是否启用)和 factories(构造函数)创建对应的 collector;已经创建过的 collector 会缓存在 initiatedCollectors 中复用
// NewNodeCollector creates a new NodeCollector.
//
// When filters is non-empty, only the named collectors are included; every
// name must be registered in collectorState and enabled, otherwise an error
// is returned. With no filters, all enabled collectors are included.
//
// Collector instances are built through factories on first use and cached in
// initiatedCollectors (guarded by initiatedCollectorsMtx), so repeated calls
// reuse the same instances.
func NewNodeCollector(logger log.Logger, filters ...string) (*NodeCollector, error) {
	f := make(map[string]bool, len(filters))
	for _, filter := range filters {
		enabled, exist := collectorState[filter]
		if !exist {
			return nil, fmt.Errorf("missing collector: %s", filter)
		}
		if !*enabled {
			return nil, fmt.Errorf("disabled collector: %s", filter)
		}
		f[filter] = true
	}

	collectors := make(map[string]Collector)
	initiatedCollectorsMtx.Lock()
	defer initiatedCollectorsMtx.Unlock()
	for key, enabled := range collectorState {
		if !*enabled || (len(f) > 0 && !f[key]) {
			continue
		}
		if collector, ok := initiatedCollectors[key]; ok {
			collectors[key] = collector
			continue
		}
		collector, err := factories[key](log.With(logger, "collector", key))
		if err != nil {
			// The factory's own error may not say which collector failed;
			// add the key and keep the cause reachable via %w.
			return nil, fmt.Errorf("creating %s collector: %w", key, err)
		}
		collectors[key] = collector
		initiatedCollectors[key] = collector
	}
	return &NodeCollector{Collectors: collectors, logger: logger}, nil
}
- 各个模块会在各自的 init 函数中调用 registerCollector,向 collectorState 和 factories 注册自己
// registerCollector makes a collector known to the exporter. It defines a
// boolean command-line flag "collector.<name>" whose default is
// isDefaultEnabled, stores that flag in collectorState and the constructor in
// factories. Collector modules call it from their init functions.
func registerCollector(collector string, isDefaultEnabled bool, factory func(logger log.Logger) (Collector, error)) {
	helpDefaultState := "disabled"
	if isDefaultEnabled {
		helpDefaultState = "enabled"
	}

	flag := kingpin.Flag(
		fmt.Sprintf("collector.%s", collector),
		fmt.Sprintf("Enable the %s collector (default: %s).", collector, helpDefaultState),
	).
		Default(fmt.Sprintf("%v", isDefaultEnabled)).
		Action(collectorFlagAction(collector)).
		Bool()

	collectorState[collector] = flag
	factories[collector] = factory
}

执行采集
- prometheus sdk中执行采集就是执行对应的 Collect方法
- 源码位置
node_exporter-release-1.2/collector/collector.go
// Collect implements the prometheus.Collector interface.
//
// Each collector in n.Collectors runs concurrently in its own goroutine via
// execute, all sending into ch; Collect returns only after every one of them
// has finished.
func (n NodeCollector) Collect(ch chan<- prometheus.Metric) {
	var wg sync.WaitGroup
	for name, c := range n.Collectors {
		wg.Add(1)
		go func(collectorName string, col Collector) {
			defer wg.Done()
			execute(collectorName, col, ch, n.logger)
		}(name, c)
	}
	wg.Wait()
}
- Collect 为每个 collector 启动一个 goroutine 调用 execute 函数,execute 内部调用的就是各个 collector 模块的 Update 方法

mem采集模块的内容
- Update源码位置
node_exporter-release-1.2/collector/meminfo.go
- 源码如下
// Update calls (*meminfoCollector).getMemInfo to get the platform specific
// memory metrics and sends one metric per field to ch. Fields whose name ends
// in "_total" are exported as counters; every other field is a gauge.
func (c *meminfoCollector) Update(ch chan<- prometheus.Metric) error {
	memInfo, err := c.getMemInfo()
	if err != nil {
		return fmt.Errorf("couldn't get meminfo: %w", err)
	}
	level.Debug(c.logger).Log("msg", "Set node_mem", "memInfo", memInfo)

	for field, value := range memInfo {
		valueType := prometheus.GaugeValue
		if strings.HasSuffix(field, "_total") {
			valueType = prometheus.CounterValue
		}
		desc := prometheus.NewDesc(
			prometheus.BuildFQName(namespace, memInfoSubsystem, field),
			fmt.Sprintf("Memory information field %s.", field),
			nil, nil,
		)
		ch <- prometheus.MustNewConstMetric(desc, valueType, value)
	}
	return nil
}
- 内容分析
- 通过c.getMemInfo() 获取到memInfo
- 在 Linux 中,memInfo 的数据来源就是 /proc/meminfo,逐行解析得到
- 遍历推送即可
自定义一个模块的二开方法
-
在 collector/ 目录下新建一个 errlog.go
-
定义一个结构体 errLogCollector
type errLogCollector struct { logger log.Logger } -
写一个 NewXxxCollector 形式的工厂函数,参数为 log.Logger,返回 (Collector, error)
func NewErrLogCollector(logger log.Logger) (Collector, error) { return &errLogCollector{logger}, nil } -
写一个 init 函数,调用 registerCollector 注册自己
// errLogSubsystem is both the collector's registration name and the subsystem
// part of every metric name it exports (node_errlog_<app>).
const errLogSubsystem = "errlog"

// init registers the collector as "errlog" (command-line flag
// collector.errlog), enabled by default, with NewErrLogCollector as factory.
func init() {
	registerCollector(errLogSubsystem, defaultEnabled, NewErrLogCollector)
}
- 给这个结构体绑定一个Update方法,签名如下
func (c *xxCollector) Update(ch chan<- prometheus.Metric) error {}
完成这个Update方法
流程说明
- 用 grep + awk 可以统计出一个日志文件中的错误日志按 app name(syslog 行的第 5 列)的分布结果
grep -i error /var/log/messages-20210814 |awk '{a[$5]++}END{for(i in a) print i,a[i]}'
telegraf: 3872
pushgateway: 2
kubelet: 16822
containerd: 9350
kernel: 5
grafana-server: 10
新增一个执行shell命令的函数
// errLogGrep counts the lines of /var/log/messages that contain "error"
// (case-insensitive), grouped by their 5th whitespace-separated field, and
// returns awk's "<field5> <count>" lines as one string.
//
// Only stdout is captured. With CombinedOutput, diagnostics such as
// "grep: /var/log/messages: No such file or directory" were mixed into the
// text that Update parses as "<app>: <count>" records. The command error is
// deliberately ignored: in a "grep | awk" pipeline sh reports awk's exit
// status, so a missing file or no matches just yields an empty string and
// therefore no metrics.
func errLogGrep() string {
	errLogCmd := `grep -i error /var/log/messages |awk '{a[$5]++}END{for(i in a) print i,a[i]}'`
	output, _ := exec.Command("sh", "-c", errLogCmd).Output()
	return string(output)
}
然后在Update中按行遍历
-
按行遍历之后再按 :分割就能得到 appname 和value
-
然后将name中的 - 替换为_
-
value 字符串转换为int
-
然后构建一个 metric对象塞入ch中即可
func (c *errLogCollector) Update(ch chan<- prometheus.Metric) error { var metricType prometheus.ValueType metricType = prometheus.GaugeValue output := errLogGrep() for _, line := range strings.Split(output, "\n") { l := strings.Split(line, ":") if len(l) != 2 { continue } name := strings.TrimSpace(l[0]) value := strings.TrimSpace(l[1]) v, _ := strconv.Atoi(value) name = strings.Replace(name, "-", "_", -1) level.Debug(c.logger).Log("msg", "Set errLog", "name", name, "value", value) ch <- prometheus.MustNewConstMetric( prometheus.NewDesc( prometheus.BuildFQName(namespace, errLogSubsystem, name), fmt.Sprintf("/var/log/message err log %s.", name), nil, nil, ), metricType, float64(v), ) } return nil }
运行程序
-
打包
-
编译 go build -v node_exporter.go
-
然后运行 ./node_exporter --web.listen-address=":9101"
-
查询errlog metrics
[root@prome-master01 ~]# curl -s localhost:9101/metrics |grep node_errlog
# HELP node_errlog_containerd /var/log/message err log containerd.
# TYPE node_errlog_containerd gauge
node_errlog_containerd 9350
# HELP node_errlog_grafana_server /var/log/message err log grafana_server.
# TYPE node_errlog_grafana_server gauge
node_errlog_grafana_server 10
# HELP node_errlog_kernel /var/log/message err log kernel.
# TYPE node_errlog_kernel gauge
node_errlog_kernel 5
# HELP node_errlog_kubelet /var/log/message err log kubelet.
# TYPE node_errlog_kubelet gauge
node_errlog_kubelet 16822
# HELP node_errlog_pushgateway /var/log/message err log pushgateway.
# TYPE node_errlog_pushgateway gauge
node_errlog_pushgateway 2
# HELP node_errlog_telegraf /var/log/message err log telegraf.
# TYPE node_errlog_telegraf gauge
node_errlog_telegraf 3872
完整的errlog.go
package collector
import (
"fmt"
"os/exec"
"strconv"
"strings"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
)
// errLogSubsystem is both the collector's registration name and the subsystem
// part of every metric name it exports (node_errlog_<app>).
const errLogSubsystem = "errlog"

// errLogCollector exports per-application counts of error lines found in
// /var/log/messages. It only holds the logger passed to its factory.
type errLogCollector struct {
	logger log.Logger
}

// init registers the collector as "errlog" (command-line flag
// collector.errlog), enabled by default, with NewErrLogCollector as factory.
func init() {
	registerCollector(errLogSubsystem, defaultEnabled, NewErrLogCollector)
}
// NewErrLogCollector returns a new Collector exposing per-application counts
// of error lines in /var/log/messages. It is the factory registered in init
// and never returns an error.
func NewErrLogCollector(logger log.Logger) (Collector, error) {
	return &errLogCollector{logger: logger}, nil
}
// errLogGrep counts the lines of /var/log/messages that contain "error"
// (case-insensitive), grouped by their 5th whitespace-separated field, and
// returns awk's "<field5> <count>" lines as one string.
//
// Only stdout is captured. With CombinedOutput, diagnostics such as
// "grep: /var/log/messages: No such file or directory" were mixed into the
// text that Update parses as "<app>: <count>" records. The command error is
// deliberately ignored: in a "grep | awk" pipeline sh reports awk's exit
// status, so a missing file or no matches just yields an empty string and
// therefore no metrics.
func errLogGrep() string {
	errLogCmd := `grep -i error /var/log/messages |awk '{a[$5]++}END{for(i in a) print i,a[i]}'`
	output, _ := exec.Command("sh", "-c", errLogCmd).Output()
	return string(output)
}
// Update runs errLogGrep, parses each "<app>: <count>" line of its output and
// sends one gauge per application, named node_errlog_<app>.
//
// App names come straight from the syslog program field, so they are
// sanitized before being used as a metric-name suffix: a "[pid]" suffix is
// dropped and every character outside [a-zA-Z0-9_] (including '-') becomes
// '_'. Replacing only '-' let names such as "dbus-daemon[812]" through, which
// form an invalid metric name and make MustNewConstMetric panic. Counts whose
// names sanitize to the same suffix are summed so no duplicate series is sent.
// Lines with an empty name or a non-numeric count are skipped rather than
// panicking or being exported as 0.
func (c *errLogCollector) Update(ch chan<- prometheus.Metric) error {
	counts := make(map[string]int)
	for _, line := range strings.Split(errLogGrep(), "\n") {
		parts := strings.Split(line, ":")
		if len(parts) != 2 {
			continue
		}
		name := strings.TrimSpace(parts[0])
		value := strings.TrimSpace(parts[1])

		// "kubelet[1234]" -> "kubelet": one series per program, not per PID.
		if i := strings.IndexByte(name, '['); i > 0 {
			name = name[:i]
		}
		name = strings.Map(func(r rune) rune {
			switch {
			case r == '_', 'a' <= r && r <= 'z', 'A' <= r && r <= 'Z', '0' <= r && r <= '9':
				return r
			default:
				return '_'
			}
		}, name)
		if name == "" {
			// An empty suffix would yield an empty metric name and a panic.
			continue
		}

		v, err := strconv.Atoi(value)
		if err != nil {
			level.Debug(c.logger).Log("msg", "Skipping errLog line", "line", line, "err", err)
			continue
		}
		level.Debug(c.logger).Log("msg", "Set errLog", "name", name, "value", value)
		counts[name] += v
	}

	for name, v := range counts {
		ch <- prometheus.MustNewConstMetric(
			prometheus.NewDesc(
				prometheus.BuildFQName(namespace, errLogSubsystem, name),
				fmt.Sprintf("/var/log/message err log %s.", name),
				nil, nil,
			),
			prometheus.GaugeValue, float64(v),
		)
	}
	return nil
}
评论区