File size: 4,104 Bytes
0cb14f5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9c720d9
0cb14f5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import os
import subprocess
import hashlib
import time
import pandas as pd

def load_data(file):
    """
    Load tabular data from an uploaded file object or a path on disk.

    The format is chosen from the file name's extension, compared
    case-insensitively so that names such as ``Data.CSV`` or ``report.XLSX``
    are accepted.

    Args:
        file: A path (``str`` or ``os.PathLike``) or a file-like object that
            exposes a ``name`` attribute (e.g. an uploaded file).

    Returns:
        pandas.DataFrame: The parsed contents of the file.

    Raises:
        ValueError: If the name does not end in ``.csv``, ``.xls`` or
            ``.xlsx``, or if a file-like object carries no usable name.
    """
    if isinstance(file, (str, os.PathLike)):
        filename = os.fspath(file)
    else:
        # File-like objects without a name cannot be classified; an empty
        # name falls through to the "unsupported format" error below.
        filename = getattr(file, "name", "")

    # Normalise case once so the extension checks are case-insensitive.
    lowered = str(filename).lower()

    if lowered.endswith('.csv'):
        return pd.read_csv(file)
    if lowered.endswith(('.xls', '.xlsx')):
        return pd.read_excel(file)
    raise ValueError("Unsupported file format. Please use CSV or Excel.")

def get_data_summary(df):
    """
    Build a dictionary describing the shape and contents of a dataframe.

    The result has the keys ``rows``, ``columns``, ``column_names``,
    ``missing_values`` (null count per column) and ``dtypes`` (dtype name
    per column, as strings).
    """
    shape = df.shape

    overview = dict(rows=shape[0], columns=shape[1])
    overview["column_names"] = df.columns.tolist()

    # Per-column count of null cells.
    null_counts = df.isnull().sum()
    overview["missing_values"] = null_counts.to_dict()

    # Dtypes rendered as plain strings.
    dtype_names = df.dtypes.astype(str)
    overview["dtypes"] = dtype_names.to_dict()

    return overview

def init_dvc():
    """
    Run ``dvc init`` in the current directory unless a ``.dvc`` entry exists.

    Failures are reported with ``print`` and never raised: a non-zero exit
    from ``dvc init`` and a missing ``dvc`` executable each produce a message.
    """
    if os.path.exists(".dvc"):
        # Already initialized; nothing to do.
        return

    command = ["dvc", "init"]
    try:
        subprocess.run(command, check=True, capture_output=True)
    except FileNotFoundError:
        print("DVC is not installed or not in PATH.")
    except subprocess.CalledProcessError as e:
        print(f"Failed to initialize DVC: {e}")
    else:
        print("DVC repository initialized successfully.")

def _extract_md5_from_dvc_file(dvc_file_path):
    """Return the first ``md5:`` value found in a ``.dvc`` file, or None."""
    import re  # local import: ``re`` is not imported at module level

    try:
        with open(dvc_file_path, "r") as f:
            content = f.read()
    except (OSError, UnicodeDecodeError):
        # Missing or unreadable metadata simply means "no DVC hash".
        return None

    match = re.search(r'md5:\s*([a-fA-F0-9]+)', content)
    return match.group(1) if match else None


def _compute_file_md5(file_path, chunk_size=1 << 20):
    """Return the hex MD5 digest of a file, reading it in fixed-size chunks."""
    digest = hashlib.md5()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def save_to_data_lake(df, filename_prefix="dataset"):
    """
    Save a DataFrame as CSV under ``data_lake/raw`` and try to track it with DVC.

    The file is named ``<filename_prefix>_<unix seconds>.csv``. If that name
    is already taken (two saves within the same second), a numeric suffix
    ``_1``, ``_2``, ... is appended so an earlier dataset is never overwritten.

    DVC tracking is best-effort: any failure is printed and the hash falls
    back to the MD5 of the written CSV. The same fallback is used when
    ``dvc add`` succeeds but no md5 can be read from the ``.dvc`` file.

    Args:
        df: The DataFrame to persist (written without the index).
        filename_prefix: Leading part of the generated file name.

    Returns:
        tuple: ``(file_path, full_hash, short_hash)`` where ``short_hash`` is
        the first 8 characters of ``full_hash``.
    """
    data_lake_dir = os.path.join("data_lake", "raw")
    os.makedirs(data_lake_dir, exist_ok=True)

    # Generate a filename based on time, then de-duplicate it: the timestamp
    # only has one-second resolution, so rapid saves would otherwise collide.
    timestamp = int(time.time())
    file_path = os.path.join(data_lake_dir, f"{filename_prefix}_{timestamp}.csv")
    collision = 0
    while os.path.exists(file_path):
        collision += 1
        file_path = os.path.join(
            data_lake_dir, f"{filename_prefix}_{timestamp}_{collision}.csv"
        )

    # Save the dataframe
    df.to_csv(file_path, index=False)

    # Add to DVC (best-effort; the CSV is already safely on disk).
    dvc_hash = None
    try:
        init_dvc()  # Ensure DVC is initialized
        subprocess.run(["dvc", "add", file_path], check=True, capture_output=True)
        dvc_hash = _extract_md5_from_dvc_file(file_path + ".dvc")
    except Exception as e:
        # Deliberately broad: DVC problems must never fail the save itself.
        print(f"DVC error: {e}")

    if dvc_hash is None:
        # No hash from DVC (failure, missing .dvc file, or no md5 entry):
        # identify the dataset by the MD5 of its contents instead.
        dvc_hash = _compute_file_md5(file_path)

    return file_path, dvc_hash, dvc_hash[:8]

def get_data_lake_files():
    """
    List the dataset files stored in ``data_lake/raw``.

    Only entries ending in ``.csv``, ``.xls`` or ``.xlsx`` are returned, as
    paths joined onto the data lake directory and ordered by modification
    time, newest first. Returns an empty list if the directory is missing.
    """
    data_lake_dir = os.path.join("data_lake", "raw")
    if not os.path.exists(data_lake_dir):
        return []

    dataset_suffixes = ('.csv', '.xls', '.xlsx')
    candidates = [
        os.path.join(data_lake_dir, entry)
        for entry in os.listdir(data_lake_dir)
        if entry.endswith(dataset_suffixes)
    ]

    # Most recently modified first.
    return sorted(candidates, key=os.path.getmtime, reverse=True)

def get_dvc_hash(file_path):
    """
    Return the hash identifying a data file, preferring its DVC metadata.

    The md5 recorded in ``<file_path>.dvc`` is used when that file can be
    read and contains an ``md5:`` entry. Otherwise the MD5 of the data file
    itself is computed. If neither is available the placeholder
    ``"unknown_hash"`` is returned.

    Args:
        file_path: Path of the data file, as a string.

    Returns:
        tuple: ``(full_hash, short_hash)`` where ``short_hash`` is the first
        8 characters of ``full_hash``.
    """
    import re  # local import: ``re`` is not imported at module level

    dvc_file_path = file_path + ".dvc"
    try:
        with open(dvc_file_path, "r") as f:
            content = f.read()
    except (OSError, UnicodeDecodeError):
        # Missing or unreadable metadata: use the content-hash fallback.
        content = ""

    match = re.search(r'md5:\s*([a-fA-F0-9]+)', content)
    if match:
        dvc_hash = match.group(1)
        return dvc_hash, dvc_hash[:8]

    # Fallback to computing MD5, in chunks so large datasets are not loaded
    # into memory at once.
    dvc_hash = "unknown_hash"
    try:
        digest = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(1 << 20), b""):
                digest.update(chunk)
        dvc_hash = digest.hexdigest()
    except OSError:
        # Data file missing or unreadable: keep the placeholder hash.
        pass

    return dvc_hash, dvc_hash[:8]