# CSC108-Fall-2022-A2 / carbon_emissions.py
# carbon_emissions.py
# Raw
"""CSC108: Fall 2022 -- Assignment 2: Carbon Emissions 

This code is provided solely for the personal and private use of
students taking the CSC108/CSCA08 course at the University of
Toronto. Copying for purposes other than this use is expressly
prohibited. All forms of distribution of this code, whether as given
or with any changes, are expressly prohibited.

All of the files in this directory and all subdirectories are:
Copyright (c) 2022 Sadia Sharmin and Michelle Craig
"""
# The following lines define the type number so we can use it in type contracts
# for functions which normally return float but for error situations return
# integer constants
from typing import Union
number = Union[float, int]

# First and last year that the rows of the data tables cover.
START_YEAR = 1799
END_YEAR = 2017

# Names of the CSV files holding the raw data.
EMISSION_FILENAME = 'co2_emissions_per_person.csv'
POP_FILENAME = 'populations.csv'

G7_COUNTRIES = [
    'Canada',
    'France',
    'Germany',
    'Italy',
    'Japan',
    'United Kingdom',
    'United States',
]

# Value stored in a cleaned table wherever an entry is missing.
MISSING_DATA = -1.0

# Integer codes that functions return in error situations.
DATA_NOT_FOUND = -1
COUNTRY_NOT_FOUND = -2
INVALID_FORMAT = -3

# Small cleaned tables used by the docstring examples: four real values per
# country followed by 215 missing entries.
SAMPLE_POPULATION_DATA = [
    ['Canada', 500000.0, 512000.0, 525000.0, 538000.0,
     *([MISSING_DATA] * 215)],
    ['Finland', 800000.0, 815000.0, 831000.0, 847000.0,
     *([MISSING_DATA] * 215)],
    ['Poland', 9000000.0, 9070000.0, 9130000.0, 9200000.0,
     *([MISSING_DATA] * 215)],
]

SAMPLE_EMISSIONS_DATA = [
    ['Canada', 0.00733, 0.00716, 0.00698, 0.00681,
     *([MISSING_DATA] * 215)],
    ['Finland', MISSING_DATA, MISSING_DATA, MISSING_DATA, 0.00341,
     *([MISSING_DATA] * 215)],
    ['Poland', 0.0452, 0.0489, 0.0494, 0.0502,
     *([MISSING_DATA] * 215)],
]


################################################################################
# Simple functions
################################################################################

def convert_population(population_string: str) -> number:
    """Convert population_string to its corresponding numeric value.

    A valid string is a number optionally followed by one of the suffixes
    'B' (billions), 'M' (millions) or 'k' (thousands). The suffixes are
    case-sensitive.

    Return INVALID_FORMAT when population_string is empty, ends in anything
    other than a digit or a known suffix, or when the numeric part cannot
    be parsed.

    >>> convert_population('76.6M')
    76600000.0
    >>> convert_population('76.6m')
    -3
    >>> convert_population('')
    -3
    >>> convert_population('abcM')
    -3
    """
    multipliers = {'B': 10 ** 9, 'M': 10 ** 6, 'k': 10 ** 3}

    # An empty string has no last character to inspect.
    if not population_string:
        return INVALID_FORMAT

    suffix = population_string[-1]
    if suffix in multipliers:
        numeric_part = population_string[:-1]
        multiplier = multipliers[suffix]
    elif suffix.isdigit():
        numeric_part = population_string
        multiplier = 1
    else:
        return INVALID_FORMAT

    try:
        return float(numeric_part) * multiplier
    except ValueError:
        # A valid suffix on a malformed number, e.g. 'abcM', 'B' or '1.2.3k'.
        return INVALID_FORMAT
 
 
def get_year_index(year: int) -> int:
    """Return the index within a data row that holds the entry for year.

    Index 0 of a row is the country's name, so START_YEAR maps to index 1.

    Precondition: START_YEAR <= year <= END_YEAR

    >>> get_year_index(1801)
    3
    """
    # Use the module constant rather than repeating the literal year, so the
    # mapping cannot drift away from START_YEAR.
    return year - START_YEAR + 1

    
################################################################################
# Data cleaning and reading
################################################################################

def read_data(filename: str) -> list[list[str]]:
    """Return the rows of the file filename as a list of lists of strings.

    The first line of the file (the header) is skipped. Every other line is
    split on commas and becomes one inner list. The final comma-separated
    field of each line -- the one that normally carries the line ending --
    is left out of the inner list.

    Docstring examples not given since the results depend on filename.

    Precondition: The data in filename is in a valid format.
    """
    with open(filename, "r") as source:
        source.readline()  # discard the header line
        rows = []
        for row_text in source:
            fields = row_text.split(',')
            rows.append(fields[:-1])
    return rows
        
        
