LangChain integration example
This example demonstrates the streaming capability of LangChain with Anyscale Private Endpoints. The resulting chatbot emulates real-time conversation by processing input and generating responses incrementally, token by token.
LangChain provides a callbacks system that lets your app handle asynchronous events, such as streaming the new tokens that form a chatbot's response. The callbacks system lets the chatbot handle a continuous flow of data instead of waiting for the entire response to finish generating before outputting text.
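Conceptually, a streaming callback handler is just a class that overrides the token event. Here is a minimal sketch (not part of chatbot.py; the PrintingHandler name is only for illustration, and it prints tokens directly instead of queuing them as the tutorial below does):

from langchain.callbacks.base import BaseCallbackHandler

class PrintingHandler(BaseCallbackHandler):
    # Called once per generated token when streaming is enabled.
    def on_llm_new_token(self, token, **kwargs) -> None:
        print(token, end='', flush=True)

The tutorial uses a queue instead of printing directly so that tokens can cross from the model's worker thread back to the main thread.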
Step 0: Install dependencies
pip install openai==1.3.2
pip install "langchain>=0.0.341"
Step 1: Imports
Copy the rest of the following code into a Python script called chatbot.py.
from queue import Queue
from threading import Thread

from langchain.callbacks.base import BaseCallbackHandler
from langchain.chat_models import ChatAnyscale
from langchain.memory import ChatMessageHistory
Step 2: Create a streaming callback handler
Define a custom BaseCallbackHandler to handle streaming callbacks from LangChain. It pushes each new token onto a queue, then pushes an end marker when generation finishes.
class StreamingCBH(BaseCallbackHandler):
    def __init__(self, q):
        self.q = q

    def on_llm_new_token(self, token, *, run_id, parent_run_id=None, **kwargs) -> None:
        # When the language model emits a new token, put it on the queue.
        self.q.put(token)

    def on_llm_end(self, response, *, run_id, parent_run_id=None, **kwargs):
        # When the model finishes generating the response, put the end marker on the queue.
        self.q.put(INPUTMARKER_END)
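To build intuition for the handoff, here is a quick standalone check (not part of chatbot.py) that drives the handler by hand. It defines the same end marker that the script sets in Step 4:

from queue import Queue

INPUTMARKER_END = "-- END --"  # same sentinel chatbot.py defines in Step 4

q = Queue()
handler = StreamingCBH(q)
handler.on_llm_new_token("Hello", run_id=None)
handler.on_llm_end(None, run_id=None)

assert q.get() == "Hello"
assert q.get() == INPUTMARKER_END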
Step 3: Define the chat agent
Create a LangchainChatAgent class that manages the chat history, handles user input, and streams the chatbot's response. The model generates on a background thread and hands tokens to the main thread through a queue, so process_input can yield them as they arrive.
class LangchainChatAgent():
    def __init__(self, model: str = None):
        # Initialize the message history and the chat model for your Anyscale endpoint.
        self.message_history = ChatMessageHistory()
        self.model = model
        self.llm = ChatAnyscale(
            anyscale_api_base=ANYSCALE_BASE_URL,
            anyscale_api_key=ANYSCALE_API_KEY,
            temperature=0,
            model_name=self.model,
            streaming=True,
        )

    def process_input(self, user_message: str):
        # Add the user message to the history and prepare a queue for the model's response.
        self.message_history.add_user_message(user_message)
        q = Queue()

        # Generate the response on a background thread so that tokens
        # can be consumed from the queue as they arrive.
        thread = Thread(target=self.llm.predict_messages, kwargs={
            'messages': self.message_history.messages,
            'callbacks': [StreamingCBH(q)]
        })
        thread.start()

        ai_message = ''
        while True:
            # Pull tokens off the queue until the end marker arrives.
            token = q.get()
            if token == INPUTMARKER_END:
                break
            ai_message += token
            yield token

        # Add the complete AI-generated message to the history.
        self.message_history.add_ai_message(ai_message)
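Because process_input appends both sides of each exchange to message_history, the model sees the full conversation on every turn. Here is a quick illustration of that bookkeeping, using ChatMessageHistory on its own (no API calls; the messages are placeholders):

from langchain.memory import ChatMessageHistory

history = ChatMessageHistory()
history.add_user_message("Hi there")
history.add_ai_message("Hello! How can I help?")
history.add_user_message("Tell me a joke")

# predict_messages would receive all three messages, in order.
print([m.content for m in history.messages])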
Step 4: Set up the interaction loop
Set a marker that indicates the end of the language model's output. The token-reading loop in process_input uses it to know when to stop pulling from the queue.
INPUTMARKER_END = "-- END --"
For a quick demo, you can paste in your API key, but for development, follow best practices for setting your API base and key, for example by reading them from environment variables as sketched after this block.
ANYSCALE_BASE_URL = "ANYSCALE_ENDPOINT_BASE_URL"
ANYSCALE_API_KEY = "ANYSCALE_ENDPOINT_KEY"
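For example, a common pattern is to read both values from the environment instead of hard-coding them (a sketch; the environment variable names here are illustrative):

import os

ANYSCALE_BASE_URL = os.environ["ANYSCALE_BASE_URL"]
ANYSCALE_API_KEY = os.environ["ANYSCALE_API_KEY"]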
Create the agent for your deployed model and set up the interaction loop.
# Replace the model string with the model of your choice.
model_name = "meta-llama/Llama-2-70b-chat-hf"
agent = LangchainChatAgent(model_name)

print("Let's have a chat. Enter `quit` to exit.")
while True:
    user_input = input('> ')
    if user_input.lower() == 'quit':
        break
    for response_part in agent.process_input(user_input):
        # Flush each token immediately so the response appears as it streams.
        print(response_part, end='', flush=True)
    # End the streamed response with a newline before the next prompt.
    print()
Step 5: Run the chatbot
Run the script in a terminal with python chatbot.py to chat with your bot. The streaming callbacks let you see the response arrive token by token.