كيفية تبسيط التزامن باستخدام النمذجة التفاعلية على Android

نشرت: 2022-03-11

التزامن وعدم التزامن متأصلان في برمجة الأجهزة المحمولة.

يمكن أن يكون التعامل مع التزامن من خلال البرمجة ذات النمط الإلزامي ، وهو ما تتضمنه البرمجة على Android عمومًا ، سببًا للعديد من المشكلات. باستخدام البرمجة التفاعلية مع RxJava ، يمكنك تجنب مشاكل التزامن المحتملة من خلال توفير حل أكثر نظافة وأقل عرضة للخطأ.

بصرف النظر عن تبسيط المهام المتزامنة غير المتزامنة ، توفر RxJava أيضًا القدرة على أداء عمليات النمط الوظيفي التي تحول وتجمع وتجمع الانبعاثات من المرصودة حتى نحقق النتيجة المرجوة.

من خلال الجمع بين النموذج التفاعلي وعمليات الأسلوب الوظيفي في RxJava ، يمكننا نمذجة مجموعة واسعة من بنيات التزامن بطريقة تفاعلية ، حتى في عالم Android غير التفاعلي. في هذه المقالة ، سوف تتعلم كيف يمكنك القيام بذلك بالضبط. سوف تتعلم أيضًا كيفية اعتماد RxJava في مشروع قائم بشكل تدريجي.

إذا كنت جديدًا على RxJava ، فإنني أوصي بقراءة المنشور هنا الذي يتحدث عن بعض أساسيات RxJava.

تجسير المواد غير التفاعلية في العالم التفاعلي

أحد تحديات إضافة RxJava كواحدة من المكتبات إلى مشروعك هو أنه يغير بشكل جذري الطريقة التي تفكر بها بشأن التعليمات البرمجية الخاصة بك.

يتطلب منك RxJava التفكير في البيانات على أنها مدفوعة بدلاً من سحبها. على الرغم من أن المفهوم بحد ذاته بسيط ، إلا أن تغيير قاعدة التعليمات البرمجية الكاملة التي تستند إلى نموذج السحب يمكن أن يكون أمرًا شاقًا بعض الشيء. على الرغم من أن الاتساق دائمًا ما يكون مثاليًا ، فقد لا يكون لديك دائمًا امتياز إجراء هذا الانتقال عبر قاعدة التعليمات البرمجية بأكملها دفعة واحدة ، لذلك قد تكون هناك حاجة إلى المزيد من النهج التدريجي.

