使用ORM框架操作MySQL数据库

本文将会介绍如何使用Python中的ORM框架(sqlalchemysqlmodel)来操作MySQL数据库。

在文章使用SQLAlchemy操作MySQL中,笔者介绍了如何使用sqlalchemy模块来操作MySQL数据库。

本文将在此基础上,对Python的两个ORM框架(sqlalchemysqlmodel)进行更为详细的介绍,包含内容如下:

  • MySQL数据库启动
  • ORM框架创建表格
  • ORM框架实现同步CRUD
  • ORM框架实现异步CRUD

本文所演示的Python代码已上传至Github,网址为: https://github.com/percent4/ORM_test .

MySQL数据库启动

本文采用Docker Compose方案来启动MySQL数据库,docker-compose.yml文件配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
version: '3'

services:
mysql8.0:
image: mysql:8.0
container_name: mysql8.0
restart: always
environment:
TZ: Asia/Shanghai
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: orm_test
ports:
- 3306:3306
volumes:
- ./mysql/data/:/var/lib/mysql/
- ./mysql/conf/:/etc/mysql/conf.d/
- ./mysql/init/:/docker-entrypoint-initdb.d/
command:
# 将mysql8.0默认密码策略修改为原先策略 (mysql8.0对其默认策略做了更改 会导致密码无法匹配)
--default-authentication-plugin=mysql_native_password
--character-set-server=utf8mb4
--collation-server=utf8mb4_general_ci
--explicit_defaults_for_timestamp=true
networks:
- mysql_elk_net

# 网络配置
networks:
mysql_elk_net:
driver: bridge

ORM框架创建表格

Python模块中对数据库操作常用ORM框架,常见的ORM框架主要有sqlalchemysqlmodel

我们以创建users表格为例,sqlalchemy的创建表格代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# -*- coding: utf-8 -*-
from datetime import datetime as dt
from sqlalchemy.dialects.mysql import INTEGER, VARCHAR, DATETIME
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()


class Users(Base):
__tablename__ = 'users'

id = Column(INTEGER, primary_key=True, autoincrement=True)
name = Column(VARCHAR(256), nullable=False)
age = Column(INTEGER, nullable=True)
place = Column(VARCHAR(256), nullable=True)
insert_time = Column(DATETIME, default=dt.now())

def __repr__(self):
return f"User(id={self.id}, name={self.name}, age={self.age}, place={self.place}))"


def init_db():
engine = create_engine(
"mysql+pymysql://root:root@localhost:3306/orm_test",
echo=True
)
# 创建表
Base.metadata.create_all(engine)
print('Create table successfully!')
# 删除表
# Base.metadata.drop_all(engine)


if __name__ == '__main__':
init_db()

sqlmodel模块创建表格的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# -*- coding: utf-8 -*-
from datetime import datetime as dt
from sqlmodel import Field, SQLModel, create_engine


class Users(SQLModel, table=True):
# add table name
__tablename__ = "users"
id: int = Field(primary_key=True, nullable=False)
name: str = Field(nullable=False)
age: int = Field(nullable=True)
place: str = Field(nullable=True)
insert_time: dt = Field(nullable=True, default=dt.now())

def __repr__(self):
return f"Users(id={self.id}, name={self.name}, age={self.age}, place={self.place})"


if __name__ == '__main__':
# 创建数据库引擎
sqlite_url = "mysql+pymysql://root:root@localhost:3306/orm_test"
engine = create_engine(sqlite_url, echo=True)

# 创建所有表
SQLModel.metadata.create_all(engine)
# 删除所有表
# SQLModel.metadata.drop_all(engine)

创建的users表结构描述如下:

1
2
3
4
5
6
7
8
9
10
mysql> desc users;
+-------------+--------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------+--------------+------+-----+---------+----------------+
| id | int | NO | PRI | NULL | auto_increment |
| name | varchar(256) | NO | | NULL | |
| age | int | YES | | NULL | |
| place | varchar(256) | YES | | NULL | |
| insert_time | datetime | YES | | NULL | |
+-------------+--------------+------+-----+---------+----------------+

ORM框架实现同步CRUD

常见的MySQL数据库操作为新增(Create)、查询(Read)、修改(Update)、删除(Delete),即CRUD.

  • 使用sqlalchemy模块来实现同步CRUD

Python代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# -*- coding: utf-8 -*-
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager

from src.models.sqlalchemy_create_table import Users

# 初始化数据库连接
engine = create_engine("mysql+pymysql://root:root@localhost:3306/orm_test")
DBSession = sessionmaker(bind=engine)
session = DBSession()


# 封装为上下文管理器
@contextmanager
def session_maker(session=session):
try:
yield session
session.commit()
except Exception: # noqa
session.rollback()
raise
finally:
session.close()


def insert_data():
with session_maker() as db_session:
# 插入单条数据
entity = {'name': 'Jack', 'age': 25, 'place': 'USA'}
db_session.add(Users(**entity))
print("插入1条数据")

with session_maker() as db_session:
# 插入多条数据
users = [
{"name": "Green", "age": 26, "place": "UK"},
{"name": "Alex", "age": 31, "place": "GER"},
{"name": "Chen", "age": 52, "place": "CHN"},
{"name": "Zhang", "age": 42, "place": "CHN"}
]
entities = [Users(**user) for user in users]
db_session.add_all(entities)
print(f"插入{len(entities)}条数据")


def query_data():
with session_maker() as db_session:
# 查询第1条数据
print("查询第1条数据")
user = db_session.query(Users).first()
print(user)
# 查询所有数据
print("查询所有数据")
users = db_session.query(Users).all()
for u in users:
print(u)
# 查询指定字段
print("查询指定字段: name, age")
user = db_session.query(Users.name, Users.age).all()
print(user)
# 条件查询1
print("条件查询: age > 30")
user = db_session.query(Users).filter(Users.age > 30).all()
print(user)
# 条件查询2
print("条件查询: age > 50 and place == 'CHN'")
user = db_session.query(Users).filter(Users.age > 50, Users.place == 'CHN').all()
print(user)
# 排序查询
print("排序查询")
user = db_session.query(Users).order_by(Users.age.desc()).all()
print(user)


def update_data():
with session_maker() as db_session:
# 更新数据
# 1. 查询数据
user = db_session.query(Users).filter(Users.name == 'Zhang').first()
print("更新前数据: ", user)
# 2. 更新数据
user.age = 45
db_session.flush()
# 3. 再次查询数据
user = db_session.query(Users).filter(Users.name == 'Zhang').first()
print("更新后数据: ", user)


def delete_data():
with session_maker() as db_session:
# 删除数据
# 1. 查询数据
user = db_session.query(Users).filter(Users.name == 'Zhang').first()
print("删除前数据: ", user)
# 2. 删除数据
db_session.delete(user)
db_session.flush()
# 3. 再次查询数据
user = db_session.query(Users).filter(Users.name == 'Zhang').first()
print("删除后数据: ", user)


if __name__ == '__main__':
print('*' * 20, '插入数据', '*' * 20)
insert_data()
print('*' * 20, '查询数据', '*' * 20)
query_data()
print('*' * 20, '更新数据', '*' * 20)
update_data()
print('*' * 20, '删除数据', '*' * 20)
delete_data()
  • 使用sqlmodel模块来实现同步CRUD

Python代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# -*- coding: utf-8 -*-
from contextlib import contextmanager
from sqlmodel import create_engine, Session, select

from src.models.sqlmodel_create_table import Users

# 创建数据库引擎
mysql_url = "mysql+pymysql://root:root@localhost:3306/orm_test"
engine = create_engine(mysql_url)


# 封装上下文
@contextmanager
def session_maker(session=Session(engine)):
try:
yield session
session.commit()
except Exception: # noqa
session.rollback()
raise
finally:
session.close()


def insert_data():
with session_maker() as db_session:
# 插入单条数据
entity = {'name': 'Jack', 'age': 25, 'place': 'USA'}
db_session.add(Users(**entity))
print("插入1条数据")

with session_maker() as db_session:
# 插入多条数据
users = [
{"name": "Green", "age": 26, "place": "UK"},
{"name": "Alex", "age": 31, "place": "GER"},
{"name": "Chen", "age": 52, "place": "CHN"},
{"name": "Zhang", "age": 42, "place": "CHN"}
]
entities = [Users(**user) for user in users]
db_session.add_all(entities)
print(f"插入{len(entities)}条数据")


def query_data():
with session_maker() as db_session:
# 查询第1条数据
print("查询第1条数据")
statement = select(Users)
user = db_session.exec(statement).first()
print(user)
# 查询所有数据
print("查询所有数据")
statement = select(Users)
users = db_session.exec(statement).all()
for u in users:
print(u)
# 查询指定字段
print("查询指定字段: name, age")
statement = select(Users.name, Users.age)
users = db_session.exec(statement).all()
print(users)
# 条件查询1
print("条件查询: age > 30")
statement = select(Users).where(Users.age > 30)
users = db_session.exec(statement).all()
print(users)
# 条件查询2
print("条件查询: age > 50 and place == 'CHN'")
statement = select(Users).where(Users.age > 50, Users.place == 'CHN')
users = db_session.exec(statement).all()
print(users)
# # 排序查询
print("排序查询")
statement = select(Users).order_by(Users.age.desc())
users = db_session.exec(statement).all()
print(users)


def update_data():
with session_maker() as db_session:
# 更新数据
# 1. 查询数据
statement = select(Users).where(Users.name == 'Zhang')
user = db_session.exec(statement).first()
print("更新前数据: ", user)
# 2. 更新数据
user.age = 45
db_session.flush()
# 3. 再次查询数据
statement = select(Users).where(Users.name == 'Zhang')
user = db_session.exec(statement).first()
print("更新后数据: ", user)


def delete_data():
with session_maker() as db_session:
# 删除数据
# 1. 查询数据
statement = select(Users).where(Users.name == 'Zhang')
user = db_session.exec(statement).first()
print("删除前数据: ", user)
# 2. 删除数据
db_session.delete(user)
db_session.flush()
# 3. 再次查询数据
statement = select(Users).where(Users.name == 'Zhang')
user = db_session.exec(statement).first()
print("删除后数据: ", user)


if __name__ == '__main__':
print('*' * 20, '插入数据', '*' * 20)
insert_data()
print('*' * 20, '查询数据', '*' * 20)
query_data()
print('*' * 20, '更新数据', '*' * 20)
update_data()
print('*' * 20, '删除数据', '*' * 20)
delete_data()

sqlmodel模块的输出结果如下(sqlalchemy模块的结果类似):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
******************** 插入数据 ********************
插入1条数据
插入4条数据
******************** 查询数据 ********************
查询第1条数据
place='USA' id=1 name='Jack' age=25 insert_time=datetime.datetime(2025, 2, 28, 21, 27, 18)
查询所有数据
place='USA' id=1 name='Jack' age=25 insert_time=datetime.datetime(2025, 2, 28, 21, 27, 18)
place='UK' id=2 name='Green' age=26 insert_time=datetime.datetime(2025, 2, 28, 21, 27, 18)
place='GER' id=3 name='Alex' age=31 insert_time=datetime.datetime(2025, 2, 28, 21, 27, 18)
place='CHN' id=4 name='Chen' age=52 insert_time=datetime.datetime(2025, 2, 28, 21, 27, 18)
place='CHN' id=5 name='Zhang' age=42 insert_time=datetime.datetime(2025, 2, 28, 21, 27, 18)
查询指定字段: name, age
[('Jack', 25), ('Green', 26), ('Alex', 31), ('Chen', 52), ('Zhang', 42)]
条件查询: age > 30
[Users(id=3, name=Alex, age=31, place=GER), Users(id=4, name=Chen, age=52, place=CHN), Users(id=5, name=Zhang, age=42, place=CHN)]
条件查询: age > 50 and place == 'CHN'
[Users(id=4, name=Chen, age=52, place=CHN)]
排序查询
[Users(id=4, name=Chen, age=52, place=CHN), Users(id=5, name=Zhang, age=42, place=CHN), Users(id=3, name=Alex, age=31, place=GER), Users(id=2, name=Green, age=26, place=UK), Users(id=1, name=Jack, age=25, place=USA)]
******************** 更新数据 ********************
更新前数据: place='CHN' id=5 name='Zhang' age=42 insert_time=datetime.datetime(2025, 2, 28, 21, 27, 18)
更新后数据: place='CHN' id=5 name='Zhang' age=45 insert_time=datetime.datetime(2025, 2, 28, 21, 27, 18)
******************** 删除数据 ********************
删除前数据: place='CHN' id=5 name='Zhang' age=45 insert_time=datetime.datetime(2025, 2, 28, 21, 27, 18)
删除后数据: None

