MCP_CyberArk/docs/LLD.md
2026-03-29 19:51:51 +02:00

Low-Level Design

MCP Privileged Access Service

Version: 1.0 Date: 2026-03-28 Status: Production-ready


Table of Contents

  1. Module Structure
  2. Foundation Modules
    • 2.1 config.py
    • 2.2 secret_store.py
    • 2.3 auth.py
    • 2.4 audit.py
    • 2.5 main.py
  3. CyberArk MCP
  4. SSH MCP
  5. PowerShell MCP
  6. Database MCP
  7. MCP Tool API Reference
  8. Data Models
  9. Configuration Reference
  10. Error Handling Matrix
  11. Audit Event Catalog
  12. Test Strategy

1. Module Structure

src/mcp_privileged/
├── __init__.py
├── config.py           ← All settings; read once at import time
├── secret_store.py     ← In-RAM handle store + background sweeper
├── auth.py             ← API key middleware
├── audit.py            ← Structured log helpers
├── main.py             ← FastAPI app assembly + lifespan
│
├── cyberark/
│   ├── __init__.py
│   ├── client.py       ← CCP REST client (httpx)
│   └── server.py       ← FastMCP server; get_credential, list_safes tools
│
├── ssh/
│   ├── __init__.py
│   └── server.py       ← FastMCP server; ssh_execute tool
│
├── powershell/
│   ├── __init__.py
│   └── server.py       ← FastMCP server; ps_execute tool
│
└── database/
    ├── __init__.py
    └── server.py       ← FastMCP server; db_query tool

Dependency graph (no cycles):

main.py
  ├── config.py         (leaf)
  ├── audit.py          ← config.py
  ├── auth.py           ← config.py, audit.py
  ├── secret_store.py   ← config.py, audit.py
  ├── cyberark/
  │   ├── client.py     ← config.py, audit.py
  │   └── server.py     ← cyberark/client.py, secret_store.py, audit.py
  ├── ssh/server.py     ← config.py, secret_store.py, audit.py
  ├── powershell/server.py ← config.py, secret_store.py, audit.py
  └── database/server.py   ← config.py, secret_store.py, audit.py

2. Foundation Modules

2.1 config.py

Class: Settings(BaseSettings)
Singleton: settings = Settings(), imported everywhere as from mcp_privileged.config import settings

The settings object is created once when the module is first imported. If any required value is missing or invalid, pydantic raises a ValidationError at startup — fail fast.

Settings groups

Service

Setting Env var Default Type Description
mcp_host MCP_HOST 0.0.0.0 str Bind address for uvicorn
mcp_port MCP_PORT 8443 int Listen port
mcp_api_keys_raw MCP_API_KEYS (required) str Comma-separated API keys — access via the mcp_api_keys property

Secret Handle Store

Setting Env var Default Type Description
handle_ttl_seconds HANDLE_TTL_SECONDS 300 int (30-3600) Handle expiry
handle_single_use HANDLE_SINGLE_USE True bool Invalidate on first resolve

CyberArk CCP

| Setting | Env var | Default | Type | Description |
|---|---|---|---|---|
| cyberark_ccp_url | CYBERARK_CCP_URL | | str | Full CCP REST endpoint URL |
| cyberark_app_id | CYBERARK_APP_ID | | str | AppID registered in CyberArk |
| cyberark_verify_ssl | CYBERARK_VERIFY_SSL | system CAs | str | "false", "true", or CA path |
| cyberark_cert_pfx_path | CYBERARK_CERT_PFX_PATH | None | Path\|None | mTLS client cert (PFX) |
| cyberark_cert_pfx_password | CYBERARK_CERT_PFX_PASSWORD | None | str\|None | PFX password |

PowerShell / WinRM

Setting Env var Default Type Description
winrm_auth WINRM_AUTH ntlm str ntlm or basic
winrm_connect_timeout_seconds WINRM_CONNECT_TIMEOUT_SECONDS 15 int WinRM connection timeout
winrm_operation_timeout_seconds WINRM_OPERATION_TIMEOUT_SECONDS 20 int WinRM operation timeout
winrm_max_output_bytes WINRM_MAX_OUTPUT_BYTES 51200 int Max bytes per output object

SSH

Setting Env var Default Type Description
ssh_known_hosts SSH_KNOWN_HOSTS ~/.ssh/known_hosts str Path or "disable"
ssh_connect_timeout_seconds SSH_CONNECT_TIMEOUT_SECONDS 10 int SSH connection timeout
ssh_max_output_bytes SSH_MAX_OUTPUT_BYTES 51200 int Max bytes per stdout/stderr

Database

Setting Env var Default Type Description
db_connect_timeout_seconds DB_CONNECT_TIMEOUT_SECONDS 10 int DB connection timeout
db_query_timeout_seconds DB_QUERY_TIMEOUT_SECONDS 30 int Query execution timeout
db_max_rows DB_MAX_ROWS 1000 int Row result cap
db_max_cell_bytes DB_MAX_CELL_BYTES 1024 int Per-cell truncation threshold

Logging

Setting Env var Default Type Description
log_format LOG_FORMAT json "json"|"console" Output format
log_level LOG_LEVEL INFO "DEBUG"|"INFO"|.. Minimum log level

Validators

  • _parse_and_validate_api_keys (model validator): validates that MCP_API_KEYS is non-empty and not equal to the default "changeme" — service refuses to start if either condition is violated. Raises ValidationError at import time (fail-fast).
  • mcp_api_keys (property): splits mcp_api_keys_raw on commas, strips whitespace, returns frozenset[str]. Implemented as a @property (not a pydantic field) to avoid a name collision with the pydantic-settings env-var auto-mapping.
  • cyberark_ssl_verify: maps "false" → False, "true" or "" → True, anything else → path string
  • cyberark_cert_pfx_path: empty string → None
  • _validate_pfx (model validator): if PFX path is set, the file must exist and the password must be non-empty
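
The two string-mapping rules above can be sketched with the standard library alone. This is an illustration, not the code in config.py: the helper names `parse_api_keys` and `map_ssl_verify` are hypothetical, the real logic runs inside pydantic validators, and the case-insensitive matching and the per-key "changeme" check are assumptions.

```python
from typing import Union


def parse_api_keys(raw: str) -> frozenset[str]:
    """Split a comma-separated key string, dropping blanks (hypothetical helper)."""
    keys = frozenset(part.strip() for part in raw.split(",") if part.strip())
    if not keys or "changeme" in keys:
        # Mirrors the fail-fast rule: empty or default keys are rejected.
        raise ValueError("MCP_API_KEYS must be set and must not be 'changeme'")
    return keys


def map_ssl_verify(value: str) -> Union[bool, str]:
    """Map the CYBERARK_VERIFY_SSL string to a bool or a CA bundle path."""
    lowered = value.strip().lower()  # case-insensitive matching is an assumption
    if lowered == "false":
        return False
    if lowered in ("true", ""):
        return True
    return value  # anything else is treated as a path to a CA bundle
```

Returning a frozenset makes the membership check order-independent and prevents accidental mutation of the configured keys at runtime.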

2.2 secret_store.py

Class: SecretStore
Singleton: secret_store = SecretStore()

Internal data structure

@dataclass(slots=True)
class _Entry:
    handle_id: str          # 32-char hex, the key in _store
    username: str           # plaintext (used as SSH/DB username)
    password: SecretStr     # pydantic SecretStr — prevents accidental str() exposure
    created_at: float       # time.monotonic() at creation
    resolved: bool = False  # set to True on first resolve

    def is_expired(self, ttl: int) -> bool:
        return (time.monotonic() - self.created_at) > ttl

class SecretStore:
    _store: dict[str, _Entry]   # handle_id → entry
    _lock: asyncio.Lock         # all mutations are locked

Methods

async store(username, password) → str

  1. Generate handle_id = secrets.token_hex(16) (32 hex chars, cryptographically random)
  2. Create _Entry(handle_id, username, SecretStr(password))
  3. Acquire lock, insert into _store
  4. Return "secret://" + handle_id

async resolve(handle, resolved_by="unknown") → (str, str)

  1. Parse handle → extract handle_id (raises ValueError if prefix wrong)
  2. Acquire lock
  3. Lookup entry — KeyError if not found
  4. Check TTL — KeyError("expired") + delete if expired
  5. Check resolved + single_use → KeyError("already consumed") if violated
  6. Mark entry.resolved = True; delete if single_use
  7. Release lock
  8. Log handle_resolved audit event
  9. Return (entry.username, entry.password.get_secret_value())

get_secret_value() is the only intentional unwrap point in the entire codebase.
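
The store and resolve steps can be sketched as a small stand-alone class. This is a simplified illustration under stated assumptions: `MiniStore` and `Entry` are hypothetical names, the password is held as a plain string instead of a pydantic SecretStr, and audit logging is omitted.

```python
import asyncio
import secrets
import time
from dataclasses import dataclass

PREFIX = "secret://"


@dataclass
class Entry:
    username: str
    password: str          # the real store wraps this in pydantic SecretStr
    created_at: float      # time.monotonic() at creation
    resolved: bool = False


class MiniStore:
    """Simplified stand-in for SecretStore (illustrative only)."""

    def __init__(self, ttl: int = 300, single_use: bool = True) -> None:
        self._store: dict[str, Entry] = {}
        self._lock = asyncio.Lock()
        self._ttl = ttl
        self._single_use = single_use

    async def store(self, username: str, password: str) -> str:
        handle_id = secrets.token_hex(16)
        async with self._lock:
            self._store[handle_id] = Entry(username, password, time.monotonic())
        return PREFIX + handle_id

    async def resolve(self, handle: str) -> tuple[str, str]:
        if not handle.startswith(PREFIX):
            raise ValueError("handle must start with secret://")
        handle_id = handle[len(PREFIX):]
        async with self._lock:
            entry = self._store[handle_id]            # KeyError if unknown
            if time.monotonic() - entry.created_at > self._ttl:
                del self._store[handle_id]
                raise KeyError("expired")
            if entry.resolved and self._single_use:
                raise KeyError("already consumed")
            entry.resolved = True
            if self._single_use:
                del self._store[handle_id]            # a second resolve now fails
        return entry.username, entry.password
```

Because a single-use entry is deleted inside the same locked section that marks it resolved, two concurrent resolve calls for the same handle cannot both succeed.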

async revoke(handle) → bool

Explicit early revocation. Returns True if the handle existed.

async purge_expired() → int

Scans all entries and deletes expired ones. Called by the background sweeper every 60 seconds. Returns the number of deleted entries.

Background sweeper

async def _sweeper(store, interval_seconds=60):
    while True:
        await asyncio.sleep(interval_seconds)
        count = await store.purge_expired()

Started in main.py lifespan as an asyncio.Task. Cancelled on shutdown.
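
A minimal sketch of the task lifecycle, assuming the sweeper is started with asyncio.create_task and awaited after cancellation. `CountingStore` and `run_briefly` are hypothetical stand-ins used only to make the example runnable.

```python
import asyncio
import contextlib


class CountingStore:
    """Hypothetical stand-in exposing only purge_expired()."""

    def __init__(self) -> None:
        self.purges = 0

    async def purge_expired(self) -> int:
        self.purges += 1
        return 0


async def sweeper(store: CountingStore, interval_seconds: float = 60) -> None:
    while True:
        await asyncio.sleep(interval_seconds)
        await store.purge_expired()


async def run_briefly(store: CountingStore, interval: float, duration: float) -> bool:
    task = asyncio.create_task(sweeper(store, interval))
    await asyncio.sleep(duration)          # the service would serve requests here
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task                         # wait until cancellation has finished
    return task.cancelled()
```

Awaiting the cancelled task re-raises CancelledError in the caller, which is why the await is wrapped in contextlib.suppress.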


2.3 auth.py

Class: ApiKeyMiddleware(BaseHTTPMiddleware)

Request arrives
      │
      ▼
Does path start with "/mcp/"?
      │
  NO  ├──────────────────────────────► pass through (health check etc.)
      │
  YES ▼
Extract key from headers:
  1. X-API-Key: <value>
  2. Authorization: Bearer <value>
      │
      ▼
key in settings.mcp_api_keys ?
      │
  NO  ├──────────────────────────────► 401 JSON + log_auth_failure()
      │
  YES ▼
      call_next(request)

Key validation uses hmac.compare_digest in a non-short-circuiting loop over all configured keys. Each comparison is timing-safe with respect to key contents, so an attacker cannot infer a matching prefix from response-time differences:

@staticmethod
def _is_valid_key(key: str) -> bool:
    key_bytes = key.encode()
    valid = False
    for configured_key in settings.mcp_api_keys:
        if hmac.compare_digest(key_bytes, configured_key.encode()):
            valid = True   # set flag, do NOT return early
    return valid

The loop always iterates all keys (no return True inside the loop), so the response time does not reveal which configured key matched or how close a guess was.
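
The header lookup order from the flow diagram can be sketched as follows. This is an assumption-laden illustration: `extract_api_key` and `is_valid_key` are hypothetical free functions, and the real middleware reads headers from a Starlette request object.

```python
import hmac
from typing import Mapping, Optional


def extract_api_key(headers: Mapping[str, str]) -> Optional[str]:
    """Return the key from X-API-Key, else from a Bearer token, else None."""
    lowered = {name.lower(): value for name, value in headers.items()}
    key = lowered.get("x-api-key")
    if key:
        return key.strip()
    scheme, _, token = lowered.get("authorization", "").partition(" ")
    if scheme.lower() == "bearer" and token.strip():
        return token.strip()
    return None


def is_valid_key(key: str, configured: frozenset[str]) -> bool:
    valid = False
    for candidate in configured:
        # compare_digest runs for every configured key; there is no early return
        if hmac.compare_digest(key.encode(), candidate.encode()):
            valid = True
    return valid
```

When both headers are present, X-API-Key wins because it is checked first.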


2.4 audit.py

Wraps structlog with named functions so every audit event has a consistent schema. See Section 11 for the full catalog.

Configuration: configure_logging() must be called once at startup (called in lifespan and in the run() entry point).

Processors pipeline:

merge_contextvars → add_logger_name → add_log_level → TimeStamper(iso)
→ StackInfoRenderer → ProcessorFormatter.wrap_for_formatter
→ [JSONRenderer | ConsoleRenderer]

Third-party loggers suppressed to WARNING: uvicorn.access, asyncssh, pypsrp.


2.5 main.py

Function: create_app() → FastAPI

Assembly sequence:

  1. Create FastAPI(lifespan=lifespan, docs_url=None, ...) — docs disabled in production
  2. Add ApiKeyMiddleware
  3. Register GET /health route (no auth)
  4. Import and mount four MCP servers:
    • cyberark_mcp at /mcp/cyberark
    • ssh_mcp at /mcp/ssh
    • powershell_mcp at /mcp/powershell
    • database_mcp at /mcp/database

Lifespan (async context manager):

startup:
  configure_logging()
  await cyberark_client.start()   ← creates httpx.AsyncClient
  sweeper_task = await start_sweeper(secret_store)

shutdown:
  sweeper_task.cancel()
  await sweeper_task              ← wait for cancellation
  await cyberark_client.stop()   ← closes httpx.AsyncClient

CLI entry point: mcp-privileged → mcp_privileged.main:run

def run():
    configure_logging()
    app = create_app()
    uvicorn.run(app, host=settings.mcp_host, port=settings.mcp_port,
                log_config=None, access_log=False)

3. CyberArk MCP

3.1 client.py — CyberArkCCPClient

Singleton: cyberark_client = CyberArkCCPClient()

Lifecycle

await cyberark_client.start()   # creates httpx.AsyncClient
await cyberark_client.stop()    # closes httpx.AsyncClient

The httpx.AsyncClient is created once and reused for connection pooling. Timeouts: connect=5s, read=15s, write=5s, pool=5s.

get_credential(app_id, safe, object_name) → Credential

GET {CYBERARK_CCP_URL}?AppID={app_id}&Safe={safe}&Object={object_name}

Response parsing:

  • HTTP 200 → Credential dataclass from JSON body
  • HTTP 4xx/5xx → parse ErrorCode/ErrorMsg → raise CyberArkError
  • Non-JSON body → raise CyberArkError(status_code=...)
  • httpx.ConnectError → CyberArkError("Cannot reach CCP")
  • httpx.TimeoutException → CyberArkError("CCP request timed out")
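
The error branches above can be sketched as a pure function over the status code and body text. The `CyberArkError` class here is a simplified stand-in, `error_from_response` is a hypothetical helper, and the fallback messages are assumptions; only the ErrorCode and ErrorMsg field names come from the list above.

```python
import json
from typing import Optional


class CyberArkError(Exception):
    """Simplified stand-in for the service's exception type."""

    def __init__(self, message: str, status_code: Optional[int] = None,
                 error_code: Optional[str] = None) -> None:
        super().__init__(message)
        self.status_code = status_code
        self.error_code = error_code


def error_from_response(status_code: int, body: str) -> CyberArkError:
    """Build a CyberArkError from a non-200 CCP response."""
    try:
        payload = json.loads(body)
    except ValueError:
        # Non-JSON body: only the HTTP status is known.
        return CyberArkError("Unexpected CCP response", status_code=status_code)
    if not isinstance(payload, dict):
        return CyberArkError("Unexpected CCP response", status_code=status_code)
    return CyberArkError(
        payload.get("ErrorMsg", "Unknown CCP error"),
        status_code=status_code,
        error_code=payload.get("ErrorCode"),
    )
```

Keeping the parsing separate from the HTTP call makes each response path testable without a transport mock.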

SSL modes

| Condition | _build_ssl_context() returns |
|---|---|
| cyberark_cert_pfx_path is None | settings.cyberark_ssl_verify (bool or path) |
| PFX path set | ssl.SSLContext with client cert loaded |

For mTLS, the PFX is parsed with cryptography, cert+key are written to a tempfile.mkstemp(suffix=".pem") with chmod 600, loaded into the SSLContext, then the temp file is immediately deleted with os.unlink().

Error codes

Code Meaning
APPAP004E AppID not found or not permitted
APPAP006E Authentication failure (IP allowlist / AppID mismatch)
APPAP007E Credential object not found in safe
APPAP008E No password found for object
APPAP009E Dual control pending approval
APPAP010E Dual control approval timed out
ITATS023E Object not found
ITATS012E Safe not found

3.2 server.py — CyberArk MCP

Tools: get_credential, list_safes

get_credential(safe, object_name, ctx, app_id="")

1. Resolve effective AppID (param or settings.cyberark_app_id)
2. ctx.info(...)
3. cyberark_client.get_credential(...)
   → on CyberArkError: ctx.error(...); raise
4. secret_store.store(credential.username, credential.password)
   → returns handle
5. log_credential_fetched(app_id, safe, object_name, handle_id, ttl, client_ip)
6. ctx.info("Credential retrieved. Handle issued...")
7. Return formatted string:
   "Credential retrieved successfully.\n
    Handle: secret://...\n
    Username: ...\n
    Address: ...\n
    Platform: ...\n
    TTL: 300 seconds\n
    Use this handle with ssh_execute, ps_execute, or db_query."

The return value is carefully crafted — it contains the handle (needed by Claude for the next step) plus metadata (username, address) to help Claude route the next call correctly, but never the password.
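
One way to make the "never the password" property structural is to build the message in a function that does not accept the password at all. The sketch below assumes a hypothetical helper name, `format_credential_message`, and leaves out the trailing usage hint for brevity.

```python
def format_credential_message(handle: str, username: str, address: str,
                              platform: str, ttl_seconds: int) -> str:
    """Build the tool result; the password is deliberately not a parameter."""
    return "\n".join([
        "Credential retrieved successfully.",
        f"Handle: {handle}",
        f"Username: {username}",
        f"Address: {address}",
        f"Platform: {platform}",
        f"TTL: {ttl_seconds} seconds",
    ])


message = format_credential_message(
    "secret://a3f9c2e1b8d74f2c9e1a0b5d3c8f7e2a",
    "svc_account", "db.internal", "UnixSSH", 300,
)
```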

list_safes(ctx, app_id="")

Calls cyberark_client.list_safes(app_id). Currently raises NotImplementedError (CCP has no native list-safes endpoint). The tool catches this and returns an informational message instead of raising.


4. SSH MCP

4.1 server.py

Tool: ssh_execute

Execution sequence

ssh_execute(host, command, secret_handle, ctx, port=22, username_override="", timeout_seconds=30)
│
├── secret_store.resolve(secret_handle, resolved_by="ssh")
│     KeyError → ctx.error(...); raise
│
├── if username_override: username = username_override
│
├── ctx.info("SSH connecting to ...")
│
├── _resolve_known_hosts(settings.ssh_known_hosts)
│     "disable" → None  (no host key check, logs warning)
│     else       → expanded path string
│
├── async with asyncssh.connect(host, port, username, password,
│                               known_hosts, connect_timeout) as conn:
│   ├── result = await conn.run(command, timeout=timeout_seconds)
│   └── [exceptions caught — see error matrix]
│
├── del password  (in finally block)
│
├── stdout = _truncate(result.stdout, ssh_max_output_bytes, "stdout")
├── stderr = _truncate(result.stderr, ssh_max_output_bytes, "stderr")
├── exit_code = result.exit_status ?? -1
│
├── log_ssh_executed(...)
├── ctx.info("SSH command completed ...")
│
└── return _format_result(host, command, exit_code, stdout, stderr)
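
The truncation step can be sketched as a byte-based cut that never splits a multi-byte character. This is not the service's `_truncate`: the function name `truncate` and the wording of the truncation notice are assumptions.

```python
def truncate(text: str, max_bytes: int, label: str) -> str:
    """Cap text at max_bytes of UTF-8 and note how much was dropped."""
    raw = text.encode("utf-8")
    if len(raw) <= max_bytes:
        return text
    # errors="ignore" drops a multi-byte character cut in half at the boundary
    kept = raw[:max_bytes].decode("utf-8", errors="ignore")
    dropped = len(raw) - max_bytes
    return f"{kept}\n[{label} truncated: {dropped} bytes omitted]"
```

Measuring in bytes instead of characters keeps the cap meaningful for output that contains non-ASCII text.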

Output format

Host: linux01.internal
Command: df -h
Exit code: 0

--- stdout ---
Filesystem      Size  Used Avail Use% Mounted on
/dev/sda1        50G   10G   40G  20% /

--- stderr ---
(only present if non-empty)

Known hosts handling

| ssh_known_hosts value | Passed to asyncssh | Behaviour |
|---|---|---|
| "disable" | None | No host key verification (dev/lab only) |
| "~/.ssh/known_hosts" | "/home/user/.ssh/known_hosts" | Verify against file |
| /etc/ssh/known_hosts | /etc/ssh/known_hosts | Verify against file |
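
The mapping in the table can be sketched in a few lines. `resolve_known_hosts` is a hypothetical name for what the document calls `_resolve_known_hosts`; the case-insensitive match on "disable" is an assumption, and the warning log is omitted.

```python
import os
from typing import Optional


def resolve_known_hosts(value: str) -> Optional[str]:
    """Return None to disable host key checks, else an expanded file path."""
    if value.strip().lower() == "disable":
        return None  # dev/lab only: the SSH layer skips host key verification
    return os.path.expanduser(value)
```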

5. PowerShell MCP

5.1 server.py

Tool: ps_execute

Thread executor pattern

pypsrp is synchronous. The blocking WinRM call is wrapped in asyncio.get_running_loop().run_in_executor(None, ...):

loop = asyncio.get_running_loop()
output_lines, had_errors, error_records = await loop.run_in_executor(
    None,
    functools.partial(_run_ps_sync, host, port, username, password, script, use_ssl, timeout_seconds),
)

None uses the default ThreadPoolExecutor. The event loop remains responsive to other requests while WinRM is in progress.
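
The responsiveness claim can be demonstrated with a stand-in for the blocking call. In this sketch `blocking_call` replaces the real WinRM worker and `run_blocking` is a hypothetical wrapper; a ticker coroutine counts how often the event loop gets to run while the worker thread sleeps.

```python
import asyncio
import contextlib
import functools
import time


def blocking_call(host: str, delay: float) -> str:
    """Stand-in for a synchronous WinRM call (hypothetical)."""
    time.sleep(delay)
    return f"done:{host}"


async def run_blocking(host: str, delay: float) -> tuple[str, int]:
    loop = asyncio.get_running_loop()
    ticks = 0

    async def ticker() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1          # only advances if the event loop is not blocked

    tick_task = asyncio.create_task(ticker())
    result = await loop.run_in_executor(
        None, functools.partial(blocking_call, host, delay)
    )
    tick_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await tick_task
    return result, ticks
```

A non-zero tick count after the call returns shows that other coroutines kept running while the thread was blocked.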

_run_ps_sync() (thread worker)

wsman = WSMan(
    host, port=port, username=username, password=password,
    ssl=use_ssl,
    auth=settings.winrm_auth,          # "ntlm" or "basic"
    cert_validation=use_ssl,            # only check cert when using HTTPS
    connection_timeout=settings.winrm_connect_timeout_seconds,
    operation_timeout=max(
        timeout_seconds + 10,
        settings.winrm_operation_timeout_seconds,
    ),
)
with RunspacePool(wsman) as pool:
    ps = PowerShell(pool)
    ps.add_script(script)
    raw_output = ps.invoke()
    had_errors = ps.had_errors
    error_records = [str(e) for e in ps.streams.error]

Each item in raw_output is converted via str() and truncated to winrm_max_output_bytes.

Output format

Host: win01.internal
Script length: 43 chars
Had errors: False

--- output ---
WIN-SERVER-01
6.1.7601.65536

--- errors ---
(only present if had_errors or error_records is non-empty)

6. Database MCP

6.1 server.py

Tool: db_query

Driver dispatch

async def _dispatch_query(db_type, host, port, database, username, password, query, timeout_seconds):
    if db_type == "postgres": return await _query_postgres(...)
    if db_type == "mysql":    return await _query_mysql(...)
    # mssql: run synchronous pyodbc in thread pool
    return await loop.run_in_executor(None, partial(_query_mssql_sync, ...))
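
Before dispatch, the tool has to validate db_type and turn port 0 into an engine default. A minimal sketch, assuming the conventional default ports for each engine; `DEFAULT_PORTS` and `resolve_port` are hypothetical names, and the document does not state which defaults the service actually uses.

```python
# Conventional engine ports; the values the service uses are an assumption.
DEFAULT_PORTS = {"postgres": 5432, "mysql": 3306, "mssql": 1433}


def resolve_port(db_type: str, port: int = 0) -> int:
    """Return the caller's port, or the engine default when port is 0."""
    if db_type not in DEFAULT_PORTS:
        # Rejected before any credential handle is resolved.
        raise ValueError(f"Unsupported db_type: {db_type!r}")
    return port or DEFAULT_PORTS[db_type]
```

Validating db_type first means a typo in the engine name does not consume a single-use handle.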

PostgreSQL (asyncpg)

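conn = await asyncpg.connect(host=host, port=port, user=user, password=password,
                             database=database, timeout=connect_timeout)
try:
    rows = await conn.fetch(query, timeout=query_timeout)
    columns = list(rows[0].keys()) if rows else []   # an empty result has no row to inspect
    data = [list(row.values()) for row in rows]
finally:
    await conn.close()                               # close even if the query fails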

MySQL (aiomysql)

conn = await aiomysql.connect(host=host, port=port, user=user, password=password,
                              db=db, connect_timeout=connect_timeout)
try:
    async with conn.cursor() as cursor:
        await asyncio.wait_for(cursor.execute(query), timeout=query_timeout)
        columns = [col[0] for col in cursor.description or []]   # None for non-SELECT statements
        rows = await cursor.fetchall()
finally:
    conn.close()                                                 # close even if the query fails

SQL Server (pyodbc — sync)

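conn_str = (
    "DRIVER={ODBC Driver 18 for SQL Server};"
    f"SERVER={host},{port};DATABASE={database};UID={username};PWD={password};"
    f"Connection Timeout={connect_timeout};"
)
# pyodbc.connect(timeout=...) is the login timeout; the query timeout is conn.timeout
conn = pyodbc.connect(conn_str, timeout=connect_timeout)
try:
    conn.timeout = query_timeout
    cursor = conn.cursor()
    cursor.execute(query)
    columns = [col[0] for col in cursor.description]
    rows = [list(row) for row in cursor.fetchall()]
finally:
    conn.close()   # "with pyodbc.connect(...)" commits on exit but does not close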

If pyodbc is not importable (missing system ODBC driver), raises RuntimeError with installation instructions.

Row and cell limits

After query execution:

  1. If len(rows) > settings.db_max_rows: truncate to db_max_rows, set truncated=True
  2. For each cell in _format_result: _cell_str() truncates at db_max_cell_bytes UTF-8 bytes and appends a truncation marker
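
Both limits can be sketched as small pure functions. The names `cap_rows` and `cell_str`, the "..." marker, and the "NULL" rendering of None are assumptions made for this illustration.

```python
from typing import Any, Sequence


def cap_rows(rows: Sequence[Sequence[Any]], max_rows: int) -> tuple[list, bool]:
    """Return at most max_rows rows plus a flag saying whether rows were cut."""
    truncated = len(rows) > max_rows
    return list(rows[:max_rows]), truncated


def cell_str(value: Any, max_bytes: int, marker: str = "...") -> str:
    """Render one cell, cutting it at max_bytes of UTF-8."""
    text = "NULL" if value is None else str(value)
    raw = text.encode("utf-8")
    if len(raw) <= max_bytes:
        return text
    # errors="ignore" avoids emitting half of a multi-byte character
    return raw[:max_bytes].decode("utf-8", errors="ignore") + marker
```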

Output format

Host: pg.internal
Database: prod (postgres)
Query length: 38 chars
Rows returned: 3
Elapsed: 12ms

 id | name    | email
----|---------|----------------
  1 | Alice   | alice@corp.com
  2 | Bob     | bob@corp.com
  3 | Charlie | charlie@corp.com

If rows are capped: Rows returned: 1000 (capped — more rows exist)


7. MCP Tool API Reference

All tools follow the JSON-RPC 2.0 envelope defined by the MCP protocol. Parameters below are the tool-level parameters (inside arguments).
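
As an illustration of where the tool-level parameters sit, the sketch below builds an MCP tools/call request for ssh_execute. `build_tool_call` is a hypothetical helper, the host and handle values are taken from examples elsewhere in this document, and transport details such as session setup and headers are omitted.

```python
import json
from typing import Any


def build_tool_call(request_id: int, tool: str, arguments: dict[str, Any]) -> str:
    """Serialize an MCP tools/call request as a JSON-RPC 2.0 message."""
    return json.dumps({
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": tool, "arguments": arguments},
    })


request = build_tool_call(1, "ssh_execute", {
    "host": "linux01.internal",
    "command": "df -h",
    "secret_handle": "secret://a3f9c2e1b8d74f2c9e1a0b5d3c8f7e2a",
})
```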

get_credential

MCP path: POST /mcp/cyberark/...

Parameter Type Required Default Description
safe string Yes CyberArk Safe name
object_name string Yes Credential object name in the Safe
app_id string No CYBERARK_APP_ID Override the service AppID

Returns: Plain text with handle, username, address, platform, TTL.

Errors:

  • CyberArkError — CCP returned an error (APPAP00xE etc.)
  • RuntimeError — CyberArk client not started

list_safes

MCP path: POST /mcp/cyberark/...

Parameter Type Required Default Description
app_id string No CYBERARK_APP_ID AppID to list safes for

Returns: Newline-separated list of Safe names, or informational message if not configured.


ssh_execute

MCP path: POST /mcp/ssh/...

Parameter Type Required Default Description
host string Yes Hostname or IP
command string Yes Shell command
secret_handle string Yes Handle from get_credential
port integer No 22 SSH port
username_override string No "" Override credential username
timeout_seconds integer No 30 Command timeout

Returns: Formatted text with host, command, exit code, stdout, stderr.

Errors:

  • KeyError — handle not found, expired, or already consumed
  • asyncssh.PermissionDenied — authentication failure
  • asyncssh.DisconnectError — SSH disconnection
  • asyncio.TimeoutError — command timed out
  • OSError — network error (connection refused, DNS failure)

ps_execute

MCP path: POST /mcp/powershell/...

Parameter Type Required Default Description
host string Yes Hostname or IP
script string Yes PowerShell script text
secret_handle string Yes Handle from get_credential
port integer No 5985 WinRM port (5986 for HTTPS)
use_ssl boolean No false Use HTTPS for WinRM
timeout_seconds integer No 60 Script execution timeout
username_override string No "" Override credential username

Returns: Formatted text with host, script length, had_errors, output, error records.

Errors:

  • KeyError — handle not found/expired/consumed
  • Any exception from pypsrp (WinRM connection error, auth failure, etc.)

db_query

MCP path: POST /mcp/database/...

Parameter Type Required Default Description
host string Yes Database server hostname
database string Yes Database/schema name
query string Yes SQL query text
secret_handle string Yes Handle from get_credential
db_type string No "postgres" "postgres", "mysql", "mssql"
port integer No 0 0 = use default for db_type
username_override string No "" Override credential username
timeout_seconds integer No 30 Query timeout

Returns: Text table with columns, rows, counts, elapsed time.

Errors:

  • ValueError — unsupported db_type
  • KeyError — handle not found/expired/consumed
  • asyncpg.PostgresError — PostgreSQL error
  • aiomysql.Error — MySQL error
  • pyodbc.Error — SQL Server error
  • RuntimeError — pyodbc not installed

8. Data Models

Credential (CyberArk CCP response)

@dataclass(frozen=True)
class Credential:
    username: str                    # "svc_account"
    password: str                    # raw password — stored in SecretStr immediately
    address: str                     # "db.internal" — target host from CyberArk
    safe: str                        # "PROD-DB"
    folder: str                      # "Root"
    object_name: str                 # "PROD-DB-svc_account"
    platform_id: str                 # "Oracle", "UnixSSH", etc.
    password_change_in_process: bool # True if CyberArk is rotating this credential

password_change_in_process=True should trigger a warning — the credential may be mid-rotation.

SecretStore entry

@dataclass(slots=True)
class _Entry:
    handle_id: str       # 32 hex chars (key in _store dict)
    username: str
    password: SecretStr  # pydantic SecretStr — str() returns "**********"
    created_at: float    # time.monotonic()
    resolved: bool       # True after first resolve

Handle format

secret://a3f9c2e1b8d74f2c9e1a0b5d3c8f7e2a
         └──────────────────────────────────┘
         32-char lowercase hex = 128 bits of entropy
         from secrets.token_hex(16)
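
A short sketch of generating and validating a handle in this format. `new_handle` and `parse_handle` are hypothetical names, and the strict regular-expression check is an assumption; the document only states that a wrong prefix raises ValueError.

```python
import re
import secrets

HANDLE_RE = re.compile(r"secret://([0-9a-f]{32})")


def new_handle() -> str:
    return "secret://" + secrets.token_hex(16)   # 16 random bytes = 32 hex chars


def parse_handle(handle: str) -> str:
    """Return the handle_id, or raise ValueError for a malformed handle."""
    match = HANDLE_RE.fullmatch(handle)
    if match is None:
        raise ValueError("malformed secret handle")
    return match.group(1)
```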

9. Configuration Reference

Full .env.example:

# ── Service ───────────────────────────────────────────────────────────
MCP_HOST=0.0.0.0
MCP_PORT=8443
MCP_API_KEYS=key-for-claude-desktop,key-for-vscode

# ── Secret Handle Store ───────────────────────────────────────────────
HANDLE_TTL_SECONDS=300
HANDLE_SINGLE_USE=true

# ── CyberArk CCP ─────────────────────────────────────────────────────
CYBERARK_CCP_URL=https://cyberark.internal/AIMWebService/api/Accounts
CYBERARK_APP_ID=MCP-Privileged-Service
CYBERARK_VERIFY_SSL=/etc/ssl/certs/ca-certificates.crt

# ── CyberArk mTLS (leave empty for IP allowlist mode) ─────────────────
CYBERARK_CERT_PFX_PATH=
CYBERARK_CERT_PFX_PASSWORD=

# ── PowerShell / WinRM ────────────────────────────────────────────────
WINRM_AUTH=ntlm
WINRM_CONNECT_TIMEOUT_SECONDS=15
WINRM_OPERATION_TIMEOUT_SECONDS=20
WINRM_MAX_OUTPUT_BYTES=51200

# ── SSH ───────────────────────────────────────────────────────────────
SSH_KNOWN_HOSTS=~/.ssh/known_hosts
SSH_CONNECT_TIMEOUT_SECONDS=10
SSH_MAX_OUTPUT_BYTES=51200

# ── Database ──────────────────────────────────────────────────────────
DB_CONNECT_TIMEOUT_SECONDS=10
DB_QUERY_TIMEOUT_SECONDS=30
DB_MAX_ROWS=1000
DB_MAX_CELL_BYTES=1024

# ── Logging ───────────────────────────────────────────────────────────
LOG_FORMAT=json
LOG_LEVEL=INFO

10. Error Handling Matrix

| Layer | Exception | Handling | What Claude sees |
|---|---|---|---|
| Auth middleware | Bad/missing key | 401 JSON response | {"detail": "Invalid or missing API key"} |
| SecretStore | KeyError (unknown) | Caught in tool, ctx.error, re-raised | MCP error response |
| SecretStore | KeyError (expired) | Same | MCP error response |
| SecretStore | KeyError (consumed) | Same | MCP error response |
| CyberArk CCP | CyberArkError | Caught in tool, ctx.error, re-raised | MCP error response with error code |
| SSH | asyncssh.PermissionDenied | ctx.error, re-raised | MCP error response |
| SSH | asyncssh.DisconnectError | ctx.error, re-raised | MCP error response |
| SSH | asyncio.TimeoutError | ctx.error, re-raised | MCP error response |
| SSH | OSError | ctx.error, re-raised | MCP error response |
| SSH | Non-zero exit code | NOT raised; returned in result | Normal result with Exit code: N |
| WinRM | Any exception from pypsrp | ctx.error, re-raised | MCP error response |
| WinRM | Script errors (had_errors=True) | NOT raised; returned in result | Normal result with Had errors: True |
| Database | ValueError (bad db_type) | Raised before credential access | MCP error response |
| Database | Driver exceptions | ctx.error, re-raised | MCP error response |
| Database | Row cap exceeded | NOT raised; result truncated | Normal result with (capped) note |

11. Audit Event Catalog

All events are emitted via structlog at INFO, WARNING, or ERROR level to the audit logger. The logger is named "audit", so log shippers can filter on this name.

| Event | Level | When | Key fields |
|---|---|---|---|
| credential_fetched | INFO | CyberArk credential retrieved | app_id, safe, object_name, handle_id, ttl_seconds, client_ip |
| handle_resolved | INFO | Handle consumed by a tool | handle_id, resolved_by, target_host, single_use_invalidated |
| handle_expired | WARNING | Handle TTL exceeded or already consumed | handle_id, reason |
| auth_failure | WARNING | Invalid/missing API key | client_ip, reason |
| cyberark_error | ERROR | CCP returned error | app_id, safe, object_name, status_code, error_code, message |
| ssh_executed | INFO | SSH command completed | handle_id, host, port, username, command, exit_code, elapsed_ms, client_ip |
| ps_executed | INFO | PowerShell script completed | handle_id, host, port, username, script_length, had_errors, elapsed_ms, client_ip |
| db_queried | INFO | Database query completed | handle_id, host, port, database, db_type, username, query_length, row_count, elapsed_ms, client_ip |

Fields intentionally absent from all events:

  • password (never)
  • secret_handle (never — only handle_id which is non-reversible)
  • stdout / stderr output (may contain sensitive data)
  • SQL query text (logged only as query_length)
  • PowerShell script text (logged only as script_length)
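
The length-only rule can be sketched as an event builder that accepts the query but records only its length. `db_queried_event` is a hypothetical helper and the "event" key name is an assumption; the real module emits the event through structlog.

```python
from typing import Any


def db_queried_event(handle_id: str, host: str, port: int, database: str,
                     db_type: str, username: str, query: str,
                     row_count: int, elapsed_ms: int, client_ip: str) -> dict[str, Any]:
    """Build the db_queried payload; the SQL text is reduced to its length."""
    return {
        "event": "db_queried",
        "handle_id": handle_id,
        "host": host,
        "port": port,
        "database": database,
        "db_type": db_type,
        "username": username,
        "query_length": len(query),   # the query text itself is never stored
        "row_count": row_count,
        "elapsed_ms": elapsed_ms,
        "client_ip": client_ip,
    }
```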

12. Test Strategy

Test layout

tests/
├── conftest.py                 ← shared fixtures and mock helpers
├── test_auth.py                ← API key middleware (FastAPI TestClient)
├── test_secret_store.py        ← handle lifecycle (pure asyncio)
├── test_cyberark_client.py     ← CCP HTTP client (httpx MockTransport)
├── test_ssh_server.py          ← SSH tool (mock asyncssh.connect)
├── test_powershell_server.py   ← PS tool (mock _run_ps_sync)
├── test_database_server.py     ← DB tool (mock _dispatch_query)
└── test_integration.py         ← end-to-end pipelines (all mocks combined)

Test patterns

| Pattern | Used for | Why |
|---|---|---|
| httpx.MockTransport | CyberArk client | Tests full HTTP response parsing without real CyberArk |
| unittest.mock.patch on transport layer | SSH, PowerShell, DB tools | Isolates MCP tool logic from network I/O |
| Real secret_store | All tool tests | Tests handle lifecycle end-to-end |
| MagicMock for Context | All tool tests | Tests ctx.info / ctx.error calls without MCP framework |
| patch.object(settings, ...) | Settings-sensitive tests | Overrides config for a test without process restart |

Coverage targets

  • Foundation modules: 100%
  • CyberArk client: 100% (all HTTP response paths)
  • MCP tools: ≥90% (happy path + all error paths)
  • Integration flows: key pipelines (CyberArk→SSH, →PS, →DB)
  • Known gap: real-system integration tests (require live CyberArk/WinRM/DB)

Running tests

# All tests
python -m pytest tests/ -v

# With coverage
python -m pytest tests/ --cov=src/mcp_privileged --cov-report=term-missing

# Single module
python -m pytest tests/test_integration.py -v