DeepRF / utils / logger.py
logger.py
Raw
import os
import sys
from pathlib import Path
from datetime import datetime
import traceback

import logging
from subprocess import Popen, PIPE
from termcolor import colored
from tensorboardX import SummaryWriter
from tensorboardX.summary import Summary
import matplotlib.pyplot as plt
from matplotlib.figure import Figure
import numpy as np
from scipy.io import savemat
import torch
from PIL import Image
import csv

from settings import PROJECT_ROOT, LOG_LEVELS, LOG_DIR


def encode_gif(images, fps=30):
    cmd = [
        'ffmpeg', '-y',
        '-f', 'rawvideo',
        '-vcodec', 'rawvideo',
        '-r', '%.02f' % fps,
        '-s', '%dx%d' % (images[0].shape[1], images[0].shape[0]),
        '-pix_fmt', 'rgb24',
        '-i', '-',
        '-filter_complex',
        '[0:v]split[x][z];[z]palettegen[y];[x]fifo[x];[x][y]paletteuse',
        '-r', '%.02f' % fps,
        '-f', 'gif',
        '-'
    ]
    proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
    for image in images:
        proc.stdin.write(image.tostring())
    out, err = proc.communicate()
    if proc.returncode:
        err = '\n'.join([' '.join(cmd), err.decode('utf8')])
        raise IOError(err)
    del proc
    return out


class Logger:
    def __init__(self, name, args=None, log_dir=None):
        self.args = args
        if log_dir is None:
            self.log_dir = os.path.join(
                os.path.join(PROJECT_ROOT, LOG_DIR, self.args.tag),
                datetime.now().strftime("%Y%m%d%H%M%S")
            )
            Path(self.log_dir).mkdir(parents=True, exist_ok=True)
        else:
            self.log_dir = log_dir

        logger = logging.getLogger(name)
        if not logger.handlers:
            format = logging.Formatter(
                "[%(name)s|%(levelname)s] %(asctime)s > %(message)s"
            )
            streamHandler = logging.StreamHandler()
            streamHandler.setFormatter(format)
            logger.addHandler(streamHandler)
            logger.setLevel(args.log_level)

            filename = os.path.join(self.log_dir, name + '.txt')
            fileHandler = logging.FileHandler(filename, mode="w")
            fileHandler.setFormatter(format)
            logger.addHandler(fileHandler)

        self.logger = logger
        self.writer = SummaryWriter(self.log_dir)
        sys.excepthook = self.excepthook

    def log(self, msg, lvl="INFO"):
        lvl, color = self.get_level_color(lvl)
        self.logger.log(lvl, colored(msg, color))

    def add_level(self, name, lvl, color='white'):
        if name not in LOG_LEVELS.keys() and lvl not in LOG_LEVELS.values():
            LOG_LEVELS[name] = {'lvl': lvl, 'color': color}
            logging.addLevelName(lvl, name)
        else:
            raise AssertionError("log level already exists")

    def get_level_color(self, lvl):
        assert isinstance(lvl, str)
        lvl_num = LOG_LEVELS[lvl]['lvl']
        color = LOG_LEVELS[lvl]['color']
        return lvl_num, color

    def excepthook(self, type_, value_, traceback_):
        e = "{}: {}".format(type_.__name__, value_)
        tb = "".join(traceback.format_exception(type_, value_, traceback_))
        self.log(e, "ERROR")
        self.log(tb, "DEBUG")

    def scalar_summary(self, info, step, lvl="INFO", tag='values'):
        assert isinstance(info, dict), "data must be a dictionary"
        # flush to terminal
        if self.args.log_level <= LOG_LEVELS[lvl]['lvl']:
            key2str = {}
            for key, val in info.items():
                if isinstance(val, float):
                    valstr = "%-8.3g" % (val,)
                else:
                    valstr = str(val)
                key2str[self._truncate(key)] = self._truncate(valstr)

            if len(key2str) == 0:
                self.log("empty key-value dict", 'WARNING')
                return

            keywidth = max(map(len, key2str.keys()))
            valwidth = max(map(len, key2str.values()))

            dashes = '  ' + '-'*(keywidth + valwidth + 7)
            lines = [dashes]
            for key, val in key2str.items():
                lines.append('  | %s%s | %s%s |' % (
                    key,
                    ' '*(keywidth - len(key)),
                    val,
                    ' '*(valwidth - len(val))
                ))
            lines.append(dashes)
            print('\n'.join(lines))

        # flush to csv
        if self.log_dir is not None:
            filepath = Path(os.path.join(self.log_dir, tag + '.csv'))
            if not filepath.is_file():
                with open(filepath, 'w') as f:
                    writer = csv.writer(f)
                    writer.writerow(['step'] + list(info.keys()))

            with open(filepath, 'a') as f:
                writer = csv.writer(f)
                writer.writerow([step] + list(info.values()))

        # flush to tensorboard
        if self.writer is not None:
            for k, v in info.items():
                self.writer.add_scalar(k, v, step)

    def image_summary(self, image, step, tag='figure'):
        path = os.path.join(self.log_dir, 'images')
        if '/' in tag:
            path = os.path.join(path, tag[:tag.find('/')])
        if not os.path.exists(path):
            os.makedirs(path)

        path = os.path.join(self.log_dir, 'images')
        path = os.path.join(path, '{}_{}.png'.format(tag, step))

        # save matplotlib figure to image
        if isinstance(image, Figure):
            image.savefig(path)
            plt.close(image)
        # save PIL image
        else:
            image.save(path)

        # flush to tensorboard
        if self.writer is not None:
            image = Image.open(path).convert('RGB')
            pix = np.array(image)
            pix = np.transpose(pix, (2, 0, 1))
            self.writer.add_image(tag, pix, step)

    def video_summary(self, images, step, tag='playback', fps=30):
        path = os.path.join(self.log_dir, 'videos')
        if not os.path.exists(path):
            os.makedirs(path)
        string_encode = encode_gif(images, fps=fps)
        path = os.path.join(path, '{}_{}.gif'.format(tag, step))

        # save gif image
        with open(path, 'wb') as f:
            f.write(string_encode)

        # flush to tensorboard
        if self.writer is not None:
            _, h, w, c = images.shape
            video = Summary.Image(
                height=h,
                width=w,
                colorspace=c,
                encoded_image_string=string_encode
            )
            self.writer._get_file_writer().add_summary(
                Summary(value=[Summary.Value(tag=tag, image=video)]),
                step,
                walltime=None
            )

    def savemat(self, filename, array_dict):
        assert isinstance(array_dict, dict)
        for k, v in array_dict.items():
            if isinstance(v, torch.Tensor):
                v = v.detach().cpu().numpy()
            assert isinstance(v, np.ndarray)
            array_dict[k] = v

        path = os.path.join(self.log_dir, 'arrays')
        if not os.path.exists(path):
            os.makedirs(path)
        path = os.path.join(path, filename)
        if filename[-4:] == '.mat':
            savemat(path, array_dict)
        else:
            savemat(path + '.mat', array_dict)

    def _truncate(self, s):
        return s[:20] + '...' if len(s) > 23 else s