shinto package

Subpackages

Submodules

shinto.config module

Config File handling.

exception shinto.config.ConfigError[source]

Bases: Exception

Gets raised when an unsupported config file type is found.

class shinto.config.ConfigType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Config file types.

INI = 'INI'
JSON = 'JSON'
YAML = 'YAML'
shinto.config.load_config_file(file_path: str, defaults: dict[str, Any] | None = None, substitute_env_vars: bool = True, raise_on_missing_env: bool = False) dict[str, Any][source]

Load config from file.

Parameters:
  • file_path (str) – The path to the config file.

  • defaults (dict) – If provided, the default values to merge with the config file.

  • substitute_env_vars (bool) – If True, substitute environment variables in config values.

  • raise_on_missing_env (bool) – If True, raise error for missing env vars without defaults.

Returns:

The config data.

Return type:

dict

shinto.config.output_config(configdata: dict, output_config_type: ConfigType = ConfigType.YAML, sensitive_keys: list[str] | None = None) str[source]

Dump config dict to a output_config_type (yaml, json, ini).

Parameters:
  • configdata (dict) – The config data to dump.

  • output_config_type (ConfigType) – The output config type.

  • sensitive_keys (list) – The sensitive keys to mask. Defaults to [“password”, “pass”, “passwd”].

Returns:

The dumped config data.

Return type:

str

shinto.general module

Provides functions to handle configuration settings for the application.

shinto.general.calculate_crc32_for_file(filename: Path, chunk_size: int = 65536) int[source]

Calculate the CRC32 checksum for a file.

shinto.general.compare_json(json1: dict, json2: dict) bool[source]

Compare two JSON objects for equality, ignoring key order.

shinto.general.get_mimetype_for_file(file_path: str) str[source]

Get MIME type from file path using built-in mimetypes module.

shinto.general.normalize_timestamp(timestamp: datetime | str | None) datetime | None[source]

Normalize a timestamp argument to a timezone-aware datetime object. Accepts a datetime, ISO 8601 string, or None. If no timezone is present, assumes UTC. Returns None if input is None. Raises ValueError for invalid input.

shinto.jsonschema module

Json schema validation functions.

class shinto.jsonschema.JsonSchemaRegistry[source]

Bases: object

Class for validating JSON against JSON schemas.

check_unresolvable_refs()[source]

Check all schemas in the registry for unresolvable references.

Raises:

referencing.exceptions.Unresolvable – If a referenced schema is not found.

contents(schema_id: str) dict[source]

Get the contents of a schema.

get_referenced_schema_ids(schema_id: str) list[str][source]

Get all referenced schema IDs in a schema.

If referenced schemas are registered in the registry, this method recursively traverses them to find all child references.

Parameters:

schema_id (str) – The schema ID to analyse.

Returns:

A list of schema IDs.

Return type:

list[str]

get_references(schema_id: str) list[str][source]

Get all $ref URI references in a schema.

If referenced schemas are registered in the registry, this method recursively traverses them to find all child references.

Parameters:

schema_id (str) – The schema ID to analyse.

Returns:

A list of all $ref URIs found directly or transitively.

Return type:

list[str]

get_schema_ids() list[str][source]

Get all schema IDs in the registry.

normalize_schema_id(schema_id: str) str[source]

Clean a schema ID or convert a schema filepath to a schema ID.

The schema ID is lowercased, slashes and “.json” are replaced with underscores, and schema IDs are stripped of character not in [a-z0-9_].

Parameters:

schema_id (str) – The schema ID or filepath.

Returns:

The normalized schema ID.

Return type:

str

register_directory(directory: str, pattern: str = '**/*.json')[source]

Register all schemas in a directory.

register_schema(schema: str | dict) str[source]

Add a schema to the registry.

When registering a schema from a dict, you can only reference the schema using the $id not the filepath. When registering a schema from a filepath, and the schema does not have a $id, the schema ID is generated from the filepath.

Parameters:

schema (str | dict) – JSON schema or filepath to schema.

Returns:

The schema ID.

Return type:

str

Raises:
  • ValueError – If the schema does not have a $id. For filepaths,

  • KeyError – If a conflicting schema is already registered.

schema_id_in_registry(schema_id: str) bool[source]

Check if a schema ID is in the registry.

validate_json_against_schemas(data: dict | list, schema_ids: list[str])[source]

Validate JSON data against JSON schemas.

Parameters:
  • data (dict | list) – The JSON data to validate.

  • schema_ids (list[str]) – A list of schema IDs to validate against.

