import sys

# ANSI escape templates for 24-bit foreground / background colours, and reset.
ANSI_RGB = "\033[38;2;{};{};{}m"
ANSI_RGB_BG = "\033[48;2;{};{};{}m"
ANSI_RESET = "\033[0m"

# Colour stops of the rainbow gradient, evenly spaced over the range 0..1.
RAINBOW_COLORS = [
    (255, 0, 0),
    (255, 127, 0),
    (255, 255, 0),
    (127, 255, 0),
    (0, 255, 0),
    (0, 255, 127),
    (0, 255, 255),
    (0, 127, 255),
    (0, 0, 255),
    (127, 0, 255),
    (255, 0, 255),
    (255, 0, 127),
]


def rgb_interpolate(start, end, progress):
    """Linearly blend two RGB triples.

    Args:
        start: (r, g, b) colour returned when ``progress`` is 0.
        end: (r, g, b) colour returned when ``progress`` is 1.
        progress: blend factor between the two colours.

    Returns:
        A 3-tuple of ints; each channel is truncated with ``int()``.
    """
    return tuple(int(start[i] + (end[i] - start[i]) * progress) for i in range(3))


def rainbow_at_position(progress):
    """Return the rainbow colour at ``progress`` (0 = first stop, 1 = last stop).

    Values outside 0..1 are clamped to the nearest end of the gradient.

    Args:
        progress: position along the gradient.

    Returns:
        A 3-tuple of ints (r, g, b).
    """
    segment_count = len(RAINBOW_COLORS) - 1
    clamped = min(max(progress, 0.0), 1.0)
    scaled = clamped * float(segment_count)
    # Cap the left stop at the second-to-last colour so that progress == 1.0
    # interpolates fully onto the last stop instead of indexing past the list.
    idx_a = min(int(scaled), segment_count - 1)
    idx_b = idx_a + 1
    return rgb_interpolate(RAINBOW_COLORS[idx_a], RAINBOW_COLORS[idx_b], scaled - idx_a)
class HarFlowWrapper:
    """Accessor wrapper around a single entry of a HAR ``log.entries`` list."""

    def __init__(self, flow: dict):
        # The HAR entry; the accessors below read its 'request' and
        # 'response' members.
        self.flow = flow

    @staticmethod
    def _group_headers(header_list):
        """Turn a list of ``{'name': ..., 'value': ...}`` records into a dict
        mapping each header name to the list of its values, in order."""
        headers = {}
        for kv in header_list:
            # create the list on first sight of a name, then append
            headers.setdefault(kv['name'], []).append(kv['value'])
        return headers

    def get_url(self):
        """Return the request URL."""
        return self.flow['request']['url']

    def get_method(self):
        """Return the request method exactly as stored in the entry."""
        return self.flow['request']['method']

    def get_request_headers(self):
        """Return the request headers as ``{name: [value, ...]}``."""
        # The value must be returned explicitly; without the return statement
        # this accessor would yield None to every caller.
        return self._group_headers(self.flow['request']['headers'])

    def get_request_body(self):
        """Return the request body text, or None when the entry has none."""
        if 'request' in self.flow and 'postData' in self.flow['request'] and 'text' in self.flow['request']['postData']:
            return self.flow['request']['postData']['text']
        return None

    def get_response_status_code(self):
        """Return the response status code."""
        return self.flow['response']['status']

    def get_response_reason(self):
        """Return the response status text."""
        return self.flow['response']['statusText']

    def get_response_headers(self):
        """Return the response headers as ``{name: [value, ...]}``."""
        return self._group_headers(self.flow['response']['headers'])

    def get_response_body(self):
        """Return the response body text, or None when the entry has none."""
        if 'response' in self.flow and 'content' in self.flow['response'] and 'text' in self.flow['response']['content']:
            return self.flow['response']['content']['text']
        return None
def set_key_if_not_exists(dict, key, value):
    """Store ``value`` under ``key`` in ``dict`` unless the key is already there.

    An existing entry is left untouched, whatever its current value is.
    The mapping is modified in place and nothing is returned.
    """
    if key in dict:
        return
    dict[key] = value
