"""
Optimizer wrapper for Tangent SSEOptimizer.
"""
import cppyy
from typing import List, Optional
from dataclasses import dataclass
from .core import init
[docs]
@dataclass
class OptimizationResult:
"""Result of an optimization run.
Attributes:
iterations: Number of optimizer iterations performed.
initial_error: Total squared error before optimization.
final_error: Total squared error after optimization.
converged: Whether the optimizer converged to a minimum.
error_decreased: Whether the final error is less than the initial error.
error_history: List of total error values at each iteration.
"""
iterations: int
initial_error: float
final_error: float
converged: bool
error_decreased: bool
error_history: List[float]
[docs]
class Optimizer:
"""
Nonlinear least squares optimizer using Tangent's SSEOptimizer.
This class wraps SSEOptimizer and handles the complex template
instantiation required by cppyy.
Example:
>>> opt = Optimizer(
... variables=["SimpleScalar"],
... error_terms=["DifferenceError"]
... )
>>> x = SimpleScalar(10.0)
>>> y = SimpleScalar(0.0)
>>> k1 = opt.add_variable(x)
>>> k2 = opt.add_variable(y)
>>> opt.add_error_term("DifferenceError", k1, k2)
>>> result = opt.optimize()
"""
# Class-level counter for unique type names
_instance_counter = 0
[docs]
def __init__(self,
variables: List[str],
error_terms: List[str],
huber_delta: float = 1000.0):
"""
Create an optimizer.
Args:
variables: List of variable type names (e.g., ["SimpleScalar", "SE3"])
error_terms: List of error term class names
huber_delta: Huber loss function delta parameter
"""
init()
self._var_types = variables
self._err_types = error_terms
# Get unique instance ID
Optimizer._instance_counter += 1
self._instance_id = Optimizer._instance_counter
# Build the fully-qualified type strings
var_group = ", ".join(f"Tangent::{v}" for v in variables)
err_group = ", ".join(error_terms)
# Store for later use
self._var_group_str = var_group
self._err_group_str = err_group
# Build the complex nested optimizer type
optimizer_type_str = self._build_optimizer_type_string(var_group, err_group)
solver_type_str = self._build_solver_type_string(var_group, err_group)
# Use cppyy to instantiate the optimizer type and create an instance
try:
# Define a factory function in C++ to create the optimizer
# The solver needs a loss function reference, so we use a static loss function
factory_code = f"""
namespace TangentPy {{
using OptType_{self._instance_id} = {optimizer_type_str};
using SolverType_{self._instance_id} = {solver_type_str};
// Static loss function that persists for the lifetime of the optimizer
static Tangent::HuberLossFunction<double> lossFunction_{self._instance_id}({huber_delta});
OptType_{self._instance_id}* create_optimizer_{self._instance_id}() {{
SolverType_{self._instance_id} solver(lossFunction_{self._instance_id});
return new OptType_{self._instance_id}(std::move(solver));
}}
}}
"""
cppyy.cppdef(factory_code)
# Create the optimizer instance
factory = getattr(cppyy.gbl.TangentPy, f"create_optimizer_{self._instance_id}")
self._cpp_opt = factory()
except Exception as e:
raise RuntimeError(f"Failed to instantiate optimizer: {e}\n"
f"Type string: {optimizer_type_str}")
def _build_solver_type_string(self, var_group: str, err_group: str) -> str:
"""Build the PSDSchurSolver template type string."""
return f"""Tangent::PSDSchurSolver<
Tangent::Scalar<double>,
Tangent::LossFunction<Tangent::HuberLossFunction<double>>,
Tangent::ErrorTermGroup<{err_group}>,
Tangent::VariableGroup<{var_group}>,
Tangent::VariableGroup<>
>"""
def _build_optimizer_type_string(self, var_group: str, err_group: str) -> str:
"""Build the full SSEOptimizer template type string."""
solver_type = self._build_solver_type_string(var_group, err_group)
return f"""Tangent::SSEOptimizer<
Tangent::Scalar<double>,
{solver_type},
Tangent::GaussianPrior<
Tangent::Scalar<double>,
Tangent::VariableGroup<{var_group}>
>,
Tangent::Marginalizer<
Tangent::Scalar<double>,
Tangent::VariableGroup<{var_group}>,
Tangent::ErrorTermGroup<{err_group}>
>,
Tangent::VariableGroup<{var_group}>,
Tangent::ErrorTermGroup<{err_group}>
>"""
[docs]
def add_variable(self, variable, prior_info=None):
"""
Add a variable to the optimization problem.
Args:
variable: A Tangent variable instance (SimpleScalar, SE3, etc.)
prior_info: Optional prior information matrix (numpy array or scalar)
Returns:
Variable key for referencing this variable
"""
key = self._cpp_opt.addVariable(variable)
if prior_info is not None:
self.set_prior(key, prior_info)
return key
[docs]
def set_prior(self, key, information):
"""
Set the prior information matrix for a variable.
Args:
key: Variable key from add_variable()
information: Information matrix (scalar for 1D, or numpy array)
"""
import numpy as np
# Convert scalar to Eigen matrix if needed
if np.isscalar(information):
# Get dimension from variable type
var = self._cpp_opt.getVariable(key)
dim = var.dimension
# Create Eigen matrix
info_matrix = cppyy.gbl.Eigen.Matrix["double", dim, dim]()
info_matrix.setIdentity()
# Scale by the scalar value
for i in range(dim):
for j in range(dim):
if i == j:
info_matrix[i, j] = float(information)
else:
info_matrix[i, j] = 0.0
self._cpp_opt.setVariablePrior(key, info_matrix)
else:
# Assume it's already an Eigen matrix or numpy array
self._cpp_opt.setVariablePrior(key, information)
[docs]
def add_error_term(self, error_type: str, *args):
"""
Add an error term to the optimization problem.
Args:
error_type: Name of the error term class
*args: Arguments to pass to the error term constructor
(typically variable keys and measurement data)
Returns:
Error term key
"""
# Get the error term class from cppyy
err_class = getattr(cppyy.gbl, error_type)
# Construct the error term with provided arguments
err = err_class(*args)
# Add to optimizer
return self._cpp_opt.addErrorTerm(err)
[docs]
def get_variable(self, key):
"""
Get a variable by its key.
Args:
key: Variable key from add_variable()
Returns:
The variable instance with current optimized value
"""
return self._cpp_opt.getVariable(key)
@property
def settings(self):
"""Access optimizer settings."""
return self._cpp_opt.settings
[docs]
def optimize(self, max_iterations: Optional[int] = None) -> OptimizationResult:
"""
Run the optimization.
Args:
max_iterations: Override the default maximum iterations
Returns:
OptimizationResult with convergence information
"""
if max_iterations is not None:
self._cpp_opt.settings.maximumIterations = max_iterations
cpp_result = self._cpp_opt.optimize()
return OptimizationResult(
iterations=cpp_result.iterations,
initial_error=cpp_result.initialError,
final_error=cpp_result.finalError,
converged=cpp_result.converged,
error_decreased=cpp_result.errorDecreased,
error_history=list(cpp_result.errorHistory)
)
[docs]
def variable_count(self, var_type: str) -> int:
"""Get the number of variables of a given type."""
var_class = getattr(cppyy.gbl.Tangent, var_type)
return self._cpp_opt.variableCount[var_class]()
[docs]
def error_term_count(self, error_type: str) -> int:
"""Get the number of error terms of a given type."""
err_class = getattr(cppyy.gbl, error_type)
return self._cpp_opt.errorTermCount[err_class]()