Added CNN code and tensors

This commit is contained in:
Oscar Blue 2022-03-20 17:30:02 +00:00
parent b2f8b7606a
commit 123a5d3dde
7 changed files with 383 additions and 0 deletions

4
.gitattributes vendored Normal file
View file

@ -0,0 +1,4 @@
*.mp4 filter=lfs diff=lfs merge=lfs -text
*.webm filter=lfs diff=lfs merge=lfs -text
*.jpg filter=lfs diff=lfs merge=lfs -text
*.pt filter=lfs diff=lfs merge=lfs -text

View file

View file

@ -0,0 +1,24 @@
import torch
import os
# Per-channel normalisation statistics, as listed on the torchvision ResNet
# hub page: https://pytorch.org/hub/pytorch_vision_resnet/
MEAN = [0.485, 0.456, 0.406]
STD = [0.229, 0.224, 0.225]

# Side length (in pixels) that images are cropped/resized to.
# The original file assigned 224 here and then re-assigned 32 further down;
# only the last assignment ever took effect, so that effective value is kept
# as the single definition.
# NOTE(review): the linked ResNet page uses 224 -- confirm 32 is intended.
IMAGE_SIZE = 32

# Fraction of the dataset held back for validation.
VAL_SPLIT = 0.1

# Prefer the GPU when one is visible to torch.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Batch sizes for the different stages.
FEATURE_EXTRACTION_BATCH_SIZE = 256
FINETUNE_BATCH_SIZE = 64
PRED_BATCH_SIZE = 4

# Optimisation settings.
EPOCHS = 20
LR = 0.001
LR_FINETUNE = 0.0005  # REMOVE

# Output artefacts.
WARMUP_PLOT = os.path.join("output", "plot.png")
# NOTE(review): the model file is named "plot.pth" -- possibly a copy/paste of
# the plot name; left unchanged because other code may rely on it.
WARMUP_MODEL = os.path.join("output", "plot.pth")

# File names of the serialised image and rating tensors.
TENSOR_IMAGES_PATH = "tensorImages.pt"
TENSOR_RATINGS_PATH = "tensorRatings.pt"

View file

@ -0,0 +1,195 @@
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import random
import time
from sklearn.preprocessing import MinMaxScaler
import numpy as np
from sklearn.model_selection import train_test_split
import cv2
from pathlib import Path
import config
from torchvision import transforms
import torch
from torch.utils.data import DataLoader
import os
from os.path import abspath
from PIL import Image, ImageFile
# Global Pillow switch, set once at import time; by its name it lets Pillow
# load truncated image files -- see the Pillow ImageFile documentation.
ImageFile.LOAD_TRUNCATED_IMAGES = True
# Directory that contains the "AVA" dataset folder.  It may be set to "" to
# resolve the AVA paths relative to the current working directory.
datasetDir = "/datasets/"

# The project root is two directory levels above this file.
script_directory = os.path.dirname(__file__)
projectRoot = abspath(os.path.join(script_directory, "../.."))
print(projectRoot)

# Serialised tensors and the pre-computed ratings table inside the project.
tensorImagesPath = os.path.join(projectRoot, "src/autophotographer/tensorImages.pt")
tensorRatingsPath = os.path.join(projectRoot, "src/autophotographer/tensorRatings.pt")
tensorArrayPath = os.path.join(projectRoot, "src/autophotographer/tensorArray.pt")
filePathRatings = os.path.join(projectRoot, "data/ratings.txt")

# AVA dataset files.  These are defined unconditionally: previously they only
# existed when datasetDir was non-empty, so an empty datasetDir made every
# later use of them (including the module-level dataframe) fail with NameError.
filePathStyle = os.path.join(datasetDir, "AVA/style_image_lists/test.multilab")
filePathIds = os.path.join(datasetDir, "AVA/style_image_lists/test.jpgl")
filePathInfoAVA = os.path.join(datasetDir, "AVA/AVA.txt")
imgPath = os.path.join(datasetDir, "AVA/images/images")
def load_aesthetic_attributes(filePathStyle, filePathIds):
    """Read the style flags and the image ids and combine them in one table.

    Both files are parsed as headerless, space-separated text.  The style file
    supplies the fourteen named style columns; the id file supplies a single
    "ID" column.  The two tables are joined on their row index, so row *n* of
    one file is paired with row *n* of the other.

    Returns a DataFrame whose first column is "ID", followed by the style
    columns.
    """
    styleNames = [
        "Complementary Colors",
        "Duotones",
        "HDR",
        "Image Grain",
        "Light On White",
        "Long Exposure",
        "Macro",
        "Motion Blur",
        "Negative Image",
        "Rule of Thirds",
        "Shallow DOF",
        "Silhouettes",
        "Soft Focus",
        "Vanishing Point",
    ]
    styleTable = pd.read_csv(filePathStyle, sep=" ", header=None, names=styleNames)
    idTable = pd.read_csv(filePathIds, sep=" ", header=None, names=["ID"])
    return idTable.join(styleTable)
