fs.py 4.03 KB
Newer Older
Wei Shoulin's avatar
Wei Shoulin committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
from contextlib import contextmanager
from abc import ABCMeta, abstractmethod
from io import BytesIO
import os
import logging
import threading
import boto3
from typing import Generator, Any
from urllib.parse import urlparse
from .config import get_storage_config

class FileStorage(metaclass=ABCMeta):
    """Abstract interface for reading files from a storage backend.

    Concrete backends implement path resolution (``get_full_file_path``),
    reading (``read_file``) and resource cleanup (``close``).
    """

    @abstractmethod
    def get_full_file_path(self, file_path: str) -> str:
        """Return the backend-specific full location of *file_path*."""
        # ``NotImplemented`` is a sentinel for binary operators, not an
        # "abstract" marker; raising makes an accidental ``super()`` call
        # fail loudly instead of silently returning a truthy object.
        raise NotImplementedError

    @abstractmethod
    def read_file(self, file_path: str) -> Generator[Any, Any, Any]:
        """Open *file_path* for reading.

        Implementations are expected to be ``@contextmanager`` generators
        that yield a readable file-like object.
        """
        raise NotImplementedError

    @abstractmethod
    def close(self) -> None:
        """Release any resources held by the backend."""
        raise NotImplementedError
    
class PosixFileStorage(FileStorage):
    """File storage backed by a local directory tree.

    Relative paths are resolved against ``root``; paths starting with ``/``
    are used unchanged.

    :param root: base directory for relative paths.
    :raises ValueError: if *root* is not an existing directory.
    """

    def __init__(self, root: str):
        self.root = root
        if not os.path.isdir(root):
            raise ValueError("Root is not a directory")

    def get_full_file_path(self, file_path: str) -> str:
        """Return *file_path* joined onto ``root``, unless it starts with ``/``."""
        if file_path.startswith('/'):
            return file_path
        return os.path.join(self.root, file_path)

    @contextmanager
    def read_file(self, file_path: str) -> Generator[BytesIO, Any, Any]:
        """Yield the whole content of *file_path* as an in-memory ``BytesIO``.

        :raises OSError: (e.g. ``FileNotFoundError``) if the file cannot be opened.
        """
        full_file_path = self.get_full_file_path(file_path)
        # The content is fully buffered, so the OS file handle is closed
        # before control is handed to the caller rather than being held open
        # for the caller's entire ``with`` body.
        with open(full_file_path, "rb") as file_obj:
            file_content = file_obj.read()
        yield BytesIO(file_content)

    def close(self) -> None:
        """No-op: this backend keeps no open resources between calls."""
        pass

class S3FileStorage(FileStorage):
    """File storage backed by an S3-compatible object store.

    The class is a process-wide singleton: the first construction creates the
    boto3 client, and later constructions return the same instance without
    applying their arguments.

    ``s3_storage`` is parsed as a URL: its netloc is the bucket name and its
    path (without surrounding slashes) is prepended to every object key.
    Paths starting with ``/`` are read from the local filesystem instead.
    """
    _instance = None
    _is_initialized = False
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        # Double-checked creation so concurrent first calls share one instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, s3_endpoint: str, s3_access_key: str, s3_secret_key: str, s3_storage: str):
        # __init__ runs on every construction of the singleton; only the
        # first one configures it.
        if not self._is_initialized:
            with self._lock:
                if not self._is_initialized:
                    self.s3_storage = s3_storage
                    parsed_url = urlparse(s3_storage)
                    self.bucket_name = parsed_url.netloc
                    # Strip slashes on both sides: a trailing '/' in the
                    # configured URL would otherwise yield keys such as
                    # "prefix//file", which S3 treats as a different key.
                    self.storage_prefix = parsed_url.path.strip('/')
                    self.s3_endpoint = s3_endpoint
                    self.s3 = boto3.client('s3', endpoint_url=s3_endpoint, aws_access_key_id=s3_access_key, aws_secret_access_key=s3_secret_key)
                    self._is_initialized = True

    def get_full_file_path(self, file_path: str) -> str:
        """Return the location ``read_file`` would read *file_path* from.

        Absolute paths are local files (``read_file`` reads them from disk)
        and are returned unchanged; anything else becomes
        ``<s3_storage>/<file_path>``.
        """
        if file_path.startswith('/'):
            return file_path
        return f"{self.s3_storage.rstrip('/')}/{file_path}"

    def add_prefix(self, file_path: str) -> str:
        """Prepend the configured key prefix (if any) to *file_path*."""
        if not self.storage_prefix:
            return file_path
        return f"{self.storage_prefix}/{file_path}"

    @contextmanager
    def read_file(self, file_path: str) -> Generator[BytesIO, None, None]:
        """Yield the content of *file_path* as an in-memory ``BytesIO``.

        Absolute paths are read from the local filesystem; other paths are
        fetched from the bucket under the configured key prefix.
        """
        if file_path.startswith('/'):
            with PosixFileStorage("/").read_file(file_path) as content:
                yield content
            return
        response = self.s3.get_object(Bucket=self.bucket_name, Key=self.add_prefix(file_path))
        body = response['Body']
        try:
            file_content = body.read()
        finally:
            # Close the streaming response body once it has been consumed.
            body.close()
        yield BytesIO(file_content)

    def close(self) -> None:
        """Close the underlying boto3 client.

        NOTE(review): later constructions still return this singleton with
        the closed client; confirm the client stays usable after ``close``
        or reset ``_instance``/``_is_initialized`` here.
        """
        self.s3.close()
    
def get_file_storage() -> FileStorage:
    """Build the file storage backend selected by the storage config.

    ``FILE_STORAGE_TYPE`` selects the backend:

    * ``"s3"``    -> ``S3FileStorage`` built from the ``S3_*`` settings.
    * ``"posix"`` -> ``PosixFileStorage`` rooted at ``$CSST_DFS_ROOT``,
      falling back to the config's ``CSST_DFS_ROOT`` (with a warning) when
      the environment variable is unset or empty.

    :raises ValueError: for any other storage type.
    """
    storage_settings = get_storage_config().data
    if storage_settings.FILE_STORAGE_TYPE == "s3":
        return S3FileStorage(storage_settings.S3_ENDPOINT, storage_settings.S3_ACCESS_KEY, storage_settings.S3_SECRET_KEY, storage_settings.S3_STORAGE)
    elif storage_settings.FILE_STORAGE_TYPE == "posix":
        # Read the variable once and treat an empty value like an unset one,
        # so the root actually used matches what the warning reports.
        dfs_root = os.environ.get("CSST_DFS_ROOT")
        if not dfs_root:
            logging.warning(f"CSST_DFS_ROOT environment variable is not set, using default value from DFS config [{storage_settings.CSST_DFS_ROOT}].")
            dfs_root = storage_settings.CSST_DFS_ROOT
        return PosixFileStorage(dfs_root)
    else:
        raise ValueError(f"Invalid storage type: {storage_settings.FILE_STORAGE_TYPE}")