Raises:

jsonschema.exceptions.ValidationError – The first validation error.

validate_json_against_schemas_complete(data: dict | list, schema_ids: list[str]) list[shinto.jsonschema.ValidationErrorGroup][source]

Validate JSON data against JSON schemas and return all errors.

Parameters:
  • data (dict | list) – The JSON data to validate.

  • schema_ids (list[str]) – A list of schema IDs to validate against.

Returns:

A list of validation error groups.

Return type:

list[ValidationErrorGroup]

class shinto.jsonschema.ValidationErrorGroup(schema_id: str, message: str, errors: list[ValidationError])[source]

Bases: object

A group of validation errors.

schema_id: The ID of the schema that the errors belong to. message: A message describing the errors. errors: A list of validation errors.

errors: list[ValidationError]
message: str
schema_id: str

shinto.logging module

Logging setup.

shinto.logging.setup_logging(application_name: str | None = None, loglevel: str | int = 30, log_to_stdout: bool = True, log_to_file: bool = True, log_to_journald: bool = False, log_filename: str | None = None, setup_uvicorn_logging: bool = False)[source]

Set up logging for the application.

If log_to_file is True, log_filename must be provided in order to log to a file.

Parameters:
  • application_name (str | None) – The “name” to display in the logs. Defaults to sys.argv[0].

  • loglevel (str | int) – The log level to use. Defaults to logging.WARNING.

  • log_to_stdout (bool) – Whether to log to stdout. Defaults to True.

  • log_to_file (bool) – Whether to log to a file. Defaults to True.

  • log_to_journald (bool) – Whether to log to journald. Defaults to False.

  • log_filename (str | None) – The filename to log to. Defaults to None.

  • setup_uvicorn_logging (bool) – Whether to setup uvicorn logging. Defaults to False.

Example

>>> setup_logging(application_name="myapp", loglevel=logging.INFO)

shinto.metrics module

metrics module.

class shinto.metrics.PersistantMetrics(metric_file: str = PosixPath('/var/lib/shinto/metrics.json'))[source]

Bases: object

Persistant metrics class.

inc_metric(application_name: str, metric: str) int[source]

Increment a metric.

Parameters:
  • application_name (str) – The name of the application.

  • metric (str) – The name of the metric.

push_metric(application_name: str, metric: str, value: int = 0)[source]

Push a metric to the Prometheus Pushgateway.

Parameters:
  • application_name (str) – The name of the application.

  • metric (str) – The name of the metric.

  • value (int) – The value of the metric.

shinto.metrics.inc_persistant_counter(application_name: str, metric: str) int[source]

Push a persistant counter to the Prometheus Pushgateway.

Parameters:
  • application_name (str) – The name of the application.

  • metric (str) – The name of the metric.

shinto.metrics.init_persistant_metrics(metric_file: str = PosixPath('/var/lib/shinto/metrics.json')) PersistantMetrics[source]

Initialize the persistant metrics.

shinto.metrics.push_metric(application_name: str, metric: str, value: int, prometheus_collecter_path: str = PosixPath('/var/lib/node_exporter/textfile_collector'))[source]

Push a metric to the Prometheus Pushgateway.

Parameters:
  • application_name (str) – The name of the application.

  • metric (str) – The name of the metric.

  • value (int) – The value of the metric.

  • prometheus_collecter_path (str) – The path to where the Prometheus textcollector will read prom files.

shinto.rabbitmq module

RabbitMQ handler module.

exception shinto.rabbitmq.QueueError[source]

Bases: Exception

Exception while queueing the message.

class shinto.rabbitmq.QueueHandler(host: str | None = None, port: int | str | None = None, username: str | None = None, password: str | None = None, queue: str | None = None, exchange: str | None = None)[source]

Bases: object

RabbitMQ handler. Will wait for messages or publish messages.

check_for_queue()[source]

Check if queue exists and if not: Declare it.

consume(callback: Callable[[Channel, Method, BasicProperties, bytes], None])[source]

Consume messages from the queue.

Wait for messages in the RabbitMQ Queue and use callback when one is received The callback function:

def callback(channel, method, properties, body):

!Callback function MUST ACK or NACK the message after processing

use: ‘channel.basic_ack(delivery_tag=method.delivery_tag)’ to ACK message

publish(message: str)[source]

Publish message.

shinto.retry_wrapper module

A wrapper to retry a method if it fails.

