CSC111 / assignments / a3 / a3_part4.py
a3_part4.py
Raw
"""CSC111 Winter 2023 Assignment 3: Graphs and Interconnection Networks

Instructions (READ THIS FIRST!)
===============================

This Python module contains the start of functions and/or classes you'll define
for Part 4 of this assignment. You may, but are not required to, add doctest
examples to help test your work. We strongly encourage you to do so!

Copyright and Usage Information
===============================

This file is provided solely for the personal and private use of students
taking CSC111 at the University of Toronto St. George campus. All forms of
distribution of this code, whether as given or with any changes, are
expressly prohibited. For more information on copyright for CSC111 materials,
please consult our Course Syllabus.

This file is Copyright (c) 2023 Mario Badr and David Liu.
"""
import random
from typing import Optional

from python_ta.contracts import check_contracts

# NOTE: Node and NodeAddress must be imported for check_contracts
# to work correctly, even if they aren't being used directly in this
# module. Don't remove them (even if you get a warning about them in PyCharm)!
from a3_network import Channel, NodeAddress, Node, Packet
from a3_part1 import AbstractRing, AbstractTorus, AbstractStar


@check_contracts
class GreedyChannelRing(AbstractRing):
    """An implementation of the Greedy-Channel Ring Network.
    """
    def route_packet(self, current_address: NodeAddress, packet: Packet) -> Optional[Channel]:
        """Return the channel that the packet should traverse next, given that it has
        just arrived at the node with address current_address.

        That is, the returned channel has the node corresponding to current_address as
        one of its endpoints. Ideally, but not necessarily, traversing the returned channel
        helps get the given packet closer to its destination.

        Return None if the current_address is equal to the packet's destination!

        Preconditions:
        - current_address in self._nodes
        - packet.source in self._nodes
        - packet.destination in self._nodes
        """
        if current_address == packet.destination:
            return None

        channels = []

        for neighbour_channel in self._nodes[current_address].channels.values():
            neighbour = neighbour_channel.get_other_endpoint(self._nodes[current_address])
            d = self.get_distance(neighbour.address, packet.destination)
            channels.append((d, neighbour_channel))

        return greedy_channel_select(channels)

    def get_distance(self, n1: NodeAddress, n2: NodeAddress) -> int:
        """Return the shortest path distance between the two given node (addresses).

        Remember that path distance is measured as the number of channels/edges, not nodes.
        When n1 == n2, the shortest path distance is 0.

        Preconditions:
            - n1 in self._nodes
            - n2 in self._nodes
        """
        right_path = (n2 - n1) % len(self._nodes)
        left_path = (n1 - n2) % len(self._nodes)

        return min(right_path, left_path)


@check_contracts
class GreedyChannelTorus(AbstractTorus):
    """An implementation of the Greedy-Channel Torus Network.
    """
    def route_packet(self, current_address: NodeAddress, packet: Packet) -> Optional[Channel]:
        """Return the channel that the packet should traverse next, given that it has
        just arrived at the node with address current_address.

        That is, the returned channel has the node corresponding to current_address as
        one of its endpoints. Ideally, but not necessarily, traversing the returned channel
        helps get the given packet closer to its destination.

        Return None if the current_address is equal to the packet's destination!

        Preconditions:
        - current_address in self._nodes
        - packet.source in self._nodes
        - packet.destination in self._nodes
        """
        if current_address == packet.destination:
            return None

        channels = []

        for neighbour_channel in self._nodes[current_address].channels.values():
            neighbour = neighbour_channel.get_other_endpoint(self._nodes[current_address])
            d = self.get_distance(neighbour.address, packet.destination)
            channels.append((d, neighbour_channel))

        return greedy_channel_select(channels)

    def get_distance(self, n1: NodeAddress, n2: NodeAddress) -> int:
        """Return the shortest path distance between the two given node (addresses).

        Remember that path distance is measured as the number of channels/edges, not nodes.
        When n1 == n2, the shortest path distance is 0.

        Preconditions:
            - n1 in self._nodes
            - n2 in self._nodes
        """
        k = int(len(self._nodes) ** 0.5)

        right_path = (n2[0] - n1[0]) % k
        left_path = (n1[0] - n2[0]) % k
        up_path = (n2[1] - n1[1]) % k
        down_path = (n1[1] - n2[1]) % k

        return min(right_path + up_path, right_path + down_path, left_path + up_path, left_path + down_path)


@check_contracts
class GreedyChannelStar(AbstractStar):
    """An implementation of the Greedy-Channel Star Network.
    """
    def route_packet(self, current_address: NodeAddress, packet: Packet) -> Optional[Channel]:
        """Return the channel that the packet should traverse next, given that it has
        just arrived at the node with address current_address.

        That is, the returned channel has the node corresponding to current_address as
        one of its endpoints. Ideally, but not necessarily, traversing the returned channel
        helps get the given packet closer to its destination.

        Return None if the current_address is equal to the packet's destination!

        Preconditions:
        - current_address in self._nodes
        - packet.source in self._nodes
        - packet.destination in self._nodes
        """
        if current_address == packet.destination:
            return None

        channels = []

        for neighbour_channel in self._nodes[current_address].channels.values():
            neighbour = neighbour_channel.get_other_endpoint(self._nodes[current_address])
            d = self.get_distance(neighbour.address, packet.destination)
            channels.append((d, neighbour_channel))

        return greedy_channel_select(channels)

    def get_distance(self, n1: NodeAddress, n2: NodeAddress) -> int:
        """Return the shortest path distance between the two given node (addresses).

        Remember that path distance is measured as the number of channels/edges, not nodes.
        When n1 == n2, the shortest path distance is 0.

        Preconditions:
            - n1 in self._nodes
            - n2 in self._nodes
        """
        if n1 == n2:
            return 0
        elif (n1 < self._num_central) or (n2 < self._num_central):
            return 1
        else:
            return 2


