"""CSC111 Winter 2023 Assignment 3: Graphs and Interconnection Networks

Instructions (READ THIS FIRST!)
===============================

This Python module contains the start of functions and/or classes you'll define
for Part 4 of this assignment. You may, but are not required to, add doctest
examples to help test your work. We strongly encourage you to do so!

Copyright and Usage Information
===============================

This file is provided solely for the personal and private use of students
taking CSC111 at the University of Toronto St. George campus. All forms of
distribution of this code, whether as given or with any changes, are
expressly prohibited. For more information on copyright for CSC111 materials,
please consult our Course Syllabus.

This file is Copyright (c) 2023 Mario Badr and David Liu.
"""
import random
from typing import Optional

from python_ta.contracts import check_contracts

# NOTE: Node and NodeAddress must be imported for check_contracts
# to work correctly, even if they aren't being used directly in this
# module. Don't remove them (even if you get a warning about them in PyCharm)!
from a3_network import Channel, NodeAddress, Node, Packet
from a3_part1 import AbstractRing, AbstractTorus, AbstractStar


@check_contracts
class GreedyChannelRing(AbstractRing):
    """An implementation of the Greedy-Channel Ring Network.
    """

    def route_packet(self, current_address: NodeAddress, packet: Packet) -> Optional[Channel]:
        """Return the channel that the packet should traverse next, given that it has
        just arrived at the node with address current_address.

        That is, the returned channel has the node corresponding to current_address as
        one of its endpoints. Ideally, but not necessarily, traversing the returned channel
        helps get the given packet closer to its destination.

        Return None if the current_address is equal to the packet's destination!

        Each channel leaving the current node is paired with the distance from the
        neighbour at its other end to the packet's destination, and the choice among
        them is delegated to greedy_channel_select.

        Preconditions:
        - current_address in self._nodes
        - packet.source in self._nodes
        - packet.destination in self._nodes
        """
        if current_address == packet.destination:
            return None

        current_node = self._nodes[current_address]
        channels = [
            (self.get_distance(channel.get_other_endpoint(current_node).address, packet.destination), channel)
            for channel in current_node.channels.values()
        ]
        return greedy_channel_select(channels)

    def get_distance(self, n1: NodeAddress, n2: NodeAddress) -> int:
        """Return the shortest path distance between the two given node (addresses).

        Remember that path distance is measured as the number of channels/edges, not nodes.
        When n1 == n2, the shortest path distance is 0.

        Preconditions:
        - n1 in self._nodes
        - n2 in self._nodes
        """
        ring_size = len(self._nodes)
        # The ring can be walked in either direction; take the shorter way around.
        forward = (n2 - n1) % ring_size
        backward = (n1 - n2) % ring_size
        return min(forward, backward)


@check_contracts
class GreedyChannelTorus(AbstractTorus):
    """An implementation of the Greedy-Channel Torus Network.
    """

    def route_packet(self, current_address: NodeAddress, packet: Packet) -> Optional[Channel]:
        """Return the channel that the packet should traverse next, given that it has
        just arrived at the node with address current_address.

        That is, the returned channel has the node corresponding to current_address as
        one of its endpoints. Ideally, but not necessarily, traversing the returned channel
        helps get the given packet closer to its destination.

        Return None if the current_address is equal to the packet's destination!

        Each channel leaving the current node is paired with the distance from the
        neighbour at its other end to the packet's destination, and the choice among
        them is delegated to greedy_channel_select.

        Preconditions:
        - current_address in self._nodes
        - packet.source in self._nodes
        - packet.destination in self._nodes
        """
        if current_address == packet.destination:
            return None

        current_node = self._nodes[current_address]
        channels = [
            (self.get_distance(channel.get_other_endpoint(current_node).address, packet.destination), channel)
            for channel in current_node.channels.values()
        ]
        return greedy_channel_select(channels)

    def get_distance(self, n1: NodeAddress, n2: NodeAddress) -> int:
        """Return the shortest path distance between the two given node (addresses).

        Remember that path distance is measured as the number of channels/edges, not nodes.
        When n1 == n2, the shortest path distance is 0.

        Preconditions:
        - n1 in self._nodes
        - n2 in self._nodes
        """
        # NOTE(review): assumes a k-by-k torus, so the node count is a perfect square;
        # confirm against AbstractTorus.
        k = int(len(self._nodes) ** 0.5)

        # Each coordinate wraps around independently, so the shortest route is the sum of
        # the shorter wrap-around distance along each of the two coordinates.
        first_axis = min((n2[0] - n1[0]) % k, (n1[0] - n2[0]) % k)
        second_axis = min((n2[1] - n1[1]) % k, (n1[1] - n2[1]) % k)
        return first_axis + second_axis


