# Source code for ooai_llm.cli.app

"""Command-line interface for ``ooai-llm``."""

from __future__ import annotations

import argparse
import csv
import json
import sys
from decimal import Decimal
from io import StringIO
from pathlib import Path
from typing import TYPE_CHECKING, Sequence

from ..catalog import ListModelsConfig
from ..catalog_insights import (
    ModelCallEquivalent,
    ModelCostComparison,
    ModelCostEstimate,
    compare_model_catalog,
)
from .help import (
    BENCHMARKS_EXAMPLES,
    LCB_PRO_DIFFICULTY_CHOICES,
    LCB_PRO_SORT_CHOICES,
    MODEL_CAPABILITY_CHOICES,
    MODEL_CATALOG_SORT_CHOICES,
    MODEL_CHEAPEST_EXAMPLES,
    MODEL_CODING_EXAMPLES,
    MODEL_COMPARE_EXAMPLES,
    MODEL_COMPARE_SORT_CHOICES,
    MODEL_LIST_EXAMPLES,
    MODEL_SUITE_EXAMPLES,
    MODELS_EXAMPLES,
    PROFILES_EXAMPLES,
    ROOT_EXAMPLES,
    ROOT_HELP,
    add_compare_args,
    add_provider_args,
    package_version,
)
from .recipes import RECIPE_TOPICS, render_recipes, selected_recipe_sections
from ..logging import configure_logging, get_logger, log_event
from ..litellm_registry import LiteLLMRegistryModel, list_litellm_registry
from ..model_defaults import ModelCapabilityName, ModelDefaultCandidate, list_model_catalog, update_model_defaults
from ..model_suites import (
    ModelSuite,
    get_model_suite,
    list_model_suite_names,
    model_suite_from_catalog,
)
from ..profiles import ChatModelProfile
from ..settings import AppSettings
from ..tui.data import TUI_LOAD_VIEW_CHOICES, normalize_tui_load_views
from ..tui.themes import TUI_THEME_CHOICES
from ..types import ModelString

