Source code for shinto.jsonschema

"""Json schema validation functions."""

from __future__ import annotations

import asyncio
import json
import logging
import re
import threading
from pathlib import Path
from typing import TYPE_CHECKING, Literal

import anyio
import jsonschema
from deprecated.sphinx import deprecated
from jsonschema import Draft7Validator, FormatChecker
from referencing import Registry, Resource
from referencing.jsonschema import DRAFT7

if TYPE_CHECKING:  # pragma: no cover
    from jsonschema.exceptions import ValidationError
    from referencing.jsonschema import SchemaRegistry


@deprecated(version="1.1.0", reason="Use JsonSchemaRegistry.validate_json_against_schemas instead")
def validate_json_against_schemas(data: dict | list, schema_filenames: list[str]):
    """
    Check JSON data against every given schema file, stopping at the first failure.

    Each schema file is read from disk and the data is validated against it with
    format checking enabled. Schemas are processed in the order given.

    Args:
        data (dict | list): The JSON data to validate.
        schema_filenames (list[str]): Paths of the schema files to validate against.

    Raises:
        FileNotFoundError: If a schema file does not exist.
        jsonschema.exceptions.ValidationError: If the data does not validate against a schema.
        (Any other jsonschema.exceptions exceptions)

    """
    for filename in schema_filenames:
        resolved_path = Path(filename).resolve()
        if not resolved_path.exists():
            msg = f"Schema file not found: {resolved_path}"
            raise FileNotFoundError(msg)

        with resolved_path.open(encoding="UTF-8") as handle:
            loaded_schema = json.load(handle)

        jsonschema.validate(data, loaded_schema, format_checker=FormatChecker())
@deprecated(
    version="1.1.0", reason="Use JsonSchemaRegistry.validate_json_against_schemas_complete instead"
)
def validate_json_against_schemas_complete(
    data: dict | list, schema_filenames: list[str]
) -> list[ValidationError]:
    """
    Check JSON data against every given schema file and collect all errors.

    Unlike the fail-fast variant, validation continues past the first error so
    the caller receives every validation error from every schema, in schema order.

    Args:
        data (dict | list): The JSON data to validate.
        schema_filenames (list[str]): Paths of the schema files to validate against.

    Returns:
        list[jsonschema.exceptions.ValidationError]: All validation errors found
        (empty when the data is valid against every schema).

    Raises:
        FileNotFoundError: If a schema file does not exist.
        (Any other jsonschema.exceptions exceptions)

    """
    collected: list[ValidationError] = []
    for filename in schema_filenames:
        resolved_path = Path(filename).resolve()
        if not resolved_path.exists():
            msg = f"Schema file not found: {resolved_path}"
            raise FileNotFoundError(msg)

        with resolved_path.open(encoding="UTF-8") as handle:
            loaded_schema = json.load(handle)

        checker = Draft7Validator(loaded_schema, format_checker=FormatChecker())
        collected.extend(checker.iter_errors(data))
    return collected
async def _validate_json_against_schemas_async_task(data: dict | list, schema_filepath: Path):
    """
    Read one schema file asynchronously and validate the data against it.

    Raises whatever ``jsonschema.validate`` raises when the data (or the schema
    itself) is invalid.
    """
    async with await anyio.open_file(schema_filepath, encoding="UTF-8") as handle:
        raw_schema = await handle.read()
        loaded_schema = json.loads(raw_schema)
    jsonschema.validate(data, loaded_schema, format_checker=FormatChecker())
