Creating an AI Agent-Based System with LangGraph: Adding Persistence and Streaming (Step by Step Guide)

In our previous tutorial, we built an AI agent that could answer questions by searching the web. But when building agents for longer-running tasks, two critical concepts come into play: persistence and streaming. Persistence lets you save an agent’s state at any point so you can resume from that state in future interactions. This is crucial for long-running applications. Streaming, on the other hand, lets you emit real-time signals about what the agent is doing at any moment, providing transparency and control over its actions. In this tutorial, we enhance our agent by adding these two features.

Setting up the agent

Let’s start by recreating our agent. We load the necessary environment variables, install and import the required libraries, set up the Tavily search tool, define the agent state, and finally build the agent.

pip install langgraph==0.2.53 langgraph-checkpoint==2.0.6 langgraph-sdk==0.1.36 langchain-groq langchain-community langgraph-checkpoint-sqlite==2.0.1
import os
os.environ['TAVILY_API_KEY'] = ""
os.environ['GROQ_API_KEY'] = ""

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_groq import ChatGroq
from langchain_community.tools.tavily_search import TavilySearchResults

tool = TavilySearchResults(max_results=2)

class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]

class Agent:
    def __init__(self, model, tools, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile()
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}
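
The `Annotated[list[AnyMessage], operator.add]` annotation in AgentState is what makes each node’s returned messages get appended to the history rather than replacing it. The sketch below imitates that merging rule with the standard library only. `DemoState` and `apply_update` are hypothetical names used for illustration, and this is a simplified model of the idea, not LangGraph’s actual implementation.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class DemoState(TypedDict):
    # Same shape as AgentState, with plain strings standing in for messages.
    messages: Annotated[list[str], operator.add]


def apply_update(state: dict, update: dict) -> dict:
    """Merge a node's partial update into the state, key by key."""
    hints = get_type_hints(DemoState, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        # Annotated[...] keeps its extra arguments in __metadata__.
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata and key in state:
            reducer = metadata[0]
            merged[key] = reducer(state[key], value)  # list + list
        else:
            merged[key] = value  # no reducer declared: overwrite
    return merged


state = {"messages": ["human: What is the weather in Texas?"]}
state = apply_update(state, {"messages": ["ai: (calls the search tool)"]})
state = apply_update(state, {"messages": ["tool: sunny, 19.4 C"]})
print(state["messages"])
```

Because the reducer is list concatenation, every node only needs to return the new messages it produced, which is exactly what `call_openai` and `take_action` do.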

Adding Persistence

To add persistence, we use LangGraph’s checkpointer feature. A checkpointer saves the agent’s state after and between every node. For this tutorial we use SqliteSaver, a simple checkpointer that relies on SQLite, a built-in database. We keep things simple with a local SQLite database here, but you can just as easily connect to an external database or use other checkpointers such as Redis or Postgres for more robust persistence.

from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3
sqlite_conn = sqlite3.connect("checkpoints.sqlite",check_same_thread=False)
memory = SqliteSaver(sqlite_conn)
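
To make the idea of a checkpointer more concrete, here is a minimal sketch of what “saving state after every step” could look like with plain `sqlite3`. `TinyCheckpointStore`, its table layout, and its method names are assumptions made up for this example. SqliteSaver’s real schema and API are different and handle much more (serialization, metadata, pending writes).

```python
import json
import sqlite3


class TinyCheckpointStore:
    """Hypothetical store: one JSON snapshot per (thread_id, step)."""

    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn
        conn.execute(
            "CREATE TABLE IF NOT EXISTS checkpoints ("
            "thread_id TEXT, step INTEGER, state TEXT, "
            "PRIMARY KEY (thread_id, step))"
        )

    def put(self, thread_id: str, state: dict) -> int:
        # The next step number is one past the highest stored so far.
        row = self.conn.execute(
            "SELECT COALESCE(MAX(step), -1) FROM checkpoints WHERE thread_id = ?",
            (thread_id,),
        ).fetchone()
        step = row[0] + 1
        self.conn.execute(
            "INSERT INTO checkpoints VALUES (?, ?, ?)",
            (thread_id, step, json.dumps(state)),
        )
        self.conn.commit()
        return step

    def get_latest(self, thread_id: str):
        row = self.conn.execute(
            "SELECT state FROM checkpoints WHERE thread_id = ? "
            "ORDER BY step DESC LIMIT 1",
            (thread_id,),
        ).fetchone()
        return json.loads(row[0]) if row else None


store = TinyCheckpointStore(sqlite3.connect(":memory:"))
store.put("1", {"messages": ["human: hi"]})
store.put("1", {"messages": ["human: hi", "ai: hello"]})
latest = store.get_latest("1")
print(latest)
```

Resuming a conversation then amounts to loading the latest snapshot for a thread and continuing from there.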

Next, we modify our agent to accept a checkpointer:

class Agent:
    def __init__(self, model, tools, checkpointer, system=""):
        # Everything else remains the same as before
        self.graph = graph.compile(checkpointer=checkpointer)
    # Everything else after this remains the same

Now we can create our agent with persistence enabled:

prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow-up question, you are allowed to do that!
"""
model = ChatGroq(model="Llama-3.3-70b-Specdec")
bot = Agent(model, [tool], system=prompt, checkpointer=memory)

Adding streaming

Streaming is essential for real-time updates. There are two types of streaming we will focus on:

1. Streaming messages: To send out intermediate messages such as AI decisions and tool results.

2. Streaming tokens: Streaming individual tokens from the LLM’s response.

Let’s start with streaming messages. We create a human message and use the stream method to observe the agent’s actions in real time.

messages = [HumanMessage(content="What is the weather in Texas?")]
thread = {"configurable": {"thread_id": "1"}}
for event in bot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])

Final Output: The current weather in Texas is sunny with a temperature of 19.4°C (66.9°F) and a wind speed of 4.3 mph (6.8 km/h)…

When you run this, you see a stream of results: first an AI message instructing the agent to call Tavily, then a tool message with the search results, and finally an AI message answering the question.
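
The shape of those streamed events (one dictionary per executed node, keyed by node name) can be imitated with an ordinary Python generator. The following is a toy sketch under stated assumptions: `fake_llm`, `fake_action`, `route`, and `stream_updates` are hypothetical stand-ins, strings replace real message objects, and no model or search tool is called.

```python
def fake_llm(state):
    # Answer once a tool result is available; otherwise request a search.
    if any(m.startswith("tool:") for m in state["messages"]):
        return {"messages": ["ai: It is sunny in Texas."]}
    return {"messages": ["ai: tool_call search('weather in Texas')"]}


def fake_action(state):
    return {"messages": ["tool: sunny, 19.4 C"]}


def route(node, state):
    # Mirrors the graph edges: action -> llm, llm -> action or stop.
    if node == "action":
        return "llm"
    return "action" if "tool_call" in state["messages"][-1] else None


def stream_updates(nodes, state, start="llm"):
    current = start
    while current is not None:
        update = nodes[current](state)
        state["messages"] = state["messages"] + update["messages"]
        yield {current: update}  # emitted as soon as the node finishes
        current = route(current, state)


nodes = {"llm": fake_llm, "action": fake_action}
initial = {"messages": ["human: What is the weather in Texas?"]}
events = list(stream_updates(nodes, initial))
for event in events:
    for node_name, update in event.items():
        print(node_name, update["messages"])
```

The consumer sees each node’s output as it is produced, which is why the loop over `event.values()` prints the tool request, the tool result, and the final answer in that order.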

Understanding thread IDs

The thread_id is a key part of the thread configuration. It allows the agent to maintain separate conversations with different users or contexts. By assigning a unique thread_id to each conversation, the agent can keep track of multiple interactions simultaneously without mixing them up.

For example, let’s continue the conversation by asking “What about in LA?” using the same thread_id:

messages = [HumanMessage(content="What about in LA?")]
thread = {"configurable": {"thread_id": "1"}}
for event in bot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

Final Output: The current weather in Los Angeles is sunny with a temperature of 17.2°C (63.0°F) and a wind speed of 2.2 mph (3.6 km/h)…

The agent infers that we are asking about the weather, thanks to persistence. Let’s ask, “Which one is warmer?”:

messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "1"}}
for event in bot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

Final Output: Texas is warmer than Los Angeles. The current temperature in Texas is 19.4°C (66.9°F), while the current temperature in Los Angeles is 17.2°C (63.0°F).

The agent correctly compares the weather in Texas and LA. To check that persistence keeps conversations separate, let’s ask the same question with a different thread_id:

messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "2"}}
for event in bot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

Output: I need more information to answer that question. Could you please provide more context or specify which two things you are comparing?

This time the agent gets confused because it has no access to the previous conversation history.
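
The isolation between threads can be pictured as a history keyed by thread_id. The sketch below is a deliberately simplified illustration: `ThreadedMemory` and its `run` method are hypothetical, and only the `{"configurable": {"thread_id": ...}}` config shape is taken from the code above.

```python
from collections import defaultdict


class ThreadedMemory:
    """Hypothetical in-memory history, one message list per thread_id."""

    def __init__(self):
        self._threads = defaultdict(list)

    def run(self, message: str, config: dict) -> list:
        thread_id = config["configurable"]["thread_id"]
        history = self._threads[thread_id]
        history.append(message)
        # Return a copy of everything this thread has seen so far.
        return list(history)


memory = ThreadedMemory()
thread_1 = {"configurable": {"thread_id": "1"}}
thread_2 = {"configurable": {"thread_id": "2"}}

memory.run("What is the weather in Texas?", thread_1)
memory.run("What about in LA?", thread_1)
seen_by_1 = memory.run("Which one is warmer?", thread_1)
seen_by_2 = memory.run("Which one is warmer?", thread_2)

print(len(seen_by_1), len(seen_by_2))
```

Thread "1" sees all three questions, so a model given that history can resolve “which one”. Thread "2" sees only the last question, which matches the confused reply above.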

Streaming tokens

To stream tokens, we use the astream_events method, which is asynchronous. We also switch to an async checkpointer.

from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

async with AsyncSqliteSaver.from_conn_string(":memory:") as checkpointer:
    abot = Agent(model, [tool], system=prompt, checkpointer=checkpointer)
    messages = [HumanMessage(content="What is the weather in SF?")]
    thread = {"configurable": {"thread_id": "4"}}
    async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                # Empty content in the context of OpenAI means
                # that the model is asking for a tool to be invoked.
                # So we only print non-empty content
                print(content, end="|")

This streams tokens in real time, giving you a live view of the agent’s thought process.
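
If you want to experiment with the filtering logic without calling a model, the loop can be exercised against a fake event stream. This is a sketch under assumptions: `fake_event_stream` and `collect_tokens` are hypothetical helpers, and each chunk is a plain string here, whereas in the real events the chunk is an object whose `.content` attribute holds the text.

```python
import asyncio


async def fake_event_stream(text: str):
    """Yield events shaped loosely like the ones handled above."""
    yield {"event": "on_chat_model_start", "data": {}}
    # An empty chunk, like the ones seen when a tool call is requested.
    yield {"event": "on_chat_model_stream", "data": {"chunk": ""}}
    for token in text.split():
        await asyncio.sleep(0)  # hand control back to the event loop
        yield {"event": "on_chat_model_stream", "data": {"chunk": token}}
    yield {"event": "on_chat_model_end", "data": {}}


async def collect_tokens(events) -> list:
    tokens = []
    async for event in events:
        if event["event"] != "on_chat_model_stream":
            continue  # ignore start/end and any other event kinds
        content = event["data"]["chunk"]
        if content:  # skip empty chunks
            tokens.append(content)
            print(content, end="|")
    return tokens


tokens = asyncio.run(collect_tokens(fake_event_stream("It is sunny in SF")))
```

Outside a notebook, the `async with` block shown earlier also needs to live inside an `async def` that is started with `asyncio.run`, as done here.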

Conclusion

By adding persistence and streaming, we have significantly improved the capabilities of our AI agent. Persistence allows the agent to maintain context across interactions, while streaming provides real-time insight into its actions. These features are important for building production-ready applications, especially those involving multiple users or human-in-the-loop interactions.

In the next tutorial, we will dive into human-in-the-loop interactions, where persistence plays a crucial role in enabling seamless collaboration between people and AI agents. Stay tuned!

References:

  1. DeepLearning.AI: https://learn.deeplearning.ai/courses/ai-Agents-in-langgraph

Vineet Kumar is a consulting intern at MarkTechPost. He is currently pursuing his BS at the Indian Institute of Technology (IIT), Kanpur. He is a machine learning enthusiast, passionate about research and the latest advances in deep learning, computer vision, and related areas.