shinto.retry_wrapper.retry(max_tries: int | None = None, delay: float = 0.0, exceptions: type[Exception] | tuple[type[Exception], ...] = <class 'Exception'>, backoff: float = 1.0, delay_increment: float = 0.0, max_delay: float | None = None) Callable[[...], Callable[[...], Any] | Awaitable[Any]][source]

Retry a method if it fails.

Retries a method if it raises an exception specified in the exceptions tuple. The method is attempted up to max_tries times with a delay of delay seconds between retries. The delay between retries is increased by a factor of backoff and incremented by delay_increment up to a maximum delay of max_delay.

Parameters:
  • max_tries – The maximum number of attempts. Default: None (infinite).

  • delay – The delay between retries (in seconds). Default: 1.

  • exceptions – The exception or a tuple of exceptions to catch. Default: Exception.

  • backoff – Multiplier applied to the delay between retries. Default: 1 (no backoff).

  • delay_increment – Value to add to the delay between retries. Default: 0.

  • max_delay – The maximum delay between retries. Default: None (no maximum).

Returns:

The decorated method.

Raises:
  • ValueError – If invalid arguments are provided.

  • RetryError – If the maximum number of retries is reached.

Example

>>> @retry()
... async def my_method():
...     return "Hello, World!"
... result = await my_method()
... result
"Hello, World!"
shinto.retry_wrapper.retry_call(f: ~typing.Callable[[...], ~typing.Any], fargs: tuple[typing.Any, ...] | None = None, fkwargs: dict[str, typing.Any] | None = None, max_tries: int | None = None, delay: float = 0.0, exceptions: type[Exception] | tuple[type[Exception], ...] = <class 'Exception'>, backoff: float = 1.0, delay_increment: float = 0.0, max_delay: float | None = None) Coroutine[Any, Any, Any] | Any[source]

Retry a method if it fails.

Retries a method if it raises an exception specified in the exceptions tuple. The method is attempted up to max_tries times with a delay of delay seconds between retries. The delay between retries is increased by a factor of backoff and incremented by delay_increment up to a maximum delay of max_delay.

Parameters:
  • f – The method to retry.

  • fargs – The arguments to pass to the method.

  • fkwargs – The keyword arguments to pass to the method.

  • max_tries – The maximum number of attempts. Default: None (infinite).

  • delay – The delay between retries (in seconds). Default: 1.

  • exceptions – The exception or a tuple of exceptions to catch. Default: Exception.

  • backoff – Multiplier applied to the delay between retries. Default: 1 (no backoff).

  • delay_increment – Value to add to the delay between retries. Default: 0.

  • max_delay – The maximum delay between retries. Default: None (no maximum).

Returns:

The result of the method.

Raises:
  • ValueError – If invalid arguments are provided.

  • RetryError – If the maximum number of retries is reached.

Example

>>> async def my_method():
...     return "Hello, World!"
... result = await retry_call(my_method)
... result
"Hello, World!"

shinto.taxonomy module

Module for handling taxonomies.

class shinto.taxonomy.Taxonomy(taxonomy_dict: dict, skip_unknown_types: bool = False)[source]

Bases: object

Class representing a taxonomy.

fields: list[shinto.taxonomy.TaxonomyField]
level: Literal['stage_plus'] | None
try_add_field(field_dict: dict)[source]

Try to add a field to the taxonomy, skipping if the type is unrecognized.

validate(data: dict)[source]

Validate data against the taxonomy.

Parameters:

data – Project data dictionary (with potential ‘stages’ array)

Raises:

TaxonomyComplianceError – If value doesn’t comply

class shinto.taxonomy.TaxonomyCategoricalValue(value_dict: dict)[source]

Bases: object

Class representing a value in a taxonomy field.

description: str | None
label: str | None
value: str
exception shinto.taxonomy.TaxonomyComplianceError[source]

Bases: Exception

Exception raised for data that does not comply with the taxonomy.

class shinto.taxonomy.TaxonomyField(field_dict: dict)[source]

Bases: object

Class representing a field in a taxonomy.

description: str | None
field_id: str
label: str
level: str | None
tags: list[str] | None
type: Literal['number', 'categorical', 'text', 'string', 'multi_categorical', 'date-time', 'date', 'polygon', 'object', 'boolean']
validate(value: Any)[source]

Validate a single field value against its type and constraints.

Parameters:

value – The value to validate

Raises:

TaxonomyComplianceError – If value doesn’t comply

values: list[shinto.taxonomy.TaxonomyCategoricalValue] | None

shinto.transform module

class shinto.transform.DefaultingDict[source]

Bases: dict

