ADVANCED
Message Queue Integration
Test message queue systems like ActiveMQ, RabbitMQ, and JMS using Karate's listen keyword and karate.signal() for async event handling. The pattern works with any queue system through custom Java wrappers.
On this page:
- Listen and Signal - Core async pattern for queue testing
- Queue Consumer Pattern - Java wrapper integration
- Message Filtering - Handle specific messages
- Correlation - Track related messages
- Timeouts - Handle missing messages
Listen and Signal
The listen keyword waits for an async event, and karate.signal() triggers completion. The listenResult variable holds the signaled value.
Feature: Basic async pattern
Scenario: Wait for async event
# Setup handler that signals when message arrives
* def handler = function(msg){ karate.signal(msg) }
# Trigger some async operation
Given url 'https://jsonplaceholder.typicode.com'
And path 'posts'
And request { title: 'Test', body: 'Content', userId: 1 }
When method post
Then status 201
# Wait for async event (timeout in milliseconds)
* listen 5000
* def result = listenResult
* match result == '#notnull'
The listen keyword blocks until either:
karate.signal(value)is called (from any thread)- The timeout expires (listenResult will be
null)
Queue Consumer Pattern
The core pattern involves a custom Java class that wraps your message queue client. The Java class calls back into Karate when messages arrive.
Feature: ActiveMQ queue testing
Background:
# Load custom Java queue consumer class
* def QueueConsumer = Java.type('mock.contract.QueueConsumer')
* def queue = new QueueConsumer(queueName)
# Create handler that signals Karate when message arrives
* def handler = function(msg){ karate.signal(msg) }
# Register handler - karate.toJava() converts JS function to Java Function
* queue.listen(karate.toJava(handler))
* url paymentServiceUrl + '/payments'
Scenario: Create payment and verify queue message
Given request { amount: 5.67, description: 'test one' }
When method post
Then status 200
And match response == { id: '#number', amount: 5.67, description: 'test one' }
And def id = response.id
# Wait for message on queue
* listen 5000
* json shipment = listenResult
* match shipment == { paymentId: '#(id)', status: 'shipped' }
JavaScript functions must be wrapped with karate.toJava() when passed to Java code. This converts the JS function to a Java Function interface that your queue consumer can call.
Java Consumer Wrapper
Your Java wrapper class handles the queue-specific connection logic and calls the handler function when messages arrive:
public class QueueConsumer {
private final MessageConsumer consumer;
public QueueConsumer(String queueName) throws Exception {
// Queue-specific connection setup
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
this.consumer = session.createConsumer(queue);
}
public void listen(Function<String, Void> handler) {
consumer.setMessageListener(msg -> {
try {
String text = ((TextMessage) msg).getText();
handler.apply(text); // Calls back to karate.signal()
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}
For better performance with multi-threaded code, consider using a Java factory method that returns a Function directly instead of wrapping JavaScript functions:
public static Function<String, Void> createHandler(Object karateSignal) {
return msg -> { /* call karate.signal */ return null; };
}
Message Filtering
Filter messages in your handler to wait for specific events:
Feature: Filtered message handling
Background:
* def QueueConsumer = Java.type('com.mycompany.QueueConsumer')
* def orderQueue = new QueueConsumer('order-events')
* def targetOrderId = null
# Handler filters for specific order ID
* def orderHandler = function(msg) {
var order = JSON.parse(msg);
if (order.id == targetOrderId) {
karate.signal(order);
}
}
* orderQueue.listen(karate.toJava(orderHandler))
Scenario: Wait for specific order event
# Create order via API
Given url apiUrl + '/orders'
And request { customerId: 123, items: [{ sku: 'WIDGET-001', quantity: 2 }] }
When method post
Then status 201
* targetOrderId = response.id
# Wait for matching order event
* listen 10000
* def orderEvent = listenResult
* match orderEvent.id == targetOrderId
* match orderEvent.status == 'PENDING'
Message Correlation
Track related messages across distributed workflows using correlation IDs:
Feature: Correlated message handling
Background:
* def correlationId = java.util.UUID.randomUUID().toString()
* def messages = []
* def QueueConsumer = Java.type('com.mycompany.QueueConsumer')
* def consumer = new QueueConsumer('workflow-events')
# Collect all messages with matching correlation ID
* def correlationHandler = function(msg) {
var event = JSON.parse(msg);
if (event.correlationId == correlationId) {
messages.push(event);
if (messages.length >= 3) {
karate.signal(messages);
}
}
}
* consumer.listen(karate.toJava(correlationHandler))
Scenario: Multi-step workflow with correlation
# Start workflow with correlation ID
Given url apiUrl + '/workflows'
And request { correlationId: '#(correlationId)', steps: ['validate', 'process', 'notify'] }
When method post
Then status 202
# Wait for all correlated events
* listen 20000
* def workflowEvents = listenResult
* match workflowEvents == '#[3]'
* match each workflowEvents[*].correlationId == correlationId
Handling Timeouts
Always check for null when listen times out:
Feature: Timeout handling
Scenario: Handle missing message
* def QueueConsumer = Java.type('com.mycompany.QueueConsumer')
* def queue = new QueueConsumer('test-queue')
* def handler = function(msg){ karate.signal(msg) }
* queue.listen(karate.toJava(handler))
# Trigger async operation
Given url apiUrl + '/trigger'
When method post
# Wait with timeout
* listen 5000
# Check if message arrived
* if (listenResult == null) karate.fail('Timeout waiting for queue message')
# Safe to use result
* match listenResult.status == 'completed'
Timeout Guidelines
| Processing Type | Recommended Timeout |
|---|---|
| Real-time notifications | 2-5 seconds |
| Order processing | 10-15 seconds |
| Heavy processing (video, reports) | 30-60 seconds |
| Batch jobs | 2-5 minutes |
Dead Letter Queue Testing
Verify failed messages route to dead letter queues:
Feature: Dead letter queue validation
Background:
* def DLQConsumer = Java.type('com.mycompany.QueueConsumer')
* def dlq = new DLQConsumer('DLQ.failed-messages')
* def dlqHandler = function(msg){ karate.signal(JSON.parse(msg)) }
* dlq.listen(karate.toJava(dlqHandler))
Scenario: Invalid message goes to DLQ
# Send invalid message that will fail processing
Given url apiUrl + '/messages'
And request { invalid: 'data', missing: 'required-fields' }
When method post
Then status 202
# Verify message ends up in DLQ
* listen 15000
* def dlqMessage = listenResult
* match dlqMessage.error == '#string'
* match dlqMessage.originalMessage == '#present'
Test Isolation
Use unique queue names per test to prevent interference:
Feature: Isolated queue tests
Background:
# Unique queue name per test run
* def testId = java.util.UUID.randomUUID().toString()
* def queueName = 'test-queue-' + testId
* def QueueConsumer = Java.type('com.mycompany.QueueConsumer')
* def consumer = new QueueConsumer(queueName)
* def handler = function(msg){ karate.signal(msg) }
* consumer.listen(karate.toJava(handler))
Scenario: Isolated test
Given url apiUrl + '/send-to-queue'
And request { queueName: '#(queueName)', message: 'test data' }
When method post
* listen 5000
* match listenResult == 'test data'
Next Steps
- WebSocket testing: WebSocket Testing
- Async polling patterns: Polling and Async
- Java integration: Java API
- Conditional logic: Conditional Logic