Fix code formatting and minor bugs

pull/8/head
kpcyrd 2 years ago
parent 90130fb524
commit e31a962b32

@ -0,0 +1,2 @@
[flake8]
ignore = E501,E222

1
.gitignore vendored

@ -5,3 +5,4 @@ lisek.swagger.yml
dist dist
dupsko.yaml dupsko.yaml
dist dist
/.mypy_cache/

@ -0,0 +1,4 @@
[mypy]
[mypy-json_stream.*]
ignore_missing_imports = True

@ -5,51 +5,38 @@ ANSI_RGB_BG = "\033[48;2;{};{};{}m"
ANSI_RESET = "\033[0m" ANSI_RESET = "\033[0m"
RAINBOW_COLORS = [ RAINBOW_COLORS = [
(255, 0, 0), (255, 0, 0),
(255, 127, 0), (255, 127, 0),
(255, 255, 0), (255, 255, 0),
(127, 255, 0), (127, 255, 0),
(0, 255, 0), (0, 255, 0),
(0, 255, 127), (0, 255, 127),
(0, 255, 255), (0, 255, 255),
(0, 127, 255), (0, 127, 255),
(0, 0, 255), (0, 0, 255),
(127, 0, 255), (127, 0, 255),
(255, 0, 255), (255, 0, 255),
(255, 0, 127), (255, 0, 127),
] ]
def rgb_interpolate(start, end, progress): def rgb_interpolate(start, end, progress):
return tuple(int(start[i] + (end[i] - start[i]) * progress) for i in range(3)) return tuple(int(start[i] + (end[i] - start[i]) * progress) for i in range(3))
# take a value from 0 to 1 and return an interpolated color from the rainbow # take a value from 0 to 1 and return an interpolated color from the rainbow
def rainbow_at_position(progress): def rainbow_at_position(progress):
idx_a = int(progress * float(len(RAINBOW_COLORS) - 1)) idx_a = int(progress * float(len(RAINBOW_COLORS) - 1))
idx_b = idx_a + 1 idx_b = idx_a + 1
return rgb_interpolate(RAINBOW_COLORS[idx_a], RAINBOW_COLORS[idx_b], progress * float(len(RAINBOW_COLORS) - 1) - idx_a) return rgb_interpolate(RAINBOW_COLORS[idx_a], RAINBOW_COLORS[idx_b], progress * float(len(RAINBOW_COLORS) - 1) - idx_a)
def print_progress_bar(progress = 0.0):
def print_progress_bar(progress=0.0):
sys.stdout.write("\r") sys.stdout.write("\r")
progress_bar_contents = "" progress_bar_contents = ""
PROGRESS_LENGTH = 30 PROGRESS_LENGTH = 30
full_block = '' blocks = ['', '', '', '', '', '', '']
blocks = [ '', '', '', '', '', '', '']
block_values = [0.875, 0.75, 0.625, 0.5, 0.375, 0.25, 0.125]
rainbow_colors = [
(255, 0, 0),
(255, 127, 0),
(255, 255, 0),
(127, 255, 0),
(0, 255, 0),
(0, 255, 127),
(0, 255, 255),
(0, 127, 255),
(0, 0, 255),
(127, 0, 255),
(255, 0, 255),
(255, 0, 127),
]
for i in range(PROGRESS_LENGTH): for i in range(PROGRESS_LENGTH):
interpolated = rainbow_at_position(i / PROGRESS_LENGTH) interpolated = rainbow_at_position(i / PROGRESS_LENGTH)
# check if should print a full block # check if should print a full block
@ -67,6 +54,7 @@ def print_progress_bar(progress = 0.0):
else: else:
progress_bar_contents += ANSI_RESET progress_bar_contents += ANSI_RESET
progress_bar_contents += ' ' progress_bar_contents += ' '
progress_bar_contents += ANSI_RESET progress_bar_contents += ANSI_RESET
sys.stdout.write("[{}] {:.1f}%".format(progress_bar_contents, progress * 100)) sys.stdout.write("[{}] {:.1f}%".format(progress_bar_contents, progress * 100))
sys.stdout.flush() sys.stdout.flush()

