How to Process Text Files Quickly Using Reactive .NET
Processing text files is something we've all done as programmers. Manipulating large (multi-GB) files, however, raises a question: how do you process them as quickly as possible, without loading the entire file into memory?
Text files usually have a header (one to several lines) followed by lines of data. Processing such a file generates another text file, with each input line resulting in one or more output lines.
In this short article, I'm going to explain how to process such files using the Rx.NET library. The entire example project is available on Bitbucket. Here is how to solve this task.
First, load a file from a URL and return an IObservable<string>, with each string being an individual line. I have split this into the following classes:
The WebLoader class is responsible for opening a resource (a local or remote file) and returning it as a stream. I extracted this into a separate class, instead of just using the Open method in RemoteTextLoader, because I wanted to be able to write self-contained unit tests without accessing the network.
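As a sketch of that seam (the interface name and the WebRequest-based body are my assumptions, not taken from the repo), it might look like this:

public interface Loader
{
    Stream Open(string url);
}

public class WebLoader : Loader
{
    public Stream Open(string url)
    {
        // WebRequest understands file://, http:// and https:// URLs alike.
        // The real project wraps the result in a WebStream (shown later)
        // so the response is released too; the raw stream keeps this short.
        return WebRequest.Create(url).GetResponse().GetResponseStream();
    }
}

A test can then hand the consumer a fake Loader that returns a MemoryStream, so no network access is needed.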
The RemoteTextLoader class takes that stream and returns it as an observable of strings, with each string being a separate line. You can see the main code below; loader is an instance of the WebLoader class, and the ReadLoop method simply returns one line at a time.
public IObservable<string> Read(string url)
{
    return Observable.Using(
        () => new StreamReader(loader.Open(url)),
        sr => ReadLoop(sr).ToObservable(scheduler));
}
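The article doesn't show ReadLoop, but a minimal version would be an iterator over the reader (this is my sketch, assuming it simply yields lines until the end of the stream):

private static IEnumerable<string> ReadLoop(StreamReader reader)
{
    // yield one line at a time until the end of the stream
    string line;
    while ((line = reader.ReadLine()) != null)
        yield return line;
}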
The RemoteTextLoader.Read method uses the Observable.Using call to ensure that the StreamReader and its underlying stream are closed when the observable completes. This is important to prevent resource leaks. The call to the private ReadLoop method runs on the given scheduler; we don't want to block the calling thread.
Finally, the WebStream class is responsible for closing not only the input stream but also the web request, again to prevent leaking resources.
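Something along these lines would do it: a Stream decorator that disposes the response along with the response stream (a sketch under my assumptions; the repo has the real version):

public class WebStream : Stream
{
    private readonly Stream inner;
    private readonly WebResponse response;

    public WebStream(WebResponse response)
    {
        this.response = response;
        this.inner = response.GetResponseStream();
    }

    public override bool CanRead => inner.CanRead;
    public override bool CanSeek => inner.CanSeek;
    public override bool CanWrite => inner.CanWrite;
    public override long Length => inner.Length;
    public override long Position
    {
        get { return inner.Position; }
        set { inner.Position = value; }
    }

    public override void Flush() => inner.Flush();
    public override int Read(byte[] buffer, int offset, int count) => inner.Read(buffer, offset, count);
    public override long Seek(long offset, SeekOrigin origin) => inner.Seek(offset, origin);
    public override void SetLength(long value) => inner.SetLength(value);
    public override void Write(byte[] buffer, int offset, int count) => inner.Write(buffer, offset, count);

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            // dispose the response stream and release the web request
            inner.Dispose();
            response.Close();
        }
        base.Dispose(disposing);
    }
}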
Then, after the input file has been turned into a (reactive) stream of lines, that stream must be split into several streams, each of which is processed in parallel on a different thread.
The ProducerConsumerSplitter class handles the first part; its Split method takes an input observable and a count, and returns an array of observables.
public class ProducerConsumerSplitter<T> : Splitter<T>
{
    public IObservable<T>[] Split(IObservable<T> observable, int count)
    {
        // block if the collection grows past (thread count * 10) items
        var collection = new BlockingCollection<T>(count * 10);
        observable.Subscribe(collection.Add, () => collection.CompleteAdding());
        return Enumerable
            .Range(0, count)
            .Select(_ => CreateConsumer(collection))
            .ToArray();
    }

    private static IObservable<T> CreateConsumer(BlockingCollection<T> collection)
    {
        return Observable.Create<T>(o =>
        {
            // IsCompleted becomes true once adding has finished and the collection is empty
            while (!collection.IsCompleted)
            {
                T item;
                // use a timeout so the loop waits instead of spinning on an empty collection
                if (collection.TryTake(out item, 100))
                    o.OnNext(item);
            }
            o.OnCompleted();
            return Disposable.Empty;
        });
    }
}
The splitter uses a producer-consumer pattern to split one observable into many. The producer part subscribes to the input observable and starts adding its items to the collection, blocking if the number of items is larger than an arbitrarily chosen limit of ten times the number of output observables. This way, the consumer observables aren’t starved of items to process, and we don’t fill up the memory with a multi-GB input file either.
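To illustrate, here is a hypothetical use of the splitter on a synthetic source. Note that the input must produce items asynchronously (on a scheduler), because Split subscribes the producer immediately; a cold, synchronous source would block on the bounded collection before any consumer starts. In the article's pipeline this is already the case, since the lines arrive via ToObservable(scheduler).

var splitter = new ProducerConsumerSplitter<int>();
// produce on the task pool so Split's producer subscription doesn't block
var source = Observable.Range(0, 1000, TaskPoolScheduler.Default);
var parts = splitter.Split(source, 4);
var total = parts
    .Select(part => part.SubscribeOn(TaskPoolScheduler.Default))
    .Merge()
    .Count()
    .Wait();
Console.WriteLine(total); // 1000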
The consumers simply read items from the collection until IsCompleted returns true, which occurs once the collection is empty and there are no more items waiting to be added to it. Note that you should not use the IsAddingCompleted property here, since it gets set to true as soon as the producer has finished adding items to the collection, even if there are still items left to be processed.
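To see the difference in isolation (a standalone snippet, not from the project):

var queue = new BlockingCollection<int>();
queue.Add(1);
queue.CompleteAdding();
Console.WriteLine(queue.IsAddingCompleted); // True: the producer is done
Console.WriteLine(queue.IsCompleted);       // False: one item is still queued
queue.Take();
Console.WriteLine(queue.IsCompleted);       // True: done adding and empty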
The actual processing on multiple threads is done in the ParallelProcessor class, which delegates loading and splitting to the previous classes and then calls a LineProcessor to turn each input line into one or more output lines.
public IObservable<string> Process(string url, int headerSize, LineProcessor lineProcessor)
{
    var lines = loader.Read(url);
    // need to use Publish here because otherwise the stream would be enumerated twice
    return lines.Publish(shared =>
    {
        var header = shared.Take(headerSize).ToArray();
        var rest = shared.Skip(headerSize);
        var streams = splitter.Split(rest, ThreadCount);
        // using SubscribeOn instead of ObserveOn because processing starts immediately when subscribing
        return header
            .SelectMany(h => streams
                .Select(stream => ProcessLines(stream, h, lineProcessor)
                    .SubscribeOn(scheduler)))
            .Merge();
    });
}
This class is responsible for loading the file, processing each line (using as many threads as requested) and then merging the results into a single observable.
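The article doesn't list LineProcessor or ProcessLines; one plausible shape, with the signatures being my guess, is:

// assumed signature: a delegate that expands one input line (given the
// header lines) into zero or more output lines
public delegate IEnumerable<string> LineProcessor(string[] header, string line);

private static IObservable<string> ProcessLines(
    IObservable<string> lines, string[] header, LineProcessor lineProcessor)
{
    // run the per-line transformation and flatten the results
    return lines.SelectMany(line => lineProcessor(header, line));
}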
Finally, the main program writes the result to an output.txt file and also measures the time spent processing the entire file. The processing done by the tester program is rather simplistic, so real code would no doubt be slower, but I'm getting close to one million lines processed per second on my machine, which is encouraging.
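For completeness, a driver along these lines would exercise the pipeline and time it; the constructor call, the input path, the header size, and the trivial upper-casing processor are all illustrative assumptions:

var processor = new ParallelProcessor(); // wiring of loader/splitter/scheduler omitted
var stopwatch = Stopwatch.StartNew();
using (var writer = new StreamWriter("output.txt"))
{
    processor
        .Process("file:///c:/data/input.txt", headerSize: 2,
                 (header, line) => new[] { line.ToUpperInvariant() })
        .ForEachAsync(writer.WriteLine)
        .Wait();
}
stopwatch.Stop();
Console.WriteLine($"Processed in {stopwatch.Elapsed}");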
Contributors
Marcel Popescu
Marcel is a senior developer with over 20 years of experience. He prefers back-end development, is great with algorithms, and prides himself on well-designed code. He has written an introductory book on TDD and is currently mentoring several junior programmers.