ORM框架实现异步CRUD

接下来介绍如何使用异步来实现MySQL数据库的操作。

  • 使用sqlalchemy模块来实现异步CRUD

Python代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# -*- coding: utf-8 -*-
# @place: Pudong, Shanghai
# @file: sqlalchemy_async_crud.py
# @time: 2025/2/28 19:45
# 使用SQLAlchemy异步操作数据库
import asyncio
from sqlalchemy import select
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

from src.models.sqlalchemy_create_table import Users

# 初始化数据库连接
async_engine = create_async_engine("mysql+aiomysql://root:root@localhost:3306/orm_test")
async_session = sessionmaker(bind=async_engine, class_=AsyncSession)


async def insert_data():
async with async_session() as db_session:
async with db_session.begin():
# 插入单条数据
entity = {'name': 'Jack', 'age': 25, 'place': 'USA'}
db_session.add(Users(**entity))
print("插入1条数据")

async with db_session.begin():
# 插入多条数据
users = [
{"name": "Green", "age": 26, "place": "UK"},
{"name": "Alex", "age": 31, "place": "GER"},
{"name": "Chen", "age": 52, "place": "CHN"},
{"name": "Zhang", "age": 42, "place": "CHN"}
]
entities = [Users(**user) for user in users]
db_session.add_all(entities)
print(f"插入{len(entities)}条数据")


async def query_data():
async with async_session() as db_session:
async with db_session.begin():
# 查询第1条数据
print("查询第1条数据")
statement = select(Users)
user = await db_session.execute(statement)
print(user.first())
# 查询所有数据
print("查询所有数据")
statement = select(Users)
users = await db_session.execute(statement)
for u in users.scalars():
print(u)
# 查询指定字段
print("查询指定字段: name and age")
statement = select(Users.name, Users.age)
users = await db_session.execute(statement)
print(users.all())
# 条件查询1
print("条件查询: age > 30")
statement = select(Users).where(Users.age > 30)
users = await db_session.execute(statement)
for u in users.scalars():
print(u)
# 条件查询2
print("条件查询: age > 50 and place == 'CHN'")
statement = select(Users).where((Users.age > 50) & (Users.place == 'CHN'))
users = await db_session.execute(statement)
for u in users.scalars():
print(u)
# 排序查询
print("排序查询")
statement = select(Users).order_by(Users.age.desc())
users = await db_session.execute(statement)
for u in users.scalars():
print(u)


async def update_data():
async with async_session() as db_session:
async with db_session.begin():
# 更新数据
# 1. 查询数据
statement = select(Users).where(Users.name == 'Zhang')
user = await db_session.execute(statement)
user = user.scalars().first()
print("更新前数据: ", user)
# 2. 更新数据
user.age = 45
# 3. 再次查询数据
statement = select(Users).where(Users.name == 'Zhang')
user = await db_session.execute(statement)
user = user.scalars().first()
print("更新后数据: ", user)


async def delete_data():
async with async_session() as db_session:
async with db_session.begin():
# 删除数据
# 1. 查询数据
statement = select(Users).where(Users.name == 'Zhang')
user = await db_session.execute(statement)
user = user.scalars().first()
print("删除前数据: ", user)
# 2. 删除数据
await db_session.delete(user)
# 3. 再次查询数据
statement = select(Users).where(Users.name == 'Zhang')
user = await db_session.execute(statement)
user = user.scalars().first()
print("删除后数据: ", user)


