Como criar um custom serde para Kafka Streams
Se você não está familiarizado, o Kafka Streams é uma biblioteca fornecida pela Apache para interagir com o Kafka usando streams - pequenos pedaços de dados que são entregues ao longo do tempo e em tempo real.
Embora este post não ensine Kafka, aqui está uma breve introdução: no Kafka Streams, você pode construir "topologias", que são essencialmente o fluxo de transformação, processamento e filtragem de dados que você pode realizar usando KStreams e KTables.
Na topologia acima, recebemos três mensagens de uma fonte (neste caso, um tópico), agrupamos por chave, agregamos em uma única mensagem e enviamos para um destino (outro tópico).
Para simplificar o trabalho com JSON, vamos trabalhar em HashMap. Vamos criar um Serde personalizado, incluindo serializadores e deserializadores, para ajudar na manipulação de JSON em formato de HashMap.
Serde significa “Serialization and Deserialization” (Serialização e Desserialização) e é responsável por traduzir os bytes do valor da mensagem para algum que consigamos trabalhar. Kafka possui alguns Serde built-in, como Serdes.String()
Vamos usar o ObjectMapper da biblioteca Jackson JSON para simplificar as operações.
Serializador
1
2
3
4
5
6
7
8
9
10
public class MapSerializer implements Serializer<HashMap<String, String>> {
@Override
public byte[] serialize(String s, HashMap<String, String> stringStringHashMap) {
try {
return new ObjectMapper().writeValueAsBytes(stringStringHashMap);
} catch (JsonProcessingException ex) {
throw new RuntimeException(ex);
}
}
}
Deserializador
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class MapDeserializer implements Deserializer<HashMap<String, String>> {
@Override
public HashMap<String, String> deserialize(String s, byte[] bytes) {
try {
return new ObjectMapper().readValue(
new String(bytes, StandardCharsets.UTF_8),
HashMap.class
);
} catch (JsonProcessingException ex) {
throw new RuntimeException(ex);
}
}
}
Por fim, nosso serde:
Serde HashMap
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class MapSerde implements Serde<HashMap<String, String>> {
private final MapDeserializer mapDeserializer = new MapDeserializer();
private final MapSerializer mapSerializer = new MapSerializer();
@Override
public Serializer<HashMap<String, String>> serializer() {
return mapSerializer;
}
@Override
public Deserializer<HashMap<String, String>> deserializer() {
return mapDeserializer;
}
}
Agora que temos o Serde, vamos aplicar na nossa topologia e processar o dados:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Configuration
@RequiredArgsConstructor
public class ConsumerStream {
private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;
@Value("${topic.name.consumer}")
private String replyTopic;
@PostConstruct
public void init() {
StreamsBuilder streamsBuilder = null;
try {
streamsBuilder = streamsBuilderFactoryBean.getObject();
} catch (Exception e) {
throw new RuntimeException("error to get stream builder", e);
}
if (streamsBuilder == null) {
throw new RuntimeException("No builder for streams");
}
MapSerde mapSerde = new MapSerde();
KStream<String, HashMap<String, String>> replyStream = streamsBuilder.stream(
replyTopic, Consumed.with(Serdes.String(), mapSerde)
);
replyStream
.groupByKey() // agrupar por chave
.aggregate(
HashMap::new,
(key, value, aggregate) -> {
aggregate.putAll(value); // agregar resultados
return aggregate;
},
Named.as("data-grouped"),
Materialized.with(Serdes.String(), mapSerde)
)
.toStream()
.mapValues((k, v) -> {
try {
// transformaos em String para postar no sink e manter a mensagem no mesmo formato
return new ObjectMapper().writeValueAsString(v);
} catch (JsonProcessingException ex) {
throw new RuntimeException(ex);
}
})
.to(replyTopic, Produced.with(Serdes.String(), Serdes.String()));
}
}
Este post fornece um guia sobre como criar um Serde personalizado para o Kafka Streams, permitindo que você processe dados de forma eficiente em tempo real usando o Kafka. Se você tiver alguma dúvida ou encontrar informações incorretas sobre o Kafka, por favor, avise-me, e ficarei feliz em fazer as correções necessárias.