1. 网络编程基础
1-1. 网络协议概述
Python 支持多种网络协议:
| 协议 | 描述 | 主要模块 |
| --- | --- | --- |
| TCP | 可靠的、面向连接的协议 | socket |
| UDP | 不可靠的、无连接的协议 | socket |
| HTTP/HTTPS | 超文本传输协议 | http.client, requests |
| FTP | 文件传输协议 | ftplib |
| SMTP | 简单邮件传输协议 | smtplib |
| POP3/IMAP | 邮件接收协议 | poplib, imaplib |
1-2. Socket 编程基础
python
import socket
# Create two TCP sockets: AF_INET selects IPv4, SOCK_STREAM selects TCP
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Create a UDP socket: SOCK_DGRAM selects datagram (UDP) transport
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Set socket options: enable SO_REUSEADDR on the server socket
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.settimeout(5.0) # blocking operations on this socket time out after 5 seconds
1-3. 基本网络通信
python
# Server side
def start_server(host='0.0.0.0', port=8000):
    """Run a blocking TCP server that greets every client, one at a time.

    For each accepted connection the server reads up to 1024 bytes, prints
    them and, if anything was received, replies with ``b"Hello, Client!"``.
    The loop runs until the process is interrupted; the listening socket is
    closed on the way out.

    Args:
        host: Interface address to bind to.
        port: TCP port to listen on.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        # Same option the other servers in this file set before binding.
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind((host, port))
        server_socket.listen(5)
        print(f"服务器启动在 {host}:{port}")
        while True:
            client_socket, addr = server_socket.accept()
            # The context manager closes the client socket on every path.
            with client_socket:
                print(f"收到来自 {addr} 的连接")
                try:
                    data = client_socket.recv(1024)
                    if data:
                        print(f"收到数据: {data.decode()}")
                        # sendall() retries until the whole reply is written.
                        client_socket.sendall(b"Hello, Client!")
                except (OSError, UnicodeDecodeError) as e:
                    # One misbehaving client must not stop the accept loop.
                    print(f"处理客户端 {addr} 时出错: {e}")
# Client side
def start_client(host='localhost', port=8000):
    """Connect to a TCP server, send a greeting and print the reply.

    The socket is managed by a ``with`` block so it is closed even when
    ``connect`` itself fails.

    Args:
        host: Server host name or address.
        port: Server TCP port.

    Raises:
        OSError: If the connection cannot be established or is interrupted.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
        client_socket.connect((host, port))
        # sendall() keeps writing until the whole message has been sent.
        client_socket.sendall(b"Hello, Server!")
        data = client_socket.recv(1024)
        print(f"收到响应: {data.decode()}")
2. 高级网络编程
2-1. 异步网络编程
python
import asyncio
import aiohttp
async def fetch_url(url, session=None):
    """Fetch *url* with an HTTP GET and return the response body as text.

    Args:
        url: Address to request.
        session: Optional object exposing ``get(url)`` as an async context
            manager (for example a shared ``aiohttp.ClientSession``). Passing
            one lets many calls share a single session. When omitted, a
            temporary session is created and closed for this single request,
            as before.

    Returns:
        The decoded response body.
    """
    if session is not None:
        # Caller owns the session, so it is left open afterwards.
        async with session.get(url) as response:
            return await response.text()
    async with aiohttp.ClientSession() as own_session:
        async with own_session.get(url) as response:
            return await response.text()
async def main():
    """Download three example sites concurrently and print each body length."""
    urls = ['http://example.com', 'http://example.org', 'http://example.net']
    # gather() returns results in the same order as the awaitables passed in,
    # so they can be paired back with the URLs positionally.
    pages = await asyncio.gather(*map(fetch_url, urls))
    for url, content in zip(urls, pages):
        print(f"{url} 的内容长度: {len(content)}")
# Run the async program only when executed as a script, so importing this
# module does not start network requests as a side effect.
if __name__ == "__main__":
    asyncio.run(main())