if __name__ == '__main__':
# 异步插入数据
print("异步插入数据")
loop = asyncio.get_event_loop()
loop.run_until_complete(insert_data())
# 异步查询数据
print("异步查询数据")
loop = asyncio.get_event_loop()
loop.run_until_complete(query_data())
# 异步更新数据
print("异步更新数据")
loop = asyncio.get_event_loop()
loop.run_until_complete(update_data())
# 异步删除数据
print("异步删除数据")
loop = asyncio.get_event_loop()
loop.run_until_complete(delete_data())
  • 使用sqlmodel模块来实现异步CRUD

Python代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# -*- coding: utf-8 -*-
# @place: Pudong, Shanghai
# @file: sqlmodel_async_crud.py
# @time: 2025/2/28 20:12
# 使用SQLModel异步CRUD操作
import asyncio
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlmodel import select
from sqlalchemy.ext.asyncio import create_async_engine

from src.models.sqlmodel_create_table import Users

# 初始化数据库连接
async_engine = create_async_engine("mysql+aiomysql://root:root@localhost:3306/orm_test")
async_session = AsyncSession(async_engine)


async def insert_data():
async with async_session as db_session:
async with db_session.begin():
# 插入单条数据
entity = {'name': 'Jack', 'age': 25, 'place': 'USA'}
db_session.add(Users(**entity))
print("插入1条数据")

async with async_session as db_session:
async with db_session.begin():
# 插入多条数据
users = [
{"name": "Green", "age": 26, "place": "UK"},
{"name": "Alex", "age": 31, "place": "GER"},
{"name": "Chen", "age": 52, "place": "CHN"},
{"name": "Zhang", "age": 42, "place": "CHN"}
]
entities = [Users(**user) for user in users]
db_session.add_all(entities)
print(f"插入{len(entities)}条数据")


async def query_data():
async with async_session as db_session:
async with db_session.begin():
# 查询第1条数据
print("查询第1条数据")
statement = select(Users)
user = await db_session.exec(statement)
print(user.first())
# 查询所有数据
print("查询所有数据")
statement = select(Users)
users = await db_session.exec(statement)
for u in users.all():
print(u)
# 查询指定字段
print("查询指定字段: name and age")
statement = select(Users.name, Users.age)
users = await db_session.exec(statement)
print(users.all())
# 条件查询1
print("条件查询: age > 30")
statement = select(Users).where(Users.age > 30)
users = await db_session.exec(statement)
for u in users.all():
print(u)
# 条件查询2
print("条件查询: age > 50 and place == 'CHN'")
statement = select(Users).where(Users.age > 50).where(Users.place == 'CHN')
users = await db_session.exec(statement)
for u in users.all():
print(u)
# 排序查询
print("排序查询")
statement = select(Users).order_by(Users.age)
users = await db_session.exec(statement)
for u in users.all():
print(u)


async def update_data():
async with async_session as db_session:
async with db_session.begin():
# 更新数据
# 1. 查询数据
statement = select(Users).where(Users.name == 'Zhang')
result = await db_session.exec(statement)
user = result.first()
print("更新前数据: ", user)
# 2. 更新数据
user.age = 45
db_session.add(user)
await db_session.commit()

async with db_session.begin():
# 3. 再次查询数据
statement = select(Users).where(Users.name == 'Zhang')
user = await db_session.exec(statement)
print("更新后数据: ", user.first())


async def delete_data():
async with async_session as db_session:
async with db_session.begin():
# 删除数据
# 1. 查询数据
statement = select(Users).where(Users.name == 'Zhang')
result = await db_session.exec(statement)
user = result.first()
print("删除前数据: ", user)
# 2. 删除数据
await db_session.delete(user)
await db_session.commit()
# 3. 再次查询数据
async with db_session.begin():
statement = select(Users).where(Users.name == 'Zhang')
user = await db_session.exec(statement)
print("删除后数据: ", user.first())


