# Source code for ooai_llm.tui.data

"""Testable data layer for the optional Textual TUI.

Purpose:
    Keep model/catalog/benchmark data shaping independent of Textual so the
    interactive app can be tested without a terminal.

Design:
    The TUI consumes preformatted row models, snapshot-level metrics, simple
    client-side filters, and a small refresh gate used to avoid accidental
    repeated provider/catalog calls.
"""

from __future__ import annotations

from decimal import Decimal
from typing import Literal, cast

from pydantic import BaseModel, Field

from ..catalog_insights import compare_model_catalog
from ..litellm_registry import list_litellm_registry
from ..model_defaults import list_model_catalog
from ..model_suites import model_suite_from_catalog
from .themes import TUIThemeName

# Names of the table-backed views the TUI data layer can load.
TUIViewName = Literal["cheapest", "coding", "catalog", "suites", "benchmarks"]

# Selects which catalog listing backs the "catalog" view.
TUICatalogScope = Literal["supported", "litellm_registry"]

# Every loadable view; also the default set when a config names none.
TUI_LOAD_VIEW_CHOICES: tuple[TUIViewName, ...] = (
    "cheapest",
    "coding",
    "catalog",
    "suites",
    "benchmarks",
)
class TUIConfig(BaseModel):
    """Initial state for the optional Textual model explorer.

    How this module uses the fields:
        source, providers: forwarded to the catalog, comparison, and suite
            loaders.
        limit: forwarded as ``limit`` to the comparison and suite loaders and
            used to truncate catalog rows (a value <= 0 leaves catalog rows
            untruncated).
        input_tokens, output_tokens, budget_usd: forwarded to the model
            comparison used by the "cheapest" and "coding" views.
        show_benchmarks: when false, the "benchmarks" view is never loaded.
        include_non_chat: forwarded to the catalog listing calls.
        catalog_scope: picks the LiteLLM registry or the supported catalog
            for the "catalog" view.
        load_views: explicit views to load; ``None`` or empty means all.
        notes: copied to the front of the snapshot notes.

    ``query``, ``theme`` and ``refresh_cooldown_seconds`` are not read in this
    module; presumably the Textual app consumes them (confirm in the app).
    """

    source: Literal["auto", "provider", "litellm"] = "litellm"
    providers: list[str] | None = None
    limit: int = 25
    input_tokens: int = 10_000
    output_tokens: int = 2_000
    budget_usd: Decimal = Decimal("1")
    show_benchmarks: bool = True
    include_non_chat: bool = False
    catalog_scope: TUICatalogScope = "supported"
    query: str = ""
    refresh_cooldown_seconds: float = Field(default=2.0, ge=0)
    theme: TUIThemeName = "paper"
    load_views: list[TUIViewName] | None = None
    notes: list[str] = Field(default_factory=list)

    def resolved_load_views(self) -> tuple[TUIViewName, ...]:
        """Return the table-backed views that should be loaded.

        Duplicates are removed while keeping first-seen order, and
        "benchmarks" is dropped when ``show_benchmarks`` is false.
        """
        requested = self.load_views or list(TUI_LOAD_VIEW_CHOICES)
        # dict keys are unique and keep insertion order, so this dedupes
        # without disturbing the requested ordering.
        unique = dict.fromkeys(
            name
            for name in requested
            if self.show_benchmarks or name != "benchmarks"
        )
        return tuple(unique)

    def should_load_view(self, view: TUIViewName) -> bool:
        """Return whether a table-backed view should be fetched."""
        active_views = self.resolved_load_views()
        return view in active_views
class TUIComparisonRow(BaseModel):
    """One display-ready line of a model cost comparison.

    Every field is already a string so the table can render it directly.
    ``price_per_1m`` and ``tokens`` each hold two values joined by " / "
    (input/output price, and context window/max output tokens).
    """

    provider: str
    model: str
    call_cost: str
    calls_per_budget: str
    price_per_1m: str
    tokens: str
    capabilities: str
    release: str = "n/a"
    source: str = "n/a"
