Files
httprunner/httprunner/testcase.py
2018-03-27 22:39:43 +08:00

878 lines
29 KiB
Python

# encoding: utf-8
import ast
import csv
import io
import itertools
import json
import os
import random
import re
import yaml
from httprunner import exception, logger, utils
from httprunner.compat import OrderedDict, numeric_types
variable_regexp = r"\$([\w_]+)"
function_regexp = r"\$\{([\w_]+\([\$\w\.\-_ =,]*\))\}"
function_regexp_compile = re.compile(r"^([\w_]+)\(([\$\w\.\-_ =,]*)\)$")
test_def_overall_dict = {
"loaded": False,
"api": {},
"suite": {}
}
testcases_cache_mapping = {}
def _load_yaml_file(yaml_file):
""" load yaml file and check file content format
"""
with io.open(yaml_file, 'r', encoding='utf-8') as stream:
yaml_content = yaml.load(stream)
check_format(yaml_file, yaml_content)
return yaml_content
def _load_json_file(json_file):
""" load json file and check file content format
"""
with io.open(json_file, encoding='utf-8') as data_file:
try:
json_content = json.load(data_file)
except exception.JSONDecodeError:
err_msg = u"JSONDecodeError: JSON file format error: {}".format(json_file)
logger.log_error(err_msg)
raise exception.FileFormatError(err_msg)
check_format(json_file, json_content)
return json_content
def _load_csv_file(csv_file):
""" load csv file and check file content format
@param
csv_file: csv file path
e.g. csv file content:
username,password
test1,111111
test2,222222
test3,333333
@return
list of parameter, each parameter is in dict format
e.g.
[
{'username': 'test1', 'password': '111111'},
{'username': 'test2', 'password': '222222'},
{'username': 'test3', 'password': '333333'}
]
"""
csv_content_list = []
with io.open(csv_file, encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
csv_content_list.append(row)
return csv_content_list
def load_file(file_path):
if not os.path.isfile(file_path):
raise exception.FileNotFoundError("{} does not exist.".format(file_path))
file_suffix = os.path.splitext(file_path)[1].lower()
if file_suffix == '.json':
return _load_json_file(file_path)
elif file_suffix in ['.yaml', '.yml']:
return _load_yaml_file(file_path)
elif file_suffix == ".csv":
return _load_csv_file(file_path)
else:
# '' or other suffix
err_msg = u"Unsupported file format: {}".format(file_path)
logger.log_warning(err_msg)
return []
def extract_variables(content):
""" extract all variable names from content, which is in format $variable
@param (str) content
@return (list) variable name list
e.g. $variable => ["variable"]
/blog/$postid => ["postid"]
/$var1/$var2 => ["var1", "var2"]
abc => []
"""
try:
return re.findall(variable_regexp, content)
except TypeError:
return []
def extract_functions(content):
""" extract all functions from string content, which are in format ${fun()}
@param (str) content
@return (list) functions list
e.g. ${func(5)} => ["func(5)"]
${func(a=1, b=2)} => ["func(a=1, b=2)"]
/api/1000?_t=${get_timestamp()} => ["get_timestamp()"]
/api/${add(1, 2)} => ["add(1, 2)"]
"/api/${add(1, 2)}?_t=${get_timestamp()}" => ["add(1, 2)", "get_timestamp()"]
"""
try:
return re.findall(function_regexp, content)
except TypeError:
return []
def parse_string_value(str_value):
""" parse string to number if possible
e.g. "123" => 123
"12.2" => 12.3
"abc" => "abc"
"$var" => "$var"
"""
try:
return ast.literal_eval(str_value)
except ValueError:
return str_value
except SyntaxError:
# e.g. $var, ${func}
return str_value
def parse_function(content):
""" parse function name and args from string content.
@param (str) content
@return (dict) function name and args
e.g. func() => {'func_name': 'func', 'args': [], 'kwargs': {}}
func(5) => {'func_name': 'func', 'args': [5], 'kwargs': {}}
func(1, 2) => {'func_name': 'func', 'args': [1, 2], 'kwargs': {}}
func(a=1, b=2) => {'func_name': 'func', 'args': [], 'kwargs': {'a': 1, 'b': 2}}
func(1, 2, a=3, b=4) => {'func_name': 'func', 'args': [1, 2], 'kwargs': {'a':3, 'b':4}}
"""
function_meta = {
"args": [],
"kwargs": {}
}
matched = function_regexp_compile.match(content)
function_meta["func_name"] = matched.group(1)
args_str = matched.group(2).replace(" ", "")
if args_str == "":
return function_meta
args_list = args_str.split(',')
for arg in args_list:
if '=' in arg:
key, value = arg.split('=')
function_meta["kwargs"][key] = parse_string_value(value)
else:
function_meta["args"].append(parse_string_value(arg))
return function_meta
def load_test_dependencies():
""" load all api and suite definitions.
default api folder is "$CWD/tests/api/".
default suite folder is "$CWD/tests/suite/".
"""
test_def_overall_dict["loaded"] = True
test_def_overall_dict["api"] = {}
test_def_overall_dict["suite"] = {}
# load api definitions
api_def_folder = os.path.join(os.getcwd(), "tests", "api")
api_files = utils.load_folder_files(api_def_folder)
for test_file in api_files:
testset = load_test_file(test_file)
test_def_overall_dict["api"].update(testset["api"])
# load suite definitions
suite_def_folder = os.path.join(os.getcwd(), "tests", "suite")
suite_files = utils.load_folder_files(suite_def_folder)
for suite_file in suite_files:
suite = load_test_file(suite_file)
if "def" not in suite["config"]:
raise exception.ParamsError("def missed in suite file: {}!".format(suite_file))
call_func = suite["config"]["def"]
function_meta = parse_function(call_func)
suite["function_meta"] = function_meta
test_def_overall_dict["suite"][function_meta["func_name"]] = suite
def load_testcases_by_path(path):
""" load testcases from file path
@param path: path could be in several type
- absolute/relative file path
- absolute/relative folder path
- list/set container with file(s) and/or folder(s)
@return testcase sets list, each testset is corresponding to a file
[
testset_dict_1,
testset_dict_2
]
"""
if isinstance(path, (list, set)):
testsets = []
for file_path in set(path):
testset = load_testcases_by_path(file_path)
if not testset:
continue
testsets.extend(testset)
return testsets
if not os.path.isabs(path):
path = os.path.join(os.getcwd(), path)
if path in testcases_cache_mapping:
return testcases_cache_mapping[path]
if os.path.isdir(path):
files_list = utils.load_folder_files(path)
testcases_list = load_testcases_by_path(files_list)
elif os.path.isfile(path):
try:
testset = load_test_file(path)
if testset["testcases"] or testset["api"]:
testcases_list = [testset]
else:
testcases_list = []
except exception.FileFormatError:
testcases_list = []
else:
logger.log_error(u"file not found: {}".format(path))
testcases_list = []
testcases_cache_mapping[path] = testcases_list
return testcases_list
def parse_validator(validator):
""" parse validator, validator maybe in two format
@param (dict) validator
format1: this is kept for compatiblity with the previous versions.
{"check": "status_code", "comparator": "eq", "expect": 201}
{"check": "$resp_body_success", "comparator": "eq", "expect": True}
format2: recommended new version
{'eq': ['status_code', 201]}
{'eq': ['$resp_body_success', True]}
@return (dict) validator info
{
"check": "status_code",
"expect": 201,
"comparator": "eq"
}
"""
if not isinstance(validator, dict):
raise exception.ParamsError("invalid validator: {}".format(validator))
if "check" in validator and len(validator) > 1:
# format1
check_item = validator.get("check")
if "expect" in validator:
expect_value = validator.get("expect")
elif "expected" in validator:
expect_value = validator.get("expected")
else:
raise exception.ParamsError("invalid validator: {}".format(validator))
comparator = validator.get("comparator", "eq")
elif len(validator) == 1:
# format2
comparator = list(validator.keys())[0]
compare_values = validator[comparator]
if not isinstance(compare_values, list) or len(compare_values) != 2:
raise exception.ParamsError("invalid validator: {}".format(validator))
check_item, expect_value = compare_values
else:
raise exception.ParamsError("invalid validator: {}".format(validator))
return {
"check": check_item,
"expect": expect_value,
"comparator": comparator
}
def merge_validator(api_validators, test_validators):
""" merge api_validators with test_validators
@params:
api_validators: [{'eq': ['v1', 200]}, {"check": "s2", "expect": 16, "comparator": "len_eq"}]
test_validators: [{"check": "v1", "expect": 201}, {'len_eq': ['s3', 12]}]
@return:
[
{"check": "v1", "expect": 201, "comparator": "eq"},
{"check": "s2", "expect": 16, "comparator": "len_eq"},
{"check": "s3", "expect": 12, "comparator": "len_eq"}
]
"""
if not api_validators:
return test_validators
elif not test_validators:
return api_validators
else:
api_validators_mapping = {}
for api_validator in api_validators:
api_validator = parse_validator(api_validator)
key = (api_validator["check"], api_validator["comparator"])
api_validators_mapping[key] = api_validator
test_validators_mapping = {}
for test_validator in test_validators:
test_validator = parse_validator(test_validator)
key = (test_validator["check"], test_validator["comparator"])
test_validators_mapping[key] = test_validator
api_validators_mapping.update(test_validators_mapping)
return list(api_validators_mapping.values())
def merge_extractor(api_extrators, test_extracors):
""" merge api_extrators with test_extracors
@params:
api_extrators: [{"var1": "val1"}, {"var2": "val2"}]
test_extracors: [{"var1": "val111"}, {"var3": "val3"}]
@return:
[
{"var1": "val111"},
{"var2": "val2"},
{"var3": "val3"}
]
"""
if not api_extrators:
return test_extracors
elif not test_extracors:
return api_extrators
else:
extractor_dict = OrderedDict()
for api_extrator in api_extrators:
if len(api_extrator) != 1:
logger.log_warning("incorrect extractor: {}".format(api_extrator))
continue
var_name = list(api_extrator.keys())[0]
extractor_dict[var_name] = api_extrator[var_name]
for test_extrator in test_extracors:
if len(test_extrator) != 1:
logger.log_warning("incorrect extractor: {}".format(test_extrator))
continue
var_name = list(test_extrator.keys())[0]
extractor_dict[var_name] = test_extrator[var_name]
extractor_list = []
for key, value in extractor_dict.items():
extractor_list.append({key: value})
return extractor_list
def extend_test_api(test_block_dict):
""" update test block api with api definition
@param
test_block_dict:
{
"name": "get token",
"api": "get_token($user_agent, $device_sn, $os_platform, $app_version)",
"extract": [{"token": "content.token"}],
"validate": [{'eq': ['status_code', 200]}, {'len_eq': ['content.token', 16]}]
}
@return
{
"name": "get token",
"request": {...},
"extract": [{"token": "content.token"}],
"validate": [{'eq': ['status_code', 200]}, {'len_eq': ['content.token', 16]}]
}
"""
ref_name = test_block_dict["api"]
test_info = get_testinfo_by_reference(ref_name, "api")
api_validators = test_info.get("validate") or test_info.get("validators", [])
test_validators = test_block_dict.get("validate") or test_block_dict.get("validators", [])
api_extrators = test_info.get("extract") \
or test_info.get("extractors") \
or test_info.get("extract_binds", [])
test_extracors = test_block_dict.get("extract") \
or test_block_dict.get("extractors") \
or test_block_dict.get("extract_binds", [])
test_block_dict.update(test_info)
test_block_dict["validate"] = merge_validator(
api_validators,
test_validators
)
test_block_dict["extract"] = merge_extractor(
api_extrators,
test_extracors
)
def load_test_file(file_path):
""" load testset file, get testset data structure.
@param file_path: absolute valid testset file path
@return testset dict
{
"name": "desc1",
"config": {},
"api": {},
"testcases": [testcase11, testcase12]
}
"""
testset = {
"name": "",
"config": {
"path": file_path
},
"api": {},
"testcases": []
}
tests_list = load_file(file_path)
for item in tests_list:
for key in item:
if key == "config":
testset["config"].update(item["config"])
testset["name"] = item["config"].get("name", "")
elif key == "test":
test_block_dict = item["test"]
if "api" in test_block_dict:
extend_test_api(test_block_dict)
testset["testcases"].append(test_block_dict)
elif "suite" in test_block_dict:
ref_name = test_block_dict["suite"]
test_info = get_testinfo_by_reference(ref_name, "suite")
testset["testcases"].extend(test_info["testcases"])
else:
testset["testcases"].append(test_block_dict)
elif key == "api":
api_def = item["api"].pop("def")
function_meta = parse_function(api_def)
func_name = function_meta["func_name"]
api_info = {}
api_info["function_meta"] = function_meta
api_info.update(item["api"])
testset["api"][func_name] = api_info
else:
logger.log_warning("unexpected block: {}. block should only be config or test.".format(key))
return testset
def get_testinfo_by_reference(ref_name, ref_type):
""" get test content by reference name
@params:
ref_name: reference name, e.g. api_v1_Account_Login_POST($UserName, $Password)
ref_type: "api" or "suite"
"""
function_meta = parse_function(ref_name)
func_name = function_meta["func_name"]
call_args = function_meta["args"]
test_info = get_test_definition(func_name, ref_type)
def_args = test_info.get("function_meta").get("args", [])
if len(call_args) != len(def_args):
raise exception.ParamsError("call args mismatch defined args!")
args_mapping = {}
for index, item in enumerate(def_args):
if call_args[index] == item:
continue
args_mapping[item] = call_args[index]
if args_mapping:
test_info = substitute_variables_with_mapping(test_info, args_mapping)
return test_info
def get_test_definition(name, ref_type):
""" get expected api or suite.
@params:
name: api or suite name
ref_type: "api" or "suite"
@return
expected api info if found, otherwise raise ApiNotFound exception
"""
if not test_def_overall_dict.get("loaded", False):
load_test_dependencies()
test_info = test_def_overall_dict.get(ref_type, {}).get(name)
if not test_info:
err_msg = "{} {} not found!".format(ref_type, name)
if ref_type == "api":
raise exception.ApiNotFound(err_msg)
elif ref_type == "suite":
raise exception.SuiteNotFound(err_msg)
else:
raise exception.ParamsError("ref_type can only be api or suite!")
return test_info
def substitute_variables_with_mapping(content, mapping):
""" substitute variables in content with mapping
e.g.
@params
content = {
'request': {
'url': '/api/users/$uid',
'headers': {'token': '$token'}
}
}
mapping = {"$uid": 1000}
@return
{
'request': {
'url': '/api/users/1000',
'headers': {'token': '$token'}
}
}
"""
if isinstance(content, bool):
return content
if isinstance(content, (numeric_types, type)):
return content
if not content:
return content
if isinstance(content, (list, set, tuple)):
return [
substitute_variables_with_mapping(item, mapping)
for item in content
]
if isinstance(content, dict):
substituted_data = {}
for key, value in content.items():
eval_key = substitute_variables_with_mapping(key, mapping)
eval_value = substitute_variables_with_mapping(value, mapping)
substituted_data[eval_key] = eval_value
return substituted_data
# content is in string format here
for var, value in mapping.items():
if content == var:
# content is a variable
content = value
else:
content = content.replace(var, str(value))
return content
def check_format(file_path, content):
""" check testcase format if valid
"""
if not content:
# testcase file content is empty
err_msg = u"Testcase file content is empty: {}".format(file_path)
logger.log_error(err_msg)
raise exception.FileFormatError(err_msg)
elif not isinstance(content, (list, dict)):
# testcase file content does not match testcase format
err_msg = u"Testcase file content format invalid: {}".format(file_path)
logger.log_error(err_msg)
raise exception.FileFormatError(err_msg)
def gen_cartesian_product(*args):
""" generate cartesian product for lists
@param
(list) args
[{"a": 1}, {"a": 2}],
[
{"x": 111, "y": 112},
{"x": 121, "y": 122}
]
@return
cartesian product in list
[
{'a': 1, 'x': 111, 'y': 112},
{'a': 1, 'x': 121, 'y': 122},
{'a': 2, 'x': 111, 'y': 112},
{'a': 2, 'x': 121, 'y': 122}
]
"""
if not args:
return []
elif len(args) == 1:
return args[0]
product_list = []
for product_item_tuple in itertools.product(*args):
product_item_dict = {}
for item in product_item_tuple:
product_item_dict.update(item)
product_list.append(product_item_dict)
return product_list
def parse_parameters(parameters, testset_path=None):
""" parse parameters and generate cartesian product
@params
(list) parameters: parameter name and value in list
parameter value may be in three types:
(1) data list
(2) call built-in parameterize function
(3) call custom function in debugtalk.py
e.g.
[
{"user_agent": ["iOS/10.1", "iOS/10.2", "iOS/10.3"]},
{"username-password": "${parameterize(account.csv)}"},
{"app_version": "${gen_app_version()}"}
]
(str) testset_path: testset file path, used for locating csv file and debugtalk.py
@return cartesian product in list
"""
testcase_parser = TestcaseParser(file_path=testset_path)
parsed_parameters_list = []
for parameter in parameters:
parameter_name, parameter_content = list(parameter.items())[0]
parameter_name_list = parameter_name.split("-")
if isinstance(parameter_content, list):
# (1) data list
# e.g. {"app_version": ["2.8.5", "2.8.6"]}
# => [{"app_version": "2.8.5", "app_version": "2.8.6"}]
# e.g. {"username-password": [["user1", "111111"], ["test2", "222222"]}
# => [{"username": "user1", "password": "111111"}, {"username": "user2", "password": "222222"}]
parameter_content_list = []
for parameter_item in parameter_content:
if not isinstance(parameter_item, (list, tuple)):
# "2.8.5" => ["2.8.5"]
parameter_item = [parameter_item]
# ["app_version"], ["2.8.5"] => {"app_version": "2.8.5"}
# ["username", "password"], ["user1", "111111"] => {"username": "user1", "password": "111111"}
parameter_content_dict = dict(zip(parameter_name_list, parameter_item))
parameter_content_list.append(parameter_content_dict)
else:
# (2) & (3)
parsed_parameter_content = testcase_parser.eval_content_with_bindings(parameter_content)
# e.g. [{'app_version': '2.8.5'}, {'app_version': '2.8.6'}]
# e.g. [{"username": "user1", "password": "111111"}, {"username": "user2", "password": "222222"}]
if not isinstance(parsed_parameter_content, list):
raise exception.ParamsError("parameters syntax error!")
parameter_content_list = [
# get subset by parameter name
{key: parameter_item[key] for key in parameter_name_list}
for parameter_item in parsed_parameter_content
]
parsed_parameters_list.append(parameter_content_list)
return gen_cartesian_product(*parsed_parameters_list)
class TestcaseParser(object):
def __init__(self, variables={}, functions={}, file_path=None):
self.update_binded_variables(variables)
self.bind_functions(functions)
self.file_path = file_path
def update_binded_variables(self, variables):
""" bind variables to current testcase parser
@param (dict) variables, variables binds mapping
{
"authorization": "a83de0ff8d2e896dbd8efb81ba14e17d",
"random": "A2dEx",
"data": {"name": "user", "password": "123456"},
"uuid": 1000
}
"""
self.variables = variables
def bind_functions(self, functions):
""" bind functions to current testcase parser
@param (dict) functions, functions binds mapping
{
"add_two_nums": lambda a, b=1: a + b
}
"""
self.functions = functions
def _get_bind_item(self, item_type, item_name):
if item_type == "function":
if item_name in self.functions:
return self.functions[item_name]
try:
# check if builtin functions
item_func = eval(item_name)
if callable(item_func):
# is builtin function
return item_func
except (NameError, TypeError):
# is not builtin function, continue to search
pass
elif item_type == "variable":
if item_name in self.variables:
return self.variables[item_name]
else:
raise exception.ParamsError("bind item should only be function or variable.")
try:
assert self.file_path is not None
return utils.search_conf_item(self.file_path, item_type, item_name)
except (AssertionError, exception.FunctionNotFound):
raise exception.ParamsError(
"{} is not defined in bind {}s!".format(item_name, item_type))
def get_bind_function(self, func_name):
return self._get_bind_item("function", func_name)
def get_bind_variable(self, variable_name):
return self._get_bind_item("variable", variable_name)
def parameterize(self, csv_file_name, fetch_method="Sequential"):
parameter_file_path = os.path.join(
os.path.dirname(self.file_path),
"{}".format(csv_file_name)
)
csv_content_list = load_file(parameter_file_path)
if fetch_method.lower() == "random":
random.shuffle(csv_content_list)
return csv_content_list
def _eval_content_functions(self, content):
functions_list = extract_functions(content)
for func_content in functions_list:
function_meta = parse_function(func_content)
func_name = function_meta['func_name']
args = function_meta.get('args', [])
kwargs = function_meta.get('kwargs', {})
args = self.eval_content_with_bindings(args)
kwargs = self.eval_content_with_bindings(kwargs)
if func_name in ["parameterize", "P"]:
eval_value = self.parameterize(*args, **kwargs)
else:
func = self.get_bind_function(func_name)
eval_value = func(*args, **kwargs)
func_content = "${" + func_content + "}"
if func_content == content:
# content is a variable
content = eval_value
else:
# content contains one or many variables
content = content.replace(
func_content,
str(eval_value), 1
)
return content
def _eval_content_variables(self, content):
""" replace all variables of string content with mapping value.
@param (str) content
@return (str) parsed content
e.g.
variable_mapping = {
"var_1": "abc",
"var_2": "def"
}
$var_1 => "abc"
$var_1#XYZ => "abc#XYZ"
/$var_1/$var_2/var3 => "/abc/def/var3"
${func($var_1, $var_2, xyz)} => "${func(abc, def, xyz)}"
"""
variables_list = extract_variables(content)
for variable_name in variables_list:
variable_value = self.get_bind_variable(variable_name)
if "${}".format(variable_name) == content:
# content is a variable
content = variable_value
else:
# content contains one or many variables
content = content.replace(
"${}".format(variable_name),
str(variable_value), 1
)
return content
def eval_content_with_bindings(self, content):
""" parse content recursively, each variable and function in content will be evaluated.
@param (dict) content in any data structure
{
"url": "http://127.0.0.1:5000/api/users/$uid/${add_two_nums(1, 1)}",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"authorization": "$authorization",
"random": "$random",
"sum": "${add_two_nums(1, 2)}"
},
"body": "$data"
}
@return (dict) parsed content with evaluated bind values
{
"url": "http://127.0.0.1:5000/api/users/1000/2",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"authorization": "a83de0ff8d2e896dbd8efb81ba14e17d",
"random": "A2dEx",
"sum": 3
},
"body": {"name": "user", "password": "123456"}
}
"""
if content is None:
return None
if isinstance(content, (list, tuple)):
return [
self.eval_content_with_bindings(item)
for item in content
]
if isinstance(content, dict):
evaluated_data = {}
for key, value in content.items():
eval_key = self.eval_content_with_bindings(key)
eval_value = self.eval_content_with_bindings(value)
evaluated_data[eval_key] = eval_value
return evaluated_data
if isinstance(content, (numeric_types, type)):
return content
# content is in string format here
content = content.strip()
# replace functions with evaluated value
# Notice: _eval_content_functions must be called before _eval_content_variables
content = self._eval_content_functions(content)
# replace variables with binding value
content = self._eval_content_variables(content)
return content