NLP(一百一十四)使用Langfuse实现RAG流程的可视化追踪

本文将会介绍如何使用Langfuse来实现对RAG整体流程的可视化追踪(Tracing)。

在文章NLP(一百一十三)使用Langfuse提升LLM和Agents的可观测性中,笔者介绍了如何使用Langfuse来提升大模型和Agents的可观测性。

在文章NLP(六十九)智能文档助手升级,笔者使用RAG流程构建了智能文档助手,支持对文档内容进行提问。

在本文中,笔者将会介绍如何使用Langfuse来实现对RAG整体流程的可视化追踪(Tracing),这样我们就能很方便地追踪整体RAG流程,并观测到每个步骤中的输入输出及参数等信息。

RAG流程

笔者首先介绍了文档智能助手的整体流程,采用RAG框架:

  • 关键词召回: 对用户query进行ElasticSeach检索
  • 向量召回: 对用户query进行向量嵌入(embedding),并使用Mivlus实现向量检索
  • 召回融合(Retrieve): 采用RRF算法
  • 重排(Rerank): 使用Cohere的Rerank模型实现召回重排
  • 回答(Generation): 使用LLM进行回复

RAG追踪

我们将会分步骤来介绍如何使用Langfuse来实现对RAG流程的追踪(tracing)。

  • 导入模块
1
2
3
4
5
6
7
8
9
10
11
import os
import cohere
from uuid import uuid4
from dotenv import load_dotenv
from openai import OpenAI
from langfuse import Langfuse
from operator import itemgetter

from utils.db_client import es_client, milvus_client

load_dotenv('../config/.env')

在导入模块的同时,引入ElasticSeach和Milvus客户端并完成连接,数据已经在前期导入,细节可参考文章NLP(六十九)智能文档助手升级

同时,加载.env文件,设置好相关API key.

  • 初始化Langfuse
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class DocumentQA(object):
def __init__(self, query):
self.query = query
self.trace = self.init_langfuse_trace()

def init_langfuse_trace(self):
trace_id = str(uuid4())
print(f"trace_id: {trace_id}")
trace = langfuse.trace(
name="RAG Pipeline",
user_id="user_123",
id=trace_id
)
trace.span(
name="User Query",
input=self.query,
)
return trace

这里采用DocumentQA,并初始化Langfuse,使用trace_id进行追踪。

  • ES检索
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def _es_retrieval(self):
result = []
# 查询数据(全文搜索)
dsl = {
'query': {
'match': {
'content': self.query
}
},
"size": 5
}
search_result = es_client.search(index='docs', body=dsl)
if search_result['hits']['hits']:
result = [
{
"content": hit['_source']['content'],
"source": hit['_source']['source']
}
for hit in search_result['hits']['hits']
]
self.trace.span(
name="ES Retrieval",
input=self.query,
output=result
)
return result
  • Milvus检索

先使用OpenAI Embedding模型获取用户query的向量嵌入,并使用MIlvus获取最相似文本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def _get_text_embedding(self):
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY", ""))
embedding = client.embeddings.create(
model="text-embedding-ada-002",
input=self.query
)
self.trace.generation(
name="OpenAI Text Embedding",
model="text-embedding-ada-002",
input=self.query,
output=len(embedding.data[0].embedding),
usage_details=embedding.usage
)
return embedding.data[0].embedding

# Milvus检索
def _milvus_retrieval(self):
# milvus search content
vectors_to_search = [self._get_text_embedding()]
# 通过嵌入向量相似度获取相似文本
search_params = {
"metric_type": "IP",
"params": {"nprobe": 5},
}
result = milvus_client.search(vectors_to_search,
"embeddings",
search_params,
limit=5,
output_fields=["text", "source"])
# filter by similarity score
result = [
{
"content": _.entity.get('text'),
"source": _.entity.get('source'),
"score": dist
}
for _, dist in zip(result[0], result[0].distances) if dist > 0.5
]
self.trace.span(
name="Milvus Retrieval",
input=self.query,
output=result
)
return result
  • 召回融合

采用RRF(Reciprocal Rank Fusion)算法对上述两种召回方式进行结果融合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# sort the es and milvus retrieval results by rrf
def retrieval(
self
) -> list[dict[str, float]]:
"""
Sort the ES and Milvus retrieval results by RRF.
:return:
"""
es_result = [_["content"] for _ in self._es_retrieval()]
milvus_result = [_["content"] for _ in self._milvus_retrieval()]
doc_lists = [es_result, milvus_result]
# Create a union of all unique documents in the input doc_lists
all_documents = set()
for doc_list in doc_lists:
for doc in doc_list:
all_documents.add(doc)

# Initialize the RRF score dictionary for each document
rrf_score_dic = {doc: 0.0 for doc in all_documents}

# Calculate RRF scores for each document
weights = [0.5, 0.5]
c = 60
for doc_list, weight in zip(doc_lists, weights):
for rank, doc in enumerate(doc_list, start=1):
rrf_score = weight * (1 / (rank + c))
rrf_score_dic[doc] += rrf_score

# Sort documents by their RRF scores in descending order
sorted_documents = sorted(rrf_score_dic.items(), key=itemgetter(1), reverse=True)
# get top 5 documents
result = []
for i in range(len(sorted_documents)):
text, score = sorted_documents[i]
result.append({"text": text, "score": score})
# add langfuse tracing
self.trace.span(
name="Retrieval RRF",
input={"es_result": es_result, "milvus_result": milvus_result},
output=result
)
return result
  • 重排

使用Cohere的Rerank模型对上述融合召回结果进行精排。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def rerank(self, before_rerank_contents):
documents = [_["text"] for _ in before_rerank_contents]
cohere_client = cohere.Client(os.getenv("COHERE_API_KEY", ""))
results = cohere_client.rerank(model="rerank-multilingual-v2.0",
query=self.query,
documents=documents,
top_n=5)
after_rerank_contents = []
for hit in results:
after_rerank_contents.append({"text": hit.document['text'], "score": hit.relevance_score})
self.trace.span(
name="Cohere Rerank",
input=before_rerank_contents,
output=after_rerank_contents
)
return after_rerank_contents
  • LLM回复

使用大模型,基于上述重排后的召回结果,回答用户问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# answer by OpenAI
def answer(self):
# retrieve
before_rerank_contents = self.retrieval()
after_rerank_contents = self.rerank(before_rerank_contents)
# construct prompt
prompt = "Based on the following documents, answer the question: \n"
for i, text_dict in enumerate(after_rerank_contents):
prompt += f"document {i+1}: {text_dict['text']}\n"
prompt += f"\nHere is the user's question: {self.query}\nPlease answer the question."
# chat completion
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY", ""))
max_tokens = 300
temperature = 0.5
result = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "user", "content": prompt}
],
max_tokens=max_tokens,
temperature=temperature
)
response = result.choices[0].message.content
# langfuse tracing
self.trace.generation(
name="OpenAI Generation",
input=prompt,
output=response,
model_parameters={"max_tokens": max_tokens, "temperature": temperature},
usage_details=result.usage
)
return response

Langfuse Tracing

上述的RAG流程只是为了演示如何使用Langfuse对RAG流程进行追踪,实际上,上述的RAG流程有许多可以优化的地方。

我们以问题格里芬发表演讲时说了什么?为例,看看Langfuse中的RAG流程的可观测性。运行代码如下:

1
2
3
4
if __name__ == '__main__':
doc_qa = DocumentQA("格里芬发表演讲时说了什么?")
answer = doc_qa.answer()
print(answer)

整体的Trace如下:

  • ES召回

  • 获取query向量

  • 向量召回

  • 召回融合

  • 重排

  • LLM回答

总结

本文给出的Langfuse追踪RAG流程的代码已经开源,Github网址为: https://github.com/percent4/llm_4_doc_qa .

后续笔者将会持续关注Langfuse及更多工具的使用~

欢迎关注我的公众号NLP奇幻之旅,原创技术文章第一时间推送。

欢迎关注我的知识星球“自然语言处理奇幻之旅”,笔者正在努力构建自己的技术社区。


NLP(一百一十四)使用Langfuse实现RAG流程的可视化追踪
https://percent4.github.io/NLP(一百一十四)使用Langfuse实现RAG流程的可视化追踪/
作者
Jclian91
发布于
2025年4月27日
许可协议