add a binary to pypi

pull/1/head
alufers 3 years ago
parent a1093b8cb1
commit a9c5966109

Binary file not shown.

Binary file not shown.

@ -1,250 +0,0 @@
#!/usr/bin/env python
"""
Converts a mitmproxy dump file to a swagger schema.
"""
from email import header
from mitmproxy import io as iom, http
from mitmproxy.exceptions import FlowReadException
import pprint
import sys
import io
import json
import os
import argparse
import ruamel.yaml
import re
import swagger_util
parser = argparse.ArgumentParser(
description='Converts a mitmproxy dump file to a swagger schema.')
parser.add_argument(
'-i', '--input', help='The input mitmproxy dump file', required=True)
parser.add_argument(
'-o', '--output', help='The output swagger schema file (yaml). If it exists, new endpoints will be added', required=True)
parser.add_argument('-p', '--api-prefix', help='The api prefix', required=True)
parser.add_argument('-e', '--examples', action='store_true',
help='Include examples in the schema. This might expose sensitive information.')
args = parser.parse_args()
yaml = ruamel.yaml.YAML()
swagger = None
# try loading the existing swagger file
try:
with open(args.output, 'r') as f:
swagger = yaml.load(f)
except FileNotFoundError:
print("No existing swagger file found. Creating new one.")
pass
if swagger is None:
swagger = ruamel.yaml.comments.CommentedMap({
"openapi": "3.0.0",
"info": {
"title": args.input + " Mitmproxy2Swagger",
"version": "1.0.0"
},
})
# strip the trailing slash from the api prefix
args.api_prefix = args.api_prefix.rstrip('/')
if not 'servers' in swagger or swagger['servers'] is None:
swagger['servers'] = []
# add the server if it doesn't exist
if not any(server['url'] == args.api_prefix for server in swagger['servers']):
swagger['servers'].append({
"url": args.api_prefix,
"description": "The default server"
})
if not 'paths' in swagger or swagger['paths'] is None:
swagger['paths'] = {}
if 'x-path-templates' not in swagger or swagger['x-path-templates'] is None:
swagger['x-path-templates'] = []
path_templates = []
for path in swagger['paths']:
path_templates.append(path)
# also add paths from the the x-path-templates array
if 'x-path-templates' in swagger and swagger['x-path-templates'] is not None:
for path in swagger['x-path-templates']:
path_templates.append(path)
# new endpoints will be added here so that they can be added as comments in the swagger file
new_path_templates = []
def path_to_regex(path):
# replace the path template with a regex
path = path.replace('{', '(?P<')
path = path.replace('}', '>[^/]+)')
path = path.replace('*', '.*')
path = path.replace('/', '\/')
return "^" + path + "$"
def strip_query_string(path):
# remove the query string from the path
return path.split('?')[0]
def set_key_if_not_exists(dict, key, value):
if key not in dict:
dict[key] = value
path_template_regexes = [re.compile(path_to_regex(path))
for path in path_templates]
with open(args.input, 'rb') as logfile:
logfile_size = os.path.getsize(args.input)
freader = iom.FlowReader(logfile)
pp = pprint.PrettyPrinter(indent=4)
try:
for f in freader.stream():
sys.stdout.write("Progress {0:.2f}%%\r".format(
(logfile.tell() / logfile_size * 100)))
# print(f)
if isinstance(f, http.HTTPFlow):
if not f.request.url.startswith(args.api_prefix):
continue
# strip the api prefix from the url
url = f.request.url[len(args.api_prefix):]
method = f.request.method.lower()
path = strip_query_string(url)
if f.response is None:
print("[WARN] No response for " + url)
continue
status = f.response.status_code
# check if the path matches any of the path templates, and save the index
path_template_index = None
for i, path_template_regex in enumerate(path_template_regexes):
if path_template_regex.match(path):
path_template_index = i
break
if path_template_index is None:
if path in new_path_templates:
continue
new_path_templates.append(path)
continue
path_template_to_set = path_templates[path_template_index]
set_key_if_not_exists(
swagger['paths'], path_template_to_set, {})
set_key_if_not_exists(swagger['paths'][path_template_to_set], method, {
'summary': swagger_util.path_template_to_endpoint_name(method, path_template_to_set),
'responses': {}
})
params = swagger_util.url_to_params(url, path_template_to_set)
if params is not None and len(params) > 0:
set_key_if_not_exists(swagger['paths'][path_template_to_set][method], 'parameters', params)
if method not in ['get', 'head']:
body_val = None
# try to parse the body as json
try:
body_val = json.loads(f.request.text)
except json.decoder.JSONDecodeError:
pass
if body_val is not None:
content_to_set = {
'content': {
'application/json': {
'schema': swagger_util.value_to_schema(body_val)
}
}
}
if args.examples:
content_to_set['content']['application/json']['example'] = swagger_util.limit_example_size(
body_val)
set_key_if_not_exists(
swagger['paths'][path_template_to_set][method], 'requestBody', content_to_set)
# try parsing the response as json
try:
response_json = json.loads(f.response.text)
except json.decoder.JSONDecodeError:
response_json = None
if response_json is not None:
resp_data_to_set = {
'description': f.response.reason,
'content': {
'application/json': {
'schema': swagger_util.value_to_schema(response_json)
}
}
}
if args.examples:
resp_data_to_set['content']['application/json']['example'] = swagger_util.limit_example_size(
response_json)
set_key_if_not_exists(swagger['paths'][path_template_to_set][method]['responses'], str(
status), resp_data_to_set)
except FlowReadException as e:
print(f"Flow file corrupted: {e}")
new_path_templates.sort()
# add suggested path templates
# basically inspects urls and replaces segments containing only numbers with a parameter
new_path_templates_with_suggestions = []
for idx, path in enumerate(new_path_templates):
# check if path contains number-only segments
segments = path.split('/')
if any(segment.isdigit() for segment in segments):
# replace digit segments with {id}, {id1}, {id2} etc
new_segments = []
param_id = 0
for segment in segments:
if segment.isdigit():
param_name = 'id' + str(param_id)
if param_id == 0:
param_name = 'id'
new_segments.append('{' + param_name + '}')
param_id += 1
else:
new_segments.append(segment)
suggested_path = '/'.join(new_segments)
# prepend the suggested path to the new_path_templates list
if suggested_path not in new_path_templates_with_suggestions:
new_path_templates_with_suggestions.append(
"ignore:" + suggested_path)
new_path_templates_with_suggestions.append("ignore:" + path)
# remove the ending comments not to add them twice
# append the contents of new_path_templates_with_suggestions to swagger['x-path-templates']
for path in new_path_templates_with_suggestions:
swagger['x-path-templates'].append(path)
# remove elements already generated
swagger['x-path-templates'] = [
path for path in swagger['x-path-templates'] if path not in swagger['paths']]
# remove duplicates while preserving order
def f7(seq):
seen = set()
seen_add = seen.add
return [x for x in seq if not (x in seen or seen_add(x))]
swagger['x-path-templates'] = f7(swagger['x-path-templates'])
swagger['x-path-templates'] = ruamel.yaml.comments.CommentedSeq(
swagger['x-path-templates'])
swagger['x-path-templates'].yaml_set_start_comment(
'Remove the ignore: prefix to generate an endpoint with its URL\nLines that are closer to the top take precedence, the matching is greedy')
# save the swagger file
with open(args.output, 'w') as f:
yaml.dump(swagger, f)
# f.write(
# " # Uncomment any of the following lines to generate the corresponding endpoint\n")
# f.write(
# " # Lines that are closer to the top take precedence, the matching is greedy\n")
# f.write("\n")
# for path in new_path_templates_with_suggestions:
# f.write(" #- " + path + "\n")
print("Done!")

