Files
httprunner/httprunner/testcase.py
debugtalk dcc1e70181 refactor hook mechanism:
1, remove EventHook;
2, setup_hooks: could reference request dict;
3, teardown_hooks: could reference Response object.
2018-05-10 13:40:47 +08:00

921 lines
31 KiB
Python

# encoding: utf-8
import ast
import collections
import io
import itertools
import json
import os
import random
import re
from httprunner import exception, logger, utils
from httprunner.compat import OrderedDict, basestring, numeric_types
from httprunner.utils import FileUtils
# matches a variable reference such as $var; the single capture group is the
# bare variable name without the leading "$"
variable_regexp = r"\$([\w_]+)"
# matches an embedded function call such as ${func(1, $var, a=b)}; the single
# capture group is the call text between the braces, e.g. "func(1, $var, a=b)"
function_regexp = r"\$\{([\w_]+\([\$\w\.\-_ =,]*\))\}"
# anchored pattern applied to a bare call string: group 1 is the function name
# and group 2 is the raw, unsplit argument text
function_regexp_compile = re.compile(r"^([\w_]+)\(([\$\w\.\-_ =,]*)\)$")
def extract_variables(content):
    """ collect the name of every $variable reference found in content
    @param (str) content
    @return (list) variable names in order of appearance; an empty list
        when content is of a type the regex engine cannot search
    e.g. $variable => ["variable"]
         /blog/$postid => ["postid"]
         /$var1/$var2 => ["var1", "var2"]
         abc => []
    """
    try:
        names = re.findall(variable_regexp, content)
    except TypeError:
        # content is not a string (e.g. int, dict, None): nothing to extract
        names = []
    return names
def extract_functions(content):
    """ collect every function call embedded in content as ${call(...)}
    @param (str) content
    @return (list) call strings without the surrounding "${" and "}", in
        order of appearance; an empty list when content is of a type the
        regex engine cannot search
    e.g. ${func(5)} => ["func(5)"]
         ${func(a=1, b=2)} => ["func(a=1, b=2)"]
         /api/1000?_t=${get_timestamp()} => ["get_timestamp()"]
         /api/${add(1, 2)} => ["add(1, 2)"]
         "/api/${add(1, 2)}?_t=${get_timestamp()}" => ["add(1, 2)", "get_timestamp()"]
    """
    try:
        calls = re.findall(function_regexp, content)
    except TypeError:
        # content is not a string (e.g. int, dict, None): nothing to extract
        calls = []
    return calls
def parse_string_value(str_value):
    """ turn a string into the Python literal it spells, when it spells one
    @param (str) str_value
    @return the evaluated literal, or str_value itself when it is not a literal
    e.g. "123" => 123
         "12.2" => 12.2
         "abc" => "abc"
         "$var" => "$var"
    """
    try:
        literal = ast.literal_eval(str_value)
    except (ValueError, SyntaxError):
        # ValueError: well-formed expression that is not a literal, e.g. abc
        # SyntaxError: not an expression at all, e.g. $var, ${func}
        return str_value
    return literal
def parse_function(content):
    """ parse function name and args from string content.
    @param (str) content: a bare call such as "func(1, a=2)"
    @return (dict) function name and args
    @raise exception.FunctionNotFound when content is not a function call
    e.g. func() => {'func_name': 'func', 'args': [], 'kwargs': {}}
         func(5) => {'func_name': 'func', 'args': [5], 'kwargs': {}}
         func(1, 2) => {'func_name': 'func', 'args': [1, 2], 'kwargs': {}}
         func(a=1, b=2) => {'func_name': 'func', 'args': [], 'kwargs': {'a': 1, 'b': 2}}
         func(1, 2, a=3, b=4) => {'func_name': 'func', 'args': [1, 2], 'kwargs': {'a':3, 'b':4}}
         func(q=a=b) => {'func_name': 'func', 'args': [], 'kwargs': {'q': 'a=b'}}
    """
    matched = function_regexp_compile.match(content)
    if not matched:
        raise exception.FunctionNotFound("{} not found!".format(content))

    function_meta = {
        "func_name": matched.group(1),
        "args": [],
        "kwargs": {}
    }

    # spaces carry no meaning inside the argument list
    args_str = matched.group(2).replace(" ", "")
    if args_str == "":
        return function_meta

    for arg in args_str.split(','):
        if '=' in arg:
            # split on the first "=" only: the pattern allows "=" inside a
            # value, and a plain split('=') would fail to unpack in that case
            key, _, value = arg.partition('=')
            function_meta["kwargs"][key] = parse_string_value(value)
        else:
            function_meta["args"].append(parse_string_value(arg))

    return function_meta
