alufers 3 years ago
commit 81aea50e86

@ -8,9 +8,9 @@
"name": "Python: mitmproxy2swagger.py",
"type": "python",
"request": "launch",
"program": "${workspaceRoot}/mitmproxy2swagger.py",
"program": "${workspaceRoot}/mitmproxy2swagger/mitmproxy2swagger.py",
"cwd": "${workspaceRoot}",
"args": ["-i", "flows/flows_lisek_filtered", "-o", "ass.yaml", "-p", "https://api2.lisek.app/api"],
"args": ["-i", "flows/www.x-kom.pl.har", "-o", "ass.yaml", "-p", "https://mobileapi.x-kom.pl"],
"console": "integratedTerminal",
"justMyCode": true
}

@ -0,0 +1,72 @@
import sys
# ANSI escape-sequence templates for 24-bit color output. The three "{}"
# placeholders are filled with the red, green and blue components.
ANSI_RGB = "\033[38;2;{};{};{}m"  # selects the foreground color
ANSI_RGB_BG = "\033[48;2;{};{};{}m"  # selects the background color
ANSI_RESET = "\033[0m"  # clears every color attribute set so far
# Color stops, as (red, green, blue) tuples, that rainbow_at_position()
# interpolates between.
RAINBOW_COLORS = [
    (255, 0, 0),
    (255, 127, 0),
    (255, 255, 0),
    (127, 255, 0),
    (0, 255, 0),
    (0, 255, 127),
    (0, 255, 255),
    (0, 127, 255),
    (0, 0, 255),
    (127, 0, 255),
    (255, 0, 255),
    (255, 0, 127),
]
def rgb_interpolate(start, end, progress):
    """Blend two RGB triples linearly.

    Args:
        start: Color returned when ``progress`` is 0.
        end: Color returned when ``progress`` is 1.
        progress: Blend factor applied to each of the three channels.

    Returns:
        A 3-tuple of ints; each channel is truncated with ``int()``.
    """
    channels = []
    for channel in range(3):
        low = start[channel]
        span = end[channel] - low
        channels.append(int(low + span * progress))
    return tuple(channels)
def rainbow_at_position(progress):
    """Return the rainbow color at a relative position along the palette.

    Args:
        progress: Position along RAINBOW_COLORS, where 0 is the first color
            stop and 1 is the last. Values outside that range are clamped.

    Returns:
        An (r, g, b) tuple interpolated between the two nearest color stops.
    """
    last_index = len(RAINBOW_COLORS) - 1
    # Clamp first: a negative position would otherwise index the palette
    # from its end, and a position of 1 would index one past the last stop.
    scaled = min(max(progress, 0.0), 1.0) * last_index
    # The left stop may be at most the second-to-last entry so that the
    # right stop always exists; a position of exactly 1 then blends fully
    # into the final color.
    idx_a = min(int(scaled), last_index - 1)
    idx_b = idx_a + 1
    return rgb_interpolate(RAINBOW_COLORS[idx_a], RAINBOW_COLORS[idx_b], scaled - idx_a)
def print_progress_bar(progress=0.0):
    """Redraw a rainbow-colored progress bar on the current terminal line.

    The bar is written to stdout after a carriage return, so repeated calls
    overwrite the previous bar instead of scrolling.

    Args:
        progress: Completed fraction, where 0.0 is empty and 1.0 is full.
    """
    sys.stdout.write("\r")
    PROGRESS_LENGTH = 30
    # Partial-cell glyphs, widest (7/8 of a cell) first.
    blocks = ['▉', '▊', '▋', '▌', '▍', '▎', '▏']
    filled_cells = int(progress * PROGRESS_LENGTH)
    # Rounding at the half-way point decides whether one extra, partially
    # filled cell follows the completely filled ones.
    rounded_cells = int((progress * PROGRESS_LENGTH) + 0.5)

    # Collect the pieces and join once instead of growing a string in a loop.
    pieces = []
    for i in range(PROGRESS_LENGTH):
        interpolated = rainbow_at_position(i / PROGRESS_LENGTH)
        if i < filled_cells:
            # A left-half block shows the foreground color on its left half
            # and the background color on its right half, so a single cell
            # carries two steps of the gradient.
            interpolated_2nd_half = rainbow_at_position((i + 0.5) / PROGRESS_LENGTH)
            pieces.append(ANSI_RGB.format(*interpolated))
            pieces.append(ANSI_RGB_BG.format(*interpolated_2nd_half))
            pieces.append('▌')
        elif i < rounded_cells:
            # Drop the background color before drawing the partial cell.
            pieces.append(ANSI_RESET)
            pieces.append(ANSI_RGB.format(*interpolated))
            pieces.append(blocks[rounded_cells - i - 1])
        else:
            # Cells that are not reached yet stay blank.
            pieces.append(ANSI_RESET)
            pieces.append(' ')
    pieces.append(ANSI_RESET)

    sys.stdout.write("[{}] {:.1f}%".format("".join(pieces), progress * 100))
    sys.stdout.flush()

@ -0,0 +1,80 @@
import os
import json_stream
from typing import Iterator
def har_archive_heuristic(file_path: str) -> int:
    """Score how likely it is that a file is a HAR archive.

    The score is a sum of independent hints: the file extension plus several
    checks on the first 2048 bytes of the file. It is only meaningful when
    compared with the score of a competing file-type heuristic.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        A non-negative score; larger means more HAR-like.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    val = 0
    if file_path.endswith('.har'):
        val += 15
    # Only the head of the file is inspected.
    with open(file_path, 'rb') as f:
        data = f.read(2048)
    # A UTF-8 byte order mark would otherwise defeat both the text check and
    # the leading-brace check below.
    if data.startswith(b'\xef\xbb\xbf'):
        data = data[3:]
    if not data:
        # An empty file offers no evidence about its content.
        return val
    text = data.decode('utf-8', 'ignore')
    # str.isprintable() rejects newlines and tabs, so they are accepted
    # explicitly; otherwise pretty-printed JSON would never count as text.
    if all(ch.isprintable() or ch in '\t\r\n' for ch in text):
        val += 40
    # Compare bytes with bytes: indexing a bytes object yields an int, which
    # never equals the str '{'.
    if data.startswith(b'{'):
        val += 15
    # Key names that HAR files typically contain near the start.
    for marker in (b'"WebInspector"', b'"entries"', b'"version"'):
        if marker in data:
            val += 15
    return val
class HarFlowWrapper:
    """Accessor wrapper around one item of a HAR file's ``entries`` list."""

    def __init__(self, flow: dict):
        # The HAR entry, holding 'request' and 'response' sub-objects.
        self.flow = flow

    @staticmethod
    def _group_headers(header_records):
        """Group HAR name/value header records into a dict of value lists."""
        headers = {}
        for kv in header_records:
            # A header name may repeat, so every name maps to a list.
            headers.setdefault(kv['name'], []).append(kv['value'])
        return headers

    def get_url(self):
        """Return the request URL."""
        return self.flow['request']['url']

    def get_method(self):
        """Return the request method exactly as recorded."""
        return self.flow['request']['method']

    def get_request_headers(self):
        """Return the request headers as a dict mapping name to a list of values."""
        return self._group_headers(self.flow['request']['headers'])

    def get_request_body(self):
        """Return the request body text, or None when the entry records none."""
        if 'request' not in self.flow:
            return None
        request = self.flow['request']
        if 'postData' in request and 'text' in request['postData']:
            return request['postData']['text']
        return None

    def get_response_status_code(self):
        """Return the response status code."""
        return self.flow['response']['status']

    def get_response_reason(self):
        """Return the response status text."""
        return self.flow['response']['statusText']

    def get_response_headers(self):
        """Return the response headers as a dict mapping name to a list of values."""
        return self._group_headers(self.flow['response']['headers'])

    def get_response_body(self):
        """Return the response body text, or None when the entry records none."""
        if 'response' not in self.flow:
            return None
        response = self.flow['response']
        if 'content' in response and 'text' in response['content']:
            return response['content']['text']
        return None
