rename package name from ate to httprunner

This commit is contained in:
debugtalk
2017-11-07 11:01:23 +08:00
parent ec3e147a43
commit eec69275f3
29 changed files with 53 additions and 51 deletions

1
httprunner/__init__.py Normal file
View File

@@ -0,0 +1 @@
__version__ = '0.8.1a'

35
httprunner/built_in.py Normal file
View File

@@ -0,0 +1,35 @@
"""
Built-in dependent functions used in YAML/JSON testcases.
"""
import datetime
import random
import string
import time
from httprunner.exception import ParamsError
def gen_random_string(str_len):
""" generate random string with specified length
"""
return ''.join(
random.choice(string.ascii_letters + string.digits) for _ in range(str_len))
def get_timestamp(str_len=13):
""" get timestamp string, length can only between 0 and 16
"""
if isinstance(str_len, int) and 0 < str_len < 17:
return str(time.time()).replace(".", "")[:str_len]
raise ParamsError("timestamp length can only between 0 and 16.")
def get_current_date(fmt="%Y-%m-%d"):
""" get current date, default format is %Y-%m-%d
"""
return datetime.datetime.now().strftime(fmt)
def sleep(sec):
""" sleep specified seconds
"""
time.sleep(sec)

127
httprunner/cli.py Normal file
View File

@@ -0,0 +1,127 @@
import argparse
import logging
import os
import sys
from collections import OrderedDict
import PyUnitReport
from PyUnitReport import __version__ as pyu_version
from httprunner import __version__ as ate_version
from httprunner import exception
from httprunner.task import TaskSuite
from httprunner.utils import create_scaffold
def main_ate():
""" API test: parse command line options and run commands.
"""
parser = argparse.ArgumentParser(
description='Api Test Engine.')
parser.add_argument(
'-V', '--version', dest='version', action='store_true',
help="show version")
parser.add_argument(
'testset_paths', nargs='*',
help="testset file path")
parser.add_argument(
'--log-level', default='INFO',
help="Specify logging level, default is INFO.")
parser.add_argument(
'--report-name',
help="Specify report name, default is generated time.")
parser.add_argument(
'--failfast', action='store_true', default=False,
help="Stop the test run on the first error or failure.")
parser.add_argument(
'--startproject',
help="Specify new project name.")
args = parser.parse_args()
if args.version:
print("HttpRunner version: {}".format(ate_version))
print("PyUnitReport version: {}".format(pyu_version))
exit(0)
log_level = getattr(logging, args.log_level.upper())
logging.basicConfig(level=log_level)
project_name = args.startproject
if project_name:
project_path = os.path.join(os.getcwd(), project_name)
create_scaffold(project_path)
exit(0)
report_name = args.report_name
if report_name and len(args.testset_paths) > 1:
report_name = None
logging.warning("More than one testset paths specified, \
report name is ignored, use generated time instead.")
results = {}
success = True
for testset_path in set(args.testset_paths):
testset_path = testset_path.rstrip('/')
try:
task_suite = TaskSuite(testset_path)
except exception.TestcaseNotFound:
success = False
continue
output_folder_name = os.path.basename(os.path.splitext(testset_path)[0])
kwargs = {
"output": output_folder_name,
"report_name": report_name,
"failfast": args.failfast
}
result = PyUnitReport.HTMLTestRunner(**kwargs).run(task_suite)
results[testset_path] = OrderedDict({
"total": result.testsRun,
"successes": len(result.successes),
"failures": len(result.failures),
"errors": len(result.errors),
"skipped": len(result.skipped)
})
if len(result.successes) != result.testsRun:
success = False
for task in task_suite.tasks:
task.print_output()
return 0 if success is True else 1
def main_locust():
""" Performance test with locust: parse command line options and run commands.
"""
try:
from httprunner import locusts
except ImportError:
print("Locust is not installed, exit.")
exit(1)
sys.argv[0] = 'locust'
if len(sys.argv) == 1:
sys.argv.extend(["-h"])
if sys.argv[1] in ["-h", "--help", "-V", "--version"]:
locusts.main()
sys.exit(0)
try:
testcase_index = sys.argv.index('-f') + 1
assert testcase_index < len(sys.argv)
except (ValueError, AssertionError):
print("Testcase file is not specified, exit.")
sys.exit(1)
testcase_file_path = sys.argv[testcase_index]
sys.argv[testcase_index] = locusts.parse_locustfile(testcase_file_path)
if "--full-speed" in sys.argv:
locusts.run_locusts_at_full_speed(sys.argv)
else:
locusts.main()

164
httprunner/client.py Normal file
View File

