The Art of Noise


In my last several articles I talked about generative deep learning algorithms, mostly related to text generation tasks. So, I think it will be interesting to switch to generative algorithms for image generation now. As we know, these days there are plenty of deep learning models out there specialized for generating images, such as Autoencoders, Variational Autoencoders (VAE), Generative Adversarial Networks (GAN), and Neural Style Transfer (NST). I actually have a few articles about these topics posted on Medium as well; I have provided the links at the top of this article in case you want to read them.

In today's article, I would like to discuss the so-called Denoising Diffusion Probabilistic Model (DDPM), one of the most impactful models in the field of deep learning for image generation. The idea of this algorithm was first proposed in the paper titled Deep Unsupervised Learning using Nonequilibrium Thermodynamics written by Sohl-Dickstein et al. back in 2015 [1]. Their framework was then developed further by Ho et al. in 2020 in their paper titled Denoising Diffusion Probabilistic Models [2]. DDPM was later adapted by OpenAI and Google to develop DALL-E 2 and Imagen, models known for their impressive ability to generate high-quality images.

How Diffusion Models Work

Generally speaking, a diffusion model works by generating images from noise. We can think of it like an artist transforming a splash of paint on a canvas into a beautiful artwork. In order to do so, the diffusion model needs to be trained first. There are two main processes involved in training the model, namely forward diffusion and backward diffusion.

Figure 1. The forward and backward diffusion process [3].

As you can see in the figure above, forward diffusion is a process where Gaussian noise is applied to the original image iteratively. We keep adding noise until the image is completely unrecognizable, at which point we can say that the image now lies in the latent space. Different from Autoencoders and GANs, where the latent space typically has a lower dimension than the original image, the latent space in DDPM maintains exactly the same dimensionality as the original one. This noising process follows the principle of a Markov Chain, meaning that the image at timestep t depends only on the image at timestep t-1. Forward diffusion is considered easy since all we basically do is add some noise step by step.
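For readers who prefer the math, a single forward step (written here in the notation of the DDPM paper [2], where the scalar beta controls how much noise is added at timestep t) looks roughly like this:

$$q(x_t \mid x_{t-1}) = \mathcal{N}\!\left(x_t;\ \sqrt{1-\beta_t}\,x_{t-1},\ \beta_t \mathbf{I}\right)$$

In words: each new image x_t is the previous image x_{t-1} slightly scaled down, plus a small amount of fresh Gaussian noise whose variance is beta_t. We will meet beta_t again later as the BETA_START and BETA_END parameters.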

The second training phase is called backward diffusion, where our objective is to remove the noise little by little until we obtain a clear image. This process also follows the principle of a Markov Chain, where the image at timestep t-1 can only be obtained based on the image at timestep t. Such a denoising process is much harder since we need to guess which pixels are noise and which belong to the actual image content. Thus, we need to employ a neural network model to do so.

DDPM uses U-Net as the basis of the deep learning architecture for backward diffusion. However, instead of using the original U-Net model [4], we need to make several modifications to it so that it will be more suitable for our task. Later on, I am going to train this model on the MNIST Handwritten Digit dataset [5], and we will see whether it can generate similar images.

Well, that was pretty much all the fundamental concepts you need to know about diffusion models for now. In the next sections we are going to dive deeper into the details while implementing the algorithm from scratch.


PyTorch Implementation

We are going to start by importing the required modules. In case you are not yet familiar with the imports below, both torch and torchvision are the libraries we will use for preparing the model and the dataset, while matplotlib and tqdm will help us display images and progress bars.

# Codeblock 1
import matplotlib.pyplot as plt
import torch
import torch.nn as nn

from torch.optim import Adam
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from tqdm import tqdm

As the modules have been imported, the next thing to do is to initialize some configuration parameters. Take a look at Codeblock 2 below for the details.

# Codeblock 2
IMAGE_SIZE     = 28     #(1)
NUM_CHANNELS   = 1      #(2)

BATCH_SIZE     = 2
NUM_EPOCHS     = 10
LEARNING_RATE  = 0.001

NUM_TIMESTEPS  = 1000   #(3)
BETA_START     = 0.0001 #(4)
BETA_END       = 0.02   #(5)
TIME_EMBED_DIM = 32     #(6)
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")  #(7)
DEVICE
# Codeblock 2 Output
device(type='cuda')

At the lines marked with #(1) and #(2) I set IMAGE_SIZE and NUM_CHANNELS to 28 and 1, numbers that are obtained from the image dimensions of the MNIST dataset. The BATCH_SIZE, NUM_EPOCHS, and LEARNING_RATE variables are pretty straightforward, so I don't think I need to explain them further.

At line #(3), the variable NUM_TIMESTEPS denotes the number of iterations in the forward and backward diffusion processes. Timestep 0 is the condition where the image is in its original state (the leftmost image in Figure 1). In this case, since we set this parameter to 1000, timestep number 999 is going to be the condition where the image is completely unrecognizable (the rightmost image in Figure 1). It is important to keep in mind that the choice of the number of timesteps involves a tradeoff between model accuracy and computational cost. If we assign a small value to NUM_TIMESTEPS, the inference time is going to be shorter, yet the resulting image won't be very good since the model has fewer steps to refine the image in the backward diffusion stage. On the other hand, increasing NUM_TIMESTEPS slows down the inference process, but we can expect the output image to have higher quality thanks to the gradual denoising, which results in a more precise reconstruction.

Next, the BETA_START (#(4)) and BETA_END (#(5)) variables are used to control the amount of Gaussian noise added at each timestep, whereas TIME_EMBED_DIM (#(6)) determines the length of the feature vector used to store the timestep information. Lastly, at line #(7) I assign "cuda" to the DEVICE variable if PyTorch detects a GPU installed on our machine. I highly recommend running this project on a GPU since training a diffusion model is computationally expensive. Note that the values set for NUM_TIMESTEPS, BETA_START and BETA_END are all adopted directly from the DDPM paper [2].

The whole implementation will be done in several steps: constructing the U-Net model, preparing the dataset, defining the noise scheduler for the diffusion process, training, and inference. We are going to discuss each of these stages in the following sub-sections.


The U-Net Architecture: Time Embedding

As I mentioned earlier, the basis of a diffusion model is U-Net. This architecture is used because its output layer is suitable for representing an image, which makes sense given that it was initially introduced for image segmentation tasks in the first place. The following figure shows what the original U-Net architecture looks like.

Figure 2. The original U-Net model proposed in [4].

However, we need to modify this architecture so that it can also take the timestep information into account. Not only that, since we will only use the MNIST dataset, we also need to make the model smaller. Just remember the rule of thumb in deep learning that simpler models are often more effective for simple tasks.

In the figure below I show you the entire U-Net model after modification. Here you can see that the time embedding tensor is injected into the model at every stage, which will later be done by element-wise summation, allowing the model to capture the timestep information. Next, instead of repeating each of the downsampling and upsampling stages four times like the original U-Net, in this case we only repeat each of them twice. Additionally, it is worth noting that the stack of downsampling stages is also referred to as the encoder, whereas the stack of upsampling stages is often called the decoder.

Figure 3. The modified U-Net model for our diffusion task [3].

Now let's start constructing the architecture by creating a class for generating the time embedding tensor, the idea of which is similar to the positional encoding in the Transformer. See Codeblock 3 below for the details.

# Codeblock 3
class TimeEmbedding(nn.Module):
    def forward(self):
        time = torch.arange(NUM_TIMESTEPS, device=DEVICE).reshape(NUM_TIMESTEPS, 1)
        print(f"time\t\t: {time.shape}")
          
        i = torch.arange(0, TIME_EMBED_DIM, 2, device=DEVICE)
        denominator = torch.pow(10000, i/TIME_EMBED_DIM)
        print(f"denominator\t: {denominator.shape}")
          
        even_time_embed = torch.sin(time/denominator)  #(1)
        odd_time_embed  = torch.cos(time/denominator)  #(2)
        print(f"even_time_embed\t: {even_time_embed.shape}")
        print(f"odd_time_embed\t: {odd_time_embed.shape}")
          
        stacked = torch.stack([even_time_embed, odd_time_embed], dim=2)  #(3)
        print(f"stacked\t\t: {stacked.shape}")
        time_embed = torch.flatten(stacked, start_dim=1, end_dim=2)  #(4)
        print(f"time_embed\t: {time_embed.shape}")
          
        return time_embed

What we basically do in the above code is create a tensor of size NUM_TIMESTEPS × TIME_EMBED_DIM (1000×32), where each row of this tensor contains the timestep information. This way, each of the 1000 timesteps will be represented by a feature vector of length 32. The values in the tensor themselves are obtained based on the two equations in Figure 4. In Codeblock 3 above, these two equations are implemented at lines #(1) and #(2), each forming a tensor of size 1000×16. Next, these tensors are combined using the code at lines #(3) and #(4).

Here I also print out every step done in the above codeblock so that you can get a better understanding of what is actually happening inside the TimeEmbedding class. If you still want more explanation about the above code, feel free to read my previous post about the Transformer, which you can access through the link at the top of this article. Once you have opened the link, just scroll all the way down to the Positional Encoding section.

Figure 4. The sinusoidal positional encoding formula from the Transformer paper [6].
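Written out, the two formulas from the figure are roughly as follows, where t is the timestep, d is TIME_EMBED_DIM (32), and i indexes the pairs of feature dimensions:

$$PE_{(t,\,2i)} = \sin\!\left(\frac{t}{10000^{2i/d}}\right), \qquad PE_{(t,\,2i+1)} = \cos\!\left(\frac{t}{10000^{2i/d}}\right)$$

The sine equation fills the even-numbered columns of the embedding tensor and the cosine equation fills the odd-numbered ones, which is exactly what the stacking and flattening at lines #(3) and #(4) accomplish.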

Now let's check whether the TimeEmbedding class works properly using the following testing code. The resulting output shows that it successfully produces a tensor of size 1000×32, which is exactly what we expected earlier.

# Codeblock 4
time_embed_test = TimeEmbedding()
out_test = time_embed_test()
# Codeblock 4 Output
time            : torch.Size([1000, 1])
denominator     : torch.Size([16])
even_time_embed : torch.Size([1000, 16])
odd_time_embed  : torch.Size([1000, 16])
stacked         : torch.Size([1000, 16, 2])
time_embed      : torch.Size([1000, 32])

The U-Net Architecture: DoubleConv

If you take a closer look at the modified architecture, you will notice that there are a lot of repeating patterns, such as the ones highlighted in the yellow boxes in the following figure.

Figure 5. The processes done inside the yellow boxes will be implemented in the DoubleConv class [3].

These five yellow boxes share the same structure: they consist of two convolution layers, with the time embedding tensor injected right after the first convolution operation is performed. So, what we are going to do now is create another class named DoubleConv to reproduce this structure. Take a look at Codeblocks 5a and 5b below to see how I do that.

# Codeblock 5a
class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels):  #(1)
        super().__init__()
        
        self.conv_0 = nn.Conv2d(in_channels=in_channels,  #(2)
                                out_channels=out_channels, 
                                kernel_size=3, 
                                bias=False, 
                                padding=1)
        self.bn_0 = nn.BatchNorm2d(num_features=out_channels)  #(3)
        
        self.time_embedding = TimeEmbedding()  #(4)
        self.linear = nn.Linear(in_features=TIME_EMBED_DIM,  #(5)
                                out_features=out_channels)
        
        self.conv_1 = nn.Conv2d(in_channels=out_channels,  #(6)
                                out_channels=out_channels, 
                                kernel_size=3, 
                                bias=False, 
                                padding=1)
        self.bn_1 = nn.BatchNorm2d(num_features=out_channels)  #(7)
        
        self.relu = nn.ReLU(inplace=True)  #(8)

The two inputs of the __init__() method above give us the flexibility to configure the number of input and output channels (#(1)), so that the DoubleConv class can be used to instantiate all five yellow boxes simply by adjusting its arguments. As the name suggests, here we initialize two convolution layers (lines #(2) and #(6)), each followed by a batch normalization layer and a ReLU activation function. Keep in mind that the two normalization layers must be initialized separately (lines #(3) and #(7)) since each of them has its own trainable normalization parameters. Meanwhile, the ReLU activation function only needs to be initialized once (#(8)) since it contains no parameters, allowing it to be used multiple times in different parts of the network. At line #(4), we initialize the TimeEmbedding layer we created earlier, which will later be connected to a standard linear layer (#(5)). This linear layer is responsible for adjusting the dimension of the time embedding tensor so that the resulting output can be summed with the output of the first convolution layer in an element-wise manner.

Now let's take a look at Codeblock 5b below to better understand the flow of the DoubleConv block. Here you can see that the forward() method accepts two inputs: the raw image x and the timestep information t, as shown at line #(1). We initially process the image with the first Conv-BN-ReLU sequence (#(2–4)). This Conv-BN-ReLU structure is commonly used when working with CNN-based models, even when the illustration doesn't explicitly show the batch normalization and ReLU layers. Apart from the image, we also take the t-th row of our time embedding tensor for the corresponding image (#(5)) and pass it through the linear layer (#(6)). We still need to expand the dimensions of the resulting tensor using the code at line #(7) before performing the element-wise summation at line #(8). Finally, we process the resulting tensor with the second Conv-BN-ReLU sequence (#(9–11)).

# Codeblock 5b
    def forward(self, x, t):  #(1)
        print(f'images\t\t\t: {x.size()}')
        print(f'timesteps\t\t: {t.size()}, {t}')
        
        x = self.conv_0(x)  #(2)
        x = self.bn_0(x)    #(3)
        x = self.relu(x)    #(4)
        print(f'\nafter first conv\t: {x.size()}')
        
        time_embed = self.time_embedding()[t]      #(5)
        print(f'\ntime_embed\t\t: {time_embed.size()}')
        
        time_embed = self.linear(time_embed)       #(6)
        print(f'time_embed after linear\t: {time_embed.size()}')
        
        time_embed = time_embed[:, :, None, None]  #(7)
        print(f'time_embed expanded\t: {time_embed.size()}')
        
        x = x + time_embed  #(8)
        print(f'\nafter summation\t\t: {x.size()}')
        
        x = self.conv_1(x)  #(9)
        x = self.bn_1(x)    #(10)
        x = self.relu(x)    #(11)
        print(f'after second conv\t: {x.size()}')
        
        return x

To see whether our DoubleConv implementation works properly, we are going to test it with Codeblock 6 below. Here I want to simulate the very first instance of this block, which corresponds to the leftmost yellow box in Figure 5. To do so, we need to set the in_channels and out_channels parameters to 1 and 64, respectively (#(1)). Next, we initialize two input tensors, namely x_test and t_test. The x_test tensor has the shape 2×1×28×28, representing a batch of two grayscale images of size 28×28 (#(2)). Keep in mind that this is only a dummy tensor of random values, which will be replaced with actual images from the MNIST dataset later in the training phase. Meanwhile, t_test is a tensor containing the timestep numbers of the corresponding images (#(3)). The values for this tensor are randomly chosen between 0 and NUM_TIMESTEPS (1000). Note that the datatype of this tensor must be an integer since the numbers will be used for indexing, as shown at line #(5) back in Codeblock 5b. Lastly, at line #(4) we pass both the x_test and t_test tensors to the double_conv_test layer.

By the way, I re-ran the previous codeblocks with the print() functions removed before running the following code, so that the outputs look neater.

# Codeblock 6
double_conv_test = DoubleConv(in_channels=1, out_channels=64).to(DEVICE)  #(1)

x_test = torch.randn((BATCH_SIZE, NUM_CHANNELS, IMAGE_SIZE, IMAGE_SIZE)).to(DEVICE)  #(2)
t_test = torch.randint(0, NUM_TIMESTEPS, (BATCH_SIZE,)).to(DEVICE)  #(3)

out_test = double_conv_test(x_test, t_test)  #(4)
# Codeblock 6 Output
images                  : torch.Size([2, 1, 28, 28])   #(1)
timesteps               : torch.Size([2]), tensor([468, 304], device='cuda:0')  #(2)

after first conv        : torch.Size([2, 64, 28, 28])  #(3)

time_embed              : torch.Size([2, 32])          #(4)
time_embed after linear : torch.Size([2, 64])
time_embed expanded     : torch.Size([2, 64, 1, 1])    #(5)

after summation         : torch.Size([2, 64, 28, 28])  #(6)
after second conv       : torch.Size([2, 64, 28, 28])  #(7)

The shapes of our original input tensors can be seen at lines #(1) and #(2) in the above output. Specifically at line #(2), I also print out the two timesteps that we selected randomly. In this example we assume that the two images in the x tensor have already been noised with the noise levels of the 468th and 304th timesteps before being fed into the network. We can see that the shape of the image tensor x changes to 2×64×28×28 after being passed through the first convolution layer (#(3)). Meanwhile, the size of our time embedding tensor becomes 2×32 (#(4)), which is obtained by extracting rows 468 and 304 from the original embedding of size 1000×32. In order to allow the element-wise summation to be performed (#(6)), we need to map the 32-dimensional time embedding vectors to 64 dimensions and expand their axes, resulting in a tensor of size 2×64×1×1 (#(5)) so that it can be broadcast to the 2×64×28×28 tensor. After the summation is done, we pass the tensor through the second convolution layer, at which point the tensor dimensions do not change at all (#(7)).
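If the broadcasting behavior feels unclear, the following standalone sketch (assuming torch is already imported as in Codeblock 1, and not part of the model itself) illustrates that a tensor with singleton spatial dimensions is automatically replicated across all 28×28 positions during the addition:

# Extra sketch: element-wise summation with broadcasting.
# The 2×64×1×1 tensor is expanded over the two trailing axes,
# so every spatial position receives the same per-channel offset.
feature_map = torch.randn(2, 64, 28, 28)
time_offset = torch.randn(2, 64, 1, 1)
summed = feature_map + time_offset
print(summed.shape)  # torch.Size([2, 64, 28, 28])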


The U-Net Architecture: Encoder

As we have successfully implemented the DoubleConv block, the next step is to implement the so-called DownSample block. In Figure 6 below, this corresponds to the parts enclosed in the red boxes.

Figure 6. The parts of the network highlighted in red are the so-called DownSample blocks [3].

The purpose of a DownSample block is to reduce the spatial dimensions of an image, but it is important to note that it increases the number of channels at the same time. To do so, we can simply stack a DoubleConv block and a maxpooling operation. In this case the pooling uses a 2×2 kernel with a stride of 2, causing the spatial dimensions of the output to be half those of the input. The implementation of this block can be seen in Codeblock 7 below.

# Codeblock 7
class DownSample(nn.Module):
    def __init__(self, in_channels, out_channels):  #(1)
        super().__init__()
        
        self.double_conv = DoubleConv(in_channels=in_channels,  #(2)
                                      out_channels=out_channels)
        self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)    #(3)
    
    def forward(self, x, t):  #(4)
        print(f'original\t\t: {x.size()}')
        print(f'timesteps\t\t: {t.size()}, {t}')
        
        convolved = self.double_conv(x, t)   #(5)
        print(f'\nafter double conv\t: {convolved.size()}')
        
        maxpooled = self.maxpool(convolved)  #(6)
        print(f'after pooling\t\t: {maxpooled.size()}')
        
        return convolved, maxpooled          #(7)

Here I set the __init__() method to take the number of input and output channels so that we can use it for creating the two DownSample blocks highlighted in Figure 6 without needing to write them in separate classes (#(1)). Next, the DoubleConv and maxpooling layers are initialized at lines #(2) and #(3), respectively. Remember that since the DoubleConv block accepts the image x and the corresponding timestep t as its inputs, we also need to set the forward() method of this DownSample block to accept both of them as well (#(4)). The information contained in x and t is then combined as the two tensors are processed by the double_conv layer, and the output is stored in the variable named convolved (#(5)). Afterwards, we actually perform the downsampling with the maxpooling operation at line #(6), producing a tensor named maxpooled. It is important to note that both the convolved and maxpooled tensors are returned, because we will later bring maxpooled to the next downsampling stage, whereas the convolved tensor will be transferred directly to the upsampling stage in the decoder through a skip-connection.

Now let's test the DownSample class using Codeblock 8 below. The input tensors used here are exactly the same as the ones in Codeblock 6. Based on the resulting output, we can see that the pooling operation successfully converted the output of the DoubleConv block from 2×64×28×28 (#(1)) to 2×64×14×14 (#(2)), indicating that our DownSample class works properly.

# Codeblock 8
down_sample_test = DownSample(in_channels=1, out_channels=64).to(DEVICE)

x_test = torch.randn((BATCH_SIZE, NUM_CHANNELS, IMAGE_SIZE, IMAGE_SIZE)).to(DEVICE)
t_test = torch.randint(0, NUM_TIMESTEPS, (BATCH_SIZE,)).to(DEVICE)

out_test = down_sample_test(x_test, t_test)
# Codeblock 8 Output
original          : torch.Size([2, 1, 28, 28])
timesteps         : torch.Size([2]), tensor([468, 304], device='cuda:0')

after double conv : torch.Size([2, 64, 28, 28])  #(1)
after pooling     : torch.Size([2, 64, 14, 14])  #(2)

The U-Net Architecture: Decoder

Next, we need to introduce the so-called UpSample block in the decoder, which is responsible for reverting the tensors in the intermediate layers to the original image dimensions. In order to maintain a symmetrical structure, the number of UpSample blocks must match that of the DownSample blocks. Take a look at Figure 7 below to see where the two UpSample blocks are placed.

Figure 7. The components contained in the blue boxes are the so-called UpSample blocks [3].

Since both UpSample blocks are structurally identical, we can just create a single class for them, just like the DownSample class we created earlier. Take a look at Codeblock 9 below to see how I implement it.

# Codeblock 9
class UpSample(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        
        self.conv_transpose = nn.ConvTranspose2d(in_channels=in_channels,  #(1)
                                                 out_channels=out_channels, 
                                                 kernel_size=2, stride=2)  #(2)
        self.double_conv = DoubleConv(in_channels=in_channels,  #(3)
                                      out_channels=out_channels)
        
    def forward(self, x, t, connection):  #(4)
        print(f'original\t\t: {x.size()}')
        print(f'timesteps\t\t: {t.size()}, {t}')
        print(f'connection\t\t: {connection.size()}')
        
        x = self.conv_transpose(x)  #(5)
        print(f'\nafter conv transpose\t: {x.size()}')
        
        x = torch.cat([x, connection], dim=1)  #(6)
        print(f'after concat\t\t: {x.size()}')
        
        x = self.double_conv(x, t)  #(7)
        print(f'after double conv\t: {x.size()}')
        
        return x

In the __init__() method, we use nn.ConvTranspose2d to upsample the spatial dimensions (#(1)). Both the kernel size and stride are set to 2 so that the output will be twice as large (#(2)). Next, the DoubleConv block will be employed to reduce the number of channels while at the same time incorporating the timestep information from the time embedding tensor (#(3)).

The flow of this UpSample class is a little more complicated than that of the DownSample class. If we take a closer look at the architecture, we will see that we also have a skip-connection coming directly from the encoder. Thus, we need the forward() method to accept another argument in addition to the input image x and the timestep t, namely the residual tensor connection (#(4)). The first thing we do inside this method is to process the input image x with the transpose convolution layer (#(5)). In fact, this layer not only upsamples the spatial size but also reduces the number of channels at the same time. However, the resulting tensor is then directly concatenated with connection in a channel-wise manner (#(6)), which makes it appear as if no channel reduction was performed. It is important to understand that at this point the two tensors are merely concatenated, meaning that the information from both is not yet combined. We finally feed the concatenated tensor to the double_conv layer (#(7)), allowing the two sources to share information with each other through the learnable parameters inside the convolution layers.

Codeblock 10 below shows how I test the UpSample class. The sizes of the tensors to be passed through are set according to the second upsampling block, i.e., the rightmost blue box in Figure 7.

# Codeblock 10
up_sample_test = UpSample(in_channels=128, out_channels=64).to(DEVICE)

x_test = torch.randn((BATCH_SIZE, 128, 14, 14)).to(DEVICE)
t_test = torch.randint(0, NUM_TIMESTEPS, (BATCH_SIZE,)).to(DEVICE)
connection_test = torch.randn((BATCH_SIZE, 64, 28, 28)).to(DEVICE)

out_test = up_sample_test(x_test, t_test, connection_test)

In the resulting output below, if we compare the input tensor shape (#(1)) with the final tensor shape (#(2)), we can clearly see that the number of channels was successfully reduced from 128 to 64, while at the same time the spatial dimensions increased from 14×14 to 28×28. This essentially means that our UpSample class is now ready to be used in the main U-Net architecture.

# Codeblock 10 Output
original             : torch.Size([2, 128, 14, 14])   #(1)
timesteps            : torch.Size([2]), tensor([468, 304], device='cuda:0')
connection           : torch.Size([2, 64, 28, 28])

after conv transpose : torch.Size([2, 64, 28, 28])
after concat         : torch.Size([2, 128, 28, 28])
after double conv    : torch.Size([2, 64, 28, 28])    #(2)

The U-Net Architecture: Putting All Components Together

Once all the U-Net components have been created, what we are going to do next is wrap them together into a single class. Take a look at Codeblocks 11a and 11b below for the details.

# Codeblock 11a
class UNet(nn.Module):
    def __init__(self):
        super().__init__()
      
        self.downsample_0 = DownSample(in_channels=NUM_CHANNELS,  #(1)
                                       out_channels=64)
        self.downsample_1 = DownSample(in_channels=64,            #(2)
                                       out_channels=128)
      
        self.bottleneck   = DoubleConv(in_channels=128,           #(3)
                                       out_channels=256)
      
        self.upsample_0   = UpSample(in_channels=256,             #(4)
                                     out_channels=128)
        self.upsample_1   = UpSample(in_channels=128,             #(5)
                                     out_channels=64)
      
        self.output = nn.Conv2d(in_channels=64,                   #(6)
                                out_channels=NUM_CHANNELS,
                                kernel_size=1)

You can see in the __init__() method above that we initialize two downsampling (#(1–2)) and two upsampling (#(4–5)) blocks, where the numbers of input and output channels are set according to the architecture shown in the illustration. There are actually two additional components I haven't explained yet, namely the bottleneck (#(3)) and the output layer (#(6)). The former is essentially just a DoubleConv block, which acts as the main connection between the encoder and the decoder. Take a look at Figure 8 below to see which components of the network belong to the bottleneck. Next, the output layer is a standard convolution layer which is responsible for turning the 64-channel image produced by the last upsampling stage into a 1-channel image. This operation is done using a kernel of size 1×1, meaning that it combines information across all channels while operating independently at each pixel position.
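As a quick sanity check (this snippet is not part of the model, and assumes the imports from Codeblock 1), you can verify that a 1×1 convolution only changes the channel count while leaving the spatial size untouched:

# Extra sketch: a 1×1 convolution mixes channels per pixel position.
conv_1x1 = nn.Conv2d(in_channels=64, out_channels=1, kernel_size=1)
feature_map = torch.randn(2, 64, 28, 28)
print(conv_1x1(feature_map).shape)  # torch.Size([2, 1, 28, 28])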

Figure 8. The bottleneck layer (the lower part of the model) acts as the main bridge between the encoder and the decoder of U-Net [3].

I suppose the forward() method of the entire U-Net in the following codeblock is pretty straightforward, as what we essentially do here is pass the tensors from one layer to another; just don't forget to include the skip connections between the downsampling and upsampling blocks.

# Codeblock 11b
    def forward(self, x, t):  #(1)
        print(f'original\t\t: {x.size()}')
        print(f'timesteps\t\t: {t.size()}, {t}')
            
        convolved_0, maxpooled_0 = self.downsample_0(x, t)
        print(f'\nmaxpooled_0\t\t: {maxpooled_0.size()}')
            
        convolved_1, maxpooled_1 = self.downsample_1(maxpooled_0, t)
        print(f'maxpooled_1\t\t: {maxpooled_1.size()}')
            
        x = self.bottleneck(maxpooled_1, t)
        print(f'after bottleneck\t: {x.size()}')
    
        upsampled_0 = self.upsample_0(x, t, convolved_1)
        print(f'upsampled_0\t\t: {upsampled_0.size()}')
            
        upsampled_1 = self.upsample_1(upsampled_0, t, convolved_0)
        print(f'upsampled_1\t\t: {upsampled_1.size()}')
            
        x = self.output(upsampled_1)
        print(f'final output\t\t: {x.size()}')
            
        return x

Now let's see whether we have correctly constructed the U-Net class above by running the following testing code.

# Codeblock 12
unet_test = UNet().to(DEVICE)

x_test = torch.randn((BATCH_SIZE, NUM_CHANNELS, IMAGE_SIZE, IMAGE_SIZE)).to(DEVICE)
t_test = torch.randint(0, NUM_TIMESTEPS, (BATCH_SIZE,)).to(DEVICE)

out_test = unet_test(x_test, t_test)
# Codeblock 12 Output
original         : torch.Size([2, 1, 28, 28])   #(1)
timesteps        : torch.Size([2]), tensor([468, 304], device='cuda:0')

maxpooled_0      : torch.Size([2, 64, 14, 14])  #(2)
maxpooled_1      : torch.Size([2, 128, 7, 7])   #(3)
after bottleneck : torch.Size([2, 256, 7, 7])   #(4)
upsampled_0      : torch.Size([2, 128, 14, 14])
upsampled_1      : torch.Size([2, 64, 28, 28])
final output     : torch.Size([2, 1, 28, 28])   #(5)

We can see in the above output that the two downsampling stages successfully converted the original tensor of size 1×28×28 (#(1)) into 64×14×14 (#(2)) and then 128×7×7 (#(3)). This tensor is then passed through the bottleneck layer, which expands its number of channels to 256 without changing the spatial dimensions (#(4)). Lastly, we upsample the tensor twice before eventually shrinking the number of channels to 1 (#(5)). Based on this output, it looks like our model is working properly. Thus, it is now ready to be trained for our diffusion task.


Dataset Preparation

As we have successfully created the entire U-Net architecture, the next thing to do is to prepare the MNIST Handwritten Digit dataset. Before actually loading it, we first need to define the preprocessing steps using the transforms.Compose() method from Torchvision, as shown at line #(1) in Codeblock 13. There are two things we do here: converting the images into PyTorch tensors, which also scales the pixel values from 0–255 to 0–1 (#(2)), and normalizing them so that the final pixel values range between -1 and 1 (#(3)). Next, we download the dataset using datasets.MNIST() (#(4)). In this case, we are going to take the images from the training split, hence we use train=True (#(5)). Don't forget to pass the transform variable we initialized earlier to the transform parameter (transform=transform) so that it automatically preprocesses the images as we load them (#(6)). Lastly, we employ a DataLoader to load the images from mnist_dataset (#(7)). The arguments I use for the input parameters are intended to randomly pick BATCH_SIZE (2) images from the dataset in each iteration.

# Codeblock 13
transform = transforms.Compose([  #(1)
    transforms.ToTensor(),        #(2)
    transforms.Normalize((0.5,), (0.5,))  #(3)
])

mnist_dataset = datasets.MNIST(   #(4)
    root='./data', 
    train=True,           #(5)
    download=True, 
    transform=transform   #(6)
)

loader = DataLoader(mnist_dataset,  #(7)
                    batch_size=BATCH_SIZE,
                    drop_last=True, 
                    shuffle=True)

In the following codeblock, I try to load a batch of images from the dataset. In every iteration, loader provides both the images and the corresponding labels, hence we need to store them in two separate variables: images and labels.

# Codeblock 14
images, labels = next(iter(loader))

print('images\t\t:', images.shape)
print('labels\t\t:', labels.shape)
print('min value\t:', images.min())
print('max value\t:', images.max())

We can see in the resulting output below that the images tensor has the shape 2×1×28×28 (#(1)), indicating that two grayscale images of size 28×28 have been successfully loaded. Here we can also see that the length of the labels tensor is 2, which matches the number of loaded images (#(2)). Note that in this case the labels are going to be completely ignored. My plan here is to have the model generate any digit it has previously seen in the training dataset without even knowing which digit it actually is. Lastly, this output also shows that the preprocessing works properly, as the pixel values now range between -1 and 1.

# Codeblock 14 Output
images    : torch.Size([2, 1, 28, 28])  #(1)
labels    : torch.Size([2])             #(2)
min value : tensor(-1.)
max value : tensor(1.)
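Note that plt.imshow() rescales the values automatically when displaying, but if you ever need the pixel values back in the 0–1 range, inverting transforms.Normalize((0.5,), (0.5,)) is a one-liner. A minimal sketch:

# Extra sketch: undo the normalization (x * std + mean).
unnormalized = images * 0.5 + 0.5   # values back in the 0–1 range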

Run the following code if you want to see what the image we just loaded looks like.

# Codeblock 15   
plt.imshow(images[0].squeeze(), cmap='gray')
plt.show()
Figure 9. Output from Codeblock 15 [3].

Noise Scheduler

In this section we are going to talk about how the forward and backward diffusion processes are performed, which essentially involves adding or removing a little noise at each timestep. It is important to know that we want the amount of noise to be scheduled consistently across all timesteps, such that in forward diffusion the image is completely filled with noise exactly at timestep 1000, while in backward diffusion we get back a completely clear image at timestep 0. Hence, we need something to control the noise amount for each timestep. Later in this section, I am going to implement a class named NoiseScheduler to do so. This will probably be the most math-heavy section of this article, as I will display many equations here. But don't worry, since we will focus on implementing these equations rather than discussing the mathematical derivations.

Now let's take a look at the equations in Figure 10, which I will implement in the __init__() method of the NoiseScheduler class below.

Figure 10. The equations we need to implement in the __init__() method of the NoiseScheduler class [3].
# Codeblock 16a
class NoiseScheduler:
    def __init__(self):
        self.betas = torch.linspace(BETA_START, BETA_END, NUM_TIMESTEPS)  #(1)
        self.alphas = 1. - self.betas
        self.alphas_cum_prod = torch.cumprod(self.alphas, dim=0)
        self.sqrt_alphas_cum_prod = torch.sqrt(self.alphas_cum_prod)
        self.sqrt_one_minus_alphas_cum_prod = torch.sqrt(1. - self.alphas_cum_prod)

The above code works by creating multiple sequences of numbers, all of which are basically controlled by BETA_START (0.0001), BETA_END (0.02), and NUM_TIMESTEPS (1000). The first sequence we need to instantiate is betas itself, which is done using torch.linspace() (#(1)). What it essentially does is generate a 1-dimensional tensor of length 1000 ranging from 0.0001 to 0.02, where each element in this tensor corresponds to a single timestep. The interval between consecutive elements is uniform, giving us a linear noise schedule across all timesteps. With this betas tensor, we then compute alphas, alphas_cum_prod, sqrt_alphas_cum_prod and sqrt_one_minus_alphas_cum_prod based on the four equations in Figure 10. Later on, these tensors will act as the basis for how noise is added or removed throughout the diffusion process.
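For reference, the four quantities computed above correspond to the following expressions (this is just the code rewritten in math notation):

$$\alpha_t = 1 - \beta_t, \qquad \bar{\alpha}_t = \prod_{s=1}^{t} \alpha_s, \qquad \sqrt{\bar{\alpha}_t}, \qquad \sqrt{1 - \bar{\alpha}_t}$$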

Diffusion is normally done in a sequential manner. However, the forward diffusion process can be rewritten in a closed form, so that we can obtain the noisy image at a specific timestep without having to iteratively add noise from the very beginning. Figure 11 below shows what the closed form of the forward diffusion looks like, where x₀ represents the original image while epsilon (ε) denotes an image made up of random Gaussian noise. We can think of this equation as a weighted combination, where we mix the clean image and the noise according to weights determined by the timestep, resulting in an image with a certain amount of noise.

Figure 11. The closed form of the forward diffusion process [3].
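In symbols, with x₀ the original image, ε the Gaussian noise, and ᾱ_t the cumulative product we computed in the noise scheduler, the closed form reads:

$$x_t = \sqrt{\bar{\alpha}_t}\, x_0 + \sqrt{1 - \bar{\alpha}_t}\, \epsilon, \qquad \epsilon \sim \mathcal{N}(0, \mathbf{I})$$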

The implementation of this equation can be seen in Codeblock 16b. In this forward_diffusion() method, x₀ and ε are denoted as original and noise. Keep in mind that these two input variables are images, whereas sqrt_alphas_cum_prod_t and sqrt_one_minus_alphas_cum_prod_t are scalars. Thus, we need to adjust the shapes of these two scalars (#(1) and #(2)) so that the operation at line #(3) can be performed. The noisy_image variable is going to be the output of this function, and I suppose its name is self-explanatory.

# Codeblock 16b
    def forward_diffusion(self, original, noise, t):
        sqrt_alphas_cum_prod_t = self.sqrt_alphas_cum_prod[t]
        sqrt_alphas_cum_prod_t = sqrt_alphas_cum_prod_t.to(DEVICE).view(-1, 1, 1, 1)  #(1)
        
        sqrt_one_minus_alphas_cum_prod_t = self.sqrt_one_minus_alphas_cum_prod[t]
        sqrt_one_minus_alphas_cum_prod_t = sqrt_one_minus_alphas_cum_prod_t.to(DEVICE).view(-1, 1, 1, 1)  #(2)
        
        noisy_image = sqrt_alphas_cum_prod_t * original + sqrt_one_minus_alphas_cum_prod_t * noise  #(3)
        
        return noisy_image

Now let's talk about backward diffusion. Actually, this one is a little more complicated than forward diffusion since we need three more equations here. Before I give you these equations, let me show you the implementation first. See Codeblock 16c below.

# Codeblock 16c
    def backward_diffusion(self, current_image, predicted_noise, t):  #(1)
        denoised_image = (current_image - (self.sqrt_one_minus_alphas_cum_prod[t] * predicted_noise)) / self.sqrt_alphas_cum_prod[t]  #(2)
        denoised_image = 2 * (denoised_image - denoised_image.min()) / (denoised_image.max() - denoised_image.min()) - 1  #(3)
        
        current_prediction = current_image - ((self.betas[t] * predicted_noise) / (self.sqrt_one_minus_alphas_cum_prod[t]))  #(4)
        current_prediction = current_prediction / torch.sqrt(self.alphas[t])  #(5)
        
        if t == 0:  #(6)
            return current_prediction, denoised_image
        
        else:
            variance = (1 - self.alphas_cum_prod[t-1]) / (1. - self.alphas_cum_prod[t])  #(7)
            variance = variance * self.betas[t]  #(8)
            sigma = variance ** 0.5
            z = torch.randn(current_image.shape).to(DEVICE)
            current_prediction = current_prediction + sigma*z
            
            return current_prediction, denoised_image

Later in the inference phase, the backward_diffusion() method will be called inside a loop that iterates NUM_TIMESTEPS (1000) times, starting from t = 999, continuing with t = 998, and so on all the way to t = 0. This function is responsible for removing the noise from the image iteratively based on current_image (the image produced by the previous denoising step), predicted_noise (the noise predicted by U-Net in the current step), and the timestep information t (#(1)). In each iteration, noise removal is done using the equation shown in Figure 12, which in Codeblock 16c corresponds to lines #(4–5).

Figure 12. The equation used for removing noise from the image [3].
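Written out (and matching lines #(4–5) in Codeblock 16c), the denoising step is:

$$x_{t-1} \approx \frac{1}{\sqrt{\alpha_t}}\left(x_t - \frac{\beta_t}{\sqrt{1-\bar{\alpha}_t}}\, \epsilon_\theta(x_t, t)\right)$$

where ε_θ(x_t, t) is the noise predicted by our U-Net.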

As long as we haven't reached t = 0, we compute the variance based on the equation in Figure 13 (#(7–8)). This variance is then used to introduce another controlled noise to simulate the stochasticity of the backward diffusion process, since the noise removal equation in Figure 12 is a deterministic approximation. This is also the reason why we don't calculate the variance once we reach t = 0 (#(6)): we no longer need to add more noise, as the image is completely clear already.

Figure 13. The equation used to calculate variance for introducing controlled noise [3].
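In math form (lines #(7–8) plus the sigma*z term in the code), the added noise is governed by:

$$\sigma_t^2 = \frac{1-\bar{\alpha}_{t-1}}{1-\bar{\alpha}_t}\,\beta_t, \qquad x_{t-1} \leftarrow x_{t-1} + \sigma_t z, \quad z \sim \mathcal{N}(0, \mathbf{I})$$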

Different from current_prediction, which aims to estimate the image at the previous timestep, the objective of the denoised_image tensor is to reconstruct the original image (x₀). Because of these different objectives, we need a separate equation to compute denoised_image, which can be seen in Figure 14 below. The implementation of the equation itself is written at lines #(2–3).

Figure 14. The equation for reconstructing the original image [3].
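The equation implemented at line #(2) is the following (line #(3) then simply rescales the result back to the -1 to 1 range with min-max normalization):

$$\hat{x}_0 = \frac{x_t - \sqrt{1-\bar{\alpha}_t}\,\epsilon_\theta(x_t, t)}{\sqrt{\bar{\alpha}_t}}$$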

Now let's test the NoiseScheduler class we created above. In the following codeblock, I instantiate a NoiseScheduler object and print out its attributes, which are all computed using the equations in Figure 10 based on the values stored in the betas attribute. Remember that the actual length of these tensors is NUM_TIMESTEPS (1000), but here I only print out the first 6 elements.

# Codeblock 17
noise_scheduler = NoiseScheduler()

print(f'betas\t\t\t\t: {noise_scheduler.betas[:6]}')
print(f'alphas\t\t\t\t: {noise_scheduler.alphas[:6]}')
print(f'alphas_cum_prod\t\t\t: {noise_scheduler.alphas_cum_prod[:6]}')
print(f'sqrt_alphas_cum_prod\t\t: {noise_scheduler.sqrt_alphas_cum_prod[:6]}')
print(f'sqrt_one_minus_alphas_cum_prod\t: {noise_scheduler.sqrt_one_minus_alphas_cum_prod[:6]}')
# Codeblock 17 Output
betas                          : tensor([1.0000e-04, 1.1992e-04, 1.3984e-04, 1.5976e-04, 1.7968e-04, 1.9960e-04])
alphas                         : tensor([0.9999, 0.9999, 0.9999, 0.9998, 0.9998, 0.9998])
alphas_cum_prod                : tensor([0.9999, 0.9998, 0.9996, 0.9995, 0.9993, 0.9991])
sqrt_alphas_cum_prod           : tensor([0.9999, 0.9999, 0.9998, 0.9997, 0.9997, 0.9996])
sqrt_one_minus_alphas_cum_prod : tensor([0.0100, 0.0148, 0.0190, 0.0228, 0.0264, 0.0300])

The above output indicates that our __init__() method works as expected. Next, we are going to test the forward_diffusion() method. If you go back to Codeblock 16b, you will see that forward_diffusion() accepts three inputs: the original image, the noise image, and the timestep number. Let's just use the image from the MNIST dataset we loaded earlier for the first input (#(1)) and random Gaussian noise of exactly the same size for the second (#(2)). Run Codeblock 18 below to see what these two images look like.

# Codeblock 18
image = images[0]  #(1)
noise = torch.randn_like(image)  #(2)

plt.imshow(image.squeeze(), cmap='gray')
plt.show()
plt.imshow(noise.squeeze(), cmap='gray')
plt.show()
Figure 15. The two images to be used as the original (left) and the noise image (right). The one on the left is the same image I showed earlier in Figure 9 [3].

Now that we have the image and the noise ready, what we need to do next is pass them to the forward_diffusion() method alongside the timestep t. I actually ran Codeblock 19 below multiple times with t = 50, 100, 150, and so on up to t = 300. You can see in Figure 16 that the image becomes less clear as the t parameter increases. In this case, the image is going to be completely filled with noise when t is set to 999.

# Codeblock 19
noisy_image_test = noise_scheduler.forward_diffusion(image.to(DEVICE), noise.to(DEVICE), t=50)

plt.imshow(noisy_image_test[0].squeeze().cpu(), cmap='gray')
plt.show()
Figure 16. The result of the forward diffusion process at t=50, 100, 150, and so on until t=300 [3].

Unfortunately, we cannot test the backward_diffusion() method yet since this process requires a trained U-Net model. So, let's just skip this part for now. I will show you how we can actually use this function later in the inference phase.


Training

As the U-Net model, the MNIST dataset, and the noise scheduler are ready, we can now prepare a function for training. Before we do that, I instantiate the model and the noise scheduler in Codeblock 20 below.

# Codeblock 20
model = UNet().to(DEVICE)
noise_scheduler = NoiseScheduler()

The entire training procedure is implemented in the train() function shown in Codeblock 21. Before doing anything else, we first initialize the optimizer and the loss function, for which we use Adam and MSE, respectively (#(1–2)). What we basically want to do here is train the model so that it will be able to predict the noise contained in the input image; later on, the predicted noise will be used as the basis of the denoising process in the backward diffusion stage. To actually train the model, we first need to perform forward diffusion using the code at line #(6). This noising process is done on the images tensor (#(3)) using the random noise generated at line #(4). Next, we take a random number somewhere between 0 and NUM_TIMESTEPS (1000) for t (#(5)), which is done because we want our model to see images of varying noise levels as a way to improve generalization. Once the noisy images have been generated, we pass them through the U-Net model alongside the chosen t (#(7)). The input t here is useful for the model since it indicates the current noise level in the image. Lastly, the loss function we initialized earlier is responsible for computing the difference between the actual noise and the noise predicted by the model (#(8)). So, the objective of this training is essentially to make the predicted noise as similar as possible to the noise we generated at line #(4).

# Codeblock 21
def train():
    optimizer = Adam(model.parameters(), lr=LEARNING_RATE)  #(1)
    loss_function = nn.MSELoss()  #(2)
    losses = []
    
    for epoch in range(NUM_EPOCHS):
        print(f'Epoch no {epoch}')
        
        for images, _ in tqdm(loader):
            
            optimizer.zero_grad()

            images = images.float().to(DEVICE)  #(3)
            noise = torch.randn_like(images)  #(4)
            t = torch.randint(0, NUM_TIMESTEPS, (BATCH_SIZE,))  #(5)

            noisy_images = noise_scheduler.forward_diffusion(images, noise, t).to(DEVICE)  #(6)
            predicted_noise = model(noisy_images, t)  #(7)
            loss = loss_function(predicted_noise, noise)  #(8)
            
            losses.append(loss.item())
            loss.backward()
            optimizer.step()

    return losses

Now let's run the above training function using the codeblock below. Sit back and relax while waiting for the training to complete. In my case, I used a Kaggle Notebook with an Nvidia P100 GPU turned on, and it took around 45 minutes to finish.

# Codeblock 22
losses = train()

If we take a look at the loss graph, it seems our model learned pretty well, as the value is generally decreasing over time with a rapid drop in the early stages and a more stable (yet still decreasing) trend in the later stages. So, I think we can expect good results later in the inference phase.

# Codeblock 23
plt.plot(losses)
Figure 17. How the loss value decreases as the training goes on [3].
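Since the raw per-batch loss in Figure 17 is quite noisy (our BATCH_SIZE is only 2), you might optionally smooth it before plotting. Below is a minimal sketch; the window size of 500 is an arbitrary choice, and numpy is an extra import not used anywhere else in this article.

# Optional sketch: plot a moving average of the loss for a cleaner curve.
import numpy as np

window = 500
smoothed = np.convolve(losses, np.ones(window) / window, mode='valid')
plt.plot(smoothed)
plt.show()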

Inference

At this point we have our model trained, so we can now perform inference with it. Take a look at Codeblock 24 below to see how I implement the inference() function.

# Codeblock 24
def inference():

    denoised_images = []  #(1)
    
    with torch.no_grad():  #(2)
        current_prediction = torch.randn((64, NUM_CHANNELS, IMAGE_SIZE, IMAGE_SIZE)).to(DEVICE)  #(3)
        
        for i in tqdm(reversed(range(NUM_TIMESTEPS))):  #(4)
            predicted_noise = model(current_prediction, torch.as_tensor(i).unsqueeze(0))  #(5)
            current_prediction, denoised_image = noise_scheduler.backward_diffusion(current_prediction, predicted_noise, torch.as_tensor(i))  #(6)

            if i%100 == 0:  #(7)
                denoised_images.append(denoised_image)
            
        return denoised_images

At the line marked with #(1) I initialize an empty list which will be used to store the denoising result every 100 timesteps (#(7)). This will later allow us to see how the backward diffusion progresses. The actual inference process is encapsulated inside torch.no_grad() (#(2)). Remember that in diffusion models we generate images from completely random noise, which we assume to be at t = 999. To implement this, we can simply use torch.randn() as shown at line #(3). Here we initialize a tensor of size 64×1×28×28, indicating that we are about to generate 64 images simultaneously. Next, we write a for loop that iterates backwards from 999 to 0 (#(4)). Inside this loop, we feed the current image and the timestep as the input to the trained U-Net and let it predict the noise (#(5)). The actual backward diffusion is then performed at line #(6). At the end of the iterations, we should get new images similar to the ones we have in our dataset. Now let's call the inference() function in the following codeblock.

# Codeblock 25
denoised_images = inference()

As the inference has completed, we can now see what the resulting images look like. Codeblock 26 below is used to display the first 42 images we just generated.

# Codeblock 26
fig, axes = plt.subplots(ncols=7, nrows=6, figsize=(10, 8))

counter = 0

for i in range(6):
    for j in range(7):
        axes[i,j].imshow(denoised_images[-1][counter].squeeze().detach().cpu().numpy(), cmap='gray')  #(1)
        axes[i,j].get_xaxis().set_visible(False)
        axes[i,j].get_yaxis().set_visible(False)
        counter += 1

plt.show()
Figure 18. The images generated by the diffusion model trained on the MNIST Handwritten Digit dataset [3].

If you take a look at the above codeblock, you can see that the index [-1] at line #(1) indicates that we only display the images from the last iteration (which corresponds to timestep 0). This is the reason the images you see in Figure 18 are all free from noise. I do acknowledge that this might not be the best result, since not all of the generated images are valid digits. But hey, this instead indicates that these images are not merely duplicates of the original dataset.

Here we can also visualize the backward diffusion process using Codeblock 27 below. You can see in the resulting output in Figure 19 that we initially start from complete random noise, which progressively disappears as we move to the right.

# Codeblock 27
fig, axes = plt.subplots(ncols=10, figsize=(24, 8))

sample_no = 0
timestep_no = 0

for i in range(10):
    axes[i].imshow(denoised_images[timestep_no][sample_no].squeeze().detach().cpu().numpy(), cmap='gray')
    axes[i].get_xaxis().set_visible(False)
    axes[i].get_yaxis().set_visible(False)
    timestep_no += 1

plt.show()
Figure 19. What the image looks like at timesteps 900, 800, 700 and so on until timestep 0 [3].

Ending

There are many directions you can go from here. First, you might want to tweak the parameter configuration in Codeblock 2 if you want better results. Second, it is also possible to modify the U-Net model by adding attention layers on top of the stacks of convolution layers we used in the downsampling and upsampling stages. This doesn't guarantee better results, especially for a simple dataset like this, but it is definitely worth trying. Third, you can also try to use a more complex dataset if you want to challenge yourself.

In terms of practical applications, there are actually plenty of things you can do with diffusion models. The simplest one is probably data augmentation. With a diffusion model, we can easily generate new images from a particular data distribution. For instance, suppose we are working on an image classification project, but the number of images in the classes is imbalanced. To address this problem, we can take the images from the minority class and feed them into a diffusion model. By doing so, we can ask the trained diffusion model to generate as many samples from that class as we want.

And well, that's pretty much everything about the theory and implementation of the diffusion model. Thanks for reading, I hope you learned something new today!


References

[1] Jascha Sohl-Dickstein et al. Deep Unsupervised Learning using Nonequilibrium Thermodynamics. Arxiv. https://arxiv.org/pdf/1503.03585 [Accessed December 27, 2024].

[2] Jonathan Ho et al. Denoising Diffusion Probabilistic Models. Arxiv. https://arxiv.org/pdf/2006.11239 [Accessed December 27, 2024].

[3] Image created originally by the author.

[4] Olaf Ronneberger et al. U-Net: Convolutional Networks for Biomedical Image Segmentation. Arxiv. https://arxiv.org/pdf/1505.04597 [Accessed December 27, 2024].

[5] Yann LeCun et al. The MNIST Database of Handwritten Digits. https://yann.lecun.com/exdb/mnist/ [Accessed December 30, 2024] (Creative Commons Attribution-Share Alike 3.0 license).

[6] Ashish Vaswani et al. Attention Is All You Need. Arxiv. https://arxiv.org/pdf/1706.03762 [Accessed September 29, 2024].
