from contextlib import contextmanager
from abc import ABCMeta, abstractmethod
from io import BytesIO
import os
import logging
import threading
import boto3
from typing import Generator, Any
from urllib.parse import urlparse
from .config import get_storage_config
class FileStorage(metaclass=ABCMeta):
    """Abstract interface for reading files from a storage backend."""

    @abstractmethod
    def get_full_file_path(self, file_path: str) -> str:
        """Return the full (backend-specific) location for *file_path*."""
        # NotImplementedError is the correct signal for an unimplemented
        # abstract method; `NotImplemented` is the binary-operator sentinel.
        raise NotImplementedError

    @abstractmethod
    def read_file(self, file_path: str) -> Generator[Any, Any, Any]:
        """Context manager yielding the file content as a binary stream."""
        raise NotImplementedError

    @abstractmethod
    def close(self) -> None:
        """Release any resources held by the backend."""
        raise NotImplementedError
class PosixFileStorage(FileStorage):
    """FileStorage backed by the local POSIX filesystem.

    Relative paths are resolved against *root*; absolute paths are used
    verbatim.
    """

    def __init__(self, root: str):
        # Validate before assigning so a bad config fails fast and leaves
        # no half-initialized instance behind.
        if not os.path.isdir(root):
            raise ValueError("Root is not a directory")
        self.root = root

    def get_full_file_path(self, file_path: str) -> str:
        """Resolve *file_path* to a full path under ``self.root``.

        Absolute paths (leading ``/``) are returned unchanged.
        """
        if file_path.startswith('/'):
            return file_path
        return os.path.join(self.root, file_path)

    @contextmanager
    def read_file(self, file_path: str) -> Generator[BytesIO, Any, Any]:
        """Yield the whole file content as an in-memory ``BytesIO``.

        Raises:
            OSError: if the file cannot be opened or read.
        """
        # `with` closes the descriptor even if the caller's block raises,
        # replacing the previous manual try/finally bookkeeping.
        with open(self.get_full_file_path(file_path), "rb") as file_obj:
            yield BytesIO(file_obj.read())

    def close(self) -> None:
        """No persistent resources to release for plain filesystem access."""
        pass
class S3FileStorage(FileStorage):
    """Singleton FileStorage backed by an S3-compatible object store.

    ``s3_storage`` is a URL such as ``s3://bucket/prefix``; its netloc is
    the bucket and its path is a key prefix. Absolute local paths passed
    to :meth:`read_file` are transparently delegated to the filesystem.
    """

    _instance = None
    _is_initialized = False
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        # Double-checked locking: only one instance is ever created even
        # under concurrent construction.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, s3_endpoint: str, s3_access_key: str, s3_secret_key: str, s3_storage: str):
        # __init__ runs on every construction of the singleton; the flag
        # ensures the boto3 client is only built once.
        if not self._is_initialized:
            with self._lock:
                if not self._is_initialized:
                    self.s3_storage = s3_storage
                    parsed_url = urlparse(s3_storage)
                    self.bucket_name = parsed_url.netloc
                    self.storage_prefix = parsed_url.path.lstrip('/')
                    self.s3_endpoint = s3_endpoint
                    self.s3 = boto3.client('s3', endpoint_url=s3_endpoint, aws_access_key_id=s3_access_key, aws_secret_access_key=s3_secret_key)
                    self._is_initialized = True

    def get_full_file_path(self, file_path: str) -> str:
        """Return the full S3 URL for *file_path*."""
        return f"{self.s3_storage}/{file_path}"

    def add_prefix(self, file_path: str) -> str:
        """Prepend the bucket-internal key prefix, if one is configured."""
        if self.storage_prefix == '':
            return file_path
        return f"{self.storage_prefix}/{file_path}"

    @contextmanager
    def read_file(self, file_path: str) -> Generator[BytesIO, None, None]:
        """Yield the object's content as an in-memory ``BytesIO``.

        Absolute local paths are read from disk instead of S3.
        """
        if file_path.startswith('/'):
            with PosixFileStorage("/").read_file(file_path) as content:
                yield content
        else:
            response = self.s3.get_object(Bucket=self.bucket_name, Key=self.add_prefix(file_path))
            body = response['Body']
            try:
                yield BytesIO(body.read())
            finally:
                # Close the streaming body so the underlying HTTP
                # connection is released (the old `finally: pass` leaked it).
                body.close()

    def close(self) -> None:
        """Close the underlying boto3 client."""
        self.s3.close()
def get_file_storage() -> FileStorage:
    """Build the FileStorage backend selected by the DFS configuration.

    Returns:
        ``S3FileStorage`` or ``PosixFileStorage`` depending on
        ``FILE_STORAGE_TYPE``.

    Raises:
        ValueError: for an unknown ``FILE_STORAGE_TYPE``.
    """
    storage_settings = get_storage_config().data
    storage_type = storage_settings.FILE_STORAGE_TYPE
    if storage_type == "s3":
        return S3FileStorage(
            storage_settings.S3_ENDPOINT,
            storage_settings.S3_ACCESS_KEY,
            storage_settings.S3_SECRET_KEY,
            storage_settings.S3_STORAGE,
        )
    if storage_type == "posix":
        # Environment variable wins; a missing OR empty value falls back to
        # the configured default (previously an empty env var produced the
        # "using default" warning but still passed "" to PosixFileStorage).
        root = os.environ.get("CSST_DFS_ROOT")
        if not root:
            logging.warning(
                "CSST_DFS_ROOT environment variable is not set, using default value from DFS config [%s].",
                storage_settings.CSST_DFS_ROOT,
            )
            root = storage_settings.CSST_DFS_ROOT
        return PosixFileStorage(root)
    raise ValueError(f"Invalid storage type: {storage_type}")