@@ -0,0 +1,164 @@
import json
import logging
import re
import time
import requests
from httprunner.exception import ParamsError
from requests import Request, Response
from requests.exceptions import (InvalidSchema, InvalidURL, MissingSchema,
RequestException)
absolute_http_url_regexp = re.compile(r"^https?://", re.I)
def prepare_kwargs(method, kwargs):
if method == "POST":
# if request content-type is application/json, request data should be dumped
content_type = kwargs.get("headers", {}).get("content-type", "")
if content_type.startswith("application/json") and "data" in kwargs:
kwargs["data"] = json.dumps(kwargs["data"])
class ApiResponse(Response):
def raise_for_status(self):
if hasattr(self, 'error') and self.error:
raise self.error
Response.raise_for_status(self)
class HttpSession(requests.Session):
"""
Class for performing HTTP requests and holding (session-) cookies between requests (in order
to be able to log in and out of websites). Each request is logged so that HttpRunner can
display statistics.
This is a slightly extended version of `python-request <http://python-requests.org>`_'s
:py:class:`requests.Session` class and mostly this class works exactly the same. However
the methods for making requests (get, post, delete, put, head, options, patch, request)
can now take a *url* argument that's only the path part of the URL, in which case the host
part of the URL will be prepended with the HttpSession.base_url which is normally inherited
from a HttpRunner class' host property.
"""
def __init__(self, base_url=None, *args, **kwargs):
super(HttpSession, self).__init__(*args, **kwargs)
self.base_url = base_url if base_url else ""
def _build_url(self, path):
""" prepend url with hostname unless it's already an absolute URL """
if absolute_http_url_regexp.match(path):
return path
elif self.base_url:
return "%s%s" % (self.base_url, path)
else:
raise ParamsError("base url missed!")
def request(self, method, url, name=None, **kwargs):
"""
Constructs and sends a :py:class:`requests.Request`.
Returns :py:class:`requests.Response` object.
:param method:
method for the new :class:`Request` object.
:param url:
URL for the new :class:`Request` object.
:param name: (optional)
Placeholder, make compatible with Locust's HttpSession
:param params: (optional)
Dictionary or bytes to be sent in the query string for the :class:`Request`.
:param data: (optional)
Dictionary or bytes to send in the body of the :class:`Request`.
:param headers: (optional)
Dictionary of HTTP Headers to send with the :class:`Request`.
:param cookies: (optional)
Dict or CookieJar object to send with the :class:`Request`.
:param files: (optional)
Dictionary of ``'filename': file-like-objects`` for multipart encoding upload.
:param auth: (optional)
Auth tuple or callable to enable Basic/Digest/Custom HTTP Auth.
:param timeout: (optional)
How long to wait for the server to send data before giving up, as a float, or \
a (`connect timeout, read timeout <user/advanced.html#timeouts>`_) tuple.
:type timeout: float or tuple
:param allow_redirects: (optional)
Set to True by default.
:type allow_redirects: bool
:param proxies: (optional)
Dictionary mapping protocol to the URL of the proxy.
:param stream: (optional)
whether to immediately download the response content. Defaults to ``False``.
:param verify: (optional)
if ``True``, the SSL cert will be verified. A CA_BUNDLE path can also be provided.
:param cert: (optional)
if String, path to ssl client cert file (.pem). If Tuple, ('cert', 'key') pair.
"""
# prepend url with hostname unless it's already an absolute URL
url = self._build_url(url)
logging.info(" Start to {method} {url}".format(method=method, url=url))
logging.debug(" kwargs: {kwargs}".format(kwargs=kwargs))
# store meta data that is used when reporting the request to locust's statistics
request_meta = {}
# set up pre_request hook for attaching meta data to the request object
request_meta["method"] = method
request_meta["start_time"] = time.time()
if "httpntlmauth" in kwargs:
from requests_ntlm import HttpNtlmAuth
auth_account = kwargs.pop("httpntlmauth")
kwargs["auth"] = HttpNtlmAuth(
auth_account["username"], auth_account["password"])
response = self._send_request_safe_mode(method, url, **kwargs)
request_meta["url"] = (response.history and response.history[0] or response)\
.request.path_url
# record the consumed time
request_meta["response_time"] = int((time.time() - request_meta["start_time"]) * 1000)
# get the length of the content, but if the argument stream is set to True, we take
# the size from the content-length header, in order to not trigger fetching of the body
if kwargs.get("stream", False):
request_meta["content_size"] = int(response.headers.get("content-length") or 0)
else:
request_meta["content_size"] = len(response.content or "")
request_meta["request_headers"] = response.request.headers
request_meta["request_body"] = response.request.body
request_meta["status_code"] = response.status_code
request_meta["response_headers"] = response.headers
request_meta["response_content"] = response.content
logging.debug(" response: {response}".format(response=request_meta))
try:
response.raise_for_status()
except RequestException as e:
logging.error(u" Failed to {method} {url}! exception msg: {exception}".format(
method=method, url=url, exception=str(e)))
else:
logging.info(
""" status_code: {}, response_time: {} ms, response_length: {} bytes"""\
.format(request_meta["status_code"], request_meta["response_time"], \
request_meta["content_size"]))
return response
def _send_request_safe_mode(self, method, url, **kwargs):
"""
Send a HTTP request, and catch any exception that might occur due to connection problems.
Safe mode has been removed from requests 1.x.
"""
try:
prepare_kwargs(method, kwargs)
return requests.Session.request(self, method, url, **kwargs)
except (MissingSchema, InvalidSchema, InvalidURL):
raise
except RequestException as ex:
resp = ApiResponse()
resp.error = ex
resp.status_code = 0 # with this status_code, content returns None
resp.request = Request(method, url).prepare()
return resp

171
httprunner/context.py Normal file
View File

@@ -0,0 +1,171 @@
import copy
import os
import re
import sys
from collections import OrderedDict
from httprunner import utils
from httprunner.testcase import TestcaseParser
class Context(object):
""" Manages context functions and variables.
context has two levels, testset and testcase.
"""
def __init__(self):
self.testset_shared_variables_mapping = OrderedDict()
self.testcase_variables_mapping = OrderedDict()
self.testcase_parser = TestcaseParser()
self.init_context()
def init_context(self, level='testset'):
"""
testset level context initializes when a file is loaded,
testcase level context initializes when each testcase starts.
"""
if level == "testset":
self.testset_functions_config = {}
self.testset_request_config = {}
self.testset_shared_variables_mapping = OrderedDict()
# testcase config shall inherit from testset configs,
# but can not change testset configs, that's why we use copy.deepcopy here.
self.testcase_functions_config = copy.deepcopy(self.testset_functions_config)
self.testcase_variables_mapping = copy.deepcopy(self.testset_shared_variables_mapping)
self.testcase_parser.bind_functions(self.testcase_functions_config)
self.testcase_parser.update_binded_variables(self.testcase_variables_mapping)
if level == "testset":
self.import_module_items(["httprunner.built_in"], "testset")
def config_context(self, config_dict, level):
if level == "testset":
self.testcase_parser.file_path = config_dict.get("path", None)
requires = config_dict.get('requires', [])
self.import_requires(requires)
function_binds = config_dict.get('function_binds', {})
self.bind_functions(function_binds, level)
# import_module_functions will be deprecated soon
module_items = config_dict.get('import_module_items', []) \
or config_dict.get('import_module_functions', [])
self.import_module_items(module_items, level)
variables = config_dict.get('variables') \
or config_dict.get('variable_binds', OrderedDict())
self.bind_variables(variables, level)
def import_requires(self, modules):
""" import required modules dynamically
"""
for module_name in modules:
globals()[module_name] = utils.get_imported_module(module_name)
def bind_functions(self, function_binds, level="testcase"):
""" Bind named functions within the context
This allows for passing in self-defined functions in testing.
e.g. function_binds:
{
"add_one": lambda x: x + 1, # lambda function
"add_two_nums": "lambda x, y: x + y" # lambda function in string
}
"""
eval_function_binds = {}
for func_name, function in function_binds.items():
if isinstance(function, str):
function = eval(function)
eval_function_binds[func_name] = function
self.__update_context_functions_config(level, eval_function_binds)
def import_module_items(self, modules, level="testcase"):
""" import modules and bind all functions within the context
"""
sys.path.insert(0, os.getcwd())
for module_name in modules:
imported_module = utils.get_imported_module(module_name)
imported_functions_dict = utils.filter_module(imported_module, "function")
self.__update_context_functions_config(level, imported_functions_dict)
imported_variables_dict = utils.filter_module(imported_module, "variable")
self.bind_variables(imported_variables_dict, level)
def bind_variables(self, variables, level="testcase"):
""" bind variables to testset context or current testcase context.
variables in testset context can be used in all testcases of current test suite.
@param (list or OrderDict) variables, variable can be value or custom function.
if value is function, it will be called and bind result to variable.
e.g.
OrderDict({
"TOKEN": "debugtalk",
"random": "${gen_random_string(5)}",
"json": {'name': 'user', 'password': '123456'},
"md5": "${gen_md5($TOKEN, $json, $random)}"
})
"""
if isinstance(variables, list):
variables = utils.convert_to_order_dict(variables)
for variable_name, value in variables.items():
variable_evale_value = self.testcase_parser.parse_content_with_bindings(value)
if level == "testset":
self.testset_shared_variables_mapping[variable_name] = variable_evale_value
self.testcase_variables_mapping[variable_name] = variable_evale_value
self.testcase_parser.update_binded_variables(self.testcase_variables_mapping)
def bind_extracted_variables(self, variables):
""" bind extracted variables to testset context
@param (OrderDict) variables
extracted value do not need to evaluate.
"""
for variable_name, value in variables.items():
self.testset_shared_variables_mapping[variable_name] = value
self.testcase_variables_mapping[variable_name] = value
self.testcase_parser.update_binded_variables(self.testcase_variables_mapping)
def __update_context_functions_config(self, level, config_mapping):
"""
@param level: testset or testcase
@param config_type: functions
@param config_mapping: functions config mapping
"""
if level == "testset":
self.testset_functions_config.update(config_mapping)
self.testcase_functions_config.update(config_mapping)
self.testcase_parser.bind_functions(self.testcase_functions_config)
def get_parsed_request(self, request_dict, level="testcase"):
""" get parsed request with bind variables and functions.
@param request_dict: request config mapping
@param level: testset or testcase
"""
if level == "testset":
request_dict = self.testcase_parser.parse_content_with_bindings(
request_dict
)
self.testset_request_config.update(request_dict)
testcase_request_config = utils.deep_update_dict(
copy.deepcopy(self.testset_request_config),
request_dict
)
parsed_request = self.testcase_parser.parse_content_with_bindings(
testcase_request_config
)
return parsed_request
def get_testcase_variables_mapping(self):
return self.testcase_variables_mapping
def exec_content_functions(self, content):
""" execute functions in content.
"""
self.testcase_parser.eval_content_functions(content)