def prepare_data(filename: str, is_emission_data: bool = False) -> list[list]:
    """Return the table read from the file filename after cleaning it.

    When is_emission_data is True the rows are cleaned as emission data;
    otherwise they are cleaned as population data.

    Docstring examples not given since the results depend on filename.

    Precondition: The data in filename is in a valid format.
    """
    table = read_data(filename)

    # Both cleaners work in place, so the table itself is returned afterwards.
    if is_emission_data is True:
        cleaner = clean_emission_data
    else:
        cleaner = clean_population_data
    cleaner(table)

    return table
    
    
def clean_population_data(data: list[list]) -> None:
    """Clean the population data in data, replacing the strings representing
    the population with a floating point value.

    The first item of each row (the country name) is left untouched. Every
    other entry is converted with convert_population. The constant
    MISSING_DATA replaces any entry whose population string is invalid,
    including an empty string.

    >>> small_sample_data = [['France', '29M', '29.1M', '29.2M', '29.3m'],
    ...                      ['Mauritius', '59k', '60.7k', '62.4k', '64.2k']]
    >>> clean_population_data(small_sample_data)
    >>> small_sample_data[0]
    ['France', 29000000.0, 29100000.0, 29200000.0, -1.0]
    >>> small_sample_data[1]
    ['Mauritius', 59000.0, 60700.0, 62400.0, 64200.0]

    >>> small_sample_data = [['Canada', '500k', '512k', '525k', '538k'],
    ...                      ['Finland', '800k', '815k', '831k', '847k']]
    >>> clean_population_data(small_sample_data)
    >>> small_sample_data[0]
    ['Canada', 500000.0, 512000.0, 525000.0, 538000.0]
    >>> small_sample_data[1]
    ['Finland', 800000.0, 815000.0, 831000.0, 847000.0]

    >>> small_sample_data = [['Nowhere', '5', '', 'n/a']]
    >>> clean_population_data(small_sample_data)
    >>> small_sample_data[0]
    ['Nowhere', 5.0, -1.0, -1.0]
    """
    for row in data:
        # Index 0 holds the country name and must never be converted.
        for i in range(1, len(row)):
            entry = row[i]
            try:
                # An empty entry has no suffix character to inspect, so it
                # is treated as invalid without calling the converter.
                value = convert_population(entry) if entry else INVALID_FORMAT
            except ValueError:
                # A known suffix on a non-numeric prefix, e.g. 'abcM'.
                value = INVALID_FORMAT
            row[i] = MISSING_DATA if value == INVALID_FORMAT else value
    

def clean_emission_data(data: list[list]) -> None:
    """Clean the emission data in data in place.

    The first item of each row (the country name) is left as it is. Every
    other entry is replaced by its floating point value, or by MISSING_DATA
    when the entry is the empty string.

    >>> small_sample_data = [
    ...     ['Canada', '0.00733', '0.00716', '0.00698', '0.00681'] + [''] * 215,
    ...     ['Finland'] + ['']*3 + ['0.00341'] + [''] * 215,
    ...     ['Poland', '0.0452',' 0.0489', '0.0494', '0.0502'] + [''] * 215]
    >>> clean_emission_data(small_sample_data)
    >>> small_sample_data == SAMPLE_EMISSIONS_DATA
    True

    >>> small_sample_data = [
    ...     ['Finland'] + ['']*3 + ['0.00341'] + [''] * 215,
    ...     ['Poland', '0.0452',' 0.0489', '0.0494', '0.0502'] + [''] * 215]
    >>> clean_emission_data(small_sample_data)
    >>> small_sample_data == SAMPLE_EMISSIONS_DATA[1:]
    True
    """
    for row in data:
        # Walk a copy of the data columns; the row is updated entry by entry.
        for position, entry in enumerate(row[1:], start=1):
            row[position] = float(entry) if entry != '' else MISSING_DATA
    
################################################################################
# Data querying
################################################################################


def get_country_row(data: list[list], country: str) -> list:
    """Return a copy of the row in data whose first item is country.

    The returned list starts with the country's name. If country appears in
    more than one row, the last such row is used. If country is not in
    data, return an empty list.

    >>> get_country_row(SAMPLE_POPULATION_DATA, 'Canada')[:5]
    ['Canada', 500000.0, 512000.0, 525000.0, 538000.0]

    """
    found = []
    for row in data:
        if row[0] == country:
            # Copy the row so that callers cannot change data through it.
            found = row[:]
    return found
    