def get_style_info(imageIndex):
    """Return the style names flagged for an image as a comma-separated string.

    The attribute table is loaded from ``filePathStyle`` / ``filePathIds`` and
    the first row whose "ID" equals ``imageIndex`` is inspected.  Every style
    column holding the value 1 contributes its column name, in column order,
    joined with ", ".  An image with no flagged style yields "".

    Raises IndexError when no row matches ``imageIndex``.
    """
    df = load_aesthetic_attributes(filePathStyle, filePathIds)
    imageInfo = df.loc[df['ID'] == imageIndex]
    firstMatch = imageInfo.iloc[0]
    # "ID" is skipped explicitly: it is not a style flag, and without this
    # check the image whose id is 1 would be reported as having style "ID".
    return ", ".join(
        column
        for column in imageInfo.columns
        if column != "ID" and firstMatch[column] == 1
    )
def display_image_with_styles(imageIndex):
    """Show one image in a matplotlib window together with its metadata.

    The figure title carries the image id and the rating returned by
    ``calculate_image_rating``; the text from ``get_style_info`` is drawn at
    data coordinates (0, 0).  Axes are hidden.  The call blocks in
    ``plt.show()`` as usual for matplotlib.
    """
    caption = get_style_info(imageIndex)
    picture = mpimg.imread(imgPath + "/" + str(imageIndex) + ".jpg")
    plt.imshow(picture)
    heading = f"ID: {imageIndex!s}, Rating: {calculate_image_rating(imageIndex)!s}"
    plt.title(heading)
    plt.text(0, 0, caption, ha="center")
    plt.axis("off")
    plt.show()
def get_random_image_index():
    """Pick one image id at random from the "ID" column of the attribute table."""
    attributeTable = load_aesthetic_attributes(filePathStyle, filePathIds)
    return random.choice(attributeTable["ID"].values)
def calculate_image_rating(imageIndex, df=None):
    """Return the vote-weighted mean score of one image.

    Parameters:
        imageIndex: image id, matched against column 1 of the table.
        df: optional, already-loaded table with integer column labels (as
            produced by ``pd.read_csv(..., header=None)``).  When omitted the
            table is read from ``filePathInfoAVA``, which is what the function
            always did before; passing it avoids re-reading the file when
            many images are rated.

    Columns 2..11 are treated as vote counts, column ``c`` being weighted by
    ``c - 1`` (i.e. scores 1..10).  The result is
    ``sum(count * score) / sum(count)`` for the first row matching the id.
    A row with no votes at all yields NaN.

    Raises IndexError when no row matches ``imageIndex``.
    """
    if df is None:
        df = pd.read_csv(filePathInfoAVA, sep=" ", header=None)
    imageInfo = df.loc[df[1] == imageIndex]
    if imageInfo.empty:
        # Same exception type the original lookup produced, clearer message.
        raise IndexError("no rating entry found for image " + str(imageIndex))
    votes = imageInfo.iloc[0]
    score = 0
    numOfRatings = 0
    for column in range(2, 12):
        count = votes[column]
        score += count * (column - 1)
        numOfRatings += count
    return score / numOfRatings
def get_all_image_ratings():
    """Compute the vote-weighted mean score of every image in the AVA table.

    The table at ``filePathInfoAVA`` is read once (first column as index).
    Column 1 holds the image ids; columns 2..11 are vote counts, column ``c``
    weighted by ``c - 1``.  A progress line is printed every 1000 images.

    Returns a two-column DataFrame (columns 0 and 1) of id and rating, one row
    per distinct id in order of first appearance.  If an id occurs more than
    once, the rating of its first row is used.

    The previous implementation called ``calculate_image_rating`` for every
    image, which re-read and re-scanned the whole file each time; all ratings
    are now derived from the single in-memory table.
    """
    df = pd.read_csv(filePathInfoAVA, sep=" ", header=None, index_col=0)
    listOfImageIndexes = df[1].values

    voteColumns = list(range(2, 12))
    weights = [column - 1 for column in voteColumns]
    votes = df[voteColumns]
    ratings = ((votes * weights).sum(axis=1) / votes.sum(axis=1)).values

    imageRatings = {}
    total = len(listOfImageIndexes)
    startTime = time.time()
    for imageIndex, (image, rating) in enumerate(zip(listOfImageIndexes, ratings), start=1):
        if imageIndex % 1000 == 0:
            timeTaken = time.time() - startTime
            print(str(imageIndex) + "/" + str(total) + ": took " + str(timeTaken) + " seconds.")
        # setdefault keeps the first row's rating for a repeated id.
        imageRatings.setdefault(image, rating)
    return pd.DataFrame(list(imageRatings.items()))