@check_contracts
def greedy_channel_select(channels: list[tuple[int, Channel]]) -> Channel:
    """Return the channel that minimizes the quantity described under "Greedy Channel Routing Algorithn"
    on the assignment handout.

    Each tuple in channels is of the form (d, channel), where d is the shortest-path distance
    from the neighbour to the packet's destination, and channel is the channel to that neighbour.

    Break ties as described on the assignment handout.

    Preconditions:
    - channels != []
    - all(tup[0] >= 0 for tup in channels)
    """
    compare_min_score = float('inf')
    min_channels = []

    for channel in channels:
        # I NOW REALIZED I COULD'VE JUST USED total_occupancy.
        if channel[1].occupant:
            occupant = 1
        else:
            occupant = 0
        score = channel[0] + len(channel[1].buffer) + occupant
        if score < compare_min_score:
            compare_min_score = score
            min_channels = [channel]
        elif score == compare_min_score:
            min_channels.append(channel)

    if len(min_channels) == 1:
        return min_channels[0][1]
    else:
        min_distance = min([var_channel[0] for var_channel in min_channels])
        min_distance_channels = [var_channel for var_channel in min_channels if var_channel[0] == min_distance]
        return random.choice(min_distance_channels)[1]


###################################################################################################
# Question 2
###################################################################################################
@check_contracts
class GreedyPathRing(AbstractRing):
    """An implementation of the Greedy-Path Ring Network.
    """
    def route_packet(self, current_address: NodeAddress, packet: Packet) -> Optional[Channel]:
        """Return the channel that the packet should traverse next, given that it has
        just arrived at the node with address current_address.

        That is, the returned channel has the node corresponding to current_address as
        one of its endpoints. Ideally, but not necessarily, traversing the returned channel
        helps get the given packet closer to its destination.

        Return None if the current_address is equal to the packet's destination!

        Preconditions:
        - current_address in self._nodes
        - packet.source in self._nodes
        - packet.destination in self._nodes
        """
        if current_address == packet.destination:
            return None

        paths = self.find_paths(current_address, packet.destination)
        return greedy_path_select(paths)


@check_contracts
class GreedyPathTorus(AbstractTorus):
    """An implementation of the Greedy-Path Torus Network.
    """
    def route_packet(self, current_address: NodeAddress, packet: Packet) -> Optional[Channel]:
        """Return the channel that the packet should traverse next, given that it has
        just arrived at the node with address current_address.

        That is, the returned channel has the node corresponding to current_address as
        one of its endpoints. Ideally, but not necessarily, traversing the returned channel
        helps get the given packet closer to its destination.

        Return None if the current_address is equal to the packet's destination!

        Preconditions:
        - current_address in self._nodes
        - packet.source in self._nodes
        - packet.destination in self._nodes
        """
        if current_address == packet.destination:
            return None

        paths = self.find_paths(current_address, packet.destination)
        return greedy_path_select(paths)


@check_contracts
class GreedyPathStar(AbstractStar):
    """An implementation of the Greedy-Path Star Network.
    """
    def route_packet(self, current_address: NodeAddress, packet: Packet) -> Optional[Channel]:
        """Return the channel that the packet should traverse next, given that it has
        just arrived at the node with address current_address.

        That is, the returned channel has the node corresponding to current_address as
        one of its endpoints. Ideally, but not necessarily, traversing the returned channel
        helps get the given packet closer to its destination.

        Return None if the current_address is equal to the packet's destination!

        Preconditions:
        - current_address in self._nodes
        - packet.source in self._nodes
        - packet.destination in self._nodes
        """
        if current_address == packet.destination:
            return None

        paths = self.find_paths(current_address, packet.destination)
        return greedy_path_select(paths)


@check_contracts
def greedy_path_select(paths: list[list[Channel]]) -> Channel:
    """Return the first channel in the path that minimizes the quantity described under "Greedy Path Routing Algorithn"
    on the assignment handout.

    Break ties as described on the assignment handout.

    Preconditions:
    - paths != []
    - every element of paths is a valid path
    - every path in paths starts at the same node
    - every path in paths ends at the same node
    """
    best_paths = []
    min_score = float('inf')
    min_length = float('inf')

    for path in paths:
        length = len(path)
        score = compute_path_score(path)

        if score < min_score or (score == min_score and length < min_length):
            min_score = score
            min_length = length
            best_paths = [path]
        elif score == min_score and length == min_length:
            best_paths.append(path)

    return random.choice(best_paths)[0]


@check_contracts
def compute_path_score(path: list[Channel]) -> int:
    """Return the "Greedy Path Routing Algorithm" path score for the given path.

    See assignment handout for details.

    Preconditions:
        - path is a valid path
        - path != []
    """
    total_buffer_time = 0
    k = len(path)

    for i in range(k):
        channel = path[i]
        buffer_time = max(channel.total_occupancy() - i, 0)
        total_buffer_time += buffer_time

    return k + total_buffer_time


if __name__ == '__main__':
    import doctest
    doctest.testmod(verbose=True)

    # When you are ready to check your work with python_ta, uncomment the following lines.
    # (In PyCharm, select the lines below and press Ctrl/Cmd + / to toggle comments.)
    # You can use "Run file in Python Console" to run PythonTA,
    # and then also test your methods manually in the console.
    import python_ta
    python_ta.check_all(config={
        'max-line-length': 120,
        'extra-imports': ['random', 'a3_network', 'a3_part1'],
        'disable': ['unused-import']
    })