if __name__ == '__main__':
# 异步插入数据
# print("异步插入数据")
# loop = asyncio.get_event_loop()
# loop.run_until_complete(insert_data())
# 异步查询数据
# print("异步查询数据")
# loop = asyncio.get_event_loop()
# loop.run_until_complete(query_data())
# 异步更新数据
# print("异步更新数据")
# loop = asyncio.get_event_loop()
# loop.run_until_complete(update_data())
# 异步删除数据
print("异步删除数据")
loop = asyncio.get_event_loop()
loop.run_until_complete(delete_data())

sqlmodel模块的输出结果如下(sqlalchemy模块的结果类似):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
异步插入数据
插入1条数据
插入4条数据
查询第1条数据
place='USA' insert_time=datetime.datetime(2025, 2, 28, 21, 33, 8) name='Jack' id=1 age=25
查询所有数据
place='USA' insert_time=datetime.datetime(2025, 2, 28, 21, 33, 8) name='Jack' id=1 age=25
place='UK' insert_time=datetime.datetime(2025, 2, 28, 21, 33, 8) name='Green' id=2 age=26
place='GER' insert_time=datetime.datetime(2025, 2, 28, 21, 33, 8) name='Alex' id=3 age=31
place='CHN' insert_time=datetime.datetime(2025, 2, 28, 21, 33, 8) name='Chen' id=4 age=52
place='CHN' insert_time=datetime.datetime(2025, 2, 28, 21, 33, 8) name='Zhang' id=5 age=42
查询指定字段: name and age
[('Jack', 25), ('Green', 26), ('Alex', 31), ('Chen', 52), ('Zhang', 42)]
条件查询: age > 30
place='GER' insert_time=datetime.datetime(2025, 2, 28, 21, 33, 8) name='Alex' id=3 age=31
place='CHN' insert_time=datetime.datetime(2025, 2, 28, 21, 33, 8) name='Chen' id=4 age=52
place='CHN' insert_time=datetime.datetime(2025, 2, 28, 21, 33, 8) name='Zhang' id=5 age=42
条件查询: age > 50 and place == 'CHN'
place='CHN' insert_time=datetime.datetime(2025, 2, 28, 21, 33, 8) name='Chen' id=4 age=52
排序查询
place='USA' insert_time=datetime.datetime(2025, 2, 28, 21, 33, 8) name='Jack' id=1 age=25
place='UK' insert_time=datetime.datetime(2025, 2, 28, 21, 33, 8) name='Green' id=2 age=26
place='GER' insert_time=datetime.datetime(2025, 2, 28, 21, 33, 8) name='Alex' id=3 age=31
place='CHN' insert_time=datetime.datetime(2025, 2, 28, 21, 33, 8) name='Zhang' id=5 age=42
place='CHN' insert_time=datetime.datetime(2025, 2, 28, 21, 33, 8) name='Chen' id=4 age=52
异步更新数据
更新前数据: place='CHN' insert_time=datetime.datetime(2025, 2, 28, 21, 33, 8) name='Zhang' id=5 age=42
更新后数据: place='CHN' insert_time=datetime.datetime(2025, 2, 28, 21, 33, 8) name='Zhang' id=5 age=45
异步删除数据
删除前数据: place='CHN' insert_time=datetime.datetime(2025, 2, 28, 21, 33, 8) name='Zhang' id=5 age=45
删除后数据: None

总结

本文在之前文章的基础上,添加了两个ORM框架(sqlalchemysqlmodel)之间的代码对比,并介绍了这两个ORM框架如何实现创建表格、同步CRUD、异步CRUD。

本文演示的Python代码已开源,网址为:https://github.com/percent4/ORM_test .

后续文章将会介绍如何使用Alembic来实现MySQL数据库迁移。

欢迎关注我的公众号NLP奇幻之旅,原创技术文章第一时间推送。

欢迎关注我的知识星球“自然语言处理奇幻之旅”,笔者正在努力构建自己的技术社区。


使用ORM框架操作MySQL数据库
https://percent4.github.io/使用ORM框架操作MySQL数据库/
作者
Jclian91
发布于
2025年4月27日
许可协议