微服务架构实战指南
原创 | 2024/1/24 | 大约 5 分钟
微服务架构实战指南
微服务架构将单体应用拆分为多个小型、独立的服务,每个服务专注于特定的业务功能。
微服务架构图
上图展示了完整的微服务架构体系,包括核心服务、基础设施和支撑组件。
核心组件
1. API Gateway (API 网关)
API Gateway 是系统的统一入口,负责路由、认证、限流等功能。
// 使用 Express.js 实现简单的 API Gateway
const express = require('express')
const httpProxy = require('http-proxy')
const app = express()
const proxy = httpProxy.createProxyServer()
// Service registry: maps a route prefix to the backend base URL.
const services = {
  user: 'http://localhost:3001',
  order: 'http://localhost:3002',
  payment: 'http://localhost:3003'
}

// Express runs middleware and routes in registration order, so the rate
// limiter and the authentication check must be registered BEFORE the proxy
// routes; otherwise the routes answer first and the checks never run.

// Rate limiting: at most RATE_LIMIT_MAX requests per client IP per window.
const RATE_LIMIT_WINDOW_MS = 60000
const RATE_LIMIT_MAX = 100
const rateLimiter = new Map()

// Registered first so that unauthenticated traffic is throttled as well.
app.use((req, res, next) => {
  const clientId = req.ip
  const now = Date.now()
  const requests = rateLimiter.get(clientId) || []
  // Keep only the timestamps that are still inside the window.
  const validRequests = requests.filter(time => now - time < RATE_LIMIT_WINDOW_MS)
  if (validRequests.length >= RATE_LIMIT_MAX) {
    return res.status(429).json({ error: 'Too many requests' })
  }
  validRequests.push(now)
  rateLimiter.set(clientId, validRequests)
  next()
})

// Periodically drop clients with no request inside the window so the map
// does not grow without bound. `unref()` keeps this timer from holding the
// process open on shutdown.
setInterval(() => {
  const now = Date.now()
  for (const [clientId, requests] of rateLimiter) {
    if (requests.every(time => now - time >= RATE_LIMIT_WINDOW_MS)) {
      rateLimiter.delete(clientId)
    }
  }
}, RATE_LIMIT_WINDOW_MS).unref()

// Authentication middleware: rejects requests without a valid token and
// attaches the verified user to `req.user`.
// NOTE(review): `verifyToken` is not defined in this snippet; it is assumed
// to reject (throw) for an invalid token — confirm against its implementation.
app.use(async (req, res, next) => {
  const token = req.headers.authorization
  if (!token) {
    return res.status(401).json({ error: 'Unauthorized' })
  }
  try {
    req.user = await verifyToken(token)
  } catch (error) {
    return res.status(401).json({ error: 'Invalid token' })
  }
  // Called outside the try block so that an error thrown by a downstream
  // handler is not misreported as an invalid token.
  next()
})

// Answer with 502 when a backend cannot be reached instead of letting the
// proxy's `error` event go unhandled.
proxy.on('error', (error, req, res) => {
  console.error('Proxy error:', error.message)
  if (res && typeof res.writeHead === 'function' && !res.headersSent) {
    res.writeHead(502, { 'Content-Type': 'application/json' })
    res.end(JSON.stringify({ error: 'Bad gateway' }))
  }
})

