版权归属于 LangChat Team
官网:https://langchat.cn
16 - 可观测性
版本说明
本文档基于 LangChain4j 1.10.0 版本编写。
学习目标
通过本章节学习,你将能够:
- 理解可观测性的概念和重要性
- 掌握 LangChain4j 可观测性组件
- 学会配置和启用监控功能
- 理解分布式追踪的概念
- 掌握性能指标和监控
- 实现一个完整的可观测性解决方案
前置知识
- 完成《01 - LangChain4j 简介》章节
- 完成《02 - 你的第一个 Chat 应用》章节
- 完成《15 - 日志》章节(强烈推荐)
核心概念
什么是可观测性?
可观测性是指通过日志、指标、追踪三种信号,从系统外部观察和分析系统内部状态的能力。
可观测性三支柱:
┌────────────────────────────────────────────────────────┐
│                        可观测性                        │
├──────────────────┬──────────────────┬──────────────────┤
│                  │                  │                  │
│ 日志             │ 指标             │ 追踪             │
│ (Logs)           │ (Metrics)        │ (Tracing)        │
│                  │                  │                  │
│ • 发生了什么     │ • 发生了多少     │ • 在哪里发生     │
│ • 时间戳         │ • 统计数据       │ • 请求链路       │
│ • 错误消息       │ • 性能计数器     │ • 分布式上下文   │
│                  │                  │                  │
└──────────────────┴──────────────────┴──────────────────┘
为什么需要可观测性?
对于 LLM 应用,可观测性尤为重要:
- 成本控制 - 监控 Token 使用量和 API 费用
- 性能优化 - 识别慢查询和瓶颈
- 问题排查 - 快速定位和解决错误
- 用户体验 - 监控响应时间和可用性
- 合规审计 - 记录敏感操作和数据访问
- 容量规划 - 基于使用模式进行资源规划
LangChain4j 可观测性组件
内置监控支持
Copy
import dev.langchain4j.model.chat.ChatModel;
import dev.langchain4j.model.openai.OpenAiChatModel;
import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.model.chat.response.ChatResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Minimal example of the observability switches offered by a LangChain4j chat model:
 * request/response logging on the model builder, plus manual latency and token-usage
 * logging around a single chat call.
 */
public class LangChain4jObservability {

    private static final Logger logger = LoggerFactory.getLogger(LangChain4jObservability.class);

    /**
     * Builds an OpenAI chat model with request and response logging enabled.
     *
     * <p>Streaming-response logging is intentionally not configured: this is a blocking
     * {@link ChatModel}, so there is no stream to log.
     *
     * @param apiKey the OpenAI API key
     * @return a configured chat model
     */
    public ChatModel createMonitoredModel(String apiKey) {
        return OpenAiChatModel.builder()
                .apiKey(apiKey)
                .modelName("gpt-4o-mini")
                .logRequests(true)   // log outgoing requests
                .logResponses(true)  // log incoming responses
                .maxRetries(3)       // automatic retries
                .maxTokens(1000)     // output token limit
                .temperature(0.7)
                .build();
    }

    /**
     * Sends one user message and logs the elapsed time and token usage of the call.
     *
     * @param model       the chat model to call
     * @param userMessage the text sent as the user message
     */
    public void chatWithMonitoring(ChatModel model, String userMessage) {
        logger.info("开始聊天,用户消息: {}", userMessage);

        // chat(String) only returns the answer text; the ChatMessage overload returns the
        // full ChatResponse, which is needed to read the token usage.
        long startNanos = System.nanoTime();
        ChatResponse response = model.chat(UserMessage.from(userMessage));
        // ChatResponse carries no duration, so the latency is measured around the call.
        long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000;

        logger.info("聊天完成 - 耗时: {}ms, Token: {}", elapsedMs, response.tokenUsage());
        System.out.println("AI: " + response.aiMessage().text());
    }

    /** Runs the example using the API key from the {@code OPENAI_API_KEY} environment variable. */
    public static void main(String[] args) {
        LangChain4jObservability observability = new LangChain4jObservability();
        ChatModel model = observability.createMonitoredModel(System.getenv("OPENAI_API_KEY"));
        observability.chatWithMonitoring(model, "你好,请介绍一下自己");
    }
}
指标收集
自定义指标收集器
Copy
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
 * In-memory collector for request counts, token counts and response-time statistics.
 *
 * <p>Global counters are kept alongside per-operation {@link OperationStats}. Response times
 * of successful calls are additionally counted in a fixed-bucket histogram, from which the
 * percentiles in {@link MetricsSummary} are estimated.
 */
public class MetricsCollector {

    private static final Logger logger = LoggerFactory.getLogger(MetricsCollector.class);

    /**
     * Exclusive upper bounds, in milliseconds, of the response-time buckets. The final
     * {@code Long.MAX_VALUE} bucket is open-ended and catches everything slower.
     */
    private static final long[] HISTOGRAM_BUCKETS =
            {100, 200, 500, 1000, 2000, 5000, 10000, Long.MAX_VALUE};

    // Global counters
    private final AtomicLong totalRequests = new AtomicLong(0);
    private final AtomicLong successfulRequests = new AtomicLong(0);
    private final AtomicLong failedRequests = new AtomicLong(0);
    private final AtomicLong totalInputTokens = new AtomicLong(0);
    private final AtomicLong totalOutputTokens = new AtomicLong(0);

    // One counter per bucket. Every slot is populated up front: an array of AtomicLong
    // starts out full of nulls, which would make the first increment throw.
    private final AtomicLong[] responseTimeHistogram = newHistogram();

    // Per-operation statistics, keyed by operation name
    private final ConcurrentHashMap<String, OperationStats> operationStats = new ConcurrentHashMap<>();

    /** Creates a zeroed histogram with exactly one counter per bucket. */
    private static AtomicLong[] newHistogram() {
        AtomicLong[] histogram = new AtomicLong[HISTOGRAM_BUCKETS.length];
        for (int i = 0; i < histogram.length; i++) {
            histogram[i] = new AtomicLong(0);
        }
        return histogram;
    }

    /**
     * Records that a request for the given operation has started.
     *
     * @param operation the operation name used as the statistics key
     */
    public void recordRequest(String operation) {
        totalRequests.incrementAndGet();
        statsFor(operation).recordRequest();
        logger.debug("记录请求: {}", operation);
    }

    /**
     * Records a successful response.
     *
     * @param operation    the operation name
     * @param durationMs   the response time in milliseconds
     * @param inputTokens  the number of input tokens consumed
     * @param outputTokens the number of output tokens produced
     */
    public void recordSuccess(String operation, long durationMs, int inputTokens, int outputTokens) {
        successfulRequests.incrementAndGet();
        totalInputTokens.addAndGet(inputTokens);
        totalOutputTokens.addAndGet(outputTokens);

        responseTimeHistogram[getHistogramIndex(durationMs)].incrementAndGet();
        statsFor(operation).recordSuccess(durationMs, inputTokens, outputTokens);

        logger.info("记录成功响应 - 操作: {}, 耗时: {}ms, 输入: {}, 输出: {}",
                operation, durationMs, inputTokens, outputTokens);
    }