class TestcaseLoader(object):
    """ load api definitions, suite definitions and testset files.
    All state is kept on the class itself: overall_def_dict holds the
    registered definitions and testcases_cache_mapping memoizes the result
    of load_testsets_by_path per path.
    """

    # registered definitions, keyed by ref type ("api"/"suite") and then by
    # the function name parsed from the "def" field
    overall_def_dict = {
        "api": {},
        "suite": {}
    }
    # resolved path => list of testsets already loaded from that path
    testcases_cache_mapping = {}

    @staticmethod
    def load_test_dependencies():
        """ load all api and suite definitions.
            default api folder is "$CWD/tests/api/".
            default suite folder is "$CWD/tests/suite/".
        @raise exception.ParamsError when a suite file has no "def" in its config
        """
        # TODO: cache api and suite loading
        # load api definitions
        api_def_folder = os.path.join(os.getcwd(), "tests", "api")
        for test_file in FileUtils.load_folder_files(api_def_folder):
            TestcaseLoader.load_api_file(test_file)

        # load suite definitions
        suite_def_folder = os.path.join(os.getcwd(), "tests", "suite")
        for suite_file in FileUtils.load_folder_files(suite_def_folder):
            suite = TestcaseLoader.load_test_file(suite_file)
            if "def" not in suite["config"]:
                raise exception.ParamsError("def missed in suite file: {}!".format(suite_file))

            call_func = suite["config"]["def"]
            function_meta = parse_function(call_func)
            suite["function_meta"] = function_meta
            TestcaseLoader.overall_def_dict["suite"][function_meta["func_name"]] = suite

    @staticmethod
    def load_api_file(file_path):
        """ load api definition from file and store in overall_def_dict["api"]
            api file should be in format below:
                [
                    {
                        "api": {
                            "def": "api_login",
                            "request": {},
                            "validate": []
                        }
                    },
                    {
                        "api": {
                            "def": "api_logout",
                            "request": {},
                            "validate": []
                        }
                    }
                ]
        @raise exception.FileFormatError when the file content does not
            follow the layout above
        """
        api_items = FileUtils.load_file(file_path)

        if not isinstance(api_items, list):
            raise exception.FileFormatError("API format error: {}".format(file_path))

        for api_item in api_items:
            if not isinstance(api_item, dict) or len(api_item) != 1:
                raise exception.FileFormatError("API format error: {}".format(file_path))

            key, api_dict = api_item.popitem()
            if key != "api" or not isinstance(api_dict, dict) or "def" not in api_dict:
                raise exception.FileFormatError("API format error: {}".format(file_path))

            api_def = api_dict.pop("def")
            function_meta = parse_function(api_def)
            func_name = function_meta["func_name"]

            # a duplicated definition is only reported; the later one wins
            if func_name in TestcaseLoader.overall_def_dict["api"]:
                logger.log_warning("API definition duplicated: {}".format(func_name))

            api_dict["function_meta"] = function_meta
            TestcaseLoader.overall_def_dict["api"][func_name] = api_dict

    @staticmethod
    def load_test_file(file_path):
        """ load testcase file or suite file
        @param file_path: absolute valid file path
            file_path should be in format below:
                [
                    {
                        "config": {
                            "name": "",
                            "def": "suite_order()",
                            "request": {}
                        }
                    },
                    {
                        "test": {
                            "name": "add product to cart",
                            "api": "api_add_cart()",
                            "validate": []
                        }
                    },
                    {
                        "test": {
                            "name": "checkout cart",
                            "request": {},
                            "validate": []
                        }
                    }
                ]
        @return testset dict
            {
                "name": "desc1",
                "config": {},
                "testcases": [testcase11, testcase12]
            }
        @raise exception.FileFormatError when an item is not a single-key
            dict whose value is a dict
        """
        testset = {
            "name": "",
            "config": {
                "path": file_path
            },
            "testcases": []     # TODO: rename to tests
        }
        for item in FileUtils.load_file(file_path):
            if not isinstance(item, dict) or len(item) != 1:
                raise exception.FileFormatError("Testcase format error: {}".format(file_path))

            key, test_block = item.popitem()
            if not isinstance(test_block, dict):
                raise exception.FileFormatError("Testcase format error: {}".format(file_path))

            if key == "config":
                testset["config"].update(test_block)
                testset["name"] = test_block.get("name", "")

            elif key == "test":
                if "api" in test_block:
                    # test references an api definition: merge it into the test
                    ref_call = test_block["api"]
                    def_block = TestcaseLoader._get_block_by_name(ref_call, "api")
                    TestcaseLoader._override_block(def_block, test_block)
                    testset["testcases"].append(test_block)
                elif "suite" in test_block:
                    # test references a suite: inline all of the suite's tests
                    ref_call = test_block["suite"]
                    block = TestcaseLoader._get_block_by_name(ref_call, "suite")
                    testset["testcases"].extend(block["testcases"])
                else:
                    testset["testcases"].append(test_block)

            else:
                logger.log_warning(
                    "unexpected block key: {}. block key should only be 'config' or 'test'.".format(key)
                )

        return testset

    @staticmethod
    def _get_block_by_name(ref_call, ref_type):
        """ get test content by reference name
        @params:
            ref_call: e.g. api_v1_Account_Login_POST($UserName, $Password)
            ref_type: "api" or "suite"
        @return the registered block, with the definition's arguments
            substituted by the call's arguments where they differ
        @raise exception.ParamsError when the number of positional call
            arguments differs from the definition
        """
        function_meta = parse_function(ref_call)
        func_name = function_meta["func_name"]
        call_args = function_meta["args"]
        block = TestcaseLoader._get_test_definition(func_name, ref_type)
        def_args = block.get("function_meta").get("args", [])

        if len(call_args) != len(def_args):
            raise exception.ParamsError("call args mismatch defined args!")

        args_mapping = {}
        for index, item in enumerate(def_args):
            # an argument passed under its own name needs no substitution
            if call_args[index] == item:
                continue

            args_mapping[item] = call_args[index]

        if args_mapping:
            block = substitute_variables_with_mapping(block, args_mapping)

        return block

    @staticmethod
    def _get_test_definition(name, ref_type):
        """ get expected api or suite.
        @params:
            name: api or suite name
            ref_type: "api" or "suite"
        @return
            expected api/suite info if found
        @raise exception.ApiNotFound when ref_type is "api" and name is not
            registered; exception.SuiteNotFound for any other ref_type
        """
        block = TestcaseLoader.overall_def_dict.get(ref_type, {}).get(name)

        if not block:
            err_msg = "{} not found!".format(name)
            if ref_type == "api":
                raise exception.ApiNotFound(err_msg)
            else:
                # ref_type == "suite":
                raise exception.SuiteNotFound(err_msg)

        return block

    @staticmethod
    def _override_block(def_block, current_block):
        """ merge def_block into current_block, in place.
            Every key of def_block is copied over current_block, then
            "validate" and "extract" are rebuilt by merging both sides, the
            entries of current_block taking precedence.
        @param def_block:
            {
                "name": "get token",
                "request": {...},
                "validate": [{'eq': ['status_code', 200]}]
            }
        @param current_block:
            {
                "name": "get token",
                "extract": [{"token": "content.token"}],
                "validate": [{'eq': ['status_code', 201]}, {'len_eq': ['content.token', 16]}]
            }
        @return None; current_block becomes
            {
                "name": "get token",
                "request": {...},
                "extract": [{"token": "content.token"}],
                "validate": [{'eq': ['status_code', 201]}, {'len_eq': ['content.token', 16]}]
            }
        """
        # read both sides before update() overwrites current_block's entries
        def_validators = def_block.get("validate") or def_block.get("validators", [])
        current_validators = current_block.get("validate") or current_block.get("validators", [])

        def_extrators = def_block.get("extract") \
            or def_block.get("extractors") \
            or def_block.get("extract_binds", [])
        current_extractors = current_block.get("extract") \
            or current_block.get("extractors") \
            or current_block.get("extract_binds", [])

        current_block.update(def_block)
        current_block["validate"] = _merge_validator(
            def_validators,
            current_validators
        )
        current_block["extract"] = _merge_extractor(
            def_extrators,
            current_extractors
        )

    @staticmethod
    def load_testsets_by_path(path):
        """ load testcases from file path
        @param path: path could be in several type
            - absolute/relative file path
            - absolute/relative folder path
            - list/set container with file(s) and/or folder(s)
        @return testcase sets list, each testset is corresponding to a file
            [
                testset_dict_1,
                testset_dict_2
            ]
            Files that fail with FileFormatError, files without any test and
            paths that do not exist contribute an empty list.
        """
        if isinstance(path, (list, set)):
            testsets = []

            # set() drops duplicated entries so each path is loaded once
            for file_path in set(path):
                testset = TestcaseLoader.load_testsets_by_path(file_path)
                if not testset:
                    continue
                testsets.extend(testset)

            return testsets

        if not os.path.isabs(path):
            path = os.path.join(os.getcwd(), path)

        if path in TestcaseLoader.testcases_cache_mapping:
            return TestcaseLoader.testcases_cache_mapping[path]

        if os.path.isdir(path):
            files_list = FileUtils.load_folder_files(path)
            testcases_list = TestcaseLoader.load_testsets_by_path(files_list)

        elif os.path.isfile(path):
            try:
                testset = TestcaseLoader.load_test_file(path)
                # load_test_file() never creates an "api" key, so it must be
                # read with .get(); indexing raised KeyError for any file
                # that contains no test
                if testset["testcases"] or testset.get("api"):
                    testcases_list = [testset]
                else:
                    testcases_list = []
            except exception.FileFormatError:
                testcases_list = []

        else:
            logger.log_error(u"file not found: {}".format(path))
            testcases_list = []

        TestcaseLoader.testcases_cache_mapping[path] = testcases_list
        return testcases_list
