Request-Response usando Apache Kafka

R

Request-Response sincrono in kafka

Al giorno d’oggi, l’architettura Event Driven viene utilizzata nello sviluppo di applicazioni software in diverse aree, come microservizi con pattern come CQRS, Saga Pattern, ecc e utilizzando sistemi di messaggistica di publish e subscribe come Apache Kafka, per la comunicazione asincrona tra microservizi. Tuttavia, potrebbe essere necessario stabilire una comunicazione sincrona di tipo request/response se i vostri requisiti lo richiedono. Visto che non e’  proprio immediato implementare una comunicazione request-response su Kafka ho deciso di scrivere questo articolo.

PS: mi raccomando leggete fino in fondo l articolo perché’ Kafka e’  un broker atipico.. occorre capire come funziona altrimenti son dolori 😛

requisiti

  • Kafka e Zookeper running in locale:
    • il setup e’  molto semplice vedi gli step descritti in questo LINK (nel mio esempio faccio riferimento a due istanze di Kafka running)
    • puoi altrimenti tirarti su Kafka utilizzando Docker (https://github.com/lensesio/fast-data-dev)
docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=127.0.0.1 landoop/fast-data-dev:latest
  • Conoscenza delle Basi di Kafka e della sua architettura

 

fibonacci request-response

Per farvi capire come implementare una comunicazione sincrona request/response su Kafka ho deciso di proporre il seguente esercizio. Supponiamo di avere due microservizi

  • micro servizio client kafka-request: espone un API REST /fibonacci la quale si aspetta nel payload di richiesta un oggetto di tipo FibonacciRequest e risponde con un payload di tipo FibonacciResponse con all’interno il risultato della nota funzione di Fibonacci. La logica di calcolo di Fibonacci e’  implementato da un secondo microservizio. I due microservizi comunicheranno tramite Kafka; pertanto una volta ricevuta la richiesta di calcolo via web service il micro servizio client scriverà’ la richiesta di calcolo sul topic kafka fib-request e attendera’ la risposta sul topic fib-response
  • micro servizio fibonacci calculator: legge da un topic Kafka fib-request la richiesta di calcolo, calcola il valore di fibonacci per tale richiesta e risponde su un topic di risposta.

L architettura della soluzione e’ illustrata nella seguente immagine

Per implementare quanto descritto occorre fare due considerazioni:

  1. Tutte le risposte di fibonacci Calculator arriveranno sul medesimo topic fib-response. Pertanto per legare la risposta alla specifica richiesta occorre introdurre il concetto di correlation id. Richiesta e Risposta dovranno quindi avere all’interno un campo correlation id che dovra’ avere il medesimo valore
  2. Nel messaggio di richiesta, oltre a valore e correlation id, sara’ presente anche il nome del topic dove il client e’  in attesa della risposta

Fortunatamente dalla release di Spring Kafka 2.1.3 e’  stato aggiunto il support al pattern Request Response con la nuova classe ReplyingKafkaTemplate la quale ci permette di implementare facilmente i seguenti punti. Vi mostrerò’ adesso, step by step, le parti saliente del codice soluzione di questo esercizio. Il codice della soluzione completa lo trovate nel mio REPO GITHUB.

creiamo i topic di request e response

Come best practices, e’ sempre consigliato creare da cli i topic e non auto generarli da codice essendo tale configurazione un punto chiave per il tuning di kafka. Nel mio esempio ho due nodi running in locale alla porta 9092 e 9093. Da shell quindi provvedo a creare i due topic fib-request e fib-response configurati con 4 partizioni  e replication factor 2 ( configurazione da ambiente di sviluppo, consiglio sempre per garantire HA almeno 3 nodi con replication factor 3)

kafka-topics.bat --create --topic fib-request -zookeeper localhost:2181 --replication-factor 2 --partitions 4
kafka-topics.bat --create --topic fib-response -zookeeper localhost:2181 --replication-factor 2 --partitions 4

Adesso siamo pronti ad analizzare il codice dei due microservizi.

microservizio client requestor

I punti chiavi che dovranno essere implementati sono:

  1. controller per gestire le richieste http dell endpoint fibonacci
  2. producer kafka che scrive la richiesta nel topic fib-request e  consumer kafka che legge nel topic fib-response la risposta

Di seguito le configuazione dell application.properties

server.port=8090
spring.kafka.producer.bootstrap-servers=localhost:9092, localhost:9093

kafka.request.topic=fib-request
kafka.reply.topic=fib-response
kafka.group.id=fibonacci-result-group

1. controller per gestire le richieste http dell endpoint fibonacci

Abbiamo poco da commentare, trattasi di un semplice Rest Controller.

@RestController
@Slf4j
public class FibonacciController {
    @Autowired
    FibonacciService fibonacciService;

    @PostMapping("/fibonacci")
    public ResponseEntity<?> postFibonacci(@RequestBody FibonacciRequest request) throws ExecutionException, InterruptedException, JsonProcessingException {
        FibonacciResult result = fibonacciService.calculate(request);
        return ResponseEntity.ok(result);
    }
}

2.producer kafka che scrive la richiesta nel topic fib-request e  consumer kafka che legge nel topic fib-response la risposta

Il lavoro e’  totalmente delegato al servizio fibonacciService. Andiamo a vedere il codice di tale service.

 

@Slf4j
@Service
public class FibonacciServiceImpl implements FibonacciService {

    @Autowired
    ReplyingKafkaTemplate<String, String, String> kafkaTemplate;

    @Autowired
    ObjectMapper objectMapper;

    @Value("${kafka.request.topic}")
    String requestTopic;

    @Value("${kafka.reply.topic}")
    String requestReplyTopic;

    @Override
    public FibonacciResult calculate(FibonacciRequest request) throws InterruptedException, ExecutionException, JsonProcessingException {
        log.info("send request fib [{]]", request.getNumber());
        String requestJSON = objectMapper.writeValueAsString(request);
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(requestTopic, requestJSON);
        record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
        RequestReplyFuture<String, String, String> sendAndReceive = kafkaTemplate.sendAndReceive(record);
        // confirm if producer produced successfully
        SendResult<String, String> sendResult = sendAndReceive.getSendFuture().get();
        log.info("ProducerRecord request[{}]", sendResult.getProducerRecord());
        sendResult.getProducerRecord().headers().forEach(header -> System.out.println(header.key() + ":" + header.value().toString()));
        // get consumer record
        ConsumerRecord<String, String> consumerRecord = sendAndReceive.get();
        String jsonResponse = consumerRecord.value();

        // return consumer value
        return objectMapper.readValue(jsonResponse, FibonacciResult.class);
    }
}

 

La parte hot del codice sopra mostrato e’  sicuramente l utlizzo di

ReplyingKafkaTemplate<String, String, String> kafkaTemplate;

Tale utility e’  una novità’ di Kafka che ci permette di realizzare un meccanismo request-response su Kafka.  Guardando le righe di codice del ProducerRecord creato

String requestJSON = objectMapper.writeValueAsString(request);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(requestTopic, requestJSON);
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));

si puo’  notare che abbiamo valorizzato nel messaggio un header KafkaHeaders.REPLY_TOPIC in modo da comunicare al microservizio fibonacci calculator dove attendiamo la risposta. A questo punto possiamo passare il producer record all utility sendAndReceive di ReplyingKafkaTemplate in modo da inviare il messaggio su Kafka. ReplyingKafkaTemplate  in modo del tutto trasparente andrà’ a valorizzare un altro header kafka_correlationId con identificativo univoco.

RequestReplyFuture<String, String, String> sendAndReceive = kafkaTemplate.sendAndReceive(record);
SendResult<String, String> sendResult = sendAndReceive.getSendFuture().get();
log.info("ProducerRecord request[{}]", sendResult.getProducerRecord());

Ho inserito a scopo didattico un log che mostra il ProducedRecord con gli header sopra spiegati.

A questo punto possiamo metterci in attesa della risposta sul topic di risposta incluso nell header del messaggio appena inviato: fib-response topic.

ConsumerRecord<String, String> consumerRecord = sendAndReceive.get();
String jsonResponse = consumerRecord.value();

return objectMapper.readValue(jsonResponse, FibonacciResult.class);

Riassumendo velocemente con ReplyingKafkaTemplate  riusciamo ad inviare un messaggio su kafka il quale contiene due header: correlation id ( auto valorizzato) e topic di risposta.

Il resto del codice lo trovate all URL 

microservizio fibonacci calculator

Il comportamente del microservizio Fibonacci Calculator e’  il seguente:

  1. Leggere messaggi dal topic fib-request
  2. Per ogni messaggio calcolare il numero di fibonacci
  3. Scrivere nella Risposta il risultato; il framework ci garantirà’ di includere nella risposta il correlation id della richiesta nell header kafka_correlationId in modo che l altro microservizio possa fare match tra request e response.

Di seguito le configuazione dell application.properties

server.port=8091
spring.kafka.producer.bootstrap-servers=localhost:9092, localhost:9093


kafka.request.topic=fib-request
kafka.reply.topic=fib-response
spring.kafka.consumer.group-id=fibonacci-calculator-group

I 3 punti sopra citati sono implementati nelle seguenti linee di codice

@Component
public class KafkaConsumer {

