LangChain Tools Explained: How LLMs Take Real-World Actions
LLMs are staggeringly good at reasoning, but they're frozen in time and isolated by default. GPT-4 doesn't know what the stock market did this morning. It can't send an email on your behalf or query your Postgres database. For a production AI application, that's a dealbreaker — users don't want a chatbot that apologizes for not knowing current information, they want one that goes and finds it.
LangChain Tools solve this by giving an LLM a structured vocabulary of actions it's allowed to take. When the model decides it needs external information or needs to perform a side effect, it emits a structured call to a named tool with specific arguments. LangChain intercepts that call, executes the real function, and feeds the result back into the model's context. The model never 'reaches out' itself — it's always the orchestration layer doing the actual work, which is critical for security and control.
By the end of this article you'll understand how Tools are represented internally, how the LLM decides which tool to call and when, how to write production-grade custom tools with proper error handling and validation, how to execute tools in agent graphs with LangGraph's ToolNode, and the gotchas that routinely burn teams in production.
How LangChain Tools Actually Work Under the Hood
A LangChain Tool is not magic — it's a Python callable wrapped in a metadata contract. That contract has three core fields: a name (a short snake_case identifier the LLM uses to invoke it), a description (the natural-language prompt that tells the LLM WHEN and WHY to use this tool), and an args_schema (a Pydantic model that constrains the arguments; if you don't supply one, LangChain infers it from the function signature). That's the entire surface area. Everything else is implementation.
When you bind tools to a chat model using .bind_tools(), LangChain serializes those Pydantic schemas into JSON Schema and injects them into the system prompt or into the model's tools parameter (depending on the provider). The LLM sees a list of callable 'functions' in its context window. When it decides to use one, it returns an AIMessage with a tool_calls attribute — a list of structured dicts containing the tool name and arguments. Crucially, the LLM does NOT execute anything. It just declares intent.
A tool-executing component — LangGraph's prebuilt ToolNode in current code, or the legacy ToolExecutor — picks up those tool_calls, routes each one to the matching Python function, runs it, wraps the result in a ToolMessage, and appends it back to the conversation history. The model then reads that ToolMessage and continues reasoning. This request-execute-observe loop is the entire foundation of ReAct-style agents.
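The dispatch step above can be sketched without any framework at all. The dicts below mirror the shape of LangChain's tool_calls and ToolMessage objects, but the names here (TOOL_REGISTRY, execute_tool_calls, get_time) are illustrative stand-ins, not LangChain APIs:

```python
def get_time(timezone: str) -> str:
    # Stand-in for a real tool implementation
    return f"12:00 in {timezone}"

# Tools are looked up by name — exactly how an executor routes calls
TOOL_REGISTRY = {"get_time": get_time}

def execute_tool_calls(tool_calls: list[dict]) -> list[dict]:
    """Route each declared call to its function and wrap results as messages."""
    results = []
    for call in tool_calls:
        func = TOOL_REGISTRY[call["name"]]
        output = func(**call["args"])
        # The id ties the result back to the specific request,
        # analogous to ToolMessage.tool_call_id in LangChain
        results.append({"role": "tool", "tool_call_id": call["id"], "content": output})
    return results

# The model only ever emits a declaration of intent like this:
declared = [{"name": "get_time", "args": {"timezone": "UTC"}, "id": "call_1"}]
print(execute_tool_calls(declared))
```

The point of the sketch is the separation of concerns: the model produces the `declared` list, and everything after that happens in your code, where you can log, gate, or refuse calls.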
The description field is more important than most developers realize. It IS the tool's API documentation for the LLM. A vague description causes the model to call the wrong tool, call it with wrong arguments, or hallucinate that a tool exists. Treat descriptions like you'd treat a well-written docstring that a new engineer has to act on without asking questions.
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
import json

# --- Step 1: Define a strict input schema with Pydantic ---
# This schema becomes JSON Schema that gets sent to the LLM.
# Field descriptions are part of what the LLM reads to understand how to call this.
class StockLookupInput(BaseModel):
    ticker: str = Field(
        description="The stock ticker symbol in uppercase, e.g. 'AAPL' or 'MSFT'"
    )
    metric: str = Field(
        description="The metric to retrieve. One of: 'price', 'pe_ratio', 'market_cap'"
    )

# --- Step 2: Decorate with @tool and provide a rich description ---
# The description is the LLM's only guide for WHEN to use this.
# Be specific: tell it what it returns and when NOT to use it.
@tool(args_schema=StockLookupInput)
def get_stock_metric(ticker: str, metric: str) -> str:
    """
    Retrieve a real-time financial metric for a publicly traded stock.
    Use this when the user asks about current stock prices, P/E ratios,
    or market capitalisation. Do NOT use this for historical data or crypto.
    Returns a formatted string with the value and currency where applicable.
    """
    # Simulated data store — in production this would call a financial API
    mock_data = {
        "AAPL": {"price": "$189.43", "pe_ratio": "31.2", "market_cap": "$2.94T"},
        "MSFT": {"price": "$415.61", "pe_ratio": "36.8", "market_cap": "$3.08T"},
    }
    ticker = ticker.upper()
    if ticker not in mock_data:
        # Return a clear error string — never raise inside a tool unless you want
        # the agent to crash. Return errors as strings so the LLM can reason about them.
        return f"Ticker '{ticker}' not found. Available tickers: AAPL, MSFT"
    if metric not in mock_data[ticker]:
        return f"Metric '{metric}' is invalid. Valid options: price, pe_ratio, market_cap"
    return f"{ticker} {metric}: {mock_data[ticker][metric]}"

# --- Step 3: Inspect what the LLM actually sees ---
# This is the JSON Schema injected into the model's context.
print("=== Tool Name ===")
print(get_stock_metric.name)  # 'get_stock_metric'
print("\n=== Tool Description (what LLM reads) ===")
print(get_stock_metric.description)
print("\n=== Args Schema (sent as JSON Schema to the model) ===")
print(json.dumps(get_stock_metric.args_schema.model_json_schema(), indent=2))

# --- Step 4: Bind tool to model and observe the raw tool_call output ---
llm = ChatOpenAI(model="gpt-4o", temperature=0)
llm_with_tools = llm.bind_tools([get_stock_metric])
response = llm_with_tools.invoke("What's Apple's current P/E ratio?")
print("\n=== AIMessage tool_calls (raw LLM output — no execution yet) ===")
for tc in response.tool_calls:
    print(json.dumps(tc, indent=2))

# --- Step 5: Manually execute the tool call ---
# In production an agent loop or LangGraph node does this automatically.
tool_call = response.tool_calls[0]
result = get_stock_metric.invoke(tool_call["args"])
print("\n=== Tool Execution Result ===")
print(result)
=== Tool Name ===
get_stock_metric
=== Tool Description (what LLM reads) ===
Retrieve a real-time financial metric for a publicly traded stock.
Use this when the user asks about current stock prices, P/E ratios,
or market capitalisation. Do NOT use this for historical data or crypto.
Returns a formatted string with the value and currency where applicable.
=== Args Schema (sent as JSON Schema to the model) ===
{
"properties": {
"ticker": {
"description": "The stock ticker symbol in uppercase, e.g. 'AAPL' or 'MSFT'",
"title": "Ticker",
"type": "string"
},
"metric": {
"description": "The metric to retrieve. One of: 'price', 'pe_ratio', 'market_cap'",
"title": "Metric",
"type": "string"
}
},
"required": ["ticker", "metric"],
"title": "StockLookupInput",
"type": "object"
}
=== AIMessage tool_calls (raw LLM output — no execution yet) ===
{
"name": "get_stock_metric",
"args": {
"ticker": "AAPL",
"metric": "pe_ratio"
},
"id": "call_abc123xyz",
"type": "tool_call"
}
=== Tool Execution Result ===
AAPL pe_ratio: 31.2
Building Production-Grade Custom Tools with Validation and Error Handling
The @tool decorator is convenient for simple cases, but in production you'll want BaseTool subclasses. They give you explicit control over sync vs async execution, fine-grained error handling via handle_tool_error, and the ability to inject dependencies (like database sessions or API clients) at construction time rather than using module-level globals.
The key architectural decision is: should your tool raise exceptions or return error strings? The answer depends on your agent architecture. In a simple ReAct loop, returning a descriptive error string lets the LLM reason about the failure and potentially retry with different arguments — which is usually what you want. Raising an exception bubbles up and typically terminates the agent run, unless you set handle_tool_error on the tool itself, which converts a raised ToolException into an error string for you.
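The two strategies can be shown side by side without any framework. The safe_tool decorator below mimics what handle_tool_error does for a ToolException — it converts the raised error into a string the LLM can read and react to — while letting every other exception propagate. ToolFailure and safe_tool are illustrative names, not LangChain APIs:

```python
from functools import wraps

class ToolFailure(Exception):
    """Stand-in for LangChain's ToolException: a recoverable tool error."""

def safe_tool(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except ToolFailure as exc:
            # Recoverable: surface the failure as an observation, not a crash,
            # so the model can retry with different arguments
            return f"Tool error: {exc}. Adjust the arguments and try again."
        # Any other exception propagates and terminates the run — reserve
        # raising for genuinely unrecoverable states (bad config, lost creds)
    return wrapper

@safe_tool
def divide(a: float, b: float) -> str:
    if b == 0:
        raise ToolFailure("division by zero")
    return str(a / b)

print(divide(10, 2))  # "5.0"
print(divide(1, 0))   # error string the agent can reason about
```

The decision rule this encodes: failures the model can plausibly fix become strings; failures it cannot fix stay exceptions.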
Dependency injection into tools is something most tutorials skip entirely, and it's where production systems diverge from toy examples. You almost never want API keys or database connections defined at module scope inside a tool. Instead, pass them into the tool's __init__ and store them as instance attributes. This makes your tools testable (you can inject mocks), configurable per-tenant, and avoids the subtle bug where a module is imported once and caches stale credentials.
Another critical pattern is idempotency awareness. If your tool sends an email or writes to a database, you need to understand that agents can and do call tools multiple times — either due to retry logic, parallel tool calls, or the model second-guessing itself. Design write operations to be idempotent or add deduplication logic at the tool level.
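A minimal sketch of tool-level deduplication for a write operation, following the reasoning above. send_email and _sent_keys are illustrative names; in production the seen-set would live in Redis with a TTL rather than process memory, and the "actual send" would be a real SMTP or API call:

```python
import hashlib
import json

_sent_keys: set[str] = set()

def _dedup_key(recipient: str, subject: str, body: str) -> str:
    # Canonical JSON so argument order can't produce two keys for one email
    payload = json.dumps(
        {"to": recipient, "subject": subject, "body": body}, sort_keys=True
    )
    return hashlib.sha256(payload.encode()).hexdigest()

def send_email(recipient: str, subject: str, body: str) -> str:
    """Idempotent write: a repeated identical call becomes a no-op."""
    key = _dedup_key(recipient, subject, body)
    if key in _sent_keys:
        # The agent called us twice (retry, parallel call, or second-guessing).
        # Report success rather than dispatching a duplicate.
        return f"Email to {recipient} already sent. No duplicate was dispatched."
    _sent_keys.add(key)
    # ... actual send would happen here ...
    return f"Email sent to {recipient}."

print(send_email("a@example.com", "Hi", "Hello"))  # sends
print(send_email("a@example.com", "Hi", "Hello"))  # deduplicated
```

Note that the dedup key is built from the tool's arguments, so a genuinely different email still goes through.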
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field, field_validator
from typing import Type
import httpx
import hashlib
import time

# --- Input schema with built-in validation ---
class WeatherQueryInput(BaseModel):
    city: str = Field(description="City name to get weather for, e.g. 'London' or 'Tokyo'")
    units: str = Field(
        default="celsius",
        description="Temperature units: 'celsius' or 'fahrenheit'"
    )

    # Pydantic v2 validator — catches bad input BEFORE the tool runs
    @field_validator("units")
    @classmethod
    def validate_units(cls, value: str) -> str:
        allowed = {"celsius", "fahrenheit"}
        if value.lower() not in allowed:
            raise ValueError(f"Units must be one of {allowed}, got '{value}'")
        return value.lower()

    @field_validator("city")
    @classmethod
    def validate_city_name(cls, value: str) -> str:
        # Prevent prompt injection via city name
        if len(value) > 100 or not value.replace(" ", "").replace("-", "").isalpha():
            raise ValueError("City name contains invalid characters")
        return value.strip().title()

# --- Production BaseTool with dependency injection ---
class WeatherTool(BaseTool):
    name: str = "get_current_weather"
    description: str = (
        "Retrieve current weather conditions for a city. "
        "Use this when the user asks about today's weather, temperature, "
        "or current conditions in a specific city. "
        "Returns temperature and a brief condition description. "
        "Do NOT use for forecasts — only current conditions."
    )
    args_schema: Type[BaseModel] = WeatherQueryInput

    # Injected dependencies — passed at construction, not module-scope globals
    api_key: str
    base_url: str = "https://api.openweathermap.org/data/2.5"
    request_timeout: int = 5

    # Simple in-memory deduplication to prevent redundant API calls
    # In production, use Redis with TTL instead
    _call_cache: dict = {}

    def _make_cache_key(self, city: str, units: str) -> str:
        """Cache key includes a time bucket so we don't serve stale data"""
        # Round to nearest 10-minute window for caching
        time_bucket = int(time.time() / 600)
        raw = f"{city}:{units}:{time_bucket}"
        return hashlib.md5(raw.encode()).hexdigest()

    def _run(self, city: str, units: str = "celsius") -> str:
        """
        Synchronous execution path. LangChain calls this when the tool
        is invoked in a non-async context.
        """
        cache_key = self._make_cache_key(city, units)
        if cache_key in self._call_cache:
            print(f"[WeatherTool] Cache HIT for {city}")
            return self._call_cache[cache_key]

        # Map our friendly unit names to the API's expected values
        api_units = "metric" if units == "celsius" else "imperial"
        unit_symbol = "°C" if units == "celsius" else "°F"

        try:
            response = httpx.get(
                f"{self.base_url}/weather",
                params={"q": city, "appid": self.api_key, "units": api_units},
                timeout=self.request_timeout,
            )
            # Don't raise on 404 — return a useful string so the LLM can handle it
            if response.status_code == 404:
                return f"City '{city}' not found. Check the spelling and try again."
            response.raise_for_status()  # Raise on 5xx errors
            data = response.json()
            temp = data["main"]["temp"]
            condition = data["weather"][0]["description"]
            humidity = data["main"]["humidity"]
            result = f"{city}: {temp}{unit_symbol}, {condition}, humidity {humidity}%"
            # Cache the successful result
            self._call_cache[cache_key] = result
            return result
        except httpx.TimeoutException:
            # Return recoverable error — LLM can retry or inform user
            return f"Weather service timed out after {self.request_timeout}s. Try again shortly."
        except httpx.HTTPStatusError as exc:
            return f"Weather API error (HTTP {exc.response.status_code}). Service may be down."

    async def _arun(self, city: str, units: str = "celsius") -> str:
        """
        Async execution path — called when the agent runs in an async context.
        Always implement both _run and _arun in production tools.
        Using httpx.AsyncClient here for true non-blocking IO.
        """
        cache_key = self._make_cache_key(city, units)
        if cache_key in self._call_cache:
            return self._call_cache[cache_key]

        api_units = "metric" if units == "celsius" else "imperial"
        unit_symbol = "°C" if units == "celsius" else "°F"

        async with httpx.AsyncClient(timeout=self.request_timeout) as client:
            try:
                response = await client.get(
                    f"{self.base_url}/weather",
                    params={"q": city, "appid": self.api_key, "units": api_units},
                )
                if response.status_code == 404:
                    return f"City '{city}' not found."
                response.raise_for_status()
                data = response.json()
                temp = data["main"]["temp"]
                condition = data["weather"][0]["description"]
                humidity = data["main"]["humidity"]
                result = f"{city}: {temp}{unit_symbol}, {condition}, humidity {humidity}%"
                self._call_cache[cache_key] = result
                return result
            except httpx.TimeoutException:
                return "Weather service timed out. Please retry."

# --- Constructing the tool with injected config ---
# In a real app, api_key comes from environment variables or a secrets manager
weather_tool = WeatherTool(
    api_key="your-openweathermap-api-key",
    request_timeout=8
)

# --- Direct invocation test (no LLM needed) ---
result = weather_tool.invoke({"city": "London", "units": "celsius"})
print(result)

# --- Test validation catches bad input cleanly ---
try:
    weather_tool.invoke({"city": "London", "units": "kelvin"})
except Exception as e:
    print(f"Validation caught: {e}")
Validation caught: 1 validation error for WeatherQueryInput
units
Value error, Units must be one of {'celsius', 'fahrenheit'}, got 'kelvin' [type=value_error, ...]
ToolNode in LangGraph: Wiring Tools Into a Real Agent Loop
LangGraph replaced the legacy AgentExecutor as the recommended way to build agents with LangChain, and the reason is control. AgentExecutor was a black box — hard to debug, hard to add conditional logic, and nearly impossible to add human-in-the-loop approval steps. LangGraph makes the agent loop an explicit, inspectable graph where you define exactly what happens at each node.
The core pattern is a two-node graph: a model_node that calls the LLM with tools bound, and a tools_node that executes whatever tool calls the model requested. A conditional edge between them asks: 'Did the model output any tool calls?' If yes, route to the tools node. If no (meaning the model produced a final answer), route to END. That loop is the entire agent.
ToolNode (from langgraph.prebuilt) handles the boilerplate of extracting tool_calls from the last AIMessage, routing each call to the correct tool by name, running them (in parallel where possible), and wrapping results in ToolMessage objects. Because all of this state flows through a single messages key in your graph state, the full conversation history is trivially inspectable at any point.
The real power comes when you need to break out of the simple loop: you can add a human_approval_node before the tools node that pauses execution and waits for a user confirmation before running destructive operations. You can add a retry_node that detects tool error strings and re-prompts the model differently. You can add a max_iterations counter in your graph state to prevent infinite loops — something AgentExecutor handled poorly.
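The approval gate can be sketched as a plain routing decision. In LangGraph you would typically compile with interrupt_before=["tools"] plus a checkpointer to actually pause execution; here only the decision logic is shown, framework-free. DESTRUCTIVE_TOOLS and route_after_model are illustrative names, not library APIs:

```python
# Tools with side effects that should never run without a human sign-off
DESTRUCTIVE_TOOLS = {"delete_record", "send_email", "issue_refund"}

def route_after_model(tool_calls: list[dict], approved: bool) -> str:
    """Decide the next node: end, straight to tools, or pause for a human."""
    if not tool_calls:
        return "end"  # model produced a final answer, no tools requested
    needs_approval = any(tc["name"] in DESTRUCTIVE_TOOLS for tc in tool_calls)
    if needs_approval and not approved:
        return "human_approval"  # hold before any destructive side effect
    return "tools"

# Read-only call: executes immediately
print(route_after_model([{"name": "search_catalog", "args": {}}], approved=False))
# Destructive call: held until a human signs off
print(route_after_model([{"name": "issue_refund", "args": {}}], approved=False))
```

Wired into the graph from the listing below this section, this function would simply replace the two-way conditional edge with a three-way one.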
from typing import Annotated, Sequence
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from pydantic import BaseModel

# --- Define the tools our agent can use ---
@tool
def search_product_catalog(query: str) -> str:
    """
    Search the internal product catalog for items matching a description.
    Use this when the user asks what products we carry or wants to
    find a specific item. Returns product names, IDs, and prices.
    """
    # Simulated catalog — real version would query a vector store or DB
    catalog = [
        {"id": "P001", "name": "Wireless Keyboard", "price": 79.99},
        {"id": "P002", "name": "Ergonomic Mouse", "price": 49.99},
        {"id": "P003", "name": "USB-C Hub", "price": 34.99},
    ]
    query_lower = query.lower()
    matches = [
        f"{p['name']} (ID: {p['id']}, ${p['price']})"
        for p in catalog
        if any(word in p["name"].lower() for word in query_lower.split())
    ]
    if not matches:
        return f"No products found matching '{query}'. Try broader search terms."
    return "Found: " + ", ".join(matches)

@tool
def check_inventory(product_id: str) -> str:
    """
    Check real-time stock levels for a product by its ID.
    Use this AFTER searching the catalog to get a product ID.
    Returns stock count and estimated restock date if out of stock.
    """
    inventory = {
        "P001": {"stock": 143, "restock": None},
        "P002": {"stock": 0, "restock": "2025-09-15"},
        "P003": {"stock": 27, "restock": None},
    }
    product_id = product_id.upper()
    if product_id not in inventory:
        return f"Product ID '{product_id}' not recognised in inventory system."
    item = inventory[product_id]
    if item["stock"] == 0:
        return f"{product_id}: OUT OF STOCK. Expected restock: {item['restock']}"
    return f"{product_id}: {item['stock']} units in stock."

# Collect all tools — ToolNode needs this list to route calls by name
available_tools = [search_product_catalog, check_inventory]

# --- Define graph state ---
# Annotated with add_messages reducer: new messages are APPENDED, not overwritten
# This is critical — without the reducer you'd lose conversation history on each step
class AgentState(BaseModel):
    messages: Annotated[Sequence[BaseMessage], add_messages]
    iteration_count: int = 0  # Guard against infinite loops
    MAX_ITERATIONS: int = 10  # Loop cap baked into the state type

    class Config:
        arbitrary_types_allowed = True

# --- Wire up LLM with tools ---
catalog_llm = ChatOpenAI(model="gpt-4o", temperature=0)
catalog_llm_with_tools = catalog_llm.bind_tools(available_tools)

# --- Node 1: Model node — calls the LLM and returns its response ---
def call_model_node(state: AgentState) -> dict:
    """
    This node calls the LLM. It reads the full message history from state
    and returns the new AIMessage. LangGraph merges this with existing
    messages via the add_messages reducer.
    """
    current_iteration = state.iteration_count + 1
    print(f"[Agent] Model call — iteration {current_iteration}")

    # Safety valve: if we've looped too many times, force a stop
    if current_iteration >= state.MAX_ITERATIONS:
        forced_stop = AIMessage(
            content="I've reached the maximum number of steps. "
                    "Here's what I found so far based on available information."
        )
        return {"messages": [forced_stop], "iteration_count": current_iteration}

    response = catalog_llm_with_tools.invoke(state.messages)
    return {"messages": [response], "iteration_count": current_iteration}

# --- Node 2: ToolNode — executes all tool_calls from the last AIMessage ---
# ToolNode handles parallel execution automatically when multiple tools are called
tool_executor_node = ToolNode(tools=available_tools)

# --- Routing function: decide whether to use a tool or end ---
def should_continue_routing(state: AgentState) -> str:
    """
    Conditional edge function. Inspects the last message:
      - If it has tool_calls -> route to tools node
      - Otherwise -> route to END (agent has its final answer)
    """
    last_message = state.messages[-1]
    # isinstance check ensures we're looking at an AIMessage, not a ToolMessage
    if isinstance(last_message, AIMessage) and last_message.tool_calls:
        print(f"[Router] Routing to tools: {[tc['name'] for tc in last_message.tool_calls]}")
        return "use_tools"
    print("[Router] No tool calls — agent finished")
    return "end"

# --- Build the graph ---
agent_graph = StateGraph(AgentState)

# Register nodes
agent_graph.add_node("model", call_model_node)
agent_graph.add_node("tools", tool_executor_node)

# Set entry point
agent_graph.set_entry_point("model")

# Conditional edge from model: either call tools or finish
agent_graph.add_conditional_edges(
    "model",
    should_continue_routing,
    {
        "use_tools": "tools",  # route name -> node name
        "end": END,
    },
)

# After tools execute, always go back to model to process results
agent_graph.add_edge("tools", "model")

# Compile into a runnable
runnable_agent = agent_graph.compile()

# --- Run the agent ---
print("=== STARTING AGENT ===")
initial_state = {"messages": [HumanMessage(content="Do you have any keyboards in stock?")]}
final_state = runnable_agent.invoke(initial_state)

print("\n=== FINAL ANSWER ===")
print(final_state["messages"][-1].content)
print(f"Total iterations: {final_state['iteration_count']}")
[Agent] Model call — iteration 1
[Router] Routing to tools: ['search_product_catalog']
[Agent] Model call — iteration 2
[Router] Routing to tools: ['check_inventory']
[Agent] Model call — iteration 3
[Router] No tool calls — agent finished
=== FINAL ANSWER ===
Yes! We carry a Wireless Keyboard (ID: P001, priced at $79.99), and it's well-stocked with 143 units available. Would you like to place an order?
Total iterations: 3
Parallel Tool Calls, Streaming, and Performance at Scale
Modern models (GPT-4o, Claude 3.5, Gemini 1.5 Pro) can emit multiple tool calls in a single response. This is a huge performance win — if the model needs both the weather in London AND today's news headlines, it can request both simultaneously instead of waiting for the first result before requesting the second. LangGraph's ToolNode runs those calls concurrently when executing in an async context, so your async tool implementations genuinely matter here.
Streaming tool results is a less-discussed but production-critical feature. In a chat UI, users stare at a blank screen while the agent thinks and calls tools. LangChain's streaming API lets you stream both the model's reasoning tokens AND tool execution events as they happen. The .astream_events() method on a compiled LangGraph emits a stream of typed events: on_chat_model_stream for token-by-token LLM output, on_tool_start when a tool begins executing, and on_tool_end when it returns — all as async generator events your UI can consume in real time.
For production deployments, the most expensive operation in a tool-heavy agent is often not the LLM itself but the aggregate latency of sequential tool calls. Profile your agents: if tool calls are sequential when they could be parallel, restructure your prompts to encourage the model to batch its requests. If a single tool call is slow (>2s), it will dominate your p95 latency. Add caching at the tool level (as shown above) and consider prefetching common tool results at session start.
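The per-tool profiling suggested above can be done with a small decorator before reaching for a full tracing stack. The names (profiled, tool_timings) and the 2-second threshold are illustrative; in production you would emit these samples to your metrics backend instead of printing:

```python
import time
from functools import wraps

SLOW_TOOL_THRESHOLD_S = 2.0
tool_timings: dict[str, list[float]] = {}

def profiled(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            # Record the sample even when the tool raises
            elapsed = time.perf_counter() - start
            tool_timings.setdefault(func.__name__, []).append(elapsed)
            if elapsed > SLOW_TOOL_THRESHOLD_S:
                print(f"[profile] {func.__name__} took {elapsed:.2f}s — will dominate p95")
    return wrapper

@profiled
def lookup_sku(sku: str) -> str:
    time.sleep(0.01)  # stand-in for a backend call
    return f"SKU {sku}: in stock"

lookup_sku("P001")
lookup_sku("P001")
# tool_timings now holds two samples for lookup_sku — feed samples like these
# into a p95 calculation to decide which tool to cache or parallelise first.
```

Wrapping the underlying function before passing it to @tool keeps the measurement independent of the agent framework.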
import asyncio
import time
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent

# --- Two slow tools that should run in PARALLEL, not sequentially ---
@tool
async def fetch_user_profile(user_id: str) -> str:
    """
    Fetch a user's profile from the user service by their ID.
    Returns name, email, and account tier. Use when personalising responses.
    """
    await asyncio.sleep(0.8)  # Simulates 800ms database query
    profiles = {
        "user_42": {"name": "Sarah Chen", "email": "sarah@example.com", "tier": "premium"},
        "user_99": {"name": "James Okafor", "email": "james@example.com", "tier": "free"},
    }
    if user_id not in profiles:
        return f"User '{user_id}' not found"
    p = profiles[user_id]
    return f"Name: {p['name']}, Email: {p['email']}, Tier: {p['tier']}"

@tool
async def fetch_recent_orders(user_id: str) -> str:
    """
    Fetch the 3 most recent orders for a user.
    Returns order IDs, items, and total amounts.
    Use when user asks about their order history or past purchases.
    """
    await asyncio.sleep(1.0)  # Simulates 1s API call to order service
    orders = {
        "user_42": [
            {"id": "ORD-8821", "item": "Wireless Keyboard", "total": 79.99},
            {"id": "ORD-7703", "item": "USB-C Hub", "total": 34.99},
        ],
        "user_99": [{"id": "ORD-9011", "item": "Ergonomic Mouse", "total": 49.99}],
    }
    if user_id not in orders:
        return f"No orders found for user '{user_id}'"
    order_list = ", ".join(
        f"{o['id']}: {o['item']} (${o['total']})" for o in orders[user_id]
    )
    return f"Recent orders: {order_list}"

async def demonstrate_parallel_execution():
    """
    With parallel tool calling, both fetch_user_profile and fetch_recent_orders
    run concurrently. Total time should be ~1s (the slower tool), not ~1.8s (sum).
    """
    parallel_tools = [fetch_user_profile, fetch_recent_orders]
    agent_llm = ChatOpenAI(model="gpt-4o", temperature=0)

    # create_react_agent is a convenience wrapper that builds the LangGraph pattern
    # we built manually above — useful for standard use cases
    react_agent = create_react_agent(agent_llm, parallel_tools)

    user_query = (
        "Give me a personalised summary for user_42. "
        "I need their profile AND their recent orders."
    )

    start_time = time.perf_counter()
    result = await react_agent.ainvoke(
        {"messages": [HumanMessage(content=user_query)]}
    )
    elapsed = time.perf_counter() - start_time

    print(f"Execution time: {elapsed:.2f}s")
    print("(If tools ran sequentially this would be ~1.8s — parallel saves ~0.8s)")
    print("\nFinal response:")
    print(result["messages"][-1].content)

async def demonstrate_streaming_events():
    """
    Stream agent events to show the user what's happening in real time.
    This is what you'd use to build a live 'thinking...' UI indicator.
    """
    streaming_tools = [fetch_user_profile]
    stream_llm = ChatOpenAI(model="gpt-4o", temperature=0)
    streaming_agent = create_react_agent(stream_llm, streaming_tools)

    print("\n=== STREAMING EVENTS ===")
    async for event in streaming_agent.astream_events(
        {"messages": [HumanMessage(content="Look up profile for user_42")]},
        version="v2",  # v2 is the current stable events API
    ):
        event_kind = event["event"]
        if event_kind == "on_chat_model_stream":
            # Stream LLM token output as it arrives
            token = event["data"]["chunk"].content
            if token:  # Filter empty chunks
                print(token, end="", flush=True)
        elif event_kind == "on_tool_start":
            # Notify UI that a tool is being called
            print(f"\n[UI INDICATOR] Calling tool: {event['name']}...")
        elif event_kind == "on_tool_end":
            # Tool finished — you could update a progress indicator
            print(f"[UI INDICATOR] Tool '{event['name']}' completed")

async def main():
    await demonstrate_parallel_execution()
    await demonstrate_streaming_events()

asyncio.run(main())
(If tools ran sequentially this would be ~1.8s — parallel saves ~0.8s)
Final response:
Here's the summary for Sarah Chen (user_42):
- Account Tier: Premium
- Email: sarah@example.com
- Recent Orders: ORD-8821 (Wireless Keyboard, $79.99), ORD-7703 (USB-C Hub, $34.99)
As a premium member, Sarah is eligible for priority support and free shipping on all orders.
=== STREAMING EVENTS ===
[UI INDICATOR] Calling tool: fetch_user_profile...
[UI INDICATOR] Tool 'fetch_user_profile' completed
Based on the profile data, Sarah Chen (user_42) is a premium tier member...
🎯 Key Takeaways
- A Tool is a callable plus a contract (name, description, args_schema); the description is the LLM's API documentation, so write it like one.
- The model never executes anything — it declares intent via tool_calls, and the orchestration layer runs the function and returns a ToolMessage.
- Return descriptive error strings from tools so the agent can reason about failures; reserve raised exceptions for unrecoverable states.
- Inject dependencies at construction time, validate inputs with Pydantic, and make write operations idempotent — agents can and do retry.
- Build agent loops as explicit LangGraph graphs with iteration caps and approval gates; use ToolNode and async tools to get parallel execution.
- Profile aggregate tool latency: in tool-heavy agents it, not the LLM, often dominates p95.