def parse_validator(validator):
    """ normalize a validator, which may be written in two formats
    @param (dict) validator
        format1: kept for compatibility with previous versions.
            {"check": "status_code", "comparator": "eq", "expect": 201}
            {"check": "$resp_body_success", "comparator": "eq", "expect": True}
        format2: recommended new version
            {'eq': ['status_code', 201]}
            {'eq': ['$resp_body_success', True]}
    @return (dict) validator info
        {
            "check": "status_code",
            "expect": 201,
            "comparator": "eq"
        }
    @raise exception.ParamsError when validator matches neither format
    """
    if not isinstance(validator, dict):
        raise exception.ParamsError("invalid validator: {}".format(validator))

    if "check" in validator and len(validator) > 1:
        # format1: the expected value lives under "expect", or "expected"
        check_item = validator.get("check")
        for expect_key in ("expect", "expected"):
            if expect_key in validator:
                expect_value = validator.get(expect_key)
                break
        else:
            raise exception.ParamsError("invalid validator: {}".format(validator))

        # the comparator is optional in format1
        comparator = validator.get("comparator", "eq")

    elif len(validator) == 1:
        # format2: the only key is the comparator, its value the
        # [check, expect] pair
        comparator, compare_values = next(iter(validator.items()))
        if not (isinstance(compare_values, list) and len(compare_values) == 2):
            raise exception.ParamsError("invalid validator: {}".format(validator))

        check_item, expect_value = compare_values

    else:
        raise exception.ParamsError("invalid validator: {}".format(validator))

    return {
        "check": check_item,
        "expect": expect_value,
        "comparator": comparator
    }