48
httprunner/exception.py Normal file
View File

@@ -0,0 +1,48 @@
#coding: utf-8
import json
try:
FileNotFoundError = FileNotFoundError
except NameError:
FileNotFoundError = IOError
try:
JSONDecodeError = json.decoder.JSONDecodeError
except AttributeError:
JSONDecodeError = ValueError
class MyBaseError(BaseException):
pass
class FileFormatError(MyBaseError):
pass
class ParamsError(MyBaseError):
pass
class ResponseError(MyBaseError):
pass
class ParseResponseError(MyBaseError):
pass
class ValidationError(MyBaseError):
pass
class NotFoundError(MyBaseError):
pass
class FunctionNotFound(NotFoundError):
pass
class VariableNotFound(NotFoundError):
pass
class ApiNotFound(NotFoundError):
pass
class SuiteNotFound(NotFoundError):
pass
class TestcaseNotFound(NotFoundError):
pass

View File

@@ -0,0 +1,21 @@
#coding: utf-8
import zmq
from locust import HttpLocust, TaskSet, task
from locust.events import request_failure
from httprunner import runner
class WebPageTasks(TaskSet):
def on_start(self):
self.test_runner = runner.Runner(self.client, request_failure)
@task
def test_specified_scenario(self):
self.test_runner.run(self.locust.file_path)
class WebPageUser(HttpLocust):
host = "$HOST"
task_set = WebPageTasks
min_wait = 1000
max_wait = 5000
file_path = "$TESTCASE_FILE"

75
httprunner/locusts.py Normal file
View File

@@ -0,0 +1,75 @@
import codecs
import multiprocessing
import os
import sys
from httprunner.testcase import load_test_file
from locust.main import main
def parse_locustfile(file_path):
""" parse testcase file and return locustfile path.
if file_path is a Python file, assume it is a locustfile
if file_path is a YAML/JSON file, convert it to locustfile
"""
if not os.path.isfile(file_path):
print("file path invalid, exit.")
sys.exit(1)
file_suffix = os.path.splitext(file_path)[1]
if file_suffix == ".py":
locustfile_path = file_path
elif file_suffix in ['.yaml', '.yml', '.json']:
locustfile_path = gen_locustfile(file_path)
else:
# '' or other suffix
print("file type should be YAML/JSON/Python, exit.")
sys.exit(1)
return locustfile_path
def gen_locustfile(testcase_file_path):
""" generate locustfile from template.
"""
locustfile_path = 'locustfile.py'
template_path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
'locustfile_template'
)
testset = load_test_file(testcase_file_path)
host = testset.get("config", {}).get("request", {}).get("base_url", "")
with codecs.open(template_path, encoding='utf-8') as template:
with codecs.open(locustfile_path, 'w', encoding='utf-8') as locustfile:
template_content = template.read()
template_content = template_content.replace("$HOST", host)
template_content = template_content.replace("$TESTCASE_FILE", testcase_file_path)
locustfile.write(template_content)
return locustfile_path
def start_master(sys_argv):
sys_argv.append("--master")
sys.argv = sys_argv
main()
def start_slave(sys_argv):
sys_argv.extend(["--slave"])
sys.argv = sys_argv
main()
def run_locusts_at_full_speed(sys_argv):
sys_argv.pop(sys_argv.index("--full-speed"))
slaves_num = multiprocessing.cpu_count()
processes = []
for _ in range(slaves_num):
p_slave = multiprocessing.Process(target=start_slave, args=(sys_argv,))
p_slave.daemon = True
p_slave.start()
processes.append(p_slave)
try:
start_master(sys_argv)
except KeyboardInterrupt:
sys.exit(0)

173
httprunner/response.py Normal file
View File

