翻译链接:DCGAN 论文翻译
先列举出来,之后再慢慢填肉~
(1)利用爬虫爬取动漫图片,网址为:konachan.net,值得注意的是,爬取速度很慢,如果不想爬取的可以看第二种方法
Download_dataset.py
- import requests
- from bs4 import BeautifulSoup
- import os
- import traceback
-
def download(url, filename):
    """Stream ``url`` into ``filename``.

    Args:
      url: address of the file to fetch.
      filename: destination path; nothing is fetched if it already exists.

    Returns:
      ``filename`` when the file was written, ``None`` when the file already
      existed or the download failed (the traceback is printed).

    Raises:
      KeyboardInterrupt: re-raised after the partial file is removed, so that
        Ctrl-C actually stops the caller's loop.
    """
    if os.path.exists(filename):
        print('file exists!')
        return None
    try:
        response = requests.get(url, stream=True, timeout=60)
        response.raise_for_status()
        # Closing the file at the end of the ``with`` block flushes it, so no
        # per-chunk flush is needed.
        with open(filename, 'wb') as out:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:  # filter out keep-alive chunks
                    out.write(chunk)
        return filename
    except KeyboardInterrupt:
        # Do not leave a truncated image behind, then let the interrupt through.
        if os.path.exists(filename):
            os.remove(filename)
        raise
    except Exception:
        traceback.print_exc()
        if os.path.exists(filename):
            os.remove(filename)
        return None
-
# Crawl the listing pages and download every preview image into ./imgs.
if not os.path.exists('imgs'):
    os.makedirs('imgs')

start = 1
end = 8000
for i in range(start, end + 1):
    url = 'http://konachan.net/post?page=%d&tags=' % i
    try:
        # A timeout keeps one stalled connection from hanging the whole crawl,
        # and a failed page is skipped instead of aborting all remaining pages.
        page = requests.get(url, timeout=60)
        page.raise_for_status()
    except requests.RequestException:
        traceback.print_exc()
        print('%d / %d' % (i, end))
        continue
    soup = BeautifulSoup(page.text, 'html.parser')  # parse with the built-in HTML parser
    # Every <img class="preview"> on the page is one thumbnail to fetch.
    for img in soup.find_all('img', class_="preview"):
        target_url = img['src']
        # Protocol-relative addresses ("//host/path") need a scheme for requests.
        if target_url.startswith('//'):
            target_url = 'http:' + target_url
        filename = os.path.join('imgs', target_url.split('/')[-1])
        download(target_url, filename)
        print("target_url:", target_url, "filename", filename, "完成!!")
    print('%d / %d' % (i, end))
-
-
下载完成,它们被放在imgs文件夹中,可以看到里面有很多的人物,但是我们只需要它们的脸,因此还需要提取人脸部分
(2)论文提到了OpenCV的人脸检测器来提取人脸,但是动漫人物的脸和真实人类的脸是有差别的。因此一般不可以用真实人脸检测器来提取动漫人物的脸。
这里提供一个github网址下载动漫人脸检测器:https://github.com/nagadomi/lbpcascade_animeface,里面包含了一个lbpcascade_animeface.xml文件
或者也可以运行下面的指令下载
- wget https://raw.githubusercontent.com/nagadomi/lbpcascade_animeface/master/lbpcascade_animeface.xml
(3)使用OpenCV人脸检测器,裁剪大小为96×96,存储位置为faces文件夹
face_cut.py
- import cv2
- import sys
- import os.path
- from glob import glob
-
def detect(filename, cascade_file="lbpcascade_animeface.xml"):
    """Detect faces in one image and save each as a 96x96 JPEG under ``faces/``.

    Args:
      filename: path of the image to scan.
      cascade_file: path of the cascade classifier XML file.

    Raises:
      RuntimeError: if ``cascade_file`` does not exist.

    The first face keeps the name ``<stem>.jpg``; further faces from the same
    image are written as ``<stem>_1.jpg``, ``<stem>_2.jpg`` ... so that they no
    longer overwrite each other.
    """
    if not os.path.isfile(cascade_file):
        raise RuntimeError("%s: not found" % cascade_file)

    cascade = cv2.CascadeClassifier(cascade_file)
    image = cv2.imread(filename)
    if image is None:
        # cv2.imread signals an unreadable file by returning None, not by raising.
        print("%s: cannot be read, skipped" % filename)
        return
    gray = cv2.equalizeHist(cv2.cvtColor(image, cv2.COLOR_BGR2GRAY))

    faces = cascade.detectMultiScale(
        gray,
        # detector options
        scaleFactor=1.1,
        minNeighbors=5,
        minSize=(48, 48),
    )

    stem = os.path.splitext(os.path.basename(filename))[0]
    for i, (x, y, w, h) in enumerate(faces):
        face = cv2.resize(image[y:y + h, x:x + w, :], (96, 96))
        suffix = '' if i == 0 else '_%d' % i
        cv2.imwrite(os.path.join("faces", stem + suffix + ".jpg"), face)
-
if __name__ == '__main__':
    # Create the output directory on first run, then crop every crawled JPEG.
    if not os.path.exists('faces'):
        os.makedirs('faces')
    for image_path in glob('imgs/*.jpg'):
        detect(image_path)
-
-
这样我们就有了动漫头像~
如果不想自己爬取和裁剪,也可以直接下载已经处理好的动漫人脸数据集(即上文提到的第二种方法)。链接地址:https://pan.baidu.com/s/1eSifHcA,密码:g5qa
下载完成后如下,解压:
一共是51223张动漫人脸头像,同样地,也是96×96大小
代码实现分为PyTorch和TensorFlow两种框架,下面给出的是TensorFlow版本的代码
- import os
- import scipy.misc
- import numpy as np
-
- from model import DCGAN
- from utils import pp, visualize, to_json, show_all_variables
-
- import tensorflow as tf
-
# Command-line flags. Each help string ends with the default value in brackets.
flags = tf.app.flags
flags.DEFINE_integer("epoch", 25, "Epoch to train [25]")
flags.DEFINE_float("learning_rate", 0.0002, "Learning rate of for adam [0.0002]")
flags.DEFINE_float("beta1", 0.5, "Momentum term of adam [0.5]")
flags.DEFINE_float("train_size", np.inf, "The size of train images [np.inf]")
flags.DEFINE_integer("batch_size", 64, "The size of batch images [64]")
flags.DEFINE_integer("input_height", 108, "The size of image to use (will be center cropped). [108]")
flags.DEFINE_integer("input_width", None, "The size of image to use (will be center cropped). If None, same value as input_height [None]")
flags.DEFINE_integer("output_height", 64, "The size of the output images to produce [64]")
flags.DEFINE_integer("output_width", None, "The size of the output images to produce. If None, same value as output_height [None]")
flags.DEFINE_string("dataset", "celebA", "The name of dataset [celebA, mnist, lsun]")
flags.DEFINE_string("input_fname_pattern", "*.jpg", "Glob pattern of filename of input images [*]")
flags.DEFINE_string("checkpoint_dir", "checkpoint", "Directory name to save the checkpoints [checkpoint]")
flags.DEFINE_string("data_dir", "./data", "Root directory of dataset [data]")
flags.DEFINE_string("sample_dir", "samples", "Directory name to save the image samples [samples]")
flags.DEFINE_boolean("train", False, "True for training, False for testing [False]")
# The help text used to be a copy of the "train" one; this flag is the one
# forwarded to the model as ``crop``.
flags.DEFINE_boolean("crop", False, "True to center crop the input images, False otherwise [False]")
flags.DEFINE_boolean("visualize", False, "True for visualizing, False for nothing [False]")
flags.DEFINE_integer("generate_test_images", 100, "Number of images to generate during test. [100]")
FLAGS = flags.FLAGS
-
def main(_):
    """Entry point: build the DCGAN from the flags, then train or load, then visualize."""
    pp.pprint(flags.FLAGS.__flags)

    # An unset width falls back to the matching height.
    if FLAGS.input_width is None:
        FLAGS.input_width = FLAGS.input_height
    if FLAGS.output_width is None:
        FLAGS.output_width = FLAGS.output_height

    for directory in (FLAGS.checkpoint_dir, FLAGS.sample_dir):
        if not os.path.exists(directory):
            os.makedirs(directory)

    #gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.333)
    run_config = tf.ConfigProto()
    run_config.gpu_options.allow_growth = True

    with tf.Session(config=run_config) as sess:
        # Both dataset branches pass the same arguments; mnist adds y_dim=10.
        model_kwargs = dict(
            input_width=FLAGS.input_width,
            input_height=FLAGS.input_height,
            output_width=FLAGS.output_width,
            output_height=FLAGS.output_height,
            batch_size=FLAGS.batch_size,
            sample_num=FLAGS.batch_size,
            z_dim=FLAGS.generate_test_images,
            dataset_name=FLAGS.dataset,
            input_fname_pattern=FLAGS.input_fname_pattern,
            crop=FLAGS.crop,
            checkpoint_dir=FLAGS.checkpoint_dir,
            sample_dir=FLAGS.sample_dir,
            data_dir=FLAGS.data_dir,
        )
        if FLAGS.dataset == 'mnist':
            model_kwargs['y_dim'] = 10
        dcgan = DCGAN(sess, **model_kwargs)

        show_all_variables()

        if FLAGS.train:
            dcgan.train(FLAGS)
        elif not dcgan.load(FLAGS.checkpoint_dir)[0]:
            raise Exception("[!] Train a model first, then run test mode")

        # to_json("./web/js/layers.js", [dcgan.h0_w, dcgan.h0_b, dcgan.g_bn0],
        #                 [dcgan.h1_w, dcgan.h1_b, dcgan.g_bn1],
        #                 [dcgan.h2_w, dcgan.h2_b, dcgan.g_bn2],
        #                 [dcgan.h3_w, dcgan.h3_b, dcgan.g_bn3],
        #                 [dcgan.h4_w, dcgan.h4_b, None])

        # Visualization, always run with option 1.
        visualize(sess, dcgan, FLAGS, 1)


if __name__ == '__main__':
    tf.app.run()
-
- from __future__ import division
- import os
- import time
- import math
- from glob import glob
- import tensorflow as tf
- import numpy as np
- from six.moves import xrange
-
- from ops import *
- from utils import *
-
- #大小和步幅
def conv_out_size_same(size, stride):
    """Return ``ceil(size / stride)`` as an int."""
    ratio = float(size) / float(stride)
    return int(math.ceil(ratio))
-
- class DCGAN(object):
- #定义类的初始化函数 init。主要是对一些默认的参数进行初始化。包括session、crop、批处理大小batch_size、样本数量sample_num、输入与输出的高和宽、各种维度、生成器与判别器的批处理、数据集名字、灰度值、构建模型函数,需要注意的是,要判断数据集的名字是否是mnist,是的话则直接用load_mnist()函数加载数据,否则需要从本地data文件夹中读取数据,并根据第一张图像的通道数判断是否为灰度图
def __init__(self, sess, input_height=108, input_width=108, crop=True,
             batch_size=64, sample_num=64, output_height=64, output_width=64,
             y_dim=None, z_dim=100, gf_dim=64, df_dim=64,
             gfc_dim=1024, dfc_dim=1024, c_dim=3, dataset_name='default',
             input_fname_pattern='*.jpg', checkpoint_dir=None, sample_dir=None, data_dir='./data'):
    """Store the hyper-parameters, locate the dataset and build the graph.

    Args:
      sess: TensorFlow session
      batch_size: The size of batch. Should be specified before training.
      y_dim: (optional) Dimension of dim for y. [None]
      z_dim: (optional) Dimension of dim for Z. [100]
      gf_dim: (optional) Dimension of gen filters in first conv layer. [64]
      df_dim: (optional) Dimension of discrim filters in first conv layer. [64]
      gfc_dim: (optional) Dimension of gen units for for fully connected layer. [1024]
      dfc_dim: (optional) Dimension of discrim units for fully connected layer. [1024]
      c_dim: (optional) Dimension of image color. NOTE: the argument is not
        read; ``self.c_dim`` is always derived from the data (last axis of the
        first sample, or 1 for a 2-D image).
      sample_dir: accepted for interface compatibility; not stored here.

    Raises:
      Exception: if no file matches the dataset glob, or the dataset has
        fewer files than ``batch_size`` (non-mnist datasets only).
    """
    self.sess = sess
    self.crop = crop

    self.batch_size = batch_size
    self.sample_num = sample_num

    self.input_height = input_height
    self.input_width = input_width
    self.output_height = output_height
    self.output_width = output_width

    self.y_dim = y_dim
    self.z_dim = z_dim

    self.gf_dim = gf_dim
    self.df_dim = df_dim

    self.gfc_dim = gfc_dim
    self.dfc_dim = dfc_dim

    # batch normalization : deals with poor initialization helps gradient flow
    self.d_bn1 = batch_norm(name='d_bn1')
    self.d_bn2 = batch_norm(name='d_bn2')

    # The unconditional networks have one more layer, hence one more batch norm.
    if not self.y_dim:
        self.d_bn3 = batch_norm(name='d_bn3')

    self.g_bn0 = batch_norm(name='g_bn0')
    self.g_bn1 = batch_norm(name='g_bn1')
    self.g_bn2 = batch_norm(name='g_bn2')

    if not self.y_dim:
        self.g_bn3 = batch_norm(name='g_bn3')

    self.dataset_name = dataset_name
    self.input_fname_pattern = input_fname_pattern
    self.checkpoint_dir = checkpoint_dir
    self.data_dir = data_dir

    if self.dataset_name == 'mnist':
        self.data_X, self.data_y = self.load_mnist()
        self.c_dim = self.data_X[0].shape[-1]
    else:
        data_path = os.path.join(self.data_dir, self.dataset_name, self.input_fname_pattern)
        self.data = glob(data_path)
        if len(self.data) == 0:
            raise Exception("[!] No data found in '" + data_path + "'")
        np.random.shuffle(self.data)
        # Read the first image once and reuse it; it used to be decoded twice.
        first_image = imread(self.data[0])
        # A 2-D array has no channel axis and is treated as grayscale.
        if len(first_image.shape) >= 3:
            self.c_dim = first_image.shape[-1]
        else:
            self.c_dim = 1

        if len(self.data) < self.batch_size:
            raise Exception("[!] Entire dataset size is less than the configured batch_size")

    self.grayscale = (self.c_dim == 1)

    self.build_model()
- #定义构建模型函数
def build_model(self):
    """Create the placeholders, the G/D/sampler sub-graphs, the losses, the summaries and the saver."""
    # The label placeholder only exists for the conditional model (y_dim set).
    if self.y_dim:
        self.y = tf.placeholder(tf.float32, [self.batch_size, self.y_dim], name='y')
    else:
        self.y = None

    # With crop enabled the real-image placeholder has the output size,
    # otherwise the input size.
    if self.crop:
        image_dims = [self.output_height, self.output_width, self.c_dim]
    else:
        image_dims = [self.input_height, self.input_width, self.c_dim]

    # Placeholder for the batch of real images.
    self.inputs = tf.placeholder(
        tf.float32, [self.batch_size] + image_dims, name='real_images')

    inputs = self.inputs

    # Noise fed to the generator, plus its histogram summary.
    self.z = tf.placeholder(
        tf.float32, [None, self.z_dim], name='z')
    self.z_sum = histogram_summary("z", self.z)

    # G from (z, y); D on the real inputs; D_ on the generated images with the
    # discriminator variables reused.
    self.G = self.generator(self.z, self.y)
    self.D, self.D_logits = self.discriminator(inputs, self.y, reuse=False)
    # NOTE: this rebinds the attribute: from here on ``self.sampler`` is the
    # tensor returned by the method, no longer the method itself.
    self.sampler = self.sampler(self.z, self.y)
    self.D_, self.D_logits_ = self.discriminator(self.G, self.y, reuse=True)

    self.d_sum = histogram_summary("d", self.D)
    self.d__sum = histogram_summary("d_", self.D_)
    self.G_sum = image_summary("G", self.G)

    def sigmoid_cross_entropy_with_logits(x, y):
        # Try the ``labels=`` keyword first and fall back to ``targets=``.
        # Only the rejected-keyword error is caught, so genuine graph errors
        # are no longer hidden behind a bare ``except``.
        try:
            return tf.nn.sigmoid_cross_entropy_with_logits(logits=x, labels=y)
        except TypeError:
            return tf.nn.sigmoid_cross_entropy_with_logits(logits=x, targets=y)

    # d_loss_real: D on real images against all-ones targets.
    # d_loss_fake: D on generated images against all-zeros targets.
    # g_loss:      D on generated images against all-ones targets.
    self.d_loss_real = tf.reduce_mean(
        sigmoid_cross_entropy_with_logits(self.D_logits, tf.ones_like(self.D)))
    self.d_loss_fake = tf.reduce_mean(
        sigmoid_cross_entropy_with_logits(self.D_logits_, tf.zeros_like(self.D_)))
    self.g_loss = tf.reduce_mean(
        sigmoid_cross_entropy_with_logits(self.D_logits_, tf.ones_like(self.D_)))

    self.d_loss_real_sum = scalar_summary("d_loss_real", self.d_loss_real)
    self.d_loss_fake_sum = scalar_summary("d_loss_fake", self.d_loss_fake)

    self.d_loss = self.d_loss_real + self.d_loss_fake

    self.g_loss_sum = scalar_summary("g_loss", self.g_loss)
    self.d_loss_sum = scalar_summary("d_loss", self.d_loss)

    # Split the trainable variables by the 'd_' / 'g_' marker in their names.
    t_vars = tf.trainable_variables()
    self.d_vars = [var for var in t_vars if 'd_' in var.name]
    self.g_vars = [var for var in t_vars if 'g_' in var.name]

    self.saver = tf.train.Saver()
