CSC111 / assignments / a3 / a3_simulation.py
a3_simulation.py
Raw
"""CSC111 Winter 2023 Assignment 3: Graphs and Interconnection Networks

Instructions (READ THIS FIRST!)
===============================

This module contains a collection of Python classes and functions that you'll use on
this assignment to run a network simulation. You are responsible for reading the
*docstrings* of this file to understand how to use these classes and functions, but should not
modify anything in this file (other than the optional part at the very end).

This file will not be submitted, and we will supply our own copy for testing purposes.

Note: as is standard for CSC111, we use a leading underscore to indicate private
functions, methods, and instance attributes. You don't have to worry about any of these,
and in fact shouldn't use them in this assignment!

Copyright and Usage Information
===============================

This file is provided solely for the personal and private use of students
taking CSC111 at the University of Toronto St. George campus. All forms of
distribution of this code, whether as given or with any changes, are
expressly prohibited. For more information on copyright for CSC111 materials,
please consult our Course Syllabus.

This file is Copyright (c) 2023 Mario Badr and David Liu.
"""
from __future__ import annotations
from dataclasses import dataclass, field
import heapq
import random
from typing import Optional

from python_ta.contracts import check_contracts

from a3_network import Channel, NodeAddress, AbstractNetwork, Packet
from a3_part1 import AbstractRing, AbstractTorus


###################################################################################################
# The main NetworkSimulation class
###################################################################################################
@check_contracts
class NetworkSimulation:
    """A network simulation that can run a particular simulation configuration on a given network.
    """
    _network: AbstractNetwork
    _current_channels: set[Channel]
    _current_time: int

    def __init__(self, network: AbstractNetwork) -> None:
        """Initialize this simulation with the given network."""
        self._network = network
        self._current_channels = set()
        self._current_time = 0

    def run_with_initial_packets(self, packets: list[tuple[int, Packet]],
                                 print_events: bool = False) -> list[PacketStats]:
        """Run a simulation using the given packet data. Return a list of statistics for each packet.

        Each element of packets is a tuple (timestamp, packet), indicating that the given packet
        should be added to the simulation at the given timestamp.

        Pass print_events=True to print a record of each event when it is processed.

        Preconditions:
        - packets != []
        - all(packet_data[0] >= 0 for packet_data in packets)
        """
        stats = _StatisticsTracker()
        initial_events = [_NewPacketEvent(timestamp, packet) for timestamp, packet in packets]

        events = _EventQueueHeap()
        for event in initial_events:
            events.enqueue(event)

        self._current_channels = set()
        self._current_time = 0

        while not events.is_empty():
            event = events.dequeue()
            if print_events:
                print(event)

            if event.timestamp != self._current_time:
                self._current_channels = set()
                self._current_time = event.timestamp

            new_events = event.handle_event(self._network, stats, self._current_channels)
            for new_event in new_events:
                events.enqueue(new_event)

        return list(stats.packets.values())


@check_contracts
def generate_random_packets(
        network: AbstractNetwork,
        n: int,
        p: float,
        pattern_type: str = 'random') -> list[tuple[int, Packet]]:
    """Randomly generate a list of n packets and timestamps for when each packet should be created.

    Packet sources are chosen uniformly at random from all network nodes.

    If pattern_type == 'random' (the default), the packet destinations are chosen uniformly at random
    (independent of their source). If pattern_type == 'hard', the packet dstinations are chosen
    to maximize the shortest-path distance to the packet's source.

    NOTE: we have only provided code to support the 'random' option. If you'd like to try the 'hard'
    option, you must implement the HardTraffic class at the bottom of this file. Doing so is optional.

    Preconditions:
        - n >= 1
        - 0.0 < p <= 1.0
        - pattern_type in {'random', 'hard'}
    """
    if pattern_type == 'random':
        pattern = _UniformRandomTraffic(network)
    else:  # pattern_type == 'hard'
        pattern = _HardTraffic(network)

    packet_id = 0
    timestamp = 0
    packets = []

    while len(packets) < n:
        for source in network.get_node_addresses():
            if random.uniform(0.0, 1.0) <= p:
                destination = pattern.generate_destination(source)
                packet = Packet(packet_id, source, destination)
                packet_id += 1
                packets.append((timestamp, packet))

                if len(packets) == n:
                    return packets

        timestamp += 1

    return packets