path_template_regex.match(path): - path_template_index = i - break - if path_template_index is None: - if path in new_path_templates: - continue - new_path_templates.append(path) - continue - - path_template_to_set = path_templates[path_template_index] - set_key_if_not_exists( - swagger['paths'], path_template_to_set, {}) - - - set_key_if_not_exists(swagger['paths'][path_template_to_set], method, { - 'summary': swagger_util.path_template_to_endpoint_name(method, path_template_to_set), - - 'responses': {} - }) - params = swagger_util.url_to_params(url, path_template_to_set) - if params is not None and len(params) > 0: - set_key_if_not_exists(swagger['paths'][path_template_to_set][method], 'parameters', params) - if method not in ['get', 'head']: - body_val = None - # try to parse the body as json - try: - body_val = json.loads(f.request.text) - except json.decoder.JSONDecodeError: - pass - if body_val is not None: - content_to_set = { - 'content': { - 'application/json': { - 'schema': swagger_util.value_to_schema(body_val) - } - } - } - if args.examples: - content_to_set['content']['application/json']['example'] = swagger_util.limit_example_size( - body_val) - set_key_if_not_exists( - swagger['paths'][path_template_to_set][method], 'requestBody', content_to_set) - # try parsing the response as json + try: + for f in caputre_reader.captured_requests(): + + # strip the api prefix from the url + url = f.get_url() + if not url.startswith(args.api_prefix): + continue + method = f.get_method().lower() + path = strip_query_string(url).removeprefix(args.api_prefix) + status = f.get_response_status_code() + + # check if the path matches any of the path templates, and save the index + path_template_index = None + for i, path_template_regex in enumerate(path_template_regexes): + if path_template_regex.match(path): + path_template_index = i + break + if path_template_index is None: + if path in new_path_templates: + continue + new_path_templates.append(path) + continue + + 
path_template_to_set = path_templates[path_template_index] + set_key_if_not_exists( + swagger['paths'], path_template_to_set, {}) + + + set_key_if_not_exists(swagger['paths'][path_template_to_set], method, { + 'summary': swagger_util.path_template_to_endpoint_name(method, path_template_to_set), + + 'responses': {} + }) + params = swagger_util.url_to_params(url, path_template_to_set) + if params is not None and len(params) > 0: + set_key_if_not_exists(swagger['paths'][path_template_to_set][method], 'parameters', params) + if method not in ['get', 'head']: + body = f.get_request_body() + if body is not None: + body_val = None + # try to parse the body as json try: - response_json = json.loads(f.response.text) + body_val = json.loads(f.get_request_body()) except json.decoder.JSONDecodeError: - response_json = None - if response_json is not None: - resp_data_to_set = { - 'description': f.response.reason, + pass + if body_val is not None: + content_to_set = { 'content': { 'application/json': { - 'schema': swagger_util.value_to_schema(response_json) + 'schema': swagger_util.value_to_schema(body_val) } } } if args.examples: - resp_data_to_set['content']['application/json']['example'] = swagger_util.limit_example_size( - response_json) - set_key_if_not_exists(swagger['paths'][path_template_to_set][method]['responses'], str( - status), resp_data_to_set) - - except FlowReadException as e: - print(f"Flow file corrupted: {e}") + content_to_set['content']['application/json']['example'] = swagger_util.limit_example_size( + body_val) + set_key_if_not_exists( + swagger['paths'][path_template_to_set][method], 'requestBody', content_to_set) + # try parsing the response as json + response_body = f.get_response_body() + if response_body is not None: + try: + response_json = json.loads(response_body) + except json.decoder.JSONDecodeError: + response_json = None + if response_json is not None: + resp_data_to_set = { + 'description': f.get_response_reason(), + 'content': { + 
'application/json': { + 'schema': swagger_util.value_to_schema(response_json) + } + } + } + if args.examples: + resp_data_to_set['content']['application/json']['example'] = swagger_util.limit_example_size( + response_json) + set_key_if_not_exists(swagger['paths'][path_template_to_set][method]['responses'], str( + status), resp_data_to_set) + + except FlowReadException as e: + print(f"Flow file corrupted: {e}") new_path_templates.sort() diff --git a/mitmproxy2swagger/mitmproxy_capture_reader.py b/mitmproxy2swagger/mitmproxy_capture_reader.py index 0112a7f..055d931 100644 --- a/mitmproxy2swagger/mitmproxy_capture_reader.py +++ b/mitmproxy2swagger/mitmproxy_capture_reader.py @@ -3,6 +3,8 @@ from tokenize import Number from typing import Iterator from mitmproxy import io as iom, http from mitmproxy.exceptions import FlowReadException +from typing import Iterator + import os def mitmproxy_dump_file_huristic(file_path: str) -> Number: @@ -17,8 +19,8 @@ def mitmproxy_dump_file_huristic(file_path: str) -> Number: # if file contains non-ascii characters if data.decode('utf-8', 'ignore').isprintable() is False: val += 50 - # if first character is a digit - if data[0].isdigit(): + # if first character of the byte array is a digit + if str(data[0]).isdigit() is True: val += 5 # if it contains the word status_code if b'status_code' in data: @@ -47,6 +49,8 @@ class MitmproxyFlowWrapper: return self.flow.request.content def get_response_status_code(self): return self.flow.response.status_code + def get_response_reason(self): + return self.flow.response.reason def get_response_headers(self): headers = {} for k, v in self.flow.response.headers.items(multi = True):\ @@ -72,6 +76,9 @@ class MitmproxyCaptureReader: if self.progress_callback: self.progress_callback(logfile.tell() / logfile_size) if isinstance(f, http.HTTPFlow): + if f.response is None: + print("[warn] flow without response: {}".format(f.request.url)) + continue yield MitmproxyFlowWrapper(f) except FlowReadException as 
e: print(f"Flow file corrupted: {e}") diff --git a/mitmproxy2swagger/swagger_util/swagger_util.py b/mitmproxy2swagger/swagger_util.py similarity index 100% rename from mitmproxy2swagger/swagger_util/swagger_util.py rename to mitmproxy2swagger/swagger_util.py diff --git a/mitmproxy2swagger/swagger_util/__init__.py b/mitmproxy2swagger/swagger_util/__init__.py deleted file mode 100644 index e69de29..0000000