You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
440 lines
16 KiB
440 lines
16 KiB
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
"""Converts a mitmproxy dump file or a HAR archive to a swagger schema."""
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import traceback
|
|
import urllib
|
|
from typing import Any, Optional, Sequence, Union
|
|
|
|
import msgpack
|
|
import ruamel.yaml
|
|
from mitmproxy.exceptions import FlowReadException
|
|
|
|
from mitmproxy2swagger import console_util, swagger_util
|
|
from mitmproxy2swagger.har_capture_reader import HarCaptureReader, har_archive_heuristic
|
|
from mitmproxy2swagger.mitmproxy_capture_reader import (
|
|
MitmproxyCaptureReader,
|
|
mitmproxy_dump_file_huristic,
|
|
)
|
|
|
|
|
|
def path_to_regex(path):
    """Translate a path template into an anchored regular expression string.

    ``{name}`` placeholders match exactly one path segment (``[^/]+``) and
    ``*`` matches any run of characters (``.*``). Every other character is
    matched literally.

    A placeholder becomes a named group ``(?P<name>...)`` when ``name`` is a
    valid, not-yet-used group name. Otherwise (for example ``{user-id}``,
    ``{}`` or a repeated ``{id}``) it becomes an unnamed capturing group, so
    the returned pattern always compiles instead of raising ``re.error``.
    Braces that do not form a ``{...}`` pair are treated as literal text.

    Args:
        path: The path template, e.g. ``"/users/{id}/files/*"``.

    Returns:
        The regex source, wrapped in ``^`` and ``$``.
    """
    pieces = ["^"]
    used_names = set()
    cursor = 0
    for token in re.finditer(r"\{([^{}]*)\}|\*", path):
        # Literal text between two template tokens is escaped verbatim.
        pieces.append(re.escape(path[cursor : token.start()]))
        cursor = token.end()

        if token.group(0) == "*":
            pieces.append(".*")
            continue

        name = token.group(1)
        if name.isidentifier() and name not in used_names:
            used_names.add(name)
            pieces.append("(?P<" + name + ">[^/]+)")
        else:
            # Regex group names must be unique identifiers; fall back to an
            # anonymous group rather than emitting an invalid pattern.
            pieces.append("([^/]+)")

    pieces.append(re.escape(path[cursor:]))
    pieces.append("$")
    return "".join(pieces)
|
|
|
|
|
|
def strip_query_string(path):
    """Return *path* up to, but not including, the first ``?``."""
    before_query, _, _ = path.partition("?")
    return before_query
|
|
|
|
|
|
def set_key_if_not_exists(dict, key, value):
    """Store *value* under *key* in the mapping unless the key is already present.

    An existing entry is left untouched, even if its value is ``None``.
    """
    if key in dict:
        return
    dict[key] = value
|
|
|
|
|
|
def progress_callback(progress):
    """Forward *progress* to the console progress bar."""
    console_util.print_progress_bar(progress)
|
|
|
|
|
|
def detect_input_format(file_path):
    """Choose a capture reader for *file_path* by comparing format heuristics.

    The HAR reader is returned only when the HAR heuristic scores strictly
    higher than the mitmproxy one; otherwise the mitmproxy reader is used.
    Both scores are printed when the ``MITMPROXY2SWAGGER_DEBUG`` environment
    variable is set.
    """
    har_score = har_archive_heuristic(file_path)
    mitmproxy_score = mitmproxy_dump_file_huristic(file_path)

    if os.getenv("MITMPROXY2SWAGGER_DEBUG") is not None:
        print(f"har score: {har_score!s}")
        print(f"mitmproxy score: {mitmproxy_score!s}")

    reader_cls = (
        HarCaptureReader if har_score > mitmproxy_score else MitmproxyCaptureReader
    )
    return reader_cls(file_path, progress_callback)
