Merge branch 'master' into bugfix

This commit is contained in:
debugtalk
2018-08-09 14:43:17 +08:00
committed by GitHub
21 changed files with 1519 additions and 1294 deletions

View File

@@ -1,7 +1,7 @@
__title__ = 'HttpRunner'
__description__ = 'One-stop solution for HTTP(S) testing.'
__url__ = 'https://github.com/HttpRunner/HttpRunner'
__version__ = '1.5.9'
__version__ = '1.5.10'
__author__ = 'debugtalk'
__author_email__ = 'mail@debugtalk.com'
__license__ = 'MIT'

View File

@@ -2,11 +2,288 @@
import copy
import os
import random
import re
import sys
from httprunner import built_in, exceptions, logger, parser, testcase, utils
from httprunner.compat import OrderedDict
from httprunner import built_in, exceptions, loader, logger, parser, utils
from httprunner.compat import OrderedDict, basestring, builtin_str, str
def parse_parameters(parameters, testset_path=None):
""" parse parameters and generate cartesian product.
Args:
parameters (list) parameters: parameter name and value in list
parameter value may be in three types:
(1) data list, e.g. ["iOS/10.1", "iOS/10.2", "iOS/10.3"]
(2) call built-in parameterize function, "${parameterize(account.csv)}"
(3) call custom function in debugtalk.py, "${gen_app_version()}"
testset_path (str): testset file path, used for locating csv file and debugtalk.py
Returns:
list: cartesian product list
Examples:
>>> parameters = [
{"user_agent": ["iOS/10.1", "iOS/10.2", "iOS/10.3"]},
{"username-password": "${parameterize(account.csv)}"},
{"app_version": "${gen_app_version()}"}
]
>>> parse_parameters(parameters)
"""
testcase_parser = TestcaseParser(file_path=testset_path)
parsed_parameters_list = []
for parameter in parameters:
parameter_name, parameter_content = list(parameter.items())[0]
parameter_name_list = parameter_name.split("-")
if isinstance(parameter_content, list):
# (1) data list
# e.g. {"app_version": ["2.8.5", "2.8.6"]}
# => [{"app_version": "2.8.5", "app_version": "2.8.6"}]
# e.g. {"username-password": [["user1", "111111"], ["test2", "222222"]}
# => [{"username": "user1", "password": "111111"}, {"username": "user2", "password": "222222"}]
parameter_content_list = []
for parameter_item in parameter_content:
if not isinstance(parameter_item, (list, tuple)):
# "2.8.5" => ["2.8.5"]
parameter_item = [parameter_item]
# ["app_version"], ["2.8.5"] => {"app_version": "2.8.5"}
# ["username", "password"], ["user1", "111111"] => {"username": "user1", "password": "111111"}
parameter_content_dict = dict(zip(parameter_name_list, parameter_item))
parameter_content_list.append(parameter_content_dict)
else:
# (2) & (3)
parsed_parameter_content = testcase_parser.eval_content_with_bindings(parameter_content)
# e.g. [{'app_version': '2.8.5'}, {'app_version': '2.8.6'}]
# e.g. [{"username": "user1", "password": "111111"}, {"username": "user2", "password": "222222"}]
if not isinstance(parsed_parameter_content, list):
raise exceptions.ParamsError("parameters syntax error!")
parameter_content_list = [
# get subset by parameter name
{key: parameter_item[key] for key in parameter_name_list}
for parameter_item in parsed_parameter_content
]
parsed_parameters_list.append(parameter_content_list)
return utils.gen_cartesian_product(*parsed_parameters_list)
class TestcaseParser(object):
def __init__(self, variables={}, functions={}, file_path=None):
self.update_binded_variables(variables)
self.bind_functions(functions)
self.file_path = file_path
def update_binded_variables(self, variables):
""" bind variables to current testcase parser
@param (dict) variables, variables binds mapping
{
"authorization": "a83de0ff8d2e896dbd8efb81ba14e17d",
"random": "A2dEx",
"data": {"name": "user", "password": "123456"},
"uuid": 1000
}
"""
self.variables = variables
def bind_functions(self, functions):
""" bind functions to current testcase parser
@param (dict) functions, functions binds mapping
{
"add_two_nums": lambda a, b=1: a + b
}
"""
self.functions = functions
def _get_bind_item(self, item_type, item_name):
""" get specified function or variable.
Args:
item_type(str): functions or variables
item_name(str): function name or variable name
Returns:
object: specified function or variable object.
"""
if item_type == "functions":
if item_name in self.functions:
return self.functions[item_name]
try:
# check if builtin functions
item_func = eval(item_name)
if callable(item_func):
# is builtin function
return item_func
except (NameError, TypeError):
# is not builtin function, continue to search
pass
else:
# item_type == "variables":
if item_name in self.variables:
return self.variables[item_name]
debugtalk_module = loader.load_debugtalk_module(self.file_path)
return loader.get_module_item(debugtalk_module, item_type, item_name)
def get_bind_function(self, func_name):
return self._get_bind_item("functions", func_name)
def get_bind_variable(self, variable_name):
return self._get_bind_item("variables", variable_name)
def load_csv_list(self, csv_file_name, fetch_method="Sequential"):
""" locate csv file and load csv content.
Args:
csv_file_name (str): csv file name
fetch_method (str): fetch data method, defaults to Sequential.
If set to "random", csv data list will be reordered in random.
Returns:
list: csv data list
"""
csv_file_path = loader.locate_file(self.file_path, csv_file_name)
csv_content_list = loader.load_file(csv_file_path)
if fetch_method.lower() == "random":
random.shuffle(csv_content_list)
return csv_content_list
def _eval_content_functions(self, content):
functions_list = parser.extract_functions(content)
for func_content in functions_list:
function_meta = parser.parse_function(func_content)
func_name = function_meta['func_name']
args = function_meta.get('args', [])
kwargs = function_meta.get('kwargs', {})
args = self.eval_content_with_bindings(args)
kwargs = self.eval_content_with_bindings(kwargs)
if func_name in ["parameterize", "P"]:
eval_value = self.load_csv_list(*args, **kwargs)
else:
func = self.get_bind_function(func_name)
eval_value = func(*args, **kwargs)
func_content = "${" + func_content + "}"
if func_content == content:
# content is a variable
content = eval_value
else:
# content contains one or many variables
content = content.replace(
func_content,
str(eval_value), 1
)
return content
def _eval_content_variables(self, content):
""" replace all variables of string content with mapping value.
@param (str) content
@return (str) parsed content
e.g.
variable_mapping = {
"var_1": "abc",
"var_2": "def"
}
$var_1 => "abc"
$var_1#XYZ => "abc#XYZ"
/$var_1/$var_2/var3 => "/abc/def/var3"
${func($var_1, $var_2, xyz)} => "${func(abc, def, xyz)}"
"""
variables_list = parser.extract_variables(content)
for variable_name in variables_list:
variable_value = self.get_bind_variable(variable_name)
if "${}".format(variable_name) == content:
# content is a variable
content = variable_value
else:
# content contains one or several variables
if not isinstance(variable_value, str):
variable_value = builtin_str(variable_value)
content = content.replace(
"${}".format(variable_name),
variable_value, 1
)
return content
def eval_content_with_bindings(self, content):
""" parse content recursively, each variable and function in content will be evaluated.
@param (dict) content in any data structure
{
"url": "http://127.0.0.1:5000/api/users/$uid/${add_two_nums(1, 1)}",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"authorization": "$authorization",
"random": "$random",
"sum": "${add_two_nums(1, 2)}"
},
"body": "$data"
}
@return (dict) parsed content with evaluated bind values
{
"url": "http://127.0.0.1:5000/api/users/1000/2",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"authorization": "a83de0ff8d2e896dbd8efb81ba14e17d",
"random": "A2dEx",
"sum": 3
},
"body": {"name": "user", "password": "123456"}
}
"""
if content is None:
return None
if isinstance(content, (list, tuple)):
return [
self.eval_content_with_bindings(item)
for item in content
]
if isinstance(content, dict):
evaluated_data = {}
for key, value in content.items():
eval_key = self.eval_content_with_bindings(key)
eval_value = self.eval_content_with_bindings(value)
evaluated_data[eval_key] = eval_value
return evaluated_data
if isinstance(content, basestring):
# content is in string format here
content = content.strip()
# replace functions with evaluated value
# Notice: _eval_content_functions must be called before _eval_content_variables
content = self._eval_content_functions(content)
# replace variables with binding value
content = self._eval_content_variables(content)
return content
class Context(object):
@@ -16,7 +293,7 @@ class Context(object):
def __init__(self):
self.testset_shared_variables_mapping = OrderedDict()
self.testcase_variables_mapping = OrderedDict()
self.testcase_parser = testcase.TestcaseParser()
self.testcase_parser = TestcaseParser()
self.evaluated_validators = []
self.init_context()
@@ -69,11 +346,9 @@ class Context(object):
def import_module_items(self, imported_module):
""" import module functions and variables and bind to testset context
"""
imported_functions_dict = utils.filter_module(imported_module, "function")
self.__update_context_functions_config("testset", imported_functions_dict)
imported_variables_dict = utils.filter_module(imported_module, "variable")
self.bind_variables(imported_variables_dict, "testset")
module_mapping = loader.load_python_module(imported_module)
self.__update_context_functions_config("testset", module_mapping["functions"])
self.bind_variables(module_mapping["variables"], "testset")
def bind_variables(self, variables, level="testcase"):
""" bind variables to testset context or current testcase context.
@@ -178,7 +453,7 @@ class Context(object):
if isinstance(check_item, (dict, list)) \
or parser.extract_variables(check_item) \
or testcase.extract_functions(check_item):
or parser.extract_functions(check_item):
# format 1/2/3
check_value = self.eval_content(check_item)
else:

View File

@@ -50,8 +50,5 @@ class VariableNotFound(NotFoundError):
class ApiNotFound(NotFoundError):
pass
class SuiteNotFound(NotFoundError):
pass
class TestcaseNotFound(NotFoundError):
pass

View File

@@ -1,10 +1,13 @@
import collections
import csv
import importlib
import io
import json
import os
import yaml
from httprunner import exceptions, logger, parser, utils
from httprunner import exceptions, logger, parser, validator
from httprunner.compat import OrderedDict
###############################################################################
## file loader
@@ -136,24 +139,190 @@ def load_folder_files(folder_path, recursive=True):
def load_dot_env_file(path):
""" load .env file and set to os.environ
""" load .env file
"""
if not path:
path = os.path.join(os.getcwd(), ".env")
if not os.path.isfile(path):
logger.log_debug(".env file not exist: {}".format(path))
return
return {}
else:
if not os.path.isfile(path):
raise exceptions.FileNotFound("env file not exist: {}".format(path))
logger.log_info("Loading environment variables from {}".format(path))
env_variables_mapping = {}
with io.open(path, 'r', encoding='utf-8') as fp:
for line in fp:
variable, value = line.split("=")
variable = variable.strip()
os.environ[variable] = value.strip()
logger.log_debug("Loaded variable: {}".format(variable))
if "=" in line:
variable, value = line.split("=")
elif ":" in line:
variable, value = line.split(":")
else:
raise exceptions.FileFormatError(".env format error")
env_variables_mapping[variable.strip()] = value.strip()
return env_variables_mapping
def locate_file(start_path, file_name):
""" locate filename and return file path.
searching will be recursive upward until current working directory.
Args:
start_path (str): start locating path, maybe file path or directory path
Returns:
str: located file path. None if file not found.
Raises:
exceptions.FileNotFound: If failed to locate file.
"""
if os.path.isfile(start_path):
start_dir_path = os.path.dirname(start_path)
elif os.path.isdir(start_path):
start_dir_path = start_path
else:
raise exceptions.FileNotFound("invalid path: {}".format(start_path))
file_path = os.path.join(start_dir_path, file_name)
if os.path.isfile(file_path):
if os.path.isabs(file_path):
file_path = file_path[len(os.getcwd())+1:]
return file_path
# current working directory
if os.path.abspath(start_dir_path) == os.getcwd():
raise exceptions.FileNotFound("{} not found in {}".format(file_name, start_path))
# locate recursive upward
return locate_file(os.path.dirname(start_dir_path), file_name)
###############################################################################
## debugtalk.py module loader
###############################################################################
def convert_module_name(python_file_path):
""" convert python file relative path to module name.
Args:
python_file_path (str): python file relative path
Returns:
str: module name
Examples:
>>> convert_module_name("debugtalk.py")
debugtalk
>>> convert_module_name("tests/debugtalk.py")
tests.debugtalk
>>> convert_module_name("tests/data/debugtalk.py")
tests.data.debugtalk
"""
module_name = python_file_path.replace("/", ".").rstrip(".py")
return module_name
def load_python_module(module):
""" load python module.
Args:
module: python module
Returns:
dict: variables and functions mapping for specified python module
{
"variables": {},
"functions": {}
}
"""
debugtalk_module = {
"variables": {},
"functions": {}
}
for name, item in vars(module).items():
if validator.is_function((name, item)):
debugtalk_module["functions"][name] = item
elif validator.is_variable((name, item)):
debugtalk_module["variables"][name] = item
else:
pass
return debugtalk_module
def load_debugtalk_module(start_path=None):
""" load debugtalk.py module.
Args:
start_path (str, optional): start locating path, maybe file path or directory path.
Defaults to current working directory.
Returns:
dict: variables and functions mapping for debugtalk.py
{
"variables": {},
"functions": {}
}
"""
start_path = start_path or os.getcwd()
try:
module_path = locate_file(start_path, "debugtalk.py")
module_name = convert_module_name(module_path)
except exceptions.FileNotFound:
return {
"variables": {},
"functions": {}
}
imported_module = importlib.import_module(module_name)
return load_python_module(imported_module)
def get_module_item(module_mapping, item_type, item_name):
""" get expected function or variable from module mapping.
Args:
module_mapping(dict): module mapping with variables and functions.
{
"variables": {},
"functions": {}
}
item_type(str): "functions" or "variables"
item_name(str): function name or variable name
Returns:
object: specified variable or function object.
Raises:
exceptions.FunctionNotFound: If specified function not found in module mapping
exceptions.VariableNotFound: If specified variable not found in module mapping
"""
try:
return module_mapping[item_type][item_name]
except KeyError:
err_msg = "{} not found in debugtalk.py module!\n".format(item_name)
err_msg += "module mapping: {}".format(module_mapping)
if item_type == "functions":
raise exceptions.FunctionNotFound(err_msg)
else:
raise exceptions.VariableNotFound(err_msg)
###############################################################################
@@ -168,7 +337,7 @@ overall_def_dict = {
testcases_cache_mapping = {}
def load_test_dependencies():
def _load_test_dependencies():
""" load all api and suite definitions.
default api folder is "$CWD/tests/api/".
default suite folder is "$CWD/tests/suite/".
@@ -177,12 +346,12 @@ def load_test_dependencies():
# load api definitions
api_def_folder = os.path.join(os.getcwd(), "tests", "api")
for test_file in load_folder_files(api_def_folder):
load_api_file(test_file)
_load_api_file(test_file)
# load suite definitions
suite_def_folder = os.path.join(os.getcwd(), "tests", "suite")
for suite_file in load_folder_files(suite_def_folder):
suite = load_test_file(suite_file)
suite = _load_test_file(suite_file)
if "def" not in suite["config"]:
raise exceptions.ParamsError("def missed in suite file: {}!".format(suite_file))
@@ -192,7 +361,7 @@ def load_test_dependencies():
overall_def_dict["suite"][function_meta["func_name"]] = suite
def load_api_file(file_path):
def _load_api_file(file_path):
""" load api definition from file and store in overall_def_dict["api"]
api file should be in format below:
[
@@ -235,7 +404,7 @@ def load_api_file(file_path):
overall_def_dict["api"][func_name] = api_dict
def load_test_file(file_path):
def _load_test_file(file_path):
""" load testcase file or testsuite file
@param file_path: absolute valid file path
file_path should be in format below:
@@ -289,7 +458,7 @@ def load_test_file(file_path):
if "api" in test_block:
ref_call = test_block["api"]
def_block = _get_block_by_name(ref_call, "api")
utils._override_block(def_block, test_block)
_override_block(def_block, test_block)
testset["testcases"].append(test_block)
elif "suite" in test_block:
ref_call = test_block["suite"]
@@ -329,15 +498,15 @@ def _get_block_by_name(ref_call, ref_type):
args_mapping[item] = call_args[index]
if args_mapping:
block = utils.substitute_variables_with_mapping(block, args_mapping)
block = parser.parse_data(block, args_mapping)
return block
def _get_test_definition(name, ref_type):
""" get expected api or suite.
""" get expected api or testcase.
@params:
name: api or suite name
name: api or testcase name
ref_type: "api" or "suite"
@return
expected api info if found, otherwise raise ApiNotFound exception
@@ -350,11 +519,152 @@ def _get_test_definition(name, ref_type):
raise exceptions.ApiNotFound(err_msg)
else:
# ref_type == "suite":
raise exceptions.SuiteNotFound(err_msg)
raise exceptions.TestcaseNotFound(err_msg)
return block
def _override_block(def_block, current_block):
""" override def_block with current_block
@param def_block:
{
"name": "get token",
"request": {...},
"validate": [{'eq': ['status_code', 200]}]
}
@param current_block:
{
"name": "get token",
"extract": [{"token": "content.token"}],
"validate": [{'eq': ['status_code', 201]}, {'len_eq': ['content.token', 16]}]
}
@return
{
"name": "get token",
"request": {...},
"extract": [{"token": "content.token"}],
"validate": [{'eq': ['status_code', 201]}, {'len_eq': ['content.token', 16]}]
}
"""
def_validators = def_block.get("validate") or def_block.get("validators", [])
current_validators = current_block.get("validate") or current_block.get("validators", [])
def_extrators = def_block.get("extract") \
or def_block.get("extractors") \
or def_block.get("extract_binds", [])
current_extractors = current_block.get("extract") \
or current_block.get("extractors") \
or current_block.get("extract_binds", [])
current_block.update(def_block)
current_block["validate"] = _merge_validator(
def_validators,
current_validators
)
current_block["extract"] = _merge_extractor(
def_extrators,
current_extractors
)
def _get_validators_mapping(validators):
""" get validators mapping from api or test validators
@param (list) validators:
[
{"check": "v1", "expect": 201, "comparator": "eq"},
{"check": {"b": 1}, "expect": 200, "comparator": "eq"}
]
@return
{
("v1", "eq"): {"check": "v1", "expect": 201, "comparator": "eq"},
('{"b": 1}', "eq"): {"check": {"b": 1}, "expect": 200, "comparator": "eq"}
}
"""
validators_mapping = {}
for validator in validators:
validator = parser.parse_validator(validator)
if not isinstance(validator["check"], collections.Hashable):
check = json.dumps(validator["check"])
else:
check = validator["check"]
key = (check, validator["comparator"])
validators_mapping[key] = validator
return validators_mapping
def _merge_validator(def_validators, current_validators):
""" merge def_validators with current_validators
@params:
def_validators: [{'eq': ['v1', 200]}, {"check": "s2", "expect": 16, "comparator": "len_eq"}]
current_validators: [{"check": "v1", "expect": 201}, {'len_eq': ['s3', 12]}]
@return:
[
{"check": "v1", "expect": 201, "comparator": "eq"},
{"check": "s2", "expect": 16, "comparator": "len_eq"},
{"check": "s3", "expect": 12, "comparator": "len_eq"}
]
"""
if not def_validators:
return current_validators
elif not current_validators:
return def_validators
else:
api_validators_mapping = _get_validators_mapping(def_validators)
test_validators_mapping = _get_validators_mapping(current_validators)
api_validators_mapping.update(test_validators_mapping)
return list(api_validators_mapping.values())
def _merge_extractor(def_extrators, current_extractors):
""" merge def_extrators with current_extractors
@params:
def_extrators: [{"var1": "val1"}, {"var2": "val2"}]
current_extractors: [{"var1": "val111"}, {"var3": "val3"}]
@return:
[
{"var1": "val111"},
{"var2": "val2"},
{"var3": "val3"}
]
"""
if not def_extrators:
return current_extractors
elif not current_extractors:
return def_extrators
else:
extractor_dict = OrderedDict()
for api_extrator in def_extrators:
if len(api_extrator) != 1:
logger.log_warning("incorrect extractor: {}".format(api_extrator))
continue
var_name = list(api_extrator.keys())[0]
extractor_dict[var_name] = api_extrator[var_name]
for test_extrator in current_extractors:
if len(test_extrator) != 1:
logger.log_warning("incorrect extractor: {}".format(test_extrator))
continue
var_name = list(test_extrator.keys())[0]
extractor_dict[var_name] = test_extrator[var_name]
extractor_list = []
for key, value in extractor_dict.items():
extractor_list.append({key: value})
return extractor_list
def load_testcases(path):
""" load testcases from file path
@param path: path could be in several type
@@ -390,7 +700,7 @@ def load_testcases(path):
elif os.path.isfile(path):
try:
testcase = load_test_file(path)
testcase = _load_test_file(path)
if testcase["testcases"]:
testcases_list = [testcase]
else:
@@ -399,9 +709,21 @@ def load_testcases(path):
testcases_list = []
else:
err_msg = "file not found: {}".format(path)
err_msg = "path not exist: {}".format(path)
logger.log_error(err_msg)
raise exceptions.FileNotFound(err_msg)
testcases_cache_mapping[path] = testcases_list
return testcases_list
def load(path):
""" main interface for loading testcases
@param (str) path: testcase file/folder path
@return (list) testcases list
"""
if validator.is_testcases(path):
return path
_load_test_dependencies()
return load_testcases(path)

View File

@@ -40,9 +40,8 @@ def gen_locustfile(testcase_file_path):
"templates",
"locustfile_template"
)
loader.load_test_dependencies()
testset = loader.load_test_file(testcase_file_path)
host = testset.get("config", {}).get("request", {}).get("base_url", "")
testcases = loader.load(testcase_file_path)
host = testcases[0].get("config", {}).get("request", {}).get("base_url", "")
with io.open(template_path, encoding='utf-8') as template:
with io.open(locustfile_path, 'w', encoding='utf-8') as locustfile:

View File

@@ -1,9 +1,14 @@
# encoding: utf-8
import ast
import os
import re
from httprunner import exceptions
from httprunner.compat import builtin_str, numeric_types, str
variable_regexp = r"\$([\w_]+)"
function_regexp = r"\$\{([\w_]+\([\$\w\.\-_ =,]*\))\}"
function_regexp_compile = re.compile(r"^([\w_]+)\(([\$\w\.\-_ =,]*)\)$")
@@ -25,13 +30,26 @@ def parse_string_value(str_value):
def extract_variables(content):
""" extract all variable names from content, which is in format $variable
@param (str) content
@return (list) variable name list
e.g. $variable => ["variable"]
/blog/$postid => ["postid"]
/$var1/$var2 => ["var1", "var2"]
abc => []
Args:
content (str): string content
Returns:
list: variables list extracted from string content
Examples:
>>> extract_variables("$variable")
["variable"]
>>> extract_variables("/blog/$postid")
["postid"]
>>> extract_variables("/$var1/$var2")
["var1", "var2"]
>>> extract_variables("abc")
[]
"""
# TODO: change variable notation from $var to {{var}}
try:
@@ -40,16 +58,69 @@ def extract_variables(content):
return []
def extract_functions(content):
""" extract all functions from string content, which are in format ${fun()}
Args:
content (str): string content
Returns:
list: functions list extracted from string content
Examples:
>>> extract_functions("${func(5)}")
["func(5)"]
>>> extract_functions("${func(a=1, b=2)}")
["func(a=1, b=2)"]
>>> extract_functions("/api/1000?_t=${get_timestamp()}")
["get_timestamp()"]
>>> extract_functions("/api/${add(1, 2)}")
["add(1, 2)"]
>>> extract_functions("/api/${add(1, 2)}?_t=${get_timestamp()}")
["add(1, 2)", "get_timestamp()"]
"""
try:
return re.findall(function_regexp, content)
except TypeError:
return []
def parse_function(content):
""" parse function name and args from string content.
@param (str) content
@return (dict) function name and args
e.g. func() => {'func_name': 'func', 'args': [], 'kwargs': {}}
func(5) => {'func_name': 'func', 'args': [5], 'kwargs': {}}
func(1, 2) => {'func_name': 'func', 'args': [1, 2], 'kwargs': {}}
func(a=1, b=2) => {'func_name': 'func', 'args': [], 'kwargs': {'a': 1, 'b': 2}}
func(1, 2, a=3, b=4) => {'func_name': 'func', 'args': [1, 2], 'kwargs': {'a':3, 'b':4}}
Args:
content (str): string content
Returns:
dict: function meta dict
{
"func_name": "xxx",
"args": [],
"kwargs": {}
}
Examples:
>>> parse_function("func()")
{'func_name': 'func', 'args': [], 'kwargs': {}}
>>> parse_function("func(5)")
{'func_name': 'func', 'args': [5], 'kwargs': {}}
>>> parse_function("func(1, 2)")
{'func_name': 'func', 'args': [1, 2], 'kwargs': {}}
>>> parse_function("func(a=1, b=2)")
{'func_name': 'func', 'args': [], 'kwargs': {'a': 1, 'b': 2}}
>>> parse_function("func(1, 2, a=3, b=4)")
{'func_name': 'func', 'args': [1, 2], 'kwargs': {'a':3, 'b':4}}
"""
matched = function_regexp_compile.match(content)
if not matched:
@@ -127,3 +198,55 @@ def parse_validator(validator):
"expect": expect_value,
"comparator": comparator
}
def parse_data(content, mapping):
""" substitute variables in content with mapping
e.g.
@params
content = {
'request': {
'url': '/api/users/$uid',
'headers': {'token': '$token'}
}
}
mapping = {"$uid": 1000}
@return
{
'request': {
'url': '/api/users/1000',
'headers': {'token': '$token'}
}
}
"""
# TODO: refactor type check
# TODO: combine this with TestcaseParser
if content is None or isinstance(content, (numeric_types, bool, type)):
return content
if isinstance(content, (list, set, tuple)):
return [
parse_data(item, mapping)
for item in content
]
if isinstance(content, dict):
substituted_data = {}
for key, value in content.items():
eval_key = parse_data(key, mapping)
eval_value = parse_data(value, mapping)
substituted_data[eval_key] = eval_value
return substituted_data
# content is in string format here
for var, value in mapping.items():
if content == var:
# content is a variable
content = value
else:
if not isinstance(value, str):
value = builtin_str(value)
content = content.replace(var, value)
return content

View File

@@ -3,7 +3,7 @@
import json
import re
from httprunner import exceptions, logger, testcase, utils
from httprunner import exceptions, logger, utils
from httprunner.compat import OrderedDict, basestring, is_py2
from requests.models import PreparedRequest
from requests.structures import CaseInsensitiveDict

View File

@@ -4,7 +4,7 @@ import copy
import sys
import unittest
from httprunner import exceptions, loader, logger, runner, testcase, utils
from httprunner import context, exceptions, loader, logger, runner, utils
from httprunner.compat import is_py3
from httprunner.report import (HtmlTestResult, get_platform, get_summary,
render_html_report)
@@ -78,7 +78,7 @@ class TestSuite(unittest.TestSuite):
config_dict_variables,
config_dict_parameters
)
self.testcase_parser = testcase.TestcaseParser()
self.testcase_parser = context.TestcaseParser()
testcases = testset.get("testcases", [])
for config_variables in config_parametered_variables_list:
@@ -114,7 +114,7 @@ class TestSuite(unittest.TestSuite):
def _get_parametered_variables(self, variables, parameters):
""" parameterize varaibles with parameters
"""
cartesian_product_parameters = testcase.parse_parameters(
cartesian_product_parameters = context.parse_parameters(
parameters,
self.testset_file_path
) or [{}]
@@ -177,11 +177,7 @@ def init_test_suites(path_or_testsets, mapping=None, http_client_session=None):
mapping (dict):
passed in variables mapping, it will override variables in config block
"""
if not testcase.is_testsets(path_or_testsets):
loader.load_test_dependencies()
testsets = loader.load_testcases(path_or_testsets)
else:
testsets = path_or_testsets
testsets = loader.load(path_or_testsets)
# TODO: move comparator uniform here
mapping = mapping or {}
@@ -210,7 +206,7 @@ class HttpRunner(object):
- dot_env_path: .env file path
"""
dot_env_path = kwargs.pop("dot_env_path", None)
loader.load_dot_env_file(dot_env_path)
utils.set_os_environ(loader.load_dot_env_file(dot_env_path))
kwargs.setdefault("resultclass", HtmlTestResult)
self.runner = unittest.TextTestRunner(**kwargs)

View File

@@ -1,363 +0,0 @@
# encoding: utf-8
import io
import itertools
import json
import os
import random
import re
from httprunner import exceptions, loader, logger, parser, utils
from httprunner.compat import (OrderedDict, basestring, builtin_str,
numeric_types, str)
function_regexp = r"\$\{([\w_]+\([\$\w\.\-_ =,]*\))\}"
def extract_functions(content):
""" extract all functions from string content, which are in format ${fun()}
@param (str) content
@return (list) functions list
e.g. ${func(5)} => ["func(5)"]
${func(a=1, b=2)} => ["func(a=1, b=2)"]
/api/1000?_t=${get_timestamp()} => ["get_timestamp()"]
/api/${add(1, 2)} => ["add(1, 2)"]
"/api/${add(1, 2)}?_t=${get_timestamp()}" => ["add(1, 2)", "get_timestamp()"]
"""
try:
return re.findall(function_regexp, content)
except TypeError:
return []
def is_testset(data_structure):
""" check if data_structure is a testset
testset should always be in the following data structure:
{
"name": "desc1",
"config": {},
"api": {},
"testcases": [testcase11, testcase12]
}
"""
if not isinstance(data_structure, dict):
return False
if "name" not in data_structure or "testcases" not in data_structure:
return False
if not isinstance(data_structure["testcases"], list):
return False
return True
def is_testsets(data_structure):
""" check if data_structure is testset or testsets
testsets should always be in the following data structure:
testset_dict
or
[
testset_dict_1,
testset_dict_2
]
"""
if not isinstance(data_structure, list):
return is_testset(data_structure)
for item in data_structure:
if not is_testset(item):
return False
return True
def gen_cartesian_product(*args):
""" generate cartesian product for lists
@param
(list) args
[{"a": 1}, {"a": 2}],
[
{"x": 111, "y": 112},
{"x": 121, "y": 122}
]
@return
cartesian product in list
[
{'a': 1, 'x': 111, 'y': 112},
{'a': 1, 'x': 121, 'y': 122},
{'a': 2, 'x': 111, 'y': 112},
{'a': 2, 'x': 121, 'y': 122}
]
"""
if not args:
return []
elif len(args) == 1:
return args[0]
product_list = []
for product_item_tuple in itertools.product(*args):
product_item_dict = {}
for item in product_item_tuple:
product_item_dict.update(item)
product_list.append(product_item_dict)
return product_list
def parse_parameters(parameters, testset_path=None):
""" parse parameters and generate cartesian product
@params
(list) parameters: parameter name and value in list
parameter value may be in three types:
(1) data list
(2) call built-in parameterize function
(3) call custom function in debugtalk.py
e.g.
[
{"user_agent": ["iOS/10.1", "iOS/10.2", "iOS/10.3"]},
{"username-password": "${parameterize(account.csv)}"},
{"app_version": "${gen_app_version()}"}
]
(str) testset_path: testset file path, used for locating csv file and debugtalk.py
@return cartesian product in list
"""
testcase_parser = TestcaseParser(file_path=testset_path)
parsed_parameters_list = []
for parameter in parameters:
parameter_name, parameter_content = list(parameter.items())[0]
parameter_name_list = parameter_name.split("-")
if isinstance(parameter_content, list):
# (1) data list
# e.g. {"app_version": ["2.8.5", "2.8.6"]}
# => [{"app_version": "2.8.5", "app_version": "2.8.6"}]
# e.g. {"username-password": [["user1", "111111"], ["test2", "222222"]}
# => [{"username": "user1", "password": "111111"}, {"username": "user2", "password": "222222"}]
parameter_content_list = []
for parameter_item in parameter_content:
if not isinstance(parameter_item, (list, tuple)):
# "2.8.5" => ["2.8.5"]
parameter_item = [parameter_item]
# ["app_version"], ["2.8.5"] => {"app_version": "2.8.5"}
# ["username", "password"], ["user1", "111111"] => {"username": "user1", "password": "111111"}
parameter_content_dict = dict(zip(parameter_name_list, parameter_item))
parameter_content_list.append(parameter_content_dict)
else:
# (2) & (3)
parsed_parameter_content = testcase_parser.eval_content_with_bindings(parameter_content)
# e.g. [{'app_version': '2.8.5'}, {'app_version': '2.8.6'}]
# e.g. [{"username": "user1", "password": "111111"}, {"username": "user2", "password": "222222"}]
if not isinstance(parsed_parameter_content, list):
raise exceptions.ParamsError("parameters syntax error!")
parameter_content_list = [
# get subset by parameter name
{key: parameter_item[key] for key in parameter_name_list}
for parameter_item in parsed_parameter_content
]
parsed_parameters_list.append(parameter_content_list)
return gen_cartesian_product(*parsed_parameters_list)
class TestcaseParser(object):
def __init__(self, variables={}, functions={}, file_path=None):
self.update_binded_variables(variables)
self.bind_functions(functions)
self.file_path = file_path
def update_binded_variables(self, variables):
""" bind variables to current testcase parser
@param (dict) variables, variables binds mapping
{
"authorization": "a83de0ff8d2e896dbd8efb81ba14e17d",
"random": "A2dEx",
"data": {"name": "user", "password": "123456"},
"uuid": 1000
}
"""
self.variables = variables
def bind_functions(self, functions):
""" bind functions to current testcase parser
@param (dict) functions, functions binds mapping
{
"add_two_nums": lambda a, b=1: a + b
}
"""
self.functions = functions
def _get_bind_item(self, item_type, item_name):
if item_type == "function":
if item_name in self.functions:
return self.functions[item_name]
try:
# check if builtin functions
item_func = eval(item_name)
if callable(item_func):
# is builtin function
return item_func
except (NameError, TypeError):
# is not builtin function, continue to search
pass
elif item_type == "variable":
if item_name in self.variables:
return self.variables[item_name]
else:
raise exceptions.ParamsError("bind item should only be function or variable.")
try:
assert self.file_path is not None
return utils.search_conf_item(self.file_path, item_type, item_name)
except (AssertionError, exceptions.FunctionNotFound):
raise exceptions.ParamsError(
"{} is not defined in bind {}s!".format(item_name, item_type))
def get_bind_function(self, func_name):
return self._get_bind_item("function", func_name)
def get_bind_variable(self, variable_name):
return self._get_bind_item("variable", variable_name)
def parameterize(self, csv_file_name, fetch_method="Sequential"):
parameter_file_path = os.path.join(
os.path.dirname(self.file_path),
"{}".format(csv_file_name)
)
csv_content_list = loader.load_file(parameter_file_path)
if fetch_method.lower() == "random":
random.shuffle(csv_content_list)
return csv_content_list
def _eval_content_functions(self, content):
functions_list = extract_functions(content)
for func_content in functions_list:
function_meta = parser.parse_function(func_content)
func_name = function_meta['func_name']
args = function_meta.get('args', [])
kwargs = function_meta.get('kwargs', {})
args = self.eval_content_with_bindings(args)
kwargs = self.eval_content_with_bindings(kwargs)
if func_name in ["parameterize", "P"]:
eval_value = self.parameterize(*args, **kwargs)
else:
func = self.get_bind_function(func_name)
eval_value = func(*args, **kwargs)
func_content = "${" + func_content + "}"
if func_content == content:
# content is a variable
content = eval_value
else:
# content contains one or many variables
content = content.replace(
func_content,
str(eval_value), 1
)
return content
def _eval_content_variables(self, content):
""" replace all variables of string content with mapping value.
@param (str) content
@return (str) parsed content
e.g.
variable_mapping = {
"var_1": "abc",
"var_2": "def"
}
$var_1 => "abc"
$var_1#XYZ => "abc#XYZ"
/$var_1/$var_2/var3 => "/abc/def/var3"
${func($var_1, $var_2, xyz)} => "${func(abc, def, xyz)}"
"""
variables_list = parser.extract_variables(content)
for variable_name in variables_list:
variable_value = self.get_bind_variable(variable_name)
if "${}".format(variable_name) == content:
# content is a variable
content = variable_value
else:
# content contains one or several variables
if not isinstance(variable_value, str):
variable_value = builtin_str(variable_value)
content = content.replace(
"${}".format(variable_name),
variable_value, 1
)
return content
def eval_content_with_bindings(self, content):
""" parse content recursively, each variable and function in content will be evaluated.
@param (dict) content in any data structure
{
"url": "http://127.0.0.1:5000/api/users/$uid/${add_two_nums(1, 1)}",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"authorization": "$authorization",
"random": "$random",
"sum": "${add_two_nums(1, 2)}"
},
"body": "$data"
}
@return (dict) parsed content with evaluated bind values
{
"url": "http://127.0.0.1:5000/api/users/1000/2",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"authorization": "a83de0ff8d2e896dbd8efb81ba14e17d",
"random": "A2dEx",
"sum": 3
},
"body": {"name": "user", "password": "123456"}
}
"""
if content is None:
return None
if isinstance(content, (list, tuple)):
return [
self.eval_content_with_bindings(item)
for item in content
]
if isinstance(content, dict):
evaluated_data = {}
for key, value in content.items():
eval_key = self.eval_content_with_bindings(key)
eval_value = self.eval_content_with_bindings(value)
evaluated_data[eval_key] = eval_value
return evaluated_data
if isinstance(content, basestring):
# content is in string format here
content = content.strip()
# replace functions with evaluated value
# Notice: _eval_content_functions must be called before _eval_content_variables
content = self._eval_content_functions(content)
# replace variables with binding value
content = self._eval_content_variables(content)
return content

View File

@@ -1,23 +1,18 @@
# encoding: utf-8
import collections
import copy
import hashlib
import hmac
import imp
import importlib
import io
import itertools
import json
import os.path
import random
import string
import types
from datetime import datetime
from httprunner import exceptions, logger, parser
from httprunner.compat import (OrderedDict, basestring, builtin_str, is_py2,
is_py3, numeric_types, str)
from requests.structures import CaseInsensitiveDict
from httprunner import exceptions, logger
from httprunner.compat import OrderedDict, basestring, is_py2
SECRET_KEY = "DebugTalk"
@@ -43,6 +38,14 @@ def remove_prefix(text, prefix):
return text
def set_os_environ(variables_mapping):
""" set variables mapping to os.environ
"""
for variable in variables_mapping:
os.environ[variable] = variables_mapping[variable]
logger.log_debug("Loaded variable: {}".format(variable))
def query_json(json_content, query, delimiter='.'):
""" Do an xpath-like query with json_content.
@param (dict/list/string) json_content
@@ -87,204 +90,6 @@ def query_json(json_content, query, delimiter='.'):
return json_content
def substitute_variables_with_mapping(content, mapping):
""" substitute variables in content with mapping
e.g.
@params
content = {
'request': {
'url': '/api/users/$uid',
'headers': {'token': '$token'}
}
}
mapping = {"$uid": 1000}
@return
{
'request': {
'url': '/api/users/1000',
'headers': {'token': '$token'}
}
}
"""
# TODO: refactor type check
if isinstance(content, bool):
return content
if isinstance(content, (numeric_types, type)):
return content
if not content:
return content
if isinstance(content, (list, set, tuple)):
return [
substitute_variables_with_mapping(item, mapping)
for item in content
]
if isinstance(content, dict):
substituted_data = {}
for key, value in content.items():
eval_key = substitute_variables_with_mapping(key, mapping)
eval_value = substitute_variables_with_mapping(value, mapping)
substituted_data[eval_key] = eval_value
return substituted_data
# content is in string format here
for var, value in mapping.items():
if content == var:
# content is a variable
content = value
else:
if not isinstance(value, str):
value = builtin_str(value)
content = content.replace(var, value)
return content
def _get_validators_mapping(validators):
""" get validators mapping from api or test validators
@param (list) validators:
[
{"check": "v1", "expect": 201, "comparator": "eq"},
{"check": {"b": 1}, "expect": 200, "comparator": "eq"}
]
@return
{
("v1", "eq"): {"check": "v1", "expect": 201, "comparator": "eq"},
('{"b": 1}', "eq"): {"check": {"b": 1}, "expect": 200, "comparator": "eq"}
}
"""
validators_mapping = {}
for validator in validators:
validator = parser.parse_validator(validator)
if not isinstance(validator["check"], collections.Hashable):
check = json.dumps(validator["check"])
else:
check = validator["check"]
key = (check, validator["comparator"])
validators_mapping[key] = validator
return validators_mapping
def _merge_validator(def_validators, current_validators):
""" merge def_validators with current_validators
@params:
def_validators: [{'eq': ['v1', 200]}, {"check": "s2", "expect": 16, "comparator": "len_eq"}]
current_validators: [{"check": "v1", "expect": 201}, {'len_eq': ['s3', 12]}]
@return:
[
{"check": "v1", "expect": 201, "comparator": "eq"},
{"check": "s2", "expect": 16, "comparator": "len_eq"},
{"check": "s3", "expect": 12, "comparator": "len_eq"}
]
"""
if not def_validators:
return current_validators
elif not current_validators:
return def_validators
else:
api_validators_mapping = _get_validators_mapping(def_validators)
test_validators_mapping = _get_validators_mapping(current_validators)
api_validators_mapping.update(test_validators_mapping)
return list(api_validators_mapping.values())
def _merge_extractor(def_extrators, current_extractors):
""" merge def_extrators with current_extractors
@params:
def_extrators: [{"var1": "val1"}, {"var2": "val2"}]
current_extractors: [{"var1": "val111"}, {"var3": "val3"}]
@return:
[
{"var1": "val111"},
{"var2": "val2"},
{"var3": "val3"}
]
"""
if not def_extrators:
return current_extractors
elif not current_extractors:
return def_extrators
else:
extractor_dict = OrderedDict()
for api_extrator in def_extrators:
if len(api_extrator) != 1:
logger.log_warning("incorrect extractor: {}".format(api_extrator))
continue
var_name = list(api_extrator.keys())[0]
extractor_dict[var_name] = api_extrator[var_name]
for test_extrator in current_extractors:
if len(test_extrator) != 1:
logger.log_warning("incorrect extractor: {}".format(test_extrator))
continue
var_name = list(test_extrator.keys())[0]
extractor_dict[var_name] = test_extrator[var_name]
extractor_list = []
for key, value in extractor_dict.items():
extractor_list.append({key: value})
return extractor_list
def _override_block(def_block, current_block):
""" override def_block with current_block
@param def_block:
{
"name": "get token",
"request": {...},
"validate": [{'eq': ['status_code', 200]}]
}
@param current_block:
{
"name": "get token",
"extract": [{"token": "content.token"}],
"validate": [{'eq': ['status_code', 201]}, {'len_eq': ['content.token', 16]}]
}
@return
{
"name": "get token",
"request": {...},
"extract": [{"token": "content.token"}],
"validate": [{'eq': ['status_code', 201]}, {'len_eq': ['content.token', 16]}]
}
"""
def_validators = def_block.get("validate") or def_block.get("validators", [])
current_validators = current_block.get("validate") or current_block.get("validators", [])
def_extrators = def_block.get("extract") \
or def_block.get("extractors") \
or def_block.get("extract_binds", [])
current_extractors = current_block.get("extract") \
or current_block.get("extractors") \
or current_block.get("extract_binds", [])
current_block.update(def_block)
current_block["validate"] = _merge_validator(
def_validators,
current_validators
)
current_block["extract"] = _merge_extractor(
def_extrators,
current_extractors
)
def get_uniform_comparator(comparator):
""" convert comparator alias to uniform name
"""
@@ -338,86 +143,6 @@ def deep_update_dict(origin_dict, override_dict):
return origin_dict
def is_function(tup):
""" Takes (name, object) tuple, returns True if it is a function.
"""
name, item = tup
return isinstance(item, types.FunctionType)
def is_variable(tup):
""" Takes (name, object) tuple, returns True if it is a variable.
"""
name, item = tup
if callable(item):
# function or class
return False
if isinstance(item, types.ModuleType):
# imported module
return False
if name.startswith("_"):
# private property
return False
return True
def get_imported_module(module_name):
""" import module and return imported module
"""
return importlib.import_module(module_name)
def get_imported_module_from_file(file_path):
""" import module from python file path and return imported module
"""
if is_py3:
imported_module = importlib.machinery.SourceFileLoader(
'module_name', file_path).load_module()
elif is_py2:
imported_module = imp.load_source('module_name', file_path)
else:
raise RuntimeError("Neither Python 3 nor Python 2.")
return imported_module
def filter_module(module, filter_type):
""" filter functions or variables from import module
@params
module: imported module
filter_type: "function" or "variable"
"""
filter_type = is_function if filter_type == "function" else is_variable
module_functions_dict = dict(filter(filter_type, vars(module).items()))
return module_functions_dict
def search_conf_item(start_path, item_type, item_name):
""" search expected function or variable recursive upward
@param
start_path: search start path
item_type: "function" or "variable"
item_name: function name or variable name
"""
dir_path = os.path.dirname(os.path.abspath(start_path))
target_file = os.path.join(dir_path, "debugtalk.py")
if os.path.isfile(target_file):
imported_module = get_imported_module_from_file(target_file)
items_dict = filter_module(imported_module, item_type)
if item_name in items_dict:
return items_dict[item_name]
else:
return search_conf_item(dir_path, item_type, item_name)
if dir_path == start_path:
# system root path
err_msg = "{} not found in recursive upward path!".format(item_name)
if item_type == "function":
raise exceptions.FunctionNotFound(err_msg)
else:
raise exceptions.VariableNotFound(err_msg)
return search_conf_item(dir_path, item_type, item_name)
def lower_dict_keys(origin_dict):
""" convert keys in dict to lower case
e.g.
@@ -577,6 +302,40 @@ def create_scaffold(project_path):
logger.color_print(msg, "BLUE")
def gen_cartesian_product(*args):
""" generate cartesian product for lists
@param
(list) args
[{"a": 1}, {"a": 2}],
[
{"x": 111, "y": 112},
{"x": 121, "y": 122}
]
@return
cartesian product in list
[
{'a': 1, 'x': 111, 'y': 112},
{'a': 1, 'x': 121, 'y': 122},
{'a': 2, 'x': 111, 'y': 112},
{'a': 2, 'x': 121, 'y': 122}
]
"""
if not args:
return []
elif len(args) == 1:
return args[0]
product_list = []
for product_item_tuple in itertools.product(*args):
product_item_dict = {}
for item in product_item_tuple:
product_item_dict.update(item)
product_list.append(product_item_dict)
return product_list
def validate_json_file(file_list):
""" validate JSON testset format
"""

78
httprunner/validator.py Normal file
View File

@@ -0,0 +1,78 @@
# encoding: utf-8
import types
""" validate data format
TODO: refactor with JSON schema validate
"""
def is_testcase(data_structure):
""" check if data_structure is a testcase
testcase should always be in the following data structure:
{
"name": "desc1",
"config": {},
"api": {},
"testcases": [testcase11, testcase12]
}
"""
if not isinstance(data_structure, dict):
return False
if "name" not in data_structure or "testcases" not in data_structure:
return False
if not isinstance(data_structure["testcases"], list):
return False
return True
def is_testcases(data_structure):
""" check if data_structure is testcase or testcases list
testsets should always be in the following data structure:
testset_dict
or
[
testset_dict_1,
testset_dict_2
]
"""
if not isinstance(data_structure, list):
return is_testcase(data_structure)
for item in data_structure:
if not is_testcase(item):
return False
return True
###############################################################################
## validate varibles and functions
###############################################################################
def is_function(tup):
""" Takes (name, object) tuple, returns True if it is a function.
"""
name, item = tup
return isinstance(item, types.FunctionType)
def is_variable(tup):
""" Takes (name, object) tuple, returns True if it is a variable.
"""
name, item = tup
if callable(item):
# function or class
return False
if isinstance(item, types.ModuleType):
# imported module
return False
if name.startswith("_"):
# private property
return False
return True

View File

@@ -90,7 +90,8 @@ setup(
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6'
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7'
],
entry_points={
'console_scripts': [

View File

@@ -1,17 +1,17 @@
import os
import time
import unittest
import requests
from httprunner import exceptions, loader, response, runner, testcase
from httprunner.context import Context
from httprunner import context, exceptions, loader, parser, response, runner
from httprunner.utils import gen_md5
from tests.base import ApiServerUnittest
class VariableBindsUnittest(ApiServerUnittest):
class TestContext(ApiServerUnittest):
def setUp(self):
self.context = Context()
self.context = context.Context()
testcase_file_path = os.path.join(os.getcwd(), 'tests/data/demo_binds.yml')
self.testcases = loader.load_file(testcase_file_path)
@@ -250,7 +250,7 @@ class VariableBindsUnittest(ApiServerUnittest):
variables = []
self.context.bind_variables(variables)
with self.assertRaises(exceptions.ParamsError):
with self.assertRaises(exceptions.VariableNotFound):
self.context.validate(validators, resp_obj)
# expected value missed in variables mapping
@@ -261,3 +261,324 @@ class VariableBindsUnittest(ApiServerUnittest):
with self.assertRaises(exceptions.ValidationFailure):
self.context.validate(validators, resp_obj)
class TestTestcaseParser(unittest.TestCase):
def test_eval_content_variables(self):
variables = {
"var_1": "abc",
"var_2": "def",
"var_3": 123,
"var_4": {"a": 1},
"var_5": True,
"var_6": None
}
testcase_parser = context.TestcaseParser(variables=variables)
self.assertEqual(
testcase_parser._eval_content_variables("$var_1"),
"abc"
)
self.assertEqual(
testcase_parser._eval_content_variables("var_1"),
"var_1"
)
self.assertEqual(
testcase_parser._eval_content_variables("$var_1#XYZ"),
"abc#XYZ"
)
self.assertEqual(
testcase_parser._eval_content_variables("/$var_1/$var_2/var3"),
"/abc/def/var3"
)
self.assertEqual(
testcase_parser._eval_content_variables("/$var_1/$var_2/$var_1"),
"/abc/def/abc"
)
self.assertEqual(
testcase_parser._eval_content_variables("${func($var_1, $var_2, xyz)}"),
"${func(abc, def, xyz)}"
)
self.assertEqual(
testcase_parser._eval_content_variables("$var_3"),
123
)
self.assertEqual(
testcase_parser._eval_content_variables("$var_4"),
{"a": 1}
)
self.assertEqual(
testcase_parser._eval_content_variables("$var_5"),
True
)
self.assertEqual(
testcase_parser._eval_content_variables("abc$var_5"),
"abcTrue"
)
self.assertEqual(
testcase_parser._eval_content_variables("abc$var_4"),
"abc{'a': 1}"
)
self.assertEqual(
testcase_parser._eval_content_variables("$var_6"),
None
)
def test_eval_content_variables_search_upward(self):
testcase_parser = context.TestcaseParser()
with self.assertRaises(exceptions.VariableNotFound):
testcase_parser._eval_content_variables("/api/$SECRET_KEY")
testcase_parser.file_path = "tests/data/demo_testset_hardcode.yml"
content = testcase_parser._eval_content_variables("/api/$SECRET_KEY")
self.assertEqual(content, "/api/DebugTalk")
def test_parse_content_with_bindings_variables(self):
variables = {
"str_1": "str_value1",
"str_2": "str_value2"
}
testcase_parser = context.TestcaseParser(variables=variables)
self.assertEqual(
testcase_parser.eval_content_with_bindings("$str_1"),
"str_value1"
)
self.assertEqual(
testcase_parser.eval_content_with_bindings("123$str_1/456"),
"123str_value1/456"
)
with self.assertRaises(exceptions.VariableNotFound):
testcase_parser.eval_content_with_bindings("$str_3")
self.assertEqual(
testcase_parser.eval_content_with_bindings(["$str_1", "str3"]),
["str_value1", "str3"]
)
self.assertEqual(
testcase_parser.eval_content_with_bindings({"key": "$str_1"}),
{"key": "str_value1"}
)
def test_parse_content_with_bindings_multiple_identical_variables(self):
variables = {
"userid": 100,
"data": 1498
}
testcase_parser = context.TestcaseParser(variables=variables)
content = "/users/$userid/training/$data?userId=$userid&data=$data"
self.assertEqual(
testcase_parser.eval_content_with_bindings(content),
"/users/100/training/1498?userId=100&data=1498"
)
def test_parse_variables_multiple_identical_variables(self):
variables = {
"user": 100,
"userid": 1000,
"data": 1498
}
testcase_parser = context.TestcaseParser(variables=variables)
content = "/users/$user/$userid/$data?userId=$userid&data=$data"
self.assertEqual(
testcase_parser.eval_content_with_bindings(content),
"/users/100/1000/1498?userId=1000&data=1498"
)
def test_parse_content_with_bindings_functions(self):
import random, string
functions = {
"gen_random_string": lambda str_len: ''.join(random.choice(string.ascii_letters + string.digits) \
for _ in range(str_len))
}
testcase_parser = context.TestcaseParser(functions=functions)
result = testcase_parser.eval_content_with_bindings("${gen_random_string(5)}")
self.assertEqual(len(result), 5)
add_two_nums = lambda a, b=1: a + b
functions["add_two_nums"] = add_two_nums
self.assertEqual(
testcase_parser.eval_content_with_bindings("${add_two_nums(1)}"),
2
)
self.assertEqual(
testcase_parser.eval_content_with_bindings("${add_two_nums(1, 2)}"),
3
)
def test_extract_functions(self):
self.assertEqual(
parser.extract_functions("${func()}"),
["func()"]
)
self.assertEqual(
parser.extract_functions("${func(5)}"),
["func(5)"]
)
self.assertEqual(
parser.extract_functions("${func(a=1, b=2)}"),
["func(a=1, b=2)"]
)
self.assertEqual(
parser.extract_functions("${func(1, $b, c=$x, d=4)}"),
["func(1, $b, c=$x, d=4)"]
)
self.assertEqual(
parser.extract_functions("/api/1000?_t=${get_timestamp()}"),
["get_timestamp()"]
)
self.assertEqual(
parser.extract_functions("/api/${add(1, 2)}"),
["add(1, 2)"]
)
self.assertEqual(
parser.extract_functions("/api/${add(1, 2)}?_t=${get_timestamp()}"),
["add(1, 2)", "get_timestamp()"]
)
self.assertEqual(
parser.extract_functions("abc${func(1, 2, a=3, b=4)}def"),
["func(1, 2, a=3, b=4)"]
)
def test_eval_content_functions(self):
functions = {
"add_two_nums": lambda a, b=1: a + b
}
testcase_parser = context.TestcaseParser(functions=functions)
self.assertEqual(
testcase_parser._eval_content_functions("${add_two_nums(1, 2)}"),
3
)
self.assertEqual(
testcase_parser._eval_content_functions("/api/${add_two_nums(1, 2)}"),
"/api/3"
)
def test_eval_content_functions_search_upward(self):
testcase_parser = context.TestcaseParser()
with self.assertRaises(exceptions.FunctionNotFound):
testcase_parser._eval_content_functions("/api/${gen_md5(abc)}")
testcase_parser.file_path = "tests/data/demo_testset_hardcode.yml"
content = testcase_parser._eval_content_functions("/api/${gen_md5(abc)}")
self.assertEqual(content, "/api/900150983cd24fb0d6963f7d28e17f72")
def test_parse_content_with_bindings_testcase(self):
variables = {
"uid": "1000",
"random": "A2dEx",
"authorization": "a83de0ff8d2e896dbd8efb81ba14e17d",
"data": {"name": "user", "password": "123456"}
}
functions = {
"add_two_nums": lambda a, b=1: a + b,
"get_timestamp": lambda: int(time.time() * 1000)
}
testcase_template = {
"url": "http://127.0.0.1:5000/api/users/$uid/${add_two_nums(1,2)}",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"authorization": "$authorization",
"random": "$random",
"sum": "${add_two_nums(1, 2)}"
},
"body": "$data"
}
parsed_testcase = context.TestcaseParser(variables, functions)\
.eval_content_with_bindings(testcase_template)
self.assertEqual(
parsed_testcase["url"],
"http://127.0.0.1:5000/api/users/1000/3"
)
self.assertEqual(
parsed_testcase["headers"]["authorization"],
variables["authorization"]
)
self.assertEqual(
parsed_testcase["headers"]["random"],
variables["random"]
)
self.assertEqual(
parsed_testcase["body"],
variables["data"]
)
self.assertEqual(
parsed_testcase["headers"]["sum"],
3
)
def test_parse_parameters_raw_list(self):
parameters = [
{"user_agent": ["iOS/10.1", "iOS/10.2", "iOS/10.3"]},
{"username-password": [("user1", "111111"), ["test2", "222222"]]}
]
cartesian_product_parameters = context.parse_parameters(parameters)
self.assertEqual(
len(cartesian_product_parameters),
3 * 2
)
self.assertEqual(
cartesian_product_parameters[0],
{'user_agent': 'iOS/10.1', 'username': 'user1', 'password': '111111'}
)
def test_parse_parameters_parameterize(self):
parameters = [
{"app_version": "${parameterize(app_version.csv)}"},
{"username-password": "${parameterize(account.csv)}"}
]
testset_path = os.path.join(
os.getcwd(),
"tests/data/demo_parameters.yml"
)
cartesian_product_parameters = context.parse_parameters(
parameters,
testset_path
)
self.assertEqual(
len(cartesian_product_parameters),
2 * 3
)
def test_parse_parameters_custom_function(self):
parameters = [
{"app_version": "${gen_app_version()}"},
{"username-password": "${get_account()}"}
]
testset_path = os.path.join(
os.getcwd(),
"tests/data/demo_parameters.yml"
)
cartesian_product_parameters = context.parse_parameters(
parameters,
testset_path
)
self.assertEqual(
len(cartesian_product_parameters),
2 * 2
)
def test_parse_parameters_mix(self):
parameters = [
{"user_agent": ["iOS/10.1", "iOS/10.2", "iOS/10.3"]},
{"app_version": "${gen_app_version()}"},
{"username-password": "${parameterize(account.csv)}"}
]
testset_path = os.path.join(
os.getcwd(),
"tests/data/demo_parameters.yml"
)
cartesian_product_parameters = context.parse_parameters(
parameters,
testset_path
)
self.assertEqual(
len(cartesian_product_parameters),
3 * 2 * 3
)

View File

@@ -1,7 +1,7 @@
import os
import unittest
from httprunner import exceptions, loader, utils
from httprunner import exceptions, loader, validator
class TestFileLoader(unittest.TestCase):
@@ -132,15 +132,90 @@ class TestFileLoader(unittest.TestCase):
self.assertEqual([], files)
def test_load_dot_env_file(self):
self.assertNotIn("PROJECT_KEY", os.environ)
loader.load_dot_env_file("tests/data/test.env")
self.assertIn("PROJECT_KEY", os.environ)
self.assertEqual(os.environ["UserName"], "debugtalk")
env_variables_mapping = loader.load_dot_env_file("tests/data/test.env")
self.assertIn("PROJECT_KEY", env_variables_mapping)
self.assertEqual(env_variables_mapping["UserName"], "debugtalk")
def test_load_env_path_not_exist(self):
with self.assertRaises(exceptions.FileNotFound):
loader.load_dot_env_file("not_exist.env")
def test_locate_file(self):
with self.assertRaises(exceptions.FileNotFound):
loader.locate_file(os.getcwd(), "debugtalk.py")
with self.assertRaises(exceptions.FileNotFound):
loader.locate_file("", "debugtalk.py")
start_path = os.path.join(os.getcwd(), "tests")
self.assertEqual(
loader.locate_file(start_path, "debugtalk.py"),
"tests/debugtalk.py"
)
self.assertEqual(
loader.locate_file("tests/", "debugtalk.py"),
"tests/debugtalk.py"
)
self.assertEqual(
loader.locate_file("tests", "debugtalk.py"),
"tests/debugtalk.py"
)
self.assertEqual(
loader.locate_file("tests/base.py", "debugtalk.py"),
"tests/debugtalk.py"
)
self.assertEqual(
loader.locate_file("tests/data/test.env", "debugtalk.py"),
"tests/debugtalk.py"
)
class TestModuleLoader(unittest.TestCase):
def test_filter_module_functions(self):
module_mapping = loader.load_python_module(loader)
functions_dict = module_mapping["functions"]
self.assertIn("load_python_module", functions_dict)
self.assertNotIn("is_py3", functions_dict)
def test_load_debugtalk_module(self):
imported_module_items = loader.load_debugtalk_module()
self.assertEqual(imported_module_items["functions"], {})
self.assertEqual(imported_module_items["variables"], {})
imported_module_items = loader.load_debugtalk_module("tests")
self.assertEqual(
imported_module_items["variables"]["SECRET_KEY"],
"DebugTalk"
)
self.assertIn("alter_response", imported_module_items["functions"])
is_status_code_200 = imported_module_items["functions"]["is_status_code_200"]
self.assertTrue(is_status_code_200(200))
self.assertFalse(is_status_code_200(500))
def test_get_module_item_functions(self):
from httprunner import utils
module_mapping = loader.load_python_module(utils)
gen_md5 = loader.get_module_item(module_mapping, "functions", "gen_md5")
self.assertTrue(validator.is_function(("gen_md5", gen_md5)))
self.assertEqual(gen_md5("abc"), "900150983cd24fb0d6963f7d28e17f72")
with self.assertRaises(exceptions.FunctionNotFound):
loader.get_module_item(module_mapping, "functions", "gen_md4")
def test_get_module_item_variables(self):
from httprunner import utils
module_mapping = loader.load_python_module(utils)
SECRET_KEY = loader.get_module_item(module_mapping, "variables", "SECRET_KEY")
self.assertTrue(validator.is_variable(("SECRET_KEY", SECRET_KEY)))
self.assertEqual(SECRET_KEY, "DebugTalk")
with self.assertRaises(exceptions.VariableNotFound):
loader.get_module_item(module_mapping, "variables", "SECRET_KEY2")
class TestSuiteLoader(unittest.TestCase):
@@ -151,13 +226,13 @@ class TestSuiteLoader(unittest.TestCase):
}
def test_load_test_dependencies(self):
loader.load_test_dependencies()
loader._load_test_dependencies()
overall_def_dict = loader.overall_def_dict
self.assertIn("get_token", overall_def_dict["api"])
self.assertIn("create_and_check", overall_def_dict["suite"])
def test_load_api_file(self):
loader.load_api_file("tests/api/basic.yml")
loader._load_api_file("tests/api/basic.yml")
overall_api_def_dict = loader.overall_def_dict["api"]
self.assertIn("get_token",overall_api_def_dict)
self.assertEqual("/api/get-token", overall_api_def_dict["get_token"]["request"]["url"])
@@ -165,16 +240,16 @@ class TestSuiteLoader(unittest.TestCase):
self.assertEqual(len(overall_api_def_dict["get_token"]["validate"]), 3)
def test_load_test_file_suite(self):
loader.load_api_file("tests/api/basic.yml")
testset = loader.load_test_file("tests/suite/create_and_get.yml")
loader._load_api_file("tests/api/basic.yml")
testset = loader._load_test_file("tests/suite/create_and_get.yml")
self.assertEqual(testset["config"]["name"], "create user and check result.")
self.assertEqual(len(testset["testcases"]), 3)
self.assertEqual(testset["testcases"][0]["name"], "make sure user $uid does not exist")
self.assertEqual(testset["testcases"][0]["request"]["url"], "/api/users/$uid")
def test_load_test_file_testcase(self):
loader.load_test_dependencies()
testset = loader.load_test_file("tests/testcases/smoketest.yml")
loader._load_test_dependencies()
testset = loader._load_test_file("tests/testcases/smoketest.yml")
self.assertEqual(testset["config"]["name"], "smoketest")
self.assertEqual(testset["config"]["path"], "tests/testcases/smoketest.yml")
self.assertIn("device_sn", testset["config"]["variables"][0])
@@ -182,7 +257,7 @@ class TestSuiteLoader(unittest.TestCase):
self.assertEqual(testset["testcases"][0]["name"], "get token")
def test_get_block_by_name(self):
loader.load_test_dependencies()
loader._load_test_dependencies()
ref_call = "get_user($uid, $token)"
block = loader._get_block_by_name(ref_call, "api")
self.assertEqual(block["request"]["url"], "/api/users/$uid")
@@ -190,13 +265,13 @@ class TestSuiteLoader(unittest.TestCase):
self.assertEqual(block["function_meta"]["args"], ['$uid', '$token'])
def test_get_block_by_name_args_mismatch(self):
loader.load_test_dependencies()
loader._load_test_dependencies()
ref_call = "get_user($uid, $token, $var)"
with self.assertRaises(exceptions.ParamsError):
loader._get_block_by_name(ref_call, "api")
def test_override_block(self):
loader.load_test_dependencies()
loader._load_test_dependencies()
def_block = loader._get_block_by_name("get_token($user_agent, $device_sn, $os_platform, $app_version)", "api")
test_block = {
"name": "override block",
@@ -211,13 +286,13 @@ class TestSuiteLoader(unittest.TestCase):
]
}
utils._override_block(def_block, test_block)
loader._override_block(def_block, test_block)
self.assertEqual(test_block["name"], "override block")
self.assertIn({'check': 'status_code', 'expect': 201, 'comparator': 'eq'}, test_block["validate"])
self.assertIn({'check': 'content.token', 'comparator': 'len_eq', 'expect': 32}, test_block["validate"])
def test_get_test_definition_api(self):
loader.load_test_dependencies()
loader._load_test_dependencies()
api_def = loader._get_test_definition("get_headers", "api")
self.assertEqual(api_def["request"]["url"], "/headers")
self.assertEqual(len(api_def["setup_hooks"]), 2)
@@ -227,13 +302,70 @@ class TestSuiteLoader(unittest.TestCase):
loader._get_test_definition("get_token_XXX", "api")
def test_get_test_definition_suite(self):
loader.load_test_dependencies()
loader._load_test_dependencies()
api_def = loader._get_test_definition("create_and_check", "suite")
self.assertEqual(api_def["config"]["name"], "create user and check result.")
with self.assertRaises(exceptions.SuiteNotFound):
with self.assertRaises(exceptions.TestcaseNotFound):
loader._get_test_definition("create_and_check_XXX", "suite")
def test_merge_validator(self):
def_validators = [
{'eq': ['v1', 200]},
{"check": "s2", "expect": 16, "comparator": "len_eq"}
]
current_validators = [
{"check": "v1", "expect": 201},
{'len_eq': ['s3', 12]}
]
merged_validators = loader._merge_validator(def_validators, current_validators)
self.assertIn(
{"check": "v1", "expect": 201, "comparator": "eq"},
merged_validators
)
self.assertIn(
{"check": "s2", "expect": 16, "comparator": "len_eq"},
merged_validators
)
self.assertIn(
{"check": "s3", "expect": 12, "comparator": "len_eq"},
merged_validators
)
def test_merge_validator_with_dict(self):
def_validators = [
{'eq': ["a", {"v": 1}]},
{'eq': [{"b": 1}, 200]}
]
current_validators = [
{'len_eq': ['s3', 12]},
{'eq': [{"b": 1}, 201]}
]
merged_validators = loader._merge_validator(def_validators, current_validators)
self.assertEqual(len(merged_validators), 3)
self.assertIn({'check': {'b': 1}, 'expect': 201, 'comparator': 'eq'}, merged_validators)
self.assertNotIn({'check': {'b': 1}, 'expect': 200, 'comparator': 'eq'}, merged_validators)
def test_merge_extractor(self):
api_extrators = [{"var1": "val1"}, {"var2": "val2"}]
current_extractors = [{"var1": "val111"}, {"var3": "val3"}]
merged_extractors = loader._merge_extractor(api_extrators, current_extractors)
self.assertIn(
{"var1": "val111"},
merged_extractors
)
self.assertIn(
{"var2": "val2"},
merged_extractors
)
self.assertIn(
{"var3": "val3"},
merged_extractors
)
def test_load_testcases_by_path_files(self):
testsets_list = []
@@ -276,7 +408,7 @@ class TestSuiteLoader(unittest.TestCase):
self.assertIn('method', test['request'])
def test_load_testcases_by_path_folder(self):
loader.load_test_dependencies()
loader._load_test_dependencies()
# absolute folder path
path = os.path.join(os.getcwd(), 'tests/data')
testset_list_1 = loader.load_testcases(path)
@@ -315,7 +447,7 @@ class TestSuiteLoader(unittest.TestCase):
loader.load_testcases(path)
def test_load_testcases_by_path_layered(self):
loader.load_test_dependencies()
loader._load_test_dependencies()
path = os.path.join(
os.getcwd(), 'tests/data/demo_testset_layer.yml')
testsets_list = loader.load_testcases(path)

View File

@@ -1,6 +1,8 @@
import os
import time
import unittest
from httprunner import parser, exceptions
from httprunner import exceptions, parser
class TestParser(unittest.TestCase):
@@ -112,3 +114,30 @@ class TestParser(unittest.TestCase):
parser.parse_validator(validator),
{"check": "status_code", "comparator": "eq", "expect": 201}
)
def test_parse_data(self):
content = {
'request': {
'url': '/api/users/$uid',
'method': "$method",
'headers': {'token': '$token'},
'data': {
"null": None,
"true": True,
"false": False,
"empty_str": ""
}
}
}
mapping = {
"$uid": 1000,
"$method": "POST"
}
result = parser.parse_data(content, mapping)
self.assertEqual("/api/users/1000", result["request"]["url"])
self.assertEqual("$token", result["request"]["headers"]["token"])
self.assertEqual("POST", result["request"]["method"])
self.assertIsNone(result["request"]["data"]["null"])
self.assertTrue(result["request"]["data"]["true"])
self.assertFalse(result["request"]["data"]["false"])
self.assertEqual("", result["request"]["data"]["empty_str"])

View File

@@ -1,5 +1,5 @@
import requests
from httprunner import exceptions, response, utils
from httprunner import built_in, exceptions, loader, response
from httprunner.compat import basestring, bytes
from tests.base import HTTPBIN_SERVER, ApiServerUnittest
@@ -7,8 +7,8 @@ from tests.base import HTTPBIN_SERVER, ApiServerUnittest
class TestResponse(ApiServerUnittest):
def setUp(self):
imported_module = utils.get_imported_module("httprunner.built_in")
self.functions_mapping = utils.filter_module(imported_module, "function")
module_mapping = loader.load_python_module(built_in)
self.functions_mapping = module_mapping["functions"]
def test_parse_response_object_json(self):
url = "http://127.0.0.1:5000/api/users"

