var sys = require('util');
var stomp = require('stomp');

/*
 * Test hash filter happy path, that messages are filtered correctly.
 *
 * Two subscribers subscribe to the same destination, each with a different
 * `filter` header ("grp=1" and "grp=2"). A publisher then sends one message
 * per group. Each subscriber checks that the message it receives carries the
 * body intended for its own group; any other body is treated as a failure.
 *
 * Usage: node <this-file> [host]      (host defaults to 'localhost')
 *
 * On failure the script logs the problem and exits with status 1 after a
 * short delay; on success every client disconnects after its receipts arrive.
 */

var stomp_args = {
    port: 61613,
    host: process.argv[2] || 'localhost',
    debug: false,
    login: "xtomp",
    passcode: "passcode",
};

var destination = "memtop-f";
var testMessage1 = "Testing01";
var testMessage2 = "Testing02";

// How often, and how many times, the publisher re-checks that both
// subscriptions have been confirmed before it gives up and fails the test.
var SUBSCRIBE_POLL_MS = 100;
var SUBSCRIBE_MAX_POLLS = 50;

var subscriber1 = new stomp.Stomp(stomp_args);
var subscriber2 = new stomp.Stomp(stomp_args);
var publisher = new stomp.Stomp(stomp_args);

/*
 * Logs the given error frame, then disconnects all three clients and exits
 * with status 1 after a 2 second delay.
 *
 * error_frame: object whose `headers` and `body` properties are logged; the
 *              message handlers pass an empty object, in which case both are
 *              printed as undefined.
 */
function errorHandler(error_frame) {
    console.log("error");
    console.log("headers: " + sys.inspect(error_frame.headers));
    console.log("body: " + error_frame.body);
    setTimeout(function() {
        publisher.disconnect();
        subscriber1.disconnect();
        subscriber2.disconnect();
        process.exit(1);
    }, 2000);
}

/*
 * Connects a subscriber client and wires up its event handlers.
 *
 * client:       the stomp client to start
 * id:           subscription id sent in the SUBSCRIBE / UNSUBSCRIBE frames
 * filter:       value of the `filter` header sent with the subscription
 * expectedBody: the only message body this subscriber accepts
 *
 * Sets `client.subscribed = true` once the "sub" receipt arrives; the
 * publisher waits on that flag before sending.
 */
function startSubscriber(client, id, filter, expectedBody) {
    client.connect();

    client.on("connected", function() {
        client.subscribe({
            destination: destination,
            id: id,
            ack: "client",
            filter: filter,
            receipt: "sub"
        });
    });

    client.on("receipt", function(receiptId) {
        if (receiptId === "sub") {
            client.subscribed = true;
        } else if (receiptId === "unsub") {
            client.disconnect();
        } else {
            console.error("unexpected receipt");
        }
    });

    client.on("message", function(message) {
        //console.log("grp=" + message.headers.grp);
        if (expectedBody !== "" + message.body) {
            console.error("wrong msg");
            console.error("want:" + expectedBody);
            console.error("got:" + message.body);
            errorHandler({});
            return;
        }
        client.ack(message.headers["message-id"]);
        client.unsubscribe({
            destination: destination,
            id: id,
            receipt: "unsub"
        });
    });

    client.on("error", errorHandler);
}

/*
 * Sends one message per group, each requesting its own receipt.
 */
function sendTestMessages() {
    publisher.send({
        "destination": destination,
        "grp": "1",
        "body": testMessage1,
        "receipt": "send1"
    });
    publisher.send({
        "destination": destination,
        "grp": "2",
        "body": testMessage2,
        "receipt": "send2"
    });
}

/*
 * Sends the test messages as soon as both subscribers have flagged their
 * subscription as confirmed. Re-checks every SUBSCRIBE_POLL_MS; when
 * `pollsLeft` runs out the test is failed through errorHandler instead of
 * publishing to subscriptions that are not known to exist.
 */
function publishWhenSubscribed(pollsLeft) {
    if (subscriber1.subscribed && subscriber2.subscribed) {
        sendTestMessages();
        return;
    }
    if (pollsLeft <= 0) {
        errorHandler({
            headers: {},
            body: "timed out waiting for subscriptions to be confirmed"
        });
        return;
    }
    setTimeout(function() {
        publishWhenSubscribed(pollsLeft - 1);
    }, SUBSCRIBE_POLL_MS);
}

// SUBSCRIBER1 - start
startSubscriber(subscriber1, "1", "grp=1", testMessage1);
// SUBSCRIBER1 - end

// SUBSCRIBER2 - start
startSubscriber(subscriber2, "2", "grp=2", testMessage2);
// SUBSCRIBER2 - end

// PUBLISHER - start
publisher.connect();

publisher.on("connected", function() {
    publishWhenSubscribed(SUBSCRIBE_MAX_POLLS);
});

publisher.on("receipt", function(id) {
    if (id === "send1") {
        publisher.sent1 = true;
    } else if (id === "send2") {
        publisher.sent2 = true;
    } else {
        console.error("unexpected receipt");
    }
    // Disconnect only once both sends have been acknowledged.
    if (publisher.sent1 && publisher.sent2) {
        //console.log("pub disconnect")
        publisher.disconnect();
    }
});

publisher.on("error", errorHandler);
// PUBLISHER - end

// Disconnect every client when the script is interrupted.
process.on("SIGINT", function() {
    publisher.disconnect();
    subscriber1.disconnect();
    subscriber2.disconnect();
});