# -*- coding: utf-8 -*-
# Import necessary libraries
import os
import random
import gdown
import tensorflow as tf
from tensorflow.keras import layers, Model, metrics, optimizers
from tensorflow.keras.applications import resnet
import matplotlib.pyplot as plt
from tensorflow.keras.callbacks import EarlyStopping
import numpy as np
from sklearn.metrics import roc_curve, auc
# URL for the dataset and directory for downloading
data_url = "https://drive.google.com/file/d/13hSwP2O4pd3NVVnWj2Fcah_r-jkf8uiv/view?usp=drive_link"
download_dir = "./classification_data/"
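# Download and unpack the dataset if it is not already present. A minimal sketch,
# assuming the Drive link points to a zip archive (the archive filename below is a
# hypothetical placeholder) that unpacks into train_data/ and val_data/ subdirectories.
import zipfile

if not os.path.isdir(download_dir):
    archive_path = gdown.download(data_url, "classification_data.zip", quiet=False, fuzzy=True)
    with zipfile.ZipFile(archive_path, "r") as archive:
        archive.extractall(download_dir)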
# Define the target shape for image resizing
image_shape = (200, 200)
# Function to load and process a single image
def load_and_process_image(file_path):
    image_data = tf.io.read_file(file_path)
    image = tf.image.decode_jpeg(image_data, channels=3)
    image = tf.image.convert_image_dtype(image, tf.float32)
    return tf.image.resize(image, image_shape)
# Function to process image triplets (anchor, positive, negative)
def process_triplets(anchor, pos, neg):
    return (load_and_process_image(anchor),
            load_and_process_image(pos),
            load_and_process_image(neg))
# Function to create a dataset from a directory
def create_dataset(directory):
    anchor_list, positive_list, negative_list = [], [], []
    for category in os.listdir(directory):
        category_path = os.path.join(directory, category)
        images = os.listdir(category_path)
        # Loop through each image, creating triplets
        for anchor_img in images:
            anchor_img_path = os.path.join(category_path, anchor_img)
            anchor_list.append(anchor_img_path)
            # Positive example: another image from the same category
            positive_img = random.choice(images)
            positive_list.append(os.path.join(category_path, positive_img))
            # Negative example: a random image from a different category
            different_category = random.choice([c for c in os.listdir(directory) if c != category])
            negative_img = random.choice(os.listdir(os.path.join(directory, different_category)))
            negative_list.append(os.path.join(directory, different_category, negative_img))
    # Create and return the batched, shuffled triplet dataset
    dataset = tf.data.Dataset.zip((tf.data.Dataset.from_tensor_slices(anchor_list),
                                   tf.data.Dataset.from_tensor_slices(positive_list),
                                   tf.data.Dataset.from_tensor_slices(negative_list)))
    dataset = dataset.shuffle(1024).map(process_triplets).batch(64).prefetch(8)
    return dataset
# Create training and validation datasets
train_data = create_dataset(download_dir + "train_data/")
val_data = create_dataset(download_dir + "val_data/")
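# Optional sanity check, not required by the training pipeline: display the first
# triplet of one batch so the anchor/positive/negative pairing can be verified visually.
# show_triplet is a helper introduced here for illustration only.
def show_triplet(triplet_batch):
    anchor_batch, positive_batch, negative_batch = triplet_batch
    fig, axes = plt.subplots(1, 3, figsize=(9, 3))
    for ax, img, title in zip(axes,
                              (anchor_batch[0], positive_batch[0], negative_batch[0]),
                              ("Anchor", "Positive", "Negative")):
        ax.imshow(img.numpy())
        ax.set_title(title)
        ax.axis("off")
    plt.show()

show_triplet(next(iter(train_data)))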
# Build the base CNN model using ResNet50
base_model = resnet.ResNet50(weights="imagenet", input_shape=image_shape + (3,), include_top=False)
flattened_output = layers.Flatten()(base_model.output)
dense_layer1 = layers.Dense(512, activation="relu")(flattened_output)
normalized1 = layers.BatchNormalization()(dense_layer1)
dense_layer2 = layers.Dense(256, activation="relu")(normalized1)
normalized2 = layers.BatchNormalization()(dense_layer2)
final_output = layers.Dense(256)(normalized2)
embedding_model = Model(inputs=base_model.input, outputs=final_output, name="Image_Embedding")
# Freeze the base model up to conv5_block1_out and fine-tune every layer after it
trainable = False
for layer in base_model.layers:
    if layer.name == "conv5_block1_out":
        trainable = True
    layer.trainable = trainable
# Define a custom Distance Layer for the Siamese Network
class DistanceLayer(layers.Layer):
    def call(self, anchor_embedding, positive_embedding, negative_embedding):
        distance_pos = tf.reduce_sum(tf.square(anchor_embedding - positive_embedding), -1)
        distance_neg = tf.reduce_sum(tf.square(anchor_embedding - negative_embedding), -1)
        return distance_pos, distance_neg
# Inputs for the Siamese Network
anchor_input = layers.Input(name="anchor_input", shape=image_shape + (3,))
positive_input = layers.Input(name="positive_input", shape=image_shape + (3,))
negative_input = layers.Input(name="negative_input", shape=image_shape + (3,))
# Build the Siamese Network
siamese_network = Model(
    inputs=[anchor_input, positive_input, negative_input],
    outputs=DistanceLayer()(
        embedding_model(resnet.preprocess_input(anchor_input)),
        embedding_model(resnet.preprocess_input(positive_input)),
        embedding_model(resnet.preprocess_input(negative_input)),
    ),
)
# Custom Siamese Model class
class CustomSiameseModel(Model):
    def __init__(self, network, margin=0.5):
        super().__init__()
        self.network = network
        self.margin = margin
        self.loss_metric = metrics.Mean(name="loss")
        self.accuracy_metric = metrics.Mean(name="accuracy")

    def call(self, inputs):
        return self.network(inputs)

    def train_step(self, data):
        with tf.GradientTape() as tape:
            loss, accuracy = self.compute_loss_and_accuracy(data)
        gradients = tape.gradient(loss, self.network.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.network.trainable_weights))
        self.loss_metric.update_state(loss)
        self.accuracy_metric.update_state(accuracy)
        return {"loss": self.loss_metric.result(), "accuracy": self.accuracy_metric.result()}

    def test_step(self, data):
        loss, accuracy = self.compute_loss_and_accuracy(data)
        self.loss_metric.update_state(loss)
        self.accuracy_metric.update_state(accuracy)
        return {"loss": self.loss_metric.result(), "accuracy": self.accuracy_metric.result()}

    def compute_loss_and_accuracy(self, data):
        ap_dist, an_dist = self.network(data)
        # Triplet margin loss: max(d(a, p) - d(a, n) + margin, 0)
        loss = tf.maximum(ap_dist - an_dist + self.margin, 0.0)
        # Accuracy: fraction of triplets where the positive is closer than the negative
        accuracy = tf.reduce_mean(tf.cast(tf.less(ap_dist, an_dist), tf.float32))
        return loss, accuracy

    @property
    def metrics(self):
        return [self.loss_metric, self.accuracy_metric]
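# Worked example of the margin loss above (values chosen for illustration only):
# with margin = 0.5, d(a, p) = 0.2 and d(a, n) = 1.0 give
# loss = max(0.2 - 1.0 + 0.5, 0) = 0, i.e. the triplet is already separated by the margin,
# whereas d(a, p) = 0.9 and d(a, n) = 1.0 give loss = max(0.9 - 1.0 + 0.5, 0) = 0.4.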
# Compile and train the Siamese model
siamese_model = CustomSiameseModel(siamese_network)
siamese_model.compile(optimizer=optimizers.Adadelta())
# Callback for early stopping
early_stopping_callback = EarlyStopping(monitor='val_accuracy', patience=3, restore_best_weights=True)
# Train the model; early stopping monitors validation accuracy and restores the best weights
history = siamese_model.fit(train_data, epochs=30, validation_data=val_data,
                            callbacks=[early_stopping_callback])
# Save model weights
embedding_model.save_weights("siamese_model_weights.h5")
# Plot training history (loss and accuracy)
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Loss Over Epochs')
plt.legend()
plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='Train Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Accuracy Over Epochs')
plt.legend()
plt.show()
# Compute and display cosine similarity between the embeddings of one batch.
# keras.metrics.CosineSimilarity is stateful and reduces over the whole batch, so
# per-sample similarities (needed for the ROC analysis below) are computed directly.
def pairwise_cosine_similarity(a, b):
    a = tf.math.l2_normalize(a, axis=-1)
    b = tf.math.l2_normalize(b, axis=-1)
    return tf.reduce_sum(a * b, axis=-1)

sample = next(iter(train_data))
anchor, positive, negative = sample
anchor_embedding, positive_embedding, negative_embedding = (
    embedding_model(resnet.preprocess_input(anchor)),
    embedding_model(resnet.preprocess_input(positive)),
    embedding_model(resnet.preprocess_input(negative)),
)
positive_similarity = pairwise_cosine_similarity(anchor_embedding, positive_embedding).numpy()
negative_similarity = pairwise_cosine_similarity(anchor_embedding, negative_embedding).numpy()
print("Positive similarity (batch mean):", positive_similarity.mean())
print("Negative similarity (batch mean):", negative_similarity.mean())
# Compute ROC and AUC over the batch (positive pairs labeled 1, negative pairs 0)
actual_labels = np.array([1] * positive_similarity.size + [0] * negative_similarity.size)
predicted_scores = np.concatenate([positive_similarity, negative_similarity])
fpr, tpr, thresholds = roc_curve(actual_labels, predicted_scores)
roc_auc = auc(fpr, tpr)
# Plot ROC Curve
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic')
plt.legend(loc="lower right")
plt.show()
# Save the full embedding model (architecture + weights) in HDF5 format
embedding_model.save("embedding_model.h5")
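# A minimal inference sketch (kept commented out; the image paths are hypothetical
# placeholders): reload the saved embedding model and compare two images by the
# cosine similarity of their embeddings, using the helper defined above.
# reloaded_model = tf.keras.models.load_model("embedding_model.h5")
# img_a = load_and_process_image("path/to/image_a.jpg")[tf.newaxis, ...]
# img_b = load_and_process_image("path/to/image_b.jpg")[tf.newaxis, ...]
# emb_a = reloaded_model(resnet.preprocess_input(img_a))
# emb_b = reloaded_model(resnet.preprocess_input(img_b))
# print("Similarity between the two images:", pairwise_cosine_similarity(emb_a, emb_b).numpy()[0])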