@@ -0,0 +1,173 @@
import logging
import re
from collections import OrderedDict
from httprunner import exception, utils
from requests.structures import CaseInsensitiveDict
text_extractor_regexp_compile = re.compile(r".*\(.*\).*")
class ResponseObject(object):
def __init__(self, resp_obj):
""" initialize with a requests.Response object
@param (requests.Response instance) resp_obj
"""
self.resp_obj = resp_obj
self.resp_text = resp_obj.text
self.resp_body = self.parsed_body()
def parsed_body(self):
try:
return self.resp_obj.json()
except ValueError:
return self.resp_text
def parsed_dict(self):
return {
'status_code': self.resp_obj.status_code,
'headers': self.resp_obj.headers,
'body': self.resp_body
}
def _extract_field_with_regex(self, field):
""" extract field from response content with regex.
requests.Response body could be json or html text.
@param (str) field should only be regex string that matched r".*\(.*\).*"
e.g.
self.resp_text: "LB123abcRB789"
field: "LB[\d]*(.*)RB[\d]*"
return: abc
"""
matched = re.search(field, self.resp_text)
if not matched:
err_msg = u"Extractor error: failed to extract data with regex!\n"
err_msg += u"response body: {}\n".format(self.resp_text)
err_msg += u"regex: {}\n".format(field)
logging.error(err_msg)
raise exception.ParamsError(err_msg)
return matched.group(1)
def _extract_field_with_delimiter(self, field):
""" response content could be json or html text.
@param (str) field should be string joined by delimiter.
e.g.
"status_code"
"content"
"headers.content-type"
"content.person.name.first_name"
"""
try:
# string.split(sep=None, maxsplit=-1) -> list of strings
# e.g. "content.person.name" => ["content", "person.name"]
try:
top_query, sub_query = field.split('.', 1)
except ValueError:
top_query = field
sub_query = None
if top_query in ["body", "content", "text"]:
top_query_content = self.parsed_body()
else:
top_query_content = getattr(self.resp_obj, top_query)
if sub_query:
if not isinstance(top_query_content, (dict, CaseInsensitiveDict, list)):
err_msg = u"Extractor error: failed to extract data with regex!\n"
err_msg += u"response: {}\n".format(self.parsed_dict())
err_msg += u"regex: {}\n".format(field)
logging.error(err_msg)
raise exception.ParamsError(err_msg)
# e.g. key: resp_headers_content_type, sub_query = "content-type"
return utils.query_json(top_query_content, sub_query)
else:
# e.g. key: resp_status_code, resp_content
return top_query_content
except AttributeError:
err_msg = u"Failed to extract value from response!\n"
err_msg += u"response: {}\n".format(self.parsed_dict())
err_msg += u"extract field: {}\n".format(field)
logging.error(err_msg)
raise exception.ParamsError(err_msg)
def extract_field(self, field):
""" extract value from requests.Response.
"""
if text_extractor_regexp_compile.match(field):
return self._extract_field_with_regex(field)
else:
return self._extract_field_with_delimiter(field)
def extract_response(self, extractors):
""" extract value from requests.Response and store in OrderedDict.
@param (list) extractors
[
{"resp_status_code": "status_code"},
{"resp_headers_content_type": "headers.content-type"},
{"resp_content": "content"},
{"resp_content_person_first_name": "content.person.name.first_name"}
]
@return (OrderDict) variable binds ordered dict
"""
extracted_variables_mapping = OrderedDict()
extract_binds_order_dict = utils.convert_to_order_dict(extractors)
for key, field in extract_binds_order_dict.items():
if not isinstance(field, utils.string_type):
raise exception.ParamsError("invalid extractors in testcase!")
extracted_variables_mapping[key] = self.extract_field(field)
return extracted_variables_mapping
def validate(self, validators, variables_mapping):
""" Bind named validators to value within the context.
@param (list) validators
[
{"check": "status_code", "comparator": "eq", "expected": 201},
{"check": "resp_body_success", "comparator": "eq", "expected": True}
]
@param (dict) variables_mapping
{
"resp_body_success": True
}
@return (list) content differences
[
{
"check": "status_code",
"comparator": "eq", "expected": 201, "value": 200
}
]
"""
for validator_dict in validators:
check_item = validator_dict.get("check")
if not check_item:
raise exception.ParamsError("invalid check item in testcase validators!")
if "expected" not in validator_dict:
raise exception.ParamsError("expected item missed in testcase validators!")
expected = validator_dict.get("expected")
comparator = validator_dict.get("comparator", "eq")
if check_item in variables_mapping:
validator_dict["actual_value"] = variables_mapping[check_item]
else:
try:
validator_dict["actual_value"] = self.extract_field(check_item)
except exception.ParseResponseError:
raise exception.ParseResponseError("failed to extract check item in response!")
utils.match_expected(
validator_dict["actual_value"],
expected,
comparator,
check_item
)
return True

246
httprunner/runner.py Normal file
View File

@@ -0,0 +1,246 @@
import logging
from httprunner import exception, response, testcase, utils
from httprunner.client import HttpSession
from httprunner.context import Context
class Runner(object):
def __init__(self, http_client_session=None, request_failure_hook=None):
self.http_client_session = http_client_session
self.context = Context()
testcase.load_test_dependencies()
self.request_failure_hook = request_failure_hook
def init_config(self, config_dict, level):
""" create/update context variables binds
@param (dict) config_dict
@param (str) level, "testset" or "testcase"
testset:
{
"name": "smoke testset",
"path": "tests/data/demo_testset_variables.yml",
"requires": [], # optional
"function_binds": {}, # optional
"import_module_items": [], # optional
"variables": [], # optional
"request": {
"base_url": "http://127.0.0.1:5000",
"headers": {
"User-Agent": "iOS/2.8.3"
}
}
}
testcase:
{
"name": "testcase description",
"requires": [], # optional
"function_binds": {}, # optional
"import_module_items": [], # optional
"variables": [], # optional
"request": {
"url": "/api/get-token",
"method": "POST",
"headers": {
"Content-Type": "application/json"
}
},
"json": {
"sign": "f1219719911caae89ccc301679857ebfda115ca2"
}
}
@param (str) context level, testcase or testset
"""
# convert keys in request headers to lowercase
config_dict = utils.lower_config_dict_key(config_dict)
self.context.init_context(level)
self.context.config_context(config_dict, level)
request_config = config_dict.get('request', {})
parsed_request = self.context.get_parsed_request(request_config, level)
base_url = parsed_request.pop("base_url", None)
self.http_client_session = self.http_client_session or HttpSession(base_url)
return parsed_request
def _run_test(self, testcase_dict):
""" run single testcase.
@param (dict) testcase_dict
{
"name": "testcase description",
"times": 3,
"requires": [], # optional, override
"function_binds": {}, # optional, override
"variables": [], # optional, override
"request": {
"url": "http://127.0.0.1:5000/api/users/1000",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"authorization": "$authorization",
"random": "$random"
},
"body": '{"name": "user", "password": "123456"}'
},
"extract": [], # optional
"validate": [], # optional
"setup": [], # optional
"teardown": [] # optional
}
@return True or raise exception during test
"""
parsed_request = self.init_config(testcase_dict, level="testcase")
try:
url = parsed_request.pop('url')
method = parsed_request.pop('method')
group_name = parsed_request.pop("group", None)
except KeyError:
raise exception.ParamsError("URL or METHOD missed!")
run_times = int(testcase_dict.get("times", 1))
extractors = testcase_dict.get("extract") \
or testcase_dict.get("extractors") \
or testcase_dict.get("extract_binds", [])
validators = testcase_dict.get("validate") \
or testcase_dict.get("validators", [])
setup_actions = testcase_dict.get("setup", [])
teardown_actions = testcase_dict.get("teardown", [])
def setup_teardown(actions):
for action in actions:
self.context.exec_content_functions(action)
for _ in range(run_times):
setup_teardown(setup_actions)
resp = self.http_client_session.request(
method,
url,
name=group_name,
**parsed_request
)
resp_obj = response.ResponseObject(resp)
extracted_variables_mapping = resp_obj.extract_response(extractors)
self.context.bind_extracted_variables(extracted_variables_mapping)
try:
resp_obj.validate(validators, self.context.get_testcase_variables_mapping())
except (exception.ParamsError, exception.ResponseError, exception.ValidationError):
err_msg = u"Exception occured.\n"
err_msg += u"HTTP request url: {}\n".format(url)
err_msg += u"HTTP request kwargs: {}\n".format(parsed_request)
err_msg += u"HTTP response status_code: {}\n".format(resp.status_code)
err_msg += u"HTTP response content: \n{}".format(resp.text)
logging.error(err_msg)
raise
setup_teardown(teardown_actions)
return True
def _run_testset(self, testset, variables_mapping=None):
""" run single testset, including one or several testcases.
@param
(dict) testset
{
"name": "testset description",
"config": {
"name": "testset description",
"requires": [],
"function_binds": {},
"variables": [],
"request": {}
},
"testcases": [
{
"name": "testcase description",
"variables": [], # optional, override
"request": {},
"extract": {}, # optional
"validate": {} # optional
},
testcase12
]
}
(dict) variables_mapping:
passed in variables mapping, it will override variables in config block
@return (dict) test result of testset
{
"success": True,
"output": {} # variables mapping
}
"""
success = True
config_dict = testset.get("config", {})
variables = config_dict.get("variables", [])
variables_mapping = variables_mapping or {}
config_dict["variables"] = utils.override_variables_binds(variables, variables_mapping)
self.init_config(config_dict, level="testset")
testcases = testset.get("testcases", [])
for testcase_dict in testcases:
try:
self._run_test(testcase_dict)
except exception.MyBaseError as ex:
success = False
if self.request_failure_hook:
self.request_failure_hook.fire(
request_type=testcase_dict.get("request", {}).get("method"),
name=testcase_dict.get("request", {}).get("url"),
response_time=0,
exception=ex
)
break
output_variables_list = config_dict.get("output", [])
return {
"success": success,
"output": self.generate_output(output_variables_list)
}
def run(self, path, mapping=None):
""" run specified testset path or folder path.
@param
path: path could be in several type
- absolute/relative file path
- absolute/relative folder path
- list/set container with file(s) and/or folder(s)
(dict) mapping:
passed in variables mapping, it will override variables in config block
"""
success = True
mapping = mapping or {}
output = {}
testsets = testcase.load_testcases_by_path(path)
for testset in testsets:
try:
result = self._run_testset(testset, mapping)
assert result["success"]
output.update(result["output"])
except AssertionError:
success = False
return {
"success": success,
"output": output
}
def generate_output(self, output_variables_list):
""" generate and print output
"""
variables_mapping = self.context.get_testcase_variables_mapping()
output = {
variable: variables_mapping[variable]
for variable in output_variables_list
}
utils.print_output(output)
return output

