feat: add har input

wip/har-support
alufers 3 years ago
parent 6cf8578d06
commit ddf16578b3

@ -8,9 +8,9 @@
"name": "Python: mitmproxy2swagger.py", "name": "Python: mitmproxy2swagger.py",
"type": "python", "type": "python",
"request": "launch", "request": "launch",
"program": "${workspaceRoot}/mitmproxy2swagger.py", "program": "${workspaceRoot}/mitmproxy2swagger/mitmproxy2swagger.py",
"cwd": "${workspaceRoot}", "cwd": "${workspaceRoot}",
"args": ["-i", "flows/flows_lisek_filtered", "-o", "ass.yaml", "-p", "https://api2.lisek.app/api"], "args": ["-i", "flows/www.x-kom.pl.har", "-o", "ass.yaml", "-p", "https://mobileapi.x-kom.pl"],
"console": "integratedTerminal", "console": "integratedTerminal",
"justMyCode": true "justMyCode": true
} }

@ -0,0 +1,72 @@
import sys
# ANSI SGR escape templates. Each of the first two takes the red, green and
# blue channel values, in that order, through str.format().
ANSI_RGB = "\033[38;2;{};{};{}m"  # SGR 38;2 -> 24-bit foreground color
ANSI_RGB_BG = "\033[48;2;{};{};{}m"  # SGR 48;2 -> 24-bit background color
ANSI_RESET = "\033[0m"  # SGR 0 -> clear every color/attribute set so far
# Ordered (r, g, b) stops of the rainbow gradient. rainbow_at_position()
# interpolates between neighbouring entries of this list.
RAINBOW_COLORS = [
(255, 0, 0),
(255, 127, 0),
(255, 255, 0),
(127, 255, 0),
(0, 255, 0),
(0, 255, 127),
(0, 255, 255),
(0, 127, 255),
(0, 0, 255),
(127, 0, 255),
(255, 0, 255),
(255, 0, 127),
]
def rgb_interpolate(start, end, progress):
    """Linearly blend two RGB triples.

    Args:
        start: sequence whose first three items are the starting channels.
        end: sequence whose first three items are the target channels.
        progress: blend factor; 0 yields ``start`` and 1 yields ``end``.

    Returns:
        A 3-tuple of ints, each channel truncated towards zero by ``int()``.
    """
    blended = []
    for channel in range(3):
        low, high = start[channel], end[channel]
        blended.append(int(low + (high - low) * progress))
    return tuple(blended)
def rainbow_at_position(progress):
    """Return the rainbow color at a position along the gradient.

    Args:
        progress: position along RAINBOW_COLORS, where 0 is the first stop
            and 1 is the last one. Values outside that range are clamped.

    Returns:
        The (r, g, b) tuple produced by rgb_interpolate() for the two stops
        surrounding the requested position.
    """
    segment_count = len(RAINBOW_COLORS) - 1
    # Clamp so that out-of-range input cannot extrapolate past the end stops.
    scaled = min(max(progress, 0.0), 1.0) * float(segment_count)
    # At progress == 1.0 int(scaled) would point at the last stop and leave no
    # right-hand neighbour, so the final segment is reused with fraction 1.0.
    idx_a = min(int(scaled), segment_count - 1)
    idx_b = idx_a + 1
    return rgb_interpolate(RAINBOW_COLORS[idx_a], RAINBOW_COLORS[idx_b], scaled - idx_a)
