Files
httprunner/httprunner/testcase.py
2018-04-29 13:30:25 +08:00

861 lines
28 KiB
Python

# encoding: utf-8
import ast
import collections
import io
import itertools
import json
import os
import random
import re
from httprunner import exception, logger, utils
from httprunner.utils import FileUtils
from httprunner.compat import OrderedDict, numeric_types
# Matches a variable reference such as "$var"; the captured group is the
# variable name without the leading "$".
variable_regexp = r"\$([\w_]+)"
# Matches an embedded function call such as "${func(1, $a, b=2)}"; the captured
# group is the call expression found between "${" and "}".
function_regexp = r"\$\{([\w_]+\([\$\w\.\-_ =,]*\))\}"
# Splits a bare call expression "func(args)" into the function name (group 1)
# and the raw, unparsed argument string (group 2).
function_regexp_compile = re.compile(r"^([\w_]+)\(([\$\w\.\-_ =,]*)\)$")
# Registry of api and suite definitions. "loaded" records whether
# load_test_dependencies() has already been run.
test_def_overall_dict = {
    "loaded": False,
    "api": {},
    "suite": {}
}
# Cache of loaded testsets, keyed by absolute path (filled by
# load_testsets_by_path()).
testcases_cache_mapping = {}
def extract_variables(content):
    """ collect the names of all ``$variable`` references found in content.
    @param (str) content
    @return (list) variable names, in order of appearance, without the "$"
        e.g. $variable => ["variable"]
             /blog/$postid => ["postid"]
             /$var1/$var2 => ["var1", "var2"]
             abc => []
        Content that is not a string yields an empty list.
    """
    try:
        names = re.findall(variable_regexp, content)
    except TypeError:
        # content is not text (e.g. a number or a dict): nothing to extract
        names = []
    return names
def extract_functions(content):
    """ collect all function calls embedded in content as ``${func(...)}``.
    @param (str) content
    @return (list) call expressions, in order of appearance, without "${" and "}"
        e.g. ${func(5)} => ["func(5)"]
             ${func(a=1, b=2)} => ["func(a=1, b=2)"]
             /api/1000?_t=${get_timestamp()} => ["get_timestamp()"]
             /api/${add(1, 2)} => ["add(1, 2)"]
             "/api/${add(1, 2)}?_t=${get_timestamp()}" => ["add(1, 2)", "get_timestamp()"]
        Content that is not a string yields an empty list.
    """
    try:
        calls = re.findall(function_regexp, content)
    except TypeError:
        # content is not text (e.g. a number or a dict): nothing to extract
        calls = []
    return calls
def parse_string_value(str_value):
    """ convert a string to the Python literal it spells, when it spells one.
    @param (str) str_value
    @return the evaluated literal, or str_value unchanged if it is not a literal
        e.g. "123" => 123
             "12.2" => 12.2
             "abc" => "abc"
             "$var" => "$var"
    """
    try:
        parsed = ast.literal_eval(str_value)
    except (ValueError, SyntaxError):
        # ValueError: valid syntax but not a literal, e.g. abc
        # SyntaxError: not valid Python at all, e.g. $var, ${func}
        return str_value
    return parsed
