From a9c5966109299d56af755ef89949008d5385739b Mon Sep 17 00:00:00 2001 From: alufers Date: Thu, 12 May 2022 02:08:56 +0200 Subject: [PATCH] add a binary to pypi --- dist/mitmproxy2swagger-0.1.0-py3-none-any.whl | Bin 0 -> 3875 bytes dist/mitmproxy2swagger-0.1.0.tar.gz | Bin 0 -> 3650 bytes mitmproxy2swagger.py | 250 ------------------ mitmproxy2swagger/__init__.py | 0 mitmproxy2swagger/mitmproxy2swagger.py | 250 ++++++++++++++++++ mitmproxy2swagger/swagger_util/__init__.py | 0 .../swagger_util/swagger_util.py | 0 pyproject.toml | 5 +- 8 files changed, 254 insertions(+), 251 deletions(-) create mode 100644 dist/mitmproxy2swagger-0.1.0-py3-none-any.whl create mode 100644 dist/mitmproxy2swagger-0.1.0.tar.gz delete mode 100755 mitmproxy2swagger.py create mode 100644 mitmproxy2swagger/__init__.py create mode 100755 mitmproxy2swagger/mitmproxy2swagger.py create mode 100644 mitmproxy2swagger/swagger_util/__init__.py rename swagger_util.py => mitmproxy2swagger/swagger_util/swagger_util.py (100%) diff --git a/dist/mitmproxy2swagger-0.1.0-py3-none-any.whl b/dist/mitmproxy2swagger-0.1.0-py3-none-any.whl new file mode 100644 index 0000000000000000000000000000000000000000..88991053537254e80c115966a2145e0dce212bf3 GIT binary patch literal 3875 zcma)92T&8*whdK^^iTpwQ>06%(t8g`2T_V3BtYmQG4v{2q&MkJx`0S%7MchW2sI#0 zNVH;6ej#;IqT(V$VuSPIg;L_4A-7uyLdtwpfg-+o{9>cdZt zVb_nkq{TNbnwZiE*4wK|+}aMHgULH|yLlx$gC=gdsGyX_|ThHTK@Ao2aT z#F|0@BJq*)KIB@+P<9B;#`weHEb$RH^o;Qr&sgkuNFV9h+u{*9EM&8PCsZ@mzl_n1lbFh^unY^YtrVfu9YN4 zh>_JhgUT;bESY;zpmip^LPyT8)8m)V!v-zmW3n;6+hzaeTj1ev1LHvf zfB0SN2M#xWWa!CYf0FO1kUtZmIgaos=P$c6Zkk$6^CGw^sNN_vtqbAaVRX2bbR%s? z8z=vSdA+_?n!39>C5K*lVy!N1>+u-IA;|JeL8sF*_ycwgben<^Sp9XOwQXieB2rgT zgf2DWUbLbOPdo#5=}M!zWnh1w`7Z!t18hMCOiz?ZC+MH>1>Nro!d#PgaCxV3pM@C4 zQQ5A5&2yX2nGqU#Gb0j!NT%pH1JM?^(F$G1rUb>b*RH68rSNg?HjV1jHQk0p zqN8!aEa%4*h6}@Nb#9_M9B~Ub0&IW_I9k?_3%s|Z5{-Yd+M-yfVrM8LcT`Y<4|?Sx z4}oG2_a#!V9?|IB?M+k$iHxm1uI!38auYslTG>lk4tm2i&{3ztF-92&D*il2{}vK^ z*Yq*(1Mdr!1Xk8N-SHa-*Hz1$G9#(WEBOBKdmWuf+eg@ZWTlq*fFkO}hwNwir0&WT zoh;yIueIUOs3S>PX?!N<&r)qp|7OQjv04;OUcmJ)ue>o&?JVH_@;#_C zz04!JD_O#sDM_DgSM;t#;Gq1~_pazwp_!~bGYS}5P@FvCYn`n;Fc9r+th;Sws zS_z*YtWovV68R<~8U2jvq z$_4FsvKCL(GK#9a8K?Bn;xQ!up~5c4zNyD`^+@z3qcE>e9Zsv&#FaWymNR_Tl_nvg zE|61qo%*N zv#>KHgvO`V=BZcAOfLjirO*uytgH%hJ(YNHM$`&uH&J-m3jmxpP|mI(Z!fmdIKGD^*p!a^$r5(>t%#>t?r zQp~9B>6pcw3s$UX0&TYz?--7K#O%>v@^i~~*QOg?88m1jnz;^tzLg31`KEeLlgee> z;#SY60}+20cJ7Spz91{fiH7~UwMk~(I6h)`^Qx-(9_hs(kBDSX=h2${6)v9iu=l5g z4eqseEOtkEqH4z|e4RZ@N0)6e>XXULw3|*`pwK&)rr6F3q=ART<*c5M- zoAnuNQB85v%1yzql&jk7yHrve7Fr2Y!&-c9gMMj>-(7o&9IM(5l(1qWqU zzaKHCi?wh*!TS9~-i0`%Z0GSiupi6vozr5}Zv6Zqfya;v>r}%V@!A#BTF2h4kZZD{=Z zxU;o}f07X9MtZ7ioV-?Y?L3SjB+~3Wu03$9`Yul}&RJ$COa9Zx{ZuOtG$HV?XG8kx zYEseML*Hg(&(j>#t9)eOks6|MimSa_wbGAz% zhEsYWjV(2*yj~|xg$-T$DrUw!pI!Km`2)O9Hrd=d0SH!0?C^F|G7)*Y4z^U+_wj0d zLyO^JJpX(FdvH_V78li=LgiPcF2q;HQpAd1V{S4J=e+%0Uu1V;f=xuoXOjE-X^i(& zf|hEkJlBh!ydHP%I{#sbDzHOL4h*kH<^U;~E+;T8U(FK{w|89i!BF+i? zW|s}H`DFFV=)yc+^8U#v}E7F zuM*1FZ{jWehyedhOCTbkXyB5LArSx|{EuiEX-WS2UmT&~L zl5LD;#d9?S$+n5Gz7HDTL7`Qr+e2Aqy?K?vJ6}W6mCXAPn7i$%Wuu7UmeFiwN%=wE zeV((a=?bPmW2br(8kz}wGx$ZqaPDcEYM?L6LwVsx)gn$;cW37d$k|GfB_oW{W{=sJ z&0c?p5*aBwh+FRQIvL>D^m=K>gVgb($kXTo@o#8_??r?Pk6j1*kkDV;nggBnO}FQU zsN6@5wkp7<5QDK+Ew7fBSp-#HI2=* z5{KReM1lc;(Rpu5>9k`1Z(TyBB6O5=B=dZG{qp=gTENbZ^k?a+KHef0b9D0|vV9z{ zHvC3UK?yXsF_YGdWpeY|v6yx*w9oOIlaKvkL$gohU-+_M1Xh_{Z(9x=@RZ#?`qXXBuYiW5 zD<%*kVj#)?KgnH6^Y8UN;?Lvn=DOby{!a<~l@9=v5m8eA4Z`0S#P6WL8~8t<#s3KO mf9(7_?(a7E4{qbte?s;zQZE$jLczH5&cVTB|WpXVrE-@}JE_7jX0PP!VbK5pD zpZP0L<~k%3iI(J7qKVR`X>;ve;(C(wZf+EpLyM5ihOYonY@Nsd{dNKHP1;VhaySwE(wvR`X@%Rq8xbqpFH5Wbu z@^`+>XL3kl5yo^jKG>h^jt(F0J#_bW4-WPw#+~i+*MH(r#2HI3HWPmCuU3=|NAB1i z?X=q6Z1XAhf4INjWB;IalY=|sy~*w&I06YbKA7z9-XWu{{r}<5-LH1mob8;3$qr54 zlWZf-(!?-L(|DRD?$W= z+aT}+lrLB)xt&=bAy~`Ngj_!eJYQ2#xUFGm4dDM_m}F}KuV*yMX4dO7N@PnH5rAU! z-8y76n7w9edM)}?p-QZ(NRA65A1g&$`TUBv5}gqmKDUpEC-EqIW4ANl9C)C4SVSr%1X&IzSQRmm*U7?+p3Pt^+zUig`P zSxm#k$%3w!Q)2GABX?w$D{SJTK*wcm%}o)Ch(a@wB2t!hpP1xD5h!0NR?NDxm|*K1 z))z^QC8?tON7`RR;qsZrHMEXNfQ*EgyE_{3HTX~;qi_0WF+u8xG zLfIv$AQmVUGGn2J=5k9}{yH!EPdY_h?VpkP2`~+&q*PsYkWkuskW!vl+U9fGezLI@ z)ZALL2vM|{20dT}s-%Og)5i}Wf%Ock3k*a+;tdLwMXq>cTp9Eo0bTm*NN5(!d~I^+ zWxhD;m!0bElU&|ABbQf*(7G66(@;=w8NL9wBWd*DZUOg^=>U`~Wni;cG^<9GHn!Jf ztr^vJTN{-N8Cy|f{fMVh1LT-ePw`)q5m*T3Z+d24nYEt;H%%MmR8GBf677Md1G{cc zFsW+-6u5f^s-D6GeSx#wkcFR=-QXy5w^M|Gs<#EyUe`Jp1}L89EA<#%(F?_oV)`zG zF#umE!OFSj2$bLUi5yd`m<%A|%+k=cJ{mtJoN!h1_MUbyr?g*=s!*g?2}KnLvu^4^aiOHhPCQK2ojNdzi2 z2*2jgi?$`l7!CVUYp9l&QH(_ytiRo{!cJ#eL7IwP*BYEqBVfyDgz@@R1?(tYp~@+q z^@qp9|59Kn<*t;`iVAG0AW+;anatJ!5^{1kPqrbE)p)>6&R$m$nwk?+DCgAPI?#~g>G z%eI?_eUFT5zIJ42AQzp2D_w|$$rJi6-w+N+htomZq6utm7&O_PUa3dp!etjeSI({K zk~>$Xll8C{{uGSHqXwnb`f>DcI(~5bED@#9y0T0|5cZNV}6VtqsA?G9we?h(bM&C)~)msVT<|qGU`_5Pg$C|k|8XSCFxLsd#Qour;}A34s=>Hw;+eWTo3De)f~HK>uIp@ zAWiD?Wy+~yy+Wj_@F9I94#P}F7VNsl)bfgJh#~@6rr>rN!vwt$-G;0S8FExWF$OUA zAFqBs`d+?QY8u^skn=t_Rj}0B!u0tQNTN!AaoL|Kq8@8_YkC8q1O{&wg$r49+QEt5 zYLv27S<-5ns!W0$cR7w$diiPL~8aREN9KWD(p8eBs~5V695&2~4RnSxAe3+`#x`Gygaf zzomKCWaIp<7JOxVnoT#?;qzJaErjxW#Oiq^V*&GpomD?uY9HeaT2j#|UtY6s9;cxx zSM*$Oi%o=6<@!a+*g6w50HvZ=CZOx6ZMi&U!j?5PtP%1x%#y>H#?}bBPkj!{5=9%x zDR@Q~g5ySx%OptGlsE`)r7CN zp7b9XxIZJ~S}%=wM|9|-rN2w$g>iVgdIDeIZ%fPCgH@tEN%Ng17D^`_2_n(YVRuA(Z+`IYi9z@XD8O7u8g zk>*EOF%jqCLY|!}d~%GA9vsU$xA7-{@0vr1+KJ1p#e5CI*6D?wE{&*~qaR_NbwgG( zkq#{|Za69`i~6@bSSdVU{rIW~XD1c5g0%|3po-^b7)U^i@vKeya)viVuui``1Os`O zAN5g+X&MAAU24Pz57%raU-2-%TK%7^L&OW^oZjWtSX(y$GU>n+4*SA!PHvD0a?=k` zp1t=Bzy3`66@6#lw8qYL6eMB{>7J}Gdzys4xbd=7er+3hxft{`R|yC^#etWK7$GRm zFE5|JNuGxZER{U%@);OIl;TYn?O#Ni5#hg;i@2ZxO%`+@J7mQWmlU$*6{8e37V>;M zuYT6f#QH8CoYyWMNb%v_m3%D*M3?FBLi((Z2=u3PD){nCqJ>c{%P!LhvKVA8Qgf@w zLD@S~`IU4PxmeyL=1tUoFHv=nGOKML<3JRf1v+RLlImHU#IK9%-@_K?EBM^ri~Ld^ z_y6Ml_g?${_hftj`?K8teo(ppJ$Z0&_z>@ZPacdWhg&c31>gV9HqiPT?8aS@#?hzS z|H0l~ul?_jcjf)>gTsT#{`e5=e}4?!+xy?2;hBpxjod7yf^AL=y=^vIT45Uc8YA{f z4Ka;M9TQtk!(WTDl*u}R=|}4&)@2^)mv7HAH;W#dupy|DbOX7Og`6nFn=R`4Ib!>D z*Bt|m^6oOWJy7>2%}QEr$`0V@(4CCU7V`JQIss#Tx(=gY$TzUlj86=GMOg8yMhzhg zq>pFnuI7oM;7|RTM$NScHUk;g|+hLC{0#g zy${cQb+5{B5MHM>R;|O6KYOcmtE?vnf=+o%L|Gkc*Pnwd6J2s9WtD2IQZWslS zR1)Kry1P9k#uJ(UOo`5W^!HGvGexV)hK!dw=NKN#9Aru=iHGqt@;MKeAq8aeS3iCn z;}O|&KUuB)6(v(>S5+c-+x<{@pi|QSKIEb@OJdiPQl|-7$xsotk)v)CqwTYOw$Jw2 UKHF#e{5hWg0a+>kX#h|F0KtM3(f|Me literal 0 HcmV?d00001 diff --git a/mitmproxy2swagger.py b/mitmproxy2swagger.py deleted file mode 100755 index 7c6157c..0000000 --- a/mitmproxy2swagger.py +++ /dev/null @@ -1,250 +0,0 @@ -#!/usr/bin/env python -""" -Converts a mitmproxy dump file to a swagger schema. -""" -from email import header -from mitmproxy import io as iom, http -from mitmproxy.exceptions import FlowReadException -import pprint -import sys -import io -import json -import os -import argparse -import ruamel.yaml -import re -import swagger_util - - -parser = argparse.ArgumentParser( - description='Converts a mitmproxy dump file to a swagger schema.') -parser.add_argument( - '-i', '--input', help='The input mitmproxy dump file', required=True) -parser.add_argument( - '-o', '--output', help='The output swagger schema file (yaml). If it exists, new endpoints will be added', required=True) -parser.add_argument('-p', '--api-prefix', help='The api prefix', required=True) -parser.add_argument('-e', '--examples', action='store_true', - help='Include examples in the schema. This might expose sensitive information.') -args = parser.parse_args() - -yaml = ruamel.yaml.YAML() - -swagger = None - -# try loading the existing swagger file -try: - with open(args.output, 'r') as f: - swagger = yaml.load(f) -except FileNotFoundError: - print("No existing swagger file found. Creating new one.") - pass -if swagger is None: - swagger = ruamel.yaml.comments.CommentedMap({ - "openapi": "3.0.0", - "info": { - "title": args.input + " Mitmproxy2Swagger", - "version": "1.0.0" - }, - }) -# strip the trailing slash from the api prefix -args.api_prefix = args.api_prefix.rstrip('/') - -if not 'servers' in swagger or swagger['servers'] is None: - swagger['servers'] = [] -# add the server if it doesn't exist -if not any(server['url'] == args.api_prefix for server in swagger['servers']): - swagger['servers'].append({ - "url": args.api_prefix, - "description": "The default server" - }) -if not 'paths' in swagger or swagger['paths'] is None: - swagger['paths'] = {} -if 'x-path-templates' not in swagger or swagger['x-path-templates'] is None: - swagger['x-path-templates'] = [] - -path_templates = [] -for path in swagger['paths']: - path_templates.append(path) -# also add paths from the the x-path-templates array -if 'x-path-templates' in swagger and swagger['x-path-templates'] is not None: - for path in swagger['x-path-templates']: - path_templates.append(path) - -# new endpoints will be added here so that they can be added as comments in the swagger file -new_path_templates = [] - - -def path_to_regex(path): - # replace the path template with a regex - path = path.replace('{', '(?P<') - path = path.replace('}', '>[^/]+)') - path = path.replace('*', '.*') - path = path.replace('/', '\/') - return "^" + path + "$" - - -def strip_query_string(path): - # remove the query string from the path - return path.split('?')[0] - - -def set_key_if_not_exists(dict, key, value): - if key not in dict: - dict[key] = value - - -path_template_regexes = [re.compile(path_to_regex(path)) - for path in path_templates] - - -with open(args.input, 'rb') as logfile: - logfile_size = os.path.getsize(args.input) - freader = iom.FlowReader(logfile) - - pp = pprint.PrettyPrinter(indent=4) - try: - for f in freader.stream(): - sys.stdout.write("Progress {0:.2f}%%\r".format( - (logfile.tell() / logfile_size * 100))) - # print(f) - if isinstance(f, http.HTTPFlow): - if not f.request.url.startswith(args.api_prefix): - continue - # strip the api prefix from the url - url = f.request.url[len(args.api_prefix):] - method = f.request.method.lower() - path = strip_query_string(url) - if f.response is None: - print("[WARN] No response for " + url) - continue - status = f.response.status_code - - # check if the path matches any of the path templates, and save the index - path_template_index = None - for i, path_template_regex in enumerate(path_template_regexes): - if path_template_regex.match(path): - path_template_index = i - break - if path_template_index is None: - if path in new_path_templates: - continue - new_path_templates.append(path) - continue - - path_template_to_set = path_templates[path_template_index] - set_key_if_not_exists( - swagger['paths'], path_template_to_set, {}) - - - set_key_if_not_exists(swagger['paths'][path_template_to_set], method, { - 'summary': swagger_util.path_template_to_endpoint_name(method, path_template_to_set), - - 'responses': {} - }) - params = swagger_util.url_to_params(url, path_template_to_set) - if params is not None and len(params) > 0: - set_key_if_not_exists(swagger['paths'][path_template_to_set][method], 'parameters', params) - if method not in ['get', 'head']: - body_val = None - # try to parse the body as json - try: - body_val = json.loads(f.request.text) - except json.decoder.JSONDecodeError: - pass - if body_val is not None: - content_to_set = { - 'content': { - 'application/json': { - 'schema': swagger_util.value_to_schema(body_val) - } - } - } - if args.examples: - content_to_set['content']['application/json']['example'] = swagger_util.limit_example_size( - body_val) - set_key_if_not_exists( - swagger['paths'][path_template_to_set][method], 'requestBody', content_to_set) - # try parsing the response as json - try: - response_json = json.loads(f.response.text) - except json.decoder.JSONDecodeError: - response_json = None - if response_json is not None: - resp_data_to_set = { - 'description': f.response.reason, - 'content': { - 'application/json': { - 'schema': swagger_util.value_to_schema(response_json) - } - } - } - if args.examples: - resp_data_to_set['content']['application/json']['example'] = swagger_util.limit_example_size( - response_json) - set_key_if_not_exists(swagger['paths'][path_template_to_set][method]['responses'], str( - status), resp_data_to_set) - - except FlowReadException as e: - print(f"Flow file corrupted: {e}") - - -new_path_templates.sort() - -# add suggested path templates -# basically inspects urls and replaces segments containing only numbers with a parameter -new_path_templates_with_suggestions = [] -for idx, path in enumerate(new_path_templates): - # check if path contains number-only segments - segments = path.split('/') - if any(segment.isdigit() for segment in segments): - # replace digit segments with {id}, {id1}, {id2} etc - new_segments = [] - param_id = 0 - for segment in segments: - if segment.isdigit(): - param_name = 'id' + str(param_id) - if param_id == 0: - param_name = 'id' - new_segments.append('{' + param_name + '}') - param_id += 1 - else: - new_segments.append(segment) - suggested_path = '/'.join(new_segments) - # prepend the suggested path to the new_path_templates list - if suggested_path not in new_path_templates_with_suggestions: - new_path_templates_with_suggestions.append( - "ignore:" + suggested_path) - new_path_templates_with_suggestions.append("ignore:" + path) - -# remove the ending comments not to add them twice - -# append the contents of new_path_templates_with_suggestions to swagger['x-path-templates'] -for path in new_path_templates_with_suggestions: - swagger['x-path-templates'].append(path) - -# remove elements already generated -swagger['x-path-templates'] = [ - path for path in swagger['x-path-templates'] if path not in swagger['paths']] - -# remove duplicates while preserving order -def f7(seq): - seen = set() - seen_add = seen.add - return [x for x in seq if not (x in seen or seen_add(x))] -swagger['x-path-templates'] = f7(swagger['x-path-templates']) - -swagger['x-path-templates'] = ruamel.yaml.comments.CommentedSeq( - swagger['x-path-templates']) -swagger['x-path-templates'].yaml_set_start_comment( - 'Remove the ignore: prefix to generate an endpoint with its URL\nLines that are closer to the top take precedence, the matching is greedy') -# save the swagger file -with open(args.output, 'w') as f: - yaml.dump(swagger, f) - # f.write( - # " # Uncomment any of the following lines to generate the corresponding endpoint\n") - # f.write( - # " # Lines that are closer to the top take precedence, the matching is greedy\n") - # f.write("\n") - # for path in new_path_templates_with_suggestions: - # f.write(" #- " + path + "\n") -print("Done!") diff --git a/mitmproxy2swagger/__init__.py b/mitmproxy2swagger/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mitmproxy2swagger/mitmproxy2swagger.py b/mitmproxy2swagger/mitmproxy2swagger.py new file mode 100755 index 0000000..1b1d6cf --- /dev/null +++ b/mitmproxy2swagger/mitmproxy2swagger.py @@ -0,0 +1,250 @@ +#!/usr/bin/env python +""" +Converts a mitmproxy dump file to a swagger schema. +""" +from email import header +from mitmproxy import io as iom, http +from mitmproxy.exceptions import FlowReadException +import pprint +import sys +import io +import json +import os +import argparse +import ruamel.yaml +import re +import mitmproxy2swagger.swagger_util.swagger_util as swagger_util + + + +def path_to_regex(path): + # replace the path template with a regex + path = path.replace('{', '(?P<') + path = path.replace('}', '>[^/]+)') + path = path.replace('*', '.*') + path = path.replace('/', '\/') + return "^" + path + "$" + + +def strip_query_string(path): + # remove the query string from the path + return path.split('?')[0] + + +def set_key_if_not_exists(dict, key, value): + if key not in dict: + dict[key] = value + +def main(): + + parser = argparse.ArgumentParser( + description='Converts a mitmproxy dump file to a swagger schema.') + parser.add_argument( + '-i', '--input', help='The input mitmproxy dump file', required=True) + parser.add_argument( + '-o', '--output', help='The output swagger schema file (yaml). If it exists, new endpoints will be added', required=True) + parser.add_argument('-p', '--api-prefix', help='The api prefix', required=True) + parser.add_argument('-e', '--examples', action='store_true', + help='Include examples in the schema. This might expose sensitive information.') + args = parser.parse_args() + + yaml = ruamel.yaml.YAML() + + swagger = None + + # try loading the existing swagger file + try: + with open(args.output, 'r') as f: + swagger = yaml.load(f) + except FileNotFoundError: + print("No existing swagger file found. Creating new one.") + pass + if swagger is None: + swagger = ruamel.yaml.comments.CommentedMap({ + "openapi": "3.0.0", + "info": { + "title": args.input + " Mitmproxy2Swagger", + "version": "1.0.0" + }, + }) + # strip the trailing slash from the api prefix + args.api_prefix = args.api_prefix.rstrip('/') + + if not 'servers' in swagger or swagger['servers'] is None: + swagger['servers'] = [] + # add the server if it doesn't exist + if not any(server['url'] == args.api_prefix for server in swagger['servers']): + swagger['servers'].append({ + "url": args.api_prefix, + "description": "The default server" + }) + if not 'paths' in swagger or swagger['paths'] is None: + swagger['paths'] = {} + if 'x-path-templates' not in swagger or swagger['x-path-templates'] is None: + swagger['x-path-templates'] = [] + + path_templates = [] + for path in swagger['paths']: + path_templates.append(path) + # also add paths from the the x-path-templates array + if 'x-path-templates' in swagger and swagger['x-path-templates'] is not None: + for path in swagger['x-path-templates']: + path_templates.append(path) + + # new endpoints will be added here so that they can be added as comments in the swagger file + new_path_templates = [] + + + + + + path_template_regexes = [re.compile(path_to_regex(path)) + for path in path_templates] + + + with open(args.input, 'rb') as logfile: + logfile_size = os.path.getsize(args.input) + freader = iom.FlowReader(logfile) + + pp = pprint.PrettyPrinter(indent=4) + try: + for f in freader.stream(): + sys.stdout.write("Progress {0:.2f}%%\r".format( + (logfile.tell() / logfile_size * 100))) + # print(f) + if isinstance(f, http.HTTPFlow): + if not f.request.url.startswith(args.api_prefix): + continue + # strip the api prefix from the url + url = f.request.url[len(args.api_prefix):] + method = f.request.method.lower() + path = strip_query_string(url) + if f.response is None: + print("[WARN] No response for " + url) + continue + status = f.response.status_code + + # check if the path matches any of the path templates, and save the index + path_template_index = None + for i, path_template_regex in enumerate(path_template_regexes): + if path_template_regex.match(path): + path_template_index = i + break + if path_template_index is None: + if path in new_path_templates: + continue + new_path_templates.append(path) + continue + + path_template_to_set = path_templates[path_template_index] + set_key_if_not_exists( + swagger['paths'], path_template_to_set, {}) + + + set_key_if_not_exists(swagger['paths'][path_template_to_set], method, { + 'summary': swagger_util.path_template_to_endpoint_name(method, path_template_to_set), + + 'responses': {} + }) + params = swagger_util.url_to_params(url, path_template_to_set) + if params is not None and len(params) > 0: + set_key_if_not_exists(swagger['paths'][path_template_to_set][method], 'parameters', params) + if method not in ['get', 'head']: + body_val = None + # try to parse the body as json + try: + body_val = json.loads(f.request.text) + except json.decoder.JSONDecodeError: + pass + if body_val is not None: + content_to_set = { + 'content': { + 'application/json': { + 'schema': swagger_util.value_to_schema(body_val) + } + } + } + if args.examples: + content_to_set['content']['application/json']['example'] = swagger_util.limit_example_size( + body_val) + set_key_if_not_exists( + swagger['paths'][path_template_to_set][method], 'requestBody', content_to_set) + # try parsing the response as json + try: + response_json = json.loads(f.response.text) + except json.decoder.JSONDecodeError: + response_json = None + if response_json is not None: + resp_data_to_set = { + 'description': f.response.reason, + 'content': { + 'application/json': { + 'schema': swagger_util.value_to_schema(response_json) + } + } + } + if args.examples: + resp_data_to_set['content']['application/json']['example'] = swagger_util.limit_example_size( + response_json) + set_key_if_not_exists(swagger['paths'][path_template_to_set][method]['responses'], str( + status), resp_data_to_set) + + except FlowReadException as e: + print(f"Flow file corrupted: {e}") + + + new_path_templates.sort() + + # add suggested path templates + # basically inspects urls and replaces segments containing only numbers with a parameter + new_path_templates_with_suggestions = [] + for idx, path in enumerate(new_path_templates): + # check if path contains number-only segments + segments = path.split('/') + if any(segment.isdigit() for segment in segments): + # replace digit segments with {id}, {id1}, {id2} etc + new_segments = [] + param_id = 0 + for segment in segments: + if segment.isdigit(): + param_name = 'id' + str(param_id) + if param_id == 0: + param_name = 'id' + new_segments.append('{' + param_name + '}') + param_id += 1 + else: + new_segments.append(segment) + suggested_path = '/'.join(new_segments) + # prepend the suggested path to the new_path_templates list + if suggested_path not in new_path_templates_with_suggestions: + new_path_templates_with_suggestions.append( + "ignore:" + suggested_path) + new_path_templates_with_suggestions.append("ignore:" + path) + + # remove the ending comments not to add them twice + + # append the contents of new_path_templates_with_suggestions to swagger['x-path-templates'] + for path in new_path_templates_with_suggestions: + swagger['x-path-templates'].append(path) + + # remove elements already generated + swagger['x-path-templates'] = [ + path for path in swagger['x-path-templates'] if path not in swagger['paths']] + + # remove duplicates while preserving order + def f7(seq): + seen = set() + seen_add = seen.add + return [x for x in seq if not (x in seen or seen_add(x))] + swagger['x-path-templates'] = f7(swagger['x-path-templates']) + + swagger['x-path-templates'] = ruamel.yaml.comments.CommentedSeq( + swagger['x-path-templates']) + swagger['x-path-templates'].yaml_set_start_comment( + 'Remove the ignore: prefix to generate an endpoint with its URL\nLines that are closer to the top take precedence, the matching is greedy') + # save the swagger file + with open(args.output, 'w') as f: + yaml.dump(swagger, f) + print("Done!") +if __name__ == "__main__": + main() diff --git a/mitmproxy2swagger/swagger_util/__init__.py b/mitmproxy2swagger/swagger_util/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/swagger_util.py b/mitmproxy2swagger/swagger_util/swagger_util.py similarity index 100% rename from swagger_util.py rename to mitmproxy2swagger/swagger_util/swagger_util.py diff --git a/pyproject.toml b/pyproject.toml index 410d11f..88d757d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "mitmproxy2swagger" -version = "0.1.0" +version = "0.2.0" description = "" authors = ["alufers "] @@ -14,3 +14,6 @@ mitmproxy = "^8.0.0" [build-system] requires = ["poetry-core>=1.0.0"] build-backend = "poetry.core.masonry.api" + +[tool.poetry.scripts] +mitmproxy2swagger = 'mitmproxy2swagger.mitmproxy2swagger:main'