2-2. 并发服务器
python
import socket
import threading
from concurrent.futures import ThreadPoolExecutor
class ThreadedServer:
    """TCP server that acknowledges every message, one worker thread per client.

    The listening socket is bound in the constructor; ``start()`` runs the
    accept loop and hands each connection to a thread pool of 10 workers.
    """

    def __init__(self, host='0.0.0.0', port=8000):
        """Create, bind and start listening on ``(host, port)``.

        Raises:
            OSError: If the address cannot be bound; the socket is closed
                before the error propagates.
        """
        self.host = host
        self.port = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.bind((host, port))
            self.server_socket.listen(5)
        except OSError:
            # Do not leak the descriptor when the address is unavailable.
            self.server_socket.close()
            raise
        self.executor = ThreadPoolExecutor(max_workers=10)

    def handle_client(self, client_socket, addr):
        """Serve one client until it disconnects, then close its socket.

        Every received chunk (up to 1024 bytes) is printed and answered with
        ``b"Message received"``. Errors are printed instead of raised because
        this runs inside a pool worker where an exception would go unnoticed.
        """
        try:
            with client_socket:
                while True:
                    data = client_socket.recv(1024)
                    if not data:
                        # An empty read means the peer closed the connection.
                        break
                    print(f"收到来自 {addr} 的数据: {data.decode()}")
                    # sendall() retries until the whole reply is written.
                    client_socket.sendall(b"Message received")
        except Exception as e:
            print(f"处理客户端 {addr} 时出错: {e}")

    def start(self):
        """Accept connections until interrupted, then release all resources."""
        print(f"服务器启动在 {self.host}:{self.port}")
        try:
            while True:
                client_socket, addr = self.server_socket.accept()
                self.executor.submit(self.handle_client, client_socket, addr)
        except KeyboardInterrupt:
            print("服务器正在关闭...")
        finally:
            self.server_socket.close()
            self.executor.shutdown()
2-3. SSL/TLS 安全通信
python
import ssl
import socket
def create_secure_server(host='0.0.0.0', port=8443):
    """Run a blocking TLS server that answers each client once.

    The certificate and key are loaded from ``server.crt`` / ``server.key``
    in the current working directory. Each accepted connection is wrapped in
    TLS, up to 1024 bytes are read and printed, and ``b"Secure response"`` is
    sent back when data was received.

    Args:
        host: Interface address to bind to.
        port: TCP port to listen on.
    """
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.load_cert_chain(certfile="server.crt", keyfile="server.key")
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        # Same option the other servers in this file set before binding.
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind((host, port))
        server_socket.listen(5)
        print(f"安全服务器启动在 {host}:{port}")
        while True:
            client_socket, addr = server_socket.accept()
            try:
                # The TLS handshake happens here and can fail for any client
                # (plain-text probe, unsupported protocol, reset, ...).
                secure_socket = context.wrap_socket(client_socket, server_side=True)
            except OSError as e:  # ssl.SSLError is a subclass of OSError
                print(f"与 {addr} 的 TLS 握手失败: {e}")
                client_socket.close()
                continue
            with secure_socket:
                try:
                    data = secure_socket.recv(1024)
                    if data:
                        print(f"收到加密数据: {data.decode()}")
                        secure_socket.sendall(b"Secure response")
                except (OSError, UnicodeDecodeError) as e:
                    # Keep serving other clients after a per-connection error.
                    print(f"处理客户端 {addr} 时出错: {e}")
3. HTTP 编程
3-1. 使用 requests 库
python
import requests
from requests.exceptions import RequestException
def fetch_webpage(url):
    """GET *url* and return the response body as text.

    The request uses a 5 second timeout, a browser-like User-Agent and the
    query parameter ``key=value``. If the request raises a
    ``RequestException`` (including from ``raise_for_status()``), the error is
    printed and ``None`` is returned.
    """
    request_options = {
        'timeout': 5,
        'headers': {'User-Agent': 'Mozilla/5.0'},
        'params': {'key': 'value'},
    }
    try:
        page = requests.get(url, **request_options)
        page.raise_for_status()
        body = page.text
    except RequestException as e:
        print(f"请求失败: {e}")
        return None
    else:
        return body
def post_data(url, data):
    """POST *data* as JSON to *url* and return the decoded JSON reply.

    The request uses a 5 second timeout. If a ``RequestException`` is raised
    while sending, checking the status or decoding the reply, the error is
    printed and ``None`` is returned.
    """
    json_headers = {'Content-Type': 'application/json'}
    try:
        reply = requests.post(url, json=data, timeout=5, headers=json_headers)
        reply.raise_for_status()
        decoded = reply.json()
    except RequestException as e:
        print(f"POST 请求失败: {e}")
        return None
    else:
        return decoded
