Source code for boofuzz.connections.serial_connection

import time

from boofuzz.connections import itarget_connection, serial_connection_low_level


[docs] class SerialConnection(itarget_connection.ITargetConnection): """ ITargetConnection implementation for generic serial ports. Since serial ports provide no default functionality for separating messages/packets, this class provides several means: * timeout: Return received bytes after timeout seconds. * msg_separator_time: Return received bytes after the wire is silent for a given time. This is useful, e.g., for terminal protocols without a machine-readable delimiter. A response may take a long time to send its information, and you know the message is done when data stops coming. * content_check: A user-defined function takes the data received so far and checks for a packet. The function should return 0 if the packet isn't finished yet, or n if a valid message of n bytes has been received. Remaining bytes are stored for next call to recv(). Example: :: def content_check_newline(data): if data.find('\\n') >= 0: return data.find('\\n') else: return 0 If none of these methods are used, your connection may hang forever. .. versionchanged:: 0.2.0 SerialConnection has been moved into the connections subpackage. The full path is now boofuzz.connections.serial_connection.SerialConnection Args: port (Union[int, str]): Serial port name or number. baudrate (int): Baud rate for port. timeout (float): For recv(). After timeout seconds from receive start, recv() will return all received data, if any. message_separator_time (float): After message_separator_time seconds *without receiving any more data*, recv() will return. Optional. Default None. content_checker (function(str) -> int): User-defined function. recv() will pass all bytes received so far to this method. If the method returns n > 0, recv() will return n bytes. If it returns 0, recv() will keep on reading. """ def __init__(self, port=0, baudrate=9600, timeout=5, message_separator_time=0.300, content_checker=None): self._connection = serial_connection_low_level.SerialConnectionLowLevel(port=port, baudrate=baudrate) self._port = port self._baudrate = baudrate self.timeout = timeout self.message_separator_time = message_separator_time self.content_checker = content_checker self._leftover_bytes = b""
[docs] def close(self): """ Close connection to the target. :return: None """ self._connection.close()
[docs] def open(self): """ Opens connection to the target. Make sure to call close! :return: None """ self._connection.open()
[docs] def recv(self, max_bytes): """ Receive up to max_bytes data from the target. Args: max_bytes (int): Maximum number of bytes to receive. Returns: Received data. """ self._connection.timeout = min(0.001, self.message_separator_time, self.timeout) start_time = last_byte_time = time.time() data = self._leftover_bytes self._leftover_bytes = b"" while len(data) < max_bytes: # Update timer for message_separator_time if len(data) > 0: last_byte_time = time.time() # Try recv again fragment = self._connection.recv(max_bytes=max_bytes - len(data)) data += fragment # User-supplied content_checker function if self.content_checker is not None: num_valid_bytes = self.content_checker(data) if num_valid_bytes > 0: self._leftover_bytes = data[num_valid_bytes:] return data[0:num_valid_bytes] # Check timeout and message_separator_time cur_time = time.time() if self.timeout is not None and cur_time - start_time >= self.timeout: return data if self.message_separator_time is not None and cur_time - last_byte_time >= self.message_separator_time: return data return data
[docs] def send(self, data): """ Send data to the target. Only valid after calling open! Args: data: Data to send. Returns: int: Number of bytes actually sent. """ bytes_sent = 0 while bytes_sent < len(data): bytes_sent_this_round = self._connection.send(data[bytes_sent:]) if bytes_sent_this_round is not None: bytes_sent += bytes_sent_this_round return bytes_sent
@property def info(self): return "port: {port}, baudrate: {baudrate}".format(port=self._port, baudrate=self._baudrate)