@deprecated(version="1.1.0", reason="Use JsonSchemaRegistry.validate_json_against_schemas instead")
async def validate_json_against_schemas_async(data: dict | list, schema_filenames: list[str]):
    """
    Validate JSON data against JSON schemas concurrently.

    One task is started per schema file. As soon as any task fails, the
    remaining tasks are cancelled and the failure is re-raised. An empty
    ``schema_filenames`` list is a no-op.

    Args:
        data (dict | list): The JSON data to validate.
        schema_filenames (list[str]): A list of schema filenames to validate against.

    Raises:
        FileNotFoundError: If the schema file is not found.
        jsonschema.exceptions.SchemaError: If the schema is invalid.
        jsonschema.exceptions.ValidationError: If the data does not validate against the schema.

    """
    # Resolve and check every path *before* starting any task. Raising from
    # inside the task-creating loop would leave the already created tasks
    # running with nobody awaiting them.
    schema_filepaths: list[Path] = []
    for schema_filename in schema_filenames:
        schema_filepath = Path(schema_filename).resolve()
        if not schema_filepath.exists():
            msg = f"Schema file not found: {schema_filepath}"
            raise FileNotFoundError(msg)
        schema_filepaths.append(schema_filepath)

    # asyncio.wait() raises ValueError when handed an empty collection.
    if not schema_filepaths:
        return

    tasks = [
        asyncio.create_task(_validate_json_against_schemas_async_task(data, schema_filepath))
        for schema_filepath in schema_filepaths
    ]

    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

    for task in pending:
        task.cancel()
    if pending:
        # Give the cancelled tasks a chance to unwind (and close their files)
        # before the error is propagated; their CancelledError is swallowed here.
        await asyncio.gather(*pending, return_exceptions=True)

    for task in done:
        exception = task.exception()
        if exception:
            raise exception
class ValidationErrorGroup:
    """
    Validation errors of one schema that share the same message.

    Attributes:
        schema_id: The ID of the schema that the errors belong to.
        message: A message describing the errors.
        errors: A list of validation errors.

    """

    schema_id: str
    message: str
    errors: list[ValidationError]

    def __init__(self, schema_id: str, message: str, errors: list[ValidationError]):
        """Store the schema ID, the shared message and the grouped errors."""
        self.schema_id, self.message, self.errors = schema_id, message, errors
