Forklift ETL

Moving heavy databases around.



Forklift is a ruby gem that makes it easy to move your data around. Forklift can be an integral part of your data warehouse pipeline or a backup tool. It can collect and collapse data from multiple sources or across a single source. Forklift's first version was a MySQL-only tool, but you can now create transports to deal with the data of your choice.

Set up

Make a new directory with a `Gemfile` like this:

```ruby
source 'https://rubygems.org'
gem 'forklift_etl'
```

Use the generator:

```bash
(bundle exec) forklift --generate
```

Make your `plan.rb` using the examples below.

Run your plan:

```bash
forklift plan.rb
```

You can run specific parts of your plan like:

```bash
forklift plan.rb step1 step5
```

Directory structure

Forklift expects your project to be arranged like:

```
├── config/
|   ├── email.yml
├── connections/
|   ├── mysql/
|       ├── (DB).yml
|   ├── elasticsearch/
|       ├── (DB).yml
|   ├── csv/
|       ├── (file).yml
├── log/
├── pid/
├── template/
├── patterns/
├── transformations/
├── Gemfile
├── Gemfile.lock
├── plan.rb
```

To enable a forklift connection, all you need to do is place the yml config file for it within `config/connections/#{type}/`.

Files you place within `patterns/` will be loaded automatically.
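For example, a MySQL connection could be configured with a file like the sketch below. The file name and values are hypothetical; the keys are standard mysql2 options, since the mysql connections hash is handed directly to a mysql2 connection (see Options & Notes):

```yaml
# config/connections/mysql/analytics.yml (hypothetical example)
# These keys are passed straight through to the mysql2 client.
host: localhost
port: 3306
username: forklift
password: secret
database: analytics
encoding: utf8
```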


Example Project

Visit the `example` directory to see a whole forklift project.

Simple extract and load (no transformations)

If you have multiple databases and want to consolidate into one, this plan should suffice.

```ruby
plan = Forklift::Plan.new
plan.do! do

  # ==> Connections
  service1 = plan.connections[:mysql][:service1]
  service2 = plan.connections[:mysql][:service2]
  analytics_working = plan.connections[:mysql][:analytics_working]
  analytics = plan.connections[:mysql][:analytics]

  # ==> Extract
  # Load data from your services into your working database
  # If you want every table: service1.tables.each do |table|
  # Data will be extracted in 1000 row collections
  %w(users organizations).each do |table|
    service1.read("select * from #{table}") { |data| analytics_working.write(data, table) }
  end

  %w(orders line_items).each do |table|
    service2.read("select * from #{table}") { |data| analytics_working.write(data, table) }
  end

  # ==> Load
  # Load data from the working database to the final database
  analytics_working.tables.each do |table|
    # optimistic_pipe will attempt an incremental pipe, and will fall back to a full table copy.
    # By default, incremental updates happen off of the updated_at column, but you can modify
    # this by setting the matcher in the options.
    # If you want a full pipe instead of incremental, use pipe instead of optimistic_pipe.
    # The pipe pattern works within the same database server. To copy across databases, try
    # the mysql_optimistic_import method.
    # This example shows the options with their default values.
    Forklift::Patterns::Mysql.optimistic_pipe(analytics_working.current_database, table, analytics.current_database, table, matcher: 'updated_at', primary_key: 'id')
  end
end
```

Simple MySQL ETL

```ruby
plan = Forklift::Plan.new
plan.do! do
  # Do some SQL transformations
  # SQL transformations are done exactly as they are written
  destination = plan.connections[:mysql][:destination]
  destination.exec!("./transformations/combined_name.sql")

  # Do some Ruby transformations
  # Ruby transformations expect do!(connection, forklift) to be defined
  destination = plan.connections[:mysql][:destination]
  destination.exec!("./transformations/email_suffix.rb")

  # mySQL Dump the destination
  destination = plan.connections[:mysql][:destination]
  destination.dump('/tmp/destination.sql.gz')
end
```

Elasticsearch to MySQL

```ruby
plan = Forklift::Plan.new
plan.do! do
  source = plan.connections[:elasticsearch][:source]
  destination = plan.connections[:mysql][:destination]
  table = 'es_import'
  index = 'aaa'
  query = { query: { match_all: {} } } # pagination will happen automatically
  destination.truncate!(table) if destination.tables.include? table
  source.read(index, query) { |data| destination.write(data, table) }
end
```

MySQL to Elasticsearch

```ruby
plan = Forklift::Plan.new
plan.do! do
  source = plan.connections[:mysql][:source]
  destination = plan.connections[:elasticsearch][:destination]
  table = 'users'
  index = 'users'
  query = "select * from users" # pagination will happen automatically
  source.read(query) { |data| destination.write(data, index, true, 'user') }
end
```

Forklift Emails


Put this at the end of your plan, inside the `do!` block:

```ruby
# ==> Email
# Let your team know the outcome. Attaches the log.
email_args = {
  to: "[email protected]",
  from: "Forklift",
  subject: "Forklift has moved your database @ #{Time.new}",
  body: "So much data!"
}

plan.mailer.send(email_args, plan.logger.messages)
```

ERB templates

You can get fancy by using an ERB template for your email and SQL variables:

```ruby
# ==> Email
# Let your team know the outcome. Attaches the log.
email_args = {
  to: "[email protected]",
  from: "Forklift",
  subject: "Forklift has moved your database @ #{Time.new}"
}

email_variables = {
  total_users_count: destination.read('select count(1) as "count" from users')[0][:count]
}

email_template = "./template/email.erb"

plan.mailer.send_template(email_args, email_template, email_variables, plan.logger.messages)
```

Then in `./template/email.erb` (the email variables are exposed to the template, assumed here as instance variables matching the keys above):

```erb
<h1>Your forklift email</h1>
<ul>
  <li>Total Users: <%= @total_users_count %></li>
</ul>
```

When you run `forklift --generate`, we create `config/email.yml` for you:

```yaml
# Configuration is passed to Pony (https://github.com/benprew/pony)

# ==> SMTP
# If testing locally, mailcatcher (https://mailcatcher.me) is a helpful gem
via: smtp
via_options:
  address: localhost
  port: 1025
  # user_name: user
  # password: password
  # authentication: :plain # :plain, :login, :cram_md5, no auth by default
  # domain: "localhost.localdomain" # the HELO domain provided by the client to the server

# ==> Sendmail
# via: sendmail
# via_options:
#   location: /usr/sbin/sendmail
#   arguments: '-t -i'
```


```ruby
# do! is a wrapper around common setup methods (pidfile locking, setting up the logger, etc.)
# you don't need to use do! if you want finer control
def do!
  # you can use `plan.logger.log` in your plan for logging
  self.logger.log "Starting forklift"

  # use a pidfile to ensure that only one instance of forklift is running at a time; store the file if OK
  self.pid.safe_to_run?
  self.pid.store!

  # this will load all connections in /config/connections/#{type}/#{name}.yml into the plan.connections hash
  # and build all the connection objects (and try to connect in some cases)
  self.connect!

  yield # your stuff here!

  # remove the pidfile
  self.logger.log "Completed forklift"
  self.pid.delete!
end
```


You can optionally divide up your forklift plan into steps:

```ruby
plan = Forklift::Plan.new
plan.do! do

  plan.step('Mysql Import') {
    source = plan.connections[:mysql][:source]
    destination = plan.connections[:mysql][:destination]
    source.tables.each do |table|
      Forklift::Patterns::Mysql.optimistic_pipe(source, table, destination, table)
    end
  }

  plan.step('Elasticsearch Import') {
    source = plan.connections[:elasticsearch][:source]
    destination = plan.connections[:mysql][:destination]
    table = 'es_import'
    index = 'aaa'
    query = { query: { match_all: {} } } # pagination will happen automatically
    destination.truncate!(table) if destination.tables.include? table
    source.read(index, query) { |data| destination.write(data, table) }
  }

end
```

When you use steps, you can run your whole plan, or just part of it, with command line arguments. For example, `forklift plan.rb "Elasticsearch Import"` would run just that single portion of the plan. Note that any parts of your plan not within a step will be run each time.

Error Handling

By default, exceptions within your plan will raise and crash your application. However, you can pass an optional `error_handler` lambda to your step to define how errors are handled. The lambda will be passed `(name, exception)`. If you don't re-raise within your error handler, your plan will continue to execute. For example:

```ruby
error_handler = lambda { |name, exception|
  if exception.class.to_s =~ /connection/i
    # I can't connect, I should halt
    raise exception
  elsif exception.class.to_s =~ /SoftError/
    # this type of error is OK; don't re-raise and the plan will continue
  end
}

plan.step('a_complex_step', error_handler) {
  # ...
}
```

Transports
Transports are how you interact with your data. Every transport defines `read` and `write` methods which handle arrays of data objects (plus whatever helper methods it requires).

Each transport should have a config file in `./config/connections/#{type}/`. It will be loaded at boot.

Transports optionally define helper methods which are a shortcut to copy data within a transport, like the mysql `pipe` methods (i.e. `insert into #{to_db}.#{to_table} select * from #{from_db}.#{from_table}`). A transport may also define other helpers (like how to create a MySQL dump). These should be defined in `patterns/` within the `Forklift::Patterns` namespace; a sketch follows.

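A minimal sketch of such a pattern, assuming files in `patterns/` are auto-loaded; the `full_copy` helper name is hypothetical, but `read` and `write` are the transport methods described above:

```ruby
# patterns/mysql.rb (hypothetical example)
module Forklift
  module Patterns
    module Mysql
      # a naive full copy between two connections: read everything, then write it
      def self.full_copy(source, destination, table)
        source.read("select * from #{table}") { |data| destination.write(data, table) }
      end
    end
  end
end
```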
Creating your own transport

In your project, create a file that defines at least the following:
```ruby
module Forklift
  module Connection
    class Mixpanel < Forklift::Base::Connection

      def initialize(config, forklift)
        @config = config
        @forklift = forklift
      end

      def config
        @config
      end

      def forklift
        @forklift
      end

      def read(index, query, args)
        # ...
        data = [] # data is an array of hashes
        # ...
        if block_given?
          yield data
        else
          return data
        end
      end

      def write(data, table)
        # data is an array of hashes
        # "table" can be any argument(s) you need to know where/how to write
        # ...
      end

      def pipe(from_table, from_db, to_table, to_db)
        # ...
      end

    end
  end
end
```

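Once defined, a custom transport is wired up the same way as the built-ins: drop a yml config in `./config/connections/#{type}/` and the connection shows up in `plan.connections`. A hypothetical sketch (the `api_key` field, file name, and query are illustrative, not part of the gem):

```yaml
# config/connections/mixpanel/source.yml (hypothetical)
api_key: abc123
```

```ruby
# in plan.rb (hypothetical usage)
mixpanel = plan.connections[:mixpanel][:source]
destination = plan.connections[:mysql][:destination]
mixpanel.read('events', { event: 'signup' }, {}) { |data| destination.write(data, 'events') }
```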
Existing transports, and the patterns for them, are documented in the project repository.



Transformations
Forklift allows you to create both Ruby transformations and script transformations.

  • It is up to the transport to define how scripts are executed, and not all transports support it. Mysql can run `.sql` files, but there is no equivalent for elasticsearch. Mysql scripts are evaluated statement by statement; the delimiter (`;` by default) can be redefined with the `DELIMITER` statement.
  • `.exec` runs a transformation and logs any exceptions, while `.exec!` will raise on an error. For example, `destination.exec("./transformations/cleanup.rb")` will run cleanup.rb on the destination database.
  • Script files are run as-is, but ruby transformations must define a `do!` method in their class, which is called as `do!(connection, forklift, args)`.
  • `args` is optional, and can be passed in from your plan.
```ruby
# Example transformation to count users
# count_users.rb
class CountUsers
  def do!(connection, forklift, args)
    forklift.logger.log "counting users"
    count = connection.count('users')
    forklift.logger.log "[#{args[:name]}] found #{count} users"
  end
end
```

```ruby
# in your plan.rb
plan = Forklift::Plan.new
plan.do! do
  destination = plan.connections[:mysql][:destination]
  destination.exec!("./transformations/count_users.rb", {name: 'user counter'})
end
```


Options & Notes

  • Thanks to @rahilsondhi, @rgarver and Looksharp for all their help
  • `email_options` is a hash consumed by the Pony mail gem
  • Forklift's logger is Lumberjack with a wrapper to also echo the log lines to stdout and save them to an array to be accessed later by the email system.
  • The mysql connections hash will be passed directly to a mysql2 connection.
  • The elasticsearch connections hash will be passed directly to an elasticsearch connection.
  • Your databases must exist. Forklift will not create them for you.
  • Ensure your databases have the right encoding (e.g. utf8) or you will get errors like `#<Mysql2::Error: Incorrect string value: '...' for column '...' at row ...>`; one way to fix the encoding is shown below.
  • If testing locally, mailcatcher (https://mailcatcher.me) is a helpful gem to test your email sending
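A quick way to check and fix a database's default encoding, assuming a MySQL database named `analytics` (substitute your own):

```sql
-- inspect the current default encoding
SELECT default_character_set_name FROM information_schema.schemata WHERE schema_name = 'analytics';

-- convert the database default to utf8
ALTER DATABASE analytics CHARACTER SET utf8 COLLATE utf8_general_ci;
```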

Contributing and Testing



If you want something similar for Node.js, try Empujar.
