JMS Integration

Bpipe supports some integration with JMS, currently through ActiveMQ and Amazon SQS.

This support comes in the form of both inbound and outbound messages:

  • Pipelines can be configured to run on receipt of an inbound message
  • Outbound messages can be sent with the send command

The Bpipe Agent

To run pipelines in response to messages, configure a Bpipe Agent.

The agent runs permanently in the background on a system and listens to the configured queue. It is configured through the usual bpipe.config file, in a dedicated agent section. For example, for ActiveMQ:

agent {
    commandQueue='run_pipeline_queue'
    responseQueue='bpipe_results'
    brokerURL='tcp://activemq.server.com:61616'
}

Once you have set up the configuration, you can start the agent in the local directory like so:

bpipe agent -v -n 1

The above arguments turn on verbose mode (-v) and limit the number of messages to process at once to 1 (-n 1).

The agent expects messages to arrive in a specific format containing instructions to define the bpipe command to execute. The format is a JSON payload with the following elements:

    {
        "id": 0,
        "command": <bpipe command>,
        "arguments": [
            <argument 1>,
            <argument 2>,
            ....
        ],
        "directory": <directory to run command in>
    }

The id in the command is for your own reference and can be hard coded to 0 if you do not need it.

An example of a run command could look like this:

    {
        "id": 0,
        "command": "run",
        "arguments": [
            "test.groovy",
            "test.txt",
            "test2.txt"
        ],
        "directory": "/home/user/test_pipelines"
    }
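
On the sending side, a message in this format can be assembled with any JSON library. The following is a minimal Python sketch; the helper name build_command is hypothetical and is not part of Bpipe, and delivering the resulting text to the queue is left to whichever messaging client you use.

```python
import json


def build_command(command, arguments, directory, msg_id=0):
    """Return the JSON text for a Bpipe agent command message."""
    if not isinstance(arguments, (list, tuple)):
        raise TypeError("arguments must be a list of strings")
    payload = {
        "id": msg_id,                              # for your own reference only
        "command": command,                        # e.g. "run"
        "arguments": [str(a) for a in arguments],  # passed to the bpipe command
        "directory": directory,                    # where the command is run
    }
    return json.dumps(payload, indent=4)


# Reproduces the example run command shown above.
print(build_command("run", ["test.groovy", "test.txt", "test2.txt"],
                    "/home/user/test_pipelines"))
```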

Transforming Inbound Messages

If you need Bpipe to respond to messages that are not in this format, you can transform them with an adapter. The adapter is a Groovy closure that performs the transformation, set through the transform attribute inside the agent configuration section. For example:

    transform = { message ->
        [
            id: 0,
            command: "run",
            arguments: [
                "-p",
                "sample=$message.sample",
                "-p",
                "batch=$message.batch",
                "-p",
                "run_id=$message.run_id",
                "process_sample.groovy",
                message.sample
            ],

            directory: '/usr/local/batch_processing'
        ]
    }

Note that the message payload is still expected to be JSON, and is pre-parsed into a Java object structure (Map, List, etc.) before it is passed to the closure. The transform must return a Map / List structure equivalent to the JSON format of the standard message, although it is written as Groovy code.
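
To make the mapping concrete, the Python sketch below mirrors the closure above and applies it to a sample inbound message. It is an illustration only: the real transform is the Groovy closure in bpipe.config, and the field values S1, B7 and R42 are hypothetical.

```python
import json


def transform(message):
    """Python mirror of the Groovy transform closure (illustration only)."""
    return {
        "id": 0,
        "command": "run",
        "arguments": [
            "-p", f"sample={message['sample']}",
            "-p", f"batch={message['batch']}",
            "-p", f"run_id={message['run_id']}",
            "process_sample.groovy",
            message["sample"],
        ],
        "directory": "/usr/local/batch_processing",
    }


# A hypothetical inbound message, already parsed from JSON.
inbound = {"sample": "S1", "batch": "B7", "run_id": "R42"}
print(json.dumps(transform(inbound), indent=4))
```

The result has the same four elements (id, command, arguments, directory) as the standard message format.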

Monitoring Status - Ping Message

The Bpipe agent recognises a special message where the body is composed only of the word ping. When this message is received, it will respond by sending a message to the queue specified in the JMSReplyTo or reply-to header. This message will have a JSON body containing information about the agent, and receiving it can be used to confirm that the agent is alive and processing messages.
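
One way to send such a ping without a JMS client is over STOMP, which ActiveMQ can expose alongside its native protocol. The sketch below only builds the SEND frame. It assumes the broker has a STOMP connector enabled, that the command queue is named as in the configuration above, and that agent_status is a reply queue you have chosen (a hypothetical name). ActiveMQ maps the STOMP reply-to header to JMSReplyTo.

```python
def stomp_send_frame(destination, body, headers=None):
    """Build a STOMP SEND frame: command, headers, blank line, body, NUL byte."""
    all_headers = {"destination": destination}
    all_headers.update(headers or {})
    lines = ["SEND"]
    for name, value in all_headers.items():
        # Keep the sketch simple: refuse values that would need escaping.
        if "\n" in name or "\n" in value or ":" in name:
            raise ValueError(f"invalid header: {name!r}")
        lines.append(f"{name}:{value}")
    return ("\n".join(lines) + "\n\n" + body).encode("utf-8") + b"\x00"


ping = stomp_send_frame(
    "/queue/run_pipeline_queue",
    "ping",
    {"reply-to": "/queue/agent_status"},  # hypothetical reply queue
)
print(ping)
```

A real client would first open a socket to the broker's STOMP port, send a CONNECT frame and wait for CONNECTED before sending this frame, then subscribe to the reply queue to read the agent's answer.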

Using Response Messages

If you are integrating Bpipe into a larger system, it is likely that you will want to know the status of the pipeline that ran (whether it succeeded or failed, how far it has progressed, and so on).

To support these use cases, the Bpipe Agent will send messages to a "response queue". This can be set to a fixed value in the agent configuration:

agent {
    commandQueue='run_pipeline_queue'
    responseQueue='bpipe_results' // send output to JMS queue called bpipe_results
    brokerURL='tcp://activemq.server.com:61616'
}

Alternatively, Bpipe respects the JMS reply-to header ("reply-to" or "JMSReplyTo"), so another strategy is to leave responseQueue unset and set the reply queue on the appropriate messages. This latter strategy has the advantage that you can use different response queues for different messages.

Reply Modes

The Bpipe Agent supports several "reply modes" which contain different amounts of detail. To select one, configure the outputMode setting in the agent section of the bpipe.config file:

agent {
    commandQueue='run_pipeline_queue'
    responseQueue='bpipe_results'
    brokerURL='tcp://activemq.server.com:61616'
    outputMode="stream"
}

The outputMode can be set to the following values:

  • none : do not send anything
  • stream : send full pipeline output, streamed in chunks
  • reply : send a single message at the end, containing status
  • both : send both streaming output and a final reply message

Handling Pipeline Completion

It can be useful to coordinate downstream actions when a pipeline finishes running. For this purpose, Bpipe observes the reply-to or JMSReplyTo property of messages. When a pipeline initiated by the agent completes and one of these properties is set, Bpipe sends a reply message to the corresponding queue. If the original message carried a correlation id, the reply carries the same correlation id.

This capability is designed to interoperate with frameworks such as Apache Camel which can route messages through predefined workflows using this system. For example, a Camel route could be defined using the Groovy DSL to run a pipeline in response to a message and then process the results:

from('activemq:analyse_file')
.transform { e, c ->
    groovy.json.JsonOutput.toJson(
        "command" : "run",
        "arguments": [
            "pipeline/batch.groovy",
            e.in.body // the file to analyse
        ],
        "directory": "/some/path/on/your/system"
    )
}
.inOut()
    .to('activemq:run_bpipe?requestTimeout=7200000') // 2 hour timeout, in milliseconds
.inOnly()
    .process { e ->
        println "The results from the pipeline were: ${e.in.body}"
    }

Note that inOut automatically sets the correlation id and reply-to headers and waits for the reply. In this case, the Bpipe agent would be configured to listen on the run_bpipe queue.
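
If you are not using a framework that handles correlation for you, the same bookkeeping can be done by hand: generate a correlation id for each command, remember it, and look it up when a reply arrives. The Python sketch below shows only that bookkeeping. The class name PendingRequests and the description strings are hypothetical, and sending and receiving the messages is left to your messaging client.

```python
import uuid


class PendingRequests:
    """Track commands sent to the agent and match replies by correlation id."""

    def __init__(self):
        self._pending = {}

    def register(self, description):
        """Create a correlation id for an outgoing command and remember it."""
        correlation_id = str(uuid.uuid4())
        self._pending[correlation_id] = description
        return correlation_id

    def resolve(self, correlation_id):
        """Return the description of the matching request, or None if unknown."""
        return self._pending.pop(correlation_id, None)


pending = PendingRequests()
cid = pending.register("analyse sample_01")
# ... send the command with cid as its correlation id and a reply-to queue set ...
# Later, when a reply carrying the same correlation id arrives:
print(pending.resolve(cid))
```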

Configuring Security

To have Bpipe authenticate with a username and password when creating the connection, add these properties to the configuration:

agent {
    commandQueue='run_pipeline_queue'
    responseQueue='bpipe_results'
    brokerURL='tcp://activemq.server.com:61616'
    username='myuser'
    password='secretpassword'
}

If you prefer not to hard code the password into your configuration, you can use regular Groovy language features to resolve it in another way. For example, to read it from an environment variable:

agent {
    ....
    password=System.getenv('ACTIVEMQ_PASSWORD')
}

The same configuration properties are applicable when configuring ActiveMQ as a notification channel.

AWS SQS

SQS is an AWS-hosted queuing service that enables smooth cloud integration with messaging services. Bpipe supports both notifications and Bpipe agent connections through SQS.

To configure notifications to SQS, add an AWSSQS section to the notifications configuration in the bpipe.config file:

notifications {
    AWSSQS {
        queue='my-test-queue'
        region='ap-southeast-2'
        accessKey = "..."
        accessSecret = "..."
        events=''
    }
}

Similarly, to run the agent using SQS, specify the type as sqs in the agent configuration block:

agent {
    type='sqs'
    commandQueue='my-test-queue'
    region='ap-southeast-2'
    accessKey = "..."
    accessSecret = "..."
}

Note: for SQS integration, an AWS profile name can be specified instead of directly including the key and secret. This will cause Bpipe to attempt to read the user's ~/.aws/credentials file to locate the corresponding profile. To do this, specify a profile attribute instead of the accessKey and accessSecret details.

Example:

agent {
    type='sqs'
    commandQueue='my-test-queue'
    region='ap-southeast-2'
    profile='default'
}
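
For reference, the AWS credentials file is an INI-style file with one section per profile, each holding an aws_access_key_id and an aws_secret_access_key entry. The Python sketch below shows what looking up a profile involves. It is not Bpipe's implementation, the function name is hypothetical, and the key values are placeholders.

```python
import configparser


def read_aws_profile(credentials_text, profile="default"):
    """Return (access_key, secret_key) for a profile in AWS credentials-file text."""
    # Interpolation is disabled so that a '%' in a secret is read literally.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(credentials_text)
    if not parser.has_section(profile):
        raise KeyError(f"profile not found: {profile}")
    section = parser[profile]
    return section["aws_access_key_id"], section["aws_secret_access_key"]


# Placeholder content in the shape of ~/.aws/credentials.
example = """
[default]
aws_access_key_id = EXAMPLE_KEY_ID
aws_secret_access_key = example/secret+value
"""

key, secret = read_aws_profile(example, "default")
print(key)
```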

Listening to Multiple Queues

You may wish to have a single agent listen to more than one queue, and have these process jobs independently. You can achieve this by adding an agents block in the configuration and then adding each agent you wish to configure.

Concurrency can be configured separately for each agent using a concurrency setting.

Here is an example of an agent configured to listen to two queues, one of which is treated as a priority queue and has a higher concurrency:

agents {

    main {
        commandQueue='run_pipeline'
        brokerURL='tcp://activemq.server.com:61616'
        concurrency=2
    }

    priority {
        commandQueue='priority_pipeline'
        brokerURL='tcp://activemq.server.com:61616'
        concurrency=4
    }
}

Selectors

You can perform content-based routing for your agent by using JMS "message selectors". This allows you to select which messages to respond to using SQL-like syntax that is applied to headers, properties and other attributes of the message.

This example shows how we can achieve a similar result to the last example but using only a single queue to process messages at two different priority levels. The different priorities select on the standard JMSType header which can be set by the sender.

agents {

    def defaults = {
        commandQueue='run_analysis'
        brokerURL='tcp://activemq.server.com:61616'
    }

    main {
        defaults()
        messageSelector = "JMSType = 'normal'"
    }

    priority {
        defaults()
        concurrency=4
        messageSelector = "JMSType = 'priority'"
    }
}
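
To illustrate how the two selectors above divide the messages, the Python sketch below evaluates selectors of the single form Header = 'value' against a message's headers. It is a toy model of the broker's behaviour for this one form only. Real JMS selectors support a much richer SQL-like syntax and are evaluated by the broker, not by Bpipe or by your code.

```python
import re

# Agent name -> selector, copied from the configuration above.
SELECTORS = {
    "main": "JMSType = 'normal'",
    "priority": "JMSType = 'priority'",
}

# Matches only the simple form:  Header = 'value'
_SIMPLE = re.compile(r"^\s*(\w+)\s*=\s*'([^']*)'\s*$")


def matches(selector, headers):
    """Evaluate a simple equality selector against a dict of message headers."""
    m = _SIMPLE.match(selector)
    if m is None:
        raise ValueError(f"unsupported selector: {selector}")
    name, expected = m.groups()
    return headers.get(name) == expected


def agents_for(headers):
    """Return the names of the agents whose selector accepts the message."""
    return [name for name, sel in SELECTORS.items() if matches(sel, headers)]


print(agents_for({"JMSType": "priority"}))
print(agents_for({"JMSType": "normal"}))
print(agents_for({}))
```

The last case is worth noting: a message sent without a JMSType matches neither selector, so with this configuration no agent would pick it up.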

Specifying the Path to Bpipe

By default, the agent runs commands using the same Bpipe installation that launched the agent. You can select a different installation on a per-agent basis using the bpipeHome setting:

agents {

    main {
        commandQueue='run_pipeline'
        brokerURL='tcp://activemq.server.com:61616'
        messageSelector = "JMSType = 'main'"
    }

    legacy {
        commandQueue='run_pipeline'
        brokerURL='tcp://activemq.server.com:61616'
        messageSelector = "JMSType = 'legacy'"
        bpipeHome = '/opt/bpipe/0.9.1'
    }
}

This example supports a "legacy" option that runs an old version of Bpipe when the JMSType header of the message is set to "legacy".