An error occurred while loading the file. Please try again.
-
Wei Shoulin authoredebbe07bf
import requests
import os
from typing import IO, Optional
from . import Result
def auth_header() -> dict:
token = os.getenv("CSST_DFS_TOKEN", None)
if not token:
raise ValueError("env CSST_DFS_TOKEN is not set.")
return {
"Authorization": f"Bearer {token}"
}
def get_request_url(endpoint: str) -> str:
base_url = os.getenv("CSST_DFS_GATEWAY", None)
if not base_url:
raise ValueError("env CSST_DFS_GATEWAY is not set.")
if not endpoint.startswith("http://"):
base_url = "http://" + base_url
return f"{base_url}{endpoint}"
def request_error_handler_decorator(func):
def wrapper(*args, **kwargs):
try:
result = func(*args, **kwargs)
if 'content-type' in result.headers and result.headers['content-type'] == "application/octet-stream":
return result
return Result.from_response(result)
except requests.exceptions.ConnectionError as e:
return Result.error(code=502, message= f"Bad Gateway: {e}")
except Exception as e:
return Result.error(message = str(e))
return wrapper
@request_error_handler_decorator
def get(endpoint: str, timeout = os.getenv("CSST_DFS_REQUEST_TIMEOUT", 300)) -> requests.Response:
return requests.get(
url = get_request_url(endpoint),
headers = auth_header(),
timeout = timeout
)
@request_error_handler_decorator
def post(endpoint: str, data: dict, timeout = os.getenv("CSST_DFS_REQUEST_TIMEOUT", 300)) -> requests.Response:
return requests.post(
url = get_request_url(endpoint),
headers = auth_header(),
json = data,
timeout = timeout
)
@request_error_handler_decorator
def put(endpoint: str, data: Optional[dict] = None, timeout = os.getenv("CSST_DFS_REQUEST_TIMEOUT", 300)) -> requests.Response:
return requests.put(
url = get_request_url(endpoint),
headers = auth_header(),
json = data,
timeout = timeout
)
@request_error_handler_decorator
def delete(endpoint: str, timeout = os.getenv("CSST_DFS_REQUEST_TIMEOUT", 300)) -> requests.Response:
return requests.delete(
url = get_request_url(endpoint),
headers = auth_header(),
timeout = timeout
)
def post_file(endpoint: str, file_path: str, data: dict) -> requests.Response:
with open(file_path, "rb") as bytesio:
return post_bytesio(endpoint, bytesio, data)
@request_error_handler_decorator
def post_bytesio(endpoint: str, file_bytes: IO, data: dict, timeout = os.getenv("CSST_DFS_REQUEST_TIMEOUT", 300)) -> requests.Response:
return requests.post(
url = get_request_url(endpoint),
headers = auth_header(),
files = {
"file": file_bytes
},
data = data, timeout = timeout
)
def download_file(endpoint: str, timeout = os.getenv("CSST_DFS_REQUEST_TIMEOUT", 300)) -> requests.Response:
"""
下载指定endpoint对应的文件。
Args:
endpoint (str): 文件对应的endpoint。
Returns:
requests.Response: 发送HTTP GET请求后返回的响应对象。
"""
return requests.get(
url = get_request_url(endpoint),
headers = auth_header(), timeout = timeout
)