def _get_validators_mapping(validators):
    """ get validators mapping from api or test validators
    @param (list) validators:
        [
            {"check": "v1", "expect": 201, "comparator": "eq"},
            {"check": {"b": 1}, "expect": 200, "comparator": "eq"}
        ]
    @return (dict) parsed validators keyed by (check, comparator); a later
        validator replaces an earlier one that has the same key
        {
            ("v1", "eq"): {"check": "v1", "expect": 201, "comparator": "eq"},
            ('{"b": 1}', "eq"): {"check": {"b": 1}, "expect": 200, "comparator": "eq"}
        }
    """
    validators_mapping = {}

    for validator in validators:
        validator = parse_validator(validator)

        check = validator["check"]
        try:
            # probe hashability directly: collections.Hashable no longer
            # exists on Python 3.10+, and an isinstance check also accepts
            # tuples that hold unhashable members
            hash(check)
        except TypeError:
            # unhashable check item (dict, list, ...): key on its JSON text
            check = json.dumps(check)

        key = (check, validator["comparator"])
        validators_mapping[key] = validator

    return validators_mapping
def _merge_validator(def_validators, current_validators):
""" merge def_validators with current_validators
@params:
def_validators: [{'eq': ['v1', 200]}, {"check": "s2", "expect": 16, "comparator": "len_eq"}]
current_validators: [{"check": "v1", "expect": 201}, {'len_eq': ['s3', 12]}]
@return:
[
{"check": "v1", "expect": 201, "comparator": "eq"},
{"check": "s2", "expect": 16, "comparator": "len_eq"},
{"check": "s3", "expect": 12, "comparator": "len_eq"}
]
"""
if not def_validators:
return current_validators
elif not current_validators:
return def_validators
else:
api_validators_mapping = _get_validators_mapping(def_validators)
test_validators_mapping = _get_validators_mapping(current_validators)
api_validators_mapping.update(test_validators_mapping)
return list(api_validators_mapping.values())
def _merge_extractor(def_extrators, current_extractors):
""" merge def_extrators with current_extractors
@params:
def_extrators: [{"var1": "val1"}, {"var2": "val2"}]
current_extractors: [{"var1": "val111"}, {"var3": "val3"}]
@return:
[
{"var1": "val111"},
{"var2": "val2"},
{"var3": "val3"}
]
"""
if not def_extrators:
return current_extractors
elif not current_extractors:
return def_extrators
else:
extractor_dict = OrderedDict()
for api_extrator in def_extrators:
if len(api_extrator) != 1:
logger.log_warning("incorrect extractor: {}".format(api_extrator))
continue
var_name = list(api_extrator.keys())[0]
extractor_dict[var_name] = api_extrator[var_name]
for test_extrator in current_extractors:
if len(test_extrator) != 1:
logger.log_warning("incorrect extractor: {}".format(test_extrator))
continue
var_name = list(test_extrator.keys())[0]
extractor_dict[var_name] = test_extrator[var_name]
extractor_list = []
for key, value in extractor_dict.items():
extractor_list.append({key: value})
return extractor_list
def is_testset(data_structure):
    """ tell whether data_structure has the shape of a testset
        a testset is a dict that has a "name" key and a "testcases" key
        whose value is a list:
        {
            "name": "desc1",
            "config": {},
            "api": {},
            "testcases": [testcase11, testcase12]
        }
    @return (bool)
    """
    if not isinstance(data_structure, dict):
        return False

    if not ("name" in data_structure and "testcases" in data_structure):
        return False

    return isinstance(data_structure["testcases"], list)