    @Autowired
    private ObjectMapper objectMapper;

    private static long fibonacci(int i) {
        /* F(i) non e` definito per interi i negativi! */
        if (i == 0) return 0;
        else if (i == 1) return 1;
        else return fibonacci(i-1) + fibonacci(i-2);
    }

    @KafkaListener(topics = {"${kafka.request.topic}"})
    @SendTo
    public String onMessage(String req) throws JsonProcessingException {
        FibonacciRequest fibonacciRequest = objectMapper.readValue(req, FibonacciRequest.class);
        long result = fibonacci(fibonacciRequest.getNumber());
        FibonacciResult calc = new FibonacciResult();
        calc.setResult(result);
        return objectMapper.writeValueAsString(calc);
    }
}

Analizziamo il codice appena mostrato

  • Con l annotation KafkaListener andiamo a realizzare il punto 1) leggere dal topic fib-request.
  • L annotation SendTo su un listener del topic A, permette di inviare  ad un topic B il valore di ritorno di tale listener. In breve mette in relazione due topic permettendo facilmente la creazione di pipeline. Il topic di destinazione della risposta deve essere passato come argomento o altrimenti viene dedotto dal framework se nel messaggio e’  contenuto l header kafka_replyTopic come nel nostro caso. Pertanto in tale listener leggiamo messaggi provenienti da fib-request, calcoliamo il risultato di fibonacci per tali richieste e ripondiamo sul topic fib-response includendo in tale risposta il correlation id.

Trovate il codice completo del secondo microservizio all URL.

concurrent consumer

Nel nostro esempio abbiamo volutamente configurato in maniera concorrente i consumer ( 3 thread a consumer) e abbiamo volutamente configurato il topic con più partizioni. Questo perché’  il container in listening e’  in grado di sistemare il matching tra request e response grazie al correlation id.

multiple istanze del micro servizio client requestor

In un architettura a microservice si scala orizzontalmente eseguendo più’ repliche di un microservizio se il carico lo necessita. Kafka e’ fantastico in questo. Se conoscete Kafka sapete che se eseguo più’ istanze di un consumer con il medesimo consumer-group, il carico viene distribuito tra le diverse istanze del consumer.

Supponiamo di avere il topic A con 4 partizioni. Posso eseguire 4 istanze di un consumer con medesimo consumer group. In questo modo

  • ad ogni consumer viene assegnata una partizione del topic
  • i messaggi vengono creati da kafka in modo da distribuire equamente i messaggi tra le varie partizioni
  • ogni singola istanza del consumer riceverà’ i messaggi della propria partizione di competenza.

In questo modo si aumenta notevolmente throughput dei consumer.

Tutto molto bello… pero se scaliamo in questo modo la nostra soluzione di request-response non funziona più’. Questo perché l istanza che invia la richiesta di fibonacci potrebbe non ricevere la risposta se la risposta viene salvata su una partizione assegnata ad un altra instanza del consumer. Abbiamo due soluzione a questo problema.

  1. La documentazione di Spring Kafka consiglia di usare un topic ad hoc per istanza di consumer. Nel nostro esempio se decidiamo di eseguire 4 istanze del microservizio client requestor occorrerebbe creare 4 topic di risposta: fib-response-1 … fib-response-4. Questa soluzione funziona ma non e’ il massimo della flessibilità’.
  2. Un altra soluzione e’  quella di inviare nel messaggio, oltre al topic di risposta e correlation id, un altro header KafkaHeaders.REPLY_PARTITION contenente la rappresentazione BIG-ENDIAN dell intero della partizione assegnata a tale istanza di consumer. Il framework gestirà’ tale header come quanto fatto con il topic di risposta: la risposta verrà’ salvata nel topic di risposta e nella partizione specificata. In questo modo  siamo sicuri che il consumer riceverà’ la risposta.

conclusioni

Abbiamo spiegato come realizzare il pattern request-response su Kafka in maniera sincrona. Come specificato ad inizio articolo ponderate se e’  veramente necessario applicare il pattern request-response sincrono nella vostra architettura. Se potete affidatevi ai pattern event driven asincronici e semplicemente gestire request e response in maniera totalmente asincrona

A proposito di me

Dario Frongillo

Software architect con un forte background in Java, Architettura REST, framework Spring , Database Design,progettazione e sviluppo di SPA e RIA Web application con framework Javascript. Attualmente mi occupo di sviluppo soluzioni software in ambito Banking e Finance: in particolare progettazione e sviluppo di applicativi web based per la realizzazione di sistemi di trading, interfacce con i mercati finanziari e di servizi real time.

I nostri Partner

Gli articoli più letti

Articoli recenti

Commenti recenti