navis

Video Upscaling (IART) 본문

AI

Video Upscaling (IART)

menstua 2024. 5. 22. 12:39
728x90

1. 목차

  • 환경설정
  • 모델 - IART
  • 코드변경
    • 기존 demo.py 파일 input 데이터 처리하게 수정
    • Fast API 통신을 위한 main.py 작성
  • 추론 결과
    • 영상비교
  • 최종 결과

2. 환경 설정

  • AI 모델 테스트 환경
    • Ubuntu 22.04(워크스테이션)
    • Anaconda
    • VS Code
    • Python 3.9.0
    • PyTorch 1.13
    • CUDA 11.7
  • 환경설정
git clone <https://github.com/kai422/IART>
cd IART

pip install torch==1.13.1+cu117 torchvision==0.14.1+cu117 --extra-index-url <https://download.pytorch.org/whl/cu117>
pip install -r requirements.txt

-----------------------------------------------
데이터셋 : 
<https://seungjunnah.github.io/Datasets/reds.html>
<https://gist.github.com/SeungjunNah/b10d369b92840cb8dd2118dd4f41d643>
데이터셋 프롬프트 :
python script_name.py --server google --all
-----------------------------------------------
가중치 :
<https://drive.google.com/drive/folders/1MIUK37Izc4IcA_a3eSH-21EXOZO5G5qU>
경로 : 
./experiments./pretrained_models/
  • Fast Server 설정
    • Fast API
    • OpenCV 4.9
    • FFmpeg
  • 환경설정
pip install fastapi uvicorn aiofiles
conda install -c conda-forge ffmpeg

3. IART

https://github.com/kai422/IART

1. 모델 (IART)

[CVPR 2024 Highlight] Enhancing Video Super-Resolution via Implicit Resampling-based Alignment

4. 코드 변경

demo.py

# uvicorn main:app --reload --host 192.168.0.201 --port 5000
# python demo.py --input demo/input --output demo/output

import os
import glob
import shutil
import subprocess
from fastapi import FastAPI, UploadFile, File, BackgroundTasks
from fastapi.responses import JSONResponse, FileResponse
import aiofiles
import torch
from torchvision.io import read_video, write_video
from archs.iart_arch import IART
from basicsr.utils import tensor2img
from torchvision import transforms
from PIL import Image

app = FastAPI()

DEMO_SCRIPT_PATH = '/home/viking3/AI/Upscale/IART/demo.py'  # demo.py 파일의 경로를 변수로 설정

# 비디오 파일 저장
async def save_upload_file(upload_file: UploadFile, destination: str) -> None:
    async with aiofiles.open(destination, 'wb') as out_file:  # 비디오 파일을 비동기적으로 저장
        while content := await upload_file.read(1024):  # 파일을 1024 바이트씩 읽음
            await out_file.write(content)  # 읽은 내용을 파일에 씀
    await upload_file.close()  # 파일 닫기

# 프레임 추출
def extract_frames(input_video_path: str, output_frame_path: str) -> None:
    subprocess.run([
        'ffmpeg', '-i', input_video_path, '-q:v', '2', output_frame_path  # ffmpeg를 사용하여 프레임 추출
    ], check=True)

# 업스케일 스크립트 실행
def run_demo_script(input_dir: str, output_dir: str) -> None:
    try:
        subprocess.run([
            'python', DEMO_SCRIPT_PATH,
            '--input', input_dir,
            '--output', output_dir  # demo.py 스크립트를 실행하여 업스케일 수행
        ], check=True)
    except subprocess.CalledProcessError as e:
        print(f"Failed to run script: {e}")

# 프레임 합치기
def combine_frames_to_video(input_frame_path: str, output_video_path: str, frame_rate: int = 30) -> None:
    frame_pattern = input_frame_path + "%08d.png"  # 프레임 파일 패턴 설정
    
    print(f"Looking for frames at: {frame_pattern}")

    if not glob.glob(input_frame_path + "00000000.png"):  # 첫 번째 프레임 파일이 존재하는지 확인
        print(f"No files found at {input_frame_path + '00000000.png'}")
        return

    try:
        subprocess.run([
            'ffmpeg', '-framerate', str(frame_rate), '-i', frame_pattern,
            '-c:v', 'libx264', '-profile:v', 'high', '-pix_fmt', 'yuv420p', output_video_path  # ffmpeg를 사용하여 비디오로 합치기
        ], check=True)
        print(f"Video created successfully at {output_video_path}")
    except subprocess.CalledProcessError as e:
        print(f"Failed to create video: {e}")

# 응답 후 파일 삭제
def remove_files_after_response(input_dir: str, output_dir: str):
    delete_directory(input_dir)
    delete_directory(output_dir)

