none
Advice re database polling RRS feed

  • Question

  • Hi I have some code to do some polling of a database using Rx. I lifted it from Lee Campbell's solution over on SO (so kudos to him - https://stackoverflow.com/questions/33714408/database-polling-with-reactive-extensions), but I've added a bit which I'm not sure quite "does" the Rx style and I'm wondering if I'm mixing pushing and pulling (or perhaps sync and async)!

    So I'm simply checking a table for new entries and expose those new entries as an IObservable to which subscribers can subscribe. I've been doing DB access via async and I am bringing back data into an enumerable and then converting to an Observable. I am using the Task.Result to get this, but I feel I may be able to stay in the async world throughout. What is the right/best way to go about this? 

    Secondly, this "poller" will get instantiated when an entry (or entries) get added to the table and will terminate when all the entries has been processed and no new entries has subsequently been added. Hence I have a state variable (see _hasEntries) which is keeping track of this based on the IEnumerable count. The instantiating class will terminate once this gets set to false. However, I'm really not sure that this is the right way to be doing this at all. Again - how should I be doing this?

    I'm not at all experienced with Rx and "getting" the mindset isn't coming easy! I'd be really grateful if someone could check my code and offer the proper ways I should be thinking about and doing this.

    Many thx IA.

    Here's the code.

    public class DbMonitor : IMonitor
        {
            private const string EventSql = @"SELECT ID, EventID, Data FROM [dbo].[TaskEvents] WHERE NotifiedTime IS NULL";
            private const int PollingPeriodSecs = 30;
            private const int QueryTimeoutSecs = 10;
    
            private readonly string _cx = "";        
            private readonly TimeSpan _pollingPeriod; 
            private readonly TimeSpan _queryTimeout;
            private bool _hasEntries = true;
    
            internal DbMonitor(string cx) : this(cx, PollingPeriodSecs, QueryTimeoutSecs) { }
            internal DbMonitor(string cx, int pollingPeriod, int queryTimeout)
            {
                _cx = cx;            
                _pollingPeriod = TimeSpan.FromSeconds(pollingPeriod);
                _queryTimeout = TimeSpan.FromSeconds(queryTimeout);
            }
    
    
            bool IMonitor.IsProcessing() => _hasEntries;
            
            IObservable<INotification> IMonitor.Notifications()
            {         
               
                //You will want to have your Rx query timeout after the expected silence of the timer, and then further maximum silence.
                var rxQueryTimeOut = _pollingPeriod + _queryTimeout;
    
                var scheduler = new EventLoopScheduler(ts => new Thread(ts) { Name = "DatabasePoller" });
                
                return Observable
                            .Timer(_pollingPeriod, scheduler)
                            .SelectMany(_ => DatabaseQuery(_cx, EventSql).ToObservable())
                            .Timeout(rxQueryTimeOut, Observable.Return(TimeOut.Notification()), scheduler)
                            .Retry()    //Loop on errors
                            .Repeat();  //Loop on success
                
            }
                       
            public async Task<IEnumerable<INotification>> DatabaseQuery(string cx, string sql)
            {
                List<INotification> events = new List<INotification>();
                
                using (SqlConnection connection = new SqlConnection(cx))
                {
                    await connection.OpenAsync();
                    //events = connection.Query<Event>(_sql);
    
                    // Cast the XML into NVarChar to enable GetTextReader - trying to use GetTextReader on an XML type will throw an exception
                    using (SqlCommand command = new SqlCommand(sql, connection))
                    {
    
                        // The reader needs to be executed with the SequentialAccess behavior to enable network streaming
                        // Otherwise ReadAsync will buffer the entire Xml Document into memory which can cause scalability issues or even OutOfMemoryExceptions
                        using (var reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
                        {                        
                            while (await reader.ReadAsync())
                            {
                                events.Add(EventBuilder(reader));
                            }
                        }
                    }
                }
    
                _hasEntries = events.Any();
    
                return events;
            }
            
            private Event EventBuilder(SqlDataReader reader) =>
                new Event
                {
                    ID = int.Parse(reader[0].ToString()),
                    EventID = reader[1].ToString(),                
                    Data = reader[2].ToString()
                };
            
        

    oh yeah ... this bit as well

    internal static class Extensions
        {
            public static IObservable<T> ToObservable<T>(this Task<IEnumerable<T>> items) =>
                items.Result.ToObservable();
    
            public static IObservable<T> ToObservable<T>(this IEnumerable<T> items) =>
                Observable.Create<T>(o =>
                {
                    foreach (var itm in items)
                    {
                        o.OnNext(itm);
                    }
    
                    return Disposable.Create(() => { });
                });
        }

    EDIT0: Just noticed the frequency of posts to this forum - has interest in Rx dropped off?

    EDIT1: So I've mod'ed it slightly and I hope it is a bit more Rx-ey but would still appreciate correction. I'm now returning different types of notification via the stream. Thus I have an Error notification, a Timeout notification as well as a "No Notifications" notification! So the salient bit of my code looks like this now

    private IObservable<INotification> Poller() =>                    
                Observable
                    .Timer(_pollingPeriod, _scheduler)                
                    .SelectMany(_ => NewEvents(_cx, EventSql))                  
                    .Timeout(_pollingPeriod + _queryTimeout, Observable.Return(TimeOut.Notification()), _scheduler) //Rx query timeout after the expected silence of the timer, and then further maximum silence.           
                    .Catch<INotification, Exception>(err => Observable.Return(Error.Notification(err))) //.Retry()    //Loop on errors                
                    .Repeat();  //Loop on success
    
    
    private IObservable<INotification> NewEvents(string cx, string sql)
    {
        try
        {
            return Common.SqlRead<INotification>(cx, sql, sdr => EventBuilder(sdr), Empty.Notification());
        }
        catch (Exception ex)
        {
            throw ex;
        }
    }
    
    internal static IObservable<T> SqlRead<T>(string cx, string sql, Func<SqlDataReader, T> mapper, T noRows) =>
                Observable.Create<T>(o =>
                {
                    using (var conn = new SqlConnection(cx))
                    {
                        conn.Open();
                        using (var cmd = new SqlCommand(sql, conn))
                        {                        
                            using (var rdr = cmd.ExecuteReader())
                            {
                                if (!rdr.HasRows)
                                {
                                    o.OnNext(noRows);
                                }
                                else
                                {
                                    while (rdr.Read())
                                    {
                                        o.OnNext(mapper(rdr));
                                    }
                                }
                            }
                        }
                    }
                    o.OnCompleted();
                    return Disposable.Empty;
                });
    • Edited by Simon Woods Tuesday, April 16, 2019 3:29 PM tidy up
    Monday, April 15, 2019 3:31 PM