32 KiB
Low-Level Design
MCP Privileged Access Service
Version: 1.0 Date: 2026-03-28 Status: Production-ready
Table of Contents
- Module Structure
- Foundation Modules
- 2.1 config.py
- 2.2 secret_store.py
- 2.3 auth.py
- 2.4 audit.py
- 2.5 main.py
- CyberArk MCP
- SSH MCP
- PowerShell MCP
- Database MCP
- MCP Tool API Reference
- Data Models
- Configuration Reference
- Error Handling Matrix
- Audit Event Catalog
- Test Strategy
1. Module Structure
src/mcp_privileged/
├── __init__.py
├── config.py ← All settings; read once at import time
├── secret_store.py ← In-RAM handle store + background sweeper
├── auth.py ← API key middleware
├── audit.py ← Structured log helpers
├── main.py ← FastAPI app assembly + lifespan
│
├── cyberark/
│ ├── __init__.py
│ ├── client.py ← CCP REST client (httpx)
│ └── server.py ← FastMCP server; get_credential, list_safes tools
│
├── ssh/
│ ├── __init__.py
│ └── server.py ← FastMCP server; ssh_execute tool
│
├── powershell/
│ ├── __init__.py
│ └── server.py ← FastMCP server; ps_execute tool
│
└── database/
├── __init__.py
└── server.py ← FastMCP server; db_query tool
Dependency graph (no cycles):
main.py
├── config.py (leaf)
├── audit.py ← config.py
├── auth.py ← config.py, audit.py
├── secret_store.py ← config.py, audit.py
├── cyberark/
│ ├── client.py ← config.py, audit.py
│ └── server.py ← cyberark/client.py, secret_store.py, audit.py
├── ssh/server.py ← config.py, secret_store.py, audit.py
├── powershell/server.py ← config.py, secret_store.py, audit.py
└── database/server.py ← config.py, secret_store.py, audit.py
2. Foundation Modules
2.1 config.py
Class: Settings(BaseSettings)
Singleton: settings = Settings() — imported everywhere as from mcp_privileged.config import settings
The settings object is created once when the module is first imported. If any required value is missing or invalid, pydantic raises a ValidationError at startup — fail fast.
Settings groups
Service
| Setting | Env var | Default | Type | Description |
|---|---|---|---|---|
mcp_host |
MCP_HOST |
0.0.0.0 |
str |
Bind address for uvicorn |
mcp_port |
MCP_PORT |
8443 |
int |
Listen port |
mcp_api_keys_raw |
MCP_API_KEYS |
(required) | str |
Comma-separated API keys — access via the mcp_api_keys property |
Secret Handle Store
| Setting | Env var | Default | Type | Description |
|---|---|---|---|---|
handle_ttl_seconds |
HANDLE_TTL_SECONDS |
300 |
int (30–3600) |
Handle expiry |
handle_single_use |
HANDLE_SINGLE_USE |
True |
bool |
Invalidate on first resolve |
CyberArk CCP
| Setting | Env var | Default | Type | Description |
|---|---|---|---|---|
cyberark_ccp_url |
CYBERARK_CCP_URL |
— | str |
Full CCP REST endpoint URL |
cyberark_app_id |
CYBERARK_APP_ID |
— | str |
AppID registered in CyberArk |
cyberark_verify_ssl |
CYBERARK_VERIFY_SSL |
system CAs | str |
"false", "true", or CA path |
cyberark_cert_pfx_path |
CYBERARK_CERT_PFX_PATH |
None |
Path|None |
mTLS client cert (PFX) |
cyberark_cert_pfx_password |
CYBERARK_CERT_PFX_PASSWORD |
None |
str|None |
PFX password |
PowerShell / WinRM
| Setting | Env var | Default | Type | Description |
|---|---|---|---|---|
winrm_auth |
WINRM_AUTH |
ntlm |
str |
ntlm or basic |
winrm_connect_timeout_seconds |
WINRM_CONNECT_TIMEOUT_SECONDS |
15 |
int |
WinRM connection timeout |
winrm_operation_timeout_seconds |
WINRM_OPERATION_TIMEOUT_SECONDS |
20 |
int |
WinRM operation timeout |
winrm_max_output_bytes |
WINRM_MAX_OUTPUT_BYTES |
51200 |
int |
Max bytes per output object |
SSH
| Setting | Env var | Default | Type | Description |
|---|---|---|---|---|
ssh_known_hosts |
SSH_KNOWN_HOSTS |
~/.ssh/known_hosts |
str |
Path or "disable" |
ssh_connect_timeout_seconds |
SSH_CONNECT_TIMEOUT_SECONDS |
10 |
int |
SSH connection timeout |
ssh_max_output_bytes |
SSH_MAX_OUTPUT_BYTES |
51200 |
int |
Max bytes per stdout/stderr |
Database
| Setting | Env var | Default | Type | Description |
|---|---|---|---|---|
db_connect_timeout_seconds |
DB_CONNECT_TIMEOUT_SECONDS |
10 |
int |
DB connection timeout |
db_query_timeout_seconds |
DB_QUERY_TIMEOUT_SECONDS |
30 |
int |
Query execution timeout |
db_max_rows |
DB_MAX_ROWS |
1000 |
int |
Row result cap |
db_max_cell_bytes |
DB_MAX_CELL_BYTES |
1024 |
int |
Per-cell truncation threshold |
Logging
| Setting | Env var | Default | Type | Description |
|---|---|---|---|---|
log_format |
LOG_FORMAT |
json |
"json"|"console" |
Output format |
log_level |
LOG_LEVEL |
INFO |
"DEBUG"|"INFO"|.. |
Minimum log level |
Validators
_parse_and_validate_api_keys(model validator): validates thatMCP_API_KEYSis non-empty and not equal to the default"changeme"— service refuses to start if either condition is violated. RaisesValidationErrorat import time (fail-fast).mcp_api_keys(property): splitsmcp_api_keys_rawon commas, strips whitespace, returnsfrozenset[str]. Implemented as a@property(not a pydantic field) to avoid a name collision with the pydantic-settings env-var auto-mapping.cyberark_ssl_verify: maps"false"→False,"true"or""→True, anything else → path stringcyberark_cert_pfx_path: empty string →None_validate_pfx(model validator): if PFX path is set, the file must exist and the password must be non-empty
2.2 secret_store.py
Class: SecretStore
Singleton: secret_store = SecretStore()
Internal data structure
@dataclass(slots=True)
class _Entry:
handle_id: str # 32-char hex, the key in _store
username: str # plaintext (used as SSH/DB username)
password: SecretStr # pydantic SecretStr — prevents accidental str() exposure
created_at: float # time.monotonic() at creation
resolved: bool = False # set to True on first resolve
def is_expired(self, ttl: int) -> bool:
return (time.monotonic() - self.created_at) > ttl
class SecretStore:
_store: dict[str, _Entry] # handle_id → entry
_lock: asyncio.Lock # all mutations are locked
Methods
async store(username, password) → str
- Generate
handle_id = secrets.token_hex(16)(32 hex chars, cryptographically random) - Create
_Entry(handle_id, username, SecretStr(password)) - Acquire lock, insert into
_store - Return
"secret://" + handle_id
async resolve(handle, resolved_by="unknown") → (str, str)
- Parse handle → extract
handle_id(raisesValueErrorif prefix wrong) - Acquire lock
- Lookup entry —
KeyErrorif not found - Check TTL —
KeyError("expired")+ delete if expired - Check
resolved+single_use—KeyError("already consumed")if violated - Mark
entry.resolved = True; delete ifsingle_use - Release lock
- Log
handle_resolvedaudit event - Return
(entry.username, entry.password.get_secret_value())
get_secret_value()is the only intentional unwrap point in the entire codebase.
async revoke(handle) → bool
Explicit early revocation. Returns True if the handle existed.
async purge_expired() → int
Scans all entries and deletes expired ones. Called by the background sweeper every 60 seconds. Returns count of deleted entries.
Background sweeper
async def _sweeper(store, interval_seconds=60):
while True:
await asyncio.sleep(interval_seconds)
count = await store.purge_expired()
Started in main.py lifespan as an asyncio.Task. Cancelled on shutdown.
2.3 auth.py
Class: ApiKeyMiddleware(BaseHTTPMiddleware)
Request arrives
│
▼
Does path start with "/mcp/"?
│
NO ├──────────────────────────────► pass through (health check etc.)
│
YES ▼
Extract key from headers:
1. X-API-Key: <value>
2. Authorization: Bearer <value>
│
▼
key in settings.mcp_api_keys ?
│
NO ├──────────────────────────────► 401 JSON + log_auth_failure()
│
YES ▼
call_next(request)
Key validation uses hmac.compare_digest in a non-short-circuiting loop over all configured keys, providing timing-safe comparison that prevents an attacker from inferring key length or prefix from response time differences:
@staticmethod
def _is_valid_key(key: str) -> bool:
key_bytes = key.encode()
valid = False
for configured_key in settings.mcp_api_keys:
if hmac.compare_digest(key_bytes, configured_key.encode()):
valid = True # set flag, do NOT return early
return valid
The loop always iterates all keys (no return True inside the loop) so the response time does not leak how many keys are configured or how close a guess was.
2.4 audit.py
Wraps structlog with named functions so every audit event has a consistent schema. See Section 11 for the full catalog.
Configuration: configure_logging() must be called once at startup (called in lifespan and in the run() entry point).
Processors pipeline:
merge_contextvars → add_logger_name → add_log_level → TimeStamper(iso)
→ StackInfoRenderer → ProcessorFormatter.wrap_for_formatter
→ [JSONRenderer | ConsoleRenderer]
Third-party loggers suppressed to WARNING: uvicorn.access, asyncssh, pypsrp.
2.5 main.py
Function: create_app() → FastAPI
Assembly sequence:
- Create
FastAPI(lifespan=lifespan, docs_url=None, ...)— docs disabled in production - Add
ApiKeyMiddleware - Register
GET /healthroute (no auth) - Import and mount four MCP servers:
cyberark_mcpat/mcp/cyberarkssh_mcpat/mcp/sshpowershell_mcpat/mcp/powershelldatabase_mcpat/mcp/database
Lifespan (async context manager):
startup:
configure_logging()
await cyberark_client.start() ← creates httpx.AsyncClient
sweeper_task = await start_sweeper(secret_store)
shutdown:
sweeper_task.cancel()
await sweeper_task ← wait for cancellation
await cyberark_client.stop() ← closes httpx.AsyncClient
CLI entry point: mcp-privileged → mcp_privileged.main:run
def run():
configure_logging()
app = create_app()
uvicorn.run(app, host=settings.mcp_host, port=settings.mcp_port,
log_config=None, access_log=False)
3. CyberArk MCP
3.1 client.py — CyberArkCCPClient
Singleton: cyberark_client = CyberArkCCPClient()
Lifecycle
await cyberark_client.start() # creates httpx.AsyncClient
await cyberark_client.stop() # closes httpx.AsyncClient
The httpx.AsyncClient is created once and reused for connection pooling. Timeouts: connect=5s, read=15s, write=5s, pool=5s.
get_credential(app_id, safe, object_name) → Credential
GET {CYBERARK_CCP_URL}?AppID={app_id}&Safe={safe}&Object={object_name}
Response parsing:
- HTTP 200 →
Credentialdataclass from JSON body - HTTP 4xx/5xx → parse
ErrorCode/ErrorMsg→ raiseCyberArkError - Non-JSON body → raise
CyberArkError(status_code=...) httpx.ConnectError→CyberArkError("Cannot reach CCP")httpx.TimeoutException→CyberArkError("CCP request timed out")
SSL modes
| Condition | _build_ssl_context() returns |
|---|---|
cyberark_cert_pfx_path is None |
settings.cyberark_ssl_verify (bool or path) |
| PFX path set | ssl.SSLContext with client cert loaded |
For mTLS, the PFX is parsed with cryptography, cert+key are written to a tempfile.mkstemp(suffix=".pem") with chmod 600, loaded into the SSLContext, then the temp file is immediately deleted with os.unlink().
Error codes
| Code | Meaning |
|---|---|
APPAP004E |
AppID not found or not permitted |
APPAP006E |
Authentication failure (IP allowlist / AppID mismatch) |
APPAP007E |
Credential object not found in safe |
APPAP008E |
No password found for object |
APPAP009E |
Dual control pending approval |
APPAP010E |
Dual control approval timed out |
ITATS023E |
Object not found |
ITATS012E |
Safe not found |
3.2 server.py — CyberArk MCP
Tools: get_credential, list_safes
get_credential(safe, object_name, ctx, app_id="")
1. Resolve effective AppID (param or settings.cyberark_app_id)
2. ctx.info(...)
3. cyberark_client.get_credential(...)
→ on CyberArkError: ctx.error(...); raise
4. secret_store.store(credential.username, credential.password)
→ returns handle
5. log_credential_fetched(app_id, safe, object_name, handle_id, ttl, client_ip)
6. ctx.info("Credential retrieved. Handle issued...")
7. Return formatted string:
"Credential retrieved successfully.\n
Handle: secret://...\n
Username: ...\n
Address: ...\n
Platform: ...\n
TTL: 300 seconds\n
Use this handle with ssh_execute, ps_execute, or db_connect."
The return value is carefully crafted — it contains the handle (needed by Claude for the next step) plus metadata (username, address) to help Claude route the next call correctly, but never the password.
list_safes(ctx, app_id="")
Calls cyberark_client.list_safes(app_id). Currently raises NotImplementedError (CCP has no native list-safes endpoint). The tool catches this and returns an informational message instead of raising.
4. SSH MCP
4.1 server.py
Tool: ssh_execute
Execution sequence
ssh_execute(host, command, secret_handle, ctx, port=22, username_override="", timeout_seconds=30)
│
├── secret_store.resolve(secret_handle, resolved_by="ssh")
│ KeyError → ctx.error(...); raise
│
├── if username_override: username = username_override
│
├── ctx.info("SSH connecting to ...")
│
├── _resolve_known_hosts(settings.ssh_known_hosts)
│ "disable" → None (no host key check, logs warning)
│ else → expanded path string
│
├── async with asyncssh.connect(host, port, username, password,
│ known_hosts, connect_timeout) as conn:
│ ├── result = await conn.run(command, timeout=timeout_seconds)
│ └── [exceptions caught — see error matrix]
│
├── del password (in finally block)
│
├── stdout = _truncate(result.stdout, ssh_max_output_bytes, "stdout")
├── stderr = _truncate(result.stderr, ssh_max_output_bytes, "stderr")
├── exit_code = result.exit_status ?? -1
│
├── log_ssh_executed(...)
├── ctx.info("SSH command completed ...")
│
└── return _format_result(host, command, exit_code, stdout, stderr)
Output format
Host: linux01.internal
Command: df -h
Exit code: 0
--- stdout ---
Filesystem Size Used Avail Use% Mounted on
/dev/sda1 50G 10G 40G 20% /
--- stderr ---
(only present if non-empty)
Known hosts handling
ssh_known_hosts value |
Passed to asyncssh | Behaviour |
|---|---|---|
"disable" |
None |
No host key verification (dev/lab only) |
"~/.ssh/known_hosts" |
"/home/user/.ssh/known_hosts" |
Verify against file |
/etc/ssh/known_hosts |
/etc/ssh/known_hosts |
Verify against file |
5. PowerShell MCP
5.1 server.py
Tool: ps_execute
Thread executor pattern
pypsrp is synchronous. The blocking WinRM call is wrapped in asyncio.get_running_loop().run_in_executor(None, ...):
loop = asyncio.get_running_loop()
output_lines, had_errors, error_records = await loop.run_in_executor(
None,
functools.partial(_run_ps_sync, host, port, username, password, script, use_ssl, timeout_seconds),
)
None uses the default ThreadPoolExecutor. The event loop remains responsive to other requests while WinRM is in progress.
_run_ps_sync() (thread worker)
wsman = WSMan(
host, port=port, username=username, password=password,
ssl=use_ssl,
auth=settings.winrm_auth, # "ntlm" or "basic"
cert_validation=use_ssl, # only check cert when using HTTPS
connection_timeout=settings.winrm_connect_timeout_seconds,
operation_timeout=max(
timeout_seconds + 10,
settings.winrm_operation_timeout_seconds,
),
)
with RunspacePool(wsman) as pool:
ps = PowerShell(pool)
ps.add_script(script)
raw_output = ps.invoke()
had_errors = ps.had_errors
error_records = [str(e) for e in ps.streams.error]
Each item in raw_output is converted via str() and truncated to winrm_max_output_bytes.
Output format
Host: win01.internal
Script length: 43 chars
Had errors: False
--- output ---
WIN-SERVER-01
6.1.7601.65536
--- errors ---
(only present if had_errors or error_records is non-empty)
6. Database MCP
6.1 server.py
Tool: db_query
Driver dispatch
async def _dispatch_query(db_type, host, port, database, username, password, query, timeout_seconds):
if db_type == "postgres": return await _query_postgres(...)
if db_type == "mysql": return await _query_mysql(...)
# mssql: run synchronous pyodbc in thread pool
return await loop.run_in_executor(None, partial(_query_mssql_sync, ...))
PostgreSQL (asyncpg)
conn = await asyncpg.connect(host, port, user, password, database, timeout=connect_timeout)
rows = await conn.fetch(query, timeout=query_timeout)
columns = list(rows[0].keys())
data = [list(row.values()) for row in rows]
await conn.close()
MySQL (aiomysql)
conn = await aiomysql.connect(host, port, user, password, db, connect_timeout)
async with conn.cursor() as cursor:
await asyncio.wait_for(cursor.execute(query), timeout=query_timeout)
columns = [col[0] for col in cursor.description]
rows = await cursor.fetchall()
conn.close()
SQL Server (pyodbc — sync)
conn_str = (
"DRIVER={ODBC Driver 18 for SQL Server};"
f"SERVER={host},{port};DATABASE={database};UID={username};PWD={password};"
f"Connection Timeout={connect_timeout};"
)
with pyodbc.connect(conn_str, timeout=query_timeout) as conn:
cursor = conn.cursor()
cursor.execute(query)
columns = [col[0] for col in cursor.description]
rows = [list(row) for row in cursor.fetchall()]
If pyodbc is not importable (missing system ODBC driver), raises RuntimeError with installation instructions.
Row and cell limits
After query execution:
- If
len(rows) > settings.db_max_rows: truncate todb_max_rows, settruncated=True - For each cell in
_format_result:_cell_str()truncates atdb_max_cell_bytesUTF-8 bytes and appends…
Output format
Host: pg.internal
Database: prod (postgres)
Query length: 38 chars
Rows returned: 3
Elapsed: 12ms
id | name | email
----|---------|----------------
1 | Alice | alice@corp.com
2 | Bob | bob@corp.com
3 | Charlie | charlie@corp.com
If rows are capped: Rows returned: 1000 (capped — more rows exist)
7. MCP Tool API Reference
All tools follow the JSON-RPC 2.0 envelope defined by the MCP protocol. Parameters below are the tool-level parameters (inside arguments).
get_credential
MCP path: POST /mcp/cyberark/...
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
safe |
string |
Yes | — | CyberArk Safe name |
object_name |
string |
Yes | — | Credential object name in the Safe |
app_id |
string |
No | CYBERARK_APP_ID |
Override the service AppID |
Returns: Plain text with handle, username, address, platform, TTL.
Errors:
CyberArkError— CCP returned an error (APPAP00xE etc.)RuntimeError— CyberArk client not started
list_safes
MCP path: POST /mcp/cyberark/...
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
app_id |
string |
No | CYBERARK_APP_ID |
AppID to list safes for |
Returns: Newline-separated list of Safe names, or informational message if not configured.
ssh_execute
MCP path: POST /mcp/ssh/...
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
host |
string |
Yes | — | Hostname or IP |
command |
string |
Yes | — | Shell command |
secret_handle |
string |
Yes | — | Handle from get_credential |
port |
integer |
No | 22 |
SSH port |
username_override |
string |
No | "" |
Override credential username |
timeout_seconds |
integer |
No | 30 |
Command timeout |
Returns: Formatted text with host, command, exit code, stdout, stderr.
Errors:
KeyError— handle not found, expired, or already consumedasyncssh.PermissionDenied— authentication failureasyncssh.DisconnectError— SSH disconnectionasyncio.TimeoutError— command timed outOSError— network error (connection refused, DNS failure)
ps_execute
MCP path: POST /mcp/powershell/...
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
host |
string |
Yes | — | Hostname or IP |
script |
string |
Yes | — | PowerShell script text |
secret_handle |
string |
Yes | — | Handle from get_credential |
port |
integer |
No | 5985 |
WinRM port (5986 for HTTPS) |
use_ssl |
boolean |
No | false |
Use HTTPS for WinRM |
timeout_seconds |
integer |
No | 60 |
Script execution timeout |
username_override |
string |
No | "" |
Override credential username |
Returns: Formatted text with host, script length, had_errors, output, error records.
Errors:
KeyError— handle not found/expired/consumed- Any exception from pypsrp (WinRM connection error, auth failure, etc.)
db_query
MCP path: POST /mcp/database/...
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
host |
string |
Yes | — | Database server hostname |
database |
string |
Yes | — | Database/schema name |
query |
string |
Yes | — | SQL query text |
secret_handle |
string |
Yes | — | Handle from get_credential |
db_type |
string |
No | "postgres" |
"postgres", "mysql", "mssql" |
port |
integer |
No | 0 |
0 = use default for db_type |
username_override |
string |
No | "" |
Override credential username |
timeout_seconds |
integer |
No | 30 |
Query timeout |
Returns: Text table with columns, rows, counts, elapsed time.
Errors:
ValueError— unsupporteddb_typeKeyError— handle not found/expired/consumedasyncpg.PostgresError— PostgreSQL erroraiomysql.Error— MySQL errorpyodbc.Error— SQL Server errorRuntimeError— pyodbc not installed
8. Data Models
Credential (CyberArk CCP response)
@dataclass(frozen=True)
class Credential:
username: str # "svc_account"
password: str # raw password — stored in SecretStr immediately
address: str # "db.internal" — target host from CyberArk
safe: str # "PROD-DB"
folder: str # "Root"
object_name: str # "PROD-DB-svc_account"
platform_id: str # "Oracle", "UnixSSH", etc.
password_change_in_process: bool # True if CyberArk is rotating this credential
password_change_in_process=Trueshould trigger a warning — the credential may be mid-rotation.
SecretStore entry
@dataclass(slots=True)
class _Entry:
handle_id: str # 32 hex chars (key in _store dict)
username: str
password: SecretStr # pydantic SecretStr — str() returns "**********"
created_at: float # time.monotonic()
resolved: bool # True after first resolve
Handle format
secret://a3f9c2e1b8d74f2c9e1a0b5d3c8f7e2a
└──────────────────────────────────┘
32-char lowercase hex = 128 bits of entropy
from secrets.token_hex(16)
9. Configuration Reference
Full .env.example:
# ── Service ───────────────────────────────────────────────────────────
MCP_HOST=0.0.0.0
MCP_PORT=8443
MCP_API_KEYS=key-for-claude-desktop,key-for-vscode
# ── Secret Handle Store ───────────────────────────────────────────────
HANDLE_TTL_SECONDS=300
HANDLE_SINGLE_USE=true
# ── CyberArk CCP ─────────────────────────────────────────────────────
CYBERARK_CCP_URL=https://cyberark.internal/AIMWebService/api/Accounts
CYBERARK_APP_ID=MCP-Privileged-Service
CYBERARK_VERIFY_SSL=/etc/ssl/certs/ca-certificates.crt
# ── CyberArk mTLS (leave empty for IP allowlist mode) ─────────────────
CYBERARK_CERT_PFX_PATH=
CYBERARK_CERT_PFX_PASSWORD=
# ── PowerShell / WinRM ────────────────────────────────────────────────
WINRM_AUTH=ntlm
WINRM_CONNECT_TIMEOUT_SECONDS=15
WINRM_OPERATION_TIMEOUT_SECONDS=20
WINRM_MAX_OUTPUT_BYTES=51200
# ── SSH ───────────────────────────────────────────────────────────────
SSH_KNOWN_HOSTS=~/.ssh/known_hosts
SSH_CONNECT_TIMEOUT_SECONDS=10
SSH_MAX_OUTPUT_BYTES=51200
# ── Database ──────────────────────────────────────────────────────────
DB_CONNECT_TIMEOUT_SECONDS=10
DB_QUERY_TIMEOUT_SECONDS=30
DB_MAX_ROWS=1000
DB_MAX_CELL_BYTES=1024
# ── Logging ───────────────────────────────────────────────────────────
LOG_FORMAT=json
LOG_LEVEL=INFO
10. Error Handling Matrix
| Layer | Exception | Handling | What Claude sees |
|---|---|---|---|
| Auth middleware | Bad/missing key | 401 JSON response | {"detail": "Invalid or missing API key"} |
| SecretStore | KeyError (unknown) |
Caught in tool, ctx.error, re-raised |
MCP error response |
| SecretStore | KeyError (expired) |
Same | MCP error response |
| SecretStore | KeyError (consumed) |
Same | MCP error response |
| CyberArk CCP | CyberArkError |
Caught in tool, ctx.error, re-raised |
MCP error response with error code |
| SSH | asyncssh.PermissionDenied |
ctx.error, re-raised |
MCP error response |
| SSH | asyncssh.DisconnectError |
ctx.error, re-raised |
MCP error response |
| SSH | asyncio.TimeoutError |
ctx.error, re-raised |
MCP error response |
| SSH | OSError |
ctx.error, re-raised |
MCP error response |
| SSH | Non-zero exit code | NOT raised — returned in result | Normal result with Exit code: N |
| WinRM | Any exception from pypsrp | ctx.error, re-raised |
MCP error response |
| WinRM | Script errors (had_errors=True) |
NOT raised — returned in result | Normal result with Had errors: True |
| Database | ValueError (bad db_type) |
Raised before credential access | MCP error response |
| Database | Driver exceptions | ctx.error, re-raised |
MCP error response |
| Database | Row cap exceeded | NOT raised — result truncated | Normal result with (capped) note |
11. Audit Event Catalog
All events are emitted via structlog at INFO or WARNING level to the audit logger.
The logger is named "audit" — log shippers can filter on this name.
| Event | Level | When | Key fields |
|---|---|---|---|
credential_fetched |
INFO | CyberArk credential retrieved | app_id, safe, object_name, handle_id, ttl_seconds, client_ip |
handle_resolved |
INFO | Handle consumed by a tool | handle_id, resolved_by, target_host, single_use_invalidated |
handle_expired |
WARNING | Handle TTL exceeded or already consumed | handle_id, reason |
auth_failure |
WARNING | Invalid/missing API key | client_ip, reason |
cyberark_error |
ERROR | CCP returned error | app_id, safe, object_name, status_code, error_code, message |
ssh_executed |
INFO | SSH command completed | handle_id, host, port, username, command, exit_code, elapsed_ms, client_ip |
ps_executed |
INFO | PowerShell script completed | handle_id, host, port, username, script_length, had_errors, elapsed_ms, client_ip |
db_queried |
INFO | Database query completed | handle_id, host, port, database, db_type, username, query_length, row_count, elapsed_ms, client_ip |
Fields intentionally absent from all events:
password(never)secret_handle(never — onlyhandle_idwhich is non-reversible)- stdout / stderr output (may contain sensitive data)
- SQL query text (logged only as
query_length) - PowerShell script text (logged only as
script_length)
12. Test Strategy
Test layout
tests/
├── conftest.py ← shared fixtures and mock helpers
├── test_auth.py ← API key middleware (FastAPI TestClient)
├── test_secret_store.py ← handle lifecycle (pure asyncio)
├── test_cyberark_client.py ← CCP HTTP client (httpx MockTransport)
├── test_ssh_server.py ← SSH tool (mock asyncssh.connect)
├── test_powershell_server.py ← PS tool (mock _run_ps_sync)
├── test_database_server.py ← DB tool (mock _dispatch_query)
└── test_integration.py ← end-to-end pipelines (all mocks combined)
Test patterns
| Pattern | Used for | Why |
|---|---|---|
httpx.MockTransport |
CyberArk client | Tests full HTTP response parsing without real CyberArk |
unittest.mock.patch on transport layer |
SSH, PowerShell, DB tools | Isolates MCP tool logic from network I/O |
Real secret_store |
All tool tests | Tests handle lifecycle end-to-end |
MagicMock for Context |
All tool tests | Tests ctx.info / ctx.error calls without MCP framework |
patch.object(settings, ...) |
Settings-sensitive tests | Overrides config for a test without process restart |
Coverage targets
- Foundation modules: 100%
- CyberArk client: 100% (all HTTP response paths)
- MCP tools: ≥90% (happy path + all error paths)
- Integration flows: key pipelines (CyberArk→SSH, →PS, →DB)
- Known gap: real-system integration tests (require live CyberArk/WinRM/DB)
Running tests
# All tests
python -m pytest tests/ -v
# With coverage
python -m pytest tests/ --cov=src/mcp_privileged --cov-report=term-missing
# Single module
python -m pytest tests/test_integration.py -v