class HarCaptureReader:
    """Streams the entries of a HAR archive as HarFlowWrapper objects."""

    def __init__(self, file_path: str, progress_callback=None):
        """Remember the archive path and an optional progress callback.

        Args:
            file_path: Path of the HAR file to read.
            progress_callback: Optional callable that receives the fraction
                of the file consumed so far each time an entry is produced.
        """
        self.file_path = file_path
        self.progress_callback = progress_callback

    def captured_requests(self) -> Iterator[HarFlowWrapper]:
        """Yield one HarFlowWrapper per item of the archive's log.entries list."""
        har_file_size = os.path.getsize(self.file_path)
        # HAR files are UTF-8 and may start with a byte order mark, so the
        # encoding is given explicitly rather than left to the platform
        # default; 'utf-8-sig' drops the mark when it is present.
        with open(self.file_path, 'r', encoding='utf-8-sig') as f:
            data = json_stream.load(f)
            for entry in data['log']['entries'].persistent():
                if self.progress_callback:
                    # tell() on a text file returns an opaque cookie rather
                    # than a byte count, so progress is taken from the
                    # underlying binary buffer. It advances one read-ahead
                    # chunk at a time and never exceeds the file size.
                    self.progress_callback(f.buffer.tell() / har_file_size)
                yield HarFlowWrapper(entry)

@ -13,9 +13,10 @@ import os
import argparse
import ruamel.yaml
import re
import mitmproxy2swagger.swagger_util.swagger_util as swagger_util
import swagger_util
from har_capture_reader import HarCaptureReader, har_archive_heuristic
from mitmproxy_capture_reader import MitmproxyCaptureReader, mitmproxy_dump_file_huristic
import console_util
def path_to_regex(path):
# replace the path template with a regex
@ -34,7 +35,8 @@ def strip_query_string(path):
def set_key_if_not_exists(dict, key, value):
    """Store ``value`` under ``key`` unless the mapping already has that key."""
    if key in dict:
        # Keep whatever is stored already.
        return
    dict[key] = value
def progress_callback(progress):
    """Show the given progress fraction on the console progress bar."""
    draw_bar = console_util.print_progress_bar
    draw_bar(progress)