3-2. 使用 http.server 创建简单服务器
python
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
class SimpleHTTPHandler(BaseHTTPRequestHandler):
    """Request handler that answers GET and POST with a small JSON document."""

    def _send_json(self, payload, status=200):
        """Serialize *payload* and send it as a complete JSON response."""
        body = json.dumps(payload).encode()
        self.send_response(status)
        self.send_header('Content-type', 'application/json')
        # Tell the client exactly how many bytes follow the headers.
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Reply with the request path and a fixed greeting."""
        self._send_json({
            'path': self.path,
            'method': 'GET',
            'message': 'Hello, World!'
        })

    def do_POST(self):
        """Echo the request body back inside a JSON document.

        A missing ``Content-Length`` header is treated as an empty body; a
        non-numeric one is rejected with ``400``.
        """
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except ValueError:
            self.send_error(400, "Invalid Content-Length")
            return
        # read() with a negative size would block until the peer closes the
        # connection, so only read when a positive length was announced.
        post_data = self.rfile.read(content_length) if content_length > 0 else b''
        self._send_json({
            'path': self.path,
            'method': 'POST',
            # Undecodable bytes are replaced rather than crashing the handler.
            'data': post_data.decode(errors='replace')
        })
def run_server(port=8000):
    """Serve ``SimpleHTTPHandler`` on all interfaces until interrupted.

    Args:
        port: TCP port to listen on.
    """
    server_address = ('', port)
    # The context manager closes the listening socket when serving stops.
    with HTTPServer(server_address, SimpleHTTPHandler) as httpd:
        print(f"服务器启动在端口 {port}")
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            # Ctrl+C is the expected way to stop; exit without a traceback.
            print("服务器正在关闭...")
4. WebSocket 编程
4-1. 使用 websockets 库
python
import asyncio
import websockets
import json
async def handle_websocket(websocket, path=None):
    """Acknowledge every JSON message received on one WebSocket connection.

    Args:
        websocket: Connection object; must support ``async for`` iteration
            over incoming messages and an awaitable ``send()``.
        path: Request path. Optional so the handler works both with library
            versions that pass ``(websocket, path)`` and with those that pass
            only the connection. It is not used.
    """
    try:
        async for message in websocket:
            try:
                data = json.loads(message)
            except ValueError:
                # Malformed JSON (or undecodable bytes) from one message must
                # not tear down the whole connection; report it and go on.
                await websocket.send(json.dumps({
                    'status': 'error',
                    'message': 'Invalid JSON'
                }))
                continue
            print(f"收到消息: {data}")
            response = {
                'status': 'success',
                'message': 'Message received'
            }
            await websocket.send(json.dumps(response))
    except websockets.exceptions.ConnectionClosed:
        print("连接已关闭")
async def start_websocket_server(host='localhost', port=8765):
    """Start the WebSocket server and keep it running until it is closed.

    Args:
        host: Interface address to bind to.
        port: TCP port to listen on.
    """
    # Using serve() as an async context manager closes the server when this
    # coroutine exits for any reason, including cancellation on Ctrl+C.
    async with websockets.serve(handle_websocket, host, port) as server:
        print(f"WebSocket 服务器启动在 {host}:{port}")
        await server.wait_closed()
# Run the WebSocket server only when executed as a script, so importing this
# module does not start a server as a side effect.
if __name__ == "__main__":
    asyncio.run(start_websocket_server())
5. 网络编程最佳实践
5-1. 错误处理
python
import socket
import errno
def safe_socket_operation():
    """Fetch the first bytes of http://example.com/ with explicit error handling.

    Opens a TCP connection with a 5 second timeout, sends a minimal HTTP/1.1
    GET request, prints up to 1024 bytes of the reply and reports timeouts,
    refused connections and other socket errors instead of raising them.
    """
    try:
        # Creating the socket inside the ``with`` means there is nothing to
        # clean up if creation itself fails, and it is closed on all paths.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            # Applies to connect, send and recv alike.
            sock.settimeout(5.0)
            sock.connect(('example.com', 80))
            # HTTP/1.1 requires a Host header; "Connection: close" asks the
            # server not to keep the connection open after replying.
            sock.sendall(
                b'GET / HTTP/1.1\r\n'
                b'Host: example.com\r\n'
                b'Connection: close\r\n'
                b'\r\n'
            )
            data = sock.recv(1024)
            # A 1024-byte cut can split a multi-byte character, so decode
            # leniently rather than risking UnicodeDecodeError.
            print(f"收到数据: {data.decode(errors='replace')}")
    except socket.timeout:
        print("连接超时")
    except socket.error as e:
        if e.errno == errno.ECONNREFUSED:
            print("连接被拒绝")
        elif e.errno == errno.ETIMEDOUT:
            print("操作超时")
        else:
            print(f"Socket 错误: {e}")
5-2. 连接池管理
python
from urllib3 import PoolManager
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_session():
    """Build a ``requests`` session with retries and a connection pool.

    Returns:
        A session whose ``http://`` and ``https://`` traffic goes through one
        shared adapter configured with the retry policy below.
    """
    # Retry policy: up to 3 attempts, backoff factor 0.5, for these statuses.
    retry_policy = Retry(
        total=3,
        backoff_factor=0.5,
        status_forcelist=[500, 502, 503, 504],
    )
    # Connection pool: 10 pools of at most 10 connections each.
    pooled_adapter = HTTPAdapter(
        max_retries=retry_policy,
        pool_connections=10,
        pool_maxsize=10,
    )
    session = requests.Session()
    for scheme in ("http://", "https://"):
        session.mount(scheme, pooled_adapter)
    return session
# Using the connection pool
def make_requests():
    """Request http://example.com through a pooled session and print the status."""
    # The session is a context manager; leaving the block closes it.
    with create_session() as session:
        # requests applies no timeout by default, so set one explicitly to
        # avoid blocking forever on an unresponsive server.
        response = session.get('http://example.com', timeout=5)
        print(response.status_code)
