101 lines
3.5 KiB
Python
101 lines
3.5 KiB
Python
"""
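Tests for the secret handle store.

Covers: store, resolve, single-use and multi-use handles, TTL expiry,
unknown and malformed handles, revoke, purging of expired entries
(sweeper), and password masking in string representations.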
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import time
|
|
|
|
import pytest
|
|
|
|
from mcp_privileged.secret_store import SecretStore, HANDLE_PREFIX
|
|
|
|
|
|
@pytest.fixture
def store() -> SecretStore:
    """Build a new, empty ``SecretStore`` for the requesting test."""
    fresh_store = SecretStore()
    return fresh_store
|
|
|
|
|
|
async def test_store_returns_handle(store: SecretStore) -> None:
    """Storing credentials yields a handle that begins with ``HANDLE_PREFIX``."""
    issued = await store.store("user1", "s3cr3t")
    has_prefix = issued.startswith(HANDLE_PREFIX)
    assert has_prefix
|
|
|
|
|
|
async def test_resolve_returns_credentials(store: SecretStore) -> None:
    """Resolving a just-stored handle gives back the same username and password."""
    issued = await store.store("user1", "s3cr3t")
    credentials = await store.resolve(issued, resolved_by="test")
    # resolve() is expected to produce exactly two items: username, password.
    username, password = credentials
    assert username == "user1"
    assert password == "s3cr3t"
|
|
|
|
|
|
async def test_single_use_invalidates_after_first_resolve(
    store: SecretStore, monkeypatch
) -> None:
    """With ``handle_single_use`` patched to True, a second resolve raises ``KeyError``."""
    setting_path = "mcp_privileged.secret_store.settings.handle_single_use"
    monkeypatch.setattr(setting_path, True)

    issued = await store.store("user1", "s3cr3t")

    # First resolve is expected to succeed and consume the handle.
    await store.resolve(issued, resolved_by="test")

    # Either error wording is accepted for the second attempt.
    expected_message = "already_consumed|not found"
    with pytest.raises(KeyError, match=expected_message):
        await store.resolve(issued, resolved_by="test")
|
|
|
|
|
|
async def test_multi_use_allows_repeated_resolve(
    store: SecretStore, monkeypatch
) -> None:
    """With ``handle_single_use`` patched to False, a handle resolves repeatedly.

    Resolves the same handle three times and checks that every call returns
    the originally stored username and password.
    """
    monkeypatch.setattr("mcp_privileged.secret_store.settings.handle_single_use", False)
    handle = await store.store("user1", "s3cr3t")
    for _ in range(3):
        username, password = await store.resolve(handle, resolved_by="test")
        # Check both fields: a repeat resolve must not alter either of them.
        assert username == "user1"
        assert password == "s3cr3t"
|
|
|
|
|
|
async def test_expired_handle_raises(store: SecretStore, monkeypatch) -> None:
    """A handle older than the patched 1-second TTL raises ``KeyError`` on resolve."""
    ttl_setting = "mcp_privileged.secret_store.settings.handle_ttl_seconds"
    monkeypatch.setattr(ttl_setting, 1)

    issued = await store.store("user1", "s3cr3t")

    # Backdate the entry by 5 seconds instead of sleeping past the TTL.
    prefix_length = len(HANDLE_PREFIX)
    entry_key = issued[prefix_length:]
    backdated = time.monotonic() - 5
    store._store[entry_key].created_at = backdated

    with pytest.raises(KeyError, match="expired"):
        await store.resolve(issued, resolved_by="test")
|
|
|
|
|
|
async def test_unknown_handle_raises(store: SecretStore) -> None:
    """Resolving a correctly prefixed handle that was never stored raises ``KeyError``."""
    never_issued = f"{HANDLE_PREFIX}nonexistent"
    with pytest.raises(KeyError):
        await store.resolve(never_issued, resolved_by="test")
|
|
|
|
|
|
async def test_invalid_handle_format_raises(store: SecretStore) -> None:
    """Resolving a string without the handle prefix raises ``ValueError``."""
    malformed = "not-a-handle"
    expected_message = "Invalid handle format"
    with pytest.raises(ValueError, match=expected_message):
        await store.resolve(malformed, resolved_by="test")
|
|
|
|
|
|
async def test_revoke_removes_handle(store: SecretStore) -> None:
    """Revoking a stored handle returns True; resolving it afterwards raises ``KeyError``."""
    issued = await store.store("user1", "s3cr3t")

    was_revoked = await store.revoke(issued)
    assert was_revoked is True

    with pytest.raises(KeyError):
        await store.resolve(issued, resolved_by="test")
|
|
|
|
|
|
async def test_revoke_nonexistent_returns_false(store: SecretStore) -> None:
    """Revoking a handle that was never stored returns False."""
    never_issued = f"{HANDLE_PREFIX}nonexistent"
    was_revoked = await store.revoke(never_issued)
    assert was_revoked is False
|
|
|
|
|
|
async def test_purge_expired_removes_stale(store: SecretStore, monkeypatch) -> None:
    """``purge_expired`` reports one removal and drops the backdated entry."""
    ttl_setting = "mcp_privileged.secret_store.settings.handle_ttl_seconds"
    monkeypatch.setattr(ttl_setting, 1)

    issued = await store.store("user1", "s3cr3t")

    # Backdate the entry by 5 seconds so it is older than the patched TTL.
    prefix_length = len(HANDLE_PREFIX)
    entry_key = issued[prefix_length:]
    backdated = time.monotonic() - 5
    store._store[entry_key].created_at = backdated

    purged = await store.purge_expired()

    assert purged == 1
    assert entry_key not in store._store
|
|
|
|
|
|
async def test_password_not_in_repr(store: SecretStore) -> None:
    """The plaintext password must not appear in the entry's string forms.

    Checks ``repr()`` of the stored entry and ``str()`` of its ``password``
    attribute.
    """
    plaintext = "topsecret"
    issued = await store.store("user1", plaintext)

    prefix_length = len(HANDLE_PREFIX)
    stored_entry = store._store[issued[prefix_length:]]

    entry_repr = repr(stored_entry)
    assert plaintext not in entry_repr

    password_text = str(stored_entry.password)
    assert plaintext not in password_text
|