logger = get_logger(__name__)
if TYPE_CHECKING: from ..benchmarks.livecodebench_pro import ( LiveCodeBenchProClient, LiveCodeBenchProDifficultyResult, LiveCodeBenchProModel, LiveCodeBenchProSubmissionDetail, LiveCodeBenchProSubmissionsResult, ) def _parse_providers(values: Sequence[str] | None) -> list[str] | None: if not values: return None providers: list[str] = [] for value in values: providers.extend(part.strip() for part in value.split(",") if part.strip()) return providers or None def _build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( prog="ooai-llm", formatter_class=argparse.RawDescriptionHelpFormatter, description=ROOT_HELP, epilog=ROOT_EXAMPLES, ) parser.add_argument("--log-level", default=None, help="Optional log level, such as DEBUG or INFO.") parser.add_argument( "--log-mode", choices=("rich", "plain", "json"), default=None, help="Optional ultilog/stdout logging mode.", ) parser.add_argument("--version", action="version", version=f"%(prog)s {package_version()}") subcommands = parser.add_subparsers(dest="command", required=True) models = subcommands.add_parser( "models", formatter_class=argparse.RawDescriptionHelpFormatter, help="Model catalog, cost comparison, and suite utilities.", epilog=MODELS_EXAMPLES, ) model_commands = models.add_subparsers(dest="models_command", required=True) update = model_commands.add_parser( "update", formatter_class=argparse.RawDescriptionHelpFormatter, help="Refresh convenience factory model defaults.", ) update.add_argument( "--source", choices=("auto", "provider", "litellm"), default="auto", help="Refresh source. Defaults to auto.", ) update.add_argument( "--provider", action="append", default=[], help="Provider to refresh. 
Can be repeated.", ) update.add_argument( "--providers", action="append", default=[], help="Comma-separated provider list.", ) update.add_argument( "--primary-alias-provider", default="openai", help="Provider used to update global aliases such as latest and cheap.", ) update.add_argument( "--format", choices=("json", "env"), default="json", help="Output format for reusable overrides.", ) update.add_argument( "--output", help="Optional path to write overrides. Prints to stdout when omitted.", ) update.add_argument( "--limit", type=int, default=None, help="Optional per-provider model-list limit for provider catalogs.", ) update.add_argument( "--strict", action="store_true", help="Fail if any selected provider cannot be refreshed.", ) update.add_argument( "--no-aliases", action="store_true", help="Only export provider presets, not global aliases/default_model.", ) list_cmd = model_commands.add_parser( "list", formatter_class=argparse.RawDescriptionHelpFormatter, help="List model metadata across providers.", epilog=MODEL_LIST_EXAMPLES, ) list_cmd.add_argument( "--source", choices=("auto", "provider", "litellm"), default="auto", help="Catalog source. Defaults to auto.", ) list_cmd.add_argument( "--provider", action="append", default=[], help="Provider to list. Can be repeated.", ) list_cmd.add_argument( "--providers", action="append", default=[], help="Comma-separated provider list.", ) list_cmd.add_argument( "--format", choices=("table", "json", "csv"), default="table", help="Output format. Defaults to table.", ) list_cmd.add_argument( "--limit", type=int, default=None, help="Maximum rows per provider. Defaults to 20, or all rows with --all-litellm. Use 0 for no limit.", ) list_cmd.add_argument( "--all-litellm", action="store_true", help=( "List the raw LiteLLM registry across every LiteLLM provider label " "instead of only ooai-supported providers. Implies --source litellm, " "--include-non-chat, and no row limit unless --limit is set." 
), ) list_cmd.add_argument( "--include-non-chat", action="store_true", help="Include embeddings, image, audio, and other non-chat models.", ) list_cmd.add_argument( "--capability", action="append", choices=MODEL_CAPABILITY_CHOICES, default=[], help="Required capability filter. Can be repeated.", ) list_cmd.add_argument("--reasoning-only", action="store_true", help="Only show reasoning-oriented models.") list_cmd.add_argument("--coding-only", action="store_true", help="Only show coding-oriented models.") list_cmd.add_argument("--vision-only", action="store_true", help="Only show vision-capable models.") list_cmd.add_argument( "--function-calling-only", action="store_true", help="Only show models marked as function/tool-call capable.", ) list_cmd.add_argument( "--tool-calling-only", action="store_true", help="Only show models marked as tool-call capable.", ) list_cmd.add_argument( "--tool-choice-only", action="store_true", help="Only show models marked as explicit tool-choice capable.", ) list_cmd.add_argument( "--parallel-tool-calls-only", action="store_true", help="Only show models marked as parallel-tool-call capable.", ) list_cmd.add_argument( "--structured-output-only", action="store_true", help="Only show models marked as native structured-output capable.", ) list_cmd.add_argument( "--min-context", type=int, default=None, help="Only show models with at least this many input/context tokens.", ) list_cmd.add_argument( "--min-input-tokens", dest="min_context", type=int, default=None, help="Alias for --min-context.", ) list_cmd.add_argument( "--min-output-tokens", type=int, default=None, help="Only show models with at least this many output tokens.", ) list_cmd.add_argument( "--max-input-cost-per-1m", type=Decimal, default=None, help="Only show models at or below this input-token USD cost per 1M tokens.", ) list_cmd.add_argument( "--max-output-cost-per-1m", type=Decimal, default=None, help="Only show models at or below this output-token USD cost per 1M tokens.", ) 
list_cmd.add_argument( "--released-after", default=None, help="Only show models released on or after this date-like value.", ) list_cmd.add_argument( "--released-before", default=None, help="Only show models released on or before this date-like value.", ) list_cmd.add_argument( "--sort", choices=MODEL_CATALOG_SORT_CHOICES, default="recency", help="Sort mode. Defaults to recency.", ) list_cmd.add_argument( "--strict", action="store_true", help="Fail if any selected provider cannot be listed.", ) list_cmd.add_argument( "--no-rich", action="store_true", help="Use the built-in plain table renderer even when Rich is installed.", ) compare_cmd = model_commands.add_parser( "compare", formatter_class=argparse.RawDescriptionHelpFormatter, help="Compare catalog models by estimated cost for a representative call.", epilog=MODEL_COMPARE_EXAMPLES, ) compare_cmd.add_argument( "--source", choices=("auto", "provider", "litellm"), default="auto", help="Catalog source. Defaults to auto.", ) compare_cmd.add_argument( "--provider", action="append", default=[], help="Provider to compare. Can be repeated.", ) compare_cmd.add_argument( "--providers", action="append", default=[], help="Comma-separated provider list.", ) compare_cmd.add_argument( "--format", choices=("table", "json", "csv"), default="table", help="Output format. Defaults to table.", ) compare_cmd.add_argument( "--style", choices=("langchain", "litellm", "bare"), default="langchain", help="Model-string style for table and CSV output.", ) compare_cmd.add_argument( "--limit", type=int, default=20, help="Maximum rows after cost ranking. Use 0 for no limit.", ) compare_cmd.add_argument( "--input-tokens", type=int, default=10_000, help="Representative input tokens per call. Defaults to 10000.", ) compare_cmd.add_argument( "--output-tokens", type=int, default=2_000, help="Representative output tokens per call. 
Defaults to 2000.", ) compare_cmd.add_argument( "--budget-usd", type=Decimal, default=Decimal("1"), help="Budget for calls-per-budget estimates. Defaults to 1.", ) compare_cmd.add_argument( "--baseline", default=None, help="Optional model used to compute calls-per-baseline ratios.", ) compare_cmd.add_argument( "--per-provider", action="store_true", help="Only keep the cheapest matching model for each provider.", ) compare_cmd.add_argument( "--include-non-chat", action="store_true", help="Include embeddings, image, audio, and other non-chat models.", ) compare_cmd.add_argument( "--capability", action="append", choices=MODEL_CAPABILITY_CHOICES, default=[], help="Required capability filter. Can be repeated.", ) compare_cmd.add_argument("--reasoning-only", action="store_true", help="Only show reasoning-oriented models.") compare_cmd.add_argument("--coding-only", action="store_true", help="Only show coding-oriented models.") compare_cmd.add_argument("--vision-only", action="store_true", help="Only show vision-capable models.") compare_cmd.add_argument( "--function-calling-only", action="store_true", help="Only show models marked as function/tool-call capable.", ) compare_cmd.add_argument( "--tool-calling-only", action="store_true", help="Only show models marked as tool-call capable.", ) compare_cmd.add_argument( "--tool-choice-only", action="store_true", help="Only show models marked as explicit tool-choice capable.", ) compare_cmd.add_argument( "--parallel-tool-calls-only", action="store_true", help="Only show models marked as parallel-tool-call capable.", ) compare_cmd.add_argument( "--structured-output-only", action="store_true", help="Only show models marked as native structured-output capable.", ) compare_cmd.add_argument( "--min-context", type=int, default=None, help="Only show models with at least this many input/context tokens.", ) compare_cmd.add_argument( "--min-input-tokens", dest="min_context", type=int, default=None, help="Alias for --min-context.", ) 
compare_cmd.add_argument( "--min-output-tokens", type=int, default=None, help="Only show models with at least this many output tokens.", ) compare_cmd.add_argument( "--max-input-cost-per-1m", type=Decimal, default=None, help="Only show models at or below this input-token USD cost per 1M tokens.", ) compare_cmd.add_argument( "--max-output-cost-per-1m", type=Decimal, default=None, help="Only show models at or below this output-token USD cost per 1M tokens.", ) compare_cmd.add_argument("--released-after", default=None, help="Catalog lower release-date bound.") compare_cmd.add_argument("--released-before", default=None, help="Catalog upper release-date bound.") compare_cmd.add_argument( "--sort", choices=MODEL_COMPARE_SORT_CHOICES, default="call_cost", help="Sort comparison rows. Defaults to call_cost.", ) compare_cmd.add_argument("--strict", action="store_true", help="Fail if any selected provider cannot be listed.") compare_cmd.add_argument( "--no-rich", action="store_true", help="Use the built-in plain table renderer even when Rich is installed.", ) cheapest_cmd = model_commands.add_parser( "cheapest", aliases=("cheap",), formatter_class=argparse.RawDescriptionHelpFormatter, help="Shortcut for cost-ranked model comparison.", epilog=MODEL_CHEAPEST_EXAMPLES, ) add_compare_args(cheapest_cmd, default_sort="call_cost") coding_cmd = model_commands.add_parser( "coding", aliases=("code",), formatter_class=argparse.RawDescriptionHelpFormatter, help="Shortcut for coding-oriented model comparison.", epilog=MODEL_CODING_EXAMPLES, ) add_compare_args(coding_cmd, default_sort="call_cost") coding_cmd.set_defaults(coding_only=True) suite_cmd = model_commands.add_parser( "suite", formatter_class=argparse.RawDescriptionHelpFormatter, help="Render a reusable model suite for comparisons or LangGraph nodes.", epilog=MODEL_SUITE_EXAMPLES, ) suite_cmd.add_argument( "--suite", choices=tuple(list_model_suite_names()), default="practical", help="Built-in suite name. 
Defaults to practical.", ) suite_cmd.add_argument("--provider", action="append", default=[], help="Provider to include. Can be repeated.") suite_cmd.add_argument("--providers", action="append", default=[], help="Comma-separated provider list.") suite_cmd.add_argument( "--parallel-tool-calls", choices=("default", "true", "false"), default="default", help="Set profile parallel_tool_calls for generated suite profiles.", ) suite_cmd.add_argument( "--preset", action="append", default=[], help="Provider preset to include. Can be repeated or comma-separated.", ) suite_cmd.add_argument( "--format", choices=("table", "json", "csv"), default="table", help="Output format. Defaults to table.", ) suite_cmd.add_argument( "--style", choices=("langchain", "litellm", "bare"), default="langchain", help="Model-string style for table and CSV output.", ) suite_cmd.add_argument( "--from-catalog", action="store_true", help="Build the suite from filtered catalog rows instead of configured presets.", ) suite_cmd.add_argument( "--source", choices=("auto", "provider", "litellm"), default="auto", help="Catalog source when --from-catalog is used.", ) suite_cmd.add_argument("--limit", type=int, default=None, help="Maximum number of catalog rows.") suite_cmd.add_argument( "--capability", action="append", choices=MODEL_CAPABILITY_CHOICES, default=[], help="Required catalog capability when --from-catalog is used.", ) suite_cmd.add_argument("--reasoning-only", action="store_true", help="Only catalog reasoning models.") suite_cmd.add_argument("--coding-only", action="store_true", help="Only catalog coding models.") suite_cmd.add_argument("--vision-only", action="store_true", help="Only catalog vision models.") suite_cmd.add_argument( "--function-calling-only", action="store_true", help="Only catalog models marked as function/tool-call capable.", ) suite_cmd.add_argument( "--tool-calling-only", action="store_true", help="Only catalog models marked as tool-call capable.", ) suite_cmd.add_argument( 
"--tool-choice-only", action="store_true", help="Only catalog models marked as explicit tool-choice capable.", ) suite_cmd.add_argument( "--parallel-tool-calls-only", action="store_true", help="Only catalog models marked as parallel-tool-call capable.", ) suite_cmd.add_argument( "--structured-output-only", action="store_true", help="Only catalog models marked as native structured-output capable.", ) suite_cmd.add_argument( "--min-context", type=int, default=None, help="Only catalog models with at least this many input/context tokens.", ) suite_cmd.add_argument( "--min-input-tokens", dest="min_context", type=int, default=None, help="Alias for --min-context.", ) suite_cmd.add_argument( "--min-output-tokens", type=int, default=None, help="Only catalog models with at least this many output tokens.", ) suite_cmd.add_argument( "--max-input-cost-per-1m", type=Decimal, default=None, help="Only catalog models at or below this input-token USD cost per 1M tokens.", ) suite_cmd.add_argument( "--max-output-cost-per-1m", type=Decimal, default=None, help="Only catalog models at or below this output-token USD cost per 1M tokens.", ) suite_cmd.add_argument("--released-after", default=None, help="Catalog lower release-date bound.") suite_cmd.add_argument("--released-before", default=None, help="Catalog upper release-date bound.") suite_cmd.add_argument( "--sort", choices=MODEL_CATALOG_SORT_CHOICES, default="recency", help="Catalog sort mode. 
Defaults to recency.", ) suite_cmd.add_argument("--strict", action="store_true", help="Fail if any selected provider cannot be listed.") suite_cmd.add_argument( "--no-rich", action="store_true", help="Use the built-in plain table renderer even when Rich is installed.", ) profiles = subcommands.add_parser( "profiles", formatter_class=argparse.RawDescriptionHelpFormatter, help="Serializable chat-model profile utilities.", epilog=PROFILES_EXAMPLES, ) profile_commands = profiles.add_subparsers(dest="profiles_command", required=True) validate = profile_commands.add_parser( "validate", formatter_class=argparse.RawDescriptionHelpFormatter, help="Validate a JSON chat-model profile.", epilog=PROFILES_EXAMPLES, ) validate.add_argument("--input", "-i", required=True, help="Profile JSON path, or '-' for stdin.") render = profile_commands.add_parser( "render", formatter_class=argparse.RawDescriptionHelpFormatter, help="Render normalized profile JSON.", epilog=PROFILES_EXAMPLES, ) render.add_argument("--input", "-i", required=True, help="Profile JSON path, or '-' for stdin.") render.add_argument("--format", choices=("json",), default="json", help="Output format. Defaults to JSON.") resolve = profile_commands.add_parser( "resolve", formatter_class=argparse.RawDescriptionHelpFormatter, help="Resolve model metadata for a profile.", epilog=PROFILES_EXAMPLES, ) resolve.add_argument("--input", "-i", required=True, help="Profile JSON path, or '-' for stdin.") resolve.add_argument( "--format", choices=("table", "json"), default="table", help="Output format. 
Defaults to table.", ) resolve.add_argument( "--source", choices=("auto", "provider", "litellm"), default="auto", help="Model-default refresh source when the profile resolves aliases or presets.", ) benchmarks = subcommands.add_parser( "benchmarks", formatter_class=argparse.RawDescriptionHelpFormatter, help="External benchmark exploration utilities.", epilog=BENCHMARKS_EXAMPLES, ) benchmark_commands = benchmarks.add_subparsers(dest="benchmarks_command", required=True) lcb_pro = benchmark_commands.add_parser( "lcb-pro", formatter_class=argparse.RawDescriptionHelpFormatter, help="Explore the public LiveCodeBench Pro leaderboard surfaces.", epilog=BENCHMARKS_EXAMPLES, ) lcb_commands = lcb_pro.add_subparsers(dest="lcb_pro_command", required=True) def add_lcb_connection_args(command: argparse.ArgumentParser) -> None: command.add_argument( "--base-url", default=None, help="Override the LiveCodeBench Pro backend URL.", ) command.add_argument( "--timeout", type=float, default=30, help="HTTP timeout in seconds. Defaults to 30.", ) summary_cmd = lcb_commands.add_parser( "summary", formatter_class=argparse.RawDescriptionHelpFormatter, help="Show what the LiveCodeBench Pro surface exposes.", epilog=BENCHMARKS_EXAMPLES, ) add_lcb_connection_args(summary_cmd) summary_cmd.add_argument("--format", choices=("table", "json"), default="table", help="Output format.") summary_cmd.add_argument("--active-only", action="store_true", help="Only include active model rows.") summary_cmd.add_argument("--limit", type=int, default=10, help="Maximum rows per fetched view. 
Use 0 for no limit.") summary_cmd.add_argument( "--no-difficulties", action="store_true", help="Do not fetch easy/medium/hard difficulty summaries.", ) summary_cmd.add_argument("--no-rich", action="store_true", help="Use plain table output even when Rich is installed.") lcb_models_cmd = lcb_commands.add_parser( "models", formatter_class=argparse.RawDescriptionHelpFormatter, help="List LiveCodeBench Pro leaderboard model rows.", epilog=BENCHMARKS_EXAMPLES, ) add_lcb_connection_args(lcb_models_cmd) lcb_models_cmd.add_argument("--format", choices=("table", "json", "csv"), default="table", help="Output format.") lcb_models_cmd.add_argument("--status", choices=("all", "active", "inactive"), default="all", help="Status filter.") lcb_models_cmd.add_argument("--provider", action="append", default=[], help="Provider filter. Can be repeated.") lcb_models_cmd.add_argument("--providers", action="append", default=[], help="Comma-separated provider filters.") lcb_models_cmd.add_argument("--organization", action="append", default=[], help="Organization filter. Can be repeated.") lcb_models_cmd.add_argument("--organizations", action="append", default=[], help="Comma-separated organization filters.") lcb_models_cmd.add_argument("--query", default=None, help="Case-insensitive model/provider/org search.") lcb_models_cmd.add_argument("--sort", choices=LCB_PRO_SORT_CHOICES, default="rating", help="Sort mode.") lcb_models_cmd.add_argument("--ascending", action="store_true", help="Sort ascending instead of descending.") lcb_models_cmd.add_argument("--limit", type=int, default=20, help="Maximum rows. 
Use 0 for no limit.") lcb_models_cmd.add_argument("--no-rich", action="store_true", help="Use plain table output even when Rich is installed.") difficulty_cmd = lcb_commands.add_parser( "difficulty", formatter_class=argparse.RawDescriptionHelpFormatter, help="Show pass rates for one difficulty slice.", epilog=BENCHMARKS_EXAMPLES, ) add_lcb_connection_args(difficulty_cmd) difficulty_cmd.add_argument("--difficulty", choices=LCB_PRO_DIFFICULTY_CHOICES, required=True) difficulty_cmd.add_argument("--format", choices=("table", "json", "csv"), default="table", help="Output format.") difficulty_cmd.add_argument("--provider", action="append", default=[], help="Provider filter. Can be repeated.") difficulty_cmd.add_argument("--providers", action="append", default=[], help="Comma-separated provider filters.") difficulty_cmd.add_argument("--organization", action="append", default=[], help="Organization filter. Can be repeated.") difficulty_cmd.add_argument("--organizations", action="append", default=[], help="Comma-separated organization filters.") difficulty_cmd.add_argument("--query", default=None, help="Case-insensitive model/provider/org search.") difficulty_cmd.add_argument("--sort", choices=LCB_PRO_SORT_CHOICES, default="rating", help="Sort mode.") difficulty_cmd.add_argument("--ascending", action="store_true", help="Sort ascending instead of descending.") difficulty_cmd.add_argument("--limit", type=int, default=20, help="Maximum rows. 
Use 0 for no limit.") difficulty_cmd.add_argument("--no-rich", action="store_true", help="Use plain table output even when Rich is installed.") submissions_cmd = lcb_commands.add_parser( "submissions", formatter_class=argparse.RawDescriptionHelpFormatter, help="Show per-problem verdicts for one model/provider/difficulty.", epilog=BENCHMARKS_EXAMPLES, ) add_lcb_connection_args(submissions_cmd) submissions_cmd.add_argument("--model-name", required=True, help="Leaderboard model name, not display name.") submissions_cmd.add_argument("--model-provider", required=True, help="Leaderboard provider key.") submissions_cmd.add_argument("--difficulty", choices=LCB_PRO_DIFFICULTY_CHOICES, required=True) submissions_cmd.add_argument("--format", choices=("table", "json", "csv"), default="table", help="Output format.") submissions_cmd.add_argument("--limit", type=int, default=50, help="Maximum flattened problem rows. Use 0 for no limit.") submissions_cmd.add_argument("--no-rich", action="store_true", help="Use plain table output even when Rich is installed.") submission_cmd = lcb_commands.add_parser( "submission", formatter_class=argparse.RawDescriptionHelpFormatter, help="Show one submission detail.", epilog=BENCHMARKS_EXAMPLES, ) add_lcb_connection_args(submission_cmd) submission_cmd.add_argument("--submission-id", required=True, help="LiveCodeBench Pro submission id.") submission_cmd.add_argument("--format", choices=("table", "json"), default="table", help="Output format.") submission_cmd.add_argument("--code-only", action="store_true", help="Print only the generated code.") recipes = subcommands.add_parser( "recipes", formatter_class=argparse.RawDescriptionHelpFormatter, help="Print copy/paste CLI and Python package recipes.", epilog="""\ Examples: ooai-llm recipes --topic cheapest ooai-llm recipes --topic coding ooai-llm recipes --topic runtime --format markdown """, ) recipes.add_argument( "--topic", choices=RECIPE_TOPICS, default="all", help="Recipe topic to print. 
Defaults to all.", ) recipes.add_argument( "--format", choices=("text", "markdown", "json"), default="text", help="Output format. Defaults to text.", ) tui = subcommands.add_parser( "tui", aliases=("interactive",), formatter_class=argparse.RawDescriptionHelpFormatter, help="Launch the optional Textual model explorer.", epilog="""\ Examples: ooai-llm tui ooai-llm tui --providers openai,anthropic,mistral --source litellm ooai-llm tui --theme paper ooai-llm tui --theme mono --refresh-cooldown 0 ooai-llm tui --views cheapest,catalog --providers mistral ooai-llm tui --catalog-all Install: pip install "ooai-llm[tui]" """, ) tui.add_argument("--source", choices=("auto", "provider", "litellm"), default="litellm", help="Initial catalog source.") add_provider_args(tui, noun="Provider") tui.add_argument("--limit", type=int, default=25, help="Initial row limit. Use 0 for no limit.") tui.add_argument( "--include-non-chat", action="store_true", help="Include embeddings, image, audio, and other non-chat rows in catalog views.", ) tui.add_argument( "--catalog-scope", choices=("supported", "litellm-registry"), default="supported", help="Catalog view source. supported uses ooai provider adapters; litellm-registry uses raw LiteLLM metadata.", ) tui.add_argument( "--catalog-all", "--all-litellm", action="store_true", help=( "Shortcut for raw LiteLLM registry exploration: catalog-only, " "include non-chat rows, and no row limit." ), ) tui.add_argument("--input-tokens", type=int, default=10_000, help="Initial comparison input-token shape.") tui.add_argument("--output-tokens", type=int, default=2_000, help="Initial comparison output-token shape.") tui.add_argument("--budget-usd", type=Decimal, default=Decimal("1"), help="Initial comparison budget.") tui.add_argument( "--view", action="append", choices=TUI_LOAD_VIEW_CHOICES, default=[], help="TUI data view to load. Can be repeated. 
Defaults to all views.", ) tui.add_argument( "--views", action="append", default=[], help="Comma-separated TUI data views to load. Example: cheapest,catalog.", ) tui.add_argument( "--theme", choices=TUI_THEME_CHOICES, default="paper", help="Initial TUI theme. Use mono for minimal color or slate for a dark neutral theme.", ) tui.add_argument( "--refresh-cooldown", type=float, default=2.0, help="Minimum seconds between manual TUI refreshes. Use 0 to disable throttling.", ) tui.add_argument( "--no-benchmarks", action="store_true", help="Hide benchmark exploration from the initial TUI state.", ) return parser def _read_text(path: str) -> str: if path == "-": return sys.stdin.read() return Path(path).expanduser().read_text(encoding="utf-8") def _read_profile(path: str) -> ChatModelProfile: return ChatModelProfile.from_json(_read_text(path)) def _run_recipes(args: argparse.Namespace) -> int: sections = selected_recipe_sections(args.topic) log_event(logger, "cli.recipes", topic=args.topic, format=args.format) print(render_recipes(sections, format=args.format), end="") return 0 def _run_tui(args: argparse.Namespace) -> int: from ..tui import TUIConfig, run_tui providers = _parse_providers([*args.provider, *args.providers]) load_views = normalize_tui_load_views([*args.view, *args.views]) catalog_scope = args.catalog_scope.replace("-", "_") include_non_chat = args.include_non_chat limit = args.limit if args.catalog_all: catalog_scope = "litellm_registry" include_non_chat = True limit = 0 if load_views is None: load_views = ["catalog"] config = TUIConfig( source=args.source, providers=providers, limit=limit, input_tokens=args.input_tokens, output_tokens=args.output_tokens, budget_usd=args.budget_usd, theme=args.theme, load_views=load_views, include_non_chat=include_non_chat, catalog_scope=catalog_scope, refresh_cooldown_seconds=args.refresh_cooldown, show_benchmarks=not args.no_benchmarks, ) log_event( logger, "cli.tui", source=args.source, providers=providers, 
theme=args.theme, load_views=load_views, catalog_scope=catalog_scope, ) run_tui(config) return 0 def _settings_for_profile_resolve(profile: ChatModelProfile, *, source: str) -> AppSettings: settings = AppSettings() providers = None if profile.provider is not None: providers = [str(profile.provider)] refresh = settings.llm.auto_refresh_models.model_copy( update={ "source": source, "providers": providers or settings.llm.auto_refresh_models.providers, } ) llm_settings = settings.llm.model_copy(update={"auto_refresh_models": refresh}) return settings.model_copy(update={"llm": llm_settings}) def _run_models_update(args: argparse.Namespace) -> int: providers = _parse_providers([*args.provider, *args.providers]) config = ListModelsConfig(limit=args.limit) if args.limit is not None else None log_event(logger, "cli.models.update", source=args.source, providers=providers) result = update_model_defaults( providers=providers, source=args.source, config=config, primary_alias_provider=args.primary_alias_provider, strict=args.strict, output_path=args.output, output_format=args.format, include_aliases=not args.no_aliases, ) for note in result.notes: print(f"warning: {note}", file=sys.stderr) if result.output_text is not None: print(result.output_text, end="") elif result.output_path is not None: print(f"Wrote model defaults to {result.output_path}", file=sys.stderr) return 0 def _limit_models_per_provider( models: Sequence[ModelDefaultCandidate], *, limit: int | None, ) -> list[ModelDefaultCandidate]: if limit is None or limit <= 0: return list(models) counts: dict[str, int] = {} limited: list[ModelDefaultCandidate] = [] for model in models: provider = model.provider.value count = counts.get(provider, 0) if count >= limit: continue counts[provider] = count + 1 limited.append(model) return limited def _limit_registry_per_provider( models: Sequence[LiteLLMRegistryModel], *, limit: int | None, ) -> list[LiteLLMRegistryModel]: if limit is None or limit <= 0: return list(models) 
counts: dict[str, int] = {} limited: list[LiteLLMRegistryModel] = [] for model in models: provider = model.provider count = counts.get(provider, 0) if count >= limit: continue counts[provider] = count + 1 limited.append(model) return limited def _models_list_limit(args: argparse.Namespace) -> int: if args.limit is not None: return args.limit if args.all_litellm: return 0 return 20 def _format_decimal(value: Decimal | None) -> str: if value is None: return "" return f"{value.normalize():f}" def _capability_filters(args: argparse.Namespace) -> list[ModelCapabilityName] | None: capabilities = list(args.capability) if args.reasoning_only: capabilities.append("reasoning") if args.coding_only: capabilities.append("coding") if args.vision_only: capabilities.append("vision") if args.function_calling_only: capabilities.append("function_calling") if args.tool_calling_only: capabilities.append("tool_calling") if args.tool_choice_only: capabilities.append("tool_choice") if args.parallel_tool_calls_only: capabilities.append("parallel_tool_calls") if args.structured_output_only: capabilities.append("structured_output") unique: list[ModelCapabilityName] = [] for capability in capabilities: if capability not in unique: unique.append(capability) return unique or None def _parse_presets(values: Sequence[str] | None) -> list[str] | None: if not values: return None presets: list[str] = [] for value in values: presets.extend(part.strip() for part in value.split(",") if part.strip()) return presets or None def _parse_optional_bool(value: str) -> bool | None: if value == "default": return None return value == "true" def _decimal_from_text(value: str) -> Decimal | None: if not value: return None try: return Decimal(value) except Exception: return None def _format_count_text(value: str | int | None) -> str: if value in (None, ""): return "" try: return f"{int(value):,}" except (TypeError, ValueError): return str(value) def _format_compact_decimal(value: Decimal | str | None, *, money: bool 
def _cost_style(value: Decimal | None) -> str:
    """Pick the Rich style name used to colour a cost value.

    ``None`` maps to ``"dim"``. Otherwise the first band whose exclusive upper
    bound is greater than ``value`` wins, and anything at or above the last
    bound is ``"red"``.
    """
    if value is None:
        return "dim"
    bands = (
        (Decimal("0.005"), "green"),
        (Decimal("0.025"), "yellow"),
    )
    for upper_bound, colour in bands:
        if value < upper_bound:
            return colour
    return "red"
def _rich_role(value: str):
    """Render a suite role as styled Rich ``Text``.

    An empty role is shown as ``"custom"``; roles without a dedicated entry in
    the style table are rendered in ``"white"``.
    """
    from rich.text import Text

    role_styles = dict(
        cheap="bold green",
        fast="green",
        balanced="cyan",
        reasoning="magenta",
        coding="bold yellow",
        vision="blue",
        testing="dim",
        default="white",
        latest="bright_white",
    )
    label = value if value else "custom"
    return Text(label, style=role_styles.get(value, "white"))
def _model_table_rows(models: Sequence[ModelDefaultCandidate]) -> list[dict[str, str]]:
    """Flatten catalog candidates into string-valued rows for table/CSV output.

    Missing optional values become empty strings. Both ``max_input_tokens`` and
    ``context`` are populated from the candidate's ``context_window``.
    """

    def _as_row(candidate: ModelDefaultCandidate) -> dict[str, str]:
        window = str(candidate.context_window or "")
        return {
            "provider": candidate.provider.value,
            "model": candidate.model_string.as_langchain(),
            "source": candidate.source,
            "release": candidate.release_date or "",
            "input_per_1m": _format_decimal(candidate.input_cost_per_1m_tokens),
            "output_per_1m": _format_decimal(candidate.output_cost_per_1m_tokens),
            "max_input_tokens": window,
            "max_output_tokens": str(candidate.max_output_tokens or ""),
            "context": window,
            "mode": candidate.mode or "",
            "capabilities": ",".join(candidate.capability_labels),
        }

    return [_as_row(candidate) for candidate in models]
def _print_rich_table(rows: Sequence[dict[str, str]]) -> bool:
    """Render catalog rows as a Rich summary panel followed by a table.

    Args:
        rows: String-valued rows carrying at least the ``provider``,
            ``source``, ``model``, ``input_per_1m``, ``output_per_1m``,
            ``max_input_tokens``, ``max_output_tokens``, ``release`` and
            ``capabilities`` keys.

    Returns:
        ``True`` once the output has been printed, ``False`` when Rich cannot
        be imported so the caller can fall back to the plain-text table.
    """
    try:
        from rich import box
        from rich.console import Console
        from rich.table import Table
    except ImportError:
        return False

    providers = sorted({row["provider"] for row in rows if row["provider"]})
    sources = sorted({row["source"] for row in rows if row["source"]})
    # Keep the summary line short: name at most eight providers, count the rest.
    provider_summary = ", ".join(providers[:8])
    if len(providers) > 8:
        provider_summary = f"{provider_summary}, +{len(providers) - 8} more"

    summary = _rich_summary_panel(
        "Model catalog",
        [
            _rich_metric("rows ", str(len(rows)), style="bold white"),
            _rich_metric("providers ", provider_summary or "none", style="cyan"),
            _rich_metric("sources ", ", ".join(sources) or "none", style="dim"),
        ],
        note="Filterable provider catalog rows with pricing/context metadata when available.",
    )

    table = Table(
        box=box.ROUNDED,
        border_style="grey39",
        header_style="bold bright_blue",
        row_styles=["", "grey11"],
        expand=True,
        show_lines=False,
    )
    table.add_column("Model", ratio=4)
    table.add_column("Price/1M", justify="right", no_wrap=True)
    table.add_column("Tokens in/out", justify="right", no_wrap=True)
    table.add_column("Release", no_wrap=True)
    table.add_column("Capabilities", ratio=2)
    for row in rows:
        table.add_row(
            _rich_model(row["model"]),
            _rich_price_pair(row["input_per_1m"], row["output_per_1m"]),
            _rich_token_pair(row["max_input_tokens"], row["max_output_tokens"]),
            row["release"] or "n/a",
            _rich_capabilities(row["capabilities"]),
        )

    # Use one console for both renderables, as _print_simple_rich_table does,
    # instead of constructing a fresh Console per print call.
    console = Console()
    console.print(summary)
    console.print(table)
    return True
def _run_models_list(args: argparse.Namespace) -> int:
    """Handle ``models list``: query the catalog and print it in the chosen format.

    Delegates to the LiteLLM registry listing when ``args.all_litellm`` is set.
    Catalog notes are written to stderr as warnings. Output is JSON, CSV, a
    Rich table, or a plain table when Rich is disabled or unavailable.

    Returns:
        Process exit code ``0``.
    """
    providers = _parse_providers([*args.provider, *args.providers])
    limit = _models_list_limit(args)
    if args.all_litellm:
        return _run_models_list_litellm_registry(args, providers=providers, limit=limit)

    # A listing config is only built for provider-sourced queries with a non-zero limit.
    config = None
    if args.source == "provider" and limit:
        config = ListModelsConfig(limit=limit)

    log_event(logger, "cli.models.list", source=args.source, providers=providers, limit=limit)
    result = list_model_catalog(
        providers=providers,
        source=args.source,
        config=config,
        include_non_chat=args.include_non_chat,
        capabilities=_capability_filters(args),
        min_context_tokens=args.min_context,
        min_output_tokens=args.min_output_tokens,
        max_input_cost_per_1m=args.max_input_cost_per_1m,
        max_output_cost_per_1m=args.max_output_cost_per_1m,
        released_after=args.released_after,
        released_before=args.released_before,
        sort_by=args.sort,
        strict=args.strict,
    )
    models = _limit_models_per_provider(result.models, limit=limit)
    rows = _model_table_rows(models)
    for note in result.notes:
        print(f"warning: {note}", file=sys.stderr)

    if args.format == "json":
        serialised = [entry.model_dump(mode="json") for entry in models]
        print(json.dumps(serialised, indent=2, sort_keys=True))
        return 0
    if args.format == "csv":
        print(_render_models_csv(rows), end="")
        return 0

    # Rich is attempted only when not disabled; a False result means "fall back".
    rendered = False if args.no_rich else _print_rich_table(rows)
    if not rendered:
        _print_plain_table(rows)
    return 0
def _comparison_baseline_ratios(
    estimates: Sequence[ModelCostEstimate],
    *,
    baseline: ModelCostEstimate | None,
) -> dict[str, Decimal]:
    """Map each priced model to ``baseline cost / model cost`` per call.

    Keys are the models' ``as_langchain()`` strings. Estimates whose call cost
    is missing or not positive are left out. The result is empty when there is
    no baseline or the baseline itself has no positive call cost.
    """
    reference_cost = None if baseline is None else baseline.call_cost_usd
    if reference_cost is None or reference_cost <= 0:
        return {}
    return {
        item.model.as_langchain(): reference_cost / item.call_cost_usd
        for item in estimates
        if item.call_cost_usd is not None and item.call_cost_usd > 0
    }
def _comparison_columns(*, include_baseline: bool) -> list[str]:
    """Return the ordered column keys for comparison tables and CSV output.

    ``calls_per_baseline`` is placed just before the trailing ``context`` and
    ``capabilities`` columns, and only when ``include_baseline`` is true.
    """
    leading = [
        "provider",
        "model",
        "source",
        "release",
        "input_per_1m",
        "output_per_1m",
        "max_input_tokens",
        "max_output_tokens",
        "call_cost",
        "calls_per_usd",
        "calls_per_budget",
    ]
    baseline_columns = ["calls_per_baseline"] if include_baseline else []
    return [*leading, *baseline_columns, "context", "capabilities"]
_comparison_columns(include_baseline=include_baseline) widths = { column: max([len(column), *(len(row[column]) for row in rows)] or [len(column)]) for column in columns } print(" ".join(column.ljust(widths[column]) for column in columns)) print(" ".join("-" * widths[column] for column in columns)) for row in rows: print(" ".join(row[column].ljust(widths[column]) for column in columns)) def _print_comparison_rich_table( comparison: ModelCostComparison, rows: Sequence[dict[str, str]], *, include_baseline: bool, baseline: ModelCostEstimate | None = None, ) -> bool: try: from rich.console import Console from rich import box from rich.table import Table except ImportError: return False cheapest = rows[0] if rows else None metrics: list[object] = [ _rich_metric("rows ", str(len(rows)), style="bold white"), _rich_metric("shape ", f"{comparison.shape.input_tokens:,} in / {comparison.shape.output_tokens:,} out"), _rich_metric("budget ", f"${_format_decimal(comparison.budget_usd)}", style="green"), ] if cheapest is not None: metrics.append(_rich_metric("cheapest ", cheapest["model"], style="bold green")) if baseline is not None: metrics.append(_rich_metric("baseline ", baseline.model.as_langchain(), style="yellow")) summary = _rich_summary_panel( "Model cost comparison", metrics, note="Estimated from catalog pricing for planning; provider usage metadata remains billing truth.", ) table = Table( box=box.ROUNDED, border_style="grey39", header_style="bold bright_blue", row_styles=["", "grey11"], expand=True, show_lines=False, ) table.add_column("Model", ratio=4) table.add_column("Price/1M", justify="right", no_wrap=True) table.add_column("Tokens in/out", justify="right", no_wrap=True) table.add_column("Call", justify="right", no_wrap=True) table.add_column(f"Calls/${_format_compact_decimal(comparison.budget_usd)}", justify="right", no_wrap=True) if include_baseline: table.add_column("Per baseline", justify="right", no_wrap=True) for row in rows: values = [ 
def _run_models_compare(args: argparse.Namespace) -> int:
    """Handle ``models compare`` and its aliases: estimate and print call costs.

    Builds a cost comparison from the catalog using the CLI filters, optionally
    relates every model to ``args.baseline``, and prints the result as JSON,
    CSV, a Rich table, or a plain table when Rich is disabled or unavailable.
    Comparison notes and a missing-baseline notice go to stderr as warnings.

    Returns:
        Process exit code ``0``.
    """
    providers = _parse_providers([*args.provider, *args.providers])
    log_event(
        logger,
        "cli.models.compare",
        source=args.source,
        providers=providers,
        input_tokens=args.input_tokens,
        output_tokens=args.output_tokens,
    )
    comparison = compare_model_catalog(
        providers=providers,
        source=args.source,
        include_non_chat=args.include_non_chat,
        capabilities=_capability_filters(args),
        min_context_tokens=args.min_context,
        min_output_tokens=args.min_output_tokens,
        max_input_cost_per_1m=args.max_input_cost_per_1m,
        max_output_cost_per_1m=args.max_output_cost_per_1m,
        released_after=args.released_after,
        released_before=args.released_before,
        input_tokens=args.input_tokens,
        output_tokens=args.output_tokens,
        budget_usd=args.budget_usd,
        per_provider=args.per_provider,
        sort_by=args.sort,
        limit=args.limit,
        strict=args.strict,
    )
    baseline_estimate = _comparison_baseline_estimate(args, providers=providers)
    baseline_ratios = _comparison_baseline_ratios(
        comparison.estimates,
        baseline=baseline_estimate,
    )
    include_baseline = bool(args.baseline)
    rows = _comparison_table_rows(
        comparison.estimates,
        style=args.style,
        baseline_ratios=baseline_ratios,
    )
    if include_baseline:
        # _comparison_table_rows only adds "calls_per_baseline" when ratios
        # exist. If the baseline was requested but could not be priced, the
        # table printers would raise KeyError on the missing column, so make
        # sure every row carries it (empty) whenever the column is shown.
        for row in rows:
            row.setdefault("calls_per_baseline", "")

    for note in comparison.notes:
        print(f"warning: {note}", file=sys.stderr)
    if args.baseline and baseline_estimate is None:
        print(f"warning: baseline model not found or missing pricing: {args.baseline}", file=sys.stderr)

    if args.format == "json":
        payload = {
            "comparison": comparison.model_dump(mode="json"),
            "equivalents": [
                item.model_dump(mode="json")
                for item in _comparison_equivalents(
                    comparison.estimates,
                    baseline=baseline_estimate,
                )
            ]
            if args.baseline
            else [],
        }
        print(json.dumps(payload, indent=2, sort_keys=True))
    elif args.format == "csv":
        print(_render_comparison_csv(rows, include_baseline=include_baseline), end="")
    elif args.no_rich or not _print_comparison_rich_table(
        comparison,
        rows,
        include_baseline=include_baseline,
        baseline=baseline_estimate,
    ):
        _print_comparison_plain_table(rows, include_baseline=include_baseline)
    return 0
def _render_suite_csv(rows: Sequence[dict[str, str]]) -> str:
    """Serialise suite rows to CSV text, header line first."""
    header = ["key", "provider", "role", "model", "capabilities"]
    buffer = StringIO()
    suite_writer = csv.DictWriter(buffer, fieldnames=header)
    suite_writer.writeheader()
    for row in rows:
        suite_writer.writerow(row)
    return buffer.getvalue()
def _print_profile_resolution_table(payload: dict[str, object]) -> None:
    """Print selected resolution fields as aligned ``key value`` lines.

    Fields are printed in a fixed order; a missing or falsy value is shown as
    an empty string. Keys are left-justified to the longest field name.
    """
    field_names = (
        "profile_id",
        "model",
        "provider",
        "litellm_model",
        "pricing_source",
        "input_cost_per_token",
        "output_cost_per_token",
        "max_input_tokens",
        "reasoning",
        "cache_namespace",
        "cache_key",
    )
    # Stringify everything before printing so output is all-or-nothing.
    entries = [(name, str(payload.get(name) or "")) for name in field_names]
    pad = max(len(name) for name, _ in entries)
    for name, text in entries:
        print(f"{name.ljust(pad)} {text}")
def _format_percent(value: float | None) -> str:
    """Format a fraction as a percentage with two decimals; ``None`` gives ``""``."""
    if value is not None:
        return "{:.2f}%".format(value * 100)
    return ""
def _lcb_model_rows(models: Sequence[LiveCodeBenchProModel]) -> list[dict[str, str]]:
    """Convert leaderboard models into string-valued rows for table/CSV output.

    Falsy rating, organization, status and license values become empty
    strings; ``events`` is the number of rating events as text.
    """
    return [
        {
            "rating": str(entry.rating or ""),
            "model": entry.label,
            "name": entry.name,
            "provider": entry.provider,
            "organization": entry.organization or "",
            "status": entry.status or "",
            "license": entry.model_license or "",
            "events": str(len(entry.rating_events)),
        }
        for entry in models
    ]
def _lcb_submission_detail_rows(detail: LiveCodeBenchProSubmissionDetail) -> list[dict[str, str]]:
    """Summarise one submission as ``key``/``value`` rows for table output.

    Args:
        detail: Submission detail exposing ``problem_id``, ``platform``,
            ``model_name``, ``model_provider``, ``verdict``, ``status`` and
            ``code``.

    Returns:
        One row per field, with falsy values shown as empty strings. The final
        ``code_lines`` row holds the number of source lines, or ``""`` when
        there is no code.
    """
    code = detail.code or ""
    if code:
        # A trailing newline terminates the last line rather than starting a
        # new one, so only add one for an unterminated final line.
        code_lines = code.count("\n") + (0 if code.endswith("\n") else 1)
    else:
        code_lines = 0
    return [
        {"key": "problem_id", "value": detail.problem_id or ""},
        {"key": "platform", "value": detail.platform or ""},
        {"key": "model_name", "value": detail.model_name or ""},
        {"key": "model_provider", "value": detail.model_provider or ""},
        {"key": "verdict", "value": detail.verdict or ""},
        {"key": "status", "value": detail.status or ""},
        {"key": "code_lines", "value": str(code_lines or "")},
    ]
def _run_benchmarks_lcb_models(args: argparse.Namespace) -> int:
    """Handle ``benchmarks lcb-pro models``: list leaderboard models.

    Fetches models through the LiveCodeBench Pro client using the CLI filters
    and prints them as JSON, CSV, a Rich table, or a plain table when Rich is
    disabled or unavailable.

    Returns:
        Process exit code ``0``.
    """
    client = _lcb_client(args)
    providers = _parse_providers([*args.provider, *args.providers])
    organizations = _parse_providers([*args.organization, *args.organizations])
    # "all" means no status filter is passed to the client.
    status = args.status if args.status != "all" else None
    log_event(logger, "cli.benchmarks.lcb_pro.models", status=status, providers=providers, limit=args.limit)
    models = client.list_models(
        status=status,
        providers=providers,
        organizations=organizations,
        query=args.query,
        sort_by=args.sort,
        descending=not args.ascending,
        limit=args.limit,
    )
    rows = _lcb_model_rows(models)
    columns = ["rating", "model", "name", "provider", "organization", "status", "license", "events"]

    if args.format == "json":
        serialised = [entry.model_dump(mode="json") for entry in models]
        print(json.dumps(serialised, indent=2, sort_keys=True))
        return 0
    if args.format == "csv":
        print(_render_dict_csv(rows, columns), end="")
        return 0

    rich_columns = [
        ("rating", "Rating"),
        ("model", "Model"),
        ("provider", "Provider"),
        ("organization", "Org"),
        ("status", "Status"),
        ("license", "License"),
    ]
    # Rich is attempted only when not disabled; a False result means "fall back".
    rendered = False
    if not args.no_rich:
        rendered = _print_simple_rich_table(
            "LiveCodeBench Pro models",
            rows,
            rich_columns,
            note="Leaderboard rows from an undocumented public frontend API.",
        )
    if not rendered:
        _print_simple_plain_table(rows, columns)
    return 0
reported by the leaderboard backend for this difficulty slice.", ): _print_simple_plain_table(rows, columns) return 0 def _run_benchmarks_lcb_submissions(args: argparse.Namespace) -> int: client = _lcb_client(args) log_event( logger, "cli.benchmarks.lcb_pro.submissions", model_name=args.model_name, model_provider=args.model_provider, difficulty=args.difficulty, ) result = client.get_submissions( model_name=args.model_name, model_provider=args.model_provider, difficulty=args.difficulty, ) limited = _limit_lcb_submissions(result, limit=args.limit) rows = _lcb_submission_rows(limited, limit=args.limit) columns = ["contest", "problem", "verdict", "status", "accepted", "submission_id", "problem_link"] if args.format == "json": print(json.dumps(limited.model_dump(mode="json"), indent=2, sort_keys=True)) elif args.format == "csv": print(_render_dict_csv(rows, columns), end="") elif args.no_rich or not _print_simple_rich_table( f"LiveCodeBench Pro submissions: {args.model_name}", rows, [ ("contest", "Contest"), ("problem", "Problem"), ("verdict", "Verdict"), ("status", "Status"), ("accepted", "AC"), ("submission_id", "Submission"), ], note=f"{limited.accepted_count}/{limited.problem_count} listed problems accepted.", ): _print_simple_plain_table(rows, columns) return 0 def _run_benchmarks_lcb_submission(args: argparse.Namespace) -> int: client = _lcb_client(args) log_event(logger, "cli.benchmarks.lcb_pro.submission", submission_id=args.submission_id) detail = client.get_submission(args.submission_id) if args.code_only: print(detail.code or "", end="" if detail.code and detail.code.endswith("\n") else "\n") return 0 if args.format == "json": print(json.dumps(detail.model_dump(mode="json"), indent=2, sort_keys=True)) else: rows = _lcb_submission_detail_rows(detail) _print_simple_plain_table(rows, ["key", "value"]) return 0
def main(argv: Sequence[str] | None = None) -> int:
    """Run the ``ooai-llm`` command-line interface.

    When no arguments are supplied (an empty ``argv``, or ``argv`` left as
    ``None`` while ``sys.argv`` holds only the program name) the help text is
    printed and ``0`` is returned. Otherwise the arguments are parsed,
    logging is configured if either logging option was given, and the
    matching command handler is invoked.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        The handler's exit status, or ``1`` when dispatch raised an
        ``Exception`` (the message is written to stderr).
    """
    parser = _build_parser()

    if argv is None:
        nothing_requested = len(sys.argv) == 1
    else:
        nothing_requested = len(argv) == 0
    if nothing_requested:
        parser.print_help()
        return 0

    args = parser.parse_args(argv)
    if not (args.log_level is None and args.log_mode is None):
        configure_logging(level=args.log_level, mode=args.log_mode)

    try:
        command = args.command
        if command == "models":
            models_command = args.models_command
            if models_command == "update":
                return _run_models_update(args)
            if models_command == "list":
                return _run_models_list(args)
            # compare, cheapest/cheap and coding/code all share one handler.
            if models_command in {"compare", "cheapest", "cheap", "coding", "code"}:
                return _run_models_compare(args)
            if models_command == "suite":
                return _run_models_suite(args)
        elif command == "recipes":
            return _run_recipes(args)
        elif command in {"tui", "interactive"}:
            return _run_tui(args)
        elif command == "profiles":
            profiles_command = args.profiles_command
            if profiles_command == "validate":
                return _run_profiles_validate(args)
            if profiles_command == "render":
                return _run_profiles_render(args)
            if profiles_command == "resolve":
                return _run_profiles_resolve(args)
        elif command == "benchmarks" and args.benchmarks_command == "lcb-pro":
            lcb_pro_command = args.lcb_pro_command
            if lcb_pro_command == "summary":
                return _run_benchmarks_lcb_summary(args)
            if lcb_pro_command == "models":
                return _run_benchmarks_lcb_models(args)
            if lcb_pro_command == "difficulty":
                return _run_benchmarks_lcb_difficulty(args)
            if lcb_pro_command == "submissions":
                return _run_benchmarks_lcb_submissions(args)
            if lcb_pro_command == "submission":
                return _run_benchmarks_lcb_submission(args)
    except Exception as exc:
        log_event(logger, "cli.error", level="error", error_type=type(exc).__name__)
        print(f"error: {exc}", file=sys.stderr)
        return 1

    # Nothing matched. argparse's ``error`` normally terminates the process,
    # so the return below only matters if that behavior is overridden.
    parser.error("Unknown command.")
    return 2
# Script entry point: exit the process with the status returned by main().
if __name__ == "__main__":  # pragma: no cover
    sys.exit(main())