Akka Stream Extensions

We are proud to opensource

extending Typesafe Akka-Stream.

The main purpose of this project is to:

  1. Develop generic

    not provided out-of-the-box by Akka-Stream.
  2. Make those structures very well tested & production ready.

  3. Study/evaluate streaming concepts based on Akka-Stream & other technologies (AWS, Postgres, ElasticSearch, ...).

We have been developing this library in the context of MFG Labs for our production projects after identifying a few primitive structures that were common to many use-cases, not provided by Akka-Stream out of the box and not so easy to implement in a robust way.


Scaladoc is available there.

Add resolvers to your

resolvers += Resolver.bintrayRepo("mfglabs", "maven")

Add dependencies to your

Currently depends on

libraryDependencies += "com.mfglabs" %% "akka-stream-extensions" % "0.11.2"

Changelog here



// Source from a paginated REST Api val pagesStream: Source[Page, ActorRef] = SourceExt .bulkPullerAsync(0L) { (currentPosition, downstreamDemand) => val futResult: Future[Seq[Page]] = WSService.get(offset = currentPosition, nbPages = downstreamDemand) { case Nil => Nil -> true // stop the stream if the REST Api delivers no more results case p => p -> false } }

someBinaryStream .via(FlowExt.rechunkByteStringBySeparator(ByteString("\n"), maximumChunkBytes = 5 * 1024)) .map(_.utf8String) .via( FlowExt.customStatefulProcessor(Vector.empty[String])( // grouping by 100 except when we encounter a "flush" line (acc, line) => { if (acc.length == 100) (None, acc) else if (line == "flush") (None, acc :+ line) else (Some(acc :+ line), Vector.empty) }, lastPushIfUpstreamEnds = acc => acc ) )

Many more helpers, check the Scaladoc!

Postgres extension

This extension provides tools to stream data from/to Postgres.


libraryDependencies += "com.mfglabs" %% "akka-stream-extensions-postgres" % "0.11.2"


Test only

Pull all docker images launched by the tests

docker pull postgres:8.4
docker pull postgres:9.6



implicit val pgConnection = PgStream.sqlConnAsPgConnUnsafe(sqlConnection) implicit val blockingEc = ExecutionContextForBlockingOps(someEc)

PgStream .getQueryResultAsStream( "select a, b, c from table", options = Map("FORMAT" -> "CSV") ) .via(FlowExt.rechunkByteStringBySeparator(ByteString("\n"), maximumChunkBytes = 5 * 1024))

someLineStream .via(PgStream.insertStreamToTable( "schema", "table", options = Map("FORMAT" -> "CSV") ))

Elasticsearch extension


libraryDependencies += "com.mfglabs" %% "akka-stream-extensions-elasticsearch" % "0.11.2"


import org.elasticsearch.client.Client
import org.elasticsearch.index.query.QueryBuilders

implicit val blockingEc = ExecutionContextForBlockingOps(someEc) implicit val esClient: Client = // ...

EsStream .queryAsStream( QueryBuilders.matchAllQuery(), index = "index", type = "type", scrollKeepAlive = 1 minutes, scrollSize = 1000 )

Shapeless extension

This extension allows to build at compile-time a fully typed-controlled flow that transforms a HList of Flows to a Flow from the Coproduct of inputs to the Coproduct of outputs.

For more details on the history of this extension, read this article.


libraryDependencies += "com.mfglabs" %% "akka-stream-extensions-shapeless" % "0.11.2"


// 1 - Create a type alias for your coproduct
type C = Int :+: String :+: Boolean :+: CNil

// The sink to consume all output data val sink = Sink.foldSeq[C], C(_ :+ _)

// 2 - a sample source wrapping incoming data in the Coproduct val f = GraphDSL.create(sink) { implicit builder => sink => import GraphDSL.Implicits._ val s = Source.fromIterator(() => Seq( CoproductC, CoproductC, CoproductC, CoproductC, CoproductC, CoproductC, CoproductC ).toIterator)

// 3 - our typed flows val flowInt = Flow[Int].map{i => println("i:"+i); i} val flowString = Flow[String].map{s => println("s:"+s); s} val flowBool = Flow[Boolean].map{s => println("s:"+s); s}

// >>>>>> THE IMPORTANT THING // 4 - build the coproductFlow in a 1-liner val fr = builder.add(ShapelessStream.coproductFlow(flowInt :: flowString :: flowBool :: HNil)) // <<<<<< THE IMPORTANT THING

// 5 - plug everything together using akkastream DSL s ~> fr.out ~> sink ClosedShape }

// 6 - run it RunnableGraph.fromGraph(f).run().futureValue.toSet should equal (Set( CoproductC, CoproductC, CoproductC, CoproductC, CoproductC, CoproductC, CoproductC ))


Check our project MFG Labs/commons-aws also providing streaming extensions for Amazon S3 & SQS.


To test postgres-extensions, you need to have Docker installed and running on your computer (the tests will automatically launch a docker container with a Postgres db).


MFG Labs sponsored the development and the opensourcing of this library.

We hope this library will be useful & interesting to a few ones and that some of you will help us debug & build more useful structures.

So don't hesitate to contribute


This software is licensed under the Apache 2 license, quoted below.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License here.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

