refactor: merge v3

This commit is contained in:
debugtalk
2020-04-23 21:10:59 +08:00
parent c21de865f3
commit 826fca83c2
39 changed files with 967 additions and 6021 deletions

View File

@@ -1,5 +1,5 @@
from httprunner.v3.runner import TestCaseRunner
from httprunner.v3.schema import TestsConfig, TestStep
from httprunner.runner import TestCaseRunner
from httprunner.schema import TestsConfig, TestStep
class TestCaseRequestMethodsHardcode(TestCaseRunner):

View File

@@ -26,8 +26,8 @@ teststeps:
validate:
- eq: ["status_code", 200]
- eq: ["body.args.foo1", "session_bar1"]
- eq: ["body.args.sum_v", 3]
- eq: ["body.args.foo2", "session_bar2"]
- eq: ["body.args.sum_v", "3"]
-
name: post raw text
variables:

View File

@@ -1,5 +1,5 @@
from httprunner.v3.runner import TestCaseRunner
from httprunner.v3.schema import TestsConfig, TestStep
from httprunner.runner import TestCaseRunner
from httprunner.schema import TestsConfig, TestStep
from examples.postman_echo import debugtalk

View File

@@ -1,5 +1,5 @@
from httprunner.v3.runner import TestCaseRunner
from httprunner.v3.schema import TestsConfig, TestStep
from httprunner.runner import TestCaseRunner
from httprunner.schema import TestsConfig, TestStep
class TestCaseRequestMethodsWithVariables(TestCaseRunner):

View File

@@ -1,5 +1,5 @@
from httprunner.v3.runner import TestCaseRunner
from httprunner.v3.schema import TestsConfig, TestStep
from httprunner.runner import TestCaseRunner
from httprunner.schema import TestsConfig, TestStep
from examples.postman_echo import debugtalk

View File

@@ -1,5 +1,5 @@
from httprunner.v3.runner import TestCaseRunner
from httprunner.v3.schema import TestsConfig, TestStep
from httprunner.runner import TestCaseRunner
from httprunner.schema import TestsConfig, TestStep
class TestCaseRequestMethodsValidateWithVariables(TestCaseRunner):

View File

@@ -1,11 +1,14 @@
import os
import sys
import unittest
from typing import List
from loguru import logger
from httprunner import (__version__, exceptions, loader, parser,
report, runner, utils)
from httprunner import report, loader, utils, exceptions, __version__
from httprunner.report import gen_html_report
from httprunner.runner import TestCaseRunner
from httprunner.schema import TestsMapping, TestCaseSummary, TestSuiteSummary
class HttpRunner(object):
@@ -23,11 +26,10 @@ class HttpRunner(object):
"""
def __init__(self, failfast=False, save_tests=False, log_level="WARNING", log_file=None):
def __init__(self, save_tests=False, log_level="WARNING", log_file=None):
""" initialize HttpRunner.
Args:
failfast (bool): stop the test run on the first error or failure.
save_tests (bool): save loaded/parsed tests to JSON file.
log_level (str): logging level.
log_file (str): log file path.
@@ -35,7 +37,7 @@ class HttpRunner(object):
"""
self.exception_stage = "initialize HttpRunner()"
kwargs = {
"failfast": failfast,
"failfast": True,
"resultclass": report.HtmlTestResult
}
@@ -51,90 +53,48 @@ class HttpRunner(object):
self._summary = None
self.test_path = None
def _add_tests(self, testcases):
""" initialize testcase with Runner() and add to test suite.
Args:
testcases (list): testcases list.
Returns:
unittest.TestSuite()
"""
def _add_test(test_runner, test_dict):
def _prepare_tests(self, tests: TestsMapping) -> List[unittest.TestSuite]:
def _add_test(test_runner: TestCaseRunner):
""" add test to testcase.
"""
def test(self):
try:
test_runner.run_test(test_dict)
test_runner.run()
except exceptions.MyBaseFailure as ex:
self.fail(str(ex))
finally:
self.step_datas = test_runner.step_datas
if "config" in test_dict:
# run nested testcase
test.__doc__ = test_dict["config"].get("name")
variables = test_dict["config"].get("variables", {})
else:
# run api test
test.__doc__ = test_dict.get("name")
variables = test_dict.get("variables", {})
if isinstance(test.__doc__, parser.LazyString):
try:
parsed_variables = parser.parse_variables_mapping(variables)
test.__doc__ = parser.parse_lazy_data(
test.__doc__, parsed_variables
)
except exceptions.VariableNotFound:
test.__doc__ = str(test.__doc__)
test.__doc__ = test_runner.config.name
return test
test_suite = unittest.TestSuite()
project_meta = tests.project_meta
testcases = tests.testcases
prepared_testcases: List[unittest.TestSuite] = []
for testcase in testcases:
config = testcase.get("config", {})
test_runner = runner.Runner(config)
testcase.config.variables.update(project_meta.variables)
testcase.config.functions.update(project_meta.functions)
test_runner = TestCaseRunner().init(testcase)
TestSequense = type('TestSequense', (unittest.TestCase,), {})
tests = testcase.get("teststeps", [])
for index, test_dict in enumerate(tests):
times = test_dict.get("times", 1)
try:
times = int(times)
except ValueError:
raise exceptions.ParamsError(
f"times should be digit, given: {times}")
for times_index in range(times):
# suppose one testcase should not have more than 9999 steps,
# and one step should not run more than 999 times.
test_method_name = 'test_{:04}_{:03}'.format(index, times_index)
test_method = _add_test(test_runner, test_dict)
setattr(TestSequense, test_method_name, test_method)
test_method = _add_test(test_runner)
setattr(TestSequense, "test_method_name", test_method)
loaded_testcase = self.test_loader.loadTestsFromTestCase(TestSequense)
setattr(loaded_testcase, "config", config)
setattr(loaded_testcase, "teststeps", tests)
setattr(loaded_testcase, "runner", test_runner)
test_suite.addTest(loaded_testcase)
setattr(loaded_testcase, "config", testcase.config)
prepared_testcases.append(loaded_testcase)
return test_suite
def _run_suite(self, test_suite):
""" run tests in test_suite
Args:
test_suite: unittest.TestSuite()
Returns:
list: tests_results
return prepared_testcases
def _run_suite(self, prepared_testcases: List[unittest.TestSuite]) -> List[TestCaseSummary]:
""" run prepared testcases
"""
tests_results = []
tests_results: List[TestCaseSummary] = []
for index, testcase in enumerate(test_suite):
for index, testcase in enumerate(prepared_testcases):
log_handler = None
if self.save_tests:
logs_file_abs_path = utils.prepare_log_file_abs_path(
@@ -142,72 +102,71 @@ class HttpRunner(object):
)
log_handler = logger.add(logs_file_abs_path, level="DEBUG")
testcase_name = testcase.config.get("name")
logger.info(f"Start to run testcase: {testcase_name}")
logger.info(f"Start to run testcase: {testcase.config.name}")
result = self.unittest_runner.run(testcase)
if result.wasSuccessful():
tests_results.append((testcase, result))
else:
tests_results.insert(0, (testcase, result))
testcase_summary = report.get_summary(result)
testcase_summary.in_out.vars = testcase.config.variables
testcase_summary.in_out.out = testcase.config.export
if self.save_tests and log_handler:
logger.remove(log_handler)
return tests_results
def _aggregate(self, tests_results):
""" aggregate results
Args:
tests_results (list): list of (testcase, result)
"""
summary = {
"success": True,
"stat": {
"testcases": {
"total": len(tests_results),
"success": 0,
"fail": 0
},
"teststeps": {}
},
"time": {},
"platform": report.get_platform(),
"details": []
}
for index, tests_result in enumerate(tests_results):
testcase, result = tests_result
testcase_summary = report.get_summary(result)
if testcase_summary["success"]:
summary["stat"]["testcases"]["success"] += 1
else:
summary["stat"]["testcases"]["fail"] += 1
summary["success"] &= testcase_summary["success"]
testcase_summary["name"] = testcase.config.get("name")
testcase_summary["in_out"] = utils.get_testcase_io(testcase)
report.aggregate_stat(summary["stat"]["teststeps"], testcase_summary["stat"])
report.aggregate_stat(summary["time"], testcase_summary["time"])
if self.save_tests:
logs_file_abs_path = utils.prepare_log_file_abs_path(
self.test_path, f"testcase_{index+1}.log"
)
testcase_summary["log"] = logs_file_abs_path
testcase_summary.log = logs_file_abs_path
summary["details"].append(testcase_summary)
if result.wasSuccessful():
tests_results.append(testcase_summary)
else:
tests_results.insert(0, testcase_summary)
return summary
return tests_results
def run_tests(self, tests_mapping):
def _aggregate(self, tests_results: List[TestCaseSummary]) -> TestSuiteSummary:
""" aggregate multiple testcase results
Args:
tests_results (list): list of testcase summary
"""
testsuite_summary = {
"success": True,
"stat": {
"total": len(tests_results),
"success": 0,
"fail": 0
},
"time": {},
"platform": report.get_platform(),
"testcases": []
}
for testcase_summary in tests_results:
if testcase_summary.success:
testsuite_summary["stat"]["success"] += 1
else:
testsuite_summary["stat"]["fail"] += 1
testsuite_summary["success"] &= testcase_summary.success
testsuite_summary["testcases"].append(testcase_summary)
total_duration = tests_results[-1].time.start_at + tests_results[-1].time.duration \
- tests_results[0].time.start_at
testsuite_summary["time"] = {
"start_at": tests_results[0].time.start_at,
"start_at_iso_format": tests_results[0].time.start_at_iso_format,
"duration": total_duration
}
return TestSuiteSummary.parse_obj(testsuite_summary)
def run_tests(self, tests_mapping) -> TestSuiteSummary:
""" run testcase/testsuite data
"""
self.test_path = tests_mapping.get("project_meta", {}).get("test_path", "")
tests = TestsMapping.parse_obj(tests_mapping)
self.test_path = tests.project_meta.test_path
if self.save_tests:
utils.dump_json_file(
@@ -215,34 +174,13 @@ class HttpRunner(object):
utils.prepare_log_file_abs_path(self.test_path, "loaded.json")
)
# parse tests
self.exception_stage = "parse tests"
parsed_testcases = parser.parse_tests(tests_mapping)
parse_failed_testfiles = parser.get_parse_failed_testfiles()
if parse_failed_testfiles:
logger.warning("parse failures occurred ...")
utils.dump_json_file(
parse_failed_testfiles,
utils.prepare_log_file_abs_path(self.test_path, "parse_failed.json")
)
# prepare testcases
self.exception_stage = "prepare testcases"
prepared_testcases = self._prepare_tests(tests)
if len(parsed_testcases) == 0:
logger.error("failed to parse all cases, abort.")
raise exceptions.ParseTestsFailure
if self.save_tests:
utils.dump_json_file(
parsed_testcases,
utils.prepare_log_file_abs_path(self.test_path, "parsed.json")
)
# add tests to test suite
self.exception_stage = "add tests to test suite"
test_suite = self._add_tests(parsed_testcases)
# run test suite
self.exception_stage = "run test suite"
results = self._run_suite(test_suite)
# run prepared testcases
self.exception_stage = "run prepared testcases"
results = self._run_suite(prepared_testcases)
# aggregate results
self.exception_stage = "aggregate results"
@@ -254,7 +192,7 @@ class HttpRunner(object):
if self.save_tests:
utils.dump_json_file(
self._summary,
self._summary.dict(),
utils.prepare_log_file_abs_path(self.test_path, "summary.json")
)
# save variables and export data
@@ -291,11 +229,11 @@ class HttpRunner(object):
return None
return [
summary["in_out"]
for summary in self._summary["details"]
testcase_summary.in_out.dict()
for testcase_summary in self._summary.testcases
]
def run_path(self, path, dot_env_path=None, mapping=None):
def run_path(self, path, dot_env_path=None, mapping=None) -> TestSuiteSummary:
""" run testcase/testsuite file or folder.
Args:
@@ -308,6 +246,7 @@ class HttpRunner(object):
"""
# load tests
logger.info(f"HttpRunner version: {__version__}")
self.exception_stage = "load tests"
tests_mapping = loader.load_cases(path, dot_env_path)
@@ -330,12 +269,15 @@ class HttpRunner(object):
dict: result summary
"""
logger.info(f"HttpRunner version: {__version__}")
if loader.is_test_path(path_or_tests):
return self.run_path(path_or_tests, dot_env_path, mapping)
elif loader.is_test_content(path_or_tests):
project_working_directory = path_or_tests.get("project_meta", {}).get("PWD", os.getcwd())
loader.init_pwd(project_working_directory)
return self.run_tests(path_or_tests)
else:
raise exceptions.ParamsError(f"Invalid testcase path or testcases: {path_or_tests}")
project_working_directory = path_or_tests.get("project_meta", {}).get("PWD", os.getcwd())
loader.init_pwd(project_working_directory)
return self.run_tests(path_or_tests)
def gen_html_report(self, report_template=None, report_dir=None, report_file=None):
if not self._summary:
return None
return gen_html_report(self._summary, report_template, report_dir, report_file)

View File

@@ -1,6 +1,6 @@
import unittest
from httprunner.v3.api import HttpRunner
from httprunner.api import HttpRunner
class TestHttpRunner(unittest.TestCase):

View File

@@ -20,7 +20,7 @@ $ pip install locustio
from loguru import logger
from httprunner import __description__, __version__
from httprunner.v3.api import HttpRunner
from httprunner.api import HttpRunner
from httprunner.ext.har2case import init_har2case_parser, main_har2case
from httprunner.ext.scaffold import init_parser_scaffold, main_scaffold
from httprunner.ext.locusts import init_parser_locusts, main_locusts

View File

@@ -9,7 +9,7 @@ from requests.exceptions import (InvalidSchema, InvalidURL, MissingSchema,
from httprunner import response
from httprunner.utils import lower_dict_keys, omit_long_data
from httprunner.v3.schema import SessionData
from httprunner.schema import SessionData
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

View File

@@ -1,66 +0,0 @@
import copy
from httprunner import parser, utils
class SessionContext(object):
    """ HttpRunner session, store runtime variables.

    Examples:
        >>> variables = {"SECRET_KEY": "DebugTalk"}
        >>> context = SessionContext(variables)

        Equivalent to:
        >>> context = SessionContext()
        >>> context.update_session_variables(variables)

    """
    def __init__(self, variables=None):
        # session-level variables are valid for the whole running session
        variables_mapping = utils.ensure_mapping_format(variables or {})
        self.session_variables_mapping = parser.parse_variables_mapping(variables_mapping)
        # test-level variables are only valid within the current test (api) run
        self.test_variables_mapping = {}
        self.init_test_variables()

    def init_test_variables(self, variables_mapping=None):
        """ init test variables, called when each test(api) starts.
            variables_mapping will be evaluated first.

        Args:
            variables_mapping (dict)
                {
                    "random": "${gen_random_string(5)}",
                    "authorization": "${gen_md5($TOKEN, $data, $random)}",
                    "data": '{"name": "user", "password": "123456"}',
                    "TOKEN": "debugtalk",
                }

        """
        # deepcopy so the caller's mapping is never mutated by the update below
        variables_mapping = copy.deepcopy(variables_mapping or {})
        variables_mapping = utils.ensure_mapping_format(variables_mapping)
        # session variables override incoming test variables before parsing
        variables_mapping.update(self.session_variables_mapping)
        parsed_variables_mapping = parser.parse_variables_mapping(variables_mapping)
        self.test_variables_mapping = {}
        # priority: extracted variable > teststep variable
        self.test_variables_mapping.update(parsed_variables_mapping)
        self.test_variables_mapping.update(self.session_variables_mapping)

    def update_test_variables(self, variable_name, variable_value):
        """ update test variables, these variables are only valid in the current test.
        """
        self.test_variables_mapping[variable_name] = variable_value

    def update_session_variables(self, variables_mapping):
        """ update session with extracted variables mapping.
            these variables are valid in the whole running session.
        """
        variables_mapping = utils.ensure_mapping_format(variables_mapping)
        self.session_variables_mapping.update(variables_mapping)
        # keep the test-level view in sync with the updated session variables
        self.test_variables_mapping.update(self.session_variables_mapping)

    def eval_content(self, content):
        """ evaluate content recursively, take effect on each variable and function in content.
            content may be in any data structure, include dict, list, tuple, number, string, etc.
        """
        return parser.parse_lazy_data(content, self.test_variables_mapping)

View File

@@ -8,16 +8,14 @@ HttpRunner loader
"""
from httprunner.loader.check import is_test_path, is_test_content, JsonSchemaChecker
from httprunner.loader.buildup import load_cases, load_project_data
from httprunner.loader.check import is_test_path
from httprunner.loader.load import load_csv_file, load_builtin_functions
from httprunner.loader.locate import get_project_working_directory as get_pwd, \
init_project_working_directory as init_pwd
from httprunner.loader.load import load_csv_file, load_builtin_functions
from httprunner.loader.buildup import load_cases, load_project_data
__all__ = [
"is_test_path",
"is_test_content",
"JsonSchemaChecker",
"get_pwd",
"init_pwd",
"load_csv_file",

View File

@@ -4,7 +4,6 @@ import os
from loguru import logger
from httprunner import exceptions, utils
from httprunner.loader.check import JsonSchemaChecker
from httprunner.loader.load import load_module_functions, load_file, load_dot_env_file, \
load_folder_files
from httprunner.loader.locate import init_project_working_directory, get_project_working_directory
@@ -174,7 +173,6 @@ def load_testcase(raw_testcase):
}
"""
JsonSchemaChecker.validate_testcase_format(raw_testcase)
raw_teststeps = raw_testcase.pop("teststeps")
raw_testcase["teststeps"] = [
load_teststep(teststep)
@@ -220,7 +218,6 @@ def load_testsuite(raw_testsuite):
# invalid format
raise exceptions.FileFormatError("Invalid testsuite format!")
JsonSchemaChecker.validate_testsuite_format(raw_testsuite)
raw_testsuite["testcases"] = {}
for raw_testcase in raw_testcases:
__extend_with_testcase_ref(raw_testcase)
@@ -284,7 +281,6 @@ def load_test_file(path: str) -> dict:
elif "request" in raw_content:
# file_type: api
JsonSchemaChecker.validate_api_format(raw_content)
loaded_content = raw_content
loaded_content["path"] = path
loaded_content["type"] = "api"

View File

@@ -1,44 +1,5 @@
import os
from loguru import logger
from pydantic import ValidationError
from httprunner import exceptions
from httprunner.schema import Api, TestCase, TestSuite
class JsonSchemaChecker(object):
    """ Validate loaded api/testcase/testsuite content against pydantic schema models.

    Each public method raises exceptions.FileFormatError when the content
    does not match the corresponding schema model.
    """

    @staticmethod
    def _validate_with_model(model, content):
        # shared routine for all three validators: parse with the pydantic
        # model, log the validation error, and re-raise as FileFormatError
        try:
            model.parse_obj(content)
        except ValidationError as ex:
            logger.error(ex)
            raise exceptions.FileFormatError(ex)

    @staticmethod
    def validate_api_format(content):
        """ check api format if valid
        """
        JsonSchemaChecker._validate_with_model(Api, content)

    @staticmethod
    def validate_testcase_format(content):
        """ check testcase format if valid
        """
        JsonSchemaChecker._validate_with_model(TestCase, content)

    @staticmethod
    def validate_testsuite_format(content):
        """ check testsuite format if valid
        """
        JsonSchemaChecker._validate_with_model(TestSuite, content)
def is_test_path(path):
""" check if path is valid json/yaml file path or a existed directory.
@@ -80,77 +41,3 @@ def is_test_path(path):
else:
# path is neither a folder nor a file, maybe a symbol link or something else
return False
def is_test_content(data_structure):
    """ check if data_structure is apis/testcases/testsuites.

    Args:
        data_structure (dict): should include keys, apis or testcases or testsuites

    Returns:
        bool: True if data_structure is valid apis/testcases/testsuites, otherwise False.

    """
    def _all_items_valid(items, validate_format):
        # helper: items must be a list and every element must pass the given
        # JsonSchemaChecker format validation (an empty list counts as valid,
        # matching the previous behavior)
        if not isinstance(items, list):
            return False

        for item in items:
            try:
                validate_format(item)
            except exceptions.FileFormatError:
                return False

        return True

    if not isinstance(data_structure, dict):
        return False

    if "apis" in data_structure:
        # maybe a group of api content
        return _all_items_valid(data_structure["apis"], JsonSchemaChecker.validate_api_format)
    elif "testcases" in data_structure:
        # maybe a testsuite, containing a group of testcases
        return _all_items_valid(data_structure["testcases"], JsonSchemaChecker.validate_testcase_format)
    elif "testsuites" in data_structure:
        # maybe a group of testsuites
        return _all_items_valid(data_structure["testsuites"], JsonSchemaChecker.validate_testsuite_format)
    else:
        return False

