331 lines
12 KiB
Python
331 lines
12 KiB
Python
"""
|
|
Integration tests — end-to-end flows across multiple MCP tools.
|
|
|
|
These tests verify that the FULL PIPELINE works:
|
|
CyberArk MCP → (handle) → SSH / PowerShell / DB MCP
|
|
|
|
They also serve as a learning resource for how MCP tools compose:
|
|
|
|
┌─────────────────────────────────────────────────────────────────┐
|
|
│ Claude (LLM) │
|
|
│ 1. Calls get_credential(safe, object_name) │
|
|
│ → receives "secret://abc123..." (handle only) │
|
|
│ 2. Calls ssh_execute(host, command, secret_handle=handle) │
|
|
│ → receives command output │
|
|
└─────────────────────────────────────────────────────────────────┘
|
|
|
|
At no point does Claude see the actual password.
|
|
The handle is an opaque token that binds a short TTL credential to one use.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from mcp_privileged.cyberark.client import CyberArkCCPClient, Credential
|
|
from mcp_privileged.cyberark.server import get_credential
|
|
from mcp_privileged.database.server import db_query
|
|
from mcp_privileged.powershell.server import ps_execute
|
|
from mcp_privileged.secret_store import secret_store
|
|
from mcp_privileged.ssh.server import ssh_execute
|
|
from tests.conftest import make_db_result, make_ps_result, make_ssh_cm
|
|
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
def _make_ctx(client_ip: str = "10.0.0.1") -> MagicMock:
|
|
ctx = MagicMock()
|
|
ctx.info = AsyncMock()
|
|
ctx.error = AsyncMock()
|
|
ctx.request_context.request.headers = {"X-Forwarded-For": client_ip}
|
|
ctx.request_context.request.client = None
|
|
return ctx
|
|
|
|
|
|
def _mock_cyberark_client(username: str, password: str, address: str = "db.internal"):
|
|
"""Patch the CyberArk CCP client to return a fixed credential."""
|
|
cred = Credential(
|
|
username=username,
|
|
password=password,
|
|
address=address,
|
|
safe="PROD-SAFE",
|
|
folder="Root",
|
|
object_name="PROD-DB-svc",
|
|
platform_id="UnixSSH",
|
|
password_change_in_process=False,
|
|
)
|
|
mock_client = MagicMock(spec=CyberArkCCPClient)
|
|
mock_client.get_credential = AsyncMock(return_value=cred)
|
|
mock_client._settings_app_id = lambda: "MCP-Privileged-Service"
|
|
return patch("mcp_privileged.cyberark.server.cyberark_client", mock_client)
|
|
|
|
|
|
# ── Full pipeline: CyberArk → SSH ─────────────────────────────────────────────
|
|
|
|
async def test_cyberark_to_ssh_full_pipeline() -> None:
|
|
"""
|
|
Simulate the complete CyberArk → SSH pipeline:
|
|
|
|
1. get_credential() fetches from CyberArk, stores in secret_store, returns handle.
|
|
2. ssh_execute() resolves the handle, uses the password to connect, returns output.
|
|
3. The password never appears in either tool's return value.
|
|
|
|
This is the primary privileged-access use case:
|
|
Claude: "Run `df -h` on linux01 using the PROD-LINUX credential"
|
|
"""
|
|
ctx_cyberark = _make_ctx("192.168.1.10")
|
|
ctx_ssh = _make_ctx("192.168.1.10")
|
|
|
|
# Step 1: Claude calls get_credential
|
|
with _mock_cyberark_client(username="root", password="SshSecret!"):
|
|
handle_response = await get_credential(
|
|
safe="PROD-SAFE",
|
|
object_name="PROD-LINUX-root",
|
|
ctx=ctx_cyberark,
|
|
)
|
|
|
|
# The LLM receives a handle string — NOT the password
|
|
assert "secret://" in handle_response
|
|
assert "SshSecret!" not in handle_response
|
|
|
|
# Extract the handle token from the formatted response text
|
|
handle = next(
|
|
line.split("Handle: ")[1]
|
|
for line in handle_response.splitlines()
|
|
if line.startswith("Handle: ")
|
|
)
|
|
|
|
# Step 2: Claude calls ssh_execute with the handle
|
|
mock_cm, _ = make_ssh_cm(stdout="/dev/sda1 50G 10G 40G 20% /\n", exit_status=0)
|
|
|
|
with patch("mcp_privileged.ssh.server.asyncssh.connect", return_value=mock_cm):
|
|
ssh_result = await ssh_execute(
|
|
host="linux01.internal",
|
|
command="df -h",
|
|
secret_handle=handle,
|
|
ctx=ctx_ssh,
|
|
)
|
|
|
|
assert "Exit code: 0" in ssh_result
|
|
assert "/dev/sda1" in ssh_result
|
|
assert "SshSecret!" not in ssh_result
|
|
|
|
|
|
async def test_cyberark_to_powershell_full_pipeline() -> None:
|
|
"""Simulate CyberArk → PowerShell pipeline."""
|
|
ctx_ca = _make_ctx()
|
|
ctx_ps = _make_ctx()
|
|
|
|
with _mock_cyberark_client(username="domain\\svc_ps", password="WinSecret!"):
|
|
handle_response = await get_credential(
|
|
safe="WIN-SAFE",
|
|
object_name="WIN-svc_ps",
|
|
ctx=ctx_ca,
|
|
)
|
|
|
|
assert "WinSecret!" not in handle_response
|
|
handle = next(
|
|
line.split("Handle: ")[1]
|
|
for line in handle_response.splitlines()
|
|
if line.startswith("Handle: ")
|
|
)
|
|
|
|
ps_result = make_ps_result(output=["WIN-SERVER-01"], had_errors=False)
|
|
|
|
with patch("mcp_privileged.powershell.server._run_ps_sync", return_value=ps_result):
|
|
ps_out = await ps_execute(
|
|
host="win01.internal",
|
|
script="hostname",
|
|
secret_handle=handle,
|
|
ctx=ctx_ps,
|
|
)
|
|
|
|
assert "Had errors: False" in ps_out
|
|
assert "WIN-SERVER-01" in ps_out
|
|
assert "WinSecret!" not in ps_out
|
|
|
|
|
|
async def test_cyberark_to_database_full_pipeline() -> None:
|
|
"""Simulate CyberArk → Database pipeline."""
|
|
ctx_ca = _make_ctx()
|
|
ctx_db = _make_ctx()
|
|
|
|
with _mock_cyberark_client(username="db_reader", password="DbSecret!"):
|
|
handle_response = await get_credential(
|
|
safe="DB-SAFE",
|
|
object_name="PROD-PG-reader",
|
|
ctx=ctx_ca,
|
|
)
|
|
|
|
assert "DbSecret!" not in handle_response
|
|
handle = next(
|
|
line.split("Handle: ")[1]
|
|
for line in handle_response.splitlines()
|
|
if line.startswith("Handle: ")
|
|
)
|
|
|
|
with patch(
|
|
"mcp_privileged.database.server._dispatch_query",
|
|
new=AsyncMock(return_value=make_db_result(["count"], [[42]])),
|
|
):
|
|
db_out = await db_query(
|
|
host="pg.internal",
|
|
database="prod",
|
|
query="SELECT COUNT(*) FROM users",
|
|
secret_handle=handle,
|
|
ctx=ctx_db,
|
|
db_type="postgres",
|
|
)
|
|
|
|
assert "42" in db_out
|
|
assert "DbSecret!" not in db_out
|
|
|
|
|
|
# ── Handle lifecycle ──────────────────────────────────────────────────────────
|
|
|
|
async def test_handle_single_use_enforced() -> None:
|
|
"""
|
|
A handle issued by get_credential can only be resolved ONCE
|
|
(when handle_single_use=True, which is the default).
|
|
|
|
This prevents credential replay attacks:
|
|
if an attacker intercepts the handle, it's already been consumed.
|
|
"""
|
|
ctx = _make_ctx()
|
|
mock_cm, _ = make_ssh_cm(stdout="ok\n", exit_status=0)
|
|
|
|
with _mock_cyberark_client(username="user", password="pass"):
|
|
handle_response = await get_credential(
|
|
safe="S", object_name="O", ctx=ctx
|
|
)
|
|
|
|
handle = next(
|
|
line.split("Handle: ")[1]
|
|
for line in handle_response.splitlines()
|
|
if line.startswith("Handle: ")
|
|
)
|
|
|
|
# First use — succeeds
|
|
with patch("mcp_privileged.ssh.server.asyncssh.connect", return_value=mock_cm):
|
|
await ssh_execute(
|
|
host="host1", command="id", secret_handle=handle, ctx=ctx
|
|
)
|
|
|
|
# Second use — same handle, should fail
|
|
with pytest.raises(KeyError, match="consumed|not found"):
|
|
await ssh_execute(
|
|
host="host1", command="id", secret_handle=handle, ctx=ctx
|
|
)
|
|
|
|
|
|
async def test_handle_cannot_be_shared_across_tools() -> None:
|
|
"""
|
|
A handle resolved by ssh_execute cannot then be reused by db_query.
|
|
One credential fetch = one privileged operation.
|
|
"""
|
|
ctx = _make_ctx()
|
|
mock_cm, _ = make_ssh_cm(stdout="ok\n", exit_status=0)
|
|
|
|
# Issue one handle
|
|
handle = await secret_store.store("user", "pass")
|
|
|
|
# SSH consumes it
|
|
with patch("mcp_privileged.ssh.server.asyncssh.connect", return_value=mock_cm):
|
|
await ssh_execute(
|
|
host="host1", command="id", secret_handle=handle, ctx=ctx
|
|
)
|
|
|
|
# DB tries to reuse it — must fail
|
|
with pytest.raises(KeyError):
|
|
await db_query(
|
|
host="pg.internal",
|
|
database="mydb",
|
|
query="SELECT 1",
|
|
secret_handle=handle,
|
|
ctx=ctx,
|
|
db_type="postgres",
|
|
)
|
|
|
|
|
|
async def test_expired_handle_rejected() -> None:
|
|
"""
|
|
A handle past its TTL is rejected even if not yet consumed.
|
|
We simulate expiry by manually backdating the entry's created_at.
|
|
"""
|
|
import time
|
|
|
|
handle = await secret_store.store("user", "pass")
|
|
handle_id = handle.split("://")[1]
|
|
|
|
# Backdate the entry so it looks expired
|
|
async with secret_store._lock:
|
|
entry = secret_store._store[handle_id]
|
|
entry.created_at = time.monotonic() - 99999 # very old
|
|
|
|
with pytest.raises(KeyError, match="expired"):
|
|
await secret_store.resolve(handle, resolved_by="test")
|
|
|
|
|
|
# ── Concurrent handle isolation ───────────────────────────────────────────────
|
|
|
|
async def test_concurrent_handles_are_independent() -> None:
|
|
"""
|
|
Multiple handles issued at the same time are independent.
|
|
Resolving one does not affect the others.
|
|
"""
|
|
handles = [await secret_store.store(f"user_{i}", f"pass_{i}") for i in range(5)]
|
|
|
|
# Resolve them in reverse order
|
|
results = []
|
|
for handle in reversed(handles):
|
|
username, password = await secret_store.resolve(handle, resolved_by="test")
|
|
results.append((username, password))
|
|
|
|
assert len(results) == 5
|
|
# Each (username, password) pair is unique
|
|
assert len(set(results)) == 5
|
|
|
|
|
|
# ── Audit trail ───────────────────────────────────────────────────────────────
|
|
|
|
async def test_audit_events_fired_for_ssh(mock_ctx) -> None:
|
|
"""
|
|
ssh_execute must call ctx.info() at least twice:
|
|
once for connection start, once for completion.
|
|
ctx.error must NOT be called on the happy path.
|
|
"""
|
|
handle = await secret_store.store("user", "pass")
|
|
mock_cm, _ = make_ssh_cm(stdout="ok\n", exit_status=0)
|
|
|
|
with patch("mcp_privileged.ssh.server.asyncssh.connect", return_value=mock_cm):
|
|
await ssh_execute(
|
|
host="host1", command="id", secret_handle=handle, ctx=mock_ctx
|
|
)
|
|
|
|
assert mock_ctx.info.await_count >= 2
|
|
mock_ctx.error.assert_not_awaited()
|
|
|
|
|
|
async def test_audit_events_fired_for_db(mock_ctx) -> None:
|
|
"""db_query must emit ctx.info on the happy path, not ctx.error."""
|
|
handle = await secret_store.store("user", "pass")
|
|
|
|
with patch(
|
|
"mcp_privileged.database.server._dispatch_query",
|
|
new=AsyncMock(return_value=make_db_result(["v"], [[1]])),
|
|
):
|
|
await db_query(
|
|
host="pg.internal",
|
|
database="mydb",
|
|
query="SELECT 1",
|
|
secret_handle=handle,
|
|
ctx=mock_ctx,
|
|
db_type="postgres",
|
|
)
|
|
|
|
assert mock_ctx.info.await_count >= 2
|
|
mock_ctx.error.assert_not_awaited()
|