shinto package

Subpackages

Submodules

shinto.config module

Config File handling.

exception shinto.config.ConfigError[source]

Bases: Exception

Gets raised when an unsupported config file type is found.

class shinto.config.ConfigType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Config file types.

INI = 'INI'
JSON = 'JSON'
YAML = 'YAML'
shinto.config.load_config_file(file_path: str, defaults: dict[str, Any] | None = None, substitute_env_vars: bool = True, raise_on_missing_env: bool = False) dict[str, Any][source]

Load config from file.

Parameters:
  • file_path (str) – The path to the config file.

  • defaults (dict) – If provided, the default values to merge with the config file.

  • substitute_env_vars (bool) – If True, substitute environment variables in config values.

  • raise_on_missing_env (bool) – If True, raise error for missing env vars without defaults.

Returns:

The config data.

Return type:

dict

shinto.config.output_config(configdata: dict, output_config_type: ConfigType = ConfigType.YAML, sensitive_keys: list[str] | None = None) str[source]

Dump config dict to a output_config_type (yaml, json, ini).

Parameters:
  • configdata (dict) – The config data to dump.

  • output_config_type (ConfigType) – The output config type.

  • sensitive_keys (list) – The sensitive keys to mask. Defaults to [“password”, “pass”, “passwd”].

Returns:

The dumped config data.

Return type:

str

shinto.jsonschema module

Json schema validation functions.

class shinto.jsonschema.JsonSchemaRegistry[source]

Bases: object

Class for validating JSON against JSON schemas.

contents(schema_id: str) dict[source]

Get the contents of a schema.

get_schema(schema_id: str) dict[source]

Get a schema by its ID.

get_schema_id(schema_filepath: str) str | None[source]

Get the schema ID for a given schema filepath.

next_filepath_for_schema_id(schema_id: str) str | None[source]

Get the first filepath for a schema ID.

register_directory(directory: str, pattern: str = '**/*.json')[source]

Register all schemas in a directory.

register_schema(schema: str | dict) str[source]

Add a schema to the registry.

When registering a schema from a dict, you can only reference the schema using the $id not the filepath. When registering a schema from a filepath, and the schema does not have a $id, the schema ID is generated from the filepath.

Parameters:

schema (str | dict) – JSON schema or filepath to schema.

Returns:

The schema ID.

Return type:

str

Raises:
  • ValueError – If the schema does not have a $id. For filepaths,

  • KeyError – If a conflicting schema is already registered.

schema_id_in_registry(schema_id: str) bool[source]

Check if a schema ID is in the registry.

property schema_mappings: dict[str, str]

Get the schema filepath mappings.

schema_registered(schema_filepath: str) bool[source]

Check if a schema filepath is registered.

update_schema_refs_to(convert_to: Literal['id', 'filepath'])[source]

Update schema refs.

Update all schema refs in schemas in the registry if possible. If the reference cannot be updated, it is left unchanged.

Parameters:

convert_to (Literal["id", "filepath"]) – What to replace the refs by.

validate_json_against_schemas(data: dict | list, schema_filepaths: list[str] | None = None, schema_ids: list[str] | None = None)[source]

Validate JSON data against JSON schemas.

Parameters:
  • data (dict | list) – The JSON data to validate.

  • schema_filepaths (list[str]) – A list of schema filepaths to validate against.

  • schema_ids (list[str]) – A list of schema IDs to validate against.

Raises:

jsonschema.exceptions.ValidationError – The first validation error.

validate_json_against_schemas_complete(data: dict | list, schema_filepaths: list[str] | None = None, schema_ids: list[str] | None = None) list[shinto.jsonschema.ValidationErrorGroup][source]

Validate JSON data against JSON schemas and return all errors.

Parameters:
  • data (dict | list) – The JSON data to validate.

  • schema_filepaths (list[str]) – A list of schema filepaths to validate against.

  • schema_ids (list[str]) – A list of schema IDs to validate against.

Returns:

A list of validation error groups.

Return type:

list[ValidationErrorGroup]

class shinto.jsonschema.ValidationErrorGroup(schema_id: str, message: str, errors: list[ValidationError])[source]

Bases: object

A group of validation errors.

schema_id: The ID of the schema that the errors belong to. message: A message describing the errors. errors: A list of validation errors.