View File

@@ -1,45 +0,0 @@
import unittest
from httprunner.loader import check
class TestLoaderCheck(unittest.TestCase):
    """ Unit tests for httprunner.loader.check content detection. """

    def test_is_testcases(self):
        # a plain path string is not test content
        data_structure = "path/to/file"
        self.assertFalse(check.is_test_content(data_structure))
        # neither is a list of path strings
        data_structure = ["path/to/file1", "path/to/file2"]
        self.assertFalse(check.is_test_content(data_structure))

        # a mapping with a "testcases" list of schema-valid testcase dicts
        # is recognized as test content
        data_structure = {
            "project_meta": {
                "PWD": "XXXXX",
                "functions": {},
                "env": {}
            },
            "testcases": [
                {   # testcase data structure
                    "config": {
                        "name": "desc1",
                        "path": "testcase1_path",
                        "variables": [],  # optional
                    },
                    "teststeps": [
                        # test data structure
                        {
                            'name': 'test step desc1',
                            'variables': [],  # optional
                            'extract': {},  # optional
                            'validate': [],
                            'request': {
                                "method": "GET",
                                "url": "https://docs.httprunner.org"
                            }
                        },
                        # test_dict2 # another test dict
                    ]
                },
                # testcase_dict_2 # another testcase dict
            ]
        }
        self.assertTrue(check.is_test_content(data_structure))

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -5,7 +5,7 @@ from jinja2 import Template
from loguru import logger
from httprunner.exceptions import SummaryEmpty
from httprunner.v3.schema import TestSuiteSummary
from httprunner.schema import TestSuiteSummary
def gen_html_report(testsuite_summary: TestSuiteSummary, report_template=None, report_dir=None, report_file=None):

View File

@@ -1,12 +1,11 @@
import json
from base64 import b64encode
from collections import Iterable
from typing import List
from jinja2 import escape
from requests.cookies import RequestsCookieJar
from httprunner.v3.schema import TestSuiteSummary, SessionData
from httprunner.schema import TestSuiteSummary
def dumps_json(value):

View File

@@ -3,7 +3,7 @@ from datetime import datetime
from httprunner import __version__
from httprunner.report.html.result import HtmlTestResult
from httprunner.v3.schema import TestCaseSummary, TestCaseTime, TestCaseInOut
from httprunner.schema import TestCaseSummary, TestCaseTime, TestCaseInOut
def get_platform():

View File

@@ -1,18 +1,105 @@
import json
import re
from collections import OrderedDict
from typing import Dict, Text, Any, NoReturn
import jsonpath
import jmespath
import requests
from loguru import logger
from httprunner import exceptions, utils
from httprunner.exceptions import ValidationFailure, ParamsError
from httprunner.parser import parse_data, parse_string_value, get_mapping_function
from httprunner.schema import VariablesMapping, Validators, FunctionsMapping
text_extractor_regexp_compile = re.compile(r".*\(.*\).*")
def get_uniform_comparator(comparator: Text):
    """ convert comparator alias to uniform name

    Args:
        comparator: comparator alias, e.g. "eq", "==", "len_eq", "count_gt"

    Returns:
        str: uniform comparator name; an unknown alias is returned unchanged.

    """
    # alias table replaces the previous long if/elif chain; semantics unchanged
    alias_groups = {
        "equals": ("eq", "equals", "==", "is"),
        "less_than": ("lt", "less_than"),
        "less_than_or_equals": ("le", "less_than_or_equals"),
        "greater_than": ("gt", "greater_than"),
        "greater_than_or_equals": ("ge", "greater_than_or_equals"),
        "not_equals": ("ne", "not_equals"),
        "string_equals": ("str_eq", "string_equals"),
        "length_equals": ("len_eq", "length_equals", "count_eq"),
        "length_greater_than": (
            "len_gt", "count_gt", "length_greater_than", "count_greater_than"),
        "length_greater_than_or_equals": (
            "len_ge", "count_ge", "length_greater_than_or_equals",
            "count_greater_than_or_equals"),
        "length_less_than": (
            "len_lt", "count_lt", "length_less_than", "count_less_than"),
        "length_less_than_or_equals": (
            "len_le", "count_le", "length_less_than_or_equals",
            "count_less_than_or_equals"),
    }
    for uniform_name, aliases in alias_groups.items():
        if comparator in aliases:
            return uniform_name

    # not an alias: assume it is already a uniform comparator name
    return comparator
def uniform_validator(validator):
    """ unify validator

    Args:
        validator (dict): validator maybe in two formats:

            format1: this is kept for compatibility with the previous versions.
                {"check": "status_code", "assert": "eq", "expect": 201}
                {"check": "$resp_body_success", "assert": "eq", "expect": True}

            format2: recommended new version, {assert: [check_item, expected_value]}
                {'eq': ['status_code', 201]}
                {'eq': ['$resp_body_success', True]}

    Returns
        dict: validator info

            {
                "check": "status_code",
                "expect": 201,
                "assert": "equals"
            }

    Raises:
        ParamsError: if validator matches neither format.

    """
    if not isinstance(validator, dict):
        raise ParamsError(f"invalid validator: {validator}")

    if "check" in validator and "expect" in validator:
        # format1
        check_item = validator["check"]
        expect_value = validator["expect"]
        # bugfix: the documented format1 key is "assert"; previously only the
        # legacy "comparator" key was read, so an explicit "assert" value was
        # silently ignored and "eq" was always used. Read "assert" first and
        # keep "comparator" as a backward-compatible fallback.
        comparator = validator.get("assert", validator.get("comparator", "eq"))

    elif len(validator) == 1:
        # format2
        comparator = list(validator.keys())[0]
        compare_values = validator[comparator]

        if not isinstance(compare_values, list) or len(compare_values) != 2:
            raise ParamsError(f"invalid validator: {validator}")

        check_item, expect_value = compare_values

    else:
        raise ParamsError(f"invalid validator: {validator}")

    # uniform comparator, e.g. lt => less_than, eq => equals
    assert_method = get_uniform_comparator(comparator)

    return {
        "check": check_item,
        "expect": expect_value,
        "assert": assert_method
    }
class ResponseObject(object):
def __init__(self, resp_obj):
def __init__(self, resp_obj: requests.Response):
""" initialize with a requests.Response object
Args:
@@ -20,6 +107,12 @@ class ResponseObject(object):
"""
self.resp_obj = resp_obj
self.resp_obj_meta = {
"status_code": resp_obj.status_code,
"headers": resp_obj.headers,
"body": resp_obj.json()
}
self.validation_results: Dict = {}
def __getattr__(self, key):
try:
@@ -35,269 +128,82 @@ class ResponseObject(object):
except AttributeError:
err_msg = f"ResponseObject does not have attribute: {key}"
logger.error(err_msg)
raise exceptions.ParamsError(err_msg)
raise ParamsError(err_msg)
def _extract_field_with_jsonpath(self, field: str) -> list:
    """ extract field from response content with jsonpath expression.
        JSONPath Docs: https://goessner.net/articles/JsonPath/

    Args:
        field: jsonpath expression, e.g. $.code, $..items.*.id

    Returns:
        A list that extracted from json response example. 1) [200] 2) [1, 2]

    Raises:
        exceptions.ExtractFailure: If no content matched with jsonpath expression.

    Examples:
        For example, response body like below:
        {
            "code": 200,
            "data": {
                "items": [{
                        "id": 1,
                        "name": "Bob"
                    },
                    {
                        "id": 2,
                        "name": "James"
                    }
                ]
            },
            "message": "success"
        }

        >>> _extract_field_with_jsonpath("$.code")
        [200]
        >>> _extract_field_with_jsonpath("$..items.*.id")
        [1, 2]

    """
    try:
        # self.json may raise JSONDecodeError via __getattr__ on the response
        json_body = self.json
        # a falsy body or a falsy jsonpath result both count as extraction
        # failure: jsonpath.jsonpath returns False when nothing matches
        assert json_body
        result = jsonpath.jsonpath(json_body, field)
        assert result
        return result
    except (AssertionError, exceptions.JSONDecodeError):
        err_msg = f"Failed to extract data with jsonpath! => {field}\n"
        err_msg += f"response body: {self.text}\n"
        logger.error(err_msg)
        raise exceptions.ExtractFailure(err_msg)
def _extract_field_with_regex(self, field):
    """ extract field from response content with regex.
        requests.Response body could be json or html text.

    Args:
        field (str): regex string that matched r".*\(.*\).*"

    Returns:
        str: content of the first capturing group.

    Raises:
        exceptions.ExtractFailure: If no content matched with regex.

    Examples:
        >>> # self.text: "LB123abcRB789"
        >>> field = "LB[\d]*(.*)RB[\d]*"
        >>> _extract_field_with_regex(field)
        abc

    """
    matched = re.search(field, self.text)
    # guard clause: a successful match returns the first group immediately
    if matched:
        return matched.group(1)

    err_msg = f"Failed to extract data with regex! => {field}\n"
    err_msg += f"response body: {self.text}\n"
    logger.error(err_msg)
    raise exceptions.ExtractFailure(err_msg)
def _extract_field_with_delimiter(self, field):
    """ extract value from response with a dot-delimited query.
        response content could be json or html text.

    Args:
        field (str): string joined by delimiter.
        e.g.
            "status_code"
            "headers"
            "cookies"
            "content"
            "headers.content-type"
            "content.person.name.first_name"

    Raises:
        exceptions.ParamsError: if the query targets an invalid attribute.
        exceptions.ExtractFailure: if the sub-query cannot be resolved.
        exceptions.TeardownHooksFailure: if a teardown-hook-set attribute
            cannot be resolved.

    """
    # string.split(sep=None, maxsplit=1) -> list of strings
    # e.g. "content.person.name" => ["content", "person.name"]
    try:
        top_query, sub_query = field.split('.', 1)
    except ValueError:
        top_query = field
        sub_query = None

    # status_code
    if top_query in ["status_code", "encoding", "ok", "reason", "url"]:
        if sub_query:
            # status_code.XX — scalar attributes take no sub-query
            err_msg = f"Failed to extract: {field}\n"
            logger.error(err_msg)
            raise exceptions.ParamsError(err_msg)

        return getattr(self, top_query)

    # cookies
    elif top_query == "cookies":
        cookies = self.cookies
        if not sub_query:
            # extract cookies
            return cookies

        try:
            return cookies[sub_query]
        except KeyError:
            err_msg = f"Failed to extract cookie! => {field}\n"
            err_msg += f"response cookies: {cookies}\n"
            logger.error(err_msg)
            raise exceptions.ExtractFailure(err_msg)

    # elapsed
    elif top_query == "elapsed":
        available_attributes = u"available attributes: days, seconds, microseconds, total_seconds"
        if not sub_query:
            err_msg = "elapsed is datetime.timedelta instance, attribute should also be specified!\n"
            err_msg += available_attributes
            logger.error(err_msg)
            raise exceptions.ParamsError(err_msg)
        elif sub_query in ["days", "seconds", "microseconds"]:
            return getattr(self.elapsed, sub_query)
        elif sub_query == "total_seconds":
            return self.elapsed.total_seconds()
        else:
            err_msg = f"{sub_query} is not valid datetime.timedelta attribute.\n"
            err_msg += available_attributes
            logger.error(err_msg)
            raise exceptions.ParamsError(err_msg)

    # headers
    elif top_query == "headers":
        headers = self.headers
        if not sub_query:
            # extract headers
            return headers

        try:
            return headers[sub_query]
        except KeyError:
            err_msg = f"Failed to extract header! => {field}\n"
            err_msg += f"response headers: {headers}\n"
            logger.error(err_msg)
            raise exceptions.ExtractFailure(err_msg)

    # response body
    elif top_query in ["body", "content", "text", "json"]:
        try:
            body = self.json
        except json.JSONDecodeError:
            # non-JSON body: fall back to raw text
            body = self.text

        if not sub_query:
            # extract response body
            return body

        if isinstance(body, (dict, list)):
            # content = {"xxx": 123}, content.xxx
            return utils.query_json(body, sub_query)
        elif sub_query.isdigit():
            # content = "abcdefg", content.3 => d
            return utils.query_json(body, sub_query)
        else:
            # content = "<html>abcdefg</html>", content.xxx
            err_msg = f"Failed to extract attribute from response body! => {field}\n"
            err_msg += f"response body: {body}\n"
            logger.error(err_msg)
            raise exceptions.ExtractFailure(err_msg)

    # new set response attributes in teardown_hooks
    elif top_query in self.__dict__:
        attributes = self.__dict__[top_query]

        if not sub_query:
            # extract response attributes
            return attributes

        if isinstance(attributes, (dict, list)):
            # attributes = {"xxx": 123}, content.xxx
            return utils.query_json(attributes, sub_query)
        elif sub_query.isdigit():
            # attributes = "abcdefg", attributes.3 => d
            return utils.query_json(attributes, sub_query)
        else:
            # content = "attributes.new_attribute_not_exist"
            # bugfix: corrected typo "cumstom" -> "custom" in the error message
            err_msg = f"Failed to extract custom set attribute from teardown hooks! => {field}\n"
            err_msg += f"response set attributes: {attributes}\n"
            logger.error(err_msg)
            raise exceptions.TeardownHooksFailure(err_msg)

    # others
    else:
        err_msg = f"Failed to extract attribute from response! => {field}\n"
        err_msg += "available response attributes: status_code, cookies, elapsed, headers, content, " \
                   "text, json, encoding, ok, reason, url.\n\n"
        err_msg += "If you want to set attribute in teardown_hooks, take the following example as reference:\n"
        err_msg += "response.new_attribute = 'new_attribute_value'\n"
        logger.error(err_msg)
        raise exceptions.ParamsError(err_msg)
