Files
MCP_CyberArk/tests/test_secret_store.py
2026-03-29 19:51:51 +02:00

101 lines
3.5 KiB
Python

"""
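Tests for the secret handle store.
Covers: store, resolve, single-use and multi-use handles, TTL expiry,
unknown and malformed handles, revoke, purging of expired entries (sweeper),
and password redaction in repr/str.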
"""
from __future__ import annotations
import asyncio
import time
import pytest
from mcp_privileged.secret_store import SecretStore, HANDLE_PREFIX
@pytest.fixture
def store() -> SecretStore:
    """Build a new, empty-constructed SecretStore for the requesting test."""
    fresh_store = SecretStore()
    return fresh_store
async def test_store_returns_handle(store: SecretStore) -> None:
    """Storing credentials yields a handle that begins with HANDLE_PREFIX."""
    issued = await store.store("user1", "s3cr3t")
    has_prefix = issued.startswith(HANDLE_PREFIX)
    assert has_prefix
async def test_resolve_returns_credentials(store: SecretStore) -> None:
    """Resolving a freshly issued handle returns the stored username/password."""
    issued = await store.store("user1", "s3cr3t")
    credentials = await store.resolve(issued, resolved_by="test")
    # resolve() is expected to hand back exactly two items.
    username, password = credentials
    assert (username, password) == ("user1", "s3cr3t")
async def test_single_use_invalidates_after_first_resolve(
    store: SecretStore, monkeypatch
) -> None:
    """With single-use enabled, a second resolve of the same handle raises KeyError."""
    single_use_setting = "mcp_privileged.secret_store.settings.handle_single_use"
    monkeypatch.setattr(single_use_setting, True)
    issued = await store.store("user1", "s3cr3t")
    # The first resolve must succeed; its return value is irrelevant here.
    await store.resolve(issued, resolved_by="test")
    # The second attempt must fail with one of the two accepted messages.
    with pytest.raises(KeyError, match="already_consumed|not found"):
        await store.resolve(issued, resolved_by="test")
async def test_multi_use_allows_repeated_resolve(
    store: SecretStore, monkeypatch
) -> None:
    """With single-use disabled, the same handle can be resolved repeatedly.

    The handle is resolved three times, and every resolve must return the
    originally stored username and password.
    """
    monkeypatch.setattr("mcp_privileged.secret_store.settings.handle_single_use", False)
    handle = await store.store("user1", "s3cr3t")
    for _ in range(3):
        username, password = await store.resolve(handle, resolved_by="test")
        # Verify both halves of the credential on every pass; previously the
        # username was unpacked but never checked.
        assert username == "user1"
        assert password == "s3cr3t"
async def test_expired_handle_raises(store: SecretStore, monkeypatch) -> None:
    """Resolving an entry older than the patched TTL raises KeyError ("expired")."""
    monkeypatch.setattr("mcp_privileged.secret_store.settings.handle_ttl_seconds", 1)
    issued = await store.store("user1", "s3cr3t")
    # Age the entry artificially instead of sleeping: drop the prefix to get
    # the key used in store._store, then push created_at 5 seconds back.
    prefix_len = len(HANDLE_PREFIX)
    entry_key = issued[prefix_len:]
    backdated = time.monotonic() - 5
    store._store[entry_key].created_at = backdated
    with pytest.raises(KeyError, match="expired"):
        await store.resolve(issued, resolved_by="test")
async def test_unknown_handle_raises(store: SecretStore) -> None:
    """A prefixed handle that was never issued raises KeyError on resolve."""
    never_issued = f"{HANDLE_PREFIX}nonexistent"
    with pytest.raises(KeyError):
        await store.resolve(never_issued, resolved_by="test")
async def test_invalid_handle_format_raises(store: SecretStore) -> None:
    """The string "not-a-handle" is rejected with ValueError("Invalid handle format")."""
    malformed = "not-a-handle"
    with pytest.raises(ValueError, match="Invalid handle format"):
        await store.resolve(malformed, resolved_by="test")
async def test_revoke_removes_handle(store: SecretStore) -> None:
    """Revoking an issued handle returns True and makes later resolves fail."""
    issued = await store.store("user1", "s3cr3t")
    was_revoked = await store.revoke(issued)
    assert was_revoked is True
    # Once revoked, the handle must no longer resolve.
    with pytest.raises(KeyError):
        await store.resolve(issued, resolved_by="test")
async def test_revoke_nonexistent_returns_false(store: SecretStore) -> None:
    """Revoking a handle that was never issued returns False."""
    outcome = await store.revoke(f"{HANDLE_PREFIX}nonexistent")
    assert outcome is False
async def test_purge_expired_removes_stale(store: SecretStore, monkeypatch) -> None:
    """purge_expired() drops a backdated entry and reports one removal."""
    monkeypatch.setattr("mcp_privileged.secret_store.settings.handle_ttl_seconds", 1)
    issued = await store.store("user1", "s3cr3t")
    # Backdate the entry past the 1-second TTL rather than sleeping.
    prefix_len = len(HANDLE_PREFIX)
    entry_key = issued[prefix_len:]
    backdated = time.monotonic() - 5
    store._store[entry_key].created_at = backdated
    removed = await store.purge_expired()
    assert removed == 1
    assert entry_key not in store._store
async def test_password_not_in_repr(store: SecretStore) -> None:
    """The plaintext must not appear in repr(entry) or str(entry.password)."""
    issued = await store.store("user1", "topsecret")
    # Reach into the private mapping to inspect the raw stored entry.
    entry_key = issued[len(HANDLE_PREFIX):]
    stored_entry = store._store[entry_key]
    entry_repr = repr(stored_entry)
    assert "topsecret" not in entry_repr
    password_text = str(stored_entry.password)
    assert "topsecret" not in password_text