Malicious-URL-Detection-ML / NewCode / HostBasedFeature.py
HostBasedFeature.py
Raw
#Host-based Feature recover will come here (Contributor: Navkaran)
import socket
from ipwhois import IPWhois
from datetime import datetime
import whois
import requests
import csv
import pandas as pd
    
#def HostBasedFeature(train_df, test_df):
   

def domainage(domain):
    #Takes in information from the whois module and compares it with current day to get the age of the domain
    try:
        domain_info = whois.whois(domain)
        creation_date = domain_info.creation_date
        if isinstance(creation_date, list):
            creation_date = creation_date[0]
        today = datetime.now()
        age = (today - creation_date).days
        return age
    except Exception as e:
        print("Error fetching domain age:", e)
        return None

def asninfo(ip_address):
#Uses IPWhois database to retrieve the information
    try:
        obj = IPWhois(ip_address)
        result = obj.lookup_rdap()
        
        return {
            'ASN': result['asn'],
            'ASN Country Code': result['asn_country_code'],
            'ASN CIDR': result['asn_cidr'],
            'ASN Creation Date': datetime.strptime(result['asn_date'], "%Y-%m-%d") if 'asn_date' in result else 'N/A',
            'ASN Last Updated': datetime.strptime(result['asn_last_updated'], "%Y-%m-%d") if 'asn_last_updated' in result else 'N/A'
        }
    except Exception as e:
        print("Error fetching ASN info:", e)
        return None

def ipadd(domain):
#Uses Socket module to obtain the IP address from the url's provided
    try:
        return socket.gethostbyname(domain)
    except Exception as e:
        print("Error fetching IP address:", e)
        return None

def postalcode(ip_address):
#Uses IP-API web with help of json file editor to grab the result from the website
    try:
        url = f"http://ip-api.com/json/{ip_address}?fields=zip"
        response = requests.get(url)
        data = response.json()
        postal_code = data.get('zip')
        return postal_code
    except Exception as e:
        print("Error fetching postal code:", e)
        return None

def process_domains(input_file, output_file):
#Uses built-in CSV file editor to write the results on a condition of presence of the ASN number for a website over the database
    fieldnames = ['Domain', 'IP Address', 'ASN', 'ASN Country Code', 'ASN CIDR', 'ASN Creation Date', 'ASN Last Updated', 'Domain Age (in days)', 'Postal Code']
    output_df=pd.DataFrame(columns=fieldnames)
    try:
        input_df=pd.read_csv(input_file)
        #URLs=input_df['URLs']
        #print(URLs.shape)
        #with open(input_file, 'r') as csvfile, open(output_file, 'w', newline='') as outfile:
        #reader = csv.DictReader(csvfile)
        #writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        #writer.writeheader()
    
        for domain in input_df['URLs']:
            #print(row)
            #domain = row['URLs']
            print(domain)
            ip_address = ipadd(domain)
            if ip_address:
                asn_info = asninfo(ip_address)
                if asn_info and asn_info['ASN'] != 'N/A':
                    domain_age = domainage(domain)
                    postal_code = postalcode(ip_address)

                    new_row={
                        'Domain': domain,
                        'IP Address': ip_address,
                        'ASN': asn_info['ASN'],
                        'ASN Country Code': asn_info['ASN Country Code'] if asn_info else 'N/A',
                        'ASN CIDR': asn_info['ASN CIDR'] if asn_info else 'N/A',
                        'ASN Creation Date': asn_info['ASN Creation Date'] if asn_info else 'N/A',
                        'ASN Last Updated': asn_info['ASN Last Updated'] if asn_info else 'N/A',
                        'Domain Age (in days)': domain_age,
                        'Postal Code': postal_code
                    }
                # Inserting the new row
                output_df.loc[len(output_df)] = new_row

                # Reset the index
                output_df = output_df.reset_index(drop=True)  
            else:
                print(f"Error: Unable to fetch IP address of the domain '{domain}'")
    except Exception as e:
        print("Error processing domains:", e)
    
    output_df = output_df.loc[:, ~output_df.columns.str.contains('^Unnamed')]
    output_df.to_csv(output_file)
    
    

#inputfile = input("Please enter the target file(include .csv):")
#outputfile= input("Please enter the output file name(include .csv):")

#def process_domains(inputfile, outputfile):
#    pass
#    #retun train_vector, test_vector