Merge pull request #781 from httprunner/dev

2.4.0
This commit is contained in:
debugtalk
2019-12-11 16:15:16 +08:00
committed by GitHub
35 changed files with 1677 additions and 1530 deletions

View File

@@ -1,5 +1,19 @@
# Release History # Release History
## 2.4.0 (2019-12-11)
**Added**
- feat: validate with python script, ref #773
**Changed**
- refactor: make loader as submodule, split to check/locate/load/buildup
- refactor: make built_in as submodule, split to comparators and functions
- refactor: adjust code for context and validator
- docs: update cli argument help
- adjust format code, remove unused import
## 2.3.3 (2019-12-04) ## 2.3.3 (2019-12-04)
**Fixed** **Fixed**

View File

@@ -1,4 +1,4 @@
__version__ = "2.3.3" __version__ = "2.4.0"
__description__ = "One-stop solution for HTTP(S) testing." __description__ = "One-stop solution for HTTP(S) testing."
__all__ = ["__version__", "__description__"] __all__ = ["__version__", "__description__"]

View File

@@ -2,7 +2,7 @@ import os
import unittest import unittest
from httprunner import (__version__, exceptions, loader, logger, parser, from httprunner import (__version__, exceptions, loader, logger, parser,
report, runner, utils, validator) report, runner, utils)
class HttpRunner(object): class HttpRunner(object):
@@ -257,7 +257,7 @@ class HttpRunner(object):
""" """
# load tests # load tests
self.exception_stage = "load tests" self.exception_stage = "load tests"
tests_mapping = loader.load_tests(path, dot_env_path) tests_mapping = loader.load_cases(path, dot_env_path)
if mapping: if mapping:
tests_mapping["project_mapping"]["variables"] = mapping tests_mapping["project_mapping"]["variables"] = mapping
@@ -277,9 +277,9 @@ class HttpRunner(object):
""" """
logger.log_info("HttpRunner version: {}".format(__version__)) logger.log_info("HttpRunner version: {}".format(__version__))
if validator.is_testcase_path(path_or_tests): if loader.is_testcase_path(path_or_tests):
return self.run_path(path_or_tests, dot_env_path, mapping) return self.run_path(path_or_tests, dot_env_path, mapping)
elif validator.is_testcases(path_or_tests): elif loader.is_testcases(path_or_tests):
return self.run_tests(path_or_tests) return self.run_tests(path_or_tests)
else: else:
raise exceptions.ParamsError("Invalid testcase path or testcases: {}".format(path_or_tests)) raise exceptions.ParamsError("Invalid testcase path or testcases: {}".format(path_or_tests))

View File

@@ -1,206 +0,0 @@
# encoding: utf-8
"""
Built-in dependent functions used in YAML/JSON testcases.
"""
import datetime
import os
import random
import re
import string
import time
import filetype
from requests_toolbelt import MultipartEncoder
from httprunner.compat import basestring, builtin_str, integer_types
from httprunner.exceptions import ParamsError
PWD = os.getcwd()
###############################################################################
## built-in functions
###############################################################################
def gen_random_string(str_len):
""" generate random string with specified length
"""
return ''.join(
random.choice(string.ascii_letters + string.digits) for _ in range(str_len))
def get_timestamp(str_len=13):
""" get timestamp string, length can only between 0 and 16
"""
if isinstance(str_len, integer_types) and 0 < str_len < 17:
return builtin_str(time.time()).replace(".", "")[:str_len]
raise ParamsError("timestamp length can only between 0 and 16.")
def get_current_date(fmt="%Y-%m-%d"):
""" get current date, default format is %Y-%m-%d
"""
return datetime.datetime.now().strftime(fmt)
def sleep(n_secs):
""" sleep n seconds
"""
time.sleep(n_secs)
###############################################################################
## upload files with requests-toolbelt
# e.g.
# - test:
# name: upload file
# variables:
# file_path: "data/test.env"
# multipart_encoder: ${multipart_encoder(file=$file_path)}
# request:
# url: /post
# method: POST
# headers:
# Content-Type: ${multipart_content_type($multipart_encoder)}
# data: $multipart_encoder
# validate:
# - eq: ["status_code", 200]
# - startswith: ["content.files.file", "UserName=test"]
###############################################################################
def multipart_encoder(**kwargs):
""" initialize MultipartEncoder with uploading fields.
"""
def get_filetype(file_path):
file_type = filetype.guess(file_path)
if file_type:
return file_type.mime
else:
return "text/html"
fields_dict = {}
for key, value in kwargs.items():
if os.path.isabs(value):
_file_path = value
is_file = True
else:
global PWD
_file_path = os.path.join(PWD, value)
is_file = os.path.isfile(_file_path)
if is_file:
filename = os.path.basename(_file_path)
with open(_file_path, 'rb') as f:
mime_type = get_filetype(_file_path)
fields_dict[key] = (filename, f.read(), mime_type)
else:
fields_dict[key] = value
return MultipartEncoder(fields=fields_dict)
def multipart_content_type(multipart_encoder):
""" prepare Content-Type for request headers
"""
return multipart_encoder.content_type
###############################################################################
## built-in comparators
###############################################################################
def equals(check_value, expect_value):
assert check_value == expect_value
def less_than(check_value, expect_value):
assert check_value < expect_value
def less_than_or_equals(check_value, expect_value):
assert check_value <= expect_value
def greater_than(check_value, expect_value):
assert check_value > expect_value
def greater_than_or_equals(check_value, expect_value):
assert check_value >= expect_value
def not_equals(check_value, expect_value):
assert check_value != expect_value
def string_equals(check_value, expect_value):
assert builtin_str(check_value) == builtin_str(expect_value)
def length_equals(check_value, expect_value):
assert isinstance(expect_value, integer_types)
assert len(check_value) == expect_value
def length_greater_than(check_value, expect_value):
assert isinstance(expect_value, integer_types)
assert len(check_value) > expect_value
def length_greater_than_or_equals(check_value, expect_value):
assert isinstance(expect_value, integer_types)
assert len(check_value) >= expect_value
def length_less_than(check_value, expect_value):
assert isinstance(expect_value, integer_types)
assert len(check_value) < expect_value
def length_less_than_or_equals(check_value, expect_value):
assert isinstance(expect_value, integer_types)
assert len(check_value) <= expect_value
def contains(check_value, expect_value):
assert isinstance(check_value, (list, tuple, dict, basestring))
assert expect_value in check_value
def contained_by(check_value, expect_value):
assert isinstance(expect_value, (list, tuple, dict, basestring))
assert check_value in expect_value
def type_match(check_value, expect_value):
def get_type(name):
if isinstance(name, type):
return name
elif isinstance(name, basestring):
try:
return __builtins__[name]
except KeyError:
raise ValueError(name)
else:
raise ValueError(name)
assert isinstance(check_value, get_type(expect_value))
def regex_match(check_value, expect_value):
assert isinstance(expect_value, basestring)
assert isinstance(check_value, basestring)
assert re.match(expect_value, check_value)
def startswith(check_value, expect_value):
assert builtin_str(check_value).startswith(builtin_str(expect_value))
def endswith(check_value, expect_value):
assert builtin_str(check_value).endswith(builtin_str(expect_value))

View File

@@ -0,0 +1,2 @@
from httprunner.builtin.comparators import *
from httprunner.builtin.functions import *

View File

@@ -0,0 +1,99 @@
"""
Built-in validate comparators.
"""
import re
from httprunner.compat import basestring, builtin_str, integer_types
def equals(check_value, expect_value):
assert check_value == expect_value
def less_than(check_value, expect_value):
assert check_value < expect_value
def less_than_or_equals(check_value, expect_value):
assert check_value <= expect_value
def greater_than(check_value, expect_value):
assert check_value > expect_value
def greater_than_or_equals(check_value, expect_value):
assert check_value >= expect_value
def not_equals(check_value, expect_value):
assert check_value != expect_value
def string_equals(check_value, expect_value):
assert builtin_str(check_value) == builtin_str(expect_value)
def length_equals(check_value, expect_value):
assert isinstance(expect_value, integer_types)
assert len(check_value) == expect_value
def length_greater_than(check_value, expect_value):
assert isinstance(expect_value, integer_types)
assert len(check_value) > expect_value
def length_greater_than_or_equals(check_value, expect_value):
assert isinstance(expect_value, integer_types)
assert len(check_value) >= expect_value
def length_less_than(check_value, expect_value):
assert isinstance(expect_value, integer_types)
assert len(check_value) < expect_value
def length_less_than_or_equals(check_value, expect_value):
assert isinstance(expect_value, integer_types)
assert len(check_value) <= expect_value
def contains(check_value, expect_value):
assert isinstance(check_value, (list, tuple, dict, basestring))
assert expect_value in check_value
def contained_by(check_value, expect_value):
assert isinstance(expect_value, (list, tuple, dict, basestring))
assert check_value in expect_value
def type_match(check_value, expect_value):
def get_type(name):
if isinstance(name, type):
return name
elif isinstance(name, basestring):
try:
return __builtins__[name]
except KeyError:
raise ValueError(name)
else:
raise ValueError(name)
assert isinstance(check_value, get_type(expect_value))
def regex_match(check_value, expect_value):
assert isinstance(expect_value, basestring)
assert isinstance(check_value, basestring)
assert re.match(expect_value, check_value)
def startswith(check_value, expect_value):
assert builtin_str(check_value).startswith(builtin_str(expect_value))
def endswith(check_value, expect_value):
assert builtin_str(check_value).endswith(builtin_str(expect_value))

View File

@@ -0,0 +1,105 @@
"""
Built-in functions used in YAML/JSON testcases.
"""
import datetime
import os
import random
import string
import time
import filetype
from requests_toolbelt import MultipartEncoder
from httprunner.compat import builtin_str, integer_types
from httprunner.exceptions import ParamsError
PWD = os.getcwd()
def gen_random_string(str_len):
""" generate random string with specified length
"""
return ''.join(
random.choice(string.ascii_letters + string.digits) for _ in range(str_len))
def get_timestamp(str_len=13):
""" get timestamp string, length can only between 0 and 16
"""
if isinstance(str_len, integer_types) and 0 < str_len < 17:
return builtin_str(time.time()).replace(".", "")[:str_len]
raise ParamsError("timestamp length can only between 0 and 16.")
def get_current_date(fmt="%Y-%m-%d"):
""" get current date, default format is %Y-%m-%d
"""
return datetime.datetime.now().strftime(fmt)
def sleep(n_secs):
""" sleep n seconds
"""
time.sleep(n_secs)
"""
upload files with requests-toolbelt
e.g.
- test:
name: upload file
variables:
file_path: "data/test.env"
multipart_encoder: ${multipart_encoder(file=$file_path)}
request:
url: /post
method: POST
headers:
Content-Type: ${multipart_content_type($multipart_encoder)}
data: $multipart_encoder
validate:
- eq: ["status_code", 200]
- startswith: ["content.files.file", "UserName=test"]
"""
def multipart_encoder(**kwargs):
""" initialize MultipartEncoder with uploading fields.
"""
def get_filetype(file_path):
file_type = filetype.guess(file_path)
if file_type:
return file_type.mime
else:
return "text/html"
fields_dict = {}
for key, value in kwargs.items():
if os.path.isabs(value):
_file_path = value
is_file = True
else:
global PWD
_file_path = os.path.join(PWD, value)
is_file = os.path.isfile(_file_path)
if is_file:
filename = os.path.basename(_file_path)
with open(_file_path, 'rb') as f:
mime_type = get_filetype(_file_path)
fields_dict[key] = (filename, f.read(), mime_type)
else:
fields_dict[key] = value
return MultipartEncoder(fields=fields_dict)
def multipart_content_type(multipart_encoder):
""" prepare Content-Type for request headers
"""
return multipart_encoder.content_type

View File

@@ -5,11 +5,11 @@ import sys
from httprunner import __description__, __version__ from httprunner import __description__, __version__
from httprunner.api import HttpRunner from httprunner.api import HttpRunner
from httprunner.compat import is_py2 from httprunner.compat import is_py2
from httprunner.loader import validate_json_file
from httprunner.logger import color_print from httprunner.logger import color_print
from httprunner.report import gen_html_report from httprunner.report import gen_html_report
from httprunner.utils import (create_scaffold, get_python2_retire_msg, from httprunner.utils import (create_scaffold, get_python2_retire_msg,
prettify_json_file) prettify_json_file)
from httprunner.validator import validate_json_file
def main(): def main():
@@ -23,8 +23,8 @@ def main():
'-V', '--version', dest='version', action='store_true', '-V', '--version', dest='version', action='store_true',
help="show version") help="show version")
parser.add_argument( parser.add_argument(
'testcase_paths', nargs='*', 'testfile_paths', nargs='*',
help="testcase file path") help="Specify api/testcase/testsuite file paths to run.")
parser.add_argument( parser.add_argument(
'--log-level', default='INFO', '--log-level', default='INFO',
help="Specify logging level, default is INFO.") help="Specify logging level, default is INFO.")
@@ -36,19 +36,19 @@ def main():
help="Specify .env file path, which is useful for keeping sensitive data.") help="Specify .env file path, which is useful for keeping sensitive data.")
parser.add_argument( parser.add_argument(
'--report-template', '--report-template',
help="specify report template path.") help="Specify report template path.")
parser.add_argument( parser.add_argument(
'--report-dir', '--report-dir',
help="specify report save directory.") help="Specify report save directory.")
parser.add_argument( parser.add_argument(
'--report-file', '--report-file',
help="specify report file path, this has higher priority than specifying report dir.") help="Specify report file path, this has higher priority than specifying report dir.")
parser.add_argument(
'--save-tests', action='store_true', default=False,
help="Save loaded/parsed/summary json data to JSON files.")
parser.add_argument( parser.add_argument(
'--failfast', action='store_true', default=False, '--failfast', action='store_true', default=False,
help="Stop the test run on the first error or failure.") help="Stop the test run on the first error or failure.")
parser.add_argument(
'--save-tests', action='store_true', default=False,
help="Save loaded tests and parsed tests to JSON file.")
parser.add_argument( parser.add_argument(
'--startproject', '--startproject',
help="Specify new project name.") help="Specify new project name.")
@@ -96,9 +96,9 @@ def main():
report_dir = args.report_dir or os.path.join(runner.project_working_directory, "reports") report_dir = args.report_dir or os.path.join(runner.project_working_directory, "reports")
gen_html_report( gen_html_report(
summary, summary,
args.report_template, report_template=args.report_template,
report_dir, report_dir=report_dir,
args.report_file report_file=args.report_file
) )
err_code |= (0 if summary and summary["success"] else 1) err_code |= (0 if summary and summary["success"] else 1)
except Exception: except Exception:

View File

@@ -1,4 +1,4 @@
from httprunner import exceptions, logger, parser, utils from httprunner import parser, utils
class SessionContext(object): class SessionContext(object):
@@ -13,11 +13,12 @@ class SessionContext(object):
>>> context.update_session_variables(variables) >>> context.update_session_variables(variables)
""" """
def __init__(self, variables=None): def __init__(self, variables=None):
variables_mapping = utils.ensure_mapping_format(variables or {}) variables_mapping = utils.ensure_mapping_format(variables or {})
self.session_variables_mapping = parser.parse_variables_mapping(variables_mapping) self.session_variables_mapping = parser.parse_variables_mapping(variables_mapping)
self.test_variables_mapping = {}
self.init_test_variables() self.init_test_variables()
self.validation_results = []
def init_test_variables(self, variables_mapping=None): def init_test_variables(self, variables_mapping=None):
""" init test variables, called when each test(api) starts. """ init test variables, called when each test(api) starts.
@@ -61,110 +62,3 @@ class SessionContext(object):
content may be in any data structure, include dict, list, tuple, number, string, etc. content may be in any data structure, include dict, list, tuple, number, string, etc.
""" """
return parser.parse_lazy_data(content, self.test_variables_mapping) return parser.parse_lazy_data(content, self.test_variables_mapping)
def __eval_validator_check(self, check_item, resp_obj):
""" evaluate check item in validator.
Args:
check_item: check_item should only be the following 5 formats:
1, variable reference, e.g. $token
2, function reference, e.g. ${is_status_code_200($status_code)}
3, dict or list, maybe containing variable/function reference, e.g. {"var": "$abc"}
4, string joined by delimiter. e.g. "status_code", "headers.content-type"
5, regex string, e.g. "LB[\d]*(.*)RB[\d]*"
resp_obj: response object
"""
if isinstance(check_item, (dict, list)) \
or isinstance(check_item, parser.LazyString):
# format 1/2/3
check_value = self.eval_content(check_item)
else:
# format 4/5
check_value = resp_obj.extract_field(check_item)
return check_value
def __eval_validator_expect(self, expect_item):
""" evaluate expect item in validator.
Args:
expect_item: expect_item should only be in 2 types:
1, variable reference, e.g. $expect_status_code
2, actual value, e.g. 200
"""
expect_value = self.eval_content(expect_item)
return expect_value
def validate(self, validators, resp_obj):
""" make validation with comparators
"""
self.validation_results = []
if not validators:
return
logger.log_debug("start to validate.")
validate_pass = True
failures = []
for validator in validators:
# validator should be LazyFunction object
if not isinstance(validator, parser.LazyFunction):
raise exceptions.ValidationFailure(
"validator should be parsed first: {}".format(validators))
# evaluate validator args with context variable mapping.
validator_args = validator.get_args()
check_item, expect_item = validator_args
check_value = self.__eval_validator_check(
check_item,
resp_obj
)
expect_value = self.__eval_validator_expect(expect_item)
validator.update_args([check_value, expect_value])
comparator = validator.func_name
validator_dict = {
"comparator": comparator,
"check": check_item,
"check_value": check_value,
"expect": expect_item,
"expect_value": expect_value
}
validate_msg = "\nvalidate: {} {} {}({})".format(
check_item,
comparator,
expect_value,
type(expect_value).__name__
)
try:
validator.to_value(self.test_variables_mapping)
validator_dict["check_result"] = "pass"
validate_msg += "\t==> pass"
logger.log_debug(validate_msg)
except (AssertionError, TypeError):
validate_pass = False
validator_dict["check_result"] = "fail"
validate_msg += "\t==> fail"
validate_msg += "\n{}({}) {} {}({})".format(
check_value,
type(check_value).__name__,
comparator,
expect_value,
type(expect_value).__name__
)
logger.log_error(validate_msg)
failures.append(validate_msg)
self.validation_results.append(validator_dict)
# restore validator args, in case of running multiple times
validator.update_args(validator_args)
if not validate_pass:
failures_string = "\n".join([failure for failure in failures])
raise exceptions.ValidationFailure(failures_string)

View File

@@ -0,0 +1,23 @@
"""
HttpRunner loader
- check: validate testcase data structure with JSON schema (TODO)
- locate: locate debugtalk.py, make it's dir as project root path
- load: load testcase files and relevant data, including debugtalk.py, .env, yaml/json api/testcases, csv, etc.
- buildup: assemble loaded content to httprunner testcase/testsuite data structure
"""
from httprunner.loader.check import is_testcase_path, is_testcases, validate_json_file
from httprunner.loader.load import load_csv_file, load_builtin_functions
from httprunner.loader.buildup import load_cases, load_project_data
__all__ = [
"is_testcase_path",
"is_testcases",
"validate_json_file",
"load_csv_file",
"load_builtin_functions",
"load_project_data",
"load_cases"
]

View File

@@ -1,277 +1,16 @@
import csv
import importlib import importlib
import io
import json
import os import os
import sys
import yaml from httprunner import exceptions, logger, utils
from httprunner.builtin import functions
from httprunner.loader.load import load_module_functions, load_folder_content, load_file, load_dot_env_file, \
load_folder_files
from httprunner.loader.locate import init_project_working_directory, get_project_working_directory
from httprunner import built_in, exceptions, logger, utils, validator tests_def_mapping = {
"api": {},
try: "testcases": {}
# PyYAML version >= 5.1 }
# ref: https://github.com/yaml/pyyaml/wiki/PyYAML-yaml.load(input)-Deprecation
yaml.warnings({'YAMLLoadWarning': False})
except AttributeError:
pass
###############################################################################
# file loader
###############################################################################
def _check_format(file_path, content):
""" check testcase format if valid
"""
# TODO: replace with JSON schema validation
if not content:
# testcase file content is empty
err_msg = u"Testcase file content is empty: {}".format(file_path)
logger.log_error(err_msg)
raise exceptions.FileFormatError(err_msg)
elif not isinstance(content, (list, dict)):
# testcase file content does not match testcase format
err_msg = u"Testcase file content format invalid: {}".format(file_path)
logger.log_error(err_msg)
raise exceptions.FileFormatError(err_msg)
def load_yaml_file(yaml_file):
""" load yaml file and check file content format
"""
with io.open(yaml_file, 'r', encoding='utf-8') as stream:
yaml_content = yaml.load(stream)
_check_format(yaml_file, yaml_content)
return yaml_content
def load_json_file(json_file):
""" load json file and check file content format
"""
with io.open(json_file, encoding='utf-8') as data_file:
try:
json_content = json.load(data_file)
except exceptions.JSONDecodeError:
err_msg = u"JSONDecodeError: JSON file format error: {}".format(json_file)
logger.log_error(err_msg)
raise exceptions.FileFormatError(err_msg)
_check_format(json_file, json_content)
return json_content
def load_csv_file(csv_file):
""" load csv file and check file content format
Args:
csv_file (str): csv file path, csv file content is like below:
Returns:
list: list of parameters, each parameter is in dict format
Examples:
>>> cat csv_file
username,password
test1,111111
test2,222222
test3,333333
>>> load_csv_file(csv_file)
[
{'username': 'test1', 'password': '111111'},
{'username': 'test2', 'password': '222222'},
{'username': 'test3', 'password': '333333'}
]
"""
if not os.path.isabs(csv_file):
project_working_directory = tests_def_mapping["PWD"] or os.getcwd()
# make compatible with Windows/Linux
csv_file = os.path.join(project_working_directory, *csv_file.split("/"))
if not os.path.isfile(csv_file):
# file path not exist
raise exceptions.CSVNotFound(csv_file)
csv_content_list = []
with io.open(csv_file, encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
csv_content_list.append(row)
return csv_content_list
def load_file(file_path):
if not os.path.isfile(file_path):
raise exceptions.FileNotFound("{} does not exist.".format(file_path))
file_suffix = os.path.splitext(file_path)[1].lower()
if file_suffix == '.json':
return load_json_file(file_path)
elif file_suffix in ['.yaml', '.yml']:
return load_yaml_file(file_path)
elif file_suffix == ".csv":
return load_csv_file(file_path)
else:
# '' or other suffix
err_msg = u"Unsupported file format: {}".format(file_path)
logger.log_warning(err_msg)
return []
def load_folder_files(folder_path, recursive=True):
""" load folder path, return all files endswith yml/yaml/json in list.
Args:
folder_path (str): specified folder path to load
recursive (bool): load files recursively if True
Returns:
list: files endswith yml/yaml/json
"""
if isinstance(folder_path, (list, set)):
files = []
for path in set(folder_path):
files.extend(load_folder_files(path, recursive))
return files
if not os.path.exists(folder_path):
return []
file_list = []
for dirpath, dirnames, filenames in os.walk(folder_path):
filenames_list = []
for filename in filenames:
if not filename.endswith(('.yml', '.yaml', '.json')):
continue
filenames_list.append(filename)
for filename in filenames_list:
file_path = os.path.join(dirpath, filename)
file_list.append(file_path)
if not recursive:
break
return file_list
def load_dot_env_file(dot_env_path):
""" load .env file.
Args:
dot_env_path (str): .env file path
Returns:
dict: environment variables mapping
{
"UserName": "debugtalk",
"Password": "123456",
"PROJECT_KEY": "ABCDEFGH"
}
Raises:
exceptions.FileFormatError: If .env file format is invalid.
"""
if not os.path.isfile(dot_env_path):
return {}
logger.log_info("Loading environment variables from {}".format(dot_env_path))
env_variables_mapping = {}
with io.open(dot_env_path, 'r', encoding='utf-8') as fp:
for line in fp:
# maxsplit=1
if "=" in line:
variable, value = line.split("=", 1)
elif ":" in line:
variable, value = line.split(":", 1)
else:
raise exceptions.FileFormatError(".env format error")
env_variables_mapping[variable.strip()] = value.strip()
utils.set_os_environ(env_variables_mapping)
return env_variables_mapping
def locate_file(start_path, file_name):
""" locate filename and return absolute file path.
searching will be recursive upward until current working directory.
Args:
start_path (str): start locating path, maybe file path or directory path
Returns:
str: located file path. None if file not found.
Raises:
exceptions.FileNotFound: If failed to locate file.
"""
if os.path.isfile(start_path):
start_dir_path = os.path.dirname(start_path)
elif os.path.isdir(start_path):
start_dir_path = start_path
else:
raise exceptions.FileNotFound("invalid path: {}".format(start_path))
file_path = os.path.join(start_dir_path, file_name)
if os.path.isfile(file_path):
return os.path.abspath(file_path)
# current working directory
if os.path.abspath(start_dir_path) in [os.getcwd(), os.path.abspath(os.sep)]:
raise exceptions.FileNotFound("{} not found in {}".format(file_name, start_path))
# locate recursive upward
return locate_file(os.path.dirname(start_dir_path), file_name)
###############################################################################
# debugtalk.py module loader
###############################################################################
def load_module_functions(module):
""" load python module functions.
Args:
module: python module
Returns:
dict: functions mapping for specified python module
{
"func1_name": func1,
"func2_name": func2
}
"""
module_functions = {}
for name, item in vars(module).items():
if validator.is_function(item):
module_functions[name] = item
return module_functions
def load_builtin_functions():
""" load built_in module functions
"""
return load_module_functions(built_in)
def load_debugtalk_functions(): def load_debugtalk_functions():
@@ -291,19 +30,6 @@ def load_debugtalk_functions():
return load_module_functions(imported_module) return load_module_functions(imported_module)
###############################################################################
# testcase loader
###############################################################################
project_mapping = {}
tests_def_mapping = {
"PWD": None,
"api": {},
"testcases": {}
}
def __extend_with_api_ref(raw_testinfo): def __extend_with_api_ref(raw_testinfo):
""" extend with api reference """ extend with api reference
@@ -318,7 +44,8 @@ def __extend_with_api_ref(raw_testinfo):
# 2, api sets file: one file contains a list of api definitions # 2, api sets file: one file contains a list of api definitions
if not os.path.isabs(api_name): if not os.path.isabs(api_name):
# make compatible with Windows/Linux # make compatible with Windows/Linux
api_path = os.path.join(tests_def_mapping["PWD"], *api_name.split("/")) pwd = get_project_working_directory()
api_path = os.path.join(pwd, *api_name.split("/"))
if os.path.isfile(api_path): if os.path.isfile(api_path):
# type 1: api is defined in individual file # type 1: api is defined in individual file
api_name = api_path api_name = api_path
@@ -338,8 +65,9 @@ def __extend_with_testcase_ref(raw_testinfo):
if testcase_path not in tests_def_mapping["testcases"]: if testcase_path not in tests_def_mapping["testcases"]:
# make compatible with Windows/Linux # make compatible with Windows/Linux
pwd = get_project_working_directory()
testcase_path = os.path.join( testcase_path = os.path.join(
project_mapping["PWD"], pwd,
*testcase_path.split("/") *testcase_path.split("/")
) )
loaded_testcase = load_file(testcase_path) loaded_testcase = load_file(testcase_path)
@@ -650,31 +378,6 @@ def load_test_file(path):
return loaded_content return loaded_content
def load_folder_content(folder_path):
""" load api/testcases/testsuites definitions from folder.
Args:
folder_path (str): api/testcases/testsuites files folder.
Returns:
dict: api definition mapping.
{
"tests/api/basic.yml": [
{"api": {"def": "api_login", "request": {}, "validate": []}},
{"api": {"def": "api_logout", "request": {}, "validate": []}}
]
}
"""
items_mapping = {}
for file_path in load_folder_files(folder_path):
items_mapping[file_path] = load_file(file_path)
return items_mapping
def load_api_folder(api_folder_path): def load_api_folder(api_folder_path):
""" load api definitions from api folder. """ load api definitions from api folder.
@@ -724,7 +427,7 @@ def load_api_folder(api_folder_path):
for api_item in api_items: for api_item in api_items:
key, api_dict = api_item.popitem() key, api_dict = api_item.popitem()
api_id = api_dict.get("id") or api_dict.get("def") \ api_id = api_dict.get("id") or api_dict.get("def") \
or api_dict.get("name") or api_dict.get("name")
if key != "api" or not api_id: if key != "api" or not api_id:
raise exceptions.ParamsError( raise exceptions.ParamsError(
"Invalid API defined in {}".format(api_file_path)) "Invalid API defined in {}".format(api_file_path))
@@ -746,27 +449,7 @@ def load_api_folder(api_folder_path):
return api_definition_mapping return api_definition_mapping
def locate_debugtalk_py(start_path): def load_project_data(test_path, dot_env_path=None):
""" locate debugtalk.py file
Args:
start_path (str): start locating path,
maybe testcase file path or directory path
Returns:
str: debugtalk.py file path, None if not found
"""
try:
# locate debugtalk.py file.
debugtalk_path = locate_file(start_path, "debugtalk.py")
except exceptions.FileNotFound:
debugtalk_path = None
return debugtalk_path
def load_project_tests(test_path, dot_env_path=None):
""" load api, testcases, .env, debugtalk.py functions. """ load api, testcases, .env, debugtalk.py functions.
api/testcases folder is relative to project_working_directory api/testcases folder is relative to project_working_directory
@@ -779,31 +462,9 @@ def load_project_tests(test_path, dot_env_path=None):
environments and debugtalk.py functions. environments and debugtalk.py functions.
""" """
debugtalk_path, project_working_directory = init_project_working_directory(test_path)
def prepare_path(path): project_mapping = {}
if not os.path.exists(path):
err_msg = "path not exist: {}".format(path)
logger.log_error(err_msg)
raise exceptions.FileNotFound(err_msg)
if not os.path.isabs(path):
path = os.path.join(os.getcwd(), path)
return path
test_path = prepare_path(test_path)
# locate debugtalk.py file
debugtalk_path = locate_debugtalk_py(test_path)
if debugtalk_path:
# The folder contains debugtalk.py will be treated as PWD.
project_working_directory = os.path.dirname(debugtalk_path)
else:
# debugtalk.py not found, use os.getcwd() as PWD.
project_working_directory = os.getcwd()
# add PWD to sys.path
sys.path.insert(0, project_working_directory)
# load .env file # load .env file
# NOTICE: # NOTICE:
@@ -821,16 +482,17 @@ def load_project_tests(test_path, dot_env_path=None):
# locate PWD and load debugtalk.py functions # locate PWD and load debugtalk.py functions
project_mapping["PWD"] = project_working_directory project_mapping["PWD"] = project_working_directory
built_in.PWD = project_working_directory functions.PWD = project_working_directory # TODO: remove
project_mapping["functions"] = debugtalk_functions project_mapping["functions"] = debugtalk_functions
project_mapping["test_path"] = test_path project_mapping["test_path"] = test_path
# load api # load api
tests_def_mapping["api"] = load_api_folder(os.path.join(project_working_directory, "api")) tests_def_mapping["api"] = load_api_folder(os.path.join(project_working_directory, "api"))
tests_def_mapping["PWD"] = project_working_directory
return project_mapping
def load_tests(path, dot_env_path=None): def load_cases(path, dot_env_path=None):
""" load testcases from file path, extend and merge with api/testcase definitions. """ load testcases from file path, extend and merge with api/testcase definitions.
Args: Args:
@@ -883,9 +545,9 @@ def load_tests(path, dot_env_path=None):
} }
""" """
load_project_tests(path, dot_env_path)
tests_mapping = { tests_mapping = {
"project_mapping": project_mapping "project_mapping": load_project_data(path, dot_env_path)
} }
def __load_file_content(path): def __load_file_content(path):

191
httprunner/loader/check.py Normal file
View File

