File size: 8,334 Bytes
c2bcda5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# -*- coding: utf-8 -*-
# Import necessary libraries
import os
import random
import gdown
import tensorflow as tf
from tensorflow.keras import layers, Model, metrics, optimizers
from tensorflow.keras.applications import resnet
import matplotlib.pyplot as plt
from tensorflow.keras.callbacks import EarlyStopping

# URL for the dataset and directory for downloading
data_url = "https://drive.google.com/file/d/13hSwP2O4pd3NVVnWj2Fcah_r-jkf8uiv/view?usp=drive_link"
download_dir = "./classification_data/"

# Define the target shape for image resizing
image_shape = (200, 200)

# Function to load and process a single image
def load_and_process_image(file_path):
    image_data = tf.io.read_file(file_path)
    image = tf.image.decode_jpeg(image_data, channels=3)
    image = tf.image.convert_image_dtype(image, tf.float32)
    return tf.image.resize(image, image_shape)

# Function to process image triplets (anchor, positive, negative)
def process_triplets(anchor, pos, neg):
    return (load_and_process_image(anchor),
            load_and_process_image(pos),
            load_and_process_image(neg))

# Function to create a dataset from a directory
def create_dataset(directory):
    anchor_list, positive_list, negative_list = [], [], []

    for category in os.listdir(directory):
        category_path = os.path.join(directory, category)
        images = os.listdir(category_path)

        # Loop through each image, creating triplets
        for anchor_img in images:
            anchor_img_path = os.path.join(category_path, anchor_img)
            anchor_list.append(anchor_img_path)

            positive_img = random.choice(images)
            positive_list.append(os.path.join(category_path, positive_img))

            different_category = random.choice([c for c in os.listdir(directory) if c != category])
            negative_img = random.choice(os.listdir(os.path.join(directory, different_category)))
            negative_list.append(os.path.join(directory, different_category, negative_img))

    # Create and return the dataset
    dataset = tf.data.Dataset.zip((tf.data.Dataset.from_tensor_slices(anchor_list),
                                   tf.data.Dataset.from_tensor_slices(positive_list),
                                   tf.data.Dataset.from_tensor_slices(negative_list)))
    dataset = dataset.shuffle(1024).map(process_triplets).batch(64).prefetch(8)
    return dataset

# Create training and validation datasets
train_data = create_dataset(download_dir + "train_data/")
val_data = create_dataset(download_dir + "val_data/")

# Build the base CNN model using ResNet50
base_model = resnet.ResNet50(weights="imagenet", input_shape=image_shape + (3,), include_top=False)
flattened_output = layers.Flatten()(base_model.output)
dense_layer1 = layers.Dense(512, activation="relu")(flattened_output)
normalized1 = layers.BatchNormalization()(dense_layer1)
dense_layer2 = layers.Dense(256, activation="relu")(normalized1)
normalized2 = layers.BatchNormalization()(dense_layer2)
final_output = layers.Dense(256)(normalized2)

embedding_model = Model(inputs=base_model.input, outputs=final_output, name="Image_Embedding")

# Make specific layers trainable
for layer in base_model.layers:
    layer.trainable = layer.name >= "conv5_block1_out"

# Define a custom Distance Layer for the Siamese Network
class DistanceLayer(layers.Layer):
    def call(self, anchor_embedding, positive_embedding, negative_embedding):
        distance_pos = tf.reduce_sum(tf.square(anchor_embedding - positive_embedding), -1)
        distance_neg = tf.reduce_sum(tf.square(anchor_embedding - negative_embedding), -1)
        return distance_pos, distance_neg

# Inputs for the Siamese Network
anchor_input = layers.Input(name="anchor_input", shape=image_shape + (3,))
positive_input = layers.Input(name="positive_input", shape=image_shape + (3,))
negative_input = layers.Input(name="negative_input", shape=image_shape + (3,))

# Build the Siamese Network
siamese_network = Model(inputs=[anchor_input, positive_input, negative_input],
                        outputs=DistanceLayer()(
                            embedding_model(resnet.preprocess_input(anchor_input)),
                            embedding_model(resnet.preprocess_input(positive_input)),
                            embedding_model(resnet.preprocess_input(negative_input))
                        ))

# Custom Siamese Model class
class CustomSiameseModel(Model):
    def __init__(self, network, margin=0.5):
        super().__init__()
        self.network = network
        self.margin = margin
        self.loss_metric = metrics.Mean(name="loss")
        self.accuracy_metric = metrics.Mean(name="accuracy")

    def call(self, inputs):
        return self.network(inputs)

    def train_step(self, data):
        with tf.GradientTape() as tape:
            loss, accuracy = self.compute_loss_and_accuracy(data)
        gradients = tape.gradient(loss, self.network.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.network.trainable_weights))
        self.loss_metric.update_state(loss)
        self.accuracy_metric.update_state(accuracy)
        return {"loss": self.loss_metric.result(), "accuracy": self.accuracy_metric.result()}

    def test_step(self, data):
        loss, accuracy = self.compute_loss_and_accuracy(data)
        self.loss_metric.update_state(loss)
        self.accuracy_metric.update_state(accuracy)
        return {"loss": self.loss_metric.result(), "accuracy": self.accuracy_metric.result()}

    def compute_loss_and_accuracy(self, data):
        ap_dist, an_dist = self.network(data)
        loss = tf.maximum(ap_dist - an_dist + self.margin, 0.0)

        # Calculate accuracy as top-1 accuracy
        accuracy = tf.reduce_mean(tf.cast(tf.less(ap_dist, an_dist), tf.float32))
        return loss, accuracy

    @property
    def metrics(self):
        return [self.loss_metric, self.accuracy_metric]

# Compile and train the Siamese model
siamese_model = CustomSiameseModel(siamese_network)
siamese_model.compile(optimizer=optimizers.Adadelta())

# Callback for early stopping
early_stopping_callback = EarlyStopping(monitor='val_accuracy', patience=3, restore_best_weights=True)

# Training the model
history = siamese_model.fit(train_data, epochs=30, validation_data=val_data)

# Save model weights
embedding_model.save_weights("siamese_model_weights.h5")

# Plot training history (loss and accuracy)
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Loss Over Epochs')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='Train Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Accuracy Over Epochs')
plt.legend()
plt.show()

# Compute and display cosine similarity
cosine_similarity = metrics.CosineSimilarity()
sample = next(iter(train_data))
anchor, positive, negative = sample
anchor_embedding, positive_embedding, negative_embedding = (
    embedding_model(resnet.preprocess_input(anchor)),
    embedding_model(resnet.preprocess_input(positive)),
    embedding_model(resnet.preprocess_input(negative)),
)
positive_similarity = cosine_similarity(anchor_embedding, positive_embedding).numpy()
negative_similarity = cosine_similarity(anchor_embedding, negative_embedding).numpy()
print("Positive similarity:", positive_similarity)
print("Negative similarity", negative_similarity)

# Compute ROC and AUC
actual_labels = [1] * positive_similarity.size + [0] * negative_similarity.size
import numpy as np
predicted_scores = np.vstack([positive_similarity, negative_similarity]).squeeze()
from sklearn.metrics import roc_curve, auc
fpr, tpr, thresholds = roc_curve(actual_labels, predicted_scores)
roc_auc = auc(fpr, tpr)

# Plot ROC Curve
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic')
plt.legend(loc="lower right")
plt.show()

# Save the Siamese model as a TensorFlow SavedModel
embedding_model.save("embedding_model.h5")