From 161f21ae4a29ed362bcc671d239cf3acb04b9876 Mon Sep 17 00:00:00 2001 From: angelozerr Date: Mon, 12 Apr 2021 21:48:04 +0200 Subject: [PATCH] Validation support for properties kafka file Fixes #152 Signed-off-by: azerr --- docs/Consuming.md | 8 + docs/Producing.md | 8 + ...-file-consumer-topic-syntax-validation.png | Bin 0 -> 10540 bytes ...a-file-producer-empty-value-validation.png | Bin 0 -> 13004 bytes src/commands/consumers.ts | 23 +- src/commands/producers.ts | 79 ++-- src/kafka-file/kafkaFileClient.ts | 85 +++- .../kafkaFileLanguageService.ts | 23 +- src/kafka-file/languageservice/model.ts | 46 +- .../languageservice/parser/kafkaFileParser.ts | 49 ++- .../languageservice/services/completion.ts | 22 +- .../languageservice/services/diagnostics.ts | 145 +++++++ src/kafka-file/utils/async.ts | 185 ++++++++ .../completionProperties.test.ts | 30 +- .../diagnosticsProperties.test.ts | 402 ++++++++++++++++++ .../kafka-file/languageservice/kafkaAssert.ts | 17 +- src/validators/commons.ts | 53 +++ src/validators/consumer.ts | 59 +++ src/validators/producer.ts | 41 ++ src/wizards/validators.ts | 64 +-- 20 files changed, 1172 insertions(+), 167 deletions(-) create mode 100644 docs/assets/kafka-file-consumer-topic-syntax-validation.png create mode 100644 docs/assets/kafka-file-producer-empty-value-validation.png create mode 100644 src/kafka-file/languageservice/services/diagnostics.ts create mode 100644 src/kafka-file/utils/async.ts create mode 100644 src/test/suite/kafka-file/languageservice/diagnosticsProperties.test.ts create mode 100644 src/validators/commons.ts create mode 100644 src/validators/consumer.ts create mode 100644 src/validators/producer.ts diff --git a/docs/Consuming.md b/docs/Consuming.md index bcfa595..635d1d9 100644 --- a/docs/Consuming.md +++ b/docs/Consuming.md @@ -89,6 +89,14 @@ Completion is available for ![Topic completion](assets/kafka-file-consumer-topic-completion.png) +#### Validation + +Validation will help you write valid consumers in .kafka files. + +Here is an example of topic validation: + +![Topic syntax validation](assets/kafka-file-consumer-topic-syntax-validation.png) + ### Start Consumer command ![Start Consumer from command palette](assets/start-consumer-from-command.png) diff --git a/docs/Producing.md b/docs/Producing.md index 3ae9e51..cb2ee8a 100644 --- a/docs/Producing.md +++ b/docs/Producing.md @@ -67,6 +67,14 @@ Completion is available for ![Topic completion](assets/kafka-file-producer-topic-completion.png) +### Validation + +Validation will help you write valid producers in .kafka files. + +Here is an example of value validation: + +![Empty value](assets/kafka-file-producer-empty-value-validation.png) + ## Randomized content Record content can be randomized by injecting mustache-like placeholders of [faker.js properties](https://github.com/Marak/faker.js#api-methods), like ``{{name.lastName}}`` or ``{{random.number}}``. Some randomized properties can be localized via the `kafka.producers.fakerjs.locale` setting. diff --git a/docs/assets/kafka-file-consumer-topic-syntax-validation.png b/docs/assets/kafka-file-consumer-topic-syntax-validation.png new file mode 100644 index 0000000000000000000000000000000000000000..2e3da9e9272a6a887023bd7d6f60eced8c8b6bdc GIT binary patch literal 10540 zcmb_?cUY6_wr5ljkgBMlfCLLzHXzb#3@8Srh^QbPQIryT4-lnVC=rw{5e>bGbP$lf z-IQo3K|(-k2$0Z{P(qUV*!P~f&pk8q%*=DnA0+w0m-k!kx8AkZ?|pgO^oHP}lZQYc zkf0Il+8q#RKMwee%*IxOdGG!h7Je}z(tC3^I)Lky{Mk#Vts4UI5B|6}eb|@H z)4S+wMjG$W&RWs*8|brbZJ4iH;ZdXn^!H}~-)wI853m{NasNRRDL^no;js0jMr`*!;z=I$c&3L>hR%L~S7DP^nET_Ola+m16`*0`H#sU>No) zYR>}D(1VdGz&kHa*+Uv59a@W^)ZlfnfxlvKvw+FMq)m6XE~+ftkc4B3N}!Tx;pP*%hu;uQ^u`$adL+Axh%29Dx3}oWXcQN z{1F_!f!wm)?8{0DK(p)CcfXK77@{VQKzBMtF#A9^o`H#;=BO5qm;Ivsc5AbP?Q+g` zcLo*B*ZWo{c&*0xo9gpEQc=k5kuWis)bj*N5~{eY8>q6evs z`k*<W;UrZYGpN?x7|o!sp(Lsb&3sTJlO2wysOmNqw?ETCwo4$FbWWkVh0?+Yl5x3o?ZE+BQoHK(d7yWZG+K)6{;^_E&jP4^-sy^?OAg zMqkb|uKTqAZevbOj3}}XZK<(VBivNS4v;T zZ!+yYnlDXJS*BZ9z5!H<-XcFv01sWuz45(9`Oi|zY_aHjGS99L`U@6MoWI=Vbca9k zo&qdR^_0^IOFqPcsWd7UD~%y6 zR!_UOtARj-omJ`ZB5md>IshH9-WphkM&xU6{1_yb1wkH0()#NHIIFoyfe>a|-4P5o z7x|_Us&;6Q{Cbg0n}m#oYh+p;k9`l8bdz1!8pO%r%%76;DA#U6Wz{Q`VA|BUXCIP$ zm~Cn?Agg1TK_BtC$O1)C9r+Mr*6&QR%{Lpp#lhJ|sPsML$AXqj2eW*LI~RFTOL;IF z4g%HEN4wIRAYse;o4+lP6X8?toMvsri0vj84XId}Sz9mI2S9EIy3@B3Al$m)&}Tue z7(1dWx2kc_ZM2K*Gt{` zBrl9xx4J=bVZ>pF%h&?7sx_XMA(kb@JqZRb8IHR#K78{Msy*XZGo{T$YxjXhM{ekC zU!b%ohBY95zl1D#wz2|sSsQ3h*yf*SlFs&`5AJ#LAlasc-9>G1pbbX&R3fIAqz|Q< z6gi4=$DaqqKELwbYyWruLZh+63vyjQYxCKaQ{wd)$3?RxNDoA$UMm3oHGs0|gHVq2 zS1fDrB7I~k3`_LJSoBm+VetfkF;#dpr1P3{E`e-7Q=j<_sVqNg_(}Sdzi!7)ZL?!W zu;lcL?y#r-a$#Ukg!;$@la$nbpx@DlF@!Qv46ESBN?18__C&~bI=}rV4dV>QnYhPN z;>7iED?WV_H57&dYY&aSt=9M2pitynq^m-wrc(kGefKGUd)5Q>!lu)(0zx)#JfBn3 zOCMVkri+xTWVhCIxJQrLp%OzUqQ_Nws#y1#UDZe*PP7jnBj&KA-&nR5%XOmVklR7f z?^zKH0WdkFd35#YNv63dW^=Zh&I5WD0Ej1l2)Ehti)W#`;KwNcUdmQK66xP`9lDc0 z$+{iHxTO=L_kJ(6kHdIzlvHk4Ag5;^0?f^|_eliv1PnU>0zCuk9)O+%zThIC_&@Fg zd)XLt6_xWGEC}2gNfY?zB|3J&tV*u{2-^!e$@0QF5Y=n?f|@EqHaN4D(ivoILK6p8S(az%qIv8~0c&cWgSnV{Z8ho5pNef?rZ8XAV;3q}U-EVIC%i_=auRsEOcury&;T+`2lh5?eUvBu6b8N!Z{QE(P0tjpVenP+<0d0Yh0*$- zRb|v{hMYhCjKSsMiUrgR2Y=PraZ*rGd>ym*_sMoK{6 z1JhH}B58Lo!w=z-7Uc$IK_&2#+MbZm<~(TV#Vx?Vl7|7Obc5~0j?kAx8y@>#O^B-A z*5Sfw9Bsm%dpmwfGy!T|SLAjV*}X9osvK2Kwva3bteq$Upw$pM#&s70OEQaPVjrHb zB~S|_eu^#@gW-`C4-8ET=uJ=O@SB7 zi`$;tS@DbXuW$(ie2^#zZM5@41@5cb68bDyL%!l~Up#I6fPXLk!_Ha9Fsc$N;XVugPC+8v50zj|a{Unq>IE;bKwlzXQuRa@C- zqkAlohRPI1-|Pwx z7dhp#25brZ?tugA$0NmuTx5Yr+Rqu6uPuX0`CQ~Oli{8Qn>|?#cw9x>cji*~By8Vf$y;omXKO0c+@ zz#mz+Rj7IVZehE9r|=`JDr;0?(haiC%cL6f&(hOAm!wkO(JKn(!yZhH8+U_`th|1u zG)SPO9_}%&a~=^)O=E3k%~#@CeWXDY7jsv}$eY6hA@isG@2#Ze+G{8BluRny-i`^u z>6s6v4&T$N)n!<{AG==^K@K(>;ImA^#}XW-YMJ53;XT#wG!-r_Im8g}YY2t6mIqH} zVf%gJF*A-o zHHC>=Tks|d_u$}+u9S3>adm&@PMj;tw@vQ_rswTvMWPCPC-{$)oks+-#3}NQ`)106 zfh<`mbPcoU`GltvQ8h`q%6>F;Y4D{jM<^cfe$+wo!dWBJ{fKWpJYW333|*|qD$#5P zr>AJNh#Xh2-@)|RYACaGeA;3R=v^Zv1zAdiGX*_P*((bAPOM&qkMcLg#*1FB>Et;R zEE*gLOG$~!KH3b)l2UUW7qiv+>aA;W5o=TCEw9*b#(1wQJmXyItFymh<}|tB%01*<#9u=h@tkDV zTcrTc@{sKBo1e)_|buHeW4R=Z(Ti+lReU+)R9qQWg6D<$i@Q`b=LO;SD#W#70n37X^2&rg?26& zju&>@6eQHqgW;V$CC>RbD74P3>yKHlhdrJEVn1k|AGd+`Wx8D^5$l_`N#+);b20Ae z6%o5LdRq1l68keJJaMUYo{w3vVM)He&WrK%F+GKr`L~JpTqWjq-et1hN?3GA^v1I8 zUYeY2-_7XB_=R56RMU+GijRp=bGtzruUb&#k|$TznI^-bulTfajmY$quSP2zDVF{Xja}*2Cez&!}vq^4HSj+aVg;q z#1vy)u<6quag%YwbS}b8DZsuI2n|;Dsg7na%*&$}Y=^9LyX~QD>8I69J zc1Cl3CyTaZsY+&NJT`$(JgF(AAjzkzy9FQpHN>qx(ipagiS#E1E_w#mmOav5Zz#Ms z>_sSBcw%3!=HU4Ujx0}d30b~>s;cj6@r=(%jkh7|dH~6PczXAB18FR>XDm^ zS@klvp~jlA-Bj0E-Ispq@+HAgUsnA5=2B^h(@DM=H>NJlE*u|zV)NYz9%YB{K&DwE zFL~{OU+b|KVs}mG%H>W85?NBMMQRtlYPWX~fvras=ewNUA2D9mWowK1QY?qPdMv)v zb7QsHkxG=F2U}%Q5xM~+7e##9%Qd@zzAGEkq4&_8k6p6O`srrg_-Eckw)iT;W_YVf zYntv4CkGfwVxe91KGTY#Nu%y^pO(sQdUqvDpj&M+;vOr7^zw3TaxrVDaDLpDtdT-> z^7J5O%k{(p*#;g}t#oQD=bV`A%4zp&gzU`8lH%|0JCnEe|F|_~i|a9B>|ZVn-F=N> zd+bm=^@?+*svKH}%yNz05uHvbHk9L7(ICVg5xm{BIL5j$@3 z)12N(dT7VeTT4$rLoN{6?}qKKKh7Ct6l^AkA1-a0PJir%S~OvhmMhDOT7P!x6`tZ0 zO5IEPC1#HZJmK0J^D+EGEt)_&)`L2nWT)ja9U@61j?|3Rgf-@mJ+>K0nGL?Xt9Ngu zKrPSDuYcwbO=@)v>HA?BAj~3Adc@9@t-I!8Z;A@!H4e_prU}32X+1oeg?B=(9sukf4|H2La)zWw~WAd+vAiL9Pxk4;oD5Z+b;(PZ9zVjxdo#PTncNL=J|)0~e1 z-&bh5x*R)lajXNj4$l$kWQ(<58eWMIoG@a)YFDizN2t@yrAdr*8{DLXXCKG|@TpQH z?IsP0H%8e@LA9=;^!5R@8_-uwj4p%AqCPb~nC3$1 zKs#FAD%06JpHr)YydD~9c|uJrZVc$tL=IyP{6{AT%oq~-`EjzeBL1cpr%st%M!T1n z6@6+g8ZdoN;VOP32A=5rXfcQq!xCq@c(&)kyH{KaoF7K~2+<1}lctJ>==3uvf%KhX zhw{L|o+&TBg-3%6^y=wDrxJ1b>MJFud-> zz{=9sBMRsdnfNn|-1%|;p6lK^W6oFyOAQyIN~Fp&-M#E*iW6K?GjP(AlNh`A=8iAB z4TQd~sm(^Dw%@t{OPf!auIw%nMQ647Xe)?GD^9N&pbmaM1K7LmjCv4)k2ry9F^ z4pOU%Hhj<*vHg7wCr{m4S8^}W+zj{WpfRz7~G%%yKw{1Jn`uYSYvmydgc(s!EIT4X0k{BIdepD!q&V;D-XdpIV&Kd zrL{RYJ{hcQEJ3(K?_?T|TRyDkWfs*{Z1%*1De^00%F!&9!m^z^DFJk+HIYy-G-r+xc@?1Hc3|-J4S5XsIgQBi|r(>KX z)kVwi2Wx#jycxyoBmy#3|GPbmyMdZG!SoNVGxH-3ZpPxDfI$DzF7B-I;t-W>m!wLx zC=~#K0rW{g9u-akCs^DFCfRmc_<%Id|KB>y|F?EEKk!=*B;xp9*K2^9NFUo*H|~20 zxC;C`i~fH$wf~z()^L#t3V}o^v5FTPo*`S(P1X+T@DSGRIHS&`c zO-bYD)}_a$Cpt}hXXm+(zXkYMw4WG;e5Tq3?pnzS@K?OK;X5u) zEC1@G=g=GmwGVt^zx#YKO+r{XF@QO^%v{%~uI2&tJO^rmd^w!GRU+w_0i!Mbwiwqn zyP5UN;nh%Ux>@lpGfqHIQ$2DTT-Jh>I^)!*J5~Sdhpwd9T!7_N+@EJfq;8=+e@J4y zFqdJ#EC6s*W~4vk`+TOEj?JHK#xXm2`!=njsu#0V23$fF3UltFj?WVVAjXQpwLfTW}>kaW7H!0j(?N;m5;5Ba)GOQax9aqRce7Cjjr*Nw(+PqkpbYANfMiml?+ zOw6IRm6i!za>b|pYnM%g0-U7O1bU#>NGzBBitE?hk&TYr?d6IR_hVy0?Uk38gg0(g z|gE4=+lw+^*{1p9u^=4v(krnk22m@t(%am>DVGMB+C5; zZ%NxIZDXdA-km3jxwSOLI{t_BN^fEF?ABp7o@W3`_qK*h`WZrj3dZNm&K~o64^Gbl z1VZ+Z~k0d=G zg0y;WHh-2P8hW<4I&f}Z=1o&|enL7&|I4}Nq*O$=h+IsO3~yK4Pu$Yh)R_sClEC%|LuXO>tSkpPTT9N5-w7-Af=H>7dWc*k z{BwDc%&&?zwg4ez@!}~7(C2+X#EwPz9|gsHD2aNTm45)kZ!06Z{PlK$ew{UzyHPaWASf7wXzaZ5HT{2 zQJ=~!!m-0AKer9LKPi;qtB7cB&eeNTsw`P@XL35ALrFMo)7^)XM;9*M5eCUc0l3wK zjtR%yIe(}3jb3`ksICeDyk)%x{G47iN&BC@*N z@5=57mn(YF%rip6bl;knG;YpJ4YnEkTb9e7OrE}rG95WQmViee!Zj3R+oQ_)y?rhp zYtMf#%T4yy%!v3nu9zjrQ0q(F=vd=(i|@8hig7?zLpmx2-4!Y%61N>s!=5mCK{p}+ z`5r(+d0l+L{Q8ALmU%y1(pC;k!^L{$?v7rer0ECC3_WlK55!D-R`f+N()bAG^qUn* z0wafT`VJU&9KA4*H78s9iO}D#7~imEVDUA~iKPmIHa@YJ3)U6zH~1!$_FkxVP0O4& zp*{D=!Efg=j~97K83A7c1izxR!Zr<-xBk-KTGV&;?u)NVPDHzETXtyZ+Li0U%*wLJ z4xcO9E3fqQ4Fwbx+jqY{2e!9?O(W&awZ(lyQ zs*^g{rNP_8-iE5%c6%Vwy_h0HIIkyL3ekP@;4)H=^HCm4*PNr6TQm7PYG+`CK$c#H z56wM$aYn!4SwTUTT#=7Ae|xh1fbd5p(c0as`3KS_%Zc}QbtN!)S{S6q(>gn$Ah(P? za`iQ{m=qI^Twg!NZ1$2X#8zB7(ywgxisiFX1aC#uOh$|virZ7_JK^`ZCxh)an`s7y zG~J1$o0E0FwLu6x=hU&S@d(t&nb;>=A0v380*dm>o=a_juM$13|_ev3(2jH+S+VsdN^Pm0-8}`Fjhj=^Xp%nq1*+rj*4JO#?ek*Hat2IEs##b) z!=t10LvUa3g_x&3Tncpq@6wDVBAPsHG7m*uv~`K1X~3;|=rTg3M(2}?#;!P-^Ty^6 zoG%nor6oHq{^`72FRk!)3xD8bZS!p0exwI8f(E%gr28hv8lTWm^tiNtlE(+3tIM8Q z-v&}89H>6kv|-K?aY7$?R9R72y$saxgV#YIAz*F_^gZkaxu4nsgNQQ3Nnq^!S@7>N z;oBWx3;Cw@-zL!iyjkCnz<9N6&r9t8r)l-zPaap-3D=K-Kve)@82uF#cNKMfccitE z_0)*I9|Za%fI8kl%|TGf)D|Z7I<J=^c_@~9(z z?HyGS$%xowGe@xOM(ve;cua@+*@00jXd4;c1J4wXmIN{BY$ z!7U78g}~qda+3u0mIa7|XJr6CLb(J(i(L4o;GA=fDPx^!x;J{ak=bJw_%WcSKQBR+ zCy<;tB7eIM>+X)1!!Vi;VA>on0Zkbkx|4wp;25-8zd6jaG<8ovF5=t_ljz5yk?dOl z>bTiqF6}iTgLl=*rj=~Zn&6-IO`D)bEiv(p>H~3uc_5pQ2R)X zT;u{yW7B-i=E$X)^@#U?vD^Vsv`2w{+2tNAaFC|ex_fVPNYpcv59g#2k&9St4kOG_ z#sZ??h1Pn!EfQ%o?cR%p_RN8qwy)p?)7xfmSs$1-3S7cY*3A;E4XPqqZ6oT3ZnO&o z#`((8Q>Z{&AtqT`00S`sP-a0N!0`RPJ7-4^l7d>KBa#Rq+fio)w7V$BM^f(UzG<0Z zQwNw3Lii$^r!i@Jkl)yIBhZn2tTDPucTJDii`v9i3UkEoZ zEjrJ3ertTyG{3j^-IzkmGw$^mS+KNRVlZp*!+P2j0yCMrK!798c zfNF(7tW7qSxc>8G%`TYe*E;Pf!aZKJ?Yt`3RpEb@5{&ysiQu+(J>mRr@(dh(Q+x4y zCFfe`ZcnW9pZ9wOo)h?|d~Ck&<~Ya!idX{>IL=L;o3+%go#(-K(NdmD6kCFa`2oV| zD6~-uY7OQd-QIOT$n|@j`S0Ie#~|MV)#Qx|_z-FlI00q#3QSA{%cJ%$Km5+bu5lc# zZWN0F;eP=jAQPf!KCe2KEIC_kR8IH|#4RsGjN00&(W+7rX!l=OU2wJu8(pFEO~oz!-QzCGWs}%F~VDQ5$pgUcBd+2`VTJGruTLB zFY;Sg@2dp^ET`m|zgcBUyT`y*OSN=j4<53-V{kilqN=vH{+S1m*;a}`@Kj55fP^CQ zM}>Q2%>%r+Lrri=)2s>lk{G6+7A(Oq3TzD*>)a!or2w+|IT}{5K~H%kmaGgNHLhD4 zqol7&aeE;(k*vSuaMG-1?Ivx`?*XaTR0!}QgTlW@yV|b+ zM%4!B`k>!n`RGSUA-V=TWzzFS)e*^39z6%i;1?wK^USB4FZHgKfWt}BVIP@=_3`G% zFj08TWee9m-;MGCiczAfX^B;wLw~kK(n8HJ{SZT%!`j|)g=gV=d2bOMJ}QT!1Mowb zKDD7e#-{2YjJ?SKX zWw!r5FHvxC4>Q28{|A#kJLcg{HXlyT2FW85DZ*?VX0wdR`ZnQN~3%$bLmZ7gJ^ z{*nR!K-S9g>{S2|q6t3NCBy_z^xLg>2)=}Zu3DG@g&oQ>f*TPp(@UlRP?99Q?jb6; zm%MA~6a)aSGZN;gZTR{k!> z`%m}BXH~I1m9~7BUA4dFB>AP{-GNHHyqEA{dPJV;3;A;*`#f?*das`I8IX8?QmS6` zhUseffvi^m6ZFjf=PYSWJBU!*8MPKg3z|>uMBRw^wn2JxI*3Lhg`+y6jL96%2rGkg z7yvGdp%sBxv^)s_cEmx&fVZmkp})Ru6@{MG8(y9VpxZ=rAVM>=EwSh1Nev=dY0|(h zQ;6{Pr~Vp%qX<#P`b;$*fNuT&*+|O@GRo@O$^NE>+Bgr-e2NpN+58#wg$V28Jm0!* zr3e+BpTT`l%O`SCY>cgORy2~vxsOyYF3E{zFPPJ_L!}r8dxES(Gp-;Eqs|+t6)2gL zS(hiH*_bDND-#Br{7ZDY(Y)O#R}*|$Z*+NAiFFFqWbry%vuN{t-QwoUHC8jCSJRD~ z%l&c|vFqb+B@jROp~JPfr|4%w+U$FhkP@3oCcIAiM{!*5=K4TFZ%|A)XBtXC&0Em* zbdz3~Y))Cwmqf;o4Z8rv53JB@Dlx$q_%l4U`N$+L$c(Zy8w8nugp#0TCH5^(Jp6#L z8(r@jrU%W}n4@?jCQ4uun48S~s-zpI6Em`kL*&#ytyD4nqlUc5m;Cw+dybT6n>j@D zb|#vV(^MrlSwdWYs1V~hfn#(Q_0}8iN0__}4iakE*BbefDHg@&nA4xw`sXIFtEs)V z?5R|n9mo6)c%KJeUEHH+!H7Z9bWdL%+DQMTk{tOmGTS{9ZdVNN?HJoyELm3R*M{Ke zP&f297fVhqW0lDca~0GPgXV_(X8byBf2=^Avo7<8s31_yD9O%DH<|o>?UJL# zlzpeN5};z2|L!wtwUJr34^cKe9RPViP3;P}_dM>+t^|*&OX@c?MDu&&tI}17`P8X; zN|$HT-A7b|S?s<`X(TOn!iUT5Q7&$ZGf$qJO<^VBn%)yhAN+uzvTDtk12(zT(Sr!E zrPzA1;zFLPci7|?<~rmg@tK(a4*i?E|3XT{PezzI-bj{)x!UA*nv5iSw=~@Pn}Xu! z9(@7wR2uwl%;f%ZWEz`LR9gLYM;!wotVvBE$ZpPyL=05gwxPNwg)vaL4qc(n&J+ z#8fS*@tK5gXNi6*J8{%!=raF{QA0*8SsXq{~1J_@mTk=X~%&DPsvKV{OB^@^R9??br&sOzPfz%*l^;-pKsMtZ(-*A&a zD`I2+3@QPGZs~3p1s@zjR=iRMI_A(;-1SP;eAG&!e}|`yA>EG^7~)BOehCEgPlPD_1;J{5ER`tD~=I zghGwq7uMY#Twz}9=9)LB7o`;D@UqzIA&XohGg9i$U=Fd+J>SGsi7vY6@FRjJN(v-q z>?==kCv+-w&cO%rVZR7#U7mFDhEMa*)-xhH#GxeUc@uPa^lurAv6{JvNF91gJ@;AG z?4>KI=Y(AayI+GVZ@F>o1G%@25*WW{M?~uv_U`Cf|D3 zC!Bi3yHA(EB?eQBuW#j%G^qhmP26K6Q|}RfH}`JmY`*6y2W9+3wQ?TLsMWiCmDa_B z?r-b~P?7W9EZ$)HZVm}l1g{|a`RCHaUbRv$v&7?~ zY2{R%{x1Dy>|MWt;LHe$*|^q%p=_;fY1=MI+WF0JdVNQ?@r#b0rz@SZ#9FCCm3>}d z{QEsKtS+OamHW;}NCGB2g4L$yN>+V#XkKT}H5W)I$a-mPC+J|Rkot_mTG(^c08vk} z;ajn>PSJ}%Vb0c4)lC`W_xrLvrbIE@fzHTQ0Jr$ zjs6zEck?alL@__e98%P1kzud5!Ss1!iu+GmPo&fDhy?6Tuw4=i0M0w|IEZ+K5cz5OG;ImdxzUx%DW#%N#JpwW> z1ta5L{TfQr{be}G)i>PmAjcHCkk@8SY=@cPkc{}s7=H;^)&)73k{?YA`<4Z>m8~gZ zs{RQ75Gr%FvYos8pd8IqxNFQ;>Y+hSQK7T$nlD@{4Dx0{AYcdw?7p00mr7!jGIzqVmVw^f@TmTyhIldMDgDpOOE zElFE~k+ckUhsa@X#i8EwP83o_Ez1o}idR6Z!>IlW{__Y7rAK6H-?-r~Ls^5Vrov4Z zd?9K%Gkb0YWMFGfHPI7c9lgEB)1>~;%i~vm5Dl3P`aYRPdJ+ClB~mV8QVZ(!u&$>O z;A#`zo+LSD*Ooxha_VW`)u!#?&IZwAEQLP6~!GoVuDeoEA@j zR+8NYMs0qt7WL0+9S=1+);-TlfNG3*BNx9<^^#3+A2y@s@Ke1ho2~f#_sLOEIdH(YtZ z%YQ6m`INc}IiyZwjyII{JX*N3$82h-HX2{O2eW}go+Q6Sr5DvR3}yZPfw#oGpl=%> zd7_JuPgW+Q{1~IfACWTQ@l#f(h13!9EG{}PlEt>`;^A8=tmc&TVu78VKQ(v^0e~JTrxyLBeak5n= zErytW>lZp^IGBjRXI$s81*h8xH7sZs(WN#qwg7!9I`|SK$s8hl1ic2`O3Le6JIrgL z=Ojr`Z(9i&+@>@@ckS^^>zcZvd)IL*N&FGi?S9ARa~7g^Mbabf*l8#`rLwj&X;>!W z8icA%%n$E#-`D~K@54SiB<{DqR>}*=4ImCB2eGG-KR)<<7zY1O-3$%A*6*Ao1ROjs z@J$LNizI>y0MYvI`lJ8Mp`90HMIt_j@;RfqCSgUzbJMrzAj0R~8siP@aj(tvmuUmy zF*zfyO!eYOR}F(!WW|-fibYfJ^p70=MPymI|0}=EW_Tp8CC0hJ!q_BYSOM`FXVIpx zkBuJ%RigY|dpfLZvRBsTYjfRDY)|$tOe43BLGMe+xCZ*t#-Rw4-8yhu#RVBrGuPQm z6>tyNj2X-YWWIaPCZW(+^JQAPc;+=thn7jMIcK7a<&??Gh~9mD^vt9PORIuEy`8@9 z?LAyB@NQ4iv?9lqiWoA?ow?@wB$RuaC!6rSP~Dc4$~?CXXLccJ?IFgg-mN_8xiRM) z{}b<9jfOrhJsy1FJK$^y0qGfI9kyzG+=p#Z#d_UxflPV1z3BMX{8TK0f6#-{aO?z@ zaq9(_a=cSylptDGJV$fgY&V;G@->(ajb0mkL9f;;^HENQZgfn2MKWL-#Pu)R9eP)4 zr~#Y-VTa!BN&b0u>``08cdp@Mjumq+^%3qZ)Lt8AHL`+5zgE@}=kJ=}-rVa{5W}gg z?)G!I$jI0xIFOVOk99ttNG+H5iKtmwgxTpTe!H(Y|4mhbIz4!EV+B(#CqvE{dzMG5 zEMt~=hvv%Sa`D4-mN(E z1Yczm4XFWroh@o#mk!qDjuI?|L>7E}_eP zoyk+CsSCTGbB}stw(>{Lbz*HZL5-Ki%fN3UP`k1E(7Xj>Sc?xog7fl-d%(DVe;Txq zaR>J0hjFS_=eKF7jak+6IOGnwbOs|Po3T!;3sOxnGZRmjKIfb%|J*cv<%zk{9sing zczcOBujJiqA0^&{KPYQOx?z+Dssr14Ho%VvE^a|`?oE;CK)&|zP0NB;If^_9M0I_DpI8*MvkdCfvp}O^QT5NU3^eN1(2W_x^&?F*{vAn{VH}66k^U&G@_MGr2 zw(#^BRbsIJZk0V-$K1Db%HPuHeO5HpUc38wT6DkpqS%0>s0<_SY-KPCHJ zUfp{oA*?Q)NA2y?dLX^`8RF5_gllRNRlzk`k4JFuv>o=zIcf?dvsjpLnZMm%!81yk z8@G0GdIDrjDQ`3PMQ7eIGnw;+P>J?33P;ZPUU?v50~EGuL=vZNTshbXMc;u8VlBE8#I+fujvhl`~~{xr~A5aEvjiO&lJ;{>tRY z=tuijLleD7#^~^ZV>?LE-yUG{O?FG)+u=+9jMG7$B>vdBrGmi5KYiY)`T3&6%c{p< zvyJ+5b|OKp7Rsbg=_n8V!&nr1@mIe%f5-ET#Ql?;(^!nE<(6x&`OAWROX4}^$&u15q9NF>3P>_p4%9_wDTU2LDn z1Xg`+apt6nYHhzSQi-VWnFwo#b6i9E%M!f;rl1xzC@XQ zC7CDRZPKfQ;0q}&lSeTaJi>W#wJ~UC?^I!G#>Si=7z&V2lA~Q%_k`577e*QxVw<${ zOd*=yKcFjKKHu}T^M1QpgbZ`=95fx4S^DYqk}zWGCgtdi0~=ZuS5L6eIvRHK zROh0ky>~8FWV&*0NEG}a`t`SYFt_ zMdM*jcyGIPu?G0mmO;y;?4lBL0%YMz{qKLNaWnlxSM z*?36yubz%mtAim~rc>Wget13g8^||So7aL-Gii_CBJw6AI=w6)95{n(cq(y0`3w6b z!v6KrT6AYli=+3ftCtAtd9ox-Ij&EfdVx)N_WHm4-h+v)>aoj<|z@)UBHOw)Hdj}++)=mx#1sC4-R z|3SzihyWy2tX{{Q{k__}N*n*2;rE!e%-+5U_`-?3wEG|BXsU*;57x&0%yk@LZ6~R( z$sP+e%-WF05-`qrRg}(nMbjR;8=1Cu(iqZ+n5*tvfD0hn3j-fS=tX$%CczU%bP4d7 zLLZZ*m0K5TD3l&t$buH06EZ%5#Z*s}W}L$~g_Z4P9{`{>nOdC??1XBYcp0@{%<)?D z0(laBIu`+jB*3SO{UDJo2L#Pxp>D6`GOQsj_fka4WIIBfjPLp~S5N}47IYaBTBrEn z*ltqcr48*IO(8&p)g9brN%S)(fz(v;`?TG67Yd`DOyuScPbEzKjXpgIuJLw`uKPW@ zpxYb9#oZ7oFOo(>w`Mp}j5P!!6(WYFD9ODk0DunnhOcIsDobPVH_h{89ZeuqW;b#T z>hIOp$GDfNl?LZh>n}qbvbY}w3KcLGJ*H)MAT_beuy%^(P0_t+k}&5_k|8{O*}LKP z>Y^wp5o$Ad{TX)~Ah4xN;_Zz@7b~+LBF>H-8ouZPwRN<`#1EP3P^Z5-UaTtzeeE4x zc_t7ewi`o<58#_be+=pCKi;Uzvlj2vCLSZbrg|1Yru{Bk1m2r5!#yh}mJ%7#^d0>M z@YAQU6HaBnUp?K9mEIW#5_|E_F{M_?u~8C*h_A``+Ct0ay8RrW1@rY%INoS z{-+W*#)sSwnqJ%FPrDBd;|me|YJ~bfb*5sB99uR0)tK$;s&XwM{9oVbfw%P8cE-5y z=PzbTpg;Fl{F}D~?EGN?{{)Ec#R>t3B?K|U8B+-wzGgFlUP6w^F09)KLf*I$^UWn< z{3LtOw1bkG(9t!62uAkZXl|aW9`+G{s(jx0aK1h9=>O0^ud{n?;(F8=J zP4u&KiuJW-fdX3TGe!yqU~5V-P6>g#zobpI%6g&k=LFo=O&aHLP5Efskl}@n2N?Y$ zU=#GlC$4qHx=xbvm6nV(B*DeSwo?gRTsWH3l7T;#tdoRBe9oS`NM_e$WyG`_UP%6E z%G_gQu&8#YZsYz%`Zttqj*=UGN;8zay$IwIZG1S7kt)T?(ZWZW^K(D@TC zoKL3T;?#H^#5NVzE1g&-!7cp4Fe4eiz($wK z4z0AKa|~ECV*fK~n1g#UL5kj?DjthEQ4cl7e=zYKIejGG06Hw>y+toQXHa!xL$8S( zV3f3ODCv+LRIfs{vcAhEVUtF3{Y?T~R7?WWBC^W~-3lZ;J7Bt%sr62S5I!wM%jgn| z-9|cw)$2>#*fNkfxPuJN--wm*YO|?wJ=nnfz}R3W6gl_)xP^0g=OWGuK=MjH+)SN= z5*&=!7j{-{_)#7<+KC5U?eNz$LeEKz67-~j%VGU>_GX!ROIGc_vS&=W#!8EE&5 z+IO)N%fs}w$7jpH@EXA3HfwaaIh^?#oN{z=Yh~Frh#vKo$*ny%DNtCm%vkYjQ%I-U zOJ7=>D}HlYZF|-CK)j2?u^HNi)|$_zrK1Qw3WIhEvS%CWDRHD@nrp+QZrB1fFJsty+^?jWVh%+^4i-^*Xn90I}?o^%fMIU(T=(tv2YYqd3q&64(i_<2H;~`>ryC(>pSkJSa~V z1|SF>37ArIdrEys?m47%5;VBICU!f5FVov1pe=6vB0u2#qlY%S*(7*E^2L7*t{txE#kEW!fSoXY! z#;TsRO5K7ElGf@3^~dWTecB0s z?G*HksNC)|y6K8+mL5A_XZTuD&tpz;#}W`*q4Nkja;nC;{bgt`j6J`Ero=H)ea@#{ z7cY_X@kq~>LmFXr^;g6div#&jmN09{$17?CZrtWId9nlB)cLZh^MP~Io-3d?D1pjs z7KzH@0VPCbBmWLmi1M*cEj54N|CfHC}q?$*-cUx1EXN(Q*Zt8;(=Y2#3W0NARRr+2+KR)1somr}Ekb?=5Ua zE~Ex-q)~tOypdAB@1t?6O?~glg{>6}RnFXJxhH-J0gMOGjBjr>k>}ciGR;A0zXkj; z`x}BtK|bJrgvS4{$oGFs{45$2W-=(o~yZ z#Oq;@Y~7AI`d!#A!Hg}+q-kmvy1o7WCxb?YuPXhF005eAJTdAlJR=Y6J^%TVhYA1$ z={7+tGYvASI|SVd04)vU?M|a;1JFyaR7e6NaGj?P{Pi1~f2B=&39KMr{l9Nt|JRH( z+GkkwhlesGm>JL8G?>`I=w(UdMtS+%U4sfq!va^EkpEJ8u zoy3#Rn{Ttg^=WwzZ%Z_~dyY_#33xjd#VDQXJL2uy@ps~(icC%`R~4JREFftdzsued zr@e~PA->VEg4hNaSx#AkzT`=Ip-)Se+w!8qhFBXj zae!!;DvO*~$he$jQd^)qs!;&8toB%bqQBIv6bJ2a(#6hmZ=W37z}e>1AW%VUE>@pM ze&nXGbo$11feTQ4Ax1fZBuM?@zPX@7TUK6nL5Q%7DfNrbp&8bw?hSj?a)xb=rfc1M zV`Qfm8&=|yt;&@*h1E_+2G4?T(?i94;e5? zPuW$a@r!r{=Kz{k;ZJwA`ZS&yLJh%V%c+*k0KWGcZ{~E0aVS5%bgJ0fB3RkGK6JLe z3>5E%=|k=tPDvb2i1Dj-51KUMVlmL<&d?hjAE<-dNy3STs&@U1R2vB`?;NUxgLzZO z^xXrfuERt;tK-rEg1rw696da)Td|k9-1e)`G-vL}R?>?c+kB%Rh|z0QkO^U6)~U*) zF|ENhZ2W7!<|n9uU18l`(*3DXsK%Ee)iI5g+oL1eWlwPPbo6i~wMDPk(Y>ki5TW6N z(#ViJJavR0xCMKpW$((k==x_wh5#`T9I? zljmZ<3K%z|={?FUtxC#^02d=y-n3zB_F$qfRX(Kl;x-G7cMpcHR7(~XT)tbt&`lsa zCXQ%G)J3>^gFj9h2AH&nDJRXP4ArUkx~GJqM)2^v1@=YA(Kzx;jd^krM}Zw0ECW+2 za>G4wI7lEQl$RBL@OJJ-ELPO(i?D9fU(noZ5%w*+QjN$8m>AQ8!nXSIp5lh}8mMPq z^$=n2?knW(EVtNm;zR(1et6zL4qE(7b3H_Uni^vZemrduGiH?2nOrW$3Vw#nUIi?*gtu}9xd=<;`FBo!M~A14wf18p4Y<6Ar}C;6xqreH~%mt6zh zn+GSDgb*`t-lVTfN*mL?7MCv6(i1t~OumOJ45i-PM2==n9@@F!s27tW2OYUre~&SrO!mm(PvH?pMinu-60! z4_!5KQ%UD{1!=rkS|i%$o2M{FW4yo*s9u$MJavNJ&1zd$Y8-nDie!#C-GiX`x+~o^ zEF9i~bCv)6p_0uz6+=KXG=dDHQ->(84_PzJe@z9|1vMJ_7 zcSKUmk4JGz$8ndGN#B1tsLgN2;NXY z6@|4HsY^mGa@^@v8eB1qi^(TU4lK&>Ek!e9tikR#WKP z`Gb7p{qoJrPu8jnYk}58ffPBAK z&+PUyzIe-t^7)9tA2kGZ?C7Vtu(9_M>-3C40_&^}ro&xe5o<)KwMhhKSs?X_%8Cc- zO(68w;L{YjN>?_g0Nf>hQ!(5i2fkNO)q%*dz98jd68ktq|7#>w@vb3e&XtT{kB$%` zaIzfD%%bVlFB6h&SySfIYQwtDo!>%Q+u~695`s;XoAptzr9HHAlvEe~t+PUEe7BVN zc?5|FDbDMTfFqWeR)9Ww%*n`Oy?Pl%B^ytgnApaJM#$8d8VvdY<;> zRStD;`JtgClZ907ATrQ^U)p_cPx-rpvLfvf9%JlESxzRSrg#{cMvqw1gA$lFb*QpT zz2SUa=CQS*%%OM2cf7tg6+T*4wqks(tP<*YC=Bfi;JE9wpPBz6>;rQ>rs7wydg`aP z3U7W66E!M}2mKWRZbV|$=YQ~?arfT|wf)8@9rolV9je{s^Zn$ZxZEEV1zjzRYk%{; ztJqqAv&a|bs+Vy}6dP}+o=*!=woiV0uWjw;?y}OSdPn-Umfa>j4&I*N0@2jHD0Y6g zJgKbIzokqN>T9#1b=eys&z!VjFh_`c_&~?=dBab0T!v=&=5}nYY_U8#oGH*0-f6fl z@;;^OT^E>iXiwF9!nOA2Np&Bi>)y@jHzDus+>0g4{qGOIPbnfS8eYHwkq1v7NimQ= zlxhDD{>PsmetE9Z<_68eJ?w_-LhC6n+=kXM9LpQQ&+gM(M|P~W^f?tJs^JU^-SU%T z)MoNb92`0wCNcWzm9_3(#c`4p#Hc_E zCR1?6J(YFgg>=v1Z%f44$_k-fue8>YcxWPBX*bD3N&lD<>1Tit5Oq#m5N!+}#fJ8N zlDjjnLmNI`vG=73Bo&${tSJdodc!P@8tWfq^3as{skhKQTVG3ymHFDuRRKW3dklW9 z(P80mq#g6+u8yk^sywU7Qrx$Kq!8a13++$B7__d9`}rlifkUv^EA>2qP_4 zckc;!bnjLCl~cDugp#Ab=jmXlbZ>>ACX749fZ*3HA7;Z`pAm;l)MfrlW=M?WqaaCZ z9pefCto0=a-C|Jdd*bYt?zEF$^$&K&`5nZ87CShoeFj=-xI~0_ErE?^JbwT zCK{758GbQ^;hfp?*H(bz%A{BOcU&sIKRH`^a6rFNQ&9K<6&tQOCOc&noru3l_v)7c zP=cWMwX7HC$?SzIn+ykdDs&gDompzXJ6QL#8onX;9o2FWN*QB|BBdDVm2Z?uq}M`F ztm5y;GC}-*3D?eHzR9(ux(x#)lbxB@BdGVGbxRlJklh4D3af*4g46n86{6w0p3$N$ zONUIZwZtLS?e0KEc@;|6&lneJNHXjnLd77Vf(54G7jPHucaQc&b;<3=N+T}|Bnr}N zQ!JD7+YCLSfQI`2E>*&=iT;7@Yb`+x@svD|qg@F+uiR_3wz_p$ug@!YGBfKiow@pgrUZkSf);{_1EM8xrSLZ#c*WgN?KzzrvLvNpHl?YfLAKd$XRZ$nz8N^ zTjbO$5)v8Vzox9J^wW9F0h^oXJg?@cWqTy=;RJnEaNsle$bLnDb`&I;JNPF_A{X1u z9Uz=IUEn3up6IgVhwleJ9rvS1;@U*2P{G6X`~~DA{fOC>qc0gy<3fLvkZZ?!R7jlM zY`2^(IF$nv0xm!@p&C(@F3!QUT%515d4@KYd21G?Yy9k^ z7l}`#!37-z z!O0)!RY>6G*1uwIx%#HhC&YatqUK5c{E6Q`b1bXXvBQ;%N@9dpvTs!<#mmx<&!UHQ#J|-&=mnb<&^$IT*-Pq!tKXx0U(O# zaI34?TY%!jf{Q}6k4OM`d=8>1z&~L95bCy#_~!3TO`m8}5kSV`_v5*)wX_TXc&;eI zaze5*-LDkY{#={Nqvtz@Y;Gl~2tw!G1q)EZ+l&ItDn)vx)T~iZI(J4uao+J1XL)57 zz`kkb=7FVBGWj(Md6S*F(j%pqpW;F5l-y0}O(os@!NBo(>3(GGT`K_*sRTpuYtVJ@ z#-~B`tD{b{oWp4$+4Z=pHKUe|d3Bq?(UQ(WXh;5wO~mXf=lDgqXe=9ofkvM*;!*Cd z-S=us)4DhwGIJkN?lQpjxLxVnmI|$Y=iloQwiaGDcb?77;rU7E?M;pUloNCBZa;79 zmfpGgf}%mM9j^m;!geBM$EWJQKjZ!d^q*VI)MCgNHm-fGH(=D+c>NInD#8lc@p{|# zteIo_G6Sm9e$dx&KM1u;d#`eWo$^S=6Ke0`_Ljay&QNY*Gb8S4(Sf~3wxV_1aMqAE z_%{JvO&XOUtY0tD@%r%CV!-CqEt{r-XKAnO*B2zwhXG~{d@%fw#5ZU#+O zKDq09T$cN8oP5Q92HMd#yeK9D!2ES}V60O${P@Z=Q?Q_7gmh*clbekBR%E<1&f@<`UZs2%nrI3 zAHz-nX_h({b>ULxIzurvQLRv2WChRhtCBo$*qM@4I_%Zf-KnL(p1A66Nhp{_w}6i6 zOvre(>qF}-n$0nkJ`?yI%9-eQgNSVng#~UrSEct8e!&8&c6|EDMOTMn1XcVdA1ID* z-CS~5_^O%rWpk}f`iO;8)-RpG!+^MW8SQ?mg!WD0#sP zOaLk+c#vfy3P7*?*Hq^}X+c5Os<(eGV~`s3I~#Hpj|p9F?CedIPGK_I=wB+pdCrJO zt-~wj)osqvJs6XfO7EG;B%=cV?jnp$*O9Hu+#r}Azqc1~oz2}REI^MpR6hHpD zkFZ%of2`2(BqIF8cK^m9ox=i3%{O}tS}8Q!nOyLstfntN+yZ{S4HPd)*6Dk!ndM7iop{Yq(5_ zS?XNb>;KSw%cDJdQ^p2z$zI?q3HQ>a!+{3wk@YtBok|@~e8lRUdO!I2j{RHOP_P-Q zavt!`$Z%)F#-_UTKi^00nR#?mpaZQYsIH0G^}(cz_sa%{yjq08SHv@qOx5FmvyCh% z*o@AUS(ZEfuhj=P!iJ9XEwBG2peT{oLLhqX%Yjln7mCjGMb{roLsMN%FoCeh6K$P> rZ&zqp}LNloIV;O0|KTrUxb2euMx9I-^Wgqa_ literal 0 HcmV?d00001 diff --git a/src/commands/consumers.ts b/src/commands/consumers.ts index dc33afe..ca3b88a 100644 --- a/src/commands/consumers.ts +++ b/src/commands/consumers.ts @@ -1,11 +1,12 @@ import * as vscode from "vscode"; import { pickClient, pickConsumerGroupId, pickTopic } from "./common"; -import { ConsumerCollection, ClientAccessor, createConsumerUri, ConsumerInfoUri, parsePartitions, ConsumerLaunchState } from "../client"; +import { ConsumerCollection, ClientAccessor, createConsumerUri, ConsumerInfoUri, ConsumerLaunchState } from "../client"; import { KafkaExplorer } from "../explorer"; import { ConsumerVirtualTextDocumentProvider } from "../providers"; import { ProgressLocation, window } from "vscode"; import { getErrorMessage } from "../errors"; +import { ConsumerValidator } from "../validators/consumer"; export interface LaunchConsumerCommand extends ConsumerInfoUri { @@ -63,8 +64,7 @@ abstract class LaunchConsumerCommandHandler { } // Validate start command - validatePartitions(command.partitions); - validateOffset(command.fromOffset); + ConsumerValidator.validate(command); // Open the document which tracks consumer messages. const consumeUri = createConsumerUri(command); @@ -112,23 +112,6 @@ export class StopConsumerCommandHandler extends LaunchConsumerCommandHandler { } } -function validatePartitions(partitions?: string) { - if (!partitions) { - return; - } - parsePartitions(partitions); -} - -function validateOffset(offset?: string) { - if (!offset || offset === 'earliest' || offset === 'latest') { - return; - } - const valueAsNumber = parseInt(offset, 10); - if (isNaN(valueAsNumber) || valueAsNumber < 0) { - throw new Error(`from must be a positive number or equal to 'earliest' or 'latest'.`); - } -} - export class ToggleConsumerCommandHandler { public static commandId = 'vscode-kafka.consumer.toggle'; diff --git a/src/commands/producers.ts b/src/commands/producers.ts index 9d4af5c..0afe0a9 100644 --- a/src/commands/producers.ts +++ b/src/commands/producers.ts @@ -10,6 +10,8 @@ import { pickClient } from "./common"; import { MessageFormat, serialize } from "../client/serialization"; import { createProducerUri, ProducerCollection, ProducerInfoUri, ProducerLaunchState } from "../client/producer"; import { ProducerRecord } from "kafkajs"; +import { ProducerValidator } from "../validators/producer"; +import { getErrorMessage } from "../errors"; export interface ProduceRecordCommand extends ProducerInfoUri { messageKeyFormat?: MessageFormat; @@ -35,47 +37,54 @@ export class ProduceRecordCommandHandler { return; } - const { topicId, key, value } = command; - const channel = this.channelProvider.getChannel("Kafka Producer Log"); - if (topicId === undefined) { - channel.appendLine("No topic"); - return; - } - if (this.settings.producerFakerJSEnabled) { - faker.setLocale(this.settings.producerFakerJSLocale); - } + try { + ProducerValidator.validate(command); - const messages = [...Array(times).keys()].map(() => { + const { topicId, key, value } = command; + const channel = this.channelProvider.getChannel("Kafka Producer Log"); + if (topicId === undefined) { + channel.appendLine("No topic"); + return; + } if (this.settings.producerFakerJSEnabled) { - //Use same seed for key and value so we can generate content like - // key: customer-{{random.uuid}} // same value as in id - // {"id": "{{random.uuid}}"} // same value as in key - const seed = Math.floor(Math.random() * 1000000); - faker.seed(seed); - const randomizedKey = (key) ? faker.fake(key) : key; - faker.seed(seed); - const randomizedValue = faker.fake(value); + faker.setLocale(this.settings.producerFakerJSLocale); + } + + const messages = [...Array(times).keys()].map(() => { + if (this.settings.producerFakerJSEnabled) { + //Use same seed for key and value so we can generate content like + // key: customer-{{random.uuid}} // same value as in id + // {"id": "{{random.uuid}}"} // same value as in key + const seed = Math.floor(Math.random() * 1000000); + faker.seed(seed); + const randomizedKey = (key) ? faker.fake(key) : key; + faker.seed(seed); + const randomizedValue = faker.fake(value); + return { + key: serialize(randomizedKey, command.messageKeyFormat), + value: serialize(randomizedValue, command.messageValueFormat) + }; + } + + // Return key/value message as-is return { - key: serialize(randomizedKey, command.messageKeyFormat), - value: serialize(randomizedValue, command.messageValueFormat) + key: serialize(key, command.messageKeyFormat), + value: serialize(value, command.messageValueFormat) }; - } + }); - // Return key/value message as-is - return { - key: serialize(key, command.messageKeyFormat), - value: serialize(value, command.messageValueFormat) + command.clusterId = client.cluster.id; + const producerUri = createProducerUri(command); + const record = { + topic: topicId, + messages: messages, }; - }); - - command.clusterId = client.cluster.id; - const producerUri = createProducerUri(command); - const record = { - topic: topicId, - messages: messages, - }; - // Start the producer - await startProducerWithProgress(producerUri, record, this.producerCollection, channel, times, this.explorer); + // Start the producer + await startProducerWithProgress(producerUri, record, this.producerCollection, channel, times, this.explorer); + } + catch (e) { + vscode.window.showErrorMessage(`Error while producing: ${getErrorMessage(e)}`); + } } } diff --git a/src/kafka-file/kafkaFileClient.ts b/src/kafka-file/kafkaFileClient.ts index c2f5246..75722f6 100644 --- a/src/kafka-file/kafkaFileClient.ts +++ b/src/kafka-file/kafkaFileClient.ts @@ -9,6 +9,7 @@ import { ConsumerLaunchStateProvider, getLanguageService, LanguageService, Produ import { runSafeAsync } from "./utils/runner"; import { TopicItem } from "../explorer"; import { KafkaModelProvider } from "../explorer/models/kafka"; +import { ThrottledDelayer } from "./utils/async"; export function startLanguageClient( clusterSettings: ClusterSettings, @@ -25,20 +26,6 @@ export function startLanguageClient( // Create the Kafka file language service. const languageService = createLanguageService(clusterSettings, producerCollection, consumerCollection, modelProvider); - // Open / Close document - context.subscriptions.push(vscode.workspace.onDidOpenTextDocument(e => { - if (e.languageId === 'kafka') { - openedDocuments.set(e.uri.toString(), e); - } - })); - - context.subscriptions.push(vscode.workspace.onDidCloseTextDocument(e => { - if (e.languageId === 'kafka') { - openedDocuments.delete(e.uri.toString()); - kafkaFileDocuments.onDocumentRemoved(e); - } - })); - const documentSelector = [ { language: "kafka", scheme: "file" }, { language: "kafka", scheme: "untitled" }, @@ -70,6 +57,32 @@ export function startLanguageClient( new KafkaFileCompletionItemProvider(kafkaFileDocuments, languageService), ':', '{', '.')); + // Validation + const diagnostics = new KafkaFileDiagnostics(kafkaFileDocuments, languageService); + context.subscriptions.push(diagnostics); + + // Open / Close document + context.subscriptions.push(vscode.workspace.onDidOpenTextDocument(e => { + if (e.languageId === 'kafka') { + openedDocuments.set(e.uri.toString(), e); + diagnostics.triggerValidate(e); + } + })); + + context.subscriptions.push(vscode.workspace.onDidChangeTextDocument(e => { + if (e.document.languageId === 'kafka') { + diagnostics.triggerValidate(e.document); + } + })); + + context.subscriptions.push(vscode.workspace.onDidCloseTextDocument(e => { + if (e.languageId === 'kafka') { + openedDocuments.delete(e.uri.toString()); + kafkaFileDocuments.onDocumentRemoved(e); + diagnostics.delete(e); + } + })); + return { dispose() { kafkaFileDocuments.dispose(); @@ -173,3 +186,47 @@ class KafkaFileCompletionItemProvider extends AbstractKafkaFileFeature implement } +class KafkaFileDiagnostics extends AbstractKafkaFileFeature implements vscode.Disposable { + + private diagnosticCollection: vscode.DiagnosticCollection; + private delayers?: { [key: string]: ThrottledDelayer }; + + constructor( + kafkaFileDocuments: LanguageModelCache, + languageService: LanguageService + ) { + super(kafkaFileDocuments, languageService); + this.diagnosticCollection = vscode.languages.createDiagnosticCollection('kafka'); + this.delayers = Object.create(null); + } + + delete(textDocument: vscode.TextDocument) { + this.diagnosticCollection.delete(textDocument.uri); + } + + public triggerValidate(textDocument: vscode.TextDocument): void { + let trigger = () => { + let key = textDocument.uri.toString(); + let delayer = this.delayers![key]; + if (!delayer) { + delayer = new ThrottledDelayer(250); + this.delayers![key] = delayer; + } + delayer.trigger(() => this.doValidate(textDocument)); + }; + trigger(); + } + + private doValidate(document: vscode.TextDocument): Promise { + return new Promise((resolve) => { + const kafkaFileDocument = this.getKafkaFileDocument(document); + const diagnostics = this.languageService.doDiagnostics(document, kafkaFileDocument); + this.diagnosticCollection!.set(document.uri, diagnostics); + resolve(); + }); + } + dispose(): void { + this.diagnosticCollection.clear(); + this.diagnosticCollection.dispose(); + } +} diff --git a/src/kafka-file/languageservice/kafkaFileLanguageService.ts b/src/kafka-file/languageservice/kafkaFileLanguageService.ts index 654f448..2ab0e83 100644 --- a/src/kafka-file/languageservice/kafkaFileLanguageService.ts +++ b/src/kafka-file/languageservice/kafkaFileLanguageService.ts @@ -1,9 +1,10 @@ -import { CodeLens, CompletionList, Position, TextDocument, Uri } from "vscode"; +import { CodeLens, CompletionList, Diagnostic, Position, TextDocument, Uri } from "vscode"; import { ConsumerLaunchState } from "../../client"; import { ProducerLaunchState } from "../../client/producer"; import { KafkaFileDocument, parseKafkaFile } from "./parser/kafkaFileParser"; import { KafkaFileCodeLenses } from "./services/codeLensProvider"; import { KafkaFileCompletion } from "./services/completion"; +import { KafkaFileDiagnostics } from "./services/diagnostics"; /** * Provider API which gets the state for a given producer. @@ -70,7 +71,15 @@ export interface LanguageService { * @param kafkaFileDocument the parsed AST. * @param position the position where the completion was triggered. */ - doComplete(document: TextDocument, kafkaFileDocument: KafkaFileDocument, position: Position): Promise + doComplete(document: TextDocument, kafkaFileDocument: KafkaFileDocument, position: Position): Promise; + + /** + * Returns the diagnostics result for the given text document and parsed AST. + * + * @param document the text document. + * @param kafkaFileDocument the parsed AST. + */ + doDiagnostics(document: TextDocument, kafkaFileDocument: KafkaFileDocument): Diagnostic[]; } /** @@ -83,11 +92,13 @@ export interface LanguageService { */ export function getLanguageService(producerLaunchStateProvider: ProducerLaunchStateProvider, consumerLaunchStateProvider: ConsumerLaunchStateProvider, selectedClusterProvider: SelectedClusterProvider, topicProvider: TopicProvider): LanguageService { - const kafkaFileCodeLenses = new KafkaFileCodeLenses(producerLaunchStateProvider, consumerLaunchStateProvider, selectedClusterProvider); - const kafkaFileCompletion = new KafkaFileCompletion(selectedClusterProvider, topicProvider); + const codeLenses = new KafkaFileCodeLenses(producerLaunchStateProvider, consumerLaunchStateProvider, selectedClusterProvider); + const completion = new KafkaFileCompletion(selectedClusterProvider, topicProvider); + const diagnostics = new KafkaFileDiagnostics(); return { parseKafkaFileDocument: (document: TextDocument) => parseKafkaFile(document), - getCodeLenses: kafkaFileCodeLenses.getCodeLenses.bind(kafkaFileCodeLenses), - doComplete: kafkaFileCompletion.doComplete.bind(kafkaFileCompletion) + getCodeLenses: codeLenses.getCodeLenses.bind(codeLenses), + doComplete: completion.doComplete.bind(completion), + doDiagnostics: diagnostics.doDiagnostics.bind(diagnostics) }; } diff --git a/src/kafka-file/languageservice/model.ts b/src/kafka-file/languageservice/model.ts index f82bd17..cf30f71 100644 --- a/src/kafka-file/languageservice/model.ts +++ b/src/kafka-file/languageservice/model.ts @@ -1,10 +1,43 @@ +export class Model { + + private cache = new Map(); + + constructor(public readonly definitions: ModelDefinition[]) { + definitions.forEach(definition => { + this.cache.set(definition.name, definition); + }); + } + public getDefinition(name: string): ModelDefinition | undefined { + return this.cache.get(name); + } + + public hasDefinition(name: string): boolean { + return this.cache.has(name); + } + + public hasDefinitionEnum(name: string, value: string): boolean { + const definition = this.getDefinition(name); + if (!definition) { + return false; + } + if (definition.enum) { + for (const item of definition.enum) { + if (item.name === value) { + return true; + } + } + } + return false; + } +} + export interface ModelDefinition { name: string; description: string; enum?: ModelDefinition[]; } -export const consumerProperties = [ +const consumerProperties = [ { name: "topic", description: "The topic id *[required]*" @@ -17,7 +50,7 @@ export const consumerProperties = [ name: "earliest" }, { - name: "last" + name: "latest" }, { name: "0" @@ -102,8 +135,9 @@ export const consumerProperties = [ ] } ] as ModelDefinition[]; +export const consumerModel = new Model(consumerProperties); -export const producerProperties = [ +const producerProperties = [ { name: "topic", description: "The topic id *[required]*" @@ -174,7 +208,9 @@ export const producerProperties = [ } ] as ModelDefinition[]; -export const fakerjsAPI = [ +export const producerModel = new Model(producerProperties); + +const fakerjsAPI = [ { name: "address.zipCode" }, { name: "address.zipCodeByState" }, { name: "address.city" }, @@ -343,3 +379,5 @@ export const fakerjsAPI = [ { name: "vehicle.vrm" }, { name: "vehicle.bicycle" } ] as ModelDefinition[]; + +export const fakerjsAPIModel = new Model(fakerjsAPI); diff --git a/src/kafka-file/languageservice/parser/kafkaFileParser.ts b/src/kafka-file/languageservice/parser/kafkaFileParser.ts index 1ead68e..398c5ae 100644 --- a/src/kafka-file/languageservice/parser/kafkaFileParser.ts +++ b/src/kafka-file/languageservice/parser/kafkaFileParser.ts @@ -1,5 +1,6 @@ import { Position, Range, TextDocument } from "vscode"; import { findFirst } from "../../utils/arrays"; +import { consumerModel, Model, producerModel } from "../model"; export enum NodeKind { document, @@ -126,6 +127,32 @@ export class Property extends BaseNode { return new Range(start, end); } + public get propertyTrimmedValueRange(): Range | undefined { + if (!this.assignerCharacter) { + return; + } + const value = this.value?.content; + if (!value) { + return; + } + let startChar = 0; + for (startChar = 0; startChar < value.length; startChar++) { + if (value.charAt(startChar) !== ' ') { + break; + } + } + let endChar = value.length; + for (endChar = value.length - 1; endChar >= 0; endChar--) { + if (value.charAt(endChar) !== ' ') { + endChar++; + break; + } + } + const start = new Position(this.start.line, startChar + this.assignerCharacter + 1); + const end = new Position(this.end.line, endChar + this.assignerCharacter + 1); + return new Range(start, end); + } + isBeforeAssigner(position: Position): boolean { if (this.assignerCharacter) { return position.character <= this.assignerCharacter; @@ -136,7 +163,7 @@ export class Property extends BaseNode { export abstract class Block extends ChildrenNode { - constructor(public readonly type: BlockType, start: Position, end: Position) { + constructor(start: Position, end: Position, public readonly type: BlockType, public readonly model: Model) { super(start, end, type === BlockType.consumer ? NodeKind.consumerBlock : NodeKind.producerBlock); } @@ -157,11 +184,13 @@ export abstract class Block extends ChildrenNode { } export class ProducerBlock extends Block { + public value: Chunk | undefined; constructor(start: Position, end: Position) { - super(BlockType.producer, start, end); + super(start, end, BlockType.producer, producerModel); } + } export class ConsumerBlock extends Block { @@ -169,7 +198,7 @@ export class ConsumerBlock extends Block { public consumerGroupId: Chunk | undefined; constructor(start: Position, end: Position) { - super(BlockType.consumer, start, end); + super(start, end, BlockType.consumer, consumerModel); } } @@ -286,7 +315,7 @@ function parseProducerBlock(block: ProducerBlock, document: TextDocument) { continue; } - if (startsWith(lineText, ["topic:", "key:", "key-format:", "value-format:"])) { + if (isPropertyLine(lineText, block.model)) { // Known properties block.addChild(createProperty(lineText, currentLine, block)); continue; @@ -302,13 +331,13 @@ function parseProducerBlock(block: ProducerBlock, document: TextDocument) { } } -function startsWith(lineText: string, searchStrings: string[]): boolean { - for (let i = 0; i < searchStrings.length; i++) { - if (lineText.startsWith(searchStrings[i])) { - return true; - } +function isPropertyLine(lineText: string, model: Model): boolean { + const index = lineText.indexOf(':'); + if (index === -1) { + return false; } - return false; + const propertyName = lineText.substring(0, index); + return model.hasDefinition(propertyName); } function isIgnoreLine(lineText: string): boolean { diff --git a/src/kafka-file/languageservice/services/completion.ts b/src/kafka-file/languageservice/services/completion.ts index a4ada38..cc03cec 100644 --- a/src/kafka-file/languageservice/services/completion.ts +++ b/src/kafka-file/languageservice/services/completion.ts @@ -1,6 +1,6 @@ import { TextDocument, Position, CompletionList, CompletionItem, SnippetString, MarkdownString, CompletionItemKind, Range } from "vscode"; import { SelectedClusterProvider, TopicDetail, TopicProvider } from "../kafkaFileLanguageService"; -import { consumerProperties, fakerjsAPI, ModelDefinition, producerProperties } from "../model"; +import { consumerModel, fakerjsAPIModel, Model, ModelDefinition, producerModel } from "../model"; import { Block, BlockType, Chunk, ConsumerBlock, KafkaFileDocument, MustacheExpression, NodeKind, ProducerBlock, Property } from "../parser/kafkaFileParser"; /** @@ -117,18 +117,18 @@ export class KafkaFileCompletion { } async collectConsumerPropertyNames(propertyName: string | undefined, lineRange: Range, block: ConsumerBlock, items: Array) { - await this.collectPropertyNames(propertyName, lineRange, block, consumerProperties, items); + await this.collectPropertyNames(propertyName, lineRange, block, consumerModel, items); } async collectProducerPropertyNames(propertyName: string | undefined, lineRange: Range, block: ProducerBlock, items: Array) { - await this.collectPropertyNames(propertyName, lineRange, block, producerProperties, items); + await this.collectPropertyNames(propertyName, lineRange, block, producerModel, items); } - async collectPropertyNames(propertyName: string | undefined, lineRange: Range, block: Block, metadata: ModelDefinition[], items: Array) { + async collectPropertyNames(propertyName: string | undefined, lineRange: Range, block: Block, metadata: Model, items: Array) { const existingProperties = block.properties .filter(property => property.key) .map(property => property.key?.content); - for (const definition of metadata) { + for (const definition of metadata.definitions) { const currentName = definition.name; if (existingProperties.indexOf(currentName) === -1 || propertyName === currentName) { const item = new CompletionItem(currentName); @@ -161,7 +161,7 @@ export class KafkaFileCompletion { default: // CONSUMER // key-format: | - this.collectPropertyValues(propertyValue, property, block, consumerProperties, items); + this.collectPropertyValues(propertyValue, property, block, consumerModel, items); break; } } @@ -177,14 +177,17 @@ export class KafkaFileCompletion { default: // PRODUCER // key-format: | - this.collectPropertyValues(propertyValue, property, block, producerProperties, items); + this.collectPropertyValues(propertyValue, property, block, producerModel, items); break; } } - collectPropertyValues(propertyValue: Chunk | undefined, property: Property, block: Block, metadata: ModelDefinition[], items: Array) { + collectPropertyValues(propertyValue: Chunk | undefined, property: Property, block: Block, metadata: Model, items: Array) { const propertyName = property.propertyName; - const definition = metadata.find(definition => definition.name === propertyName); + if (!propertyName) { + return; + } + const definition = metadata.getDefinition(propertyName); if (!definition || !definition.enum) { return; } @@ -207,6 +210,7 @@ export class KafkaFileCompletion { collectFakerJSExpressions(expression: MustacheExpression, items: CompletionItem[]) { const expressionRange = expression.expressionRange; + const fakerjsAPI = fakerjsAPIModel.definitions; fakerjsAPI.forEach((definition) => { const value = definition.name; const item = new CompletionItem(value); diff --git a/src/kafka-file/languageservice/services/diagnostics.ts b/src/kafka-file/languageservice/services/diagnostics.ts new file mode 100644 index 0000000..2d5fcf9 --- /dev/null +++ b/src/kafka-file/languageservice/services/diagnostics.ts @@ -0,0 +1,145 @@ +import { Diagnostic, DiagnosticSeverity, Position, Range, TextDocument } from "vscode"; +import { Block, BlockType, ConsumerBlock, KafkaFileDocument, ProducerBlock, Property } from "../parser/kafkaFileParser"; +import { ConsumerValidator } from "../../../validators/consumer"; +import { ProducerValidator } from "../../../validators/producer"; +import { CommonsValidator } from "../../../validators/commons"; + +/** + * Kafka file diagnostics support. + */ +export class KafkaFileDiagnostics { + + doDiagnostics(document: TextDocument, kafkaFileDocument: KafkaFileDocument): Diagnostic[] { + const diagnostics: Diagnostic[] = []; + for (const block of kafkaFileDocument.blocks) { + if (block.type === BlockType.consumer) { + this.validateConsumerBlock(block, diagnostics); + } else { + this.validateProducerBlock(block, diagnostics); + } + } + return diagnostics; + } + + validateConsumerBlock(block: ConsumerBlock, diagnostics: Diagnostic[]) { + this.validateProperties(block, diagnostics); + } + + validateProducerBlock(block: ProducerBlock, diagnostics: Diagnostic[]) { + this.validateProperties(block, diagnostics); + this.validateProducerValue(block, diagnostics); + } + validateProducerValue(block: ProducerBlock, diagnostics: Diagnostic[]) { + const value = block.value; + const errorMessage = ProducerValidator.validateProducerValue(value?.content); + if (errorMessage) { + const range = new Range(block.start, new Position(block.start.line, block.start.character + 8)); + diagnostics.push(new Diagnostic(range, errorMessage, DiagnosticSeverity.Error)); + } + } + + validateProperties(block: Block, diagnostics: Diagnostic[]) { + const existingProperties = new Map(); + let topicProperty: Property | undefined; + for (const property of block.properties) { + const propertyName = property.propertyName; + this.validateProperty(property, block, diagnostics); + if (propertyName === 'topic') { + topicProperty = property; + } + if (propertyName) { + let properties = existingProperties.get(propertyName); + if (!properties) { + properties = []; + existingProperties.set(propertyName, properties); + } + properties.push(property); + } + } + // Validate duplicate properties + existingProperties.forEach((properties, propertyName) => { + if (properties.length > 1) { + properties.forEach(property => { + const range = property.propertyKeyRange; + diagnostics.push(new Diagnostic(range, `Duplicate property '${propertyName}'`, DiagnosticSeverity.Warning)); + }); + } + }); + + if (!topicProperty) { + const range = new Range(block.start, new Position(block.start.line, block.start.character + 8)); + diagnostics.push(new Diagnostic(range, `The ${block.type === BlockType.consumer ? 'consumer' : 'producer'} must declare the 'topic:' property.`, DiagnosticSeverity.Error)); + } + } + validateProperty(property: Property, block: Block, diagnostics: Diagnostic[]) { + const propertyName = property.propertyName; + // 1. Validate property syntax + this.validateSyntaxProperty(propertyName, property, diagnostics); + + if (propertyName) { + const definition = block.model.getDefinition(propertyName); + if (!definition) { + // 2. Validate unknown property + this.validateUnknownProperty(propertyName, property, diagnostics); + } else { + // 3. Validate property value + this.validatePropertyValue(property, block.type, diagnostics); + } + } + } + + private validateSyntaxProperty(propertyName: string | undefined, property: Property, diagnostics: Diagnostic[]) { + // 1.1. property must contains ':' assigner + const assigner = property.assignerCharacter; + if (!assigner) { + // Error => topic + const range = property.propertyRange; + diagnostics.push(new Diagnostic(range, `Missing ':' sign after '${propertyName}'`, DiagnosticSeverity.Error)); + return; + } + // 1.2. property must declare a key + if (!propertyName) { + // Error => :string + const range = property.propertyRange; + diagnostics.push(new Diagnostic(range, "Property must define a name before ':' sign", DiagnosticSeverity.Error)); + return; + } + } + + validateUnknownProperty(propertyName: string, property: Property, diagnostics: Diagnostic[]) { + const range = property.propertyKeyRange; + diagnostics.push(new Diagnostic(range, `Unkwown property '${propertyName}'`, DiagnosticSeverity.Warning)); + } + + validatePropertyValue(property: Property, type: BlockType, diagnostics: Diagnostic[]) { + const propertyName = property.propertyName; + if (!propertyName) { + return; + } + const propertyValue = property.propertyValue; + const range = propertyValue ? property.propertyTrimmedValueRange : property.propertyKeyRange; + if (!range) { + return; + } + const errorMessage = this.validate(propertyName, type, propertyValue); + if (errorMessage) { + diagnostics.push(new Diagnostic(range, errorMessage, DiagnosticSeverity.Error)); + } + } + + private validate(propertyName: string, type: BlockType, propertyValue?: string): string | undefined { + switch (propertyName) { + case 'topic': + return CommonsValidator.validateTopic(propertyValue); + case 'key-format': + return type === BlockType.consumer ? ConsumerValidator.validateKeyFormat(propertyValue) : ProducerValidator.validateKeyFormat(propertyValue); + case 'value-format': + return type === BlockType.consumer ? ConsumerValidator.validateValueFormat(propertyValue) : ProducerValidator.validateValueFormat(propertyValue); + case 'from': + return ConsumerValidator.validateOffset(propertyValue); + case 'partitions': { + return ConsumerValidator.validatePartitions(propertyValue); + } + } + } +} diff --git a/src/kafka-file/utils/async.ts b/src/kafka-file/utils/async.ts new file mode 100644 index 0000000..3e736ad --- /dev/null +++ b/src/kafka-file/utils/async.ts @@ -0,0 +1,185 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +export interface ITask { + (): T; +} + +/** + * A helper to prevent accumulation of sequential async tasks. + * + * Imagine a mail man with the sole task of delivering letters. As soon as + * a letter submitted for delivery, he drives to the destination, delivers it + * and returns to his base. Imagine that during the trip, N more letters were submitted. + * When the mail man returns, he picks those N letters and delivers them all in a + * single trip. Even though N+1 submissions occurred, only 2 deliveries were made. + * + * The throttler implements this via the queue() method, by providing it a task + * factory. Following the example: + * + * var throttler = new Throttler(); + * var letters = []; + * + * function letterReceived(l) { + * letters.push(l); + * throttler.queue(() => { return makeTheTrip(); }); + * } + */ +export class Throttler { + + private activePromise: Promise | null; + private queuedPromise: Promise | null; + private queuedPromiseFactory: ITask> | null; + + constructor() { + this.activePromise = null; + this.queuedPromise = null; + this.queuedPromiseFactory = null; + } + + public queue(promiseFactory: ITask>): Promise { + if (this.activePromise) { + this.queuedPromiseFactory = promiseFactory; + + if (!this.queuedPromise) { + let onComplete = () => { + this.queuedPromise = null; + + let result = this.queue(this.queuedPromiseFactory!); + this.queuedPromiseFactory = null; + + return result; + }; + + this.queuedPromise = new Promise((resolve) => { + this.activePromise!.then(onComplete, onComplete).then(resolve); + }); + } + + return new Promise((resolve, reject) => { + this.queuedPromise!.then(resolve, reject); + }); + } + + this.activePromise = promiseFactory(); + + return new Promise((resolve, reject) => { + this.activePromise!.then((result: T) => { + this.activePromise = null; + resolve(result); + }, (err: any) => { + this.activePromise = null; + reject(err); + }); + }); + } +} + +/** + * A helper to delay execution of a task that is being requested often. + * + * Following the throttler, now imagine the mail man wants to optimize the number of + * trips proactively. The trip itself can be long, so the he decides not to make the trip + * as soon as a letter is submitted. Instead he waits a while, in case more + * letters are submitted. After said waiting period, if no letters were submitted, he + * decides to make the trip. Imagine that N more letters were submitted after the first + * one, all within a short period of time between each other. Even though N+1 + * submissions occurred, only 1 delivery was made. + * + * The delayer offers this behavior via the trigger() method, into which both the task + * to be executed and the waiting period (delay) must be passed in as arguments. Following + * the example: + * + * var delayer = new Delayer(WAITING_PERIOD); + * var letters = []; + * + * function letterReceived(l) { + * letters.push(l); + * delayer.trigger(() => { return makeTheTrip(); }); + * } + */ +export class Delayer { + + public defaultDelay: number; + private timeout: NodeJS.Timer | null; + private completionPromise: Promise | null; + private onResolve: ((value: T | PromiseLike | undefined) => void) | null; + private task: ITask | null; + + constructor(defaultDelay: number) { + this.defaultDelay = defaultDelay; + this.timeout = null; + this.completionPromise = null; + this.onResolve = null; + this.task = null; + } + + public trigger(task: ITask, delay: number = this.defaultDelay): Promise { + this.task = task; + this.cancelTimeout(); + + if (!this.completionPromise) { + this.completionPromise = new Promise((resolve) => { + this.onResolve = resolve; + }).then(() => { + this.completionPromise = null; + this.onResolve = null; + + let result = this.task!(); + this.task = null; + + return result; + }); + } + + this.timeout = setTimeout(() => { + this.timeout = null; + this.onResolve!(undefined); + }, delay); + + return this.completionPromise; + } + + public isTriggered(): boolean { + return this.timeout !== null; + } + + public cancel(): void { + this.cancelTimeout(); + + if (this.completionPromise) { + this.completionPromise = null; + } + } + + private cancelTimeout(): void { + if (this.timeout !== null) { + clearTimeout(this.timeout); + this.timeout = null; + } + } +} + +/** + * A helper to delay execution of a task that is being requested often, while + * preventing accumulation of consecutive executions, while the task runs. + * + * Simply combine the two mail man strategies from the Throttler and Delayer + * helpers, for an analogy. + */ +export class ThrottledDelayer extends Delayer> { + + private throttler: Throttler; + + constructor(defaultDelay: number) { + super(defaultDelay); + + this.throttler = new Throttler(); + } + + public trigger(promiseFactory: ITask>, delay?: number): Promise> { + return super.trigger(() => this.throttler.queue(promiseFactory), delay); + } +} diff --git a/src/test/suite/kafka-file/languageservice/completionProperties.test.ts b/src/test/suite/kafka-file/languageservice/completionProperties.test.ts index 43a319b..783c0cc 100644 --- a/src/test/suite/kafka-file/languageservice/completionProperties.test.ts +++ b/src/test/suite/kafka-file/languageservice/completionProperties.test.ts @@ -38,7 +38,7 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { }, { label: 'from', kind: CompletionItemKind.Property, - insertText: 'from: ${1|earliest,last,0|}', + insertText: 'from: ${1|earliest,latest,0|}', range: range(position(1, 0), position(1, 0)) }, { @@ -74,7 +74,7 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { }, { label: 'from', kind: CompletionItemKind.Property, - insertText: 'from: ${1|earliest,last,0|}', + insertText: 'from: ${1|earliest,latest,0|}', range: range(position(1, 0), position(1, 1)) }, { @@ -109,7 +109,7 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { }, { label: 'from', kind: CompletionItemKind.Property, - insertText: 'from: ${1|earliest,last,0|}', + insertText: 'from: ${1|earliest,latest,0|}', range: range(position(1, 0), position(1, 5)) }, { @@ -144,7 +144,7 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { }, { label: 'from', kind: CompletionItemKind.Property, - insertText: 'from: ${1|earliest,last,0|}', + insertText: 'from: ${1|earliest,latest,0|}', range: range(position(1, 0), position(1, 6)) }, { @@ -179,7 +179,7 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { }, { label: 'from', kind: CompletionItemKind.Property, - insertText: 'from: ${1|earliest,last,0|}', + insertText: 'from: ${1|earliest,latest,0|}', range: range(position(1, 0), position(1, 6)) }, { @@ -214,7 +214,7 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { }, { label: 'from', kind: CompletionItemKind.Property, - insertText: 'from: ${1|earliest,last,0|}', + insertText: 'from: ${1|earliest,latest,0|}', range: range(position(1, 0), position(1, 11)) }, { @@ -251,7 +251,7 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { /* 'from' is removed from completion because it is declared in the CONSUMER { label: 'from', kind: CompletionItemKind.Property, - insertText: 'from: ${1|earliest,last,0|}', + insertText: 'from: ${1|earliest,latest,0|}', range: range(position(2, 0), position(2, 5)) },*/ { @@ -285,8 +285,8 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { range: range(position(1, 5), position(1, 5)) }, { - label: 'last', kind: CompletionItemKind.Value, - insertText: ' last', + label: 'latest', kind: CompletionItemKind.Value, + insertText: ' latest', range: range(position(1, 5), position(1, 5)) }, { @@ -310,8 +310,8 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { range: range(position(1, 5), position(1, 7)) }, { - label: 'last', kind: CompletionItemKind.Value, - insertText: ' last', + label: 'latest', kind: CompletionItemKind.Value, + insertText: ' latest', range: range(position(1, 5), position(1, 7)) }, { @@ -336,8 +336,8 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { range: range(position(2, 5), position(2, 7)) }, { - label: 'last', kind: CompletionItemKind.Value, - insertText: ' last', + label: 'latest', kind: CompletionItemKind.Value, + insertText: ' latest', range: range(position(2, 5), position(2, 7)) }, { @@ -363,8 +363,8 @@ suite("Kafka File CONSUMER Completion Test Suite", () => { range: range(position(2, 5), position(2, 7)) }, { - label: 'last', kind: CompletionItemKind.Value, - insertText: ' last', + label: 'latest', kind: CompletionItemKind.Value, + insertText: ' latest', range: range(position(2, 5), position(2, 7)) }, { diff --git a/src/test/suite/kafka-file/languageservice/diagnosticsProperties.test.ts b/src/test/suite/kafka-file/languageservice/diagnosticsProperties.test.ts new file mode 100644 index 0000000..0d7bf0f --- /dev/null +++ b/src/test/suite/kafka-file/languageservice/diagnosticsProperties.test.ts @@ -0,0 +1,402 @@ +import { DiagnosticSeverity } from "vscode"; +import { assertDiagnostics, diagnostic, position } from "./kafkaAssert"; + +suite("Kafka File Diagnostics Test Suite", () => { + + test("Empty diagnostics", async () => { + await assertDiagnostics('', []); + }); +}); + +suite("Kafka File CONSUMER Diagnostics Test Suite", () => { + + test("Duplicate property validation", async () => { + + await assertDiagnostics( + 'CONSUMER\n' + + 'topic:abcd\n' + + 'topic:efgh', + [ + diagnostic( + position(1, 0), + position(1, 5), + "Duplicate property 'topic'", + DiagnosticSeverity.Warning + ), + diagnostic( + position(2, 0), + position(2, 5), + "Duplicate property 'topic'", + DiagnosticSeverity.Warning + ) + ] + ); + + }); + + test("Unkwown property validation", async () => { + + await assertDiagnostics( + 'CONSUMER\n' + + 'topic:abcd\n' + + 'abcd:efgh', + [ + diagnostic( + position(2, 0), + position(2, 4), + "Unkwown property 'abcd'", + DiagnosticSeverity.Warning + ) + ] + ); + + }); + + test("Syntax property validation", async () => { + + await assertDiagnostics( + 'CONSUMER\n' + + 'topic:abcd\n' + + 'efgh', + [ + diagnostic( + position(2, 0), + position(2, 4), + "Missing ':' sign after 'efgh'", + DiagnosticSeverity.Error + ), + diagnostic( + position(2, 0), + position(2, 4), + "Unkwown property 'efgh'", + DiagnosticSeverity.Warning + ) + ] + ); + + }); + + test("Topic validation", async () => { + + await assertDiagnostics('CONSUMER', + [ + diagnostic( + position(0, 0), + position(0, 8), + 'The consumer must declare the \'topic:\' property.', + DiagnosticSeverity.Error + ) + ] + ); + + await assertDiagnostics( + 'CONSUMER\n' + + 'topic:', + [ + diagnostic( + position(1, 0), + position(1, 5), + 'The topic is required.', + DiagnosticSeverity.Error + ) + ] + ); + + await assertDiagnostics( + 'CONSUMER\n' + + 'topic:a@bcd', + [ + diagnostic( + position(1, 6), + position(1, 11), + "The topic 'a@bcd' is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'", + DiagnosticSeverity.Error + ) + ] + ); + + await assertDiagnostics( + 'CONSUMER\n' + + 'topic:..', + [ + diagnostic( + position(1, 6), + position(1, 8), + "The topic cannot be '.' or '..'", + DiagnosticSeverity.Error + ) + ] + ); + + await assertDiagnostics( + 'CONSUMER\n' + + 'topic: .. ', + [ + diagnostic( + position(1, 8), + position(1, 10), + "The topic cannot be '.' or '..'", + DiagnosticSeverity.Error + ) + ] + ); + }); + + test("From validation", async () => { + + await assertDiagnostics( + 'CONSUMER\n' + + 'topic:abcd\n' + + 'from:abcd', + [ + diagnostic( + position(2, 5), + position(2, 9), + "from must be a positive number or equal to 'earliest' or 'latest'.", + DiagnosticSeverity.Error + ) + ] + ); + + await assertDiagnostics( + 'CONSUMER\n' + + 'topic:abcd\n' + + 'from: 10', + [] + ); + + await assertDiagnostics( + 'CONSUMER\n' + + 'topic:abcd\n' + + 'from: latest', + [] + ); + + await assertDiagnostics( + 'CONSUMER\n' + + 'topic:abcd\n' + + 'from: earliest', + [] + ); + + }); + + test("Partitions validation", async () => { + + await assertDiagnostics( + 'CONSUMER\n' + + 'topic:abcd\n' + + 'partitions:abcd', + [ + diagnostic( + position(2, 11), + position(2, 15), + "Unexpected character 'a' in partitions expression.", + DiagnosticSeverity.Error + ) + ] + ); + + await assertDiagnostics( + 'CONSUMER\n' + + 'topic:abcd\n' + + 'partitions: 10', + [] + ); + + await assertDiagnostics( + 'CONSUMER\n' + + 'topic:abcd\n' + + 'partitions: 1,10', + [] + ); + + await assertDiagnostics( + 'CONSUMER\n' + + 'topic:abcd\n' + + 'partitions: 1,10-20', + [] + ); + + }); + + test("Key-format validation", async () => { + + await assertDiagnostics( + 'CONSUMER\n' + + 'topic:abcd\n' + + 'key-format:abcd', + [ + diagnostic( + position(2, 11), + position(2, 15), + "Invalid key format for 'abcd'", + DiagnosticSeverity.Error + ) + ] + ); + + await assertDiagnostics( + 'CONSUMER\n' + + 'topic:abcd\n' + + 'key-format: string', + [] + ); + + }); + + test("Value-format validation", async () => { + + await assertDiagnostics( + 'CONSUMER\n' + + 'topic:abcd\n' + + 'value-format:abcd', + [ + diagnostic( + position(2, 13), + position(2, 17), + "Invalid value format for 'abcd'", + DiagnosticSeverity.Error + ) + ] + ); + + await assertDiagnostics( + 'CONSUMER\n' + + 'topic:abcd\n' + + 'value-format: string', + [] + ); + + }); + +}); + +suite("Kafka File PRODUCER Diagnostics Test Suite", () => { + + test("Required topic and value", async () => { + await assertDiagnostics( + 'PRODUCER', + [ + diagnostic( + position(0, 0), + position(0, 8), + 'The producer must declare the \'topic:\' property.', + DiagnosticSeverity.Error + ), + diagnostic( + position(0, 0), + position(0, 8), + 'The producer value is required.', + DiagnosticSeverity.Error + ) + ] + ); + + await assertDiagnostics( + 'PRODUCER\n' + + 'topic:', + [ + diagnostic( + position(1, 0), + position(1, 5), + 'The topic is required.', + DiagnosticSeverity.Error + ), + diagnostic( + position(0, 0), + position(0, 8), + 'The producer value is required.', + DiagnosticSeverity.Error + ) + ] + ); + }); + + test("Syntax topic", async () => { + + await assertDiagnostics( + 'PRODUCER\n' + + 'topic:a@bcd\n' + + 'abcd', + [ + diagnostic( + position(1, 6), + position(1, 11), + "The topic 'a@bcd' is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'", + DiagnosticSeverity.Error + ) + ] + ); + + await assertDiagnostics( + 'PRODUCER\n' + + 'topic:..\n' + + 'abcd', + [ + diagnostic( + position(1, 6), + position(1, 8), + "The topic cannot be '.' or '..'", + DiagnosticSeverity.Error + ) + ] + ); + }); + + test("Key-format validation", async () => { + + await assertDiagnostics( + 'PRODUCER\n' + + 'topic:abcd\n' + + 'key-format:abcd\n' + + 'efgh', + [ + diagnostic( + position(2, 11), + position(2, 15), + "Invalid key format for 'abcd'", + DiagnosticSeverity.Error + ) + ] + ); + + await assertDiagnostics( + 'PRODUCER\n' + + 'topic:abcd\n' + + 'key-format: string\n' + + 'efgh', + [] + ); + + }); + + test("Value-format validation", async () => { + + await assertDiagnostics( + 'PRODUCER\n' + + 'topic:abcd\n' + + 'value-format:abcd\n' + + 'efgh', + [ + diagnostic( + position(2, 13), + position(2, 17), + "Invalid value format for 'abcd'", + DiagnosticSeverity.Error + ) + ] + ); + + await assertDiagnostics( + 'PRODUCER\n' + + 'topic:abcd\n' + + 'value-format: string\n' + + 'efgh', + [] + ); + + }); + +}); + diff --git a/src/test/suite/kafka-file/languageservice/kafkaAssert.ts b/src/test/suite/kafka-file/languageservice/kafkaAssert.ts index 9e2e076..f3339a1 100644 --- a/src/test/suite/kafka-file/languageservice/kafkaAssert.ts +++ b/src/test/suite/kafka-file/languageservice/kafkaAssert.ts @@ -1,5 +1,5 @@ import * as assert from "assert"; -import { CodeLens, Position, Range, Command, Uri, workspace, CompletionList, SnippetString } from "vscode"; +import { CodeLens, Position, Range, Command, Uri, workspace, CompletionList, SnippetString, Diagnostic, DiagnosticSeverity } from "vscode"; import { ConsumerLaunchState } from "../../../../client"; import { ProducerLaunchState } from "../../../../client/producer"; import { ConsumerLaunchStateProvider, getLanguageService, LanguageService, ProducerLaunchStateProvider, SelectedClusterProvider, TopicDetail, TopicProvider } from "../../../../kafka-file/languageservice/kafkaFileLanguageService"; @@ -126,6 +126,21 @@ export async function testCompletion(value: string, expected: CompletionList, pa } } + +// Diagnostics assert + +export function diagnostic(start: Position, end: Position, message: string, severity: DiagnosticSeverity): Diagnostic { + const r = range(start, end); + return new Diagnostic(r, message, severity); +} + +export async function assertDiagnostics(content: string, expected: Array, ls = languageService) { + let document = await getDocument(content); + let ast = ls.parseKafkaFileDocument(document); + const actual = ls.doDiagnostics(document, ast); + assert.deepStrictEqual(actual, expected); +} + // Kafka parser assert export interface ExpectedChunckResult { diff --git a/src/validators/commons.ts b/src/validators/commons.ts new file mode 100644 index 0000000..eaee38a --- /dev/null +++ b/src/validators/commons.ts @@ -0,0 +1,53 @@ +const maxNameLength = 249; +const legalChars = /^[a-zA-Z0-9\\._\\-]*$/; + +export namespace CommonsValidator { + export function validateTopic(topic?: string, existingTopicNames: string[] = [], topicField = 'The topic'): string | undefined { + // See topic name validation rule at https://github.com/apache/kafka/blob/8007211cc982d8458223e866c1ee7d94b69e0249/core/src/main/scala/kafka/common/Topic.scala#L33 + let result = validateFieldRequired(topicField, topic); + if (result) { + return result; + } + if (!topic) { + // Already managed with validateFieldRequired + } else if (topic === "." || topic === "..") { + return `${topicField} cannot be '.' or '..'`; + } + else if (topic.length > maxNameLength) { + return `${topicField} is illegal, cannot be longer than ${maxNameLength} characters`; + } + else if (!legalChars.test(topic)) { + return `${topicField} '${topic}' is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'`; + } + if (!topic) { + return 'The topic value is required.'; + } + return validateFieldUniqueValue(topicField, topic, existingTopicNames); + } + + export function validateFieldRequired(name: string, value?: string): string | undefined { + if (!value || value.length <= 0) { + return `${name} is required.`; + } + if (value.trim().length === 0) { + return `${name} cannot be blank.`; + } + } + + export function validateFieldUniqueValue(name: string, value: string, values: string[]): string | undefined { + if (values.indexOf(value) !== -1) { + return `${name} '${value}' already exists.`; + } + } + + export function validateFieldPositiveNumber(name: string, value: string, max?: number): string | undefined { + const valueAsNumber = Number(value); + if (isNaN(valueAsNumber) || valueAsNumber < 1) { + return `${name} must be a positive number.`; + } + if (max && valueAsNumber > max) { + return `${name} can not be greater than ${max}.`; + } + } + +} diff --git a/src/validators/consumer.ts b/src/validators/consumer.ts new file mode 100644 index 0000000..9d2e261 --- /dev/null +++ b/src/validators/consumer.ts @@ -0,0 +1,59 @@ +import { parsePartitions } from "../client"; +import { LaunchConsumerCommand } from "../commands"; +import { consumerModel } from "../kafka-file/languageservice/model"; +import { CommonsValidator } from "./commons"; + +export namespace ConsumerValidator { + + export function validate(command: LaunchConsumerCommand) { + let errorMessage = + CommonsValidator.validateTopic(command.topicId) || + validateOffset(command.fromOffset) || + validatePartitions(command.partitions) || + validateKeyFormat(command.messageKeyFormat) || + validateValueFormat(command.messageValueFormat); + if (errorMessage) { + throw new Error(errorMessage); + } + } + + export function validateOffset(offset?: string): string | undefined { + if (!offset || offset === 'earliest' || offset === 'latest') { + return; + } + const valueAsNumber = parseInt(offset, 10); + if (isNaN(valueAsNumber) || valueAsNumber < 0) { + return "from must be a positive number or equal to 'earliest' or 'latest'."; + } + } + + export function validatePartitions(partitions?: string): string | undefined { + if (!partitions) { + return; + } + try { + parsePartitions(partitions); + } + catch (e) { + return e.message; + } + } + + export function validateKeyFormat(propertyValue?: string): string | undefined { + if (!propertyValue) { + return; + } + if (!consumerModel.hasDefinitionEnum('key-format', propertyValue)) { + return `Invalid key format for '${propertyValue}'`; + } + } + + export function validateValueFormat(propertyValue?: string): string | undefined { + if (!propertyValue) { + return; + } + if (!consumerModel.hasDefinitionEnum('value-format', propertyValue)) { + return `Invalid value format for '${propertyValue}'`; + } + } +} diff --git a/src/validators/producer.ts b/src/validators/producer.ts new file mode 100644 index 0000000..6fd5f55 --- /dev/null +++ b/src/validators/producer.ts @@ -0,0 +1,41 @@ +import { ProduceRecordCommand } from "../commands"; +import { producerModel } from "../kafka-file/languageservice/model"; +import { CommonsValidator } from "./commons"; + +export namespace ProducerValidator { + + export function validate(command: ProduceRecordCommand) { + let errorMessage = + CommonsValidator.validateTopic(command.topicId) || + validateProducerValue(command.value) || + validateKeyFormat(command.messageKeyFormat) || + validateValueFormat(command.messageValueFormat); + if (errorMessage) { + throw new Error(errorMessage); + } + } + + export function validateProducerValue(value?: string): string | undefined { + if (!value) { + return 'The producer value is required.'; + } + } + + export function validateKeyFormat(propertyValue?: string): string | undefined { + if (!propertyValue) { + return; + } + if (!producerModel.hasDefinitionEnum('key-format', propertyValue)) { + return `Invalid key format for '${propertyValue}'`; + } + } + + export function validateValueFormat(propertyValue?: string): string | undefined { + if (!propertyValue) { + return; + } + if (!producerModel.hasDefinitionEnum('value-format', propertyValue)) { + return `Invalid value format for '${propertyValue}'`; + } + } +} diff --git a/src/wizards/validators.ts b/src/wizards/validators.ts index 07f9035..5f186af 100644 --- a/src/wizards/validators.ts +++ b/src/wizards/validators.ts @@ -1,23 +1,25 @@ // ------------------ Cluster validators +import { CommonsValidator } from "../validators/commons"; + const BROKER_FIELD = 'Broker'; const CLUSTER_FIELD = 'Cluster name'; const USERNAME_FIELD = 'User name'; export async function validateBroker(broker: string): Promise { - return validateFieldRequired(BROKER_FIELD, broker); + return CommonsValidator.validateFieldRequired(BROKER_FIELD, broker); } export async function validateClusterName(cluster: string, existingClusterNames: string[]): Promise { - const result = validateFieldRequired(CLUSTER_FIELD, cluster); + const result = CommonsValidator.validateFieldRequired(CLUSTER_FIELD, cluster); if (result) { return result; } - return validateFieldUniqueValue(CLUSTER_FIELD, cluster, existingClusterNames); + return CommonsValidator.validateFieldUniqueValue(CLUSTER_FIELD, cluster, existingClusterNames); } export async function validateAuthentificationUserName(userName: string): Promise { - return validateFieldRequired(USERNAME_FIELD, userName); + return CommonsValidator.validateFieldRequired(USERNAME_FIELD, userName); } // ------------------ Topic validators @@ -26,66 +28,22 @@ const TOPIC_FIELD = 'Topic name'; const PARTITIONS_FIELD = 'Number of partitions'; const REPLICATION_FACTOR_FIELD = 'Replication Factor'; -const maxNameLength = 249; -const legalChars = /^[a-zA-Z0-9\\._\\-]*$/; - export async function validateTopicName(topic: string, existingTopicNames: string[]): Promise { - // See topic name validation rule at https://github.com/apache/kafka/blob/8007211cc982d8458223e866c1ee7d94b69e0249/core/src/main/scala/kafka/common/Topic.scala#L33 - const result = validateFieldRequired(TOPIC_FIELD, topic); - if (result) { - return result; - } - else if (topic === "." || topic === "..") { - return `${TOPIC_FIELD} cannot be '.' or '..'`; - } - else if (topic.length > maxNameLength) { - return `${TOPIC_FIELD} is illegal, cannot be longer than ${maxNameLength} characters`; - } - else if (!legalChars.test(topic)) { - return `${TOPIC_FIELD} '${topic}' is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'`; - } - return validateFieldUniqueValue(TOPIC_FIELD, topic, existingTopicNames); + return CommonsValidator.validateTopic(topic, existingTopicNames, TOPIC_FIELD); } export async function validatePartitions(partitions: string): Promise { - const result = validateFieldRequired(PARTITIONS_FIELD, partitions); + const result = CommonsValidator.validateFieldRequired(PARTITIONS_FIELD, partitions); if (result) { return result; } - return validateFieldPositiveNumber(PARTITIONS_FIELD, partitions); + return CommonsValidator.validateFieldPositiveNumber(PARTITIONS_FIELD, partitions); } export async function validateReplicationFactor(replicationFactor: string, max: number): Promise { - const result = validateFieldRequired(REPLICATION_FACTOR_FIELD, replicationFactor); + const result = CommonsValidator.validateFieldRequired(REPLICATION_FACTOR_FIELD, replicationFactor); if (result) { return result; - } - return validateFieldPositiveNumber(REPLICATION_FACTOR_FIELD, replicationFactor, max); -} - -// ------------------ Commons Validators - -function validateFieldRequired(name: string, value: string): string | undefined { - if (value.length <= 0) { - return `${name} is required.`; - } - if (value.trim().length === 0) { - return `${name} cannot be blank.`; - } -} - -function validateFieldUniqueValue(name: string, value: string, values: string[]): string | undefined { - if (values.indexOf(value) !== -1) { - return `${name} '${value}' already exists.`; - } -} - -function validateFieldPositiveNumber(name: string, value: string, max?: number): string | undefined { - const valueAsNumber = Number(value); - if (isNaN(valueAsNumber) || valueAsNumber < 1) { - return `${name} must be a positive number.`; - } - if (max && valueAsNumber > max) { - return `${name} can not be greater than ${max}.`; } + return CommonsValidator.validateFieldPositiveNumber(REPLICATION_FACTOR_FIELD, replicationFactor, max); }