@@ -0,0 +1,191 @@
import io
import json
import os
import types
from httprunner import logger, exceptions
# TODO: validate data format with JSON schema
def is_testcase(data_structure):
""" check if data_structure is a testcase.
Args:
data_structure (dict): testcase should always be in the following data structure:
{
"config": {
"name": "desc1",
"variables": [], # optional
"request": {} # optional
},
"teststeps": [
test_dict1,
{ # test_dict2
'name': 'test step desc2',
'variables': [], # optional
'extract': [], # optional
'validate': [],
'request': {},
'function_meta': {}
}
]
}
Returns:
bool: True if data_structure is valid testcase, otherwise False.
"""
# TODO: replace with JSON schema validation
if not isinstance(data_structure, dict):
return False
if "teststeps" not in data_structure:
return False
if not isinstance(data_structure["teststeps"], list):
return False
return True
def is_testcases(data_structure):
""" check if data_structure is testcase or testcases list.
Args:
data_structure (dict): testcase(s) should always be in the following data structure:
{
"project_mapping": {
"PWD": "XXXXX",
"functions": {},
"env": {}
},
"testcases": [
{ # testcase data structure
"config": {
"name": "desc1",
"path": "testcase1_path",
"variables": [], # optional
},
"teststeps": [
# test data structure
{
'name': 'test step desc1',
'variables': [], # optional
'extract': [], # optional
'validate': [],
'request': {}
},
test_dict_2 # another test dict
]
},
testcase_dict_2 # another testcase dict
]
}
Returns:
bool: True if data_structure is valid testcase(s), otherwise False.
"""
if not isinstance(data_structure, dict):
return False
if "testcases" not in data_structure:
return False
testcases = data_structure["testcases"]
if not isinstance(testcases, list):
return False
for item in testcases:
if not is_testcase(item):
return False
return True
def is_testcase_path(path):
""" check if path is testcase path or path list.
Args:
path (str/list): file path or file path list.
Returns:
bool: True if path is valid file path or path list, otherwise False.
"""
if not isinstance(path, (str, list)):
return False
if isinstance(path, list):
for p in path:
if not is_testcase_path(p):
return False
if isinstance(path, str):
if not os.path.exists(path):
return False
return True
def check_testcase_format(file_path, content):
""" check testcase format if valid
"""
# TODO: replace with JSON schema validation
if not content:
# testcase file content is empty
err_msg = u"Testcase file content is empty: {}".format(file_path)
logger.log_error(err_msg)
raise exceptions.FileFormatError(err_msg)
elif not isinstance(content, (list, dict)):
# testcase file content does not match testcase format
err_msg = u"Testcase file content format invalid: {}".format(file_path)
logger.log_error(err_msg)
raise exceptions.FileFormatError(err_msg)
def validate_json_file(file_list):
""" validate JSON testcase format
"""
for json_file in set(file_list):
if not json_file.endswith(".json"):
logger.log_warning("Only JSON file format can be validated, skip: {}".format(json_file))
continue
logger.color_print("Start to validate JSON file: {}".format(json_file), "GREEN")
with io.open(json_file) as stream:
try:
json.load(stream)
except ValueError as e:
raise SystemExit(e)
print("OK")
def is_function(item):
""" Takes item object, returns True if it is a function.
"""
return isinstance(item, types.FunctionType)
def is_variable(tup):
""" Takes (name, object) tuple, returns True if it is a variable.
"""
name, item = tup
if callable(item):
# function or class
return False
if isinstance(item, types.ModuleType):
# imported module
return False
if name.startswith("_"):
# private property
return False
return True

241
httprunner/loader/load.py Normal file
View File

@@ -0,0 +1,241 @@
import csv
import io
import json
import os
import yaml
from httprunner import builtin
from httprunner import exceptions, logger, utils
from httprunner.loader.check import check_testcase_format, is_function
from httprunner.loader.locate import get_project_working_directory
try:
# PyYAML version >= 5.1
# ref: https://github.com/yaml/pyyaml/wiki/PyYAML-yaml.load(input)-Deprecation
yaml.warnings({'YAMLLoadWarning': False})
except AttributeError:
pass
def _load_yaml_file(yaml_file):
""" load yaml file and check file content format
"""
with io.open(yaml_file, 'r', encoding='utf-8') as stream:
yaml_content = yaml.load(stream)
check_testcase_format(yaml_file, yaml_content)
return yaml_content
def _load_json_file(json_file):
""" load json file and check file content format
"""
with io.open(json_file, encoding='utf-8') as data_file:
try:
json_content = json.load(data_file)
except exceptions.JSONDecodeError:
err_msg = u"JSONDecodeError: JSON file format error: {}".format(json_file)
logger.log_error(err_msg)
raise exceptions.FileFormatError(err_msg)
check_testcase_format(json_file, json_content)
return json_content
def load_csv_file(csv_file):
""" load csv file and check file content format
Args:
csv_file (str): csv file path, csv file content is like below:
Returns:
list: list of parameters, each parameter is in dict format
Examples:
>>> cat csv_file
username,password
test1,111111
test2,222222
test3,333333
>>> load_csv_file(csv_file)
[
{'username': 'test1', 'password': '111111'},
{'username': 'test2', 'password': '222222'},
{'username': 'test3', 'password': '333333'}
]
"""
if not os.path.isabs(csv_file):
pwd = get_project_working_directory()
# make compatible with Windows/Linux
csv_file = os.path.join(pwd, *csv_file.split("/"))
if not os.path.isfile(csv_file):
# file path not exist
raise exceptions.CSVNotFound(csv_file)
csv_content_list = []
with io.open(csv_file, encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
csv_content_list.append(row)
return csv_content_list
def load_file(file_path):
if not os.path.isfile(file_path):
raise exceptions.FileNotFound("{} does not exist.".format(file_path))
file_suffix = os.path.splitext(file_path)[1].lower()
if file_suffix == '.json':
return _load_json_file(file_path)
elif file_suffix in ['.yaml', '.yml']:
return _load_yaml_file(file_path)
elif file_suffix == ".csv":
return load_csv_file(file_path)
else:
# '' or other suffix
err_msg = u"Unsupported file format: {}".format(file_path)
logger.log_warning(err_msg)
return []
def load_folder_files(folder_path, recursive=True):
""" load folder path, return all files endswith yml/yaml/json in list.
Args:
folder_path (str): specified folder path to load
recursive (bool): load files recursively if True
Returns:
list: files endswith yml/yaml/json
"""
if isinstance(folder_path, (list, set)):
files = []
for path in set(folder_path):
files.extend(load_folder_files(path, recursive))
return files
if not os.path.exists(folder_path):
return []
file_list = []
for dirpath, dirnames, filenames in os.walk(folder_path):
filenames_list = []
for filename in filenames:
if not filename.endswith(('.yml', '.yaml', '.json')):
continue
filenames_list.append(filename)
for filename in filenames_list:
file_path = os.path.join(dirpath, filename)
file_list.append(file_path)
if not recursive:
break
return file_list
def load_dot_env_file(dot_env_path):
""" load .env file.
Args:
dot_env_path (str): .env file path
Returns:
dict: environment variables mapping
{
"UserName": "debugtalk",
"Password": "123456",
"PROJECT_KEY": "ABCDEFGH"
}
Raises:
exceptions.FileFormatError: If .env file format is invalid.
"""
if not os.path.isfile(dot_env_path):
return {}
logger.log_info("Loading environment variables from {}".format(dot_env_path))
env_variables_mapping = {}
with io.open(dot_env_path, 'r', encoding='utf-8') as fp:
for line in fp:
# maxsplit=1
if "=" in line:
variable, value = line.split("=", 1)
elif ":" in line:
variable, value = line.split(":", 1)
else:
raise exceptions.FileFormatError(".env format error")
env_variables_mapping[variable.strip()] = value.strip()
utils.set_os_environ(env_variables_mapping)
return env_variables_mapping
def load_folder_content(folder_path):
""" load api/testcases/testsuites definitions from folder.
Args:
folder_path (str): api/testcases/testsuites files folder.
Returns:
dict: api definition mapping.
{
"tests/api/basic.yml": [
{"api": {"def": "api_login", "request": {}, "validate": []}},
{"api": {"def": "api_logout", "request": {}, "validate": []}}
]
}
"""
items_mapping = {}
for file_path in load_folder_files(folder_path):
items_mapping[file_path] = load_file(file_path)
return items_mapping
def load_module_functions(module):
""" load python module functions.
Args:
module: python module
Returns:
dict: functions mapping for specified python module
{
"func1_name": func1,
"func2_name": func2
}
"""
module_functions = {}
for name, item in vars(module).items():
if is_function(item):
module_functions[name] = item
return module_functions
def load_builtin_functions():
""" load builtin module functions
"""
return load_module_functions(builtin)

110
httprunner/loader/locate.py Normal file
View File

@@ -0,0 +1,110 @@
import os
import sys
from httprunner import exceptions, logger
project_working_directory = None
def locate_file(start_path, file_name):
""" locate filename and return absolute file path.
searching will be recursive upward until current working directory.
Args:
file_name (str): target locate file name
start_path (str): start locating path, maybe file path or directory path
Returns:
str: located file path. None if file not found.
Raises:
exceptions.FileNotFound: If failed to locate file.
"""
if os.path.isfile(start_path):
start_dir_path = os.path.dirname(start_path)
elif os.path.isdir(start_path):
start_dir_path = start_path
else:
raise exceptions.FileNotFound("invalid path: {}".format(start_path))
file_path = os.path.join(start_dir_path, file_name)
if os.path.isfile(file_path):
return os.path.abspath(file_path)
# current working directory
if os.path.abspath(start_dir_path) in [os.getcwd(), os.path.abspath(os.sep)]:
raise exceptions.FileNotFound("{} not found in {}".format(file_name, start_path))
# locate recursive upward
return locate_file(os.path.dirname(start_dir_path), file_name)
def locate_debugtalk_py(start_path):
""" locate debugtalk.py file
Args:
start_path (str): start locating path,
maybe testcase file path or directory path
Returns:
str: debugtalk.py file path, None if not found
"""
try:
# locate debugtalk.py file.
debugtalk_path = locate_file(start_path, "debugtalk.py")
except exceptions.FileNotFound:
debugtalk_path = None
return debugtalk_path
def init_project_working_directory(test_path):
""" this should be called at startup
init_project_working_directory <- load_project_data <- load_cases <- run
Args:
test_path: specified testfile path
Returns:
(str, str): debugtalk.py path, project_working_directory
"""
def prepare_path(path):
if not os.path.exists(path):
err_msg = "path not exist: {}".format(path)
logger.log_error(err_msg)
raise exceptions.FileNotFound(err_msg)
if not os.path.isabs(path):
path = os.path.join(os.getcwd(), path)
return path
test_path = prepare_path(test_path)
# locate debugtalk.py file
debugtalk_path = locate_debugtalk_py(test_path)
global project_working_directory
if debugtalk_path:
# The folder contains debugtalk.py will be treated as PWD.
project_working_directory = os.path.dirname(debugtalk_path)
else:
# debugtalk.py not found, use os.getcwd() as PWD.
project_working_directory = os.getcwd()
# add PWD to sys.path
sys.path.insert(0, project_working_directory)
return debugtalk_path, project_working_directory
def get_project_working_directory():
global project_working_directory
if project_working_directory is None:
raise exceptions.MyBaseFailure("loader.load_cases() has not been called!")
return project_working_directory

View File

@@ -2,9 +2,11 @@
import ast import ast
import builtins import builtins
import collections
import json
import re import re
from httprunner import exceptions, utils, validator from httprunner import exceptions, utils, loader
from httprunner.compat import basestring, numeric_types, str from httprunner.compat import basestring, numeric_types, str
# use $$ to escape $ notation # use $$ to escape $ notation
@@ -218,6 +220,166 @@ def parse_parameters(parameters, variables_mapping=None, functions_mapping=None)
return utils.gen_cartesian_product(*parsed_parameters_list) return utils.gen_cartesian_product(*parsed_parameters_list)
def get_uniform_comparator(comparator):
""" convert comparator alias to uniform name
"""
if comparator in ["eq", "equals", "==", "is"]:
return "equals"
elif comparator in ["lt", "less_than"]:
return "less_than"
elif comparator in ["le", "less_than_or_equals"]:
return "less_than_or_equals"
elif comparator in ["gt", "greater_than"]:
return "greater_than"
elif comparator in ["ge", "greater_than_or_equals"]:
return "greater_than_or_equals"
elif comparator in ["ne", "not_equals"]:
return "not_equals"
elif comparator in ["str_eq", "string_equals"]:
return "string_equals"
elif comparator in ["len_eq", "length_equals", "count_eq"]:
return "length_equals"
elif comparator in ["len_gt", "count_gt", "length_greater_than", "count_greater_than"]:
return "length_greater_than"
elif comparator in ["len_ge", "count_ge", "length_greater_than_or_equals",
"count_greater_than_or_equals"]:
return "length_greater_than_or_equals"
elif comparator in ["len_lt", "count_lt", "length_less_than", "count_less_than"]:
return "length_less_than"
elif comparator in ["len_le", "count_le", "length_less_than_or_equals",
"count_less_than_or_equals"]:
return "length_less_than_or_equals"
else:
return comparator
def uniform_validator(validator):
""" unify validator
Args:
validator (dict): validator maybe in two formats:
format1: this is kept for compatiblity with the previous versions.
{"check": "status_code", "comparator": "eq", "expect": 201}
{"check": "$resp_body_success", "comparator": "eq", "expect": True}
format2: recommended new version, {comparator: [check_item, expected_value]}
{'eq': ['status_code', 201]}
{'eq': ['$resp_body_success', True]}
Returns
dict: validator info
{
"check": "status_code",
"expect": 201,
"comparator": "equals"
}
"""
if not isinstance(validator, dict):
raise exceptions.ParamsError("invalid validator: {}".format(validator))
if "check" in validator and "expect" in validator:
# format1
check_item = validator["check"]
expect_value = validator["expect"]
comparator = validator.get("comparator", "eq")
elif len(validator) == 1:
# format2
comparator = list(validator.keys())[0]
compare_values = validator[comparator]
if not isinstance(compare_values, list) or len(compare_values) != 2:
raise exceptions.ParamsError("invalid validator: {}".format(validator))
check_item, expect_value = compare_values
else:
raise exceptions.ParamsError("invalid validator: {}".format(validator))
# uniform comparator, e.g. lt => less_than, eq => equals
comparator = get_uniform_comparator(comparator)
return {
"check": check_item,
"expect": expect_value,
"comparator": comparator
}
def _convert_validators_to_mapping(validators):
""" convert validators list to mapping.
Args:
validators (list): validators in list
Returns:
dict: validators mapping, use (check, comparator) as key.
Examples:
>>> validators = [
{"check": "v1", "expect": 201, "comparator": "eq"},
{"check": {"b": 1}, "expect": 200, "comparator": "eq"}
]
>>> print(_convert_validators_to_mapping(validators))
{
("v1", "eq"): {"check": "v1", "expect": 201, "comparator": "eq"},
('{"b": 1}', "eq"): {"check": {"b": 1}, "expect": 200, "comparator": "eq"}
}
"""
validators_mapping = {}
for validator in validators:
if not isinstance(validator["check"], collections.Hashable):
check = json.dumps(validator["check"])
else:
check = validator["check"]
key = (check, validator["comparator"])
validators_mapping[key] = validator
return validators_mapping
def extend_validators(raw_validators, override_validators):
""" extend raw_validators with override_validators.
override_validators will merge and override raw_validators.
Args:
raw_validators (dict):
override_validators (dict):
Returns:
list: extended validators
Examples:
>>> raw_validators = [{'eq': ['v1', 200]}, {"check": "s2", "expect": 16, "comparator": "len_eq"}]
>>> override_validators = [{"check": "v1", "expect": 201}, {'len_eq': ['s3', 12]}]
>>> extend_validators(raw_validators, override_validators)
[
{"check": "v1", "expect": 201, "comparator": "eq"},
{"check": "s2", "expect": 16, "comparator": "len_eq"},
{"check": "s3", "expect": 12, "comparator": "len_eq"}
]
"""
if not raw_validators:
return override_validators
elif not override_validators:
return raw_validators
else:
def_validators_mapping = _convert_validators_to_mapping(raw_validators)
ref_validators_mapping = _convert_validators_to_mapping(override_validators)
def_validators_mapping.update(ref_validators_mapping)
return list(def_validators_mapping.values())
############################################################################### ###############################################################################
## parse content with variables and functions mapping ## parse content with variables and functions mapping
############################################################################### ###############################################################################
@@ -247,8 +409,8 @@ def get_mapping_function(function_name, functions_mapping):
if not found, then try to check if builtin function. if not found, then try to check if builtin function.
Args: Args:
variable_name (str): variable name function_name (str): function name
variables_mapping (dict): variables mapping functions_mapping (dict): functions mapping
Returns: Returns:
mapping function object. mapping function object.
@@ -261,7 +423,6 @@ def get_mapping_function(function_name, functions_mapping):
return functions_mapping[function_name] return functions_mapping[function_name]
elif function_name in ["parameterize", "P"]: elif function_name in ["parameterize", "P"]:
from httprunner import loader
return loader.load_csv_file return loader.load_csv_file
elif function_name in ["environ", "ENV"]: elif function_name in ["environ", "ENV"]:
@@ -269,7 +430,6 @@ def get_mapping_function(function_name, functions_mapping):
try: try:
# check if HttpRunner builtin functions # check if HttpRunner builtin functions
from httprunner import loader
built_in_functions = loader.load_builtin_functions() built_in_functions = loader.load_builtin_functions()
return built_in_functions[function_name] return built_in_functions[function_name]
except KeyError: except KeyError:
@@ -338,6 +498,7 @@ def parse_function_params(params):
class LazyFunction(object): class LazyFunction(object):
""" call function lazily. """ call function lazily.
""" """
def __init__(self, function_meta, functions_mapping=None, check_variables_set=None): def __init__(self, function_meta, functions_mapping=None, check_variables_set=None):
""" init LazyFunction object with function_meta """ init LazyFunction object with function_meta
@@ -410,7 +571,7 @@ class LazyFunction(object):
return "LazyFunction({}({}))".format(self.func_name, args_string) return "LazyFunction({}({}))".format(self.func_name, args_string)
def __prepare_cache_key(self, args, kwargs): def __prepare_cache_key(self, args, kwargs):
return (self.func_name, repr(args), repr(kwargs)) return self.func_name, repr(args), repr(kwargs)
def to_value(self, variables_mapping=None): def to_value(self, variables_mapping=None):
""" parse lazy data with evaluated variables mapping. """ parse lazy data with evaluated variables mapping.
@@ -431,6 +592,7 @@ cached_functions_mapping = {}
class LazyString(object): class LazyString(object):
""" evaluate string lazily. """ evaluate string lazily.
""" """
def __init__(self, raw_string, functions_mapping=None, check_variables_set=None, cached=False): def __init__(self, raw_string, functions_mapping=None, check_variables_set=None, cached=False):
""" make raw_string as lazy object with functions_mapping """ make raw_string as lazy object with functions_mapping
check if any variable undefined in check_variables_set check if any variable undefined in check_variables_set
@@ -511,7 +673,7 @@ class LazyString(object):
curr_position = match_start_position curr_position = match_start_position
try: try:
# find next $ location # find next $ location
match_start_position = raw_string.index("$", curr_position+1) match_start_position = raw_string.index("$", curr_position + 1)
remain_string = raw_string[curr_position:match_start_position] remain_string = raw_string[curr_position:match_start_position]
except ValueError: except ValueError:
remain_string = raw_string[curr_position:] remain_string = raw_string[curr_position:]
@@ -786,11 +948,11 @@ def _extend_with_api(test_dict, api_def_dict):
# merge & override validators TODO: relocate # merge & override validators TODO: relocate
def_raw_validators = api_def_dict.pop("validate", []) def_raw_validators = api_def_dict.pop("validate", [])
def_validators = [ def_validators = [
validator.uniform_validator(_validator) uniform_validator(_validator)
for _validator in def_raw_validators for _validator in def_raw_validators
] ]
ref_validators = test_dict.pop("validate", []) ref_validators = test_dict.pop("validate", [])
test_dict["validate"] = validator.extend_validators( test_dict["validate"] = extend_validators(
def_validators, def_validators,
ref_validators ref_validators
) )
@@ -843,7 +1005,8 @@ def _extend_with_testcase(test_dict, testcase_def_dict):
""" """
# override testcase config variables # override testcase config variables
testcase_def_dict["config"].setdefault("variables", {}) testcase_def_dict["config"].setdefault("variables", {})
testcase_def_variables = utils.ensure_mapping_format(testcase_def_dict["config"].get("variables", {})) testcase_def_variables = utils.ensure_mapping_format(
testcase_def_dict["config"].get("variables", {}))
testcase_def_variables.update(test_dict.pop("variables", {})) testcase_def_variables.update(test_dict.pop("variables", {}))
testcase_def_dict["config"]["variables"] = testcase_def_variables testcase_def_dict["config"]["variables"] = testcase_def_variables
@@ -855,8 +1018,8 @@ def _extend_with_testcase(test_dict, testcase_def_dict):
# override name # override name
test_name = test_dict.pop("name", None) \ test_name = test_dict.pop("name", None) \
or testcase_def_dict["config"].pop("name", None) \ or testcase_def_dict["config"].pop("name", None) \
or "testcase name undefined" or "testcase name undefined"
# override testcase config name, output, etc. # override testcase config name, output, etc.
testcase_def_dict["config"].update(test_dict) testcase_def_dict["config"].update(test_dict)
@@ -875,7 +1038,8 @@ def __prepare_config(config, project_mapping, session_variables_set=None):
override_variables = utils.deepcopy_dict(project_mapping.get("variables", {})) override_variables = utils.deepcopy_dict(project_mapping.get("variables", {}))
functions = project_mapping.get("functions", {}) functions = project_mapping.get("functions", {})
if isinstance(raw_config_variables, basestring) and function_regex_compile.match(raw_config_variables): if isinstance(raw_config_variables, basestring) and function_regex_compile.match(
raw_config_variables):
# config variables are generated by calling function # config variables are generated by calling function
# e.g. # e.g.
# "config": { # "config": {
@@ -946,7 +1110,7 @@ def __prepare_testcase_tests(tests, config, project_mapping, session_variables_s
if "validate" in test_dict: if "validate" in test_dict:
ref_raw_validators = test_dict.pop("validate", []) ref_raw_validators = test_dict.pop("validate", [])
test_dict["validate"] = [ test_dict["validate"] = [
validator.uniform_validator(_validator) uniform_validator(_validator)
for _validator in ref_raw_validators for _validator in ref_raw_validators
] ]

View File

@@ -16,7 +16,7 @@ def prepare_locust_tests(path):
] ]
""" """
tests_mapping = loader.load_tests(path) tests_mapping = loader.load_cases(path)
testcases = parser.parse_tests(tests_mapping) testcases = parser.parse_tests(tests_mapping)
locust_tests = [] locust_tests = []

