import atexit
import sys
import time
import warnings
from io import StringIO
try:
import curses # pytype: disable=import-error
except ImportError:
# Allow fuzz_logger_curses to be imported on Windows -- will fail if you try to use it.
warnings.warn("Importing curses failed. Optional console GUI features will not be available.", UserWarning)
curses = None
import signal
import threading
from math import *
from shutil import get_terminal_size
from . import helpers
from . import ifuzz_logger_backend
COLOR_PAIR_WHITE = 1
COLOR_PAIR_CYAN = 2
COLOR_PAIR_RED = 3
COLOR_PAIR_YELLOW = 4
COLOR_PAIR_GREEN = 5
COLOR_PAIR_MAGENTA = 6
COLOR_PAIR_BLACK = 7
STATUS_RUNNING = 0
STATUS_PAUSED = 1
STATUS_DONE = 2
[docs]
class FuzzLoggerCurses(ifuzz_logger_backend.IFuzzLoggerBackend):
"""
This class formats FuzzLogger data for a console GUI using curses. This hasn't been tested on Windows.
"""
INDENT_SIZE = 2
def __init__(
self,
web_port=26000,
web_address="localhost",
window_height=40,
window_width=130,
auto_scroll=True,
max_log_lines=500,
wait_on_quit=True,
min_refresh_rate=1000,
bytes_to_str=helpers.hex_to_hexstr,
):
"""
:type web_port: int
:param web_port: Webinterface port. Default 26000
:type window_height: int
:param window_height: Default console height, set to on startup. Default 40
:type window_width: int
:param window_width: Default console width, set to on startup. Default 130
:type auto_scroll: bool
:param auto_scroll: Whether to auto-scroll the cases and crashed windows to always display the last line if
there are too many lines to display all of them. Default True
:type max_log_lines: int
:param max_log_lines: Maximum log lines to keep in the internal storage. Additional lines exceeding this limit
will not be displayed. Default 500
:type wait_on_quit: bool
:param wait_on_quit: Whether to keep the GUI open and wait for user-input when the main thread is about to exit.
Default True
:type min_refresh_rate: int
:param min_refresh_rate: The delay between two checks for a resize of the terminal in milliseconds.
Increment 100 ms. Default 1000 ms
:type bytes_to_str: function
:param bytes_to_str: Function that converts sent/received bytes data to string for logging.
"""
self._title = "boofuzz"
self._web_port = web_port
self._web_address = web_address
self._max_log_lines = max_log_lines
self._auto_scroll = auto_scroll
self._current_data = None
self._log_storage = []
self._fail_storage = []
self._wait_on_quit = wait_on_quit
self._quit = False
self._status = STATUS_RUNNING
self._refresh_interval = min_refresh_rate
self._event_resize = True
self._event_log = False
self._event_case_close = False
self._event_crash = False
self._total_index = 0
self._total_num_mutations = 0
self._current_name = ""
self._current_index = 0
self._current_num_mutations = 0
self._format_raw_bytes = bytes_to_str
self._version = helpers.get_boofuzz_version()
# Resize console to minimum size
self._width, self._height = get_terminal_size()
if self._height < window_height or self._width < window_width:
print("\x1b[8;{};{}t".format(window_height, window_width))
self._height, self._width = window_height, window_width
self._height_old = 0
self._width_old = 0
self._min_size_ok = True
sys.stdout = sys.stderr = self._std_buffer = StringIO()
atexit.register(self._cleanup)
self._stdscr = curses.initscr()
curses.start_color()
curses.use_default_colors()
curses.noecho()
curses.curs_set(0)
self._stdscr.nodelay(True)
# Curses color pairs
curses.init_pair(COLOR_PAIR_WHITE, curses.COLOR_WHITE, -1)
curses.init_pair(COLOR_PAIR_CYAN, curses.COLOR_CYAN, -1)
curses.init_pair(COLOR_PAIR_RED, curses.COLOR_RED, -1)
curses.init_pair(COLOR_PAIR_YELLOW, curses.COLOR_YELLOW, -1)
curses.init_pair(COLOR_PAIR_GREEN, curses.COLOR_GREEN, -1)
curses.init_pair(COLOR_PAIR_MAGENTA, curses.COLOR_MAGENTA, -1)
curses.init_pair(COLOR_PAIR_BLACK, curses.COLOR_BLACK, curses.COLOR_WHITE)
# Start thread and restore the original SIGWINCH handler
self._draw_thread = threading.Thread(name="curses_logger", target=self._draw_screen)
self._draw_thread.setDaemon(True)
current_signal_handler = signal.getsignal(signal.SIGWINCH)
self._draw_thread.start()
signal.signal(signal.SIGWINCH, current_signal_handler)
def _cleanup(self):
self._wait_on_quit = False
self.close_test()
sys.stderr = sys.__stderr__
sys.stdout = sys.__stdout__
print(self._std_buffer.getvalue())
self._std_buffer.close()
[docs]
def open_test_case(self, test_case_id, name, index, *args, **kwargs):
self._log_storage = []
self._current_name = name
self._total_index = index
if "current_index" in kwargs:
self._current_index = kwargs["current_index"]
if "current_num_mutations" in kwargs:
self._current_num_mutations = kwargs["current_num_mutations"]
if "num_mutations" in kwargs:
self._total_num_mutations = kwargs["num_mutations"]
self._log_storage.append(
helpers.format_log_msg(msg_type="test_case", description=test_case_id, format_type="curses")
)
self._event_log = True
[docs]
def open_test_step(self, description):
self._log_storage.append(helpers.format_log_msg(msg_type="step", description=description, format_type="curses"))
self._event_log = True
[docs]
def log_info(self, description):
self._log_storage.append(helpers.format_log_msg(msg_type="info", description=description, format_type="curses"))
self._event_log = True
[docs]
def log_check(self, description):
self._log_storage.append(
helpers.format_log_msg(msg_type="check", description=description, format_type="curses")
)
self._event_log = True
[docs]
def log_pass(self, description=""):
self._log_storage.append(helpers.format_log_msg(msg_type="pass", description=description, format_type="curses"))
self._event_log = True
[docs]
def log_fail(self, description="", indent_size=INDENT_SIZE):
# TODO: Why do some fail messages have a trailing whitespace?
fail_msg = (
"#"
+ str(self._total_index)
+ (4 * indent_size + 1 - len(str(self._total_index))) * " "
+ description.strip()
)
self._fail_storage.append([fail_msg.replace("\n", " "), COLOR_PAIR_WHITE])
self._log_storage.append(helpers.format_log_msg(msg_type="fail", description=description, format_type="curses"))
self._event_crash = True
self._event_log = True
[docs]
def log_error(self, description="", indent_size=INDENT_SIZE):
fail_msg = (
"#"
+ str(self._total_index)
+ (4 * indent_size + 1 - len(str(self._total_index))) * " "
+ description.strip()
)
self._fail_storage.append([fail_msg.replace("\n", " "), COLOR_PAIR_RED])
self._log_storage.append(
helpers.format_log_msg(msg_type="error", description=description, format_type="curses")
)
self._event_crash = True
self._event_log = True
[docs]
def log_recv(self, data):
self._log_storage.append(helpers.format_log_msg(msg_type="receive", data=data, format_type="curses"))
self._event_log = True
[docs]
def log_send(self, data):
self._log_storage.append(helpers.format_log_msg(msg_type="send", data=data, format_type="curses"))
self._event_log = True
[docs]
def close_test_case(self):
self._event_case_close = True
[docs]
def close_test(self):
self._status = STATUS_DONE
self._quit = True
try:
if self._wait_on_quit:
self._draw_thread.join()
else:
self._draw_thread.join(max(0.2, self._refresh_interval * 3))
except KeyboardInterrupt:
pass
finally:
self._wait_on_quit = False
curses.nocbreak()
self._stdscr.keypad(False)
curses.echo()
curses.endwin()
def _draw_main(self, force=False):
self._width, self._height = get_terminal_size()
if not force and self._width == self._width_old and self._height == self._height_old:
return
self._height_old = self._height
self._width_old = self._width
curses.resizeterm(self._height, self._width)
self._stdscr.erase()
if self._height < 40 or self._width < 130:
self._min_size_ok = False
return
else:
self._min_size_ok = True
# Render title
self._stdscr.addstr(0, 0, "=" * self._width)
start_x_title = int((self._width // 2) - (len(self._title) // 2) - len(self._title) % 2)
self._stdscr.addstr(0, start_x_title, self._title, curses.color_pair(COLOR_PAIR_CYAN) | curses.A_BOLD)
# Render status bar
self._stdscr.attron(curses.color_pair(COLOR_PAIR_BLACK))
self._stdscr.addstr(self._height - 1, 0, " " * (self._width - 1))
self._stdscr.insch(" ") # Fill bottom right corner
if self._quit:
self._stdscr.addstr(
self._height - 1, 1, "Press 'q' to quit", curses.color_pair(COLOR_PAIR_BLACK) | curses.A_BLINK
)
else:
self._stdscr.addstr(self._height - 1, 1, "Press 'CTRL+C' to abort")
self._stdscr.addstr(self._height - 1, self._width - len(self._version) - 1, self._version)
self._stdscr.attroff(curses.color_pair(COLOR_PAIR_BLACK))
self._stdscr.refresh()
# Initialise test case window
self._casescr_frame = curses.newpad(self._max_log_lines + 1, self._width)
self._casescr_frame.nodelay(True)
self._casescr_frame.border()
self._casescr_frame.addstr(0, 1, "Test case log", curses.color_pair(COLOR_PAIR_YELLOW) | curses.A_BOLD)
self._casescr_frame.refresh(0, 0, 1, 0, self._height - 18, self._width)
self._casescr = self._casescr_frame.subpad(self._max_log_lines, self._width - 2, 1, 1)
self._draw_case()
# Initialise crash window
self._crashescr_frame = curses.newpad(self._max_log_lines + 1, self._width)
self._crashescr_frame.nodelay(True)
self._crashescr_frame.border()
self._crashescr_frame.addstr(0, 1, "Crashes", curses.color_pair(COLOR_PAIR_RED) | curses.A_BOLD)
self._crashescr_frame.refresh(0, 0, self._height - 17, 0, self._height - 8, self._width)
self._crashescr = self._crashescr_frame.subpad(self._max_log_lines, self._width - 2, 1, 1)
self._draw_crash()
# Initialise status window
self._statscr = curses.newwin(6, self._width, self._height - 7, 0)
self._statscr.nodelay(True)
self._statscr.border()
self._statscr.addstr(0, 1, "Status", curses.color_pair(COLOR_PAIR_CYAN) | curses.A_BOLD)
self._draw_stat()
def _draw_case(self, indent_size=INDENT_SIZE):
# Test Case Screen
# TODO: Handle longer indent for multi-line 'fail' messages
self._casescr.erase()
total_indent_size = indent_size * 2 + 1 + 25
_render_pad(
lines=self._log_storage[: self._max_log_lines],
pad=self._casescr,
y_min=2,
x_min=1,
y_max=self._height - 18,
x_max=self._width - 1,
max_lines=self._max_log_lines,
total_indent_size=total_indent_size,
auto_scroll=self._auto_scroll,
)
def _draw_crash(self, indent_size=INDENT_SIZE):
# Crashes Screen
total_indent_size = indent_size * 5
_render_pad(
lines=self._fail_storage[: self._max_log_lines],
pad=self._crashescr,
y_min=self._height - 16,
x_min=1,
y_max=self._height - 8,
x_max=self._width - 1,
max_lines=self._max_log_lines,
total_indent_size=total_indent_size,
auto_scroll=self._auto_scroll,
truncate_long_lines=True,
)
def _draw_stat(self):
# Status Screen
self._indent_size = 16
self._statscr.addstr(1, 1, "Webinterface:")
self._statscr.addstr(1, self._indent_size, "{}:{}".format(self._web_address, self._web_port))
self._statscr.addstr(2, 1, "Case:")
# fmt: off
self._statscr.addstr(2, self._indent_size, _progess_bar(self._current_index,
self._current_num_mutations,
self._width - self._indent_size))
self._statscr.addstr(3, 1, "Total:")
self._statscr.addstr(3, self._indent_size, _progess_bar(self._total_index,
self._total_num_mutations,
self._width - self._indent_size))
# fmt: on
# TODO: Get paused flag from sessions
if self._status == STATUS_RUNNING:
self._statscr.addstr(4, 1, "Status:")
self._statscr.addstr(4, self._indent_size, "Running", curses.color_pair(COLOR_PAIR_YELLOW))
elif self._status == STATUS_PAUSED:
self._statscr.addstr(4, 1, "Status:")
self._statscr.addstr(4, self._indent_size, "Paused ", curses.color_pair(COLOR_PAIR_RED) | curses.A_BLINK)
elif self._status == STATUS_DONE:
self._statscr.addstr(4, 1, "Status:")
self._statscr.addstr(4, self._indent_size, "Done ", curses.color_pair(COLOR_PAIR_GREEN))
self._statscr.refresh()
def _draw_screen(self):
error_counter = 0
ms_since_refresh = 0
key = 0
wait_for_key = False
try:
while not ((key == ord("q") or not self._wait_on_quit) and self._quit):
try:
if self._event_resize or ms_since_refresh >= self._refresh_interval:
self._draw_main()
ms_since_refresh = 0
self._event_resize = False
if self._quit and not wait_for_key:
self._draw_main(force=True)
wait_for_key = True
if self._min_size_ok:
if self._event_log:
self._draw_case()
self._event_log = False
if self._event_crash:
self._draw_crash()
self._event_crash = False
if self._event_case_close:
self._draw_stat()
self._event_case_close = False
key = self._stdscr.getch()
curses.flushinp()
time.sleep(0.1)
ms_since_refresh += 100
error_counter = 0
except curses.error:
error_counter += 1
if error_counter > 2:
raise
finally:
curses.nocbreak()
self._stdscr.keypad(False)
curses.echo()
curses.endwin()
def _progess_bar(current, total, width):
try:
percent = current / total
except (ZeroDivisionError, TypeError):
percent = 0
total = 0
title_str = "{:7d} of {:7d} ".format(current, total)
percent_str = " {:7.3f}%".format(percent * 100)
bar_len = width - 4 - len(title_str) - len(percent_str)
num_bars = int(round(percent * bar_len))
bar_str = "[" + "=" * num_bars + " " * (bar_len - num_bars) + "]"
return title_str + bar_str + percent_str
def _render_pad(
lines, pad, y_min, x_min, y_max, x_max, max_lines, total_indent_size, auto_scroll, truncate_long_lines=False
):
total_rows = 0
height = y_max - y_min + 1
width = x_max - x_min
for i in range(len(lines)):
if total_rows < max_lines - 1:
pad.addnstr(total_rows, 0, lines[i][0], width, curses.color_pair(lines[i][1]))
total_rows += 1
else:
pad.addstr(
total_rows,
0,
"Maximum number of lines reached for this window! Increase 'max_log_lines'",
curses.color_pair(COLOR_PAIR_RED),
)
total_rows += 1
break
if not truncate_long_lines:
columns = width - total_indent_size
rows = int(ceil(len(lines[i][0][width:]) / columns))
if rows >= 1:
for row in range(1, rows + 1):
if total_rows < max_lines - 1:
pad.addstr(
total_rows,
total_indent_size,
lines[i][0][width:][(row * columns) - columns : row * columns],
curses.color_pair(lines[i][1]),
)
total_rows += 1
else:
break
if total_rows > height and auto_scroll:
offset = total_rows - height
pad.refresh(offset, 0, y_min, x_min, y_max, x_max)
else:
pad.refresh(0, 0, y_min, x_min, y_max, x_max)