From cac017ff9b1ba36889f45f0dd5fe5c60a9b6d086 Mon Sep 17 00:00:00 2001 From: alufers Date: Wed, 12 Apr 2023 12:33:54 +0200 Subject: [PATCH] fix: Not matching urls due to a weird flow file This fixes a weird edge case where the mitmproxy library reports the URL to have an IP instead of the hostname --- mitmproxy2swagger/har_capture_reader.py | 12 +++++-- ...oxy_to_swagger.py => mitmproxy2swagger.py} | 5 +-- mitmproxy2swagger/mitmproxy_capture_reader.py | 32 ++++++++++++++++--- 3 files changed, 40 insertions(+), 9 deletions(-) rename mitmproxy2swagger/{mitmproxy_to_swagger.py => mitmproxy2swagger.py} (99%) diff --git a/mitmproxy2swagger/har_capture_reader.py b/mitmproxy2swagger/har_capture_reader.py index b46f9d6..2cf6330 100644 --- a/mitmproxy2swagger/har_capture_reader.py +++ b/mitmproxy2swagger/har_capture_reader.py @@ -1,10 +1,10 @@ import os import json_stream from base64 import b64decode -from typing import Iterator +from typing import Iterator, Union -# a heuristic to determine if a fileis a har archive +# a heuristic to determine if a file is a har archive def har_archive_heuristic(file_path: str) -> int: val = 0 # if has the har extension @@ -37,7 +37,13 @@ class HarFlowWrapper: def get_url(self): return self.flow["request"]["url"] - + def get_matching_url(self, prefix) -> Union[str, None]: + """ + Get the requests URL if the prefix matches the URL, None otherwise + """ + if self.flow["request"]["url"].startswith(prefix): + return self.flow["request"]["url"] + return None def get_method(self): return self.flow["request"]["method"] diff --git a/mitmproxy2swagger/mitmproxy_to_swagger.py b/mitmproxy2swagger/mitmproxy2swagger.py similarity index 99% rename from mitmproxy2swagger/mitmproxy_to_swagger.py rename to mitmproxy2swagger/mitmproxy2swagger.py index bf9cbac..53af975 100644 --- a/mitmproxy2swagger/mitmproxy_to_swagger.py +++ b/mitmproxy2swagger/mitmproxy2swagger.py @@ -165,8 +165,9 @@ def main(): try: for f in capture_reader.captured_requests(): # strip the 
api prefix from the url - url = f.get_url() - if not url.startswith(args.api_prefix): + url = f.get_matching_url(args.api_prefix) + + if url is None: continue method = f.get_method().lower() path = strip_query_string(url).removeprefix(args.api_prefix) diff --git a/mitmproxy2swagger/mitmproxy_capture_reader.py b/mitmproxy2swagger/mitmproxy_capture_reader.py index 6fb9c31..bfed2b7 100644 --- a/mitmproxy2swagger/mitmproxy_capture_reader.py +++ b/mitmproxy2swagger/mitmproxy_capture_reader.py @@ -2,7 +2,8 @@ from typing import Iterator from mitmproxy import io as iom, http from mitmproxy.exceptions import FlowReadException import os - +import typing +from urllib.parse import urlparse def mitmproxy_dump_file_huristic(file_path: str) -> int: val = 0 @@ -31,13 +32,36 @@ class MitmproxyFlowWrapper: def __init__(self, flow: http.HTTPFlow): self.flow = flow - def get_url(self): + def get_url(self) -> str: return self.flow.request.url - def get_method(self): + def get_matching_url(self, prefix) -> typing.Union[str, None]: + """ + Get the requests URL if the prefix matches the URL, None otherwise + + This takes into account a quirk of mitmproxy where it sometimes puts the + raw IP address in the URL instead of the hostname. Then the hostname is + in the Host header. 
+ """ + if self.flow.request.url.startswith(prefix): + return self.flow.request.url + # All the stuff where the real hostname could be + replacement_hostnames = [ + self.flow.request.headers.get("Host", ""), + self.flow.request.host_header, + self.flow.request.host + ] + for replacement_hostname in replacement_hostnames: + if replacement_hostname is not None and replacement_hostname != "": + fixed_url = urlparse(self.flow.request.url)._replace(netloc=replacement_hostname).geturl() + if fixed_url.startswith(prefix): + return fixed_url + return None + + def get_method(self) -> str: return self.flow.request.method - def get_request_headers(self): + def get_request_headers(self) -> dict: headers = {} for k, v in self.flow.request.headers.items(multi=True): # create list on key if it does not exist