blob: 5828c99610a905b604af149bfd09401447b39fae [file] [log] [blame]
Chinthakayala, Sheshashailavas (sc2914)d1569972017-08-28 05:25:46 -09001/**
2 * Copyright 2014 IBM Corp.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 **/
16
17var ws = require("ws");
18var util = require("util");
19
20var server;
21var settings;
22
23var wsServer;
24var activeConnections = [];
25
26var retained = {};
27
28var heartbeatTimer;
29var lastSentTime;
30
31
32function init(_server,_settings) {
33 server = _server;
34 settings = _settings;
35}
36
37function start() {
38
39 if (!settings.disableEditor) {
40 var webSocketKeepAliveTime = settings.webSocketKeepAliveTime || 15000;
41 var path = settings.httpAdminRoot || "/";
42 path = path + (path.slice(-1) == "/" ? "":"/") + "comms";
43 wsServer = new ws.Server({server:server,path:path});
44
45 wsServer.on('connection',function(ws) {
46 activeConnections.push(ws);
47 ws.on('close',function() {
48 for (var i=0;i<activeConnections.length;i++) {
49 if (activeConnections[i] === ws) {
50 activeConnections.splice(i,1);
51 break;
52 }
53 }
54 });
55 ws.on('message', function(data,flags) {
56 var msg = null;
57 try {
58 msg = JSON.parse(data);
59 } catch(err) {
60 util.log("[red:comms] received malformed message : "+err.toString());
61 return;
62 }
63 if (msg.subscribe) {
64 handleRemoteSubscription(ws,msg.subscribe);
65 }
66 });
67 ws.on('error', function(err) {
68 util.log("[red:comms] error : "+err.toString());
69 });
70 });
71
72 wsServer.on('error', function(err) {
73 util.log("[red:comms] server error : "+err.toString());
74 });
75
76 lastSentTime = Date.now();
77
78 heartbeatTimer = setInterval(function() {
79 var now = Date.now();
80 if (now-lastSentTime > webSocketKeepAliveTime) {
81 publish("hb",lastSentTime);
82 }
83 }, webSocketKeepAliveTime);
84 }
85}
86
87function stop() {
88 if (heartbeatTimer) {
89 clearInterval(heartbeatTimer);
90 }
91 if (wsServer) {
92 wsServer.close();
93 }
94}
95
96function publish(topic,data,retain) {
97 if (retain) {
98 retained[topic] = data;
99 } else {
100 delete retained[topic];
101 }
102 lastSentTime = Date.now();
103 activeConnections.forEach(function(conn) {
104 publishTo(conn,topic,data);
105 });
106}
107
108function publishTo(ws,topic,data) {
109 var msg = JSON.stringify({topic:topic,data:data});
110 try {
111 ws.send(msg);
112 } catch(err) {
113 util.log("[red:comms] send error : "+err.toString());
114 }
115}
116
117function handleRemoteSubscription(ws,topic) {
118 var re = new RegExp("^"+topic.replace(/([\[\]\?\(\)\\\\$\^\*\.|])/g,"\\$1").replace(/\+/g,"[^/]+").replace(/\/#$/,"(\/.*)?")+"$");
119 for (var t in retained) {
120 if (re.test(t)) {
121 publishTo(ws,t,retained[t]);
122 }
123 }
124}
125
126
127module.exports = {
128 init:init,
129 start:start,
130 stop:stop,
131 publish:publish,
132}