64
httprunner/task.py Normal file
View File

@@ -0,0 +1,64 @@
import logging
import unittest
from httprunner import exception, runner, testcase, utils
class ApiTestCase(unittest.TestCase):
""" create a testcase.
"""
def __init__(self, test_runner, testcase_dict):
super(ApiTestCase, self).__init__()
self.test_runner = test_runner
self.testcase_dict = testcase_dict
def runTest(self):
""" run testcase and check result.
"""
self.assertTrue(self.test_runner._run_test(self.testcase_dict))
class ApiTestSuite(unittest.TestSuite):
""" create test suite with a testset, it may include one or several testcases.
each suite should initialize a separate Runner() with testset config.
"""
def __init__(self, testset):
super(ApiTestSuite, self).__init__()
self.test_runner = runner.Runner()
self.config_dict = testset.get("config", {})
self.test_runner.init_config(self.config_dict, level="testset")
testcases = testset.get("testcases", [])
self._add_tests_to_suite(testcases)
def _add_tests_to_suite(self, testcases):
for testcase_dict in testcases:
if utils.PYTHON_VERSION == 3:
ApiTestCase.runTest.__doc__ = testcase_dict['name']
else:
ApiTestCase.runTest.__func__.__doc__ = testcase_dict['name']
test = ApiTestCase(self.test_runner, testcase_dict)
self.addTest(test)
def print_output(self):
output_variables_list = self.config_dict.get("output", [])
self.test_runner.generate_output(output_variables_list)
class TaskSuite(unittest.TestSuite):
""" create test task suite with specified testcase path.
each task suite may include one or several test suite.
"""
def __init__(self, testcase_path):
super(TaskSuite, self).__init__()
self.suite_list = []
testsets = testcase.load_testcases_by_path(testcase_path)
if not testsets:
raise exception.TestcaseNotFound
for testset in testsets:
suite = ApiTestSuite(testset)
self.addTest(suite)
self.suite_list.append(suite)
@property
def tasks(self):
return self.suite_list

547
httprunner/testcase.py Normal file
View File

