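"""
Module ``algbench.db.nfs_json_list``: a simple JSON-lines database intended
to tolerate parallel writers on a shared (NFS) file system.
"""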

import datetime
import json
import logging
import os
import os.path
import pathlib
import random
import shutil
import socket
import typing
import zipfile
from zipfile import BadZipFile, ZipFile

from .json_serializer import to_json

_log = logging.getLogger("AlgBench")


class NfsJsonList:
    """
    A simple database to dump data (dictionaries) into.
    Should be reasonably threadsafe even for slurm pools with NFS.

    Each instance appends JSON lines to its own ``*.data`` file inside the
    database directory. The file name combines timestamp, hostname, process id
    and a random number, so parallel writers do not share a file.
    ``compress`` moves the ``*.data`` files into ``_compressed.zip``.
    """

    _COMPRESSED_NAME = "_compressed.zip"
    _DATA_SUFFIX = ".data"

    def __init__(
        self, path: typing.Union[str, pathlib.Path], file_split_mb: float = 30
    ):
        """
        :param path: Directory of the database; created if it does not exist.
        :param file_split_mb: Once this instance has written more than this
            many MiB into its current data file, it switches to a new file.
        :raises RuntimeError: If ``path`` exists and is a regular file.
        """
        self.path: typing.Union[str, pathlib.Path] = path
        if not os.path.exists(path):
            # exist_ok tolerates another process creating the directory
            # between the existence check and this call.
            os.makedirs(path, exist_ok=True)
            _log.info(f"Created new database '{path}'.")
        if os.path.isfile(path):
            msg = f"Cannot create database {path} because there exists an equally named file."
            raise RuntimeError(msg)
        self._subfile_path: typing.Union[str, pathlib.Path] = self._get_unique_name()
        self._cache: typing.List = []
        # Characters written to the current data file by this instance. This
        # equals bytes, because json.dumps escapes non-ASCII by default.
        self._filesize: int = 0
        self._file_split_size: float = file_split_mb * 1024 * 1024

    def _get_unique_name(self, _tries=7):
        """
        Generate a unique file name to prevent collisions of parallel processes.

        The process id is part of the name because forked workers inherit the
        parent's ``random`` state and would otherwise draw identical numbers.
        Data files are created lazily on the first flush, so the existence
        check (done inside the database directory) cannot see names that
        other live writers picked but have not written yet.

        :raises RuntimeError: If no unused name is found within ``_tries`` attempts.
        """
        hostname = socket.gethostname()
        pid = os.getpid()
        for _ in range(_tries):
            timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M")
            rand = random.randint(0, 10000)
            name = f"{timestamp}-{hostname}-{pid}-{rand}.data"
            if not os.path.exists(os.path.join(self.path, name)):
                return name
        msg = "Could not generate a unique file name. This is odd."
        raise RuntimeError(msg)

    def _compressed_path(self) -> str:
        """Return the path of the zip archive that ``compress`` writes into."""
        return os.path.join(self.path, self._COMPRESSED_NAME)

    def _data_files(self) -> typing.Iterator[typing.Tuple[str, str]]:
        """Yield ``(file_name, path)`` for every ``*.data`` file in the database directory."""
        for file_name in os.listdir(self.path):
            path = os.path.join(self.path, file_name)
            if os.path.isfile(path) and str(path).endswith(self._DATA_SUFFIX):
                yield file_name, path

    def compress(self, compression=zipfile.ZIP_LZMA, compresslevel=None):
        """
        Move all non-empty ``*.data`` files into ``_compressed.zip``.

        Warning: This may not be threadsafe!
        If you want to extract all data to a single file, just use 'load' and
        dump the output into a single json.

        :param compression: ``zipfile`` compression method for new members.
        :param compresslevel: Passed to ``ZipFile``; see the zipfile docs for
            which methods honor it.
        """
        self.flush()
        compr_path = self._compressed_path()
        with ZipFile(
            compr_path, "a", compression=compression, compresslevel=compresslevel
        ) as z:
            for file_name, path in self._data_files():
                size = os.path.getsize(path)
                if size <= 0:
                    _log.warning(f"Skipping '{path}' due to zero size.")
                    continue
                _log.info(f"Compressing '{file_name}' of size {size}.")
                z.write(path, file_name)
                os.remove(path)
        _log.info(f"Compressed database has size {os.path.getsize(compr_path)}.")
        # Our current data file has just been archived. Reusing its name would
        # add a duplicate archive member on the next compress.
        self._new_data_file()

    def extend(self, entries: typing.List, flush=True):
        """
        Serialize ``entries`` with ``to_json`` and add them to the cache.

        :param flush: Write the cache to disk immediately.
        :return: The serialized entries.
        """
        _log.info(f"Adding {len(entries)} items to database.")
        serialized_data = [to_json(e) for e in entries]
        self._cache += serialized_data
        if flush:
            self.flush()
        return serialized_data

    def append(self, entry, flush=True):
        """Add a single entry; see ``extend``. Returns the serialized entry."""
        return self.extend([entry], flush)[0]

    def flush(self):
        """
        Append all cached entries as JSON lines to the current data file.

        The cache holds entries that ``extend`` has already serialized, so
        they are dumped directly. If a write fails, only the entries that
        reached the disk are removed from the cache, so a retry does not
        duplicate them.

        :raises RuntimeError: If the data file is missing or empty after writing.
        """
        if not self._cache:
            return
        n_entries = len(self._cache)
        n_written = 0
        try:
            for data in self._cache:
                # Recomputed every time: a split may have switched the file.
                path = os.path.join(self.path, self._subfile_path)
                line = json.dumps(data) + "\n"
                with open(path, "a") as f:
                    f.write(line)
                if not os.path.isfile(path):
                    msg = "Could not write to disk for unknown reasons."
                    raise RuntimeError(msg)
                if os.path.getsize(path) <= 0:
                    msg = "Could not write to disk. Resulting file has zero size."
                    raise RuntimeError(msg)
                n_written += 1
                self._filesize += len(line)
                if self._filesize > self._file_split_size:
                    _log.info(
                        f"File {self._subfile_path} exceeds "
                        f"{self._file_split_size / (1024 * 1024)} MB."
                    )
                    self._new_data_file()
        finally:
            del self._cache[:n_written]
        _log.info(f"Wrote {n_entries} entries to disk.")

    def _new_data_file(self):
        """Switch to a freshly named data file and reset its size counter."""
        new_unique_name = self._get_unique_name()
        _log.info(f"Start writing into new file {new_unique_name}.")
        self._subfile_path = new_unique_name
        self._filesize = 0

    def iter_cache(self):
        """Iterate over the serialized entries that have not been flushed yet."""
        yield from self._cache

    def iter_compressed(self):
        """
        Iterate over all entries in the compressed database.

        This may not represent the whole database if the database is not
        completely compressed. Use the ``__iter__`` method to iterate over the
        whole database.
        """
        compr_path = self._compressed_path()
        if not os.path.exists(compr_path):
            return
        with ZipFile(compr_path, "r") as z:
            for info in z.filelist:
                yield from self._iter_compressed_file(z, info, compr_path)

    def _iter_compressed_file(self, zip_file, filename, compr_path):
        """
        Yield the JSON entries of one archive member.

        Unparsable lines are logged and skipped. A member that cannot be read
        (BadZipFile) is logged and skipped entirely.
        """
        try:
            with zip_file.open(filename, "r") as f:
                lines = f.readlines()
        except BadZipFile as e:
            _log.warning(f"Could not open file {filename}. Bad Zip: {e}")
            return
        for line in lines:
            try:
                entry = json.loads(line)
            except ValueError:
                # Just continue. Probably a synchronization thing of the NFS.
                _log.warning(f'Could not load "{line}" in "{compr_path}".')
                continue
            yield entry

    def iter_uncompressed(self):
        """
        Iterate over the entries of all ``*.data`` files that are not compressed yet.

        Each file is read completely before yielding. Entries appended while
        the caller iterates are therefore not picked up again.
        Unparsable lines are logged and skipped.
        """
        for _, path in self._data_files():
            with open(path) as f:
                lines = f.readlines()
            for line in lines:
                try:
                    entry = json.loads(line)
                except ValueError:
                    # Just continue. Probably a synchronization thing of the NFS.
                    _log.warning(f'Could not load "{line}" in "{path}".')
                    continue
                yield entry

    def __iter__(self):
        """Iterate over compressed entries, then uncompressed files, then the cache."""
        yield from self.iter_compressed()
        yield from self.iter_uncompressed()
        yield from self.iter_cache()

    def load(self) -> typing.List:
        """Return all entries (compressed, uncompressed and cached) as a list."""
        return list(self)

    def clear(self):
        """
        Clear database (cache and disk).

        Removes the archive and every ``*.data`` file in the directory. Note
        that remaining data in the cache of other nodes may still be written.
        """
        self._cache.clear()
        compr_path = self._compressed_path()
        if os.path.exists(compr_path):
            os.remove(compr_path)
        for _, path in self._data_files():
            os.remove(path)
        # Our current data file was removed, so it restarts empty.
        self._filesize = 0

    def __del__(self):
        # __init__ may have raised before the cache existed; nothing to flush then.
        if getattr(self, "_cache", None):
            self.flush()

    def delete(self):
        """Discard the cache and remove the entire database directory."""
        self._cache.clear()
        shutil.rmtree(self.path)

    def set_new_directory(self, new_path: str):
        """
        Not thread safe. Does not check the new path and does not move any
        folders on its own. It is expected that this step has already been
        performed.
        """
        _log.info(f"New database path being set to {new_path} (was {self.path}).")
        self.path = new_path