def country_with_largest_emissions_by_year(emissions_data: list[list],
                                           year: int) -> str:
    """Return the name of the country that has the largest per-person
    emissions in the given year of data. In the case of a tie, return the one
    that comes first in the data file.

    Precondition:  START_YEAR <= year <= END_YEAR
                   len(emissions_data) >= 1

    There is at least one country with emissions for the given year
    in emissions_data.

    >>> country_with_largest_emissions_by_year(SAMPLE_EMISSIONS_DATA, 1800)
    'Poland'
    """
    year_index = get_year_index(year)

    # max() returns the first maximal item it meets, so a tie is settled in
    # favour of the row that comes first in emissions_data. Missing entries
    # are stored as a negative number and so cannot beat a real value.
    largest_row = max(emissions_data, key=lambda row: row[year_index])
    return largest_row[0]
    
    
def emissions_by_country_by_year(emissions_per_person: list[list],
                                 population: list[list],
                                 country: str, year: int) -> number:
    """Return the total emissions for this country in this year
    based on the population and the emissions per person.

    If the country is represented in the data, but the population or emission
    for the required year is missing, then return DATA_NOT_FOUND.

    If the country is missing from one or both of the data files,
    return COUNTRY_NOT_FOUND.

    >>> emissions_by_country_by_year(SAMPLE_EMISSIONS_DATA,
    ...                              SAMPLE_POPULATION_DATA,
    ...                              'Canada', 1799)
    3665.0
    >>> emissions_by_country_by_year(SAMPLE_EMISSIONS_DATA,
    ...                              SAMPLE_POPULATION_DATA,
    ...                              'Finland', 1801)
    -1
    """
    emissions_row = get_country_row(emissions_per_person, country)
    population_row = get_country_row(population, country)

    # get_country_row returns an empty list for an unknown country.
    if not emissions_row or not population_row:
        return COUNTRY_NOT_FOUND

    year_index = get_year_index(year)
    per_person = emissions_row[year_index]
    people = population_row[year_index]

    # Either table may lack the entry for this year; multiplying a missing
    # marker would produce a meaningless negative total.
    if per_person == MISSING_DATA or people == MISSING_DATA:
        return DATA_NOT_FOUND

    return per_person * people


def total_emissions_by_countries(countries: list[str], population: list[list],
                                 emissions_per_person: list[list],
                                 year: int) -> number:
    """Return the total CO2 emitted collectively by these countries in this year
    based on their populations and emissions data in the appropriate year.

    If there is a country in the list that is not found in the data,
    return COUNTRY_NOT_FOUND. If all the countries are in the data, but
    none of them have data for the given year, return DATA_NOT_FOUND.
    Otherwise return the total emissions of the countries that have valid
    data for the year. A country listed more than once is counted once per
    occurrence. An empty list of countries gives a total of 0.

    Precondition: no table entry for emissions is 0.0

    >>> total_emissions_by_countries(['Canada', 'Finland'],
    ...                              SAMPLE_POPULATION_DATA,
    ...                              SAMPLE_EMISSIONS_DATA, 1799)
    3665.0

    >>> total_emissions_by_countries(['Canada', 'Finland', 'Nowhere'],
    ...                              SAMPLE_POPULATION_DATA,
    ...                              SAMPLE_EMISSIONS_DATA, 1799)
    -2
    """
    # One result per list entry (not per distinct name), so repeated
    # countries cannot skew the "nothing available" check below.
    results = [
        emissions_by_country_by_year(emissions_per_person, population,
                                     country, year)
        for country in countries
    ]

    if COUNTRY_NOT_FOUND in results:
        return COUNTRY_NOT_FOUND

    available = [result for result in results if result != DATA_NOT_FOUND]
    if results and not available:
        return DATA_NOT_FOUND

    return sum(available)


