Spaces:
No application file
No application file
| # -*- coding: utf-8 -*- | |
| # Import necessary libraries | |
| import os | |
| import random | |
| import gdown | |
| import tensorflow as tf | |
| from tensorflow.keras import layers, Model, metrics, optimizers | |
| from tensorflow.keras.applications import resnet | |
| import matplotlib.pyplot as plt | |
| from tensorflow.keras.callbacks import EarlyStopping | |
# Height and width (in pixels) that every image is resized to
image_shape = (200, 200)

# Local folder that holds the "train_data/" and "val_data/" sub-folders
download_dir = "./classification_data/"

# Google Drive share link of the dataset archive.
# NOTE(review): nothing in the visible script downloads from this URL, so the
# data is presumably fetched and extracted into download_dir beforehand - confirm.
data_url = "https://drive.google.com/file/d/13hSwP2O4pd3NVVnWj2Fcah_r-jkf8uiv/view?usp=drive_link"
def load_and_process_image(file_path):
    """Read an image file and return it as a resized float32 RGB tensor.

    Pixel values are kept on the 0-255 scale. Every consumer in this script
    passes the images through ``resnet.preprocess_input``, which expects
    0-255 inputs; rescaling to [0, 1] here would make the ImageNet mean
    subtraction swamp the actual image content.

    Args:
        file_path: Path of the image file (string or scalar string tensor).

    Returns:
        A float32 tensor of shape ``image_shape + (3,)``.
    """
    image_data = tf.io.read_file(file_path)
    image = tf.image.decode_jpeg(image_data, channels=3)
    # Plain cast (no rescaling), unlike tf.image.convert_image_dtype which
    # would map uint8 pixels into [0, 1].
    image = tf.cast(image, tf.float32)
    return tf.image.resize(image, image_shape)
def process_triplets(anchor, pos, neg):
    """Load the three images of one triplet.

    Args:
        anchor: Path of the anchor image.
        pos: Path of the positive image.
        neg: Path of the negative image.

    Returns:
        A 3-tuple ``(anchor_image, positive_image, negative_image)`` where each
        element is the result of ``load_and_process_image`` for that path.
    """
    return tuple(load_and_process_image(path) for path in (anchor, pos, neg))
def create_dataset(directory, batch_size=64):
    """Build a batched ``tf.data`` pipeline of (anchor, positive, negative) images.

    ``directory`` must contain one sub-folder per class. Every image is used
    once as an anchor; its positive is another random image of the same class
    and its negative is a random image from a different class. The triplets
    are sampled once, when this function is called.

    Args:
        directory: Root folder with one sub-folder of images per class.
        batch_size: Number of triplets per batch.

    Returns:
        A shuffled, batched and prefetched ``tf.data.Dataset`` whose elements
        are 3-tuples of image batches.

    Raises:
        ValueError: If fewer than two non-empty class folders are found, since
            a negative example could not be drawn.
    """
    # Scan the directory tree once instead of re-listing it for every image.
    images_by_category = {}
    for category in sorted(os.listdir(directory)):
        category_path = os.path.join(directory, category)
        if not os.path.isdir(category_path):
            continue  # ignore stray files next to the class folders
        image_paths = [
            os.path.join(category_path, file_name)
            for file_name in sorted(os.listdir(category_path))
        ]
        if image_paths:
            images_by_category[category] = image_paths

    if len(images_by_category) < 2:
        raise ValueError(
            f"Need at least two non-empty class folders in {directory!r} "
            f"to build triplets, found {len(images_by_category)}."
        )

    categories = list(images_by_category)
    anchor_list, positive_list, negative_list = [], [], []
    for category, image_paths in images_by_category.items():
        other_categories = [c for c in categories if c != category]
        image_count = len(image_paths)
        for index, anchor_path in enumerate(image_paths):
            if image_count > 1:
                # A non-zero offset guarantees the positive differs from the
                # anchor while staying uniform over the remaining images.
                offset = random.randrange(1, image_count)
                positive_path = image_paths[(index + offset) % image_count]
            else:
                # Single-image class: the anchor is the only available positive.
                positive_path = anchor_path
            negative_path = random.choice(
                images_by_category[random.choice(other_categories)]
            )
            anchor_list.append(anchor_path)
            positive_list.append(positive_path)
            negative_list.append(negative_path)

    dataset = tf.data.Dataset.from_tensor_slices(
        (anchor_list, positive_list, negative_list)
    )
    # The elements are only path strings at this point, so a buffer covering
    # the whole list is cheap and mixes the class-ordered triplets properly.
    dataset = dataset.shuffle(buffer_size=len(anchor_list))
    return dataset.map(process_triplets).batch(batch_size).prefetch(8)
