Getting started with the Pipelines client
Introduction
The Pipelines client is a higher-level client, built on the HTTP client, that makes it easy to process multiple documents in parallel, either continuously as they arrive or as a one-off batch. This client is best when you need to handle documents arriving as part of a business process, or when you need to process a large number of documents in a document management system or archive.
This client exposes a pipeline model where you specify:
- A Document Source responsible for fetching the documents to be processed from your system (a file system, cloud storage, database, etc.)
- A Pipeline which defines the operations that should be run for each document, such as document classification, data extraction, any other Aluma operations and your own logic
- A Document Sink responsible for writing data back to your system
You can find samples demonstrating use of this client in the .NET client libraries repository (see Overview).
Installing the Pipelines client
The Pipelines client is available as a NuGet package and can be installed using either of these methods:
Using the NuGet Package Manager
Install-Package Aluma.Pipelines
Using the dotnet CLI
dotnet add package Aluma.Pipelines
Creating a class to represent a document
First we'll need to create a class which represents a document to be processed by the pipeline. This class has two responsibilities: storing a document identifier which you can use later to write results back to the correct location, and providing a method by which the pipeline can access the document's contents when it is ready to process the document.
Note: If you are processing files on disk you can just use the FileSystemDocument class in the Aluma.Pipelines.Extensions.DocumentSources.FileSystem extension package.
Our class must inherit from Aluma.Pipelines.Document, call the base class constructor with a document identifier, and implement the OpenStream() method, like this:
public class Document : Aluma.Pipelines.Document
{
public Document(string sourceDocumentId) : base(sourceDocumentId)
{
}
public override Task<Stream> OpenStream()
{
// This method is only called once the pipeline is about to upload the document to Aluma.
// The stream is closed by the pipeline once it has been used.
// Create your stream and return it here.
}
}
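For illustration, here is a minimal sketch of a document class whose contents live in a database or similar store. The delegate-based design and the fetchContents callback are our own assumptions, not part of the Pipelines API; adapt them to however your system retrieves document contents.
using System;
using System.IO;
using System.Threading.Tasks;
public class DbDocument : Aluma.Pipelines.Document
{
    // Hypothetical callback that fetches the document's bytes from our system.
    private readonly Func<Task<byte[]>> fetchContents;
    public DbDocument(string sourceDocumentId, Func<Task<byte[]>> fetchContents)
        : base(sourceDocumentId)
    {
        this.fetchContents = fetchContents;
        Id = sourceDocumentId;
    }
    // Keep our own copy of the identifier so a sink can read it back later.
    public string Id { get; }
    public override async Task<Stream> OpenStream()
    {
        // Only called when the pipeline is about to upload the document,
        // so the bytes are fetched lazily. The pipeline closes the stream.
        var bytes = await fetchContents();
        return new MemoryStream(bytes);
    }
}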
Creating a document source
Now we can create a document source class responsible for fetching the documents to be processed from your system.
At this point we have a choice - we can either:
- Create a document source that will get a finite batch of documents from our system and make them available to the pipeline as it has capacity, or
- Create a document source that will continuously monitor our system for documents that need processing and make them available to the pipeline as they arrive.
For a finite batch of documents
For processing a finite batch of documents, our document source must inherit from RunOnceDocumentSource and be typed using the document class we just created. We override the GetDocuments() method and return an enumerable of documents to be processed.
public class BatchDocumentSource : RunOnceDocumentSource<Document>
{
protected override IEnumerable<Document> GetDocuments()
{
// Replace this code with code that interrogates your system to discover
// which documents require processing, using a query against a database for example,
// and return an IEnumerable<Document> with one Document for each document to be processed.
}
}
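As a concrete sketch, a batch source might query a repository for documents flagged as awaiting processing, reusing the DbDocument class sketched above. IDocumentRepository and its members are hypothetical stand-ins for your own data access code.
using System.Collections.Generic;
using System.Threading.Tasks;
public interface IDocumentRepository // hypothetical data-access abstraction
{
    IEnumerable<string> GetPendingDocumentIds();
    Task<byte[]> GetContentsAsync(string id);
}
public class DatabaseBatchSource : RunOnceDocumentSource<DbDocument>
{
    private readonly IDocumentRepository repository;
    public DatabaseBatchSource(IDocumentRepository repository)
    {
        this.repository = repository;
    }
    protected override IEnumerable<DbDocument> GetDocuments()
    {
        // One DbDocument per pending record; contents are fetched lazily
        // via the callback when the pipeline is ready to upload each one.
        foreach (var id in repository.GetPendingDocumentIds())
        {
            yield return new DbDocument(id, () => repository.GetContentsAsync(id));
        }
    }
}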
For monitoring a system continuously
If instead we want to monitor our system and process any documents that arrive, our document source should inherit from RunContinuouslyDocumentSource and call the base class's AddDocument() method for any document that should be processed by the pipeline.
It doesn't matter what the internal structure of this class is - that will be determined by whether you can hook into an event that your system fires when a document needs processing, or whether you need to poll your system to check.
public class MonitoringDocumentSource : RunContinuouslyDocumentSource<Document>
{
...
public async Task MonitorAsync(CancellationToken cancellationToken)
{
...
// Either poll for new documents periodically, or hook into an event that
// your system fires when new documents need processing.
// Add the document to the pipeline
AddDocument(new Document(internalId));
...
}
}
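If your system has no event to hook into, a simple polling loop works. Here is a sketch reusing the hypothetical IDocumentRepository from above; the 30-second interval is arbitrary.
using System;
using System.Threading;
using System.Threading.Tasks;
public class PollingDocumentSource : RunContinuouslyDocumentSource<DbDocument>
{
    private readonly IDocumentRepository repository;
    public PollingDocumentSource(IDocumentRepository repository)
    {
        this.repository = repository;
    }
    public async Task MonitorAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            // Feed any newly arrived documents to the pipeline.
            // (In a real system, mark documents as queued so they are not added twice.)
            foreach (var id in repository.GetPendingDocumentIds())
            {
                AddDocument(new DbDocument(id, () => repository.GetContentsAsync(id)));
            }
            // Wait before polling again; choose an interval that suits your system.
            await Task.Delay(TimeSpan.FromSeconds(30), cancellationToken);
        }
    }
}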
Creating a document sink
We also need a document sink class that will be responsible for writing data back to our system.
This must inherit from DocumentSink and also be typed using the document class we created. We override the ProcessDocumentResults method, which has two parameters. The first is an AlumaDocument that contains results from Aluma extraction and classification operations. The second is the document object our document source fed to the pipeline, so we can determine the document's identifier in our system.
public class DocumentSink : DocumentSink<Document>
{
...
protected override void ProcessDocumentResults(AlumaDocument alumaDocument, Document document)
{
// Write results for this document back to our system.
}
}
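As an illustrative sketch only: a sink that dumps the raw results to a JSON file named after the document's identifier, using the DbDocument class from earlier. This assumes AlumaDocument serializes cleanly with System.Text.Json, which you should verify; in practice you would map the fields you need into your own system instead.
using System.IO;
using System.Text.Json;
public class JsonFileSink : DocumentSink<DbDocument>
{
    private readonly string outputFolder;
    public JsonFileSink(string outputFolder)
    {
        this.outputFolder = outputFolder;
    }
    protected override void ProcessDocumentResults(AlumaDocument alumaDocument, DbDocument document)
    {
        // Persist the results keyed by the identifier our source assigned,
        // so they can be matched back to the original record.
        var json = JsonSerializer.Serialize(alumaDocument);
        File.WriteAllText(Path.Combine(outputFolder, document.Id + ".json"), json);
    }
}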
Building and running the pipeline
We're ready to build our pipeline now.
First we create our pipeline with an API client ID and secret. You can create a new API client in the Aluma dashboard if you don't already have one.
var cts = new CancellationTokenSource();
var pipeline = await AlumaApi.CreatePipelineAsync(new AlumaOptions
{
ClientId = MyAPICredentials.ID,
ClientSecret = MyAPICredentials.Secret
},
cts.Token);
Then we pass the pipeline our document source and define the operations that should be run for each document, such as document classification, data extraction, any other Aluma operations and your own logic. We build these operations in sequence using a Fluent API.
var documentSource = new BatchDocumentSource(); // or MonitoringDocumentSource, for continuous processing
var documentSink = new DocumentSink();
await pipeline
.WithDocumentsFrom(documentSource) // <-- Provide our document source
.ExtractWith("aluma.invoices.gb") // <-- Equivalents are available for other Aluma operations
.Then(document => // <-- We can run any other code we like using Then()
{
documentSink.OnProcessingComplete(document); // <-- Write results back to our system
})
.OnDocumentError(error =>
{
Log.Error(error.Exception, error.ToString());
})
.OnPipelineCompleted(() =>
{
Log.Information("Pipeline completed");
})
.RunAsync(cts.Token);
When we run the pipeline, the behaviour of the RunAsync() method depends on what sort of document source we passed it:
- Document sources inheriting from RunOnceDocumentSource cause RunAsync() to block until all documents have been processed, and then return.
- Document sources inheriting from RunContinuouslyDocumentSource cause RunAsync() to block and not return until the cancellation token is cancelled.
It's good practice to put this whole code block, including the RunAsync() method, in a try...catch block and catch any PipelineException that might be thrown if an error occurs that the pipeline cannot recover from. Transient issues, like network blips between your server and the Aluma service, are automatically retried and handled for you.
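For example (a sketch, with Serilog-style logging as in the snippet above):
try
{
    await pipeline
        .WithDocumentsFrom(documentSource)
        .ExtractWith("aluma.invoices.gb")
        .Then(document => documentSink.OnProcessingComplete(document))
        .RunAsync(cts.Token);
}
catch (PipelineException ex)
{
    // Thrown only for errors the pipeline cannot recover from;
    // transient issues are retried internally and won't surface here.
    Log.Error(ex, "Pipeline failed");
}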