@ -1,6 +1,8 @@
import os import os
import json_stream import json_stream
from typing import Iterator from typing import Iterator
# a heuristic to determine if a fileis a har archive # a heuristic to determine if a fileis a har archive
def har_archive_heuristic(file_path: str) -> int: def har_archive_heuristic(file_path: str) -> int:
val = 0 val = 0
@ -27,13 +29,17 @@ def har_archive_heuristic(file_path: str) -> int:
val += 15 val += 15
return val return val
class HarFlowWrapper: class HarFlowWrapper:
def __init__(self, flow: dict): def __init__(self, flow: dict):
self.flow = flow self.flow = flow
def get_url(self): def get_url(self):
return self.flow['request']['url'] return self.flow['request']['url']
def get_method(self): def get_method(self):
return self.flow['request']['method'] return self.flow['request']['method']
def get_request_headers(self): def get_request_headers(self):
headers = {} headers = {}
for kv in self.flow['request']['headers']: for kv in self.flow['request']['headers']:
@ -42,14 +48,18 @@ class HarFlowWrapper:
# create list on key if it does not exist # create list on key if it does not exist
headers[k] = headers.get(k, []) headers[k] = headers.get(k, [])
headers[k].append(v) headers[k].append(v)
def get_request_body(self): def get_request_body(self):
if 'request' in self.flow and 'postData' in self.flow['request'] and 'text' in self.flow['request']['postData']: if 'request' in self.flow and 'postData' in self.flow['request'] and 'text' in self.flow['request']['postData']:
return self.flow['request']['postData']['text'] return self.flow['request']['postData']['text']
return None return None
def get_response_status_code(self): def get_response_status_code(self):
return self.flow['response']['status'] return self.flow['response']['status']
def get_response_reason(self): def get_response_reason(self):
return self.flow['response']['statusText'] return self.flow['response']['statusText']
def get_response_headers(self): def get_response_headers(self):
headers = {} headers = {}
for kv in self.flow['response']['headers']: for kv in self.flow['response']['headers']:
@ -59,16 +69,18 @@ class HarFlowWrapper:
headers[k] = headers.get(k, []) headers[k] = headers.get(k, [])
headers[k].append(v) headers[k].append(v)
return headers return headers
def get_response_body(self): def get_response_body(self):
if 'response' in self.flow and 'content' in self.flow['response'] and 'text' in self.flow['response']['content']: if 'response' in self.flow and 'content' in self.flow['response'] and 'text' in self.flow['response']['content']:
return self.flow['response']['content']['text'] return self.flow['response']['content']['text']
return None return None
class HarCaptureReader: class HarCaptureReader:
def __init__(self, file_path: str, progress_callback=None): def __init__(self, file_path: str, progress_callback=None):
self.file_path = file_path self.file_path = file_path
self.progress_callback = progress_callback self.progress_callback = progress_callback
def captured_requests(self) -> Iterator[HarFlowWrapper]: def captured_requests(self) -> Iterator[HarFlowWrapper]:
har_file_size = os.path.getsize(self.file_path) har_file_size = os.path.getsize(self.file_path)
with open(self.file_path, 'r', encoding='utf-8') as f: with open(self.file_path, 'r', encoding='utf-8') as f:
@ -77,4 +89,3 @@ class HarCaptureReader:
if self.progress_callback: if self.progress_callback:
self.progress_callback(f.tell() / har_file_size) self.progress_callback(f.tell() / har_file_size)
yield HarFlowWrapper(entry) yield HarFlowWrapper(entry)

