pipecat


Pipecat allows you to scale any program supporting the FACK contract using traditional UNIX pipes and AMQP. Think of it as netcat but with message acknowledgments. It is the successor of redis-pipe.

# Publish sequence of numbers to a job queue.
seq 1 1000 | pipecat publish numbers

# Multiply each number by 10 and store the results in a different queue.
pipecat consume numbers --autoack | xargs -n 1 expr 10 '*' | pipecat publish results

# Aggregate the results and calculate the sum.
pipecat consume results --autoack --non-blocking \
  | python -cu 'import sys; print(sum(map(int, sys.stdin)))'

If you are into streams and UNIX pipes, check out my Haskell-based awk and sed alternative.


Pipecat supports a local mode and all AMQP 0.9.1 message brokers.


You can download a single binary for Linux, OSX or Windows.


wget -O pipecat
chmod +x pipecat

./pipecat --help



Install from Source

go get

If you are using Windows or a 32-bit architecture, you need to download the appropriate binary yourself.

Using pipecat

pipecat connects message queues and UNIX pipes. The need arose when I started building messaging support into utilities to make them scalable: I wanted to leave my programs the way they are, without heavy dependencies, while still being able to scale the work reliably.

In this example we will calculate the sum of a sequence of numbers.

Connect the broker

Specify the AMQP_URI environment variable to connect to the message broker (placeholder credentials and host shown):

export AMQP_URI=amqp://user:password@broker.example.com:5672/vhost

Create the queue

Let's create a new queue called numbers and publish a sequence of numbers from 1 to 1000.
seq 1 1000 | pipecat publish numbers

Process input

Multiply the input sequence by a factor of 10 and publish the results to an additional results queue. This step can be run on multiple hosts. We want to acknowledge all received messages automatically with --autoack.
pipecat consume numbers --autoack | xargs -n 1 expr 10 '*' | pipecat publish results

Aggregate results

Now let's sum up all the numbers. Because we want to exit after receiving all numbers, we specify the --non-blocking mode, which closes the connection if no messages have been received after a timeout.
pipecat consume results --autoack --non-blocking | python -cu 'import sys; print(sum(map(int, sys.stdin)))'

Local RabbitMQ with Docker

If you do not have an existing AMQP broker at hand you can run RabbitMQ in a docker container, expose the ports and connect to it.

docker run -d -p 5672:5672 --hostname pipecat-rabbit --name pipecat-rabbit rabbitmq:3

Now connect to localhost with the default guest credentials.

export AMQP_URI=amqp://guest:guest@localhost:5672/

Publish messages to Exchange

If you are using existing message queue infrastructure you can also publish messages to an exchange, with the first parameter used as the routing key. Thanks to @kennon for the implementation.

seq 1 1000 | pipecat publish --exchange "my_exchange" --no-create-queue my.routing.key

The AMQP_EXCHANGE environment variable can also be used:

export AMQP_EXCHANGE=my_exchange

Make it failsafe

We already have written a small, concise and very scalable set of programs. We can now run the processing step on many servers.

However, if a server dies while the processing step is running, the input lines already consumed are lost.

If your program needs that ability, it has to implement the FACK contract, demonstrated below in Python.

FACK Contract

Any program that accepts output from pipecat consume on stdin and writes its results to stdout should accept an environment variable FACK containing a file descriptor. If a single operation performed on a line from stdin was successful, that line should be written to the FACK descriptor.

FACK contract Flow

Implement the contract

Implementing the contract is straightforward.

  1. Support the optional FACK environment variable containing a file name
  2. Write each received input line into this file handle once the operation on it has been performed successfully

Python Example

Below is a Python example which multiplies the sequence of numbers as above, but also writes each input line to the FACK file handle once it has been processed successfully.
import sys
import os

# Works even if FACK is not set
with open(os.getenv('FACK', os.devnull), 'w') as stdack:
    for line in sys.stdin:
        num = int(line.strip())
        result = num * 10
        sys.stdout.write('{}\n'.format(result))
        stdack.write(line)  # Ack the processed line
        stdack.flush()      # Make sure line does not get lost in the buffer

Use named queues for ACKs

Now your program can no longer lose messages, because you can feed the FACK output back into pipecat using named pipes, and only then will the messages from the message queue be acknowledged.

Pipecat Flow Diagram

Fill the queue again.

seq 1 1000 | pipecat publish numbers

And use a named pipe to funnel the acknowledged input lines back into pipecat.

mkfifo ack
cat ack | pipecat consume numbers \
| FACK=ack python -u \
| pipecat publish results
rm ack

Consume all messages to reduce them to a result. In the reduce step we have to autoack all received messages, because the reduce only produces output after the entire input has been consumed, so per-line acknowledgment is not possible.

pipecat consume results --autoack --non-blocking | python -cu 'import sys; print(sum(map(int, sys.stdin)))'

With a few additional lines of code, depending only on the standard library, you can now make any program in any language scalable using message queues, without extra dependencies and without changing its behavior one bit.
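The FACK handling above can be factored into a small reusable helper. Below is a sketch (the name fack_map and its signature are hypothetical, not part of pipecat) that wraps any per-line function with support for the FACK contract:

```python
import os
import sys


def fack_map(func, stdin=sys.stdin, stdout=sys.stdout):
    # Apply func to each input line, and echo each successfully
    # processed line to the file named by FACK (the ack channel).
    # Falls back to os.devnull when FACK is not set.
    with open(os.getenv('FACK', os.devnull), 'w') as stdack:
        for line in stdin:
            result = func(line.strip())
            stdout.write('{}\n'.format(result))
            stdack.write(line)  # ack only after the result was emitted
            stdack.flush()      # avoid losing acks in the buffer


if __name__ == '__main__':
    # Same behavior as the multiply-by-10 example above
    fack_map(lambda value: int(value) * 10)
```

Any filter built on such a helper can be dropped into the named-pipe pipeline shown below unchanged.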

Usage Examples

Create local Queue Backup

pipecat consume results --autoack --non-blocking > results_backup.json
cat results_backup.json | pipecat publish results

Cross Compile Release

We use gox to create distributable binaries for Windows, OSX and Linux.

docker run --rm -v "$(pwd)":/usr/src/pipecat -w /usr/src/pipecat tcnksm/gox:1.4.2-light
