Queue API
Process jobs with async message queues and worker functions. This API is automatically available from the inside of any Codehook function.
API quick overview
Datastore.open()
Datastore.open() opens a connection to a datastore in the active project space.
import {Datastore} from 'codehooks-js'
async function myfunc() {
const conn = await Datastore.open();
// use conn to call API functions
...
}
Returns Promise / Datastore connection
app.worker(topic, handler, options)
Define a worker function that processes jobs from a queue topic. Each time a job is enqueued with a matching topic, this worker function will be invoked to process it.
Parameters
- topic: queue topic string to listen for
- handler: function
(req, res) => {}that processes the jobreq.body.payloadcontains the job data- Call
res.end()when processing is complete
- options (optional): worker configuration
- workers: number of parallel workers (on paid plans more than 1 parallel worker can be applied)
- timeout: number in milliseconds (on paid plans the execution time can be up to 10 minutes)
Code example
import { app, Datastore } from 'codehooks-js';
// Define a worker with 3 parallel workers
app.worker('sendEmail', (req, res) => {
const { email, subject, body } = req.body.payload;
console.log('Sending email to', email);
// Process the job...
res.end(); // Signal completion
}, { workers: 3 });
export default app.init();
For more details on worker queues, parallel processing, and architecture, see the Worker Queues documentation.
enqueue(topic, payload, options)
Add a queued job in a datastore. Jobs in the queue are processed by your worker function.
Parameters
- topic: queue topic string
- payload: any json data
- options: NOT IN USE
Returns Promise / JobId
Code example
import { app, Datastore } from 'codehooks-js';
// your worker function
app.worker('sendEmail', (req, res) => {
console.debug('Sending email to', req.body.payload.email);
// send email with your favorite service, e.g. Mailgun or Sendgrid
res.end(); // done processing queue item
});
app.post('/welcome', async (req, res) => {
const conn = await Datastore.open();
const jobId = await conn.enqueue('sendEmail', req.body);
res.status(200).json({
message: 'Thanks for the signup, you will receive an email soon :)',
});
});
export default app.init(); // Bind functions to the serverless runtime
Running the example with curl
curl --location --request POST 'https://<YOUR-PROJECT-NAME>.api.codehooks.io/dev/welcome' \
--header 'x-apikey: <YOUR-API-TOKEN>' \
--header 'Content-Type: application/json' \
--data-raw '{"email": "[email protected]"}'
{"message":"Thanks for the signup, you will receive an email soon :)"}
Your project name and token(s) can be retrieved by running the command 'coho info' when you are inside the project folder.
enqueueFromQuery(collection, query, topic, options)
Add multiple queued jobs from the result of a database query in a datastore.
Each object from the collection in the query result will be processed by your worker function.
If your collection has 1000 items, a {} query will add 1000 items to the queue and call your worker function 1000 times.
Parameters
- collection: datastore collection name
- query: JSON query object
- topic: queue topic string
- options (advanced optional settings for indexes and filtering)
- useIndex: indexed field name
- Indexes are created with CLI command
coho createindex
- Indexes are created with CLI command
- startIndex: jump into the index
- endIndex: where to end in the index
- limit: how many items to return
- offset: how many items to skip before returning any
- reverse: reverese scan of index, i.e. descending sort
- useIndex: indexed field name
Returns Promise / {JobId, count}
Code example
import { app, Datastore } from 'codehooks-js';
// Codehooks route hooks
app.post('/add', add);
app.worker('mailworker', mailWorker);
// add all items in a collection to the queue worker function
async function add(req, res) {
const conn = await Datastore.open();
const query = { emailConsent: true }; // all objects that matches query
const topic = 'mailworker';
const job = await conn.enqueueFromQuery('emaildata', query, topic);
console.log('Queue job', job);
res.end(job);
}
// worker function
async function mailWorker(req, res) {
let { payload } = req.body;
console.log('Mail worker', payload.email);
// fake that the job takes 100 ms
setTimeout(res.end, 100);
}
export default app.init(); // Bind functions to the serverless runtime
Running the example with curl
curl --location --request POST 'https://<YOUR-PROJECT-NAME>.api.codehooks.io/dev/add' \
--header 'x-apikey: <YOUR-API-TOKEN>'
{"count":2018,"jobId":"7c29faa2-d075-4fe6-af54-60dce6024f04"}
Monitoring Queue Health
Use the queue-status CLI command to monitor queue health, item counts, and worker activity in real-time:
# Monitor queue status in real-time
coho queue-status --follow
# View queue status in compact format
coho queue-status --format compact
# Export queue status as JSON
coho queue-status --format json
This is particularly useful for:
- Monitoring queue depths and processing rates
- Identifying stuck or slow workers
- Debugging queue-based workflows
- Verifying worker configuration
Related Topics
- Worker Queues - Detailed guide on worker queue architecture and configuration
- Workflow API - Build complex stateful workflows with queues
- CLI Tools - Command-line tools for monitoring queues