@ -2,14 +2,8 @@
""" """
Converts a mitmproxy dump file to a swagger schema. Converts a mitmproxy dump file to a swagger schema.
""" """
from email import header
from mitmproxy import io as iom, http
from mitmproxy.exceptions import FlowReadException from mitmproxy.exceptions import FlowReadException
import pprint
import sys
import io
import json import json
import os
import argparse import argparse
import ruamel.yaml import ruamel.yaml
import re import re
@ -18,12 +12,13 @@ from .har_capture_reader import HarCaptureReader, har_archive_heuristic
from .mitmproxy_capture_reader import MitmproxyCaptureReader, mitmproxy_dump_file_huristic from .mitmproxy_capture_reader import MitmproxyCaptureReader, mitmproxy_dump_file_huristic
from . import console_util from . import console_util
def path_to_regex(path): def path_to_regex(path):
# replace the path template with a regex # replace the path template with a regex
path = path.replace('{', '(?P<') path = path.replace('{', '(?P<')
path = path.replace('}', '>[^/]+)') path = path.replace('}', '>[^/]+)')
path = path.replace('*', '.*') path = path.replace('*', '.*')
path = path.replace('/', '\/') path = path.replace('/', '\\/')
return "^" + path + "$" return "^" + path + "$"
@ -35,10 +30,13 @@ def strip_query_string(path):
def set_key_if_not_exists(dict, key, value): def set_key_if_not_exists(dict, key, value):
if key not in dict: if key not in dict:
dict[key] = value dict[key] = value
def progress_callback(progress): def progress_callback(progress):
console_util.print_progress_bar(progress) console_util.print_progress_bar(progress)
def main():
def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description='Converts a mitmproxy dump file or HAR to a swagger schema.') description='Converts a mitmproxy dump file or HAR to a swagger schema.')
parser.add_argument( parser.add_argument(
@ -70,29 +68,33 @@ def main():
swagger = ruamel.yaml.comments.CommentedMap({ swagger = ruamel.yaml.comments.CommentedMap({
"openapi": "3.0.0", "openapi": "3.0.0",
"info": { "info": {
"title": args.input + " Mitmproxy2Swagger", "title": args.input + " Mitmproxy2Swagger",
"version": "1.0.0" "version": "1.0.0"
}, },
}) })
# strip the trailing slash from the api prefix # strip the trailing slash from the api prefix
args.api_prefix = args.api_prefix.rstrip('/') args.api_prefix = args.api_prefix.rstrip('/')
if not 'servers' in swagger or swagger['servers'] is None: if 'servers' not in swagger or swagger['servers'] is None:
swagger['servers'] = [] swagger['servers'] = []
# add the server if it doesn't exist # add the server if it doesn't exist
if not any(server['url'] == args.api_prefix for server in swagger['servers']): if not any(server['url'] == args.api_prefix for server in swagger['servers']):
swagger['servers'].append({ swagger['servers'].append({
"url": args.api_prefix, "url": args.api_prefix,
"description": "The default server" "description": "The default server"
}) })
if not 'paths' in swagger or swagger['paths'] is None:
if 'paths' not in swagger or swagger['paths'] is None:
swagger['paths'] = {} swagger['paths'] = {}
if 'x-path-templates' not in swagger or swagger['x-path-templates'] is None: if 'x-path-templates' not in swagger or swagger['x-path-templates'] is None:
swagger['x-path-templates'] = [] swagger['x-path-templates'] = []
path_templates = [] path_templates = []
for path in swagger['paths']: for path in swagger['paths']:
path_templates.append(path) path_templates.append(path)
# also add paths from the the x-path-templates array # also add paths from the the x-path-templates array
if 'x-path-templates' in swagger and swagger['x-path-templates'] is not None: if 'x-path-templates' in swagger and swagger['x-path-templates'] is not None:
for path in swagger['x-path-templates']: for path in swagger['x-path-templates']:
@ -102,10 +104,10 @@ def main():
new_path_templates = [] new_path_templates = []
path_template_regexes = [re.compile(path_to_regex(path)) path_template_regexes = [re.compile(path_to_regex(path))
for path in path_templates] for path in path_templates]
try: try:
for f in caputre_reader.captured_requests(): for f in caputre_reader.captured_requests():
# strip the api prefix from the url # strip the api prefix from the url
url = f.get_url() url = f.get_url()
if not url.startswith(args.api_prefix): if not url.startswith(args.api_prefix):
@ -130,15 +132,16 @@ def main():
set_key_if_not_exists( set_key_if_not_exists(
swagger['paths'], path_template_to_set, {}) swagger['paths'], path_template_to_set, {})
set_key_if_not_exists(swagger['paths'][path_template_to_set], method, {
set_key_if_not_exists(swagger['paths'][path_template_to_set], method, {
'summary': swagger_util.path_template_to_endpoint_name(method, path_template_to_set), 'summary': swagger_util.path_template_to_endpoint_name(method, path_template_to_set),
'responses': {} 'responses': {}
}) })
params = swagger_util.url_to_params(url, path_template_to_set) params = swagger_util.url_to_params(url, path_template_to_set)
if params is not None and len(params) > 0: if params is not None and len(params) > 0:
set_key_if_not_exists(swagger['paths'][path_template_to_set][method], 'parameters', params) set_key_if_not_exists(swagger['paths'][path_template_to_set][method], 'parameters', params)
if method not in ['get', 'head']: if method not in ['get', 'head']:
body = f.get_request_body() body = f.get_request_body()
if body is not None: if body is not None:
@ -161,6 +164,7 @@ def main():
body_val) body_val)
set_key_if_not_exists( set_key_if_not_exists(
swagger['paths'][path_template_to_set][method], 'requestBody', content_to_set) swagger['paths'][path_template_to_set][method], 'requestBody', content_to_set)
# try parsing the response as json # try parsing the response as json
response_body = f.get_response_body() response_body = f.get_response_body()
if response_body is not None: if response_body is not None:
@ -186,7 +190,6 @@ def main():
except FlowReadException as e: except FlowReadException as e:
print(f"Flow file corrupted: {e}") print(f"Flow file corrupted: {e}")
new_path_templates.sort() new_path_templates.sort()
# add suggested path templates # add suggested path templates
@ -240,5 +243,7 @@ def main():
with open(args.output, 'w') as f: with open(args.output, 'w') as f:
yaml.dump(swagger, f) yaml.dump(swagger, f)
print("Done!") print("Done!")
if __name__ == "__main__": if __name__ == "__main__":
main() main()

