"""File storage backends: a POSIX-filesystem implementation and an
S3-backed singleton, selected at runtime by :func:`get_file_storage`."""

from abc import ABCMeta, abstractmethod
from contextlib import contextmanager
from io import BytesIO
import logging
import os
import threading
from typing import Any, Generator
from urllib.parse import urlparse

from .config import get_storage_config


class FileStorage(metaclass=ABCMeta):
    """Abstract read-only file storage interface."""

    @abstractmethod
    def get_full_file_path(self, file_path: str) -> str:
        """Return the backend-specific absolute path/URI for ``file_path``."""
        # NOTE: `raise NotImplementedError` is the correct idiom for abstract
        # stubs; `return NotImplemented` is reserved for binary-operator dispatch.
        raise NotImplementedError

    @abstractmethod
    def read_file(self, file_path: str) -> Generator[Any, Any, Any]:
        """Context manager yielding the file's contents as a ``BytesIO``."""
        raise NotImplementedError

    @abstractmethod
    def close(self) -> None:
        """Release any resources held by the backend."""
        raise NotImplementedError


class PosixFileStorage(FileStorage):
    """File storage rooted at a local directory.

    Raises:
        ValueError: if ``root`` is not an existing directory.
    """

    def __init__(self, root: str):
        self.root = root
        if not os.path.isdir(root):
            raise ValueError(f"{root} is not a directory")

    def get_full_file_path(self, file_path: str) -> str:
        """Resolve ``file_path`` against the storage root.

        Absolute paths (leading ``/``) bypass the root and are returned as-is.
        """
        if file_path.startswith('/'):
            return file_path
        return os.path.join(self.root, file_path)

    @contextmanager
    def read_file(self, file_path: str) -> Generator[BytesIO, Any, Any]:
        """Yield the full file contents wrapped in a ``BytesIO``.

        The underlying file handle is closed before the caller's block runs;
        the caller owns only the in-memory copy.
        """
        full_file_path = self.get_full_file_path(file_path)
        # `with` replaces the original manual open/try/finally and guarantees
        # the handle is closed even if read() raises.
        with open(full_file_path, "rb") as file_obj:
            file_content = file_obj.read()
        yield BytesIO(file_content)

    def close(self) -> None:
        # Nothing to release for local filesystem access.
        pass


class S3FileStorage(FileStorage):
    """Thread-safe singleton storage backed by an S3-compatible service.

    ``s3_storage`` is a URL such as ``s3://bucket/prefix``; the netloc becomes
    the bucket name and the path (minus its leading slash) the key prefix.
    """

    _instance = None
    _is_initialized = False
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        # Double-checked locking so only one instance is ever created.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, s3_endpoint: str, s3_access_key: str,
                 s3_secret_key: str, s3_storage: str):
        # Guard against re-running __init__ on the shared singleton instance.
        if not self._is_initialized:
            with self._lock:
                if not self._is_initialized:
                    # boto3 is imported lazily so the module loads without it
                    # when the POSIX backend is configured.
                    import boto3
                    self.s3_storage = s3_storage
                    parsed_url = urlparse(s3_storage)
                    bucket_name = parsed_url.netloc
                    self.storage_prefix = parsed_url.path.lstrip('/')
                    self.bucket_name = bucket_name
                    self.s3_endpoint = s3_endpoint
                    self.s3 = boto3.client(
                        's3',
                        endpoint_url=s3_endpoint,
                        aws_access_key_id=s3_access_key,
                        aws_secret_access_key=s3_secret_key,
                    )
                    self._is_initialized = True

    def get_full_file_path(self, file_path: str) -> str:
        """Return the full S3 URI (``<s3_storage>/<file_path>``)."""
        return f"{self.s3_storage}/{file_path}"

    def add_prefix(self, file_path: str) -> str:
        """Prepend the configured key prefix, if any, to ``file_path``."""
        if self.storage_prefix == '':
            return file_path
        return f"{self.storage_prefix}/{file_path}"

    @contextmanager
    def read_file(self, file_path: str) -> Generator[BytesIO, None, None]:
        """Yield the object's contents as a ``BytesIO``.

        Absolute local paths (leading ``/``) are delegated to a
        ``PosixFileStorage`` rooted at ``/``; everything else is fetched
        from S3 under the configured bucket and prefix.
        """
        if file_path.startswith('/'):
            with PosixFileStorage("/").read_file(file_path) as content:
                yield content
        else:
            file_obj = self.s3.get_object(
                Bucket=self.bucket_name, Key=self.add_prefix(file_path))
            body = file_obj['Body']
            try:
                file_content = body.read()
                yield BytesIO(file_content)
            finally:
                # Close the botocore StreamingBody to release the underlying
                # HTTP connection (original code leaked it via `finally: pass`).
                body.close()

    def close(self) -> None:
        """Close the shared boto3 client."""
        self.s3.close()


def get_file_storage() -> FileStorage:
    """Build the configured :class:`FileStorage` backend.

    Returns:
        An ``S3FileStorage`` or ``PosixFileStorage`` per ``FILE_STORAGE_TYPE``.

    Raises:
        ValueError: if ``FILE_STORAGE_TYPE`` is neither ``"s3"`` nor ``"posix"``.
    """
    storage_settings = get_storage_config().data
    if storage_settings.FILE_STORAGE_TYPE == "s3":
        return S3FileStorage(storage_settings.S3_ENDPOINT,
                             storage_settings.S3_ACCESS_KEY,
                             storage_settings.S3_SECRET_KEY,
                             storage_settings.S3_STORAGE)
    elif storage_settings.FILE_STORAGE_TYPE == "posix":
        if not os.environ.get("CSST_DFS_ROOT", None):
            # Lazy %-style args avoid formatting the message when the
            # warning level is disabled.
            logging.warning(
                "CSST_DFS_ROOT environment variable is not set, using default "
                "value from DFS config [%s].",
                storage_settings.CSST_DFS_ROOT)
        return PosixFileStorage(
            os.environ.get("CSST_DFS_ROOT", storage_settings.CSST_DFS_ROOT))
    else:
        raise ValueError(
            f"Invalid storage type: {storage_settings.FILE_STORAGE_TYPE}")