def parse_function(content):
    """ parse function name and args from string content.
    @param (str) content: bare call expression, without the surrounding "${" "}"
    @return (dict) function name and args
        e.g. func() => {'func_name': 'func', 'args': [], 'kwargs': {}}
             func(5) => {'func_name': 'func', 'args': [5], 'kwargs': {}}
             func(1, 2) => {'func_name': 'func', 'args': [1, 2], 'kwargs': {}}
             func(a=1, b=2) => {'func_name': 'func', 'args': [], 'kwargs': {'a': 1, 'b': 2}}
             func(1, 2, a=3, b=4) => {'func_name': 'func', 'args': [1, 2], 'kwargs': {'a':3, 'b':4}}
    @raise ParamsError if content is not in the form name(args)
    """
    matched = function_regexp_compile.match(content)
    if matched is None:
        # fail with a readable message instead of AttributeError on None.group
        raise exception.ParamsError("invalid function call: {}".format(content))

    function_meta = {
        "func_name": matched.group(1),
        "args": [],
        "kwargs": {}
    }

    # whitespace carries no meaning inside the argument list
    args_str = matched.group(2).replace(" ", "")
    if args_str == "":
        return function_meta

    for arg in args_str.split(','):
        if '=' in arg:
            # split on the first "=" only: the pattern allows "=" inside the value
            key, value = arg.split('=', 1)
            function_meta["kwargs"][key] = parse_string_value(value)
        else:
            function_meta["args"].append(parse_string_value(arg))

    return function_meta
def load_test_dependencies():
    """ (re)build the registry of api and suite definitions.
    api definitions are read from "$CWD/tests/api/".
    suite definitions are read from "$CWD/tests/suite/".
    @raise ParamsError if a suite file has no "def" entry in its config
    """
    # Mark the registry as loaded before reading any file: load_test_file() can
    # resolve api/suite references through get_test_definition(), which would
    # call this function again while the flag is still unset.
    test_def_overall_dict["loaded"] = True
    test_def_overall_dict["api"] = {}
    test_def_overall_dict["suite"] = {}

    # api definitions: merge the "api" section of every file in the api folder
    api_dir = os.path.join(os.getcwd(), "tests", "api")
    for api_path in FileUtils.load_folder_files(api_dir):
        loaded_apis = load_test_file(api_path)["api"]
        test_def_overall_dict["api"].update(loaded_apis)

    # suite definitions: one suite per file, registered under its "def" name
    suite_dir = os.path.join(os.getcwd(), "tests", "suite")
    for suite_path in FileUtils.load_folder_files(suite_dir):
        suite_def = load_test_file(suite_path)
        suite_config = suite_def["config"]
        if "def" not in suite_config:
            raise exception.ParamsError("def missed in suite file: {}!".format(suite_path))

        suite_meta = parse_function(suite_config["def"])
        suite_def["function_meta"] = suite_meta
        test_def_overall_dict["suite"][suite_meta["func_name"]] = suite_def
def load_testsets_by_path(path):
    """ load testcases from file path
    @param path: path could be in several type
        - absolute/relative file path
        - absolute/relative folder path
        - list/set container with file(s) and/or folder(s)
    @return testcase sets list, each testset is corresponding to a file
        [
            testset_dict_1,
            testset_dict_2
        ]
        For a list, testsets follow the order of the listed paths; duplicated
        paths are loaded once. Results are cached per absolute path.
    """
    if isinstance(path, (list, set)):
        testsets = []
        visited = set()
        for file_path in path:
            # Deduplicate while keeping first-seen order, so that the order of
            # the loaded testsets is deterministic for list input.
            if file_path in visited:
                continue
            visited.add(file_path)

            testset = load_testsets_by_path(file_path)
            if not testset:
                continue
            testsets.extend(testset)

        return testsets

    if not os.path.isabs(path):
        path = os.path.join(os.getcwd(), path)

    if path in testcases_cache_mapping:
        return testcases_cache_mapping[path]

    if os.path.isdir(path):
        files_list = FileUtils.load_folder_files(path)
        testcases_list = load_testsets_by_path(files_list)

    elif os.path.isfile(path):
        try:
            testset = load_test_file(path)
            # keep only files that define at least one testcase or api
            if testset["testcases"] or testset["api"]:
                testcases_list = [testset]
            else:
                testcases_list = []
        except exception.FileFormatError:
            # a file in an unsupported format contributes no testset
            testcases_list = []

    else:
        logger.log_error(u"file not found: {}".format(path))
        testcases_list = []

    testcases_cache_mapping[path] = testcases_list
    return testcases_list
def parse_validator(validator):
    """ normalize a validator, which may be written in two formats
    @param (dict) validator
        format1: this is kept for compatibility with the previous versions.
            {"check": "status_code", "comparator": "eq", "expect": 201}
            {"check": "$resp_body_success", "comparator": "eq", "expect": True}
        format2: recommended new version
            {'eq': ['status_code', 201]}
            {'eq': ['$resp_body_success', True]}
    @return (dict) validator info
        {
            "check": "status_code",
            "expect": 201,
            "comparator": "eq"
        }
    @raise ParamsError if validator matches neither format
    """
    if not isinstance(validator, dict):
        raise exception.ParamsError("invalid validator: {}".format(validator))

    entries = len(validator)

    if entries > 1 and "check" in validator:
        # format1: explicit keys; "expected" is accepted as an alias of "expect"
        if "expect" in validator:
            expected = validator["expect"]
        elif "expected" in validator:
            expected = validator["expected"]
        else:
            raise exception.ParamsError("invalid validator: {}".format(validator))

        checked = validator["check"]
        operator = validator.get("comparator", "eq")

    elif entries == 1:
        # format2: {comparator: [check, expect]}
        operator, operands = next(iter(validator.items()))
        if not isinstance(operands, list) or len(operands) != 2:
            raise exception.ParamsError("invalid validator: {}".format(validator))

        checked, expected = operands

    else:
        raise exception.ParamsError("invalid validator: {}".format(validator))

    return {
        "check": checked,
        "expect": expected,
        "comparator": operator
    }
def _get_validators_mapping(validators):
    """ get validators mapping from api or test validators
    @param (list) validators:
        [
            {"check": "v1", "expect": 201, "comparator": "eq"},
            {"check": {"b": 1}, "expect": 200, "comparator": "eq"}
        ]
    @return (dict) normalized validators keyed by (check, comparator)
        {
            ("v1", "eq"): {"check": "v1", "expect": 201, "comparator": "eq"},
            ('{"b": 1}', "eq"): {"check": {"b": 1}, "expect": 200, "comparator": "eq"}
        }
    """
    validators_mapping = {}

    for validator in validators:
        validator = parse_validator(validator)

        check = validator["check"]
        try:
            # Probe hashability directly: collections.Hashable no longer exists
            # on Python 3.10+, and the ABC check also misjudges containers such
            # as a tuple that holds a list.
            hash(check)
        except TypeError:
            # unhashable (e.g. dict, list): use its JSON text as the key part
            check = json.dumps(check)

        key = (check, validator["comparator"])
        validators_mapping[key] = validator

    return validators_mapping
def merge_validator(api_validators, test_validators):
    """ combine api-level validators with test-level validators.
    A test validator replaces the api validator that has the same
    (check, comparator) pair.
    @params:
        api_validators: [{'eq': ['v1', 200]}, {"check": "s2", "expect": 16, "comparator": "len_eq"}]
        test_validators: [{"check": "v1", "expect": 201}, {'len_eq': ['s3', 12]}]
    @return:
        [
            {"check": "v1", "expect": 201, "comparator": "eq"},
            {"check": "s2", "expect": 16, "comparator": "len_eq"},
            {"check": "s3", "expect": 12, "comparator": "len_eq"}
        ]
        When either side is empty the other side is returned as is (not normalized).
    """
    if not api_validators:
        return test_validators

    if not test_validators:
        return api_validators

    merged = _get_validators_mapping(api_validators)
    # test-level entries win on key collision
    merged.update(_get_validators_mapping(test_validators))
    return list(merged.values())
