Source code for shinto.mimir.taxonomy

import json
from datetime import datetime
from typing import Union, Optional
from uuid import UUID

from shinto.general import normalize_timestamp
from shinto.pg.connection import Connection

[docs]def get_taxonomy_by_id( connection: Connection, action_by: UUID, taxonomy_id: UUID, timestamp: Optional[Union[datetime, str]] = None ) -> dict: """Get a taxonomy by ID. Accepts timestamp as datetime, ISO 8601 string, or None.""" timestamp = normalize_timestamp(timestamp) result = connection.execute_query( "SELECT to_json(data.get_taxonomy_by_id(%s::uuid, %s::uuid, %s::TIMESTAMPTZ))", (action_by, taxonomy_id, timestamp), ) return result[0][0] if result else {}
[docs]def get_taxonomy_by_name( connection: Connection, action_by: UUID, taxonomy_name: str, timestamp: Optional[Union[datetime, str]] = None ) -> dict: """Get a taxonomy by name. Accepts timestamp as datetime, ISO 8601 string, or None.""" timestamp = normalize_timestamp(timestamp) result = connection.execute_query( "SELECT to_json(data.get_taxonomy_by_name(%s::uuid, %s::text, %s::TIMESTAMPTZ))", (action_by, taxonomy_name, timestamp), ) return result[0][0] if result else {}
[docs]def get_taxonomy_history( connection: Connection, action_by: UUID, taxonomy_id: UUID ) -> list[dict]: """Get the history of a taxonomy.""" result = connection.execute_query( """ SELECT COALESCE(json_agg(row), '[]'::json) FROM data.get_taxonomy_history( %s::uuid, %s::uuid ) AS row """, (action_by, taxonomy_id,) ) return result[0][0] if result else []
[docs]def get_taxonomy_list( connection: Connection, action_by: UUID, timestamp: Optional[Union[datetime, str]] = None ) -> list[dict]: """Get a list of taxonomies. Accepts timestamp as datetime, ISO 8601 string, or None.""" timestamp = normalize_timestamp(timestamp) result = connection.execute_query( """ SELECT COALESCE(json_agg(row), '[]'::json) FROM data.get_taxonomy_list( %s::uuid, %s::TIMESTAMPTZ ) AS row """, (action_by, timestamp,) ) return result[0][0] if result else []
[docs]def create_taxonomy( connection: Connection, action_by: UUID, name: str, data: Optional[dict], action_info: Optional[dict] = None ) -> dict: """Create a taxonomy.""" result = connection.execute_query( "SELECT to_json(data.create_taxonomy(%s::uuid, %s::text, %s::jsonb, %s::jsonb))", ( action_by, name, json.dumps(data) if data else None, json.dumps(action_info) if action_info else None ) ) return result[0][0] if result else {}
[docs]def update_taxonomy( connection: Connection, action_by: UUID, taxonomy_id: UUID, name: Optional[str] = None, data: Optional[dict] = None, action_info: Optional[dict] = None ) -> dict: """Update a taxonomy. Accepts timestamp as datetime, ISO 8601 string, or None.""" result = connection.execute_query( "SELECT to_json(data.update_taxonomy(%s::uuid, %s::uuid, %s::text, %s::jsonb, %s::jsonb))", ( action_by, taxonomy_id, name, json.dumps(data) if data else None, json.dumps(action_info) if action_info else None ) ) return result[0][0] if result else {}
[docs]def delete_taxonomy( connection: Connection, action_by: UUID, taxonomy_id: UUID, action_info: Optional[dict] = None ) -> dict: """Delete a taxonomy. Accepts timestamp as datetime, ISO 8601 string, or None.""" result = connection.execute_query( "SELECT to_json(data.delete_taxonomy(%s::uuid, %s::uuid, %s::jsonb))", ( action_by, taxonomy_id, json.dumps(action_info) if action_info else None ) ) return result[0][0] if result else {}