def extract_field(self, field):
    """ extract value from requests.Response.

    Dispatches on the extractor syntax:
    - "$..." => jsonpath expression
    - a string containing "(...)" => regex with one capturing group
    - otherwise => dot-delimited attribute query

    """
    if not isinstance(field, str):
        err_msg = f"Invalid extractor! => {field}\n"
        logger.error(err_msg)
        raise exceptions.ParamsError(err_msg)

    if field.startswith("$"):
        value = self._extract_field_with_jsonpath(field)
    elif text_extractor_regexp_compile.match(field):
        value = self._extract_field_with_regex(field)
    else:
        value = self._extract_field_with_delimiter(field)

    logger.debug(f"extract: {field}\t=> {value}")
    return value
def extract_response(self, extractors):
""" extract value from requests.Response and store in OrderedDict.
Args:
extractors (list):
[
{"resp_status_code": "status_code"},
{"resp_headers_content_type": "headers.content-type"},
{"resp_content": "content"},
{"resp_content_person_first_name": "content.person.name.first_name"}
]
Returns:
OrderDict: variable binds ordered dict
"""
def extract(self, extractors: Dict[Text, Text]) -> Dict[Text, Any]:
if not extractors:
return {}
logger.debug("start to extract from response object.")
extracted_variables_mapping = OrderedDict()
extract_binds_order_dict = utils.ensure_mapping_format(extractors)
extract_mapping = {}
for key, field in extractors.items():
field_value = jmespath.search(field, self.resp_obj_meta)
extract_mapping[key] = field_value
for key, field in extract_binds_order_dict.items():
extracted_variables_mapping[key] = self.extract_field(field)
logger.info(f"extract mapping: {extract_mapping}")
return extract_mapping
return extracted_variables_mapping
def validate(self,
             validators: Validators,
             variables_mapping: VariablesMapping = None,
             functions_mapping: FunctionsMapping = None) -> NoReturn:
    """Run all validators against the response and record their results.

    Results are accumulated in self.validation_results["validate_extractor"];
    every validator is evaluated even after a failure, and a single
    ValidationFailure aggregating all failure messages is raised at the end.

    Args:
        validators: list of validator items (normalized by uniform_validator).
        variables_mapping: variables used to parse the expected values.
        functions_mapping: functions used to resolve comparator names and
            to parse expected values.

    Raises:
        ValidationFailure: if any validator failed.
    """
    # reset results for this response before validating
    self.validation_results = {}
    if not validators:
        return

    validate_pass = True
    failures = []

    for v in validators:
        if "validate_extractor" not in self.validation_results:
            self.validation_results["validate_extractor"] = []

        # normalize validator into {"check", "assert", "expect"} form
        u_validator = uniform_validator(v)

        # check item: jmespath expression queried against the response meta
        check_item = u_validator["check"]
        check_value = jmespath.search(check_item, self.resp_obj_meta)
        check_value = parse_string_value(check_value)

        # comparator: resolve the assert method name to a callable
        assert_method = u_validator["assert"]
        assert_func = get_mapping_function(assert_method, functions_mapping)

        # expect item
        expect_item = u_validator["expect"]
        # parse expected value with config/teststep/extracted variables
        expect_value = parse_data(expect_item, variables_mapping, functions_mapping)

        validate_msg = f"assert {check_item} {assert_method} {expect_value}({type(expect_value).__name__})"

        validator_dict = {
            "comparator": assert_method,
            "check": check_item,
            "check_value": check_value,
            "expect": expect_item,
            "expect_value": expect_value
        }

        try:
            assert_func(check_value, expect_value)
            validate_msg += "\t==> pass"
            logger.info(validate_msg)
            validator_dict["check_result"] = "pass"
        except AssertionError:
            # record the failure but keep evaluating the remaining validators
            validate_pass = False
            validator_dict["check_result"] = "fail"
            validate_msg += "\t==> fail"
            validate_msg += f"\n" \
                f"check_item: {check_item}\n" \
                f"check_value: {check_value}({type(check_value).__name__})\n" \
                f"assert_method: {assert_method}\n" \
                f"expect_value: {expect_value}({type(expect_value).__name__})"
            logger.error(validate_msg)
            failures.append(validate_msg)

        self.validation_results["validate_extractor"].append(validator_dict)

    if not validate_pass:
        # aggregate all failure messages into one exception
        failures_string = "\n".join([failure for failure in failures])
        raise ValidationFailure(failures_string)

View File

@@ -1,261 +1,68 @@
from enum import Enum
from unittest.case import SkipTest
from typing import List, Dict
from loguru import logger
from httprunner import exceptions, response, utils
from httprunner import utils
from httprunner.client import HttpSession
from httprunner.context import SessionContext
from httprunner.validator import Validator
from httprunner.exceptions import ValidationFailure
from httprunner.parser import build_url, parse_data, parse_variables_mapping
from httprunner.response import ResponseObject
from httprunner.schema import TestsConfig, TestStep, VariablesMapping, TestCase, SessionData
class HookTypeEnum(Enum):
    """Hook phase: SETUP hooks run before a test, TEARDOWN hooks after."""
    SETUP = 1
    TEARDOWN = 2
class TestCaseRunner(object):
config: TestsConfig = {}
teststeps: List[TestStep] = []
session: HttpSession = None
step_datas: List[SessionData] = []
validation_results: Dict = {}
class Runner(object):
""" Running testcases.
def init(self, testcase: TestCase) -> "TestCaseRunner":
    """Bind a parsed testcase (config + teststeps) to this runner; fluent."""
    self.config = testcase.config
    self.teststeps = testcase.teststeps
    return self
Examples:
>>> tests_mapping = {
"project_meta": {
"functions": {}
},
"testcases": [
{
"config": {
"name": "XXXX",
"base_url": "http://127.0.0.1",
"verify": False
},
"teststeps": [
{
"name": "test description",
"variables": [], # optional
"request": {
"url": "http://127.0.0.1:5000/api/users/1000",
"method": "GET"
}
}
]
}
]
}
def with_session(self, s: HttpSession) -> "TestCaseRunner":
    """Use an externally provided HTTP session object; fluent."""
    self.session = s
    return self
>>> testcases = parser.parse_tests(tests_mapping)
>>> parsed_testcase = testcases[0]
def with_variables(self, **variables: VariablesMapping) -> "TestCaseRunner":
    """Merge extra variables into the testcase config variables; fluent."""
    self.config.variables.update(variables)
    return self
>>> test_runner = runner.Runner(parsed_testcase["config"])
>>> test_runner.run_test(parsed_testcase["teststeps"][0])
def __run_step(self, step: TestStep):
logger.info(f"run step: {step.name}")
"""
# parse
request_dict = step.request.dict()
parsed_request_dict = parse_data(request_dict, step.variables, self.config.functions)
def __init__(self, config, http_client_session=None):
""" run testcase or testsuite.
# prepare arguments
method = parsed_request_dict.pop("method")
url_path = parsed_request_dict.pop("url")
url = build_url(self.config.base_url, url_path)
Args:
config (dict): testcase/testsuite config dict
parsed_request_dict["json"] = parsed_request_dict.pop("req_json", {})
{
"name": "ABC",
"variables": {},
"setup_hooks", [],
"teardown_hooks", []
}
http_client_session (instance): requests.Session(), or locust.client.Session() instance.
"""
self.verify = config.get("verify", True)
self.export = config.get("export") or config.get("output", [])
config_variables = config.get("variables", {})
# testcase setup hooks
testcase_setup_hooks = config.get("setup_hooks", [])
# testcase teardown hooks
self.testcase_teardown_hooks = config.get("teardown_hooks", [])
self.http_client_session = http_client_session or HttpSession()
self.session_context = SessionContext(config_variables)
self.session_context.update_session_variables({
"variables": config_variables
})
if testcase_setup_hooks:
self.do_hook_actions(testcase_setup_hooks, HookTypeEnum.SETUP)
def __del__(self):
    # Run testcase-level teardown hooks when the runner is garbage-collected.
    # NOTE(review): relying on __del__ is fragile (finalizers are not
    # guaranteed to run at interpreter shutdown) — consider an explicit
    # close()/context-manager instead; confirm before changing.
    if self.testcase_teardown_hooks:
        self.do_hook_actions(self.testcase_teardown_hooks, HookTypeEnum.TEARDOWN)
def __clear_test_data(self):
    """ clear request and response data
    """
    # only the project's HttpSession exposes init_session_data; other
    # session types (presumably a locust client) are left untouched —
    # TODO confirm
    if not isinstance(self.http_client_session, HttpSession):
        return

    self.http_client_session.init_session_data()
def _handle_skip_feature(self, test_dict):
    """Raise SkipTest when the teststep declares a skip condition.

    Supported keys (first present key wins):
        - skip: skip current test unconditionally
        - skipIf: skip current test if condition is true
        - skipUnless: skip current test unless condition is true

    Args:
        test_dict (dict): test info

    Raises:
        SkipTest: skip test
    """
    # TODO: move skip to initialize
    reason = None
    if "skip" in test_dict:
        reason = test_dict["skip"]
    elif "skipIf" in test_dict:
        condition = test_dict["skipIf"]
        if self.session_context.eval_content(condition):
            reason = f"{condition} evaluate to True"
    elif "skipUnless" in test_dict:
        condition = test_dict["skipUnless"]
        if not self.session_context.eval_content(condition):
            reason = f"{condition} evaluate to False"

    if reason:
        raise SkipTest(reason)
def do_hook_actions(self, actions, hook_type):
    """ call hook actions.

    Args:
        actions (list): each action in actions list maybe in two format.

            format1 (dict): assignment, the value returned by hook function will be assigned to variable.
                {"var": "${func()}"}
            format2 (str): only call hook functions.
                ${func()}

        hook_type (HookTypeEnum): setup/teardown
    """
    logger.debug(f"call {hook_type.name} hook actions.")
    for action in actions:
        if isinstance(action, dict) and len(action) == 1:
            # format 1: single-key dict — evaluate the hook expression and
            # assign its result to the named test variable
            # {"var": "${func()}"}
            var_name, hook_content = list(action.items())[0]
            hook_content_eval = self.session_context.eval_content(hook_content)
            logger.debug(
                f"assignment with hook: {var_name} = {hook_content} => {hook_content_eval}")
            self.session_context.update_test_variables(
                var_name, hook_content_eval
            )
        else:
            # format 2: bare expression — evaluate for its side effect only
            logger.debug(f"call hook function: {action}")
            # TODO: check hook function if valid
            self.session_context.eval_content(action)
def _run_test(self, test_dict):
""" run single teststep.
Args:
test_dict (dict): teststep info
{
"name": "teststep description",
"skip": "skip this test unconditionally",
"times": 3,
"variables": [], # optional, override
"request": {
"url": "http://127.0.0.1:5000/api/users/1000",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"authorization": "$authorization",
"random": "$random"
},
"json": {"name": "user", "password": "123456"}
},
"extract": {}, # optional
"validate": [], # optional
"setup_hooks": [], # optional
"teardown_hooks": [] # optional
}
Raises:
exceptions.ParamsError
exceptions.ValidationFailure
exceptions.ExtractFailure
"""
# clear meta data first to ensure independence for each test
self.__clear_test_data()
# check skip
self._handle_skip_feature(test_dict)
# prepare
test_dict = utils.lower_test_dict_keys(test_dict)
test_variables = test_dict.get("variables", {})
self.session_context.init_test_variables(test_variables)
# teststep name
test_name = self.session_context.eval_content(test_dict.get("name", ""))
# parse test request
raw_request = test_dict.get('request', {})
parsed_test_request = self.session_context.eval_content(raw_request)
self.session_context.update_test_variables("request", parsed_test_request)
test_variables.update(self.session_context.session_variables_mapping["variables"])
self.session_context.update_test_variables("variables", test_variables)
# setup hooks
setup_hooks = test_dict.get("setup_hooks", [])
if setup_hooks:
self.do_hook_actions(setup_hooks, HookTypeEnum.SETUP)
# prepend url with base_url unless it's already an absolute URL
url = parsed_test_request.pop('url')
base_url = self.session_context.eval_content(test_dict.get("base_url", ""))
parsed_url = utils.build_url(base_url, url)
try:
method = parsed_test_request.pop('method')
parsed_test_request.setdefault("verify", self.verify)
group_name = parsed_test_request.pop("group", None)
except KeyError:
raise exceptions.ParamsError("URL or METHOD missed!")
logger.info(f"{method} {parsed_url}")
logger.debug(f"request kwargs(raw): {parsed_test_request}")
logger.info(f"{method} {url}")
logger.debug(f"request kwargs(raw): {parsed_request_dict}")
# request
resp = self.http_client_session.request(
method,
parsed_url,
name=(group_name or test_name),
**parsed_test_request
)
resp_obj = response.ResponseObject(resp)
self.session = self.session or HttpSession()
resp = self.session.request(method, url, **parsed_request_dict)
resp_obj = ResponseObject(resp)
def log_req_resp_details():
err_msg = "{} DETAILED REQUEST & RESPONSE {}\n".format("*" * 32, "*" * 32)
err_msg = "\n{} DETAILED REQUEST & RESPONSE {}\n".format("*" * 32, "*" * 32)
# log request
err_msg += "====== request details ======\n"
err_msg += f"url: {parsed_url}\n"
err_msg += f"url: {url}\n"
err_msg += f"method: {method}\n"
headers = parsed_test_request.pop("headers", {})
headers = parsed_request_dict.pop("headers", {})
err_msg += f"headers: {headers}\n"
for k, v in parsed_test_request.items():
for k, v in parsed_request_dict.items():
v = utils.omit_long_data(v)
err_msg += f"{k}: {repr(v)}\n"
@@ -268,144 +75,49 @@ class Runner(object):
err_msg += f"body: {repr(resp_obj.text)}\n"
logger.error(err_msg)
# teardown hooks
teardown_hooks = test_dict.get("teardown_hooks", [])
if teardown_hooks:
self.session_context.update_test_variables("response", resp_obj)
self.do_hook_actions(teardown_hooks, HookTypeEnum.TEARDOWN)
self.http_client_session.update_last_req_resp_record(resp_obj)
# extract
extractors = test_dict.get("extract", {})
try:
extracted_variables_mapping = resp_obj.extract_response(extractors)
self.session_context.update_session_variables(extracted_variables_mapping)
except (exceptions.ParamsError, exceptions.ExtractFailure):
log_req_resp_details()
raise
extractors = step.extract
extract_mapping = resp_obj.extract(extractors)
variables_mapping = step.variables
variables_mapping.update(extract_mapping)
# validate
validators = test_dict.get("validate") or test_dict.get("validators") or []
validate_script = test_dict.get("validate_script", [])
if validate_script:
validators.append({
"type": "python_script",
"script": validate_script
})
validator = Validator(self.session_context, resp_obj)
validators = step.validators
try:
validator.validate(validators)
except exceptions.ValidationFailure:
resp_obj.validate(validators, variables_mapping, self.config.functions)
self.session.data.status = "passed"
except ValidationFailure:
self.session.data.status = "failed"
log_req_resp_details()
raise
finally:
self.validation_results = validator.validation_results
self.validation_results = resp_obj.validation_results
# save request & response meta data
self.session.data.validators = self.validation_results
self.session.data.name = step.name
self.step_datas.append(self.session.data)
def _run_testcase(self, testcase_dict):
""" run single testcase.
"""
self.meta_datas = []
config = testcase_dict.get("config", {})
return extract_mapping
# each teststeps in one testcase (YAML/JSON) share the same session.
test_runner = Runner(config, self.http_client_session)
def test_start(self):
"""main entrance"""
self.step_datas.clear()
session_variables = {}
for step in self.teststeps:
# update with config variables
step.variables.update(self.config.variables)
# update with session variables extracted from former step
step.variables.update(session_variables)
# parse variables
step.variables = parse_variables_mapping(step.variables, self.config.functions)
# run step
extract_mapping = self.__run_step(step)
# save extracted variables to session variables
session_variables.update(extract_mapping)
tests = testcase_dict.get("teststeps", [])
return self
for index, test_dict in enumerate(tests):
# override current teststep variables with former testcase output variables
former_output_variables = self.session_context.test_variables_mapping
if former_output_variables:
test_dict.setdefault("variables", {})
test_dict["variables"].update(former_output_variables)
try:
test_runner.run_test(test_dict)
except Exception:
# log exception request_type and name for locust stat
self.exception_request_type = test_runner.exception_request_type
self.exception_name = test_runner.exception_name
raise
finally:
_meta_datas = test_runner.meta_datas
self.meta_datas.append(_meta_datas)
self.session_context.update_session_variables(
test_runner.export_variables(test_runner.export)
)
def run_test(self, test_dict):
    """ run single teststep of testcase.
    test_dict may be in 3 types.

    Args:
        test_dict (dict):

            # teststep
            {
                "name": "teststep description",
                "variables": [],    # optional
                "request": {
                    "url": "http://127.0.0.1:5000/api/users/1000",
                    "method": "GET"
                }
            }

            # nested testcase
            {
                "config": {...},
                "teststeps": [
                    {...},
                    {...}
                ]
            }

            # TODO: function
            {
                "name": "exec function",
                "function": "${func()}"
            }

    """
    self.meta_datas = None
    if "teststeps" in test_dict:
        # nested testcase: share current session variables with the
        # sub-testcase's config before running it
        test_dict.setdefault("config", {}).setdefault("variables", {})
        test_dict["config"]["variables"].update(
            self.session_context.session_variables_mapping)
        self._run_testcase(test_dict)
    else:
        # plain teststep (api call)
        self.validation_results = {}
        try:
            self._run_test(test_dict)
        except Exception:
            # log exception request_type and name for locust stat
            self.exception_request_type = test_dict["request"]["method"]
            self.exception_name = test_dict.get("name")
            raise
        finally:
            # get request/response data and validate results
            self.meta_datas = getattr(self.http_client_session, "data", {})
            self.meta_datas["validators"] = self.validation_results