// Route forwarding: /api/<name>/* is proxied to the backend registered
// under <name>. The target is read per request, as in the original routes.
for (const name of Object.keys(services)) {
  app.all(`/api/${name}/*`, (req, res) => {
    proxy.web(req, res, { target: services[name] })
  })
}
app.listen(3000)
API Gateway 职责:
- 🚪 统一入口
- 🔐 认证授权
- 🚦 限流熔断
- 📊 监控日志
- 🔄 协议转换
2. User Service (用户服务)
// user-service/index.js
const express = require('express')
const app = express()
// GET /api/user/:id — look up a single user by id and return it as JSON.
// Responds 404 when no user is found and 500 when the lookup throws.
app.get('/api/user/:id', async (req, res) => {
  try {
    const user = await User.findById(req.params.id)
    // NOTE(review): assumes `User.findById` resolves to a falsy value when no
    // record matches — confirm against the model layer. Without this guard a
    // missing user would be sent as `null` with status 200.
    if (!user) {
      return res.status(404).json({ error: 'User not found' })
    }
    res.json(user)
  } catch (error) {
    res.status(500).json({ error: error.message })
  }
})
// POST /api/user — create a user from the JSON request body, publish a
// `user.created` event, and respond 201 with the created user.
// `express.json()` is attached to the route because Express leaves
// `req.body` undefined unless a body-parsing middleware has run.
app.post('/api/user', express.json(), async (req, res) => {
  try {
    const user = await User.create(req.body)
    // Publish the domain event carrying the new user's id.
    await publishEvent('user.created', { userId: user.id })
    res.status(201).json(user)
  } catch (error) {
    res.status(400).json({ error: error.message })
  }
})
app.listen(3001)
3. Order Service (订单服务)
// order-service/index.js
const express = require('express')
const app = express()
// POST /api/order — validate the user, create the order, request payment,
// publish `order.created`, and respond 201 with `{ order, payment }`.
// `express.json()` is attached to the route because Express leaves
// `req.body` undefined unless a body-parsing middleware has run.
// NOTE(review): `services` is not defined in this snippet; the order service
// needs its own registry or configuration for the user/payment base URLs.
app.post('/api/order', express.json(), async (req, res) => {
  try {
    // Ask the user service to confirm that the user exists.
    const userResponse = await fetch(`${services.user}/api/user/${req.body.userId}`)
    // `fetch` resolves for HTTP error statuses too, so check explicitly;
    // otherwise an error payload would be treated as a user.
    if (!userResponse.ok) {
      const status = userResponse.status === 404 ? 404 : 502
      return res.status(status).json({ error: 'User validation failed' })
    }
    const user = await userResponse.json()

    // Create the order.
    const order = await Order.create({
      userId: user.id,
      products: req.body.products,
      totalAmount: calculateTotal(req.body.products)
    })

    // Ask the payment service to charge the order.
    const paymentResponse = await fetch(`${services.payment}/api/payment`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        orderId: order.id,
        amount: order.totalAmount
      })
    })
    // NOTE(review): the order already exists at this point; no compensation
    // (cancel / mark unpaid) is performed when payment fails — confirm the
    // intended behaviour.
    if (!paymentResponse.ok) {
      throw new Error(`Payment service responded with status ${paymentResponse.status}`)
    }
    const payment = await paymentResponse.json()

    // Publish the order-created event.
    await publishEvent('order.created', { orderId: order.id })
    res.status(201).json({ order, payment })
  } catch (error) {
    res.status(500).json({ error: error.message })
  }
})
app.listen(3002)
服务间通信
1. 同步通信 (REST API)
// HTTP 调用
/**
 * Fetch one user from the user service over HTTP.
 *
 * @param {string} userId - Id appended to `${USER_SERVICE}/api/user/`.
 * @returns {Promise<any>} The parsed JSON body of the response.
 * @throws {Error} 'User service error' when the response is not OK.
 */
async function callUserService(userId) {
  const endpoint = `${USER_SERVICE}/api/user/${userId}`
  const reply = await fetch(endpoint)
  if (reply.ok) {
    return reply.json()
  }
  throw new Error('User service error')
}
// 使用 axios
const axios = require('axios')
async function getUserOrders(userId) {
try {
const [user, orders] = await Promise.all([
axios.get(`${USER_SERVICE}/api/user/${userId}`),
axios.get(`${ORDER_SERVICE}/api/orders?userId=${userId}`)
])
return {
user: user.data,
orders: orders.data
}
} catch (error) {
console.error('Error:', error.message)
throw error
}
}
2. 异步通信 (Message Queue)
// 使用 RabbitMQ
const amqp = require('amqplib')
/**
 * Small wrapper around an amqplib channel for publishing JSON messages to a
 * topic exchange and consuming them from a durable queue.
 */
class MessageQueue {
  /** Open a connection to the local broker and create a channel on it. */
  async connect() {
    this.connection = await amqp.connect('amqp://localhost')
    this.channel = await this.connection.createChannel()
  }