errors: list[ValidationError]
message: str
schema_id: str
shinto.jsonschema.validate_json_against_schemas(data: dict | list, schema_filenames: list[str])[source]

Validate JSON data against JSON schemas.

Parameters:
  • data (dict | list) – The JSON data to validate.

  • schema_filenames (list[str]) – A list of schema filenames to validate against.

Raises:
  • FileNotFoundError – If the schema file is not found.

  • jsonschema.exceptions.ValidationError – If the data does not validate against the schema.

  • (Any other jsonschema.exceptions exceptions)

Deprecated since version 1.1.0: Use JsonSchemaRegistry.validate_json_against_schemas instead

async shinto.jsonschema.validate_json_against_schemas_async(data: dict | list, schema_filenames: list[str])[source]

Validate JSON data against JSON schemas.

Parameters:
  • data (dict | list) – The JSON data to validate.

  • schema_filenames (list[str]) – A list of schema filenames to validate against.

Raises:
  • FileNotFoundError – If the schema file is not found.

  • jsonschema.exceptions.SchemaError – If the schema is invalid.

  • jsonschema.exceptions.ValidationError – If the data does not validate against the schema.

Deprecated since version 1.1.0: Use JsonSchemaRegistry.validate_json_against_schemas instead

shinto.jsonschema.validate_json_against_schemas_complete(data: dict | list, schema_filenames: list[str]) list[ValidationError][source]

Validate JSON data against JSON schemas and return all errors.

Parameters:
  • data (dict | list) – The JSON data to validate.

  • schema_filenames (list[str]) – A list of schema filenames to validate against.

Returns:

A list of validation errors.

Return type:

list[jsonschema.exceptions.ValidationError]

Raises:
  • FileNotFoundError – If the schema file is not found.

  • (Any other jsonschema.exceptions exceptions)

Deprecated since version 1.1.0: Use JsonSchemaRegistry.validate_json_against_schemas_complete instead

shinto.logging module

Logging setup.

shinto.logging.setup_logging(application_name: str | None = None, loglevel: str | int = 30, log_to_stdout: bool = True, log_to_file: bool = True, log_to_journald: bool = False, log_filename: str | None = None, setup_uvicorn_logging: bool = False)[source]

Set up logging for the application.

If log_to_file is True, log_filename must be provided in order to log to a file.

Parameters:
  • application_name (str | None) – The “name” to display in the logs. Defaults to sys.argv[0].

  • loglevel (str | int) – The log level to use. Defaults to logging.WARNING.

  • log_to_stdout (bool) – Whether to log to stdout. Defaults to True.

  • log_to_file (bool) – Whether to log to a file. Defaults to True.

  • log_to_journald (bool) – Whether to log to journald. Defaults to False.

  • log_filename (str | None) – The filename to log to. Defaults to None.

  • setup_uvicorn_logging (bool) – Whether to setup uvicorn logging. Defaults to False.

Example

>>> setup_logging(application_name="myapp", loglevel=logging.INFO)

shinto.metrics module

metrics module.

class shinto.metrics.PersistantMetrics(metric_file: str = PosixPath('/var/lib/shinto/metrics.json'))[source]

Bases: object

Persistant metrics class.

inc_metric(application_name: str, metric: str) int[source]

Increment a metric.

Parameters:
  • application_name (str) – The name of the application.

  • metric (str) – The name of the metric.

push_metric(application_name: str, metric: str, value: int = 0)[source]

Push a metric to the Prometheus Pushgateway.

Parameters:
  • application_name (str) – The name of the application.

  • metric (str) – The name of the metric.

  • value (int) – The value of the metric.

shinto.metrics.inc_persistant_counter(application_name: str, metric: str) int[source]

Push a persistant counter to the Prometheus Pushgateway.

Parameters:
  • application_name (str) – The name of the application.

  • metric (str) – The name of the metric.

shinto.metrics.init_persistant_metrics(metric_file: str = PosixPath('/var/lib/shinto/metrics.json')) PersistantMetrics[source]

Initialize the persistant metrics.

shinto.metrics.push_metric(application_name: str, metric: str, value: int, prometheus_collecter_path: str = PosixPath('/var/lib/node_exporter/textfile_collector'))[source]

