Mohamed Hegazy 0d22ba3501
security-guidance: respect CLAUDE_CONFIG_DIR for plugin state files (#1868)
Fixes #1868 — when CLAUDE_CONFIG_DIR is set to a non-default location
(e.g. ~/.config/claude for XDG compliance, or a multi-tenant install
path), the plugin still wrote state files to the hardcoded ~/.claude/
path, leaving stale state and breaking CLAUDE_CONFIG_DIR's purpose.

Resolution precedence (highest first):
  1. SECURITY_WARNINGS_STATE_DIR  — plugin-specific override (existing)
  2. CLAUDE_CONFIG_DIR/security    — CC's config-dir env (new — #1868)
  3. ~/.claude/security            — default fallback (unchanged)

Empty-string env vars (e.g. CLAUDE_CONFIG_DIR= in a misconfigured
shell) are treated as not-set so the empty value is never handed to
os.path.join, which would return a bare relative "security" path and
silently write state outside the intended config dir.
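The precedence and the empty-string rule can be sketched as follows. This is an illustrative stand-in rather than the plugin's code: `resolve_state_dir` is a hypothetical name, and it takes the environment as a plain dict (an assumption made here so the rules can be exercised without touching `os.environ`).

```python
import os


def resolve_state_dir(env):
    """Hypothetical stand-in for the helper; `env` is a dict of env vars."""
    explicit = env.get("SECURITY_WARNINGS_STATE_DIR")
    if explicit:  # an empty string is falsy, so it counts as not set
        return os.path.expanduser(explicit)
    cc_config = env.get("CLAUDE_CONFIG_DIR")
    if cc_config:
        return os.path.expanduser(os.path.join(cc_config, "security"))
    return os.path.expanduser("~/.claude/security")


default = os.path.expanduser("~/.claude/security")

# Nothing set, or CLAUDE_CONFIG_DIR set to the empty string: default wins.
print(resolve_state_dir({}) == default)
print(resolve_state_dir({"CLAUDE_CONFIG_DIR": ""}) == default)

# CLAUDE_CONFIG_DIR set: state moves under it.
print(resolve_state_dir({"CLAUDE_CONFIG_DIR": "/srv/cc"}))

# The plugin-specific override beats CLAUDE_CONFIG_DIR.
print(resolve_state_dir({
    "SECURITY_WARNINGS_STATE_DIR": "/srv/override",
    "CLAUDE_CONFIG_DIR": "/srv/cc",
}))

# What the truthiness check guards against: an empty first component
# leaves only the bare directory name.
print(repr(os.path.join("", "security")))  # 'security', a relative path
```

The truthiness check (`if cc_config:`) is what folds "unset" and "set but empty" into the same branch; an `is not None` check would not.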

Implementation: a single state_dir() helper in _base.py is the source
of truth for resolution. All five modules that previously had inline
SECURITY_WARNINGS_STATE_DIR / ~/.claude/security resolutions
(_base.py, session_state.py, ensure_agent_sdk.py, llm.py, and one
site in security_reminder_hook.py) now call state_dir() instead.
Re-implementing the precedence inline risks drift — one module gets
a future fix, others don't.
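As a rough illustration of the single-source-of-truth shape, callers derive every path from the helper instead of rebuilding the precedence themselves. The `get_state_file` / `get_lock_file` names are borrowed from the test notes in this message, but their signatures and the file-name scheme below are assumptions, and `state_dir` is re-declared here only to keep the sketch self-contained.

```python
import os


def state_dir():
    # Same precedence as described above; the first non-empty value wins.
    explicit = os.environ.get("SECURITY_WARNINGS_STATE_DIR")
    if explicit:
        return os.path.expanduser(explicit)
    cc_config = os.environ.get("CLAUDE_CONFIG_DIR")
    if cc_config:
        return os.path.expanduser(os.path.join(cc_config, "security"))
    return os.path.expanduser("~/.claude/security")


# Hypothetical callers: the signatures and file names are assumptions.
def get_state_file(session_id):
    # No env lookups here; resolution lives in state_dir() only.
    return os.path.join(state_dir(), f"state_{session_id}.json")


def get_lock_file(session_id):
    return get_state_file(session_id) + ".lock"


print(get_state_file("demo"))
print(get_lock_file("demo"))
```

Because neither caller reads an env var directly, a future change to the precedence only has to be made in one place.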

The helper is called per-invocation rather than cached at import time
so test monkeypatches of the env vars take effect, and so a long-
running test or future shared-process scenario can change the env
between calls and have the next call observe the new value. The
per-call cost is negligible compared to the subprocess-spawn cost
the hooks pay on every fire in production.
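The difference between per-call resolution and import-time caching can be shown with a small sketch. The helper below is a reduced stand-in (only the CLAUDE_CONFIG_DIR branch and the default), and `CACHED_STATE_DIR` is a hypothetical module-level constant representing what caching at import time would capture.

```python
import os


def state_dir():
    # Reduced stand-in: CLAUDE_CONFIG_DIR branch plus the default fallback.
    cc_config = os.environ.get("CLAUDE_CONFIG_DIR")
    if cc_config:
        return os.path.join(cc_config, "security")
    return os.path.expanduser("~/.claude/security")


os.environ["CLAUDE_CONFIG_DIR"] = "/tmp/first"
CACHED_STATE_DIR = state_dir()  # frozen at "import time"

# A test monkeypatch (or any later env change) happens here.
os.environ["CLAUDE_CONFIG_DIR"] = "/tmp/second"

print(CACHED_STATE_DIR)  # still points under /tmp/first
print(state_dir())       # observes the new value, under /tmp/second
```

Any value derived from the helper at module scope behaves like `CACHED_STATE_DIR` here, so tests that patch the env need to call the helper after patching.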

Three hardcoded ~/.claude/security strings remain but are NOT
functional resolutions:
  - _base.py:39: the fallback BRANCH inside state_dir() itself
  - ensure_agent_sdk.py:6, :11: docstring text describing default
                                location for users

Verified locally on macOS Python 3.13:

  - py_compile clean on all 5 modified files.
  - Existing 45 smoke + extensibility tests still pass.
  - 14 new tests in test_claude_config_dir.py (added to internal test
    suite at sg-staging/tests/, not in this PR):

      * 7 resolution-semantics: default fallback, CLAUDE_CONFIG_DIR
        override, SECURITY_WARNINGS_STATE_DIR beats both, tilde
        expansion, empty-string handling (CLAUDE_CONFIG_DIR= must
        fall back, NOT join to /security).
      * 4 static-shape: each of session_state / ensure_agent_sdk /
        llm / security_reminder_hook either imports state_dir from
        _base OR has zero resolution patterns. Catches the
        regression where someone adds a new state-file writer and
        re-implements resolution inline, missing the
        CLAUDE_CONFIG_DIR branch.
      * 3 end-to-end: with CLAUDE_CONFIG_DIR set, get_state_file /
        get_lock_file return paths under <CLAUDE_CONFIG_DIR>/security/;
        save_state round-trip writes a file to the redirected path
        and re-reads the same contents.

  - 59/59 pass total (45 existing + 14 new) in 2.54s.
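The static-shape check described above might look roughly like the sketch below. The real tests live in the internal suite and are not part of this PR, so everything here is an assumption: the `shape_ok` name, the two regexes treated as "resolution patterns", and the single-line import form it recognizes.

```python
import re

# Assumed markers of inline state-dir resolution.
RESOLUTION_PATTERNS = [
    re.compile(r"SECURITY_WARNINGS_STATE_DIR"),
    re.compile(r"~/\.claude/security"),
]

# Matches single-line imports such as "from _base import debug_log, state_dir".
IMPORTS_HELPER = re.compile(
    r"^\s*from\s+\.?_base\s+import\s+.*\bstate_dir\b", re.MULTILINE
)


def shape_ok(source):
    """True if the module imports state_dir or has no resolution patterns."""
    if IMPORTS_HELPER.search(source):
        return True
    return not any(p.search(source) for p in RESOLUTION_PATTERNS)


good = "from _base import debug_log, state_dir\npath = state_dir()\n"
unrelated = "import os\nprint('no state files here')\n"
regressed = 'import os\npath = os.path.expanduser("~/.claude/security")\n'

print(shape_ok(good), shape_ok(unrelated), shape_ok(regressed))
```

A text-level check like this is coarse (it would miss a parenthesized multi-line import, for example), but it is cheap and fails loudly when a new module hardcodes the default path.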

NOT verified end-to-end with a real CC instance setting
CLAUDE_CONFIG_DIR. The shape tests catch the regression class
(hardcoded ~/.claude/), and the end-to-end tests pin the behavior
that user state files actually land at the redirected path.

Closes #1868.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 23:57:10 -07:00


"""
Shared low-level helpers for the security-guidance hook modules.

This module exists so that ``patterns``/``session_state``/``gitutil`` can use
``debug_log`` without importing ``security_reminder_hook`` (which would be a
circular import). It must stay free of any other intra-plugin imports.
"""
import json
import os
import threading
from datetime import datetime


def state_dir():
    """Return the absolute path of the plugin's state directory.

    Resolution precedence (highest first):
      1. SECURITY_WARNINGS_STATE_DIR - plugin-specific override (existing)
      2. CLAUDE_CONFIG_DIR/security  - CC's config-dir env var (#1868)
      3. ~/.claude/security          - default fallback

    Empty-string env vars are treated as not-set so a misconfigured shell
    (`CLAUDE_CONFIG_DIR=` with no value) doesn't make os.path.join return a
    bare relative `security` path and silently write state outside the
    intended config dir.

    Returns a fully-expanded absolute path (no literal `~`) so subprocess
    callers can pass it through to code that doesn't re-expand tildes.

    Called per-invocation rather than cached at import time so test
    monkeypatches of the env vars take effect. The plugin's hooks each
    run as fresh subprocesses in production, so the per-call cost is
    negligible compared to subprocess spawn.
    """
    explicit = os.environ.get("SECURITY_WARNINGS_STATE_DIR")
    if explicit:
        return os.path.expanduser(explicit)
    cc_config = os.environ.get("CLAUDE_CONFIG_DIR")
    if cc_config:
        return os.path.expanduser(os.path.join(cc_config, "security"))
    return os.path.expanduser("~/.claude/security")


# Debug log file. Lives under the plugin state dir (default ~/.claude/security/)
# rather than /tmp because /tmp is world-writable on multi-user hosts (TOCTOU /
# symlink-attack surface, cross-user log leakage). Overridable per-process via
# SECURITY_GUIDANCE_DEBUG_LOG, or per-state-dir via SECURITY_WARNINGS_STATE_DIR
# (plugin-specific override) or CLAUDE_CONFIG_DIR (CC-wide config dir, #1868).
DEBUG_LOG_FILE = os.environ.get("SECURITY_GUIDANCE_DEBUG_LOG") or os.path.join(
    state_dir(), "log.txt"
)

# Cap the debug log so parallel-worker fleets don't fill disk. When the active
# file exceeds this it's atomically rotated to <file>.1 (overwriting any prior
# rotation), so total disk stays ~2× this.
DEBUG_LOG_MAX_BYTES = 1 * 1024 * 1024


def debug_log(message):
    """Append debug message to log file with timestamp."""
    try:
        # Ensure parent dir exists: the first hook invocation on a fresh
        # install creates the state dir (default ~/.claude/security/) if it
        # isn't already there. 0700 so other local users can't read
        # review/debug output (only applies on creation).
        try:
            os.makedirs(os.path.dirname(DEBUG_LOG_FILE), mode=0o700, exist_ok=True)
        except OSError:
            pass
        try:
            if os.path.getsize(DEBUG_LOG_FILE) > DEBUG_LOG_MAX_BYTES:
                # os.replace is atomic on POSIX; under a racing fleet the loser
                # gets FileNotFoundError, which is fine: the append below
                # recreates the file.
                os.replace(DEBUG_LOG_FILE, DEBUG_LOG_FILE + ".1")
        except OSError:
            pass
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
        # 0600 on creation; existing files keep their mode.
        fd = os.open(DEBUG_LOG_FILE, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o600)
        with os.fdopen(fd, "a") as f:
            f.write(f"[{timestamp}] {message}\n")
    except Exception:
        # Logging is best-effort; never let it break the hook.
        pass


# Provenance tag prepended to injected/emitted text so a reader (especially a
# model hardened against prompt injection) can recognize the source. Not an
# authority claim: an attacker could spoof the exact string; the tag is a
# signpost so the agent can ask the operator "is this from your plugin?" with
# a concrete reference instead of treating it as unknown-actor injection.
# Some autonomous-agent setups flag un-attributed injected text as prompt
# injection and stall; the banner makes the provenance explicit.
PROVENANCE_TAG = "[from security-guidance@claude-code-plugins plugin]"
PROVENANCE_BANNER = (
    "[from security-guidance@claude-code-plugins plugin — automated "
    "security review, not user input.]"
)


def _read_plugin_version_int():
    """Encode plugin.json version "M.m.p" as M*10000 + m*100 + p so it fits the
    bool|number metrics constraint. Returns 0 if unreadable."""
    try:
        with open(os.path.join(os.path.dirname(__file__), "..", ".claude-plugin", "plugin.json")) as f:
            v = json.load(f)["version"]
        major, minor, patch = (int(x) for x in v.split(".")[:3])
        return major * 10000 + minor * 100 + patch
    except Exception:
        return 0


_PV = _read_plugin_version_int()

# ──────────────────────────────────────────────────────────────────────────
# Token-usage accumulator. Each hook invocation is a fresh subprocess, so a
# module-global is naturally per-invocation. _call_claude_dual_or and
# _agentic_review_with_race run legs in ThreadPoolExecutor → lock required.
# Emitted via _usage_metrics() into the existing emit_metrics() channel so
# hook metrics rows carry per-invocation token/cost totals
# alongside the existing skip_reason / vulns_found fields.
_USAGE = {"in": 0, "out": 0, "cr": 0, "cw": 0, "cost": 0.0, "n": 0}
_USAGE_LOCK = threading.Lock()

# $/Mtok (input, output). Used only for the raw-HTTP path; the SDK path
# reports total_cost_usd directly. Cache reads/writes are priced at the
# canonical 0.1×/1.25× of input. Unknown models fall back to sonnet pricing
# so cost_usd is never silently zero. Re-pricing downstream from the raw tok_*
# fields is the source of truth; cost_usd here is a convenience rollup.
_PRICE_PER_MTOK = {
    "claude-haiku-4-5": (1.0, 5.0),
    "claude-sonnet-4-6": (3.0, 15.0),
    "claude-opus-4-6": (15.0, 75.0),
    "claude-opus-4-7": (5.0, 25.0),
}
_PRICE_DEFAULT = (3.0, 15.0)


def _record_usage(usage, model, cost_usd=None):
    """Accumulate one API response's token usage. `usage` is the Anthropic
    `usage` dict (HTTP) or the SDK ResultMessage.usage dict; both use the
    same key names. `cost_usd` (SDK-provided) is preferred when present;
    otherwise computed from _PRICE_PER_MTOK keyed on the response model id
    (longest-prefix match so `claude-sonnet-4-6-20251015` → sonnet row)."""
    if not usage and cost_usd is None:
        return
    u = usage or {}
    try:
        i = int(u.get("input_tokens") or 0)
        o = int(u.get("output_tokens") or 0)
        cr = int(u.get("cache_read_input_tokens") or 0)
        cw = int(u.get("cache_creation_input_tokens") or 0)
    except (TypeError, ValueError):
        return
    if cost_usd is None:
        pin, pout = _PRICE_DEFAULT
        m = (model or "").lower()
        # Try the longest keys first so the most specific prefix wins.
        for k, v in sorted(_PRICE_PER_MTOK.items(), key=lambda kv: -len(kv[0])):
            if m.startswith(k):
                pin, pout = v
                break
        cost_usd = (i * pin + o * pout + cr * pin * 0.1 + cw * pin * 1.25) / 1_000_000
    with _USAGE_LOCK:
        _USAGE["in"] += i
        _USAGE["out"] += o
        _USAGE["cr"] += cr
        _USAGE["cw"] += cw
        _USAGE["cost"] += float(cost_usd or 0.0)
        _USAGE["n"] += 1


def _usage_metrics():
    """Snapshot the accumulator as metric keys. Returns {} when no API calls
    were made so skip-path emits don't burn key budget. cost_usd rounded to
    1e-6 to keep the float finite/short for the zod schema."""
    with _USAGE_LOCK:
        if _USAGE["n"] == 0:
            return {}
        return {
            "tok_in": _USAGE["in"],
            "tok_out": _USAGE["out"],
            "tok_cache_r": _USAGE["cr"],
            "tok_cache_w": _USAGE["cw"],
            "cost_usd": round(_USAGE["cost"], 6),
            "api_calls": _USAGE["n"],
        }