NLP (74) Implementing Streaming API Output with LangChain and Async Web Frameworks

Introduction

The OpenAI API already supports streaming calls, so combined with a web framework's streaming-response feature, it is not hard to implement streaming output.
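
As a quick illustration, here is a minimal sketch of raw OpenAI streaming (assuming the openai 0.x SDK that was current when this was written; the model name and prompt are placeholders):

# Minimal sketch of raw OpenAI streaming (openai 0.x SDK assumed;
# model name and prompt are illustrative placeholders).
import openai

openai.api_key = "sk-xxx"

response = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,  # ask the API to return tokens incrementally
)
for chunk in response:
    # each chunk carries an incremental "delta"; content may be absent
    delta = chunk["choices"][0]["delta"]
    print(delta.get("content", ""), end="", flush=True)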

LangChain, however, wraps the OpenAI API quite deeply, and streaming output has to go through a callback. LangChain's built-in streaming callback class is StreamingStdOutCallbackHandler, which streams to the terminal and does not support streaming over an HTTP API.

For streaming LangChain's answers to the web, quite a few solutions have been posted online. Most of them subclass BaseCallbackHandler and adapt it: the usual approach relies on a queue, pushing each newly generated token into the queue while the answer is generated in a separate thread, and pulling elements off the queue in the response handler to achieve streaming API output (see the sketch after the reference below).

For one such solution, see https://gist.github.com/python273/563177b3ad5b9f74c0f8f3299ec13850.
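
The core idea behind these queue-based solutions looks roughly like this (a simplified sketch, not the gist's exact code; class and function names are illustrative):

# Simplified sketch of the queue-based approach (names are illustrative,
# not the exact code from the gist above).
import queue
import threading

from langchain.callbacks.base import BaseCallbackHandler
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage


class QueueCallbackHandler(BaseCallbackHandler):
    def __init__(self, q: queue.Queue):
        self.q = q

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        # called by LangChain for every newly generated token
        self.q.put(token)

    def on_llm_end(self, response, **kwargs) -> None:
        self.q.put(None)  # sentinel: generation finished


def stream_answer(content: str):
    q = queue.Queue()
    # assumes OPENAI_API_KEY is set in the environment
    chat = ChatOpenAI(streaming=True, callbacks=[QueueCallbackHandler(q)])
    # generate in a separate thread so we can consume the queue here
    thread = threading.Thread(
        target=chat.predict_messages, args=([HumanMessage(content=content)],)
    )
    thread.start()
    while (token := q.get()) is not None:
        yield token  # hand each token to the web framework's response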

What this article contributes is combining the async web frameworks Sanic and FastAPI with LangChain's AsyncIteratorCallbackHandler, calling LangChain asynchronously to implement streaming API output.
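
AsyncIteratorCallbackHandler applies the same queue idea in async form: conceptually, the callbacks fill an asyncio.Queue and an async generator (aiter()) drains it. A rough conceptual sketch, not the library's actual source:

# Conceptual sketch of what AsyncIteratorCallbackHandler does internally
# (simplified; not the library's actual source code).
import asyncio

from langchain.callbacks.base import AsyncCallbackHandler


class SketchAsyncIteratorHandler(AsyncCallbackHandler):
    def __init__(self):
        self.queue: asyncio.Queue = asyncio.Queue()
        self.done = asyncio.Event()

    async def on_llm_new_token(self, token: str, **kwargs) -> None:
        self.queue.put_nowait(token)  # fill the queue as tokens arrive

    async def on_llm_end(self, response, **kwargs) -> None:
        self.done.set()  # signal that generation has finished

    async def aiter(self):
        # drain the queue until generation is done and the queue is empty
        while not (self.done.is_set() and self.queue.empty()):
            try:
                yield await asyncio.wait_for(self.queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue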

Sanic Framework

# -*- coding: utf-8 -*-
# @place: Pudong, Shanghai
# @file: sanic_langchain_stream.py
# @time: 2023/9/19 18:18
# sanic==23.6.0
import asyncio
from sanic import Sanic
from sanic.response import text, json, ResponseStream

from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage
from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler


app = Sanic("benchmark")


@app.route("/")
async def index(request):
    return text("hello")


@app.route("/test", methods=["POST"])
async def answer(request):
    content = request.json["content"]
    return json({"text": content})


@app.route("/csv")
async def test(request):
    async def sample_streaming_fn(response):
        await response.write("foo,")
        await response.write("bar")

    return ResponseStream(sample_streaming_fn, content_type="text/csv")


@app.route("/answer/async", methods=["POST"])
async def answer_async(request):
    content = request.json["content"]

    async def predict(response):
        # collect tokens from the model through an async iterator callback
        handler = AsyncIteratorCallbackHandler()
        model_message = [HumanMessage(content=content)]
        chat = ChatOpenAI(streaming=True,
                          callbacks=[handler],
                          temperature=0,
                          openai_api_key="")
        # run the LLM call in the background; tokens arrive via the handler
        asyncio.create_task(chat.apredict_messages(model_message))
        # forward each token to the client as a Server-Sent Event
        async for token in handler.aiter():
            await response.write(f"data: {token}\n\n")

    return ResponseStream(predict, content_type="text/event-stream")


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=3000, debug=False, access_log=True)

[sanic.gif: animated demo of the Sanic endpoint streaming its answer token by token]
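
To watch the tokens arrive incrementally on the client side, a small requests-based script can be used (requests is assumed to be installed; the URL and question text are just examples):

# A small client to watch tokens arrive incrementally (requests assumed
# installed; URL and question text are examples).
import requests

with requests.post(
    "http://localhost:3000/answer/async",
    json={"content": "Introduce yourself in one sentence."},
    stream=True,
) as resp:
    for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
        print(chunk, end="", flush=True)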

FastAPI Framework

# -*- coding: utf-8 -*-
# @place: Pudong, Shanghai
# @file: fastapi_langchain_stream.py
# @time: 2023/9/20 17:36
# fastapi==0.101.1
import uvicorn
import asyncio
from fastapi import FastAPI
from pydantic import BaseModel
from fastapi.responses import StreamingResponse, JSONResponse

from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage
from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler

app = FastAPI(description="langchain_streaming")


class Item(BaseModel):
    text: str


class Question(BaseModel):
    text: str


async def fake_video_streamer():
    for i in range(10):
        yield b"some fake video bytes\n"


@app.get("/")
async def main():
    return StreamingResponse(fake_video_streamer())


@app.post("/test")
async def test(item: Item):
    return JSONResponse({"content": item.text})


@app.post("/answer/async")
async def answer_async(q: Question):
    content = q.text

    async def predict():
        # collect tokens from the model through an async iterator callback
        handler = AsyncIteratorCallbackHandler()
        model_message = [HumanMessage(content=content)]
        chat = ChatOpenAI(streaming=True,
                          callbacks=[handler],
                          temperature=0,
                          openai_api_key="sk-xxx")
        # run the LLM call in the background; tokens arrive via the handler
        asyncio.create_task(chat.apredict_messages(model_message))
        # yield each token to the client as a Server-Sent Event
        async for token in handler.aiter():
            yield f"data: {token}\n\n"

    return StreamingResponse(predict())


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
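
Because the server is fully async, the stream can also be consumed asynchronously, for example with httpx (assumed installed; the URL and question text are examples). Note that this endpoint expects the field name text, matching the Question model:

# Async client for the FastAPI endpoint (httpx assumed installed;
# URL and question text are examples).
import asyncio

import httpx


async def main():
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/answer/async",
            json={"text": "Introduce yourself in one sentence."},
        ) as resp:
            async for chunk in resp.aiter_text():
                print(chunk, end="", flush=True)


asyncio.run(main())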