def is_testsets(data_structure):
    """ tell whether data_structure is a testset or a list of testsets
        accepted shapes:
            testset_dict
        or
            [
                testset_dict_1,
                testset_dict_2
            ]
    @return (bool) an empty list counts as valid
    """
    if isinstance(data_structure, list):
        return all(is_testset(item) for item in data_structure)

    return is_testset(data_structure)
def substitute_variables_with_mapping(content, mapping):
    """ substitute variables in content with mapping
    @param content: any data structure; lists, sets, tuples and dicts are
        walked recursively (dict keys included)
    @param (dict) mapping: text to look for => replacement value
    @return a substituted copy of content; lists, sets and tuples all come
        back as lists; booleans, numbers, types and empty values are
        returned unchanged
    e.g.
    @params
        content = {
            'request': {
                'url': '/api/users/$uid',
                'headers': {'token': '$token'}
            }
        }
        mapping = {"$uid": 1000}
    @return
        {
            'request': {
                'url': '/api/users/1000',
                'headers': {'token': '$token'}
            }
        }
    """
    # TODO: refactor type check
    if isinstance(content, bool):
        return content

    if isinstance(content, (numeric_types, type)):
        return content

    if not content:
        return content

    if isinstance(content, (list, set, tuple)):
        return [
            substitute_variables_with_mapping(item, mapping)
            for item in content
        ]

    if isinstance(content, dict):
        substituted_data = {}
        for key, value in content.items():
            eval_key = substitute_variables_with_mapping(key, mapping)
            eval_value = substitute_variables_with_mapping(value, mapping)
            substituted_data[eval_key] = eval_value

        return substituted_data

    # content is expected to be a string from here on
    for var, value in mapping.items():
        if not isinstance(content, basestring):
            # either content never was a string, or a previous iteration
            # replaced it as a whole with a non-string value (e.g. "$uid"
            # => 1000); calling .replace() on it would raise AttributeError
            break

        if content == var:
            # content is a variable: keep the type of the mapped value
            content = value
        else:
            content = content.replace(var, str(value))

    return content