def export_variables(self, output_variables_list):
    """Collect the requested variables from the session mapping.

    Variables missing from the session mapping are logged as warnings
    and skipped. The collected mapping is printed and returned.
    """
    session_vars = self.session_context.session_variables_mapping

    exported = {}
    for name in output_variables_list:
        if name in session_vars:
            exported[name] = session_vars[name]
        else:
            logger.warning(
                f"variable '{name}' can not be found in variables mapping, "
                "failed to export!"
            )

    utils.print_info(exported)
    return exported
def run(self):
    """main entrance alias for test_start"""
    # thin alias so callers may use either run() or test_start()
    return self.test_start()

View File

@@ -1,3 +0,0 @@
from .api import Api
from .testcase import ProjectMeta, TestCase
from .testsuite import TestSuite

View File

@@ -1,16 +0,0 @@
from typing import Dict, Text
from pydantic import BaseModel, Field
from httprunner.schema import common
class Api(BaseModel):
    """Schema of a standalone api definition."""
    name: common.Name
    request: common.Request
    variables: common.Variables = {}
    base_url: common.BaseUrl = ""
    setup_hooks: common.Hook = []
    teardown_hooks: common.Hook = []
    extract: Dict[Text, Text] = {}
    # aliased so payloads can use the key "validate" — presumably to avoid
    # shadowing pydantic's BaseModel.validate; confirm
    validation: common.Validate = Field([], alias="validate")

View File

@@ -1,60 +0,0 @@
from enum import Enum
from typing import Dict, List, Any, Text, Union
from pydantic import BaseModel, HttpUrl, Field
# Type aliases shared by the schema models below.
Name = Text
Url = Text
# either a fully validated HttpUrl or a plain string — presumably so
# unresolved/templated values still parse; confirm
BaseUrl = Union[HttpUrl, Text]
Variables = Dict[Text, Any]
Headers = Dict[Text, Text]
Verify = bool
Hook = List[Text]
Export = List[Text]
Validate = List[Dict]
Env = Dict[Text, Any]
class MethodEnum(Text, Enum):
    """Supported HTTP request methods (string-valued enum)."""
    GET = 'GET'
    POST = 'POST'
    PUT = "PUT"
    DELETE = "DELETE"
    HEAD = "HEAD"
    OPTIONS = "OPTIONS"
    PATCH = "PATCH"
    CONNECT = "CONNECT"
    TRACE = "TRACE"
class TestsConfig(BaseModel):
    """Config block shared by testcases and testsuites."""
    name: Name
    verify: Verify = False
    base_url: BaseUrl = ""
    variables: Variables = {}
    setup_hooks: Hook = []
    teardown_hooks: Hook = []
    # names of variables to export after the run
    export: Export = []

    class Config:
        # example payloads embedded into the generated JSON schema
        schema_extra = {
            "examples": [
                {
                    "name": "used in testcase/testsuite to configure common fields",
                    "verify": False,
                    "base_url": "https://httpbin.org"
                }
            ]
        }
class Request(BaseModel):
    """Schema of one HTTP request inside a teststep."""
    method: MethodEnum = MethodEnum.GET
    url: Url
    params: Dict[Text, Text] = {}
    headers: Headers = {}
    # aliased so payloads can use the key "json" — presumably to avoid
    # shadowing pydantic's BaseModel.json; confirm
    req_json: Dict = Field({}, alias="json")
    cookies: Dict[Text, Text] = {}
    # presumably seconds, passed through to the HTTP client — confirm
    timeout: int = 120
    allow_redirects: bool = True
    verify: Verify = False

View File

@@ -1,83 +0,0 @@
from typing import Dict, List, Text, Union
from pydantic import BaseModel, Field
from httprunner.schema import common
class ProjectMeta(BaseModel):
    """Project-level metadata loaded alongside testcases."""
    # presumably the source text of the project's debugtalk.py — confirm
    debugtalk_py: Text = ""
    variables: common.Variables = {}
    env: common.Env = {}
class TestStep(BaseModel):
    """One step of a testcase: a direct request, or a reference to an
    api/testcase file (api/testcase/request are mutually optional here).
    """
    name: common.Name
    api: Text = None
    testcase: Text = None
    request: common.Request = None
    variables: common.Variables = {}
    # either {var: field} mapping or a plain list of fields
    extract: Union[Dict[Text, Text], List[Text]] = {}
    # aliased so payloads can use the key "validate" — presumably to avoid
    # shadowing pydantic's BaseModel.validate; confirm
    validation: common.Validate = Field([], alias="validate")
class TestCase(BaseModel):
    """A testcase: one config block plus an ordered list of teststeps."""
    config: common.TestsConfig
    teststeps: List[TestStep]

    class Config:
        # example payloads embedded into the generated JSON schema
        schema_extra = {
            "examples": [
                {
                    "config": {
                        "name": "testcase name"
                    },
                    "teststeps": [
                        {
                            "name": "api 1",
                            "api": "/path/to/api1"
                        },
                        {
                            "name": "api 2",
                            "api": "/path/to/api2"
                        }
                    ]
                },
                {
                    "config": {
                        "name": "demo testcase",
                        "variables": {
                            "device_sn": "ABC",
                            "username": "${ENV(USERNAME)}",
                            "password": "${ENV(PASSWORD)}"
                        },
                        "base_url": "http://127.0.0.1:5000"
                    },
                    "teststeps": [
                        {
                            "name": "demo step 1",
                            "api": "path/to/api1.yml",
                            "variables": {
                                "user_agent": "iOS/10.3",
                                "device_sn": "$device_sn"
                            },
                            "extract": {
                                "token": "content.token"
                            },
                            "validate": [
                                {
                                    "eq": ["status_code", 200]
                                }
                            ]
                        },
                        {
                            "name": "demo step 2",
                            "api": "path/to/api2.yml",
                            "variables": {
                                "token": "$token"
                            }
                        }
                    ]
                }
            ]
        }

View File

@@ -1,17 +0,0 @@
from typing import List, Text
from pydantic import BaseModel
from httprunner.schema import common
class TestCase(BaseModel):
    """A testcase entry inside a testsuite."""
    name: common.Name
    # presumably a path reference to the testcase file — confirm
    testcase: Text
    weight: int = 1
    variables: common.Variables = {}
class TestSuite(BaseModel):
    """A testsuite: shared config plus a list of testcase entries."""
    config: common.TestsConfig
    testcases: List[TestCase]

View File

@@ -6,15 +6,11 @@ import io
import itertools
import json
import os.path
import re
from typing import Union
from loguru import logger
from httprunner import exceptions
from httprunner.exceptions import ParamsError
absolute_http_url_regexp = re.compile(r"^https?://", re.I)
def set_os_environ(variables_mapping):
@@ -52,74 +48,6 @@ def get_os_environ(variable_name):
raise exceptions.EnvNotFound(variable_name)
def build_url(base_url, path):
    """ prepend url with base_url unless it's already an absolute URL """
    # absolute URLs pass through untouched
    if absolute_http_url_regexp.match(path):
        return path

    if not base_url:
        raise ParamsError("base url missed!")

    # join with exactly one slash between the two parts
    return f'{base_url.rstrip("/")}/{path.lstrip("/")}'
def query_json(json_content, query, delimiter='.'):
    """ Do an xpath-like query with json_content.

    Args:
        json_content (dict/list/string): content to be queried.
        query (str): query string, delimiter-separated keys/indexes.
        delimiter (str): delimiter symbol.

    Returns:
        str: queried result.

    Raises:
        exceptions.ExtractFailure: if the query does not match json_content.

    Examples:
        >>> json_content = {
            "ids": [1, 2, 3, 4],
            "person": {
                "name": {
                    "first_name": "Leo",
                    "last_name": "Lee",
                },
                "age": 29,
                "cities": ["Guangzhou", "Shenzhen"]
            }
        }
        >>>
        >>> query_json(json_content, "person.name.first_name")
        >>> Leo
        >>>
        >>> query_json(json_content, "person.name.first_name.0")
        >>> L
        >>>
        >>> query_json(json_content, "person.cities.0")
        >>> Guangzhou
    """
    raise_flag = False
    # keep the original (untraversed) content for the error message
    response_body = f"response body: {json_content}\n"
    try:
        for key in query.split(delimiter):
            if isinstance(json_content, (list, str, bytes)):
                # sequence-like content is indexed numerically
                json_content = json_content[int(key)]
            elif isinstance(json_content, dict):
                json_content = json_content[key]
            else:
                # reached a scalar before the query was exhausted:
                # fail fast instead of looping (and logging an error)
                # once per remaining query key
                logger.error(
                    f"invalid type value: {json_content}({type(json_content)})")
                raise_flag = True
                break
    except (KeyError, ValueError, IndexError):
        raise_flag = True

    if raise_flag:
        err_msg = f"Failed to extract! => {query}\n"
        err_msg += response_body
        logger.error(err_msg)
        raise exceptions.ExtractFailure(err_msg)

    return json_content
def lower_dict_keys(origin_dict):
""" convert keys in dict to lower case
@@ -158,21 +86,6 @@ def lower_dict_keys(origin_dict):
}
def lower_test_dict_keys(test_dict):
    """ convert keys in test_dict to lower case, convertion will occur in two places:
        1, all keys in test_dict;
        2, all keys in test_dict["request"]
    """
    # top-level keys first
    lowered = lower_dict_keys(test_dict)

    # then one level deeper, inside the request block if present
    if "request" in lowered:
        lowered["request"] = lower_dict_keys(lowered["request"])

    return lowered
def deepcopy_dict(data):
""" deepcopy dict data, ignore file object (_io.BufferedReader)
@@ -209,101 +122,6 @@ def deepcopy_dict(data):
return copied_data
def ensure_mapping_format(variables):
    """ ensure variables are in mapping format.

    Args:
        variables (list/dict): original variables

    Returns:
        dict: ensured variables in dict format

    Examples:
        >>> variables = [
                {"a": 1},
                {"b": 2}
            ]
        >>> print(ensure_mapping_format(variables))
            {
                "a": 1,
                "b": 2
            }

    """
    if isinstance(variables, dict):
        return variables

    if isinstance(variables, list):
        # merge the list of single-key dicts into one mapping
        merged = {}
        for item in variables:
            merged.update(item)
        return merged

    raise exceptions.ParamsError("variables format error!")
def extend_variables(raw_variables, override_variables):
    """ extend raw_variables with override_variables.
        override_variables will merge and override raw_variables.

    Args:
        raw_variables (list):
        override_variables (list):

    Returns:
        dict: extended variables mapping

    Examples:
        >>> raw_variables = [{"var1": "val1"}, {"var2": "val2"}]
        >>> override_variables = [{"var1": "val111"}, {"var3": "val3"}]
        >>> extend_variables(raw_variables, override_variables)
            {
                'var1', 'val111',
                'var2', 'val2',
                'var3', 'val3'
            }

    """
    # degenerate cases: only one side has content
    if not raw_variables:
        return ensure_mapping_format(override_variables)
    if not override_variables:
        return ensure_mapping_format(raw_variables)

    # both present: overrides win on conflicting keys
    merged = ensure_mapping_format(raw_variables)
    merged.update(ensure_mapping_format(override_variables))
    return merged
def get_testcase_io(testcase):
    """ get and print testcase input(variables) and output(export).

    Args:
        testcase (unittest.suite.TestSuite): corresponding to one YAML/JSON file, it has been set two attributes:
            config: parsed config block
            runner: initialized runner.Runner() with config

    Returns:
        dict: input(variables) and output mapping.

    """
    test_runner = testcase.runner
    variables = testcase.config.get("variables", {})
    # "export" preferred; "output" presumably a legacy key — confirm
    output_list = testcase.config.get("export") \
        or testcase.config.get("output", [])
    export_mapping = test_runner.export_variables(output_list)

    return {
        "in": variables,
        "out": export_mapping
    }