View File

@@ -5,6 +5,7 @@ from unittest.case import SkipTest
from httprunner import exceptions, logger, response, utils from httprunner import exceptions, logger, response, utils
from httprunner.client import HttpSession from httprunner.client import HttpSession
from httprunner.context import SessionContext from httprunner.context import SessionContext
from httprunner.validator import Validator
class Runner(object): class Runner(object):
@@ -62,7 +63,6 @@ class Runner(object):
""" """
self.verify = config.get("verify", True) self.verify = config.get("verify", True)
self.export = config.get("export") or config.get("output", []) self.export = config.get("export") or config.get("output", [])
self.validation_results = []
config_variables = config.get("variables", {}) config_variables = config.get("variables", {})
# testcase setup hooks # testcase setup hooks
@@ -86,19 +86,8 @@ class Runner(object):
if not isinstance(self.http_client_session, HttpSession): if not isinstance(self.http_client_session, HttpSession):
return return
self.validation_results = []
self.http_client_session.init_meta_data() self.http_client_session.init_meta_data()
def __get_test_data(self):
""" get request/response data and validate results
"""
if not isinstance(self.http_client_session, HttpSession):
return
meta_data = self.http_client_session.meta_data
meta_data["validators"] = self.validation_results
return meta_data
def _handle_skip_feature(self, test_dict): def _handle_skip_feature(self, test_dict):
""" handle skip feature for test """ handle skip feature for test
- skip: skip current test unconditionally - skip: skip current test unconditionally
@@ -244,7 +233,8 @@ class Runner(object):
raise exceptions.ParamsError(err_msg) raise exceptions.ParamsError(err_msg)
logger.log_info("{method} {url}".format(method=method, url=parsed_url)) logger.log_info("{method} {url}".format(method=method, url=parsed_url))
logger.log_debug("request kwargs(raw): {kwargs}".format(kwargs=parsed_test_request)) logger.log_debug(
"request kwargs(raw): {kwargs}".format(kwargs=parsed_test_request))
# request # request
resp = self.http_client_session.request( resp = self.http_client_session.request(
@@ -269,9 +259,18 @@ class Runner(object):
# validate # validate
validators = test_dict.get("validate") or test_dict.get("validators") or [] validators = test_dict.get("validate") or test_dict.get("validators") or []
validate_script = test_dict.get("validate_script", [])
if validate_script:
validators.append({
"type": "python_script",
"script": validate_script
})
validator = Validator(self.session_context, resp_obj)
try: try:
self.session_context.validate(validators, resp_obj) validator.validate(validators)
except (exceptions.ParamsError, exceptions.ValidationFailure, exceptions.ExtractFailure): except (exceptions.ParamsError,
exceptions.ValidationFailure, exceptions.ExtractFailure):
err_msg = "{} DETAILED REQUEST & RESPONSE {}\n".format("*" * 32, "*" * 32) err_msg = "{} DETAILED REQUEST & RESPONSE {}\n".format("*" * 32, "*" * 32)
# log request # log request
@@ -295,7 +294,9 @@ class Runner(object):
raise raise
finally: finally:
self.validation_results = self.session_context.validation_results # get request/response data and validate results
self.meta_datas = getattr(self.http_client_session, "meta_data", {})
self.meta_datas["validators"] = validator.validation_results
def _run_testcase(self, testcase_dict): def _run_testcase(self, testcase_dict):
""" run single testcase. """ run single testcase.
@@ -380,8 +381,6 @@ class Runner(object):
self.exception_request_type = test_dict["request"]["method"] self.exception_request_type = test_dict["request"]["method"]
self.exception_name = test_dict.get("name") self.exception_name = test_dict.get("name")
raise raise
finally:
self.meta_datas = self.__get_test_data()
def export_variables(self, output_variables_list): def export_variables(self, output_variables_list):
""" export current testcase variables """ export current testcase variables
@@ -392,8 +391,8 @@ class Runner(object):
for variable in output_variables_list: for variable in output_variables_list:
if variable not in variables_mapping: if variable not in variables_mapping:
logger.log_warning( logger.log_warning(
"variable '{}' can not be found in variables mapping, failed to export!"\ "variable '{}' can not be found in variables mapping, "
.format(variable) "failed to export!".format(variable)
) )
continue continue

View File

@@ -279,15 +279,17 @@
{% endfor %} {% endfor %}
<h3>Validators:</h3> <h3>Validators:</h3>
<div style="overflow: auto"> <div style="overflow: auto">
<table> {% set validate_extractors = meta_data.validators.validate_extractor %}
{% if validate_extractors %}
<table>
<tr> <tr>
<th>check</th> <th>check</th>
<th>comparator</th> <th>comparator</th>
<th>expect value</th> <th>expect value</th>
<th>actual value</th> <th>actual value</th>
</tr> </tr>
{% for validator in meta_data.validators %} {% for validator in validate_extractors %}
<tr> <tr>
{% if validator.check_result == "pass" %} {% if validator.check_result == "pass" %}
<td class="passed"> <td class="passed">
@@ -303,7 +305,27 @@
<td>{{validator.check_value | e}}</td> <td>{{validator.check_value | e}}</td>
</tr> </tr>
{% endfor %} {% endfor %}
</table> </table>
{% endif %}
{% set validate_script = meta_data.validators.validate_script %}
{% if validate_script %}
<table>
<tr>
<th>validate script</th><th>exception</th>
</tr>
<tr>
<td>{{validate_script.validate_script | safe}}</td>
{% if validate_script.check_result == "pass" %}
<td class="passed">
{% elif validate_script.check_result == "fail" %}
<td class="failed">
{% endif %}
{{validate_script.exception}}
</td>
</tr>
</table>
{% endif %}
</div> </div>
<h3>Statistics:</h3> <h3>Statistics:</h3>

View File

@@ -119,52 +119,6 @@ def query_json(json_content, query, delimiter='.'):
return json_content return json_content
def deep_update_dict(origin_dict, override_dict):
""" update origin dict with override dict recursively
e.g. origin_dict = {'a': 1, 'b': {'c': 2, 'd': 4}}
override_dict = {'b': {'c': 3}}
return: {'a': 1, 'b': {'c': 3, 'd': 4}}
"""
if not override_dict:
return origin_dict
for key, val in override_dict.items():
if isinstance(val, dict):
tmp = deep_update_dict(origin_dict.get(key, {}), val)
origin_dict[key] = tmp
elif val is None:
# fix #64: when headers in test is None, it should inherit from config
continue
else:
origin_dict[key] = override_dict[key]
return origin_dict
def convert_dict_to_params(src_dict):
""" convert dict to params string
Args:
src_dict (dict): source mapping data structure
Returns:
str: string params data
Examples:
>>> src_dict = {
"a": 1,
"b": 2
}
>>> convert_dict_to_params(src_dict)
>>> "a=1&b=2"
"""
return "&".join([
"{}={}".format(key, value)
for key, value in src_dict.items()
])
def lower_dict_keys(origin_dict): def lower_dict_keys(origin_dict):
""" convert keys in dict to lower case """ convert keys in dict to lower case

View File

@@ -1,347 +1,191 @@
# encoding: utf-8 # encoding: utf-8
import collections
import io
import json
import os
import types
from httprunner import exceptions, logger from httprunner import exceptions, logger, parser
""" validate data format
TODO: refactor with JSON schema validate
"""
def is_testcase(data_structure): class Validator(object):
""" check if data_structure is a testcase. """Validate tests
Args: Attributes:
data_structure (dict): testcase should always be in the following data structure: validation_results (dict): store validation results,
including validate_extractor and validate_script.
{
"config": {
"name": "desc1",
"variables": [], # optional
"request": {} # optional
},
"teststeps": [
test_dict1,
{ # test_dict2
'name': 'test step desc2',
'variables': [], # optional
'extract': [], # optional
'validate': [],
'request': {},
'function_meta': {}
}
]
}
Returns:
bool: True if data_structure is valid testcase, otherwise False.
""" """
# TODO: replace with JSON schema validation
if not isinstance(data_structure, dict):
return False
if "teststeps" not in data_structure: def __init__(self, session_context, resp_obj):
return False """ initialize a Validator for each teststep (API request)
if not isinstance(data_structure["teststeps"], list): Args:
return False session_context: HttpRunner session context
resp_obj: ResponseObject instance
"""
self.session_context = session_context
self.resp_obj = resp_obj
self.validation_results = {}
return True def __eval_validator_check(self, check_item):
""" evaluate check item in validator.
Args:
check_item: check_item should only be the following 5 formats:
1, variable reference, e.g. $token
2, function reference, e.g. ${is_status_code_200($status_code)}
3, dict or list, maybe containing variable/function reference, e.g. {"var": "$abc"}
4, string joined by delimiter. e.g. "status_code", "headers.content-type"
5, regex string, e.g. "LB[\d]*(.*)RB[\d]*"
def is_testcases(data_structure): """
""" check if data_structure is testcase or testcases list. if isinstance(check_item, (dict, list)) \
or isinstance(check_item, parser.LazyString):
# format 1/2/3
check_value = self.session_context.eval_content(check_item)
else:
# format 4/5
check_value = self.resp_obj.extract_field(check_item)
Args: return check_value
data_structure (dict): testcase(s) should always be in the following data structure:
{
"project_mapping": {
"PWD": "XXXXX",
"functions": {},
"env": {}
},
"testcases": [
{ # testcase data structure
"config": {
"name": "desc1",
"path": "testcase1_path",
"variables": [], # optional
},
"teststeps": [
# test data structure
{
'name': 'test step desc1',
'variables': [], # optional
'extract': [], # optional
'validate': [],
'request': {}
},
test_dict_2 # another test dict
]
},
testcase_dict_2 # another testcase dict
]
}
Returns: def __eval_validator_expect(self, expect_item):
bool: True if data_structure is valid testcase(s), otherwise False. """ evaluate expect item in validator.
""" Args:
if not isinstance(data_structure, dict): expect_item: expect_item should only be in 2 types:
return False 1, variable reference, e.g. $expect_status_code
2, actual value, e.g. 200
if "testcases" not in data_structure: """
return False expect_value = self.session_context.eval_content(expect_item)
return expect_value
testcases = data_structure["testcases"] def validate_script(self, script):
if not isinstance(testcases, list): """ make validation with python script
return False """
validator_dict = {
for item in testcases: "validate_script": "<br/>".join(script),
if not is_testcase(item): "check_result": "fail",
return False "exception": ""
return True
def is_testcase_path(path):
""" check if path is testcase path or path list.
Args:
path (str/list): file path or file path list.
Returns:
bool: True if path is valid file path or path list, otherwise False.
"""
if not isinstance(path, (str, list)):
return False
if isinstance(path, list):
for p in path:
if not is_testcase_path(p):
return False
if isinstance(path, str):
if not os.path.exists(path):
return False
return True
###############################################################################
## testcase validator utils
###############################################################################
def get_uniform_comparator(comparator):
""" convert comparator alias to uniform name
"""
if comparator in ["eq", "equals", "==", "is"]:
return "equals"
elif comparator in ["lt", "less_than"]:
return "less_than"
elif comparator in ["le", "less_than_or_equals"]:
return "less_than_or_equals"
elif comparator in ["gt", "greater_than"]:
return "greater_than"
elif comparator in ["ge", "greater_than_or_equals"]:
return "greater_than_or_equals"
elif comparator in ["ne", "not_equals"]:
return "not_equals"
elif comparator in ["str_eq", "string_equals"]:
return "string_equals"
elif comparator in ["len_eq", "length_equals", "count_eq"]:
return "length_equals"
elif comparator in ["len_gt", "count_gt", "length_greater_than", "count_greater_than"]:
return "length_greater_than"
elif comparator in ["len_ge", "count_ge", "length_greater_than_or_equals",
"count_greater_than_or_equals"]:
return "length_greater_than_or_equals"
elif comparator in ["len_lt", "count_lt", "length_less_than", "count_less_than"]:
return "length_less_than"
elif comparator in ["len_le", "count_le", "length_less_than_or_equals",
"count_less_than_or_equals"]:
return "length_less_than_or_equals"
else:
return comparator
def uniform_validator(validator):
""" unify validator
Args:
validator (dict): validator maybe in two formats:
format1: this is kept for compatiblity with the previous versions.
{"check": "status_code", "comparator": "eq", "expect": 201}
{"check": "$resp_body_success", "comparator": "eq", "expect": True}
format2: recommended new version, {comparator: [check_item, expected_value]}
{'eq': ['status_code', 201]}
{'eq': ['$resp_body_success', True]}
Returns
dict: validator info
{
"check": "status_code",
"expect": 201,
"comparator": "equals"
}
"""
if not isinstance(validator, dict):
raise exceptions.ParamsError("invalid validator: {}".format(validator))
if "check" in validator and "expect" in validator:
# format1
check_item = validator["check"]
expect_value = validator["expect"]
comparator = validator.get("comparator", "eq")
elif len(validator) == 1:
# format2
comparator = list(validator.keys())[0]
compare_values = validator[comparator]
if not isinstance(compare_values, list) or len(compare_values) != 2:
raise exceptions.ParamsError("invalid validator: {}".format(validator))
check_item, expect_value = compare_values
else:
raise exceptions.ParamsError("invalid validator: {}".format(validator))
# uniform comparator, e.g. lt => less_than, eq => equals
comparator = get_uniform_comparator(comparator)
return {
"check": check_item,
"expect": expect_value,
"comparator": comparator
}
def _convert_validators_to_mapping(validators):
""" convert validators list to mapping.
Args:
validators (list): validators in list
Returns:
dict: validators mapping, use (check, comparator) as key.
Examples:
>>> validators = [
{"check": "v1", "expect": 201, "comparator": "eq"},
{"check": {"b": 1}, "expect": 200, "comparator": "eq"}
]
>>> print(_convert_validators_to_mapping(validators))
{
("v1", "eq"): {"check": "v1", "expect": 201, "comparator": "eq"},
('{"b": 1}', "eq"): {"check": {"b": 1}, "expect": 200, "comparator": "eq"}
} }
""" script = "\n ".join(script)
validators_mapping = {} code = """
# encoding: utf-8
for validator in validators: try:
if not isinstance(validator["check"], collections.Hashable): {}
check = json.dumps(validator["check"]) except Exception as ex:
else: import traceback
check = validator["check"] import sys
_type, _value, _tb = sys.exc_info()
# filename, lineno, name, line
_, _lineno, _, line_content = traceback.extract_tb(_tb, 1)[0]
key = (check, validator["comparator"]) line_no = _lineno - 4
validators_mapping[key] = validator
return validators_mapping c_exception = _type.__name__ + "\\n"
c_exception += "\\tError line number: " + str(line_no) + "\\n"
c_exception += "\\tError line content: " + str(line_content) + "\\n"
def extend_validators(raw_validators, override_validators):
""" extend raw_validators with override_validators.
override_validators will merge and override raw_validators.
Args:
raw_validators (dict):
override_validators (dict):
Returns:
list: extended validators
Examples:
>>> raw_validators = [{'eq': ['v1', 200]}, {"check": "s2", "expect": 16, "comparator": "len_eq"}]
>>> override_validators = [{"check": "v1", "expect": 201}, {'len_eq': ['s3', 12]}]
>>> extend_validators(raw_validators, override_validators)
[
{"check": "v1", "expect": 201, "comparator": "eq"},
{"check": "s2", "expect": 16, "comparator": "len_eq"},
{"check": "s3", "expect": 12, "comparator": "len_eq"}
]
"""
if not raw_validators:
return override_validators
elif not override_validators:
return raw_validators
if _value.args:
c_exception += "\\tError description: " + str(_value)
else: else:
def_validators_mapping = _convert_validators_to_mapping(raw_validators) c_exception += "\\tError description: " + _type.__name__
ref_validators_mapping = _convert_validators_to_mapping(override_validators)
def_validators_mapping.update(ref_validators_mapping) raise _type(c_exception)
return list(def_validators_mapping.values()) """.format(script)
variables = {
"status_code": self.resp_obj.status_code,
"response_json": self.resp_obj.json,
"response": self.resp_obj
}
variables.update(self.session_context.test_variables_mapping)
try:
code = compile(code, '<string>', 'exec')
exec(code, variables)
validator_dict["check_result"] = "pass"
return validator_dict, ""
except Exception as ex:
validator_dict["check_result"] = "fail"
validator_dict["exception"] = "<br/>".join(str(ex).splitlines())
return validator_dict, str(ex)
############################################################################### def validate(self, validators):
## validate varibles and functions """ make validation with comparators
############################################################################### """
self.validation_results = {}
if not validators:
return
logger.log_debug("start to validate.")
def is_function(item): validate_pass = True
""" Takes item object, returns True if it is a function. failures = []
"""
return isinstance(item, types.FunctionType)
for validator in validators:
def is_variable(tup): if isinstance(validator, dict) and validator.get("type") == "python_script":
""" Takes (name, object) tuple, returns True if it is a variable. validator_dict, ex = self.validate_script(validator["script"])
""" if ex:
name, item = tup validate_pass = False
if callable(item): failures.append(ex)
# function or class
return False
if isinstance(item, types.ModuleType): self.validation_results["validate_script"] = validator_dict
# imported module continue
return False
if name.startswith("_"): if "validate_extractor" not in self.validation_results:
# private property self.validation_results["validate_extractor"] = []
return False
return True # validator should be LazyFunction object
if not isinstance(validator, parser.LazyFunction):
raise exceptions.ValidationFailure(
"validator should be parsed first: {}".format(validators))
# evaluate validator args with context variable mapping.
validator_args = validator.get_args()
check_item, expect_item = validator_args
check_value = self.__eval_validator_check(check_item)
expect_value = self.__eval_validator_expect(expect_item)
validator.update_args([check_value, expect_value])
def validate_json_file(file_list): comparator = validator.func_name
""" validate JSON testcase format validator_dict = {
""" "comparator": comparator,
for json_file in set(file_list): "check": check_item,
if not json_file.endswith(".json"): "check_value": check_value,
logger.log_warning("Only JSON file format can be validated, skip: {}".format(json_file)) "expect": expect_item,
continue "expect_value": expect_value
}
validate_msg = "\nvalidate: {} {} {}({})".format(
check_item,
comparator,
expect_value,
type(expect_value).__name__
)
logger.color_print("Start to validate JSON file: {}".format(json_file), "GREEN")
with io.open(json_file) as stream:
try: try:
json.load(stream) validator.to_value(self.session_context.test_variables_mapping)
except ValueError as e: validator_dict["check_result"] = "pass"
raise SystemExit(e) validate_msg += "\t==> pass"
logger.log_debug(validate_msg)
except (AssertionError, TypeError):
validate_pass = False
validator_dict["check_result"] = "fail"
validate_msg += "\t==> fail"
validate_msg += "\n{}({}) {} {}({})".format(
check_value,
type(check_value).__name__,
comparator,
expect_value,
type(expect_value).__name__
)
logger.log_error(validate_msg)
failures.append(validate_msg)
print("OK") self.validation_results["validate_extractor"].append(validator_dict)
# restore validator args, in case of running multiple times
validator.update_args(validator_args)
if not validate_pass:
failures_string = "\n".join([failure for failure in failures])
raise exceptions.ValidationFailure(failures_string)

View File

@@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "httprunner" name = "httprunner"
version = "2.3.3" version = "2.4.0"
description = "One-stop solution for HTTP(S) testing." description = "One-stop solution for HTTP(S) testing."
license = "Apache-2.0" license = "Apache-2.0"
readme = "README.md" readme = "README.md"

View File

@@ -5,7 +5,7 @@ from functools import wraps
from flask import Flask, make_response, request from flask import Flask, make_response, request
from httprunner.built_in import gen_random_string from httprunner.builtin.functions import gen_random_string
try: try:
from httpbin import app as httpbin_app from httpbin import app as httpbin_app

View File

@@ -1,13 +1,34 @@
- config: - config:
name: basic test with httpbin name: basic test with httpbin
request: base_url: http://httpbin.org/
base_url: http://httpbin.org/
- test: - test:
name: headers name: validate response with json path
request: request:
url: /headers url: /get
params:
a: 1
b: 2
method: GET method: GET
validate: validate:
- eq: ["status_code", 200] - eq: ["status_code", 200]
- assert_status_code_is_200: ["status_code"] - eq: ["json.args.a", '1']
- eq: ["json.args.b", '2']
validate_script:
- "assert status_code == 200"
- test:
name: validate response with python script
request:
url: /get
params:
a: 1
b: 2
method: GET
validate:
- eq: ["status_code", 200]
validate_script:
- "assert status_code == 201"
- "a = response_json.get('args').get('a')"
- "assert a == '1'"

View File

@@ -289,6 +289,10 @@ class TestHttpRunner(ApiServerUnittest):
self.assertEqual(summary["stat"]["testcases"]["total"], 2) self.assertEqual(summary["stat"]["testcases"]["total"], 2)
self.assertEqual(summary["stat"]["teststeps"]["total"], 4) self.assertEqual(summary["stat"]["teststeps"]["total"], 4)
def test_validate_script(self):
summary = self.runner.run("tests/httpbin/validate.yml")
self.assertFalse(summary["success"])
def test_run_httprunner_with_hooks(self): def test_run_httprunner_with_hooks(self):
testcase_file_path = os.path.join( testcase_file_path = os.path.join(
os.getcwd(), 'tests/httpbin/hooks.yml') os.getcwd(), 'tests/httpbin/hooks.yml')
@@ -327,9 +331,9 @@ class TestHttpRunner(ApiServerUnittest):
] ]
} }
] ]
loader.load_project_tests("tests")
tests_mapping = { tests_mapping = {
"project_mapping": loader.project_mapping, "project_mapping": loader.load_project_data("tests"),
"testcases": testcases "testcases": testcases
} }
summary = self.runner.run_tests(tests_mapping) summary = self.runner.run_tests(tests_mapping)
@@ -359,9 +363,8 @@ class TestHttpRunner(ApiServerUnittest):
] ]
} }
] ]
loader.load_project_tests("tests")
tests_mapping = { tests_mapping = {
"project_mapping": loader.project_mapping, "project_mapping": loader.load_project_data("tests"),
"testcases": testcases "testcases": testcases
} }
summary = self.runner.run_tests(tests_mapping) summary = self.runner.run_tests(tests_mapping)
@@ -389,9 +392,8 @@ class TestHttpRunner(ApiServerUnittest):
] ]
} }
] ]
loader.load_project_tests("tests")
tests_mapping = { tests_mapping = {
"project_mapping": loader.project_mapping, "project_mapping": loader.load_project_data("tests"),
"testcases": testcases "testcases": testcases
} }
summary = self.runner.run_tests(tests_mapping) summary = self.runner.run_tests(tests_mapping)
@@ -600,7 +602,7 @@ class TestApi(ApiServerUnittest):
def test_testcase_loader(self): def test_testcase_loader(self):
testcase_path = "tests/testcases/setup.yml" testcase_path = "tests/testcases/setup.yml"
tests_mapping = loader.load_tests(testcase_path) tests_mapping = loader.load_cases(testcase_path)
project_mapping = tests_mapping["project_mapping"] project_mapping = tests_mapping["project_mapping"]
self.assertIsInstance(project_mapping, dict) self.assertIsInstance(project_mapping, dict)
@@ -625,7 +627,7 @@ class TestApi(ApiServerUnittest):
def test_testcase_parser(self): def test_testcase_parser(self):
testcase_path = "tests/testcases/setup.yml" testcase_path = "tests/testcases/setup.yml"
tests_mapping = loader.load_tests(testcase_path) tests_mapping = loader.load_cases(testcase_path)
parsed_testcases = parser.parse_tests(tests_mapping) parsed_testcases = parser.parse_tests(tests_mapping)
@@ -646,7 +648,7 @@ class TestApi(ApiServerUnittest):
def test_testcase_add_tests(self): def test_testcase_add_tests(self):
testcase_path = "tests/testcases/setup.yml" testcase_path = "tests/testcases/setup.yml"
tests_mapping = loader.load_tests(testcase_path) tests_mapping = loader.load_cases(testcase_path)
testcases = parser.parse_tests(tests_mapping) testcases = parser.parse_tests(tests_mapping)
runner = HttpRunner() runner = HttpRunner()
@@ -660,7 +662,7 @@ class TestApi(ApiServerUnittest):
def test_testcase_complex_verify(self): def test_testcase_complex_verify(self):
testcase_path = "tests/testcases/create_user.yml" testcase_path = "tests/testcases/create_user.yml"
tests_mapping = loader.load_tests(testcase_path) tests_mapping = loader.load_cases(testcase_path)
testcases = parser.parse_tests(tests_mapping) testcases = parser.parse_tests(tests_mapping)
teststeps = testcases[0]["teststeps"] teststeps = testcases[0]["teststeps"]
@@ -677,7 +679,7 @@ class TestApi(ApiServerUnittest):
def test_testcase_simple_run_suite(self): def test_testcase_simple_run_suite(self):
testcase_path = "tests/testcases/setup.yml" testcase_path = "tests/testcases/setup.yml"
tests_mapping = loader.load_tests(testcase_path) tests_mapping = loader.load_cases(testcase_path)
testcases = parser.parse_tests(tests_mapping) testcases = parser.parse_tests(tests_mapping)
runner = HttpRunner() runner = HttpRunner()
test_suite = runner._add_tests(testcases) test_suite = runner._add_tests(testcases)
@@ -691,7 +693,7 @@ class TestApi(ApiServerUnittest):
"tests/testcases/create_user.json", "tests/testcases/create_user.json",
"tests/testcases/create_user.v2.json" "tests/testcases/create_user.v2.json"
]: ]:
tests_mapping = loader.load_tests(testcase_path) tests_mapping = loader.load_cases(testcase_path)
testcases = parser.parse_tests(tests_mapping) testcases = parser.parse_tests(tests_mapping)
runner = HttpRunner() runner = HttpRunner()
test_suite = runner._add_tests(testcases) test_suite = runner._add_tests(testcases)
@@ -710,7 +712,7 @@ class TestApi(ApiServerUnittest):
def test_testsuite_loader(self): def test_testsuite_loader(self):
testcase_path = "tests/testsuites/create_users.yml" testcase_path = "tests/testsuites/create_users.yml"
tests_mapping = loader.load_tests(testcase_path) tests_mapping = loader.load_cases(testcase_path)
project_mapping = tests_mapping["project_mapping"] project_mapping = tests_mapping["project_mapping"]
self.assertIsInstance(project_mapping, dict) self.assertIsInstance(project_mapping, dict)
@@ -742,7 +744,7 @@ class TestApi(ApiServerUnittest):
def test_testsuite_parser(self): def test_testsuite_parser(self):
testcase_path = "tests/testsuites/create_users.yml" testcase_path = "tests/testsuites/create_users.yml"
tests_mapping = loader.load_tests(testcase_path) tests_mapping = loader.load_cases(testcase_path)
parsed_testcases = parser.parse_tests(tests_mapping) parsed_testcases = parser.parse_tests(tests_mapping)
self.assertEqual(len(parsed_testcases), 2) self.assertEqual(len(parsed_testcases), 2)
@@ -760,7 +762,7 @@ class TestApi(ApiServerUnittest):
def test_testsuite_add_tests(self): def test_testsuite_add_tests(self):
testcase_path = "tests/testsuites/create_users.yml" testcase_path = "tests/testsuites/create_users.yml"
tests_mapping = loader.load_tests(testcase_path) tests_mapping = loader.load_cases(testcase_path)
testcases = parser.parse_tests(tests_mapping) testcases = parser.parse_tests(tests_mapping)
runner = HttpRunner() runner = HttpRunner()
@@ -772,7 +774,7 @@ class TestApi(ApiServerUnittest):
def test_testsuite_run_suite(self): def test_testsuite_run_suite(self):
testcase_path = "tests/testsuites/create_users.yml" testcase_path = "tests/testsuites/create_users.yml"
tests_mapping = loader.load_tests(testcase_path) tests_mapping = loader.load_cases(testcase_path)
testcases = parser.parse_tests(tests_mapping) testcases = parser.parse_tests(tests_mapping)

