-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathevent-queue.js
More file actions
362 lines (310 loc) · 10.3 KB
/
event-queue.js
File metadata and controls
362 lines (310 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
'use strict'
var EventEmitter = require('events'),
Promise = require('bluebird'),
DocloopError = require('./docloop-error-handling.js').DocloopError,
manageCalls = require('./manage-calls.js')
/**
* Class representing a queued event.
* Instances of this class will be created by an {@link module:docloop.EventQueue EventQueue}.
*
* @memberof EventQueue
*
* @inner
*
* @param {module:docloop.EventQueue} queue The event queue this event is queued in.
* @param {Object} data
* @param {Object} [data.event={}] Sets data property.
* @param {String} data.eventName Sets eventName property.
* @param {lastAttempt} [data.lastAttempt=0] Sets lastAttempt property.
* @param {attempts} [data.attempts=0] Sets attempts property.
*
* @property {Object} event Data of the original event.
* @property {String} eventName Name of the original event.
* @property {lastAttempt} lastAttempt Timestamp of the last attempt to process this event.
* @property {attempts} attempts Number of attempts.
*/
class QueuedEvent {
constructor(queue, data){
if(!(queue instanceof EventQueue)) throw new TypeError('QueuedEvent.constructor(): queue must be and EventQueue. '+queue)
this.queue = queue
this.event = data.event || {}
this.eventName = data.eventName
this.lastAttempt = data.lastAttempt || 0
this.attempts = data.attempts || 0
}
/**
* Stores the queued event to the database.
*
* @async
*
* @return {Object|Boolean} Result of db.collection.findOneAndReplace()
*/
store(){
return this.queue.collection.findOneAndReplace(
{
event: this.event,
eventName: this.eventName
},
{
event: this.event,
eventName: this.eventName,
lastAttempt: this.lastAttempt,
attempts: this.attempts
},
{ upsert:true }
)
}
/**
* Updates the event in the database, increasing the number of attempts by one and setting lastAttempt to now.
*
* @async
*
* @return {} undefined
*/
update(){
return this.queue.collection.findOneAndUpdate(
{
event: this.event,
eventName: this.eventName
},
{
'$set':{ lastAttempt: Date.now() },
'$inc':{ attempts: 1 }
},
{ returnNewDocument : true }
)
.then( result => result.value)
.then( event_data => {
this.lastAttempt = event_data.lastAttempt
this.attempts = event_data.attempts
})
}
/**
* Removes the queued event form the database.
*
* @async
*
* @return {Object} Result of db.collection.remove()
*/
remove(){
return this.queue.collection.remove({
event: this.event,
eventName: this.eventName
})
}
/**
* Emits an {@link module:docloop.EventQueue.event:-attempt -attempt event} and increases the number of attempts.
*
* @async
*
* @return {} undefined
*/
attempt(){
return Promise.resolve()
.then( () => this.update() )
.then( () => this.queue.emit(this.eventName+'-attempt', this) )
.then( () => undefined)
}
/**
* Emits an {@link module:docloop.EventQueue.event:-fail -fail event} and removes the queued event from the queue.
*
* @async
*
* @param {Error|String} reason Error or Reason that let to the abandonment.
*
* @return {} undefined
*/
abandon(reason){
this.reason = reason
return Promise.resolve()
.then( () => this.remove() )
.then( () => this.queue.emit(this.eventName+'-fail', this) )
.then( () => undefined)
}
/**
* Emits an {@link module:docloop.EventQueue.event:-done -done event} and removes the queued event from the queue.
*
* @async
*
* @return {} undefined
*/
checkOff(){
return Promise.resolve()
.then( () => this.remove() )
.then( () => this.queue.emit(this.eventName+'-done', this) )
.then( () => undefined)
}
}
/**
* An {@link module:docloop.EventQueue EventQueue} emits this event periodically for every due event.
*
* The event name will be prefixed with the wrapped event's name (e.g. 'my-event-attempt').
*
* @memberOf module:docloop.EventQueue
*
* @event -attempt
*
* @type {module:docloop.EventQueue~QueuedEvent}
*/
/**
* An {@link module:docloop.EventQueue EventQueue} emits this event when the wrapped event had not been checked off after all retries where spent.
*
* The event name will be prefixed with the wrapped event's name (e.g. 'my-event-fail').
*
* @memberOf module:docloop.EventQueue
*
* @event -fail
*
* @type {module:docloop.EventQueue~QueuedEvent}
*
*/
/**
* An {@link module:docloop.EventQueue EventQueue} emits this event when the wrapped event is checked off.
*
* The event name will be prefixed with the wrapped event's name (e.g. 'my-event-done').
*
* @memberOf module:docloop.EventQueue
*
* @event -done
*
* @type {module:docloop.EventQueue~QueuedEvent}
*
*/
/**
* Class representing a queue of events to be checked off or repeated if need be.
*
* Every event added to the qeue will be wrapped into a {@link module:docloop.EventQueue~QueuedEvent QueuedEvent}.
* And that in turn will be emitted periodically as {@link module:docloop.EventQueue.event:-attempt -attempt event}
* until it exceeds the maximal number of retries, is {@link module:docloop.EventQueue~QueuedEvent.abandon abandoned} for some other reason or is {@link module:docloop.EventQueue~QueuedEvent.checkOff checked off}.
*
* @alias EventQueue
*
* @memberof module:docloop
*
* @param {Object} config
* @param {Collection} config.collection MongoDb collection. Sets collection property.
* @param {Number|Number[]|Function} [config.delay] Sets delay property.
* @param {Number} [config.maxRetries=3] Sets maxRetries property.
* @param {Number} [config.processInterval] Milliseconds. Time until the qeue checks again for due events.
* @param {Number} [config.spread=1000] Sets spread property.
*
* @property {Collection} collection This is where the queue stores active events.
* @property {Number|Number[]|Function} [delay] Delay until the event will fire again if not checked off. If an array is provided the nth retry will be delayed for this.delay[n-1] milliseconds. If a function is provided the nth attempt will be delayed for this.delay(n) milliseconds. Defaults to: (attempts => Math.pow(10, attempts)*1000)
* @property {Number} [maxRetries=3] Number of retries until the event will be marked as failed. if this.delay is an array, this value defaults to this.delay.length.
* @property {Number} [spread=1000] Milliseconds. If multiple events are due to retrial, they wont fire all at once but will be spread with a fixed delay between each of them.
* @property {Number} processInterval Milliseconds. Time until the queue checks again for due events.
* @property {Timeout} timeout The Timeout object returned by setInterval for the periodical check.
*
*/
class EventQueue extends EventEmitter{
constructor(config){
if(!config) throw new ReferenceError("missing config")
if(!config.collection) throw new ReferenceError("missing collection")
super()
var defaults = {
collection: undefined,
delay: (attempts => Math.pow(10, attempts)*1000),
maxRetries: config.delay.length || 3,
processInterval: 10*60*1000,
spread: 1000
}
for(var key in defaults){
this[key] = config[key] != undefined ? config[key] : defaults[key]
}
this.timeout = undefined
//TODO: document
manageCalls.limitCalls(this, 'process', config.spread)
}
/**
* Starts looking periodically for due or failed events.
*
* @return {this}
*/
start(){
if(this.timeout) clearInterval(this.timeout)
this.process()
this.timeout = setInterval(this.process.bind(this), this.processInterval)
return this
}
/**
* Stops looking periodically for due or failed events.
*
* @return {this}
*/
stop(){
clearInterval(this.timeout)
return this
}
/**
* Removes all events from the queue without further notice.
*
* @async
*
*/
clear(){
return this.collection.remove({})
}
/**
* Adds an event to the queue. For every added event the queue will immediately emit an {@link module:docloop.EventQueue.event:attempt}.
*
* @async
*
* @param {String} event_name An event name
* @param {Object} [event={}] Data associated with the event
*
* @returns {module:docloop.EventQueue~QueuedEvent}
*/
add(event_name, event = {}){
var queued_event = new QueuedEvent(this, {eventName:event_name, event:event} )
return Promise.resolve()
.then( () => queued_event.store() )
.then( () => this.process() )
.then( () => queued_event)
}
/**
* Returns the delay for the nth retry. The 0th retry is considered the first attempt and has no delay.
*
* @param {Number} attempts Number of previous attempts.
* @return {Number|undefined}
*/
_getDelay(attempts){
if(attempts == 0) return 0
if(typeof this.delay == 'number') return this.delay
if(typeof this.delay == 'object') return this.delay[attempts-1]
if(typeof this.delay == 'function') return this.delay(attempts)
return undefined
}
/**
* Checks if events are due for retries or have failed.
*
* Every event that has more attempts than .maxRetries will be abandoned.
* Every event that is due will be attempted.
*
* @async
*
* @emits {@link module:docloop.EventQueue.event:-attempt -attempt}
* @emits {@link module:docloop.EventQueue.event:-fail -fail}
*
*/
process(){
var now = new Date().getTime()
console.log('process()...', now)
return Promise.map(
this.collection.find({}).toArray(),
event_data => new QueuedEvent(this, event_data)
)
.map( (queued_event, index) => {
var delta = now - queued_event.lastAttempt,
delay = this._getDelay(queued_event.attempts),
due = delta >= (delay||0),
failed = queued_event.attempts > this.maxRetries
if(failed) return Promise.delay(this.spread*index)
.then( () => queued_event.abandon(new DocloopError("EventQueue.process() queued event exceeded maxRetries.")) )
.then( () => false)
if(due) return Promise.delay(this.spread*index)
.then( () => queued_event.attempt() )
.then( () => true)
})
}
}
module.exports = EventQueue