Push a metric to the Prometheus Pushgateway.

Parameters:
  • application_name (str) – The name of the application.

  • metric (str) – The name of the metric.

  • value (int) – The value of the metric.

  • prometheus_collecter_path (str) – The path to where the Prometheus textcollector will read prom files.

shinto.rabbitmq module

RabbitMQ handler module.

exception shinto.rabbitmq.QueueError[source]

Bases: Exception

Exception while queueing the message.

class shinto.rabbitmq.QueueHandler(host: str | None = None, port: int | str | None = None, username: str | None = None, password: str | None = None, queue: str | None = None, exchange: str | None = None)[source]

Bases: object

RabbitMQ handler. Will wait for messages or publish messages.

check_for_queue()[source]

Check if queue exists and if not: Declare it.

consume(callback: Callable[[Channel, Method, BasicProperties, bytes], None])[source]

Consume messages from the queue.

Wait for messages in the RabbitMQ Queue and use callback when one is received The callback function:

def callback(channel, method, properties, body):

!Callback function MUST ACK or NACK the message after processing

use: ‘channel.basic_ack(delivery_tag=method.delivery_tag)’ to ACK message

publish(message: str)[source]

Publish message.

shinto.retry_wrapper module

A wrapper to retry a method if it fails.

shinto.retry_wrapper.retry(max_tries: int | None = None, delay: float = 0.0, exceptions: type[Exception] | tuple[type[Exception], ...] = <class 'Exception'>, backoff: float = 1.0, delay_increment: float = 0.0, max_delay: float | None = None) Callable[[...], Callable[[...], Any] | Awaitable[Any]][source]

Retry a method if it fails.

Retries a method if it raises an exception specified in the exceptions tuple. The method is attempted up to max_tries times with a delay of delay seconds between retries. The delay between retries is increased by a factor of backoff and incremented by delay_increment up to a maximum delay of max_delay.

Parameters:
  • max_tries – The maximum number of attempts. Default: None (infinite).

  • delay – The delay between retries (in seconds). Default: 1.

  • exceptions – The exception or a tuple of exceptions to catch. Default: Exception.

  • backoff – Multiplier applied to the delay between retries. Default: 1 (no backoff).

  • delay_increment – Value to add to the delay between retries. Default: 0.

  • max_delay – The maximum delay between retries. Default: None (no maximum).

Returns:

The decorated method.

Raises:
  • ValueError – If invalid arguments are provided.

  • RetryError – If the maximum number of retries is reached.

Example

>>> @retry()
... async def my_method():
...     return "Hello, World!"
... result = await my_method()
... result
"Hello, World!"
shinto.retry_wrapper.retry_call(f: ~typing.Callable[[...], ~typing.Any], fargs: tuple[typing.Any, ...] | None = None, fkwargs: dict[str, typing.Any] | None = None, max_tries: int | None = None, delay: float = 0.0, exceptions: type[Exception] | tuple[type[Exception], ...] = <class 'Exception'>, backoff: float = 1.0, delay_increment: float = 0.0, max_delay: float | None = None) Coroutine[Any, Any, Any] | Any[source]

Retry a method if it fails.

Retries a method if it raises an exception specified in the exceptions tuple. The method is attempted up to max_tries times with a delay of delay seconds between retries. The delay between retries is increased by a factor of backoff and incremented by delay_increment up to a maximum delay of max_delay.

Parameters:
  • f – The method to retry.

  • fargs – The arguments to pass to the method.

  • fkwargs – The keyword arguments to pass to the method.

  • max_tries – The maximum number of attempts. Default: None (infinite).

  • delay – The delay between retries (in seconds). Default: 1.

  • exceptions – The exception or a tuple of exceptions to catch. Default: Exception.

  • backoff – Multiplier applied to the delay between retries. Default: 1 (no backoff).

  • delay_increment – Value to add to the delay between retries. Default: 0.

  • max_delay – The maximum delay between retries. Default: None (no maximum).

Returns:

The result of the method.

Raises:
  • ValueError – If invalid arguments are provided.

  • RetryError – If the maximum number of retries is reached.

Example

>>> async def my_method():
...     return "Hello, World!"
... result = await retry_call(my_method)
... result
"Hello, World!"

