Async Programming in Dart
Asynchronous programming is essential for building responsive applications. In Dart, the concept of asynchronous execution is closely tied with Future
and Stream
. While Future
is used for single event async operations, Stream
is used when you expect a sequence of asynchronous events. Whether you’re reading files, handling user input, or working with APIs, understanding Streams is crucial for writing efficient Dart applications.
What is a Stream?
A Stream
represents a sequence of asynchronous data events. Unlike a Future
, which completes once, a Stream can emit multiple values over time. This makes it perfect for handling data that arrives gradually like user inputs, sensor data, or API responses in chunks.
There are two main types of Streams:
1. Single-subscription Streams
These allow only one listener at a time. They're ideal when you have a known, linear flow of data like reading a file or making an HTTP request that delivers a sequence of values.
Example:
final stream = Stream.fromIterable([1, 2, 3]);
stream.listen((value) => print('Received: \$value'));
Output:
Received: 1
Received: 2
Received: 3
This stream emits the numbers 1, 2, and 3 in order. It supports only one listener. If you try to listen again, it will throw an error.
2. Broadcast Streams
These allow multiple listeners to subscribe to the same stream. Broadcast streams are useful when you want multiple parts of your app to react to the same events, such as user interactions or state changes like tracking app lifecycle changes, sensor updates, or user interactions.
Example:
final controller = StreamController<int>.broadcast();
controller.stream.listen((val) => print('Listener A: \$val'));
controller.stream.listen((val) => print('Listener B: \$val'));
controller.sink.add(1);
controller.sink.add(2);
Output:
Listener A: 1
Listener B: 1
Listener A: 2
Listener B: 2
Here, two listeners are added to a single stream. Both will receive the same events emitted by the controller.
Broadcast streams allow multiple listeners to subscribe to the same stream. They’re useful when you want various parts of your app to respond to the same events, such as:
Listening to upload/download progress
Live chat or data feed updates
Tracking sensor changes
App-wide notifications (e.g., user logged out)
Real-World Example: Upload Progress Using Broadcast Stream
Let’s say you have a file upload feature in your app, and both the UI progress bar and the logs section need to update simultaneously.
We’ll use a StreamController<double>.broadcast()
so multiple listeners can listen to the same stream.
Step 1: Create the controller (shared source of updates)
final uploadProgressController = StreamController<double>.broadcast();
Step 2: Log progress (e.g., in a service or background part)
uploadProgressController.stream.listen((progress) {
print('Log: Uploaded ${(progress * 100).toStringAsFixed(0)}%');
});
This runs separately from UI. It prints progress like:
Log: Uploaded 10%
, Log: Uploaded 20%
, ...
Step 3: Update UI (inside a StatefulWidget)
class UploadWidget extends StatefulWidget {
@override
_UploadWidgetState createState() => _UploadWidgetState();
}
class _UploadWidgetState extends State<UploadWidget> {
double _progress = 0.0;
StreamSubscription<double>? _subscription;
@override
void initState() {
super.initState();
_subscription = uploadProgressController.stream.listen((progress) {
setState(() {
_progress = progress;
});
});
}
@override
void dispose() {
_subscription?.cancel(); // Always cancel to avoid memory leaks
super.dispose();
}
@override
Widget build(BuildContext context) {
return LinearProgressIndicator(value: _progress);
}
}
The UI listens separately to the same stream, but manages state using setState
.
Step 4: Simulate file upload
void simulateUpload() async {
for (int i = 1; i <= 10; i++) {
await Future.delayed(Duration(milliseconds: 500));
uploadProgressController.sink.add(i / 10); // Emits 0.1, 0.2 ... 1.0
}
await uploadProgressController.close(); // Close when done
}
Now both parts of the app receive progress updates independently but simultaneously. This is where BroadcastStream
shines — a single stream, multiple consumers.
Creating Streams in Dart
1. Using Stream.fromIterable()
Stream.fromIterable()
Why? Turns an existing collection into a stream of events, emitting each element sequentially.
When? You have a fixed list of data (e.g., a list of IDs, file paths, or static configuration) and want to process items asynchronously.
final stream = Stream.fromIterable(["user1", "user2", "user3"]);
stream.listen((user) => print('Processing user: $user'));
Output:
Processing user: user1
Processing user: user2
Processing user: user3
This creates a stream that emits elements from a list one by one.
2. Using Stream.periodic()
Stream.periodic()
Why? Emits events at a fixed interval, ideal for polling or timers.
When? You need repeated actions over time, such as refreshing data every few seconds or updating a stopwatch UI.
final stream = Stream.periodic(Duration(seconds: 1), (count) => count).take(3);
stream.listen((tick) => print('Tick $tick'));
Output (1 second delay between each line):
Tick: 0
Tick: 1
Tick: 2
This emits a new value (based on count
) every second. Useful for timers or polling.
Example: Polling for Server Status
final statusStream = Stream.periodic(Duration(seconds: 5), (_) => fetchServerStatus());
statusStream.listen((status) {
print("Server status: $status");
});
Future<String> fetchServerStatus() async {
// Simulate fetching server status
await Future.delayed(Duration(milliseconds: 500));
return DateTime.now().second % 2 == 0 ? 'Online' : 'Offline';
}
This demonstrates how to use Stream.periodic
for polling server health, something common in dashboards or admin panels.
3. Async Generator Function with async*
async*
Why? Lets you yield events from within an async function, combining asynchronous delay or I/O with stream emission.
When? You need to fetch or compute data incrementally—e.g., loading paginated API results one page at a time, or reading large files chunk by chunk.
Stream<String> fetchPages(int totalPages) async* {
for (int i = 1; i <= totalPages; i++) {
await Future.delayed(Duration(seconds: 1)); // simulate delay
yield 'Fetched page $i';
}
}
fetchPages(3).listen((page) => print('Received: $page'));
Hypothetical Output (if fetchPageFromApi
returns "Page $i" after delay):
Received page: Page 1
Received page: Page 2
Received page: Page 3
This is an asynchronous generator. It yields a value after each delay. Ideal for step-by-step processing with delays or awaiting async tasks.
Custom Streams with StreamController
StreamController
provides fine-grained control over when and how events are emitted. Use it when events originate from multiple sources or when you need broadcast capabilities.
Why? To manually add, pause, resume, or close a stream; useful for event buses, user interactions, or combining multiple inputs.
When? You have custom data flows, such as mixing user clicks and network events, or implementing global notification systems.
final controller = StreamController<String>();
controller.stream.listen((event) => print("Received: \$event"));
controller.sink.add("Hello");
controller.sink.add("World");
await controller.close();
Expected Output:
Received: Hello
Received: World
This approach is helpful when you're manually handling events—such as user inputs or system data.
Managing StreamSubscription
A StreamSubscription
gives you control over how and when to listen, pause, resume, or cancel stream events.
final subscription = controller.stream.listen((data) => print(data));
// Pause
subscription.pause();
// Resume
subscription.resume();
// Cancel
subscription.cancel();
Use this to manage the lifecycle of your listeners, especially in Flutter where you might need to stop listening on widget disposal.
Why is canceling subscriptions important?
If you don’t cancel a subscription (especially in widgets), it can continue running even after the widget is disposed. This may cause memory leaks, unexpected UI behavior, or performance issues. Always cancel in dispose()
to clean up properly.
Stream Transformations
Transforming a stream means changing or filtering data before it reaches listeners. This helps you write cleaner, modular, and more efficient reactive code.
Common Methods:
map()
– Transforms each element.
stream.map((e) => e * 2);
where()
– Filters based on a condition.
stream.where((e) => e > 5);
take(n)
– Takes only the first n
items.
stream.take(3);
skip(n)
– Skips the first n
items.
stream.skip(1);
When to use these:
Use
map()
to convert raw values (like API responses) into UI models.Use
where()
to filter results (e.g., only active users).Use
take()
to limit load (like top 3 items).Use
skip()
to remove headers or unwanted initial data.
Asynchronous Variants:
asyncMap()
– Useful for calling APIs or async processing.
stream.asyncMap((id) async => await fetchData(id));
Used when transforming stream data into another async value. Useful for network calls or async parsing.
expand()
/ asyncExpand()
– Emits multiple values per input.
stream.expand((item) => [item, item * 2]);
stream.asyncExpand((item) async* {
yield item;
yield await getNext(item);
});
expand
flattens items into multiple values; asyncExpand
does this with async logic. Useful when each item needs to result in multiple outputs.
Best Practice: Use these to avoid deeply nested callbacks and isolate transformation logic from business logic.
Error Handling in Streams
stream.listen(
(data) => print(data),
onError: (err) => print('Error: \$err'),
onDone: () => print('Stream closed'),
);
Always handle errors to prevent unhandled exceptions, especially in production. onDone
lets you clean up resources.
Stream Optimization Tips
1. Debounce Rapid Inputs
Use debounceTime
to prevent handling every keystroke (useful for search input). When handling high-frequency input (e.g., typing), reacting to every keystroke can overload your system or make unnecessary API calls.
Before:
searchController.stream.listen((query) {
print('Searching for: \$query');
});
After (Use debounceTime()
(e.g., via rxdart
) to delay reaction until the user pauses typing:):
searchController.stream
.debounceTime(Duration(milliseconds: 300))
.listen((query) {
print('Searching for: \$query');
});
This waits 300ms after the last input before emitting, reducing API calls.
Why debounce?
Without it, you might trigger 20+ API calls in a few seconds. Debouncing reduces noise, saves bandwidth, and improves responsiveness.
When is it not useful?
If the user stops typing entirely (i.e., becomes inactive), the stream will emit the last value after the debounce duration. Inactivity alone won’t "cancel" the event—but debounce ensures that only the most recent action triggers a response after a delay.
2. Eliminate Redundant Events
Use distinct()
to ignore repeated consecutive values.
stream.distinct().listen((value) {
print('Received: \$value');
});
This avoids processing the same value multiple times in a row.
3. Cancel Unused Streams
Free up memory and processing by canceling listeners when they’re no longer needed.
final subscription = stream.listen(print);
// Later in code
await subscription.cancel();
4. Always Close Controllers
Avoid memory leaks by closing controllers when done.
final controller = StreamController<int>();
controller.sink.add(1);
await controller.close();
Real-World Example: Debounced Search in a Flutter App
final searchController = StreamController<String>.broadcast();
TextField(
onChanged: (value) => searchController.sink.add(value),
);
searchController.stream
.debounceTime(Duration(milliseconds: 500))
.listen((searchQuery) {
fetchResults(searchQuery);
});
In this example, we debounce the user input in a TextField
. If the user types quickly, the stream emits only the final input after 500ms of inactivity.
Conclusion
Streams are a powerful part of Dart’s asynchronous capabilities. Understanding how to create, consume, transform, and optimize them will make your apps faster and more maintainable. Whether you're tracking live uploads, syncing real-time data, or reacting to user events, mastering Streams can dramatically elevate your Dart and Flutter development.
Last updated