@ -1,13 +1,10 @@
from tokenize import Number
from typing import Iterator from typing import Iterator
from mitmproxy import io as iom, http from mitmproxy import io as iom, http
from mitmproxy.exceptions import FlowReadException from mitmproxy.exceptions import FlowReadException
from typing import Iterator
import os import os
def mitmproxy_dump_file_huristic(file_path: str) -> Number:
def mitmproxy_dump_file_huristic(file_path: str) -> int:
val = 0 val = 0
if 'flow' in file_path: if 'flow' in file_path:
val += 1 val += 1
@ -28,36 +25,43 @@ def mitmproxy_dump_file_huristic(file_path: str) -> Number:
if b'regular' in data: if b'regular' in data:
val += 10 val += 10
return val return val
class MitmproxyFlowWrapper: class MitmproxyFlowWrapper:
def __init__(self, flow: http.HTTPFlow): def __init__(self, flow: http.HTTPFlow):
self.flow = flow self.flow = flow
def get_url(self): def get_url(self):
return self.flow.request.url return self.flow.request.url
def get_method(self): def get_method(self):
return self.flow.request.method return self.flow.request.method
def get_request_headers(self): def get_request_headers(self):
headers = {} headers = {}
for k, v in self.flow.request.headers.items(multi = True):\ for k, v in self.flow.request.headers.items(multi=True):
# create list on key if it does not exist # create list on key if it does not exist
headers[k] = headers.get(k, []) headers[k] = headers.get(k, [])
headers[k].append(v) headers[k].append(v)
return headers return headers
def get_request_body(self): def get_request_body(self):
return self.flow.request.content return self.flow.request.content
def get_response_status_code(self): def get_response_status_code(self):
return self.flow.response.status_code return self.flow.response.status_code
def get_response_reason(self): def get_response_reason(self):
return self.flow.response.reason return self.flow.response.reason
def get_response_headers(self): def get_response_headers(self):
headers = {} headers = {}
for k, v in self.flow.response.headers.items(multi = True):\ for k, v in self.flow.response.headers.items(multi=True):
# create list on key if it does not exist # create list on key if it does not exist
headers[k] = headers.get(k, []) headers[k] = headers.get(k, [])
headers[k].append(v) headers[k].append(v)
return headers return headers
def get_response_body(self): def get_response_body(self):
return self.flow.response.content return self.flow.response.content
@ -82,4 +86,3 @@ class MitmproxyCaptureReader:
yield MitmproxyFlowWrapper(f) yield MitmproxyFlowWrapper(f)
except FlowReadException as e: except FlowReadException as e:
print(f"Flow file corrupted: {e}") print(f"Flow file corrupted: {e}")

