ডার্টে অ্যাসিনক্রোনাস প্রোগ্রামিং

রেসপন্সিভ অ্যাপ তৈরি করতে অ্যাসিনক্রোনাস প্রোগ্রামিং খুব গুরুত্বপূর্ণ। Dart-এ অ্যাসিনক্রোনাস Future এবং Stream এর মাধ্যমে execution হয়ে থাকে। Future সাধারণত একবারের জন্য কোনো অ্যাসিনক্রোনাস অপারেশনের ফলাফল দেয়, কিন্তু Stream ব্যবহার করা হয় যখন একের পর এক অনেকগুলো অ্যাসিনক্রোনাস ইভেন্টের অপেক্ষা করার দরকার পরে।

ফাইল read করা, ইউজার ইনপুট হ্যান্ডেল করা কিংবা API থেকে ডেটা নেওয়ার মতো কাজগুলোর জন্য Stream সম্পর্কে ভালোভাবে বোঝা Dart-এ দক্ষ অ্যাপ বানাতে খুবই প্রয়োজন।


Stream কী?

Stream হলো অ্যাসিনক্রোনাস ইভেন্টগুলোর একটি ধারাবাহিক সিকোয়েন্স। Future যেখানে একবারের জন্য কোনো রেজাল্ট দেয়, সেখানে Stream একাধিক মান সময়ের সাথে সাথে দিতে পারে। তাই এটা সেই পরিস্থিতিতে খুবই উপকারী, যেখানে ডেটা আসতে থাকে ধাপে ধাপে — যেমন ইউজারের ইনপুট, সেন্সর ডেটা বা বড় আকারের API রেসপন্স।


Stream-এর ধরন

Dart-এ মূলত দুটি ধরণের Stream রয়েছে:

১. Single-subscription Stream

এই ধরণের Stream-এ একবারে একজন লিসেনার (listener) যুক্ত হতে পারে। সাধারণত যখন আপনি জানেন যে একটি নির্দিষ্ট ধাপে ডেটা আসবে — যেমন ফাইল read করা বা সিকোয়েন্সভিত্তিক HTTP রিকোয়েস্ট — তখন এটি ব্যবহার করা হয়।

উদাহরণ:

final stream = Stream.fromIterable([1, 2, 3]);

stream.listen((value) => print('প্রাপ্ত ডেটা: $value'));

আউটপুট:

প্রাপ্ত ডেটা: 1  
প্রাপ্ত ডেটা: 2  
প্রাপ্ত ডেটা: 3  

এখানে Stream একে একে ১, ২, ৩ পাঠাচ্ছে। এটি শুধু একবারই লিসেন করতে পারবে। আবার চেষ্টা করলে Error দিবে।


২. Broadcast Stream

Broadcast Stream-এ একাধিক লিসেনার যুক্ত হতে পারে। যখন আপনার অ্যাপের বিভিন্ন অংশকে একই ইভেন্টে react করতে হয়, তখন এটি খুব কার্যকর — যেমন সেন্সর data আপডেট, ইউজার ইন্টারঅ্যাকশন বা অ্যাপ লাইফসাইকেল ট্র্যাক করা।

উদাহরণ:

final controller = StreamController<int>.broadcast();

controller.stream.listen((val) => print('লিসেনার A: $val'));
controller.stream.listen((val) => print('লিসেনার B: $val'));

controller.sink.add(1);
controller.sink.add(2);

আউটপুট:

লিসেনার A: 1  
লিসেনার B: 1  
লিসেনার A: 2  
লিসেনার B: 2  

এখানে একটি ইভেন্ট একাধিক লিসেনার পাচ্ছে। এটা একই ডেটা অ্যাপের একাধিক জায়গায় পাঠাতে খুব কার্যকর।


Broadcast Stream এমন একটি স্ট্রিম, যেটাতে একাধিক লিসেনার (listener) একই সাথে সাবস্ক্রাইব করতে পারে। অর্থাৎ, একটি ইভেন্ট ঘটলে অ্যাপের একাধিক অংশ সেটা একসাথে জানতে পারে।

এই ধরনের স্ট্রিম তখনই কাজে লাগে, যখন অ্যাপের অনেক জায়গায় এক ইভেন্টের প্রতিক্রিয়া জানাতে হয়। যেমন:

  • আপলোড বা ডাউনলোড প্রোগ্রেস ট্র্যাক করা

  • লাইভ চ্যাট বা রিয়েল-টাইম ডেটা ফিড আপডেট

  • সেন্সরের data (sensor value) এর পরিবর্তন দেখা

  • অ্যাপ জুড়ে কোনো নোটিফিকেশন পাঠানো (যেমন: ইউজার লগআউট হয়েছে)

