
Standard nodes

QTopology contains some already implemented spouts and bolts that are ready to use out-of-the-box.

To use such bolts and spouts, set their type to "sys" and cmd to the appropriate value - see below for the values.

List of standard spouts: file_reader, process, process-continuous, dir, test, timer, rest, get, rss.

List of standard bolts: process, date_transform, date2numeric_transform, type_transform, accumulator, attacher, transform, console, counter, filter, forward, file_append, file_append_ex, file_append_csv, router, post, get, bomb.

Base classes that can be extended with custom logic: TaskBoltBase (see "Task-bolt base" below).

File-reader spout

cmd="file_reader"

This spout reads the target file and emits the messages stored inside it.

{
    "name": "pump1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "file_reader",
    "init": {
        "file_name": "/some/file.txt",
        "file_format": "json"
    }
}

Messages can be stored in several formats:

Optional settings:

Process spout

cmd="process"

This spout behaves like the file spout - the difference is that it executes the specified command line, reads its stdout and emits messages.

{
    "name": "pump1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "process",
    "init": {
        "cmd_line": "my_executable -param1 -x -y -z",
        "file_format": "json"
    }
}

For definition of input parameters and explanation of the output handling, see file spout.

NOTE: This spout waits for the child process to finish before it emits the data. This is not suitable for large outputs or long-running processes. See "Process spout continuous" for a version that reads the data continuously.

The child process can be run repeatedly by setting run_interval to the desired number of milliseconds between runs.

{
    "name": "pump1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "process",
    "init": {
        "cmd_line": "my_executable -param1 -x -y -z",
        "run_interval": 60000,
        "file_format": "json"
    }
}

The above example will run the child process every minute and collect the results.

Process spout continuous

cmd="process-continuous"

This spout behaves like the process spout - the difference is that it spawns a child process, specified by the command line, and reads its stdout as it is written (emitting the messages as they arrive). The two most important config parameters are cmd_line (the command to be executed) and cwd (the current working directory).

Error handling

If the emit_error_on_exit flag (false by default) is set to true, the spout will emit an exception when the child process exits. Setting emit_stderr_errors to true (false by default) will emit any data read from stderr as an exception. Setting emit_parse_errors to true (the default) will emit parse exceptions; when set to false, they are silently ignored.

{
    "name": "pump1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "process-continuous",
    "init": {
        "cmd_line": "my_executable -param1 -x -y -z",
        "cwd": "./",
        "emit_parse_errors" : true,
        "emit_stderr_errors": false,
        "emit_error_on_exit" : false,
        "file_format": "json"
    }
}

For definition of input parameters and explanation of the output handling, see file spout.
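Internally, continuous stdout reading amounts to buffering partial lines between data chunks, since a chunk may end in the middle of a JSON line. A minimal sketch of that buffering logic (illustrative only, not the actual implementation):

```javascript
// Accumulates raw stdout chunks and yields complete lines.
// Any trailing partial line is carried over to the next chunk.
function makeLineSplitter() {
    let remainder = "";
    return function (chunk) {
        const text = remainder + chunk;
        const parts = text.split("\n");
        remainder = parts.pop(); // last element is an incomplete line (or "")
        return parts.filter((line) => line.length > 0);
    };
}

// Usage with a spawned child process (sketch):
// const { spawn } = require("child_process");
// const child = spawn("my_executable", ["-param1"]);
// const splitLines = makeLineSplitter();
// child.stdout.on("data", (chunk) => {
//     for (const line of splitLines(chunk.toString())) {
//         emit(JSON.parse(line)); // pass each parsed message downstream
//     }
// });
```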

Process bolt

cmd="process"

This bolt spawns a child process and communicates with it using stdin and stdout. Incoming messages are sent to the process as JSON-serialized strings (one per line) via stdin, and whenever a line is written to stdout, it is assumed to be a JSON-serialized object. The text is parsed and sent down the topology.

{
    "name": "bolt1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "process",
    "init": {
        "cmd_line": "my_executable -param1 -x -y -z"
    }
}

Optionally, a stream_id for the outgoing messages can be specified.
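A child process compatible with this protocol only needs to read JSON lines from stdin and write JSON lines to stdout. A minimal Node.js sketch (the handleMessage name and the processed field are purely illustrative):

```javascript
// Transform one incoming message into one outgoing message.
// Here we merely attach a marker field; real processing goes here.
function handleMessage(msg) {
    return Object.assign({}, msg, { processed: true });
}

// Protocol wiring (sketch): one JSON object per stdin line in,
// one JSON object per stdout line out.
// const readline = require("readline");
// const rl = readline.createInterface({ input: process.stdin });
// rl.on("line", (line) => {
//     process.stdout.write(JSON.stringify(handleMessage(JSON.parse(line))) + "\n");
// });
```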

Date-transform bolt

cmd="date_transform"

This bolt takes incoming messages and transforms predefined fields into Date objects, since dates are the only values that cannot be properly serialized to and deserialized from JSON.

{
    "name": "pump1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "date_transform",
    "init": {
        "date_transform_fields": ["field1", "field2"],
        "reuse_stream_id": true
    }
}

Using this bolt only makes sense when messages are passed in binary form.

NOTE: This bolt is obsolete; use the type_transform bolt instead.

Date-to-numeric bolt

cmd="date2numeric_transform"

This bolt takes incoming messages and transforms predefined fields from Date objects into their numeric value (Unix timestamp).

{
    "name": "pump1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "date2numeric_transform",
    "init": {
        "date_transform_fields": ["field1", "field2"],
        "reuse_stream_id": true
    }
}

Type-transform bolt

cmd="type_transform"

This bolt takes incoming messages and transforms predefined fields into Date objects, numerics or booleans. It is a successor of the date_transform bolt.

{
    "name": "bolt1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "type_transform",
    "inputs": [{ "source": "pump" }],
    "init": {
        "date_transform_fields": ["field1", "field2"],
        "date_n_transform_fields": ["field1n", "field2n"],
        "numeric_transform_fields": ["field3"],
        "bool_transform_fields": ["field4"],
        "reuse_stream_id": true
    }
}

The following settings can be used in init section:

Using this bolt only makes sense when messages are passed in binary form.
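The core conversion can be sketched roughly as follows (an illustrative sketch only, assuming date fields hold ISO strings; the date_n_transform_fields handling is omitted here):

```javascript
// Convert selected fields of a message in place:
// date_fields: ISO strings -> Date, numeric_fields: strings -> Number,
// bool_fields: "true"/true -> true, everything else -> false.
function typeTransform(msg, date_fields, numeric_fields, bool_fields) {
    for (const f of date_fields) {
        if (msg[f] !== undefined) msg[f] = new Date(msg[f]);
    }
    for (const f of numeric_fields) {
        if (msg[f] !== undefined) msg[f] = Number(msg[f]);
    }
    for (const f of bool_fields) {
        if (msg[f] !== undefined) msg[f] = (msg[f] === true || msg[f] === "true");
    }
    return msg;
}
```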

Accumulator bolt

cmd="accumulator"

This bolt takes incoming messages in GDR format and emits periodic statistics.

GDR record format is:

{
    "ts": "2018-05-20T12:34:56",
    "tags": {
        "field1": "val1",
        "field2": "val2"
    },
    "values": {
        "metric1": 433,
        "metric2": 676.343
    }
}

A bolt that emits statistics once per minute can be defined as:

{
    "name": "bolt1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "accumulator",
    "inputs": [{ "source": "pump" }],
    "init": {
        "granularity": 60000
    }
}

The result would be something like:

{
    "ts": 12340000,
    "name": "amount.field1=val1.field2=val2",
    "stats": { "min": 123, "max": 123, "avg": 123, "count": 1 }
}

Other options:

{
    "ts": 12340000,
    "tags": {
        "$name": "amount.field1=val1.field2=val2",
        "$metric": "amount",
        "field1": "val1",
        "field2": "val2"
    },
    "values": { "min": 123, "max": 123, "avg": 123, "count": 1 }
}
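The aggregation itself can be sketched as follows: group incoming GDR records by metric name plus tag values, then track min/max/avg/count per group. This is an illustrative sketch, not the actual implementation:

```javascript
// Build the composite name "metric.tag1=val1.tag2=val2" and
// accumulate per-name statistics over a batch of GDR records.
function accumulate(records) {
    const stats = {};
    for (const rec of records) {
        const tag_part = Object.keys(rec.tags).sort()
            .map((k) => k + "=" + rec.tags[k]).join(".");
        for (const metric of Object.keys(rec.values)) {
            const name = metric + (tag_part ? "." + tag_part : "");
            const val = rec.values[metric];
            const s = stats[name] || { min: val, max: val, sum: 0, count: 0 };
            s.min = Math.min(s.min, val);
            s.max = Math.max(s.max, val);
            s.sum += val;
            s.count++;
            stats[name] = s;
        }
    }
    // Finalize: replace the running sum with the average.
    return Object.keys(stats).map((name) => {
        const s = stats[name];
        return {
            name: name,
            stats: { min: s.min, max: s.max, avg: s.sum / s.count, count: s.count }
        };
    });
}
```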

Dir spout

cmd="dir"

This spout emits a message each time a file is created, changed or deleted inside some target directory.

{
    "name": "pump1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "dir",
    "init": {
        "dir_name": "/some/input/dir"
    }
}

This spout will emit these messages:

{
    "change_type":"rename",
    "file_name":"temp_file.tmp",
    "target_dir": "/some/input/dir",
    "ts":"2017-05-15T15:45:08.695Z"
}

The change type rename is sent when a file is created or deleted, while change is sent when the file content changes.

Test spout

cmd="test"

This spout emits pre-defined records. The records need to be defined in the configuration, in the tuples field.

This spout is primarily meant for testing.

{
    "name": "pump1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "test",
    "init": {
        "tuples": [
            { "ts": "2017-03-16T12:34:33.942Z", "a": 12 },
            { "ts": "2017-03-16T12:35:33.947Z", "a": 15 },
            { "ts": "2017-03-16T12:36:33.952Z", "a": 14.3 }
        ]
    }
}

This spout will emit these messages:

{ "ts": "2017-03-16T12:34:33.942Z", "a": 12 }
{ "ts": "2017-03-16T12:35:33.947Z", "a": 15 }
{ "ts": "2017-03-16T12:36:33.952Z", "a": 14.3 }

Delaying the messages

Optionally, one can use the parameters delay_start (delay before the first tuple is emitted) and delay_between (delay between two consecutive tuples) to control when tuples are emitted. The parameter values are in milliseconds.

Timer spout

cmd="timer"

This spout emits a new record on every heartbeat. The record contains title (title) and timestamp (ts) fields. Additional constant fields can be defined via configuration - these fields are simply attached to the message.

{
    "name": "pump1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "timer",
    "init": {
        "title": "some title",
        "extra_fields": {
            "field1": "a"
        }
    }
}

This spout will emit a message like this:

{
    "title": "some title",
    "ts": "2017-03-16T12:34:33.942Z",
    "field1": "a"
}

REST spout

cmd="rest"

This spout opens an HTTP server on the specified port and emits a new record on every POST request.

Incoming data is expected to be in JSON format. Optionally, a fixed stream ID can be assigned to the data tuples.

{
    "name": "pump1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "rest",
    "init": {
        "port": 6789,
        "stream_id": "SomeOptionalStreamId"
    }
}

Attacher bolt

cmd="attacher"

This bolt just attaches fixed data fields to every incoming message and forwards it on to listeners.

{
    "name": "bolt1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "attacher",
    "inputs": [
        { "source": "pump1" }
    ],
    "init": {
        "extra_fields": {
            "field1": "a"
        }
    }
}

This bolt will, upon receiving a new message like this one:

{ "previous_data": "some text" }

Emit a new message like this:

{ "previous_data": "some text", "field1": "a" }

Transform bolt

cmd="transform"

This bolt transforms incoming data object according to output_template and forwards it on to listeners.

{
    "name": "bolt1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "transform",
    "inputs": [
        { "source": "pump1" }
    ],
    "init": {
        "output_template": {
            "ts": "ts",
            "tags": {
                "country": "country",
                "browser": "user.browser"
            },
            "values": {
                "amount": "amount",
                "duration": "duration"
            }
        }
    }
}

The structure of the output object is the same as that of the template.

The values of the output data are retrieved from the source object using property names. Nested properties are referenced using parent.child.child... notation.
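The template application can be sketched as a recursive walk: leaf strings are property paths looked up in the source object, and nested objects are recursed into. An illustrative sketch, not the library's actual code:

```javascript
// Look up a dot-separated path like "user.browser" in an object.
function pick(obj, path) {
    return path.split(".").reduce((o, key) => (o == null ? undefined : o[key]), obj);
}

// Apply an output template: strings are paths into `data`,
// nested objects are applied recursively.
function applyTemplate(template, data) {
    const result = {};
    for (const key of Object.keys(template)) {
        const val = template[key];
        result[key] = (typeof val === "string") ? pick(data, val) : applyTemplate(val, data);
    }
    return result;
}
```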

This bolt will, upon receiving a new message like this one:

{
    "ts": "2017-10-01",
    "country": "SI",
    "user": { "browser": "Chrome" },
    "amount": 123.45,
    "duration": 432
}

Emit a new message like this:

{
    "ts": "2017-10-01",
    "tags": { "country": "SI", "browser": "Chrome" },
    "values": { "amount": 123.45, "duration": 432 }
}

Multiple emits are supported. Just pass an array of templates in the initialization:

{
    "name": "bolt1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "transform",
    "inputs": [{ "source": "pump1" }],
    "init": {
        "output_template": [
            { "ts": "ts" },
            { "a": "ts", "b": "name" }
        ]
    }
}

This bolt will, upon receiving a new message like this one:

{ "ts": "2017-10-01", "name": "SI" }

Emit 2 new messages like this:

{ "ts": "2017-10-01" }
{ "a": "2017-10-01", "b": "SI" }

QEWD syntax

If needed, one can use the qewd syntax (a mustache-like {{ }} syntax) and its library. Just set the option use_qewd to true:

{
    "name": "bolt1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "transform",
    "inputs": [{ "source": "pump1" }],
    "init": {
        "use_qewd": true,
        "output_template": {
            "ts": "",
            "tags": { "composite": "-" },
            "values": { "amount": "" }
        }
    }
}

This bolt will, upon receiving a new message like this one:

{
    "ts": "2017-10-01",
    "country": "SI",
    "user": { "browser": "Chrome", },
    "amount": 123.45,
    "duration": 432
}

Emit a new message like this:

{
    "ts": "2017-10-01",
    "tags": { "composite": "SI-Chrome" },
    "values": { "amount": 123.45 }
}

Console bolt

cmd="console"

This bolt just logs every incoming message to stdout and forwards it on to listeners.

{
    "name": "bolt1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "console",
    "inputs": [
        { "source": "pump1" }
    ],
    "init": {}
}

Counter bolt

cmd="counter"

This bolt counts incoming messages and outputs a single line to the log at predefined intervals. The line displays the number of messages received since the last output.

{
    "name": "bolt1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "counter",
    "inputs": [
        { "source": "pump1" }
    ],
    "init": {
        "timeout": 5000
    }
}

Filter bolt

cmd="filter"

This bolt filters incoming messages and only forwards the ones that pass its filter.

{
    "name": "bolt1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "filter",
    "inputs": [
        { "source": "pump1" }
    ],
    "init": {
        "filter": {
            "my_field1": "target_value1",
            "my_field2": ["target_value2", "target_value3"],
            "my_field3": {
                "$like": "regex expression"
            }
        }
    }
}
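The filter semantics shown above (scalar equality, array membership, $like regex) can be sketched like this (an illustrative sketch; the actual implementation may support more operators):

```javascript
// Returns true when the message matches every condition in the filter:
// scalar -> strict equality, array -> membership,
// { "$like": re } -> regular-expression test.
function matchesFilter(filter, msg) {
    return Object.keys(filter).every((field) => {
        const cond = filter[field];
        const val = msg[field];
        if (Array.isArray(cond)) return cond.indexOf(val) >= 0;
        if (cond !== null && typeof cond === "object" && cond.$like !== undefined) {
            return new RegExp(cond.$like).test(String(val));
        }
        return val === cond;
    });
}
```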

Forward bolt

cmd="forward"

This bolt forwards all incoming messages to all listeners. Useful for collecting many inputs and for broadcasting to many listeners.

{
    "name": "bolt1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "filter",
    "inputs": [
        { "source": "pump1" },
        { "source": "pump2" }
    ],
    "init": {}
}

File-append bolt

cmd="file_append"

This bolt serializes incoming messages as JSON and appends them to an output file.

{
    "name": "bolt1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "file_append",
    "inputs": [
        { "source": "pump1" }
    ],
    "init": {
        "file_name_template": "./logs/log.txt"
    }
}

This example will log all incoming data into the file log.txt inside the logs subdirectory of the current directory. The directory must already exist.

Timestamp

You can instruct the bolt to prepend a timestamp (in the local timezone, using ISO format) by setting the option prepend_timestamp to true. A new line in the log file will look something like this:

2017-05-14T15:52:07.341 {"ts":"2017-03-16T12:36:33.952Z","a":14.3}

File splitting

The log file can grow huge, so the bolt provides an option to split files after each time interval. Just set the option split_over_time to true and split_period to the desired number of milliseconds (3600000 means one hour). The bolt will create a new file for each interval (as long as there is some data to write). The timestamp of the interval will be injected into the name of the file.

For example, setting options to

{
    "file_name_template": "./log.txt",
    "split_over_time": true,
    "split_period": 3600000
}

will write data into files with names like:

log_2017_05_15T13:00:00.txt
log_2017_05_15T14:00:00.txt
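One way to derive such a name is to round the current timestamp down to the split interval and inject it before the file extension. An illustrative sketch (the real naming scheme may differ in detail):

```javascript
// Inject the start-of-interval timestamp into a file name template,
// e.g. "./log.txt" -> "./log_2017_05_15T13:00:00.txt".
function splitFileName(template, ts_millis, split_period) {
    const interval_start = ts_millis - (ts_millis % split_period);
    const stamp = new Date(interval_start).toISOString()
        .slice(0, 19)          // "2017-05-15T13:00:00" (drop millis and "Z")
        .replace(/-/g, "_");   // "2017_05_15T13:00:00"
    const dot = template.lastIndexOf(".");
    return template.slice(0, dot) + "_" + stamp + template.slice(dot);
}
```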

Splitting by field value

If you want to send data to different files depending on the value of a single field, you can use the split_by_field setting.

For example, setting options to

{
    "file_name_template": "./log.txt",
    "split_over_time": true,
    "split_period": 3600000,
    "split_by_field": "server"
}

and processing the data such as

{ "server": "server1" }
{ "server": "server2" }
{ "server": "server3" }

will write data into files with names like:

Values of this field must be filename-friendly!

Delete existing file

You can instruct the bolt to delete existing file at startup by setting option delete_existing to true. The initialization options in the config file will look something like this:

{
    "file_name_template": "./log.txt",
    "delete_existing": true
}

This option only works when split_over_time is set to false or skipped.

Compressing older files

You can instruct the bolt to compress older files by setting option compress to true. The initialization options in the config file will look something like this:

{
    "file_name_template": "./log.txt",
    "split_over_time": true,
    "split_period": 5000,
    "compress": true
}

This will cause the files to be zipped like this:

Only the current file (the one that we are still writing into) will not be compressed. The latest file will be compressed upon shutdown.

This option only works when split_over_time is set to true.

If there already exists a gzipped file with the same name (e.g. log_2017_05_15T13:00:00.txt_0.gz), a new file with an increased postfix will be created (e.g. log_2017_05_15T13:00:00.txt_1.gz).

File-append bolt extended

cmd="file_append_ex"

This bolt is in essence very similar to the file_append bolt, but, as the example below shows, it additionally supports reading the timestamp used for splitting from a field of the data (the timestamp_field setting) instead of the current time:

{
    "name": "bolt1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "file_append_ex",
    "inputs": [
        { "source": "pump1" }
    ],
    "init": {
        "file_name_template": "./log.txt",
        "split_period": 60000,
        "split_by_field": "server",
        "timestamp_field": "ts"
    }
}

CSV file-append bolt

cmd=file_append_csv

This bolt serializes incoming messages and appends them to an output file in CSV format.

{
    "name": "bolt1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "file_append_csv",
    "inputs": [
        { "source": "pump1" }
    ],
    "init": {
        "file_name": "./logs/log.txt",
        "delete_existing": true,
        "delimiter": ",",
        "fields": ["ts", "tags.name", "values.value1"],
        "header": "ts,name,value"
    }
}

This example will log all incoming data into the file log.txt inside the logs subdirectory of the current directory. The directory must already exist.

The output will be written in CSV format. The parameters are the following:

file_name - name of the output file
delete_existing - when true, an existing output file is deleted at startup
delimiter - field delimiter to use
fields - list of fields to export; nested fields are referenced using dot notation
header - header line written at the top of the file
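Producing one CSV line from a message then amounts to resolving each configured field path and joining the values with the delimiter. An illustrative sketch (no quoting or escaping is handled here):

```javascript
// Resolve a dot-separated path like "tags.name" in a message.
function pick(obj, path) {
    return path.split(".").reduce((o, key) => (o == null ? undefined : o[key]), obj);
}

// Build one CSV line from a message, given the configured fields.
function toCsvLine(msg, fields, delimiter) {
    return fields.map((f) => {
        const val = pick(msg, f);
        return val === undefined ? "" : String(val);
    }).join(delimiter);
}
```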

Router bolt

cmd="router"

This bolt routes incoming messages to separate stream ids, depending on the route filters. If filters for several routes succeed, the message is sent to all stream ids.

{
    "name": "bolt1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "router",
    "inputs": [
        { "source": "pump1" }
    ],
    "init": {
        "routes": {
            "stream1": {
                "field1": "a"
            },
            "stream2": {
                "field1": "b"
            },
            "stream3": {
                "field1": ["a", "c"]
            }
        }
    }
}
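Routing uses the same kind of matching rules as the filter bolt: a message is emitted on every stream whose route filter matches. An illustrative sketch:

```javascript
// Returns true when msg matches the route filter
// (scalar equality or array membership, as in the filter bolt).
function matchesRoute(route, msg) {
    return Object.keys(route).every((field) => {
        const cond = route[field];
        return Array.isArray(cond) ? cond.indexOf(msg[field]) >= 0 : msg[field] === cond;
    });
}

// Collect all stream ids whose route filter matches the message.
function routeStreams(routes, msg) {
    return Object.keys(routes).filter((stream_id) => matchesRoute(routes[stream_id], msg));
}
```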

POST bolt

cmd="post"

This bolt sends a POST request to the specified url (fixed or provided inside the data) and forwards the response. The result of the call will be emitted into the topology as a message with a single property body that contains the body of the response.

Sending using fixed URL

{
    "name": "bolt1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "post",
    "inputs": [
        { "source": "pump1" }
    ],
    "init": {
        "url": "http://my.server.com/api"
    }
}

This way the bolt will send the complete, unmodified incoming message to the specified URL.

Sending using dynamic URL

{
    "name": "bolt1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "post",
    "inputs": [
        { "source": "pump1" }
    ],
    "init": { }
}

This way the bolt will send the body property of the incoming message to the URL specified by the url property:

{
    "url": "http://my.server.com/api2",
    "body": {
        "a": true,
        "b": 12
    }
}

GET bolt

cmd="get"

This bolt sends a GET request without any request body to the specified url (fixed or provided inside the data) and forwards the response.

The result of the call will be emitted into topology, with a single property body that will contain the body of the response.

GET bolt - Sending using fixed URL

{
    "name": "bolt1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "get",
    "inputs": [
        { "source": "pump1" }
    ],
    "init": {
        "url": "http://my.server.com/api"
    }
}

GET bolt - Sending using dynamic URL

{
    "name": "bolt1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "post",
    "inputs": [
        { "source": "pump1" }
    ],
    "init": { }
}

This way the bolt will send the GET request to the URL specified by the url property:

{
    "url": "http://my.server.com/api2"
}

GET spout

cmd="get"

This spout sends a GET request without any request body to the specified url (fixed or provided inside the data) at regular time intervals and forwards the response.

The result of the call will be emitted into the topology after each time interval (the repeat parameter), with a single property body that will contain the body of the response.

{
    "name": "spout1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "get",
    "init": {
        "url": "http://my.server.com/api",
        "repeat": 60000,
        "stream_id": "SomeStreamId"
    }
}

RSS spout

cmd="rss"

This spout retrieves content of specified RSS feed in regular time intervals and forwards the parsed response.

The result of the call will be emitted into the topology after each time interval (the repeat parameter).

{
    "name": "spout1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "rss",
    "init": {
        "url": "http://my.source.com/rss",
        "repeat": 60000,
        "stream_id": "SomeStreamId"
    }
}

Bomb bolt

This bolt is used for testing the disaster recovery of the topology. After a predefined time interval it causes an exception that propagates to the root of the process.

{
    "name": "bolt1",
    "working_dir": ".",
    "type": "sys",
    "cmd": "bomb",
    "inputs": [],
    "init": {
        "explode_after": 10000
    }
}

This bolt will cause an exception 10 seconds after its init method was called.

The bolt can have inputs; it will just forward the data on to listeners, preserving the stream_id.

Task-bolt base

This bolt base is used for bolts that need to execute some functionality at predefined intervals (similar to CRON jobs). The developer needs to override the runInternal method with the custom functionality; other features are taken care of by the base class - e.g. parsing of settings, calling the custom code at predefined intervals, shutdown handling etc.

Custom code:

const qt = require("qtopology");

class CustomTaskBolt extends qt.TaskBoltBase {

    constructor() {
        super();
        this.custom_text = null;
    }

    init(name, config, context, callback) {
        let self = this;
        super.init(name, config, context, (err) => {
            if (err) return callback(err);
            self.custom_text = config.text;
            callback();
        })
    }

    runInternal(callback) {
        let self = this;
        console.log("Custom output from task bolt (1): " + self.custom_text);
        callback();
    }
}

/////////////////////////////////////////////////////////
exports.create = function () {
    return new CustomTaskBolt();
};

Bolt configuration inside the topology:

{
    "name": "task_bolt",
    "working_dir": ".",
    "type": "inproc",
    "cmd": "custom_task.js",
    "inputs": [],
    "init": {
        "repeat_after": 5000,
        "text": "Some custom text from config"
    }
}