def main():
parser = argparse.ArgumentParser(
@ -49,7 +51,12 @@ def main():
args = parser.parse_args()
yaml = ruamel.yaml.YAML()
caputre_reader = None
# detect the input file type
if har_archive_heuristic(args.input) > mitmproxy_dump_file_huristic(args.input):
caputre_reader = HarCaptureReader(args.input, progress_callback)
else:
caputre_reader = MitmproxyCaptureReader(args.input, progress_callback)
swagger = None
# try loading the existing swagger file
@ -96,97 +103,88 @@ def main():
path_template_regexes = [re.compile(path_to_regex(path))
for path in path_templates]
with open(args.input, 'rb') as logfile:
logfile_size = os.path.getsize(args.input)
freader = iom.FlowReader(logfile)
pp = pprint.PrettyPrinter(indent=4)
try:
for f in freader.stream():
sys.stdout.write("Progress {0:.2f}%%\r".format(
(logfile.tell() / logfile_size * 100)))
# print(f)
if isinstance(f, http.HTTPFlow):
if not f.request.url.startswith(args.api_prefix):
continue
# strip the api prefix from the url
url = f.request.url[len(args.api_prefix):]
method = f.request.method.lower()
path = strip_query_string(url)
if f.response is None:
print("[WARN] No response for " + url)
continue
status = f.response.status_code
# check if the path matches any of the path templates, and save the index
path_template_index = None
for i, path_template_regex in enumerate(path_template_regexes):
if path_template_regex.match(path):
path_template_index = i
break
if path_template_index is None:
if path in new_path_templates:
continue
new_path_templates.append(path)
continue
path_template_to_set = path_templates[path_template_index]
set_key_if_not_exists(
swagger['paths'], path_template_to_set, {})
set_key_if_not_exists(swagger['paths'][path_template_to_set], method, {
'summary': swagger_util.path_template_to_endpoint_name(method, path_template_to_set),
'responses': {}
})
params = swagger_util.url_to_params(url, path_template_to_set)
if params is not None and len(params) > 0:
set_key_if_not_exists(swagger['paths'][path_template_to_set][method], 'parameters', params)
if method not in ['get', 'head']:
body_val = None
# try to parse the body as json
try:
body_val = json.loads(f.request.text)
except json.decoder.JSONDecodeError:
pass
if body_val is not None:
content_to_set = {
'content': {
'application/json': {
'schema': swagger_util.value_to_schema(body_val)
}
}
}
if args.examples:
content_to_set['content']['application/json']['example'] = swagger_util.limit_example_size(
body_val)
set_key_if_not_exists(
swagger['paths'][path_template_to_set][method], 'requestBody', content_to_set)
# try parsing the response as json
try:
for f in caputre_reader.captured_requests():
# strip the api prefix from the url
url = f.get_url()
if not url.startswith(args.api_prefix):
continue
method = f.get_method().lower()
path = strip_query_string(url).removeprefix(args.api_prefix)
status = f.get_response_status_code()
# check if the path matches any of the path templates, and save the index
path_template_index = None
for i, path_template_regex in enumerate(path_template_regexes):
if path_template_regex.match(path):
path_template_index = i
break
if path_template_index is None:
if path in new_path_templates:
continue
new_path_templates.append(path)
continue
path_template_to_set = path_templates[path_template_index]
set_key_if_not_exists(
swagger['paths'], path_template_to_set, {})
set_key_if_not_exists(swagger['paths'][path_template_to_set], method, {
'summary': swagger_util.path_template_to_endpoint_name(method, path_template_to_set),
'responses': {}
})
params = swagger_util.url_to_params(url, path_template_to_set)
if params is not None and len(params) > 0:
set_key_if_not_exists(swagger['paths'][path_template_to_set][method], 'parameters', params)
if method not in ['get', 'head']:
body = f.get_request_body()
if body is not None:
body_val = None
# try to parse the body as json
try:
response_json = json.loads(f.response.text)
body_val = json.loads(f.get_request_body())
except json.decoder.JSONDecodeError:
response_json = None
if response_json is not None:
resp_data_to_set = {
'description': f.response.reason,
pass
if body_val is not None:
content_to_set = {
'content': {
'application/json': {
'schema': swagger_util.value_to_schema(response_json)
'schema': swagger_util.value_to_schema(body_val)
}
}
}
if args.examples:
resp_data_to_set['content']['application/json']['example'] = swagger_util.limit_example_size(
response_json)
set_key_if_not_exists(swagger['paths'][path_template_to_set][method]['responses'], str(
status), resp_data_to_set)
except FlowReadException as e:
print(f"Flow file corrupted: {e}")
content_to_set['content']['application/json']['example'] = swagger_util.limit_example_size(
body_val)
set_key_if_not_exists(
swagger['paths'][path_template_to_set][method], 'requestBody', content_to_set)
# try parsing the response as json
response_body = f.get_response_body()
if response_body is not None:
try:
response_json = json.loads(response_body)
except json.decoder.JSONDecodeError:
response_json = None
if response_json is not None:
resp_data_to_set = {
'description': f.get_response_reason(),
'content': {
'application/json': {
'schema': swagger_util.value_to_schema(response_json)
}
}
}
if args.examples:
resp_data_to_set['content']['application/json']['example'] = swagger_util.limit_example_size(
response_json)
set_key_if_not_exists(swagger['paths'][path_template_to_set][method]['responses'], str(
status), resp_data_to_set)
except FlowReadException as e:
print(f"Flow file corrupted: {e}")
new_path_templates.sort()

@ -0,0 +1,85 @@
from tokenize import Number
from typing import Iterator
from mitmproxy import io as iom, http
from mitmproxy.exceptions import FlowReadException
from typing import Iterator
import os
def mitmproxy_dump_file_huristic(file_path: str) -> int:
    """Score how likely it is that a file is a mitmproxy dump.

    The score is a sum of independent hints: words in the path plus several
    checks on the first 2048 bytes of the file. It is only meaningful when
    compared with the score of a competing file-type heuristic.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        A non-negative score; larger means more dump-like.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    val = 0
    if 'flow' in file_path:
        val += 1
    if 'mitmproxy' in file_path:
        val += 1
    # Only the head of the file is inspected.
    with open(file_path, 'rb') as f:
        data = f.read(2048)
    # Anything str.isprintable() rejects counts as a hint of binary content.
    if not data.decode('utf-8', 'ignore').isprintable():
        val += 50
    # Slice rather than index: data[0] is an int whose str() is always
    # numeric, and indexing fails on an empty file. The slice stays a bytes
    # object, so isdigit() really tests for an ASCII digit.
    if data[:1].isdigit():
        val += 5
    if b'status_code' in data:
        val += 5
    if b'regular' in data:
        val += 10
    return val
class MitmproxyFlowWrapper:
    """Accessor wrapper around a mitmproxy HTTP flow."""

    def __init__(self, flow: http.HTTPFlow):
        self.flow = flow

    def get_url(self):
        """Return the request URL."""
        request = self.flow.request
        return request.url

    def get_method(self):
        """Return the request method."""
        request = self.flow.request
        return request.method

    def get_request_headers(self):
        """Return the request headers as a dict mapping name to a list of values."""
        grouped = {}
        for name, value in self.flow.request.headers.items(multi=True):
            # A header name may repeat, so every name maps to a list.
            grouped.setdefault(name, []).append(value)
        return grouped

    def get_request_body(self):
        """Return the request's ``content`` attribute."""
        request = self.flow.request
        return request.content

    def get_response_status_code(self):
        """Return the response status code."""
        response = self.flow.response
        return response.status_code

    def get_response_reason(self):
        """Return the response reason phrase."""
        response = self.flow.response
        return response.reason

    def get_response_headers(self):
        """Return the response headers as a dict mapping name to a list of values."""
        grouped = {}
        for name, value in self.flow.response.headers.items(multi=True):
            # A header name may repeat, so every name maps to a list.
            grouped.setdefault(name, []).append(value)
        return grouped

    def get_response_body(self):
        """Return the response's ``content`` attribute."""
        response = self.flow.response
        return response.content
class MitmproxyCaptureReader:
    """Streams the HTTP flows of a mitmproxy dump as MitmproxyFlowWrapper objects."""

    def __init__(self, file_path, progress_callback=None):
        """Remember the dump path and an optional progress callback.

        Args:
            file_path: Path of the dump file to read.
            progress_callback: Optional callable that receives the fraction
                of the file consumed so far after each flow is read.
        """
        self.progress_callback = progress_callback
        self.file_path = file_path

    def captured_requests(self) -> Iterator[MitmproxyFlowWrapper]:
        """Yield a wrapper for every HTTP flow that has a response.

        Flows without a response are reported on stdout and skipped. A
        corrupted dump ends the iteration with a message instead of raising.
        """
        with open(self.file_path, 'rb') as dump:
            total_bytes = os.path.getsize(self.file_path)
            reader = iom.FlowReader(dump)
            try:
                for flow in reader.stream():
                    if self.progress_callback:
                        self.progress_callback(dump.tell() / total_bytes)
                    # Non-HTTP flows are ignored.
                    if not isinstance(flow, http.HTTPFlow):
                        continue
                    if flow.response is None:
                        print("[warn] flow without response: {}".format(flow.request.url))
                        continue
                    yield MitmproxyFlowWrapper(flow)
            except FlowReadException as e:
                print(f"Flow file corrupted: {e}")

88
poetry.lock generated

@ -6,6 +6,9 @@ category = "main"
optional = false
python-versions = ">=3.7"
[package.dependencies]
typing-extensions = {version = "*", markers = "python_version < \"3.8\""}
[package.extras]
tests = ["pytest", "pytest-asyncio", "mypy (>=0.800)"]
@ -54,6 +57,7 @@ python-versions = ">=3.7"
[package.dependencies]
colorama = {version = "*", markers = "platform_system == \"Windows\""}
importlib-metadata = {version = "*", markers = "python_version < \"3.8\""}
[[package]]
name = "colorama"
@ -82,6 +86,14 @@ sdist = ["setuptools_rust (>=0.11.4)"]
ssh = ["bcrypt (>=3.1.5)"]
test = ["pytest (>=6.2.0)", "pytest-cov", "pytest-subtests", "pytest-xdist", "pretend", "iso8601", "pytz", "hypothesis (>=1.11.4,!=3.79.2)"]
[[package]]
name = "dataclasses"
version = "0.8"
description = "A backport of the dataclasses module for Python 3.6"
category = "main"
optional = false
python-versions = ">=3.6, <3.7"
[[package]]
name = "flask"
version = "2.0.3"
@ -108,6 +120,10 @@ category = "main"
optional = false
python-versions = ">=3.6"
[package.dependencies]
dataclasses = {version = "*", markers = "python_version < \"3.7\""}
typing-extensions = {version = "*", markers = "python_version < \"3.8\""}
[[package]]
name = "h2"
version = "4.1.0"
@ -136,6 +152,23 @@ category = "main"
optional = false
python-versions = ">=3.6.1"
[[package]]
name = "importlib-metadata"
version = "4.11.3"
description = "Read metadata from Python packages"
category = "main"
optional = false
python-versions = ">=3.7"
[package.dependencies]
typing-extensions = {version = ">=3.6.4", markers = "python_version < \"3.8\""}
zipp = ">=0.5"
[package.extras]
docs = ["sphinx", "jaraco.packaging (>=9)", "rst.linker (>=1.9)"]
perf = ["ipython"]
testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-cov", "pytest-enabler (>=1.0.1)", "packaging", "pyfakefs", "flufl.flake8", "pytest-perf (>=0.9.2)", "pytest-black (>=0.3.7)", "pytest-mypy (>=0.9.1)", "importlib-resources (>=1.3)"]
[[package]]
name = "itsdangerous"
version = "2.1.2"
@ -158,6 +191,17 @@ MarkupSafe = ">=2.0"
[package.extras]
i18n = ["Babel (>=2.7)"]
[[package]]
name = "json-stream"
version = "1.3.0"
description = "Streaming JSON decoder"
category = "main"
optional = false
python-versions = "<4,>=3.5"
[package.extras]
requests = ["requests"]
[[package]]
name = "kaitaistruct"
version = "0.9"
@ -362,6 +406,14 @@ category = "main"
optional = false
python-versions = ">= 3.5"
[[package]]
name = "typing-extensions"
version = "4.1.1"
description = "Backported and Experimental Type Hints for Python 3.6+"
category = "main"
optional = false
python-versions = ">=3.6"
[[package]]
name = "urwid"
version = "2.1.2"
@ -392,6 +444,18 @@ python-versions = ">=3.7.0"
[package.dependencies]
h11 = ">=0.9.0,<1"
[[package]]
name = "zipp"
version = "3.8.0"
description = "Backport of pathlib-compatible object wrapper for zip files"
category = "main"
optional = false
python-versions = ">=3.7"
[package.extras]
docs = ["sphinx", "jaraco.packaging (>=9)", "rst.linker (>=1.9)"]
testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-cov", "pytest-enabler (>=1.0.1)", "jaraco.itertools", "func-timeout", "pytest-black (>=0.3.7)", "pytest-mypy (>=0.9.1)"]
[[package]]
name = "zstandard"
version = "0.17.0"
@ -408,8 +472,8 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "1.1"
python-versions = "^3.10"
content-hash = "6f13a784668fbbf96c0c732392cf912f243163fa01a7dd6ea4a7938d1ff84d32"
python-versions = "^3.5"
content-hash = "8d0c85f2b28deebc09b485b5b606fbb73a1d3129453125a9d483798dc9a924a8"
[metadata.files]
asgiref = [
@ -569,6 +633,10 @@ cryptography = [
{file = "cryptography-36.0.2-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:e167b6b710c7f7bc54e67ef593f8731e1f45aa35f8a8a7b72d6e42ec76afd4b3"},
{file = "cryptography-36.0.2.tar.gz", hash = "sha256:70f8f4f7bb2ac9f340655cbac89d68c527af5bb4387522a8413e841e3e6628c9"},
]
dataclasses = [
{file = "dataclasses-0.8-py3-none-any.whl", hash = "sha256:0201d89fa866f68c8ebd9d08ee6ff50c0b255f8ec63a71c16fda7af82bb887bf"},
{file = "dataclasses-0.8.tar.gz", hash = "sha256:8479067f342acf957dc82ec415d355ab5edb7e7646b90dc6e2fd1d96ad084c97"},
]
flask = [
{file = "Flask-2.0.3-py3-none-any.whl", hash = "sha256:59da8a3170004800a2837844bfa84d49b022550616070f7cb1a659682b2e7c9f"},
{file = "Flask-2.0.3.tar.gz", hash = "sha256:e1120c228ca2f553b470df4a5fa927ab66258467526069981b3eb0a91902687d"},
@ -589,6 +657,10 @@ hyperframe = [
{file = "hyperframe-6.0.1-py3-none-any.whl", hash = "sha256:0ec6bafd80d8ad2195c4f03aacba3a8265e57bc4cff261e802bf39970ed02a15"},
{file = "hyperframe-6.0.1.tar.gz", hash = "sha256:ae510046231dc8e9ecb1a6586f63d2347bf4c8905914aa84ba585ae85f28a914"},
]
importlib-metadata = [
{file = "importlib_metadata-4.11.3-py3-none-any.whl", hash = "sha256:1208431ca90a8cca1a6b8af391bb53c1a2db74e5d1cef6ddced95d4b2062edc6"},
{file = "importlib_metadata-4.11.3.tar.gz", hash = "sha256:ea4c597ebf37142f827b8f39299579e31685c31d3a438b59f469406afd0f2539"},
]
itsdangerous = [
{file = "itsdangerous-2.1.2-py3-none-any.whl", hash = "sha256:2c2349112351b88699d8d4b6b075022c0808887cb7ad10069318a8b0bc88db44"},
{file = "itsdangerous-2.1.2.tar.gz", hash = "sha256:5dbbc68b317e5e42f327f9021763545dc3fc3bfe22e6deb96aaf1fc38874156a"},
@ -597,6 +669,10 @@ jinja2 = [
{file = "Jinja2-3.1.2-py3-none-any.whl", hash = "sha256:6088930bfe239f0e6710546ab9c19c9ef35e29792895fed6e6e31a023a182a61"},
{file = "Jinja2-3.1.2.tar.gz", hash = "sha256:31351a702a408a9e7595a8fc6150fc3f43bb6bf7e319770cbc0db9df9437e852"},
]
json-stream = [
{file = "json-stream-1.3.0.tar.gz", hash = "sha256:2790c16bccde6a77640c85e911a44de9c9de648206d9a4474b9ce398cfc7d14c"},
{file = "json_stream-1.3.0-py3-none-any.whl", hash = "sha256:bbb8bc29eed00d53d245224c893831c995bedc18b15fea08ed00c0cb485913ef"},
]
kaitaistruct = [
{file = "kaitaistruct-0.9.tar.gz", hash = "sha256:3d5845817ec8a4d5504379cc11bd570b038850ee49c4580bc0998c8fb1d327ad"},
]
@ -836,6 +912,10 @@ tornado = [
{file = "tornado-6.1-cp39-cp39-win_amd64.whl", hash = "sha256:548430be2740e327b3fe0201abe471f314741efcb0067ec4f2d7dcfb4825f3e4"},
{file = "tornado-6.1.tar.gz", hash = "sha256:33c6e81d7bd55b468d2e793517c909b139960b6c790a60b7991b9b6b76fb9791"},
]
typing-extensions = [
{file = "typing_extensions-4.1.1-py3-none-any.whl", hash = "sha256:21c85e0fe4b9a155d0799430b0ad741cdce7e359660ccbd8b530613e8df88ce2"},
{file = "typing_extensions-4.1.1.tar.gz", hash = "sha256:1a9462dcc3347a79b1f1c0271fbe79e844580bb598bafa1ed208b94da3cdcd42"},
]
urwid = [
{file = "urwid-2.1.2.tar.gz", hash = "sha256:588bee9c1cb208d0906a9f73c613d2bd32c3ed3702012f51efe318a3f2127eae"},
]
@ -847,6 +927,10 @@ wsproto = [
{file = "wsproto-1.1.0-py3-none-any.whl", hash = "sha256:2218cb57952d90b9fca325c0dcfb08c3bda93e8fd8070b0a17f048e2e47a521b"},
{file = "wsproto-1.1.0.tar.gz", hash = "sha256:a2e56bfd5c7cd83c1369d83b5feccd6d37798b74872866e62616e0ecf111bda8"},
]
zipp = [
{file = "zipp-3.8.0-py3-none-any.whl", hash = "sha256:c4f6e5bbf48e74f7a38e7cc5b0480ff42b0ae5178957d564d18932525d5cf099"},
{file = "zipp-3.8.0.tar.gz", hash = "sha256:56bf8aadb83c24db6c4b577e13de374ccfb67da2078beba1d037c17980bf43ad"},
]
zstandard = [
{file = "zstandard-0.17.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:a1991cdf2e81e643b53fb8d272931d2bdf5f4e70d56a457e1ef95bde147ae627"},
{file = "zstandard-0.17.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:4768449d8d1b0785309ace288e017cc5fa42e11a52bf08c90d9c3eb3a7a73cc6"},

@ -9,6 +9,7 @@ readme = "README.md"
python = "^3.5"
mitmproxy = "^8.0.0"
"ruamel.yaml" = "^0.17.21"
json-stream = "^1.3.0"
[tool.poetry.dev-dependencies]

Loading…
Cancel
Save