Merge branch 'master' into dev

This commit is contained in:
readyou
2019-12-26 21:48:59 +08:00
committed by GitHub
60 changed files with 1545 additions and 895 deletions

View File

@@ -1,4 +1,16 @@
__version__ = "2.4.0"
__version__ = "2.4.8"
__description__ = "One-stop solution for HTTP(S) testing."
__all__ = ["__version__", "__description__"]
import uuid
import sentry_sdk
sentry_sdk.init(
dsn="https://cc6dd86fbe9f4e7fbd95248cfcff114d@sentry.io/1862849",
release="httprunner@{}".format(__version__)
)
with sentry_sdk.configure_scope() as scope:
scope.set_user({"id": uuid.getnode()})

View File

@@ -1,6 +1,5 @@
import sys
from httprunner.cli import main
if __name__ == "__main__":
sys.exit(main())
main()

View File

@@ -1,6 +1,8 @@
import os
import unittest
from sentry_sdk import capture_message
from httprunner import (__version__, exceptions, loader, logger, parser,
report, runner, utils)
@@ -183,6 +185,7 @@ class HttpRunner(object):
def run_tests(self, tests_mapping):
""" run testcase/testsuite data
"""
capture_message("start to run tests")
project_mapping = tests_mapping.get("project_mapping", {})
self.project_working_directory = project_mapping.get("PWD", os.getcwd())
@@ -192,6 +195,10 @@ class HttpRunner(object):
# parse tests
self.exception_stage = "parse tests"
parsed_testcases = parser.parse_tests(tests_mapping)
parse_failed_testfiles = parser.get_parse_failed_testfiles()
if parse_failed_testfiles:
logger.log_warning("parse failures occurred ...")
utils.dump_logs(parse_failed_testfiles, project_mapping, "parse_failed")
if self.save_tests:
utils.dump_logs(parsed_testcases, project_mapping, "parsed")
@@ -274,6 +281,8 @@ class HttpRunner(object):
path_or_tests:
str: testcase/testsuite file/foler path
dict: valid testcase/testsuite data
dot_env_path (str): specified .env file path.
mapping (dict): if mapping is specified, it will override variables in config block.
Returns:
dict: result summary

View File

@@ -3,19 +3,13 @@ Built-in functions used in YAML/JSON testcases.
"""
import datetime
import os
import random
import string
import time
import filetype
from requests_toolbelt import MultipartEncoder
from httprunner.compat import builtin_str, integer_types
from httprunner.exceptions import ParamsError
PWD = os.getcwd()
def gen_random_string(str_len):
""" generate random string with specified length
@@ -44,62 +38,3 @@ def sleep(n_secs):
"""
time.sleep(n_secs)
"""
upload files with requests-toolbelt
e.g.
- test:
name: upload file
variables:
file_path: "data/test.env"
multipart_encoder: ${multipart_encoder(file=$file_path)}
request:
url: /post
method: POST
headers:
Content-Type: ${multipart_content_type($multipart_encoder)}
data: $multipart_encoder
validate:
- eq: ["status_code", 200]
- startswith: ["content.files.file", "UserName=test"]
"""
def multipart_encoder(**kwargs):
""" initialize MultipartEncoder with uploading fields.
"""
def get_filetype(file_path):
file_type = filetype.guess(file_path)
if file_type:
return file_type.mime
else:
return "text/html"
fields_dict = {}
for key, value in kwargs.items():
if os.path.isabs(value):
_file_path = value
is_file = True
else:
global PWD
_file_path = os.path.join(PWD, value)
is_file = os.path.isfile(_file_path)
if is_file:
filename = os.path.basename(_file_path)
with open(_file_path, 'rb') as f:
mime_type = get_filetype(_file_path)
fields_dict[key] = (filename, f.read(), mime_type)
else:
fields_dict[key] = value
return MultipartEncoder(fields=fields_dict)
def multipart_content_type(multipart_encoder):
""" prepare Content-Type for request headers
"""
return multipart_encoder.content_type

View File

@@ -2,6 +2,8 @@ import argparse
import os
import sys
from sentry_sdk import capture_exception
from httprunner import __description__, __version__
from httprunner.api import HttpRunner
from httprunner.compat import is_py2
@@ -64,23 +66,23 @@ def main():
if len(sys.argv) == 1:
# no argument passed
parser.print_help()
return 0
sys.exit(0)
if args.version:
color_print("{}".format(__version__), "GREEN")
return 0
sys.exit(0)
if args.validate:
validate_json_file(args.validate)
return 0
sys.exit(0)
if args.prettify:
prettify_json_file(args.prettify)
return 0
sys.exit(0)
project_name = args.startproject
if project_name:
create_scaffold(project_name)
return 0
sys.exit(0)
runner = HttpRunner(
failfast=args.failfast,
@@ -91,7 +93,7 @@ def main():
err_code = 0
try:
for path in args.testcase_paths:
for path in args.testfile_paths:
summary = runner.run(path, dot_env_path=args.dot_env_path)
report_dir = args.report_dir or os.path.join(runner.project_working_directory, "reports")
gen_html_report(
@@ -101,12 +103,13 @@ def main():
report_file=args.report_file
)
err_code |= (0 if summary and summary["success"] else 1)
except Exception:
except Exception as ex:
color_print("!!!!!!!!!! exception stage: {} !!!!!!!!!!".format(runner.exception_stage), "YELLOW")
raise
capture_exception(ex)
err_code = 1
return err_code
sys.exit(err_code)
if __name__ == '__main__':
sys.exit(main())
main()

View File

@@ -14,6 +14,74 @@ from httprunner.utils import lower_dict_keys, omit_long_data
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def get_req_resp_record(resp_obj):
""" get request and response info from Response() object.
"""
def log_print(req_resp_dict, r_type):
msg = "\n================== {} details ==================\n".format(r_type)
for key, value in req_resp_dict[r_type].items():
msg += "{:<16} : {}\n".format(key, repr(value))
logger.log_debug(msg)
req_resp_dict = {
"request": {},
"response": {}
}
# record actual request info
req_resp_dict["request"]["url"] = resp_obj.request.url
req_resp_dict["request"]["method"] = resp_obj.request.method
req_resp_dict["request"]["headers"] = dict(resp_obj.request.headers)
request_body = resp_obj.request.body
if request_body:
request_content_type = lower_dict_keys(
req_resp_dict["request"]["headers"]
).get("content-type")
if request_content_type and "multipart/form-data" in request_content_type:
# upload file type
req_resp_dict["request"]["body"] = "upload file stream (OMITTED)"
else:
req_resp_dict["request"]["body"] = request_body
# log request details in debug mode
log_print(req_resp_dict, "request")
# record response info
req_resp_dict["response"]["ok"] = resp_obj.ok
req_resp_dict["response"]["url"] = resp_obj.url
req_resp_dict["response"]["status_code"] = resp_obj.status_code
req_resp_dict["response"]["reason"] = resp_obj.reason
req_resp_dict["response"]["cookies"] = resp_obj.cookies or {}
req_resp_dict["response"]["encoding"] = resp_obj.encoding
resp_headers = dict(resp_obj.headers)
req_resp_dict["response"]["headers"] = resp_headers
lower_resp_headers = lower_dict_keys(resp_headers)
content_type = lower_resp_headers.get("content-type", "")
req_resp_dict["response"]["content_type"] = content_type
if "image" in content_type:
# response is image type, record bytes content only
req_resp_dict["response"]["body"] = resp_obj.content
else:
try:
# try to record json data
if isinstance(resp_obj, response.ResponseObject):
req_resp_dict["response"]["body"] = resp_obj.json
else:
req_resp_dict["response"]["body"] = resp_obj.json()
except ValueError:
# only record at most 512 text charactors
resp_text = resp_obj.text
req_resp_dict["response"]["body"] = omit_long_data(resp_text)
# log response details in debug mode
log_print(req_resp_dict, "response")
return req_resp_dict
class ApiResponse(Response):
def raise_for_status(self):
@@ -62,79 +130,12 @@ class HttpSession(requests.Session):
}
}
def get_req_resp_record(self, resp_obj):
""" get request and response info from Response() object.
"""
def log_print(req_resp_dict, r_type):
msg = "\n================== {} details ==================\n".format(r_type)
for key, value in req_resp_dict[r_type].items():
msg += "{:<16} : {}\n".format(key, repr(value))
logger.log_debug(msg)
req_resp_dict = {
"request": {},
"response": {}
}
# record actual request info
req_resp_dict["request"]["url"] = resp_obj.request.url
req_resp_dict["request"]["method"] = resp_obj.request.method
req_resp_dict["request"]["headers"] = dict(resp_obj.request.headers)
request_body = resp_obj.request.body
if request_body:
request_content_type = lower_dict_keys(
req_resp_dict["request"]["headers"]
).get("content-type")
if request_content_type and "multipart/form-data" in request_content_type:
# upload file type
req_resp_dict["request"]["body"] = "upload file stream (OMITTED)"
else:
req_resp_dict["request"]["body"] = request_body
# log request details in debug mode
log_print(req_resp_dict, "request")
# record response info
req_resp_dict["response"]["ok"] = resp_obj.ok
req_resp_dict["response"]["url"] = resp_obj.url
req_resp_dict["response"]["status_code"] = resp_obj.status_code
req_resp_dict["response"]["reason"] = resp_obj.reason
req_resp_dict["response"]["cookies"] = resp_obj.cookies or {}
req_resp_dict["response"]["encoding"] = resp_obj.encoding
resp_headers = dict(resp_obj.headers)
req_resp_dict["response"]["headers"] = resp_headers
lower_resp_headers = lower_dict_keys(resp_headers)
content_type = lower_resp_headers.get("content-type", "")
req_resp_dict["response"]["content_type"] = content_type
if "image" in content_type:
# response is image type, record bytes content only
req_resp_dict["response"]["content"] = resp_obj.content
else:
try:
# try to record json data
if isinstance(resp_obj, response.ResponseObject):
req_resp_dict["response"]["json"] = resp_obj.json
else:
req_resp_dict["response"]["json"] = resp_obj.json()
except ValueError:
# only record at most 512 text charactors
resp_text = resp_obj.text
req_resp_dict["response"]["text"] = omit_long_data(resp_text)
# log response details in debug mode
log_print(req_resp_dict, "response")
return req_resp_dict
def update_last_req_resp_record(self, resp_obj):
"""
update request and response info from Response() object.
"""
self.meta_data["data"].pop()
self.meta_data["data"].append(self.get_req_resp_record(resp_obj))
self.meta_data["data"].append(get_req_resp_record(resp_obj))
def request(self, method, url, name=None, **kwargs):
"""
@@ -207,7 +208,7 @@ class HttpSession(requests.Session):
# record request and response histories, include 30X redirection
response_list = response.history + [response]
self.meta_data["data"] = [
self.get_req_resp_record(resp_obj)
get_req_resp_record(resp_obj)
for resp_obj in response_list
]

