Files
httprunner/httprunner/loader.py
2018-08-13 00:29:49 +08:00

956 lines
28 KiB
Python

import collections
import csv
import importlib
import io
import json
import os
import yaml
from httprunner import exceptions, logger, parser, validator
from httprunner.compat import OrderedDict
project_mapping = {
"debugtalk": {},
"env": {},
"def-api": {},
"def-testcase": {}
}
""" dict: save project loaded api/testcases definitions, environments and debugtalk.py module.
"""
testcases_cache_mapping = {}
###############################################################################
## file loader
###############################################################################
def _check_format(file_path, content):
""" check testcase format if valid
"""
# TODO: replace with JSON schema validation
if not content:
# testcase file content is empty
err_msg = u"Testcase file content is empty: {}".format(file_path)
logger.log_error(err_msg)
raise exceptions.FileFormatError(err_msg)
elif not isinstance(content, (list, dict)):
# testcase file content does not match testcase format
err_msg = u"Testcase file content format invalid: {}".format(file_path)
logger.log_error(err_msg)
raise exceptions.FileFormatError(err_msg)
def load_yaml_file(yaml_file):
""" load yaml file and check file content format
"""
with io.open(yaml_file, 'r', encoding='utf-8') as stream:
yaml_content = yaml.load(stream)
_check_format(yaml_file, yaml_content)
return yaml_content
def load_json_file(json_file):
""" load json file and check file content format
"""
with io.open(json_file, encoding='utf-8') as data_file:
try:
json_content = json.load(data_file)
except exceptions.JSONDecodeError:
err_msg = u"JSONDecodeError: JSON file format error: {}".format(json_file)
logger.log_error(err_msg)
raise exceptions.FileFormatError(err_msg)
_check_format(json_file, json_content)
return json_content
def load_csv_file(csv_file):
""" load csv file and check file content format
@param
csv_file: csv file path
e.g. csv file content:
username,password
test1,111111
test2,222222
test3,333333
@return
list of parameter, each parameter is in dict format
e.g.
[
{'username': 'test1', 'password': '111111'},
{'username': 'test2', 'password': '222222'},
{'username': 'test3', 'password': '333333'}
]
"""
csv_content_list = []
with io.open(csv_file, encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
csv_content_list.append(row)
return csv_content_list
def load_file(file_path):
if not os.path.isfile(file_path):
raise exceptions.FileNotFound("{} does not exist.".format(file_path))
file_suffix = os.path.splitext(file_path)[1].lower()
if file_suffix == '.json':
return load_json_file(file_path)
elif file_suffix in ['.yaml', '.yml']:
return load_yaml_file(file_path)
elif file_suffix == ".csv":
return load_csv_file(file_path)
else:
# '' or other suffix
err_msg = u"Unsupported file format: {}".format(file_path)
logger.log_warning(err_msg)
return []
def load_folder_files(folder_path, recursive=True):
""" load folder path, return all files endswith yml/yaml/json in list.
Args:
folder_path (str): specified folder path to load
recursive (bool): load files recursively if True
Returns:
list: files endswith yml/yaml/json
"""
if isinstance(folder_path, (list, set)):
files = []
for path in set(folder_path):
files.extend(load_folder_files(path, recursive))
return files
if not os.path.exists(folder_path):
return []
file_list = []
for dirpath, dirnames, filenames in os.walk(folder_path):
filenames_list = []
for filename in filenames:
if not filename.endswith(('.yml', '.yaml', '.json')):
continue
filenames_list.append(filename)
for filename in filenames_list:
file_path = os.path.join(dirpath, filename)
file_list.append(file_path)
if not recursive:
break
return file_list
def load_dot_env_file(path):
""" load .env file
Args:
path (str): .env file path.
If path is None, it will find .env file in current working directory.
Returns:
dict: environment variables mapping
{
"UserName": "debugtalk",
"Password": "123456",
"PROJECT_KEY": "ABCDEFGH"
}
Raises:
exceptions.FileNotFound: If specified env file is not exist.
exceptions.FileFormatError: If env file format is invalid.
"""
if not path:
path = os.path.join(os.getcwd(), ".env")
if not os.path.isfile(path):
logger.log_debug(".env file not exist: {}".format(path))
return {}
else:
if not os.path.isfile(path):
raise exceptions.FileNotFound("env file not exist: {}".format(path))
logger.log_info("Loading environment variables from {}".format(path))
env_variables_mapping = {}
with io.open(path, 'r', encoding='utf-8') as fp:
for line in fp:
if "=" in line:
variable, value = line.split("=")
elif ":" in line:
variable, value = line.split(":")
else:
raise exceptions.FileFormatError(".env format error")
env_variables_mapping[variable.strip()] = value.strip()
project_mapping["env"] = env_variables_mapping
return env_variables_mapping
def locate_file(start_path, file_name):
""" locate filename and return file path.
searching will be recursive upward until current working directory.
Args:
start_path (str): start locating path, maybe file path or directory path
Returns:
str: located file path. None if file not found.
Raises:
exceptions.FileNotFound: If failed to locate file.
"""
if os.path.isfile(start_path):
start_dir_path = os.path.dirname(start_path)
elif os.path.isdir(start_path):
start_dir_path = start_path
else:
raise exceptions.FileNotFound("invalid path: {}".format(start_path))
file_path = os.path.join(start_dir_path, file_name)
if os.path.isfile(file_path):
if os.path.isabs(file_path):
file_path = file_path[len(os.getcwd())+1:]
return file_path
# current working directory
if os.path.abspath(start_dir_path) == os.getcwd():
raise exceptions.FileNotFound("{} not found in {}".format(file_name, start_path))
# locate recursive upward
return locate_file(os.path.dirname(start_dir_path), file_name)
###############################################################################
## debugtalk.py module loader
###############################################################################
def convert_module_name(python_file_path):
""" convert python file relative path to module name.
Args:
python_file_path (str): python file relative path
Returns:
str: module name
Examples:
>>> convert_module_name("debugtalk.py")
debugtalk
>>> convert_module_name("tests/debugtalk.py")
tests.debugtalk
>>> convert_module_name("tests/data/debugtalk.py")
tests.data.debugtalk
"""
module_name = python_file_path.replace("/", ".").rstrip(".py")
return module_name
def load_python_module(module):
""" load python module.
Args:
module: python module
Returns:
dict: variables and functions mapping for specified python module
{
"variables": {},
"functions": {}
}
"""
debugtalk_module = {
"variables": {},
"functions": {}
}
for name, item in vars(module).items():
if validator.is_function((name, item)):
debugtalk_module["functions"][name] = item
elif validator.is_variable((name, item)):
debugtalk_module["variables"][name] = item
else:
pass
return debugtalk_module
def load_debugtalk_module(start_path=None):
""" load debugtalk.py module.
Args:
start_path (str, optional): start locating path, maybe file path or directory path.
Defaults to current working directory.
Returns:
dict: variables and functions mapping for debugtalk.py
{
"variables": {},
"functions": {}
}
"""
start_path = start_path or os.getcwd()
try:
module_path = locate_file(start_path, "debugtalk.py")
module_name = convert_module_name(module_path)
except exceptions.FileNotFound:
return {
"variables": {},
"functions": {}
}
imported_module = importlib.import_module(module_name)
loaded_module = load_python_module(imported_module)
project_mapping["debugtalk"] = loaded_module
return loaded_module
def get_module_item(module_mapping, item_type, item_name):
""" get expected function or variable from module mapping.
Args:
module_mapping(dict): module mapping with variables and functions.
{
"variables": {},
"functions": {}
}
item_type(str): "functions" or "variables"
item_name(str): function name or variable name
Returns:
object: specified variable or function object.
Raises:
exceptions.FunctionNotFound: If specified function not found in module mapping
exceptions.VariableNotFound: If specified variable not found in module mapping
"""
try:
return module_mapping[item_type][item_name]
except KeyError:
err_msg = "{} not found in debugtalk.py module!\n".format(item_name)
err_msg += "module mapping: {}".format(module_mapping)
if item_type == "functions":
raise exceptions.FunctionNotFound(err_msg)
else:
raise exceptions.VariableNotFound(err_msg)
###############################################################################
## testcase loader
###############################################################################
def _load_test_file(file_path):
""" load testcase file or testsuite file
Args:
file_path (str): absolute valid file path. file_path should be in the following format:
[
{
"config": {
"name": "",
"def": "suite_order()",
"request": {}
}
},
{
"test": {
"name": "add product to cart",
"api": "api_add_cart()",
"validate": []
}
},
{
"test": {
"name": "add product to cart",
"suite": "create_and_check()",
"validate": []
}
},
{
"test": {
"name": "checkout cart",
"request": {},
"validate": []
}
}
]
Returns:
dict: testcase dict
{
"config": {},
"teststeps": [teststep11, teststep12]
}
"""
testcase = {
"config": {
"path": file_path
},
"teststeps": []
}
for item in load_file(file_path):
# TODO: add json schema validation
if not isinstance(item, dict) or len(item) != 1:
raise exceptions.FileFormatError("Testcase format error: {}".format(file_path))
key, test_block = item.popitem()
if not isinstance(test_block, dict):
raise exceptions.FileFormatError("Testcase format error: {}".format(file_path))
if key == "config":
testcase["config"].update(test_block)
elif key == "test":
def extend_api_definition(block):
ref_call = block["api"]
def_block = _get_block_by_name(ref_call, "def-api")
_extend_block(block, def_block)
# reference api
if "api" in test_block:
extend_api_definition(test_block)
testcase["teststeps"].append(test_block)
# reference testcase
elif "suite" in test_block: # TODO: replace suite with testcase
ref_call = test_block["suite"]
block = _get_block_by_name(ref_call, "def-testcase")
# TODO: bugfix lost block config variables
for teststep in block["teststeps"]:
if "api" in teststep:
extend_api_definition(teststep)
testcase["teststeps"].append(teststep)
# define directly
else:
testcase["teststeps"].append(test_block)
else:
logger.log_warning(
"unexpected block key: {}. block key should only be 'config' or 'test'.".format(key)
)
return testcase
def _get_block_by_name(ref_call, ref_type):
""" get test content by reference name.
Args:
ref_call (str): call function.
e.g. api_v1_Account_Login_POST($UserName, $Password)
ref_type (enum): "def-api" or "def-testcase"
Returns:
dict: api/testcase definition.
Raises:
exceptions.ParamsError: call args number is not equal to defined args number.
"""
function_meta = parser.parse_function(ref_call)
func_name = function_meta["func_name"]
call_args = function_meta["args"]
block = _get_test_definition(func_name, ref_type)
def_args = block.get("function_meta", {}).get("args", [])
if len(call_args) != len(def_args):
err_msg = "{}: call args number is not equal to defined args number!\n".format(func_name)
err_msg += "defined args: {}\n".format(def_args)
err_msg += "reference args: {}".format(call_args)
logger.log_error(err_msg)
raise exceptions.ParamsError(err_msg)
args_mapping = {}
for index, item in enumerate(def_args):
if call_args[index] == item:
continue
args_mapping[item] = call_args[index]
if args_mapping:
block = parser.substitute_variables(block, args_mapping)
return block
def _get_test_definition(name, ref_type):
""" get expected api or testcase.
Args:
name (str): api or testcase name
ref_type (enum): "def-api" or "def-testcase"
Returns:
dict: expected api/testcase info if found.
Raises:
exceptions.ApiNotFound: api not found
exceptions.TestcaseNotFound: testcase not found
"""
block = project_mapping.get(ref_type, {}).get(name)
if not block:
err_msg = "{} not found!".format(name)
if ref_type == "def-api":
raise exceptions.ApiNotFound(err_msg)
else:
# ref_type == "def-testcase":
raise exceptions.TestcaseNotFound(err_msg)
return block
def _extend_block(ref_block, def_block):
""" extend ref_block with def_block.
Args:
def_block (dict): api definition dict.
ref_block (dict): reference block
Returns:
dict: extended reference block.
Examples:
>>> def_block = {
"name": "get token 1",
"request": {...},
"validate": [{'eq': ['status_code', 200]}]
}
>>> ref_block = {
"name": "get token 2",
"extract": [{"token": "content.token"}],
"validate": [{'eq': ['status_code', 201]}, {'len_eq': ['content.token', 16]}]
}
>>> _extend_block(def_block, ref_block)
{
"name": "get token 2",
"request": {...},
"extract": [{"token": "content.token"}],
"validate": [{'eq': ['status_code', 201]}, {'len_eq': ['content.token', 16]}]
}
"""
# TODO: override variables
def_validators = def_block.get("validate") or def_block.get("validators", [])
ref_validators = ref_block.get("validate") or ref_block.get("validators", [])
def_extrators = def_block.get("extract") \
or def_block.get("extractors") \
or def_block.get("extract_binds", [])
ref_extractors = ref_block.get("extract") \
or ref_block.get("extractors") \
or ref_block.get("extract_binds", [])
ref_block.update(def_block)
ref_block["validate"] = _merge_validator(
def_validators,
ref_validators
)
ref_block["extract"] = _merge_extractor(
def_extrators,
ref_extractors
)
def _convert_validators_to_mapping(validators):
""" convert validators list to mapping.
Args:
validators (list): validators in list
Returns:
dict: validators mapping, use (check, comparator) as key.
Examples:
>>> validators = [
{"check": "v1", "expect": 201, "comparator": "eq"},
{"check": {"b": 1}, "expect": 200, "comparator": "eq"}
]
>>> _convert_validators_to_mapping(validators)
{
("v1", "eq"): {"check": "v1", "expect": 201, "comparator": "eq"},
('{"b": 1}', "eq"): {"check": {"b": 1}, "expect": 200, "comparator": "eq"}
}
"""
validators_mapping = {}
for validator in validators:
validator = parser.parse_validator(validator)
if not isinstance(validator["check"], collections.Hashable):
check = json.dumps(validator["check"])
else:
check = validator["check"]
key = (check, validator["comparator"])
validators_mapping[key] = validator
return validators_mapping
def _merge_validator(def_validators, ref_validators):
""" merge def_validators with ref_validators.
Args:
def_validators (list):
ref_validators (list):
Returns:
list: merged validators
Examples:
>>> def_validators = [{'eq': ['v1', 200]}, {"check": "s2", "expect": 16, "comparator": "len_eq"}]
>>> ref_validators = [{"check": "v1", "expect": 201}, {'len_eq': ['s3', 12]}]
>>> _merge_validator(def_validators, ref_validators)
[
{"check": "v1", "expect": 201, "comparator": "eq"},
{"check": "s2", "expect": 16, "comparator": "len_eq"},
{"check": "s3", "expect": 12, "comparator": "len_eq"}
]
"""
if not def_validators:
return ref_validators
elif not ref_validators:
return def_validators
else:
def_validators_mapping = _convert_validators_to_mapping(def_validators)
ref_validators_mapping = _convert_validators_to_mapping(ref_validators)
def_validators_mapping.update(ref_validators_mapping)
return list(def_validators_mapping.values())
def _merge_extractor(def_extrators, ref_extractors):
""" merge def_extrators with ref_extractors
Args:
def_extrators (list): [{"var1": "val1"}, {"var2": "val2"}]
ref_extractors (list): [{"var1": "val111"}, {"var3": "val3"}]
Returns:
list: merged extractors
Examples:
>>> def_extrators = [{"var1": "val1"}, {"var2": "val2"}]
>>> ref_extractors = [{"var1": "val111"}, {"var3": "val3"}]
>>> _merge_extractor(def_extrators, ref_extractors)
[
{"var1": "val111"},
{"var2": "val2"},
{"var3": "val3"}
]
"""
if not def_extrators:
return ref_extractors
elif not ref_extractors:
return def_extrators
else:
extractor_dict = OrderedDict()
for api_extrator in def_extrators:
if len(api_extrator) != 1:
logger.log_warning("incorrect extractor: {}".format(api_extrator))
continue
var_name = list(api_extrator.keys())[0]
extractor_dict[var_name] = api_extrator[var_name]
for test_extrator in ref_extractors:
if len(test_extrator) != 1:
logger.log_warning("incorrect extractor: {}".format(test_extrator))
continue
var_name = list(test_extrator.keys())[0]
extractor_dict[var_name] = test_extrator[var_name]
extractor_list = []
for key, value in extractor_dict.items():
extractor_list.append({key: value})
return extractor_list
def load_folder_content(folder_path):
""" load api/testcases/testsuites definitions from folder.
Args:
folder_path (str): api/testcases/testsuites files folder.
Returns:
dict: api definition mapping.
{
"tests/api/basic.yml": [
{"api": {"def": "api_login", "request": {}, "validate": []}},
{"api": {"def": "api_logout", "request": {}, "validate": []}}
]
}
"""
items_mapping = {}
for file_path in load_folder_files(folder_path):
items_mapping[file_path] = load_file(file_path)
return items_mapping
def load_api_folder(api_folder_path=None):
""" load api definitions from api folder.
Args:
api_folder_path (str): api files folder.
api file should be in the following format:
[
{
"api": {
"def": "api_login",
"request": {},
"validate": []
}
},
{
"api": {
"def": "api_logout",
"request": {},
"validate": []
}
}
]
Returns:
dict: api definition mapping.
{
"api_login": {
"function_meta": {"func_name": "api_login", "args": [], "kwargs": {}}
"request": {}
},
"api_logout": {
"function_meta": {"func_name": "api_logout", "args": [], "kwargs": {}}
"request": {}
}
}
"""
api_definition_mapping = {}
api_folder_path = api_folder_path or os.path.join(os.getcwd(), "api")
api_items_mapping = load_folder_content(api_folder_path)
for api_file_path, api_items in api_items_mapping.items():
# TODO: add JSON schema validation
for api_item in api_items:
key, api_dict = api_item.popitem()
api_def = api_dict.pop("def")
function_meta = parser.parse_function(api_def)
func_name = function_meta["func_name"]
if func_name in api_definition_mapping:
logger.log_warning("API definition duplicated: {}".format(func_name))
api_dict["function_meta"] = function_meta
api_definition_mapping[func_name] = api_dict
project_mapping["def-api"] = api_definition_mapping
return api_definition_mapping
def load_test_folder(test_folder_path=None):
""" load testcases definitions from folder.
Args:
test_folder_path (str): testcases files folder.
testcase file should be in the following format:
[
{
"config": {
"def": "create_and_check",
"request": {},
"validate": []
}
},
{
"test": {
"api": "get_user",
"validate": []
}
}
]
Returns:
dict: testcases definition mapping.
{
"create_and_check": [
{"config": {}},
{"test": {}},
{"test": {}}
],
"tests/testcases/create_and_get.yml": [
{"config": {}},
{"test": {}},
{"test": {}}
]
}
"""
test_definition_mapping = {}
# TODO: replace suite with testcases
test_folder_path = test_folder_path or os.path.join(os.getcwd(), "suite")
test_items_mapping = load_folder_content(test_folder_path)
for test_file_path, items in test_items_mapping.items():
# TODO: add JSON schema validation
testcase = {
"config": {
"path": test_file_path
},
"teststeps": []
}
for item in items:
key, block = item.popitem()
if key == "config":
testcase["config"].update(block)
if "def" not in block:
test_definition_mapping[test_file_path] = testcase
continue
testcase_def = block.pop("def")
function_meta = parser.parse_function(testcase_def)
func_name = function_meta["func_name"]
if func_name in test_definition_mapping:
logger.log_warning("API definition duplicated: {}".format(func_name))
testcase["function_meta"] = function_meta
test_definition_mapping[func_name] = testcase
else:
# key == "test":
testcase["teststeps"].append(block)
project_mapping["def-testcase"] = test_definition_mapping
return test_definition_mapping
def load_project_tests(folder_path):
""" load api, testcases and debugtalk.py module.
Args:
folder_path (str): folder path.
Returns:
dict: project tests mapping.
"""
load_debugtalk_module(folder_path)
load_api_folder(os.path.join(folder_path, "api"))
load_test_folder(os.path.join(folder_path, "suite"))
return project_mapping
def load_testcases(path):
""" load testcases from file path
Args:
path (str): testcase file/foler path.
path could be in several types:
- absolute/relative file path
- absolute/relative folder path
- list/set container with file(s) and/or folder(s)
Returns:
list: testcases list, each testcase is corresponding to a file
[
testcase_dict_1,
testcase_dict_2
]
"""
if isinstance(path, (list, set)):
testcases_list = []
for file_path in set(path):
testcases = load_testcases(file_path)
if not testcases:
continue
testcases_list.extend(testcases)
return testcases_list
if not os.path.isabs(path):
path = os.path.join(os.getcwd(), path)
if path in testcases_cache_mapping:
return testcases_cache_mapping[path]
if os.path.isdir(path):
files_list = load_folder_files(path)
testcases_list = load_testcases(files_list)
elif os.path.isfile(path):
try:
testcase = _load_test_file(path)
if testcase["teststeps"]:
testcases_list = [testcase]
else:
testcases_list = []
except exceptions.FileFormatError:
testcases_list = []
else:
err_msg = "path not exist: {}".format(path)
logger.log_error(err_msg)
raise exceptions.FileNotFound(err_msg)
testcases_cache_mapping[path] = testcases_list
return testcases_list