BLOG

รับมือกับ Event Stream ด้วย Reactive Extension (Rx)

ได้ยินมานานแล้วกับ Reactive เราเอามันมาใช้ตอนไหนได้บ้างนะ?

เรื่องหนึ่งที่ผมสนใจมากนานมากๆ แล้วก็คือ วิธีการเขียนแบบ Reactive ด้วย Reactive Extension หรือ ReactiveX หรือ Rx (ไม่เกี่ยวกับ React.js นะ แต่จะเกี่ยวกับ RxJs ซึ่งคือ Reactive Extensions สำหรับ JavaScript มากกว่า) 

Reactive Extension นั้น เป็นการ Implement แนวคิด Reactive Programming โดยทีมของไมโครซอฟท์เพื่อใช้งานภายใน ก่อนจะ Release เป็น Library สำหรับ C# เมื่อราวๆ 10 ปีก่อน และพัฒนาขึ้นสำหรับภาษาอื่นๆ ตามมา (คิดว่าน่าจะเป็นโปรเจค Open Source แรกๆ ของ Microsoft เลยนะ)

ผมได้ยินครั้งแรกตอนที่ไปร่วมงาน MVP Global Summit ตั้งแต่ยังไม่ Release และจำได้เลยว่า ในห้องที่ไปนั่งฟังคนคิดเขา Present ให้ฟัง (ด้วยเดโมที่เป็นภาษา C#) หลายคนก็สงสัยว่า จะทำไปทำไมฟระ~! เพราะว่า ภาษา C# เป็นภาษาที่มี Function Pointer (Delegate) - คือเราสามารถส่ง Function หนึ่ง ให้อีก Function เรียกใช้งานได้ และยังมีระบบ Event คือ เราสามารถ "ลงทะเบียน" Function Pointer หลายๆ ตัวไว้ และให้ทุก Function ที่ลงทะเบียนไว้ สามารถได้ถูกเรียกได้ตามลำดับที่ลงทะเบียนเข้ามาด้วย สำหรับภาษาที่ไม่มีระบบ Event เช่น JavaScript, C++ เนี่ย การมี Rx ดูเข้าใจมากกว่า

ในเมื่อภาษามันก็มีอยู่แล้ว แต่ Microsoft กลับมีทีมงานที่พัฒนาระบบที่เหมือนว่าจะทำซ้ำกัน สำหรับใช้กับภาษานี้ด้วย มันต้องมี Something! 🤔

ปีนี้ เข้าใจถ่องแท้ว่ามันมีไว้ทำอะไร เลยขออาสามาเล่าให้ฟังกัน

นันคอม กับการรับมือบอลยักษ์ ที่ซีแอทเติ้ล~ 😝

มันเริ่มมาจาก Kafka

Kafka เป็นระบบสำหรับ "ประกาศให้โลก(ของระบบเรา)รู้ ว่ามีอะไรเกิดขึ้น" โดยมองว่า สิ่งที่เกิดขึ้นในระบบเป็น Stream of Events และ Kafka เป็นผู้ที่รวบรวม Event เหล่่านั้นไว้ที่เดียวกัน และกระจายบอกต่อให้คนที่สนใจได้รับฟัง

Kafka เดิมพัฒนาโดย LinkedIn ก่อนจะบริจาคให้กับ Apache Software Foundation และ Release เป็น Open Source ครั้งแรกเมื่อประมาณ 10 ปีก่อน พร้อมๆ กับ Rx เหมือนกัน โดยในช่วงนั้น เรากำลังตื่นเต้นกับการทำ Service Oriented Architecture (SOA) และ XML Web Service กันมาก (สมัยนั้น JSON ยังงมๆ กันอยู่เลยแหละ) และแนวคิด Microservices / Serverless ยังไม่เคยมีใครพูดถึงเลยมั๊งนะ พอตอนนี้ใช้ Serverless/Micro-Services กันหมด Kafka เลยมีคนสนใจกันเยอะเลย

เพราะว่าพอเราเริ่มทำให้ทุกอย่างกลายเป็น Service เราจะเริ่มเจอปัญหาของการคุยกันระหว่าง Service ขึ้น จะให้ Servcie เรียกกันไปกันมา ก็จะงง แถมยังเกิด Dependency ระหว่าง Service กันอีก จึงเป็นที่มาของการคิด Service Bus/Message Bus/Message Queue อะไรมากมายวุ่นวายไปหมด เพื่อที่จะให้ Service ทุกตัวทำงานอิสระจากกันได้ ไม่ต้องรอกันไป รอกันมา ไม่ต้องคอย Loop ถามกัน โดย Kafka ก็เป็นหนึ่งในระบบที่ออกมาเพื่อพยายามแก้ปัญหานี้


ภาพ: Event-Driven Microservices Architecture / confluent.io

ความแปลกของ Kafka กว่าระบบ Bus/Queue อื่นๆ ในตอนนั้น คือมันทำหน้าที่เป็นเหมือน Log ของเหตุการณ์ทั้งหมดที่เกิดขึ้นในระบบ โดยแยกเป็นหัวข้อๆ ("Topic") โดย Producer จะเป็นคนส่งข้อมูลเข้า Kafka เข้าใน Topic ที่เกี่ยวข้อง เช่น ถ้าเป็น Topic ของการจ่ายเงิน ชื่อว่า "Payments" ระบบจ่ายเงินก็จะเป็นคนส่งบอกเข้ามา ว่า "คำสั่งซื้อ ABC00123 จ่ายเงินแล้วนะ" เป็นต้น (ส่วนบันทึกการจ่ายเงิน ก็เก็บอยู่ในระบบการจ่ายเงิน ไม่ได้ส่งมา)

เราอาจออกแบบให้ระบบตัดสต็อกสินค้า เป็น Cosumer ของ Topic "Payments" ในขณะเดียวกัน มันก็อาจจะเป็น Producer ของ Topic "Inventory" ได้ด้วยเหมือนกัน

นั่นก็คือ ระบบตัดสต็อกสินค้า เมื่อเข้ามา Subscribe ใน Topic "Payments" นี้ ก็จะได้รับการ Notify จาก Kafaka ว่า "คำสั่งซื้อ ABC00123 มีการจ่ายเงินแล้วนะ" พอรู้แบบนี้ ระบบตัดสต็อก ก็มีหน้าที่ในการตัดส็อกสินค้าในฐานข้อมูล แล้วก็ส่งไปบอก Kafka ใน Topic "Inventory" ว่า "คำสั่งซื้อ ABC00123 นั้น ตัดสต๊อกแล้วนะ" ระบบปลายน้ำอื่นๆ เช่น เช่น ระบบพิมพ์ใบส่งสินค้า ระบบแจ้งพนักงานหยิบสินค้า เป็นต้น ก็จะรู้ว่าตัวเองต้องทำอะไรต่อ หลังจากการตัดสต็อกสำเร็จแล้ว

เรียกว่า ถ้าไม่มี Kafka แล้ว ระบบแบบ Micro-Services/Serverless แทบจะเป็นไปไม่ได้เลย (นอกจากเขียนระบบแบบ Kafka ขึ้นมาเอง 😅)

นอกจากนี้ ข้อมูลที่ส่งเข้ามาใน Kafka จะถูกเก็บไว้แบบถาวร สามารถย้อนเวลาดูข้อมูลทั้งหมดได้ ไม่สาบสูญไปทันทีเหมือนส่งเข้า Queue แล้วมีคนมาหยิบออกไป หรือ Service Bus มันจึงเหมือนเป็น Database สำหรับเก็บ Log ที่มีระบบ Notification ด้วยในตัว ประมาณนั้น

พอดีว่า ช่วงสองสามเดือนก่อน ผมมานั่งไล่อัพเดทเทคโนโลยี จึงพบว่าตอนนี้ทุกคนกำลังชอบ Kafka กันมาก เลยทำให้นึกขึ้นได้ว่า เฮ้ย นี่มันการทำงานของ Rx เลยนี่!!! เลยเกิดสนใจที่จะย้อนมาศึกษาให้ลึกซึ้งขึ้น (ประมาณว่า ตกลงว่ามันจำเป็นว่ะ!!!) เพราะว่าผมเอง ก็กำลังเจอปัญหานี้อยู่เหมือนกัน แต่ว่าอยู่ในระดับ Micro Services ที่อยู่ในโปรแกรม SystemX ของเรา นั่นก็คือ โมดูลบางตัว ต้องอาศัยข้อมูลจากอีกตัว ซึ่งตัวอย่างที่ชัดเจนที่สุด ก็คือข้อมูลจากระบบอ่านค่า Sensors ที่เป็นต้นทางข้อมูลของระบบอื่นๆ หลายตัว เช่น SystemXD, ระบบเก็บ Log, UI เพื่อแสดงผล

เรียกได้ว่า SystemX นั้นต้องการ Kafka อยู่ในตัวมันมากๆ เลย! 🤯

ปัญหาที่เกิดจากการใช้ Event ใน C# หรือ การใช้ Observer Pattern ธรรมดา

เพื่อให้เราเข้าใจตรงกัน ให้เรานิยามคำให้ชัดเจนก่อน แบบนี้นะ

  • Function Pointer คือ Delegate ในภาษา C# เช่น Action<>, Func<> รวมไปถึงตัวแปร ชนิด Delegate ที่เราประกาศขึ้นมาเอง เช่น
     
    public delegate int Adder(int lhs, int rhs);  // นี่คือ Delegate
    
    Adder implementation = (lhs, rhs) => lhs + rhs;
    // implementation นี่คือตัวแปร ชนิด Delegate Adder
    
    Func<int, int, int> aFunc = (lhs, rhs) => lhs + rhs;
    // aFunc นี่คือตัวแปร ชนิด Delegate Func ที่รับ int 2 ตัว return int
    
    สำหรับใน JavaScript ก็คือตัวแปรที่เราใช้เก็บ Function เช่น
    var implementation = (lhs, rhs) => lhs + rhs;
    var implementation = function( lhs, rhs ) { return lhs + rhs; };
    
  • Observer Pattern คือการ ส่ง Function Pointer / instance ของ Class เรา ไปให้อีก Class หนึ่ง เพื่อให้ Class นั้น เรียก Function ของเรา (ถ้าเป็น Function Pointer) หรือ Function ที่เราตกลงกัน (ถ้าเราใช้ Instance ของคลาส) เมื่อเกิดเหตุกาณ์ที่เราสนใจขึ้น ขอยืมภาพ Wikipedia มาเลยนะ

    ในภาพนี้ก็คือ เราตกลงกันว่า Observer จะมี Method ชื่อ Update แล้วใน Subject (เหมือน Topic ของ Kafka) ก็มี Method ให้ส่ง Instance ของ Observer เข้าไป เพื่อรับการ Notify พอเกิดเหตุขึ้นใน Subject (ในภาพนี้คือประมาณว่า State เปลี่ยน) ตัว Subject ก็จะทำการ "Fire Event" หรือ ยิง Event โดยการ For Loop ไปใน Array ของ Observer ทั้งหมด แล้วเรียก Method Update นั้นเอง



    ถ้าท่านที่เคยใช้ C# / VB อยู่แล้ว หรือว่าเขียน Jquery/Pure Javascript มาก่อน ก็จะเข้าใจว่า เนี่ยมันคือระบบ Event ชัดๆ ถ้าเป็น C# จะตรงกับ Observer Pattern เป๊ะๆ ด้วย เพราะว่า Event ของ C# จะเป็นการ Add/Remove (attach/detach) แต่ของ JavaScript จะไม่ตรงเท่าไหร่ เพราะจะเป็นการเก็บค่าตัวแปรธรรมดา ก็คือถ้าเรา Set Function ใหม่เข้าไป คนที่เคยจับ Event อยู่จะโดนทับไป (เลยทำให้ RxJS ดู Make Sense กว่า Rx ของ C# มากๆ และถ้าเคยใช้ jQuery / AngularJS จะพบว่า เขาทำระบบ Event ของเขาเองขึ้นมากันเลยทีเดียว)

ซึ่งปัญหาที่เกิดขึ้นของการใช้ระบบ Event ธรรมดาใน C# (น่าจะรวมถึง Observer Pattern ทั่วไปด้วย) จากที่พบใน SystemX ก็คือ:

  • จับ Event ไม่ทัน

    อย่างเช่น ถ้าเรามี Event ชื่อ "DataAvailable" เพื่อให้ Class อื่นมาจับ แล้วเราอยากให้ทุก Class เริ่มทำงานพร้อมกัน แยก Thread ใคร Thread มัน แล้วจับ Event (แน่นอนว่า นี่ก็ใช้ Dependency Injection เพื่อตามหากันให้เจอแล้วด้วย) โดยส่วนมากแล้ว เราจะพลาด Event DataAvailable นี้ไปบ้างก็ได้ เช่นถ้า DataAvailable นี้ มาเป็นจังหวะ ก็พลาดอันแรกไป อาจยังไม่เป็นไร แต่ถ้า Class ที่สนใจ DataAvailable นี้ รอว่า เมื่อไหร่มี DataAvailable แล้วทำอะไรต่อ (เช่น ตอบกลับต้นทางของ Data) โค๊ดที่ควรจะทำงาน ก็จะไม่มีวันทำงานเลยก็ได้

    พอแบบนี้ กลายเป็นว่า ทุกคลาส จะต้องมีการเรียกคำสั่งที่ชื่อประมาณว่า GetCurrentStatus เพื่อดูว่า ก่อนที่ฉันจะจับ Event DataAvailable ทัน มันมี Data มาแล้วหรือยัง โดยเรียกดูเมื่อทำงานครั้งแรกทุกครั้ง ส่วนฝั่งเจ้าของ DataAvailable ก็จะต้องเก็บตัวแปรประมาณว่า LatestData เอาไว้ให้ในคลาส เพื่อให้คนอื่นเข้ามาเช็คได้
     
  • Pattern การใช้ Event ไม่ส่งเสริมให้มีการปล่อยจาก Event

    อันนี้คอนเฟิร์มด้วยน้องโย่เลย น้องโย่เพิ่งเคยรู้ว่า event ของ C# มันสามารถเขียน Add/Remove ได้ เมื่อผมเล่าเรื่อง Rx ให้ฟังเนี่ยแหละ 😆

    เนื่องจากว่า การใช้ Event 99% ก็จะใช้แค่จับ Event ทั่วไปบนหน้า UI เช่น จับปุ่ม Click พอหน้าจอปิดไป เราก็ไม่จำเป็นจะต้องปล่อย Event ก็ได้ เพราะยังไงโค๊ดเรามันก็ปิดไปพร้อมหน้าจอ เราก็เลยไม่ค่อยเคยชินกับการปล่อย Event เท่าไหร่

    ปัญหาจะเกิด เมื่อเป็นการจับ Event ระหว่าง Class คือเราประยุกต์เอา Event มาใช้ทำ Observable Pattern เช่น ถ้าเราไปจับ Event ของ Class Time ที่มีการ Notify ทุกๆ 1 วินาที พอคลาสของเรา จบการทำงานไปแล้ว เช่น พ้น Scope, ปิดหน้าจอ แต่เราไม่ปล่อย Event ออก ตัว Function ที่เราส่งไป ก็จะยังคงได้รับการเรียกอยู่เรื่อยๆ ต่อให้การทำงานควรจะจบแล้วก็ตาม และมันก็จะไม่ถูก Garbage Collector เก็บไปด้วย เพราะว่า มันยังติดกับ Class Time อยู่

    สำหรับทางแก้ ก็คือ คลาสที่เป็นคนเข้าไปจับ Event ก็ควรจะ Implement IDisposable แล้วถอด Event เมื่อถูก Dispose แต่ส่วนตัวคิดว่าน้อยมากที่เราจะคิดถึงว่าจะต้องทำแบบนี้

    และอีกเรื่องก็คือ เวลาที่เราต้องการจะปล่อย Event ได้ เราจะต้องเก็บตัวแปรของ Function เอาไว้ด้วย เอาไว้ปล่อย ซึ่งถ้าเกิดว่าเขียนเป็น Lambda Expression ก็จะต้อง Assign Lambda ใส่ตัวแปรก่อน จากนั้นค่อยส่งตัวแปร ให้กับ Event อีกที สรุปว่า เขียนเป็น Function ปกติเลย จะง่ายกว่า (เพราะตัว Function เอง ชื่อของ Function ถือเป็น Delegate ของ Function อยู่แล้วไง)
Time t = new();
Action<long> processTime = (time) => {
 
	Console.WriteLine(time);
};
 
t.TimeHasSpassed += processTime;
t.TimeHasSpassed -= processTime;
  • การบริการ State ผิดเพี้ยน และเข้าใจยาก

    อย่างหนึ่งที่ผมต้องการจะทำมากๆ เลย ก็คือ "เมื่อไม่มี Class ไหน สนใจจับ Event เราแล้ว ให้เราปิดตัวเองลง จนกว่าจะมีใครสนใจเราใหม่" แต่ถ้าจะเขียนให้ได้แบบนี้ โค๊ดก็จะกระจายอยู่สามที่เป็นอย่างน้อย คือ
     
    • โค๊ดตรงที่เป็นการทำงานจริงๆ ที่เรียก Event ให้ทำงาน
    • โค๊ดตรง "add" ของ Event เพื่อเริ่ม
    • โค๊ดตรง "remove" ของ Event เพื่อหยุด

และยังรวมไปถึงโค๊ดคนที่เรียกใช้ ที่จะต้องเก็บ Lambda ใส่ตัวแปรไว้ เพื่อใช้ปล่อย Event ด้วยเพื่อให้ Event มันหยุดทำงานได้ (จากข้อที่ผ่านมา) ทำให้การสื่อสารที่จะให้คนอ่านโค๊ดเข้าใจว่า เราจะหยุดเมื่อไม่มีคนสนใจ ทำได้ลำบากขึ้น อันนี้เป็นตัวอย่างของคลาส ที่ทำการเริ่ม/หยุดการทำงาน เมื่อมีคน Add/Remove Event Handler จะเห็นว่าโค๊ดที่เกี่ยวกับเรื่องนี้ทั้งหมด กระจายกันอยู่สามส่วน อย่างที่บอกไปเลย

public class TimeStopsWhenNoOneCares
{
    private long _Time;
    private CancellationTokenSource _Canceller;
 
    private Action<long> _TimeHasSpassed;
    public event Action<long> TimeHasSpassed
    {
        add
        {
            _TimeHasSpassed += value;
            if (_Canceller == null)
            {
                _Canceller = new CancellationTokenSource();
                this.RunTimeInterval(_Canceller.Token);
            }
        }
        remove
        {
            _TimeHasSpassed -= value;
            if (_TimeHasSpassed == null)
            {
                _Canceller.Cancel();
            }
        }
    }
 
    private void RunTimeInterval(CancellationToken token)
    {
        Task.Run(() => {
 
            while (token.IsCancellationRequested == false)
            {
                _TimeHasSpassed?.Invoke(_Time);
 
                Task.Delay(1000).Wait();
                _Time++;
            }
            _Canceller?.Dispose();
            _Canceller = null;
        });
    }
}
 
TimeStopsWhenNoOneCares t = new();
Action<long> processTime = (time) => {
 
    Console.WriteLine(time);
};
 
t.TimeHasSpassed += processTime;
 
Console.ReadLine();
 
t.TimeHasSpassed -= processTime;
 
Console.WriteLine( "Time stop" );
Console.ReadLine();
 
Console.WriteLine( "Time continue from last value" );
t.TimeHasSpassed += processTime;
 
 
Console.ReadLine();

ดูแบบ Gist ทางนี้ RXBlog - CSharp Timer Sample (Sample for RoslynPad) (github.com)

ปัญหาแก้ได้ด้วย Subject~ 🤩

อย่างแรกเลย เราจะต้องเข้าใจก่อนว่า "Event" ใน Rx นั้น จะเรียกว่า "Subject" หรือสิ่งที่เราสนใจเข้าไปรับฟัง ถ้าเปรียบเทียบกับ Kafka มันก็คือ "Topic" นั่นเอง

โดย Subject ใน Rx หลักๆ แล้ว มีอยู่ดังนี้ (ภาพจากเว็บ Rx)

  • PublishSubject ก็คือ Event แบบที่เราเข้าใจ ในภาพนี้คือมี Subject ที่ปล่อยบอล แดง เขียว น้ำเงินออกมา สองเส้นนอกกรอบสี่เหลี่ยมคือ Observer จะเห็นว่า ตอนที่ Observer คนที่สองนั้น เข้าไป Subscribe นั้น เข้ามาช้าไปหน่อย ได้รับแค่บอลสีน้ำเงินอันเดียว ซึ่งก็คือปัญหาแรกๆ เลยที่เราพบจากการใช้ Observer Pattern

  • ReplaySubject - เป็น Subject ที่เมื่อเราเข้าไปรับฟังแล้ว เราจะได้ค่า N ค่าล่าสุด ที่ตัว Subject นั้น เคยส่งให้ทุกคน เช่น ถ้า ReplaySubject นี้ กำหนดไว้ว่าจะส่ง 3 ค่าล่าสุดให้ทุกคน เมื่อ Observer เข้าไปรับฟัง ก็จะได้ค่า 3 ค่าล่าสุดเข้ามาทันที ตามภาพจะเห็นว่า บอลทั้งสามสี ถูกส่งให้กับ Observer คนที่สอง

  • BehaviorSubject - เป็น Subject ที่เมื่อเราเข้าไป "รับฟัง" แล้ว เราจะได้ค่าล่าสุด ที่ตัว Subject นี้เคยส่งให้ทุกคนมาทันที 1 ครั้ง เหมือนกับ ReplaySubject ที่กำหนด Buffer ไว้เป็น 1 แต่ต่างตรงที่ เราสามารถกำหนดค่าเริ่มต้นได้ ตั้งแต่ก่อนจะเกิดเหตุการณ์ (เหตุการณ์คือบอลสีแดง เขียว น้ำเงิน) อย่างในภาพคือเรากำหนดบอลสีชมพูไว้เป็นค่าเริ่มต้น พอ Observer เข้ามารับฟัง ก็ได้สีชมพูไปก่อนเลย ต่อให้ยังไม่เกิดอะไรขึ้นใน Subject ก็ตาม

จะเห็นว่า ด้วย Subject พวกนี้ ก็สามารถแก้ปัญหาแรกที่ผมพูดถึงกันแล้ว ถ้าเราใช้มันแบบเต็มๆ ชีวิตคงจะสดใจขึ้นแน่นอน งั้นเรามาดูกันว่า เราจะสร้าง Subject ของเราเองบ้างได้อย่างไร

การสร้าง Subject ใน Rx - ไม่ใช่การ Implement Class BehaviorSubject / ReplaySubject หรือเอาสองคลาสนี้มาใช้งาน!!!

สิ่งแรกเลยที่ผมทำระหว่างเรียนรู้ Rx ก็คือ การสร้าง Class ที่เป็นคลาสลูกของ BehaviorSubject กับเอาคลาส BehaviorSubject มาไว้ในคลาสของเราเอง ซึ่งเป็นการใช้ Rx ที่ผิดมหันต์! เพราะว่า ถ้าเราเอา XXXXSubject มาไว้ในคลาสเรา ก็มีค่าเท่ากับ การสร้าง Event ใน C# นั่นแหละ (พอเขียนแล้ว ย้อนไปอ่านแล้วจะเห็นชัดมาก เหอๆ 🤣) เพราะว่าเราก็จะต้องมานั่ง Manage State ของตัว Subject อยู่ดี

ในการใช้ Rx เราจะใช้คลาส Observable ในการสร้าง Subject โดยหลังจากศึกษามาราวๆ เดือนหนึ่ง และลองเขียนหลายๆ อย่าง สรุปว่า เขียนตามโครงสร้างนี้ได้เลยครับ เป็น Best Practices ที่ผมขอเสนอ 😎

using System.Reactive.Linq;
 
public class MyStateType
{
}
 
IObservable<MyStateType> observable = Observable.Create<MyStateType>(observer => {
 
    // should be CancellationTokenSource
    // but use bool so it easier to understand
    bool stop = false;
 
    // initialize and start the processing
    // in another thread because you have to
    // "Return" first to create an observable
    // otherwise the thread will stuck here
 
    // (or use theOtherClass.AnEvent += myhandler here instead
    // if you are subscribing to another event)
    Task.Run(() => {
 
        var myStateVariable = new MyStateType();
 
        // some long running operation    
        while (!stop) // or !canceller.Token.IsCancellationRequested
        {
            Task.Delay(1000);
 
            observer.OnNext(myStateVariable);
        }
    });
 
    // return function to call when observable was being destroyed
    return () =>
    {
        stop = true;
        // or canceller.Cancel();
        // or theOtherClass.AnEvent -= myhandler
 
 
        // dispose external resource
        // manage state
    };
 
});

ดูแบบ Gist ได้ทางนี้: https://gist.github.com/nantcom/27cf92f4bc9f4c17fe08dae3bfaad5f8

จุดสังเกตุ:

  • ทุกอย่างเขียนจบอยู่ในที่เดียว ตั้งแต่การสร้าง การทำงาน และการหยุดการทำงาน โดยอยู่ใน Function (Lambda) เดียวเลย ตอนที่ส่งให้ Observable.Create
  • ตัว Lambda ที่เราส่งให้กับ Observable.Create นั้น ไม่ได้ Return ค่าที่เราอยากจะส่ง แต่เป็นการ Return Function ที่ใช้ในการหยุดการทำงานของ Observable ที่เราสร้างขึ้น
  • รู้สึกไหมว่า มันเหมือนกับเราเขียนฟังก์ชั่นที่ Return เป็น IEnumerable ใน C# แล้วใช้คำสั่ง yield return เพื่อค่อยๆ คืนค่าทีละตัวๆ ออกไปให้คนเรียกฟังก์ชั่น? (สร้าง Iterator จาก Function) ถ้าเห็นแบบนั้นคือ เข้าใจถูกต้องแล้ว 🤩

สิ่งที่ต้องทราบ

  • การทำงานของเรา จะต้องไม่ Block การ Return Function ที่ใช้ในการหยุดการทำงานออกไป (โดยในตัวอย่าง ผมแสดงเป็นการเปิด Thread ใหม่ไปเลย โดยใช้ Task.Run)
  • ฟังก์ชั่นสำหรับหยุดการทำงาน ไม่ได้ถูกเรียกให้ทำงานทันที คิดซะว่าเป็นฟังก์ชั่นที่เราจะ Implement ใน void Dispose()

ส่วนการแก้ปัญหาทั้งสามข้อที่กล่าวมา ต้องเขียนให้เป็นคลาสเต็มๆ ก่อน แล้วจะเข้าใจยิ่งขึ้น

แต่ก่อนอื่น เราต้องไปรู้จักกับ Operator กันก่อน

Operator ของ Rx

เนื่องจากว่า เราสร้าง Function สำหรับทำงานส่งออกไป (Delegate) ดังนั้น ตัว Rx จึงสามารถเอา Function นี้ ไปจับใส่ Class อะไรให้เราก็ได้ จึงเป็นที่มาของ Operator ใน Rx ซึ่งมีอยู่เป็นสิบเลย ดูได้จากหน้านี้  http://reactivex.io/documentation/operators.html

ปกติแล้ว Observable ที่เราสร้างขึ้นจาก Observable.Create จะได้เป็น Observable สำหรับ Observer ตัวเดียว และเริ่มทำงานตอนที่มี Observer คนแรกเข้ามา Subscribe ซึ่งไม่ค่อยมีประโยชน์เท่าไหร่ มันจะมีค่าเท่ากับการเขียน ฟังก์ชั่น IEnumerable ใน C# และใช้ Yield Return นั่นแหละ (ซึ่งมันเรียกว่า Iterator Pattern) โดยตัวอย่างส่วนมาก ชอบไปเน้นเรื่องนี้ในช่วงแรกๆ เลยน่าจะทำให้คนที่เคยชินกับภาษา C# ซึ่งมีพวกนี้อยู่แล้ว งงว่าทำไมจะต้องมี Observable ด้วย

ตัว Observable ที่ใช้งานได้จริง จะต้องรองรับ Observer ได้หลายตัว จริงไหม? เราจึงจะต้องสนใจ Operator สามตัวนี้ เป็นพิเศษ ดังนี้

  • Publish - การใช้ Operator Publish คือการทำให้ Observable ที่เราสร้าง กลายเป็น Connectable Observable คือมีหลายคน ต่อเข้ามารับฟังพร้อมๆ กันได้ แบบในภาพตัวอย่างที่เว็บ ReactiveX แสดงให้เราดู

    แต่ว่า การที่จะให้ Publish นั้นเริ่มทำงาน เราจะต้องเรียก Method Connect() ที่ตัว Observable หรือว่าใช้ Operator AutoConnect ร่วมด้วย ตัว Observable จึงจะเริ่มทำงาน ไม่อย่างนั้น เราก็จะ Subscribe ไปเฉยๆ ไม่เกิดอะไรขึ้น (Lazy Evaluation) เหมือนการเรียกใช้ Iterator ของ C# ซึ่ง Function ที่เป็น Iterator จะไม่ถูกเรียกให้ทำงาน จนกว่าจะเกิดการใช้งานข้อมูลนั้นจริงๆ (Deferred Execution)
  • RefCount - เป็นการทำให้ Observable ของเรา นั้น นับจำนวน Observer อัตโนมัติ (Reference Counting) โดยถ้าเรากำหนด RefCount(1) แปลว่า ทันทีที่มีใครก็ตามมา Subscribe ตัว Observable ที่เราเขียน มันจะเริ่มทำงาน จากนั้น พอไม่มีใครรับฟังอีกแล้ว ฟังก์ชั่นที่เรา Return ไป ตอนที่สร้าง Observable ก็จะถูกเรียกให้ทำงาน เป็นการปิดการทำงานของ Observable แบบสวยๆ (ในตัวอย่างคือ ที่ตั้งค่าให้ stop = true)

    แน่นอนว่าเราจะใส่ 2,3,4 ก็ได้ ก็คือ ต้องมีจำนวน Observer อย่างน้อยตามจำนวนนั้น ตัว Observable จึงจะเริ่มทำงาน และพอน้อยกว่านั้น ก็จบการทำงาน
  • Replay - เป็นการทำให้ Observable ของเรา ทำการ Replay ค่าให้ คนที่มารับฟังเรา แต่มาช้าไป เช่น ถ้าเราใส่ว่า Replay(10) คนที่มาทีหลัง หลังจากเราส่งค่าไปแล้ว จนถึงค่าที่ 11 ก็จะได้รับค่าที่ 2,3,4…11 อัตโนมัติ รวมไปถึงสามารถกำหนดเป็นช่วงเวลาก็ได้ เช่น ถ้าต่อเข้ามาแล้ว จะได้รับค่าทั้งหมด ที่เกิดขึ้นในช่วง 1 วินาทีที่ผ่านมา

    จริงๆ แล้ว นี่ก็คือ ReplaySubject หรือ BehaviorSubject (Replay(1)) นั่นเอง

และนอกจากนี้ยังมี Debounce - ป้องกันไม่ให้ Obserable บอกค่าถี่เกินไป กำหนดเวลาได้ว่าจะได้ค่าทุกกี่วินาที, Distinct - ป้องกันค่าซ้ำ, Buffer รวมค่าเป็นก้อนใหญ่แล้วเรียกทีเดียว เนื่องจากการส่งค่ามันเป็นการเรียก Function ถ้าเรียกบ่อยเกินมันมี Overhead - จะต่างจาก Debounce เพราะว่าเราจะได้ค่าครบทุกค่า แต่ Debounce จะ Discard ค่าออกไป

จะเห็นว่า Rx นั้นแก้ปัญหาเกือบทั้งหมดที่เราเจอ จากการประมวลผล Event เลยทีเดียว ผมนึกคำพูดของคนที่มา Present ตอนที่ผมไปฟังในงานได้เลย เขาบอกว่า Reactive Extension ที่เรียกว่า Rx เพราะว่ามันเป็นยาที่มาแก้ปัญหาทั้งหมดที่เราเจอ!

การนำ Operator มาใช้งาน

เพื่อให้เห็นภาพขึ้น เรามาดู Class TimeStopWhenNoOneCares ที่เขียนแบบ Rx กัน (โค๊ดอันนี้ สำหรับ RoslynPad)

#r "nuget:System.Reactive/5.0.0"
using System.Reactive.Linq;
 
public class TimeStopsWhenNoOneCares
{
    private IObservable<long> _Interval;
    private long _Time;
 
    public IDisposable ObserveTime(Action<long> subscriber)
    {
        if (_Interval == null)
        {
            _Interval = Observable.Create<long>(observer => {
 
                bool stop = false;
 
                Task.Run(() => {
 
                    while (!stop)
                    {
                        observer.OnNext(_Time);
 
                        Task.Delay(1000).Wait();
                        _Time++;
                    }
                });
 
                // return function to call when observable was being destroyed
                return () =>
                {
                    _Interval = null;
                    stop = true;
                };
 
            }).Replay(1).RefCount(1);
 
        }
 
        return _Interval.Subscribe(subscriber);
    }
}
 
TimeStopsWhenNoOneCares t = new();
var observer = t.ObserveTime(time => {
 
    Console.WriteLine(time);
 
});
 
Console.ReadLine();
 
observer.Dispose();
 
Console.WriteLine("Time stop");
Console.ReadLine();
 
Console.WriteLine("Time continue from last value");
 
t.ObserveTime(time => {
 
    Console.WriteLine(time);
 
});
 
Console.ReadLine();

ดูแบบ Gist ทางนี้ https://gist.github.com/nantcom/28db47215ec3d6afcf84dfa550ac0b59
 

ตอนที่มันทำงาน จะเห็นว่า เมื่อเรา Dispose Observer ออก เวลาก็จะหยุดเดิน และพอ Subscribe เข้าไปใหม่ เวลาก็เดินต่อ แบบที่เราต้องการ

จุดสังเกต:

  • โค๊ดทั้งหมด อยู่เป็นกลุ่มเดียวกัน จะต่างจากโค๊ดชุดแรก ที่ เรื่องนี้ กระจายอยู่หลายที่
  • เราไม่ต้องเปิดให้คนมาเรียกใช้งานคลาสเรา มองเห็น Observable ของเราเลย เราเป็นคนบริหารจัดการเอง
  • ในการเรียกใช้ Operator เราแค่ใช้ จุด ต่อท้ายไป ผสมกันได้ อย่างในตัวอย่างคือ เราให้ .Replay(1).RefCount(1) ได้เลย

    ระวัง! ด้วยเหตุบางประการ เราผสม Publish() ไปด้วยกับ .Replay(1) ไม่ได้นะ ช่วงที่ยังไม่ค่อยเข้าใจ ไปสั่ง .Publish().Replay(1).AutoConnect() คือเข้าใจว่า ต้องการให้ Rx แปลง Observable เราเป็น Connectable ก่อน แล้วจากนั้น Replay มัน 1 ปรากฏว่ามันไม่ทำงานเลย โดยไม่มี Error แต่พออ่าน Doc ดีๆ แล้ว จึงได้เก็ทว่า ถ้าเราจะใช้ Replay() นั้น ก็ไม่จำเป็นจะต้องสั่ง Publish ก่อน เพราะ Replay มันทำการแปลง Observable เป็น Connectable Observable ให้แล้ว
  • อันที่จริงแล้ว เราควจให้ Class เราเอง สามารถหยุด Observable เองได้ด้วย โดยเอา stop ไว้เป็นตัวแปรของ Class แล้ว Implement IDisposable จากนั้นสัั่ง stop ในนั้น (แต่ว่าถึงไม่ทำแบบนี้ และคนเรียกใช้งานเรา ทำการ Dispose ตัว Observer ถูกต้อง ตัวเราก็จะหยุดเองอัตโนมัติ เพราะ RefCount(1) จำจัดการให้)

ส่งท้าย

จะเห็นว่า การใช้ Rx นั้น เป็นยาดี ที่มาช่วยแก้ปัญหาของการใช้ Event ได้ดีมาก และถ้าสิ่งที่เราทำ เกี่ยวข้องกับการประมวลผล / ยิง Event ต่อเนื่องกันไป เช่น

  • อ่านค่า Sensor ทุก 1 วินาที หรือ เรียก Service ทุก 5 นาที แล้วรายงานผล
  • รอรับข้อมูลเข้าอย่างต่อเนื่อง ประมวลผล แล้วส่งข้อมูลออกไป
  • เริ่มประมวลผลข้อมูล 100,000 ชิ้น แบบกระจายเป็น 32 Thread โดยไม่ต้องเรียงลำดับ มีอะไรเสร็จก่อน ส่งต่อให้ไปแสดงผล หรือส่งต่อให้ระบบอื่นทันที

สามารถนำเอา Rx มาช่วยทำให้โค๊ดเราอ่านง่ายขึ้น ผิดพลาดน้อยลง และอะไรที่เราต้องบริหารจัดการบ่อยๆ แถมยังทำซ้ำซากกันทุกโปรเจค ได้แก่

  • ถ้าไม่มี Service ต่อเข้ามาฟังข้อมูลเราแล้ว ให้หยุดการทำงาน
  • ข้อมูลซ่้ำกันให้ Discard ทิ้ง
  • ไม่ให้คนรับฟัง ซึ่งส่งข้อมูลไปหา Server อีกทอดหนึ่่ง รันถี่เกินไปกว่า 1 วินาที/ครั้ง เนื่องจากเป็นการ Update ข้อมูลทั้ง Row รันกี่ครั้งต่อวินาทีก็ผลเท่าเดิม

เป็นต้น

ถ้าสนใจอ่านเพิ่มเติม สามารถไปที่เว็บ ReactiveX ก็ได้ แต่ขอคุยโม้ไว้ก่อนเลยว่า ผมเชื่อว่าที่สรุปเป็นโครงคลาสให้เนี่ย น่าจะตอบโจทย์การใช้งานเกือบทั้งหมดแล้วเลยละ~🤩 แค่เลือกเปลี่ยนเอาว่าจะ Replay, Debounce, Refcount เท่าไหร่

อีกจุดที่ผมได้ใช้งานเต็มๆ เลยคือการทำงาน ตาม "State" ประมาณว่า เขียน State Machine เพื่อรองรับการทำงาน เช่น ตอนที่ผมเขียนโปรแกรมเพื่อรองรับ SystemXD ครั้งแรกนั้น ผมจะต้องใช้ Flag เพื่อแยกระหว่าง โหมดการทำงานต่างๆ และใช้การปล่อย/จับ Event สลับกันไป พอทำการ Refactor โค๊ดของ SystemXD เป็น Rx แล้ว ปรากฏว่า การเขียนให้มันเป็น State สามารถทำได้ง่ายขึ้นมาก และอ่านเข้าใจได้มากขึ้นอย่างไม่น่าเชื่อ สำหรับบทความหน้า เลยตั้งใจว่า จะเขียนเล่าให้ฟังถึงการประยุกตใช้ Rx กับการอ่านข้อมูลจาก Serial Port เพื่อใช้ต่อกับหน้าจอ Nextion หรือก็คือหน้าจอ SystemXD ของเรานั่นแหละ โดยผมทำเป็น Library ไว้อยู่บน Github แล้ว ถ้าสนใจ ลองไปส่องโค๊ดดูก่อนได้เลย

ถ้าชอบบทความนี้ก็อย่าลืมแชร์ให้เพื่อนอ่าน หรือกดไลค์ให้เป็นกำลังใจกันด้วยนะ~ 😍

 

 

BLOG

LEVEL51 คือใคร?

เราเป็นบริษัทโน๊ตบุ้คของคนไทย ใช้เครื่องจากโรงงาน CLEVO แบบยี่ห้อดังในต่างประเทศ ที่คุณสามารถเลือกสเปคเองได้เกือบทั้งเครื่อง ถ้าโน๊ตบุ้คและคอมพิวเตอร์ของคุณ คืออุปกรณ์สำคัญในการทำงาน นี่คือเครื่องที่ออกแบบมาสำหรับคุณ

1317
Customers
0
THB 100,000 Builds
49
K
Average Build Price
0
K
Most Valuable Build

Our Government and Universities Customers:

Our Video Production, 3D Design, Software House Customers:

Landscape Design

Our Industrial and Construction Customers:

 

พิเศษเฉพาะคุณ - รับคูปองส่วนลด 2,000 บาท สำหรับการสั่งซื้อเครื่องกับเรา