- #定义训练函数
- def train(self, config):
- """Run the training loop for config.epoch epochs, writing summaries to ./logs, sample grids to config.sample_dir and checkpoints to config.checkpoint_dir."""
- # One Adam optimizer for the discriminator and one for the generator, each restricted to its own variable list
- d_optim = tf.train.AdamOptimizer(config.learning_rate, beta1=config.beta1) \
- .minimize(self.d_loss, var_list=self.d_vars)
- g_optim = tf.train.AdamOptimizer(config.learning_rate, beta1=config.beta1) \
- .minimize(self.g_loss, var_list=self.g_vars)
- # Initialize the variables, falling back to the other initializer name if the first call fails
- # NOTE(review): the bare except also hides unrelated failures of the first call
- try:
- tf.global_variables_initializer().run()
- except:
- tf.initialize_all_variables().run()
- # Merge the generator-related and the discriminator-related summaries into one op each
- # (self.d_sum is rebound to the merged op here) and create the event-file writer
- self.g_sum = merge_summary([self.z_sum, self.d__sum,
- self.G_sum, self.d_loss_fake_sum, self.g_loss_sum])
- self.d_sum = merge_summary(
- [self.z_sum, self.d_sum, self.d_loss_real_sum, self.d_loss_sum])
- self.writer = SummaryWriter("./logs", self.sess.graph)
- # Noise drawn once and reused for every periodic sample grid
- sample_z = np.random.uniform(-1, 1, size=(self.sample_num , self.z_dim))
- # Take the first sample_num items as the evaluation batch;
- # for non-mnist datasets the files are loaded with get_image
- if config.dataset == 'mnist':
- sample_inputs = self.data_X[0:self.sample_num]
- sample_labels = self.data_y[0:self.sample_num]
- else:
- sample_files = self.data[0:self.sample_num]
- sample = [
- get_image(sample_file,
- input_height=self.input_height,
- input_width=self.input_width,
- resize_height=self.output_height,
- resize_width=self.output_width,
- crop=self.crop,
- grayscale=self.grayscale) for sample_file in sample_files]
- if (self.grayscale):
- sample_inputs = np.array(sample).astype(np.float32)[:, :, :, None]
- else:
- sample_inputs = np.array(sample).astype(np.float32)
- # Step counter and wall-clock start time
- counter = 1
- start_time = time.time()
- # Try to resume from a checkpoint; on success continue from its step count
- could_load, checkpoint_counter = self.load(self.checkpoint_dir)
- if could_load:
- counter = checkpoint_counter
- print(" [*] Load SUCCESS")
- else:
- print(" [!] Load failed...")
- # Epoch loop: work out the number of batches for this epoch;
- # non-mnist datasets re-glob and reshuffle the file list every epoch
- for epoch in range(config.epoch):
- if config.dataset == 'mnist':
- batch_idxs = min(len(self.data_X), config.train_size) // config.batch_size
- else:
- self.data = glob(os.path.join(
- config.data_dir, config.dataset, self.input_fname_pattern))
- np.random.shuffle(self.data)
- batch_idxs = min(len(self.data), config.train_size) // config.batch_size
- # Batch loop: assemble the batch images (and the labels for mnist)
- for idx in range(0, int(batch_idxs)):
- if config.dataset == 'mnist':
- batch_images = self.data_X[idx*config.batch_size:(idx+1)*config.batch_size]
- batch_labels = self.data_y[idx*config.batch_size:(idx+1)*config.batch_size]
- else:
- batch_files = self.data[idx*config.batch_size:(idx+1)*config.batch_size]
- batch = [
- get_image(batch_file,
- input_height=self.input_height,
- input_width=self.input_width,
- resize_height=self.output_height,
- resize_width=self.output_width,
- crop=self.crop,
- grayscale=self.grayscale) for batch_file in batch_files]
- if self.grayscale:
- batch_images = np.array(batch).astype(np.float32)[:, :, :, None]
- else:
- batch_images = np.array(batch).astype(np.float32)
- # Fresh noise for this batch
- batch_z = np.random.uniform(-1, 1, [config.batch_size, self.z_dim]) \
- .astype(np.float32)
- # Update the networks: one discriminator step, then two generator steps
- # (the second one is there so that d_loss does not go to zero),
- # then evaluate the fake/real discriminator losses and the generator loss for logging.
- # The mnist branch does the same but also feeds the labels.
- if config.dataset == 'mnist':
- # Update D network
- _, summary_str = self.sess.run([d_optim, self.d_sum],
- feed_dict={
- self.inputs: batch_images,
- self.z: batch_z,
- self.y:batch_labels,
- })
- self.writer.add_summary(summary_str, counter)
-
- # Update G network
- _, summary_str = self.sess.run([g_optim, self.g_sum],
- feed_dict={
- self.z: batch_z,
- self.y:batch_labels,
- })
- self.writer.add_summary(summary_str, counter)
-
- # Run g_optim twice to make sure that d_loss does not go to zero (different from paper)
- _, summary_str = self.sess.run([g_optim, self.g_sum],
- feed_dict={ self.z: batch_z, self.y:batch_labels })
- self.writer.add_summary(summary_str, counter)
-
- errD_fake = self.d_loss_fake.eval({
- self.z: batch_z,
- self.y:batch_labels
- })
- errD_real = self.d_loss_real.eval({
- self.inputs: batch_images,
- self.y:batch_labels
- })
- errG = self.g_loss.eval({
- self.z: batch_z,
- self.y: batch_labels
- })
- else:
- # Update D network
- _, summary_str = self.sess.run([d_optim, self.d_sum],
- feed_dict={ self.inputs: batch_images, self.z: batch_z })
- self.writer.add_summary(summary_str, counter)
-
- # Update G network
- _, summary_str = self.sess.run([g_optim, self.g_sum],
- feed_dict={ self.z: batch_z })
- self.writer.add_summary(summary_str, counter)
-
- # Run g_optim twice to make sure that d_loss does not go to zero (different from paper)
- _, summary_str = self.sess.run([g_optim, self.g_sum],
- feed_dict={ self.z: batch_z })
- self.writer.add_summary(summary_str, counter)
-
- errD_fake = self.d_loss_fake.eval({ self.z: batch_z })
- errD_real = self.d_loss_real.eval({ self.inputs: batch_images })
- errG = self.g_loss.eval({self.z: batch_z})
-
- counter += 1
- # Log the epoch, the batch index, the elapsed time,
- # the discriminator loss (fake + real) and the generator loss
- print("Epoch: [%2d/%2d] [%4d/%4d] time: %4.4f, d_loss: %.8f, g_loss: %.8f" \
- % (epoch, config.epoch, idx, batch_idxs,
- time.time() - start_time, errD_fake+errD_real, errG))
- # Whenever counter % 100 == 1: run the sampler on the fixed noise and evaluation batch,
- # save the image grid with save_images (file name carries epoch and batch index)
- # and print the two losses
- if np.mod(counter, 100) == 1:
- if config.dataset == 'mnist':
- samples, d_loss, g_loss = self.sess.run(
- [self.sampler, self.d_loss, self.g_loss],
- feed_dict={
- self.z: sample_z,
- self.inputs: sample_inputs,
- self.y:sample_labels,
- }
- )
- save_images(samples, image_manifold_size(samples.shape[0]),
- './{}/train_{:02d}_{:04d}.png'.format(config.sample_dir, epoch, idx))
- print("[Sample] d_loss: %.8f, g_loss: %.8f" % (d_loss, g_loss))
- else:
- # NOTE(review): the bare except below swallows every error (including KeyboardInterrupt) raised while sampling or saving
- try:
- samples, d_loss, g_loss = self.sess.run(
- [self.sampler, self.d_loss, self.g_loss],
- feed_dict={
- self.z: sample_z,
- self.inputs: sample_inputs,
- },
- )
- save_images(samples, image_manifold_size(samples.shape[0]),
- './{}/train_{:02d}_{:04d}.png'.format(config.sample_dir, epoch, idx))
- print("[Sample] d_loss: %.8f, g_loss: %.8f" % (d_loss, g_loss))
- except:
- print("one pic error!...")
- # Whenever counter % 500 == 2, save a checkpoint
- if np.mod(counter, 500) == 2:
- self.save(config.checkpoint_dir, counter)
-
def discriminator(self, image, y=None, reuse=False):
    """Build the discriminator and return ``(sigmoid(logits), logits)``.

    Args:
      image: batch of images to score.
      y: label tensor; only used when ``self.y_dim`` is set.
      reuse: when true, reuse the variables of the "discriminator" scope
        instead of creating new ones.
    """
    with tf.variable_scope("discriminator") as scope:
        if reuse:
            scope.reuse_variables()

        if self.y_dim:
            # Conditional variant: two conv layers and two linear layers, with
            # the labels concatenated in front of every layer.
            label_map = tf.reshape(y, [self.batch_size, 1, 1, self.y_dim])
            net = conv_cond_concat(image, label_map)

            net = lrelu(conv2d(net, self.c_dim + self.y_dim, name='d_h0_conv'))
            net = conv_cond_concat(net, label_map)

            net = lrelu(self.d_bn1(conv2d(net, self.df_dim + self.y_dim, name='d_h1_conv')))
            net = tf.reshape(net, [self.batch_size, -1])
            net = concat([net, y], 1)

            net = lrelu(self.d_bn2(linear(net, self.dfc_dim, 'd_h2_lin')))
            net = concat([net, y], 1)

            logits = linear(net, 1, 'd_h3_lin')
            return tf.nn.sigmoid(logits), logits

        # Unconditional variant: four strided conv layers (batch norm on all
        # but the first) followed by one linear layer.
        feat = lrelu(conv2d(image, self.df_dim, name='d_h0_conv'))
        feat = lrelu(self.d_bn1(conv2d(feat, self.df_dim * 2, name='d_h1_conv')))
        feat = lrelu(self.d_bn2(conv2d(feat, self.df_dim * 4, name='d_h2_conv')))
        feat = lrelu(self.d_bn3(conv2d(feat, self.df_dim * 8, name='d_h3_conv')))
        logits = linear(tf.reshape(feat, [self.batch_size, -1]), 1, 'd_h4_lin')
        return tf.nn.sigmoid(logits), logits
-
def generator(self, z, y=None):
    """Build the training-mode generator and return the generated image batch.

    Args:
      z: noise tensor.
      y: label tensor; only used when ``self.y_dim`` is set.

    In the unconditional variant the intermediate tensors and the layer
    weights/biases are kept on ``self`` (``z_``, ``h0``, ``h1``, ``h*_w``, ``h*_b``).
    """
    with tf.variable_scope("generator") as scope:
        out_h, out_w = self.output_height, self.output_width
        batch = self.batch_size

        if self.y_dim:
            # Conditional variant: two linear layers, then two deconvolutions,
            # with the labels concatenated before each layer; sigmoid output.
            half_h, quarter_h = int(out_h / 2), int(out_h / 4)
            half_w, quarter_w = int(out_w / 2), int(out_w / 4)

            # yb = tf.expand_dims(tf.expand_dims(y, 1),2)
            label_map = tf.reshape(y, [batch, 1, 1, self.y_dim])
            z = concat([z, y], 1)

            net = tf.nn.relu(self.g_bn0(linear(z, self.gfc_dim, 'g_h0_lin')))
            net = concat([net, y], 1)

            net = tf.nn.relu(self.g_bn1(
                linear(net, self.gf_dim * 2 * quarter_h * quarter_w, 'g_h1_lin')))
            net = tf.reshape(net, [batch, quarter_h, quarter_w, self.gf_dim * 2])
            net = conv_cond_concat(net, label_map)

            net = tf.nn.relu(self.g_bn2(
                deconv2d(net, [batch, half_h, half_w, self.gf_dim * 2], name='g_h2')))
            net = conv_cond_concat(net, label_map)

            return tf.nn.sigmoid(
                deconv2d(net, [batch, out_h, out_w, self.c_dim], name='g_h3'))

        # Unconditional variant. sizes[k] is the (height, width) after k
        # successive halvings of the output size.
        sizes = [(out_h, out_w)]
        for _ in range(4):
            prev_h, prev_w = sizes[-1]
            sizes.append((conv_out_size_same(prev_h, 2), conv_out_size_same(prev_w, 2)))
        (h2, w2), (h4, w4), (h8, w8), (h16, w16) = sizes[1:]

        # project `z` and reshape
        self.z_, self.h0_w, self.h0_b = linear(
            z, self.gf_dim * 8 * h16 * w16, 'g_h0_lin', with_w=True)
        self.h0 = tf.reshape(self.z_, [-1, h16, w16, self.gf_dim * 8])
        net = tf.nn.relu(self.g_bn0(self.h0))

        self.h1, self.h1_w, self.h1_b = deconv2d(
            net, [batch, h8, w8, self.gf_dim * 4], name='g_h1', with_w=True)
        net = tf.nn.relu(self.g_bn1(self.h1))

        net, self.h2_w, self.h2_b = deconv2d(
            net, [batch, h4, w4, self.gf_dim * 2], name='g_h2', with_w=True)
        net = tf.nn.relu(self.g_bn2(net))

        net, self.h3_w, self.h3_b = deconv2d(
            net, [batch, h2, w2, self.gf_dim * 1], name='g_h3', with_w=True)
        net = tf.nn.relu(self.g_bn3(net))

        net, self.h4_w, self.h4_b = deconv2d(
            net, [batch, out_h, out_w, self.c_dim], name='g_h4', with_w=True)

        return tf.nn.tanh(net)