@ -0,0 +1,250 @@
#!/usr/bin/env python
"""
Converts a mitmproxy dump file to a swagger schema.
"""
from email import header
from mitmproxy import io as iom, http
from mitmproxy.exceptions import FlowReadException
import pprint
import sys
import io
import json
import os
import argparse
import ruamel.yaml
import re
import mitmproxy2swagger.swagger_util.swagger_util as swagger_util
def path_to_regex(path):
# replace the path template with a regex
path = path.replace('{', '(?P<')
path = path.replace('}', '>[^/]+)')
path = path.replace('*', '.*')
path = path.replace('/', '\/')
return "^" + path + "$"
def strip_query_string(path):
# remove the query string from the path
return path.split('?')[0]
def set_key_if_not_exists(dict, key, value):
if key not in dict:
dict[key] = value
def main():
parser = argparse.ArgumentParser(
description='Converts a mitmproxy dump file to a swagger schema.')
parser.add_argument(
'-i', '--input', help='The input mitmproxy dump file', required=True)
parser.add_argument(
'-o', '--output', help='The output swagger schema file (yaml). If it exists, new endpoints will be added', required=True)
parser.add_argument('-p', '--api-prefix', help='The api prefix', required=True)
parser.add_argument('-e', '--examples', action='store_true',
help='Include examples in the schema. This might expose sensitive information.')
args = parser.parse_args()
yaml = ruamel.yaml.YAML()
swagger = None
# try loading the existing swagger file
try:
with open(args.output, 'r') as f:
swagger = yaml.load(f)
except FileNotFoundError:
print("No existing swagger file found. Creating new one.")
pass
if swagger is None:
swagger = ruamel.yaml.comments.CommentedMap({
"openapi": "3.0.0",
"info": {
"title": args.input + " Mitmproxy2Swagger",
"version": "1.0.0"
},
})
# strip the trailing slash from the api prefix
args.api_prefix = args.api_prefix.rstrip('/')
if not 'servers' in swagger or swagger['servers'] is None:
swagger['servers'] = []
# add the server if it doesn't exist
if not any(server['url'] == args.api_prefix for server in swagger['servers']):
swagger['servers'].append({
"url": args.api_prefix,
"description": "The default server"
})
if not 'paths' in swagger or swagger['paths'] is None:
swagger['paths'] = {}
if 'x-path-templates' not in swagger or swagger['x-path-templates'] is None:
swagger['x-path-templates'] = []
path_templates = []
for path in swagger['paths']:
path_templates.append(path)
# also add paths from the the x-path-templates array
if 'x-path-templates' in swagger and swagger['x-path-templates'] is not None:
for path in swagger['x-path-templates']:
path_templates.append(path)
# new endpoints will be added here so that they can be added as comments in the swagger file
new_path_templates = []
path_template_regexes = [re.compile(path_to_regex(path))
for path in path_templates]
with open(args.input, 'rb') as logfile:
logfile_size = os.path.getsize(args.input)
freader = iom.FlowReader(logfile)
pp = pprint.PrettyPrinter(indent=4)
try:
for f in freader.stream():
sys.stdout.write("Progress {0:.2f}%%\r".format(
(logfile.tell() / logfile_size * 100)))
# print(f)
if isinstance(f, http.HTTPFlow):
if not f.request.url.startswith(args.api_prefix):
continue
# strip the api prefix from the url
url = f.request.url[len(args.api_prefix):]
method = f.request.method.lower()
path = strip_query_string(url)
if f.response is None:
print("[WARN] No response for " + url)
continue
status = f.response.status_code
# check if the path matches any of the path templates, and save the index
path_template_index = None
for i, path_template_regex in enumerate(path_template_regexes):
if path_template_regex.match(path):
path_template_index = i
break
if path_template_index is None:
if path in new_path_templates:
continue
new_path_templates.append(path)
continue
path_template_to_set = path_templates[path_template_index]
set_key_if_not_exists(
swagger['paths'], path_template_to_set, {})
set_key_if_not_exists(swagger['paths'][path_template_to_set], method, {
'summary': swagger_util.path_template_to_endpoint_name(method, path_template_to_set),
'responses': {}
})
params = swagger_util.url_to_params(url, path_template_to_set)
if params is not None and len(params) > 0:
set_key_if_not_exists(swagger['paths'][path_template_to_set][method], 'parameters', params)
if method not in ['get', 'head']:
body_val = None
# try to parse the body as json
try:
body_val = json.loads(f.request.text)
except json.decoder.JSONDecodeError:
pass
if body_val is not None:
content_to_set = {
'content': {
'application/json': {
'schema': swagger_util.value_to_schema(body_val)
}
}
}
if args.examples:
content_to_set['content']['application/json']['example'] = swagger_util.limit_example_size(
body_val)
set_key_if_not_exists(
swagger['paths'][path_template_to_set][method], 'requestBody', content_to_set)
# try parsing the response as json
try:
response_json = json.loads(f.response.text)
except json.decoder.JSONDecodeError:
response_json = None
if response_json is not None:
resp_data_to_set = {
'description': f.response.reason,
'content': {
'application/json': {
'schema': swagger_util.value_to_schema(response_json)
}
}
}
if args.examples:
resp_data_to_set['content']['application/json']['example'] = swagger_util.limit_example_size(
response_json)
set_key_if_not_exists(swagger['paths'][path_template_to_set][method]['responses'], str(
status), resp_data_to_set)
except FlowReadException as e:
print(f"Flow file corrupted: {e}")
new_path_templates.sort()
# add suggested path templates
# basically inspects urls and replaces segments containing only numbers with a parameter
new_path_templates_with_suggestions = []
for idx, path in enumerate(new_path_templates):
# check if path contains number-only segments
segments = path.split('/')
if any(segment.isdigit() for segment in segments):
# replace digit segments with {id}, {id1}, {id2} etc
new_segments = []
param_id = 0
for segment in segments:
if segment.isdigit():
param_name = 'id' + str(param_id)
if param_id == 0:
param_name = 'id'
new_segments.append('{' + param_name + '}')
param_id += 1
else:
new_segments.append(segment)
suggested_path = '/'.join(new_segments)
# prepend the suggested path to the new_path_templates list
if suggested_path not in new_path_templates_with_suggestions:
new_path_templates_with_suggestions.append(
"ignore:" + suggested_path)
new_path_templates_with_suggestions.append("ignore:" + path)
# remove the ending comments not to add them twice
# append the contents of new_path_templates_with_suggestions to swagger['x-path-templates']
for path in new_path_templates_with_suggestions:
swagger['x-path-templates'].append(path)
# remove elements already generated
swagger['x-path-templates'] = [
path for path in swagger['x-path-templates'] if path not in swagger['paths']]
# remove duplicates while preserving order
def f7(seq):
seen = set()
seen_add = seen.add
return [x for x in seq if not (x in seen or seen_add(x))]
swagger['x-path-templates'] = f7(swagger['x-path-templates'])
swagger['x-path-templates'] = ruamel.yaml.comments.CommentedSeq(
swagger['x-path-templates'])
swagger['x-path-templates'].yaml_set_start_comment(
'Remove the ignore: prefix to generate an endpoint with its URL\nLines that are closer to the top take precedence, the matching is greedy')
# save the swagger file
with open(args.output, 'w') as f:
yaml.dump(swagger, f)
print("Done!")
if __name__ == "__main__":
main()

@ -1,6 +1,6 @@
[tool.poetry]
name = "mitmproxy2swagger"
version = "0.1.0"
version = "0.2.0"
description = ""
authors = ["alufers <alufers@wp.pl>"]
@ -14,3 +14,6 @@ mitmproxy = "^8.0.0"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.poetry.scripts]
mitmproxy2swagger = 'mitmproxy2swagger.mitmproxy2swagger:main'

Loading…
Cancel
Save