Files
httprunner/httprunner/utils.py

470 lines
13 KiB
Python

# encoding: utf-8
import collections
import copy
import io
import itertools
import json
import os.path
import re
from typing import Union
from loguru import logger
from httprunner import exceptions
from httprunner.exceptions import ParamsError
# Case-insensitive pattern matching strings that begin with "http://" or "https://".
absolute_http_url_regexp = re.compile(r"^https?://", re.I)
def set_os_environ(variables_mapping):
    """ export every entry of a mapping into os.environ

    Args:
        variables_mapping (dict): environment variable names mapped to the
            values that should be assigned to them.

    """
    for name in variables_mapping:
        value = variables_mapping[name]
        os.environ[name] = value
        logger.debug(f"Set OS environment variable: {name}")
def unset_os_environ(variables_mapping):
    """ remove the variables named in a mapping from os.environ

    Only the keys of the mapping are used; the values are ignored.

    Args:
        variables_mapping (dict): mapping whose keys are the names of the
            environment variables to remove.

    """
    for variable in variables_mapping:
        # The None default makes the cleanup idempotent: a variable that is
        # already absent is skipped instead of raising KeyError halfway
        # through the loop and leaving the remaining variables set.
        os.environ.pop(variable, None)
        logger.debug(f"Unset OS environment variable: {variable}")
def get_os_environ(variable_name):
    """ look up one environment variable.

    Args:
        variable_name (str): name of the environment variable

    Returns:
        the value stored in os.environ under variable_name.

    Raises:
        exceptions.EnvNotFound: when os.environ has no such variable.

    """
    try:
        value = os.environ[variable_name]
    except KeyError:
        raise exceptions.EnvNotFound(variable_name)

    return value
def build_url(base_url, path):
    """ join base_url and path, unless path is already an absolute http(s) URL

    Args:
        base_url (str): prefix to prepend; may be empty when path is absolute.
        path (str): absolute URL, or a path relative to base_url.

    Returns:
        str: path unchanged when it is absolute, otherwise base_url and path
            joined by exactly one "/".

    Raises:
        ParamsError: when path is not absolute and base_url is empty.

    """
    if absolute_http_url_regexp.match(path):
        return path

    if not base_url:
        raise ParamsError("base url missed!")

    # strip the slashes on both sides of the seam to avoid "//" in the result
    prefix = base_url.rstrip("/")
    suffix = path.lstrip("/")
    return "{}/{}".format(prefix, suffix)
def query_json(json_content, query, delimiter='.'):
    """ Do an xpath-like query with json_content.

    The query is split on delimiter and each part is applied in turn:
    list/str/bytes nodes are indexed with int(part), dict nodes are looked up
    with the part as key.

    Args:
        json_content (dict/list/string): content to be queried.
        query (str): query string.
        delimiter (str): delimiter symbol.

    Returns:
        the value found at the end of the query path.

    Raises:
        exceptions.ExtractFailure: when a key/index is missing, an index is
            not an integer, or the path runs into a value that can not be
            queried any further.

    Examples:
        >>> json_content = {
        ...     "ids": [1, 2, 3, 4],
        ...     "person": {
        ...         "name": {
        ...             "first_name": "Leo",
        ...             "last_name": "Lee",
        ...         },
        ...         "age": 29,
        ...         "cities": ["Guangzhou", "Shenzhen"]
        ...     }
        ... }
        >>> query_json(json_content, "person.name.first_name")
        'Leo'
        >>> query_json(json_content, "person.name.first_name.0")
        'L'
        >>> query_json(json_content, "person.cities.0")
        'Guangzhou'

    """
    # Walk with a separate cursor so the original content is still available
    # for the error message; the message is only built when extraction fails.
    current = json_content
    failed = False

    try:
        for key in query.split(delimiter):
            if isinstance(current, (list, str, bytes)):
                current = current[int(key)]
            elif isinstance(current, dict):
                current = current[key]
            else:
                logger.error(
                    f"invalid type value: {current}({type(current)})")
                failed = True
                # nothing below this node can be queried: stop here instead
                # of logging the same error once per remaining key
                break
    except (KeyError, ValueError, IndexError):
        failed = True

    if failed:
        err_msg = f"Failed to extract! => {query}\n"
        err_msg += f"response body: {json_content}\n"
        logger.error(err_msg)
        raise exceptions.ExtractFailure(err_msg)

    return current
def lower_dict_keys(origin_dict):
    """ build a copy of a dict whose top-level keys are lower-cased

    Only the first level is converted; nested mappings are kept as they are.
    Empty values and values that are not a dict are handed back untouched.

    Args:
        origin_dict (dict): mapping data structure

    Returns:
        dict: new mapping with every key lower-cased, or origin_dict itself
            when it is empty or not a dict.

    Examples:
        >>> lower_dict_keys({"Name": "", "URL": "", "METHOD": ""})
        {'name': '', 'url': '', 'method': ''}

    """
    if origin_dict and isinstance(origin_dict, dict):
        lowered = {}
        for name, item in origin_dict.items():
            lowered[name.lower()] = item
        return lowered

    return origin_dict
def lower_test_dict_keys(test_dict):
    """ lower-case the keys of a test dict on two levels:
        1, the keys of test_dict itself;
        2, the keys of test_dict["request"], when that entry exists.

    Args:
        test_dict (dict): test definition

    Returns:
        dict: test definition with lower-cased keys.

    """
    # first level
    lowered = lower_dict_keys(test_dict)

    # second level, only for the request block
    if "request" in lowered:
        lowered["request"] = lower_dict_keys(lowered["request"])

    return lowered
def deepcopy_dict(data):
    """ deep copy dict data, sharing the values that can not be copied
        (e.g. file objects such as _io.BufferedReader)

    Args:
        data (dict): dict data structure
            {
                'a': 1,
                'b': [2, 4],
                'c': lambda x: x+1,
                'd': open('LICENSE'),
                'f': {
                    'f1': {'a1': 2},
                    'f2': io.open('LICENSE', 'rb'),
                }
            }

    Returns:
        dict: deep copied dict data, where values that copy.deepcopy rejects
            with TypeError are the very same objects as in data.

    """
    try:
        return copy.deepcopy(data)
    except TypeError:
        # some value refuses to be copied: rebuild the dict entry by entry
        duplicate = {}
        for name, item in data.items():
            if isinstance(item, dict):
                duplicate[name] = deepcopy_dict(item)
                continue

            try:
                duplicate[name] = copy.deepcopy(item)
            except TypeError:
                # not copyable, keep a reference to the original object
                duplicate[name] = item

        return duplicate
def ensure_mapping_format(variables):
    """ normalize variables to a single mapping.

    Args:
        variables (list/dict): original variables, either one dict or a list
            of dicts

    Returns:
        dict: the dict itself when a dict is given; for a list, a new dict
            merged from its items in order (later items win).

    Raises:
        exceptions.ParamsError: when variables is neither a list nor a dict.

    Examples:
        >>> ensure_mapping_format([{"a": 1}, {"b": 2}])
        {'a': 1, 'b': 2}

    """
    if isinstance(variables, dict):
        return variables

    if not isinstance(variables, list):
        raise exceptions.ParamsError("variables format error!")

    merged = {}
    for fragment in variables:
        merged.update(fragment)
    return merged
def extend_variables(raw_variables, override_variables):
    """ extend raw_variables with override_variables.
        override_variables will merge and override raw_variables.

    Args:
        raw_variables (list/dict): base variables
        override_variables (list/dict): variables that take precedence

    Returns:
        dict: extended variables mapping. When both inputs are non-empty the
            result is always a new dict, so neither input is modified.

    Examples:
        >>> raw_variables = [{"var1": "val1"}, {"var2": "val2"}]
        >>> override_variables = [{"var1": "val111"}, {"var3": "val3"}]
        >>> extend_variables(raw_variables, override_variables)
        {
            'var1': 'val111',
            'var2': 'val2',
            'var3': 'val3'
        }

    """
    if not raw_variables:
        return ensure_mapping_format(override_variables)

    if not override_variables:
        return ensure_mapping_format(raw_variables)

    raw_variables_mapping = ensure_mapping_format(raw_variables)
    override_variables_mapping = ensure_mapping_format(override_variables)

    # ensure_mapping_format returns a dict argument as the same object, so
    # updating it in place would silently modify the caller's raw_variables.
    # Merge into a fresh dict instead; override entries win on conflicts.
    return {**raw_variables_mapping, **override_variables_mapping}
def get_testcase_io(testcase):
    """ collect testcase input(variables) and output(export).

    Args:
        testcase (unittest.suite.TestSuite): corresponding to one YAML/JSON file, it has been set two attributes:
            config: parsed config block
            runner: initialized runner.Runner() with config

    Returns:
        dict: {"in": config variables, "out": exported variables mapping}.

    """
    test_runner = testcase.runner
    config = testcase.config

    variables = config.get("variables", {})

    # "export" takes precedence; "output" is only consulted when "export"
    # is missing or empty
    output_list = config.get("export")
    if not output_list:
        output_list = config.get("output", [])

    return {
        "in": variables,
        "out": test_runner.export_variables(output_list)
    }
def print_info(info_mapping):
    """ log the content of a mapping as a two-column table.

    Nothing is logged for an empty mapping. Entries whose value is a tuple
    or a deque are skipped; dict and list values are rendered as JSON.

    Args:
        info_mapping (dict): input(variables) or output mapping.

    Examples:
        >>> info_mapping = {
                "var_a": "hello",
                "var_b": "world"
            }
        >>> print_info(info_mapping)
        ==================== Output ====================
        Variable         : Value
        ---------------- : -----------------------------
        var_a            : hello
        var_b            : world
        ------------------------------------------------

    """
    if not info_mapping:
        return

    content_format = "{:<16} : {:<}\n"
    content = "\n==================== Output ====================\n"
    content += content_format.format("Variable", "Value")
    content += content_format.format("-" * 16, "-" * 29)

    for key, value in info_mapping.items():
        if isinstance(value, (tuple, collections.deque)):
            continue
        elif isinstance(value, (dict, list)):
            try:
                value = json.dumps(value)
            except (TypeError, ValueError):
                # content that JSON can not represent (e.g. bytes, sets,
                # circular references): fall back to the plain repr
                value = str(value)
        elif value is None:
            value = "None"

        try:
            content += content_format.format(key, value)
        except TypeError:
            # types without their own __format__ (bytes, set, arbitrary
            # objects, None keys) reject the "<" alignment spec; render
            # them through str() rather than aborting the whole report
            content += content_format.format(str(key), str(value))

    content += "-" * 48 + "\n"
    logger.info(content)
