2021年3月17日星期三

KStream Aggregation based custom object specific field on a window time

I have a custom object like below.

public class CustomObject{        public int source;      public long value;        public long timestamp;  }  

Key and values are like below in Kafka's message.

Key >>>>> 111.10.73.10 Value => -1062731364,31300,1615979127  Key >>>>> 111.10.73.10 Value => -1062729572,31400,1615979127  Key >>>>> 111.10.73.10 Value => -1062731363,31500,1615979127  Key >>>>> 111.10.73.10 Value => -1062729571,31600,1615979127  Key >>>>> 111.10.73.10 Value => -1062731362,31700,1615979127  Key >>>>> 111.10.73.10 Value => -1062729570,31800,1615979127  Key >>>>> 111.10.73.10 Value => -1062731361,31900,1615979127  Key >>>>> 111.10.73.10 Value => -1062729569,32000,1615979127  Key >>>>> 111.10.73.10 Value => -1062731360,32100,1615979127  Key >>>>> 111.10.73.10 Value => -1062729568,32200,1615979127  Key >>>>> 111.10.73.10 Value => -1062731359,32300,1615979127  Key >>>>> 111.10.73.10 Value => -1062729567,32400,1615979127  Key >>>>> 111.10.73.10 Value => -1062731358,32500,1615979127  Key >>>>> 111.10.73.10 Value => -1062729566,32600,1615979127  Key >>>>> 111.10.73.10 Value => -1062731357,32700,1615979127  Key >>>>> 111.10.73.10 Value => -1062729565,32800,1615979127  Key >>>>> 111.10.73.7 Value => -1062731193,14300,1615979127  Key >>>>> 111.10.73.7 Value => -1062729401,14400,1615979127  Key >>>>> 111.10.73.7 Value => -1062731192,14500,1615979127  Key >>>>> 111.10.73.7 Value => -1062729400,14600,1615979127  Key >>>>> 111.10.73.7 Value => -1062731191,14700,1615979127  Key >>>>> 111.10.73.7 Value => -1062729399,14800,1615979127  Key >>>>> 111.10.73.7 Value => -1062731190,14900,1615979127  Key >>>>> 111.10.73.7 Value => -1062729398,15000,1615979127  Key >>>>> 111.10.73.7 Value => -1062731189,15100,1615979127  Key >>>>> 111.10.73.7 Value => -1062729397,15200,1615979127  Key >>>>> 111.10.73.7 Value => -1062731188,15300,1615979127  Key >>>>> 111.10.73.7 Value => -1062729396,15400,1615979127  Key >>>>> 111.10.73.7 Value => -1062731187,15500,1615979127  Key >>>>> 111.10.73.7 Value => -1062729395,15600,1615979127  Key >>>>> 111.10.73.7 Value => -1062731186,15700,1615979127  Key >>>>> 111.10.73.7 Value => -1062729394,15800,1615979127  Key >>>>> 111.10.73.9 Value => -1062731124,28100,1615979127  Key >>>>> 111.10.73.9 Value => -1062729332,28200,1615979127  

I need to aggregate the top 3 'source' field events/messages (by total count) for the last 1 mins Window. After aggregation below reduce messages would be like this from the above list.

Top 1

Key >>>>> 111.10.73.10 Value => -1062731364,31300,1615979127  Key >>>>> 111.10.73.10 Value => -1062731364,31600,1615979127  Key >>>>> 111.10.73.10 Value => -1062731364,31900,1615979127  Key >>>>> 111.10.73.10 Value => -1062731364,32200,1615979127  Key >>>>> 111.10.73.10 Value => -1062731364,32500,1615979127  Key >>>>> 111.10.73.7 Value => -1062731364,14300,1615979127  Key >>>>> 111.10.73.7 Value => -1062731364,14600,1615979127  Key >>>>> 111.10.73.7 Value => -1062731364,15400,1615979127  

Top 2

Key >>>>> 111.10.73.10 Value => -1062729572,31400,1615979127  Key >>>>> 111.10.73.10 Value => -1062729572,32100,1615979127  Key >>>>> 111.10.73.10 Value => -1062729572,32400,1615979127  Key >>>>> 111.10.73.9 Value => -1062729572,28100,1615979127  

Top 3

Key >>>>> 111.10.73.10 Value => -1062731359,32300,1615979127  Key >>>>> 111.10.73.7 Value => -1062731359,15700,1615979127  

Is this requirement is possible to complete through the KStream object?

final KStream<String, CustomObject> source = builder.stream("topic");  source.<???> ???  
https://stackoverflow.com/questions/66673290/kstream-aggregation-based-custom-object-specific-field-on-a-window-time March 17, 2021 at 08:40PM

没有评论:

发表评论