add test for generic keys

pull/148/head
Tim Vahlbrock 2 years ago
parent d75efb07c0
commit f4c5404b7f
No known key found for this signature in database

@ -53,6 +53,33 @@ def test_mitmproxy2swagger_generates_swagger_from_mitmproxy_flow_file_with_form_
)
def test_mitmproxy2swagger_generates_swagger_from_mitmproxy_flow_file_with_generic_keys():
data = mitmproxy2swagger_e2e_test(
"testdata/generic_keys_flows",
"http://localhost:8082/",
[
"--format",
"flow",
],
)
assert data is not None
assert (
get_nested_key(
data,
"paths./.post.responses.200.content.application/json.schema.properties",
)
is None
)
assert (
get_nested_key(
data,
"paths./.post.responses.200.content.application/json.schema.additionalProperties",
)
is not None
)
def test_mitmproxy2swagger_generates_headers_for_flow_files():
data = mitmproxy2swagger_e2e_test(
"testdata/form_data_flows",

@ -0,0 +1 @@
2333:9:websocket;0:~8:response;539:6:reason;2:OK,11:status_code;3:200#13:timestamp_end;17:1709845739.967148^15:timestamp_start;17:1709845739.966887^8:trailers;0:~7:content;200:{"1234": {"lorem": "ipsum", "dolor": "sit", "amet": "consectetur"}, "5678": {"lorem": "ipsum", "dolor": "sit", "amet": "consectetur"}, "789": {"lorem": "ipsum", "dolor": "sit", "amet": "consectetur"}},7:headers;155:39:6:Server,26:BaseHTTP/0.6 Python/3.11.7,]40:4:Date,29:Thu, 07 Mar 2024 21:08:59 GMT,]36:12:Content-type,16:application/json,]24:14:Content-length,3:200,]]12:http_version;8:HTTP/1.0,}7:request;607:4:path;1:/,9:authority;0:,6:scheme;4:http,6:method;4:POST,4:port;4:8082#4:host;9:localhost;13:timestamp_end;18:1709845739.9647229^15:timestamp_start;17:1709845739.964001^8:trailers;0:~7:content;134:{"1234": {"lorem": "ipsum", "dolor": "sit", "amet": "consectetur"}, "5678": {"lorem": "ipsum", "dolor": "sit", "amet": "consectetur"}},7:headers;232:25:4:Host,14:localhost:8082,]40:10:User-Agent,22:python-requests/2.31.0,]36:15:Accept-Encoding,13:gzip, deflate,]15:6:Accept,3:*/*,]28:10:Connection,10:keep-alive,]36:12:Content-Type,16:application/json,]24:14:Content-Length,3:134,]]12:http_version;8:HTTP/1.1,}6:backup;0:~17:timestamp_created;17:1709845739.964161^7:comment;0:;8:metadata;0:}6:marked;0:;9:is_replay;0:~11:intercepted;5:false!11:server_conn;464:3:via;0:~19:timestamp_tcp_setup;17:1709845739.966096^7:address;19:9:localhost;4:8082#]19:timestamp_tls_setup;0:~13:timestamp_end;17:1709845739.967722^15:timestamp_start;16:1709845739.96509^3:sni;0:~11:tls_version;0:~11:cipher_list;0:]6:cipher;0:~11:alpn_offers;0:]4:alpn;0:~16:certificate_list;0:]3:tls;5:false!5:error;0:~18:transport_protocol;3:tcp;2:id;36:f0fd10d5-8f3b-4687-81c0-c1dd5b765a1c;8:sockname;20:9:127.0.0.1;5:59865#]8:peername;19:9:127.0.0.1;4:8082#]}11:client_conn;421:10:proxy_mode;7:regular;8:mitmcert;0:~19:timestamp_tls_setup;0:~13:timestamp_end;17:1709845739.967637^15:timestamp_start;17:1709845739.962744^3:sni;0:~11:tls_version;0:~11:cipher_list;0:]6:cipher;0:~11:alpn_offers;0:]4:alpn;0:~16:certificate_list;0:]3:tls;5:false!5:error;0:~18:transport_protocol;3:tcp;2:id;36:91886e5c-85ef-4509-b320-7c9345ce3c45;8:sockname;21:3:::1;4:8080#1:0#1:0#]8:peername;22:3:::1;5:59863#1:0#1:0#]}5:error;0:~2:id;36:960bd3ff-4193-46f6-914f-6ea1d66336d1;4:type;4:http;7:version;2:20#}

@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
import json
import requests # type: ignore
# Sample data
data = {
"1234": {
"lorem": "ipsum",
"dolor": "sit",
"amet": "consectetur",
},
"5678": {
"lorem": "ipsum",
"dolor": "sit",
"amet": "consectetur",
},
}
url = "http://localhost:8082"
headers = {"Content-Type": "application/json"}
response = requests.post(
url,
data=json.dumps(data),
headers=headers,
proxies={"http": "http://localhost:8080", "https": "http://localhost:8080"},
)
# Print the response
print(response.status_code)
print(response.headers)
# convert the response data from MessagePack to JSON
data = json.loads(response.content)
print(data)

@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
import http.server
import json
import socketserver
class GenericKeysHandler(http.server.BaseHTTPRequestHandler):
def do_POST(self):
content_length = int(self.headers["Content-Length"])
raw_data = self.rfile.read(content_length)
try:
# Decode received data
print(raw_data)
data = json.loads(raw_data)
data["789"] = {
"lorem": "ipsum",
"dolor": "sit",
"amet": "consectetur",
}
# Encode the modified data
modified_data = bytes(json.dumps(data), "utf-8")
# Send the response
self.send_response(200)
self.send_header("Content-type", "application/json")
self.send_header("Content-length", len(modified_data))
self.end_headers()
self.wfile.write(modified_data)
except Exception as e:
print(f"Error processing request: {str(e)}")
self.send_response(500)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(f"Error processing request: {str(e)}".encode())
if __name__ == "__main__":
PORT = 8082
with socketserver.TCPServer(("", PORT), GenericKeysHandler) as httpd:
print(f"Serving on port {PORT}")
httpd.serve_forever()
Loading…
Cancel
Save