Source code for shinto.mimir.project

"""Project management functions for Shinto Mimir."""

import json
from datetime import datetime
from typing import Union, Optional
from uuid import UUID

from shinto.general import normalize_timestamp
from shinto.pg.connection import Connection


def get_project_by_id(
    connection: Connection,
    action_by: UUID,
    project_id: UUID,
    timestamp: Optional[Union[datetime, str]] = None,
) -> dict:
    """Fetch a single project by its ID.

    Args:
        connection: Database connection used to run the query.
        action_by: UUID passed as the first argument to ``data.get_project``.
        project_id: UUID of the project to look up.
        timestamp: A datetime, an ISO 8601 string, or None. The value is
            passed through ``normalize_timestamp`` before being sent to
            the database.

    Returns:
        The first column of the first row returned by the query, or an
        empty dict when the query yields no rows.
    """
    as_of = normalize_timestamp(timestamp)
    params = (action_by, project_id, as_of)
    rows = connection.execute_query(
        "SELECT to_json(data.get_project(%s::uuid, %s::uuid, %s::TIMESTAMPTZ))",
        params,
    )
    if not rows:
        return {}
    return rows[0][0]
def get_project_history(
    connection: Connection,
    action_by: UUID,
    project_id: UUID,
) -> list[dict]:
    """Fetch the history records of a project.

    Args:
        connection: Database connection used to run the query.
        action_by: UUID passed as the first argument to
            ``data.get_project_history``.
        project_id: UUID of the project whose history is requested.

    Returns:
        The JSON-aggregated rows of ``data.get_project_history`` (the SQL
        coalesces an empty aggregate to ``[]``), or an empty list when the
        query yields no rows at all.
    """
    params = (action_by, project_id)
    rows = connection.execute_query(
        """
        SELECT COALESCE(json_agg(project), '[]'::json)
        FROM data.get_project_history(%s::uuid, %s::uuid ) AS project
        """,
        params,
    )
    if not rows:
        return []
    return rows[0][0]
def get_project_list(
    connection: Connection,
    action_by: UUID,
    timestamp: Optional[Union[datetime, str]] = None,
) -> list[dict]:
    """Fetch the list of projects.

    Args:
        connection: Database connection used to run the query.
        action_by: UUID passed as the first argument to
            ``data.get_project_list``.
        timestamp: A datetime, an ISO 8601 string, or None. The value is
            passed through ``normalize_timestamp`` before being sent to
            the database.

    Returns:
        The JSON-aggregated rows of ``data.get_project_list`` (the SQL
        coalesces an empty aggregate to ``[]``), or an empty list when the
        query yields no rows at all.
    """
    as_of = normalize_timestamp(timestamp)
    params = (action_by, as_of)
    rows = connection.execute_query(
        """
        SELECT COALESCE(json_agg(project), '[]'::json)
        FROM data.get_project_list(
            %s::uuid, %s::TIMESTAMPTZ
        ) AS project
        """,
        params,
    )
    if not rows:
        return []
    return rows[0][0]
def create_project(
    connection: Connection,
    action_by: UUID,
    data: Optional[dict],
    action_info: Optional[dict] = None,
) -> dict:
    """Create a project.

    Args:
        connection: Database connection used to run the query.
        action_by: UUID passed as the first argument to
            ``data.create_project``.
        data: Project payload. It is JSON-encoded before sending; a falsy
            value (None or an empty dict) is sent as SQL NULL.
        action_info: Optional extra information about the action, encoded
            the same way as ``data``.

    Returns:
        The first column of the first row returned by the query, or an
        empty dict when the query yields no rows.
    """
    # Falsy payloads are deliberately sent as NULL rather than "{}" / "null".
    data_json = json.dumps(data) if data else None
    action_info_json = json.dumps(action_info) if action_info else None

    rows = connection.execute_query(
        "SELECT to_json(data.create_project(%s::uuid, %s::jsonb, null, null, %s::jsonb))",
        (action_by, data_json, action_info_json),
    )
    if not rows:
        return {}
    return rows[0][0]
