sjc / my /utils /event.py
amankishore's picture
Updated app.py
7a11626
raw
history blame
4.31 kB
# design inspiration from detectron2
from pathlib import Path
import json
import os
from contextlib import contextmanager
from .ticker import IntervalTicker
_CURRENT_STORAGE_STACK = []
def get_event_storage():
"""
Returns:
The :class:`EventStorage` object that's currently being used.
Throws an error if no :class:`EventStorage` is currently enabled.
"""
assert len(
_CURRENT_STORAGE_STACK
), "get_event_storage() has to be called inside a 'with EventStorage(...)' context!"
return _CURRENT_STORAGE_STACK[-1]
def read_lined_json(fname):
with Path(fname).open('r') as f:
for line in f:
item = json.loads(line)
yield item
def read_stats(dirname, key):
if dirname is None or not (fname := Path(dirname) / "history.json").is_file():
return [], []
stats = read_lined_json(fname)
stats = list(filter(lambda x: key in x, stats))
xs = [e['iter'] for e in stats]
ys = [e[key] for e in stats]
return xs, ys
class EventStorage():
def __init__(self, output_dir="./", start_iter=0, flush_period=60):
self.iter = start_iter
self.ticker = IntervalTicker(flush_period)
self.history = []
self._current_prefix = ""
self._init_curr_buffer_()
self.output_dir = output_dir
self.writable = False
def _open(self):
if self.writable:
output_dir = Path(self.output_dir)
if not output_dir.is_dir():
output_dir.mkdir(parents=True, exist_ok=True)
json_fname = output_dir / 'history.json'
self._file_handle = json_fname.open('a', encoding='utf8')
self.output_dir = output_dir # make sure it's a path object
def _init_curr_buffer_(self):
self.curr_buffer = {'iter': self.iter}
def step(self, flush=False):
self.history.append(self.curr_buffer)
on_flush_period = self.ticker.tick()
if flush or on_flush_period:
self.flush_history()
self.iter += 1
self._init_curr_buffer_()
def flush_history(self):
if self.writable:
for item in self.history:
line = json.dumps(item, sort_keys=True, ensure_ascii=False) + "\n"
self._file_handle.write(line)
self._file_handle.flush()
self.history = []
def full_key(self, key):
assert isinstance(key, str)
name = self._current_prefix + key
return name
def put(self, key, val):
key = self.full_key(key)
assert isinstance(val, (int, float, str))
if isinstance(val, float):
val = round(val, 3)
self.curr_buffer[key] = val
def put_scalars(self, **kwargs):
for k, v in kwargs.items():
self.put(k, v)
def put_artifact(self, key, ext, save_func):
if not self.writable:
return
os.makedirs(self.output_dir / key, exist_ok=True)
fname = (self.output_dir / key / f"step_{self.iter}").with_suffix(ext)
fname = str(fname)
# must be called inside so that
# 1. the func is not executed if the metric is not writable
# 2. the key is only inserted if the func succeeds
save_func(fname)
self.put(key, fname)
return fname
def close(self):
self.flush_history()
if self.writable:
self._file_handle.close()
def get_last(self):
if len(self.history) > 0:
last = self.history[-1]
return last
def __enter__(self):
if len(_CURRENT_STORAGE_STACK) > 0:
parent = _CURRENT_STORAGE_STACK[-1]
root, dirname = parent.output_dir, self.output_dir
if root is not None and dirname is not None:
child_dir = parent.output_dir / f"{self.output_dir}_{parent.iter}"
self.output_dir = child_dir
parent.put(str(dirname), str(child_dir))
if self.output_dir is not None:
self.writable = True
self._open()
_CURRENT_STORAGE_STACK.append(self)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
assert _CURRENT_STORAGE_STACK[-1] == self
_CURRENT_STORAGE_STACK.pop()
self.close()