def print_info(info_mapping):
""" print info in mapping.

View File

@@ -2,7 +2,7 @@ import io
import os
import unittest
from httprunner import exceptions, loader, utils
from httprunner import loader, utils
class TestUtils(unittest.TestCase):
@@ -16,50 +16,6 @@ class TestUtils(unittest.TestCase):
self.assertIn("abc", os.environ)
self.assertEqual(os.environ["abc"], "123")
def test_query_json(self):
    """utils.query_json traverses dicts by key and sequences by index."""
    json_content = {
        "ids": [1, 2, 3, 4],
        "person": {
            "name": {
                "first_name": "Leo",
                "last_name": "Lee",
            },
            "age": 29,
            "cities": ["Guangzhou", "Shenzhen"]
        }
    }

    query = "ids.2"
    result = utils.query_json(json_content, query)
    self.assertEqual(result, 3)

    # a non-numeric key cannot index a list
    query = "ids.str_key"
    with self.assertRaises(exceptions.ExtractFailure):
        utils.query_json(json_content, query)

    # index out of range
    query = "ids.5"
    with self.assertRaises(exceptions.ExtractFailure):
        utils.query_json(json_content, query)

    query = "person.age"
    result = utils.query_json(json_content, query)
    self.assertEqual(result, 29)

    # missing dict key
    query = "person.not_exist_key"
    with self.assertRaises(exceptions.ExtractFailure):
        utils.query_json(json_content, query)

    query = "person.cities.0"
    result = utils.query_json(json_content, query)
    self.assertEqual(result, "Guangzhou")

    query = "person.name.first_name"
    result = utils.query_json(json_content, query)
    self.assertEqual(result, "Leo")

    # strings are indexable too: "Leo"[0] == "L"
    query = "person.name.first_name.0"
    result = utils.query_json(json_content, query)
    self.assertEqual(result, "L")
def current_validators(self):
from httprunner.builtin import comparators
functions_mapping = loader.load.load_module_functions(comparators)
@@ -112,33 +68,6 @@ class TestUtils(unittest.TestCase):
functions_mapping["type_match"]({}, "dict")
functions_mapping["type_match"]({"a": 1}, "dict")
def test_handle_config_key_case(self):
    """lower_test_dict_keys lowers top-level and request keys, not nested header names."""
    origin_dict = {
        "Name": "test",
        "Request": {
            "url": "http://127.0.0.1:5000",
            "METHOD": "POST",
            "Headers": {
                "Accept": "application/json",
                "User-Agent": "ios/9.3"
            }
        }
    }
    new_dict = utils.lower_test_dict_keys(origin_dict)
    self.assertIn("name", new_dict)
    self.assertIn("request", new_dict)
    self.assertIn("method", new_dict["request"])
    self.assertIn("headers", new_dict["request"])
    # header names themselves keep their original casing
    self.assertIn("Accept", new_dict["request"]["headers"])
    self.assertIn("User-Agent", new_dict["request"]["headers"])

    # the request value may be an unparsed variable reference string
    origin_dict = {
        "Name": "test",
        "Request": "$default_request"
    }
    new_dict = utils.lower_test_dict_keys(origin_dict)
    self.assertIn("$default_request", new_dict["request"])
def test_lower_dict_keys(self):
request_dict = {
"url": "http://127.0.0.1:5000",
@@ -162,29 +91,6 @@ class TestUtils(unittest.TestCase):
new_request_dict = utils.lower_dict_keys(request_dict)
self.assertEqual(None, request_dict)
def test_ensure_mapping_format(self):
    """a list of single-key dicts is merged into one mapping."""
    map_list = [
        {"a": 1},
        {"b": 2}
    ]
    ordered_dict = utils.ensure_mapping_format(map_list)
    self.assertIsInstance(ordered_dict, dict)
    self.assertIn("a", ordered_dict)
def test_extend_variables(self):
    """override_variables wins on conflicting keys; other keys are merged."""
    raw_variables = [{"var1": "val1"}, {"var2": "val2"}]
    override_variables = [{"var1": "val111"}, {"var3": "val3"}]
    extended_variables_mapping = utils.extend_variables(raw_variables, override_variables)
    self.assertEqual(extended_variables_mapping["var1"], "val111")
    self.assertEqual(extended_variables_mapping["var2"], "val2")
    self.assertEqual(extended_variables_mapping["var3"], "val3")
def test_extend_variables_fix(self):
    """an empty override leaves the raw variables unchanged."""
    raw_variables = [{"var1": "val1"}, {"var2": "val2"}]
    override_variables = {}
    extended_variables_mapping = utils.extend_variables(raw_variables, override_variables)
    self.assertEqual(extended_variables_mapping["var1"], "val1")
def test_deepcopy_dict(self):
license_path = os.path.join(
os.path.dirname(os.path.dirname(__file__)),

View File

@@ -1,287 +0,0 @@
import os
import sys
import unittest
from typing import List, Dict
from loguru import logger
from httprunner import report, loader, utils, exceptions, __version__
from httprunner.report import gen_html_report
from httprunner.v3.runner import TestCaseRunner
from httprunner.v3.schema import TestsMapping, TestCaseSummary, TestSuiteSummary
class HttpRunner(object):
""" Developer Interface: Main Interface
Usage:
from httprunner.api import HttpRunner
runner = HttpRunner(
failfast=True,
save_tests=True,
log_level="INFO",
log_file="test.log"
)
summary = runner.run(path_or_tests)
"""
def __init__(self, save_tests=False, log_level="WARNING", log_file=None):
    """ initialize HttpRunner.

    Args:
        save_tests (bool): save loaded/parsed tests to JSON file.
        log_level (str): logging level.
        log_file (str): log file path.

    """
    self.exception_stage = "initialize HttpRunner()"
    kwargs = {
        "failfast": True,
        "resultclass": report.HtmlTestResult
    }

    # reconfigure the global loguru logger: drop existing sinks, then
    # log to stdout (and optionally a file) at the requested level
    logger.remove()
    log_level = log_level.upper()
    logger.add(sys.stdout, level=log_level)
    if log_file:
        logger.add(log_file, level=log_level)

    self.unittest_runner = unittest.TextTestRunner(**kwargs)
    self.test_loader = unittest.TestLoader()
    self.save_tests = save_tests
    self._summary = None
    self.test_path = None
def _prepare_tests(self, tests: TestsMapping) -> List[unittest.TestSuite]:
    """Convert parsed testcases into loaded unittest suites.

    For each testcase a TestCaseRunner is initialized and wrapped in a
    dynamically created unittest.TestCase subclass with one test method.
    """
    def _add_test(test_runner: TestCaseRunner):
        """ add test to testcase.
        """
        def test(self):
            try:
                test_runner.run()
            except exceptions.MyBaseFailure as ex:
                # surface runner failures as unittest failures
                self.fail(str(ex))
            finally:
                # keep per-step session data for reporting
                self.step_datas = test_runner.step_datas

        # the docstring becomes the test description in reports
        test.__doc__ = test_runner.config.name
        return test

    project_meta = tests.project_meta
    testcases = tests.testcases

    prepared_testcases: List[unittest.TestSuite] = []

    for testcase in testcases:
        # project-level variables/functions are shared by every testcase
        testcase.config.variables.update(project_meta.variables)
        testcase.config.functions.update(project_meta.functions)
        test_runner = TestCaseRunner().init(testcase)

        TestSequense = type('TestSequense', (unittest.TestCase,), {})
        test_method = _add_test(test_runner)
        setattr(TestSequense, "test_method_name", test_method)

        loaded_testcase = self.test_loader.loadTestsFromTestCase(TestSequense)
        setattr(loaded_testcase, "config", testcase.config)
        # setattr(loaded_testcase, "teststeps", testcase.teststeps)
        # setattr(loaded_testcase, "runner", test_runner)
        prepared_testcases.append(loaded_testcase)

    return prepared_testcases
def _run_suite(self, prepared_testcases: List[unittest.TestSuite]) -> List[TestCaseSummary]:
    """ run prepared testcases

    Returns per-testcase summaries. Failed testcases are inserted at the
    front of the returned list, successful ones appended at the end.
    """
    tests_results: List[TestCaseSummary] = []

    for index, testcase in enumerate(prepared_testcases):
        # optionally capture a per-testcase DEBUG log file
        log_handler = None
        if self.save_tests:
            logs_file_abs_path = utils.prepare_log_file_abs_path(
                self.test_path, f"testcase_{index+1}.log"
            )
            log_handler = logger.add(logs_file_abs_path, level="DEBUG")

        logger.info(f"Start to run testcase: {testcase.config.name}")

        result = self.unittest_runner.run(testcase)
        testcase_summary = report.get_summary(result)
        testcase_summary.in_out.vars = testcase.config.variables
        testcase_summary.in_out.out = testcase.config.export

        if self.save_tests and log_handler:
            # stop capturing and attach the log file path to the summary
            logger.remove(log_handler)
            logs_file_abs_path = utils.prepare_log_file_abs_path(
                self.test_path, f"testcase_{index+1}.log"
            )
            testcase_summary.log = logs_file_abs_path

        if result.wasSuccessful():
            tests_results.append(testcase_summary)
        else:
            # surface failed testcases first in the results list
            tests_results.insert(0, testcase_summary)

    return tests_results
def _aggregate(self, tests_results: List[TestCaseSummary]) -> TestSuiteSummary:
    """ aggregate multiple testcase results

    Args:
        tests_results (list): list of testcase summary

    """
    testsuite_summary = {
        "success": True,
        "stat": {
            "total": len(tests_results),
            "success": 0,
            "fail": 0
        },
        "time": {},
        "platform": report.get_platform(),
        "testcases": []
    }

    for testcase_summary in tests_results:
        if testcase_summary.success:
            testsuite_summary["stat"]["success"] += 1
        else:
            testsuite_summary["stat"]["fail"] += 1

        # the suite succeeds only if every testcase succeeded
        testsuite_summary["success"] &= testcase_summary.success
        testsuite_summary["testcases"].append(testcase_summary)

    # NOTE(review): this assumes tests_results is ordered by start time,
    # but _run_suite inserts failed testcases at index 0, which may skew
    # start_at/duration — confirm
    total_duration = tests_results[-1].time.start_at + tests_results[-1].time.duration \
        - tests_results[0].time.start_at
    testsuite_summary["time"] = {
        "start_at": tests_results[0].time.start_at,
        "start_at_iso_format": tests_results[0].time.start_at_iso_format,
        "duration": total_duration
    }

    return TestSuiteSummary.parse_obj(testsuite_summary)
def run_tests(self, tests_mapping) -> TestSuiteSummary:
    """ run testcase/testsuite data

    Pipeline: validate mapping -> prepare testcases -> run -> aggregate
    -> stringify for the HTML report. ``self.exception_stage`` is updated
    before each stage so callers can tell where a failure happened.

    Args:
        tests_mapping (dict): loaded testcase/testsuite data, must conform
            to the TestsMapping schema.

    Returns:
        TestSuiteSummary: aggregated result summary (also kept on
            ``self._summary``).
    """
    tests = TestsMapping.parse_obj(tests_mapping)
    self.test_path = tests.project_meta.test_path
    if self.save_tests:
        # persist the raw loaded data for debugging/reproduction
        utils.dump_json_file(
            tests_mapping,
            utils.prepare_log_file_abs_path(self.test_path, "loaded.json")
        )

    # prepare testcases
    self.exception_stage = "prepare testcases"
    prepared_testcases = self._prepare_tests(tests)

    # run prepared testcases
    self.exception_stage = "run prepared testcases"
    results = self._run_suite(prepared_testcases)

    # aggregate results
    self.exception_stage = "aggregate results"
    self._summary = self._aggregate(results)

    # generate html report
    self.exception_stage = "generate html report"
    report.stringify_summary(self._summary)

    if self.save_tests:
        utils.dump_json_file(
            self._summary.dict(),
            utils.prepare_log_file_abs_path(self.test_path, "summary.json")
        )
        # save variables and export data
        vars_out = self.get_vars_out()
        utils.dump_json_file(
            vars_out,
            utils.prepare_log_file_abs_path(self.test_path, "io.json")
        )

    return self._summary
def get_vars_out(self):
    """ get variables and output

    Returns:
        list: list of variables and output.
            if tests are parameterized, list items are corresponded to parameters.

            [
                {
                    "in": {
                        "user1": "leo"
                    },
                    "out": {
                        "out1": "out_value_1"
                    }
                },
                {...}
            ]

        None: returns None if tests not started or finished or corrupted.

    """
    if not self._summary:
        return None

    vars_out = []
    for testcase_summary in self._summary.testcases:
        vars_out.append(testcase_summary.in_out.dict())
    return vars_out
def run_path(self, path, dot_env_path=None, mapping=None) -> TestSuiteSummary:
    """ run testcase/testsuite file or folder.

    Args:
        path (str): testcase/testsuite file/foler path.
        dot_env_path (str): specified .env file path.
        mapping (dict): if mapping is specified, it will override variables in config block.

    Returns:
        dict: result summary

    """
    logger.info(f"HttpRunner version: {__version__}")

    # load tests
    self.exception_stage = "load tests"
    loaded_tests = loader.load_cases(path, dot_env_path)
    if mapping:
        # caller-supplied variables take precedence over project variables
        loaded_tests["project_meta"]["variables"] = mapping

    return self.run_tests(loaded_tests)
def run(self, path_or_tests, dot_env_path=None, mapping=None):
    """ main interface.

    Args:
        path_or_tests:
            str: testcase/testsuite file/foler path
            dict: valid testcase/testsuite data
        dot_env_path (str): specified .env file path.
        mapping (dict): if mapping is specified, it will override variables in config block.

    Returns:
        dict: result summary

    """
    if loader.is_test_path(path_or_tests):
        return self.run_path(path_or_tests, dot_env_path, mapping)

    if loader.is_test_content(path_or_tests):
        # fall back to the current working directory when PWD is absent
        pwd = path_or_tests.get("project_meta", {}).get("PWD", os.getcwd())
        loader.init_pwd(pwd)
        return self.run_tests(path_or_tests)

    raise exceptions.ParamsError(f"Invalid testcase path or testcases: {path_or_tests}")
def gen_html_report(self, report_template=None, report_dir=None, report_file=None):
    """ render the stored summary as an HTML report.

    Returns None when no tests have been run yet. Delegates to the
    module-level ``gen_html_report`` helper.
    """
    if self._summary:
        return gen_html_report(self._summary, report_template, report_dir, report_file)
    return None

View File

@@ -1,421 +0,0 @@
import ast
import builtins
import re
from typing import Any, Set, Text, Callable, List, Dict
from httprunner import loader, utils, exceptions
from httprunner.v3.schema import VariablesMapping, FunctionsMapping
# matches absolute URLs, e.g. "http://..." / "https://..." (case-insensitive)
absolute_http_url_regexp = re.compile(r"^https?://", re.I)

# use $$ to escape $ notation
# NOTE(review): name is misspelled ("dolloar") but kept — other modules
# may reference it by this name.
dolloar_regex_compile = re.compile(r"\$\$")

# variable notation, e.g. ${var} or $var
variable_regex_compile = re.compile(r"\$\{(\w+)\}|\$(\w+)")

# function notation, e.g. ${func1($var_1, $var_3)}
# two capture groups: (function name, raw parameter string)
function_regex_compile = re.compile(r"\$\{(\w+)\(([\$\w\.\-/\s=,]*)\)\}")
def parse_string_value(str_value: Text) -> Any:
    """ parse string to number if possible
    e.g. "123" => 123
        "12.2" => 12.2
        "abc" => "abc"
        "$var" => "$var"
    """
    # FIX: docstring previously claimed "12.2" => 12.3 (typo).
    try:
        return ast.literal_eval(str_value)
    except (ValueError, SyntaxError):
        # ValueError: not a Python literal, e.g. "abc"
        # SyntaxError: not valid Python syntax, e.g. "$var", "${func}"
        return str_value
def build_url(base_url, path):
    """ prepend url with base_url unless it's already an absolute URL """
    if absolute_http_url_regexp.match(path):
        # already absolute: use as-is
        return path

    if not base_url:
        raise exceptions.ParamsError("base url missed!")

    # join with exactly one slash between the two parts
    return f"{base_url.rstrip('/')}/{path.lstrip('/')}"
def regex_findall_variables(content: Text) -> List[Text]:
    """ extract all variable names from content, which is in format $variable

    Args:
        content (str): string content

    Returns:
        list: variables list extracted from string content

    Examples:
        >>> regex_findall_variables("$variable")
        ["variable"]
        >>> regex_findall_variables("/blog/$postid")
        ["postid"]
        >>> regex_findall_variables("/$var1/$var2")
        ["var1", "var2"]
        >>> regex_findall_variables("abc")
        []

    """
    try:
        # each match tuple holds (${var} group, $var group); exactly one is set
        return [
            braced or bare
            for braced, bare in variable_regex_compile.findall(content)
        ]
    except TypeError:
        # non-string content has no variables
        return []
def regex_findall_functions(content: Text) -> List[Text]:
    """ extract all functions from string content, which are in format ${fun()}

    Args:
        content (str): string content

    Returns:
        list: list of (function_name, params_str) tuples extracted from
            string content. NOTE: the declared List[Text] annotation is
            kept for interface compatibility, but findall with two capture
            groups actually yields 2-tuples (see this project's tests).

    Examples:
        >>> regex_findall_functions("${func(5)}")
        [("func", "5")]
        >>> regex_findall_functions("${func(a=1, b=2)}")
        [("func", "a=1, b=2")]
        >>> regex_findall_functions("/api/1000?_t=${get_timestamp()}")
        [("get_timestamp", "")]
        >>> regex_findall_functions("/api/${add(1, 2)}")
        [("add", "1, 2")]
        >>> regex_findall_functions("/api/${add(1, 2)}?_t=${get_timestamp()}")
        [("add", "1, 2"), ("get_timestamp", "")]

    """
    # FIX: docstring previously showed whole-call strings ("func(5)") as
    # return values; the regex has two groups, so findall returns tuples.
    try:
        return function_regex_compile.findall(content)
    except TypeError:
        # non-string content contains no function calls
        return []
def extract_variables(content: Any) -> Set:
    """ extract all variables in content recursively.

    Strings are scanned for $var / ${var} notation; containers are walked
    element by element (dict values only — keys are not scanned).
    """
    if isinstance(content, str):
        return set(regex_findall_variables(content))

    if isinstance(content, dict):
        found = set()
        for value in content.values():
            found |= extract_variables(value)
        return found

    if isinstance(content, (list, set, tuple)):
        found = set()
        for element in content:
            found |= extract_variables(element)
        return found

    # scalars (None, int, float, bool, ...) carry no variables
    return set()
def parse_function_params(params: Text) -> Dict:
    """ parse function params to args and kwargs.

    Args:
        params (str): function param in string

    Returns:
        dict: function meta dict

            {
                "args": [],
                "kwargs": {}
            }

    Examples:
        >>> parse_function_params("")
        {'args': [], 'kwargs': {}}
        >>> parse_function_params("5")
        {'args': [5], 'kwargs': {}}
        >>> parse_function_params("1, 2")
        {'args': [1, 2], 'kwargs': {}}
        >>> parse_function_params("a=1, b=2")
        {'args': [], 'kwargs': {'a': 1, 'b': 2}}
        >>> parse_function_params("1, 2, a=3, b=4")
        {'args': [1, 2], 'kwargs': {'a': 3, 'b': 4}}

    """
    function_meta = {
        "args": [],
        "kwargs": {}
    }

    params_str = params.strip()
    if params_str == "":
        return function_meta

    args_list = params_str.split(',')
    for arg in args_list:
        arg = arg.strip()
        if '=' in arg:
            # FIX: split on the first '=' only; the previous unbounded
            # split crashed ("too many values to unpack") on values that
            # themselves contain '=', e.g. "token=a=b".
            key, value = arg.split('=', 1)
            function_meta["kwargs"][key.strip()] = parse_string_value(value.strip())
        else:
            function_meta["args"].append(parse_string_value(arg))

    return function_meta
def get_mapping_variable(variable_name: Text, variables_mapping: VariablesMapping) -> Any:
    """ get variable from variables_mapping.

    Args:
        variable_name (str): variable name
        variables_mapping (dict): variables mapping

    Returns:
        mapping variable value.

    Raises:
        exceptions.VariableNotFound: variable is not found.

    """
    # TODO: get variable from debugtalk module and environ
    if variable_name in variables_mapping:
        return variables_mapping[variable_name]

    raise exceptions.VariableNotFound(f"{variable_name} not found in {variables_mapping}")