class TUICatalogRow(BaseModel):
    """One display-ready line of the model catalog table.

    All values are preformatted strings; ``price_per_1m`` and ``tokens`` each
    carry two values joined by " / ".
    """

    provider: str
    model: str
    release: str
    price_per_1m: str
    tokens: str
    capabilities: str
    source: str = "n/a"
class TUISuiteRow(BaseModel):
    """One display-ready entry of a generated model suite."""

    key: str
    role: str
    provider: str
    model: str
    capabilities: str
class TUIBenchmarkRow(BaseModel):
    """One display-ready description of a benchmark endpoint.

    ``query`` holds the endpoint's query parameter names joined by ", ", or
    the literal "none" when the endpoint has no query parameters.
    """

    name: str
    method: str
    path: str
    query: str
    stability: str
    description: str
# Any row model that one of the TUI tables can display.
TUIRow = (
    TUIComparisonRow
    | TUICatalogRow
    | TUISuiteRow
    | TUIBenchmarkRow
)
class TUISnapshot(BaseModel):
    """Everything the Textual app needs to render its main tables.

    Holds the config the snapshot was built from, one row list per
    table-backed view (empty when a view was not loaded), and the notes
    collected while loading.
    """

    config: TUIConfig
    cheapest: list[TUIComparisonRow] = Field(default_factory=list)
    coding: list[TUIComparisonRow] = Field(default_factory=list)
    catalog: list[TUICatalogRow] = Field(default_factory=list)
    suites: list[TUISuiteRow] = Field(default_factory=list)
    benchmarks: list[TUIBenchmarkRow] = Field(default_factory=list)
    notes: list[str] = Field(default_factory=list)
class TUIRefreshDecision(BaseModel):
    """Outcome of asking the refresh gate whether a refresh may run.

    ``allowed`` says whether the refresh may proceed, ``message`` is a
    human-readable status line, and ``remaining_seconds`` is the wait still
    required when the request was refused (0 otherwise).
    """

    allowed: bool
    message: str
    remaining_seconds: float = 0
class TUIRefreshGate:
    """Tiny in-process cooldown for expensive TUI refresh actions.

    The gate remembers the timestamp of the last permitted refresh and
    refuses new requests until ``cooldown_seconds`` have elapsed, unless the
    caller forces the refresh.
    """

    def __init__(self, *, cooldown_seconds: float = 2.0) -> None:
        # Negative cooldowns are clamped to zero, which disables the gate.
        self.cooldown_seconds = max(0, cooldown_seconds)
        self.last_refresh_at: float | None = None

    def request(self, *, now: float, force: bool = False) -> TUIRefreshDecision:
        """Return whether a refresh may run and update state when allowed.

        Args:
            now: Current timestamp in seconds, supplied by the caller.
            force: When true, bypass the cooldown entirely.

        Returns:
            A decision whose ``allowed`` flag tells the caller to proceed; a
            refused decision also reports the remaining wait in seconds.
        """
        previous = self.last_refresh_at
        unrestricted = force or self.cooldown_seconds == 0 or previous is None
        # Short-circuit keeps the subtraction from running when there is no
        # previous timestamp.
        if unrestricted or now - previous >= self.cooldown_seconds:
            self.last_refresh_at = now
            return TUIRefreshDecision(allowed=True, message="Refresh started.")

        wait = max(0.0, self.cooldown_seconds - (now - previous))
        return TUIRefreshDecision(
            allowed=False,
            remaining_seconds=wait,
            message=f"Refresh is rate-limited for {wait:.1f}s.",
        )
def format_money(value: Decimal | None) -> str:
    """Render an optional USD amount for compact terminal display.

    ``None`` becomes "n/a". Otherwise the amount is normalized (trailing
    zeros removed) and printed in fixed-point notation with a "$" prefix.
    """
    if value is not None:
        # Fixed-point format avoids the exponent form that normalize() can
        # produce for values such as 100.
        return "$" + format(value.normalize(), "f")
    return "n/a"
