Merge branch 'master' into master

This commit is contained in:
debugtalk
2019-12-11 16:16:34 +08:00
committed by GitHub
35 changed files with 1677 additions and 1530 deletions

View File

@@ -1,5 +1,19 @@
# Release History
## 2.4.0 (2019-12-11)
**Added**
- feat: validate with python script, ref #773
**Changed**
- refactor: make loader as submodule, split to check/locate/load/buildup
- refactor: make built_in as submodule, split to comparators and functions
- refactor: adjust code for context and validator
- docs: update cli argument help
- refactor: adjust code formatting, remove unused imports
## 2.3.3 (2019-12-04)
**Fixed**

View File

@@ -1,4 +1,4 @@
__version__ = "2.3.3"
__version__ = "2.4.0"
__description__ = "One-stop solution for HTTP(S) testing."
__all__ = ["__version__", "__description__"]

View File

@@ -2,7 +2,7 @@ import os
import unittest
from httprunner import (__version__, exceptions, loader, logger, parser,
report, runner, utils, validator)
report, runner, utils)
class HttpRunner(object):
@@ -260,7 +260,7 @@ class HttpRunner(object):
"""
# load tests
self.exception_stage = "load tests"
tests_mapping = loader.load_tests(path, dot_env_path)
tests_mapping = loader.load_cases(path, dot_env_path)
if mapping:
tests_mapping["project_mapping"]["variables"] = mapping
@@ -280,9 +280,9 @@ class HttpRunner(object):
"""
logger.log_info("HttpRunner version: {}".format(__version__))
if validator.is_testcase_path(path_or_tests):
if loader.is_testcase_path(path_or_tests):
return self.run_path(path_or_tests, dot_env_path, mapping)
elif validator.is_testcases(path_or_tests):
elif loader.is_testcases(path_or_tests):
return self.run_tests(path_or_tests)
else:
raise exceptions.ParamsError("Invalid testcase path or testcases: {}".format(path_or_tests))

View File

@@ -1,206 +0,0 @@
# encoding: utf-8
"""
Built-in dependent functions used in YAML/JSON testcases.
"""
import datetime
import os
import random
import re
import string
import time
import filetype
from requests_toolbelt import MultipartEncoder
from httprunner.compat import basestring, builtin_str, integer_types
from httprunner.exceptions import ParamsError
PWD = os.getcwd()
###############################################################################
## built-in functions
###############################################################################
def gen_random_string(str_len):
""" generate random string with specified length
"""
return ''.join(
random.choice(string.ascii_letters + string.digits) for _ in range(str_len))
def get_timestamp(str_len=13):
""" get timestamp string, length can only between 0 and 16
"""
if isinstance(str_len, integer_types) and 0 < str_len < 17:
return builtin_str(time.time()).replace(".", "")[:str_len]
raise ParamsError("timestamp length can only between 0 and 16.")
def get_current_date(fmt="%Y-%m-%d"):
""" get current date, default format is %Y-%m-%d
"""
return datetime.datetime.now().strftime(fmt)
def sleep(n_secs):
""" sleep n seconds
"""
time.sleep(n_secs)
###############################################################################
## upload files with requests-toolbelt
# e.g.
# - test:
# name: upload file
# variables:
# file_path: "data/test.env"
# multipart_encoder: ${multipart_encoder(file=$file_path)}
# request:
# url: /post
# method: POST
# headers:
# Content-Type: ${multipart_content_type($multipart_encoder)}
# data: $multipart_encoder
# validate:
# - eq: ["status_code", 200]
# - startswith: ["content.files.file", "UserName=test"]
###############################################################################
def multipart_encoder(**kwargs):
""" initialize MultipartEncoder with uploading fields.
"""
def get_filetype(file_path):
file_type = filetype.guess(file_path)
if file_type:
return file_type.mime
else:
return "text/html"
fields_dict = {}
for key, value in kwargs.items():
if os.path.isabs(value):
_file_path = value
is_file = True
else:
global PWD
_file_path = os.path.join(PWD, value)
is_file = os.path.isfile(_file_path)
if is_file:
filename = os.path.basename(_file_path)
with open(_file_path, 'rb') as f:
mime_type = get_filetype(_file_path)
fields_dict[key] = (filename, f.read(), mime_type)
else:
fields_dict[key] = value
return MultipartEncoder(fields=fields_dict)
def multipart_content_type(multipart_encoder):
""" prepare Content-Type for request headers
"""
return multipart_encoder.content_type
###############################################################################
## built-in comparators
###############################################################################
def equals(check_value, expect_value):
assert check_value == expect_value
def less_than(check_value, expect_value):
assert check_value < expect_value
def less_than_or_equals(check_value, expect_value):
assert check_value <= expect_value
def greater_than(check_value, expect_value):
assert check_value > expect_value
def greater_than_or_equals(check_value, expect_value):
assert check_value >= expect_value
def not_equals(check_value, expect_value):
assert check_value != expect_value
def string_equals(check_value, expect_value):
assert builtin_str(check_value) == builtin_str(expect_value)
def length_equals(check_value, expect_value):
assert isinstance(expect_value, integer_types)
assert len(check_value) == expect_value
def length_greater_than(check_value, expect_value):
assert isinstance(expect_value, integer_types)
assert len(check_value) > expect_value
def length_greater_than_or_equals(check_value, expect_value):
assert isinstance(expect_value, integer_types)
assert len(check_value) >= expect_value
def length_less_than(check_value, expect_value):
assert isinstance(expect_value, integer_types)
assert len(check_value) < expect_value
def length_less_than_or_equals(check_value, expect_value):
assert isinstance(expect_value, integer_types)
assert len(check_value) <= expect_value
def contains(check_value, expect_value):
assert isinstance(check_value, (list, tuple, dict, basestring))
assert expect_value in check_value
def contained_by(check_value, expect_value):
assert isinstance(expect_value, (list, tuple, dict, basestring))
assert check_value in expect_value
def type_match(check_value, expect_value):
def get_type(name):
if isinstance(name, type):
return name
elif isinstance(name, basestring):
try:
return __builtins__[name]
except KeyError:
raise ValueError(name)
else:
raise ValueError(name)
assert isinstance(check_value, get_type(expect_value))
def regex_match(check_value, expect_value):
assert isinstance(expect_value, basestring)
assert isinstance(check_value, basestring)
assert re.match(expect_value, check_value)
def startswith(check_value, expect_value):
assert builtin_str(check_value).startswith(builtin_str(expect_value))
def endswith(check_value, expect_value):
assert builtin_str(check_value).endswith(builtin_str(expect_value))

View File

@@ -0,0 +1,2 @@
# Re-export all built-in comparators and functions so callers can reference
# them directly from the httprunner.builtin namespace.
from httprunner.builtin.comparators import *
from httprunner.builtin.functions import *

View File

@@ -0,0 +1,99 @@
"""
Built-in validate comparators.
"""
import re
from httprunner.compat import basestring, builtin_str, integer_types
def equals(check_value, expect_value):
    """ assert that check_value equals expect_value.
    """
    assert check_value == expect_value
def less_than(check_value, expect_value):
    """ assert that check_value is strictly less than expect_value.
    """
    assert check_value < expect_value
def less_than_or_equals(check_value, expect_value):
    """ assert that check_value is less than or equal to expect_value.
    """
    assert check_value <= expect_value
def greater_than(check_value, expect_value):
    """ assert that check_value is strictly greater than expect_value.
    """
    assert check_value > expect_value
def greater_than_or_equals(check_value, expect_value):
    """ assert that check_value is greater than or equal to expect_value.
    """
    assert check_value >= expect_value
def not_equals(check_value, expect_value):
    """ assert that check_value does not equal expect_value.
    """
    assert check_value != expect_value
def string_equals(check_value, expect_value):
    """ assert that the two values are equal after coercion to str.
    """
    assert builtin_str(check_value) == builtin_str(expect_value)
def length_equals(check_value, expect_value):
    """ assert that len(check_value) equals expect_value, which must be an integer.
    """
    assert isinstance(expect_value, integer_types)
    assert len(check_value) == expect_value
def length_greater_than(check_value, expect_value):
    """ assert that len(check_value) is strictly greater than integer expect_value.
    """
    assert isinstance(expect_value, integer_types)
    assert len(check_value) > expect_value
def length_greater_than_or_equals(check_value, expect_value):
    """ assert that len(check_value) is greater than or equal to integer expect_value.
    """
    assert isinstance(expect_value, integer_types)
    assert len(check_value) >= expect_value
def length_less_than(check_value, expect_value):
    """ assert that len(check_value) is strictly less than integer expect_value.
    """
    assert isinstance(expect_value, integer_types)
    assert len(check_value) < expect_value
def length_less_than_or_equals(check_value, expect_value):
    """ assert that len(check_value) is less than or equal to integer expect_value.
    """
    assert isinstance(expect_value, integer_types)
    assert len(check_value) <= expect_value
def contains(check_value, expect_value):
    """ assert that expect_value is contained in check_value,
        which must be a list/tuple/dict/string container.
    """
    assert isinstance(check_value, (list, tuple, dict, basestring))
    assert expect_value in check_value
def contained_by(check_value, expect_value):
    """ assert that check_value is contained in expect_value,
        which must be a list/tuple/dict/string container.
    """
    assert isinstance(expect_value, (list, tuple, dict, basestring))
    assert check_value in expect_value
def type_match(check_value, expect_value):
    """ assert that check_value is an instance of the expected type.

    Args:
        check_value: value to check.
        expect_value: a type object, or the name of a built-in type (e.g. "int").

    Raises:
        ValueError: if expect_value is neither a type nor a known built-in type name.
    """
    def get_type(name):
        if isinstance(name, type):
            return name
        elif isinstance(name, basestring):
            # __builtins__ is a CPython implementation detail: it is the
            # builtins module in __main__ but a plain dict in imported
            # modules, so `__builtins__[name]` raises TypeError in the
            # former case. Normalize to a mapping before subscripting.
            if isinstance(__builtins__, dict):
                builtins_mapping = __builtins__
            else:
                builtins_mapping = vars(__builtins__)
            try:
                return builtins_mapping[name]
            except KeyError:
                raise ValueError(name)
        else:
            raise ValueError(name)

    assert isinstance(check_value, get_type(expect_value))
def regex_match(check_value, expect_value):
    """ assert that regex pattern expect_value matches the beginning of
        check_value (re.match semantics, not a full match).
    """
    assert isinstance(expect_value, basestring)
    assert isinstance(check_value, basestring)
    assert re.match(expect_value, check_value)
def startswith(check_value, expect_value):
    """ assert that str(check_value) starts with str(expect_value).
    """
    assert builtin_str(check_value).startswith(builtin_str(expect_value))
def endswith(check_value, expect_value):
    """ assert that str(check_value) ends with str(expect_value).
    """
    assert builtin_str(check_value).endswith(builtin_str(expect_value))

View File

@@ -0,0 +1,105 @@
"""
Built-in functions used in YAML/JSON testcases.
"""
import datetime
import os
import random
import string
import time
import filetype
from requests_toolbelt import MultipartEncoder
from httprunner.compat import builtin_str, integer_types
from httprunner.exceptions import ParamsError
PWD = os.getcwd()
def gen_random_string(str_len):
    """ generate a random string of the given length, drawn from
        ASCII letters and digits.
    """
    alphabet = string.ascii_letters + string.digits
    picked = [random.choice(alphabet) for _ in range(str_len)]
    return ''.join(picked)
def get_timestamp(str_len=13):
    """ get timestamp string, length can only be between 1 and 16.

    Args:
        str_len (int): expected length of the timestamp string, default 13.

    Returns:
        str: current timestamp (time.time() with the dot stripped),
             truncated to str_len characters.

    Raises:
        ParamsError: if str_len is not an integer in the range [1, 16].
    """
    # 0 < str_len < 17 means the valid lengths are 1..16 inclusive;
    # the previous message claimed "between 0 and 16", which was wrong.
    if isinstance(str_len, integer_types) and 0 < str_len < 17:
        return builtin_str(time.time()).replace(".", "")[:str_len]

    raise ParamsError("timestamp length can only be between 1 and 16.")
def get_current_date(fmt="%Y-%m-%d"):
    """ return the current date as a string, default format is %Y-%m-%d
    """
    now = datetime.datetime.now()
    return now.strftime(fmt)
def sleep(n_secs):
    """ sleep n seconds

    Args:
        n_secs (int/float): seconds to sleep, passed straight to time.sleep.
    """
    time.sleep(n_secs)
"""
upload files with requests-toolbelt
e.g.
- test:
name: upload file
variables:
file_path: "data/test.env"
multipart_encoder: ${multipart_encoder(file=$file_path)}
request:
url: /post
method: POST
headers:
Content-Type: ${multipart_content_type($multipart_encoder)}
data: $multipart_encoder
validate:
- eq: ["status_code", 200]
- startswith: ["content.files.file", "UserName=test"]
"""
def multipart_encoder(**kwargs):
    """ initialize MultipartEncoder with uploading fields.

    Each keyword argument is either a file path (absolute, or relative to the
    project working directory PWD) or a plain form-field value. Paths that
    resolve to an existing file are attached as (filename, content, mime);
    everything else is passed through as a regular field value.

    Returns:
        MultipartEncoder: encoder object to be used as the request body.
    """
    def get_filetype(file_path):
        # guess mime type from the file content; fall back to text/html
        file_type = filetype.guess(file_path)
        if file_type:
            return file_type.mime
        else:
            return "text/html"

    fields_dict = {}
    for key, value in kwargs.items():
        if os.path.isabs(value):
            # NOTE(review): an absolute path is assumed to exist; open()
            # below will raise if it does not — same as before.
            _file_path = value
            is_file = True
        else:
            # PWD is only read here, so no `global` declaration is needed
            # (it was previously declared global without ever being assigned).
            _file_path = os.path.join(PWD, value)
            is_file = os.path.isfile(_file_path)

        if is_file:
            filename = os.path.basename(_file_path)
            with open(_file_path, 'rb') as f:
                mime_type = get_filetype(_file_path)
                fields_dict[key] = (filename, f.read(), mime_type)
        else:
            fields_dict[key] = value

    return MultipartEncoder(fields=fields_dict)
def multipart_content_type(multipart_encoder):
    """ prepare Content-Type for request headers.

    Args:
        multipart_encoder: encoder object created by multipart_encoder().

    Returns:
        the encoder's content_type attribute value.
    """
    return multipart_encoder.content_type

View File

