It is 2019, and if you are a backend developer, chances are that at some stage you have had, or will have, to implement WebSockets.
About four years ago, implementing asynchronous functions in a Python backend was somewhat complicated, since the available options, such as Tornado or gevent, had a steep learning curve. Many developers looked favorably on NodeJS for their backends to avoid the headaches.
Then Django Channels appeared. It was one of the first ASGI frameworks, and with it implementing WebSockets became somewhat easier. Django Channels gave us the opportunity to combine synchronous functions with asynchronous ones. Other frameworks such as Quart and FastAPI followed later.
Of these three, FastAPI has become a favorite for its flexibility and speed, so I'll be using it for this post.
The Starting Lineup
- Postgres
- FastAPI
- AIOPG
Create the database
createuser food_user
createdb -E utf-8 fast_food_db
psql fast_food_db
alter user food_user with encrypted password '<password>';
grant all privileges on database fast_food_db to food_user;
Create a virtual environment and install packages
python3 -m venv my_env
source my_env/bin/activate
pip install fastapi SQLAlchemy alembic psycopg2 uvicorn aiopg
mkdir FastFoodAPI
cd FastFoodAPI
mkdir fastfoodapi
#initialize alembic for db migrations
alembic init alembic
Edit alembic.ini
In this file we configure the database url
sqlalchemy.url = postgresql://food_user:<password>@localhost/fast_food_db
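If the password contains characters such as @ or /, it has to be percent-encoded before it goes into the URL. Here is a minimal sketch using only the standard library; the helper name build_db_url is hypothetical and not part of the project. Note that alembic.ini is read with Python's config parser, so a literal % in the value would likely need to be written as %%.

```python
from urllib.parse import quote_plus


def build_db_url(user: str, password: str, host: str, database: str) -> str:
    """Return a postgresql:// URL with user and password percent-encoded.

    Hypothetical helper for illustration only.
    """
    return "postgresql://{user}:{password}@{host}/{database}".format(
        user=quote_plus(user),
        password=quote_plus(password),
        host=host,
        database=database,
    )


if __name__ == "__main__":
    # "p@ss/word" is a made-up password that needs encoding.
    print(build_db_url("food_user", "p@ss/word", "localhost", "fast_food_db"))
```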
Edit env.py in the alembic folder
In this file we need to import our models so that they are loaded in memory when migrations are generated with the --autogenerate argument. In the process we are going to add the root directory to the Python path.
#set pythonpath
import sys
sys.path.append(".")
from fastfoodapi.meta import Base
from fastfoodapi.models import *
target_metadata = Base.metadata
Create our model in fastfoodapi/models.py
from sqlalchemy import (
    Column, Index, Integer, Text, orm, MetaData, Table, DateTime,
    LargeBinary, ForeignKey, Boolean, func, String, BigInteger, Numeric,
)
from sqlalchemy.orm import relationship
from .meta import Base


class Order(Base):
    __tablename__ = "orders"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(Text)
    quantity = Column(Integer)
    price = Column(Numeric)
Create a meta file for Alembic in fastfoodapi/meta.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.schema import MetaData
NAMING_CONVENTION = {
    "ix": "ix_%(column_0_label)s",
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "ck": "ck_%(table_name)s_%(constraint_name)s",
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    "pk": "pk_%(table_name)s",
}

metadata = MetaData(naming_convention=NAMING_CONVENTION)
Base = declarative_base(metadata=metadata)
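To get a feel for what the naming convention produces, here is a small sketch that expands a few of the templates with plain Python string formatting. It only imitates the substitution; SQLAlchemy fills in the tokens itself, and the expand helper and the token values used here are assumptions for illustration.

```python
# A subset of the templates from meta.py.
NAMING_CONVENTION = {
    "ix": "ix_%(column_0_label)s",
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "pk": "pk_%(table_name)s",
}


def expand(kind: str, **tokens: str) -> str:
    """Fill one template with the given tokens (hypothetical helper)."""
    return NAMING_CONVENTION[kind] % tokens


if __name__ == "__main__":
    print(expand("pk", table_name="orders"))
    print(expand("uq", table_name="orders", column_0_name="name"))
    # column_0_label is assumed to be "<table>_<column>".
    print(expand("ix", column_0_label="orders_id"))
```

With names like these, every constraint and index gets a predictable name, which makes later Alembic migrations that drop or alter them easier to write.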
Execute the migration
alembic revision --autogenerate -m "migration msg"
alembic upgrade head
Create triggers
We are going to create the trigger with Alembic as well, but this time we create a blank migration (without --autogenerate) and add the SQL to it by hand.
alembic revision -m "migration msg"
from alembic import op
from sqlalchemy import text


def upgrade():
    connection = op.get_bind()
    # Function that publishes every row change on the "orders" channel,
    # followed by the trigger that calls it.
    trigger = text(
        """
        CREATE OR REPLACE FUNCTION orders_event() RETURNS TRIGGER AS $$
        DECLARE
            record RECORD;
            payload JSON;
        BEGIN
            IF (TG_OP = 'DELETE') THEN record = OLD; ELSE record = NEW; END IF;
            payload = json_build_object('table', TG_TABLE_NAME, 'action', TG_OP,
                                        'data', row_to_json(record));
            PERFORM pg_notify('orders', payload::text);
            RETURN NULL;
        END;
        $$ LANGUAGE plpgsql;

        CREATE TRIGGER notify_order_event
        AFTER INSERT OR UPDATE OR DELETE ON orders
        FOR EACH ROW EXECUTE PROCEDURE orders_event();
        """
    )
    connection.execute(trigger)


def downgrade():
    drop_trigger = text("drop trigger notify_order_event on orders;")
    connection = op.get_bind()
    connection.execute(drop_trigger)
alembic upgrade head
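The trigger sends a JSON string with three keys: table, action and data. The sketch below rebuilds that shape in plain Python so we can see what a listener will receive. The function names and the sample row are made up for illustration; the real payload is produced by Postgres.

```python
import json


def build_payload(table: str, action: str, row: dict) -> str:
    """Mimic json_build_object('table', ..., 'action', ..., 'data', ...)."""
    return json.dumps({"table": table, "action": action, "data": row})


def parse_notification(raw: str) -> tuple:
    """Return (action, data) from a notification payload string."""
    payload = json.loads(raw)
    return payload["action"], payload["data"]


if __name__ == "__main__":
    # Hypothetical row; the real one comes from row_to_json(record).
    sample = build_payload(
        "orders", "INSERT", {"id": 1, "name": "Nachos", "quantity": 2, "price": 9}
    )
    print(parse_notification(sample))
```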
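So far the project consists of alembic.ini and the alembic folder (with env.py and the generated migrations) at the root of FastFoodAPI, plus the fastfoodapi package containing meta.py and models.py.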
Create our backend in fastfoodapi/main.py
import asyncio
import json

import aiopg
from fastapi import FastAPI
from starlette.endpoints import WebSocketEndpoint
from starlette.websockets import WebSocket

# Replace <password> with the password set for food_user.
dsn = "dbname=fast_food_db user=food_user password=<password> host=127.0.0.1"

app = FastAPI()


@app.websocket_route("/order_events")
class WebSocketOrders(WebSocketEndpoint):
    encoding = "json"

    def __init__(self, scope, receive, send):
        super().__init__(scope, receive, send)
        self.connected = False
        self.loop = asyncio.get_event_loop()
        self.websocket = None

    async def listen(self, conn, channel):
        async with conn.cursor() as cur:
            # Insecure: the channel name comes straight from the client.
            await cur.execute("LISTEN {0}".format(channel))
            while self.connected:
                # Wait for the next notification sent by the trigger.
                msg = await conn.notifies.get()
                payload: dict = json.loads(msg.payload)
                if payload.get("action") == "INSERT":
                    insert_data: dict = payload.get("data")
                    await self.websocket.send_json(
                        {"message": "New order", "data": insert_data}
                    )
                elif payload.get("action") == "UPDATE":
                    update_data: dict = payload.get("data")
                    await self.websocket.send_json(
                        {"message": "Order update", "data": update_data}
                    )

    async def db_events(self, data: dict, websocket: WebSocket, channel: str):
        async with aiopg.create_pool(dsn) as pool:
            async with pool.acquire() as conn:
                await asyncio.gather(self.listen(conn, channel))

    async def on_receive(self, websocket: WebSocket, data: dict):
        # The client sends {"method": "subscribe", "channel": "orders"}.
        channel: str = data.get("channel")
        asyncio.ensure_future(
            self.db_events(data, websocket, channel), loop=self.loop
        )

    async def on_connect(self, websocket: WebSocket):
        await websocket.accept()
        self.connected = True
        self.websocket = websocket

    async def on_disconnect(self, websocket: WebSocket, close_code: int):
        # Starlette calls on_disconnect (not on_close) when the client leaves;
        # the socket is already closed at this point.
        self.connected = False
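The branching on the action field can be pulled out into a small pure function, which is easier to test than the WebSocket loop itself. This is only a sketch of one possible refactor; to_client_message and the MESSAGES table are hypothetical names and not part of the code above.

```python
import json
from typing import Optional

# Maps the trigger's TG_OP value to the text shown to the client.
MESSAGES = {"INSERT": "New order", "UPDATE": "Order update"}


def to_client_message(raw_payload: str) -> Optional[dict]:
    """Turn a notification payload into the dict sent over the WebSocket.

    Returns None for actions the client is not told about (e.g. DELETE).
    """
    payload = json.loads(raw_payload)
    label = MESSAGES.get(payload.get("action"))
    if label is None:
        return None
    return {"message": label, "data": payload.get("data")}


if __name__ == "__main__":
    raw = json.dumps({"table": "orders", "action": "INSERT", "data": {"name": "Soda"}})
    print(to_client_message(raw))
```

Inside listen, the two send_json branches could then be replaced by one call that sends the result whenever it is not None.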
With this (still very insecure) backend in place, we start our service with uvicorn
uvicorn fastfoodapi.main:app --reload
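One reason the backend is insecure is that the channel name sent by the client is formatted directly into the LISTEN statement. A minimal way to tighten this, assuming we only ever want to expose the orders channel, is an allowlist check before building the statement. ALLOWED_CHANNELS and listen_statement are hypothetical names.

```python
# Channels a client may subscribe to (assumption: only "orders" exists).
ALLOWED_CHANNELS = frozenset({"orders"})


def listen_statement(channel) -> str:
    """Return the LISTEN statement for an allowed channel, else raise."""
    if not isinstance(channel, str) or channel not in ALLOWED_CHANNELS:
        raise ValueError("unknown channel: {0!r}".format(channel))
    return "LISTEN {0}".format(channel)


if __name__ == "__main__":
    print(listen_statement("orders"))
    try:
        listen_statement("orders; DROP TABLE orders")
    except ValueError as exc:
        print("rejected:", exc)
```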
The client
Now it's time to connect through the WebSocket. We will do it with JavaScript in an orders.html page.
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css">
</head>
<body>
<div class="jumbotron text-center">
<h1>Real Time Notifications With Python And Postgres</h1> </div>
<div class="container">
<div class="row">
<div id="log" class="col-lg-12"></div>
</div>
</div>
</body>
<script src="http://code.jquery.com/jquery.min.js"></script>
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/4.3.1/js/bootstrap.min.js"></script>
<script type="text/javascript">
var connection;
$(function() {
    var log = document.getElementById('log');
    window.WebSocket = window.WebSocket || window.MozWebSocket;
    connection = new WebSocket('ws://localhost:8000/order_events');
    connection.onopen = function() {
        // Subscribe to the "orders" channel as soon as the socket opens.
        connection.send(JSON.stringify({
            "method": "subscribe",
            "channel": "orders"
        }));
    };
    connection.onerror = function(error) {
        log.innerHTML = '<li class="them">an error occurred when sending-receiving data</li>' + log.innerHTML;
    };
    connection.onmessage = function(message) {
        try {
            var json = JSON.parse(message.data);
            let d = $("<div />", {
                class: 'data',
                "css": {
                    background: 'white',
                    padding: '0.5em',
                    margin: '10px 0'
                }
            });
            let h = $("<h1 />", {
                css: {
                    margin: 0
                },
                text: json.message
            });
            let s = $("<h3 />", {
                css: {
                    margin: 0
                },
                text: json.data.name
            });
            let p = $("<p />", {
                text: "Price " + json.data.price + " quantity " + json.data.quantity
            });
            d.append(h);
            d.append(s);
            d.append(p);
            d.append($("<hr />"));
            $("#log").append(d.fadeIn());
        } catch (e) {
            console.log('This doesn\'t look like a valid JSON: ', message);
            return;
        }
    };
});
</script>
</html>
Now we need to generate inserts in the database. We will do it with the following orders-generator.py script
import random
from time import sleep

from sqlalchemy import create_engine
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import sessionmaker

# Replace <password> with the password set for food_user.
db_url = "postgresql://food_user:<password>@localhost/fast_food_db"
engine = create_engine(db_url, echo=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = automap_base()


class Order(Base):
    __tablename__ = "orders"


# Reflect the columns of the existing orders table onto Order.
Base.prepare(engine, reflect=True)

db = SessionLocal()
products = ["French Fries", "Hamburger", "Nachos", "Soda", "Milkshake", "Burrito", "Hot Dog", "Salad"]
qty = [1, 2, 3, 4, 5, 6, 7, 8, 9, 12]

for num in range(1, 200):
    order = Order()
    order.name = random.choice(products)
    order.price = random.choice(qty) * num
    order.quantity = random.choice(qty)
    db.add(order)
    db.commit()
    # One new order every 20 seconds; each commit fires the trigger.
    sleep(20)
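If we want to check the generated values without touching the database, the random part of the script can be isolated into a function that returns a plain dict. This is just a sketch; make_order is a hypothetical name, and passing a seeded random.Random makes the output repeatable.

```python
import random

PRODUCTS = ["French Fries", "Hamburger", "Nachos", "Soda",
            "Milkshake", "Burrito", "Hot Dog", "Salad"]
QUANTITIES = [1, 2, 3, 4, 5, 6, 7, 8, 9, 12]


def make_order(num: int, rng: random.Random) -> dict:
    """Build the same name/price/quantity values as the generator loop."""
    return {
        "name": rng.choice(PRODUCTS),
        "price": rng.choice(QUANTITIES) * num,
        "quantity": rng.choice(QUANTITIES),
    }


if __name__ == "__main__":
    rng = random.Random(42)  # fixed seed so runs are repeatable
    for num in range(1, 4):
        print(make_order(num, rng))
```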
We will see something like this on our orders.html page
Very easy, right? Under all this magic is Starlette, which together with FastAPI gives us the opportunity to create high-performance asyncio services.
All the code is on GitHub. There's also a Spanish version of this post at Appbits.
In the second part we will see how we can make our service more secure with JWT tokens.
i guess no second part? 🙁
Hi Derek,
Sorry I got caught up with work. But I will write the second part soon.
Any update?
This doesn’t seem really python
elif payload.get (‘action’) “UPDATE”:
I guess you should fix whatever is reformatting your blog.
You're right, I'll try to fix this mess. Thanks for the heads up!
Lol I guess part II never happened