|
|
|
|
|
|
def _build_arg_parser():
    """Create the command-line parser for the converter."""
    parser = argparse.ArgumentParser(
        description="Converts a mitmproxy dump file or HAR to a swagger schema."
    )
    parser.add_argument(
        "-i",
        "--input",
        help="The input mitmproxy dump file or HAR dump file (from DevTools)",
        required=True,
    )
    parser.add_argument(
        "-o",
        "--output",
        help="The output swagger schema file (yaml). If it exists, new endpoints will be added",
        required=True,
    )
    parser.add_argument("-p", "--api-prefix", help="The api prefix", required=True)
    parser.add_argument(
        "-e",
        "--examples",
        action="store_true",
        help="Include examples in the schema. This might expose sensitive information.",
    )
    parser.add_argument(
        "-hd",
        "--headers",
        action="store_true",
        help="Include headers in the schema. This might expose sensitive information.",
    )
    parser.add_argument(
        "-f",
        "--format",
        choices=["flow", "har"],
        help="Override the input file format auto-detection.",
    )
    parser.add_argument(
        "-r",
        "--param-regex",
        default="[0-9]+",
        help="Regex to match parameters in the API paths. Path segments that match this regex will be turned into parameter placeholders.",
    )
    parser.add_argument(
        "-s",
        "--suppress-params",
        action="store_true",
        help="Do not include API paths that have the original parameter values, only the ones with placeholders.",
    )
    return parser


def _decode_structured_body(raw):
    """Try to decode *raw* as JSON, then as msgpack.

    Returns:
        A ``(value, content_type)`` pair. ``value`` is ``None`` when neither
        decoder produced a non-``None`` result. ``content_type`` is the media
        type of the last decoder that ran without raising, or ``None`` if both
        raised.
    """
    value = None
    content_type = None
    try:
        value = json.loads(raw)
        content_type = "application/json"
    except (UnicodeDecodeError, json.decoder.JSONDecodeError):
        pass

    if value is None:
        try:
            value = msgpack.loads(raw)
            content_type = "application/msgpack"
        except Exception:
            # Deliberate best effort: any failure here simply means the
            # payload is not msgpack.
            pass
    return value, content_type


def _decode_form_body(raw):
    """Decode *raw* as ``application/x-www-form-urlencoded`` data.

    Accepts ``bytes`` or ``str``. Returns a ``dict`` mapping field names to
    values (both ``str``), or ``None`` when the body is not valid UTF-8 or
    contains no fields.
    """
    # The file only does ``import urllib``, which does not guarantee that the
    # ``urllib.parse`` submodule is loaded, so import it explicitly here.
    from urllib.parse import parse_qsl

    if isinstance(raw, (bytes, bytearray)):
        try:
            raw = raw.decode("utf-8")
        except UnicodeDecodeError:
            return None
    # Parsing text (rather than bytes) keeps percent-encoded non-ASCII values
    # from raising and never leaves a partially decoded result behind.
    fields = dict(parse_qsl(raw, encoding="utf-8", keep_blank_values=True))
    return fields or None


def _suggest_path_templates(paths, is_param, suppress_params):
    """Build the ``ignore:``-prefixed template entries for unmatched paths.

    For every path, segments accepted by *is_param* are replaced with
    ``{id}``, ``{id1}``, ``{id2}``, ... and the resulting template is emitted
    first. The literal path is emitted as well unless it contained parameters
    and *suppress_params* is set. Entries are unique and keep first-seen order.
    """
    entries = []
    seen = set()

    def add(entry):
        if entry not in seen:
            seen.add(entry)
            entries.append(entry)

    for path in paths:
        templated_segments = []
        param_count = 0
        for segment in path.split("/"):
            if is_param(segment):
                suffix = "" if param_count == 0 else str(param_count)
                templated_segments.append("{id" + suffix + "}")
                param_count += 1
            else:
                templated_segments.append(segment)

        if param_count:
            add("ignore:" + "/".join(templated_segments))
        if not param_count or not suppress_params:
            add("ignore:" + path)
    return entries