def format_count(value: int | None) -> str:
    """Render an optional token count with thousands separators.

    ``None`` becomes "n/a"; any integer is printed with "," grouping.
    """
    return "n/a" if value is None else format(value, ",")
def _limit[T](items: list[T], limit: int) -> list[T]: if limit <= 0: return items return items[:limit] def _comparison_rows(config: TUIConfig, *, coding: bool = False) -> tuple[list[TUIComparisonRow], list[str]]: comparison = compare_model_catalog( providers=config.providers, source=config.source, capabilities=["coding"] if coding else None, input_tokens=config.input_tokens, output_tokens=config.output_tokens, budget_usd=config.budget_usd, sort_by="call_cost", limit=config.limit, ) rows = [ TUIComparisonRow( provider=estimate.provider.value, model=estimate.model.as_langchain(), call_cost=format_money(estimate.call_cost_usd), calls_per_budget=str(estimate.calls_per_budget or "n/a"), price_per_1m=( f"{format_money(estimate.input_cost_per_1m_tokens)} / " f"{format_money(estimate.output_cost_per_1m_tokens)}" ), tokens=f"{format_count(estimate.context_window)} / {format_count(estimate.max_output_tokens)}", capabilities=", ".join(estimate.capabilities), release=getattr(estimate, "release_date", None) or "n/a", source=getattr(estimate, "source", "n/a"), ) for estimate in comparison.estimates ] return rows, comparison.notes def _catalog_rows(config: TUIConfig) -> tuple[list[TUICatalogRow], list[str]]: if config.catalog_scope == "litellm_registry": registry = list_litellm_registry( providers=config.providers, include_non_chat=config.include_non_chat, sort_by="provider", strict=False, ) rows = [ TUICatalogRow( provider=model.provider, model=model.model_string, release=model.release_date or "n/a", price_per_1m=( f"{format_money(model.input_cost_per_1m_tokens)} / " f"{format_money(model.output_cost_per_1m_tokens)}" ), tokens=f"{format_count(model.context_window)} / {format_count(model.max_output_tokens)}", capabilities=", ".join(model.capability_labels), source=model.source, ) for model in _limit(registry.models, config.limit) ] return rows, registry.notes catalog = list_model_catalog( providers=config.providers, source=config.source, include_non_chat=config.include_non_chat, 
sort_by="recency", strict=False, ) rows = [ TUICatalogRow( provider=model.provider.value, model=model.model_string.as_langchain(), release=model.release_date or "n/a", price_per_1m=( f"{format_money(model.input_cost_per_1m_tokens)} / " f"{format_money(model.output_cost_per_1m_tokens)}" ), tokens=f"{format_count(model.context_window)} / {format_count(model.max_output_tokens)}", capabilities=", ".join(model.capability_labels), source=getattr(model, "source", "n/a"), ) for model in _limit(catalog.models, config.limit) ] return rows, catalog.notes def _suite_rows(config: TUIConfig) -> tuple[list[TUISuiteRow], list[str]]: suite = model_suite_from_catalog( providers=config.providers, source=config.source, capabilities=["coding"], sort_by="cost", limit=config.limit, name="coding-catalog", parallel_tool_calls=True, ) rows = [ TUISuiteRow( key=entry.key, role=entry.role, provider=entry.provider.value if entry.provider else "unknown", model=entry.model.as_langchain(), capabilities=", ".join(entry.capabilities), ) for entry in suite ] return rows, suite.notes def _benchmark_rows(config: TUIConfig) -> tuple[list[TUIBenchmarkRow], list[str]]: if not config.show_benchmarks: return [], [] from ..benchmarks.livecodebench_pro import livecodebench_pro_endpoints rows = [ TUIBenchmarkRow( name=endpoint.name, method=endpoint.method, path=endpoint.path, query=", ".join(endpoint.query) if endpoint.query else "none", stability=endpoint.stability, description=endpoint.description, ) for endpoint in livecodebench_pro_endpoints() ] notes = ["Benchmark rows list known endpoint surfaces only; no live benchmark HTTP calls are made by the TUI."] return rows, notes
def _comparison_error_row(exc: Exception) -> TUIComparisonRow:
    """Return the placeholder row shown when a comparison view fails to load."""
    return TUIComparisonRow(
        provider="error",
        model=str(exc),
        call_cost="",
        calls_per_budget="",
        price_per_1m="",
        tokens="",
        capabilities="",
    )


