mirror of
https://github.com/anthropics/claude-plugins-official.git
synced 2026-06-13 22:26:03 -03:00
Fixes #1868 — when CLAUDE_CONFIG_DIR is set to a non-default location (e.g. ~/.config/claude for XDG compliance, or a multi-tenant install path), the plugin still wrote state files to the hardcoded ~/.claude/ path, leaving stale state and breaking CLAUDE_CONFIG_DIR's purpose. Resolution precedence (highest first): 1. SECURITY_WARNINGS_STATE_DIR — plugin-specific override (existing) 2. CLAUDE_CONFIG_DIR/security — CC's config-dir env (new — #1868) 3. ~/.claude/security — default fallback (unchanged) Empty-string env vars (e.g. CLAUDE_CONFIG_DIR= in a misconfigured shell) are treated as not-set so the empty path doesn't collide with os.path.join and silently write to /security at the filesystem root. Implementation: a single state_dir() helper in _base.py is the source of truth for resolution. All five modules that previously had inline SECURITY_WARNINGS_STATE_DIR / ~/.claude/security resolutions (_base.py, session_state.py, ensure_agent_sdk.py, llm.py, and one site in security_reminder_hook.py) now call state_dir() instead. Re-implementing the precedence inline risks drift — one module gets a future fix, others don't. The helper is called per-invocation rather than cached at import time so test monkeypatches of the env vars take effect, and so a long-running test or future shared-process scenario can change the env between calls and have the next call observe the new value. The per-call cost is negligible compared to the subprocess-spawn cost the hooks pay every fire in production. Three hardcoded ~/.claude/security strings remain but are NOT functional resolutions: - _base.py:39: the fallback BRANCH inside state_dir() itself - ensure_agent_sdk.py:6, :11: docstring text describing default location for users Verified locally on macOS Python 3.13: - py_compile clean on all 5 modified files. - Existing 45 smoke + extensibility tests still pass. 
- 14 new tests in test_claude_config_dir.py (added to internal test suite at sg-staging/tests/, not in this PR): * 7 resolution-semantics: default fallback, CLAUDE_CONFIG_DIR override, SECURITY_WARNINGS_STATE_DIR beats both, tilde expansion, empty-string handling (CLAUDE_CONFIG_DIR= must fall back, NOT join to /security). * 4 static-shape: each of session_state / ensure_agent_sdk / llm / security_reminder_hook either imports state_dir from _base OR has zero resolution patterns. Catches the regression where someone adds a new state-file writer and re-implements resolution inline, missing the CLAUDE_CONFIG_DIR branch. * 3 end-to-end: with CLAUDE_CONFIG_DIR set, get_state_file / get_lock_file return paths under <CLAUDE_CONFIG_DIR>/security/; save_state round-trip writes a file to the redirected path and re-reads the same contents. - 59/59 pass total (45 existing + 14 new) in 2.54s. NOT verified end-to-end with a real CC instance setting CLAUDE_CONFIG_DIR. The shape tests catch the regression class (hardcoded ~/.claude/), and the end-to-end test pins the behavior that user state files actually land at the redirected path. Closes #1868. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
162 lines
5.5 KiB
Python
162 lines
5.5 KiB
Python
"""
|
|
Per-session state-file plumbing for the security-guidance plugin.
|
|
|
|
Holds the JSON state file location, fcntl-locked read-modify-write helper,
|
|
and old-file GC. Side-effect-free at import time (no env-var reads beyond
|
|
``CLAUDE_CODE_REMOTE_SESSION_ID`` inside the helpers).
|
|
|
|
The ``atomic_check_*`` helpers that build on ``with_locked_state`` deliberately
|
|
remain in ``security_reminder_hook.py`` so that tests which monkeypatch
|
|
``hook.with_locked_state`` and then call a handler still see the patched
|
|
binding via the handler → ``atomic_check_*`` → bare-name lookup chain.
|
|
"""
|
|
try:
|
|
import fcntl
|
|
except ImportError:
|
|
fcntl = None
|
|
import json
|
|
import os
|
|
import re
|
|
from datetime import datetime
|
|
|
|
from _base import debug_log, state_dir as _state_dir
|
|
|
|
|
|
def _state_key(session_id):
    """Return a filename-safe key identifying whose state file to use.

    The ``CLAUDE_CODE_REMOTE_SESSION_ID`` environment variable wins over
    *session_id* whenever it is set to a non-empty value. The chosen value is
    stringified, every character outside ``[A-Za-z0-9._-]`` is replaced with
    ``_``, and the result is truncated to 128 characters.
    """
    # In CCR every user turn spawns a new CC process with a fresh session_id,
    # while the remote session ID stays stable across those restarts. Keying
    # on it lets the pending-warnings sweep and any unprocessed touched_paths
    # survive the restart.
    remote_id = os.environ.get("CLAUDE_CODE_REMOTE_SESSION_ID")
    raw_key = str(remote_id if remote_id else session_id)

    # The key ends up as a filename component under the state dir. CC session
    # ids are UUIDs (so this is a no-op for them), but the hook protocol does
    # not guarantee that: neutralise path separators and anything else that
    # could escape the state dir, then cap the length.
    sanitized = re.sub(r"[^A-Za-z0-9._-]", "_", raw_key)
    return sanitized[:128]
