Commit d2b7b093 authored by Wei Shoulin's avatar Wei Shoulin
Browse files

refactor(plan): add write_simulate_file

parent c323ea2b
Pipeline #11314 canceled with stages
in 0 seconds
......@@ -3,6 +3,8 @@ from typing import Optional, IO, Tuple, Literal, Union
from .common import request, Result
DateTimeTuple = Tuple[str, str]
WRITE_BATCH_SIZE = 512
def find(mode: Optional[str] = None,
obs_group: Optional[str] = None,
proposal_id: Optional[str] = None,
......@@ -83,8 +85,7 @@ def find_by_opid(opid: str) -> Result:
"""
return request.get(f"/api/plan/{opid}")
def write_file(local_file: Union[IO, str, list]) -> Result:
def load_file(local_file: Union[IO, str, list]):
"""
将本地json文件、json数据流或json数据列表写入DFS中。
......@@ -92,8 +93,7 @@ def write_file(local_file: Union[IO, str, list]) -> Result:
local_file (Union[IO, str, list]): 文件路径、数据流或JSON数据列表。
Returns:
Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回数据对象。
any
"""
batch_size = 512
......@@ -108,15 +108,47 @@ def write_file(local_file: Union[IO, str, list]) -> Result:
raise ValueError("Unsupported type for local_file")
if not isinstance(data, list):
raise ValueError("Data must be a list of JSON objects")
for i in range(0, len(data), batch_size):
batch_data = data[i:i + batch_size]
raise ValueError("Data must be a list of JSON objects")
def write_file(local_file: Union[IO, str, list]) -> Result:
"""
将本地json文件、json数据流或json数据列表写入DFS中。
Args:
local_file (Union[IO, str, list]): 文件路径、数据流或JSON数据列表。
Returns:
Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回数据对象。
"""
data = load_file(local_file)
for i in range(0, len(data), WRITE_BATCH_SIZE):
batch_data = data[i:i + WRITE_BATCH_SIZE]
response = request.post("/api/plan/file", {"plans": batch_data})
if not response.success:
return response
return Result.ok_data(len(data))
def write_simulate_file(local_file: Union[IO, str, list]) -> Result:
"""
将本地json文件、json数据流或json数据列表写入DFS中。
Args:
local_file (Union[IO, str, list]): 文件路径、数据流或JSON数据列表。
Returns:
Result: 操作的结果对象,包含操作是否成功以及相关的错误信息,成功返回数据对象。
"""
data = load_file(local_file)
#将data中id置为-1
for plan in data:
plan['id'] = -1
for i in range(0, len(data), WRITE_BATCH_SIZE):
batch_data = data[i:i + WRITE_BATCH_SIZE]
response = request.post("/api/plan/file", {"plans": batch_data})
if not response.success:
return response
return Result.ok_data(len(data))
def new(data: dict) -> Result:
"""
新建编排数据
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment