shinto.pg package
Submodules
shinto.pg.connection module
Connection module, a wrapper around psycopg Connection and Async.
- class shinto.pg.connection.AsyncConnection(pgconn: PGconn, row_factory: AsyncRowFactory[Row] = <function tuple_row>)[source]
Bases:
AsyncConnection
Wrapper for an async connection to the database.
- async execute_command(query: str, params: None | dict[slice(<class 'str'>, typing.Any, None)] = None) int [source]
Execute a command (INSERT, UPDATE, DELETE) that doesn’t return data asynchronously.
- Parameters:
query (str) – The command to execute.
params (dict) – The query parameters to format the command.
- Returns:
The number of rows affected.
- Return type:
int
- Raises:
psycopg.Error – If the command execution fails.
Example
>>> await conn.execute_command("DELETE FROM table WHERE id = %(id)s", {"id": 1}) 1
- async execute_query(query: str, params: None | dict[slice(<class 'str'>, typing.Any, None)] = None) list[tuple] [source]
Execute a query or command to the database asynchronously.
- Parameters:
query (str) – The query to execute.
params (dict) – The query parameters to format the query.
- Returns:
The result of the query.
- Return type:
list[tuple]
- Raises:
psycopg.Error – If the query execution fails.
Example
>>> await conn.execute_query("SELECT * FROM table WHERE id = %(id)s", {"id": 1}) [(1, "name")]
- async write_records(query: str, records: list[tuple]) int [source]
Write data records to the database asynchronously.
- Parameters:
query (str) – The query to execute.
records (list[tuple]) – The records to write.
- Returns:
The number of records written.
- Return type:
int
- Raises:
psycopg.Error – If the query execution fails.
- class shinto.pg.connection.Connection(pgconn: PGconn, row_factory: RowFactory[Row] = <function tuple_row>)[source]
Bases:
Connection
Wrapper for a connection to the database.
- execute_command(query: str, params: None | dict[slice(<class 'str'>, typing.Any, None)] = None) int [source]
Execute a command (INSERT, UPDATE, DELETE) that doesn’t return data.
- Parameters:
query (str) – The command to execute.
params (dict) – The query parameters to format the command.
- Returns:
The number of rows affected.
- Return type:
int
- Raises:
psycopg.Error – If the command execution fails.
Example
>>> conn.execute_command("DELETE FROM table WHERE id = %(id)s", {"id": 1}) 1
- execute_query(query: str, params: None | dict[slice(<class 'str'>, typing.Any, None)] = None) list[tuple] [source]
Execute a query or command to the database.
- Parameters:
query (str) – The query to execute.
params (dict) – The query parameters to format the query.
- Returns:
The result of the query.
- Return type:
list[tuple]
- Raises:
psycopg.Error – If the query execution fails.
Example
>>> conn.execute_query("SELECT * FROM table WHERE id = %(id)s", {"id": 1}) [(1, "name")]
- write_records(query: str, records: list[tuple]) int [source]
Write data records to the database.
- Parameters:
query (str) – The query to execute.
records (list[tuple]) – The records to write.
- Returns:
The number of records written.
- Return type:
int
- Raises:
psycopg.Error – If the query execution fails.
shinto.pg.pool module
Connection pool module, a wrapper around psycopg_pool ConnectionPool and AsyncConnectionPool.
- class shinto.pg.pool.AsyncConnectionPool(conninfo: str = '', *, connection_class: ~typing.Type[~psycopg_pool.abc.ACT] = <class 'psycopg.AsyncConnection'>, kwargs: ~typing.Dict[str, ~typing.Any] | None = None, min_size: int = 4, max_size: int | None = None, open: bool | None = None, configure: ~typing.Callable[[~psycopg_pool.abc.ACT], ~typing.Awaitable[None]] | None = None, check: ~typing.Callable[[~psycopg_pool.abc.ACT], ~typing.Awaitable[None]] | None = None, reset: ~typing.Callable[[~psycopg_pool.abc.ACT], ~typing.Awaitable[None]] | None = None, name: str | None = None, timeout: float = 30.0, max_waiting: int = 0, max_lifetime: float = 3600.0, max_idle: float = 600.0, reconnect_timeout: float = 300.0, reconnect_failed: ~typing.Callable[[~psycopg_pool.pool_async.AsyncConnectionPool[~typing.Any]], None] | ~typing.Callable[[~psycopg_pool.pool_async.AsyncConnectionPool[~typing.Any]], ~typing.Awaitable[None]] | None = None, num_workers: int = 3)[source]
Bases:
AsyncConnectionPool
AsyncConnectionPool class.
Example
>>> pool = shinto.pg.AsyncConnectionPool( ... min_size=1, ... max_size=10, ... kwargs={ ... "host": "localhost", ... "port": 6432, ... "dbname": "mydb", ... "user": "myuser", ... "password": "mypass", ... }, ... ) >>> async with pool.connection() as conn: ... await conn.execute_query("SELECT * FROM mytable")
- connection(timeout: float | None = None) AsyncGenerator[AsyncConnection, None, None] [source]
Context manager to obtain an async connection from the pool.
Yields a custom AsyncConnection object that extends psycopg.AsyncConnection.
- Parameters:
timeout (float | None) – The maximum time to wait for a connection.
- Yields:
AsyncConnection – A connection to the database.
- class shinto.pg.pool.ConnectionPool(conninfo: str = '', *, connection_class: ~typing.Type[~psycopg_pool.abc.CT] = <class 'psycopg.Connection'>, kwargs: ~typing.Dict[str, ~typing.Any] | None = None, min_size: int = 4, max_size: int | None = None, open: bool | None = None, configure: ~typing.Callable[[~psycopg_pool.abc.CT], None] | None = None, check: ~typing.Callable[[~psycopg_pool.abc.CT], None] | None = None, reset: ~typing.Callable[[~psycopg_pool.abc.CT], None] | None = None, name: str | None = None, timeout: float = 30.0, max_waiting: int = 0, max_lifetime: float = 3600.0, max_idle: float = 600.0, reconnect_timeout: float = 300.0, reconnect_failed: ~typing.Callable[[~psycopg_pool.pool.ConnectionPool[~typing.Any]], None] | None = None, num_workers: int = 3)[source]
Bases:
ConnectionPool
ConnectionPool class.
Example
>>> pool = shinto.pg.ConnectionPool( ... min_size=1, ... max_size=10, ... kwargs={ ... "host": "localhost", ... "port": 6432, ... "database": "mydb", ... "user": "myuser", ... "password": "mypass", ... }, ... ) >>> with pool.connection() as conn: ... conn.execute_query("SELECT * FROM mytable")
- connection(timeout: float | None = None) Generator[Connection, None, None] [source]
Context manager to obtain a connection from the pool.
Yields a custom Connection object that extends psycopg.Connection.
- Parameters:
timeout (float | None) – The maximum time to wait for a connection.
- Yields:
Connection – A connection to the database.
shinto.pg.utils module
Utility functions for the shinto.pg package.
- exception shinto.pg.utils.EmptyQueryResultError[source]
Bases:
Exception
Error raised when the query result is empty.
- exception shinto.pg.utils.InvalidJsonError[source]
Bases:
Exception
Error raised when the json object is invalid.
- exception shinto.pg.utils.MultipleObjectsReturnedError[source]
Bases:
Exception
Error raised when multiple objects are returned from a query.
- shinto.pg.utils.parse_json_from_query_result(query_result: list[tuple]) dict | list [source]
Get json from the query result.
- Parameters:
query_result (list[tuple]) – The query result to parse.
- Returns:
The parsed json object.
- Return type:
(dict | list)
- Raises:
EmptyQueryResultError – If the query result is empty.
MultipleObjectsReturnedError – If the query result contains multiple objects.
JsonParseError – If the query result is not a valid dict or list.
Module contents
PostgreSQL database adapter for Shinto – A wrapper around psycopg.
- class shinto.pg.AsyncConnection(pgconn: PGconn, row_factory: AsyncRowFactory[Row] = <function tuple_row>)[source]
Bases:
AsyncConnection
Wrapper for an async connection to the database.
- async execute_command(query: str, params: None | dict[slice(<class 'str'>, typing.Any, None)] = None) int [source]
Execute a command (INSERT, UPDATE, DELETE) that doesn’t return data asynchronously.
- Parameters:
query (str) – The command to execute.
params (dict) – The query parameters to format the command.
- Returns:
The number of rows affected.
- Return type:
int
- Raises:
psycopg.Error – If the command execution fails.
Example
>>> await conn.execute_command("DELETE FROM table WHERE id = %(id)s", {"id": 1}) 1
- async execute_query(query: str, params: None | dict[slice(<class 'str'>, typing.Any, None)] = None) list[tuple] [source]
Execute a query or command to the database asynchronously.
- Parameters:
query (str) – The query to execute.
params (dict) – The query parameters to format the query.
- Returns:
The result of the query.
- Return type:
list[tuple]
- Raises:
psycopg.Error – If the query execution fails.
Example
>>> await conn.execute_query("SELECT * FROM table WHERE id = %(id)s", {"id": 1}) [(1, "name")]
- async write_records(query: str, records: list[tuple]) int [source]
Write data records to the database asynchronously.
- Parameters:
query (str) – The query to execute.
records (list[tuple]) – The records to write.
- Returns:
The number of records written.
- Return type:
int
- Raises:
psycopg.Error – If the query execution fails.
- class shinto.pg.AsyncConnectionPool(conninfo: str = '', *, connection_class: ~typing.Type[~psycopg_pool.abc.ACT] = <class 'psycopg.AsyncConnection'>, kwargs: ~typing.Dict[str, ~typing.Any] | None = None, min_size: int = 4, max_size: int | None = None, open: bool | None = None, configure: ~typing.Callable[[~psycopg_pool.abc.ACT], ~typing.Awaitable[None]] | None = None, check: ~typing.Callable[[~psycopg_pool.abc.ACT], ~typing.Awaitable[None]] | None = None, reset: ~typing.Callable[[~psycopg_pool.abc.ACT], ~typing.Awaitable[None]] | None = None, name: str | None = None, timeout: float = 30.0, max_waiting: int = 0, max_lifetime: float = 3600.0, max_idle: float = 600.0, reconnect_timeout: float = 300.0, reconnect_failed: ~typing.Callable[[~psycopg_pool.pool_async.AsyncConnectionPool[~typing.Any]], None] | ~typing.Callable[[~psycopg_pool.pool_async.AsyncConnectionPool[~typing.Any]], ~typing.Awaitable[None]] | None = None, num_workers: int = 3)[source]
Bases:
AsyncConnectionPool
AsyncConnectionPool class.
Example
>>> pool = shinto.pg.AsyncConnectionPool( ... min_size=1, ... max_size=10, ... kwargs={ ... "host": "localhost", ... "port": 6432, ... "dbname": "mydb", ... "user": "myuser", ... "password": "mypass", ... }, ... ) >>> async with pool.connection() as conn: ... await conn.execute_query("SELECT * FROM mytable")
- connection(timeout: float | None = None) AsyncGenerator[AsyncConnection, None, None] [source]
Context manager to obtain an async connection from the pool.
Yields a custom AsyncConnection object that extends psycopg.AsyncConnection.
- Parameters:
timeout (float | None) – The maximum time to wait for a connection.
- Yields:
AsyncConnection – A connection to the database.
- class shinto.pg.Connection(pgconn: PGconn, row_factory: RowFactory[Row] = <function tuple_row>)[source]
Bases:
Connection
Wrapper for a connection to the database.
- execute_command(query: str, params: None | dict[slice(<class 'str'>, typing.Any, None)] = None) int [source]
Execute a command (INSERT, UPDATE, DELETE) that doesn’t return data.
- Parameters:
query (str) – The command to execute.
params (dict) – The query parameters to format the command.
- Returns:
The number of rows affected.
- Return type:
int
- Raises:
psycopg.Error – If the command execution fails.
Example
>>> conn.execute_command("DELETE FROM table WHERE id = %(id)s", {"id": 1}) 1
- execute_query(query: str, params: None | dict[slice(<class 'str'>, typing.Any, None)] = None) list[tuple] [source]
Execute a query or command to the database.
- Parameters:
query (str) – The query to execute.
params (dict) – The query parameters to format the query.
- Returns:
The result of the query.
- Return type:
list[tuple]
- Raises:
psycopg.Error – If the query execution fails.
Example
>>> conn.execute_query("SELECT * FROM table WHERE id = %(id)s", {"id": 1}) [(1, "name")]
- write_records(query: str, records: list[tuple]) int [source]
Write data records to the database.
- Parameters:
query (str) – The query to execute.
records (list[tuple]) – The records to write.
- Returns:
The number of records written.
- Return type:
int
- Raises:
psycopg.Error – If the query execution fails.
- class shinto.pg.ConnectionPool(conninfo: str = '', *, connection_class: ~typing.Type[~psycopg_pool.abc.CT] = <class 'psycopg.Connection'>, kwargs: ~typing.Dict[str, ~typing.Any] | None = None, min_size: int = 4, max_size: int | None = None, open: bool | None = None, configure: ~typing.Callable[[~psycopg_pool.abc.CT], None] | None = None, check: ~typing.Callable[[~psycopg_pool.abc.CT], None] | None = None, reset: ~typing.Callable[[~psycopg_pool.abc.CT], None] | None = None, name: str | None = None, timeout: float = 30.0, max_waiting: int = 0, max_lifetime: float = 3600.0, max_idle: float = 600.0, reconnect_timeout: float = 300.0, reconnect_failed: ~typing.Callable[[~psycopg_pool.pool.ConnectionPool[~typing.Any]], None] | None = None, num_workers: int = 3)[source]
Bases:
ConnectionPool
ConnectionPool class.
Example
>>> pool = shinto.pg.ConnectionPool( ... min_size=1, ... max_size=10, ... kwargs={ ... "host": "localhost", ... "port": 6432, ... "database": "mydb", ... "user": "myuser", ... "password": "mypass", ... }, ... ) >>> with pool.connection() as conn: ... conn.execute_query("SELECT * FROM mytable")
- connection(timeout: float | None = None) Generator[Connection, None, None] [source]
Context manager to obtain a connection from the pool.
Yields a custom Connection object that extends psycopg.Connection.
- Parameters:
timeout (float | None) – The maximum time to wait for a connection.
- Yields:
Connection – A connection to the database.
- exception shinto.pg.EmptyQueryResultError[source]
Bases:
Exception
Error raised when the query result is empty.
- exception shinto.pg.InvalidJsonError[source]
Bases:
Exception
Error raised when the json object is invalid.
- exception shinto.pg.MultipleObjectsReturnedError[source]
Bases:
Exception
Error raised when multiple objects are returned from a query.
- shinto.pg.parse_json_from_query_result(query_result: list[tuple]) dict | list [source]
Get json from the query result.
- Parameters:
query_result (list[tuple]) – The query result to parse.
- Returns:
The parsed json object.
- Return type:
(dict | list)
- Raises:
EmptyQueryResultError – If the query result is empty.
MultipleObjectsReturnedError – If the query result contains multiple objects.
JsonParseError – If the query result is not a valid dict or list.