Skip to content

1. 网络编程基础

1-1. 网络协议概述

Python 支持多种网络协议:

| 协议 | 描述 | 主要模块 |
| --- | --- | --- |
| TCP | 可靠的、面向连接的协议 | socket |
| UDP | 不可靠的、无连接的协议 | socket |
| HTTP/HTTPS | 超文本传输协议 | http.client, requests |
| FTP | 文件传输协议 | ftplib |
| SMTP | 简单邮件传输协议 | smtplib |
| POP3/IMAP | 邮件接收协议 | poplib, imaplib |

1-2. Socket 编程基础

python
import socket

# TCP (stream) sockets over IPv4: one for the server role, one for the client role.
server_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
client_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)

# UDP (datagram) socket over IPv4.
udp_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)

# Socket options on the server socket: a 5-second timeout for blocking
# operations, and SO_REUSEADDR enabled.
server_socket.settimeout(5.0)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

1-3. 基本网络通信

python
# 服务器端
def start_server(host='0.0.0.0', port=8000):
    """Run a blocking, single-threaded TCP server that greets each client.

    Clients are served one at a time: the server reads up to 1024 bytes,
    prints them, replies with ``b"Hello, Client!"`` and closes the connection.
    The function loops forever; it only returns by raising (for example
    ``KeyboardInterrupt`` or a bind error).

    Args:
        host: Interface address to bind to.
        port: TCP port to listen on.
    """
    # The context manager closes the listening socket even when the loop is
    # left through an exception.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        # Same option the other servers in this file set before binding.
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind((host, port))
        server_socket.listen(5)

        print(f"服务器启动在 {host}:{port}")

        while True:
            client_socket, addr = server_socket.accept()
            print(f"收到来自 {addr} 的连接")

            try:
                with client_socket:
                    data = client_socket.recv(1024)
                    if data:
                        # errors='replace' keeps non-UTF-8 input from raising.
                        print(f"收到数据: {data.decode(errors='replace')}")
                        # sendall() retries until the whole reply is written;
                        # send() may write only part of it.
                        client_socket.sendall(b"Hello, Client!")
            except OSError as e:
                # A failure on one connection must not stop the server.
                print(f"处理客户端 {addr} 时出错: {e}")

# 客户端
def start_client(host='localhost', port=8000):
    """Connect to a TCP server, send a greeting and print the reply.

    Args:
        host: Server host name or address.
        port: Server TCP port.

    Raises:
        OSError: If connecting, sending or receiving fails.
    """
    # connect() is inside the ``with`` so the socket is also closed when the
    # connection attempt itself fails.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
        client_socket.connect((host, port))
        # sendall() writes the whole message; send() may write only part of it.
        client_socket.sendall(b"Hello, Server!")
        data = client_socket.recv(1024)
        print(f"收到响应: {data.decode()}")

2. 高级网络编程

2-1. 异步网络编程

python
import asyncio
import aiohttp

async def fetch_url(url):
    """Fetch ``url`` with an HTTP GET and return the response body as text.

    A new ``aiohttp.ClientSession`` is opened for the call and closed before
    returning.
    """
    async with aiohttp.ClientSession() as http:
        async with http.get(url) as reply:
            body = await reply.text()
    return body

async def main():
    """Fetch three example sites concurrently and print each body's length."""
    targets = (
        'http://example.com',
        'http://example.org',
        'http://example.net',
    )

    # gather() returns results in the order the awaitables were passed, so
    # they line up with ``targets``.
    bodies = await asyncio.gather(*(fetch_url(target) for target in targets))

    for url, content in zip(targets, bodies):
        print(f"{url} 的内容长度: {len(content)}")

# Run the async program: asyncio.run() drives main() to completion.
asyncio.run(main())

2-2. 并发服务器

python
import socket
import threading
from concurrent.futures import ThreadPoolExecutor