    /**
     * Records a failed response. Failures count toward the average response time but are
     * not added to the response-time histogram.
     *
     * @param operation  the operation name
     * @param durationMs the time spent before the failure, in milliseconds
     * @param error      a description of the failure, used for logging only
     */
    public void recordFailure(String operation, long durationMs, String error) {
        failedRequests.incrementAndGet();
        statsFor(operation).recordFailure(durationMs);
        logger.warn("记录失败响应 - 操作: {}, 耗时: {}ms, 错误: {}",
                operation, durationMs, error);
    }

    /**
     * Builds a snapshot of the current metrics.
     *
     * @return a summary of the counters, success rate, average response time and percentiles
     */
    public MetricsSummary getMetricsSummary() {
        return new MetricsSummary(
                totalRequests.get(),
                successfulRequests.get(),
                failedRequests.get(),
                totalInputTokens.get(),
                totalOutputTokens.get(),
                calculateSuccessRate(),
                calculateAverageResponseTime(),
                getResponseTimePercentiles()
        );
    }

    /** Resets every counter, the histogram and all per-operation statistics to zero. */
    public void reset() {
        totalRequests.set(0);
        successfulRequests.set(0);
        failedRequests.set(0);
        totalInputTokens.set(0);
        totalOutputTokens.set(0);
        for (AtomicLong counter : responseTimeHistogram) {
            counter.set(0);
        }
        operationStats.clear();
        logger.info("所有指标已重置");
    }

    /** Returns the statistics holder for an operation, creating it on first use. */
    private OperationStats statsFor(String operation) {
        return operationStats.computeIfAbsent(operation, k -> new OperationStats());
    }

    /** Returns the index of the first bucket whose upper bound exceeds the duration. */
    private int getHistogramIndex(long durationMs) {
        for (int i = 0; i < HISTOGRAM_BUCKETS.length; i++) {
            if (durationMs < HISTOGRAM_BUCKETS[i]) {
                return i;
            }
        }
        // Only reachable for durationMs == Long.MAX_VALUE.
        return HISTOGRAM_BUCKETS.length - 1;
    }

    /** Returns successful requests divided by started requests, or 0.0 when none were started. */
    private double calculateSuccessRate() {
        long total = totalRequests.get();
        if (total == 0) {
            return 0.0;
        }
        return (double) successfulRequests.get() / total;
    }

    /**
     * Returns the mean duration over all completed (successful or failed) calls. Durations are
     * only recorded on completion, so requests still in flight are excluded from the divisor.
     */
    private double calculateAverageResponseTime() {
        long totalDuration = 0;
        long completed = 0;
        for (OperationStats stats : operationStats.values()) {
            totalDuration += stats.getTotalDuration();
            completed += stats.getCompletedCount();
        }
        return completed > 0 ? (double) totalDuration / completed : 0.0;
    }

    /** Estimates the P50/P90/P95/P99 response times from the histogram. */
    private Percentiles getResponseTimePercentiles() {
        return new Percentiles(
                calculatePercentile(50),
                calculatePercentile(90),
                calculatePercentile(95),
                calculatePercentile(99)
        );
    }

    /**
     * Estimates a percentile of the successful response times from the histogram.
     *
     * <p>The estimate is the upper bound of the bucket containing the requested rank, so its
     * resolution is limited by the bucket layout. For the open-ended last bucket the last
     * finite bound is returned, meaning "at least this slow".
     *
     * @param percentile the percentile to estimate, 0-100
     * @return the estimated response time in milliseconds, or 0 when nothing has been recorded
     */
    private long calculatePercentile(int percentile) {
        // Snapshot the counters first so the total and the cumulative walk agree.
        long[] counts = new long[responseTimeHistogram.length];
        long total = 0;
        for (int i = 0; i < counts.length; i++) {
            counts[i] = responseTimeHistogram[i].get();
            total += counts[i];
        }
        if (total == 0) {
            return 0;
        }

        long rank = Math.max(1, (long) Math.ceil(percentile / 100.0 * total));
        long cumulative = 0;
        for (int i = 0; i < counts.length; i++) {
            cumulative += counts[i];
            if (cumulative >= rank) {
                return bucketUpperBound(i);
            }
        }
        return bucketUpperBound(counts.length - 1);
    }

    /** Returns a bucket's upper bound, substituting the last finite bound for the open-ended one. */
    private static long bucketUpperBound(int index) {
        long bound = HISTOGRAM_BUCKETS[index];
        if (bound == Long.MAX_VALUE && index > 0) {
            return HISTOGRAM_BUCKETS[index - 1];
        }
        return bound;
    }

    /** Counters for a single operation. */
    public static class OperationStats {

        private final AtomicLong requestCount = new AtomicLong(0);
        private final AtomicLong successCount = new AtomicLong(0);
        private final AtomicLong failureCount = new AtomicLong(0);
        private final AtomicLong totalDuration = new AtomicLong(0);
        private final AtomicLong totalInputTokens = new AtomicLong(0);
        private final AtomicLong totalOutputTokens = new AtomicLong(0);

        /** Counts one started request. */
        public void recordRequest() {
            requestCount.incrementAndGet();
        }

        /** Counts one successful response and accumulates its duration and token usage. */
        public void recordSuccess(long durationMs, int inputTokens, int outputTokens) {
            successCount.incrementAndGet();
            totalDuration.addAndGet(durationMs);
            totalInputTokens.addAndGet(inputTokens);
            totalOutputTokens.addAndGet(outputTokens);
        }

        /** Counts one failed response and accumulates its duration. */
        public void recordFailure(long durationMs) {
            failureCount.incrementAndGet();
            totalDuration.addAndGet(durationMs);
        }

        public long getRequestCount() { return requestCount.get(); }
        public long getSuccessCount() { return successCount.get(); }
        public long getFailureCount() { return failureCount.get(); }
        public long getTotalDuration() { return totalDuration.get(); }
        public long getTotalInputTokens() { return totalInputTokens.get(); }
        public long getTotalOutputTokens() { return totalOutputTokens.get(); }

        /** Returns the number of responses (successes plus failures) recorded so far. */
        public long getCompletedCount() {
            return successCount.get() + failureCount.get();
        }

        /** Returns the mean duration per completed call in milliseconds, or 0 when none completed. */
        public long getAverageDuration() {
            long completed = getCompletedCount();
            return completed > 0 ? totalDuration.get() / completed : 0;
        }
    }

    /** Immutable snapshot of the collector's metrics. */
    public static class MetricsSummary {

        private final long totalRequests;
        private final long successfulRequests;
        private final long failedRequests;
        private final long totalInputTokens;
        private final long totalOutputTokens;
        private final double successRate;
        private final double averageResponseTime;
        private final Percentiles responseTimePercentiles;