6. 网络编程工具
6-1. 网络诊断工具
python
import subprocess
import platform
def network_diagnostics():
    """Run the platform's interface, ping and traceroute tools in sequence.

    Output of each tool goes straight to the console. A tool that cannot be
    started (for example because it is not installed) is reported and
    skipped so the remaining diagnostics still run.
    """
    if platform.system() == "Windows":
        # Windows diagnostics
        commands = [
            ["ipconfig", "/all"],
            ["ping", "8.8.8.8", "-n", "4"],
            ["tracert", "example.com"],
        ]
    else:
        # Linux/macOS diagnostics
        commands = [
            ["ifconfig"],
            ["ping", "-c", "4", "8.8.8.8"],
            ["traceroute", "example.com"],
        ]
    for command in commands:
        try:
            subprocess.run(command)
        except OSError as e:
            # FileNotFoundError / PermissionError: the executable is missing
            # or cannot be launched on this machine.
            print(f"无法执行 {command[0]}: {e}")
6-2. 端口扫描器
python
import socket
import concurrent.futures
from typing import List
def scan_port(host: str, port: int) -> bool:
    """Return True if a TCP connection to ``host:port`` succeeds within 1 second.

    Args:
        host: Host name or IPv4 address to probe.
        port: TCP port number.

    Returns:
        True when the port accepted the connection; False when it was
        refused, timed out, the host could not be resolved, or the port
        number is outside the valid range.
    """
    try:
        # The context manager closes the socket even if connect_ex raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(1)
            # connect_ex returns an errno instead of raising; 0 means success.
            return sock.connect_ex((host, port)) == 0
    except (socket.error, OverflowError):
        # OverflowError is raised for port numbers outside 0-65535.
        return False
def port_scanner(host: str, start_port: int, end_port: int,
                 max_workers: int = 100) -> List[int]:
    """Scan an inclusive TCP port range concurrently.

    Args:
        host: Host name or address to scan.
        start_port: First port of the range.
        end_port: Last port of the range (inclusive). An empty range
            (``end_port < start_port``) yields an empty result.
        max_workers: Size of the thread pool used for the probes.

    Returns:
        The open ports in ascending order. Progress lines are still printed
        in completion order, which varies between runs.
    """
    open_ports = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_port = {
            executor.submit(scan_port, host, port): port
            for port in range(start_port, end_port + 1)
        }
        for future in concurrent.futures.as_completed(future_to_port):
            port = future_to_port[future]
            try:
                if future.result():
                    open_ports.append(port)
                    print(f"端口 {port} 开放")
            except Exception as e:
                print(f"扫描端口 {port} 时出错: {e}")
    # as_completed yields in finish order; sort so callers get a stable list.
    return sorted(open_ports)