def load_image_ratings():
    """Read the ratings file at ``filePathRatings`` into a DataFrame.

    The file is parsed as headerless, space-separated text and its two columns
    are labelled "id" and "rating".
    """
    return pd.read_csv(filePathRatings, header=None, sep=" ", names=["id", "rating"])
def process_image_ratings(df, train, test):
    """Min-max scale the "rating" column of the train and test splits.

    Parameters:
        df: unused; kept so existing callers keep working.
        train: DataFrame with a "rating" column; the scaler is fitted on it.
        test: DataFrame with a "rating" column; scaled with the *train* fit.

    Returns a tuple ``(trainContinuous, testContinuous)`` of 2-D arrays with a
    single column each.

    Two defects of the previous version are fixed here:
    * the scaler was re-fitted on the test split, so train and test were
      scaled with different ranges;
    * a 1-D Series was passed to the scaler, which requires 2-D input.
    """
    cs = MinMaxScaler()
    # Double brackets select a one-column DataFrame (2-D) instead of a Series.
    trainContinuous = cs.fit_transform(train[["rating"]])
    testContinuous = cs.transform(test[["rating"]])
    return (trainContinuous, testContinuous)
def load_images(df, size=(32, 32)):
    """Load and resize every image listed in ``df.path``.

    Parameters:
        df: DataFrame with a "path" column of image file paths.
        size: ``(width, height)`` passed to ``cv2.resize``; defaults to the
            previously hard-coded ``(32, 32)``.

    Returns a NumPy array stacking the resized images in row order.

    Raises OSError when an image cannot be read.  ``cv2.imread`` signals
    failure by returning None, which previously surfaced as an unrelated
    error inside ``cv2.resize``.
    """
    images = []
    for path in df.path.values:
        image = cv2.imread(path)
        if image is None:
            raise OSError("Could not read image: " + str(path))
        images.append(cv2.resize(image, size))
    return np.array(images)
def remove_entries_for_missing_images(df, imgPath):
    """Return the rows of ``df`` whose image file exists in ``imgPath``.

    Parameters:
        df: DataFrame with an integer "id" column.
        imgPath: directory searched (non-recursively) for ``<id>.jpg`` files.

    Returns a new DataFrame (a copy, so callers may add columns to it without
    touching ``df``) containing only the rows whose id has a matching file.

    ``*.jpg`` files whose name is not an integer are ignored; previously such
    a file aborted the whole scan with ValueError.
    """
    availableIds = set()
    for path in Path(imgPath).glob('*.jpg'):
        try:
            availableIds.add(int(path.stem))
        except ValueError:
            # Not an "<id>.jpg" file, so it cannot match any row.
            continue
    return df[df.id.isin(availableIds)].copy()
def build_dataframe(df, imgPath):
    """Add a "path" column holding ``<imgPath>/<id>.jpg`` for every row.

    The column is written into ``df`` itself, and that same object is
    returned.
    """
    df['path'] = [imgPath + "/" + str(imageId) + ".jpg" for imageId in df.id.values]
    return df
# Module-level table built at import time: the ratings file is loaded, rows
# without a matching "<id>.jpg" in imgPath are dropped, and a "path" column is
# added.  Importing this module therefore reads the ratings file and scans
# the image directory.
df = build_dataframe(remove_entries_for_missing_images(load_image_ratings(), imgPath), imgPath)
def create_tensor_array():
    """Build a list of ``(image, rating)`` pairs from the module-level ``df``.

    Each image is read with ``cv2.imread`` from the row's "path" and resized
    to 32x32; the rating is taken from the row's "rating" column.

    Raises OSError when an image cannot be read.  ``cv2.imread`` signals
    failure by returning None, which previously surfaced as an unrelated
    error inside ``cv2.resize``.
    """
    tensorArray = []
    for path, rating in zip(df['path'], df['rating']):
        image = cv2.imread(path)
        if image is None:
            raise OSError("Could not read image: " + str(path))
        tensorArray.append((cv2.resize(image, (32, 32)), rating))
    return tensorArray