def gen_cartesian_product(*args):
    """ build the cartesian product of several lists of dicts
    @param
        (list) args
            [{"a": 1}, {"a": 2}],
            [
                {"x": 111, "y": 112},
                {"x": 121, "y": 122}
            ]
    @return
        cartesian product in list, each combination merged into one dict
        [
            {'a': 1, 'x': 111, 'y': 112},
            {'a': 1, 'x': 121, 'y': 122},
            {'a': 2, 'x': 111, 'y': 112},
            {'a': 2, 'x': 121, 'y': 122}
        ]
        No argument gives []; a single argument is returned as is.
    """
    if not args:
        return []

    if len(args) == 1:
        return args[0]

    def merge_combination(combination):
        # later dicts overwrite the keys of earlier ones
        merged = {}
        for part in combination:
            merged.update(part)
        return merged

    return [
        merge_combination(combination)
        for combination in itertools.product(*args)
    ]
def parse_parameters(parameters, testset_path=None):
    """ expand every parameter into its rows and return their cartesian product
    @params
        (list) parameters: parameter name and value in list
            parameter value may be in three types:
                (1) data list
                (2) call built-in parameterize function
                (3) call custom function in debugtalk.py
            e.g.
                [
                    {"user_agent": ["iOS/10.1", "iOS/10.2", "iOS/10.3"]},
                    {"username-password": "${parameterize(account.csv)}"},
                    {"app_version": "${gen_app_version()}"}
                ]
        (str) testset_path: testset file path, used for locating csv file and debugtalk.py
    @return cartesian product in list
    @raise exception.ParamsError when a non-list parameter value does not
        evaluate to a list
    """
    testcase_parser = TestcaseParser(file_path=testset_path)

    rows_per_parameter = []
    for parameter in parameters:
        joined_names, raw_content = list(parameter.items())[0]
        # "username-password" declares two names filled from each row
        names = joined_names.split("-")

        if isinstance(raw_content, list):
            # (1) data list
            # e.g. {"app_version": ["2.8.5", "2.8.6"]}
            #       => [{"app_version": "2.8.5"}, {"app_version": "2.8.6"}]
            # e.g. {"username-password": [["user1", "111111"], ["user2", "222222"]]}
            #       => [{"username": "user1", "password": "111111"},
            #           {"username": "user2", "password": "222222"}]
            # a scalar row is wrapped first: "2.8.5" => ["2.8.5"]
            rows = [
                dict(zip(names, row if isinstance(row, (list, tuple)) else [row]))
                for row in raw_content
            ]
        else:
            # (2) & (3): the evaluated content must be a list of dicts
            # e.g. [{'app_version': '2.8.5'}, {'app_version': '2.8.6'}]
            # e.g. [{"username": "user1", "password": "111111"},
            #       {"username": "user2", "password": "222222"}]
            evaluated = testcase_parser.eval_content_with_bindings(raw_content)
            if not isinstance(evaluated, list):
                raise exception.ParamsError("parameters syntax error!")

            # keep only the keys declared in the parameter name
            rows = [
                {key: row[key] for key in names}
                for row in evaluated
            ]

        rows_per_parameter.append(rows)

    return gen_cartesian_product(*rows_per_parameter)
