LangChain integration example
This example demonstrates the streaming capability of LangChain with Anyscale Private Endpoints. The resulting chatbot emulates real-time conversation by processing input and generating responses incrementally, token by token.
LangChain provides a callbacks system that allows your app to handle asynchronous events like streaming new tokens that form a chatbot's response. In this way, the callbacks system ensures that the chatbot can manage a continuous flow of data without waiting for the entire response to generate before outputting text.
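As a minimal sketch of that callbacks system (separate from the chatbot built below; the endpoint values, model name, and prompt here are placeholders), LangChain's built-in StreamingStdOutCallbackHandler prints each token to stdout as it arrives:
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chat_models import ChatAnyscale

# Placeholder endpoint values and model name; substitute your own.
llm = ChatAnyscale(
    anyscale_api_base="ANYSCALE_ENDPOINT_BASE_URL",
    anyscale_api_key="ANYSCALE_ENDPOINT_KEY",
    model_name="meta-llama/Llama-2-70b-chat-hf",
    streaming=True,
)

# Each generated token fires on_llm_new_token on the handler, which writes
# it to stdout immediately instead of waiting for the complete response.
llm.predict("Say hello in five words.", callbacks=[StreamingStdOutCallbackHandler()])
The custom handler in Step 2 below does the same thing, except it routes tokens into a queue instead of printing them directly.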
Step 0: Install dependencies
pip install openai==1.3.2
pip install "langchain>=0.0.341"
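To confirm the pinned versions installed correctly, you can print them; both packages expose a __version__ attribute:
python -c "import langchain, openai; print(langchain.__version__, openai.__version__)"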
Step 1: Imports
Copy the rest of the following code into a Python script called chatbot.py.
from queue import Queue
from threading import Thread

from langchain.callbacks.base import BaseCallbackHandler
from langchain.chat_models import ChatAnyscale
from langchain.memory import ChatMessageHistory
Step 2: Create a streaming callback handler
Define a custom BaseCallbackHandler to handle streaming callbacks from LangChain.
class StreamingCBH(BaseCallbackHandler):
    def __init__(self, q):
        self.q = q

    def on_llm_new_token(self, token, *, run_id, parent_run_id=None, **kwargs) -> None:
        # When the language model generates a new token, put it in the queue.
        self.q.put(token)

    def on_llm_end(self, response, *, run_id, parent_run_id, **kwargs):
        # When the language model finishes generating the response, put the
        # end marker in the queue.
        self.q.put(INPUTMARKER_END)
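To see how this handler decouples token production from consumption, here is a standalone illustration (not part of chatbot.py, and no model call; the tokens are made up) that drives StreamingCBH by hand and drains the queue:
# Standalone sketch: feed tokens into the handler manually and read them back.
from queue import Queue

INPUTMARKER_END = "-- END --"  # same marker defined in Step 4

q = Queue()
handler = StreamingCBH(q)
for tok in ["Stream", "ing", " works"]:  # made-up tokens
    handler.on_llm_new_token(tok, run_id=None)
handler.on_llm_end(None, run_id=None, parent_run_id=None)

collected = []
while (tok := q.get()) != INPUTMARKER_END:
    collected.append(tok)
print("".join(collected))  # prints: Streaming works
In the real chatbot, the producer is the model call running on a background thread, and the consumer is the generator in Step 3 that yields tokens to the caller.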
Step 3: Define the chat agent
Create a LangchainChatAgent class that manages the chat history, handles user input, and streams the chatbot's response.
class LangchainChatAgent:
    def __init__(self, model: str = None):
        # Initialize the message history and the language model with the
        # provided Anyscale API key.
        self.message_history = ChatMessageHistory()
        self.model = model
        self.llm = ChatAnyscale(
            anyscale_api_base=ANYSCALE_BASE_URL,
            anyscale_api_key=ANYSCALE_API_KEY,
            temperature=0,
            model_name=self.model,
            streaming=True,
        )

    def process_input(self, user_message: str):
        # Add the user message to the history and prepare a queue for the
        # model's response.
        self.message_history.add_user_message(user_message)
        q = Queue()

        # Run the model call on a separate thread so that tokens can be
        # consumed from the queue while the model is still generating.
        thread = Thread(target=self.llm.predict_messages, kwargs={
            'messages': self.message_history.messages,
            'callbacks': [StreamingCBH(q)]
        })
        thread.start()

        ai_message = ''
        while True:
            # Collect tokens from the queue until the end marker arrives.
            token = q.get()
            if token == INPUTMARKER_END:
                break
            ai_message += token
            yield token

        # Add the complete AI-generated message to the history.
        self.message_history.add_ai_message(ai_message)
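As a quick check of the agent on its own (the prompt is made up, and this assumes the ANYSCALE_* constants and INPUTMARKER_END from Step 4 are already defined), you can consume process_input as a generator and then inspect or reset the accumulated history:
agent = LangchainChatAgent("meta-llama/Llama-2-70b-chat-hf")
for token in agent.process_input("Summarize what a callback is in one sentence."):
    print(token, end='', flush=True)
print()

# The agent keeps the full conversation across turns; clear the stored
# messages to start a fresh chat (ChatMessageHistory provides clear()).
agent.message_history.clear()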
Step 4: Set up the interaction loop
Set a marker that indicates the end of output from the language model. This marker signals when to stop reading from the stream.
INPUTMARKER_END = "-- END --"
For a quick demo, you can paste in your API key, but for development, follow best practices for setting your API base and key.
ANYSCALE_BASE_URL = "ANYSCALE_ENDPOINT_BASE_URL"
ANYSCALE_API_KEY = "ANYSCALE_ENDPOINT_KEY"
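For example, one common pattern is to read the values from environment variables rather than hardcoding them; the variable names below are illustrative, not an Anyscale requirement:
import os

# Illustrative environment variable names; set them in your shell first, e.g.
#   export ANYSCALE_BASE_URL=...
#   export ANYSCALE_API_KEY=...
ANYSCALE_BASE_URL = os.environ["ANYSCALE_BASE_URL"]
ANYSCALE_API_KEY = os.environ["ANYSCALE_API_KEY"]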
Create the agent for your deployed model and set up the interaction loop.
# Replace the model string with the model of your choice.
model_name = "meta-llama/Llama-2-70b-chat-hf"
agent = LangchainChatAgent(model_name)
print("Let's have a chat. Enter `quit` to exit.")
while True:
    user_input = input('> ')
    if user_input.lower() == 'quit':
        break
    # Print each token as it arrives; flush so the text appears immediately.
    for response_part in agent.process_input(user_input):
        print(response_part, end='', flush=True)
    print()
Step 5: Run the chatbot
Run the script in a terminal with python chatbot.py to chat with your bot. The streaming callbacks let you see the response appear token by token.