| | import csv |
| | from tempfile import NamedTemporaryFile |
| | import shutil |
| | import pandas as pd |
| | import source_code as sc |
| | from suggestion import suggest_email_domain |
| | import whois |
| | from popular_domains import emailDomains |
| | import streamlit as st |
| | from streamlit_extras.metric_cards import style_metric_cards |
| |
|
| | st.set_page_config( |
| | page_title="Email verification", |
| | page_icon="✅", |
| | layout="centered", |
| | ) |
| |
|
| | def label_email(email): |
| | if not sc.is_valid_email(email): |
| | return "Invalid" |
| | if not sc.has_valid_mx_record(email.split('@')[1]): |
| | return "Invalid" |
| | if not sc.verify_email(email): |
| | return "Unknown" |
| | if sc.is_disposable(email.split('@')[1]): |
| | return "Risky" |
| | return "Valid" |
| |
|
| | def label_emails(input_file): |
| | file_extension = input_file.name.split('.')[-1].lower() |
| |
|
| | if file_extension == 'csv': |
| | df = process_csv(input_file) |
| | elif file_extension == 'xlsx': |
| | df = process_xlsx(input_file) |
| | elif file_extension == 'txt': |
| | df = process_txt(input_file) |
| | else: |
| | st.warning("Unsupported file format. Please provide a CSV, XLSX, or TXT file.") |
| |
|
| |
|
| | def process_csv(input_file): |
| | |
| | if input_file: |
| | if isinstance(input_file, str): |
| | df = pd.read_csv(input_file, header=None) |
| | else: |
| | df = pd.read_csv(input_file, header=None) |
| | |
| | |
| | results = [] |
| |
|
| | |
| | for index, row in df.iterrows(): |
| | email = row[0].strip() |
| | label = label_email(email) |
| | results.append([email, label]) |
| |
|
| | |
| | result_df = pd.DataFrame(results, columns=['Email', 'Label']) |
| | result_df.index = range(1, len(result_df) + 1) |
| | return result_df |
| | else: |
| | return pd.DataFrame(columns=['Email', 'Label']) |
| |
|
| | def process_xlsx(input_file): |
| | df = pd.read_excel(input_file, header=None) |
| | results = [] |
| |
|
| | for index, row in df.iterrows(): |
| | email = row[0].strip() |
| | label = label_email(email) |
| | results.append([email, label]) |
| |
|
| | result_df = pd.DataFrame(results, columns=['Email', 'Label']) |
| | result_df.index = range(1, len(result_df) + 1) |
| | |
| | |
| | st.dataframe(result_df) |
| |
|
| |
|
| | def process_txt(input_file): |
| | input_text = input_file.read().decode("utf-8").splitlines() |
| |
|
| | |
| | results = [] |
| |
|
| | for line in input_text: |
| | email = line.strip() |
| | label = label_email(email) |
| | results.append([email, label]) |
| |
|
| | |
| | result_df = pd.DataFrame(results, columns=['Email', 'Label']) |
| | result_df.index = range(1, len(result_df) + 1) |
| |
|
| | |
| | st.dataframe(result_df) |
| |
|
| | def main(): |
| | with open('style.css') as f: |
| | st.markdown(f'<style>{f.read()}</style>', unsafe_allow_html=True) |
| | |
| | st.title("Email Verification Tool", help="This tool verifies the validity of an email address.") |
| | st.info("The result may not be accurate. However, it has 90% accuracy.") |
| |
|
| | t1, t2= st.tabs(["Single Email", "Bulk Email Processing"]) |
| |
|
| | with t1: |
| | |
| |
|
| | email = st.text_input("Enter an email address:") |
| | |
| | if st.button("Verify"): |
| | with st.spinner('Verifying...'): |
| | result = {} |
| |
|
| | |
| | result['syntaxValidation'] = sc.is_valid_email(email) |
| |
|
| | if result['syntaxValidation']: |
| | domain_part = email.split('@')[1] if '@' in email else '' |
| |
|
| | if not domain_part: |
| | st.error("Invalid email format. Please enter a valid email address.") |
| | else: |
| | |
| | if not sc.has_valid_mx_record(domain_part): |
| | st.warning("Not valid: MX record not found.") |
| | suggested_domains = suggest_email_domain(domain_part, emailDomains) |
| | if suggested_domains: |
| | st.info("Suggested Domains:") |
| | for suggested_domain in suggested_domains: |
| | st.write(suggested_domain) |
| | else: |
| | st.warning("No suggested domains found.") |
| | else: |
| | |
| | result['MXRecord'] = sc.has_valid_mx_record(domain_part) |
| |
|
| | |
| | if result['MXRecord']: |
| | result['smtpConnection'] = sc.verify_email(email) |
| | else: |
| | result['smtpConnection'] = False |
| |
|
| | |
| | result['is Temporary'] = sc.is_disposable(domain_part) |
| |
|
| | |
| | is_valid = ( |
| | result['syntaxValidation'] |
| | and result['MXRecord'] |
| | and result['smtpConnection'] |
| | and not result['is Temporary'] |
| | ) |
| |
|
| | st.markdown("**Result:**") |
| |
|
| | |
| | col1, col2, col3 = st.columns(3) |
| | col1.metric(label="Syntax", value=result['syntaxValidation']) |
| | col2.metric(label="MxRecord", value=result['MXRecord']) |
| | col3.metric(label="Is Temporary", value=result['is Temporary']) |
| | |
| | |
| | |
| | if not result['smtpConnection']: |
| | st.warning("SMTP connection not established.") |
| | |
| | |
| | with st.expander("See Domain Information"): |
| | try: |
| | dm_info = whois.whois(domain_part) |
| | st.write("Registrar:", dm_info.registrar) |
| | st.write("Server:", dm_info.whois_server) |
| | st.write("Country:", dm_info.country) |
| | except: |
| | st.error("Domain information retrieval failed.") |
| | |
| | |
| | if is_valid: |
| | st.success(f"{email} is a Valid email") |
| | else: |
| | st.error(f"{email} is a Invalid email") |
| | if result['is Temporary']: |
| | st.text("It is a disposable email") |
| |
|
| | with t2: |
| | |
| | st.header("Bulk Email Processing") |
| | input_file = st.file_uploader("Upload a CSV, XLSX, or TXT file", type=["csv", "xlsx", "txt"]) |
| | if input_file: |
| | st.write("Processing...") |
| | if input_file.type == 'text/plain': |
| | process_txt(input_file) |
| | else: |
| | df = process_csv(input_file) |
| | st.success("Processing completed. Displaying results:") |
| | st.dataframe(df) |
| |
|
| |
|
| |
|
| | if __name__ == "__main__": |
| | main() |
| |
|