Commit cee5dc79 authored by BO ZHANG's avatar BO ZHANG 🏀
Browse files

rename meta methods

parent 944cdeda
Pipeline #11010 passed with stage
...@@ -336,16 +336,18 @@ def get_proc_info(**kwargs): ...@@ -336,16 +336,18 @@ def get_proc_info(**kwargs):
return reformat_header(header, comment="PROCESSING INFORMATION") return reformat_header(header, comment="PROCESSING INFORMATION")
def add_meta_to_table(t: table.Table, meta: dict) -> table.Table: def append_meta_to_table_hdu(hdu: fits.BinTableHDU, meta: dict) -> fits.BinTableHDU:
"""Add meta information to a table. """Append meta information to a table HDU.
Parameters Parameters
---------- ----------
t : Table hdu : fits.BinTableHDU
Table. Table HDU.
meta : dict meta : dict
Meta information. Meta information.
""" """
# extract table from HDU
t = hdu_to_table(hdu)
# determine number of rows # determine number of rows
n_rows = len(t) n_rows = len(t)
# extract subset of meta information # extract subset of meta information
...@@ -357,4 +359,6 @@ def add_meta_to_table(t: table.Table, meta: dict) -> table.Table: ...@@ -357,4 +359,6 @@ def add_meta_to_table(t: table.Table, meta: dict) -> table.Table:
t_meta = table.Table([sub_meta] * n_rows, dtype=("i4", "U36")) t_meta = table.Table([sub_meta] * n_rows, dtype=("i4", "U36"))
# add meta columns to table # add meta columns to table
t.add_columns(t_meta.columns) t.add_columns(t_meta.columns)
return t # convert table to HDU
hdu = table_to_hdu(t)
return hdu
...@@ -15,16 +15,7 @@ import unittest ...@@ -15,16 +15,7 @@ import unittest
from astropy.io import fits from astropy.io import fits
from astropy import table from astropy import table
from csst_common.io import ( from csst_common import io
append_header,
reformat_header,
delete_section,
get_proc_info,
generate_meta,
append_meta,
extract_meta,
add_meta_to_table,
)
class TestFitsHeaderOps(unittest.TestCase): class TestFitsHeaderOps(unittest.TestCase):
...@@ -46,7 +37,7 @@ class TestFitsHeaderOps(unittest.TestCase): ...@@ -46,7 +37,7 @@ class TestFitsHeaderOps(unittest.TestCase):
h2.add_comment("another", before="B") h2.add_comment("another", before="B")
h2.add_comment("=" * 72, before="B") h2.add_comment("=" * 72, before="B")
h_update = append_header(h1, h2, duplicates="update") h_update = io.append_header(h1, h2, duplicates="update")
self.assertEqual( self.assertEqual(
tuple(h_update.keys()), tuple(h_update.keys()),
( (
...@@ -68,7 +59,7 @@ class TestFitsHeaderOps(unittest.TestCase): ...@@ -68,7 +59,7 @@ class TestFitsHeaderOps(unittest.TestCase):
self.assertEqual(h_update["C"], 2) self.assertEqual(h_update["C"], 2)
self.assertEqual(h_update["D"], 2) self.assertEqual(h_update["D"], 2)
h_delete = append_header(h1, h2, duplicates="delete") h_delete = io.append_header(h1, h2, duplicates="delete")
self.assertEqual( self.assertEqual(
tuple(h_delete.keys()), tuple(h_delete.keys()),
( (
...@@ -101,13 +92,13 @@ class TestFitsHeaderOps(unittest.TestCase): ...@@ -101,13 +92,13 @@ class TestFitsHeaderOps(unittest.TestCase):
h.set("SIMPLE", True) h.set("SIMPLE", True)
h.set("NAXIS1", 1) h.set("NAXIS1", 1)
h_rfmt = reformat_header(h, strip=True, comment="WCS info") h_rfmt = io.reformat_header(h, strip=True, comment="WCS info")
self.assertEqual( self.assertEqual(
tuple(h_rfmt.keys()), tuple(h_rfmt.keys()),
("COMMENT", "COMMENT", "COMMENT", "A", "NAXIS1"), ("COMMENT", "COMMENT", "COMMENT", "A", "NAXIS1"),
) )
h_rfmt = reformat_header(h, strip=False, comment="WCS info") h_rfmt = io.reformat_header(h, strip=False, comment="WCS info")
self.assertEqual( self.assertEqual(
tuple(h_rfmt.keys()), tuple(h_rfmt.keys()),
("COMMENT", "COMMENT", "COMMENT", "A", "SIMPLE", "NAXIS1"), ("COMMENT", "COMMENT", "COMMENT", "A", "SIMPLE", "NAXIS1"),
...@@ -127,27 +118,30 @@ class TestFitsHeaderOps(unittest.TestCase): ...@@ -127,27 +118,30 @@ class TestFitsHeaderOps(unittest.TestCase):
h.append(("COMMENT", "=" * 72, ""), bottom=True) h.append(("COMMENT", "=" * 72, ""), bottom=True)
h.append(("COMMENT", "=" * 72, ""), bottom=True) h.append(("COMMENT", "=" * 72, ""), bottom=True)
h_del = delete_section(h, title="WCS") h_del = io.delete_section(h, title="WCS")
self.assertEqual(len(h_del.cards), 7) self.assertEqual(len(h_del.cards), 7)
def test_get_proc_info(self): def test_get_proc_info(self):
proc_info = get_proc_info() proc_info = io.get_proc_info()
self.assertEquals(proc_info["BUILD"], "") self.assertEquals(proc_info["BUILD"], "")
def test_generate_meta(self): def test_generate_meta(self):
h = generate_meta() h = io.generate_meta()
self.assertIsInstance(h, dict) self.assertIsInstance(h, dict)
def test_append_meta(self): def test_append_meta(self):
hdulist = fits.HDUList([fits.PrimaryHDU()]) hdulist = fits.HDUList([fits.PrimaryHDU()])
meta = generate_meta(obs_id="123456") meta = io.generate_meta(obs_id="123456")
hdulist = append_meta(hdulist, meta) hdulist = io.append_meta(hdulist, meta)
self.assertIn("META", set(hdulist[0].header.keys())) self.assertIn("META", set(hdulist[0].header.keys()))
meta = extract_meta(hdulist) meta = io.extract_meta(hdulist)
self.assertEqual(meta["obs_id"], "123456") self.assertEqual(meta["obs_id"], "123456")
def test_add_meta_to_table(self): def test_append_meta_to_table_hdu(self):
t = table.Table([{"a": 1, "b": 2}] * 5) t = table.Table([{"a": 1, "b": 2}] * 5)
meta = generate_meta(obs_id="123456") hdu = io.table_to_hdu(t)
t = add_meta_to_table(t, meta) meta = io.generate_meta(obs_id="123456")
self.assertEqual(set(t.colnames), {"a", "b", "healpix", "data_uuid"}) hdu = io.append_meta_to_table_hdu(hdu, meta)
self.assertEqual(
set(io.hdu_to_table(hdu).colnames), {"a", "b", "healpix", "data_uuid"}
)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment