parent b5a0fb5b88
commit 6cf8578d06
@@ -0,0 +1,38 @@
import os
from typing import Iterator

import json_stream


# a heuristic to determine if a file is a har archive
def har_archive_heuristic(file_path: str) -> int:
    val = 0
    # if it has the .har extension
    if file_path.endswith('.har'):
        val += 15
    # read the first 2048 bytes
    with open(file_path, 'rb') as f:
        data = f.read(2048)
    # if the file contains only printable characters
    if data.decode('utf-8', 'ignore').isprintable():
        val += 40
    # if the first character is a '{' (compare against bytes, since data is bytes)
    if data[:1] == b'{':
        val += 15
    # if it contains the word '"WebInspector"'
    if b'"WebInspector"' in data:
        val += 15
    # if it contains the word '"entries"'
    if b'"entries"' in data:
        val += 15
    # if it contains the word '"version"'
    if b'"version"' in data:
        val += 15
    return val


class HarCaptureReader:
    def __init__(self, file_path: str):
        self.file_path = file_path

    # HarFlowWrapper is assumed to be defined elsewhere in this module
    # (it is not part of this hunk), hence the string annotation.
    def captured_requests(self) -> Iterator['HarFlowWrapper']:
        with open(self.file_path, 'r') as f:
            data = json_stream.load(f)
            for entry in data['log']['entries']:
                yield HarFlowWrapper(entry.persistent())
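Not part of the diff, but a minimal usage sketch of the code above: the heuristic returns a score rather than a boolean, so a caller can compare it against other format heuristics before constructing a reader. The file path and the threshold below are illustrative assumptions only.

# usage sketch (illustrative, not from this commit)
score = har_archive_heuristic('capture.har')  # 'capture.har' is a placeholder path
if score > 50:  # threshold chosen arbitrarily for illustration
    reader = HarCaptureReader('capture.har')
    for wrapped_entry in reader.captured_requests():
        ...  # each item is a HarFlowWrapper around one element of log.entries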
@@ -0,0 +1,78 @@
from typing import Iterator

from mitmproxy import io as iom, http
from mitmproxy.exceptions import FlowReadException
import os


def mitmproxy_dump_file_huristic(file_path: str) -> int:
    val = 0
    if 'flow' in file_path:
        val += 1
    if 'mitmproxy' in file_path:
        val += 1
    # read the first 2048 bytes
    with open(file_path, 'rb') as f:
        data = f.read(2048)
    # if the file contains non-printable characters
    if not data.decode('utf-8', 'ignore').isprintable():
        val += 50
    # if the first byte is a digit (compare on a bytes slice, since data is bytes)
    if data[:1].isdigit():
        val += 5
    # if it contains the word status_code
    if b'status_code' in data:
        val += 5
    if b'regular' in data:
        val += 10
    return val


class MitmproxyFlowWrapper:
    def __init__(self, flow: http.HTTPFlow):
        self.flow = flow

    def get_url(self):
        return self.flow.request.url

    def get_method(self):
        return self.flow.request.method

    def get_request_headers(self):
        headers = {}
        for k, v in self.flow.request.headers.items(multi=True):
            # create a list for the key if it does not exist yet
            headers[k] = headers.get(k, [])
            headers[k].append(v)
        return headers

    def get_request_body(self):
        return self.flow.request.content

    def get_response_status_code(self):
        return self.flow.response.status_code

    def get_response_headers(self):
        headers = {}
        for k, v in self.flow.response.headers.items(multi=True):
            # create a list for the key if it does not exist yet
            headers[k] = headers.get(k, [])
            headers[k].append(v)
        return headers

    def get_response_body(self):
        return self.flow.response.content


class MitmproxyCaptureReader:
    def __init__(self, file_path, progress_callback=None):
        self.file_path = file_path
        self.progress_callback = progress_callback

    def captured_requests(self) -> Iterator[MitmproxyFlowWrapper]:
        with open(self.file_path, 'rb') as logfile:
            logfile_size = os.path.getsize(self.file_path)
            freader = iom.FlowReader(logfile)
            try:
                for f in freader.stream():
                    if self.progress_callback:
                        self.progress_callback(logfile.tell() / logfile_size)
                    if isinstance(f, http.HTTPFlow):
                        yield MitmproxyFlowWrapper(f)
            except FlowReadException as e:
                print(f"Flow file corrupted: {e}")
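Again not part of the diff, a brief usage sketch: the reader streams flows lazily and reports parse progress as a 0.0-1.0 fraction through the optional callback. The file path below is a placeholder.

# usage sketch (illustrative, not from this commit)
def on_progress(fraction):
    print(f"parsed {fraction:.0%} of the dump")

reader = MitmproxyCaptureReader('flows.mitm', progress_callback=on_progress)  # placeholder path
for flow in reader.captured_requests():
    print(flow.get_method(), flow.get_url())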