# QuanTH02 - feat: dir structure (commit e964b12)
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from datetime import datetime, timedelta
import csv
import time
# How many months back from the current date the crawl targets; passed to
# delta_months_before() by the crawl functions and the script entry point.
DELTA = 5
# === UTILITY FUNCTIONS ===
def month_name_to_number(month_name):
    """Translate a full English month name into its calendar number (1-12).

    The lookup ignores letter case. Any name that is not one of the twelve
    full month names yields the string "Invalid month name" instead of a
    number.
    """
    ordered_names = (
        'january',
        'february',
        'march',
        'april',
        'may',
        'june',
        'july',
        'august',
        'september',
        'october',
        'november',
        'december',
    )
    # Position in the tuple (counted from 1) is the month number.
    lookup = {name: number for number, name in enumerate(ordered_names, start=1)}

    key = month_name.lower()
    if key in lookup:
        return lookup[key]
    return "Invalid month name"
def get_today_formatted():
    """Return the current local date as a 'YYYY-MM-DD' string."""
    return datetime.today().strftime('%Y-%m-%d')
def clean_currency_string(currency_str):
    """Remove every '$' and ',' from a currency string such as '$1,234'.

    The result is still a string; no numeric conversion is performed and
    all other characters (including whitespace) are left untouched.
    """
    return currency_str.replace('$', '').replace(',', '')
def clean_opening_string(opening_str):
    """Parse an "Opening" property value into gross and screen count.

    The value is expected to hold a currency amount followed by a screen
    count, separated by whitespace (a newline in the scraped page text),
    e.g. '$1,234,567' then '3,456 theaters'. Any text after the screen
    count is ignored.

    Args:
        opening_str: Raw text of the "Opening" property.

    Returns:
        dict with 'gross' (int) and 'screens' (int, or None when the value
        carries no screen count).

    Raises:
        ValueError: if the string is empty or a number cannot be parsed.
    """
    # split() with no argument treats newlines, tabs and runs of spaces
    # alike, so irregular spacing never produces empty tokens.
    tokens = opening_str.split()
    if not tokens:
        raise ValueError(f"empty opening string: {opening_str!r}")

    gross = int(clean_currency_string(tokens[0]))
    screens = int(tokens[1].replace(',', '')) if len(tokens) > 1 else None
    return {
        'gross': gross,
        'screens': screens
    }
def clean_release_date_string(input_str):
    """Return the first date of a release-date string as 'YYYY-MM-DD'.

    Accepted shapes:
      * 'Dec 1, 2023'
      * 'Dec 1, 2023 - Dec 12, 2023'  (range; the start date is used)
      * 'Dec 1, 2023 (Dec 12, 2023)'  (the parenthesised part is dropped)
      * any mix of the two suffixes after the first date

    Args:
        input_str: Raw release-date text.

    Returns:
        The leading date formatted as 'YYYY-MM-DD'.

    Raises:
        ValueError: if the leading part does not match '%b %d, %Y'.
    """
    # Cut at the range separator first, then at any opening parenthesis, so
    # only the leading date survives regardless of which suffixes follow.
    first_date = input_str.split(' - ')[0].split('(')[0].strip()
    return datetime.strptime(first_date, '%b %d, %Y').strftime('%Y-%m-%d')
def clean_running_time_string(running_time_str) -> int:
    """Convert a running-time string into a total number of minutes.

    Recognised shapes are '<h> hr <m> min', '<h> hr' and '<m> min'. A
    string containing neither 'hr' nor 'min' is read as a plain minute
    count taken from its first token.

    Args:
        running_time_str: Raw running-time text.

    Returns:
        Total running time in minutes.

    Raises:
        ValueError: if the string is empty or a number cannot be parsed.
    """
    # split() with no argument tolerates newlines and repeated/leading
    # spaces that scraped text may contain.
    tokens = running_time_str.split()
    if not tokens:
        raise ValueError(f"empty running time string: {running_time_str!r}")

    leading = int(tokens[0])
    if 'hr' in running_time_str:
        if 'min' in running_time_str:
            # tokens: [hours, 'hr', minutes, 'min']
            return leading * 60 + int(tokens[2])
        return leading * 60
    return leading
def clean_imdb_id_string(imdb_id_str):
    """Return the fifth '/'-separated segment of a link.

    For an absolute URL shaped like 'https://host/title/tt0111161/...'
    that segment is the title id ('tt0111161').

    Raises:
        IndexError: if the string has fewer than five segments.
    """
    segments = imdb_id_str.split('/')
    return segments[4]
def clean_genres_string(genres_str):
    """Split a genres string into a list, breaking on each single space.

    Consecutive spaces produce empty entries, and an empty input yields
    a list holding one empty string.
    """
    separator = ' '
    genre_names = genres_str.split(separator)
    return genre_names
def write_to_csv(movie_data_list, filename='movies_data.csv'):
    """Write movie records to a UTF-8 CSV file, replacing any existing file.

    Args:
        movie_data_list: Iterable of dicts keyed by the column names below.
        filename: Destination path; defaults to 'movies_data.csv'.

    The file starts with a header row, followed by one row per record in
    the order given.
    """
    columns = [
        'tt_id',
        'movie_name',
        'domestic_box_office',
        'budget',
        'month',
        'year',
        'opening_week',
        'screens',
        'genres',
        'mpaa',
        'runtime',
    ]
    # newline='' lets the csv module control line endings itself.
    with open(filename, 'w', newline='', encoding='utf-8') as out_file:
        csv_writer = csv.DictWriter(out_file, fieldnames=columns)
        csv_writer.writeheader()
        csv_writer.writerows(movie_data_list)
def delta_months_before(delta, reference_date=None):
    """Return the month name and year lying ``delta`` calendar months back.

    The calculation is done on whole calendar months, so the result never
    depends on the day of the month (subtracting 30-day blocks would, for
    example, map March 31 minus one month back onto March).

    Args:
        delta: Number of calendar months to go back (0 means the month of
            the reference date).
        reference_date: Date or datetime to count back from. Defaults to
            the current local time.

    Returns:
        Tuple ``(month_name, year)`` where ``month_name`` is the lowercase
        English month name and ``year`` is an int.
    """
    # Fixed English names: the result feeds a URL and an English-keyed
    # lookup, so it must not vary with the process locale.
    month_names = (
        'january', 'february', 'march', 'april', 'may', 'june',
        'july', 'august', 'september', 'october', 'november', 'december',
    )
    if reference_date is None:
        reference_date = datetime.now()

    # Put year and month on one axis so year roll-over falls out of divmod.
    month_index = reference_date.year * 12 + (reference_date.month - 1) - delta
    year, zero_based_month = divmod(month_index, 12)
    return month_names[zero_based_month], year
def get_movies_list_url(month, year):
    """Build the Box Office Mojo monthly listing URL for ``month``/``year``.

    Both values are interpolated into the path as given; the query string
    sets ``grossesOption=totalGrosses``.
    """
    listing_url = f'https://boxofficemojo.com/month/{month}/{year}/?grossesOption=totalGrosses'
    return listing_url
# === CRAWL FUNCTIONS ===
def crawl_movies_list_data():
    """Collect the detail-page links of every movie in the monthly listing.

    The month and year come from ``delta_months_before(DELTA)``. The page
    is opened in a headless Chrome session; the number of movies is read
    from the first cell of the last table row, and the link in the second
    cell of each movie row is collected.

    Returns:
        List of href strings, one per movie row, in table order.

    Raises:
        ValueError: if the last row's first cell is not an integer.
        Selenium lookup errors propagate unchanged; the browser is closed
        in every case.
    """
    month, year = delta_months_before(DELTA)
    url = get_movies_list_url(month, year)

    options = Options()
    options.add_argument("--headless")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    driver = webdriver.Chrome(options=options)

    result_list = []
    try:
        driver.get(url)

        movies_list_len = driver.find_element(
            By.XPATH, '//*[@id="table"]/div/table[2]/tbody/tr[last()]/td[1]'
        ).text
        movies_list_len = int(movies_list_len)

        # Row 1 is skipped; movie rows are tr[2] .. tr[movies_list_len + 1].
        for i in range(2, movies_list_len + 2):
            movie_url_href = driver.find_element(
                By.XPATH, f'//*[@id="table"]/div/table[2]/tbody/tr[{i}]/td[2]/a'
            ).get_attribute('href')
            result_list.append(movie_url_href)
    finally:
        # Always release the browser process, even when a lookup fails.
        driver.quit()

    return result_list
