侧边栏壁纸
  • 累计撰写 28 篇文章
  • 累计创建 23 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

node_exporter采集原理和二开自定义模块

zhanjie.me
2024-09-03 / 0 评论 / 0 点赞 / 9 阅读 / 0 字
  • node_exporter主流程源码追踪
  • mem模块采集的流程
  • 自定义一个模块的二开方法
  • 自定义一个errLog模块,统计 /var/log/messages 中的错误日志

node_exporter主流程源码追踪

采集器的初始化

  • 初始化handler
  • 源码位置 node_exporter-release-1.2/node_exporter.go
http.Handle(*metricsPath, newHandler(!*disableExporterMetrics, *maxRequests, logger))
  • 调用 newHandler,其中最关键一句是 innerHandler
    if innerHandler, err := h.innerHandler(); err != nil {
        panic(fmt.Sprintf("Couldn't create metrics handler: %s", err))
  • 调用 innerHandler ,其中干这么几件事
    • 根据过滤器初始化node_collector nc
    • 把 nc注册到 prometheus的 registry 上r.Register(nc)
  • 源码如下
// innerHandler builds the http.Handler that serves metrics for the given
// collector filters. It creates a NodeCollector restricted to filters,
// registers it (together with the node_exporter version collector) on a fresh
// registry, and exposes that registry alongside h.exporterMetricsRegistry.
// When h.includeExporterMetrics is set, the handler is additionally wrapped
// with promhttp instrumentation.
func (h *handler) innerHandler(filters ...string) (http.Handler, error) {
    nc, err := collector.NewNodeCollector(h.logger, filters...)
    if err != nil {
        return nil, fmt.Errorf("couldn't create collector: %s", err)
    }

    // The unfiltered handler should be created only once, at startup, so
    // that is the only time the enabled collectors are listed in the log.
    if len(filters) == 0 {
        level.Info(h.logger).Log("msg", "Enabled collectors")
        names := make([]string, 0, len(nc.Collectors))
        for name := range nc.Collectors {
            names = append(names, name)
        }
        sort.Strings(names)
        for _, name := range names {
            level.Info(h.logger).Log("collector", name)
        }
    }

    registry := prometheus.NewRegistry()
    registry.MustRegister(version.NewCollector("node_exporter"))
    if err := registry.Register(nc); err != nil {
        return nil, fmt.Errorf("couldn't register node collector: %s", err)
    }

    gatherers := prometheus.Gatherers{h.exporterMetricsRegistry, registry}
    opts := promhttp.HandlerOpts{
        ErrorLog:            stdlog.New(log.NewStdlibAdapter(level.Error(h.logger)), "", 0),
        ErrorHandling:       promhttp.ContinueOnError,
        MaxRequestsInFlight: h.maxRequests,
        Registry:            h.exporterMetricsRegistry,
    }
    metricsHandler := promhttp.HandlerFor(gatherers, opts)
    if !h.includeExporterMetrics {
        return metricsHandler, nil
    }

    // h.exporterMetricsRegistry is used here as well so that every
    // exposition shares the same promhttp metrics.
    return promhttp.InstrumentMetricHandler(h.exporterMetricsRegistry, metricsHandler), nil
}

NewNodeCollector 初始化nc

  • 源码位置 node_exporter-release-1.2/collector/collector.go
    • 根据 各个模块注册的 collectorState获取他们的执行函数 collector
// NewNodeCollector creates a new NodeCollector.
//
// Every name in filters must be a registered and enabled collector; otherwise
// an error is returned. With no filters, all enabled collectors are included.
// Collectors already built by an earlier call are reused from
// initiatedCollectors; the rest are built through their registered factory.
func NewNodeCollector(logger log.Logger, filters ...string) (*NodeCollector, error) {
    wanted := make(map[string]bool)
    for _, name := range filters {
        enabled, known := collectorState[name]
        switch {
        case !known:
            return nil, fmt.Errorf("missing collector: %s", name)
        case !*enabled:
            return nil, fmt.Errorf("disabled collector: %s", name)
        }
        wanted[name] = true
    }

    selected := make(map[string]Collector)
    initiatedCollectorsMtx.Lock()
    defer initiatedCollectorsMtx.Unlock()
    for name, enabled := range collectorState {
        if !*enabled {
            continue
        }
        if len(wanted) > 0 && !wanted[name] {
            continue
        }

        c, cached := initiatedCollectors[name]
        if !cached {
            var err error
            c, err = factories[name](log.With(logger, "collector", name))
            if err != nil {
                return nil, err
            }
            initiatedCollectors[name] = c
        }
        selected[name] = c
    }
    return &NodeCollector{Collectors: selected, logger: logger}, nil
}
  • 各个模块会在各自的 init 函数中调用 registerCollector ,向 collectorState 和 factories 注册自己
// registerCollector records a collector under the given name: it defines the
// boolean command-line flag "collector.<name>" (defaulting to
// isDefaultEnabled), stores the flag value in collectorState, and stores the
// factory in factories.
func registerCollector(collector string, isDefaultEnabled bool, factory func(logger log.Logger) (Collector, error)) {
    helpDefaultState := "disabled"
    if isDefaultEnabled {
        helpDefaultState = "enabled"
    }

    name := fmt.Sprintf("collector.%s", collector)
    help := fmt.Sprintf("Enable the %s collector (default: %s).", collector, helpDefaultState)
    def := fmt.Sprintf("%v", isDefaultEnabled)

    collectorState[collector] = kingpin.Flag(name, help).
        Default(def).
        Action(collectorFlagAction(collector)).
        Bool()
    factories[collector] = factory
}

image-ftlxytre.png

执行采集

  • prometheus sdk中执行采集就是执行对应的 Collect方法
  • 源码位置 node_exporter-release-1.2/collector/collector.go
// Collect implements the prometheus.Collector interface.
//
// Each collector in n.Collectors is executed in its own goroutine; Collect
// returns once all of them have finished.
func (n NodeCollector) Collect(ch chan<- prometheus.Metric) {
    var pending sync.WaitGroup
    for collectorName, coll := range n.Collectors {
        pending.Add(1)
        go func(name string, c Collector) {
            execute(name, c, ch, n.logger)
            pending.Done()
        }(collectorName, coll)
    }
    pending.Wait()
}
  • 调用 execute 函数,可以看到就是调用各个 collector 模块的 Update 方法
    image-slxfcwvs.png

mem采集模块的内容

  • Update源码位置 node_exporter-release-1.2/collector/meminfo.go
  • 源码如下
// Update calls (*meminfoCollector).getMemInfo to get the platform specific
// memory metrics and sends one constant metric per field to ch. Fields whose
// name ends in "_total" are exported as counters, all others as gauges.
func (c *meminfoCollector) Update(ch chan<- prometheus.Metric) error {
    memInfo, err := c.getMemInfo()
    if err != nil {
        return fmt.Errorf("couldn't get meminfo: %w", err)
    }
    level.Debug(c.logger).Log("msg", "Set node_mem", "memInfo", memInfo)

    for field, value := range memInfo {
        valueType := prometheus.GaugeValue
        if strings.HasSuffix(field, "_total") {
            valueType = prometheus.CounterValue
        }

        desc := prometheus.NewDesc(
            prometheus.BuildFQName(namespace, memInfoSubsystem, field),
            fmt.Sprintf("Memory information field %s.", field),
            nil, nil,
        )
        ch <- prometheus.MustNewConstMetric(desc, valueType, value)
    }
    return nil
}
  • 内容分析
    • 通过c.getMemInfo() 获取到memInfo
    • 在linux 中memInfo中对应的就是 /proc/meminfo ,逐行解析
    • 遍历推送即可

自定义一个模块的二开方法

  • 在 collector/ 目录下新建一个 errlog.go

  • 定义一个结构体 errLogCollector

    // errLogCollector is the collector type of the custom errlog module.
    // Its only field is the logger handed to it by the factory function.
    type errLogCollector struct {
      logger log.Logger
    }
    
  • 写一个 NewXxxCollector 的工厂函数,唯一的参数为 log.Logger

    // NewErrLogCollector returns a new errLogCollector that logs through
    // logger. It never returns a non-nil error.
    func NewErrLogCollector(logger log.Logger) (Collector, error) {
      c := &errLogCollector{logger: logger}
      return c, nil
    }
    
  • 写一个 init方法调用 registerCollector 注册自己

// errLogSubsystem is the name under which this collector is registered.
const (
    errLogSubsystem = "errlog"
)
// init registers the errlog collector and its factory, NewErrLogCollector,
// under errLogSubsystem. defaultEnabled is declared elsewhere in the package
// (presumably it enables the collector by default — confirm there).
func init() {
    registerCollector(errLogSubsystem, defaultEnabled, NewErrLogCollector)
}
  • 给这个结构体绑定一个Update方法,签名如下
// Signature template for a collector's Update method: xxCollector is a
// placeholder type name and the body is intentionally left empty here.
func (c *xxCollector) Update(ch chan<- prometheus.Metric) error {}

完成这个Update方法

流程说明

  • 执行awk可以得到一个日志文件中错误日志按app name进行分布的结果
grep -i error /var/log/messages-20210814 |awk '{a[$5]++}END{for(i in a) print i,a[i]}'
telegraf: 3872
pushgateway: 2
kubelet: 16822
containerd: 9350
kernel: 5
grafana-server: 10

新增一个执行shell命令的函数

// errLogGrep runs a grep|awk pipeline over /var/log/messages and returns its
// standard output: for every distinct value of awk field $5 among the lines
// matching "error" (case-insensitive), one "<field5> <count>" line.
//
// An empty string is returned when the shell pipeline cannot be run or exits
// with a failure status.
func errLogGrep() string {
    const errLogCmd = `grep -i error /var/log/messages |awk '{a[$5]++}END{for(i in a) print i,a[i]}'`

    // Output (not CombinedOutput) is used so that diagnostics written to
    // stderr, such as grep complaining about a missing file, are never mixed
    // into the text that the caller parses as metric data.
    output, err := exec.Command("sh", "-c", errLogCmd).Output()
    if err != nil {
        // The signature has no error result; an empty report simply yields
        // no samples, which is safer than parsing a partial one.
        return ""
    }
    return string(output)
}

然后在Update中按行遍历

  • 按行遍历之后再按 :分割就能得到 appname 和value

  • 然后将name中的 - 替换为_

  • value 字符串转换为int

  • 然后构建一个 metric对象塞入ch中即可

    // Update runs errLogGrep and sends one gauge per reported application to
    // ch. Each usable report line has the form "<app>: <count>"; lines that
    // do not split into exactly two parts on ":" are ignored, and "-" in the
    // application name is replaced by "_".
    //
    // Lines whose count is not an integer, or whose name does not yield a
    // valid metric, are logged at debug level and skipped. Update always
    // returns nil.
    func (c *errLogCollector) Update(ch chan<- prometheus.Metric) error {
        for _, line := range strings.Split(errLogGrep(), "\n") {
            fields := strings.Split(line, ":")
            if len(fields) != 2 {
                continue
            }
            name := strings.ReplaceAll(strings.TrimSpace(fields[0]), "-", "_")
            value := strings.TrimSpace(fields[1])

            count, err := strconv.Atoi(value)
            if err != nil {
                // Do not export a bogus 0 for a count that failed to parse.
                level.Debug(c.logger).Log("msg", "Skip errLog line with invalid count", "name", name, "value", value, "err", err)
                continue
            }
            level.Debug(c.logger).Log("msg", "Set errLog", "name", name, "value", value)

            // NewConstMetric is used instead of MustNewConstMetric so that a
            // name which is not a valid metric name (for example "sshd[123]")
            // is skipped rather than panicking inside the scrape.
            metric, err := prometheus.NewConstMetric(
                prometheus.NewDesc(
                    prometheus.BuildFQName(namespace, errLogSubsystem, name),
                    fmt.Sprintf("/var/log/message err log %s.", name),
                    nil, nil,
                ),
                prometheus.GaugeValue, float64(count),
            )
            if err != nil {
                level.Debug(c.logger).Log("msg", "Skip errLog metric", "name", name, "err", err)
                continue
            }
            ch <- metric
        }
        return nil
    }
    

运行程序

  • 打包

  • 编译 go build -v node_exporter.go

  • 然后运行 ./node_exporter --web.listen-address=":9101"

  • 查询errlog metrics

    [root@prome-master01 ~]# curl -s localhost:9101/metrics |grep node_errlog
    # HELP node_errlog_containerd /var/log/message err log containerd.
    # TYPE node_errlog_containerd gauge
    node_errlog_containerd 9350
    # HELP node_errlog_grafana_server /var/log/message err log grafana_server.
    # TYPE node_errlog_grafana_server gauge
    node_errlog_grafana_server 10
    # HELP node_errlog_kernel /var/log/message err log kernel.
    # TYPE node_errlog_kernel gauge
    node_errlog_kernel 5
    # HELP node_errlog_kubelet /var/log/message err log kubelet.
    # TYPE node_errlog_kubelet gauge
    node_errlog_kubelet 16822
    # HELP node_errlog_pushgateway /var/log/message err log pushgateway.
    # TYPE node_errlog_pushgateway gauge
    node_errlog_pushgateway 2
    # HELP node_errlog_telegraf /var/log/message err log telegraf.
    # TYPE node_errlog_telegraf gauge
    node_errlog_telegraf 3872
    

完整的errlog.go

package collector

import (
    "fmt"
    "os/exec"
    "strconv"
    "strings"

    "github.com/go-kit/log"
    "github.com/go-kit/log/level"
    "github.com/prometheus/client_golang/prometheus"
)

// errLogSubsystem is the subsystem name of this collector: it is the name
// passed to registerCollector and the subsystem part of every metric name
// built in Update.
const (
    errLogSubsystem = "errlog"
)

// errLogCollector is the collector type of the errlog module. It carries only
// the logger used for debug output in Update.
type errLogCollector struct {
    logger log.Logger
}

// init registers the errlog collector and its factory, NewErrLogCollector,
// under errLogSubsystem. defaultEnabled is declared elsewhere in the package
// (presumably it enables the collector by default — confirm there).
func init() {
    registerCollector(errLogSubsystem, defaultEnabled, NewErrLogCollector)
}

// NewErrLogCollector returns a new Collector for the errlog module that logs
// through logger. It never returns a non-nil error.
func NewErrLogCollector(logger log.Logger) (Collector, error) {
    return &errLogCollector{logger: logger}, nil
}

// errLogGrep runs a grep|awk pipeline over /var/log/messages and returns its
// standard output: for every distinct value of awk field $5 among the lines
// matching "error" (case-insensitive), one "<field5> <count>" line.
//
// An empty string is returned when the shell pipeline cannot be run or exits
// with a failure status.
func errLogGrep() string {
    const errLogCmd = `grep -i error /var/log/messages |awk '{a[$5]++}END{for(i in a) print i,a[i]}'`

    // Output (not CombinedOutput) is used so that diagnostics written to
    // stderr, such as grep complaining about a missing file, are never mixed
    // into the text that the caller parses as metric data.
    output, err := exec.Command("sh", "-c", errLogCmd).Output()
    if err != nil {
        // The signature has no error result; an empty report simply yields
        // no samples, which is safer than parsing a partial one.
        return ""
    }
    return string(output)
}

// Update runs errLogGrep and sends one gauge per reported application to ch.
// Each usable report line has the form "<app>: <count>"; lines that do not
// split into exactly two parts on ":" are ignored, and "-" in the application
// name is replaced by "_".
//
// Lines whose count is not an integer, or whose name does not yield a valid
// metric, are logged at debug level and skipped. Update always returns nil.
func (c *errLogCollector) Update(ch chan<- prometheus.Metric) error {
    for _, line := range strings.Split(errLogGrep(), "\n") {
        fields := strings.Split(line, ":")
        if len(fields) != 2 {
            continue
        }
        name := strings.ReplaceAll(strings.TrimSpace(fields[0]), "-", "_")
        value := strings.TrimSpace(fields[1])

        count, err := strconv.Atoi(value)
        if err != nil {
            // Do not export a bogus 0 for a count that failed to parse.
            level.Debug(c.logger).Log("msg", "Skip errLog line with invalid count", "name", name, "value", value, "err", err)
            continue
        }
        level.Debug(c.logger).Log("msg", "Set errLog", "name", name, "value", value)

        // NewConstMetric is used instead of MustNewConstMetric so that a name
        // which is not a valid metric name (for example "sshd[123]") is
        // skipped rather than panicking inside the scrape.
        metric, err := prometheus.NewConstMetric(
            prometheus.NewDesc(
                prometheus.BuildFQName(namespace, errLogSubsystem, name),
                fmt.Sprintf("/var/log/message err log %s.", name),
                nil, nil,
            ),
            prometheus.GaugeValue, float64(count),
        )
        if err != nil {
            level.Debug(c.logger).Log("msg", "Skip errLog metric", "name", name, "err", err)
            continue
        }
        ch <- metric
    }
    return nil
}
0

评论区