166 lines
6.3 KiB
Python
166 lines
6.3 KiB
Python
"""
|
|
Tests for the CyberArk CCP client.
|
|
|
|
Tests use mock transports built on httpx.AsyncBaseTransport, so no real network calls are made.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
|
|
import httpx
|
|
import pytest
|
|
|
|
from mcp_privileged.cyberark.client import (
|
|
CyberArkCCPClient,
|
|
CyberArkError,
|
|
Credential,
|
|
)
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
def _ok_response(username: str = "svc_account", password: str = "S3cr3tP@ss") -> dict:
|
|
return {
|
|
"Content": password,
|
|
"UserName": username,
|
|
"Address": "db.internal",
|
|
"Safe": "PROD-DB",
|
|
"Folder": "Root",
|
|
"Name": "PROD-DB-svc_account",
|
|
"PlatformID": "Oracle",
|
|
"PasswordChangeInProcess": "False",
|
|
}
|
|
|
|
|
|
def _error_response(code: str, msg: str) -> dict:
|
|
return {"ErrorCode": code, "ErrorMsg": msg}
|
|
|
|
|
|
class _MockTransport(httpx.AsyncBaseTransport):
    """Async transport stub that answers every request with one canned JSON reply."""

    def __init__(self, status_code: int, body: dict) -> None:
        """Remember the status code and the (not yet serialized) JSON body."""
        self._status, self._body = status_code, body

    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
        """Return the stored status and JSON-encoded body, whatever the request."""
        json_headers = {"content-type": "application/json"}
        # Serialized on each call from the stored dict.
        encoded_body = json.dumps(self._body).encode()
        return httpx.Response(
            self._status,
            headers=json_headers,
            content=encoded_body,
            request=request,
        )
|
|
|
|
|
|
def _client_with_transport(transport: httpx.AsyncBaseTransport) -> CyberArkCCPClient:
    """Return a CyberArkCCPClient whose HTTP layer is routed through *transport*.

    The client's private ``_http`` attribute is overwritten with an
    ``httpx.AsyncClient`` built on the given transport.
    """
    ccp = CyberArkCCPClient()
    mock_http = httpx.AsyncClient(transport=transport)
    ccp._http = mock_http
    return ccp
|
|
|
|
|
|
# ── Tests ─────────────────────────────────────────────────────────────────────
|
|
|
|
async def test_get_credential_success() -> None:
    """A 200 reply is turned into a Credential carrying the response fields."""
    ccp = _client_with_transport(_MockTransport(200, _ok_response()))

    result = await ccp.get_credential(
        app_id="MyApp",
        safe="PROD-DB",
        object_name="PROD-DB-svc_account",
    )

    assert isinstance(result, Credential)

    expected_fields = {
        "username": "svc_account",
        "password": "S3cr3tP@ss",
        "address": "db.internal",
        "platform_id": "Oracle",
    }
    for field_name, expected in expected_fields.items():
        assert getattr(result, field_name) == expected, field_name

    # The payload carries the string "False"; the credential must expose a real bool.
    assert result.password_change_in_process is False
|
|
|
|
|
|
async def test_get_credential_not_found_raises() -> None:
    """A 404 error reply raises CyberArkError exposing the code and HTTP status."""
    error_body = _error_response("APPAP007E", "Credential object not found")
    ccp = _client_with_transport(_MockTransport(404, error_body))

    with pytest.raises(CyberArkError) as caught:
        await ccp.get_credential(app_id="MyApp", safe="PROD-DB", object_name="missing")

    raised = caught.value
    assert raised.error_code == "APPAP007E"
    assert raised.status_code == 404
|
|
|
|
|
|
async def test_get_credential_auth_failure_raises() -> None:
    """A 403 error reply raises CyberArkError exposing the code and HTTP status."""
    expected_code, expected_status = "APPAP006E", 403
    failing_transport = _MockTransport(
        expected_status,
        _error_response(expected_code, "Authentication failure"),
    )
    ccp = _client_with_transport(failing_transport)

    with pytest.raises(CyberArkError) as caught:
        await ccp.get_credential(app_id="BadApp", safe="PROD-DB", object_name="obj")

    assert caught.value.error_code == expected_code
    assert caught.value.status_code == expected_status
|
|
|
|
|
|
async def test_get_credential_unknown_error_code() -> None:
    """An unrecognised error code still raises CyberArkError with the raw message."""
    raw_message = "Unexpected internal error"
    ccp = _client_with_transport(
        _MockTransport(500, _error_response("ZZZZZ999E", raw_message))
    )

    with pytest.raises(CyberArkError) as caught:
        await ccp.get_credential(app_id="MyApp", safe="S", object_name="O")

    rendered = str(caught.value)
    assert raw_message in rendered
|
|
|
|
|
|
async def test_get_credential_non_json_body() -> None:
    """A 500 reply whose body is not JSON must still raise CyberArkError."""
    html_body = b"<html>Internal Server Error</html>"

    class _HtmlTransport(httpx.AsyncBaseTransport):
        """Answer every request with a 500 and an HTML body (no content-type set)."""

        async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
            return httpx.Response(500, content=html_body, request=request)

    ccp = _client_with_transport(_HtmlTransport())

    with pytest.raises(CyberArkError) as caught:
        await ccp.get_credential(app_id="MyApp", safe="S", object_name="O")

    raised = caught.value
    assert raised.status_code == 500
|
|
|
|
|
|
async def test_connect_error_raises() -> None:
    """An httpx.ConnectError from the transport surfaces as CyberArkError."""

    class _FailTransport(httpx.AsyncBaseTransport):
        """Transport that fails every request with a connection error."""

        async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
            raise httpx.ConnectError("Connection refused")

    unreachable = _client_with_transport(_FailTransport())

    with pytest.raises(CyberArkError, match="Cannot reach CCP"):
        await unreachable.get_credential(app_id="MyApp", safe="S", object_name="O")
|
|
|
|
|
|
async def test_timeout_raises() -> None:
    """An httpx.ReadTimeout from the transport surfaces as CyberArkError."""

    class _TimeoutTransport(httpx.AsyncBaseTransport):
        """Transport that fails every request with a read timeout."""

        async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
            raise httpx.ReadTimeout("Timed out")

    slow = _client_with_transport(_TimeoutTransport())

    with pytest.raises(CyberArkError, match="timed out"):
        await slow.get_credential(app_id="MyApp", safe="S", object_name="O")
|
|
|
|
|
|
async def test_assert_started_raises_if_not_started() -> None:
    """A freshly constructed client, with no HTTP client injected, refuses to fetch."""
    unstarted = CyberArkCCPClient()

    with pytest.raises(RuntimeError, match="not been started"):
        await unstarted.get_credential(app_id="A", safe="S", object_name="O")
|
|
|
|
|
|
async def test_list_safes_raises_not_implemented() -> None:
    """list_safes raises NotImplementedError even with a working transport injected."""
    empty_ok = _MockTransport(200, {})
    ccp = _client_with_transport(empty_ok)

    with pytest.raises(NotImplementedError):
        await ccp.list_safes("MyApp")
|
|
|
|
|
|
async def test_password_not_in_error_message() -> None:
    """Ensure passwords are never leaked into exception messages.

    Two steps:

    1. Happy path: the secret is delivered through the returned credential.
    2. Error path: a failing reply whose JSON body still contains the secret
       under ``Content`` must raise CyberArkError, and the rendered exception
       message must not contain the secret.
    """
    secret = "SuperSecret123"

    # 1. The only legitimate place for the secret is the returned credential.
    ok_client = _client_with_transport(_MockTransport(200, _ok_response(password=secret)))
    cred = await ok_client.get_credential(app_id="A", safe="S", object_name="O")
    assert cred.password == secret

    # 2. Serve an error reply that also carries the secret, so a client that
    #    echoed the raw body into its exception text would be caught here.
    leaky_body = {
        **_error_response("ZZZZZ999E", "Unexpected internal error"),
        "Content": secret,
    }
    failing_client = _client_with_transport(_MockTransport(500, leaky_body))

    with pytest.raises(CyberArkError) as exc_info:
        await failing_client.get_credential(app_id="A", safe="S", object_name="O")

    assert secret not in str(exc_info.value)
|