View File

@@ -160,7 +160,7 @@ class TestRunner(ApiServerUnittest):
end_time = time.time()
summary = runner.summary
self.assertTrue(summary["success"])
self.assertLess(end_time - start_time, 3)
self.assertLess(end_time - start_time, 10)
def test_run_httprunner_with_teardown_hooks_alter_response(self):
testsets = [

View File

@@ -16,7 +16,7 @@ class TestTask(ApiServerUnittest):
def test_create_suite(self):
testcase_file_path = os.path.join(os.getcwd(), 'tests/data/demo_testset_variables.yml')
testset = loader.load_test_file(testcase_file_path)
testset = loader._load_test_file(testcase_file_path)
suite = task.TestSuite(testset)
self.assertEqual(suite.countTestCases(), 3)
for testcase in suite:

View File

@@ -1,399 +0,0 @@
import os
import time
import unittest
from httprunner import exceptions, loader, testcase
class TestcaseParserUnittest(unittest.TestCase):
def test_cartesian_product_one(self):
parameters_content_list = [
[
{"a": 1},
{"a": 2}
]
]
product_list = testcase.gen_cartesian_product(*parameters_content_list)
self.assertEqual(
product_list,
[
{"a": 1},
{"a": 2}
]
)
def test_cartesian_product_multiple(self):
parameters_content_list = [
[
{"a": 1},
{"a": 2}
],
[
{"x": 111, "y": 112},
{"x": 121, "y": 122}
]
]
product_list = testcase.gen_cartesian_product(*parameters_content_list)
self.assertEqual(
product_list,
[
{'a': 1, 'x': 111, 'y': 112},
{'a': 1, 'x': 121, 'y': 122},
{'a': 2, 'x': 111, 'y': 112},
{'a': 2, 'x': 121, 'y': 122}
]
)
def test_cartesian_product_empty(self):
parameters_content_list = []
product_list = testcase.gen_cartesian_product(*parameters_content_list)
self.assertEqual(product_list, [])
def test_parse_parameters_raw_list(self):
parameters = [
{"user_agent": ["iOS/10.1", "iOS/10.2", "iOS/10.3"]},
{"username-password": [("user1", "111111"), ["test2", "222222"]]}
]
cartesian_product_parameters = testcase.parse_parameters(parameters)
self.assertEqual(
len(cartesian_product_parameters),
3 * 2
)
self.assertEqual(
cartesian_product_parameters[0],
{'user_agent': 'iOS/10.1', 'username': 'user1', 'password': '111111'}
)
def test_parse_parameters_parameterize(self):
parameters = [
{"app_version": "${parameterize(app_version.csv)}"},
{"username-password": "${parameterize(account.csv)}"}
]
testset_path = os.path.join(
os.getcwd(),
"tests/data/demo_parameters.yml"
)
cartesian_product_parameters = testcase.parse_parameters(
parameters,
testset_path
)
self.assertEqual(
len(cartesian_product_parameters),
2 * 3
)
def test_parse_parameters_custom_function(self):
parameters = [
{"app_version": "${gen_app_version()}"},
{"username-password": "${get_account()}"}
]
testset_path = os.path.join(
os.getcwd(),
"tests/data/demo_parameters.yml"
)
cartesian_product_parameters = testcase.parse_parameters(
parameters,
testset_path
)
self.assertEqual(
len(cartesian_product_parameters),
2 * 2
)
def test_parse_parameters_mix(self):
parameters = [
{"user_agent": ["iOS/10.1", "iOS/10.2", "iOS/10.3"]},
{"app_version": "${gen_app_version()}"},
{"username-password": "${parameterize(account.csv)}"}
]
testset_path = os.path.join(
os.getcwd(),
"tests/data/demo_parameters.yml"
)
cartesian_product_parameters = testcase.parse_parameters(
parameters,
testset_path
)
self.assertEqual(
len(cartesian_product_parameters),
3 * 2 * 3
)
def test_eval_content_variables(self):
variables = {
"var_1": "abc",
"var_2": "def",
"var_3": 123,
"var_4": {"a": 1},
"var_5": True,
"var_6": None
}
testcase_parser = testcase.TestcaseParser(variables=variables)
self.assertEqual(
testcase_parser._eval_content_variables("$var_1"),
"abc"
)
self.assertEqual(
testcase_parser._eval_content_variables("var_1"),
"var_1"
)
self.assertEqual(
testcase_parser._eval_content_variables("$var_1#XYZ"),
"abc#XYZ"
)
self.assertEqual(
testcase_parser._eval_content_variables("/$var_1/$var_2/var3"),
"/abc/def/var3"
)
self.assertEqual(
testcase_parser._eval_content_variables("/$var_1/$var_2/$var_1"),
"/abc/def/abc"
)
self.assertEqual(
testcase_parser._eval_content_variables("${func($var_1, $var_2, xyz)}"),
"${func(abc, def, xyz)}"
)
self.assertEqual(
testcase_parser._eval_content_variables("$var_3"),
123
)
self.assertEqual(
testcase_parser._eval_content_variables("$var_4"),
{"a": 1}
)
self.assertEqual(
testcase_parser._eval_content_variables("$var_5"),
True
)
self.assertEqual(
testcase_parser._eval_content_variables("abc$var_5"),
"abcTrue"
)
self.assertEqual(
testcase_parser._eval_content_variables("abc$var_4"),
"abc{'a': 1}"
)
self.assertEqual(
testcase_parser._eval_content_variables("$var_6"),
None
)
def test_eval_content_variables_search_upward(self):
testcase_parser = testcase.TestcaseParser()
with self.assertRaises(exceptions.ParamsError):
testcase_parser._eval_content_variables("/api/$SECRET_KEY")
testcase_parser.file_path = "tests/data/demo_testset_hardcode.yml"
content = testcase_parser._eval_content_variables("/api/$SECRET_KEY")
self.assertEqual(content, "/api/DebugTalk")
def test_parse_content_with_bindings_variables(self):
variables = {
"str_1": "str_value1",
"str_2": "str_value2"
}
testcase_parser = testcase.TestcaseParser(variables=variables)
self.assertEqual(
testcase_parser.eval_content_with_bindings("$str_1"),
"str_value1"
)
self.assertEqual(
testcase_parser.eval_content_with_bindings("123$str_1/456"),
"123str_value1/456"
)
with self.assertRaises(exceptions.ParamsError):
testcase_parser.eval_content_with_bindings("$str_3")
self.assertEqual(
testcase_parser.eval_content_with_bindings(["$str_1", "str3"]),
["str_value1", "str3"]
)
self.assertEqual(
testcase_parser.eval_content_with_bindings({"key": "$str_1"}),
{"key": "str_value1"}
)
def test_parse_content_with_bindings_multiple_identical_variables(self):
variables = {
"userid": 100,
"data": 1498
}
testcase_parser = testcase.TestcaseParser(variables=variables)
content = "/users/$userid/training/$data?userId=$userid&data=$data"
self.assertEqual(
testcase_parser.eval_content_with_bindings(content),
"/users/100/training/1498?userId=100&data=1498"
)
def test_parse_variables_multiple_identical_variables(self):
variables = {
"user": 100,
"userid": 1000,
"data": 1498
}
testcase_parser = testcase.TestcaseParser(variables=variables)
content = "/users/$user/$userid/$data?userId=$userid&data=$data"
self.assertEqual(
testcase_parser.eval_content_with_bindings(content),
"/users/100/1000/1498?userId=1000&data=1498"
)
def test_parse_content_with_bindings_functions(self):
import random, string
functions = {
"gen_random_string": lambda str_len: ''.join(random.choice(string.ascii_letters + string.digits) \
for _ in range(str_len))
}
testcase_parser = testcase.TestcaseParser(functions=functions)
result = testcase_parser.eval_content_with_bindings("${gen_random_string(5)}")
self.assertEqual(len(result), 5)
add_two_nums = lambda a, b=1: a + b
functions["add_two_nums"] = add_two_nums
self.assertEqual(
testcase_parser.eval_content_with_bindings("${add_two_nums(1)}"),
2
)
self.assertEqual(
testcase_parser.eval_content_with_bindings("${add_two_nums(1, 2)}"),
3
)
def test_extract_functions(self):
self.assertEqual(
testcase.extract_functions("${func()}"),
["func()"]
)
self.assertEqual(
testcase.extract_functions("${func(5)}"),
["func(5)"]
)
self.assertEqual(
testcase.extract_functions("${func(a=1, b=2)}"),
["func(a=1, b=2)"]
)
self.assertEqual(
testcase.extract_functions("${func(1, $b, c=$x, d=4)}"),
["func(1, $b, c=$x, d=4)"]
)
self.assertEqual(
testcase.extract_functions("/api/1000?_t=${get_timestamp()}"),
["get_timestamp()"]
)
self.assertEqual(
testcase.extract_functions("/api/${add(1, 2)}"),
["add(1, 2)"]
)
self.assertEqual(
testcase.extract_functions("/api/${add(1, 2)}?_t=${get_timestamp()}"),
["add(1, 2)", "get_timestamp()"]
)
self.assertEqual(
testcase.extract_functions("abc${func(1, 2, a=3, b=4)}def"),
["func(1, 2, a=3, b=4)"]
)
def test_eval_content_functions(self):
functions = {
"add_two_nums": lambda a, b=1: a + b
}
testcase_parser = testcase.TestcaseParser(functions=functions)
self.assertEqual(
testcase_parser._eval_content_functions("${add_two_nums(1, 2)}"),
3
)
self.assertEqual(
testcase_parser._eval_content_functions("/api/${add_two_nums(1, 2)}"),
"/api/3"
)
def test_eval_content_functions_search_upward(self):
testcase_parser = testcase.TestcaseParser()
with self.assertRaises(exceptions.ParamsError):
testcase_parser._eval_content_functions("/api/${gen_md5(abc)}")
testcase_parser.file_path = "tests/data/demo_testset_hardcode.yml"
content = testcase_parser._eval_content_functions("/api/${gen_md5(abc)}")
self.assertEqual(content, "/api/900150983cd24fb0d6963f7d28e17f72")
def test_parse_content_with_bindings_testcase(self):
variables = {
"uid": "1000",
"random": "A2dEx",
"authorization": "a83de0ff8d2e896dbd8efb81ba14e17d",
"data": {"name": "user", "password": "123456"}
}
functions = {
"add_two_nums": lambda a, b=1: a + b,
"get_timestamp": lambda: int(time.time() * 1000)
}
testcase_template = {
"url": "http://127.0.0.1:5000/api/users/$uid/${add_two_nums(1,2)}",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"authorization": "$authorization",
"random": "$random",
"sum": "${add_two_nums(1, 2)}"
},
"body": "$data"
}
parsed_testcase = testcase.TestcaseParser(variables, functions)\
.eval_content_with_bindings(testcase_template)
self.assertEqual(
parsed_testcase["url"],
"http://127.0.0.1:5000/api/users/1000/3"
)
self.assertEqual(
parsed_testcase["headers"]["authorization"],
variables["authorization"]
)
self.assertEqual(
parsed_testcase["headers"]["random"],
variables["random"]
)
self.assertEqual(
parsed_testcase["body"],
variables["data"]
)
self.assertEqual(
parsed_testcase["headers"]["sum"],
3
)
def test_is_testsets(self):
data_structure = "path/to/file"
self.assertFalse(testcase.is_testsets(data_structure))
data_structure = ["path/to/file1", "path/to/file2"]
self.assertFalse(testcase.is_testsets(data_structure))
data_structure = {
"name": "desc1",
"config": {},
"api": {},
"testcases": ["testcase11", "testcase12"]
}
self.assertTrue(data_structure)
data_structure = [
{
"name": "desc1",
"config": {},
"api": {},
"testcases": ["testcase11", "testcase12"]
},
{
"name": "desc2",
"config": {},
"api": {},
"testcases": ["testcase21", "testcase22"]
}
]
self.assertTrue(data_structure)