def load_in_tensors():
    """Load the three serialised objects and return them as a tuple.

    Returns ``(tensorImages, tensorRatings, tensorArray)`` loaded from
    ``tensorImagesPath``, ``tensorRatingsPath`` and ``tensorArrayPath``.
    Each path is printed before loading; the previous version printed the
    images path three times instead.
    """
    print(tensorImagesPath)
    print(tensorRatingsPath)
    print(tensorArrayPath)
    # NOTE: torch.load unpickles the file contents -- only load files from a
    # trusted source.
    tensorImages = torch.load(tensorImagesPath)
    tensorRatings = torch.load(tensorRatingsPath)
    tensorArray = torch.load(tensorArrayPath)
    return (tensorImages, tensorRatings, tensorArray)
def get_dataloader(df, transforms, batchSize, shuffle=True):
    """Materialise a dataset in memory and wrap it in a DataLoader.

    Parameters:
        df: DataFrame with "path" and "rating" columns.
        transforms: callable applied to each loaded RGB PIL image.
        batchSize: batch size handed to the DataLoader.
        shuffle: whether the DataLoader shuffles the samples.

    Returns ``(tensorArray, loader)`` where ``tensorArray`` is the list of
    ``(transformedImage, rating)`` tuples and ``loader`` iterates over it.

    Every image is loaded and transformed up front, so random transforms are
    applied once here, not once per epoch.
    """
    tensorArray = []
    for idx, row in df.iterrows():
        # Load the image, run it through the transforms and pair it with its
        # rating.
        transformedImg = transforms(pil_loader(row['path']))
        tensorArray.append((transformedImg, row['rating']))
    loader = DataLoader(
        tensorArray,
        batch_size=batchSize,
        shuffle=shuffle,
        # os.cpu_count() may return None, which DataLoader rejects; fall back
        # to loading in the main process.
        num_workers=os.cpu_count() or 0,
        pin_memory=config.DEVICE == "cuda",
    )
    return (tensorArray, loader)
def pil_loader(path):
    """Open the image file at ``path`` and return it converted to RGB.

    The file handle is closed before returning; the conversion happens while
    it is still open.
    """
    with open(path, 'rb') as handle:
        return Image.open(handle).convert('RGB')

View file

@ -0,0 +1,154 @@
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.models import resnet50
from torchvision import transforms
from torch.utils.data import DataLoader
from tqdm import tqdm
import time
import os
from os.path import abspath
import matplotlib.pyplot as plt
import config
import dataset
# Training script: fits a single-output regression head on top of a frozen,
# pretrained ResNet-50 using cached datasets, then saves a loss plot and the
# model.

# projectRoot = "/src/"
script_directory = os.path.dirname(__file__)
projectRoot = abspath(os.path.join(script_directory, "../.."))
print("Project root: " + projectRoot)
INITIAL_PLOT_PATH = projectRoot + "/src/output/plot.png"
INITIAL_MODEL_PATH = projectRoot + "/src/output/model.pth"
# Original (misspelled) name kept as an alias for anything that refers to it.
INTIIAL_MODEL_PATH = INITIAL_MODEL_PATH
# Number of mini-batches whose gradients are accumulated per optimizer step.
ACCUMULATION_STEPS = 2

