Skip to main content
Version: 1.0.0

LangChain integration example

Check your docs version

Anyscale is rolling out a new design. If you have preview access to the enhanced experience, use the latest version of the docs and see the migration guide for transitioning.

This example demonstrates the streaming capability of LangChain with Anyscale Private Endpoints. The resulting chatbot emulates real-time conversation by processing input and generating responses incrementally, token by token.

LangChain provides a callbacks system that allows your app to handle asynchronous events like streaming new tokens that form a chatbot's response. In this way, the callbacks system ensures that the chatbot can manage a continuous flow of data without waiting for the entire response to generate before outputting text.

Step 0: Install dependencies

pip install openai==1.3.2
pip install langchain>=0.0.341

Step 1: Imports

Copy the rest of the following code into a Python script called chatbot.py.

from langchain.callbacks.base import BaseCallbackHandler
from langchain.chat_models import ChatAnyscale
from langchain.memory import ChatMessageHistory

from queue import Queue
from threading import Thread

Step 2: Create a streaming callback handler

Define a custom BaseCallbackHandler to handle streaming callbacks from LangChain.

class StreamingCBH(BaseCallbackHandler):
def __init__(self, q):
self.q = q

def on_llm_new_token(self, token, *, run_id, parent_run_id=None, **kwargs) -> None:
# When a new token is generated by the language model, put it in the queue.
self.q.put(token)

def on_llm_end(self, response, *, run_id, parent_run_id, **kwargs):
# When the language model finishes generating the response, put the end marker in the queue.
self.q.put(INPUTMARKER_END)

Step 3: Define the chat agent

Create a LangchainChatAgent class that manages the chat history, handles the user input, and streams the chatbot's response.

class LangchainChatAgent():
def __init__(self, model: str = None):
# Initialize the message history and the language model with the provided Anyscale API key.
self.message_history = ChatMessageHistory()
self.model = model
self.llm = ChatAnyscale(anyscale_api_base=ANYSCALE_BASE_URL, anyscale_api_key=ANYSCALE_API_KEY, temperature=0, model_name=self.model, streaming=True)

def process_input(self, user_message: str):
# Add the user message to the history and prepare a queue for the model's response.
self.message_history.add_user_message(user_message)
q = Queue()

# Start a new thread to predict messages from the language model.
thread = Thread(target=self.llm.predict_messages, kwargs={
'messages': self.message_history.messages,
'callbacks': [StreamingCBH(q)]
})
thread.start()
ai_message = ''
while True:
# Collect tokens from the queue until the end marker is received.
token = q.get()
if token == INPUTMARKER_END:
break
ai_message += token
yield token

# Add the complete AI-generated message to the history.
self.message_history.add_ai_message(ai_message)

Step 4: Set up the interaction loop

Set a marker that indicates the end of input from the language model. This marker helps in determining when to stop reading from the stream.

INPUTMARKER_END = "-- END --"

For a quick demo, you can paste in your API key, but for development, follow best practices for setting your API base and key.

ANYSCALE_BASE_URL = "ANYSCALE_ENDPOINT_BASE_URL"
ANYSCALE_API_KEY = "ANYSCALE_ENDPOINT_KEY"

Create the agent for your deployed model and set the interaction loop.

# Replace the model string with the model of your choice.
model_name = "meta-llama/Llama-2-70b-chat-hf"
agent = LangchainChatAgent(model_name)

print("Let's have a chat. Enter `quit` to exit.")
while True:
user_input = input('> ')
if user_input.lower() == 'quit':
break
for response_part in agent.process_input(user_input):
print(response_part, end='')

Step 5: Run the chatbot

Run the script in a terminal with python chatbot.py to have a chat with your bot. The streaming callbacks allow you see the response token by token.