def print_progress_bar(progress=0.0):
    """Redraw a rainbow-colored progress bar on the current terminal line.

    The output starts with a carriage return and contains no newline, so a
    later call overwrites the bar drawn by an earlier one. The bar is always
    PROGRESS_LENGTH cells wide and is followed by the percentage with one
    decimal place. Output is written to sys.stdout and flushed.

    Args:
        progress: completed fraction. Values outside 0.0-1.0 are clamped.
    """
    PROGRESS_LENGTH = 30
    # U+258C LEFT HALF BLOCK: the glyph's left half takes the foreground color
    # and the rest of the cell shows the background color, so one character
    # carries two gradient samples.
    half_block = "\u258c"
    # Left-aligned partial blocks from one eighth (U+258F) up to seven
    # eighths (U+2589), indexed by "eighths filled minus one".
    partial_blocks = ["\u258f", "\u258e", "\u258d", "\u258c", "\u258b", "\u258a", "\u2589"]

    progress = min(max(progress, 0.0), 1.0)
    filled = progress * PROGRESS_LENGTH
    full_cells = int(filled)
    # How many eighths of the first non-full cell are covered (0..7).
    eighths = int((filled - full_cells) * 8)

    cells = []
    for i in range(PROGRESS_LENGTH):
        interpolated = rainbow_at_position(i / PROGRESS_LENGTH)
        if i < full_cells:
            # completely filled cell: two colors, half a cell apart
            interpolated_2nd_half = rainbow_at_position((i + 0.5) / PROGRESS_LENGTH)
            cells.append(
                ANSI_RGB.format(*interpolated)
                + ANSI_RGB_BG.format(*interpolated_2nd_half)
                + half_block
            )
        elif i == full_cells and eighths > 0:
            # the single partially filled cell at the bar's leading edge
            cells.append(ANSI_RESET + ANSI_RGB.format(*interpolated) + partial_blocks[eighths - 1])
        else:
            # not reached yet: keep the bar's width with a blank cell
            cells.append(ANSI_RESET + " ")
    cells.append(ANSI_RESET)

    sys.stdout.write("\r")
    sys.stdout.write("[{}] {:.1f}%".format("".join(cells), progress * 100))
    sys.stdout.flush()

@ -1,8 +1,8 @@
import os import os
from tokenize import Number
import json_stream import json_stream
from typing import Iterator
# a heuristic to determine if a file is a har archive # a heuristic to determine if a file is a har archive
def har_archive_heuristic(file_path: str) -> Number: def har_archive_heuristic(file_path: str) -> int:
val = 0 val = 0
# if has the har extension # if has the har extension
if file_path.endswith('.har'): if file_path.endswith('.har'):
@ -27,12 +27,54 @@ def har_archive_heuristic(file_path: str) -> Number:
val += 15 val += 15
return val return val
class HarFlowWrapper:
    """Accessor object around one entry of a HAR ``log.entries`` list.

    The wrapped ``flow`` is only read through ``in`` checks and ``[]``
    lookups, so any mapping with the HAR entry layout works.
    """

    def __init__(self, flow: dict):
        self.flow = flow

    @staticmethod
    def _collect_headers(header_entries):
        """Group ``{'name': ..., 'value': ...}`` items into name -> [values]."""
        headers = {}
        for kv in header_entries:
            # repeated header names keep every value, in order of appearance
            headers.setdefault(kv['name'], []).append(kv['value'])
        return headers

    @staticmethod
    def _lookup(mapping, *keys):
        """Follow ``keys`` through nested mappings; None if any key is absent."""
        current = mapping
        for key in keys:
            if key not in current:
                return None
            current = current[key]
        return current

    def get_url(self):
        """Return the request URL."""
        return self.flow['request']['url']

    def get_method(self):
        """Return the request method exactly as stored in the entry."""
        return self.flow['request']['method']

    def get_request_headers(self):
        """Return the request headers as a dict of name -> list of values."""
        return self._collect_headers(self.flow['request']['headers'])

    def get_request_body(self):
        """Return ``request.postData.text``, or None when any level is missing."""
        return self._lookup(self.flow, 'request', 'postData', 'text')

    def get_response_status_code(self):
        """Return the response status as stored in the entry."""
        return self.flow['response']['status']

    def get_response_reason(self):
        """Return the response status text."""
        return self.flow['response']['statusText']

    def get_response_headers(self):
        """Return the response headers as a dict of name -> list of values."""
        return self._collect_headers(self.flow['response']['headers'])

    def get_response_body(self):
        """Return ``response.content.text``, or None when any level is missing."""
        return self._lookup(self.flow, 'response', 'content', 'text')
