PHP 实时日志采集与集中化
如何在 PHP 体系下实现实时日志采集、清洗、聚合与检索?本文结合物理机与 Kubernetes 两种运行环境,给出分阶段演进路线、最佳实践与可落地配置(EFK/PLG/ClickHouse/OTel),并附带 PHP 端具体实现建议。
架构概览
物理机 / VM 拓扑
flowchart TD
A["微服务 (物理机/VM)"] --> B["应用日志: JSON 文件 / journald"]
B --> C["宿主机 Agent: Fluent Bit / Vector / OTel Collector"]
C -->|"TLS+重试+背压"| D["Kafka (可选缓冲)"]
C --> E["处理/路由: Logstash / OTel Collector / Vector"]
D --> E
E --> F["Elasticsearch / OpenSearch"]
E --> G["ClickHouse"]
E --> H["Loki"]
F --> I["可视化: Kibana / Grafana"]
G --> I
H --> I
J["治理: 时间同步(NTP) + 结构化Schema + 脱敏/掩码"] -.-> E
Kubernetes 拓扑
flowchart TD
subgraph K8s["Kubernetes 集群"]
A1["Pod: 微服务容器"] --> B1["stdout/stderr (结构化JSON)"]
B1 --> C1["DaemonSet: Fluent Bit / Vector"]
A1 --> D1["Sidecar(可选): OTel Collector / Fluent Bit"]
C1 -->|"TLS+背压+限流"| E1["集群网关: OTel Collector / Ingress"]
end
E1 --> F1["Kafka / NATS / Redis Streams (缓冲)"]
E1 --> G1["Loki / Elasticsearch / ClickHouse"]
F1 --> H1["处理与路由: Logstash / Vector / OTel Collector"]
H1 --> G1
G1 --> I1["Grafana / Kibana / 自研埋点分析"]
J1["治理: 多租户、标签、索引生命周期、合规脱敏"] -.-> H1
演进阶段与适用场景
- 传统本地文件 + logrotate:单机/少量服务;简单但难检索、风险高。
- 中央化 Syslog(rsyslog/syslog-ng/journald):轻量汇聚,结构化不足。
- EFK/ELK:强检索与可视化;成本与治理压力需控制(ILM)。
- 容器化(stdout JSON + DaemonSet Agent):与 K8s 契合;注意 label 基数。
- 事件流驱动(Kafka/NATS 缓冲解耦):高吞吐可重放;链路更长。
- OpenTelemetry 一体化:标准 OTLP,日志/链路/指标互相关联。
- 成本/性能优化(Loki/ClickHouse/对象存储分层):性价比高,查询语义差异需适配。
通用最佳实践
- 结构化与上下文:统一 JSON Schema(timestamp、level、service、env、trace_id、span_id、request_id、user_id)。
- 安全合规:采集端脱敏/掩码;TLS 传输;RBAC;多租户隔离。
- 稳定性:本地缓冲、断点续传、退避重试、背压/限流、时钟同步。
- 成本与性能:控制日志级别与采样;热/温/冷分层;限制高基数字段。
- 运维治理:ILM/TTL;集中配置与热更新;SLO/告警(延迟、拒收、落库错误率)。
物理机/VM 最佳实践
推荐:应用输出 JSON 到文件或 journald;宿主机 Agent(Fluent Bit/Vector/OTel Collector)采集→可选 Kafka 缓冲→后端(OpenSearch/ClickHouse/Loki)。
Fluent Bit(Tail → Kafka → OpenSearch)
[SERVICE]
Flush 1
Parsers_File parsers.conf
Log_Level info
[INPUT]
Name tail
Path /var/log/app/*.log
Tag app.*
Multiline On
Parser docker
DB /var/lib/fluent-bit/tail.db
Mem_Buf_Limit 50MB
Skip_Long_Lines On
[FILTER]
Name modify
Match app.*
Add env prod
Add service myservice
[OUTPUT]
Name kafka
Match app.*
Brokers kafka-1:9092,kafka-2:9092
Topics logs.app
rdkafka.queue.buffering.max.ms 100
rdkafka.compression.codec lz4
rdkafka.security.protocol ssl
[OUTPUT]
Name es
Match app.*
Host opensearch.local
Port 9200
HTTP_User fluent
HTTP_Passwd xxxxxx
Logstash_Format On
tls On
Vector(Tail → 脱敏 → Loki)
[sources.app]
type = "file"
include = ["/var/log/app/*.log"]
ignore_older_secs = 86400
fingerprint.strategy = "device_and_inode"
[transforms.mask]
type = "remap"
inputs = ["app"]
source = '''
. = parse_json!(.message)
.email = replace!(.email, r'([\\w.%+-]+)@([\\w.-]+\\.[A-Za-z]{2,})', "***@***")
.credit_card = null
'''
[sinks.loki]
type = "loki"
inputs = ["mask"]
endpoint = "https://loki.local"
encoding.codec = "json"
labels = {service="myservice", env="prod"}
out_of_order_action = "accept"
logrotate(与 tail/采集器配合)
/var/log/app/*.log {
daily
rotate 7
compress
missingok
copytruncate
create 0640 app app
}
Kubernetes 最佳实践
原则:应用只输出 stdout JSON;使用 DaemonSet(Fluent Bit/Vector/Promtail)统一采集 /var/log/containers/*,自动附加 k8s 元数据;Sidecar 仅在需本地解析/脱敏时使用;开启本地缓冲、资源限额、背压与基数治理。
Fluent Bit DaemonSet(Containers → Loki)
[SERVICE]
Parsers_File parsers.conf
[INPUT]
Name tail
Path /var/log/containers/*.log
Tag kube.*
Parser docker
Docker_Mode On
Mem_Buf_Limit 100MB
DB /var/fluent-bit/tail.db
[FILTER]
Name kubernetes
Match kube.*
Kube_URL https://kubernetes.default.svc:443
Merge_Log On
Keep_Log Off
[OUTPUT]
Name loki
Match kube.*
Host loki-gateway
Port 3100
Labels job=fluentbit, env=prod, kubernetes['namespace_name'], kubernetes['container_name']
Auto_kubernetes_labels On
Promtail(自动发现 Pods)
scrape_configs:
- job_name: kubernetes-pods
pipeline_stages:
- docker: {}
- labeldrop:
- filename
kubernetes_sd_configs:
- role: pod
relabel_configs:
- source_labels: [__meta_kubernetes_pod_label_app]
target_label: app
OpenTelemetry Collector(OTLP/filelog → Kafka/Loki)
receivers:
otlp:
protocols: {http: {}, grpc: {}}
filelog:
include: [/var/log/containers/*.log]
operators:
- type: json_parser
processors:
batch: {}
attributes:
actions:
- key: env
value: prod
action: upsert
exporters:
kafka:
brokers: [kafka-1:9092]
topic: logs.app
loki:
endpoint: http://loki:3100/loki/api/v1/push
service:
pipelines:
logs/primary:
receivers: [filelog, otlp]
processors: [attributes, batch]
exporters: [kafka, loki]
可落地方案与模板
小团队低成本(PLG)
- Promtail/Fluent Bit → Loki → Grafana;控制标签基数,分层保留。
limits_config: retention_period: 168h compactor: working_directory: /data/compactor compaction_interval: 5m delete_request_cancel_period: 24h
中型团队检索优先(EFK + Kafka)
- Fluent Bit/Vector → Kafka → Logstash/OTel Collector → OpenSearch;Kafka 缓冲与多消费者,OpenSearch 做 ILM。
{ "policy": { "phases": { "hot": {"actions": {"rollover": {"max_size": "50gb", "max_age": "7d"}}}, "warm": {"actions": {"forcemerge": {"max_num_segments": 1}}}, "cold": {"min_age": "30d", "actions": {"freeze": {}}}, "delete": {"min_age": "90d", "actions": {"delete": {}}} } } }
海量吞吐/性价比(Vector + ClickHouse)
- Vector DS/Agent → Vector Aggregator → ClickHouse(MergeTree/表分区)。
CREATE TABLE logs.app ( ts DateTime CODEC(Delta, ZSTD), level LowCardinality(String), service LowCardinality(String), trace_id String, message String, k8s_namespace LowCardinality(String), labels Map(String, String) ) ENGINE = MergeTree PARTITION BY toDate(ts) ORDER BY (service, ts) TTL ts + INTERVAL 30 DAY DELETE SETTINGS index_granularity = 8192;
合规与审计
- 采集端脱敏(Vector remap/Fluent Bit grep),传输与落盘加密,审计日志 WORM/对象存储锁不可篡改。
边缘/离线
- 本地持久化缓冲,网络可用回传;启用压缩与节流;按优先级丢弃非关键日志。
PHP 实践与配置示例
目标
- 统一 JSON Schema;stdout(K8s)或文件/journald(VM);与链路追踪关联(trace_id/span_id);采集端脱敏与缓冲。
Monolog(stdout JSON + 关联上下文)
<?php
require 'vendor/autoload.php';
use Monolog\\Logger;
use Monolog\\Handler\\StreamHandler;
use Monolog\\Formatter\\JsonFormatter;
$logger = new Logger('app');
$handler = new StreamHandler('php://stdout', Logger::INFO);
$handler->setFormatter(new JsonFormatter(JsonFormatter::BATCH_MODE_JSON, false));
// 处理器:注入通用字段(建议从请求头、会话、OTel 上下文中提取)
$logger->pushProcessor(function (array $record) {
$record['extra']['env'] = getenv('APP_ENV') ?: 'prod';
$record['extra']['service'] = 'my-php-service';
$record['extra']['request_id'] = $_SERVER['HTTP_X_REQUEST_ID'] ?? null;
$record['extra']['trace_id'] = $_SERVER['HTTP_X_TRACE_ID'] ?? null; // 或从 OTel SDK 获取
$record['extra']['user_id'] = $_SESSION['user_id'] ?? null;
return $record;
});
$logger->pushHandler($handler);
// 示例
$logger->info('user login', ['user_id' => 123]);
$logger->error('db failed', ['error_code' => 'DB_CONN_TIMEOUT']);
PHP-FPM/Nginx(容器化)将错误与访问日志输出到 stdout/stderr
; php.ini
log_errors = On
; php-fpm.conf 或 www.conf
error_log = /proc/self/fd/2
; 可选:将 FPM 访问日志也导向 stdout/stderr
; access.log = /proc/self/fd/2
物理机(文件落盘)
- Monolog 将日志写入
/var/log/app/app.log,配合 logrotate;采集器使用 tail 指纹/offset 防重。
与 OpenTelemetry 关联
- 在反向代理或应用层透传
traceparent/baggage,在 Processor 中提取trace_id/span_id; - 采用 OTel PHP SDK(可选)向后端上报 Traces,与 Logs 通过共同字段实现互跳。
常见坑与自检清单
- 时间戳/时区混乱;多行堆栈未结构化;
- 采集器与 logrotate 不匹配导致丢失;
- 生产误开 DEBUG/TRACE 导致成本暴涨;
- 在业务线程直连日志后端造成阻塞;
- 标签高基数(Loki/Prometheus 类系统致命);
- 未做脱敏,泄露 PII/密钥;
- 无本地缓冲,网络抖动即丢;无 ILM/TTL 费用失控。
迁移与落地步骤
- 盘点与分层:统一 Schema 与追踪字段,按服务/环境/吞吐/保留需求分层。
- PoC:PLG/EFK/ClickHouse 各选一条链路做对比,回放历史日志估算成本与延迟。
- 渐进式上线:按业务域切流,灰度与采样并行,保留回滚通道。
- 治理与可视化:建立日志 SLO,Grafana/Kibana 看板与告警;
- 成本优化:ILM/TTL、热温冷分层、标签治理、对象存储归档。
简要结论:统一 JSON Schema 与追踪上下文是基础;stdout(K8s)/文件或 journald(VM)为入口,Agent 端做脱敏与缓冲;强检索选 OpenSearch,性价比选 Loki/ClickHouse,高可靠加 Kafka;OTel 统一日志/链路/指标可显著提升排障效率;生产环境务必控制标签基数、采样与 ILM/TTL,并启用背压与本地缓冲。