# Build the triplet pipelines for the training and validation splits
train_data, val_data = (
    create_dataset(download_dir + split_folder)
    for split_folder in ("train_data/", "val_data/")
)
# Embedding network: ImageNet-pretrained ResNet50 backbone (without its
# classifier head) followed by a small dense head producing a 256-d embedding.
base_model = resnet.ResNet50(
    weights="imagenet", input_shape=image_shape + (3,), include_top=False
)
flattened_output = layers.Flatten()(base_model.output)
dense_layer1 = layers.Dense(512, activation="relu")(flattened_output)
normalized1 = layers.BatchNormalization()(dense_layer1)
dense_layer2 = layers.Dense(256, activation="relu")(normalized1)
normalized2 = layers.BatchNormalization()(dense_layer2)
final_output = layers.Dense(256)(normalized2)
embedding_model = Model(inputs=base_model.input, outputs=final_output, name="Image_Embedding")

# Freeze the backbone up to (not including) "conv5_block1_out" and fine-tune
# everything from that layer onward. Layer order, not alphabetical order of
# the names, decides which side of the cut a layer is on, so walk the layers
# in order and flip a flag once the marker layer is reached.
unfreeze = False
for layer in base_model.layers:
    if layer.name == "conv5_block1_out":
        unfreeze = True
    layer.trainable = unfreeze
class DistanceLayer(layers.Layer):
    """Layer computing the squared Euclidean distances of a triplet.

    Given the anchor, positive and negative embeddings it returns the pair
    ``(anchor-positive distance, anchor-negative distance)``, each reduced
    over the last axis.
    """

    def call(self, anchor_embedding, positive_embedding, negative_embedding):
        def squared_distance_to_anchor(other_embedding):
            difference = anchor_embedding - other_embedding
            return tf.reduce_sum(tf.square(difference), -1)

        return (
            squared_distance_to_anchor(positive_embedding),
            squared_distance_to_anchor(negative_embedding),
        )
# One image input per member of the triplet
anchor_input, positive_input, negative_input = (
    layers.Input(name=input_name, shape=image_shape + (3,))
    for input_name in ("anchor_input", "positive_input", "negative_input")
)

# Siamese network: the same embedding model (shared weights) is applied to
# each ResNet-preprocessed input, and the distance layer turns the three
# embeddings into the (anchor-positive, anchor-negative) distance pair.
triplet_embeddings = [
    embedding_model(resnet.preprocess_input(image_input))
    for image_input in (anchor_input, positive_input, negative_input)
]
siamese_network = Model(
    inputs=[anchor_input, positive_input, negative_input],
    outputs=DistanceLayer()(*triplet_embeddings),
)
class CustomSiameseModel(Model):
    """Keras model that trains a Siamese network with a triplet margin loss.

    The wrapped ``network`` must map a triplet batch to the pair
    ``(anchor-positive distance, anchor-negative distance)``. The loss is
    ``max(ap_distance - an_distance + margin, 0)`` per triplet.

    Args:
        network: Model returning ``(ap_distance, an_distance)`` for a triplet.
        margin: Minimum gap required between the two distances before a
            triplet stops contributing to the loss.
    """

    def __init__(self, network, margin=0.5):
        super().__init__()
        self.network = network
        self.margin = margin
        # Running means reported to Keras as "loss" and "accuracy".
        self.loss_metric = metrics.Mean(name="loss")
        self.accuracy_metric = metrics.Mean(name="accuracy")

    def call(self, inputs):
        """Forward the triplet batch to the wrapped network."""
        return self.network(inputs)

    def train_step(self, data):
        """Run one optimisation step and return the running loss/accuracy."""
        with tf.GradientTape() as tape:
            loss, accuracy = self.compute_loss_and_accuracy(data)
        gradients = tape.gradient(loss, self.network.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.network.trainable_weights))
        self.loss_metric.update_state(loss)
        self.accuracy_metric.update_state(accuracy)
        return {"loss": self.loss_metric.result(), "accuracy": self.accuracy_metric.result()}

    def test_step(self, data):
        """Evaluate one batch (no weight update) and return running metrics."""
        loss, accuracy = self.compute_loss_and_accuracy(data)
        self.loss_metric.update_state(loss)
        self.accuracy_metric.update_state(accuracy)
        return {"loss": self.loss_metric.result(), "accuracy": self.accuracy_metric.result()}

    def compute_loss_and_accuracy(self, data):
        """Return the per-triplet margin loss and the batch triplet accuracy."""
        ap_dist, an_dist = self.network(data)
        loss = tf.maximum(ap_dist - an_dist + self.margin, 0.0)
        # Accuracy = fraction of triplets whose anchor is closer to the
        # positive than to the negative.
        accuracy = tf.reduce_mean(tf.cast(tf.less(ap_dist, an_dist), tf.float32))
        return loss, accuracy

    @property
    def metrics(self):
        # Must be a property: Keras iterates over ``model.metrics`` to reset
        # the metric states between epochs and between train/validation runs.
        return [self.loss_metric, self.accuracy_metric]