Module contents

Shinto Library.

exception shinto.QueueError[source]

Bases: Exception

Exception while queueing the message.

class shinto.QueueHandler(host: str | None = None, port: int | str | None = None, username: str | None = None, password: str | None = None, queue: str | None = None, exchange: str | None = None)[source]

Bases: object

RabbitMQ handler. Will wait for messages or publish messages.

check_for_queue()[source]

Check if queue exists and if not: Declare it.

consume(callback: Callable[[Channel, Method, BasicProperties, bytes], None])[source]

Consume messages from the queue.

Wait for messages in the RabbitMQ Queue and use callback when one is received The callback function:

def callback(channel, method, properties, body):

!Callback function MUST ACK or NACK the message after processing

use: ‘channel.basic_ack(delivery_tag=method.delivery_tag)’ to ACK message

publish(message: str)[source]

Publish message.

shinto.inc_persistant_counter(application_name: str, metric: str) int[source]

Push a persistant counter to the Prometheus Pushgateway.

Parameters:
  • application_name (str) – The name of the application.

  • metric (str) – The name of the metric.

shinto.init_persistant_metrics(metric_file: str = PosixPath('/var/lib/shinto/metrics.json')) PersistantMetrics[source]

Initialize the persistant metrics.

shinto.load_config_file(file_path: str, defaults: dict[str, Any] | None = None, substitute_env_vars: bool = True, raise_on_missing_env: bool = False) dict[str, Any][source]

Load config from file.

Parameters:
  • file_path (str) – The path to the config file.

  • defaults (dict) – If provided, the default values to merge with the config file.

  • substitute_env_vars (bool) – If True, substitute environment variables in config values.

  • raise_on_missing_env (bool) – If True, raise error for missing env vars without defaults.

Returns:

The config data.

Return type:

dict

shinto.output_config(configdata: dict, output_config_type: ConfigType = ConfigType.YAML, sensitive_keys: list[str] | None = None) str[source]

Dump config dict to a output_config_type (yaml, json, ini).

Parameters:
  • configdata (dict) – The config data to dump.

  • output_config_type (ConfigType) – The output config type.

  • sensitive_keys (list) – The sensitive keys to mask. Defaults to [“password”, “pass”, “passwd”].

Returns:

The dumped config data.

Return type:

str

shinto.push_metric(application_name: str, metric: str, value: int, prometheus_collecter_path: str = PosixPath('/var/lib/node_exporter/textfile_collector'))[source]

Push a metric to the Prometheus Pushgateway.

Parameters:
  • application_name (str) – The name of the application.

  • metric (str) – The name of the metric.

  • value (int) – The value of the metric.

  • prometheus_collecter_path (str) – The path to where the Prometheus textcollector will read prom files.

shinto.retry(max_tries: int | None = None, delay: float = 0.0, exceptions: type[Exception] | tuple[type[Exception], ...] = <class 'Exception'>, backoff: float = 1.0, delay_increment: float = 0.0, max_delay: float | None = None) Callable[[...], Callable[[...], Any] | Awaitable[Any]][source]

Retry a method if it fails.

Retries a method if it raises an exception specified in the exceptions tuple. The method is attempted up to max_tries times with a delay of delay seconds between retries. The delay between retries is increased by a factor of backoff and incremented by delay_increment up to a maximum delay of max_delay.

Parameters:
  • max_tries – The maximum number of attempts. Default: None (infinite).

  • delay – The delay between retries (in seconds). Default: 1.

  • exceptions – The exception or a tuple of exceptions to catch. Default: Exception.

  • backoff – Multiplier applied to the delay between retries. Default: 1 (no backoff).

  • delay_increment – Value to add to the delay between retries. Default: 0.

  • max_delay – The maximum delay between retries. Default: None (no maximum).

Returns:

The decorated method.

Raises:
  • ValueError – If invalid arguments are provided.

  • RetryError – If the maximum number of retries is reached.

Example

>>> @retry()
... async def my_method():
...     return "Hello, World!"
... result = await my_method()
... result
"Hello, World!"
shinto.retry_call(f: ~typing.Callable[[...], ~typing.Any], fargs: tuple[typing.Any, ...] | None = None, fkwargs: dict[str, typing.Any] | None = None, max_tries: int | None = None, delay: float = 0.0, exceptions: type[Exception] | tuple[type[Exception], ...] = <class 'Exception'>, backoff: float = 1.0, delay_increment: float = 0.0, max_delay: float | None = None) Coroutine[Any, Any, Any] | Any[source]