def build_tui_snapshot(config: TUIConfig | None = None) -> TUISnapshot:
    """Load catalog, comparison, suite, and benchmark rows for the TUI.

    The function is intentionally independent of Textual so tests and future
    non-interactive exporters can exercise the same data path.

    Each requested view is loaded on its own. When a view's loader raises,
    the failure is recorded in the notes and the view gets a single "error"
    placeholder row, so one failing source does not break the others.

    Args:
        config: Explorer configuration; a default ``TUIConfig`` is used when
            omitted.

    Returns:
        A snapshot holding the config, the rows of every loaded view, and all
        collected notes.
    """
    config = config or TUIConfig()
    collected_notes = [*config.notes]
    active_views = config.resolved_load_views()
    collected_notes.append(f"Loaded TUI views: {', '.join(active_views) or 'none'}.")

    cheapest: list[TUIComparisonRow] = []
    if config.should_load_view("cheapest"):
        try:
            cheapest, extra = _comparison_rows(config)
            collected_notes.extend(extra)
        except Exception as err:
            collected_notes.append(f"Cheapest comparison failed: {err}")
            cheapest = [_comparison_error_row(err)]

    coding: list[TUIComparisonRow] = []
    if config.should_load_view("coding"):
        try:
            coding, extra = _comparison_rows(config, coding=True)
            collected_notes.extend(extra)
        except Exception as err:
            collected_notes.append(f"Coding comparison failed: {err}")
            coding = [_comparison_error_row(err)]

    catalog: list[TUICatalogRow] = []
    if config.should_load_view("catalog"):
        try:
            catalog, extra = _catalog_rows(config)
            collected_notes.extend(extra)
        except Exception as err:
            collected_notes.append(f"Catalog listing failed: {err}")
            placeholder = TUICatalogRow(
                provider="error",
                model=str(err),
                release="",
                price_per_1m="",
                tokens="",
                capabilities="",
            )
            catalog = [placeholder]

    suites: list[TUISuiteRow] = []
    if config.should_load_view("suites"):
        try:
            suites, extra = _suite_rows(config)
            collected_notes.extend(extra)
        except Exception as err:
            collected_notes.append(f"Suite generation failed: {err}")
            suites = [
                TUISuiteRow(key="error", role="", provider="", model=str(err), capabilities="")
            ]

    benchmarks: list[TUIBenchmarkRow] = []
    if config.should_load_view("benchmarks"):
        try:
            benchmarks, extra = _benchmark_rows(config)
            collected_notes.extend(extra)
        except Exception as err:
            collected_notes.append(f"Benchmark endpoint inventory failed: {err}")
            placeholder = TUIBenchmarkRow(
                name="error",
                method="",
                path=str(err),
                query="",
                stability="",
                description="",
            )
            benchmarks = [placeholder]

    return TUISnapshot(
        config=config,
        cheapest=cheapest,
        coding=coding,
        catalog=catalog,
        suites=suites,
        benchmarks=benchmarks,
        notes=collected_notes,
    )