# define transformations
trainTransform = transforms.Compose([
    transforms.RandomResizedCrop(config.IMAGE_SIZE),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(90),
    transforms.ToTensor(),
    transforms.Normalize(mean=config.MEAN, std=config.STD)
])
valTransform = transforms.Compose([
    transforms.Resize((config.IMAGE_SIZE, config.IMAGE_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=config.MEAN, std=config.STD)
])

# The last VAL_SPLIT fraction of the table is the validation split.
valSetLen = int(len(dataset.df) * config.VAL_SPLIT)
trainSetLen = len(dataset.df) - valSetLen
trainSet = dataset.df[:trainSetLen]
valSet = dataset.df[trainSetLen:]
print("Using " + config.DEVICE + "...")

# create data loaders
print("Getting dataloaders...")
# NOTE: the cached datasets loaded below were produced once with:
#(trainDataset, trainLoader) = dataset.get_dataloader(trainSet,
#transforms=trainTransform, batchSize=config.FEATURE_EXTRACTION_BATCH_SIZE)
#torch.save(trainDataset, 'trainDataset.pt')
#(valDataset, valLoader) = dataset.get_dataloader(valSet,
#transforms=valTransform, batchSize=config.FEATURE_EXTRACTION_BATCH_SIZE, shuffle=False)
#torch.save(valDataset, 'valDataset.pt')

# Resolve the cached datasets relative to the project root instead of the
# hard-coded "/src/src/autophotographer" (identical when projectRoot is /src).
cachedDatasetDir = os.path.join(projectRoot, "src/autophotographer")
# os.cpu_count() may return None, which DataLoader rejects.
numWorkers = os.cpu_count() or 0
pinMemory = config.DEVICE == "cuda"
# NOTE: torch.load unpickles the files -- only load trusted data.
valDataset = torch.load(os.path.join(cachedDatasetDir, "valDataset.pt"))
valLoader = DataLoader(valDataset, batch_size=config.FEATURE_EXTRACTION_BATCH_SIZE, shuffle=False,
                       num_workers=numWorkers, pin_memory=pinMemory)
trainDataset = torch.load(os.path.join(cachedDatasetDir, "trainDataset.pt"))
trainLoader = DataLoader(trainDataset, batch_size=config.FEATURE_EXTRACTION_BATCH_SIZE, shuffle=True,
                         num_workers=numWorkers, pin_memory=pinMemory)

# Load the resnet model
model = resnet50(pretrained=True)
# Freeze all existing layers
for parameter in model.parameters():
    parameter.requires_grad = False
# Replace the classifier with a single-output regression head; only this new
# layer is trainable.
modelOutputFeatures = model.fc.in_features
model.fc = nn.Linear(modelOutputFeatures, 1)
model = model.to(config.DEVICE)

# initialize loss function and optimizer
lossFunction = nn.L1Loss()
optimizer = torch.optim.Adam(model.fc.parameters(), lr=config.LR)

# Number of batches per epoch.  Taken from the loaders so that the final,
# partial batch is counted (floor division under-counted it and gave zero for
# datasets smaller than one batch).
trainSteps = len(trainLoader)
valSteps = len(valLoader)

# initialize a dictionary to store training data
H = {"train_loss": [], "val_loss": []}

# loop over epochs
print("Starting training...")
startTime = time.time()
for epoch in tqdm(range(config.EPOCHS)):
    model.train()
    totalTrainLoss = 0.0
    totalValLoss = 0.0
    # Start every epoch with clean gradients.
    optimizer.zero_grad()
    for (i, (x, y)) in enumerate(trainLoader):
        x = x.to(config.DEVICE)
        # Targets become a float32 column so they match the model output in
        # both shape and dtype.
        y = y.to(config.DEVICE, dtype=torch.float32).view(-1, 1)
        pred = model(x)
        loss = lossFunction(pred, y)
        loss.backward()
        # Step after every ACCUMULATION_STEPS batches and after the last batch
        # so no gradients are carried over into the next epoch.  The previous
        # test, (i + 2) % 2, stepped on the very first batch alone.
        if (i + 1) % ACCUMULATION_STEPS == 0 or (i + 1) == trainSteps:
            optimizer.step()
            optimizer.zero_grad()
        # .item() detaches the value; summing the tensor itself kept every
        # batch's autograd graph alive for the whole epoch.
        totalTrainLoss += loss.item()

    model.eval()
    with torch.no_grad():
        for (x, y) in valLoader:
            x = x.to(config.DEVICE)
            y = y.to(config.DEVICE, dtype=torch.float32).view(-1, 1)
            pred = model(x)
            totalValLoss += lossFunction(pred, y).item()

    # calculate the average training and validation loss
    avgTrainLoss = totalTrainLoss / max(trainSteps, 1)
    avgValLoss = totalValLoss / max(valSteps, 1)
    # update our training history
    H["train_loss"].append(avgTrainLoss)
    H["val_loss"].append(avgValLoss)
    # print the model training and validation information
    print("[INFO] EPOCH: {}/{}".format(epoch + 1, config.EPOCHS))
    print("Train loss: {:.6f}, Val loss: {:.6f}".format(
        avgTrainLoss, avgValLoss))

# display the total time needed to perform the training
endTime = time.time()
print("[INFO] total time taken to train the model: {:.2f}s".format(
    endTime - startTime))

# Make sure the output directory exists before writing into it.
os.makedirs(os.path.dirname(INITIAL_PLOT_PATH), exist_ok=True)

# plot the training and validation loss
plt.style.use("ggplot")
plt.figure()
plt.plot(H["train_loss"], label="train_loss")
plt.plot(H["val_loss"], label="val_loss")
plt.title("Training Loss on Dataset")
plt.xlabel("Epoch #")
plt.ylabel("Loss")
plt.legend(loc="lower left")
plt.savefig(INITIAL_PLOT_PATH)

# serialize the model to disk
torch.save(model, INITIAL_MODEL_PATH)

BIN
src/autophotographer/tensorImages.pt (Stored with Git LFS) Normal file

Binary file not shown.

BIN
src/autophotographer/tensorRatings.pt (Stored with Git LFS) Normal file

Binary file not shown.