跳到主要内容

InfluxDB

问题

InfluxDB 是什么?它的架构和使用方式是怎样的?

答案

一、InfluxDB 简介

InfluxDB 是目前最流行的开源时序数据库,由 InfluxData 公司开发。它专门处理高写入负载的时间序列数据,广泛用于监控、IoT 和实时分析。

版本特点
InfluxDB 1.x自定义 TSM 引擎,InfluxQL 查询语言
InfluxDB 2.x统一平台(数据库 + UI + 任务 + 告警),Flux 查询语言
InfluxDB 3.x基于 Apache Arrow + DataFusion,SQL 查询,性能大幅提升

二、核心概念

概念对应 MySQL说明
BucketDatabase数据存储容器,带保留策略
MeasurementTable度量名称
Tag索引列标签键值对,自动索引
Field数据列实际数据值,不索引
PointRow一条时序数据记录
SeriesMeasurement + Tags 的唯一组合
示例 Point:
measurement: cpu_usage
tags: host=server1, region=us-east
fields: value=85.3, idle=14.7
timestamp: 2024-01-15T10:30:00Z
Tag vs Field
  • Tag:自动索引,用于 WHERE 过滤和 GROUP BY,存字符串
  • Field:不索引,存实际数值

错误示例:把 user_id(高基数值)设为 Tag → 索引爆炸,性能急剧下降

三、写入

Line Protocol(行协议)

InfluxDB 使用 Line Protocol 格式写入数据:

# 格式:measurement,tag1=v1,tag2=v2 field1=v1,field2=v2 timestamp
cpu_usage,host=server1,region=us-east value=85.3,idle=14.7 1705312200000000000
cpu_usage,host=server2,region=us-west value=72.1,idle=27.9 1705312200000000000

API 写入

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(
url="http://localhost:8086",
token="your-token",
org="your-org"
)
write_api = client.write_api(write_options=SYNCHRONOUS)

# 写入单条数据
point = Point("cpu_usage") \
.tag("host", "server1") \
.tag("region", "us-east") \
.field("value", 85.3) \
.field("idle", 14.7)

write_api.write(bucket="monitoring", record=point)

# 批量写入
points = [
Point("cpu_usage").tag("host", f"server{i}").field("value", 80 + i)
for i in range(100)
]
write_api.write(bucket="monitoring", record=points)

四、查询

Flux 查询语言(2.x)

// 查询过去 1 小时的 CPU 使用率,按 5 分钟聚合
from(bucket: "monitoring")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu_usage")
|> filter(fn: (r) => r.host == "server1")
|> aggregateWindow(every: 5m, fn: mean)
|> yield(name: "mean_cpu")

SQL 查询(3.x)

-- InfluxDB 3.x 支持标准 SQL
SELECT
DATE_BIN(INTERVAL '5 minutes', time) AS window,
host,
AVG(value) AS avg_cpu,
MAX(value) AS max_cpu
FROM cpu_usage
WHERE time > NOW() - INTERVAL '1 hour'
AND host = 'server1'
GROUP BY window, host
ORDER BY window DESC;

五、数据保留与降采样

# 创建 Bucket 时指定保留策略
bucket_api = client.buckets_api()
bucket_api.create_bucket(
bucket_name="monitoring",
retention_rules=[
{"type": "expire", "everySeconds": 7 * 86400} # 7 天后自动删除
],
org="your-org"
)

降采样任务(2.x Flux Task):

// 每 5 分钟执行一次:将原始数据聚合后写入长期存储 Bucket
option task = {name: "downsample_cpu", every: 5m}

from(bucket: "monitoring")
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "cpu_usage")
|> aggregateWindow(every: 5m, fn: mean)
|> to(bucket: "monitoring_longterm")

六、与监控生态集成

  • Telegraf:InfluxData 开发的数据采集 Agent,支持 200+ 输入插件
  • Grafana:与 InfluxDB 天然集成,提供丰富的仪表盘
  • Kapacitor / InfluxDB Alerts:告警引擎

七、InfluxDB vs Prometheus

对比InfluxDBPrometheus
数据采集Push(数据推送到 InfluxDB)Pull(Prometheus 拉取数据)
查询语言Flux / SQLPromQL
长期存储原生支持需要 Thanos/Cortex
高可用企业版支持集群需要外部方案
适合场景通用时序数据、IoT云原生监控
学习成本中(Flux 语法)中(PromQL)

常见面试问题

Q1: InfluxDB 的 TSM 引擎和 LSM-Tree 有什么区别?

答案

TSM 是 InfluxDB 基于 LSM-Tree 思想的时序优化版:

对比LSM-Tree(通用)TSM(时序优化)
Key 排序按 Key 字典序按 Series Key + 时间排序
数据压缩通用压缩Delta-of-Delta + XOR
CompactionL0→L1→L2 多级按时间范围合并
索引Bloom FilterTSI(Time Series Index)

TSM 利用时序数据的时间有序性,在压缩和查询上做了专门优化。

Q2: Tag 和 Field 应该如何选择?

答案

准则用 Tag用 Field
是否用于过滤(WHERE)
是否用于分组(GROUP BY)
基数(不同值的数量)低(< 10000)不限
数据类型只能是字符串数值、字符串、布尔

错误做法:将 user_idtrace_id 等高基数值设为 Tag → 导致 Series 爆炸。

Q3: InfluxDB 如何处理高写入场景?

答案

  1. WAL + 内存缓存:写入先写 WAL(持久性),再写内存(速度)
  2. 批量写入:客户端 SDK 默认攒批写入(如 5000 条/批)
  3. 压缩:TSM 引擎的极致压缩减少磁盘 I/O
  4. 无索引 Field:Field 不建索引,减少写入开销
  5. 只追加不修改:时序数据天然只追加

InfluxDB 2.x 单机可达 50 万+ points/秒的写入吞吐。

相关链接