#!/usr/bin/env python3
"""
Offline analysis of the context-compression pipeline's performance.

Extracts compression-related structured tracing data from the Docker logs and
produces a summary report to help evaluate how well the five-layer compression
pipeline works in practice.

Usage:
    python3 tools/analyze_compression.py logs/docker.log
    python3 tools/analyze_compression.py --top 10 logs/docker.log
    python3 tools/analyze_compression.py --csv output.csv logs/docker.log
    python3 tools/analyze_compression.py --json logs/docker.log
    cat logs/docker.log | python3 tools/analyze_compression.py -
"""

from __future__ import annotations

import argparse
import csv
import json
import re
import sys
from collections import defaultdict
from dataclasses import asdict, dataclass
from typing import Dict, Optional, Sequence


# ---------------------------------------------------------------------------
# ANSI stripping (reuses the pattern from diagnose_improper_request.py)
# ---------------------------------------------------------------------------

# Covers common CSI sequences (including the uncommon ':' parameter separator)
# so escape codes do not pollute URL/field parsing.
ANSI_RE = re.compile(r"\x1b\[[0-9;:?]*[A-Za-z]")


def strip_ansi(s: str) -> str:
    return ANSI_RE.sub("", s)


# ---------------------------------------------------------------------------
# Timestamp extraction
# ---------------------------------------------------------------------------

# ISO 8601 timestamp at the start of a line, with or without a timezone suffix.
TIMESTAMP_RE = re.compile(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})")


def extract_timestamp(line: str) -> Optional[str]:
    """Extract the leading ISO timestamp; return a second-precision string or None."""
    m = TIMESTAMP_RE.search(line[:40])
    return m.group(1) if m else None


def hour_bucket(ts: str) -> str:
    """Truncate to the hour: 2025-01-15T10:23:45 -> 2025-01-15T10"""
    return ts[:13]


# ---------------------------------------------------------------------------
# tracing key=value parsing
# ---------------------------------------------------------------------------

KV_RE = re.compile(r"(\w+)=(\d+(?:\.\d+)?|\"[^\"]*\"|[^\s,]+)")


def parse_kv(line: str) -> Dict[str, str]:
    """Extract every key=value pair from a structured tracing line."""
    return {m.group(1): m.group(2).strip('"') for m in KV_RE.finditer(line)}


def kv_int(kv: Dict[str, str], key: str, default: int = 0) -> int:
    v = kv.get(key)
    if v is None:
        return default
    try:
        return int(v)
    except ValueError:
        return default


def kv_float(kv: Dict[str, str], key: str, default: float = 0.0) -> float:
    v = kv.get(key)
    if v is None:
        return default
    try:
        return float(v)
    except ValueError:
        return default


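# Illustrative only: a hypothetical tracing line (not taken from real logs)
# and how the helpers above parse it.
#
#     line = "INFO 输入压缩完成 estimated_input_tokens=12345 bytes_saved_total=2048"
#     parse_kv(line)   # -> {'estimated_input_tokens': '12345', 'bytes_saved_total': '2048'}
#     kv_int(parse_kv(line), "bytes_saved_total")   # -> 2048
#     kv_int(parse_kv(line), "missing_key", -1)     # -> -1

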
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------


@dataclass
class RequestRecord:
    """Data from one request line."""
    line_no: int
    timestamp: Optional[str] = None
    model: str = ""
    max_tokens: int = 0
    stream: bool = True
    message_count: int = 0
    estimated_input_tokens: int = 0


@dataclass
class CompressionRecord:
    """Data from one compression-stats line."""
    line_no: int
    timestamp: Optional[str] = None
    estimated_input_tokens: int = 0
    bytes_saved_total: int = 0
    whitespace_bytes_saved: int = 0
    thinking_bytes_saved: int = 0
    tool_result_bytes_saved: int = 0
    tool_use_input_bytes_saved: int = 0
    history_turns_removed: int = 0
    history_bytes_saved: int = 0


@dataclass
class ContextUsageRecord:
    """Data from one contextUsageEvent line."""
    line_no: int
    context_usage_percentage: float = 0.0
    actual_input_tokens: int = 0


@dataclass
class RejectionRecord:
    """Data from one upstream-rejection line."""
    line_no: int
    kiro_request_body_bytes: int = 0


@dataclass
class AdaptiveShrinkRecord:
    """Data from one adaptive second-pass compression line."""
    line_no: int
    timestamp: Optional[str] = None
    conversation_id: Optional[str] = None
    initial_bytes: int = 0
    final_bytes: int = 0
    threshold: int = 0
    iters: int = 0
    additional_history_turns_removed: int = 0


@dataclass
class LocalRejectRecord:
    """Data from one local over-limit rejection line."""
    line_no: int
    timestamp: Optional[str] = None
    conversation_id: Optional[str] = None
    request_body_bytes: int = 0
    image_bytes: int = 0
    effective_bytes: int = 0
    threshold: int = 0


@dataclass
class MergedRequest:
    """A fully correlated request record."""
    line_no: int = 0
    timestamp: Optional[str] = None
    model: str = ""
    max_tokens: int = 0
    stream: bool = True
    message_count: int = 0
    estimated_input_tokens: int = 0
    # Compression stats
    bytes_saved_total: int = 0
    whitespace_bytes_saved: int = 0
    thinking_bytes_saved: int = 0
    tool_result_bytes_saved: int = 0
    tool_use_input_bytes_saved: int = 0
    history_turns_removed: int = 0
    history_bytes_saved: int = 0
    has_compression: bool = False
    # Context usage
    context_usage_percentage: Optional[float] = None
    actual_input_tokens: Optional[int] = None
    # Compression rate
    compression_rate: float = 0.0


# ---------------------------------------------------------------------------
# Log parsing
# ---------------------------------------------------------------------------

# Markers matched against the log output
MARKER_REQUEST = "Received POST /v1/messages request"
MARKER_COMPRESSION = "输入压缩完成"
MARKER_CONTEXT_USAGE = "收到 contextUsageEvent"
MARKER_REJECTION = "上游拒绝请求:输入上下文过长"
MARKER_ADAPTIVE_SHRINK = "请求体超过阈值,已执行自适应二次压缩"
MARKER_LOCAL_REJECT = "请求体超过安全阈值,拒绝发送"

# contextUsageEvent format: 收到 contextUsageEvent: 67.2%, 计算 input_tokens: 12345
CONTEXT_USAGE_RE = re.compile(
    r"收到 contextUsageEvent:\s*([\d.]+)%.*?input_tokens:\s*(\d+)"
)


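# Example, using the format shown in the comment above:
#
#     m = CONTEXT_USAGE_RE.search("收到 contextUsageEvent: 67.2%, 计算 input_tokens: 12345")
#     m.group(1), m.group(2)   # -> ('67.2', '12345')

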
def parse_log(
    lines: Sequence[str],
    *,
    min_tokens: int = 0,
    model_pattern: Optional[str] = None,
) -> tuple[
    list[MergedRequest],
    list[RejectionRecord],
    list[AdaptiveShrinkRecord],
    list[LocalRejectRecord],
    int,
]:
    """
    Parse log lines and return
    (merged_requests, rejections, adaptive_shrinks, local_rejects, total_lines).

    Correlation strategy: request lines and compression-stats lines are matched
    on estimated_input_tokens plus line-number proximity (gap <= 50 lines).
    """
    requests: list[RequestRecord] = []
    compressions: list[CompressionRecord] = []
    context_usages: list[ContextUsageRecord] = []
    rejections: list[RejectionRecord] = []
    adaptive_shrinks: list[AdaptiveShrinkRecord] = []
    local_rejects: list[LocalRejectRecord] = []

    model_re = re.compile(model_pattern, re.IGNORECASE) if model_pattern else None

    for idx, raw_line in enumerate(lines):
        line_no = idx + 1
        line = strip_ansi(raw_line)

        if MARKER_REQUEST in line:
            kv = parse_kv(line)
            model = kv.get("model", "")
            if model_re and not model_re.search(model):
                continue
            est = kv_int(kv, "estimated_input_tokens")
            if est < min_tokens:
                continue
            requests.append(RequestRecord(
                line_no=line_no,
                timestamp=extract_timestamp(line),
                model=model,
                max_tokens=kv_int(kv, "max_tokens"),
                stream=kv.get("stream", "true") == "true",
                message_count=kv_int(kv, "message_count"),
                estimated_input_tokens=est,
            ))

        elif MARKER_COMPRESSION in line:
            kv = parse_kv(line)
            est = kv_int(kv, "estimated_input_tokens")
            if est < min_tokens:
                continue
            compressions.append(CompressionRecord(
                line_no=line_no,
                timestamp=extract_timestamp(line),
                estimated_input_tokens=est,
                bytes_saved_total=kv_int(kv, "bytes_saved_total"),
                whitespace_bytes_saved=kv_int(kv, "whitespace_bytes_saved"),
                thinking_bytes_saved=kv_int(kv, "thinking_bytes_saved"),
                tool_result_bytes_saved=kv_int(kv, "tool_result_bytes_saved"),
                tool_use_input_bytes_saved=kv_int(kv, "tool_use_input_bytes_saved"),
                history_turns_removed=kv_int(kv, "history_turns_removed"),
                history_bytes_saved=kv_int(kv, "history_bytes_saved"),
            ))

        elif MARKER_CONTEXT_USAGE in line:
            m = CONTEXT_USAGE_RE.search(line)
            if m:
                context_usages.append(ContextUsageRecord(
                    line_no=line_no,
                    context_usage_percentage=float(m.group(1)),
                    actual_input_tokens=int(m.group(2)),
                ))

        elif MARKER_REJECTION in line:
            kv = parse_kv(line)
            rejections.append(RejectionRecord(
                line_no=line_no,
                kiro_request_body_bytes=kv_int(kv, "kiro_request_body_bytes"),
            ))

        elif MARKER_ADAPTIVE_SHRINK in line:
            kv = parse_kv(line)
            adaptive_shrinks.append(AdaptiveShrinkRecord(
                line_no=line_no,
                timestamp=extract_timestamp(line),
                conversation_id=kv.get("conversation_id"),
                initial_bytes=kv_int(kv, "initial_bytes"),
                final_bytes=kv_int(kv, "final_bytes"),
                threshold=kv_int(kv, "threshold"),
                iters=kv_int(kv, "iters"),
                additional_history_turns_removed=kv_int(kv, "additional_history_turns_removed"),
            ))

        elif MARKER_LOCAL_REJECT in line:
            kv = parse_kv(line)
            local_rejects.append(LocalRejectRecord(
                line_no=line_no,
                timestamp=extract_timestamp(line),
                conversation_id=kv.get("conversation_id"),
                request_body_bytes=kv_int(kv, "request_body_bytes"),
                image_bytes=kv_int(kv, "image_bytes"),
                effective_bytes=kv_int(kv, "effective_bytes"),
                threshold=kv_int(kv, "threshold"),
            ))

    # --- Correlate request lines with compression-stats lines ---
    merged = _merge_records(requests, compressions, context_usages)

    return merged, rejections, adaptive_shrinks, local_rejects, len(lines)


def _merge_records(
    requests: list[RequestRecord],
    compressions: list[CompressionRecord],
    context_usages: list[ContextUsageRecord],
) -> list[MergedRequest]:
    """
    Correlate request lines with compression-stats lines.

    Strategy: for each request line, look within the following 50 lines for a
    compression-stats line with the same estimated_input_tokens.
    """
    merged: list[MergedRequest] = []
    used_comp_indices: set[int] = set()
    used_ctx_indices: set[int] = set()

    for req in requests:
        mr = MergedRequest(
            line_no=req.line_no,
            timestamp=req.timestamp,
            model=req.model,
            max_tokens=req.max_tokens,
            stream=req.stream,
            message_count=req.message_count,
            estimated_input_tokens=req.estimated_input_tokens,
        )

        # Find a matching compression-stats line
        for ci, comp in enumerate(compressions):
            if ci in used_comp_indices:
                continue
            # Line-number proximity (compression line within 50 lines after the request line)
            if not (0 < comp.line_no - req.line_no <= 50):
                continue
            # estimated_input_tokens must match
            if comp.estimated_input_tokens != req.estimated_input_tokens:
                continue
            # Match found
            mr.bytes_saved_total = comp.bytes_saved_total
            mr.whitespace_bytes_saved = comp.whitespace_bytes_saved
            mr.thinking_bytes_saved = comp.thinking_bytes_saved
            mr.tool_result_bytes_saved = comp.tool_result_bytes_saved
            mr.tool_use_input_bytes_saved = comp.tool_use_input_bytes_saved
            mr.history_turns_removed = comp.history_turns_removed
            mr.history_bytes_saved = comp.history_bytes_saved
            mr.has_compression = True
            used_comp_indices.add(ci)
            break

        # Find a matching contextUsageEvent (within 500 lines after the request line)
        for ui, ctx in enumerate(context_usages):
            if ui in used_ctx_indices:
                continue
            if not (0 < ctx.line_no - req.line_no <= 500):
                continue
            mr.context_usage_percentage = ctx.context_usage_percentage
            mr.actual_input_tokens = ctx.actual_input_tokens
            used_ctx_indices.add(ui)
            break

        # Compute the compression rate (from the token estimate, assuming 1 token ≈ 4 bytes)
        if mr.estimated_input_tokens > 0 and mr.bytes_saved_total > 0:
            estimated_bytes = mr.estimated_input_tokens * 4
            mr.compression_rate = mr.bytes_saved_total / estimated_bytes * 100

        merged.append(mr)

    return merged


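# Worked example of the compression-rate formula above (hypothetical numbers):
# estimated_input_tokens = 10_000 gives estimated_bytes = 40_000, so a request
# with bytes_saved_total = 8_000 has compression_rate = 8_000 / 40_000 * 100 = 20.0%.

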
# ---------------------------------------------------------------------------
# Statistics helpers
# ---------------------------------------------------------------------------


def median(values: list[float]) -> float:
    if not values:
        return 0.0
    s = sorted(values)
    n = len(s)
    if n % 2 == 1:
        return s[n // 2]
    return (s[n // 2 - 1] + s[n // 2]) / 2


def percentile(values: list[float], p: float) -> float:
    if not values:
        return 0.0
    s = sorted(values)
    k = (len(s) - 1) * p / 100
    f = int(k)
    c = f + 1 if f + 1 < len(s) else f
    return s[f] + (s[c] - s[f]) * (k - f)


def fmt_bytes(n: int) -> str:
    """Format a byte count in human-readable form."""
    if n >= 1_000_000:
        return f"{n:,} ({n / 1_000_000:.1f} MB)"
    if n >= 1_000:
        return f"{n:,} ({n / 1_000:.1f} KB)"
    return f"{n:,}"


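# Quick sanity examples for the helpers above:
#
#     median([1, 2, 3, 4])          # -> 2.5
#     percentile([1, 2, 3, 4], 50)  # -> 2.5
#     fmt_bytes(1_500_000)          # -> '1,500,000 (1.5 MB)'
#     fmt_bytes(512)                # -> '512'

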
# ---------------------------------------------------------------------------
# Report generation
# ---------------------------------------------------------------------------


def generate_report(
    merged: list[MergedRequest],
    rejections: list[RejectionRecord],
    adaptive_shrinks: list[AdaptiveShrinkRecord],
    local_rejects: list[LocalRejectRecord],
    total_lines: int,
    *,
    top_n: int = 5,
) -> str:
    """Generate the plain-text analysis report."""
    lines: list[str] = []
    w = lines.append

    w("=== 上下文压缩分析报告 ===")
    w("")
    w(f"扫描行数: {total_lines:,}")
    w(f"匹配请求: {len(merged)}")
    with_comp = [r for r in merged if r.has_compression]
    w(f"有压缩统计: {len(with_comp)}")
    w("")

    if not with_comp:
        w("未找到压缩统计数据。")
        return "\n".join(lines)

    # --- Overall summary ---
    total_saved = sum(r.bytes_saved_total for r in with_comp)
    avg_saved = total_saved // len(with_comp) if with_comp else 0
    rates = [r.compression_rate for r in with_comp if r.compression_rate > 0]
    median_rate = median(rates)

    w("--- 总体概览 ---")
    w(f"总节省字节: {fmt_bytes(total_saved)}")
    w(f"平均每请求节省: {avg_saved:,} bytes")
    w(f"压缩率中位数: {median_rate:.1f}%")
    w("")

    # --- Per-layer contribution ---
    ws_total = sum(r.whitespace_bytes_saved for r in with_comp)
    th_total = sum(r.thinking_bytes_saved for r in with_comp)
    tr_total = sum(r.tool_result_bytes_saved for r in with_comp)
    tu_total = sum(r.tool_use_input_bytes_saved for r in with_comp)
    hi_total = sum(r.history_bytes_saved for r in with_comp)

    def layer_line(name: str, val: int) -> str:
        pct = val / total_saved * 100 if total_saved > 0 else 0
        avg = val // len(with_comp) if with_comp else 0
        return f" {name:<18}{val:>12,} bytes ({pct:>5.1f}%) avg {avg:,}/req"

    w("--- 各层贡献 ---")
    w(layer_line("空白压缩:", ws_total))
    w(layer_line("thinking 截断:", th_total))
    w(layer_line("tool_result:", tr_total))
    w(layer_line("tool_use_input:", tu_total))
    w(layer_line("历史截断:", hi_total))
    w("")

    # --- History truncation details ---
    with_history = [r for r in with_comp if r.history_turns_removed > 0]
    w("--- 历史截断详情 ---")
    w(f"触发历史截断的请求: {len(with_history)}/{len(with_comp)} ({len(with_history)/len(with_comp)*100:.1f}%)")
    if with_history:
        turns = [r.history_turns_removed for r in with_history]
        w(f"平均移除轮数: {sum(turns)/len(turns):.1f}")
        w(f"最大移除轮数: {max(turns)}")
    w("")

    # --- Context window usage ---
    with_ctx = [r for r in merged if r.context_usage_percentage is not None]
    w("--- 上下文窗口使用 (contextUsageEvent) ---")
    if with_ctx:
        usages = [r.context_usage_percentage for r in with_ctx]
        avg_usage = sum(usages) / len(usages)
        over_80 = sum(1 for u in usages if u > 80)
        over_95 = sum(1 for u in usages if u > 95)
        overflow = sum(1 for u in usages if u >= 100)
        w(f"平均使用率: {avg_usage:.1f}%")
        w(f">80% 使用率的请求: {over_80} ({over_80/len(with_ctx)*100:.1f}%)")
        w(f">95% 使用率的请求: {over_95} ({over_95/len(with_ctx)*100:.1f}%)")
        w(f"100% (溢出): {overflow} ({overflow/len(with_ctx)*100:.1f}%)")
    else:
        w("无 contextUsageEvent 数据(需要 DEBUG 日志级别)")
    w("")

    # --- Upstream rejections ---
    w("--- 上游拒绝 ---")
    w(f"输入过长拒绝: {len(rejections)} 次")
    w("")

    # --- Adaptive second-pass compression ---
    w("--- 自适应二次压缩 ---")
    w(f"触发次数: {len(adaptive_shrinks)}")
    if adaptive_shrinks:
        initial_avg = sum(r.initial_bytes for r in adaptive_shrinks) // len(adaptive_shrinks)
        final_avg = sum(r.final_bytes for r in adaptive_shrinks) // len(adaptive_shrinks)
        iters_avg = sum(r.iters for r in adaptive_shrinks) / len(adaptive_shrinks)
        hist_avg = sum(r.additional_history_turns_removed for r in adaptive_shrinks) / len(adaptive_shrinks)
        w(f"平均压缩前: {fmt_bytes(initial_avg)}")
        w(f"平均压缩后: {fmt_bytes(final_avg)}")
        w(f"平均迭代次数: {iters_avg:.1f}")
        w(f"平均额外移除轮数: {hist_avg:.1f}")
    w("")

    # --- Local rejections (request body over the limit) ---
    w("--- 本地拒绝 (请求体超限) ---")
    w(f"拒绝发送: {len(local_rejects)} 次")
    if local_rejects:
        top = sorted(local_rejects, key=lambda r: r.effective_bytes, reverse=True)[:5]
        for r in top:
            w(
                " line={line} effective={eff} threshold={th} body={body} image={img} conversationId={cid}".format(
                    line=r.line_no,
                    eff=r.effective_bytes,
                    th=r.threshold,
                    body=r.request_body_bytes,
                    img=r.image_bytes,
                    cid=r.conversation_id or "None",
                )
            )
    w("")

    # --- Top-N most compressed requests ---
    sorted_by_saved = sorted(with_comp, key=lambda r: r.bytes_saved_total, reverse=True)
    w(f"--- 高压缩请求 TOP-{top_n} ---")
    for i, r in enumerate(sorted_by_saved[:top_n], 1):
        w(f" #{i} line={r.line_no} saved={r.bytes_saved_total:,} rate={r.compression_rate:.1f}% model={r.model} tokens={r.estimated_input_tokens:,}")
    w("")

    # --- Samples of low/no-compression requests ---
    no_comp = [r for r in with_comp if r.bytes_saved_total == 0]
    w("--- 低效/无压缩请求样本 ---")
    if no_comp:
        for r in no_comp[:5]:
            w(f" line={r.line_no} saved=0 tokens={r.estimated_input_tokens:,} message_count={r.message_count}")
    else:
        w(" (无)")
    w("")

    # --- Hourly trend ---
    hourly: Dict[str, list[MergedRequest]] = defaultdict(list)
    for r in with_comp:
        if r.timestamp:
            hourly[hour_bucket(r.timestamp)].append(r)

    if hourly:
        w("--- 时间趋势 (按小时) ---")
        for hour in sorted(hourly.keys()):
            reqs = hourly[hour]
            avg_s = sum(r.bytes_saved_total for r in reqs) // len(reqs)
            ctx_reqs = [r for r in reqs if r.context_usage_percentage is not None]
            avg_ctx = sum(r.context_usage_percentage for r in ctx_reqs) / len(ctx_reqs) if ctx_reqs else 0
            ctx_str = f" avg_context_usage={avg_ctx:.1f}%" if ctx_reqs else ""
            w(f" {hour}: requests={len(reqs)} avg_saved={avg_s:,}{ctx_str}")
        w("")

    return "\n".join(lines)


def generate_json_report(
    merged: list[MergedRequest],
    rejections: list[RejectionRecord],
    adaptive_shrinks: list[AdaptiveShrinkRecord],
    local_rejects: list[LocalRejectRecord],
    total_lines: int,
) -> str:
    """Generate the JSON summary report."""
    with_comp = [r for r in merged if r.has_compression]
    total_saved = sum(r.bytes_saved_total for r in with_comp)

    report = {
        "total_lines": total_lines,
        "matched_requests": len(merged),
        "with_compression": len(with_comp),
        "total_bytes_saved": total_saved,
        "avg_bytes_saved": total_saved // len(with_comp) if with_comp else 0,
        "layers": {
            "whitespace": sum(r.whitespace_bytes_saved for r in with_comp),
            "thinking": sum(r.thinking_bytes_saved for r in with_comp),
            "tool_result": sum(r.tool_result_bytes_saved for r in with_comp),
            "tool_use_input": sum(r.tool_use_input_bytes_saved for r in with_comp),
            "history": sum(r.history_bytes_saved for r in with_comp),
        },
        "rejections": len(rejections),
        "adaptive_shrinks": len(adaptive_shrinks),
        "local_rejects": len(local_rejects),
    }
    return json.dumps(report, indent=2, ensure_ascii=False)


def write_csv(merged: list[MergedRequest], path: str) -> None:
    """Export per-request details to a CSV file."""
    fieldnames = [
        "line_no", "timestamp", "model", "max_tokens", "stream",
        "message_count", "estimated_input_tokens", "bytes_saved_total",
        "whitespace_bytes_saved", "thinking_bytes_saved",
        "tool_result_bytes_saved", "tool_use_input_bytes_saved",
        "history_turns_removed", "history_bytes_saved",
        "compression_rate", "context_usage_percentage", "actual_input_tokens",
    ]
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for r in merged:
            row = asdict(r)
            row = {k: row[k] for k in fieldnames}
            writer.writerow(row)


# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------


def main(argv: list[str]) -> int:
    parser = argparse.ArgumentParser(
        description="分析上下文压缩管道表现",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument(
        "logfile", nargs="?", default="logs/docker.log",
        help="日志文件路径,使用 '-' 从 stdin 读取(默认: logs/docker.log)"
    )
    parser.add_argument("--top", type=int, default=5, help="高压缩请求 TOP-N(默认: 5)")
    parser.add_argument("--csv", metavar="FILE", help="导出每条请求的明细为 CSV")
    parser.add_argument("--json", action="store_true", help="JSON 格式输出汇总")
    parser.add_argument("--min-tokens", type=int, default=0, help="仅分析 estimated_input_tokens >= N 的请求")
    parser.add_argument("--model", metavar="PATTERN", help="按模型名过滤(正则)")
    args = parser.parse_args(argv)

    # Read the log
    if args.logfile == "-":
        log_lines = sys.stdin.read().splitlines()
    else:
        try:
            with open(args.logfile, "r", encoding="utf-8", errors="replace") as f:
                log_lines = f.read().splitlines()
        except FileNotFoundError:
            print(f"ERROR: 日志文件不存在: {args.logfile}", file=sys.stderr)
            return 2

    # Parse
    merged, rejections, adaptive_shrinks, local_rejects, total_lines = parse_log(
        log_lines,
        min_tokens=args.min_tokens,
        model_pattern=args.model,
    )

    # Output
    if args.json:
        print(generate_json_report(merged, rejections, adaptive_shrinks, local_rejects, total_lines))
    else:
        print(generate_report(merged, rejections, adaptive_shrinks, local_rejects, total_lines, top_n=args.top))

    # CSV export
    if args.csv:
        write_csv(merged, args.csv)
        print(f"CSV 已导出: {args.csv}", file=sys.stderr)

    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))
|