class HarCaptureReader: class HarCaptureReader:
def __init__(self, file_path: str): def __init__(self, file_path: str, progress_callback=None):
self.file_path = file_path self.file_path = file_path
self.progress_callback = progress_callback
def captured_requests(self) -> Iterator[HarFlowWrapper]: def captured_requests(self) -> Iterator[HarFlowWrapper]:
har_file_size = os.path.getsize(self.file_path)
with open(self.file_path, 'r') as f: with open(self.file_path, 'r') as f:
data = json_stream.load(f) data = json_stream.load(f)
for entry in data['log']['entries']: for entry in data['log']['entries'].persistent():
yield HarFlowWrapper(entry.persistent()) if self.progress_callback:
self.progress_callback(f.tell() / har_file_size)
yield HarFlowWrapper(entry)

@ -13,9 +13,10 @@ import os
import argparse import argparse
import ruamel.yaml import ruamel.yaml
import re import re
import mitmproxy2swagger.swagger_util.swagger_util as swagger_util import swagger_util
from har_capture_reader import HarCaptureReader, har_archive_heuristic
from mitmproxy_capture_reader import MitmproxyCaptureReader, mitmproxy_dump_file_huristic
import console_util
def path_to_regex(path): def path_to_regex(path):
# replace the path template with a regex # replace the path template with a regex
@ -34,7 +35,8 @@ def strip_query_string(path):
def set_key_if_not_exists(dict, key, value): def set_key_if_not_exists(dict, key, value):
if key not in dict: if key not in dict:
dict[key] = value dict[key] = value
def progress_callback(progress):
    """Forward a progress value to the console progress bar.

    Args:
        progress: value passed unchanged to console_util.print_progress_bar().
    """
    console_util.print_progress_bar(progress)