###################################################################################################
# Simulation Event classes
###################################################################################################
class _Event:
    """An abstract class representing an event in our simulation.

    Instance Attributes:
        - timestamp: the start time (in cycles) of the event

    Representation Invariants:
        - self.timestamp >= 0
    """
    timestamp: int

    def __init__(self, timestamp: int) -> None:
        """Initialize this event with the given timestamp.

        Preconditions:
            - timestamp >= 0
        """
        self.timestamp = timestamp

    def handle_event(self, network: AbstractNetwork, stats: _StatisticsTracker,
                     current_channels: set[Channel]) -> list[_Event]:
        """Mutate the given interconnection network to process this event.

        Return a new list of new events created by processing this event.
        Update the given stats to reflect the new event.
        """
        raise NotImplementedError


class _NewPacketEvent(_Event):
    """An event for when a new packet is created in the network."""
    _packet: Packet

    def __init__(self, timestamp: int, packet: Packet) -> None:
        """Initialize this event with the given timestamp and packet.


        Preconditions:
            - timestamp >= 0
        """
        _Event.__init__(self, timestamp)
        self._packet = packet

    def handle_event(self, network: AbstractNetwork, stats: _StatisticsTracker,
                     _current_channels: set[Channel]) -> list[_Event]:
        """Mutate the given network to process this event.

        Return a new list of new events created by processing this event.
        Update the given stats to reflect the new event.

        Preconditions:
        - self._packet.source in network._nodes
        - self._packet.destination in network._nodes
        """
        stats.update(self.timestamp, self._packet.source, self._packet)

        next_channel = network.add_new_packet(self._packet)

        # If the packet is the occupant, make sure the channel gets activated at the next cycle.
        # If the packet was added to the buffer, no need to trigger a new event---an ActivateChannelEvent
        # for the channel must already have been enqueued.
        if next_channel.occupant is self._packet:
            return [_ActivateChannelEvent(self.timestamp + 1, next_channel)]
        else:
            return []

    def __str__(self) -> str:
        """Return a string representation of this event."""
        return f'[{self.timestamp}] New packet {self._packet.identifier}: '\
               f'{self._packet.source} -> {self._packet.destination}'


class _ActivateChannelEvent(_Event):
    """An event for when a packet traverses a channel in the network."""
    _channel: Channel
    _packet: Packet

    def __init__(self, timestamp: int, channel: Channel) -> None:
        """Initialize this event with the given timestamp and channel being traversed.

        Preconditions:
            - timestamp >= 0
        """
        _Event.__init__(self, timestamp)
        self._channel = channel
        self._packet = channel.occupant

    def handle_event(self, network: AbstractNetwork, stats: _StatisticsTracker,
                     current_channels: set[Channel]) -> list[_Event]:
        """Mutate the given network to process this event.

        Return a new list of new events created by processing this event.
        Update the given stats to reflect the new event.
        """
        # Skip this event if the channel has already been activated in this timestep
        if self._channel in current_channels:
            return []
        else:
            current_channels.add(self._channel)

        stats.update(self.timestamp, self._packet.next_stop.address, self._packet)

        new_channel = network.activate_channel(self._channel)
        new_events = []

        # If the channel has a new occupant, it should be activated in the next cycle.
        if self._channel.occupant is not None:
            new_events.append(_ActivateChannelEvent(self.timestamp + 1, self._channel))

        # If the old occupant has moved into a new channel as that channel's occupant,
        # the new channel should also be activated in the next cycle.
        if new_channel is not None and new_channel.occupant is self._packet:
            new_events.append(_ActivateChannelEvent(self.timestamp + 1, new_channel))

        return new_events

    def __str__(self) -> str:
        """Return a string representation of this event."""
        # Note: This method must be called *before* the handle_event method. (Because self._packet.next_stop
        # is modified after the packet moves to a different channel.)
        ends = {node.address for node in self._channel.endpoints}
        return f'[{self.timestamp}] Channel {ends} activated. Packet {self._packet.identifier} '\
               f'moved to {self._packet.next_stop.address}'


class _EventQueueHeap:
    """A heap of events that can be dequeued in cycle order."""
    # Private Instance Attributes:
    #   - _events: a heap of events
    #   - _entry_count: a non-decreasing counter storing all events ever
    #                   enqueued (used to break ties for heapq)
    _events: list[tuple[int, int, _Event]]
    _entry_count: int

    def __init__(self) -> None:
        """Initialize a new and empty event queue."""
        self._events = []
        self._entry_count = 0

    def is_empty(self) -> bool:
        """Return whether this event queue contains no items."""
        return self._events == []

    def enqueue(self, event: _Event) -> None:
        """Add an event to this event queue."""
        heapq.heappush(self._events, (event.timestamp, self._entry_count, event))
        self._entry_count += 1

    def dequeue(self) -> _Event:
        """Remove and return the earliest event in this event queue.

        Preconditions:
        - not self.is_empty()
        """
        _, _, event = heapq.heappop(self._events)
        return event


