【Python连接数据库基础 01】从原生驱动到ORM框架:Python数据库连接完全指南
关键词:Python数据库连接、原生驱动、ORM框架、SQLAlchemy、PyMySQL、psycopg2、数据库编程、连接池、事务管理
摘要:本文从零开始讲解Python连接数据库的完整流程,从最基础的原生驱动使用到高级的ORM框架应用。通过生动的类比和实际案例,帮助你理解不同连接方式的优缺点,掌握数据库连接池、事务管理等核心概念,最终能够在实际项目中选择最适合的数据库连接方案。
引言:为什么Python需要连接数据库?
想象一下,你正在开发一个在线商城。用户的订单信息、商品库存、用户账户等数据都需要持久化存储。如果把这些数据都存在内存里,一旦程序重启,所有数据就会丢失。这就像把重要文件只保存在电脑的内存条里,断电就全没了。
数据库就像是一个超级可靠的"文件柜",不仅能安全存储数据,还能快速检索、更新和管理。而Python作为"办公室工作人员",需要学会如何与这个"文件柜"打交道。
第一步:理解数据库连接的本质
什么是数据库连接?
数据库连接就像是你和银行之间的"专线电话"。当你需要查询余额或转账时,需要先拨通这个专线,验证身份,然后才能进行操作。
在技术层面,数据库连接包含以下要素:
- 主机地址:数据库服务器的位置(就像银行的地址)
- 端口号:服务的具体入口(就像银行的窗口号)
- 数据库名:要访问的具体数据库(就像要访问的账户类型)
- 用户名和密码:身份验证信息
连接的生命周期
一个数据库连接的生命周期就像一次银行业务办理:
- 建立连接:走进银行,排队等号
- 身份验证:出示身份证,验证身份
- 执行操作:查询余额、转账、存款等
- 关闭连接:业务办完,离开银行
第二步:原生驱动 - 最直接的连接方式
什么是原生驱动?
原生驱动就像是"直接对话"。你直接用数据库厂商提供的"语言包"与数据库交流,没有任何中间翻译。
MySQL连接示例
import pymysql# 建立连接 - 就像拨通银行专线
connection = pymysql.connect(host='localhost', # 银行地址port=3306, # 窗口号user='root', # 用户名password='password', # 密码database='shop_db', # 要访问的数据库charset='utf8mb4' # 字符编码
)try:# 创建游标 - 就像拿到业务单据with connection.cursor() as cursor:# 执行查询 - 填写业务需求sql = "SELECT * FROM products WHERE price < %s"cursor.execute(sql, (100,))# 获取结果 - 拿到查询结果results = cursor.fetchall()for row in results:print(f"商品:{row[1]}, 价格:{row[2]}")# 提交事务 - 确认业务完成connection.commit()finally:# 关闭连接 - 离开银行connection.close()
PostgreSQL连接示例
import psycopg2# PostgreSQL连接
connection = psycopg2.connect(host='localhost',port=5432,user='postgres',password='password',database='shop_db'
)try:with connection.cursor() as cursor:# 使用参数化查询防止SQL注入cursor.execute("SELECT name, price FROM products WHERE category = %s",('electronics',))products = cursor.fetchall()for name, price in products:print(f"{name}: ${price}")connection.commit()except psycopg2.Error as e:print(f"数据库错误:{e}")connection.rollback()finally:connection.close()
原生驱动的优缺点
优点:
- 性能最优,直接通信
- 功能完整,支持数据库特有功能
- 学习成本低,接近原生SQL
缺点:
- 代码冗长,需要手动管理连接
- 容易出错,忘记关闭连接会导致资源泄露
- 不同数据库API不同,移植性差
第三步:连接池 - 提升性能的关键
为什么需要连接池?
想象一下,如果每次去银行办业务都要重新排队、验证身份,那效率会很低。连接池就像是"VIP通道",预先建立好几个连接,需要时直接使用,用完归还。
from dbutils.pooled_db import PooledDB
import pymysql# 创建连接池
pool = PooledDB(creator=pymysql, # 数据库驱动maxconnections=20, # 最大连接数mincached=5, # 最小缓存连接数maxcached=10, # 最大缓存连接数blocking=True, # 连接用完时是否阻塞等待host='localhost',port=3306,user='root',password='password',database='shop_db',charset='utf8mb4'
)def get_products_by_category(category):"""使用连接池获取商品信息"""# 从池中获取连接connection = pool.connection()try:with connection.cursor() as cursor:cursor.execute("SELECT name, price FROM products WHERE category = %s",(category,))return cursor.fetchall()finally:# 连接自动归还到池中connection.close()# 使用示例
electronics = get_products_by_category('electronics')
for name, price in electronics:print(f"{name}: ${price}")
连接池的配置参数
# 高级连接池配置
pool_config = {'maxconnections': 50, # 最大连接数(根据服务器性能调整)'mincached': 10, # 启动时创建的连接数'maxcached': 20, # 池中最多保持的连接数'maxshared': 0, # 最大共享连接数(0表示不共享)'blocking': True, # 连接池满时是否等待'maxusage': 1000, # 单个连接最大使用次数'setsession': ['SET AUTOCOMMIT = 1'] # 连接初始化SQL
}
第四步:ORM框架 - 面向对象的数据库操作
什么是ORM?
ORM(Object-Relational Mapping)就像是"智能翻译官"。它把数据库的表格翻译成Python的类,把行记录翻译成对象,让你可以用面向对象的方式操作数据库。
想象一下,原来你需要说"请帮我在用户表中查找姓名为张三的记录",现在你可以直接说"给我找到张三这个用户对象"。
SQLAlchemy Core - 表达式语言
SQLAlchemy Core提供了一种更Pythonic的方式来构建SQL语句:
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Float
from sqlalchemy.sql import select# 创建引擎 - 就像配置好的"翻译官"
engine = create_engine('mysql+pymysql://root:password@localhost/shop_db')# 定义表结构
metadata = MetaData()
products = Table('products', metadata,Column('id', Integer, primary_key=True),Column('name', String(100)),Column('price', Float),Column('category', String(50))
)# 使用表达式语言查询
with engine.connect() as connection:# 构建查询 - 像搭积木一样组装SQLquery = select([products.c.name, products.c.price]).where(products.c.category == 'electronics').where(products.c.price < 1000)# 执行查询result = connection.execute(query)for row in result:print(f"商品:{row.name}, 价格:${row.price}")
SQLAlchemy ORM - 完全面向对象
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker# 创建基类
Base = declarative_base()# 定义产品模型 - 就像定义一个"商品"类
class Product(Base):__tablename__ = 'products'id = Column(Integer, primary_key=True)name = Column(String(100))price = Column(Float)category = Column(String(50))def __repr__(self):return f"<Product(name='{self.name}', price={self.price})>"# 创建引擎和会话
engine = create_engine('mysql+pymysql://root:password@localhost/shop_db')
Session = sessionmaker(bind=engine)
session = Session()# 面向对象的数据库操作
try:# 查询 - 就像在对象列表中筛选cheap_electronics = session.query(Product).filter(Product.category == 'electronics',Product.price < 500).all()for product in cheap_electronics:print(f"{product.name}: ${product.price}")# 创建新商品 - 就像创建一个新对象new_product = Product(name='智能手表',price=299.99,category='electronics')# 添加到数据库session.add(new_product)session.commit()print(f"新商品已添加:{new_product}")except Exception as e:session.rollback()print(f"操作失败:{e}")finally:session.close()
Django ORM - Web开发的最佳选择
# models.py - 定义模型
from django.db import modelsclass Product(models.Model):name = models.CharField(max_length=100)price = models.DecimalField(max_digits=10, decimal_places=2)category = models.CharField(max_length=50)created_at = models.DateTimeField(auto_now_add=True)class Meta:db_table = 'products'def __str__(self):return self.name# views.py - 使用模型
from django.shortcuts import render
from .models import Productdef product_list(request):# Django ORM查询 - 非常直观electronics = Product.objects.filter(category='electronics',price__lt=1000).order_by('price')return render(request, 'products.html', {'products': electronics})# 在Django shell中的操作
# python manage.py shell# 创建商品
product = Product.objects.create(name='无线耳机',price=199.99,category='electronics'
)# 查询商品
expensive_products = Product.objects.filter(price__gte=500)
electronics_count = Product.objects.filter(category='electronics').count()# 更新商品
Product.objects.filter(category='electronics').update(price=F('price') * 0.9)# 删除商品
Product.objects.filter(price__lt=10).delete()
第五步:事务管理 - 确保数据一致性
什么是事务?
事务就像是"银行转账"操作。要么全部成功(钱从A账户扣除,同时加到B账户),要么全部失败(两个账户都不变)。不能出现钱从A账户扣了,但没加到B账户的情况。
# 原生驱动的事务管理
import pymysqlconnection = pymysql.connect(host='localhost',user='root',password='password',database='shop_db',autocommit=False # 关闭自动提交
)try:with connection.cursor() as cursor:# 开始事务 - 就像开始一个"原子操作"# 步骤1:减少商品库存cursor.execute("UPDATE products SET stock = stock - %s WHERE id = %s",(1, 101))# 步骤2:创建订单cursor.execute("INSERT INTO orders (product_id, quantity, user_id) VALUES (%s, %s, %s)",(101, 1, 1001))# 步骤3:更新用户积分cursor.execute("UPDATE users SET points = points + %s WHERE id = %s",(10, 1001))# 所有操作成功,提交事务connection.commit()print("订单创建成功!")except Exception as e:# 任何步骤失败,回滚所有操作connection.rollback()print(f"订单创建失败,已回滚:{e}")finally:connection.close()
SQLAlchemy的事务管理
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager@contextmanager
def database_transaction(session):"""事务上下文管理器"""try:yield sessionsession.commit()except Exception:session.rollback()raisefinally:session.close()# 使用事务管理器
Session = sessionmaker(bind=engine)def create_order(product_id, quantity, user_id):session = Session()with database_transaction(session):# 查找商品product = session.query(Product).filter_by(id=product_id).first()if not product:raise ValueError("商品不存在")if product.stock < quantity:raise ValueError("库存不足")# 减少库存product.stock -= quantity# 创建订单order = Order(product_id=product_id,quantity=quantity,user_id=user_id,total_price=product.price * quantity)session.add(order)# 更新用户积分user = session.query(User).filter_by(id=user_id).first()user.points += quantity * 10# 使用示例
try:create_order(product_id=101, quantity=2, user_id=1001)print("订单创建成功!")
except ValueError as e:print(f"订单创建失败:{e}")
第六步:选择合适的连接方式
决策树
不同场景的最佳实践
1. 简单脚本或数据分析
# 推荐:原生驱动 + 连接池
import pymysql
from dbutils.pooled_db import PooledDBpool = PooledDB(creator=pymysql,maxconnections=5,host='localhost',user='root',password='password',database='analytics_db'
)def analyze_sales_data():connection = pool.connection()try:with connection.cursor() as cursor:cursor.execute("""SELECT DATE(created_at) as date, SUM(total_amount) as daily_salesFROM orders WHERE created_at >= DATE_SUB(NOW(), INTERVAL 30 DAY)GROUP BY DATE(created_at)ORDER BY date""")return cursor.fetchall()finally:connection.close()
2. Web应用开发
# 推荐:Django ORM 或 SQLAlchemy ORM
from django.db import models
from django.db import transactionclass OrderService:@transaction.atomicdef create_order(self, user, items):"""创建订单服务"""order = Order.objects.create(user=user,status='pending',total_amount=0)total = 0for item_data in items:product = Product.objects.select_for_update().get(id=item_data['product_id'])if product.stock < item_data['quantity']:raise ValueError(f"商品 {product.name} 库存不足")# 减少库存product.stock -= item_data['quantity']product.save()# 创建订单项OrderItem.objects.create(order=order,product=product,quantity=item_data['quantity'],price=product.price)total += product.price * item_data['quantity']order.total_amount = totalorder.save()return order
3. 高性能API服务
# 推荐:SQLAlchemy Core + 异步驱动
import asyncio
import asyncpg
from sqlalchemy import create_engine, textclass AsyncDatabaseService:def __init__(self, database_url):self.database_url = database_urlself.pool = Noneasync def init_pool(self):self.pool = await asyncpg.create_pool(self.database_url)async def get_user_orders(self, user_id):async with self.pool.acquire() as connection:query = """SELECT o.id, o.created_at, o.total_amount,array_agg(json_build_object('product_name', p.name,'quantity', oi.quantity,'price', oi.price)) as itemsFROM orders oJOIN order_items oi ON o.id = oi.order_idJOIN products p ON oi.product_id = p.idWHERE o.user_id = $1GROUP BY o.id, o.created_at, o.total_amountORDER BY o.created_at DESC"""rows = await connection.fetch(query, user_id)return [dict(row) for row in rows]# 使用示例
async def main():db_service = AsyncDatabaseService("postgresql://user:password@localhost/shop_db")await db_service.init_pool()orders = await db_service.get_user_orders(1001)print(f"用户订单:{orders}")# asyncio.run(main())
第七步:性能优化与最佳实践
1. 连接池优化
# 生产环境连接池配置
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePoolengine = create_engine('mysql+pymysql://user:password@localhost/db',poolclass=QueuePool,pool_size=20, # 常驻连接数max_overflow=30, # 额外连接数pool_pre_ping=True, # 连接前检查有效性pool_recycle=3600, # 连接回收时间(秒)echo=False # 生产环境关闭SQL日志
)
2. 查询优化
# 批量操作优化
from sqlalchemy.orm import sessionmakerSession = sessionmaker(bind=engine)def bulk_insert_products(products_data):"""批量插入商品"""session = Session()try:# 使用bulk_insert_mappings提升性能session.bulk_insert_mappings(Product, products_data)session.commit()except Exception:session.rollback()raisefinally:session.close()def bulk_update_prices(price_updates):"""批量更新价格"""session = Session()try:# 使用bulk_update_mappingssession.bulk_update_mappings(Product, price_updates)session.commit()except Exception:session.rollback()raisefinally:session.close()# 使用示例
products_data = [{'name': '商品1', 'price': 99.99, 'category': 'electronics'},{'name': '商品2', 'price': 199.99, 'category': 'electronics'},# ... 更多商品
]bulk_insert_products(products_data)
3. 监控和调试
import logging
from sqlalchemy import event
from sqlalchemy.engine import Engine
import time# 配置SQL查询日志
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)# 监控慢查询
@event.listens_for(Engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):context._query_start_time = time.time()@event.listens_for(Engine, "after_cursor_execute")
def receive_after_cursor_execute(conn, cursor, statement, parameters, context, executemany):total = time.time() - context._query_start_timeif total > 0.5: # 记录超过0.5秒的查询logger.warning(f"慢查询 ({total:.2f}s): {statement[:100]}...")
总结:选择适合你的数据库连接方案
通过这篇文章,我们从最基础的原生驱动连接,到高级的ORM框架应用,完整地了解了Python连接数据库的各种方式。
快速选择指南
- 数据分析脚本:原生驱动 + 连接池
- Web应用:Django ORM 或 SQLAlchemy ORM
- 高性能API:SQLAlchemy Core + 异步驱动
- 复杂业务逻辑:SQLAlchemy ORM + 事务管理
- 简单CRUD操作:任何ORM框架
关键要点回顾
- 连接管理:始终使用连接池,避免频繁创建连接
- 事务处理:关键业务操作必须使用事务保证一致性
- 性能优化:合理配置连接池,使用批量操作
- 安全考虑:使用参数化查询防止SQL注入
- 错误处理:完善的异常处理和资源清理
记住,没有最好的方案,只有最适合的方案。根据你的项目需求、团队技能和性能要求来选择合适的数据库连接方式。
扩展阅读
- SQLAlchemy官方文档
- Django ORM最佳实践
- Python数据库API规范 PEP 249
- 数据库连接池原理与实现