class JsonSchemaRegistry:
    """
    Class for validating JSON against JSON schemas.

    Schemas are stored in a ``referencing`` registry under a normalized schema
    ID (see ``_clean_schema_id``). Schemas loaded from a file are additionally
    tracked in a filepath -> schema ID mapping, so they can be addressed by
    either. Draft 7 validators are created on demand and cached per schema ID;
    the cache is cleared whenever a schema is added or refs are rewritten.
    """

    _instance: JsonSchemaRegistry | None = None
    _lock = threading.Lock()
    _initialized = False

    _registry: SchemaRegistry
    _schema_mappings: dict[str, str]
    _validator_cache: dict[str, Draft7Validator]

    def __init__(self) -> None:
        """Initialize an empty JsonSchemaRegistry."""
        self._registry = Registry()
        self._schema_mappings = {}
        self._validator_cache = {}

    @property
    def schema_mappings(self) -> dict[str, str]:
        """Get a copy of the schema filepath -> schema ID mappings."""
        return self._schema_mappings.copy()

    def register_directory(self, directory: str, pattern: str = "**/*.json"):
        """
        Register all schemas in a directory.

        Args:
            directory (str): Directory to search; it is resolved to an absolute path.
            pattern (str): Glob pattern selecting the schema files (recursive by default).

        """
        for schema_filepath in Path(directory).resolve().glob(pattern):
            self.register_schema(str(schema_filepath))

    def register_schema(self, schema: str | dict) -> str:
        """
        Add a schema to the registry.

        When registering a schema from a dict, you can only reference the schema
        using the $id not the filepath.
        When registering a schema from a filepath, and the schema does not have a $id,
        the schema ID is generated from the filepath.

        Args:
            schema (str | dict): JSON schema or filepath to schema.

        Returns:
            str: The schema ID.

        Raises:
            ValueError: If the schema does not have a $id (schemas loaded from a
                filepath fall back to an ID derived from that filepath).
            KeyError: If a conflicting schema is already registered.

        """
        if isinstance(schema, str):
            schema_filepath = str(Path(schema).resolve())
            with JsonSchemaRegistry._lock, Path(schema_filepath).open(encoding="UTF-8") as file:
                schema = json.load(file)
                # The stored $id is always the normalized form, whether it came
                # from the file itself or was derived from the filepath.
                schema["$id"] = self._clean_schema_id(schema.get("$id", schema_filepath))
                schema_id = self._register_schema(schema)
                self._schema_mappings[schema_filepath] = schema_id
            return schema_id

        return self._register_schema(schema)

    def _register_schema(self, schema: dict) -> str:
        """
        Store a schema dict in the registry under its normalized $id.

        Registering the same schema again is a no-op. A schema that differs from
        the registered one only in how its refs are spelled (schema ID vs.
        filepath) counts as the same schema.

        Raises:
            ValueError: If the schema has no (or an empty) $id.
            KeyError: If a different schema is already registered under the ID.

        """
        if not schema.get("$id"):
            raise ValueError("Schema must have a $id.")

        schema_id = self._clean_schema_id(schema["$id"])
        if self.schema_id_in_registry(schema_id):
            existing = self.contents(schema_id)
            # Compare with refs normalized to schema IDs so that a schema whose
            # refs were rewritten by update_schema_refs_to() does not conflict
            # with its own original.
            if existing != schema and self._resolve_schema_refs(
                existing, "id"
            ) != self._resolve_schema_refs(schema, "id"):
                raise KeyError(f"Schema '{schema_id}' already registered with different contents.")
        else:
            self._registry = self._registry.with_resource(schema_id, Resource(schema, DRAFT7))
            # Cached validators hold a reference to the previous registry.
            self._validator_cache.clear()

        return schema_id

    def _collect_schema_ids(
        self, schema_filepaths: list[str] | None, schema_ids: list[str] | None
    ) -> list[str]:
        """Return a new list: the given schema IDs followed by the IDs of registered filepaths."""
        # Copy, so the caller's list is never mutated.
        collected = list(schema_ids or [])
        for schema_filepath in schema_filepaths or []:
            schema_id = self.get_schema_id(schema_filepath)
            if schema_id is not None:
                collected.append(schema_id)
        return collected

    def validate_json_against_schemas(
        self,
        data: dict | list,
        schema_filepaths: list[str] | None = None,
        schema_ids: list[str] | None = None,
    ):
        """
        Validate JSON data against JSON schemas.

        Schemas given by ID are checked first, then those given by filepath.
        Filepaths that are not registered are skipped silently.

        Args:
            data (dict | list): The JSON data to validate.
            schema_filepaths (list[str]): A list of schema filepaths to validate against.
            schema_ids (list[str]): A list of schema IDs to validate against.

        Raises:
            KeyError: If a schema ID is not in the registry.
            jsonschema.exceptions.ValidationError: The first validation error.

        """
        for schema_id in self._collect_schema_ids(schema_filepaths, schema_ids):
            if not self.schema_id_in_registry(schema_id):
                raise KeyError(f"Schema '{schema_id}' not found in registry.")
            self._get_validator(schema_id).validate(data)

    def validate_json_against_schemas_complete(
        self,
        data: dict | list,
        schema_filepaths: list[str] | None = None,
        schema_ids: list[str] | None = None,
    ) -> list[ValidationErrorGroup]:
        """
        Validate JSON data against JSON schemas and return all errors.

        Errors are grouped per schema by "<path>: <message>", where every
        non-string path element (e.g. an array index) is rendered as "item".
        Filepaths that are not registered are skipped silently.

        Args:
            data (dict | list): The JSON data to validate.
            schema_filepaths (list[str]): A list of schema filepaths to validate against.
            schema_ids (list[str]): A list of schema IDs to validate against.

        Returns:
            list[ValidationErrorGroup]: A list of validation error groups.

        Raises:
            KeyError: If a schema ID is not in the registry.

        """
        validation_errors: list[ValidationErrorGroup] = []
        for schema_id in self._collect_schema_ids(schema_filepaths, schema_ids):
            if not self.schema_id_in_registry(schema_id):
                raise KeyError(f"Schema '{schema_id}' not found in registry.")

            validator = self._get_validator(schema_id)
            grouped: dict[str, list[ValidationError]] = {}
            for error in validator.iter_errors(data):
                path = ".".join(p if isinstance(p, str) else "item" for p in error.path)
                grouped.setdefault(f"{path}: {error.message}", []).append(error)

            validation_errors.extend(
                ValidationErrorGroup(schema_id, message, group)
                for message, group in grouped.items()
            )

        return validation_errors

    def _get_validator(self, schema_id: str) -> Draft7Validator:
        """Get a validator for a schema, creating it if it doesn't exist in the cache."""
        # Schemas are registered under their normalized ID; normalize here too so
        # the lookup agrees with schema_id_in_registry().
        schema_id = self._clean_schema_id(schema_id)
        if schema_id not in self._validator_cache:
            schema = self._registry.contents(schema_id)
            self._validator_cache[schema_id] = Draft7Validator(
                schema, format_checker=FormatChecker(), registry=self._registry
            )
        return self._validator_cache[schema_id]

    def schema_registered(self, schema_filepath: str) -> bool:
        """Check if a schema filepath is registered."""
        schema_filepath = str(Path(schema_filepath).resolve())
        return schema_filepath in self._schema_mappings

    def schema_id_in_registry(self, schema_id: str) -> bool:
        """Check if a schema ID is in the registry."""
        return self._clean_schema_id(schema_id) in self._registry

    def get_schema(self, schema_id: str) -> dict:
        """Get a schema by its ID."""
        return self.contents(schema_id)

    def contents(self, schema_id: str) -> dict:
        """Get the contents of a schema."""
        return self._registry.contents(self._clean_schema_id(schema_id))

    def next_filepath_for_schema_id(self, schema_id: str) -> str | None:
        """Get the first filepath for a schema ID, or None if it has no filepath."""
        schema_id = self._clean_schema_id(schema_id)
        return next((f for f, m in self._schema_mappings.items() if m == schema_id), None)

    def get_schema_id(self, schema_filepath: str) -> str | None:
        """Get the schema ID for a given schema filepath, or None if it is not registered."""
        schema_filepath = str(Path(schema_filepath).resolve())
        return self._schema_mappings.get(schema_filepath)

    def _clean_schema_id(self, schema_id: str) -> str:
        """
        Clean a schema ID or convert a schema filepath to a schema ID.

        "/" becomes "_", ".json" is dropped, the result is lowercased, every
        character outside [a-z0-9_] is removed, runs of "_" are collapsed and
        leading/trailing "_" are stripped.
        """
        schema_id = re.sub(
            r"[^a-z0-9_]", "", schema_id.replace("/", "_").replace(".json", "").lower()
        )
        return re.sub(r"_+", "_", schema_id).strip("_")

    def update_schema_refs_to(self, convert_to: Literal["id", "filepath"]):
        """
        Update schema refs.

        Update all schema refs in schemas in the registry if possible.
        If the reference cannot be updated, it is left unchanged.

        Args:
            convert_to (Literal["id", "filepath"]): What to replace the refs by.

        """
        self._validator_cache.clear()
        # Iterate over a snapshot: self._registry is rebound on every pass.
        for schema_id in list(self._registry):
            schema = self._registry.contents(schema_id)
            new_schema = self._resolve_schema_refs(schema, convert_to)
            self._registry = self._registry.with_resource(schema_id, Resource(new_schema, DRAFT7))

    def _resolve_schema_refs(self, schema: dict, convert_to: Literal["id", "filepath"]) -> dict:
        """
        Update schema refs to schema IDs or filepaths.

        Returns a new dict; the input is not modified. Dicts (also those directly
        inside lists) are processed recursively. Local refs (starting with "#")
        and non-string "$ref" values are left untouched.
        """
        result = {}
        for key, value in schema.items():
            if isinstance(value, dict):
                result[key] = self._resolve_schema_refs(value, convert_to)
            elif isinstance(value, list):
                result[key] = [
                    self._resolve_schema_refs(item, convert_to) if isinstance(item, dict) else item
                    for item in value
                ]
            elif key == "$ref" and isinstance(value, str) and not value.startswith("#"):
                result[key] = self._resolve_ref(value, convert_to)
            else:
                result[key] = value
        return result

    def _resolve_ref(self, ref: str, convert_to: Literal["id", "filepath"]) -> str:
        """
        Resolve a ref to a schema ID or filepath.

        The part before "#" is looked up first as a registered filepath, then as
        a schema ID; a "#fragment" suffix is carried over unchanged. When the ref
        cannot be converted, a warning is logged and the ref is returned as is.
        """
        ref_identifier, separator, fragment_body = ref.partition("#")
        fragment = f"#{fragment_body}" if separator else ""

        if self.schema_registered(ref_identifier):
            schema_id = self.get_schema_id(ref_identifier)
            filepath = str(Path(ref_identifier).resolve())
        elif self.schema_id_in_registry(ref_identifier):
            schema_id = self._clean_schema_id(ref_identifier)
            filepath = self.next_filepath_for_schema_id(ref_identifier)
        else:
            logging.warning("Referenced schema %s not found in registry.", ref_identifier)
            return ref

        if convert_to == "id":
            return f"{schema_id}{fragment}"
        if convert_to == "filepath":
            if filepath:
                return f"{filepath}{fragment}"
            logging.warning(
                "Referenced schema %s is registered from a JSON schema and does not "
                "have a filepath. Skipping ref update.",
                ref_identifier,
            )
        return ref