  /**
   * Publish `message` as persistent JSON to a durable topic exchange.
   *
   * @param {string} exchange - Exchange name (asserted before publishing).
   * @param {string} routingKey - Routing key for the message.
   * @param {*} message - Value serialised with JSON.stringify.
   */
  async publish(exchange, routingKey, message) {
    await this.channel.assertExchange(exchange, 'topic', { durable: true })
    this.channel.publish(
      exchange,
      routingKey,
      Buffer.from(JSON.stringify(message)),
      { persistent: true }
    )
  }

  /**
   * Bind a durable queue to an exchange and consume messages from it.
   *
   * @param {string} queue - Queue name.
   * @param {string} routingKey - Binding key.
   * @param {(content: any) => Promise<void>|void} handler - Called with the
   *   parsed JSON payload; the message is acked when it resolves and
   *   requeued when it throws.
   * @param {string} [exchange='events'] - Exchange to bind to.
   */
  async subscribe(queue, routingKey, handler, exchange = 'events') {
    // Assert the exchange with the same settings `publish` uses, so the
    // bind works even when this subscriber starts before any publisher.
    await this.channel.assertExchange(exchange, 'topic', { durable: true })
    await this.channel.assertQueue(queue, { durable: true })
    await this.channel.bindQueue(queue, exchange, routingKey)
    // Awaited so a failure to register the consumer rejects `subscribe`.
    await this.channel.consume(queue, async (msg) => {
      if (!msg) {
        return
      }
      let content
      try {
        content = JSON.parse(msg.content.toString())
      } catch (error) {
        // A payload that cannot be parsed will never succeed, so reject it
        // without requeueing instead of letting the callback reject.
        console.error('Invalid message payload:', error)
        this.channel.nack(msg, false, false)
        return
      }
      try {
        await handler(content)
        this.channel.ack(msg)
      } catch (error) {
        console.error('Handler error:', error)
        // Requeue so the message is delivered again.
        this.channel.nack(msg, false, true)
      }
    })
  }
}
// Publish an event: create a client, connect, then send an `order.created`
// message to the `events` exchange.
// NOTE(review): top-level `await` is only valid in an ES module or inside an
// async function, while this article loads dependencies with `require` —
// confirm the snippet is meant to run inside an async wrapper.
const mq = new MessageQueue()
await mq.connect()
await mq.publish('events', 'order.created', {
orderId: '123',
userId: '456',
totalAmount: 100.00
})
// 订阅事件
await mq.subscribe('email-service', 'order.created', async (event) => {
console.log('Order created:', event.orderId)
await sendOrderConfirmationEmail(event.userId, event.orderId)
})
服务发现
使用 Consul
const Consul = require('consul')
const consul = new Consul()
// 服务注册
/**
 * Register the `user-service-1` instance with Consul, including an HTTP
 * health check against its `/health` endpoint.
 *
 * @returns {Promise<void>} Resolves once the register call has completed.
 */
async function registerService() {
  const healthCheck = {
    http: 'http://localhost:3001/health',
    interval: '10s',
    timeout: '5s'
  }
  const registration = {
    id: 'user-service-1',
    name: 'user-service',
    address: 'localhost',
    port: 3001,
    check: healthCheck
  }
  await consul.agent.service.register(registration)
}
// 服务发现
/**
 * Look up the passing instances of a service in Consul and return the base
 * URL of one of them, chosen at random.
 *
 * @param {string} serviceName - Name the service was registered under.
 * @returns {Promise<string>} `http://<address>:<port>` of the chosen instance.
 * @throws {Error} When Consul reports no passing instance.
 */
async function discoverService(serviceName) {
  const result = await consul.health.service({
    service: serviceName,
    passing: true
  })
  // NOTE(review): the original code read `result[0]`, i.e. it expected an
  // `[entries, response]` pair. A flat array of entries is accepted as well;
  // confirm which shape the installed consul client resolves with.
  const instances = Array.isArray(result[0]) ? result[0] : result
  if (!Array.isArray(instances) || instances.length === 0) {
    // Fail with a clear message instead of a TypeError on `undefined.Service`.
    throw new Error(`No healthy instances found for service "${serviceName}"`)
  }
  // Load balancing: pick one instance at random.
  const instance = instances[Math.floor(Math.random() * instances.length)]
  return `http://${instance.Service.Address}:${instance.Service.Port}`
}
// Use service discovery: resolve a base URL for `user-service`.
const userServiceUrl = await discoverService('user-service')
const user = await fetch(`${userServiceUrl}/api/user/123`)
配置中心
// 使用 Spring Cloud Config 或自建配置中心
/**
 * Client for an HTTP configuration server that caches each fetched
 * configuration in memory, keyed by service name and profile.
 */