def main(): def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
@ -49,7 +51,12 @@ def main():
args = parser.parse_args() args = parser.parse_args()
yaml = ruamel.yaml.YAML() yaml = ruamel.yaml.YAML()
caputre_reader = None
# detect the input file type
if har_archive_heuristic(args.input) > mitmproxy_dump_file_huristic(args.input):
caputre_reader = HarCaptureReader(args.input, progress_callback)
else:
caputre_reader = MitmproxyCaptureReader(args.input, progress_callback)
swagger = None swagger = None
# try loading the existing swagger file # try loading the existing swagger file
@ -94,35 +101,18 @@ def main():
# new endpoints will be added here so that they can be added as comments in the swagger file # new endpoints will be added here so that they can be added as comments in the swagger file
new_path_templates = [] new_path_templates = []
path_template_regexes = [re.compile(path_to_regex(path)) path_template_regexes = [re.compile(path_to_regex(path))
for path in path_templates] for path in path_templates]
with open(args.input, 'rb') as logfile:
logfile_size = os.path.getsize(args.input)
freader = iom.FlowReader(logfile)
pp = pprint.PrettyPrinter(indent=4)
try: try:
for f in freader.stream(): for f in caputre_reader.captured_requests():
sys.stdout.write("Progress {0:.2f}%%\r".format(
(logfile.tell() / logfile_size * 100)))
# print(f)
if isinstance(f, http.HTTPFlow):
if not f.request.url.startswith(args.api_prefix):
continue
# strip the api prefix from the url # strip the api prefix from the url
url = f.request.url[len(args.api_prefix):] url = f.get_url()
method = f.request.method.lower() if not url.startswith(args.api_prefix):
path = strip_query_string(url)
if f.response is None:
print("[WARN] No response for " + url)
continue continue
status = f.response.status_code method = f.get_method().lower()
path = strip_query_string(url).removeprefix(args.api_prefix)
status = f.get_response_status_code()
# check if the path matches any of the path templates, and save the index # check if the path matches any of the path templates, and save the index
path_template_index = None path_template_index = None
@ -150,10 +140,12 @@ def main():
if params is not None and len(params) > 0: if params is not None and len(params) > 0:
set_key_if_not_exists(swagger['paths'][path_template_to_set][method], 'parameters', params) set_key_if_not_exists(swagger['paths'][path_template_to_set][method], 'parameters', params)
if method not in ['get', 'head']: if method not in ['get', 'head']:
body = f.get_request_body()
if body is not None:
body_val = None body_val = None
# try to parse the body as json # try to parse the body as json
try: try:
body_val = json.loads(f.request.text) body_val = json.loads(f.get_request_body())
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
pass pass
if body_val is not None: if body_val is not None:
@ -170,13 +162,15 @@ def main():
set_key_if_not_exists( set_key_if_not_exists(
swagger['paths'][path_template_to_set][method], 'requestBody', content_to_set) swagger['paths'][path_template_to_set][method], 'requestBody', content_to_set)
# try parsing the response as json # try parsing the response as json
response_body = f.get_response_body()
if response_body is not None:
try: try:
response_json = json.loads(f.response.text) response_json = json.loads(response_body)
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
response_json = None response_json = None
if response_json is not None: if response_json is not None:
resp_data_to_set = { resp_data_to_set = {
'description': f.response.reason, 'description': f.get_response_reason(),
'content': { 'content': {
'application/json': { 'application/json': {
'schema': swagger_util.value_to_schema(response_json) 'schema': swagger_util.value_to_schema(response_json)

@ -3,6 +3,8 @@ from tokenize import Number
from typing import Iterator from typing import Iterator
from mitmproxy import io as iom, http from mitmproxy import io as iom, http
from mitmproxy.exceptions import FlowReadException from mitmproxy.exceptions import FlowReadException
from typing import Iterator
import os import os
def mitmproxy_dump_file_huristic(file_path: str) -> Number: def mitmproxy_dump_file_huristic(file_path: str) -> Number:
@ -17,8 +19,8 @@ def mitmproxy_dump_file_huristic(file_path: str) -> Number:
# if file contains non-ascii characters # if file contains non-ascii characters
if data.decode('utf-8', 'ignore').isprintable() is False: if data.decode('utf-8', 'ignore').isprintable() is False:
val += 50 val += 50
# if first character is a digit # if first character of the byte array is a digit
if data[0].isdigit(): if str(data[0]).isdigit() is True:
val += 5 val += 5
# if it contains the word status_code # if it contains the word status_code
if b'status_code' in data: if b'status_code' in data:
@ -47,6 +49,8 @@ class MitmproxyFlowWrapper:
return self.flow.request.content return self.flow.request.content
def get_response_status_code(self): def get_response_status_code(self):
return self.flow.response.status_code return self.flow.response.status_code
def get_response_reason(self):
return self.flow.response.reason
def get_response_headers(self): def get_response_headers(self):
headers = {} headers = {}
for k, v in self.flow.response.headers.items(multi = True):\ for k, v in self.flow.response.headers.items(multi = True):\
@ -72,6 +76,9 @@ class MitmproxyCaptureReader:
if self.progress_callback: if self.progress_callback:
self.progress_callback(logfile.tell() / logfile_size) self.progress_callback(logfile.tell() / logfile_size)
if isinstance(f, http.HTTPFlow): if isinstance(f, http.HTTPFlow):
if f.response is None:
print("[warn] flow without response: {}".format(f.request.url))
continue
yield MitmproxyFlowWrapper(f) yield MitmproxyFlowWrapper(f)
except FlowReadException as e: except FlowReadException as e:
print(f"Flow file corrupted: {e}") print(f"Flow file corrupted: {e}")

Loading…
Cancel
Save