Kafka入门(三)Kafka在Web应用中的实践:基于FastAPI实现图片OCR

本文将会介绍Kafka 在 Web 应用中的实践,也就是Kafka作为消息队列,FastAPI作为Web框架来实现图片上传、OCR识别及结果查看。

在文章Kafka入门(一)Kafak介绍、安装与简单使用中,笔者介绍了Kafka的基础概念、使用场景、安装以及如何简单实用。

在本文中,笔者将会把FastAPI与Kafka结合使用,Kafka作为消息队列,FastAPI作为Web框架来实现图片上传、OCR识别及结果查看。主要功能如下:

  1. 上传图片到FastAPI,FastAPI将图片发送到Kafka,同时将图片信息写入数据库,状态为未完成
  2. Kafka消费者消费图片,调用模拟的OCR识别结果,将结果更新至数据库,状态为已完成
  3. FastAPI查询数据库,返回识别结果

流程图如下:

主流程示意图

项目代码

前端部分

项目的前端部分采用前端页面采用BootStrap实现,位于static目录,主要功能如下:

  • 批量上传图片页面: upload.html
  • 查询图片识别结果页面: results.html

前端代码使用使用OpenAI GPT-4o模型生成,代码略作修改,功能完整。

其中批量上传图片页面upload.html的HTML代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>批量上传图片</title>

<!-- 引入 Bootstrap 样式 -->
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
</head>
<body class="container mt-4">

<h2 class="text-center mb-4">批量上传图片(OCR 识别)</h2>

<!-- 文件上传区域 -->
<div class="mb-3">
<input type="file" class="form-control" id="fileInput" multiple>
</div>

<!-- 按钮区域 -->
<div class="d-flex gap-2">
<button class="btn btn-primary flex-fill" onclick="uploadImages()">📤 上传图片</button>
<button class="btn btn-outline-danger flex-fill" onclick="clearFiles()">🗑 清空图片</button>
</div>

<!-- 进度条 -->
<div id="progressContainer" class="d-none mt-3">
<div class="progress">
<div id="progressBar" class="progress-bar progress-bar-striped progress-bar-animated" style="width: 0%"></div>
</div>
<p id="progressText" class="text-center mt-2">等待上传...</p>
</div>

<!-- 这里不直接写 href,使用 JavaScript 控制 -->
<button class="btn btn-success w-100 mt-3" onclick="redirectToResults()">🔍 查看 OCR 识别结果</button>

<script>
/** 📤 上传图片并调用后端 API */
async function uploadImages() {
let files = document.getElementById("fileInput").files;
if (files.length === 0) {
alert("❌ 请选择至少一张图片!");
return;
}

let progressContainer = document.getElementById("progressContainer");
let progressBar = document.getElementById("progressBar");
let progressText = document.getElementById("progressText");

progressContainer.classList.remove("d-none");
progressBar.style.width = "0%";
progressText.innerText = `正在上传 0 / ${files.length}`;

for (let i = 0; i < files.length; i++) {
let formData = new FormData();
formData.append("file", files[i]);

let response = await fetch("http://127.0.0.1:8000/upload/", {
method: "POST",
body: formData
});

let result = await response.json();
console.log(`上传成功: ${result.filename}`);

// 更新进度条
let percentComplete = Math.round(((i + 1) / files.length) * 100);
progressBar.style.width = percentComplete + "%";
progressText.innerText = `正在上传 ${i + 1} / ${files.length}`;
}

alert("✅ 所有图片上传完成!");
progressText.innerText = "上传完成!";
}

/** 🗑 清空已选择的文件 */
function clearFiles() {
let fileInput = document.getElementById("fileInput");
fileInput.value = ""; // 清空文件输入框
document.getElementById("progressContainer").classList.add("d-none"); // 隐藏进度条
}

/** 🔄 确保跳转到 /static/results.html,避免路径问题 */
function redirectToResults() {
window.location.href = "/static/results.html"; // 使用**绝对路径**方式跳转
}
</script>

</body>
</html>

查询图片识别结果页面results.html的HTML代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>OCR 结果查看</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
</head>
<body class="container mt-4">

<h2 class="text-center">OCR 识别结果</h2>

<div class="input-group mb-3">
<input type="text" id="searchInput" class="form-control" placeholder="按文件名搜索">
<button class="btn btn-outline-primary" onclick="fetchResults()">查询</button>
</div>

<div class="form-check mb-3">
<input type="checkbox" class="form-check-input" id="sortByTime">
<label class="form-check-label" for="sortByTime"> 按时间排序</label>
</div>

