- ⚠️ Project Reactor treats any unhandled error in a Flux as a terminal event by default.
- 📉 Using AtomicInteger allows localized error tracking without disrupting the global stream.
- 🚫 Built-in operators like `.retry()` and `.onErrorContinue()` don't support custom error thresholds.
- 🛠️ Time-based `.window()` and `.handle()` operators support granular error handling per batch.
- 📊 Integrating logs and metrics with `.doOnError()` or Micrometer helps you see what's happening in production.
In Java reactive programming, the ability to recover from failures and stay in control matters, especially when data pipelines misbehave. Traditional programs execute one step at a time, but streams built on Project Reactor process data asynchronously, which raises a new question: how do you terminate a Flux stream gracefully after a set number of errors while keeping your system responsive and stable? This guide shows how, with attention to backpressure, error tolerance, and observability.
Understanding Flux Error Handling in Project Reactor
In reactive systems that use Project Reactor, error handling is very important for making them reliable. Streams can run for a long time and at the same time. So, it is key to deal with problems without stopping the whole stream, unless a problem really needs to stop everything.
Project Reactor provides dedicated operators for handling problems, each offering a different way to react when something goes wrong upstream:
- `.onErrorResume()`: switches to a fallback path if an error occurs in the main processing path, e.g. falling back to a cached value.
- `.retry(n)`: automatically retries the entire sequence up to `n` times after any error.
- `.onErrorContinue()`: ignores the current error and resumes processing from the next item.
- `.doOnError()`: side-effect operator used for logging or diagnostics when an exception occurs.
These operators are useful, but each assumes that an error can either be recovered from immediately or should trigger a retry of the whole sequence. None of them lets you define a budget for how many errors to tolerate before terminating.

This is why you need custom error handling.
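To make that gap concrete, here is a small sketch (the in-memory source and item values are illustrative, not from any real system) contrasting two of the built-in operators:

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class BuiltInOperatorsDemo {

    // A source whose third element always fails during mapping
    static Flux<String> source() {
        return Flux.just("a", "b", "fail", "c")
                .map(s -> {
                    if (s.equals("fail")) {
                        throw new IllegalStateException("bad item");
                    }
                    return s;
                });
    }

    public static void main(String[] args) {
        // onErrorResume: switch to a fallback publisher once an error occurs
        List<String> withFallback = source()
                .onErrorResume(ex -> Flux.just("cached"))
                .collectList().block();
        System.out.println(withFallback); // [a, b, cached]

        // onErrorContinue: drop the faulty item and keep going, with no limit
        List<String> skipping = source()
                .onErrorContinue((ex, item) -> System.out.println("skipped " + item))
                .collectList().block();
        System.out.println(skipping); // [a, b, c]
    }
}
```

Both runs recover, but neither tracks how many errors have occurred so far; that bookkeeping is exactly what the rest of this guide adds.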
The Anatomy of a Flux Stream and Errors
Reactive streams are designed around the idea that an error is terminal. A Flux or Mono may emit some items, and after that it does one of two things:

- It sends a `complete()` signal, meaning it finished successfully.
- It sends an `error()` signal, which terminates the stream; no further items are emitted.
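A minimal sketch (the failing `map` is artificial) makes this terminal behavior visible:

```java
import reactor.core.publisher.Flux;

public class TerminalErrorDemo {
    public static void main(String[] args) {
        Flux<Integer> flux = Flux.just(1, 2, 3)
                .map(n -> {
                    if (n == 2) {
                        throw new IllegalStateException("boom");
                    }
                    return n;
                });

        flux.subscribe(
                n -> System.out.println("onNext " + n),
                err -> System.out.println("onError " + err.getMessage()),
                () -> System.out.println("onComplete"));
        // prints "onNext 1" and then "onError boom"; 3 is never emitted
    }
}
```

Once the exception propagates, the subscriber's error callback fires and everything after the failing element is abandoned.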
This behavior matches the Reactive Streams contract and keeps protocols simple, but not every business case agrees. Consider a partially degraded service, a few malformed records during parsing, or brief transient system glitches: we want to tolerate some of these failures and keep going, but only up to a defined limit.
Important Considerations in Reactive Environments
- Backpressure: Project Reactor is designed to respect downstream demand. Custom error handling should never cause the upstream to overproduce or underproduce.
- Parallelism: streams can run concurrently (`parallel()`, `flatMap()`), which makes counting errors harder.
- Signal order: the ordering of `onNext()`, `onError()`, and `onComplete()` must be deterministic. This matters when testing or composing streams.
Error tolerance needs to take all these things into account. This helps avoid hard-to-find problems later.
Why 3 Errors? Real-World Motivation
There are many real reasons to stop a stream after a certain number of errors (like 3):
- API Aggregation: say your API pulls data from 10 microservices. If one or two fail, the result may still be useful, but 3 or more failures likely point to a wider outage.
- Reading Data: If you are reading CSV rows or XML data, skipping one or two bad messages is okay. But many bad messages might mean the whole file is bad.
- Watching/Alerts: In systems where you watch what happens closely, 3 similar errors often cause alerts. Stopping the stream can save CPU and network power.
- IoT Sensor Data: Devices sending bad data now and then should not break things. But ongoing problems should make the system reset or use a backup plan.
In all these cases, you are not just dealing with errors. You are making sure you trust the data stream.
Implementing a 3-Error Threshold with AtomicInteger
The simplest thread-safe way to track how many errors occur in a Flux is an `AtomicInteger`. It supports atomic increments, so the count stays correct even when items are processed in parallel.
```java
AtomicInteger errorCount = new AtomicInteger(0);

Flux<String> source = getSomeFluxSource(); // imagine this emits strings

Flux<String> processed = source
        .handle((item, sink) -> {
            try {
                if (item.contains("fail")) {
                    throw new RuntimeException("Processing failed");
                }
                sink.next(item);
            } catch (Exception ex) {
                int current = errorCount.incrementAndGet();
                if (current >= 3) {
                    sink.error(new RuntimeException("Exceeded error threshold"));
                }
                // below the threshold: silently skip the faulty item
            }
        });
```
Why This Works
- 🧮 `AtomicInteger` counts errors atomically and safely.
- 🚫 `sink.error(...)` is only called once the error limit is reached.
- ⏭️ Bad items are skipped via `handle`, not `map`, so no unwanted elements are emitted.
- 🧵 Reactive backpressure is preserved because nothing extra is pushed downstream.
Unlike `.retry()`, this method works without restarting the stream, and unlike `.onErrorContinue()`, it does not quietly swallow errors you forgot to log.
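Wired together, the pattern behaves like this self-contained sketch (the `withThreshold` helper name and the input values are illustrative):

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

public class ThresholdDemo {

    // Emits good items, drops the first two failures, errors on the third
    static Flux<String> withThreshold(Flux<String> source) {
        AtomicInteger errorCount = new AtomicInteger(0);
        return source.<String>handle((item, sink) -> {
            if (item.startsWith("fail")) {
                if (errorCount.incrementAndGet() >= 3) {
                    sink.error(new RuntimeException("Exceeded error threshold"));
                }
                // below the threshold: drop the item silently
                return;
            }
            sink.next(item);
        });
    }

    public static void main(String[] args) {
        withThreshold(Flux.just("ok1", "fail1", "ok2", "fail2", "fail3", "never"))
                .subscribe(
                        item -> System.out.println("Emitted: " + item),
                        err -> System.out.println("Terminated: " + err.getMessage()));
        // prints "Emitted: ok1", "Emitted: ok2",
        // then "Terminated: Exceeded error threshold"; "never" is not processed
    }
}
```

Note that the third failure terminates the stream before the remaining items are even requested, which is precisely the cutoff behavior `.onErrorContinue()` cannot give you.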
The Limits of Built-In Error Operators
Reactor has many error handling features. But built-in operators are often not enough for cases where you need an error limit.
| Operator | Behavior | Limitation |
|---|---|---|
| `.retry(n)` | Re-subscribes to the whole publisher sequence | Does not count how many distinct errors occurred |
| `.onErrorContinue()` | Skips bad items and keeps going | Never stops, even if hundreds of items fail |
| `.onErrorResume()` | Switches to a fallback publisher | A one-time fix, not count-based |
| `.takeUntil(predicate)` | Stops when a condition is met | Does not track errors unless you count them manually in the predicate |
| `.window(int/time)` | Groups stream items into batches | Needs custom error counters to enforce a threshold |
None of these, on its own, enforces a failure-count limit without custom additions.
Composing Error-Aware Windows
In more complex data processing paths, it is helpful to count how often errors happen inside windows or batches. This lets you stop a group based on a condition.
Here is how it looks when `.window()` and `.handle()` are combined:
```java
Flux<String> batchedFlux = getSomeFluxSource()
        .window(10) // group every 10 items
        .flatMap(window -> {
            AtomicInteger errorCount = new AtomicInteger(0); // one counter per window
            return window.handle((item, sink) -> {
                try {
                    process(item);
                    sink.next(item);
                } catch (Exception ex) {
                    if (errorCount.incrementAndGet() >= 3) {
                        sink.complete(); // abandon the rest of this window
                    }
                    // below the threshold: skip just this item
                }
            });
        });
```
Advantages:
- Enforces an error budget per batch (e.g. 3 bad items out of 10).
- Keeps the main stream alive instead of terminating everything.
- Gives natural units for logs and dashboards.
Use this for ETL processing, tools that collect performance data, or tools that import many records.
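A self-contained sketch of the same pattern, with an artificial failure condition standing in for `process(item)` and a smaller window and threshold for brevity:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

public class WindowedThresholdDemo {

    // Each window of 6 tolerates one failure; a second failure ends the window early
    static Flux<Integer> batched(Flux<Integer> source) {
        return source
                .window(6)
                .concatMap(window -> {
                    AtomicInteger errors = new AtomicInteger(0); // one counter per window
                    return window.<Integer>handle((n, sink) -> {
                        if (n % 2 == 0) { // artificial failure on even numbers
                            if (errors.incrementAndGet() >= 2) {
                                sink.complete(); // abandon the rest of this window
                            }
                            return;
                        }
                        sink.next(n);
                    });
                });
    }

    public static void main(String[] args) {
        List<Integer> result = batched(Flux.range(1, 12)).collectList().block();
        // window 1 (1..6) yields 1, 3 before its second failure; window 2 (7..12) yields 7, 9
        System.out.println(result); // [1, 3, 7, 9]
    }
}
```

`concatMap` is used here instead of `flatMap` so window outputs stay in order; that is a design choice for the demo, not a requirement of the pattern.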
Graceful Shutdown vs. Error Termination
Reactor can send two signals to stop things:
- `sink.error(Throwable)`: terminates the stream abruptly and notifies subscribers of the failure.
- `sink.complete()`: signals that the stream has finished cleanly.
A good rule to follow:
> Use `complete()` when you hit a business-defined stop condition (like 3 errors). Reserve `error()` for failures you genuinely cannot recover from.
Stopping things smoothly makes the system more able to recover:
- Metrics tools can compute accurate start-to-finish durations.
- User interfaces avoid endless spinners and spurious network errors.
- Logs show the full life of a session rather than ending in a stack trace.
Signals are not just about meaning. They are about how the system acts.
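As a sketch, here is the same threshold idea terminating with `complete()` instead of `error()` (helper name and inputs are illustrative):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

public class GracefulStopDemo {

    // Same counting logic as before, but the threshold triggers a clean completion
    static Flux<String> stopQuietlyAfter3(Flux<String> source) {
        AtomicInteger errors = new AtomicInteger(0);
        return source.<String>handle((item, sink) -> {
            if (item.startsWith("fail")) {
                if (errors.incrementAndGet() >= 3) {
                    sink.complete(); // business threshold reached: stop cleanly
                }
                return;
            }
            sink.next(item);
        });
    }

    public static void main(String[] args) {
        List<String> result = stopQuietlyAfter3(
                Flux.just("a", "fail", "b", "fail", "fail", "c"))
                .collectList().block();
        // prints [a, b]; the stream completes normally, no exception reaches the subscriber
        System.out.println(result);
    }
}
```

Downstream consumers see an ordinary `onComplete()`, so collectors, metrics, and UIs treat the run as a finished session rather than a failure.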
Spring WebFlux Use Case: Aggregating with Partial Failure Tolerance
This way of doing things works well in Spring WebFlux applications. Say you want to pull data from several outside services. But you do not want to stop the whole data gathering process for short-term problems.
```java
@GetMapping("/compose")
public Flux<ResponsePayload> aggregateResults() {
    List<ServiceReference> endpoints = getServiceEndpoints();
    AtomicInteger errors = new AtomicInteger(0);

    return Flux.fromIterable(endpoints)
            .flatMap(endpoint -> callDownstream(endpoint)
                    .onErrorResume(err -> {
                        if (errors.incrementAndGet() >= 3) {
                            return Mono.error(new RuntimeException("Too many failures"));
                        }
                        return Mono.empty(); // skip this endpoint's result
                    }))
            .takeUntil(response -> errors.get() >= 3);
}
```
Benefits:
- The API stays available when parts of services are down.
- Logs show which services failed.
- You can easily test this with `StepVerifier`.
Testing with StepVerifier
Reactive code is notoriously hard to debug, especially when errors occur at precise points in a sequence. Use StepVerifier to exercise unusual scenarios and assert the exact expected behavior.
```java
StepVerifier.create(processedFlux)
        .expectNext("item1", "item2")
        .expectErrorMatches(err -> err instanceof RuntimeException &&
                err.getMessage().contains("Exceeded"))
        .verify();
```
This way of doing things:
- Checks the exact order of events.
- Confirms the stop signal (error or complete).
- Works with virtual time via `withVirtualTime()` if you use `delayElements()` or timers.
Also, add tests for zero errors, max errors, and edge cases.
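Such edge-case tests might look like this sketch (assuming `reactor-test` is on the classpath; `withThreshold` is a hypothetical helper mirroring the earlier threshold stream):

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class ThresholdEdgeCases {

    // Hypothetical helper: error out once `max` failures have been seen
    static Flux<String> withThreshold(Flux<String> source, int max) {
        AtomicInteger errors = new AtomicInteger(0);
        return source.<String>handle((item, sink) -> {
            if (item.startsWith("fail")) {
                if (errors.incrementAndGet() >= max) {
                    sink.error(new RuntimeException("Exceeded error threshold"));
                }
                return;
            }
            sink.next(item);
        });
    }

    public static void main(String[] args) {
        // Zero errors: the stream passes through and completes normally
        StepVerifier.create(withThreshold(Flux.just("a", "b"), 3))
                .expectNext("a", "b")
                .verifyComplete();

        // Exactly at the threshold: terminates with the threshold error
        StepVerifier.create(withThreshold(Flux.just("a", "fail", "fail", "fail"), 3))
                .expectNext("a")
                .expectErrorMessage("Exceeded error threshold")
                .verify();
    }
}
```

Covering both the clean-completion path and the exact-threshold path catches off-by-one mistakes in the counter logic early.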
Avoiding Common Pitfalls
Here are some mistakes to avoid when setting error limits in reactive processes:
- ❌ Missing `try/catch` logic in `handle()` or `flatMap()` leaves errors unmanaged.
- ❌ Sharing one `AtomicInteger` across multiple subscribers without coordinating them.
- ❌ Using `Mono.error()` for items that were merely "skipped" instead of `Mono.empty()`.
- ❌ Logging only in `onErrorResume`; consider `.doOnError()` for richer diagnostics.
Being able to see errors clearly is very important for finding problems in production.
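For the shared-counter pitfall in particular, wrapping the pipeline in `Flux.defer()` gives every subscriber its own counter. A sketch (helper names are illustrative):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

public class PerSubscriberCounter {

    // Flux.defer runs the supplier once per subscription, so each
    // subscriber gets a fresh error counter instead of sharing one
    static Flux<String> thresholded(Flux<String> source) {
        return Flux.defer(() -> {
            AtomicInteger errors = new AtomicInteger(0);
            return source.<String>handle((item, sink) -> {
                if (item.startsWith("fail")) {
                    if (errors.incrementAndGet() >= 3) {
                        sink.error(new RuntimeException("Exceeded error threshold"));
                    }
                    return;
                }
                sink.next(item);
            });
        });
    }

    public static void main(String[] args) {
        Flux<String> stream = thresholded(Flux.just("a", "fail", "b"));

        // Two independent subscriptions each start from a count of zero
        List<String> first = stream.collectList().block();
        List<String> second = stream.collectList().block();
        System.out.println(first);  // [a, b]
        System.out.println(second); // [a, b]
    }
}
```

Without the `defer`, the second subscription would inherit the first one's error count and could terminate far earlier than intended.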
Enhancing Observability and Monitoring
Your code works. Now make sure your operations team can see what it is doing. Add these helpers:
- `.doOnNext(value -> log.info("Emitted {}", value))`
- `.doOnError(ex -> metrics.counter("flux.error").increment())`
- `.doFinally(signal -> log.debug("Terminated with: {}", signal))`
To see what's happening in production:
- Use Micrometer with Prometheus/Grafana for dashboards.
- Use distributed tracing (with Brave or OpenTelemetry) to link service errors to streams that stop.
- Send alerts if error rates go above normal limits.
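A minimal sketch of wiring these hooks together (with `System.out` standing in for a real logger or Micrometer registry, which are assumptions of this demo):

```java
import reactor.core.publisher.Flux;

public class ObservabilityDemo {
    public static void main(String[] args) {
        Flux<String> observed = Flux.just("a", "b")
                // in production these callbacks would feed a logger / metrics counter
                .doOnNext(v -> System.out.println("Emitted " + v))
                .doOnError(ex -> System.out.println("Would increment flux.error: " + ex.getMessage()))
                .doFinally(signal -> System.out.println("Terminated with: " + signal));

        observed.blockLast();
    }
}
```

The `doFinally` hook fires for every termination reason, including cancellation, which makes it the right place to record stream lifetimes.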
Recovery without observability is not really recovery; it is just failure in hiding.
Best Practices for Production-Grade Flux Error Handling
Follow these rules to write strong streams that are easy to keep up and know about errors:
- 🧩 Isolate error-limit logic in reusable utilities (e.g. an `ErrorLimiter` or `ThresholdSkipper` helper).
- 🚦 Treat errors as signals that carry state, not just as bad results.
- 🔁 Do not retry unless it makes sense for the upstream source (such as a flaky network).
- 🧵 Stay non-blocking; never call `.block()` inside a stream.
- 🎛️ Combine `.handle()`, `.onErrorResume()`, and `.doOnError()` for full control.
Reactive code is very good at showing complex ideas. But it only works well if used safely.
Final Thoughts
In Java reactive programming, Flux error handling is not just about finding errors. It is about managing them smoothly. Project Reactor lets you make data streams that can handle problems, let you see what's happening, and respond quickly.
You can set smart rules to stop things, like stopping after three errors. This turns your stream into a system that changes and handles things that are not perfect in the real world. No matter if you are building APIs, ETL jobs, or real-time processors, start by thinking about recovery. Count your errors. Set limits. Stop smoothly. And keep your stream going with confidence.