shinto.pg package

Submodules

shinto.pg.connection module

Connection module, a wrapper around psycopg Connection and AsyncConnection.

class shinto.pg.connection.AsyncConnection(pgconn: PGconn, row_factory: AsyncRowFactory[Row] = <function tuple_row>)[source]

Bases: AsyncConnection

Wrapper for an async connection to the database.

async execute_command(query: str, params: None | dict[str, Any] = None) → int[source]

Asynchronously execute a command (INSERT, UPDATE, DELETE) that doesn’t return data.

Parameters:
  • query (str) – The command to execute.

  • params (dict) – The query parameters to format the command.

Returns:

The number of rows affected.

Return type:

int

Raises:

psycopg.Error – If the command execution fails.

Example

>>> await conn.execute_command("DELETE FROM mytable WHERE id = %(id)s", {"id": 1})
1

async execute_query(query: str, params: None | dict[str, Any] = None) → list[tuple][source]

Asynchronously execute a query or command against the database.

Parameters:
  • query (str) – The query to execute.

  • params (dict) – The query parameters to format the query.

Returns:

The result of the query.

Return type:

list[tuple]

Raises:

psycopg.Error – If the query execution fails.

Example

>>> await conn.execute_query("SELECT * FROM mytable WHERE id = %(id)s", {"id": 1})
[(1, 'name')]

async write_records(query: str, records: list[tuple]) → int[source]

Write data records to the database asynchronously.

Parameters:
  • query (str) – The query to execute.

  • records (list[tuple]) – The records to write.

Returns:

The number of records written.

Return type:

int

Raises:

psycopg.Error – If the query execution fails.
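
For large inputs it can help to split the records into batches before calling write_records. The following is a minimal sketch and not part of shinto: batched, write_in_batches and batch_size are hypothetical names, and conn is assumed to be any object with an awaitable write_records(query, records) that returns the number of records written, as documented above.

```python
from collections.abc import Iterator


def batched(records: list[tuple], batch_size: int) -> Iterator[list[tuple]]:
    """Yield consecutive slices of at most batch_size records."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(records), batch_size):
        yield records[start:start + batch_size]


async def write_in_batches(
    conn, query: str, records: list[tuple], batch_size: int = 1000
) -> int:
    """Write records batch by batch and return the total count reported by conn."""
    total = 0
    for batch in batched(records, batch_size):
        # Each call is awaited before the next starts, so batches are written in order.
        total += await conn.write_records(query, batch)
    return total
```

Inside an async with pool.connection() as conn: block, this would be called as await write_in_batches(conn, query, records).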

class shinto.pg.connection.Connection(pgconn: PGconn, row_factory: RowFactory[Row] = <function tuple_row>)[source]

Bases: Connection

Wrapper for a connection to the database.

execute_command(query: str, params: None | dict[str, Any] = None) → int[source]

Execute a command (INSERT, UPDATE, DELETE) that doesn’t return data.

Parameters:
  • query (str) – The command to execute.

  • params (dict) – The query parameters to format the command.

Returns:

The number of rows affected.

Return type:

int

Raises:

psycopg.Error – If the command execution fails.

Example

>>> conn.execute_command("DELETE FROM mytable WHERE id = %(id)s", {"id": 1})
1

execute_query(query: str, params: None | dict[str, Any] = None) → list[tuple][source]

Execute a query or command against the database.

Parameters:
  • query (str) – The query to execute.

  • params (dict) – The query parameters to format the query.

Returns:

The result of the query.

Return type:

list[tuple]

Raises:

psycopg.Error – If the query execution fails.

Example

>>> conn.execute_query("SELECT * FROM mytable WHERE id = %(id)s", {"id": 1})
[(1, 'name')]

write_records(query: str, records: list[tuple]) → int[source]

Write data records to the database.

Parameters:
  • query (str) – The query to execute.

  • records (list[tuple]) – The records to write.

Returns:

The number of records written.

Return type:

int

Raises:

psycopg.Error – If the query execution fails.
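
write_records takes its data as a list of tuples, so rows held as dictionaries have to be flattened in a fixed column order first. A small sketch of that conversion follows; rows_to_records and the column names are illustrative and not part of shinto.

```python
from typing import Any


def rows_to_records(rows: list[dict[str, Any]], columns: list[str]) -> list[tuple]:
    """Convert dict rows to tuples ordered by columns; a missing key becomes None."""
    return [tuple(row.get(column) for column in columns) for row in rows]


rows = [
    {"id": 1, "name": "alpha"},
    {"name": "beta", "id": 2},  # key order inside each dict does not matter
]
records = rows_to_records(rows, ["id", "name"])
print(records)  # [(1, 'alpha'), (2, 'beta')]
```

The resulting list can then be passed as the records argument, together with a query whose placeholders follow the same column order.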

shinto.pg.pool module

Connection pool module, a wrapper around psycopg_pool ConnectionPool and AsyncConnectionPool.

class shinto.pg.pool.AsyncConnectionPool(conninfo: str = '', *, connection_class: Type[ACT] = <class 'psycopg.AsyncConnection'>, kwargs: Dict[str, Any] | None = None, min_size: int = 4, max_size: int | None = None, open: bool | None = None, configure: Callable[[ACT], Awaitable[None]] | None = None, check: Callable[[ACT], Awaitable[None]] | None = None, reset: Callable[[ACT], Awaitable[None]] | None = None, name: str | None = None, timeout: float = 30.0, max_waiting: int = 0, max_lifetime: float = 3600.0, max_idle: float = 600.0, reconnect_timeout: float = 300.0, reconnect_failed: Callable[[AsyncConnectionPool[Any]], None] | Callable[[AsyncConnectionPool[Any]], Awaitable[None]] | None = None, num_workers: int = 3)[source]

Bases: AsyncConnectionPool

AsyncConnectionPool class.

Example

>>> pool = shinto.pg.AsyncConnectionPool(
...     min_size=1,
...     max_size=10,
...     kwargs={
...         "host": "localhost",
...         "port": 6432,
...         "dbname": "mydb",
...         "user": "myuser",
...         "password": "mypass",
...     },
... )
>>> async with pool.connection() as conn:
...     await conn.execute_query("SELECT * FROM mytable")

connection(timeout: float | None = None) → AsyncGenerator[AsyncConnection, None, None][source]

Context manager to obtain an async connection from the pool.

Yields a custom AsyncConnection object that extends psycopg.AsyncConnection.

Parameters:

timeout (float | None) – The maximum time to wait for a connection.

Yields:

AsyncConnection – A connection to the database.
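
Because each async with pool.connection() block checks out its own connection, independent queries can run concurrently. The sketch below relies only on what is documented above (pool.connection() as an async context manager, and conn.execute_query); run_query and run_concurrently are hypothetical helper names.

```python
import asyncio
from typing import Any, Optional


async def run_query(
    pool, query: str, params: Optional[dict[str, Any]] = None
) -> list[tuple]:
    """Borrow one connection from the pool, run a single query, return its rows."""
    async with pool.connection() as conn:
        return await conn.execute_query(query, params)


async def run_concurrently(pool, queries: list[str]) -> list[list[tuple]]:
    """Run the queries concurrently; results come back in the order of queries."""
    return await asyncio.gather(*(run_query(pool, query) for query in queries))
```

The number of queries actually in flight is bounded by the pool size; callers beyond that wait for a free connection, up to the timeout.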

class shinto.pg.pool.ConnectionPool(conninfo: str = '', *, connection_class: Type[CT] = <class 'psycopg.Connection'>, kwargs: Dict[str, Any] | None = None, min_size: int = 4, max_size: int | None = None, open: bool | None = None, configure: Callable[[CT], None] | None = None, check: Callable[[CT], None] | None = None, reset: Callable[[CT], None] | None = None, name: str | None = None, timeout: float = 30.0, max_waiting: int = 0, max_lifetime: float = 3600.0, max_idle: float = 600.0, reconnect_timeout: float = 300.0, reconnect_failed: Callable[[ConnectionPool[Any]], None] | None = None, num_workers: int = 3)[source]

Bases: ConnectionPool

ConnectionPool class.

Example

>>> pool = shinto.pg.ConnectionPool(
...     min_size=1,
...     max_size=10,
...     kwargs={
...         "host": "localhost",
...         "port": 6432,
...         "dbname": "mydb",
...         "user": "myuser",
...         "password": "mypass",
...     },
... )
>>> with pool.connection() as conn:
...     conn.execute_query("SELECT * FROM mytable")

connection(timeout: float | None = None) → Generator[Connection, None, None][source]

Context manager to obtain a connection from the pool.

Yields a custom Connection object that extends psycopg.Connection.

Parameters:

timeout (float | None) – The maximum time to wait for a connection.

Yields:

Connection – A connection to the database.
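
The timeout argument bounds how long a caller waits for a free connection. As a sketch, a helper can pass a shorter timeout on latency-sensitive paths; query_with_timeout is a hypothetical name, and the pool is assumed to behave as documented above.

```python
from typing import Any, Optional


def query_with_timeout(
    pool,
    query: str,
    params: Optional[dict[str, Any]] = None,
    timeout: float = 5.0,
) -> list[tuple]:
    """Run one query on a pooled connection, waiting at most timeout seconds for it."""
    # The timeout covers obtaining the connection, not the query execution itself.
    with pool.connection(timeout=timeout) as conn:
        return conn.execute_query(query, params)
```

In psycopg_pool an expired wait is reported as psycopg_pool.PoolTimeout; whether shinto alters that behaviour is not stated in this reference.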

shinto.pg.utils module

Utility functions for the shinto.pg package.

exception shinto.pg.utils.EmptyQueryResultError[source]

Bases: Exception

Error raised when the query result is empty.

exception shinto.pg.utils.InvalidJsonError[source]

Bases: Exception

Error raised when the JSON object is invalid.

exception shinto.pg.utils.MultipleObjectsReturnedError[source]

Bases: Exception

Error raised when multiple objects are returned from a query.

shinto.pg.utils.parse_json_from_query_result(query_result: list[tuple]) → dict | list[source]

Parse the JSON object contained in a query result.

Parameters:

query_result (list[tuple]) – The query result to parse.

Returns:

The parsed JSON object.

Return type:

(dict | list)

Raises:
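
The exceptions raised by this function are not listed here. Judging only from the three exception classes documented above, the checks might look roughly like the sketch below. This is an assumption, not shinto's implementation: sketch_parse_json is a hypothetical function, the exception classes are local stand-ins that merely reuse the documented names, and the expected input shape (one row whose first column holds the JSON value) is a guess.

```python
import json


class EmptyQueryResultError(Exception):
    """Local stand-in: the query result is empty."""


class MultipleObjectsReturnedError(Exception):
    """Local stand-in: the query returned more than one row."""


class InvalidJsonError(Exception):
    """Local stand-in: the value is not a JSON object or array."""


def sketch_parse_json(query_result: list[tuple]) -> "dict | list":
    """Return the JSON value held in the single row of query_result (assumed shape)."""
    if not query_result or not query_result[0]:
        raise EmptyQueryResultError("query returned no rows")
    if len(query_result) > 1:
        raise MultipleObjectsReturnedError("query returned more than one row")
    value = query_result[0][0]
    if isinstance(value, (dict, list)):
        # The driver may already have decoded a json/jsonb column.
        return value
    try:
        parsed = json.loads(value)
    except (TypeError, ValueError) as exc:
        raise InvalidJsonError("value is not valid JSON") from exc
    if not isinstance(parsed, (dict, list)):
        raise InvalidJsonError("JSON value is neither an object nor an array")
    return parsed
```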

Module contents

PostgreSQL database adapter for Shinto – A wrapper around psycopg.

class shinto.pg.AsyncConnection(pgconn: PGconn, row_factory: AsyncRowFactory[Row] = <function tuple_row>)[source]

Bases: AsyncConnection

Wrapper for an async connection to the database.

async execute_command(query: str, params: None | dict[str, Any] = None) → int[source]

Asynchronously execute a command (INSERT, UPDATE, DELETE) that doesn’t return data.

Parameters:
  • query (str) – The command to execute.

  • params (dict) – The query parameters to format the command.

Returns:

The number of rows affected.

Return type:

int

Raises:

psycopg.Error – If the command execution fails.

Example

>>> await conn.execute_command("DELETE FROM mytable WHERE id = %(id)s", {"id": 1})
1

async execute_query(query: str, params: None | dict[str, Any] = None) → list[tuple][source]

Asynchronously execute a query or command against the database.

Parameters:
  • query (str) – The query to execute.

  • params (dict) – The query parameters to format the query.

Returns:

The result of the query.

Return type:

list[tuple]

Raises:

psycopg.Error – If the query execution fails.

Example

>>> await conn.execute_query("SELECT * FROM mytable WHERE id = %(id)s", {"id": 1})
[(1, 'name')]

async write_records(query: str, records: list[tuple]) → int[source]

Write data records to the database asynchronously.

Parameters:
  • query (str) – The query to execute.

  • records (list[tuple]) – The records to write.

Returns:

The number of records written.

Return type:

int

Raises:

psycopg.Error – If the query execution fails.
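
The row count returned by execute_command can be used to verify that a statement touched as many rows as intended. A hedged sketch: UnexpectedRowCountError and execute_expecting are hypothetical names, not part of shinto, and conn is assumed to expose the awaitable execute_command documented above.

```python
from typing import Any, Optional


class UnexpectedRowCountError(Exception):
    """Hypothetical error: a command affected an unexpected number of rows."""


async def execute_expecting(
    conn,
    query: str,
    params: Optional[dict[str, Any]] = None,
    expected: int = 1,
) -> int:
    """Run a command and raise unless exactly `expected` rows were affected."""
    affected = await conn.execute_command(query, params)
    if affected != expected:
        # The command has already run at this point; undoing it is up to the caller.
        raise UnexpectedRowCountError(
            f"expected {expected} row(s), but {affected} were affected"
        )
    return affected
```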

class shinto.pg.AsyncConnectionPool(conninfo: str = '', *, connection_class: Type[ACT] = <class 'psycopg.AsyncConnection'>, kwargs: Dict[str, Any] | None = None, min_size: int = 4, max_size: int | None = None, open: bool | None = None, configure: Callable[[ACT], Awaitable[None]] | None = None, check: Callable[[ACT], Awaitable[None]] | None = None, reset: Callable[[ACT], Awaitable[None]] | None = None, name: str | None = None, timeout: float = 30.0, max_waiting: int = 0, max_lifetime: float = 3600.0, max_idle: float = 600.0, reconnect_timeout: float = 300.0, reconnect_failed: Callable[[AsyncConnectionPool[Any]], None] | Callable[[AsyncConnectionPool[Any]], Awaitable[None]] | None = None, num_workers: int = 3)[source]

Bases: AsyncConnectionPool

AsyncConnectionPool class.

Example

>>> pool = shinto.pg.AsyncConnectionPool(
...     min_size=1,
...     max_size=10,
...     kwargs={
...         "host": "localhost",
...         "port": 6432,
...         "dbname": "mydb",
...         "user": "myuser",
...         "password": "mypass",
...     },
... )
>>> async with pool.connection() as conn:
...     await conn.execute_query("SELECT * FROM mytable")

connection(timeout: float | None = None) → AsyncGenerator[AsyncConnection, None, None][source]

Context manager to obtain an async connection from the pool.

Yields a custom AsyncConnection object that extends psycopg.AsyncConnection.

Parameters:

timeout (float | None) – The maximum time to wait for a connection.

Yields:

AsyncConnection – A connection to the database.

class shinto.pg.Connection(pgconn: PGconn, row_factory: RowFactory[Row] = <function tuple_row>)[source]

Bases: Connection

Wrapper for a connection to the database.

execute_command(query: str, params: None | dict[str, Any] = None) → int[source]

Execute a command (INSERT, UPDATE, DELETE) that doesn’t return data.

Parameters:
  • query (str) – The command to execute.

  • params (dict) – The query parameters to format the command.

Returns:

The number of rows affected.

Return type:

int

Raises:

psycopg.Error – If the command execution fails.

Example

>>> conn.execute_command("DELETE FROM mytable WHERE id = %(id)s", {"id": 1})
1

execute_query(query: str, params: None | dict[str, Any] = None) → list[tuple][source]

Execute a query or command against the database.

Parameters:
  • query (str) – The query to execute.

  • params (dict) – The query parameters to format the query.

Returns:

The result of the query.

Return type:

list[tuple]

Raises:

psycopg.Error – If the query execution fails.

Example

>>> conn.execute_query("SELECT * FROM mytable WHERE id = %(id)s", {"id": 1})
[(1, 'name')]

write_records(query: str, records: list[tuple]) → int[source]

Write data records to the database.

Parameters:
  • query (str) – The query to execute.

  • records (list[tuple]) – The records to write.

Returns:

The number of records written.

Return type:

int

Raises:

psycopg.Error – If the query execution fails.
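
Both execute_query and execute_command take a params dictionary whose keys match the %(name)s placeholders in the query, as in the examples above. The sketch below builds such a query and dictionary from a set of equality filters; build_select and the allowed_columns check are illustrative assumptions, not shinto features.

```python
from typing import Any


def build_select(
    table: str, filters: dict[str, Any], allowed_columns: set[str]
) -> tuple[str, dict[str, Any]]:
    """Return (query, params) selecting rows of table that match every filter."""
    unknown = set(filters) - allowed_columns
    if unknown:
        # Column names end up in the SQL text itself, so only known names are accepted.
        raise ValueError(f"unknown columns: {sorted(unknown)}")
    # The table name is also interpolated and must come from trusted code.
    query = f"SELECT * FROM {table}"
    if filters:
        conditions = " AND ".join(f"{column} = %({column})s" for column in filters)
        query += f" WHERE {conditions}"
    return query, dict(filters)


query, params = build_select("mytable", {"id": 1, "name": "alpha"}, {"id", "name"})
print(query)   # SELECT * FROM mytable WHERE id = %(id)s AND name = %(name)s
print(params)  # {'id': 1, 'name': 'alpha'}
```

The pair can be passed straight through as conn.execute_query(query, params); the values themselves never enter the SQL text.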

class shinto.pg.ConnectionPool(conninfo: str = '', *, connection_class: Type[CT] = <class 'psycopg.Connection'>, kwargs: Dict[str, Any] | None = None, min_size: int = 4, max_size: int | None = None, open: bool | None = None, configure: Callable[[CT], None] | None = None, check: Callable[[CT], None] | None = None, reset: Callable[[CT], None] | None = None, name: str | None = None, timeout: float = 30.0, max_waiting: int = 0, max_lifetime: float = 3600.0, max_idle: float = 600.0, reconnect_timeout: float = 300.0, reconnect_failed: Callable[[ConnectionPool[Any]], None] | None = None, num_workers: int = 3)[source]

Bases: ConnectionPool

ConnectionPool class.

Example

>>> pool = shinto.pg.ConnectionPool(
...     min_size=1,
...     max_size=10,
...     kwargs={
...         "host": "localhost",
...         "port": 6432,
...         "dbname": "mydb",
...         "user": "myuser",
...         "password": "mypass",
...     },
... )
>>> with pool.connection() as conn:
...     conn.execute_query("SELECT * FROM mytable")

connection(timeout: float | None = None) → Generator[Connection, None, None][source]

Context manager to obtain a connection from the pool.

Yields a custom Connection object that extends psycopg.Connection.

Parameters:

timeout (float | None) – The maximum time to wait for a connection.

Yields:

Connection – A connection to the database.
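
The kwargs dictionary in the pool example hard-codes the connection settings. One alternative is to read them from the standard libpq environment variables (PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD). The helper below is only a sketch; pool_kwargs_from_env is a hypothetical name and not part of shinto.

```python
import os
from collections.abc import Mapping
from typing import Any

# libpq environment variable -> connection keyword
_ENV_TO_KEYWORD = {
    "PGHOST": "host",
    "PGPORT": "port",
    "PGDATABASE": "dbname",
    "PGUSER": "user",
    "PGPASSWORD": "password",
}


def pool_kwargs_from_env(env: Mapping[str, str] = os.environ) -> dict[str, Any]:
    """Build a kwargs dict for the pool from whichever variables are set in env."""
    kwargs: dict[str, Any] = {}
    for variable, keyword in _ENV_TO_KEYWORD.items():
        value = env.get(variable)
        if value:
            # Unset or empty variables are skipped so the driver defaults apply.
            kwargs[keyword] = int(value) if keyword == "port" else value
    return kwargs
```

A pool could then be created with shinto.pg.ConnectionPool(min_size=1, max_size=10, kwargs=pool_kwargs_from_env()). libpq itself falls back to these variables for omitted parameters, so the helper mainly makes the configuration explicit and easy to inspect.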

exception shinto.pg.EmptyQueryResultError[source]

Bases: Exception

Error raised when the query result is empty.

exception shinto.pg.InvalidJsonError[source]

Bases: Exception

Error raised when the JSON object is invalid.

exception shinto.pg.MultipleObjectsReturnedError[source]

Bases: Exception

Error raised when multiple objects are returned from a query.

shinto.pg.parse_json_from_query_result(query_result: list[tuple]) → dict | list[source]

Parse the JSON object contained in a query result.

Parameters:

query_result (list[tuple]) – The query result to parse.

Returns:

The parsed JSON object.

Return type:

(dict | list)

Raises: