Merge pull request #335 from HttpRunner/modular

refactor file loader and testcase loader
This commit is contained in:
debugtalk
2018-08-05 00:17:26 +08:00
committed by GitHub
17 changed files with 1296 additions and 1265 deletions

View File

@@ -1,7 +1,7 @@
__title__ = 'HttpRunner'
__description__ = 'One-stop solution for HTTP(S) testing.'
__url__ = 'https://github.com/HttpRunner/HttpRunner'
__version__ = '1.5.8'
__version__ = '1.5.9'
__author__ = 'debugtalk'
__author_email__ = 'mail@debugtalk.com'
__license__ = 'MIT'

View File

@@ -5,7 +5,7 @@ import os
import re
import sys
from httprunner import built_in, exceptions, logger, testcase, utils
from httprunner import built_in, exceptions, logger, parser, testcase, utils
from httprunner.compat import OrderedDict
@@ -177,7 +177,7 @@ class Context(object):
# 5, regex string, e.g. "LB[\d]*(.*)RB[\d]*"
if isinstance(check_item, (dict, list)) \
or testcase.extract_variables(check_item) \
or parser.extract_variables(check_item) \
or testcase.extract_functions(check_item):
# format 1/2/3
check_value = self.eval_content(check_item)
@@ -251,7 +251,7 @@ class Context(object):
for validator in validators:
# evaluate validators with context variable mapping.
evaluated_validator = self.eval_check_item(
testcase.parse_validator(validator),
parser.parse_validator(validator),
resp_obj
)

407
httprunner/loader.py Normal file
View File

@@ -0,0 +1,407 @@
import csv
import io
import json
import os
import yaml
from httprunner import exceptions, logger, parser, utils
###############################################################################
## file loader
###############################################################################
def _check_format(file_path, content):
""" check testcase format if valid
"""
# TODO: replace with JSON schema validation
if not content:
# testcase file content is empty
err_msg = u"Testcase file content is empty: {}".format(file_path)
logger.log_error(err_msg)
raise exceptions.FileFormatError(err_msg)
elif not isinstance(content, (list, dict)):
# testcase file content does not match testcase format
err_msg = u"Testcase file content format invalid: {}".format(file_path)
logger.log_error(err_msg)
raise exceptions.FileFormatError(err_msg)
def load_yaml_file(yaml_file):
""" load yaml file and check file content format
"""
with io.open(yaml_file, 'r', encoding='utf-8') as stream:
yaml_content = yaml.load(stream)
_check_format(yaml_file, yaml_content)
return yaml_content
def load_json_file(json_file):
""" load json file and check file content format
"""
with io.open(json_file, encoding='utf-8') as data_file:
try:
json_content = json.load(data_file)
except exceptions.JSONDecodeError:
err_msg = u"JSONDecodeError: JSON file format error: {}".format(json_file)
logger.log_error(err_msg)
raise exceptions.FileFormatError(err_msg)
_check_format(json_file, json_content)
return json_content
def load_csv_file(csv_file):
""" load csv file and check file content format
@param
csv_file: csv file path
e.g. csv file content:
username,password
test1,111111
test2,222222
test3,333333
@return
list of parameter, each parameter is in dict format
e.g.
[
{'username': 'test1', 'password': '111111'},
{'username': 'test2', 'password': '222222'},
{'username': 'test3', 'password': '333333'}
]
"""
csv_content_list = []
with io.open(csv_file, encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
csv_content_list.append(row)
return csv_content_list
def load_file(file_path):
if not os.path.isfile(file_path):
raise exceptions.FileNotFound("{} does not exist.".format(file_path))
file_suffix = os.path.splitext(file_path)[1].lower()
if file_suffix == '.json':
return load_json_file(file_path)
elif file_suffix in ['.yaml', '.yml']:
return load_yaml_file(file_path)
elif file_suffix == ".csv":
return load_csv_file(file_path)
else:
# '' or other suffix
err_msg = u"Unsupported file format: {}".format(file_path)
logger.log_warning(err_msg)
return []
def load_folder_files(folder_path, recursive=True):
""" load folder path, return all files in list format.
@param
folder_path: specified folder path to load
recursive: if True, will load files recursively
"""
if isinstance(folder_path, (list, set)):
files = []
for path in set(folder_path):
files.extend(load_folder_files(path, recursive))
return files
if not os.path.exists(folder_path):
return []
file_list = []
for dirpath, dirnames, filenames in os.walk(folder_path):
filenames_list = []
for filename in filenames:
if not filename.endswith(('.yml', '.yaml', '.json')):
continue
filenames_list.append(filename)
for filename in filenames_list:
file_path = os.path.join(dirpath, filename)
file_list.append(file_path)
if not recursive:
break
return file_list
def load_dot_env_file(path):
""" load .env file and set to os.environ
"""
if not path:
path = os.path.join(os.getcwd(), ".env")
if not os.path.isfile(path):
logger.log_debug(".env file not exist: {}".format(path))
return
else:
if not os.path.isfile(path):
raise exceptions.FileNotFound("env file not exist: {}".format(path))
logger.log_info("Loading environment variables from {}".format(path))
with io.open(path, 'r', encoding='utf-8') as fp:
for line in fp:
variable, value = line.split("=")
variable = variable.strip()
os.environ[variable] = value.strip()
logger.log_debug("Loaded variable: {}".format(variable))
###############################################################################
## suite loader
###############################################################################
overall_def_dict = {
"api": {},
"suite": {}
}
testcases_cache_mapping = {}
def load_test_dependencies():
""" load all api and suite definitions.
default api folder is "$CWD/tests/api/".
default suite folder is "$CWD/tests/suite/".
"""
# TODO: cache api and suite loading
# load api definitions
api_def_folder = os.path.join(os.getcwd(), "tests", "api")
for test_file in load_folder_files(api_def_folder):
load_api_file(test_file)
# load suite definitions
suite_def_folder = os.path.join(os.getcwd(), "tests", "suite")
for suite_file in load_folder_files(suite_def_folder):
suite = load_test_file(suite_file)
if "def" not in suite["config"]:
raise exceptions.ParamsError("def missed in suite file: {}!".format(suite_file))
call_func = suite["config"]["def"]
function_meta = parser.parse_function(call_func)
suite["function_meta"] = function_meta
overall_def_dict["suite"][function_meta["func_name"]] = suite
def load_api_file(file_path):
""" load api definition from file and store in overall_def_dict["api"]
api file should be in format below:
[
{
"api": {
"def": "api_login",
"request": {},
"validate": []
}
},
{
"api": {
"def": "api_logout",
"request": {},
"validate": []
}
}
]
"""
api_items = load_file(file_path)
if not isinstance(api_items, list):
raise exceptions.FileFormatError("API format error: {}".format(file_path))
for api_item in api_items:
if not isinstance(api_item, dict) or len(api_item) != 1:
raise exceptions.FileFormatError("API format error: {}".format(file_path))
key, api_dict = api_item.popitem()
if key != "api" or not isinstance(api_dict, dict) or "def" not in api_dict:
raise exceptions.FileFormatError("API format error: {}".format(file_path))
api_def = api_dict.pop("def")
function_meta = parser.parse_function(api_def)
func_name = function_meta["func_name"]
if func_name in overall_def_dict["api"]:
logger.log_warning("API definition duplicated: {}".format(func_name))
api_dict["function_meta"] = function_meta
overall_def_dict["api"][func_name] = api_dict
def load_test_file(file_path):
""" load testcase file or testsuite file
@param file_path: absolute valid file path
file_path should be in format below:
[
{
"config": {
"name": "",
"def": "suite_order()",
"request": {}
}
},
{
"test": {
"name": "add product to cart",
"api": "api_add_cart()",
"validate": []
}
},
{
"test": {
"name": "checkout cart",
"request": {},
"validate": []
}
}
]
@return testset dict
{
"config": {},
"testcases": [testcase11, testcase12]
}
"""
testset = {
"config": {
"path": file_path
},
"testcases": [] # TODO: rename to tests
}
for item in load_file(file_path):
if not isinstance(item, dict) or len(item) != 1:
raise exceptions.FileFormatError("Testcase format error: {}".format(file_path))
key, test_block = item.popitem()
if not isinstance(test_block, dict):
raise exceptions.FileFormatError("Testcase format error: {}".format(file_path))
if key == "config":
testset["config"].update(test_block)
elif key == "test":
if "api" in test_block:
ref_call = test_block["api"]
def_block = _get_block_by_name(ref_call, "api")
utils._override_block(def_block, test_block)
testset["testcases"].append(test_block)
elif "suite" in test_block:
ref_call = test_block["suite"]
block = _get_block_by_name(ref_call, "suite")
testset["testcases"].extend(block["testcases"])
else:
testset["testcases"].append(test_block)
else:
logger.log_warning(
"unexpected block key: {}. block key should only be 'config' or 'test'.".format(key)
)
return testset
def _get_block_by_name(ref_call, ref_type):
""" get test content by reference name
@params:
ref_call: e.g. api_v1_Account_Login_POST($UserName, $Password)
ref_type: "api" or "suite"
"""
function_meta = parser.parse_function(ref_call)
func_name = function_meta["func_name"]
call_args = function_meta["args"]
block = _get_test_definition(func_name, ref_type)
def_args = block.get("function_meta").get("args", [])
if len(call_args) != len(def_args):
raise exceptions.ParamsError("call args mismatch defined args!")
args_mapping = {}
for index, item in enumerate(def_args):
if call_args[index] == item:
continue
args_mapping[item] = call_args[index]
if args_mapping:
block = utils.substitute_variables_with_mapping(block, args_mapping)
return block
def _get_test_definition(name, ref_type):
""" get expected api or suite.
@params:
name: api or suite name
ref_type: "api" or "suite"
@return
expected api info if found, otherwise raise ApiNotFound exception
"""
block = overall_def_dict.get(ref_type, {}).get(name)
if not block:
err_msg = "{} not found!".format(name)
if ref_type == "api":
raise exceptions.ApiNotFound(err_msg)
else:
# ref_type == "suite":
raise exceptions.SuiteNotFound(err_msg)
return block
def load_testcases(path):
""" load testcases from file path
@param path: path could be in several type
- absolute/relative file path
- absolute/relative folder path
- list/set container with file(s) and/or folder(s)
@return testcases list, each testcase is corresponding to a file
[
testcase_dict_1,
testcase_dict_2
]
"""
if isinstance(path, (list, set)):
testcases_list = []
for file_path in set(path):
testcases = load_testcases(file_path)
if not testcases:
continue
testcases_list.extend(testcases)
return testcases_list
if not os.path.isabs(path):
path = os.path.join(os.getcwd(), path)
if path in testcases_cache_mapping:
return testcases_cache_mapping[path]
if os.path.isdir(path):
files_list = load_folder_files(path)
testcases_list = load_testcases(files_list)
elif os.path.isfile(path):
try:
testcase = load_test_file(path)
if testcase["testcases"]:
testcases_list = [testcase]
else:
testcases_list = []
except exceptions.FileFormatError:
testcases_list = []
else:
err_msg = "file not found: {}".format(path)
logger.log_error(err_msg)
raise exceptions.FileNotFound(err_msg)
testcases_cache_mapping[path] = testcases_list
return testcases_list

View File

@@ -6,7 +6,7 @@ import os
import sys
from httprunner.logger import color_print
from httprunner.testcase import TestcaseLoader
from httprunner import loader
from locust.main import main
@@ -40,8 +40,8 @@ def gen_locustfile(testcase_file_path):
"templates",
"locustfile_template"
)
TestcaseLoader.load_test_dependencies()
testset = TestcaseLoader.load_test_file(testcase_file_path)
loader.load_test_dependencies()
testset = loader.load_test_file(testcase_file_path)
host = testset.get("config", {}).get("request", {}).get("base_url", "")
with io.open(template_path, encoding='utf-8') as template:

