Running Standalone SQLAlchemy Scripts in Pyramid

From time to time there comes the need to run automated scripts managed by either Python or Cron. In a recent project I had to run a standalone script that checks records on a database at certain hours. So here’s what I came up with. Most of the script is based on this post by Daniel Mayer.

from paste.deploy import appconfig
 
from sqlalchemy import engine_from_config
 
from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound
 
#here is where sqlalchemy objects are imported
from PRJ.models import DBSession as db,DBRecords
 
#import the session manager. This way commits will be handled automatically by the Zope Transaction Manager
import transaction
 
# Load Application Configuration and Return as Dictionary
conf = appconfig('config:' + 'development.ini', 
        relative_to="/var/www/PRJ",
        name="main")
 
# Bind Engine Based on Config
engine = engine_from_config(conf, 'sqlalchemy.')
db.configure(bind=engine)
 
#Query the DB
data = db.query(DBRecords).one()
 
with transaction.manager:
  for record in data:
    #query or update DB
 

Simpleform Localization in Pyramid

In a recent project I had to localize the errors thrown by the pyramid_simpleform package. Googling for information I couldn’t find how to do it, so here’s what worked for me at the end.

  from pyramid.i18n import get_locale_name
  from pyramid_simpleform import Form,State
 
  from formencode import api as formencode_api
 
  def includeme(config):
    config.scan(__name__)
    config.add_route('login', '/login')
 
  @view_config(route_name='login',renderer='website/login.mak')
  def login(request):
     """
     set the language in FormEncode according to the request url param _LOCALE_
     """
     formencode_api.set_stdtranslation(languages=[get_locale_name(request)])
     form = Form(request,
           defaults=dict(request.params),
           schema=MySchema,
           state=State()
     )
     """
     set an empty gettext translation function,
     since FormEncode has one already
     configured in the set_stdtranslation function
     """
     form.state._ = ''
     return dict(renderer=FormRenderer(form))

And that’s it, try it for example http://mysite.com/login?_LOCALE_=fr. Make sure the action param in your form passes the _LOCALE_ value if the method is set to post.

 

Manage cron jobs with python-crontab

Cron is the main time based scheduler for any linux based system and is available in almost every distro. And in a recent project I had the task to manage jobs in cron from python. Searching for a good cron manager I came across python-crontab. It makes it really easy to manage jobs directly from cron, here are some examples:

NOTE: This examples used version 0.9.6, there’s a new version available 1.2 on pypi along with some examples, the main difference is that the API has been changed from slice calls to be properties instead of methods.

Installing python-crontab is easy as pie. First we install our virtual enviroment:

cd /var/www
python virtualenv.py --no-site-packages prj-env
cd prj-env
bin/activate

Then we proceed to install python-crontab

pip install python-crontab 
or
easy_install python-crontab

Let’s schedule a job to be executed everyday at 12pm

from crontab import CronTab
 
"""
Here the object can take two parameters one for setting 
the user cron jobs, it defaults to the current user 
executing the script if ommited. The fake_tab parameter 
sets a testing variable. So you can print what could be 
written to the file onscreen instead or writting directly
into the crontab file. 
"""
tab = CronTab(user='www',fake_tab='True')
cmd = '/var/www/pjr-env/bin/python /var/www/PRJ/job.py'
# You can even set a comment for this command
cron_job = tab.new(cmd, comment='This is the main command')
cron_job.minute().on(0)
cron_job.hour().on(12)
#writes content to crontab
tab.write()
print tab.render()

It will print out

0 12 * * * /var/www/pjr-env/bin/python /var/www/PRJ/job.py

If we want to schedule a job to be executed every five minutes we could do something like this

 
from crontab import CronTab
 
tab = CronTab(user='www',fake_tab='True')
cmd = '/var/www/pjr-env/bin/python /var/www/PRJ/job.py'
cron_job = tab.new(cmd)
cron_job.minute().every(5)
#writes content to crontab
tab.write()
print tab.render()

It will print out

*/5 * * * * /var/www/pjr-env/bin/python /var/www/PRJ/job.py

If we want to schedule a job for a specific range of hours for example only working hours, we could do the following

from crontab import CronTab
 
tab = CronTab(user='www',fake_tab='True')
cmd = '/var/www/pjr-env/bin/python /var/www/PRJ/job.py'
cron_job = tab.new(cmd)
cron_job.minute().on(0)
cron_job.hour().during(09,18)
#writes content to crontab
tab.write()
print tab.render()

It will print out

0 09-18 * * * /var/www/pjr-env/bin/python /var/www/PRJ/job.py

Now to schedule a job to run twice a day at 11 and 16 hrs, we could do the following

from crontab import CronTab
 
tab = CronTab(user='www',fake_tab='True')
cmd = '/var/www/pjr-env/bin/python /var/www/PRJ/job.py'
cron_job = tab.new(cmd)
cron_job.minute().on(0)
cron_job.hour().on('11,16')
 
