from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from datetime import datetime, timedelta import csv import time DELTA = 5 # === UTILS FUNCTION === def month_name_to_number(month_name): # Dictionary mapping month names to their corresponding numbers month_numbers = { 'january': 1, 'february': 2, 'march': 3, 'april': 4, 'may': 5, 'june': 6, 'july': 7, 'august': 8, 'september': 9, 'october': 10, 'november': 11, 'december': 12 } month_name = month_name.lower() return month_numbers.get(month_name, "Invalid month name") def get_today_formatted(): # Get today's date today = datetime.today() # Format it to 'YYYY-MM-DD' formatted_date = today.strftime('%Y-%m-%d') return formatted_date def clean_currency_string(currency_str): no_dollar = currency_str.replace('$', '') no_comma = no_dollar.replace(',', '') return no_comma def clean_opening_string(opening_str): cleaned_str = opening_str.replace('\n', ' ') parts = cleaned_str.split(' ') currency_str = clean_currency_string(parts[0]) screens_str = parts[1].replace(',', '') return { 'gross': int(currency_str), 'screens': int(screens_str) } def clean_release_date_string(input_str): # Define a helper function to convert a date string to the desired format def format_date(date_str): date_obj = datetime.strptime(date_str.strip(), '%b %d, %Y') return date_obj.strftime('%Y-%m-%d') # Case 1: 'Dec 1, 2023' if ' - ' not in input_str and '(' not in input_str: return format_date(input_str) # Case 2: 'Dec 1, 2023 - Dec 12, 2023' if ' - ' in input_str: first_date = input_str.split(' - ')[0] return format_date(first_date) # Case 3: 'Dec 1, 2023 (Dec 12, 2023)' if '(' in input_str: first_date = input_str.split('(')[0] return format_date(first_date) return None def clean_running_time_string(running_time_str) -> int: if 'hr' in running_time_str and 'min' in running_time_str: parts = running_time_str.split(' ') hours = int(parts[0]) minutes = int(parts[2]) return hours * 60 + minutes elif 'hr' in running_time_str: return int(running_time_str.split(' ')[0]) * 60 else: return int(running_time_str.split(' ')[0]) def clean_imdb_id_string(imdb_id_str): return imdb_id_str.split('/')[4] def clean_genres_string(genres_str): return genres_str.split(' ') def write_to_csv(movie_data_list, filename='movies_data.csv'): # Define the fieldnames for the CSV file fieldnames = ['tt_id', 'movie_name', 'domestic_box_office', 'budget', 'month', 'year', 'opening_week', 'screens', 'genres', 'mpaa', 'runtime'] # Write the data to the CSV file with open(filename, 'w', newline='', encoding='utf-8') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=fieldnames) # Write the header writer.writeheader() # Write each movie's data for movie in movie_data_list: writer.writerow(movie) def delta_months_before(delta): current_date = datetime.now() delta_months_ago = current_date - timedelta(days=30*delta) month_name = delta_months_ago.strftime("%B").lower() year = delta_months_ago.year return month_name, year def get_movies_list_url(month, year): return f'https://boxofficemojo.com/month/{month}/{year}/?grossesOption=totalGrosses' # === CRAWL FUNCTION === def crawl_movies_list_data(): result_list = [] month, year = delta_months_before(DELTA) url = get_movies_list_url(month, year) options = Options() options.add_argument("--headless") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(options=options) driver.get(url) movies_list_len = driver.find_element(By.XPATH, '//*[@id="table"]/div/table[2]/tbody/tr[last()]/td[1]').text movies_list_len = int(movies_list_len) for i in range (2, movies_list_len + 2): movie_url_href = driver.find_element(By.XPATH, f'//*[@id="table"]/div/table[2]/tbody/tr[{i}]/td[2]/a').get_attribute('href') result_list.append(movie_url_href) driver.quit() return result_list def crawl_movie_data(url): options = Options() options.add_argument("--headless") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(options=options) driver.get(url) month, year = delta_months_before(DELTA) movie = { 'tt_id': None, 'movie_name': None, 'domestic_box_office': None, 'budget': None, 'year': year, 'month': month_name_to_number(month), 'opening_week': None, 'screens': None, 'genres': None, 'mpaa': None, 'runtime': None } TITLE_XPATH = '//*[@id="a-page"]/main/div/div[1]/div[1]/div/div/div[2]/h1' IMDB_ID_XPATH = '//*[@id="title-summary-refiner"]/a' DOMESTIC_GROSS_XPATH = '//*[@id="a-page"]/main/div/div[3]/div[1]/div/div[1]/span[2]/span' PROPERTIES_NUM_XPATH = '//*[@id="a-page"]/main/div/div[3]/div[4]/div' title = driver.find_element(By.XPATH, TITLE_XPATH).text imdb_id = driver.find_element(By.XPATH, IMDB_ID_XPATH).get_attribute('href') domestic_gross = driver.find_element(By.XPATH, DOMESTIC_GROSS_XPATH).text properties = driver.find_elements(By.XPATH, PROPERTIES_NUM_XPATH) properties_count = len(properties) movie['movie_name'] = title movie['tt_id'] = clean_imdb_id_string(imdb_id) movie['domestic_box_office'] = clean_currency_string(domestic_gross) for i in range(2, properties_count - 1): property_name = driver.find_element(By.XPATH, f'//*[@id="a-page"]/main/div/div[3]/div[4]/div[{i}]/span[1]').text property_value = driver.find_element(By.XPATH, f'//*[@id="a-page"]/main/div/div[3]/div[4]/div[{i}]/span[2]').text if 'Running Time' in property_name: movie['runtime'] = clean_running_time_string(property_value) if 'Opening' in property_name: opening_data = clean_opening_string(property_value) movie['opening_week'] = opening_data['gross'] movie['screens'] = opening_data['screens'] if 'Budget' in property_name: movie['budget'] = clean_currency_string(property_value) if 'MPAA' in property_name: movie['mpaa'] = property_value if 'Genres' in property_name: movie['genres'] = property_value driver.quit() return movie if __name__ == "__main__": month, year = delta_months_before(DELTA) print(f'Start updater from boxofficemojo.com in {month} ,{year}') # Define ANSI escape codes for background colors RED_BG = "\033[41m" GREEN_BG_BLACK_TEXT_BOLD = "\033[42;30;1m" YELLOW_BG_BOLD = "\033[43;1m" RESET = "\033[0m" # Define ANSI escape codes for text colors WHITE_TEXT = "\033[97m" GREEN_TEXT = "\033[92m" YELLOW_TEXT = '\033[33m' RESET_TEXT = "\033[0m" start_time = time.time() movies_url_list_start = time.time() movies_url_list = crawl_movies_list_data() movies_url_list_end = time.time() movies_url_list_time_cost = movies_url_list_end - movies_url_list_start print(f"{GREEN_BG_BLACK_TEXT_BOLD}CRAWL MOVIES LIST DATA{RESET} Total time cost: {YELLOW_TEXT}{movies_url_list_time_cost:.2f}s{RESET_TEXT}") movie_data_list = [] movies_list_data_time_start = time.time() print('Start crawling movie data') for movie_url in movies_url_list: movie_data_start = time.time() movie_data = crawl_movie_data(movie_url) movie_data_list.append(movie_data) movie_data_end = time.time() movie_data_time_cost = movie_data_end - movie_data_start print(f" MOVIE DATA (Cost: {YELLOW_TEXT}{movie_data_time_cost:.2f}s{RESET_TEXT}) Title: {movie_data['movie_name']}") movies_list_data_time_end = time.time() movies_list_data_time_cost = movies_list_data_time_end - movies_list_data_time_start movies_list_data_time_average = movies_list_data_time_cost / len(movies_url_list) print(f"{GREEN_BG_BLACK_TEXT_BOLD}CRAWL MOVIE DATA{RESET} Total time cost: {YELLOW_TEXT}{movies_list_data_time_cost:.2f}s{RESET_TEXT}, average time cost: {YELLOW_TEXT}{movies_list_data_time_average:.2f}s{RESET_TEXT}") write_to_csv(movie_data_list) end_time = time.time() time_cost = end_time - start_time print(f"\nTotal time cost : {GREEN_TEXT}{time_cost:.2f}s{RESET_TEXT}")