As is often the case, with great power comes great responsibility. Frameworks like Akka shield developers from writing error-prone multithreaded code and provide safety through a turn-based concurrency model. Even so, problems such as unbounded resource usage can still arise, showing up as buffer or mailbox overflows or, worse, a giant bill from a cloud vendor.
Concurrent processing workflows need careful design to ensure that server resources are used efficiently and that systems stay resilient, particularly when processing massive volumes of data. The creators of Akka and its community have promoted patterns to address these sorts of problems (e.g. the Work Pulling Pattern, which throttles work and prevents mailbox overflow), but an even better answer exists.
Akka Streams
Reactive Streams is an attempt to provide a standard that addresses these kinds of problems. In particular, it aims to create a blueprint for handling streams of data in a controlled way, so that a fast data source does not overwhelm the stages downstream. An integral part of this model is the concept of back-pressure: the ability of consumers to tell the producers upstream that they need to slow down, and to do so asynchronously. That last part is critical, because signalling synchronously would negate the benefits of asynchronous processing and make the approach far less suitable for the most demanding, highly distributed environments.
It so happens that Akka, with its actor-based model, turned out to be a great foundation on which to implement this specification. The actual relationship is a bit less direct: the Akka Streams implementation uses the Reactive Streams interfaces internally to pass data around, while the Akka Streams API is geared towards end users and provides a set of abstractions, in the form of operators and connectors, from which one can compose simple flows and complex graphs.
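To make back-pressure more concrete, here is a minimal sketch using Akka.NET Streams. It assumes the Akka.Streams NuGet package is referenced. The names `BackPressureSketch`, `fastSource` and `slowStage` are illustrative and not taken from any sample. A source that could emit instantly is connected to a stage that is deliberately throttled, so the source ends up being pulled only as fast as the slow stage signals demand.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

public static class BackPressureSketch
{
    public static async Task Main()
    {
        using var sys = ActorSystem.Create("backpressure-sketch");
        using var mat = sys.Materializer();

        // A source that could hand out its 50 elements as fast as it is asked to.
        var fastSource = Source.From(Enumerable.Range(1, 50));

        // A deliberately slow stage: it lets through at most 10 elements per second.
        var slowStage = Flow.Create<int>()
            .Throttle(10, TimeSpan.FromSeconds(1), 10, ThrottleMode.Shaping);

        // The source is only pulled when the slow stage signals demand,
        // so elements do not pile up between the two stages.
        await fastSource
            .Via(slowStage)
            .RunWith(Sink.ForEach<int>(x => Console.WriteLine(x)), mat);
    }
}
```

No explicit signalling code appears anywhere in the sketch: demand travels upstream as part of how the stages are connected, which is the point of the model.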
Akka Streams API
The Streams API is concise and expressive, allowing you to compose your logic out of existing operators and connectors. Because these components have already been exercised by the team, the broader community, and a suite of unit tests, you are relieved of writing, testing, and maintaining code that you would otherwise have to own yourself.
Let’s look at an example of building a simple connected graph first.
The Source.From methods provide the simplest way to get started. For example, this source will emit 100 elements of type int.
var source = Source.From(Enumerable.Range(0, 100));
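Another common way of feeding data into a stream is to subscribe to a message broker such as RabbitMQ, Kafka, or Amazon SQS. The Alpakka project provides a set of connectors that make this easy, exposing brokers as both sources and sinks. For example, an AMQP sink for publishing to a queue is created like this (a source for consuming from the queue is configured in much the same way):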
var amqpSink = AmqpSink.CreateSimple(
    AmqpSinkSettings
        .Create(connectionSettings)
        .WithRoutingKey(queueName)
        .WithDeclarations(queueDeclaration));
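You can also expose an IActorRef that forwards messages onto the stream. The actor reference is the source's materialized value, so it only becomes available once the stream has been run.
var source = Source.ActorRef<IActorType>(100, OverflowStrategy.DropHead);
// The IActorRef is the materialized value; it exists only after the stream is run with a materializer (mat).
var actorRef = source.To(Sink.Ignore<IActorType>()).Run(mat);
actorRef.Tell(message);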
To build connected graphs, you connect Sources to Flows and Sinks. A Flow has exactly one input and one output, so it can be attached to a source or another flow upstream and to another flow or a sink downstream.
var flow = Flow.Create<int>().Select(x => x * 2);
Finally, a Sink has exactly one input and no output.
var sink = Sink.ForEach<int>(Console.WriteLine);
Connecting it all together creates a graph that can be run with a materializer, which is the engine that turns your description into a running stream (the Akka.NET documentation covers materialization in more detail).
using var sys = ActorSystem.Create("system");
using var mat = sys.Materializer();
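// Await the task materialized by the sink so the actor system is not disposed before the stream completes.
await source.Via(flow).RunWith(sink, mat);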
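For reference, the fragments above can be assembled into one small console program. This is a sketch rather than the original sample: it assumes the Akka.Streams NuGet package, uses an int source of 100 elements, and the class name `SimpleGraph` is made up for illustration.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

public static class SimpleGraph
{
    public static async Task Main()
    {
        // The actor system hosts the actors that execute the stream.
        using var sys = ActorSystem.Create("system");
        using var mat = sys.Materializer();

        // Source: emits the integers 0..99.
        var source = Source.From(Enumerable.Range(0, 100));

        // Flow: one input, one output; doubles every element.
        var flow = Flow.Create<int>().Select(x => x * 2);

        // Sink: one input; prints each element it receives.
        var sink = Sink.ForEach<int>(x => Console.WriteLine(x));

        // Nothing runs until the graph is materialized. The sink materializes
        // a task that completes when the stream has finished.
        await source.Via(flow).RunWith(sink, mat);
    }
}
```

Awaiting the materialized task matters here: without it, the using declarations could dispose the actor system while elements are still in flight.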
Now that we have seen the basics of building simple programs, let’s explore the API a bit further by looking at the canonical Reactive Tweets sample ported over to the latest release of .NET 6. The sample subscribes to Twitter's streaming API and queries for tweets matching a particular location. The matching tweets are then pushed into the stream for processing. In this case, an IActorRef is used to feed elements into the stream.
var tweetSource = Source.ActorRef<ITweet>(100, OverflowStrategy.DropHead);
...
stream.MatchingTweetReceived += (_, arg) => actor.Tell(arg.Tweet);
Another common way to push data into the stream is to use connectors provided by the Alpakka project, such as those for Kafka, RabbitMQ, or some of the Azure and AWS message brokers. Processing is split into two parallel branches (a fan-out) that format the tweets and count the authors simultaneously. GraphDsl provides a fluent API for building more complex flows and graphs like this one.
var graph = GraphDsl.Create(countauthors, writeSink, (notUsed, _) => notUsed, (b, count, write) =>
{
    var broadcast = b.Add(new Broadcast<ITweet>(2));
    var output = b.From(broadcast.Out(0)).Via(formatFlow);
    b.From(broadcast.Out(1)).Via(count).To(write);
    return new FlowShape<ITweet, string>(broadcast.In, output.Out);
});
As a final stage, the sink writes the results to the console using the ForEach operator.
var actor = tweetSource.Via(graph).To(writeSink).Run(mat);
Note how reusable the elements of the graph are. You can easily direct the output to another sink while keeping the rest of the graph intact. Similarly, if you want to add another stage to your processing logic or increase the level of parallelism, you can do so without changing the sources and sinks. This is referred to as Supreme Compositionality and is one of the foundational design principles of Akka Streams.
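The reuse described above can be illustrated with a small sketch. It uses a plain int source instead of the tweet stream and assumes the Akka.Streams NuGet package. The names `ReuseSketch`, `doubler` and `multiplesOfFour` are hypothetical. The same source and flow are run three times: with a printing sink, with a collecting sink, and with one extra stage inserted.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

public static class ReuseSketch
{
    public static async Task Main()
    {
        using var sys = ActorSystem.Create("reuse-sketch");
        using var mat = sys.Materializer();

        // Blueprints only: neither of these does any work until it is run.
        var source = Source.From(Enumerable.Range(0, 100));
        var doubler = Flow.Create<int>().Select(x => x * 2);

        // Run 1: print every element.
        await source.Via(doubler)
            .RunWith(Sink.ForEach<int>(x => Console.WriteLine(x)), mat);

        // Run 2: same source and flow, different sink that collects the elements.
        var collected = await source.Via(doubler).RunWith(Sink.Seq<int>(), mat);
        Console.WriteLine($"Collected {collected.Count} elements");

        // Run 3: insert one more stage without touching the source or the sink type.
        var multiplesOfFour = Flow.Create<int>().Where(x => x % 4 == 0);
        var filtered = await source.Via(doubler).Via(multiplesOfFour)
            .RunWith(Sink.Seq<int>(), mat);
        Console.WriteLine($"Kept {filtered.Count} elements after filtering");
    }
}
```

Because sources, flows and sinks are immutable descriptions, each run materializes a fresh, independent stream from the same building blocks.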