OpenTelemetry Collector(Transform)压力测试方案 🚀

目标:验证在高并发与突发场景下,已优化的 Alloy/otelcol 配置(含 memory_limiter、queued_retry、按需 JSON 解析等)能否防止 OOM、控制队列增长,并评估解析/flatten 的内存热点。


环境准备 🔧

  • 在测试节点部署更新后的 scripts/alloy/1.alloy。确保 Collector 可访问 metrics 与 pprof 接口(例如 http://<collector>:13133/metricshttp://<collector>:13133/debug/pprof/)。
  • 在接收文件路径下(配置里的 include 路径)准备可写的测试文件目录,Collector 有权限写入。
  • 如果在 Kubernetes 中测试,准备一个模拟慢出口服务(示例在下文)。

测试分阶段(逐步加压)📈

下面为每个阶段的**详细执行步骤(可复制运行)**与推荐配置/判定阈值。

1) 基线(Baseline) — 2 分钟

目标:收集空载时基线指标与 heap 快照。 步骤:

  1. 停止任何 log_generator。确保 Collector 运行正常。
  2. 获取 metrics:
1
curl -s http://<collector>:13133/metrics | egrep "process_resident_memory_bytes|process_virtual_memory_bytes|go_memstats_|memory_limiter|queued_retry|dropped"
  1. 抓取 heap profile:
1
curl -s http://<collector>:13133/debug/pprof/heap > /tmp/collector_baseline_heap.pb
  1. 记录 baseline:RSS、heap、当前 queued_retry 队列长度(若有)、dropped 计数。

判定:基线 RSS 应低于 memory_limiter.limit_mib 的 ~60%(便于后续上升空间)。


2) 轻负载(Expected) — 5–10 分钟

目标:以预期生产 QPS 验证稳定性。 步骤:

  1. 以期望 QPS 启动 log_generator,单进程示例(每秒 200 条):
1
python3 scripts/perf/log_generator.py /host/data/k8s/.../info.json 200 &
  1. 并发扩展:在测试节点上并行多进程(例如 4 个)
1
for i in 1 2 3 4; do python3 scripts/perf/log_generator.py /host/data/k8s/.../info.json 200 & done
  1. 运行 5–10 分钟,期间每分钟采集 metrics 与一次 heap 快照。

检查点:

  • queued_retry 队列长度应为 0~低值;
  • dropped 数接近 0;
  • RSS 与 heap 小幅波动,GC 正常触发。

3) 峰值/突发(Spike) — 1–3 分钟

目标:模拟短时 3–5× 峰值突发,观察降级/保护行为。 步骤:

  1. 瞬时提高产生速率(例如单进程改为 1500 QPS 或从 4 个进程变为 20 个)。
1
2
# 单机快速突发示例(每秒 2000 条)
python3 scripts/perf/log_generator.py /host/data/k8s/.../info.json 2000 &

或并行多个进程:

1
for i in {1..20}; do python3 scripts/perf/log_generator.py /host/data/k8s/.../info.json 100 & done
  1. 同时启动慢下游(可选)以放大下游瓶颈影响:
1
python3 scripts/perf/slow_exporter.py &
  1. 持续 1–3 分钟,实时记录 metrics(每 15s):
1
watch -n 15 'curl -s http://<collector>:13133/metrics | egrep "process_resident_memory_bytes|queued_retry|queued_retry_queue_length|memory_limiter|dropped"'

判定:

  • 若 memory_limiter 触发(查看 memory_limiter.* 指标或 Collector 日志),说明保护生效;
  • queued_retry 队列可能短时增大,但应在突发结束后逐步消化;
  • 若出现大量 dropped 并且队列长期饱和或 RSS 接近容器限制 → 需要调优(增 consumers、降低 batch 大小、或更严格早期过滤)。

4) 持续高压(Sustained) — 10–30 分钟

目标:在较长时间窗口内验证系统稳定性与降级表现。 步骤:

  1. 启动持续高负载(例如 500~2000 QPS / 取决于你的环境),在多主机上并发运行 log_generator。
  2. 若想加大压力,配合 slow_exporter 来减慢 downstream 吞吐。
  3. 每 2 分钟抓取一次 heap profile 与 metrics;记录 queued_retry 队列长度与 dropped 数:
1
2
3
4
# 抓 metrics
curl -s http://<collector>:13133/metrics | egrep "process_resident_memory_bytes|memory_limiter|queued_retry_queue_length|queued_retry_.*|dropped"
# 抓 heap(并保存多份用于对比)
curl -s http://<collector>:13133/debug/pprof/heap > /tmp/collector_heap_$(date +%s).pb
  1. 若 memory_limiter 持续触发或 queued_retry 长期满,立即停止扩压并开始诊断(抓更多 pprof,开 debug 日志)。