View File

@@ -8,8 +8,7 @@ from tests.base import ApiServerUnittest, gen_random_string
class TestContext(ApiServerUnittest): class TestContext(ApiServerUnittest):
def setUp(self): def setUp(self):
loader.load_project_tests(os.path.join(os.getcwd(), "tests")) loader.load_project_data(os.path.join(os.getcwd(), "tests"))
project_mapping = loader.project_mapping
self.context = context.SessionContext( self.context = context.SessionContext(
variables={"SECRET_KEY": "DebugTalk"} variables={"SECRET_KEY": "DebugTalk"}
) )

View File

View File

@@ -3,207 +3,21 @@ import os
import unittest import unittest
from httprunner import exceptions, loader from httprunner import exceptions, loader
from httprunner.loader import buildup
class TestFileLoader(unittest.TestCase):
def test_load_yaml_file_file_format_error(self):
yaml_tmp_file = "tests/data/tmp.yml"
# create empty yaml file
with open(yaml_tmp_file, 'w') as f:
f.write("")
with self.assertRaises(exceptions.FileFormatError):
loader.load_yaml_file(yaml_tmp_file)
os.remove(yaml_tmp_file)
# create invalid format yaml file
with open(yaml_tmp_file, 'w') as f:
f.write("abc")
with self.assertRaises(exceptions.FileFormatError):
loader.load_yaml_file(yaml_tmp_file)
os.remove(yaml_tmp_file)
def test_load_json_file_file_format_error(self):
json_tmp_file = "tests/data/tmp.json"
# create empty file
with open(json_tmp_file, 'w') as f:
f.write("")
with self.assertRaises(exceptions.FileFormatError):
loader.load_json_file(json_tmp_file)
os.remove(json_tmp_file)
# create empty json file
with open(json_tmp_file, 'w') as f:
f.write("{}")
with self.assertRaises(exceptions.FileFormatError):
loader.load_json_file(json_tmp_file)
os.remove(json_tmp_file)
# create invalid format json file
with open(json_tmp_file, 'w') as f:
f.write("abc")
with self.assertRaises(exceptions.FileFormatError):
loader.load_json_file(json_tmp_file)
os.remove(json_tmp_file)
def test_load_testcases_bad_filepath(self):
testcase_file_path = os.path.join(os.getcwd(), 'tests/data/demo')
with self.assertRaises(exceptions.FileNotFound):
loader.load_file(testcase_file_path)
def test_load_json_testcases(self):
testcase_file_path = os.path.join(
os.getcwd(), 'tests/data/demo_testcase_hardcode.json')
testcases = loader.load_file(testcase_file_path)
self.assertEqual(len(testcases), 3)
test = testcases[0]["test"]
self.assertIn('name', test)
self.assertIn('request', test)
self.assertIn('url', test['request'])
self.assertIn('method', test['request'])
def test_load_yaml_testcases(self):
testcase_file_path = os.path.join(
os.getcwd(), 'tests/data/demo_testcase_hardcode.yml')
testcases = loader.load_file(testcase_file_path)
self.assertEqual(len(testcases), 3)
test = testcases[0]["test"]
self.assertIn('name', test)
self.assertIn('request', test)
self.assertIn('url', test['request'])
self.assertIn('method', test['request'])
def test_load_csv_file_one_parameter(self):
csv_file_path = os.path.join(
os.getcwd(), 'tests/data/user_agent.csv')
csv_content = loader.load_file(csv_file_path)
self.assertEqual(
csv_content,
[
{'user_agent': 'iOS/10.1'},
{'user_agent': 'iOS/10.2'},
{'user_agent': 'iOS/10.3'}
]
)
def test_load_csv_file_multiple_parameters(self):
csv_file_path = os.path.join(
os.getcwd(), 'tests/data/account.csv')
csv_content = loader.load_file(csv_file_path)
self.assertEqual(
csv_content,
[
{'username': 'test1', 'password': '111111'},
{'username': 'test2', 'password': '222222'},
{'username': 'test3', 'password': '333333'}
]
)
def test_load_folder_files(self):
folder = os.path.join(os.getcwd(), 'tests')
file1 = os.path.join(os.getcwd(), 'tests', 'test_utils.py')
file2 = os.path.join(os.getcwd(), 'tests', 'api', 'reset_all.yml')
files = loader.load_folder_files(folder, recursive=False)
self.assertEqual(files, [])
files = loader.load_folder_files(folder)
self.assertIn(file2, files)
self.assertNotIn(file1, files)
files = loader.load_folder_files("not_existed_foulder", recursive=False)
self.assertEqual([], files)
files = loader.load_folder_files(file2, recursive=False)
self.assertEqual([], files)
def test_load_dot_env_file(self):
dot_env_path = os.path.join(
os.getcwd(), "tests", ".env"
)
env_variables_mapping = loader.load_dot_env_file(dot_env_path)
self.assertIn("PROJECT_KEY", env_variables_mapping)
self.assertEqual(env_variables_mapping["UserName"], "debugtalk")
def test_load_custom_dot_env_file(self):
dot_env_path = os.path.join(
os.getcwd(), "tests", "data", "test.env"
)
env_variables_mapping = loader.load_dot_env_file(dot_env_path)
self.assertIn("PROJECT_KEY", env_variables_mapping)
self.assertEqual(env_variables_mapping["UserName"], "test")
self.assertEqual(env_variables_mapping["content_type"], "application/json; charset=UTF-8")
def test_load_env_path_not_exist(self):
dot_env_path = os.path.join(
os.getcwd(), "tests", "data",
)
env_variables_mapping = loader.load_dot_env_file(dot_env_path)
self.assertEqual(env_variables_mapping, {})
def test_locate_file(self):
with self.assertRaises(exceptions.FileNotFound):
loader.locate_file(os.getcwd(), "debugtalk.py")
with self.assertRaises(exceptions.FileNotFound):
loader.locate_file("", "debugtalk.py")
start_path = os.path.join(os.getcwd(), "tests")
self.assertEqual(
loader.locate_file(start_path, "debugtalk.py"),
os.path.join(
os.getcwd(), "tests/debugtalk.py"
)
)
self.assertEqual(
loader.locate_file("tests/", "debugtalk.py"),
os.path.join(os.getcwd(), "tests", "debugtalk.py")
)
self.assertEqual(
loader.locate_file("tests", "debugtalk.py"),
os.path.join(os.getcwd(), "tests", "debugtalk.py")
)
self.assertEqual(
loader.locate_file("tests/base.py", "debugtalk.py"),
os.path.join(os.getcwd(), "tests", "debugtalk.py")
)
self.assertEqual(
loader.locate_file("tests/data/demo_testcase.yml", "debugtalk.py"),
os.path.join(os.getcwd(), "tests", "debugtalk.py")
)
def test_load_folder_content(self):
path = os.path.join(os.getcwd(), "tests", "api")
items_mapping = loader.load_folder_content(path)
file_path = os.path.join(os.getcwd(), "tests", "api", "reset_all.yml")
self.assertIn(file_path, items_mapping)
self.assertIsInstance(items_mapping[file_path], dict)
class TestModuleLoader(unittest.TestCase): class TestModuleLoader(unittest.TestCase):
def test_filter_module_functions(self): def test_filter_module_functions(self):
module_functions = loader.load_module_functions(loader) module_functions = buildup.load_module_functions(buildup)
self.assertIn("load_module_functions", module_functions) self.assertIn("load_module_functions", module_functions)
self.assertNotIn("is_py3", module_functions) self.assertNotIn("is_py3", module_functions)
def test_load_debugtalk_module(self): def test_load_debugtalk_module(self):
loader.load_project_tests(os.path.join(os.getcwd(), "httprunner")) project_mapping = buildup.load_project_data(os.path.join(os.getcwd(), "httprunner"))
project_mapping = loader.project_mapping
self.assertNotIn("alter_response", project_mapping["functions"]) self.assertNotIn("alter_response", project_mapping["functions"])
loader.load_project_tests(os.path.join(os.getcwd(), "tests")) project_mapping = buildup.load_project_data(os.path.join(os.getcwd(), "tests"))
project_mapping = loader.project_mapping
self.assertIn("alter_response", project_mapping["functions"]) self.assertIn("alter_response", project_mapping["functions"])
is_status_code_200 = project_mapping["functions"]["is_status_code_200"] is_status_code_200 = project_mapping["functions"]["is_status_code_200"]
@@ -211,27 +25,27 @@ class TestModuleLoader(unittest.TestCase):
self.assertFalse(is_status_code_200(500)) self.assertFalse(is_status_code_200(500))
def test_load_debugtalk_py(self): def test_load_debugtalk_py(self):
loader.load_project_tests("tests/data/demo_testcase.yml") project_mapping = buildup.load_project_data("tests/data/demo_testcase.yml")
project_working_directory = loader.project_mapping["PWD"] project_working_directory = project_mapping["PWD"]
debugtalk_functions = loader.project_mapping["functions"] debugtalk_functions = project_mapping["functions"]
self.assertEqual( self.assertEqual(
project_working_directory, project_working_directory,
os.path.join(os.getcwd(), "tests") os.path.join(os.getcwd(), "tests")
) )
self.assertIn("gen_md5", debugtalk_functions) self.assertIn("gen_md5", debugtalk_functions)
loader.load_project_tests("tests/base.py") project_mapping = buildup.load_project_data("tests/base.py")
project_working_directory = loader.project_mapping["PWD"] project_working_directory = project_mapping["PWD"]
debugtalk_functions = loader.project_mapping["functions"] debugtalk_functions = project_mapping["functions"]
self.assertEqual( self.assertEqual(
project_working_directory, project_working_directory,
os.path.join(os.getcwd(), "tests") os.path.join(os.getcwd(), "tests")
) )
self.assertIn("gen_md5", debugtalk_functions) self.assertIn("gen_md5", debugtalk_functions)
loader.load_project_tests("httprunner/__init__.py") project_mapping = buildup.load_project_data("httprunner/__init__.py")
project_working_directory = loader.project_mapping["PWD"] project_working_directory = project_mapping["PWD"]
debugtalk_functions = loader.project_mapping["functions"] debugtalk_functions = project_mapping["functions"]
self.assertEqual( self.assertEqual(
project_working_directory, project_working_directory,
os.getcwd() os.getcwd()
@@ -243,9 +57,8 @@ class TestSuiteLoader(unittest.TestCase):
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
loader.load_project_tests(os.path.join(os.getcwd(), "tests")) cls.project_mapping = buildup.load_project_data(os.path.join(os.getcwd(), "tests"))
cls.project_mapping = loader.project_mapping cls.tests_def_mapping = buildup.tests_def_mapping
cls.tests_def_mapping = loader.tests_def_mapping
def test_load_teststep_api(self): def test_load_teststep_api(self):
raw_test = { raw_test = {
@@ -255,7 +68,7 @@ class TestSuiteLoader(unittest.TestCase):
{"uid": "999"} {"uid": "999"}
] ]
} }
teststep = loader.load_teststep(raw_test) teststep = buildup.load_teststep(raw_test)
self.assertEqual( self.assertEqual(
"create user (override).", "create user (override).",
teststep["name"] teststep["name"]
@@ -273,7 +86,7 @@ class TestSuiteLoader(unittest.TestCase):
{"device_sn": "$device_sn"} {"device_sn": "$device_sn"}
] ]
} }
testcase = loader.load_teststep(raw_test) testcase = buildup.load_teststep(raw_test)
self.assertEqual( self.assertEqual(
"setup and reset all (override).", "setup and reset all (override).",
testcase["name"] testcase["name"]
@@ -284,7 +97,7 @@ class TestSuiteLoader(unittest.TestCase):
self.assertEqual(tests[1]["name"], "reset all users") self.assertEqual(tests[1]["name"], "reset all users")
def test_load_test_file_api(self): def test_load_test_file_api(self):
loaded_content = loader.load_test_file("tests/api/create_user.yml") loaded_content = buildup.load_test_file("tests/api/create_user.yml")
self.assertEqual(loaded_content["type"], "api") self.assertEqual(loaded_content["type"], "api")
self.assertIn("path", loaded_content) self.assertIn("path", loaded_content)
self.assertIn("request", loaded_content) self.assertIn("request", loaded_content)
@@ -292,8 +105,8 @@ class TestSuiteLoader(unittest.TestCase):
def test_load_test_file_testcase(self): def test_load_test_file_testcase(self):
for loaded_content in [ for loaded_content in [
loader.load_test_file("tests/testcases/setup.yml"), buildup.load_test_file("tests/testcases/setup.yml"),
loader.load_test_file("tests/testcases/setup.json") buildup.load_test_file("tests/testcases/setup.json")
]: ]:
self.assertEqual(loaded_content["type"], "testcase") self.assertEqual(loaded_content["type"], "testcase")
self.assertIn("path", loaded_content) self.assertIn("path", loaded_content)
@@ -304,8 +117,8 @@ class TestSuiteLoader(unittest.TestCase):
def test_load_test_file_testcase_v2(self): def test_load_test_file_testcase_v2(self):
for loaded_content in [ for loaded_content in [
loader.load_test_file("tests/testcases/setup.v2.yml"), buildup.load_test_file("tests/testcases/setup.v2.yml"),
loader.load_test_file("tests/testcases/setup.v2.json") buildup.load_test_file("tests/testcases/setup.v2.json")
]: ]:
self.assertEqual(loaded_content["type"], "testcase") self.assertEqual(loaded_content["type"], "testcase")
self.assertIn("path", loaded_content) self.assertIn("path", loaded_content)
@@ -316,8 +129,8 @@ class TestSuiteLoader(unittest.TestCase):
def test_load_test_file_testsuite(self): def test_load_test_file_testsuite(self):
for loaded_content in [ for loaded_content in [
loader.load_test_file("tests/testsuites/create_users.yml"), buildup.load_test_file("tests/testsuites/create_users.yml"),
loader.load_test_file("tests/testsuites/create_users.json") buildup.load_test_file("tests/testsuites/create_users.json")
]: ]:
self.assertEqual(loaded_content["type"], "testsuite") self.assertEqual(loaded_content["type"], "testsuite")
@@ -332,8 +145,8 @@ class TestSuiteLoader(unittest.TestCase):
def test_load_test_file_testsuite_v2(self): def test_load_test_file_testsuite_v2(self):
for loaded_content in [ for loaded_content in [
loader.load_test_file("tests/testsuites/create_users.v2.yml"), buildup.load_test_file("tests/testsuites/create_users.v2.yml"),
loader.load_test_file("tests/testsuites/create_users.v2.json") buildup.load_test_file("tests/testsuites/create_users.v2.json")
]: ]:
self.assertEqual(loaded_content["type"], "testsuite") self.assertEqual(loaded_content["type"], "testsuite")
@@ -349,7 +162,7 @@ class TestSuiteLoader(unittest.TestCase):
def test_load_tests_api_file(self): def test_load_tests_api_file(self):
path = os.path.join( path = os.path.join(
os.getcwd(), 'tests/api/create_user.yml') os.getcwd(), 'tests/api/create_user.yml')
tests_mapping = loader.load_tests(path) tests_mapping = loader.load_cases(path)
project_mapping = tests_mapping["project_mapping"] project_mapping = tests_mapping["project_mapping"]
api_list = tests_mapping["apis"] api_list = tests_mapping["apis"]
self.assertEqual(len(api_list), 1) self.assertEqual(len(api_list), 1)
@@ -359,7 +172,7 @@ class TestSuiteLoader(unittest.TestCase):
# absolute file path # absolute file path
path = os.path.join( path = os.path.join(
os.getcwd(), 'tests/data/demo_testcase_hardcode.json') os.getcwd(), 'tests/data/demo_testcase_hardcode.json')
tests_mapping = loader.load_tests(path) tests_mapping = loader.load_cases(path)
project_mapping = tests_mapping["project_mapping"] project_mapping = tests_mapping["project_mapping"]
testcases_list = tests_mapping["testcases"] testcases_list = tests_mapping["testcases"]
self.assertEqual(len(testcases_list), 1) self.assertEqual(len(testcases_list), 1)
@@ -368,7 +181,7 @@ class TestSuiteLoader(unittest.TestCase):
# relative file path # relative file path
path = 'tests/data/demo_testcase_hardcode.yml' path = 'tests/data/demo_testcase_hardcode.yml'
tests_mapping = loader.load_tests(path) tests_mapping = loader.load_cases(path)
project_mapping = tests_mapping["project_mapping"] project_mapping = tests_mapping["project_mapping"]
testcases_list = tests_mapping["testcases"] testcases_list = tests_mapping["testcases"]
self.assertEqual(len(testcases_list), 1) self.assertEqual(len(testcases_list), 1)
@@ -378,7 +191,7 @@ class TestSuiteLoader(unittest.TestCase):
def test_load_tests_testcase_file_2(self): def test_load_tests_testcase_file_2(self):
testcase_file_path = os.path.join( testcase_file_path = os.path.join(
os.getcwd(), 'tests/data/demo_testcase.yml') os.getcwd(), 'tests/data/demo_testcase.yml')
tests_mapping = loader.load_tests(testcase_file_path) tests_mapping = loader.load_cases(testcase_file_path)
testcases = tests_mapping["testcases"] testcases = tests_mapping["testcases"]
self.assertIsInstance(testcases, list) self.assertIsInstance(testcases, list)
self.assertEqual(testcases[0]["config"]["name"], '123t$var_a') self.assertEqual(testcases[0]["config"]["name"], '123t$var_a')
@@ -398,7 +211,7 @@ class TestSuiteLoader(unittest.TestCase):
def test_load_tests_testcase_file_with_api_ref(self): def test_load_tests_testcase_file_with_api_ref(self):
path = os.path.join( path = os.path.join(
os.getcwd(), 'tests/data/demo_testcase_layer.yml') os.getcwd(), 'tests/data/demo_testcase_layer.yml')
tests_mapping = loader.load_tests(path) tests_mapping = loader.load_cases(path)
project_mapping = tests_mapping["project_mapping"] project_mapping = tests_mapping["project_mapping"]
testcases_list = tests_mapping["testcases"] testcases_list = tests_mapping["testcases"]
self.assertIn('device_sn', testcases_list[0]["config"]["variables"]) self.assertIn('device_sn', testcases_list[0]["config"]["variables"])
@@ -418,7 +231,7 @@ class TestSuiteLoader(unittest.TestCase):
def test_load_tests_testsuite_file_with_testcase_ref(self): def test_load_tests_testsuite_file_with_testcase_ref(self):
path = os.path.join( path = os.path.join(
os.getcwd(), 'tests/testsuites/create_users.yml') os.getcwd(), 'tests/testsuites/create_users.yml')
tests_mapping = loader.load_tests(path) tests_mapping = loader.load_cases(path)
project_mapping = tests_mapping["project_mapping"] project_mapping = tests_mapping["project_mapping"]
testsuites_list = tests_mapping["testsuites"] testsuites_list = tests_mapping["testsuites"]
@@ -443,13 +256,13 @@ class TestSuiteLoader(unittest.TestCase):
def test_load_tests_folder_path(self): def test_load_tests_folder_path(self):
# absolute folder path # absolute folder path
path = os.path.join(os.getcwd(), 'tests/data') path = os.path.join(os.getcwd(), 'tests/data')
tests_mapping = loader.load_tests(path) tests_mapping = loader.load_cases(path)
testcase_list_1 = tests_mapping["testcases"] testcase_list_1 = tests_mapping["testcases"]
self.assertGreater(len(testcase_list_1), 4) self.assertGreater(len(testcase_list_1), 4)
# relative folder path # relative folder path
path = 'tests/data/' path = 'tests/data/'
tests_mapping = loader.load_tests(path) tests_mapping = loader.load_cases(path)
testcase_list_2 = tests_mapping["testcases"] testcase_list_2 = tests_mapping["testcases"]
self.assertEqual(len(testcase_list_1), len(testcase_list_2)) self.assertEqual(len(testcase_list_1), len(testcase_list_2))
@@ -457,22 +270,22 @@ class TestSuiteLoader(unittest.TestCase):
# absolute folder path # absolute folder path
path = os.path.join(os.getcwd(), 'tests/data_not_exist') path = os.path.join(os.getcwd(), 'tests/data_not_exist')
with self.assertRaises(exceptions.FileNotFound): with self.assertRaises(exceptions.FileNotFound):
loader.load_tests(path) loader.load_cases(path)
# relative folder path # relative folder path
path = 'tests/data_not_exist' path = 'tests/data_not_exist'
with self.assertRaises(exceptions.FileNotFound): with self.assertRaises(exceptions.FileNotFound):
loader.load_tests(path) loader.load_cases(path)
def test_load_api_folder(self): def test_load_api_folder(self):
path = os.path.join(os.getcwd(), "tests", "api") path = os.path.join(os.getcwd(), "tests", "api")
api_definition_mapping = loader.load_api_folder(path) api_definition_mapping = buildup.load_api_folder(path)
api_file_path = os.path.join(os.getcwd(), "tests", "api", "get_token.yml") api_file_path = os.path.join(os.getcwd(), "tests", "api", "get_token.yml")
self.assertIn(api_file_path, api_definition_mapping) self.assertIn(api_file_path, api_definition_mapping)
self.assertIn("request", api_definition_mapping[api_file_path]) self.assertIn("request", api_definition_mapping[api_file_path])
def test_load_project_tests(self): def test_load_project_tests(self):
loader.load_project_tests(os.path.join(os.getcwd(), "tests")) buildup.load_project_data(os.path.join(os.getcwd(), "tests"))
api_file_path = os.path.join(os.getcwd(), "tests", "api", "get_token.yml") api_file_path = os.path.join(os.getcwd(), "tests", "api", "get_token.yml")
self.assertIn(api_file_path, self.tests_def_mapping["api"]) self.assertIn(api_file_path, self.tests_def_mapping["api"])
self.assertEqual(self.project_mapping["env"]["PROJECT_KEY"], "ABCDEFGH") self.assertEqual(self.project_mapping["env"]["PROJECT_KEY"], "ABCDEFGH")

View File

@@ -0,0 +1,76 @@
import unittest
from httprunner.loader import check
class TestLoaderCheck(unittest.TestCase):
def test_is_function(self):
func = lambda x: x + 1
self.assertTrue(check.is_function(func))
self.assertTrue(check.is_function(check.is_testcase))
def test_is_testcases(self):
data_structure = "path/to/file"
self.assertFalse(check.is_testcases(data_structure))
data_structure = ["path/to/file1", "path/to/file2"]
self.assertFalse(check.is_testcases(data_structure))
data_structure = {
"project_mapping": {
"PWD": "XXXXX",
"functions": {},
"env": {}
},
"testcases": [
{ # testcase data structure
"config": {
"name": "desc1",
"path": "testcase1_path",
"variables": [], # optional
},
"teststeps": [
# test data structure
{
'name': 'test step desc1',
'variables': [], # optional
'extract': [], # optional
'validate': [],
'request': {}
},
# test_dict2 # another test dict
]
},
# testcase_dict_2 # another testcase dict
]
}
self.assertTrue(check.is_testcases(data_structure))
data_structure = [
{
"name": "desc1",
"config": {},
"api": {},
"testcases": ["testcase11", "testcase12"]
},
{
"name": "desc2",
"config": {},
"api": {},
"testcases": ["testcase21", "testcase22"]
}
]
self.assertTrue(data_structure)
def test_is_variable(self):
var1 = 123
var2 = "abc"
self.assertTrue(check.is_variable(("var1", var1)))
self.assertTrue(check.is_variable(("var2", var2)))
__var = 123
self.assertFalse(check.is_variable(("__var", __var)))
func = lambda x: x + 1
self.assertFalse(check.is_variable(("func", func)))
self.assertFalse(check.is_variable(("unittest", unittest)))

View File

@@ -0,0 +1,159 @@
import os
import unittest
from httprunner import exceptions
from httprunner.loader import load
class TestFileLoader(unittest.TestCase):
def test_load_yaml_file_file_format_error(self):
yaml_tmp_file = "tests/data/tmp.yml"
# create empty yaml file
with open(yaml_tmp_file, 'w') as f:
f.write("")
with self.assertRaises(exceptions.FileFormatError):
load._load_yaml_file(yaml_tmp_file)
os.remove(yaml_tmp_file)
# create invalid format yaml file
with open(yaml_tmp_file, 'w') as f:
f.write("abc")
with self.assertRaises(exceptions.FileFormatError):
load._load_yaml_file(yaml_tmp_file)
os.remove(yaml_tmp_file)
def test_load_json_file_file_format_error(self):
json_tmp_file = "tests/data/tmp.json"
# create empty file
with open(json_tmp_file, 'w') as f:
f.write("")
with self.assertRaises(exceptions.FileFormatError):
load._load_json_file(json_tmp_file)
os.remove(json_tmp_file)
# create empty json file
with open(json_tmp_file, 'w') as f:
f.write("{}")
with self.assertRaises(exceptions.FileFormatError):
load._load_json_file(json_tmp_file)
os.remove(json_tmp_file)
# create invalid format json file
with open(json_tmp_file, 'w') as f:
f.write("abc")
with self.assertRaises(exceptions.FileFormatError):
load._load_json_file(json_tmp_file)
os.remove(json_tmp_file)
def test_load_testcases_bad_filepath(self):
testcase_file_path = os.path.join(os.getcwd(), 'tests/data/demo')
with self.assertRaises(exceptions.FileNotFound):
load.load_file(testcase_file_path)
def test_load_json_testcases(self):
testcase_file_path = os.path.join(
os.getcwd(), 'tests/data/demo_testcase_hardcode.json')
testcases = load.load_file(testcase_file_path)
self.assertEqual(len(testcases), 3)
test = testcases[0]["test"]
self.assertIn('name', test)
self.assertIn('request', test)
self.assertIn('url', test['request'])
self.assertIn('method', test['request'])
def test_load_yaml_testcases(self):
testcase_file_path = os.path.join(
os.getcwd(), 'tests/data/demo_testcase_hardcode.yml')
testcases = load.load_file(testcase_file_path)
self.assertEqual(len(testcases), 3)
test = testcases[0]["test"]
self.assertIn('name', test)
self.assertIn('request', test)
self.assertIn('url', test['request'])
self.assertIn('method', test['request'])
def test_load_csv_file_one_parameter(self):
csv_file_path = os.path.join(
os.getcwd(), 'tests/data/user_agent.csv')
csv_content = load.load_file(csv_file_path)
self.assertEqual(
csv_content,
[
{'user_agent': 'iOS/10.1'},
{'user_agent': 'iOS/10.2'},
{'user_agent': 'iOS/10.3'}
]
)
def test_load_csv_file_multiple_parameters(self):
csv_file_path = os.path.join(
os.getcwd(), 'tests/data/account.csv')
csv_content = load.load_file(csv_file_path)
self.assertEqual(
csv_content,
[
{'username': 'test1', 'password': '111111'},
{'username': 'test2', 'password': '222222'},
{'username': 'test3', 'password': '333333'}
]
)
def test_load_folder_files(self):
folder = os.path.join(os.getcwd(), 'tests')
file1 = os.path.join(os.getcwd(), 'tests', 'test_utils.py')
file2 = os.path.join(os.getcwd(), 'tests', 'api', 'reset_all.yml')
files = load.load_folder_files(folder, recursive=False)
self.assertEqual(files, [])
files = load.load_folder_files(folder)
self.assertIn(file2, files)
self.assertNotIn(file1, files)
files = load.load_folder_files("not_existed_foulder", recursive=False)
self.assertEqual([], files)
files = load.load_folder_files(file2, recursive=False)
self.assertEqual([], files)
def test_load_dot_env_file(self):
dot_env_path = os.path.join(
os.getcwd(), "tests", ".env"
)
env_variables_mapping = load.load_dot_env_file(dot_env_path)
self.assertIn("PROJECT_KEY", env_variables_mapping)
self.assertEqual(env_variables_mapping["UserName"], "debugtalk")
def test_load_custom_dot_env_file(self):
dot_env_path = os.path.join(
os.getcwd(), "tests", "data", "test.env"
)
env_variables_mapping = load.load_dot_env_file(dot_env_path)
self.assertIn("PROJECT_KEY", env_variables_mapping)
self.assertEqual(env_variables_mapping["UserName"], "test")
self.assertEqual(env_variables_mapping["content_type"], "application/json; charset=UTF-8")
def test_load_env_path_not_exist(self):
dot_env_path = os.path.join(
os.getcwd(), "tests", "data",
)
env_variables_mapping = load.load_dot_env_file(dot_env_path)
self.assertEqual(env_variables_mapping, {})
def test_load_folder_content(self):
path = os.path.join(os.getcwd(), "tests", "api")
items_mapping = load.load_folder_content(path)
file_path = os.path.join(os.getcwd(), "tests", "api", "reset_all.yml")
self.assertIn(file_path, items_mapping)
self.assertIsInstance(items_mapping[file_path], dict)

View File

@@ -0,0 +1,40 @@
import os
import unittest
from httprunner import exceptions
from httprunner.loader import locate
class TestLoaderLocate(unittest.TestCase):
def test_locate_file(self):
with self.assertRaises(exceptions.FileNotFound):
locate.locate_file(os.getcwd(), "debugtalk.py")
with self.assertRaises(exceptions.FileNotFound):
locate.locate_file("", "debugtalk.py")
start_path = os.path.join(os.getcwd(), "tests")
self.assertEqual(
locate.locate_file(start_path, "debugtalk.py"),
os.path.join(
os.getcwd(), "tests/debugtalk.py"
)
)
self.assertEqual(
locate.locate_file("tests/", "debugtalk.py"),
os.path.join(os.getcwd(), "tests", "debugtalk.py")
)
self.assertEqual(
locate.locate_file("tests", "debugtalk.py"),
os.path.join(os.getcwd(), "tests", "debugtalk.py")
)
self.assertEqual(
locate.locate_file("tests/base.py", "debugtalk.py"),
os.path.join(os.getcwd(), "tests", "debugtalk.py")
)
self.assertEqual(
locate.locate_file("tests/data/demo_testcase.yml", "debugtalk.py"),
os.path.join(os.getcwd(), "tests", "debugtalk.py")
)

View File

@@ -3,6 +3,7 @@ import time
import unittest import unittest
from httprunner import exceptions, loader, parser from httprunner import exceptions, loader, parser
from httprunner.loader import load
from tests.debugtalk import gen_random_string, sum_two from tests.debugtalk import gen_random_string, sum_two
@@ -806,6 +807,104 @@ class TestParserBasic(unittest.TestCase):
self.assertEqual(parsed_variables["var2"], "abc$123") self.assertEqual(parsed_variables["var2"], "abc$123")
self.assertEqual(parsed_variables["var3"], "abc$$num0") self.assertEqual(parsed_variables["var3"], "abc$$num0")
def test_get_uniform_comparator(self):
self.assertEqual(parser.get_uniform_comparator("eq"), "equals")
self.assertEqual(parser.get_uniform_comparator("=="), "equals")
self.assertEqual(parser.get_uniform_comparator("lt"), "less_than")
self.assertEqual(parser.get_uniform_comparator("le"), "less_than_or_equals")
self.assertEqual(parser.get_uniform_comparator("gt"), "greater_than")
self.assertEqual(parser.get_uniform_comparator("ge"), "greater_than_or_equals")
self.assertEqual(parser.get_uniform_comparator("ne"), "not_equals")
self.assertEqual(parser.get_uniform_comparator("str_eq"), "string_equals")
self.assertEqual(parser.get_uniform_comparator("len_eq"), "length_equals")
self.assertEqual(parser.get_uniform_comparator("count_eq"), "length_equals")
self.assertEqual(parser.get_uniform_comparator("len_gt"), "length_greater_than")
self.assertEqual(parser.get_uniform_comparator("count_gt"), "length_greater_than")
self.assertEqual(parser.get_uniform_comparator("count_greater_than"), "length_greater_than")
self.assertEqual(parser.get_uniform_comparator("len_ge"), "length_greater_than_or_equals")
self.assertEqual(parser.get_uniform_comparator("count_ge"), "length_greater_than_or_equals")
self.assertEqual(parser.get_uniform_comparator("count_greater_than_or_equals"), "length_greater_than_or_equals")
self.assertEqual(parser.get_uniform_comparator("len_lt"), "length_less_than")
self.assertEqual(parser.get_uniform_comparator("count_lt"), "length_less_than")
self.assertEqual(parser.get_uniform_comparator("count_less_than"), "length_less_than")
self.assertEqual(parser.get_uniform_comparator("len_le"), "length_less_than_or_equals")
self.assertEqual(parser.get_uniform_comparator("count_le"), "length_less_than_or_equals")
self.assertEqual(parser.get_uniform_comparator("count_less_than_or_equals"), "length_less_than_or_equals")
def test_parse_validator(self):
_validator = {"check": "status_code", "comparator": "eq", "expect": 201}
self.assertEqual(
parser.uniform_validator(_validator),
{"check": "status_code", "comparator": "equals", "expect": 201}
)
_validator = {'eq': ['status_code', 201]}
self.assertEqual(
parser.uniform_validator(_validator),
{"check": "status_code", "comparator": "equals", "expect": 201}
)
def test_extend_validators(self):
def_validators = [
{'eq': ['v1', 200]},
{"check": "s2", "expect": 16, "comparator": "len_eq"}
]
current_validators = [
{"check": "v1", "expect": 201},
{'len_eq': ['s3', 12]}
]
def_validators = [
parser.uniform_validator(_validator)
for _validator in def_validators
]
ref_validators = [
parser.uniform_validator(_validator)
for _validator in current_validators
]
extended_validators = parser.extend_validators(def_validators, ref_validators)
self.assertIn(
{"check": "v1", "expect": 201, "comparator": "equals"},
extended_validators
)
self.assertIn(
{"check": "s2", "expect": 16, "comparator": "length_equals"},
extended_validators
)
self.assertIn(
{"check": "s3", "expect": 12, "comparator": "length_equals"},
extended_validators
)
def test_extend_validators_with_dict(self):
def_validators = [
{'eq': ["a", {"v": 1}]},
{'eq': [{"b": 1}, 200]}
]
current_validators = [
{'len_eq': ['s3', 12]},
{'eq': [{"b": 1}, 201]}
]
def_validators = [
parser.uniform_validator(_validator)
for _validator in def_validators
]
ref_validators = [
parser.uniform_validator(_validator)
for _validator in current_validators
]
extended_validators = parser.extend_validators(def_validators, ref_validators)
self.assertEqual(len(extended_validators), 3)
self.assertIn({'check': {'b': 1}, 'expect': 201, 'comparator': 'equals'}, extended_validators)
self.assertNotIn({'check': {'b': 1}, 'expect': 200, 'comparator': 'equals'}, extended_validators)
class TestParser(unittest.TestCase): class TestParser(unittest.TestCase):
def test_parse_parameters_raw_list(self): def test_parse_parameters_raw_list(self):
@@ -833,11 +932,11 @@ class TestParser(unittest.TestCase):
dot_env_path = os.path.join( dot_env_path = os.path.join(
os.getcwd(), "tests", ".env" os.getcwd(), "tests", ".env"
) )
loader.load_dot_env_file(dot_env_path) load.load_dot_env_file(dot_env_path)
from tests import debugtalk from tests import debugtalk
cartesian_product_parameters = parser.parse_parameters( cartesian_product_parameters = parser.parse_parameters(
parameters, parameters,
functions_mapping=loader.load_module_functions(debugtalk) functions_mapping=load.load_module_functions(debugtalk)
) )
self.assertIn( self.assertIn(
{ {
@@ -856,7 +955,7 @@ class TestParser(unittest.TestCase):
) )
def test_parse_parameters_parameterize(self): def test_parse_parameters_parameterize(self):
loader.load_project_tests(os.path.join(os.getcwd(), "tests")) loader.load_project_data(os.path.join(os.getcwd(), "tests"))
parameters = [ parameters = [
{"app_version": "${parameterize(data/app_version.csv)}"}, {"app_version": "${parameterize(data/app_version.csv)}"},
{"username-password": "${parameterize(data/account.csv)}"} {"username-password": "${parameterize(data/account.csv)}"}
@@ -868,8 +967,7 @@ class TestParser(unittest.TestCase):
) )
def test_parse_parameters_mix(self): def test_parse_parameters_mix(self):
loader.load_project_tests(os.path.join(os.getcwd(), "tests")) project_mapping = loader.load_project_data(os.path.join(os.getcwd(), "tests"))
project_mapping = loader.project_mapping
parameters = [ parameters = [
{"user_agent": ["iOS/10.1", "iOS/10.2", "iOS/10.3"]}, {"user_agent": ["iOS/10.1", "iOS/10.2", "iOS/10.3"]},
@@ -886,7 +984,7 @@ class TestParser(unittest.TestCase):
def test_parse_tests_testcase(self): def test_parse_tests_testcase(self):
testcase_file_path = os.path.join( testcase_file_path = os.path.join(
os.getcwd(), 'tests/data/demo_testcase.yml') os.getcwd(), 'tests/data/demo_testcase.yml')
tests_mapping = loader.load_tests(testcase_file_path) tests_mapping = loader.load_cases(testcase_file_path)
testcases = tests_mapping["testcases"] testcases = tests_mapping["testcases"]
self.assertEqual( self.assertEqual(
testcases[0]["config"]["variables"]["var_c"], testcases[0]["config"]["variables"]["var_c"],
@@ -1272,13 +1370,13 @@ class TestParser(unittest.TestCase):
parser.eval_lazy_data(content) parser.eval_lazy_data(content)
def test_extend_with_api(self): def test_extend_with_api(self):
loader.load_project_tests(os.path.join(os.getcwd(), "tests")) loader.load_project_data(os.path.join(os.getcwd(), "tests"))
raw_testinfo = { raw_testinfo = {
"name": "get token", "name": "get token",
"base_url": "https://github.com", "base_url": "https://github.com",
"api": "api/get_token.yml", "api": "api/get_token.yml",
} }
api_def_dict = loader.load_teststep(raw_testinfo) api_def_dict = loader.buildup.load_teststep(raw_testinfo)
test_block = { test_block = {
"name": "override block", "name": "override block",
"times": 3, "times": 3,

View File

@@ -1,6 +1,6 @@
import requests import requests
from httprunner import built_in, exceptions, loader, response from httprunner import exceptions, response
from httprunner.compat import basestring, bytes from httprunner.compat import basestring, bytes
from tests.api_server import HTTPBIN_SERVER from tests.api_server import HTTPBIN_SERVER
from tests.base import ApiServerUnittest from tests.base import ApiServerUnittest
@@ -8,9 +8,6 @@ from tests.base import ApiServerUnittest
class TestResponse(ApiServerUnittest): class TestResponse(ApiServerUnittest):
def setUp(self):
self.functions_mapping = loader.load_module_functions(built_in)
def test_parse_response_object_json(self): def test_parse_response_object_json(self):
url = "http://127.0.0.1:5000/api/users" url = "http://127.0.0.1:5000/api/users"
resp = requests.get(url) resp = requests.get(url)

View File

@@ -9,8 +9,7 @@ from tests.base import ApiServerUnittest
class TestRunner(ApiServerUnittest): class TestRunner(ApiServerUnittest):
def setUp(self): def setUp(self):
loader.load_project_tests(os.path.join(os.getcwd(), "tests")) project_mapping = loader.load_project_data(os.path.join(os.getcwd(), "tests"))
project_mapping = loader.project_mapping
self.debugtalk_functions = project_mapping["functions"] self.debugtalk_functions = project_mapping["functions"]
config = { config = {
@@ -35,7 +34,7 @@ class TestRunner(ApiServerUnittest):
] ]
for testcase_file_path in testcase_file_path_list: for testcase_file_path in testcase_file_path_list:
tests_mapping = loader.load_tests(testcase_file_path) tests_mapping = loader.load_cases(testcase_file_path)
parsed_testcases = parser.parse_tests(tests_mapping) parsed_testcases = parser.parse_tests(tests_mapping)
parsed_testcase = parsed_testcases[0] parsed_testcase = parsed_testcases[0]
test_runner = runner.Runner(parsed_testcase["config"]) test_runner = runner.Runner(parsed_testcase["config"])
@@ -289,7 +288,7 @@ class TestRunner(ApiServerUnittest):
def test_bugfix_type_match(self): def test_bugfix_type_match(self):
testcase_file_path = os.path.join( testcase_file_path = os.path.join(
os.getcwd(), 'tests/data/bugfix_type_match.yml') os.getcwd(), 'tests/data/bugfix_type_match.yml')
tests_mapping = loader.load_tests(testcase_file_path) tests_mapping = loader.load_cases(testcase_file_path)
parsed_testcases = parser.parse_tests(tests_mapping) parsed_testcases = parser.parse_tests(tests_mapping)
parsed_testcase = parsed_testcases[0] parsed_testcase = parsed_testcases[0]
test_runner = runner.Runner(parsed_testcase["config"]) test_runner = runner.Runner(parsed_testcase["config"])

View File

@@ -62,8 +62,8 @@ class TestUtils(ApiServerUnittest):
self.assertEqual(result, "L") self.assertEqual(result, "L")
def current_validators(self): def current_validators(self):
from httprunner import built_in from httprunner.builtin import comparators
functions_mapping = loader.load_module_functions(built_in) functions_mapping = loader.load.load_module_functions(comparators)
functions_mapping["equals"](None, None) functions_mapping["equals"](None, None)
functions_mapping["equals"](1, 1) functions_mapping["equals"](1, 1)
@@ -108,15 +108,6 @@ class TestUtils(ApiServerUnittest):
functions_mapping["type_match"]({}, "dict") functions_mapping["type_match"]({}, "dict")
functions_mapping["type_match"]({"a": 1}, "dict") functions_mapping["type_match"]({"a": 1}, "dict")
def test_deep_update_dict(self):
origin_dict = {'a': 1, 'b': {'c': 3, 'd': 4}, 'f': 6, 'h': 123}
override_dict = {'a': 2, 'b': {'c': 33, 'e': 5}, 'g': 7, 'h': None}
updated_dict = utils.deep_update_dict(origin_dict, override_dict)
self.assertEqual(
updated_dict,
{'a': 2, 'b': {'c': 33, 'd': 4, 'e': 5}, 'f': 6, 'g': 7, 'h': 123}
)
def test_handle_config_key_case(self): def test_handle_config_key_case(self):
origin_dict = { origin_dict = {
"Name": "test", "Name": "test",

View File

@@ -4,170 +4,4 @@ from httprunner import validator
class TestValidator(unittest.TestCase): class TestValidator(unittest.TestCase):
pass
def test_is_testcases(self):
data_structure = "path/to/file"
self.assertFalse(validator.is_testcases(data_structure))
data_structure = ["path/to/file1", "path/to/file2"]
self.assertFalse(validator.is_testcases(data_structure))
data_structure = {
"project_mapping": {
"PWD": "XXXXX",
"functions": {},
"env": {}
},
"testcases": [
{ # testcase data structure
"config": {
"name": "desc1",
"path": "testcase1_path",
"variables": [], # optional
},
"teststeps": [
# test data structure
{
'name': 'test step desc1',
'variables': [], # optional
'extract': [], # optional
'validate': [],
'request': {}
},
# test_dict2 # another test dict
]
},
# testcase_dict_2 # another testcase dict
]
}
self.assertTrue(validator.is_testcases(data_structure))
data_structure = [
{
"name": "desc1",
"config": {},
"api": {},
"testcases": ["testcase11", "testcase12"]
},
{
"name": "desc2",
"config": {},
"api": {},
"testcases": ["testcase21", "testcase22"]
}
]
self.assertTrue(data_structure)
def test_is_variable(self):
var1 = 123
var2 = "abc"
self.assertTrue(validator.is_variable(("var1", var1)))
self.assertTrue(validator.is_variable(("var2", var2)))
__var = 123
self.assertFalse(validator.is_variable(("__var", __var)))
func = lambda x: x + 1
self.assertFalse(validator.is_variable(("func", func)))
self.assertFalse(validator.is_variable(("unittest", unittest)))
def test_is_function(self):
func = lambda x: x + 1
self.assertTrue(validator.is_function(func))
self.assertTrue(validator.is_function(validator.is_testcase))
def test_get_uniform_comparator(self):
self.assertEqual(validator.get_uniform_comparator("eq"), "equals")
self.assertEqual(validator.get_uniform_comparator("=="), "equals")
self.assertEqual(validator.get_uniform_comparator("lt"), "less_than")
self.assertEqual(validator.get_uniform_comparator("le"), "less_than_or_equals")
self.assertEqual(validator.get_uniform_comparator("gt"), "greater_than")
self.assertEqual(validator.get_uniform_comparator("ge"), "greater_than_or_equals")
self.assertEqual(validator.get_uniform_comparator("ne"), "not_equals")
self.assertEqual(validator.get_uniform_comparator("str_eq"), "string_equals")
self.assertEqual(validator.get_uniform_comparator("len_eq"), "length_equals")
self.assertEqual(validator.get_uniform_comparator("count_eq"), "length_equals")
self.assertEqual(validator.get_uniform_comparator("len_gt"), "length_greater_than")
self.assertEqual(validator.get_uniform_comparator("count_gt"), "length_greater_than")
self.assertEqual(validator.get_uniform_comparator("count_greater_than"), "length_greater_than")
self.assertEqual(validator.get_uniform_comparator("len_ge"), "length_greater_than_or_equals")
self.assertEqual(validator.get_uniform_comparator("count_ge"), "length_greater_than_or_equals")
self.assertEqual(validator.get_uniform_comparator("count_greater_than_or_equals"), "length_greater_than_or_equals")
self.assertEqual(validator.get_uniform_comparator("len_lt"), "length_less_than")
self.assertEqual(validator.get_uniform_comparator("count_lt"), "length_less_than")
self.assertEqual(validator.get_uniform_comparator("count_less_than"), "length_less_than")
self.assertEqual(validator.get_uniform_comparator("len_le"), "length_less_than_or_equals")
self.assertEqual(validator.get_uniform_comparator("count_le"), "length_less_than_or_equals")
self.assertEqual(validator.get_uniform_comparator("count_less_than_or_equals"), "length_less_than_or_equals")
def test_parse_validator(self):
_validator = {"check": "status_code", "comparator": "eq", "expect": 201}
self.assertEqual(
validator.uniform_validator(_validator),
{"check": "status_code", "comparator": "equals", "expect": 201}
)
_validator = {'eq': ['status_code', 201]}
self.assertEqual(
validator.uniform_validator(_validator),
{"check": "status_code", "comparator": "equals", "expect": 201}
)
def test_extend_validators(self):
def_validators = [
{'eq': ['v1', 200]},
{"check": "s2", "expect": 16, "comparator": "len_eq"}
]
current_validators = [
{"check": "v1", "expect": 201},
{'len_eq': ['s3', 12]}
]
def_validators = [
validator.uniform_validator(_validator)
for _validator in def_validators
]
ref_validators = [
validator.uniform_validator(_validator)
for _validator in current_validators
]
extended_validators = validator.extend_validators(def_validators, ref_validators)
self.assertIn(
{"check": "v1", "expect": 201, "comparator": "equals"},
extended_validators
)
self.assertIn(
{"check": "s2", "expect": 16, "comparator": "length_equals"},
extended_validators
)
self.assertIn(
{"check": "s3", "expect": 12, "comparator": "length_equals"},
extended_validators
)
def test_extend_validators_with_dict(self):
def_validators = [
{'eq': ["a", {"v": 1}]},
{'eq': [{"b": 1}, 200]}
]
current_validators = [
{'len_eq': ['s3', 12]},
{'eq': [{"b": 1}, 201]}
]
def_validators = [
validator.uniform_validator(_validator)
for _validator in def_validators
]
ref_validators = [
validator.uniform_validator(_validator)
for _validator in current_validators
]
extended_validators = validator.extend_validators(def_validators, ref_validators)
self.assertEqual(len(extended_validators), 3)
self.assertIn({'check': {'b': 1}, 'expect': 201, 'comparator': 'equals'}, extended_validators)
self.assertNotIn({'check': {'b': 1}, 'expect': 200, 'comparator': 'equals'}, extended_validators)