mirror of
https://github.com/httprunner/httprunner.git
synced 2026-05-11 18:11:21 +08:00
ApiServer: add support MD5 authentication
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
import hashlib
|
||||
import json
|
||||
from flask import Flask
|
||||
from flask import request, make_response
|
||||
from functools import wraps
|
||||
|
||||
from flask import Flask, make_response, request
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
@@ -19,11 +21,38 @@ data structure:
|
||||
"""
|
||||
users_dict = {}
|
||||
|
||||
AUTHENTICATION = False
|
||||
TOKEN = "debugtalk"
|
||||
|
||||
def validate_request(func):
|
||||
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwds):
|
||||
if not AUTHENTICATION:
|
||||
return func(*args, **kwds)
|
||||
|
||||
try:
|
||||
req_headers = request.headers
|
||||
req_authorization = req_headers['Authorization']
|
||||
random_str = req_headers['Random']
|
||||
data = request.data.decode("utf-8")
|
||||
authorization_str = "".join([TOKEN, data, random_str])
|
||||
authorization = hashlib.md5(authorization_str.encode('utf-8')).hexdigest()
|
||||
assert authorization == req_authorization
|
||||
return func(*args, **kwds)
|
||||
except (KeyError, AssertionError):
|
||||
return "Authorization failed!", 403
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
@app.route('/')
|
||||
@validate_request
|
||||
def index():
|
||||
return "Hello World!"
|
||||
|
||||
@app.route('/customize-response', methods=['POST'])
|
||||
@validate_request
|
||||
def get_customized_response():
|
||||
expected_resp_json = request.get_json()
|
||||
status_code = expected_resp_json.get('status_code', 200)
|
||||
@@ -37,6 +66,7 @@ def get_customized_response():
|
||||
return response
|
||||
|
||||
@app.route('/api/users')
|
||||
@validate_request
|
||||
def get_users():
|
||||
users_list = [user for uid, user in users_dict.items()]
|
||||
users = {
|
||||
@@ -49,6 +79,7 @@ def get_users():
|
||||
return response
|
||||
|
||||
@app.route('/api/users', methods=['DELETE'])
|
||||
@validate_request
|
||||
def clear_users():
|
||||
users_dict.clear()
|
||||
result = {
|
||||
@@ -59,6 +90,7 @@ def clear_users():
|
||||
return response
|
||||
|
||||
@app.route('/api/users/<int:uid>', methods=['POST'])
|
||||
@validate_request
|
||||
def create_user(uid):
|
||||
user = request.get_json()
|
||||
if uid not in users_dict:
|
||||
@@ -80,6 +112,7 @@ def create_user(uid):
|
||||
return response
|
||||
|
||||
@app.route('/api/users/<int:uid>')
|
||||
@validate_request
|
||||
def get_user(uid):
|
||||
user = users_dict.get(uid, {})
|
||||
if user:
|
||||
@@ -100,6 +133,7 @@ def get_user(uid):
|
||||
return response
|
||||
|
||||
@app.route('/api/users/<int:uid>', methods=['PUT'])
|
||||
@validate_request
|
||||
def update_user(uid):
|
||||
user = users_dict.get(uid, {})
|
||||
if user:
|
||||
@@ -119,6 +153,7 @@ def update_user(uid):
|
||||
return response
|
||||
|
||||
@app.route('/api/users/<int:uid>', methods=['DELETE'])
|
||||
@validate_request
|
||||
def delete_user(uid):
|
||||
user = users_dict.pop(uid, {})
|
||||
if user:
|
||||
|
||||
25
test/base.py
25
test/base.py
@@ -1,14 +1,22 @@
|
||||
import hashlib
|
||||
import multiprocessing
|
||||
import random
|
||||
import string
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from . import api_server
|
||||
|
||||
|
||||
class ApiServerUnittest(unittest.TestCase):
|
||||
""" Test case class that sets up an HTTP server which can be used within the tests
|
||||
"""
|
||||
Test case class that sets up an HTTP server which can be used within the tests
|
||||
"""
|
||||
|
||||
authentication = False
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
api_server.AUTHENTICATION = cls.authentication
|
||||
cls.api_server_process = multiprocessing.Process(
|
||||
target=api_server.app.run
|
||||
)
|
||||
@@ -18,3 +26,16 @@ class ApiServerUnittest(unittest.TestCase):
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
cls.api_server_process.terminate()
|
||||
|
||||
def prepare_headers(self, data=""):
|
||||
token = api_server.TOKEN
|
||||
random_str = ''.join(
|
||||
random.choice(string.ascii_uppercase + string.digits) for _ in range(5))
|
||||
|
||||
authorization_str = "".join([token, data, random_str])
|
||||
authorization = hashlib.md5(authorization_str.encode('utf-8')).hexdigest()
|
||||
headers = {
|
||||
'authorization': authorization,
|
||||
'random': random_str
|
||||
}
|
||||
return headers
|
||||
|
||||
Reference in New Issue
Block a user