|
| 1 | +#!/usr/bin/env python |
| 2 | +# |
| 3 | +# This script can be used for any purpose without limitation subject to the |
| 4 | +# conditions at https://www.ccdc.cam.ac.uk/Community/Pages/Licences/v2.aspx |
| 5 | +# |
| 6 | +# This permission notice and the following statement of attribution must be |
| 7 | +# included in all copies or substantial portions of this script. |
| 8 | +# |
| 9 | +# 2026-06-29: created by Ming-Yu Guo |
| 10 | +# |
| 11 | + |
| 12 | +"""Demo: export full disorder-aware CIF data from the CSD Python API. |
| 13 | +
|
| 14 | +This script demonstrates a complete workflow for extracting crystal structures |
| 15 | +from the Cambridge Structural Database (CSD) with atom-site occupancy, |
| 16 | +disorder-group metadata, anisotropic displacement parameters, and bond |
| 17 | +connectivity preserved in a manually assembled CIF file. |
| 18 | +
|
| 19 | +Why this script exists: |
| 20 | + The built-in CSD Python API CIF writers are convenient, but for some |
| 21 | + disordered entries they do not expose a simple path to preserve the full |
| 22 | + occupancy/disorder atom-site metadata needed for downstream enumeration or |
| 23 | + custom reconstruction workflows. |
| 24 | +
|
| 25 | +Pipeline: |
| 26 | + 1. Read an entry from the CSD with ``EntryReader``. |
| 27 | + 2. Access ``crystal.disordered_molecule`` when available. |
| 28 | + 3. Collect atom-site occupancy, disorder assembly/group, ADP, and bonds. |
| 29 | + 4. Write a CIF with explicit ``_atom_site_occupancy`` and disorder tags. |
| 30 | +
|
| 31 | +Examples: |
| 32 | + python demo_csd_disorder_workflow.py --refcodes ABACIR ABABUB --output-dir ./output |
| 33 | +""" |
| 34 | + |
| 35 | +from __future__ import annotations |
| 36 | + |
| 37 | +import argparse |
| 38 | +from pathlib import Path |
| 39 | +from typing import Any |
| 40 | + |
| 41 | + |
| 42 | +LICENSE_REQUIREMENT = "CSD-Materials or another licence tier with CSD Python API database access" |
| 43 | + |
| 44 | + |
| 45 | +def _format_float(value: Any, precision: int = 4) -> str: |
| 46 | + if value is None: |
| 47 | + return "?" |
| 48 | + return f"{float(value):.{precision}f}" |
| 49 | + |
| 50 | + |
| 51 | +def _format_esd(value: Any, esd: Any, precision: int = 4) -> str: |
| 52 | + if value is None: |
| 53 | + return "?" |
| 54 | + base = f"{float(value):.{precision}f}" |
| 55 | + if esd is None: |
| 56 | + return base |
| 57 | + try: |
| 58 | + if float(esd) > 0: |
| 59 | + return f"{base}({esd})" |
| 60 | + except (TypeError, ValueError): |
| 61 | + pass |
| 62 | + return base |
| 63 | + |
| 64 | + |
| 65 | +def export_full_cif_from_csd(refcode: str, output_path: Path) -> dict[str, Any]: |
| 66 | + """Export a CSD structure to CIF with occupancy and disorder metadata.""" |
| 67 | + from ccdc.io import EntryReader |
| 68 | + |
| 69 | + with EntryReader("CSD") as reader: |
| 70 | + entry = reader.entry(refcode) |
| 71 | + crystal = reader.crystal(refcode) |
| 72 | + |
| 73 | + molecule = crystal.disordered_molecule or crystal.molecule |
| 74 | + |
| 75 | + disorder_map: dict[str, tuple[int | None, str | None]] = {} |
| 76 | + disorder = getattr(crystal, "disorder", None) |
| 77 | + if disorder is not None: |
| 78 | + try: |
| 79 | + for assembly in disorder.assemblies: |
| 80 | + assembly_id = str(getattr(assembly, "id", "")) |
| 81 | + for group in assembly.groups: |
| 82 | + group_id = int(getattr(group, "id", 0)) |
| 83 | + for atom in group.atoms: |
| 84 | + disorder_map[str(atom.label)] = (group_id, assembly_id) |
| 85 | + except Exception: |
| 86 | + pass |
| 87 | + |
| 88 | + atom_data: list[dict[str, Any]] = [] |
| 89 | + for atom in molecule.atoms: |
| 90 | + label = str(atom.label) if atom.label else "?" |
| 91 | + symbol = str(atom.atomic_symbol) if atom.atomic_symbol else "?" |
| 92 | + fractional = atom.fractional_coordinates |
| 93 | + occupancy = 1.0 |
| 94 | + try: |
| 95 | + if atom.occupancy is not None: |
| 96 | + occupancy = float(atom.occupancy) |
| 97 | + except Exception: |
| 98 | + pass |
| 99 | + |
| 100 | + displacement_type = "?" |
| 101 | + u_iso = None |
| 102 | + aniso = None |
| 103 | + try: |
| 104 | + displacement = atom.displacement_parameters |
| 105 | + if displacement is not None: |
| 106 | + u_iso = float(displacement.isotropic_equivalent) |
| 107 | + if displacement.type == "Anisotropic": |
| 108 | + displacement_type = "Uani" |
| 109 | + values = displacement.values |
| 110 | + uncertainties = displacement.uncertainties |
| 111 | + aniso = { |
| 112 | + "u11": values[0][0], "u22": values[1][1], "u33": values[2][2], |
| 113 | + "u12": values[0][1], "u13": values[0][2], "u23": values[1][2], |
| 114 | + "e11": uncertainties[0][0], "e22": uncertainties[1][1], "e33": uncertainties[2][2], |
| 115 | + "e12": uncertainties[0][1], "e13": uncertainties[0][2], "e23": uncertainties[1][2], |
| 116 | + } |
| 117 | + else: |
| 118 | + displacement_type = "Uiso" |
| 119 | + except Exception: |
| 120 | + pass |
| 121 | + |
| 122 | + disorder_group, disorder_assembly = disorder_map.get(label, (None, None)) |
| 123 | + atom_data.append({ |
| 124 | + "label": label, |
| 125 | + "symbol": symbol, |
| 126 | + "fx": float(fractional.x) if fractional else None, |
| 127 | + "fy": float(fractional.y) if fractional else None, |
| 128 | + "fz": float(fractional.z) if fractional else None, |
| 129 | + "occupancy": occupancy, |
| 130 | + "u_iso": u_iso, |
| 131 | + "displacement_type": displacement_type, |
| 132 | + "disorder_group": disorder_group, |
| 133 | + "disorder_assembly": disorder_assembly, |
| 134 | + "aniso": aniso, |
| 135 | + }) |
| 136 | + |
| 137 | + bonds: list[tuple[str, str, float, str]] = [] |
| 138 | + try: |
| 139 | + for bond in molecule.bonds: |
| 140 | + atom_1, atom_2 = bond.atoms |
| 141 | + bonds.append((str(atom_1.label), str(atom_2.label), float(bond.length), str(bond.bond_type))) |
| 142 | + except Exception: |
| 143 | + pass |
| 144 | + |
| 145 | + cell_a, cell_b, cell_c = crystal.cell_lengths |
| 146 | + alpha, beta, gamma = crystal.cell_angles |
| 147 | + disorder_present = any(atom["disorder_group"] is not None for atom in atom_data) |
| 148 | + |
| 149 | + lines = [ |
| 150 | + f"data_{refcode}", |
| 151 | + "_audit_creation_method 'CSD Python API disorder workflow demo'", |
| 152 | + f"_chemical_name_common '{(entry.chemical_name or '?').replace(chr(39), chr(39) + chr(39))}'", |
| 153 | + f"_chemical_formula_sum '{entry.formula or '?'}'", |
| 154 | + f"_cell_length_a {_format_float(cell_a)}", |
| 155 | + f"_cell_length_b {_format_float(cell_b)}", |
| 156 | + f"_cell_length_c {_format_float(cell_c)}", |
| 157 | + f"_cell_angle_alpha {_format_float(alpha, 2)}", |
| 158 | + f"_cell_angle_beta {_format_float(beta, 2)}", |
| 159 | + f"_cell_angle_gamma {_format_float(gamma, 2)}", |
| 160 | + f"_cell_volume {_format_float(crystal.cell_volume, 2)}", |
| 161 | + f"_cell_formula_units_Z {int(crystal.z_value) if crystal.z_value else '?'}", |
| 162 | + f"_symmetry_space_group_name_H-M '{crystal.spacegroup_symbol or '?'}'", |
| 163 | + "", |
| 164 | + ] |
| 165 | + |
| 166 | + if crystal.symmetry_operators: |
| 167 | + lines.extend(["loop_", "_symmetry_equiv_pos_as_xyz"]) |
| 168 | + for operator in crystal.symmetry_operators: |
| 169 | + lines.append(f" '{operator}'") |
| 170 | + lines.append("") |
| 171 | + |
| 172 | + lines.extend([ |
| 173 | + "loop_", |
| 174 | + "_atom_site_label", |
| 175 | + "_atom_site_type_symbol", |
| 176 | + "_atom_site_fract_x", |
| 177 | + "_atom_site_fract_y", |
| 178 | + "_atom_site_fract_z", |
| 179 | + "_atom_site_occupancy", |
| 180 | + "_atom_site_U_iso_or_equiv", |
| 181 | + "_atom_site_thermal_displace_type", |
| 182 | + ]) |
| 183 | + if disorder_present: |
| 184 | + lines.extend([ |
| 185 | + "_atom_site_disorder_assembly", |
| 186 | + "_atom_site_disorder_group", |
| 187 | + ]) |
| 188 | + |
| 189 | + for atom in atom_data: |
| 190 | + if atom["fx"] is None: |
| 191 | + continue |
| 192 | + row = [ |
| 193 | + f"{atom['label']:<8s}", |
| 194 | + f"{atom['symbol']:<4s}", |
| 195 | + f"{atom['fx']:12.6f}", |
| 196 | + f"{atom['fy']:12.6f}", |
| 197 | + f"{atom['fz']:12.6f}", |
| 198 | + f"{atom['occupancy']:8.4f}", |
| 199 | + f"{_format_float(atom['u_iso']):>10s}", |
| 200 | + f"{atom['displacement_type']:>5s}", |
| 201 | + ] |
| 202 | + if disorder_present: |
| 203 | + row.append(f"{str(atom['disorder_assembly']) if atom['disorder_assembly'] is not None else '.':>4s}") |
| 204 | + row.append(f"{str(atom['disorder_group']) if atom['disorder_group'] is not None else '.':>4s}") |
| 205 | + lines.append(" " + " ".join(row)) |
| 206 | + lines.append("") |
| 207 | + |
| 208 | + anisotropic_atoms = [atom for atom in atom_data if atom["aniso"]] |
| 209 | + if anisotropic_atoms: |
| 210 | + lines.extend([ |
| 211 | + "loop_", |
| 212 | + "_atom_site_aniso_label", |
| 213 | + "_atom_site_aniso_U_11", |
| 214 | + "_atom_site_aniso_U_22", |
| 215 | + "_atom_site_aniso_U_33", |
| 216 | + "_atom_site_aniso_U_12", |
| 217 | + "_atom_site_aniso_U_13", |
| 218 | + "_atom_site_aniso_U_23", |
| 219 | + ]) |
| 220 | + for atom in anisotropic_atoms: |
| 221 | + u = atom["aniso"] |
| 222 | + lines.append( |
| 223 | + f" {atom['label']:<8s}" |
| 224 | + f" {_format_esd(u['u11'], u['e11']):>12s}" |
| 225 | + f" {_format_esd(u['u22'], u['e22']):>12s}" |
| 226 | + f" {_format_esd(u['u33'], u['e33']):>12s}" |
| 227 | + f" {_format_esd(u['u12'], u['e12']):>12s}" |
| 228 | + f" {_format_esd(u['u13'], u['e13']):>12s}" |
| 229 | + f" {_format_esd(u['u23'], u['e23']):>12s}" |
| 230 | + ) |
| 231 | + lines.append("") |
| 232 | + |
| 233 | + if bonds: |
| 234 | + lines.extend([ |
| 235 | + "loop_", |
| 236 | + "_geom_bond_atom_site_label_1", |
| 237 | + "_geom_bond_atom_site_label_2", |
| 238 | + "_geom_bond_distance", |
| 239 | + "_ccdc_geom_bond_type", |
| 240 | + ]) |
| 241 | + for atom_1, atom_2, distance, bond_type in bonds: |
| 242 | + lines.append(f" {atom_1:<8s} {atom_2:<8s} {distance:8.4f} {bond_type}") |
| 243 | + lines.append("") |
| 244 | + |
| 245 | + lines.append("#END") |
| 246 | + output_path.parent.mkdir(parents=True, exist_ok=True) |
| 247 | + output_path.write_text("\n".join(lines), encoding="utf-8") |
| 248 | + |
| 249 | + return { |
| 250 | + "refcode": refcode, |
| 251 | + "output": str(output_path), |
| 252 | + "n_atoms": len(atom_data), |
| 253 | + "n_partial_occupancy": sum(1 for atom in atom_data if atom["occupancy"] < 0.999), |
| 254 | + "n_bonds": len(bonds), |
| 255 | + } |
| 256 | + |
| 257 | + |
| 258 | +def parse_args() -> argparse.Namespace: |
| 259 | + parser = argparse.ArgumentParser(description=__doc__) |
| 260 | + parser.add_argument("--refcodes", nargs="+", required=True, help="CSD refcodes to export") |
| 261 | + parser.add_argument("--output-dir", default="output", help="Directory for exported CIF files") |
| 262 | + return parser.parse_args() |
| 263 | + |
| 264 | + |
| 265 | +def main() -> None: |
| 266 | + args = parse_args() |
| 267 | + output_dir = Path(args.output_dir) |
| 268 | + |
| 269 | + print(f"Licence requirement: {LICENSE_REQUIREMENT}") |
| 270 | + for refcode in args.refcodes: |
| 271 | + output_path = output_dir / f"{refcode}.cif" |
| 272 | + summary = export_full_cif_from_csd(refcode, output_path) |
| 273 | + print( |
| 274 | + f"Exported {summary['refcode']} -> {summary['output']} " |
| 275 | + f"(atoms={summary['n_atoms']}, partial_occ={summary['n_partial_occupancy']}, bonds={summary['n_bonds']})" |
| 276 | + ) |
| 277 | + |
| 278 | + |
| 279 | +if __name__ == "__main__": |
| 280 | + main() |
0 commit comments