Fetch

Join the chat at https://gitter.im/47deg/fetch

A library for Simple & Efficient data access in Scala and Scala.js


Installation

Add the following dependency to your project's build file.

For Scala 2.11.x and 2.12.x:

"com.47deg" %% "fetch" % "1.3.2"

Or, if using Scala.js (0.6.x):

"com.47deg" %%% "fetch" % "1.3.2"

Remote data

Fetch is a library for making access to data both simple and efficient. Fetch is especially useful when querying data that has a latency cost, such as databases or web services.

Define your data sources

To tell Fetch how to get the data you want, you must implement the DataSource typeclass. Data sources have fetch and batch methods that define how to fetch such a piece of data.

Data Sources take two type parameters:

  1. Identity is a type that has enough information to fetch the data
  2. Result is the type of data we want to fetch

import cats.data.NonEmptyList
import cats.effect.Concurrent

trait DataSource[F[_], Identity, Result] {
  def data: Data[Identity, Result]
  def CF: Concurrent[F]
  def fetch(id: Identity): F[Option[Result]]
  def batch(ids: NonEmptyList[Identity]): F[Map[Identity, Result]]
}

Because the fetch methods return their results in an effect type F that has a Concurrent instance, we can specify whether a fetch runs synchronously or asynchronously, and use all the goodies available in cats and cats-effect.

We'll implement a dummy data source that can convert integers to strings. For convenience, we define a fetchString function that lifts identities (Int in our dummy data source) to a Fetch.

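import cats._
import cats.data.NonEmptyList
import cats.effect._
import cats.implicits._

import fetch._

// Simulates a slow backend by blocking the current thread.
def latency[F[_] : Concurrent](milis: Long): F[Unit] =
  Concurrent[F].delay(Thread.sleep(milis))

object ToString extends Data[Int, String] {
  def name = "To String"

  def source[F[_] : Concurrent]: DataSource[F, Int, String] = new DataSource[F, Int, String] {
    override def data = ToString

    override def CF = Concurrent[F]

    override def fetch(id: Int): F[Option[String]] = for {
      _ <- CF.delay(println(s"--> [${Thread.currentThread.getId}] One ToString $id"))
      _ <- latency(100)
      _ <- CF.delay(println(s"<-- [${Thread.currentThread.getId}] One ToString $id"))
    } yield Option(id.toString)

    override def batch(ids: NonEmptyList[Int]): F[Map[Int, String]] = for {
      _ <- CF.delay(println(s"--> [${Thread.currentThread.getId}] Batch ToString $ids"))
      _ <- latency(100)
      _ <- CF.delay(println(s"<-- [${Thread.currentThread.getId}] Batch ToString $ids"))
    } yield ids.toList.map(i => (i, i.toString)).toMap
  }
}

def fetchString[F[_] : Concurrent](n: Int): Fetch[F, String] =
  Fetch(n, ToString.source)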

Creating a runtime

Since Fetch relies on Concurrent from the cats-effect library, we'll need a runtime for executing our effects. We'll be using IO from cats-effect to run fetches, but you can use any type that has a Concurrent instance.

For executing IO, we need a ContextShift[IO] used for running IO instances and a Timer[IO] that is used for scheduling. Let's go ahead and create them. We'll use a java.util.concurrent.ScheduledThreadPoolExecutor with a couple of threads to run our fetches.

import java.util.concurrent._
import scala.concurrent.ExecutionContext

val executor = new ScheduledThreadPoolExecutor(4)
val executionContext: ExecutionContext = ExecutionContext.fromExecutor(executor)

implicit val timer: Timer[IO] = IO.timer(executionContext)
implicit val cs: ContextShift[IO] = IO.contextShift(executionContext)

Creating and running a fetch

Now that we can convert Int values to Fetch[F, String], let's try creating a fetch.

def fetchOne[F[_] : Concurrent]: Fetch[F, String] =
  fetchString(1)

Let's run it and wait for the fetch to complete. We'll use IO#unsafeRunTimed for testing purposes, which runs an IO[A] to an Option[A] and returns None if it didn't complete in time:

import scala.concurrent.duration._

Fetch.run[IO](fetchOne).unsafeRunTimed(5.seconds)
// --> [236] One ToString 1
//

As you can see in the previous example, the ToString data source is queried once to get the value of 1.

Batching

Multiple fetches to the same data source are automatically batched. To illustrate this, we are going to compose three independent fetch results as a tuple.

def fetchThree[F[_] : Concurrent]: Fetch[F, (String, String, String)] =
  (fetchString(1), fetchString(2), fetchString(3)).tupled

When executing the above fetch, note how the three identities get batched, and the data source is only queried once.

Fetch.run[IO](fetchThree).unsafeRunTimed(5.seconds)
// --> [236] Batch ToString NonEmptyList(1, 2, 3)
// 
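
The effect of batching can be pictured without the library. The following is a minimal sketch in plain Scala, not how Fetch is implemented: BatchingSketch, RecordingSource, and resolveAll are hypothetical names introduced only for illustration. It gathers the identities of several independent requests, drops duplicates, and queries the source a single time.

```scala
object BatchingSketch {
  // Hypothetical batch endpoint that records every call,
  // so we can check how often the source is actually hit.
  final class RecordingSource {
    private val calls = scala.collection.mutable.ListBuffer.empty[List[Int]]

    def batch(ids: List[Int]): Map[Int, String] = {
      calls += ids
      ids.map(id => id -> id.toString).toMap
    }

    def recordedCalls: List[List[Int]] = calls.toList
  }

  // Resolve several independent requests with one round trip:
  // collect the distinct identities, query once, then answer
  // each request from the combined result.
  def resolveAll(source: RecordingSource, requested: List[Int]): List[Option[String]] = {
    val results = source.batch(requested.distinct)
    requested.map(results.get)
  }
}
```

Calling resolveAll with the identities 1, 2, 3 and a repeated 1 produces four answers while the source records one batch call for the three distinct identities.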

Note that the DataSource#batch method is not mandatory. It will be implemented in terms of DataSource#fetch if you don't provide an implementation.

object UnbatchedToString extends Data[Int, String] {
  def name = "Unbatched to string"

  def source[F[_] : Concurrent] = new DataSource[F, Int, String] {
    override def data = UnbatchedToString

    override def CF = Concurrent[F]

    // Only fetch is defined; batch falls back to the default implementation.
    override def fetch(id: Int): F[Option[String]] =
      CF.delay(println(s"--> [${Thread.currentThread.getId}] One UnbatchedToString $id")) >>
      latency(100) >>
      CF.delay(println(s"<-- [${Thread.currentThread.getId}] One UnbatchedToString $id")) >>
      CF.pure(Option(id.toString))
  }
}

def unbatchedString[F[_] : Concurrent](n: Int): Fetch[F, String] =
  Fetch(n, UnbatchedToString.source)

Let's create a tuple of unbatched string requests.

def fetchUnbatchedThree[F[_] : Concurrent]: Fetch[F, (String, String, String)] =
  (unbatchedString(1), unbatchedString(2), unbatchedString(3)).tupled

When executing the above fetch, note how the three identities get requested in parallel. You can override batch to execute queries sequentially if you need to.

Fetch.run[IO](fetchUnbatchedThree).unsafeRunTimed(5.seconds)
// --> [236] One UnbatchedToString 1
// --> [237] One UnbatchedToString 2
// --> [238] One UnbatchedToString 3
// 
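
One way to picture a batch derived from single-item fetches is the sketch below. It uses scala.concurrent.Future from the standard library rather than Fetch's own types, and DefaultBatchSketch, fetchOne, and batchViaFetch are hypothetical names. The library's actual default implementation may differ; this only shows the general shape of starting one lookup per identity and keeping the hits.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object DefaultBatchSketch {
  // Hypothetical single-item lookup standing in for DataSource#fetch.
  // Negative identities are treated as "not found".
  def fetchOne(id: Int)(implicit ec: ExecutionContext): Future[Option[String]] =
    Future(if (id >= 0) Some(id.toString) else None)

  // A batch built from the single-item lookup: start one lookup per
  // identity, wait for all of them, and keep only the identities found.
  def batchViaFetch(ids: List[Int])(implicit ec: ExecutionContext): Future[Map[Int, String]] =
    Future
      .traverse(ids)(id => fetchOne(id).map(result => id -> result))
      .map(_.collect { case (id, Some(value)) => id -> value }.toMap)
}
```

Identities that the single-item lookup cannot resolve are simply absent from the resulting map, which matches the Map[Identity, Result] return type of batch.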

Parallelism

If we combine two independent fetches from different data sources, the fetches can be run in parallel. First, let's add a data source that fetches a string's size.

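object Length extends Data[String, Int] {
  def name = "Length"

  def source[F[_] : Concurrent] = new DataSource[F, String, Int] {
    override def data = Length

    override def CF = Concurrent[F]

    override def fetch(id: String): F[Option[Int]] = for {
      _ <- CF.delay(println(s"--> [${Thread.currentThread.getId}] One Length $id"))
      _ <- latency(100)
      _ <- CF.delay(println(s"<-- [${Thread.currentThread.getId}] One Length $id"))
    } yield Option(id.size)

    override def batch(ids: NonEmptyList[String]): F[Map[String, Int]] = for {
      _ <- CF.delay(println(s"--> [${Thread.currentThread.getId}] Batch Length $ids"))
      _ <- latency(100)
      _ <- CF.delay(println(s"<-- [${Thread.currentThread.getId}] Batch Length $ids"))
    } yield ids.toList.map(i => (i, i.size)).toMap
  }
}

def fetchLength[F[_] : Concurrent](s: String): Fetch[F, Int] =
  Fetch(s, Length.source)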

And now we can easily receive data from the two sources in a single fetch.

def fetchMulti[F[_] : Concurrent]: Fetch[F, (String, Int)] =
  (fetchString(1), fetchLength("one")).tupled

Note how the two independent data fetches run in parallel, minimizing the latency cost of querying the two data sources.

Fetch.run[IO](fetchMulti).unsafeRunTimed(5.seconds)
// --> [239] One Length one
// --> [236] One ToString 1
// 

Deduplication & Caching

The Fetch library supports deduplication and optional caching. By default, fetches that are chained together will share the same cache backend, providing some deduplication.

When fetching an identity twice within the same Fetch, such as a batch of fetches or when you flatMap one fetch into another, subsequent fetches for the same identity are served from the cache. Let's try creating a fetch that asks for the same identity twice, by using flatMap (in a for-comprehension) to chain the requests together:

def fetchTwice[F[_] : Concurrent]: Fetch[F, (String, String)] = for {
  one <- fetchString(1)
  two <- fetchString(1)
} yield (one, two)

While running it, notice that the data source is only queried once. The next time the identity is requested, it's served from the internal cache.

val runFetchTwice = Fetch.run[IO](fetchTwice)
runFetchTwice.unsafeRunTimed(5.seconds)
// --> [237] One ToString 1
// 

This will still fetch the data again, however, if we call it once more:

runFetchTwice.unsafeRunTimed(5.seconds)
// --> [239] One ToString 1
// 

If you want to cache between multiple individual fetches, use Fetch.runCache or Fetch.runAll, which return the cache so that it can be reused later. Here is an example where we fetch four separate times, and explicitly share the cache to keep the deduplication functionality:

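// We get the cache from the first run and pass it to all subsequent fetches
val runFetchFourTimesSharedCache = for {
  (cache, one) <- Fetch.runCache[IO](fetchTwice)
  two <- Fetch.run[IO](fetchTwice, cache)
  three <- Fetch.run[IO](fetchTwice, cache)
  four <- Fetch.run[IO](fetchTwice, cache)
} yield (one, two, three, four)
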
runFetchFourTimesSharedCache.unsafeRunTimed(5.seconds)
// --> [238] One ToString 1
// 

As you can see above, the cache will now work between calls and can be used to deduplicate requests over a period of time. Note that this does not support any kind of automatic cache invalidation, so you will need to keep track of which values you want to re-fetch if you plan on sharing the cache.
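
Keeping track of what to re-fetch can be sketched with an immutable map standing in for the shared cache. This is a simplified model in plain Scala, not Fetch's cache API: SharedCacheSketch, CountingSource, cachedFetch, and invalidate are hypothetical names used only to show the bookkeeping involved.

```scala
object SharedCacheSketch {
  type Cache = Map[Int, String]

  // Hypothetical backing source that counts how often it is queried.
  final class CountingSource {
    private var calls = 0
    def query(id: Int): String = { calls += 1; id.toString }
    def queryCount: Int = calls
  }

  // Returns the updated cache together with the value;
  // the source is queried only when the identity is missing.
  def cachedFetch(source: CountingSource, cache: Cache, id: Int): (Cache, String) =
    cache.get(id) match {
      case Some(value) => (cache, value)
      case None =>
        val value = source.query(id)
        (cache + (id -> value), value)
    }

  // Manual invalidation: drop the identity so the next lookup re-queries the source.
  def invalidate(cache: Cache, id: Int): Cache = cache - id
}
```

Threading the returned cache into the next call avoids a second query for the same identity, while passing an invalidated cache forces the source to be queried again.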


For more in-depth information, take a look at our documentation.

Copyright

Fetch is designed and developed by 47 Degrees

Copyright (C) 2016-2021 47 Degrees. http://47deg.com