-
def sampler(self, z, y=None):
    """Build a second copy of the generator that reuses its variables and runs batch norm with ``train=False``.

    Args:
      z: noise tensor.
      y: label tensor; only used when ``self.y_dim`` is set.

    Returns:
      The generated image batch.
    """
    with tf.variable_scope("generator") as scope:
        # Share the variables already created by generator().
        scope.reuse_variables()

        out_h, out_w = self.output_height, self.output_width
        batch = self.batch_size

        if self.y_dim:
            half_h, quarter_h = int(out_h / 2), int(out_h / 4)
            half_w, quarter_w = int(out_w / 2), int(out_w / 4)

            # yb = tf.reshape(y, [-1, 1, 1, self.y_dim])
            label_map = tf.reshape(y, [batch, 1, 1, self.y_dim])
            z = concat([z, y], 1)

            net = tf.nn.relu(self.g_bn0(linear(z, self.gfc_dim, 'g_h0_lin'), train=False))
            net = concat([net, y], 1)

            net = tf.nn.relu(self.g_bn1(
                linear(net, self.gf_dim * 2 * quarter_h * quarter_w, 'g_h1_lin'), train=False))
            net = tf.reshape(net, [batch, quarter_h, quarter_w, self.gf_dim * 2])
            net = conv_cond_concat(net, label_map)

            net = tf.nn.relu(self.g_bn2(
                deconv2d(net, [batch, half_h, half_w, self.gf_dim * 2], name='g_h2'), train=False))
            net = conv_cond_concat(net, label_map)

            return tf.nn.sigmoid(deconv2d(net, [batch, out_h, out_w, self.c_dim], name='g_h3'))

        # sizes[k] is the (height, width) after k successive halvings.
        sizes = [(out_h, out_w)]
        for _ in range(4):
            prev_h, prev_w = sizes[-1]
            sizes.append((conv_out_size_same(prev_h, 2), conv_out_size_same(prev_w, 2)))
        (h2, w2), (h4, w4), (h8, w8), (h16, w16) = sizes[1:]

        # project `z` and reshape
        net = tf.reshape(
            linear(z, self.gf_dim * 8 * h16 * w16, 'g_h0_lin'),
            [-1, h16, w16, self.gf_dim * 8])
        net = tf.nn.relu(self.g_bn0(net, train=False))

        net = deconv2d(net, [batch, h8, w8, self.gf_dim * 4], name='g_h1')
        net = tf.nn.relu(self.g_bn1(net, train=False))

        net = deconv2d(net, [batch, h4, w4, self.gf_dim * 2], name='g_h2')
        net = tf.nn.relu(self.g_bn2(net, train=False))

        net = deconv2d(net, [batch, h2, w2, self.gf_dim * 1], name='g_h3')
        net = tf.nn.relu(self.g_bn3(net, train=False))

        net = deconv2d(net, [batch, out_h, out_w, self.c_dim], name='g_h4')

        return tf.nn.tanh(net)
- #这个主要是针对mnist数据集设置的,所以暂且不考虑,过
def load_mnist(self):
    """Load the four MNIST idx files from ``<data_dir>/<dataset_name>``.

    Returns:
      ``(X, y_vec)``: the train and test images concatenated, shuffled and
      divided by 255, and the matching labels one-hot encoded with
      ``self.y_dim`` columns.
    """
    data_dir = os.path.join(self.data_dir, self.dataset_name)

    def read_idx(name, header_bytes, shape):
        # Binary mode, and the handle is closed as soon as the bytes are read
        # (the files used to be opened in text mode and never closed).
        with open(os.path.join(data_dir, name), 'rb') as fd:
            raw = np.fromfile(file=fd, dtype=np.uint8)
        # ``float`` is what the removed ``np.float`` alias stood for.
        return raw[header_bytes:].reshape(shape).astype(float)

    trX = read_idx('train-images-idx3-ubyte', 16, (60000, 28, 28, 1))
    trY = read_idx('train-labels-idx1-ubyte', 8, (60000))
    teX = read_idx('t10k-images-idx3-ubyte', 16, (10000, 28, 28, 1))
    teY = read_idx('t10k-labels-idx1-ubyte', 8, (10000))

    X = np.concatenate((trX, teX), axis=0)
    # ``int`` is what the removed ``np.int`` alias stood for.
    y = np.concatenate((trY, teY), axis=0).astype(int)

    # Re-seeding before each shuffle applies the same permutation to images
    # and labels, keeping them aligned.
    seed = 547
    np.random.seed(seed)
    np.random.shuffle(X)
    np.random.seed(seed)
    np.random.shuffle(y)

    # One-hot encode the labels in a single vectorized assignment.
    y_vec = np.zeros((len(y), self.y_dim), dtype=float)
    y_vec[np.arange(len(y)), y] = 1.0

    return X/255., y_vec
- #返回数据集名字,batch大小,输出的高和宽
@property
def model_dir(self):
    """Directory name built from the dataset name, batch size, output height and output width."""
    parts = (self.dataset_name, self.batch_size,
             self.output_height, self.output_width)
    return "{}_{}_{}_{}".format(*parts)
-
def save(self, checkpoint_dir, step):
    """Save the session under ``<checkpoint_dir>/<model_dir>/DCGAN.model`` with ``step`` as global step."""
    target_dir = os.path.join(checkpoint_dir, self.model_dir)
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)

    checkpoint_path = os.path.join(target_dir, "DCGAN.model")
    self.saver.save(self.sess, checkpoint_path, global_step=step)
- #读取检查点,获取路径,重新存储检查点,并且计数。
- #打印成功读取的提示;
- #如果没有路径,则打印失败的提示
def load(self, checkpoint_dir):
    """Restore the latest checkpoint found under ``<checkpoint_dir>/<model_dir>``.

    Returns:
      ``(True, step)`` when a checkpoint was restored, where ``step`` is the
      last run of digits in the checkpoint file name (0 if the name has no
      digits); ``(False, 0)`` when no checkpoint was found.
    """
    import re
    print(" [*] Reading checkpoints...")
    checkpoint_dir = os.path.join(checkpoint_dir, self.model_dir)

    ckpt = tf.train.get_checkpoint_state(checkpoint_dir)
    if not (ckpt and ckpt.model_checkpoint_path):
        print(" [*] Failed to find a checkpoint")
        return False, 0

    ckpt_name = os.path.basename(ckpt.model_checkpoint_path)
    self.saver.restore(self.sess, os.path.join(checkpoint_dir, ckpt_name))
    # Raw string so that ``\d`` is a regex escape, not a string escape.
    # The pattern matches the last run of digits in the name.
    match = re.search(r"(\d+)(?!.*\d)", ckpt_name)
    counter = int(match.group(0)) if match else 0
    print(" [*] Success to read {}".format(ckpt_name))
    return True, counter
-
- import math
- import numpy as np
- import tensorflow as tf
-
- #首先导入tensorflow.python.framework模块,
- #包含了tensorflow中图、张量等的定义操作
- from tensorflow.python.framework import ops
-
- from utils import *
-
- #定义了一堆变量:
- #image_summary 、
- #scalar_summary、
- #histogram_summary、
- #merge_summary、
- #SummaryWriter,
- #都是从相应的tensorflow中获取的。
- #如果可以直接获取,则获取,
- #否则从tf.summary中获取
# Bind the summary helpers to whichever names this TensorFlow exposes: the
# top-level ones if present, otherwise the tf.summary ones. Only a missing
# attribute triggers the fallback, so other errors are no longer swallowed.
try:
    image_summary = tf.image_summary
    scalar_summary = tf.scalar_summary
    histogram_summary = tf.histogram_summary
    merge_summary = tf.merge_summary
    SummaryWriter = tf.train.SummaryWriter
except AttributeError:
    image_summary = tf.summary.image
    scalar_summary = tf.summary.scalar
    histogram_summary = tf.summary.histogram
    merge_summary = tf.summary.merge
    SummaryWriter = tf.summary.FileWriter
-
- #用来连接多个tensor。
- #利用dir(tf)判断”concat_v2”是否在里面,
- #如果在的话,
- #定义一个concat(tensors, axis, *args, **kwargs)函数,
- #并返回tf.concat_v2(tensors, axis, *args, **kwargs);
- #否则也定义concat(tensors, axis, *args, **kwargs)函数,
- #只不过返回的是tf.concat(tensors, axis, *args, **kwargs)
# Pick the concatenation function once, at import time.
if "concat_v2" not in dir(tf):
    def concat(tensors, axis, *args, **kwargs):
        """Concatenate ``tensors`` along ``axis`` with ``tf.concat``."""
        return tf.concat(tensors, axis, *args, **kwargs)
else:
    def concat(tensors, axis, *args, **kwargs):
        """Concatenate ``tensors`` along ``axis`` with ``tf.concat_v2``."""
        return tf.concat_v2(tensors, axis, *args, **kwargs)
-
- #定义一个batch_norm类,包含两个函数init和call函数。
- #首先
- #在init(self, epsilon=1e-5, momentum = 0.9, name=”batch_norm”)函数中,
- #定义一个name参数名字的变量,
- #初始化self变量epsilon、momentum 、name。
- #在call(self, x, train=True)函数中,
- #利用tf.contrib.layers.batch_norm函数批处理规范化
class batch_norm(object):
    """Callable that applies ``tf.contrib.layers.batch_norm`` with stored settings."""

    def __init__(self, epsilon=1e-5, momentum = 0.9, name="batch_norm"):
        """Record ``epsilon``, ``momentum`` and the scope ``name``."""
        # The settings are recorded while the variable scope is entered.
        with tf.variable_scope(name):
            self.name = name
            self.momentum = momentum
            self.epsilon = epsilon

    def __call__(self, x, train=True):
        """Normalize ``x``; ``train`` is forwarded as ``is_training``."""
        options = dict(
            decay=self.momentum,
            updates_collections=None,
            epsilon=self.epsilon,
            scale=True,
            is_training=train,
            scope=self.name,
        )
        return tf.contrib.layers.batch_norm(x, **options)