129
httprunner/parser.py Normal file
View File

@@ -0,0 +1,129 @@
import ast
import re
from httprunner import exceptions
variable_regexp = r"\$([\w_]+)"
function_regexp_compile = re.compile(r"^([\w_]+)\(([\$\w\.\-_ =,]*)\)$")
def parse_string_value(str_value):
""" parse string to number if possible
e.g. "123" => 123
"12.2" => 12.3
"abc" => "abc"
"$var" => "$var"
"""
try:
return ast.literal_eval(str_value)
except ValueError:
return str_value
except SyntaxError:
# e.g. $var, ${func}
return str_value
def extract_variables(content):
""" extract all variable names from content, which is in format $variable
@param (str) content
@return (list) variable name list
e.g. $variable => ["variable"]
/blog/$postid => ["postid"]
/$var1/$var2 => ["var1", "var2"]
abc => []
"""
# TODO: change variable notation from $var to {{var}}
try:
return re.findall(variable_regexp, content)
except TypeError:
return []
def parse_function(content):
""" parse function name and args from string content.
@param (str) content
@return (dict) function name and args
e.g. func() => {'func_name': 'func', 'args': [], 'kwargs': {}}
func(5) => {'func_name': 'func', 'args': [5], 'kwargs': {}}
func(1, 2) => {'func_name': 'func', 'args': [1, 2], 'kwargs': {}}
func(a=1, b=2) => {'func_name': 'func', 'args': [], 'kwargs': {'a': 1, 'b': 2}}
func(1, 2, a=3, b=4) => {'func_name': 'func', 'args': [1, 2], 'kwargs': {'a':3, 'b':4}}
"""
matched = function_regexp_compile.match(content)
if not matched:
raise exceptions.FunctionNotFound("{} not found!".format(content))
function_meta = {
"func_name": matched.group(1),
"args": [],
"kwargs": {}
}
args_str = matched.group(2).strip()
if args_str == "":
return function_meta
args_list = args_str.split(',')
for arg in args_list:
arg = arg.strip()
if '=' in arg:
key, value = arg.split('=')
function_meta["kwargs"][key.strip()] = parse_string_value(value.strip())
else:
function_meta["args"].append(parse_string_value(arg))
return function_meta
def parse_validator(validator):
""" parse validator, validator maybe in two format
@param (dict) validator
format1: this is kept for compatiblity with the previous versions.
{"check": "status_code", "comparator": "eq", "expect": 201}
{"check": "$resp_body_success", "comparator": "eq", "expect": True}
format2: recommended new version
{'eq': ['status_code', 201]}
{'eq': ['$resp_body_success', True]}
@return (dict) validator info
{
"check": "status_code",
"expect": 201,
"comparator": "eq"
}
"""
if not isinstance(validator, dict):
raise exceptions.ParamsError("invalid validator: {}".format(validator))
if "check" in validator and len(validator) > 1:
# format1
check_item = validator.get("check")
if "expect" in validator:
expect_value = validator.get("expect")
elif "expected" in validator:
expect_value = validator.get("expected")
else:
raise exceptions.ParamsError("invalid validator: {}".format(validator))
comparator = validator.get("comparator", "eq")
elif len(validator) == 1:
# format2
comparator = list(validator.keys())[0]
compare_values = validator[comparator]
if not isinstance(compare_values, list) or len(compare_values) != 2:
raise exceptions.ParamsError("invalid validator: {}".format(validator))
check_item, expect_value = compare_values
else:
raise exceptions.ParamsError("invalid validator: {}".format(validator))
return {
"check": check_item,
"expect": expect_value,
"comparator": comparator
}

View File

@@ -103,6 +103,7 @@ class Runner(object):
def do_hook_actions(self, actions):
for action in actions:
logger.log_debug("call hook: {}".format(action))
# TODO: check hook function if valid
self.context.eval_content(action)
def run_test(self, testcase_dict):

View File

@@ -4,12 +4,10 @@ import copy
import sys
import unittest
from httprunner import exceptions, logger, runner, testcase, utils
from httprunner import exceptions, loader, logger, runner, testcase, utils
from httprunner.compat import is_py3
from httprunner.report import (HtmlTestResult, get_platform, get_summary,
render_html_report)
from httprunner.testcase import TestcaseLoader
from httprunner.utils import load_dot_env_file, print_output
class TestCase(unittest.TestCase):
@@ -180,8 +178,8 @@ def init_test_suites(path_or_testsets, mapping=None, http_client_session=None):
passed in variables mapping, it will override variables in config block
"""
if not testcase.is_testsets(path_or_testsets):
TestcaseLoader.load_test_dependencies()
testsets = TestcaseLoader.load_testsets_by_path(path_or_testsets)
loader.load_test_dependencies()
testsets = loader.load_testcases(path_or_testsets)
else:
testsets = path_or_testsets
@@ -212,7 +210,7 @@ class HttpRunner(object):
- dot_env_path: .env file path
"""
dot_env_path = kwargs.pop("dot_env_path", None)
load_dot_env_file(dot_env_path)
loader.load_dot_env_file(dot_env_path)
kwargs.setdefault("resultclass", HtmlTestResult)
self.runner = unittest.TextTestRunner(**kwargs)
@@ -268,7 +266,7 @@ class HttpRunner(object):
test_suite_summary["name"] = test_suite.config.get("name")
test_suite_summary["base_url"] = test_suite.config.get("request", {}).get("base_url", "")
test_suite_summary["output"] = test_suite.output
print_output(test_suite_summary["output"])
utils.print_output(test_suite_summary["output"])
accumulate_stat(self.summary["stat"], test_suite_summary["stat"])
accumulate_stat(self.summary["time"], test_suite_summary["time"])

View File

@@ -1,7 +1,5 @@
# encoding: utf-8
import ast
import collections
import io
import itertools
import json
@@ -9,31 +7,14 @@ import os
import random
import re
from httprunner import exceptions, logger, utils
from httprunner import exceptions, loader, logger, parser, utils
from httprunner.compat import (OrderedDict, basestring, builtin_str,
numeric_types, str)
from httprunner.utils import FileUtils
variable_regexp = r"\$([\w_]+)"
function_regexp = r"\$\{([\w_]+\([\$\w\.\-_ =,]*\))\}"
function_regexp_compile = re.compile(r"^([\w_]+)\(([\$\w\.\-_ =,]*)\)$")
def extract_variables(content):
""" extract all variable names from content, which is in format $variable
@param (str) content
@return (list) variable name list
e.g. $variable => ["variable"]
/blog/$postid => ["postid"]
/$var1/$var2 => ["var1", "var2"]
abc => []
"""
try:
return re.findall(variable_regexp, content)
except TypeError:
return []
def extract_functions(content):
""" extract all functions from string content, which are in format ${fun()}
@param (str) content
@@ -50,493 +31,6 @@ def extract_functions(content):
except TypeError:
return []
def parse_string_value(str_value):
""" parse string to number if possible
e.g. "123" => 123
"12.2" => 12.3
"abc" => "abc"
"$var" => "$var"
"""
try:
return ast.literal_eval(str_value)
except ValueError:
return str_value
except SyntaxError:
# e.g. $var, ${func}
return str_value
def parse_function(content):
""" parse function name and args from string content.
@param (str) content
@return (dict) function name and args
e.g. func() => {'func_name': 'func', 'args': [], 'kwargs': {}}
func(5) => {'func_name': 'func', 'args': [5], 'kwargs': {}}
func(1, 2) => {'func_name': 'func', 'args': [1, 2], 'kwargs': {}}
func(a=1, b=2) => {'func_name': 'func', 'args': [], 'kwargs': {'a': 1, 'b': 2}}
func(1, 2, a=3, b=4) => {'func_name': 'func', 'args': [1, 2], 'kwargs': {'a':3, 'b':4}}
"""
matched = function_regexp_compile.match(content)
if not matched:
raise exceptions.FunctionNotFound("{} not found!".format(content))
function_meta = {
"func_name": matched.group(1),
"args": [],
"kwargs": {}
}
args_str = matched.group(2).strip()
if args_str == "":
return function_meta
args_list = args_str.split(',')
for arg in args_list:
arg = arg.strip()
if '=' in arg:
key, value = arg.split('=')
function_meta["kwargs"][key.strip()] = parse_string_value(value.strip())
else:
function_meta["args"].append(parse_string_value(arg))
return function_meta
class TestcaseLoader(object):
overall_def_dict = {
"api": {},
"suite": {}
}
testcases_cache_mapping = {}
@staticmethod
def load_test_dependencies():
""" load all api and suite definitions.
default api folder is "$CWD/tests/api/".
default suite folder is "$CWD/tests/suite/".
"""
# TODO: cache api and suite loading
# load api definitions
api_def_folder = os.path.join(os.getcwd(), "tests", "api")
for test_file in FileUtils.load_folder_files(api_def_folder):
TestcaseLoader.load_api_file(test_file)
# load suite definitions
suite_def_folder = os.path.join(os.getcwd(), "tests", "suite")
for suite_file in FileUtils.load_folder_files(suite_def_folder):
suite = TestcaseLoader.load_test_file(suite_file)
if "def" not in suite["config"]:
raise exceptions.ParamsError("def missed in suite file: {}!".format(suite_file))
call_func = suite["config"]["def"]
function_meta = parse_function(call_func)
suite["function_meta"] = function_meta
TestcaseLoader.overall_def_dict["suite"][function_meta["func_name"]] = suite
@staticmethod
def load_api_file(file_path):
""" load api definition from file and store in overall_def_dict["api"]
api file should be in format below:
[
{
"api": {
"def": "api_login",
"request": {},
"validate": []
}
},
{
"api": {
"def": "api_logout",
"request": {},
"validate": []
}
}
]
"""
api_items = FileUtils.load_file(file_path)
if not isinstance(api_items, list):
raise exceptions.FileFormatError("API format error: {}".format(file_path))
for api_item in api_items:
if not isinstance(api_item, dict) or len(api_item) != 1:
raise exceptions.FileFormatError("API format error: {}".format(file_path))
key, api_dict = api_item.popitem()
if key != "api" or not isinstance(api_dict, dict) or "def" not in api_dict:
raise exceptions.FileFormatError("API format error: {}".format(file_path))
api_def = api_dict.pop("def")
function_meta = parse_function(api_def)
func_name = function_meta["func_name"]
if func_name in TestcaseLoader.overall_def_dict["api"]:
logger.log_warning("API definition duplicated: {}".format(func_name))
api_dict["function_meta"] = function_meta
TestcaseLoader.overall_def_dict["api"][func_name] = api_dict
@staticmethod
def load_test_file(file_path):
""" load testcase file or suite file
@param file_path: absolute valid file path
file_path should be in format below:
[
{
"config": {
"name": "",
"def": "suite_order()",
"request": {}
}
},
{
"test": {
"name": "add product to cart",
"api": "api_add_cart()",
"validate": []
}
},
{
"test": {
"name": "checkout cart",
"request": {},
"validate": []
}
}
]
@return testset dict
{
"config": {},
"testcases": [testcase11, testcase12]
}
"""
testset = {
"config": {
"path": file_path
},
"testcases": [] # TODO: rename to tests
}
for item in FileUtils.load_file(file_path):
if not isinstance(item, dict) or len(item) != 1:
raise exceptions.FileFormatError("Testcase format error: {}".format(file_path))
key, test_block = item.popitem()
if not isinstance(test_block, dict):
raise exceptions.FileFormatError("Testcase format error: {}".format(file_path))
if key == "config":
testset["config"].update(test_block)
elif key == "test":
if "api" in test_block:
ref_call = test_block["api"]
def_block = TestcaseLoader._get_block_by_name(ref_call, "api")
TestcaseLoader._override_block(def_block, test_block)
testset["testcases"].append(test_block)
elif "suite" in test_block:
ref_call = test_block["suite"]
block = TestcaseLoader._get_block_by_name(ref_call, "suite")
testset["testcases"].extend(block["testcases"])
else:
testset["testcases"].append(test_block)
else:
logger.log_warning(
"unexpected block key: {}. block key should only be 'config' or 'test'.".format(key)
)
return testset
@staticmethod
def _get_block_by_name(ref_call, ref_type):
""" get test content by reference name
@params:
ref_call: e.g. api_v1_Account_Login_POST($UserName, $Password)
ref_type: "api" or "suite"
"""
function_meta = parse_function(ref_call)
func_name = function_meta["func_name"]
call_args = function_meta["args"]
block = TestcaseLoader._get_test_definition(func_name, ref_type)
def_args = block.get("function_meta").get("args", [])
if len(call_args) != len(def_args):
raise exceptions.ParamsError("call args mismatch defined args!")
args_mapping = {}
for index, item in enumerate(def_args):
if call_args[index] == item:
continue
args_mapping[item] = call_args[index]
if args_mapping:
block = substitute_variables_with_mapping(block, args_mapping)
return block
@staticmethod
def _get_test_definition(name, ref_type):
""" get expected api or suite.
@params:
name: api or suite name
ref_type: "api" or "suite"
@return
expected api info if found, otherwise raise ApiNotFound exception
"""
block = TestcaseLoader.overall_def_dict.get(ref_type, {}).get(name)
if not block:
err_msg = "{} not found!".format(name)
if ref_type == "api":
raise exceptions.ApiNotFound(err_msg)
else:
# ref_type == "suite":
raise exceptions.SuiteNotFound(err_msg)
return block
@staticmethod
def _override_block(def_block, current_block):
""" override def_block with current_block
@param def_block:
{
"name": "get token",
"request": {...},
"validate": [{'eq': ['status_code', 200]}]
}
@param current_block:
{
"name": "get token",
"extract": [{"token": "content.token"}],
"validate": [{'eq': ['status_code', 201]}, {'len_eq': ['content.token', 16]}]
}
@return
{
"name": "get token",
"request": {...},
"extract": [{"token": "content.token"}],
"validate": [{'eq': ['status_code', 201]}, {'len_eq': ['content.token', 16]}]
}
"""
def_validators = def_block.get("validate") or def_block.get("validators", [])
current_validators = current_block.get("validate") or current_block.get("validators", [])
def_extrators = def_block.get("extract") \
or def_block.get("extractors") \
or def_block.get("extract_binds", [])
current_extractors = current_block.get("extract") \
or current_block.get("extractors") \
or current_block.get("extract_binds", [])
current_block.update(def_block)
current_block["validate"] = _merge_validator(
def_validators,
current_validators
)
current_block["extract"] = _merge_extractor(
def_extrators,
current_extractors
)
@staticmethod
def load_testsets_by_path(path):
""" load testcases from file path
@param path: path could be in several type
- absolute/relative file path
- absolute/relative folder path
- list/set container with file(s) and/or folder(s)
@return testcase sets list, each testset is corresponding to a file
[
testset_dict_1,
testset_dict_2
]
"""
if isinstance(path, (list, set)):
testsets = []
for file_path in set(path):
testset = TestcaseLoader.load_testsets_by_path(file_path)
if not testset:
continue
testsets.extend(testset)
return testsets
if not os.path.isabs(path):
path = os.path.join(os.getcwd(), path)
if path in TestcaseLoader.testcases_cache_mapping:
return TestcaseLoader.testcases_cache_mapping[path]
if os.path.isdir(path):
files_list = FileUtils.load_folder_files(path)
testcases_list = TestcaseLoader.load_testsets_by_path(files_list)
elif os.path.isfile(path):
try:
testset = TestcaseLoader.load_test_file(path)
if testset["testcases"] or testset["api"]:
testcases_list = [testset]
else:
testcases_list = []
except exceptions.FileFormatError:
testcases_list = []
else:
logger.log_error(u"file not found: {}".format(path))
testcases_list = []
TestcaseLoader.testcases_cache_mapping[path] = testcases_list
return testcases_list
def parse_validator(validator):
""" parse validator, validator maybe in two format
@param (dict) validator
format1: this is kept for compatiblity with the previous versions.
{"check": "status_code", "comparator": "eq", "expect": 201}
{"check": "$resp_body_success", "comparator": "eq", "expect": True}
format2: recommended new version
{'eq': ['status_code', 201]}
{'eq': ['$resp_body_success', True]}
@return (dict) validator info
{
"check": "status_code",
"expect": 201,
"comparator": "eq"
}
"""
if not isinstance(validator, dict):
raise exceptions.ParamsError("invalid validator: {}".format(validator))
if "check" in validator and len(validator) > 1:
# format1
check_item = validator.get("check")
if "expect" in validator:
expect_value = validator.get("expect")
elif "expected" in validator:
expect_value = validator.get("expected")
else:
raise exceptions.ParamsError("invalid validator: {}".format(validator))
comparator = validator.get("comparator", "eq")
elif len(validator) == 1:
# format2
comparator = list(validator.keys())[0]
compare_values = validator[comparator]
if not isinstance(compare_values, list) or len(compare_values) != 2:
raise exceptions.ParamsError("invalid validator: {}".format(validator))
check_item, expect_value = compare_values
else:
raise exceptions.ParamsError("invalid validator: {}".format(validator))
return {
"check": check_item,
"expect": expect_value,
"comparator": comparator
}
def _get_validators_mapping(validators):
""" get validators mapping from api or test validators
@param (list) validators:
[
{"check": "v1", "expect": 201, "comparator": "eq"},
{"check": {"b": 1}, "expect": 200, "comparator": "eq"}
]
@return
{
("v1", "eq"): {"check": "v1", "expect": 201, "comparator": "eq"},
('{"b": 1}', "eq"): {"check": {"b": 1}, "expect": 200, "comparator": "eq"}
}
"""
validators_mapping = {}
for validator in validators:
validator = parse_validator(validator)
if not isinstance(validator["check"], collections.Hashable):
check = json.dumps(validator["check"])
else:
check = validator["check"]
key = (check, validator["comparator"])
validators_mapping[key] = validator
return validators_mapping
def _merge_validator(def_validators, current_validators):
""" merge def_validators with current_validators
@params:
def_validators: [{'eq': ['v1', 200]}, {"check": "s2", "expect": 16, "comparator": "len_eq"}]
current_validators: [{"check": "v1", "expect": 201}, {'len_eq': ['s3', 12]}]
@return:
[
{"check": "v1", "expect": 201, "comparator": "eq"},
{"check": "s2", "expect": 16, "comparator": "len_eq"},
{"check": "s3", "expect": 12, "comparator": "len_eq"}
]
"""
if not def_validators:
return current_validators
elif not current_validators:
return def_validators
else:
api_validators_mapping = _get_validators_mapping(def_validators)
test_validators_mapping = _get_validators_mapping(current_validators)
api_validators_mapping.update(test_validators_mapping)
return list(api_validators_mapping.values())
def _merge_extractor(def_extrators, current_extractors):
""" merge def_extrators with current_extractors
@params:
def_extrators: [{"var1": "val1"}, {"var2": "val2"}]
current_extractors: [{"var1": "val111"}, {"var3": "val3"}]
@return:
[
{"var1": "val111"},
{"var2": "val2"},
{"var3": "val3"}
]
"""
if not def_extrators:
return current_extractors
elif not current_extractors:
return def_extrators
else:
extractor_dict = OrderedDict()
for api_extrator in def_extrators:
if len(api_extrator) != 1:
logger.log_warning("incorrect extractor: {}".format(api_extrator))
continue
var_name = list(api_extrator.keys())[0]
extractor_dict[var_name] = api_extrator[var_name]
for test_extrator in current_extractors:
if len(test_extrator) != 1:
logger.log_warning("incorrect extractor: {}".format(test_extrator))
continue
var_name = list(test_extrator.keys())[0]
extractor_dict[var_name] = test_extrator[var_name]
extractor_list = []
for key, value in extractor_dict.items():
extractor_list.append({key: value})
return extractor_list
def is_testset(data_structure):
""" check if data_structure is a testset
@@ -578,61 +72,6 @@ def is_testsets(data_structure):
return True
def substitute_variables_with_mapping(content, mapping):
""" substitute variables in content with mapping
e.g.
@params
content = {
'request': {
'url': '/api/users/$uid',
'headers': {'token': '$token'}
}
}
mapping = {"$uid": 1000}
@return
{
'request': {
'url': '/api/users/1000',
'headers': {'token': '$token'}
}
}
"""
# TODO: refactor type check
if isinstance(content, bool):
return content
if isinstance(content, (numeric_types, type)):
return content
if not content:
return content
if isinstance(content, (list, set, tuple)):
return [
substitute_variables_with_mapping(item, mapping)
for item in content
]
if isinstance(content, dict):
substituted_data = {}
for key, value in content.items():
eval_key = substitute_variables_with_mapping(key, mapping)
eval_value = substitute_variables_with_mapping(value, mapping)
substituted_data[eval_key] = eval_value
return substituted_data
# content is in string format here
for var, value in mapping.items():
if content == var:
# content is a variable
content = value
else:
if not isinstance(value, str):
value = builtin_str(value)
content = content.replace(var, value)
return content
def gen_cartesian_product(*args):
""" generate cartesian product for lists
@@ -792,7 +231,7 @@ class TestcaseParser(object):
os.path.dirname(self.file_path),
"{}".format(csv_file_name)
)
csv_content_list = FileUtils.load_file(parameter_file_path)
csv_content_list = loader.load_file(parameter_file_path)
if fetch_method.lower() == "random":
random.shuffle(csv_content_list)
@@ -802,7 +241,7 @@ class TestcaseParser(object):
def _eval_content_functions(self, content):
functions_list = extract_functions(content)
for func_content in functions_list:
function_meta = parse_function(func_content)
function_meta = parser.parse_function(func_content)
func_name = function_meta['func_name']
args = function_meta.get('args', [])
@@ -844,7 +283,7 @@ class TestcaseParser(object):
/$var_1/$var_2/var3 => "/abc/def/var3"
${func($var_1, $var_2, xyz)} => "${func(abc, def, xyz)}"
"""
variables_list = extract_variables(content)
variables_list = parser.extract_variables(content)
for variable_name in variables_list:
variable_value = self.get_bind_variable(variable_name)

