
Core API Reference

The PenguinCore class is the central coordinator of the Penguin AI assistant, orchestrating interactions between its subsystems through an event-driven architecture and advanced conversation management.

Overview

PenguinCore acts as an integration point between:

  • Engine (Optional): A high-level coordinator for reasoning/action loops (used when available).
  • ConversationManager: Handles messages, context, conversation state, and checkpointing.
  • ToolManager: Provides access to available tools and actions with lazy initialization.
  • ActionExecutor: Executes actions and processes results with UI event callbacks.
  • ProjectManager: Manages projects and tasks with SQLite persistence.
  • APIClient: Handles direct communication with LLMs with streaming support.
  • EventBus: Centralized event system for UI updates and real-time streaming.
  • StreamingStateManager: Manages streaming state with coalescing and buffering.

Rather than implementing functionality directly, PenguinCore focuses on coordination. It initializes and holds references to these components. Crucially, if the Engine is successfully initialized, PenguinCore delegates the primary reasoning and execution loops to it. Otherwise, it falls back to managing the loop internally using get_response.

Processing Flow

The primary processing flow depends on whether the Engine is available, with enhanced event-driven streaming and real-time UI updates.

With Engine (Preferred): PenguinCore hands the input to the Engine, which runs the reasoning/action loop (a single turn or a multi-step task), executes any actions, and streams results back through the event system.

Without Engine (Legacy Fallback): PenguinCore drives the loop itself, repeatedly calling get_response to generate a turn and execute its actions until the task completes or the iteration limit is reached.
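
A rough sketch of this dispatch (illustrative only; the arguments accepted by Engine.run_single_turn are an assumption, while get_response is documented below):

# Illustrative sketch, not the actual implementation.
async def handle_input(core, input_data, max_iterations=5):
    if core.engine is not None:
        # Preferred path: the Engine owns the reasoning/action loop.
        return await core.engine.run_single_turn(input_data)  # argument shape assumed

    # Legacy fallback: PenguinCore drives the loop via get_response().
    result = {}
    for i in range(max_iterations):
        result, should_continue = await core.get_response(
            current_iteration=i, max_iterations=max_iterations
        )
        if not should_continue:
            break
    return result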

Event System

PenguinCore includes a comprehensive event system for real-time UI updates and streaming coordination:

  • stream_chunk: Real-time streaming content with message type and role information
  • token_update: Token usage updates for UI display
  • message: User and assistant message events
  • status: Status updates for UI components
  • error: Error events with source and details

Events are emitted throughout the processing pipeline to enable live UI updates in CLI, web interface, and other clients.

Factory Method

@classmethod
async def create(
    cls,
    config: Optional[Config] = None,
    model: Optional[str] = None,
    provider: Optional[str] = None,
    workspace_path: Optional[str] = None,
    enable_cli: bool = False,
) -> Union["PenguinCore", Tuple["PenguinCore", "PenguinCLI"]]

Creates a new PenguinCore instance with optional CLI. This method handles the initialization of all subsystems, including attempting to initialize the Engine.

create reads standard environment variables to load API keys and defaults. Common variables include OPENROUTER_API_KEY, OPENAI_API_KEY, ANTHROPIC_API_KEY, and optional overrides such as PENGUIN_MODEL, PENGUIN_PROVIDER, PENGUIN_CLIENT_PREFERENCE, and PENGUIN_API_BASE.
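
For example, a typical bootstrap might look like the following (the import path for PenguinCore is assumed here):

import os
from penguin.core import PenguinCore  # import path assumed

os.environ.setdefault("OPENROUTER_API_KEY", "sk-or-...")  # placeholder key

core = await PenguinCore.create(
    model="anthropic/claude-3-5-sonnet-20240620",
    provider="openrouter",
)

# Or create the core together with an interactive CLI
core, cli = await PenguinCore.create(enable_cli=True)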

Core Methods

Execution Root (ToolManager)

File tools and command execution operate against an “execution root” that is separate from Penguin’s workspace:

  • Project root: the current repo (CWD/git root) for edits, shell commands, diffs, and code analysis
  • Workspace root: assistant state (conversations, notes, logs, memory) under WORKSPACE_PATH

Selection precedence:

  • CLI flag: --root project|workspace
  • Env var: PENGUIN_WRITE_ROOT=project|workspace
  • Config: defaults.write_root
  • Default: project

The CLI prints the active root at startup. Tools can switch roots at runtime via:

tm.set_execution_root("project")  # or "workspace"
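
Assuming the environment variable is read during startup, the root can also be selected before the core is created, for example:

import os

# Select the workspace root before Penguin starts; the --root CLI flag still takes precedence.
os.environ["PENGUIN_WRITE_ROOT"] = "workspace"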

__init__

def __init__(
    self,
    config: Optional[Config] = None,
    api_client: Optional[APIClient] = None,
    tool_manager: Optional[ToolManager] = None,
    model_config: Optional[ModelConfig] = None
)

Initializes the core with configuration and required components. This constructor is also responsible for attempting to create and store the Engine instance (self.engine).

process_message

async def process_message(
    self,
    message: str,
    context: Optional[Dict[str, Any]] = None,
    conversation_id: Optional[str] = None,
    context_files: Optional[List[str]] = None,
    streaming: bool = False
) -> str

Processes a user message directly through the ConversationManager. This is a simplified interface for basic exchanges that may bypass the Engine: it manages conversation history and makes direct LLM calls via the ConversationManager.
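
A minimal call might look like this (the context file path is purely illustrative):

reply = await core.process_message(
    "Summarize our discussion so far",
    conversation_id=None,                 # use the active conversation
    context_files=["notes/summary.md"],   # hypothetical context file
)
print(reply)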

process

async def process(
    self,
    input_data: Union[Dict[str, Any], str],
    context: Optional[Dict[str, Any]] = None,
    conversation_id: Optional[str] = None,
    max_iterations: int = 5,  # primarily relevant for the legacy fallback loop
    context_files: Optional[List[str]] = None,
    streaming: Optional[bool] = None,
    stream_callback: Optional[Callable[[str], None]] = None  # used by Engine/APIClient
) -> Dict[str, Any]

Primary interface for processing input. If self.engine is available, this method delegates the execution to Engine.run_single_turn or potentially Engine.run_task based on internal logic or future configuration. If the Engine is not available, it falls back to the legacy multi-step loop managed within PenguinCore itself, using get_response iteratively. Returns a dictionary containing the assistant's final response and any accumulated action results.

get_response

async def get_response(
    self,
    current_iteration: Optional[int] = None,
    max_iterations: Optional[int] = None,
    stream_callback: Optional[Callable[[str], None]] = None,
    streaming: Optional[bool] = None
) -> Tuple[Dict[str, Any], bool]

Legacy Fallback Method. Generates one turn of response using the current conversation context and executes actions found within that response. This method is primarily used internally by the legacy processing loop when the Engine is not available. It directly calls the APIClient and ActionExecutor. Returns the response data for the single turn and a continuation flag (e.g., if TASK_COMPLETION_PHRASE is found).

start_run_mode

async def start_run_mode(
    self,
    name: Optional[str] = None,
    description: Optional[str] = None,
    context: Optional[Dict[str, Any]] = None,
    continuous: bool = False,
    time_limit: Optional[int] = None,
    mode_type: str = "task",
    stream_callback_for_cli: Optional[Callable[[str], Awaitable[None]]] = None,
    ui_update_callback_for_cli: Optional[Callable[[], Awaitable[None]]] = None
) -> None

Starts autonomous run mode by creating and running a RunMode instance. The RunMode instance will internally use self.engine if available.

Parameters

  • name – Name of the task to run.
  • description – Optional description when creating a new task.
  • context – Extra context passed to the task.
  • continuous – Run continuously rather than a single task.
  • time_limit – Optional time limit in minutes.
  • mode_type – Either "task" or "project".
  • stream_callback_for_cli – Async callback for streaming output in the CLI.
  • ui_update_callback_for_cli – Async callback to refresh CLI UI elements.
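
For example, a bounded task run could be started like this (the task name and description are illustrative):

await core.start_run_mode(
    name="refactor-auth-module",   # hypothetical task name
    description="Split the auth module into smaller components",
    time_limit=30,                 # minutes
    mode_type="task",
)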

Checkpoint Management

create_checkpoint

async def create_checkpoint(
    self,
    name: Optional[str] = None,
    description: Optional[str] = None
) -> Optional[str]

Creates a checkpoint of the current conversation state.

Parameters

  • name – Optional name for the checkpoint
  • description – Optional description

Returns: Checkpoint ID if successful, None otherwise

rollback_to_checkpoint

async def rollback_to_checkpoint(self, checkpoint_id: str) -> bool

Rolls back the conversation to a specific checkpoint.

Parameters

  • checkpoint_id – ID of the checkpoint to rollback to

Returns: True if successful, False otherwise

branch_from_checkpoint

async def branch_from_checkpoint(
    self,
    checkpoint_id: str,
    name: Optional[str] = None,
    description: Optional[str] = None
) -> Optional[str]

Creates a new conversation branch from a checkpoint.

Parameters

  • checkpoint_id – ID of the checkpoint to branch from
  • name – Optional name for the branch
  • description – Optional description

Returns: New branch checkpoint ID if successful, None otherwise
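
For example (assuming checkpoint_id was returned by an earlier create_checkpoint call):

branch_id = await core.branch_from_checkpoint(
    checkpoint_id,
    name="alternative-approach",   # hypothetical branch name
    description="Explore a different refactoring strategy",
)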

list_checkpoints

def list_checkpoints(
    self,
    session_id: Optional[str] = None,
    limit: int = 50
) -> List[Dict[str, Any]]

Lists available checkpoints with optional filtering.

Parameters

  • session_id – Filter by session ID
  • limit – Maximum number of checkpoints to return

Returns: List of checkpoint information

Model Management

load_model

async def load_model(self, model_id: str) -> bool

Switches to a different model at runtime with automatic configuration updates.

Parameters

  • model_id – Model identifier (e.g., "anthropic/claude-3-5-sonnet-20240620")

Returns: True if successful, False otherwise

Features:

  • Fetches model specifications from OpenRouter API
  • Updates context window settings automatically
  • Supports both explicit config models and provider/model format
  • Updates config.yml with new model settings

Event System Methods

PenguinCore uses an EventBus for all UI event delivery. The legacy register_ui/unregister_ui methods have been removed in favor of the centralized EventBus pattern.

emit_ui_event

async def emit_ui_event(self, event_type: str, data: Dict[str, Any]) -> None

Emits UI events via the EventBus to all subscribed handlers.

Parameters

  • event_type – Type of event (stream_chunk, token_update, message, etc.)
  • data – Event data relevant to the event type
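
For instance, a component could surface a status update to all subscribed UIs like this (the payload keys are illustrative; only event_type and data are defined by the signature above):

await core.emit_ui_event("status", {
    "component": "indexer",            # hypothetical payload keys
    "message": "Indexing complete",
})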

Using EventBus Directly

For components that need to receive events, subscribe via the EventBus:

from penguin.cli.events import EventBus, EventType

# Get the singleton EventBus
event_bus = EventBus.get_sync()

# Subscribe to specific events
async def my_handler(event_type: str, data: dict):
    print(f"Received {event_type}: {data}")

for event_type in EventType:
    event_bus.subscribe(event_type.value, my_handler)

# Unsubscribe when done
event_bus.unsubscribe("stream_chunk", my_handler)

Streaming Properties

PenguinCore provides read-only properties for accessing streaming state:

streaming_active

@property
def streaming_active(self) -> bool

Returns whether streaming is currently active.

streaming_content

@property
def streaming_content(self) -> str

Returns the accumulated assistant content from the current stream.

streaming_reasoning_content

@property
def streaming_reasoning_content(self) -> str

Returns the accumulated reasoning content from the current stream (for models with extended thinking).

streaming_stream_id

@property
def streaming_stream_id(self) -> Optional[str]

Returns the unique ID of the current stream, or None if not streaming.

Conversation Management

list_conversations

def list_conversations(self, limit: int = 20, offset: int = 0) -> List[Dict[str, Any]]

Lists available conversations with pagination.

get_conversation

def get_conversation(self, conversation_id: str) -> Optional[Dict[str, Any]]

Gets a specific conversation by ID.

create_conversation

def create_conversation(self) -> str

Creates a new conversation and returns its ID.

delete_conversation

def delete_conversation(self, conversation_id: str) -> bool

Deletes a conversation by ID.
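
Taken together, these methods support a simple lifecycle, for example:

# Create a conversation and route a message into it
conv_id = core.create_conversation()
reply = await core.process_message("Hello!", conversation_id=conv_id)

# Inspect stored conversations
for conv in core.list_conversations(limit=5):
    print(conv)

details = core.get_conversation(conv_id)

# Remove it when no longer needed
core.delete_conversation(conv_id)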

State Management

reset_context

def reset_context(self) -> None

Resets conversation context and diagnostics.

reset_state

async def reset_state(self) -> None

Resets core state including messages, tools, and external resources.
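
For example:

# Clear conversation context and diagnostics only
core.reset_context()

# Full reset of messages, tools, and external resources
await core.reset_state()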

Properties

total_tokens_used

@property
def total_tokens_used(self) -> int

Gets the total number of tokens used in the current session.

get_token_usage

def get_token_usage(self) -> Dict[str, Dict[str, int]]

Gets detailed token usage statistics.

stats = core.get_token_usage()
print(stats["session"]["prompt_tokens"], stats["session"]["completion_tokens"])

Action Handling

execute_action

async def execute_action(self, action) -> Dict[str, Any]

Executes a single action via the ActionExecutor. Note: In the preferred flow (with Engine), action execution is handled within the Engine's loop. This method might be used by the legacy fallback loop or potentially for direct action calls outside the main loops.

Diagnostics and Performance

get_system_info

def get_system_info(self) -> Dict[str, Any]

Returns comprehensive system information including model config, component status, and capabilities.

get_system_status

def get_system_status(self) -> Dict[str, Any]

Returns current system status including runtime state and performance metrics.

get_startup_stats

def get_startup_stats(self) -> Dict[str, Any]

Returns comprehensive startup performance statistics and profiling data.

print_startup_report

def print_startup_report(self) -> None

Prints a comprehensive startup performance report to console.

enable_fast_startup_globally

def enable_fast_startup_globally(self) -> None

Enables fast startup mode for future operations by deferring heavy initialization.

get_memory_provider_status

def get_memory_provider_status(self) -> Dict[str, Any]

Returns current status of memory provider and indexing operations.
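
These can be combined into a simple runtime health check (the exact keys in the returned dictionaries are not enumerated here):

status = core.get_system_status()
memory_status = core.get_memory_provider_status()
print(status)
print(memory_status)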

Configuration and Model Management

list_available_models

def list_available_models(self) -> List[Dict[str, Any]]

Returns a list of model metadata derived from config.yml, with the current model highlighted.

get_current_model

def get_current_model(self) -> Optional[Dict[str, Any]]

Returns information about the currently loaded model including all configuration parameters.
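
A short sketch of enumerating models and inspecting the active one (the "model" key is used elsewhere in this reference; other keys are not enumerated here):

for model_info in core.list_available_models():
    print(model_info)

current = core.get_current_model()
if current:
    print(f"Active model: {current['model']}")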

get_token_usage

def get_token_usage(self) -> Dict[str, Dict[str, int]]

Returns detailed token usage statistics with enhanced structure for CLI and UI display.

Usage Examples

Basic Usage

# Create a core instance with fast startup
core = await PenguinCore.create(fast_startup=True)

# Process a user message with streaming
response = await core.process(
    "Write a Python function to calculate factorial",
    streaming=True,
    stream_callback=my_callback
)
print(response['assistant_response'])

Model Switching

# Switch to a different model at runtime
success = await core.load_model("openai/gpt-4o")
if success:
    print(f"Switched to: {core.get_current_model()['model']}")

Checkpoint Management

# Create a checkpoint
checkpoint_id = await core.create_checkpoint(
    name="Before refactoring",
    description="Saving state before major changes"
)

# List available checkpoints
checkpoints = core.list_checkpoints(limit=10)

# Rollback to a previous state
success = await core.rollback_to_checkpoint(checkpoint_id)

Event-Driven UI Integration

from penguin.cli.events import EventBus, EventType

# Get the EventBus singleton
event_bus = EventBus.get_sync()

# Register for real-time updates
async def handle_stream_chunk(event_type, data):
    if event_type == "stream_chunk":
        print(f"Streaming: {data.get('content', '')}")

event_bus.subscribe(EventType.STREAM_CHUNK.value, handle_stream_chunk)

# Events will be emitted automatically during processing
response = await core.process("Hello!", streaming=True)

# Check streaming state via properties
if core.streaming_active:
    print(f"Current content: {core.streaming_content}")

Advanced Configuration

# Get comprehensive system information
info = core.get_system_info()
print(f"Current model: {info['current_model']['model']}")
print(f"Context window: {info['current_model']['max_context_window_tokens']}")

# Enable diagnostics
core.print_startup_report()