728x90
728x90
개념 이해하기

Latent Diffusion은 기존 Diffusion Model의 연산 부담을 줄이기 위해 나온 모델이다.
기존 diffusion model은 고해상도 이미지 공간에서 직접 노이즈를 제거해야하다보니 계산량이 너무 많았다. latent diffusion은 이에 주목하여 이미지를 먼저 autoencoder를 통해 압축된 latent space로 옮겨서 latent 공간에서 diffusion을 수행한 뒤 마지막에 다시 이미지를 복원한다.
u net이 노이즈 제거시에 latent값과 추가조건(ex: text embedding)을 함게 고려하면서 노이즈를 제거할 수 있기 때문에 conditioning이 가능하다
코드 뜯어보기
사전 학습된 Stable Diffusion 모델이 돌아가도록 하는 코드를 통해 모델을 좀 더 확실하게 이해해보자
아래의 코드는 이미 학습된 모듈들을 들고와서 실행시킬 수 있게 짠 코드이다.
import torch
import torch.nn as nn
from diffusers import StableDiffusionPipeline, DDIMScheduler
class StableDiffusion(nn.Module):
def __init__(self, device, fp16, vram_O, sd_version='2.1', hf_key=None, t_range=[0.02, 0.98]):
#############################################################
# 상속받은 nn.Module 초기화: PyTorch에서 커스텀 모델 만들 때 항상 호출해야 함
#############################################################
super().__init__()
self.device = device # 모델이 올라갈 디바이스 (GPU 등)
self.sd_version = sd_version # 사용할 Stable Diffusion 버전 (기본은 2.1)
print(f'[INFO] loading stable diffusion...')
# huggingface에서 불러올 모델 키 (Stable Diffusion 2.1 base 버전)
model_key = "stabilityai/stable-diffusion-2-1-base"
# 정밀도 설정 (fp16 = float16, 아니면 float32)
self.precision_t = torch.float16 if fp16 else torch.float32
# huggingface diffusers 라이브러리를 사용해 모델 전체를 불러옴
pipe = StableDiffusionPipeline.from_pretrained(model_key, torch_dtype=self.precision_t)
pipe.to(device)
# Stable Diffusion 구성요소 개별 추출
self.vae = pipe.vae # 이미지를 latent로 바꾸거나 복원할 때 사용
self.tokenizer = pipe.tokenizer # 텍스트를 토큰으로 변환
self.text_encoder = pipe.text_encoder # 텍스트를 latent vector로 인코딩
self.unet = pipe.unet # 노이즈 제거의 핵심 모델 (diffusion의 중심)
# Diffusion 과정을 담당하는 scheduler 설정 (DDIM 사용)
# DDIM은 DDPM보다 빠르게 샘플링 가능하고 deterministic하게 생성 가능함
self.scheduler = DDIMScheduler.from_pretrained(model_key, subfolder="scheduler", torch_dtype=self.precision_t)
# 파이프라인 전체는 더 이상 필요 없으므로 메모리 절약을 위해 제거
del pipe
# scheduler의 total timestep 개수 (기본적으로 1000)
self.num_train_timesteps = self.scheduler.config.num_train_timesteps
#############################################################
# t_range 비율을 이용해 실제 timestep 범위(min_step ~ max_step)를 계산
# 예: t_range = [0.02, 0.98]일 때 → 1000 * 0.02 = 20, 1000 * 0.98 = 980
#############################################################
self.min_step = int(self.num_train_timesteps * t_range[0])
self.max_step = int(self.num_train_timesteps * t_range[1])
# alpha 값은 각 timestep에서의 noise scale 정보 누적곱 (sampling에 활용됨)
self.alphas = self.scheduler.alphas_cumprod.to(self.device)
print(f'[INFO] loaded stable diffusion!')
# 이미지를 VAE의 latent로 인코딩
def encode_imgs(self, imgs):
# imgs: [B, 3, H, W]
imgs = 2 * imgs - 1
posterior = self.vae.encode(imgs).latent_dist
latents = posterior.sample() * self.vae.config.scaling_factor
return latents
def get_latents(self, pred_rgb, as_latent=True): #pred_rgb: 모델이 예측한 RGB 결과
if as_latent:
# pred_rgb 자체를 latent로 간주하고, 해상도를 (64, 64)로 줄인 후 (-1, 1) 범위로 정규화
latents = F.interpolate(pred_rgb, size=(64, 64), mode='bilinear', align_corners=False)
latents = latents * 2 - 1 # (0, 1) → (-1, 1)
else:
# VAE encoder는 (512, 512) 해상도만 받으므로, 먼저 보정
pred_rgb_512 = F.interpolate(pred_rgb, size=(512, 512), mode='bilinear', align_corners=False)
latents = self.encode_imgs(pred_rgb_512) # VAE encode (이미지 → latent)
def sample(self, prompts, num_steps=50, guidance_scale=7.5):
"""
prompts: List[str], 예: ["a cat", "a dog"]
"""
batch_size = len(prompts)
# (1) 텍스트 인코딩 (conditional + unconditional)
text_embeddings = self.get_text_embeddings(prompts).to(dtype=self.precision_t)
uncond_embeddings = self.get_text_embeddings([""] * batch_size).to(dtype=self.precision_t)
text_embeddings = torch.cat([uncond_embeddings, text_embeddings], dim=0) # [2B, 77, D]
latents = torch.randn((batch_size, 4, 64, 64), device=self.device, dtype=self.precision_t)
# (3) 스케줄러 초기화
self.scheduler.set_timesteps(num_steps)
for i, t in enumerate(self.scheduler.timesteps):
# 현재 timestep에 맞춰 latent 준비
latent_model_input = self.scheduler.scale_model_input(latents, t)
latent_model_input = torch.cat([latent_model_input] * 2, dim=0)
# ✔ 수정된 부분
tt = torch.full((latent_model_input.shape[0],), t, device=self.device, dtype=torch.long)
# (5) 노이즈 예측
noise_pred = self.unet(latent_model_input, tt, encoder_hidden_states=text_embeddings).sample
# (6) uncond / cond 분리 + guidance 적용
noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)
# (7) latent 업데이트
latents = self.scheduler.step(noise_pred, t, latents).prev_sample
# (8) VAE 디코더로 latent → 이미지 복원
imgs = self.vae.decode(latents / self.vae.config.scaling_factor).sample
return imgs
(실행시키는 코드 포함) 전체 코드
import torch
import torch.nn as nn
from diffusers import StableDiffusionPipeline, DDIMScheduler
class StableDiffusion(nn.Module):
def __init__(self, device, fp16, vram_O, sd_version='2.1', hf_key=None, t_range=[0.02, 0.98]):
#############################################################
# 상속받은 nn.Module 초기화: PyTorch에서 커스텀 모델 만들 때 항상 호출해야 함
#############################################################
super().__init__()
self.device = device # 모델이 올라갈 디바이스 (GPU 등)
self.sd_version = sd_version # 사용할 Stable Diffusion 버전 (기본은 2.1)
print(f'[INFO] loading stable diffusion...')
# huggingface에서 불러올 모델 키 (Stable Diffusion 2.1 base 버전)
model_key = "stabilityai/stable-diffusion-2-1-base"
# 정밀도 설정 (fp16 = float16, 아니면 float32)
self.precision_t = torch.float16 if fp16 else torch.float32
# huggingface diffusers 라이브러리를 사용해 모델 전체를 불러옴
pipe = StableDiffusionPipeline.from_pretrained(model_key, torch_dtype=self.precision_t)
pipe.to(device)
# Stable Diffusion 구성요소 개별 추출
self.vae = pipe.vae # 이미지를 latent로 바꾸거나 복원할 때 사용
self.tokenizer = pipe.tokenizer # 텍스트를 토큰으로 변환
self.text_encoder = pipe.text_encoder # 텍스트를 latent vector로 인코딩
self.unet = pipe.unet # 노이즈 제거의 핵심 모델 (diffusion의 중심)
# Diffusion 과정을 담당하는 scheduler 설정 (DDIM 사용)
# DDIM은 DDPM보다 빠르게 샘플링 가능하고 deterministic하게 생성 가능함
self.scheduler = DDIMScheduler.from_pretrained(model_key, subfolder="scheduler", torch_dtype=self.precision_t)
# 파이프라인 전체는 더 이상 필요 없으므로 메모리 절약을 위해 제거
del pipe
# scheduler의 total timestep 개수 (기본적으로 1000)
self.num_train_timesteps = self.scheduler.config.num_train_timesteps
#############################################################
# t_range 비율을 이용해 실제 timestep 범위(min_step ~ max_step)를 계산
# 예: t_range = [0.02, 0.98]일 때 → 1000 * 0.02 = 20, 1000 * 0.98 = 980
#############################################################
self.min_step = int(self.num_train_timesteps * t_range[0])
self.max_step = int(self.num_train_timesteps * t_range[1])
# alpha 값은 각 timestep에서의 noise scale 정보 누적곱 (sampling에 활용됨)
self.alphas = self.scheduler.alphas_cumprod.to(self.device)
print(f'[INFO] loaded stable diffusion!')
# 이미지를 VAE의 latent로 인코딩
def encode_imgs(self, imgs):
# imgs: [B, 3, H, W]
imgs = 2 * imgs - 1
posterior = self.vae.encode(imgs).latent_dist
latents = posterior.sample() * self.vae.config.scaling_factor
return latents
def get_latents(self, pred_rgb, as_latent=True): #pred_rgb: 모델이 예측한 RGB 결과
if as_latent:
# pred_rgb 자체를 latent로 간주하고, 해상도를 (64, 64)로 줄인 후 (-1, 1) 범위로 정규화
latents = F.interpolate(pred_rgb, size=(64, 64), mode='bilinear', align_corners=False)
latents = latents * 2 - 1 # (0, 1) → (-1, 1)
else:
# VAE encoder는 (512, 512) 해상도만 받으므로, 먼저 보정
pred_rgb_512 = F.interpolate(pred_rgb, size=(512, 512), mode='bilinear', align_corners=False)
latents = self.encode_imgs(pred_rgb_512) # VAE encode (이미지 → latent)
def get_text_embeddings(self, prompts):
inputs = self.tokenizer(prompts, padding='max_length', max_length=77, return_tensors='pt').to(self.device)
return self.text_encoder(**inputs).last_hidden_state
@torch.no_grad()
def sample(self, prompts, num_steps=50, guidance_scale=7.5):
"""
prompts: List[str], 예: ["a cat", "a dog"]
"""
batch_size = len(prompts)
# (1) 텍스트 인코딩 (conditional + unconditional)
text_embeddings = self.get_text_embeddings(prompts).to(dtype=self.precision_t)
uncond_embeddings = self.get_text_embeddings([""] * batch_size).to(dtype=self.precision_t)
text_embeddings = torch.cat([uncond_embeddings, text_embeddings], dim=0) # [2B, 77, D]
latents = torch.randn((batch_size, 4, 64, 64), device=self.device, dtype=self.precision_t)
# (3) 스케줄러 초기화
self.scheduler.set_timesteps(num_steps)
for i, t in enumerate(self.scheduler.timesteps):
# 현재 timestep에 맞춰 latent 준비
latent_model_input = self.scheduler.scale_model_input(latents, t)
latent_model_input = torch.cat([latent_model_input] * 2, dim=0)
# ✔ 수정된 부분
tt = torch.full((latent_model_input.shape[0],), t, device=self.device, dtype=torch.long)
# (5) 노이즈 예측
noise_pred = self.unet(latent_model_input, tt, encoder_hidden_states=text_embeddings).sample
# (6) uncond / cond 분리 + guidance 적용
noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)
# (7) latent 업데이트
latents = self.scheduler.step(noise_pred, t, latents).prev_sample
# (8) VAE 디코더로 latent → 이미지 복원
imgs = self.vae.decode(latents / self.vae.config.scaling_factor).sample
return imgs
model = StableDiffusion(device='cuda', fp16=True, vram_O=True)
prompts = ["a futuristic city", "a cat wearing sunglasses", "a poor postgraduate student"]
images = model.sample(prompts)
from torchvision.transforms import ToPILImage
to_pil = ToPILImage()
for i, img_tensor in enumerate(images):
img_pil = to_pil(img_tensor.cpu().clamp(0, 1))
img_pil.save(f"generated_image_{i}.png") # 파일로 저장
728x90
728x90
'수업정리 > 딥러닝 이론' 카테고리의 다른 글
딥러닝 면접 기초 개념 (0) | 2025.03.19 |
---|---|
SVM(State-Vector Machine)이란? (0) | 2024.09.30 |
VQ-VAE 이해하기 (0) | 2024.08.16 |
Supervised Contrastive Learning 코드 분석 (0) | 2024.05.28 |
Triplet Loss 이해하기(개념, 수식, 주의사항) (0) | 2024.05.19 |