var sys = require('util'); var stomp = require('stomp'); /* * Subscribe to 2 different topics on the same connection. */ var stomp_args = { port: 61613, host: process.argv[2] ? process.argv[2] : 'localhost', debug: false, login: "xtomp", passcode: "passcode", }; var testAMessage = "TestA\n" + process.env.TEST_DATE + "\n123"; var testBMessage = "TestB\n" + process.env.TEST_DATE + "\n123"; var errorHandler = function(error_frame) { console.log("error"); console.log("headers: " + sys.inspect(error_frame.headers)); console.log("body: " + error_frame.body); publisher.disconnect(); subscriber.disconnect(); setTimeout(function() { process.exit(1); }, 10); }; var subscriber = new stomp.Stomp(stomp_args); var publisher = new stomp.Stomp(stomp_args); // SUBSCRIBER - start subscriber.connect(); subscriber.on("connected", function() { subscriber.subscribe({ destination: "memtop-a", id : "1", ack: "client", receipt: "sub-a" }); subscriber.subscribe({ destination: "memtop-b", id : "2", ack: "client", receipt: "sub-b" }); }); subscriber.on("receipt", function(id) { if (id === "sub-a") { subscriber.subscribedA = true; } else if (id === "sub-b") { subscriber.subscribedB = true; } else if (id === "unsub") { subscriber.disconnect(); } else console.error("unexpected receipt"); }); subscriber.on("message", function(message) { if ( message.headers.destination === "memtop-a" && "" + message.body === testAMessage ) { subscriber.recvA = true; } if ( message.headers.destination === "memtop-b" && "" + message.body === testBMessage ) { subscriber.recvB = true; } if ( subscriber.recvA && subscriber.recvB ) { subscriber.disconnect(); setTimeout(function() { process.exit(0); }, 10); } }); subscriber.on("error", errorHandler); // SUBSCRIBER - end // PUBLISHER - start var publisher = new stomp.Stomp(stomp_args); publisher.connect(); publisher.on("connected", function() { if ( subscriber.subscribedA && subscriber.subscribedB ) { publisher.send({ "destination" : "memtop-a", "body" : testAMessage, "receipt" : "send-a" }); publisher.send({ "destination" : "memtop-b", "body" : testBMessage, "receipt" : "send-b" }); } else { setTimeout(function() { publisher.send({ "destination" : "memtop-a", "body" : testAMessage, "receipt" : "send-a" }); publisher.send({ "destination" : "memtop-b", "body" : testBMessage, "receipt" : "send-b" }); }, 100); } }); publisher.on("receipt", function(id) { if (id === "send-a") { publisher.sentA = true; } if (id === "send-a") { publisher.sentB = true; } if ( publisher.sentA && publisher.sentB ) { publisher.disconnect(); } }); publisher.on("error", errorHandler); // PUBLISHER - end process.on("SIGINT", function() { publisher.disconnect(); subscriber.disconnect(); }); setTimeout(function() { process.exit(1); }, 1000);