Hey there, Python devs!
Let’s explore a practical approach to giving users control over stopping AI-generated responses mid-stream.
The Scenario
Imagine you’re building a FastAPI application that uses OpenAI’s API. You’ve got streaming responses working smoothly, but there’s one thing missing: the ability for users to stop the stream mid-generation.
The Challenge
Stopping a stream isn’t as straightforward as you might think. OpenAI’s API keeps pumping out tokens, and you need a clean way to interrupt that flow without breaking your entire application.
The Solution
Here’s a killer implementation that’ll make your users happy:
```python
from openai import AsyncOpenAI


class StreamController:
    def __init__(self):
        self.stop_generation = False

    def request_stop(self):
        self.stop_generation = True


class AIResponseGenerator:
    def __init__(self, client: AsyncOpenAI):
        self.client = client
        self.stream_controller = StreamController()

    async def generate_streaming_response(self, prompt: str):
        # Reset the stop flag so each generation starts fresh
        self.stream_controller.stop_generation = False
        try:
            stream = await self.client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": prompt}],
                stream=True,
            )
            full_response = ""
            # AsyncOpenAI returns an async iterator, so use `async for`
            async for chunk in stream:
                # Check whether a stop was requested between chunks
                if self.stream_controller.stop_generation:
                    break
                content = chunk.choices[0].delta.content
                if content:
                    full_response += content
                    yield content
        except Exception as e:
            print(f"Stream generation error: {e}")

    def stop_stream(self):
        # Trigger the stop mechanism
        self.stream_controller.request_stop()
```
Let’s unpack what’s happening here:
- StreamController: This is our traffic cop. It manages a simple boolean flag that controls whether generation continues.
- AIResponseGenerator: The main class that handles AI response streaming.
  - Uses AsyncOpenAI for non-blocking API calls
  - Implements an async generator that can be stopped mid-stream
  - Provides a stop_stream() method to interrupt generation
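To see the pieces working together, here’s a minimal sketch of a FastAPI WebSocket endpoint built on the classes above. Note the /ws/chat route and the "stop" message protocol are illustrative choices, not something from the original post: the client sends a prompt, receives tokens as they’re generated, and can send "stop" at any time to interrupt.

```python
# Minimal wiring sketch (assumes StreamController and AIResponseGenerator
# from above are in scope). The /ws/chat route and the "stop" message
# are illustrative assumptions, not a fixed API.
import asyncio

from fastapi import FastAPI, WebSocket
from openai import AsyncOpenAI

app = FastAPI()
generator = AIResponseGenerator(AsyncOpenAI())


@app.websocket("/ws/chat")
async def chat(websocket: WebSocket):
    await websocket.accept()
    prompt = await websocket.receive_text()

    async def watch_for_stop():
        # Any "stop" message from the client flips the shared flag
        while True:
            if await websocket.receive_text() == "stop":
                generator.stop_stream()
                break

    watcher = asyncio.create_task(watch_for_stop())
    try:
        async for token in generator.generate_streaming_response(prompt):
            await websocket.send_text(token)
    finally:
        watcher.cancel()
```

Because the stop signal is just a flag read between chunks, the generator exits cleanly at the next chunk boundary instead of tearing down the connection.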
Pro Tips
- Performance: This approach is memory-efficient and doesn’t block the event loop; the stop check is just a flag read between chunks.
- Error Handling: Includes basic error catching to prevent unexpected crashes.
- Flexibility: Easy to adapt to different streaming scenarios.
Potential Improvements
- Add timeout mechanisms (a sketch follows this list)
- Implement more granular error handling
- Create a more sophisticated stop mechanism for complex streams
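As a starting point for the first item, here’s one way a timeout could reuse the existing stop flag. This is a sketch under assumptions: the generate_with_timeout wrapper and the 30-second default budget are illustrative, not part of the original post.

```python
# Sketch: a watchdog timer that requests a cooperative stop.
# generate_with_timeout and the 30-second default are assumptions.
import asyncio


async def generate_with_timeout(generator: AIResponseGenerator,
                                prompt: str, budget: float = 30.0):
    # Schedule the stop on the event loop; since the flag is only
    # checked between chunks, generation ends at the first chunk
    # that arrives after the budget expires.
    timer = asyncio.get_running_loop().call_later(
        budget, generator.stop_stream
    )
    try:
        async for token in generator.generate_streaming_response(prompt):
            yield token
    finally:
        timer.cancel()  # avoid a stray stop on a later generation
```

Because request_stop() only flips a flag, the same hook works for any trigger: a timer, a user action, or a shutdown signal.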
See you next time!
Original article: Stopping the Stream: A Pythonic Guide to Controlling OpenAI Responses