from fastapi import FastAPI, Depends from psycopg_pool import AsyncConnectionPool import uvicorn import os from pydantic import BaseModel class Item(BaseModel): name: str description: str reporter: str priority: int is_stupid: bool app = FastAPI() pool: AsyncConnectionPool @app.on_event('startup') async def on_startup(): global pool pool = AsyncConnectionPool( os.getenv('DB_URL'), min_size=5, max_size=40 ) await pool.open() print('[INFO] Started connection pool') async with pool.connection() as conn: async with conn.cursor() as cursor: await cursor.execute('CREATE TABLE IF NOT EXISTS users (id SERIAL PRIMARY KEY, email VARCHAR(64), remail VARCHAR(64), phone VARCHAR(16), password VARCHAR(128))') print('[INFO] Database initialized') await conn.commit() @app.on_event('shutdown') async def on_shutdown(): await pool.close() print('[INFO] Closed connection pool') async def get_conn(): async with pool.connection() as conn: yield conn @app.get('/') def read_root(): return { 'message': 'microservice is running' } @app.post('/item') async def create_item(item: Item, conn = Depends(get_conn)): async with conn.cursor() as cursor: await cursor.execute('INSERT INTO items (name, description, reporter, priority, is_stupid) VALUES (%s, %s, %s, %s, %s) RETURNING *', (item.name, item.description, item.reporter, item.priority, item.is_stupid)) i = await cursor.fetchone() return {'id': i[0], 'description' : i[1], 'reporter': i[2], 'priority': i[3], 'is_stupid': i[4]} @app.get('/item/{item_id}') async def read_item(item_id, conn = Depends(get_conn)): async with conn.cursor() as cursor: await cursor.execute('SELECT * FROM items WHERE id = %s', (item_id,)) i = await cursor.fetchone() return {'id': i[0], 'name' : i[1], 'description': i[2], 'reporter': i[3], 'priority': i[4], 'is_stupid': i[5]} @app.get('/items') async def read_item(conn = Depends(get_conn)): async with conn.cursor() as cursor: await cursor.execute('SELECT * FROM items') items = await cursor.fetchall() return [{'id': i[0], 'description' : i[1], 'reporter': i[2], 'priority': i[3], 'is_stupid': i[4]} for i in items] @app.get('/datadir') async def get_datadir(conn = Depends(get_conn)): async with conn.cursor() as cursor: await cursor.execute('SHOW data_directory') return {'res': await cursor.fetchall()} if __name__ == '__main__': uvicorn.run( 'main:app', host = '0.0.0.0', port = 1234, reload = True, reload_dirs = ['/app'], server_header = False )