<table class="table table-striped table-bordered">
<thead class="table-dark">
<tr>
<th>UUID</th>
<th>文件名</th>
<th>图片</th>
<th>OCR 结果</th>
<th>开始时间</th>
<th>状态</th>
<th>消耗时间</th>
</tr>
</thead>
<tbody id="resultsTableBody"></tbody>
</table>

<a href="upload.html" class="btn btn-primary w-100 mt-3">返回上传页面</a>

<script>
async function fetchResults() {
let searchText = document.getElementById("searchInput").value;
let sortByTime = document.getElementById("sortByTime").checked;
let url = `http://127.0.0.1:8000/results/?sort_by_time=${sortByTime}`;

if (searchText) {
url += `&filename=${searchText}`;
}

let response = await fetch(url);
let results = await response.json();

let tableBody = document.getElementById("resultsTableBody");
tableBody.innerHTML = "";

results.forEach(result => {
let row = `<tr>
<td>${result.uuid}</td>
<td>${result.filename}</td>
<td><a href="${result.filepath}" target="_blank">查看图片</a></td>
<td>${result.ocr_text}</td>
<td>${result.start_time}</td>
<td>${result.status}</td>
<td>${result.elapsed_time.toFixed(2)} 秒</td>
</tr>`;
tableBody.innerHTML += row;
});
}

window.onload = fetchResults;
</script>

</body>
</html>

前端代码使用使用OpenAI GPT-4o模型生成,代码略作修改,功能完整。

后端部分

后端部分,Web框架采用FastAPI,数据库采用MySQL,消息队列采用Kafka。

在本地启动Kafka后,在Kafka中创建名为ocr-topic的topic。在本地启动MySQL,创建数据库ocr

基础配置文件config.py如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
# -*- coding: utf-8 -*-

# MySQL 配置
MYSQL_USERNAME = "root"
MYSQL_PASSWORD = "root"
MYSQL_HOST = "localhost"
MYSQL_PORT = "3306"
MYSQL_DATABASE = "ocr_db"

# Kafka 配置
KAFKA_HOST = "localhost"
KAFKA_PORT = "9092"
KAFKA_TOPIC = "ocr-topic"

MySQL操作部分代码(models.py)如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# -*- coding: utf-8 -*-
from sqlalchemy import Column, Integer, String, Float, create_engine, DATETIME
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import config

# 定义 MySQL 连接 URL
DATABASE_URL = f"mysql+pymysql://{config.MYSQL_USERNAME}:{config.MYSQL_PASSWORD}@{config.MYSQL_HOST}:{config.MYSQL_PORT}/{config.MYSQL_DATABASE}"

# 连接 MySQL
engine = create_engine(DATABASE_URL, echo=True)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


# 定义 OCR 结果的数据库表
class OCRResult(Base):
__tablename__ = "ocr_results"

id = Column(Integer, primary_key=True, index=True, autoincrement=True)
uuid = Column(String(255), unique=True, index=True)
filename = Column(String(255), index=True)
filepath = Column(String(512))
ocr_text = Column(String(5000)) # OCR 处理可能会返回较长文本
start_time = Column(DATETIME) # 记录开始时间
status = Column(Integer) # 0-未完成;1-已完成
elapsed_time = Column(Float) # 记录消耗时间, 单位秒


if __name__ == "__main__":
# 确保创建表
Base.metadata.create_all(bind=engine)

FastAPI Web代码(main.py)如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# -*- coding: utf-8 -*-
import os
import uuid
import time
from datetime import datetime as dt
from fastapi import FastAPI, File, UploadFile, Depends, Query
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from models import SessionLocal, OCRResult
from starlette.responses import FileResponse
from fastapi.staticfiles import StaticFiles

from config import KAFKA_HOST, KAFKA_PORT, KAFKA_TOPIC

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}')

app = FastAPI()

# 允许前端访问 API
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

STATIC_FOLDER = "static"
UPLOAD_FOLDER = f"{STATIC_FOLDER}/uploads"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(STATIC_FOLDER, exist_ok=True)

# **提供静态资源**
app.mount("/static", StaticFiles(directory=STATIC_FOLDER), name="static")


# **首页 `/` 自动打开上传页面**
@app.get("/")
async def serve_homepage():
return FileResponse("static/upload.html")


# 依赖项:数据库会话管理
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()


# **📌 上传文件并 OCR 识别**
@app.post("/upload/")
async def upload_image(file: UploadFile = File(...), db: Session = Depends(get_db)):
start_time = time.time()

# **生成唯一 ID 并保存文件**
file_extension = file.filename.split(".")[-1]
unique_id = str(uuid.uuid4()) # 生成 UUID 作为文件唯一标识
save_path = os.path.join(UPLOAD_FOLDER, f"{unique_id}.{file_extension}")

with open(save_path, "wb") as image_file:
image_file.write(file.file.read())

# **使用 Tesseract OCR 识别图片文字**
# image = Image.open(save_path)
# ocr_text = pytesseract.image_to_string(image)

elapsed_time = time.time() - start_time # 计算处理时间

# **存储 OCR 结果到数据库**
db_result = OCRResult(
uuid=unique_id,
filename=file.filename,
filepath=save_path,
ocr_text=None,
start_time=dt.now(),
status=0,
elapsed_time=0.0
)
db.add(db_result)
db.commit()

# 将uuid和文件路径送入至Kafka队列,作为生产者
producer.send(KAFKA_TOPIC, key=unique_id.encode(), value=save_path.encode())

return {
"uuid": unique_id,
"filename": file.filename,
"ocr_text": None,
"elapsed_time": elapsed_time
}


# **📌 获取所有 OCR 结果**
@app.get("/results/")
def get_results(
db: Session = Depends(get_db),
filename: str = Query(None, title="文件名", description="搜索 OCR 记录的文件名"),
sort_by_time: bool = Query(False, title="按时间排序", description="是否按时间倒序排序")
):
query = db.query(OCRResult)

# **🔍 文件名搜索**
if filename:
query = query.filter(OCRResult.filename.contains(filename))

# **🕒 按时间倒序**
if sort_by_time:
query = query.order_by(OCRResult.start_time.desc())

results = query.all()

return [
{
"uuid": result.uuid,
"filename": result.filename,
"filepath": f"uploads/{os.path.basename(result.filepath)}",
"ocr_text": result.ocr_text,
"start_time": result.start_time.strftime("%Y-%m-%d %H:%M:%S"), # 🎯 格式化时间
"status": "已完成" if result.status else "进行中",
"elapsed_time": result.elapsed_time,
}
for result in results
]


if __name__ == "__main__":
import uvicorn

uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)
# 启动 FastAPI 服务
# uvicorn main:app --reload

上传图片使用/upload这个API,里面实现的操作如下:

  • 批量上传图片,图片保存至static/upload目录
  • 将图片基础信息写入MySQL,但此时未进行OCR识别,状态为未完成
  • 将图片信息写入Kafka,作为生产者,等待消费者端进行处理

此时,我们还需要消费者来消费Kakfa中传入的消息,代码(consumer.py)如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# -*- coding: utf-8 -*-

import time
import string
from random import random, choice, randint
from kafka import KafkaConsumer

from models import SessionLocal, OCRResult
from config import KAFKA_HOST, KAFKA_PORT, KAFKA_TOPIC


db = SessionLocal()

consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}', group_id="ocr-group")
for message in consumer:
start_time = time.time()
file_uuid = message.key.decode()
file_path = message.value.decode()
time.sleep(random() * 10) # 模拟 OCR 识别耗时
# 将模拟的 OCR 结果更新至数据库,更新的筛选条件是 uuid
chars = randint(3, 20)
result = []
for _ in range(chars):
result.append(choice(string.digits + string.ascii_letters))

db.query(OCRResult).filter(OCRResult.uuid == file_uuid).update(
{
"ocr_text": "".join(result),
"status": 1,
"elapsed_time": time.time() - start_time
}
)
db.commit()

在消费者端,笔者模拟了模拟的OCR过程(随机睡眠过程 + 随机字符生成),待OCR识别完毕后更新该图片在MySQL中的完成状态、识别结果等信息。

项目效果展示

  1. 批量上传图片页面

  1. 未识别前的MySQL数据库中的数据

  1. 未识别前的OCR结果查看页面

  1. 识别后的MySQL数据库中的数据

  1. 识别后的OCR结果查看页面

  1. Kafka中的消息查看

  1. 查看图片

总结

本文中的项目已开源至Github,网址为:https://github.com/percent4/kafka-fastapi-ocr-service 。

本文算是笔者使用Kafka做实战项目的开端,也对Kakfa的原理和使用方法有了更为深入的理解。

希望后续能在这方面有更丰富的经验,感谢阅读~

欢迎关注我的公众号NLP奇幻之旅,原创技术文章第一时间推送。

欢迎关注我的知识星球“自然语言处理奇幻之旅”,笔者正在努力构建自己的技术社区。


Kafka入门(三)Kafka在Web应用中的实践:基于FastAPI实现图片OCR
https://percent4.github.io/Kafka入门(三)Kafka在Web应用中的实践:基于FastAPI实现图片OCR/
作者
Jclian91
发布于
2025年2月21日
许可协议