class ThreadedServer:
    """TCP acknowledgement server that handles each connection on a pool thread.

    The listening socket is created, bound and put into listening state in
    the constructor. ``start()`` then accepts connections and hands each one
    to a ``ThreadPoolExecutor`` with 10 workers.
    """

    def __init__(self, host='0.0.0.0', port=8000):
        """Create the listening socket and the worker pool.

        Args:
            host: Interface address to bind to.
            port: TCP port to listen on.

        Raises:
            OSError: If the socket cannot be configured, bound or listened on.
        """
        self.host = host
        self.port = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.bind((host, port))
            self.server_socket.listen(5)
        except Exception:
            # Do not leak the descriptor when setup fails (e.g. port in use).
            self.server_socket.close()
            raise
        self.executor = ThreadPoolExecutor(max_workers=10)

    def handle_client(self, client_socket, addr):
        """Acknowledge every chunk a client sends until it disconnects.

        Args:
            client_socket: Connected socket for this client; always closed
                before returning.
            addr: Peer address, used only in log output.
        """
        try:
            while True:
                data = client_socket.recv(1024)
                if not data:
                    # Empty read: the peer closed its side of the connection.
                    break
                # errors='replace' so undecodable bytes are logged instead of
                # aborting the connection before the acknowledgement is sent.
                print(f"收到来自 {addr} 的数据: {data.decode(errors='replace')}")
                # sendall() writes the full reply; send() may write only part.
                client_socket.sendall(b"Message received")
        except Exception as e:
            # Worker-thread boundary: report the failure and end this client.
            print(f"处理客户端 {addr} 时出错: {e}")
        finally:
            client_socket.close()

    def start(self):
        """Accept connections until interrupted, then release all resources."""
        print(f"服务器启动在 {self.host}:{self.port}")
        try:
            while True:
                client_socket, addr = self.server_socket.accept()
                self.executor.submit(self.handle_client, client_socket, addr)
        except KeyboardInterrupt:
            print("服务器正在关闭...")
        finally:
            self.server_socket.close()
            self.executor.shutdown()

2-3. SSL/TLS 安全通信

python
import ssl
import socket

def create_secure_server(host='0.0.0.0', port=8443):
    """Run a blocking TLS server that answers each client with a fixed reply.

    The certificate chain is loaded from ``server.crt`` / ``server.key`` in
    the current working directory. Clients are served one at a time; the
    function loops forever and only returns by raising.

    Args:
        host: Interface address to bind to.
        port: TCP port to listen on.

    Raises:
        OSError: If the certificate files cannot be loaded or the socket
            cannot be bound.
    """
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.load_cert_chain(certfile="server.crt", keyfile="server.key")

    # The context manager closes the listening socket when the loop is left
    # through an exception.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        # Same option the other servers in this file set before binding.
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind((host, port))
        server_socket.listen(5)

        print(f"安全服务器启动在 {host}:{port}")

        while True:
            client_socket, addr = server_socket.accept()

            # The TLS handshake happens inside wrap_socket(); a client that
            # fails it must not take the whole server down.
            try:
                secure_socket = context.wrap_socket(client_socket, server_side=True)
            except OSError as e:  # ssl.SSLError is a subclass of OSError
                print(f"与 {addr} 的 TLS 握手失败: {e}")
                client_socket.close()
                continue

            try:
                with secure_socket:
                    data = secure_socket.recv(1024)
                    if data:
                        print(f"收到加密数据: {data.decode(errors='replace')}")
                        # sendall() writes the whole reply; send() may not.
                        secure_socket.sendall(b"Secure response")
            except OSError as e:
                print(f"处理客户端 {addr} 时出错: {e}")

3. HTTP 编程

3-1. 使用 requests 库

python
import requests
from requests.exceptions import RequestException

def fetch_webpage(url):
    """GET ``url`` and return the response body as text, or ``None`` on failure.

    The request uses a 5-second timeout, a browser-like User-Agent header and
    the query parameter ``key=value``. Any ``RequestException`` (including a
    non-2xx status via ``raise_for_status``) is printed and turned into ``None``.
    """
    request_options = {
        'timeout': 5,
        'headers': {'User-Agent': 'Mozilla/5.0'},
        'params': {'key': 'value'},
    }
    try:
        reply = requests.get(url, **request_options)
        reply.raise_for_status()
        page_text = reply.text
    except RequestException as e:
        print(f"请求失败: {e}")
        page_text = None
    return page_text

def post_data(url, data):
    """POST ``data`` as JSON to ``url`` and return the decoded JSON reply.

    Uses a 5-second timeout. Returns ``None`` after printing the error when
    the request raises a ``RequestException`` (including a non-2xx status
    via ``raise_for_status``).
    """
    json_headers = {'Content-Type': 'application/json'}
    try:
        reply = requests.post(url, json=data, timeout=5, headers=json_headers)
        reply.raise_for_status()
        parsed = reply.json()
    except RequestException as e:
        print(f"POST 请求失败: {e}")
        parsed = None
    return parsed

3-2. 使用 http.server 创建简单服务器

python
from http.server import HTTPServer, BaseHTTPRequestHandler
import json

class SimpleHTTPHandler(BaseHTTPRequestHandler):
    """Request handler that answers GET and POST with a small JSON document."""

    def _send_json(self, payload):
        """Send ``payload`` as a 200 response with a JSON body."""
        body = json.dumps(payload).encode()
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        # An explicit length lets the client know where the body ends.
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Reply with the request path and a greeting."""
        self._send_json({
            'path': self.path,
            'method': 'GET',
            'message': 'Hello, World!'
        })

    def do_POST(self):
        """Echo the request body back inside a JSON document.

        A missing ``Content-Length`` header is treated as an empty body. A
        non-numeric or negative value is rejected with ``400``.
        """
        raw_length = self.headers.get('Content-Length')
        try:
            content_length = int(raw_length) if raw_length is not None else 0
        except ValueError:
            content_length = -1
        if content_length < 0:
            # read() with a negative size would read until EOF and stall on
            # an open connection, so reject the request instead.
            self.send_error(400, "Invalid Content-Length header")
            return

        post_data = self.rfile.read(content_length)

        self._send_json({
            'path': self.path,
            'method': 'POST',
            # errors='replace' so a non-UTF-8 body cannot raise while the
            # response is being built.
            'data': post_data.decode(errors='replace')
        })

def run_server(port=8000):
    """Serve ``SimpleHTTPHandler`` on all interfaces at ``port`` until interrupted.

    Args:
        port: TCP port to listen on.
    """
    server_address = ('', port)
    # The context manager closes the listening socket when serve_forever()
    # is left through an exception such as KeyboardInterrupt.
    with HTTPServer(server_address, SimpleHTTPHandler) as httpd:
        print(f"服务器启动在端口 {port}")
        httpd.serve_forever()

4. WebSocket 编程

4-1. 使用 websockets 库

python
import asyncio
import websockets
import json

async def handle_websocket(websocket, path=None):
    """Acknowledge every JSON message received on one WebSocket connection.

    Args:
        websocket: The connection object; it is iterated for incoming
            messages and used to send replies.
        path: Request path. Unused. Optional because newer ``websockets``
            releases call the handler with the connection only, while older
            ones also pass the path.
    """
    try:
        async for message in websocket:
            try:
                data = json.loads(message)
            except ValueError:
                # Covers JSONDecodeError and undecodable binary frames: tell
                # the client and keep the connection open.
                await websocket.send(json.dumps({
                    'status': 'error',
                    'message': 'Invalid JSON'
                }))
                continue

            print(f"收到消息: {data}")

            response = {
                'status': 'success',
                'message': 'Message received'
            }
            await websocket.send(json.dumps(response))
    except websockets.exceptions.ConnectionClosed:
        print("连接已关闭")

async def start_websocket_server(host='localhost', port=8765):
    """Start a WebSocket server using ``handle_websocket`` and wait until it closes.

    Args:
        host: Host name or address to listen on.
        port: TCP port to listen on.
    """
    ws_server = await websockets.serve(handle_websocket, host, port)
    print(f"WebSocket 服务器启动在 {host}:{port}")
    await ws_server.wait_closed()

# Run the WebSocket server on a new event loop via asyncio.run().
asyncio.run(start_websocket_server())

5. 网络编程最佳实践

5-1. 错误处理

python
import socket
import errno

def safe_socket_operation():
    """Fetch the first bytes of example.com's home page with full error handling.

    Opens a TCP connection to ``example.com:80`` with a 5-second timeout,
    sends a minimal HTTP GET request and prints up to 1024 bytes of the
    reply. Timeouts and other socket errors are reported on stdout and are
    not propagated.
    """
    try:
        # Creating the socket inside ``with`` guarantees it is closed, and
        # avoids touching an unbound name during cleanup if creation fails.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            # Applies to connect(), sendall() and recv() below.
            sock.settimeout(5.0)

            sock.connect(('example.com', 80))

            # HTTP/1.1 requires a Host header. "Connection: close" asks the
            # server to close the connection after this one response.
            sock.sendall(
                b'GET / HTTP/1.1\r\n'
                b'Host: example.com\r\n'
                b'Connection: close\r\n'
                b'\r\n'
            )

            data = sock.recv(1024)
            print(f"收到数据: {data.decode(errors='replace')}")

    except socket.timeout:
        print("连接超时")
    except socket.error as e:
        if e.errno == errno.ECONNREFUSED:
            print("连接被拒绝")
        elif e.errno == errno.ETIMEDOUT:
            print("操作超时")
        else:
            print(f"Socket 错误: {e}")

5-2. 连接池管理

python
from urllib3 import PoolManager
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def create_session():
    """Build a ``requests.Session`` with retries and a sized connection pool.

    Retries: up to 3 attempts with a 0.5 backoff factor for responses with
    status 500, 502, 503 or 504. Pool: 10 connection pools of up to 10
    connections each. The same adapter is mounted for both http:// and
    https:// URLs.
    """
    retries = Retry(
        total=3,
        backoff_factor=0.5,
        status_forcelist=[500, 502, 503, 504],
    )
    pooled_adapter = HTTPAdapter(
        max_retries=retries,
        pool_connections=10,
        pool_maxsize=10,
    )

    session = requests.Session()
    for scheme_prefix in ("http://", "https://"):
        session.mount(scheme_prefix, pooled_adapter)
    return session

# 使用连接池
def make_requests():
    """Issue one GET through a pooled session and print the status code.

    Raises:
        requests.exceptions.RequestException: If the request fails or times out.
    """
    # The session is a context manager; leaving the block closes it.
    with create_session() as session:
        # requests applies no timeout by default, so a stalled server would
        # block forever; 5 seconds matches the other requests in this file.
        response = session.get('http://example.com', timeout=5)
        print(response.status_code)

6. 网络编程工具

6-1. 网络诊断工具

python
import subprocess
import platform

def network_diagnostics():
    """Run the platform's interface, ping and traceroute diagnostics in order.

    On Windows this runs ``ipconfig /all``, ``ping`` and ``tracert``; on any
    other platform ``ifconfig``, ``ping`` and ``traceroute``. Output goes to
    the inherited stdout/stderr. A command that cannot be started is reported
    and skipped so the remaining diagnostics still run.
    """
    if platform.system() == "Windows":
        commands = [
            ["ipconfig", "/all"],
            ["ping", "8.8.8.8", "-n", "4"],
            ["tracert", "example.com"],
        ]
    else:
        commands = [
            ["ifconfig"],
            ["ping", "-c", "4", "8.8.8.8"],
            ["traceroute", "example.com"],
        ]

    for command in commands:
        try:
            subprocess.run(command)
        except OSError as e:
            # Raised when the executable is missing or cannot be started
            # (FileNotFoundError and PermissionError are OSError subclasses).
            print(f"无法执行命令 {command[0]}: {e}")

6-2. 端口扫描器

python
import socket
import concurrent.futures
from typing import List

def scan_port(host: str, port: int, timeout: float = 1.0) -> bool:
    """Report whether a TCP connection to ``host:port`` can be established.

    Args:
        host: Host name or IPv4 address to probe.
        port: TCP port to probe.
        timeout: Seconds to wait for the connection attempt.

    Returns:
        True if the connection succeeded, False if it was refused, timed out
        or a socket error (such as a name-resolution failure) occurred.
    """
    try:
        # ``with`` closes the socket even when connect_ex() raises, which it
        # does for address-related errors such as failed host lookups.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            # connect_ex() returns an error code instead of raising for
            # ordinary connection failures; 0 means success.
            return sock.connect_ex((host, port)) == 0
    except socket.error:
        return False

def port_scanner(host: str, start_port: int, end_port: int,
                 max_workers: int = 100) -> List[int]:
    """Scan an inclusive port range concurrently and return the open ports.

    Args:
        host: Host name or address to scan.
        start_port: First port in the range (inclusive).
        end_port: Last port in the range (inclusive).
        max_workers: Maximum number of concurrent probe threads.

    Returns:
        The open ports in ascending order. Empty if none are open or if
        ``start_port > end_port``.
    """
    open_ports = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_port = {
            executor.submit(scan_port, host, port): port
            for port in range(start_port, end_port + 1)
        }

        # Report ports as soon as their probe finishes.
        for future in concurrent.futures.as_completed(future_to_port):
            port = future_to_port[future]
            try:
                if future.result():
                    open_ports.append(port)
                    print(f"端口 {port} 开放")
            except Exception as e:
                print(f"扫描端口 {port} 时出错: {e}")

    # Completion order depends on thread timing; sort so the result is
    # deterministic.
    return sorted(open_ports)

7. 实战示例

7-1. 简单的 HTTP 代理服务器

python
import socket
import threading
from urllib.parse import urlparse

class ProxyServer:
    """Minimal forwarding HTTP proxy that serves each client on its own thread.

    Only requests whose request line carries an absolute URL
    (``GET http://host[:port]/path HTTP/1.1``) are forwarded. The request is
    relayed unchanged, and the upstream response is streamed back until the
    upstream server closes the connection.
    """

    def __init__(self, host='0.0.0.0', port=8080):
        """Create, bind and start listening on the proxy socket.

        Args:
            host: Interface address to bind to.
            port: TCP port to listen on.

        Raises:
            OSError: If the socket cannot be configured, bound or listened on.
        """
        self.host = host
        self.port = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.bind((host, port))
            self.server_socket.listen(5)
        except Exception:
            # Do not leak the descriptor when setup fails (e.g. port in use).
            self.server_socket.close()
            raise

    @staticmethod
    def _parse_target(request):
        """Return ``(host, port)`` of the upstream server named in ``request``.

        Raises:
            ValueError: If the request line is malformed, its URL has no
                host, or the port is not a valid number.
        """
        request_line = request.partition(b'\n')[0]
        parts = request_line.split()
        if len(parts) < 2:
            raise ValueError("malformed request line")

        # ISO-8859-1 maps every byte to a character, so decoding cannot fail.
        parsed_url = urlparse(parts[1].decode('iso-8859-1'))
        target_host = parsed_url.hostname
        if not target_host:
            raise ValueError("request URL has no host")
        # .port raises ValueError for a non-numeric or out-of-range port and
        # is None when the URL names no port; default to plain HTTP.
        return target_host, parsed_url.port or 80

    def handle_client(self, client_socket, addr):
        """Relay one request from ``client_socket`` to its target and back.

        Args:
            client_socket: Connected client socket; always closed on return.
            addr: Peer address (unused; kept for the thread entry signature).
        """
        # Bound up front so the cleanup below is safe when a failure happens
        # before the upstream socket exists.
        target_socket = None
        try:
            request = client_socket.recv(4096)
            if not request:
                # Client disconnected without sending anything.
                return

            try:
                target_host, target_port = self._parse_target(request)
            except ValueError as e:
                print(f"处理请求时出错: {e}")
                client_socket.sendall(
                    b"HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n"
                )
                return

            target_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            target_socket.connect((target_host, target_port))

            # sendall() writes everything; send() may write only part.
            target_socket.sendall(request)

            # Stream the response back until the upstream side closes.
            while True:
                response = target_socket.recv(4096)
                if not response:
                    break
                client_socket.sendall(response)

        except Exception as e:
            # Thread boundary: report the failure and drop this connection.
            print(f"处理请求时出错: {e}")
        finally:
            client_socket.close()
            if target_socket is not None:
                target_socket.close()

    def start(self):
        """Accept clients until interrupted, one handler thread per client."""
        print(f"代理服务器启动在 {self.host}:{self.port}")
        try:
            while True:
                client_socket, addr = self.server_socket.accept()
                threading.Thread(
                    target=self.handle_client,
                    args=(client_socket, addr)
                ).start()
        except KeyboardInterrupt:
            print("服务器正在关闭...")
        finally:
            self.server_socket.close()

7-2. 简单的聊天服务器

python
import socket
import threading
import json
from typing import Dict, Set

class ChatServer:
    """Multi-client TCP chat room that relays messages between clients.

    Each connection is served on its own thread. The first data a client
    sends is taken as its username; every later chunk is broadcast to all
    other connected clients.
    """

    def __init__(self, host='0.0.0.0', port=9000):
        """Create, bind and start listening on the chat server socket.

        Args:
            host: Interface address to bind to.
            port: TCP port to listen on.
        """
        self.host = host
        self.port = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((host, port))
        self.server_socket.listen(5)
        # Maps each connected client socket to its username.
        self.clients: Dict[socket.socket, str] = {}
        # Guards self.clients. It is a non-reentrant Lock, so it must never
        # be held while calling broadcast() or remove_client().
        self.lock = threading.Lock()

    def broadcast(self, message: str, sender: socket.socket = None):
        """Send ``message`` to every connected client except ``sender``.

        Clients whose socket fails during the send are removed from the room.

        Args:
            message: Text to send (encoded as UTF-8).
            sender: Client to skip, or None to send to everyone.
        """
        payload = message.encode()

        # Snapshot under the lock, send outside it: sending can block, and
        # removing failed clients needs the lock again.
        with self.lock:
            recipients = [client for client in self.clients if client is not sender]

        failed = []
        for client in recipients:
            try:
                client.sendall(payload)
            except OSError:
                failed.append(client)

        for client in failed:
            self.remove_client(client)

    def remove_client(self, client: socket.socket):
        """Forget ``client`` and announce its departure to the others.

        Does nothing if the client is not registered.
        """
        with self.lock:
            username = self.clients.pop(client, None)

        # Announce after releasing the lock; broadcast() acquires it itself.
        if username is not None:
            self.broadcast(f"{username} 离开了聊天室\n")

    def handle_client(self, client_socket: socket.socket, addr: tuple):
        """Serve one client: register its username, then relay its messages.

        Args:
            client_socket: Connected client socket; always closed on return.
            addr: Peer address, used only in log output.
        """
        try:
            raw_username = client_socket.recv(1024)
            if not raw_username:
                # Disconnected before sending a username: nothing to register.
                return
            username = raw_username.decode(errors='replace').strip()

            with self.lock:
                self.clients[client_socket] = username
            # No sender given, so the joining client receives this as well.
            self.broadcast(f"{username} 加入了聊天室\n")

            while True:
                data = client_socket.recv(1024)
                if not data:
                    # Empty read: the peer closed its side of the connection.
                    break
                message = data.decode(errors='replace')

                self.broadcast(f"{username}: {message}\n", client_socket)

        except Exception as e:
            # Thread boundary: report the failure and drop this client.
            print(f"处理客户端 {addr} 时出错: {e}")
        finally:
            self.remove_client(client_socket)
            client_socket.close()

    def start(self):
        """Accept clients until interrupted, one handler thread per client."""
        print(f"聊天服务器启动在 {self.host}:{self.port}")
        try:
            while True:
                client_socket, addr = self.server_socket.accept()
                threading.Thread(
                    target=self.handle_client,
                    args=(client_socket, addr)
                ).start()
        except KeyboardInterrupt:
            print("服务器正在关闭...")
        finally:
            self.server_socket.close()

TIP

网络编程最佳实践:

  1. 始终使用异常处理机制
  2. 合理设置超时时间
  3. 使用连接池管理连接
  4. 实现适当的重试机制
  5. 注意资源清理
  6. 考虑并发性能
  7. 实现适当的日志记录