Getting started with the Pipelines client

Introduction

The Pipelines client is a higher-level client, built on the HTTP client, that makes it easy to process multiple documents in parallel, either continuously as they arrive or as a one-off batch. This client is best when you need to handle documents arriving as part of a business process, or when you need to process a large number of documents in a document management system or archive.

This client exposes a pipeline model where you specify:

  • A Document Source responsible for fetching the documents to be processed from your system (a file system, cloud storage, database, etc.)
  • A Pipeline which defines the operations that should be run for each document, such as document classification, data extraction, other Aluma operations, and your own logic
  • A Document Sink responsible for writing data back to your system

You can find samples demonstrating use of this client in the .NET client libraries repository (see Overview).

Installing the Pipelines client

The Pipelines client is available as a NuGet package and can be installed from there using one of these methods:

Using the NuGet Package Manager

Install-Package Aluma.Pipelines

Using the dotnet CLI

dotnet add package Aluma.Pipelines

Creating a class to represent a document

First we'll need to create a class which represents a document that is to be processed by the pipeline. This class has two responsibilities - storing a document identifier which you can use later to write results back to the correct location, and providing a method by which the pipeline can access the document's contents when it is ready to process the document.

Note: If you are processing files on disk you can just use the FileSystemDocument class in the Aluma.Pipelines.Extensions.DocumentSources.FileSystem extension package.
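For example, a file on disk could then be represented without writing any code of your own (a sketch; we're assuming the FileSystemDocument constructor takes the file's path, so check the extension package for the exact signature):

var document = new FileSystemDocument(@"C:\inbox\invoice-0001.pdf");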

Our class must inherit from Aluma.Pipelines.Document, call the base class constructor with a document identifier and implement the OpenStream() method, like this:

public class Document : Aluma.Pipelines.Document
{
    public Document(string sourceDocumentId) : base(sourceDocumentId)
    {
    }

    public override Task<Stream> OpenStream()
    {
        // This method is only called when the pipeline is about to upload the document to Aluma.
        // The stream is closed by the pipeline once it has been used.
        // Create your stream and return it here, for example:
        // return Task.FromResult<Stream>(File.OpenRead(pathToDocument));
        throw new NotImplementedException();
    }
}

Creating a document source

Now we can create a document source class responsible for fetching the documents to be processed from your system.

At this point we have a choice - we can either:

  • Create a document source that will get a finite batch of documents from our system and make them available to the pipeline as it has capacity, or
  • Create a document source that will continuously monitor our system for documents that need processing and make them available to the pipeline as they arrive.

For a finite batch of documents

For processing a finite batch of documents, our document source must inherit from RunOnceDocumentSource and be typed using the document class we just created. We override the GetDocuments() method and return an enumerable of documents to be processed.

public class BatchDocumentSource : RunOnceDocumentSource<Document>
{
    protected override IEnumerable<Document> GetDocuments()
    {
        // Replace this code with code that interrogates your system to discover
        // which documents require processing (using a query against a database, for example)
        // and returns an IEnumerable<Document> with one Document for each document to be processed.
        throw new NotImplementedException();
    }
}
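As a concrete illustration, here's a minimal batch source that treats the PDF files in a folder as the batch, using each file's path as its document identifier (a sketch only; the folder path is hypothetical, and if your documents really are files on disk the FileSystemDocument extension mentioned earlier is the simpler option):

using System.IO;
using System.Linq;

public class FolderBatchDocumentSource : RunOnceDocumentSource<Document>
{
    protected override IEnumerable<Document> GetDocuments()
    {
        // One Document per PDF file; the path doubles as the identifier
        // that the sink can use later to write results back.
        return Directory.EnumerateFiles(@"C:\inbox", "*.pdf")
            .Select(path => new Document(path));
    }
}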

For monitoring a system continuously

If instead we want to monitor our system and process any documents that arrive, our document source should inherit from RunContinuouslyDocumentSource and call this base class's AddDocument() method for any document that should be processed by the pipeline.

It doesn't matter what the internal structure of this class is - that will be determined by whether you can hook into an event that your system fires when a document needs processing, or whether you need to poll your system to check.

public class MonitoringDocumentSource : RunContinuouslyDocumentSource<Document>
{
    ...
    public async Task MonitorAsync(CancellationToken cancellationToken)
    {
        ...
        // Either poll for new documents periodically, or hook into an event that
        // your system fires when new documents need processing.

        // Add the document to the pipeline.
        AddDocument(new Document(internalId));
        ...
    }
}
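To make that skeleton concrete, a simple polling implementation might look like this (a sketch; GetNewDocumentIdsAsync is a hypothetical stand-in for however your system reports documents awaiting processing):

public class PollingDocumentSource : RunContinuouslyDocumentSource<Document>
{
    public async Task MonitorAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            // Ask your system which documents are awaiting processing.
            foreach (var internalId in await GetNewDocumentIdsAsync(cancellationToken))
            {
                // Make each one available to the pipeline.
                AddDocument(new Document(internalId));
            }

            // Wait before polling again.
            await Task.Delay(TimeSpan.FromSeconds(30), cancellationToken);
        }
    }

    private Task<IReadOnlyList<string>> GetNewDocumentIdsAsync(CancellationToken cancellationToken)
    {
        // Hypothetical helper - replace with a query against your database, queue, etc.
        throw new NotImplementedException();
    }
}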

Creating a document sink

We also need a document sink class that will be responsible for writing data back to our system.

This must inherit from DocumentSink, again typed using the document class we created. We override the ProcessDocumentResults() method, which has two parameters. The first is an AlumaDocument that contains the results of Aluma extraction and classification operations. The second is the document object our document source fed to the pipeline, which lets us determine the document's identifier in our system.

public class DocumentSink : DocumentSink<Document>
{
    ...
    protected override void ProcessDocumentResults(AlumaDocument alumaDocument, Document document)
    {
        // Write results for this document back to our system.
    }
}
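Continuing the folder-based example above, a sink could serialize the results to a JSON file alongside the source document. This is a sketch only: it assumes the base Document class exposes the identifier passed to its constructor via a property, for which we use the hypothetical name SourceDocumentId, so check the library for the actual member.

using System.IO;
using System.Text.Json;

public class JsonFileDocumentSink : DocumentSink<Document>
{
    protected override void ProcessDocumentResults(AlumaDocument alumaDocument, Document document)
    {
        // In the folder-based example the identifier is the file path, so write
        // the results next to the original document.
        // SourceDocumentId is a hypothetical property name.
        var resultsPath = Path.ChangeExtension(document.SourceDocumentId, ".json");
        File.WriteAllText(resultsPath, JsonSerializer.Serialize(alumaDocument));
    }
}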

Building and running the pipeline

We're ready to build our pipeline now.

First we create our pipeline with an API client ID and secret. You can create a new API client in the Aluma dashboard if you don't already have one.

var cts = new CancellationTokenSource();
var pipeline = await AlumaApi.CreatePipelineAsync(new AlumaOptions
    {
        ClientId = MyAPICredentials.ID,
        ClientSecret = MyAPICredentials.Secret
    }, 
    cts.Token);

Then we pass the pipeline our document source and define the operations to run for each document, such as document classification, data extraction, other Aluma operations, and your own logic. We chain these operations in sequence using a fluent API.

var documentSource = new BatchDocumentSource(); // or MonitoringDocumentSource
var documentSink = new DocumentSink();

await pipeline
    .WithDocumentsFrom(documentSource)   // <-- Provide our document source
    .ExtractWith("aluma.invoices.gb")    // <-- Equivalents are available for other Aluma operations
    .Then(document =>                    // <-- We can run any other code we like using Then()
    {
        documentSink.OnProcessingComplete(document); // <-- Write results back to our system
    })
    .OnDocumentError(error =>
    {
        Log.Error(error.Exception, error.ToString());
    })
    .OnPipelineCompleted(() =>
    {
        Log.Information("Pipeline completed");
    })
    .RunAsync(cts.Token);

When we run the pipeline, the behaviour of the RunAsync() method depends on what sort of document source we passed it:

  • Document sources inheriting from RunOnceDocumentSource cause RunAsync() to block until all documents have been processed and then return.
  • Document sources inheriting from RunContinuouslyDocumentSource cause RunAsync() to block and not return until the cancellation token is cancelled (see the sketch below).
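For a continuously running pipeline, that means you need a way to cancel the token when you want to shut down. A minimal sketch, using the cts we created earlier, is to hook it up to Ctrl+C:

// Cancel the pipeline gracefully when the user presses Ctrl+C.
Console.CancelKeyPress += (sender, e) =>
{
    e.Cancel = true; // Keep the process alive so the pipeline can shut down cleanly
    cts.Cancel();    // Signal RunAsync() to stop and return
};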

It's good practice to put this whole code block, including the RunAsync() call, inside a try...catch block and catch any PipelineException that might be thrown if an error occurs that the pipeline cannot recover from, as sketched below. Transient issues, such as network blips between your server and the Aluma service, are retried and handled for you automatically.
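For example (a sketch; the abbreviated pipeline mirrors the one built above):

try
{
    await pipeline
        .WithDocumentsFrom(documentSource)
        .ExtractWith("aluma.invoices.gb")
        .Then(document => documentSink.OnProcessingComplete(document))
        .RunAsync(cts.Token);
}
catch (PipelineException ex)
{
    // The pipeline hit an error it could not recover from.
    Log.Error(ex, "Pipeline failed");
}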