###############################################################################
# Classes for tracking packet statistics
###############################################################################
class _StatisticsTracker:
    """A class to keep track of packet statistics."""
    packets: dict[int, PacketStats]

    def __init__(self) -> None:
        """Initialize this object."""
        self.packets = {}

    def update(self, timestamp: int, node: NodeAddress, packet: Packet) -> None:
        """Update the statistics based on the arrival of packet at the given node and timestamp."""
        if packet.identifier not in self.packets:
            self.packets[packet.identifier] = PacketStats(identifier=packet.identifier, created_at=timestamp)

        self.packets[packet.identifier].route.append(node)

        if node == packet.destination:
            self.packets[packet.identifier].arrived_at = timestamp


@dataclass
class PacketStats:
    """A data class to track some statistics for a single packet during a simulation.

    Instance Attributes:
        - identifier: The packet identifier.
        - created_at: The timestamp when the packet was created.
        - arrived_at: The timestamp when the packet arrived at its destination,
                      or None if the packet has not yet arrived.
        - route: A list of the NodeAddresses the packet has visited.
    """
    identifier: int
    created_at: int
    arrived_at: Optional[int] = None
    # The following sets the default to be an empty list. Understanding dataclasses.field
    # is beyond the scope of this course.
    route: list[NodeAddress] = field(default_factory=list)


###################################################################################################
# Classes for generating traffic patterns
###################################################################################################
class _TrafficPattern:
    """An abstract class representing an algorithm for generating traffic patterns in a network simulation.
    """
    def generate(self) -> tuple[NodeAddress, NodeAddress]:
        """Return a tuple of two node addresses where the first is the source and the second is the destination.
        """
        raise NotImplementedError

    def generate_destination(self, source: NodeAddress) -> NodeAddress:
        """Return a destination address for the given source address.
        """
        raise NotImplementedError


class _UniformRandomTraffic(_TrafficPattern):
    """A traffic pattern that selects packet source/destination locations uniformly at random across the network.
    """
    _node_ids: list[NodeAddress]

    def __init__(self, network: AbstractNetwork) -> None:
        """Initialize this traffic generator with the given network.
        """
        self._node_ids = list(network.get_node_addresses())

    def generate(self) -> tuple[NodeAddress, NodeAddress]:
        """Return a tuple of two node addresses where the first is the source and the second is the destination.
        """
        return random.sample(self._node_ids, 2)

    def generate_destination(self, source: NodeAddress) -> NodeAddress:
        """Return a destination address for the given source address.
        """
        choices = [address for address in self._node_ids if address != source]
        destination = random.choice(choices)

        return destination


class _HardTraffic(_TrafficPattern):
    """A class that tries to select source/destination pairs that maximize the shortest-path
    distance between them.

    It is OPTIONAL for you to implement this class.
    """
    _node_ids: list[NodeAddress]
    _network: AbstractNetwork

    def __init__(self, network: AbstractNetwork) -> None:
        """Initialize this traffic generator with the given network.
        """
        self._node_ids = list(network.get_node_addresses())
        self._network = network

    def generate(self) -> tuple[NodeAddress, NodeAddress]:
        """Return a tuple of two node addresses where the first is the source and the second is the destination.
        """
        source = random.choice(self._node_ids)
        return source, self.generate_destination(source)

    def generate_destination(self, source: NodeAddress) -> NodeAddress:
        """Return a destination address for the given source address.

        It is OPTIONAL to implement this method.
        If you'd like to attempt it, we've provided some structure and hints in the body below.
        Note that using an if statement to distinguish between types of graph topology is not great design
        (better to add a new abstract method to AbstractNetwork). We haven't done so for this assignment
        because this part is optional.
        """
        if isinstance(self._network, AbstractRing):
            # Pick the destination that's roughly (radix // 2) away from source
            destination = ...
        elif isinstance(self._network, AbstractTorus):
            # Pick the destination that's roughly (radix // 2) away from source in both dimensions
            destination = ...
        else:  # isinstance(self._network, AbstractStar)
            # Pick an outer node (that's not the same as the source node)
            destination = ...

        return destination


if __name__ == '__main__':
    import python_ta
    python_ta.check_all(config={
        'max-line-length': 120,
        'extra-imports': ['heapq', 'random', 'a3_network', 'a3_part1'],
        'allowed-io': ['NetworkSimulation.run_with_initial_packets'],
        'max-nested-blocks': 4
    })