Preporato

Error Handling and Resilience Patterns for Agentic AI Systems: NCP-AAI Guide

Preporato Team · December 10, 2025 · 10 min read · NCP-AAI

In deterministic software, errors are exceptions—clearly defined failure states with predictable stack traces. In agentic AI systems, "errors" include hallucinations that return HTTP 200, tool calls that succeed technically but fail semantically, and reasoning chains that produce confident nonsense. Traditional try-catch blocks don't protect against these failure modes.

For NCP-AAI certification candidates, mastering error handling and resilience patterns is critical for building production-grade agentic AI systems. This guide covers the essential patterns, from basic retry logic to sophisticated circuit breakers and semantic fallback strategies.

The Unique Error Landscape of Agentic AI

Traditional vs Agentic Error Taxonomy

| Error Type | Traditional Software | Agentic AI Systems |
|---|---|---|
| Syntax Errors | Code won't compile | LLM generates invalid JSON (common) |
| Runtime Errors | NullPointerException, IndexError | Tool execution failures, API timeouts |
| Logic Errors | Wrong algorithm | Hallucinations, reasoning failures |
| Data Errors | Invalid input format | Context window overflow, tokenization issues |
| Integration Errors | API 500 errors | Tool not found, schema mismatch |
| Resource Errors | Out of memory | Token budget exhausted, rate limits |
| Semantic Errors | N/A (doesn't exist) | Factually incorrect but fluent responses |

The last category—semantic errors—represents the hardest challenge. An agent can execute perfectly, consume 5,000 tokens, invoke three tools successfully, and still produce a response that's completely wrong.

Preparing for NCP-AAI? Practice with 455+ exam questions

Pattern 1: Retry with Exponential Backoff

Use Case: Transient failures (network timeouts, rate limits, temporary service outages)

Implementation:

import time
import random
from typing import Callable, TypeVar, Any
from functools import wraps

T = TypeVar('T')

def retry_with_backoff(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True,
    exceptions: tuple = (Exception,)
):
    """Retry decorator with exponential backoff and jitter."""
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            delay = initial_delay

            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)

                except exceptions as e:
                    if attempt == max_retries - 1:
                        raise  # Final attempt failed, propagate

                    # Jitter spreads retries out to prevent a thundering herd;
                    # keep it out of `delay` so it doesn't compound across attempts
                    sleep_time = delay * (0.5 + random.random()) if jitter else delay

                    print(f"Attempt {attempt + 1} failed: {e}. Retrying in {sleep_time:.2f}s...")
                    time.sleep(sleep_time)

                    # Grow the base delay for the next attempt, capped at max_delay
                    delay = min(delay * exponential_base, max_delay)

            raise RuntimeError("Unreachable")  # Should never get here

        return wrapper
    return decorator

# Usage with LLM calls (RateLimitError and llm_client stand in for
# whatever exception types and client your LLM SDK provides)
@retry_with_backoff(max_retries=3, exceptions=(RateLimitError, TimeoutError))
def call_llm_with_retry(prompt: str) -> str:
    """Call LLM with automatic retry on rate limits."""
    response = llm_client.complete(prompt)
    return response.content

Configuration Guidelines:

  • Transient network errors: 3 retries, 1s initial delay
  • Rate limiting (429): 5 retries, 2s initial delay, max 60s
  • Model inference timeouts: 2 retries, 5s initial delay
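These guideline values can be captured as reusable profiles. The sketch below is illustrative (the profile names and `backoff_schedule` helper are not from any library); it shows the worst-case, jitter-free delay sequence each configuration produces before the final attempt.

```python
# Illustrative retry profiles encoding the guidelines above
RETRY_PROFILES = {
    "transient_network": {"max_retries": 3, "initial_delay": 1.0, "max_delay": 60.0},
    "rate_limit_429":    {"max_retries": 5, "initial_delay": 2.0, "max_delay": 60.0},
    "inference_timeout": {"max_retries": 2, "initial_delay": 5.0, "max_delay": 60.0},
}

def backoff_schedule(profile: dict, base: float = 2.0) -> list[float]:
    """Worst-case (jitter-free) sleep sequence for a retry profile."""
    delays, delay = [], profile["initial_delay"]
    # No sleep follows the final attempt, hence max_retries - 1 entries
    for _ in range(profile["max_retries"] - 1):
        delays.append(delay)
        delay = min(delay * base, profile["max_delay"])
    return delays

print(backoff_schedule(RETRY_PROFILES["rate_limit_429"]))  # [2.0, 4.0, 8.0, 16.0]
```

Enumerating the schedule this way makes it easy to sanity-check total worst-case latency before wiring a profile into the decorator.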

Pattern 2: Circuit Breaker

Use Case: Prevent cascading failures when external services (APIs, databases, vector stores) become unhealthy

Implementation:

from enum import Enum
from datetime import datetime, timedelta
from threading import Lock

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing recovery

class CircuitBreaker:
    """Circuit breaker pattern for external dependencies."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: type | tuple = Exception  # a single type or a tuple of types
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
        self._lock = Lock()

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection."""
        with self._lock:
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise CircuitBreakerOpenError(
                        f"Circuit breaker OPEN. Retry after {self.recovery_timeout}s"
                    )

        try:
            result = func(*args, **kwargs)

            # Success - close the circuit and reset the failure count
            with self._lock:
                self.state = CircuitState.CLOSED
                self.failure_count = 0

            return result

        except self.expected_exception as e:
            with self._lock:
                self.failure_count += 1
                self.last_failure_time = datetime.now()

                # A failure while HALF_OPEN, or too many failures while
                # CLOSED, opens the circuit
                if (self.state == CircuitState.HALF_OPEN or
                        self.failure_count >= self.failure_threshold):
                    self.state = CircuitState.OPEN

            raise

    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to attempt recovery."""
        return (
            self.last_failure_time is not None and
            datetime.now() - self.last_failure_time >= timedelta(seconds=self.recovery_timeout)
        )

class CircuitBreakerOpenError(Exception):
    """Raised when circuit breaker is open."""
    pass

# Usage with agent tools
vector_db_breaker = CircuitBreaker(
    failure_threshold=3,
    recovery_timeout=30,
    expected_exception=(ConnectionError, TimeoutError)
)

def retrieve_context(query: str) -> list[str]:
    """Retrieve context from vector DB with circuit breaker."""
    return vector_db_breaker.call(
        vector_db.search,
        query_embedding=embed(query),
        top_k=5
    )

Tool-Specific Circuit Breakers:

# Configure different breakers for different dependencies
tool_breakers = {
    "vector_search": CircuitBreaker(failure_threshold=3, recovery_timeout=30),
    "api_external": CircuitBreaker(failure_threshold=5, recovery_timeout=60),
    "database_query": CircuitBreaker(failure_threshold=2, recovery_timeout=20),
}

def execute_tool_with_protection(tool_name: str, *args, **kwargs):
    """Execute tool with appropriate circuit breaker."""
    breaker = tool_breakers.get(tool_name)
    if breaker:
        return breaker.call(tools[tool_name].execute, *args, **kwargs)
    else:
        return tools[tool_name].execute(*args, **kwargs)

Pattern 3: Graceful Degradation with Fallback Strategies

Use Case: Maintain service availability when primary capabilities fail

Fallback Hierarchy

from typing import Any, Callable, List
from dataclasses import dataclass

class FallbackExhaustedError(Exception):
    """Raised when every fallback strategy has failed."""

@dataclass
class FallbackStrategy:
    """Defines a fallback option."""
    name: str
    executor: Callable
    max_attempts: int = 1
    cost_multiplier: float = 1.0  # Relative cost vs primary

class FallbackChain:
    """Execute strategies in order until one succeeds."""

    def __init__(self, strategies: List[FallbackStrategy]):
        self.strategies = strategies

    def execute(self, *args, **kwargs) -> Any:
        """Try each strategy until success."""
        last_error = None

        for strategy in self.strategies:
            for attempt in range(strategy.max_attempts):
                try:
                    result = strategy.executor(*args, **kwargs)
                    print(f"✓ Success with strategy: {strategy.name}")
                    return result

                except Exception as e:
                    last_error = e
                    print(f"✗ Strategy '{strategy.name}' attempt {attempt + 1} failed: {e}")

        # All strategies exhausted
        raise FallbackExhaustedError(
            f"All fallback strategies failed. Last error: {last_error}"
        )

# Example: RAG with multiple fallback strategies
def rag_primary(query: str) -> str:
    """Primary RAG: Vector search + GPT-4 Turbo."""
    context = vector_db.search(embed(query), top_k=5)
    return llm_gpt4_turbo.generate(query, context)

def rag_fallback_cheaper_model(query: str) -> str:
    """Fallback 1: Same vector search, cheaper model."""
    context = vector_db.search(embed(query), top_k=5)
    return llm_gpt35.generate(query, context)

def rag_fallback_keyword_search(query: str) -> str:
    """Fallback 2: Keyword search instead of vector."""
    context = keyword_search(query, top_k=5)
    return llm_gpt4_turbo.generate(query, context)

def rag_fallback_no_context(query: str) -> str:
    """Fallback 3: Pure LLM, no retrieval."""
    return llm_gpt4_turbo.generate(query, context=[])

# Define fallback chain
rag_chain = FallbackChain([
    FallbackStrategy("primary_rag", rag_primary, max_attempts=2),
    FallbackStrategy("cheaper_model", rag_fallback_cheaper_model, max_attempts=2, cost_multiplier=0.1),
    FallbackStrategy("keyword_search", rag_fallback_keyword_search, max_attempts=1, cost_multiplier=0.8),
    FallbackStrategy("no_context", rag_fallback_no_context, max_attempts=1, cost_multiplier=0.3),
])

# Usage
response = rag_chain.execute(user_query)

Pattern 4: Semantic Validation & Self-Correction

Use Case: Detect and recover from hallucinations, reasoning errors, invalid outputs

Step 1: Output Validation

from pydantic import BaseModel, Field, validator
from typing import Literal

class AgentOutput(BaseModel):
    """Validated agent response."""
    answer: str = Field(..., min_length=10, max_length=2000)
    confidence: float = Field(..., ge=0.0, le=1.0)
    sources: list[str] = Field(default_factory=list)
    safety_check: Literal["safe", "unsafe"] = "safe"

    @validator("answer")
    def answer_not_refusal(cls, v):
        """Detect refusals disguised as answers (case-insensitive match)."""
        refusal_patterns = [
            "i cannot", "i don't have access", "i'm unable to",
            "as an ai", "i don't know",
        ]
        lowered = v.lower()
        if any(pattern in lowered for pattern in refusal_patterns):
            raise ValueError("Agent refused to answer")
        return v

    @validator("sources")
    def sources_not_empty_if_factual(cls, v, values):
        """Require sources for factual claims."""
        answer = values.get("answer", "")
        # Heuristic: long answers should cite sources
        if len(answer) > 200 and len(v) == 0:
            raise ValueError("Long answer requires sources")
        return v

class OutputValidationError(Exception):
    """Raised when the agent's output fails schema validation."""

def validated_agent_call(query: str) -> AgentOutput:
    """Call agent with output validation."""
    raw_response = agent.run(query)

    try:
        return AgentOutput(**raw_response)
    except ValueError as e:  # pydantic's ValidationError subclasses ValueError
        raise OutputValidationError(f"Agent output validation failed: {e}") from e

Step 2: Hallucination Detection

def detect_hallucination(answer: str, sources: list[str]) -> tuple[bool, float]:
    """Detect if answer is grounded in sources."""

    if not sources:
        return True, 0.0  # Nothing to ground against - treat as ungrounded

    # Method 1: Semantic similarity check
    answer_embedding = embed(answer)
    source_embeddings = [embed(s) for s in sources]

    max_similarity = max(
        cosine_similarity(answer_embedding, source_emb)
        for source_emb in source_embeddings
    )

    # Method 2: LLM-as-judge
    judge_prompt = f"""
    Evaluate if the ANSWER is fully supported by the SOURCES.

    ANSWER: {answer}

    SOURCES:
    {chr(10).join(f"[{i+1}] {s}" for i, s in enumerate(sources))}

    Is the answer supported? Reply with:
    - "YES" if fully supported
    - "PARTIAL" if partially supported
    - "NO" if not supported or hallucinated

    Confidence (0.0-1.0):
    """

    judge_response = llm_judge.complete(judge_prompt)

    is_hallucination = (
        max_similarity < 0.6 or  # Low semantic overlap
        "NO" in judge_response.upper()  # Judge says not supported
    )

    confidence = extract_confidence(judge_response)

    return is_hallucination, confidence

# Usage with auto-retry
class HallucinationError(Exception):
    """Raised when the agent keeps hallucinating after retries."""

def agent_with_hallucination_guard(query: str, max_attempts: int = 3) -> str:
    """Run agent with hallucination detection and retry."""

    for attempt in range(max_attempts):
        response = agent.run(query)

        is_hallucination, confidence = detect_hallucination(
            response["answer"],
            response["sources"]
        )

        if not is_hallucination:
            return response["answer"]

        print(f"Hallucination detected (confidence: {confidence:.2f}). Retrying...")

        # Retry with stronger grounding instruction
        agent.update_system_prompt(
            "You MUST cite sources for every factual claim. "
            "If unsure, say 'I don't have enough information.'"
        )

    raise HallucinationError(f"Agent hallucinated after {max_attempts} attempts")

Master These Concepts with Practice

Our NCP-AAI practice bundle includes:

  • 7 full practice exams (455+ questions)
  • Detailed explanations for every answer
  • Domain-by-domain performance tracking

30-day money-back guarantee

Pattern 5: Token Budget Management

Use Case: Prevent context window overflow, control costs

import tiktoken

class TokenBudgetError(Exception):
    """Raised when the fixed prompt components alone exceed the budget."""

class TokenBudgetManager:
    """Manage token budgets for agent interactions."""

    def __init__(
        self,
        model: str,
        max_prompt_tokens: int = 6000,
        max_completion_tokens: int = 2000,
        reserve_tokens: int = 500  # Safety margin
    ):
        self.encoding = tiktoken.encoding_for_model(model)
        self.max_prompt_tokens = max_prompt_tokens
        self.max_completion_tokens = max_completion_tokens
        self.reserve_tokens = reserve_tokens

    def count_tokens(self, text: str) -> int:
        """Count tokens in text."""
        return len(self.encoding.encode(text))

    def truncate_context(
        self,
        system_prompt: str,
        user_query: str,
        context_docs: list[str],
        conversation_history: list[dict]
    ) -> dict:
        """Truncate inputs to fit budget."""

        # Fixed costs (always included)
        system_tokens = self.count_tokens(system_prompt)
        query_tokens = self.count_tokens(user_query)
        fixed_tokens = system_tokens + query_tokens

        # Available budget for dynamic content
        available_budget = (
            self.max_prompt_tokens -
            fixed_tokens -
            self.reserve_tokens
        )

        if available_budget < 0:
            raise TokenBudgetError("Query exceeds maximum prompt size")

        # Allocate budget: 60% context, 40% history
        context_budget = int(available_budget * 0.6)
        history_budget = int(available_budget * 0.4)

        # Truncate context documents
        truncated_context = self._truncate_docs(context_docs, context_budget)

        # Truncate conversation history (keep recent messages)
        truncated_history = self._truncate_history(conversation_history, history_budget)

        # Report actual usage, not just the allocated budgets
        context_tokens = sum(self.count_tokens(d) for d in truncated_context)
        history_tokens = sum(
            self.count_tokens(m.get("content", "")) for m in truncated_history
        )

        return {
            "system_prompt": system_prompt,
            "user_query": user_query,
            "context": truncated_context,
            "history": truncated_history,
            "tokens_used": fixed_tokens + context_tokens + history_tokens
        }

    def _truncate_docs(self, docs: list[str], budget: int) -> list[str]:
        """Truncate document list to fit budget."""
        truncated = []
        tokens_used = 0

        for doc in docs:
            doc_tokens = self.count_tokens(doc)
            if tokens_used + doc_tokens <= budget:
                truncated.append(doc)
                tokens_used += doc_tokens
            else:
                # Partial doc inclusion
                remaining_budget = budget - tokens_used
                if remaining_budget > 100:  # Minimum useful size
                    partial_doc = self.encoding.decode(
                        self.encoding.encode(doc)[:remaining_budget]
                    )
                    truncated.append(partial_doc + "...")
                break

        return truncated

    def _truncate_history(self, history: list[dict], budget: int) -> list[dict]:
        """Keep the most recent messages that fit the budget."""
        truncated = []
        tokens_used = 0

        # Walk from newest to oldest, then restore chronological order
        for message in reversed(history):
            msg_tokens = self.count_tokens(message.get("content", ""))
            if tokens_used + msg_tokens > budget:
                break
            truncated.insert(0, message)
            tokens_used += msg_tokens

        return truncated

# Usage in RAG agent
budget_manager = TokenBudgetManager(model="gpt-4-turbo", max_prompt_tokens=8000)

def rag_agent_with_budget(query: str) -> str:
    """RAG agent with automatic token budget management."""

    # Retrieve more documents than we can use
    candidate_docs = vector_db.search(query, top_k=20)

    # Truncate to fit budget
    truncated_inputs = budget_manager.truncate_context(
        system_prompt=AGENT_SYSTEM_PROMPT,
        user_query=query,
        context_docs=candidate_docs,
        conversation_history=get_recent_history(limit=10)
    )

    # Generate with guaranteed fit
    response = llm.generate(
        system=truncated_inputs["system_prompt"],
        messages=truncated_inputs["history"],
        context=truncated_inputs["context"],
        query=truncated_inputs["user_query"],
        max_tokens=budget_manager.max_completion_tokens
    )

    return response

Pattern 6: Multi-Agent Consensus for Critical Decisions

Use Case: High-stakes decisions where errors are costly (medical diagnosis, financial advice, legal analysis)

from collections import Counter
from typing import List

class InsufficientResponsesError(Exception):
    """Raised when too few agents returned a usable response."""

class ConsensusFailureError(Exception):
    """Raised when agents fail to reach the agreement threshold."""

def multi_agent_consensus(
    query: str,
    agents: List[Agent],
    min_agreement: float = 0.7
) -> str:
    """Run multiple agents and require consensus."""

    responses = []
    for agent in agents:
        try:
            response = agent.run(query)
            responses.append(response)
        except Exception as e:
            print(f"Agent {agent.name} failed: {e}")

    if len(responses) < 2:
        raise InsufficientResponsesError("Need at least 2 agent responses")

    # Check for consensus
    response_hashes = [hash_response(r) for r in responses]
    most_common = Counter(response_hashes).most_common(1)[0]
    agreement_rate = most_common[1] / len(responses)

    if agreement_rate >= min_agreement:
        # Consensus reached
        consensus_response = next(
            r for r in responses if hash_response(r) == most_common[0]
        )
        return consensus_response
    else:
        # No consensus - escalate to human
        raise ConsensusFailureError(
            f"Agents disagree ({agreement_rate:.1%} agreement). "
            f"Escalating to human review."
        )

def hash_response(response: str) -> int:
    """Hash response for consensus checking (semantic similarity)."""
    # In production: use embedding similarity instead of exact match
    return hash(response.lower().strip())
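As the comment above notes, exact hashing is brittle for free-form text. Below is a minimal sketch of similarity-based consensus, using `difflib.SequenceMatcher` purely as a stand-in for embedding cosine similarity (the function name and 0.9 threshold are illustrative):

```python
from difflib import SequenceMatcher

def agreement_rate(responses: list[str], threshold: float = 0.9) -> float:
    """Fraction of responses in the largest cluster of near-duplicates.

    SequenceMatcher stands in for embedding cosine similarity here;
    in production, swap in embeddings of each response.
    """
    def similar(a: str, b: str) -> bool:
        return SequenceMatcher(None, a.lower(), b.lower()).ratio() >= threshold

    # Size of the largest cluster anchored on any single response
    largest = max(
        sum(1 for r in responses if similar(anchor, r))
        for anchor in responses
    )
    return largest / len(responses)

responses = ["The capital is Paris.", "The capital is Paris.", "It is Lyon."]
print(f"{agreement_rate(responses):.2f}")  # 0.67
```

The same clustering idea slots into `multi_agent_consensus`: compare the rate against `min_agreement` instead of counting exact hashes.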

Production Checklist: Error Resilience

  • LLM calls: Retry with exponential backoff (3 attempts)
  • External APIs: Circuit breakers (5 failures = open for 60s)
  • Tool execution: Fallback strategies (primary → secondary → cached)
  • Context retrieval: Token budget management (truncate if needed)
  • Agent outputs: Schema validation (Pydantic models)
  • Hallucinations: LLM-as-judge detection with retry
  • Critical decisions: Multi-agent consensus (70% agreement threshold)
  • Monitoring: Alert on error rates >5%, fallback usage >20%
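The monitoring thresholds in the last bullet can be wired into a simple alert predicate. This sketch hard-codes the 5% and 20% values from the checklist; the names are illustrative, not from any monitoring framework:

```python
# Alert thresholds taken from the checklist above
ERROR_RATE_THRESHOLD = 0.05      # alert when error rate exceeds 5%
FALLBACK_RATE_THRESHOLD = 0.20   # alert when fallback usage exceeds 20%

def resilience_alerts(error_rate: float, fallback_usage_rate: float) -> list[str]:
    """Return triggered alerts; an empty list means the agent looks healthy."""
    alerts = []
    if error_rate > ERROR_RATE_THRESHOLD:
        alerts.append(f"error rate {error_rate:.1%} exceeds 5%")
    if fallback_usage_rate > FALLBACK_RATE_THRESHOLD:
        alerts.append(f"fallback usage {fallback_usage_rate:.1%} exceeds 20%")
    return alerts

print(resilience_alerts(0.08, 0.25))
```

In practice these rates would come from a sliding window over recent requests so one bad minute doesn't page anyone.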

Practice for NCP-AAI Exam

Test your error handling knowledge with Preporato's NCP-AAI Practice Tests:

  • ✅ Retry logic scenario questions
  • ✅ Circuit breaker configuration exercises
  • ✅ Fallback strategy design challenges
  • ✅ Hallucination detection techniques
  • ✅ Production resilience patterns

Start practicing today and master production-grade error handling for agentic AI systems.

Conclusion

Error handling in agentic AI systems requires a fundamental shift from traditional software engineering patterns. Retries handle transient failures, circuit breakers prevent cascades, fallbacks maintain availability, semantic validation catches hallucinations, and token budgets prevent overflows. Together, these patterns form the foundation of resilient production AI agents.

For NCP-AAI certification, understanding when and how to apply each pattern—and recognizing their limitations—is essential for exam success and real-world system reliability.

Ready to Pass the NCP-AAI Exam?

Join thousands who passed with Preporato practice tests

Instant access · 30-day guarantee · Updated monthly