判定成功条件(Sustained):

  • 系统未被 OOM kill;
  • queued_retry 即使达到高值也能在停止扩压后在合理时间内(几分钟)逐步下降;
  • dropped 在可接受范围(例如 <1% 的突发期间总量,视业务可接受度而定)。

推荐的临时 Collector 配置片段(用于测试期间的快速调整)

  • memory_limiter:
1
2
3
4
5
processors:
  memory_limiter:
    check_interval: 5s
    limit_mib: 1024    # 根据测试节点内存按需调整
    spike_limit_mib: 256
  • queued_retry(logs):
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
processors:
  queued_retry/logs_retry:
    queue:
      enabled: true
      queue_size: 2000
      num_consumers: 4
    retry_on_failure:
      enabled: true
      initial_interval: 1s
      max_interval: 30s
      max_elapsed_time: 300s
  • batch(减少单批内存):
1
2
3
4
processors:
  batch/log_collector_batch:
    send_batch_size: 50
    timeout: 5s

在测试中可以动态调整 queue_size / num_consumers / limit_mib 来观察系统行为并找到合适值。


(以下为原文其它部分,保持不变)


流量生成器(可复制运行)🐍

把下面脚本保存为 scripts/perf/log_generator.py,按需在多台机器或多进程运行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# scripts/perf/log_generator.py
import time, json, random, sys
path = sys.argv[1] if len(sys.argv) > 1 else "/tmp/test.log"
qps = float(sys.argv[2]) if len(sys.argv) > 2 else 100.0
interval = 1.0 / qps
with open(path, "a") as f:
    while True:
        obj = {
            "msg": "ok",
            "traceId": "t"+str(random.randint(1,1000000)),
            "LOGX_TRACE_LOG": True,
            "logger": "app",
            "level": "INFO",
            "date": "now"
        }
        f.write(json.dumps(obj) + "\n")
        f.flush()
        time.sleep(interval)

运行示例:

1
2
# 每秒 500 条
python3 scripts/perf/log_generator.py /host/data/k8s/.../info.json 500

要产生大量“非 trace”日志以验证按需解析:把 LOGX_TRACE_LOG 设为 False


模拟慢下游(Exporter 服务)🐢

把下面脚本保存为 scripts/perf/slow_exporter.py 并在 9481 端口运行:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# scripts/perf/slow_exporter.py
from http.server import BaseHTTPRequestHandler, HTTPServer
import time
class SlowHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        time.sleep(2.0)  # 模拟慢处理
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b"ok")
HTTPServer(("0.0.0.0", 9481), SlowHandler).serve_forever()

将 Collector exporter 指向该服务以触发 queued_retry 和队列积压行为。


采集指标与 pprof(诊断命令)🔍

  • 查看内存与相关指标:
1
curl -s http://<collector>:13133/metrics | egrep "process_resident_memory_bytes|memory_limiter|queued_retry|dropped"
  • 获取 heap profile:
1
2
3
curl -s http://<collector>:13133/debug/pprof/heap > heap.pb
# 在本地使用 go tool pprof 查看
go tool pprof -http=:7070 heap.pb
  • 获取短时 CPU profile(30s):
1
go tool pprof -http=:7071 http://<collector>:13133/debug/pprof/profile?seconds=30
  • 开启 Collector debug 日志(临时):在 Collector 启动配置中设置 service.telemetry.logs.level=debug,观察 TransformProcessor 的 TransformContext 日志(会打印 cache 内容,注意日志量大)。

判定准则(通过 / 失败)✅ / ⚠️

通过:

  • 在峰值/持续阶段 Collector 不会 OOM;RSS 受 memory_limiter 控制且在预期范围内。
  • queued_retry 队列有可控增长并能在恢复后消化,dropped 保持在接受范围(接近 0)。
  • pprof 定位热点在预期模块(例如 json flatten),便于下一步优化。

失败:

  • Collector 被 OOM kill 或内存持续上升不回落。
  • queued_retry 或其他队列长期满且 dropped 激增。

优化要点(摘要) ✅

