Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save nochristrequired/0c705b23eff0074c576e0b047e669394 to your computer and use it in GitHub Desktop.
Save nochristrequired/0c705b23eff0074c576e0b047e669394 to your computer and use it in GitHub Desktop.
aiopg playing with aiohttp and gunicorn

aiopg playing with aiohttp and gunicorn

Setup

virtualenv -p /usr/bin/python3.5 env && source env/bin/activate
pip install SQLAlchemy aiohttp aiopg gunicorn

Usage

To (re)create the database and populate it:

python db.py

To run the server using aiohttp's own server:

python server.py

To run the server using gunicorn:

gunicorn -b 0.0.0.0:8000 -k aiohttp.worker.GunicornWebWorker "server:app()"

(You can add the --reload argument to reload the server on file changes.)

import os
import asyncio
import psycopg2
from aiopg.sa import create_engine
from sqlalchemy.engine.url import URL
from sqlalchemy import create_engine as sa_create_engine
import sqlalchemy as sa
metadata = sa.MetaData()
tbl = sa.Table('tbl', metadata,
sa.Column('id', sa.Integer, sa.Sequence('id_seq'), primary_key=True),
sa.Column('val', sa.String(255)))
DATABASE = {
'drivername': 'postgres',
'host': 'localhost',
'port': '5432',
'username': 'postgres',
'password': os.getenv('PG_PASS', ''),
'database': 'aiohttp_plays_with_aiopg'
}
dsn = str(URL(**DATABASE))
async def populate():
async with create_engine(dsn) as engine:
async with engine.acquire() as conn:
await conn.execute(tbl.insert().values(val='abc'))
await conn.execute(tbl.insert().values(val='def'))
await conn.execute(tbl.insert().values(val='ghi'))
count = await conn.scalar(tbl.count())
print('populated table with {} items'.format(count))
if __name__ == '__main__':
print('recreating database and populating with initial data')
conn = psycopg2.connect(user=DATABASE['username'], password=DATABASE['password'],
host=DATABASE['host'], port=DATABASE['port'])
conn.autocommit = True
cur = conn.cursor()
cur.execute('DROP DATABASE IF EXISTS {}'.format(DATABASE['database']))
cur.execute('CREATE DATABASE {}'.format(DATABASE['database']))
engine = sa_create_engine(dsn)
metadata.create_all(engine)
engine.dispose()
loop = asyncio.get_event_loop()
loop.run_until_complete(populate())
import asyncio
from aiohttp import web
from aiopg.sa import create_engine
from db import dsn, tbl
async def list_view(request):
engine = request.app['engine']
body = ''
async with engine.acquire() as conn:
async for row in conn.execute(tbl.select()):
body += '<p>{}: {}</p>\n'.format(row.id, row.val)
return web.Response(body=body.encode())
async def finish_controller(app):
print('closing engine')
engine = app['engine']
engine.close()
await engine.wait_closed()
def app(loop=None):
loop = loop or asyncio.get_event_loop()
_app = web.Application(loop=loop)
_app['engine'] = loop.run_until_complete(create_engine(dsn, loop=loop))
_app.register_on_finish(finish_controller)
_app.router.add_route('GET', '/', list_view)
return _app
if __name__ == '__main__':
loop = asyncio.get_event_loop()
the_app = app(loop)
handler = the_app.make_handler()
f = loop.create_server(handler, '0.0.0.0', 8000)
srv = loop.run_until_complete(f)
print('serving on', srv.sockets[0].getsockname())
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
srv.close()
loop.run_until_complete(srv.wait_closed())
loop.run_until_complete(handler.finish_connections(1.0))
loop.run_until_complete(the_app.finish())
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment