CouchDB Queue Service: API-compatible Amazon SQS implementation with NodeJS, CouchDB
CQS is a message queue system, using Apache CouchDB. It works exactly like Amazon Simple Queue Service (SQS): the API is the same, and everything behaves the same; it just runs on CouchDB.
CQS is implemented in Javascript and runs in Node and in the browser.
Use CQS if you use Javascript, you know (or appreciate) Amazon SQS, and you want the same thing on your server.
For Node, install with NPM.
$ npm install cqs
The test script test/run.js will copy itself into a Couch app which you can run from the browser.
Initialize the CQS module to point to a database on your couch.
// A normal import.
var cqs = require('cqs');

// Pre-apply my couch and db name.
cqs = cqs.defaults({ "couch": "https://user:[email protected]"
                   , "db"   : "cqs_queue"
                   });
cqs.ListQueues(function(error, queues) {
  console.log("Found " + queues.length + " queues:");
  queues.forEach(function(queue) {
    console.log(" * " + queue.name);
  })

  // Output:
  // Found 2 queues:
  //  * a_queue
  //  * another_queue
})
Creating queues requires database administrator access.
// Just create with a name.
cqs.CreateQueue("important_stuff", function(error, queue) {
  if(!error)
    console.log("Important stuff queue is ready");
})

// Create with an options object.
var opts = { QueueName               : "important_stuff"
           , DefaultVisibilityTimeout: 3600 // 1 hour
           , browser_attachments     : true // Attach browser libs and test suite
           };
cqs.CreateQueue(opts, function(error, queue) {
  if(!error)
    console.log("Created " + queue.name + " with timeout " + queue.VisibilityTimeout);

  // Output
  // Created unimportant_stuff with timeout 3600
})
Everything is like SQS, except the message body is any JSON value.
// The convenient object API:
important_stuff.send(["keep these", "things", "in order"], function(error, message) {
  if(!error)
    console.log('Sent: ' + JSON.stringify(message.Body));

  // Output:
  // Sent: ["keep these","things","in order"]
})
// The standard API:
cqs.SendMessage(important_stuff, "This message is important!", function(error, message) {
  if(!error)
    console.log('Sent message: ' + message.Body);

  // Output:
  // Sent message: This message is important!
})
// Or, just use the queue name.
cqs.SendMessage('some_other_queue', {going_to: "the other queue"}, function(error, message) {
  if(!error)
    console.log('Message ' + message.MessageId + ' is going to ' + message.Body.going_to);

  // Output:
  // Message a9b1c48bd6ae433eb7879013332cd3cd is going to the other queue
})
Note, like the SQS API, ReceiveMessage always returns a list.
// The convenient object API:
my_queue.receive(function(error, messages) {
  if(!error)
    console.log('Received message: ' + JSON.stringify(messages[0].Body));

  // Output:
  // Received message: <the message Body>
})
// The standard API, receiving multiple messages:
cqs.ReceiveMessage(some_queue, 5, function(error, messages) {
  if(!error)
    console.log('Received ' + messages.length + ' messages');

  // Output:
  // Received <0 through 5> messages
})
When a message is "done", remove it from the queue.
// The convenient object API:
message.del(function(error) {
  // Message deletion never results in an error. If a message is successfully
  // deleted, it will simply never appear in the queue again.
  console.log('Message deleted!');
})

// The standard API:
cqs.DeleteMessage(my_message, function(error) {
  console.log('Message deleted');
})
These parameters are useful with the .defaults() method to customize CQS behavior.

couch  | URL to CouchDB
db     | Database storing the CQS queues
time_C | Coefficient of timeouts. CQS treats a delayed response as a failure. Timeout durations (default 0.5s) are multiplied by time_C.
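For example, a client talking to a slow or distant couch might raise the coefficient. This is a sketch with illustrative values:

// Illustrative values: multiply every timeout by 10 for a slow connection.
var cqs = require('cqs').defaults({ "couch" : "https://user:[email protected]"
                                  , "db"    : "cqs_queue"
                                  , "time_C": 10
                                  });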
The test suite uses node-tap. Install tap via npm install -g tap, then run tap test/tap in this project.
$ tap test/tap
ok test/tap/couch.js ............................ 12368/12368
ok test/tap/cqs.js .................................... 82/82
ok test/tap/lib.js .................................... 11/11
ok test/tap/once.js ............................. 10007/10007
ok test/tap/tap.js ...................................... 1/1
total ........................................... 22469/22469

ok
Use environment variables to set operational parameters, for example:
env couch=https://admin:[email protected] C=20 ./tests/run.js
List of variables:
cqs_couch                | URL of CouchDB; the couch API parameter
cqs_db                   | Name of the database storing the queue; the db API parameter
C or timeout_coefficient | Timeout coefficient; the time_C API parameter
exit                     | Halt all testing on a timeout error
skip_browser             | Do not attach the browser test suite Couch app
cqs_log_level            | log4js log level ("debug" and "info" are useful)
The test suite copies itself into CouchDB as a Couch app. Just visit /cqs_test/_design/CQS%2fapi_tests/test.html in your browser.
To simulate environment variables, use the URL query string, for example:
http://localhost:5984/cqs_test/_design/CQS%2fapi_tests/test.html?C=10&exit=true
I wish CQS had many more features.
CouchDB stores deleted documents indefinitely; that is what allows deletions to replicate. Unfortunately, those deleted documents accumulate, consuming disk space.
Damien describes CouchDB purging in the mailing list. Purging permanently removes documents, as if they never existed. Databases with high create/delete churn must be purged at some point. Purge operations, by design, cannot replicate; thus purging is essentially local database maintenance, done only to documents which nobody will ever miss. A final concern is that purging too often will destroy view indexes, leaving applications which use views effectively offline until the views rebuild.
The following procedure is a cooperative technique for safe, zero-impact, zero-downtime purging. Since purging is local to a database, the procedure makes some assumptions:

* The couch is the common clock; its Date headers indicate this clock's current time.
* Clients coordinate maintenance through a _local document.
In this procedure, these are the criteria for purging a document:

* It has a deleted_at timestamp older than Age (preferred), or
* It has no deleted_at field, but its update sequence is greater than Updates ago.
Thus, Age and Updates are site-specific parameters. Both are effectively a replication deadline: the documents had better finish replicating before the delete becomes Age old and before Updates subsequent changes accumulate! Choosing an age is better (24 hours or 7 days both seem reasonable); however, the update deadline is necessary to purge documents from legacy applications which use HTTP DELETE.
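As a rough sketch (not part of the CQS API; the helper and its arguments are illustrative), the criteria amount to a test like this:

// Illustrative only. `Age` is in milliseconds, `Updates` is a count of
// subsequent updates, and `update_seq` is the database's update sequence.
function purgeable(doc, seq, update_seq, Age, Updates, now) {
  if(!doc._deleted)
    return false;
  if(doc.deleted_at)                                // Preferred: timestamped delete
    return (now - new Date(doc.deleted_at)) > Age;
  return seq < (update_seq - Updates);              // Legacy HTTP DELETE, no deleted_at
}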
The procedure:
* Ping the db, noting its committed_update_seq and its Date header.
* Note the first _id and _rev returned by the view query ?reduce=false&limit=1.
* Check _info. If purge_seq changes during this loop, something is wrong. Abort.
* Check _local/maintenance, which should have an expires_at value. If expires_at is later than the Date header timestamp, abort.
* PUT _local/maintenance, and abort if the request fails. The document holds:
  * activity = "purge"
  * a started_at timestamp = now
  * an expires_at timestamp when maintenance is expected to be done (5 minutes?)
  * All work must stop once expires_at occurs.
* Gather the _ids and _revs to purge (optionally, begin this step immediately after the db ping completes). Ideas:
  * From _changes?since=0, anything with "deleted":true and seq < update_seq - Updates can purge.
  * Add &include_docs=true and check deleted_at vs. now - Age.
  * Perhaps a _changes filter to do all this server-side?
  * A bulk fetch (POST _all_docs?include_docs=true {"keys":[...]}) looking for old deleted_at values.
  * A view: if(doc._deleted && doc.deleted_at) emit(doc.deleted_at, 1);
* Purge those documents, then compact. When compact_running becomes false, the documents are gone forever. Fire an event or callback or something.
* Set expires_at = now and update _local/maintenance.
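For illustration, claiming the _local/maintenance lock could look something like the sketch below. The claim_maintenance helper and its error handling are assumptions (it uses the request package); only the field names come from the procedure above.

// A rough sketch, not part of CQS: claim the maintenance lock or abort.
var request = require('request');

function claim_maintenance(db_url, callback) {
  var url = db_url + '/_local/maintenance';
  request({url:url, json:true}, function(er, res, doc) {
    if(er) return callback(er);

    // Per the assumptions, the couch's Date header is the common clock.
    var now = new Date(res.headers.date);
    if(res.statusCode === 404)
      doc = {_id: '_local/maintenance'};
    if(doc.expires_at && new Date(doc.expires_at) > now)
      return callback(new Error('Somebody else is doing maintenance; abort'));

    doc.activity   = 'purge';
    doc.started_at = now.toISOString();
    doc.expires_at = new Date(now.getTime() + 5*60*1000).toISOString(); // Done in 5 minutes?

    request({method:'PUT', url:url, json:doc}, function(er, res) {
      if(er || res.statusCode !== 201)
        return callback(er || new Error('Lost the race for the lock; abort'));
      callback(null, doc);
    })
  })
}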