Agent-to-Agent Protocol (A2A)

Overview

The Agent-to-Agent Protocol (A2A) lets autonomous agents on the IBM watsonx Orchestrate platform communicate and collaborate through a common interface. This implementation follows the official A2A 0.3.0 specification and uses the a2a-server framework to provide standards-compliant agent integration.

Purpose

A2A solves critical challenges in multi-agent systems:

  • Standardized Communication: JSON-RPC 2.0 based messaging protocol
  • Agent Discovery: Agents expose capabilities via agent cards
  • Task Management: Structured task lifecycle with state tracking
  • Streaming Updates: Real-time progress updates during task execution (supported by the protocol; the example agent card in this document sets streaming to false)
  • IBM Orchestrate Integration: Native integration with watsonx Orchestrate

Architecture

graph TB
    subgraph "IBM watsonx Orchestrate"
        WO[Orchestrate Platform]
        UI[User Interface]
    end

    subgraph "A2A Agent Server"
        AC[Agent Card<br/>.well-known/agent-card.json]
        RH[Request Handler<br/>DefaultRequestHandler]
        AE[Agent Executor<br/>ShakespeareAgentExecutor]
        TS[Task Store<br/>InMemoryTaskStore]
        EQ[Event Queue<br/>Streaming Updates]
    end

    subgraph "RAG Agent Core"
        AG[A2A RAG Agent<br/>LangGraph Workflow]
        MCP[MCP Tool Client<br/>HTTP Client]
    end

    subgraph "Backend Services"
        MCPS[MCP Server<br/>FastAPI]
        MILVUS[Milvus<br/>Vector DB]
        WX[Watsonx.ai<br/>LLM + Embeddings]
    end

    UI --> WO
    WO -->|JSON-RPC 2.0| AC
    AC --> RH
    RH --> AE
    AE --> AG
    AE --> TS
    AE --> EQ
    EQ -->|Push Updates| WO

    AG --> MCP
    MCP -->|HTTP/REST| MCPS
    MCPS --> MILVUS
    MCPS --> WX

    style AC fill:#0f62fe
    style RH fill:#0f62fe
    style AE fill:#0f62fe
    style AG fill:#ff832b
    style MCP fill:#ff832b

Core Components

1. Agent Card

The agent card describes the agent's capabilities and is served at /.well-known/agent-card.json:

from a2a.types import AgentCard, AgentCapabilities, AgentSkill
from config.settings import Settings  # project settings class used in the signature below

def create_agent_card(settings: Settings, host: str, port: int) -> AgentCard:
    """Create the agent card describing agent capabilities."""
    capabilities = AgentCapabilities(
        streaming=False,
        push_notifications=False,
    )

    skill = AgentSkill(
        id='shakespeare_knowledge',
        name='Shakespeare Knowledge Base',
        description='Search and answer questions about Shakespeare\'s complete works',
        tags=['shakespeare', 'literature', 'plays', 'sonnets'],
        examples=[
            'Who is Hamlet?',
            'What are the main characters in Othello?',
            'Tell me about Romeo and Juliet',
        ],
    )

    agent_card = AgentCard(
        name=settings.a2a_agent_name,
        description=settings.a2a_agent_description,
        url=f'http://{host}:{port}/',
        version='1.0.0',
        default_input_modes=['text', 'text/plain'],
        default_output_modes=['text', 'text/plain'],
        capabilities=capabilities,
        skills=[skill],
    )

    return agent_card

2. Agent Executor

The executor handles incoming requests and manages task execution:

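from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import Part, TaskState, TextPart
from a2a.utils import new_agent_text_message, new_task
from config.settings import Settings

# A2ARAGAgent is the project's LangGraph agent (see "LangGraph Workflow");
# its import path is not shown in this document.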

class ShakespeareAgentExecutor(AgentExecutor):
    """Shakespeare RAG Agent Executor for A2A Protocol."""

    def __init__(self):
        """Initialize the executor with the RAG agent."""
        settings = Settings()
        self.agent = A2ARAGAgent(settings)

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        """Execute a query against the Shakespeare knowledge base."""
        # Extract query from user input
        query = context.get_user_input()

        # Get or create task
        task = context.current_task
        if not task:
            task = new_task(context.message)
            await event_queue.enqueue_event(task)

        updater = TaskUpdater(event_queue, task.id, task.context_id)

        # Update status to working
        await updater.update_status(
            TaskState.working,
            new_agent_text_message(
                "Searching Shakespeare's works...",
                task.context_id,
                task.id,
            ),
        )

        # Process query through RAG agent
        result = await self.agent.process_query(query)

        # Add artifact with the answer
        await updater.add_artifact(
            [Part(root=TextPart(text=result['response']))],
            name="shakespeare_answer",
        )

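        # Mark task as complete
        await updater.complete()

    async def cancel(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        """Reject cancellation requests."""
        # Assumption: AgentExecutor also declares cancel() as abstract, and the
        # original excerpt does not show one. This minimal version only makes
        # the class instantiable.
        raise NotImplementedError("Cancellation is not supported by this agent.")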
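
The executor moves a task from working to completed. The dependency-free sketch below illustrates that lifecycle. It assumes the A2A 0.3.0 rule that a task in a terminal state (completed, canceled, failed, rejected) accepts no further updates. `TaskLifecycle` is a hypothetical helper for illustration and is not part of the a2a package.

```python
from dataclasses import dataclass, field

# Terminal states as named in the A2A 0.3.0 specification.
TERMINAL_STATES = {"completed", "canceled", "failed", "rejected"}


@dataclass
class TaskLifecycle:
    """Hypothetical tracker that records state changes for one task."""

    task_id: str
    state: str = "submitted"
    history: list = field(default_factory=list)

    def update(self, new_state: str) -> None:
        """Move to new_state unless the task already reached a terminal state."""
        if self.state in TERMINAL_STATES:
            raise ValueError(f"task {self.task_id} is already {self.state}")
        self.history.append(self.state)
        self.state = new_state


lifecycle = TaskLifecycle("task-789")
lifecycle.update("working")    # mirrors updater.update_status(TaskState.working, ...)
lifecycle.update("completed")  # mirrors updater.complete()
```

After the second call the tracker refuses further updates, which is the behavior a client can rely on when polling for a final result.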

3. LangGraph Workflow

The RAG agent uses LangGraph for workflow orchestration:

from langgraph.graph import StateGraph, END
from agent.state import AgentState

class A2ARAGAgent:
    """A2A-based RAG agent using LangGraph for orchestration."""

    def _build_graph(self) -> StateGraph:
        """Build the LangGraph workflow."""
        workflow = StateGraph(AgentState)

        # Add nodes
        workflow.add_node("process_input", self._process_input)
        workflow.add_node("retrieve_context", self._retrieve_context)
        workflow.add_node("generate_response", self._generate_response)
        workflow.add_node("handle_error", self._handle_error)

        # Set entry point
        workflow.set_entry_point("process_input")

        # Add conditional edges
        workflow.add_conditional_edges(
            "process_input",
            self._route_after_input,
            {"retrieve": "retrieve_context", "error": "handle_error"},
        )

        workflow.add_conditional_edges(
            "retrieve_context",
            self._route_after_retrieval,
            {"generate": "generate_response", "error": "handle_error"},
        )

        workflow.add_edge("generate_response", END)
        workflow.add_edge("handle_error", END)

        return workflow.compile()
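
Each conditional edge calls a routing function whose return value must be one of the keys in the mapping passed to add_conditional_edges. The routing functions are not shown in this document, so the following is only a sketch. It assumes a dict-like state with hypothetical `query` and `error` keys; the real AgentState fields may differ.

```python
from typing import Any, Dict


def route_after_input(state: Dict[str, Any]) -> str:
    """Return "retrieve" for a usable query, otherwise "error"."""
    query = state.get("query") or ""
    if state.get("error") or not query.strip():
        return "error"
    return "retrieve"


def route_after_retrieval(state: Dict[str, Any]) -> str:
    """Return "generate" when retrieval reported no error, otherwise "error"."""
    if state.get("error"):
        return "error"
    return "generate"
```

Keeping the routers as small pure functions makes the branch logic testable without compiling the graph.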

4. MCP Tool Client

The agent communicates with the MCP server for RAG operations:

from typing import Any, Dict, Optional

import httpx
from config.settings import Settings

class MCPToolClient:
    """Client for calling MCP RAG tools."""

    def __init__(self, settings: Settings):
        """Initialize MCP tool client."""
        self.settings = settings
        self.base_url = settings.get_mcp_server_url()
        self.client = httpx.AsyncClient(
            timeout=30.0,
            http2=False,
            follow_redirects=True,
        )

    async def rag_query(
        self,
        query: str,
        top_k: Optional[int] = None,
        include_sources: bool = True,
    ) -> Dict[str, Any]:
        """Call the rag_query MCP tool."""
        response = await self.client.post(
            f"{self.base_url}/tools/rag_query",
            json={
                "query": query,
                "top_k": top_k,
                "include_sources": include_sources,
            },
        )
        response.raise_for_status()
        return response.json()
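
As written, rag_query sends `"top_k": null` when the caller leaves top_k unset. Whether the MCP server accepts a null value is not stated here, so one cautious option is to omit unset fields. The sketch below shows that approach; `build_rag_payload` is a hypothetical helper, and the validation rules are assumptions.

```python
from typing import Any, Dict, Optional


def build_rag_payload(
    query: str,
    top_k: Optional[int] = None,
    include_sources: bool = True,
) -> Dict[str, Any]:
    """Build the JSON body for rag_query, leaving out top_k when it is unset."""
    if not query.strip():
        raise ValueError("query must not be empty")
    payload: Dict[str, Any] = {"query": query, "include_sources": include_sources}
    if top_k is not None:
        if top_k < 1:
            raise ValueError("top_k must be a positive integer")
        payload["top_k"] = top_k
    return payload
```

The result could be passed as the `json=` argument of the POST call, so the server's own default for top_k applies when none is given.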

Protocol Specification (A2A 0.3.0)

JSON-RPC 2.0 Request Format

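This implementation exchanges A2A messages as JSON-RPC 2.0 requests over HTTP POST. In A2A 0.3.0 the method for sending a message is message/send, and field names are camelCase (identifier values below are illustrative):

{
  "jsonrpc": "2.0",
  "id": "req-123",
  "method": "message/send",
  "params": {
    "message": {
      "kind": "message",
      "role": "user",
      "messageId": "msg-001",
      "contextId": "ctx-456",
      "parts": [
        {
          "kind": "text",
          "text": "Who is Hamlet?"
        }
      ]
    }
  }
}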
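
A client has to build this envelope and then tell a JSON-RPC result apart from an error. The sketch below does both with the standard library only. It assumes the message/send method and the camelCase field names of the A2A 0.3.0 specification; `build_message_send` and `unwrap_response` are hypothetical helper names.

```python
import json
import uuid
from typing import Any, Dict, Optional


def build_message_send(text: str, context_id: Optional[str] = None) -> Dict[str, Any]:
    """Wrap a user text message in a JSON-RPC 2.0 request envelope."""
    message: Dict[str, Any] = {
        "kind": "message",
        "role": "user",
        "messageId": str(uuid.uuid4()),
        "parts": [{"kind": "text", "text": text}],
    }
    if context_id is not None:
        message["contextId"] = context_id
    return {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),
        "method": "message/send",
        "params": {"message": message},
    }


def unwrap_response(raw: str, request_id: str) -> Any:
    """Return the result of a JSON-RPC response, raising on errors or id mismatch."""
    response = json.loads(raw)
    if response.get("id") != request_id:
        raise ValueError("response id does not match request id")
    if "error" in response:
        error = response["error"]
        raise RuntimeError(f"JSON-RPC error {error.get('code')}: {error.get('message')}")
    return response["result"]


request = build_message_send("Who is Hamlet?", context_id="ctx-456")
body = json.dumps(request)  # this string would be the HTTP POST body
```

Matching the response id against the request id guards against mixing up replies when several requests are in flight.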

Agent Card Format

Served at /.well-known/agent-card.json:

{
  "name": "Shakespeare Knowledge Agent",
  "description": "RAG agent with complete works of Shakespeare",
  "url": "http://localhost:8001/",
  "version": "1.0.0",
  "defaultInputModes": ["text", "text/plain"],
  "defaultOutputModes": ["text", "text/plain"],
  "capabilities": {
    "streaming": false,
    "pushNotifications": false
  },
  "skills": [
    {
      "id": "shakespeare_knowledge",
      "name": "Shakespeare Knowledge Base",
      "description": "Search and answer questions about Shakespeare's complete works",
      "tags": ["shakespeare", "literature", "plays", "sonnets"],
      "examples": [
        "Who is Hamlet?",
        "What are the main characters in Othello?"
      ]
    }
  ]
}
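
Discovery comes down to two steps: derive the card URL from the agent's base URL, then inspect the advertised skills. The sketch below shows both on an already-parsed card, using only the standard library. `agent_card_url` and `skills_with_tag` are hypothetical helpers, and the sample card is trimmed to the fields they read.

```python
from typing import Any, Dict, List
from urllib.parse import urljoin

WELL_KNOWN_PATH = ".well-known/agent-card.json"


def agent_card_url(base_url: str) -> str:
    """Return the well-known agent card URL for an agent's base URL."""
    # A trailing slash makes urljoin append the path instead of replacing the last segment.
    return urljoin(base_url.rstrip("/") + "/", WELL_KNOWN_PATH)


def skills_with_tag(card: Dict[str, Any], tag: str) -> List[str]:
    """Return the ids of all skills in the card that carry the given tag."""
    return [
        skill["id"]
        for skill in card.get("skills", [])
        if tag in skill.get("tags", [])
    ]


card = {
    "name": "Shakespeare Knowledge Agent",
    "skills": [
        {"id": "shakespeare_knowledge", "tags": ["shakespeare", "literature"]},
    ],
}
matches = skills_with_tag(card, "literature")
```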

Task State Updates

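The agent publishes task status updates to the event queue. In A2A 0.3.0 a status update has the following shape (identifier values are illustrative):

{
  "kind": "status-update",
  "taskId": "task-789",
  "contextId": "ctx-456",
  "final": false,
  "status": {
    "state": "working",
    "message": {
      "kind": "message",
      "role": "agent",
      "messageId": "msg-002",
      "parts": [
        {
          "kind": "text",
          "text": "Searching Shakespeare's works for relevant information..."
        }
      ]
    }
  }
}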

Task Completion with Artifacts

{
  "kind": "task",
  "id": "task-789",
  "contextId": "ctx-456",
  "status": {
    "state": "completed"
  },
  "artifacts": [
    {
      "artifactId": "artifact-001",
      "name": "shakespeare_answer",
      "parts": [
        {
          "kind": "text",
          "text": "Hamlet is the Prince of Denmark and the protagonist..."
        }
      ]
    }
  ]
}
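
A caller usually wants just the answer text from a completed task. The sketch below pulls it out of a parsed task object. It assumes the state lives under status.state as in the A2A 0.3.0 Task type and falls back to a flat "state" field if that is what the payload carries. `task_state` and `answer_text` are hypothetical helpers.

```python
from typing import Any, Dict, Optional


def task_state(task: Dict[str, Any]) -> Optional[str]:
    """Read the state from status.state, falling back to a flat "state" field."""
    status = task.get("status")
    if isinstance(status, dict):
        return status.get("state")
    return task.get("state")


def answer_text(task: Dict[str, Any], artifact_name: str = "shakespeare_answer") -> Optional[str]:
    """Join the text parts of the named artifact; return None unless the task is completed."""
    if task_state(task) != "completed":
        return None
    for artifact in task.get("artifacts", []):
        if artifact.get("name") == artifact_name:
            texts = [part["text"] for part in artifact.get("parts", []) if "text" in part]
            return "\n".join(texts)
    return None


completed = {
    "id": "task-789",
    "status": {"state": "completed"},
    "artifacts": [
        {"name": "shakespeare_answer", "parts": [{"kind": "text", "text": "Hamlet is the Prince of Denmark."}]},
    ],
}
answer = answer_text(completed)
```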

Communication Patterns

The examples in this section illustrate each pattern with a conceptual a2a client object; treat them as pseudocode rather than calls into the A2A SDK.

1. Request-Response

Synchronous communication for immediate responses:

# Send request and wait for response
response = a2a.request(
    to='translation-agent',
    action='translate',
    payload={
        'text': 'Hello, world!',
        'target_lang': 'es'
    },
    timeout=5000
)

print(response.payload['translated_text'])
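
The core of request-response is waiting for a reply with a deadline. The runnable sketch below shows that with asyncio. It assumes the timeout is given in milliseconds; `translation_agent` is a hypothetical local stand-in for a remote agent, and `request` is an illustrative helper rather than an SDK function.

```python
import asyncio
from typing import Any, Dict


async def translation_agent(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical stand-in for a remote agent; replies after an optional delay."""
    await asyncio.sleep(payload.get("delay", 0.0))
    return {"translated_text": f"[{payload['target_lang']}] {payload['text']}"}


async def request(agent, payload: Dict[str, Any], timeout_ms: int) -> Dict[str, Any]:
    """Await the agent's reply, raising TimeoutError if it takes longer than timeout_ms."""
    try:
        return await asyncio.wait_for(agent(payload), timeout=timeout_ms / 1000)
    except asyncio.TimeoutError:
        raise TimeoutError(f"no response within {timeout_ms} ms") from None


reply = asyncio.run(
    request(translation_agent, {"text": "Hello, world!", "target_lang": "es"}, timeout_ms=5000)
)
```

asyncio.wait_for cancels the pending call when the deadline passes, so a slow agent does not leave work running in the background.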

2. Fire-and-Forget

Asynchronous communication without waiting for response:

# Send message without waiting
a2a.send(
    to='logging-agent',
    action='log_event',
    payload={
        'event': 'user_login',
        'user_id': 'user-123'
    }
)

3. Publish-Subscribe

Event-driven communication:

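# Handler invoked for each matching event (illustrative)
def handle_new_order(event):
    print(f"New order received: {event.payload}")

# Subscribe to events
a2a.subscribe_event(
    event_type='order.created',
    handler=handle_new_order
)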

# Publish event
a2a.publish_event(
    event_type='order.created',
    payload={
        'order_id': 'ord-456',
        'customer_id': 'cust-789',
        'total': 99.99
    }
)
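
The mechanics behind publish-subscribe can be shown with a small in-process event bus. The sketch below is not how A2A or watsonx Orchestrate deliver events; `EventBus` is a hypothetical class that only demonstrates the decoupling between publishers and subscribers.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, List

Handler = Callable[[Dict[str, Any]], None]


class EventBus:
    """Hypothetical in-process bus mapping event types to subscribed handlers."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Handler) -> None:
        """Register handler to be called for every event of event_type."""
        self._handlers[event_type].append(handler)

    def publish(self, event_type: str, payload: Dict[str, Any]) -> int:
        """Call every handler for event_type and return how many were called."""
        handlers = list(self._handlers.get(event_type, []))
        for handler in handlers:
            handler(payload)
        return len(handlers)


received: List[Dict[str, Any]] = []
bus = EventBus()
bus.subscribe("order.created", received.append)
delivered = bus.publish("order.created", {"order_id": "ord-456", "total": 99.99})
ignored = bus.publish("order.cancelled", {"order_id": "ord-456"})
```

The publisher never references a subscriber directly; an event type with no subscribers is simply dropped.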

4. Request-Reply with Callback

Asynchronous request with callback:

# Define the callback before passing it to the request
def handle_processing_complete(response):
    print(f"Processing complete: {response.payload}")

# Send request with callback
a2a.request_async(
    to='data-processing-agent',
    action='process_large_dataset',
    payload={'dataset': 'large-data.csv'},
    callback=handle_processing_complete
)

5. Workflow Coordination

Multi-agent workflow coordination:

# Define workflow
workflow = a2a.create_workflow(
    name='order-fulfillment',
    steps=[
        {
            'agent': 'inventory-agent',
            'action': 'check_availability',
            'input': '${order.items}'
        },
        {
            'agent': 'payment-agent',
            'action': 'process_payment',
            'input': '${order.payment_info}',
            'depends_on': ['check_availability']
        },
        {
            'agent': 'shipping-agent',
            'action': 'create_shipment',
            'input': '${order.shipping_info}',
            'depends_on': ['process_payment']
        }
    ]
)

# Execute workflow
result = a2a.execute_workflow(
    workflow='order-fulfillment',
    input={'order': order_data}
)
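
The depends_on entries form a dependency graph, and a coordinator has to run the steps in an order that respects it. The sketch below derives such an order with the standard library's graphlib module (Python 3.9+). `execution_order` is a hypothetical helper, and the step dictionaries are trimmed to the fields it reads.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Any, Dict, List

# Steps listed out of order on purpose; only depends_on determines the result.
steps: List[Dict[str, Any]] = [
    {"action": "create_shipment", "depends_on": ["process_payment"]},
    {"action": "check_availability"},
    {"action": "process_payment", "depends_on": ["check_availability"]},
]


def execution_order(steps: List[Dict[str, Any]]) -> List[str]:
    """Return step actions ordered so each runs after the steps it depends on."""
    graph = {step["action"]: set(step.get("depends_on", [])) for step in steps}
    # static_order() raises CycleError if the dependencies form a cycle.
    return list(TopologicalSorter(graph).static_order())


order = execution_order(steps)
```

Validating the graph before execution means a circular dependency is reported up front instead of leaving the workflow stuck midway.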

Resources