Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/acp/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from .interfaces import Agent, Client

__all__ = [
"DEFAULT_STDIO_BUFFER_LIMIT_BYTES",
"Agent",
"AgentSideConnection",
"Client",
Expand All @@ -28,13 +29,19 @@
"run_agent",
]

# Default to 50MB for agent/client data transfer.
# The original stdio_streams default is 64KB, which is not large
# enough for multimodal use-cases.
DEFAULT_STDIO_BUFFER_LIMIT_BYTES = 50 * 1024 * 1024


async def run_agent(
agent: Agent,
input_stream: Any = None,
output_stream: Any = None,
*,
use_unstable_protocol: bool = False,
stdio_buffer_limit_bytes: int = DEFAULT_STDIO_BUFFER_LIMIT_BYTES,
**connection_kwargs: Any,
) -> None:
"""Run an ACP agent over the given input/output streams.
Expand All @@ -53,7 +60,7 @@ async def run_agent(
from .stdio import stdio_streams

if input_stream is None and output_stream is None:
output_stream, input_stream = await stdio_streams()
output_stream, input_stream = await stdio_streams(limit=stdio_buffer_limit_bytes)
conn = AgentSideConnection(
agent,
input_stream,
Expand Down
59 changes: 59 additions & 0 deletions tests/real_user/test_stdio_limits.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import os
import subprocess
import sys
import tempfile
import textwrap

import pytest
Expand Down Expand Up @@ -39,3 +42,59 @@ async def test_spawn_stdio_transport_custom_limit_handles_large_line() -> None:
) as (reader, _writer, _process):
line = await reader.readline()
assert len(line) == LARGE_LINE_SIZE + 1


@pytest.mark.asyncio
async def test_run_agent_stdio_buffer_limit() -> None:
"""Test that run_agent with different buffer limits can handle appropriately sized messages."""
with tempfile.TemporaryDirectory() as tmpdir:
# Test 1: Small buffer (1KB) fails with large message (70KB)
small_agent = os.path.join(tmpdir, "small_agent.py")
with open(small_agent, "w") as f:
f.write("""
import asyncio
from acp.core import run_agent
from acp.interfaces import Agent

class TestAgent(Agent):
async def list_capabilities(self):
return {"capabilities": {}}

asyncio.run(run_agent(TestAgent(), stdio_buffer_limit_bytes=1024))
""")

# Send a 70KB message - should fail with 1KB buffer
large_msg = '{"jsonrpc":"2.0","method":"test","params":{"data":"' + "X" * LARGE_LINE_SIZE + '"}}\n'
result = subprocess.run( # noqa: S603
[sys.executable, small_agent], input=large_msg, capture_output=True, text=True, timeout=2
)

# Should have errors in stderr about the buffer limit
assert "Error" in result.stderr or result.returncode != 0, (
f"Expected error with small buffer, got: {result.stderr}"
)

# Test 2: Large buffer (200KB) succeeds with large message (70KB)
large_agent = os.path.join(tmpdir, "large_agent.py")
with open(large_agent, "w") as f:
f.write(f"""
import asyncio
from acp.core import run_agent
from acp.interfaces import Agent

class TestAgent(Agent):
async def list_capabilities(self):
return {{"capabilities": {{}}}}

asyncio.run(run_agent(TestAgent(), stdio_buffer_limit_bytes={LARGE_LINE_SIZE * 3}))
""")

# Same message, but with a buffer 3x the size - should handle it
result = subprocess.run( # noqa: S603
[sys.executable, large_agent], input=large_msg, capture_output=True, text=True, timeout=2
)

# With a large enough buffer, the agent should at least start successfully
# (it may have other errors from invalid JSON-RPC, but not buffer overrun)
if "LimitOverrunError" in result.stderr or "buffer" in result.stderr.lower():
pytest.fail(f"Large buffer still hit limit error: {result.stderr}")