ضع في اعتبارك الكود التالي:

 /** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { final List<User> allUsers = UserCache.getAllUsers(); final List<User> usersWithBlogs = new ArrayList<>(); for (User user : allUsers) { if (user.blog != null && !user.blog.isEmpty()) { usersWithBlogs.add(user); } } Collections.sort(usersWithBlogs, (user1, user2) -> user1.name.compareTo(user2.name)); return usersWithBlogs; }

تحصل هذه الوظيفة على قائمة بكائنات User من ذاكرة التخزين المؤقت ، وتقوم بتصفية كل عنصر بناءً على ما إذا كان لدى المستخدم مدونة أم لا ، وفرزها حسب اسم المستخدم ، ثم تقوم في النهاية بإعادتها إلى المتصل. بالنظر إلى هذا المقتطف ، نلاحظ أن العديد من هذه العمليات يمكن أن تستفيد من مشغلي RxJava ؛ على سبيل المثال ، filter() sorted() .

تعطينا إعادة كتابة هذا المقتطف:

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

يحول السطر الأول من الوظيفة List<User> التي يتم إرجاعها بواسطة UserCache.getAllUsers() إلى عنصر يمكن ملاحظته Observable<User> عبر fromIterable() . هذه هي الخطوة الأولى في جعل كودنا تفاعليًا. الآن بعد أن أصبحنا نعمل على Observable ، يمكننا هذا من تنفيذ أي عامل يمكن ملاحظته في مجموعة أدوات Observable - filter() والفرز sorted() في هذه الحالة.

هناك بعض النقاط الأخرى التي يجب ملاحظتها حول هذا التغيير.

أولاً ، توقيع الطريقة لم يعد هو نفسه. قد لا تكون هذه صفقة ضخمة إذا تم استخدام استدعاء الأسلوب هذا فقط في أماكن قليلة وكان من السهل نشر التغييرات إلى مناطق أخرى من المكدس ؛ ومع ذلك ، إذا كان يكسر العملاء الذين يعتمدون على هذه الطريقة ، فإن ذلك يمثل مشكلة ويجب إعادة توقيع الطريقة.

ثانيًا ، تم تصميم RxJava مع وضع الكسل في الاعتبار. أي أنه لا ينبغي إجراء عمليات طويلة في حالة عدم وجود مشتركين في Observable . مع هذا التعديل ، لم يعد هذا الافتراض صحيحًا حيث يتم استدعاء UserCache.getAllUsers() حتى قبل وجود أي مشتركين.

مغادرة العالم التفاعلي

لمعالجة المشكلة الأولى من التغيير الذي أجريناه ، يمكننا الاستفادة من أي من عوامل الحظر المتاحة لـ Observable مثل blockingFirst() و blockingNext() . بشكل أساسي ، سيتم حظر كل من هذين المشغلين حتى يتم إصدار عنصر في اتجاه المصب: سيعيد blockingFirst() العنصر الأول المنبعث والانتهاء ، في حين أن blockingNext() سيعيد Iterable والذي يسمح لك بتنفيذ حلقة لكل حلقة على البيانات الأساسية ( كل تكرار خلال الحلقة سوف يمنع).

من الآثار الجانبية لاستخدام عملية الحظر التي من المهم أن تكون على دراية بها ، هو أن الاستثناءات يتم طرحها على مؤشر ترابط الاستدعاء بدلاً من تمريرها إلى طريقة onError() الخاصة بالمراقب.

باستخدام عامل حظر لتغيير توقيع الطريقة مرة أخرى إلى List<User> ، سيبدو مقتطفنا الآن على النحو التالي:

 /** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .toList() .blockingGet(); }

قبل استدعاء عامل الحظر (أي blockingGet() ) ، نحتاج أولاً إلى ربط عامل التشغيل التجميعي toList() بحيث يتم تعديل الدفق من عنصر Observable<User> إلى Single<List<User>> ( Single هو نوع خاص من الملاحظة التي ترسل قيمة واحدة فقط في onSuccess() ، أو خطأ عبر Observable onError() ).

بعد ذلك ، يمكننا استدعاء عامل blockingGet() الذي يقوم بفك ملف Single ويعيد List<User> .

على الرغم من أن RxJava تدعم هذا ، إلا أنه يجب تجنب ذلك قدر الإمكان لأن هذه ليست برمجة تفاعلية اصطلاحية. عند الضرورة القصوى ، فإن عوامل الحظر هي طريقة أولية لطيفة للخروج من العالم التفاعلي.

النهج الكسول

كما ذكرنا سابقًا ، تم تصميم RxJava مع مراعاة الكسل. بمعنى ، يجب تأجيل العمليات طويلة المدى لأطول فترة ممكنة (على سبيل المثال ، حتى يتم استدعاء الاشتراك في Observable ). لجعل حلنا كسولًا ، نستخدم عامل التشغيل defer() .

defer() يأخذ مصنع ObservableSource الذي ينشئ Observable لكل مراقب جديد يشترك. في حالتنا ، نريد إرجاع Observable.fromIterable(UserCache.getAllUser()) كلما اشترك مراقب.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

الآن بعد أن تم تغليف العملية طويلة المدى في defer() ، لدينا سيطرة كاملة على الخيط الذي يجب أن يعمل فيه ببساطة عن طريق تحديد Scheduler المناسب في subscribeOn() . مع هذا التغيير ، يكون الكود الخاص بنا تفاعليًا بالكامل ويجب أن يتم الاشتراك فقط في الوقت الذي تكون فيه البيانات مطلوبة.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .subscribeOn(Schedulers.io()); }

عامل آخر مفيد جدًا لتأجيل الحساب هو طريقة fromCallable() . على عكس defer() ، الذي يتوقع أن يتم إرجاع Observable في دالة lambda وبالتالي "تسطيح" الملاحظة التي تم إرجاعها ، فإن Observable fromCallable() lambda ويعيد القيمة إلى المصب.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { final Observable<List<User>> usersObservable = Observable.fromCallable(() -> UserCache.getAllUsers()); final Observable<User> userObservable = usersObservable.flatMap(users -> Observable.fromIterable(users)); return userObservable.filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

المنفردة التي تستخدم fromCallable() في قائمة ستعيد الآن قائمة يمكن Observable<List<User>> ، نحتاج إلى تسوية هذه القائمة باستخدام flatMap() .

رد الفعل-كل شيء

من الأمثلة السابقة ، رأينا أنه يمكننا لف أي كائن في " Observable " والقفز بين الحالات غير التفاعلية والمتفاعلة باستخدام عمليات الحجب والتأجيل ( defer() / fromCallable() . باستخدام هذه التركيبات ، يمكننا البدء في تحويل مناطق تطبيق Android لتكون تفاعلية.

عمليات طويلة

من الأماكن الجيدة للتفكير مبدئيًا في استخدام RxJava عندما يكون لديك عملية تستغرق بعض الوقت لإنجازها ، مثل مكالمات الشبكة (راجع المنشور السابق للحصول على أمثلة) ، أو قراءة القرص وكتابته ، وما إلى ذلك ، يوضح المثال التالي وظيفة بسيطة سيكتب نصًا إلى نظام الملفات:

 /** * Writes {@code text} to the file system. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return true if the text was successfully written, otherwise, false */ public boolean writeTextToFile(Context context, String filename, String text) { FileOutputStream outputStream; try { outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; } catch (Exception e) { e.printStackTrace(); return false; } }

عند استدعاء هذه الوظيفة ، نحتاج إلى التأكد من إجرائها على مؤشر ترابط منفصل لأن هذه العملية محظورة. يؤدي فرض مثل هذا التقييد على المتصل إلى تعقيد الأمور للمطور مما يزيد من احتمالية حدوث أخطاء ويمكن أن يؤدي إلى إبطاء التطور.

ستساعد إضافة تعليق على الوظيفة بالطبع في تجنب أخطاء المتصل ، لكن هذا لا يزال بعيدًا عن الرصاص.

باستخدام RxJava ، يمكننا بسهولة التفاف هذا في Observable وتحديد Scheduler الذي يجب تشغيله عليه. بهذه الطريقة ، لا يحتاج المتصل إلى القلق على الإطلاق من استدعاء الوظيفة في سلسلة منفصلة ؛ ستعتني الوظيفة بهذا الأمر بنفسه.

 /** * Writes {@code text} to the filesystem. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return An Observable emitting a boolean indicating whether or not the text was successfully written. */ public Observable<Boolean> writeTextToFile(Context context, String filename, String text) { return Observable.fromCallable(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; }).subscribeOn(Schedulers.io()); }

باستخدام fromCallable() ، يتم تأجيل كتابة النص إلى ملف حتى وقت الاشتراك.

نظرًا لأن الاستثناءات هي كائنات من الدرجة الأولى في RxJava ، فإن إحدى المزايا الأخرى للتغييرات التي أجريناها هي أننا لم نعد بحاجة إلى التفاف العملية في كتلة try / catch. سيتم ببساطة نشر الاستثناء في اتجاه مجرى النهر بدلاً من ابتلاعه. يتيح ذلك للمتصل التعامل مع الاستثناء الذي يراه مناسبًا (على سبيل المثال ، إظهار خطأ للمستخدم بناءً على الاستثناء الذي تم طرحه ، وما إلى ذلك).

أحد التحسينات الأخرى التي يمكننا إجراؤها هو إرجاع "قابل Completable " بدلاً من "قابل Observable ". يعد Completable في الأساس نوعًا خاصًا من Observable - على غرار Single - يشير ببساطة إلى ما إذا كان الحساب قد نجح ، عبر onComplete() ، أو فشل ، عبر onError() . يبدو أن إرجاع "قابل Completable " يبدو أكثر منطقية في هذه الحالة لأنه يبدو من السخف إعادة حقيقة واحدة في دفق Observable .

 /** * Writes {@code text} to the filesystem. * * @param context a context * @param filename the name of the file * @param text the text to write * @return A Completable */ public Completable writeTextToFile(Context context, String filename, String text) { return Completable.fromAction(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); }).subscribeOn(Schedulers.io()); }

لإكمال العملية ، نستخدم عملية fromAction() لـ Completable لأن القيمة المرتجعة لم تعد تهمنا. إذا لزم الأمر ، مثل Observable ، فإن Completable تدعم أيضًا fromCallable() و defer() .

استبدال عمليات الاسترجاعات

حتى الآن ، تصدر جميع الأمثلة التي نظرنا إليها إما قيمة واحدة (على سبيل المثال ، يمكن نمذجتها على أنها Single ) ، أو تخبرنا ما إذا كانت العملية قد نجحت أو فشلت (على سبيل المثال ، يمكن نمذجتها على أنها قابلة Completable ).

كيف يمكننا تحويل المناطق في تطبيقنا التي تتلقى تحديثات أو أحداث مستمرة (مثل تحديثات الموقع وعرض أحداث النقر وأحداث المستشعر وما إلى ذلك)؟

سننظر في طريقتين للقيام بذلك ، باستخدام create() واستخدام Subjects .