def country_average_over_range(data: list[list], range_start: int,
                               range_end: int,
                               country: str) -> number:
    """
    Return the average per-person emissions for the years between range_start
    and range_end for this country in which we have available data. Years
    with missing data are left out of both the sum and the count. If there
    are no valid entries in this range, return DATA_NOT_FOUND. If country is
    not included in the data, return COUNTRY_NOT_FOUND.

    Preconditions:
        range_start >= START_YEAR
        range_end <= END_YEAR

    >>> country_average_over_range(SAMPLE_EMISSIONS_DATA, 1799, 1800, 'Canada')
    0.007245
    >>> country_average_over_range(SAMPLE_EMISSIONS_DATA, 1799, 1800, 'Finland')
    -1
    >>> country_average_over_range(SAMPLE_EMISSIONS_DATA, 1799, 1802, 'Finland')
    0.00341
    """
    country_row = get_country_row(data, country)
    # Check the whole row, not a slice of it: an empty slice would make an
    # existing country look like an unknown one.
    if not country_row:
        return COUNTRY_NOT_FOUND

    first_index = get_year_index(range_start)
    last_index = get_year_index(range_end)
    available = [entry for entry in country_row[first_index:last_index + 1]
                 if entry != MISSING_DATA]

    if not available:
        return DATA_NOT_FOUND

    # Average over the years that actually have data.
    return sum(available) / len(available)
    

def peak_year_by_country(data: list[list], country: str) -> int:
    """Return the year when this country had the largest emissions or
    return COUNTRY_NOT_FOUND if this country is not represented in data.

    If this largest value was the same for multiple years, return the latest
    year the country had this maximum level.

    Precondition: If country is represented in data, then at least one year
    for this country has valid data. (I.e. not all years are MISSING_DATA)

    >>> peak_year_by_country(SAMPLE_EMISSIONS_DATA, 'Canada')
    1799
    >>> peak_year_by_country(SAMPLE_EMISSIONS_DATA, 'Nowhere')
    -2
    """
    country_row = get_country_row(data, country)
    if not country_row:
        return COUNTRY_NOT_FOUND

    # Compare (value, index) pairs so that, among equal values, the entry
    # with the larger index -- the later year -- is chosen.
    peak_index = max(range(1, len(country_row)),
                     key=lambda i: (country_row[i], i))

    # Index 1 holds START_YEAR, so undo that offset to get back the year.
    return START_YEAR + peak_index - 1
    
################################################################################
# Data mutation
################################################################################


def create_total_emissions_table(emissions_data: list[list],
                                 population_data: list[list]) -> list[list]:
    """Create and return a table in same format as emissions_data but using the
    population data to determine the total emissions in each year.
    Years where the emissions_per_person data is not available have values from
    the constant MISSING_DATA as the entry. The returned table also has
    MISSING_DATA when no population data is available for that country/year.

    Neither emissions_data nor population_data is modified.

    Precondition: countries are in the same order in all three tables.

    >>> table = create_total_emissions_table(SAMPLE_EMISSIONS_DATA,
    ...                                      SAMPLE_POPULATION_DATA)
    >>> table[1][:5]
    ['Finland', -1.0, -1.0, -1.0, 2888.27]
    """
    total_emissions = []
    for k, emissions_row in enumerate(emissions_data):
        totals_row = [emissions_row[0]]
        for i in range(1, len(emissions_row)):
            per_person = emissions_row[i]
            # `or` short-circuits: the population entry is only read when
            # the emissions entry is present. Either one being missing
            # makes the total for that year missing as well.
            if (per_person == MISSING_DATA
                    or population_data[k][i] == MISSING_DATA):
                totals_row.append(MISSING_DATA)
            else:
                totals_row.append(per_person * population_data[k][i])
        total_emissions.append(totals_row)

    return total_emissions


def update_country_year_data(data: list[list], country: str, year: int,
                             new_data: float) -> float:
    """Store new_data as the entry for the given year and country in data.

    Return the value that was there before, or COUNTRY_NOT_FOUND if country
    is not in data, in which case data is left unchanged. Should country
    appear in several rows, each of them is updated and the old value from
    the first one is returned.

    Precondition: START_YEAR <= year <= END_YEAR

    >>> small_sample_data = [['France', -1.0, -1.0, -1.0, -1.0] + [-1.0] * 215]
    >>> update_country_year_data(small_sample_data, 'France', 1799, 0.05)
    -1.0
    >>> small_sample_data[0][:5]
    ['France', 0.05, -1.0, -1.0, -1.0]
    >>> update_country_year_data(small_sample_data, 'Canada', 1799, 0.05)
    -2
    """
    column = get_year_index(year)

    known_countries = [row[0] for row in data]
    if country not in known_countries:
        return COUNTRY_NOT_FOUND

    previous_values = []
    for row in data:
        if row[0] != country:
            continue
        previous_values.append(row[column])
        row[column] = new_data
    return previous_values[0]


# When this file is run as a script, execute the docstring examples above.
if __name__ == '__main__':

    # doctest is only needed for this self-check, so it is imported here.
    import doctest
    doctest.testmod()