View File

@@ -1,7 +1,7 @@
# encoding: utf-8
import collections
import copy
import csv
import hashlib
import hmac
import imp
@@ -10,14 +10,13 @@ import io
import json
import os.path
import random
import re
import string
import types
from datetime import datetime
import yaml
from httprunner import exceptions, logger
from httprunner.compat import OrderedDict, basestring, is_py2, is_py3, str
from httprunner import exceptions, logger, parser
from httprunner.compat import (OrderedDict, basestring, builtin_str, is_py2,
is_py3, numeric_types, str)
from requests.structures import CaseInsensitiveDict
SECRET_KEY = "DebugTalk"
@@ -44,132 +43,6 @@ def remove_prefix(text, prefix):
return text
class FileUtils(object):
@staticmethod
def _check_format(file_path, content):
""" check testcase format if valid
"""
if not content:
# testcase file content is empty
err_msg = u"Testcase file content is empty: {}".format(file_path)
logger.log_error(err_msg)
raise exceptions.FileFormatError(err_msg)
elif not isinstance(content, (list, dict)):
# testcase file content does not match testcase format
err_msg = u"Testcase file content format invalid: {}".format(file_path)
logger.log_error(err_msg)
raise exceptions.FileFormatError(err_msg)
@staticmethod
def _load_yaml_file(yaml_file):
""" load yaml file and check file content format
"""
with io.open(yaml_file, 'r', encoding='utf-8') as stream:
yaml_content = yaml.load(stream)
FileUtils._check_format(yaml_file, yaml_content)
return yaml_content
@staticmethod
def _load_json_file(json_file):
""" load json file and check file content format
"""
with io.open(json_file, encoding='utf-8') as data_file:
try:
json_content = json.load(data_file)
except exceptions.JSONDecodeError:
err_msg = u"JSONDecodeError: JSON file format error: {}".format(json_file)
logger.log_error(err_msg)
raise exceptions.FileFormatError(err_msg)
FileUtils._check_format(json_file, json_content)
return json_content
@staticmethod
def _load_csv_file(csv_file):
""" load csv file and check file content format
@param
csv_file: csv file path
e.g. csv file content:
username,password
test1,111111
test2,222222
test3,333333
@return
list of parameter, each parameter is in dict format
e.g.
[
{'username': 'test1', 'password': '111111'},
{'username': 'test2', 'password': '222222'},
{'username': 'test3', 'password': '333333'}
]
"""
csv_content_list = []
with io.open(csv_file, encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
csv_content_list.append(row)
return csv_content_list
@staticmethod
def load_file(file_path):
if not os.path.isfile(file_path):
raise exceptions.FileNotFound("{} does not exist.".format(file_path))
file_suffix = os.path.splitext(file_path)[1].lower()
if file_suffix == '.json':
return FileUtils._load_json_file(file_path)
elif file_suffix in ['.yaml', '.yml']:
return FileUtils._load_yaml_file(file_path)
elif file_suffix == ".csv":
return FileUtils._load_csv_file(file_path)
else:
# '' or other suffix
err_msg = u"Unsupported file format: {}".format(file_path)
logger.log_warning(err_msg)
return []
@staticmethod
def load_folder_files(folder_path, recursive=True):
""" load folder path, return all files in list format.
@param
folder_path: specified folder path to load
recursive: if True, will load files recursively
"""
if isinstance(folder_path, (list, set)):
files = []
for path in set(folder_path):
files.extend(FileUtils.load_folder_files(path, recursive))
return files
if not os.path.exists(folder_path):
return []
file_list = []
for dirpath, dirnames, filenames in os.walk(folder_path):
filenames_list = []
for filename in filenames:
if not filename.endswith(('.yml', '.yaml', '.json')):
continue
filenames_list.append(filename)
for filename in filenames_list:
file_path = os.path.join(dirpath, filename)
file_list.append(file_path)
if not recursive:
break
return file_list
def query_json(json_content, query, delimiter='.'):
""" Do an xpath-like query with json_content.
@param (dict/list/string) json_content
@@ -213,6 +86,205 @@ def query_json(json_content, query, delimiter='.'):
return json_content
def substitute_variables_with_mapping(content, mapping):
""" substitute variables in content with mapping
e.g.
@params
content = {
'request': {
'url': '/api/users/$uid',
'headers': {'token': '$token'}
}
}
mapping = {"$uid": 1000}
@return
{
'request': {
'url': '/api/users/1000',
'headers': {'token': '$token'}
}
}
"""
# TODO: refactor type check
if isinstance(content, bool):
return content
if isinstance(content, (numeric_types, type)):
return content
if not content:
return content
if isinstance(content, (list, set, tuple)):
return [
substitute_variables_with_mapping(item, mapping)
for item in content
]
if isinstance(content, dict):
substituted_data = {}
for key, value in content.items():
eval_key = substitute_variables_with_mapping(key, mapping)
eval_value = substitute_variables_with_mapping(value, mapping)
substituted_data[eval_key] = eval_value
return substituted_data
# content is in string format here
for var, value in mapping.items():
if content == var:
# content is a variable
content = value
else:
if not isinstance(value, str):
value = builtin_str(value)
content = content.replace(var, value)
return content
def _get_validators_mapping(validators):
""" get validators mapping from api or test validators
@param (list) validators:
[
{"check": "v1", "expect": 201, "comparator": "eq"},
{"check": {"b": 1}, "expect": 200, "comparator": "eq"}
]
@return
{
("v1", "eq"): {"check": "v1", "expect": 201, "comparator": "eq"},
('{"b": 1}', "eq"): {"check": {"b": 1}, "expect": 200, "comparator": "eq"}
}
"""
validators_mapping = {}
for validator in validators:
validator = parser.parse_validator(validator)
if not isinstance(validator["check"], collections.Hashable):
check = json.dumps(validator["check"])
else:
check = validator["check"]
key = (check, validator["comparator"])
validators_mapping[key] = validator
return validators_mapping
def _merge_validator(def_validators, current_validators):
""" merge def_validators with current_validators
@params:
def_validators: [{'eq': ['v1', 200]}, {"check": "s2", "expect": 16, "comparator": "len_eq"}]
current_validators: [{"check": "v1", "expect": 201}, {'len_eq': ['s3', 12]}]
@return:
[
{"check": "v1", "expect": 201, "comparator": "eq"},
{"check": "s2", "expect": 16, "comparator": "len_eq"},
{"check": "s3", "expect": 12, "comparator": "len_eq"}
]
"""
if not def_validators:
return current_validators
elif not current_validators:
return def_validators
else:
api_validators_mapping = _get_validators_mapping(def_validators)
test_validators_mapping = _get_validators_mapping(current_validators)
api_validators_mapping.update(test_validators_mapping)
return list(api_validators_mapping.values())
def _merge_extractor(def_extrators, current_extractors):
""" merge def_extrators with current_extractors
@params:
def_extrators: [{"var1": "val1"}, {"var2": "val2"}]
current_extractors: [{"var1": "val111"}, {"var3": "val3"}]
@return:
[
{"var1": "val111"},
{"var2": "val2"},
{"var3": "val3"}
]
"""
if not def_extrators:
return current_extractors
elif not current_extractors:
return def_extrators
else:
extractor_dict = OrderedDict()
for api_extrator in def_extrators:
if len(api_extrator) != 1:
logger.log_warning("incorrect extractor: {}".format(api_extrator))
continue
var_name = list(api_extrator.keys())[0]
extractor_dict[var_name] = api_extrator[var_name]
for test_extrator in current_extractors:
if len(test_extrator) != 1:
logger.log_warning("incorrect extractor: {}".format(test_extrator))
continue
var_name = list(test_extrator.keys())[0]
extractor_dict[var_name] = test_extrator[var_name]
extractor_list = []
for key, value in extractor_dict.items():
extractor_list.append({key: value})
return extractor_list
def _override_block(def_block, current_block):
""" override def_block with current_block
@param def_block:
{
"name": "get token",
"request": {...},
"validate": [{'eq': ['status_code', 200]}]
}
@param current_block:
{
"name": "get token",
"extract": [{"token": "content.token"}],
"validate": [{'eq': ['status_code', 201]}, {'len_eq': ['content.token', 16]}]
}
@return
{
"name": "get token",
"request": {...},
"extract": [{"token": "content.token"}],
"validate": [{'eq': ['status_code', 201]}, {'len_eq': ['content.token', 16]}]
}
"""
def_validators = def_block.get("validate") or def_block.get("validators", [])
current_validators = current_block.get("validate") or current_block.get("validators", [])
def_extrators = def_block.get("extract") \
or def_block.get("extractors") \
or def_block.get("extract_binds", [])
current_extractors = current_block.get("extract") \
or current_block.get("extractors") \
or current_block.get("extract_binds", [])
current_block.update(def_block)
current_block["validate"] = _merge_validator(
def_validators,
current_validators
)
current_block["extract"] = _merge_extractor(
def_extrators,
current_extractors
)
def get_uniform_comparator(comparator):
""" convert comparator alias to uniform name
"""
@@ -504,25 +576,6 @@ def create_scaffold(project_path):
logger.color_print(msg, "BLUE")
def load_dot_env_file(path):
""" load .env file and set to os.environ
"""
if not path:
path = os.path.join(os.getcwd(), ".env")
if not os.path.isfile(path):
logger.log_debug(".env file not exist: {}".format(path))
return
else:
if not os.path.isfile(path):
raise exceptions.FileNotFound("env file not exist: {}".format(path))
logger.log_info("Loading environment variables from {}".format(path))
with io.open(path, 'r', encoding='utf-8') as fp:
for line in fp:
variable, value = line.split("=")
variable = variable.strip()
os.environ[variable] = value.strip()
logger.log_debug("Loaded variable: {}".format(variable))
def validate_json_file(file_list):
""" validate JSON testset format

View File

@@ -2,9 +2,9 @@ import os
import time
import requests
from httprunner import exceptions, response, runner, testcase
from httprunner import exceptions, loader, response, runner, testcase
from httprunner.context import Context
from httprunner.utils import FileUtils, gen_md5
from httprunner.utils import gen_md5
from tests.base import ApiServerUnittest
@@ -13,7 +13,7 @@ class VariableBindsUnittest(ApiServerUnittest):
def setUp(self):
self.context = Context()
testcase_file_path = os.path.join(os.getcwd(), 'tests/data/demo_binds.yml')
self.testcases = FileUtils.load_file(testcase_file_path)
self.testcases = loader.load_file(testcase_file_path)
def test_context_init_functions(self):
self.assertIn("get_timestamp", self.context.testset_functions_config)

View File

@@ -2,7 +2,6 @@ import os
import shutil
from httprunner import HttpRunner
from httprunner.exceptions import FileNotFound
from tests.base import HTTPBIN_SERVER, ApiServerUnittest
@@ -155,13 +154,3 @@ class TestHttpRunner(ApiServerUnittest):
summary = runner.summary
self.assertTrue(summary["success"])
self.assertEqual(summary["stat"]["testsRun"], 8)
def test_load_env_path(self):
self.assertNotIn("PROJECT_KEY", os.environ)
HttpRunner(dot_env_path="tests/data/test.env").run(self.testset_path)
self.assertIn("PROJECT_KEY", os.environ)
self.assertEqual(os.environ["UserName"], "debugtalk")
def test_load_env_path_not_exist(self):
with self.assertRaises(FileNotFound):
HttpRunner(dot_env_path="not_exist.env").run(self.testset_path)

326
tests/test_loader.py Normal file
View File

@@ -0,0 +1,326 @@
import os
import unittest
from httprunner import exceptions, loader, utils
class TestFileLoader(unittest.TestCase):
def test_load_yaml_file_file_format_error(self):
yaml_tmp_file = "tests/data/tmp.yml"
# create empty yaml file
with open(yaml_tmp_file, 'w') as f:
f.write("")
with self.assertRaises(exceptions.FileFormatError):
loader.load_yaml_file(yaml_tmp_file)
os.remove(yaml_tmp_file)
# create invalid format yaml file
with open(yaml_tmp_file, 'w') as f:
f.write("abc")
with self.assertRaises(exceptions.FileFormatError):
loader.load_yaml_file(yaml_tmp_file)
os.remove(yaml_tmp_file)
def test_load_json_file_file_format_error(self):
json_tmp_file = "tests/data/tmp.json"
# create empty file
with open(json_tmp_file, 'w') as f:
f.write("")
with self.assertRaises(exceptions.FileFormatError):
loader.load_json_file(json_tmp_file)
os.remove(json_tmp_file)
# create empty json file
with open(json_tmp_file, 'w') as f:
f.write("{}")
with self.assertRaises(exceptions.FileFormatError):
loader.load_json_file(json_tmp_file)
os.remove(json_tmp_file)
# create invalid format json file
with open(json_tmp_file, 'w') as f:
f.write("abc")
with self.assertRaises(exceptions.FileFormatError):
loader.load_json_file(json_tmp_file)
os.remove(json_tmp_file)
def test_load_testcases_bad_filepath(self):
testcase_file_path = os.path.join(os.getcwd(), 'tests/data/demo')
with self.assertRaises(exceptions.FileNotFound):
loader.load_file(testcase_file_path)
def test_load_json_testcases(self):
testcase_file_path = os.path.join(
os.getcwd(), 'tests/data/demo_testset_hardcode.json')
testcases = loader.load_file(testcase_file_path)
self.assertEqual(len(testcases), 3)
test = testcases[0]["test"]
self.assertIn('name', test)
self.assertIn('request', test)
self.assertIn('url', test['request'])
self.assertIn('method', test['request'])
def test_load_yaml_testcases(self):
testcase_file_path = os.path.join(
os.getcwd(), 'tests/data/demo_testset_hardcode.yml')
testcases = loader.load_file(testcase_file_path)
self.assertEqual(len(testcases), 3)
test = testcases[0]["test"]
self.assertIn('name', test)
self.assertIn('request', test)
self.assertIn('url', test['request'])
self.assertIn('method', test['request'])
def test_load_csv_file_one_parameter(self):
csv_file_path = os.path.join(
os.getcwd(), 'tests/data/user_agent.csv')
csv_content = loader.load_file(csv_file_path)
self.assertEqual(
csv_content,
[
{'user_agent': 'iOS/10.1'},
{'user_agent': 'iOS/10.2'},
{'user_agent': 'iOS/10.3'}
]
)
def test_load_csv_file_multiple_parameters(self):
csv_file_path = os.path.join(
os.getcwd(), 'tests/data/account.csv')
csv_content = loader.load_file(csv_file_path)
self.assertEqual(
csv_content,
[
{'username': 'test1', 'password': '111111'},
{'username': 'test2', 'password': '222222'},
{'username': 'test3', 'password': '333333'}
]
)
def test_load_folder_files(self):
folder = os.path.join(os.getcwd(), 'tests')
file1 = os.path.join(os.getcwd(), 'tests', 'test_utils.py')
file2 = os.path.join(os.getcwd(), 'tests', 'data', 'demo_binds.yml')
files = loader.load_folder_files(folder, recursive=False)
self.assertNotIn(file2, files)
files = loader.load_folder_files(folder)
self.assertIn(file2, files)
self.assertNotIn(file1, files)
files = loader.load_folder_files(folder)
api_file = os.path.join(os.getcwd(), 'tests', 'api', 'basic.yml')
self.assertIn(api_file, files)
files = loader.load_folder_files("not_existed_foulder", recursive=False)
self.assertEqual([], files)
files = loader.load_folder_files(file2, recursive=False)
self.assertEqual([], files)
def test_load_dot_env_file(self):
self.assertNotIn("PROJECT_KEY", os.environ)
loader.load_dot_env_file("tests/data/test.env")
self.assertIn("PROJECT_KEY", os.environ)
self.assertEqual(os.environ["UserName"], "debugtalk")
def test_load_env_path_not_exist(self):
with self.assertRaises(exceptions.FileNotFound):
loader.load_dot_env_file("not_exist.env")
class TestSuiteLoader(unittest.TestCase):
def setUp(self):
loader.overall_def_dict = {
"api": {},
"suite": {}
}
def test_load_test_dependencies(self):
loader.load_test_dependencies()
overall_def_dict = loader.overall_def_dict
self.assertIn("get_token", overall_def_dict["api"])
self.assertIn("create_and_check", overall_def_dict["suite"])
def test_load_api_file(self):
loader.load_api_file("tests/api/basic.yml")
overall_api_def_dict = loader.overall_def_dict["api"]
self.assertIn("get_token",overall_api_def_dict)
self.assertEqual("/api/get-token", overall_api_def_dict["get_token"]["request"]["url"])
self.assertIn("$user_agent", overall_api_def_dict["get_token"]["function_meta"]["args"])
self.assertEqual(len(overall_api_def_dict["get_token"]["validate"]), 3)
def test_load_test_file_suite(self):
loader.load_api_file("tests/api/basic.yml")
testset = loader.load_test_file("tests/suite/create_and_get.yml")
self.assertEqual(testset["config"]["name"], "create user and check result.")
self.assertEqual(len(testset["testcases"]), 3)
self.assertEqual(testset["testcases"][0]["name"], "make sure user $uid does not exist")
self.assertEqual(testset["testcases"][0]["request"]["url"], "/api/users/$uid")
def test_load_test_file_testcase(self):
loader.load_test_dependencies()
testset = loader.load_test_file("tests/testcases/smoketest.yml")
self.assertEqual(testset["config"]["name"], "smoketest")
self.assertEqual(testset["config"]["path"], "tests/testcases/smoketest.yml")
self.assertIn("device_sn", testset["config"]["variables"][0])
self.assertEqual(len(testset["testcases"]), 8)
self.assertEqual(testset["testcases"][0]["name"], "get token")
def test_get_block_by_name(self):
loader.load_test_dependencies()
ref_call = "get_user($uid, $token)"
block = loader._get_block_by_name(ref_call, "api")
self.assertEqual(block["request"]["url"], "/api/users/$uid")
self.assertEqual(block["function_meta"]["func_name"], "get_user")
self.assertEqual(block["function_meta"]["args"], ['$uid', '$token'])
def test_get_block_by_name_args_mismatch(self):
loader.load_test_dependencies()
ref_call = "get_user($uid, $token, $var)"
with self.assertRaises(exceptions.ParamsError):
loader._get_block_by_name(ref_call, "api")
def test_override_block(self):
loader.load_test_dependencies()
def_block = loader._get_block_by_name("get_token($user_agent, $device_sn, $os_platform, $app_version)", "api")
test_block = {
"name": "override block",
"variables": [
{"var": 123}
],
'request': {
'url': '/api/get-token', 'method': 'POST', 'headers': {'user_agent': '$user_agent', 'device_sn': '$device_sn', 'os_platform': '$os_platform', 'app_version': '$app_version'}, 'json': {'sign': '${get_sign($user_agent, $device_sn, $os_platform, $app_version)}'}},
'validate': [
{'eq': ['status_code', 201]},
{'len_eq': ['content.token', 32]}
]
}
utils._override_block(def_block, test_block)
self.assertEqual(test_block["name"], "override block")
self.assertIn({'check': 'status_code', 'expect': 201, 'comparator': 'eq'}, test_block["validate"])
self.assertIn({'check': 'content.token', 'comparator': 'len_eq', 'expect': 32}, test_block["validate"])
def test_get_test_definition_api(self):
loader.load_test_dependencies()
api_def = loader._get_test_definition("get_headers", "api")
self.assertEqual(api_def["request"]["url"], "/headers")
self.assertEqual(len(api_def["setup_hooks"]), 2)
self.assertEqual(len(api_def["teardown_hooks"]), 1)
with self.assertRaises(exceptions.ApiNotFound):
loader._get_test_definition("get_token_XXX", "api")
def test_get_test_definition_suite(self):
loader.load_test_dependencies()
api_def = loader._get_test_definition("create_and_check", "suite")
self.assertEqual(api_def["config"]["name"], "create user and check result.")
with self.assertRaises(exceptions.SuiteNotFound):
loader._get_test_definition("create_and_check_XXX", "suite")
def test_load_testcases_by_path_files(self):
testsets_list = []
# absolute file path
path = os.path.join(
os.getcwd(), 'tests/data/demo_testset_hardcode.json')
testset_list = loader.load_testcases(path)
self.assertEqual(len(testset_list), 1)
self.assertIn("path", testset_list[0]["config"])
self.assertEqual(testset_list[0]["config"]["path"], path)
self.assertEqual(len(testset_list[0]["testcases"]), 3)
testsets_list.extend(testset_list)
# relative file path
path = 'tests/data/demo_testset_hardcode.yml'
testset_list = loader.load_testcases(path)
self.assertEqual(len(testset_list), 1)
self.assertIn("path", testset_list[0]["config"])
self.assertIn(path, testset_list[0]["config"]["path"])
self.assertEqual(len(testset_list[0]["testcases"]), 3)
testsets_list.extend(testset_list)
# list/set container with file(s)
path = [
os.path.join(os.getcwd(), 'tests/data/demo_testset_hardcode.json'),
'tests/data/demo_testset_hardcode.yml'
]
testset_list = loader.load_testcases(path)
self.assertEqual(len(testset_list), 2)
self.assertEqual(len(testset_list[0]["testcases"]), 3)
self.assertEqual(len(testset_list[1]["testcases"]), 3)
testsets_list.extend(testset_list)
self.assertEqual(len(testsets_list), 4)
for testset in testsets_list:
for test in testset["testcases"]:
self.assertIn('name', test)
self.assertIn('request', test)
self.assertIn('url', test['request'])
self.assertIn('method', test['request'])
def test_load_testcases_by_path_folder(self):
loader.load_test_dependencies()
# absolute folder path
path = os.path.join(os.getcwd(), 'tests/data')
testset_list_1 = loader.load_testcases(path)
self.assertGreater(len(testset_list_1), 4)
# relative folder path
path = 'tests/data/'
testset_list_2 = loader.load_testcases(path)
self.assertEqual(len(testset_list_1), len(testset_list_2))
# list/set container with file(s)
path = [
os.path.join(os.getcwd(), 'tests/data'),
'tests/data/'
]
testset_list_3 = loader.load_testcases(path)
self.assertEqual(len(testset_list_3), 2 * len(testset_list_1))
def test_load_testcases_by_path_not_exist(self):
# absolute folder path
path = os.path.join(os.getcwd(), 'tests/data_not_exist')
with self.assertRaises(exceptions.FileNotFound):
loader.load_testcases(path)
# relative folder path
path = 'tests/data_not_exist'
with self.assertRaises(exceptions.FileNotFound):
loader.load_testcases(path)
# list/set container with file(s)
path = [
os.path.join(os.getcwd(), 'tests/data_not_exist'),
'tests/data_not_exist/'
]
with self.assertRaises(exceptions.FileNotFound):
loader.load_testcases(path)
def test_load_testcases_by_path_layered(self):
loader.load_test_dependencies()
path = os.path.join(
os.getcwd(), 'tests/data/demo_testset_layer.yml')
testsets_list = loader.load_testcases(path)
self.assertIn("variables", testsets_list[0]["config"])
self.assertIn("request", testsets_list[0]["config"])
self.assertIn("request", testsets_list[0]["testcases"][0])
self.assertIn("url", testsets_list[0]["testcases"][0]["request"])
self.assertIn("validate", testsets_list[0]["testcases"][0])

114
tests/test_parser.py Normal file
View File

@@ -0,0 +1,114 @@
import os
import unittest
from httprunner import parser, exceptions
class TestParser(unittest.TestCase):
def test_parse_string_value(self):
self.assertEqual(parser.parse_string_value("123"), 123)
self.assertEqual(parser.parse_string_value("12.3"), 12.3)
self.assertEqual(parser.parse_string_value("a123"), "a123")
self.assertEqual(parser.parse_string_value("$var"), "$var")
self.assertEqual(parser.parse_string_value("${func}"), "${func}")
def test_extract_variables(self):
self.assertEqual(
parser.extract_variables("$var"),
["var"]
)
self.assertEqual(
parser.extract_variables("$var123"),
["var123"]
)
self.assertEqual(
parser.extract_variables("$var_name"),
["var_name"]
)
self.assertEqual(
parser.extract_variables("var"),
[]
)
self.assertEqual(
parser.extract_variables("a$var"),
["var"]
)
self.assertEqual(
parser.extract_variables("$v ar"),
["v"]
)
self.assertEqual(
parser.extract_variables(" "),
[]
)
self.assertEqual(
parser.extract_variables("$abc*"),
["abc"]
)
self.assertEqual(
parser.extract_variables("${func()}"),
[]
)
self.assertEqual(
parser.extract_variables("${func(1,2)}"),
[]
)
self.assertEqual(
parser.extract_variables("${gen_md5($TOKEN, $data, $random)}"),
["TOKEN", "data", "random"]
)
def test_parse_function(self):
self.assertEqual(
parser.parse_function("func()"),
{'func_name': 'func', 'args': [], 'kwargs': {}}
)
self.assertEqual(
parser.parse_function("func(5)"),
{'func_name': 'func', 'args': [5], 'kwargs': {}}
)
self.assertEqual(
parser.parse_function("func(1, 2)"),
{'func_name': 'func', 'args': [1, 2], 'kwargs': {}}
)
self.assertEqual(
parser.parse_function("func(a=1, b=2)"),
{'func_name': 'func', 'args': [], 'kwargs': {'a': 1, 'b': 2}}
)
self.assertEqual(
parser.parse_function("func(a= 1, b =2)"),
{'func_name': 'func', 'args': [], 'kwargs': {'a': 1, 'b': 2}}
)
self.assertEqual(
parser.parse_function("func(1, 2, a=3, b=4)"),
{'func_name': 'func', 'args': [1, 2], 'kwargs': {'a': 3, 'b': 4}}
)
self.assertEqual(
parser.parse_function("func($request, 123)"),
{'func_name': 'func', 'args': ["$request", 123], 'kwargs': {}}
)
self.assertEqual(
parser.parse_function("func( )"),
{'func_name': 'func', 'args': [], 'kwargs': {}}
)
self.assertEqual(
parser.parse_function("func(hello world, a=3, b=4)"),
{'func_name': 'func', 'args': ["hello world"], 'kwargs': {'a': 3, 'b': 4}}
)
self.assertEqual(
parser.parse_function("func($request, 12 3)"),
{'func_name': 'func', 'args': ["$request", '12 3'], 'kwargs': {}}
)
def test_parse_validator(self):
validator = {"check": "status_code", "comparator": "eq", "expect": 201}
self.assertEqual(
parser.parse_validator(validator),
{"check": "status_code", "comparator": "eq", "expect": 201}
)
validator = {'eq': ['status_code', 201]}
self.assertEqual(
parser.parse_validator(validator),
{"check": "status_code", "comparator": "eq", "expect": 201}
)

View File

@@ -1,9 +1,8 @@
import os
import time
from httprunner import HttpRunner, exceptions, runner
from httprunner.testcase import TestcaseLoader
from httprunner.utils import FileUtils, deep_update_dict
from httprunner import HttpRunner, exceptions, loader, runner
from httprunner.utils import deep_update_dict
from tests.base import HTTPBIN_SERVER, ApiServerUnittest
@@ -27,7 +26,7 @@ class TestRunner(ApiServerUnittest):
def test_run_single_testcase(self):
for testcase_file_path in self.testcase_file_path_list:
testcases = FileUtils.load_file(testcase_file_path)
testcases = loader.load_file(testcase_file_path)
config_dict = {
"path": testcase_file_path
@@ -188,6 +187,7 @@ class TestRunner(ApiServerUnittest):
{"eq": ["content.headers.Host", "127.0.0.1:8888"]},
{"eq": ["text.headers.Host", "127.0.0.1:8888"]},
{"eq": ["new_attribute", "new_attribute_value"]},
{"eq": ["new_attribute_dict", {"key": 123}]},
{"eq": ["new_attribute_dict.key", 123]}
]
}
@@ -380,7 +380,7 @@ class TestRunner(ApiServerUnittest):
def test_run_testcase_with_empty_header(self):
testcase_file_path = os.path.join(
os.getcwd(), 'tests/data/test_bugfix.yml')
testsets = TestcaseLoader.load_testsets_by_path(testcase_file_path)
testsets = loader.load_testcases(testcase_file_path)
testset = testsets[0]
config_dict_headers = testset["config"]["request"]["headers"]
test_dict_headers = testset["testcases"][0]["request"]["headers"]
@@ -393,7 +393,7 @@ class TestRunner(ApiServerUnittest):
def test_bugfix_type_match(self):
testcase_file_path = os.path.join(
os.getcwd(), 'tests/data/test_bugfix.yml')
testcases = FileUtils.load_file(testcase_file_path)
testcases = loader.load_file(testcase_file_path)
config_dict = {
"path": testcase_file_path
}

