# A replicated Akka Persistence journal backed by Apache Kafka

Replicated Akka Persistence journal and snapshot store backed by Apache Kafka.
To include the Kafka plugins into your sbt project, add the following lines to your `build.sbt` file:

```
resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven"

libraryDependencies += "com.github.krasserm" %% "akka-persistence-kafka" % "0.4"
```
This version of `akka-persistence-kafka` depends on Kafka 0.8.2.1 and Akka 2.3.11 and is cross-built against Scala 2.10.4 and 2.11.6. A complete list of released versions is here.
Kafka does not permanently store log entries but rather deletes them after a configurable retention time which defaults to 7 days in Kafka 0.8.x. Therefore, applications need to take snapshots of their persistent actors at intervals that are smaller than the configured retention time (for example, every 3 days). This ensures that persistent actors can always be recovered successfully.
Alternatively, the retention time can be set to a maximum value so that Kafka will never delete old entries. In this case, all events written by a single persistent actor must fit on a single node. This is a limitation of the current implementation which may be removed in later versions. However, this limitation is likely not relevant when running Kafka with default (or comparable) retention times and taking snapshots.
The latest snapshot of a persistent actor is never deleted if log compaction is enabled. See also section Configuration hints for details on how to properly configure Kafka for use with the storage plugins.
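As an illustration of the snapshotting recommendation above, here is a minimal sketch of a persistent actor that saves a snapshot after every 1000 persisted events. The actor, its `persistenceId` and the 1000-event threshold are made up for this example; a real application might instead use a time-based trigger that fires well within the configured retention time.

```scala
import akka.persistence.{PersistentActor, SnapshotOffer}

// Hypothetical counter actor; the persistenceId and the 1000-event threshold
// are example values only.
class Counter extends PersistentActor {
  override val persistenceId = "counter"

  var state: Long = 0L

  def receiveCommand = {
    case "inc" => persist("inc") { _ =>
      state += 1
      // take snapshots regularly so that recovery never depends on
      // events older than the Kafka retention time
      if (lastSequenceNr % 1000 == 0) saveSnapshot(state)
    }
  }

  def receiveRecover = {
    case SnapshotOffer(_, snapshot: Long) => state = snapshot
    case "inc"                            => state += 1
  }
}
```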
To activate the journal plugin, add the following line to `application.conf`:

```
akka.persistence.journal.plugin = "kafka-journal"
```
This will run the journal plugin with default settings and connect to a Zookeeper instance running on `localhost:2181`. The Zookeeper connect string can be customized with the `kafka-journal.zookeeper.connect` configuration key (see also section Kafka cluster). Recommended Kafka broker configurations are given in section Configuration hints.
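For example, to connect to a Zookeeper ensemble running elsewhere (the host names below are placeholders), one might add:

```
kafka-journal.zookeeper.connect = "zk1.example.com:2181,zk2.example.com:2181"
```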
For each persistent actor, the plugin creates a Kafka topic where the topic name equals the actor's `persistenceId` (provided it contains only alphanumeric characters, `.`, `-` or `_`; otherwise, invalid characters are replaced by `_`). Events published to these topics are serialized `akka.persistence.PersistentRepr` objects (see journal plugin API). Serialization of `PersistentRepr` objects can be customized. Journal topics are mainly intended for internal use (for recovery of persistent actors) but can also be consumed externally.
The journal plugin can also publish events to user-defined topics. By default, all events generated by all persistent actors are published to a single `events` topic. This topic is intended for external consumption only. Events published to user-defined topics are serialized `Event` objects:

```scala
package akka.persistence.kafka

/**
 * Event published to user-defined topics.
 */
case class Event(persistenceId: String, sequenceNr: Long, data: Any)
```

where `data` is the actual event written by a persistent actor (by calling `persist` or `persistAsync`), `sequenceNr` is the event's sequence number and `persistenceId` is the id of the persistent actor. `Event` objects are serialized with a protobuf serializer and event `data` serialization can be customized with a user-defined serializer in the same way as for journal topics. Custom serializer configurations always apply to both journal topics and user-defined topics.
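As a sketch of such a customization, event `data` serialization can be configured through Akka's standard serialization configuration; the class names below are hypothetical and not part of the plugin:

```
akka {
  actor {
    serializers {
      # hypothetical application-defined serializer
      my-events = "com.example.MyEventSerializer"
    }
    serialization-bindings {
      # events of this (hypothetical) type are then written with the custom
      # serializer, to journal topics as well as to user-defined topics
      "com.example.MyEvent" = my-events
    }
  }
}
```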
For publishing events to user-defined topics the journal plugin uses an `EventTopicMapper`:

```scala
package akka.persistence.kafka

/**
 * Maps events to zero or more user-defined topics.
 */
trait EventTopicMapper {
  def topicsFor(event: Event): Seq[String]
}
```
The default mapper is `DefaultEventTopicMapper`, which maps all events to the `events` topic. It is configured in the reference configuration as follows:

```
kafka-journal.event.producer.topic.mapper.class = "akka.persistence.kafka.DefaultEventTopicMapper"
```
To customize the mapping of events to user-defined topics, applications can implement and configure a custom `EventTopicMapper`. For example, in order to publish events of persistent actor `a` to topics `topic-a-1` and `topic-a-2`, events of persistent actor `b` to topic `topic-b`, and to turn off publishing of events from all other actors, one would implement the following `ExampleEventTopicMapper`:

```scala
package akka.persistence.kafka.example

import akka.persistence.kafka.{Event, EventTopicMapper}

class ExampleEventTopicMapper extends EventTopicMapper {
  def topicsFor(event: Event): Seq[String] = event.persistenceId match {
    case "a" => List("topic-a-1", "topic-a-2")
    case "b" => List("topic-b")
    case _   => Nil
  }
}
```
and configure it in `application.conf`:

```
kafka-journal.event.producer.topic.mapper.class = "akka.persistence.kafka.example.ExampleEventTopicMapper"
```
To turn off publishing events to user-defined topics, the `EmptyEventTopicMapper` should be configured:

```
kafka-journal.event.producer.topic.mapper.class = "akka.persistence.kafka.EmptyEventTopicMapper"
```
The following example shows how to consume `Event`s from a user-defined topic with name `topic-a-2` (see previous example) using Kafka's high-level consumer API:
```scala
import java.util.Properties

import akka.actor.ActorSystem
import akka.persistence.kafka.{EventDecoder, Event}

import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.serializer.StringDecoder

val props = new Properties()
props.put("group.id", "consumer-1")
props.put("zookeeper.connect", "localhost:2181")
// ...

val system = ActorSystem("consumer")

val consConn = Consumer.create(new ConsumerConfig(props))
val streams = consConn.createMessageStreams(Map("topic-a-2" -> 1),
  keyDecoder = new StringDecoder, valueDecoder = new EventDecoder(system))

streams("topic-a-2")(0).foreach { mm =>
  val event: Event = mm.message
  println(s"consumed ${event}")
}
```
Applications may also consume serialized `PersistentRepr` objects from journal topics and deserialize them with Akka's serialization extension:
```scala
import java.util.Properties

import akka.actor._
import akka.persistence.PersistentRepr
import akka.serialization.SerializationExtension

import com.typesafe.config.ConfigFactory

import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.serializer.{DefaultDecoder, StringDecoder}

val props = new Properties()
props.put("group.id", "consumer-2")
props.put("zookeeper.connect", "localhost:2181")
// ...

val system = ActorSystem("example")
val extension = SerializationExtension(system)

val consConn = Consumer.create(new ConsumerConfig(props))
val streams = consConn.createMessageStreams(Map("a" -> 1),
  keyDecoder = new StringDecoder, valueDecoder = new DefaultDecoder)

streams("a")(0).foreach { mm =>
  val persistent: PersistentRepr = extension.deserialize(mm.message, classOf[PersistentRepr]).get
  println(s"consumed ${persistent}")
}
```
There are many other libraries that can be used to consume (event) streams from Kafka topics, such as Spark Streaming, to mention only one example.
The journal plugin writes

- `PersistentRepr` entries to partition 0 of journal topics. This ensures that all events written by a single persistent actor are stored in correct order. Later versions of the plugin may switch to a higher partition after having written a configurable number of events to the current partition.
- `Event` entries to all available partitions of user-defined topics. The partition key is the event's `persistenceId`, so that a partial ordering of events is preserved when consuming events from user-defined topics. In other words, events written by a single persistent actor are always consumed in correct order, but the relative ordering of events from different persistent actors is not defined.
The complete source code of all examples from previous sections is in Example.scala, the corresponding configuration in example.conf.
To activate the snapshot store plugin, add the following line to `application.conf`:

```
akka.persistence.snapshot-store.plugin = "kafka-snapshot-store"
```
This will run the snapshot store plugin with default settings and connect to a Zookeeper instance running on `localhost:2181`. The Zookeeper connect string can be customized with the `kafka-snapshot-store.zookeeper.connect` configuration key (see also section Kafka cluster). Recommended Kafka broker configurations are given in section Configuration hints.
For each persistent actor, the plugin creates a Kafka topic where the topic name equals the actor's `persistenceId`, prefixed by the value of the `kafka-snapshot-store.prefix` configuration key, which defaults to `snapshot-`. For example, if an actor's `persistenceId` is `example`, its snapshots are published to topic `snapshot-example`. For persistent views, the `viewId` is used instead of the `persistenceId`.
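For example, to use a different prefix (the value below is arbitrary), one would set:

```
kafka-snapshot-store.prefix = "sn-"
```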
To connect to an existing Kafka cluster, an application must set a value for the `kafka-journal.zookeeper.connect` key in its `application.conf`:

```
kafka-journal.zookeeper.connect = "<host1>:<port1>,<host2>:<port2>,..."
```
If you want to run a Kafka cluster on a single node, you may find this article useful.
To use the test server, the following additional dependencies must be added to `build.sbt`:

```
libraryDependencies ++= Seq(
  "com.github.krasserm" %% "akka-persistence-kafka" % "0.4" % "test" classifier "tests",
  "org.apache.curator" % "curator-test" % "2.7.1" % "test"
)
```
This makes the `TestServer` class available, which can be used to start a single Kafka and Zookeeper instance:

```scala
import akka.persistence.kafka.server.TestServer

// start a local Kafka and Zookeeper instance
val server = new TestServer()

// use the local instance
// ...

// and stop it
server.stop()
```
The `TestServer` configuration can be customized with the `test-server.*` configuration keys (see reference configuration for details).
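A minimal sketch of how `TestServer` might be combined with a test framework, assuming ScalaTest as an additional test dependency (not listed in the `build.sbt` snippet above):

```scala
import akka.persistence.kafka.server.TestServer
import org.scalatest.{BeforeAndAfterAll, Suite}

// Hypothetical ScalaTest mixin: starts Kafka and Zookeeper before the tests of
// the mixing suite run and stops them afterwards. ScalaTest itself is an
// assumed, additional test dependency.
trait KafkaServerFixture extends BeforeAndAfterAll { this: Suite =>
  val kafkaServer = new TestServer()

  override def afterAll(): Unit = {
    kafkaServer.stop()
    super.afterAll()
  }
}
```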
The following broker configurations are recommended for being used with the storage plugins (an example snippet is shown below):

- `num.partitions` should be set to `1` by default because the plugins only write to partition 0 of journal topics and snapshot topics. If a higher number of partitions is needed for user-defined topics (e.g. for scalability or throughput reasons), this should be configured manually with the `kafka-topics` command line tool.
- `default.replication.factor` should be set to at least `2` for high availability of topics created by the plugins.
- `message.max.bytes` and `replica.fetch.max.bytes` should be set to a value that is larger than the largest snapshot size. The default value is `1024 * 1024`, which may be large enough for journal entries but is likely too small for snapshots. When changing these settings, make sure to also set `kafka-snapshot-store.consumer.fetch.message.max.bytes` and `kafka-journal.consumer.fetch.message.max.bytes` to this value.
- `log.cleanup.policy` must be set to `"compact"`, otherwise the most recent snapshot may be deleted when the retention time is exceeded and complete state recovery of persistent actors is no longer possible.
See also section Usage hints.
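For illustration only, a broker `server.properties` fragment that follows these recommendations might look as follows; the 10 MB limit is an arbitrary example value that must exceed your largest snapshot:

```
num.partitions=1
default.replication.factor=2

# must be larger than the largest snapshot (example value: 10 MB)
message.max.bytes=10485760
replica.fetch.max.bytes=10485760

log.cleanup.policy=compact
```

With these example values, `kafka-journal.consumer.fetch.message.max.bytes` and `kafka-snapshot-store.consumer.fetch.message.max.bytes` should be set to `10485760` as well.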
The full reference configuration of the plugins and the test server is given below:

```
kafka-journal {

  # FQCN of the Kafka journal plugin
  class = "akka.persistence.kafka.journal.KafkaJournal"

  # Dispatcher for the plugin actor
  plugin-dispatcher = "kafka-journal.default-dispatcher"

  # Number of concurrent writers (should be <= number of available threads in
  # dispatcher).
  write-concurrency = 8

  # The partition to use when publishing to and consuming from journal topics.
  partition = 0

  # Default dispatcher for plugin actor.
  default-dispatcher {
    type = Dispatcher
    executor = "fork-join-executor"
    fork-join-executor {
      parallelism-min = 2
      parallelism-max = 8
    }
  }

  consumer {
    # -------------------------------------------------------------------
    # Simple consumer configuration (used for message replay and reading
    # metadata).
    #
    # See http://kafka.apache.org/documentation.html#consumerconfigs
    # See http://kafka.apache.org/documentation.html#simpleconsumerapi
    # -------------------------------------------------------------------

    socket.timeout.ms = 30000
    socket.receive.buffer.bytes = 65536
    fetch.message.max.bytes = 1048576
  }

  producer {
    # -------------------------------------------------------------------
    # PersistentRepr producer (to journal topics) configuration.
    #
    # See http://kafka.apache.org/documentation.html#producerconfigs
    #
    # The metadata.broker.list property is set dynamically by the journal.
    # No need to set it here.
    # -------------------------------------------------------------------

    request.required.acks = 1                                        # DO NOT CHANGE!
    producer.type = "sync"                                           # DO NOT CHANGE!
    partitioner.class = "akka.persistence.kafka.StickyPartitioner"   # DO NOT CHANGE!
    key.serializer.class = "kafka.serializer.StringEncoder"

    # Increase if hundreds of topics are created during initialization.
    message.send.max.retries = 5

    # Increase if hundreds of topics are created during initialization.
    retry.backoff.ms = 100

    # Add further Kafka producer settings here, if needed.
    # ...
  }

  event.producer {
    # -------------------------------------------------------------------
    # Event producer (to user-defined topics) configuration.
    #
    # See http://kafka.apache.org/documentation.html#producerconfigs
    # -------------------------------------------------------------------

    producer.type = "sync"
    request.required.acks = 0
    topic.mapper.class = "akka.persistence.kafka.DefaultEventTopicMapper"
    key.serializer.class = "kafka.serializer.StringEncoder"

    # Add further Kafka producer settings here, if needed.
    # ...
  }

  zookeeper {
    # -------------------------------------------------------------------
    # Zookeeper client configuration
    # -------------------------------------------------------------------

    connect = "localhost:2181"
    session.timeout.ms = 6000
    connection.timeout.ms = 6000
    sync.time.ms = 2000
  }
}
```
```
kafka-snapshot-store {

  # FQCN of the Kafka snapshot store plugin
  class = "akka.persistence.kafka.snapshot.KafkaSnapshotStore"

  # Dispatcher for the plugin actor.
  plugin-dispatcher = "kafka-snapshot-store.default-dispatcher"

  # The partition to use when publishing to and consuming from snapshot topics.
  partition = 0

  # Topic name prefix (which is prepended to the persistenceId)
  prefix = "snapshot-"

  # Default dispatcher for plugin actor.
  default-dispatcher {
    type = Dispatcher
    executor = "fork-join-executor"
    fork-join-executor {
      parallelism-min = 2
      parallelism-max = 8
    }
  }

  consumer {
    # -------------------------------------------------------------------
    # Simple consumer configuration (used for loading snapshots and
    # reading metadata).
    #
    # See http://kafka.apache.org/documentation.html#consumerconfigs
    # See http://kafka.apache.org/documentation.html#simpleconsumerapi
    # -------------------------------------------------------------------

    socket.timeout.ms = 30000
    socket.receive.buffer.bytes = 65536
    fetch.message.max.bytes = 1048576
  }

  producer {
    # -------------------------------------------------------------------
    # Snapshot producer configuration.
    #
    # See http://kafka.apache.org/documentation.html#producerconfigs
    #
    # The metadata.broker.list property is set dynamically by the journal.
    # No need to set it here.
    # -------------------------------------------------------------------

    request.required.acks = 1
    producer.type = "sync"                                           # DO NOT CHANGE!
    partitioner.class = "akka.persistence.kafka.StickyPartitioner"   # DO NOT CHANGE!
    key.serializer.class = "kafka.serializer.StringEncoder"

    # Add further Kafka producer settings here, if needed.
    # ...
  }

  zookeeper {
    # -------------------------------------------------------------------
    # Zookeeper client configuration
    # -------------------------------------------------------------------

    connect = "localhost:2181"
    session.timeout.ms = 6000
    connection.timeout.ms = 6000
    sync.time.ms = 2000
  }
}
```
```
test-server {
  # -------------------------------------------------------------------
  # Test Kafka and Zookeeper server configuration.
  #
  # See http://kafka.apache.org/documentation.html#brokerconfigs
  # -------------------------------------------------------------------

  zookeeper {
    port = 2181
    dir = "data/zookeeper"
  }

  kafka {
    broker.id = 1
    port = 6667
    num.partitions = 2
    log.cleanup.policy = "compact"
    log.dirs = data/kafka
    log.index.size.max.bytes = 1024
  }
}
```
```
akka {
  actor {
    serializers {
      kafka-snapshot = "akka.persistence.kafka.snapshot.KafkaSnapshotSerializer"
    }
    serialization-bindings {
      "akka.persistence.kafka.snapshot.KafkaSnapshot" = kafka-snapshot
    }
  }
}
```