This article is compatible with the latest version of Silverlight.
This is part 2 of the series “Reactive Extensions in Silverlight”.
1. Introduction
In the first part of the article, I tried to describe what is RX Framework and what can be done with it. One thing I’ve missed to mention was how RX could be used for asynchronous service calls. Namely, this is the accent in the second part of the article. For those who missed the first part, you could read it here.
The demo source code could be downloaded from here.
2. Using RX for Asynchronous Service Calls
To understand how the RX Framework could be used for asynchronous calls to a web service, let’s examine a typical situation in a Silverlight application. Probably every one of you knows how to consume a Silverlight-enabled WCF Service from a Silverlight client application. E.g.: Add service reference; since all calls to a service should be asynchronous, you need an asynchronous method that starts the communication and an event which is raised when the call completes, like in the example below:
EventHandler<GetProductsCompletedEventArgs> temp = null;
temp = ( s, a ) =>
{
serviceClient.GetProductsCompleted -= temp;
};
serviceClient.GetProductsCompleted += temp;
serviceClient.GetProductsAsync();
Now, let’s see how this can be handled by the RX Framework. If you have read the previous part of the article, you should probably know that each event could be turned into an observable, and after that used as any regular object.
IObservable<IEvent<GetProductsCompletedEventArgs>> observable =
Observable.FromEvent<GetProductsCompletedEventArgs>(
serviceClient, "GetProductsCompleted" );
Once you have an observable, you could subscribe to it in the following fashion:
observable.Subscribe(
p =>
{
if ( p.EventArgs.Error == null )
{
foreach ( string product in p.EventArgs.Result )
{
lbProducts.Items.Add( product );
}
}
} );
serviceClient.GetProductsAsync();
Now, what we have here is a RX equivalent of the IAsync pattern for service calls. One common mistake (bug), when making calls to web services is to forget to detach from the XXXCompleted event. So in the last example (the RX example), I do this nowhere. The problem could be solved quite easy using RX. Each time the Subscribe () method is called, it returns an IDisposable object, that can be used to unsubscribe from the observer.
Another possible way for doing this is to use the Take (), which returns a specified number of contiguous values from the start of an observable sequence. For the “count” parameter you just pass 1. The handler is unhooked implicitly after the event is received, and I think it is pretty elegant.
IObservable<IEvent<GetProductsCompletedEventArgs>> observable =
Observable.FromEvent<GetProductsCompletedEventArgs>(
serviceClient, "GetProductsCompleted" ).Take( 1 );
observable.Subscribe(
p =>
{
if ( p.EventArgs.Error == null )
{
foreach ( string product in p.EventArgs.Result )
{
lbProducts.Items.Add( product );
}
}
} );
serviceClient.GetProductsAsync();
3. Using Extension Methods
Let’s slightly modify the example by creating an extension method. If you are using RX, you could create extension methods for each event you want to observe. For example, consider the next static class, with extension method that exposes the UIElement’s MouseLeftButtonDown event.
public static class UIElementExtensions
{
public static IObservable<IEvent<MouseButtonEventArgs>> GetMouseLeftButtonDown(
this UIElement uiElement )
{
return Observable.FromEvent<MouseButtonEventArgs>(
uiElement, "MouseLeftButtonDown" );
}
}
Then you could use it in the following manner.
this.LayoutRoot.GetMouseLeftButtonDown().Subscribe(.....);
We could do the same for our wcf web service client. Create a new static class with an extension method that exposes the service client XXXComplete event.
public static class WebServiceClientExtensions
{
public static IObservable<IEvent<GetProductsCompletedEventArgs>> DownloadProducts(
this SampleWcfServiceClient serviceClient )
{
return Observable.FromEvent<GetProductsCompletedEventArgs>(
serviceClient, "GetProductsCompleted" ).Take( 1 );
}
}
And use it in the same manner.
serviceClient.DownloadProducts().Subscribe(
p =>
{
if ( p.EventArgs.Error == null )
{
foreach ( string product in p.EventArgs.Result )
lbProducts.Items.Add( product );
}
} );
serviceClient.GetProductsAsync();
The previous code could be further enhanced. For that purpose, I’ll create a new class named AnonymousObservable<T>, which implements the IObservable<T>. The code is taken straight from the AnonymousObservable<T> class in the System.Collections.Generic namespace in the System.Reactive.dll assembly. However, the class is marked there as internal and could not be used in our application. That’s why I need to reproduce it.
public class AnonymousObservable<T> : IObservable<T>
{
private Func<IObserver<T>, IDisposable> subscribe;
public AnonymousObservable( Func<IObserver<T>, IDisposable> subscribe )
{
this.subscribe = subscribe;
}
public IDisposable Subscribe( IObserver<T> observer )
{
return subscribe( observer );
}
}
Next, I’ll create a second version of the DownloadProduct () extension method. This time it will return an instance of the generic AnonymousObservable class. The rest of the code is pretty standard, turning the GetProductsCompleted event into an observable and subscribing to it.
public static class WebServiceClientExtensions
{
public static IObservable<string> DownloadProducts_v2(
this SampleWcfServiceClient serviceClient )
{
return new AnonymousObservable<string>(
observer =>
{
IObservable<IEvent<GetProductsCompletedEventArgs>> observable =
Observable.FromEvent<GetProductsCompletedEventArgs>
( serviceClient, "GetProductsCompleted" ).Take( 1 );
IDisposable disposable = observable.Subscribe(
p =>
{
if ( p.EventArgs.Error != null )
observer.OnError( p.EventArgs.Error );
else if ( p.EventArgs.Cancelled == false )
{
//observer.OnError( new Exception( "Test Exception" ) );
foreach ( string product in p.EventArgs.Result )
observer.OnNext( product );
}
},
ex =>
{
// If there is an exception, throw it until it is caught.
observer.OnError( ex );
},
() =>
{
observer.OnCompleted();
} );
return disposable;
} );
}
}
Now, see how easy the web service will be consumed using IObservable.
serviceClient.DownloadProducts_v2().Subscribe
( p =>
{
lbProducts.Items.Add( p );
},
ex =>
{
lbProducts.Items.Add( ex.Message );
},
() =>
{
lbProducts.Items.Add( "----Completed----" );
} );
serviceClient.GetProductsAsync();
And finally, just for a test, lets modify the extension method and call the OnError() method instead of OnNext().
As you can see, the error is propagated to the UI client, where it is caught. And the Completed event of the observable is never raised.
So that’s it, you could download the source code from here and play with it. The demo used in this example is located in the Demo6 folder. If you have missed the first part of the article, you could read it here.