In deterministic software, errors are exceptions—clearly defined failure states with predictable stack traces. In agentic AI systems, "errors" include hallucinations that return HTTP 200, tool calls that succeed technically but fail semantically, and reasoning chains that produce confident nonsense. Traditional try-catch blocks don't protect against these failure modes.
For NCP-AAI certification candidates, mastering error handling and resilience patterns is critical for building production-grade agentic AI systems. This guide covers the essential patterns, from basic retry logic to sophisticated circuit breakers and semantic fallback strategies.
The Unique Error Landscape of Agentic AI
Traditional vs Agentic Error Taxonomy
| Error Type | Traditional Software | Agentic AI Systems |
|---|---|---|
| Syntax Errors | Code won't compile | LLM generates invalid JSON (common) |
| Runtime Errors | NullPointerException, IndexError | Tool execution failures, API timeouts |
| Logic Errors | Wrong algorithm | Hallucinations, reasoning failures |
| Data Errors | Invalid input format | Context window overflow, tokenization issues |
| Integration Errors | API 500 errors | Tool not found, schema mismatch |
| Resource Errors | Out of memory | Token budget exhausted, rate limits |
| Semantic Errors | N/A (doesn't exist) | Factually incorrect but fluent responses |
The last category—semantic errors—represents the hardest challenge. An agent can execute perfectly, consume 5,000 tokens, invoke three tools successfully, and still produce a response that's completely wrong.
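A hypothetical trace makes this concrete (the agent, query, and numbers are invented for illustration):
# Hypothetical trace: every step "succeeds", so no exception is ever raised
result = agent.run("What was ACME Corp's FY2023 revenue?")
# -> HTTP 200, 3 successful tool calls, ~5,000 tokens consumed
# -> result.answer: a fluent, confident figure that the filings do not support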
Preparing for NCP-AAI? Practice with 455+ exam questions
Pattern 1: Retry with Exponential Backoff
Use Case: Transient failures (network timeouts, rate limits, temporary service outages)
Implementation:
import time
import random
from typing import Callable, TypeVar
from functools import wraps
T = TypeVar('T')
def retry_with_backoff(
max_retries: int = 3,
initial_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True,
exceptions: tuple = (Exception,)
):
"""Retry decorator with exponential backoff and jitter."""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
def wrapper(*args, **kwargs) -> T:
delay = initial_delay
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except exceptions as e:
if attempt == max_retries - 1:
raise # Final attempt failed, propagate
                    # Add jitter to prevent thundering-herd retries across clients
                    sleep_time = delay * (0.5 + random.random()) if jitter else delay
                    print(f"Attempt {attempt + 1} failed: {e}. Retrying in {sleep_time:.2f}s...")
                    time.sleep(sleep_time)
                    # Grow the base delay only after sleeping, so the first
                    # retry waits ~initial_delay rather than initial_delay * base
                    delay = min(delay * exponential_base, max_delay)
raise RuntimeError("Unreachable") # Should never get here
return wrapper
return decorator
# Usage with LLM calls (RateLimitError stands in for your provider SDK's
# rate-limit exception, e.g. openai.RateLimitError)
@retry_with_backoff(max_retries=3, exceptions=(RateLimitError, TimeoutError))
def call_llm_with_retry(prompt: str) -> str:
"""Call LLM with automatic retry on rate limits."""
response = llm_client.complete(prompt)
return response.content
Configuration Guidelines:
- Transient network errors: 3 retries, 1s initial delay
- Rate limiting (429): 5 retries, 2s initial delay, max 60s
- Model inference timeouts: 2 retries, 5s initial delay (each class is applied as its own decorator in the sketch below)
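A minimal sketch of how those guidelines translate into decorator parameters (RateLimitError and InferenceTimeout are hypothetical stand-ins for your provider SDK's exception types):
# Hypothetical exception types; substitute your SDK's real ones
class RateLimitError(Exception): pass
class InferenceTimeout(Exception): pass
# Transient network errors: fail fast, short waits
@retry_with_backoff(max_retries=3, initial_delay=1.0, exceptions=(ConnectionError, TimeoutError))
def fetch_tool_result(url: str) -> str: ...
# Rate limiting (429): more patience, capped at 60s
@retry_with_backoff(max_retries=5, initial_delay=2.0, max_delay=60.0, exceptions=(RateLimitError,))
def call_rate_limited_api(prompt: str) -> str: ...
# Model inference timeouts: few attempts, long waits
@retry_with_backoff(max_retries=2, initial_delay=5.0, exceptions=(InferenceTimeout,))
def run_model_inference(prompt: str) -> str: ...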
Pattern 2: Circuit Breaker
Use Case: Prevent cascading failures when external services (APIs, databases, vector stores) become unhealthy
Implementation:
from enum import Enum
from datetime import datetime, timedelta
from threading import Lock
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing recovery
class CircuitBreaker:
"""Circuit breaker pattern for external dependencies."""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
        expected_exception: type | tuple = Exception
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.expected_exception = expected_exception
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
self._lock = Lock()
def call(self, func, *args, **kwargs):
"""Execute function with circuit breaker protection."""
with self._lock:
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
else:
raise CircuitBreakerOpenError(
f"Circuit breaker OPEN. Retry after {self.recovery_timeout}s"
)
try:
result = func(*args, **kwargs)
# Success - reset if in half-open state
with self._lock:
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.CLOSED
self.failure_count = 0
return result
except self.expected_exception as e:
with self._lock:
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
raise
def _should_attempt_reset(self) -> bool:
"""Check if enough time has passed to attempt recovery."""
return (
self.last_failure_time is not None and
datetime.now() - self.last_failure_time >= timedelta(seconds=self.recovery_timeout)
)
class CircuitBreakerOpenError(Exception):
"""Raised when circuit breaker is open."""
pass
# Usage with agent tools
vector_db_breaker = CircuitBreaker(
failure_threshold=3,
recovery_timeout=30,
expected_exception=(ConnectionError, TimeoutError)
)
def retrieve_context(query: str) -> list[str]:
"""Retrieve context from vector DB with circuit breaker."""
return vector_db_breaker.call(
vector_db.search,
query_embedding=embed(query),
top_k=5
)
Tool-Specific Circuit Breakers:
# Configure different breakers for different dependencies
tool_breakers = {
"vector_search": CircuitBreaker(failure_threshold=3, recovery_timeout=30),
"api_external": CircuitBreaker(failure_threshold=5, recovery_timeout=60),
"database_query": CircuitBreaker(failure_threshold=2, recovery_timeout=20),
}
def execute_tool_with_protection(tool_name: str, *args, **kwargs):
"""Execute tool with appropriate circuit breaker."""
breaker = tool_breakers.get(tool_name)
if breaker:
return breaker.call(tools[tool_name].execute, *args, **kwargs)
else:
return tools[tool_name].execute(*args, **kwargs)
Pattern 3: Graceful Degradation with Fallback Strategies
Use Case: Maintain service availability when primary capabilities fail
Fallback Hierarchy
from typing import Any, Callable, List
from dataclasses import dataclass
class FallbackExhaustedError(Exception):
    """Raised when every strategy in the chain has failed."""
    pass
@dataclass
class FallbackStrategy:
"""Defines a fallback option."""
name: str
executor: Callable
max_attempts: int = 1
cost_multiplier: float = 1.0 # Relative cost vs primary
class FallbackChain:
"""Execute strategies in order until one succeeds."""
def __init__(self, strategies: List[FallbackStrategy]):
self.strategies = strategies
def execute(self, *args, **kwargs) -> Any:
"""Try each strategy until success."""
last_error = None
for strategy in self.strategies:
for attempt in range(strategy.max_attempts):
try:
result = strategy.executor(*args, **kwargs)
print(f"✓ Success with strategy: {strategy.name}")
return result
except Exception as e:
last_error = e
print(f"✗ Strategy '{strategy.name}' attempt {attempt + 1} failed: {e}")
# All strategies exhausted
raise FallbackExhaustedError(
f"All fallback strategies failed. Last error: {last_error}"
)
# Example: RAG with multiple fallback strategies
def rag_primary(query: str) -> str:
"""Primary RAG: Vector search + GPT-4 Turbo."""
context = vector_db.search(embed(query), top_k=5)
return llm_gpt4_turbo.generate(query, context)
def rag_fallback_cheaper_model(query: str) -> str:
"""Fallback 1: Same vector search, cheaper model."""
context = vector_db.search(embed(query), top_k=5)
return llm_gpt35.generate(query, context)
def rag_fallback_keyword_search(query: str) -> str:
"""Fallback 2: Keyword search instead of vector."""
context = keyword_search(query, top_k=5)
return llm_gpt4_turbo.generate(query, context)
def rag_fallback_no_context(query: str) -> str:
"""Fallback 3: Pure LLM, no retrieval."""
return llm_gpt4_turbo.generate(query, context=[])
# Define fallback chain
rag_chain = FallbackChain([
FallbackStrategy("primary_rag", rag_primary, max_attempts=2),
FallbackStrategy("cheaper_model", rag_fallback_cheaper_model, max_attempts=2, cost_multiplier=0.1),
FallbackStrategy("keyword_search", rag_fallback_keyword_search, max_attempts=1, cost_multiplier=0.8),
FallbackStrategy("no_context", rag_fallback_no_context, max_attempts=1, cost_multiplier=0.3),
])
# Usage
response = rag_chain.execute(user_query)
Pattern 4: Semantic Validation & Self-Correction
Use Case: Detect and recover from hallucinations, reasoning errors, invalid outputs
Step 1: Output Validation
from pydantic import BaseModel, Field, ValidationError, validator
from typing import Literal
class AgentOutput(BaseModel):
"""Validated agent response."""
answer: str = Field(..., min_length=10, max_length=2000)
confidence: float = Field(..., ge=0.0, le=1.0)
sources: list[str] = Field(default_factory=list)
safety_check: Literal["safe", "unsafe"] = "safe"
@validator("answer")
def answer_not_refusal(cls, v):
"""Detect refusals disguised as answers."""
refusal_patterns = [
"I cannot", "I don't have access", "I'm unable to",
"As an AI", "I don't know", "I cannot provide"
]
if any(pattern in v for pattern in refusal_patterns):
raise ValueError("Agent refused to answer")
return v
@validator("sources")
def sources_not_empty_if_factual(cls, v, values):
"""Require sources for factual claims."""
answer = values.get("answer", "")
# Heuristic: long answers should cite sources
if len(answer) > 200 and len(v) == 0:
raise ValueError("Long answer requires sources")
return v
def validated_agent_call(query: str) -> AgentOutput:
"""Call agent with output validation."""
raw_response = agent.run(query)
    try:
        return AgentOutput(**raw_response)
    except ValidationError as e:
        # Re-raise as a domain error the caller's retry/fallback logic can handle
        raise OutputValidationError(f"Agent output validation failed: {e}") from e
class OutputValidationError(Exception):
    """Raised when the agent's output fails schema validation."""
    pass
Step 2: Hallucination Detection
def detect_hallucination(answer: str, sources: list[str]) -> tuple[bool, float]:
"""Detect if answer is grounded in sources."""
    # Method 1: Semantic similarity check
    if not sources:
        return True, 0.0  # No sources at all: treat the answer as ungrounded
    answer_embedding = embed(answer)
    source_embeddings = [embed(s) for s in sources]
    max_similarity = max(
        cosine_similarity(answer_embedding, source_emb)
        for source_emb in source_embeddings
    )
# Method 2: LLM-as-judge
judge_prompt = f"""
Evaluate if the ANSWER is fully supported by the SOURCES.
ANSWER: {answer}
SOURCES:
{chr(10).join(f"[{i+1}] {s}" for i, s in enumerate(sources))}
Is the answer supported? Reply with:
- "YES" if fully supported
- "PARTIAL" if partially supported
- "NO" if not supported or hallucinated
Confidence (0.0-1.0):
"""
judge_response = llm_judge.complete(judge_prompt)
    is_hallucination = (
        max_similarity < 0.6 or  # Low semantic overlap
        judge_response.strip().upper().startswith("NO")  # Judge verdict: unsupported
    )
confidence = extract_confidence(judge_response)
return is_hallucination, confidence
# Usage with auto-retry
def agent_with_hallucination_guard(query: str, max_attempts: int = 3) -> str:
"""Run agent with hallucination detection and retry."""
for attempt in range(max_attempts):
response = agent.run(query)
is_hallucination, confidence = detect_hallucination(
response["answer"],
response["sources"]
)
if not is_hallucination:
return response["answer"]
print(f"Hallucination detected (confidence: {confidence:.2f}). Retrying...")
# Retry with stronger grounding instruction
agent.update_system_prompt(
"You MUST cite sources for every factual claim. "
"If unsure, say 'I don't have enough information.'"
)
    raise HallucinationError(f"Agent hallucinated after {max_attempts} attempts")
class HallucinationError(Exception):
    """Raised when grounding checks fail on every attempt."""
    pass
Master These Concepts with Practice
Our NCP-AAI practice bundle includes:
- 7 full practice exams (455+ questions)
- Detailed explanations for every answer
- Domain-by-domain performance tracking
30-day money-back guarantee
Pattern 5: Token Budget Management
Use Case: Prevent context window overflow, control costs
import tiktoken
class TokenBudgetError(Exception):
    """Raised when the fixed prompt parts alone exceed the budget."""
    pass
class TokenBudgetManager:
"""Manage token budgets for agent interactions."""
def __init__(
self,
model: str,
max_prompt_tokens: int = 6000,
max_completion_tokens: int = 2000,
reserve_tokens: int = 500 # Safety margin
):
self.encoding = tiktoken.encoding_for_model(model)
self.max_prompt_tokens = max_prompt_tokens
self.max_completion_tokens = max_completion_tokens
self.reserve_tokens = reserve_tokens
def count_tokens(self, text: str) -> int:
"""Count tokens in text."""
return len(self.encoding.encode(text))
def truncate_context(
self,
system_prompt: str,
user_query: str,
context_docs: list[str],
conversation_history: list[dict]
) -> dict:
"""Truncate inputs to fit budget."""
# Fixed costs (always included)
system_tokens = self.count_tokens(system_prompt)
query_tokens = self.count_tokens(user_query)
fixed_tokens = system_tokens + query_tokens
# Available budget for dynamic content
available_budget = (
self.max_prompt_tokens -
fixed_tokens -
self.reserve_tokens
)
if available_budget < 0:
raise TokenBudgetError("Query exceeds maximum prompt size")
# Allocate budget: 60% context, 40% history
context_budget = int(available_budget * 0.6)
history_budget = int(available_budget * 0.4)
# Truncate context documents
truncated_context = self._truncate_docs(context_docs, context_budget)
# Truncate conversation history (keep recent messages)
truncated_history = self._truncate_history(conversation_history, history_budget)
        return {
            "system_prompt": system_prompt,
            "user_query": user_query,
            "context": truncated_context,
            "history": truncated_history,
            # Report actual usage, not the allocated budgets
            "tokens_used": fixed_tokens
            + sum(self.count_tokens(d) for d in truncated_context)
            + sum(self.count_tokens(m.get("content", "")) for m in truncated_history),
        }
    def _truncate_docs(self, docs: list[str], budget: int) -> list[str]:
        """Truncate document list to fit budget."""
        truncated = []
        tokens_used = 0
        for doc in docs:
            doc_tokens = self.count_tokens(doc)
            if tokens_used + doc_tokens <= budget:
                truncated.append(doc)
                tokens_used += doc_tokens
            else:
                # Partial doc inclusion
                remaining_budget = budget - tokens_used
                if remaining_budget > 100:  # Minimum useful size
                    partial_doc = self.encoding.decode(
                        self.encoding.encode(doc)[:remaining_budget]
                    )
                    truncated.append(partial_doc + "...")
                break
        return truncated
    def _truncate_history(self, history: list[dict], budget: int) -> list[dict]:
        """Keep the most recent messages that fit within budget."""
        truncated = []
        tokens_used = 0
        for message in reversed(history):
            msg_tokens = self.count_tokens(message.get("content", ""))
            if tokens_used + msg_tokens > budget:
                break
            truncated.insert(0, message)  # Preserve chronological order
            tokens_used += msg_tokens
        return truncated
# Usage in RAG agent
budget_manager = TokenBudgetManager(model="gpt-4-turbo", max_prompt_tokens=8000)
def rag_agent_with_budget(query: str) -> str:
"""RAG agent with automatic token budget management."""
    # Retrieve more documents than we can use, then trim to budget
    candidate_docs = vector_db.search(embed(query), top_k=20)
# Truncate to fit budget
truncated_inputs = budget_manager.truncate_context(
system_prompt=AGENT_SYSTEM_PROMPT,
user_query=query,
context_docs=candidate_docs,
conversation_history=get_recent_history(limit=10)
)
# Generate with guaranteed fit
response = llm.generate(
system=truncated_inputs["system_prompt"],
messages=truncated_inputs["history"],
context=truncated_inputs["context"],
query=truncated_inputs["user_query"],
max_tokens=budget_manager.max_completion_tokens
)
return response
Pattern 6: Multi-Agent Consensus for Critical Decisions
Use Case: High-stakes decisions where errors are costly (medical diagnosis, financial advice, legal analysis)
from collections import Counter
from typing import List
def multi_agent_consensus(
query: str,
agents: List[Agent],
min_agreement: float = 0.7
) -> str:
"""Run multiple agents and require consensus."""
responses = []
for agent in agents:
try:
response = agent.run(query)
responses.append(response)
except Exception as e:
print(f"Agent {agent.name} failed: {e}")
if len(responses) < 2:
raise InsufficientResponsesError("Need at least 2 agent responses")
# Check for consensus
response_hashes = [hash_response(r) for r in responses]
most_common = Counter(response_hashes).most_common(1)[0]
agreement_rate = most_common[1] / len(responses)
if agreement_rate >= min_agreement:
# Consensus reached
consensus_response = next(
r for r in responses if hash_response(r) == most_common[0]
)
return consensus_response
else:
# No consensus - escalate to human
raise ConsensusFailureError(
f"Agents disagree ({agreement_rate:.1%} agreement). "
f"Escalating to human review."
)
def hash_response(response: str) -> int:
    """Hash response for consensus checking."""
    # In production: use embedding similarity instead of exact match
    return hash(response.lower().strip())
class InsufficientResponsesError(Exception):
    """Raised when fewer than two agents return a usable response."""
    pass
class ConsensusFailureError(Exception):
    """Raised when agreement falls below the required threshold."""
    pass
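Exact-match hashing is brittle: two correct answers phrased differently never count as the same vote. A minimal sketch of the embedding-based alternative the comment suggests, reusing the embed helper assumed in earlier snippets:
import numpy as np
def semantically_equivalent(a: str, b: str, threshold: float = 0.9) -> bool:
    """Count two responses as the same 'vote' if their embeddings are close."""
    va, vb = np.asarray(embed(a)), np.asarray(embed(b))
    cosine = float(np.dot(va, vb) / (np.linalg.norm(va) * np.linalg.norm(vb)))
    return cosine >= threshold
Grouping responses by this relation instead of by hash yields a consensus count that tolerates paraphrase.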
Production Checklist: Error Resilience
- LLM calls: Retry with exponential backoff (3 attempts)
- External APIs: Circuit breakers (5 failures = open for 60s)
- Tool execution: Fallback strategies (primary → secondary → cached)
- Context retrieval: Token budget management (truncate if needed)
- Agent outputs: Schema validation (Pydantic models)
- Hallucinations: LLM-as-judge detection with retry
- Critical decisions: Multi-agent consensus (70% agreement threshold)
- Monitoring: Alert on error rates >5%, fallback usage >20%
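Taken together, these defenses layer into a single request path. A minimal sketch reusing the classes defined above (AGENT_SYSTEM_PROMPT, get_recent_history, embed, and vector_db are placeholders from earlier snippets):
def resilient_agent_request(query: str) -> str:
    """One request path layering the patterns above."""
    # Pattern 5: make sure everything fits the context window first
    inputs = budget_manager.truncate_context(
        system_prompt=AGENT_SYSTEM_PROMPT,
        user_query=query,
        context_docs=vector_db.search(embed(query), top_k=20),
        conversation_history=get_recent_history(limit=10),
    )
    try:
        # Pattern 3: the chain's strategies should themselves wrap their
        # LLM and tool calls in Pattern 1 retries and Pattern 2 breakers
        answer = rag_chain.execute(inputs["user_query"])
    except FallbackExhaustedError:
        return "Service temporarily degraded. Please try again shortly."
    # Pattern 4: block ungrounded answers before they reach the user
    is_hallucination, confidence = detect_hallucination(answer, inputs["context"])
    if is_hallucination:
        raise HallucinationError(f"Ungrounded answer (confidence {confidence:.2f})")
    return answer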
Practice for NCP-AAI Exam
Test your error handling knowledge with Preporato's NCP-AAI Practice Tests:
- ✅ Retry logic scenario questions
- ✅ Circuit breaker configuration exercises
- ✅ Fallback strategy design challenges
- ✅ Hallucination detection techniques
- ✅ Production resilience patterns
Start practicing today and master production-grade error handling for agentic AI systems.
Conclusion
Error handling in agentic AI systems requires a fundamental shift from traditional software engineering patterns. Retries handle transient failures, circuit breakers prevent cascades, fallbacks maintain availability, semantic validation catches hallucinations, and token budgets prevent overflows. Together, these patterns form the foundation of resilient production AI agents.
For NCP-AAI certification, understanding when and how to apply each pattern—and recognizing their limitations—is essential for exam success and real-world system reliability.
Ready to Pass the NCP-AAI Exam?
Join thousands who passed with Preporato practice tests