# 디렉토리 삭제
def delete_directory(path):
    try:
        shutil.rmtree(path)  # 디렉토리 삭제
        print(f"Successfully deleted {path}")
    except Exception as e:
        print(f"Failed to delete {path}: {e}")

@app.get("/")
async def read_root():
    return {"Hello": "World"}  # 기본 루트 엔드포인트

@app.post("/video")
async def upload_video(background_tasks: BackgroundTasks, video: UploadFile = File(...)):
    base_dir = "demo"
    input_dir = os.path.join(base_dir, "input")
    output_dir = os.path.join(base_dir, "output")
    sequence_dir = os.path.join(input_dir, "video")
    upscaled_frames_dir = os.path.join(output_dir, "video")
    video_name = os.path.splitext(video.filename)[0]
    final_video_path = os.path.join(upscaled_frames_dir, f"{video_name}_upscaled.mp4")

    os.makedirs(sequence_dir, exist_ok=True)
    os.makedirs(upscaled_frames_dir, exist_ok=True)

    await save_upload_file(video, os.path.join(input_dir, video.filename))  # 비디오 파일 저장

    extract_frames(os.path.join(input_dir, video.filename), os.path.join(sequence_dir, video_name + "_frame_%06d.jpg"))  # 프레임 추출

    run_demo_script(sequence_dir, upscaled_frames_dir)  # 업스케일 스크립트 실행

    combine_frames_to_video(upscaled_frames_dir + "/", final_video_path)  # 프레임을 비디오로 합치기

    if os.path.exists(final_video_path):
        background_tasks.add_task(delete_directory, input_dir)
        background_tasks.add_task(delete_directory, output_dir)
        return FileResponse(path=final_video_path, filename=f"{video_name}_upscaled.mp4")  # 업스케일된 비디오 파일 응답
    else:
        return JSONResponse(content={"error": "Failed to create video file"}, status_code=500)  # 비디오 파일 생성 실패 응답

 

기존 def main() 함수에서 테스트 데이터 배열을 사용한 반복문 코드를 수정.

main.py

# uvicorn main:app --reload --host 192.168.0.201 --port 5000
# python demo.py --input demo/input --output demo/output

import os
import glob
import shutil
import subprocess
from fastapi import FastAPI, UploadFile, File, BackgroundTasks
from fastapi.responses import JSONResponse, FileResponse
import aiofiles
import torch
from torchvision.io import read_video, write_video
from archs.iart_arch import IART
from basicsr.utils import tensor2img
from torchvision import transforms
from PIL import Image

app = FastAPI()

DEMO_SCRIPT_PATH = '/home/viking3/AI/Upscale/IART/demo.py'  # demo.py 파일의 경로를 변수로 설정

# 비디오 파일 저장
async def save_upload_file(upload_file: UploadFile, destination: str) -> None:
    async with aiofiles.open(destination, 'wb') as out_file:  # 비디오 파일을 비동기적으로 저장
        while content := await upload_file.read(1024):  # 파일을 1024 바이트씩 읽음
            await out_file.write(content)  # 읽은 내용을 파일에 씀
    await upload_file.close()  # 파일 닫기

# 프레임 추출
def extract_frames(input_video_path: str, output_frame_path: str) -> None:
    subprocess.run([
        'ffmpeg', '-i', input_video_path, '-q:v', '2', output_frame_path  # ffmpeg를 사용하여 프레임 추출
    ], check=True)

# 업스케일 스크립트 실행
def run_demo_script(input_dir: str, output_dir: str) -> None:
    try:
        subprocess.run([
            'python', DEMO_SCRIPT_PATH,
            '--input', input_dir,
            '--output', output_dir  # demo.py 스크립트를 실행하여 업스케일 수행
        ], check=True)
    except subprocess.CalledProcessError as e:
        print(f"Failed to run script: {e}")

# 프레임 합치기
def combine_frames_to_video(input_frame_path: str, output_video_path: str, frame_rate: int = 30) -> None:
    frame_pattern = input_frame_path + "%08d.png"  # 프레임 파일 패턴 설정
    
    print(f"Looking for frames at: {frame_pattern}")

    if not glob.glob(input_frame_path + "00000000.png"):  # 첫 번째 프레임 파일이 존재하는지 확인
        print(f"No files found at {input_frame_path + '00000000.png'}")
        return

    try:
        subprocess.run([
            'ffmpeg', '-framerate', str(frame_rate), '-i', frame_pattern,
            '-c:v', 'libx264', '-profile:v', 'high', '-pix_fmt', 'yuv420p', output_video_path  # ffmpeg를 사용하여 비디오로 합치기
        ], check=True)
        print(f"Video created successfully at {output_video_path}")
    except subprocess.CalledProcessError as e:
        print(f"Failed to create video: {e}")

