Connect UNIX pipes and message queues
Pipecat allows you to scale any program supporting the FACK contract using traditional UNIX pipes and AMQP. Think of it as netcat but with message acknowledgments. It is the successor of redis-pipe.
# Publish a sequence of numbers to a job queue.
seq 1 1000 | pipecat publish numbers
# Multiply each number with 10 and store results in a different queue.
pipecat consume numbers --autoack | xargs -n 1 expr 10 '*' | pipecat publish results
# Aggregate the results and calculate the sum.
pipecat consume results --autoack --non-blocking \
  | python -uc 'import sys; print(sum(map(int, sys.stdin)))'
If you are into streams and UNIX pipes, check out my Haskell-based awk and sed alternative.
Pipecat supports a local mode and all AMQP 0.9.1 message brokers.
You can download a single binary for Linux, OSX or Windows.
OSX
wget -O pipecat https://github.com/lukasmartinelli/pipecat/releases/download/v0.3/pipecat_darwin_amd64
chmod +x pipecat
./pipecat --help
Linux
wget -O pipecat https://github.com/lukasmartinelli/pipecat/releases/download/v0.3/pipecat_linux_amd64
chmod +x pipecat
./pipecat --help
Install from Source
go get github.com/lukasmartinelli/pipecat
If you are using Windows or 32-bit architectures you need to download the appropriate binary yourself.
pipecat connects message queues and UNIX pipes. The need arose when I started building messaging support into utilities to make them scalable. I wanted to leave my programs the way they are, without heavy dependencies, and still be able to scale the process reliably.
In this example we will calculate the sum of a sequence of numbers.
Specify the AMQP_URI env var to connect to the message broker.
export AMQP_URI=amqp://user:[email protected]:5672/vhost
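The URI follows the usual scheme://user:password@host:port/path layout. As a rough illustration of how such a value breaks down, the sketch below splits one with Python's standard library. It is not part of pipecat, `describe_amqp_uri` is a made-up helper name, and the URI is a placeholder chosen only for this example.

```python
from urllib.parse import urlsplit


def describe_amqp_uri(uri):
    """Split an AMQP URI into the parts a broker connection needs."""
    parts = urlsplit(uri)
    return {
        "scheme": parts.scheme,
        "user": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,
        "path": parts.path,  # raw path; the client derives the vhost from it
    }


# Hypothetical URI, used only for illustration.
info = describe_amqp_uri("amqp://user:secret@broker.example.com:5672/vhost")
print(info)
```

Checking the value this way before exporting it can catch a missing port or a misplaced `@` early.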
Let's create a new queue numbers and publish a sequence of numbers from 1 to 1000.
seq 1 1000 | pipecat publish numbers
Multiply the input sequence by a factor of 10 and publish the results to an additional results queue. This step can be run on multiple hosts. We want to acknowledge all received messages automatically with --autoack.
pipecat consume numbers --autoack | xargs -n 1 expr 10 '*' | pipecat publish results
Now let's sum up all the numbers. Because we want to end after receiving all numbers, we specify the --non-blocking mode, which will close the connection if no messages have been received after a timeout.
pipecat consume results --autoack --non-blocking | python -uc 'import sys; print(sum(map(int, sys.stdin)))'
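To know what the final command should print, the same pipeline can be dry-run locally without a broker. The sketch below is only a stand-in for the three shell stages; `multiply_stage` and `sum_stage` are names invented for this example.

```python
def multiply_stage(lines, factor=10):
    """Mimic `xargs -n 1 expr 10 '*'`: one output line per input line."""
    for line in lines:
        yield str(factor * int(line.strip()))


def sum_stage(lines):
    """Mimic the final reducer: sum all integers read line by line."""
    return sum(int(line) for line in lines)


numbers = (str(n) for n in range(1, 1001))  # same values as `seq 1 1000`
total = sum_stage(multiply_stage(numbers))
print(total)  # 5005000
```

The numbers 1 to 1000 sum to 500500, so after multiplying by 10 the expected total is 5005000, regardless of the order in which the messages arrive.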
If you do not have an existing AMQP broker at hand you can run RabbitMQ in a docker container, expose the ports and connect to it.
docker run -d -p 5672:5672 --hostname pipecat-rabbit --name pipecat-rabbit rabbitmq:3
Now connect to localhost with the default guest login.
export AMQP_URI=amqp://guest:guest@localhost:5672/
If you are using existing message queue infrastructure you can also publish messages to an exchange, with the first parameter used as the routing key. Thanks to @kennon for the implementation.
seq 1 1000 | pipecat publish --exchange "my_exchange" --no-create-queue my.routing.key
The AMQP_EXCHANGE environment variable can also be used:
export AMQP_EXCHANGE=my_exchange
We have already written a small, concise and very scalable set of programs. We can now run the multiply.py step on many servers.
However, if the server dies while multiply.py is running, the input lines already processed are lost.
If your program must not lose messages, it needs to implement the FACK contract, demonstrated for the multiply.py sample.
Any program that accepts input from stdin and writes to stdout should accept an environment variable FACK containing the name of a file (for example a named pipe). If a single operation performed on a line from stdin was successful, that line should be written to the file named in FACK.
Implementing the contract is straightforward: read the optional FACK environment variable containing a file name, and write each input line to that file once it has been processed successfully.
Below is a Python example multiply.py which multiplies the sequence of numbers as above but writes the input line to stdack if successfully processed.
import sys
import os

# Works even if FACK is not set
with open(os.getenv('FACK', os.devnull), 'w') as stdack:
    for line in sys.stdin:
        num = int(line.strip())
        result = num * 10
        sys.stdout.write('{}\n'.format(result))
        stdack.write(line)  # Ack the processed line
        stdack.flush()  # Make sure line does not get lost in the buffer
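The contract only asks for an ack when a line was processed successfully, so a program may also skip the ack for lines it cannot handle. The variant below is a sketch of that idea and not part of pipecat. The `process` function and the in-memory demo are assumptions made for illustration, and what happens to unacknowledged messages afterwards depends on pipecat and the broker, which this sketch does not model.

```python
import io


def process(lines, out, ack, factor=10):
    """Multiply each line; ack only lines that were processed successfully."""
    failed = 0
    for line in lines:
        try:
            result = int(line.strip()) * factor
        except ValueError:
            failed += 1  # no ack is written for a line that failed
            continue
        out.write('{}\n'.format(result))
        ack.write(line)  # ack the processed line
        ack.flush()      # do not leave the ack sitting in a buffer
    return failed


# In-memory demo. In a real script you would pass sys.stdin, sys.stdout and
# a file opened with open(os.getenv('FACK', os.devnull), 'w').
out, ack = io.StringIO(), io.StringIO()
failed = process(['1\n', 'oops\n', '3\n'], out, ack)
print(repr(out.getvalue()), repr(ack.getvalue()), failed)
```

Keeping the logic in a function that takes file-like objects makes the ack behavior easy to test without a queue or a named pipe.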
Now your program can no longer lose messages with pipecat, because you can feed the FACK output back into pipecat using named pipes. Only then will pipecat acknowledge the messages from the message queue.
Fill the queue again.
seq 1 1000 | pipecat publish numbers
And use a named pipe to funnel the acknowledged input lines back into pipecat.
mkfifo ack
cat ack | pipecat consume numbers \
  | FACK=ack python -u multiply.py \
  | pipecat publish results
rm ack
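One way to picture the feedback loop is as a set of in-flight messages that shrinks whenever a line comes back through the named pipe. The toy model below is purely conceptual. It is not pipecat's actual implementation, and the `InFlight` class is invented for this example.

```python
from collections import deque


class InFlight:
    """Toy model: keep delivered lines until the same line returns as an ack."""

    def __init__(self):
        self.pending = deque()

    def deliver(self, line):
        """Record a line handed to the worker and pass it on."""
        self.pending.append(line)
        return line

    def ack(self, line):
        """Drop the oldest pending copy of `line`; return False if unknown."""
        try:
            self.pending.remove(line)
        except ValueError:
            return False
        return True


tracker = InFlight()
for n in ("1", "2", "3"):
    tracker.deliver(n)
tracker.ack("1")
tracker.ack("3")
print(list(tracker.pending))  # ['2']
```

In this model, whatever is still pending when the worker dies is exactly the set of lines that were never acknowledged.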
Consume all messages to reduce a result. In the reduce operation we need to autoack all received messages, because we can't possibly hold the entire result set in memory until the operation has completed.
pipecat consume results --autoack --non-blocking | python -uc 'import sys; print(sum(map(int, sys.stdin)))'
With a few lines of additional code that depend only on the standard library, you can now make any program in any language scalable using message queues, without any dependencies and without changing its behavior one bit.
pipecat consume results --autoack --non-blocking > results_backup.json
cat results_backup.json | pipecat publish results
We use gox to create distributable binaries for Windows, OSX and Linux.
docker run --rm -v "$(pwd)":/usr/src/pipecat -w /usr/src/pipecat tcnksm/gox:1.4.2-light