View File

@@ -6,18 +6,23 @@ from httprunner.compat import JSONDecodeError, FileNotFoundError
these exceptions will mark test as failure
"""
class MyBaseFailure(Exception):
pass
class ValidationFailure(MyBaseFailure):
pass
class ExtractFailure(MyBaseFailure):
pass
class SetupHooksFailure(MyBaseFailure):
pass
class TeardownHooksFailure(MyBaseFailure):
pass
@@ -26,35 +31,51 @@ class TeardownHooksFailure(MyBaseFailure):
these exceptions will mark test as error
"""
class MyBaseError(Exception):
pass
class FileFormatError(MyBaseError):
pass
class ParamsError(MyBaseError):
pass
class NotFoundError(MyBaseError):
pass
class FileNotFound(FileNotFoundError, NotFoundError):
pass
class FunctionNotFound(NotFoundError):
pass
class VariableNotFound(NotFoundError):
pass
class EnvNotFound(NotFoundError):
pass
class CSVNotFound(NotFoundError):
pass
class ApiNotFound(NotFoundError):
pass
class TestcaseNotFound(NotFoundError):
pass
class SummaryEmpty(MyBaseError):
""" test result summary data is empty
"""

View File

@@ -1,2 +1,2 @@
# NOTICE:
# This file should not be deleted, or ImportError will be raised in Python 2.7 when importing plugin
# This file should not be deleted, or ImportError will be raised in Python 2.7 when importing extension

View File

@@ -17,7 +17,7 @@ $ locusts -f xxx.yml --processes
```
```shell script
$ python3 -m httprunner.plugins.locusts -h
$ python3 -m httprunner.ext.locusts -h
Usage: locust [options] [LocustClass [LocustClass2 ... ]]

View File

@@ -0,0 +1,4 @@
from httprunner.ext.locusts.cli import main
if __name__ == "__main__":
main()

View File