View File

@@ -1,7 +1,6 @@
import os
from httprunner import task
from httprunner.testcase import TestcaseLoader
from httprunner import loader, task
from tests.base import ApiServerUnittest
@@ -17,7 +16,7 @@ class TestTask(ApiServerUnittest):
def test_create_suite(self):
testcase_file_path = os.path.join(os.getcwd(), 'tests/data/demo_testset_variables.yml')
testset = TestcaseLoader.load_test_file(testcase_file_path)
testset = loader.load_test_file(testcase_file_path)
suite = task.TestSuite(testset)
self.assertEqual(suite.countTestCases(), 3)
for testcase in suite:

View File

@@ -2,193 +2,7 @@ import os
import time
import unittest
from httprunner import testcase
from httprunner.exceptions import ApiNotFound, ParamsError, SuiteNotFound
from httprunner.testcase import TestcaseLoader
class TestTestcaseLoader(unittest.TestCase):
def setUp(self):
TestcaseLoader.overall_def_dict = {
"api": {},
"suite": {}
}
def test_load_test_dependencies(self):
TestcaseLoader.load_test_dependencies()
overall_def_dict = TestcaseLoader.overall_def_dict
self.assertIn("get_token", overall_def_dict["api"])
self.assertIn("create_and_check", overall_def_dict["suite"])
def test_load_api_file(self):
TestcaseLoader.load_api_file("tests/api/basic.yml")
overall_api_def_dict = TestcaseLoader.overall_def_dict["api"]
self.assertIn("get_token",overall_api_def_dict)
self.assertEqual("/api/get-token", overall_api_def_dict["get_token"]["request"]["url"])
self.assertIn("$user_agent", overall_api_def_dict["get_token"]["function_meta"]["args"])
self.assertEqual(len(overall_api_def_dict["get_token"]["validate"]), 3)
def test_load_test_file_suite(self):
TestcaseLoader.load_api_file("tests/api/basic.yml")
testset = TestcaseLoader.load_test_file("tests/suite/create_and_get.yml")
self.assertEqual(testset["config"]["name"], "create user and check result.")
self.assertEqual(len(testset["testcases"]), 3)
self.assertEqual(testset["testcases"][0]["name"], "make sure user $uid does not exist")
self.assertEqual(testset["testcases"][0]["request"]["url"], "/api/users/$uid")
def test_load_test_file_testcase(self):
TestcaseLoader.load_test_dependencies()
testset = TestcaseLoader.load_test_file("tests/testcases/smoketest.yml")
self.assertEqual(testset["config"]["name"], "smoketest")
self.assertEqual(testset["config"]["path"], "tests/testcases/smoketest.yml")
self.assertIn("device_sn", testset["config"]["variables"][0])
self.assertEqual(len(testset["testcases"]), 8)
self.assertEqual(testset["testcases"][0]["name"], "get token")
def test_get_block_by_name(self):
TestcaseLoader.load_test_dependencies()
ref_call = "get_user($uid, $token)"
block = TestcaseLoader._get_block_by_name(ref_call, "api")
self.assertEqual(block["request"]["url"], "/api/users/$uid")
self.assertEqual(block["function_meta"]["func_name"], "get_user")
self.assertEqual(block["function_meta"]["args"], ['$uid', '$token'])
def test_get_block_by_name_args_mismatch(self):
TestcaseLoader.load_test_dependencies()
ref_call = "get_user($uid, $token, $var)"
with self.assertRaises(ParamsError):
TestcaseLoader._get_block_by_name(ref_call, "api")
def test_get_test_definition_api(self):
TestcaseLoader.load_test_dependencies()
api_def = TestcaseLoader._get_test_definition("get_headers", "api")
self.assertEqual(api_def["request"]["url"], "/headers")
self.assertEqual(len(api_def["setup_hooks"]), 2)
self.assertEqual(len(api_def["teardown_hooks"]), 1)
with self.assertRaises(ApiNotFound):
TestcaseLoader._get_test_definition("get_token_XXX", "api")
def test_get_test_definition_suite(self):
TestcaseLoader.load_test_dependencies()
api_def = TestcaseLoader._get_test_definition("create_and_check", "suite")
self.assertEqual(api_def["config"]["name"], "create user and check result.")
with self.assertRaises(SuiteNotFound):
TestcaseLoader._get_test_definition("create_and_check_XXX", "suite")
def test_override_block(self):
TestcaseLoader.load_test_dependencies()
def_block = TestcaseLoader._get_block_by_name("get_token($user_agent, $device_sn, $os_platform, $app_version)", "api")
test_block = {
"name": "override block",
"variables": [
{"var": 123}
],
'request': {
'url': '/api/get-token', 'method': 'POST', 'headers': {'user_agent': '$user_agent', 'device_sn': '$device_sn', 'os_platform': '$os_platform', 'app_version': '$app_version'}, 'json': {'sign': '${get_sign($user_agent, $device_sn, $os_platform, $app_version)}'}},
'validate': [
{'eq': ['status_code', 201]},
{'len_eq': ['content.token', 32]}
]
}
TestcaseLoader._override_block(def_block, test_block)
self.assertEqual(test_block["name"], "override block")
self.assertIn({'check': 'status_code', 'expect': 201, 'comparator': 'eq'}, test_block["validate"])
self.assertIn({'check': 'content.token', 'comparator': 'len_eq', 'expect': 32}, test_block["validate"])
def test_load_testcases_by_path_files(self):
testsets_list = []
# absolute file path
path = os.path.join(
os.getcwd(), 'tests/data/demo_testset_hardcode.json')
testset_list = TestcaseLoader.load_testsets_by_path(path)
self.assertEqual(len(testset_list), 1)
self.assertIn("path", testset_list[0]["config"])
self.assertEqual(testset_list[0]["config"]["path"], path)
self.assertEqual(len(testset_list[0]["testcases"]), 3)
testsets_list.extend(testset_list)
# relative file path
path = 'tests/data/demo_testset_hardcode.yml'
testset_list = TestcaseLoader.load_testsets_by_path(path)
self.assertEqual(len(testset_list), 1)
self.assertIn("path", testset_list[0]["config"])
self.assertIn(path, testset_list[0]["config"]["path"])
self.assertEqual(len(testset_list[0]["testcases"]), 3)
testsets_list.extend(testset_list)
# list/set container with file(s)
path = [
os.path.join(os.getcwd(), 'tests/data/demo_testset_hardcode.json'),
'tests/data/demo_testset_hardcode.yml'
]
testset_list = TestcaseLoader.load_testsets_by_path(path)
self.assertEqual(len(testset_list), 2)
self.assertEqual(len(testset_list[0]["testcases"]), 3)
self.assertEqual(len(testset_list[1]["testcases"]), 3)
testsets_list.extend(testset_list)
self.assertEqual(len(testsets_list), 4)
for testset in testsets_list:
for test in testset["testcases"]:
self.assertIn('name', test)
self.assertIn('request', test)
self.assertIn('url', test['request'])
self.assertIn('method', test['request'])
def test_load_testcases_by_path_folder(self):
TestcaseLoader.load_test_dependencies()
# absolute folder path
path = os.path.join(os.getcwd(), 'tests/data')
testset_list_1 = TestcaseLoader.load_testsets_by_path(path)
self.assertGreater(len(testset_list_1), 4)
# relative folder path
path = 'tests/data/'
testset_list_2 = TestcaseLoader.load_testsets_by_path(path)
self.assertEqual(len(testset_list_1), len(testset_list_2))
# list/set container with file(s)
path = [
os.path.join(os.getcwd(), 'tests/data'),
'tests/data/'
]
testset_list_3 = TestcaseLoader.load_testsets_by_path(path)
self.assertEqual(len(testset_list_3), 2 * len(testset_list_1))
def test_load_testcases_by_path_not_exist(self):
# absolute folder path
path = os.path.join(os.getcwd(), 'tests/data_not_exist')
testset_list_1 = TestcaseLoader.load_testsets_by_path(path)
self.assertEqual(testset_list_1, [])
# relative folder path
path = 'tests/data_not_exist'
testset_list_2 = TestcaseLoader.load_testsets_by_path(path)
self.assertEqual(testset_list_2, [])
# list/set container with file(s)
path = [
os.path.join(os.getcwd(), 'tests/data_not_exist'),
'tests/data_not_exist/'
]
testset_list_3 = TestcaseLoader.load_testsets_by_path(path)
self.assertEqual(testset_list_3, [])
def test_load_testcases_by_path_layered(self):
TestcaseLoader.load_test_dependencies()
path = os.path.join(
os.getcwd(), 'tests/data/demo_testset_layer.yml')
testsets_list = TestcaseLoader.load_testsets_by_path(path)
self.assertIn("variables", testsets_list[0]["config"])
self.assertIn("request", testsets_list[0]["config"])
self.assertIn("request", testsets_list[0]["testcases"][0])
self.assertIn("url", testsets_list[0]["testcases"][0]["request"])
self.assertIn("validate", testsets_list[0]["testcases"][0])
from httprunner import exceptions, loader, testcase
class TestcaseParserUnittest(unittest.TestCase):
@@ -306,51 +120,6 @@ class TestcaseParserUnittest(unittest.TestCase):
3 * 2 * 3
)
def test_extract_variables(self):
self.assertEqual(
testcase.extract_variables("$var"),
["var"]
)
self.assertEqual(
testcase.extract_variables("$var123"),
["var123"]
)
self.assertEqual(
testcase.extract_variables("$var_name"),
["var_name"]
)
self.assertEqual(
testcase.extract_variables("var"),
[]
)
self.assertEqual(
testcase.extract_variables("a$var"),
["var"]
)
self.assertEqual(
testcase.extract_variables("$v ar"),
["v"]
)
self.assertEqual(
testcase.extract_variables(" "),
[]
)
self.assertEqual(
testcase.extract_variables("$abc*"),
["abc"]
)
self.assertEqual(
testcase.extract_variables("${func()}"),
[]
)
self.assertEqual(
testcase.extract_variables("${func(1,2)}"),
[]
)
self.assertEqual(
testcase.extract_variables("${gen_md5($TOKEN, $data, $random)}"),
["TOKEN", "data", "random"]
)
def test_eval_content_variables(self):
variables = {
@@ -414,62 +183,13 @@ class TestcaseParserUnittest(unittest.TestCase):
def test_eval_content_variables_search_upward(self):
testcase_parser = testcase.TestcaseParser()
with self.assertRaises(ParamsError):
with self.assertRaises(exceptions.ParamsError):
testcase_parser._eval_content_variables("/api/$SECRET_KEY")
testcase_parser.file_path = "tests/data/demo_testset_hardcode.yml"
content = testcase_parser._eval_content_variables("/api/$SECRET_KEY")
self.assertEqual(content, "/api/DebugTalk")
def test_parse_string_value(self):
self.assertEqual(testcase.parse_string_value("123"), 123)
self.assertEqual(testcase.parse_string_value("12.3"), 12.3)
self.assertEqual(testcase.parse_string_value("a123"), "a123")
self.assertEqual(testcase.parse_string_value("$var"), "$var")
self.assertEqual(testcase.parse_string_value("${func}"), "${func}")
def test_parse_function(self):
self.assertEqual(
testcase.parse_function("func()"),
{'func_name': 'func', 'args': [], 'kwargs': {}}
)
self.assertEqual(
testcase.parse_function("func(5)"),
{'func_name': 'func', 'args': [5], 'kwargs': {}}
)
self.assertEqual(
testcase.parse_function("func(1, 2)"),
{'func_name': 'func', 'args': [1, 2], 'kwargs': {}}
)
self.assertEqual(
testcase.parse_function("func(a=1, b=2)"),
{'func_name': 'func', 'args': [], 'kwargs': {'a': 1, 'b': 2}}
)
self.assertEqual(
testcase.parse_function("func(a= 1, b =2)"),
{'func_name': 'func', 'args': [], 'kwargs': {'a': 1, 'b': 2}}
)
self.assertEqual(
testcase.parse_function("func(1, 2, a=3, b=4)"),
{'func_name': 'func', 'args': [1, 2], 'kwargs': {'a': 3, 'b': 4}}
)
self.assertEqual(
testcase.parse_function("func($request, 123)"),
{'func_name': 'func', 'args': ["$request", 123], 'kwargs': {}}
)
self.assertEqual(
testcase.parse_function("func( )"),
{'func_name': 'func', 'args': [], 'kwargs': {}}
)
self.assertEqual(
testcase.parse_function("func(hello world, a=3, b=4)"),
{'func_name': 'func', 'args': ["hello world"], 'kwargs': {'a': 3, 'b': 4}}
)
self.assertEqual(
testcase.parse_function("func($request, 12 3)"),
{'func_name': 'func', 'args': ["$request", '12 3'], 'kwargs': {}}
)
def test_parse_content_with_bindings_variables(self):
variables = {
@@ -486,7 +206,7 @@ class TestcaseParserUnittest(unittest.TestCase):
"123str_value1/456"
)
with self.assertRaises(ParamsError):
with self.assertRaises(exceptions.ParamsError):
testcase_parser.eval_content_with_bindings("$str_3")
self.assertEqual(
@@ -596,7 +316,7 @@ class TestcaseParserUnittest(unittest.TestCase):
def test_eval_content_functions_search_upward(self):
testcase_parser = testcase.TestcaseParser()
with self.assertRaises(ParamsError):
with self.assertRaises(exceptions.ParamsError):
testcase_parser._eval_content_functions("/api/${gen_md5(abc)}")
testcase_parser.file_path = "tests/data/demo_testset_hardcode.yml"
@@ -649,105 +369,6 @@ class TestcaseParserUnittest(unittest.TestCase):
3
)
def test_substitute_variables_with_mapping(self):
content = {
'request': {
'url': '/api/users/$uid',
'method': "$method",
'headers': {'token': '$token'},
'data': {
"null": None,
"true": True,
"false": False,
"empty_str": ""
}
}
}
mapping = {
"$uid": 1000,
"$method": "POST"
}
result = testcase.substitute_variables_with_mapping(content, mapping)
self.assertEqual("/api/users/1000", result["request"]["url"])
self.assertEqual("$token", result["request"]["headers"]["token"])
self.assertEqual("POST", result["request"]["method"])
self.assertIsNone(result["request"]["data"]["null"])
self.assertTrue(result["request"]["data"]["true"])
self.assertFalse(result["request"]["data"]["false"])
self.assertEqual("", result["request"]["data"]["empty_str"])
def test_parse_validator(self):
validator = {"check": "status_code", "comparator": "eq", "expect": 201}
self.assertEqual(
testcase.parse_validator(validator),
{"check": "status_code", "comparator": "eq", "expect": 201}
)
validator = {'eq': ['status_code', 201]}
self.assertEqual(
testcase.parse_validator(validator),
{"check": "status_code", "comparator": "eq", "expect": 201}
)
def test_merge_validator(self):
def_validators = [
{'eq': ['v1', 200]},
{"check": "s2", "expect": 16, "comparator": "len_eq"}
]
current_validators = [
{"check": "v1", "expect": 201},
{'len_eq': ['s3', 12]}
]
merged_validators = testcase._merge_validator(def_validators, current_validators)
self.assertIn(
{"check": "v1", "expect": 201, "comparator": "eq"},
merged_validators
)
self.assertIn(
{"check": "s2", "expect": 16, "comparator": "len_eq"},
merged_validators
)
self.assertIn(
{"check": "s3", "expect": 12, "comparator": "len_eq"},
merged_validators
)
def test_merge_validator_with_dict(self):
def_validators = [
{'eq': ["a", {"v": 1}]},
{'eq': [{"b": 1}, 200]}
]
current_validators = [
{'len_eq': ['s3', 12]},
{'eq': [{"b": 1}, 201]}
]
merged_validators = testcase._merge_validator(def_validators, current_validators)
self.assertEqual(len(merged_validators), 3)
self.assertIn({'check': {'b': 1}, 'expect': 201, 'comparator': 'eq'}, merged_validators)
self.assertNotIn({'check': {'b': 1}, 'expect': 200, 'comparator': 'eq'}, merged_validators)
def test_merge_extractor(self):
api_extrators = [{"var1": "val1"}, {"var2": "val2"}]
current_extractors = [{"var1": "val111"}, {"var3": "val3"}]
merged_extractors = testcase._merge_extractor(api_extrators, current_extractors)
self.assertIn(
{"var1": "val111"},
merged_extractors
)
self.assertIn(
{"var2": "val2"},
merged_extractors
)
self.assertIn(
{"var3": "val3"},
merged_extractors
)
def test_is_testsets(self):
data_structure = "path/to/file"
self.assertFalse(testcase.is_testsets(data_structure))

View File

@@ -4,138 +4,9 @@ import unittest
from httprunner import exceptions, utils
from httprunner.compat import OrderedDict
from httprunner.utils import FileUtils
from tests.base import ApiServerUnittest
class TestFileUtils(unittest.TestCase):
def test_load_yaml_file_file_format_error(self):
yaml_tmp_file = "tests/data/tmp.yml"
# create empty yaml file
with open(yaml_tmp_file, 'w') as f:
f.write("")
with self.assertRaises(exceptions.FileFormatError):
FileUtils._load_yaml_file(yaml_tmp_file)
os.remove(yaml_tmp_file)
# create invalid format yaml file
with open(yaml_tmp_file, 'w') as f:
f.write("abc")
with self.assertRaises(exceptions.FileFormatError):
FileUtils._load_yaml_file(yaml_tmp_file)
os.remove(yaml_tmp_file)
def test_load_json_file_file_format_error(self):
json_tmp_file = "tests/data/tmp.json"
# create empty file
with open(json_tmp_file, 'w') as f:
f.write("")
with self.assertRaises(exceptions.FileFormatError):
FileUtils._load_json_file(json_tmp_file)
os.remove(json_tmp_file)
# create empty json file
with open(json_tmp_file, 'w') as f:
f.write("{}")
with self.assertRaises(exceptions.FileFormatError):
FileUtils._load_json_file(json_tmp_file)
os.remove(json_tmp_file)
# create invalid format json file
with open(json_tmp_file, 'w') as f:
f.write("abc")
with self.assertRaises(exceptions.FileFormatError):
FileUtils._load_json_file(json_tmp_file)
os.remove(json_tmp_file)
def test_load_testcases_bad_filepath(self):
testcase_file_path = os.path.join(os.getcwd(), 'tests/data/demo')
with self.assertRaises(exceptions.FileNotFound):
FileUtils.load_file(testcase_file_path)
def test_load_json_testcases(self):
testcase_file_path = os.path.join(
os.getcwd(), 'tests/data/demo_testset_hardcode.json')
testcases = FileUtils.load_file(testcase_file_path)
self.assertEqual(len(testcases), 3)
test = testcases[0]["test"]
self.assertIn('name', test)
self.assertIn('request', test)
self.assertIn('url', test['request'])
self.assertIn('method', test['request'])
def test_load_yaml_testcases(self):
testcase_file_path = os.path.join(
os.getcwd(), 'tests/data/demo_testset_hardcode.yml')
testcases = FileUtils.load_file(testcase_file_path)
self.assertEqual(len(testcases), 3)
test = testcases[0]["test"]
self.assertIn('name', test)
self.assertIn('request', test)
self.assertIn('url', test['request'])
self.assertIn('method', test['request'])
def test_load_csv_file_one_parameter(self):
csv_file_path = os.path.join(
os.getcwd(), 'tests/data/user_agent.csv')
csv_content = FileUtils.load_file(csv_file_path)
self.assertEqual(
csv_content,
[
{'user_agent': 'iOS/10.1'},
{'user_agent': 'iOS/10.2'},
{'user_agent': 'iOS/10.3'}
]
)
def test_load_csv_file_multiple_parameters(self):
csv_file_path = os.path.join(
os.getcwd(), 'tests/data/account.csv')
csv_content = FileUtils.load_file(csv_file_path)
self.assertEqual(
csv_content,
[
{'username': 'test1', 'password': '111111'},
{'username': 'test2', 'password': '222222'},
{'username': 'test3', 'password': '333333'}
]
)
def test_load_folder_files(self):
folder = os.path.join(os.getcwd(), 'tests')
file1 = os.path.join(os.getcwd(), 'tests', 'test_utils.py')
file2 = os.path.join(os.getcwd(), 'tests', 'data', 'demo_binds.yml')
files = FileUtils.load_folder_files(folder, recursive=False)
self.assertNotIn(file2, files)
files = FileUtils.load_folder_files(folder)
self.assertIn(file2, files)
self.assertNotIn(file1, files)
files = FileUtils.load_folder_files(folder)
api_file = os.path.join(os.getcwd(), 'tests', 'api', 'basic.yml')
self.assertIn(api_file, files)
files = FileUtils.load_folder_files("not_existed_foulder", recursive=False)
self.assertEqual([], files)
files = FileUtils.load_folder_files(file2, recursive=False)
self.assertEqual([], files)
class TestUtils(ApiServerUnittest):
def test_remove_prefix(self):
@@ -190,6 +61,90 @@ class TestUtils(ApiServerUnittest):
result = utils.query_json(json_content, query)
self.assertEqual(result, "L")
def test_substitute_variables_with_mapping(self):
content = {
'request': {
'url': '/api/users/$uid',
'method': "$method",
'headers': {'token': '$token'},
'data': {
"null": None,
"true": True,
"false": False,
"empty_str": ""
}
}
}
mapping = {
"$uid": 1000,
"$method": "POST"
}
result = utils.substitute_variables_with_mapping(content, mapping)
self.assertEqual("/api/users/1000", result["request"]["url"])
self.assertEqual("$token", result["request"]["headers"]["token"])
self.assertEqual("POST", result["request"]["method"])
self.assertIsNone(result["request"]["data"]["null"])
self.assertTrue(result["request"]["data"]["true"])
self.assertFalse(result["request"]["data"]["false"])
self.assertEqual("", result["request"]["data"]["empty_str"])
def test_merge_validator(self):
def_validators = [
{'eq': ['v1', 200]},
{"check": "s2", "expect": 16, "comparator": "len_eq"}
]
current_validators = [
{"check": "v1", "expect": 201},
{'len_eq': ['s3', 12]}
]
merged_validators = utils._merge_validator(def_validators, current_validators)
self.assertIn(
{"check": "v1", "expect": 201, "comparator": "eq"},
merged_validators
)
self.assertIn(
{"check": "s2", "expect": 16, "comparator": "len_eq"},
merged_validators
)
self.assertIn(
{"check": "s3", "expect": 12, "comparator": "len_eq"},
merged_validators
)
def test_merge_validator_with_dict(self):
def_validators = [
{'eq': ["a", {"v": 1}]},
{'eq': [{"b": 1}, 200]}
]
current_validators = [
{'len_eq': ['s3', 12]},
{'eq': [{"b": 1}, 201]}
]
merged_validators = utils._merge_validator(def_validators, current_validators)
self.assertEqual(len(merged_validators), 3)
self.assertIn({'check': {'b': 1}, 'expect': 201, 'comparator': 'eq'}, merged_validators)
self.assertNotIn({'check': {'b': 1}, 'expect': 200, 'comparator': 'eq'}, merged_validators)
def test_merge_extractor(self):
api_extrators = [{"var1": "val1"}, {"var2": "val2"}]
current_extractors = [{"var1": "val111"}, {"var3": "val3"}]
merged_extractors = utils._merge_extractor(api_extrators, current_extractors)
self.assertIn(
{"var1": "val111"},
merged_extractors
)
self.assertIn(
{"var2": "val2"},
merged_extractors
)
self.assertIn(
{"var3": "val3"},
merged_extractors
)
def test_get_uniform_comparator(self):
self.assertEqual(utils.get_uniform_comparator("eq"), "equals")
self.assertEqual(utils.get_uniform_comparator("=="), "equals")