        public MetricsSummary(
                long totalRequests,
                long successfulRequests,
                long failedRequests,
                long totalInputTokens,
                long totalOutputTokens,
                double successRate,
                double averageResponseTime,
                Percentiles responseTimePercentiles
        ) {
            this.totalRequests = totalRequests;
            this.successfulRequests = successfulRequests;
            this.failedRequests = failedRequests;
            this.totalInputTokens = totalInputTokens;
            this.totalOutputTokens = totalOutputTokens;
            this.successRate = successRate;
            this.averageResponseTime = averageResponseTime;
            this.responseTimePercentiles = responseTimePercentiles;
        }

        public long getTotalRequests() { return totalRequests; }
        public long getSuccessfulRequests() { return successfulRequests; }
        public long getFailedRequests() { return failedRequests; }
        public long getTotalInputTokens() { return totalInputTokens; }
        public long getTotalOutputTokens() { return totalOutputTokens; }
        public long getTotalTokens() { return totalInputTokens + totalOutputTokens; }
        /** Returns the success rate as a fraction between 0.0 and 1.0. */
        public double getSuccessRate() { return successRate; }
        /** Returns the average response time in milliseconds. */
        public double getAverageResponseTime() { return averageResponseTime; }
        public Percentiles getResponseTimePercentiles() { return responseTimePercentiles; }
    }

    /** Response-time percentiles in milliseconds. */
    public static class Percentiles {

        private final long p50;
        private final long p90;
        private final long p95;
        private final long p99;

        public Percentiles(long p50, long p90, long p95, long p99) {
            this.p50 = p50;
            this.p90 = p90;
            this.p95 = p95;
            this.p99 = p99;
        }

        public long getP50() { return p50; }
        public long getP90() { return p90; }
        public long getP95() { return p95; }
        public long getP99() { return p99; }
    }

    /** Simulates 100 requests with a 10% failure rate and prints the resulting summary. */
    public static void main(String[] args) {
        MetricsCollector collector = new MetricsCollector();

        for (int i = 0; i < 100; i++) {
            collector.recordRequest("generateChat");
            if (i % 10 != 0) {
                collector.recordSuccess("generateChat", 500 + (int) (Math.random() * 1000), 100, 200);
            } else {
                collector.recordFailure("generateChat", 500, "模拟错误");
            }
        }

        MetricsSummary summary = collector.getMetricsSummary();
        Percentiles percentiles = summary.getResponseTimePercentiles();

        System.out.println("╔═══════════════════════════════════════════════════════╗");
        System.out.println("║ 指标摘要 ║");
        System.out.println("╠═══════════════════════════════════════════════════════╣");
        System.out.println(String.format("║ 总请求数: %,8d ║", summary.getTotalRequests()));
        System.out.println(String.format("║ 成功请求: %,8d ║", summary.getSuccessfulRequests()));
        System.out.println(String.format("║ 失败请求: %,8d ║", summary.getFailedRequests()));
        System.out.println(String.format("║ 成功率: %.2f%% ║", summary.getSuccessRate() * 100));
        System.out.println(String.format("║ 平均响应时间: %.0fms ║", summary.getAverageResponseTime()));
        System.out.println("╠═══════════════════════════════════════════════════════╣");
        System.out.println("║ 响应时间百分位数: ║");
        System.out.println(String.format("║ P50: %,5dms (中位数) ║", percentiles.getP50()));
        System.out.println(String.format("║ P90: %,5dms ║", percentiles.getP90()));
        System.out.println(String.format("║ P95: %,5dms ║", percentiles.getP95()));
        System.out.println(String.format("║ P99: %,5dms ║", percentiles.getP99()));
        System.out.println("╠═══════════════════════════════════════════════════════╣");
        System.out.println(String.format("║ 总 Token: %,8d ║", summary.getTotalTokens()));
        System.out.println("╠═══════════════════════════════════════════════════════╣");
        // println has no (format, args) overload; the line must be formatted first.
        System.out.println(String.format("║ 预估成本: $%.2f ║", calculateEstimatedCost(summary.getTotalTokens())));
        System.out.println("╚═══════════════════════════════════════════════════════╝");
    }

    /** Estimates the cost in dollars at a flat, simplified rate of $0.01 per 1000 tokens. */
    private static double calculateEstimatedCost(long totalTokens) {
        return (double) totalTokens / 100000.0;
    }
}
分布式追踪
与 OpenTelemetry 集成
Copy
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.Scope;
import io.opentelemetry.api.OpenTelemetry;
import dev.langchain4j.model.chat.ChatModel;
import dev.langchain4j.service.AiServices;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Example of wrapping operations in OpenTelemetry spans and attaching LLM-specific
 * attributes (token usage, estimated cost, performance figures) to them.
 */
public class DistributedTracing {

    private static final Logger logger = LoggerFactory.getLogger(DistributedTracing.class);

    /** Tracer obtained from the supplied OpenTelemetry instance. */
    private final Tracer tracer;

    /**
     * @param openTelemetry the OpenTelemetry instance the tracer is obtained from
     */
    public DistributedTracing(OpenTelemetry openTelemetry) {
        this.tracer = openTelemetry.getTracer("langchain4j");
    }

    /** AI service interface used by {@link #createTracedService(ChatModel)}. */
    public interface TracedChatService {
        String chat(String userId, String message);
    }

    /**
     * Creates an AI service backed by the given chat model.
     *
     * @param model the chat model the service delegates to
     * @return the generated service implementation
     */
    public TracedChatService createTracedService(ChatModel model) {
        return AiServices.builder(TracedChatService.class)
                .chatModel(model)
                .build();
    }