Dict that returns empty string for missing keys.

class shinto.transform.SafeContext[source]

Bases: dict

Dict that returns None for missing keys instead of raising KeyError.

shinto.transform.apply_action(out_row: Dict[str, Any], in_row: Dict[str, Any], action: Dict[str, Any]) None[source]

Apply a single transformation action.

Actions:
  • add_field: Add field if it doesn’t exist

  • set_field: Set field (overwrite if exists)

  • remove_field: Remove field

  • rename_field: Rename field

  • copy_field: Copy field to another name

  • remap_field: Remap categorical field value using mapping

shinto.transform.cast_value(value: Any, typ: str | None) Any[source]

Cast value to specified type.

shinto.transform.eval_expr(expr: str, ctx: Dict[str, Any]) Any[source]

Evaluate a Python expression using row fields as variables.

Returns None if the expression fails (e.g., missing fields in comparisons).

shinto.transform.passes_filter(row: Dict[str, Any], flt: Dict[str, Any] | None) bool[source]

Check if row passes filter.

Filter forms:
  • {“expr”: “field == ‘value’”} - Python expression

  • {“all”: [{filter}, {filter}]} - all must pass

  • {“any”: [{filter}, {filter}]} - at least one must pass

  • {“not”: {filter}} - negation

  • {“exists”: {“path”: “field”}} - field exists

  • {“eq”: [{source}, {source}]} - equality

  • {“in”: [{source}, {source}]} - membership

shinto.transform.resolve_source(row: Dict[str, Any], source: Dict[str, Any] | None) Any[source]

Resolve source specification to a value.

Source forms:
  • {“const”: 123} - constant value

  • {“path”: “field_name”} - extract field from row (supports dot notation and glom paths)

  • {“template”: “Text {field}”} - format template with row fields

  • {“expr”: “field1 + field2”} - evaluate Python expression

  • {“remap”: {…}} - remap value using mapping dictionary

  • {“coalesce”: [“field1”, “field2”, “field3”]} - return first non-null, non-empty value

shinto.transform.to_bool(v: Any) bool | None[source]

Convert various values to boolean.

shinto.transform.transform_data(data: List[Dict[str, Any]], transformation: List[Dict[str, Any]]) List[Dict[str, Any]][source]

Apply transformation pipeline to stage data.

Parameters:
  • data – List of stage dictionaries

  • transformation – Pipeline configuration (list of transformation steps)

Returns:

Transformed stage data

Module contents

Shinto Library.

class shinto.JsonSchemaRegistry[source]

Bases: object

Class for validating JSON against JSON schemas.

check_unresolvable_refs()[source]

Check all schemas in the registry for unresolvable references.

Raises:

referencing.exceptions.Unresolvable – If a referenced schema is not found.

contents(schema_id: str) dict[source]

Get the contents of a schema.

get_referenced_schema_ids(schema_id: str) list[str][source]

Get all referenced schema IDs in a schema.

If referenced schemas are registered in the registry, this method recursively traverses them to find all child references.

Parameters:

schema_id (str) – The schema ID to analyse.

Returns:

A list of schema IDs.

Return type:

list[str]

get_references(schema_id: str) list[str][source]

Get all $ref URI references in a schema.

If referenced schemas are registered in the registry, this method recursively traverses them to find all child references.

Parameters:

schema_id (str) – The schema ID to analyse.

Returns:

A list of all $ref URIs found directly or transitively.

Return type:

list[str]

get_schema_ids() list[str][source]

Get all schema IDs in the registry.

normalize_schema_id(schema_id: str) str[source]

Clean a schema ID or convert a schema filepath to a schema ID.

The schema ID is lowercased, slashes and “.json” are replaced with underscores, and schema IDs are stripped of character not in [a-z0-9_].

Parameters:

schema_id (str) – The schema ID or filepath.

Returns:

The normalized schema ID.

Return type:

str

register_directory(directory: str, pattern: str = '**/*.json')[source]

Register all schemas in a directory.

register_schema(schema: str | dict) str[source]

Add a schema to the registry.

When registering a schema from a dict, you can only reference the schema using the $id not the filepath. When registering a schema from a filepath, and the schema does not have a $id, the schema ID is generated from the filepath.

Parameters:

schema (str | dict) – JSON schema or filepath to schema.

Returns:

The schema ID.

Return type:

str

Raises:
  • ValueError – If the schema does not have a $id. For filepaths,

  • KeyError – If a conflicting schema is already registered.

schema_id_in_registry(schema_id: str) bool[source]