def get_mapping_function(function_name: Text, functions_mapping: FunctionsMapping) -> Callable:
    """ get function from functions_mapping,
        if not found, then try to check if builtin function.

    Args:
        function_name (str): function name
        functions_mapping (dict): functions mapping

    Returns:
        mapping function object.

    Raises:
        exceptions.FunctionNotFound: function is neither defined in debugtalk.py nor builtin.

    """
    # 1. user-defined functions (debugtalk.py) take precedence
    if function_name in functions_mapping:
        return functions_mapping[function_name]

    # 2. reserved aliases
    if function_name in ("parameterize", "P"):
        return loader.load_csv_file

    if function_name in ("environ", "ENV"):
        return utils.get_os_environ

    if function_name in ("multipart_encoder", "multipart_content_type"):
        # extension for upload test
        from httprunner.ext import uploader
        return getattr(uploader, function_name)

    # 3. check if HttpRunner builtin functions
    try:
        built_in_functions = loader.load_builtin_functions()
        return built_in_functions[function_name]
    except KeyError:
        pass

    # 4. check if Python builtin functions
    try:
        return getattr(builtins, function_name)
    except AttributeError:
        pass

    raise exceptions.FunctionNotFound(f"{function_name} is not found.")
def parse_string(
        raw_string: Text,
        variables_mapping: VariablesMapping,
        functions_mapping: FunctionsMapping) -> Any:
    """ parse string content with variables and functions mapping.

    Scans the string left-to-right from the first "$"; each "$" is tried
    as $$ (escape), then ${func(...)}, then $var/${var}. If the whole
    raw_string is a single $var or ${func(...)} expression, the referenced
    value is returned directly with its original type preserved.

    Args:
        raw_string: raw string content to be parsed.
        variables_mapping: variables mapping.
        functions_mapping: functions mapping.

    Returns:
        str: parsed string content.

    Examples:
        >>> raw_string = "abc${add_one($num)}def"
        >>> variables_mapping = {"num": 3}
        >>> functions_mapping = {"add_one": lambda x: x + 1}
        >>> parse_string(raw_string, variables_mapping, functions_mapping)
        "abc4def"

    """
    try:
        # locate the first "$"; everything before it is literal text
        match_start_position = raw_string.index("$", 0)
        parsed_string = raw_string[0:match_start_position]
    except ValueError:
        # no "$" at all: nothing to substitute
        parsed_string = raw_string
        return parsed_string

    while match_start_position < len(raw_string):
        # Notice: notation priority
        # $$ > ${func($a, $b)} > $var

        # search $$
        dollar_match = dolloar_regex_compile.match(raw_string, match_start_position)
        if dollar_match:
            match_start_position = dollar_match.end()
            parsed_string += "$"
            continue

        # search function like ${func($a, $b)}
        func_match = function_regex_compile.match(raw_string, match_start_position)
        if func_match:
            func_name = func_match.group(1)
            func = get_mapping_function(func_name, functions_mapping)

            func_params_str = func_match.group(2)
            function_meta = parse_function_params(func_params_str)
            args = function_meta["args"]
            kwargs = function_meta["kwargs"]
            # arguments may themselves contain $var references
            parsed_args = parse_data(args, variables_mapping, functions_mapping)
            parsed_kwargs = parse_data(kwargs, variables_mapping, functions_mapping)
            func_eval_value = func(*parsed_args, **parsed_kwargs)

            func_raw_str = "${" + func_name + f"({func_params_str})" + "}"
            if func_raw_str == raw_string:
                # raw_string is a function, e.g. "${add_one(3)}", return its eval value directly
                return func_eval_value

            # raw_string contains one or many functions, e.g. "abc${add_one(3)}def"
            parsed_string += str(func_eval_value)
            match_start_position = func_match.end()
            continue

        # search variable like ${var} or $var
        var_match = variable_regex_compile.match(raw_string, match_start_position)
        if var_match:
            var_name = var_match.group(1) or var_match.group(2)
            var_value = get_mapping_variable(var_name, variables_mapping)

            if f"${var_name}" == raw_string or "${" + var_name + "}" == raw_string:
                # raw_string is a variable, $var or ${var}, return its value directly
                return var_value

            # raw_string contains one or many variables, e.g. "abc${var}def"
            parsed_string += str(var_value)
            match_start_position = var_match.end()
            continue

        # this "$" starts neither an escape, a function nor a variable;
        # copy the literal text through to the next "$" (or end of string)
        curr_position = match_start_position
        try:
            # find next $ location
            match_start_position = raw_string.index("$", curr_position + 1)
            remain_string = raw_string[curr_position:match_start_position]
        except ValueError:
            remain_string = raw_string[curr_position:]
            # break while loop
            match_start_position = len(raw_string)

        parsed_string += remain_string

    return parsed_string
def parse_data(
        raw_data: Any,
        variables_mapping: VariablesMapping = None,
        functions_mapping: FunctionsMapping = None) -> Any:
    """ parse raw data with evaluated variables mapping.
        Notice: variables_mapping should not contain any variable or function.
    """
    if isinstance(raw_data, str):
        # strings may embed $var / ${func()} expressions
        return parse_string(
            raw_data.strip(),
            variables_mapping or {},
            functions_mapping or {},
        )

    if isinstance(raw_data, (list, set, tuple)):
        # NOTE: sets/tuples deliberately collapse to lists
        return [
            parse_data(element, variables_mapping, functions_mapping)
            for element in raw_data
        ]

    if isinstance(raw_data, dict):
        # both keys and values may carry expressions
        return {
            parse_data(key, variables_mapping, functions_mapping):
                parse_data(value, variables_mapping, functions_mapping)
            for key, value in raw_data.items()
        }

    # other types, e.g. None, int, float, bool — returned untouched
    return raw_data
def parse_variables_mapping(
        variables_mapping: VariablesMapping,
        functions_mapping: FunctionsMapping = None) -> VariablesMapping:
    """ evaluate a variables mapping until every variable is resolved.

    Variables may reference each other ($var) or call functions
    (${func($var)}); resolution is iterative — each pass resolves the
    variables whose dependencies are already resolved.

    Raises:
        exceptions.VariableNotFound: on self reference, reference to an
            undefined variable, or circular references between variables.
    """
    parsed_variables: VariablesMapping = {}

    while len(parsed_variables) != len(variables_mapping):
        # FIX: track progress per pass; circular references between two or
        # more variables (e.g. {"a": "$b", "b": "$a"}) previously made
        # every pass a no-op and this loop spun forever.
        progressed = False

        for var_name in variables_mapping:

            if var_name in parsed_variables:
                continue

            var_value = variables_mapping[var_name]
            variables = extract_variables(var_value)

            # check if reference variable itself
            if var_name in variables:
                # e.g.
                # variables_mapping = {"token": "abc$token"}
                # variables_mapping = {"key": ["$key", 2]}
                raise exceptions.VariableNotFound(var_name)

            # check if reference variable not in variables_mapping
            not_defined_variables = [
                v_name
                for v_name in variables
                if v_name not in variables_mapping
            ]
            if not_defined_variables:
                # e.g. {"varA": "123$varB", "varB": "456$varC"}
                # e.g. {"varC": "${sum_two($a, $b)}"}
                raise exceptions.VariableNotFound(not_defined_variables)

            try:
                parsed_value = parse_data(
                    var_value, parsed_variables, functions_mapping)
            except exceptions.VariableNotFound:
                # dependency not resolved yet; retry on a later pass
                continue

            parsed_variables[var_name] = parsed_value
            progressed = True

        if not progressed:
            # nothing resolved in a full pass: the remaining variables
            # form a dependency cycle
            unresolved = set(variables_mapping) - set(parsed_variables)
            raise exceptions.VariableNotFound(unresolved)

    return parsed_variables

View File

@@ -1,528 +0,0 @@
import time
import unittest
from httprunner.v3 import parser
from httprunner.exceptions import VariableNotFound, FunctionNotFound
class TestParserBasic(unittest.TestCase):
    """Unit tests for the httprunner.v3.parser helpers.

    Covers variables-mapping resolution, literal parsing, variable/function
    extraction, and $var / ${func()} substitution in strings and nested
    structures.
    """

    def test_parse_variables_mapping(self):
        # chained references resolve transitively: varA -> varB -> varC
        variables = {
            "varA": "$varB",
            "varB": "$varC",
            "varC": "123",
            "a": 1,
            "b": 2
        }
        parsed_variables = parser.parse_variables_mapping(variables)
        print(parsed_variables)
        self.assertEqual(parsed_variables["varA"], "123")
        self.assertEqual(parsed_variables["varB"], "123")

    def test_parse_variables_mapping_exception(self):
        # varC is referenced but never defined
        variables = {
            "varA": "$varB",
            "varB": "$varC",
            "a": 1,
            "b": 2
        }
        with self.assertRaises(VariableNotFound):
            parser.parse_variables_mapping(variables)

    def test_parse_string_value(self):
        # numeric strings become numbers; everything else stays a string
        self.assertEqual(parser.parse_string_value("123"), 123)
        self.assertEqual(parser.parse_string_value("12.3"), 12.3)
        self.assertEqual(parser.parse_string_value("a123"), "a123")
        self.assertEqual(parser.parse_string_value("$var"), "$var")
        self.assertEqual(parser.parse_string_value("${func}"), "${func}")

    def test_extract_variables(self):
        self.assertEqual(
            parser.extract_variables("$var"),
            {"var"}
        )
        self.assertEqual(
            parser.extract_variables("$var123"),
            {"var123"}
        )
        self.assertEqual(
            parser.extract_variables("$var_name"),
            {"var_name"}
        )
        self.assertEqual(
            parser.extract_variables("var"),
            set()
        )
        self.assertEqual(
            parser.extract_variables("a$var"),
            {"var"}
        )
        # variable names stop at non-word characters
        self.assertEqual(
            parser.extract_variables("$v ar"),
            {"v"}
        )
        self.assertEqual(
            parser.extract_variables(" "),
            set()
        )
        self.assertEqual(
            parser.extract_variables("$abc*"),
            {"abc"}
        )
        # function notation itself is not a variable...
        self.assertEqual(
            parser.extract_variables("${func()}"),
            set()
        )
        self.assertEqual(
            parser.extract_variables("${func(1,2)}"),
            set()
        )
        # ...but $-arguments inside a function call are
        self.assertEqual(
            parser.extract_variables("${gen_md5($TOKEN, $data, $random)}"),
            {"TOKEN", "data", "random"}
        )

    def test_parse_function_params(self):
        self.assertEqual(
            parser.parse_function_params(""),
            {'args': [], 'kwargs': {}}
        )
        self.assertEqual(
            parser.parse_function_params("5"),
            {'args': [5], 'kwargs': {}}
        )
        self.assertEqual(
            parser.parse_function_params("1, 2"),
            {'args': [1, 2], 'kwargs': {}}
        )
        self.assertEqual(
            parser.parse_function_params("a=1, b=2"),
            {'args': [], 'kwargs': {'a': 1, 'b': 2}}
        )
        # whitespace around '=' is tolerated
        self.assertEqual(
            parser.parse_function_params("a= 1, b =2"),
            {'args': [], 'kwargs': {'a': 1, 'b': 2}}
        )
        self.assertEqual(
            parser.parse_function_params("1, 2, a=3, b=4"),
            {'args': [1, 2], 'kwargs': {'a': 3, 'b': 4}}
        )
        self.assertEqual(
            parser.parse_function_params("$request, 123"),
            {'args': ["$request", 123], 'kwargs': {}}
        )
        self.assertEqual(
            parser.parse_function_params(" "),
            {'args': [], 'kwargs': {}}
        )
        self.assertEqual(
            parser.parse_function_params("hello world, a=3, b=4"),
            {'args': ["hello world"], 'kwargs': {'a': 3, 'b': 4}}
        )
        # inner whitespace keeps the argument a string
        self.assertEqual(
            parser.parse_function_params("$request, 12 3"),
            {'args': ["$request", '12 3'], 'kwargs': {}}
        )

    def test_extract_functions(self):
        # findall returns (function_name, params_str) tuples
        self.assertEqual(
            parser.regex_findall_functions("${func()}"),
            [("func", "")]
        )
        self.assertEqual(
            parser.regex_findall_functions("${func(5)}"),
            [("func", "5")]
        )
        self.assertEqual(
            parser.regex_findall_functions("${func(a=1, b=2)}"),
            [("func", "a=1, b=2")]
        )
        self.assertEqual(
            parser.regex_findall_functions("${func(1, $b, c=$x, d=4)}"),
            [("func", "1, $b, c=$x, d=4")]
        )
        self.assertEqual(
            parser.regex_findall_functions("/api/1000?_t=${get_timestamp()}"),
            [("get_timestamp", "")]
        )
        self.assertEqual(
            parser.regex_findall_functions("/api/${add(1, 2)}"),
            [("add", "1, 2")]
        )
        self.assertEqual(
            parser.regex_findall_functions("/api/${add(1, 2)}?_t=${get_timestamp()}"),
            [('add', '1, 2'), ('get_timestamp', '')]
        )
        self.assertEqual(
            parser.regex_findall_functions("abc${func(1, 2, a=3, b=4)}def"),
            [('func', '1, 2, a=3, b=4')]
        )

    def test_parse_data_string_with_variables(self):
        variables_mapping = {
            "var_1": "abc",
            "var_2": "def",
            "var_3": 123,
            "var_4": {"a": 1},
            "var_5": True,
            "var_6": None
        }
        self.assertEqual(
            parser.parse_data("$var_1", variables_mapping),
            "abc"
        )
        self.assertEqual(
            parser.parse_data("${var_1}", variables_mapping),
            "abc"
        )
        self.assertEqual(
            parser.parse_data("var_1", variables_mapping),
            "var_1"
        )
        self.assertEqual(
            parser.parse_data("$var_1#XYZ", variables_mapping),
            "abc#XYZ"
        )
        self.assertEqual(
            parser.parse_data("${var_1}#XYZ", variables_mapping),
            "abc#XYZ"
        )
        self.assertEqual(
            parser.parse_data("/$var_1/$var_2/var3", variables_mapping),
            "/abc/def/var3"
        )
        # a whole-string $var keeps its original (non-str) type
        self.assertEqual(
            parser.parse_data("$var_3", variables_mapping),
            123
        )
        self.assertEqual(
            parser.parse_data("$var_4", variables_mapping),
            {"a": 1}
        )
        self.assertEqual(
            parser.parse_data("$var_5", variables_mapping),
            True
        )
        # embedded non-str values are stringified
        self.assertEqual(
            parser.parse_data("abc$var_5", variables_mapping),
            "abcTrue"
        )
        self.assertEqual(
            parser.parse_data("abc$var_4", variables_mapping),
            "abc{'a': 1}"
        )
        self.assertEqual(
            parser.parse_data("$var_6", variables_mapping),
            None
        )

        with self.assertRaises(VariableNotFound):
            parser.parse_data("/api/$SECRET_KEY", variables_mapping)

        self.assertEqual(
            parser.parse_data(["$var_1", "$var_2"], variables_mapping),
            ["abc", "def"]
        )
        self.assertEqual(
            parser.parse_data({"$var_1": "$var_2"}, variables_mapping),
            {"abc": "def"}
        )

        # format: $var
        value = parser.parse_data("ABC$var_1", variables_mapping)
        self.assertEqual(value, "ABCabc")

        value = parser.parse_data("ABC$var_1$var_3", variables_mapping)
        self.assertEqual(value, "ABCabc123")

        value = parser.parse_data("ABC$var_1/$var_3", variables_mapping)
        self.assertEqual(value, "ABCabc/123")

        value = parser.parse_data("ABC$var_1/", variables_mapping)
        self.assertEqual(value, "ABCabc/")

        value = parser.parse_data("ABC$var_1$", variables_mapping)
        self.assertEqual(value, "ABCabc$")

        value = parser.parse_data("ABC$var_1/123$var_1/456", variables_mapping)
        self.assertEqual(value, "ABCabc/123abc/456")

        value = parser.parse_data("ABC$var_1/$var_2/$var_1", variables_mapping)
        self.assertEqual(value, "ABCabc/def/abc")

        value = parser.parse_data("func1($var_1, $var_3)", variables_mapping)
        self.assertEqual(value, "func1(abc, 123)")

        # format: ${var}
        value = parser.parse_data("ABC${var_1}", variables_mapping)
        self.assertEqual(value, "ABCabc")

        value = parser.parse_data("ABC${var_1}${var_3}", variables_mapping)
        self.assertEqual(value, "ABCabc123")

        value = parser.parse_data("ABC${var_1}/${var_3}", variables_mapping)
        self.assertEqual(value, "ABCabc/123")

        value = parser.parse_data("ABC${var_1}/", variables_mapping)
        self.assertEqual(value, "ABCabc/")

        value = parser.parse_data("ABC${var_1}123", variables_mapping)
        self.assertEqual(value, "ABCabc123")

        value = parser.parse_data("ABC${var_1}/123${var_1}/456", variables_mapping)
        self.assertEqual(value, "ABCabc/123abc/456")

        value = parser.parse_data("ABC${var_1}/${var_2}/${var_1}", variables_mapping)
        self.assertEqual(value, "ABCabc/def/abc")

        value = parser.parse_data("func1(${var_1}, ${var_3})", variables_mapping)
        self.assertEqual(value, "func1(abc, 123)")

    def test_parse_data_multiple_identical_variables(self):
        # the same variable may occur several times in one string
        variables_mapping = {
            "var_1": "abc",
            "var_2": "def",
        }
        self.assertEqual(
            parser.parse_data("/$var_1/$var_2/$var_1", variables_mapping),
            "/abc/def/abc"
        )

        variables_mapping = {
            "userid": 100,
            "data": 1498
        }
        content = "/users/$userid/training/$data?userId=$userid&data=$data"
        self.assertEqual(
            parser.parse_data(content, variables_mapping),
            "/users/100/training/1498?userId=100&data=1498"
        )

        # "userid" must not be mistaken for "user" + "id"
        variables_mapping = {
            "user": 100,
            "userid": 1000,
            "data": 1498
        }
        content = "/users/$user/$userid/$data?userId=$userid&data=$data"
        self.assertEqual(
            parser.parse_data(content, variables_mapping),
            "/users/100/1000/1498?userId=1000&data=1498"
        )

    def test_parse_data_string_with_functions(self):
        import random, string
        functions_mapping = {
            "gen_random_string": lambda str_len: ''.join(random.choice(string.ascii_letters + string.digits) \
                for _ in range(str_len))
        }
        result = parser.parse_data("${gen_random_string(5)}", functions_mapping=functions_mapping)
        self.assertEqual(len(result), 5)

        # default keyword argument (b=1) is honoured
        add_two_nums = lambda a, b=1: a + b
        functions_mapping["add_two_nums"] = add_two_nums
        self.assertEqual(
            parser.parse_data("${add_two_nums(1)}", functions_mapping=functions_mapping),
            2
        )
        self.assertEqual(
            parser.parse_data("${add_two_nums(1, 2)}", functions_mapping=functions_mapping),
            3
        )
        self.assertEqual(
            parser.parse_data("/api/${add_two_nums(1, 2)}", functions_mapping=functions_mapping),
            "/api/3"
        )

        with self.assertRaises(FunctionNotFound):
            parser.parse_data("/api/${gen_md5(abc)}")

        variables_mapping = {
            "var_1": "abc",
            "var_2": "def",
            "var_3": 123,
            "var_4": {"a": 1},
            "var_5": True,
            "var_6": None
        }
        functions_mapping = {
            "func1": lambda x, y: str(x) + str(y)
        }

        value = parser.parse_data("${func1($var_1, $var_3)}", variables_mapping, functions_mapping)
        self.assertEqual(value, "abc123")

        value = parser.parse_data("ABC${func1($var_1, $var_3)}DE", variables_mapping, functions_mapping)
        self.assertEqual(value, "ABCabc123DE")

        value = parser.parse_data("ABC${func1($var_1, $var_3)}$var_5", variables_mapping, functions_mapping)
        self.assertEqual(value, "ABCabc123True")

        value = parser.parse_data("ABC${func1($var_1, $var_3)}DE$var_4", variables_mapping, functions_mapping)
        self.assertEqual(value, "ABCabc123DE{'a': 1}")

        value = parser.parse_data("ABC$var_5${func1($var_1, $var_3)}", variables_mapping, functions_mapping)
        self.assertEqual(value, "ABCTrueabc123")

        # Python builtins are reachable when not in functions_mapping
        value = parser.parse_data("ABC${ord(a)}DEF${len(abcd)}", variables_mapping, functions_mapping)
        self.assertEqual(value, "ABC97DEF4")

    def test_parse_data_func_var_duplicate(self):
        # repeated function calls and mixed func/var occurrences
        variables_mapping = {
            "var_1": "abc",
            "var_2": "def",
            "var_3": 123,
            "var_4": {"a": 1},
            "var_5": True,
            "var_6": None
        }
        functions_mapping = {
            "func1": lambda x, y: str(x) + str(y)
        }

        value = parser.parse_data(
            "ABC${func1($var_1, $var_3)}--${func1($var_1, $var_3)}",
            variables_mapping, functions_mapping)
        self.assertEqual(value, "ABCabc123--abc123")

        value = parser.parse_data("ABC${func1($var_1, $var_3)}$var_1", variables_mapping, functions_mapping)
        self.assertEqual(value, "ABCabc123abc")

        value = parser.parse_data(
            "ABC${func1($var_1, $var_3)}$var_1--${func1($var_1, $var_3)}$var_1",
            variables_mapping, functions_mapping)
        self.assertEqual(value, "ABCabc123abc--abc123abc")

    def test_parse_data_func_abnormal(self):
        # stray braces and escaped dollars must pass through literally
        variables_mapping = {
            "var_1": "abc",
            "var_2": "def",
            "var_3": 123,
            "var_4": {"a": 1},
            "var_5": True,
            "var_6": None
        }
        functions_mapping = {
            "func1": lambda x, y: str(x) + str(y)
        }

        # {
        value = parser.parse_data("ABC$var_1{", variables_mapping, functions_mapping)
        self.assertEqual(value, "ABCabc{")

        value = parser.parse_data("{ABC$var_1{}a}", variables_mapping, functions_mapping)
        self.assertEqual(value, "{ABCabc{}a}")

        value = parser.parse_data("AB{C$var_1{}a}", variables_mapping, functions_mapping)
        self.assertEqual(value, "AB{Cabc{}a}")

        # }
        value = parser.parse_data("ABC$var_1}", variables_mapping, functions_mapping)
        self.assertEqual(value, "ABCabc}")

        # $$
        value = parser.parse_data("ABC$$var_1{", variables_mapping, functions_mapping)
        self.assertEqual(value, "ABC$var_1{")

        # $$$
        value = parser.parse_data("ABC$$$var_1{", variables_mapping, functions_mapping)
        self.assertEqual(value, "ABC$abc{")

        # $$$$
        value = parser.parse_data("ABC$$$$var_1{", variables_mapping, functions_mapping)
        self.assertEqual(value, "ABC$$var_1{")

        # ${
        value = parser.parse_data("ABC$var_1${", variables_mapping, functions_mapping)
        self.assertEqual(value, "ABCabc${")

        value = parser.parse_data("ABC$var_1${a", variables_mapping, functions_mapping)
        self.assertEqual(value, "ABCabc${a")

        # $}
        value = parser.parse_data("ABC$var_1$}a", variables_mapping, functions_mapping)
        self.assertEqual(value, "ABCabc$}a")

        # }{
        value = parser.parse_data("ABC$var_1}{a", variables_mapping, functions_mapping)
        self.assertEqual(value, "ABCabc}{a")

        # {}
        value = parser.parse_data("ABC$var_1{}a", variables_mapping, functions_mapping)
        self.assertEqual(value, "ABCabc{}a")

    def test_parse_data_request(self):
        # nested request template: substitution works at every depth
        content = {
            'request': {
                'url': '/api/users/$uid',
                'method': "$method",
                'headers': {'token': '$token'},
                'data': {
                    "null": None,
                    "true": True,
                    "false": False,
                    "empty_str": "",
                    "value": "abc${add_one(3)}def"
                }
            }
        }
        variables_mapping = {
            "uid": 1000,
            "method": "POST",
            "token": "abc123"
        }
        functions_mapping = {
            "add_one": lambda x: x + 1
        }
        result = parser.parse_data(content, variables_mapping, functions_mapping)
        self.assertEqual("/api/users/1000", result["request"]["url"])
        self.assertEqual("abc123", result["request"]["headers"]["token"])
        self.assertEqual("POST", result["request"]["method"])
        self.assertIsNone(result["request"]["data"]["null"])
        self.assertTrue(result["request"]["data"]["true"])
        self.assertFalse(result["request"]["data"]["false"])
        self.assertEqual("", result["request"]["data"]["empty_str"])
        self.assertEqual("abc4def", result["request"]["data"]["value"])

    def test_parse_data_testcase(self):
        variables = {
            "uid": "1000",
            "random": "A2dEx",
            "authorization": "a83de0ff8d2e896dbd8efb81ba14e17d",
            "data": {"name": "user", "password": "123456"}
        }
        functions = {
            "add_two_nums": lambda a, b=1: a + b,
            "get_timestamp": lambda: int(time.time() * 1000)
        }
        testcase_template = {
            "url": "http://127.0.0.1:5000/api/users/$uid/${add_two_nums(1,2)}",
            "method": "POST",
            "headers": {
                "Content-Type": "application/json",
                "authorization": "$authorization",
                "random": "$random",
                "sum": "${add_two_nums(1, 2)}"
            },
            "body": "$data"
        }
        parsed_testcase = parser.parse_data(testcase_template, variables, functions)
        self.assertEqual(
            parsed_testcase["url"],
            "http://127.0.0.1:5000/api/users/1000/3"
        )
        self.assertEqual(
            parsed_testcase["headers"]["authorization"],
            variables["authorization"]
        )
        self.assertEqual(
            parsed_testcase["headers"]["random"],
            variables["random"]
        )
        self.assertEqual(
            parsed_testcase["body"],
            variables["data"]
        )
        # whole-string function result keeps its int type
        self.assertEqual(
            parsed_testcase["headers"]["sum"],
            3
        )