def crawl_movie_data(url):
    """Scrape one movie detail page into a flat record.

    Args:
        url: Address of the movie's detail page.

    Returns:
        dict with the keys 'tt_id', 'movie_name', 'domestic_box_office',
        'budget', 'year', 'month', 'opening_week', 'screens', 'genres',
        'mpaa' and 'runtime'. 'year' and 'month' come from
        ``delta_months_before(DELTA)``, not from the page. Properties not
        found on the page stay ``None``.

    Raises:
        Selenium lookup errors and parsing errors propagate unchanged; the
        browser is closed in every case.
    """
    options = Options()
    options.add_argument("--headless")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    driver = webdriver.Chrome(options=options)

    try:
        driver.get(url)

        month, year = delta_months_before(DELTA)
        movie = {
            'tt_id': None,
            'movie_name': None,
            'domestic_box_office': None,
            'budget': None,
            'year': year,
            'month': month_name_to_number(month),
            'opening_week': None,
            'screens': None,
            'genres': None,
            'mpaa': None,
            'runtime': None
        }

        TITLE_XPATH = '//*[@id="a-page"]/main/div/div[1]/div[1]/div/div/div[2]/h1'
        IMDB_ID_XPATH = '//*[@id="title-summary-refiner"]/a'
        DOMESTIC_GROSS_XPATH = '//*[@id="a-page"]/main/div/div[3]/div[1]/div/div[1]/span[2]/span'
        PROPERTIES_NUM_XPATH = '//*[@id="a-page"]/main/div/div[3]/div[4]/div'

        title = driver.find_element(By.XPATH, TITLE_XPATH).text
        imdb_id = driver.find_element(By.XPATH, IMDB_ID_XPATH).get_attribute('href')
        domestic_gross = driver.find_element(By.XPATH, DOMESTIC_GROSS_XPATH).text
        properties_count = len(driver.find_elements(By.XPATH, PROPERTIES_NUM_XPATH))

        movie['movie_name'] = title
        movie['tt_id'] = clean_imdb_id_string(imdb_id)
        movie['domestic_box_office'] = clean_currency_string(domestic_gross)

        # Visits property rows 2 .. properties_count - 2: the first row and
        # the last two rows are never inspected.
        # NOTE(review): presumably those rows hold nothing needed here - confirm.
        for i in range(2, properties_count - 1):
            row_xpath = f'{PROPERTIES_NUM_XPATH}[{i}]'
            property_name = driver.find_element(By.XPATH, f'{row_xpath}/span[1]').text
            property_value = driver.find_element(By.XPATH, f'{row_xpath}/span[2]').text

            if 'Running Time' in property_name:
                movie['runtime'] = clean_running_time_string(property_value)
            if 'Opening' in property_name:
                opening_data = clean_opening_string(property_value)
                movie['opening_week'] = opening_data['gross']
                movie['screens'] = opening_data['screens']
            if 'Budget' in property_name:
                movie['budget'] = clean_currency_string(property_value)
            if 'MPAA' in property_name:
                movie['mpaa'] = property_value
            if 'Genres' in property_name:
                movie['genres'] = property_value
    finally:
        # Always release the browser process, even when scraping fails.
        driver.quit()

    return movie
# Script entry point: crawl the monthly listing, scrape every movie page,
# write the records to CSV and report timings for each phase.
if __name__ == "__main__":
    month, year = delta_months_before(DELTA)
    print(f'Start updater from boxofficemojo.com in {month} ,{year}')

    # Define ANSI escape codes for background colors
    RED_BG = "\033[41m"
    GREEN_BG_BLACK_TEXT_BOLD = "\033[42;30;1m"
    YELLOW_BG_BOLD = "\033[43;1m"
    RESET = "\033[0m"

    # Define ANSI escape codes for text colors
    WHITE_TEXT = "\033[97m"
    GREEN_TEXT = "\033[92m"
    YELLOW_TEXT = '\033[33m'
    RESET_TEXT = "\033[0m"

    start_time = time.time()

    # Phase 1: collect the detail-page URLs from the monthly listing.
    movies_url_list_start = time.time()
    movies_url_list = crawl_movies_list_data()
    movies_url_list_end = time.time()
    movies_url_list_time_cost = movies_url_list_end - movies_url_list_start
    print(f"{GREEN_BG_BLACK_TEXT_BOLD}CRAWL MOVIES LIST DATA{RESET} Total time cost: {YELLOW_TEXT}{movies_url_list_time_cost:.2f}s{RESET_TEXT}")

    # Phase 2: scrape each movie page, timing every request.
    movie_data_list = []
    movies_list_data_time_start = time.time()
    print('Start crawling movie data')
    for movie_url in movies_url_list:
        movie_data_start = time.time()
        movie_data = crawl_movie_data(movie_url)
        movie_data_list.append(movie_data)
        movie_data_end = time.time()
        movie_data_time_cost = movie_data_end - movie_data_start
        print(f" MOVIE DATA (Cost: {YELLOW_TEXT}{movie_data_time_cost:.2f}s{RESET_TEXT}) Title: {movie_data['movie_name']}")
    movies_list_data_time_end = time.time()
    movies_list_data_time_cost = movies_list_data_time_end - movies_list_data_time_start

    # An empty listing must not crash the report with a division by zero.
    movie_count = len(movies_url_list)
    movies_list_data_time_average = movies_list_data_time_cost / movie_count if movie_count else 0.0
    print(f"{GREEN_BG_BLACK_TEXT_BOLD}CRAWL MOVIE DATA{RESET} Total time cost: {YELLOW_TEXT}{movies_list_data_time_cost:.2f}s{RESET_TEXT}, average time cost: {YELLOW_TEXT}{movies_list_data_time_average:.2f}s{RESET_TEXT}")

    # Phase 3: persist the results.
    write_to_csv(movie_data_list)

    end_time = time.time()
    time_cost = end_time - start_time
    print(f"\nTotal time cost : {GREEN_TEXT}{time_cost:.2f}s{RESET_TEXT}")