class TestcaseParser(object):
    """ evaluate the $variable and ${function()} references of testcase
    content against the variables and functions bound to this parser.
    """

    def __init__(self, variables=None, functions=None, file_path=None):
        """
        @param (dict) variables: variables binds mapping, empty when omitted
        @param (dict) functions: functions binds mapping, empty when omitted
        @param (str) file_path: testset file path, used as the starting
            point when an item is not bound on the parser itself
        """
        # None defaults give every parser its own dict; a literal {} default
        # would be one object shared by all parsers created without arguments
        self.update_binded_variables({} if variables is None else variables)
        self.bind_functions({} if functions is None else functions)
        self.file_path = file_path

    def update_binded_variables(self, variables):
        """ bind variables to current testcase parser
        @param (dict) variables, variables binds mapping
            {
                "authorization": "a83de0ff8d2e896dbd8efb81ba14e17d",
                "random": "A2dEx",
                "data": {"name": "user", "password": "123456"},
                "uuid": 1000
            }
        """
        self.variables = variables

    def bind_functions(self, functions):
        """ bind functions to current testcase parser
        @param (dict) functions, functions binds mapping
            {
                "add_two_nums": lambda a, b=1: a + b
            }
        """
        self.functions = functions

    def _get_bind_item(self, item_type, item_name):
        """ look up a function or variable by name
            lookup order: items bound on this parser, then (functions only)
            any callable the name evaluates to, then
            utils.search_conf_item() starting from self.file_path.
        @param (str) item_type: "function" or "variable"
        @param (str) item_name
        @raise exception.ParamsError when item_type is invalid or the item
            cannot be found
        """
        if item_type == "function":
            if item_name in self.functions:
                return self.functions[item_name]

            try:
                # check if builtin functions
                # NOTE(review): eval() resolves item_name against this
                # scope, not only against builtins, and item_name comes
                # from testcase content; consider a lookup restricted to
                # the builtins module.
                item_func = eval(item_name)
                if callable(item_func):
                    # is builtin function
                    return item_func
            except (NameError, TypeError, SyntaxError):
                # is not builtin function, continue to search
                # (SyntaxError: the name is a keyword or not an identifier)
                pass
        elif item_type == "variable":
            if item_name in self.variables:
                return self.variables[item_name]
        else:
            raise exception.ParamsError("bind item should only be function or variable.")

        # explicit check instead of assert, which is removed under python -O
        if self.file_path is None:
            raise exception.ParamsError(
                "{} is not defined in bind {}s!".format(item_name, item_type))

        try:
            return utils.search_conf_item(self.file_path, item_type, item_name)
        except (AssertionError, exception.FunctionNotFound):
            raise exception.ParamsError(
                "{} is not defined in bind {}s!".format(item_name, item_type))

    def get_bind_function(self, func_name):
        """ return the function bound under func_name, see _get_bind_item """
        return self._get_bind_item("function", func_name)

    def get_bind_variable(self, variable_name):
        """ return the variable bound under variable_name, see _get_bind_item """
        return self._get_bind_item("variable", variable_name)

    def parameterize(self, csv_file_name, fetch_method="Sequential"):
        """ load the rows of a parameter file located next to self.file_path
        @param csv_file_name: file name, relative to the folder of self.file_path
        @param (str) fetch_method: "random" (case insensitive) shuffles the
            rows; any other value keeps the file order
        @return content loaded by FileUtils.load_file
        """
        parameter_file_path = os.path.join(
            os.path.dirname(self.file_path),
            "{}".format(csv_file_name)
        )
        csv_content_list = FileUtils.load_file(parameter_file_path)

        if fetch_method.lower() == "random":
            random.shuffle(csv_content_list)

        return csv_content_list

    def _eval_content_functions(self, content):
        """ replace every ${func(...)} of content with the call result
        @param (str) content
        @return the call result itself when content is exactly one function
            call, otherwise content with each call replaced by str(result)
        """
        functions_list = extract_functions(content)
        for func_content in functions_list:
            function_meta = parse_function(func_content)
            func_name = function_meta['func_name']

            args = function_meta.get('args', [])
            kwargs = function_meta.get('kwargs', {})
            # arguments may themselves reference variables
            args = self.eval_content_with_bindings(args)
            kwargs = self.eval_content_with_bindings(kwargs)

            if func_name in ["parameterize", "P"]:
                eval_value = self.parameterize(*args, **kwargs)
            else:
                func = self.get_bind_function(func_name)
                eval_value = func(*args, **kwargs)

            func_content = "${" + func_content + "}"
            if func_content == content:
                # content is a single function call: keep the result type
                content = eval_value
            else:
                # content contains one or many function calls
                content = content.replace(
                    func_content,
                    str(eval_value), 1
                )

        return content

    def _eval_content_variables(self, content):
        """ replace all variables of string content with mapping value.
        @param (str) content
        @return (str) parsed content; the variable value itself when
            content is exactly one variable
        e.g.
            variable_mapping = {
                "var_1": "abc",
                "var_2": "def"
            }
            $var_1 => "abc"
            $var_1#XYZ => "abc#XYZ"
            /$var_1/$var_2/var3 => "/abc/def/var3"
            ${func($var_1, $var_2, xyz)} => "${func(abc, def, xyz)}"
        """
        variables_list = extract_variables(content)
        for variable_name in variables_list:
            variable_value = self.get_bind_variable(variable_name)

            if "${}".format(variable_name) == content:
                # content is a variable: keep the value type
                content = variable_value
            else:
                # content contains one or several variables
                content = content.replace(
                    "${}".format(variable_name),
                    str(variable_value), 1
                )

        return content

    def eval_content_with_bindings(self, content):
        """ parse content recursively, each variable and function in content will be evaluated.
        @param (dict) content in any data structure
            {
                "url": "http://127.0.0.1:5000/api/users/$uid/${add_two_nums(1, 1)}",
                "method": "POST",
                "headers": {
                    "Content-Type": "application/json",
                    "authorization": "$authorization",
                    "random": "$random",
                    "sum": "${add_two_nums(1, 2)}"
                },
                "body": "$data"
            }
        @return (dict) parsed content with evaluated bind values
            {
                "url": "http://127.0.0.1:5000/api/users/1000/2",
                "method": "POST",
                "headers": {
                    "Content-Type": "application/json",
                    "authorization": "a83de0ff8d2e896dbd8efb81ba14e17d",
                    "random": "A2dEx",
                    "sum": 3
                },
                "body": {"name": "user", "password": "123456"}
            }
            lists and tuples are returned as lists; values that are not
            None, list, tuple, dict or string are returned unchanged
        """
        if content is None:
            return None

        if isinstance(content, (list, tuple)):
            return [
                self.eval_content_with_bindings(item)
                for item in content
            ]

        if isinstance(content, dict):
            evaluated_data = {}
            for key, value in content.items():
                eval_key = self.eval_content_with_bindings(key)
                eval_value = self.eval_content_with_bindings(value)
                evaluated_data[eval_key] = eval_value

            return evaluated_data

        if isinstance(content, basestring):

            # content is in string format here
            content = content.strip()

            # replace functions with evaluated value
            # Notice: _eval_content_functions must be called before _eval_content_variables
            content = self._eval_content_functions(content)

            # replace variables with binding value
            content = self._eval_content_variables(content)

        return content