Skip to content

1. 数据库基础

1-1. 数据库类型

Python 支持多种数据库:

数据库类型描述主要模块
SQLite轻量级文件数据库sqlite3
MySQL关系型数据库mysql-connector-python, PyMySQL
PostgreSQL对象关系型数据库psycopg2
MongoDB文档型数据库pymongo
Redis键值存储数据库redis

1-2. 数据库连接

python
# SQLite 连接
import sqlite3
conn = sqlite3.connect('example.db')

# MySQL 连接
import mysql.connector
conn = mysql.connector.connect(
    host='localhost',
    user='username',
    password='password',
    database='mydatabase'
)

# PostgreSQL 连接
import psycopg2
conn = psycopg2.connect(
    host='localhost',
    user='username',
    password='password',
    dbname='mydatabase'
)

# MongoDB 连接
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
db = client['mydatabase']

# Redis 连接
import redis
r = redis.Redis(host='localhost', port=6379, db=0)

1-3. 基本操作

python
# SQLite 基本操作
def sqlite_basic_operations():
    conn = sqlite3.connect('example.db')
    cursor = conn.cursor()
    
    # 创建表
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            age INTEGER
        )
    ''')
    
    # 插入数据
    cursor.execute('INSERT INTO users (name, age) VALUES (?, ?)', ('Alice', 25))
    
    # 查询数据
    cursor.execute('SELECT * FROM users')
    rows = cursor.fetchall()
    
    # 更新数据
    cursor.execute('UPDATE users SET age = ? WHERE name = ?', (26, 'Alice'))
    
    # 删除数据
    cursor.execute('DELETE FROM users WHERE name = ?', ('Alice',))
    
    conn.commit()
    conn.close()

# MongoDB 基本操作
def mongo_basic_operations():
    client = MongoClient('mongodb://localhost:27017/')
    db = client['mydatabase']
    collection = db['users']
    
    # 插入文档
    collection.insert_one({'name': 'Alice', 'age': 25})
    
    # 查询文档
    result = collection.find({'name': 'Alice'})
    
    # 更新文档
    collection.update_one(
        {'name': 'Alice'},
        {'$set': {'age': 26}}
    )
    
    # 删除文档
    collection.delete_one({'name': 'Alice'})
    
    client.close()

2. 数据库操作进阶

2-1. 连接池管理

python
from dbutils.pooled_db import PooledDB
import pymysql

def create_connection_pool():
    pool = PooledDB(
        creator=pymysql,
        maxconnections=6,
        mincached=2,
        maxcached=5,
        maxshared=3,
        blocking=True,
        maxusage=None,
        setsession=[],
        ping=0,
        host='localhost',
        user='username',
        password='password',
        database='mydatabase',
        charset='utf8mb4'
    )
    return pool

# 使用连接池
def use_connection_pool():
    pool = create_connection_pool()
    conn = pool.connection()
    cursor = conn.cursor()
    
    try:
        cursor.execute('SELECT * FROM users')
        results = cursor.fetchall()
        return results
    finally:
        cursor.close()
        conn.close()

2-2. 事务处理

python
def transaction_example():
    conn = sqlite3.connect('example.db')
    cursor = conn.cursor()
    
    try:
        # 开始事务
        cursor.execute('BEGIN TRANSACTION')
        
        # 执行多个操作
        cursor.execute('INSERT INTO users (name, age) VALUES (?, ?)', ('Bob', 30))
        cursor.execute('UPDATE users SET age = ? WHERE name = ?', (31, 'Bob'))
        
        # 提交事务
        conn.commit()
    except Exception as e:
        # 回滚事务
        conn.rollback()
        print(f"事务失败: {e}")
    finally:
        conn.close()

2-3. ORM 使用

python
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    name = Column(String)
    age = Column(Integer)

def orm_example():
    # 创建引擎
    engine = create_engine('sqlite:///example.db')
    Base.metadata.create_all(engine)
    
    # 创建会话
    Session = sessionmaker(bind=engine)
    session = Session()
    
    try:
        # 创建新用户
        new_user = User(name='Alice', age=25)
        session.add(new_user)
        
        # 查询用户
        user = session.query(User).filter_by(name='Alice').first()
        
        # 更新用户
        user.age = 26
        
        # 删除用户
        session.delete(user)
        
        # 提交更改
        session.commit()
    except Exception as e:
        session.rollback()
        print(f"操作失败: {e}")
    finally:
        session.close()

3. 数据库操作最佳实践

3-1. 参数化查询

python
def safe_query():
    conn = sqlite3.connect('example.db')
    cursor = conn.cursor()
    
    # 不安全的查询(容易受到 SQL 注入攻击)
    # cursor.execute(f"SELECT * FROM users WHERE name = '{user_input}'")
    
    # 安全的参数化查询
    cursor.execute("SELECT * FROM users WHERE name = ?", (user_input,))
    
    results = cursor.fetchall()
    conn.close()
    return results

3-2. 批量操作

python
def batch_operations():
    conn = sqlite3.connect('example.db')
    cursor = conn.cursor()
    
    # 批量插入
    users = [
        ('Alice', 25),
        ('Bob', 30),
        ('Charlie', 35)
    ]
    cursor.executemany('INSERT INTO users (name, age) VALUES (?, ?)', users)
    
    # 批量更新
    updates = [
        (26, 'Alice'),
        (31, 'Bob'),
        (36, 'Charlie')
    ]
    cursor.executemany('UPDATE users SET age = ? WHERE name = ?', updates)
    
    conn.commit()
    conn.close()

3-3. 连接管理

python
from contextlib import contextmanager

@contextmanager
def get_db_connection():
    conn = sqlite3.connect('example.db')
    try:
        yield conn
    finally:
        conn.close()

def use_connection():
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM users')
        return cursor.fetchall()

4. 常见数据库操作场景

4-1. 数据迁移

python
import pandas as pd
from sqlalchemy import create_engine

def migrate_data():
    # 从 CSV 文件读取数据
    df = pd.read_csv('data.csv')
    
    # 创建数据库引擎
    engine = create_engine('sqlite:///example.db')
    
    # 将数据写入数据库
    df.to_sql('users', engine, if_exists='replace', index=False)

4-2. 数据备份

python
import shutil
import datetime

def backup_database():
    # 创建备份文件名
    timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_file = f'backup_{timestamp}.db'
    
    # 复制数据库文件
    shutil.copy2('example.db', backup_file)
    
    return backup_file

4-3. 数据验证

python
from pydantic import BaseModel, validator

class UserModel(BaseModel):
    name: str
    age: int
    
    @validator('age')
    def validate_age(cls, v):
        if v < 0 or v > 150:
            raise ValueError('年龄必须在 0 到 150 之间')
        return v

def validate_user_data(data):
    try:
        user = UserModel(**data)
        return True, user
    except ValueError as e:
        return False, str(e)

5. 性能优化

5-1. 索引优化

python
def optimize_with_indexes():
    conn = sqlite3.connect('example.db')
    cursor = conn.cursor()
    
    # 创建索引
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_name ON users (name)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_age ON users (age)')
    
    # 分析查询性能
    cursor.execute('EXPLAIN QUERY PLAN SELECT * FROM users WHERE name = ?', ('Alice',))
    plan = cursor.fetchall()
    
    conn.commit()
    conn.close()
    return plan

5-2. 查询优化

python
def optimize_queries():
    conn = sqlite3.connect('example.db')
    cursor = conn.cursor()
    
    # 使用 LIMIT 限制结果集
    cursor.execute('SELECT * FROM users LIMIT 100')
    
    # 只选择需要的列
    cursor.execute('SELECT name, age FROM users')
    
    # 使用 WHERE 子句过滤
    cursor.execute('SELECT * FROM users WHERE age > ?', (18,))
    
    results = cursor.fetchall()
    conn.close()
    return results

6. 数据库安全

6-1. 数据加密

python
from cryptography.fernet import Fernet

def encrypt_data():
    # 生成密钥
    key = Fernet.generate_key()
    cipher_suite = Fernet(key)
    
    # 加密数据
    data = b"Sensitive data"
    encrypted_data = cipher_suite.encrypt(data)
    
    # 解密数据
    decrypted_data = cipher_suite.decrypt(encrypted_data)
    
    return encrypted_data, decrypted_data

6-2. 访问控制

python
def setup_access_control():
    conn = sqlite3.connect('example.db')
    cursor = conn.cursor()
    
    # 创建用户表
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            username TEXT UNIQUE,
            password_hash TEXT,
            role TEXT
        )
    ''')
    
    # 创建权限表
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS permissions (
            id INTEGER PRIMARY KEY,
            role TEXT,
            resource TEXT,
            action TEXT
        )
    ''')
    
    conn.commit()
    conn.close()

7. 实战示例

7-1. 数据库迁移工具

python
import os
import sqlite3
from typing import List, Dict

class DatabaseMigrator:
    def __init__(self, db_path: str):
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
        self.cursor = self.conn.cursor()
    
    def create_migrations_table(self):
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS migrations (
                id INTEGER PRIMARY KEY,
                name TEXT UNIQUE,
                applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        self.conn.commit()
    
    def apply_migration(self, migration_file: str):
        with open(migration_file, 'r') as f:
            sql = f.read()
        
        try:
            self.cursor.executescript(sql)
            migration_name = os.path.basename(migration_file)
            self.cursor.execute(
                'INSERT INTO migrations (name) VALUES (?)',
                (migration_name,)
            )
            self.conn.commit()
        except Exception as e:
            self.conn.rollback()
            raise e
    
    def get_applied_migrations(self) -> List[str]:
        self.cursor.execute('SELECT name FROM migrations ORDER BY id')
        return [row[0] for row in self.cursor.fetchall()]
    
    def close(self):
        self.conn.close()

7-2. 数据同步工具

python
import sqlite3
from typing import Dict, List
import hashlib

class DataSynchronizer:
    def __init__(self, source_db: str, target_db: str):
        self.source_conn = sqlite3.connect(source_db)
        self.target_conn = sqlite3.connect(target_db)
        self.source_cursor = self.source_conn.cursor()
        self.target_cursor = self.target_conn.cursor()
    
    def get_table_hash(self, table: str, cursor) -> str:
        cursor.execute(f'SELECT * FROM {table} ORDER BY rowid')
        data = cursor.fetchall()
        return hashlib.md5(str(data).encode()).hexdigest()
    
    def sync_table(self, table: str):
        source_hash = self.get_table_hash(table, self.source_cursor)
        target_hash = self.get_table_hash(table, self.target_cursor)
        
        if source_hash != target_hash:
            # 获取源表数据
            self.source_cursor.execute(f'SELECT * FROM {table}')
            data = self.source_cursor.fetchall()
            
            # 清空目标表
            self.target_cursor.execute(f'DELETE FROM {table}')
            
            # 插入新数据
            self.target_cursor.executemany(
                f'INSERT INTO {table} VALUES ({",".join(["?"] * len(data[0]))})',
                data
            )
            
            self.target_conn.commit()
            return True
        return False
    
    def close(self):
        self.source_conn.close()
        self.target_conn.close()

TIP

数据库操作最佳实践:

  1. 始终使用参数化查询防止 SQL 注入
  2. 合理使用事务确保数据一致性
  3. 使用连接池管理数据库连接
  4. 实现适当的错误处理和日志记录
  5. 定期备份重要数据
  6. 优化查询性能
  7. 注意数据安全和访问控制