Context Compression: Keeping Long Conversations Inside Token Budgets
How Hermes detects context pressure, summarizes the middle of a conversation, and hands off to itself without losing the thread
What you will learn
- How the `ContextEngine` abstract base class separates the compression contract from any particular implementation, letting third-party engines replace the default compressor via config
- What token threshold signals a compression event, and why there is a built-in anti-thrashing guard that can veto a compression even when the threshold is crossed
- How `ContextCompressor.compress()` slices a conversation into three zones -- protected head, compressible middle, and token-budgeted tail -- before any LLM call is made
- Why `trajectory_compressor.py` takes a fundamentally different approach to compression, working on completed RL training trajectories rather than live chat sessions
- How the boundary functions prevent orphaned tool-call/result pairs after compression changes message indices
- What the `SUMMARY_PREFIX` constant communicates to the downstream model about how to treat the injected summary
- How `manual_compression_feedback.py` closes the feedback loop by reporting whether compression actually changed anything
Prerequisites
- Familiar with the OpenAI message format (`role`, `content`, `tool_calls`, and tool-result messages)
- Basic understanding of context windows and why token limits matter for LLMs
- Python reading comfort; no need to run the code
The Context Engine — The Top-Level Coordinator
`agent/context_engine.py:1`
The abstract base class that defines the compression contract
ContextEngine is an abstract base class that requires subclasses to implement update_from_response (called after every API response to track token usage), should_compress (the trigger check), and compress (the compaction itself). The class-level attributes last_prompt_tokens, threshold_tokens, context_length, and compression_count are not private state — run_agent.py reads them directly by name, so they form a data contract alongside the method interface.
update_model sets threshold_tokens = int(context_length * threshold_percent). Subclasses that track additional budgets override this to stay synchronized when the user switches models. The focus_topic parameter on compress() is the hook for /compress <topic>, which directs the summarizer toward a specific subject area.
ContextEngine defines three abstract methods and a fixed set of public attributes that the runtime reads directly; subclasses must satisfy both sides of that contract.
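To make the two-sided contract concrete, here is a minimal sketch of what a third-party engine has to provide: the three abstract methods plus the attributes run_agent.py reads by name. The `WindowTrimEngine` class and its drop-the-middle strategy are hypothetical, and the import path is assumed from the file shown below.

```python
from agent.context_engine import ContextEngine  # import path assumed from the file below
from typing import Any, Dict, List


class WindowTrimEngine(ContextEngine):
    """Hypothetical engine: drop the oldest middle messages instead of summarizing."""

    @property
    def name(self) -> str:
        return "window_trim"

    def update_from_response(self, usage: Dict[str, Any]) -> None:
        # Maintain the attribute contract that run_agent.py reads directly.
        self.last_prompt_tokens = usage.get("prompt_tokens", 0)
        self.last_completion_tokens = usage.get("completion_tokens", 0)
        self.last_total_tokens = usage.get("total_tokens", 0)

    def should_compress(self, prompt_tokens: int = None) -> bool:
        tokens = prompt_tokens if prompt_tokens is not None else self.last_prompt_tokens
        return self.threshold_tokens > 0 and tokens >= self.threshold_tokens

    def compress(self, messages: List[Dict[str, Any]], current_tokens: int = None,
                 focus_topic: str = None) -> List[Dict[str, Any]]:
        # Keep the protected head and tail verbatim, drop everything in between.
        # (focus_topic is accepted but ignored, which the ABC explicitly allows.)
        head = messages[:self.protect_first_n]
        tail = messages[-self.protect_last_n:]
        self.compression_count += 1
        return head + tail
```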
---
"""Abstract base class for pluggable context engines.
A context engine controls how conversation context is managed when
approaching the model's token limit. The built-in ContextCompressor
is the default implementation. Third-party engines (e.g. LCM) can
replace it via the plugin system or by being placed in the
``plugins/context_engine/<name>/`` directory.
Selection is config-driven: ``context.engine`` in config.yaml.
Default is ``"compressor"`` (the built-in). Only one engine is active.
The engine is responsible for:
- Deciding when compaction should fire
- Performing compaction (summarization, DAG construction, etc.)
- Optionally exposing tools the agent can call (e.g. lcm_grep)
- Tracking token usage from API responses
Lifecycle:
1. Engine is instantiated and registered (plugin register() or default)
2. on_session_start() called when a conversation begins
3. update_from_response() called after each API response with usage data
4. should_compress() checked after each turn
5. compress() called when should_compress() returns True
6. on_session_end() called at real session boundaries (CLI exit, /reset,
gateway session expiry) — NOT per-turn
"""
from abc import ABC, abstractmethod
from typing import Any, Dict, List
class ContextEngine(ABC):
"""Base class all context engines must implement."""
# -- Identity ----------------------------------------------------------
@property
@abstractmethod
def name(self) -> str:
"""Short identifier (e.g. 'compressor', 'lcm')."""
# -- Token state (read by run_agent.py for display/logging) ------------
#
# Engines MUST maintain these. run_agent.py reads them directly.
last_prompt_tokens: int = 0
last_completion_tokens: int = 0
last_total_tokens: int = 0
threshold_tokens: int = 0
context_length: int = 0
compression_count: int = 0
# -- Compaction parameters (read by run_agent.py for preflight) --------
#
# These control the preflight compression check. Subclasses may
# override via __init__ or property; defaults are sensible for most
# engines.
threshold_percent: float = 0.75
protect_first_n: int = 3
protect_last_n: int = 6
# -- Core interface ----------------------------------------------------
@abstractmethod
def update_from_response(self, usage: Dict[str, Any]) -> None:
"""Update tracked token usage from an API response.
Called after every LLM call with the usage dict from the response.
"""
@abstractmethod
def should_compress(self, prompt_tokens: int = None) -> bool:
"""Return True if compaction should fire this turn."""
@abstractmethod
def compress(
self,
messages: List[Dict[str, Any]],
current_tokens: int = None,
focus_topic: str = None,
) -> List[Dict[str, Any]]:
"""Compact the message list and return the new message list.
This is the main entry point. The engine receives the full message
list and returns a (possibly shorter) list that fits within the
context budget. The implementation is free to summarize, build a
DAG, or do anything else — as long as the returned list is a valid
OpenAI-format message sequence.
Args:
focus_topic: Optional topic string from manual ``/compress <focus>``.
Engines that support guided compression should prioritise
preserving information related to this topic. Engines that
don't support it may simply ignore this argument.
"""
# -- Optional: pre-flight check ----------------------------------------
def should_compress_preflight(self, messages: List[Dict[str, Any]]) -> bool:
"""Quick rough check before the API call (no real token count yet).
Default returns False (skip pre-flight). Override if your engine
can do a cheap estimate.
"""
return False
# -- Optional: manual /compress preflight ------------------------------
def has_content_to_compress(self, messages: List[Dict[str, Any]]) -> bool:
"""Quick check: is there anything in ``messages`` that can be compacted?
Used by the gateway ``/compress`` command as a preflight guard —
returning False lets the gateway report "nothing to compress yet"
without making an LLM call.
Default returns True (always attempt). Engines with a cheap way
to introspect their own head/tail boundaries should override this
to return False when the transcript is still entirely protected.
"""
return True
# -- Optional: session lifecycle ---------------------------------------
def on_session_start(self, session_id: str, **kwargs) -> None:
"""Called when a new conversation session begins.
Use this to load persisted state (DAG, store) for the session.
kwargs may include hermes_home, platform, model, etc.
"""
def on_session_end(self, session_id: str, messages: List[Dict[str, Any]]) -> None:
"""Called at real session boundaries (CLI exit, /reset, gateway expiry).
Use this to flush state, close DB connections, etc.
NOT called per-turn — only when the session truly ends.
"""
def on_session_reset(self) -> None:
"""Called on /new or /reset. Reset per-session state.
Default resets compression_count and token tracking.
"""
self.last_prompt_tokens = 0
self.last_completion_tokens = 0
self.last_total_tokens = 0
self.compression_count = 0
# -- Optional: tools ---------------------------------------------------
def get_tool_schemas(self) -> List[Dict[str, Any]]:
"""Return tool schemas this engine provides to the agent.
Default returns empty list (no tools). LCM would return schemas
for lcm_grep, lcm_describe, lcm_expand here.
"""
return []
def handle_tool_call(self, name: str, args: Dict[str, Any], **kwargs) -> str:
"""Handle a tool call from the agent.
Only called for tool names returned by get_tool_schemas().
Must return a JSON string.
kwargs may include:
messages: the current in-memory message list (for live ingestion)
"""
import json
return json.dumps({"error": f"Unknown context engine tool: {name}"})
# -- Optional: status / display ----------------------------------------
def get_status(self) -> Dict[str, Any]:
"""Return status dict for display/logging.
Default returns the standard fields run_agent.py expects.
"""
return {
"last_prompt_tokens": self.last_prompt_tokens,
"threshold_tokens": self.threshold_tokens,
"context_length": self.context_length,
"usage_percent": (
min(100, self.last_prompt_tokens / self.context_length * 100)
if self.context_length else 0
),
"compression_count": self.compression_count,
}
# -- Optional: model switch support ------------------------------------
def update_model(
self,
model: str,
context_length: int,
base_url: str = "",
api_key: str = "",
provider: str = "",
) -> None:
"""Called when the user switches models or on fallback activation.
Default updates context_length and recalculates threshold_tokens
from threshold_percent. Override if your engine needs more
(e.g. recalculate DAG budgets, switch summary models).
"""
self.context_length = context_length
self.threshold_tokens = int(context_length * self.threshold_percent)
When Does Compression Trigger?
`agent/context_compressor.py:465`
The threshold check and anti-thrashing guard in should_compress
threshold_tokens is computed as max(int(context_length * threshold_percent), MINIMUM_CONTEXT_LENGTH), where threshold_percent defaults to 0.50. The floor prevents premature triggering on small-context models, where 50% of the window would otherwise be a token count low enough to fire almost immediately. After each compression pass, the compressor records whether it saved at least 10% of tokens. Two consecutive passes below that threshold set _ineffective_compression_count to 2, and should_compress returns False even when the token count still exceeds the threshold.
The guard addresses a specific failure mode: when head and tail protection covers most of the conversation, the compressible middle shrinks to a handful of turns. Without this check, the compressor triggers, generates a dense summary that saves fewer than 10% of tokens, then triggers again on the next turn, burning LLM calls with no meaningful reduction.
should_compress is a two-condition gate: token count must exceed threshold_tokens, and _ineffective_compression_count must be below 2. Either condition alone is not sufficient.
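Concrete numbers make the gate easier to see. The sketch below mirrors the two-condition check with illustrative values; `MINIMUM_CONTEXT_LENGTH` here is a made-up figure, not the real constant.

```python
MINIMUM_CONTEXT_LENGTH = 16_000        # illustrative value only
context_length = 128_000
threshold_percent = 0.50

# The floor only matters for small context windows; here 50% already exceeds it.
threshold_tokens = max(int(context_length * threshold_percent), MINIMUM_CONTEXT_LENGTH)  # 64_000


def should_compress(prompt_tokens: int, ineffective_count: int) -> bool:
    if prompt_tokens < threshold_tokens:
        return False                   # condition 1: must exceed the threshold
    if ineffective_count >= 2:
        return False                   # condition 2: anti-thrashing veto
    return True


print(should_compress(70_000, 0))      # True: over threshold, no veto
print(should_compress(70_000, 2))      # False: vetoed despite exceeding the threshold
print(should_compress(50_000, 0))      # False: under threshold
```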
---
def should_compress(self, prompt_tokens: int = None) -> bool:
"""Check if context exceeds the compression threshold.
Includes anti-thrashing protection: if the last two compressions
each saved less than 10%, skip compression to avoid infinite loops
where each pass removes only 1-2 messages.
"""
tokens = prompt_tokens if prompt_tokens is not None else self.last_prompt_tokens
if tokens < self.threshold_tokens:
return False
# Anti-thrashing: back off if recent compressions were ineffective
if self._ineffective_compression_count >= 2:
if not self.quiet_mode:
logger.warning(
"Compression skipped — last %d compressions saved <10%% each. "
"Consider /new to start a fresh session, or /compress <topic> "
"for focused compression.",
self._ineffective_compression_count,
)
return False
return True
# ------------------------------------------------------------------
The Compressor Loop
`agent/context_compressor.py:1235`
How compress() partitions a conversation and orchestrates the summarization pipeline
compress() runs four phases in sequence:
- `_prune_old_tool_results` replaces old tool output with one-line stubs (e.g., ``[terminal] ran `npm test` -> exit 0, 47 lines output``). No LLM call; this cuts tokens before summarization starts.
- Boundary resolution: the first `protect_first_n` messages become the fixed head, the most recent messages up to `tail_token_budget` tokens become the fixed tail, and everything between is the compressible middle.
- `_generate_summary` runs on the middle slice, with `focus_topic` forwarded from the `/compress <topic>` command if present.
- Reassembly: the protected head (with a compression note appended to the system message), the summary as a single injected message, then the protected tail.
The input messages list is never modified. compress() always returns a new list. The boundary alignment calls (_align_boundary_forward, _find_tail_cut_by_tokens) run before _generate_summary to ensure the cut never lands inside a tool-call/result pair.
compress() is a four-phase pipeline (prune, partition, summarize, reassemble) that returns a new list and never mutates its input.
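A rough sketch of the call site follows; the wrapper function and variable names are illustrative, not the actual run_agent.py loop.

```python
def maybe_compact(engine, messages, response, focus_topic=None):
    """Illustrative call-site wrapper, not the actual run_agent.py loop."""
    # Feed usage from the latest API response back into the engine.
    engine.update_from_response(response["usage"])

    # Check the gate; compact only when the threshold (and anti-thrashing guard) allow it.
    if engine.should_compress():
        messages = engine.compress(
            messages,
            current_tokens=engine.last_prompt_tokens,
            focus_topic=focus_topic,   # e.g. the topic from a manual /compress <topic>
        )
    # A new list comes back: protected head (with the compression note),
    # one injected summary message, then the token-budgeted tail.
    return messages
```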
---
def compress(self, messages: List[Dict[str, Any]], current_tokens: int = None, focus_topic: str = None) -> List[Dict[str, Any]]:
"""Compress conversation messages by summarizing middle turns.
Algorithm:
1. Prune old tool results (cheap pre-pass, no LLM call)
2. Protect head messages (system prompt + first exchange)
3. Find tail boundary by token budget (~20K tokens of recent context)
4. Summarize middle turns with structured LLM prompt
5. On re-compression, iteratively update the previous summary
After compression, orphaned tool_call / tool_result pairs are cleaned
up so the API never receives mismatched IDs.
Args:
focus_topic: Optional focus string for guided compression. When
provided, the summariser will prioritise preserving information
related to this topic and be more aggressive about compressing
everything else. Inspired by Claude Code's ``/compact``.
"""
# Reset per-call summary failure state — callers inspect these fields
# after compress() returns to decide whether to surface a warning.
self._last_summary_dropped_count = 0
self._last_summary_fallback_used = False
self._last_summary_error = None
self._last_aux_model_failure_error = None
self._last_aux_model_failure_model = None
n_messages = len(messages)
# Only need head + 3 tail messages minimum (token budget decides the real tail size)
_min_for_compress = self.protect_first_n + 3 + 1
if n_messages <= _min_for_compress:
if not self.quiet_mode:
logger.warning(
"Cannot compress: only %d messages (need > %d)",
n_messages, _min_for_compress,
)
return messages
display_tokens = current_tokens if current_tokens else self.last_prompt_tokens or estimate_messages_tokens_rough(messages)
# Phase 1: Prune old tool results (cheap, no LLM call)
messages, pruned_count = self._prune_old_tool_results(
messages, protect_tail_count=self.protect_last_n,
protect_tail_tokens=self.tail_token_budget,
)
if pruned_count and not self.quiet_mode:
logger.info("Pre-compression: pruned %d old tool result(s)", pruned_count)
# Phase 2: Determine boundaries
compress_start = self.protect_first_n
compress_start = self._align_boundary_forward(messages, compress_start)
# Use token-budget tail protection instead of fixed message count
compress_end = self._find_tail_cut_by_tokens(messages, compress_start)
if compress_start >= compress_end:
return messages
turns_to_summarize = messages[compress_start:compress_end]
if not self.quiet_mode:
logger.info(
"Context compression triggered (%d tokens >= %d threshold)",
display_tokens,
self.threshold_tokens,
)
logger.info(
"Model context limit: %d tokens (%.0f%% = %d)",
self.context_length,
self.threshold_percent * 100,
self.threshold_tokens,
)
tail_msgs = n_messages - compress_end
logger.info(
"Summarizing turns %d-%d (%d turns), protecting %d head + %d tail messages",
compress_start + 1,
compress_end,
len(turns_to_summarize),
compress_start,
tail_msgs,
)
# Phase 3: Generate structured summary
summary = self._generate_summary(turns_to_summarize, focus_topic=focus_topic)
# Phase 4: Assemble compressed message list
compressed = []
for i in range(compress_start):
msg = messages[i].copy()
if i == 0 and msg.get("role") == "system":
existing = msg.get("content")
_compression_note = "[Note: Some earlier conversation turns have been compacted into a handoff summary to preserve context space. The current session state may still reflect earlier work, so build on that summary and state rather than re-doing work.]"
if _compression_note not in _content_text_for_contains(existing):
msg["content"] = _append_text_to_content(
existing,
"\n\n" + _compression_note if isinstance(existing, str) and existing else _compression_note,
)
compressed.append(msg)
# If LLM summary failed, insert a static fallback so the model
# knows context was lost rather than silently dropping everything.
if not summary:
if not self.quiet_mode:
logger.warning("Summary generation failed — inserting static fallback context marker")
n_dropped = compress_end - compress_start
self._last_summary_dropped_count = n_dropped
self._last_summary_fallback_used = True
summary = (
f"{SUMMARY_PREFIX}\n"
f"Summary generation was unavailable. {n_dropped} message(s) were "
f"removed to free context space but could not be summarized. The removed "
f"messages contained earlier work in this session. Continue based on the "
f"recent messages below and the current state of any files or resources."
)
_merge_summary_into_tail = False
last_head_role = messages[compress_start - 1].get("role", "user") if compress_start > 0 else "user"
first_tail_role = messages[compress_end].get("role", "user") if compress_end < n_messages else "user"
# Pick a role that avoids consecutive same-role with both neighbors.
# Priority: avoid colliding with head (already committed), then tail.
if last_head_role in ("assistant", "tool"):
summary_role = "user"
else:
summary_role = "assistant"
# If the chosen role collides with the tail AND flipping wouldn't
# collide with the head, flip it.
if summary_role == first_tail_role:
Trajectory-Level Compression
`trajectory_compressor.py:332`
How TrajectoryCompressor compresses completed RL trajectories rather than live chat sessions
ContextCompressor operates on OpenAI-format dicts with role and content. TrajectoryCompressor operates on ShareGPT-format turns with from and value — the format used for RL fine-tuning datasets. Its purpose is batch preprocessing: reduce training trajectories to fit within a training context window before they enter a fine-tuning run.
The algorithm is greedy. It computes tokens_to_save = total_tokens - target_max_tokens, then accumulates turns forward from compress_start until their combined token count reaches tokens_to_save + summary_target_tokens. The extra summary_target_tokens reserves budget for the summary text itself. The summary is inserted as a human turn because training data expects a human message to open the context. The still_over_limit flag on TrajectoryMetrics lets batch orchestrators identify trajectories that remain over budget after one compression pass and require further handling.
TrajectoryCompressor uses a greedy forward-accumulation algorithm on ShareGPT-format data. It shares the head/tail protection design with ContextCompressor but runs offline on training data, not at inference time.
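The budget arithmetic is easiest to follow with concrete numbers; all of the figures below are illustrative.

```python
total_tokens = 150_000
target_max_tokens = 120_000
summary_target_tokens = 2_000

tokens_to_save = total_tokens - target_max_tokens                     # 30_000
# Replacing N turns with one summary costs ~summary_target_tokens, so the
# replaced turns must together be worth at least:
target_tokens_to_compress = tokens_to_save + summary_target_tokens    # 32_000

# Greedy forward accumulation over the compressible middle region.
turn_tokens = [9_000, 12_000, 7_000, 6_000, 5_000]   # illustrative middle-turn sizes
accumulated, compress_until = 0, 0
for i, t in enumerate(turn_tokens):
    accumulated += t
    compress_until = i + 1
    if accumulated >= target_tokens_to_compress:
        break

print(compress_until, accumulated)   # 4 34000: four turns replaced by one ~2K-token summary
```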
---
class TrajectoryCompressor:
"""
Compresses agent trajectories to fit within a target token budget.
Compression strategy:
1. Keep protected head turns (system, human, first gpt+tool)
2. Keep protected tail turns (last N turns)
3. From the compressible middle region, compress only as much as needed
4. Replace compressed turns with a single human summary message
5. Keep remaining middle turns intact (model continues with tools)
"""
def __init__(self, config: CompressionConfig):
"""Initialize the compressor."""
def compress_trajectory(
self,
trajectory: List[Dict[str, str]]
) -> Tuple[List[Dict[str, str]], TrajectoryMetrics]:
"""
Compress a single trajectory to fit within target token budget.
Algorithm:
1. Count total tokens
2. If under target, skip
3. Find compressible region (between protected head and tail)
4. Calculate how many tokens need to be saved
5. Accumulate turns from start of compressible region until savings met
6. Replace accumulated turns with single human summary
7. Keep remaining turns intact
Args:
trajectory: List of conversation turns
Returns:
Tuple of (compressed_trajectory, metrics)
"""
metrics = TrajectoryMetrics()
metrics.original_turns = len(trajectory)
# Count tokens per turn
turn_tokens = self.count_turn_tokens(trajectory)
total_tokens = sum(turn_tokens)
metrics.original_tokens = total_tokens
# Check if compression needed
if total_tokens <= self.config.target_max_tokens:
metrics.skipped_under_target = True
metrics.compressed_tokens = total_tokens
metrics.compressed_turns = len(trajectory)
metrics.compression_ratio = 1.0
return trajectory, metrics
# Find protected regions
protected, compress_start, compress_end = self._find_protected_indices(trajectory)
# Check if there's anything to compress
if compress_start >= compress_end:
# Nothing to compress, return as-is
metrics.compressed_tokens = total_tokens
metrics.compressed_turns = len(trajectory)
metrics.still_over_limit = total_tokens > self.config.target_max_tokens
return trajectory, metrics
# Calculate how much we need to save
tokens_to_save = total_tokens - self.config.target_max_tokens
# We'll replace N turns with 1 summary turn
# Net savings = (sum of N turns' tokens) - summary_target_tokens
# We need: net_savings >= tokens_to_save
# So: sum of turns >= tokens_to_save + summary_target_tokens
target_tokens_to_compress = tokens_to_save + self.config.summary_target_tokens
# Accumulate turns from compress_start until we have enough savings
accumulated_tokens = 0
compress_until = compress_start
for i in range(compress_start, compress_end):
accumulated_tokens += turn_tokens[i]
compress_until = i + 1 # Exclusive end
# Check if we have enough savings
if accumulated_tokens >= target_tokens_to_compress:
break
# If we still don't have enough savings, compress the entire compressible region
if accumulated_tokens < target_tokens_to_compress and compress_until < compress_end:
compress_until = compress_end
accumulated_tokens = sum(turn_tokens[compress_start:compress_end])
# Record compression region
metrics.turns_compressed_start_idx = compress_start
metrics.turns_compressed_end_idx = compress_until
metrics.turns_in_compressed_region = compress_until - compress_start
# Extract content for summary
content_to_summarize = self._extract_turn_content_for_summary(
trajectory, compress_start, compress_until
)
# Generate summary
summary = self._generate_summary(content_to_summarize, metrics)
# Build compressed trajectory
compressed = []
# Add head (turns before compression region)
for i in range(compress_start):
turn = trajectory[i].copy()
# Add notice to system message
if turn.get("from") == "system" and self.config.add_summary_notice:
turn["value"] = turn["value"] + self.config.summary_notice_text
compressed.append(turn)
# Add summary as human message
compressed.append({
"from": "human",
"value": summary
})
# Add tail (turns after compression region)
for i in range(compress_until, len(trajectory)):
compressed.append(trajectory[i].copy())
# Calculate final metrics
metrics.compressed_turns = len(compressed)
metrics.compressed_tokens = self.count_trajectory_tokens(compressed)
metrics.turns_removed = metrics.original_turns - metrics.compressed_turns
metrics.tokens_saved = metrics.original_tokens - metrics.compressed_tokens
metrics.compression_ratio = metrics.compressed_tokens / max(metrics.original_tokens, 1)
metrics.was_compressed = True
metrics.still_over_limit = metrics.compressed_tokens > self.config.target_max_tokens
return compressed, metrics
async def compress_trajectory_async(
self,
trajectory: List[Dict[str, str]]
) -> Tuple[List[Dict[str, str]], TrajectoryMetrics]:
"""What Gets Preserved vs Dropped
`agent/context_compressor.py:482`
How the boundary functions decide which messages stay verbatim and which get summarized
Which messages are preserved verbatim is determined by position and token count, not by the model. The first protect_first_n messages form the fixed head. The most recent messages up to tail_token_budget tokens form the fixed tail. Everything in between is a summarization candidate.
The boundary alignment functions handle the edge cases that positional logic alone would get wrong. _align_boundary_forward advances the compress-start index past any leading tool results; without it, the compressible region could begin on a tool result whose matching assistant call is in the protected head, producing an orphaned result with no corresponding call. _align_boundary_backward is the symmetric fix for the compress-end boundary: if the tail cut lands inside a run of tool results, it walks backward to include the entire assistant-plus-results group in the compressible middle. The soft_ceiling of 1.5 * tail_token_budget in _find_tail_cut_by_tokens prevents the backward walk from stopping mid-message when a single large turn (such as a file read) straddles the budget boundary.
Head and tail boundaries are fixed by count and token budget. The alignment functions (_align_boundary_forward, _align_boundary_backward) exist solely to prevent cuts that would split a tool-call/result pair across the boundary.
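A tiny made-up transcript shows why the forward alignment matters. With protect_first_n = 3, the naive compress-start index lands on a tool result whose parent assistant call sits inside the protected head; sliding the boundary forward keeps the pair together. The helper below copies the logic of _align_boundary_forward; the messages are invented.

```python
messages = [
    {"role": "system", "content": "..."},                                   # 0  protected head
    {"role": "user", "content": "Run the tests"},                           # 1  protected head
    {"role": "assistant", "tool_calls": [{"id": "call_1",
        "function": {"name": "terminal", "arguments": "{}"}}]},             # 2  protected head
    {"role": "tool", "tool_call_id": "call_1", "content": "47 lines..."},   # 3  naive compress_start
    {"role": "assistant", "content": "Tests pass."},                        # 4
    {"role": "user", "content": "Now refactor auth."},                      # 5
]


def align_boundary_forward(messages, idx):
    # Same idea as _align_boundary_forward: skip past leading tool results.
    while idx < len(messages) and messages[idx].get("role") == "tool":
        idx += 1
    return idx


print(align_boundary_forward(messages, 3))  # 4: the call/result pair at 2-3 stays intact in the head
```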
---
self._ineffective_compression_count,
)
return False
return True
# ------------------------------------------------------------------
# Tool output pruning (cheap pre-pass, no LLM call)
# ------------------------------------------------------------------
def _prune_old_tool_results(
self, messages: List[Dict[str, Any]], protect_tail_count: int,
protect_tail_tokens: int | None = None,
) -> tuple[List[Dict[str, Any]], int]:
"""Replace old tool result contents with informative 1-line summaries.
Instead of a generic placeholder, generates a summary like::
[terminal] ran `npm test` -> exit 0, 47 lines output
[read_file] read config.py from line 1 (3,400 chars)
Also deduplicates identical tool results (e.g. reading the same file
5x keeps only the newest full copy) and truncates large tool_call
arguments in assistant messages outside the protected tail.
Walks backward from the end, protecting the most recent messages that
fall within ``protect_tail_tokens`` (when provided) OR the last
``protect_tail_count`` messages (backward-compatible default).
When both are given, the token budget takes priority and the message
count acts as a hard minimum floor.
Returns (pruned_messages, pruned_count).
"""
if not messages:
return messages, 0
result = [m.copy() for m in messages]
pruned = 0
# Build index: tool_call_id -> (tool_name, arguments_json)
call_id_to_tool: Dict[str, tuple] = {}
for msg in result:
if msg.get("role") == "assistant":
for tc in msg.get("tool_calls") or []:
if isinstance(tc, dict):
cid = tc.get("id", "")
fn = tc.get("function", {})
call_id_to_tool[cid] = (fn.get("name", "unknown"), fn.get("arguments", ""))
else:
cid = getattr(tc, "id", "") or ""
fn = getattr(tc, "function", None)
name = getattr(fn, "name", "unknown") if fn else "unknown"
args_str = getattr(fn, "arguments", "") if fn else ""
call_id_to_tool[cid] = (name, args_str)
# Determine the prune boundary
if protect_tail_tokens is not None and protect_tail_tokens > 0:
# Token-budget approach: walk backward accumulating tokens
accumulated = 0
boundary = len(result)
min_protect = min(protect_tail_count, len(result) - 1)
for i in range(len(result) - 1, -1, -1):
msg = result[i]
raw_content = msg.get("content") or ""
content_len = _content_length_for_budget(raw_content)
msg_tokens = content_len // _CHARS_PER_TOKEN + 10
for tc in msg.get("tool_calls") or []:
if isinstance(tc, dict):
args = tc.get("function", {}).get("arguments", "")
msg_tokens += len(args) // _CHARS_PER_TOKEN
if accumulated + msg_tokens > protect_tail_tokens and (len(result) - i) >= min_protect:
boundary = i
break
accumulated += msg_tokens
boundary = i
prune_boundary = max(boundary, len(result) - min_protect)
else:
prune_boundary = len(result) - protect_tail_count
def _align_boundary_forward(self, messages: List[Dict[str, Any]], idx: int) -> int:
"""Push a compress-start boundary forward past any orphan tool results.
If ``messages[idx]`` is a tool result, slide forward until we hit a
non-tool message so we don't start the summarised region mid-group.
"""
while idx < len(messages) and messages[idx].get("role") == "tool":
idx += 1
return idx
def _align_boundary_backward(self, messages: List[Dict[str, Any]], idx: int) -> int:
"""Pull a compress-end boundary backward to avoid splitting a
tool_call / result group.
If the boundary falls in the middle of a tool-result group (i.e.
there are consecutive tool messages before ``idx``), walk backward
past all of them to find the parent assistant message. If found,
move the boundary before the assistant so the entire
assistant + tool_results group is included in the summarised region
rather than being split (which causes silent data loss when
``_sanitize_tool_pairs`` removes the orphaned tail results).
"""
if idx <= 0 or idx >= len(messages):
return idx
# Walk backward past consecutive tool results
check = idx - 1
while check >= 0 and messages[check].get("role") == "tool":
check -= 1
# If we landed on the parent assistant with tool_calls, pull the
# boundary before it so the whole group gets summarised together.
if check >= 0 and messages[check].get("role") == "assistant" and messages[check].get("tool_calls"):
idx = check
return idx
# ------------------------------------------------------------------
# Tail protection by token budget
def _find_tail_cut_by_tokens(
self, messages: List[Dict[str, Any]], head_end: int,
token_budget: int | None = None,
) -> int:
"""Walk backward from the end of messages, accumulating tokens until
the budget is reached. Returns the index where the tail starts.
``token_budget`` defaults to ``self.tail_token_budget`` which is
derived from ``summary_target_ratio * context_length``, so it
scales automatically with the model's context window.
Token budget is the primary criterion. A hard minimum of 3 messages
is always protected, but the budget is allowed to exceed by up to
1.5x to avoid cutting inside an oversized message (tool output, file
read, etc.). If even the minimum 3 messages exceed 1.5x the budget
the cut is placed right after the head so compression still runs.
Never cuts inside a tool_call/result group. Always ensures the most
recent user message is in the tail (see ``_ensure_last_user_message_in_tail``).
"""
if token_budget is None:
token_budget = self.tail_token_budget
n = len(messages)
# Hard minimum: always keep at least 3 messages in the tail
min_tail = min(3, n - head_end - 1) if n - head_end > 1 else 0
soft_ceiling = int(token_budget * 1.5)
accumulated = 0
cut_idx = n # start from beyond the end
for i in range(n - 1, head_end - 1, -1):
msg = messages[i]
raw_content = msg.get("content") or ""
content_len = _content_length_for_budget(raw_content)
msg_tokens = content_len // _CHARS_PER_TOKEN + 10 # +10 for role/metadata
# Include tool call arguments in estimate
for tc in msg.get("tool_calls") or []:
if isinstance(tc, dict):
args = tc.get("function", {}).get("arguments", "")
msg_tokens += len(args) // _CHARS_PER_TOKEN
# Stop once we exceed the soft ceiling (unless we haven't hit min_tail yet)
if accumulated + msg_tokens > soft_ceiling and (n - i) >= min_tail:
break
accumulated += msg_tokens
cut_idx = i
# Ensure we protect at least min_tail messages
fallback_cut = n - min_tail
if cut_idx > fallback_cut:
cut_idx = fallback_cut
# If the token budget would protect everything (small conversations),
# force a cut after the head so compression can still remove middle turns.
if cut_idx <= head_end:
cut_idx = max(fallback_cut, head_end + 1)
# Align to avoid splitting tool groups
cut_idx = self._align_boundary_backward(messages, cut_idx)
# Ensure the most recent user message is always in the tail so the
# active task is never lost to compression (fixes #10896).
cut_idx = self._ensure_last_user_message_in_tail(messages, cut_idx, head_end)
return max(cut_idx, head_end + 1)
# ------------------------------------------------------------------
# ContextEngine: manual /compress preflight
# ------------------------------------------------------------------
def has_content_to_compress(self, messages: List[Dict[str, Any]]) -> bool:
"""Return True if there is a non-empty middle region to compact.
Overrides the ABC default so the gateway ``/compress`` guard can
skip the LLM call when the transcript is still entirely inside
the protected head/tail.
"""
compress_start = self._align_boundary_forward(messages, self.protect_first_n)
compress_end = self._find_tail_cut_by_tokens(messages, compress_start)
return compress_start < compress_end
The Compressed Summary Format
`agent/context_compressor.py:38`
The SUMMARY_PREFIX constant and the structured template that constructs compressed memory
return f"{SUMMARY_PREFIX}\n{text}" if text else SUMMARY_PREFIXSUMMARY_PREFIX labels the injected block as reference material, forbids the downstream model from re-answering questions that appear in the summary, and directs it to ## Active Task as the authoritative statement of what to do next. _with_summary_prefix also handles LEGACY_SUMMARY_PREFIX ([CONTEXT SUMMARY]:): it strips whichever prefix is present and applies the current one, so summaries generated by older compressor versions work without re-compression after an upgrade.
_summarizer_preamble frames the summary-generating call differently: it tells the summarizing model that it is a summarization agent writing a handoff document for a different assistant, not an assistant answering questions. This reduces the failure mode where the summarizer interprets questions in the conversation as addressed to itself and tries to answer them inline. The structured sections (## Active Task, ## Completed Actions, ## Active State) give the downstream model predictable anchor points for task continuity.
SUMMARY_PREFIX encodes a prompt engineering contract in source code. _with_summary_prefix normalizes old prefixes to the current format so summaries remain valid across compressor versions.
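Put together, the reassembled transcript contains exactly one injected message whose content starts with the prefix. The sketch below shows its shape; the role is only one possible outcome of the collision-avoidance logic in the compress() excerpt earlier, the import path is assumed, and the body text is invented to match the template.

```python
from agent.context_compressor import SUMMARY_PREFIX  # import path assumed

summary_message = {
    # Role is picked to avoid back-to-back same-role messages with head and tail.
    "role": "user",
    "content": (
        SUMMARY_PREFIX + "\n"
        "## Active Task\n"
        "User asked: 'Now refactor the auth module to use JWT instead of sessions'\n"
        "## Completed Actions\n"
        "1. READ config.py:45 — found `==` should be `!=` [tool: read_file]\n"
        "...\n"
    ),
}

# compressed = protected_head + [summary_message] + protected_tail
```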
---
SUMMARY_PREFIX = (
"[CONTEXT COMPACTION — REFERENCE ONLY] Earlier turns were compacted "
"into the summary below. This is a handoff from a previous context "
"window — treat it as background reference, NOT as active instructions. "
"Do NOT answer questions or fulfill requests mentioned in this summary; "
"they were already addressed. "
"Your current task is identified in the '## Active Task' section of the "
"summary — resume exactly from there. "
"Respond ONLY to the latest user message "
"that appears AFTER this summary. The current session state (files, "
"config, etc.) may reflect work described here — avoid repeating it:"
)
LEGACY_SUMMARY_PREFIX = "[CONTEXT SUMMARY]:"
def _generate_summary(self, turns_to_summarize: List[Dict[str, Any]], focus_topic: str = None) -> Optional[str]:
"""Generate a structured summary of conversation turns.
Uses a structured template (Goal, Progress, Decisions, Resolved/Pending
Questions, Files, Remaining Work) with explicit preamble telling the
summarizer not to answer questions. When a previous summary exists,
generates an iterative update instead of summarizing from scratch.
Args:
focus_topic: Optional focus string for guided compression. When
provided, the summariser prioritises preserving information
related to this topic and is more aggressive about compressing
everything else. Inspired by Claude Code's ``/compact``.
Returns None if all attempts fail — the caller should drop
the middle turns without a summary rather than inject a useless
placeholder.
"""
now = time.monotonic()
if now < self._summary_failure_cooldown_until:
logger.debug(
"Skipping context summary during cooldown (%.0fs remaining)",
self._summary_failure_cooldown_until - now,
)
return None
summary_budget = self._compute_summary_budget(turns_to_summarize)
content_to_summarize = self._serialize_for_summary(turns_to_summarize)
# Preamble shared by both first-compaction and iterative-update prompts.
# Inspired by OpenCode's "do not respond to any questions" instruction
# and Codex's "another language model" framing.
_summarizer_preamble = (
"You are a summarization agent creating a context checkpoint. "
"Your output will be injected as reference material for a DIFFERENT "
"assistant that continues the conversation. "
"Do NOT respond to any questions or requests in the conversation — "
"only output the structured summary. "
"Do NOT include any preamble, greeting, or prefix. "
"Write the summary in the same language the user was using in the "
"conversation — do not translate or switch to English. "
"NEVER include API keys, tokens, passwords, secrets, credentials, "
"or connection strings in the summary — replace any that appear "
"with [REDACTED]. Note that the user had credentials present, but "
"do not preserve their values."
)
# Shared structured template (used by both paths).
_template_sections = f"""## Active Task
[THE SINGLE MOST IMPORTANT FIELD. Copy the user's most recent request or
task assignment verbatim — the exact words they used. If multiple tasks
were requested and only some are done, list only the ones NOT yet completed.
The next assistant must pick up exactly here. Example:
"User asked: 'Now refactor the auth module to use JWT instead of sessions'"
If no outstanding task exists, write "None."]
## Goal
[What the user is trying to accomplish overall]
## Constraints & Preferences
[User preferences, coding style, constraints, important decisions]
## Completed Actions
[Numbered list of concrete actions taken — include tool used, target, and outcome.
Format each as: N. ACTION target — outcome [tool: name]
Example:
1. READ config.py:45 — found `==` should be `!=` [tool: read_file]
2. PATCH config.py:45 — changed `==` to `!=` [tool: patch]
3. TEST `pytest tests/` — 3/50 failed: test_parse, test_validate, test_edge [tool: terminal]
Be specific with file paths, commands, line numbers, and results.]
## Active State
[Current working state — include:
- Working directory and branch (if applicable)
- Modified/created files with brief note on each
- Test status (X/Y passing)
- Any running processes or servers
- Environment details that matter]
## In Progress
[Work currently underway — what was being done when compaction fired]
## Blocked
[Any blockers, errors, or issues not yet resolved. Include exact error messages.]
## Key Decisions
[Important technical decisions and WHY they were made]
## Resolved Questions
[Questions the user asked that were ALREADY answered — include the answer so the next assistant does not re-answer them]
## Pending User Asks
[Questions or requests from the user that have NOT yet been answered or fulfilled. If none, write "None."]
## Relevant Files
[Files read, modified, or created — with brief note on each]
## Remaining Work
[What remains to be done — framed as context, not instructions]
## Critical Context
[Any specific values, error messages, configuration details, or data that would be lost without explicit preservation. NEVER include API keys, tokens, passwords, or credentials — write [REDACTED] instead.]
Target ~{summary_budget} tokens. Be CONCRETE — include file paths, command outputs, error messages, line numbers, and specific values. Avoid vague descriptions like "made some changes" — say exactly what changed.
Write only the summary body. Do not include any preamble or prefix."""
if self._previous_summary:
# Iterative update: preserve existing info, add new progress
prompt = f"""{_summarizer_preamble}
You are updating a context compaction summary. A previous compaction produced the summary below. New conversation turns have occurred since then and need to be incorporated.
PREVIOUS SUMMARY:
{self._previous_summary}
NEW TURNS TO INCORPORATE:
{content_to_summarize}
Update the summary using this exact structure. PRESERVE all existing information that is still relevant. ADD new completed actions to the numbered list (continue numbering). Move items from "In Progress" to "Completed Actions" when done. Move answered questions to "Resolved Questions". Update "Active State" to reflect current state. Remove information only if it is clearly obsolete. CRITICAL: Update "## Active Task" to reflect the user's most recent unfulfilled request — this is the most important field for task continuity.
{_template_sections}"""
else:
# First compaction: summarize from scratch
prompt = f"""{_summarizer_preamble}
Create a structured handoff summary for a different assistant that will continue this conversation after earlier turns are compacted. The next assistant should be able to understand what happened without re-reading the original turns.
def _with_summary_prefix(summary: str) -> str:
"""Normalize summary text to the current compaction handoff format."""
text = (summary or "").strip()
for prefix in (LEGACY_SUMMARY_PREFIX, SUMMARY_PREFIX):
if text.startswith(prefix):
text = text[len(prefix):].lstrip()
break
return f"{SUMMARY_PREFIX}\n{text}" if text else SUMMARY_PREFIX
Manual Compression Feedback
`agent/manual_compression_feedback.py:1`
How the system reports compression outcomes to the user and surfaces the "nothing changed" case
summarize_manual_compression returns a dict the CLI and gateway layer can render, keeping display logic out of the compressor. The noop check (list(after_messages) == list(before_messages)) catches the case where /compress ran but had nothing to compact because the full conversation already fits within the protected head and tail zones. Without explicit feedback, a no-op compression is silent; the noop headline ("No changes from compression: N messages") confirms the command executed.
The note field covers a specific counterintuitive outcome: token count can rise after compression even when message count drops. This happens when the summary LLM produces a dense structured handoff that is larger in raw tokens than the sparse tool-result messages it replaced. The function labels all token figures as a "rough transcript estimate" because they are derived from a character-count heuristic (_CHARS_PER_TOKEN = 4), not the model's actual tokenizer.
summarize_manual_compression is a display helper, not compressor logic. Its noop and note fields surface the two outcomes most likely to confuse users: a compression that changed nothing, and a compression where token count increased despite fewer messages.
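A sketch of how a CLI layer might call it and render the result; the transcripts and token estimates are placeholders, and only the function signature is taken from the file shown below.

```python
from agent.manual_compression_feedback import summarize_manual_compression

before = [{"role": "user", "content": "..."}] * 42   # placeholder pre-compression snapshot
after = [{"role": "user", "content": "..."}] * 18    # placeholder result of engine.compress(...)

feedback = summarize_manual_compression(
    before_messages=before,
    after_messages=after,
    before_tokens=58_000,
    after_tokens=23_500,
)

print(feedback["headline"])     # Compressed: 42 → 18 messages
print(feedback["token_line"])   # Rough transcript estimate: ~58,000 → ~23,500 tokens
if feedback["note"]:            # only set when fewer messages but a higher token estimate
    print(feedback["note"])
```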
---
"""User-facing summaries for manual compression commands."""
from __future__ import annotations
from typing import Any, Sequence
def summarize_manual_compression(
before_messages: Sequence[dict[str, Any]],
after_messages: Sequence[dict[str, Any]],
before_tokens: int,
after_tokens: int,
) -> dict[str, Any]:
"""Return consistent user-facing feedback for manual compression."""
before_count = len(before_messages)
after_count = len(after_messages)
noop = list(after_messages) == list(before_messages)
if noop:
headline = f"No changes from compression: {before_count} messages"
if after_tokens == before_tokens:
token_line = (
f"Rough transcript estimate: ~{before_tokens:,} tokens (unchanged)"
)
else:
token_line = (
f"Rough transcript estimate: ~{before_tokens:,} → "
f"~{after_tokens:,} tokens"
)
else:
headline = f"Compressed: {before_count} → {after_count} messages"
token_line = (
f"Rough transcript estimate: ~{before_tokens:,} → "
f"~{after_tokens:,} tokens"
)
note = None
if not noop and after_count < before_count and after_tokens > before_tokens:
note = (
"Note: fewer messages can still raise this rough transcript estimate "
"when compression rewrites the transcript into denser summaries."
)
return {
"noop": noop,
"headline": headline,
"token_line": token_line,
"note": note,
}
You've walked through 7 key areas of the Hermes Agent codebase.
Continue: Provider-Agnostic LLM Adapters: One Agent, Eight APIs