def merge_extractor(api_extrators, test_extracors):
    """ combine api-level extractors with test-level extractors.
    A test extractor replaces the api extractor that binds the same variable;
    the position of the first definition is kept.
    @params:
        api_extrators: [{"var1": "val1"}, {"var2": "val2"}]
        test_extracors: [{"var1": "val111"}, {"var3": "val3"}]
    @return:
        [
            {"var1": "val111"},
            {"var2": "val2"},
            {"var3": "val3"}
        ]
        When either side is empty the other side is returned as is.
    """
    if not api_extrators:
        return test_extracors

    if not test_extracors:
        return api_extrators

    merged = OrderedDict()
    # api extractors first, so that test extractors override them
    for extractors in (api_extrators, test_extracors):
        for extractor in extractors:
            if len(extractor) != 1:
                # an extractor must bind exactly one variable; skip anything else
                logger.log_warning("incorrect extractor: {}".format(extractor))
                continue

            var_name, expression = next(iter(extractor.items()))
            merged[var_name] = expression

    return [
        {var_name: expression}
        for var_name, expression in merged.items()
    ]
def extend_test_api(test_block_dict):
    """ expand, in place, a test block that references an api definition.
    The referenced api definition is copied into the test block, then the
    validators and extractors of both sides are merged (test side wins).
    @param
        test_block_dict:
            {
                "name": "get token",
                "api": "get_token($user_agent, $device_sn, $os_platform, $app_version)",
                "extract": [{"token": "content.token"}],
                "validate": [{'eq': ['status_code', 200]}, {'len_eq': ['content.token', 16]}]
            }
    @return None, test_block_dict is updated to
            {
                "name": "get token",
                "request": {...},
                "extract": [{"token": "content.token"}],
                "validate": [{'eq': ['status_code', 200]}, {'len_eq': ['content.token', 16]}]
            }
    """
    def _pick(block, aliases):
        # first non-empty value among the leading aliases; otherwise whatever
        # is stored under the last alias ([] when that key is absent)
        for alias in aliases[:-1]:
            found = block.get(alias)
            if found:
                return found
        return block.get(aliases[-1], [])

    validate_aliases = ("validate", "validators")
    extract_aliases = ("extract", "extractors", "extract_binds")

    api_info = get_testinfo_by_reference(test_block_dict["api"], "api")

    # Read both sides before update() below overwrites the test block's keys.
    api_validators = _pick(api_info, validate_aliases)
    test_validators = _pick(test_block_dict, validate_aliases)
    api_extrators = _pick(api_info, extract_aliases)
    test_extracors = _pick(test_block_dict, extract_aliases)

    test_block_dict.update(api_info)
    test_block_dict["validate"] = merge_validator(api_validators, test_validators)
    test_block_dict["extract"] = merge_extractor(api_extrators, test_extracors)
def is_testset(data_structure):
    """ tell whether data_structure has the shape of a testset.
    A testset is a dict with a "name" key and a "testcases" key holding a list:
        {
            "name": "desc1",
            "config": {},
            "api": {},
            "testcases": [testcase11, testcase12]
        }
    @return (bool)
    """
    return (
        isinstance(data_structure, dict)
        and "name" in data_structure
        and "testcases" in data_structure
        and isinstance(data_structure["testcases"], list)
    )
def is_testsets(data_structure):
    """ tell whether data_structure is a testset or a list of testsets.
    Accepted shapes:
        testset_dict
    or
        [
            testset_dict_1,
            testset_dict_2
        ]
    An empty list is accepted.
    @return (bool)
    """
    if isinstance(data_structure, list):
        return all(is_testset(item) for item in data_structure)

    return is_testset(data_structure)