@@ -5,11 +5,11 @@ import sys
from httprunner import __description__, __version__
from httprunner.api import HttpRunner
from httprunner.compat import is_py2
from httprunner.loader import validate_json_file
from httprunner.logger import color_print
from httprunner.report import gen_html_report
from httprunner.utils import (create_scaffold, get_python2_retire_msg,
prettify_json_file)
from httprunner.validator import validate_json_file
def main():
@@ -23,8 +23,8 @@ def main():
'-V', '--version', dest='version', action='store_true',
help="show version")
parser.add_argument(
'testcase_paths', nargs='*',
help="testcase file path")
'testfile_paths', nargs='*',
help="Specify api/testcase/testsuite file paths to run.")
parser.add_argument(
'--log-level', default='INFO',
help="Specify logging level, default is INFO.")
@@ -36,19 +36,19 @@ def main():
help="Specify .env file path, which is useful for keeping sensitive data.")
parser.add_argument(
'--report-template',
help="specify report template path.")
help="Specify report template path.")
parser.add_argument(
'--report-dir',
help="specify report save directory.")
help="Specify report save directory.")
parser.add_argument(
'--report-file',
help="specify report file path, this has higher priority than specifying report dir.")
help="Specify report file path, this has higher priority than specifying report dir.")
parser.add_argument(
'--save-tests', action='store_true', default=False,
help="Save loaded/parsed/summary json data to JSON files.")
parser.add_argument(
'--failfast', action='store_true', default=False,
help="Stop the test run on the first error or failure.")
parser.add_argument(
'--save-tests', action='store_true', default=False,
help="Save loaded tests and parsed tests to JSON file.")
parser.add_argument(
'--startproject',
help="Specify new project name.")
@@ -96,9 +96,9 @@ def main():
report_dir = args.report_dir or os.path.join(runner.project_working_directory, "reports")
gen_html_report(
summary,
args.report_template,
report_dir,
args.report_file
report_template=args.report_template,
report_dir=report_dir,
report_file=args.report_file
)
err_code |= (0 if summary and summary["success"] else 1)
except Exception:

View File

