

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 最佳实践
<a name="semantic-caching-best-practices"></a>

## 选择可以缓存的数据
<a name="semantic-caching-bp-choosing-data"></a>

语义缓存非常适合响应相对稳定的重复查询，而实时或高度动态的响应通常不适合缓存。

使用从现有应用程序上下文（例如产品 ID、类别、区域或用户细分）派生的标签和数字筛选器来决定哪些查询和响应符合缓存条件并提高缓存命中率的相关性。

## 相似度阈值调整
<a name="semantic-caching-bp-threshold"></a>

相似度阈值控制缓存命中率和答案质量之间的权衡。为您的用例选择一个在成本节省和准确性之间取得平衡的阈值：


| Threshold | 命中率 | 质量风险 | 适用于 | 
| --- | --- | --- | --- | 
| 0.95（严格） | 低（约 25%） | 非常低 | 医疗、法律、金融应用 | 
| 0.90（中等） | 中等（约 55%） | 低 | 普通聊天机器人 | 
| 0.80（平衡） | 最高（约 75%） | 低-中 | 常见问题机器人、IT 支持 | 
| 0.75（放松） | 非常高（约 90%） | 中 | High-volume 重复查询 | 

**重要**  
从较高的阈值 (0.90—0.95) 开始，然后在监控精度的同时逐渐降低该阈值。使用 A/B 测试来找到工作负载的最佳平衡。

## 独立查询与对话
<a name="semantic-caching-bp-standalone-vs-conversations"></a>
+ **对于独立查询**-直接对用户查询文本应用语义缓存。
+ **对于多回合对话** — 首先使用对话记忆来检索回答当前回合所需的关键事实和最新消息。然后将语义缓存应用于当前用户消息和检索到的上下文的组合，而不是嵌入整个原始对话框。

## 设置缓存失效期
<a name="semantic-caching-bp-ttl"></a>

使用 TTL 控制缓存响应在缓存未命中时重新生成缓存响应之前的时间。


| 数据类型 | 推荐的 TTL | 理由 | 
| --- | --- | --- | 
| 静态事实（文档、政策） | 24 小时 | 事实很少改变 | 
| 产品信息 | 12—24 小时 | 大多数目录每天更新 | 
| 一般助理的回应 | 1—4 小时 | 在新鲜度与命中率之间取得平衡 | 
| Real-time 数据（价格、库存） | 5—15 分钟 | 数据经常更改 | 
| 对话背景 | 30 分钟 | Session-scoped，短暂的 | 

```
# Set TTL with random jitter to spread out cache invalidations
import random

base_ttl = 82800  # ~23 hours
jitter = random.randint(0, 3600)  # Up to 1 hour of jitter
valkey_client.expire(cache_key, base_ttl + jitter)
```

**提示**  
设置与您的应用程序用例以及数据或模型输出更改频率相匹配的 TTL。较长的 TTL 会增加缓存命中率，但会增加答案过时的风险。较短的 TTL 可以使响应更新鲜，但缓存命中率较低，并且需要更多的 LLM 推理。

## 监控和成本跟踪
<a name="semantic-caching-bp-monitoring"></a>

跟踪缓存性能指标以随着时间的推移优化语义缓存：

```
def record_cache_event(valkey_client, event_type: str):
    """Track cache hits and misses using atomic counters."""
    valkey_client.incr(f"cache:metrics:{event_type}")

    # Also track hourly for time-series analysis
    from datetime import datetime
    hour_key = datetime.now().strftime("%Y%m%d%H")
    counter_key = f"cache:metrics:{event_type}:{hour_key}"
    valkey_client.incr(counter_key)
    valkey_client.expire(counter_key, 86400 * 7)  # Keep 7 days

def get_cache_stats(valkey_client) -> dict:
    """Get current cache performance metrics."""
    hits = int(valkey_client.get("cache:metrics:hit") or 0)
    misses = int(valkey_client.get("cache:metrics:miss") or 0)
    total = hits + misses
    hit_rate = hits / total if total > 0 else 0

    avg_cost_per_call = 0.015  # Example: ~$0.015 per LLM call
    savings = hits * avg_cost_per_call

    return {
        "total_requests": total,
        "hits": hits,
        "misses": misses,
        "hit_rate": round(hit_rate, 3),
        "estimated_savings_usd": round(savings, 2),
    }
```

## 内存管理
<a name="semantic-caching-bp-memory"></a>
+ **设置 maxmemory 策略 — 在 ElastiCache 集群`maxmemory-policy allkeys-lru`上进行配置，以便在集群达到内存**限制时自动移出最近最少使用的缓存条目。
+ **容量规划**-每个缓存条目通常需要大约 4—6 KB（嵌入维度 × 4 字节 \+ 查询文本 \+ 响应文本）。一个 1 GB 的 ElastiCache 实例可以存储大约 170,000 个缓存条目。
+ **对陈旧数据使用缓存失效 — 当基础数据**发生变化时，使用文本搜索来查找相关的缓存条目并使其失效：

  ```
  def invalidate_by_topic(valkey_client, topic_keyword: str):
      """Remove cached entries matching a topic after a data update."""
      results = valkey_client.execute_command(
          "FT.SEARCH", "semantic_cache",
          f"@query:{topic_keyword}",
          "NOCONTENT",  # Only return keys, not fields
      )
  
      if results[0] > 0:
          keys = results[1:]
          for key in keys:
              valkey_client.delete(key)
          print(f"Invalidated {len(keys)} cached entries for '{topic_keyword}'")
  ```