-
- #连接x,y与Int32型的[x_shapes[0], x_shapes[1], x_shapes[2], y_shapes[3]]维度的张量乘积
def conv_cond_concat(x, y):
    """Concatenate conditioning vector on feature map axis."""
    x_dims = x.get_shape()
    y_dims = y.get_shape()
    # Multiply y by ones shaped like x's first three axes, then join on axis 3.
    tiled = y * tf.ones([x_dims[0], x_dims[1], x_dims[2], y_dims[3]])
    return concat([x, tiled], 3)
-
- #卷积函数:
- #获取随机正态分布权值、实现卷积、获取初始偏置值,
- #获取添加偏置值后的卷积变量并返回
def conv2d(input_, output_dim,
           k_h=5, k_w=5, d_h=2, d_w=2, stddev=0.02,
           name="conv2d"):
    """Strided SAME-padded convolution plus bias, created inside variable scope ``name``.

    Args:
      input_: input tensor; its last dimension is the number of input channels.
      output_dim: number of output channels.
      k_h, k_w: kernel height and width.
      d_h, d_w: strides along height and width.
      stddev: standard deviation of the truncated-normal weight initializer.
    """
    with tf.variable_scope(name):
        kernel_shape = [k_h, k_w, input_.get_shape()[-1], output_dim]
        w = tf.get_variable(
            'w', kernel_shape,
            initializer=tf.truncated_normal_initializer(stddev=stddev))
        conv = tf.nn.conv2d(input_, w, strides=[1, d_h, d_w, 1], padding='SAME')

        biases = tf.get_variable(
            'biases', [output_dim], initializer=tf.constant_initializer(0.0))
        with_bias = tf.nn.bias_add(conv, biases)
        return tf.reshape(with_bias, conv.get_shape())
-
- #解卷积函数:
- #获取随机正态分布权值、解卷积,获取初始偏置值,
- #获取添加偏置值后的卷积变量,
- #判断with_w是否为真,
- #真则返回解卷积、权值、偏置值,
- #否则返回解卷积
def deconv2d(input_, output_shape,
             k_h=5, k_w=5, d_h=2, d_w=2, stddev=0.02,
             name="deconv2d", with_w=False):
    """Transposed convolution plus bias, created inside variable scope ``name``.

    Args:
      input_: input tensor; its last dimension is the number of input channels.
      output_shape: full shape of the result; its last entry is the number of
        output channels.
      k_h, k_w: kernel height and width.
      d_h, d_w: strides along height and width.
      stddev: standard deviation of the normal weight initializer.
      with_w: when true, also return the weight and bias variables.

    Returns:
      The output tensor, or ``(output, w, biases)`` when ``with_w`` is true.
    """
    out_channels = output_shape[-1]
    stride_spec = [1, d_h, d_w, 1]
    with tf.variable_scope(name):
        # filter : [height, width, output_channels, in_channels]
        w = tf.get_variable(
            'w', [k_h, k_w, out_channels, input_.get_shape()[-1]],
            initializer=tf.random_normal_initializer(stddev=stddev))

        try:
            deconv = tf.nn.conv2d_transpose(
                input_, w, output_shape=output_shape, strides=stride_spec)
        # Support for verisons of TensorFlow before 0.7.0
        except AttributeError:
            deconv = tf.nn.deconv2d(
                input_, w, output_shape=output_shape, strides=stride_spec)

        biases = tf.get_variable(
            'biases', [out_channels], initializer=tf.constant_initializer(0.0))
        deconv = tf.reshape(tf.nn.bias_add(deconv, biases), deconv.get_shape())

        if not with_w:
            return deconv
        return deconv, w, biases
-
- #定义一个lrelu激励函数
def lrelu(x, leak=0.2, name="lrelu"):
    """Leaky ReLU: element-wise maximum of ``x`` and ``leak * x``. ``name`` is accepted but not used."""
    leaked = leak*x
    return tf.maximum(x, leaked)
-
- #进行线性运算,
- #获取一个随机正态分布矩阵,获取初始偏置值,
- #如果with_w为真,则返回xw+b,权值w和偏置值b;
- #否则返回xw+b
def linear(input_, output_size, scope=None, stddev=0.02, bias_start=0.0, with_w=False):
    """Compute ``input_ @ Matrix + bias`` with variables created in ``scope`` (default "Linear").

    Args:
      input_: 2-D input tensor; its second dimension sizes the weight matrix.
      output_size: number of output units.
      stddev: standard deviation of the normal weight initializer.
      bias_start: constant used to initialize the bias.
      with_w: when true, also return the weight matrix and the bias.

    Returns:
      The output tensor, or ``(output, matrix, bias)`` when ``with_w`` is true.

    Raises:
      ValueError: re-raised from ``tf.get_variable`` with a hint about image
        dimensions appended to its args.
    """
    in_dims = input_.get_shape().as_list()

    with tf.variable_scope(scope or "Linear"):
        try:
            matrix = tf.get_variable(
                "Matrix", [in_dims[1], output_size], tf.float32,
                tf.random_normal_initializer(stddev=stddev))
        except ValueError as err:
            msg = "NOTE: Usually, this is due to an issue with the image dimensions.  Did you correctly set '--crop' or '--input_height' or '--output_height'?"
            err.args = err.args + (msg,)
            raise
        bias = tf.get_variable(
            "bias", [output_size],
            initializer=tf.constant_initializer(bias_start))
        output = tf.matmul(input_, matrix) + bias
        if with_w:
            return output, matrix, bias
        return output
-
- #这个文件主要定义了
- #一些变量连接的函数、
- #批处理规范化的函数、
- #卷积函数、
- #解卷积函数、
- #激励函数、
- #线性运算函数
- """
- Some codes from https://github.com/Newmu/dcgan_code
- """
- from __future__ import division
- import math
- import json
- import random
- import pprint
- import scipy.misc
- import numpy as np
- from time import gmtime, strftime
- from six.moves import xrange
-
- import tensorflow as tf
- import tensorflow.contrib.slim as slim
-
# Shared pretty-printer, handy for dumping nested data structures.
pp = pprint.PrettyPrinter()


def get_stddev(x, k_h, k_w):
    """Return ``1 / sqrt(k_w * k_h * d)``, ``d`` being the last entry of ``x.get_shape()``.

    Args:
        x: Object exposing ``get_shape()``; only the last entry is read.
        k_h: Kernel height.
        k_w: Kernel width.
    """
    last_dim = x.get_shape()[-1]
    return 1 / math.sqrt(k_w * k_h * last_dim)
-
def show_all_variables():
    """Print information about every trainable variable.

    Passes the list returned by ``tf.trainable_variables()`` to
    ``slim.model_analyzer.analyze_vars`` with ``print_info=True``.
    """
    slim.model_analyzer.analyze_vars(tf.trainable_variables(), print_info=True)
-
def get_image(image_path, input_height, input_width,
              resize_height=64, resize_width=64,
              crop=True, grayscale=False):
    """Read an image from disk and run it through ``transform``.

    Args:
        image_path: Path handed to ``imread``.
        input_height: Crop height forwarded to ``transform``.
        input_width: Crop width forwarded to ``transform``.
        resize_height: Output height forwarded to ``transform``.
        resize_width: Output width forwarded to ``transform``.
        crop: Whether ``transform`` should center-crop before resizing.
        grayscale: Whether ``imread`` should load the image flattened.

    Returns:
        Whatever ``transform`` returns for the loaded image.
    """
    raw = imread(image_path, grayscale)
    return transform(
        raw, input_height, input_width, resize_height, resize_width, crop)
-
def save_images(images, size, image_path):
    """Rescale a batch with ``inverse_transform`` and save it as one grid image.

    Args:
        images: Batch of images, passed through ``inverse_transform`` first.
        size: Grid layout forwarded to ``imsave``.
        image_path: Destination path.

    Returns:
        The return value of ``imsave``.
    """
    rescaled = inverse_transform(images)
    return imsave(rescaled, size, image_path)
-
def imread(path, grayscale = False):
    """Load an image with ``scipy.misc.imread`` as a float64 array.

    Args:
        path: Image file to read.
        grayscale: When true, the image is read with ``flatten=True``.

    Returns:
        The image converted with ``astype(np.float64)``.
    """
    # NOTE(review): scipy.misc.imread only exists in old SciPy releases;
    # confirm the pinned SciPy version before running this.
    if grayscale:
        pixels = scipy.misc.imread(path, flatten = True)
    else:
        pixels = scipy.misc.imread(path)
    # np.float was only an alias of the builtin float (float64) and has been
    # removed from NumPy; np.float64 gives the same dtype on every version.
    return pixels.astype(np.float64)
-
def merge_images(images, size):
    """Return ``inverse_transform(images)``.

    Despite the name, no tiling happens here.

    Args:
        images: Batch forwarded to ``inverse_transform``.
        size: Accepted for interface compatibility; the body does not use it.
    """
    rescaled = inverse_transform(images)
    return rescaled
-
def merge(images, size):
    """Tile a batch of equally sized images into a single grid image.

    Images are laid out row by row: image ``idx`` goes to grid row
    ``idx // size[1]`` and grid column ``idx % size[1]``.

    Args:
        images: Batch shaped ``(N, H, W, C)`` with ``C`` in ``{1, 3, 4}``, or
            ``(N, H, W)`` for channel-less grayscale images.
        size: ``[rows, cols]`` of the grid.

    Returns:
        A zero-initialized array of shape ``(H * rows, W * cols, C)`` for 3/4
        channels, or ``(H * rows, W * cols)`` for grayscale, with the images
        copied in.

    Raises:
        ValueError: If the batch has an unsupported rank or channel count.
    """
    images = np.asarray(images)
    if images.ndim == 3:
        # Channel-less grayscale batch: the error message below has always
        # listed HxW as valid, so give it an explicit single channel.
        images = images[..., np.newaxis]
    if images.ndim != 4 or images.shape[3] not in (1, 3, 4):
        raise ValueError('in merge(images,size) images parameter '
                         'must have dimensions: HxW or HxWx3 or HxWx4')

    h, w, c = images.shape[1], images.shape[2], images.shape[3]
    rows, cols = size[0], size[1]
    is_gray = c == 1

    if is_gray:
        canvas = np.zeros((h * rows, w * cols))
    else:
        canvas = np.zeros((h * rows, w * cols, c))

    for idx, image in enumerate(images):
        col = idx % cols    # position within the current grid row
        row = idx // cols   # which grid row this image lands in
        tile = image[:, :, 0] if is_gray else image
        canvas[row * h:row * h + h, col * w:col * w + w] = tile
    return canvas
