A streaming reference architecture for ETL with Kafka and Kafka Connect. Lenses offers SQL (for data browsing and Kafka Streams), Kafka Connect connector management, the most advanced SQL engine for Kafka and Kafka Streams, cluster monitoring and alerting, and more. You can find out more at http://lenses.io.
A collection of components to build a real-time ingestion pipeline.
The following connectors are deprecated and will no longer be included in any release from 3.0 onwards.

Please take a moment to read the documentation and make sure the software prerequisites are met!
| Connector       | Type   | Description                                                      | Docs |
|-----------------|--------|------------------------------------------------------------------|------|
| AWS S3          | Sink   | Copy data from Kafka to AWS S3.                                  | Docs |
| AzureDocumentDb | Sink   | Copy data from Kafka to Azure Document Db.                       | Docs |
| Bloomberg       | Source | Copy data from Bloomberg streams to Kafka.                       | Docs |
| Cassandra       | Source | Copy data from Cassandra to Kafka.                               | Docs |
| *Cassandra      | Sink   | Certified DSE Cassandra, copy data from Kafka to Cassandra.      | Docs |
| Coap            | Source | Copy data from IoT CoAP endpoints (using Californium) to Kafka.  | Docs |
| Coap            | Sink   | Copy data from Kafka to IoT CoAP endpoints.                      | Docs |
| Elastic 6       | Sink   | Copy data from Kafka to Elasticsearch 6.x via TCP or HTTP.       | Docs |
| FTP/HTTP        | Source | Copy data from FTP/HTTP to Kafka.                                | Docs |
| Hazelcast       | Sink   | Copy data from Kafka to Hazelcast.                               | Docs |
| HBase           | Sink   | Copy data from Kafka to HBase.                                   | Docs |
| Hive            | Source | Copy data from Hive/HDFS to Kafka.                               | Docs |
| Hive            | Sink   | Copy data from Kafka to Hive/HDFS.                               | Docs |
| InfluxDb        | Sink   | Copy data from Kafka to InfluxDb.                                | Docs |
| Kudu            | Sink   | Copy data from Kafka to Kudu.                                    | Docs |
| JMS             | Source | Copy data from JMS topics/queues to Kafka.                       | Docs |
| JMS             | Sink   | Copy data from Kafka to JMS.                                     | Docs |
| MongoDB         | Sink   | Copy data from Kafka to MongoDB.                                 | Docs |
| MQTT            | Source | Copy data from MQTT to Kafka.                                    | Docs |
| MQTT            | Sink   | Copy data from Kafka to MQTT.                                    | Docs |
| Pulsar          | Source | Copy data from Pulsar to Kafka.                                  | Docs |
| Pulsar          | Sink   | Copy data from Kafka to Pulsar.                                  | Docs |
| Redis           | Sink   | Copy data from Kafka to Redis.                                   | Docs |
| ReThinkDB       | Source | Copy data from RethinkDb to Kafka.                               | Docs |
| ReThinkDB       | Sink   | Copy data from Kafka to RethinkDb.                               | Docs |
| VoltDB          | Sink   | Copy data from Kafka to VoltDB.                                  | Docs |
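As an illustration of how these connectors are wired up, here is a minimal sketch of a sink configuration; the connector name, topic, key field and Redis instance are hypothetical placeholders, so check each connector's docs for the exact class name and options:

```properties
# Hypothetical Redis sink: copies the Kafka topic "device-events" into Redis.
name=redis-sink
connector.class=com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector
tasks.max=1
topics=device-events
connect.redis.host=localhost
connect.redis.port=6379
# KCQL maps the source topic onto the target store
connect.redis.kcql=INSERT INTO device- SELECT * FROM device-events PK deviceId
```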
2.1.3
Move to connect-common 2.0.5, which adds complex type support to KCQL.
2.1.2
2.1.0
2.0.1
Hive Source
connect.hive.hive.metastoreto
connect.hive.metastore
connect.hive.hive.metastore.uristo
connect.hive.metastore.uris
* Fix Elastic start-up NPE
* Fix batch size extraction from KCQL on Pulsar
2.0.0
Deprecated:
* Druid Sink (not Scala 2.12 compatible)
* Elastic Sink (not Scala 2.12 compatible)
* Elastic5 Sink (not Scala 2.12 compatible)
* RabbitMQ (not supported; the JMS connector can be used instead)
* Redis
* Cassandra
* ReThinkDB
* FTP Source
* MQTT Source
1.2.7
Features
* MQTT Source
Support dynamic topic names in Kafka from a wildcard subscription. Example: `INSERT INTO $ SELECT * FROM /mqttSourceTopic/+/test`. If the MQTT topic is `/mqttSourceTopic/A/test`, this will result in the Kafka topic `mqttSourceTopic_A_test`.
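A minimal sketch of the feature, assuming a local broker; `$` as the KCQL target derives the Kafka topic name from the matched MQTT topic:

```properties
# Wildcard subscription; '/' in the matched MQTT topic becomes '_' in Kafka,
# e.g. /mqttSourceTopic/A/test -> mqttSourceTopic_A_test
connect.mqtt.connection.hosts=tcp://localhost:1883
connect.mqtt.kcql=INSERT INTO $ SELECT * FROM /mqttSourceTopic/+/test
```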
Cassandra (source)
Support for sending JSON-formatted messages (with a string key) to a Kafka topic. A sample KCQL would be:

`INSERT INTO <topic> SELECT <fields> FROM <column_family> PK <pk_field> WITHFORMAT JSON WITHUNWRAP INCREMENTALMODE=<mode> WITHKEY(<key_field>)`

This sends the fields' values as a JSON object to the given topic. Note that in the Kafka Connect properties you need to set both `key.converter` and `value.converter` to `org.apache.kafka.connect.storage.StringConverter`.
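Putting the two together, a minimal sketch (the topic, table and field names are hypothetical):

```properties
# Cassandra source emitting JSON-formatted values with a string key
connect.cassandra.kcql=INSERT INTO sensor-topic SELECT id, temperature FROM readings PK created WITHFORMAT JSON WITHUNWRAP INCREMENTALMODE=TIMESTAMP WITHKEY(id)
# Both converters must be the StringConverter for this mode
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```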
Added a new INCREMENTALMODE called `dsesearchtimestamp` that makes DSE Search queries using Solr instead of a native Cassandra query. Instead of the native query:

`SELECT a, b, c, d FROM keyspace.table WHERE pkCol > ? AND pkCol <= ? ALLOW FILTERING;`

with the `dsesearchtimestamp` INCREMENTALMODE we now have a query with Solr:

`SELECT a, b, c, d FROM keyspace.table WHERE solr_query=?;`

where the `solr_query` will be something like:

`pkCol:{2020-03-23T15:02:21Z TO 2020-03-23T15:30:12.989Z]`
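A minimal sketch of enabling the mode (topic, table and column names are hypothetical):

```properties
# Incremental loading driven by DSE Search instead of a native range query
connect.cassandra.kcql=INSERT INTO target-topic SELECT a, b, c, d FROM table PK pkCol INCREMENTALMODE=dsesearchtimestamp
```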
AzureDocumentDB
Bug fixes
JMS Source
Allow for task parallelization and control over how the connector's tasks are parallelized.
Changes: a new configuration, `connect.jms.scale.type`, decides the parallelization. Available values are `kcql` and `default`. If `kcql` is provided, parallelization is based on the number of KCQL statements written; otherwise it is driven by the connector's `tasks.max` value.
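For example, a sketch with two hypothetical KCQL statements: with `kcql` scaling the connector runs two tasks, even though `tasks.max` would allow four:

```properties
tasks.max=4
# Parallelization follows the number of KCQL statements (two here), not tasks.max
connect.jms.scale.type=kcql
connect.jms.kcql=INSERT INTO topicA SELECT * FROM queueA WITHTYPE QUEUE;INSERT INTO topicB SELECT * FROM queueB WITHTYPE QUEUE
```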
Kudu Sink
* Handle null decimal types correctly

Mongo Sink
* Handle decimal types
1.2.4 Bug fixes

JMS Source
Acknowledging the JMS messages was not always possible, and messages from the JMS queue could be produced to Kafka out of order.
1.2.3 Features
Bug fixes
* Hive
  * Fix for writing nested structures to Hive
  * Improve the code for the async function call to use CAS
1.2.2 Features
Bug fixes
1.2.1
1.2.0
1.1.0
* `connect.mongodb.batch.size` is deprecated
* Added `connect.mapping.collection.to.json` to treat maps, lists and sets as JSON when inserting into Cassandra
* `connect.rethink.batch.size` is deprecated
* INSERT INTO targetTopic SELECT * FROM mqttTopic ... WITHREGEX=`$THE_REGEX` (see the sketch after this list)
* Added `connect.elastic.retry.interval` to elastic5 and elastic6
* Allow `DEFAULT UNSET` to be added on insert; omitted columns from maps default to null. Alternatively, if set to `UNSET`, the pre-existing value will be preserved
* `connect.cassandra.batch.size` is deprecated
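A sketch of the `WITHREGEX` clause from the list above, keeping the placeholder regex from the changelog:

```properties
# Only records matching the regex are forwarded to the target topic
connect.mqtt.kcql=INSERT INTO targetTopic SELECT * FROM mqttTopic WITHREGEX=`$THE_REGEX`
```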
1.0.0
0.4.0
`ftp.protocol` introduced, either `ftp` (default) or `ftps`.
0.3.0
0.2.6
Added `connect.progress.enabled`, which will periodically report log messages processed.
Renamed (old to new):
* `connect.documentdb.db` to `connect.documentdb.db`
* `connect.documentdb.database.create` to `connect.documentdb.db.create`
* `connect.cassandra.source.kcql` to `connect.cassandra.kcql`
* `connect.cassandra.source.timestamp.type` to `connect.cassandra.timestamp.type`
* `connect.cassandra.source.import.poll.interval` to `connect.cassandra.import.poll.interval`
* `connect.cassandra.source.error.policy` to `connect.cassandra.error.policy`
* `connect.cassandra.source.max.retries` to `connect.cassandra.max.retries`
* `connect.cassandra.source.retry.interval` to `connect.cassandra.retry.interval`
* `connect.cassandra.sink.kcql` to `connect.cassandra.kcql`
* `connect.cassandra.sink.error.policy` to `connect.cassandra.error.policy`
* `connect.cassandra.sink.max.retries` to `connect.cassandra.max.retries`
* `connect.cassandra.sink.retry.interval` to `connect.cassandra.retry.interval`
* `connect.coap.bind.port` to `connect.coap.port`
* `connect.coap.bind.host` to `connect.coap.host`
* `connect.mongo.database` to `connect.mongo.db`
* `connect.mongo.sink.batch.size` to `connect.mongo.batch.size`
* `connect.druid.sink.kcql` to `connect.druid.kcql`
* `connect.druid.sink.conf.file` to `connect.druid.kcql`
* `connect.druid.sink.write.timeout` to `connect.druid.write.timeout`
* `connect.elastic.sink.kcql` to `connect.elastic.kcql`
* `connect.hbase.sink.column.family` to `connect.hbase.column.family`
* `connect.hbase.sink.kcql` to `connect.hbase.kcql`
* `connect.hbase.sink.error.policy` to `connect.hbase.error.policy`
* `connect.hbase.sink.max.retries` to `connect.hbase.max.retries`
* `connect.hbase.sink.retry.interval` to `connect.hbase.retry.interval`
* `connect.influx.sink.kcql` to `connect.influx.kcql`
* `connect.influx.connection.user` to `connect.influx.username`
* `connect.influx.connection.password` to `connect.influx.password`
* `connect.influx.connection.database` to `connect.influx.db`
* `connect.influx.connection.url` to `connect.influx.url`
* `connect.kudu.sink.kcql` to `connect.kudu.kcql`
* `connect.kudu.sink.error.policy` to `connect.kudu.error.policy`
* `connect.kudu.sink.retry.interval` to `connect.kudu.retry.interval`
* `connect.kudu.sink.max.retries` to `connect.kudu.max.retries`
* `connect.kudu.sink.schema.registry.url` to `connect.kudu.schema.registry.url`
* `connect.redis.connection.password` to `connect.redis.password`
* `connect.redis.sink.kcql` to `connect.redis.kcql`
* `connect.redis.connection.host` to `connect.redis.host`
* `connect.redis.connection.port` to `connect.redis.port`
* `connect.rethink.source.host` to `connect.rethink.host`
* `connect.rethink.source.port` to `connect.rethink.port`
* `connect.rethink.source.db` to `connect.rethink.db`
* `connect.rethink.source.kcql` to `connect.rethink.kcql`
* `connect.rethink.sink.host` to `connect.rethink.host`
* `connect.rethink.sink.port` to `connect.rethink.port`
* `connect.rethink.sink.db` to `connect.rethink.db`
* `connect.rethink.sink.kcql` to `connect.rethink.kcql`
* `connect.jms.user` to `connect.jms.username`
* `connect.jms.source.converters` to `connect.jms.converters`
* Removed `connect.jms.converters`; replaced by KCQL `WITHCONVERTER`
* Removed `connect.jms.queues`; replaced by KCQL `WITHTYPE QUEUE`
* Removed `connect.jms.topics`; replaced by KCQL `WITHTYPE TOPIC`
* `connect.mqtt.source.kcql` to `connect.mqtt.kcql`
* `connect.mqtt.user` to `connect.mqtt.username`
* `connect.mqtt.hosts` to `connect.mqtt.connection.hosts`
* Removed `connect.mqtt.converters`; replaced by KCQL `WITHCONVERTER`
* Removed `connect.mqtt.queues`; replaced by KCQL `WITHTYPE=QUEUE`
* Removed `connect.mqtt.topics`; replaced by KCQL `WITHTYPE=TOPIC`
* `connect.hazelcast.sink.kcql` to `connect.hazelcast.kcql`
* `connect.hazelcast.sink.group.name` to `connect.hazelcast.group.name`
* `connect.hazelcast.sink.group.password` to `connect.hazelcast.group.password`
* `connect.hazelcast.sink.cluster.members` to `connect.hazelcast.cluster.members`
* `connect.hazelcast.sink.batch.size` to `connect.hazelcast.batch.size`
* `connect.hazelcast.sink.error.policy` to `connect.hazelcast.error.policy`
* `connect.hazelcast.sink.max.retries` to `connect.hazelcast.max.retries`
* `connect.hazelcast.sink.retry.interval` to `connect.hazelcast.retry.interval`
* `connect.volt.sink.kcql` to `connect.volt.kcql`
* `connect.volt.sink.connection.servers` to `connect.volt.servers`
* `connect.volt.sink.connection.user` to `connect.volt.username`
* `connect.volt.sink.connection.password` to `connect.volt.password`
* `connect.volt.sink.error.policy` to `connect.volt.error.policy`
* `connect.volt.sink.max.retries` to `connect.volt.max.retries`
* `connect.volt.sink.retry.interval` to `connect.volt.retry.interval`
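The pattern across these renames is the removal of the `source`/`sink` qualifier, so migrating a config is a one-line change; a sketch, using the Cassandra source as an example:

```properties
# before (0.2.5 and earlier)
connect.cassandra.source.kcql=INSERT INTO topicA SELECT * FROM tableA
# after (0.2.6 onwards)
connect.cassandra.kcql=INSERT INTO topicA SELECT * FROM tableA
```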
0.2.5 (8 Apr 2017)
* `WITHUNWRAP` support
* `timestamp` in the Cassandra Source for timestamp tracking
0.2.4 (26 Jan 2017)
* `SELECT * FROM influx-topic WITHTIMESTAMP sys_time() WITHTAG(field1, CONSTANT_KEY1=CONSTANT_VALUE1, field2, CONSTANT_KEY2=CONSTANT_VALUE2)`
* Consistency level defaults to `ALL`. Use `connect.influx.consistency.level` to set it to ONE/QUORUM/ALL/ANY.
* `connect.influx.sink.route.query` was renamed to `connect.influx.sink.kcql`.
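A sketch combining the two options above, using the current property names (the measurement and field names are hypothetical):

```properties
connect.influx.kcql=INSERT INTO measurement SELECT * FROM influx-topic WITHTIMESTAMP sys_time() WITHTAG(field1, CONSTANT_KEY1=CONSTANT_VALUE1)
# Defaults to ALL
connect.influx.consistency.level=QUORUM
```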
0.2.3 (5 Jan 2017)
* Support for `Struct`, `Schema.STRING` and `Json` with schema in the Cassandra, ReThinkDB, InfluxDB and MongoDB sinks
* Renamed `export.query.route` to `sink.kcql`
* Renamed `import.query.route` to `source.kcql`
* Added `STOREAS` to specify target sink types, e.g. Redis sorted sets, Hazelcast maps, queues, ring buffers (see the sketch below)
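As a sketch of `STOREAS`, using the current property name (the key prefix, topic and score field are hypothetical), a Redis sink writing a topic into a sorted set scored by a timestamp field:

```properties
# Each record lands in a Redis sorted set, ordered by the "ts" field
connect.redis.kcql=INSERT INTO device-scores SELECT * FROM device-topic STOREAS SortedSet(score=ts)
```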
Requires Gradle 6.0 to build.

To build:

```bash
gradle compile
```

To test:

```bash
gradle test
```

To create a fat jar:

```bash
gradle shadowJar
```

You can also use the Gradle wrapper:

```bash
./gradlew shadowJar
```

To view dependency trees:

```bash
gradle dependencies # or gradle :kafka-connect-cassandra:dependencies
```

To build a particular project:

```bash
gradle :kafka-connect-elastic5:build
```

To create a jar of a particular project:

```bash
gradle :kafka-connect-elastic5:shadowJar
```
We'd love to accept your contributions! Please use GitHub pull requests: fork the repo, develop and test your code, commit semantically, and submit a pull request. Thanks!