2 Frage: Wie verbraucht man 100 Nachrichten ohne Bestätigung, arbeitet dann und bestätigt sie dann?

Frage erstellt am Fri, May 3, 2019 12:00 AM

Ich verwende Nachrichten von einem Kaninchen-MQ mit Spring AmQP. Ich konsumiere jeweils eine Nachricht und es ist ziemlich langsam, da ich sie in der Datenbank speichere. So öffnen und schließen Sie Transaktionen jedes Mal.

Im Moment habe ich einen Verbraucher wie diesen eingerichtet.

@RabbitListener(queues = "queuename")
public void receive(Message message) {
 someservice.saveToDb(message);
}

Aber das ist wirklich langsam. Ich möchte eine Reihe von Nachrichten lesen, bevor ich sie speichere. Dann kann ich eine Transaktion eröffnen. Speichern Sie 300 und übernehmen Sie dann den nächsten Stapel und laden Sie ihn.

Würde so etwas funktionieren?

class MessageChannelTag {
  Message message;
  Channel channel;
  long tag;
}

@Component
class ConsumerClass {

 List<MessageChannelTag> messagesToSave = new ArrayList<>();

 @RabbitListener(queues = "queuename")
 public void receive(Message message, Channel channel,  @Header(AmqpHeaders.DELIVERY_TAG) long tag)
            throws IOException {
    messagesToSave.add(new MessageChannelTag(message, channel, tag));
 }

 @Scheduled(fixedDelay=500)
 public void saveMessagesToDb() {
    List saveTheese = new ArrayList(messagesToSave);
    messagesToSave.clear();
    service.saveMessages(saveTheese);
    for(MessageChannelTag messageChannelTag:messagesToSave) {
    //In the service I could mark the rows if save succeded or not and
    //then out here I could ack or nack.. 

 messageChannelTag.getChannel().basicAck(messageChannelTag.getTag(), false);
    }
 }
}

Oder wenn es eine einfachere Lösung gibt, lassen Sie es mich wissen. Ich bevorzuge schnell, einfach und robust =)

    
2
2 Antworten                              2                         

Es könnte sich auch lohnen, zu untersuchen, ob ein "vorgelagerter" Produzent Stapel von Nachrichten anstelle einzelner Nachrichten bereitstellen kann.

    
1
05.05.2019, 07: 39: 50Z
  1. Diese Antwort enthält zwei Fehler. Erstens ist die Pull-API (basic.get) ineffizient, da sie vom clientseitigen Polling abhängt. Zweitens sendet die "Push-API" (basic.consume) keine Nachrichten nacheinander, sofern dies nicht über die Prefetch-Einstellung konfiguriert wurde.
    2019-05-04 15: 09: 10Z
  2. Danke für die Korrekturen!
    2019-05-04 16: 58: 57Z

Verwenden Sie nicht die API "pull" (basic.get). Sie ist bei weitem nicht so effizient wie das Konsumieren von Nachrichten.

Setzen Sie Prefetch (auch als QoS bezeichnet) auf 300 und bestätigen Sie die Nachrichten sofort, wenn Sie fertig sind. Ich bin nicht mit Spring vertraut, aber ich bin sicher, dass es Dekorateure oder andere Möglichkeiten gibt, dies zu erreichen.

Dies alles wird in den Dokumenten und Tutorials behandelt - https://www.rabbitmq.com /consumer-prefetch.html


HINWEIS: Das RabbitMQ-Team überwacht die folgenden rabbitmq-users Mailingliste und beantwortet nur gelegentlich Fragen zu StackOverflow.

    
0
04.05.2019 15: 06: 57Z
Quelle platziert Hier