关键措施(按优先级)

  1. memory_limiter(第一优先):设置并监控,防止 OOM;示例:limit_mib: 1024check_interval: 5s。目的:在内存突发时自动保护 Collector。
  2. queued_retry / 有界队列(第二优先):在出口前使用有界队列和重试(示例:queue_size: 2000, num_consumers: 4),避免下游慢导致内存无限积压。
  3. 早期过滤(第三优先):在 receiver/operator 层尽早丢弃无用日志(例如只保留 LOGX_TRACE_LOG==true),以减少不必要的解析与内存占用。
  4. 按需 JSON 解析与最小化 flatten:仅当日志可能含 trace 字段时解析 JSON 或只提取必要字段(msgtraceId 等),避免 full-flatten。
  5. 及时回收临时字段:在 Transformer 中尽快 delete_key 中间结构(如 attributes.json),释放短期内存。
  6. 调整 batch/并发参数:减小 send_batch_size、合理设置 num_consumers,降低单次内存占用并加速下游消费。
  7. 监控与 Profiling(持续):持续采集 process_resident_memory_bytesmemory_limiterqueued_retry 等指标,并周期性抓取 heap profile 定位热点。
  8. 持久化队列作为最后手段:若不能接受数据丢失,考虑使用持久化队列(Kafka/SQS)但需权衡复杂度与延迟。

上述每项优化都可以通过本文前文中的示例配置片段快速应用(memory_limiter / queued_retry / batch / operator 条件化等)。

优化配置示例(可直接复制) 🛠️

下面包含可直接拷贝到 Alloy 或 Collector 的配置片段,覆盖前文提出的优化要点:

1) Receiver:按需 JSON 解析(Alloy operator)

目的:只在可能含 trace 信息的日志上做解析与 flatten,节省大量内存。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Alloy receiver operators 示例简化
{
  type       = "json_parser",
  parse_from = "body",
  parse_to   = "attributes.json",
  on_error   = "drop",
  // 只在 body  trace 指示时解析
  if         = "IsMatch(body, \"LOGX_TRACE_LOG\") or IsMatch(body, \"traceId\") or IsMatch(body, \"apmSpanId\")",
}

{
  type     = "flatten",
  field    = "attributes.json",
  on_error = "drop",
  if       = "attributes[\"attributes.json\"] != nil or IsMatch(body, \"LOGX_TRACE_LOG\")",
}

2) Transform 清理临时字段(OTTL 语句示例)

目的:在 Transform 阶段尽快删除中间结构(如 attributes.json),释放内存。

1
2
3
4
5
6
7
processors:
  transform:
    log_statements:
      - statements:
          - merge_maps(log.cache, ParseJSON(log.body), "upsert") where IsString(log.body) and IsMatch(log.body, "^[\\{\\[]")
          - set(log.attributes["attr1"], log.cache["attr1"]) where log.cache["attr1"] != nil
          - delete_key(log.attributes, "attributes.json") where log.attributes["attributes.json"] != nil

3) memory_limiter(保护内存)

目的:避免因短时峰值导致 OOM,作为第一道防线。

1
2
3
4
5
processors:
  memory_limiter:
    check_interval: 5s
    limit_mib: 1024      # 根据容器/节点内存调整(建议占用 60%~80% 左右)
    spike_limit_mib: 256

4) queued_retry(有界队列 + 重试)

目的:在出口前限制内存队列,避免 downstream 慢导致无限积压。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
processors:
  queued_retry/logs_retry:
    queue:
      enabled: true
      queue_size: 2000
      num_consumers: 4
    retry_on_failure:
      enabled: true
      initial_interval: 1s
      max_interval: 30s
      max_elapsed_time: 300s

(可为 traces 再建一个 trace_retry,使用更小的 queue_size)

5) batch(减小单批内存占用)

1
2
3
4
processors:
  batch/log_collector_batch:
    send_batch_size: 50
    timeout: 5s

6) 监控/诊断指标(关注项)

  • process_resident_memory_bytes / go_memstats_alloc_bytes
  • memory_limiter.* (观察触发次数、最近动作)
  • queued_retry_queue_length / queued_retry_* (队列长度、丢弃数)
  • collector pipeline dropped_* 指标

诊断 + 后续调优建议 💡

  • 若 memory_limiter 经常触发:逐步调高 limit_mib 或进一步收紧早期过滤(只在 LOGX_TRACE_LOG==true 时 full-flatten)。
  • 若热点出在 flatten/解析:移除全量 flatten,只提取必要字段(msg, traceId, LOGX_TRACE_LOG 等)。
  • 若 queued_retry 长期满:增加 num_consumers 或减小 batch 大小、提高 downstream 吞吐(或扩容)。
  • 根据 pprof 输出,定位长寿命 map 或大量 transient allocations,优化对应 OTTL/transform 文本或 operator 流程。

快捷检查清单 🧾

  • Collector 的 metrics 与 pprof 可访问
  • baseline heap/profile 已采集
  • 运行轻负载并收集数据
  • 做 spike & sustained,期间多次抓 heap
  • 检查 debug 日志(TransformContext)
  • 根据 pprof 调整配置并重复测试

0%