blob: 0e6fc5259f92da4a7b5523e2b8954e5ece77b16e [file] [log] [blame]
Chinthakayala, Sheshashailavas (sc2914)d1569972017-08-28 05:25:46 -09001/**
2 * Copyright 2014 IBM Corp.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 **/
16
17var util = require("util");
18var EventEmitter = require("events").EventEmitter;
19var clone = require("clone");
20var when = require("when");
21
22var flows = require("./flows");
23var comms = require("../comms");
24
25function Node(n) {
26 this.id = n.id;
27 flows.add(this);
28 this.type = n.type;
29 if (n.name) {
30 this.name = n.name;
31 }
32 this.wires = n.wires||[];
33}
34
35util.inherits(Node,EventEmitter);
36
37Node.prototype._on = Node.prototype.on;
38
39Node.prototype.on = function(event,callback) {
40 var node = this;
41 if (event == "close") {
42 if (callback.length == 1) {
43 this.close = function() {
44 return when.promise(function(resolve) {
45 callback.call(node,function() {
46 resolve();
47 });
48 });
49 }
50 } else {
51 this.close = callback;
52 }
53 } else {
54 this._on(event,callback);
55 }
56}
57
58Node.prototype.close = function() {
59}
60
61Node.prototype.send = function(msg) {
62 // instanceof doesn't work for some reason here
63 if (msg == null) {
64 return;
65 } else if (!util.isArray(msg)) {
66 msg = [msg];
67 }
68 for (var i=0;i<this.wires.length;i++) {
69 var wires = this.wires[i];
70 if (i < msg.length) {
71 if (msg[i] != null) {
72 var msgs = msg[i];
73 if (!util.isArray(msg[i])) {
74 msgs = [msg[i]];
75 }
76 //if (wires.length == 1) {
77 // // Single recipient, don't need to clone the message
78 // var node = flows.get(wires[0]);
79 // if (node) {
80 // for (var k in msgs) {
81 // var mm = msgs[k];
82 // node.receive(mm);
83 // }
84 // }
85 //} else {
86 // Multiple recipients, must send message copies
87 for (var j=0;j<wires.length;j++) {
88 var node = flows.get(wires[j]);
89 if (node) {
90 for (var k=0;k<msgs.length;k++) {
91 var mm = msgs[k];
92 // Temporary fix for #97
93 // TODO: remove this http-node-specific fix somehow
94 var req = mm.req;
95 var res = mm.res;
96 delete mm.req;
97 delete mm.res;
98 var m = clone(mm);
99 if (req) {
100 m.req = req;
101 mm.req = req;
102 }
103 if (res) {
104 m.res = res;
105 mm.res = res;
106 }
107 node.receive(m);
108 }
109 }
110 }
111 //}
112 }
113 }
114 }
115}
116
117Node.prototype.receive = function(msg) {
118 this.emit("input",msg);
119}
120
121function log_helper(self, level, msg) {
122 var o = {level:level, id:self.id, type:self.type, msg:msg};
123 if (self.name) {
124 o.name = self.name;
125 }
126 self.emit("log",o);
127}
128
129Node.prototype.log = function(msg) {
130 log_helper(this, 'log', msg);
131}
132
133Node.prototype.warn = function(msg) {
134 log_helper(this, 'warn', msg);
135}
136
137Node.prototype.error = function(msg) {
138 log_helper(this, 'error', msg);
139}
140
141/**
142 * status: { fill:"red|green", shape:"dot|ring", text:"blah" }
143 */
144Node.prototype.status = function(status) {
145 comms.publish("status/"+this.id,status,true);
146}
147module.exports = Node;