    /**
     * Runs an operation inside a new span and records its outcome on that span.
     *
     * @param operationName the span name
     * @param spanKind      the name of a {@code SpanKind} constant, e.g. {@code "INTERNAL"}
     * @param operation     the work to execute; it receives the active span
     * @param <T>           the result type
     * @return the operation's result
     * @throws IllegalArgumentException if {@code spanKind} is not a {@code SpanKind} constant name
     * @throws RuntimeException         wrapping any exception thrown by the operation
     */
    public <T> T executeWithTracing(
            String operationName,
            String spanKind,
            TracedOperation<T> operation
    ) {
        // SpanKind is referenced by its fully-qualified name, like StatusCode below.
        Span span = tracer.spanBuilder(operationName)
                .setSpanKind(io.opentelemetry.api.trace.SpanKind.valueOf(spanKind))
                .startSpan();
        try (Scope scope = span.makeCurrent()) {
            logger.debug("开始追踪: {}", operationName);

            span.setAttribute("application.name", "langchain4j-app");
            span.setAttribute("operation.type", operationName);
            span.setAttribute("start.time", System.currentTimeMillis());

            T result = operation.execute(span);

            span.setStatus(io.opentelemetry.api.trace.StatusCode.OK);
            logger.debug("追踪完成: {}, 结果: {}", operationName, result);
            return result;
        } catch (Exception e) {
            span.recordException(e);
            span.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getMessage());
            logger.error("追踪失败: {}", operationName, e);
            throw new RuntimeException("操作失败", e);
        } finally {
            span.end();
        }
    }

    /** Unit of work executed inside a span by {@link #executeWithTracing}. */
    public interface TracedOperation<T> {
        T execute(Span span);
    }

    /**
     * Attaches token-usage attributes and a {@code token.usage} event to a span.
     *
     * @param span         the span to annotate
     * @param model        the model name, also used to pick the price tier
     * @param inputTokens  the number of input tokens
     * @param outputTokens the number of output tokens
     */
    public void recordTokenUsage(
            Span span,
            String model,
            int inputTokens,
            int outputTokens
    ) {
        int totalTokens = inputTokens + outputTokens;
        span.setAttribute("llm.model", model);
        span.setAttribute("llm.input.tokens", inputTokens);
        span.setAttribute("llm.output.tokens", outputTokens);
        span.setAttribute("llm.total.tokens", totalTokens);
        span.setAttribute("llm.estimated.cost", calculateCost(totalTokens, model));
        // addEvent(name) stamps the event with the current time; an explicit timestamp
        // needs the (name, long, TimeUnit) or (name, Instant) overload.
        span.addEvent("token.usage");
    }

    /**
     * Attaches performance attributes and a {@code performance.metrics} event to a span.
     *
     * @param span            the span to annotate
     * @param durationMs      the operation duration in milliseconds
     * @param tokensPerSecond the measured token throughput
     */
    public void recordPerformanceMetrics(
            Span span,
            long durationMs,
            int tokensPerSecond
    ) {
        span.setAttribute("performance.duration.ms", durationMs);
        span.setAttribute("performance.tps", tokensPerSecond);
        span.setAttribute("performance.efficiency", calculateEfficiency(durationMs, tokensPerSecond));
        span.addEvent("performance.metrics");
    }

    /**
     * Estimates the cost in dollars with a simplified two-tier price table: $0.03 per
     * 1000 tokens for models whose name contains {@code "gpt-4"}, $0.002 otherwise.
     */
    private double calculateCost(int tokens, String model) {
        double pricePer1kTokens = model.contains("gpt-4") ? 0.03 : 0.002;
        return (double) tokens / 1000.0 * pricePer1kTokens;
    }

    /**
     * Computes {@code tokensPerSecond * 1000 / durationMs}. A non-positive duration yields
     * 0.0 rather than Infinity or NaN.
     */
    private double calculateEfficiency(long durationMs, int tokensPerSecond) {
        if (durationMs <= 0) {
            return 0.0;
        }
        return (double) tokensPerSecond * 1000 / durationMs;
    }

    /** Runs a traced dummy operation against the no-op OpenTelemetry implementation. */
    public static void main(String[] args) {
        // Simplified: a real application would use a configured OpenTelemetry SDK instance.
        OpenTelemetry openTelemetry = OpenTelemetry.noop();
        DistributedTracing tracing = new DistributedTracing(openTelemetry);

        String result = tracing.executeWithTracing("generateChat", "INTERNAL", span -> {
            span.setAttribute("user.id", "user123");
            span.setAttribute("model", "gpt-4o-mini");
            try {
                Thread.sleep(500); // simulate work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "操作完成";
        });
        System.out.println("结果: " + result);
    }
}
健康检查
健康检查端点
Copy
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import dev.langchain4j.model.chat.ChatModel;
import dev.langchain4j.model.openai.OpenAiChatModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
/**
* 健康检查
*/
public class LangChain4jHealthIndicator implements HealthIndicator {
private static final Logger logger = LoggerFactory.getLogger(LangChain4jHealthIndicator.class);
private final ChatModel model;
private final String apiKey;
public LangChain4jHealthIndicator(String apiKey) {
this.apiKey = apiKey;
this.model = OpenAiChatModel.builder()
.apiKey(apiKey)
.modelName("gpt-4o-mini")
.timeout(java.time.Duration.ofSeconds(10))
.build();
}
@Override
public Health health() {
try {
// 检查 API Key
if (apiKey == null || apiKey.isEmpty()) {
return Health.down()
.withDetail("api_key", "未配置")
.withDetail("model", model.getModelName());
}
// 测试连接
long startTime = System.currentTimeMillis();
model.chat("健康检查");
long duration = System.currentTimeMillis() - startTime;
// 检查响应时间
if (duration > 5000) { // 超过 5 秒
return Health.down()
.withDetail("connection", "响应时间过慢")
.withDetail("duration_ms", duration)
.withDetail("model", model.getModelName());
}
// 健康
return Health.up()
.withDetail("connection", "正常")
.withDetail("duration_ms", duration)
.withDetail("model", model.getModelName())
.withDetail("check_time", Instant.now())
.withDetail("status", "API 可用");
} catch (Exception e) {
logger.error("健康检查失败", e);
return Health.down()
.withDetail("error", e.getMessage())
.withDetail("model", model.getModelName());
}
}
/**
* 详细的健康检查
*/
public DetailedHealth detailedHealth() {
Health basicHealth = health();
if (basicHealth.getStatus() == Status.DOWN) {
return new DetailedHealth(
Status.DOWN,
"基础健康检查失败",
basicHealth.getDetails()
);
}
// 执行详细检查
Map<String, Object> details = new HashMap<>();
details.putAll(basicHealth.getDetails());
try {
// 检查模型配置
details.put("model.name", model.getModelName());
details.put("model.temperature", model.temperature());
details.put("model.max_tokens", model.maxTokens());
details.put("model.timeout", model.timeout());
// 测试简单生成
long startTime = System.currentTimeMillis();
ChatResponse response = model.chat("test");
long duration = System.currentTimeMillis() - startTime;
details.put("test.duration_ms", duration);
details.put("test.tokens", response.tokenUsage());
details.put("test.model", response.model());
// 性能检查
if (duration < 1000) {
details.put("performance.rating", "excellent");
} else if (duration < 2000) {
details.put("performance.rating", "good");
} else if (duration < 5000) {
details.put("performance.rating", "fair");
} else {
details.put("performance.rating", "poor");
}
return new DetailedHealth(
Status.UP,
"所有检查通过",
details
);
} catch (Exception e) {
logger.error("详细健康检查失败", e);
details.put("error", e.getMessage());
return new DetailedHealth(
Status.DOWN,
"详细检查失败",
details
);
}
}
/**
* 详细健康结果
*/
public static class DetailedHealth extends Health {
private final String message;
public DetailedHealth(Status status, String message, Map<String, Object> details) {
super(status, details);
this.message = message;
}
public String getMessage() {
return message;
}
}
public static void main(String[] args) {
LangChain4jHealthIndicator health = new LangChain4jHealthIndicator(
System.getenv("OPENAI_API_KEY")
);
// 基础健康检查
Health basicHealth = health.health();
System.out.println("基础健康检查:");
System.out.println("状态: " + basicHealth.getStatus());
System.out.println("详情: " + basicHealth.getDetails());
System.out.println();
// 详细健康检查
DetailedHealth detailedHealth = health.detailedHealth();
System.out.println("详细健康检查:");
System.out.println("状态: " + detailedHealth.getStatus());
System.out.println("消息: " + detailedHealth.getMessage());
System.out.println("详情: " + detailedHealth.getDetails());
}
}
告警系统
指标告警配置
Copy
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
/**
 * Periodically evaluates alert rules on a single-threaded scheduler and logs a
 * notification whenever a rule triggers.
 */
public class AlertingSystem {

