Vai al contenuto principale
OpenData AI — progetto sperimentale

← Documentazione

LangGraph + langchain-mcp-adapters

LangGraph è la libreria di LangChain per orchestrare agenti come grafi di stato. Con langchain-mcp-adapters i tool MCP vengono caricati a runtime e diventano nodi ToolNode utilizzabili dal grafo.

Setup

pip install langgraph langchain-anthropic langchain-mcp-adapters

# I server MCP devono essere in ascolto (streamable-http):
#   TRANSPORT=streamable-http PORT=8080 ckan-mcp
#   TRANSPORT=streamable-http PORT=8081 istat-mcp
#   TRANSPORT=streamable-http PORT=8082 osm-mcp

Grafo minimale (singolo MCP)

MultiServerMCPClient carica i tool da uno o più server MCP; il grafo è il classico ReAct loop di LangGraph (create_react_agent).

import asyncio
from langchain_anthropic import ChatAnthropic
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent

async def main() -> None:
    mcp = MultiServerMCPClient(
        {
            "ckan": {
                "transport": "streamable_http",
                "url": "http://localhost:8080/mcp",
            },
        }
    )
    tools = await mcp.get_tools()

    agent = create_react_agent(
        ChatAnthropic(model="claude-haiku-4-5"),
        tools,
        prompt=(
            "Sei un assistente per i portali CKAN italiani. "
            "Usa il tool package_search per cercare dataset su dati.gov.it."
        ),
    )
    out = await agent.ainvoke({
        "messages": [("user", "Dataset sulla qualità dell'aria a Milano")]
    })
    print(out["messages"][-1].content)

asyncio.run(main())

Multi-MCP (CKAN + ISTAT + OSM)

import asyncio
from langchain_anthropic import ChatAnthropic
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent

CONFIG = {
    "ckan":  {"transport": "streamable_http", "url": "http://localhost:8080/mcp"},
    "istat": {"transport": "streamable_http", "url": "http://localhost:8081/mcp"},
    "osm":   {"transport": "streamable_http", "url": "http://localhost:8082/mcp"},
}

PROMPT = (
    "Sei un agente per open data italiani ed europei.\n"
    "- usa il prefisso ckan_* per i portali CKAN (default dati.gov.it),\n"
    "- usa il prefisso istat_* per SDMX 2.1 (ISTAT, Eurostat e OCSE),\n"
    "- usa il prefisso osm_* per geocoding/POI/routing.\n"
    "Cita sempre la fonte di ogni risorsa."
)

async def main() -> None:
    mcp = MultiServerMCPClient(CONFIG)
    tools = await mcp.get_tools()

    agent = create_react_agent(
        ChatAnthropic(model="claude-sonnet-4-6"),
        tools,
        prompt=PROMPT,
    )
    out = await agent.ainvoke({
        "messages": [(
            "user",
            "Confronta la spesa sanitaria pro capite Italia vs Germania "
            "negli ultimi 5 anni e cita i dataflow Eurostat usati.",
        )],
    })
    for m in out["messages"]:
        print(m.type, ":", m.content[:300])

asyncio.run(main())

Streaming degli step

Per mostrare gli step intermedi (chiamate ai tool) usa astream_events:

async for event in agent.astream_events(
    {"messages": [("user", "qualità aria a Milano")]},
    version="v2",
):
    kind = event["event"]
    if kind == "on_tool_start":
        print(f"  ↳ tool {event['name']} input={event['data'].get('input')}")
    elif kind == "on_chat_model_stream":
        chunk = event["data"]["chunk"]
        if chunk.content:
            print(chunk.content, end="", flush=True)

Grafo personalizzato (oltre il ReAct)

Per controllare il routing tra fonti puoi costruire il grafo a mano. Esempio: un nodo “classifier” che decide CKAN vs ISTAT, e poi due rami paralleli.

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode

class State(TypedDict):
    messages: Annotated[list, add_messages]
    next: str  # "ckan" | "istat" | "synth"

# ...definisci classifier(), synth() come funzioni che leggono state["messages"]
# e producono una nuova lista di messaggi.

graph = StateGraph(State)
graph.add_node("classifier", classifier)
graph.add_node("ckan_tools", ToolNode(ckan_tools))
graph.add_node("istat_tools", ToolNode(istat_tools))
graph.add_node("synth", synth)

graph.set_entry_point("classifier")
graph.add_conditional_edges(
    "classifier",
    lambda s: s["next"],
    {"ckan": "ckan_tools", "istat": "istat_tools"},
)
graph.add_edge("ckan_tools", "synth")
graph.add_edge("istat_tools", "synth")
graph.add_edge("synth", END)

app = graph.compile()

Note operative

  • Tool naming: i tool ricevono il prefisso del server nel grafo (es. ckan_package_search). Sfrutta il prefisso nel prompt per guidare l'agente.
  • Persistenza: per uno stato a lunga vita aggiungi un checkpointer (es. SQLite o Postgres) al compile() del grafo.
  • Provider: ChatAnthropic è un esempio. Funziona con ChatOpenAI, ChatOllama e qualunque chat-model compatibile LangChain con tool-calling.