Practical Kafka-Based Stream Processing for Large-Model Services
While designing the stream-processing architecture for a large-model inference service recently, I hit a serious pitfall. At first we consumed the message queue directly in the application layer with Python's asyncio, and at peak traffic it brought the whole service down.
Reproducing the Problem
# The wrong approach
from kafka import KafkaConsumer
import asyncio
import json

class BadProcessor:
    def __init__(self):
        self.consumer = KafkaConsumer(
            'model-input',
            bootstrap_servers='localhost:9092',
            value_deserializer=lambda x: json.loads(x),
        )

    async def process_messages(self):
        # Blocking iteration inside a coroutine stalls the event loop
        for msg in self.consumer:
            # Process directly, with no rate limiting
            result = await self.model_predict(msg.value)
            # Synchronous write-back
The Right Approach
Use an architecture built on Kafka + consumer groups + a rate-limiting mechanism:
# The right approach
from kafka import KafkaConsumer
from asyncio import Semaphore
import asyncio
import json

class GoodProcessor:
    def __init__(self, max_concurrent=10):
        self.consumer = KafkaConsumer(
            'model-input',
            bootstrap_servers='localhost:9092',
            group_id='model-group',
            value_deserializer=lambda x: json.loads(x),
        )
        self.semaphore = Semaphore(max_concurrent)

    async def process_message(self, msg):
        # The semaphore caps in-flight model calls at max_concurrent
        async with self.semaphore:
            result = await self.model_predict(msg.value)
            return result

    async def run(self):
        tasks = []
        for msg in self.consumer:
            task = asyncio.create_task(self.process_message(msg))
            tasks.append(task)
            # Drain in batches to bound memory and pending-task growth
            if len(tasks) >= 100:
                await asyncio.gather(*tasks)
                tasks = []
        # Don't drop the final partial batch
        if tasks:
            await asyncio.gather(*tasks)
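The rate-limiting effect of the semaphore can be demonstrated without a Kafka broker. The sketch below is a self-contained illustration only: the `asyncio.sleep` stands in for the article's `model_predict` call, and the `state` probe counters are ours, added just to make the concurrency cap observable.

```python
import asyncio

async def worker(sem, state):
    # Acquire before "calling the model"; at most max_concurrent workers run here
    async with sem:
        state["current"] += 1
        state["peak"] = max(state["peak"], state["current"])
        await asyncio.sleep(0.01)  # stand-in for an async model_predict call
        state["current"] -= 1

async def run_batch(n=20, max_concurrent=5):
    sem = asyncio.Semaphore(max_concurrent)
    state = {"current": 0, "peak": 0}  # probe: tracks in-flight workers
    await asyncio.gather(*(worker(sem, state) for _ in range(n)))
    return state["peak"]

peak = asyncio.run(run_batch())
print(peak)  # never exceeds max_concurrent
```

Even though all 20 tasks are created up front, the semaphore ensures no more than 5 ever execute the expensive section at once, which is exactly what protects the model backend at peak traffic.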
Key Takeaways
- Configure consumer groups properly to avoid duplicate consumption
- Add a rate-limiting mechanism so the model backend is never overloaded
- Use async processing to raise throughput
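On the first point, the post doesn't show the consumer-group settings it settled on. Below is a minimal sketch of kafka-python settings that pair with at-least-once processing and manual offset commits; every concrete value is an assumed example, not the author's production config.

```python
# Illustrative consumer-group settings (kafka-python); values are assumptions
consumer_config = {
    "group_id": "model-group",     # replicas in one group split partitions, avoiding duplicate consumption
    "enable_auto_commit": False,   # commit offsets only after model_predict succeeds
    "max_poll_records": 100,       # bound how many records one poll hands to the async pipeline
    "session_timeout_ms": 30_000,  # tolerate slow inference before a rebalance kicks in
}

# Usage against a real broker (not runnable here):
#   consumer = KafkaConsumer('model-input', **consumer_config,
#                            bootstrap_servers='localhost:9092')
#   ... process a batch ...
#   consumer.commit()  # mark the batch done only after successful write-back
```

Disabling auto-commit is what makes the "avoid duplicate consumption" claim honest: offsets advance only after processing succeeds, so a crash replays at most the uncommitted batch rather than silently skipping or double-counting messages.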