    private static final Logger logger = LoggerFactory.getLogger(AlertingSystem.class);

    // Registered rules, keyed by rule id
    private final ConcurrentHashMap<String, AlertRule> alertRules = new ConcurrentHashMap<>();

    // Runs the periodic rule checks
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    /**
     * Registers a rule without scheduling it.
     *
     * @param ruleId the id the rule is stored and reported under
     * @param rule   the rule to register
     */
    public void addAlertRule(String ruleId, AlertRule rule) {
        alertRules.put(ruleId, rule);
        logger.info("添加告警规则: {} - {}", ruleId, rule.getDescription());
    }

    /**
     * Registers a threshold rule and schedules it at a fixed rate, starting immediately.
     *
     * @param ruleId          the rule id
     * @param metricName      the metric name used in messages
     * @param valueSupplier   provides the current metric value; it is always invoked with 0.0
     * @param threshold       the value compared against
     * @param comparison      one of {@code ">"}, {@code ">="}, {@code "<"}, {@code "<="}
     * @param checkIntervalMs the interval between checks in milliseconds
     */
    public void addThresholdAlert(
            String ruleId,
            String metricName,
            Function<Double, Double> valueSupplier,
            double threshold,
            String comparison,
            long checkIntervalMs
    ) {
        // The check interval belongs to the schedule, not to the rule.
        AlertRule rule = new ThresholdAlertRule(metricName, valueSupplier, threshold, comparison);
        addAlertRule(ruleId, rule);
        scheduler.scheduleAtFixedRate(
                () -> checkAndAlert(ruleId, rule),
                0,
                checkIntervalMs,
                TimeUnit.MILLISECONDS
        );
    }

    /**
     * Convenience overload of {@link #addThresholdAlert(String, String, Function, double, String, long)}
     * for metric sources that take no argument, such as {@code () -> errorRate()}.
     */
    public void addThresholdAlert(
            String ruleId,
            String metricName,
            java.util.function.Supplier<Double> valueSupplier,
            double threshold,
            String comparison,
            long checkIntervalMs
    ) {
        Function<Double, Double> adapter = ignored -> valueSupplier.get();
        addThresholdAlert(ruleId, metricName, adapter, threshold, comparison, checkIntervalMs);
    }

    /**
     * Registers a rate rule and schedules it to be checked once per second.
     *
     * @param ruleId        the rule id
     * @param metricName    the metric name used in messages
     * @param valueSupplier provides the current metric value; it is always invoked with 0.0
     * @param timeWindowMs  the rate window in milliseconds (used in the description only)
     * @param threshold     the rate above which the rule triggers
     */
    public void addRateAlert(
            String ruleId,
            String metricName,
            Function<Double, Double> valueSupplier,
            int timeWindowMs,
            int threshold
    ) {
        AlertRule rule = new RateAlertRule(metricName, valueSupplier, timeWindowMs, threshold);
        addAlertRule(ruleId, rule);
        scheduler.scheduleAtFixedRate(
                () -> checkAndAlert(ruleId, rule),
                0,
                1000,
                TimeUnit.MILLISECONDS
        );
    }

    /**
     * Convenience overload of {@link #addRateAlert(String, String, Function, int, int)} for
     * metric sources that take no argument.
     */
    public void addRateAlert(
            String ruleId,
            String metricName,
            java.util.function.Supplier<Double> valueSupplier,
            int timeWindowMs,
            int threshold
    ) {
        Function<Double, Double> adapter = ignored -> valueSupplier.get();
        addRateAlert(ruleId, metricName, adapter, timeWindowMs, threshold);
    }

    /**
     * Evaluates a rule and triggers an alert when it fires. Exceptions are logged rather
     * than propagated, because an exception escaping a scheduled task cancels all of its
     * future runs.
     */
    private void checkAndAlert(String ruleId, AlertRule rule) {
        try {
            AlertResult result = rule.check();
            if (result.isAlertTriggered()) {
                triggerAlert(ruleId, rule, result);
            }
        } catch (Exception e) {
            logger.error("检查告警规则 {} 失败", ruleId, e);
        }
    }

    /** Logs the alert, sends the notification and records the alert metric. */
    private void triggerAlert(String ruleId, AlertRule rule, AlertResult result) {
        logger.warn("告警触发 - 规则: {}, 结果: {}", ruleId, result);
        sendNotification(ruleId, rule, result);
        recordAlertMetric(ruleId, result);
    }

    /**
     * Placeholder for the notification channel (e-mail, Slack, PagerDuty, ...); it currently
     * only logs the message.
     */
    private void sendNotification(String ruleId, AlertRule rule, AlertResult result) {
        logger.info("发送通知: {}", result.getMessage());
    }

    /** Placeholder for exporting the alert to a metrics system; it currently only logs. */
    private void recordAlertMetric(String ruleId, AlertResult result) {
        logger.info("告警指标: rule={}, severity={}", ruleId, result.getSeverity());
    }

    /** A condition that can be evaluated to decide whether an alert should fire. */
    public interface AlertRule {
        AlertResult check();
        String getDescription();
        String getSeverity();
    }

    /** Outcome of one rule evaluation. */
    public static class AlertResult {

        private final boolean alertTriggered;
        private final String message;
        private final double currentValue;
        private final double threshold;
        private final String severity;

        public AlertResult(boolean alertTriggered, String message, double currentValue, double threshold, String severity) {
            this.alertTriggered = alertTriggered;
            this.message = message;
            this.currentValue = currentValue;
            this.threshold = threshold;
            this.severity = severity;
        }

        public boolean isAlertTriggered() { return alertTriggered; }
        public String getMessage() { return message; }
        public double getCurrentValue() { return currentValue; }
        public double getThreshold() { return threshold; }
        public String getSeverity() { return severity; }

        /** Readable form, so that logging a result shows its content rather than an object hash. */
        @Override
        public String toString() {
            return "AlertResult{triggered=" + alertTriggered
                    + ", severity=" + severity
                    + ", currentValue=" + currentValue
                    + ", threshold=" + threshold
                    + ", message=" + message + "}";
        }
    }

    /** Rule that compares the current metric value against a fixed threshold. */
    public static class ThresholdAlertRule implements AlertRule {

        private final String metricName;
        private final Function<Double, Double> valueSupplier;
        private final double threshold;
        private final String comparison;

        public ThresholdAlertRule(
                String metricName,
                Function<Double, Double> valueSupplier,
                double threshold,
                String comparison
        ) {
            this.metricName = metricName;
            this.valueSupplier = valueSupplier;
            this.threshold = threshold;
            this.comparison = comparison;
        }

        /**
         * Reads the current value and applies the configured comparison. An unrecognised
         * comparison operator never triggers.
         */
        @Override
        public AlertResult check() {
            double currentValue = valueSupplier.apply(0.0);
            boolean alertTriggered = switch (comparison) {
                case ">" -> currentValue > threshold;
                case ">=" -> currentValue >= threshold;
                case "<" -> currentValue < threshold;
                case "<=" -> currentValue <= threshold;
                default -> false;
            };
            String message = String.format("%s = %.2f, 阈值 = %.2f (%s)",
                    metricName, currentValue, threshold, comparison);
            return new AlertResult(
                    alertTriggered,
                    message,
                    currentValue,
                    threshold,
                    alertTriggered ? "warning" : "info"
            );
        }

        @Override
        public String getDescription() {
            return String.format("阈值告警 - %s %s %.2f", comparison, metricName, threshold);
        }

        @Override
        public String getSeverity() {
            return "threshold";
        }
    }

    /** Rule that triggers when the observed rate exceeds a threshold. */
    public static class RateAlertRule implements AlertRule {

        private final String metricName;
        private final Function<Double, Double> valueSupplier;
        private final int timeWindowMs;
        private final int threshold;

        public RateAlertRule(
                String metricName,
                Function<Double, Double> valueSupplier,
                int timeWindowMs,
                int threshold
        ) {
            this.metricName = metricName;
            this.valueSupplier = valueSupplier;
            this.timeWindowMs = timeWindowMs;
            this.threshold = threshold;
        }

        /**
         * Simplified: the supplied value is treated directly as the rate. A full
         * implementation would track the values observed inside the time window.
         */
        @Override
        public AlertResult check() {
            double rate = valueSupplier.apply(0.0);
            boolean alertTriggered = rate > threshold;
            String message = String.format("%s = %.2f/min, 阈值 = %d/min",
                    metricName, rate, threshold);
            return new AlertResult(
                    alertTriggered,
                    message,
                    rate,
                    threshold,
                    alertTriggered ? "critical" : "info"
            );
        }

        @Override
        public String getDescription() {
            return String.format("速率告警 - %s 每 %d 分钟 %d 次", metricName, timeWindowMs / 60000, threshold);
        }

        @Override
        public String getSeverity() {
            return "rate";
        }
    }

    /** Stops scheduling further checks. */
    public void shutdown() {
        scheduler.shutdown();
        logger.info("告警系统已关闭");
    }

    /** Registers two demo rules and keeps checking them until Enter is pressed. */
    public static void main(String[] args) {
        AlertingSystem alerting = new AlertingSystem();

        // Response-time alert. The value source is a placeholder that should be replaced
        // by a read from the metrics system; the cast is required because the lambda must
        // produce a Double.
        alerting.addThresholdAlert(
                "response_time_high",
                "平均响应时间",
                () -> (double) System.currentTimeMillis(),
                5000,  // 5 second threshold
                ">",
                10000  // check every 10 seconds
        );

        // Error-rate alert; the value is a placeholder for the real metric.
        alerting.addThresholdAlert(
                "error_rate_high",
                "错误率",
                () -> 0.05,
                0.1,   // 10% threshold
                ">",
                30000  // check every 30 seconds
        );

        // Keep running until the user presses Enter (or stdin is closed).
        try (java.util.Scanner scanner = new java.util.Scanner(System.in)) {
            System.out.println("告警系统已启动,按回车键停止");
            if (scanner.hasNextLine()) {
                scanner.nextLine();
            }
        } finally {
            alerting.shutdown();
        }
    }
}
测试代码示例
Copy
import org.junit.jupiter.api.*;
import static org.junit.jupiter.api.Assertions.*;
/**
* 可观测性测试
*/
class ObservabilityTest {
private MetricsCollector metrics;
@BeforeEach
void setUp() {
this.metrics = new MetricsCollector();
}
@Test
void should_record_success_metrics() {
metrics.recordRequest("test_operation");
metrics.recordSuccess("test_operation", 500, 100, 200);
MetricsSummary summary = metrics.getMetricsSummary();
assertEquals(1, summary.getTotalRequests());
assertEquals(1, summary.getSuccessfulRequests());
assertEquals(300, summary.getTotalTokens());
assertTrue(summary.getSuccessRate() > 0.99);
}
@Test
void should_record_failure_metrics() {
metrics.recordRequest("test_operation");
metrics.recordFailure("test_operation", 500, "模拟错误");
MetricsSummary summary = metrics.getMetricsSummary();
assertEquals(1, summary.getTotalRequests());
assertEquals(1, summary.getFailedRequests());
assertEquals(0, summary.getSuccessCount());
assertTrue(summary.getSuccessRate() < 0.01);
}
@Test
void should_calculate_percentiles() {
// 添加一些响应时间
metrics.recordRequest("test");
metrics.recordSuccess("test", 100, 50, 50);
metrics.recordSuccess("test", 200, 100, 100);
metrics.recordSuccess("test", 300, 150, 150);
metrics.recordSuccess("test", 400, 200, 200);
metrics.recordSuccess("test", 500, 250, 250);
metrics.recordSuccess("test", 600, 300, 300);
metrics.recordSuccess("test", 700, 350, 350);
metrics.recordSuccess("test", 800, 400, 400);
metrics.recordSuccess("test", 900, 450, 450);
metrics.recordSuccess("test", 1000, 500, 500);
metrics.recordSuccess("test", 2000, 1000, 1000);
metrics.recordSuccess("test", 3000, 1500, 1500);
metrics.recordSuccess("test", 5000, 2500, 2500);
MetricsSummary summary = metrics.getMetricsSummary();
Percentiles percentiles = summary.getResponseTimePercentiles();
assertNotNull(percentiles);
assertTrue(percentiles.getP50() > 0);
assertTrue(percentiles.getP90() > percentiles.getP50());
assertTrue(percentiles.getP95() > percentiles.getP90());
assertTrue(percentiles.getP99() > percentiles.getP95());
}
@Test
void should_reset_metrics() {
metrics.recordRequest("test");
metrics.recordSuccess("test", 500, 100, 200);
metrics.reset();
MetricsSummary summary = metrics.getMetricsSummary();
assertEquals(0, summary.getTotalRequests());
assertEquals(0, summary.getTotalRequests());
}
}
实践练习
练习 1:实现实时监控面板
Copy
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 实时监控面板
*/
public class RealTimeMonitoringDashboard {
private static final Logger logger = LoggerFactory.getLogger(RealTimeMonitoringDashboard.class);
private final MetricsCollector metrics;
private final ScheduledExecutorService scheduler;
public RealTimeMonitoringDashboard(MetricsCollector metrics) {
this.metrics = metrics;
this.scheduler = Executors.newScheduledThreadPool(1);
}
/**
* 启动监控
*/
public void start() {
logger.info("启动实时监控面板");
// 每秒更新一次显示
scheduler.scheduleAtFixedRate(this::updateDisplay, 0, 1, TimeUnit.SECONDS);
}
/**
* 停止监控
*/
public void stop() {
scheduler.shutdown();
logger.info("监控面板已停止");
}
/**
* 更新显示
*/
private void updateDisplay() {
MetricsSummary summary = metrics.getMetricsSummary();
// 清屏
System.out.print("\033[H"); // ANSI 清屏
// 显示标题
System.out.println("╔═══════════════════════════════════════════════════════════╗");
System.out.println("║ 实时监控面板 ║");
System.out.println("╠══════════════════════════════════════════════════════════╣");
// 显示指标
displaySection("请求统计", getRequestsStats(summary));
displaySection("性能指标", getPerformanceStats(summary));
displaySection("Token 使用", getTokenStats(summary));
displaySection("成本估算", getCostStats(summary));
System.out.println("╠══════════════════════════════════════════════════════════╣");
System.out.println("║ 更新时间: " + LocalDateTime.now().format(DateTimeFormatter.of("HH:mm:ss")));
System.out.println("╚═══════════════════════════════════════════════════════════╝");
}
/**
* 显示部分
*/
private void displaySection(String title, String content) {
System.out.println("║ " + String.format("%-18s", title));
System.out.println("╠─────────────────────────────────────────────────────────────────╣");
System.out.println("║ " + content);
System.out.println("╠─────────────────────────────────────────────────────────────────╣");
}
/**
* 获取请求统计
*/
private String getRequestsStats(MetricsSummary summary) {
return String.format(
"总请求: %,5d | 成功: %,5d | 失败: %,5d | 成功率: %.1f%%",
summary.getTotalRequests(),
summary.getSuccessfulRequests(),
summary.getFailedRequests(),
summary.getSuccessRate() * 100
);
}
/**
* 获取性能指标
*/
private String getPerformanceStats(MetricsSummary summary) {
double avgResponseTime = summary.getAverageResponseTime();
Percentiles percentiles = summary.getResponseTimePercentiles();
return String.format(
"平均: %.0fms | P50: %,4dms | P90: %,4dms | P95: %,4dms | P99: %,4dms",
avgResponseTime,
percentiles.getP50(),
percentiles.getP90(),
percentiles.getP95(),
percentiles.getP99()
);
}
/**
* 获取 Token 统计
*/
private String getTokenStats(MetricsSummary summary) {
return String.format(
"总 Token: %,8d | 输入: %,8d | 输出: %,8d",
summary.getTotalTokens(),
summary.getTotalInputTokens(),
summary.getTotalOutputTokens()
);
}
/**
* 获取成本估算
*/
private String getCostStats(MetricsSummary summary) {
double estimatedCost = calculateEstimatedCost(summary.getTotalTokens());
return String.format(
"总成本: $%.2f | 平均每请求: $%.4f",
estimatedCost,
summary.getTotalRequests() > 0 ? estimatedCost / summary.getTotalRequests() : 0
);
}
/**
* 计算估算成本
*/
private double calculateEstimatedCost(long totalTokens) {
// 简化:假设每 1000 tokens $0.01
return (double) totalTokens / 100000.0;
}
public static void main(String[] args) throws InterruptedException {
MetricsCollector metrics = new MetricsCollector();
RealTimeMonitoringDashboard dashboard = new RealTimeMonitoringDashboard(metrics);
// 模拟一些请求
for (int i = 0; i < 50; i++) {
metrics.recordRequest("generateChat");
if (i % 10 != 0) {
metrics.recordSuccess("generateChat", 500 + (int) (Math.random() * 1000), 100, 200);
} else {
metrics.recordFailure("generateChat", 500, "模拟错误");
}
Thread.sleep(100); // 模拟请求间隔
}
// 启动监控
dashboard.start();
System.out.println("监控面板已启动,按 Ctrl+C 停止");
// 保持运行
Thread.currentThread().join();
}
}
练习 2:实现 SLA 监控
Copy
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.*;
/**
 * SLA (Service Level Agreement) monitor.
 *
 * <p>Compares a {@code MetricsSummary} against the configured limits for average response
 * time, error rate and P95 response time, and reports every limit that is exceeded. Each
 * violation found by {@link #checkCompliance} is also appended to an internal history that
 * can be read with {@link #getViolations()} and reset with {@link #clearViolations()}.
 *
 * <p>{@code maxTokenCost} and {@code monitoringWindowMs} are stored but are not evaluated
 * by any check in this class.
 *
 * <p>This class performs no synchronization of its own.
 */
public class SlaMonitor {
    private static final Logger logger = LoggerFactory.getLogger(SlaMonitor.class);

    /** Severity label for high-impact violations (costs 10 points in the score). */
    private static final String SEVERITY_HIGH = "高";
    /** Severity label for medium-impact violations (costs 5 points in the score). */
    private static final String SEVERITY_MEDIUM = "中";

    // SLA definition
    private final double maxResponseTimeMs;
    private final double maxErrorRate;
    private final double maxTokenCost;
    private final long monitoringWindowMs;

    // History of every violation detected by checkCompliance since the last clear
    private final List<SlaViolation> violations;

    /**
     * Creates a monitor with the given limits.
     *
     * @param maxResponseTimeMs  upper limit applied to both the average and the P95 response time
     * @param maxErrorRate       upper limit for the error rate, as a fraction (0.05 means 5%)
     * @param maxTokenCost       cost limit; stored but not checked by this class
     * @param monitoringWindowMs monitoring window; stored but not used by this class
     */
    public SlaMonitor(
            double maxResponseTimeMs,
            double maxErrorRate,
            double maxTokenCost,
            long monitoringWindowMs
    ) {
        this.maxResponseTimeMs = maxResponseTimeMs;
        this.maxErrorRate = maxErrorRate;
        this.maxTokenCost = maxTokenCost;
        this.monitoringWindowMs = monitoringWindowMs;
        this.violations = new ArrayList<>();
    }

    /**
     * Checks the given metrics against the SLA limits.
     *
     * <p>Every violation carries a ratio computed as actual value divided by threshold, so a
     * ratio above 1 always means "over the limit" regardless of the metric. The violations
     * found are also added to this monitor's history.
     *
     * @param metrics metrics snapshot to evaluate
     * @return a report listing the violations found by this call only
     */
    public SlaComplianceReport checkCompliance(MetricsSummary metrics) {
        List<SlaViolation> newViolations = new ArrayList<>();

        // Average response time
        double averageResponseTime = metrics.getAverageResponseTime();
        if (averageResponseTime > maxResponseTimeMs) {
            newViolations.add(new SlaViolation(
                    "response_time",
                    "平均响应时间",
                    averageResponseTime,
                    maxResponseTimeMs,
                    // actual / threshold, consistent with the other checks
                    averageResponseTime / maxResponseTimeMs,
                    SEVERITY_HIGH
            ));
        }

        // Error rate (reported as a percentage, compared as a fraction)
        double errorRate = 1.0 - metrics.getSuccessRate();
        if (errorRate > maxErrorRate) {
            newViolations.add(new SlaViolation(
                    "error_rate",
                    "错误率",
                    errorRate * 100,
                    maxErrorRate * 100,
                    errorRate / maxErrorRate,
                    SEVERITY_HIGH
            ));
        }

        // P95 response time; the percentiles are fetched once and reused
        double p95ResponseTime = metrics.getResponseTimePercentiles().getP95();
        if (p95ResponseTime > maxResponseTimeMs) {
            newViolations.add(new SlaViolation(
                    "p95_response_time",
                    "P95 响应时间",
                    p95ResponseTime,
                    maxResponseTimeMs,
                    p95ResponseTime / maxResponseTimeMs,
                    SEVERITY_MEDIUM
            ));
        }

        // Keep the history in sync so getViolations() reflects what was detected.
        violations.addAll(newViolations);
        return new SlaComplianceReport(newViolations);
    }

    /**
     * Computes a compliance score for a report.
     *
     * @param report report produced by {@link #checkCompliance}
     * @return 100 when there are no violations; otherwise 100 minus 10 points per
     *         high-severity violation and 5 points per any other violation, never below 0
     */
    public double getComplianceScore(SlaComplianceReport report) {
        if (report.getViolations().isEmpty()) {
            return 100.0; // fully compliant
        }
        double totalImpact = 0;
        for (SlaViolation violation : report.getViolations()) {
            // Constant-first comparison so a null severity counts as non-high instead of throwing.
            totalImpact += SEVERITY_HIGH.equals(violation.getSeverity()) ? 10 : 5;
        }
        return Math.max(0, 100 - totalImpact);
    }

    /**
     * Returns the violation history.
     *
     * @return a copy of all violations recorded since construction or the last
     *         {@link #clearViolations()} call
     */
    public List<SlaViolation> getViolations() {
        return new ArrayList<>(violations);
    }

    /**
     * Clears the violation history and logs that it was cleared.
     */
    public void clearViolations() {
        violations.clear();
        logger.info("SLA 违规记录已清空");
    }

    /**
     * A single SLA violation: which metric, its observed value, the limit it exceeded,
     * the actual/threshold ratio and a severity label.
     */
    public static class SlaViolation {
        private final String metric;
        private final String name;
        private final double actualValue;
        private final double threshold;
        private final double ratio;
        private final String severity;

        /**
         * @param metric      machine-readable metric key, e.g. {@code "error_rate"}
         * @param name        display name of the metric
         * @param actualValue observed value
         * @param threshold   limit that was exceeded
         * @param ratio       observed value relative to the limit
         * @param severity    severity label
         */
        public SlaViolation(String metric, String name, double actualValue, double threshold,
                            double ratio, String severity) {
            this.metric = metric;
            this.name = name;
            this.actualValue = actualValue;
            this.threshold = threshold;
            this.ratio = ratio;
            this.severity = severity;
        }

        public String getMetric() { return metric; }
        public String getName() { return name; }
        public double getActualValue() { return actualValue; }
        public double getThreshold() { return threshold; }
        public double getRatio() { return ratio; }
        public String getSeverity() { return severity; }
    }

    /**
     * Result of one compliance check: the violations found, exposed read-only.
     */
    public static class SlaComplianceReport {
        private final List<SlaViolation> violations;

        /**
         * @param violations violations found by the check; copied, so later changes to the
         *                   caller's list do not affect this report
         */
        public SlaComplianceReport(List<SlaViolation> violations) {
            this.violations = Collections.unmodifiableList(new ArrayList<>(violations));
        }

        /** @return an unmodifiable view of the violations in this report */
        public List<SlaViolation> getViolations() {
            return violations;
        }

        /** @return {@code true} when the report contains no violations */
        public boolean isCompliant() {
            return violations.isEmpty();
        }

        /** @return the number of violations in this report */
        public int getViolationCount() {
            return violations.size();
        }
    }

    /**
     * Demo entry point: records 100 simulated successful requests, runs one compliance
     * check and prints the resulting report to standard output.
     */
    public static void main(String[] args) {
        SlaMonitor sla = new SlaMonitor(
                3000,   // max response time: 3 seconds
                0.05,   // max error rate: 5%
                10.0,   // max cost: $10
                60000   // monitoring window: 1 minute
        );
        MetricsCollector metrics = new MetricsCollector();

        // Simulate some requests
        for (int i = 0; i < 100; i++) {
            metrics.recordRequest("generateChat");
            metrics.recordSuccess("generateChat", 500 + (int) (Math.random() * 2000), 100, 200);
        }

        MetricsSummary summary = metrics.getMetricsSummary();
        SlaComplianceReport report = sla.checkCompliance(summary);

        // Print the report
        System.out.println("╔═══════════════════════════════════════════════════════════╗");
        System.out.println("║ SLA 合规报告 ║");
        System.out.println("╠═══════════════════════════════════════════════════════════╣");
        System.out.println("║ 合规状态: " + (report.isCompliant() ? "✅ 合规" : "❌ 违规"));
        System.out.println("║ 违规数量: " + report.getViolationCount());
        System.out.println("║ 合规分数: " + String.format("%.1f/100", sla.getComplianceScore(report)));
        System.out.println("╠═════════════════════════════════════════════════════════╣");
        if (!report.getViolations().isEmpty()) {
            System.out.println("║ 违规详情: ║");
            System.out.println("╠═══════════════════════════════════════════════════════════╣");
            for (SlaViolation violation : report.getViolations()) {
                // Severity is printed once, in brackets.
                System.out.printf("║ %s: %.2f (阈值: %.2f, 比例: %.2fx) [%s]%n",
                        violation.getName(),
                        violation.getActualValue(),
                        violation.getThreshold(),
                        violation.getRatio(),
                        violation.getSeverity()
                );
            }
            System.out.println("╠═════════════════════════════════════════════════════════╣");
        }
        System.out.println("╚═════════════════════════════════════════════════════════════════╝");
    }
}
总结
本章要点
- 可观测性概念
  - 日志、指标、追踪三支柱
  - 从外部观察系统内部状态
  - 对于 LLM 应用尤其重要
- LangChain4j 可观测性
  - 内置监控支持
  - 请求和响应日志
  - Token 使用追踪
  - 性能指标收集
- 指标收集
  - 请求/成功/失败计数
  - 响应时间分布
  - Token 使用统计
  - 成本计算
- 分布式追踪
  - OpenTelemetry 集成
  - 请求链路追踪
  - 跨服务上下文传播
- 监控和告警
  - 健康检查
  - 实时监控面板
  - SLA 监控
  - 智能告警
下一步
在下一章节中,我们将学习:
- 测试和评估策略
- 集成测试框架
- 性能基准测试
- A/B 测试
- 错误注入测试
常见问题
Q1:可观测性与日志有什么区别?
A:
- 日志 - 离散的事件记录,主要用于调试
- 可观测性 - 结构化的指标和追踪,用于监控和分析
- 可观测性包括日志,但更加全面和系统化
Q2:应该监控哪些指标?
A:
- 业务关键指标(响应时间、错误率)
- 资源指标(Token 使用、API 调用次数)
- 性能指标(吞吐量、延迟)
- 成本指标(预算、实际花费)
Q3:如何处理和存储指标数据?
A:
- 实时计算和展示
- 存储到时序数据库(如 Prometheus)
- 定期聚合和归档
- 配置数据保留策略
Q4:如何设置告警阈值?
A:
- 基于 SLA 要求设置
- 使用历史数据确定合理阈值
- 配置分级告警(info/warning/critical)
- 避免告警疲劳(限流、冷却时间)
Q5:如何降低可观测性对性能的影响?
A:
- 使用异步日志和指标收集
- 考虑采样策略(如 10% 采样)
- 避免在热路径上执行复杂计算
- 使用高效的数据结构和序列化
参考资料
版权归属于 LangChat Team
官网:https://langchat.cn

