版权归属于 LangChat Team
官网:https://langchat.cn
21 - 错误处理和重试
版本说明
本文档基于 LangChain4j 1.10.0 版本编写。学习目标
通过本章节学习,你将能够:- 理解 LangChain4j 中错误处理的重要性
- 掌握常见的错误类型和处理策略
- 学会实现自动重试机制
- 理解超时和网络错误的处理
- 掌握回退策略和错误恢复
- 实现一个完整的错误处理解决方案
前置知识
- 完成《01 - LangChain4j 简介》章节
- 完成《02 - 你的第一个 Chat 应用》章节
- Java 异常处理基础
核心概念
为什么需要错误处理?
LLM 应用的错误类型:-
网络错误
- 连接超时
- DNS 解析失败
- SSL/TLS 握手错误
-
API 错误
- 认证失败(401)
- 速率限制(429)
- 服务不可用(503)
- 服务器错误(500)
-
模型错误
- 模型不存在
- Token 超限
- 上下文超长
-
客户端错误
- 请求格式错误
- 参数验证失败
- JSON 解析错误
错误处理策略
Copy
┌─────────────────────────────────────────────────────────────────┐
│ 错误处理策略 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. 立即失败 2. 简单重试 3. 指数退避 │
│ (严重错误) (临时错误) (网络错误) │
│ │
│ 4. 熔断器 5. 熔断重试 6. 熔断恢复 │
│ (保护服务) (保护服务) (自动恢复) │
│ │
└─────────────────────────────────────────────────────────────────┘
基础错误处理
try-catch 示例
Copy
import dev.langchain4j.model.chat.ChatModel;
import dev.langchain4j.model.openai.OpenAiChatModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 基础错误处理示例
*/
public class BasicErrorHandling {
private static final Logger logger = LoggerFactory.getLogger(BasicErrorHandling.class);
/**
* 简单的对话方法(带错误处理)
*/
public static String chatWithErrorHandling(ChatModel model, String message) {
try {
// 尝试生成响应
logger.info("开始聊天,消息: {}", message);
String response = model.chat(message);
logger.info("聊天成功");
return response;
} catch (java.net.SocketTimeoutException e) {
// 处理超时错误
logger.error("请求超时: {}", e.getMessage());
throw new RuntimeException("请求超时,请稍后重试", e);
} catch (java.net.ConnectException e) {
// 处理连接错误
logger.error("连接失败: {}", e.getMessage());
throw new RuntimeException("无法连接到服务器", e);
} catch (java.io.IOException e) {
// 处理 IO 错误
logger.error("IO 错误: {}", e.getMessage());
throw new RuntimeException("网络通信错误", e);
} catch (Exception e) {
// 处理未知错误
logger.error("未知错误: {}", e.getMessage());
throw new RuntimeException("发生未知错误", e);
}
}
/**
* 创建带错误处理的模型
*/
public static ChatModel createModelWithErrorHandling(String apiKey) {
try {
return OpenAiChatModel.builder()
.apiKey(apiKey)
.modelName("gpt-4o-mini")
.timeout(Duration.ofSeconds(30))
.build();
} catch (Exception e) {
logger.error("创建模型失败: {}", e.getMessage());
throw new RuntimeException("无法创建模型", e);
}
}
public static void main(String[] args) {
String apiKey = System.getenv("OPENAI_API_KEY");
if (apiKey == null || apiKey.isEmpty()) {
logger.error("未设置 OPENAI_API_KEY 环境变量");
return;
}
// 创建模型
ChatModel model = createModelWithErrorHandling(apiKey);
// 带错误处理的聊天
try {
String response = chatWithErrorHandling(model, "你好,请介绍一下自己");
System.out.println("AI: " + response);
} catch (RuntimeException e) {
System.err.println("错误: " + e.getMessage());
}
}
}
自动重试机制
指数退避重试
Copy
import dev.langchain4j.model.chat.ChatModel;
import dev.langchain4j.model.openai.OpenAiChatModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
/**
* 指数退避重试示例
*/
public class ExponentialBackoffRetry {
private static final Logger logger = LoggerFactory.getLogger(ExponentialBackoffRetry.class);
/**
* 重试配置
*/
public static class RetryConfig {
private final int maxRetries;
private final Duration initialDelay;
private final Duration maxDelay;
private final double backoffFactor;
private final Class<? extends Exception>[] retryableExceptions;
private final Class<? extends Exception>[] nonRetryableExceptions;
public RetryConfig(
int maxRetries,
Duration initialDelay,
Duration maxDelay,
double backoffFactor,
Class<? extends Exception>[] retryableExceptions,
Class<? extends Exception>[] nonRetryableExceptions
) {
this.maxRetries = maxRetries;
this.initialDelay = initialDelay;
this.maxDelay = maxDelay;
this.backoffFactor = backoffFactor;
this.retryableExceptions = retryableExceptions;
this.nonRetryableExceptions = nonRetryableExceptions;
}
// Getters
public int getMaxRetries() { return maxRetries; }
public Duration getInitialDelay() { return initialDelay; }
public Duration getMaxDelay() { return maxDelay; }
public double getBackoffFactor() { return backoffFactor; }
public Class<? extends Exception>[] getRetryableExceptions() { return retryableExceptions; }
public Class<? extends Exception>[] getNonRetryableExceptions() { return nonRetryableExceptions; }
}
/**
* 重试结果
*/
public static class RetryResult<T> {
private final T result;
private final int attemptCount;
private final long totalDelayMs;
private final Exception lastException;
public RetryResult(T result, int attemptCount, long totalDelayMs, Exception lastException) {
this.result = result;
this.attemptCount = attemptCount;
this.totalDelayMs = totalDelayMs;
this.lastException = lastException;
}
public T getResult() { return result; }
public int getAttemptCount() { return attemptCount; }
public long getTotalDelayMs() { return totalDelayMs; }
public Exception getLastException() { return lastException; }
}
/**
* 创建默认重试配置
*/
public static RetryConfig createDefaultRetryConfig() {
return new RetryConfig(
3, // 最多重试 3 次
Duration.ofMillis(1000), // 初始延迟 1 秒
Duration.ofMillis(10000), // 最大延迟 10 秒
2.0, // 指数退避因子
new Class[]{
java.net.SocketTimeoutException.class,
java.net.ConnectException.class,
java.io.IOException.class
},
new Class[]{
IllegalArgumentException.class,
NullPointerException.class
}
);
}
/**
* 带重试的方法执行
*/
public static <T> T executeWithRetry(
String operationName,
RetryOperation<T> operation,
RetryConfig config
) throws Exception {
int attempt = 0;
Exception lastException = null;
long totalDelayMs = 0;
while (attempt <= config.getMaxRetries()) {
attempt++;
try {
logger.info("执行 '{}', 尝试: {}/{}", operationName, attempt, config.getMaxRetries());
// 执行操作
T result = operation.execute();
logger.info("操作 '{}' 成功,尝试次数: {}", operationName, attempt);
return result;
} catch (Exception e) {
lastException = e;
// 检查是否为不可重试的错误
if (isNonRetryableException(e, config)) {
logger.error("操作 '{}' 失败,错误不可重试: {}", operationName, e.getMessage());
throw new RuntimeException("操作失败: " + operationName, e);
}
// 检查是否已达到最大重试次数
if (attempt > config.getMaxRetries()) {
logger.error("操作 '{}' 失败,已达到最大重试次数: {}",
operationName, config.getMaxRetries());
throw new RuntimeException(
String.format("操作 '%s' 失败,已重试 %d 次", operationName, config.getMaxRetries()),
lastException
);
}
// 计算延迟
long delayMs = calculateDelayMs(attempt, config);
totalDelayMs += delayMs;
logger.warn("操作 '{}' 失败(尝试 {}/{}),{} ms 后重试: {}",
operationName, attempt, config.getMaxRetries(),
delayMs, e.getMessage());
// 等待重试
Thread.sleep(delayMs);
}
}
// 不应该到达这里
throw new RuntimeException("重试逻辑错误", lastException);
}
/**
* 计算延迟
*/
private static long calculateDelayMs(int attempt, RetryConfig config) {
double delayMs = config.getInitialDelay().toMillis() *
Math.pow(config.getBackoffFactor(), attempt - 1);
long delay = (long) delayMs;
// 限制最大延迟
return Math.min(delay, config.getMaxDelay().toMillis());
}
/**
* 判断是否为不可重试的错误
*/
private static boolean isNonRetryableException(Exception e, RetryConfig config) {
for (Class<? extends Exception> exceptionClass : config.getNonRetryableExceptions()) {
if (exceptionClass.isInstance(e)) {
return true;
}
}
return false;
}
/**
* 重试操作接口
*/
public interface RetryOperation<T> {
T execute() throws Exception;
}
/**
* 使用示例
*/
public static void main(String[] args) {
String apiKey = System.getenv("OPENAI_API_KEY");
if (apiKey == null || apiKey.isEmpty()) {
System.err.println("未设置 OPENAI_API_KEY 环境变量");
return;
}
// 创建模型
ChatModel model = OpenAiChatModel.builder()
.apiKey(apiKey)
.modelName("gpt-4o-mini")
.timeout(Duration.ofSeconds(5)) // 短超时以触发重试
.build();
// 创建重试配置
RetryConfig config = createDefaultRetryConfig();
// 创建重试操作
RetryOperation<String> operation = () -> {
return model.chat("你好,请介绍一下自己");
};
// 带重试执行
try {
String result = executeWithRetry("聊天", operation, config);
System.out.println("AI: " + result);
} catch (RuntimeException e) {
System.err.println("错误: " + e.getMessage());
if (e.getCause() != null) {
System.err.println("根本原因: " + e.getCause().getMessage());
}
}
}
}
断路器模式
熔断器实现
Copy
import dev.langchain4j.model.chat.ChatModel;
import dev.langchain4j.service.AiServices;
import dev.langchain4j.model.openai.OpenAiChatModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 断路器模式示例
*/
public class CircuitBreakerExample {
private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerExample.class);
/**
* 断路器状态
*/
public enum CircuitState {
CLOSED, // 正常状态,允许请求
OPEN, // 熔断状态,拒绝请求
HALF_OPEN // 半开状态,允许少量请求以测试恢复
}
/**
* 断路器配置
*/
public static class CircuitBreakerConfig {
private final int failureThreshold;
private final Duration timeout;
private final Duration resetTimeout;
private final int halfOpenMaxCalls;
public CircuitBreakerConfig(
int failureThreshold,
Duration timeout,
Duration resetTimeout,
int halfOpenMaxCalls
) {
this.failureThreshold = failureThreshold;
this.timeout = timeout;
this.resetTimeout = resetTimeout;
this.halfOpenMaxCalls = halfOpenMaxCalls;
}
// Getters
public int getFailureThreshold() { return failureThreshold; }
public Duration getTimeout() { return timeout; }
public Duration getResetTimeout() { return resetTimeout; }
public int getHalfOpenMaxCalls() { return halfOpenMaxCalls; }
}
/**
* 断路器
*/
public static class CircuitBreaker {
private final CircuitBreakerConfig config;
private final AtomicInteger failureCount;
private final AtomicInteger successCount;
private volatile CircuitState state;
private volatile long lastFailureTime;
public CircuitBreaker(CircuitBreakerConfig config) {
this.config = config;
this.failureCount = new AtomicInteger(0);
this.successCount = new AtomicInteger(0);
this.state = CircuitState.CLOSED;
}
/**
* 获取当前状态
*/
public CircuitState getState() {
return state;
}
/**
* 允许请求
*/
public synchronized boolean allowRequest() {
switch (state) {
case OPEN:
long timeSinceLastFailure = System.currentTimeMillis() - lastFailureTime;
if (timeSinceLastFailure > config.getResetTimeout().toMillis()) {
state = CircuitState.HALF_OPEN;
successCount.set(0);
logger.info("断路器从 OPEN 转换到 HALF_OPEN");
} else {
logger.warn("断路器状态 OPEN,拒绝请求");
}
return state == CircuitState.CLOSED || state == CircuitState.HALF_OPEN;
case HALF_OPEN:
logger.info("断路器状态 HALF_OPEN,允许测试请求");
return successCount.get() < config.getHalfOpenMaxCalls();
case CLOSED:
default:
return true;
}
}
/**
* 记录成功
*/
public synchronized void recordSuccess() {
if (state == CircuitState.HALF_OPEN) {
int currentSuccess = successCount.incrementAndGet();
logger.info("断路器 HALF_OPEN 状态,成功次数: {}", currentSuccess);
if (currentSuccess >= config.getHalfOpenMaxCalls()) {
state = CircuitState.CLOSED;
failureCount.set(0);
logger.info("断路器从 HALF_OPEN 转换到 CLOSED");
}
} else {
logger.info("断路器状态 CLOSED,记录成功");
successCount.incrementAndGet();
}
}
/**
* 记录失败
*/
public synchronized void recordFailure() {
int currentFailures = failureCount.incrementAndGet();
lastFailureTime = System.currentTimeMillis();
logger.warn("断路器记录失败,当前失败数: {}, 阈值: {}",
currentFailures, config.getFailureThreshold());
if (currentFailures >= config.getFailureThreshold()) {
state = CircuitState.OPEN;
logger.error("断路器从 {} 转换到 OPEN", state);
}
if (state == CircuitState.HALF_OPEN) {
state = CircuitState.OPEN;
logger.error("断路器从 HALF_OPEN 转换到 OPEN");
}
}
/**
* 重置断路器
*/
public synchronized void reset() {
state = CircuitState.CLOSED;
failureCount.set(0);
successCount.set(0);
logger.info("断路器已重置");
}
}
/**
* 带断路器的方法执行
*/
public static <T> T executeWithCircuitBreaker(
String operationName,
RetryOperation<T> operation,
CircuitBreaker circuitBreaker
) throws Exception {
// 检查是否允许请求
if (!circuitBreaker.allowRequest()) {
throw new RuntimeException(
String.format("断路器状态 %s,拒绝操作 '%s'",
circuitBreaker.getState(), operationName)
);
}
try {
// 执行操作
T result = operation.execute();
// 记录成功
circuitBreaker.recordSuccess();
logger.info("操作 '{}' 成功", operationName);
return result;
} catch (Exception e) {
// 记录失败
circuitBreaker.recordFailure();
logger.error("操作 '{}' 失败: {}", operationName, e.getMessage());
throw e;
}
}
/**
* 重试操作接口
*/
public interface RetryOperation<T> {
T execute() throws Exception;
}
/**
* 使用示例
*/
public static void main(String[] args) {
String apiKey = System.getenv("OPENAI_API_KEY");
if (apiKey == null || apiKey.isEmpty()) {
System.err.println("未设置 OPENAI_API_KEY 环境变量");
return;
}
// 创建模型
ChatModel model = OpenAiChatModel.builder()
.apiKey(apiKey)
.modelName("gpt-4o-mini")
.timeout(Duration.ofSeconds(5))
.build();
// 创建断路器配置
CircuitBreakerConfig config = new CircuitBreakerConfig(
5, // 失败阈值
Duration.ofSeconds(10), // 超时
Duration.ofSeconds(30), // 重置时间
2 // 半开最大调用
);
// 创建断路器
CircuitBreaker circuitBreaker = new CircuitBreaker(config);
// 创建重试操作
RetryOperation<String> operation = () -> {
return model.chat("测试消息");
};
// 执行多次请求
for (int i = 0; i < 10; i++) {
try {
String result = executeWithCircuitBreaker("聊天", operation, circuitBreaker);
System.out.printf("[请求 %d] 成功: %s\n", i + 1, result);
System.out.println();
} catch (Exception e) {
System.err.printf("[请求 %d] 失败: %s\n\n", i + 1, e.getMessage());
}
}
// 重置断路器
System.out.println("重置断路器");
circuitBreaker.reset();
}
}
错误恢复
优雅降级
Copy
import dev.langchain4j.model.chat.ChatModel;
import dev.langchain4j.model.openai.OpenAiChatModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
/**
* 优雅降级示例
*/
public class GracefulDegradation {
private static final Logger logger = LoggerFactory.getLogger(GracefulDegradation.class);
/**
* 响应类型
*/
public enum ResponseType {
FULL, // 完整响应
CACHED, // 缓存响应
SIMPLIFIED, // 简化响应
FALLBACK // 降级响应
}
/**
* 响应结果
*/
public static class ResponseResult<T> {
private final T response;
private final ResponseType responseType;
private final long latencyMs;
private final String message;
public ResponseResult(T response, ResponseType responseType,
long latencyMs, String message) {
this.response = response;
this.responseType = responseType;
this.latencyMs = latencyMs;
this.message = message;
}
public T getResponse() { return response; }
public ResponseType getResponseType() { return responseType; }
public long getLatencyMs() { return latencyMs; }
public String getMessage() { return message; }
}
/**
* 缓存
*/
public static class ResponseCache {
private final Map<String, String> cache;
private final long maxAgeMs;
public ResponseCache(long maxAgeMs) {
this.cache = new HashMap<>();
this.maxAgeMs = maxAgeMs;
}
/**
* 获取缓存响应
*/
public synchronized String get(String key) {
// 简化实现:实际应该考虑缓存过期
return cache.get(key);
}
/**
* 添加缓存响应
*/
public synchronized void put(String key, String value) {
cache.put(key, value);
}
/**
* 清除缓存
*/
public synchronized void clear() {
cache.clear();
}
}
/**
* 带降级的聊天
*/
public static <T> ResponseResult<T> chatWithDegradation(
ChatModel model,
String message,
ResponseCache cache,
long cacheKey,
long maxLatencyMs,
String fallbackMessage
) throws Exception {
long startTime = System.currentTimeMillis();
try {
// 尝试从缓存获取
String cachedResponse = cache.get(String.valueOf(cacheKey));
if (cachedResponse != null) {
long latency = System.currentTimeMillis() - startTime;
logger.info("使用缓存响应,延迟: {}ms", latency);
return new ResponseResult<>(
(T) cachedResponse,
ResponseType.CACHED,
latency,
"从缓存返回"
);
}
// 执行请求
String response = model.chat(message);
long latency = System.currentTimeMillis() - startTime;
// 检查是否超过最大延迟
if (latency > maxLatencyMs) {
logger.warn("响应时间 {}ms 超过阈值 {}ms", latency, maxLatencyMs);
// 返回简化响应
return new ResponseResult<>(
(T) response.substring(0, Math.min(100, response.length())),
ResponseType.SIMPLIFIED,
latency,
"响应超时,返回简化版本"
);
}
// 缓存响应
cache.put(String.valueOf(cacheKey), response);
return new ResponseResult<>(
(T) response,
ResponseType.FULL,
latency,
"完整响应"
);
} catch (Exception e) {
long latency = System.currentTimeMillis() - startTime;
logger.error("请求失败: {}", e.getMessage());
// 返回降级响应
return new ResponseResult<>(
(T) fallbackMessage,
ResponseType.FALLBACK,
latency,
"请求失败,使用降级响应"
);
}
}
/**
* 使用示例
*/
public static void main(String[] args) {
String apiKey = System.getenv("OPENAI_API_KEY");
if (apiKey == null || apiKey.isEmpty()) {
System.err.println("未设置 OPENAI_API_KEY 环境变量");
return;
}
// 创建模型
ChatModel model = OpenAiChatModel.builder()
.apiKey(apiKey)
.modelName("gpt-4o-mini")
.timeout(Duration.ofSeconds(10))
.build();
// 创建缓存
ResponseCache cache = new ResponseCache(30000); // 30 秒缓存
// 多次请求
for (int i = 0; i < 5; i++) {
try {
ResponseResult<String> result = chatWithDegradation(
model,
"你好,第 " + (i + 1) + " 次问候",
cache,
i,
5000, // 5 秒阈值
"抱歉,我现在无法处理您的请求,请稍后再试。"
);
System.out.println("╔═════════════════════════════════════════════════════════╗");
System.out.println("║ 请求 " + (i + 1) + " 响应 ║");
System.out.println("╠═════════════════════════════════════════════════════════╣");
System.out.println("║ 类型: " + result.getResponseType());
System.out.println("║ 延迟: " + result.getLatencyMs() + "ms");
System.out.println("║ 消息: " + result.getMessage());
System.out.println("║ 响应: " +
result.getResponse().substring(0, Math.min(80, result.getResponse().length())));
System.out.println("╠───────────────────────────────────────────────────────────╣");
System.out.println("║ 完整响应: ║");
System.out.println("║ " + result.getResponse());
System.out.println("╚═════════════════════════════════════════════════════════╝");
System.out.println();
} catch (Exception e) {
System.err.println("错误: " + e.getMessage());
}
}
}
}
错误监控和告警
错误监控
Copy
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* 错误监控和告警
*/
public class ErrorMonitoring {
private static final Logger logger = LoggerFactory.getLogger(ErrorMonitoring.class);
/**
* 错误统计
*/
public static class ErrorStatistics {
private final String errorType;
private final String errorType;
private final AtomicLong count;
private final AtomicLong lastOccurrence;
private final Map<String, Long> errorDetails;
public ErrorStatistics(String errorType, String errorType) {
this.errorType = errorType;
this.errorType = errorType;
this.count = new AtomicLong(0);
this.lastOccurrence = new AtomicLong(0);
this.errorDetails = new ConcurrentHashMap<>();
}
/**
* 记录错误
*/
public void recordError(String details) {
count.incrementAndGet();
lastOccurrence.set(System.currentTimeMillis());
errorDetails.merge(details, 1L, Long::sum);
}
/**
* 获取统计
*/
public long getCount() { return count.get(); }
public long getLastOccurrence() { return lastOccurrence.get(); }
public Map<String, Long> getErrorDetails() { return new ConcurrentHashMap<>(errorDetails); }
}
/**
* 告警配置
*/
public static class AlertConfig {
private final long errorThreshold;
private final long timeWindowMs;
private final boolean enableEmailAlert;
private final boolean enableSlackAlert;
public AlertConfig(
long errorThreshold,
long timeWindowMs,
boolean enableEmailAlert,
boolean enableSlackAlert
) {
this.errorThreshold = errorThreshold;
this.timeWindowMs = timeWindowMs;
this.enableEmailAlert = enableEmailAlert;
this.enableSlackAlert = enableSlackAlert;
}
// Getters
public long getErrorThreshold() { return errorThreshold; }
public long getTimeWindowMs() { return timeWindowMs; }
public boolean isEnableEmailAlert() { return enableEmailAlert; }
public boolean isEnableSlackAlert() { return enableSlackAlert; }
}
/**
* 告警管理器
*/
public static class AlertManager {
private final AlertConfig config;
private final Map<String, ErrorStatistics> errorStats;
public AlertManager(AlertConfig config) {
this.config = config;
this.errorStats = new ConcurrentHashMap<>();
}
/**
* 记录错误
*/
public void recordError(String errorType, String errorType, String details) {
ErrorStatistics stats = errorStats.computeIfAbsent(errorType,
k -> new ErrorStatistics(errorType, errorType));
stats.recordError(details);
// 检查是否需要告警
checkAlert(errorType, stats);
}
/**
* 检查告警
*/
private void checkAlert(String errorType, ErrorStatistics stats) {
long timeSinceLastOccurrence = System.currentTimeMillis() - stats.getLastOccurrence();
// 检查错误数量阈值
if (stats.getCount() >= config.getErrorThreshold()) {
logger.error("错误告警: 错误类型 '{}' 在过去 {}ms 内发生了 {} 次",
errorType, config.getTimeWindowMs(), stats.getCount());
// 发送告警
sendAlert(errorType, stats.getCount(), stats.getErrorDetails());
}
}
/**
* 发送告警
*/
private void sendAlert(String errorType, long count, Map<String, Long> details) {
String message = String.format(
"错误告警\n" +
"错误类型: %s\n" +
"发生次数: %d\n" +
"时间: %s\n" +
"详细信息: %s",
errorType,
count,
LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME),
details
);
logger.error("告警消息: {}", message);
// 发送邮件
if (config.isEnableEmailAlert()) {
sendEmailAlert(message);
}
// 发送 Slack
if (config.isEnableSlackAlert()) {
sendSlackAlert(message);
}
}
/**
* 发送邮件告警
*/
private void sendEmailAlert(String message) {
// 简化:实际应该使用 JavaMail 或其他邮件库
logger.info("发送邮件告警: {}", message);
}
/**
* 发送 Slack 告警
*/
private void sendSlackAlert(String message) {
// 简化:实际应该使用 Slack Web API
logger.info("发送 Slack 告警: {}", message);
}
/**
* 获取错误统计
*/
public Map<String, ErrorStatistics> getErrorStatistics() {
return new ConcurrentHashMap<>(errorStats);
}
/**
* 生成报告
*/
public String generateReport() {
StringBuilder report = new StringBuilder();
report.append("╔═════════════════════════════════════════════════════════╗\n");
report.append("║ 错误监控报告 ║\n");
report.append("╠═════════════════════════════════════════════════════════╣\n");
report.append("║ 错误类型总数: ").append(errorStats.size()).append("\n");
errorStats.forEach((errorType, stats) -> {
report.append("╠─────────────────────────────────────────────────────────╣\n");
report.append("║ 错误类型: ").append(errorType).append("\n");
report.append("║ 发生次数: ").append(stats.getCount()).append("\n");
report.append("║ 最后发生: ").append(
LocalDateTime.ofEpochMilli(stats.getLastOccurrence())
.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
).append("\n");
report.append("║ 详细统计: ║\n");
stats.getErrorDetails().forEach((detail, count) -> {
report.append("║ - ").append(detail).append(": ")
.append(count).append(" 次\n");
});
});
report.append("╠═════════════════════════════════════════════════════════╣\n");
report.append("║ 时间: ").append(
LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
).append("\n");
report.append("╚═════════════════════════════════════════════════════════╝\n");
return report.toString();
}
}
/**
* 使用示例
*/
public static void main(String[] args) {
// 创建告警配置
AlertConfig config = new AlertConfig(
10, // 10 次错误
60000, // 1 分钟内
true, // 启用邮件告警
true // 启用 Slack 告警
);
// 创建告警管理器
AlertManager alertManager = new AlertManager(config);
// 模拟错误
for (int i = 0; i < 15; i++) {
alertManager.recordError("API错误", "超时", "请求超时 " + i);
Thread.sleep(1000); // 1 秒间隔
}
// 生成报告
System.out.println(alertManager.generateReport());
}
}
测试代码示例
Copy
import dev.langchain4j.model.chat.ChatModel;
import dev.langchain4j.model.openai.OpenAiChatModel;
import org.junit.jupiter.api.*;
import static org.junit.jupiter.api.Assertions.*;
import java.time.Duration;
import static org.mockito.Mockito.*;
/**
* 错误处理测试
*/
class ErrorHandlingTest {
@Test
void should_retry_on_timeout() {
// 创建 Mock 模型,模拟超时
ChatModel mockModel = mock(ChatModel.class);
int[] attempts = {0};
when(mockModel.chat(anyString()))
.thenAnswer(invocation -> {
attempts[0]++;
if (attempts[0] < 3) {
throw new java.net.SocketTimeoutException("Timeout");
}
return "Success";
});
// 执行重试
String result = ExponentialBackoffRetry.executeWithRetry(
"测试",
() -> mockModel.chat("test"),
ExponentialBackoffRetry.createDefaultRetryConfig()
);
// 验证
assertEquals(3, attempts[0]);
assertEquals("Success", result);
}
@Test
void should_not_retry_on_validation_error() {
// 创建 Mock 模型,模拟验证错误
ChatModel mockModel = mock(ChatModel.class);
when(mockModel.chat(anyString()))
.thenThrow(new IllegalArgumentException("Invalid input"));
// 执行重试
Exception exception = assertThrows(RuntimeException.class, () -> {
ExponentialBackoffRetry.executeWithRetry(
"测试",
() -> mockModel.chat("test"),
ExponentialBackoffRetry.createDefaultRetryConfig()
);
});
// 验证没有重试
assertTrue(exception.getCause() instanceof IllegalArgumentException);
}
@Test
void should_circuit_break_after_failures() {
CircuitBreaker circuitBreaker = new CircuitBreaker(
new CircuitBreakerConfig(
3,
Duration.ofSeconds(10),
Duration.ofSeconds(30),
2
)
);
CircuitBreaker.RetryOperation<String> operation = mock(CircuitBreaker.RetryOperation.class);
when(operation.execute())
.thenThrow(new RuntimeException("Failed"));
// 执行多次请求
for (int i = 0; i < 5; i++) {
try {
executeWithCircuitBreaker("测试", operation, circuitBreaker);
fail("应该抛出异常");
} catch (RuntimeException e) {
// 预期失败
}
}
// 验证断路器状态
assertEquals(CircuitBreaker.CircuitState.OPEN, circuitBreaker.getState());
}
@Test
void should_reset_after_success() {
CircuitBreaker circuitBreaker = new CircuitBreaker(
new CircuitBreakerConfig(
3,
Duration.ofSeconds(10),
Duration.ofSeconds(30),
2
)
);
// 先触发断路器
CircuitBreaker.RetryOperation<String> failOperation = mock(CircuitBreaker.RetryOperation.class);
when(failOperation.execute()).thenThrow(new RuntimeException("Failed"));
for (int i = 0; i < 3; i++) {
try {
executeWithCircuitBreaker("失败", failOperation, circuitBreaker);
} catch (Exception e) {
// 预期失败
}
}
assertEquals(CircuitBreaker.CircuitState.OPEN, circuitBreaker.getState());
// 重置
circuitBreaker.reset();
assertEquals(CircuitBreaker.CircuitState.CLOSED, circuitBreaker.getState());
// 成功请求
CircuitBreaker.RetryOperation<String> successOperation = mock(CircuitBreaker.RetryOperation.class);
when(successOperation.execute()).thenReturn("Success");
String result = executeWithCircuitBreaker("成功", successOperation, circuitBreaker);
assertEquals("Success", result);
assertEquals(CircuitBreaker.CircuitState.CLOSED, circuitBreaker.getState());
}
}
实践练习
练习 1:实现完整的错误处理系统
Copy
import dev.langchain4j.model.chat.ChatModel;
import dev.langchain4j.model.openai.OpenAiChatModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
/**
* 完整的错误处理系统
*/
public class ErrorHandlingSystem {
private static final Logger logger = LoggerFactory.getLogger(ErrorHandlingSystem.class);
/**
* 错误类型枚举
*/
public enum ErrorType {
NETWORK_ERROR("网络错误"),
API_ERROR("API 错误"),
VALIDATION_ERROR("验证错误"),
TIMEOUT_ERROR("超时错误"),
UNKNOWN_ERROR("未知错误");
private final String description;
ErrorType(String description) {
this.description = description;
}
public String getDescription() {
return description;
}
}
/**
* 错误处理器接口
*/
public interface ErrorHandler {
boolean canHandle(Exception e);
ErrorResponse handle(Exception e);
}
/**
* 错误响应
*/
public static class ErrorResponse {
private final String code;
private final String message;
private final String type;
private final Map<String, Object> details;
private final long timestamp;
public ErrorResponse(String code, String message, ErrorType type, Map<String, Object> details) {
this.code = code;
this.message = message;
this.type = type.name();
this.details = details;
this.timestamp = System.currentTimeMillis();
}
public String getCode() { return code; }
public String getMessage() { return message; }
public String getType() { return type; }
public Map<String, Object> getDetails() { return details; }
public long getTimestamp() { return timestamp; }
}
/**
* 网络错误处理器
*/
public static class NetworkErrorHandler implements ErrorHandler {
@Override
public boolean canHandle(Exception e) {
return e instanceof java.net.SocketTimeoutException ||
e instanceof java.net.ConnectException ||
e instanceof java.io.IOException;
}
@Override
public ErrorResponse handle(Exception e) {
Map<String, Object> details = new ConcurrentHashMap<>();
details.put("original_message", e.getMessage());
if (e instanceof java.net.SocketTimeoutException) {
return new ErrorResponse(
"NETWORK_TIMEOUT",
"请求超时",
ErrorType.TIMEOUT_ERROR,
details
);
} else if (e instanceof java.net.ConnectException) {
return new ErrorResponse(
"CONNECTION_FAILED",
"无法连接到服务器",
ErrorType.NETWORK_ERROR,
details
);
} else {
return new ErrorResponse(
"NETWORK_ERROR",
"网络通信错误",
ErrorType.NETWORK_ERROR,
details
);
}
}
}
/**
* API 错误处理器
*/
public static class ApiErrorHandler implements ErrorHandler {
@Override
public boolean canHandle(Exception e) {
String message = e.getMessage();
return message != null && (
message.contains("401") ||
message.contains("429") ||
message.contains("500") ||
message.contains("503")
);
}
@Override
public ErrorResponse handle(Exception e) {
String message = e.getMessage();
if (message.contains("401")) {
return new ErrorResponse(
"AUTHENTICATION_FAILED",
"认证失败",
ErrorType.API_ERROR,
Map.of("status_code", 401)
);
} else if (message.contains("429")) {
return new ErrorResponse(
"RATE_LIMIT_EXCEEDED",
"请求频率过高",
ErrorType.API_ERROR,
Map.of("status_code", 429)
);
} else if (message.contains("503")) {
return new ErrorResponse(
"SERVICE_UNAVAILABLE",
"服务暂时不可用",
ErrorType.API_ERROR,
Map.of("status_code", 503)
);
} else if (message.contains("500")) {
return new ErrorResponse(
"INTERNAL_SERVER_ERROR",
"服务器内部错误",
ErrorType.API_ERROR,
Map.of("status_code", 500)
);
}
return new ErrorResponse(
"UNKNOWN_API_ERROR",
"未知的 API 错误",
ErrorType.API_ERROR,
Map.of("message", message)
);
}
}
/**
* 验证错误处理器
*/
public static class ValidationErrorHandler implements ErrorHandler {
@Override
public boolean canHandle(Exception e) {
return e instanceof IllegalArgumentException ||
e instanceof NullPointerException;
}
@Override
public ErrorResponse handle(Exception e) {
return new ErrorResponse(
"VALIDATION_ERROR",
e.getMessage(),
ErrorType.VALIDATION_ERROR,
Map.of("exception_type", e.getClass().getSimpleName())
);
}
}
/**
* 默认错误处理器
*/
public static class DefaultErrorHandler implements ErrorHandler {
@Override
public boolean canHandle(Exception e) {
return true;
}
@Override
public ErrorResponse handle(Exception e) {
Map<String, Object> details = new ConcurrentHashMap<>();
details.put("exception_type", e.getClass().getName());
details.put("message", e.getMessage());
details.put("cause", e.getCause() != null ? e.getCause().getMessage() : null);
return new ErrorResponse(
"INTERNAL_ERROR",
"内部错误",
ErrorType.UNKNOWN_ERROR,
details
);
}
}
/**
* 错误处理管理器
*/
public static class ErrorHandlingManager {
private final Map<ErrorType, ErrorHandler> handlers;
public ErrorHandlingManager() {
this.handlers = new ConcurrentHashMap<>();
// 注册处理器
registerHandler(ErrorType.NETWORK_ERROR, new NetworkErrorHandler());
registerHandler(ErrorType.API_ERROR, new ApiErrorHandler());
registerHandler(ErrorType.VALIDATION_ERROR, new ValidationErrorHandler());
registerHandler(ErrorType.TIMEOUT_ERROR, new NetworkErrorHandler()); // 复用
registerHandler(ErrorType.UNKNOWN_ERROR, new DefaultErrorHandler());
}
/**
* 注册错误处理器
*/
public void registerHandler(ErrorType type, ErrorHandler handler) {
handlers.put(type, handler);
logger.info("注册错误处理器: {} -> {}", type, handler.getClass().getSimpleName());
}
/**
* 处理错误
*/
public ErrorResponse handleError(Exception e) {
for (ErrorHandler handler : handlers.values()) {
if (handler.canHandle(e)) {
logger.error("使用处理器处理错误: {}", handler.getClass().getSimpleName());
return handler.handle(e);
}
}
logger.warn("未找到合适的错误处理器,使用默认处理器");
return new DefaultErrorHandler().handle(e);
}
}
/**
* 使用示例
*/
public static void main(String[] args) {
String apiKey = System.getenv("OPENAI_API_KEY");
if (apiKey == null || apiKey.isEmpty()) {
System.err.println("未设置 OPENAI_API_KEY 环境变量");
return;
}
// 创建错误处理管理器
ErrorHandlingManager manager = new ErrorHandlingManager();
// 测试各种错误类型
System.out.println("╔═══════════════════════════════════════════════════════════╗");
System.out.println("║ 错误处理系统示例 ║");
System.out.println("╠══════════════════════════════════════════════════════════╣");
System.out.println();
// 测试 1:网络错误
System.out.println("=== 测试 1: 网络错误 ===");
Exception networkError = new java.net.SocketTimeoutException("Connection timeout");
ErrorResponse networkResponse = manager.handleError(networkError);
displayErrorResponse(networkResponse);
System.out.println();
// 测试 2:验证错误
System.out.println("=== 测试 2: 验证错误 ===");
Exception validationError = new IllegalArgumentException("参数不能为空");
ErrorResponse validationResponse = manager.handleError(validationError);
displayErrorResponse(validationResponse);
System.out.println();
// 测试 3:API 错误
System.out.println("=== 测试 3: API 错误 ===");
Exception apiError = new RuntimeException("API returned 429 Too Many Requests");
ErrorResponse apiErrorResponse = manager.handleError(apiError);
displayErrorResponse(apiErrorResponse);
System.out.println();
// 测试 4:未知错误
System.out.println("=== 测试 4: 未知错误 ===");
Exception unknownError = new RuntimeException("Something went wrong");
ErrorResponse unknownErrorResponse = manager.handleError(unknownError);
displayErrorResponse(unknownErrorResponse);
}
/**
* 显示错误响应
*/
private static void displayErrorResponse(ErrorResponse response) {
System.out.printf("║ 代码: %s ║\n", response.getCode());
System.out.printf("║ 类型: %s ║\n", response.getType());
System.out.printf("║ 消息: %s ║\n", response.getMessage());
System.out.printf("║ 时间: %s ║\n",
java.time.LocalDateTime.ofEpochMilli(response.getTimestamp())
.format(java.time.format.DateTimeFormatter.ISO_LOCAL_DATE_TIME));
System.out.printf("║ 详情: ║\n");
response.getDetails().forEach((key, value) -> {
System.out.printf("║ %s: %s ║\n", key, value);
});
System.out.println("╠─────────────────────────────────────────────────────────────╣");
}
}
总结
本章要点
-
错误处理重要性
- 提高应用可靠性
- 改善用户体验
- 便于问题排查
-
错误类型
- 网络错误
- API 错误
- 验证错误
- 超时错误
-
重试策略
- 指数退避
- 线性退避
- 固定延迟
- 随机抖动
-
断路器模式
- 保护服务免受过载
- 快速失败
- 自动恢复
- 半开状态测试
-
最佳实践
- 合理配置重试次数和延迟
- 正确设置断路器阈值
- 记录错误和发送告警
- 实现优雅降级
下一步
在下一章节中,我们将学习:- Spring Boot 集成
- Web 应用开发
- REST API 创建
- WebSocket 实时通信
- 安全和认证
常见问题
Q1:如何确定重试次数? A:考虑因素:- 错误类型(网络错误需要更多重试)
- 业务要求(重要操作可以重试更多次)
- 成本控制(每次重试都消耗 Token)
- 用户容忍度(重试过多次会影响体验)
- 打开:失败次数达到阈值
- 半开:重置时间过后
- 关闭:半开状态下多次成功后
- 使用缓存结果
- 返回简化响应
- 返回默认响应
- 使用备用服务
- 记录所有错误
- 统计错误频率
- 设置错误告警
- 分析错误趋势
- 生成错误报告
- 记录详细信息
- 返回通用错误消息
- 通知开发团队
- 收集错误上下文
- 提供技术支持信息
参考资料
版权归属于 LangChat Team
官网:https://langchat.cn

