commit
60ac5da34c
@ -0,0 +1,72 @@
|
|||||||
|
import sys
|
||||||
|
|
||||||
|
ANSI_RGB = "\033[38;2;{};{};{}m"
|
||||||
|
ANSI_RGB_BG = "\033[48;2;{};{};{}m"
|
||||||
|
ANSI_RESET = "\033[0m"
|
||||||
|
|
||||||
|
RAINBOW_COLORS = [
|
||||||
|
(255, 0, 0),
|
||||||
|
(255, 127, 0),
|
||||||
|
(255, 255, 0),
|
||||||
|
(127, 255, 0),
|
||||||
|
(0, 255, 0),
|
||||||
|
(0, 255, 127),
|
||||||
|
(0, 255, 255),
|
||||||
|
(0, 127, 255),
|
||||||
|
(0, 0, 255),
|
||||||
|
(127, 0, 255),
|
||||||
|
(255, 0, 255),
|
||||||
|
(255, 0, 127),
|
||||||
|
]
|
||||||
|
|
||||||
|
def rgb_interpolate(start, end, progress):
|
||||||
|
return tuple(int(start[i] + (end[i] - start[i]) * progress) for i in range(3))
|
||||||
|
|
||||||
|
# take a value from 0 to 1 and return an interpolated color from the rainbow
|
||||||
|
def rainbow_at_position(progress):
|
||||||
|
idx_a = int(progress * float(len(RAINBOW_COLORS) - 1))
|
||||||
|
idx_b = idx_a + 1
|
||||||
|
return rgb_interpolate(RAINBOW_COLORS[idx_a], RAINBOW_COLORS[idx_b], progress * float(len(RAINBOW_COLORS) - 1) - idx_a)
|
||||||
|
|
||||||
|
def print_progress_bar(progress = 0.0):
|
||||||
|
sys.stdout.write("\r")
|
||||||
|
progress_bar_contents = ""
|
||||||
|
PROGRESS_LENGTH = 30
|
||||||
|
full_block = '█'
|
||||||
|
blocks = [ '▉', '▊', '▋', '▌', '▍', '▎', '▏']
|
||||||
|
block_values = [0.875, 0.75, 0.625, 0.5, 0.375, 0.25, 0.125]
|
||||||
|
rainbow_colors = [
|
||||||
|
(255, 0, 0),
|
||||||
|
(255, 127, 0),
|
||||||
|
(255, 255, 0),
|
||||||
|
(127, 255, 0),
|
||||||
|
(0, 255, 0),
|
||||||
|
(0, 255, 127),
|
||||||
|
(0, 255, 255),
|
||||||
|
(0, 127, 255),
|
||||||
|
(0, 0, 255),
|
||||||
|
(127, 0, 255),
|
||||||
|
(255, 0, 255),
|
||||||
|
(255, 0, 127),
|
||||||
|
|
||||||
|
]
|
||||||
|
for i in range(PROGRESS_LENGTH):
|
||||||
|
interpolated = rainbow_at_position(i / PROGRESS_LENGTH)
|
||||||
|
# check if should print a full block
|
||||||
|
if i < int(progress * PROGRESS_LENGTH):
|
||||||
|
interpolated_2nd_half = rainbow_at_position((i + 0.5) / PROGRESS_LENGTH)
|
||||||
|
progress_bar_contents += ANSI_RGB.format(*interpolated)
|
||||||
|
progress_bar_contents += ANSI_RGB_BG.format(*interpolated_2nd_half)
|
||||||
|
progress_bar_contents += "▌"
|
||||||
|
# check if should print a non-full block
|
||||||
|
elif i < int((progress * PROGRESS_LENGTH) + 0.5):
|
||||||
|
progress_bar_contents += ANSI_RESET
|
||||||
|
progress_bar_contents += ANSI_RGB.format(*interpolated)
|
||||||
|
progress_bar_contents += blocks[int((progress * PROGRESS_LENGTH) + 0.5) - i - 1]
|
||||||
|
# otherwise, print a space
|
||||||
|
else:
|
||||||
|
progress_bar_contents += ANSI_RESET
|
||||||
|
progress_bar_contents += ' '
|
||||||
|
progress_bar_contents += ANSI_RESET
|
||||||
|
sys.stdout.write("[{}] {:.1f}%".format(progress_bar_contents, progress * 100))
|
||||||
|
sys.stdout.flush()
|
@ -0,0 +1,80 @@
|
|||||||
|
import os
|
||||||
|
import json_stream
|
||||||
|
from typing import Iterator
|
||||||
|
# a heuristic to determine if a fileis a har archive
|
||||||
|
def har_archive_heuristic(file_path: str) -> int:
|
||||||
|
val = 0
|
||||||
|
# if has the har extension
|
||||||
|
if file_path.endswith('.har'):
|
||||||
|
val += 15
|
||||||
|
# read the first 2048 bytes
|
||||||
|
with open(file_path, 'rb') as f:
|
||||||
|
data = f.read(2048)
|
||||||
|
# if file contains only ascii characters
|
||||||
|
if data.decode('utf-8', 'ignore').isprintable() is True:
|
||||||
|
val += 40
|
||||||
|
# if first character is a '{'
|
||||||
|
if data[0] == '{':
|
||||||
|
val += 15
|
||||||
|
# if it contains the word '"WebInspector"'
|
||||||
|
if b'"WebInspector"' in data:
|
||||||
|
val += 15
|
||||||
|
# if it contains the word '"entries"'
|
||||||
|
if b'"entries"' in data:
|
||||||
|
val += 15
|
||||||
|
# if it contains the word '"version"'
|
||||||
|
if b'"version"' in data:
|
||||||
|
val += 15
|
||||||
|
return val
|
||||||
|
|
||||||
|
class HarFlowWrapper:
|
||||||
|
def __init__(self, flow: dict):
|
||||||
|
self.flow = flow
|
||||||
|
def get_url(self):
|
||||||
|
return self.flow['request']['url']
|
||||||
|
def get_method(self):
|
||||||
|
return self.flow['request']['method']
|
||||||
|
def get_request_headers(self):
|
||||||
|
headers = {}
|
||||||
|
for kv in self.flow['request']['headers']:
|
||||||
|
k = kv['name']
|
||||||
|
v = kv['value']
|
||||||
|
# create list on key if it does not exist
|
||||||
|
headers[k] = headers.get(k, [])
|
||||||
|
headers[k].append(v)
|
||||||
|
def get_request_body(self):
|
||||||
|
if 'request' in self.flow and 'postData' in self.flow['request'] and 'text' in self.flow['request']['postData']:
|
||||||
|
return self.flow['request']['postData']['text']
|
||||||
|
return None
|
||||||
|
def get_response_status_code(self):
|
||||||
|
return self.flow['response']['status']
|
||||||
|
def get_response_reason(self):
|
||||||
|
return self.flow['response']['statusText']
|
||||||
|
def get_response_headers(self):
|
||||||
|
headers = {}
|
||||||
|
for kv in self.flow['response']['headers']:
|
||||||
|
k = kv['name']
|
||||||
|
v = kv['value']
|
||||||
|
# create list on key if it does not exist
|
||||||
|
headers[k] = headers.get(k, [])
|
||||||
|
headers[k].append(v)
|
||||||
|
return headers
|
||||||
|
def get_response_body(self):
|
||||||
|
if 'response' in self.flow and 'content' in self.flow['response'] and 'text' in self.flow['response']['content']:
|
||||||
|
return self.flow['response']['content']['text']
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class HarCaptureReader:
|
||||||
|
def __init__(self, file_path: str, progress_callback=None):
|
||||||
|
self.file_path = file_path
|
||||||
|
self.progress_callback = progress_callback
|
||||||
|
def captured_requests(self) -> Iterator[HarFlowWrapper]:
|
||||||
|
har_file_size = os.path.getsize(self.file_path)
|
||||||
|
with open(self.file_path, 'r') as f:
|
||||||
|
data = json_stream.load(f)
|
||||||
|
for entry in data['log']['entries'].persistent():
|
||||||
|
if self.progress_callback:
|
||||||
|
self.progress_callback(f.tell() / har_file_size)
|
||||||
|
yield HarFlowWrapper(entry)
|
||||||
|
|
@ -0,0 +1,85 @@
|
|||||||
|
|
||||||
|
from tokenize import Number
|
||||||
|
from typing import Iterator
|
||||||
|
from mitmproxy import io as iom, http
|
||||||
|
from mitmproxy.exceptions import FlowReadException
|
||||||
|
from typing import Iterator
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
def mitmproxy_dump_file_huristic(file_path: str) -> Number:
|
||||||
|
val = 0
|
||||||
|
if 'flow' in file_path:
|
||||||
|
val += 1
|
||||||
|
if 'mitmproxy' in file_path:
|
||||||
|
val += 1
|
||||||
|
# read the first 2048 bytes
|
||||||
|
with open(file_path, 'rb') as f:
|
||||||
|
data = f.read(2048)
|
||||||
|
# if file contains non-ascii characters
|
||||||
|
if data.decode('utf-8', 'ignore').isprintable() is False:
|
||||||
|
val += 50
|
||||||
|
# if first character of the byte array is a digit
|
||||||
|
if str(data[0]).isdigit() is True:
|
||||||
|
val += 5
|
||||||
|
# if it contains the word status_code
|
||||||
|
if b'status_code' in data:
|
||||||
|
val += 5
|
||||||
|
if b'regular' in data:
|
||||||
|
val += 10
|
||||||
|
return val
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class MitmproxyFlowWrapper:
|
||||||
|
def __init__(self, flow: http.HTTPFlow):
|
||||||
|
self.flow = flow
|
||||||
|
def get_url(self):
|
||||||
|
return self.flow.request.url
|
||||||
|
def get_method(self):
|
||||||
|
return self.flow.request.method
|
||||||
|
def get_request_headers(self):
|
||||||
|
headers = {}
|
||||||
|
for k, v in self.flow.request.headers.items(multi = True):\
|
||||||
|
# create list on key if it does not exist
|
||||||
|
headers[k] = headers.get(k, [])
|
||||||
|
headers[k].append(v)
|
||||||
|
return headers
|
||||||
|
def get_request_body(self):
|
||||||
|
return self.flow.request.content
|
||||||
|
def get_response_status_code(self):
|
||||||
|
return self.flow.response.status_code
|
||||||
|
def get_response_reason(self):
|
||||||
|
return self.flow.response.reason
|
||||||
|
def get_response_headers(self):
|
||||||
|
headers = {}
|
||||||
|
for k, v in self.flow.response.headers.items(multi = True):\
|
||||||
|
# create list on key if it does not exist
|
||||||
|
headers[k] = headers.get(k, [])
|
||||||
|
headers[k].append(v)
|
||||||
|
return headers
|
||||||
|
def get_response_body(self):
|
||||||
|
return self.flow.response.content
|
||||||
|
|
||||||
|
|
||||||
|
class MitmproxyCaptureReader:
|
||||||
|
def __init__(self, file_path, progress_callback=None):
|
||||||
|
self.file_path = file_path
|
||||||
|
self.progress_callback = progress_callback
|
||||||
|
|
||||||
|
def captured_requests(self) -> Iterator[MitmproxyFlowWrapper]:
|
||||||
|
with open(self.file_path, 'rb') as logfile:
|
||||||
|
logfile_size = os.path.getsize(self.file_path)
|
||||||
|
freader = iom.FlowReader(logfile)
|
||||||
|
try:
|
||||||
|
for f in freader.stream():
|
||||||
|
if self.progress_callback:
|
||||||
|
self.progress_callback(logfile.tell() / logfile_size)
|
||||||
|
if isinstance(f, http.HTTPFlow):
|
||||||
|
if f.response is None:
|
||||||
|
print("[warn] flow without response: {}".format(f.request.url))
|
||||||
|
continue
|
||||||
|
yield MitmproxyFlowWrapper(f)
|
||||||
|
except FlowReadException as e:
|
||||||
|
print(f"Flow file corrupted: {e}")
|
||||||
|
|
Loading…
Reference in new issue