ব্রডকাস্ট স্ট্রিম ব্যবহার করে রিয়েল-টাইম আপলোড প্রোগ্রেস ট্র্যাকিং

ধরো, তোমার অ্যাপে একটি ফাইল আপলোড ফিচার আছে। এখানে একসাথে দুই জায়গায় প্রোগ্রেস আপডেট দেখাতে হবে — একটি হলো UI-তে প্রোগ্রেস বার, আর অন্যটি লগ সেকশনে টেক্সট আকারে।

এই দুই জায়গায় একসাথে আপডেট পাঠানোর জন্য আমরা ব্যবহার করব StreamController<double>.broadcast() — যেটা একই স্ট্রিমকে একাধিক লিসেনার শোনার সুযোগ দেয়।


ধাপ ১: স্ট্রিম কন্ট্রোলার তৈরি করা

final uploadProgressController = StreamController<double>.broadcast();

এই কন্ট্রোলারই হবে আপলোড প্রোগ্রেস আপডেটের মূল।


ধাপ ২: লগ আপডেট (যেমন সার্ভিস বা ব্যাকগ্রাউন্ড প্রসেসে)

uploadProgressController.stream.listen((progress) {
  print('লগ: ${ (progress * 100).toStringAsFixed(0) }% আপলোড হয়েছে');
});

এটা UI-এর অংশ না — শুধু কনসোলে লগ করার জন্য। উদাহরণস্বরূপ:

লগ: 10% আপলোড হয়েছে  
লগ: 20% আপলোড হয়েছে  
...

ধাপ ৩: UI-তে প্রোগ্রেস আপডেট দেখানো (StatefulWidget এর ভিতরে)

class UploadWidget extends StatefulWidget {
  @override
  _UploadWidgetState createState() => _UploadWidgetState();
}

class _UploadWidgetState extends State<UploadWidget> {
  double _progress = 0.0;
  StreamSubscription<double>? _subscription;

  @override
  void initState() {
    super.initState();
    _subscription = uploadProgressController.stream.listen((progress) {
      setState(() {
        _progress = progress;
      });
    });
  }

  @override
  void dispose() {
    _subscription?.cancel(); // মেমোরি লিক এড়াতে সাবস্ক্রিপশন বন্ধ করে দাও
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return LinearProgressIndicator(value: _progress);
  }
}

এখানে UI আলাদা ভাবে স্ট্রিমটা listen করছে এবং setState ব্যবহার করে স্ক্রিনে আপডেট দেখাচ্ছে। যেটা স্ট্রিম থেকে যতবার ভ্যালু আসে, সেটার উপর ভিত্তি করে প্রগ্রেস বার আপডেট হয়।


ধাপ ৪: আপলোড সিমুলেট করা

void simulateUpload() async {
  for (int i = 1; i <= 10; i++) {
    await Future.delayed(Duration(milliseconds: 500));
    uploadProgressController.sink.add(i / 10); // ০.১, ০.২ ... ১.০ পাঠায়
  }
  await uploadProgressController.close(); // কাজ শেষ হলে স্ট্রিম বন্ধ করো
}

এখানে একই স্ট্রিম থেকে দুই জায়গায় ডেটা যাচ্ছে — UI আর লগ সিস্টেমে। কিন্তু কোড আলাদা ব্লকে রাখা হয়েছে, তাই মেইনটেইন করাও সহজ, বুঝতেও সুবিধা হয়। এই জায়গাতেই BroadcastStream দারুণ কার্যকর — এক উৎস, অনেক গ্রাহক।


Dart-এ Stream তৈরি করার পদ্ধতি

১. Stream.fromIterable() ব্যবহার করে

কেন ব্যবহার করবেন? একটি লিস্ট বা কালেকশনকে ধাপে ধাপে স্ট্রিমের মাধ্যমে পাঠাতে চাইলে।

কখন ব্যবহার করবেন? যখন আপনার কাছে পূর্বনির্ধারিত কিছু ডেটা আছে (যেমন ইউজার আইডির তালিকা, ফাইলের path, কনফিগারেশন ইত্যাদি) এবং সেগুলো অ্যাসিনক্রোনাসভাবে process করতে চান।

উদাহরণ:

final stream = Stream.fromIterable(["user1", "user2", "user3"]);

stream.listen((user) => print('ইউজার process হচ্ছে: $user'));

আউটপুট:

ইউজার process হচ্ছে: user1  
ইউজার process হচ্ছে: user2  
ইউজার process হচ্ছে: user3  

২. Stream.periodic() ব্যবহার করে

কেন ব্যবহার করবেন? নির্দিষ্ট সময় পরপর একটি ইভেন্ট ট্রিগার করতে চাইলে এটি আদর্শ। যেমন টাইমার বা সার্ভার পোলিং।

কখন ব্যবহার করবেন? যখন আপনার দরকার নির্দিষ্ট সময় পরপর অ্যাকশন চালানো — যেমন প্রতি ৫ সেকেন্ডে সার্ভারের অবস্থা দেখা বা প্রতি ১ সেকেন্ডে টাইমার আপডেট করা।

উদাহরণ:

final stream = Stream.periodic(Duration(seconds: 1), (count) => count).take(3);

stream.listen((tick) => print('টিক: $tick'));

আউটপুট (প্রতি সেকেন্ডে একবার):

টিক: 0  
টিক: 1  
টিক: 2  

সার্ভার স্ট্যাটাস চেকের উদাহরণ:

final statusStream = Stream.periodic(Duration(seconds: 5), (_) => fetchServerStatus());

statusStream.listen((status) {
  print("সার্ভারের অবস্থা: $status");
});

Future<String> fetchServerStatus() async {
  await Future.delayed(Duration(milliseconds: 500));
  return DateTime.now().second % 2 == 0 ? 'অনলাইনে' : 'অফলাইনে';
}

৩. async* দিয়ে অ্যাসিনক্রোনাস জেনারেটর ফাংশন

কেন ব্যবহার করবেন? যখন প্রতিটি ইভেন্টের আগে একটু অপেক্ষা করতে হয়, বা ইভেন্ট তৈরি হয় কোনো অ্যাসিনক্রোনাস ফাংশনের মাধ্যমে।

কখন ব্যবহার করবেন? যখন আপনি ধাপে ধাপে ডেটা আনছেন — যেমন API থেকে পেজ বাই পেজ রেসপন্স আনছেন বা বড় ফাইল ভাগে ভাগে পড়ছেন।

উদাহরণ:

Stream<String> fetchPages(int totalPages) async* {
  for (int i = 1; i <= totalPages; i++) {
    await Future.delayed(Duration(seconds: 1));
    yield 'পৃষ্ঠা $i ডাউনলোড হয়েছে';
  }
}

fetchPages(3).listen((page) => print('প্রাপ্ত: $page'));

আউটপুট:

প্রাপ্ত: পৃষ্ঠা 1 ডাউনলোড হয়েছে  
প্রাপ্ত: পৃষ্ঠা 2 ডাউনলোড হয়েছে  
প্রাপ্ত: পৃষ্ঠা 3 ডাউনলোড হয়েছে  

StreamController দিয়ে নিজের মতো করে Custom Stream তৈরি করা

StreamController আপনাকে স্ট্রিম কবে শুরু হবে, কী পাঠাবে, কখন বন্ধ হবে ইত্যাদি ব্যাপারে পূর্ণ নিয়ন্ত্রণ দেয়।

কেন ব্যবহার করবেন? যখন আপনি নিজে থেকে ইভেন্ট ট্রিগার করতে চান — যেমন ব্যবহারকারী ক্লিক করলে, অথবা একাধিক ডেটা সোর্স থেকে ইভেন্ট আসলে।

উদাহরণ:

final controller = StreamController<String>();

controller.stream.listen((event) => print("প্রাপ্ত: $event"));

controller.sink.add("হ্যালো");
controller.sink.add("ওয়ার্ল্ড");
await controller.close();

আউটপুট:

প্রাপ্ত: হ্যালো  
প্রাপ্ত: ওয়ার্ল্ড  

StreamSubscription ম্যানেজ করা

StreamSubscription দিয়ে আপনি একটি স্ট্রিমে লিসেন করা, পজ করা, রিজিউম বা বাতিল করতে পারেন।

final subscription = controller.stream.listen((data) => print(data));

// থামানো
subscription.pause();

// আবার চালু করা
subscription.resume();

// বাতিল করা
subscription.cancel();

Flutter-এ Widget dispose হওয়ার সময় StreamSubscription বন্ধ করা খুব জরুরি।