-
def imsave(images, size, path):
    """Tile ``images`` with ``merge`` and write the grid to ``path``.

    Length-1 axes of the merged grid are removed with ``np.squeeze`` before
    the image is handed to ``scipy.misc.imsave``.

    Returns:
        The return value of ``scipy.misc.imsave``.
    """
    grid = merge(images, size)
    return scipy.misc.imsave(path, np.squeeze(grid))
-
def center_crop(x, crop_h, crop_w,
                resize_h=64, resize_w=64):
    """Cut a centered window out of ``x`` and resize it.

    Args:
        x: Image array; its first two axes are taken as height and width.
        crop_h: Height of the window.
        crop_w: Width of the window; ``None`` means "same as ``crop_h``".
        resize_h: Output height.
        resize_w: Output width.

    Returns:
        The window resized to ``[resize_h, resize_w]`` by
        ``scipy.misc.imresize``.
    """
    if crop_w is None:
        crop_w = crop_h
    height, width = x.shape[:2]
    # Offsets that center the window inside the image.
    top = int(round((height - crop_h) / 2.))
    left = int(round((width - crop_w) / 2.))
    window = x[top:top + crop_h, left:left + crop_w]
    return scipy.misc.imresize(window, [resize_h, resize_w])
-
def transform(image, input_height, input_width,
              resize_height=64, resize_width=64, crop=True):
    """Resize an image (optionally center-cropping first) and rescale its values.

    Args:
        image: Input image array.
        input_height: Crop height, used only when ``crop`` is true.
        input_width: Crop width, used only when ``crop`` is true.
        resize_height: Output height.
        resize_width: Output width.
        crop: When true use ``center_crop``; otherwise resize the whole image
            with ``scipy.misc.imresize``.

    Returns:
        ``np.array(resized) / 127.5 - 1.``, i.e. pixel values 0..255 are
        mapped onto -1..1.
    """
    if not crop:
        resized = scipy.misc.imresize(image, [resize_height, resize_width])
    else:
        resized = center_crop(
            image, input_height, input_width,
            resize_height, resize_width)
    return np.array(resized) / 127.5 - 1.
-
def inverse_transform(images):
    """Linearly map values from [-1, 1] to [0, 1]: ``(images + 1) / 2``.

    This undoes the value scaling of ``transform`` up to the 0..255 factor;
    nothing is flipped.
    """
    shifted = images + 1.
    return shifted / 2.
- ###########################
- ###########################
- #总结下来,这几个函数相互调用,
- #主要实现了3个图像操作功能:
- #1.获取图像get_image(),负责读取图像,返回图像裁剪后的新图像;
- #2.保存图像save_images(),负责将一个batch中所有图像
- #保存为一张大图像并返回;
- #3.图像反归一化merge_images(),并不是翻转图像,而是调用inverse_transform()
- #把像素值从[-1,1]线性映射回[0,1],返回新图像。
- ###########################
- ###########################
-
def to_json(output_path, *layers):
    """Write layer parameters as JavaScript ``var layer_<idx> = {...};`` text.

    Each entry of ``layers`` is a ``(w, b, bn)`` triple:

    * ``w`` and ``b`` expose ``.name`` / ``.eval()`` (weights and biases);
    * ``bn`` is ``None`` or exposes ``.gamma.eval()`` and ``.beta.eval()``.

    Layers whose weight name contains ``"lin/"`` are emitted as ``"fc"``
    layers; all others as ``"deconv"`` layers with hard-coded 5x5 kernels,
    stride 2 and pad 1. The layer index is the text after the first ``h`` in
    the leading name component (e.g. ``g_h1/...`` gives ``1``). Single quotes
    are stripped and all whitespace runs collapse to one space before the
    text is written.

    Args:
        output_path: File that is opened for writing (and truncated).
        *layers: ``(w, b, bn)`` triples as described above.
    """
    fc_template = """
        var layer_%s = {
            "layer_type": "fc",
            "sy": 1, "sx": 1,
            "out_sx": 1, "out_sy": 1,
            "stride": 1, "pad": 0,
            "out_depth": %s, "in_depth": %s,
            "biases": %s,
            "gamma": %s,
            "beta": %s,
            "filters": %s
        };"""
    deconv_template = """
        var layer_%s = {
            "layer_type": "deconv",
            "sy": 5, "sx": 5,
            "out_sx": %s, "out_sy": %s,
            "stride": 2, "pad": 1,
            "out_depth": %s, "in_depth": %s,
            "biases": %s,
            "gamma": %s,
            "beta": %s,
            "filters": %s
        };"""

    def _fmt(values):
        # Two-decimal strings; their quotes are stripped before writing.
        return ['%.2f' % elem for elem in list(values)]

    with open(output_path, "w") as layer_f:
        chunks = []
        for w, b, bn in layers:
            layer_idx = w.name.split('/')[0].split('h')[1]
            is_linear = "lin/" in w.name

            B = b.eval()

            if is_linear:
                W = w.eval()
                depth = W.shape[1]
            else:
                # Move axis 2 to the front so iterating W yields one filter
                # per entry of that axis.
                W = np.rollaxis(w.eval(), 2, 0)
                depth = W.shape[0]

            biases = {"sy": 1, "sx": 1, "depth": depth, "w": _fmt(B)}
            if bn is not None:
                gamma = {"sy": 1, "sx": 1, "depth": depth, "w": _fmt(bn.gamma.eval())}
                beta = {"sy": 1, "sx": 1, "depth": depth, "w": _fmt(bn.beta.eval())}
            else:
                gamma = {"sy": 1, "sx": 1, "depth": 0, "w": []}
                beta = {"sy": 1, "sx": 1, "depth": 0, "w": []}

            if is_linear:
                # One filter per output unit (columns of W).
                fs = [{"sy": 1, "sx": 1, "depth": W.shape[0], "w": _fmt(column)}
                      for column in W.T]
                chunks.append(fc_template % (
                    layer_idx.split('_')[0], W.shape[1], W.shape[0],
                    biases, gamma, beta, fs))
            else:
                fs = [{"sy": 5, "sx": 5, "depth": W.shape[3], "w": _fmt(kernel.flatten())}
                      for kernel in W]
                out_size = 2**(int(layer_idx)+2)
                chunks.append(deconv_template % (
                    layer_idx, out_size, out_size,
                    W.shape[0], W.shape[3], biases, gamma, beta, fs))
        lines = "".join(chunks)
        layer_f.write(" ".join(lines.replace("'","").split()))
-
def make_gif(images, fname, duration=2, true_image=False):
    """Write ``images`` as an animated GIF using moviepy.

    Args:
        images: Indexable sequence of frames.
        fname: Output GIF path.
        duration: Clip length in seconds; the frame rate is
            ``len(images) / duration``.
        true_image: When true, frames are only cast to ``uint8``; otherwise
            they are first mapped with ``(x + 1) / 2 * 255``.
    """
    import moviepy.editor as mpy

    def make_frame(t):
        """Return the frame shown at time ``t`` as a uint8 array."""
        try:
            x = images[int(len(images)/duration*t)]
        except IndexError:
            # t at (or rounded past) the end of the clip: reuse the last frame.
            x = images[-1]

        if true_image:
            return x.astype(np.uint8)
        else:
            return ((x+1)/2*255).astype(np.uint8)

    clip = mpy.VideoClip(make_frame, duration=duration)
    clip.write_gif(fname, fps = len(images) / duration)
-
def visualize(sess, dcgan, config, option):
    """Sample from the generator and save visualizations under ``./samples``.

    Args:
        sess: Session used to run ``dcgan.sampler``.
        dcgan: Model exposing ``sampler``, ``z``, ``z_dim`` and, for the
            ``"mnist"`` dataset, ``y``.
        config: Object exposing ``batch_size`` and ``dataset``.
        option: Visualization mode:

            * 0 - one batch from uniform(-0.5, 0.5) noise, saved as a PNG grid;
            * 1 - for every latent dimension, random noise with that dimension
              swept over ``values``, saved as a PNG grid;
            * 2 - like 1 but for ``z_dim`` randomly drawn dimensions, starting
              from one tiled noise vector; saved as GIF, falling back to a PNG
              grid if GIF creation fails;
            * 3 - sweep each dimension starting from an all-zero batch, one GIF
              per dimension;
            * 4 - like 3, plus one merged GIF across all dimensions.

        Any other value does nothing.
    """
    # Side length of the square grid that holds one batch (8 for 64).
    image_frame_dim = int(math.ceil(config.batch_size**.5))
    if option == 0:
        z_sample = np.random.uniform(-0.5, 0.5, size=(config.batch_size, dcgan.z_dim))
        samples = sess.run(dcgan.sampler, feed_dict={dcgan.z: z_sample})
        save_images(samples, [image_frame_dim, image_frame_dim], './samples/test_%s.png' % strftime("%Y-%m-%d-%H-%M-%S", gmtime()))
    elif option == 1:
        values = np.arange(0, 1, 1./config.batch_size)
        for idx in xrange(dcgan.z_dim):
            print(" [*] %d" % idx)
            z_sample = np.random.uniform(-1, 1, size=(config.batch_size , dcgan.z_dim))
            # Rows are views, so this writes into z_sample in place.
            for kdx, row in enumerate(z_sample):
                row[idx] = values[kdx]

            if config.dataset == "mnist":
                y = np.random.choice(10, config.batch_size)
                y_one_hot = np.zeros((config.batch_size, 10))
                y_one_hot[np.arange(config.batch_size), y] = 1

                samples = sess.run(dcgan.sampler, feed_dict={dcgan.z: z_sample, dcgan.y: y_one_hot})
            else:
                samples = sess.run(dcgan.sampler, feed_dict={dcgan.z: z_sample})

            save_images(samples, [image_frame_dim, image_frame_dim], './samples/test_arange_%s.png' % (idx))
    elif option == 2:
        values = np.arange(0, 1, 1./config.batch_size)
        for idx in [random.randint(0, dcgan.z_dim - 1) for _ in xrange(dcgan.z_dim)]:
            print(" [*] %d" % idx)
            z = np.random.uniform(-0.2, 0.2, size=(dcgan.z_dim))
            z_sample = np.tile(z, (config.batch_size, 1))
            for kdx, row in enumerate(z_sample):
                row[idx] = values[kdx]

            if config.dataset == "mnist":
                y = np.random.choice(10, config.batch_size)
                y_one_hot = np.zeros((config.batch_size, 10))
                y_one_hot[np.arange(config.batch_size), y] = 1

                samples = sess.run(dcgan.sampler, feed_dict={dcgan.z: z_sample, dcgan.y: y_one_hot})
            else:
                samples = sess.run(dcgan.sampler, feed_dict={dcgan.z: z_sample})

            try:
                make_gif(samples, './samples/test_gif_%s.gif' % (idx))
            except Exception:
                # Best-effort fallback (e.g. moviepy unavailable): save a PNG
                # grid instead. KeyboardInterrupt/SystemExit still propagate.
                save_images(samples, [image_frame_dim, image_frame_dim], './samples/test_%s.png' % strftime("%Y-%m-%d-%H-%M-%S", gmtime()))
    elif option == 3:
        values = np.arange(0, 1, 1./config.batch_size)
        for idx in xrange(dcgan.z_dim):
            print(" [*] %d" % idx)
            z_sample = np.zeros([config.batch_size, dcgan.z_dim])
            for kdx, row in enumerate(z_sample):
                row[idx] = values[kdx]

            samples = sess.run(dcgan.sampler, feed_dict={dcgan.z: z_sample})
            make_gif(samples, './samples/test_gif_%s.gif' % (idx))
    elif option == 4:
        image_set = []
        values = np.arange(0, 1, 1./config.batch_size)

        for idx in xrange(dcgan.z_dim):
            print(" [*] %d" % idx)
            z_sample = np.zeros([config.batch_size, dcgan.z_dim])
            for kdx, row in enumerate(z_sample):
                row[idx] = values[kdx]

            image_set.append(sess.run(dcgan.sampler, feed_dict={dcgan.z: z_sample}))
            make_gif(image_set[-1], './samples/test_gif_%s.gif' % (idx))

        # Play sample indices 0..63 forwards, then backwards. range objects
        # cannot be concatenated with + on Python 3, so build lists first.
        # NOTE(review): the 64 and the 10x10 grid are hard-coded; they
        # presumably assume batch_size >= 64 and z_dim == 100 - confirm.
        frame_order = list(range(64)) + list(range(63, -1, -1))
        new_image_set = [merge(np.array([images[idx] for images in image_set]), [10, 10])
                         for idx in frame_order]
        make_gif(new_image_set, './samples/test_gif_merged.gif', duration=8)