#writes content to crontab
tab.write()
print tab.render()

it will print out

0 11,16 * * * /var/www/pjr-env/bin/python /var/www/PRJ/job.py

Let’s delete the previous command

from crontab import CronTab
cmd = '/var/www/pjr-env/bin/python /var/www/PRJ/job.py'
tab = CronTab(user='www',fake_tab='True')
 
cron_job = tab.find_command(cmd)
if len(cron_job) > 0:
    tab.remove_all(cmd) 
#writes content to crontab
tab.write()

So there you have it, examples that make python-crontab a great python manager for cron jobs. Now I know there are pure Python implementations like this, an event scheduler named sched and libraries like Kronos. But I decided to keep things simple.

 

Setup The Newrelic Monitoring Agent On A Pylons App

Today I decide to signup for a free trial of the newrelic monitoring agent and I wanted to write on how to setup the agent on a Pylons app.

EDIT: Commenter Graham Dumpleton has advised against this setup in the comments. There’s the potential risk of the agent initializing twice and additional modules not working correctly. So please don’t use this setup on a production app.

Open the middleware.py file located on PRJNAME/config and add the following. Make sure to set the correct path where your newrelic.ini file is located

#import the agent
import newrelic.agent
 
#create a middleware class to initialize the agent
class NewRelicAgent(object):
    def __init__(self, app):
        self.app = app
        newrelic.agent.initialize('/path/to/the/file/newrelic.ini')
 
    @newrelic.agent.wsgi_application()
    def __call__(self, environ, start_response):
        return self.app(environ, start_response)

Now it’s time to add our custom middleware inside the make_app def

# CUSTOM MIDDLEWARE HERE (filtered by error handling middlewares)
app = NewRelicAgent(app)

And that’s it. Now it’s time to collect some stats :).

 

UUID Objects in Pylons with SQLAlchemy

Recently I had to generate and store uuid objects into a Postgres database, but to my surprise SQLAlchemy kept showing the following error:

sqlalchemy.exc.ProgrammingError: (ProgrammingError) can't adapt type 'UUID'

So the solution was that I had to change my field type from varchar to uuid on my Postgres database, import the psycopg2 extras functions and register the uuid type:

import psycopg2.extras
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy import MetaData,Column, Table, ForeignKey
 
engine = create_engine('postgresql://user:pass@localhost/db',echo=True)
metadata = MetaData(bind=engine)
hash_key = uuid.uuid4()
psycopg2.extras.register_uuid()
conn = engine.connect()
query = conn.execute("insert into tbl (uuid_col) values(%(uuid)s",uuid=hash_key)

And voila, values were inserted correctly on my database.

 

Implementing a text based captcha in Pylons

One of the must have elements on any html form as an anti-spam measure is a captcha, and the captcha technique that has worked pretty well for me is text based. So here’s how to implement one.

Create a controller and name it captcha

 ~$ paster controller captcha

Add this function to your helpers.py file

def makeCaptcha(lang):
      label = {
          1 : 'one',
          2 : 'two',
          3 : 'three',
          4 : 'four',
          5 : 'five',
          6 : 'six',
          7 : 'seven',
          8 : 'eight',
          9 : 'nine',
          10 : 'ten'
      }
      num1 = random.randint(1,10)
      num2 = random.randint(1,10)
      res = num1 + num2
      session['captcha'] = res
      session.save()
      return [label[num1],label[num2]]

On the templates folder create an html file with an example form

<form method="post" action="${url(controller='captcha',action='contact')}" id="contact-frm">
  <fieldset>
    <%
      capt = h.makeCaptcha()
      c.captcha =  u"What is the result of the sum %s and %s?." % (capt[0],capt[1])
    %>
    <span>${u"As an  anti spam measure, please answer the following math question."}</span><br />
    <label for="captcha">${c.captcha}</label>
    <!-- we set a maxlength of 2 characters on this input field because the answer should be numeric -->
    ${h.text('captcha',maxlength=2)}
 </fieldset>
</form>

Save it and call it captcha.html

Add the following to captcha controller

I decided to create a validation schema using formencode:

import formencode
 
from formencode import variabledecode
 
from formencode import validators
 
from formencode.validators import Invalid, FancyValidator
 
#this class will validate the captcha value entered by the user
class CaptchaValidator(formencode.FancyValidator):
    def _to_python(self,values,state):
        if session.get('captcha') != int(values.get('captcha')):
          raise formencode.Invalid(u"The math answer is incorrect",values, state)
          return values
 
class NewInquiry(formencode.Schema):
  allow_extra_fields = True
  filter_extra_field = True
  captcha = formencode.validators.String(
    not_empty = True,
    messages = {
      'empty' : 'You need to answer the math question.'
  })
  # chain the captcha validator
  chained_validators = [CaptchaValidator()]
 