class ConfigClient {
  /**
   * @param {string} configServerUrl - Base URL of the configuration server.
   */
  constructor(configServerUrl) {
    this.configServerUrl = configServerUrl
    this.cache = new Map()
  }

  /**
   * Return the configuration for a service/profile pair, fetching it from
   * `<configServerUrl>/<serviceName>/<profile>` on the first request and
   * serving it from the cache afterwards.
   *
   * @param {string} serviceName
   * @param {string} [profile='default']
   * @returns {Promise<any>} The parsed JSON configuration.
   * @throws {Error} When the server answers with a non-OK status.
   */
  async getConfig(serviceName, profile = 'default') {
    const cacheKey = `${serviceName}-${profile}`
    if (this.cache.has(cacheKey)) {
      return this.cache.get(cacheKey)
    }
    const response = await fetch(
      `${this.configServerUrl}/${serviceName}/${profile}`
    )
    // Do not cache an error body as if it were a configuration.
    if (!response.ok) {
      throw new Error(
        `Config server responded with status ${response.status} for ${serviceName}/${profile}`
      )
    }
    const config = await response.json()
    this.cache.set(cacheKey, config)
    return config
  }

  /**
   * Drop the cached entry for a service/profile pair and fetch it again.
   *
   * @param {string} serviceName
   * @param {string} [profile='default'] - Same default as `getConfig`, so
   *   that omitting it invalidates the entry `getConfig(serviceName)` cached.
   * @returns {Promise<any>} The freshly fetched configuration.
   */
  async refreshConfig(serviceName, profile = 'default') {
    const cacheKey = `${serviceName}-${profile}`
    this.cache.delete(cacheKey)
    return this.getConfig(serviceName, profile)
  }
}
// Use the config center: fetch the `production` configuration for
// `user-service` and print one setting.
// NOTE(review): assumes the returned config has a `database.url` field —
// confirm against the config server's schema.
const configClient = new ConfigClient('http://config-server:8888')
const config = await configClient.getConfig('user-service', 'production')
console.log('Database URL:', config.database.url)
console.log('Cache TTL:', config.cache.ttl)
熔断器模式
/**
 * Circuit breaker around an async request function.
 *
 * State moves CLOSED -> OPEN once `failureThreshold` failures have been
 * counted, OPEN -> HALF_OPEN on the first call made more than `timeout`
 * milliseconds after the last failure, and back to CLOSED on any success.
 */
class CircuitBreaker {
  /**
   * @param {Function} request - Function invoked by `call`.
   * @param {{failureThreshold?: number, timeout?: number}} [options] -
   *   Falsy values fall back to 5 failures and 60000 ms.
   */
  constructor(request, options = {}) {
    const { failureThreshold, timeout } = options
    this.request = request
    this.failureThreshold = failureThreshold || 5
    this.timeout = timeout || 60000
    this.state = 'CLOSED'
    this.failureCount = 0
    this.lastFailureTime = null
  }

  /**
   * Invoke the wrapped request with `args`.
   *
   * @returns {Promise<any>} Whatever the wrapped request resolves to.
   * @throws {Error} 'Circuit breaker is OPEN' while the breaker is open and
   *   the timeout has not elapsed; otherwise rethrows the request's error.
   */
  async call(...args) {
    if (this.state === 'OPEN') {
      const cooledDown = Date.now() - this.lastFailureTime > this.timeout
      if (!cooledDown) {
        throw new Error('Circuit breaker is OPEN')
      }
      // Let this call through as a trial request.
      this.state = 'HALF_OPEN'
    }

    try {
      const outcome = await this.request(...args)
      this.onSuccess()
      return outcome
    } catch (failure) {
      this.onFailure()
      throw failure
    }
  }

  /** Reset the failure counter and close the breaker. */
  onSuccess() {
    this.failureCount = 0
    this.state = 'CLOSED'
  }