Retry a method if it fails.

Retries a method if it raises an exception specified in the exceptions tuple. The method is attempted up to max_tries times with a delay of delay seconds between retries. The delay between retries is increased by a factor of backoff and incremented by delay_increment up to a maximum delay of max_delay.

Parameters:
  • f – The method to retry.

  • fargs – The arguments to pass to the method.

  • fkwargs – The keyword arguments to pass to the method.

  • max_tries – The maximum number of attempts. Default: None (infinite).

  • delay – The delay between retries (in seconds). Default: 1.

  • exceptions – The exception or a tuple of exceptions to catch. Default: Exception.

  • backoff – Multiplier applied to the delay between retries. Default: 1 (no backoff).

  • delay_increment – Value to add to the delay between retries. Default: 0.

  • max_delay – The maximum delay between retries. Default: None (no maximum).

Returns:

The result of the method.

Raises:
  • ValueError – If invalid arguments are provided.

  • RetryError – If the maximum number of retries is reached.

Example

>>> async def my_method():
...     return "Hello, World!"
... result = await retry_call(my_method)
... result
"Hello, World!"
shinto.setup_logging(application_name: str | None = None, loglevel: str | int = 30, log_to_stdout: bool = True, log_to_file: bool = True, log_to_journald: bool = False, log_filename: str | None = None, setup_uvicorn_logging: bool = False)[source]

Set up logging for the application.

If log_to_file is True, log_filename must be provided in order to log to a file.

Parameters:
  • application_name (str | None) – The “name” to display in the logs. Defaults to sys.argv[0].

  • loglevel (str | int) – The log level to use. Defaults to logging.WARNING.

  • log_to_stdout (bool) – Whether to log to stdout. Defaults to True.

  • log_to_file (bool) – Whether to log to a file. Defaults to True.

  • log_to_journald (bool) – Whether to log to journald. Defaults to False.

  • log_filename (str | None) – The filename to log to. Defaults to None.

  • setup_uvicorn_logging (bool) – Whether to setup uvicorn logging. Defaults to False.

Example

>>> setup_logging(application_name="myapp", loglevel=logging.INFO)
shinto.validate_json_against_schemas(data: dict | list, schema_filenames: list[str])[source]

Validate JSON data against JSON schemas.

Parameters:
  • data (dict | list) – The JSON data to validate.

  • schema_filenames (list[str]) – A list of schema filenames to validate against.

Raises:
  • FileNotFoundError – If the schema file is not found.

  • jsonschema.exceptions.ValidationError – If the data does not validate against the schema.

  • (Any other jsonschema.exceptions exceptions)

Deprecated since version 1.1.0: Use JsonSchemaRegistry.validate_json_against_schemas instead

async shinto.validate_json_against_schemas_async(data: dict | list, schema_filenames: list[str])[source]

Validate JSON data against JSON schemas.

Parameters:
  • data (dict | list) – The JSON data to validate.

  • schema_filenames (list[str]) – A list of schema filenames to validate against.

Raises:
  • FileNotFoundError – If the schema file is not found.

  • jsonschema.exceptions.SchemaError – If the schema is invalid.

  • jsonschema.exceptions.ValidationError – If the data does not validate against the schema.

Deprecated since version 1.1.0: Use JsonSchemaRegistry.validate_json_against_schemas instead

shinto.validate_json_against_schemas_complete(data: dict | list, schema_filenames: list[str]) list[ValidationError][source]

Validate JSON data against JSON schemas and return all errors.

Parameters:
  • data (dict | list) – The JSON data to validate.

  • schema_filenames (list[str]) – A list of schema filenames to validate against.

Returns:

A list of validation errors.

Return type:

list[jsonschema.exceptions.ValidationError]

Raises:
  • FileNotFoundError – If the schema file is not found.

  • (Any other jsonschema.exceptions exceptions)

Deprecated since version 1.1.0: Use JsonSchemaRegistry.validate_json_against_schemas_complete instead