-
def image_manifold_size(num_images):
    """Return a ``(rows, cols)`` grid that holds exactly ``num_images`` images.

    ``rows`` is ``floor(sqrt(num_images))`` and ``cols`` is
    ``ceil(sqrt(num_images))``.

    Raises:
        AssertionError: If ``rows * cols`` differs from ``num_images``.
    """
    root = np.sqrt(num_images)
    manifold_h, manifold_w = int(np.floor(root)), int(np.ceil(root))
    assert manifold_h * manifold_w == num_images
    return manifold_h, manifold_w
-
- #这就是全部utils.py全部内容,
- #主要负责图像的一些基本操作,
- #获取图像、
- #保存图像、
- #图像反归一化(把像素值从[-1,1]映射回[0,1]),
- #和利用moviepy模块可视化训练过程
train_00_0099.png
train_09_0798.png
-
import argparse
import os
import torch
import torchvision
import torchvision.utils as vutils
import torch.nn as nn
from random import randint
from model import NetD, NetG

parser = argparse.ArgumentParser()
parser.add_argument('--batchSize', type=int, default=64)
parser.add_argument('--imageSize', type=int, default=96)
parser.add_argument('--nz', type=int, default=100, help='size of the latent z vector')
parser.add_argument('--ngf', type=int, default=64)
parser.add_argument('--ndf', type=int, default=64)
parser.add_argument('--epoch', type=int, default=25, help='number of epochs to train for')
parser.add_argument('--lr', type=float, default=0.0002, help='learning rate, default=0.0002')
parser.add_argument('--beta1', type=float, default=0.5, help='beta1 for adam. default=0.5')
parser.add_argument('--data_path', default='data/', help='folder to train data')
parser.add_argument('--outf', default='imgs/', help='folder to output images and model checkpoints')
opt = parser.parse_args()

# Use the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Sample images and checkpoints are written here at the end of every epoch;
# create the folder up front so the first save cannot fail on a missing path.
if not os.path.isdir(opt.outf):
    os.makedirs(opt.outf)