# Compile the Siamese model; the loss is computed inside its train_step, so
# only an optimizer is supplied.
siamese_model = CustomSiameseModel(siamese_network)
siamese_model.compile(optimizer=optimizers.Adadelta())

# Stop when validation accuracy has not improved for 3 epochs and roll back
# to the best weights seen. mode="max" states explicitly that higher is better.
early_stopping_callback = EarlyStopping(
    monitor='val_accuracy', mode='max', patience=3, restore_best_weights=True
)

# Train the model; the callback must be passed to fit() to have any effect.
history = siamese_model.fit(
    train_data,
    epochs=30,
    validation_data=val_data,
    callbacks=[early_stopping_callback],
)

# Save the weights of the embedding model
embedding_model.save_weights("siamese_model_weights.h5")
# Draw the training curves side by side: loss on the left, accuracy on the right
plt.figure(figsize=(12, 4))
history_panels = (
    (1, 'loss', 'val_loss', 'Train Loss', 'Validation Loss', 'Loss Over Epochs'),
    (2, 'accuracy', 'val_accuracy', 'Train Accuracy', 'Validation Accuracy', 'Accuracy Over Epochs'),
)
for panel_index, train_key, val_key, train_label, val_label, panel_title in history_panels:
    plt.subplot(1, 2, panel_index)
    plt.plot(history.history[train_key], label=train_label)
    plt.plot(history.history[val_key], label=val_label)
    plt.title(panel_title)
    plt.legend()
plt.show()
# Evaluate the embeddings on one batch: per-triplet cosine similarity and ROC
import numpy as np
from sklearn.metrics import roc_curve, auc


def cosine_similarity(first, second):
    """Return the cosine similarity of each row pair of two embedding batches.

    A stateless function is used instead of a ``metrics.CosineSimilarity``
    object: the metric keeps accumulating across calls (so a second call
    reports a mean over both) and only ever yields one scalar, which is not
    enough to build a ROC curve.
    """
    first_unit = tf.math.l2_normalize(first, axis=-1)
    second_unit = tf.math.l2_normalize(second, axis=-1)
    return tf.reduce_sum(first_unit * second_unit, axis=-1)


# NOTE(review): the batch is drawn from the training data, so these numbers
# are presumably optimistic compared with held-out data.
sample = next(iter(train_data))
anchor, positive, negative = sample
anchor_embedding, positive_embedding, negative_embedding = (
    embedding_model(resnet.preprocess_input(anchor)),
    embedding_model(resnet.preprocess_input(positive)),
    embedding_model(resnet.preprocess_input(negative)),
)

# One similarity value per triplet in the batch
positive_similarity = cosine_similarity(anchor_embedding, positive_embedding).numpy()
negative_similarity = cosine_similarity(anchor_embedding, negative_embedding).numpy()
print("Positive similarity:", positive_similarity.mean())
print("Negative similarity", negative_similarity.mean())

# ROC / AUC: anchor-positive pairs are the "same class" label (1),
# anchor-negative pairs the "different class" label (0); the cosine
# similarity is the score being thresholded.
actual_labels = [1] * positive_similarity.size + [0] * negative_similarity.size
predicted_scores = np.concatenate([positive_similarity, negative_similarity])
fpr, tpr, thresholds = roc_curve(actual_labels, predicted_scores)
roc_auc = auc(fpr, tpr)

# Plot ROC Curve
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic')
plt.legend(loc="lower right")
plt.show()
# Save the embedding model (not the full Siamese wrapper) to a single file.
# The ".h5" suffix selects the HDF5 format rather than a SavedModel directory.
embedding_model.save("embedding_model.h5")