@@ -0,0 +1,547 @@
import ast
import codecs
import json
import logging
import os
import re
import yaml
from httprunner import exception, utils
variable_regexp = r"\$([\w_]+)"
function_regexp = r"\$\{([\w_]+\([\$\w_ =,]*\))\}"
function_regexp_compile = re.compile(r"^([\w_]+)\(([\$\w_ =,]*)\)$")
test_def_overall_dict = {
"loaded": False,
"api": {},
"suite": {}
}
testcases_cache_mapping = {}
def _load_yaml_file(yaml_file):
""" load yaml file and check file content format
"""
with codecs.open(yaml_file, 'r+', encoding='utf-8') as stream:
yaml_content = yaml.load(stream)
check_format(yaml_file, yaml_content)
return yaml_content
def _load_json_file(json_file):
""" load json file and check file content format
"""
with codecs.open(json_file, encoding='utf-8') as data_file:
try:
json_content = json.load(data_file)
except exception.JSONDecodeError:
err_msg = u"JSONDecodeError: JSON file format error: {}".format(json_file)
logging.error(err_msg)
raise exception.FileFormatError(err_msg)
check_format(json_file, json_content)
return json_content
def _load_file(testcase_file_path):
file_suffix = os.path.splitext(testcase_file_path)[1]
if file_suffix == '.json':
return _load_json_file(testcase_file_path)
elif file_suffix in ['.yaml', '.yml']:
return _load_yaml_file(testcase_file_path)
else:
# '' or other suffix
err_msg = u"file is not in YAML/JSON format: {}".format(testcase_file_path)
logging.warning(err_msg)
return []
def extract_variables(content):
""" extract all variable names from content, which is in format $variable
@param (str) content
@return (list) variable name list
e.g. $variable => ["variable"]
/blog/$postid => ["postid"]
/$var1/$var2 => ["var1", "var2"]
abc => []
"""
try:
return re.findall(variable_regexp, content)
except TypeError:
return []
def extract_functions(content):
""" extract all functions from string content, which are in format ${fun()}
@param (str) content
@return (list) functions list
e.g. ${func(5)} => ["func(5)"]
${func(a=1, b=2)} => ["func(a=1, b=2)"]
/api/1000?_t=${get_timestamp()} => ["get_timestamp()"]
/api/${add(1, 2)} => ["add(1, 2)"]
"/api/${add(1, 2)}?_t=${get_timestamp()}" => ["add(1, 2)", "get_timestamp()"]
"""
try:
return re.findall(function_regexp, content)
except TypeError:
return []
def parse_string_value(str_value):
""" parse string to number if possible
e.g. "123" => 123
"12.2" => 12.3
"abc" => "abc"
"$var" => "$var"
"""
try:
return ast.literal_eval(str_value)
except ValueError:
return str_value
except SyntaxError:
# e.g. $var, ${func}
return str_value
def parse_function(content):
""" parse function name and args from string content.
@param (str) content
@return (dict) function name and args
e.g. func() => {'func_name': 'func', 'args': [], 'kwargs': {}}
func(5) => {'func_name': 'func', 'args': [5], 'kwargs': {}}
func(1, 2) => {'func_name': 'func', 'args': [1, 2], 'kwargs': {}}
func(a=1, b=2) => {'func_name': 'func', 'args': [], 'kwargs': {'a': 1, 'b': 2}}
func(1, 2, a=3, b=4) => {'func_name': 'func', 'args': [1, 2], 'kwargs': {'a':3, 'b':4}}
"""
function_meta = {
"args": [],
"kwargs": {}
}
matched = function_regexp_compile.match(content)
function_meta["func_name"] = matched.group(1)
args_str = matched.group(2).replace(" ", "")
if args_str == "":
return function_meta
args_list = args_str.split(',')
for arg in args_list:
if '=' in arg:
key, value = arg.split('=')
function_meta["kwargs"][key] = parse_string_value(value)
else:
function_meta["args"].append(parse_string_value(arg))
return function_meta
def load_test_dependencies():
""" load all api and suite definitions.
default api folder is "$CWD/tests/api/".
default suite folder is "$CWD/tests/suite/".
"""
test_def_overall_dict["loaded"] = True
test_def_overall_dict["api"] = {}
test_def_overall_dict["suite"] = {}
# load api definitions
api_def_folder = os.path.join(os.getcwd(), "tests", "api")
api_files = utils.load_folder_files(api_def_folder)
for test_file in api_files:
testset = load_test_file(test_file)
test_def_overall_dict["api"].update(testset["api"])
# load suite definitions
suite_def_folder = os.path.join(os.getcwd(), "tests", "suite")
suite_files = utils.load_folder_files(suite_def_folder)
for suite_file in suite_files:
suite = load_test_file(suite_file)
if "def" not in suite["config"]:
raise exception.ParamsError("def missed in suite file: {}!".format(suite_file))
call_func = suite["config"]["def"]
function_meta = parse_function(call_func)
suite["function_meta"] = function_meta
test_def_overall_dict["suite"][function_meta["func_name"]] = suite
def load_testcases_by_path(path):
""" load testcases from file path
@param path: path could be in several type
- absolute/relative file path
- absolute/relative folder path
- list/set container with file(s) and/or folder(s)
@return testcase sets list, each testset is corresponding to a file
[
testset_dict_1,
testset_dict_2
]
"""
if isinstance(path, (list, set)):
testsets = []
for file_path in set(path):
testset = load_testcases_by_path(file_path)
if not testset:
continue
testsets.extend(testset)
return testsets
if not os.path.isabs(path):
path = os.path.join(os.getcwd(), path)
if path in testcases_cache_mapping:
return testcases_cache_mapping[path]
if os.path.isdir(path):
files_list = utils.load_folder_files(path)
testcases_list = load_testcases_by_path(files_list)
elif os.path.isfile(path):
try:
testset = load_test_file(path)
if testset["testcases"] or testset["api"]:
testcases_list = [testset]
else:
testcases_list = []
except exception.FileFormatError:
testcases_list = []
else:
logging.error(u"file not found: {}".format(path))
testcases_list = []
testcases_cache_mapping[path] = testcases_list
return testcases_list
def load_test_file(file_path):
""" load testset file, get testset data structure.
@param file_path: absolute valid testset file path
@return testset dict
{
"name": "desc1",
"config": {},
"api": {},
"testcases": [testcase11, testcase12]
}
"""
testset = {
"name": "",
"config": {
"path": file_path
},
"api": {},
"testcases": []
}
tests_list = _load_file(file_path)
for item in tests_list:
for key in item:
if key == "config":
testset["config"].update(item["config"])
testset["name"] = item["config"].get("name", "")
elif key == "test":
test_block_dict = item["test"]
if "api" in test_block_dict:
ref_name = test_block_dict["api"]
test_info = get_testinfo_by_reference(ref_name, "api")
test_block_dict.update(test_info)
testset["testcases"].append(test_block_dict)
elif "suite" in test_block_dict:
ref_name = test_block_dict["suite"]
test_info = get_testinfo_by_reference(ref_name, "suite")
testset["testcases"].extend(test_info["testcases"])
else:
testset["testcases"].append(test_block_dict)
elif key == "api":
api_def = item["api"].pop("def")
function_meta = parse_function(api_def)
func_name = function_meta["func_name"]
api_info = {}
api_info["function_meta"] = function_meta
api_info.update(item["api"])
testset["api"][func_name] = api_info
return testset
def get_testinfo_by_reference(ref_name, ref_type):
""" get test content by reference name
@params:
ref_name: reference name, e.g. api_v1_Account_Login_POST($UserName, $Password)
ref_type: "api" or "suite"
"""
function_meta = parse_function(ref_name)
func_name = function_meta["func_name"]
call_args = function_meta["args"]
test_info = get_test_definition(func_name, ref_type)
def_args = test_info.get("function_meta").get("args", [])
if len(call_args) != len(def_args):
raise exception.ParamsError("call args mismatch defined args!")
args_mapping = {}
for index, item in enumerate(def_args):
if call_args[index] == item:
continue
args_mapping[item] = call_args[index]
if args_mapping:
test_info = substitute_variables_with_mapping(test_info, args_mapping)
return test_info
def get_test_definition(name, ref_type):
""" get expected api or suite.
@params:
name: api or suite name
ref_type: "api" or "suite"
@return
expected api info if found, otherwise raise ApiNotFound exception
"""
if not test_def_overall_dict.get("loaded", False):
load_test_dependencies()
test_info = test_def_overall_dict.get(ref_type, {}).get(name)
if not test_info:
err_msg = "{} {} not found!".format(ref_type, name)
if ref_type == "api":
raise exception.ApiNotFound(err_msg)
elif ref_type == "suite":
raise exception.SuiteNotFound(err_msg)
else:
raise exception.ParamsError("ref_type can only be api or suite!")
return test_info
def substitute_variables_with_mapping(content, mapping):
""" substitute variables in content with mapping
e.g.
@params
content = {
'request': {
'url': '/api/users/$uid',
'headers': {'token': '$token'}
}
}
mapping = {"$uid": 1000}
@return
{
'request': {
'url': '/api/users/1000',
'headers': {'token': '$token'}
}
}
"""
if isinstance(content, bool):
return content
if isinstance(content, (int, utils.long_type, float, complex)):
return content
if not content:
return content
if isinstance(content, (list, set, tuple)):
return [
substitute_variables_with_mapping(item, mapping)
for item in content
]
if isinstance(content, dict):
substituted_data = {}
for key, value in content.items():
eval_key = substitute_variables_with_mapping(key, mapping)
eval_value = substitute_variables_with_mapping(value, mapping)
substituted_data[eval_key] = eval_value
return substituted_data
# content is in string format here
for var, value in mapping.items():
if content == var:
# content is a variable
content = value
else:
content = content.replace(var, str(value))
return content
def check_format(file_path, content):
""" check testcase format if valid
"""
if not content:
# testcase file content is empty
err_msg = u"Testcase file content is empty: {}".format(file_path)
logging.error(err_msg)
raise exception.FileFormatError(err_msg)
elif not isinstance(content, (list, dict)):
# testcase file content does not match testcase format
err_msg = u"Testcase file content format invalid: {}".format(file_path)
logging.error(err_msg)
raise exception.FileFormatError(err_msg)
class TestcaseParser(object):
def __init__(self, variables={}, functions={}, file_path=None):
self.update_binded_variables(variables)
self.bind_functions(functions)
self.file_path = file_path
def update_binded_variables(self, variables):
""" bind variables to current testcase parser
@param (dict) variables, variables binds mapping
{
"authorization": "a83de0ff8d2e896dbd8efb81ba14e17d",
"random": "A2dEx",
"data": {"name": "user", "password": "123456"},
"uuid": 1000
}
"""
self.variables = variables
def bind_functions(self, functions):
""" bind functions to current testcase parser
@param (dict) functions, functions binds mapping
{
"add_two_nums": lambda a, b=1: a + b
}
"""
self.functions = functions
def get_bind_item(self, item_type, item_name):
if item_type == "function":
if item_name in self.functions:
return self.functions[item_name]
elif item_type == "variable":
if item_name in self.variables:
return self.variables[item_name]
else:
raise exception.ParamsError("bind item should only be function or variable.")
try:
assert self.file_path is not None
return utils.search_conf_item(self.file_path, item_type, item_name)
except (AssertionError, exception.FunctionNotFound):
raise exception.ParamsError(
"{} is not defined in bind {}s!".format(item_name, item_type))
def eval_content_functions(self, content):
functions_list = extract_functions(content)
for func_content in functions_list:
function_meta = parse_function(func_content)
func_name = function_meta['func_name']
func = self.get_bind_item("function", func_name)
args = function_meta.get('args', [])
kwargs = function_meta.get('kwargs', {})
args = self.parse_content_with_bindings(args)
kwargs = self.parse_content_with_bindings(kwargs)
eval_value = func(*args, **kwargs)
func_content = "${" + func_content + "}"
if func_content == content:
# content is a variable
content = eval_value
else:
# content contains one or many variables
content = content.replace(
func_content,
str(eval_value), 1
)
return content
def eval_content_variables(self, content):
""" replace all variables of string content with mapping value.
@param (str) content
@return (str) parsed content
e.g.
variable_mapping = {
"var_1": "abc",
"var_2": "def"
}
$var_1 => "abc"
$var_1#XYZ => "abc#XYZ"
/$var_1/$var_2/var3 => "/abc/def/var3"
${func($var_1, $var_2, xyz)} => "${func(abc, def, xyz)}"
"""
variables_list = extract_variables(content)
for variable_name in variables_list:
variable_value = self.get_bind_item("variable", variable_name)
if "${}".format(variable_name) == content:
# content is a variable
content = variable_value
else:
# content contains one or many variables
content = content.replace(
"${}".format(variable_name),
str(variable_value), 1
)
return content
def parse_content_with_bindings(self, content):
""" parse content recursively, each variable and function in content will be evaluated.
@param (dict) content in any data structure
{
"url": "http://127.0.0.1:5000/api/users/$uid/${add_two_nums(1, 1)}",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"authorization": "$authorization",
"random": "$random",
"sum": "${add_two_nums(1, 2)}"
},
"body": "$data"
}
@return (dict) parsed content with evaluated bind values
{
"url": "http://127.0.0.1:5000/api/users/1000/2",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"authorization": "a83de0ff8d2e896dbd8efb81ba14e17d",
"random": "A2dEx",
"sum": 3
},
"body": {"name": "user", "password": "123456"}
}
"""
if isinstance(content, (list, tuple)):
return [
self.parse_content_with_bindings(item)
for item in content
]
if isinstance(content, dict):
evaluated_data = {}
for key, value in content.items():
eval_key = self.parse_content_with_bindings(key)
eval_value = self.parse_content_with_bindings(value)
evaluated_data[eval_key] = eval_value
return evaluated_data
if isinstance(content, (int, utils.long_type, float, complex)):
return content
# content is in string format here
content = "" if content is None else content.strip()
# replace functions with evaluated value
# Notice: eval_content_functions must be called before eval_content_variables
content = self.eval_content_functions(content)
# replace variables with binding value
content = self.eval_content_variables(content)
return content

