
LangChain integration example

This example demonstrates the streaming capability of LangChain with Anyscale Private Endpoints. The resulting chatbot emulates real-time conversation by processing input and generating responses incrementally, token by token.

LangChain provides a callbacks system that allows your app to handle asynchronous events, like streaming the new tokens that form a chatbot's response. In this way, the callbacks system lets the chatbot manage a continuous flow of data without waiting for the entire response to finish generating before outputting text.
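For intuition, here is a minimal sketch of the pattern (assuming nothing beyond BaseCallbackHandler itself): a handler that overrides on_llm_new_token can react to each token the moment the model emits it. The queue-based handler this example actually uses follows in Step 2.

from langchain.callbacks.base import BaseCallbackHandler

class PrintTokensCBH(BaseCallbackHandler):
    # Illustrative handler only: print each streamed token as it arrives.
    def on_llm_new_token(self, token, **kwargs) -> None:
        print(token, end='', flush=True)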

Step 0: Install dependencies

pip install openai==1.3.2
pip install "langchain>=0.0.341"
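Optionally, verify the installed versions before continuing:

python -c "import langchain, openai; print(langchain.__version__, openai.__version__)"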

Step 1: Imports

Copy the rest of the following code into a Python script called chatbot.py.

from langchain.callbacks.base import BaseCallbackHandler
from langchain.chat_models import ChatAnyscale
from langchain.memory import ChatMessageHistory

from queue import Queue
from threading import Thread

Step 2: Create a streaming callback handler

Define a custom BaseCallbackHandler to handle streaming callbacks from LangChain.

class StreamingCBH(BaseCallbackHandler):
    def __init__(self, q):
        self.q = q

    def on_llm_new_token(self, token, *, run_id, parent_run_id=None, **kwargs) -> None:
        # When the language model generates a new token, put it in the queue.
        self.q.put(token)

    def on_llm_end(self, response, *, run_id, parent_run_id, **kwargs):
        # When the language model finishes generating the response, put the end marker in the queue.
        self.q.put(INPUTMARKER_END)
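The handler hands tokens off through a Queue because the model call runs on a separate thread in Step 3, while the main thread reads tokens as they arrive. The following standalone sketch shows the same producer-consumer handoff, with a hypothetical fake_stream function standing in for the model:

from queue import Queue
from threading import Thread

END_MARKER = "-- END --"

def fake_stream(q: Queue):
    # Hypothetical producer standing in for the LLM thread:
    # emit a few tokens, then the end marker.
    for token in ["Streaming", " works", " token", " by", " token."]:
        q.put(token)
    q.put(END_MARKER)

q = Queue()
Thread(target=fake_stream, args=(q,)).start()
while True:
    token = q.get()  # blocks until the producer puts the next token
    if token == END_MARKER:
        break
    print(token, end='', flush=True)
print()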

Step 3: Define the chat agent

Create a LangchainChatAgent class that manages the chat history, handles the user input, and streams the chatbot's response.

class LangchainChatAgent():
    def __init__(self, model: str = None):
        # Initialize the message history and the language model with the provided Anyscale API key.
        self.message_history = ChatMessageHistory()
        self.model = model
        self.llm = ChatAnyscale(
            anyscale_api_base=ANYSCALE_BASE_URL,
            anyscale_api_key=ANYSCALE_API_KEY,
            temperature=0,
            model_name=self.model,
            streaming=True,
        )

    def process_input(self, user_message: str):
        # Add the user message to the history and prepare a queue for the model's response.
        self.message_history.add_user_message(user_message)
        q = Queue()

        # Start a new thread to predict messages from the language model.
        thread = Thread(target=self.llm.predict_messages, kwargs={
            'messages': self.message_history.messages,
            'callbacks': [StreamingCBH(q)]
        })
        thread.start()

        ai_message = ''
        while True:
            # Collect tokens from the queue until the end marker is received.
            token = q.get()
            if token == INPUTMARKER_END:
                break
            ai_message += token
            yield token

        # Add the complete AI-generated message to the history.
        self.message_history.add_ai_message(ai_message)

Step 4: Set up the interaction loop

Set a marker that indicates the end of input from the language model. This marker helps in determining when to stop reading from the stream.

INPUTMARKER_END = "-- END --"

For a quick demo, you can paste in your API key, but for development, follow best practices for setting your API base and key.

ANYSCALE_BASE_URL = "ANYSCALE_ENDPOINT_BASE_URL"
ANYSCALE_API_KEY = "ANYSCALE_ENDPOINT_KEY"
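For example, one common approach is to read the values from environment variables instead of hard-coding them. The variable names below are an assumption; use whatever names your setup exports:

import os

# Assumes you export these in your shell, for example:
#   export ANYSCALE_API_BASE="<your endpoint base URL>"
#   export ANYSCALE_API_KEY="<your endpoint key>"
ANYSCALE_BASE_URL = os.environ["ANYSCALE_API_BASE"]
ANYSCALE_API_KEY = os.environ["ANYSCALE_API_KEY"]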

Create the agent for your deployed model and set the interaction loop.

# Replace the model string with the model of your choice.
model_name = "meta-llama/Llama-2-70b-chat-hf"
agent = LangchainChatAgent(model_name)

print("Let's have a chat. Enter `quit` to exit.")
while True:
    user_input = input('> ')
    if user_input.lower() == 'quit':
        break
    for response_part in agent.process_input(user_input):
        print(response_part, end='', flush=True)
    # Print a newline after the streamed response so the next prompt starts on a fresh line.
    print()

Step 5: Run the chatbot

Run the script in a terminal to chat with your bot; the streaming callbacks let you see the response appear token by token:
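python chatbot.py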