  /** Count a failure, remember when it happened, and open at the threshold. */
  onFailure() {
    ++this.failureCount
    this.lastFailureTime = Date.now()
    const thresholdReached = this.failureCount >= this.failureThreshold
    if (thresholdReached) {
      this.state = 'OPEN'
    }
  }
}
// Use the circuit breaker around the user-service call.
// `fetch` only rejects on network-level failures; an HTTP 5xx still resolves.
// Server errors are therefore turned into thrown errors here so that the
// breaker counts them and can open. Other responses are returned unchanged
// for the caller to inspect.
const breaker = new CircuitBreaker(
  async (userId) => {
    const response = await fetch(`${USER_SERVICE}/api/user/${userId}`)
    if (response.status >= 500) {
      throw new Error(`User service responded with status ${response.status}`)
    }
    return response
  },
  { failureThreshold: 3, timeout: 30000 }
)
try {
const response = await breaker.call('123')
const user = await response.json()
} catch (error) {
// 降级处理
console.log('Service unavailable, using fallback')
const user = { id: '123', name: 'Default User' }
}
分布式追踪
// 使用 OpenTelemetry
const { trace, context } = require('@opentelemetry/api')
/**
 * Wrap `fn` so that every invocation runs inside a span named `name`
 * obtained from the `my-service` tracer.
 *
 * The span status is set to code 0 when `fn` resolves and to code 2 (with
 * the error message) when it throws; the exception is recorded and rethrown.
 * The span is ended in both cases.
 *
 * @param {string} name - Span name.
 * @param {Function} fn - Function to trace.
 * @returns {Function} Async wrapper that forwards its arguments to `fn`.
 */
function createSpan(name, fn) {
  return async (...callArgs) => {
    const activeSpan = trace.getTracer('my-service').startSpan(name)
    const invoke = () => fn(...callArgs)
    try {
      // Make the new span the active one while `fn` runs.
      const spanContext = trace.setSpan(context.active(), activeSpan)
      const outcome = await context.with(spanContext, invoke)
      activeSpan.setStatus({ code: 0 })
      return outcome
    } catch (failure) {
      activeSpan.setStatus({ code: 2, message: failure.message })
      activeSpan.recordException(failure)
      throw failure
    } finally {
      activeSpan.end()
    }
  }
}
// 使用
const processOrder = createSpan('processOrder', async (orderId) => {
const order = await fetchOrder(orderId)
const payment = await processPayment(order)
const shipping = await arrangeShipping(order)
return { order, payment, shipping }
})
Docker 部署
# docker-compose.yml
version: '3.8'
services:
  api-gateway:
    build: ./api-gateway
    ports:
      - "3000:3000"
    environment:
      - NODE_ENV=production
    depends_on:
      - user-service
      - order-service
  user-service:
    build: ./user-service
    environment:
      # The host must be the Compose service name (`postgres`); no service
      # called `db` is defined in this file.
      # NOTE(review): the URL carries no credentials or database setup —
      # confirm how the service authenticates and that `users` exists.
      - DATABASE_URL=postgres://postgres:5432/users
      - REDIS_URL=redis://redis:6379
    depends_on:
      - postgres
      - redis
  order-service:
    build: ./order-service
    environment:
      # Same host fix as user-service: point at the `postgres` service.
      - DATABASE_URL=postgres://postgres:5432/orders
    depends_on:
      - postgres
      - rabbitmq
  postgres:
    image: postgres:15
    environment:
      POSTGRES_PASSWORD: secret
  redis:
    image: redis:7-alpine
  rabbitmq:
    image: rabbitmq:3-management
最佳实践
- 单一职责: 每个服务只做一件事
- 数据独立: 每个服务有自己的数据库
- API 版本化: 使用 v1, v2 进行版本管理
- 容错设计: 熔断器、重试、降级
- 监控告警: 完善的监控体系
- 自动化部署: CI/CD 流水线
- 文档完善: API 文档、架构文档
挑战与解决方案
| 挑战 | 解决方案 |
|---|---|
| 分布式事务 | Saga 模式、最终一致性 |
| 服务间调用 | API Gateway、服务网格 |
| 数据一致性 | Event Sourcing、CQRS |
| 性能监控 | APM 工具、分布式追踪 |
| 部署复杂度 | Kubernetes、自动化 |
总结
微服务架构通过服务拆分提高了系统的可扩展性和可维护性,但也带来了分布式系统的复杂性。需要权衡业务需求,选择合适的架构模式。