@@ -1,4 +1,4 @@
from httprunner import exceptions, logger, parser, utils
from httprunner import parser, utils
class SessionContext(object):
@@ -13,11 +13,12 @@ class SessionContext(object):
>>> context.update_session_variables(variables)
"""
def __init__(self, variables=None):
variables_mapping = utils.ensure_mapping_format(variables or {})
self.session_variables_mapping = parser.parse_variables_mapping(variables_mapping)
self.test_variables_mapping = {}
self.init_test_variables()
self.validation_results = []
def init_test_variables(self, variables_mapping=None):
""" init test variables, called when each test(api) starts.
@@ -61,110 +62,3 @@ class SessionContext(object):
content may be in any data structure, include dict, list, tuple, number, string, etc.
"""
return parser.parse_lazy_data(content, self.test_variables_mapping)
def __eval_validator_check(self, check_item, resp_obj):
""" evaluate check item in validator.
Args:
check_item: check_item should only be the following 5 formats:
1, variable reference, e.g. $token
2, function reference, e.g. ${is_status_code_200($status_code)}
3, dict or list, maybe containing variable/function reference, e.g. {"var": "$abc"}
4, string joined by delimiter. e.g. "status_code", "headers.content-type"
5, regex string, e.g. "LB[\d]*(.*)RB[\d]*"
resp_obj: response object
"""
if isinstance(check_item, (dict, list)) \
or isinstance(check_item, parser.LazyString):
# format 1/2/3
check_value = self.eval_content(check_item)
else:
# format 4/5
check_value = resp_obj.extract_field(check_item)
return check_value
def __eval_validator_expect(self, expect_item):
""" evaluate expect item in validator.
Args:
expect_item: expect_item should only be in 2 types:
1, variable reference, e.g. $expect_status_code
2, actual value, e.g. 200
"""
expect_value = self.eval_content(expect_item)
return expect_value
def validate(self, validators, resp_obj):
""" make validation with comparators
"""
self.validation_results = []
if not validators:
return
logger.log_debug("start to validate.")
validate_pass = True
failures = []
for validator in validators:
# validator should be LazyFunction object
if not isinstance(validator, parser.LazyFunction):
raise exceptions.ValidationFailure(
"validator should be parsed first: {}".format(validators))
# evaluate validator args with context variable mapping.
validator_args = validator.get_args()
check_item, expect_item = validator_args
check_value = self.__eval_validator_check(
check_item,
resp_obj
)
expect_value = self.__eval_validator_expect(expect_item)
validator.update_args([check_value, expect_value])
comparator = validator.func_name
validator_dict = {
"comparator": comparator,
"check": check_item,
"check_value": check_value,
"expect": expect_item,
"expect_value": expect_value
}
validate_msg = "\nvalidate: {} {} {}({})".format(
check_item,
comparator,
expect_value,
type(expect_value).__name__
)
try:
validator.to_value(self.test_variables_mapping)
validator_dict["check_result"] = "pass"
validate_msg += "\t==> pass"
logger.log_debug(validate_msg)
except (AssertionError, TypeError):
validate_pass = False
validator_dict["check_result"] = "fail"
validate_msg += "\t==> fail"
validate_msg += "\n{}({}) {} {}({})".format(
check_value,
type(check_value).__name__,
comparator,
expect_value,
type(expect_value).__name__
)
logger.log_error(validate_msg)
failures.append(validate_msg)
self.validation_results.append(validator_dict)
# restore validator args, in case of running multiple times
validator.update_args(validator_args)
if not validate_pass:
failures_string = "\n".join([failure for failure in failures])
raise exceptions.ValidationFailure(failures_string)

View File

@@ -0,0 +1,23 @@
"""
HttpRunner loader
- check: validate testcase data structure with JSON schema (TODO)
- locate: locate debugtalk.py, make it's dir as project root path
- load: load testcase files and relevant data, including debugtalk.py, .env, yaml/json api/testcases, csv, etc.
- buildup: assemble loaded content to httprunner testcase/testsuite data structure
"""
from httprunner.loader.check import is_testcase_path, is_testcases, validate_json_file
from httprunner.loader.load import load_csv_file, load_builtin_functions
from httprunner.loader.buildup import load_cases, load_project_data
__all__ = [
"is_testcase_path",
"is_testcases",
"validate_json_file",
"load_csv_file",
"load_builtin_functions",
"load_project_data",
"load_cases"
]

View File

@@ -1,277 +1,16 @@
import csv
import importlib
import io
import json
import os
import sys
import yaml
from httprunner import exceptions, logger, utils
from httprunner.builtin import functions
from httprunner.loader.load import load_module_functions, load_folder_content, load_file, load_dot_env_file, \
load_folder_files
from httprunner.loader.locate import init_project_working_directory, get_project_working_directory
from httprunner import built_in, exceptions, logger, utils, validator
try:
# PyYAML version >= 5.1
# ref: https://github.com/yaml/pyyaml/wiki/PyYAML-yaml.load(input)-Deprecation
yaml.warnings({'YAMLLoadWarning': False})
except AttributeError:
pass
###############################################################################
# file loader
###############################################################################
def _check_format(file_path, content):
""" check testcase format if valid
"""
# TODO: replace with JSON schema validation
if not content:
# testcase file content is empty
err_msg = u"Testcase file content is empty: {}".format(file_path)
logger.log_error(err_msg)
raise exceptions.FileFormatError(err_msg)
elif not isinstance(content, (list, dict)):
# testcase file content does not match testcase format
err_msg = u"Testcase file content format invalid: {}".format(file_path)
logger.log_error(err_msg)
raise exceptions.FileFormatError(err_msg)
def load_yaml_file(yaml_file):
""" load yaml file and check file content format
"""
with io.open(yaml_file, 'r', encoding='utf-8') as stream:
yaml_content = yaml.load(stream)
_check_format(yaml_file, yaml_content)
return yaml_content
def load_json_file(json_file):
""" load json file and check file content format
"""
with io.open(json_file, encoding='utf-8') as data_file:
try:
json_content = json.load(data_file)
except exceptions.JSONDecodeError:
err_msg = u"JSONDecodeError: JSON file format error: {}".format(json_file)
logger.log_error(err_msg)
raise exceptions.FileFormatError(err_msg)
_check_format(json_file, json_content)
return json_content
def load_csv_file(csv_file):
""" load csv file and check file content format
Args:
csv_file (str): csv file path, csv file content is like below:
Returns:
list: list of parameters, each parameter is in dict format
Examples:
>>> cat csv_file
username,password
test1,111111
test2,222222
test3,333333
>>> load_csv_file(csv_file)
[
{'username': 'test1', 'password': '111111'},
{'username': 'test2', 'password': '222222'},
{'username': 'test3', 'password': '333333'}
]
"""
if not os.path.isabs(csv_file):
project_working_directory = tests_def_mapping["PWD"] or os.getcwd()
# make compatible with Windows/Linux
csv_file = os.path.join(project_working_directory, *csv_file.split("/"))
if not os.path.isfile(csv_file):
# file path not exist
raise exceptions.CSVNotFound(csv_file)
csv_content_list = []
with io.open(csv_file, encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
csv_content_list.append(row)
return csv_content_list
def load_file(file_path):
if not os.path.isfile(file_path):
raise exceptions.FileNotFound("{} does not exist.".format(file_path))
file_suffix = os.path.splitext(file_path)[1].lower()
if file_suffix == '.json':
return load_json_file(file_path)
elif file_suffix in ['.yaml', '.yml']:
return load_yaml_file(file_path)
elif file_suffix == ".csv":
return load_csv_file(file_path)
else:
# '' or other suffix
err_msg = u"Unsupported file format: {}".format(file_path)
logger.log_warning(err_msg)
return []
def load_folder_files(folder_path, recursive=True):
""" load folder path, return all files endswith yml/yaml/json in list.
Args:
folder_path (str): specified folder path to load
recursive (bool): load files recursively if True
Returns:
list: files endswith yml/yaml/json
"""
if isinstance(folder_path, (list, set)):
files = []
for path in set(folder_path):
files.extend(load_folder_files(path, recursive))
return files
if not os.path.exists(folder_path):
return []
file_list = []
for dirpath, dirnames, filenames in os.walk(folder_path):
filenames_list = []
for filename in filenames:
if not filename.endswith(('.yml', '.yaml', '.json')):
continue
filenames_list.append(filename)
for filename in filenames_list:
file_path = os.path.join(dirpath, filename)
file_list.append(file_path)
if not recursive:
break
return file_list
def load_dot_env_file(dot_env_path):
""" load .env file.
Args:
dot_env_path (str): .env file path
Returns:
dict: environment variables mapping
{
"UserName": "debugtalk",
"Password": "123456",
"PROJECT_KEY": "ABCDEFGH"
}
Raises:
exceptions.FileFormatError: If .env file format is invalid.
"""
if not os.path.isfile(dot_env_path):
return {}
logger.log_info("Loading environment variables from {}".format(dot_env_path))
env_variables_mapping = {}
with io.open(dot_env_path, 'r', encoding='utf-8') as fp:
for line in fp:
# maxsplit=1
if "=" in line:
variable, value = line.split("=", 1)
elif ":" in line:
variable, value = line.split(":", 1)
else:
raise exceptions.FileFormatError(".env format error")
env_variables_mapping[variable.strip()] = value.strip()
utils.set_os_environ(env_variables_mapping)
return env_variables_mapping
def locate_file(start_path, file_name):
""" locate filename and return absolute file path.
searching will be recursive upward until current working directory.
Args:
start_path (str): start locating path, maybe file path or directory path
Returns:
str: located file path. None if file not found.
Raises:
exceptions.FileNotFound: If failed to locate file.
"""
if os.path.isfile(start_path):
start_dir_path = os.path.dirname(start_path)
elif os.path.isdir(start_path):
start_dir_path = start_path
else:
raise exceptions.FileNotFound("invalid path: {}".format(start_path))
file_path = os.path.join(start_dir_path, file_name)
if os.path.isfile(file_path):
return os.path.abspath(file_path)
# current working directory
if os.path.abspath(start_dir_path) in [os.getcwd(), os.path.abspath(os.sep)]:
raise exceptions.FileNotFound("{} not found in {}".format(file_name, start_path))
# locate recursive upward
return locate_file(os.path.dirname(start_dir_path), file_name)
###############################################################################
# debugtalk.py module loader
###############################################################################
def load_module_functions(module):
""" load python module functions.
Args:
module: python module
Returns:
dict: functions mapping for specified python module
{
"func1_name": func1,
"func2_name": func2
}
"""
module_functions = {}
for name, item in vars(module).items():
if validator.is_function(item):
module_functions[name] = item
return module_functions
def load_builtin_functions():
""" load built_in module functions
"""
return load_module_functions(built_in)
tests_def_mapping = {
"api": {},
"testcases": {}
}
def load_debugtalk_functions():
@@ -291,19 +30,6 @@ def load_debugtalk_functions():
return load_module_functions(imported_module)
###############################################################################
# testcase loader
###############################################################################
project_mapping = {}
tests_def_mapping = {
"PWD": None,
"api": {},
"testcases": {}
}
def __extend_with_api_ref(raw_testinfo):
""" extend with api reference
@@ -318,7 +44,8 @@ def __extend_with_api_ref(raw_testinfo):
# 2, api sets file: one file contains a list of api definitions
if not os.path.isabs(api_name):
# make compatible with Windows/Linux
api_path = os.path.join(tests_def_mapping["PWD"], *api_name.split("/"))
pwd = get_project_working_directory()
api_path = os.path.join(pwd, *api_name.split("/"))
if os.path.isfile(api_path):
# type 1: api is defined in individual file
api_name = api_path
@@ -338,8 +65,9 @@ def __extend_with_testcase_ref(raw_testinfo):
if testcase_path not in tests_def_mapping["testcases"]:
# make compatible with Windows/Linux
pwd = get_project_working_directory()
testcase_path = os.path.join(
project_mapping["PWD"],
pwd,
*testcase_path.split("/")
)
loaded_testcase = load_file(testcase_path)
@@ -650,31 +378,6 @@ def load_test_file(path):
return loaded_content
def load_folder_content(folder_path):
""" load api/testcases/testsuites definitions from folder.
Args:
folder_path (str): api/testcases/testsuites files folder.
Returns:
dict: api definition mapping.
{
"tests/api/basic.yml": [
{"api": {"def": "api_login", "request": {}, "validate": []}},
{"api": {"def": "api_logout", "request": {}, "validate": []}}
]
}
"""
items_mapping = {}
for file_path in load_folder_files(folder_path):
items_mapping[file_path] = load_file(file_path)
return items_mapping
def load_api_folder(api_folder_path):
""" load api definitions from api folder.
@@ -724,7 +427,7 @@ def load_api_folder(api_folder_path):
for api_item in api_items:
key, api_dict = api_item.popitem()
api_id = api_dict.get("id") or api_dict.get("def") \
or api_dict.get("name")
or api_dict.get("name")
if key != "api" or not api_id:
raise exceptions.ParamsError(
"Invalid API defined in {}".format(api_file_path))
@@ -746,27 +449,7 @@ def load_api_folder(api_folder_path):
return api_definition_mapping
def locate_debugtalk_py(start_path):
""" locate debugtalk.py file
Args:
start_path (str): start locating path,
maybe testcase file path or directory path
Returns:
str: debugtalk.py file path, None if not found
"""
try:
# locate debugtalk.py file.
debugtalk_path = locate_file(start_path, "debugtalk.py")
except exceptions.FileNotFound:
debugtalk_path = None
return debugtalk_path
def load_project_tests(test_path, dot_env_path=None):
def load_project_data(test_path, dot_env_path=None):
""" load api, testcases, .env, debugtalk.py functions.
api/testcases folder is relative to project_working_directory
@@ -779,31 +462,9 @@ def load_project_tests(test_path, dot_env_path=None):
environments and debugtalk.py functions.
"""
debugtalk_path, project_working_directory = init_project_working_directory(test_path)
def prepare_path(path):
if not os.path.exists(path):
err_msg = "path not exist: {}".format(path)
logger.log_error(err_msg)
raise exceptions.FileNotFound(err_msg)
if not os.path.isabs(path):
path = os.path.join(os.getcwd(), path)
return path
test_path = prepare_path(test_path)
# locate debugtalk.py file
debugtalk_path = locate_debugtalk_py(test_path)
if debugtalk_path:
# The folder contains debugtalk.py will be treated as PWD.
project_working_directory = os.path.dirname(debugtalk_path)
else:
# debugtalk.py not found, use os.getcwd() as PWD.
project_working_directory = os.getcwd()
# add PWD to sys.path
sys.path.insert(0, project_working_directory)
project_mapping = {}
# load .env file
# NOTICE:
@@ -821,16 +482,17 @@ def load_project_tests(test_path, dot_env_path=None):
# locate PWD and load debugtalk.py functions
project_mapping["PWD"] = project_working_directory
built_in.PWD = project_working_directory
functions.PWD = project_working_directory # TODO: remove
project_mapping["functions"] = debugtalk_functions
project_mapping["test_path"] = test_path
# load api
tests_def_mapping["api"] = load_api_folder(os.path.join(project_working_directory, "api"))
tests_def_mapping["PWD"] = project_working_directory
return project_mapping
def load_tests(path, dot_env_path=None):
def load_cases(path, dot_env_path=None):
""" load testcases from file path, extend and merge with api/testcase definitions.
Args:
@@ -883,9 +545,9 @@ def load_tests(path, dot_env_path=None):
}
"""
load_project_tests(path, dot_env_path)
tests_mapping = {
"project_mapping": project_mapping
"project_mapping": load_project_data(path, dot_env_path)
}
def __load_file_content(path):

191
httprunner/loader/check.py Normal file
View File

@@ -0,0 +1,191 @@
import io
import json
import os
import types
from httprunner import logger, exceptions
# TODO: validate data format with JSON schema
def is_testcase(data_structure):
    """ check if data_structure is a testcase.

    A valid testcase is a dict carrying a "teststeps" key whose value is a
    list of test dicts, e.g.:

        {
            "config": {"name": "desc1", "variables": [], "request": {}},
            "teststeps": [test_dict1, test_dict2]
        }

    Returns:
        bool: True if data_structure is a valid testcase, otherwise False.
    """
    # TODO: replace with JSON schema validation
    if not isinstance(data_structure, dict):
        return False

    # missing key yields None, which fails the isinstance check below
    return isinstance(data_structure.get("teststeps"), list)
def is_testcases(data_structure):
    """ check if data_structure is a testcase or a list of testcases.

    Expected structure:

        {
            "project_mapping": {"PWD": "XXXXX", "functions": {}, "env": {}},
            "testcases": [testcase_dict_1, testcase_dict_2, ...]
        }

    where each testcase dict must satisfy is_testcase().

    Returns:
        bool: True if data_structure is valid testcase(s), otherwise False.
    """
    if not isinstance(data_structure, dict):
        return False

    # missing key yields None, which fails the isinstance check below
    testcases = data_structure.get("testcases")
    if not isinstance(testcases, list):
        return False

    return all(is_testcase(item) for item in testcases)
def is_testcase_path(path):
    """ check if path is a testcase file path or a list of such paths.

    Args:
        path (str/list): file path or list of file paths.

    Returns:
        bool: True if every referenced path exists, otherwise False.
              (An empty list is considered valid, as before.)
    """
    if isinstance(path, str):
        return os.path.exists(path)

    if isinstance(path, list):
        return all(is_testcase_path(p) for p in path)

    return False
def check_testcase_format(file_path, content):
    """ Ensure loaded testcase file content is non-empty and list/dict shaped.

    Args:
        file_path (str): path of the loaded file (used in error messages).
        content: parsed file content.

    Raises:
        exceptions.FileFormatError: if content is empty or of the wrong type.
    """
    # TODO: replace with JSON schema validation
    if content and isinstance(content, (list, dict)):
        return

    if not content:
        # testcase file content is empty
        err_msg = u"Testcase file content is empty: {}".format(file_path)
    else:
        # testcase file content does not match testcase format
        err_msg = u"Testcase file content format invalid: {}".format(file_path)

    logger.log_error(err_msg)
    raise exceptions.FileFormatError(err_msg)
def validate_json_file(file_list):
    """ Validate that each JSON file in file_list is well-formed JSON.

    Files without a .json suffix are skipped with a warning; the first
    malformed JSON file aborts the run via SystemExit.
    """
    for json_file in set(file_list):
        if not json_file.endswith(".json"):
            logger.log_warning(
                "Only JSON file format can be validated, skip: {}".format(json_file))
            continue

        logger.color_print(
            "Start to validate JSON file: {}".format(json_file), "GREEN")

        with io.open(json_file) as stream:
            try:
                json.load(stream)
            except ValueError as e:
                raise SystemExit(e)

        print("OK")
def is_function(item):
    """ Takes item object, returns True if it is a function.

    Note: only objects of types.FunctionType (user-defined functions and
    lambdas) qualify; builtin functions are a different type and yield False.
    """
    return isinstance(item, types.FunctionType)
def is_variable(tup):
    """ Takes (name, object) tuple, returns True if it is a variable.

    A variable is any public binding that is neither callable
    (function/class) nor an imported module.
    """
    name, item = tup

    # functions/classes and imported modules are not variables
    if callable(item) or isinstance(item, types.ModuleType):
        return False

    # private names (leading underscore) are excluded as well
    return not name.startswith("_")

241
httprunner/loader/load.py Normal file
View File

@@ -0,0 +1,241 @@
import csv
import io
import json
import os
import yaml
from httprunner import builtin
from httprunner import exceptions, logger, utils
from httprunner.loader.check import check_testcase_format, is_function
from httprunner.loader.locate import get_project_working_directory
try:
# PyYAML version >= 5.1
# ref: https://github.com/yaml/pyyaml/wiki/PyYAML-yaml.load(input)-Deprecation
yaml.warnings({'YAMLLoadWarning': False})
except AttributeError:
pass
def _load_yaml_file(yaml_file):
    """ Load a YAML file and check that its content is a valid testcase shape.

    Args:
        yaml_file (str): YAML file path.

    Returns:
        list/dict: parsed YAML content.

    Raises:
        exceptions.FileFormatError: if file content is empty or malformed.
    """
    with io.open(yaml_file, 'r', encoding='utf-8') as stream:
        try:
            # Pass the Loader explicitly: bare yaml.load() is deprecated
            # since PyYAML 5.1 and this removes the need for the
            # yaml.warnings() suppression hack at module import time.
            # FullLoader matches the previous default behavior.
            yaml_content = yaml.load(stream, Loader=yaml.FullLoader)
        except AttributeError:
            # PyYAML < 5.1 has no FullLoader; fall back to legacy default
            yaml_content = yaml.load(stream)

    check_testcase_format(yaml_file, yaml_content)
    return yaml_content
def _load_json_file(json_file):
    """ Load a JSON file and check that its content is a valid testcase shape.

    Args:
        json_file (str): JSON file path.

    Returns:
        list/dict: parsed JSON content.

    Raises:
        exceptions.FileFormatError: if JSON is malformed or content invalid.
    """
    with io.open(json_file, encoding='utf-8') as fp:
        try:
            json_content = json.load(fp)
        except exceptions.JSONDecodeError:
            err_msg = u"JSONDecodeError: JSON file format error: {}".format(json_file)
            logger.log_error(err_msg)
            raise exceptions.FileFormatError(err_msg)

    check_testcase_format(json_file, json_content)
    return json_content
def load_csv_file(csv_file):
    """ Load csv file content as a list of parameter dicts.

    Args:
        csv_file (str): csv file path. A relative path is resolved against
            the project working directory.

    Returns:
        list: one dict per csv row, keyed by the header line.

    Examples:
        >>> cat csv_file
        username,password
        test1,111111
        test2,222222
        test3,333333

        >>> load_csv_file(csv_file)
        [
            {'username': 'test1', 'password': '111111'},
            {'username': 'test2', 'password': '222222'},
            {'username': 'test3', 'password': '333333'}
        ]

    Raises:
        exceptions.CSVNotFound: if the resolved path is not a file.
    """
    if not os.path.isabs(csv_file):
        # resolve relative to PWD; splitting on "/" keeps testcase paths
        # portable between Windows and Linux
        pwd = get_project_working_directory()
        csv_file = os.path.join(pwd, *csv_file.split("/"))

    if not os.path.isfile(csv_file):
        # file path not exist
        raise exceptions.CSVNotFound(csv_file)

    with io.open(csv_file, encoding='utf-8') as fp:
        return list(csv.DictReader(fp))
def load_file(file_path):
    """ Load a testcase/data file, dispatching on its extension.

    Supported suffixes: .json / .yaml / .yml / .csv. Any other suffix is
    logged as a warning and yields an empty list.

    Raises:
        exceptions.FileNotFound: if file_path does not exist.
    """
    if not os.path.isfile(file_path):
        raise exceptions.FileNotFound("{} does not exist.".format(file_path))

    file_suffix = os.path.splitext(file_path)[1].lower()
    suffix_loaders = {
        ".json": _load_json_file,
        ".yaml": _load_yaml_file,
        ".yml": _load_yaml_file,
        ".csv": load_csv_file
    }

    loader_func = suffix_loaders.get(file_suffix)
    if loader_func is not None:
        return loader_func(file_path)

    # '' or other unsupported suffix
    logger.log_warning(u"Unsupported file format: {}".format(file_path))
    return []
def load_folder_files(folder_path, recursive=True):
    """ Return all files under folder_path ending with yml/yaml/json.

    Args:
        folder_path (str/list/set): folder path(s) to scan.
        recursive (bool): descend into subfolders when True.

    Returns:
        list: file paths ending with yml/yaml/json.
    """
    if isinstance(folder_path, (list, set)):
        collected = []
        for each_path in set(folder_path):
            collected.extend(load_folder_files(each_path, recursive))
        return collected

    if not os.path.exists(folder_path):
        return []

    file_list = []
    for dirpath, _dirnames, filenames in os.walk(folder_path):
        file_list.extend(
            os.path.join(dirpath, filename)
            for filename in filenames
            if filename.endswith(('.yml', '.yaml', '.json'))
        )
        if not recursive:
            # only the top-level folder was requested
            break

    return file_list
def load_dot_env_file(dot_env_path):
    """ Load environment variables from a .env file.

    Each non-empty line should be "KEY=VALUE" (or legacy "KEY:VALUE").
    Blank lines and lines starting with '#' are ignored.

    Args:
        dot_env_path (str): .env file path

    Returns:
        dict: environment variables mapping
            {
                "UserName": "debugtalk",
                "Password": "123456",
                "PROJECT_KEY": "ABCDEFGH"
            }

    Raises:
        exceptions.FileFormatError: if a content line has neither '=' nor ':'.
    """
    if not os.path.isfile(dot_env_path):
        return {}

    logger.log_info("Loading environment variables from {}".format(dot_env_path))
    env_variables_mapping = {}

    with io.open(dot_env_path, 'r', encoding='utf-8') as fp:
        for line in fp:
            line = line.strip()
            # skip blank lines and comments instead of raising, so a
            # trailing newline or an annotated .env file does not abort
            if not line or line.startswith("#"):
                continue

            # maxsplit=1 keeps '=' / ':' characters inside the value intact
            if "=" in line:
                variable, value = line.split("=", 1)
            elif ":" in line:
                variable, value = line.split(":", 1)
            else:
                raise exceptions.FileFormatError(".env format error")

            env_variables_mapping[variable.strip()] = value.strip()

    utils.set_os_environ(env_variables_mapping)
    return env_variables_mapping
def load_folder_content(folder_path):
    """ Load api/testcases/testsuites definitions from a folder.

    Args:
        folder_path (str): api/testcases/testsuites files folder.

    Returns:
        dict: file path -> loaded content, e.g.
            {
                "tests/api/basic.yml": [
                    {"api": {"def": "api_login", "request": {}, "validate": []}},
                    {"api": {"def": "api_logout", "request": {}, "validate": []}}
                ]
            }
    """
    return {
        file_path: load_file(file_path)
        for file_path in load_folder_files(folder_path)
    }
def load_module_functions(module):
    """ Extract all plain functions defined in a python module.

    Args:
        module: python module

    Returns:
        dict: function name -> function object, e.g.
            {
                "func1_name": func1,
                "func2_name": func2
            }
    """
    return {
        name: item
        for name, item in vars(module).items()
        if is_function(item)
    }
def load_builtin_functions():
    """ load builtin module functions

    Returns:
        dict: function name -> function object for every plain function
        defined in the httprunner.builtin module.
    """
    return load_module_functions(builtin)

110
httprunner/loader/locate.py Normal file
View File

@@ -0,0 +1,110 @@
import os
import sys
from httprunner import exceptions, logger
project_working_directory = None
def locate_file(start_path, file_name):
    """ Locate file_name and return its absolute path, searching upward
    from start_path until the current working directory or filesystem root.

    Args:
        start_path (str): start locating path, maybe file path or directory path
        file_name (str): target locate file name

    Returns:
        str: located absolute file path.

    Raises:
        exceptions.FileNotFound: if start_path is invalid or file not found.
    """
    if os.path.isfile(start_path):
        start_dir_path = os.path.dirname(start_path)
    elif os.path.isdir(start_path):
        start_dir_path = start_path
    else:
        raise exceptions.FileNotFound("invalid path: {}".format(start_path))

    candidate = os.path.join(start_dir_path, file_name)
    if os.path.isfile(candidate):
        return os.path.abspath(candidate)

    # stop once the search reaches cwd or the filesystem root
    if os.path.abspath(start_dir_path) in [os.getcwd(), os.path.abspath(os.sep)]:
        raise exceptions.FileNotFound("{} not found in {}".format(file_name, start_path))

    # keep climbing one directory level up
    return locate_file(os.path.dirname(start_dir_path), file_name)
def locate_debugtalk_py(start_path):
    """ Locate debugtalk.py file upward from start_path.

    Args:
        start_path (str): start locating path,
            maybe testcase file path or directory path

    Returns:
        str: debugtalk.py file path, None if not found
    """
    try:
        return locate_file(start_path, "debugtalk.py")
    except exceptions.FileNotFound:
        return None
def init_project_working_directory(test_path):
    """ Determine debugtalk.py path and project working directory (PWD).

    Should be called once at startup:
    init_project_working_directory <- load_project_data <- load_cases <- run

    The folder containing debugtalk.py becomes the PWD; when no debugtalk.py
    is found, os.getcwd() is used. PWD is also prepended to sys.path.

    Args:
        test_path: specified testfile path

    Returns:
        (str, str): debugtalk.py path, project_working_directory

    Raises:
        exceptions.FileNotFound: if test_path does not exist.
    """
    def prepare_path(path):
        # validate existence and normalize to an absolute path
        if not os.path.exists(path):
            err_msg = "path not exist: {}".format(path)
            logger.log_error(err_msg)
            raise exceptions.FileNotFound(err_msg)

        if os.path.isabs(path):
            return path
        return os.path.join(os.getcwd(), path)

    test_path = prepare_path(test_path)

    # locate debugtalk.py file
    debugtalk_path = locate_debugtalk_py(test_path)

    global project_working_directory
    if debugtalk_path:
        # The folder containing debugtalk.py is treated as PWD.
        project_working_directory = os.path.dirname(debugtalk_path)
    else:
        # debugtalk.py not found, use os.getcwd() as PWD.
        project_working_directory = os.getcwd()

    # add PWD to sys.path so project modules (debugtalk.py) are importable
    sys.path.insert(0, project_working_directory)

    return debugtalk_path, project_working_directory
def get_project_working_directory():
    """ Return the PWD set by init_project_working_directory.

    Raises:
        exceptions.MyBaseFailure: if loader.load_cases() has not run yet.
    """
    # reading a module-level global needs no `global` declaration
    if project_working_directory is None:
        raise exceptions.MyBaseFailure("loader.load_cases() has not been called!")

    return project_working_directory

View File

@@ -2,9 +2,11 @@
import ast
import builtins
import collections
import json
import re
from httprunner import exceptions, utils, validator
from httprunner import exceptions, utils, loader
from httprunner.compat import basestring, numeric_types, str
# use $$ to escape $ notation
@@ -218,6 +220,166 @@ def parse_parameters(parameters, variables_mapping=None, functions_mapping=None)
return utils.gen_cartesian_product(*parsed_parameters_list)
def get_uniform_comparator(comparator):
    """ Convert a comparator alias to its uniform name.

    Unknown comparators are returned unchanged.
    """
    alias_groups = {
        "equals": ("eq", "equals", "==", "is"),
        "less_than": ("lt", "less_than"),
        "less_than_or_equals": ("le", "less_than_or_equals"),
        "greater_than": ("gt", "greater_than"),
        "greater_than_or_equals": ("ge", "greater_than_or_equals"),
        "not_equals": ("ne", "not_equals"),
        "string_equals": ("str_eq", "string_equals"),
        "length_equals": ("len_eq", "length_equals", "count_eq"),
        "length_greater_than": (
            "len_gt", "count_gt", "length_greater_than", "count_greater_than"),
        "length_greater_than_or_equals": (
            "len_ge", "count_ge", "length_greater_than_or_equals",
            "count_greater_than_or_equals"),
        "length_less_than": (
            "len_lt", "count_lt", "length_less_than", "count_less_than"),
        "length_less_than_or_equals": (
            "len_le", "count_le", "length_less_than_or_equals",
            "count_less_than_or_equals")
    }

    for uniform_name, aliases in alias_groups.items():
        if comparator in aliases:
            return uniform_name

    # no alias matched: pass the comparator through untouched
    return comparator
def uniform_validator(validator):
    """ Unify a validator into {"check", "expect", "comparator"} form.

    Args:
        validator (dict): validator in either of two formats:

            format1: kept for compatibility with previous versions.
                {"check": "status_code", "comparator": "eq", "expect": 201}
                {"check": "$resp_body_success", "comparator": "eq", "expect": True}
            format2: recommended new version, {comparator: [check_item, expected_value]}
                {'eq': ['status_code', 201]}
                {'eq': ['$resp_body_success', True]}

    Returns:
        dict: validator info
            {
                "check": "status_code",
                "expect": 201,
                "comparator": "equals"
            }

    Raises:
        exceptions.ParamsError: if validator is not in a recognized format.
    """
    if not isinstance(validator, dict):
        raise exceptions.ParamsError("invalid validator: {}".format(validator))

    if "check" in validator and "expect" in validator:
        # format1
        check_item = validator["check"]
        expect_value = validator["expect"]
        comparator = validator.get("comparator", "eq")
    elif len(validator) == 1:
        # format2: single {comparator: [check, expect]} entry
        comparator, compare_values = list(validator.items())[0]
        if not isinstance(compare_values, list) or len(compare_values) != 2:
            raise exceptions.ParamsError("invalid validator: {}".format(validator))

        check_item, expect_value = compare_values
    else:
        raise exceptions.ParamsError("invalid validator: {}".format(validator))

    return {
        "check": check_item,
        "expect": expect_value,
        # uniform comparator alias, e.g. lt => less_than, eq => equals
        "comparator": get_uniform_comparator(comparator)
    }
def _convert_validators_to_mapping(validators):
""" convert validators list to mapping.
Args:
validators (list): validators in list
Returns:
dict: validators mapping, use (check, comparator) as key.
Examples:
>>> validators = [
{"check": "v1", "expect": 201, "comparator": "eq"},
{"check": {"b": 1}, "expect": 200, "comparator": "eq"}
]
>>> print(_convert_validators_to_mapping(validators))
{
("v1", "eq"): {"check": "v1", "expect": 201, "comparator": "eq"},
('{"b": 1}', "eq"): {"check": {"b": 1}, "expect": 200, "comparator": "eq"}
}
"""
validators_mapping = {}
for validator in validators:
if not isinstance(validator["check"], collections.Hashable):
check = json.dumps(validator["check"])
else:
check = validator["check"]
key = (check, validator["comparator"])
validators_mapping[key] = validator
return validators_mapping
def extend_validators(raw_validators, override_validators):
    """ Extend raw_validators with override_validators.

    Entries sharing the same (check, comparator) key are overridden by
    override_validators; all other entries are kept.

    Args:
        raw_validators (list): base validators.
        override_validators (list): validators that take precedence.

    Returns:
        list: extended validators

    Examples:
        >>> raw_validators = [{'eq': ['v1', 200]}, {"check": "s2", "expect": 16, "comparator": "len_eq"}]
        >>> override_validators = [{"check": "v1", "expect": 201}, {'len_eq': ['s3', 12]}]
        >>> extend_validators(raw_validators, override_validators)
        [
            {"check": "v1", "expect": 201, "comparator": "eq"},
            {"check": "s2", "expect": 16, "comparator": "len_eq"},
            {"check": "s3", "expect": 12, "comparator": "len_eq"}
        ]
    """
    if not raw_validators:
        return override_validators
    if not override_validators:
        return raw_validators

    merged_mapping = _convert_validators_to_mapping(raw_validators)
    merged_mapping.update(_convert_validators_to_mapping(override_validators))
    return list(merged_mapping.values())
###############################################################################
## parse content with variables and functions mapping
###############################################################################
@@ -247,8 +409,8 @@ def get_mapping_function(function_name, functions_mapping):
if not found, then try to check if builtin function.
Args:
variable_name (str): variable name
variables_mapping (dict): variables mapping
function_name (str): function name
functions_mapping (dict): functions mapping
Returns:
mapping function object.
@@ -261,7 +423,6 @@ def get_mapping_function(function_name, functions_mapping):
return functions_mapping[function_name]
elif function_name in ["parameterize", "P"]:
from httprunner import loader
return loader.load_csv_file
elif function_name in ["environ", "ENV"]:
@@ -269,7 +430,6 @@ def get_mapping_function(function_name, functions_mapping):
try:
# check if HttpRunner builtin functions
from httprunner import loader
built_in_functions = loader.load_builtin_functions()
return built_in_functions[function_name]
except KeyError:
@@ -338,6 +498,7 @@ def parse_function_params(params):
class LazyFunction(object):
""" call function lazily.
"""
def __init__(self, function_meta, functions_mapping=None, check_variables_set=None):
""" init LazyFunction object with function_meta
@@ -410,7 +571,7 @@ class LazyFunction(object):
return "LazyFunction({}({}))".format(self.func_name, args_string)
def __prepare_cache_key(self, args, kwargs):
return (self.func_name, repr(args), repr(kwargs))
return self.func_name, repr(args), repr(kwargs)
def to_value(self, variables_mapping=None):
""" parse lazy data with evaluated variables mapping.
@@ -431,6 +592,7 @@ cached_functions_mapping = {}
class LazyString(object):
""" evaluate string lazily.
"""
def __init__(self, raw_string, functions_mapping=None, check_variables_set=None, cached=False):
""" make raw_string as lazy object with functions_mapping
check if any variable undefined in check_variables_set
@@ -511,7 +673,7 @@ class LazyString(object):
curr_position = match_start_position
try:
# find next $ location
match_start_position = raw_string.index("$", curr_position+1)
match_start_position = raw_string.index("$", curr_position + 1)
remain_string = raw_string[curr_position:match_start_position]
except ValueError:
remain_string = raw_string[curr_position:]
@@ -786,11 +948,11 @@ def _extend_with_api(test_dict, api_def_dict):
# merge & override validators TODO: relocate
def_raw_validators = api_def_dict.pop("validate", [])
def_validators = [
validator.uniform_validator(_validator)
uniform_validator(_validator)
for _validator in def_raw_validators
]
ref_validators = test_dict.pop("validate", [])
test_dict["validate"] = validator.extend_validators(
test_dict["validate"] = extend_validators(
def_validators,
ref_validators
)
@@ -843,7 +1005,8 @@ def _extend_with_testcase(test_dict, testcase_def_dict):
"""
# override testcase config variables
testcase_def_dict["config"].setdefault("variables", {})
testcase_def_variables = utils.ensure_mapping_format(testcase_def_dict["config"].get("variables", {}))
testcase_def_variables = utils.ensure_mapping_format(
testcase_def_dict["config"].get("variables", {}))
testcase_def_variables.update(test_dict.pop("variables", {}))
testcase_def_dict["config"]["variables"] = testcase_def_variables
@@ -855,8 +1018,8 @@ def _extend_with_testcase(test_dict, testcase_def_dict):
# override name
test_name = test_dict.pop("name", None) \
or testcase_def_dict["config"].pop("name", None) \
or "testcase name undefined"
or testcase_def_dict["config"].pop("name", None) \
or "testcase name undefined"
# override testcase config name, output, etc.
testcase_def_dict["config"].update(test_dict)
@@ -875,7 +1038,8 @@ def __prepare_config(config, project_mapping, session_variables_set=None):
override_variables = utils.deepcopy_dict(project_mapping.get("variables", {}))
functions = project_mapping.get("functions", {})
if isinstance(raw_config_variables, basestring) and function_regex_compile.match(raw_config_variables):
if isinstance(raw_config_variables, basestring) and function_regex_compile.match(
raw_config_variables):
# config variables are generated by calling function
# e.g.
# "config": {
@@ -946,7 +1110,7 @@ def __prepare_testcase_tests(tests, config, project_mapping, session_variables_s
if "validate" in test_dict:
ref_raw_validators = test_dict.pop("validate", [])
test_dict["validate"] = [
validator.uniform_validator(_validator)
uniform_validator(_validator)
for _validator in ref_raw_validators
]

View File

@@ -16,7 +16,7 @@ def prepare_locust_tests(path):
]
"""
tests_mapping = loader.load_tests(path)
tests_mapping = loader.load_cases(path)
testcases = parser.parse_tests(tests_mapping)
locust_tests = []

View File

@@ -5,6 +5,7 @@ from unittest.case import SkipTest
from httprunner import exceptions, logger, response, utils
from httprunner.client import HttpSession
from httprunner.context import SessionContext
from httprunner.validator import Validator
class Runner(object):
@@ -62,7 +63,6 @@ class Runner(object):
"""
self.verify = config.get("verify", True)
self.export = config.get("export") or config.get("output", [])
self.validation_results = []
config_variables = config.get("variables", {})
# testcase setup hooks
@@ -86,19 +86,8 @@ class Runner(object):
if not isinstance(self.http_client_session, HttpSession):
return
self.validation_results = []
self.http_client_session.init_meta_data()
def __get_test_data(self):
""" get request/response data and validate results
"""
if not isinstance(self.http_client_session, HttpSession):
return
meta_data = self.http_client_session.meta_data
meta_data["validators"] = self.validation_results
return meta_data
def _handle_skip_feature(self, test_dict):
""" handle skip feature for test
- skip: skip current test unconditionally
@@ -244,7 +233,8 @@ class Runner(object):
raise exceptions.ParamsError(err_msg)
logger.log_info("{method} {url}".format(method=method, url=parsed_url))
logger.log_debug("request kwargs(raw): {kwargs}".format(kwargs=parsed_test_request))
logger.log_debug(
"request kwargs(raw): {kwargs}".format(kwargs=parsed_test_request))
# request
resp = self.http_client_session.request(
@@ -269,9 +259,18 @@ class Runner(object):
# validate
validators = test_dict.get("validate") or test_dict.get("validators") or []
validate_script = test_dict.get("validate_script", [])
if validate_script:
validators.append({
"type": "python_script",
"script": validate_script
})
validator = Validator(self.session_context, resp_obj)
try:
self.session_context.validate(validators, resp_obj)
except (exceptions.ParamsError, exceptions.ValidationFailure, exceptions.ExtractFailure):
validator.validate(validators)
except (exceptions.ParamsError,
exceptions.ValidationFailure, exceptions.ExtractFailure):
err_msg = "{} DETAILED REQUEST & RESPONSE {}\n".format("*" * 32, "*" * 32)
# log request
@@ -295,7 +294,9 @@ class Runner(object):
raise
finally:
self.validation_results = self.session_context.validation_results
# get request/response data and validate results
self.meta_datas = getattr(self.http_client_session, "meta_data", {})
self.meta_datas["validators"] = validator.validation_results
def _run_testcase(self, testcase_dict):
""" run single testcase.
@@ -380,8 +381,6 @@ class Runner(object):
self.exception_request_type = test_dict["request"]["method"]
self.exception_name = test_dict.get("name")
raise
finally:
self.meta_datas = self.__get_test_data()
def export_variables(self, output_variables_list):
""" export current testcase variables
@@ -392,8 +391,8 @@ class Runner(object):
for variable in output_variables_list:
if variable not in variables_mapping:
logger.log_warning(
"variable '{}' can not be found in variables mapping, failed to export!"\
.format(variable)
"variable '{}' can not be found in variables mapping, "
"failed to export!".format(variable)
)
continue

View File

@@ -279,15 +279,17 @@
{% endfor %}
<h3>Validators:</h3>
<div style="overflow: auto">
<table>
<div style="overflow: auto">
{% set validate_extractors = meta_data.validators.validate_extractor %}
{% if validate_extractors %}
<table>
<tr>
<th>check</th>
<th>comparator</th>
<th>expect value</th>
<th>actual value</th>
</tr>
{% for validator in meta_data.validators %}
{% for validator in validate_extractors %}
<tr>
{% if validator.check_result == "pass" %}
<td class="passed">
@@ -303,7 +305,27 @@
<td>{{validator.check_value | e}}</td>
</tr>
{% endfor %}
</table>
</table>
{% endif %}
{% set validate_script = meta_data.validators.validate_script %}
{% if validate_script %}
<table>
<tr>
<th>validate script</th><th>exception</th>
</tr>
<tr>
<td>{{validate_script.validate_script | safe}}</td>
{% if validate_script.check_result == "pass" %}
<td class="passed">
{% elif validate_script.check_result == "fail" %}
<td class="failed">
{% endif %}
{{validate_script.exception}}
</td>
</tr>
</table>
{% endif %}
</div>
<h3>Statistics:</h3>

View File

@@ -119,52 +119,6 @@ def query_json(json_content, query, delimiter='.'):
return json_content
def deep_update_dict(origin_dict, override_dict):
""" update origin dict with override dict recursively
e.g. origin_dict = {'a': 1, 'b': {'c': 2, 'd': 4}}
override_dict = {'b': {'c': 3}}
return: {'a': 1, 'b': {'c': 3, 'd': 4}}
"""
if not override_dict:
return origin_dict
for key, val in override_dict.items():
if isinstance(val, dict):
tmp = deep_update_dict(origin_dict.get(key, {}), val)
origin_dict[key] = tmp
elif val is None:
# fix #64: when headers in test is None, it should inherit from config
continue
else:
origin_dict[key] = override_dict[key]
return origin_dict
def convert_dict_to_params(src_dict):
""" convert dict to params string
Args:
src_dict (dict): source mapping data structure
Returns:
str: string params data
Examples:
>>> src_dict = {
"a": 1,
"b": 2
}
>>> convert_dict_to_params(src_dict)
>>> "a=1&b=2"
"""
return "&".join([
"{}={}".format(key, value)
for key, value in src_dict.items()
])
def lower_dict_keys(origin_dict):
""" convert keys in dict to lower case

View File

@@ -1,347 +1,191 @@
# encoding: utf-8
import collections
import io
import json
import os
import types
from httprunner import exceptions, logger
""" validate data format
TODO: refactor with JSON schema validate
"""
from httprunner import exceptions, logger, parser
def is_testcase(data_structure):
""" check if data_structure is a testcase.
class Validator(object):
"""Validate tests
Args:
data_structure (dict): testcase should always be in the following data structure:
{
"config": {
"name": "desc1",
"variables": [], # optional
"request": {} # optional
},
"teststeps": [
test_dict1,
{ # test_dict2
'name': 'test step desc2',
'variables': [], # optional
'extract': [], # optional
'validate': [],
'request': {},
'function_meta': {}
}
]
}
Returns:
bool: True if data_structure is valid testcase, otherwise False.
Attributes:
validation_results (dict): store validation results,
including validate_extractor and validate_script.
"""
# TODO: replace with JSON schema validation
if not isinstance(data_structure, dict):
return False
if "teststeps" not in data_structure:
return False
def __init__(self, session_context, resp_obj):
""" initialize a Validator for each teststep (API request)
if not isinstance(data_structure["teststeps"], list):
return False
Args:
session_context: HttpRunner session context
resp_obj: ResponseObject instance
"""
self.session_context = session_context
self.resp_obj = resp_obj
self.validation_results = {}
return True
def __eval_validator_check(self, check_item):
""" evaluate check item in validator.
Args:
check_item: check_item should only be the following 5 formats:
1, variable reference, e.g. $token
2, function reference, e.g. ${is_status_code_200($status_code)}
3, dict or list, maybe containing variable/function reference, e.g. {"var": "$abc"}
4, string joined by delimiter. e.g. "status_code", "headers.content-type"
5, regex string, e.g. "LB[\d]*(.*)RB[\d]*"
def is_testcases(data_structure):
""" check if data_structure is testcase or testcases list.
"""
if isinstance(check_item, (dict, list)) \
or isinstance(check_item, parser.LazyString):
# format 1/2/3
check_value = self.session_context.eval_content(check_item)
else:
# format 4/5
check_value = self.resp_obj.extract_field(check_item)
Args:
data_structure (dict): testcase(s) should always be in the following data structure:
{
"project_mapping": {
"PWD": "XXXXX",
"functions": {},
"env": {}
},
"testcases": [
{ # testcase data structure
"config": {
"name": "desc1",
"path": "testcase1_path",
"variables": [], # optional
},
"teststeps": [
# test data structure
{
'name': 'test step desc1',
'variables': [], # optional
'extract': [], # optional
'validate': [],
'request': {}
},
test_dict_2 # another test dict
]
},
testcase_dict_2 # another testcase dict
]
}
return check_value
Returns:
bool: True if data_structure is valid testcase(s), otherwise False.
def __eval_validator_expect(self, expect_item):
""" evaluate expect item in validator.
"""
if not isinstance(data_structure, dict):
return False
Args:
expect_item: expect_item should only be in 2 types:
1, variable reference, e.g. $expect_status_code
2, actual value, e.g. 200
if "testcases" not in data_structure:
return False
"""
expect_value = self.session_context.eval_content(expect_item)
return expect_value
testcases = data_structure["testcases"]
if not isinstance(testcases, list):
return False
for item in testcases:
if not is_testcase(item):
return False
return True
def is_testcase_path(path):
""" check if path is testcase path or path list.
Args:
path (str/list): file path or file path list.
Returns:
bool: True if path is valid file path or path list, otherwise False.
"""
if not isinstance(path, (str, list)):
return False
if isinstance(path, list):
for p in path:
if not is_testcase_path(p):
return False
if isinstance(path, str):
if not os.path.exists(path):
return False
return True
###############################################################################
## testcase validator utils
###############################################################################
def get_uniform_comparator(comparator):
""" convert comparator alias to uniform name
"""
if comparator in ["eq", "equals", "==", "is"]:
return "equals"
elif comparator in ["lt", "less_than"]:
return "less_than"
elif comparator in ["le", "less_than_or_equals"]:
return "less_than_or_equals"
elif comparator in ["gt", "greater_than"]:
return "greater_than"
elif comparator in ["ge", "greater_than_or_equals"]:
return "greater_than_or_equals"
elif comparator in ["ne", "not_equals"]:
return "not_equals"
elif comparator in ["str_eq", "string_equals"]:
return "string_equals"
elif comparator in ["len_eq", "length_equals", "count_eq"]:
return "length_equals"
elif comparator in ["len_gt", "count_gt", "length_greater_than", "count_greater_than"]:
return "length_greater_than"
elif comparator in ["len_ge", "count_ge", "length_greater_than_or_equals",
"count_greater_than_or_equals"]:
return "length_greater_than_or_equals"
elif comparator in ["len_lt", "count_lt", "length_less_than", "count_less_than"]:
return "length_less_than"
elif comparator in ["len_le", "count_le", "length_less_than_or_equals",
"count_less_than_or_equals"]:
return "length_less_than_or_equals"
else:
return comparator
def uniform_validator(validator):
""" unify validator
Args:
validator (dict): validator maybe in two formats:
format1: this is kept for compatiblity with the previous versions.
{"check": "status_code", "comparator": "eq", "expect": 201}
{"check": "$resp_body_success", "comparator": "eq", "expect": True}
format2: recommended new version, {comparator: [check_item, expected_value]}
{'eq': ['status_code', 201]}
{'eq': ['$resp_body_success', True]}
Returns
dict: validator info
{
"check": "status_code",
"expect": 201,
"comparator": "equals"
}
"""
if not isinstance(validator, dict):
raise exceptions.ParamsError("invalid validator: {}".format(validator))
if "check" in validator and "expect" in validator:
# format1
check_item = validator["check"]
expect_value = validator["expect"]
comparator = validator.get("comparator", "eq")
elif len(validator) == 1:
# format2
comparator = list(validator.keys())[0]
compare_values = validator[comparator]
if not isinstance(compare_values, list) or len(compare_values) != 2:
raise exceptions.ParamsError("invalid validator: {}".format(validator))
check_item, expect_value = compare_values
else:
raise exceptions.ParamsError("invalid validator: {}".format(validator))
# uniform comparator, e.g. lt => less_than, eq => equals
comparator = get_uniform_comparator(comparator)
return {
"check": check_item,
"expect": expect_value,
"comparator": comparator
}
def _convert_validators_to_mapping(validators):
""" convert validators list to mapping.
Args:
validators (list): validators in list
Returns:
dict: validators mapping, use (check, comparator) as key.
Examples:
>>> validators = [
{"check": "v1", "expect": 201, "comparator": "eq"},
{"check": {"b": 1}, "expect": 200, "comparator": "eq"}
]
>>> print(_convert_validators_to_mapping(validators))
{
("v1", "eq"): {"check": "v1", "expect": 201, "comparator": "eq"},
('{"b": 1}', "eq"): {"check": {"b": 1}, "expect": 200, "comparator": "eq"}
def validate_script(self, script):
""" make validation with python script
"""
validator_dict = {
"validate_script": "<br/>".join(script),
"check_result": "fail",
"exception": ""
}
"""
validators_mapping = {}
script = "\n ".join(script)
code = """
# encoding: utf-8
for validator in validators:
if not isinstance(validator["check"], collections.Hashable):
check = json.dumps(validator["check"])
else:
check = validator["check"]
try:
{}
except Exception as ex:
import traceback
import sys
_type, _value, _tb = sys.exc_info()
# filename, lineno, name, line
_, _lineno, _, line_content = traceback.extract_tb(_tb, 1)[0]
key = (check, validator["comparator"])
validators_mapping[key] = validator
line_no = _lineno - 4
return validators_mapping
def extend_validators(raw_validators, override_validators):
""" extend raw_validators with override_validators.
override_validators will merge and override raw_validators.
Args:
raw_validators (dict):
override_validators (dict):
Returns:
list: extended validators
Examples:
>>> raw_validators = [{'eq': ['v1', 200]}, {"check": "s2", "expect": 16, "comparator": "len_eq"}]
>>> override_validators = [{"check": "v1", "expect": 201}, {'len_eq': ['s3', 12]}]
>>> extend_validators(raw_validators, override_validators)
[
{"check": "v1", "expect": 201, "comparator": "eq"},
{"check": "s2", "expect": 16, "comparator": "len_eq"},
{"check": "s3", "expect": 12, "comparator": "len_eq"}
]
"""
if not raw_validators:
return override_validators
elif not override_validators:
return raw_validators
c_exception = _type.__name__ + "\\n"
c_exception += "\\tError line number: " + str(line_no) + "\\n"
c_exception += "\\tError line content: " + str(line_content) + "\\n"
if _value.args:
c_exception += "\\tError description: " + str(_value)
else:
def_validators_mapping = _convert_validators_to_mapping(raw_validators)
ref_validators_mapping = _convert_validators_to_mapping(override_validators)
c_exception += "\\tError description: " + _type.__name__
def_validators_mapping.update(ref_validators_mapping)
return list(def_validators_mapping.values())
raise _type(c_exception)
""".format(script)
variables = {
"status_code": self.resp_obj.status_code,
"response_json": self.resp_obj.json,
"response": self.resp_obj
}
variables.update(self.session_context.test_variables_mapping)
try:
code = compile(code, '<string>', 'exec')
exec(code, variables)
validator_dict["check_result"] = "pass"
return validator_dict, ""
except Exception as ex:
validator_dict["check_result"] = "fail"
validator_dict["exception"] = "<br/>".join(str(ex).splitlines())
return validator_dict, str(ex)
###############################################################################
## validate varibles and functions
###############################################################################
def validate(self, validators):
""" make validation with comparators
"""
self.validation_results = {}
if not validators:
return
logger.log_debug("start to validate.")
def is_function(item):
""" Takes item object, returns True if it is a function.
"""
return isinstance(item, types.FunctionType)
validate_pass = True
failures = []
for validator in validators:
def is_variable(tup):
""" Takes (name, object) tuple, returns True if it is a variable.
"""
name, item = tup
if callable(item):
# function or class
return False
if isinstance(validator, dict) and validator.get("type") == "python_script":
validator_dict, ex = self.validate_script(validator["script"])
if ex:
validate_pass = False
failures.append(ex)
if isinstance(item, types.ModuleType):
# imported module
return False
self.validation_results["validate_script"] = validator_dict
continue
if name.startswith("_"):
# private property
return False
if "validate_extractor" not in self.validation_results:
self.validation_results["validate_extractor"] = []
return True
# validator should be LazyFunction object
if not isinstance(validator, parser.LazyFunction):
raise exceptions.ValidationFailure(
"validator should be parsed first: {}".format(validators))
# evaluate validator args with context variable mapping.
validator_args = validator.get_args()
check_item, expect_item = validator_args
check_value = self.__eval_validator_check(check_item)
expect_value = self.__eval_validator_expect(expect_item)
validator.update_args([check_value, expect_value])
def validate_json_file(file_list):
""" validate JSON testcase format
"""
for json_file in set(file_list):
if not json_file.endswith(".json"):
logger.log_warning("Only JSON file format can be validated, skip: {}".format(json_file))
continue
comparator = validator.func_name
validator_dict = {
"comparator": comparator,
"check": check_item,
"check_value": check_value,
"expect": expect_item,
"expect_value": expect_value
}
validate_msg = "\nvalidate: {} {} {}({})".format(
check_item,
comparator,
expect_value,
type(expect_value).__name__
)
logger.color_print("Start to validate JSON file: {}".format(json_file), "GREEN")
with io.open(json_file) as stream:
try:
json.load(stream)
except ValueError as e:
raise SystemExit(e)
validator.to_value(self.session_context.test_variables_mapping)
validator_dict["check_result"] = "pass"
validate_msg += "\t==> pass"
logger.log_debug(validate_msg)
except (AssertionError, TypeError):
validate_pass = False
validator_dict["check_result"] = "fail"
validate_msg += "\t==> fail"
validate_msg += "\n{}({}) {} {}({})".format(
check_value,
type(check_value).__name__,
comparator,
expect_value,
type(expect_value).__name__
)
logger.log_error(validate_msg)
failures.append(validate_msg)
print("OK")
self.validation_results["validate_extractor"].append(validator_dict)
# restore validator args, in case of running multiple times
validator.update_args(validator_args)
if not validate_pass:
failures_string = "\n".join([failure for failure in failures])
raise exceptions.ValidationFailure(failures_string)

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "httprunner"
version = "2.3.3"
version = "2.4.0"
description = "One-stop solution for HTTP(S) testing."
license = "Apache-2.0"
readme = "README.md"

View File

@@ -5,7 +5,7 @@ from functools import wraps
from flask import Flask, make_response, request
from httprunner.built_in import gen_random_string
from httprunner.builtin.functions import gen_random_string
try:
from httpbin import app as httpbin_app

View File

@@ -1,13 +1,34 @@
- config:
name: basic test with httpbin
request:
base_url: http://httpbin.org/
base_url: http://httpbin.org/
- test:
name: headers
name: validate response with json path
request:
url: /headers
url: /get
params:
a: 1
b: 2
method: GET
validate:
- eq: ["status_code", 200]
- assert_status_code_is_200: ["status_code"]
- eq: ["json.args.a", '1']
- eq: ["json.args.b", '2']
validate_script:
- "assert status_code == 200"
- test:
name: validate response with python script
request:
url: /get
params:
a: 1
b: 2
method: GET
validate:
- eq: ["status_code", 200]
validate_script:
- "assert status_code == 201"
- "a = response_json.get('args').get('a')"
- "assert a == '1'"

View File

@@ -289,6 +289,10 @@ class TestHttpRunner(ApiServerUnittest):
self.assertEqual(summary["stat"]["testcases"]["total"], 2)
self.assertEqual(summary["stat"]["teststeps"]["total"], 4)
def test_validate_script(self):
summary = self.runner.run("tests/httpbin/validate.yml")
self.assertFalse(summary["success"])
def test_run_httprunner_with_hooks(self):
testcase_file_path = os.path.join(
os.getcwd(), 'tests/httpbin/hooks.yml')
@@ -327,9 +331,9 @@ class TestHttpRunner(ApiServerUnittest):
]
}
]
loader.load_project_tests("tests")
tests_mapping = {
"project_mapping": loader.project_mapping,
"project_mapping": loader.load_project_data("tests"),
"testcases": testcases
}
summary = self.runner.run_tests(tests_mapping)
@@ -359,9 +363,8 @@ class TestHttpRunner(ApiServerUnittest):
]
}
]
loader.load_project_tests("tests")
tests_mapping = {
"project_mapping": loader.project_mapping,
"project_mapping": loader.load_project_data("tests"),
"testcases": testcases
}
summary = self.runner.run_tests(tests_mapping)
@@ -389,9 +392,8 @@ class TestHttpRunner(ApiServerUnittest):
]
}
]
loader.load_project_tests("tests")
tests_mapping = {
"project_mapping": loader.project_mapping,
"project_mapping": loader.load_project_data("tests"),
"testcases": testcases
}
summary = self.runner.run_tests(tests_mapping)
@@ -600,7 +602,7 @@ class TestApi(ApiServerUnittest):
def test_testcase_loader(self):
testcase_path = "tests/testcases/setup.yml"
tests_mapping = loader.load_tests(testcase_path)
tests_mapping = loader.load_cases(testcase_path)
project_mapping = tests_mapping["project_mapping"]
self.assertIsInstance(project_mapping, dict)
@@ -625,7 +627,7 @@ class TestApi(ApiServerUnittest):
def test_testcase_parser(self):
testcase_path = "tests/testcases/setup.yml"
tests_mapping = loader.load_tests(testcase_path)
tests_mapping = loader.load_cases(testcase_path)
parsed_testcases = parser.parse_tests(tests_mapping)
@@ -646,7 +648,7 @@ class TestApi(ApiServerUnittest):
def test_testcase_add_tests(self):
testcase_path = "tests/testcases/setup.yml"
tests_mapping = loader.load_tests(testcase_path)
tests_mapping = loader.load_cases(testcase_path)
testcases = parser.parse_tests(tests_mapping)
runner = HttpRunner()
@@ -660,7 +662,7 @@ class TestApi(ApiServerUnittest):
def test_testcase_complex_verify(self):
testcase_path = "tests/testcases/create_user.yml"
tests_mapping = loader.load_tests(testcase_path)
tests_mapping = loader.load_cases(testcase_path)
testcases = parser.parse_tests(tests_mapping)
teststeps = testcases[0]["teststeps"]
@@ -677,7 +679,7 @@ class TestApi(ApiServerUnittest):
def test_testcase_simple_run_suite(self):
testcase_path = "tests/testcases/setup.yml"
tests_mapping = loader.load_tests(testcase_path)
tests_mapping = loader.load_cases(testcase_path)
testcases = parser.parse_tests(tests_mapping)
runner = HttpRunner()
test_suite = runner._add_tests(testcases)
@@ -691,7 +693,7 @@ class TestApi(ApiServerUnittest):
"tests/testcases/create_user.json",
"tests/testcases/create_user.v2.json"
]:
tests_mapping = loader.load_tests(testcase_path)
tests_mapping = loader.load_cases(testcase_path)
testcases = parser.parse_tests(tests_mapping)
runner = HttpRunner()
test_suite = runner._add_tests(testcases)
@@ -710,7 +712,7 @@ class TestApi(ApiServerUnittest):
def test_testsuite_loader(self):
testcase_path = "tests/testsuites/create_users.yml"
tests_mapping = loader.load_tests(testcase_path)
tests_mapping = loader.load_cases(testcase_path)
project_mapping = tests_mapping["project_mapping"]
self.assertIsInstance(project_mapping, dict)
@@ -742,7 +744,7 @@ class TestApi(ApiServerUnittest):
def test_testsuite_parser(self):
testcase_path = "tests/testsuites/create_users.yml"
tests_mapping = loader.load_tests(testcase_path)
tests_mapping = loader.load_cases(testcase_path)
parsed_testcases = parser.parse_tests(tests_mapping)
self.assertEqual(len(parsed_testcases), 2)
@@ -760,7 +762,7 @@ class TestApi(ApiServerUnittest):
def test_testsuite_add_tests(self):
testcase_path = "tests/testsuites/create_users.yml"
tests_mapping = loader.load_tests(testcase_path)
tests_mapping = loader.load_cases(testcase_path)
testcases = parser.parse_tests(tests_mapping)
runner = HttpRunner()
@@ -772,7 +774,7 @@ class TestApi(ApiServerUnittest):
def test_testsuite_run_suite(self):
testcase_path = "tests/testsuites/create_users.yml"
tests_mapping = loader.load_tests(testcase_path)
tests_mapping = loader.load_cases(testcase_path)
testcases = parser.parse_tests(tests_mapping)