# our contact view
def contact(self):
      if request.method == 'POST':
        try:
          values = dict(request.params)
          schema = NewInquiry()
          results = schema.to_python(values)
        except Invalid, e:
          #raise error if something went wrong
        else:
          #send to contacts
      return render('captcha.html')

And that’s pretty much it. The only drawback I can find is that isn’t very intuitive at times since users my try to answer with words instead of numeric values. That’s why I added a maxlength of 2 characters to the input field.

But you could easily implement a Javascript validation function to notify the user to type numeric values instead of characters before she submits the form.

 

Pylons and Twitter Based Authorization

Implementing Twitter authorization in Pylons is easy as butter, I will show you how to do it in the next steps.

Download python-twitter and install it

https://code.google.com/p/python-twitter

  python setup.py build
  python setup.py install

Download and install dependencies

https://github.com/simplegeo/python-oauth2
http://code.google.com/p/httplib2/
http://cheeseshop.python.org/pypi/simplejson

Now you’re almost there. There’s a file called get_access_token.py that you need to call in order to obtain the Twitter session secret_token, but I decided to create a class based from that file instead. Save this class on your project lib folder as twittertoken.py.

import os
import sys
 
# parse_qsl moved to urlparse module in v2.6
try:
  from urlparse import parse_qsl
except:
  from cgi import parse_qsl
 
import oauth2 as oauth
 
REQUEST_TOKEN_URL = 'https://api.twitter.com/oauth/request_token'
ACCESS_TOKEN_URL  = 'https://api.twitter.com/oauth/access_token'
AUTHORIZATION_URL = 'https://api.twitter.com/oauth/authorize'
SIGNIN_URL        = 'https://api.twitter.com/oauth/authenticate'
 
class GenerateToken(object):
 
  def __init__(self,consumer_key=None,consumer_secret=None):
    if consumer_key is None or consumer_secret is None:
      raise TokenError('please add a consumer key and secret')
    else:
      signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
      self.oauth_consumer             = oauth.Consumer(key=consumer_key, secret=consumer_secret)
      self.oauth_client               = oauth.Client(self.oauth_consumer)
 
  def getrequestTokenURL(self):
    resp, content = self.oauth_client.request(REQUEST_TOKEN_URL, 'GET')
    if resp['status'] != '200':
      raise TokenError('Invalid response from Twitter')
    else:
      request_token = dict(parse_qsl(content))
      pieces = {
        'url' : "%s?oauth_token=%s" % (AUTHORIZATION_URL, request_token['oauth_token']),
        'token_secret': request_token['oauth_token_secret']
      }
      return pieces
 
  def authRequest(self,oauth_token=None,oauth_token_secret=None,oauth_verifier=None):
      token = oauth.Token(oauth_token,oauth_token_secret)
      token.set_verifier(oauth_verifier)
      oauth_client  = oauth.Client(self.oauth_consumer, token)
      resp, content = oauth_client.request(ACCESS_TOKEN_URL, method='POST', body='oauth_verifier=%s' % oauth_verifier)
      access_token  = dict(parse_qsl(content))
      if resp['status'] != '200':
        raise TokenError('The request for a Token %s did not succeed: %s' % (access_token,resp['status']) )
      else:
        auth = {
          'access_token' : access_token['oauth_token'],
          'access_token_secret' : access_token['oauth_token_secret'],
        }
        return auth
 
class TokenError(Exception):
  '''Base class for Token errors'''
 
  @property
  def message(self):
    '''Returns the first argument used to construct this error.'''
    return self.args[0]

Usage

create a controller in this case I’ll create an account controller

 paster controller account
import twitter
import PRJNAME.lib.twittertoken as twittertoken
def twitter_auth(self):
    token = twittertoken.GenerateToken(consumer_key=config['twitter.key'], consumer_secret=config['twitter.secret'])
    request_token = token.getrequestTokenURL()
    #save the token secret it will be used to generate the user's access_token and and access_secret
    session['token_secret'] = request_token['token_secret']
    session.save()
    # redirect to twitter screen
    return redirect(url(request_token['url']))
 
def twitter_preferences(self):
   params = request.params
   twittertoken.GenerateToken(consumer_key=config['twitter.key'], consumer_secret=config['twitter.secret'])
  auth = twittertoken.authRequest(oauth_token=params.get('oauth_token'),oauth_token_secret=session.get('token_secret'),oauth_verifier=params.get('oauth_verifier'))
  if auth['access_token'] and auth['access_token_secret']:
  #save to db or get user friend list
  for u in api.GetFriends():
    log.debug(u.name)

I hope it helps to someone looking to implement Twitter on their Pylons projects.

 

repoze.what with reflected tables using SQLAlchemy in Pylons

