Falcon API Framework on Docker – 2

In this article, we will build an API with the Falcon API Framework in Python. We’re going to cover some hints and the methods that handle requests. This article is part of a series:

1) Bootstrapping Falcon API Framework on Docker

2) Creating an API by using individual Falcon methods. (You’re here.)

You should read the previous article first; otherwise, parts of this one may be confusing. Right, let’s start.

First of all, I’ve created the requirements.txt file in my root directory.

attrs==19.1.0
falcon==2.0.0
falcon-autocrud==1.0.36
gunicorn==19.9.0
jsonschema==3.0.1
marshmallow==2.19.5
psycopg2==2.8.3
pyrsistent==0.15.4
python-dateutil==2.8.0
six==1.12.0
SQLAlchemy==1.3.6
webargs==5.4.0

Now the libraries can be installed with this command.

pip install -r requirements.txt

After installing the libraries, we can create the Python files. My main file is app.py. You can choose another name if you like.

import falcon
from resources import athlete
from resources import plan
from resources import exercise
from services import database_service

from middlewares import (
    ContentEncodingMiddleware,
)

conn = database_service.connect()

api = falcon.API(middleware=[
    ContentEncodingMiddleware(),
])

# api = falcon.API()
athlete = athlete.Athlete(conn, database_service)
plan = plan.Plan(conn, database_service)
exercise = exercise.Exercise(conn, database_service)

api.add_route('/athletes/{id}', athlete)
api.add_route('/athletes', athlete, suffix='collection')
api.add_route('/plans/{id}', plan)
api.add_route('/plans', plan, suffix='collection')
api.add_route('/exercises/{id}', exercise)
api.add_route('/exercises', exercise, suffix='collection')

This file bootstraps the Falcon app. I added my routes and the related resources to it. As you can see, there’s a resources directory. It holds my objects. I have 3 objects that serve as the resources. The first one is plan.py.


import falcon
from webargs import fields
from webargs.falconparser import use_args


class Plan(object):
    post_request_args = {"name": fields.Str(required=True), "description": fields.Str(required=True),
                         "difficulty": fields.Int(required=True), "athlete_id": fields.Int(required=True)}

    def __init__(self, conn, database_service):
        self.conn, self.database_service, self.resource_name = conn, database_service, self.__class__.__name__

    def on_delete(self, req, resp, id):

        try:
            q = " ".join(
                ["DELETE", "FROM", self.resource_name.lower(), "WHERE", self.resource_name.lower() + "_id = %s"])
            q_resp = self.database_service.run_delete_query(self.conn, q, [id])
            if not q_resp['status']:
                output = {"status": False, "message": q_resp['message'],
                          "data": None}
            else:
                output = {"status": True, "message": self.resource_name + " was deleted successfully!", "data": None}

            resp.status = falcon.HTTP_200
            resp.body = output
        except Exception as error:
            output = {"status": False, "message": str(error), "data": None}
            resp.status = falcon.HTTP_500
            resp.body = output

    def on_get(self, req, resp, id):
        try:
            cur = self.conn.cursor()

            q = " ".join(
                ["SELECT", "*", "FROM", self.resource_name.lower(), "WHERE", self.resource_name.lower() + "_id = %s"])
            q_resp = self.database_service.run_get_query(cur, q, [id])
            if not q_resp['status']:
                output = {"status": False, "message": q_resp['message'],
                          "data": None}
            else:
                output = {"status": True, "message": None,
                          'data': self.database_service.set_columns(q_resp['data'], cur)}

            resp.status = falcon.HTTP_200
            resp.body = output
        except Exception as error:
            output = {"status": False, "message": str(error), "data": None}
            resp.status = falcon.HTTP_500
            resp.body = output

    def on_get_collection(self, req, resp):
        try:
            cur = self.conn.cursor()
            q = " ".join(
                ["SELECT * FROM", self.resource_name.lower()])
            q_resp = self.database_service.run_get_query(cur, q, [])
            if not q_resp['status']:
                output = {"status": False, "message": q_resp['message'],
                          "data": None}
            else:
                output = {"status": True, "message": None,
                          'data': self.database_service.set_columns(q_resp['data'], cur)}

            resp.status = falcon.HTTP_200
            resp.body = output
        except Exception as error:
            output = {"status": False, "message": str(error), "data": None}
            resp.status = falcon.HTTP_500
            resp.body = output

    def on_put(self, req, resp, id):
        try:

            cur = self.conn.cursor()
            # q = "SELECT name, description, difficulty FROM " + self.resource_name.lower() + " WHERE " + self.resource_name.lower() + "_id = %s;"
            get_q = " ".join(
                ["SELECT name,description,difficulty FROM", self.resource_name.lower(), "WHERE",
                 self.resource_name.lower() + "_id = %s"])
            get_resp = self.database_service.run_get_query(cur, get_q, [id])

            record = list(self.database_service.set_columns(get_resp['data'], cur))[0]

            request = req.media
            for index in record.keys():
                if index in request.keys():
                    record[index] = request[index]
            record['id'] = id

            update_q = " ".join(
                ["UPDATE", self.resource_name.lower(), "SET name=%s, description=%s, difficulty=%s WHERE",
                 self.resource_name.lower() + "_id=%s RETURNING ", self.resource_name.lower() + "_id;"])

            update_resp = self.database_service.run_upsert_query(self.conn, update_q, record.values())
            if not update_resp['status']:
                output = {"status": False, "message": update_resp['message'],
                          "data": None}
            else:
                response_data = {
                    "id": update_resp['data'],
                    "name": record['name'],
                    "description": record['description'],
                    "difficulty": record['difficulty']
                }

                output = {"status": True, "message": self.resource_name + " is updated successfully!",
                          "data": response_data}

            resp.status = falcon.HTTP_201
            resp.body = output

        except Exception as error:
            output = {"status": False, "message": str(error), "data": None}
            resp.status = falcon.HTTP_500
            resp.body = output

    @use_args(post_request_args)
    def on_post_collection(self, req, resp, args):
        try:
            # q = "INSERT INTO " + self.resource_name.lower() + " (name, description, difficulty, athlete_id) VALUES (%s,%s,%s,%s) RETURNING " + self.resource_name.lower() + "_id;"
            q = " ".join(
                ["INSERT INTO", self.resource_name.lower(),
                 "(name, description, difficulty, athlete_id) VALUES (%s,%s,%s,%s) RETURNING",
                 self.resource_name.lower() + "_id;"])

            params = {'name': args['name'], 'description': args['description'], 'difficulty': args['difficulty'],
                      'athlete_id': args['athlete_id']}

            q_resp = self.database_service.run_upsert_query(self.conn, q, params.values())

            if not q_resp['status']:
                output = {"status": False, "message": q_resp['message'],
                          "data": None}
            else:
                response_data = {
                    "id": q_resp['data'],
                    "name": args['name'],
                    "description": args['description'],
                    "difficulty": args['difficulty'],
                    "athlete_id": args['athlete_id']
                }

                output = {"status": True, "message": self.resource_name + " is added successfully!",
                          "data": response_data}

            resp.status = falcon.HTTP_201
            resp.body = output
        except Exception as error:
            output = {"status": False, "message": str(error), "data": None}
            resp.status = falcon.HTTP_500
            resp.body = output

I’m going to share only this file as a resource, because the other files have exactly the same code structure.
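
To make the query-building pattern in this resource concrete, here is a minimal sketch of how the " ".join(...) calls turn the class name into SQL. The helper name build_delete_query is hypothetical and only mirrors what on_delete does inline; the Plan class here is an empty stand-in.

```python
class Plan(object):
    """Empty stand-in for the resource class; only its name matters here."""


def build_delete_query(resource):
    # Same pattern as on_delete: the lowercased class name is used both
    # as the table name and as the prefix of the primary-key column.
    table = resource.__class__.__name__.lower()
    return " ".join(["DELETE", "FROM", table, "WHERE", table + "_id = %s"])


query = build_delete_query(Plan())
print(query)  # DELETE FROM plan WHERE plan_id = %s
```

The id itself is never concatenated into the string. It is passed separately as a parameter for the %s placeholder, as run_delete_query does.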

You’ll see that each method name starts with the on_ prefix, followed by an HTTP verb. In other words, each method handles one HTTP verb.

According to this, we’ll have these endpoints for plans:

1-) http://localhost:8000/plans (GET, POST)

2-) http://localhost:8000/plans/{id} (GET, PUT, DELETE)
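
The naming rule behind these endpoints can be sketched in a few lines. This is a simplified model of the convention, not Falcon’s own routing code: the functions responder_name and supported_verbs are hypothetical, and the Plan class is a stub that only carries the same method names as the resource above.

```python
VERBS = ("GET", "POST", "PUT", "DELETE")


def responder_name(http_method, suffix=None):
    # "GET" -> "on_get"; with suffix="collection" -> "on_get_collection"
    name = "on_" + http_method.lower()
    if suffix:
        name += "_" + suffix
    return name


class Plan(object):
    # Stub with the same responder names as the real resource.
    def on_get(self, req, resp, id): pass
    def on_put(self, req, resp, id): pass
    def on_delete(self, req, resp, id): pass
    def on_get_collection(self, req, resp): pass
    def on_post_collection(self, req, resp, args): pass


def supported_verbs(resource_cls, suffix=None):
    # A verb is available on a route only if the matching method exists.
    return [v for v in VERBS if hasattr(resource_cls, responder_name(v, suffix))]


print(supported_verbs(Plan))                # route '/plans/{id}'
print(supported_verbs(Plan, "collection"))  # route '/plans' with suffix='collection'
```

With the methods defined in plan.py, the route with {id} answers GET, PUT and DELETE, while the collection route answers GET and POST. For a verb without a matching method, Falcon responds with 405 Method Not Allowed.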

Perfect! It’s straightforward to bootstrap. Now we can move on to the configuration step. For this, I’ve created a conf directory. It holds two files: config.ini and gunicorn_conf.py. The ini file looks like this.

[postgresqlDB]
host = postgresql
db = test_dev
user = mertingen
pass = mertingen

This is the configuration file for the web server, gunicorn_conf.py.

import multiprocessing

bind = '0.0.0.0:8000'
workers = multiprocessing.cpu_count() * 2 + 1
timeout = 30
worker_connections = 1000
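
The workers line is the only computed value in this file. As a small worked example, the sketch below applies the same formula to a few core counts; the function name worker_count is hypothetical.

```python
import multiprocessing


def worker_count(cpu_cores):
    # Same formula as in gunicorn_conf.py: two workers per core, plus one.
    return cpu_cores * 2 + 1


for cores in (1, 2, 4):
    print(cores, "core(s) ->", worker_count(cores), "workers")

# The value Gunicorn would use on the current machine (or container).
print(worker_count(multiprocessing.cpu_count()))
```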

I also have a services directory to handle some useful processes. This is create_schema_service.py.

import psycopg2
import database_service

conn = database_service.connect()


def create_tables():
    """ create tables in the PostgreSQL database"""
    commands = (
        """
        CREATE TABLE IF NOT EXISTS athlete (
            athlete_id SERIAL PRIMARY KEY,
            name VARCHAR(255) NOT NULL,
            email VARCHAR(255) NOT NULL,
            phone VARCHAR(255) NOT NULL,
            gender VARCHAR(255) NOT NULL,
            birthday DATE NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS plan (
            plan_id SERIAL PRIMARY KEY,
            athlete_id INTEGER NOT NULL,
            name VARCHAR(255) NOT NULL,
            description TEXT NOT NULL,
            difficulty VARCHAR(255) NOT NULL,
            FOREIGN KEY (athlete_id)
                REFERENCES athlete (athlete_id)
                ON UPDATE CASCADE ON DELETE CASCADE
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS exercise (
            exercise_id SERIAL PRIMARY KEY,
            name VARCHAR(255) NOT NULL,
            description TEXT NOT NULL
        )
        """
    )

    try:
        cur = conn.cursor()
        for c in commands:
            cur.execute(c)
            print("Table was created successfully!")
        cur.close()
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()


if __name__ == '__main__':
    create_tables()

And this is database_service.py.

import psycopg2 as pg
import configparser as cp
import os

dir_path = os.path.dirname(os.path.realpath(__file__))
c = cp.ConfigParser()

c.read(dir_path + '/../conf/config.ini')


def connect():
    try:
        connection = pg.connect(user=c['postgresqlDB']['user'],
                                password=c['postgresqlDB']['pass'],
                                host=c['postgresqlDB']['host'],
                                port="5432",
                                database=c['postgresqlDB']['db'])
        print("You are connected!")
        return connection
    except (Exception, pg.Error) as error:
        print("Error while connecting to PostgreSQL", error)

    # finally:
    #     if connection:
    #         connection.close()
    #         print("PostgreSQL connection is closed")


def set_columns(data, cur):
    items = []
    if data:
        for x in data:
            item = {}
            c = 0
            for col in cur.description:
                item.update({col[0]: x[c]})
                c = c + 1
            items.append(item)
        return items
    else:
        return []


def run_get_query(cur, query, params):
    try:
        if params:
            cur.execute(query, tuple(params))
        else:
            cur.execute(query)
        records = cur.fetchall()
        return {"status": True, "message": "", "data": records}
    except pg.InternalError as e:
        return {"status": False, "message": str(e), "data": None}


def run_upsert_query(conn, q, params):
    try:
        cur = conn.cursor()
        cur.execute(q, tuple(params))
        conn.commit()

        id = cur.fetchone()[0]
        return {"status": True, "message": "", "data": id}
    except pg.InternalError as e:
        conn.rollback()
        return {"status": False, "message": str(e), "data": None}


def run_delete_query(conn, q, params):
    try:
        cur = conn.cursor()
        cur.execute(q, tuple(params))
        conn.commit()
        return {"status": True, "message": "", "data": None}
    except pg.InternalError as e:
        conn.rollback()
        return {"status": False, "message": str(e), "data": None}
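
To see what set_columns produces, here is a minimal, runnable sketch of the same idea: pairing each row with the column names from cur.description. It uses the standard-library sqlite3 module instead of PostgreSQL purely so it runs without a database server (an assumption of this sketch, along with the sample row), and set_columns_compact is a hypothetical, shorter variant of the function above.

```python
import sqlite3


def set_columns_compact(data, cur):
    # cur.description holds one entry per column; the name is the first item.
    names = [col[0] for col in cur.description]
    # Turn every row tuple into a dict keyed by column name.
    return [dict(zip(names, row)) for row in data]


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE plan (plan_id INTEGER PRIMARY KEY, name TEXT, difficulty INTEGER)")
# sqlite3 uses ? placeholders where psycopg2 uses %s.
cur.execute("INSERT INTO plan (name, difficulty) VALUES (?, ?)", ("Leg day", 3))

cur.execute("SELECT * FROM plan")
rows = cur.fetchall()
result = set_columns_compact(rows, cur)
print(result)  # [{'plan_id': 1, 'name': 'Leg day', 'difficulty': 3}]
conn.close()
```

An empty result set gives an empty list, which matches the behavior of set_columns.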

Lastly, I’d like to serve my resources in JSON format. For this, I’ve created a middleware for Falcon in the root directory (middlewares.py).

import json
from datetime import date, datetime

class JSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, bytes):
            return obj.decode("utf-8")
        if isinstance(obj, (date, datetime)):
            return str(obj.isoformat())
        return super(JSONEncoder, self).default(obj)


class ContentEncodingMiddleware(object):
    def process_response(self, req, resp, _, req_succeeded):
        if not req_succeeded:
            return
        if req.client_accepts_json:
            resp.set_header('Content-Type', 'application/json')
            resp.body = json.dumps(resp.body, cls=JSONEncoder)
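
The custom encoder matters because rows from PostgreSQL can contain values, such as the athlete’s birthday, that the default json module refuses to serialize. Here is a minimal sketch of the encoder on its own, outside Falcon; the sample output dictionary is made up for illustration.

```python
import json
from datetime import date, datetime


class JSONEncoder(json.JSONEncoder):
    def default(self, obj):
        # bytes become text, dates become ISO 8601 strings.
        if isinstance(obj, bytes):
            return obj.decode("utf-8")
        if isinstance(obj, (date, datetime)):
            return obj.isoformat()
        return super().default(obj)


# Hypothetical response body in the same shape the resources build.
output = {
    "status": True,
    "message": None,
    "data": [{"athlete_id": 1, "birthday": date(1990, 5, 17)}],
}

encoded = json.dumps(output, cls=JSONEncoder)
print(encoded)
# {"status": true, "message": null, "data": [{"athlete_id": 1, "birthday": "1990-05-17"}]}
```

Without cls=JSONEncoder, the same json.dumps call raises a TypeError because date objects are not JSON serializable.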

Let’s create a plan with Postman.
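
If Postman is not at hand, the same request can be sketched with Python’s standard library. The field names come from post_request_args in plan.py; the values are made up, and the athlete_id is assumed to refer to an athlete that already exists. The request is only built here; the lines that send it are commented out because they need the API to be running.

```python
import json
import urllib.request

# Hypothetical sample values; the keys match post_request_args in plan.py.
payload = {
    "name": "Beginner plan",
    "description": "Three easy workouts per week",
    "difficulty": 1,
    "athlete_id": 1,
}

request = urllib.request.Request(
    "http://localhost:8000/plans",
    data=json.dumps(payload).encode("utf-8"),
    # The Accept header matters: the middleware only encodes the body
    # as JSON when the client accepts JSON.
    headers={"Content-Type": "application/json", "Accept": "application/json"},
    method="POST",
)
print(request.get_method(), request.full_url)

# Uncomment to send the request against a running API:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```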

That’s it!

You’ll be able to find the code and the related repository at this link.

https://github.com/mertingen/python-falcon-framework-api

The Falcon framework builds pretty cool APIs. I hope this article is useful for you. See you in the next article. If you have any trouble, don’t hesitate to ask.
