Real-time Notifications with Python and Postgres

It is 2019, and if you are a backend developer, it is very likely that at some stage of development you have needed, or will need, to implement WebSockets.

About four years ago, implementing asynchronous functions in a Python backend was somewhat complicated, since the available options, like Tornado or gevent, have a steep learning curve. Many developers looked favorably on building backends with NodeJS to avoid headaches.

After a while Django Channels appeared. It was one of the first ASGI frameworks, and with it implementing WebSockets became somewhat easier. Django Channels gave us the opportunity to write synchronous functions alongside asynchronous ones. Other frameworks, like Quart and FastAPI, followed later.

Of these three, FastAPI has become the favorite for its flexibility and speed. So for this post I’ll be using FastAPI.

The Starting Lineup

  • Postgres
  • FastAPI
  • AIOPG

Create the database

createuser food_user
createdb -E utf-8 fast_food_db
psql fast_food_db
alter user food_user with encrypted password '<password>';
grant all privileges on database fast_food_db to food_user;

Create a virtual environment and install packages

python3 -m venv my_env
source my_env/bin/activate
pip install fastapi SQLAlchemy alembic psycopg2 uvicorn aiopg
mkdir FastFoodAPI
cd FastFoodAPI
mkdir fastfoodapi
# initialize alembic for db migrations
alembic init alembic

Edit alembic.ini

In this file we configure the database url

sqlalchemy.url = postgresql://food_user:<password>@localhost/fast_food_db
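If the password contains characters such as @ or /, they have to be percent-encoded before they go into the URL. The following is a minimal sketch using only the standard library; build_db_url is a hypothetical helper (it is not part of Alembic or SQLAlchemy) and the password shown is made up to illustrate the escaping.

```python
from urllib.parse import quote_plus


def build_db_url(user: str, password: str, host: str, database: str) -> str:
    """Assemble a PostgreSQL URL, percent-encoding the credentials."""
    return "postgresql://{}:{}@{}/{}".format(
        quote_plus(user), quote_plus(password), host, database
    )


if __name__ == "__main__":
    # Hypothetical password, chosen only to show the escaping
    print(build_db_url("food_user", "p@ss/word", "localhost", "fast_food_db"))
```

As far as I know, alembic.ini is read with Python's configparser, so any literal % in the encoded value would also need to be doubled (%%) when pasted into that file.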

Edit env.py in the alembic folder

In this file we need to import our models so that they are loaded in memory when migrations are run with the --autogenerate argument. In the process we also add the project root directory to the Python path.

#set pythonpath
import sys
sys.path.append(".")
from fastfoodapi.meta import Base 
from fastfoodapi.models import * 
target_metadata = Base.metadata

Create our fastfoodapi/models.py model


from sqlalchemy import Column, Integer, Numeric, Text

from .meta import Base


class Order(Base):
    __tablename__ = "orders"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(Text)
    quantity = Column(Integer)
    price = Column(Numeric)

Create a meta file for Alembic: fastfoodapi/meta.py

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.schema import MetaData

# Templates used to name indexes and constraints consistently
NAMING_CONVENTION = {
    "ix": "ix_%(column_0_label)s",
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "ck": "ck_%(table_name)s_%(constraint_name)s",
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    "pk": "pk_%(table_name)s",
}

metadata = MetaData(naming_convention=NAMING_CONVENTION)
Base = declarative_base(metadata=metadata)
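The entries in NAMING_CONVENTION are %-style templates whose tokens SQLAlchemy fills in when it names an index or constraint. The sketch below only imitates that substitution with plain string formatting, to show what kind of names come out. The expand helper is hypothetical, the token values are the ones I would expect for the orders table, and the "uq" example assumes a unique constraint that the model in this post does not actually declare.

```python
NAMING_CONVENTION = {
    "ix": "ix_%(column_0_label)s",
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "pk": "pk_%(table_name)s",
}


def expand(kind: str, **tokens: str) -> str:
    """Fill one naming-convention template with plain %-formatting."""
    return NAMING_CONVENTION[kind] % tokens


if __name__ == "__main__":
    print(expand("pk", table_name="orders"))
    print(expand("ix", column_0_label="orders_id"))
    print(expand("uq", table_name="orders", column_0_name="name"))
```

Predictable names matter mostly for later migrations: Alembic can only drop or alter a constraint it can refer to by name.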

Execute the migration

alembic revision --autogenerate -m "migration msg" 
alembic upgrade head

Create triggers

We will also create the trigger with Alembic. This time we generate a blank migration and add the SQL to it by hand.

alembic revision -m "migration msg"

Edit the generated revision file so that upgrade() creates the function and the trigger, and downgrade() drops the trigger:

from alembic import op  # already present in the generated file
from sqlalchemy import text


def upgrade():
    connection = op.get_bind()
    # Publish every row change on the "orders" channel as JSON
    trigger = text("""
        CREATE OR REPLACE FUNCTION orders_event() RETURNS TRIGGER AS $$
        DECLARE
            record RECORD;
            payload JSON;
        BEGIN
            IF (TG_OP = 'DELETE') THEN
                record = OLD;
            ELSE
                record = NEW;
            END IF;
            payload = json_build_object(
                'table', TG_TABLE_NAME, 'action', TG_OP, 'data', row_to_json(record));
            PERFORM pg_notify('orders', payload::text);
            RETURN NULL;
        END;
        $$ LANGUAGE plpgsql;

        CREATE TRIGGER notify_order_event
        AFTER INSERT OR UPDATE OR DELETE ON orders
        FOR EACH ROW EXECUTE PROCEDURE orders_event();
    """)
    connection.execute(trigger)


def downgrade():
    connection = op.get_bind()
    drop_trigger = text("DROP TRIGGER notify_order_event ON orders;")
    connection.execute(drop_trigger)

Then apply the migration:

alembic upgrade head
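The trigger sends a JSON document with three keys: table, action and data. Here is a small sketch of how that payload could be decoded and sanity-checked on the Python side. SAMPLE_PAYLOAD is a made-up example shaped like the json_build_object call, and parse_event and fits_in_notify are hypothetical helpers. The 8000-byte figure is the default PostgreSQL limit for NOTIFY payloads, which is worth keeping in mind if the table ever gets wide rows.

```python
import json

# Hypothetical notification payload, shaped like the json_build_object call
SAMPLE_PAYLOAD = (
    '{"table": "orders", "action": "INSERT", '
    '"data": {"id": 1, "name": "Nachos", "quantity": 2, "price": 24}}'
)

MAX_NOTIFY_BYTES = 8000  # payload must be shorter than this by default


def parse_event(raw: str) -> dict:
    """Decode a notification and check it has the keys the trigger sets."""
    event = json.loads(raw)
    missing = {"table", "action", "data"} - event.keys()
    if missing:
        raise ValueError("payload is missing keys: %s" % sorted(missing))
    return event


def fits_in_notify(raw: str) -> bool:
    """Return True if the payload is under the NOTIFY size limit."""
    return len(raw.encode("utf-8")) < MAX_NOTIFY_BYTES


if __name__ == "__main__":
    event = parse_event(SAMPLE_PAYLOAD)
    print(event["action"], event["data"]["name"])
```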

So far, this is the structure of our project:

[Image: Project structure]

Create our fastfoodapi/main.py backend

import asyncio
import json

import aiopg
from fastapi import FastAPI
from starlette.endpoints import WebSocketEndpoint
from starlette.websockets import WebSocket

dsn = "dbname=fast_food_db user=food_user password=password host=127.0.0.1"

app = FastAPI()


@app.websocket_route("/order_events")
class WebSocketOrders(WebSocketEndpoint):
    encoding = "json"

    def __init__(self, scope, receive, send):
        super().__init__(scope, receive, send)
        self.connected = False
        self.websocket = None

    async def listen(self, conn, channel):
        # Subscribe to the Postgres channel and forward each notification
        async with conn.cursor() as cur:
            await cur.execute("LISTEN {0}".format(channel))
            while self.connected:
                msg = await conn.notifies.get()
                payload: dict = json.loads(msg.payload)
                if payload.get("action") == "INSERT":
                    insert_data: dict = payload.get("data")
                    await self.websocket.send_json(
                        {"message": "New order", "data": insert_data}
                    )
                elif payload.get("action") == "UPDATE":
                    update_data: dict = payload.get("data")
                    await self.websocket.send_json(
                        {"message": "Order update", "data": update_data}
                    )

    async def db_events(self, data: dict, websocket: WebSocket, channel: str):
        async with aiopg.create_pool(dsn) as pool:
            async with pool.acquire() as conn:
                await self.listen(conn, channel)

    async def on_receive(self, websocket: WebSocket, data: dict):
        # The client sends {"method": "subscribe", "channel": "orders"}
        channel: str = data.get("channel")
        asyncio.ensure_future(self.db_events(data, websocket, channel))

    async def on_connect(self, websocket: WebSocket):
        await websocket.accept()
        self.connected = True
        self.websocket = websocket

    async def on_disconnect(self, websocket: WebSocket, close_code: int):
        # Starlette calls on_disconnect once the socket is already closed
        self.connected = False
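The core of the endpoint is the loop in listen: wait for a notification, decode it, and push a message to the client depending on the action. The sketch below reproduces that pattern with only the standard library so it can be run without Postgres. An asyncio.Queue stands in for conn.notifies, FakeWebSocket stands in for the Starlette WebSocket, and the None sentinel is an invention of this sketch to stop the loop; none of these are part of aiopg or Starlette.

```python
import asyncio
import json


class FakeWebSocket:
    """Stand-in for the Starlette WebSocket; it only records what is sent."""

    def __init__(self):
        self.sent = []

    async def send_json(self, message):
        self.sent.append(message)


MESSAGES = {"INSERT": "New order", "UPDATE": "Order update"}


async def forward(notifies: asyncio.Queue, websocket: FakeWebSocket) -> None:
    """Relay queued payloads to the websocket until a None sentinel arrives."""
    while True:
        raw = await notifies.get()
        if raw is None:  # sentinel used by this sketch to stop the loop
            break
        payload = json.loads(raw)
        label = MESSAGES.get(payload.get("action"))
        if label is not None:  # DELETE and unknown actions are skipped
            await websocket.send_json(
                {"message": label, "data": payload.get("data")}
            )


async def demo() -> list:
    notifies = asyncio.Queue()
    websocket = FakeWebSocket()
    for action in ("INSERT", "DELETE", "UPDATE"):
        notifies.put_nowait(json.dumps({"action": action, "data": {"name": "Soda"}}))
    notifies.put_nowait(None)
    await forward(notifies, websocket)
    return websocket.sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Using a lookup table instead of an if/elif chain is just one possible design; it makes adding a message for DELETE a one-line change.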

The backend is still very insecure, but it is enough to start our service with uvicorn

uvicorn fastfoodapi.main:app --reload
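One reason the backend is insecure is that the channel name sent by the client is formatted straight into the LISTEN statement. A minimal sketch of one way to guard against that, assuming "orders" is the only channel we ever want to expose; ALLOWED_CHANNELS and listen_statement are hypothetical names, not part of aiopg.

```python
import re

ALLOWED_CHANNELS = {"orders"}  # assumption: the only channel the trigger notifies
IDENTIFIER = re.compile(r"[a-z_][a-z0-9_]*")


def listen_statement(channel) -> str:
    """Build a LISTEN statement, rejecting anything not explicitly allowed."""
    if not isinstance(channel, str) or not IDENTIFIER.fullmatch(channel):
        raise ValueError("invalid channel name")
    if channel not in ALLOWED_CHANNELS:
        raise ValueError("unknown channel: %s" % channel)
    return "LISTEN {0}".format(channel)


if __name__ == "__main__":
    print(listen_statement("orders"))
```

The allow-list alone would be enough here; the identifier check is kept as a second line of defense in case the list is ever built from configuration.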

The client

Now it’s time to connect through the WebSocket. We will do it with JavaScript.

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css">
</head>

<body>
    <div class="jumbotron text-center">
        <h1>Real Time Notifications With Python And Postgres</h1> </div>
    <div class="container">
        <div class="row">
            <div id="log" class="col-lg-12"></div>
        </div>
    </div>
</body>
<script src="http://code.jquery.com/jquery.min.js"></script>
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/4.3.1/js/bootstrap.min.js"></script>
<script type="text/javascript">
    var connection;
    $(function() {
        var log = document.getElementById('log');
        window.WebSocket = window.WebSocket || window.MozWebSocket;
        connection = new WebSocket('ws://localhost:8000/order_events');
        // Subscribe to the "orders" channel as soon as the socket opens
        connection.onopen = function() {
            connection.send(JSON.stringify({
                "method": "subscribe",
                "channel": "orders"
            }));
        };
        connection.onerror = function(error) {
            log.innerHTML = '<li class="them">an error occurred when sending-receiving data</li>' + log.innerHTML;
        };
        // Render each notification as a small card inside #log
        connection.onmessage = function(message) {
            try {
                var json = JSON.parse(message.data);
                let d = $("<div />", {
                    class: 'data',
                    "css": {
                        background: 'white',
                        padding: '0.5em',
                        margin: '10px 0'
                    }
                });
                let h = $("<h1 />", {
                    css: {
                        margin: 0
                    },
                    text: json.message
                });
                let s = $("<h3 />", {
                    css: {
                        margin: 0
                    },
                    text: json.data.name
                });
                let p = $("<p />", {
                    text: "Price " + json.data.price + " quantity " + json.data.quantity
                });
                d.append(h);
                d.append(s);
                d.append(p);
                d.append($("<hr />"));
                $("#log").append(d.fadeIn());
            } catch (e) {
                console.log('This doesn\'t look like a valid JSON: ', message);
                return;
            }
        };
    });
</script>

</html>

Now we need to generate inserts in the database. We will do it with the following orders-generator.py script.

import random
from time import sleep

from sqlalchemy import create_engine
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import sessionmaker

db_url = "postgresql://food_user:<password>@localhost/fast_food_db"
engine = create_engine(db_url, echo=True)
SessionLocal = sessionmaker(autoflush=False, bind=engine)
Base = automap_base()


class Order(Base):
    __tablename__ = "orders"


# Reflect the columns of the existing table (SQLAlchemy 1.4+;
# older versions use Base.prepare(engine, reflect=True))
Base.prepare(autoload_with=engine)

db = SessionLocal()
products = ["French Fries", "Hamburger", "Nachos", "Soda", "Milkshake", "Burrito", "Hot Dog", "Salad"]
qty = [1, 2, 3, 4, 5, 6, 7, 8, 9, 12]
# Insert one random order every 20 seconds; each commit fires the trigger
for num in range(1, 200):
    order = Order()
    order.name = random.choice(products)
    order.price = random.choice(qty) * num
    order.quantity = random.choice(qty)
    db.add(order)
    db.commit()
    sleep(20)
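If you want to check the generated values without touching the database, the random part of the script can be pulled into a pure function. This is only a sketch: make_order is a hypothetical helper, the product list is shortened, and passing a seeded random.Random makes the output repeatable.

```python
import random
from decimal import Decimal

PRODUCTS = ["French Fries", "Hamburger", "Nachos", "Soda"]
QUANTITIES = [1, 2, 3, 4, 5, 6, 7, 8, 9, 12]


def make_order(num: int, rng: random.Random) -> dict:
    """Return the column values for one random order, without any database access."""
    return {
        "name": rng.choice(PRODUCTS),
        "price": Decimal(rng.choice(QUANTITIES) * num),
        "quantity": rng.choice(QUANTITIES),
    }


if __name__ == "__main__":
    rng = random.Random(2019)  # fixed seed so repeated runs give the same orders
    for num in range(1, 4):
        print(make_order(num, rng))
```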

We will see something like this on our orders.html page

Very easy, right? Under all this magic is Starlette, which together with FastAPI gives us the opportunity to create high-performance asyncio services.

All the code is on GitHub. There’s also a Spanish version of this post at Appbits.

In the second part we will see how we can make our service more secure with JWT tokens.

 

webjunkie

 
