Source code for castep_outputs.tools.md_geom_parser

"""Lazy MD/Geom parser object."""

from __future__ import annotations

from collections.abc import Generator, Iterable
from functools import singledispatchmethod
from pathlib import Path
from typing import overload

from castep_outputs.parsers.md_geom_file_parser import (
    MDGeomTimestepInfo,
    parse_md_geom_frame,
)
from castep_outputs.utilities.filewrapper import Block, FileWrapper
from castep_outputs.utilities.utility import log_factory


[docs] class MDGeomParser: """Lazy MD/Geom parser. Implements iterator and getitem approaches for lazily navigating .md/.geom files. Parameters ---------- md_geom_file : Path or str File to parse. """ def __init__(self, md_geom_file: Path | str) -> None: self._next_frame: int | None self.file = Path(md_geom_file).expanduser() if not self.file.is_file(): raise FileNotFoundError(f"Cannot open file ({self.file.absolute()}).") self._raw_handle = self.file.open() self._handle = FileWrapper(self._raw_handle) self.logger = log_factory(self._handle) for line in self._handle: if "END header" in line: break else: raise ValueError(f'"END header" not in file ({self._handle.name}).') next(self._handle) self._start = self._handle.tell() self._start_line = self._handle.lineno while next(self._handle).strip(): pass self._frame_lines = self._handle.lineno - self._start_line - 1 self._frame_bytes = self._handle.tell() - self._start stat = self.file.stat() len_est = (stat.st_size - self._start) / self._frame_bytes if not len_est.is_integer(): self.logger( """\ Number of frames estimate is non-integral (%d). This may have been caused by manually modifying the file. While iteration should work, extracting particular frames may not. """, len_est, level="warning", ) self._len = int(len_est) self._go_to_frame(0) @property def next_frame(self) -> int | None: """Get index of next frame to be read, or None if at file end.""" return self._next_frame def _get_index(self, frame: int) -> int: """Get index of given frame in bytes. Parameters ---------- frame : int Frame to compute. Returns ------- int Position in bytes into file to get given frame. """ return self._start + (self._frame_bytes * frame) def _go_to_frame(self, frame: int) -> None: """Set file pointer to given index.""" ind = self._get_index(frame) self._handle.file.seek(ind) self._handle._lineno = self._start_line + (frame * self._frame_lines) self._next_frame = frame if frame < len(self) else None
[docs] def get_frame(self, frame: int) -> MDGeomTimestepInfo: """Get particular frame of md/geom. Parameters ---------- frame : int Frame to retrieve. Returns ------- MDGeomTimestepInfo Parsed frame. Raises ------ IndexError Requested frame out of range. """ if frame not in range(-len(self), len(self)): raise IndexError(f"Cannot get {frame}th frame. File only has {len(self)} frames.") if frame < 0: frame = len(self) + frame if frame != self.next_frame: self._go_to_frame(frame) return self.read_next()
[docs] def __len__(self) -> int: """Get number of frames in file. Returns ------- int Number of frames. """ return self._len
[docs] def __iter__(self) -> Generator[MDGeomTimestepInfo, int, None]: """Get generator over all frames in system. Jumps permitted through ``send``. Yields ------ MDGeomTimestepInfo Information about each frame. """ i = 0 while i < len(self): trial = yield self[i] i += 1 if trial is not None: i = trial
[docs] def read_next(self) -> MDGeomTimestepInfo: """Get the next frame. Returns ------- MDGeomTimestepInfo Information about the next frame. Raises ------ StopIteration No next frame. """ if not (block := Block.get_lines(self._handle, self._frame_lines, eof_possible=True)): raise StopIteration self._next_frame = self._next_frame + 1 if self._next_frame < len(self) - 1 else None return parse_md_geom_frame(block)
@overload def __getitem__(self, frame: int) -> MDGeomTimestepInfo: ... @overload def __getitem__(self, frame: Iterable | slice) -> list[MDGeomTimestepInfo]: ...
[docs] @singledispatchmethod def __getitem__(self, frame): """Get particular frame of md/geom. Parameters ---------- frame : int or Iterable or slice Frame(s) to extract. Returns ------- list[MDGeomTimestepInfo] or MDGeomTimestepInfo Requested frames. """ raise NotImplementedError(f"Can't get {frame}th frame.")
@__getitem__.register def _(self, frame: int) -> MDGeomTimestepInfo: return self.get_frame(frame) @__getitem__.register def _(self, frames: Iterable) -> list[MDGeomTimestepInfo]: return [self.get_frame(frame) for frame in frames] @__getitem__.register def _(self, frames: slice) -> list[MDGeomTimestepInfo]: range_ = frames.indices(len(self)) return self[range(*range_)]
[docs] def __del__(self) -> None: """Close file before deletion.""" self._handle.close()
def __str__(self) -> str: return f"""\ File: {self.file} Frames: {self._len} Next frame: {self._next_frame}"""