Skip to content

1. 文件基础操作

1-1. 文件打开模式

Python 支持多种文件打开模式:

模式描述
r只读模式(默认)
w写入模式,会覆盖已存在的文件
x独占创建模式,文件已存在则失败
a追加模式,在文件末尾添加内容
b二进制模式
t文本模式(默认)
+更新模式,可读写

1-2. 基本文件操作

python
# 打开文件
file = open('example.txt', 'r', encoding='utf-8')

# 读取文件
content = file.read()  # 读取整个文件
line = file.readline()  # 读取一行
lines = file.readlines()  # 读取所有行

# 写入文件
file.write('Hello, World!')  # 写入字符串
file.writelines(['line1\n', 'line2\n'])  # 写入多行

# 关闭文件
file.close()

1-3. 使用上下文管理器

python
# 使用 with 语句自动管理文件资源
with open('example.txt', 'r', encoding='utf-8') as file:
    content = file.read()
    # 文件会在 with 块结束时自动关闭

2. 文件操作进阶

2-1. 文件路径处理

python
import os
from pathlib import Path

# 使用 os.path
current_dir = os.path.dirname(__file__)
file_path = os.path.join(current_dir, 'data', 'example.txt')

# 使用 pathlib(推荐)
path = Path(__file__).parent / 'data' / 'example.txt'
path.exists()  # 检查文件是否存在
path.is_file()  # 检查是否是文件
path.is_dir()  # 检查是否是目录

2-2. 文件迭代器

python
# 逐行读取大文件
with open('large_file.txt', 'r', encoding='utf-8') as file:
    for line in file:
        process_line(line)  # 处理每一行

# 使用文件迭代器
with open('data.txt', 'r', encoding='utf-8') as file:
    # 跳过前两行
    next(file)
    next(file)
    # 处理剩余行
    for line in file:
        process_line(line)

2-3. 文件编码处理

python
import chardet

def detect_encoding(file_path):
    with open(file_path, 'rb') as file:
        raw_data = file.read()
        result = chardet.detect(raw_data)
        return result['encoding']

# 自动检测编码并读取文件
encoding = detect_encoding('example.txt')
with open('example.txt', 'r', encoding=encoding) as file:
    content = file.read()

3. 文件操作最佳实践

3-1. 文件操作模式

  1. 文本文件处理
python
def process_text_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            for line in file:
                # 处理每一行
                processed_line = line.strip()
                if processed_line:  # 跳过空行
                    yield processed_line
    except UnicodeDecodeError:
        # 处理编码错误
        print(f"无法解码文件: {file_path}")
    except IOError as e:
        # 处理IO错误
        print(f"文件操作错误: {e}")
  1. 二进制文件处理
python
def copy_binary_file(src_path, dst_path, chunk_size=8192):
    try:
        with open(src_path, 'rb') as src, open(dst_path, 'wb') as dst:
            while True:
                chunk = src.read(chunk_size)
                if not chunk:
                    break
                dst.write(chunk)
    except IOError as e:
        print(f"文件复制失败: {e}")

3-2. 文件操作模式

  1. 临时文件处理
python
import tempfile
import os

def process_with_temp_file():
    # 创建临时文件
    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
        temp_file.write(b'Hello, World!')
        temp_path = temp_file.name
    
    try:
        # 处理临时文件
        with open(temp_path, 'r') as file:
            content = file.read()
            print(content)
    finally:
        # 清理临时文件
        os.unlink(temp_path)
  1. 文件锁
python
import fcntl

def write_with_lock(file_path, content):
    with open(file_path, 'a') as file:
        try:
            # 获取文件锁
            fcntl.flock(file.fileno(), fcntl.LOCK_EX)
            file.write(content)
        finally:
            # 释放文件锁
            fcntl.flock(file.fileno(), fcntl.LOCK_UN)

4. 常见文件操作场景

4-1. 配置文件处理

python
import configparser
import json
import yaml

# INI 配置文件
def read_ini_config(file_path):
    config = configparser.ConfigParser()
    config.read(file_path)
    return config