Check if a schema ID is in the registry.

validate_json_against_schemas(data: dict | list, schema_ids: list[str])[source]

Validate JSON data against JSON schemas.

Parameters:
  • data (dict | list) – The JSON data to validate.

  • schema_ids (list[str]) – A list of schema IDs to validate against.

Raises:

jsonschema.exceptions.ValidationError – The first validation error.

validate_json_against_schemas_complete(data: dict | list, schema_ids: list[str]) list[shinto.jsonschema.ValidationErrorGroup][source]

Validate JSON data against JSON schemas and return all errors.

Parameters:
  • data (dict | list) – The JSON data to validate.

  • schema_ids (list[str]) – A list of schema IDs to validate against.

Returns:

A list of validation error groups.

Return type:

list[ValidationErrorGroup]

exception shinto.QueueError[source]

Bases: Exception

Exception while queueing the message.

class shinto.QueueHandler(host: str | None = None, port: int | str | None = None, username: str | None = None, password: str | None = None, queue: str | None = None, exchange: str | None = None)[source]

Bases: object

RabbitMQ handler. Will wait for messages or publish messages.

check_for_queue()[source]

Check if queue exists and if not: Declare it.

consume(callback: Callable[[Channel, Method, BasicProperties, bytes], None])[source]

Consume messages from the queue.

Wait for messages in the RabbitMQ Queue and use callback when one is received The callback function:

def callback(channel, method, properties, body):

!Callback function MUST ACK or NACK the message after processing

use: ‘channel.basic_ack(delivery_tag=method.delivery_tag)’ to ACK message

publish(message: str)[source]

Publish message.

class shinto.Taxonomy(taxonomy_dict: dict, skip_unknown_types: bool = False)[source]

Bases: object

Class representing a taxonomy.

fields: list[shinto.taxonomy.TaxonomyField]
level: Literal['stage_plus'] | None
try_add_field(field_dict: dict)[source]

Try to add a field to the taxonomy, skipping if the type is unrecognized.

validate(data: dict)[source]

Validate data against the taxonomy.

Parameters:

data – Project data dictionary (with potential ‘stages’ array)

Raises:

TaxonomyComplianceError – If value doesn’t comply

exception shinto.TaxonomyComplianceError[source]

Bases: Exception

Exception raised for data that does not comply with the taxonomy.

class shinto.ValidationErrorGroup(schema_id: str, message: str, errors: list[ValidationError])[source]

Bases: object

A group of validation errors.

schema_id: The ID of the schema that the errors belong to. message: A message describing the errors. errors: A list of validation errors.

errors: list[ValidationError]
message: str
schema_id: str
shinto.calculate_crc32_for_file(filename: Path, chunk_size: int = 65536) int[source]

Calculate the CRC32 checksum for a file.

shinto.get_mimetype_for_file(file_path: str) str[source]

Get MIME type from file path using built-in mimetypes module.

shinto.inc_persistant_counter(application_name: str, metric: str) int[source]

Push a persistant counter to the Prometheus Pushgateway.

Parameters:
  • application_name (str) – The name of the application.

  • metric (str) – The name of the metric.

shinto.init_persistant_metrics(metric_file: str = PosixPath('/var/lib/shinto/metrics.json')) PersistantMetrics[source]

Initialize the persistant metrics.

shinto.load_config_file(file_path: str, defaults: dict[str, Any] | None = None, substitute_env_vars: bool = True, raise_on_missing_env: bool = False) dict[str, Any][source]

Load config from file.

Parameters:
  • file_path (str) – The path to the config file.

  • defaults (dict) – If provided, the default values to merge with the config file.

  • substitute_env_vars (bool) – If True, substitute environment variables in config values.

  • raise_on_missing_env (bool) – If True, raise error for missing env vars without defaults.

Returns:

The config data.

Return type:

dict

shinto.output_config(configdata: dict, output_config_type: ConfigType = ConfigType.YAML, sensitive_keys: list[str] | None = None) str[source]

Dump config dict to a output_config_type (yaml, json, ini).

Parameters:
  • configdata (dict) – The config data to dump.

  • output_config_type (ConfigType) – The output config type.

  • sensitive_keys (list) – The sensitive keys to mask. Defaults to [“password”, “pass”, “passwd”].

Returns:

The dumped config data.

Return type:

str

shinto.push_metric(application_name: str, metric: str, value: int, prometheus_collecter_path: str = PosixPath('/var/lib/node_exporter/textfile_collector'))[source]

Push a metric to the Prometheus Pushgateway.

Parameters:
  • application_name (str) – The name of the application.

  • metric (str) – The name of the metric.

  • value (int) – The value of the metric.

  • prometheus_collecter_path (str) – The path to where the Prometheus textcollector will read prom files.

shinto.retry(max_tries: int | None = None, delay: float = 0.0, exceptions: type[Exception] | tuple[type[Exception], ...] = <class 'Exception'>, backoff: float = 1.0, delay_increment: float = 0.0, max_delay: float | None = None) Callable[[...], Callable[[...], Any] | Awaitable[Any]][source]

Retry a method if it fails.

Retries a method if it raises an exception specified in the exceptions tuple. The method is attempted up to max_tries times with a delay of delay seconds between retries. The delay between retries is increased by a factor of backoff and incremented by delay_increment up to a maximum delay of max_delay.

Parameters:
  • max_tries – The maximum number of attempts. Default: None (infinite).

  • delay – The delay between retries (in seconds). Default: 1.

  • exceptions – The exception or a tuple of exceptions to catch. Default: Exception.

  • backoff – Multiplier applied to the delay between retries. Default: 1 (no backoff).

  • delay_increment – Value to add to the delay between retries. Default: 0.

  • max_delay – The maximum delay between retries. Default: None (no maximum).

Returns:

The decorated method.

Raises:
  • ValueError – If invalid arguments are provided.

  • RetryError – If the maximum number of retries is reached.

Example

>>> @retry()
... async def my_method():
...     return "Hello, World!"
... result = await my_method()
... result
"Hello, World!"
shinto.retry_call(f: ~typing.Callable[[...], ~typing.Any], fargs: tuple[typing.Any, ...] | None = None, fkwargs: dict[str, typing.Any] | None = None, max_tries: int | None = None, delay: float = 0.0, exceptions: type[Exception] | tuple[type[Exception], ...] = <class 'Exception'>, backoff: float = 1.0, delay_increment: float = 0.0, max_delay: float | None = None) Coroutine[Any, Any, Any] | Any[source]

Retry a method if it fails.

Retries a method if it raises an exception specified in the exceptions tuple. The method is attempted up to max_tries times with a delay of delay seconds between retries. The delay between retries is increased by a factor of backoff and incremented by delay_increment up to a maximum delay of max_delay.

Parameters:
  • f – The method to retry.

  • fargs – The arguments to pass to the method.

  • fkwargs – The keyword arguments to pass to the method.

  • max_tries – The maximum number of attempts. Default: None (infinite).

  • delay – The delay between retries (in seconds). Default: 1.

  • exceptions – The exception or a tuple of exceptions to catch. Default: Exception.

  • backoff – Multiplier applied to the delay between retries. Default: 1 (no backoff).

  • delay_increment – Value to add to the delay between retries. Default: 0.

  • max_delay – The maximum delay between retries. Default: None (no maximum).

Returns:

The result of the method.

Raises:
  • ValueError – If invalid arguments are provided.

  • RetryError – If the maximum number of retries is reached.

Example

>>> async def my_method():
...     return "Hello, World!"
... result = await retry_call(my_method)
... result
"Hello, World!"
shinto.setup_logging(application_name: str | None = None, loglevel: str | int = 30, log_to_stdout: bool = True, log_to_file: bool = True, log_to_journald: bool = False, log_filename: str | None = None, setup_uvicorn_logging: bool = False)[source]

Set up logging for the application.

If log_to_file is True, log_filename must be provided in order to log to a file.

Parameters:
  • application_name (str | None) – The “name” to display in the logs. Defaults to sys.argv[0].

  • loglevel (str | int) – The log level to use. Defaults to logging.WARNING.

  • log_to_stdout (bool) – Whether to log to stdout. Defaults to True.

  • log_to_file (bool) – Whether to log to a file. Defaults to True.

  • log_to_journald (bool) – Whether to log to journald. Defaults to False.

  • log_filename (str | None) – The filename to log to. Defaults to None.

  • setup_uvicorn_logging (bool) – Whether to setup uvicorn logging. Defaults to False.

Example

>>> setup_logging(application_name="myapp", loglevel=logging.INFO)
shinto.transform_data(data: List[Dict[str, Any]], transformation: List[Dict[str, Any]]) List[Dict[str, Any]][source]

Apply transformation pipeline to stage data.

Parameters:
  • data – List of stage dictionaries

  • transformation – Pipeline configuration (list of transformation steps)

Returns:

Transformed stage data