417
httprunner/utils.py Normal file
View File

@@ -0,0 +1,417 @@
import hashlib
import hmac
import imp
import importlib
import logging
import os.path
import random
import re
import string
import types
from collections import OrderedDict
import yaml
from httprunner import exception
from requests.structures import CaseInsensitiveDict
try:
string_type = basestring
long_type = long
PYTHON_VERSION = 2
except NameError:
string_type = str
long_type = int
PYTHON_VERSION = 3
SECRET_KEY = "DebugTalk"
def gen_random_string(str_len):
return ''.join(
random.choice(string.ascii_letters + string.digits) for _ in range(str_len))
def gen_md5(*str_args):
return hashlib.md5("".join(str_args).encode('utf-8')).hexdigest()
def get_sign(*args):
content = ''.join(args).encode('ascii')
sign_key = SECRET_KEY.encode('ascii')
sign = hmac.new(sign_key, content, hashlib.sha1).hexdigest()
return sign
def remove_prefix(text, prefix):
""" remove prefix from text
"""
if text.startswith(prefix):
return text[len(prefix):]
return text
def load_folder_files(folder_path, recursive=True):
""" load folder path, return all files in list format.
@param
folder_path: specified folder path to load
recursive: if True, will load files recursively
"""
if isinstance(folder_path, (list, set)):
files = []
for path in set(folder_path):
files.extend(load_folder_files(path, recursive))
return files
if not os.path.exists(folder_path):
return []
file_list = []
for dirpath, dirnames, filenames in os.walk(folder_path):
filenames_list = []
for filename in filenames:
if not filename.endswith(('.yml', '.yaml', '.json')):
continue
filenames_list.append(filename)
for filename in filenames_list:
file_path = os.path.join(dirpath, filename)
file_list.append(file_path)
if not recursive:
break
return file_list
def query_json(json_content, query, delimiter='.'):
""" Do an xpath-like query with json_content.
@param (json_content) json_content
json_content = {
"ids": [1, 2, 3, 4],
"person": {
"name": {
"first_name": "Leo",
"last_name": "Lee",
},
"age": 29,
"cities": ["Guangzhou", "Shenzhen"]
}
}
@param (str) query
"person.name.first_name" => "Leo"
"person.cities.0" => "Guangzhou"
@return queried result
"""
if json_content == "":
raise exception.ResponseError("response content is empty!")
try:
for key in query.split(delimiter):
if isinstance(json_content, list):
json_content = json_content[int(key)]
elif isinstance(json_content, (dict, CaseInsensitiveDict)):
json_content = json_content[key]
else:
raise exception.ParseResponseError(
"response content is in text format! failed to query key {}!".format(key))
except (KeyError, ValueError, IndexError):
raise exception.ParseResponseError("failed to query json when extracting response!")
return json_content
def match_expected(value, expected, comparator="eq", check_item=""):
""" check if value matches expected value.
@param value: actual value that get from response.
@param expected: expected result described in testcase
@param comparator: compare method
@param check_item: check item name
"""
try:
if value is None or expected is None:
assert comparator in ["is", "eq", "equals", "=="]
assert value is None
assert expected is None
if comparator in ["eq", "equals", "=="]:
assert value == expected
elif comparator in ["lt", "less_than"]:
assert value < expected
elif comparator in ["le", "less_than_or_equals"]:
assert value <= expected
elif comparator in ["gt", "greater_than"]:
assert value > expected
elif comparator in ["ge", "greater_than_or_equals"]:
assert value >= expected
elif comparator in ["ne", "not_equals"]:
assert value != expected
elif comparator in ["str_eq", "string_equals"]:
assert str(value) == str(expected)
elif comparator in ["len_eq", "length_equals", "count_eq"]:
assert isinstance(expected, int)
assert len(value) == expected
elif comparator in ["len_gt", "count_gt", "length_greater_than", "count_greater_than"]:
assert isinstance(expected, int)
assert len(value) > expected
elif comparator in ["len_ge", "count_ge", "length_greater_than_or_equals", \
"count_greater_than_or_equals"]:
assert isinstance(expected, int)
assert len(value) >= expected
elif comparator in ["len_lt", "count_lt", "length_less_than", "count_less_than"]:
assert isinstance(expected, int)
assert len(value) < expected
elif comparator in ["len_le", "count_le", "length_less_than_or_equals", \
"count_less_than_or_equals"]:
assert isinstance(expected, int)
assert len(value) <= expected
elif comparator in ["contains"]:
assert isinstance(value, (list, tuple, dict, string_type))
assert expected in value
elif comparator in ["contained_by"]:
assert isinstance(expected, (list, tuple, dict, string_type))
assert value in expected
elif comparator in ["type"]:
assert isinstance(value, expected)
elif comparator in ["regex"]:
assert isinstance(expected, string_type)
assert isinstance(value, string_type)
assert re.match(expected, value)
elif comparator in ["startswith"]:
assert str(value).startswith(str(expected))
elif comparator in ["endswith"]:
assert str(value).endswith(str(expected))
else:
raise exception.ParamsError("comparator not supported!")
return True
except (AssertionError, TypeError):
err_msg = "\n".join([
"check item name: %s;" % check_item,
"check item value: %s (%s);" % (value, type(value).__name__),
"comparator: %s;" % comparator,
"expected value: %s (%s)." % (expected, type(expected).__name__)
])
raise exception.ValidationError(err_msg)
def deep_update_dict(origin_dict, override_dict):
""" update origin dict with override dict recursively
e.g. origin_dict = {'a': 1, 'b': {'c': 2, 'd': 4}}
override_dict = {'b': {'c': 3}}
return: {'a': 1, 'b': {'c': 3, 'd': 4}}
"""
for key, val in override_dict.items():
if isinstance(val, dict):
tmp = deep_update_dict(origin_dict.get(key, {}), val)
origin_dict[key] = tmp
else:
origin_dict[key] = override_dict[key]
return origin_dict
def is_function(tup):
""" Takes (name, object) tuple, returns True if it is a function.
"""
name, item = tup
return isinstance(item, types.FunctionType)
def is_variable(tup):
""" Takes (name, object) tuple, returns True if it is a variable.
"""
name, item = tup
if callable(item):
# function or class
return False
if isinstance(item, types.ModuleType):
# imported module
return False
if name.startswith("_"):
# private property
return False
return True
def get_imported_module(module_name):
""" import module and return imported module
"""
return importlib.import_module(module_name)
def get_imported_module_from_file(file_path):
""" import module from python file path and return imported module
"""
if PYTHON_VERSION == 3:
imported_module = importlib.machinery.SourceFileLoader(
'module_name', file_path).load_module()
else:
# Python 2.7
imported_module = imp.load_source('module_name', file_path)
return imported_module
def filter_module(module, filter_type):
""" filter functions or variables from import module
@params
module: imported module
filter_type: "function" or "variable"
"""
filter_type = is_function if filter_type == "function" else is_variable
module_functions_dict = dict(filter(filter_type, vars(module).items()))
return module_functions_dict
def search_conf_item(start_path, item_type, item_name):
""" search expected function or variable recursive upward
@param
start_path: search start path
item_type: "function" or "variable"
item_name: function name or variable name
"""
dir_path = os.path.dirname(os.path.abspath(start_path))
target_file = os.path.join(dir_path, "debugtalk.py")
if os.path.isfile(target_file):
imported_module = get_imported_module_from_file(target_file)
items_dict = filter_module(imported_module, item_type)
if item_name in items_dict:
return items_dict[item_name]
else:
return search_conf_item(dir_path, item_type, item_name)
if dir_path == start_path:
# system root path
err_msg = "{} not found in recursive upward path!".format(item_name)
if item_type == "function":
raise exception.FunctionNotFound(err_msg)
else:
raise exception.VariableNotFound(err_msg)
return search_conf_item(dir_path, item_type, item_name)
def lower_dict_keys(origin_dict):
""" convert keys in dict to lower case
e.g.
Name => name, Request => request
URL => url, METHOD => method, Headers => headers, Data => data
"""
if not origin_dict or not isinstance(origin_dict, dict):
return origin_dict
return {
key.lower(): value
for key, value in origin_dict.items()
}
def lower_config_dict_key(config_dict):
""" convert key in config dict to lower case, convertion will occur in two places:
1, all keys in config dict;
2, all keys in config["request"]
"""
config_dict = lower_dict_keys(config_dict)
if "request" in config_dict:
config_dict["request"] = lower_dict_keys(config_dict["request"])
return config_dict
def convert_to_order_dict(map_list):
""" convert mapping in list to ordered dict
@param (list) map_list
[
{"a": 1},
{"b": 2}
]
@return (OrderDict)
OrderDict({
"a": 1,
"b": 2
})
"""
ordered_dict = OrderedDict()
for map_dict in map_list:
ordered_dict.update(map_dict)
return ordered_dict
def update_ordered_dict(ordered_dict, override_mapping):
""" override ordered_dict with new mapping
@param
(OrderDict) ordered_dict
OrderDict({
"a": 1,
"b": 2
})
(dict) override_mapping
{"a": 3, "c": 4}
@return (OrderDict)
OrderDict({
"a": 3,
"b": 2,
"c": 4
})
"""
for var, value in override_mapping.items():
ordered_dict.update({var: value})
return ordered_dict
def override_variables_binds(variables, new_mapping):
""" convert variables in testcase to ordered mapping, with new_mapping overrided
"""
if isinstance(variables, list):
variables_ordered_dict = convert_to_order_dict(variables)
elif isinstance(variables, OrderedDict):
variables_ordered_dict = variables
else:
raise exception.ParamsError("variables error!")
return update_ordered_dict(
variables_ordered_dict,
new_mapping
)
def print_output(output):
if not output:
return
content = "\n================== Output ==================\n"
content += '{:<16}: {:<}\n'.format("Variable", "Value")
content += '{:<16}: {:<}\n'.format("--------", "-----")
for variable, value in output.items():
if PYTHON_VERSION == 2:
if isinstance(variable, unicode):
variable = variable.encode("utf-8")
if isinstance(value, unicode):
value = value.encode("utf-8")
content += '{:<16}: {:<}\n'.format(variable, value)
content += "============================================\n"
logging.debug(content)
def create_scaffold(project_path):
logging.info(" Start to create new project: {}".format(project_path))
if os.path.isdir(project_path):
folder_name = os.path.basename(project_path)
logging.warning(u" Folder {} exists, please specify a new folder name.".format(folder_name))
return
def create_path(path, ptype):
if ptype == "folder":
os.makedirs(path)
elif ptype == "file":
open(path, 'w').close()
logging.info("\tcreated {}: {}".format(ptype, path))
path_list = [
(project_path, "folder"),
(os.path.join(project_path, "tests"), "folder"),
(os.path.join(project_path, "tests", "api"), "folder"),
(os.path.join(project_path, "tests", "suite"), "folder"),
(os.path.join(project_path, "tests", "testcases"), "folder"),
(os.path.join(project_path, "tests", "debugtalk.py"), "file")
]
[create_path(p[0], p[1]) for p in path_list]