def load_test_file(file_path):
    """ read a testset file and assemble its testset data structure.
    @param file_path: absolute valid testset file path
    @return testset dict
        {
            "name": "desc1",
            "config": {},
            "api": {},
            "testcases": [testcase11, testcase12]
        }
    Blocks other than 'config', 'test' and 'api' are reported and ignored.
    """
    testset = {
        "name": "",
        "config": {
            "path": file_path
        },
        "api": {},
        "testcases": []
    }

    for item in FileUtils.load_file(file_path):
        for key in item:
            if key == "config":
                config_block = item["config"]
                testset["config"].update(config_block)
                testset["name"] = config_block.get("name", "")
                continue

            if key == "test":
                test_block = item["test"]
                if "api" in test_block:
                    # test references an api definition: expand it in place
                    extend_test_api(test_block)
                elif "suite" in test_block:
                    # test references a suite: inline all testcases of the suite
                    suite_info = get_testinfo_by_reference(test_block["suite"], "suite")
                    testset["testcases"].extend(suite_info["testcases"])
                    continue

                testset["testcases"].append(test_block)
                continue

            if key == "api":
                api_block = item["api"]
                # "def" holds the signature; it is removed from the stored api info
                api_meta = parse_function(api_block.pop("def"))
                api_name = api_meta["func_name"]
                if api_name in testset["api"]:
                    logger.log_warning("api definition duplicated: {}".format(api_name))

                api_entry = {"function_meta": api_meta}
                api_entry.update(api_block)
                testset["api"][api_name] = api_entry
                continue

            logger.log_warning(
                "unexpected block: {}. block should only be 'config', 'test' or 'api'.".format(key)
            )

    return testset
def get_testinfo_by_reference(ref_name, ref_type):
    """ resolve an api/suite reference and bind its call arguments.
    @params:
        ref_name: reference name, e.g. api_v1_Account_Login_POST($UserName, $Password)
        ref_type: "api" or "suite"
    @return the referenced definition; when some call argument differs from the
        defined argument at the same position, a copy with the defined
        arguments substituted by the call arguments
    @raise ParamsError if the number of call args differs from the definition
    """
    call_meta = parse_function(ref_name)
    call_args = call_meta["args"]

    test_info = get_test_definition(call_meta["func_name"], ref_type)
    def_args = test_info.get("function_meta").get("args", [])

    if len(call_args) != len(def_args):
        raise exception.ParamsError("call args mismatch defined args!")

    # positional pairing; arguments passed under their defined name need no rewrite
    args_mapping = {
        def_arg: call_arg
        for def_arg, call_arg in zip(def_args, call_args)
        if call_arg != def_arg
    }

    if args_mapping:
        test_info = substitute_variables_with_mapping(test_info, args_mapping)

    return test_info
def get_test_definition(name, ref_type):
    """ look up an api or suite definition by name.
    Definitions are loaded on first use.
    @params:
        name: api or suite name
        ref_type: "api" or "suite"
    @return
        the definition registered under name
    @raise
        ApiNotFound / SuiteNotFound if no such api / suite is registered,
        ParamsError if ref_type is neither "api" nor "suite"
    """
    if not test_def_overall_dict.get("loaded", False):
        load_test_dependencies()

    test_info = test_def_overall_dict.get(ref_type, {}).get(name)
    if test_info:
        return test_info

    err_msg = "{} {} not found!".format(ref_type, name)
    if ref_type == "api":
        raise exception.ApiNotFound(err_msg)

    if ref_type == "suite":
        raise exception.SuiteNotFound(err_msg)

    raise exception.ParamsError("ref_type can only be api or suite!")
