Problem
Bei hohen Fehlerquoten während der Nachrichtenverarbeitung kann es von Vorteil sein, den Nachrichtenkonsum für eine gewisse Zeit zu unterbrechen. Dies reduziert die Anzahl der Fehler im System und ermöglicht anderen Konsumenten eine erfolgreiche Verarbeitung.
Fehlerraten zu erkennen und Workflows zu stoppen und wiederaufzunehmen sind die Kernkompetenzen von Circuit–Breakern.
Das Spring Framework bietet keine Möglichkeit, beides zu kombinieren. Die hier beschriebene Lösung schließt die Lücke.
Lösung
- Mit RegistryEventConsumer<CircuitBreaker> kann auf das Öffnen und Schließen des Circuits reagiert werden
- RabbitMQ – Listener können mit RabbitListenerEndpointRegistry zur Laufzeit gestartet oder gestoppt werden
Beides kombiniert ergibt einen resilienten RabbitMQ – Listener.
RabbitCircuitBreakerIntegration
public void onEntryAddedEvent(EntryAddedEvent<CircuitBreaker> event) {
event.getAddedEntry().getEventPublisher().onStateTransition(t -> {
var listener = rabbitRegistry.getListenerContainer(t.getCircuitBreakerName());
var state = t.getStateTransition().getToState();
if ((state == CLOSED || state == HALF_OPEN) && !listener.isRunning()) {
listener.start();
}
if ((state == OPEN || state == FORCED_OPEN) && listener.isRunning()) {
listener.stop();
}
});
}
Beispiel
public class MyMessageListener {
@RabbitListener(id = "my-message-type", ...)
@CircuitBreaker(name = "my-message-type")
public void processMessage(MyMessageDTO payload, Message message) {...}
}
Weitere Aspekte
- Vollständige Implementierung mit Error – Handling: Resiliente RabbitMQ-Listener with Spring