View File

@@ -1,122 +0,0 @@
from typing import Dict, Text, Any, NoReturn
import jmespath
import requests
from loguru import logger
from httprunner.exceptions import ValidationFailure, ParamsError
from httprunner.v3.parser import parse_data, parse_string_value, get_mapping_function
from httprunner.v3.schema import VariablesMapping, Validators, FunctionsMapping
from httprunner.v3.validator import uniform_validator
class ResponseObject(object):
    """Wrapper around a requests.Response adding jmespath-based
    extraction and validation against a flat meta view of the response.
    """

    def __init__(self, resp_obj: requests.Response):
        """ initialize with a requests.Response object

        Args:
            resp_obj (instance): requests.Response instance

        """
        self.resp_obj = resp_obj
        # flat view used as the jmespath search root for extract()/validate(),
        # e.g. check expressions like "status_code" or "body.args.foo1".
        # NOTE(review): resp_obj.json() raises for non-JSON bodies —
        # presumably only JSON APIs are validated; confirm with callers.
        self.resp_obj_meta = {
            "status_code": resp_obj.status_code,
            "headers": resp_obj.headers,
            "body": resp_obj.json()
        }
        # populated by validate(); consumed by reporting
        self.validation_results: Dict = {}

    def __getattr__(self, key):
        # delegate unknown attributes to the underlying requests.Response,
        # with special handling for "json" (call it) and "cookies" (as dict);
        # the result is cached on the instance so __getattr__ runs once per key
        try:
            if key == "json":
                value = self.resp_obj.json()
            elif key == "cookies":
                value = self.resp_obj.cookies.get_dict()
            else:
                value = getattr(self.resp_obj, key)

            self.__dict__[key] = value
            return value

        except AttributeError:
            err_msg = f"ResponseObject does not have attribute: {key}"
            logger.error(err_msg)
            raise ParamsError(err_msg)

    def extract(self, extractors: Dict[Text, Text]) -> Dict[Text, Any]:
        """ extract values from the response meta via jmespath expressions.

        Args:
            extractors: mapping of variable name -> jmespath expression.

        Returns:
            dict: variable name -> extracted value (None when no match).
        """
        if not extractors:
            return {}

        extract_mapping = {}
        for key, field in extractors.items():
            field_value = jmespath.search(field, self.resp_obj_meta)
            extract_mapping[key] = field_value

        logger.info(f"extract mapping: {extract_mapping}")
        return extract_mapping

    def validate(self,
                 validators: Validators,
                 variables_mapping: VariablesMapping = None,
                 functions_mapping: FunctionsMapping = None) -> NoReturn:
        """ run every validator against the response meta.

        Args:
            validators: list of validator items (uniformed below).
            variables_mapping: variables used to resolve expected values.
            functions_mapping: functions used to resolve expected values
                and to look up the assert method.

        Raises:
            ValidationFailure: if any validator fails; all failures are
                collected and reported together.
        """
        self.validation_results = {}
        if not validators:
            return

        validate_pass = True
        failures = []

        for v in validators:

            if "validate_extractor" not in self.validation_results:
                self.validation_results["validate_extractor"] = []

            u_validator = uniform_validator(v)

            # check item: jmespath expression evaluated on the response meta
            check_item = u_validator["check"]
            check_value = jmespath.search(check_item, self.resp_obj_meta)
            # normalize numeric-looking strings before comparison
            check_value = parse_string_value(check_value)

            # comparator
            assert_method = u_validator["assert"]
            assert_func = get_mapping_function(assert_method, functions_mapping)

            # expect item
            expect_item = u_validator["expect"]
            # parse expected value with config/teststep/extracted variables
            expect_value = parse_data(expect_item, variables_mapping, functions_mapping)

            validate_msg = f"assert {check_item} {assert_method} {expect_value}({type(expect_value).__name__})"

            validator_dict = {
                "comparator": assert_method,
                "check": check_item,
                "check_value": check_value,
                "expect": expect_item,
                "expect_value": expect_value
            }

            try:
                assert_func(check_value, expect_value)
                validate_msg += "\t==> pass"
                logger.info(validate_msg)
                validator_dict["check_result"] = "pass"
            except AssertionError:
                # record the failure but keep evaluating remaining validators
                validate_pass = False
                validator_dict["check_result"] = "fail"
                validate_msg += "\t==> fail"
                validate_msg += f"\n" \
                    f"check_item: {check_item}\n" \
                    f"check_value: {check_value}({type(check_value).__name__})\n" \
                    f"assert_method: {assert_method}\n" \
                    f"expect_value: {expect_value}({type(expect_value).__name__})"
                logger.error(validate_msg)
                failures.append(validate_msg)

            self.validation_results["validate_extractor"].append(validator_dict)

        if not validate_pass:
            failures_string = "\n".join([failure for failure in failures])
            raise ValidationFailure(failures_string)

View File

@@ -1,123 +0,0 @@
from typing import List, Dict
from loguru import logger
from httprunner import utils
from httprunner.client import HttpSession
from httprunner.exceptions import ValidationFailure
from httprunner.v3.parser import build_url, parse_data, parse_variables_mapping
from httprunner.v3.response import ResponseObject
from httprunner.v3.schema import TestsConfig, TestStep, VariablesMapping, TestCase, SessionData
class TestCaseRunner(object):
    """ Execute a TestCase: run each step, extract variables, validate.

    Subclasses typically declare ``config`` and ``teststeps`` as class
    attributes; alternatively ``init()`` populates them from a TestCase
    object.
    """

    # class-level defaults kept for backward compatibility with subclasses
    # that declare config/teststeps on the class; the mutable containers are
    # re-bound per instance before use (see test_start / __run_step) so
    # collected data never leaks across instances.
    config: TestsConfig = {}
    teststeps: List[TestStep] = []
    session: HttpSession = None
    step_datas: List[SessionData] = []
    validation_results: Dict = {}

    def init(self, testcase: TestCase) -> "TestCaseRunner":
        """ load config and teststeps from a TestCase object. """
        self.config = testcase.config
        self.teststeps = testcase.teststeps
        return self

    def with_session(self, s: HttpSession) -> "TestCaseRunner":
        """ inject a shared HttpSession (e.g. to reuse cookies/connections). """
        self.session = s
        return self

    def with_variables(self, **variables: VariablesMapping) -> "TestCaseRunner":
        """ override/extend config variables with keyword arguments. """
        self.config.variables.update(variables)
        return self

    def __run_step(self, step: TestStep):
        """ run one teststep: parse -> request -> extract -> validate.

        Returns:
            dict: variables extracted from the response, to be shared with
                subsequent steps.

        Raises:
            ValidationFailure: if any validator of this step fails.
        """
        logger.info(f"run step: {step.name}")

        # parse: resolve $var / ${func()} in the request template
        request_dict = step.request.dict()
        parsed_request_dict = parse_data(request_dict, step.variables, self.config.functions)

        # prepare arguments
        method = parsed_request_dict.pop("method")
        url_path = parsed_request_dict.pop("url")
        url = build_url(self.config.base_url, url_path)
        # requests expects the "json" kwarg; the schema field is "req_json"
        parsed_request_dict["json"] = parsed_request_dict.pop("req_json", {})

        logger.info(f"{method} {url}")
        logger.debug(f"request kwargs(raw): {parsed_request_dict}")

        # request
        self.session = self.session or HttpSession()
        resp = self.session.request(method, url, **parsed_request_dict)
        resp_obj = ResponseObject(resp)

        def log_req_resp_details():
            # only called on validation failure, to aid debugging
            err_msg = "\n{} DETAILED REQUEST & RESPONSE {}\n".format("*" * 32, "*" * 32)

            # log request
            err_msg += "====== request details ======\n"
            err_msg += f"url: {url}\n"
            err_msg += f"method: {method}\n"
            headers = parsed_request_dict.pop("headers", {})
            err_msg += f"headers: {headers}\n"
            for k, v in parsed_request_dict.items():
                v = utils.omit_long_data(v)
                err_msg += f"{k}: {repr(v)}\n"

            err_msg += "\n"

            # log response
            err_msg += "====== response details ======\n"
            err_msg += f"status_code: {resp_obj.status_code}\n"
            err_msg += f"headers: {resp_obj.headers}\n"
            err_msg += f"body: {repr(resp_obj.text)}\n"
            logger.error(err_msg)

        # extract
        extractors = step.extract
        extract_mapping = resp_obj.extract(extractors)

        variables_mapping = step.variables
        variables_mapping.update(extract_mapping)

        # validate
        validators = step.validators
        try:
            resp_obj.validate(validators, variables_mapping, self.config.functions)
            self.session.data.status = "passed"
        except ValidationFailure:
            self.session.data.status = "failed"
            log_req_resp_details()
            raise
        finally:
            self.validation_results = resp_obj.validation_results
            # save request & response meta data
            self.session.data.validators = self.validation_results
            self.session.data.name = step.name
            self.step_datas.append(self.session.data)

        return extract_mapping

    def test_start(self):
        """main entrance"""
        # FIX: re-bind instead of clear() — ``step_datas`` starts life as a
        # shared mutable class attribute, so mutating it in place leaked
        # step data across runner instances (and across runs).
        self.step_datas = []
        session_variables = {}
        for step in self.teststeps:
            # update with config variables
            # NOTE(review): dict.update means config variables override
            # step-level ones here — confirm this precedence is intended.
            step.variables.update(self.config.variables)
            # update with session variables extracted from former step
            step.variables.update(session_variables)
            # parse variables
            step.variables = parse_variables_mapping(step.variables, self.config.functions)

            # run step
            extract_mapping = self.__run_step(step)
            # save extracted variables to session variables
            session_variables.update(extract_mapping)

        return self

    def run(self):
        """main entrance alias for test_start"""
        return self.test_start()

View File

@@ -1,91 +0,0 @@
from typing import Text
from httprunner.exceptions import ParamsError
def get_uniform_comparator(comparator: Text):
    """ convert comparator alias to uniform name

    e.g. "eq" / "==" / "is" all map to "equals"; an unknown comparator
    is returned unchanged.
    """
    # canonical name -> every alias that maps to it (the canonical name
    # is included in its own group so it passes through)
    uniform_groups = {
        "equals": ("eq", "equals", "==", "is"),
        "less_than": ("lt", "less_than"),
        "less_than_or_equals": ("le", "less_than_or_equals"),
        "greater_than": ("gt", "greater_than"),
        "greater_than_or_equals": ("ge", "greater_than_or_equals"),
        "not_equals": ("ne", "not_equals"),
        "string_equals": ("str_eq", "string_equals"),
        "length_equals": ("len_eq", "length_equals", "count_eq"),
        "length_greater_than": (
            "len_gt", "count_gt", "length_greater_than", "count_greater_than"),
        "length_greater_than_or_equals": (
            "len_ge", "count_ge", "length_greater_than_or_equals",
            "count_greater_than_or_equals"),
        "length_less_than": (
            "len_lt", "count_lt", "length_less_than", "count_less_than"),
        "length_less_than_or_equals": (
            "len_le", "count_le", "length_less_than_or_equals",
            "count_less_than_or_equals"),
    }

    for uniform_name, alias_group in uniform_groups.items():
        if comparator in alias_group:
            return uniform_name

    # not an alias: pass through as-is (may be a custom comparator)
    return comparator
def uniform_validator(validator):
    """ unify validator

    Args:
        validator (dict): validator maybe in two formats:

            format1: this is kept for compatiblity with the previous versions.
                {"check": "status_code", "assert": "eq", "expect": 201}
                {"check": "$resp_body_success", "assert": "eq", "expect": True}
            format2: recommended new version, {assert: [check_item, expected_value]}
                {'eq': ['status_code', 201]}
                {'eq': ['$resp_body_success', True]}

    Returns
        dict: validator info

            {
                "check": "status_code",
                "expect": 201,
                "assert": "equals"
            }

    """
    if not isinstance(validator, dict):
        raise ParamsError(f"invalid validator: {validator}")

    if "check" in validator and "expect" in validator:
        # format1: explicit keys; comparator is optional and defaults to "eq"
        check_item = validator["check"]
        expect_value = validator["expect"]
        comparator = validator.get("comparator", "eq")
    elif len(validator) == 1:
        # format2: the single key is the comparator, the value is
        # a two-element list [check_item, expected_value]
        comparator, compare_values = next(iter(validator.items()))
        if not (isinstance(compare_values, list) and len(compare_values) == 2):
            raise ParamsError(f"invalid validator: {validator}")
        check_item, expect_value = compare_values
    else:
        raise ParamsError(f"invalid validator: {validator}")

    # uniform comparator, e.g. lt => less_than, eq => equals
    return {
        "check": check_item,
        "expect": expect_value,
        "assert": get_uniform_comparator(comparator),
    }

View File

@@ -1,204 +0,0 @@
# encoding: utf-8
import sys
import traceback
from loguru import logger
from httprunner import exceptions, parser
class Validator(object):
    """Validate tests

    Runs the "validate" section of a teststep against an API response,
    supporting comparator-style validators and inline python scripts.

    Attributes:
        validation_results (dict): store validation results,
            including validate_extractor and validate_script.
    """

    def __init__(self, session_context, resp_obj):
        """ initialize a Validator for each teststep (API request)

        Args:
            session_context: HttpRunner session context
            resp_obj: ResponseObject instance
        """
        self.session_context = session_context
        self.resp_obj = resp_obj
        self.validation_results = {}

    def __eval_validator_check(self, check_item):
        """ evaluate check item in validator.

        Args:
            check_item: check_item should only be the following 5 formats:
                1, variable reference, e.g. $token
                2, function reference, e.g. ${is_status_code_200($status_code)}
                3, dict or list, maybe containing variable/function reference, e.g. {"var": "$abc"}
                4, string joined by delimiter. e.g. "status_code", "headers.content-type"
                5, regex string, e.g. "LB[\d]*(.*)RB[\d]*"

        """
        if isinstance(check_item, (dict, list)) \
                or isinstance(check_item, parser.LazyString):
            # format 1/2/3: resolved through the session context
            check_value = self.session_context.eval_content(check_item)
        else:
            # format 4/5: extracted directly from the response object
            check_value = self.resp_obj.extract_field(check_item)

        return check_value

    def __eval_validator_expect(self, expect_item):
        """ evaluate expect item in validator.

        Args:
            expect_item: expect_item should only be in 2 types:
                1, variable reference, e.g. $expect_status_code
                2, actual value, e.g. 200

        """
        expect_value = self.session_context.eval_content(expect_item)
        return expect_value

    def validate_script(self, script):
        """ make validation with python script

        Args:
            script (list): lines of a user-supplied python snippet.

        Returns:
            dict: {"validate_script": ..., "check_result": "pass"/"fail",
                   "output": "<br/>"-joined error details on failure}
        """
        result = {
            "validate_script": "<br/>".join(script),
            "check_result": "pass",
            "output": ""
        }

        # indent each script line to sit inside the generated function body
        script = "\n    ".join(script)
        # NOTE: the 4-line preamble below is what the "- 4" line-number
        # corrections later in this method compensate for
        code = f"""
# encoding: utf-8

def run_validate_script():
    {script}
"""
        # the script may reference these names plus all test variables
        variables = {
            "status_code": self.resp_obj.status_code,
            "response_json": self.resp_obj.json,
            "response": self.resp_obj
        }
        variables.update(self.session_context.test_variables_mapping)
        variables.update(globals())

        try:
            exec(code, variables)
        except SyntaxError as ex:
            logger.warning(f"SyntaxError in python validate script: {ex}")
            result["check_result"] = "fail"
            result["output"] = "<br/>".join([
                f"ErrorMessage: {ex.msg}",
                f"ErrorLine: {ex.lineno}",
                f"ErrorText: {ex.text}"
            ])
            return result

        try:
            # run python validate script
            variables["run_validate_script"]()
        except Exception as ex:
            logger.warning(f"run python validate script failed: {ex}")
            result["check_result"] = "fail"
            _type, _value, _tb = sys.exc_info()
            _lineno = -1
            if _tb.tb_next:
                # error raised inside the generated function: next frame
                # holds the line inside run_validate_script
                _lineno = _tb.tb_next.tb_lineno
                line_no = _lineno - 4
            elif len(traceback.extract_tb(_tb)) > 0:
                # filename, lineno, name, line
                _, _lineno, _, _ = traceback.extract_tb(_tb)[-1]
                line_no = _lineno - 4
            else:
                line_no = "N/A"
            result["output"] = "<br/>".join([
                f"ErrorType: {_type.__name__}",
                f"ErrorLine: {line_no}"
            ])

        return result

    def validate(self, validators):
        """ make validation with comparators

        Populates self.validation_results and raises ValidationFailure
        if any validator fails (all validators are still evaluated first).
        """
        self.validation_results = {}
        if not validators:
            return

        logger.debug("start to validate.")
        validate_pass = True
        failures = []

        for validator in validators:

            if isinstance(validator, dict) and validator.get("type") == "python_script":
                # script-style validator: delegate to validate_script
                script = self.session_context.eval_content(validator["script"])
                result = self.validate_script(script)
                if result["check_result"] == "fail":
                    validate_pass = False
                    failures.append(result["output"])
                self.validation_results["validate_script"] = result
                continue

            if "validate_extractor" not in self.validation_results:
                self.validation_results["validate_extractor"] = []

            # validator should be LazyFunction object
            if not isinstance(validator, parser.LazyFunction):
                raise exceptions.ValidationFailure(
                    f"validator should be parsed first: {validators}")

            # evaluate validator args with context variable mapping.
            validator_args = validator.get_args()
            check_item, expect_item = validator_args
            check_value = self.__eval_validator_check(check_item)
            expect_value = self.__eval_validator_expect(expect_item)
            # temporarily replace args with evaluated values for execution
            validator.update_args([check_value, expect_value])

            comparator = validator.func_name
            validator_dict = {
                "comparator": comparator,
                "check": check_item,
                "check_value": check_value,
                "expect": expect_item,
                "expect_value": expect_value
            }
            validate_msg = f"\nvalidate: {check_item} {comparator} {expect_value}({type(expect_value).__name__})"

            try:
                validator.to_value(self.session_context.test_variables_mapping)
                validator_dict["check_result"] = "pass"
                validate_msg += "\t==> pass"
                logger.debug(validate_msg)
            except (AssertionError, TypeError):
                # record the failure but keep evaluating remaining validators
                validate_pass = False
                validator_dict["check_result"] = "fail"
                validate_msg += "\t==> fail"
                validate_msg += "\n{}({}) {} {}({})".format(
                    check_value,
                    type(check_value).__name__,
                    comparator,
                    expect_value,
                    type(expect_value).__name__
                )
                logger.error(validate_msg)
                failures.append(validate_msg)

            self.validation_results["validate_extractor"].append(validator_dict)

            # restore validator args, in case of running multiple times
            validator.update_args(validator_args)

        if not validate_pass:
            failures_string = "\n".join([failure for failure in failures])
            raise exceptions.ValidationFailure(failures_string)

View File

@@ -1,188 +0,0 @@
import os
from httprunner import context, exceptions, loader, parser, runner
from tests.api_server import gen_md5
from tests.base import ApiServerUnittest, gen_random_string
class TestContext(ApiServerUnittest):
    """Integration tests for SessionContext: variable initialization,
    content evaluation, request parsing and validator execution."""

    def setUp(self):
        # load project debugtalk functions/env so parser lookups resolve
        loader.load_project_data(os.path.join(os.getcwd(), "tests"))
        self.context = context.SessionContext(
            variables={"SECRET_KEY": "DebugTalk"}
        )

    def test_init_test_variables_initialize(self):
        # variables passed at construction become the test variables mapping
        self.assertEqual(
            self.context.test_variables_mapping,
            {'SECRET_KEY': 'DebugTalk'}
        )

    def test_init_test_variables(self):
        # variables referencing each other and functions must resolve in order
        variables = {
            "random": "${gen_random_string($num)}",
            "authorization": "${gen_md5($TOKEN, $data, $random)}",
            "data": "$username",
            # TODO: escape '{' and '}'
            # "data": '{"name": "$username", "password": "123456"}',
            "TOKEN": "debugtalk",
            "username": "user1",
            "num": 6
        }
        functions = {
            "gen_random_string": gen_random_string,
            "gen_md5": gen_md5
        }
        variables = parser.prepare_lazy_data(variables, functions, variables.keys())
        variables = parser.parse_variables_mapping(variables)

        self.context.init_test_variables(variables)
        variables_mapping = self.context.test_variables_mapping
        # gen_random_string($num) with num=6 yields a 6-char string
        self.assertEqual(len(variables_mapping["random"]), 6)
        # md5 hex digest is always 32 chars
        self.assertEqual(len(variables_mapping["authorization"]), 32)
        self.assertEqual(variables_mapping["data"], 'user1')

    def test_update_seesion_variables(self):
        # session variables set at runtime are stored on the context
        self.context.update_session_variables({"TOKEN": "debugtalk"})
        self.assertEqual(
            self.context.session_variables_mapping["TOKEN"],
            "debugtalk"
        )

    def test_eval_content_variables(self):
        variables = {
            "SECRET_KEY": "DebugTalk"
        }
        content = parser.prepare_lazy_data("abc$SECRET_KEY", {}, variables.keys())
        self.assertEqual(
            self.context.eval_content(content),
            "abcDebugTalk"
        )

        # TODO: fix variable extraction
        # content = "abc$SECRET_KEYdef"
        # self.assertEqual(
        #     self.context.eval_content(content),
        #     "abcDebugTalkdef"
        # )

    def test_get_parsed_request(self):
        # end-to-end: prepare lazy request, evaluate all $var/${func} refs
        variables = {
            "random": "${gen_random_string(5)}",
            "data": '{"name": "user", "password": "123456"}',
            "authorization": "${gen_md5($TOKEN, $data, $random)}",
            "TOKEN": "debugtalk"
        }
        functions = {
            "gen_random_string": gen_random_string,
            "gen_md5": gen_md5
        }
        variables = parser.prepare_lazy_data(variables, functions, variables.keys())
        variables = parser.parse_variables_mapping(variables)
        self.context.init_test_variables(variables)

        request = {
            "url": "http://127.0.0.1:5000/api/users/1000",
            "method": "POST",
            "headers": {
                "Content-Type": "application/json",
                "authorization": "$authorization",
                "random": "$random",
                "secret_key": "$SECRET_KEY"
            },
            "data": "$data"
        }
        prepared_request = parser.prepare_lazy_data(
            request,
            functions,
            {"authorization", "random", "SECRET_KEY", "data"}
        )
        parsed_request = self.context.eval_content(prepared_request)
        self.assertIn("authorization", parsed_request["headers"])
        self.assertEqual(len(parsed_request["headers"]["authorization"]), 32)
        self.assertIn("random", parsed_request["headers"])
        self.assertEqual(len(parsed_request["headers"]["random"]), 5)
        self.assertIn("data", parsed_request)
        self.assertEqual(
            parsed_request["data"],
            '{"name": "user", "password": "123456"}'
        )
        # SECRET_KEY comes from the context built in setUp
        self.assertEqual(parsed_request["headers"]["secret_key"], "DebugTalk")

    def test_validate(self):
        # all four validator formats should pass against the live test server
        testcases = [
            {
                "config": {
                    'name': "test validation"
                },
                "teststeps": [
                    {
                        "name": "test validation",
                        "request": {
                            "url": "http://127.0.0.1:5000/",
                            "method": "GET",
                        },
                        "variables": {
                            "resp_status_code": 200,
                            "resp_body_success": True
                        },
                        "validate": [
                            {"eq": ["$resp_status_code", 200]},
                            {"check": "$resp_status_code", "comparator": "eq", "expect": 200},
                            {"check": "$resp_body_success", "expect": True},
                            {"check": "${is_status_code_200($resp_status_code)}", "expect": True}
                        ]
                    }
                ]
            }
        ]
        from tests.debugtalk import is_status_code_200
        tests_mapping = {
            "project_mapping": {
                "functions": {
                    "is_status_code_200": is_status_code_200
                }
            },
            "testcases": testcases
        }
        testcases = parser.parse_tests(tests_mapping)
        parsed_testcase = testcases[0]
        test_runner = runner.Runner(parsed_testcase["config"])
        teststep = parsed_testcase["teststeps"][0]
        # should not raise
        test_runner.run_test(teststep)

    def test_validate_exception(self):
        # mismatched expectations must raise ValidationFailure
        testcases = [
            {
                "config": {
                    'name': "test validation"
                },
                "teststeps": [
                    {
                        "name": "test validation",
                        "request": {
                            "url": "http://127.0.0.1:5000/",
                            "method": "GET",
                        },
                        "variables": {
                            "resp_status_code": 200,
                            "resp_body_success": True
                        },
                        "validate": [
                            # status_code is 200, so both 201 checks fail
                            {"eq": ["$resp_status_code", 201]},
                            {"check": "$resp_status_code", "expect": 201},
                            {"check": "$resp_body_success", "comparator": "eq", "expect": True}
                        ]
                    }
                ]
            }
        ]
        tests_mapping = {
            "testcases": testcases
        }
        testcases = parser.parse_tests(tests_mapping)
        parsed_testcase = testcases[0]
        test_runner = runner.Runner(parsed_testcase["config"])
        teststep = parsed_testcase["teststeps"][0]
        with self.assertRaises(exceptions.ValidationFailure):
            test_runner.run_test(teststep)