সাবস্ক্রিপশন ক্যানসেল করা কেন জরুরি?

যদি কোনো স্ট্রিম সাবস্ক্রিপশন আপনি ক্যানসেল না করেন (বিশেষ করে উইজেটের ভিতরে), তাহলে সেটি উইজেট destroy হওয়ার পরেও চালু থাকতে পারে। এর ফলে মেমোরি লিক, অপ্রত্যাশিত UI বিহেভিয়ার, এমনকি পারফরম্যান্স প্রবলেম পর্যন্ত দেখা দিতে পারে।

তাই সবসময় dispose() মেথডের ভিতরে সাবস্ক্রিপশন ক্যানসেল করে দেওয়া উচিত, যেন সব কিছু পরিষ্কারভাবে বন্ধ হয়।


স্ট্রিম ট্রান্সফরমেশন (Stream Transformations)

স্ট্রিম ট্রান্সফর্ম মানে হচ্ছে—লিসেনারে পৌঁছানোর আগেই ডেটাকে পরিবর্তন, ছাঁটাই বা প্রসেস করে নেওয়া। এটা কোডকে আরও ক্লিন, মডুলার আর এফিশিয়েন্ট করতে সাহায্য করে।

কিছু কমন মেথড:

map() – প্রতিটি উপাদান পরিবর্তন করে:

stream.map((e) => e * 2).where((e) => e > 5);

একটি স্ট্রিমের প্রতিটি ভ্যালু দ্বিগুণ করে এবং যেগুলো ৫-এর বেশি, সেগুলো ধরে।

where() – শর্ত দিয়ে ফিল্টার করে:

stream.where((e) => e > 5);

take(n) – শুরু থেকে নির্দিষ্ট সংখ্যক আইটেম নেয়:

stream.take(3);

skip(n) – শুরু থেকে নির্দিষ্ট সংখ্যক আইটেম বাদ দেয়:

stream.skip(1);

কখন কোনটা ব্যবহার করবেন:

  • map() – যখন API থেকে পাওয়া র’ ডেটাকে UI-ফ্রেন্ডলি মডেলে রূপান্তর করতে হয়

  • where() – যেমন: শুধু Active ইউজারগুলো ফিল্টার করতে

  • take() – যেমন: শীর্ষ ৩টি আইটেম দেখাতে

  • skip() – যেমন: কোনো হেডার বা অপ্রয়োজনীয় শুরুর ডেটা বাদ দিতে


অ্যাসিনক্রোনাস ভ্যারিয়েন্ট (Asynchronous Variants)

asyncMap() – যখন প্রতিটি স্ট্রিম আইটেমকে অ্যাসিনক্রোনাসভাবে প্রসেস করতে হয় (যেমন API কল)

stream.asyncMap((id) async => await fetchData(id));

এটা তখন দরকার পড়ে, যখন স্ট্রিমের প্রতিটি উপাদান থেকে অন্য কোনো async ভ্যালু তৈরি করতে হয়।

expand() / asyncExpand() – একটি ইনপুট থেকে একাধিক আউটপুট বের করে:

stream.expand((item) => [item, item * 2]);
stream.asyncExpand((item) async* {
  yield item;
  yield await getNext(item);
});

expand() মূলত একটি ইনপুট থেকে একাধিক সিঙ্ক আউটপুট দেয়, আর asyncExpand() অ্যাসিনক্রোনাসভাবে করে। যেমন: যদি প্রতিটি আইটেমের জন্য একাধিক আউটপুট দরকার হয় বা ধাপে ধাপে কিছু আনতে হয়।

বেস্ট প্র্যাকটিস:

এই ট্রান্সফরমেশনগুলো ব্যবহার করলে আপনার কোডে ডিপলি নেস্টেড কলব্যাক লাগবে না। ফলে বিজনেস লজিক থেকে প্রসেসিং লজিক আলাদা রাখা সহজ হয়।


স্ট্রিমে এরর হ্যান্ডলিং

stream.listen(
  (data) => print(data),
  onError: (err) => print('এরর: $err'),
  onDone: () => print('স্ট্রিম শেষ'),
);

সবসময় onErroronDone হ্যান্ডল করা উচিত, বিশেষ করে প্রোডাকশন অ্যাপে।


Stream অপ্টিমাইজেশন টিপস

১. দ্রুত ইনপুট ডিবাউন্স করা