# Image loading and preprocessing. Resize replaces the removed
# transforms.Scale; Normalize maps each channel from [0, 1] to [-1, 1],
# matching the Tanh output range of the generator.
transforms = torchvision.transforms.Compose([
    torchvision.transforms.Resize(opt.imageSize),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

dataset = torchvision.datasets.ImageFolder(opt.data_path, transform=transforms)

# drop_last=True keeps every batch at exactly opt.batchSize samples, which
# the fixed-size label tensor below relies on.
dataloader = torch.utils.data.DataLoader(
    dataset=dataset,
    batch_size=opt.batchSize,
    shuffle=True,
    drop_last=True,
)

# Defaults: ngf=64, nz=100, ndf=64.
netG = NetG(opt.ngf, opt.nz).to(device)
netD = NetD(opt.ndf).to(device)

criterion = nn.BCELoss()
optimizerG = torch.optim.Adam(netG.parameters(), lr=opt.lr, betas=(opt.beta1, 0.999))
optimizerD = torch.optim.Adam(netD.parameters(), lr=opt.lr, betas=(opt.beta1, 0.999))

# One reusable target tensor, moved to the device once and refilled in place.
label = torch.FloatTensor(opt.batchSize).to(device)
real_label = 1
fake_label = 0

for epoch in range(1, opt.epoch + 1):
    for i, (imgs, _) in enumerate(dataloader):
        # ---- Train the discriminator D with the generator G held fixed ----
        optimizerD.zero_grad()

        # Real images should be classified as 1. D outputs (N, 1, 1, 1);
        # flatten to (N,) so it matches the target shape BCELoss expects.
        imgs = imgs.to(device)
        output = netD(imgs).view(-1)
        label.fill_(real_label)
        errD_real = criterion(output, label)
        errD_real.backward()

        # Generated images should be classified as 0.
        label.fill_(fake_label)
        noise = torch.randn(opt.batchSize, opt.nz, 1, 1).to(device)
        fake = netG(noise)
        # detach() keeps the discriminator loss from back-propagating into G.
        output = netD(fake.detach()).view(-1)
        errD_fake = criterion(output, label)
        errD_fake.backward()
        errD = errD_fake + errD_real
        optimizerD.step()

        # ---- Train the generator G with the discriminator D held fixed ----
        optimizerG.zero_grad()
        # G wants D to classify its images as 1.
        label.fill_(real_label)
        output = netD(fake).view(-1)
        errG = criterion(output, label)
        errG.backward()
        optimizerG.step()

        print('[%d/%d][%d/%d] Loss_D: %.3f Loss_G %.3f'
              % (epoch, opt.epoch, i, len(dataloader), errD.item(), errG.item()))

    # End of epoch: save the last generated batch and both checkpoints.
    vutils.save_image(fake.detach(),
                      '%s/fake_samples_epoch_%03d.png' % (opt.outf, epoch),
                      normalize=True)
    torch.save(netG.state_dict(), '%s/netG_%03d.pth' % (opt.outf, epoch))
    torch.save(netD.state_dict(), '%s/netD_%03d.pth' % (opt.outf, epoch))
- import torch.nn as nn
class NetG(nn.Module):
    """DCGAN generator built from five transposed-convolution stages.

    For an ``(N, nz, 1, 1)`` noise input the spatial size grows
    1 -> 4 -> 8 -> 16 -> 32 -> 96 and the output is ``(N, 3, 96, 96)`` with
    values in [-1, 1] (Tanh).

    Args:
        ngf: Base number of generator feature maps.
        nz: Number of channels of the input noise.
    """

    def __init__(self, ngf, nz):
        super(NetG, self).__init__()

        def up_block(in_ch, out_ch, kernel, stride, pad):
            # Transposed conv -> batch norm -> ReLU.
            return nn.Sequential(
                nn.ConvTranspose2d(in_ch, out_ch, kernel, stride, pad, bias=False),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(inplace=True),
            )

        # nz x 1 x 1 -> (ngf*8) x 4 x 4
        self.layer1 = up_block(nz, ngf * 8, 4, 1, 0)
        # -> (ngf*4) x 8 x 8
        self.layer2 = up_block(ngf * 8, ngf * 4, 4, 2, 1)
        # -> (ngf*2) x 16 x 16
        self.layer3 = up_block(ngf * 4, ngf * 2, 4, 2, 1)
        # -> ngf x 32 x 32
        self.layer4 = up_block(ngf * 2, ngf, 4, 2, 1)
        # -> 3 x 96 x 96; no batch norm on the output stage
        self.layer5 = nn.Sequential(
            nn.ConvTranspose2d(ngf, 3, 5, 3, 1, bias=False),
            nn.Tanh(),
        )

    def forward(self, x):
        """Run ``x`` through layer1..layer5 in order and return the result."""
        stages = (self.layer1, self.layer2, self.layer3, self.layer4, self.layer5)
        for stage in stages:
            x = stage(x)
        return x
-
-
class NetD(nn.Module):
    """DCGAN discriminator built from five convolution stages.

    For an ``(N, 3, 96, 96)`` input the spatial size shrinks
    96 -> 32 -> 16 -> 8 -> 4 -> 1 and the output is ``(N, 1, 1, 1)`` with
    values in [0, 1] (Sigmoid).

    Args:
        ndf: Base number of discriminator feature maps.
    """

    def __init__(self, ndf):
        super(NetD, self).__init__()

        def down_block(in_ch, out_ch, kernel, stride, pad):
            # Conv -> batch norm -> LeakyReLU(0.2).
            return nn.Sequential(
                nn.Conv2d(in_ch, out_ch, kernel, stride, pad, bias=False),
                nn.BatchNorm2d(out_ch),
                nn.LeakyReLU(0.2, inplace=True),
            )

        # 3 x 96 x 96 -> ndf x 32 x 32
        self.layer1 = down_block(3, ndf, 5, 3, 1)
        # -> (ndf*2) x 16 x 16
        self.layer2 = down_block(ndf, ndf * 2, 4, 2, 1)
        # -> (ndf*4) x 8 x 8
        self.layer3 = down_block(ndf * 2, ndf * 4, 4, 2, 1)
        # -> (ndf*8) x 4 x 4
        self.layer4 = down_block(ndf * 4, ndf * 8, 4, 2, 1)
        # -> a single probability per image
        self.layer5 = nn.Sequential(
            nn.Conv2d(ndf * 8, 1, 4, 1, 0, bias=False),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Run ``x`` through layer1..layer5 in order and return the result."""
        stages = (self.layer1, self.layer2, self.layer3, self.layer4, self.layer5)
        for stage in stages:
            x = stage(x)
        return x
-
fake_samples_epoch_001.png
fake_samples_epoch_025.png
TensorFlow
def generator(self, z, y=None):
    """Build the generator graph inside the "generator" variable scope.

    Without labels (``self.y_dim`` falsy): a linear projection of ``z`` is
    reshaped to the smallest feature map, passed through four stride-2
    deconvolutions (batch norm + ReLU after all but the last) and returned
    through ``tanh``. The intermediate tensors and the weights/biases of
    every layer are stored on ``self`` (``z_``, ``h0``, ``h1``, ``h*_w``,
    ``h*_b``).

    With labels (``self.y_dim`` truthy): ``y`` is concatenated to ``z`` and
    to every hidden activation; two linear layers and two deconvolutions are
    used and the result is returned through ``sigmoid``.

    Args:
        z: Noise tensor.
        y: Label tensor, used only in the conditional branch.

    Returns:
        The generated image tensor.
    """
    with tf.variable_scope("generator"):
        s_h, s_w = self.output_height, self.output_width

        if self.y_dim:
            # ---- conditional generator ----
            s_h2, s_h4 = int(s_h/2), int(s_h/4)
            s_w2, s_w4 = int(s_w/2), int(s_w/4)

            # yb = tf.expand_dims(tf.expand_dims(y, 1),2)
            yb = tf.reshape(y, [self.batch_size, 1, 1, self.y_dim])
            z = concat([z, y], 1)

            h0 = tf.nn.relu(
                self.g_bn0(linear(z, self.gfc_dim, 'g_h0_lin')))
            h0 = concat([h0, y], 1)

            h1 = tf.nn.relu(self.g_bn1(
                linear(h0, self.gf_dim*2*s_h4*s_w4, 'g_h1_lin')))
            h1 = tf.reshape(h1, [self.batch_size, s_h4, s_w4, self.gf_dim * 2])
            h1 = conv_cond_concat(h1, yb)

            h2 = tf.nn.relu(self.g_bn2(deconv2d(
                h1, [self.batch_size, s_h2, s_w2, self.gf_dim * 2], name='g_h2')))
            h2 = conv_cond_concat(h2, yb)

            return tf.nn.sigmoid(
                deconv2d(h2, [self.batch_size, s_h, s_w, self.c_dim], name='g_h3'))

        # ---- unconditional generator ----
        # Feature-map sizes obtained by applying conv_out_size_same(., 2)
        # four times, starting from the output size.
        sizes = [(s_h, s_w)]
        for _ in range(4):
            prev_h, prev_w = sizes[-1]
            sizes.append((conv_out_size_same(prev_h, 2),
                          conv_out_size_same(prev_w, 2)))
        (s_h2, s_w2), (s_h4, s_w4), (s_h8, s_w8), (s_h16, s_w16) = sizes[1:]

        # project `z` and reshape
        self.z_, self.h0_w, self.h0_b = linear(
            z, self.gf_dim*8*s_h16*s_w16, 'g_h0_lin', with_w=True)

        self.h0 = tf.reshape(
            self.z_, [-1, s_h16, s_w16, self.gf_dim * 8])
        h0 = tf.nn.relu(self.g_bn0(self.h0))

        self.h1, self.h1_w, self.h1_b = deconv2d(
            h0, [self.batch_size, s_h8, s_w8, self.gf_dim*4], name='g_h1', with_w=True)
        h1 = tf.nn.relu(self.g_bn1(self.h1))

        h2, self.h2_w, self.h2_b = deconv2d(
            h1, [self.batch_size, s_h4, s_w4, self.gf_dim*2], name='g_h2', with_w=True)
        h2 = tf.nn.relu(self.g_bn2(h2))

        h3, self.h3_w, self.h3_b = deconv2d(
            h2, [self.batch_size, s_h2, s_w2, self.gf_dim*1], name='g_h3', with_w=True)
        h3 = tf.nn.relu(self.g_bn3(h3))

        h4, self.h4_w, self.h4_b = deconv2d(
            h3, [self.batch_size, s_h, s_w, self.c_dim], name='g_h4', with_w=True)

        return tf.nn.tanh(h4)
PyTorch
class NetG(nn.Module):
    """DCGAN generator built from five transposed-convolution stages.

    For an ``(N, nz, 1, 1)`` noise input the spatial size grows
    1 -> 4 -> 8 -> 16 -> 32 -> 96 and the output is ``(N, 3, 96, 96)`` with
    values in [-1, 1] (Tanh).

    Args:
        ngf: Base number of generator feature maps.
        nz: Number of channels of the input noise.
    """

    def __init__(self, ngf, nz):
        super(NetG, self).__init__()

        def up_block(in_ch, out_ch, kernel, stride, pad):
            # Transposed conv -> batch norm -> ReLU.
            return nn.Sequential(
                nn.ConvTranspose2d(in_ch, out_ch, kernel, stride, pad, bias=False),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(inplace=True),
            )

        # nz x 1 x 1 -> (ngf*8) x 4 x 4
        self.layer1 = up_block(nz, ngf * 8, 4, 1, 0)
        # -> (ngf*4) x 8 x 8
        self.layer2 = up_block(ngf * 8, ngf * 4, 4, 2, 1)
        # -> (ngf*2) x 16 x 16
        self.layer3 = up_block(ngf * 4, ngf * 2, 4, 2, 1)
        # -> ngf x 32 x 32
        self.layer4 = up_block(ngf * 2, ngf, 4, 2, 1)
        # -> 3 x 96 x 96; no batch norm on the output stage
        self.layer5 = nn.Sequential(
            nn.ConvTranspose2d(ngf, 3, 5, 3, 1, bias=False),
            nn.Tanh(),
        )

    def forward(self, x):
        """Run ``x`` through layer1..layer5 in order and return the result."""
        stages = (self.layer1, self.layer2, self.layer3, self.layer4, self.layer5)
        for stage in stages:
            x = stage(x)
        return x
TensorFlow
def discriminator(self, image, y=None, reuse=False):
    """Build the discriminator graph inside the "discriminator" variable scope.

    Without labels (``self.y_dim`` falsy): four stride-2 convolutions with
    leaky ReLU (batch norm on all but the first) followed by a linear layer
    producing one logit per sample.

    With labels (``self.y_dim`` truthy): ``y`` is concatenated to the image
    and to every hidden activation; two convolutions and two linear layers
    are used.

    Args:
        image: Batch of input images.
        y: Label tensor, used only in the conditional branch.
        reuse: When true, the scope's variables are reused instead of created.

    Returns:
        ``(sigmoid(logits), logits)``.
    """
    with tf.variable_scope("discriminator") as scope:
        if reuse:
            scope.reuse_variables()

        if self.y_dim:
            # ---- conditional discriminator ----
            yb = tf.reshape(y, [self.batch_size, 1, 1, self.y_dim])
            x = conv_cond_concat(image, yb)

            h0 = lrelu(conv2d(x, self.c_dim + self.y_dim, name='d_h0_conv'))
            h0 = conv_cond_concat(h0, yb)

            h1 = lrelu(self.d_bn1(conv2d(h0, self.df_dim + self.y_dim, name='d_h1_conv')))
            h1 = tf.reshape(h1, [self.batch_size, -1])
            h1 = concat([h1, y], 1)

            h2 = lrelu(self.d_bn2(linear(h1, self.dfc_dim, 'd_h2_lin')))
            h2 = concat([h2, y], 1)

            logits = linear(h2, 1, 'd_h3_lin')
            return tf.nn.sigmoid(logits), logits

        # ---- unconditional discriminator ----
        h0 = lrelu(conv2d(image, self.df_dim, name='d_h0_conv'))
        h1 = lrelu(self.d_bn1(conv2d(h0, self.df_dim*2, name='d_h1_conv')))
        h2 = lrelu(self.d_bn2(conv2d(h1, self.df_dim*4, name='d_h2_conv')))
        h3 = lrelu(self.d_bn3(conv2d(h2, self.df_dim*8, name='d_h3_conv')))
        flat = tf.reshape(h3, [self.batch_size, -1])
        logits = linear(flat, 1, 'd_h4_lin')

        return tf.nn.sigmoid(logits), logits
PyTorch
class NetD(nn.Module):
    """DCGAN discriminator built from five convolution stages.

    For an ``(N, 3, 96, 96)`` input the spatial size shrinks
    96 -> 32 -> 16 -> 8 -> 4 -> 1 and the output is ``(N, 1, 1, 1)`` with
    values in [0, 1] (Sigmoid).

    Args:
        ndf: Base number of discriminator feature maps.
    """

    def __init__(self, ndf):
        super(NetD, self).__init__()

        def down_block(in_ch, out_ch, kernel, stride, pad):
            # Conv -> batch norm -> LeakyReLU(0.2).
            return nn.Sequential(
                nn.Conv2d(in_ch, out_ch, kernel, stride, pad, bias=False),
                nn.BatchNorm2d(out_ch),
                nn.LeakyReLU(0.2, inplace=True),
            )

        # 3 x 96 x 96 -> ndf x 32 x 32
        self.layer1 = down_block(3, ndf, 5, 3, 1)
        # -> (ndf*2) x 16 x 16
        self.layer2 = down_block(ndf, ndf * 2, 4, 2, 1)
        # -> (ndf*4) x 8 x 8
        self.layer3 = down_block(ndf * 2, ndf * 4, 4, 2, 1)
        # -> (ndf*8) x 4 x 4
        self.layer4 = down_block(ndf * 4, ndf * 8, 4, 2, 1)
        # -> a single probability per image
        self.layer5 = nn.Sequential(
            nn.Conv2d(ndf * 8, 1, 4, 1, 0, bias=False),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Run ``x`` through layer1..layer5 in order and return the result."""
        stages = (self.layer1, self.layer2, self.layer3, self.layer4, self.layer5)
        for stage in stages:
            x = stage(x)
        return x
相比TensorFlow,PyTorch的代码看起来还是要舒服一些~