#### IMAGE SCRAPER ####

# Imports
import os
import time

import requests
from selenium import webdriver


### Fetch URLs
'''
The function:
1. Builds a query and loads the page for the query result
2. Scrolls to the end of the page
3. Gets thumbnails for all images on the page
4. Clicks on each thumbnail and extracts the URL
'''
def fetch_urls(search_term: str, max_urls: int, wd: webdriver.Chrome, sleep_time: float = 0.5):
    '''
    :param search_term: term to query for images
    :param max_urls: max no. of image URLs to collect
    :param wd: Selenium webdriver
    :param sleep_time: time to sleep between interactions
    :return: set of URLs for images
    '''
    # Build the Google Images query
    search_url = "https://www.google.com/search?safe=off&site=&tbm=isch&source=hp&q={q}&oq={q}&gs_l=img"

    # Load the page
    wd.get(search_url.format(q=search_term))

    # Scroll to the bottom of the page so more results load
    def scroll_to_end(wd):
        wd.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(sleep_time)

    image_urls = set()
    url_count = 0
    results_start = 0

    while url_count < max_urls:
        scroll_to_end(wd)

        # Get all thumbnails on the page
        thumb_res = wd.find_elements_by_css_selector('img.Q4LuWd')
        num_thumbnails = len(thumb_res)
        print(f"Found {num_thumbnails} thumbnails. Extracting results from {results_start}:{num_thumbnails}")

        # Click on each thumbnail and extract the URL of the full-size image
        for img in thumb_res[results_start:num_thumbnails]:
            try:
                img.click()
                time.sleep(sleep_time)
            except Exception:
                continue

            # Select the full-size image and get its URL
            actual_imgs = wd.find_elements_by_css_selector("img.n3VNCb")
            for actual_img in actual_imgs:
                # Keep the URL only if the tag has a src attribute containing "https"
                src = actual_img.get_attribute('src')
                if src and "https" in src:
                    image_urls.add(src)

            # Update the URL count
            url_count = len(image_urls)

            # Break if enough URLs have been retrieved
            if len(image_urls) >= max_urls:
                print(f"Found {len(image_urls)} urls. done!")
                break

        results_start = len(thumb_res)

    return image_urls


### Persist images
'''
The function:
1. Gets image content from a URL
2. Creates a new file and writes the image content to it
'''
def persist_image(folder: str, url: str, counter: int):
    '''
    :param folder: folder path to download images to
    :param url: url of the image
    :param counter: image number to add to the image file name
    :return:
    '''
    # Get image content from the URL as bytes
    try:
        image_content = requests.get(url).content
    except Exception as e:
        print(f"ERROR: Could not download {url}, {e}")
        return

    # Open a file and write the content to it
    # 'wb' = write in binary mode
    try:
        with open(os.path.join(folder, "img_" + str(counter) + ".jpg"), 'wb') as f:
            f.write(image_content)
        print(f"SUCCESS: Saved {url} at {folder}")
    except Exception as e:
        print(f"ERROR: Could not save {url}, {e}")


### Search and Download
'''
The function:
1. Creates a new folder named after the query string at the target path, if it does not exist
2. Fetches the URLs for the number of images specified
3. Downloads the images using the URLs
'''
def get_images(search_term: str, driver_path: str, n_images: int = 10, target_path: str = './images'):
    '''
    :param search_term: term to query for images
    :param driver_path: path to the chromedriver executable
    :param n_images: number of images to download
    :param target_path: folder path for downloading images
    :return:
    '''
    # Create the image folder
    target_folder = os.path.join(target_path, '_'.join(search_term.lower().split(' ')))
    if not os.path.exists(target_folder):
        os.makedirs(target_folder)

    # With the webdriver, fetch the image URLs
    with webdriver.Chrome(executable_path=driver_path) as wd:
        results = fetch_urls(search_term=search_term, max_urls=n_images, wd=wd, sleep_time=0.5)

    # Persist images
    counter = 0
    for url in results:
        persist_image(folder=target_folder, url=url, counter=counter)
        counter += 1

    print(f"{counter} images have been downloaded to {target_folder}")


## Run Image Scraper
search_term = 'national parks'
driver = './chromedriver'
image_count = 10
target_path = './images'

get_images(search_term=search_term, driver_path=driver, n_images=image_count, target_path=target_path)
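## Selenium 4 note (a hedged sketch, not part of the original scraper)
# fetch_urls() and get_images() above use the Selenium 3 API
# (find_elements_by_css_selector, executable_path=...). Both were removed in
# Selenium 4, where the driver takes a Service object and elements are located
# with find_elements(By.CSS_SELECTOR, ...). The helper below only sketches those
# equivalent calls; it reuses the same CSS class ('img.Q4LuWd') assumed above,
# which Google may change at any time, and it is never called by this script.
def _selenium4_thumbnail_sketch(search_term: str, driver_path: str):
    from selenium.webdriver.chrome.service import Service
    from selenium.webdriver.common.by import By

    # Create the driver via a Service object (Selenium 4 style)
    with webdriver.Chrome(service=Service(driver_path)) as wd:
        wd.get("https://www.google.com/search?tbm=isch&q=" + search_term)
        # Selenium 4 replacement for find_elements_by_css_selector
        return wd.find_elements(By.CSS_SELECTOR, 'img.Q4LuWd')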