View File

@@ -8,8 +8,7 @@ from tests.base import ApiServerUnittest, gen_random_string
class TestContext(ApiServerUnittest):
def setUp(self):
loader.load_project_tests(os.path.join(os.getcwd(), "tests"))
project_mapping = loader.project_mapping
loader.load_project_data(os.path.join(os.getcwd(), "tests"))
self.context = context.SessionContext(
variables={"SECRET_KEY": "DebugTalk"}
)

View File

View File

@@ -3,207 +3,21 @@ import os
import unittest
from httprunner import exceptions, loader
class TestFileLoader(unittest.TestCase):
def test_load_yaml_file_file_format_error(self):
yaml_tmp_file = "tests/data/tmp.yml"
# create empty yaml file
with open(yaml_tmp_file, 'w') as f:
f.write("")
with self.assertRaises(exceptions.FileFormatError):
loader.load_yaml_file(yaml_tmp_file)
os.remove(yaml_tmp_file)
# create invalid format yaml file
with open(yaml_tmp_file, 'w') as f:
f.write("abc")
with self.assertRaises(exceptions.FileFormatError):
loader.load_yaml_file(yaml_tmp_file)
os.remove(yaml_tmp_file)
def test_load_json_file_file_format_error(self):
json_tmp_file = "tests/data/tmp.json"
# create empty file
with open(json_tmp_file, 'w') as f:
f.write("")
with self.assertRaises(exceptions.FileFormatError):
loader.load_json_file(json_tmp_file)
os.remove(json_tmp_file)
# create empty json file
with open(json_tmp_file, 'w') as f:
f.write("{}")
with self.assertRaises(exceptions.FileFormatError):
loader.load_json_file(json_tmp_file)
os.remove(json_tmp_file)
# create invalid format json file
with open(json_tmp_file, 'w') as f:
f.write("abc")
with self.assertRaises(exceptions.FileFormatError):
loader.load_json_file(json_tmp_file)
os.remove(json_tmp_file)
def test_load_testcases_bad_filepath(self):
testcase_file_path = os.path.join(os.getcwd(), 'tests/data/demo')
with self.assertRaises(exceptions.FileNotFound):
loader.load_file(testcase_file_path)
def test_load_json_testcases(self):
testcase_file_path = os.path.join(
os.getcwd(), 'tests/data/demo_testcase_hardcode.json')
testcases = loader.load_file(testcase_file_path)
self.assertEqual(len(testcases), 3)
test = testcases[0]["test"]
self.assertIn('name', test)
self.assertIn('request', test)
self.assertIn('url', test['request'])
self.assertIn('method', test['request'])
def test_load_yaml_testcases(self):
testcase_file_path = os.path.join(
os.getcwd(), 'tests/data/demo_testcase_hardcode.yml')
testcases = loader.load_file(testcase_file_path)
self.assertEqual(len(testcases), 3)
test = testcases[0]["test"]
self.assertIn('name', test)
self.assertIn('request', test)
self.assertIn('url', test['request'])
self.assertIn('method', test['request'])
def test_load_csv_file_one_parameter(self):
csv_file_path = os.path.join(
os.getcwd(), 'tests/data/user_agent.csv')
csv_content = loader.load_file(csv_file_path)
self.assertEqual(
csv_content,
[
{'user_agent': 'iOS/10.1'},
{'user_agent': 'iOS/10.2'},
{'user_agent': 'iOS/10.3'}
]
)
def test_load_csv_file_multiple_parameters(self):
csv_file_path = os.path.join(
os.getcwd(), 'tests/data/account.csv')
csv_content = loader.load_file(csv_file_path)
self.assertEqual(
csv_content,
[
{'username': 'test1', 'password': '111111'},
{'username': 'test2', 'password': '222222'},
{'username': 'test3', 'password': '333333'}
]
)
def test_load_folder_files(self):
folder = os.path.join(os.getcwd(), 'tests')
file1 = os.path.join(os.getcwd(), 'tests', 'test_utils.py')
file2 = os.path.join(os.getcwd(), 'tests', 'api', 'reset_all.yml')
files = loader.load_folder_files(folder, recursive=False)
self.assertEqual(files, [])
files = loader.load_folder_files(folder)
self.assertIn(file2, files)
self.assertNotIn(file1, files)
files = loader.load_folder_files("not_existed_foulder", recursive=False)
self.assertEqual([], files)
files = loader.load_folder_files(file2, recursive=False)
self.assertEqual([], files)
def test_load_dot_env_file(self):
dot_env_path = os.path.join(
os.getcwd(), "tests", ".env"
)
env_variables_mapping = loader.load_dot_env_file(dot_env_path)
self.assertIn("PROJECT_KEY", env_variables_mapping)
self.assertEqual(env_variables_mapping["UserName"], "debugtalk")
def test_load_custom_dot_env_file(self):
dot_env_path = os.path.join(
os.getcwd(), "tests", "data", "test.env"
)
env_variables_mapping = loader.load_dot_env_file(dot_env_path)
self.assertIn("PROJECT_KEY", env_variables_mapping)
self.assertEqual(env_variables_mapping["UserName"], "test")
self.assertEqual(env_variables_mapping["content_type"], "application/json; charset=UTF-8")
def test_load_env_path_not_exist(self):
dot_env_path = os.path.join(
os.getcwd(), "tests", "data",
)
env_variables_mapping = loader.load_dot_env_file(dot_env_path)
self.assertEqual(env_variables_mapping, {})
def test_locate_file(self):
with self.assertRaises(exceptions.FileNotFound):
loader.locate_file(os.getcwd(), "debugtalk.py")
with self.assertRaises(exceptions.FileNotFound):
loader.locate_file("", "debugtalk.py")
start_path = os.path.join(os.getcwd(), "tests")
self.assertEqual(
loader.locate_file(start_path, "debugtalk.py"),
os.path.join(
os.getcwd(), "tests/debugtalk.py"
)
)
self.assertEqual(
loader.locate_file("tests/", "debugtalk.py"),
os.path.join(os.getcwd(), "tests", "debugtalk.py")
)
self.assertEqual(
loader.locate_file("tests", "debugtalk.py"),
os.path.join(os.getcwd(), "tests", "debugtalk.py")
)
self.assertEqual(
loader.locate_file("tests/base.py", "debugtalk.py"),
os.path.join(os.getcwd(), "tests", "debugtalk.py")
)
self.assertEqual(
loader.locate_file("tests/data/demo_testcase.yml", "debugtalk.py"),
os.path.join(os.getcwd(), "tests", "debugtalk.py")
)
def test_load_folder_content(self):
path = os.path.join(os.getcwd(), "tests", "api")
items_mapping = loader.load_folder_content(path)
file_path = os.path.join(os.getcwd(), "tests", "api", "reset_all.yml")
self.assertIn(file_path, items_mapping)
self.assertIsInstance(items_mapping[file_path], dict)
from httprunner.loader import buildup
class TestModuleLoader(unittest.TestCase):
def test_filter_module_functions(self):
module_functions = loader.load_module_functions(loader)
module_functions = buildup.load_module_functions(buildup)
self.assertIn("load_module_functions", module_functions)
self.assertNotIn("is_py3", module_functions)
def test_load_debugtalk_module(self):
loader.load_project_tests(os.path.join(os.getcwd(), "httprunner"))
project_mapping = loader.project_mapping
project_mapping = buildup.load_project_data(os.path.join(os.getcwd(), "httprunner"))
self.assertNotIn("alter_response", project_mapping["functions"])
loader.load_project_tests(os.path.join(os.getcwd(), "tests"))
project_mapping = loader.project_mapping
project_mapping = buildup.load_project_data(os.path.join(os.getcwd(), "tests"))
self.assertIn("alter_response", project_mapping["functions"])
is_status_code_200 = project_mapping["functions"]["is_status_code_200"]
@@ -211,27 +25,27 @@ class TestModuleLoader(unittest.TestCase):
self.assertFalse(is_status_code_200(500))
def test_load_debugtalk_py(self):
loader.load_project_tests("tests/data/demo_testcase.yml")
project_working_directory = loader.project_mapping["PWD"]
debugtalk_functions = loader.project_mapping["functions"]
project_mapping = buildup.load_project_data("tests/data/demo_testcase.yml")
project_working_directory = project_mapping["PWD"]
debugtalk_functions = project_mapping["functions"]
self.assertEqual(
project_working_directory,
os.path.join(os.getcwd(), "tests")
)
self.assertIn("gen_md5", debugtalk_functions)
loader.load_project_tests("tests/base.py")
project_working_directory = loader.project_mapping["PWD"]
debugtalk_functions = loader.project_mapping["functions"]
project_mapping = buildup.load_project_data("tests/base.py")
project_working_directory = project_mapping["PWD"]
debugtalk_functions = project_mapping["functions"]
self.assertEqual(
project_working_directory,
os.path.join(os.getcwd(), "tests")
)
self.assertIn("gen_md5", debugtalk_functions)
loader.load_project_tests("httprunner/__init__.py")
project_working_directory = loader.project_mapping["PWD"]
debugtalk_functions = loader.project_mapping["functions"]
project_mapping = buildup.load_project_data("httprunner/__init__.py")
project_working_directory = project_mapping["PWD"]
debugtalk_functions = project_mapping["functions"]
self.assertEqual(
project_working_directory,
os.getcwd()
@@ -243,9 +57,8 @@ class TestSuiteLoader(unittest.TestCase):
@classmethod
def setUpClass(cls):
loader.load_project_tests(os.path.join(os.getcwd(), "tests"))
cls.project_mapping = loader.project_mapping
cls.tests_def_mapping = loader.tests_def_mapping
cls.project_mapping = buildup.load_project_data(os.path.join(os.getcwd(), "tests"))
cls.tests_def_mapping = buildup.tests_def_mapping
def test_load_teststep_api(self):
raw_test = {
@@ -255,7 +68,7 @@ class TestSuiteLoader(unittest.TestCase):
{"uid": "999"}
]
}
teststep = loader.load_teststep(raw_test)
teststep = buildup.load_teststep(raw_test)
self.assertEqual(
"create user (override).",
teststep["name"]
@@ -273,7 +86,7 @@ class TestSuiteLoader(unittest.TestCase):
{"device_sn": "$device_sn"}
]
}
testcase = loader.load_teststep(raw_test)
testcase = buildup.load_teststep(raw_test)
self.assertEqual(
"setup and reset all (override).",
testcase["name"]
@@ -284,7 +97,7 @@ class TestSuiteLoader(unittest.TestCase):
self.assertEqual(tests[1]["name"], "reset all users")
def test_load_test_file_api(self):
loaded_content = loader.load_test_file("tests/api/create_user.yml")
loaded_content = buildup.load_test_file("tests/api/create_user.yml")
self.assertEqual(loaded_content["type"], "api")
self.assertIn("path", loaded_content)
self.assertIn("request", loaded_content)
@@ -292,8 +105,8 @@ class TestSuiteLoader(unittest.TestCase):
def test_load_test_file_testcase(self):
for loaded_content in [
loader.load_test_file("tests/testcases/setup.yml"),
loader.load_test_file("tests/testcases/setup.json")
buildup.load_test_file("tests/testcases/setup.yml"),
buildup.load_test_file("tests/testcases/setup.json")
]:
self.assertEqual(loaded_content["type"], "testcase")
self.assertIn("path", loaded_content)
@@ -304,8 +117,8 @@ class TestSuiteLoader(unittest.TestCase):
def test_load_test_file_testcase_v2(self):
for loaded_content in [
loader.load_test_file("tests/testcases/setup.v2.yml"),
loader.load_test_file("tests/testcases/setup.v2.json")
buildup.load_test_file("tests/testcases/setup.v2.yml"),
buildup.load_test_file("tests/testcases/setup.v2.json")
]:
self.assertEqual(loaded_content["type"], "testcase")
self.assertIn("path", loaded_content)
@@ -316,8 +129,8 @@ class TestSuiteLoader(unittest.TestCase):
def test_load_test_file_testsuite(self):
for loaded_content in [
loader.load_test_file("tests/testsuites/create_users.yml"),
loader.load_test_file("tests/testsuites/create_users.json")
buildup.load_test_file("tests/testsuites/create_users.yml"),
buildup.load_test_file("tests/testsuites/create_users.json")
]:
self.assertEqual(loaded_content["type"], "testsuite")
@@ -332,8 +145,8 @@ class TestSuiteLoader(unittest.TestCase):
def test_load_test_file_testsuite_v2(self):
for loaded_content in [
loader.load_test_file("tests/testsuites/create_users.v2.yml"),
loader.load_test_file("tests/testsuites/create_users.v2.json")
buildup.load_test_file("tests/testsuites/create_users.v2.yml"),
buildup.load_test_file("tests/testsuites/create_users.v2.json")
]:
self.assertEqual(loaded_content["type"], "testsuite")
@@ -349,7 +162,7 @@ class TestSuiteLoader(unittest.TestCase):
def test_load_tests_api_file(self):
path = os.path.join(
os.getcwd(), 'tests/api/create_user.yml')
tests_mapping = loader.load_tests(path)
tests_mapping = loader.load_cases(path)
project_mapping = tests_mapping["project_mapping"]
api_list = tests_mapping["apis"]
self.assertEqual(len(api_list), 1)
@@ -359,7 +172,7 @@ class TestSuiteLoader(unittest.TestCase):
# absolute file path
path = os.path.join(
os.getcwd(), 'tests/data/demo_testcase_hardcode.json')
tests_mapping = loader.load_tests(path)
tests_mapping = loader.load_cases(path)
project_mapping = tests_mapping["project_mapping"]
testcases_list = tests_mapping["testcases"]
self.assertEqual(len(testcases_list), 1)
@@ -368,7 +181,7 @@ class TestSuiteLoader(unittest.TestCase):
# relative file path
path = 'tests/data/demo_testcase_hardcode.yml'
tests_mapping = loader.load_tests(path)
tests_mapping = loader.load_cases(path)
project_mapping = tests_mapping["project_mapping"]
testcases_list = tests_mapping["testcases"]
self.assertEqual(len(testcases_list), 1)
@@ -378,7 +191,7 @@ class TestSuiteLoader(unittest.TestCase):
def test_load_tests_testcase_file_2(self):
testcase_file_path = os.path.join(
os.getcwd(), 'tests/data/demo_testcase.yml')
tests_mapping = loader.load_tests(testcase_file_path)
tests_mapping = loader.load_cases(testcase_file_path)
testcases = tests_mapping["testcases"]
self.assertIsInstance(testcases, list)
self.assertEqual(testcases[0]["config"]["name"], '123t$var_a')
@@ -398,7 +211,7 @@ class TestSuiteLoader(unittest.TestCase):
def test_load_tests_testcase_file_with_api_ref(self):
path = os.path.join(
os.getcwd(), 'tests/data/demo_testcase_layer.yml')
tests_mapping = loader.load_tests(path)
tests_mapping = loader.load_cases(path)
project_mapping = tests_mapping["project_mapping"]
testcases_list = tests_mapping["testcases"]
self.assertIn('device_sn', testcases_list[0]["config"]["variables"])
@@ -418,7 +231,7 @@ class TestSuiteLoader(unittest.TestCase):
def test_load_tests_testsuite_file_with_testcase_ref(self):
path = os.path.join(
os.getcwd(), 'tests/testsuites/create_users.yml')
tests_mapping = loader.load_tests(path)
tests_mapping = loader.load_cases(path)
project_mapping = tests_mapping["project_mapping"]
testsuites_list = tests_mapping["testsuites"]
@@ -443,13 +256,13 @@ class TestSuiteLoader(unittest.TestCase):
def test_load_tests_folder_path(self):
# absolute folder path
path = os.path.join(os.getcwd(), 'tests/data')
tests_mapping = loader.load_tests(path)
tests_mapping = loader.load_cases(path)
testcase_list_1 = tests_mapping["testcases"]
self.assertGreater(len(testcase_list_1), 4)
# relative folder path
path = 'tests/data/'
tests_mapping = loader.load_tests(path)
tests_mapping = loader.load_cases(path)
testcase_list_2 = tests_mapping["testcases"]
self.assertEqual(len(testcase_list_1), len(testcase_list_2))
@@ -457,22 +270,22 @@ class TestSuiteLoader(unittest.TestCase):
# absolute folder path
path = os.path.join(os.getcwd(), 'tests/data_not_exist')
with self.assertRaises(exceptions.FileNotFound):
loader.load_tests(path)
loader.load_cases(path)
# relative folder path
path = 'tests/data_not_exist'
with self.assertRaises(exceptions.FileNotFound):
loader.load_tests(path)
loader.load_cases(path)
def test_load_api_folder(self):
path = os.path.join(os.getcwd(), "tests", "api")
api_definition_mapping = loader.load_api_folder(path)
api_definition_mapping = buildup.load_api_folder(path)
api_file_path = os.path.join(os.getcwd(), "tests", "api", "get_token.yml")
self.assertIn(api_file_path, api_definition_mapping)
self.assertIn("request", api_definition_mapping[api_file_path])
def test_load_project_tests(self):
loader.load_project_tests(os.path.join(os.getcwd(), "tests"))
buildup.load_project_data(os.path.join(os.getcwd(), "tests"))
api_file_path = os.path.join(os.getcwd(), "tests", "api", "get_token.yml")
self.assertIn(api_file_path, self.tests_def_mapping["api"])
self.assertEqual(self.project_mapping["env"]["PROJECT_KEY"], "ABCDEFGH")

View File

@@ -0,0 +1,76 @@
import unittest
from httprunner.loader import check
class TestLoaderCheck(unittest.TestCase):
def test_is_function(self):
func = lambda x: x + 1
self.assertTrue(check.is_function(func))
self.assertTrue(check.is_function(check.is_testcase))
def test_is_testcases(self):
data_structure = "path/to/file"
self.assertFalse(check.is_testcases(data_structure))
data_structure = ["path/to/file1", "path/to/file2"]
self.assertFalse(check.is_testcases(data_structure))
data_structure = {
"project_mapping": {
"PWD": "XXXXX",
"functions": {},
"env": {}
},
"testcases": [
{ # testcase data structure
"config": {
"name": "desc1",
"path": "testcase1_path",
"variables": [], # optional
},
"teststeps": [
# test data structure
{
'name': 'test step desc1',
'variables': [], # optional
'extract': [], # optional
'validate': [],
'request': {}
},
# test_dict2 # another test dict
]
},
# testcase_dict_2 # another testcase dict
]
}
self.assertTrue(check.is_testcases(data_structure))
data_structure = [
{
"name": "desc1",
"config": {},
"api": {},
"testcases": ["testcase11", "testcase12"]
},
{
"name": "desc2",
"config": {},
"api": {},
"testcases": ["testcase21", "testcase22"]
}
]
self.assertTrue(data_structure)
def test_is_variable(self):
var1 = 123
var2 = "abc"
self.assertTrue(check.is_variable(("var1", var1)))
self.assertTrue(check.is_variable(("var2", var2)))
__var = 123
self.assertFalse(check.is_variable(("__var", __var)))
func = lambda x: x + 1
self.assertFalse(check.is_variable(("func", func)))
self.assertFalse(check.is_variable(("unittest", unittest)))

View File

@@ -0,0 +1,159 @@
import os
import unittest
from httprunner import exceptions
from httprunner.loader import load
class TestFileLoader(unittest.TestCase):
def test_load_yaml_file_file_format_error(self):
yaml_tmp_file = "tests/data/tmp.yml"
# create empty yaml file
with open(yaml_tmp_file, 'w') as f:
f.write("")
with self.assertRaises(exceptions.FileFormatError):
load._load_yaml_file(yaml_tmp_file)
os.remove(yaml_tmp_file)
# create invalid format yaml file
with open(yaml_tmp_file, 'w') as f:
f.write("abc")
with self.assertRaises(exceptions.FileFormatError):
load._load_yaml_file(yaml_tmp_file)
os.remove(yaml_tmp_file)
def test_load_json_file_file_format_error(self):
json_tmp_file = "tests/data/tmp.json"
# create empty file
with open(json_tmp_file, 'w') as f:
f.write("")
with self.assertRaises(exceptions.FileFormatError):
load._load_json_file(json_tmp_file)
os.remove(json_tmp_file)
# create empty json file
with open(json_tmp_file, 'w') as f:
f.write("{}")
with self.assertRaises(exceptions.FileFormatError):
load._load_json_file(json_tmp_file)
os.remove(json_tmp_file)
# create invalid format json file
with open(json_tmp_file, 'w') as f:
f.write("abc")
with self.assertRaises(exceptions.FileFormatError):
load._load_json_file(json_tmp_file)
os.remove(json_tmp_file)
def test_load_testcases_bad_filepath(self):
testcase_file_path = os.path.join(os.getcwd(), 'tests/data/demo')
with self.assertRaises(exceptions.FileNotFound):
load.load_file(testcase_file_path)
def test_load_json_testcases(self):
testcase_file_path = os.path.join(
os.getcwd(), 'tests/data/demo_testcase_hardcode.json')
testcases = load.load_file(testcase_file_path)
self.assertEqual(len(testcases), 3)
test = testcases[0]["test"]
self.assertIn('name', test)
self.assertIn('request', test)
self.assertIn('url', test['request'])
self.assertIn('method', test['request'])
def test_load_yaml_testcases(self):
testcase_file_path = os.path.join(
os.getcwd(), 'tests/data/demo_testcase_hardcode.yml')
testcases = load.load_file(testcase_file_path)
self.assertEqual(len(testcases), 3)
test = testcases[0]["test"]
self.assertIn('name', test)
self.assertIn('request', test)
self.assertIn('url', test['request'])
self.assertIn('method', test['request'])
def test_load_csv_file_one_parameter(self):
csv_file_path = os.path.join(
os.getcwd(), 'tests/data/user_agent.csv')
csv_content = load.load_file(csv_file_path)
self.assertEqual(
csv_content,
[
{'user_agent': 'iOS/10.1'},
{'user_agent': 'iOS/10.2'},
{'user_agent': 'iOS/10.3'}
]
)
def test_load_csv_file_multiple_parameters(self):
csv_file_path = os.path.join(
os.getcwd(), 'tests/data/account.csv')
csv_content = load.load_file(csv_file_path)
self.assertEqual(
csv_content,
[
{'username': 'test1', 'password': '111111'},
{'username': 'test2', 'password': '222222'},
{'username': 'test3', 'password': '333333'}
]
)
def test_load_folder_files(self):
folder = os.path.join(os.getcwd(), 'tests')
file1 = os.path.join(os.getcwd(), 'tests', 'test_utils.py')
file2 = os.path.join(os.getcwd(), 'tests', 'api', 'reset_all.yml')
files = load.load_folder_files(folder, recursive=False)
self.assertEqual(files, [])
files = load.load_folder_files(folder)
self.assertIn(file2, files)
self.assertNotIn(file1, files)
files = load.load_folder_files("not_existed_foulder", recursive=False)
self.assertEqual([], files)
files = load.load_folder_files(file2, recursive=False)
self.assertEqual([], files)
def test_load_dot_env_file(self):
dot_env_path = os.path.join(
os.getcwd(), "tests", ".env"
)
env_variables_mapping = load.load_dot_env_file(dot_env_path)
self.assertIn("PROJECT_KEY", env_variables_mapping)
self.assertEqual(env_variables_mapping["UserName"], "debugtalk")
def test_load_custom_dot_env_file(self):
dot_env_path = os.path.join(
os.getcwd(), "tests", "data", "test.env"
)
env_variables_mapping = load.load_dot_env_file(dot_env_path)
self.assertIn("PROJECT_KEY", env_variables_mapping)
self.assertEqual(env_variables_mapping["UserName"], "test")
self.assertEqual(env_variables_mapping["content_type"], "application/json; charset=UTF-8")
def test_load_env_path_not_exist(self):
    """A path without a .env file should produce an empty mapping."""
    missing_env_path = os.path.join(os.getcwd(), "tests", "data")
    self.assertEqual(load.load_dot_env_file(missing_env_path), {})
def test_load_folder_content(self):
    """load_folder_content should map each file path to its parsed content."""
    api_folder = os.path.join(os.getcwd(), "tests", "api")
    content_mapping = load.load_folder_content(api_folder)
    reset_all_path = os.path.join(api_folder, "reset_all.yml")
    self.assertIn(reset_all_path, content_mapping)
    # YAML files parse into dict structures.
    self.assertIsInstance(content_mapping[reset_all_path], dict)

View File

@@ -0,0 +1,40 @@
import os
import unittest
from httprunner import exceptions
from httprunner.loader import locate
class TestLoaderLocate(unittest.TestCase):
    """Tests for httprunner.loader.locate file-search behaviour."""

    def test_locate_file(self):
        """locate_file should find the target by searching from start_path."""
        # Starting at the repo root (or an empty path) raises FileNotFound.
        for bad_start in (os.getcwd(), ""):
            with self.assertRaises(exceptions.FileNotFound):
                locate.locate_file(bad_start, "debugtalk.py")

        # Absolute directory start path.
        self.assertEqual(
            locate.locate_file(
                os.path.join(os.getcwd(), "tests"), "debugtalk.py"),
            os.path.join(os.getcwd(), "tests/debugtalk.py")
        )

        found_path = os.path.join(os.getcwd(), "tests", "debugtalk.py")
        # Relative directory start paths, with or without a trailing slash.
        self.assertEqual(
            locate.locate_file("tests/", "debugtalk.py"), found_path)
        self.assertEqual(
            locate.locate_file("tests", "debugtalk.py"), found_path)
        # File start paths resolve relative to their containing directory.
        self.assertEqual(
            locate.locate_file("tests/base.py", "debugtalk.py"), found_path)
        self.assertEqual(
            locate.locate_file(
                "tests/data/demo_testcase.yml", "debugtalk.py"),
            found_path
        )

View File

@@ -3,6 +3,7 @@ import time
import unittest
from httprunner import exceptions, loader, parser
from httprunner.loader import load
from tests.debugtalk import gen_random_string, sum_two
@@ -806,6 +807,104 @@ class TestParserBasic(unittest.TestCase):
self.assertEqual(parsed_variables["var2"], "abc$123")
self.assertEqual(parsed_variables["var3"], "abc$$num0")
def test_get_uniform_comparator(self):
    """Every comparator alias should normalize to its canonical name."""
    alias_cases = [
        ("eq", "equals"),
        ("==", "equals"),
        ("lt", "less_than"),
        ("le", "less_than_or_equals"),
        ("gt", "greater_than"),
        ("ge", "greater_than_or_equals"),
        ("ne", "not_equals"),
        ("str_eq", "string_equals"),
        ("len_eq", "length_equals"),
        ("count_eq", "length_equals"),
        ("len_gt", "length_greater_than"),
        ("count_gt", "length_greater_than"),
        ("count_greater_than", "length_greater_than"),
        ("len_ge", "length_greater_than_or_equals"),
        ("count_ge", "length_greater_than_or_equals"),
        ("count_greater_than_or_equals", "length_greater_than_or_equals"),
        ("len_lt", "length_less_than"),
        ("count_lt", "length_less_than"),
        ("count_less_than", "length_less_than"),
        ("len_le", "length_less_than_or_equals"),
        ("count_le", "length_less_than_or_equals"),
        ("count_less_than_or_equals", "length_less_than_or_equals"),
    ]
    for alias, canonical in alias_cases:
        self.assertEqual(parser.get_uniform_comparator(alias), canonical)
def test_parse_validator(self):
    """Both validator notations should normalize to the uniform dict form."""
    uniform = {"check": "status_code", "comparator": "equals", "expect": 201}
    # Explicit check/comparator/expect style.
    explicit = {"check": "status_code", "comparator": "eq", "expect": 201}
    self.assertEqual(parser.uniform_validator(explicit), uniform)
    # Shorthand {comparator: [check, expect]} style.
    shorthand = {'eq': ['status_code', 201]}
    self.assertEqual(parser.uniform_validator(shorthand), uniform)
def test_extend_validators(self):
    """Reference validators should extend/override definition validators."""
    def_validators = [
        parser.uniform_validator(v) for v in (
            {'eq': ['v1', 200]},
            {"check": "s2", "expect": 16, "comparator": "len_eq"},
        )
    ]
    ref_validators = [
        parser.uniform_validator(v) for v in (
            {"check": "v1", "expect": 201},
            {'len_eq': ['s3', 12]},
        )
    ]
    extended_validators = parser.extend_validators(
        def_validators, ref_validators)
    # v1 takes the reference value (201); s2 and s3 are both retained.
    for expected in (
        {"check": "v1", "expect": 201, "comparator": "equals"},
        {"check": "s2", "expect": 16, "comparator": "length_equals"},
        {"check": "s3", "expect": 12, "comparator": "length_equals"},
    ):
        self.assertIn(expected, extended_validators)
def test_extend_validators_with_dict(self):
    """Validators whose check target is a dict should still be overridable."""
    def_validators = [
        parser.uniform_validator(v) for v in (
            {'eq': ["a", {"v": 1}]},
            {'eq': [{"b": 1}, 200]},
        )
    ]
    ref_validators = [
        parser.uniform_validator(v) for v in (
            {'len_eq': ['s3', 12]},
            {'eq': [{"b": 1}, 201]},
        )
    ]
    extended_validators = parser.extend_validators(
        def_validators, ref_validators)
    self.assertEqual(len(extended_validators), 3)
    # The dict-keyed validator keeps the overriding expectation (201, not 200).
    self.assertIn(
        {'check': {'b': 1}, 'expect': 201, 'comparator': 'equals'},
        extended_validators)
    self.assertNotIn(
        {'check': {'b': 1}, 'expect': 200, 'comparator': 'equals'},
        extended_validators)
class TestParser(unittest.TestCase):
def test_parse_parameters_raw_list(self):
@@ -833,11 +932,11 @@ class TestParser(unittest.TestCase):
dot_env_path = os.path.join(
os.getcwd(), "tests", ".env"
)
loader.load_dot_env_file(dot_env_path)
load.load_dot_env_file(dot_env_path)
from tests import debugtalk
cartesian_product_parameters = parser.parse_parameters(
parameters,
functions_mapping=loader.load_module_functions(debugtalk)
functions_mapping=load.load_module_functions(debugtalk)
)
self.assertIn(
{
@@ -856,7 +955,7 @@ class TestParser(unittest.TestCase):
)
def test_parse_parameters_parameterize(self):
loader.load_project_tests(os.path.join(os.getcwd(), "tests"))
loader.load_project_data(os.path.join(os.getcwd(), "tests"))
parameters = [
{"app_version": "${parameterize(data/app_version.csv)}"},
{"username-password": "${parameterize(data/account.csv)}"}
@@ -868,8 +967,7 @@ class TestParser(unittest.TestCase):
)
def test_parse_parameters_mix(self):
loader.load_project_tests(os.path.join(os.getcwd(), "tests"))
project_mapping = loader.project_mapping
project_mapping = loader.load_project_data(os.path.join(os.getcwd(), "tests"))
parameters = [
{"user_agent": ["iOS/10.1", "iOS/10.2", "iOS/10.3"]},
@@ -886,7 +984,7 @@ class TestParser(unittest.TestCase):
def test_parse_tests_testcase(self):
testcase_file_path = os.path.join(
os.getcwd(), 'tests/data/demo_testcase.yml')
tests_mapping = loader.load_tests(testcase_file_path)
tests_mapping = loader.load_cases(testcase_file_path)
testcases = tests_mapping["testcases"]
self.assertEqual(
testcases[0]["config"]["variables"]["var_c"],
@@ -1272,13 +1370,13 @@ class TestParser(unittest.TestCase):
parser.eval_lazy_data(content)
def test_extend_with_api(self):
loader.load_project_tests(os.path.join(os.getcwd(), "tests"))
loader.load_project_data(os.path.join(os.getcwd(), "tests"))
raw_testinfo = {
"name": "get token",
"base_url": "https://github.com",
"api": "api/get_token.yml",
}
api_def_dict = loader.load_teststep(raw_testinfo)
api_def_dict = loader.buildup.load_teststep(raw_testinfo)
test_block = {
"name": "override block",
"times": 3,

View File

@@ -1,6 +1,6 @@
import requests
from httprunner import built_in, exceptions, loader, response
from httprunner import exceptions, response
from httprunner.compat import basestring, bytes
from tests.api_server import HTTPBIN_SERVER
from tests.base import ApiServerUnittest
@@ -8,9 +8,6 @@ from tests.base import ApiServerUnittest
class TestResponse(ApiServerUnittest):
def setUp(self):
self.functions_mapping = loader.load_module_functions(built_in)
def test_parse_response_object_json(self):
url = "http://127.0.0.1:5000/api/users"
resp = requests.get(url)

View File

@@ -9,8 +9,7 @@ from tests.base import ApiServerUnittest
class TestRunner(ApiServerUnittest):
def setUp(self):
loader.load_project_tests(os.path.join(os.getcwd(), "tests"))
project_mapping = loader.project_mapping
project_mapping = loader.load_project_data(os.path.join(os.getcwd(), "tests"))
self.debugtalk_functions = project_mapping["functions"]
config = {
@@ -35,7 +34,7 @@ class TestRunner(ApiServerUnittest):
]
for testcase_file_path in testcase_file_path_list:
tests_mapping = loader.load_tests(testcase_file_path)
tests_mapping = loader.load_cases(testcase_file_path)
parsed_testcases = parser.parse_tests(tests_mapping)
parsed_testcase = parsed_testcases[0]
test_runner = runner.Runner(parsed_testcase["config"])
@@ -289,7 +288,7 @@ class TestRunner(ApiServerUnittest):
def test_bugfix_type_match(self):
testcase_file_path = os.path.join(
os.getcwd(), 'tests/data/bugfix_type_match.yml')
tests_mapping = loader.load_tests(testcase_file_path)
tests_mapping = loader.load_cases(testcase_file_path)
parsed_testcases = parser.parse_tests(tests_mapping)
parsed_testcase = parsed_testcases[0]
test_runner = runner.Runner(parsed_testcase["config"])

View File

@@ -62,8 +62,8 @@ class TestUtils(ApiServerUnittest):
self.assertEqual(result, "L")
def current_validators(self):
from httprunner import built_in
functions_mapping = loader.load_module_functions(built_in)
from httprunner.builtin import comparators
functions_mapping = loader.load.load_module_functions(comparators)
functions_mapping["equals"](None, None)
functions_mapping["equals"](1, 1)
@@ -108,15 +108,6 @@ class TestUtils(ApiServerUnittest):
functions_mapping["type_match"]({}, "dict")
functions_mapping["type_match"]({"a": 1}, "dict")
def test_deep_update_dict(self):
    """deep_update_dict should merge nested dicts without None clobbering."""
    base = {'a': 1, 'b': {'c': 3, 'd': 4}, 'f': 6, 'h': 123}
    override = {'a': 2, 'b': {'c': 33, 'e': 5}, 'g': 7, 'h': None}
    merged = utils.deep_update_dict(base, override)
    # 'h' keeps its original value: a None override does not replace it.
    self.assertEqual(
        merged,
        {'a': 2, 'b': {'c': 33, 'd': 4, 'e': 5}, 'f': 6, 'g': 7, 'h': 123}
    )
def test_handle_config_key_case(self):
origin_dict = {
"Name": "test",

View File

@@ -4,170 +4,4 @@ from httprunner import validator
class TestValidator(unittest.TestCase):
def test_is_testcases(self):
    """is_testcases should accept only the full tests_mapping structure."""
    # Plain paths, single or listed, are not testcases structures.
    self.assertFalse(validator.is_testcases("path/to/file"))
    self.assertFalse(
        validator.is_testcases(["path/to/file1", "path/to/file2"]))

    teststep = {
        'name': 'test step desc1',
        'variables': [],  # optional
        'extract': [],    # optional
        'validate': [],
        'request': {}
    }
    testcase = {
        "config": {
            "name": "desc1",
            "path": "testcase1_path",
            "variables": [],  # optional
        },
        "teststeps": [teststep]
    }
    tests_mapping = {
        "project_mapping": {
            "PWD": "XXXXX",
            "functions": {},
            "env": {}
        },
        "testcases": [testcase]
    }
    self.assertTrue(validator.is_testcases(tests_mapping))

    testsuite_list = [
        {
            "name": "desc1",
            "config": {},
            "api": {},
            "testcases": ["testcase11", "testcase12"]
        },
        {
            "name": "desc2",
            "config": {},
            "api": {},
            "testcases": ["testcase21", "testcase22"]
        }
    ]
    # NOTE(review): this only asserts the list is truthy; it probably meant
    # validator.is_testcases(testsuite_list) — confirm before changing.
    self.assertTrue(testsuite_list)
def test_is_variable(self):
    """is_variable should accept plain values and reject private/callable/module names."""
    self.assertTrue(validator.is_variable(("var1", 123)))
    self.assertTrue(validator.is_variable(("var2", "abc")))
    # Names starting with a double underscore are treated as non-variables.
    self.assertFalse(validator.is_variable(("__var", 123)))
    # Callables and modules are not variables either.
    self.assertFalse(validator.is_variable(("func", lambda x: x + 1)))
    self.assertFalse(validator.is_variable(("unittest", unittest)))
def test_is_function(self):
    """Both lambdas and regular functions should count as functions."""
    self.assertTrue(validator.is_function(lambda x: x + 1))
    self.assertTrue(validator.is_function(validator.is_testcase))
def test_get_uniform_comparator(self):
    """Each comparator alias should map onto its canonical comparator name."""
    expected_mapping = [
        ("eq", "equals"),
        ("==", "equals"),
        ("lt", "less_than"),
        ("le", "less_than_or_equals"),
        ("gt", "greater_than"),
        ("ge", "greater_than_or_equals"),
        ("ne", "not_equals"),
        ("str_eq", "string_equals"),
        ("len_eq", "length_equals"),
        ("count_eq", "length_equals"),
        ("len_gt", "length_greater_than"),
        ("count_gt", "length_greater_than"),
        ("count_greater_than", "length_greater_than"),
        ("len_ge", "length_greater_than_or_equals"),
        ("count_ge", "length_greater_than_or_equals"),
        ("count_greater_than_or_equals", "length_greater_than_or_equals"),
        ("len_lt", "length_less_than"),
        ("count_lt", "length_less_than"),
        ("count_less_than", "length_less_than"),
        ("len_le", "length_less_than_or_equals"),
        ("count_le", "length_less_than_or_equals"),
        ("count_less_than_or_equals", "length_less_than_or_equals"),
    ]
    for alias, canonical in expected_mapping:
        self.assertEqual(validator.get_uniform_comparator(alias), canonical)
def test_parse_validator(self):
    """Both validator notations should normalize to the uniform dict form."""
    uniform = {"check": "status_code", "comparator": "equals", "expect": 201}
    # Explicit check/comparator/expect style.
    self.assertEqual(
        validator.uniform_validator(
            {"check": "status_code", "comparator": "eq", "expect": 201}),
        uniform
    )
    # Shorthand {comparator: [check, expect]} style.
    self.assertEqual(
        validator.uniform_validator({'eq': ['status_code', 201]}),
        uniform
    )
def test_extend_validators(self):
    """Reference validators should extend/override definition validators."""
    def_validators = [
        validator.uniform_validator(v) for v in (
            {'eq': ['v1', 200]},
            {"check": "s2", "expect": 16, "comparator": "len_eq"},
        )
    ]
    ref_validators = [
        validator.uniform_validator(v) for v in (
            {"check": "v1", "expect": 201},
            {'len_eq': ['s3', 12]},
        )
    ]
    extended_validators = validator.extend_validators(
        def_validators, ref_validators)
    # v1 takes the reference value (201); s2 and s3 are both retained.
    for expected in (
        {"check": "v1", "expect": 201, "comparator": "equals"},
        {"check": "s2", "expect": 16, "comparator": "length_equals"},
        {"check": "s3", "expect": 12, "comparator": "length_equals"},
    ):
        self.assertIn(expected, extended_validators)
def test_extend_validators_with_dict(self):
    """Validators whose check target is a dict should still be overridable."""
    def_validators = [
        validator.uniform_validator(v) for v in (
            {'eq': ["a", {"v": 1}]},
            {'eq': [{"b": 1}, 200]},
        )
    ]
    ref_validators = [
        validator.uniform_validator(v) for v in (
            {'len_eq': ['s3', 12]},
            {'eq': [{"b": 1}, 201]},
        )
    ]
    extended_validators = validator.extend_validators(
        def_validators, ref_validators)
    self.assertEqual(len(extended_validators), 3)
    # The dict-keyed validator keeps the overriding expectation (201, not 200).
    self.assertIn(
        {'check': {'b': 1}, 'expect': 201, 'comparator': 'equals'},
        extended_validators)
    self.assertNotIn(
        {'check': {'b': 1}, 'expect': 200, 'comparator': 'equals'},
        extended_validators)
pass