def substitute_variables_with_mapping(content, mapping):
    """ substitute variables in content with mapping
    e.g.
    @params
        content = {
            'request': {
                'url': '/api/users/$uid',
                'headers': {'token': '$token'}
            }
        }
        mapping = {"$uid": 1000}
    @return
        {
            'request': {
                'url': '/api/users/1000',
                'headers': {'token': '$token'}
            }
        }
    Containers are processed recursively (list/set/tuple come back as a list,
    dict keys are substituted too). A string that is exactly one variable is
    replaced by the mapped value itself, keeping its type. Empty values and
    anything that is neither a container nor a string are returned unchanged.
    """
    if not content:
        # None, 0, False, "" and empty containers have nothing to substitute
        return content

    if isinstance(content, (list, set, tuple)):
        return [
            substitute_variables_with_mapping(item, mapping)
            for item in content
        ]

    if isinstance(content, dict):
        substituted_data = {}
        for key, value in content.items():
            eval_key = substitute_variables_with_mapping(key, mapping)
            eval_value = substitute_variables_with_mapping(value, mapping)
            substituted_data[eval_key] = eval_value

        return substituted_data

    # (str,) on Python 3, (str, unicode) on Python 2
    string_types = (type(""), type(u""))
    if not isinstance(content, string_types):
        # numbers, booleans, types, ...: nothing to substitute
        return content

    for var, value in mapping.items():
        if content == var:
            # content is exactly this variable: take the value as is
            content = value
            if not isinstance(content, string_types):
                # a non-string value cannot contain further variables; going on
                # would call .replace() on it and fail
                break
        else:
            content = content.replace(var, str(value))

    return content
def gen_cartesian_product(*args):
    """ build the cartesian product of several lists of dicts, merging the
    dicts of each combination into one.
    @param
        (list) args
            [{"a": 1}, {"a": 2}],
            [
                {"x": 111, "y": 112},
                {"x": 121, "y": 122}
            ]
    @return
        cartesian product in list
        [
            {'a': 1, 'x': 111, 'y': 112},
            {'a': 1, 'x': 121, 'y': 122},
            {'a': 2, 'x': 111, 'y': 112},
            {'a': 2, 'x': 121, 'y': 122}
        ]
        No argument gives []; a single argument is returned as is.
    """
    def _merge(parts):
        # later dicts override earlier ones on key collision
        merged = {}
        for part in parts:
            merged.update(part)
        return merged

    if not args:
        return []

    if len(args) == 1:
        return args[0]

    return [_merge(combination) for combination in itertools.product(*args)]
def parse_parameters(parameters, testset_path=None):
    """ expand parameters into the cartesian product of their values
    @params
        (list) parameters: parameter name and value in list
            parameter value may be in three types:
                (1) data list
                (2) call built-in parameterize function
                (3) call custom function in debugtalk.py
            e.g.
                [
                    {"user_agent": ["iOS/10.1", "iOS/10.2", "iOS/10.3"]},
                    {"username-password": "${parameterize(account.csv)}"},
                    {"app_version": "${gen_app_version()}"}
                ]
        (str) testset_path: testset file path, used for locating csv file and debugtalk.py
    @return cartesian product in list
    @raise ParamsError if a function-style parameter does not evaluate to a list
    """
    testcase_parser = TestcaseParser(file_path=testset_path)
    parsed_parameters_list = []

    for parameter in parameters:
        joined_names, raw_content = list(parameter.items())[0]
        # "username-password" declares two names bound together
        names = joined_names.split("-")

        if isinstance(raw_content, list):
            # (1) data list
            # e.g. {"app_version": ["2.8.5", "2.8.6"]}
            #       => [{"app_version": "2.8.5"}, {"app_version": "2.8.6"}]
            # e.g. {"username-password": [["user1", "111111"], ["user2", "222222"]]}
            #       => [{"username": "user1", "password": "111111"},
            #           {"username": "user2", "password": "222222"}]
            rows = [
                # a scalar entry is the value of a single name: "2.8.5" => ["2.8.5"]
                dict(zip(names, entry if isinstance(entry, (list, tuple)) else [entry]))
                for entry in raw_content
            ]
        else:
            # (2) & (3) the content is a function call producing a list of dicts
            # e.g. [{'app_version': '2.8.5'}, {'app_version': '2.8.6'}]
            # e.g. [{"username": "user1", "password": "111111"},
            #       {"username": "user2", "password": "222222"}]
            evaluated = testcase_parser.eval_content_with_bindings(raw_content)
            if not isinstance(evaluated, list):
                raise exception.ParamsError("parameters syntax error!")

            rows = [
                # keep only the declared names
                {name: row[name] for name in names}
                for row in evaluated
            ]

        parsed_parameters_list.append(rows)

    return gen_cartesian_product(*parsed_parameters_list)
class TestcaseParser(object):
    """ evaluate ``$variable`` and ``${function(...)}`` references in testcase
    content against bound variables and functions.
    """

    def __init__(self, variables=None, functions=None, file_path=None):
        """
        @param (dict) variables: variables binds mapping, defaults to a new empty dict
        @param (dict) functions: functions binds mapping, defaults to a new empty dict
        @param (str) file_path: testset file path; when set, unbound items are
            looked up with utils.search_conf_item and csv files are resolved
            relative to its folder
        """
        # None defaults instead of {}: a mutable default would be one dict
        # shared by every parser created without arguments.
        self.update_binded_variables({} if variables is None else variables)
        self.bind_functions({} if functions is None else functions)
        self.file_path = file_path

    def update_binded_variables(self, variables):
        """ bind variables to current testcase parser
        The mapping is stored as is (not copied) and replaces the previous one.
        @param (dict) variables, variables binds mapping
            {
                "authorization": "a83de0ff8d2e896dbd8efb81ba14e17d",
                "random": "A2dEx",
                "data": {"name": "user", "password": "123456"},
                "uuid": 1000
            }
        """
        self.variables = variables

    def bind_functions(self, functions):
        """ bind functions to current testcase parser
        The mapping is stored as is (not copied) and replaces the previous one.
        @param (dict) functions, functions binds mapping
            {
                "add_two_nums": lambda a, b=1: a + b
            }
        """
        self.functions = functions

    def _get_bind_item(self, item_type, item_name):
        """ find a function or variable by name.
        Lookup order: bound items, then (functions only) any callable the name
        evaluates to, then utils.search_conf_item when file_path is set.
        @param (str) item_type: "function" or "variable"
        @param (str) item_name
        @raise ParamsError if item_type is invalid or the item cannot be found
        """
        if item_type == "function":
            if item_name in self.functions:
                return self.functions[item_name]

            try:
                # check if builtin functions
                # NOTE(security): eval() resolves item_name in this scope, so it
                # also reaches module-level names, not only builtins. Names
                # extracted by function_regexp are plain identifiers; do not
                # pass other input here.
                item_func = eval(item_name)
                if callable(item_func):
                    # is builtin function
                    return item_func
            except (NameError, TypeError, SyntaxError):
                # is not builtin function, continue to search
                # (SyntaxError: the name is a Python keyword)
                pass
        elif item_type == "variable":
            if item_name in self.variables:
                return self.variables[item_name]
        else:
            raise exception.ParamsError("bind item should only be function or variable.")

        err_msg = "{} is not defined in bind {}s!".format(item_name, item_type)

        # explicit check rather than assert, which is stripped under "python -O"
        if self.file_path is None:
            raise exception.ParamsError(err_msg)

        try:
            return utils.search_conf_item(self.file_path, item_type, item_name)
        except exception.FunctionNotFound:
            raise exception.ParamsError(err_msg)

    def get_bind_function(self, func_name):
        """ find a function by name, see _get_bind_item. """
        return self._get_bind_item("function", func_name)

    def get_bind_variable(self, variable_name):
        """ find a variable by name, see _get_bind_item. """
        return self._get_bind_item("variable", variable_name)

    def parameterize(self, csv_file_name, fetch_method="Sequential"):
        """ load parameter rows from a file located next to the testset file.
        @param csv_file_name: file name, relative to the folder of file_path
        @param (str) fetch_method: "Random" (case-insensitive) shuffles the rows,
            any other value keeps the file order
        @return the content returned by FileUtils.load_file
        """
        parameter_file_path = os.path.join(
            os.path.dirname(self.file_path),
            "{}".format(csv_file_name)
        )
        csv_content_list = FileUtils.load_file(parameter_file_path)

        if fetch_method.lower() == "random":
            # shuffles the loaded list in place
            random.shuffle(csv_content_list)

        return csv_content_list

    def _eval_content_functions(self, content):
        """ replace every ``${func(...)}`` in string content with its result.
        @param (str) content
        @return the function result itself when content is exactly one function
            call, otherwise content with each call replaced by str(result)
        """
        functions_list = extract_functions(content)
        for func_content in functions_list:
            function_meta = parse_function(func_content)
            func_name = function_meta['func_name']

            args = function_meta.get('args', [])
            kwargs = function_meta.get('kwargs', {})
            # arguments may themselves reference variables, e.g. func($var)
            args = self.eval_content_with_bindings(args)
            kwargs = self.eval_content_with_bindings(kwargs)

            if func_name in ["parameterize", "P"]:
                # built-in parameterize function and its short alias
                eval_value = self.parameterize(*args, **kwargs)
            else:
                func = self.get_bind_function(func_name)
                eval_value = func(*args, **kwargs)

            func_content = "${" + func_content + "}"
            if func_content == content:
                # content is exactly one function call: keep the result's type
                content = eval_value
            else:
                # content contains one or many function calls
                content = content.replace(
                    func_content,
                    str(eval_value), 1
                )

        return content

    def _eval_content_variables(self, content):
        """ replace all variables of string content with mapping value.
        @param (str) content
        @return the variable value itself when content is exactly one variable,
            otherwise (str) content with each variable replaced by str(value)
        e.g.
            variable_mapping = {
                "var_1": "abc",
                "var_2": "def"
            }
            $var_1 => "abc"
            $var_1#XYZ => "abc#XYZ"
            /$var_1/$var_2/var3 => "/abc/def/var3"
            ${func($var_1, $var_2, xyz)} => "${func(abc, def, xyz)}"
        """
        variables_list = extract_variables(content)
        for variable_name in variables_list:
            variable_value = self.get_bind_variable(variable_name)

            if "${}".format(variable_name) == content:
                # content is a variable
                content = variable_value
            else:
                # content contains one or several variables
                content = content.replace(
                    "${}".format(variable_name),
                    str(variable_value), 1
                )

        return content

    def eval_content_with_bindings(self, content):
        """ parse content recursively, each variable and function in content will be evaluated.
        @param (dict) content in any data structure
            {
                "url": "http://127.0.0.1:5000/api/users/$uid/${add_two_nums(1, 1)}",
                "method": "POST",
                "headers": {
                    "Content-Type": "application/json",
                    "authorization": "$authorization",
                    "random": "$random",
                    "sum": "${add_two_nums(1, 2)}"
                },
                "body": "$data"
            }
        @return (dict) parsed content with evaluated bind values
            {
                "url": "http://127.0.0.1:5000/api/users/1000/2",
                "method": "POST",
                "headers": {
                    "Content-Type": "application/json",
                    "authorization": "a83de0ff8d2e896dbd8efb81ba14e17d",
                    "random": "A2dEx",
                    "sum": 3
                },
                "body": {"name": "user", "password": "123456"}
            }
        Lists and tuples come back as lists; dict keys are evaluated too.
        """
        if content is None:
            return None

        if isinstance(content, (list, tuple)):
            return [
                self.eval_content_with_bindings(item)
                for item in content
            ]

        if isinstance(content, dict):
            evaluated_data = {}
            for key, value in content.items():
                eval_key = self.eval_content_with_bindings(key)
                eval_value = self.eval_content_with_bindings(value)
                evaluated_data[eval_key] = eval_value

            return evaluated_data

        if isinstance(content, (numeric_types, type)):
            return content

        # content is in string format here
        content = content.strip()

        # replace functions with evaluated value
        # Notice: _eval_content_functions must be called before _eval_content_variables
        content = self._eval_content_functions(content)

        # replace variables with binding value
        content = self._eval_content_variables(content)

        return content