7. 实战示例
7-1. 简单的 HTTP 代理服务器
python
import socket
import threading
from urllib.parse import urlparse
class ProxyServer:
    """Minimal forwarding proxy for plain HTTP requests with absolute URLs.

    Each client connection is handled on its own thread: the first chunk of
    the request is read, the origin server is taken from the request line,
    the request is forwarded unchanged and the origin's reply is relayed
    back until the origin closes the connection.
    """

    def __init__(self, host='0.0.0.0', port=8080):
        """Create, bind and start listening on ``(host, port)``."""
        self.host = host
        self.port = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((host, port))
        self.server_socket.listen(5)

    @staticmethod
    def _parse_target(request):
        """Return ``(host, port)`` of the origin named in the request line.

        Raises:
            ValueError: If the request line has no URL or the URL has no host.
        """
        first_line = request.split(b'\n')[0]
        parts = first_line.split()
        if len(parts) < 2:
            raise ValueError(f"无效的请求行: {first_line!r}")
        # latin-1 maps every byte to a character, so decoding cannot fail.
        parsed_url = urlparse(parts[1].decode('latin-1'))
        if not parsed_url.hostname:
            raise ValueError(f"请求中缺少目标主机: {parts[1]!r}")
        # Honour an explicit port in the URL; plain HTTP defaults to 80.
        return parsed_url.hostname, parsed_url.port or 80

    def handle_client(self, client_socket, addr):
        """Relay one request/response exchange for a connected client.

        Errors are printed rather than raised because this runs as a thread
        target. Both sockets are closed on every path.
        """
        try:
            with client_socket:
                # Receive the client's request.
                request = client_socket.recv(4096)
                if not request:
                    # Client disconnected without sending anything.
                    return
                target_host, target_port = self._parse_target(request)
                # Connect to the origin server.
                with socket.create_connection((target_host, target_port)) as target_socket:
                    # Forward the request; sendall() avoids partial writes.
                    target_socket.sendall(request)
                    # Relay the response until the origin closes.
                    while True:
                        response = target_socket.recv(4096)
                        if not response:
                            break
                        client_socket.sendall(response)
        except Exception as e:
            print(f"处理请求时出错: {e}")

    def start(self):
        """Accept clients until interrupted, one handler thread per client."""
        print(f"代理服务器启动在 {self.host}:{self.port}")
        try:
            while True:
                client_socket, addr = self.server_socket.accept()
                threading.Thread(
                    target=self.handle_client,
                    args=(client_socket, addr)
                ).start()
        except KeyboardInterrupt:
            print("服务器正在关闭...")
        finally:
            self.server_socket.close()
7-2. 简单的聊天服务器
python
import socket
import threading
import json
from typing import Dict, Set
class ChatServer:
    """Multi-client TCP chat room with one handler thread per client.

    ``self.clients`` maps each connected socket to its username and is
    guarded by ``self.lock``. The lock is a plain (non-reentrant)
    ``threading.Lock``, so it is only ever held while reading or updating the
    mapping and never while calling another method that takes it.
    """

    def __init__(self, host='0.0.0.0', port=9000):
        """Create, bind and start listening on ``(host, port)``."""
        self.host = host
        self.port = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((host, port))
        self.server_socket.listen(5)
        self.clients: Dict[socket.socket, str] = {}
        self.lock = threading.Lock()

    def broadcast(self, message: str, sender: socket.socket = None):
        """Send *message* to every registered client except *sender*.

        Clients whose socket fails while sending are removed from the room.
        """
        payload = message.encode()
        # Snapshot under the lock, then send without it: sending can block,
        # and removing a failed client needs the lock again.
        with self.lock:
            recipients = [client for client in self.clients if client != sender]
        failed = []
        for client in recipients:
            try:
                client.sendall(payload)
            except OSError:
                failed.append(client)
        for client in failed:
            self.remove_client(client)

    def remove_client(self, client: socket.socket):
        """Unregister *client* and announce the departure to everyone else.

        Does nothing if the client is not (or no longer) registered.
        """
        with self.lock:
            username = self.clients.pop(client, None)
        # Announce outside the lock; broadcast() acquires it itself.
        if username is not None:
            self.broadcast(f"{username} 离开了聊天室\n")

    def handle_client(self, client_socket: socket.socket, addr: tuple):
        """Register one client by its first message, then relay its messages."""
        try:
            # The first message received is the username.
            raw_name = client_socket.recv(1024)
            if not raw_name:
                # Disconnected before sending a name; nothing to register.
                return
            username = raw_name.decode().strip()
            with self.lock:
                self.clients[client_socket] = username
            self.broadcast(f"{username} 加入了聊天室\n")
            while True:
                message = client_socket.recv(1024).decode()
                if not message:
                    break
                # Relay the message to everyone except its author.
                self.broadcast(f"{username}: {message}\n", client_socket)
        except Exception as e:
            print(f"处理客户端 {addr} 时出错: {e}")
        finally:
            self.remove_client(client_socket)
            client_socket.close()

    def start(self):
        """Accept clients until interrupted, one handler thread per client."""
        print(f"聊天服务器启动在 {self.host}:{self.port}")
        try:
            while True:
                client_socket, addr = self.server_socket.accept()
                threading.Thread(
                    target=self.handle_client,
                    args=(client_socket, addr)
                ).start()
        except KeyboardInterrupt:
            print("服务器正在关闭...")
        finally:
            self.server_socket.close()
TIP
网络编程最佳实践:
- 始终使用异常处理机制
- 合理设置超时时间
- 使用连接池管理连接
- 实现适当的重试机制
- 注意资源清理
- 考虑并发性能
- 实现适当的日志记录