|
|
|
|
|
|
def get_state_file(session_id):
    """Return the path of the JSON state file belonging to *session_id*.

    The directory is resolved on every call, and the filename embeds the
    sanitized key derived from *session_id*.
    """
    base_dir = _state_dir()
    filename = f"security_warnings_state_{_state_key(session_id)}.json"
    return os.path.join(base_dir, filename)
|
|
|
|
|
|
def get_lock_file(session_id):
    """Return the path of the lock file belonging to *session_id*.

    The lock file sits in the same resolved state directory as the JSON state
    file and shares its sanitized key; only the extension differs.
    """
    base_dir = _state_dir()
    filename = f"security_warnings_state_{_state_key(session_id)}.lock"
    return os.path.join(base_dir, filename)
|
|
|
|
|
|
def _sweep_stale_state_files(directory, suffixes, cutoff):
    """Delete ``security_warnings_state_*`` files in *directory* older than *cutoff*.

    Only names that end with one of *suffixes* (a tuple of strings) are
    considered. *cutoff* is a POSIX timestamp; a file is removed when its
    mtime is strictly less than it. A directory that is missing or cannot be
    listed is skipped, as is any file that cannot be stat'ed or removed.
    """
    try:
        names = os.listdir(directory)
    except OSError:
        # Missing or unreadable directory: nothing to sweep here.
        return

    for name in names:
        if not (name.startswith("security_warnings_state_") and name.endswith(suffixes)):
            continue
        path = os.path.join(directory, name)
        try:
            if os.path.getmtime(path) < cutoff:
                os.remove(path)
        except OSError:
            # The file may have vanished or be undeletable; keep sweeping.
            pass


def cleanup_old_state_files():
    """Remove state files and lock files older than 30 days.

    Two locations are swept independently, so a missing or unreadable state
    directory no longer prevents the legacy sweep from running:

    * the resolved state directory, for ``.json`` and ``.lock`` files;
    * the ``~/.claude`` root, for ``.lock`` files only.

    Best effort: never raises. Any unexpected error is swallowed.
    """
    try:
        thirty_days_ago = datetime.now().timestamp() - (30 * 24 * 60 * 60)

        _sweep_stale_state_files(_state_dir(), (".json", ".lock"), thirty_days_ago)

        # Sweep legacy lock files left at ~/.claude/ root by versions
        # <1.1.66, where get_lock_file() didn't honor state_dir. Same
        # 30-day mtime gate as above so we don't race an older
        # concurrent peer that may still hold an active lock.
        _sweep_stale_state_files(os.path.expanduser("~/.claude"), (".lock",), thirty_days_ago)
    except Exception:
        # Cleanup is housekeeping only; it must never surface an error.
        pass
