From 3b92859be3c147d621b696ccfba27fbe2c655638 Mon Sep 17 00:00:00 2001 From: debugtalk Date: Thu, 22 Jun 2017 18:37:48 +0800 Subject: [PATCH] ApiServer: add support MD5 authentication --- test/api_server.py | 39 +++++++++++++++++++++++++++++++++++++-- test/base.py | 25 +++++++++++++++++++++++-- 2 files changed, 60 insertions(+), 4 deletions(-) diff --git a/test/api_server.py b/test/api_server.py index c762c36a..05f224f1 100644 --- a/test/api_server.py +++ b/test/api_server.py @@ -1,6 +1,8 @@ +import hashlib import json -from flask import Flask -from flask import request, make_response +from functools import wraps + +from flask import Flask, make_response, request app = Flask(__name__) @@ -19,11 +21,38 @@ data structure: """ users_dict = {} +AUTHENTICATION = False +TOKEN = "debugtalk" + +def validate_request(func): + + @wraps(func) + def wrapper(*args, **kwds): + if not AUTHENTICATION: + return func(*args, **kwds) + + try: + req_headers = request.headers + req_authorization = req_headers['Authorization'] + random_str = req_headers['Random'] + data = request.data.decode("utf-8") + authorization_str = "".join([TOKEN, data, random_str]) + authorization = hashlib.md5(authorization_str.encode('utf-8')).hexdigest() + assert authorization == req_authorization + return func(*args, **kwds) + except (KeyError, AssertionError): + return "Authorization failed!", 403 + + return wrapper + + @app.route('/') +@validate_request def index(): return "Hello World!" @app.route('/customize-response', methods=['POST']) +@validate_request def get_customized_response(): expected_resp_json = request.get_json() status_code = expected_resp_json.get('status_code', 200) @@ -37,6 +66,7 @@ def get_customized_response(): return response @app.route('/api/users') +@validate_request def get_users(): users_list = [user for uid, user in users_dict.items()] users = { @@ -49,6 +79,7 @@ def get_users(): return response @app.route('/api/users', methods=['DELETE']) +@validate_request def clear_users(): users_dict.clear() result = { @@ -59,6 +90,7 @@ def clear_users(): return response @app.route('/api/users/', methods=['POST']) +@validate_request def create_user(uid): user = request.get_json() if uid not in users_dict: @@ -80,6 +112,7 @@ def create_user(uid): return response @app.route('/api/users/') +@validate_request def get_user(uid): user = users_dict.get(uid, {}) if user: @@ -100,6 +133,7 @@ def get_user(uid): return response @app.route('/api/users/', methods=['PUT']) +@validate_request def update_user(uid): user = users_dict.get(uid, {}) if user: @@ -119,6 +153,7 @@ def update_user(uid): return response @app.route('/api/users/', methods=['DELETE']) +@validate_request def delete_user(uid): user = users_dict.pop(uid, {}) if user: diff --git a/test/base.py b/test/base.py index 1813a89b..c02d5287 100644 --- a/test/base.py +++ b/test/base.py @@ -1,14 +1,22 @@ +import hashlib import multiprocessing +import random +import string import time import unittest + from . import api_server + class ApiServerUnittest(unittest.TestCase): + """ Test case class that sets up an HTTP server which can be used within the tests """ - Test case class that sets up an HTTP server which can be used within the tests - """ + + authentication = False + @classmethod def setUpClass(cls): + api_server.AUTHENTICATION = cls.authentication cls.api_server_process = multiprocessing.Process( target=api_server.app.run ) @@ -18,3 +26,16 @@ class ApiServerUnittest(unittest.TestCase): @classmethod def tearDownClass(cls): cls.api_server_process.terminate() + + def prepare_headers(self, data=""): + token = api_server.TOKEN + random_str = ''.join( + random.choice(string.ascii_uppercase + string.digits) for _ in range(5)) + + authorization_str = "".join([token, data, random_str]) + authorization = hashlib.md5(authorization_str.encode('utf-8')).hexdigest() + headers = { + 'authorization': authorization, + 'random': random_str + } + return headers