[docs] def rows_for_view(snapshot: TUISnapshot, view: TUIViewName) -> list[TUIRow]: """Return snapshot rows for one table-backed view.""" if view == "cheapest": return list(snapshot.cheapest) if view == "coding": return list(snapshot.coding) if view == "catalog": return list(snapshot.catalog) if view == "suites": return list(snapshot.suites) return list(snapshot.benchmarks)
def normalize_tui_load_views(values: list[str] | None) -> list[TUIViewName] | None:
    """Normalize repeatable or comma-separated TUI view names.

    Each value may hold several comma-separated names. Names are stripped,
    lower-cased, and have "-" replaced by "_" before being checked against
    the allowed views. Blank pieces are ignored.

    Returns:
        The accepted names in input order, or ``None`` when nothing was given.

    Raises:
        ValueError: If any piece is not an allowed view name.
    """
    if not values:
        return None

    allowed_names = set(TUI_LOAD_VIEW_CHOICES)
    accepted: list[TUIViewName] = []
    rejected: list[str] = []

    pieces = (piece.strip() for raw in values for piece in raw.split(","))
    for piece in pieces:
        candidate = piece.lower().replace("-", "_")
        if not candidate:
            continue
        if candidate in allowed_names:
            accepted.append(cast(TUIViewName, candidate))
        else:
            # Report the piece as the user typed it (trimmed), not the
            # normalized form.
            rejected.append(piece)

    if rejected:
        allowed = ", ".join(TUI_LOAD_VIEW_CHOICES)
        raise ValueError(f"Unknown TUI view(s): {', '.join(rejected)}. Allowed values: {allowed}.")
    return accepted or None
def tui_row_search_text(row: TUIRow) -> str:
    """Return lowercase searchable text for any TUI display row.

    The row is dumped in JSON mode and its field values are joined with
    single spaces, in field order.
    """
    dumped = row.model_dump(mode="json")
    joined = " ".join(map(str, dumped.values()))
    return joined.lower()
def filter_tui_rows(rows: list[TUIRow], *, query: str = "", provider: str | None = None) -> list[TUIRow]:
    """Filter already-loaded TUI rows by free-text query and provider.

    Args:
        rows: Rows to filter; the input list is not modified.
        query: Case-insensitive substring that must occur in the row's search
            text. Blank means no text filter.
        provider: Case-insensitive provider name to match exactly. ``None``,
            blank, or "all" means no provider filter.

    Returns:
        A new list with the matching rows in their original order.
    """
    needle = query.strip().lower()
    wanted_provider = (provider or "").strip().lower()
    restrict_provider = bool(wanted_provider) and wanted_provider != "all"

    def _matches(row: TUIRow) -> bool:
        # Rows without a provider attribute count as an empty provider.
        owner = getattr(row, "provider", "")
        if restrict_provider and str(owner).lower() != wanted_provider:
            return False
        return not needle or needle in tui_row_search_text(row)

    return [row for row in rows if _matches(row)]
def snapshot_providers(snapshot: TUISnapshot) -> list[str]:
    """Return sorted provider names visible in the snapshot.

    Providers are gathered from the cheapest, coding, catalog, and suites
    rows. Empty providers and the "error" placeholder are left out.
    """
    found: set[str] = set()
    for view in ("cheapest", "coding", "catalog", "suites"):
        for row in rows_for_view(snapshot, view):  # type: ignore[arg-type]
            if getattr(row, "provider", None):
                found.add(str(getattr(row, "provider")))
    return sorted(name for name in found if name and name != "error")
def snapshot_metrics(snapshot: TUISnapshot) -> dict[str, str]:
    """Return compact metrics for the dashboard cards.

    Every value is a string: the provider count, the loaded view names, the
    catalog scope, one row count per view, and the number of notes.
    """
    provider_count = len(snapshot_providers(snapshot))
    loaded = ", ".join(snapshot.config.resolved_load_views()) or "none"

    metrics: dict[str, str] = {
        "providers": str(provider_count),
        "loaded_views": loaded,
        "catalog_scope": snapshot.config.catalog_scope,
    }
    # Insertion order is kept so the cards appear in a stable order.
    metrics["catalog_rows"] = str(len(snapshot.catalog))
    metrics["cheapest_rows"] = str(len(snapshot.cheapest))
    metrics["coding_rows"] = str(len(snapshot.coding))
    metrics["suite_rows"] = str(len(snapshot.suites))
    metrics["benchmark_rows"] = str(len(snapshot.benchmarks))
    metrics["notes"] = str(len(snapshot.notes))
    return metrics