Real Time Notifications with Pyramid and ZeroMQ

Sooner or later I had to implement real-time notifications on my latest project, Floresta. The problem was that most of the code is synchronous. I think the ideal solution would be to run aiohttp on a separate instance or in a separate thread, but since I’m still on Python 2.7 that was not possible. After researching the subject I opted to enable gevent on my current uWSGI setup and add ZeroMQ to the equation.

So the first thing I did was install pyzmq, gevent and ZeroMQ. On a Mac it’s easy: just use Homebrew and pip. Make sure you install ZeroMQ version 4.1.5 or higher and pyzmq 15.4.0 or higher.
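A quick way to confirm those minimums is to ask pyzmq which versions it was built against. The sketch below is only an illustration: the helper names (`version_tuple`, `meets_minimum`, `check_zmq_versions`) are made up for this example, while `zmq.zmq_version()` and `zmq.pyzmq_version()` are the calls pyzmq provides for reporting the library and binding versions.

```python
import re

# Minimum versions quoted in the post.
MIN_LIBZMQ = "4.1.5"
MIN_PYZMQ = "15.4.0"


def version_tuple(version):
    """Turn '4.1.5' into (4, 1, 5) so versions compare numerically."""
    parts = []
    for piece in version.split(".")[:3]:
        match = re.match(r"\d+", piece)
        parts.append(int(match.group()) if match else 0)
    return tuple(parts)


def meets_minimum(found, required):
    """True when the found version is at least the required one."""
    return version_tuple(found) >= version_tuple(required)


def check_zmq_versions():
    """Return a list of problems; an empty list means both versions are fine."""
    try:
        import zmq
    except ImportError:
        return ["pyzmq is not installed"]
    problems = []
    if not meets_minimum(zmq.zmq_version(), MIN_LIBZMQ):
        problems.append("ZeroMQ %s is older than %s" % (zmq.zmq_version(), MIN_LIBZMQ))
    if not meets_minimum(zmq.pyzmq_version(), MIN_PYZMQ):
        problems.append("pyzmq %s is older than %s" % (zmq.pyzmq_version(), MIN_PYZMQ))
    return problems


if __name__ == "__main__":
    for line in check_zmq_versions() or ["ZeroMQ and pyzmq look recent enough"]:
        print(line)
```

Running it inside the same virtualenv that uWSGI uses should tell you whether the versions the app will load are new enough.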

On Debian I had to do the following:

Install libsodium first

git clone git://github.com/jedisct1/libsodium.git
cd libsodium
./autogen.sh
./configure && make check 
sudo make install 
sudo ldconfig

Then compile ZeroMQ

git clone https://github.com/zeromq/zeromq4-1.git
cd zeromq4-1
./autogen.sh 
./configure && make check 
sudo make install 
sudo ldconfig

On the uWSGI side I had to add these lines to my .ini file

gevent = 100
gevent-monkey-patch = true
enable-threads = true

Now the event.py view

# coding=utf-8
import datetime
import json
import logging
import os
import threading
import time

import zmq
from pyramid.httpexceptions import (
    HTTPFound,
    HTTPNotFound,
    HTTPForbidden,
)
from pyramid.response import Response
from pyramid.view import view_config, notfound_view_config, forbidden_view_config
from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound
 
log = logging.getLogger(__name__)
 
sock = "ipc:///tmp/zmq.test"
context = zmq.Context()
pub_socket = context.socket(zmq.PUB)
pub_socket.bind(sock)
pub_lock = threading.Lock()
 
def message_generator():
    try:
        socket2 = context.socket(zmq.SUB)
        # Make sure to connect and not bind: an endpoint can be bound
        # only once, but any number of sockets can connect to it.
        socket2.connect(sock)
        # You can filter messages by prefix, for example with a user id
        # taken from a DB table:
        #     socket2.setsockopt(zmq.SUBSCRIBE, userid)
        # An empty prefix subscribes to every message.
        socket2.setsockopt(zmq.SUBSCRIBE, '')
    except zmq.error.ZMQError:
        log.error("socket already in use, try restarting it")
        return
    try:
        # run forever
        while True:
            try:
                msg = socket2.recv(zmq.NOBLOCK)
            except zmq.Again:
                # no message is waiting on the socket
                log.debug("nothing sent..")
            else:
                # break out of the loop
                if msg == "EXIT":
                    log.info("exiting process.")
                    break
                log.info("sending message")
                yield "data: %s\n\n" % json.dumps({"message": msg})
            time.sleep(3)
    except GeneratorExit:
        return
    finally:
        socket2.close()
#the url that streams the events to the browser
 
@view_config(route_name="events")
def _orders_events(request):
    headers = [("Content-Type", "text/event-stream"),
                       ("Cache-Control", "no-cache")]
    response = Response(headerlist=headers)
    response.app_iter = message_generator()
    return response
 
#the url that publishes messages to the subscriber.
@view_config(route_name="events:push")
def push(request):
    msg = json.loads(request.body)["message"]
    with pub_lock:
        pub_socket.send(msg.encode("utf-8"))
    return Response()
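
The per-user filter mentioned in the comments of message_generator relies on ZeroMQ's prefix matching: a SUB socket only receives messages whose bytes start with one of its subscribed prefixes. The sketch below shows one possible way to frame messages so that works. The `user:<id> ` topic format and the helper names are assumptions made for illustration, and no sockets are involved, so the matching rule is only mimicked in plain Python.

```python
import json


def topic_for(userid):
    """Build the subscription prefix for one user (hypothetical format)."""
    # The trailing space stops "user:1 " from matching "user:12 ...".
    return ("user:%s " % userid).encode("utf-8")


def frame_for_user(userid, message):
    """Prefix the JSON payload with the user's topic."""
    payload = json.dumps({"message": message}).encode("utf-8")
    return topic_for(userid) + payload


def would_deliver(subscription, frame):
    """Mimic the SUB-side filter: deliver when the frame starts with the prefix."""
    return frame.startswith(subscription)


def split_frame(frame):
    """Separate the topic from the JSON payload on the receiving side."""
    topic, _, payload = frame.partition(b" ")
    return topic.decode("utf-8"), json.loads(payload.decode("utf-8"))
```

With real sockets, the push view would send `frame_for_user(userid, msg)` through `pub_socket.send(...)`, and the generator would call `socket2.setsockopt(zmq.SUBSCRIBE, topic_for(userid))` instead of subscribing to the empty prefix.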

The message_generator function will run forever inside your app. The way to stop it is to send a message with the string “EXIT”.
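
To see the loop's control flow without uWSGI or ZeroMQ in the way, here is a stripped-down sketch of the same pattern. It is not the view code itself: `stream_events`, `poll` and `sleep` are hypothetical names, and a plain list stands in for the socket. Each message becomes one Server-Sent Events frame (`data: ...` followed by a blank line), and the stop word ends the generator.

```python
import json


def sse_frame(message):
    """Format one Server-Sent Events frame; the blank line ends the event."""
    return "data: %s\n\n" % json.dumps({"message": message})


def stream_events(poll, sleep, stop_word="EXIT"):
    """Yield SSE frames from poll() until the stop word arrives.

    poll() returns the next message, or None when nothing is waiting.
    sleep() is called after every poll to pace the loop.
    """
    while True:
        message = poll()
        if message == stop_word:
            break
        if message is not None:
            yield sse_frame(message)
        sleep()


if __name__ == "__main__":
    # A list stands in for the ZeroMQ socket in this demo.
    queued = ["order 1 shipped", None, "order 2 shipped", "EXIT", "never sent"]
    for frame in stream_events(lambda: queued.pop(0), lambda: None):
        print(repr(frame))
```

Anything published after the stop word stays unread, so a client that is still connected receives nothing more from that generator.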

Once you have this view in place, all you have to do is connect to the events URL from JavaScript with the following code.

var source = null;

$(function() {
    // define the error handler before assigning it
    var eventSourceErrorFunction = function(event) {
        if (source.readyState === EventSource.CLOSED) {
            source.close();
            console.log("Event Source Closed");
        }
    };
    source = new EventSource("/events");
    source.addEventListener("message", messageReceived, false);
    source.onerror = eventSourceErrorFunction;
});

function messageReceived(event) {
    // event.data holds the JSON string sent by the server
    console.log("message arrived: " + event.data);
}

Send test messages from the JavaScript console

$.ajax({
    url: "/events/push",
    data: JSON.stringify({message: "Remote message"}),
    type: "post",
    success: function() {
        console.log("message sent!");
    }
});

If you plan on placing Nginx in front of uWSGI, don’t forget to turn off uwsgi_buffering and set uwsgi_read_timeout to 300; otherwise the messages will get stuck.

 

Running Standalone SQLAlchemy Scripts in Pyramid

From time to time there comes the need to run automated scripts managed by either Python or Cron. In a recent project I had to run a standalone script that checks records in a database at certain hours. So here’s what I came up with. Most of the script is based on this post by Daniel Mayer.

from paste.deploy import appconfig
 
from sqlalchemy import engine_from_config
 
from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound
 
#here is where sqlalchemy objects are imported
from PRJ.models import DBSession as db,DBRecords
 
#import the session manager. This way commits will be handled automatically by the Zope Transaction Manager
import transaction
 
# Load Application Configuration and Return as Dictionary
conf = appconfig('config:' + 'development.ini', 
        relative_to="/var/www/PRJ",
        name="main")
 
# Bind Engine Based on Config
engine = engine_from_config(conf, 'sqlalchemy.')
db.configure(bind=engine)
 
#Query the DB inside the transaction so changes are committed on exit
with transaction.manager:
  data = db.query(DBRecords).all()
  for record in data:
    #query or update DB
    pass
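
The script above hard-codes the .ini file and project path. If the same script has to run against several environments from Cron, one option is to take the path from the command line. This is only a sketch: `parse_args`, `appconfig_arguments` and `main` are hypothetical helpers, and the only library call assumed is the same `paste.deploy.appconfig` used above.

```python
import argparse
import os


def parse_args(argv=None):
    """Read the .ini path and app section name from the command line."""
    parser = argparse.ArgumentParser(description="Run a standalone DB check")
    parser.add_argument("ini", help="path to the Pyramid .ini file")
    parser.add_argument("--name", default="main", help="app section name")
    return parser.parse_args(argv)


def appconfig_arguments(ini_path, name="main"):
    """Build the arguments the script passes to paste.deploy.appconfig."""
    ini_path = os.path.abspath(ini_path)
    return {
        "uri": "config:" + os.path.basename(ini_path),
        "relative_to": os.path.dirname(ini_path),
        "name": name,
    }


def main(argv=None):
    """Load the application settings named on the command line."""
    args = parse_args(argv)
    kwargs = appconfig_arguments(args.ini, args.name)
    # Imported here so the helpers above work without paste installed.
    from paste.deploy import appconfig
    return appconfig(kwargs.pop("uri"), **kwargs)
```

A Cron entry could then call the script with `/var/www/PRJ/development.ini` or a production file, and the settings returned by `main()` would feed `engine_from_config` exactly as before.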
 

Simpleform Localization in Pyramid

In a recent project I had to localize the errors thrown by the pyramid_simpleform package. Googling for information I couldn’t find how to do it, so here’s what worked for me in the end.

from pyramid.i18n import get_locale_name
from pyramid.view import view_config
from pyramid_simpleform import Form, State
from pyramid_simpleform.renderers import FormRenderer

from formencode import api as formencode_api

# MySchema is your own FormEncode schema, defined or imported elsewhere

def includeme(config):
    config.scan(__name__)
    config.add_route('login', '/login')

@view_config(route_name='login', renderer='website/login.mak')
def login(request):
    # set the language in FormEncode according to the request url param _LOCALE_
    formencode_api.set_stdtranslation(languages=[get_locale_name(request)])
    form = Form(request,
                defaults=dict(request.params),
                schema=MySchema,
                state=State())
    # set an empty gettext translation function,
    # since FormEncode has one already
    # configured in the set_stdtranslation function
    form.state._ = ''
    return dict(renderer=FormRenderer(form))

And that’s it. Try it with, for example, http://mysite.com/login?_LOCALE_=fr. Make sure the action attribute of your form passes the _LOCALE_ value if the method is set to POST.
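
One way to keep the _LOCALE_ value on a POST is to build the form's action URL with the parameter already in the query string. The helper below is a small sketch using only the standard library. `with_locale` is a hypothetical name, and the import fallback is there because the project described here runs on Python 2.7.

```python
try:
    from urllib.parse import urlsplit, urlunsplit, parse_qsl, urlencode  # Python 3
except ImportError:
    from urlparse import urlsplit, urlunsplit, parse_qsl  # Python 2
    from urllib import urlencode


def with_locale(url, locale):
    """Return url with its _LOCALE_ query parameter set to locale."""
    parts = urlsplit(url)
    # Drop any existing _LOCALE_ so the parameter is never duplicated.
    query = [(k, v) for k, v in parse_qsl(parts.query) if k != "_LOCALE_"]
    query.append(("_LOCALE_", locale))
    return urlunsplit(parts._replace(query=urlencode(query)))
```

In the login view, something like `with_locale(request.path, get_locale_name(request))` could be passed to the template and used as the form action, so the error messages on the re-rendered form stay in the same language.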