shinto package
Subpackages
Submodules
shinto.config module
Config File handling.
- exception shinto.config.ConfigError[source]
Bases:
ExceptionGets raised when an unsupported config file type is found.
- class shinto.config.ConfigType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
EnumConfig file types.
- INI = 'INI'
- JSON = 'JSON'
- YAML = 'YAML'
- shinto.config.load_config_file(file_path: str, defaults: dict[str, Any] | None = None, substitute_env_vars: bool = True, raise_on_missing_env: bool = False) dict[str, Any][source]
Load config from file.
- Parameters:
file_path (str) – The path to the config file.
defaults (dict) – If provided, the default values to merge with the config file.
substitute_env_vars (bool) – If True, substitute environment variables in config values.
raise_on_missing_env (bool) – If True, raise error for missing env vars without defaults.
- Returns:
The config data.
- Return type:
dict
- shinto.config.output_config(configdata: dict, output_config_type: ConfigType = ConfigType.YAML, sensitive_keys: list[str] | None = None) str[source]
Dump config dict to a output_config_type (yaml, json, ini).
- Parameters:
configdata (dict) – The config data to dump.
output_config_type (ConfigType) – The output config type.
sensitive_keys (list) – The sensitive keys to mask. Defaults to [“password”, “pass”, “passwd”].
- Returns:
The dumped config data.
- Return type:
str
shinto.jsonschema module
Json schema validation functions.
- class shinto.jsonschema.JsonSchemaRegistry[source]
Bases:
objectClass for validating JSON against JSON schemas.
- check_unresolvable_refs() list[dict[str, str]][source]
Check all schemas in the registry for unresolvable references.
- Raises:
referencing.exceptions.Unresolvable – If a referenced schema is not found.
- get_schema_id(schema_filepath: str) str | None[source]
Get the schema ID for a given schema filepath.
- next_filepath_for_schema_id(schema_id: str) str | None[source]
Get the first filepath for a schema ID.
- register_directory(directory: str, pattern: str = '**/*.json')[source]
Register all schemas in a directory.
- register_schema(schema: str | dict) str[source]
Add a schema to the registry.
When registering a schema from a dict, you can only reference the schema using the $id not the filepath. When registering a schema from a filepath, and the schema does not have a $id, the schema ID is generated from the filepath.
- Parameters:
schema (str | dict) – JSON schema or filepath to schema.
- Returns:
The schema ID.
- Return type:
str
- Raises:
ValueError – If the schema does not have a $id. For filepaths,
KeyError – If a conflicting schema is already registered.
- property schema_mappings: dict[str, str]
Get the schema filepath mappings.
- update_schema_refs_to(convert_to: Literal['id', 'filepath'])[source]
Update schema refs.
Update all schema refs in schemas in the registry if possible. If the reference cannot be updated, it is left unchanged.
- Parameters:
convert_to (Literal["id", "filepath"]) – What to replace the refs by.
- validate_json_against_schemas(data: dict | list, schema_filepaths: list[str] | None = None, schema_ids: list[str] | None = None)[source]
Validate JSON data against JSON schemas.
- Parameters:
data (dict | list) – The JSON data to validate.
schema_filepaths (list[str]) – A list of schema filepaths to validate against.
schema_ids (list[str]) – A list of schema IDs to validate against.
- Raises:
jsonschema.exceptions.ValidationError – The first validation error.
- validate_json_against_schemas_complete(data: dict | list, schema_filepaths: list[str] | None = None, schema_ids: list[str] | None = None) list[shinto.jsonschema.ValidationErrorGroup][source]
Validate JSON data against JSON schemas and return all errors.
- Parameters:
data (dict | list) – The JSON data to validate.
schema_filepaths (list[str]) – A list of schema filepaths to validate against.
schema_ids (list[str]) – A list of schema IDs to validate against.
- Returns:
A list of validation error groups.
- Return type:
list[ValidationErrorGroup]
- class shinto.jsonschema.ValidationErrorGroup(schema_id: str, message: str, errors: list[ValidationError])[source]
Bases:
objectA group of validation errors.
schema_id: The ID of the schema that the errors belong to. message: A message describing the errors. errors: A list of validation errors.
- errors: list[ValidationError]
- message: str
- schema_id: str
- shinto.jsonschema.validate_json_against_schemas(data: dict | list, schema_filenames: list[str])[source]
Validate JSON data against JSON schemas.
- Parameters:
data (dict | list) – The JSON data to validate.
schema_filenames (list[str]) – A list of schema filenames to validate against.
- Raises:
FileNotFoundError – If the schema file is not found.
jsonschema.exceptions.ValidationError – If the data does not validate against the schema.
(Any other jsonschema.exceptions exceptions) –
Deprecated since version 1.1.0: Use JsonSchemaRegistry.validate_json_against_schemas instead
- async shinto.jsonschema.validate_json_against_schemas_async(data: dict | list, schema_filenames: list[str])[source]
Validate JSON data against JSON schemas.
- Parameters:
data (dict | list) – The JSON data to validate.
schema_filenames (list[str]) – A list of schema filenames to validate against.
- Raises:
FileNotFoundError – If the schema file is not found.
jsonschema.exceptions.SchemaError – If the schema is invalid.
jsonschema.exceptions.ValidationError – If the data does not validate against the schema.
Deprecated since version 1.1.0: Use JsonSchemaRegistry.validate_json_against_schemas instead
- shinto.jsonschema.validate_json_against_schemas_complete(data: dict | list, schema_filenames: list[str]) list[ValidationError][source]
Validate JSON data against JSON schemas and return all errors.
- Parameters:
data (dict | list) – The JSON data to validate.
schema_filenames (list[str]) – A list of schema filenames to validate against.
- Returns:
A list of validation errors.
- Return type:
list[jsonschema.exceptions.ValidationError]
- Raises:
FileNotFoundError – If the schema file is not found.
(Any other jsonschema.exceptions exceptions) –
Deprecated since version 1.1.0: Use JsonSchemaRegistry.validate_json_against_schemas_complete instead
shinto.logging module
Logging setup.
- shinto.logging.setup_logging(application_name: str | None = None, loglevel: str | int = 30, log_to_stdout: bool = True, log_to_file: bool = True, log_to_journald: bool = False, log_filename: str | None = None, setup_uvicorn_logging: bool = False)[source]
Set up logging for the application.
If log_to_file is True, log_filename must be provided in order to log to a file.
- Parameters:
application_name (str | None) – The “name” to display in the logs. Defaults to sys.argv[0].
loglevel (str | int) – The log level to use. Defaults to logging.WARNING.
log_to_stdout (bool) – Whether to log to stdout. Defaults to True.
log_to_file (bool) – Whether to log to a file. Defaults to True.
log_to_journald (bool) – Whether to log to journald. Defaults to False.
log_filename (str | None) – The filename to log to. Defaults to None.
setup_uvicorn_logging (bool) – Whether to setup uvicorn logging. Defaults to False.
Example
>>> setup_logging(application_name="myapp", loglevel=logging.INFO)
shinto.metrics module
metrics module.
- class shinto.metrics.PersistantMetrics(metric_file: str = PosixPath('/var/lib/shinto/metrics.json'))[source]
Bases:
objectPersistant metrics class.
- shinto.metrics.inc_persistant_counter(application_name: str, metric: str) int[source]
Push a persistant counter to the Prometheus Pushgateway.
- Parameters:
application_name (str) – The name of the application.
metric (str) – The name of the metric.
- shinto.metrics.init_persistant_metrics(metric_file: str = PosixPath('/var/lib/shinto/metrics.json')) PersistantMetrics[source]
Initialize the persistant metrics.
- shinto.metrics.push_metric(application_name: str, metric: str, value: int, prometheus_collecter_path: str = PosixPath('/var/lib/node_exporter/textfile_collector'))[source]
Push a metric to the Prometheus Pushgateway.
- Parameters:
application_name (str) – The name of the application.
metric (str) – The name of the metric.
value (int) – The value of the metric.
prometheus_collecter_path (str) – The path to where the Prometheus textcollector will read prom files.
shinto.rabbitmq module
RabbitMQ handler module.
- exception shinto.rabbitmq.QueueError[source]
Bases:
ExceptionException while queueing the message.
- class shinto.rabbitmq.QueueHandler(host: str | None = None, port: int | str | None = None, username: str | None = None, password: str | None = None, queue: str | None = None, exchange: str | None = None)[source]
Bases:
objectRabbitMQ handler. Will wait for messages or publish messages.
- consume(callback: Callable[[Channel, Method, BasicProperties, bytes], None])[source]
Consume messages from the queue.
Wait for messages in the RabbitMQ Queue and use callback when one is received The callback function:
def callback(channel, method, properties, body):
- !Callback function MUST ACK or NACK the message after processing
use: ‘channel.basic_ack(delivery_tag=method.delivery_tag)’ to ACK message
shinto.retry_wrapper module
A wrapper to retry a method if it fails.
- shinto.retry_wrapper.retry(max_tries: int | None = None, delay: float = 0.0, exceptions: type[Exception] | tuple[type[Exception], ...] = <class 'Exception'>, backoff: float = 1.0, delay_increment: float = 0.0, max_delay: float | None = None) Callable[[...], Callable[[...], Any] | Awaitable[Any]][source]
Retry a method if it fails.
Retries a method if it raises an exception specified in the exceptions tuple. The method is attempted up to max_tries times with a delay of delay seconds between retries. The delay between retries is increased by a factor of backoff and incremented by delay_increment up to a maximum delay of max_delay.
- Parameters:
max_tries – The maximum number of attempts. Default: None (infinite).
delay – The delay between retries (in seconds). Default: 1.
exceptions – The exception or a tuple of exceptions to catch. Default: Exception.
backoff – Multiplier applied to the delay between retries. Default: 1 (no backoff).
delay_increment – Value to add to the delay between retries. Default: 0.
max_delay – The maximum delay between retries. Default: None (no maximum).
- Returns:
The decorated method.
- Raises:
ValueError – If invalid arguments are provided.
RetryError – If the maximum number of retries is reached.
Example
>>> @retry() ... async def my_method(): ... return "Hello, World!" ... result = await my_method() ... result "Hello, World!"
- shinto.retry_wrapper.retry_call(f: ~typing.Callable[[...], ~typing.Any], fargs: tuple[typing.Any, ...] | None = None, fkwargs: dict[str, typing.Any] | None = None, max_tries: int | None = None, delay: float = 0.0, exceptions: type[Exception] | tuple[type[Exception], ...] = <class 'Exception'>, backoff: float = 1.0, delay_increment: float = 0.0, max_delay: float | None = None) Coroutine[Any, Any, Any] | Any[source]
Retry a method if it fails.
Retries a method if it raises an exception specified in the exceptions tuple. The method is attempted up to max_tries times with a delay of delay seconds between retries. The delay between retries is increased by a factor of backoff and incremented by delay_increment up to a maximum delay of max_delay.
- Parameters:
f – The method to retry.
fargs – The arguments to pass to the method.
fkwargs – The keyword arguments to pass to the method.
max_tries – The maximum number of attempts. Default: None (infinite).
delay – The delay between retries (in seconds). Default: 1.
exceptions – The exception or a tuple of exceptions to catch. Default: Exception.
backoff – Multiplier applied to the delay between retries. Default: 1 (no backoff).
delay_increment – Value to add to the delay between retries. Default: 0.
max_delay – The maximum delay between retries. Default: None (no maximum).
- Returns:
The result of the method.
- Raises:
ValueError – If invalid arguments are provided.
RetryError – If the maximum number of retries is reached.
Example
>>> async def my_method(): ... return "Hello, World!" ... result = await retry_call(my_method) ... result "Hello, World!"
shinto.taxonomy module
Module for handling taxonomies.
- class shinto.taxonomy.Taxonomy(taxonomy_dict: dict, strict: bool = True)[source]
Bases:
objectClass representing a taxonomy.
- fields: list[shinto.taxonomy.TaxonomyField]
- level: Literal['stage_plus'] | None
- validate_data(data: dict)[source]
Validate data against the taxonomy.
- Parameters:
data – Project data dictionary (with potential ‘stages’ array)
- Raises:
TaxonomyComplianceError – If value doesn’t comply
- class shinto.taxonomy.TaxonomyCategoricalValue(value_dict: dict)[source]
Bases:
objectClass representing a value in a taxonomy field.
- description: str | None
- label: str | None
- value: str
- exception shinto.taxonomy.TaxonomyComplianceError[source]
Bases:
ExceptionException raised for data that does not comply with the taxonomy.
- class shinto.taxonomy.TaxonomyField(field_dict: dict, strict: bool = True)[source]
Bases:
objectClass representing a field in a taxonomy.
- description: str | None
- field_id: str
- label: str
- tags: list[str] | None
- type: Literal['number', 'categorical', 'text', 'string', 'multi_categorical', 'date-time', 'date', 'polygon']
- validate(value: Any)[source]
Validate a single field value against its type and constraints.
- Parameters:
value – The value to validate
- Raises:
TaxonomyComplianceError – If value doesn’t comply
- values: list[shinto.taxonomy.TaxonomyCategoricalValue] | None
shinto.transform module
- class shinto.transform.DefaultingDict[source]
Bases:
dictDict that returns empty string for missing keys.
- class shinto.transform.SafeContext[source]
Bases:
dictDict that returns None for missing keys instead of raising KeyError.
- shinto.transform.apply_action(out_row: Dict[str, Any], in_row: Dict[str, Any], action: Dict[str, Any]) None[source]
Apply a single transformation action.
- Actions:
add_field: Add field if it doesn’t exist
set_field: Set field (overwrite if exists)
remove_field: Remove field
rename_field: Rename field
copy_field: Copy field to another name
remap_field: Remap categorical field value using mapping
- shinto.transform.eval_expr(expr: str, ctx: Dict[str, Any]) Any[source]
Evaluate a Python expression using row fields as variables.
Returns None if the expression fails (e.g., missing fields in comparisons).
- shinto.transform.passes_filter(row: Dict[str, Any], flt: Dict[str, Any] | None) bool[source]
Check if row passes filter.
- Filter forms:
{“expr”: “field == ‘value’”} - Python expression
{“all”: [{filter}, {filter}]} - all must pass
{“any”: [{filter}, {filter}]} - at least one must pass
{“not”: {filter}} - negation
{“exists”: {“path”: “field”}} - field exists
{“eq”: [{source}, {source}]} - equality
{“in”: [{source}, {source}]} - membership
- shinto.transform.resolve_source(row: Dict[str, Any], source: Dict[str, Any] | None) Any[source]
Resolve source specification to a value.
- Source forms:
{“const”: 123} - constant value
{“path”: “field_name”} - extract field from row (supports dot notation and glom paths)
{“template”: “Text {field}”} - format template with row fields
{“expr”: “field1 + field2”} - evaluate Python expression
- shinto.transform.transform_data(data: List[Dict[str, Any]], transformation: List[Dict[str, Any]]) List[Dict[str, Any]][source]
Apply transformation pipeline to stage data.
- Parameters:
data – List of stage dictionaries
transformation – Pipeline configuration (list of transformation steps)
- Returns:
Transformed stage data
Module contents
Shinto Library.
- class shinto.QueueHandler(host: str | None = None, port: int | str | None = None, username: str | None = None, password: str | None = None, queue: str | None = None, exchange: str | None = None)[source]
Bases:
objectRabbitMQ handler. Will wait for messages or publish messages.
- consume(callback: Callable[[Channel, Method, BasicProperties, bytes], None])[source]
Consume messages from the queue.
Wait for messages in the RabbitMQ Queue and use callback when one is received The callback function:
def callback(channel, method, properties, body):
- !Callback function MUST ACK or NACK the message after processing
use: ‘channel.basic_ack(delivery_tag=method.delivery_tag)’ to ACK message
- class shinto.Taxonomy(taxonomy_dict: dict, strict: bool = True)[source]
Bases:
objectClass representing a taxonomy.
- fields: list[shinto.taxonomy.TaxonomyField]
- level: Literal['stage_plus'] | None
- validate_data(data: dict)[source]
Validate data against the taxonomy.
- Parameters:
data – Project data dictionary (with potential ‘stages’ array)
- Raises:
TaxonomyComplianceError – If value doesn’t comply
- exception shinto.TaxonomyComplianceError[source]
Bases:
ExceptionException raised for data that does not comply with the taxonomy.
- shinto.inc_persistant_counter(application_name: str, metric: str) int[source]
Push a persistant counter to the Prometheus Pushgateway.
- Parameters:
application_name (str) – The name of the application.
metric (str) – The name of the metric.
- shinto.init_persistant_metrics(metric_file: str = PosixPath('/var/lib/shinto/metrics.json')) PersistantMetrics[source]
Initialize the persistant metrics.
- shinto.load_config_file(file_path: str, defaults: dict[str, Any] | None = None, substitute_env_vars: bool = True, raise_on_missing_env: bool = False) dict[str, Any][source]
Load config from file.
- Parameters:
file_path (str) – The path to the config file.
defaults (dict) – If provided, the default values to merge with the config file.
substitute_env_vars (bool) – If True, substitute environment variables in config values.
raise_on_missing_env (bool) – If True, raise error for missing env vars without defaults.
- Returns:
The config data.
- Return type:
dict
- shinto.output_config(configdata: dict, output_config_type: ConfigType = ConfigType.YAML, sensitive_keys: list[str] | None = None) str[source]
Dump config dict to a output_config_type (yaml, json, ini).
- Parameters:
configdata (dict) – The config data to dump.
output_config_type (ConfigType) – The output config type.
sensitive_keys (list) – The sensitive keys to mask. Defaults to [“password”, “pass”, “passwd”].
- Returns:
The dumped config data.
- Return type:
str
- shinto.push_metric(application_name: str, metric: str, value: int, prometheus_collecter_path: str = PosixPath('/var/lib/node_exporter/textfile_collector'))[source]
Push a metric to the Prometheus Pushgateway.
- Parameters:
application_name (str) – The name of the application.
metric (str) – The name of the metric.
value (int) – The value of the metric.
prometheus_collecter_path (str) – The path to where the Prometheus textcollector will read prom files.
- shinto.retry(max_tries: int | None = None, delay: float = 0.0, exceptions: type[Exception] | tuple[type[Exception], ...] = <class 'Exception'>, backoff: float = 1.0, delay_increment: float = 0.0, max_delay: float | None = None) Callable[[...], Callable[[...], Any] | Awaitable[Any]][source]
Retry a method if it fails.
Retries a method if it raises an exception specified in the exceptions tuple. The method is attempted up to max_tries times with a delay of delay seconds between retries. The delay between retries is increased by a factor of backoff and incremented by delay_increment up to a maximum delay of max_delay.
- Parameters:
max_tries – The maximum number of attempts. Default: None (infinite).
delay – The delay between retries (in seconds). Default: 1.
exceptions – The exception or a tuple of exceptions to catch. Default: Exception.
backoff – Multiplier applied to the delay between retries. Default: 1 (no backoff).
delay_increment – Value to add to the delay between retries. Default: 0.
max_delay – The maximum delay between retries. Default: None (no maximum).
- Returns:
The decorated method.
- Raises:
ValueError – If invalid arguments are provided.
RetryError – If the maximum number of retries is reached.
Example
>>> @retry() ... async def my_method(): ... return "Hello, World!" ... result = await my_method() ... result "Hello, World!"
- shinto.retry_call(f: ~typing.Callable[[...], ~typing.Any], fargs: tuple[typing.Any, ...] | None = None, fkwargs: dict[str, typing.Any] | None = None, max_tries: int | None = None, delay: float = 0.0, exceptions: type[Exception] | tuple[type[Exception], ...] = <class 'Exception'>, backoff: float = 1.0, delay_increment: float = 0.0, max_delay: float | None = None) Coroutine[Any, Any, Any] | Any[source]
Retry a method if it fails.
Retries a method if it raises an exception specified in the exceptions tuple. The method is attempted up to max_tries times with a delay of delay seconds between retries. The delay between retries is increased by a factor of backoff and incremented by delay_increment up to a maximum delay of max_delay.
- Parameters:
f – The method to retry.
fargs – The arguments to pass to the method.
fkwargs – The keyword arguments to pass to the method.
max_tries – The maximum number of attempts. Default: None (infinite).
delay – The delay between retries (in seconds). Default: 1.
exceptions – The exception or a tuple of exceptions to catch. Default: Exception.
backoff – Multiplier applied to the delay between retries. Default: 1 (no backoff).
delay_increment – Value to add to the delay between retries. Default: 0.
max_delay – The maximum delay between retries. Default: None (no maximum).
- Returns:
The result of the method.
- Raises:
ValueError – If invalid arguments are provided.
RetryError – If the maximum number of retries is reached.
Example
>>> async def my_method(): ... return "Hello, World!" ... result = await retry_call(my_method) ... result "Hello, World!"
- shinto.setup_logging(application_name: str | None = None, loglevel: str | int = 30, log_to_stdout: bool = True, log_to_file: bool = True, log_to_journald: bool = False, log_filename: str | None = None, setup_uvicorn_logging: bool = False)[source]
Set up logging for the application.
If log_to_file is True, log_filename must be provided in order to log to a file.
- Parameters:
application_name (str | None) – The “name” to display in the logs. Defaults to sys.argv[0].
loglevel (str | int) – The log level to use. Defaults to logging.WARNING.
log_to_stdout (bool) – Whether to log to stdout. Defaults to True.
log_to_file (bool) – Whether to log to a file. Defaults to True.
log_to_journald (bool) – Whether to log to journald. Defaults to False.
log_filename (str | None) – The filename to log to. Defaults to None.
setup_uvicorn_logging (bool) – Whether to setup uvicorn logging. Defaults to False.
Example
>>> setup_logging(application_name="myapp", loglevel=logging.INFO)
- shinto.transform_data(data: List[Dict[str, Any]], transformation: List[Dict[str, Any]]) List[Dict[str, Any]][source]
Apply transformation pipeline to stage data.
- Parameters:
data – List of stage dictionaries
transformation – Pipeline configuration (list of transformation steps)
- Returns:
Transformed stage data
- shinto.validate_json_against_schemas(data: dict | list, schema_filenames: list[str])[source]
Validate JSON data against JSON schemas.
- Parameters:
data (dict | list) – The JSON data to validate.
schema_filenames (list[str]) – A list of schema filenames to validate against.
- Raises:
FileNotFoundError – If the schema file is not found.
jsonschema.exceptions.ValidationError – If the data does not validate against the schema.
(Any other jsonschema.exceptions exceptions) –
Deprecated since version 1.1.0: Use JsonSchemaRegistry.validate_json_against_schemas instead
- async shinto.validate_json_against_schemas_async(data: dict | list, schema_filenames: list[str])[source]
Validate JSON data against JSON schemas.
- Parameters:
data (dict | list) – The JSON data to validate.
schema_filenames (list[str]) – A list of schema filenames to validate against.
- Raises:
FileNotFoundError – If the schema file is not found.
jsonschema.exceptions.SchemaError – If the schema is invalid.
jsonschema.exceptions.ValidationError – If the data does not validate against the schema.
Deprecated since version 1.1.0: Use JsonSchemaRegistry.validate_json_against_schemas instead
- shinto.validate_json_against_schemas_complete(data: dict | list, schema_filenames: list[str]) list[ValidationError][source]
Validate JSON data against JSON schemas and return all errors.
- Parameters:
data (dict | list) – The JSON data to validate.
schema_filenames (list[str]) – A list of schema filenames to validate against.
- Returns:
A list of validation errors.
- Return type:
list[jsonschema.exceptions.ValidationError]
- Raises:
FileNotFoundError – If the schema file is not found.
(Any other jsonschema.exceptions exceptions) –
Deprecated since version 1.1.0: Use JsonSchemaRegistry.validate_json_against_schemas_complete instead