Migrating from Promises to Observables in Grafana Plugins

Recently, I decided to write a Grafana datasource plugin that would collect data from the NASDAQ Data Link website and graph it. It was a good way to learn a bit more JS, React and add some new data to a dashboard I am building.
So as I started following the Datasource plugin tutorial and writing my plugin, I noticed the following error in my code:

The function datasourceRequest is marked as deprecated.

The function datasourceRequest() is deprecated in favor of the fetch() function. fetch() however, returns an RxJS Observable and not a Promise. Luckily, the prototype for the query() function allows us to return an Observable instead of a Promise if we want:

/**
   * Query for data, and optionally stream results
   */
  abstract query(request: DataQueryRequest<TQuery>): Promise<DataQueryResponse> | Observable<DataQueryResponse>;

There isn’t much documentation on how to migrate from Promises to Observables in this situation, and being fairly new to it, the solution wasn’t immediately obvious to me. So, after a bit of reading, this is how I changed it.

Promise Based Example

This is what we start with (full code on Github):

// Async because it returns a promise  
async doRequest(path: string, apiParams: Record<string, any> | undefined = undefined): Promise<FetchResponse> {
    const result = getBackendSrv().datasourceRequest({
      method: "GET",
      url: this.instanceUrl + "/quandlApi" + path,
      params: apiParams
    });

    // We don't need to do anything to the response, we simply return the promise 
    //  that we get from the datasourceRequest function
    return result;
  }

// Async because we again return a promise
async query(options: DataQueryRequest<MyQuery>): Promise<DataQueryResponse> {

    // Process each query individually....
    const promises = options.targets.map((query) => {
            // Take the info in the query and get it ready for the request
            // not shown
            // ......

            // Call doRequest, and provide a handler to the Promise through then()
            // Return the result.
            return this.doRequest(apiPath, Object.fromEntries(apiParams)).then((r) => {
              // Start building the dataframe
              let df = new MutableDataFrame({
                refId: query.refId,
                fields: [],
              })
  
              // More dataframe building
              // ......
              for(const r of dataset_data.data) {
                df.appendRow(r);
              }
              
              // Return the Dataframe
              return df; 
            })
    });

    // Merge all the promises from all the queries into one big promise 
    //   and return it. 
    return Promise.all(promises).then((data) => ({data}));
}

Doing the same thing using Observables – My First (Naïve?) Approach

RxJS has some good documentation on how to use Observables here. It discusses the primary difference, which is that the callback function can return multiple updates instead of just the one that a Promise can return.

Essentially, when you want to do something long running, you create an Observable object and provide it with a function that does the task and notifies the subscriber when the result is available.

To kick off the execution, you subscribe to the Observable and provide it with 3 call backs:

  • next() – Handle the result of the long running request. Can happen multiple times
  • error() – Handle an error in execution
  • complete() – Do something when the Observable notifies that the task is complete.

In this case, we need to wait for the raw data to come back from the api and then process it, and Grafana needs to wait for the processed data. So we are working with two nested Observables.

First we build the Observable that we are going to return to Grafana.

   const queryObservable = new Observable<DataQueryResponse>((subscriber) => {

In the closure we pass to the new Observable, we call doRequest() and subscribe to the returned Observable (the one that notifies us when raw data is available) providing an object with 3 closures (next(), error(), and complete()).

        let response: Observable<FetchResponse> =  this.doRequest(apiPath, Object.fromEntries(apiParams));
        let respSubscriber = response.subscribe({
          next(r) { 
            //console.log(`Response for query ${query.refId}`);
            //console.log(r);

            if(r.status !== 200) {
              subscriber.error(`Unexpected HTTP Response from API: ${r.status} - ${r.statusText}`);
              return;
            }

// Do something with the raw data

Once the processing is done, our Observable notifies Grafana about it using subscriber.next().

            // Alert the subscriber that we have new formatted data. 
            subscriber.next( {data: [df] } ); 

When the fetch Observable calls complete(), we do the same:

          complete() { 
            // Once we are done reading the response, we can call it complete here too. 
            subscriber.complete(); 
            respSubscriber.unsubscribe()
          }

doRequest() is basically the same as before, but we specify an Observable instead of a Promise as the return type and drop the async keyword. Leaving async in throws an error because it requires the return type to be a Promise.

  doRequest(path: string, apiParams: Record<string, any> | undefined = undefined): Observable<FetchResponse> {
    const result = getBackendSrv().fetch({
      method: "GET",
      url: this.instanceUrl + "/quandlApi" + path,
      params: apiParams
    });

    return result;
  }

Finally, we return our Observable back to the map function to be added to the Observables array and use merge() to combine them to be returned to Grafana.

     return queryObservable;
    });

    // The query function only returns one observable. we use merge to combine them all?
    return merge(...observableResponses);

And that’s that. We have reworked the query() function to use Observables.

Using Pipe and Map

As I was doing further reading on the topic, I came across arguments that nested Observables were not the best approach, and that pipe and operators like map() were the correct approach. This actually simplifies things quite a bit.

RxJS Documentation on Operators

First we move all of the processing out of the outer Observable and into its own function. See the signature of that function below:

// Process responses from a timeseries api call. 
// We need to receive the refId as well, because we add it to the data frame. 
handleTimeSeriesResponse(r: FetchResponse, refId: string): DataQueryResponse {

Next we use the pipe() and map() operator to process any data coming from the inner Observable. The result is an Observable that can be given back to the query function just like before.

// Use pipe to map a processing functon to the observable returned by doRequest. 
// We need to add a parameter to the function, so we make a closure that calls the function with both the response and the refId.
return this.doRequest(apiPath, Object.fromEntries(apiParams)).pipe(map(resp => this.handleTimeSeriesResponse(resp, query.refId)));

The full code I used in the Nasdaq Data Link data source for all three scenarios can be found here:

Leave a Reply

Your email address will not be published. Required fields are marked *