Note
We won't use relationships in this example; see SQLAlchemy documentation for those.
First of all, we need to install asyncio sqlalchemy by:
pip3 install -U 'sqlalchemy[asyncio]' aiomysql
First of file, we need to import things that we need in future:
from sqlalchemy.ext.asyncio import AsyncAttrs, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.sql import sqltypes
Now let's define "Declarative Base" class and our table:
class Model(AsyncAttrs, DeclarativeBase):
# You don't need this if you aren't using MySQL
__table_args__ = {"mysql_engine": "InnoDB"}
def __repr__(self) -> str:
return "{}({})".format(
self.__class__.__qualname__,
", ".join(f"{k}={getattr(self, k, None)}" for k in self.__table__.columns.keys()),
)
class User(Model):
__tablename__ = "users"
id: Mapped[int] = mapped_column(sqltypes.Integer, primary_key=True, autoincrement=True)
first_name: Mapped[str] = mapped_column(sqltypes.VARCHAR(32), nullable=False)
last_name: Mapped[str] = mapped_column(sqltypes.VARCHAR(32), nullable=True)
amount: Mapped[int] = mapped_column(sqltypes.Integer, nullable=False, default=0)
Our table is ready; wait where's our connection?
_engine = create_async_engine(
"mysql+aiomysql://user:[email protected]:3306/database_name"
)
db = async_sessionmaker(_engine, expire_on_commit=False)
async def initialize_database():
async with _engine.begin() as conn:
await conn.run_sync(Model.metadata.create_all)
async def dispose_database():
await _engine.dispose()
OK, now everything is ready to use. here is an example for using:
from sqlalchemy import sql
import asyncio
async def main():
await initialize_database()
# inserting
async with db.begin() as session:
await session.execute(
sql.insert(User).values(first_name="Ali", last_name="P")
)
await session.execute(
sql.insert(User).values(first_name="aWolverP", last_name=None)
)
# selecting
async with db.begin() as session:
user_1 = await session.execute(
sql.select(User).where(User.id == 1)
)
user_1 = user_1.scalar()
print(user_1)
# User(id=1, first_name="Ali", last_name="P", amount=0)
users = await session.execute(
sql.select(User)
)
users = users.scalars().all()
print(users)
# [User(id=1, first_name="Ali", last_name="P", amount=0), User(id=2, first_name="aWolverP", last_name=None, amount=0)]
count = await session.execute(
sql.select(sql.func.count(User.id))
)
count = count.scalar()
print(count)
# 2
# updating
async with db.begin() as session:
await session.execute(
sql.update(User).where(User.id == 1)
.values(
{
User.first_name: "New Name",
User.amount: User.amount + 5000,
}
)
)
user_1 = await session.execute(
sql.select(User).where(User.id == 1)
)
user_1 = user_1.scalar()
print(user_1)
# User(id=1, first_name="New Name", last_name="P", amount=5000)
# deleting
async with db.begin() as session:
await session.execute(
sql.delete(User).where(User.id == 2)
)
user_2 = await session.execute(
sql.select(User).where(User.id == 2)
)
user_2 = user_2.scalar()
print(user_2)
# None
await dispose_database()
asyncio.run(main())