import requests import os from typing import IO, Optional from . import Result def auth_header() -> dict: token = os.getenv("CSST_DFS_TOKEN", None) if not token: raise ValueError("env CSST_DFS_TOKEN is not set.") return { "Authorization": f"Bearer {token}" } def get_request_url(endpoint: str) -> str: base_url = os.getenv("CSST_DFS_GATEWAY", None) if not base_url: raise ValueError("env CSST_DFS_GATEWAY is not set.") if not endpoint.startswith("http://"): base_url = "http://" + base_url return f"{base_url}{endpoint}" def request_error_handler_decorator(func): def wrapper(*args, **kwargs): try: result = func(*args, **kwargs) if 'content-type' in result.headers and result.headers['content-type'] == "application/octet-stream": return result return Result.from_response(result) except requests.exceptions.ConnectionError as e: return Result.error(code=502, message= f"Bad Gateway: {e}") except Exception as e: return Result.error(message = str(e)) return wrapper @request_error_handler_decorator def get(endpoint: str, timeout = os.getenv("CSST_DFS_REQUEST_TIMEOUT", 300)) -> requests.Response: return requests.get( url = get_request_url(endpoint), headers = auth_header(), timeout = timeout ) @request_error_handler_decorator def post(endpoint: str, data: dict, timeout = os.getenv("CSST_DFS_REQUEST_TIMEOUT", 300)) -> requests.Response: return requests.post( url = get_request_url(endpoint), headers = auth_header(), json = data, timeout = timeout ) @request_error_handler_decorator def put(endpoint: str, data: Optional[dict] = None, timeout = os.getenv("CSST_DFS_REQUEST_TIMEOUT", 300)) -> requests.Response: return requests.put( url = get_request_url(endpoint), headers = auth_header(), json = data, timeout = timeout ) @request_error_handler_decorator def delete(endpoint: str, timeout = os.getenv("CSST_DFS_REQUEST_TIMEOUT", 300)) -> requests.Response: return requests.delete( url = get_request_url(endpoint), headers = auth_header(), timeout = timeout ) def post_file(endpoint: str, file_path: str, data: dict) -> requests.Response: with open(file_path, "rb") as bytesio: return post_bytesio(endpoint, bytesio, data) @request_error_handler_decorator def post_bytesio(endpoint: str, file_bytes: IO, data: dict, timeout = os.getenv("CSST_DFS_REQUEST_TIMEOUT", 300)) -> requests.Response: return requests.post( url = get_request_url(endpoint), headers = auth_header(), files = { "file": file_bytes }, data = data, timeout = timeout ) def download_file(endpoint: str, timeout = os.getenv("CSST_DFS_REQUEST_TIMEOUT", 300)) -> requests.Response: """ 下载指定endpoint对应的文件。 Args: endpoint (str): 文件对应的endpoint。 Returns: requests.Response: 发送HTTP GET请求后返回的响应对象。 """ return requests.get( url = get_request_url(endpoint), headers = auth_header(), timeout = timeout )