@ -13,6 +13,7 @@ VERBS = [
'activate' 'activate'
] ]
# generate a name for the endpoint from the path template # generate a name for the endpoint from the path template
# POST /api/v1/things/{id}/create -> POST create thing by id # POST /api/v1/things/{id}/create -> POST create thing by id
def path_template_to_endpoint_name(method, path_template): def path_template_to_endpoint_name(method, path_template):
@ -31,7 +32,7 @@ def path_template_to_endpoint_name(method, path_template):
name_parts = [] name_parts = []
for segment in segments: for segment in segments:
if segment in VERBS: if segment in VERBS:
# prepend to the name_parts # prepend to the name_parts
name_parts.insert(0, segment.lower()) name_parts.insert(0, segment.lower())
else: else:
name_parts.insert(0, segment.lower()) name_parts.insert(0, segment.lower())
@ -41,6 +42,7 @@ def path_template_to_endpoint_name(method, path_template):
break break
return method.upper() + ' ' + ' '.join(name_parts) return method.upper() + ' ' + ' '.join(name_parts)
# when given an url and its path template, generates the parameters section of the request # when given an url and its path template, generates the parameters section of the request
def url_to_params(url, path_template): def url_to_params(url, path_template):
path_template = path_template.strip('/') path_template = path_template.strip('/')
@ -71,6 +73,7 @@ def url_to_params(url, path_template):
}) })
return params return params
def value_to_schema(value): def value_to_schema(value):
# check if value is a number # check if value is a number
if type(value) == int or type(value) == float: if type(value) == int or type(value) == float:
@ -94,7 +97,7 @@ def value_to_schema(value):
'type': 'array', 'type': 'array',
'items': {} 'items': {}
} }
return { return {
'type': 'array', 'type': 'array',
'items': value_to_schema(value[0]) 'items': value_to_schema(value[0])
@ -109,13 +112,16 @@ def value_to_schema(value):
} }
} }
# if it is none, return null # if it is none, return null
elif value == None: elif value is None:
return { return {
'type': 'object' 'type': 'object'
} }
MAX_EXAMPLE_ARRAY_ELEMENTS = 10 MAX_EXAMPLE_ARRAY_ELEMENTS = 10
MAX_EXAMPLE_OBJECT_PROPERTIES = 150 MAX_EXAMPLE_OBJECT_PROPERTIES = 150
# recursively scan an example value and limit the number of elements and properties # recursively scan an example value and limit the number of elements and properties
def limit_example_size(example): def limit_example_size(example):
if type(example) == list: if type(example) == list:

Loading…
Cancel
Save