DBeam exports SQL tables into Avro files using JDBC and Apache Beam
A connector tool to extract data from SQL databases and import into GCS using Apache Beam.
This tool is runnable locally, or on any other backend supported by Apache Beam, e.g. Cloud Dataflow.
DEVELOPMENT STATUS: Beta, with production usage since August 2017.
DBeam is a tool that reads all the data from a single SQL database table, converts the data into Avro, and stores it in a specified location, usually in GCS. It runs as a single-threaded Apache Beam pipeline.
DBeam requires the database credentials, the name of the database table to read, and the output location in which to store the extracted data. DBeam first runs a single SELECT against the target table with a limit of one row to infer the table schema. Once the schema is created, the job is launched and simply streams the table contents via JDBC into the target location as Avro.
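The schema inference step described above can be sketched roughly as follows. This is not DBeam's implementation: it uses Python's built-in sqlite3 module as a stand-in for JDBC, and the type mapping `AVRO_TYPES` and the function name `infer_avro_schema` are assumptions made for illustration. The default namespace and the use of the table name as the record name follow the option descriptions in this README.

```python
import sqlite3

# Hypothetical mapping from Python value types to Avro primitive types.
AVRO_TYPES = {int: "long", float: "double", str: "string", bytes: "bytes"}


def infer_avro_schema(connection, table, namespace="dbeam_generated"):
    """Infer a record schema from a single-row query against `table`."""
    # Only one row is fetched, so this stays cheap even for large tables.
    cursor = connection.execute(f"SELECT * FROM {table} LIMIT 1")
    row = cursor.fetchone()
    columns = [description[0] for description in cursor.description]
    fields = []
    for index, name in enumerate(columns):
        value = row[index] if row is not None else None
        # Fall back to "string" when the type cannot be guessed from the value.
        avro_type = AVRO_TYPES.get(type(value), "string")
        fields.append({"name": name, "type": ["null", avro_type]})
    return {"type": "record", "name": table, "namespace": namespace, "fields": fields}


# Usage with an in-memory database standing in for the real source.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE my_table (id INTEGER, name TEXT, score REAL)")
connection.execute("INSERT INTO my_table VALUES (1, 'a', 2.5)")
schema = infer_avro_schema(connection, "my_table")
```

A real JDBC-based tool would normally read column types from the result set metadata rather than guessing from values; the guess here only keeps the sketch self-contained.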
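The database password can be read from an external password file (--passwordFile) or an external KMS encrypted password file (--passwordFileKmsEncrypted).
Records can be filtered by the current partition with the --partitionColumn parameter.
Exports for partitions that are too old fail by default (this check can be disabled with --skipPartitionCheck).
The pipeline runs on Apache Beam runners (for example DirectRunner and DataflowRunner).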
com.spotify.dbeam.options.DBeamPipelineOptions:

  --connectionUrl=
    The JDBC connection url to perform the export.
  --password=
    Plaintext password used by JDBC connection.
  --passwordFile=
    A path to a file containing the database password.
  --passwordFileKmsEncrypted=
    A path to a file containing the database password, KMS encrypted and base64 encoded.
  --sqlFile=
    A path to a file containing a SQL query (used instead of --table parameter).
  --table=
    The database table to query and perform the export.
  --username=
    Default: dbeam-extractor
    The database user name used by JDBC to authenticate.

com.spotify.dbeam.options.OutputOptions:

  --output=
    The path for storing the output.
  --dataOnly=
    Default: false
    Store only the data files in output folder, skip queries, metrics and metadata files.

com.spotify.dbeam.options.JdbcExportPipelineOptions:
  Configures the DBeam SQL export

  --avroCodec=
    Default: deflate6
    Avro codec (e.g. deflate6, deflate9, snappy).
  --avroDoc=
    The top-level record doc string of the generated avro schema.
  --avroSchemaFilePath=
    Path to file with a target AVRO schema.
  --avroSchemaName=
    The name of the generated avro schema. By default it uses the table name.
  --avroSchemaNamespace=
    Default: dbeam_generated
    The namespace of the generated avro schema.
  --exportTimeout=
    Default: P7D
    Export timeout, after this duration the job is cancelled and the export terminated.
  --fetchSize=
    Default: 10000
    Configures JDBC Statement fetch size.
  --limit=
    Limit the output number of rows, indefinite by default.
  --minPartitionPeriod=
    The minimum partition required for the job not to fail (when partition column is not specified), by default now() - 2*partitionPeriod.
  --partition=
    The date/timestamp of the current partition.
  --partitionColumn=
    The name of a date/timestamp column to filter data based on current partition.
  --partitionPeriod=
    The period frequency which the export runs, used to filter based on current partition and also to check if exports are running for too old partitions.
  --preCommand=
    SQL commands to be executed before query.
  --queryParallelism=
    Max number of queries to run in parallel for exports. Single query used if nothing specified. Should be used with splitColumn.
  --skipPartitionCheck=
    Default: false
    When partition column is not specified, fails if partition is too old; set this flag to ignore this check.
  --splitColumn=
    A long/integer column used to create splits for parallel queries. Should be used with queryParallelism.
  --useAvroLogicalTypes=
    Default: false
    Controls whether generated Avro schema will contain logicalTypes or not.
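The interaction between --partition, --partitionPeriod, --minPartitionPeriod and --skipPartitionCheck can be illustrated with a small sketch. It only mirrors the option descriptions above and is not DBeam's code; the function name `check_partition`, its parameters, and the choice to accept a partition exactly equal to the minimum are assumptions.

```python
from datetime import datetime, timedelta


def check_partition(partition, now, partition_period,
                    min_partition=None, partition_column=None,
                    skip_partition_check=False):
    """Return True if the export may proceed, raise ValueError otherwise."""
    # The check only applies when no partition column is specified.
    if partition_column is not None or skip_partition_check:
        return True
    if min_partition is None:
        # Default described in the options: now() - 2*partitionPeriod.
        min_partition = now - 2 * partition_period
    if partition < min_partition:
        raise ValueError(
            f"Partition {partition} is older than the minimum {min_partition}"
        )
    return True


# Usage: a daily export evaluated on 2020-01-10.
now = datetime(2020, 1, 10)
daily = timedelta(days=1)
recent_ok = check_partition(datetime(2020, 1, 9), now, daily)
skipped_ok = check_partition(datetime(2020, 1, 1), now, daily,
                             skip_partition_check=True)
```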
If an input Avro schema file is provided (--avroSchemaFilePath), DBeam reads it and reuses the following properties when creating the output Avro schema:
record.doc
record.namespace
record.field.doc
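As a rough illustration of how these three properties could be carried over, the sketch below copies them from a provided schema into a generated one, with both schemas represented as plain dictionaries. The function name `apply_input_schema` and the matching of fields by name are assumptions, not a description of DBeam's internals.

```python
import copy


def apply_input_schema(generated, provided):
    """Copy doc, namespace and per-field doc from `provided` into a copy of `generated`."""
    result = copy.deepcopy(generated)
    # record.doc and record.namespace
    for key in ("doc", "namespace"):
        if key in provided:
            result[key] = provided[key]
    # record.field.doc, matched by field name
    provided_docs = {
        field["name"]: field["doc"]
        for field in provided.get("fields", [])
        if "doc" in field
    }
    for field in result.get("fields", []):
        if field["name"] in provided_docs:
            field["doc"] = provided_docs[field["name"]]
    return result


generated = {
    "type": "record",
    "name": "my_table",
    "namespace": "dbeam_generated",
    "fields": [{"name": "id", "type": ["null", "long"]},
               {"name": "name", "type": ["null", "string"]}],
}
provided = {
    "type": "record",
    "name": "my_table",
    "namespace": "org.example.exports",
    "doc": "Daily export of my_table",
    "fields": [{"name": "id", "type": "long", "doc": "Primary key"}],
}
merged = apply_input_schema(generated, provided)
```

Fields without a matching doc in the provided schema are left unchanged, and the generated field types are never overwritten.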
Parallel mode (enabled with --queryParallelism) is a pre-alpha feature currently under development and experimentation.
Read queries used by DBeam to extract data generally don't place any locks, and hence multiple read queries can run in parallel. When running in parallel mode with --queryParallelism specified, DBeam looks for the --splitColumn argument to find the max and min values in that column. The max and min are then used as range bounds for generating queryParallelism number of queries, which are then run in parallel to read data. Since the splitColumn is used to calculate the query bounds, and DBeam needs to calculate intermediate bounds for each query, the type of the column must be long / int. It is assumed that the distribution of values in the splitColumn is sufficiently uniform and sequential. For example, if the range between the min and max of the split column is divided equally into queryParallelism parts, each part should contain an approximately equal number of records. Skew in this data would result in straggling queries, and hence won't provide much improvement. Having the records stored sequentially helps the queries run faster and reduces random disk seeks.
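The range splitting described above can be sketched as follows. This is a minimal illustration under the stated assumptions (an integer split column and evenly sized ranges), not the exact algorithm or SQL that DBeam generates; `split_ranges` and `range_queries` are hypothetical helper names.

```python
def split_ranges(min_value, max_value, parallelism):
    """Split the inclusive range [min_value, max_value] into contiguous inclusive ranges."""
    if parallelism < 1:
        raise ValueError("parallelism must be at least 1")
    if max_value < min_value:
        raise ValueError("max_value must not be smaller than min_value")
    total = max_value - min_value + 1
    # Never create more ranges than there are distinct values.
    parts = min(parallelism, total)
    base, extra = divmod(total, parts)
    ranges = []
    lower = min_value
    for index in range(parts):
        # The first `extra` ranges take one additional value each.
        size = base + (1 if index < extra else 0)
        upper = lower + size - 1
        ranges.append((lower, upper))
        lower = upper + 1
    return ranges


def range_queries(table, split_column, ranges):
    """Build one bounded SELECT per range."""
    return [
        f"SELECT * FROM {table} WHERE {split_column} >= {low} AND {split_column} <= {high}"
        for low, high in ranges
    ]


# Usage: min(id) = 1, max(id) = 100, queryParallelism = 4.
queries = range_queries("my_table", "id", split_ranges(1, 100, 4))
```

If the values of the split column are skewed, the ranges still cover equal spans of values but not equal numbers of rows, which is where the straggling queries mentioned above come from.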
Recommended usage: Beam runs each query generated by DBeam on one dedicated vCPU (when running with the Dataflow runner), so for best performance the total number of vCPUs available to a given job should equal the queryParallelism specified. Hence, if workerMachineType for Dataflow is n1-standard-w and numWorkers is n, then queryParallelism q should be a multiple of n*w, and the job would be fastest if q = n * w.
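As a worked example of the rule above, the following sketch derives the total vCPU count from a machine type name and checks a candidate queryParallelism against it. The helper names are hypothetical, and the parsing assumes a machine type whose name ends in its vCPU count, as n1-standard-w does.

```python
def total_vcpus(worker_machine_type, num_workers):
    """Return n * w for a machine type named like n1-standard-w."""
    vcpus_per_worker = int(worker_machine_type.rsplit("-", 1)[1])
    return vcpus_per_worker * num_workers


def is_multiple_of_vcpus(query_parallelism, worker_machine_type, num_workers):
    """Check that q is a multiple of n * w."""
    return query_parallelism % total_vcpus(worker_machine_type, num_workers) == 0


# Usage: 4 workers of type n1-standard-4 give 16 vCPUs, so q = 16 is the suggested value.
suggested_parallelism = total_vcpus("n1-standard-4", 4)
```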
For an export of a table running from a dedicated PostgreSQL replica, we have seen the best performance over vCPU time and wall time with a queryParallelism of 16. Increasing queryParallelism further raises the vCPU time without offering much gain in the wall time of the complete export. It is probably good to use a queryParallelism of less than 16 when experimenting.
Building and testing can be achieved with mvn:
mvn verify
In order to create a jar with all dependencies under ./dbeam-core/target/dbeam-core-shaded.jar, run the following:
mvn clean package -Ppack
Using Java from the command line:
java -cp ./dbeam-core/target/dbeam-core-shaded.jar \
  com.spotify.dbeam.jobs.JdbcAvroJob \
  --output=gs://my-testing-bucket-name/ \
  --username=my_database_username \
  --password=secret \
  --connectionUrl=jdbc:postgresql://some.database.uri.example.org:5432/my_database \
  --table=my_table
For CloudSQL:
java -cp ./dbeam-core/target/dbeam-core-shaded.jar \
  com.spotify.dbeam.jobs.JdbcAvroJob \
  --output=gs://my-testing-bucket-name/ \
  --username=my_database_username \
  --password=secret \
  --connectionUrl='jdbc:postgresql://google/database?socketFactory=com.google.cloud.sql.postgres.SocketFactory&socketFactoryArg=project:region:cloudsql-instance' \
  --table=my_table
When using MySQL, use a connection URL like the following:
--connectionUrl='jdbc:mysql://google/database?socketFactory=com.google.cloud.sql.mysql.SocketFactory&cloudSqlInstance=project:region:cloudsql-instance&useCursorFetch=true'
Note that useCursorFetch=true is important for MySQL, to avoid fetching all rows up front; more details are in the MySQL docs.
To run a cheap data extraction as a way to validate the setup, add the --limit=10 --skipPartitionCheck parameters. The job will run the queries, generate the schemas and export only 10 records, which should complete in a few seconds.
The database password can be configured by simply passing --password=writepasswordhere, --passwordFile=/path/to/file/containing/password or --passwordFile=gs://gcs-bucket/path/to/file/containing/password.
A more robust configuration is to point to a Google KMS encrypted file. DBeam will try to decrypt using KMS if the file ends with .encrypted (e.g. --passwordFileKmsEncrypted=gs://gcs-bucket/path/to/db-password.encrypted).
The file should contain base64-encoded encrypted content. It can be generated using gcloud like the following:
echo -n "super_secret_password" \
  | gcloud kms encrypt \
      --location "global" \
      --keyring "dbeam" \
      --key "default" \
      --project "mygcpproject" \
      --plaintext-file - \
      --ciphertext-file - \
  | base64 \
  | gsutil cp - gs://gcs-bucket/path/to/db-password.encrypted
KMS location, keyring, and key can be configured via Java system properties. The defaults are:
java \
  -DKMS_KEYRING=dbeam \
  -DKMS_KEY=default \
  -DKMS_LOCATION=global \
  -DKMS_PROJECT=default_gcp_project \
  -cp ./dbeam-core/target/dbeam-core-shaded.jar \
  com.spotify.dbeam.jobs.JdbcAvroJob \
  ...
To include the DBeam library in a mvn project, add the following dependency in pom.xml:
<dependency>
  <groupId>com.spotify</groupId>
  <artifactId>dbeam-core</artifactId>
  <version>${dbeam.version}</version>
</dependency>
To include the DBeam library in an SBT project, add the following dependency in build.sbt:
libraryDependencies ++= Seq(
  "com.spotify" % "dbeam-core" % dbeamVersion
)
Make sure you have mvn installed. For an editor, IntelliJ IDEA is recommended.
To test and verify changes during development, run:
mvn verify
Or:
mvn verify -Pcoverage
This project adheres to the Open Code of Conduct. By participating, you are expected to honor this code.
Every push to master will deploy a snapshot version to Sonatype. You can check the deployment in the following links:
Copyright 2016-2020 Spotify AB.
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0