diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..876b68a --- /dev/null +++ b/.flake8 @@ -0,0 +1,2 @@ +[flake8] +ignore = E501,E222 diff --git a/.gitignore b/.gitignore index 1cc551e..6473f20 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ lisek.swagger.yml dist dupsko.yaml dist +/.mypy_cache/ diff --git a/.mypy.ini b/.mypy.ini new file mode 100644 index 0000000..419b81c --- /dev/null +++ b/.mypy.ini @@ -0,0 +1,4 @@ +[mypy] + +[mypy-json_stream.*] +ignore_missing_imports = True diff --git a/mitmproxy2swagger/console_util.py b/mitmproxy2swagger/console_util.py index 8678366..62456f8 100644 --- a/mitmproxy2swagger/console_util.py +++ b/mitmproxy2swagger/console_util.py @@ -5,51 +5,38 @@ ANSI_RGB_BG = "\033[48;2;{};{};{}m" ANSI_RESET = "\033[0m" RAINBOW_COLORS = [ - (255, 0, 0), - (255, 127, 0), - (255, 255, 0), - (127, 255, 0), - (0, 255, 0), - (0, 255, 127), - (0, 255, 255), - (0, 127, 255), - (0, 0, 255), - (127, 0, 255), - (255, 0, 255), - (255, 0, 127), + (255, 0, 0), + (255, 127, 0), + (255, 255, 0), + (127, 255, 0), + (0, 255, 0), + (0, 255, 127), + (0, 255, 255), + (0, 127, 255), + (0, 0, 255), + (127, 0, 255), + (255, 0, 255), + (255, 0, 127), ] + def rgb_interpolate(start, end, progress): return tuple(int(start[i] + (end[i] - start[i]) * progress) for i in range(3)) + # take a value from 0 to 1 and return an interpolated color from the rainbow def rainbow_at_position(progress): idx_a = int(progress * float(len(RAINBOW_COLORS) - 1)) idx_b = idx_a + 1 - return rgb_interpolate(RAINBOW_COLORS[idx_a], RAINBOW_COLORS[idx_b], progress * float(len(RAINBOW_COLORS) - 1) - idx_a) + return rgb_interpolate(RAINBOW_COLORS[idx_a], RAINBOW_COLORS[idx_b], progress * float(len(RAINBOW_COLORS) - 1) - idx_a) -def print_progress_bar(progress = 0.0): + +def print_progress_bar(progress=0.0): sys.stdout.write("\r") progress_bar_contents = "" PROGRESS_LENGTH = 30 - full_block = '█' - blocks = [ '▉', '▊', '▋', '▌', '▍', '▎', '▏'] - block_values = [0.875, 0.75, 0.625, 0.5, 0.375, 0.25, 0.125] - rainbow_colors = [ - (255, 0, 0), - (255, 127, 0), - (255, 255, 0), - (127, 255, 0), - (0, 255, 0), - (0, 255, 127), - (0, 255, 255), - (0, 127, 255), - (0, 0, 255), - (127, 0, 255), - (255, 0, 255), - (255, 0, 127), + blocks = ['▉', '▊', '▋', '▌', '▍', '▎', '▏'] - ] for i in range(PROGRESS_LENGTH): interpolated = rainbow_at_position(i / PROGRESS_LENGTH) # check if should print a full block @@ -67,6 +54,7 @@ def print_progress_bar(progress = 0.0): else: progress_bar_contents += ANSI_RESET progress_bar_contents += ' ' + progress_bar_contents += ANSI_RESET sys.stdout.write("[{}] {:.1f}%".format(progress_bar_contents, progress * 100)) sys.stdout.flush() diff --git a/mitmproxy2swagger/har_capture_reader.py b/mitmproxy2swagger/har_capture_reader.py index 9c203e0..1277ec5 100644 --- a/mitmproxy2swagger/har_capture_reader.py +++ b/mitmproxy2swagger/har_capture_reader.py @@ -1,6 +1,8 @@ import os import json_stream from typing import Iterator + + # a heuristic to determine if a fileis a har archive def har_archive_heuristic(file_path: str) -> int: val = 0 @@ -27,13 +29,17 @@ def har_archive_heuristic(file_path: str) -> int: val += 15 return val + class HarFlowWrapper: def __init__(self, flow: dict): self.flow = flow + def get_url(self): return self.flow['request']['url'] + def get_method(self): return self.flow['request']['method'] + def get_request_headers(self): headers = {} for kv in self.flow['request']['headers']: @@ -42,14 +48,18 @@ class HarFlowWrapper: # create list on key if it does not exist headers[k] = headers.get(k, []) headers[k].append(v) + def get_request_body(self): if 'request' in self.flow and 'postData' in self.flow['request'] and 'text' in self.flow['request']['postData']: return self.flow['request']['postData']['text'] return None + def get_response_status_code(self): return self.flow['response']['status'] + def get_response_reason(self): return self.flow['response']['statusText'] + def get_response_headers(self): headers = {} for kv in self.flow['response']['headers']: @@ -59,16 +69,18 @@ class HarFlowWrapper: headers[k] = headers.get(k, []) headers[k].append(v) return headers + def get_response_body(self): if 'response' in self.flow and 'content' in self.flow['response'] and 'text' in self.flow['response']['content']: return self.flow['response']['content']['text'] return None - + class HarCaptureReader: def __init__(self, file_path: str, progress_callback=None): self.file_path = file_path self.progress_callback = progress_callback + def captured_requests(self) -> Iterator[HarFlowWrapper]: har_file_size = os.path.getsize(self.file_path) with open(self.file_path, 'r', encoding='utf-8') as f: @@ -77,4 +89,3 @@ class HarCaptureReader: if self.progress_callback: self.progress_callback(f.tell() / har_file_size) yield HarFlowWrapper(entry) - diff --git a/mitmproxy2swagger/mitmproxy2swagger.py b/mitmproxy2swagger/mitmproxy2swagger.py index 45f3692..e447914 100755 --- a/mitmproxy2swagger/mitmproxy2swagger.py +++ b/mitmproxy2swagger/mitmproxy2swagger.py @@ -2,14 +2,8 @@ """ Converts a mitmproxy dump file to a swagger schema. """ -from email import header -from mitmproxy import io as iom, http from mitmproxy.exceptions import FlowReadException -import pprint -import sys -import io import json -import os import argparse import ruamel.yaml import re @@ -18,12 +12,13 @@ from .har_capture_reader import HarCaptureReader, har_archive_heuristic from .mitmproxy_capture_reader import MitmproxyCaptureReader, mitmproxy_dump_file_huristic from . import console_util + def path_to_regex(path): # replace the path template with a regex path = path.replace('{', '(?P<') path = path.replace('}', '>[^/]+)') path = path.replace('*', '.*') - path = path.replace('/', '\/') + path = path.replace('/', '\\/') return "^" + path + "$" @@ -35,10 +30,13 @@ def strip_query_string(path): def set_key_if_not_exists(dict, key, value): if key not in dict: dict[key] = value + + def progress_callback(progress): console_util.print_progress_bar(progress) -def main(): + +def main(): parser = argparse.ArgumentParser( description='Converts a mitmproxy dump file or HAR to a swagger schema.') parser.add_argument( @@ -70,29 +68,33 @@ def main(): swagger = ruamel.yaml.comments.CommentedMap({ "openapi": "3.0.0", "info": { - "title": args.input + " Mitmproxy2Swagger", + "title": args.input + " Mitmproxy2Swagger", "version": "1.0.0" }, }) # strip the trailing slash from the api prefix args.api_prefix = args.api_prefix.rstrip('/') - if not 'servers' in swagger or swagger['servers'] is None: + if 'servers' not in swagger or swagger['servers'] is None: swagger['servers'] = [] + # add the server if it doesn't exist if not any(server['url'] == args.api_prefix for server in swagger['servers']): swagger['servers'].append({ "url": args.api_prefix, "description": "The default server" }) - if not 'paths' in swagger or swagger['paths'] is None: + + if 'paths' not in swagger or swagger['paths'] is None: swagger['paths'] = {} + if 'x-path-templates' not in swagger or swagger['x-path-templates'] is None: swagger['x-path-templates'] = [] path_templates = [] for path in swagger['paths']: path_templates.append(path) + # also add paths from the the x-path-templates array if 'x-path-templates' in swagger and swagger['x-path-templates'] is not None: for path in swagger['x-path-templates']: @@ -102,10 +104,10 @@ def main(): new_path_templates = [] path_template_regexes = [re.compile(path_to_regex(path)) - for path in path_templates] + for path in path_templates] + try: for f in caputre_reader.captured_requests(): - # strip the api prefix from the url url = f.get_url() if not url.startswith(args.api_prefix): @@ -130,15 +132,16 @@ def main(): set_key_if_not_exists( swagger['paths'], path_template_to_set, {}) - - set_key_if_not_exists(swagger['paths'][path_template_to_set], method, { + set_key_if_not_exists(swagger['paths'][path_template_to_set], method, { 'summary': swagger_util.path_template_to_endpoint_name(method, path_template_to_set), - 'responses': {} }) + params = swagger_util.url_to_params(url, path_template_to_set) + if params is not None and len(params) > 0: set_key_if_not_exists(swagger['paths'][path_template_to_set][method], 'parameters', params) + if method not in ['get', 'head']: body = f.get_request_body() if body is not None: @@ -161,6 +164,7 @@ def main(): body_val) set_key_if_not_exists( swagger['paths'][path_template_to_set][method], 'requestBody', content_to_set) + # try parsing the response as json response_body = f.get_response_body() if response_body is not None: @@ -186,7 +190,6 @@ def main(): except FlowReadException as e: print(f"Flow file corrupted: {e}") - new_path_templates.sort() # add suggested path templates @@ -240,5 +243,7 @@ def main(): with open(args.output, 'w') as f: yaml.dump(swagger, f) print("Done!") + + if __name__ == "__main__": main() diff --git a/mitmproxy2swagger/mitmproxy_capture_reader.py b/mitmproxy2swagger/mitmproxy_capture_reader.py index 055d931..a2b3abb 100644 --- a/mitmproxy2swagger/mitmproxy_capture_reader.py +++ b/mitmproxy2swagger/mitmproxy_capture_reader.py @@ -1,13 +1,10 @@ - -from tokenize import Number from typing import Iterator from mitmproxy import io as iom, http from mitmproxy.exceptions import FlowReadException -from typing import Iterator - import os -def mitmproxy_dump_file_huristic(file_path: str) -> Number: + +def mitmproxy_dump_file_huristic(file_path: str) -> int: val = 0 if 'flow' in file_path: val += 1 @@ -28,36 +25,43 @@ def mitmproxy_dump_file_huristic(file_path: str) -> Number: if b'regular' in data: val += 10 return val - class MitmproxyFlowWrapper: def __init__(self, flow: http.HTTPFlow): self.flow = flow + def get_url(self): return self.flow.request.url + def get_method(self): return self.flow.request.method + def get_request_headers(self): headers = {} - for k, v in self.flow.request.headers.items(multi = True):\ + for k, v in self.flow.request.headers.items(multi=True): # create list on key if it does not exist headers[k] = headers.get(k, []) headers[k].append(v) return headers + def get_request_body(self): return self.flow.request.content + def get_response_status_code(self): return self.flow.response.status_code + def get_response_reason(self): return self.flow.response.reason + def get_response_headers(self): headers = {} - for k, v in self.flow.response.headers.items(multi = True):\ + for k, v in self.flow.response.headers.items(multi=True): # create list on key if it does not exist headers[k] = headers.get(k, []) headers[k].append(v) return headers + def get_response_body(self): return self.flow.response.content @@ -82,4 +86,3 @@ class MitmproxyCaptureReader: yield MitmproxyFlowWrapper(f) except FlowReadException as e: print(f"Flow file corrupted: {e}") - diff --git a/mitmproxy2swagger/swagger_util.py b/mitmproxy2swagger/swagger_util.py index e2502bd..55f5d1c 100644 --- a/mitmproxy2swagger/swagger_util.py +++ b/mitmproxy2swagger/swagger_util.py @@ -13,6 +13,7 @@ VERBS = [ 'activate' ] + # generate a name for the endpoint from the path template # POST /api/v1/things/{id}/create -> POST create thing by id def path_template_to_endpoint_name(method, path_template): @@ -31,7 +32,7 @@ def path_template_to_endpoint_name(method, path_template): name_parts = [] for segment in segments: if segment in VERBS: - # prepend to the name_parts + # prepend to the name_parts name_parts.insert(0, segment.lower()) else: name_parts.insert(0, segment.lower()) @@ -41,6 +42,7 @@ def path_template_to_endpoint_name(method, path_template): break return method.upper() + ' ' + ' '.join(name_parts) + # when given an url and its path template, generates the parameters section of the request def url_to_params(url, path_template): path_template = path_template.strip('/') @@ -71,6 +73,7 @@ def url_to_params(url, path_template): }) return params + def value_to_schema(value): # check if value is a number if type(value) == int or type(value) == float: @@ -94,7 +97,7 @@ def value_to_schema(value): 'type': 'array', 'items': {} } - + return { 'type': 'array', 'items': value_to_schema(value[0]) @@ -109,13 +112,16 @@ def value_to_schema(value): } } # if it is none, return null - elif value == None: + elif value is None: return { 'type': 'object' } + MAX_EXAMPLE_ARRAY_ELEMENTS = 10 MAX_EXAMPLE_OBJECT_PROPERTIES = 150 + + # recursively scan an example value and limit the number of elements and properties def limit_example_size(example): if type(example) == list: