1. 数据库基础
1-1. 数据库类型
Python 支持多种数据库:
数据库类型 | 描述 | 主要模块 |
---|---|---|
SQLite | 轻量级文件数据库 | sqlite3 |
MySQL | 关系型数据库 | mysql-connector-python, PyMySQL |
PostgreSQL | 对象关系型数据库 | psycopg2 |
MongoDB | 文档型数据库 | pymongo |
Redis | 键值存储数据库 | redis |
1-2. 数据库连接
python
# SQLite connection: connects to the database file 'example.db'.
import sqlite3
conn = sqlite3.connect('example.db')
# MySQL connection (mysql-connector-python).
# NOTE: `conn` is rebound by every relational example in this snippet, so the
# examples read as independent alternatives rather than steps of one program.
# The host/user/password/database values look like placeholders - replace
# them with real settings before running.
import mysql.connector
conn = mysql.connector.connect(
host='localhost',
user='username',
password='password',
database='mydatabase'
)
# PostgreSQL connection (psycopg2); note the keyword is `dbname` here, while
# the MySQL example above uses `database`.
import psycopg2
conn = psycopg2.connect(
host='localhost',
user='username',
password='password',
dbname='mydatabase'
)
# MongoDB connection: create a client from a connection URI, then pick the
# database by name with subscript access.
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
db = client['mydatabase']
# Redis connection against localhost:6379, using db=0.
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
1-3. 基本操作
python
# SQLite basic operations
def sqlite_basic_operations():
    """Run a create/insert/select/update/delete round trip on ``example.db``.

    The function creates the ``users`` table when it is missing, inserts a
    row for ``'Alice'``, reads every row back, updates Alice's age, deletes
    her row again and commits.

    The connection is closed in a ``finally`` clause so a failing statement
    does not leak the database handle.

    Returns:
        list[tuple]: The rows of ``users`` as read right after the insert
        (before the update and delete are executed).
    """
    conn = sqlite3.connect('example.db')
    try:
        cursor = conn.cursor()
        # Create the table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                age INTEGER
            )
        ''')
        # Insert a row
        cursor.execute(
            'INSERT INTO users (name, age) VALUES (?, ?)', ('Alice', 25)
        )
        # Query the rows
        cursor.execute('SELECT * FROM users')
        rows = cursor.fetchall()
        # Update the row
        cursor.execute(
            'UPDATE users SET age = ? WHERE name = ?', (26, 'Alice')
        )
        # Delete the row
        cursor.execute('DELETE FROM users WHERE name = ?', ('Alice',))
        conn.commit()
        return rows
    finally:
        conn.close()
# MongoDB basic operations
def mongo_basic_operations():
    """Run an insert/find/update/delete round trip on ``mydatabase.users``.

    The client is closed in a ``finally`` clause so that a failing operation
    does not leave the client open.

    Returns:
        list: The documents matching ``{'name': 'Alice'}`` as read right
        after the insert (before the update and delete are executed).
    """
    client = MongoClient('mongodb://localhost:27017/')
    try:
        db = client['mydatabase']
        collection = db['users']
        # Insert a document
        collection.insert_one({'name': 'Alice', 'age': 25})
        # Query documents; the cursor returned by find() is consumed here so
        # the lookup result is actually read instead of being discarded.
        documents = list(collection.find({'name': 'Alice'}))
        # Update the document
        collection.update_one(
            {'name': 'Alice'},
            {'$set': {'age': 26}},
        )
        # Delete the document
        collection.delete_one({'name': 'Alice'})
        return documents
    finally:
        client.close()
2. 数据库操作进阶
2-1. 连接池管理
python
from dbutils.pooled_db import PooledDB
import pymysql
def create_connection_pool():
    """Build and return a ``PooledDB`` pool that uses ``pymysql`` as creator.

    Returns:
        PooledDB: The pool configured with the settings below.
    """
    # Settings that configure the pool itself.
    pool_options = dict(
        creator=pymysql,
        maxconnections=6,
        mincached=2,
        maxcached=5,
        maxshared=3,
        blocking=True,
        maxusage=None,
        setsession=[],
        ping=0,
    )
    # Connection settings; presumably forwarded by the pool to the creator
    # module - confirm against the DBUtils documentation.
    connect_options = dict(
        host='localhost',
        user='username',
        password='password',
        database='mydatabase',
        charset='utf8mb4',
    )
    return PooledDB(**pool_options, **connect_options)
# Using the connection pool
def use_connection_pool(pool=None):
    """Fetch every row of ``users`` through a pooled connection.

    Args:
        pool: An object exposing ``connection()`` (for example the pool
            returned by ``create_connection_pool()``). When omitted, a new
            pool is created for this call, which keeps the zero-argument
            call working. Callers that run more than one query should
            create the pool once and pass it in; otherwise every call pays
            for a brand-new pool and nothing is reused.

    Returns:
        The rows returned by ``cursor.fetchall()``.
    """
    if pool is None:
        pool = create_connection_pool()
    conn = pool.connection()
    try:
        # Acquire the cursor inside the outer try so the connection is
        # still released when cursor creation itself fails.
        cursor = conn.cursor()
        try:
            cursor.execute('SELECT * FROM users')
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()
2-2. 事务处理
python
def transaction_example():
    """Insert and update ``'Bob'`` in ``example.db`` as one transaction.

    All statements are committed together. If any statement raises, the
    transaction is rolled back and the error is printed; the exception is
    not re-raised. The connection is always closed.
    """
    db = sqlite3.connect('example.db')
    cur = db.cursor()
    # Statements of the transaction, in execution order.
    steps = (
        # Begin the transaction
        ('BEGIN TRANSACTION', ()),
        # Perform the individual operations
        ('INSERT INTO users (name, age) VALUES (?, ?)', ('Bob', 30)),
        ('UPDATE users SET age = ? WHERE name = ?', (31, 'Bob')),
    )
    try:
        for statement, parameters in steps:
            cur.execute(statement, parameters)
        # Commit the transaction
        db.commit()
    except Exception as e:
        # Roll the transaction back
        db.rollback()
        print(f"事务失败: {e}")
    finally:
        db.close()
2-3. ORM 使用
python
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Declarative base class that the ORM models below derive from.
Base = declarative_base()


class User(Base):
    """ORM model mapped to the ``users`` table (columns id, name, age)."""

    __tablename__ = 'users'

    # Integer primary key.
    id = Column(Integer, primary_key=True)
    name = Column(String)
    age = Column(Integer)
def orm_example():
    """Create, query, update and delete a ``User`` through a session.

    The tables declared on ``Base`` are created in ``example.db`` first. Any
    exception raised while working with the session triggers a rollback and
    is printed rather than re-raised; the session is always closed.
    """
    # Create the engine and make sure the mapped tables exist
    engine = create_engine('sqlite:///example.db')
    Base.metadata.create_all(engine)
    # Create the session
    session = sessionmaker(bind=engine)()
    try:
        # Create a new user
        session.add(User(name='Alice', age=25))
        # Query the user
        alice = session.query(User).filter_by(name='Alice').first()
        # Update the user
        alice.age = 26
        # Delete the user
        session.delete(alice)
        # Commit the changes
        session.commit()
    except Exception as e:
        session.rollback()
        print(f"操作失败: {e}")
    finally:
        session.close()
3. 数据库操作最佳实践
3-1. 参数化查询
python
def safe_query(user_input='Alice'):
    """Look up rows of ``users`` by name using a parameterized query.

    Args:
        user_input: The name to search for. It is bound as a query parameter
            and never spliced into the SQL text, so quotes or SQL fragments
            inside it are treated as plain data. The default only exists so
            the zero-argument call keeps working.

    Returns:
        list[tuple]: All rows whose ``name`` equals ``user_input``.
    """
    conn = sqlite3.connect('example.db')
    try:
        cursor = conn.cursor()
        # Unsafe variant (open to SQL injection) - never interpolate input
        # into the statement like this:
        #   cursor.execute(f"SELECT * FROM users WHERE name = '{user_input}'")
        # Safe parameterized query
        cursor.execute("SELECT * FROM users WHERE name = ?", (user_input,))
        return cursor.fetchall()
    finally:
        # Close the connection even when the query fails.
        conn.close()
3-2. 批量操作
python
def batch_operations():
    """Insert three users into ``example.db`` and update their ages in bulk.

    Both steps use ``executemany`` so each statement is run once per
    parameter tuple. The connection is closed in a ``finally`` clause; when
    a statement fails nothing is committed.
    """
    conn = sqlite3.connect('example.db')
    try:
        cursor = conn.cursor()
        # Bulk insert
        users = [
            ('Alice', 25),
            ('Bob', 30),
            ('Charlie', 35),
        ]
        cursor.executemany(
            'INSERT INTO users (name, age) VALUES (?, ?)', users
        )
        # Bulk update
        updates = [
            (26, 'Alice'),
            (31, 'Bob'),
            (36, 'Charlie'),
        ]
        cursor.executemany(
            'UPDATE users SET age = ? WHERE name = ?', updates
        )
        conn.commit()
    finally:
        conn.close()
3-3. 连接管理
python
from contextlib import contextmanager
@contextmanager
def get_db_connection(db_path='example.db'):
    """Yield an open SQLite connection and close it when the block exits.

    Args:
        db_path: Path of the database to open. Defaults to ``'example.db'``
            so existing zero-argument callers behave as before.

    Yields:
        sqlite3.Connection: The open connection. It is closed on exit,
        whether the ``with`` body finished normally or raised. Nothing is
        committed here; callers commit their own changes.
    """
    conn = sqlite3.connect(db_path)
    try:
        yield conn
    finally:
        conn.close()
def use_connection():
    """Return every row of ``users`` using ``get_db_connection()``.

    Returns:
        list[tuple]: The rows produced by ``SELECT * FROM users``.
    """
    with get_db_connection() as db:
        # Connection.execute creates the cursor and runs the statement.
        return db.execute('SELECT * FROM users').fetchall()
4. 常见数据库操作场景
4-1. 数据迁移
python
import pandas as pd
from sqlalchemy import create_engine
def migrate_data():
    """Load ``data.csv`` and write it to the ``users`` table of ``example.db``.

    The table is written with ``if_exists='replace'`` and without the
    DataFrame index column.
    """
    # Read the data from the CSV file
    frame = pd.read_csv('data.csv')
    # Create the database engine and write the data into the database
    frame.to_sql(
        'users',
        create_engine('sqlite:///example.db'),
        if_exists='replace',
        index=False,
    )
4-2. 数据备份
python
import shutil
import datetime
def backup_database():
    """Create a timestamped backup of ``example.db`` and return its name.

    The copy is made with SQLite's online backup API
    (``sqlite3.Connection.backup``) instead of a raw file copy. A file copy
    of a database that is in use can capture a half-written state and
    misses committed data that still lives in a write-ahead log; the backup
    API reads through the database engine and avoids both problems.

    Returns:
        str: The backup file name, ``backup_<YYYYmmdd_HHMMSS>.db``.

    Raises:
        FileNotFoundError: If ``example.db`` does not exist.
    """
    import os
    import sqlite3

    source_path = 'example.db'
    # sqlite3.connect() would silently create an empty database, so keep the
    # "missing source" failure that a plain file copy reports.
    if not os.path.isfile(source_path):
        raise FileNotFoundError(f'database file not found: {source_path}')

    # Build the backup file name
    timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_file = f'backup_{timestamp}.db'

    # Copy the database through the backup API
    source = sqlite3.connect(source_path)
    try:
        target = sqlite3.connect(backup_file)
        try:
            source.backup(target)
        finally:
            target.close()
    finally:
        source.close()
    return backup_file
4-3. 数据验证
python
from pydantic import BaseModel, validator
class UserModel(BaseModel):
    """Pydantic model of a user with a range-checked ``age`` field."""

    name: str
    age: int

    @validator('age')
    def validate_age(cls, v):
        """Return ``v`` unchanged, or raise ``ValueError`` outside 0..150."""
        if not 0 <= v <= 150:
            raise ValueError('年龄必须在 0 到 150 之间')
        return v
def validate_user_data(data):
    """Validate ``data`` by building a ``UserModel`` from it.

    Args:
        data: Mapping unpacked as keyword arguments into ``UserModel``.

    Returns:
        tuple: ``(True, model)`` when construction succeeds, otherwise
        ``(False, message)`` where ``message`` is the text of the
        ``ValueError`` that was raised.
    """
    try:
        model = UserModel(**data)
    except ValueError as e:
        return False, str(e)
    else:
        return True, model
5. 性能优化
5-1. 索引优化
python
def optimize_with_indexes():
    """Create indexes on ``users`` and return the plan of a name lookup.

    Indexes ``idx_name`` (on ``name``) and ``idx_age`` (on ``age``) are
    created when missing, then ``EXPLAIN QUERY PLAN`` is run for a lookup
    by name. The connection is closed in a ``finally`` clause so a failing
    statement does not leak it.

    Returns:
        list[tuple]: The rows produced by ``EXPLAIN QUERY PLAN``.
    """
    conn = sqlite3.connect('example.db')
    try:
        cursor = conn.cursor()
        # Create the indexes
        cursor.execute(
            'CREATE INDEX IF NOT EXISTS idx_name ON users (name)'
        )
        cursor.execute(
            'CREATE INDEX IF NOT EXISTS idx_age ON users (age)'
        )
        # Analyse the query plan
        cursor.execute(
            'EXPLAIN QUERY PLAN SELECT * FROM users WHERE name = ?',
            ('Alice',),
        )
        plan = cursor.fetchall()
        conn.commit()
        return plan
    finally:
        conn.close()
5-2. 查询优化
python
def optimize_queries():
    """Demonstrate three query-shaping techniques against ``users``.

    Three statements are executed in turn; only the result of the last one
    (rows with ``age > 18``) is fetched and returned - the first two are
    executed purely as illustrations and their results are discarded. The
    connection is closed in a ``finally`` clause so a failing statement
    does not leak it.

    Returns:
        list[tuple]: Rows of ``users`` whose ``age`` is greater than 18.
    """
    conn = sqlite3.connect('example.db')
    try:
        cursor = conn.cursor()
        # Use LIMIT to cap the size of the result set
        cursor.execute('SELECT * FROM users LIMIT 100')
        # Select only the columns that are needed
        cursor.execute('SELECT name, age FROM users')
        # Filter with a WHERE clause
        cursor.execute('SELECT * FROM users WHERE age > ?', (18,))
        return cursor.fetchall()
    finally:
        conn.close()
6. 数据库安全
6-1. 数据加密
python
from cryptography.fernet import Fernet
def encrypt_data():
    """Encrypt and decrypt a sample payload with a freshly generated key.

    The key is generated inside the function and is not returned.

    Returns:
        tuple: ``(encrypted_data, decrypted_data)`` for the payload
        ``b"Sensitive data"``.
    """
    # Generate the key and build the cipher
    fernet = Fernet(Fernet.generate_key())
    # Encrypt the data
    token = fernet.encrypt(b"Sensitive data")
    # Decrypt the data
    return token, fernet.decrypt(token)
6-2. 访问控制
python
def setup_access_control():
    """Create the ``users`` and ``permissions`` tables in ``example.db``.

    Both tables are created with ``IF NOT EXISTS``, so a table that is
    already present is left untouched. NOTE(review): other examples in this
    file create ``users`` with a different column set (id, name, age); when
    that table already exists the schema below is not applied - confirm
    which schema is intended before sharing one database file.

    The connection is closed in a ``finally`` clause so a failing statement
    does not leak it.
    """
    conn = sqlite3.connect('example.db')
    try:
        cursor = conn.cursor()
        # Create the users table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                username TEXT UNIQUE,
                password_hash TEXT,
                role TEXT
            )
        ''')
        # Create the permissions table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS permissions (
                id INTEGER PRIMARY KEY,
                role TEXT,
                resource TEXT,
                action TEXT
            )
        ''')
        conn.commit()
    finally:
        conn.close()
7. 实战示例
7-1. 数据库迁移工具
python
import os
import sqlite3
from typing import List, Dict
class DatabaseMigrator:
    """Apply SQL migration scripts to a SQLite database and record them.

    Each applied script is recorded by file name in the ``migrations``
    table. Instances can be used as context managers; the connection is
    closed when the ``with`` block exits.
    """

    def __init__(self, db_path: str):
        """Open a connection (and a cursor) to the database at ``db_path``."""
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
        self.cursor = self.conn.cursor()

    def __enter__(self):
        """Return the migrator itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; exceptions from the block propagate."""
        self.close()

    def create_migrations_table(self):
        """Create the ``migrations`` bookkeeping table if it is missing."""
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS migrations (
                id INTEGER PRIMARY KEY,
                name TEXT UNIQUE,
                applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        self.conn.commit()

    def apply_migration(self, migration_file: str) -> bool:
        """Run the SQL script in ``migration_file`` and record it.

        A migration is identified by the base name of its file. One that is
        already recorded is skipped, so calling this twice does not run the
        script a second time.

        NOTE: ``executescript`` does not wrap the script in a transaction of
        its own, so statements of a script that fails half-way are only
        undone by the rollback below if the script opened a transaction
        itself (see the sqlite3 documentation for ``executescript``).

        Args:
            migration_file: Path of the SQL script to apply.

        Returns:
            bool: ``True`` if the script was executed and recorded,
            ``False`` if it had been applied before.

        Raises:
            OSError: If the script file cannot be read.
            Exception: Any error raised while executing the script or
                recording it is re-raised after a rollback.
        """
        with open(migration_file, 'r') as f:
            sql = f.read()
        migration_name = os.path.basename(migration_file)

        # Make sure the bookkeeping table exists *before* running the
        # script; otherwise the script would run and only the recording
        # step would fail, leaving an applied but unrecorded migration.
        self.create_migrations_table()
        if migration_name in self.get_applied_migrations():
            return False

        try:
            self.cursor.executescript(sql)
            self.cursor.execute(
                'INSERT INTO migrations (name) VALUES (?)',
                (migration_name,)
            )
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise
        return True

    def get_applied_migrations(self) -> List[str]:
        """Return the names of recorded migrations ordered by ``id``."""
        self.cursor.execute('SELECT name FROM migrations ORDER BY id')
        return [row[0] for row in self.cursor.fetchall()]

    def close(self):
        """Close the database connection."""
        self.conn.close()
7-2. 数据同步工具
python
import sqlite3
from typing import Dict, List
import hashlib
class DataSynchronizer:
    """Copy the contents of a table from one SQLite database to another.

    Table names are interpolated into the SQL text (identifiers cannot be
    bound as parameters), so they must come from trusted code, never from
    user input.
    """

    def __init__(self, source_db: str, target_db: str):
        """Open connections and cursors for the source and target files."""
        self.source_conn = sqlite3.connect(source_db)
        self.target_conn = sqlite3.connect(target_db)
        self.source_cursor = self.source_conn.cursor()
        self.target_cursor = self.target_conn.cursor()

    def get_table_hash(self, table: str, cursor) -> str:
        """Return an MD5 hex digest of all rows of ``table``.

        Rows are read through ``cursor`` ordered by ``rowid`` and the digest
        is computed over their ``str()`` representation. It is used only to
        detect differences between two tables.
        """
        cursor.execute(f'SELECT * FROM {table} ORDER BY rowid')
        data = cursor.fetchall()
        return hashlib.md5(str(data).encode()).hexdigest()

    def sync_table(self, table: str):
        """Make the target's ``table`` contain the same rows as the source.

        When the table hashes differ, the target table is emptied and
        refilled with the source rows in a single transaction; if anything
        fails the target is rolled back and the error is re-raised.

        Args:
            table: Name of the table, present in both databases.

        Returns:
            bool: ``True`` if the target was rewritten, ``False`` if both
            tables already had the same hash.
        """
        source_hash = self.get_table_hash(table, self.source_cursor)
        target_hash = self.get_table_hash(table, self.target_cursor)
        if source_hash == target_hash:
            return False

        # Fetch the source rows
        self.source_cursor.execute(f'SELECT * FROM {table}')
        data = self.source_cursor.fetchall()
        try:
            # Empty the target table
            self.target_cursor.execute(f'DELETE FROM {table}')
            # Insert the new rows. An empty source has no first row to take
            # the column count from, so emptying the target is all there is
            # to do in that case.
            if data:
                placeholders = ",".join(["?"] * len(data[0]))
                self.target_cursor.executemany(
                    f'INSERT INTO {table} VALUES ({placeholders})',
                    data
                )
            self.target_conn.commit()
        except Exception:
            # Do not leave the target half-emptied or half-filled.
            self.target_conn.rollback()
            raise
        return True

    def close(self):
        """Close both connections, the target even if the source fails."""
        try:
            self.source_conn.close()
        finally:
            self.target_conn.close()
TIP
数据库操作最佳实践:
- 始终使用参数化查询防止 SQL 注入
- 合理使用事务确保数据一致性
- 使用连接池管理数据库连接
- 实现适当的错误处理和日志记录
- 定期备份重要数据
- 优化查询性能
- 注意数据安全和访问控制