1. 文件基础操作
1-1. 文件打开模式
Python 支持多种文件打开模式:
模式 | 描述 |
---|---|
r | 只读模式(默认) |
w | 写入模式,会覆盖已存在的文件 |
x | 独占创建模式,文件已存在则失败 |
a | 追加模式,在文件末尾添加内容 |
b | 二进制模式 |
t | 文本模式(默认) |
+ | 更新模式,可读写 |
1-2. 基本文件操作
python
# 打开文件
file = open('example.txt', 'r', encoding='utf-8')
# 读取文件
content = file.read() # 读取整个文件
line = file.readline() # 读取一行
lines = file.readlines() # 读取所有行
# 写入文件
file.write('Hello, World!') # 写入字符串
file.writelines(['line1\n', 'line2\n']) # 写入多行
# 关闭文件
file.close()
1-3. 使用上下文管理器
python
# 使用 with 语句自动管理文件资源
with open('example.txt', 'r', encoding='utf-8') as file:
content = file.read()
# 文件会在 with 块结束时自动关闭
2. 文件操作进阶
2-1. 文件路径处理
python
import os
from pathlib import Path
# 使用 os.path
current_dir = os.path.dirname(__file__)
file_path = os.path.join(current_dir, 'data', 'example.txt')
# 使用 pathlib(推荐)
path = Path(__file__).parent / 'data' / 'example.txt'
path.exists() # 检查文件是否存在
path.is_file() # 检查是否是文件
path.is_dir() # 检查是否是目录
2-2. 文件迭代器
python
# 逐行读取大文件
with open('large_file.txt', 'r', encoding='utf-8') as file:
for line in file:
process_line(line) # 处理每一行
# 使用文件迭代器
with open('data.txt', 'r', encoding='utf-8') as file:
# 跳过前两行
next(file)
next(file)
# 处理剩余行
for line in file:
process_line(line)
2-3. 文件编码处理
python
import chardet
def detect_encoding(file_path):
with open(file_path, 'rb') as file:
raw_data = file.read()
result = chardet.detect(raw_data)
return result['encoding']
# 自动检测编码并读取文件
encoding = detect_encoding('example.txt')
with open('example.txt', 'r', encoding=encoding) as file:
content = file.read()
3. 文件操作最佳实践
3-1. 文件操作模式
- 文本文件处理
python
def process_text_file(file_path):
try:
with open(file_path, 'r', encoding='utf-8') as file:
for line in file:
# 处理每一行
processed_line = line.strip()
if processed_line: # 跳过空行
yield processed_line
except UnicodeDecodeError:
# 处理编码错误
print(f"无法解码文件: {file_path}")
except IOError as e:
# 处理IO错误
print(f"文件操作错误: {e}")
- 二进制文件处理
python
def copy_binary_file(src_path, dst_path, chunk_size=8192):
try:
with open(src_path, 'rb') as src, open(dst_path, 'wb') as dst:
while True:
chunk = src.read(chunk_size)
if not chunk:
break
dst.write(chunk)
except IOError as e:
print(f"文件复制失败: {e}")
3-2. 文件操作模式
- 临时文件处理
python
import tempfile
import os
def process_with_temp_file():
# 创建临时文件
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_file.write(b'Hello, World!')
temp_path = temp_file.name
try:
# 处理临时文件
with open(temp_path, 'r') as file:
content = file.read()
print(content)
finally:
# 清理临时文件
os.unlink(temp_path)
- 文件锁
python
import fcntl
def write_with_lock(file_path, content):
with open(file_path, 'a') as file:
try:
# 获取文件锁
fcntl.flock(file.fileno(), fcntl.LOCK_EX)
file.write(content)
finally:
# 释放文件锁
fcntl.flock(file.fileno(), fcntl.LOCK_UN)
4. 常见文件操作场景
4-1. 配置文件处理
python
import configparser
import json
import yaml
# INI 配置文件
def read_ini_config(file_path):
config = configparser.ConfigParser()
config.read(file_path)
return config
# JSON 配置文件
def read_json_config(file_path):
with open(file_path, 'r', encoding='utf-8') as file:
return json.load(file)
# YAML 配置文件
def read_yaml_config(file_path):
with open(file_path, 'r', encoding='utf-8') as file:
return yaml.safe_load(file)
4-2. 日志文件处理
python
import logging
from logging.handlers import RotatingFileHandler
def setup_logging(log_file, max_bytes=10485760, backup_count=5):
logger = logging.getLogger('my_app')
logger.setLevel(logging.INFO)
# 创建轮转文件处理器
handler = RotatingFileHandler(
log_file,
maxBytes=max_bytes,
backupCount=backup_count
)
# 设置日志格式
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
4-3. CSV 文件处理
python
import csv
from typing import List, Dict
def read_csv(file_path: str) -> List[Dict]:
with open(file_path, 'r', encoding='utf-8') as file:
reader = csv.DictReader(file)
return list(reader)
def write_csv(file_path: str, data: List[Dict], fieldnames: List[str]):
with open(file_path, 'w', encoding='utf-8', newline='') as file:
writer = csv.DictWriter(file, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
5. 文件操作性能优化
5-1. 大文件处理
python
def process_large_file(file_path, chunk_size=1024*1024):
with open(file_path, 'rb') as file:
while True:
chunk = file.read(chunk_size)
if not chunk:
break
process_chunk(chunk)
def process_chunk(chunk):
# 处理数据块
pass
5-2. 文件缓存
python
from functools import lru_cache
@lru_cache(maxsize=128)
def read_file_content(file_path):
with open(file_path, 'r', encoding='utf-8') as file:
return file.read()
6. 文件操作安全
6-1. 文件权限检查
python
import os
import stat
def check_file_permissions(file_path):
try:
# 获取文件状态
file_stat = os.stat(file_path)
# 检查文件权限
is_readable = bool(file_stat.st_mode & stat.S_IRUSR)
is_writable = bool(file_stat.st_mode & stat.S_IWUSR)
is_executable = bool(file_stat.st_mode & stat.S_IXUSR)
return {
'readable': is_readable,
'writable': is_writable,
'executable': is_executable
}
except OSError as e:
print(f"无法检查文件权限: {e}")
return None
6-2. 安全文件写入
python
import tempfile
import os
import shutil
def safe_write_file(file_path, content):
# 创建临时文件
temp_dir = os.path.dirname(file_path)
with tempfile.NamedTemporaryFile(
mode='w',
dir=temp_dir,
delete=False
) as temp_file:
temp_file.write(content)
temp_path = temp_file.name
try:
# 原子性重命名
os.replace(temp_path, file_path)
except Exception as e:
# 清理临时文件
os.unlink(temp_path)
raise e
7. 实战示例
7-1. 文件监控器
python
import time
from pathlib import Path
from typing import Callable
class FileMonitor:
def __init__(self, file_path: str, callback: Callable):
self.file_path = Path(file_path)
self.callback = callback
self.last_modified = self.file_path.stat().st_mtime
self.last_size = self.file_path.stat().st_size
def start(self, interval: float = 1.0):
try:
while True:
current_stat = self.file_path.stat()
if (current_stat.st_mtime != self.last_modified or
current_stat.st_size != self.last_size):
self.callback(self.file_path)
self.last_modified = current_stat.st_mtime
self.last_size = current_stat.st_size
time.sleep(interval)
except KeyboardInterrupt:
print("监控已停止")
except Exception as e:
print(f"监控出错: {e}")
# 使用示例
def on_file_change(file_path):
print(f"文件已更改: {file_path}")
monitor = FileMonitor('example.txt', on_file_change)
monitor.start()
7-2. 文件压缩工具
python
import zipfile
import os
from pathlib import Path
from typing import List
class FileCompressor:
def __init__(self, output_path: str):
self.output_path = Path(output_path)
def compress_files(self, file_paths: List[str]):
with zipfile.ZipFile(
self.output_path,
'w',
zipfile.ZIP_DEFLATED
) as zip_file:
for file_path in file_paths:
file_path = Path(file_path)
if file_path.is_file():
arcname = file_path.name
zip_file.write(file_path, arcname)
def extract_files(self, extract_path: str):
extract_path = Path(extract_path)
with zipfile.ZipFile(self.output_path, 'r') as zip_file:
zip_file.extractall(extract_path)
# 使用示例
compressor = FileCompressor('archive.zip')
compressor.compress_files(['file1.txt', 'file2.txt'])
compressor.extract_files('extracted')
TIP
文件操作最佳实践:
- 始终使用上下文管理器(with 语句)处理文件
- 处理文件时考虑编码问题
- 大文件使用迭代器逐行或分块处理
- 注意文件权限和安全性
- 使用 pathlib 处理文件路径
- 考虑使用临时文件进行安全写入
- 实现适当的错误处理和日志记录