You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
mitmproxy2swagger/mitmproxy2swagger/mitmproxy2swagger.py

440 lines
16 KiB

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Converts a mitmproxy dump file to a swagger schema."""
import argparse
import json
import os
import re
import sys
import traceback
import urllib
from typing import Any, Optional, Sequence, Union
import msgpack
import ruamel.yaml
from mitmproxy.exceptions import FlowReadException
from mitmproxy2swagger import console_util, swagger_util
from mitmproxy2swagger.har_capture_reader import HarCaptureReader, har_archive_heuristic
from mitmproxy2swagger.mitmproxy_capture_reader import (
MitmproxyCaptureReader,
mitmproxy_dump_file_huristic,
)
def path_to_regex(path):
    """Translate a path template into an anchored regular-expression string.

    The template is regex-escaped first, then the escaped template markers
    are swapped for regex fragments:

    * ``{name}`` becomes a named group matching one non-empty path segment
      (``(?P<name>[^/]+)``).
    * ``*`` becomes ``.*``.

    The returned pattern is wrapped in ``^`` ... ``$``.
    """
    # (escaped marker, regex fragment) pairs, applied in this order.
    substitutions = (
        (r"\{", "(?P<"),
        (r"\}", ">[^/]+)"),
        (r"\*", ".*"),
    )
    pattern = re.escape(path)
    for escaped_marker, fragment in substitutions:
        pattern = pattern.replace(escaped_marker, fragment)
    return f"^{pattern}$"
def strip_query_string(path):
    """Return the part of *path* that precedes the first ``?``.

    A path without a ``?`` is returned unchanged.
    """
    before_query, _, _ = path.partition("?")
    return before_query
def set_key_if_not_exists(dict, key, value):
    """Store *value* under *key* in the mapping only when *key* is absent.

    An existing entry is left untouched, even if its value is ``None``.
    The mapping is modified in place and nothing is returned.
    """
    if key in dict:
        return
    dict[key] = value
def progress_callback(progress) -> None:
    """Forward *progress* to the console progress bar.

    This is the callback handed to the capture readers created in this module.
    """
    console_util.print_progress_bar(progress)
def detect_input_format(file_path):
    """Pick a capture reader for *file_path* by comparing format heuristics.

    Both the HAR and the mitmproxy heuristics score the file. A HAR reader is
    returned only when the HAR score is strictly greater; otherwise (including
    ties) a mitmproxy reader is returned. When the ``MITMPROXY2SWAGGER_DEBUG``
    environment variable is set, both scores are printed.
    """
    # Insertion order matters: it is also the order of the debug output.
    scores = {
        "har": har_archive_heuristic(file_path),
        "mitmproxy": mitmproxy_dump_file_huristic(file_path),
    }
    if "MITMPROXY2SWAGGER_DEBUG" in os.environ:
        for label, score in scores.items():
            print(f"{label} score: {score}")
    reader_cls = (
        HarCaptureReader
        if scores["har"] > scores["mitmproxy"]
        else MitmproxyCaptureReader
    )
    return reader_cls(file_path, progress_callback)
def _build_arg_parser():
    """Create the command-line parser used by ``main``."""
    parser = argparse.ArgumentParser(
        description="Converts a mitmproxy dump file or HAR to a swagger schema."
    )
    parser.add_argument(
        "-i",
        "--input",
        help="The input mitmproxy dump file or HAR dump file (from DevTools)",
        required=True,
    )
    parser.add_argument(
        "-o",
        "--output",
        help="The output swagger schema file (yaml). If it exists, new endpoints will be added",
        required=True,
    )
    parser.add_argument("-p", "--api-prefix", help="The api prefix", required=True)
    parser.add_argument(
        "-e",
        "--examples",
        action="store_true",
        help="Include examples in the schema. This might expose sensitive information.",
    )
    parser.add_argument(
        "-hd",
        "--headers",
        action="store_true",
        help="Include headers in the schema. This might expose sensitive information.",
    )
    parser.add_argument(
        "-f",
        "--format",
        choices=["flow", "har"],
        help="Override the input file format auto-detection.",
    )
    parser.add_argument(
        "-r",
        "--param-regex",
        default="[0-9]+",
        help="Regex to match parameters in the API paths. Path segments that match this regex will be turned into parameter placeholders.",
    )
    parser.add_argument(
        "-s",
        "--suppress-params",
        action="store_true",
        help="Do not include API paths that have the original parameter values, only the ones with placeholders.",
    )
    return parser


def _to_text(value):
    """Return *value* as ``str``, decoding it as UTF-8 when it is ``bytes``."""
    if isinstance(value, bytes):
        return value.decode("utf-8")
    return value


def _parse_request_body(body):
    """Parse a request body as JSON, then msgpack, then URL-encoded form data.

    Returns a ``(value, content_type)`` tuple. ``value`` is ``None`` when the
    body could not be interpreted (or decoded to a null value).
    """
    body_val = None
    content_type = None
    try:
        body_val = json.loads(body)
        content_type = "application/json"
    except (UnicodeDecodeError, json.decoder.JSONDecodeError):
        pass

    if body_val is None:
        try:
            body_val = msgpack.loads(body)
            content_type = "application/msgpack"
        except Exception:
            # Best effort: any failure here just means "not msgpack".
            pass

    if content_type is None:
        try:
            pairs = urllib.parse.parse_qsl(
                body, encoding="utf-8", keep_blank_values=True
            )
            # parse_qsl yields bytes pairs for a bytes body and str pairs for
            # a str body; _to_text accepts both. Later duplicates win.
            form = {_to_text(key): _to_text(value) for key, value in pairs}
        except UnicodeDecodeError:
            # Discard partially decoded data instead of emitting a schema
            # without a content type.
            form = None
        if form:
            body_val = form
            content_type = "application/x-www-form-urlencoded"
        else:
            body_val = None
    return body_val, content_type


def _parse_response_body(response_body):
    """Parse a response body as JSON, falling back to msgpack.

    Returns a ``(value, content_type)`` tuple; ``value`` is ``None`` when
    neither format applies.
    """
    parsed = None
    content_type = None
    try:
        parsed = json.loads(response_body)
        content_type = "application/json"
    except (UnicodeDecodeError, json.decoder.JSONDecodeError):
        pass
    if parsed is None:
        try:
            parsed = msgpack.loads(response_body)
            content_type = "application/msgpack"
        except Exception:
            # Best effort: any failure here just means "not msgpack".
            parsed = None
    return parsed, content_type


def _suggest_path_templates(paths, is_param, suppress_params):
    """Build the ``ignore:``-prefixed template suggestions for unmatched paths.

    For every path with at least one segment accepted by *is_param*, a
    templated variant is suggested in which those segments are replaced with
    ``{id}``, ``{id1}``, ``{id2}``, ... The literal path is added as well,
    unless it has parameters and *suppress_params* is true.
    """
    suggestions = []
    for path in paths:
        segments = path.split("/")
        has_param = any(is_param(segment) for segment in segments)
        if has_param:
            new_segments = []
            param_id = 0
            for segment in segments:
                if is_param(segment):
                    param_name = "id" if param_id == 0 else "id" + str(param_id)
                    new_segments.append("{" + param_name + "}")
                    param_id += 1
                else:
                    new_segments.append(segment)
            suggestion = "ignore:" + "/".join(new_segments)
            # Compare the prefixed form, since that is what the list stores.
            if suggestion not in suggestions:
                suggestions.append(suggestion)
        if not has_param or not suppress_params:
            suggestions.append("ignore:" + path)
    return suggestions


def _exit_with_parse_failure(exc, capture_reader, format_overridden):
    """Print the traceback and a red failure message, then exit with status 1."""
    traceback.print_exception(type(exc), exc, exc.__traceback__)
    print(
        f"{console_util.ANSI_RED}Failed to parse the input file as '{capture_reader.name()}'. "
    )
    if not format_overridden:
        print(
            f"It might happen that the input format as incorrectly detected. Please try using '--format flow' or '--format har' to specify the input format.{console_util.ANSI_RESET}"
        )
    else:
        # Always restore the terminal colour, even when no hint is printed.
        print(console_util.ANSI_RESET, end="")
    sys.exit(1)


def main(override_args: Optional[Sequence[str]] = None) -> None:
    """Convert a captured traffic file into (or merge it into) a swagger file.

    Args:
        override_args: Command-line arguments to parse instead of
            ``sys.argv[1:]``; ``None`` uses the real command line.

    The existing output file, if any, is loaded and extended. Requests whose
    path matches a known path template get their operation, parameters,
    request body and response recorded (existing entries are never
    overwritten). Paths that match no template are written to
    ``x-path-templates`` as ``ignore:``-prefixed suggestions.

    Exits with status 1 when the parameter regex is invalid or when the input
    file cannot be parsed.
    """
    args = _build_arg_parser().parse_args(override_args)

    try:
        # fullmatch is used later instead of wrapping the pattern in "^...$",
        # which would mis-anchor patterns containing a top-level "|".
        param_pattern = re.compile(args.param_regex)
    except re.error as e:
        print(
            f"{console_util.ANSI_RED}Invalid path parameter regex: {e}{console_util.ANSI_RESET}"
        )
        sys.exit(1)

    def is_param(param_value):
        return param_pattern.fullmatch(param_value) is not None

    yaml = ruamel.yaml.YAML()

    capture_reader: Union[MitmproxyCaptureReader, HarCaptureReader]
    if args.format in ("flow", "mitmproxy"):
        capture_reader = MitmproxyCaptureReader(args.input, progress_callback)
    elif args.format == "har":
        capture_reader = HarCaptureReader(args.input, progress_callback)
    else:
        capture_reader = detect_input_format(args.input)

    # try loading the existing swagger file
    swagger = None
    try:
        abs_path = os.path.join(os.getcwd(), args.output)
        with open(abs_path, "r") as f:
            swagger = yaml.load(f)
    except FileNotFoundError:
        print("No existing swagger file found. Creating new one.")
    if swagger is None:
        swagger = ruamel.yaml.comments.CommentedMap(
            {
                "openapi": "3.0.0",
                "info": {
                    "title": args.input + " Mitmproxy2Swagger",
                    "version": "1.0.0",
                },
            }
        )

    # strip the trailing slash from the api prefix
    args.api_prefix = args.api_prefix.rstrip("/")

    if "servers" not in swagger or swagger["servers"] is None:
        swagger["servers"] = []
    # add the server if it doesn't exist
    if not any(server["url"] == args.api_prefix for server in swagger["servers"]):
        swagger["servers"].append(
            {"url": args.api_prefix, "description": "The default server"}
        )

    if "paths" not in swagger or swagger["paths"] is None:
        swagger["paths"] = {}
    if "x-path-templates" not in swagger or swagger["x-path-templates"] is None:
        swagger["x-path-templates"] = []

    # Known templates: generated paths first, then the x-path-templates list.
    # Earlier entries take precedence when several match.
    path_templates = list(swagger["paths"])
    path_templates.extend(swagger["x-path-templates"])
    path_template_regexes = [
        re.compile(path_to_regex(template)) for template in path_templates
    ]

    # Paths that matched no template; reported as suggestions afterwards.
    unmatched_paths = set()

    try:
        for req in capture_reader.captured_requests():
            # skip requests that do not belong to the api prefix
            url = req.get_matching_url(args.api_prefix)
            if url is None:
                continue
            method = req.get_method().lower()
            path = strip_query_string(url).removeprefix(args.api_prefix)
            status = req.get_response_status_code()

            # first matching template wins
            path_template_to_set = next(
                (
                    template
                    for template, regex in zip(path_templates, path_template_regexes)
                    if regex.match(path)
                ),
                None,
            )
            if path_template_to_set is None:
                unmatched_paths.add(path)
                continue

            set_key_if_not_exists(swagger["paths"], path_template_to_set, {})
            set_key_if_not_exists(
                swagger["paths"][path_template_to_set],
                method,
                {
                    "summary": swagger_util.path_template_to_endpoint_name(
                        method, path_template_to_set
                    ),
                    "responses": {},
                },
            )
            operation = swagger["paths"][path_template_to_set][method]

            # Merge header and URL parameters into ONE list before storing:
            # set_key_if_not_exists only writes once, so storing them
            # separately would silently drop whichever came second.
            # NOTE(review): assumes both helpers return sequences of parameter
            # objects (as the OpenAPI "parameters" array requires) - confirm
            # against swagger_util.
            params = swagger_util.url_to_params(url, path_template_to_set)
            parameters = []
            if args.headers:
                headers_request = swagger_util.request_to_headers(
                    req.get_request_headers()
                )
                if headers_request:
                    parameters.extend(headers_request)
            if params:
                parameters.extend(params)
            if parameters:
                set_key_if_not_exists(operation, "parameters", parameters)

            if method not in ["get", "head"]:
                body = req.get_request_body()
                if body is not None:
                    body_val, content_type = _parse_request_body(body)
                    if body_val is not None:
                        content_to_set = {
                            "content": {
                                content_type: {
                                    "schema": swagger_util.value_to_schema(body_val)
                                }
                            }
                        }
                        if args.examples:
                            content_to_set["content"][content_type]["example"] = (
                                swagger_util.limit_example_size(body_val)
                            )
                        set_key_if_not_exists(operation, "requestBody", content_to_set)

            response_body = req.get_response_body()
            if response_body is not None:
                response_parsed, response_content_type = _parse_response_body(
                    response_body
                )
                if response_parsed is not None:
                    resp_data_to_set = {
                        "description": req.get_response_reason(),
                        "content": {
                            response_content_type: {
                                "schema": swagger_util.value_to_schema(response_parsed)
                            }
                        },
                    }
                    if args.examples:
                        resp_data_to_set["content"][response_content_type][
                            "example"
                        ] = swagger_util.limit_example_size(response_parsed)
                    if args.headers:
                        resp_data_to_set["headers"] = swagger_util.response_to_headers(
                            req.get_response_headers()
                        )
                    set_key_if_not_exists(
                        operation["responses"], str(status), resp_data_to_set
                    )

            if "responses" in operation and len(operation["responses"]) == 0:
                # add a default response if there were no responses detected,
                # this is for compliance with the OpenAPI spec
                operation["responses"]["200"] = {
                    "description": "OK",
                    "content": {},
                }
    except FlowReadException as e:
        print(f"Flow file corrupted: {e}")
        _exit_with_parse_failure(e, capture_reader, bool(args.format))
    except ValueError as e:
        print(f"ValueError: {e}")
        _exit_with_parse_failure(e, capture_reader, bool(args.format))

    # add suggested path templates
    # basically inspects urls and replaces parameter-like segments with a placeholder
    suggestions = _suggest_path_templates(
        sorted(unmatched_paths), is_param, args.suppress_params
    )

    # Combine old and new templates, drop the ones that are already generated
    # paths, and remove duplicates while preserving order.
    candidates = list(swagger["x-path-templates"]) + suggestions
    remaining = [path for path in candidates if path not in swagger["paths"]]
    swagger["x-path-templates"] = ruamel.yaml.comments.CommentedSeq(
        list(dict.fromkeys(remaining))
    )
    swagger["x-path-templates"].yaml_set_start_comment(
        "Remove the ignore: prefix to generate an endpoint with its URL\nLines that are closer to the top take precedence, the matching is greedy"
    )

    # save the swagger file
    with open(args.output, "w") as f:
        yaml.dump(swagger, f)
    print("Done!")
if __name__ == "__main__":
    # Run the converter only when executed as a script, not when imported.
    main()