def gen_cartesian_product(*args):
    """ combine several lists of dicts into their cartesian product

    Args:
        args (list of list): lists whose items are dicts; one item of every
            list is merged into each result dict

    Returns:
        list: merged dicts, one per combination. Without arguments an empty
            list is returned; with a single argument that argument is
            returned as is.

    Examples:
        >>> arg1 = [{"a": 1}, {"a": 2}]
        >>> arg2 = [{"x": 111, "y": 112}, {"x": 121, "y": 122}]
        >>> gen_cartesian_product(arg1, arg2)
        [
            {'a': 1, 'x': 111, 'y': 112},
            {'a': 1, 'x': 121, 'y': 122},
            {'a': 2, 'x': 111, 'y': 112},
            {'a': 2, 'x': 121, 'y': 122}
        ]

    """
    if len(args) == 0:
        return []

    if len(args) == 1:
        return args[0]

    merged_rows = []
    for combination in itertools.product(*args):
        row = {}
        # later lists override keys coming from earlier ones
        for fragment in combination:
            row.update(fragment)
        merged_rows.append(row)

    return merged_rows
def omit_long_data(body, omit_len=512):
    """ shorten str/bytes that exceed omit_len

    Args:
        body: data to shorten; anything that is not str/bytes is returned
            unchanged
        omit_len (int): number of leading characters/bytes to keep

    Returns:
        body itself when it is short enough, otherwise its first omit_len
        items followed by a note telling how many were dropped.

    """
    if not isinstance(body, (str, bytes)):
        return body

    overflow = len(body) - omit_len
    if overflow <= 0:
        return body

    suffix = f" ... OMITTED {overflow} CHARACTORS ..."
    if isinstance(body, bytes):
        # keep the result type equal to the input type
        suffix = suffix.encode("utf-8")

    return body[:omit_len] + suffix
def dump_json_file(json_data: Union[dict, list], json_file_abs_path: str) -> None:
    """ dump json data to file

    Missing parent folders are created. Objects that JSON can not represent
    are written as their str() form. A TypeError raised while dumping is
    logged, not propagated.

    Args:
        json_data (dict/list): data to serialize
        json_file_abs_path (str): path of the file to write

    """
    class PythonObjectEncoder(json.JSONEncoder):
        """ encoder that falls back to str() for unsupported objects """

        def default(self, obj):
            try:
                # super() is already bound: passing self again would raise
                # TypeError on every call and hide the real base behaviour
                return super().default(obj)
            except TypeError:
                return str(obj)

    file_folder_path = os.path.dirname(json_file_abs_path)
    # dirname is "" for a bare file name, and os.makedirs("") fails;
    # exist_ok avoids the check-then-create race of a separate isdir() test
    if file_folder_path:
        os.makedirs(file_folder_path, exist_ok=True)

    try:
        with io.open(json_file_abs_path, 'w', encoding='utf-8') as outfile:
            json.dump(
                json_data,
                outfile,
                indent=4,
                separators=(',', ':'),
                ensure_ascii=False,
                cls=PythonObjectEncoder
            )

        msg = f"dump file: {json_file_abs_path}"
        logger.info(msg)

    except TypeError as ex:
        msg = f"Failed to dump json file: {json_file_abs_path}\nReason: {ex}"
        logger.error(msg)
def prepare_log_file_abs_path(test_path: str, file_name: str) -> str:
    """ build the path of a dump file below "<current working dir>/logs".

    Args:
        test_path (str): path of the testcase file or folder being run;
            empty when tests were passed in as a data structure
        file_name (str): suffix of the dump file name

    Returns:
        str: - logs/tests_mapping.<file_name> when test_path is empty
             - logs/<test_path>/all.<file_name> when test_path is a folder
             - logs/<folder of test_path>/<test file stem>.<file_name> otherwise

    """
    logs_dir_path = os.path.join(os.getcwd(), "logs")

    if not test_path:
        # running passed in testcase/testsuite data structure
        return os.path.join(logs_dir_path, f"tests_mapping.{file_name}")

    # NOTE(review): os.path.join drops logs_dir_path when the joined part is
    # absolute, so an absolute test_path ends up outside the logs folder;
    # presumably callers pass a relative test_path - confirm.
    if os.path.isdir(test_path):
        target_dir = os.path.join(logs_dir_path, test_path)
        target_name = f"all.{file_name}"
    else:
        parent_dir, test_file = os.path.split(test_path)
        target_dir = os.path.join(logs_dir_path, parent_dir)
        stem = os.path.splitext(test_file)[0]
        target_name = f"{stem}.{file_name}"

    return os.path.join(target_dir, target_name)