blob: b0b5d5146c236b7e335af5478507387dd504273f [file] [log] [blame]
Timoney, Daniel (dt5972)324ee362017-02-15 10:37:53 -05001/**
2 * Copyright 2014 IBM Corp.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 **/
16
17var util = require("util");
18var when = require("when");
19
20var typeRegistry = require("./registry");
21var credentials = require("./credentials");
22var log = require("../log");
23var events = require("../events");
24
25var storage = null;
26
27var nodes = {};
28var activeConfig = [];
29var missingTypes = [];
30
31events.on('type-registered',function(type) {
32 if (missingTypes.length > 0) {
33 var i = missingTypes.indexOf(type);
34 if (i != -1) {
35 missingTypes.splice(i,1);
36 util.log("[red] Missing type registered: "+type);
37 if (missingTypes.length === 0) {
38 parseConfig();
39 }
40 }
41 }
42});
43
44/**
45 * Parses the current activeConfig and creates the required node instances
46 */
47function parseConfig() {
48 var i;
49 var nt;
50 missingTypes = [];
51
52 // Scan the configuration for any unknown node types
53 for (i=0;i<activeConfig.length;i++) {
54 var type = activeConfig[i].type;
55 // TODO: remove workspace in next release+1
56 if (type != "workspace" && type != "tab") {
57 nt = typeRegistry.get(type);
58 if (!nt && missingTypes.indexOf(type) == -1) {
59 missingTypes.push(type);
60 }
61 }
62 }
63 // Abort if there are any missing types
64 if (missingTypes.length > 0) {
65 util.log("[red] Waiting for missing types to be registered:");
66 for (i=0;i<missingTypes.length;i++) {
67 util.log("[red] - "+missingTypes[i]);
68 }
69 return;
70 }
71
72 util.log("[red] Starting flows");
73 events.emit("nodes-starting");
74
75 // Instantiate each node in the flow
76 for (i=0;i<activeConfig.length;i++) {
77 var nn = null;
78 // TODO: remove workspace in next release+1
79 if (activeConfig[i].type != "workspace" && activeConfig[i].type != "tab") {
80 nt = typeRegistry.get(activeConfig[i].type);
81 if (nt) {
82 try {
83 nn = new nt(activeConfig[i]);
84 }
85 catch (err) {
86 util.log("[red] "+activeConfig[i].type+" : "+err);
87 }
88 }
89 // console.log(nn);
90 if (nn === null) {
91 util.log("[red] unknown type: "+activeConfig[i].type);
92 }
93 }
94 }
95 // Clean up any orphaned credentials
96 credentials.clean(flowNodes.get);
97 events.emit("nodes-started");
98}
99
100/**
101 * Stops the current activeConfig
102 */
103function stopFlows() {
104 if (activeConfig&&activeConfig.length > 0) {
105 util.log("[red] Stopping flows");
106 }
107 return flowNodes.clear();
108}
109
110var flowNodes = module.exports = {
111 init: function(_storage) {
112 storage = _storage;
113 },
114
115 /**
116 * Load the current activeConfig from storage and start it running
117 * @return a promise for the loading of the config
118 */
119 load: function() {
120 return storage.getFlows().then(function(flows) {
121 return credentials.load().then(function() {
122 activeConfig = flows;
123 if (activeConfig && activeConfig.length > 0) {
124 parseConfig();
125 }
126 });
127 }).otherwise(function(err) {
128 util.log("[red] Error loading flows : "+err);
129 });
130 },
131
132 /**
133 * Add a node to the current active set
134 * @param n the node to add
135 */
136 add: function(n) {
137 nodes[n.id] = n;
138 n.on("log",log.log);
139 },
140
141 /**
142 * Get a node
143 * @param i the node id
144 * @return the node
145 */
146 get: function(i) {
147 return nodes[i];
148 },
149
150 /**
151 * Stops all active nodes and clears the active set
152 * @return a promise for the stopping of all active nodes
153 */
154 clear: function() {
155 return when.promise(function(resolve) {
156 events.emit("nodes-stopping");
157 var promises = [];
158 for (var n in nodes) {
159 if (nodes.hasOwnProperty(n)) {
160 try {
161 var p = nodes[n].close();
162 if (p) {
163 promises.push(p);
164 }
165 } catch(err) {
166 nodes[n].error(err);
167 }
168 }
169 }
170 when.settle(promises).then(function() {
171 events.emit("nodes-stopped");
172 nodes = {};
173 resolve();
174 });
175 });
176 },
177
178 /**
179 * Provides an iterator over the active set of nodes
180 * @param cb a function to be called for each node in the active set
181 */
182 each: function(cb) {
183 for (var n in nodes) {
184 if (nodes.hasOwnProperty(n)) {
185 cb(nodes[n]);
186 }
187 }
188 },
189
190 /**
191 * @return the active configuration
192 */
193 getFlows: function() {
194 return activeConfig;
195 },
196
197 /**
198 * Sets the current active config.
199 * @param config the configuration to enable
200 * @return a promise for the starting of the new flow
201 */
202 setFlows: function (config) {
203 // Extract any credential updates
204 for (var i=0; i<config.length; i++) {
205 var node = config[i];
206 if (node.credentials) {
207 credentials.extract(node);
208 delete node.credentials;
209 }
210 }
211 return credentials.save()
212 .then(function() { return storage.saveFlows(config);})
213 .then(function() { return stopFlows();})
214 .then(function () {
215 activeConfig = config;
216 parseConfig();
217 });
218 },
219 stopFlows: stopFlows
220};