@check_contracts
class GreedyChannelStar(AbstractStar):
    """An implementation of the Greedy-Channel Star Network.
    """

    def route_packet(self, current_address: NodeAddress, packet: Packet) -> Optional[Channel]:
        """Return the channel that the packet should traverse next, given that it has
        just arrived at the node with address current_address.

        That is, the returned channel has the node corresponding to current_address as
        one of its endpoints. Ideally, but not necessarily, traversing the returned channel
        helps get the given packet closer to its destination.

        Return None if the current_address is equal to the packet's destination!

        Each channel leaving the current node is paired with the distance from the
        neighbour at its other end to the packet's destination, and the choice among
        them is delegated to greedy_channel_select.

        Preconditions:
        - current_address in self._nodes
        - packet.source in self._nodes
        - packet.destination in self._nodes
        """
        if current_address == packet.destination:
            return None

        current_node = self._nodes[current_address]
        channels = [
            (self.get_distance(channel.get_other_endpoint(current_node).address, packet.destination), channel)
            for channel in current_node.channels.values()
        ]
        return greedy_channel_select(channels)

    def get_distance(self, n1: NodeAddress, n2: NodeAddress) -> int:
        """Return the shortest path distance between the two given node (addresses).

        Remember that path distance is measured as the number of channels/edges, not nodes.
        When n1 == n2, the shortest path distance is 0.

        Preconditions:
        - n1 in self._nodes
        - n2 in self._nodes
        """
        if n1 == n2:
            return 0

        # Addresses below self._num_central are treated as central nodes.
        # NOTE(review): presumably a central node is adjacent to every other node, while two
        # non-central nodes must go through a central one; confirm against AbstractStar.
        if n1 < self._num_central or n2 < self._num_central:
            return 1

        return 2


@check_contracts
def greedy_channel_select(channels: list[tuple[int, Channel]]) -> Channel:
    """Return the channel that minimizes the quantity described under "Greedy Channel
    Routing Algorithm" on the assignment handout.

    Each tuple in channels is of the form (d, channel), where d is the shortest-path
    distance from the neighbour to the packet's destination, and channel is the channel
    to that neighbour.

    Break ties as described on the assignment handout.

    As implemented here, a channel's score is d plus the channel's total occupancy. If
    several channels share the lowest score, the ones with the smallest d are kept, and
    one of those is picked at random.

    Preconditions:
    - channels != []
    - all(tup[0] >= 0 for tup in channels)
    """
    # Channel.total_occupancy() is the same helper compute_path_score relies on.
    # NOTE(review): assumed to equal len(channel.buffer) plus 1 when the channel has an
    # occupant, which is what this function used to compute by hand; confirm in a3_network.
    scored = [(d + channel.total_occupancy(), d, channel) for d, channel in channels]
    lowest_score = min(score for score, _, _ in scored)
    candidates = [(d, channel) for score, d, channel in scored if score == lowest_score]

    if len(candidates) == 1:
        return candidates[0][1]

    # First tie-break: smallest distance to the destination. Second tie-break: random.
    smallest_d = min(d for d, _ in candidates)
    closest = [channel for d, channel in candidates if d == smallest_d]
    return random.choice(closest)


###################################################################################################
# Question 2
###################################################################################################
@check_contracts
class GreedyPathRing(AbstractRing):
    """An implementation of the Greedy-Path Ring Network.
    """

    def route_packet(self, current_address: NodeAddress, packet: Packet) -> Optional[Channel]:
        """Return the channel that the packet should traverse next, given that it has
        just arrived at the node with address current_address.

        That is, the returned channel has the node corresponding to current_address as
        one of its endpoints. Ideally, but not necessarily, traversing the returned channel
        helps get the given packet closer to its destination.

        Return None if the current_address is equal to the packet's destination!

        Preconditions:
        - current_address in self._nodes
        - packet.source in self._nodes
        - packet.destination in self._nodes
        """
        if current_address == packet.destination:
            return None

        paths = self.find_paths(current_address, packet.destination)
        return greedy_path_select(paths)


@check_contracts
class GreedyPathTorus(AbstractTorus):
    """An implementation of the Greedy-Path Torus Network.
    """

    def route_packet(self, current_address: NodeAddress, packet: Packet) -> Optional[Channel]:
        """Return the channel that the packet should traverse next, given that it has
        just arrived at the node with address current_address.

        That is, the returned channel has the node corresponding to current_address as
        one of its endpoints. Ideally, but not necessarily, traversing the returned channel
        helps get the given packet closer to its destination.

        Return None if the current_address is equal to the packet's destination!

        Preconditions:
        - current_address in self._nodes
        - packet.source in self._nodes
        - packet.destination in self._nodes
        """
        if current_address == packet.destination:
            return None

        paths = self.find_paths(current_address, packet.destination)
        return greedy_path_select(paths)


@check_contracts
class GreedyPathStar(AbstractStar):
    """An implementation of the Greedy-Path Star Network.
    """

    def route_packet(self, current_address: NodeAddress, packet: Packet) -> Optional[Channel]:
        """Return the channel that the packet should traverse next, given that it has
        just arrived at the node with address current_address.

        That is, the returned channel has the node corresponding to current_address as
        one of its endpoints. Ideally, but not necessarily, traversing the returned channel
        helps get the given packet closer to its destination.

        Return None if the current_address is equal to the packet's destination!

        Preconditions:
        - current_address in self._nodes
        - packet.source in self._nodes
        - packet.destination in self._nodes
        """
        if current_address == packet.destination:
            return None

        paths = self.find_paths(current_address, packet.destination)
        return greedy_path_select(paths)


@check_contracts
def greedy_path_select(paths: list[list[Channel]]) -> Channel:
    """Return the first channel in the path that minimizes the quantity described under
    "Greedy Path Routing Algorithm" on the assignment handout.

    Break ties as described on the assignment handout.

    As implemented here, paths are ranked by compute_path_score, then by number of
    channels; one of the paths tied on both is picked at random.

    Preconditions:
    - paths != []
    - every element of paths is a valid path
    - every path in paths starts at the same node
    - every path in paths ends at the same node
    """
    best_paths = []
    # Comparing (score, length) tuples ranks by score first and by length second.
    best_key = (float('inf'), float('inf'))

    for path in paths:
        key = (compute_path_score(path), len(path))
        if key < best_key:
            best_key = key
            best_paths = [path]
        elif key == best_key:
            best_paths.append(path)

    return random.choice(best_paths)[0]


@check_contracts
def compute_path_score(path: list[Channel]) -> int:
    """Return the "Greedy Path Routing Algorithm" path score for the given path.

    See assignment handout for details.

    As implemented here, the score is the number of channels in the path plus, for the
    channel at each index i, its total occupancy reduced by i (and never below 0).

    Preconditions:
    - path is a valid path
    - path != []
    """
    total_buffer_time = sum(
        max(channel.total_occupancy() - i, 0)
        for i, channel in enumerate(path)
    )
    return len(path) + total_buffer_time


if __name__ == '__main__':
    import doctest
    doctest.testmod(verbose=True)

    # When you are ready to check your work with python_ta, uncomment the following lines.
    # (In PyCharm, select the lines below and press Ctrl/Cmd + / to toggle comments.)
    # You can use "Run file in Python Console" to run PythonTA,
    # and then also test your methods manually in the console.
    import python_ta
    python_ta.check_all(config={
        'max-line-length': 120,
        'extra-imports': ['random', 'a3_network', 'a3_part1'],
        'disable': ['unused-import']
    })