# 응답 후 파일 삭제
def remove_files_after_response(input_dir: str, output_dir: str):
    delete_directory(input_dir)
    delete_directory(output_dir)

# 디렉토리 삭제
def delete_directory(path):
    try:
        shutil.rmtree(path)  # 디렉토리 삭제
        print(f"Successfully deleted {path}")
    except Exception as e:
        print(f"Failed to delete {path}: {e}")

@app.get("/")
async def read_root():
    return {"Hello": "World"}  # 기본 루트 엔드포인트

@app.post("/video")
async def upload_video(background_tasks: BackgroundTasks, video: UploadFile = File(...)):
    base_dir = "demo"
    input_dir = os.path.join(base_dir, "input")
    output_dir = os.path.join(base_dir, "output")
    sequence_dir = os.path.join(input_dir, "video")
    upscaled_frames_dir = os.path.join(output_dir, "video")
    video_name = os.path.splitext(video.filename)[0]
    final_video_path = os.path.join(upscaled_frames_dir, f"{video_name}_upscaled.mp4")

    os.makedirs(sequence_dir, exist_ok=True)
    os.makedirs(upscaled_frames_dir, exist_ok=True)

    await save_upload_file(video, os.path.join(input_dir, video.filename))  # 비디오 파일 저장

    extract_frames(os.path.join(input_dir, video.filename), os.path.join(sequence_dir, video_name + "_frame_%06d.jpg"))  # 프레임 추출

    run_demo_script(sequence_dir, upscaled_frames_dir)  # 업스케일 스크립트 실행

    combine_frames_to_video(upscaled_frames_dir + "/", final_video_path)  # 프레임을 비디오로 합치기

    if os.path.exists(final_video_path):
        background_tasks.add_task(delete_directory, input_dir)
        background_tasks.add_task(delete_directory, output_dir)
        return FileResponse(path=final_video_path, filename=f"{video_name}_upscaled.mp4")  # 업스케일된 비디오 파일 응답
    else:
        return JSONResponse(content={"error": "Failed to create video file"}, status_code=500)  # 비디오 파일 생성 실패 응답

 

 

http://192.168.0.201:5000/video 엔드포인트 경로로 Video 파일 전송시

파일저장 → 프레임 추출 → 프레임당 업스케일 → 비디오 합치기 → 리턴 순으로 진행됨

5. 추론 결과

input Video (IART)

 

 

output Video (IART)

 

 

6. 최종 결과

input Video Size가 180 x 144 인데 이 이상 커졌을 때 RuntimeError 발생하여

def __init__(self,
                 in_channels=3,
                 mid_channels=64,
                 embed_dim=120,
                 depths=(6, 6, 6, 6, 6, 6),
                 num_heads=(6, 6, 6, 6, 6, 6),
                 window_size=(3, 8, 8),
                 num_frames=3,
                 img_size = 60, # 사이즈 조정
                 patch_size=1,
                 cpu_cache_length=100,
                 is_low_res_input=True,
                 use_checkpoint = [True, True, True, True],
                 spynet_path=None):
self.swin_backbone = nn.ModuleDict()
      modules = ['backward_1', 'forward_1', 'backward_2', 'forward_2']
      for i, module in enumerate(modules):
          if torch.cuda.is_available():
              self.swin_backbone[module] = SwinIRFM(
                  img_size=img_size,
                  patch_size=patch_size,
                  in_chans=in_channels,
                  embed_dim=embed_dim,
                  depths=depths,
                  num_heads=num_heads,
                  window_size=window_size,
                  mlp_ratio=2.,
                  qkv_bias=True,
                  qk_scale=None,
                  drop_rate=0.,
                  attn_drop_rate=0.,
                  drop_path_rate=0.1,
                  norm_layer=nn.LayerNorm,
                  ape=False,
                  patch_norm=True,
                  use_checkpoint=use_checkpoint[i],
                  upscale=4, # 배수 조정
                  img_range=1.,
                  upsampler='pixelshuffle',
                  resi_connection='1conv',
                  num_frames=num_frames)              
             
   

 

사이즈 변경을 해봤으나 코드에 대한 이해 부족으로 해결 못함.

180 x 144 비디오의 결과 값은 완성도가 높았습니다. 그러나 모델이 해상도를 줄인 후 Upscaling을 진행했을 때 완성도가 높아진 것인지 확인이 필요합니다.

'AI' 카테고리의 다른 글

Object Separation (cloth-segmentation)  (0) 2024.05.22
Video Upscaling (VRT)  (0) 2024.05.22
Stable Diffsuion TEST  (0) 2024.04.18
생성형 AI의 일관성을 위한 사전 자료조사  (1) 2024.04.18
생성형 AI 일관성 유지  (0) 2024.04.18