View File

@@ -1,8 +1,7 @@
import os
import shutil
import unittest
from httprunner import exceptions, utils
from httprunner import exceptions, loader, utils
from httprunner.compat import OrderedDict
from tests.base import ApiServerUnittest
@@ -17,6 +16,15 @@ class TestUtils(ApiServerUnittest):
"/post/123"
)
def test_set_os_environ(self):
self.assertNotIn("abc", os.environ)
variables_mapping = {
"abc": "123"
}
utils.set_os_environ(variables_mapping)
self.assertIn("abc", os.environ)
self.assertEqual(os.environ["abc"], "123")
def test_query_json(self):
json_content = {
"ids": [1, 2, 3, 4],
@@ -61,90 +69,6 @@ class TestUtils(ApiServerUnittest):
result = utils.query_json(json_content, query)
self.assertEqual(result, "L")
def test_substitute_variables_with_mapping(self):
content = {
'request': {
'url': '/api/users/$uid',
'method': "$method",
'headers': {'token': '$token'},
'data': {
"null": None,
"true": True,
"false": False,
"empty_str": ""
}
}
}
mapping = {
"$uid": 1000,
"$method": "POST"
}
result = utils.substitute_variables_with_mapping(content, mapping)
self.assertEqual("/api/users/1000", result["request"]["url"])
self.assertEqual("$token", result["request"]["headers"]["token"])
self.assertEqual("POST", result["request"]["method"])
self.assertIsNone(result["request"]["data"]["null"])
self.assertTrue(result["request"]["data"]["true"])
self.assertFalse(result["request"]["data"]["false"])
self.assertEqual("", result["request"]["data"]["empty_str"])
def test_merge_validator(self):
def_validators = [
{'eq': ['v1', 200]},
{"check": "s2", "expect": 16, "comparator": "len_eq"}
]
current_validators = [
{"check": "v1", "expect": 201},
{'len_eq': ['s3', 12]}
]
merged_validators = utils._merge_validator(def_validators, current_validators)
self.assertIn(
{"check": "v1", "expect": 201, "comparator": "eq"},
merged_validators
)
self.assertIn(
{"check": "s2", "expect": 16, "comparator": "len_eq"},
merged_validators
)
self.assertIn(
{"check": "s3", "expect": 12, "comparator": "len_eq"},
merged_validators
)
def test_merge_validator_with_dict(self):
def_validators = [
{'eq': ["a", {"v": 1}]},
{'eq': [{"b": 1}, 200]}
]
current_validators = [
{'len_eq': ['s3', 12]},
{'eq': [{"b": 1}, 201]}
]
merged_validators = utils._merge_validator(def_validators, current_validators)
self.assertEqual(len(merged_validators), 3)
self.assertIn({'check': {'b': 1}, 'expect': 201, 'comparator': 'eq'}, merged_validators)
self.assertNotIn({'check': {'b': 1}, 'expect': 200, 'comparator': 'eq'}, merged_validators)
def test_merge_extractor(self):
api_extrators = [{"var1": "val1"}, {"var2": "val2"}]
current_extractors = [{"var1": "val111"}, {"var3": "val3"}]
merged_extractors = utils._merge_extractor(api_extrators, current_extractors)
self.assertIn(
{"var1": "val111"},
merged_extractors
)
self.assertIn(
{"var2": "val2"},
merged_extractors
)
self.assertIn(
{"var3": "val3"},
merged_extractors
)
def test_get_uniform_comparator(self):
self.assertEqual(utils.get_uniform_comparator("eq"), "equals")
self.assertEqual(utils.get_uniform_comparator("=="), "equals")
@@ -175,8 +99,9 @@ class TestUtils(ApiServerUnittest):
self.assertEqual(utils.get_uniform_comparator("count_less_than_or_equals"), "length_less_than_or_equals")
def current_validators(self):
imported_module = utils.get_imported_module("httprunner.built_in")
functions_mapping = utils.filter_module(imported_module, "function")
from httprunner import built_in
module_mapping = loader.load_python_module(built_in)
functions_mapping = module_mapping["functions"]
functions_mapping["equals"](None, None)
functions_mapping["equals"](1, 1)
@@ -230,74 +155,6 @@ class TestUtils(ApiServerUnittest):
{'a': 2, 'b': {'c': 33, 'd': 4, 'e': 5}, 'f': 6, 'g': 7, 'h': 123}
)
def test_get_imported_module(self):
imported_module = utils.get_imported_module("os")
self.assertIn("walk", dir(imported_module))
def test_filter_module_functions(self):
imported_module = utils.get_imported_module("httprunner.utils")
self.assertIn("is_py3", dir(imported_module))
functions_dict = utils.filter_module(imported_module, "function")
self.assertIn("filter_module", functions_dict)
self.assertNotIn("is_py3", functions_dict)
def test_get_imported_module_from_file(self):
imported_module = utils.get_imported_module_from_file("tests/debugtalk.py")
self.assertIn("gen_md5", dir(imported_module))
functions_dict = utils.filter_module(imported_module, "function")
self.assertIn("gen_md5", functions_dict)
self.assertNotIn("urllib", functions_dict)
with self.assertRaises(exceptions.FileNotFoundError):
utils.get_imported_module_from_file("tests/debugtalk2.py")
def test_search_conf_function(self):
gen_md5 = utils.search_conf_item("tests/data/demo_binds.yml", "function", "gen_md5")
self.assertTrue(utils.is_function(("gen_md5", gen_md5)))
self.assertEqual(gen_md5("abc"), "900150983cd24fb0d6963f7d28e17f72")
gen_md5 = utils.search_conf_item("tests/data/subfolder/test.yml", "function", "gen_md5")
self.assertTrue(utils.is_function(("_", gen_md5)))
self.assertEqual(gen_md5("abc"), "900150983cd24fb0d6963f7d28e17f72")
with self.assertRaises(exceptions.FunctionNotFound):
utils.search_conf_item("tests/data/subfolder/test.yml", "function", "func_not_exist")
with self.assertRaises(exceptions.FunctionNotFound):
utils.search_conf_item("/user/local/bin", "function", "gen_md5")
def test_search_conf_variable(self):
SECRET_KEY = utils.search_conf_item("tests/data/demo_binds.yml", "variable", "SECRET_KEY")
self.assertTrue(utils.is_variable(("SECRET_KEY", SECRET_KEY)))
self.assertEqual(SECRET_KEY, "DebugTalk")
SECRET_KEY = utils.search_conf_item("tests/data/subfolder/test.yml", "variable", "SECRET_KEY")
self.assertTrue(utils.is_variable(("SECRET_KEY", SECRET_KEY)))
self.assertEqual(SECRET_KEY, "DebugTalk")
with self.assertRaises(exceptions.VariableNotFound):
utils.search_conf_item("tests/data/subfolder/test.yml", "variable", "variable_not_exist")
with self.assertRaises(exceptions.VariableNotFound):
utils.search_conf_item("/user/local/bin", "variable", "SECRET_KEY")
def test_is_variable(self):
var1 = 123
var2 = "abc"
self.assertTrue(utils.is_variable(("var1", var1)))
self.assertTrue(utils.is_variable(("var2", var2)))
__var = 123
self.assertFalse(utils.is_variable(("__var", __var)))
func = lambda x: x + 1
self.assertFalse(utils.is_variable(("func", func)))
self.assertFalse(utils.is_variable(("os", os)))
self.assertFalse(utils.is_variable(("utils", utils)))
def test_handle_config_key_case(self):
origin_dict = {
"Name": "test",
@@ -403,3 +260,46 @@ class TestUtils(ApiServerUnittest):
self.assertTrue(os.path.isdir(os.path.join(project_path, "tests", "testcases")))
self.assertTrue(os.path.isfile(os.path.join(project_path, "tests", "debugtalk.py")))
shutil.rmtree(project_path)
def test_cartesian_product_one(self):
parameters_content_list = [
[
{"a": 1},
{"a": 2}
]
]
product_list = utils.gen_cartesian_product(*parameters_content_list)
self.assertEqual(
product_list,
[
{"a": 1},
{"a": 2}
]
)
def test_cartesian_product_multiple(self):
parameters_content_list = [
[
{"a": 1},
{"a": 2}
],
[
{"x": 111, "y": 112},
{"x": 121, "y": 122}
]
]
product_list = utils.gen_cartesian_product(*parameters_content_list)
self.assertEqual(
product_list,
[
{'a': 1, 'x': 111, 'y': 112},
{'a': 1, 'x': 121, 'y': 122},
{'a': 2, 'x': 111, 'y': 112},
{'a': 2, 'x': 121, 'y': 122}
]
)
def test_cartesian_product_empty(self):
parameters_content_list = []
product_list = utils.gen_cartesian_product(*parameters_content_list)
self.assertEqual(product_list, [])

55
tests/test_validator.py Normal file
View File

@@ -0,0 +1,55 @@
import unittest
from httprunner import validator
class TestValidator(unittest.TestCase):
def test_is_testcases(self):
data_structure = "path/to/file"
self.assertFalse(validator.is_testcases(data_structure))
data_structure = ["path/to/file1", "path/to/file2"]
self.assertFalse(validator.is_testcases(data_structure))
data_structure = {
"name": "desc1",
"config": {},
"api": {},
"testcases": ["testcase11", "testcase12"]
}
self.assertTrue(data_structure)
data_structure = [
{
"name": "desc1",
"config": {},
"api": {},
"testcases": ["testcase11", "testcase12"]
},
{
"name": "desc2",
"config": {},
"api": {},
"testcases": ["testcase21", "testcase22"]
}
]
self.assertTrue(data_structure)
def test_is_variable(self):
var1 = 123
var2 = "abc"
self.assertTrue(validator.is_variable(("var1", var1)))
self.assertTrue(validator.is_variable(("var2", var2)))
__var = 123
self.assertFalse(validator.is_variable(("__var", __var)))
func = lambda x: x + 1
self.assertFalse(validator.is_variable(("func", func)))
self.assertFalse(validator.is_variable(("unittest", unittest)))
def test_is_function(self):
func = lambda x: x + 1
self.assertTrue(validator.is_function(("func", func)))
self.assertTrue(validator.is_function(("func", validator.is_testcase)))