navis
Video Upscaling (IART) 본문
728x90
1. 목차
2. 환경 설정
- AI 모델 테스트 환경
- Ubuntu 22.04(워크스테이션)
- Anaconda
- VS Code
- Python 3.9.0
- PyTorch 1.13
- CUDA 11.7
- 환경설정
git clone <https://github.com/kai422/IART>
cd IART
pip install torch==1.13.1+cu117 torchvision==0.14.1+cu117 --extra-index-url <https://download.pytorch.org/whl/cu117>
pip install -r requirements.txt
-----------------------------------------------
데이터셋 :
<https://seungjunnah.github.io/Datasets/reds.html>
<https://gist.github.com/SeungjunNah/b10d369b92840cb8dd2118dd4f41d643>
데이터셋 프롬프트 :
python script_name.py --server google --all
-----------------------------------------------
가중치 :
<https://drive.google.com/drive/folders/1MIUK37Izc4IcA_a3eSH-21EXOZO5G5qU>
경로 :
./experiments./pretrained_models/
- Fast Server 설정
- Fast API
- OpenCV 4.9
- FFmpeg
- 환경설정
pip install fastapi uvicorn aiofiles
conda install -c conda-forge ffmpeg
3. IART
https://github.com/kai422/IART
1. 모델 (IART)
[CVPR 2024 Highlight] Enhancing Video Super-Resolution via Implicit Resampling-based Alignment
4. 코드 변경
demo.py
# uvicorn main:app --reload --host 192.168.0.201 --port 5000
# python demo.py --input demo/input --output demo/output
import os
import glob
import shutil
import subprocess
from fastapi import FastAPI, UploadFile, File, BackgroundTasks
from fastapi.responses import JSONResponse, FileResponse
import aiofiles
import torch
from torchvision.io import read_video, write_video
from archs.iart_arch import IART
from basicsr.utils import tensor2img
from torchvision import transforms
from PIL import Image
app = FastAPI()
DEMO_SCRIPT_PATH = '/home/viking3/AI/Upscale/IART/demo.py' # demo.py 파일의 경로를 변수로 설정
# 비디오 파일 저장
async def save_upload_file(upload_file: UploadFile, destination: str) -> None:
async with aiofiles.open(destination, 'wb') as out_file: # 비디오 파일을 비동기적으로 저장
while content := await upload_file.read(1024): # 파일을 1024 바이트씩 읽음
await out_file.write(content) # 읽은 내용을 파일에 씀
await upload_file.close() # 파일 닫기
# 프레임 추출
def extract_frames(input_video_path: str, output_frame_path: str) -> None:
subprocess.run([
'ffmpeg', '-i', input_video_path, '-q:v', '2', output_frame_path # ffmpeg를 사용하여 프레임 추출
], check=True)
# 업스케일 스크립트 실행
def run_demo_script(input_dir: str, output_dir: str) -> None:
try:
subprocess.run([
'python', DEMO_SCRIPT_PATH,
'--input', input_dir,
'--output', output_dir # demo.py 스크립트를 실행하여 업스케일 수행
], check=True)
except subprocess.CalledProcessError as e:
print(f"Failed to run script: {e}")
# 프레임 합치기
def combine_frames_to_video(input_frame_path: str, output_video_path: str, frame_rate: int = 30) -> None:
frame_pattern = input_frame_path + "%08d.png" # 프레임 파일 패턴 설정
print(f"Looking for frames at: {frame_pattern}")
if not glob.glob(input_frame_path + "00000000.png"): # 첫 번째 프레임 파일이 존재하는지 확인
print(f"No files found at {input_frame_path + '00000000.png'}")
return
try:
subprocess.run([
'ffmpeg', '-framerate', str(frame_rate), '-i', frame_pattern,
'-c:v', 'libx264', '-profile:v', 'high', '-pix_fmt', 'yuv420p', output_video_path # ffmpeg를 사용하여 비디오로 합치기
], check=True)
print(f"Video created successfully at {output_video_path}")
except subprocess.CalledProcessError as e:
print(f"Failed to create video: {e}")
# 응답 후 파일 삭제
def remove_files_after_response(input_dir: str, output_dir: str):
delete_directory(input_dir)
delete_directory(output_dir)
# 디렉토리 삭제
def delete_directory(path):
try:
shutil.rmtree(path) # 디렉토리 삭제
print(f"Successfully deleted {path}")
except Exception as e:
print(f"Failed to delete {path}: {e}")
@app.get("/")
async def read_root():
return {"Hello": "World"} # 기본 루트 엔드포인트
@app.post("/video")
async def upload_video(background_tasks: BackgroundTasks, video: UploadFile = File(...)):
base_dir = "demo"
input_dir = os.path.join(base_dir, "input")
output_dir = os.path.join(base_dir, "output")
sequence_dir = os.path.join(input_dir, "video")
upscaled_frames_dir = os.path.join(output_dir, "video")
video_name = os.path.splitext(video.filename)[0]
final_video_path = os.path.join(upscaled_frames_dir, f"{video_name}_upscaled.mp4")
os.makedirs(sequence_dir, exist_ok=True)
os.makedirs(upscaled_frames_dir, exist_ok=True)
await save_upload_file(video, os.path.join(input_dir, video.filename)) # 비디오 파일 저장
extract_frames(os.path.join(input_dir, video.filename), os.path.join(sequence_dir, video_name + "_frame_%06d.jpg")) # 프레임 추출
run_demo_script(sequence_dir, upscaled_frames_dir) # 업스케일 스크립트 실행
combine_frames_to_video(upscaled_frames_dir + "/", final_video_path) # 프레임을 비디오로 합치기
if os.path.exists(final_video_path):
background_tasks.add_task(delete_directory, input_dir)
background_tasks.add_task(delete_directory, output_dir)
return FileResponse(path=final_video_path, filename=f"{video_name}_upscaled.mp4") # 업스케일된 비디오 파일 응답
else:
return JSONResponse(content={"error": "Failed to create video file"}, status_code=500) # 비디오 파일 생성 실패 응답
기존 def main() 함수에서 테스트 데이터 배열을 사용한 반복문 코드를 수정.
main.py
# uvicorn main:app --reload --host 192.168.0.201 --port 5000
# python demo.py --input demo/input --output demo/output
import os
import glob
import shutil
import subprocess
from fastapi import FastAPI, UploadFile, File, BackgroundTasks
from fastapi.responses import JSONResponse, FileResponse
import aiofiles
import torch
from torchvision.io import read_video, write_video
from archs.iart_arch import IART
from basicsr.utils import tensor2img
from torchvision import transforms
from PIL import Image
app = FastAPI()
DEMO_SCRIPT_PATH = '/home/viking3/AI/Upscale/IART/demo.py' # demo.py 파일의 경로를 변수로 설정
# 비디오 파일 저장
async def save_upload_file(upload_file: UploadFile, destination: str) -> None:
async with aiofiles.open(destination, 'wb') as out_file: # 비디오 파일을 비동기적으로 저장
while content := await upload_file.read(1024): # 파일을 1024 바이트씩 읽음
await out_file.write(content) # 읽은 내용을 파일에 씀
await upload_file.close() # 파일 닫기
# 프레임 추출
def extract_frames(input_video_path: str, output_frame_path: str) -> None:
subprocess.run([
'ffmpeg', '-i', input_video_path, '-q:v', '2', output_frame_path # ffmpeg를 사용하여 프레임 추출
], check=True)
# 업스케일 스크립트 실행
def run_demo_script(input_dir: str, output_dir: str) -> None:
try:
subprocess.run([
'python', DEMO_SCRIPT_PATH,
'--input', input_dir,
'--output', output_dir # demo.py 스크립트를 실행하여 업스케일 수행
], check=True)
except subprocess.CalledProcessError as e:
print(f"Failed to run script: {e}")
# 프레임 합치기
def combine_frames_to_video(input_frame_path: str, output_video_path: str, frame_rate: int = 30) -> None:
frame_pattern = input_frame_path + "%08d.png" # 프레임 파일 패턴 설정
print(f"Looking for frames at: {frame_pattern}")
if not glob.glob(input_frame_path + "00000000.png"): # 첫 번째 프레임 파일이 존재하는지 확인
print(f"No files found at {input_frame_path + '00000000.png'}")
return
try:
subprocess.run([
'ffmpeg', '-framerate', str(frame_rate), '-i', frame_pattern,
'-c:v', 'libx264', '-profile:v', 'high', '-pix_fmt', 'yuv420p', output_video_path # ffmpeg를 사용하여 비디오로 합치기
], check=True)
print(f"Video created successfully at {output_video_path}")
except subprocess.CalledProcessError as e:
print(f"Failed to create video: {e}")
# 응답 후 파일 삭제
def remove_files_after_response(input_dir: str, output_dir: str):
delete_directory(input_dir)
delete_directory(output_dir)
# 디렉토리 삭제
def delete_directory(path):
try:
shutil.rmtree(path) # 디렉토리 삭제
print(f"Successfully deleted {path}")
except Exception as e:
print(f"Failed to delete {path}: {e}")
@app.get("/")
async def read_root():
return {"Hello": "World"} # 기본 루트 엔드포인트
@app.post("/video")
async def upload_video(background_tasks: BackgroundTasks, video: UploadFile = File(...)):
base_dir = "demo"
input_dir = os.path.join(base_dir, "input")
output_dir = os.path.join(base_dir, "output")
sequence_dir = os.path.join(input_dir, "video")
upscaled_frames_dir = os.path.join(output_dir, "video")
video_name = os.path.splitext(video.filename)[0]
final_video_path = os.path.join(upscaled_frames_dir, f"{video_name}_upscaled.mp4")
os.makedirs(sequence_dir, exist_ok=True)
os.makedirs(upscaled_frames_dir, exist_ok=True)
await save_upload_file(video, os.path.join(input_dir, video.filename)) # 비디오 파일 저장
extract_frames(os.path.join(input_dir, video.filename), os.path.join(sequence_dir, video_name + "_frame_%06d.jpg")) # 프레임 추출
run_demo_script(sequence_dir, upscaled_frames_dir) # 업스케일 스크립트 실행
combine_frames_to_video(upscaled_frames_dir + "/", final_video_path) # 프레임을 비디오로 합치기
if os.path.exists(final_video_path):
background_tasks.add_task(delete_directory, input_dir)
background_tasks.add_task(delete_directory, output_dir)
return FileResponse(path=final_video_path, filename=f"{video_name}_upscaled.mp4") # 업스케일된 비디오 파일 응답
else:
return JSONResponse(content={"error": "Failed to create video file"}, status_code=500) # 비디오 파일 생성 실패 응답
http://192.168.0.201:5000/video 엔드포인트 경로로 Video 파일 전송시
파일저장 → 프레임 추출 → 프레임당 업스케일 → 비디오 합치기 → 리턴 순으로 진행됨
5. 추론 결과
input Video (IART)
output Video (IART)
6. 최종 결과
input Video Size가 180 x 144 인데 이 이상 커졌을 때 RuntimeError 발생하여
def __init__(self,
in_channels=3,
mid_channels=64,
embed_dim=120,
depths=(6, 6, 6, 6, 6, 6),
num_heads=(6, 6, 6, 6, 6, 6),
window_size=(3, 8, 8),
num_frames=3,
img_size = 60, # 사이즈 조정
patch_size=1,
cpu_cache_length=100,
is_low_res_input=True,
use_checkpoint = [True, True, True, True],
spynet_path=None):
self.swin_backbone = nn.ModuleDict()
modules = ['backward_1', 'forward_1', 'backward_2', 'forward_2']
for i, module in enumerate(modules):
if torch.cuda.is_available():
self.swin_backbone[module] = SwinIRFM(
img_size=img_size,
patch_size=patch_size,
in_chans=in_channels,
embed_dim=embed_dim,
depths=depths,
num_heads=num_heads,
window_size=window_size,
mlp_ratio=2.,
qkv_bias=True,
qk_scale=None,
drop_rate=0.,
attn_drop_rate=0.,
drop_path_rate=0.1,
norm_layer=nn.LayerNorm,
ape=False,
patch_norm=True,
use_checkpoint=use_checkpoint[i],
upscale=4, # 배수 조정
img_range=1.,
upsampler='pixelshuffle',
resi_connection='1conv',
num_frames=num_frames)
사이즈 변경을 해봤으나 코드에 대한 이해 부족으로 해결 못함.
180 x 144 비디오의 결과 값은 완성도가 높았습니다. 그러나 모델이 해상도를 줄인 후 Upscaling을 진행했을 때 완성도가 높아진 것인지 확인이 필요합니다.
'AI' 카테고리의 다른 글
Object Separation (cloth-segmentation) (0) | 2024.05.22 |
---|---|
Video Upscaling (VRT) (0) | 2024.05.22 |
Stable Diffsuion TEST (0) | 2024.04.18 |
생성형 AI의 일관성을 위한 사전 자료조사 (1) | 2024.04.18 |
생성형 AI 일관성 유지 (0) | 2024.04.18 |