def force_project_record(
    connection: Connection,
    action_by: UUID,
    action: str,
    project_id: UUID,
    timestamp: Union[datetime, str],
    data: dict,
    action_info: Optional[dict] = None,
    taxonomy_id: Optional[UUID] = None,
    taxonomy_timestamp: Optional[Union[datetime, str]] = None,
) -> dict:
    """Force-write a project record with a specific ID and timestamp.

    The insert and change triggers on ``data.project`` are disabled for
    the duration of the write, the row is upserted directly into the
    table (keyed on ``("id", "timestamp")``), and the triggers are
    re-enabled afterwards, also when the write fails.

    Args:
        connection: Database connection used to run the statements.
        action_by: UUID stored in the ``action_by`` column.
        action: One of ``'created'``, ``'updated'`` or ``'deleted'``.
        project_id: ID of the project record to write.
        timestamp: Timestamp of the record, as a datetime or string; it is
            passed through ``normalize_timestamp``.
        data: Project payload. It is JSON-encoded before sending; a falsy
            value is sent as SQL NULL.
        action_info: Optional extra information about the action. A copy
            with ``"forced": True`` added is stored; the caller's dict is
            left untouched.
        taxonomy_id: Optional taxonomy ID stored with the record.
        taxonomy_timestamp: Optional taxonomy timestamp. It is only kept
            (and normalized) when ``taxonomy_id`` is also given; otherwise
            NULL is stored.

    Returns:
        The written row as returned by ``RETURNING to_json(...)``, or an
        empty dict when the statement yields no rows.

    Raises:
        ValueError: If ``action`` is not one of the allowed values.
    """
    timestamp = normalize_timestamp(timestamp)
    if taxonomy_id and taxonomy_timestamp:
        taxonomy_timestamp = normalize_timestamp(taxonomy_timestamp)
    else:
        taxonomy_timestamp = None

    if action not in ("created", "updated", "deleted"):
        raise ValueError("Action must be one of 'created', 'updated', or 'deleted'.")

    # Build a new dict so the caller's action_info is never mutated.
    action_info = {**(action_info or {}), "forced": True}

    # NOTE(review): if Connection keeps these statements in one transaction
    # that a failing INSERT aborts, the ENABLE commands in the ``finally``
    # clauses may fail as well; the original error then remains attached as
    # the new exception's context. Confirm against Connection's semantics.

    # First disable the trigger that prevents creating a project with a
    # specific ID and timestamp.
    connection.execute_command(
        "ALTER TABLE data.project DISABLE TRIGGER project_insert_trigger"
    )
    try:
        # Second, disable the trigger that prevents updating a project with
        # a specific ID and timestamp.
        connection.execute_command(
            "ALTER TABLE data.project DISABLE TRIGGER project_change_trigger"
        )
        try:
            # Then insert the project with the specified ID and timestamp.
            result = connection.execute_query(
                """
                INSERT INTO data.project (
                    "id", "timestamp", "action", "action_by", "action_info",
                    taxonomy_id, taxonomy_timestamp, data)
                VALUES (
                    %s::uuid, %s::TIMESTAMPTZ, %s::base.record_action, %s::uuid, %s::json,
                    %s::uuid, %s::TIMESTAMPTZ, %s::jsonb
                )
                ON CONFLICT ("id", "timestamp") DO UPDATE
                SET action = EXCLUDED.action,
                    action_by = EXCLUDED.action_by,
                    action_info = EXCLUDED.action_info,
                    taxonomy_id = EXCLUDED.taxonomy_id,
                    taxonomy_timestamp = EXCLUDED.taxonomy_timestamp,
                    data = EXCLUDED.data
                RETURNING to_json(data.project.*)
                """,
                (
                    project_id,
                    timestamp,
                    action,
                    action_by,
                    json.dumps(action_info),
                    taxonomy_id,
                    taxonomy_timestamp,
                    json.dumps(data) if data else None,
                ),
            )
        finally:
            # Always turn the change trigger back on, even if the write failed.
            connection.execute_command(
                "ALTER TABLE data.project ENABLE TRIGGER project_change_trigger"
            )
    finally:
        # Always turn the insert trigger back on, even if a step above failed.
        connection.execute_command(
            "ALTER TABLE data.project ENABLE TRIGGER project_insert_trigger"
        )

    return result[0][0] if result else {}
def update_project(
    connection: Connection,
    action_by: UUID,
    project_id: UUID,
    data: Optional[dict] = None,
    action_info: Optional[dict] = None,
) -> dict:
    """Update a project.

    Args:
        connection: Database connection used to run the query.
        action_by: UUID passed as the first argument to
            ``data.update_project``.
        project_id: UUID of the project to update.
        data: New project payload. It is JSON-encoded before sending; a
            falsy value (None or an empty dict) is sent as SQL NULL.
        action_info: Optional extra information about the action, encoded
            the same way as ``data``.

    Returns:
        The first column of the first row returned by the query, or an
        empty dict when the query yields no rows.
    """
    # Falsy payloads are deliberately sent as NULL rather than "{}" / "null".
    data_json = json.dumps(data) if data else None
    action_info_json = json.dumps(action_info) if action_info else None

    rows = connection.execute_query(
        "SELECT to_json(data.update_project(%s::uuid, %s::uuid, %s::jsonb, null, null, %s::jsonb))",
        (action_by, project_id, data_json, action_info_json),
    )
    if not rows:
        return {}
    return rows[0][0]
def delete_project(
    connection: Connection,
    action_by: UUID,
    project_id: UUID,
    action_info: Optional[dict] = None,
) -> dict:
    """Delete a project.

    Args:
        connection: Database connection used to run the query.
        action_by: UUID passed as the first argument to
            ``data.delete_project``.
        project_id: UUID of the project to delete.
        action_info: Optional extra information about the action. It is
            JSON-encoded before sending; a falsy value is sent as SQL NULL.

    Returns:
        The first column of the first row returned by the query, or an
        empty dict when the query yields no rows.
    """
    action_info_json = json.dumps(action_info) if action_info else None

    rows = connection.execute_query(
        "SELECT to_json(data.delete_project(%s::uuid, %s::uuid, %s::jsonb))",
        (action_by, project_id, action_info_json),
    )
    if not rows:
        return {}
    return rows[0][0]