# JSON 配置文件
def read_json_config(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        return json.load(file)

# YAML 配置文件
def read_yaml_config(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        return yaml.safe_load(file)

4-2. 日志文件处理

python
import logging
from logging.handlers import RotatingFileHandler

def setup_logging(log_file, max_bytes=10485760, backup_count=5):
    logger = logging.getLogger('my_app')
    logger.setLevel(logging.INFO)
    
    # 创建轮转文件处理器
    handler = RotatingFileHandler(
        log_file,
        maxBytes=max_bytes,
        backupCount=backup_count
    )
    
    # 设置日志格式
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    handler.setFormatter(formatter)
    
    logger.addHandler(handler)
    return logger

4-3. CSV 文件处理

python
import csv
from typing import List, Dict

def read_csv(file_path: str) -> List[Dict]:
    with open(file_path, 'r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        return list(reader)

def write_csv(file_path: str, data: List[Dict], fieldnames: List[str]):
    with open(file_path, 'w', encoding='utf-8', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)

5. 文件操作性能优化

5-1. 大文件处理

python
def process_large_file(file_path, chunk_size=1024*1024):
    with open(file_path, 'rb') as file:
        while True:
            chunk = file.read(chunk_size)
            if not chunk:
                break
            process_chunk(chunk)

def process_chunk(chunk):
    # 处理数据块
    pass

5-2. 文件缓存

python
from functools import lru_cache

@lru_cache(maxsize=128)
def read_file_content(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        return file.read()

6. 文件操作安全

6-1. 文件权限检查

python
import os
import stat

def check_file_permissions(file_path):
    try:
        # 获取文件状态
        file_stat = os.stat(file_path)
        
        # 检查文件权限
        is_readable = bool(file_stat.st_mode & stat.S_IRUSR)
        is_writable = bool(file_stat.st_mode & stat.S_IWUSR)
        is_executable = bool(file_stat.st_mode & stat.S_IXUSR)
        
        return {
            'readable': is_readable,
            'writable': is_writable,
            'executable': is_executable
        }
    except OSError as e:
        print(f"无法检查文件权限: {e}")
        return None

6-2. 安全文件写入

python
import tempfile
import os
import shutil

def safe_write_file(file_path, content):
    # 创建临时文件
    temp_dir = os.path.dirname(file_path)
    with tempfile.NamedTemporaryFile(
        mode='w',
        dir=temp_dir,
        delete=False
    ) as temp_file:
        temp_file.write(content)
        temp_path = temp_file.name
    
    try:
        # 原子性重命名
        os.replace(temp_path, file_path)
    except Exception as e:
        # 清理临时文件
        os.unlink(temp_path)
        raise e

7. 实战示例

7-1. 文件监控器

python
import time
from pathlib import Path
from typing import Callable

class FileMonitor:
    def __init__(self, file_path: str, callback: Callable):
        self.file_path = Path(file_path)
        self.callback = callback
        self.last_modified = self.file_path.stat().st_mtime
        self.last_size = self.file_path.stat().st_size
    
    def start(self, interval: float = 1.0):
        try:
            while True:
                current_stat = self.file_path.stat()
                if (current_stat.st_mtime != self.last_modified or
                    current_stat.st_size != self.last_size):
                    self.callback(self.file_path)
                    self.last_modified = current_stat.st_mtime
                    self.last_size = current_stat.st_size
                time.sleep(interval)
        except KeyboardInterrupt:
            print("监控已停止")
        except Exception as e:
            print(f"监控出错: {e}")

# 使用示例
def on_file_change(file_path):
    print(f"文件已更改: {file_path}")

monitor = FileMonitor('example.txt', on_file_change)
monitor.start()

7-2. 文件压缩工具

python
import zipfile
import os
from pathlib import Path
from typing import List

class FileCompressor:
    def __init__(self, output_path: str):
        self.output_path = Path(output_path)
    
    def compress_files(self, file_paths: List[str]):
        with zipfile.ZipFile(
            self.output_path,
            'w',
            zipfile.ZIP_DEFLATED
        ) as zip_file:
            for file_path in file_paths:
                file_path = Path(file_path)
                if file_path.is_file():
                    arcname = file_path.name
                    zip_file.write(file_path, arcname)
    
    def extract_files(self, extract_path: str):
        extract_path = Path(extract_path)
        with zipfile.ZipFile(self.output_path, 'r') as zip_file:
            zip_file.extractall(extract_path)

# 使用示例
compressor = FileCompressor('archive.zip')
compressor.compress_files(['file1.txt', 'file2.txt'])
compressor.extract_files('extracted')

TIP

文件操作最佳实践:

  1. 始终使用上下文管理器(with 语句)处理文件
  2. 处理文件时考虑编码问题
  3. 大文件使用迭代器逐行或分块处理
  4. 注意文件权限和安全性
  5. 使用 pathlib 处理文件路径
  6. 考虑使用临时文件进行安全写入
  7. 实现适当的错误处理和日志记录