"""
pasted._optimizer
=================
Objective-based structure optimization via Markov Chain Monte Carlo.
Three methods are provided:
``"annealing"`` — Simulated Annealing
Exponential temperature cooling from *T_start* to *T_end* over
*max_steps*. Accepts moves via the Metropolis criterion.
``"basin_hopping"`` — Basin-Hopping
Constant temperature (*T_start*) with 3× relax cycles per step for
stronger local minimization before each Metropolis test.
``"parallel_tempering"`` — Replica-Exchange MC
*n_replicas* independent chains at geometrically spaced temperatures
between *T_end* (coldest) and *T_start* (hottest). Adjacent replicas
attempt a state exchange every *pt_swap_interval* steps, tunneling
good states from hot replicas to cold ones. At equal wall time, PT
typically outperforms SA and BH on rugged objective landscapes.
Move types (chosen with equal probability each step)
-----------------------------------------------------
Fragment coordinate move
Atoms whose local Q6 exceeds *frag_threshold* are displaced by a
random vector of magnitude ≤ *move_step* Å. If no atom exceeds the
threshold, a single random atom is moved.
Affine coordinate move (when ``allow_affine_moves=True``)
Random stretch/compress along one axis, shear of one axis pair, and
optional per-atom jitter. The center of mass is pinned. Controlled
by ``affine_strength`` (and the optional ``affine_stretch``,
``affine_shear``, ``affine_jitter`` overrides).
Composition move
Parity-preserving element replacement: a random atom is replaced with
an element from *element_pool* whose atomic number has the same parity
(Z mod 2) as the original, preserving the total electron-count parity
and therefore charge/multiplicity validity. When no same-parity
candidate exists, two atoms are replaced simultaneously so that the
combined ΔZ is even (fallback path).
If the initial structure passed to :meth:`StructureOptimizer.run`
contains atoms outside the pool, each foreign atom is replaced by a
parity-compatible pool element via :func:`_sanitize_atoms_to_pool`
before the MC loop begins. This sanitization applies to SA, BH, and PT.
Objective function
------------------
The objective is **maximized**. Pass a weight dict or any callable::
# Dict form: f = sum(w * metric)
objective = {"H_atom": 1.0, "H_spatial": 1.0, "Q6": -2.0}
# 1-arg callable
objective = lambda m: m["H_spatial"] - 2.0 * m["Q6"]
# 2-arg callable with EvalContext (full structure + optimizer state)
def my_obj(m, ctx):
pos = np.array(ctx.positions)
return float(m["H_total"]) - float(np.max(ctx.per_atom_q6))
Use negative weights to penalize a metric.
Public API
----------
:class:`StructureOptimizer`
Main entry point. Build, then call ``.run()`` to get an
:class:`OptimizationResult`.
:class:`OptimizationResult`
List-compatible container of per-restart structures, sorted
best-first. ``result.best`` is the highest-scoring structure.
:class:`EvalContext`
Frozen snapshot of the current candidate structure and live optimizer
state, passed as the second argument to 2-parameter objective callables.
:func:`parse_objective_spec`
Convert a list of ``"METRIC:WEIGHT"`` strings to an objective dict.
"""
from __future__ import annotations
import inspect
import itertools
import math
import random
import sys
import warnings
from collections.abc import Callable, Iterator
from dataclasses import dataclass, field
import numpy as np
from ._atoms import (
ALL_METRICS,
ATOMIC_NUMBERS,
_cov_radius_ang,
default_element_pool,
parse_element_spec,
validate_charge_mult,
)
from ._ext import HAS_RELAX
from ._ext import relax_positions as _cpp_relax_positions
from ._generator import Structure, StructureGenerator
from ._io import _fmt
from ._metrics import compute_all_metrics, compute_steinhardt_per_atom
from ._placement import Vec3, _affine_move, place_gas, relax_positions
# ---------------------------------------------------------------------------
# EvalContext dataclass
# ---------------------------------------------------------------------------
[docs]
@dataclass(frozen=True, slots=True)
class EvalContext:
"""Full evaluation context passed as the second argument to a 2-parameter
objective callable.
``EvalContext`` consolidates every piece of information available at the
moment the objective function is called: the current structure (atoms,
positions, charge/mult), all pre-computed disorder metrics, and the live
optimizer state (step number, temperature, best score seen so far, etc.).
This design allows user-supplied objective functions to call external
quantum-chemistry or machine-learning potential tools without depending on
PASTED internals, and to implement adaptive or state-aware objectives.
Attributes — Structure
----------------------
atoms:
Element symbols for the current candidate structure, one per atom
(e.g. ``("C", "H", "O", ...)``).
positions:
Cartesian coordinates in Å, one ``(x, y, z)`` tuple per atom.
charge:
Total system charge.
mult:
Spin multiplicity 2S+1.
n_atoms:
Number of atoms (``len(atoms)``).
metrics:
Computed disorder metrics dict — same reference as the ``m`` argument
in the objective callable. Treat as read-only.
Attributes — Optimizer Runtime State
-------------------------------------
step:
Current MC step index, 0-based. Ranges from 0 to ``max_steps - 1``.
Useful for progress-dependent or curriculum objectives.
max_steps:
Total number of MC steps per restart.
temperature:
Current temperature at this step. For ``"annealing"`` this decreases
exponentially; for ``"basin_hopping"`` it is fixed at ``T_start``; for
``"parallel_tempering"`` it is this replica's fixed temperature.
f_current:
Objective value of the most recently *accepted* state. Use this to
compute improvement margins or relative scores.
best_f:
Best objective value seen across all steps so far in this restart.
restart_idx:
0-based index of the current restart.
n_restarts:
Total number of restarts configured.
per_atom_q6:
Per-atom Steinhardt Q6 values from the *previous accepted* step
(shape ``[n_atoms]``, dtype ``float64``). Already computed by the
optimizer loop; available at zero additional cost.
Treat the array as read-only — it is a reference, not a copy.
Attributes — Parallel Tempering (``None`` for other methods)
-------------------------------------------------------------
replica_idx:
0-based index of the current replica (0 = coldest, ``n_replicas - 1``
= hottest). ``None`` when ``method != "parallel_tempering"``.
replica_temperature:
This replica's fixed temperature. ``None`` when
``method != "parallel_tempering"``.
n_replicas:
Total number of replicas. ``None`` when
``method != "parallel_tempering"``.
Attributes — Optimizer Configuration
-------------------------------------
element_pool:
Tuple of element symbols available for composition moves.
cutoff:
Distance cutoff in Å used for Steinhardt and graph metrics.
method:
Optimization method: ``"annealing"``, ``"basin_hopping"``, or
``"parallel_tempering"``.
T_start:
Starting temperature.
T_end:
Ending temperature (for ``"annealing"``).
seed:
Random seed, or ``None`` if unseeded.
"""
# ── Structure ─────────────────────────────────────────────────────────
atoms: tuple[str, ...]
positions: tuple[tuple[float, float, float], ...]
charge: int
mult: int
n_atoms: int
metrics: dict[str, float]
# ── Optimizer runtime state ────────────────────────────────────────────
step: int
max_steps: int
temperature: float
f_current: float
best_f: float
restart_idx: int
n_restarts: int
per_atom_q6: np.ndarray # shape [n_atoms], dtype float64; treat as read-only
# ── Parallel Tempering (None for non-PT methods) ───────────────────────
replica_idx: int | None
replica_temperature: float | None
n_replicas: int | None
# ── Optimizer configuration ────────────────────────────────────────────
element_pool: tuple[str, ...]
cutoff: float
method: str
T_start: float
T_end: float
seed: int | None
# ── Convenience methods ────────────────────────────────────────────────
[docs]
def to_xyz(self, comment: str = "") -> str:
"""Return a well-formed XYZ-format string for the current structure.
The string is suitable for writing directly to a ``.xyz`` file and
passing to external tools such as xTB, ORCA, or any ASE calculator.
Parameters
----------
comment:
Optional comment placed on the second line of the XYZ block.
When empty, a default comment containing charge and multiplicity
is generated automatically.
Returns
-------
str
Multi-line XYZ string (no trailing newline).
"""
if not comment:
comment = f"charge={self.charge} mult={self.mult}"
lines = [str(self.n_atoms), comment]
for sym, (x, y, z) in zip(self.atoms, self.positions, strict=True):
lines.append(f"{sym:4s} {x:14.8f} {y:14.8f} {z:14.8f}")
return "\n".join(lines)
@property
def progress(self) -> float:
"""Fractional progress of the current restart: ``step / max_steps``.
Returns a float in ``[0.0, 1.0)`` useful for curriculum-style
objectives that change behavior over the course of a run.
"""
return self.step / max(self.max_steps, 1)
# ---------------------------------------------------------------------------
# Public type alias
# ---------------------------------------------------------------------------
ObjectiveType = (
dict[str, float]
| Callable[[dict[str, float]], float]
| Callable[[dict[str, float], "EvalContext"], float]
)
# ---------------------------------------------------------------------------
# OptimizationResult
# ---------------------------------------------------------------------------
[docs]
@dataclass
class OptimizationResult:
"""Return value of :meth:`StructureOptimizer.run`.
Wraps all per-restart results and exposes the best structure as a
first-class attribute. Behaves like a ``list[Structure]`` — indexing,
iteration, ``len``, and ``bool`` all work — so callers that only want
the best result can access it without changing existing code::
result = opt.run()
best = result.best # highest-scoring Structure
best = result[0] # same — index 0 is always the best
for s in result: # iterate all restarts, best first
print(s.metrics["H_total"])
Attributes
----------
all_structures:
All structures produced by each restart, sorted by objective value
(highest first). ``all_structures[0]`` is always the best.
objective_scores:
Scalar objective values corresponding to each entry in
``all_structures``.
n_restarts_attempted:
Number of restarts that were actually run (may be less than
``n_restarts`` when initial-structure generation fails).
method:
The optimization method used (``"annealing"``,
``"basin_hopping"``, or ``"parallel_tempering"``).
Notes
-----
**Parallel Tempering result count.** For ``method="parallel_tempering"``,
each restart contributes one entry for the global best *plus* one entry
for each replica whose final objective value differs from the global best.
The total ``len(result)`` therefore satisfies::
n_restarts <= len(result) <= n_restarts * (n_replicas + 1)
For ``method="annealing"`` and ``method="basin_hopping"``, each restart
contributes exactly one entry, so ``len(result) == n_restarts_attempted``.
Examples
--------
Single-structure usage (backward-compatible)::
result = opt.run()
result.best.to_xyz() # best structure
result[0].to_xyz() # same
All-restarts usage::
result = opt.run()
print(result.summary())
for rank, s in enumerate(result, 1):
print(f"rank {rank}: H_total={s.metrics['H_total']:.3f}")
"""
all_structures: list[Structure] = field(default_factory=list)
objective_scores: list[float] = field(default_factory=list)
n_restarts_attempted: int = 0
method: str = "annealing"
# ------------------------------------------------------------------ #
# list-compatible interface (best-first order) #
# ------------------------------------------------------------------ #
@property
def best(self) -> Structure:
"""The structure with the highest objective value."""
if not self.all_structures:
raise RuntimeError("OptimizationResult is empty — all restarts failed.")
return self.all_structures[0]
[docs]
def __len__(self) -> int:
return len(self.all_structures)
[docs]
def __iter__(self) -> Iterator[Structure]:
return iter(self.all_structures)
def __getitem__(self, index: int | slice) -> Structure | list[Structure]:
if isinstance(index, slice):
return self.all_structures[index]
return self.all_structures[index]
def __bool__(self) -> bool:
return bool(self.all_structures)
[docs]
def __repr__(self) -> str:
if not self.all_structures:
return f"OptimizationResult(empty, method={self.method!r})"
best_f = self.objective_scores[0] if self.objective_scores else float("nan")
return (
f"OptimizationResult("
f"restarts={self.n_restarts_attempted}, "
f"best_f={best_f:.4f}, "
f"method={self.method!r})"
)
# ------------------------------------------------------------------ #
# Metadata helpers #
# ------------------------------------------------------------------ #
[docs]
def summary(self) -> str:
"""Return a human-readable one-line summary of the optimization run.
Returns
-------
str
E.g. ``"restarts=5 best_f=1.2294 worst_f=0.7823 method='annealing'"``.
"""
if not self.objective_scores:
return f"restarts={self.n_restarts_attempted} no results method={self.method!r}"
return (
f"restarts={self.n_restarts_attempted}"
f" best_f={self.objective_scores[0]:.4f}"
f" worst_f={self.objective_scores[-1]:.4f}"
f" method={self.method!r}"
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _pool_can_satisfy_parity(pool: list[str], n_atoms: int, charge: int, mult: int) -> bool:
"""Return ``True`` if *any* composition of *n_atoms* from *pool* can pass
:func:`~pasted._atoms.validate_charge_mult`.
The parity rule (derived from :func:`~pasted._atoms.validate_charge_mult`)
is::
n_electrons = sum(Z_i) - charge > 0
n_electrons % 2 == (mult - 1) % 2
The second condition simplifies to::
sum(Z_i) % 2 == (charge + mult - 1) % 2 [call this *target*]
The achievable parities of ``sum(Z_i)`` for *n_atoms* atoms drawn from
*pool* depend only on whether the pool contains both even-Z and odd-Z
elements:
* **mixed pool** — any parity is reachable → always satisfiable (modulo
the ``n_electrons > 0`` check below).
* **all-even pool** — ``sum(Z_i)`` is always even → only satisfiable when
``target == 0``.
* **all-odd pool** — ``sum(Z_i) % 2 == n_atoms % 2`` → only satisfiable
when ``n_atoms % 2 == target``.
The ``n_electrons > 0`` check uses the minimum possible sum,
``min(Z) × n_atoms``.
Parameters
----------
pool:
Unique element symbols in the optimizer's element pool.
n_atoms:
Number of atoms in each generated structure.
charge:
Total charge (integer, may be negative).
mult:
Spin multiplicity (positive integer).
Returns
-------
bool
``False`` when *no* composition of *n_atoms* atoms from *pool* can
ever pass the parity check; ``True`` otherwise.
"""
z_values = [ATOMIC_NUMBERS[e] for e in pool]
min_z = min(z_values)
# Condition 1: n_electrons = sum(Z) - charge > 0 for at least one composition.
# The minimum achievable sum is min_z * n_atoms.
if min_z * n_atoms <= charge:
return False
# Condition 2: parity of sum(Z_i) must equal target_parity for some composition.
target_parity = (charge + mult - 1) % 2
has_even = any(z % 2 == 0 for z in z_values)
has_odd = any(z % 2 == 1 for z in z_values)
if has_even and has_odd:
# Mixed pool — can hit any parity by adjusting the odd/even atom count.
return True
if has_even:
# All-even pool: sum is always even.
return target_parity == 0
# All-odd pool: sum parity == n_atoms % 2.
return (n_atoms % 2) == target_parity
[docs]
def parse_objective_spec(specs: list[str]) -> dict[str, float]:
"""Parse ``["METRIC:WEIGHT", ...]`` into a weight dict.
Parameters
----------
specs:
Each string must be of the form ``"METRIC:WEIGHT"``,
e.g. ``["H_atom:1.0", "Q6:-2.0"]``.
Returns
-------
dict[str, float]
Raises
------
ValueError
On malformed strings or unknown metric names.
"""
result: dict[str, float] = {}
for spec in specs:
parts = spec.split(":")
if len(parts) != 2:
raise ValueError(f"Expected 'METRIC:WEIGHT', got {spec!r}")
metric, weight_s = parts
if metric not in ALL_METRICS:
raise ValueError(f"Unknown metric {metric!r}. Valid: {sorted(ALL_METRICS)}")
result[metric] = float(weight_s)
return result
def _eval_objective(
metrics: dict[str, float],
objective: ObjectiveType,
ctx: EvalContext | None = None,
) -> float:
"""Evaluate the scalar objective from a metrics dict.
Dispatches based on the number of *required* positional parameters:
* **1 parameter** ``f(m)`` — legacy; ``m`` is the metrics dict.
* **2 parameters** ``f(m, ctx)`` — extended; ``ctx`` is an
:class:`EvalContext` carrying the full structure and optimizer state.
Parameters
----------
metrics:
Computed disorder metrics.
objective:
Dict, 1-arg callable, or 2-arg callable.
ctx:
Pre-built :class:`EvalContext`. Must be non-None when a 2-arg
callable is supplied; raises :class:`ValueError` otherwise.
"""
if not callable(objective):
return float(sum(w * metrics.get(k, 0.0) for k, w in objective.items()))
try:
sig = inspect.signature(objective)
n_required = sum(
1
for p in sig.parameters.values()
if p.default is inspect.Parameter.empty
and p.kind
not in (
inspect.Parameter.VAR_POSITIONAL,
inspect.Parameter.VAR_KEYWORD,
)
)
except (ValueError, TypeError):
n_required = 1 # fallback: legacy 1-arg
if n_required >= 2:
if ctx is None:
raise ValueError(
"A 2-argument objective callable was supplied but EvalContext "
"was not provided to _eval_objective. This is an internal error."
)
return float(objective(metrics, ctx)) # type: ignore[call-arg]
return float(objective(metrics)) # type: ignore[call-arg]
def _objective_needs_ctx(objective: ObjectiveType) -> bool:
"""Return ``True`` iff *objective* requires a second :class:`EvalContext` argument.
The check is performed once at optimizer construction time (cached on the
instance) so that :func:`inspect.signature` is not called on every MC step.
Parameters
----------
objective:
Dict, 1-arg callable, or 2-arg callable.
Returns
-------
bool
``True`` when the callable has two or more required positional
parameters; ``False`` for dicts and 1-arg callables.
"""
if not callable(objective):
return False
try:
sig = inspect.signature(objective)
n_required = sum(
1
for p in sig.parameters.values()
if p.default is inspect.Parameter.empty
and p.kind
not in (
inspect.Parameter.VAR_POSITIONAL,
inspect.Parameter.VAR_KEYWORD,
)
)
return n_required >= 2
except (ValueError, TypeError):
return False
def _make_ctx(
atoms: list[str],
positions: list[tuple[float, float, float]],
metrics: dict[str, float],
charge: int,
mult: int,
step: int,
max_steps: int,
temperature: float,
f_current: float,
best_f: float,
restart_idx: int,
n_restarts: int,
per_atom_q6: np.ndarray,
element_pool: list[str],
cutoff: float,
method: str,
T_start: float,
T_end: float,
seed: int | None,
replica_idx: int | None = None,
replica_temperature: float | None = None,
n_replicas: int | None = None,
) -> EvalContext:
"""Construct an :class:`EvalContext` from optimizer loop variables.
Parameters
----------
atoms:
Current atom element list.
positions:
Current Cartesian coordinates (list of 3-tuples, Å).
metrics:
Pre-computed disorder metrics for the candidate structure.
charge:
System charge.
mult:
Spin multiplicity.
step:
Current MC step index (0-based).
max_steps:
Total MC steps per restart.
temperature:
Current temperature.
f_current:
Objective value of the last accepted state.
best_f:
Best objective value seen so far in this restart.
restart_idx:
0-based restart index.
n_restarts:
Total number of restarts.
per_atom_q6:
Per-atom Q6 array from the previous accepted step.
element_pool:
Element pool list.
cutoff:
Distance cutoff (Å).
method:
Optimization method name.
T_start:
Starting temperature.
T_end:
Ending temperature.
seed:
Random seed or ``None``.
replica_idx:
PT replica index, or ``None`` for non-PT methods.
replica_temperature:
PT replica temperature, or ``None`` for non-PT methods.
n_replicas:
Total PT replica count, or ``None`` for non-PT methods.
Returns
-------
EvalContext
"""
return EvalContext(
atoms=tuple(atoms),
positions=tuple((float(p[0]), float(p[1]), float(p[2])) for p in positions),
charge=charge,
mult=mult,
n_atoms=len(atoms),
metrics=metrics,
step=step,
max_steps=max_steps,
temperature=temperature,
f_current=f_current,
best_f=best_f,
restart_idx=restart_idx,
n_restarts=n_restarts,
per_atom_q6=per_atom_q6,
element_pool=tuple(element_pool),
cutoff=cutoff,
method=method,
T_start=T_start,
T_end=T_end,
seed=seed,
replica_idx=replica_idx,
replica_temperature=replica_temperature,
n_replicas=n_replicas,
)
def _fragment_move(
positions: list[Vec3],
per_atom_q6: np.ndarray,
frag_threshold: float,
move_step: float,
rng: random.Random,
) -> list[Vec3]:
"""Displace atoms whose local Q6 exceeds *frag_threshold*.
Falls back to moving a single random atom when no atom exceeds the
threshold (structure is already maximally disordered).
"""
candidates = [i for i, q in enumerate(per_atom_q6) if q > frag_threshold]
if not candidates:
candidates = [rng.randrange(len(positions))]
new_pos = list(positions)
for i in candidates:
x, y, z = positions[i]
new_pos[i] = (
x + rng.uniform(-move_step, move_step),
y + rng.uniform(-move_step, move_step),
z + rng.uniform(-move_step, move_step),
)
return new_pos
def _composition_move(
atoms: list[str],
element_pool: list[str],
rng: random.Random,
charge: int = 0,
mult: int = 1,
) -> list[str]:
"""Propose a parity-preserving composition change.
Two move types are attempted in order:
1. **Pool replacement** (up to 20 tries): pick a random atom and replace
it with a *different* element drawn from *element_pool* that has the
same atomic-number parity as the atom being replaced. Because
``ΔZ = Z(new) - Z(old)`` is even, the total electron count parity is
preserved and charge/multiplicity validity is maintained.
2. **Two-atom replacement** (fallback): when Path 1 cannot find a valid
same-parity candidate (e.g. every pool element has the same symbol as
the chosen atom, or the pool contains only one parity class), replace
two atoms simultaneously so the net ``ΔZ_total`` is even.
In the last-resort case (pool with a single parity class *and* ``n == 1``)
an unchecked single replacement is returned; the caller's
``validate_charge_mult`` guard will reject it if the result is invalid.
The ``charge`` and ``mult`` parameters are used only to determine the
required electron-count parity (even or odd).
Parameters
----------
atoms:
Current element list. A copy is returned; the input is not mutated.
element_pool:
Elements available for replacement. Every returned atom is drawn
from this pool (or already present in *atoms* and unchanged).
rng:
Seeded random-number generator.
charge:
System charge (used to infer required parity).
mult:
Spin multiplicity 2S+1 (used to infer required parity).
Returns
-------
list[str]
New atom list with the same length as *atoms*, with at least one
element replaced by a symbol from *element_pool*.
"""
new_atoms = list(atoms)
n = len(new_atoms)
# ── Path 1: replace one atom with an element drawn from element_pool ─────
# To preserve parity (electron-count mod 2), pick a pool element whose
# atomic number has the same parity as the atom being replaced.
# If no same-parity pool element exists, fall through to Path 2.
for _ in range(20):
i = rng.randrange(n)
zi = ATOMIC_NUMBERS[new_atoms[i]]
same_parity = [
e for e in element_pool if ATOMIC_NUMBERS[e] % 2 == zi % 2 and e != new_atoms[i]
]
if same_parity:
new_atoms[i] = rng.choice(same_parity)
return new_atoms
# ── Path 2: parity-preserving two-atom replacement ───────────────────
# All atoms are the same element (swap path found no diverse pair).
# Replace atom i with X and atom j with Y such that
# ΔZ_total = (Z(X) - Z(atoms[i])) + (Z(Y) - Z(atoms[j]))
# is even, preserving the electron-count parity required by charge/mult.
#
# Required parity of total electrons:
# n_electrons = sum(Z) - charge
# For mult=1 (singlet), n_electrons must be even → sum(Z) parity = charge parity
i = rng.randrange(n)
zi = ATOMIC_NUMBERS[new_atoms[i]]
# Separate pool into elements whose Z has the same parity as zi (ΔZ even)
# vs different parity (ΔZ odd → need a compensating second replacement).
same_parity_pool = [e for e in element_pool if ATOMIC_NUMBERS[e] % 2 == zi % 2]
diff_parity_pool = [e for e in element_pool if ATOMIC_NUMBERS[e] % 2 != zi % 2]
if same_parity_pool:
# Replace one atom with same-parity element: ΔZ is even → parity kept.
new_atoms[i] = rng.choice(same_parity_pool)
return new_atoms
if diff_parity_pool and n >= 2:
# Replace two atoms with opposite-parity elements so ΔZ_total is even.
j = rng.choice([k for k in range(n) if k != i])
# Both from diff_parity_pool give ΔZ each odd → ΔZ_total even. ✓
# Both from diff_parity_pool gives ΔZ each odd → ΔZ_total even. ✓
new_atoms[i] = rng.choice(diff_parity_pool)
new_atoms[j] = rng.choice(diff_parity_pool)
return new_atoms
# ── Last-resort fallback ──────────────────────────────────────────────
# Pool has only one parity class and n == 1, or pool is empty.
# Replace one atom; caller's validate_charge_mult will reject if invalid.
new_atoms[i] = rng.choice(element_pool)
return new_atoms
def _sanitize_atoms_to_pool(
atoms: list[str],
element_pool: list[str],
rng: random.Random,
) -> list[str]:
"""Replace every atom not in *element_pool* with a pool element.
Called at the start of :meth:`StructureOptimizer._run_one` when the
caller supplies an *initial* structure whose composition is drawn from a
different element set than *element_pool*. Without this step the
Metropolis loop could spend many iterations slowly replacing foreign
atoms, or—if the objective value is higher with foreign atoms—retain them
permanently.
Replacements are chosen to preserve the electron-count parity of each
replaced position: a non-pool atom is replaced by a pool element whose
atomic number has the same Z mod 2. This keeps the total electron count
parity unchanged so the resulting structure passes ``validate_charge_mult``
with the same charge/mult that the caller supplied.
If the pool contains no element with the required parity (rare, e.g. a
pool of only odd-Z atoms and an even-Z replacement needed) any pool
element is used; ``validate_charge_mult`` in the Metropolis loop will
reject the step in that case.
Parameters
----------
atoms:
Input element list. Not mutated.
element_pool:
Elements allowed in the final structure.
rng:
Seeded random-number generator.
Returns
-------
list[str]
Copy of *atoms* with every symbol not in *element_pool* replaced by
a parity-compatible symbol from *element_pool*.
"""
pool_set = set(element_pool)
new_atoms = list(atoms)
for idx, sym in enumerate(new_atoms):
if sym not in pool_set:
zi = ATOMIC_NUMBERS[sym]
same_parity = [e for e in element_pool if ATOMIC_NUMBERS[e] % 2 == zi % 2]
new_atoms[idx] = rng.choice(same_parity if same_parity else element_pool)
return new_atoms
# ---------------------------------------------------------------------------
# StructureOptimizer
# ---------------------------------------------------------------------------
[docs]
class StructureOptimizer:
"""Optimize a single structure to maximize a disorder objective.
Parameters
----------
n_atoms:
Number of atoms.
charge:
Total system charge.
mult:
Spin multiplicity 2S+1.
objective:
Weight dict ``{"METRIC": weight, ...}`` or any callable.
The optimizer **maximizes** the returned scalar.
Two calling conventions are supported:
* **1-argument** ``f(m)`` — ``m`` is a ``dict[str, float]`` of
disorder metrics. Fully backward-compatible.
* **2-argument** ``f(m, ctx)`` — ``m`` is the same metrics dict;
``ctx`` is an :class:`EvalContext` that exposes:
- Structure: ``ctx.atoms``, ``ctx.positions``, ``ctx.charge``,
``ctx.mult``, ``ctx.n_atoms``, ``ctx.to_xyz()``
- Optimizer state: ``ctx.step``, ``ctx.temperature``,
``ctx.f_current``, ``ctx.best_f``, ``ctx.progress``,
``ctx.per_atom_q6``, ``ctx.restart_idx``
- Configuration: ``ctx.element_pool``, ``ctx.cutoff``,
``ctx.method``, ``ctx.T_start``, ``ctx.T_end``, ``ctx.seed``
- PT-only (``None`` for other methods): ``ctx.replica_idx``,
``ctx.replica_temperature``, ``ctx.n_replicas``
Dispatch is based on the number of *required* positional parameters
via :func:`inspect.signature`. A callable with a default for the
second argument (``lambda m, ctx=None:``) is treated as 1-argument.
:class:`EvalContext` construction is skipped entirely for 1-argument
and dict objectives — no overhead for existing code.
elements:
Element pool — spec string (``"6,7,8"``), list of symbols, or
``None`` for all Z = 1–106. When a list is given, duplicate
symbols are silently removed while preserving insertion order
(e.g. ``['C', 'H', 'H', 'H', 'H']`` is treated as ``['C', 'H']``).
To bias sampling toward a particular element use
``element_fractions`` in :class:`StructureGenerator` instead.
method:
``"annealing"`` (default), ``"basin_hopping"``, or
``"parallel_tempering"``.
max_steps:
Number of MC steps per restart (or per replica per restart for
``"parallel_tempering"``; default: 5000).
T_start:
Initial temperature (default: 1.0). For
``"parallel_tempering"`` this is the *highest* replica
temperature.
T_end:
Final temperature for SA (default: 0.01). For
``"parallel_tempering"`` this is the *lowest* replica
temperature (the coldest, most selective replica).
BH uses *T_start* throughout.
n_replicas:
Number of temperature replicas for ``"parallel_tempering"``
(default: 4). Ignored for other methods. Temperatures are
spaced geometrically between *T_end* and *T_start*.
pt_swap_interval:
Attempt a replica-exchange swap every this many MC steps
(default: 10). Ignored for other methods.
allow_displacements:
When ``True`` (default), **fragment moves** — small random
displacements of one or more atoms — are included in the MC step
pool as an independent move type.
When ``False``, fragment moves are excluded; coordinates are only
modified by affine moves (if *allow_affine_moves* is ``True``).
If the *initial* structure passed to :meth:`run` contains atoms
whose symbols are not in *elements*, those atoms are automatically
replaced with parity-compatible pool elements before the MC loop
begins. This sanitization applies to all three methods (SA, BH,
and PT); see :func:`_sanitize_atoms_to_pool`.
Cannot be ``False`` simultaneously with *allow_composition_moves*
**and** *allow_affine_moves* (at least one move type must be
enabled).
allow_composition_moves:
When ``True`` (default), **composition moves** — replacing a random
atom with a different element drawn from *elements* while preserving
charge/multiplicity parity — are included in the MC step pool as an
independent move type.
When ``False``, element types are held fixed throughout the run.
Cannot be ``False`` simultaneously with *allow_displacements*
**and** *allow_affine_moves* (at least one move type must be
enabled).
allow_affine_moves:
When ``True``, **affine moves** — a random stretch, compress, or
shear applied to the entire structure, followed by a small per-atom
jitter — are included in the MC step pool as an independent move
type alongside (not as a subset of) displacement and composition
moves. Affine moves allow the optimizer to explore elongated or
compressed configurations that fragment moves cannot reach
efficiently. When *allow_displacements* is ``False``, affine moves
are the only way positions change; the distance-constraint relaxation
is **not** applied after affine moves (consistent with the
*allow_displacements=False* semantics).
Default: ``False`` (backward-compatible).
Cannot be ``False`` simultaneously with *allow_displacements*
**and** *allow_composition_moves* (at least one move type must be
enabled).
affine_strength:
Global dimensionless scale of the affine transform (default: 0.1).
At 0.1 the structure is stretched / compressed by up to ±10 % along
a random axis and sheared by up to ±5 %. Practical range: 0.02–0.4.
Has no effect when *allow_affine_moves* is ``False``. Use
*affine_stretch*, *affine_shear*, and *affine_jitter* to override
individual operation strengths independently.
affine_stretch:
Strength of the stretch/compress operation only ∈ (0, 1). When
``None`` (default) *affine_strength* is used. Set to ``0.0`` to
disable stretching while keeping shear and jitter active.
Has no effect when *allow_affine_moves* is ``False``.
affine_shear:
Strength of the shear operation only ∈ (0, 1). When ``None``
(default) *affine_strength* is used. Set to ``0.0`` to disable
shearing while keeping stretch and jitter active.
Has no effect when *allow_affine_moves* is ``False``.
affine_jitter:
Per-atom jitter scale ∈ (0, 1) relative to *move_step*. When
``None`` (default) *affine_strength* is used. Set to ``0.0`` to
disable per-atom jitter in affine moves.
Has no effect when *allow_affine_moves* is ``False``.
frag_threshold:
Local Q6 threshold for fragment selection (default: 0.3).
Atoms with local Q6 > threshold are preferentially displaced.
move_step:
Maximum displacement magnitude per coordinate step (Å, default: 0.5).
Also used as the per-atom jitter scale in affine moves (× 0.25).
lcc_threshold:
Minimum ``graph_lcc`` required to accept a step (default: 0.0,
i.e. no connectivity constraint). Set to 0.8 to enforce that at
least 80 % of atoms remain connected.
cov_scale:
Minimum distance scale factor for :func:`relax_positions`.
relax_cycles:
Max repulsion-relaxation cycles per step. Basin-Hopping uses
3× this value for its local-minimisation step.
cutoff:
Distance cutoff (Å) for Steinhardt / graph metrics. Auto-computed
from the element pool when ``None``.
n_bins:
Histogram bins for ``H_spatial`` / ``RDF_dev`` (default: 20).
w_atom:
Weight of ``H_atom`` in ``H_total`` (default: 0.5).
w_spatial:
Weight of ``H_spatial`` in ``H_total`` (default: 0.5).
n_restarts:
Independent optimization runs (default: 1). The best result
across all restarts is returned.
max_init_attempts:
Maximum number of single-sample tries that :meth:`_make_initial`
makes per restart when generating the starting structure
(default: ``0`` = unlimited).
* ``0`` — unlimited retries (recommended for production runs with
large or constrained element pools). Safe because
:meth:`__init__` validates at construction time that the element
pool can satisfy the charge/multiplicity parity constraint; if
that check passes, a valid structure is guaranteed to be found
eventually.
* ``> 0`` — at most *max_init_attempts* tries per restart. If
exhausted the restart is skipped and a :class:`UserWarning` is
emitted. Useful as a time-budget guard in automated pipelines.
.. note::
:meth:`__init__` raises :class:`ValueError` immediately when
the element pool is *structurally* incompatible with
``charge``/``mult`` (e.g. an all-nitrogen pool with
``charge=0, mult=1``), making an infinite loop impossible for
well-formed inputs.
seed:
Random seed (``None`` → non-deterministic).
verbose:
Print per-step progress to stderr (default: ``False``).
Examples
--------
Class API::
from pasted import StructureOptimizer
opt = StructureOptimizer(
n_atoms=50,
charge=0, mult=1,
elements="24,25,26,27,28", # Cantor alloy
objective={"H_atom": 1.0, "H_spatial": 1.0, "Q6": -2.0},
method="annealing",
max_steps=5000,
lcc_threshold=0.8,
seed=42,
)
best = opt.run()
Callable objective::
opt = StructureOptimizer(
...,
objective=lambda m: m["H_spatial"] - 2.0 * m["Q6"],
)
"""
def __init__(
self,
*,
n_atoms: int,
charge: int,
mult: int,
objective: ObjectiveType,
elements: str | list[str] | None = None,
method: str = "annealing",
max_steps: int = 5000,
T_start: float = 1.0,
T_end: float = 0.01,
frag_threshold: float = 0.3,
move_step: float = 0.5,
allow_composition_moves: bool = True,
allow_displacements: bool = True,
allow_affine_moves: bool = False,
affine_strength: float = 0.1,
affine_stretch: float | None = None,
affine_shear: float | None = None,
affine_jitter: float | None = None,
lcc_threshold: float = 0.0,
cov_scale: float = 1.0,
relax_cycles: int = 1500,
cutoff: float | None = None,
n_bins: int = 20,
w_atom: float = 0.5,
w_spatial: float = 0.5,
n_restarts: int = 1,
n_replicas: int = 4,
pt_swap_interval: int = 10,
max_init_attempts: int = 0,
seed: int | None = None,
verbose: bool = False,
) -> None:
if method not in ("annealing", "basin_hopping", "parallel_tempering"):
raise ValueError(
f"method must be 'annealing', 'basin_hopping', or "
f"'parallel_tempering', got {method!r}"
)
if not allow_displacements and not allow_composition_moves and not allow_affine_moves:
raise ValueError(
"At least one move type must be enabled: "
"allow_displacements, allow_composition_moves, and allow_affine_moves "
"cannot all be False."
)
self.n_atoms = n_atoms
self.charge = charge
self.mult = mult
self.objective = objective
self.method = method
self.max_steps = max_steps
self.T_start = T_start
self.T_end = T_end
self.frag_threshold = frag_threshold
self.move_step = move_step
self.allow_composition_moves = allow_composition_moves
self.allow_displacements = allow_displacements
self.allow_affine_moves = allow_affine_moves
self.affine_strength = affine_strength
self.affine_stretch = affine_stretch
self.affine_shear = affine_shear
self.affine_jitter = affine_jitter
self.lcc_threshold = lcc_threshold
self.cov_scale = cov_scale
self.relax_cycles = relax_cycles
self.n_bins = n_bins
self.w_atom = w_atom
self.w_spatial = w_spatial
self.n_restarts = n_restarts
self.n_replicas = n_replicas
self.pt_swap_interval = pt_swap_interval
self.max_init_attempts = max_init_attempts
self.seed = seed
self.verbose = verbose
# Arity cache — inspect.signature is called once here so that the hot
# MC loop can skip it on every step.
self._needs_ctx: bool = _objective_needs_ctx(self.objective)
# Element pool
# When *elements* is a list, callers sometimes pass repeated symbols
# (e.g. ['C', 'H', 'H', 'H', 'H']) intending to describe a fixed
# stoichiometry rather than a biased sampling pool. Storing duplicates
# would cause rng.choice(self._element_pool) to sample H with 4x the
# probability of C, which is both surprising and incorrect when
# allow_composition_moves=False. Deduplicate while preserving
# insertion order so the pool contains exactly the unique element types.
if elements is None:
self._element_pool: list[str] = default_element_pool()
elif isinstance(elements, str):
self._element_pool = parse_element_spec(elements)
else:
# dict.fromkeys preserves insertion order and removes duplicates
self._element_pool = list(dict.fromkeys(elements))
# Early parity validation — catch impossible element pools before run().
# This makes max_init_attempts=0 (unlimited) safe: if this check passes,
# _make_initial is guaranteed to eventually find a valid structure.
if not _pool_can_satisfy_parity(self._element_pool, self.n_atoms, self.charge, self.mult):
even_odd = (
"all even-Z"
if all(ATOMIC_NUMBERS[e] % 2 == 0 for e in self._element_pool)
else "all odd-Z"
)
raise ValueError(
f"Element pool {self._element_pool!r} ({even_odd}) cannot produce "
f"any composition of {self.n_atoms} atoms that satisfies "
f"charge={self.charge:+d}, mult={self.mult}. "
f"Add at least one element with a different atomic-number parity, "
f"or adjust n_atoms / charge / mult."
)
# Cutoff
self._cutoff: float = self._resolve_cutoff(cutoff)
# ------------------------------------------------------------------ #
# Internal helpers #
# ------------------------------------------------------------------ #
def _log(self, msg: str) -> None:
if self.verbose:
print(msg, file=sys.stderr)
def _resolve_cutoff(self, override: float | None) -> float:
if override is not None:
if self.verbose:
self._log(f"[cutoff] {override:.3f} Å (user-specified)")
return override
radii = np.array([_cov_radius_ang(s) for s in self._element_pool])
# O(N) approximation: median(r_i + r_j) ≈ 2 × median(r_i).
median_sum = float(np.median(radii)) * 2.0
cutoff = self.cov_scale * 1.5 * median_sum
if self.verbose:
self._log(
f"[cutoff] {cutoff:.3f} Å (auto: cov_scale={self.cov_scale} × 1.5 × "
f"median(r_i+r_j)≈{median_sum:.3f} Å)"
)
return cutoff
def _auto_region(self) -> str:
"""Estimate a sphere radius that gives approximately bulk density."""
v_per_atom = 4 / 3 * math.pi * 1.3**3 # ų, mean atomic radius ~1.3 Å
r = (3 * self.n_atoms * v_per_atom / (4 * math.pi * 0.74)) ** (1 / 3)
return f"sphere:{r * 1.2:.1f}" # 20 % margin
def _make_initial(self, rng: random.Random) -> Structure | None:
"""Generate a valid initial structure using StructureGenerator.
Retries until a parity-valid structure is produced. The number of
attempts is controlled by :attr:`max_init_attempts`:
* ``0`` (default) — unlimited retries. Safe because
:meth:`__init__` already verified that the element pool can satisfy
the charge/multiplicity parity constraint.
* ``> 0`` — at most *max_init_attempts* tries; returns ``None`` on
exhaustion (caller logs and skips the restart).
Warnings from the internal single-sample generation attempts are
suppressed because this method manages its own retry loop. A
caller-visible :class:`UserWarning` is emitted by :meth:`run` only
when a restart *cannot be started at all* (i.e., this method returns
``None``), which is the actionable signal for the end user.
"""
region = self._auto_region()
attempts = (
itertools.count() if self.max_init_attempts == 0 else range(self.max_init_attempts)
)
for _ in attempts:
seed = rng.randint(0, 2**31)
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
structs = StructureGenerator(
n_atoms=self.n_atoms,
charge=self.charge,
mult=self.mult,
mode="gas",
region=region,
elements=self._element_pool,
cov_scale=self.cov_scale,
relax_cycles=self.relax_cycles,
cutoff=self._cutoff,
n_bins=self.n_bins,
w_atom=self.w_atom,
w_spatial=self.w_spatial,
n_samples=1,
seed=seed,
).generate()
if structs:
return structs.structures[0]
return None
def _temperature(self, step: int) -> float:
"""Return temperature at *step*. BH and PT use constant T_start."""
if self.method in ("basin_hopping", "parallel_tempering"):
return self.T_start
if self.T_end <= 0 or self.T_start <= 0:
return self.T_start
ratio = self.T_end / self.T_start
return float(self.T_start * ratio ** (step / max(self.max_steps - 1, 1)))
def _relax_cycles_for_method(self) -> int:
"""BH uses 3× relax cycles as local minimisation."""
return self.relax_cycles * 3 if self.method == "basin_hopping" else self.relax_cycles
def _pt_temperatures(self) -> list[float]:
"""Geometric temperature ladder from T_end (coldest) to T_start (hottest)."""
n = max(2, self.n_replicas)
if n == 1:
return [self.T_start]
t_lo = max(self.T_end, 1e-6)
t_hi = max(self.T_start, t_lo * 1.01)
ratio = (t_hi / t_lo) ** (1.0 / (n - 1))
return [round(t_lo * ratio**k, 8) for k in range(n)] # cold→hot
# ------------------------------------------------------------------ #
# Parallel Tempering #
# ------------------------------------------------------------------ #
def _run_parallel_tempering(
self, initial: Structure | None, restart_idx: int
) -> list[tuple[float, Structure]]:
"""Run one Parallel Tempering restart; return (score, structure) per replica.
Algorithm
---------
1. Build N replicas at temperatures T_0 < T_1 < ... < T_{N-1}
(geometric ladder from T_end to T_start).
2. Every step: each replica independently proposes a move and
applies Metropolis acceptance at its own temperature.
3. Every ``pt_swap_interval`` steps: for each adjacent pair (k, k+1)
attempt a replica exchange with the Metropolis criterion::
ΔE = (β_k − β_{k+1}) × (f_{k+1} − f_k)
where β = 1/T and f is the objective value (higher is better).
Accept with probability min(1, exp(ΔE)).
4. Track the global best across all replicas and all steps.
Initialization
--------------
* ``initial`` provided — all replicas start from the same structure.
If ``allow_composition_moves=True`` and the initial structure
contains atoms whose symbols are outside *element_pool*, each
replica's atom list is independently sanitized via
:func:`_sanitize_atoms_to_pool` before the MC loop begins
(fix: Bug #6 — mirrors the same fix in :meth:`_run_one`).
* ``initial=None``, ``allow_composition_moves=True`` — each replica
gets an independent random structure (composition and positions).
* ``initial=None``, ``allow_composition_moves=False`` — a single
shared composition is generated once via ``_make_initial`` and all
replicas inherit that composition. Positions are independently
randomized per replica so they still start from different points in
configuration space. This ensures that the fixed-composition
invariant holds from step zero across all replicas.
Returns all replica final states sorted best-first so that
``run()`` can incorporate them into ``OptimizationResult``.
Return value
------------
A list of ``(score, Structure)`` tuples containing:
1. The global best (tracked across all replicas and all steps) as
the first entry.
2. Each replica's final state, **if** its final objective value
differs from the global best by more than ``1e-10`` or its atom
list differs. Replicas that converged to the global best are
deduplicated and omitted.
The total count therefore satisfies::
1 <= len(result) <= n_replicas + 1
and across ``n_restarts`` calls to this method the aggregated
``OptimizationResult`` will contain::
n_restarts <= len(opt_result) <= n_restarts * (n_replicas + 1)
"""
rng = random.Random(None if self.seed is None else self.seed + restart_idx * 97)
temps = self._pt_temperatures()
n_rep = len(temps)
rc = self.relax_cycles
# ── Initialise one state per replica ────────────────────────────
# Replica 0 = coldest (T_end), Replica N-1 = hottest (T_start).
#
# When allow_composition_moves=False and no initial structure is
# provided we must generate a single shared composition here and
# reuse it across all replicas. Generating an independent random
# composition per replica would break the invariant that composition
# is fixed throughout the run: replica k would start with a
# different element assignment than replica k+1, and the cold
# replica would never see the composition that was intended.
#
# When allow_composition_moves=True replicas may diverge in
# composition during the run anyway, so independent starts are fine.
states_atoms: list[list[str]] = []
states_positions: list[list[Vec3]] = []
states_metrics: list[dict[str, float]] = []
states_f: list[float] = []
states_q6: list[np.ndarray] = []
# If composition moves are disabled and no initial structure was
# supplied, generate one shared initial structure whose composition
# will be inherited by all replicas (positions are independently
# randomised per-replica so they still start from different points).
_shared_initial: Structure | None = None
if initial is None and not self.allow_composition_moves:
_shared_initial = self._make_initial(rng)
# Pre-check whether the caller-supplied initial structure contains
# atoms outside the element pool. If so, each replica's atom list
# must be sanitized before the MC loop begins — same fix as in
# _run_one (Bug #4), extended to the PT path (Bug #6).
_initial_needs_sanitize = (
initial is not None
and self.allow_composition_moves
and not all(a in set(self._element_pool) for a in initial.atoms)
)
for k in range(n_rep):
if initial is not None:
a = list(initial.atoms)
p = list(initial.positions)
# Sanitize foreign atoms once per replica using replica-specific
# RNG so that each replica's composition is independently
# randomized while still being confined to the pool.
if _initial_needs_sanitize:
rng_san = random.Random(
None if self.seed is None else self.seed + restart_idx * 97 + k * 13 + 7
)
a = _sanitize_atoms_to_pool(a, self._element_pool, rng_san)
elif _shared_initial is not None:
# Composition fixed: reuse atom types from shared initial,
# but place atoms independently so replicas explore different
# regions of configuration space.
a = list(_shared_initial.atoms)
rng_k = random.Random(
None if self.seed is None else self.seed + restart_idx * 97 + k * 13
)
_, p_raw = place_gas(a, self._auto_region(), rng_k)
p_list, _ = relax_positions(a, p_raw, self.cov_scale, rc)
p = list(p_list)
else:
# Composition moves enabled: each replica gets an independent
# random structure so they collectively cover more of the
# composition×geometry space from the start.
rng_k = random.Random(
None if self.seed is None else self.seed + restart_idx * 97 + k * 13
)
a_raw = [rng.choice(self._element_pool) for _ in range(self.n_atoms)]
_, p_raw = place_gas(a_raw, self._auto_region(), rng_k)
p_list, _ = relax_positions(a_raw, p_raw, self.cov_scale, rc)
a = a_raw
p = list(p_list)
m = compute_all_metrics(
a,
p,
self.n_bins,
self.w_atom,
self.w_spatial,
self._cutoff,
self.cov_scale,
)
pts = np.array(p)
q6: np.ndarray = compute_steinhardt_per_atom(pts, [6], self._cutoff)["Q6"]
ctx = (
_make_ctx(
a,
p,
m,
self.charge,
self.mult,
step=0,
max_steps=self.max_steps,
temperature=temps[k],
f_current=0.0,
best_f=0.0,
restart_idx=restart_idx,
n_restarts=self.n_restarts,
per_atom_q6=q6,
element_pool=self._element_pool,
cutoff=self._cutoff,
method=self.method,
T_start=self.T_start,
T_end=self.T_end,
seed=self.seed,
replica_idx=k,
replica_temperature=temps[k],
n_replicas=n_rep,
)
if self._needs_ctx
else None
)
f = _eval_objective(m, self.objective, ctx=ctx)
states_atoms.append(a)
states_positions.append(p)
states_metrics.append(m)
states_f.append(f)
states_q6.append(q6)
best_atoms = list(states_atoms[0])
best_positions = list(states_positions[0])
best_metrics = dict(states_metrics[0])
best_f = states_f[0]
for k in range(n_rep):
if states_f[k] > best_f:
best_f = states_f[k]
best_atoms = list(states_atoms[k])
best_positions = list(states_positions[k])
best_metrics = dict(states_metrics[k])
# Exchange-attempt counts for diagnostics
n_swap_attempted = 0
n_swap_accepted = 0
# Precompute radii per replica (updated on composition moves)
replicas_radii: list[np.ndarray] = [
np.array([_cov_radius_ang(a) for a in atoms], dtype=float) for atoms in states_atoms
]
seed_int = -1 if self.seed is None else int(self.seed + restart_idx * 97)
log_interval = max(1, self.max_steps // 20)
width = len(str(self.max_steps))
# Pre-build the ordered list of enabled move types so that each
# iteration can sample uniformly (1/N) without re-evaluating flags.
_move_pool: list[str] = []
if self.allow_displacements:
_move_pool.append("displacement")
if self.allow_affine_moves:
_move_pool.append("affine")
if self.allow_composition_moves:
_move_pool.append("composition")
for step in range(self.max_steps):
# ── Each replica: one Metropolis step ────────────────────────
for k in range(n_rep):
T_k = temps[k]
atoms = states_atoms[k]
positions = states_positions[k]
f_k = states_f[k]
q6_k = states_q6[k]
radii_k = replicas_radii[k]
# Propose move — sample uniformly from enabled move types
_move = rng.choice(_move_pool)
if _move == "displacement":
new_positions = _fragment_move(
positions, q6_k, self.frag_threshold, self.move_step, rng
)
new_atoms = list(atoms)
elif _move == "affine":
new_positions = _affine_move(
positions,
self.move_step,
self.affine_strength,
rng,
affine_stretch=self.affine_stretch,
affine_shear=self.affine_shear,
affine_jitter=self.affine_jitter,
)
new_atoms = list(atoms)
else: # composition
new_atoms = _composition_move(
atoms, self._element_pool, rng, self.charge, self.mult
)
new_positions = list(positions)
# Relax — skip when allow_displacements=False (positions must stay fixed)
radii_new = radii_k
if self.allow_displacements:
if HAS_RELAX:
if new_atoms != atoms:
radii_new = np.array(
[_cov_radius_ang(a) for a in new_atoms], dtype=float
)
pts_arr = np.array(new_positions, dtype=float)
pts_arr, _ = _cpp_relax_positions(
pts_arr, radii_new, self.cov_scale, rc, seed_int
)
new_positions = [tuple(row) for row in pts_arr]
else:
new_positions, _ = relax_positions(
new_atoms, new_positions, self.cov_scale, rc
)
if new_atoms != atoms:
radii_new = np.array(
[_cov_radius_ang(a) for a in new_atoms], dtype=float
)
# msg (second element) is intentionally discarded here — on the
# ok=True path validate_charge_mult returns "" to avoid the
# Counter + f-string overhead in this hot loop.
ok_parity, _ = validate_charge_mult(new_atoms, self.charge, self.mult)
if not ok_parity:
continue
new_metrics = compute_all_metrics(
new_atoms,
new_positions,
self.n_bins,
self.w_atom,
self.w_spatial,
self._cutoff,
self.cov_scale,
)
ctx = (
_make_ctx(
new_atoms,
new_positions,
new_metrics,
self.charge,
self.mult,
step=step,
max_steps=self.max_steps,
temperature=T_k,
f_current=f_k,
best_f=best_f,
restart_idx=restart_idx,
n_restarts=self.n_restarts,
per_atom_q6=states_q6[k],
element_pool=self._element_pool,
cutoff=self._cutoff,
method=self.method,
T_start=self.T_start,
T_end=self.T_end,
seed=self.seed,
replica_idx=k,
replica_temperature=temps[k],
n_replicas=n_rep,
)
if self._needs_ctx
else None
)
f_new = _eval_objective(new_metrics, self.objective, ctx=ctx)
if new_metrics.get("graph_lcc", 0.0) < self.lcc_threshold:
continue
delta = f_new - f_k
accept = delta >= 0 or (T_k > 1e-12 and rng.random() < math.exp(delta / T_k))
if accept:
states_atoms[k] = new_atoms
states_positions[k] = new_positions
states_metrics[k] = new_metrics
states_f[k] = f_new
replicas_radii[k] = radii_new
new_pts = np.array(new_positions)
states_q6[k] = compute_steinhardt_per_atom(new_pts, [6], self._cutoff)["Q6"]
if f_new > best_f:
best_f = f_new
best_atoms = list(new_atoms)
best_positions = list(new_positions)
best_metrics = dict(new_metrics)
# ── Replica-exchange swaps every pt_swap_interval steps ───────
if (step + 1) % self.pt_swap_interval == 0:
# Attempt swaps between adjacent replica pairs in random order
pairs = list(range(n_rep - 1))
rng.shuffle(pairs)
for k in pairs:
f_k = states_f[k]
f_k1 = states_f[k + 1]
T_k = temps[k]
T_k1 = temps[k + 1]
# Metropolis criterion for maximization:
# ΔE = (β_k − β_{k+1}) × (f_{k+1} − f_k)
# β = 1/T, higher T = more permissive
beta_k = 1.0 / T_k if T_k > 1e-12 else 1e12
beta_k1 = 1.0 / T_k1 if T_k1 > 1e-12 else 1e12
delta_swap = (beta_k - beta_k1) * (f_k1 - f_k)
n_swap_attempted += 1
if delta_swap >= 0 or rng.random() < math.exp(delta_swap):
# Exchange states k ↔ k+1
(
states_atoms[k],
states_atoms[k + 1],
states_positions[k],
states_positions[k + 1],
states_metrics[k],
states_metrics[k + 1],
states_f[k],
states_f[k + 1],
states_q6[k],
states_q6[k + 1],
replicas_radii[k],
replicas_radii[k + 1],
) = (
states_atoms[k + 1],
states_atoms[k],
states_positions[k + 1],
states_positions[k],
states_metrics[k + 1],
states_metrics[k],
states_f[k + 1],
states_f[k],
states_q6[k + 1],
states_q6[k],
replicas_radii[k + 1],
replicas_radii[k],
)
n_swap_accepted += 1
# ── Logging ──────────────────────────────────────────────────
if self.verbose and (step + 1) % log_interval == 0:
swap_rate = n_swap_accepted / n_swap_attempted if n_swap_attempted > 0 else 0.0
self._log(
f"[restart={restart_idx + 1} step={step + 1:>{width}}/{self.max_steps}] "
f"best_f={best_f:.4f} "
f"replica_f=[{', '.join(f'{f:.3f}' for f in states_f)}] "
f"T=[{', '.join(f'{t:.3f}' for t in temps)}] "
f"swap_rate={swap_rate:.2f}"
)
if self.verbose:
swap_rate = n_swap_accepted / max(n_swap_attempted, 1)
self._log(
f"[PT restart={restart_idx + 1}] best_f={best_f:.4f} "
f"swap_accept_rate={swap_rate:.3f} "
f"({n_swap_accepted}/{n_swap_attempted} swaps)"
)
# Return (score, structure) for each replica's final state + global best
all_results: list[tuple[float, Structure]] = []
# Global best first
all_results.append(
(
best_f,
Structure(
atoms=best_atoms,
positions=best_positions,
charge=self.charge,
mult=self.mult,
metrics=best_metrics,
mode="opt_parallel_tempering",
sample_index=restart_idx + 1,
seed=self.seed,
),
)
)
# Add each replica's final state (if not identical to best)
for k in range(n_rep):
f_k = states_f[k]
if abs(f_k - best_f) > 1e-10 or states_atoms[k] != best_atoms:
all_results.append(
(
f_k,
Structure(
atoms=list(states_atoms[k]),
positions=list(states_positions[k]),
charge=self.charge,
mult=self.mult,
metrics=dict(states_metrics[k]),
mode=f"opt_parallel_tempering_T{temps[k]:.4f}",
sample_index=restart_idx * n_rep + k + 1,
seed=self.seed,
),
)
)
return all_results
# ------------------------------------------------------------------ #
# Single restart #
# ------------------------------------------------------------------ #
def _run_one(self, initial: Structure, restart_idx: int) -> tuple[float, Structure]:
rng = random.Random(None if self.seed is None else self.seed + restart_idx * 97)
atoms: list[str] = list(initial.atoms)
positions: list[Vec3] = list(initial.positions)
# When composition moves are enabled and the caller supplied an initial
# structure whose atoms include symbols outside the element pool, replace
# every foreign symbol with a parity-compatible pool element before the
# MC loop begins. Without this step the optimizer could retain foreign
# atoms indefinitely whenever their objective value is locally optimal.
# (fix: Bug #4 — composition-only optimization retains non-pool atoms)
if self.allow_composition_moves:
pool_set = set(self._element_pool)
if not all(a in pool_set for a in atoms):
atoms = _sanitize_atoms_to_pool(atoms, self._element_pool, rng)
# Pre-compute radii and seed_int once; reused every step.
radii = np.array([_cov_radius_ang(a) for a in atoms], dtype=float)
seed_int: int = -1 if self.seed is None else int(self.seed + restart_idx * 97)
# Initial evaluation
pts = np.array(positions)
metrics = compute_all_metrics(
atoms,
positions,
self.n_bins,
self.w_atom,
self.w_spatial,
self._cutoff,
self.cov_scale,
)
per_atom_q6: np.ndarray = compute_steinhardt_per_atom(pts, [6], self._cutoff)["Q6"]
ctx0 = (
_make_ctx(
atoms,
positions,
metrics,
self.charge,
self.mult,
step=0,
max_steps=self.max_steps,
temperature=self._temperature(0),
f_current=0.0,
best_f=0.0,
restart_idx=restart_idx,
n_restarts=self.n_restarts,
per_atom_q6=per_atom_q6,
element_pool=self._element_pool,
cutoff=self._cutoff,
method=self.method,
T_start=self.T_start,
T_end=self.T_end,
seed=self.seed,
)
if self._needs_ctx
else None
)
f_current = _eval_objective(metrics, self.objective, ctx=ctx0)
best_atoms = list(atoms)
best_positions = list(positions)
best_f = f_current
best_metrics = dict(metrics)
rc = self._relax_cycles_for_method()
log_interval = max(1, self.max_steps // 20)
width = len(str(self.max_steps))
# Pre-build the ordered list of enabled move types so that each
# iteration can sample uniformly (1/N) without re-evaluating flags.
_move_pool: list[str] = []
if self.allow_displacements:
_move_pool.append("displacement")
if self.allow_affine_moves:
_move_pool.append("affine")
if self.allow_composition_moves:
_move_pool.append("composition")
for step in range(self.max_steps):
T = self._temperature(step)
# ── Move — sample uniformly from enabled move types ───────────
_move = rng.choice(_move_pool)
if _move == "displacement":
new_positions = _fragment_move(
positions, per_atom_q6, self.frag_threshold, self.move_step, rng
)
new_atoms = list(atoms)
elif _move == "affine":
new_positions = _affine_move(
positions,
self.move_step,
self.affine_strength,
rng,
affine_stretch=self.affine_stretch,
affine_shear=self.affine_shear,
affine_jitter=self.affine_jitter,
)
new_atoms = list(atoms)
else: # composition
new_atoms = _composition_move(
atoms, self._element_pool, rng, self.charge, self.mult
)
new_positions = list(positions)
# ── Relax (distance constraint) ───────────────────────────────
# Skip relax when allow_displacements=False: the caller expects
# positions to be exactly preserved after composition-only moves.
if self.allow_displacements:
if HAS_RELAX:
# radii may need updating if atom types changed (composition move)
if new_atoms != atoms:
new_radii = np.array([_cov_radius_ang(a) for a in new_atoms], dtype=float)
else:
new_radii = radii
new_pts_arr = np.array(new_positions, dtype=float)
new_pts_arr, _ = _cpp_relax_positions(
new_pts_arr, new_radii, self.cov_scale, rc, seed_int
)
new_positions = [tuple(row) for row in new_pts_arr]
else:
new_positions, _ = relax_positions(
new_atoms, new_positions, self.cov_scale, rc
)
# ── Charge/mult validity ──────────────────────────────────────
# msg (second element) is intentionally discarded here — on the
# ok=True path validate_charge_mult returns "" to avoid the
# Counter + f-string overhead in this hot loop.
ok, _ = validate_charge_mult(new_atoms, self.charge, self.mult)
if not ok:
continue
# ── Evaluate ─────────────────────────────────────────────────
new_metrics = compute_all_metrics(
new_atoms,
new_positions,
self.n_bins,
self.w_atom,
self.w_spatial,
self._cutoff,
self.cov_scale,
)
ctx = (
_make_ctx(
new_atoms,
new_positions,
new_metrics,
self.charge,
self.mult,
step=step,
max_steps=self.max_steps,
temperature=T,
f_current=f_current,
best_f=best_f,
restart_idx=restart_idx,
n_restarts=self.n_restarts,
per_atom_q6=per_atom_q6,
element_pool=self._element_pool,
cutoff=self._cutoff,
method=self.method,
T_start=self.T_start,
T_end=self.T_end,
seed=self.seed,
)
if self._needs_ctx
else None
)
f_new = _eval_objective(new_metrics, self.objective, ctx=ctx)
# ── Hard connectivity constraint ──────────────────────────────
if new_metrics.get("graph_lcc", 0.0) < self.lcc_threshold:
continue
# ── Accept / reject (Metropolis) ─────────────────────────────
delta = f_new - f_current
accept = delta >= 0 or (T > 1e-12 and rng.random() < math.exp(delta / T))
if accept:
old_atoms = atoms # snapshot before reassignment (fix: Bug #3)
atoms = new_atoms
positions = new_positions
metrics = new_metrics
f_current = f_new
# Refresh the covalent-radius cache when the composition changed.
# old_atoms is captured *before* atoms = new_atoms so that the
# identity / equality test is reliable regardless of Python's
# object-identity semantics after assignment.
if old_atoms != atoms:
radii = np.array([_cov_radius_ang(a) for a in atoms], dtype=float)
# Update per_atom_q6 for next fragment selection
new_pts = np.array(positions)
per_atom_q6 = compute_steinhardt_per_atom(new_pts, [6], self._cutoff)["Q6"]
if f_current > best_f:
best_f = f_current
best_atoms = list(atoms)
best_positions = list(positions)
best_metrics = dict(metrics)
# ── Logging ──────────────────────────────────────────────────
if self.verbose and step % log_interval == 0:
self._log(
f"[restart={restart_idx + 1} "
f"step={step + 1:>{width}}/{self.max_steps}] "
f"T={T:.4f} f={f_current:.4f} best={best_f:.4f} "
+ " ".join(f"{k}={_fmt(v)}" for k, v in metrics.items())
)
return best_f, Structure(
atoms=best_atoms,
positions=best_positions,
charge=self.charge,
mult=self.mult,
metrics=best_metrics,
mode=f"opt_{self.method}",
sample_index=restart_idx + 1,
seed=self.seed,
)
# ------------------------------------------------------------------ #
# Public interface #
# ------------------------------------------------------------------ #
[docs]
def run(self, initial: Structure | None = None) -> OptimizationResult:
"""Run ``n_restarts`` optimizations and return an :class:`OptimizationResult`.
Each restart begins from an independently generated random gas-mode
structure (or from *initial* if provided). All per-restart results
are collected, sorted by objective value (highest first), and returned
together in an :class:`OptimizationResult`.
:class:`OptimizationResult` is list-compatible: ``result[0]`` and
``result.best`` both return the highest-scoring structure, and
``for s in result`` iterates all restarts in rank order. Existing
code that calls ``opt.run()`` and uses the return value as a single
``Structure`` should switch to ``opt.run().best`` or ``opt.run()[0]``.
A :class:`UserWarning` is emitted when one or more restarts fail to
produce a valid initial structure after all internal retries are
exhausted. Transient parity-check failures inside the initial-
structure generation loop are silenced internally and do **not** reach
the caller; only a definitive inability to start a restart is reported.
The retry limit is controlled by :attr:`max_init_attempts`
(``0`` = unlimited, the default).
Parameters
----------
initial:
Starting structure. When ``None`` (default), a random gas-mode
structure is generated automatically for each restart.
Returns
-------
OptimizationResult
All per-restart structures sorted by objective value (highest
first), plus summary metadata. Raises :class:`RuntimeError` if
every restart fails to produce a valid initial structure.
Raises
------
RuntimeError
When all restarts fail to produce a valid initial structure.
Examples
--------
Best structure only::
result = opt.run()
print(result.best) # highest-scoring structure
print(result[0]) # same — index 0 is always the best
print(result.summary()) # one-line diagnostic
All restarts::
result = opt.run()
for rank, s in enumerate(result, 1):
print(f"rank {rank}: f={result.objective_scores[rank-1]:.4f} {s}")
"""
rng = random.Random(self.seed)
all_results: list[tuple[float, Structure]] = []
n_attempted = 0
for r in range(self.n_restarts):
self._log(f"[optimize] restart {r + 1}/{self.n_restarts} start")
# ── Parallel Tempering path ───────────────────────────────────
if self.method == "parallel_tempering":
n_attempted += 1
pt_results = self._run_parallel_tempering(initial, r)
all_results.extend(pt_results)
best_pt = max(pt_results, key=lambda x: x[0])
self._log(
f"[optimize] restart {r + 1}/{self.n_restarts} done: "
f"best_f={best_pt[0]:.4f} ({len(pt_results)} replica states)"
)
continue
# ── SA / BH path ──────────────────────────────────────────────
init = initial
if init is None:
init = self._make_initial(rng)
if init is None:
self._log(
f"[optimize] restart {r + 1}: could not generate initial structure, skipping"
)
continue
n_attempted += 1
f, result = self._run_one(init, r)
self._log(f"[optimize] restart {r + 1}/{self.n_restarts} done: f={f:.4f}")
all_results.append((f, result))
if not all_results:
raise RuntimeError(
"Optimization failed: no valid structure found across all restarts."
)
n_skipped = self.n_restarts - n_attempted
if n_skipped > 0:
warnings.warn(
f"{n_skipped} of {self.n_restarts} restart(s) were skipped because "
f"no valid initial structure could be generated. "
f"Try narrowing the element pool to satisfy "
f"charge={self.charge}, mult={self.mult}.",
UserWarning,
stacklevel=2,
)
all_results.sort(key=lambda x: x[0], reverse=True)
best_f = all_results[0][0]
self._log(f"[optimize] best f={best_f:.4f}")
return OptimizationResult(
all_structures=[s for _, s in all_results],
objective_scores=[f for f, _ in all_results],
n_restarts_attempted=n_attempted,
method=self.method,
)
# ------------------------------------------------------------------ #
# Properties / dunder #
# ------------------------------------------------------------------ #
@property
def element_pool(self) -> list[str]:
"""A copy of the resolved element pool."""
return list(self._element_pool)
@property
def cutoff(self) -> float:
"""Distance cutoff (Å) used for Steinhardt and graph metrics."""
return self._cutoff
[docs]
def __repr__(self) -> str:
pt_info = (
f", n_replicas={self.n_replicas}, pt_swap_interval={self.pt_swap_interval}"
if self.method == "parallel_tempering"
else ""
)
comp_info = "" if self.allow_composition_moves else ", allow_composition_moves=False"
disp_info = "" if self.allow_displacements else ", allow_displacements=False"
affine_info = (
f", affine_strength={self.affine_strength}" if self.allow_affine_moves else ""
)
return (
f"StructureOptimizer("
f"n_atoms={self.n_atoms}, method={self.method!r}, "
f"max_steps={self.max_steps}, "
f"T_start={self.T_start}, T_end={self.T_end}, "
f"pool_size={len(self._element_pool)}"
f"{pt_info}{comp_info}{disp_info}{affine_info})"
)