Conversation Manager
The ConversationManager (v0.3.3.3.post1) is a high-level coordinator that manages conversation state, handles message exchanges, and provides checkpoint and snapshot capabilities.
Overview
The ConversationManager coordinates between:
- ConversationSystem - Core message handling
- ContextWindowManager - Token budgeting and trimming
- SessionManager - Session persistence and boundaries
- ContextLoader - Context file management
- CheckpointManager - Conversation state snapshots and branching
Architecture
Message Flow
Session Management
Initialization
def __init__(
    self,
    model_config=None,
    api_client=None,
    workspace_path: Optional[Path] = None,
    system_prompt: str = "",
    max_messages_per_session: int = 5000,
    max_sessions_in_memory: int = 20,
    auto_save_interval: int = 60,
    checkpoint_config: Optional[CheckpointConfig] = None
)
Configuration Resolution
The ConversationManager resolves its configuration from multiple sources:
- Model Configuration: model_config for API settings
- Live Config: config_obj from PenguinCore for runtime updates
- Default Values: Sensible defaults for all parameters
- Checkpoint Config: Automatic checkpoint configuration if not provided
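The precedence between these sources is not stated here. The following is a minimal sketch, assuming an explicit argument wins over the live config, which in turn wins over the defaults. `resolve_setting` and `DEFAULTS` are hypothetical names used for illustration and are not part of Penguin.

```python
from typing import Any, Mapping, Optional

# Hypothetical defaults, mirroring the constructor signature above.
DEFAULTS = {
    "max_messages_per_session": 5000,
    "max_sessions_in_memory": 20,
    "auto_save_interval": 60,
}


def resolve_setting(
    name: str,
    explicit: Optional[Any] = None,
    live_config: Optional[Mapping[str, Any]] = None,
) -> Any:
    """Return the first available value: explicit argument, live config, default."""
    if explicit is not None:
        return explicit
    if live_config is not None and name in live_config:
        return live_config[name]
    return DEFAULTS[name]


live = {"auto_save_interval": 120}
print(resolve_setting("auto_save_interval", live_config=live))              # live config beats the default
print(resolve_setting("auto_save_interval", explicit=0, live_config=live))  # explicit argument beats both
print(resolve_setting("max_sessions_in_memory"))                            # falls back to the default
```

Checking `explicit is not None` rather than truthiness matters here, because `0` is a meaningful value for `auto_save_interval`.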
Parameters
- model_config: Configuration for the AI model with provider-specific settings
- api_client: Client for API interactions with enhanced streaming support
- workspace_path: Path to workspace directory for conversation storage
- system_prompt: Initial system prompt with advanced template support
- max_messages_per_session: Maximum messages before creating a new session (default: 5000)
- max_sessions_in_memory: Maximum sessions to keep in memory cache (default: 20)
- auto_save_interval: Seconds between auto-saves (default: 60, 0 to disable)
- checkpoint_config: Configuration for checkpoint system with retention policies
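To illustrate what max_messages_per_session controls, here is a small sketch of session rollover. It assumes a new session starts as soon as the current one already holds the maximum number of messages. `SessionSketch` and `RolloverSketch` are hypothetical stand-ins, not Penguin classes.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SessionSketch:
    """Hypothetical stand-in for a session: an id and its messages."""
    session_id: int
    messages: List[str] = field(default_factory=list)


class RolloverSketch:
    """Starts a new session once the current one is full."""

    def __init__(self, max_messages_per_session: int = 5000) -> None:
        self.max_messages_per_session = max_messages_per_session
        self.sessions: List[SessionSketch] = [SessionSketch(session_id=0)]

    def add_message(self, text: str) -> int:
        """Store a message and return the id of the session that received it."""
        current = self.sessions[-1]
        if len(current.messages) >= self.max_messages_per_session:
            current = SessionSketch(session_id=current.session_id + 1)
            self.sessions.append(current)
        current.messages.append(text)
        return current.session_id


manager = RolloverSketch(max_messages_per_session=3)
ids = [manager.add_message(f"message {i}") for i in range(7)]
print(ids)  # [0, 0, 0, 1, 1, 1, 2]
```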
Automatic Component Initialization
- Session Manager: Created with optimized caching and persistence
- Context Window: Initialized with provider-aware token counting
- Checkpoint Manager: Lazy initialization with automatic worker startup
- Context Loader: Pre-loads core context files on startup
Key Methods
Message Processing
async def process_message(
    self,
    message: str,
    conversation_id: Optional[str] = None,
    image_path: Optional[str] = None,
    streaming: bool = False,
    stream_callback: Optional[Callable[[str], None]] = None,
    context_files: Optional[List[str]] = None
) -> Union[str, AsyncGenerator[str, None]]
Processes a user message:
- Conversation Loading: Automatic conversation ID resolution
- Context File Integration: Loads specified context files before processing
- Image Support: Handles multimodal content with automatic token counting
- Streaming Support: Real-time response streaming with callback handling
- Error Recovery: Comprehensive error handling with fallback mechanisms
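The interaction between streaming and stream_callback can be sketched without a real model. The example below is an assumption-laden illustration: `fake_model_stream` and `process_message_sketch` are hypothetical, and this sketch always returns the full reply as a string, whereas the real method may also return an async generator.

```python
import asyncio
from typing import AsyncGenerator, Callable, List, Optional


async def fake_model_stream(prompt: str) -> AsyncGenerator[str, None]:
    """Hypothetical stand-in for a model that yields its reply in chunks."""
    for chunk in ["Echo: ", prompt]:
        await asyncio.sleep(0)  # yield control, as a real network read would
        yield chunk


async def process_message_sketch(
    message: str,
    streaming: bool = False,
    stream_callback: Optional[Callable[[str], None]] = None,
) -> str:
    """Collect the reply, forwarding each chunk to the callback when streaming."""
    parts: List[str] = []
    async for chunk in fake_model_stream(message):
        if streaming and stream_callback is not None:
            stream_callback(chunk)
        parts.append(chunk)
    return "".join(parts)


received: List[str] = []
reply = asyncio.run(
    process_message_sketch("hello", streaming=True, stream_callback=received.append)
)
print(reply)     # Echo: hello
print(received)  # ['Echo: ', 'hello']
```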
Checkpoint and Snapshot System
The ConversationManager includes checkpoint and snapshot capabilities for conversation state management.
Checkpoint Management
async def create_manual_checkpoint(
    self,
    name: Optional[str] = None,
    description: Optional[str] = None
) -> Optional[str]
Creates a manual checkpoint of the current conversation state with optional metadata.
async def rollback_to_checkpoint(self, checkpoint_id: str) -> bool
Rolls the conversation back to a specific checkpoint, restoring the exact state.
async def branch_from_checkpoint(
    self,
    checkpoint_id: str,
    name: Optional[str] = None,
    description: Optional[str] = None
) -> Optional[str]
Creates a new conversation branch from a checkpoint for experimental conversations.
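The difference between rolling back and branching is easiest to see on a toy model. The sketch below is an in-memory illustration only. `CheckpointStoreSketch` is hypothetical and says nothing about how CheckpointManager stores or restores state.

```python
import copy
import uuid
from typing import Dict, List, Optional


class CheckpointStoreSketch:
    """In-memory illustration of checkpoint, rollback, and branch semantics."""

    def __init__(self) -> None:
        self.messages: List[str] = []
        self._checkpoints: Dict[str, List[str]] = {}

    def create_checkpoint(self) -> str:
        checkpoint_id = uuid.uuid4().hex
        # Copy so that later messages do not alter the saved state.
        self._checkpoints[checkpoint_id] = copy.deepcopy(self.messages)
        return checkpoint_id

    def rollback(self, checkpoint_id: str) -> bool:
        """Replace the current state with the saved one, in place."""
        saved = self._checkpoints.get(checkpoint_id)
        if saved is None:
            return False
        self.messages = copy.deepcopy(saved)
        return True

    def branch(self, checkpoint_id: str) -> Optional["CheckpointStoreSketch"]:
        """Return a new, independent conversation seeded from the saved state."""
        saved = self._checkpoints.get(checkpoint_id)
        if saved is None:
            return None
        new_branch = CheckpointStoreSketch()
        new_branch.messages = copy.deepcopy(saved)
        return new_branch


store = CheckpointStoreSketch()
store.messages.append("user: refactor this function")
cp = store.create_checkpoint()
store.messages.append("assistant: here is a risky rewrite")

branch = store.branch(cp)         # independent copy of the checkpointed state
rolled_back = store.rollback(cp)  # original returns to the checkpointed state
print(rolled_back, store.messages)
```

Rollback mutates the existing conversation, while branching leaves it untouched and hands back a separate one, which is why the branch is the safer choice for experiments.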
def list_checkpoints(
    self,
    session_id: Optional[str] = None,
    checkpoint_type: Optional[CheckpointType] = None,
    limit: int = 50
) -> List[Dict[str, Any]]
Lists available checkpoints with filtering and pagination support.
Snapshot Management
def create_snapshot(self, *, meta: Optional[Dict[str, Any]] = None) -> Optional[str]
Creates a lightweight snapshot of the current conversation for quick restoration.
def restore_snapshot(self, snapshot_id: str) -> bool
Restores conversation from a snapshot with minimal overhead.
def branch_from_snapshot(self, snapshot_id: str, *, meta: Optional[Dict[str, Any]] = None) -> Optional[str]
Creates a new conversation branch from a snapshot for parallel experimentation.
Automatic Checkpointing
The system automatically creates checkpoints based on the configured policy:
- Auto Checkpoints: Created every N messages (configurable frequency)
- Manual Checkpoints: Created on-demand with custom metadata
- Branch Checkpoints: Created when branching from existing checkpoints
- Retention Management: Automatic cleanup of old checkpoints based on policies
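The "every N messages" rule for auto checkpoints can be sketched as a simple counter. This assumes the trigger is a plain modulo on the message count. `AutoCheckpointSketch` is a hypothetical class used only for illustration.

```python
from typing import List


class AutoCheckpointSketch:
    """Decides when an auto checkpoint would be taken."""

    def __init__(self, frequency: int = 1, enabled: bool = True) -> None:
        if frequency < 1:
            raise ValueError("frequency must be at least 1")
        self.frequency = frequency
        self.enabled = enabled
        self.message_count = 0
        self.checkpointed_at: List[int] = []

    def on_message(self) -> bool:
        """Count a message and report whether an auto checkpoint was taken."""
        self.message_count += 1
        if self.enabled and self.message_count % self.frequency == 0:
            self.checkpointed_at.append(self.message_count)
            return True
        return False


policy = AutoCheckpointSketch(frequency=3)
for _ in range(10):
    policy.on_message()
print(policy.checkpointed_at)  # [3, 6, 9]
```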
Checkpoint Configuration
from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass
class CheckpointConfig:
    enabled: bool = True
    frequency: int = 1  # Checkpoint every message
    planes: Dict[str, bool] = field(default_factory=lambda: {"conversation": True, "tasks": False, "code": False})
    retention: Dict[str, Any] = field(default_factory=lambda: {
        "keep_all_hours": 24,
        "keep_every_nth": 10,
        "max_age_days": 30
    })
    max_auto_checkpoints: int = 1000
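The exact meaning of the retention keys is not defined here. One plausible reading, offered as an assumption: keep every checkpoint younger than keep_all_hours, keep every keep_every_nth checkpoint between that window and max_age_days, and drop anything older. The sketch below implements that reading. `select_retained` is a hypothetical helper, not a Penguin function.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, List


def select_retained(
    created_at: List[datetime],
    retention: Dict[str, Any],
    now: datetime,
) -> List[datetime]:
    """Return the timestamps that survive cleanup, newest first."""
    keep_all = timedelta(hours=retention["keep_all_hours"])
    max_age = timedelta(days=retention["max_age_days"])
    nth = retention["keep_every_nth"]

    kept: List[datetime] = []
    older_index = 0  # position among checkpoints past the keep-all window
    for stamp in sorted(created_at, reverse=True):
        age = now - stamp
        if age <= keep_all:
            kept.append(stamp)
        elif age <= max_age:
            if older_index % nth == 0:
                kept.append(stamp)
            older_index += 1
        # Anything older than max_age is dropped.
    return kept


now = datetime(2024, 1, 31, 12, 0, 0)
stamps = [now - timedelta(hours=h) for h in range(0, 72, 6)]  # 12 checkpoints over 3 days
stamps.append(now - timedelta(days=45))                       # one stale checkpoint
retention = {"keep_all_hours": 24, "keep_every_nth": 10, "max_age_days": 30}
kept = select_retained(stamps, retention, now)
print(len(kept))  # 6: five within 24 hours plus the first older one
```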
Context Management
def add_context(self, content: str, source: Optional[str] = None) -> Message
Adds context information to the conversation.
def load_context_file(self, file_path: str) -> bool
Loads a context file into the conversation.
def list_context_files(self) -> List[Dict[str, Any]]
Lists available context files.
Conversation State
def save(self) -> bool
Saves the current conversation state.
def load(self, conversation_id: str) -> bool
Loads a specific conversation by ID.
def list_conversations(
    self,
    limit: int = 100,
    offset: int = 0,
    search_term: Optional[str] = None
) -> List[Dict]
Lists available conversations with search and pagination.
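How limit, offset, and search_term combine can be shown with a standalone function. This sketch assumes the search is a case-insensitive substring match on a title field and that filtering happens before pagination. Both are assumptions, as are the record fields and `list_conversations_sketch` itself.

```python
from typing import Dict, List, Optional


def list_conversations_sketch(
    conversations: List[Dict[str, str]],
    limit: int = 100,
    offset: int = 0,
    search_term: Optional[str] = None,
) -> List[Dict[str, str]]:
    """Filter by a case-insensitive title match, then apply offset and limit."""
    if search_term:
        needle = search_term.lower()
        conversations = [c for c in conversations if needle in c["title"].lower()]
    return conversations[offset : offset + limit]


conversations = [
    {"id": "c1", "title": "Refactor parser"},
    {"id": "c2", "title": "Fix login bug"},
    {"id": "c3", "title": "Refactor tests"},
    {"id": "c4", "title": "refactor docs"},
]
page = list_conversations_sketch(conversations, limit=2, offset=1, search_term="refactor")
print([c["id"] for c in page])  # ['c3', 'c4']
```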
def reset(self) -> None
Resets the conversation state.
Token Management
def get_token_usage(self) -> Dict[str, Any]
Gets token usage statistics for the current conversation across all sessions.
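The shape of the returned dictionary is not documented here. As a rough sketch of what "across all sessions" could mean, the example below sums per-category counts over several sessions. The keys (`total`, `by_category`, `sessions`), the category names, and `aggregate_token_usage` are all hypothetical.

```python
from collections import Counter
from typing import Any, Dict


def aggregate_token_usage(per_session: Dict[str, Dict[str, int]]) -> Dict[str, Any]:
    """Sum token counts over all sessions, overall and per category."""
    by_category: Counter = Counter()
    for counts in per_session.values():
        by_category.update(counts)  # adds counts key by key
    return {
        "total": sum(by_category.values()),
        "by_category": dict(by_category),
        "sessions": {sid: sum(counts.values()) for sid, counts in per_session.items()},
    }


usage = aggregate_token_usage({
    "session-1": {"system": 120, "user": 300, "assistant": 450},
    "session-2": {"user": 80, "assistant": 200},
})
print(usage["total"])        # 1150
print(usage["by_category"])  # {'system': 120, 'user': 380, 'assistant': 650}
```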
Advanced Usage Examples
Basic Setup with Enhanced Features
from pathlib import Path

from penguin.system.conversation_manager import ConversationManager
from penguin.system.checkpoint_manager import CheckpointConfig

# Initialize with checkpoint configuration
checkpoint_config = CheckpointConfig(
    enabled=True,
    frequency=1,  # Checkpoint every message
    retention={
        "keep_all_hours": 24,
        "keep_every_nth": 10,
        "max_age_days": 30
    }
)

# model_config and api_client are assumed to have been created elsewhere
conversation_manager = ConversationManager(
    model_config=model_config,
    api_client=api_client,
    workspace_path=Path("./workspace"),
    system_prompt="You are a helpful assistant.",
    checkpoint_config=checkpoint_config
)
Checkpoint and Snapshot Management
# Create a checkpoint before major changes
checkpoint_id = await conversation_manager.create_manual_checkpoint(
    name="Before refactoring",
    description="Saving state before major code changes"
)

# Continue conversation
response = await conversation_manager.process_message("Please refactor this code...")
print(response)

# If something goes wrong, roll back to the checkpoint
success = await conversation_manager.rollback_to_checkpoint(checkpoint_id)
if success:
    print("Successfully rolled back to previous state")

# Create a branch for experimental conversation
branch_id = await conversation_manager.branch_from_checkpoint(
    checkpoint_id,
    name="Experimental approach",
    description="Testing alternative refactoring strategy"
)
Snapshot-Based Quick Save/Restore
from datetime import datetime

# Create a lightweight snapshot
snapshot_id = conversation_manager.create_snapshot(
    meta={"purpose": "quick_backup", "timestamp": datetime.now().isoformat()}
)

# Continue working
await conversation_manager.process_message("Make this code more efficient...")

# Quick restore if needed
success = conversation_manager.restore_snapshot(snapshot_id)
if success:
    print("Restored from snapshot")
Key Features
- Session Management: Automatically handles transitions between sessions when the context window fills up
- Token Budgeting: Ensures conversations don't exceed model limits through intelligent trimming
- Streaming Support: Works with both streaming and non-streaming response modes
- Multimodal Content: Handles text, images, and structured content
- Context Loading: Integrates external files and documentation into conversation context
- Conversation Persistence: Automatically saves state with metadata preservation and allows resuming conversations
- Checkpoint & Snapshot System: Full conversation state management with branching
- Error Recovery: Robust error handling with emergency checkpoint creation
- Performance Monitoring: Real-time token usage and performance tracking