يسمح لنا create() باستدعاء onNext() للمراقب صراحةً onComplete() | onError() لأننا نتلقى تحديثات من مصدر البيانات الخاص بنا. لاستخدام create() ، نقوم بتمرير ObservableOnSubscribe الذي يتلقى ObservableEmitter كلما اشترك مراقب. باستخدام الباعث المستلم ، يمكننا بعد ذلك إجراء جميع مكالمات الإعداد اللازمة لبدء تلقي التحديثات ثم استدعاء حدث Emitter المناسب.

في حالة تحديثات الموقع ، يمكننا التسجيل لتلقي التحديثات في هذا المكان وإرسال تحديثات الموقع كما وردت.

 public class LocationManager { /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return Observable.create(emitter -> { // Make sure that the following conditions apply and if not, call the emitter's onError() method // (1) googleApiClient is connected // (2) location permission is granted final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { if (!emitter.isDisposed()) { emitter.onNext(location); } } }); }); } }

تطلب الوظيفة داخل استدعاء create() تحديثات الموقع وتمرير رد اتصال يتم استدعاؤه عندما يتغير موقع الجهاز. كما نرى هنا ، فإننا نستبدل بشكل أساسي واجهة نمط رد الاتصال وبدلاً من ذلك نبعث الموقع المستلم في الدفق المرصود الذي تم إنشاؤه (من أجل الأغراض التعليمية ، تخطيت بعض التفاصيل مع إنشاء طلب موقع ، إذا كنت تريد الخوض أعمق في التفاصيل يمكنك قراءتها هنا).

هناك شيء آخر يجب ملاحظته حول create() وهو أنه كلما تم استدعاء subscribe() ، يتم توفير باعث جديد. بعبارة أخرى ، تُرجع الدالة create() قيمة Observable باردة. هذا يعني أنه في الوظيفة أعلاه ، من المحتمل أن نطلب تحديثات الموقع عدة مرات ، وهذا ليس ما نريده.

للتغلب على هذا ، نريد تغيير الوظيفة لإرجاع Observable ساخنة بمساعدة Subjects .

أدخل الموضوعات

يقوم Subject بتمديد Observable وتنفيذ Observer في نفس الوقت. هذا مفيد بشكل خاص عندما نريد إرسال نفس الحدث أو إرساله إلى عدة مشتركين في نفس الوقت. من حيث التنفيذ ، نود أن نعرض Subject باعتباره موضوعًا Observable للعملاء ، مع الاحتفاظ به كموضوع Subject .

 public class LocationManager { private Subject<Location> locationSubject = PublishSubject.create(); /** * Invoke this method when this LocationManager should start listening to location updates. */ public void connect() { final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { locationSubject.onNext(location); } }); } /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return locationSubject; } }

في هذا التطبيق الجديد ، يتم استخدام النوع الفرعي PublishSubject الذي يرسل الأحداث فور وصولها بدءًا من وقت الاشتراك. وفقًا لذلك ، إذا تم تنفيذ الاشتراك في نقطة تم فيها بالفعل إصدار تحديثات الموقع ، فلن يتم استلام الانبعاثات السابقة من قبل المراقب ، ولكن الانبعاثات اللاحقة فقط. إذا كان هذا السلوك غير مرغوب فيه ، فهناك نوعان من الأنواع الفرعية الأخرى للموضوعات في مجموعة أدوات Subject التي يمكن استخدامها.

بالإضافة إلى ذلك ، أنشأنا أيضًا وظيفة connect() منفصلة والتي تبدأ طلب تلقي تحديثات الموقع. لا يزال بإمكان observeLocation() إجراء استدعاء connect() ، لكننا أعدناه خارج الوظيفة من أجل الوضوح / البساطة.

ملخص

لقد درسنا عددًا من الآليات والتقنيات:

  • defer() ومتغيراته لتأخير تنفيذ الحساب حتى الاكتتاب
  • Observables الباردة التي تم إنشاؤها من خلال create()
  • Observables الساخنة باستخدام Subjects
  • blockingX عندما نريد مغادرة العالم التفاعلي

نأمل أن تكون الأمثلة الواردة في هذه المقالة قد ألهمت بعض الأفكار المتعلقة بالمناطق المختلفة في تطبيقك والتي يمكن تحويلها إلى رد الفعل. لقد غطينا الكثير وإذا كان لديك أي أسئلة أو اقتراحات أو إذا كان هناك أي شيء غير واضح ، فلا تتردد في ترك تعليق أدناه!

إذا كنت مهتمًا بمعرفة المزيد عن RxJava ، فأنا أعمل على كتاب متعمق يشرح كيفية عرض المشكلات بطريقة رد الفعل باستخدام أمثلة Android. إذا كنت ترغب في تلقي تحديثات حوله ، يرجى الاشتراك هنا.