refactor testcase layer

This commit is contained in:
debugtalk
2018-05-02 16:29:35 +08:00
parent 915294ebd3
commit 4b9cc0a9bd
20 changed files with 505 additions and 937 deletions

View File

@@ -10,18 +10,12 @@ import random
import re
from httprunner import exception, logger, utils
from httprunner.utils import FileUtils
from httprunner.compat import OrderedDict, numeric_types
from httprunner.utils import FileUtils
variable_regexp = r"\$([\w_]+)"
function_regexp = r"\$\{([\w_]+\([\$\w\.\-_ =,]*\))\}"
function_regexp_compile = re.compile(r"^([\w_]+)\(([\$\w\.\-_ =,]*)\)$")
test_def_overall_dict = {
"loaded": False,
"api": {},
"suite": {}
}
testcases_cache_mapping = {}
def extract_variables(content):
@@ -86,6 +80,8 @@ def parse_function(content):
"kwargs": {}
}
matched = function_regexp_compile.match(content)
if not matched:
raise exception.ApiNotFound("{} not found!".format(content))
function_meta["func_name"] = matched.group(1)
args_str = matched.group(2).replace(" ", "")
@@ -102,86 +98,291 @@ def parse_function(content):
return function_meta
def load_test_dependencies():
""" load all api and suite definitions.
default api folder is "$CWD/tests/api/".
default suite folder is "$CWD/tests/suite/".
"""
test_def_overall_dict["loaded"] = True
test_def_overall_dict["api"] = {}
test_def_overall_dict["suite"] = {}
# load api definitions
api_def_folder = os.path.join(os.getcwd(), "tests", "api")
api_files = FileUtils.load_folder_files(api_def_folder)
class TestcaseLoader(object):
for test_file in api_files:
testset = load_test_file(test_file)
test_def_overall_dict["api"].update(testset["api"])
overall_def_dict = {
"api": {},
"suite": {}
}
testcases_cache_mapping = {}
# load suite definitions
suite_def_folder = os.path.join(os.getcwd(), "tests", "suite")
suite_files = FileUtils.load_folder_files(suite_def_folder)
def load_test_dependencies():
""" load all api and suite definitions.
default api folder is "$CWD/tests/api/".
default suite folder is "$CWD/tests/suite/".
"""
# TODO: cache api and suite loading
# load api definitions
api_def_folder = os.path.join(os.getcwd(), "tests", "api")
for test_file in FileUtils.load_folder_files(api_def_folder):
TestcaseLoader.load_api_file(test_file)
for suite_file in suite_files:
suite = load_test_file(suite_file)
if "def" not in suite["config"]:
raise exception.ParamsError("def missed in suite file: {}!".format(suite_file))
# load suite definitions
suite_def_folder = os.path.join(os.getcwd(), "tests", "suite")
for suite_file in FileUtils.load_folder_files(suite_def_folder):
suite = TestcaseLoader.load_test_file(suite_file)
if "def" not in suite["config"]:
raise exception.ParamsError("def missed in suite file: {}!".format(suite_file))
call_func = suite["config"]["def"]
function_meta = parse_function(call_func)
suite["function_meta"] = function_meta
test_def_overall_dict["suite"][function_meta["func_name"]] = suite
call_func = suite["config"]["def"]
function_meta = parse_function(call_func)
suite["function_meta"] = function_meta
TestcaseLoader.overall_def_dict["suite"][function_meta["func_name"]] = suite
def load_testsets_by_path(path):
""" load testcases from file path
@param path: path could be in several type
- absolute/relative file path
- absolute/relative folder path
- list/set container with file(s) and/or folder(s)
@return testcase sets list, each testset is corresponding to a file
[
testset_dict_1,
testset_dict_2
]
"""
if isinstance(path, (list, set)):
testsets = []
def load_api_file(file_path):
""" load api definition from file and store in overall_def_dict["api"]
api file should be in format below:
[
{
"api": {
"def": "api_login",
"request": {},
"validate": []
}
},
{
"api": {
"def": "api_logout",
"request": {},
"validate": []
}
}
]
"""
api_items = FileUtils.load_file(file_path)
if not isinstance(api_items, list):
raise exception.FileFormatError("API format error: {}".format(file_path))
for file_path in set(path):
testset = load_testsets_by_path(file_path)
if not testset:
continue
testsets.extend(testset)
for api_item in api_items:
if not isinstance(api_item, dict) or len(api_item) != 1:
raise exception.FileFormatError("API format error: {}".format(file_path))
return testsets
key, api_dict = api_item.popitem()
if key != "api" or not isinstance(api_dict, dict) or "def" not in api_dict:
raise exception.FileFormatError("API format error: {}".format(file_path))
if not os.path.isabs(path):
path = os.path.join(os.getcwd(), path)
api_def = api_dict.pop("def")
function_meta = parse_function(api_def)
func_name = function_meta["func_name"]
if path in testcases_cache_mapping:
return testcases_cache_mapping[path]
if func_name in TestcaseLoader.overall_def_dict["api"]:
logger.log_warning("API definition duplicated: {}".format(func_name))
if os.path.isdir(path):
files_list = FileUtils.load_folder_files(path)
testcases_list = load_testsets_by_path(files_list)
api_dict["function_meta"] = function_meta
TestcaseLoader.overall_def_dict["api"][func_name] = api_dict
def load_test_file(file_path):
""" load testcase file or suite file
@param file_path: absolute valid file path
file_path should be in format below:
[
{
"config": {
"name": "",
"def": "suite_order()",
"request": {}
}
},
{
"test": {
"name": "add product to cart",
"api": "api_add_cart()",
"validate": []
}
},
{
"test": {
"name": "checkout cart",
"request": {},
"validate": []
}
}
]
@return testset dict
{
"name": "desc1",
"config": {},
"testcases": [testcase11, testcase12]
}
"""
testset = {
"name": "",
"config": {
"path": file_path
},
"testcases": [] # TODO: rename to tests
}
for item in FileUtils.load_file(file_path):
if not isinstance(item, dict) or len(item) != 1:
raise exception.FileFormatError("Testcase format error: {}".format(file_path))
key, test_block = item.popitem()
if not isinstance(test_block, dict):
raise exception.FileFormatError("Testcase format error: {}".format(file_path))
if key == "config":
testset["config"].update(test_block)
testset["name"] = test_block.get("name", "")
elif key == "test":
if "api" in test_block:
ref_call = test_block["api"]
def_block = TestcaseLoader._get_block_by_name(ref_call, "api")
TestcaseLoader._override_block(def_block, test_block)
testset["testcases"].append(test_block)
elif "suite" in test_block:
ref_call = test_block["suite"]
block = TestcaseLoader._get_block_by_name(ref_call, "suite")
testset["testcases"].extend(block["testcases"])
else:
testset["testcases"].append(test_block)
elif os.path.isfile(path):
try:
testset = load_test_file(path)
if testset["testcases"] or testset["api"]:
testcases_list = [testset]
else:
logger.log_warning(
"unexpected block key: {}. block key should only be 'config' or 'test'.".format(key)
)
return testset
def _get_block_by_name(ref_call, ref_type):
""" get test content by reference name
@params:
ref_call: e.g. api_v1_Account_Login_POST($UserName, $Password)
ref_type: "api" or "suite"
"""
function_meta = parse_function(ref_call)
func_name = function_meta["func_name"]
call_args = function_meta["args"]
block = TestcaseLoader._get_test_definition(func_name, ref_type)
def_args = block.get("function_meta").get("args", [])
if len(call_args) != len(def_args):
raise exception.ParamsError("call args mismatch defined args!")
args_mapping = {}
for index, item in enumerate(def_args):
if call_args[index] == item:
continue
args_mapping[item] = call_args[index]
if args_mapping:
block = substitute_variables_with_mapping(block, args_mapping)
return block
def _get_test_definition(name, ref_type):
""" get expected api or suite.
@params:
name: api or suite name
ref_type: "api" or "suite"
@return
expected api info if found, otherwise raise ApiNotFound exception
"""
block = TestcaseLoader.overall_def_dict.get(ref_type, {}).get(name)
if not block:
err_msg = "{} not found!".format(name)
if ref_type == "api":
raise exception.ApiNotFound(err_msg)
else:
# ref_type == "suite":
raise exception.SuiteNotFound(err_msg)
return block
def _override_block(def_block, current_block):
""" override def_block with current_block
@param def_block:
{
"name": "get token",
"request": {...},
"validate": [{'eq': ['status_code', 200]}]
}
@param current_block:
{
"name": "get token",
"extract": [{"token": "content.token"}],
"validate": [{'eq': ['status_code', 201]}, {'len_eq': ['content.token', 16]}]
}
@return
{
"name": "get token",
"request": {...},
"extract": [{"token": "content.token"}],
"validate": [{'eq': ['status_code', 201]}, {'len_eq': ['content.token', 16]}]
}
"""
def_validators = def_block.get("validate") or def_block.get("validators", [])
current_validators = current_block.get("validate") or current_block.get("validators", [])
def_extrators = def_block.get("extract") \
or def_block.get("extractors") \
or def_block.get("extract_binds", [])
current_extractors = current_block.get("extract") \
or current_block.get("extractors") \
or current_block.get("extract_binds", [])
current_block.update(def_block)
current_block["validate"] = _merge_validator(
def_validators,
current_validators
)
current_block["extract"] = _merge_extractor(
def_extrators,
current_extractors
)
def load_testsets_by_path(path):
""" load testcases from file path
@param path: path could be in several type
- absolute/relative file path
- absolute/relative folder path
- list/set container with file(s) and/or folder(s)
@return testcase sets list, each testset is corresponding to a file
[
testset_dict_1,
testset_dict_2
]
"""
if isinstance(path, (list, set)):
testsets = []
for file_path in set(path):
testset = TestcaseLoader.load_testsets_by_path(file_path)
if not testset:
continue
testsets.extend(testset)
return testsets
if not os.path.isabs(path):
path = os.path.join(os.getcwd(), path)
if path in TestcaseLoader.testcases_cache_mapping:
return TestcaseLoader.testcases_cache_mapping[path]
if os.path.isdir(path):
files_list = FileUtils.load_folder_files(path)
testcases_list = TestcaseLoader.load_testsets_by_path(files_list)
elif os.path.isfile(path):
try:
testset = TestcaseLoader.load_test_file(path)
if testset["testcases"] or testset["api"]:
testcases_list = [testset]
else:
testcases_list = []
except exception.FileFormatError:
testcases_list = []
except exception.FileFormatError:
else:
logger.log_error(u"file not found: {}".format(path))
testcases_list = []
else:
logger.log_error(u"file not found: {}".format(path))
testcases_list = []
testcases_cache_mapping[path] = testcases_list
return testcases_list
TestcaseLoader.testcases_cache_mapping[path] = testcases_list
return testcases_list
def parse_validator(validator):
""" parse validator, validator maybe in two format
@@ -262,11 +463,11 @@ def _get_validators_mapping(validators):
return validators_mapping
def merge_validator(api_validators, test_validators):
""" merge api_validators with test_validators
def _merge_validator(def_validators, current_validators):
""" merge def_validators with current_validators
@params:
api_validators: [{'eq': ['v1', 200]}, {"check": "s2", "expect": 16, "comparator": "len_eq"}]
test_validators: [{"check": "v1", "expect": 201}, {'len_eq': ['s3', 12]}]
def_validators: [{'eq': ['v1', 200]}, {"check": "s2", "expect": 16, "comparator": "len_eq"}]
current_validators: [{"check": "v1", "expect": 201}, {'len_eq': ['s3', 12]}]
@return:
[
{"check": "v1", "expect": 201, "comparator": "eq"},
@@ -274,24 +475,24 @@ def merge_validator(api_validators, test_validators):
{"check": "s3", "expect": 12, "comparator": "len_eq"}
]
"""
if not api_validators:
return test_validators
if not def_validators:
return current_validators
elif not test_validators:
return api_validators
elif not current_validators:
return def_validators
else:
api_validators_mapping = _get_validators_mapping(api_validators)
test_validators_mapping = _get_validators_mapping(test_validators)
api_validators_mapping = _get_validators_mapping(def_validators)
test_validators_mapping = _get_validators_mapping(current_validators)
api_validators_mapping.update(test_validators_mapping)
return list(api_validators_mapping.values())
def merge_extractor(api_extrators, test_extracors):
""" merge api_extrators with test_extracors
def _merge_extractor(def_extrators, current_extractors):
""" merge def_extrators with current_extractors
@params:
api_extrators: [{"var1": "val1"}, {"var2": "val2"}]
test_extracors: [{"var1": "val111"}, {"var3": "val3"}]
def_extrators: [{"var1": "val1"}, {"var2": "val2"}]
current_extractors: [{"var1": "val111"}, {"var3": "val3"}]
@return:
[
{"var1": "val111"},
@@ -299,15 +500,15 @@ def merge_extractor(api_extrators, test_extracors):
{"var3": "val3"}
]
"""
if not api_extrators:
return test_extracors
if not def_extrators:
return current_extractors
elif not test_extracors:
return api_extrators
elif not current_extractors:
return def_extrators
else:
extractor_dict = OrderedDict()
for api_extrator in api_extrators:
for api_extrator in def_extrators:
if len(api_extrator) != 1:
logger.log_warning("incorrect extractor: {}".format(api_extrator))
continue
@@ -315,7 +516,7 @@ def merge_extractor(api_extrators, test_extracors):
var_name = list(api_extrator.keys())[0]
extractor_dict[var_name] = api_extrator[var_name]
for test_extrator in test_extracors:
for test_extrator in current_extractors:
if len(test_extrator) != 1:
logger.log_warning("incorrect extractor: {}".format(test_extrator))
continue
@@ -329,46 +530,6 @@ def merge_extractor(api_extrators, test_extracors):
return extractor_list
def extend_test_api(test_block_dict):
""" update test block api with api definition
@param
test_block_dict:
{
"name": "get token",
"api": "get_token($user_agent, $device_sn, $os_platform, $app_version)",
"extract": [{"token": "content.token"}],
"validate": [{'eq': ['status_code', 200]}, {'len_eq': ['content.token', 16]}]
}
@return
{
"name": "get token",
"request": {...},
"extract": [{"token": "content.token"}],
"validate": [{'eq': ['status_code', 200]}, {'len_eq': ['content.token', 16]}]
}
"""
ref_name = test_block_dict["api"]
test_info = get_testinfo_by_reference(ref_name, "api")
api_validators = test_info.get("validate") or test_info.get("validators", [])
test_validators = test_block_dict.get("validate") or test_block_dict.get("validators", [])
api_extrators = test_info.get("extract") \
or test_info.get("extractors") \
or test_info.get("extract_binds", [])
test_extracors = test_block_dict.get("extract") \
or test_block_dict.get("extractors") \
or test_block_dict.get("extract_binds", [])
test_block_dict.update(test_info)
test_block_dict["validate"] = merge_validator(
api_validators,
test_validators
)
test_block_dict["extract"] = merge_extractor(
api_extrators,
test_extracors
)
def is_testset(data_structure):
""" check if data_structure is a testset
@@ -410,115 +571,6 @@ def is_testsets(data_structure):
return True
def load_test_file(file_path):
""" load testset file, get testset data structure.
@param file_path: absolute valid testset file path
@return testset dict
{
"name": "desc1",
"config": {},
"api": {},
"testcases": [testcase11, testcase12]
}
"""
testset = {
"name": "",
"config": {
"path": file_path
},
"api": {},
"testcases": []
}
tests_list = FileUtils.load_file(file_path)
for item in tests_list:
for key in item:
if key == "config":
testset["config"].update(item["config"])
testset["name"] = item["config"].get("name", "")
elif key == "test":
test_block_dict = item["test"]
if "api" in test_block_dict:
extend_test_api(test_block_dict)
testset["testcases"].append(test_block_dict)
elif "suite" in test_block_dict:
ref_name = test_block_dict["suite"]
test_info = get_testinfo_by_reference(ref_name, "suite")
testset["testcases"].extend(test_info["testcases"])
else:
testset["testcases"].append(test_block_dict)
elif key == "api":
api_def = item["api"].pop("def")
function_meta = parse_function(api_def)
func_name = function_meta["func_name"]
if func_name in testset["api"]:
logger.log_warning("api definition duplicated: {}".format(func_name))
api_info = {}
api_info["function_meta"] = function_meta
api_info.update(item["api"])
testset["api"][func_name] = api_info
else:
logger.log_warning(
"unexpected block: {}. block should only be 'config', 'test' or 'api'.".format(key)
)
return testset
def get_testinfo_by_reference(ref_name, ref_type):
""" get test content by reference name
@params:
ref_name: reference name, e.g. api_v1_Account_Login_POST($UserName, $Password)
ref_type: "api" or "suite"
"""
function_meta = parse_function(ref_name)
func_name = function_meta["func_name"]
call_args = function_meta["args"]
test_info = get_test_definition(func_name, ref_type)
def_args = test_info.get("function_meta").get("args", [])
if len(call_args) != len(def_args):
raise exception.ParamsError("call args mismatch defined args!")
args_mapping = {}
for index, item in enumerate(def_args):
if call_args[index] == item:
continue
args_mapping[item] = call_args[index]
if args_mapping:
test_info = substitute_variables_with_mapping(test_info, args_mapping)
return test_info
def get_test_definition(name, ref_type):
""" get expected api or suite.
@params:
name: api or suite name
ref_type: "api" or "suite"
@return
expected api info if found, otherwise raise ApiNotFound exception
"""
if not test_def_overall_dict.get("loaded", False):
load_test_dependencies()
test_info = test_def_overall_dict.get(ref_type, {}).get(name)
if not test_info:
err_msg = "{} {} not found!".format(ref_type, name)
if ref_type == "api":
raise exception.ApiNotFound(err_msg)
elif ref_type == "suite":
raise exception.SuiteNotFound(err_msg)
else:
raise exception.ParamsError("ref_type can only be api or suite!")
return test_info
def substitute_variables_with_mapping(content, mapping):
""" substitute variables in content with mapping
e.g.