Video Upscaling (VRT)
1. Table of Contents
- Environment setup
- Model - VRT
- Code changes
  - Modify the existing main_test_vrt.py to handle our input data
  - Write main.py for FastAPI communication
- Inference results
- Result comparison
- Final results
2. Environment Setup
- AI model test environment
  - Ubuntu 22.04 (workstation)
  - Anaconda
  - VS Code
  - Python 3.8.19
  - PyTorch 1.10
  - CUDA 10.2
- Setup
git clone https://github.com/JingyunLiang/VRT
cd VRT
pip install -r requirements.txt
pip install matplotlib==3.7.5
pip uninstall torch torchaudio torchvision
pip install torch==1.10.1+cu102 torchvision==0.11.2+cu102 torchaudio==0.10.1 -f https://download.pytorch.org/whl/cu102/torch_stable.html
- FastAPI server setup
  - FastAPI
  - OpenCV 4.9
  - FFmpeg
- Setup
pip install fastapi uvicorn aiofiles
conda install -c conda-forge ffmpeg
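To confirm the environment is wired up correctly before running the model, a quick check like the following can be run (this snippet is my own addition, not part of the VRT repo; the expected values match the versions listed above):

# Sanity check for the environment above: verify PyTorch, CUDA and OpenCV versions.
import torch
import cv2

print('torch:', torch.__version__)             # expected 1.10.1+cu102
print('cuda available:', torch.cuda.is_available())
print('cuda version:', torch.version.cuda)     # expected 10.2
print('opencv:', cv2.__version__)              # expected 4.9.x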
3. VRT
https://github.com/JingyunLiang/VRT
1. Model (VRT)
Video restoration (e.g., video super-resolution) aims to restore high-quality frames from low-quality frames. Different from single image restoration, video restoration generally requires to utilize temporal information from multiple adjacent but usually misaligned video frames. Existing deep methods generally tackle with this by exploiting a sliding window strategy or a recurrent architecture, which either is restricted by frame-by-frame restoration or lacks long-range modelling ability. In this paper, we propose a Video Restoration Transformer (VRT) with parallel frame prediction and long-range temporal dependency modelling abilities. More specifically, VRT is composed of multiple scales, each of which consists of two kinds of modules: temporal mutual self attention (TMSA) and parallel warping. TMSA divides the video into small clips, on which mutual attention is applied for joint motion estimation, feature alignment and feature fusion, while self-attention is used for feature extraction. To enable cross-clip interactions, the video sequence is shifted for every other layer. Besides, parallel warping is used to further fuse information from neighboring frames by parallel feature warping. Experimental results on three tasks, including video super-resolution, video deblurring and video denoising, demonstrate that VRT outperforms the state-of-the-art methods by large margins (up to 2.16 dB) on nine benchmark datasets.
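To make the clip-attention-plus-shift idea above concrete, here is a rough toy sketch. This is not the actual VRT implementation; the tensor shapes, clip size, and single-head attention are all assumptions chosen only to illustrate how attention within small frame clips, combined with a shift on alternate layers, lets information cross clip boundaries.

# Toy illustration of the clip-based attention + shift idea described above.
# NOT the official VRT code; shapes and clip size are assumed for clarity.
import torch

def clip_attention(x, clip_size=2):
    # x: (D, N, C) = (frames, tokens per frame, channels); D must be divisible by clip_size
    d, n, c = x.shape
    clips = x.reshape(d // clip_size, clip_size * n, c)              # group frames into clips
    attn = torch.softmax(clips @ clips.transpose(1, 2) / c ** 0.5, dim=-1)
    out = attn @ clips                                               # joint attention inside each clip
    return out.reshape(d, n, c)

def shifted_clip_stack(x, num_layers=4, clip_size=2):
    # Shift the frame axis on every other layer so neighbouring clips can exchange information.
    for layer in range(num_layers):
        shift = clip_size // 2 if layer % 2 else 0
        x = torch.roll(x, shifts=shift, dims=0)   # shift frames on alternate layers
        x = clip_attention(x, clip_size)
        x = torch.roll(x, shifts=-shift, dims=0)  # undo the shift
    return x

frames = torch.randn(8, 16, 32)              # 8 frames, 16 tokens per frame, 32 channels (toy sizes)
print(shifted_clip_stack(frames).shape)      # torch.Size([8, 16, 32])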
4. Code Changes
main_test_vrt.py
import argparse
import cv2
import glob
import os
import torch
import requests
import numpy as np
from os import path as osp
from collections import OrderedDict
from torch.utils.data import DataLoader, Dataset
from datetime import datetime
from models.network_vrt import VRT as net
from utils import utils_image as util
from data.dataset_video_test import VideoRecurrentTestDataset, VideoTestVimeo90KDataset, \
SingleVideoRecurrentTestDataset, VFI_DAVIS, VFI_UCF101, VFI_Vid4
def resize_frames(frames, size):
    resized_frames = []
    for frame in frames:
        frame = cv2.resize(frame, size)
        resized_frames.append(frame)
    return resized_frames
class FrameImageDataset(Dataset):
    def __init__(self, folder_path, size=(128, 128)):
        self.folder_path = folder_path
        self.size = size
        self.frames = self._load_frames(folder_path)
        print(f'Loaded {len(self.frames)} frames from {folder_path}')

    def _load_frames(self, folder_path):
        frame_files = sorted([f for f in os.listdir(folder_path) if f.endswith(('.png', '.jpg', '.jpeg'))])
        frames = []
        for frame_file in frame_files:
            frame_path = os.path.join(folder_path, frame_file)
            frame = cv2.imread(frame_path)
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(frame)
        frames = resize_frames(frames, self.size)
        return frames

    def __len__(self):
        return len(self.frames)

    def __getitem__(self, idx):
        frame = self.frames[idx]
        frame = np.transpose(frame, (2, 0, 1))  # HWC to CHW
        frame = frame / 255.0  # Normalize to [0, 1]
        return torch.from_numpy(frame).float()
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--task', type=str, default='001_VRT_videosr_bi_REDS_6frames', help='tasks: 001 to 008')
    parser.add_argument('--sigma', type=int, default=0, help='noise level for denoising: 10, 20, 30, 40, 50')
    parser.add_argument('--folder_lq', type=str, default='input/lq_videos',
                        help='input low-quality test video folder')
    parser.add_argument('--file_lq', type=str, default=None,
                        help='input low-quality test video file')
    parser.add_argument('--folder_gt', type=str, default=None,
                        help='input ground-truth test video folder')
    parser.add_argument('--tile', type=int, nargs='+', default=[40, 128, 128],
                        help='Tile size, [0,0,0] for no tile during testing (testing as a whole)')
    parser.add_argument('--tile_overlap', type=int, nargs='+', default=[2, 20, 20],
                        help='Overlapping of different tiles')
    parser.add_argument('--num_workers', type=int, default=16, help='number of workers in data loading')
    parser.add_argument('--save_result', action='store_true', help='save resulting image')
    parser.add_argument('--frame_size', type=int, nargs=2, default=[128, 128],
                        help='Frame size for resizing')
    args = parser.parse_args()

    # define model
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = prepare_model_dataset(args)
    model.eval()
    model = model.to(device)

    print(f'Low-quality video folder: {args.folder_lq}')
    test_set = FrameImageDataset(args.folder_lq, size=tuple(args.frame_size))  # load input frames and resize them
    print(f'Test set length: {len(test_set)}')
    test_loader = DataLoader(dataset=test_set, num_workers=args.num_workers, batch_size=1, shuffle=False)

    save_dir = f'results/{args.task}'
    if args.save_result:
        os.makedirs(save_dir, exist_ok=True)
    test_results = OrderedDict()
    test_results['psnr'] = []
    test_results['ssim'] = []
    test_results['psnr_y'] = []
    test_results['ssim_y'] = []

    print(f'Number of batches in test_loader: {len(test_loader)}')
    assert len(test_loader) != 0, f'No dataset found at {args.folder_lq}'

    for idx, batch in enumerate(test_loader):
        lq = batch.to(device).unsqueeze(0)  # Add batch dimension
        folder = 'results'
        gt = None

        # inference
        with torch.no_grad():
            output = test_video(lq, model, args)

        test_results_folder = OrderedDict()
        test_results_folder['psnr'] = []
        test_results_folder['ssim'] = []
        test_results_folder['psnr_y'] = []
        test_results_folder['ssim_y'] = []

        for i in range(output.shape[1]):
            # save image
            img = output[:, i, ...].data.squeeze().float().cpu().clamp_(0, 1).numpy()
            if img.ndim == 3:
                img = np.transpose(img[[2, 1, 0], :, :], (1, 2, 0))  # CHW-RGB to HWC-BGR
            img = (img * 255.0).round().astype(np.uint8)  # float32 to uint8
            if args.save_result:
                seq_ = f'frame_{idx:04d}'  # add the batch index to the file name
                os.makedirs(f'{save_dir}/{folder}', exist_ok=True)
                cv2.imwrite(f'{save_dir}/{folder}/{seq_}.png', img)

        print('Testing {:20s} ({:2d}/{})'.format(folder, idx, len(test_loader)))

    print('\n{} \n-- Testing completed'.format(save_dir))
def prepare_model_dataset(args):
    ''' prepare model and dataset according to args.task. '''
    # define model
    if args.task == '001_VRT_videosr_bi_REDS_6frames':
        model = net(upscale=4, img_size=[6, 128, 128], window_size=[6, 8, 8], depths=[8, 8, 8, 8, 8, 8, 8, 4, 4, 4, 4, 4, 4],
                    indep_reconsts=[11, 12], embed_dims=[120, 120, 120, 120, 120, 120, 120, 180, 180, 180, 180, 180, 180],
                    num_heads=[6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6], pa_frames=2, deformable_groups=12)
        datasets = ['REDS4']
        args.scale = 4
        args.window_size = [6, 8, 8]
        args.nonblind_denoising = False
    elif args.task == '002_VRT_videosr_bi_REDS_16frames':
        model = net(upscale=4, img_size=[16, 128, 128], window_size=[8, 8, 8], depths=[8, 8, 8, 8, 8, 8, 8, 4, 4, 4, 4, 4, 4],
                    indep_reconsts=[11, 12], embed_dims=[120, 120, 120, 120, 120, 120, 120, 180, 180, 180, 180, 180, 180],
                    num_heads=[6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6], pa_frames=6, deformable_groups=24)
        datasets = ['REDS4']
        args.scale = 4
        args.window_size = [8, 8, 8]
        args.nonblind_denoising = False
    elif args.task in ['003_VRT_videosr_bi_Vimeo_7frames', '004_VRT_videosr_bd_Vimeo_7frames']:
        model = net(upscale=4, img_size=[8, 128, 128], window_size=[8, 8, 8], depths=[8, 8, 8, 8, 8, 8, 8, 4, 4, 4, 4, 4, 4],
                    indep_reconsts=[11, 12], embed_dims=[120, 120, 120, 120, 120, 120, 120, 180, 180, 180, 180, 180, 180],
                    num_heads=[6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6], pa_frames=4, deformable_groups=16)
        datasets = ['Vid4']
        args.scale = 4
        args.window_size = [8, 8, 8]
        args.nonblind_denoising = False
    elif args.task in ['005_VRT_videodeblurring_DVD']:
        model = net(upscale=1, img_size=[6, 192, 192], window_size=[6, 8, 8], depths=[8, 8, 8, 8, 8, 8, 8, 4, 4, 4, 4],
                    indep_reconsts=[9, 10], embed_dims=[96, 96, 96, 96, 96, 96, 96, 120, 120, 120, 120],
                    num_heads=[6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6], pa_frames=2, deformable_groups=16)
        datasets = ['DVD10']
        args.scale = 1
        args.window_size = [6, 8, 8]
        args.nonblind_denoising = False
    elif args.task in ['006_VRT_videodeblurring_GoPro']:
        model = net(upscale=1, img_size=[6, 192, 192], window_size=[6, 8, 8], depths=[8, 8, 8, 8, 8, 8, 8, 4, 4, 4, 4],
                    indep_reconsts=[9, 10], embed_dims=[96, 96, 96, 96, 96, 96, 96, 120, 120, 120, 120],
                    num_heads=[6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6], pa_frames=2, deformable_groups=16)
        datasets = ['GoPro11-part1', 'GoPro11-part2']
        args.scale = 1
        args.window_size = [6, 8, 8]
        args.nonblind_denoising = False
    elif args.task in ['007_VRT_videodeblurring_REDS']:
        model = net(upscale=1, img_size=[6, 192, 192], window_size=[6, 8, 8], depths=[8, 8, 8, 8, 8, 8, 8, 4, 4, 4, 4],
                    indep_reconsts=[9, 10], embed_dims=[96, 96, 96, 96, 96, 96, 96, 120, 120, 120, 120],
                    num_heads=[6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6], pa_frames=2, deformable_groups=16)
        datasets = ['REDS4']
        args.scale = 1
        args.window_size = [6, 8, 8]
        args.nonblind_denoising = False
    elif args.task == '008_VRT_videodenoising_DAVIS':
        model = net(upscale=1, img_size=[6, 192, 192], window_size=[6, 8, 8], depths=[8, 8, 8, 8, 8, 8, 8, 4, 4, 4, 4],
                    indep_reconsts=[9, 10], embed_dims=[96, 96, 96, 96, 96, 96, 96, 120, 120, 120, 120],
                    num_heads=[6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6], pa_frames=2, deformable_groups=16,
                    nonblind_denoising=True)
        datasets = ['Set8', 'DAVIS-test']
        args.scale = 1
        args.window_size = [6, 8, 8]
        args.nonblind_denoising = True
    elif args.task == '009_VRT_videofi_Vimeo_4frames':
        model = net(upscale=1, out_chans=3, img_size=[4, 192, 192], window_size=[4, 8, 8], depths=[8, 8, 8, 8, 8, 8, 8, 4, 4, 4, 4],
                    indep_reconsts=[], embed_dims=[96, 96, 96, 96, 96, 96, 96, 120, 120, 120, 120],
                    num_heads=[6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6], pa_frames=0)
        datasets = ['UCF101', 'DAVIS-train']
        args.scale = 1
        args.window_size = [4, 8, 8]
        args.nonblind_denoising = False

    # download model
    model_path = f'model_zoo/vrt/{args.task}.pth'
    if os.path.exists(model_path):
        print(f'loading model from {model_path}')
    else:
        os.makedirs(os.path.dirname(model_path), exist_ok=True)
        url = 'https://github.com/JingyunLiang/VRT/releases/download/v0.0/{}'.format(os.path.basename(model_path))
        r = requests.get(url, allow_redirects=True)
        print(f'downloading model {model_path}')
        open(model_path, 'wb').write(r.content)

    pretrained_model = torch.load(model_path)
    model.load_state_dict(pretrained_model['params'] if 'params' in pretrained_model.keys() else pretrained_model, strict=True)

    # download datasets
    if os.path.exists(f'{args.folder_lq}'):
        print(f'using dataset from {args.folder_lq}')
    else:
        if 'vimeo' in args.folder_lq.lower():
            print(f'Vimeo dataset is not at {args.folder_lq}! Please refer to #training of Readme.md to download it.')
        else:
            os.makedirs('testsets', exist_ok=True)
            for dataset in datasets:
                url = f'https://github.com/JingyunLiang/VRT/releases/download/v0.0/testset_{dataset}.tar.gz'
                r = requests.get(url, allow_redirects=True)
                print(f'downloading testing dataset {dataset}')
                open(f'testsets/{dataset}.tar.gz', 'wb').write(r.content)
                os.system(f'tar -xvf testsets/{dataset}.tar.gz -C testsets')
                os.system(f'rm testsets/{dataset}.tar.gz')

    return model
def test_video(lq, model, args):
    '''test the video as a whole or as clips (divided temporally). '''
    num_frame_testing = args.tile[0]
    if num_frame_testing:
        # test as multiple clips if out-of-memory
        sf = args.scale
        num_frame_overlapping = args.tile_overlap[0]
        not_overlap_border = False
        b, d, c, h, w = lq.size()
        c = c - 1 if args.nonblind_denoising else c
        stride = num_frame_testing - num_frame_overlapping
        d_idx_list = list(range(0, d-num_frame_testing, stride)) + [max(0, d-num_frame_testing)]
        E = torch.zeros(b, d, c, h*sf, w*sf)
        W = torch.zeros(b, d, 1, 1, 1)

        for d_idx in d_idx_list:
            lq_clip = lq[:, d_idx:d_idx+num_frame_testing, ...]
            out_clip = test_clip(lq_clip, model, args)
            out_clip_mask = torch.ones((b, min(num_frame_testing, d), 1, 1, 1))

            if not_overlap_border:
                if d_idx < d_idx_list[-1]:
                    out_clip[:, -num_frame_overlapping//2:, ...] *= 0
                    out_clip_mask[:, -num_frame_overlapping//2:, ...] *= 0
                if d_idx > d_idx_list[0]:
                    out_clip[:, :num_frame_overlapping//2, ...] *= 0
                    out_clip_mask[:, :num_frame_overlapping//2, ...] *= 0

            E[:, d_idx:d_idx+num_frame_testing, ...].add_(out_clip)
            W[:, d_idx:d_idx+num_frame_testing, ...].add_(out_clip_mask)
        output = E.div_(W)
    else:
        # test as one clip (the whole video) if you have enough memory
        window_size = args.window_size
        d_old = lq.size(1)
        d_pad = (window_size[0] - d_old % window_size[0]) % window_size[0]
        lq = torch.cat([lq, torch.flip(lq[:, -d_pad:, ...], [1])], 1) if d_pad else lq
        output = test_clip(lq, model, args)
        output = output[:, :d_old, :, :, :]

    return output
def test_clip(lq, model, args):
    ''' test the clip as a whole or as patches. '''
    sf = args.scale
    window_size = args.window_size
    size_patch_testing = args.tile[1]
    assert size_patch_testing % window_size[-1] == 0, 'testing patch size should be a multiple of window_size.'

    if size_patch_testing:
        # divide the clip to patches (spatially only, tested patch by patch)
        overlap_size = args.tile_overlap[1]
        not_overlap_border = True

        # test patch by patch
        b, d, c, h, w = lq.size()
        c = c - 1 if args.nonblind_denoising else c
        stride = size_patch_testing - overlap_size
        h_idx_list = list(range(0, h-size_patch_testing, stride)) + [max(0, h-size_patch_testing)]
        w_idx_list = list(range(0, w-size_patch_testing, stride)) + [max(0, w-size_patch_testing)]
        E = torch.zeros(b, d, c, h*sf, w*sf)
        W = torch.zeros_like(E)

        for h_idx in h_idx_list:
            for w_idx in w_idx_list:
                in_patch = lq[..., h_idx:h_idx+size_patch_testing, w_idx:w_idx+size_patch_testing]
                out_patch = model(in_patch).detach().cpu()
                out_patch_mask = torch.ones_like(out_patch)

                if not_overlap_border:
                    if h_idx < h_idx_list[-1]:
                        out_patch[..., -overlap_size//2:, :] *= 0
                        out_patch_mask[..., -overlap_size//2:, :] *= 0
                    if w_idx < w_idx_list[-1]:
                        out_patch[..., :, -overlap_size//2:] *= 0
                        out_patch_mask[..., :, -overlap_size//2:] *= 0
                    if h_idx > h_idx_list[0]:
                        out_patch[..., :overlap_size//2, :] *= 0
                        out_patch_mask[..., :overlap_size//2, :] *= 0
                    if w_idx > w_idx_list[0]:
                        out_patch[..., :, :overlap_size//2] *= 0
                        out_patch_mask[..., :, :overlap_size//2] *= 0

                E[..., h_idx*sf:(h_idx+size_patch_testing)*sf, w_idx*sf:(w_idx+size_patch_testing)*sf].add_(out_patch)
                W[..., h_idx*sf:(h_idx+size_patch_testing)*sf, w_idx*sf:(w_idx+size_patch_testing)*sf].add_(out_patch_mask)
        output = E.div_(W)
    else:
        _, _, _, h_old, w_old = lq.size()
        h_pad = (window_size[1] - h_old % window_size[1]) % window_size[1]
        w_pad = (window_size[2] - w_old % window_size[2]) % window_size[2]

        lq = torch.cat([lq, torch.flip(lq[:, :, :, -h_pad:, :], [3])], 3) if h_pad else lq
        lq = torch.cat([lq, torch.flip(lq[:, :, :, :, -w_pad:], [4])], 4) if w_pad else lq

        output = model(lq).detach().cpu()
        output = output[:, :, :, :h_old*sf, :w_old*sf]

    return output


if __name__ == '__main__':
    main()
Prompt
# 001, test video SR (trained on REDS, 6 frames) on the REDS4 dataset
python main_test_vrt.py --task 001_VRT_videosr_bi_REDS_6frames --folder_lq testsets/REDS4/sharp_bicubic --folder_gt testsets/REDS4/GT --tile 40 128 128 --tile_overlap 2 20 20

# 002, test video SR (trained on REDS, 16 frames) on the REDS4 dataset
python main_test_vrt.py --task 002_VRT_videosr_bi_REDS_16frames --folder_lq testsets/REDS4/sharp_bicubic --folder_gt testsets/REDS4/GT --tile 40 128 128 --tile_overlap 2 20 20

# 003, test video SR (trained on Vimeo) on the Vid4 and Vimeo datasets
python main_test_vrt.py --task 003_VRT_videosr_bi_Vimeo_7frames --folder_lq testsets/Vid4/BIx4 --folder_gt testsets/Vid4/GT --tile 32 128 128 --tile_overlap 2 20 20
python main_test_vrt.py --task 003_VRT_videosr_bi_Vimeo_7frames --folder_lq testsets/vimeo90k/vimeo_septuplet_matlabLRx4/sequences --folder_gt testsets/vimeo90k/vimeo_septuplet/sequences --tile 8 0 0 --tile_overlap 0 20 20

# 004, test video SR (trained on Vimeo with blur-downsampling) on the Vid4, UDM10 and Vimeo datasets
python main_test_vrt.py --task 004_VRT_videosr_bd_Vimeo_7frames --folder_lq testsets/Vid4/BDx4 --folder_gt testsets/Vid4/GT --tile 32 128 128 --tile_overlap 2 20 20
python main_test_vrt.py --task 004_VRT_videosr_bd_Vimeo_7frames --folder_lq testsets/UDM10/BDx4 --folder_gt testsets/UDM10/GT --tile 32 128 128 --tile_overlap 2 20 20
python main_test_vrt.py --task 004_VRT_videosr_bd_Vimeo_7frames --folder_lq testsets/vimeo90k/vimeo_septuplet_BDLRx4/sequences --folder_gt testsets/vimeo90k/vimeo_septuplet/sequences --tile 8 0 0 --tile_overlap 0 20 20

# 005, video deblurring trained and tested on the DVD dataset
python main_test_vrt.py --task 005_VRT_videodeblurring_DVD --folder_lq testsets/DVD10/test_GT_blurred --folder_gt testsets/DVD10/test_GT --tile 12 256 256 --tile_overlap 2 20 20

# 006, video deblurring trained and tested on the GoPro dataset
python main_test_vrt.py --task 006_VRT_videodeblurring_GoPro --folder_lq testsets/GoPro11/test_GT_blurred --folder_gt testsets/GoPro11/test_GT --tile 18 192 192 --tile_overlap 2 20 20

# 007, test video deblurring (trained on REDS) on the REDS4 dataset
python main_test_vrt.py --task 007_VRT_videodeblurring_REDS --folder_lq testsets/REDS4/blur --folder_gt testsets/REDS4/GT --tile 12 256 256 --tile_overlap 2 20 20

# 008, test video denoising (trained on DAVIS, noise level 0-50) on the Set8 and DAVIS datasets
python main_test_vrt.py --task 008_VRT_videodenoising_DAVIS --sigma 10 --folder_lq testsets/Set8 --folder_gt testsets/Set8 --tile 12 256 256 --tile_overlap 2 20 20
python main_test_vrt.py --task 008_VRT_videodenoising_DAVIS --sigma 10 --folder_lq testsets/DAVIS-test --folder_gt testsets/DAVIS-test --tile 12 256 256 --tile_overlap 2 20 20

# 009, test video frame interpolation (trained on Vimeo, single-frame interpolation) on the Vimeo, UCF101 and DAVIS-train datasets
python main_test_vrt.py --task 009_VRT_videofi_Vimeo_4frames --folder_lq testsets/vimeo90k/vimeo_septuplet/sequences --folder_gt testsets/vimeo90k/vimeo_septuplet/sequences --tile 0 0 0 --tile_overlap 0 0 0
python main_test_vrt.py --task 009_VRT_videofi_Vimeo_4frames --folder_lq testsets/UCF101 --folder_gt testsets/UCF101 --tile 0 0 0 --tile_overlap 0 0 0
python main_test_vrt.py --task 009_VRT_videofi_Vimeo_4frames --folder_lq testsets/DAVIS-train --folder_gt testsets/DAVIS-train --tile 0 256 256 --tile_overlap 0 20 20

# 010, test space-time video SR on the Vid4 and Vimeo datasets using the pretrained models from 003 and 009
# see 003 and 009

# test on your own dataset (example)
python main_test_vrt.py --task 001_VRT_videosr_bi_REDS_6frames --folder_lq testsets/your/own --tile 40 128 128 --tile_overlap 2 20 20
Your own video can be upscaled with the 001_VRT_videosr_bi_REDS_6frames task, but the tile parameter has to be set to 0 64 64 for upscaling.
As with IART, the output becomes distorted when the input video is large.
Also, the script will not run unless the first tile parameter is set to 0.
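Since large inputs distort the output, one possible workaround (my own addition, not part of the repo) is to downscale the source video with FFmpeg before extracting frames; the target resolution and file paths below are assumptions:

# Optional pre-processing: shrink the source video before frame extraction so that
# VRT receives a small input. The 320x240 target is an assumption; adjust as needed.
import subprocess

def downscale_video(src: str, dst: str, width: int = 320, height: int = 240) -> None:
    subprocess.run([
        'ffmpeg', '-i', src,
        '-vf', f'scale={width}:{height}',
        '-c:v', 'libx264', '-pix_fmt', 'yuv420p',
        dst
    ], check=True)

# example (hypothetical paths):
# downscale_video('testsets/input/own.mp4', 'testsets/input/own_small.mp4')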
main.py
# uvicorn main:app --reload --host 192.168.0.201 --port 5000
# python main_test_vrt.py --task 001_VRT_videosr_bi_REDS_6frames --folder_lq testsets/input/own --tile 0 64 64 --tile_overlap 2 20 20 --save_result
import os
import glob
import shutil
import subprocess
from fastapi import FastAPI, UploadFile, File, BackgroundTasks
from fastapi.responses import JSONResponse, FileResponse
import aiofiles
app = FastAPI()
# save the uploaded video file
async def save_upload_file(upload_file: UploadFile, destination: str) -> None:
    async with aiofiles.open(destination, 'wb') as out_file:
        while content := await upload_file.read(1024):
            await out_file.write(content)
    await upload_file.close()

# extract frames
def extract_frames(input_video_path: str, output_frame_path: str) -> None:
    subprocess.run([
        'ffmpeg', '-i', input_video_path, '-q:v', '2', output_frame_path
    ], check=True)

# run the upscaling script
def run_demo_script(input_dir: str, output_dir: str) -> None:
    try:
        subprocess.run([
            'python', 'main_test_vrt.py',
            '--task', '001_VRT_videosr_bi_REDS_6frames',
            '--folder_lq', input_dir,
            '--tile', '0', '64', '64',
            '--tile_overlap', '2', '20', '20',
            '--save_result'
        ], check=True)
    except subprocess.CalledProcessError as e:
        print(f"Failed to run script: {e}")

# combine frames into a video
def combine_frames_to_video(input_frame_path: str, output_video_path: str, frame_rate: int = 30) -> None:
    frame_pattern = input_frame_path + "frame_%04d.png"
    print(f"Looking for frames at: {frame_pattern}")
    if not glob.glob(input_frame_path + "frame_0000.png"):
        print(f"No files found at {input_frame_path + 'frame_0000.png'}")
        return
    try:
        subprocess.run([
            'ffmpeg', '-framerate', str(frame_rate), '-i', frame_pattern,
            '-c:v', 'libx264', '-profile:v', 'high', '-pix_fmt', 'yuv420p', output_video_path
        ], check=True)
        print(f"Video created successfully at {output_video_path}")
    except subprocess.CalledProcessError as e:
        print(f"Failed to create video: {e}")
def remove_files_after_response(input_dir: str, output_dir: str):
    delete_directory(input_dir)
    delete_directory(output_dir)

# delete a directory tree
def delete_directory(path):
    try:
        shutil.rmtree(path)
        print(f"Successfully deleted {path}")
    except Exception as e:
        print(f"Failed to delete {path}: {e}")

@app.get("/")
async def read_root():
    return {"Hello": "World"}
# Upscale
@app.post("/video")
async def upload_video(background_tasks: BackgroundTasks, video: UploadFile = File(...)):
    base_dir = "demo"
    input_dir = os.path.join("testsets", "input")
    sequence_dir = os.path.join(input_dir, "own")
    output_dir = os.path.join("results", "001_VRT_videosr_bi_REDS_6frames", "results")
    video_name = os.path.splitext(video.filename)[0]
    final_video_path = os.path.join(output_dir, f"{video_name}_upscaled.mp4")

    # create the required directories
    os.makedirs(input_dir, exist_ok=True)
    os.makedirs(sequence_dir, exist_ok=True)

    # save the uploaded video
    await save_upload_file(video, os.path.join(input_dir, video.filename))

    # extract frames
    extract_frames(os.path.join(input_dir, video.filename), os.path.join(sequence_dir, video_name + "_frame_%04d.png"))

    # run the upscaling script
    run_demo_script(sequence_dir, output_dir)

    # combine frames into a video
    combine_frames_to_video(output_dir + "/", final_video_path)

    # return the finished video file
    if os.path.exists(final_video_path):
        # schedule cleanup as background tasks
        background_tasks.add_task(delete_directory, input_dir)
        background_tasks.add_task(delete_directory, output_dir)
        return FileResponse(path=final_video_path, filename=f"{video_name}_upscaled.mp4")
    else:
        return JSONResponse(content={"error": "Failed to create video file"}, status_code=500)
When a video file is sent to the http://192.168.0.201:5000/video endpoint,
the pipeline runs in this order: save the file → extract frames → upscale frame by frame → recombine the frames into a video → return the result.
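For reference, a minimal client call to this endpoint could look like the following sketch (my own example; the file names are placeholders, and only the form field name `video` is taken from the endpoint definition above):

# Hypothetical client-side usage of the /video endpoint described above.
import requests

with open('sample.mp4', 'rb') as f:  # placeholder input file name
    resp = requests.post('http://192.168.0.201:5000/video',
                         files={'video': ('sample.mp4', f, 'video/mp4')})

if resp.ok:
    with open('sample_upscaled.mp4', 'wb') as out:  # save the returned upscaled video
        out.write(resp.content)
else:
    print('upscaling failed:', resp.status_code)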
5. Inference Results
Input video frame (VRT)
Output video frame (VRT)
6. Final Results
This model also requires the input video size to be adjusted.
The output for the first image matches IART in quality; the apparent gap from the IART model seems to come from not having been able to test with the same input video.
When tested on the same input video, however, the inference results come out similar.
Note that the first frame was tested at 180 x 144, while the second was inferred at 598 x 484. As with the IART model, it was hard to get a clean result once the input video became large.
Therefore, this model also needs the input video to be resized down before inference.
The other checkpoints listed in the Prompt section should also be tested, but they could not be analyzed due to technical limitations.