# fs.py — file storage backends (local POSIX directory and S3). Author: Wei Shoulin.
from contextlib import contextmanager
from abc import ABCMeta, abstractmethod
from io import BytesIO
import os
import logging
import threading
import boto3
from typing import Generator, Any
from urllib.parse import urlparse
from .config import get_storage_config

class FileStorage(metaclass=ABCMeta):
    """Abstract interface implemented by the file storage backends in this module.

    Concrete subclasses resolve paths with ``get_full_file_path``, expose file
    content through ``read_file`` and release held resources in ``close``.
    """

    @abstractmethod
    def get_full_file_path(self, file_path: str) -> str:
        """Return the fully-qualified location of ``file_path`` in this storage."""
        # NotImplemented is a sentinel for binary-operator dunders; an abstract
        # method reached via super() should fail loudly instead.
        raise NotImplementedError

    @abstractmethod
    def read_file(self, file_path: str) -> Generator[Any, Any, Any]:
        """Give access to the content of ``file_path``.

        The implementations in this module are ``contextlib.contextmanager``
        functions that yield a binary ``BytesIO`` with the whole file content.
        """
        raise NotImplementedError

    @abstractmethod
    def close(self) -> None:
        """Release any resources held by the storage backend."""
        raise NotImplementedError
    
class PosixFileStorage(FileStorage):
    """File storage backed by a local directory tree.

    Relative paths are resolved against ``root``; paths starting with ``/`` are
    used unchanged.
    """

    def __init__(self, root: str):
        """Create a storage rooted at ``root``.

        Args:
            root: Directory that relative file paths are resolved against.

        Raises:
            ValueError: If ``root`` is not an existing directory.
        """
        self.root = root
        if not os.path.isdir(root):
            raise ValueError(f"{root} is not a directory")

    def get_full_file_path(self, file_path: str) -> str:
        """Return ``file_path`` unchanged if absolute, else joined onto ``root``."""
        if file_path.startswith('/'):
            return file_path
        return os.path.join(self.root, file_path)

    @contextmanager
    def read_file(self, file_path: str) -> Generator[BytesIO, Any, Any]:
        """Yield the whole content of ``file_path`` as an in-memory ``BytesIO``.

        Args:
            file_path: Absolute path, or path relative to ``root``.

        Raises:
            OSError: (e.g. ``FileNotFoundError``) if the file cannot be opened or read.
        """
        full_file_path = self.get_full_file_path(file_path)
        # Read everything and release the OS handle before yielding, so the file
        # descriptor is not held for as long as the caller stays in the context.
        with open(full_file_path, "rb") as file_obj:
            file_content = file_obj.read()
        yield BytesIO(file_content)

    def close(self) -> None:
        """Do nothing: this backend holds no long-lived resources."""

class S3FileStorage(FileStorage):
    """Singleton file storage backed by an S3-compatible object store.

    ``s3_storage`` is a URL such as ``s3://bucket/optional/prefix``. Its netloc is
    the bucket name, and its path (without surrounding slashes) is a key prefix
    prepended to every relative file path.

    Only the first construction configures the shared instance; later calls
    return the same object and ignore their arguments.
    """
    _instance = None
    _is_initialized = False
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        # Double-checked locking so concurrent callers share a single instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, s3_endpoint: str, s3_access_key: str, s3_secret_key: str, s3_storage: str):
        """Configure the shared instance on first construction only.

        Args:
            s3_endpoint: Endpoint URL passed to ``boto3.client``.
            s3_access_key: Access key id for the client.
            s3_secret_key: Secret access key for the client.
            s3_storage: ``s3://bucket[/prefix]`` URL locating the files.
        """
        if not self._is_initialized:
            with self._lock:
                if not self._is_initialized:
                    self.s3_storage = s3_storage
                    parsed_url = urlparse(s3_storage)
                    self.bucket_name = parsed_url.netloc
                    # Strip both ends: a trailing slash ("s3://bucket/prefix/") would
                    # otherwise produce object keys like "prefix//name".
                    self.storage_prefix = parsed_url.path.strip('/')
                    self.s3_endpoint = s3_endpoint
                    self.s3 = boto3.client('s3', endpoint_url=s3_endpoint, aws_access_key_id=s3_access_key, aws_secret_access_key=s3_secret_key)
                    self._is_initialized = True

    def get_full_file_path(self, file_path: str) -> str:
        """Return the ``s3://`` URL of ``file_path``, or the path itself if absolute.

        Absolute paths are returned unchanged because ``read_file`` reads them from
        the local filesystem rather than from the bucket.
        """
        if file_path.startswith('/'):
            return file_path
        return f"{self.s3_storage.rstrip('/')}/{file_path}"

    def add_prefix(self, file_path: str) -> str:
        """Return the object key for ``file_path`` (prefixed if a prefix is configured)."""
        if self.storage_prefix == '':
            return file_path
        return f"{self.storage_prefix}/{file_path}"

    @contextmanager
    def read_file(self, file_path: str) -> Generator[BytesIO, None, None]:
        """Yield the whole content of ``file_path`` as an in-memory ``BytesIO``.

        Paths starting with ``/`` are read from the local filesystem; anything else
        is fetched from the bucket under the configured prefix.
        """
        if file_path.startswith('/'):
            with PosixFileStorage("/").read_file(file_path) as content:
                yield content
        else:
            response = self.s3.get_object(Bucket=self.bucket_name, Key=self.add_prefix(file_path))
            body = response['Body']
            try:
                file_content = body.read()
            finally:
                # Always release the HTTP stream / pooled connection, even if read() fails.
                body.close()
            yield BytesIO(file_content)

    def close(self) -> None:
        """Close the underlying boto3 client.

        NOTE(review): the singleton instance stays registered after this call, so
        later constructions return an object whose client is closed.
        """
        self.s3.close()
    
def get_file_storage() -> FileStorage:
    """Build the file storage backend selected by the storage configuration.

    For ``"posix"``, the root directory comes from the ``CSST_DFS_ROOT``
    environment variable when it is set and non-empty, otherwise from the config.

    Raises:
        ValueError: If ``FILE_STORAGE_TYPE`` is neither ``"s3"`` nor ``"posix"``.
    """
    storage_settings = get_storage_config().data
    storage_type = storage_settings.FILE_STORAGE_TYPE
    if storage_type == "s3":
        return S3FileStorage(storage_settings.S3_ENDPOINT, storage_settings.S3_ACCESS_KEY, storage_settings.S3_SECRET_KEY, storage_settings.S3_STORAGE)
    if storage_type == "posix":
        root = os.environ.get("CSST_DFS_ROOT")
        if not root:
            # An empty value counts as unset: actually fall back to the config
            # default instead of passing "" on to PosixFileStorage.
            logging.warning(
                "CSST_DFS_ROOT environment variable is not set, using default value from DFS config [%s].",
                storage_settings.CSST_DFS_ROOT,
            )
            root = storage_settings.CSST_DFS_ROOT
        return PosixFileStorage(root)
    raise ValueError(f"Invalid storage type: {storage_type}")