|
|
|
|
|
|
def load_state(session_id):
    """Load the full state dict for *session_id* from its state file.

    Returns:
        * ``{"shown_warnings": <list>}`` when the file holds a bare JSON list;
        * the decoded dict, with ``"shown_warnings"`` defaulted to ``[]`` when
          absent, when the file holds a JSON object;
        * ``{"shown_warnings": []}`` when the file is missing, unreadable,
          not valid JSON, not decodable as text, or holds any other JSON type.
    """
    state_file = get_state_file(session_id)
    try:
        with open(state_file, "r") as f:
            data = json.load(f)
    except (OSError, ValueError, KeyError, TypeError):
        # ValueError covers json.JSONDecodeError and also UnicodeDecodeError,
        # which is raised when the file contains bytes that cannot be decoded
        # as text. Either way the file is unusable, so start from a clean state.
        return {"shown_warnings": []}

    if isinstance(data, list):
        return {"shown_warnings": data}
    if isinstance(data, dict):
        data.setdefault("shown_warnings", [])
        return data
    return {"shown_warnings": []}
|
|
|
|
|
|
def save_state(session_id, state):
    """Save the full state dict to the state file for *session_id*.

    The parent directory is created when needed. Filesystem errors
    (``OSError``) are logged through ``debug_log`` and swallowed. A *state*
    that cannot be serialized to JSON still raises from ``json.dumps`` (e.g.
    ``TypeError``), but the existing state file is left untouched.
    """
    state_file = get_state_file(session_id)
    try:
        state_dir = os.path.dirname(state_file)
        if state_dir:
            os.makedirs(state_dir, exist_ok=True)

        # Serialize BEFORE opening the file: open(..., "w") truncates at once,
        # so a serialization failure mid-dump would otherwise leave an empty
        # or partial file and destroy the previously saved state.
        payload = json.dumps(state)

        with open(state_file, "w") as f:
            f.write(payload)
    except (IOError, OSError) as e:
        debug_log(f"Failed to save state file {state_file}: {e}")
|
|
|
|
|
|
def with_locked_state(session_id, callback):
    """
    Execute callback with exclusive access to the state file.

    The callback receives the state dict and can modify it in place.
    State is saved after the callback returns.

    Returns the callback's return value. When ``fcntl`` is available and
    opening/locking the lock file, or anything run under the lock, raises
    ``OSError``, the error is logged through ``debug_log`` and ``None`` is
    returned. Other exceptions propagate. When ``fcntl`` is unavailable the
    load/callback/save sequence runs without a lock and without that guard.
    """
    lock_file = get_lock_file(session_id)
    state_dir = os.path.dirname(lock_file)

    try:
        os.makedirs(state_dir, exist_ok=True)
    except OSError:
        # Best effort: if the directory is unusable, opening the lock file
        # below fails and is reported there.
        pass

    if fcntl is None:
        # No file locking available (Windows) — run without locking
        state = load_state(session_id)
        result = callback(state)
        save_state(session_id, state)
        return result

    lock_fd = None
    try:
        lock_fd = os.open(lock_file, os.O_RDWR | os.O_CREAT)
        fcntl.flock(lock_fd, fcntl.LOCK_EX)

        state = load_state(session_id)
        result = callback(state)
        save_state(session_id, state)
        return result

    except (OSError, IOError) as e:
        debug_log(f"Lock/state operation failed: {e}")
        return None

    finally:
        if lock_fd is not None:
            # Unlock and close are guarded separately so that a failing
            # unlock can never skip the close and leak the descriptor.
            try:
                fcntl.flock(lock_fd, fcntl.LOCK_UN)
            except (OSError, IOError):
                pass
            finally:
                try:
                    os.close(lock_fd)
                except (OSError, IOError):
                    pass
|
|
|