@@ -2,6 +2,7 @@ try:
# monkey patch ssl at beginning to avoid RecursionError when running locust.
from gevent import monkey
monkey.patch_ssl()
from locust import main as locust_main
except ImportError:
msg = """
Locust is not installed, install first and try again.
@@ -61,8 +62,7 @@ def gen_locustfile(testcase_file_path):
def start_locust_main():
from locust.main import main
main()
locust_main.main()
def start_master(sys_argv):

View File

@@ -5,7 +5,7 @@ from locust import HttpLocust, TaskSet, task
from locust.events import request_failure
from httprunner.exceptions import MyBaseError, MyBaseFailure
from httprunner.plugins.locusts.utils import prepare_locust_tests
from httprunner.ext.locusts.utils import prepare_locust_tests
from httprunner.runner import Runner
logging.getLogger().setLevel(logging.CRITICAL)

View File

@@ -0,0 +1,144 @@
""" upload test extension.
If you want to use this extension, you should install the following dependencies first.
- requests_toolbelt
- filetype
Then you can write upload test script as below:
- test:
name: upload file
request:
url: http://httpbin.org/upload
method: POST
headers:
Cookie: session=AAA-BBB-CCC
upload:
file: "data/file_to_upload"
field1: "value1"
field2: "value2"
validate:
- eq: ["status_code", 200]
For compatibility, you can also write upload test script in old way:
- test:
name: upload file
variables:
file: "data/file_to_upload"
field1: "value1"
field2: "value2"
m_encoder: ${multipart_encoder(file=$file, field1=$field1, field2=$field2)}
request:
url: http://httpbin.org/upload
method: POST
headers:
Content-Type: ${multipart_content_type($m_encoder)}
Cookie: session=AAA-BBB-CCC
data: $m_encoder
validate:
- eq: ["status_code", 200]
"""
import os
import sys
try:
import filetype
from requests_toolbelt import MultipartEncoder
except ImportError:
msg = """
uploader extension dependencies uninstalled, install first and try again.
install with pip:
$ pip install requests_toolbelt filetype
"""
print(msg)
sys.exit(0)
from httprunner.exceptions import ParamsError
def prepare_upload_test(test_dict):
""" preprocess for upload test
replace `upload` info with MultipartEncoder
Args:
test_dict (dict):
{
"variables": {},
"request": {
"url": "http://httpbin.org/upload",
"method": "POST",
"headers": {
"Cookie": "session=AAA-BBB-CCC"
},
"upload": {
"file": "data/file_to_upload"
"md5": "123"
}
}
}
"""
upload_json = test_dict["request"].pop("upload", {})
if not upload_json:
raise ParamsError("invalid upload info: {}".format(upload_json))
params_list = []
for key, value in upload_json.items():
test_dict["variables"][key] = value
params_list.append("{}=${}".format(key, key))
params_str = ", ".join(params_list)
test_dict["variables"]["m_encoder"] = "${multipart_encoder(" + params_str + ")}"
test_dict["request"].setdefault("headers", {})
test_dict["request"]["headers"]["Content-Type"] = "${multipart_content_type($m_encoder)}"
test_dict["request"]["data"] = "$m_encoder"
def multipart_encoder(**kwargs):
""" initialize MultipartEncoder with uploading fields.
"""
def get_filetype(file_path):
file_type = filetype.guess(file_path)
if file_type:
return file_type.mime
else:
return "text/html"
fields_dict = {}
for key, value in kwargs.items():
if os.path.isabs(value):
# value is absolute file path
_file_path = value
is_exists_file = os.path.isfile(value)
else:
# value is not absolute file path, check if it is relative file path
from httprunner.loader import get_pwd
_file_path = os.path.join(get_pwd(), value)
is_exists_file = os.path.isfile(_file_path)
if is_exists_file:
# value is file path to upload
filename = os.path.basename(_file_path)
mime_type = get_filetype(_file_path)
# TODO: fix ResourceWarning for unclosed file
file_handler = open(_file_path, 'rb')
fields_dict[key] = (filename, file_handler, mime_type)
else:
fields_dict[key] = value
return MultipartEncoder(fields=fields_dict)
def multipart_content_type(m_encoder):
""" prepare Content-Type for request headers
"""
return m_encoder.content_type

View File

@@ -9,6 +9,7 @@ HttpRunner loader
"""
from httprunner.loader.check import is_testcase_path, is_testcases, validate_json_file
from httprunner.loader.locate import get_project_working_directory as get_pwd
from httprunner.loader.load import load_csv_file, load_builtin_functions
from httprunner.loader.buildup import load_cases, load_project_data
@@ -16,6 +17,7 @@ __all__ = [
"is_testcase_path",
"is_testcases",
"validate_json_file",
"get_pwd",
"load_csv_file",
"load_builtin_functions",
"load_project_data",

View File

@@ -2,8 +2,7 @@ import importlib
import os
from httprunner import exceptions, logger, utils
from httprunner.builtin import functions
from httprunner.loader.load import load_module_functions, load_folder_content, load_file, load_dot_env_file, \
from httprunner.loader.load import load_module_functions, load_file, load_dot_env_file, \
load_folder_files
from httprunner.loader.locate import init_project_working_directory, get_project_working_directory
@@ -50,12 +49,16 @@ def __extend_with_api_ref(raw_testinfo):
# type 1: api is defined in individual file
api_name = api_path
try:
if api_name in tests_def_mapping["api"]:
block = tests_def_mapping["api"][api_name]
# NOTICE: avoid project_mapping been changed during iteration.
raw_testinfo["api_def"] = utils.deepcopy_dict(block)
except KeyError:
elif not os.path.isfile(api_name):
raise exceptions.ApiNotFound("{} not found!".format(api_name))
else:
block = load_file(api_name)
# NOTICE: avoid project_mapping been changed during iteration.
raw_testinfo["api_def"] = utils.deepcopy_dict(block)
tests_def_mapping["api"][api_name] = block
def __extend_with_testcase_ref(raw_testinfo):
@@ -335,7 +338,6 @@ def load_test_file(path):
"""
raw_content = load_file(path)
loaded_content = None
if isinstance(raw_content, dict):
@@ -378,77 +380,6 @@ def load_test_file(path):
return loaded_content
def load_api_folder(api_folder_path):
""" load api definitions from api folder.
Args:
api_folder_path (str): api files folder.
api file should be in the following format:
[
{
"api": {
"def": "api_login",
"request": {},
"validate": []
}
},
{
"api": {
"def": "api_logout",
"request": {},
"validate": []
}
}
]
Returns:
dict: api definition mapping.
{
"api_login": {
"function_meta": {"func_name": "api_login", "args": [], "kwargs": {}}
"request": {}
},
"api_logout": {
"function_meta": {"func_name": "api_logout", "args": [], "kwargs": {}}
"request": {}
}
}
"""
api_definition_mapping = {}
api_items_mapping = load_folder_content(api_folder_path)
for api_file_path, api_items in api_items_mapping.items():
# TODO: add JSON schema validation
if isinstance(api_items, list):
for api_item in api_items:
key, api_dict = api_item.popitem()
api_id = api_dict.get("id") or api_dict.get("def") \
or api_dict.get("name")
if key != "api" or not api_id:
raise exceptions.ParamsError(
"Invalid API defined in {}".format(api_file_path))
if api_id in api_definition_mapping:
raise exceptions.ParamsError(
"Duplicated API ({}) defined in {}".format(
api_id, api_file_path))
else:
api_definition_mapping[api_id] = api_dict
elif isinstance(api_items, dict):
if api_file_path in api_definition_mapping:
raise exceptions.ParamsError(
"Duplicated API defined: {}".format(api_file_path))
else:
api_definition_mapping[api_file_path] = api_items
return api_definition_mapping
def load_project_data(test_path, dot_env_path=None):
""" load api, testcases, .env, debugtalk.py functions.
api/testcases folder is relative to project_working_directory
@@ -480,14 +411,9 @@ def load_project_data(test_path, dot_env_path=None):
debugtalk_functions = {}
# locate PWD and load debugtalk.py functions
project_mapping["PWD"] = project_working_directory
functions.PWD = project_working_directory # TODO: remove
project_mapping["functions"] = debugtalk_functions
project_mapping["test_path"] = test_path
# load api
tests_def_mapping["api"] = load_api_folder(os.path.join(project_working_directory, "api"))
project_mapping["test_path"] = os.path.abspath(test_path)
return project_mapping

View File

@@ -127,6 +127,8 @@ def is_testcase_path(path):
if not os.path.exists(path):
return False
# TODO: check file format if valid
return True

View File

@@ -185,31 +185,6 @@ def load_dot_env_file(dot_env_path):
return env_variables_mapping
def load_folder_content(folder_path):
""" load api/testcases/testsuites definitions from folder.
Args:
folder_path (str): api/testcases/testsuites files folder.
Returns:
dict: api definition mapping.
{
"tests/api/basic.yml": [
{"api": {"def": "api_login", "request": {}, "validate": []}},
{"api": {"def": "api_logout", "request": {}, "validate": []}}
]
}
"""
items_mapping = {}
for file_path in load_folder_files(folder_path):
items_mapping[file_path] = load_file(file_path)
return items_mapping
def load_module_functions(module):
""" load python module functions.

View File

@@ -16,6 +16,14 @@ variable_regex_compile = re.compile(r"\$\{(\w+)\}|\$(\w+)")
# function notation, e.g. ${func1($var_1, $var_3)}
function_regex_compile = re.compile(r"\$\{(\w+)\(([\$\w\.\-/\s=,]*)\)\}")
""" Store parse failed api/testcase/testsuite file path
"""
parse_failed_testfiles = {}
def get_parse_failed_testfiles():
return parse_failed_testfiles
def parse_string_value(str_value):
""" parse string to number if possible
@@ -428,6 +436,11 @@ def get_mapping_function(function_name, functions_mapping):
elif function_name in ["environ", "ENV"]:
return utils.get_os_environ
elif function_name in ["multipart_encoder", "multipart_content_type"]:
# extension for upload test
from httprunner.ext import uploader
return getattr(uploader, function_name)
try:
# check if HttpRunner builtin functions
built_in_functions = loader.load_builtin_functions()
@@ -439,8 +452,9 @@ def get_mapping_function(function_name, functions_mapping):
# check if Python builtin functions
return getattr(builtins, function_name)
except AttributeError:
# is not builtin function
raise exceptions.FunctionNotFound("{} is not found.".format(function_name))
pass
raise exceptions.FunctionNotFound("{} is not found.".format(function_name))
def parse_function_params(params):
@@ -1139,6 +1153,8 @@ def __prepare_testcase_tests(tests, config, project_mapping, session_variables_s
# 3, testcase_def config => testcase_def test_dict
test_dict = _parse_testcase(test_dict, project_mapping, session_variables_set)
if not test_dict:
continue
elif "api_def" in test_dict:
# test_dict has API reference
@@ -1146,13 +1162,18 @@ def __prepare_testcase_tests(tests, config, project_mapping, session_variables_s
api_def_dict = test_dict.pop("api_def")
_extend_with_api(test_dict, api_def_dict)
# verify priority: testcase teststep > testcase config
if "request" in test_dict:
if "verify" not in test_dict["request"]:
test_dict["request"]["verify"] = config_verify
if "upload" in test_dict["request"]:
from httprunner.ext.uploader import prepare_upload_test
prepare_upload_test(test_dict)
# current teststep variables
teststep_variables_set |= set(test_dict.get("variables", {}).keys())
# verify priority: testcase teststep > testcase config
if "request" in test_dict and "verify" not in test_dict["request"]:
test_dict["request"]["verify"] = config_verify
# move extracted variable to session variables
if "extract" in test_dict:
extract_mapping = utils.ensure_mapping_format(test_dict["extract"])
@@ -1205,21 +1226,34 @@ def _parse_testcase(testcase, project_mapping, session_variables_set=None):
"""
testcase.setdefault("config", {})
prepared_config = __prepare_config(
testcase["config"],
project_mapping,
session_variables_set
)
prepared_testcase_tests = __prepare_testcase_tests(
testcase["teststeps"],
prepared_config,
project_mapping,
session_variables_set
)
return {
"config": prepared_config,
"teststeps": prepared_testcase_tests
}
try:
prepared_config = __prepare_config(
testcase["config"],
project_mapping,
session_variables_set
)
prepared_testcase_tests = __prepare_testcase_tests(
testcase["teststeps"],
prepared_config,
project_mapping,
session_variables_set
)
return {
"config": prepared_config,
"teststeps": prepared_testcase_tests
}
except (exceptions.MyBaseFailure, exceptions.MyBaseError):
testcase_type = testcase["type"]
testcase_path = testcase.get("path")
global parse_failed_testfiles
if testcase_type not in parse_failed_testfiles:
parse_failed_testfiles[testcase_type] = []
parse_failed_testfiles[testcase_type].append(testcase_path)
return None
def __get_parsed_testsuite_testcases(testcases, testsuite_config, project_mapping):
@@ -1275,6 +1309,7 @@ def __get_parsed_testsuite_testcases(testcases, testsuite_config, project_mappin
parsed_testcase = testcase.pop("testcase_def")
parsed_testcase.setdefault("config", {})
parsed_testcase["path"] = testcase["testcase"]
parsed_testcase["type"] = "testcase"
parsed_testcase["config"]["name"] = testcase_name
if "weight" in testcase:
@@ -1320,6 +1355,8 @@ def __get_parsed_testsuite_testcases(testcases, testsuite_config, project_mappin
parameter_variables
)
parsed_testcase_copied = _parse_testcase(testcase_copied, project_mapping)
if not parsed_testcase_copied:
continue
parsed_testcase_copied["config"]["name"] = parse_lazy_data(
parsed_testcase_copied["config"]["name"],
testcase_copied["config"]["variables"]
@@ -1328,6 +1365,8 @@ def __get_parsed_testsuite_testcases(testcases, testsuite_config, project_mappin
else:
parsed_testcase = _parse_testcase(parsed_testcase, project_mapping)
if not parsed_testcase:
continue
parsed_testcase_list.append(parsed_testcase)
return parsed_testcase_list
@@ -1424,7 +1463,10 @@ def parse_tests(tests_mapping):
elif test_type == "testcases":
for testcase in tests_mapping["testcases"]:
testcase["type"] = "testcase"
parsed_testcase = _parse_testcase(testcase, project_mapping)
if not parsed_testcase:
continue
testcases.append(parsed_testcase)
elif test_type == "apis":
@@ -1434,9 +1476,13 @@ def parse_tests(tests_mapping):
"config": {
"name": api_content.get("name")
},
"teststeps": [api_content]
"teststeps": [api_content],
"path": api_content.pop("path", None),
"type": api_content.pop("type", "api")
}
parsed_testcase = _parse_testcase(testcase, project_mapping)
if not parsed_testcase:
continue
testcases.append(parsed_testcase)
return testcases

View File

@@ -1,4 +0,0 @@
from httprunner.plugins.locusts.cli import main
if __name__ == "__main__":
main()

View File

@@ -1,386 +0,0 @@
import io
import os
import platform
import time
import unittest
from base64 import b64encode
from collections import Iterable
from datetime import datetime
from jinja2 import Template, escape
from requests.cookies import RequestsCookieJar
from httprunner import __version__, logger
from httprunner.compat import basestring, bytes, json, numeric_types
def get_platform():
return {
"httprunner_version": __version__,
"python_version": "{} {}".format(
platform.python_implementation(),
platform.python_version()
),
"platform": platform.platform()
}
def get_summary(result):
""" get summary from test result
Args:
result (instance): HtmlTestResult() instance
Returns:
dict: summary extracted from result.
{
"success": True,
"stat": {},
"time": {},
"records": []
}
"""
summary = {
"success": result.wasSuccessful(),
"stat": {
'total': result.testsRun,
'failures': len(result.failures),
'errors': len(result.errors),
'skipped': len(result.skipped),
'expectedFailures': len(result.expectedFailures),
'unexpectedSuccesses': len(result.unexpectedSuccesses)
}
}
summary["stat"]["successes"] = summary["stat"]["total"] \
- summary["stat"]["failures"] \
- summary["stat"]["errors"] \
- summary["stat"]["skipped"] \
- summary["stat"]["expectedFailures"] \
- summary["stat"]["unexpectedSuccesses"]
summary["time"] = {
'start_at': result.start_at,
'duration': result.duration
}
summary["records"] = result.records
return summary
def aggregate_stat(origin_stat, new_stat):
""" aggregate new_stat to origin_stat.
Args:
origin_stat (dict): origin stat dict, will be updated with new_stat dict.
new_stat (dict): new stat dict.
"""
for key in new_stat:
if key not in origin_stat:
origin_stat[key] = new_stat[key]
elif key == "start_at":
# start datetime
origin_stat["start_at"] = min(origin_stat["start_at"], new_stat["start_at"])
elif key == "duration":
# duration = max_end_time - min_start_time
max_end_time = max(origin_stat["start_at"] + origin_stat["duration"],
new_stat["start_at"] + new_stat["duration"])
min_start_time = min(origin_stat["start_at"], new_stat["start_at"])
origin_stat["duration"] = max_end_time - min_start_time
else:
origin_stat[key] += new_stat[key]
def stringify_summary(summary):
""" stringify summary, in order to dump json file and generate html report.
"""
for index, suite_summary in enumerate(summary["details"]):
if not suite_summary.get("name"):
suite_summary["name"] = "testcase {}".format(index)
for record in suite_summary.get("records"):
meta_datas = record['meta_datas']
__stringify_meta_datas(meta_datas)
meta_datas_expanded = []
__expand_meta_datas(meta_datas, meta_datas_expanded)
record["meta_datas_expanded"] = meta_datas_expanded
record["response_time"] = __get_total_response_time(meta_datas_expanded)
def __stringify_request(request_data):
""" stringfy HTTP request data
Args:
request_data (dict): HTTP request data in dict.
{
"url": "http://127.0.0.1:5000/api/get-token",
"method": "POST",
"headers": {
"User-Agent": "python-requests/2.20.0",
"Accept-Encoding": "gzip, deflate",
"Accept": "*/*",
"Connection": "keep-alive",
"user_agent": "iOS/10.3",
"device_sn": "TESTCASE_CREATE_XXX",
"os_platform": "ios",
"app_version": "2.8.6",
"Content-Type": "application/json",
"Content-Length": "52"
},
"json": {
"sign": "cb9d60acd09080ea66c8e63a1c78c6459ea00168"
},
"verify": false
}
"""
for key, value in request_data.items():
if isinstance(value, list):
value = json.dumps(value, indent=2, ensure_ascii=False)
elif isinstance(value, bytes):
try:
encoding = "utf-8"
value = escape(value.decode(encoding))
except UnicodeDecodeError:
pass
elif not isinstance(value, (basestring, numeric_types, Iterable)):
# class instance, e.g. MultipartEncoder()
value = repr(value)
elif isinstance(value, RequestsCookieJar):
value = value.get_dict()
request_data[key] = value
def __stringify_response(response_data):
""" stringfy HTTP response data
Args:
response_data (dict):
{
"status_code": 404,
"headers": {
"Content-Type": "application/json",
"Content-Length": "30",
"Server": "Werkzeug/0.14.1 Python/3.7.0",
"Date": "Tue, 27 Nov 2018 06:19:27 GMT"
},
"encoding": "None",
"content_type": "application/json",
"ok": false,
"url": "http://127.0.0.1:5000/api/users/9001",
"reason": "NOT FOUND",
"cookies": {},
"json": {
"success": false,
"data": {}
}
}
"""
for key, value in response_data.items():
if isinstance(value, list):
value = json.dumps(value, indent=2, ensure_ascii=False)
elif isinstance(value, bytes):
try:
encoding = response_data.get("encoding")
if not encoding or encoding == "None":
encoding = "utf-8"
if key == "content" and "image" in response_data["content_type"]:
# display image
value = "data:{};base64,{}".format(
response_data["content_type"],
b64encode(value).decode(encoding)
)
else:
value = escape(value.decode(encoding))
except UnicodeDecodeError:
pass
elif not isinstance(value, (basestring, numeric_types, Iterable)):
# class instance, e.g. MultipartEncoder()
value = repr(value)
elif isinstance(value, RequestsCookieJar):
value = value.get_dict()
response_data[key] = value
def __expand_meta_datas(meta_datas, meta_datas_expanded):
""" expand meta_datas to one level
Args:
meta_datas (dict/list): maybe in nested format
Returns:
list: expanded list in one level
Examples:
>>> meta_datas = [
[
dict1,
dict2
],
dict3
]
>>> meta_datas_expanded = []
>>> __expand_meta_datas(meta_datas, meta_datas_expanded)
>>> print(meta_datas_expanded)
[dict1, dict2, dict3]
"""
if isinstance(meta_datas, dict):
meta_datas_expanded.append(meta_datas)
elif isinstance(meta_datas, list):
for meta_data in meta_datas:
__expand_meta_datas(meta_data, meta_datas_expanded)
def __get_total_response_time(meta_datas_expanded):
""" caculate total response time of all meta_datas
"""
try:
response_time = 0
for meta_data in meta_datas_expanded:
response_time += meta_data["stat"]["response_time_ms"]
return "{:.2f}".format(response_time)
except TypeError:
# failure exists
return "N/A"
def __stringify_meta_datas(meta_datas):
if isinstance(meta_datas, list):
for _meta_data in meta_datas:
__stringify_meta_datas(_meta_data)
elif isinstance(meta_datas, dict):
data_list = meta_datas["data"]
for data in data_list:
__stringify_request(data["request"])
__stringify_response(data["response"])
def gen_html_report(summary, report_template=None, report_dir=None, report_file=None):
""" render html report with specified report name and template
Args:
summary (dict): test result summary data
report_template (str): specify html report template path, template should be in Jinja2 format.
report_dir (str): specify html report save directory
report_file (str): specify html report file path, this has higher priority than specifying report dir.
"""
if not report_template:
report_template = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
"static",
"report_template.html"
)
logger.log_debug("No html report template specified, use default.")
else:
logger.log_info("render with html report template: {}".format(report_template))
logger.log_info("Start to render Html report ...")
start_at_timestamp = int(summary["time"]["start_at"])
summary["time"]["start_datetime"] = datetime.fromtimestamp(start_at_timestamp).strftime('%Y-%m-%d %H:%M:%S')
if report_file:
report_dir = os.path.dirname(report_file)
report_file_name = os.path.basename(report_file)
else:
report_dir = report_dir or os.path.join(os.getcwd(), "reports")
report_file_name = "{}.html".format(start_at_timestamp)
if not os.path.isdir(report_dir):
os.makedirs(report_dir)
report_path = os.path.join(report_dir, report_file_name)
with io.open(report_template, "r", encoding='utf-8') as fp_r:
template_content = fp_r.read()
with io.open(report_path, 'w', encoding='utf-8') as fp_w:
rendered_content = Template(
template_content,
extensions=["jinja2.ext.loopcontrols"]
).render(summary)
fp_w.write(rendered_content)
logger.log_info("Generated Html report: {}".format(report_path))
return report_path
class HtmlTestResult(unittest.TextTestResult):
""" A html result class that can generate formatted html results.
Used by TextTestRunner.
"""
def __init__(self, stream, descriptions, verbosity):
super(HtmlTestResult, self).__init__(stream, descriptions, verbosity)
self.records = []
def _record_test(self, test, status, attachment=''):
data = {
'name': test.shortDescription(),
'status': status,
'attachment': attachment,
"meta_datas": test.meta_datas
}
self.records.append(data)
def startTestRun(self):
self.start_at = time.time()
def startTest(self, test):
""" add start test time """
super(HtmlTestResult, self).startTest(test)
logger.color_print(test.shortDescription(), "yellow")
def addSuccess(self, test):
super(HtmlTestResult, self).addSuccess(test)
self._record_test(test, 'success')
print("")
def addError(self, test, err):
super(HtmlTestResult, self).addError(test, err)
self._record_test(test, 'error', self._exc_info_to_string(err, test))
print("")
def addFailure(self, test, err):
super(HtmlTestResult, self).addFailure(test, err)
self._record_test(test, 'failure', self._exc_info_to_string(err, test))
print("")
def addSkip(self, test, reason):
super(HtmlTestResult, self).addSkip(test, reason)
self._record_test(test, 'skipped', reason)
print("")
def addExpectedFailure(self, test, err):
super(HtmlTestResult, self).addExpectedFailure(test, err)
self._record_test(test, 'ExpectedFailure', self._exc_info_to_string(err, test))
print("")
def addUnexpectedSuccess(self, test):
super(HtmlTestResult, self).addUnexpectedSuccess(test)
self._record_test(test, 'UnexpectedSuccess')
print("")
@property
def duration(self):
return time.time() - self.start_at

View File

@@ -0,0 +1,20 @@
"""
HttpRunner report
- summarize: aggregate test stat data to summary
- stringify: stringify summary, in order to dump json file and generate html report.
- html: render html report
"""
from httprunner.report.summarize import get_platform, aggregate_stat, get_summary
from httprunner.report.stringify import stringify_summary
from httprunner.report.html import HtmlTestResult, gen_html_report
__all__ = [
"get_platform",
"aggregate_stat",
"get_summary",
"stringify_summary",
"HtmlTestResult",
"gen_html_report"
]

View File

@@ -0,0 +1,15 @@
"""
HttpRunner html report
- result: define resultclass for unittest TextTestRunner
- gen_report: render html report with jinja2 template
"""
from httprunner.report.html.result import HtmlTestResult
from httprunner.report.html.gen_report import gen_html_report
__all__ = [
"HtmlTestResult",
"gen_html_report"
]

View File

@@ -0,0 +1,62 @@
import io
import os
from datetime import datetime
from jinja2 import Template
from httprunner import logger
from httprunner.exceptions import SummaryEmpty
def gen_html_report(summary, report_template=None, report_dir=None, report_file=None):
""" render html report with specified report name and template
Args:
summary (dict): test result summary data
report_template (str): specify html report template path, template should be in Jinja2 format.
report_dir (str): specify html report save directory
report_file (str): specify html report file path, this has higher priority than specifying report dir.
"""
if not summary["time"] or summary["stat"]["testcases"]["total"] == 0:
logger.log_error("test result summary is empty ! {}".format(summary))
raise SummaryEmpty
if not report_template:
report_template = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
"template.html"
)
logger.log_debug("No html report template specified, use default.")
else:
logger.log_info("render with html report template: {}".format(report_template))
logger.log_info("Start to render Html report ...")
start_at_timestamp = int(summary["time"]["start_at"])
summary["time"]["start_datetime"] = datetime.fromtimestamp(start_at_timestamp).strftime('%Y-%m-%d %H:%M:%S')
if report_file:
report_dir = os.path.dirname(report_file)
report_file_name = os.path.basename(report_file)
else:
report_dir = report_dir or os.path.join(os.getcwd(), "reports")
report_file_name = "{}.html".format(start_at_timestamp)
if not os.path.isdir(report_dir):
os.makedirs(report_dir)
report_path = os.path.join(report_dir, report_file_name)
with io.open(report_template, "r", encoding='utf-8') as fp_r:
template_content = fp_r.read()
with io.open(report_path, 'w', encoding='utf-8') as fp_w:
rendered_content = Template(
template_content,
extensions=["jinja2.ext.loopcontrols"]
).render(summary)
fp_w.write(rendered_content)
logger.log_info("Generated Html report: {}".format(report_path))
return report_path

View File

@@ -0,0 +1,64 @@
import time
import unittest
from httprunner import logger
class HtmlTestResult(unittest.TextTestResult):
""" A html result class that can generate formatted html results.
Used by TextTestRunner.
"""
def __init__(self, stream, descriptions, verbosity):
super(HtmlTestResult, self).__init__(stream, descriptions, verbosity)
self.records = []
def _record_test(self, test, status, attachment=''):
data = {
'name': test.shortDescription(),
'status': status,
'attachment': attachment,
"meta_datas": test.meta_datas
}
self.records.append(data)
def startTestRun(self):
self.start_at = time.time()
def startTest(self, test):
""" add start test time """
super(HtmlTestResult, self).startTest(test)
logger.color_print(test.shortDescription(), "yellow")
def addSuccess(self, test):
super(HtmlTestResult, self).addSuccess(test)
self._record_test(test, 'success')
print("")
def addError(self, test, err):
super(HtmlTestResult, self).addError(test, err)
self._record_test(test, 'error', self._exc_info_to_string(err, test))
print("")
def addFailure(self, test, err):
super(HtmlTestResult, self).addFailure(test, err)
self._record_test(test, 'failure', self._exc_info_to_string(err, test))
print("")
def addSkip(self, test, reason):
super(HtmlTestResult, self).addSkip(test, reason)
self._record_test(test, 'skipped', reason)
print("")
def addExpectedFailure(self, test, err):
super(HtmlTestResult, self).addExpectedFailure(test, err)
self._record_test(test, 'ExpectedFailure', self._exc_info_to_string(err, test))
print("")
def addUnexpectedSuccess(self, test):
super(HtmlTestResult, self).addUnexpectedSuccess(test)
self._record_test(test, 'UnexpectedSuccess')
print("")
@property
def duration(self):
return time.time() - self.start_at

View File

@@ -232,14 +232,10 @@
<tr>
<th>{{key}}</th>
<td>
{% if key == "headers" %}
{% for header_key, header_value in req_resp.request.headers.items() %}
<div>
<strong>{{ header_key }}</strong>: {{ header_value }}
</div>
{% endfor %}
{% if key in ["headers", "body"] %}
<pre>{{ value | e }}</pre>
{% else %}
{{value}}
{{value}}
{% endif %}
</td>
</tr>
@@ -254,20 +250,14 @@
<tr>
<th>{{key}}</th>
<td>
{% if key == "headers" %}
{% for header_key, header_value in req_resp.response.headers.items() %}
<div>
<strong>{{ header_key }}</strong>: {{ header_value }}
</div>
{% endfor %}
{% elif key == "content" %}
{% if key == "headers" %}
<pre>{{ value | e }}</pre>
{% elif key == "body" %}
{% if "image" in req_resp.response.content_type %}
<img src="{{ req_resp.response.content }}" />
{% else %}
{{ value }}
{% endif %}
{% elif key in ["text", "json"] %}
<pre>{{ value | e }}</pre>
{% endif %}
{% else %}
{{ value }}
{% endif %}

View File

@@ -0,0 +1,216 @@
from base64 import b64encode
from collections import Iterable
from jinja2 import escape
from requests.cookies import RequestsCookieJar
from httprunner.compat import basestring, bytes, json, numeric_types, JSONDecodeError
def dumps_json(value):
""" dumps json value to indented string
Args:
value (dict): raw json data
Returns:
str: indented json dump string
"""
return json.dumps(value, indent=2, ensure_ascii=False)
def detect_encoding(value):
try:
return json.detect_encoding(value)
except AttributeError:
return "utf-8"
def __stringify_request(request_data):
""" stringfy HTTP request data
Args:
request_data (dict): HTTP request data in dict.
{
"url": "http://127.0.0.1:5000/api/get-token",
"method": "POST",
"headers": {
"User-Agent": "python-requests/2.20.0",
"Accept-Encoding": "gzip, deflate",
"Accept": "*/*",
"Connection": "keep-alive",
"user_agent": "iOS/10.3",
"device_sn": "TESTCASE_CREATE_XXX",
"os_platform": "ios",
"app_version": "2.8.6",
"Content-Type": "application/json",
"Content-Length": "52"
},
"body": b'{"sign": "cb9d60acd09080ea66c8e63a1c78c6459ea00168"}',
"verify": false
}
"""
for key, value in request_data.items():
if isinstance(value, (list, dict)):
value = dumps_json(value)
elif isinstance(value, bytes):
try:
encoding = detect_encoding(value)
value = value.decode(encoding)
if key == "body":
try:
# request body is in json format
value = json.loads(value)
value = dumps_json(value)
except JSONDecodeError:
pass
value = escape(value)
except UnicodeDecodeError:
pass
elif not isinstance(value, (basestring, numeric_types, Iterable)):
# class instance, e.g. MultipartEncoder()
value = repr(value)
elif isinstance(value, RequestsCookieJar):
value = value.get_dict()
request_data[key] = value
def __stringify_response(response_data):
""" stringfy HTTP response data
Args:
response_data (dict):
{
"status_code": 404,
"headers": {
"Content-Type": "application/json",
"Content-Length": "30",
"Server": "Werkzeug/0.14.1 Python/3.7.0",
"Date": "Tue, 27 Nov 2018 06:19:27 GMT"
},
"encoding": "None",
"content_type": "application/json",
"ok": false,
"url": "http://127.0.0.1:5000/api/users/9001",
"reason": "NOT FOUND",
"cookies": {},
"body": {
"success": false,
"data": {}
}
}
"""
for key, value in response_data.items():
if isinstance(value, (list, dict)):
value = dumps_json(value)
elif isinstance(value, bytes):
try:
encoding = response_data.get("encoding")
if not encoding or encoding == "None":
encoding = detect_encoding(value)
if key == "body" and "image" in response_data["content_type"]:
# display image
value = "data:{};base64,{}".format(
response_data["content_type"],
b64encode(value).decode(encoding)
)
else:
value = escape(value.decode(encoding))
except UnicodeDecodeError:
pass
elif not isinstance(value, (basestring, numeric_types, Iterable)):
# class instance, e.g. MultipartEncoder()
value = repr(value)
elif isinstance(value, RequestsCookieJar):
value = value.get_dict()
response_data[key] = value
def __expand_meta_datas(meta_datas, meta_datas_expanded):
""" expand meta_datas to one level
Args:
meta_datas (dict/list): maybe in nested format
Returns:
list: expanded list in one level
Examples:
>>> meta_datas = [
[
dict1,
dict2
],
dict3
]
>>> meta_datas_expanded = []
>>> __expand_meta_datas(meta_datas, meta_datas_expanded)
>>> print(meta_datas_expanded)
[dict1, dict2, dict3]
"""
if isinstance(meta_datas, dict):
meta_datas_expanded.append(meta_datas)
elif isinstance(meta_datas, list):
for meta_data in meta_datas:
__expand_meta_datas(meta_data, meta_datas_expanded)
def __get_total_response_time(meta_datas_expanded):
""" caculate total response time of all meta_datas
"""
try:
response_time = 0
for meta_data in meta_datas_expanded:
response_time += meta_data["stat"]["response_time_ms"]
return "{:.2f}".format(response_time)
except TypeError:
# failure exists
return "N/A"
def __stringify_meta_datas(meta_datas):
if isinstance(meta_datas, list):
for _meta_data in meta_datas:
__stringify_meta_datas(_meta_data)
elif isinstance(meta_datas, dict):
data_list = meta_datas["data"]
for data in data_list:
__stringify_request(data["request"])
__stringify_response(data["response"])
def stringify_summary(summary):
""" stringify summary, in order to dump json file and generate html report.
"""
for index, suite_summary in enumerate(summary["details"]):
if not suite_summary.get("name"):
suite_summary["name"] = "testcase {}".format(index)
for record in suite_summary.get("records"):
meta_datas = record['meta_datas']
__stringify_meta_datas(meta_datas)
meta_datas_expanded = []
__expand_meta_datas(meta_datas, meta_datas_expanded)
record["meta_datas_expanded"] = meta_datas_expanded
record["response_time"] = __get_total_response_time(meta_datas_expanded)

View File

@@ -0,0 +1,82 @@
import platform
from httprunner import __version__
def get_platform():
return {
"httprunner_version": __version__,
"python_version": "{} {}".format(
platform.python_implementation(),
platform.python_version()
),
"platform": platform.platform()
}
def aggregate_stat(origin_stat, new_stat):
""" aggregate new_stat to origin_stat.
Args:
origin_stat (dict): origin stat dict, will be updated with new_stat dict.
new_stat (dict): new stat dict.
"""
for key in new_stat:
if key not in origin_stat:
origin_stat[key] = new_stat[key]
elif key == "start_at":
# start datetime
origin_stat["start_at"] = min(origin_stat["start_at"], new_stat["start_at"])
elif key == "duration":
# duration = max_end_time - min_start_time
max_end_time = max(origin_stat["start_at"] + origin_stat["duration"],
new_stat["start_at"] + new_stat["duration"])
min_start_time = min(origin_stat["start_at"], new_stat["start_at"])
origin_stat["duration"] = max_end_time - min_start_time
else:
origin_stat[key] += new_stat[key]
def get_summary(result):
""" get summary from test result
Args:
result (instance): HtmlTestResult() instance
Returns:
dict: summary extracted from result.
{
"success": True,
"stat": {},
"time": {},
"records": []
}
"""
summary = {
"success": result.wasSuccessful(),
"stat": {
'total': result.testsRun,
'failures': len(result.failures),
'errors': len(result.errors),
'skipped': len(result.skipped),
'expectedFailures': len(result.expectedFailures),
'unexpectedSuccesses': len(result.unexpectedSuccesses)
}
}
summary["stat"]["successes"] = summary["stat"]["total"] \
- summary["stat"]["failures"] \
- summary["stat"]["errors"] \
- summary["stat"]["skipped"] \
- summary["stat"]["expectedFailures"] \
- summary["stat"]["unexpectedSuccesses"]
summary["time"] = {
'start_at': result.start_at,
'duration': result.duration
}
summary["records"] = result.records
return summary

View File

@@ -175,7 +175,7 @@ class ResponseObject(object):
raise exceptions.ExtractFailure(err_msg)
# response body
elif top_query in ["content", "text", "json"]:
elif top_query in ["body", "content", "text", "json"]:
try:
body = self.json
except exceptions.JSONDecodeError:
@@ -222,7 +222,8 @@ class ResponseObject(object):
# others
else:
err_msg = u"Failed to extract attribute from response! => {}\n".format(field)
err_msg += u"available response attributes: status_code, cookies, elapsed, headers, content, text, json, encoding, ok, reason, url.\n\n"
err_msg += u"available response attributes: status_code, cookies, elapsed, headers, content, " \
u"text, json, encoding, ok, reason, url.\n\n"
err_msg += u"If you want to set attribute in teardown_hooks, take the following example as reference:\n"
err_msg += u"response.new_attribute = 'new_attribute_value'\n"
logger.log_error(err_msg)

View File

@@ -1,5 +1,6 @@
# encoding: utf-8
from enum import Enum
from unittest.case import SkipTest
from httprunner import exceptions, logger, response, utils
@@ -8,6 +9,11 @@ from httprunner.context import SessionContext
from httprunner.validator import Validator
class HookTypeEnum(Enum):
SETUP = 1
TEARDOWN = 2
class Runner(object):
""" Running testcases.
@@ -74,11 +80,11 @@ class Runner(object):
self.session_context = SessionContext(config_variables)
if testcase_setup_hooks:
self.do_hook_actions(testcase_setup_hooks, "setup")
self.do_hook_actions(testcase_setup_hooks, HookTypeEnum.SETUP)
def __del__(self):
if self.testcase_teardown_hooks:
self.do_hook_actions(self.testcase_teardown_hooks, "teardown")
self.do_hook_actions(self.testcase_teardown_hooks, HookTypeEnum.TEARDOWN)
def __clear_test_data(self):
""" clear request and response data
@@ -131,10 +137,10 @@ class Runner(object):
format2 (str): only call hook functions.
${func()}
hook_type (enum): setup/teardown
hook_type (HookTypeEnum): setup/teardown
"""
logger.log_debug("call {} hook actions.".format(hook_type))
logger.log_debug("call {} hook actions.".format(hook_type.name))
for action in actions:
if isinstance(action, dict) and len(action) == 1:
@@ -215,7 +221,7 @@ class Runner(object):
# setup hooks
setup_hooks = test_dict.get("setup_hooks", [])
if setup_hooks:
self.do_hook_actions(setup_hooks, "setup")
self.do_hook_actions(setup_hooks, HookTypeEnum.SETUP)
try:
method = parsed_test_request.pop('method')
@@ -245,32 +251,7 @@ class Runner(object):
)
resp_obj = response.ResponseObject(resp)
# teardown hooks
teardown_hooks = test_dict.get("teardown_hooks", [])
if teardown_hooks:
self.session_context.update_test_variables("response", resp_obj)
self.do_hook_actions(teardown_hooks, "teardown")
self.http_client_session.update_last_req_resp_record(resp_obj)
# extract
extractors = test_dict.get("extract", {})
extracted_variables_mapping = resp_obj.extract_response(extractors)
self.session_context.update_session_variables(extracted_variables_mapping)
# validate
validators = test_dict.get("validate") or test_dict.get("validators") or []
validate_script = test_dict.get("validate_script", [])
if validate_script:
validators.append({
"type": "python_script",
"script": validate_script
})
validator = Validator(self.session_context, resp_obj)
try:
validator.validate(validators)
except (exceptions.ParamsError,
exceptions.ValidationFailure, exceptions.ExtractFailure):
def log_req_resp_details():
err_msg = "{} DETAILED REQUEST & RESPONSE {}\n".format("*" * 32, "*" * 32)
# log request
@@ -291,12 +272,39 @@ class Runner(object):
err_msg += "body: {}\n".format(repr(resp_obj.text))
logger.log_error(err_msg)
# teardown hooks
teardown_hooks = test_dict.get("teardown_hooks", [])
if teardown_hooks:
self.session_context.update_test_variables("response", resp_obj)
self.do_hook_actions(teardown_hooks, HookTypeEnum.TEARDOWN)
self.http_client_session.update_last_req_resp_record(resp_obj)
# extract
extractors = test_dict.get("extract", {})
try:
extracted_variables_mapping = resp_obj.extract_response(extractors)
self.session_context.update_session_variables(extracted_variables_mapping)
except (exceptions.ParamsError, exceptions.ExtractFailure):
log_req_resp_details()
raise
finally:
# get request/response data and validate results
self.meta_datas = getattr(self.http_client_session, "meta_data", {})
self.meta_datas["validators"] = validator.validation_results
# validate
validators = test_dict.get("validate") or test_dict.get("validators") or []
validate_script = test_dict.get("validate_script", [])
if validate_script:
validators.append({
"type": "python_script",
"script": validate_script
})
validator = Validator(self.session_context, resp_obj)
try:
validator.validate(validators)
except exceptions.ValidationFailure:
log_req_resp_details()
raise
return validator.validation_results
def _run_testcase(self, testcase_dict):
""" run single testcase.
@@ -374,13 +382,18 @@ class Runner(object):
self._run_testcase(test_dict)
else:
# api
validation_results = {}
try:
self._run_test(test_dict)
validation_results = self._run_test(test_dict)
except Exception:
# log exception request_type and name for locust stat
self.exception_request_type = test_dict["request"]["method"]
self.exception_name = test_dict.get("name")
raise
finally:
# get request/response data and validate results
self.meta_datas = getattr(self.http_client_session, "meta_data", {})
self.meta_datas["validators"] = validation_results
def export_variables(self, output_variables_list):
""" export current testcase variables

View File

@@ -571,6 +571,10 @@ def dump_json_file(json_data, json_file_abs_path):
except TypeError:
return str(obj)
file_foder_path = os.path.dirname(json_file_abs_path)
if not os.path.isdir(file_foder_path):
os.makedirs(file_foder_path)
try:
with io.open(json_file_abs_path, 'w', encoding='utf-8') as outfile:
if is_py2:
@@ -579,6 +583,7 @@ def dump_json_file(json_data, json_file_abs_path):
json_data,
indent=4,
separators=(',', ':'),
encoding="utf8",
ensure_ascii=False,
cls=PythonObjectEncoder
))
@@ -626,9 +631,6 @@ def prepare_dump_json_file_abs_path(project_mapping, tag_name):
test_file_name, _file_suffix = os.path.splitext(test_file)
dump_file_name = "{}.{}.json".format(test_file_name, tag_name)
if not os.path.isdir(file_foder_path):
os.makedirs(file_foder_path)
dumped_json_file_abs_path = os.path.join(file_foder_path, dump_file_name)
return dumped_json_file_abs_path