One of the many things I like about SQLAlchemy is the feature of reflected tables, this means I can develop dashboards for existing applications with my current favorite framework(Pylons). So I had to create an authorization mechanism for my dashboard and this recipe from the Pylons cookbook was just what I needed.

The only thing I had to change was the way my tables were being mapped since the users table schema was already defined. This is what I did:

user_table = schema.Table('users', Base.metadata, autoload=True, autoload_with=engine)
 
user_group_table = schema.Table('user_groups',Base.metadata,autoload=True,autoload_with=engine)
 
group_permission_table = schema.Table('group_permission',Base.metadata,autoload=True,autoload_with=engine)
 
permission_table = schema.Table('permissions', Base.metadata,autoload=True,autoload_with=engine)
 
group_table = schema.Table('groups', Base.metadata,autoload=True,autoload_with=engine)
 
orm.mapper(User,user_table, properties={
      'groups':orm.relation(Group, secondary=user_group_table),
    })
 
orm.mapper(Permission,permission_table,properties={
      'groups':orm.relation(Group, secondary=group_permission_table),
    })
 
orm.mapper(Group,group_table,properties={
      'permissions':orm.relation(Permission, secondary=group_permission_table),
      'users':orm.relation(User, secondary=user_group_table),
    })
 
orm.mapper(UGT,user_group_table)
 
orm.mapper(GPT,group_permission_table)

And here’s the schema I had to create, except for the users table:

CREATE TABLE user_groups( 
id serial NOT NULL,  
user_id INTEGER,  
group_id INTEGER,  
CONSTRAINT user_groups_pkey PRIMARY KEY (id), 
CONSTRAINT gid_fkey FOREIGN KEY (group_id) REFERENCES groups (id)
CONSTRAINT uid_fkey FOREIGN KEY (user_id)   REFERENCES users (id) 
)
CREATE TABLE group_permission(
id serial NOT NULL,
gid INTEGER,
permid INTEGER,
CONSTRAINT group_permission_pkey PRIMARY KEY (id),
CONSTRAINT groupid_fkey FOREIGN KEY (gid) REFERENCES groups (id),
CONSTRAINT permid_fkey FOREIGN KEY (permid) REFERENCES permissions (id) 
)
CREATE TABLE permissions(
  id serial NOT NULL,
  pname CHARACTER VARYING(50),
  CONSTRAINT permissions_pkey PRIMARY KEY (id)
)
CREATE TABLE groups(
  id serial NOT NULL,
  gname CHARACTER VARYING(50),
  CONSTRAINT groups_pkey PRIMARY KEY (id)
)
CREATE TABLE users(
id serial NOT NULL,
username CHARACTER VARYING(50),
email CHARACTER VARYING(50),
upassword CHARACTER VARYING(150),
CONSTRAINT users_pkey PRIMARY KEY (id)
)

And that’s all there’s to it. I was very surprise on how flexible SQLAlchemy is.

 

Nginx, uWSGI, Pylons and the double slash problem

So there I am, deploying my first Pylons based app and thinking on which WSGI interface to go with. At the end I decided to go with uWSGI it has been proven to be a beast at handling requests even on high loads.

I setup my Nginx conf file, fired up uWSGI and found out that all of my links had a double slash i.e //home/new. I couldn’t belive it all my hard work for nothing. After a couple of searches I found out a reply on the Nginx mailing list from Igor Sysoev saying the solution is to add the qualified param to all the url() calls.

url(controller='account', action='new', qualified=True)

Indeed that solved the problem, for a second I thought my chance for testing the performance of uWSGI was gone.

 

 

Posting to a page wall with the Facebook SDK in Python

I’ve been developing in Python for quite a few months now and the need to develop a Facebook app has arrived.

So in my app I had the need to post content to a Fan Page wall and after a couple of tests I noticed the content being posted on the Fan Page on behalf of the Page owner and not the page itself.

To fix that I had to make the following changes inside the if condition where it checks for the arguments being posted:

if post_args is not None:
  if post_args['page_token']:
    post_args["access_token"] = post_args['page_token']#self.access_token
  else:
    post_args["access_token"] = self.access_token
else:
  args["access_token"] = self.access_token

So now I’m checking for an extra argument in this case page_token, if it’s set we use that token instead of the token that was used to initialize the Facebook Graph object.

With this changes in place we add the extra parameter page_token to our attachment and it shoud work as expected, with the content being posted on behalf of the page.

graph = facebook.GraphAPI(str(access_token))
attach = {
  "name": 'Hello world',
  "link": 'http://www.example.com',
  "caption": 'test post',
  "description": 'some test',
  "picture" : 'http://www.example.com/picture.jpg'
  "page_token" : str(page_token)
}
msg = 'New content posted'
post = graph.put_wall_post(message=msg, attachment=attach,profile_id=str(page_id))
  if post:
    return 'posted'