def main(override_args: Optional[Sequence[str]] = None):
    """Run the converter: read a capture and create or update a swagger file.

    Requests whose path matches a template already present in the output
    file (under ``paths`` or ``x-path-templates``) are turned into endpoint
    definitions. All other paths are appended to ``x-path-templates`` with an
    ``ignore:`` prefix so the user can enable them for a later run.

    Args:
        override_args: Argument list to parse instead of ``sys.argv[1:]``.

    Exits with status 1 if ``--param-regex`` is not a valid regular expression
    or if reading the capture raises ``FlowReadException`` or ``ValueError``.
    """
    args = _build_arg_parser().parse_args(override_args)

    try:
        # Compiled as given and applied with fullmatch() below. Prepending "^"
        # and appending "$" would mis-anchor patterns with a top-level "|".
        param_regex = re.compile(args.param_regex)
    except re.error as e:
        print(
            f"{console_util.ANSI_RED}Invalid path parameter regex: {e}{console_util.ANSI_RESET}"
        )
        sys.exit(1)

    yaml = ruamel.yaml.YAML()

    capture_reader: Union[MitmproxyCaptureReader, HarCaptureReader]
    if args.format in ("flow", "mitmproxy"):
        capture_reader = MitmproxyCaptureReader(args.input, progress_callback)
    elif args.format == "har":
        capture_reader = HarCaptureReader(args.input, progress_callback)
    else:
        capture_reader = detect_input_format(args.input)

    # Try loading the existing swagger file so new endpoints are merged in.
    swagger = None
    try:
        with open(args.output, "r") as f:
            swagger = yaml.load(f)
    except FileNotFoundError:
        print("No existing swagger file found. Creating new one.")
    if swagger is None:
        swagger = ruamel.yaml.comments.CommentedMap(
            {
                "openapi": "3.0.0",
                "info": {
                    "title": args.input + " Mitmproxy2Swagger",
                    "version": "1.0.0",
                },
            }
        )

    # Strip the trailing slash from the api prefix.
    args.api_prefix = args.api_prefix.rstrip("/")

    if "servers" not in swagger or swagger["servers"] is None:
        swagger["servers"] = []

    # Add the server if it doesn't exist.
    if not any(server["url"] == args.api_prefix for server in swagger["servers"]):
        swagger["servers"].append(
            {"url": args.api_prefix, "description": "The default server"}
        )

    if "paths" not in swagger or swagger["paths"] is None:
        swagger["paths"] = {}

    if "x-path-templates" not in swagger or swagger["x-path-templates"] is None:
        swagger["x-path-templates"] = []

    # Known templates: the generated paths first, then the x-path-templates
    # entries. The order matters because the first matching template wins.
    path_templates = list(swagger["paths"]) + list(swagger["x-path-templates"])
    path_template_regexes = [re.compile(path_to_regex(path)) for path in path_templates]

    # Paths that matched no template; they are offered back to the user below.
    unmatched_paths = set()

    try:
        for req in capture_reader.captured_requests():
            url = req.get_matching_url(args.api_prefix)
            if url is None:
                continue

            method = req.get_method().lower()
            path = strip_query_string(url).removeprefix(args.api_prefix)
            status = req.get_response_status_code()

            # Find the first template matching this path.
            path_template_index = None
            for i, path_template_regex in enumerate(path_template_regexes):
                if path_template_regex.match(path):
                    path_template_index = i
                    break
            if path_template_index is None:
                unmatched_paths.add(path)
                continue

            path_template_to_set = path_templates[path_template_index]
            set_key_if_not_exists(swagger["paths"], path_template_to_set, {})
            set_key_if_not_exists(
                swagger["paths"][path_template_to_set],
                method,
                {
                    "summary": swagger_util.path_template_to_endpoint_name(
                        method, path_template_to_set
                    ),
                    "responses": {},
                },
            )
            operation = swagger["paths"][path_template_to_set][method]

            # Collect header and URL parameters into ONE list before storing.
            # Storing them separately under the same key would let the headers
            # shadow the URL parameters whenever --headers is used.
            # NOTE(review): assumes both helpers return lists of parameter
            # objects - confirm against swagger_util.
            parameters = []
            if args.headers:
                headers_request = swagger_util.request_to_headers(
                    req.get_request_headers()
                )
                if headers_request is not None and len(headers_request) > 0:
                    parameters.extend(headers_request)
            params = swagger_util.url_to_params(url, path_template_to_set)
            if params is not None and len(params) > 0:
                parameters.extend(params)
            if parameters:
                set_key_if_not_exists(operation, "parameters", parameters)

            if method not in ["get", "head"]:
                body = req.get_request_body()
                if body is not None:
                    body_val, content_type = _decode_structured_body(body)

                    # Fall back to form data only if neither structured
                    # decoder accepted the payload.
                    if content_type is None:
                        body_val = _decode_form_body(body)
                        if body_val is not None:
                            content_type = "application/x-www-form-urlencoded"

                    if body_val is not None:
                        content_to_set = {
                            "content": {
                                content_type: {
                                    "schema": swagger_util.value_to_schema(body_val)
                                }
                            }
                        }
                        if args.examples:
                            content_to_set["content"][content_type]["example"] = (
                                swagger_util.limit_example_size(body_val)
                            )
                        set_key_if_not_exists(operation, "requestBody", content_to_set)

            response_body = req.get_response_body()
            if response_body is not None:
                response_parsed, response_content_type = _decode_structured_body(
                    response_body
                )
                if response_parsed is not None:
                    resp_data_to_set = {
                        "description": req.get_response_reason(),
                        "content": {
                            response_content_type: {
                                "schema": swagger_util.value_to_schema(response_parsed)
                            }
                        },
                    }
                    if args.examples:
                        resp_data_to_set["content"][response_content_type][
                            "example"
                        ] = swagger_util.limit_example_size(response_parsed)
                    if args.headers:
                        resp_data_to_set["headers"] = swagger_util.response_to_headers(
                            req.get_response_headers()
                        )

                    set_key_if_not_exists(
                        operation["responses"], str(status), resp_data_to_set
                    )

            if "responses" in operation and len(operation["responses"]) == 0:
                # Add a default response if there were no responses detected,
                # this is for compliance with the OpenAPI spec.
                operation["responses"]["200"] = {
                    "description": "OK",
                    "content": {},
                }

    except (FlowReadException, ValueError) as e:
        if isinstance(e, FlowReadException):
            print(f"Flow file corrupted: {e}")
        else:
            print(f"ValueError: {e}")
        traceback.print_exception(*sys.exc_info())
        if args.format:
            # Reset the colour here too, otherwise the terminal stays red.
            print(
                f"{console_util.ANSI_RED}Failed to parse the input file as '{capture_reader.name()}'. {console_util.ANSI_RESET}"
            )
        else:
            print(
                f"{console_util.ANSI_RED}Failed to parse the input file as '{capture_reader.name()}'. "
            )
            print(
                f"It might happen that the input format as incorrectly detected. Please try using '--format flow' or '--format har' to specify the input format.{console_util.ANSI_RESET}"
            )
        sys.exit(1)

    def is_param(param_value):
        return param_regex.fullmatch(param_value) is not None

    # Add suggested path templates: segments accepted by --param-regex are
    # replaced with placeholders, and every entry gets the "ignore:" prefix.
    new_path_templates_with_suggestions = _suggest_path_templates(
        sorted(unmatched_paths), is_param, args.suppress_params
    )

    # Append the new entries to swagger['x-path-templates'].
    for path in new_path_templates_with_suggestions:
        swagger["x-path-templates"].append(path)

    # Remove elements already generated, then remove duplicates while
    # preserving order (dict keys keep insertion order).
    remaining_templates = [
        path for path in swagger["x-path-templates"] if path not in swagger["paths"]
    ]
    remaining_templates = list(dict.fromkeys(remaining_templates))

    swagger["x-path-templates"] = ruamel.yaml.comments.CommentedSeq(
        remaining_templates
    )
    swagger["x-path-templates"].yaml_set_start_comment(
        "Remove the ignore: prefix to generate an endpoint with its URL\nLines that are closer to the top take precedence, the matching is greedy"
    )

    # Save the swagger file.
    with open(args.output, "w") as f:
        yaml.dump(swagger, f)
    print("Done!")
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the command-line entry point only when executed as a script.
    main()
|