mirror of
https://github.com/httprunner/httprunner.git
synced 2026-05-12 02:21:29 +08:00
relocate test suite loader
This commit is contained in:
@@ -5,7 +5,7 @@ import os
|
||||
import re
|
||||
import sys
|
||||
|
||||
from httprunner import built_in, exceptions, logger, testcase, utils
|
||||
from httprunner import built_in, exceptions, logger, parser, testcase, utils
|
||||
from httprunner.compat import OrderedDict
|
||||
|
||||
|
||||
@@ -251,7 +251,7 @@ class Context(object):
|
||||
for validator in validators:
|
||||
# evaluate validators with context variable mapping.
|
||||
evaluated_validator = self.eval_check_item(
|
||||
testcase.parse_validator(validator),
|
||||
parser.parse_validator(validator),
|
||||
resp_obj
|
||||
)
|
||||
|
||||
|
||||
@@ -4,8 +4,7 @@ import json
|
||||
import os
|
||||
|
||||
import yaml
|
||||
from httprunner import exceptions, logger
|
||||
|
||||
from httprunner import exceptions, logger, parser, utils
|
||||
|
||||
###############################################################################
|
||||
## file loader
|
||||
@@ -154,4 +153,254 @@ def load_dot_env_file(path):
|
||||
variable, value = line.split("=")
|
||||
variable = variable.strip()
|
||||
os.environ[variable] = value.strip()
|
||||
logger.log_debug("Loaded variable: {}".format(variable))
|
||||
logger.log_debug("Loaded variable: {}".format(variable))
|
||||
|
||||
|
||||
###############################################################################
|
||||
## suite loader
|
||||
###############################################################################
|
||||
|
||||
|
||||
overall_def_dict = {
|
||||
"api": {},
|
||||
"suite": {}
|
||||
}
|
||||
testcases_cache_mapping = {}
|
||||
|
||||
|
||||
def load_test_dependencies():
|
||||
""" load all api and suite definitions.
|
||||
default api folder is "$CWD/tests/api/".
|
||||
default suite folder is "$CWD/tests/suite/".
|
||||
"""
|
||||
# TODO: cache api and suite loading
|
||||
# load api definitions
|
||||
api_def_folder = os.path.join(os.getcwd(), "tests", "api")
|
||||
for test_file in load_folder_files(api_def_folder):
|
||||
load_api_file(test_file)
|
||||
|
||||
# load suite definitions
|
||||
suite_def_folder = os.path.join(os.getcwd(), "tests", "suite")
|
||||
for suite_file in load_folder_files(suite_def_folder):
|
||||
suite = load_test_file(suite_file)
|
||||
if "def" not in suite["config"]:
|
||||
raise exceptions.ParamsError("def missed in suite file: {}!".format(suite_file))
|
||||
|
||||
call_func = suite["config"]["def"]
|
||||
function_meta = parser.parse_function(call_func)
|
||||
suite["function_meta"] = function_meta
|
||||
overall_def_dict["suite"][function_meta["func_name"]] = suite
|
||||
|
||||
|
||||
def load_api_file(file_path):
|
||||
""" load api definition from file and store in overall_def_dict["api"]
|
||||
api file should be in format below:
|
||||
[
|
||||
{
|
||||
"api": {
|
||||
"def": "api_login",
|
||||
"request": {},
|
||||
"validate": []
|
||||
}
|
||||
},
|
||||
{
|
||||
"api": {
|
||||
"def": "api_logout",
|
||||
"request": {},
|
||||
"validate": []
|
||||
}
|
||||
}
|
||||
]
|
||||
"""
|
||||
api_items = load_file(file_path)
|
||||
if not isinstance(api_items, list):
|
||||
raise exceptions.FileFormatError("API format error: {}".format(file_path))
|
||||
|
||||
for api_item in api_items:
|
||||
if not isinstance(api_item, dict) or len(api_item) != 1:
|
||||
raise exceptions.FileFormatError("API format error: {}".format(file_path))
|
||||
|
||||
key, api_dict = api_item.popitem()
|
||||
if key != "api" or not isinstance(api_dict, dict) or "def" not in api_dict:
|
||||
raise exceptions.FileFormatError("API format error: {}".format(file_path))
|
||||
|
||||
api_def = api_dict.pop("def")
|
||||
function_meta = parser.parse_function(api_def)
|
||||
func_name = function_meta["func_name"]
|
||||
|
||||
if func_name in overall_def_dict["api"]:
|
||||
logger.log_warning("API definition duplicated: {}".format(func_name))
|
||||
|
||||
api_dict["function_meta"] = function_meta
|
||||
overall_def_dict["api"][func_name] = api_dict
|
||||
|
||||
|
||||
def load_test_file(file_path):
|
||||
""" load testcase file or suite file
|
||||
@param file_path: absolute valid file path
|
||||
file_path should be in format below:
|
||||
[
|
||||
{
|
||||
"config": {
|
||||
"name": "",
|
||||
"def": "suite_order()",
|
||||
"request": {}
|
||||
}
|
||||
},
|
||||
{
|
||||
"test": {
|
||||
"name": "add product to cart",
|
||||
"api": "api_add_cart()",
|
||||
"validate": []
|
||||
}
|
||||
},
|
||||
{
|
||||
"test": {
|
||||
"name": "checkout cart",
|
||||
"request": {},
|
||||
"validate": []
|
||||
}
|
||||
}
|
||||
]
|
||||
@return testset dict
|
||||
{
|
||||
"config": {},
|
||||
"testcases": [testcase11, testcase12]
|
||||
}
|
||||
"""
|
||||
testset = {
|
||||
"config": {
|
||||
"path": file_path
|
||||
},
|
||||
"testcases": [] # TODO: rename to tests
|
||||
}
|
||||
for item in load_file(file_path):
|
||||
if not isinstance(item, dict) or len(item) != 1:
|
||||
raise exceptions.FileFormatError("Testcase format error: {}".format(file_path))
|
||||
|
||||
key, test_block = item.popitem()
|
||||
if not isinstance(test_block, dict):
|
||||
raise exceptions.FileFormatError("Testcase format error: {}".format(file_path))
|
||||
|
||||
if key == "config":
|
||||
testset["config"].update(test_block)
|
||||
|
||||
elif key == "test":
|
||||
if "api" in test_block:
|
||||
ref_call = test_block["api"]
|
||||
def_block = _get_block_by_name(ref_call, "api")
|
||||
utils._override_block(def_block, test_block)
|
||||
testset["testcases"].append(test_block)
|
||||
elif "suite" in test_block:
|
||||
ref_call = test_block["suite"]
|
||||
block = _get_block_by_name(ref_call, "suite")
|
||||
testset["testcases"].extend(block["testcases"])
|
||||
else:
|
||||
testset["testcases"].append(test_block)
|
||||
|
||||
else:
|
||||
logger.log_warning(
|
||||
"unexpected block key: {}. block key should only be 'config' or 'test'.".format(key)
|
||||
)
|
||||
|
||||
return testset
|
||||
|
||||
|
||||
def _get_block_by_name(ref_call, ref_type):
|
||||
""" get test content by reference name
|
||||
@params:
|
||||
ref_call: e.g. api_v1_Account_Login_POST($UserName, $Password)
|
||||
ref_type: "api" or "suite"
|
||||
"""
|
||||
function_meta = parser.parse_function(ref_call)
|
||||
func_name = function_meta["func_name"]
|
||||
call_args = function_meta["args"]
|
||||
block = _get_test_definition(func_name, ref_type)
|
||||
def_args = block.get("function_meta").get("args", [])
|
||||
|
||||
if len(call_args) != len(def_args):
|
||||
raise exceptions.ParamsError("call args mismatch defined args!")
|
||||
|
||||
args_mapping = {}
|
||||
for index, item in enumerate(def_args):
|
||||
if call_args[index] == item:
|
||||
continue
|
||||
|
||||
args_mapping[item] = call_args[index]
|
||||
|
||||
if args_mapping:
|
||||
block = utils.substitute_variables_with_mapping(block, args_mapping)
|
||||
|
||||
return block
|
||||
|
||||
|
||||
def _get_test_definition(name, ref_type):
|
||||
""" get expected api or suite.
|
||||
@params:
|
||||
name: api or suite name
|
||||
ref_type: "api" or "suite"
|
||||
@return
|
||||
expected api info if found, otherwise raise ApiNotFound exception
|
||||
"""
|
||||
block = overall_def_dict.get(ref_type, {}).get(name)
|
||||
|
||||
if not block:
|
||||
err_msg = "{} not found!".format(name)
|
||||
if ref_type == "api":
|
||||
raise exceptions.ApiNotFound(err_msg)
|
||||
else:
|
||||
# ref_type == "suite":
|
||||
raise exceptions.SuiteNotFound(err_msg)
|
||||
|
||||
return block
|
||||
|
||||
|
||||
def load_testsets_by_path(path):
|
||||
""" load testcases from file path
|
||||
@param path: path could be in several type
|
||||
- absolute/relative file path
|
||||
- absolute/relative folder path
|
||||
- list/set container with file(s) and/or folder(s)
|
||||
@return testcase sets list, each testset is corresponding to a file
|
||||
[
|
||||
testset_dict_1,
|
||||
testset_dict_2
|
||||
]
|
||||
"""
|
||||
if isinstance(path, (list, set)):
|
||||
testsets = []
|
||||
|
||||
for file_path in set(path):
|
||||
testset = load_testsets_by_path(file_path)
|
||||
if not testset:
|
||||
continue
|
||||
testsets.extend(testset)
|
||||
|
||||
return testsets
|
||||
|
||||
if not os.path.isabs(path):
|
||||
path = os.path.join(os.getcwd(), path)
|
||||
|
||||
if path in testcases_cache_mapping:
|
||||
return testcases_cache_mapping[path]
|
||||
|
||||
if os.path.isdir(path):
|
||||
files_list = load_folder_files(path)
|
||||
testcases_list = load_testsets_by_path(files_list)
|
||||
|
||||
elif os.path.isfile(path):
|
||||
try:
|
||||
testset = load_test_file(path)
|
||||
if testset["testcases"] or testset["api"]:
|
||||
testcases_list = [testset]
|
||||
else:
|
||||
testcases_list = []
|
||||
except exceptions.FileFormatError:
|
||||
testcases_list = []
|
||||
|
||||
else:
|
||||
logger.log_error(u"file not found: {}".format(path))
|
||||
testcases_list = []
|
||||
|
||||
testcases_cache_mapping[path] = testcases_list
|
||||
return testcases_list
|
||||
|
||||
@@ -6,7 +6,7 @@ import os
|
||||
import sys
|
||||
|
||||
from httprunner.logger import color_print
|
||||
from httprunner.testcase import TestcaseLoader
|
||||
from httprunner import loader
|
||||
from locust.main import main
|
||||
|
||||
|
||||
@@ -40,8 +40,8 @@ def gen_locustfile(testcase_file_path):
|
||||
"templates",
|
||||
"locustfile_template"
|
||||
)
|
||||
TestcaseLoader.load_test_dependencies()
|
||||
testset = TestcaseLoader.load_test_file(testcase_file_path)
|
||||
loader.load_test_dependencies()
|
||||
testset = loader.load_test_file(testcase_file_path)
|
||||
host = testset.get("config", {}).get("request", {}).get("base_url", "")
|
||||
|
||||
with io.open(template_path, encoding='utf-8') as template:
|
||||
|
||||
111
httprunner/parser.py
Normal file
111
httprunner/parser.py
Normal file
@@ -0,0 +1,111 @@
|
||||
import ast
|
||||
import re
|
||||
|
||||
from httprunner import exceptions
|
||||
|
||||
function_regexp_compile = re.compile(r"^([\w_]+)\(([\$\w\.\-_ =,]*)\)$")
|
||||
|
||||
|
||||
def parse_string_value(str_value):
|
||||
""" parse string to number if possible
|
||||
e.g. "123" => 123
|
||||
"12.2" => 12.3
|
||||
"abc" => "abc"
|
||||
"$var" => "$var"
|
||||
"""
|
||||
try:
|
||||
return ast.literal_eval(str_value)
|
||||
except ValueError:
|
||||
return str_value
|
||||
except SyntaxError:
|
||||
# e.g. $var, ${func}
|
||||
return str_value
|
||||
|
||||
|
||||
def parse_function(content):
|
||||
""" parse function name and args from string content.
|
||||
@param (str) content
|
||||
@return (dict) function name and args
|
||||
|
||||
e.g. func() => {'func_name': 'func', 'args': [], 'kwargs': {}}
|
||||
func(5) => {'func_name': 'func', 'args': [5], 'kwargs': {}}
|
||||
func(1, 2) => {'func_name': 'func', 'args': [1, 2], 'kwargs': {}}
|
||||
func(a=1, b=2) => {'func_name': 'func', 'args': [], 'kwargs': {'a': 1, 'b': 2}}
|
||||
func(1, 2, a=3, b=4) => {'func_name': 'func', 'args': [1, 2], 'kwargs': {'a':3, 'b':4}}
|
||||
"""
|
||||
matched = function_regexp_compile.match(content)
|
||||
if not matched:
|
||||
raise exceptions.FunctionNotFound("{} not found!".format(content))
|
||||
|
||||
function_meta = {
|
||||
"func_name": matched.group(1),
|
||||
"args": [],
|
||||
"kwargs": {}
|
||||
}
|
||||
|
||||
args_str = matched.group(2).strip()
|
||||
if args_str == "":
|
||||
return function_meta
|
||||
|
||||
args_list = args_str.split(',')
|
||||
for arg in args_list:
|
||||
arg = arg.strip()
|
||||
if '=' in arg:
|
||||
key, value = arg.split('=')
|
||||
function_meta["kwargs"][key.strip()] = parse_string_value(value.strip())
|
||||
else:
|
||||
function_meta["args"].append(parse_string_value(arg))
|
||||
|
||||
return function_meta
|
||||
|
||||
|
||||
def parse_validator(validator):
|
||||
""" parse validator, validator maybe in two format
|
||||
@param (dict) validator
|
||||
format1: this is kept for compatiblity with the previous versions.
|
||||
{"check": "status_code", "comparator": "eq", "expect": 201}
|
||||
{"check": "$resp_body_success", "comparator": "eq", "expect": True}
|
||||
format2: recommended new version
|
||||
{'eq': ['status_code', 201]}
|
||||
{'eq': ['$resp_body_success', True]}
|
||||
@return (dict) validator info
|
||||
{
|
||||
"check": "status_code",
|
||||
"expect": 201,
|
||||
"comparator": "eq"
|
||||
}
|
||||
"""
|
||||
if not isinstance(validator, dict):
|
||||
raise exceptions.ParamsError("invalid validator: {}".format(validator))
|
||||
|
||||
if "check" in validator and len(validator) > 1:
|
||||
# format1
|
||||
check_item = validator.get("check")
|
||||
|
||||
if "expect" in validator:
|
||||
expect_value = validator.get("expect")
|
||||
elif "expected" in validator:
|
||||
expect_value = validator.get("expected")
|
||||
else:
|
||||
raise exceptions.ParamsError("invalid validator: {}".format(validator))
|
||||
|
||||
comparator = validator.get("comparator", "eq")
|
||||
|
||||
elif len(validator) == 1:
|
||||
# format2
|
||||
comparator = list(validator.keys())[0]
|
||||
compare_values = validator[comparator]
|
||||
|
||||
if not isinstance(compare_values, list) or len(compare_values) != 2:
|
||||
raise exceptions.ParamsError("invalid validator: {}".format(validator))
|
||||
|
||||
check_item, expect_value = compare_values
|
||||
|
||||
else:
|
||||
raise exceptions.ParamsError("invalid validator: {}".format(validator))
|
||||
|
||||
return {
|
||||
"check": check_item,
|
||||
"expect": expect_value,
|
||||
"comparator": comparator
|
||||
}
|
||||
@@ -8,8 +8,6 @@ from httprunner import exceptions, loader, logger, runner, testcase, utils
|
||||
from httprunner.compat import is_py3
|
||||
from httprunner.report import (HtmlTestResult, get_platform, get_summary,
|
||||
render_html_report)
|
||||
from httprunner.testcase import TestcaseLoader
|
||||
from httprunner.utils import print_output
|
||||
|
||||
|
||||
class TestCase(unittest.TestCase):
|
||||
@@ -180,8 +178,8 @@ def init_test_suites(path_or_testsets, mapping=None, http_client_session=None):
|
||||
passed in variables mapping, it will override variables in config block
|
||||
"""
|
||||
if not testcase.is_testsets(path_or_testsets):
|
||||
TestcaseLoader.load_test_dependencies()
|
||||
testsets = TestcaseLoader.load_testsets_by_path(path_or_testsets)
|
||||
loader.load_test_dependencies()
|
||||
testsets = loader.load_testsets_by_path(path_or_testsets)
|
||||
else:
|
||||
testsets = path_or_testsets
|
||||
|
||||
@@ -268,7 +266,7 @@ class HttpRunner(object):
|
||||
test_suite_summary["name"] = test_suite.config.get("name")
|
||||
test_suite_summary["base_url"] = test_suite.config.get("request", {}).get("base_url", "")
|
||||
test_suite_summary["output"] = test_suite.output
|
||||
print_output(test_suite_summary["output"])
|
||||
utils.print_output(test_suite_summary["output"])
|
||||
|
||||
accumulate_stat(self.summary["stat"], test_suite_summary["stat"])
|
||||
accumulate_stat(self.summary["time"], test_suite_summary["time"])
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
# encoding: utf-8
|
||||
|
||||
import ast
|
||||
import collections
|
||||
import io
|
||||
import itertools
|
||||
import json
|
||||
@@ -9,13 +7,12 @@ import os
|
||||
import random
|
||||
import re
|
||||
|
||||
from httprunner import exceptions, loader, logger, utils
|
||||
from httprunner import exceptions, loader, logger, parser, utils
|
||||
from httprunner.compat import (OrderedDict, basestring, builtin_str,
|
||||
numeric_types, str)
|
||||
|
||||
variable_regexp = r"\$([\w_]+)"
|
||||
function_regexp = r"\$\{([\w_]+\([\$\w\.\-_ =,]*\))\}"
|
||||
function_regexp_compile = re.compile(r"^([\w_]+)\(([\$\w\.\-_ =,]*)\)$")
|
||||
|
||||
|
||||
def extract_variables(content):
|
||||
@@ -49,493 +46,6 @@ def extract_functions(content):
|
||||
except TypeError:
|
||||
return []
|
||||
|
||||
def parse_string_value(str_value):
|
||||
""" parse string to number if possible
|
||||
e.g. "123" => 123
|
||||
"12.2" => 12.3
|
||||
"abc" => "abc"
|
||||
"$var" => "$var"
|
||||
"""
|
||||
try:
|
||||
return ast.literal_eval(str_value)
|
||||
except ValueError:
|
||||
return str_value
|
||||
except SyntaxError:
|
||||
# e.g. $var, ${func}
|
||||
return str_value
|
||||
|
||||
def parse_function(content):
|
||||
""" parse function name and args from string content.
|
||||
@param (str) content
|
||||
@return (dict) function name and args
|
||||
|
||||
e.g. func() => {'func_name': 'func', 'args': [], 'kwargs': {}}
|
||||
func(5) => {'func_name': 'func', 'args': [5], 'kwargs': {}}
|
||||
func(1, 2) => {'func_name': 'func', 'args': [1, 2], 'kwargs': {}}
|
||||
func(a=1, b=2) => {'func_name': 'func', 'args': [], 'kwargs': {'a': 1, 'b': 2}}
|
||||
func(1, 2, a=3, b=4) => {'func_name': 'func', 'args': [1, 2], 'kwargs': {'a':3, 'b':4}}
|
||||
"""
|
||||
matched = function_regexp_compile.match(content)
|
||||
if not matched:
|
||||
raise exceptions.FunctionNotFound("{} not found!".format(content))
|
||||
|
||||
function_meta = {
|
||||
"func_name": matched.group(1),
|
||||
"args": [],
|
||||
"kwargs": {}
|
||||
}
|
||||
|
||||
args_str = matched.group(2).strip()
|
||||
if args_str == "":
|
||||
return function_meta
|
||||
|
||||
args_list = args_str.split(',')
|
||||
for arg in args_list:
|
||||
arg = arg.strip()
|
||||
if '=' in arg:
|
||||
key, value = arg.split('=')
|
||||
function_meta["kwargs"][key.strip()] = parse_string_value(value.strip())
|
||||
else:
|
||||
function_meta["args"].append(parse_string_value(arg))
|
||||
|
||||
return function_meta
|
||||
|
||||
|
||||
class TestcaseLoader(object):
|
||||
|
||||
overall_def_dict = {
|
||||
"api": {},
|
||||
"suite": {}
|
||||
}
|
||||
testcases_cache_mapping = {}
|
||||
|
||||
@staticmethod
|
||||
def load_test_dependencies():
|
||||
""" load all api and suite definitions.
|
||||
default api folder is "$CWD/tests/api/".
|
||||
default suite folder is "$CWD/tests/suite/".
|
||||
"""
|
||||
# TODO: cache api and suite loading
|
||||
# load api definitions
|
||||
api_def_folder = os.path.join(os.getcwd(), "tests", "api")
|
||||
for test_file in loader.load_folder_files(api_def_folder):
|
||||
TestcaseLoader.load_api_file(test_file)
|
||||
|
||||
# load suite definitions
|
||||
suite_def_folder = os.path.join(os.getcwd(), "tests", "suite")
|
||||
for suite_file in loader.load_folder_files(suite_def_folder):
|
||||
suite = TestcaseLoader.load_test_file(suite_file)
|
||||
if "def" not in suite["config"]:
|
||||
raise exceptions.ParamsError("def missed in suite file: {}!".format(suite_file))
|
||||
|
||||
call_func = suite["config"]["def"]
|
||||
function_meta = parse_function(call_func)
|
||||
suite["function_meta"] = function_meta
|
||||
TestcaseLoader.overall_def_dict["suite"][function_meta["func_name"]] = suite
|
||||
|
||||
@staticmethod
|
||||
def load_api_file(file_path):
|
||||
""" load api definition from file and store in overall_def_dict["api"]
|
||||
api file should be in format below:
|
||||
[
|
||||
{
|
||||
"api": {
|
||||
"def": "api_login",
|
||||
"request": {},
|
||||
"validate": []
|
||||
}
|
||||
},
|
||||
{
|
||||
"api": {
|
||||
"def": "api_logout",
|
||||
"request": {},
|
||||
"validate": []
|
||||
}
|
||||
}
|
||||
]
|
||||
"""
|
||||
api_items = loader.load_file(file_path)
|
||||
if not isinstance(api_items, list):
|
||||
raise exceptions.FileFormatError("API format error: {}".format(file_path))
|
||||
|
||||
for api_item in api_items:
|
||||
if not isinstance(api_item, dict) or len(api_item) != 1:
|
||||
raise exceptions.FileFormatError("API format error: {}".format(file_path))
|
||||
|
||||
key, api_dict = api_item.popitem()
|
||||
if key != "api" or not isinstance(api_dict, dict) or "def" not in api_dict:
|
||||
raise exceptions.FileFormatError("API format error: {}".format(file_path))
|
||||
|
||||
api_def = api_dict.pop("def")
|
||||
function_meta = parse_function(api_def)
|
||||
func_name = function_meta["func_name"]
|
||||
|
||||
if func_name in TestcaseLoader.overall_def_dict["api"]:
|
||||
logger.log_warning("API definition duplicated: {}".format(func_name))
|
||||
|
||||
api_dict["function_meta"] = function_meta
|
||||
TestcaseLoader.overall_def_dict["api"][func_name] = api_dict
|
||||
|
||||
@staticmethod
|
||||
def load_test_file(file_path):
|
||||
""" load testcase file or suite file
|
||||
@param file_path: absolute valid file path
|
||||
file_path should be in format below:
|
||||
[
|
||||
{
|
||||
"config": {
|
||||
"name": "",
|
||||
"def": "suite_order()",
|
||||
"request": {}
|
||||
}
|
||||
},
|
||||
{
|
||||
"test": {
|
||||
"name": "add product to cart",
|
||||
"api": "api_add_cart()",
|
||||
"validate": []
|
||||
}
|
||||
},
|
||||
{
|
||||
"test": {
|
||||
"name": "checkout cart",
|
||||
"request": {},
|
||||
"validate": []
|
||||
}
|
||||
}
|
||||
]
|
||||
@return testset dict
|
||||
{
|
||||
"config": {},
|
||||
"testcases": [testcase11, testcase12]
|
||||
}
|
||||
"""
|
||||
testset = {
|
||||
"config": {
|
||||
"path": file_path
|
||||
},
|
||||
"testcases": [] # TODO: rename to tests
|
||||
}
|
||||
for item in loader.load_file(file_path):
|
||||
if not isinstance(item, dict) or len(item) != 1:
|
||||
raise exceptions.FileFormatError("Testcase format error: {}".format(file_path))
|
||||
|
||||
key, test_block = item.popitem()
|
||||
if not isinstance(test_block, dict):
|
||||
raise exceptions.FileFormatError("Testcase format error: {}".format(file_path))
|
||||
|
||||
if key == "config":
|
||||
testset["config"].update(test_block)
|
||||
|
||||
elif key == "test":
|
||||
if "api" in test_block:
|
||||
ref_call = test_block["api"]
|
||||
def_block = TestcaseLoader._get_block_by_name(ref_call, "api")
|
||||
TestcaseLoader._override_block(def_block, test_block)
|
||||
testset["testcases"].append(test_block)
|
||||
elif "suite" in test_block:
|
||||
ref_call = test_block["suite"]
|
||||
block = TestcaseLoader._get_block_by_name(ref_call, "suite")
|
||||
testset["testcases"].extend(block["testcases"])
|
||||
else:
|
||||
testset["testcases"].append(test_block)
|
||||
|
||||
else:
|
||||
logger.log_warning(
|
||||
"unexpected block key: {}. block key should only be 'config' or 'test'.".format(key)
|
||||
)
|
||||
|
||||
return testset
|
||||
|
||||
@staticmethod
|
||||
def _get_block_by_name(ref_call, ref_type):
|
||||
""" get test content by reference name
|
||||
@params:
|
||||
ref_call: e.g. api_v1_Account_Login_POST($UserName, $Password)
|
||||
ref_type: "api" or "suite"
|
||||
"""
|
||||
function_meta = parse_function(ref_call)
|
||||
func_name = function_meta["func_name"]
|
||||
call_args = function_meta["args"]
|
||||
block = TestcaseLoader._get_test_definition(func_name, ref_type)
|
||||
def_args = block.get("function_meta").get("args", [])
|
||||
|
||||
if len(call_args) != len(def_args):
|
||||
raise exceptions.ParamsError("call args mismatch defined args!")
|
||||
|
||||
args_mapping = {}
|
||||
for index, item in enumerate(def_args):
|
||||
if call_args[index] == item:
|
||||
continue
|
||||
|
||||
args_mapping[item] = call_args[index]
|
||||
|
||||
if args_mapping:
|
||||
block = substitute_variables_with_mapping(block, args_mapping)
|
||||
|
||||
return block
|
||||
|
||||
@staticmethod
|
||||
def _get_test_definition(name, ref_type):
|
||||
""" get expected api or suite.
|
||||
@params:
|
||||
name: api or suite name
|
||||
ref_type: "api" or "suite"
|
||||
@return
|
||||
expected api info if found, otherwise raise ApiNotFound exception
|
||||
"""
|
||||
block = TestcaseLoader.overall_def_dict.get(ref_type, {}).get(name)
|
||||
|
||||
if not block:
|
||||
err_msg = "{} not found!".format(name)
|
||||
if ref_type == "api":
|
||||
raise exceptions.ApiNotFound(err_msg)
|
||||
else:
|
||||
# ref_type == "suite":
|
||||
raise exceptions.SuiteNotFound(err_msg)
|
||||
|
||||
return block
|
||||
|
||||
@staticmethod
|
||||
def _override_block(def_block, current_block):
|
||||
""" override def_block with current_block
|
||||
@param def_block:
|
||||
{
|
||||
"name": "get token",
|
||||
"request": {...},
|
||||
"validate": [{'eq': ['status_code', 200]}]
|
||||
}
|
||||
@param current_block:
|
||||
{
|
||||
"name": "get token",
|
||||
"extract": [{"token": "content.token"}],
|
||||
"validate": [{'eq': ['status_code', 201]}, {'len_eq': ['content.token', 16]}]
|
||||
}
|
||||
@return
|
||||
{
|
||||
"name": "get token",
|
||||
"request": {...},
|
||||
"extract": [{"token": "content.token"}],
|
||||
"validate": [{'eq': ['status_code', 201]}, {'len_eq': ['content.token', 16]}]
|
||||
}
|
||||
"""
|
||||
def_validators = def_block.get("validate") or def_block.get("validators", [])
|
||||
current_validators = current_block.get("validate") or current_block.get("validators", [])
|
||||
|
||||
def_extrators = def_block.get("extract") \
|
||||
or def_block.get("extractors") \
|
||||
or def_block.get("extract_binds", [])
|
||||
current_extractors = current_block.get("extract") \
|
||||
or current_block.get("extractors") \
|
||||
or current_block.get("extract_binds", [])
|
||||
|
||||
current_block.update(def_block)
|
||||
current_block["validate"] = _merge_validator(
|
||||
def_validators,
|
||||
current_validators
|
||||
)
|
||||
current_block["extract"] = _merge_extractor(
|
||||
def_extrators,
|
||||
current_extractors
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def load_testsets_by_path(path):
|
||||
""" load testcases from file path
|
||||
@param path: path could be in several type
|
||||
- absolute/relative file path
|
||||
- absolute/relative folder path
|
||||
- list/set container with file(s) and/or folder(s)
|
||||
@return testcase sets list, each testset is corresponding to a file
|
||||
[
|
||||
testset_dict_1,
|
||||
testset_dict_2
|
||||
]
|
||||
"""
|
||||
if isinstance(path, (list, set)):
|
||||
testsets = []
|
||||
|
||||
for file_path in set(path):
|
||||
testset = TestcaseLoader.load_testsets_by_path(file_path)
|
||||
if not testset:
|
||||
continue
|
||||
testsets.extend(testset)
|
||||
|
||||
return testsets
|
||||
|
||||
if not os.path.isabs(path):
|
||||
path = os.path.join(os.getcwd(), path)
|
||||
|
||||
if path in TestcaseLoader.testcases_cache_mapping:
|
||||
return TestcaseLoader.testcases_cache_mapping[path]
|
||||
|
||||
if os.path.isdir(path):
|
||||
files_list = loader.load_folder_files(path)
|
||||
testcases_list = TestcaseLoader.load_testsets_by_path(files_list)
|
||||
|
||||
elif os.path.isfile(path):
|
||||
try:
|
||||
testset = TestcaseLoader.load_test_file(path)
|
||||
if testset["testcases"] or testset["api"]:
|
||||
testcases_list = [testset]
|
||||
else:
|
||||
testcases_list = []
|
||||
except exceptions.FileFormatError:
|
||||
testcases_list = []
|
||||
|
||||
else:
|
||||
logger.log_error(u"file not found: {}".format(path))
|
||||
testcases_list = []
|
||||
|
||||
TestcaseLoader.testcases_cache_mapping[path] = testcases_list
|
||||
return testcases_list
|
||||
|
||||
def parse_validator(validator):
|
||||
""" parse validator, validator maybe in two format
|
||||
@param (dict) validator
|
||||
format1: this is kept for compatiblity with the previous versions.
|
||||
{"check": "status_code", "comparator": "eq", "expect": 201}
|
||||
{"check": "$resp_body_success", "comparator": "eq", "expect": True}
|
||||
format2: recommended new version
|
||||
{'eq': ['status_code', 201]}
|
||||
{'eq': ['$resp_body_success', True]}
|
||||
@return (dict) validator info
|
||||
{
|
||||
"check": "status_code",
|
||||
"expect": 201,
|
||||
"comparator": "eq"
|
||||
}
|
||||
"""
|
||||
if not isinstance(validator, dict):
|
||||
raise exceptions.ParamsError("invalid validator: {}".format(validator))
|
||||
|
||||
if "check" in validator and len(validator) > 1:
|
||||
# format1
|
||||
check_item = validator.get("check")
|
||||
|
||||
if "expect" in validator:
|
||||
expect_value = validator.get("expect")
|
||||
elif "expected" in validator:
|
||||
expect_value = validator.get("expected")
|
||||
else:
|
||||
raise exceptions.ParamsError("invalid validator: {}".format(validator))
|
||||
|
||||
comparator = validator.get("comparator", "eq")
|
||||
|
||||
elif len(validator) == 1:
|
||||
# format2
|
||||
comparator = list(validator.keys())[0]
|
||||
compare_values = validator[comparator]
|
||||
|
||||
if not isinstance(compare_values, list) or len(compare_values) != 2:
|
||||
raise exceptions.ParamsError("invalid validator: {}".format(validator))
|
||||
|
||||
check_item, expect_value = compare_values
|
||||
|
||||
else:
|
||||
raise exceptions.ParamsError("invalid validator: {}".format(validator))
|
||||
|
||||
return {
|
||||
"check": check_item,
|
||||
"expect": expect_value,
|
||||
"comparator": comparator
|
||||
}
|
||||
|
||||
def _get_validators_mapping(validators):
|
||||
""" get validators mapping from api or test validators
|
||||
@param (list) validators:
|
||||
[
|
||||
{"check": "v1", "expect": 201, "comparator": "eq"},
|
||||
{"check": {"b": 1}, "expect": 200, "comparator": "eq"}
|
||||
]
|
||||
@return
|
||||
{
|
||||
("v1", "eq"): {"check": "v1", "expect": 201, "comparator": "eq"},
|
||||
('{"b": 1}', "eq"): {"check": {"b": 1}, "expect": 200, "comparator": "eq"}
|
||||
}
|
||||
"""
|
||||
validators_mapping = {}
|
||||
|
||||
for validator in validators:
|
||||
validator = parse_validator(validator)
|
||||
|
||||
if not isinstance(validator["check"], collections.Hashable):
|
||||
check = json.dumps(validator["check"])
|
||||
else:
|
||||
check = validator["check"]
|
||||
|
||||
key = (check, validator["comparator"])
|
||||
validators_mapping[key] = validator
|
||||
|
||||
return validators_mapping
|
||||
|
||||
def _merge_validator(def_validators, current_validators):
|
||||
""" merge def_validators with current_validators
|
||||
@params:
|
||||
def_validators: [{'eq': ['v1', 200]}, {"check": "s2", "expect": 16, "comparator": "len_eq"}]
|
||||
current_validators: [{"check": "v1", "expect": 201}, {'len_eq': ['s3', 12]}]
|
||||
@return:
|
||||
[
|
||||
{"check": "v1", "expect": 201, "comparator": "eq"},
|
||||
{"check": "s2", "expect": 16, "comparator": "len_eq"},
|
||||
{"check": "s3", "expect": 12, "comparator": "len_eq"}
|
||||
]
|
||||
"""
|
||||
if not def_validators:
|
||||
return current_validators
|
||||
|
||||
elif not current_validators:
|
||||
return def_validators
|
||||
|
||||
else:
|
||||
api_validators_mapping = _get_validators_mapping(def_validators)
|
||||
test_validators_mapping = _get_validators_mapping(current_validators)
|
||||
|
||||
api_validators_mapping.update(test_validators_mapping)
|
||||
return list(api_validators_mapping.values())
|
||||
|
||||
def _merge_extractor(def_extrators, current_extractors):
|
||||
""" merge def_extrators with current_extractors
|
||||
@params:
|
||||
def_extrators: [{"var1": "val1"}, {"var2": "val2"}]
|
||||
current_extractors: [{"var1": "val111"}, {"var3": "val3"}]
|
||||
@return:
|
||||
[
|
||||
{"var1": "val111"},
|
||||
{"var2": "val2"},
|
||||
{"var3": "val3"}
|
||||
]
|
||||
"""
|
||||
if not def_extrators:
|
||||
return current_extractors
|
||||
|
||||
elif not current_extractors:
|
||||
return def_extrators
|
||||
|
||||
else:
|
||||
extractor_dict = OrderedDict()
|
||||
for api_extrator in def_extrators:
|
||||
if len(api_extrator) != 1:
|
||||
logger.log_warning("incorrect extractor: {}".format(api_extrator))
|
||||
continue
|
||||
|
||||
var_name = list(api_extrator.keys())[0]
|
||||
extractor_dict[var_name] = api_extrator[var_name]
|
||||
|
||||
for test_extrator in current_extractors:
|
||||
if len(test_extrator) != 1:
|
||||
logger.log_warning("incorrect extractor: {}".format(test_extrator))
|
||||
continue
|
||||
|
||||
var_name = list(test_extrator.keys())[0]
|
||||
extractor_dict[var_name] = test_extrator[var_name]
|
||||
|
||||
extractor_list = []
|
||||
for key, value in extractor_dict.items():
|
||||
extractor_list.append({key: value})
|
||||
|
||||
return extractor_list
|
||||
|
||||
|
||||
def is_testset(data_structure):
|
||||
""" check if data_structure is a testset
|
||||
@@ -577,61 +87,6 @@ def is_testsets(data_structure):
|
||||
|
||||
return True
|
||||
|
||||
def substitute_variables_with_mapping(content, mapping):
|
||||
""" substitute variables in content with mapping
|
||||
e.g.
|
||||
@params
|
||||
content = {
|
||||
'request': {
|
||||
'url': '/api/users/$uid',
|
||||
'headers': {'token': '$token'}
|
||||
}
|
||||
}
|
||||
mapping = {"$uid": 1000}
|
||||
@return
|
||||
{
|
||||
'request': {
|
||||
'url': '/api/users/1000',
|
||||
'headers': {'token': '$token'}
|
||||
}
|
||||
}
|
||||
"""
|
||||
# TODO: refactor type check
|
||||
if isinstance(content, bool):
|
||||
return content
|
||||
|
||||
if isinstance(content, (numeric_types, type)):
|
||||
return content
|
||||
|
||||
if not content:
|
||||
return content
|
||||
|
||||
if isinstance(content, (list, set, tuple)):
|
||||
return [
|
||||
substitute_variables_with_mapping(item, mapping)
|
||||
for item in content
|
||||
]
|
||||
|
||||
if isinstance(content, dict):
|
||||
substituted_data = {}
|
||||
for key, value in content.items():
|
||||
eval_key = substitute_variables_with_mapping(key, mapping)
|
||||
eval_value = substitute_variables_with_mapping(value, mapping)
|
||||
substituted_data[eval_key] = eval_value
|
||||
|
||||
return substituted_data
|
||||
|
||||
# content is in string format here
|
||||
for var, value in mapping.items():
|
||||
if content == var:
|
||||
# content is a variable
|
||||
content = value
|
||||
else:
|
||||
if not isinstance(value, str):
|
||||
value = builtin_str(value)
|
||||
content = content.replace(var, value)
|
||||
|
||||
return content
|
||||
|
||||
def gen_cartesian_product(*args):
|
||||
""" generate cartesian product for lists
|
||||
@@ -801,7 +256,7 @@ class TestcaseParser(object):
|
||||
def _eval_content_functions(self, content):
|
||||
functions_list = extract_functions(content)
|
||||
for func_content in functions_list:
|
||||
function_meta = parse_function(func_content)
|
||||
function_meta = parser.parse_function(func_content)
|
||||
func_name = function_meta['func_name']
|
||||
|
||||
args = function_meta.get('args', [])
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
# encoding: utf-8
|
||||
|
||||
import collections
|
||||
import copy
|
||||
import hashlib
|
||||
import hmac
|
||||
@@ -13,8 +14,9 @@ import string
|
||||
import types
|
||||
from datetime import datetime
|
||||
|
||||
from httprunner import exceptions, logger
|
||||
from httprunner.compat import OrderedDict, basestring, is_py2, is_py3, str
|
||||
from httprunner import exceptions, logger, parser
|
||||
from httprunner.compat import (OrderedDict, basestring, builtin_str, is_py2,
|
||||
is_py3, numeric_types, str)
|
||||
from requests.structures import CaseInsensitiveDict
|
||||
|
||||
SECRET_KEY = "DebugTalk"
|
||||
@@ -84,6 +86,205 @@ def query_json(json_content, query, delimiter='.'):
|
||||
|
||||
return json_content
|
||||
|
||||
|
||||
def substitute_variables_with_mapping(content, mapping):
|
||||
""" substitute variables in content with mapping
|
||||
e.g.
|
||||
@params
|
||||
content = {
|
||||
'request': {
|
||||
'url': '/api/users/$uid',
|
||||
'headers': {'token': '$token'}
|
||||
}
|
||||
}
|
||||
mapping = {"$uid": 1000}
|
||||
@return
|
||||
{
|
||||
'request': {
|
||||
'url': '/api/users/1000',
|
||||
'headers': {'token': '$token'}
|
||||
}
|
||||
}
|
||||
"""
|
||||
# TODO: refactor type check
|
||||
if isinstance(content, bool):
|
||||
return content
|
||||
|
||||
if isinstance(content, (numeric_types, type)):
|
||||
return content
|
||||
|
||||
if not content:
|
||||
return content
|
||||
|
||||
if isinstance(content, (list, set, tuple)):
|
||||
return [
|
||||
substitute_variables_with_mapping(item, mapping)
|
||||
for item in content
|
||||
]
|
||||
|
||||
if isinstance(content, dict):
|
||||
substituted_data = {}
|
||||
for key, value in content.items():
|
||||
eval_key = substitute_variables_with_mapping(key, mapping)
|
||||
eval_value = substitute_variables_with_mapping(value, mapping)
|
||||
substituted_data[eval_key] = eval_value
|
||||
|
||||
return substituted_data
|
||||
|
||||
# content is in string format here
|
||||
for var, value in mapping.items():
|
||||
if content == var:
|
||||
# content is a variable
|
||||
content = value
|
||||
else:
|
||||
if not isinstance(value, str):
|
||||
value = builtin_str(value)
|
||||
content = content.replace(var, value)
|
||||
|
||||
return content
|
||||
|
||||
|
||||
def _get_validators_mapping(validators):
|
||||
""" get validators mapping from api or test validators
|
||||
@param (list) validators:
|
||||
[
|
||||
{"check": "v1", "expect": 201, "comparator": "eq"},
|
||||
{"check": {"b": 1}, "expect": 200, "comparator": "eq"}
|
||||
]
|
||||
@return
|
||||
{
|
||||
("v1", "eq"): {"check": "v1", "expect": 201, "comparator": "eq"},
|
||||
('{"b": 1}', "eq"): {"check": {"b": 1}, "expect": 200, "comparator": "eq"}
|
||||
}
|
||||
"""
|
||||
validators_mapping = {}
|
||||
|
||||
for validator in validators:
|
||||
validator = parser.parse_validator(validator)
|
||||
|
||||
if not isinstance(validator["check"], collections.Hashable):
|
||||
check = json.dumps(validator["check"])
|
||||
else:
|
||||
check = validator["check"]
|
||||
|
||||
key = (check, validator["comparator"])
|
||||
validators_mapping[key] = validator
|
||||
|
||||
return validators_mapping
|
||||
|
||||
|
||||
def _merge_validator(def_validators, current_validators):
|
||||
""" merge def_validators with current_validators
|
||||
@params:
|
||||
def_validators: [{'eq': ['v1', 200]}, {"check": "s2", "expect": 16, "comparator": "len_eq"}]
|
||||
current_validators: [{"check": "v1", "expect": 201}, {'len_eq': ['s3', 12]}]
|
||||
@return:
|
||||
[
|
||||
{"check": "v1", "expect": 201, "comparator": "eq"},
|
||||
{"check": "s2", "expect": 16, "comparator": "len_eq"},
|
||||
{"check": "s3", "expect": 12, "comparator": "len_eq"}
|
||||
]
|
||||
"""
|
||||
if not def_validators:
|
||||
return current_validators
|
||||
|
||||
elif not current_validators:
|
||||
return def_validators
|
||||
|
||||
else:
|
||||
api_validators_mapping = _get_validators_mapping(def_validators)
|
||||
test_validators_mapping = _get_validators_mapping(current_validators)
|
||||
|
||||
api_validators_mapping.update(test_validators_mapping)
|
||||
return list(api_validators_mapping.values())
|
||||
|
||||
|
||||
def _merge_extractor(def_extrators, current_extractors):
|
||||
""" merge def_extrators with current_extractors
|
||||
@params:
|
||||
def_extrators: [{"var1": "val1"}, {"var2": "val2"}]
|
||||
current_extractors: [{"var1": "val111"}, {"var3": "val3"}]
|
||||
@return:
|
||||
[
|
||||
{"var1": "val111"},
|
||||
{"var2": "val2"},
|
||||
{"var3": "val3"}
|
||||
]
|
||||
"""
|
||||
if not def_extrators:
|
||||
return current_extractors
|
||||
|
||||
elif not current_extractors:
|
||||
return def_extrators
|
||||
|
||||
else:
|
||||
extractor_dict = OrderedDict()
|
||||
for api_extrator in def_extrators:
|
||||
if len(api_extrator) != 1:
|
||||
logger.log_warning("incorrect extractor: {}".format(api_extrator))
|
||||
continue
|
||||
|
||||
var_name = list(api_extrator.keys())[0]
|
||||
extractor_dict[var_name] = api_extrator[var_name]
|
||||
|
||||
for test_extrator in current_extractors:
|
||||
if len(test_extrator) != 1:
|
||||
logger.log_warning("incorrect extractor: {}".format(test_extrator))
|
||||
continue
|
||||
|
||||
var_name = list(test_extrator.keys())[0]
|
||||
extractor_dict[var_name] = test_extrator[var_name]
|
||||
|
||||
extractor_list = []
|
||||
for key, value in extractor_dict.items():
|
||||
extractor_list.append({key: value})
|
||||
|
||||
return extractor_list
|
||||
|
||||
|
||||
def _override_block(def_block, current_block):
|
||||
""" override def_block with current_block
|
||||
@param def_block:
|
||||
{
|
||||
"name": "get token",
|
||||
"request": {...},
|
||||
"validate": [{'eq': ['status_code', 200]}]
|
||||
}
|
||||
@param current_block:
|
||||
{
|
||||
"name": "get token",
|
||||
"extract": [{"token": "content.token"}],
|
||||
"validate": [{'eq': ['status_code', 201]}, {'len_eq': ['content.token', 16]}]
|
||||
}
|
||||
@return
|
||||
{
|
||||
"name": "get token",
|
||||
"request": {...},
|
||||
"extract": [{"token": "content.token"}],
|
||||
"validate": [{'eq': ['status_code', 201]}, {'len_eq': ['content.token', 16]}]
|
||||
}
|
||||
"""
|
||||
def_validators = def_block.get("validate") or def_block.get("validators", [])
|
||||
current_validators = current_block.get("validate") or current_block.get("validators", [])
|
||||
|
||||
def_extrators = def_block.get("extract") \
|
||||
or def_block.get("extractors") \
|
||||
or def_block.get("extract_binds", [])
|
||||
current_extractors = current_block.get("extract") \
|
||||
or current_block.get("extractors") \
|
||||
or current_block.get("extract_binds", [])
|
||||
|
||||
current_block.update(def_block)
|
||||
current_block["validate"] = _merge_validator(
|
||||
def_validators,
|
||||
current_validators
|
||||
)
|
||||
current_block["extract"] = _merge_extractor(
|
||||
def_extrators,
|
||||
current_extractors
|
||||
)
|
||||
|
||||
|
||||
def get_uniform_comparator(comparator):
|
||||
""" convert comparator alias to uniform name
|
||||
"""
|
||||
|
||||
@@ -2,7 +2,6 @@ import os
|
||||
import shutil
|
||||
|
||||
from httprunner import HttpRunner
|
||||
from httprunner.exceptions import FileNotFound
|
||||
from tests.base import ApiServerUnittest
|
||||
|
||||
|
||||
@@ -155,13 +154,3 @@ class TestHttpRunner(ApiServerUnittest):
|
||||
summary = runner.summary
|
||||
self.assertTrue(summary["success"])
|
||||
self.assertEqual(summary["stat"]["testsRun"], 8)
|
||||
|
||||
def test_load_env_path(self):
|
||||
self.assertNotIn("PROJECT_KEY", os.environ)
|
||||
HttpRunner(dot_env_path="tests/data/test.env").run(self.testset_path)
|
||||
self.assertIn("PROJECT_KEY", os.environ)
|
||||
self.assertEqual(os.environ["UserName"], "debugtalk")
|
||||
|
||||
def test_load_env_path_not_exist(self):
|
||||
with self.assertRaises(FileNotFound):
|
||||
HttpRunner(dot_env_path="not_exist.env").run(self.testset_path)
|
||||
|
||||
326
tests/test_loader.py
Normal file
326
tests/test_loader.py
Normal file
@@ -0,0 +1,326 @@
|
||||
import os
|
||||
import unittest
|
||||
|
||||
from httprunner import exceptions, loader, utils
|
||||
|
||||
|
||||
class TestFileLoader(unittest.TestCase):
|
||||
|
||||
def test_load_yaml_file_file_format_error(self):
|
||||
yaml_tmp_file = "tests/data/tmp.yml"
|
||||
# create empty yaml file
|
||||
with open(yaml_tmp_file, 'w') as f:
|
||||
f.write("")
|
||||
|
||||
with self.assertRaises(exceptions.FileFormatError):
|
||||
loader.load_yaml_file(yaml_tmp_file)
|
||||
|
||||
os.remove(yaml_tmp_file)
|
||||
|
||||
# create invalid format yaml file
|
||||
with open(yaml_tmp_file, 'w') as f:
|
||||
f.write("abc")
|
||||
|
||||
with self.assertRaises(exceptions.FileFormatError):
|
||||
loader.load_yaml_file(yaml_tmp_file)
|
||||
|
||||
os.remove(yaml_tmp_file)
|
||||
|
||||
|
||||
def test_load_json_file_file_format_error(self):
|
||||
json_tmp_file = "tests/data/tmp.json"
|
||||
# create empty file
|
||||
with open(json_tmp_file, 'w') as f:
|
||||
f.write("")
|
||||
|
||||
with self.assertRaises(exceptions.FileFormatError):
|
||||
loader.load_json_file(json_tmp_file)
|
||||
|
||||
os.remove(json_tmp_file)
|
||||
|
||||
# create empty json file
|
||||
with open(json_tmp_file, 'w') as f:
|
||||
f.write("{}")
|
||||
|
||||
with self.assertRaises(exceptions.FileFormatError):
|
||||
loader.load_json_file(json_tmp_file)
|
||||
|
||||
os.remove(json_tmp_file)
|
||||
|
||||
# create invalid format json file
|
||||
with open(json_tmp_file, 'w') as f:
|
||||
f.write("abc")
|
||||
|
||||
with self.assertRaises(exceptions.FileFormatError):
|
||||
loader.load_json_file(json_tmp_file)
|
||||
|
||||
os.remove(json_tmp_file)
|
||||
|
||||
def test_load_testcases_bad_filepath(self):
|
||||
testcase_file_path = os.path.join(os.getcwd(), 'tests/data/demo')
|
||||
with self.assertRaises(exceptions.FileNotFound):
|
||||
loader.load_file(testcase_file_path)
|
||||
|
||||
def test_load_json_testcases(self):
|
||||
testcase_file_path = os.path.join(
|
||||
os.getcwd(), 'tests/data/demo_testset_hardcode.json')
|
||||
testcases = loader.load_file(testcase_file_path)
|
||||
self.assertEqual(len(testcases), 3)
|
||||
test = testcases[0]["test"]
|
||||
self.assertIn('name', test)
|
||||
self.assertIn('request', test)
|
||||
self.assertIn('url', test['request'])
|
||||
self.assertIn('method', test['request'])
|
||||
|
||||
def test_load_yaml_testcases(self):
|
||||
testcase_file_path = os.path.join(
|
||||
os.getcwd(), 'tests/data/demo_testset_hardcode.yml')
|
||||
testcases = loader.load_file(testcase_file_path)
|
||||
self.assertEqual(len(testcases), 3)
|
||||
test = testcases[0]["test"]
|
||||
self.assertIn('name', test)
|
||||
self.assertIn('request', test)
|
||||
self.assertIn('url', test['request'])
|
||||
self.assertIn('method', test['request'])
|
||||
|
||||
def test_load_csv_file_one_parameter(self):
|
||||
csv_file_path = os.path.join(
|
||||
os.getcwd(), 'tests/data/user_agent.csv')
|
||||
csv_content = loader.load_file(csv_file_path)
|
||||
self.assertEqual(
|
||||
csv_content,
|
||||
[
|
||||
{'user_agent': 'iOS/10.1'},
|
||||
{'user_agent': 'iOS/10.2'},
|
||||
{'user_agent': 'iOS/10.3'}
|
||||
]
|
||||
)
|
||||
|
||||
def test_load_csv_file_multiple_parameters(self):
|
||||
csv_file_path = os.path.join(
|
||||
os.getcwd(), 'tests/data/account.csv')
|
||||
csv_content = loader.load_file(csv_file_path)
|
||||
self.assertEqual(
|
||||
csv_content,
|
||||
[
|
||||
{'username': 'test1', 'password': '111111'},
|
||||
{'username': 'test2', 'password': '222222'},
|
||||
{'username': 'test3', 'password': '333333'}
|
||||
]
|
||||
)
|
||||
|
||||
def test_load_folder_files(self):
|
||||
folder = os.path.join(os.getcwd(), 'tests')
|
||||
file1 = os.path.join(os.getcwd(), 'tests', 'test_utils.py')
|
||||
file2 = os.path.join(os.getcwd(), 'tests', 'data', 'demo_binds.yml')
|
||||
|
||||
files = loader.load_folder_files(folder, recursive=False)
|
||||
self.assertNotIn(file2, files)
|
||||
|
||||
files = loader.load_folder_files(folder)
|
||||
self.assertIn(file2, files)
|
||||
self.assertNotIn(file1, files)
|
||||
|
||||
files = loader.load_folder_files(folder)
|
||||
api_file = os.path.join(os.getcwd(), 'tests', 'api', 'basic.yml')
|
||||
self.assertIn(api_file, files)
|
||||
|
||||
files = loader.load_folder_files("not_existed_foulder", recursive=False)
|
||||
self.assertEqual([], files)
|
||||
|
||||
files = loader.load_folder_files(file2, recursive=False)
|
||||
self.assertEqual([], files)
|
||||
|
||||
def test_load_dot_env_file(self):
|
||||
self.assertNotIn("PROJECT_KEY", os.environ)
|
||||
loader.load_dot_env_file("tests/data/test.env")
|
||||
self.assertIn("PROJECT_KEY", os.environ)
|
||||
self.assertEqual(os.environ["UserName"], "debugtalk")
|
||||
|
||||
def test_load_env_path_not_exist(self):
|
||||
with self.assertRaises(exceptions.FileNotFound):
|
||||
loader.load_dot_env_file("not_exist.env")
|
||||
|
||||
|
||||
class TestSuiteLoader(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
loader.overall_def_dict = {
|
||||
"api": {},
|
||||
"suite": {}
|
||||
}
|
||||
|
||||
def test_load_test_dependencies(self):
|
||||
loader.load_test_dependencies()
|
||||
overall_def_dict = loader.overall_def_dict
|
||||
self.assertIn("get_token", overall_def_dict["api"])
|
||||
self.assertIn("create_and_check", overall_def_dict["suite"])
|
||||
|
||||
def test_load_api_file(self):
|
||||
loader.load_api_file("tests/api/basic.yml")
|
||||
overall_api_def_dict = loader.overall_def_dict["api"]
|
||||
self.assertIn("get_token",overall_api_def_dict)
|
||||
self.assertEqual("/api/get-token", overall_api_def_dict["get_token"]["request"]["url"])
|
||||
self.assertIn("$user_agent", overall_api_def_dict["get_token"]["function_meta"]["args"])
|
||||
self.assertEqual(len(overall_api_def_dict["get_token"]["validate"]), 3)
|
||||
|
||||
def test_load_test_file_suite(self):
|
||||
loader.load_api_file("tests/api/basic.yml")
|
||||
testset = loader.load_test_file("tests/suite/create_and_get.yml")
|
||||
self.assertEqual(testset["config"]["name"], "create user and check result.")
|
||||
self.assertEqual(len(testset["testcases"]), 3)
|
||||
self.assertEqual(testset["testcases"][0]["name"], "make sure user $uid does not exist")
|
||||
self.assertEqual(testset["testcases"][0]["request"]["url"], "/api/users/$uid")
|
||||
|
||||
def test_load_test_file_testcase(self):
|
||||
loader.load_test_dependencies()
|
||||
testset = loader.load_test_file("tests/testcases/smoketest.yml")
|
||||
self.assertEqual(testset["config"]["name"], "smoketest")
|
||||
self.assertEqual(testset["config"]["path"], "tests/testcases/smoketest.yml")
|
||||
self.assertIn("device_sn", testset["config"]["variables"][0])
|
||||
self.assertEqual(len(testset["testcases"]), 8)
|
||||
self.assertEqual(testset["testcases"][0]["name"], "get token")
|
||||
|
||||
def test_get_block_by_name(self):
|
||||
loader.load_test_dependencies()
|
||||
ref_call = "get_user($uid, $token)"
|
||||
block = loader._get_block_by_name(ref_call, "api")
|
||||
self.assertEqual(block["request"]["url"], "/api/users/$uid")
|
||||
self.assertEqual(block["function_meta"]["func_name"], "get_user")
|
||||
self.assertEqual(block["function_meta"]["args"], ['$uid', '$token'])
|
||||
|
||||
def test_get_block_by_name_args_mismatch(self):
|
||||
loader.load_test_dependencies()
|
||||
ref_call = "get_user($uid, $token, $var)"
|
||||
with self.assertRaises(exceptions.ParamsError):
|
||||
loader._get_block_by_name(ref_call, "api")
|
||||
|
||||
def test_override_block(self):
|
||||
loader.load_test_dependencies()
|
||||
def_block = loader._get_block_by_name("get_token($user_agent, $device_sn, $os_platform, $app_version)", "api")
|
||||
test_block = {
|
||||
"name": "override block",
|
||||
"variables": [
|
||||
{"var": 123}
|
||||
],
|
||||
'request': {
|
||||
'url': '/api/get-token', 'method': 'POST', 'headers': {'user_agent': '$user_agent', 'device_sn': '$device_sn', 'os_platform': '$os_platform', 'app_version': '$app_version'}, 'json': {'sign': '${get_sign($user_agent, $device_sn, $os_platform, $app_version)}'}},
|
||||
'validate': [
|
||||
{'eq': ['status_code', 201]},
|
||||
{'len_eq': ['content.token', 32]}
|
||||
]
|
||||
}
|
||||
|
||||
utils._override_block(def_block, test_block)
|
||||
self.assertEqual(test_block["name"], "override block")
|
||||
self.assertIn({'check': 'status_code', 'expect': 201, 'comparator': 'eq'}, test_block["validate"])
|
||||
self.assertIn({'check': 'content.token', 'comparator': 'len_eq', 'expect': 32}, test_block["validate"])
|
||||
|
||||
def test_get_test_definition_api(self):
|
||||
loader.load_test_dependencies()
|
||||
api_def = loader._get_test_definition("get_headers", "api")
|
||||
self.assertEqual(api_def["request"]["url"], "/headers")
|
||||
self.assertEqual(len(api_def["setup_hooks"]), 2)
|
||||
self.assertEqual(len(api_def["teardown_hooks"]), 1)
|
||||
|
||||
with self.assertRaises(exceptions.ApiNotFound):
|
||||
loader._get_test_definition("get_token_XXX", "api")
|
||||
|
||||
def test_get_test_definition_suite(self):
|
||||
loader.load_test_dependencies()
|
||||
api_def = loader._get_test_definition("create_and_check", "suite")
|
||||
self.assertEqual(api_def["config"]["name"], "create user and check result.")
|
||||
|
||||
with self.assertRaises(exceptions.SuiteNotFound):
|
||||
loader._get_test_definition("create_and_check_XXX", "suite")
|
||||
|
||||
def test_load_testcases_by_path_files(self):
|
||||
testsets_list = []
|
||||
|
||||
# absolute file path
|
||||
path = os.path.join(
|
||||
os.getcwd(), 'tests/data/demo_testset_hardcode.json')
|
||||
testset_list = loader.load_testsets_by_path(path)
|
||||
self.assertEqual(len(testset_list), 1)
|
||||
self.assertIn("path", testset_list[0]["config"])
|
||||
self.assertEqual(testset_list[0]["config"]["path"], path)
|
||||
self.assertEqual(len(testset_list[0]["testcases"]), 3)
|
||||
testsets_list.extend(testset_list)
|
||||
|
||||
# relative file path
|
||||
path = 'tests/data/demo_testset_hardcode.yml'
|
||||
testset_list = loader.load_testsets_by_path(path)
|
||||
self.assertEqual(len(testset_list), 1)
|
||||
self.assertIn("path", testset_list[0]["config"])
|
||||
self.assertIn(path, testset_list[0]["config"]["path"])
|
||||
self.assertEqual(len(testset_list[0]["testcases"]), 3)
|
||||
testsets_list.extend(testset_list)
|
||||
|
||||
# list/set container with file(s)
|
||||
path = [
|
||||
os.path.join(os.getcwd(), 'tests/data/demo_testset_hardcode.json'),
|
||||
'tests/data/demo_testset_hardcode.yml'
|
||||
]
|
||||
testset_list = loader.load_testsets_by_path(path)
|
||||
self.assertEqual(len(testset_list), 2)
|
||||
self.assertEqual(len(testset_list[0]["testcases"]), 3)
|
||||
self.assertEqual(len(testset_list[1]["testcases"]), 3)
|
||||
testsets_list.extend(testset_list)
|
||||
self.assertEqual(len(testsets_list), 4)
|
||||
|
||||
for testset in testsets_list:
|
||||
for test in testset["testcases"]:
|
||||
self.assertIn('name', test)
|
||||
self.assertIn('request', test)
|
||||
self.assertIn('url', test['request'])
|
||||
self.assertIn('method', test['request'])
|
||||
|
||||
def test_load_testcases_by_path_folder(self):
|
||||
loader.load_test_dependencies()
|
||||
# absolute folder path
|
||||
path = os.path.join(os.getcwd(), 'tests/data')
|
||||
testset_list_1 = loader.load_testsets_by_path(path)
|
||||
self.assertGreater(len(testset_list_1), 4)
|
||||
|
||||
# relative folder path
|
||||
path = 'tests/data/'
|
||||
testset_list_2 = loader.load_testsets_by_path(path)
|
||||
self.assertEqual(len(testset_list_1), len(testset_list_2))
|
||||
|
||||
# list/set container with file(s)
|
||||
path = [
|
||||
os.path.join(os.getcwd(), 'tests/data'),
|
||||
'tests/data/'
|
||||
]
|
||||
testset_list_3 = loader.load_testsets_by_path(path)
|
||||
self.assertEqual(len(testset_list_3), 2 * len(testset_list_1))
|
||||
|
||||
def test_load_testcases_by_path_not_exist(self):
|
||||
# absolute folder path
|
||||
path = os.path.join(os.getcwd(), 'tests/data_not_exist')
|
||||
testset_list_1 = loader.load_testsets_by_path(path)
|
||||
self.assertEqual(testset_list_1, [])
|
||||
|
||||
# relative folder path
|
||||
path = 'tests/data_not_exist'
|
||||
testset_list_2 = loader.load_testsets_by_path(path)
|
||||
self.assertEqual(testset_list_2, [])
|
||||
|
||||
# list/set container with file(s)
|
||||
path = [
|
||||
os.path.join(os.getcwd(), 'tests/data_not_exist'),
|
||||
'tests/data_not_exist/'
|
||||
]
|
||||
testset_list_3 = loader.load_testsets_by_path(path)
|
||||
self.assertEqual(testset_list_3, [])
|
||||
|
||||
def test_load_testcases_by_path_layered(self):
|
||||
loader.load_test_dependencies()
|
||||
path = os.path.join(
|
||||
os.getcwd(), 'tests/data/demo_testset_layer.yml')
|
||||
testsets_list = loader.load_testsets_by_path(path)
|
||||
self.assertIn("variables", testsets_list[0]["config"])
|
||||
self.assertIn("request", testsets_list[0]["config"])
|
||||
self.assertIn("request", testsets_list[0]["testcases"][0])
|
||||
self.assertIn("url", testsets_list[0]["testcases"][0]["request"])
|
||||
self.assertIn("validate", testsets_list[0]["testcases"][0])
|
||||
68
tests/test_parser.py
Normal file
68
tests/test_parser.py
Normal file
@@ -0,0 +1,68 @@
|
||||
import os
|
||||
import unittest
|
||||
from httprunner import parser, exceptions
|
||||
|
||||
|
||||
class TestParser(unittest.TestCase):
|
||||
|
||||
def test_parse_string_value(self):
|
||||
self.assertEqual(parser.parse_string_value("123"), 123)
|
||||
self.assertEqual(parser.parse_string_value("12.3"), 12.3)
|
||||
self.assertEqual(parser.parse_string_value("a123"), "a123")
|
||||
self.assertEqual(parser.parse_string_value("$var"), "$var")
|
||||
self.assertEqual(parser.parse_string_value("${func}"), "${func}")
|
||||
|
||||
def test_parse_function(self):
|
||||
self.assertEqual(
|
||||
parser.parse_function("func()"),
|
||||
{'func_name': 'func', 'args': [], 'kwargs': {}}
|
||||
)
|
||||
self.assertEqual(
|
||||
parser.parse_function("func(5)"),
|
||||
{'func_name': 'func', 'args': [5], 'kwargs': {}}
|
||||
)
|
||||
self.assertEqual(
|
||||
parser.parse_function("func(1, 2)"),
|
||||
{'func_name': 'func', 'args': [1, 2], 'kwargs': {}}
|
||||
)
|
||||
self.assertEqual(
|
||||
parser.parse_function("func(a=1, b=2)"),
|
||||
{'func_name': 'func', 'args': [], 'kwargs': {'a': 1, 'b': 2}}
|
||||
)
|
||||
self.assertEqual(
|
||||
parser.parse_function("func(a= 1, b =2)"),
|
||||
{'func_name': 'func', 'args': [], 'kwargs': {'a': 1, 'b': 2}}
|
||||
)
|
||||
self.assertEqual(
|
||||
parser.parse_function("func(1, 2, a=3, b=4)"),
|
||||
{'func_name': 'func', 'args': [1, 2], 'kwargs': {'a': 3, 'b': 4}}
|
||||
)
|
||||
self.assertEqual(
|
||||
parser.parse_function("func($request, 123)"),
|
||||
{'func_name': 'func', 'args': ["$request", 123], 'kwargs': {}}
|
||||
)
|
||||
self.assertEqual(
|
||||
parser.parse_function("func( )"),
|
||||
{'func_name': 'func', 'args': [], 'kwargs': {}}
|
||||
)
|
||||
self.assertEqual(
|
||||
parser.parse_function("func(hello world, a=3, b=4)"),
|
||||
{'func_name': 'func', 'args': ["hello world"], 'kwargs': {'a': 3, 'b': 4}}
|
||||
)
|
||||
self.assertEqual(
|
||||
parser.parse_function("func($request, 12 3)"),
|
||||
{'func_name': 'func', 'args': ["$request", '12 3'], 'kwargs': {}}
|
||||
)
|
||||
|
||||
def test_parse_validator(self):
|
||||
validator = {"check": "status_code", "comparator": "eq", "expect": 201}
|
||||
self.assertEqual(
|
||||
parser.parse_validator(validator),
|
||||
{"check": "status_code", "comparator": "eq", "expect": 201}
|
||||
)
|
||||
|
||||
validator = {'eq': ['status_code', 201]}
|
||||
self.assertEqual(
|
||||
parser.parse_validator(validator),
|
||||
{"check": "status_code", "comparator": "eq", "expect": 201}
|
||||
)
|
||||
@@ -2,7 +2,6 @@ import os
|
||||
import time
|
||||
|
||||
from httprunner import HttpRunner, exceptions, loader, runner
|
||||
from httprunner.testcase import TestcaseLoader
|
||||
from httprunner.utils import deep_update_dict
|
||||
from tests.base import ApiServerUnittest
|
||||
|
||||
@@ -380,7 +379,7 @@ class TestRunner(ApiServerUnittest):
|
||||
def test_run_testcase_with_empty_header(self):
|
||||
testcase_file_path = os.path.join(
|
||||
os.getcwd(), 'tests/data/test_bugfix.yml')
|
||||
testsets = TestcaseLoader.load_testsets_by_path(testcase_file_path)
|
||||
testsets = loader.load_testsets_by_path(testcase_file_path)
|
||||
testset = testsets[0]
|
||||
config_dict_headers = testset["config"]["request"]["headers"]
|
||||
test_dict_headers = testset["testcases"][0]["request"]["headers"]
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import os
|
||||
|
||||
from httprunner import task
|
||||
from httprunner.testcase import TestcaseLoader
|
||||
from httprunner import loader, task
|
||||
from tests.base import ApiServerUnittest
|
||||
|
||||
|
||||
@@ -17,7 +16,7 @@ class TestTask(ApiServerUnittest):
|
||||
|
||||
def test_create_suite(self):
|
||||
testcase_file_path = os.path.join(os.getcwd(), 'tests/data/demo_testset_variables.yml')
|
||||
testset = TestcaseLoader.load_test_file(testcase_file_path)
|
||||
testset = loader.load_test_file(testcase_file_path)
|
||||
suite = task.TestSuite(testset)
|
||||
self.assertEqual(suite.countTestCases(), 3)
|
||||
for testcase in suite:
|
||||
|
||||
@@ -2,193 +2,7 @@ import os
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from httprunner import testcase
|
||||
from httprunner.exceptions import ApiNotFound, ParamsError, SuiteNotFound
|
||||
from httprunner.testcase import TestcaseLoader
|
||||
|
||||
|
||||
class TestTestcaseLoader(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
TestcaseLoader.overall_def_dict = {
|
||||
"api": {},
|
||||
"suite": {}
|
||||
}
|
||||
|
||||
def test_load_test_dependencies(self):
|
||||
TestcaseLoader.load_test_dependencies()
|
||||
overall_def_dict = TestcaseLoader.overall_def_dict
|
||||
self.assertIn("get_token", overall_def_dict["api"])
|
||||
self.assertIn("create_and_check", overall_def_dict["suite"])
|
||||
|
||||
def test_load_api_file(self):
|
||||
TestcaseLoader.load_api_file("tests/api/basic.yml")
|
||||
overall_api_def_dict = TestcaseLoader.overall_def_dict["api"]
|
||||
self.assertIn("get_token",overall_api_def_dict)
|
||||
self.assertEqual("/api/get-token", overall_api_def_dict["get_token"]["request"]["url"])
|
||||
self.assertIn("$user_agent", overall_api_def_dict["get_token"]["function_meta"]["args"])
|
||||
self.assertEqual(len(overall_api_def_dict["get_token"]["validate"]), 3)
|
||||
|
||||
def test_load_test_file_suite(self):
|
||||
TestcaseLoader.load_api_file("tests/api/basic.yml")
|
||||
testset = TestcaseLoader.load_test_file("tests/suite/create_and_get.yml")
|
||||
self.assertEqual(testset["config"]["name"], "create user and check result.")
|
||||
self.assertEqual(len(testset["testcases"]), 3)
|
||||
self.assertEqual(testset["testcases"][0]["name"], "make sure user $uid does not exist")
|
||||
self.assertEqual(testset["testcases"][0]["request"]["url"], "/api/users/$uid")
|
||||
|
||||
def test_load_test_file_testcase(self):
|
||||
TestcaseLoader.load_test_dependencies()
|
||||
testset = TestcaseLoader.load_test_file("tests/testcases/smoketest.yml")
|
||||
self.assertEqual(testset["config"]["name"], "smoketest")
|
||||
self.assertEqual(testset["config"]["path"], "tests/testcases/smoketest.yml")
|
||||
self.assertIn("device_sn", testset["config"]["variables"][0])
|
||||
self.assertEqual(len(testset["testcases"]), 8)
|
||||
self.assertEqual(testset["testcases"][0]["name"], "get token")
|
||||
|
||||
def test_get_block_by_name(self):
|
||||
TestcaseLoader.load_test_dependencies()
|
||||
ref_call = "get_user($uid, $token)"
|
||||
block = TestcaseLoader._get_block_by_name(ref_call, "api")
|
||||
self.assertEqual(block["request"]["url"], "/api/users/$uid")
|
||||
self.assertEqual(block["function_meta"]["func_name"], "get_user")
|
||||
self.assertEqual(block["function_meta"]["args"], ['$uid', '$token'])
|
||||
|
||||
def test_get_block_by_name_args_mismatch(self):
|
||||
TestcaseLoader.load_test_dependencies()
|
||||
ref_call = "get_user($uid, $token, $var)"
|
||||
with self.assertRaises(ParamsError):
|
||||
TestcaseLoader._get_block_by_name(ref_call, "api")
|
||||
|
||||
def test_get_test_definition_api(self):
|
||||
TestcaseLoader.load_test_dependencies()
|
||||
api_def = TestcaseLoader._get_test_definition("get_headers", "api")
|
||||
self.assertEqual(api_def["request"]["url"], "/headers")
|
||||
self.assertEqual(len(api_def["setup_hooks"]), 2)
|
||||
self.assertEqual(len(api_def["teardown_hooks"]), 1)
|
||||
|
||||
with self.assertRaises(ApiNotFound):
|
||||
TestcaseLoader._get_test_definition("get_token_XXX", "api")
|
||||
|
||||
def test_get_test_definition_suite(self):
|
||||
TestcaseLoader.load_test_dependencies()
|
||||
api_def = TestcaseLoader._get_test_definition("create_and_check", "suite")
|
||||
self.assertEqual(api_def["config"]["name"], "create user and check result.")
|
||||
|
||||
with self.assertRaises(SuiteNotFound):
|
||||
TestcaseLoader._get_test_definition("create_and_check_XXX", "suite")
|
||||
|
||||
def test_override_block(self):
|
||||
TestcaseLoader.load_test_dependencies()
|
||||
def_block = TestcaseLoader._get_block_by_name("get_token($user_agent, $device_sn, $os_platform, $app_version)", "api")
|
||||
test_block = {
|
||||
"name": "override block",
|
||||
"variables": [
|
||||
{"var": 123}
|
||||
],
|
||||
'request': {
|
||||
'url': '/api/get-token', 'method': 'POST', 'headers': {'user_agent': '$user_agent', 'device_sn': '$device_sn', 'os_platform': '$os_platform', 'app_version': '$app_version'}, 'json': {'sign': '${get_sign($user_agent, $device_sn, $os_platform, $app_version)}'}},
|
||||
'validate': [
|
||||
{'eq': ['status_code', 201]},
|
||||
{'len_eq': ['content.token', 32]}
|
||||
]
|
||||
}
|
||||
|
||||
TestcaseLoader._override_block(def_block, test_block)
|
||||
self.assertEqual(test_block["name"], "override block")
|
||||
self.assertIn({'check': 'status_code', 'expect': 201, 'comparator': 'eq'}, test_block["validate"])
|
||||
self.assertIn({'check': 'content.token', 'comparator': 'len_eq', 'expect': 32}, test_block["validate"])
|
||||
|
||||
def test_load_testcases_by_path_files(self):
|
||||
testsets_list = []
|
||||
|
||||
# absolute file path
|
||||
path = os.path.join(
|
||||
os.getcwd(), 'tests/data/demo_testset_hardcode.json')
|
||||
testset_list = TestcaseLoader.load_testsets_by_path(path)
|
||||
self.assertEqual(len(testset_list), 1)
|
||||
self.assertIn("path", testset_list[0]["config"])
|
||||
self.assertEqual(testset_list[0]["config"]["path"], path)
|
||||
self.assertEqual(len(testset_list[0]["testcases"]), 3)
|
||||
testsets_list.extend(testset_list)
|
||||
|
||||
# relative file path
|
||||
path = 'tests/data/demo_testset_hardcode.yml'
|
||||
testset_list = TestcaseLoader.load_testsets_by_path(path)
|
||||
self.assertEqual(len(testset_list), 1)
|
||||
self.assertIn("path", testset_list[0]["config"])
|
||||
self.assertIn(path, testset_list[0]["config"]["path"])
|
||||
self.assertEqual(len(testset_list[0]["testcases"]), 3)
|
||||
testsets_list.extend(testset_list)
|
||||
|
||||
# list/set container with file(s)
|
||||
path = [
|
||||
os.path.join(os.getcwd(), 'tests/data/demo_testset_hardcode.json'),
|
||||
'tests/data/demo_testset_hardcode.yml'
|
||||
]
|
||||
testset_list = TestcaseLoader.load_testsets_by_path(path)
|
||||
self.assertEqual(len(testset_list), 2)
|
||||
self.assertEqual(len(testset_list[0]["testcases"]), 3)
|
||||
self.assertEqual(len(testset_list[1]["testcases"]), 3)
|
||||
testsets_list.extend(testset_list)
|
||||
self.assertEqual(len(testsets_list), 4)
|
||||
|
||||
for testset in testsets_list:
|
||||
for test in testset["testcases"]:
|
||||
self.assertIn('name', test)
|
||||
self.assertIn('request', test)
|
||||
self.assertIn('url', test['request'])
|
||||
self.assertIn('method', test['request'])
|
||||
|
||||
def test_load_testcases_by_path_folder(self):
|
||||
TestcaseLoader.load_test_dependencies()
|
||||
# absolute folder path
|
||||
path = os.path.join(os.getcwd(), 'tests/data')
|
||||
testset_list_1 = TestcaseLoader.load_testsets_by_path(path)
|
||||
self.assertGreater(len(testset_list_1), 4)
|
||||
|
||||
# relative folder path
|
||||
path = 'tests/data/'
|
||||
testset_list_2 = TestcaseLoader.load_testsets_by_path(path)
|
||||
self.assertEqual(len(testset_list_1), len(testset_list_2))
|
||||
|
||||
# list/set container with file(s)
|
||||
path = [
|
||||
os.path.join(os.getcwd(), 'tests/data'),
|
||||
'tests/data/'
|
||||
]
|
||||
testset_list_3 = TestcaseLoader.load_testsets_by_path(path)
|
||||
self.assertEqual(len(testset_list_3), 2 * len(testset_list_1))
|
||||
|
||||
def test_load_testcases_by_path_not_exist(self):
|
||||
# absolute folder path
|
||||
path = os.path.join(os.getcwd(), 'tests/data_not_exist')
|
||||
testset_list_1 = TestcaseLoader.load_testsets_by_path(path)
|
||||
self.assertEqual(testset_list_1, [])
|
||||
|
||||
# relative folder path
|
||||
path = 'tests/data_not_exist'
|
||||
testset_list_2 = TestcaseLoader.load_testsets_by_path(path)
|
||||
self.assertEqual(testset_list_2, [])
|
||||
|
||||
# list/set container with file(s)
|
||||
path = [
|
||||
os.path.join(os.getcwd(), 'tests/data_not_exist'),
|
||||
'tests/data_not_exist/'
|
||||
]
|
||||
testset_list_3 = TestcaseLoader.load_testsets_by_path(path)
|
||||
self.assertEqual(testset_list_3, [])
|
||||
|
||||
def test_load_testcases_by_path_layered(self):
|
||||
TestcaseLoader.load_test_dependencies()
|
||||
path = os.path.join(
|
||||
os.getcwd(), 'tests/data/demo_testset_layer.yml')
|
||||
testsets_list = TestcaseLoader.load_testsets_by_path(path)
|
||||
self.assertIn("variables", testsets_list[0]["config"])
|
||||
self.assertIn("request", testsets_list[0]["config"])
|
||||
self.assertIn("request", testsets_list[0]["testcases"][0])
|
||||
self.assertIn("url", testsets_list[0]["testcases"][0]["request"])
|
||||
self.assertIn("validate", testsets_list[0]["testcases"][0])
|
||||
from httprunner import exceptions, loader, testcase
|
||||
|
||||
|
||||
class TestcaseParserUnittest(unittest.TestCase):
|
||||
@@ -414,62 +228,13 @@ class TestcaseParserUnittest(unittest.TestCase):
|
||||
def test_eval_content_variables_search_upward(self):
|
||||
testcase_parser = testcase.TestcaseParser()
|
||||
|
||||
with self.assertRaises(ParamsError):
|
||||
with self.assertRaises(exceptions.ParamsError):
|
||||
testcase_parser._eval_content_variables("/api/$SECRET_KEY")
|
||||
|
||||
testcase_parser.file_path = "tests/data/demo_testset_hardcode.yml"
|
||||
content = testcase_parser._eval_content_variables("/api/$SECRET_KEY")
|
||||
self.assertEqual(content, "/api/DebugTalk")
|
||||
|
||||
def test_parse_string_value(self):
|
||||
self.assertEqual(testcase.parse_string_value("123"), 123)
|
||||
self.assertEqual(testcase.parse_string_value("12.3"), 12.3)
|
||||
self.assertEqual(testcase.parse_string_value("a123"), "a123")
|
||||
self.assertEqual(testcase.parse_string_value("$var"), "$var")
|
||||
self.assertEqual(testcase.parse_string_value("${func}"), "${func}")
|
||||
|
||||
def test_parse_function(self):
|
||||
self.assertEqual(
|
||||
testcase.parse_function("func()"),
|
||||
{'func_name': 'func', 'args': [], 'kwargs': {}}
|
||||
)
|
||||
self.assertEqual(
|
||||
testcase.parse_function("func(5)"),
|
||||
{'func_name': 'func', 'args': [5], 'kwargs': {}}
|
||||
)
|
||||
self.assertEqual(
|
||||
testcase.parse_function("func(1, 2)"),
|
||||
{'func_name': 'func', 'args': [1, 2], 'kwargs': {}}
|
||||
)
|
||||
self.assertEqual(
|
||||
testcase.parse_function("func(a=1, b=2)"),
|
||||
{'func_name': 'func', 'args': [], 'kwargs': {'a': 1, 'b': 2}}
|
||||
)
|
||||
self.assertEqual(
|
||||
testcase.parse_function("func(a= 1, b =2)"),
|
||||
{'func_name': 'func', 'args': [], 'kwargs': {'a': 1, 'b': 2}}
|
||||
)
|
||||
self.assertEqual(
|
||||
testcase.parse_function("func(1, 2, a=3, b=4)"),
|
||||
{'func_name': 'func', 'args': [1, 2], 'kwargs': {'a': 3, 'b': 4}}
|
||||
)
|
||||
self.assertEqual(
|
||||
testcase.parse_function("func($request, 123)"),
|
||||
{'func_name': 'func', 'args': ["$request", 123], 'kwargs': {}}
|
||||
)
|
||||
self.assertEqual(
|
||||
testcase.parse_function("func( )"),
|
||||
{'func_name': 'func', 'args': [], 'kwargs': {}}
|
||||
)
|
||||
self.assertEqual(
|
||||
testcase.parse_function("func(hello world, a=3, b=4)"),
|
||||
{'func_name': 'func', 'args': ["hello world"], 'kwargs': {'a': 3, 'b': 4}}
|
||||
)
|
||||
self.assertEqual(
|
||||
testcase.parse_function("func($request, 12 3)"),
|
||||
{'func_name': 'func', 'args': ["$request", '12 3'], 'kwargs': {}}
|
||||
)
|
||||
|
||||
|
||||
def test_parse_content_with_bindings_variables(self):
|
||||
variables = {
|
||||
@@ -486,7 +251,7 @@ class TestcaseParserUnittest(unittest.TestCase):
|
||||
"123str_value1/456"
|
||||
)
|
||||
|
||||
with self.assertRaises(ParamsError):
|
||||
with self.assertRaises(exceptions.ParamsError):
|
||||
testcase_parser.eval_content_with_bindings("$str_3")
|
||||
|
||||
self.assertEqual(
|
||||
@@ -596,7 +361,7 @@ class TestcaseParserUnittest(unittest.TestCase):
|
||||
def test_eval_content_functions_search_upward(self):
|
||||
testcase_parser = testcase.TestcaseParser()
|
||||
|
||||
with self.assertRaises(ParamsError):
|
||||
with self.assertRaises(exceptions.ParamsError):
|
||||
testcase_parser._eval_content_functions("/api/${gen_md5(abc)}")
|
||||
|
||||
testcase_parser.file_path = "tests/data/demo_testset_hardcode.yml"
|
||||
@@ -649,105 +414,6 @@ class TestcaseParserUnittest(unittest.TestCase):
|
||||
3
|
||||
)
|
||||
|
||||
|
||||
def test_substitute_variables_with_mapping(self):
|
||||
content = {
|
||||
'request': {
|
||||
'url': '/api/users/$uid',
|
||||
'method': "$method",
|
||||
'headers': {'token': '$token'},
|
||||
'data': {
|
||||
"null": None,
|
||||
"true": True,
|
||||
"false": False,
|
||||
"empty_str": ""
|
||||
}
|
||||
}
|
||||
}
|
||||
mapping = {
|
||||
"$uid": 1000,
|
||||
"$method": "POST"
|
||||
}
|
||||
result = testcase.substitute_variables_with_mapping(content, mapping)
|
||||
self.assertEqual("/api/users/1000", result["request"]["url"])
|
||||
self.assertEqual("$token", result["request"]["headers"]["token"])
|
||||
self.assertEqual("POST", result["request"]["method"])
|
||||
self.assertIsNone(result["request"]["data"]["null"])
|
||||
self.assertTrue(result["request"]["data"]["true"])
|
||||
self.assertFalse(result["request"]["data"]["false"])
|
||||
self.assertEqual("", result["request"]["data"]["empty_str"])
|
||||
|
||||
|
||||
def test_parse_validator(self):
|
||||
validator = {"check": "status_code", "comparator": "eq", "expect": 201}
|
||||
self.assertEqual(
|
||||
testcase.parse_validator(validator),
|
||||
{"check": "status_code", "comparator": "eq", "expect": 201}
|
||||
)
|
||||
|
||||
validator = {'eq': ['status_code', 201]}
|
||||
self.assertEqual(
|
||||
testcase.parse_validator(validator),
|
||||
{"check": "status_code", "comparator": "eq", "expect": 201}
|
||||
)
|
||||
|
||||
def test_merge_validator(self):
|
||||
def_validators = [
|
||||
{'eq': ['v1', 200]},
|
||||
{"check": "s2", "expect": 16, "comparator": "len_eq"}
|
||||
]
|
||||
current_validators = [
|
||||
{"check": "v1", "expect": 201},
|
||||
{'len_eq': ['s3', 12]}
|
||||
]
|
||||
|
||||
merged_validators = testcase._merge_validator(def_validators, current_validators)
|
||||
self.assertIn(
|
||||
{"check": "v1", "expect": 201, "comparator": "eq"},
|
||||
merged_validators
|
||||
)
|
||||
self.assertIn(
|
||||
{"check": "s2", "expect": 16, "comparator": "len_eq"},
|
||||
merged_validators
|
||||
)
|
||||
self.assertIn(
|
||||
{"check": "s3", "expect": 12, "comparator": "len_eq"},
|
||||
merged_validators
|
||||
)
|
||||
|
||||
def test_merge_validator_with_dict(self):
|
||||
def_validators = [
|
||||
{'eq': ["a", {"v": 1}]},
|
||||
{'eq': [{"b": 1}, 200]}
|
||||
]
|
||||
current_validators = [
|
||||
{'len_eq': ['s3', 12]},
|
||||
{'eq': [{"b": 1}, 201]}
|
||||
]
|
||||
|
||||
merged_validators = testcase._merge_validator(def_validators, current_validators)
|
||||
self.assertEqual(len(merged_validators), 3)
|
||||
self.assertIn({'check': {'b': 1}, 'expect': 201, 'comparator': 'eq'}, merged_validators)
|
||||
self.assertNotIn({'check': {'b': 1}, 'expect': 200, 'comparator': 'eq'}, merged_validators)
|
||||
|
||||
def test_merge_extractor(self):
|
||||
api_extrators = [{"var1": "val1"}, {"var2": "val2"}]
|
||||
current_extractors = [{"var1": "val111"}, {"var3": "val3"}]
|
||||
|
||||
merged_extractors = testcase._merge_extractor(api_extrators, current_extractors)
|
||||
self.assertIn(
|
||||
{"var1": "val111"},
|
||||
merged_extractors
|
||||
)
|
||||
self.assertIn(
|
||||
{"var2": "val2"},
|
||||
merged_extractors
|
||||
)
|
||||
self.assertIn(
|
||||
{"var3": "val3"},
|
||||
merged_extractors
|
||||
)
|
||||
|
||||
def test_is_testsets(self):
|
||||
data_structure = "path/to/file"
|
||||
self.assertFalse(testcase.is_testsets(data_structure))
|
||||
|
||||
@@ -61,6 +61,90 @@ class TestUtils(ApiServerUnittest):
|
||||
result = utils.query_json(json_content, query)
|
||||
self.assertEqual(result, "L")
|
||||
|
||||
def test_substitute_variables_with_mapping(self):
|
||||
content = {
|
||||
'request': {
|
||||
'url': '/api/users/$uid',
|
||||
'method': "$method",
|
||||
'headers': {'token': '$token'},
|
||||
'data': {
|
||||
"null": None,
|
||||
"true": True,
|
||||
"false": False,
|
||||
"empty_str": ""
|
||||
}
|
||||
}
|
||||
}
|
||||
mapping = {
|
||||
"$uid": 1000,
|
||||
"$method": "POST"
|
||||
}
|
||||
result = utils.substitute_variables_with_mapping(content, mapping)
|
||||
self.assertEqual("/api/users/1000", result["request"]["url"])
|
||||
self.assertEqual("$token", result["request"]["headers"]["token"])
|
||||
self.assertEqual("POST", result["request"]["method"])
|
||||
self.assertIsNone(result["request"]["data"]["null"])
|
||||
self.assertTrue(result["request"]["data"]["true"])
|
||||
self.assertFalse(result["request"]["data"]["false"])
|
||||
self.assertEqual("", result["request"]["data"]["empty_str"])
|
||||
|
||||
def test_merge_validator(self):
|
||||
def_validators = [
|
||||
{'eq': ['v1', 200]},
|
||||
{"check": "s2", "expect": 16, "comparator": "len_eq"}
|
||||
]
|
||||
current_validators = [
|
||||
{"check": "v1", "expect": 201},
|
||||
{'len_eq': ['s3', 12]}
|
||||
]
|
||||
|
||||
merged_validators = utils._merge_validator(def_validators, current_validators)
|
||||
self.assertIn(
|
||||
{"check": "v1", "expect": 201, "comparator": "eq"},
|
||||
merged_validators
|
||||
)
|
||||
self.assertIn(
|
||||
{"check": "s2", "expect": 16, "comparator": "len_eq"},
|
||||
merged_validators
|
||||
)
|
||||
self.assertIn(
|
||||
{"check": "s3", "expect": 12, "comparator": "len_eq"},
|
||||
merged_validators
|
||||
)
|
||||
|
||||
def test_merge_validator_with_dict(self):
|
||||
def_validators = [
|
||||
{'eq': ["a", {"v": 1}]},
|
||||
{'eq': [{"b": 1}, 200]}
|
||||
]
|
||||
current_validators = [
|
||||
{'len_eq': ['s3', 12]},
|
||||
{'eq': [{"b": 1}, 201]}
|
||||
]
|
||||
|
||||
merged_validators = utils._merge_validator(def_validators, current_validators)
|
||||
self.assertEqual(len(merged_validators), 3)
|
||||
self.assertIn({'check': {'b': 1}, 'expect': 201, 'comparator': 'eq'}, merged_validators)
|
||||
self.assertNotIn({'check': {'b': 1}, 'expect': 200, 'comparator': 'eq'}, merged_validators)
|
||||
|
||||
def test_merge_extractor(self):
|
||||
api_extrators = [{"var1": "val1"}, {"var2": "val2"}]
|
||||
current_extractors = [{"var1": "val111"}, {"var3": "val3"}]
|
||||
|
||||
merged_extractors = utils._merge_extractor(api_extrators, current_extractors)
|
||||
self.assertIn(
|
||||
{"var1": "val111"},
|
||||
merged_extractors
|
||||
)
|
||||
self.assertIn(
|
||||
{"var2": "val2"},
|
||||
merged_extractors
|
||||
)
|
||||
self.assertIn(
|
||||
{"var3": "val3"},
|
||||
merged_extractors
|
||||
)
|
||||
|
||||
def test_get_uniform_comparator(self):
|
||||
self.assertEqual(utils.get_uniform_comparator("eq"), "equals")
|
||||
self.assertEqual(utils.get_uniform_comparator("=="), "equals")
|
||||
|
||||
Reference in New Issue
Block a user