Files
MCP_CyberArk/tests/test_cyberark_client.py
2026-03-29 19:51:51 +02:00

166 lines
6.3 KiB
Python

"""
Tests for the CyberArk CCP client.
All tests use httpx.MockTransport to avoid real network calls.
"""
from __future__ import annotations
import json
import httpx
import pytest
from mcp_privileged.cyberark.client import (
CyberArkCCPClient,
CyberArkError,
Credential,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _ok_response(username: str = "svc_account", password: str = "S3cr3tP@ss") -> dict:
return {
"Content": password,
"UserName": username,
"Address": "db.internal",
"Safe": "PROD-DB",
"Folder": "Root",
"Name": "PROD-DB-svc_account",
"PlatformID": "Oracle",
"PasswordChangeInProcess": "False",
}
def _error_response(code: str, msg: str) -> dict:
return {"ErrorCode": code, "ErrorMsg": msg}
class _MockTransport(httpx.AsyncBaseTransport):
"""Simple mock transport that returns a pre-set response."""
def __init__(self, status_code: int, body: dict) -> None:
self._status = status_code
self._body = body
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
return httpx.Response(
self._status,
headers={"content-type": "application/json"},
content=json.dumps(self._body).encode(),
request=request,
)
def _client_with_transport(transport: httpx.AsyncBaseTransport) -> CyberArkCCPClient:
"""Create a CyberArkCCPClient with a mock transport pre-injected."""
client = CyberArkCCPClient()
client._http = httpx.AsyncClient(transport=transport)
return client
# ── Tests ─────────────────────────────────────────────────────────────────────
async def test_get_credential_success() -> None:
transport = _MockTransport(200, _ok_response())
client = _client_with_transport(transport)
cred = await client.get_credential(
app_id="MyApp", safe="PROD-DB", object_name="PROD-DB-svc_account"
)
assert isinstance(cred, Credential)
assert cred.username == "svc_account"
assert cred.password == "S3cr3tP@ss"
assert cred.address == "db.internal"
assert cred.platform_id == "Oracle"
assert cred.password_change_in_process is False
async def test_get_credential_not_found_raises() -> None:
transport = _MockTransport(404, _error_response("APPAP007E", "Credential object not found"))
client = _client_with_transport(transport)
with pytest.raises(CyberArkError) as exc_info:
await client.get_credential(app_id="MyApp", safe="PROD-DB", object_name="missing")
assert exc_info.value.error_code == "APPAP007E"
assert exc_info.value.status_code == 404
async def test_get_credential_auth_failure_raises() -> None:
transport = _MockTransport(403, _error_response("APPAP006E", "Authentication failure"))
client = _client_with_transport(transport)
with pytest.raises(CyberArkError) as exc_info:
await client.get_credential(app_id="BadApp", safe="PROD-DB", object_name="obj")
assert exc_info.value.error_code == "APPAP006E"
assert exc_info.value.status_code == 403
async def test_get_credential_unknown_error_code() -> None:
"""Unknown error codes should still raise CyberArkError with the raw message."""
transport = _MockTransport(500, _error_response("ZZZZZ999E", "Unexpected internal error"))
client = _client_with_transport(transport)
with pytest.raises(CyberArkError) as exc_info:
await client.get_credential(app_id="MyApp", safe="S", object_name="O")
assert "Unexpected internal error" in str(exc_info.value)
async def test_get_credential_non_json_body() -> None:
"""Non-JSON 500 responses should still raise a CyberArkError."""
class _HtmlTransport(httpx.AsyncBaseTransport):
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
return httpx.Response(500, content=b"<html>Internal Server Error</html>", request=request)
client = _client_with_transport(_HtmlTransport())
with pytest.raises(CyberArkError) as exc_info:
await client.get_credential(app_id="MyApp", safe="S", object_name="O")
assert exc_info.value.status_code == 500
async def test_connect_error_raises() -> None:
class _FailTransport(httpx.AsyncBaseTransport):
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
raise httpx.ConnectError("Connection refused")
client = _client_with_transport(_FailTransport())
with pytest.raises(CyberArkError, match="Cannot reach CCP"):
await client.get_credential(app_id="MyApp", safe="S", object_name="O")
async def test_timeout_raises() -> None:
class _TimeoutTransport(httpx.AsyncBaseTransport):
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
raise httpx.ReadTimeout("Timed out")
client = _client_with_transport(_TimeoutTransport())
with pytest.raises(CyberArkError, match="timed out"):
await client.get_credential(app_id="MyApp", safe="S", object_name="O")
async def test_assert_started_raises_if_not_started() -> None:
client = CyberArkCCPClient()
with pytest.raises(RuntimeError, match="not been started"):
await client.get_credential(app_id="A", safe="S", object_name="O")
async def test_list_safes_raises_not_implemented() -> None:
client = _client_with_transport(_MockTransport(200, {}))
with pytest.raises(NotImplementedError):
await client.list_safes("MyApp")
async def test_password_not_in_error_message() -> None:
"""Ensure passwords are never leaked into exception messages."""
transport = _MockTransport(200, _ok_response(password="SuperSecret123"))
client = _client_with_transport(transport)
cred = await client.get_credential(app_id="A", safe="S", object_name="O")
assert cred.password == "SuperSecret123"
# The Credential dataclass itself is fine, but error paths must not include it
# (no error raised here — just confirming the happy path returns it correctly
# and the password doesn't appear in repr of the transport or request)