Linux Host → Vector → Kafka → Python (convergence + Drain3 + AI) → Kafka → ClickHouse / alerts
- Input sources:
  - Docker container logs (`docker_logs`)
  - Linux system log files (`/var/log/messages`, `/var/log/syslog`, `/var/log/auth.log`, `/var/log/kern.log`)
  - Nginx access/error log files (`/var/log/nginx/*.log`)
- Output:
  - Kafka (address inside the Vector container: `kafka:29092`, topic `linux_raw_logs`)
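- Notes:
  - The `syslog` network input has been removed to avoid depending on an extra port
  - `journald` is unavailable in some container environments, so the current setup uses file logs + docker logs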
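- `ai_convergence_service.py`: compatibility entry point (keeps the original startup command)
- `log_pipeline/config.py`: centralized configuration loading
- `log_pipeline/converger.py`: orchestration of the main convergence flow
- `log_pipeline/ai_analyzer.py`: AI analysis (caching / retries / degradation)
- `log_pipeline/template_extractor.py`: template extraction (Drain3 with regex fallback)
- `log_pipeline/nginx_behavior.py`: per-IP behavior aggregation of Nginx access logs (time-window statistics)
- `log_pipeline/clickhouse_sink.py`: ClickHouse buffering and batch writes
- `log_pipeline/notifier.py`: webhook alert delivery
- `log_pipeline/commit_manager.py`: batch / timed offset commit strategy
- `log_pipeline/consumer_worker.py`: consume loop and message-processing worker
- `log_pipeline/middleware.py`: consumer middleware hooks (filtering / auditing / sampling extensions)
- `log_pipeline/middleware_examples.py`: example middleware (host allowlist, regex denylist, auditing)
- `log_pipeline/runner.py`: component assembly entry point (wires up the worker + commit manager)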
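To make the batch / timed offset commit strategy attributed to `log_pipeline/commit_manager.py` more concrete, here is a minimal sketch of how such a policy could work. The class `CommitPolicy` and its method names are hypothetical and not taken from the project; the default values simply mirror `KAFKA_COMMIT_BATCH` (100) and `KAFKA_COMMIT_INTERVAL_SEC` (5). The injectable `clock` is an assumption added to keep the sketch testable.

```python
import time
from typing import Callable


class CommitPolicy:
    """Hypothetical helper: a commit is due after N messages or T seconds."""

    def __init__(
        self,
        batch_size: int = 100,
        interval_sec: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.batch_size = batch_size
        self.interval_sec = interval_sec
        self._clock = clock
        self._pending = 0
        self._last_commit = clock()

    def record(self) -> bool:
        """Register one processed message; return True when a commit is due."""
        self._pending += 1
        return self.should_commit()

    def should_commit(self) -> bool:
        # Nothing to commit if no message was processed since the last commit.
        if self._pending == 0:
            return False
        # Batch condition: enough messages have accumulated.
        if self._pending >= self.batch_size:
            return True
        # Time condition: the oldest uncommitted work has waited long enough.
        return self._clock() - self._last_commit >= self.interval_sec

    def mark_committed(self) -> None:
        """Call after the consumer has actually committed its offsets."""
        self._pending = 0
        self._last_commit = self._clock()
```

In a consume loop, the worker would call `record()` after each successfully processed message, commit offsets through its Kafka client when it returns `True`, and then call `mark_committed()`. Combining both conditions bounds the number of messages replayed after a crash while keeping commit traffic low during bursts.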
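- `docs/workflow.md`: flowchart + sequence diagram + written overview
- `docs/workflow_flowchart.mmd`: flowchart source
- `docs/workflow_sequence.mmd`: sequence diagram source
- `docs/refactored_execution_flow.md`: execution flow after the modular split (entry point, loop, extension points)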
- Initialize configuration (recommended): `cp .env.template .env`
- 0.1 Install dependencies (uv recommended): `uv sync --no-dev`
  - pip-compatible alternative: `python3 -m pip install -r requirements.txt`
- `docker compose up -d kafka clickhouse vector`
- (Optional, local Ollama mode) `docker compose up -d ollama && docker exec -it $(docker compose ps -q ollama) ollama pull llama3.1:8b`
- `bash setup_ch.sh`
- `python ai_convergence_service.py`
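- By default the Python service connects directly from the host: Kafka `localhost:9092`, ClickHouse `localhost:8123`
- Default ClickHouse credentials: `CLICKHOUSE_USER=logai`, `CLICKHOUSE_PASSWORD=logai123456`
- AI analysis supports remote OpenAI-compatible endpoints; the local default `http://localhost:11434/v1` is used only when `AI_PROVIDER=ollama` and `AI_BASE_URL` is not explicitly set
- Inside the Vector container, Kafka defaults to `kafka:29092`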
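The endpoint rule above (fall back to the local Ollama URL only for `AI_PROVIDER=ollama` with no explicit `AI_BASE_URL`) can be sketched as follows. The function name `resolve_ai_base_url` is hypothetical, and returning `None` to mean "let the client library pick its own default endpoint" is an assumption, not the project's confirmed behavior.

```python
from typing import Mapping, Optional

OLLAMA_DEFAULT_BASE_URL = "http://localhost:11434/v1"


def resolve_ai_base_url(env: Mapping[str, str]) -> Optional[str]:
    """Return the explicit AI_BASE_URL, the local Ollama default, or None."""
    explicit = env.get("AI_BASE_URL", "").strip()
    if explicit:
        # An explicitly configured URL always wins, whatever the provider.
        return explicit
    provider = env.get("AI_PROVIDER", "openai_compatible").strip().lower()
    if provider == "ollama":
        return OLLAMA_DEFAULT_BASE_URL
    # Assumption: no URL means the client uses its built-in default endpoint.
    return None
```

Passing `os.environ` as `env` would apply the rule to the running process; a plain dict works equally well in tests.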
The `.env` file in the project root is now loaded automatically by default; system environment variables take precedence over `.env`.
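The precedence rule above can be illustrated with a small sketch: values from `.env` only fill in variables that the environment does not already define. The helper names `parse_env_text` and `apply_env_defaults` are hypothetical, and the parser handles only simple `KEY=value` lines; the project may well rely on a library for this instead.

```python
from typing import Dict, MutableMapping


def parse_env_text(text: str) -> Dict[str, str]:
    """Parse simple KEY=value lines, skipping blanks and # comments."""
    values: Dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        value = value.strip()
        # Drop one pair of matching surrounding quotes, if present.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        values[key.strip()] = value
    return values


def apply_env_defaults(
    file_values: Dict[str, str], environ: MutableMapping[str, str]
) -> None:
    """Copy file values into environ without overriding existing keys."""
    for key, value in file_values.items():
        environ.setdefault(key, value)
```

Calling `apply_env_defaults(parse_env_text(open(".env").read()), os.environ)` at startup would give the behavior described: a variable exported in the shell survives, and anything missing falls back to `.env`.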
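- `WINDOW_MINUTES` (default `5`): convergence window in minutes (`1` is recommended for load testing / integration testing)
- `MIN_COUNT_THRESHOLD` (default `5`): occurrence threshold that triggers AI analysis
- `MAX_SAMPLE_SIZE` (default `3`): number of sample log lines retained
- `KAFKA_GROUP_ID` (default `ai-convergence-pipeline`): consumer group ID
- `KAFKA_AUTO_OFFSET_RESET` (default `latest`): `latest` or `earliest`
- `AI_PROVIDER` (default `openai_compatible`): `openai_compatible` or `ollama`
- `AI_ANALYSIS_ENABLED` (default `true`): whether AI analysis is enabled (`true`/`false`)
- `AI_MODEL`: model name (determined by the target model service)
- `AI_BASE_URL`: model service URL (e.g. `https://your-llm-gateway/v1`; may be left empty for the official OpenAI API)
- `AI_API_KEY`: access key for the model service
- `AI_ORGANIZATION` / `AI_PROJECT` (default empty): optional organization / project identifiers
- `AI_RETRY_TIMES` (default `2`) / `AI_TIMEOUT_SEC` (default `15`): retry count and timeout for AI calls
- `BEHAVIOR_AI_ENABLED` (default `true`): whether AI explanation of the Nginx per-IP behavior aggregation is enabled
- `BEHAVIOR_MIN_REQUESTS` (default `20`): minimum requests from a single IP within one window; below this threshold the IP is only counted, not sent to AI
- `BEHAVIOR_TOP_PATHS` (default `5`): number of top paths stored / analyzed
- `SANITIZE_ENABLED` (default `true`): whether log sanitization is enabled
- `SANITIZE_MASK_IP` (default `true`): whether IP addresses are masked
- `SANITIZE_MASK_CREDENTIALS` (default `true`): whether passwords / tokens / keys are masked
- `SANITIZE_EXTRA_RULES` (default empty): custom sanitization rules (`pattern=>replacement`, multiple rules separated by `||`)
- `KAFKA_COMMIT_BATCH` (default `100`): number of messages per batched offset commit
- `KAFKA_COMMIT_INTERVAL_SEC` (default `5`): maximum interval between commits, in seconds
- `KAFKA_TEST_TOPIC` (default `linux_raw_logs`): topic into which the integration script injects test logs
- `CLICKHOUSE_USER` / `CLICKHOUSE_PASSWORD` (default `logai` / `logai123456`): ClickHouse credentials
- `AI_CACHE_MAX_SIZE` (default `1000`) / `AI_CACHE_TTL_SEC` (default `600`): capacity and TTL of the AI result cache
- `MIDDLEWARE_HOST_ALLOWLIST` (default empty): filter by host allowlist (comma-separated)
- `MIDDLEWARE_MESSAGE_DENY_REGEX` (default empty): filter by message regex denylist (multiple expressions separated by `||`)
- `MIDDLEWARE_AUDIT_ENABLED` (default `false`): enable audit logging in the example middleware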
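The `SANITIZE_EXTRA_RULES` format in the list above (`pattern=>replacement`, rules joined by `||`) could be parsed roughly as shown here. This is a sketch under stated assumptions: the function names are hypothetical, the split happens at the last `=>` in each rule, malformed rules are silently skipped, and the project's real parser may treat escaping or ordering differently.

```python
import re
from typing import List, Tuple

Rule = Tuple[re.Pattern, str]


def parse_extra_rules(spec: str) -> List[Rule]:
    """Split 'pattern=>replacement||pattern=>replacement' into compiled rules."""
    rules: List[Rule] = []
    for chunk in spec.split("||"):
        chunk = chunk.strip()
        if not chunk:
            continue
        # Assumption: the last '=>' separates the pattern from the replacement.
        pattern, sep, replacement = chunk.rpartition("=>")
        if not sep or not pattern:
            continue  # malformed rule, skipped in this sketch
        rules.append((re.compile(pattern), replacement))
    return rules


def apply_rules(message: str, rules: List[Rule]) -> str:
    """Apply every rule in order; later rules see earlier substitutions."""
    for pattern, replacement in rules:
        message = pattern.sub(replacement, message)
    return message
```

Because `||` is the rule separator, a pattern that needs regex alternation of empty branches cannot contain a literal `||` under this scheme. Replacements are passed to `re.sub` unchanged, so backslashes and group references in them are interpreted by the `re` module.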
- `ConsumerWorker` supports middleware hooks:
  - `before_decode(raw_message)`
  - `after_decode(payload)` (return `None` to skip processing)
  - `on_process_success(payload)`
  - `on_process_error(raw_message, error)`

Example startup (allowlist + auditing):

    MIDDLEWARE_HOST_ALLOWLIST="host-a,host-b" MIDDLEWARE_AUDIT_ENABLED=1 bash scripts/run_with_middleware.sh

Example startup (allowlist + regex denylist + auditing):

    MIDDLEWARE_HOST_ALLOWLIST="host-a,host-b" MIDDLEWARE_MESSAGE_DENY_REGEX="healthcheck||heartbeat||debug noise" MIDDLEWARE_AUDIT_ENABLED=1 bash scripts/run_with_middleware.sh

Run the end-to-end smoke test:

    uv sync --no-dev
    bash scripts/e2e_smoke_test.sh

Optional parameters example:

    WAIT_SECONDS=90 WINDOW_MINUTES=1 MIN_COUNT_THRESHOLD=10 TEST_HOST=smoke-host bash scripts/e2e_smoke_test.sh

Key parameters of the integration script:
- `KAFKA_BROKERS` (default `localhost:9092`)
- `KAFKA_TEST_TOPIC` (default `linux_raw_logs`)
- `KAFKA_GROUP_ID` (auto-generated by default)
- `WINDOW_MINUTES` / `MIN_COUNT_THRESHOLD`
- `WAIT_SECONDS` / `KAFKA_WAIT_SECONDS` / `CH_WAIT_SECONDS`

Note: `scripts/e2e_smoke_test.sh` reads `.env` as its source of defaults (it only fills in variables that are not already set).

Remote model example:

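    AI_PROVIDER=openai_compatible \
    AI_BASE_URL="https://your-llm-gateway/v1" \
    AI_API_KEY="your_api_key" \
    AI_MODEL="your_model_name" \
    python ai_convergence_service.py

Local Ollama example:

    AI_PROVIDER=ollama AI_MODEL=llama3.1:8b python ai_convergence_service.py

Disable AI analysis (log convergence only, no model calls):

    AI_ANALYSIS_ENABLED=false python ai_convergence_service.py

Disable Nginx behavior AI (behavior statistics are still written to the database):

    BEHAVIOR_AI_ENABLED=false python ai_convergence_service.py

Disable all sanitization (raw text is stored; use with caution):

    SANITIZE_ENABLED=false python ai_convergence_service.py

Disable IP masking only (credential masking stays on):

    SANITIZE_MASK_IP=false SANITIZE_MASK_CREDENTIALS=true python ai_convergence_service.py

Custom sanitization rules example:

    SANITIZE_EXTRA_RULES='sessionid=[^\\s;]+=>sessionid=[MASKED]||[A-Fa-f0-9]{32}=>[HEX32]' python ai_convergence_service.py

Automatic failure diagnostics bundle (enabled by default):
- `DIAG_ON_FAIL=1` (default): capture diagnostics automatically on failure
- `DIAG_OUTPUT_DIR=./diagnostics`: output directory for diagnostics bundles
- `DIAG_RETENTION_DAYS=7`: automatically remove diagnostics directories and archives older than N days

Collect diagnostics manually:
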
    bash scripts/collect_diagnostics.sh --label manual --service-log /tmp/log_ai_service_smoke.log --test-host smoke-host --retention-days 7

Generate test log entries:

    for i in {1..50}; do echo "$(date -Iseconds) sshd[1234]: Failed password for invalid user admin from 10.0.0.5 port 22" | logger -t sshd; sleep 0.2; done

Query the converged logs:

    docker exec -it $(docker compose ps -q clickhouse) clickhouse-client --user "${CLICKHOUSE_USER:-logai}" --password "${CLICKHOUSE_PASSWORD:-logai123456}" --query "SELECT host, event_pattern, count, ai_result FROM log_ai.converged_logs ORDER BY count DESC LIMIT 5 FORMAT Pretty"

Query the Nginx behavior windows:

    docker exec -it $(docker compose ps -q clickhouse) clickhouse-client --user "${CLICKHOUSE_USER:-logai}" --password "${CLICKHOUSE_PASSWORD:-logai123456}" --query "SELECT window, host, client_ip, request_count, unique_path_count, status_4xx, status_5xx, ai_analyzed, ai_result FROM log_ai.user_behavior_windows ORDER BY window DESC LIMIT 20 FORMAT Pretty"

- Symptom: `telnet localhost 8123` connects, but the program reports `Authentication failed` / `REQUIRED_PASSWORD` / `Unexpected Http Driver Exception`
- Cause: a reachable port only proves TCP connectivity; it does not mean ClickHouse HTTP authentication succeeded
- Fix for an existing data volume (recommended):

      docker exec -it clickhouse clickhouse-client --query "CREATE USER IF NOT EXISTS logai IDENTIFIED BY 'logai123456'; GRANT ALL ON log_ai.* TO logai;"

- Then make sure `.env` contains:

      CLICKHOUSE_USER=logai
      CLICKHOUSE_PASSWORD=logai123456

- If you prefer a full rebuild (this wipes all ClickHouse data):

      docker compose down
      docker volume rm log_analysis_local_ch-data
      docker compose up -d clickhouse
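Since an open port on 8123 does not prove that authentication works, a quick way to tell the two cases apart is to send an authenticated `SELECT 1` over the ClickHouse HTTP interface. The sketch below uses only the standard library and the `X-ClickHouse-User` / `X-ClickHouse-Key` headers; the function names are hypothetical, and the host, port, and credentials are the defaults mentioned in this README rather than values read from your setup.

```python
import urllib.error
import urllib.parse
import urllib.request


def build_auth_check_request(
    user: str, password: str, host: str = "localhost", port: int = 8123
) -> urllib.request.Request:
    """Build an authenticated 'SELECT 1' request for the HTTP interface."""
    query = urllib.parse.urlencode({"query": "SELECT 1"}, quote_via=urllib.parse.quote)
    request = urllib.request.Request(f"http://{host}:{port}/?{query}")
    request.add_header("X-ClickHouse-User", user)
    request.add_header("X-ClickHouse-Key", password)
    return request


def check_clickhouse_auth(
    user: str, password: str, host: str = "localhost", port: int = 8123,
    timeout: float = 5.0,
) -> str:
    """Return 'ok', an HTTP error summary, or an 'unreachable' message."""
    request = build_auth_check_request(user, password, host, port)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            body = response.read().decode().strip()
            return "ok" if body == "1" else f"unexpected response: {body!r}"
    except urllib.error.HTTPError as exc:
        # The server answered, so TCP is fine; the body explains the rejection.
        detail = exc.read().decode(errors="replace").strip()
        return f"http error {exc.code}: {detail}"
    except OSError as exc:
        # Connection refused, DNS failure, timeout and similar network errors.
        return f"unreachable: {exc}"
```

Running `print(check_clickhouse_auth("logai", "logai123456"))` separates the outcomes: an `http error` result means the port is reachable but the credentials or user setup are wrong, while `unreachable` points to a network or container problem.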