系统设计核心原则
原创2024/1/14大约 4 分钟
系统设计核心原则
好的系统设计是软件成功的关键,本文介绍系统设计的核心原则和实践方法。
CAP 定理
分布式系统只能同时满足以下三项中的两项:
- C (Consistency): 一致性
- A (Availability): 可用性
- P (Partition tolerance): 分区容错性
一致性 (C)
/\
/ \
/ \
/ \
/ CP CA \
/ \
/ CAP \
/ X \
/______________ \
\ AP /
\ /
\ /
\ /
\ /
\ /
\ /
\/
可用性(A) 分区容错性(P)实践选择
- CP: ZooKeeper, HBase
- AP: Cassandra, DynamoDB
- CA: 传统关系型数据库(不考虑分区)
扩展性设计
垂直扩展 vs 水平扩展
垂直扩展 (Scale Up)
┌─────────┐ ┌──────────┐
│ 4核8G │ --> │ 8核16G │
│ Server │ │ Server │
└─────────┘ └──────────┘
水平扩展 (Scale Out)
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ 4核8G │ --> │ 4核8G │+│ 4核8G │+│ 4核8G │
│ Server │ │ Server1 │ │ Server2 │ │ Server3 │
└─────────┘ └─────────┘ └─────────┘ └─────────┘负载均衡
┌──────────┐
│ 负载均衡 │
└─────┬────┘
┌──────────┼──────────┐
│ │ │
┌────▼───┐ ┌───▼────┐ ┌───▼────┐
│ App1 │ │ App2 │ │ App3 │
└────────┘ └────────┘ └────────┘常用算法:
- 轮询 (Round Robin)
- 最少连接 (Least Connections)
- IP Hash
- 加权轮询
缓存策略
缓存模式
Cache-Aside (旁路缓存)
def get_user(user_id):
# 先查缓存
user = cache.get(f"user:{user_id}")
if user:
return user
# 缓存未命中,查数据库
user = db.query("SELECT * FROM users WHERE id = ?", user_id)
# 写入缓存
cache.set(f"user:{user_id}", user, ttl=3600)
return user
def update_user(user_id, data):
# 更新数据库
db.execute("UPDATE users SET ... WHERE id = ?", user_id, data)
# 删除缓存
cache.delete(f"user:{user_id}")Read-Through / Write-Through
class CacheLayer:
def get(self, key):
value = cache.get(key)
if not value:
value = db.get(key)
cache.set(key, value)
return value
def set(self, key, value):
# 同时写入缓存和数据库
cache.set(key, value)
db.set(key, value)Write-Behind (Write-Back)
class AsyncWriteCache:
def set(self, key, value):
# 先写缓存
cache.set(key, value)
# 异步写入数据库
queue.enqueue({
'operation': 'write',
'key': key,
'value': value
})数据库设计
读写分离
┌──────────┐
│ 应用层 │
└─────┬────┘
┌───────────┼────────────┐
│写 │ │读
┌────▼────┐ │ ┌────▼────┐
│ Master │ │ │ Slave1 │
│ 主库 │─────同步────│ 从库1 │
└─────────┘ │ └─────────┘
│ ┌─────────┐
└──────│ Slave2 │
│ 从库2 │
└─────────┘分库分表
垂直分割
原表 users
┌────┬──────┬───────┬──────┬────────┐
│id │name │email │phone │ avatar │
└────┴──────┴───────┴──────┴────────┘
分割后:
users_basic users_profile
┌────┬──────┬───────┐ ┌────┬──────┬────────┐
│id │name │email │ │id │phone │ avatar │
└────┴──────┴───────┘ └────┴──────┴────────┘水平分割
users_0 users_1 users_2
┌─────────┐ ┌─────────┐ ┌─────────┐
│id: 1-100│ │id:101-200│ │id:201-300│
└─────────┘ └─────────┘ └─────────┘消息队列
使用场景
生产者 消息队列 消费者
┌────────┐ ┌──────────┐ ┌────────┐
│ 订单服务│─────────▶│ Queue │────────▶│邮件服务│
└────────┘ └──────────┘ └────────┘
│ ┌──────────┐ ┌────────┐
└──────────────▶│ Queue │────────▶│短信服务│
└──────────┘ └────────┘消息可靠性
# 生产者确认
def send_message(message):
try:
result = queue.send(message, persistent=True)
if result.success:
db.mark_sent(message.id)
except Exception as e:
# 重试逻辑
retry_send(message)
# 消费者确认
def process_message(message):
try:
# 处理消息
handle(message)
# 手动确认
message.ack()
except Exception as e:
# 重新入队
message.nack(requeue=True)微服务架构
服务拆分
单体应用
┌─────────────────────────┐
│ Monolith App │
│ ┌──────────────────┐ │
│ │ User Service │ │
│ │ Order Service │ │
│ │ Payment Service │ │
│ └──────────────────┘ │
└─────────────────────────┘
微服务
┌───────────┐ ┌───────────┐ ┌────────────┐
│ User │ │ Order │ │ Payment │
│ Service │ │ Service │ │ Service │
└───────────┘ └───────────┘ └────────────┘
│ │ │
└──────────────┴───────────────┘
│
┌──────▼──────┐
│ API │
│ Gateway │
└─────────────┘服务通信
同步通信 (REST/gRPC)
Service A ──HTTP──▶ Service B
异步通信 (Message Queue)
Service A ──Publish──▶ Queue ──Subscribe──▶ Service B高可用设计
容错机制
熔断器模式
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.timeout = timeout
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
self.last_failure_time = None
def call(self, func, *args, **kwargs):
if self.state == 'OPEN':
if time.time() - self.last_failure_time > self.timeout:
self.state = 'HALF_OPEN'
else:
raise Exception('Circuit breaker is OPEN')
try:
result = func(*args, **kwargs)
self.on_success()
return result
except Exception as e:
self.on_failure()
raise e
def on_success(self):
self.failure_count = 0
self.state = 'CLOSED'
def on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = 'OPEN'重试机制
def retry(max_attempts=3, delay=1, backoff=2):
def decorator(func):
def wrapper(*args, **kwargs):
attempts = 0
current_delay = delay
while attempts < max_attempts:
try:
return func(*args, **kwargs)
except Exception as e:
attempts += 1
if attempts >= max_attempts:
raise e
time.sleep(current_delay)
current_delay *= backoff
return wrapper
return decorator
@retry(max_attempts=3, delay=1, backoff=2)
def call_external_api():
# API 调用
pass性能优化
数据库优化
-- 添加索引
CREATE INDEX idx_user_email ON users(email);
CREATE INDEX idx_orders_user_created
ON orders(user_id, created_at);
-- 查询优化
-- 不好的查询
SELECT * FROM orders WHERE YEAR(created_at) = 2024;
-- 优化后
SELECT * FROM orders
WHERE created_at >= '2024-01-01'
AND created_at < '2025-01-01';
-- 使用连接代替子查询
-- 不好
SELECT * FROM users
WHERE id IN (SELECT user_id FROM orders);
-- 优化后
SELECT DISTINCT u.* FROM users u
INNER JOIN orders o ON u.id = o.user_id;CDN 加速
用户请求
│
▼
┌───────┐ ┌────────┐ ┌──────────┐
│ CDN │──未命中─▶│ 源服务器│─────▶│ 数据库 │
│ 边缘节点│◀─返回───┤ │ └──────────┘
└───────┘ └────────┘
│
▼
返回给用户安全设计
认证和授权
认证流程:
1. 用户登录
2. 验证凭证
3. 生成 Token
4. 返回 Token
授权流程:
1. 请求携带 Token
2. 验证 Token
3. 检查权限
4. 返回结果API 安全
# Rate Limiting
from functools import wraps
import time
def rate_limit(max_requests=100, time_window=60):
requests = {}
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
client_id = get_client_id()
now = time.time()
# 清理过期记录
requests[client_id] = [
req_time for req_time in requests.get(client_id, [])
if now - req_time < time_window
]
# 检查限流
if len(requests[client_id]) >= max_requests:
raise Exception('Rate limit exceeded')
requests[client_id].append(now)
return func(*args, **kwargs)
return wrapper
return decorator监控和告警
# 指标采集
metrics = {
'request_count': Counter('http_requests_total'),
'request_duration': Histogram('http_request_duration_seconds'),
'error_count': Counter('http_errors_total')
}
@app.route('/api/users')
def get_users():
start_time = time.time()
metrics['request_count'].inc()
try:
# 业务逻辑
users = fetch_users()
return users
except Exception as e:
metrics['error_count'].inc()
raise
finally:
duration = time.time() - start_time
metrics['request_duration'].observe(duration)总结
系统设计需要在可用性、一致性、性能等多方面做权衡。没有完美的架构,只有最适合当前业务的设计。