debounceTime ব্যবহার করা যেন প্রতি কীবোর্ড চাপের (keystroke) অ্যাকশন না নেওয়া হয় — এটি সার্চ ইনপুটের মতো ক্ষেত্রে খুবই কার্যকর।

যখন আপনি খুব ঘন ঘন ইনপুট (যেমন টাইপিং) হ্যান্ডল করছেন, তখন প্রতিটি অক্ষরের জন্য কাজ চালালে সিস্টেমের ওপর চাপ পড়তে পারে বা অপ্রয়োজনীয় API কল হতে পারে। debounceTime ব্যবহার করে আপনি নির্দিষ্ট সময় অপেক্ষা করে কেবল শেষ ইনপুটের ভিত্তিতে রেসপন্স করতে পারেন, যা পারফরম্যান্স উন্নত করে।

আগের কোড:

searchController.stream.listen((query) {
  print('সার্চ করা হচ্ছে: $query');
});

ডিবাউন্স করার পর (debounceTime() (যেমন rxdart এর মাধ্যমে) ব্যবহার করে আপনি ইনপুটে রেসপন্স দেওয়া কিছু সময়ের জন্য বিলম্বিত করতে পারেন — যতক্ষণ না ইউজার টাইপিং থামায়।):

searchController.stream
  .debounceTime(Duration(milliseconds: 300))
  .listen((query) {
    print('সার্চ করা হচ্ছে: $query');
  });

একটি ইনপুটের পরে ৩০০ মিলিসেকেন্ড ইনঅ্যাকটিভ থাকবে এর পরেই পরের API কল হবে।

ডিবাউন্স কেন দরকার?

ডিবাউন্স না করলে কয়েক সেকেন্ডেই আপনি ২০টারও বেশি API কল ট্রিগার করে ফেলতে পারেন। ডিবাউন্স এই অপ্রয়োজনীয় “noise” কমায়, ব্যান্ডউইথ বাঁচায় এবং অ্যাপকে আরও দ্রুত ও স্মার্টভাবে রেসপন্ড করতে সাহায্য করে।

কখন এটা তেমন উপকারে আসে না?

যদি ইউজার একেবারে টাইপিং বন্ধ করে দেয় (মানে একদম ইনঅ্যাকটিভ হয়ে যায়), তাহলে ডিবাউন্স টাইম পিরিয়ড শেষে স্ট্রিম শেষ ইনপুটটাই এমিট করবে। ইনঅ্যাকটিভ হওয়াটা নিজে থেকে ইভেন্ট বাতিল করে না — বরং ডিবাউন্স নিশ্চিত করে যে, সবশেষ কাজটাই নির্দিষ্ট দেরি পর রেসপন্স ট্রিগার করবে।

২. একই ডেটা বারবার এড়ানো

stream.distinct().listen((value) {
  print('প্রাপ্ত: $value');
});

একই মান পরপর আসলে একবারই প্রসেস করা হবে।

৩. প্রয়োজন শেষ হলে স্ট্রিম বন্ধ করা

final subscription = stream.listen(print);

// পরে
await subscription.cancel();

৪. সবসময় Controller বন্ধ করা

final controller = StreamController<int>();
controller.sink.add(1);
await controller.close();

না বন্ধ করলে মেমোরি লিক হতে পারে।


বাস্তব উদাহরণ: Flutter অ্যাপে ডিবাউন্স সার্চ

final searchController = StreamController<String>.broadcast();

TextField(
  onChanged: (value) => searchController.sink.add(value),
);

searchController.stream
  .debounceTime(Duration(milliseconds: 500))
  .listen((searchQuery) {
    fetchResults(searchQuery);
  });

ব্যবহারকারী যত দ্রুত টাইপই করুক না কেন, ৫০০ মিলিসেকেন্ড থেমে তবেই সার্চ চালানো হবে।


উপসংহার

Stream Dart-এর অ্যাসিনক্রোনাস প্রোগ্রামিংয়ের একটি শক্তিশালী দিক। সঠিকভাবে Stream তৈরি, ব্যবহার, রূপান্তর ও অপ্টিমাইজ করতে পারলে আপনার অ্যাপ আরও দ্রুততর, স্মার্ট ও user-friendly হবে। লাইভ আপলোড ট্র্যাকিং, রিয়েলটাইম ডেটা সিঙ্কিং, বা ইউজার ইভেন্টে রেসপন্স — সব ক্ষেত্রেই Stream দক্ষভাবে ব্যবহার করা আপনাকে এগিয়ে রাখবে।

Last updated