Real-time Notifications with Python and Postgres

Create a Websocket endpoint in Python with Postgres.

 

Register For Remote Notifications With Firebase Outside AppDelegate

Here’s how you can ask a user to register for remote notifications after a certain action has been taken, e.g. after creating a new account.

import Firebase
import UserNotifications
DispatchQueue.main.async() {
    if #available(iOS 10.0, *) {
        let authOptions : UNAuthorizationOptions = [.alert, .badge, .sound]
        UNUserNotificationCenter.current().requestAuthorization(
            options: authOptions,
            completionHandler: {_,_ in })
        // For iOS 10 display notification (sent via APNS)
        UNUserNotificationCenter.current().delegate = UIApplication.shared.delegate as! AppDelegate
        // For iOS 10 data message (sent via FCM)
        FIRMessaging.messaging().remoteMessageDelegate = UIApplication.shared.delegate as! AppDelegate
    } else {
        let notificationTypes: UIUserNotificationType = [.alert, .badge, .sound]
        let pushNotificationSettings = UIUserNotificationSettings(types: notificationTypes, categories: nil)
        UIApplication.shared.registerUserNotificationSettings(pushNotificationSettings)
    }
    UIApplication.shared.registerForRemoteNotifications()
}
 

How To Save An Image Using DKImage

Here’s a quick snippet to save the original image once it has been selected using DKImage.
 

self.assets![0].fetchOriginalImageWithCompleteBlock({ (image, info) in
    let phAsset = self.assets![0].originalAsset
    self.savePHAsset(asset: phAsset!){(file) -> Void in
        print(file)
    }
})
func savePHAsset(asset: PHAsset, completionHandler: @escaping ((URL) -> Void)) {
    let manager = PHImageManager.default()
    let options = PHImageRequestOptions()
    options.version = .original
    options.isSynchronous = true
    manager.requestImageData(for: asset, options: options) { data, _, _, _ in
        if let data = data {
            let documentsPath = NSSearchPathForDirectoriesInDomains(.documentDirectory, .userDomainMask, true)[0] as String
            let image = UIImage(data: data)
            self.phaAsset = UIImage(data: data)
            self.imageData = UIImageJPEGRepresentation(image!,0.3) as NSData!
            self.fileNombre = "\(self.currentTag!)-\(randomStringWithLength(len: 5)).jpg"
            self.filename[self.currentTag!] = FileName(name: self.fileNombre)
            do {
                let nsU = URL(fileURLWithPath:"\(documentsPath)/\(self.fileNombre)")
                try self.imageData.write(to: nsU, options: [])
                completionHandler(nsU)
            } catch {
                print("Error with \(documentsPath)/\(self.fileNombre)")
            }
        }
    }
}

 

Real Time Notifications with Pyramid and ZeroMQ

Sooner or later I had to implement real-time notifications on my latest project, Floresta. The problem was that most of the code is synchronous. I think the ideal solution would be to run aiohttp on a separate instance or in a separate thread, but since I’m still on Python 2.7 that was not possible. After researching the subject I opted to enable gevent on my current uWSGI setup and add ZeroMQ to the equation.
So the first thing I did was install pyzmq, gevent and ZeroMQ. On a Mac this is easy: just use Homebrew and pip. Make sure you install ZeroMQ version 4.1.5 or higher and pyzmq 15.4.0 or higher.
On Debian I had to do the following:

Install libsodium first

git clone git://github.com/jedisct1/libsodium.git
cd libsodium
./autogen.sh
./configure && make check
sudo make install
sudo ldconfig

Then compile ZeroMQ

git clone https://github.com/zeromq/zeromq4-1.git
cd zeromq4-1
./autogen.sh
./configure && make check
sudo make install
sudo ldconfig

On the uWSGI side I had to add these lines to my .ini file

gevent = 100
gevent-monkey-patch = true
enable-threads = true

Now the event.py view

# coding=utf-8
from pyramid.response import Response
from pyramid.view import view_config, notfound_view_config,forbidden_view_config
from pyramid.httpexceptions import (
    HTTPFound,
    HTTPNotFound,
    HTTPForbidden,
)
from sqlalchemy.orm.exc import NoResultFound,MultipleResultsFound
import logging
import os
import datetime
import time
import json
import threading
import  zmq
log = logging.getLogger(__name__)
sock = "ipc:///tmp/zmq.test"
context = zmq.Context()
pub_socket = context.socket(zmq.PUB)
pub_socket.bind(sock)
pub_lock = threading.Lock()
def message_generator():
    try:
        socket2 = context.socket(zmq.SUB)
        # make sure to connect and not bind: only one socket can bind to the endpoint, but many can connect to it
        socket2.connect(sock)
        """
        you can filter messages based on user id from a DB table as in
        socket2.setsockopt(zmq.SUBSCRIBE, userid)
        """
        # an empty prefix subscribes to every message
        socket2.setsockopt(zmq.SUBSCRIBE, b"")
    except zmq.error.ZMQError:
        log.error("socket already in use, try restarting it")
        return
    try:
        # run forever
        while True:
            try:
                msg = socket2.recv(zmq.NOBLOCK).decode("utf-8")
                # break out of the loop
                if msg == "EXIT":
                    log.info("exiting process.")
                    break
                yield "data: %s\n\n" % json.dumps({"message": msg})
                log.info("sending message")
            except zmq.Again:
                # no message is waiting on the socket yet
                log.debug("nothing sent..")
            time.sleep(3)
    except GeneratorExit:
        return
#the url that streams the events to the browser
@view_config(route_name="events")
def _orders_events(request):
    headers = [("Content-Type", "text/event-stream"),
                       ("Cache-Control", "no-cache")]
    response = Response(headerlist=headers)
    response.app_iter = message_generator()
    return response
#the url that publishes messages to the subscriber.
@view_config(route_name="events:push")
def push(request):
    msg = json.loads(request.body)["message"]
    with pub_lock:
        pub_socket.send(msg.encode("utf-8"))
    return Response()
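
The generator above builds each Server-Sent Events frame by hand: a "data:" line followed by a blank line. As a minimal sketch of that framing, assuming Python 3 and a hypothetical helper name format_sse (it is not part of Pyramid or pyzmq), the same thing could be written as:

```python
import json


def format_sse(payload, event=None):
    """Serialize one Server-Sent Events frame (hypothetical helper).

    The payload is JSON-encoded into a single "data:" field, an optional
    "event:" field names the event type, and a blank line ends the frame.
    """
    lines = []
    if event is not None:
        lines.append("event: %s" % event)
    lines.append("data: %s" % json.dumps(payload))
    return "\n".join(lines) + "\n\n"


frame = format_sse({"message": "hello"})
named_frame = format_sse({"id": 1}, event="order")
```

Frames without an "event:" field arrive in the browser as plain "message" events, which is what the view in this post relies on.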

The message_generator function will run forever inside your app. The way to stop it is to send a message with the string “EXIT”.
Once you have this view in place, all you have to do is connect to the events url from javascript with the following code.

var source = null;
$(function() {
    // define the error handler before assigning it
    var eventSourceErrorFunction = function(event) {
        if (source.readyState === EventSource.CLOSED) {
            console.log("Event Source Closed");
        }
    };
    source = new EventSource("/events");
    source.addEventListener("message", messageReceived, false);
    source.onerror = eventSourceErrorFunction;
});
function messageReceived(event) {
    // the server sends a JSON object in the data field
    var msg = JSON.parse(event.data).message;
    console.log("message arrived: " + msg);
}

Send test messages from the JavaScript console

$.ajax({
        url: "/events/push",
        data: JSON.stringify({message: "Remote message"}),
        type: "post",
        success: function() {
            console.log("message sent!");
        }
    });

If you plan on placing Nginx in front of uWSGI, don’t forget to turn uwsgi_buffering off and set uwsgi_read_timeout to 300, otherwise the messages will get stuck.

 

Running Standalone SQLAlchemy Scripts in Pyramid

From time to time there comes the need to run automated scripts managed by either Python or Cron. In a recent project I had to run a standalone script that checks records on a database at certain hours. So here’s what I came up with. Most of the script is based on this post by Daniel Mayer.

from paste.deploy import appconfig
from sqlalchemy import engine_from_config
from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound
#here is where sqlalchemy objects are imported
from PRJ.models import DBSession as db,DBRecords
#import the session manager. This way commits will be handled automatically by the Zope Transaction Manager
import transaction
# Load Application Configuration and Return as Dictionary
conf = appconfig('config:' + 'development.ini',
        relative_to="/var/www/PRJ",
        name="main")
# Bind Engine Based on Config
engine = engine_from_config(conf, 'sqlalchemy.')
db.configure(bind=engine)
#Query the DB
data = db.query(DBRecords).all()
with transaction.manager:
    for record in data:
        #query or update DB
        pass
 

Facebook FQL Request Error

Today while attempting to query Events data through FQL, I received the following error:

Impersonated access tokens can only be used with the Graph API.

It was a strange error since I had already created my access token with the right scope. To query the data I’m using the pythonforfacebook SDK which uses https://api.facebook.com/method/fql.query as the url for FQL requests (REST API).
I then tried the graph method (https://graph.facebook.com/fql), which worked perfectly, but the response didn’t include event pictures and I really need pictures to be present in my event listings. So what worked for me was to make a GET request through the Graph API instead of a POST.
I use Requests for this so the code is very simple.

import requests
data = db.query(Model).first() #retrieve my access_token from DB
payload = {'q' : query, 'access_token': data.access_token,'format' : 'json'}
res = requests.get('https://graph.facebook.com/fql', params=payload)

With this change everything worked as expected.
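
Because the request is a GET, the payload ends up in the query string of the URL. Here is a small sketch of what that URL looks like, using only the Python 3 standard library. The FQL query text and the token value are placeholders for illustration, not values from my script:

```python
from urllib.parse import urlencode, urlsplit, parse_qs

# Placeholder values for illustration only.
query = "SELECT name, pic FROM event WHERE eid = 1"
payload = {"q": query, "access_token": "TOKEN", "format": "json"}

# Build the same kind of URL a GET request with these params would use.
url = "https://graph.facebook.com/fql?" + urlencode(payload)

# Round-trip the query string to confirm nothing was lost in encoding.
decoded = parse_qs(urlsplit(url).query)
```

Printing url is a quick way to check that the query and the token are encoded the way you expect before sending anything.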

 

Simpleform Localization in Pyramid

In a recent project I had to localize the errors thrown by the pyramid_simpleform package. Googling for information I couldn’t find how to do it, so here’s what worked for me in the end.

from pyramid.i18n import get_locale_name
from pyramid.view import view_config
from pyramid_simpleform import Form,State
from pyramid_simpleform.renderers import FormRenderer
from formencode import api as formencode_api
def includeme(config):
    config.scan(__name__)
    config.add_route('login', '/login')
@view_config(route_name='login',renderer='website/login.mak')
def login(request):
    """
    set the language in FormEncode according to the request url param _LOCALE_
    """
    formencode_api.set_stdtranslation(languages=[get_locale_name(request)])
    # MySchema is your own FormEncode schema, defined elsewhere
    form = Form(request,
          defaults=dict(request.params),
          schema=MySchema,
          state=State()
    )
    """
    set an empty gettext translation function,
    since FormEncode has one already
    configured in the set_stdtranslation function
    """
    form.state._ = ''
    return dict(renderer=FormRenderer(form))

And that’s it. To try it, visit for example http://mysite.com/login?_LOCALE_=fr. Make sure the action param in your form passes the _LOCALE_ value if the method is set to post.

 

Manage cron jobs with python-crontab

Cron is the main time-based scheduler on Linux systems and is available in almost every distro. In a recent project I had the task of managing cron jobs from Python. Searching for a good cron manager I came across python-crontab. It makes it really easy to manage jobs directly from cron. Here are some examples:
NOTE: These examples use version 0.9.6. A newer version, 1.2, is available on PyPI along with some examples; the main difference is that the API has changed so that slice calls are now properties instead of methods.

Installing python-crontab is easy as pie. First we set up our virtual environment:

cd /var/www
python virtualenv.py --no-site-packages prj-env
cd prj-env
source bin/activate

Then we proceed to install python-crontab

pip install python-crontab
or
easy_install python-crontab

Let’s schedule a job to be executed every day at 12pm

from crontab import CronTab
"""
Here the object can take two parameters: one for setting
the user whose cron jobs are managed, which defaults to the
current user executing the script if omitted. The fake_tab parameter
sets a testing variable, so you can print what would be
written to the file on screen instead of writing directly
into the crontab file.
"""
tab = CronTab(user='www',fake_tab='True')
cmd = '/var/www/pjr-env/bin/python /var/www/PRJ/job.py'
# You can even set a comment for this command
cron_job = tab.new(cmd, comment='This is the main command')
cron_job.minute().on(0)
cron_job.hour().on(12)
#writes content to crontab
tab.write()
print tab.render()

It will print out

0 12 * * * /var/www/pjr-env/bin/python /var/www/PRJ/job.py

If we want to schedule a job to be executed every five minutes we could do something like this

from crontab import CronTab
tab = CronTab(user='www',fake_tab='True')
cmd = '/var/www/pjr-env/bin/python /var/www/PRJ/job.py'
cron_job = tab.new(cmd)
cron_job.minute().every(5)
#writes content to crontab
tab.write()
print tab.render()

It will print out

*/5 * * * * /var/www/pjr-env/bin/python /var/www/PRJ/job.py

If we want to schedule a job for a specific range of hours for example only working hours, we could do the following

from crontab import CronTab
tab = CronTab(user='www',fake_tab='True')
cmd = '/var/www/pjr-env/bin/python /var/www/PRJ/job.py'
cron_job = tab.new(cmd)
cron_job.minute().on(0)
cron_job.hour().during(9,18)
#writes content to crontab
tab.write()
print tab.render()

It will print out

0 9-18 * * * /var/www/pjr-env/bin/python /var/www/PRJ/job.py

Now to schedule a job to run twice a day at 11 and 16 hrs, we could do the following

from crontab import CronTab
tab = CronTab(user='www',fake_tab='True')
cmd = '/var/www/pjr-env/bin/python /var/www/PRJ/job.py'
cron_job = tab.new(cmd)
cron_job.minute().on(0)
cron_job.hour().on('11,16')
#writes content to crontab
tab.write()
print tab.render()

It will print out

0 11,16 * * * /var/www/pjr-env/bin/python /var/www/PRJ/job.py
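
The minute and hour fields printed in these examples use three cron notations: a step such as */5, a range such as 9-18 and a list such as 11,16. As a rough illustration of what each one matches, here is a small sketch that expands a single field into the values it selects. expand_field is a hypothetical helper written for this post, not part of python-crontab, it only handles these simple forms, and it is written for Python 3:

```python
def expand_field(field, low, high):
    """Expand one cron field into the sorted list of values it matches.

    Hypothetical helper: supports "*", "*/n", "a-b", "a-b/n", single
    numbers and comma-separated lists of those. low and high are the
    bounds of the field (0-59 for minutes, 0-23 for hours).
    """
    values = set()
    for part in field.split(","):
        step = 1
        if "/" in part:
            part, step_text = part.split("/")
            step = int(step_text)
        if part == "*":
            start, end = low, high
        elif "-" in part:
            start_text, end_text = part.split("-")
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(part)
        values.update(range(start, end + 1, step))
    return sorted(values)


every_five = expand_field("*/5", 0, 59)      # minutes
working_hours = expand_field("9-18", 0, 23)  # hours
twice_a_day = expand_field("11,16", 0, 23)   # hours
```

Note that the range is inclusive at both ends, so a job scheduled with minute 0 and hours 9-18 also runs at 18:00.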

Let’s delete the previous command

from crontab import CronTab
cmd = '/var/www/pjr-env/bin/python /var/www/PRJ/job.py'
tab = CronTab(user='www',fake_tab='True')
cron_job = tab.find_command(cmd)
if len(cron_job) > 0:
    tab.remove_all(cmd)
#writes content to crontab
tab.write()

So there you have it: examples that make python-crontab a great Python manager for cron jobs. Now I know there are pure Python implementations like this, an event scheduler named sched and libraries like Kronos. But I decided to keep things simple.

 

Hey Facebook, your help support center is crap.

It’s been almost a month since a friend of mine had her account “Temporarily deactivated”, now here the word temporarily is misleading because she hasn’t recovered her account yet. The worst part is that Facebook doesn’t tell her why the account was blocked. After many hours we came to the conclusion that maybe a jealous friend might have reported her account.
Now can you imagine if she also had a paying subscription to one of those online services that have Facebook based auth logins only?, now she could also be losing money because of this.
So she tried to have Facebook send a code to her cellphone but their system didn’t send shit. We live in México so maybe our cellphone carrier is not supported, but does Facebook care to let you know that? Of course not. So after a few tries over a couple of days Facebook now just shows a message which says “The system is overloaded”. Overloaded with what!? Fuck, this is just so frustrating because that message is the only thing that appears from now on.
After a long period of time she received an email from Facebook but it didn’t help at all, they didn’t give any info on why her account was blocked or anything they just sent her a link to the same page where you tell Facebook to send you a code.
The worst thing about all this is that Facebook has top-notch engineers working for them, so why not try to improve the help support center? Oh no, they can’t, they are too busy developing all those innovative features, right?
The good thing is that now I know how to deactivate my account in just one simple step. Just tell a friend to report your profile and Facebook will not give a fuck and deactivate your account for you without a notice. This way you won’t have to receive those fucking emails from them trying to convince you to come back and activate your account.
Facebook, please get your shit together. This mechanism for recovering your account is unfriendly and inhuman; it frustrates users and leads to posts like this one. My friend has been feeling sad, desperate and depressed since all this started, and it saddens me to see her like this.

 

Setup The Newrelic Monitoring Agent On A Pylons App

Today I decided to sign up for a free trial of the newrelic monitoring agent and I wanted to write about how to set up the agent on a Pylons app.
EDIT: Commenter Graham Dumpleton has advised against this setup in the comments. There’s the potential risk of the agent initializing twice and additional modules not working correctly. So please don’t use this setup on a production app.
Open the middleware.py file located in PRJNAME/config and add the following. Make sure to set the correct path to where your newrelic.ini file is located.

#import the agent
import newrelic.agent
#create a middleware class to initialize the agent
class NewRelicAgent(object):
    def __init__(self, app):
        self.app = app
        newrelic.agent.initialize('/path/to/the/file/newrelic.ini')
    @newrelic.agent.wsgi_application()
    def __call__(self, environ, start_response):
        return self.app(environ, start_response)

Now it’s time to add our custom middleware inside the make_app def

# CUSTOM MIDDLEWARE HERE (filtered by error handling middlewares)
app = NewRelicAgent(app)
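
To make the wrapping step concrete, here is a minimal sketch of the same middleware pattern in plain WSGI, without New Relic. The initialize_once function and the InitOnceMiddleware class are hypothetical stand-ins, and the module-level guard is just one possible way to keep an initializer from running twice; it is an assumption for illustration, not something taken from the New Relic agent (Python 3):

```python
_initialized = False
init_calls = []


def initialize_once(config_path):
    """Stand-in for an agent initializer; runs at most once per process."""
    global _initialized
    if not _initialized:
        init_calls.append(config_path)
        _initialized = True


class InitOnceMiddleware(object):
    """Wrap a WSGI app and pass every request through unchanged."""

    def __init__(self, app, config_path):
        self.app = app
        initialize_once(config_path)

    def __call__(self, environ, start_response):
        return self.app(environ, start_response)


def hello_app(environ, start_response):
    """Tiny WSGI app used only to exercise the middleware."""
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"hello"]


# Wrapping twice still initializes only once.
app = InitOnceMiddleware(InitOnceMiddleware(hello_app, "a.ini"), "a.ini")
body = app({}, lambda status, headers: None)
```

The middleware itself does nothing to the request; all it adds is a place to run setup code when the application object is built.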

And that’s it. Now it’s time to collect some stats :).