]> git.madduck.net Git - etc/vim.git/blob - .vim/bundle/vim-lsp/autoload/lsp/callbag.vim

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

Merge commit '294584081929424aec883f90c7d6515b3743358d' as '.vim/bundle/vim-lsp-ale'
[etc/vim.git] / .vim / bundle / vim-lsp / autoload / lsp / callbag.vim
1 " https://github.com/prabirshrestha/callbag.vim#82f96a7d97342fbf0286e6578b65a60f2bc1ce33
2 "    :CallbagEmbed path=autoload/lsp/callbag.vim namespace=lsp#callbag
3
4 let s:undefined_token = '__callbag_undefined__'
5 let s:str_type = type('')
6
7 function! lsp#callbag#undefined() abort
8     return s:undefined_token
9 endfunction
10
11 function! lsp#callbag#isUndefined(d) abort
12     return type(a:d) == s:str_type && a:d ==# s:undefined_token
13 endfunction
14
15 function! s:noop(...) abort
16 endfunction
17
18 function! s:createArrayWithSize(size, defaultValue) abort
19     let l:i = 0
20     let l:array = []
21     while l:i < a:size
22         call add(l:array, a:defaultValue)
23         let l:i = l:i + 1
24     endwhile
25     return l:array
26 endfunction
27
28 " pipe() {{{
29 function! lsp#callbag#pipe(...) abort
30     let l:Res = a:1
31     let l:i = 1
32     while l:i < a:0
33         let l:Res = a:000[l:i](l:Res)
34         let l:i = l:i + 1
35     endwhile
36     return l:Res
37 endfunction
38 " }}}
39
40 " operate() {{{
41 function! lsp#callbag#operate(...) abort
42     let l:data = { 'cbs': a:000 }
43     return function('s:operateFactory', [l:data])
44 endfunction
45
46 function! s:operateFactory(data, src) abort
47     let l:Res = a:src
48     let l:n = len(a:data['cbs'])
49     let l:i = 0
50     while l:i < l:n
51         let l:Res = a:data['cbs'][l:i](l:Res)
52         let l:i = l:i + 1
53     endwhile
54     return l:Res
55 endfunction
56 " }}}
57
58 " makeSubject() {{{
59 function! lsp#callbag#makeSubject() abort
60     let l:data = { 'sinks': [] }
61     return function('s:makeSubjectFactory', [l:data])
62 endfunction
63
64 function! s:makeSubjectFactory(data, t, d) abort
65     if a:t == 0
66         let l:Sink = a:d
67         call add(a:data['sinks'], l:Sink)
68         call l:Sink(0, function('s:makeSubjectSinkCallback', [a:data, l:Sink]))
69     else
70         let l:zinkz = copy(a:data['sinks'])
71         let l:i = 0
72         let l:n = len(l:zinkz)
73         while l:i < l:n
74             let l:Sink = l:zinkz[l:i]
75             let l:j = -1
76             let l:found = 0
77             for l:Item in a:data['sinks']
78                 let l:j += 1
79                 if l:Item == l:Sink
80                     let l:found = 1
81                     break
82                 endif
83             endfor
84
85             if l:found
86                 call l:Sink(a:t, a:d)
87             endif
88             let l:i += 1
89         endwhile
90     endif
91 endfunction
92
93 function! s:makeSubjectSinkCallback(data, Sink, t, d) abort
94     if a:t == 2
95         let l:i = -1
96         let l:found = 0
97         for l:Item in a:data['sinks']
98             let l:i += 1
99             if l:Item == a:Sink
100                 let l:found = 1
101                 break
102             endif
103         endfor
104         if l:found
105             call remove(a:data['sinks'], l:i)
106         endif
107     endif
108 endfunction
109 " }}}
110
111 " create() {{{
112 function! lsp#callbag#create(...) abort
113     let l:data = {}
114     if a:0 > 0
115         let l:data['prod'] = a:1
116     endif
117     return function('s:createProd', [l:data])
118 endfunction
119
120 function! s:createProd(data, start, sink) abort
121     if a:start != 0 | return | endif
122     let a:data['sink'] = a:sink
123     if !has_key(a:data, 'prod') || type(a:data['prod']) != type(function('s:noop'))
124         call a:sink(0, function('s:noop'))
125         call a:sink(2, lsp#callbag#undefined())
126         return
127     endif
128     let a:data['end'] = 0
129     call a:sink(0, function('s:createSinkCallback', [a:data]))
130     if a:data['end'] | return | endif
131     let a:data['clean'] = a:data['prod'](function('s:createNext', [a:data]), function('s:createError', [a:data]), function('s:createComplete', [a:data]))
132 endfunction
133
134 function! s:createSinkCallback(data, t, ...) abort
135     if !a:data['end']
136         let a:data['end'] = (a:t == 2)
137         if a:data['end'] && has_key(a:data, 'clean') && type(a:data['clean']) == type(function('s:noop'))
138             call a:data['clean']()
139         endif
140     endif
141 endfunction
142
143 function! s:createNext(data, d) abort
144     if !a:data['end'] | call a:data['sink'](1, a:d) | endif
145 endfunction
146
147 function! s:createError(data, e) abort
148     if !a:data['end'] && !lsp#callbag#isUndefined(a:e)
149         let a:data['end'] = 1
150         call a:data['sink'](2, a:e)
151     endif
152 endfunction
153
154 function! s:createComplete(data) abort
155     if !a:data['end']
156         let a:data['end'] = 1
157         call a:data['sink'](2, lsp#callbag#undefined())
158     endif
159 endfunction
160 " }}}
161
162 " lazy() {{{
163 function! lsp#callbag#lazy(F) abort
164     let l:data = { 'F': a:F }
165     return function('s:lazyFactory', [l:data])
166 endfunction
167
168 function! s:lazyFactory(data, start, sink) abort
169     if a:start != 0 | return | endif
170     let a:data['sink'] = a:sink
171     let a:data['unsubed'] = 0
172     call a:data['sink'](0, function('s:lazySinkCallback', [a:data]))
173     call a:data['sink'](1, a:data['F']())
174     if !a:data['unsubed'] | call a:data['sink'](2, lsp#callbag#undefined()) | endif
175 endfunction
176
177 function! s:lazySinkCallback(data, t, d) abort
178     if a:t == 2 | let a:data['unsubed'] = 1 | endif
179 endfunction
180 " }}}
181
182 " empty() {{{
183 function! lsp#callbag#empty() abort
184     let l:data = {}
185     return function('s:emptyStart', [l:data])
186 endfunction
187
188 function! s:emptyStart(data, start, sink) abort
189     if a:start != 0 | return | endif
190     let a:data['disposed'] = 0
191     call a:sink(0, function('s:emptySinkCallback', [a:data]))
192     if a:data['disposed'] | return | endif
193     call a:sink(2, lsp#callbag#undefined())
194 endfunction
195
196 function! s:emptySinkCallback(data, t, ...) abort
197     if a:t != 2 | return | endif
198     let a:data['disposed'] = 1
199 endfunction
200
201 function! s:empty_sink_callback(data, t, ...) abort
202     if a:t == 2 | call timer_stop(a:data['timer']) | endif
203 endfunction
204 " }}}
205
206 " never() {{{
207 function! lsp#callbag#never() abort
208     return function('s:never')
209 endfunction
210
211 function! s:never(start, sink) abort
212     if a:start != 0 | return | endif
213     call a:sink(0, function('s:noop'))
214 endfunction
215 " }}}
216
217 " forEach() {{{
218 function! lsp#callbag#forEach(operation) abort
219     let l:data = { 'operation': a:operation }
220     return function('s:forEachOperation', [l:data])
221 endfunction
222
223 function! s:forEachOperation(data, source) abort
224     return a:source(0, function('s:forEachOperationSource', [a:data]))
225 endfunction
226
227 function! s:forEachOperationSource(data, t, d) abort
228     if a:t == 0 | let a:data['talkback'] = a:d | endif
229     if a:t == 1 | call a:data['operation'](a:d) | endif
230     if (a:t == 1 || a:t == 0) | call a:data['talkback'](1, lsp#callbag#undefined()) | endif
231 endfunction
232 " }}}
233
234 " tap() {{{
235 function! lsp#callbag#tap(...) abort
236     let l:data = {}
237     if a:0 > 0 && type(a:1) == type({}) " a:1 { next, error, complete }
238         if has_key(a:1, 'next') | let l:data['next'] = a:1['next'] | endif
239         if has_key(a:1, 'error') | let l:data['error'] = a:1['error'] | endif
240         if has_key(a:1, 'complete') | let l:data['complete'] = a:1['complete'] | endif
241     else " a:1 = next, a:2 = error, a:3 = complete
242         if a:0 >= 1 | let l:data['next'] = a:1 | endif
243         if a:0 >= 2 | let l:data['error'] = a:2 | endif
244         if a:0 >= 3 | let l:data['complete'] = a:3 | endif
245     endif
246     return function('s:tapFactory', [l:data])
247 endfunction
248
249 function! s:tapFactory(data, source) abort
250     let a:data['source'] = a:source
251     return function('s:tapSouceFactory', [a:data])
252 endfunction
253
254 function! s:tapSouceFactory(data, start, sink) abort
255     if a:start != 0 | return | endif
256     let a:data['sink'] = a:sink
257     call a:data['source'](0, function('s:tapSourceCallback', [a:data]))
258 endfunction
259
260 function! s:tapSourceCallback(data, t, d) abort
261     if a:t == 1 && has_key(a:data, 'next') | call a:data['next'](a:d) | endif
262     if a:t == 2 && lsp#callbag#isUndefined(a:d) && has_key(a:data, 'complete') | call a:data['complete']() | endif
263     if a:t == 2 && !lsp#callbag#isUndefined(a:d) && has_key(a:data, 'error') | call a:data['error'](a:d) | endif
264     call a:data['sink'](a:t, a:d)
265 endfunction
266 " }}}
267
268 " interval() {{{
269 function! lsp#callbag#interval(period) abort
270     let l:data = { 'period': a:period }
271     return function('s:intervalPeriod', [l:data])
272 endfunction
273
274 function! s:intervalPeriod(data, start, sink) abort
275     if a:start != 0 | return | endif
276     let a:data['i'] = 0
277     let a:data['sink'] = a:sink
278     let a:data['timer'] = timer_start(a:data['period'], function('s:interval_callback', [a:data]), { 'repeat': -1 })
279     call a:sink(0, function('s:interval_sink_callback', [a:data]))
280 endfunction
281
282 function! s:interval_callback(data, ...) abort
283     let l:i = a:data['i']
284     let a:data['i'] = a:data['i'] + 1
285     call a:data['sink'](1, l:i)
286 endfunction
287
288 function! s:interval_sink_callback(data, t, ...) abort
289     if a:t == 2 | call timer_stop(a:data['timer']) | endif
290 endfunction
291 " }}}
292
293 " delay() {{{
294 function! lsp#callbag#delay(period) abort
295     let l:data = { 'period': a:period }
296     return function('s:delayPeriod', [l:data])
297 endfunction
298
299 function! s:delayPeriod(data, source) abort
300     let a:data['source'] = a:source
301     return function('s:delayFactory', [a:data])
302 endfunction
303
304 function! s:delayFactory(data, start, sink) abort
305     if a:start != 0 | return | endif
306     let a:data['sink'] = a:sink
307     call a:data['source'](0, function('s:delaySourceCallback', [a:data]))
308 endfunction
309
310 function! s:delaySourceCallback(data, t, d) abort
311     if a:t != 1
312         call a:data['sink'](a:t, a:d)
313         return
314     endif
315     let a:data['d'] = a:d
316     call timer_start(a:data['period'], function('s:delayTimerCallback', [a:data]))
317 endfunction
318
319 function! s:delayTimerCallback(data, ...) abort
320     call a:data['sink'](1, a:data['d'])
321 endfunction
322 " }}}
323
324 " take() {{{
325 function! lsp#callbag#take(max) abort
326     let l:data = { 'max': a:max }
327     return function('s:takeMax', [l:data])
328 endfunction
329
330 function! s:takeMax(data, source) abort
331     let a:data['source'] = a:source
332     return function('s:takeMaxSource', [a:data])
333 endfunction
334
335 function! s:takeMaxSource(data, start, sink) abort
336     if a:start != 0 | return | endif
337     let a:data['taken'] = 0
338     let a:data['end'] = 0
339     let a:data['sink'] = a:sink
340     let a:data['talkback'] = function('s:takeTalkback', [a:data])
341     call a:data['source'](0, function('s:takeSourceCallback', [a:data]))
342 endfunction
343
344 function! s:takeTalkback(data, t, d) abort
345     if a:t == 2
346         let a:data['end'] = 1
347         call a:data['sourceTalkback'](a:t, a:d)
348     elseif a:data['taken'] < a:data['max']
349         call a:data['sourceTalkback'](a:t, a:d)
350     endif
351 endfunction
352
353 function! s:takeSourceCallback(data, t, d) abort
354     if a:t == 0
355         let a:data['sourceTalkback'] = a:d
356         call a:data['sink'](0, a:data['talkback'])
357     elseif a:t == 1
358         if a:data['taken'] < a:data['max']
359             let a:data['taken'] = a:data['taken'] + 1
360             call a:data['sink'](a:t, a:d)
361             if a:data['taken'] == a:data['max'] && !a:data['end']
362                 let a:data['end'] = 1
363                 call a:data['sink'](2, lsp#callbag#undefined())
364                 call a:data['sourceTalkback'](2, lsp#callbag#undefined())
365             endif
366         endif
367     else
368         call a:data['sink'](a:t, a:d)
369     endif
370 endfunction
371 " }}}
372
373 " skip() {{{
374 function! lsp#callbag#skip(max) abort
375     let l:data = { 'max': a:max }
376     return function('s:skipMax', [l:data])
377 endfunction
378
379 function! s:skipMax(data, source) abort
380     let a:data['source'] = a:source
381     return function('s:skipMaxSource', [a:data])
382 endfunction
383
384 function! s:skipMaxSource(data, start, sink) abort
385     if a:start != 0 | return | endif
386     let a:data['sink'] = a:sink
387     let a:data['skipped'] = 0
388     call a:data['source'](0, function('s:skipSouceCallback', [a:data]))
389 endfunction
390
391 function! s:skipSouceCallback(data, t, d) abort
392     if a:t == 0
393         let a:data['talkback'] = a:d
394         call a:data['sink'](a:t, a:d)
395     elseif a:t == 1
396         if a:data['skipped'] < a:data['max']
397             let a:data['skipped'] = a:data['skipped'] + 1
398             call a:data['talkback'](1, lsp#callbag#undefined())
399         else
400             call a:data['sink'](a:t, a:d)
401         endif
402     else
403         call a:data['sink'](a:t, a:d)
404     endif
405 endfunction
406 " }}}
407
408 " map() {{{
409 function! lsp#callbag#map(F) abort
410     let l:data = { 'f': a:F }
411     return function('s:mapF', [l:data])
412 endfunction
413
414 function! s:mapF(data, source) abort
415     let a:data['source'] = a:source
416     return function('s:mapFSource', [a:data])
417 endfunction
418
419 function! s:mapFSource(data, start, sink) abort
420     if a:start != 0 | return | endif
421     let a:data['sink'] = a:sink
422     call a:data['source'](0, function('s:mapFSourceCallback', [a:data]))
423 endfunction
424
425 function! s:mapFSourceCallback(data, t, d) abort
426     call a:data['sink'](a:t, a:t == 1 ? a:data['f'](a:d) : a:d)
427 endfunction
428 " }}}
429
430 " filter() {{{
431 function! lsp#callbag#filter(condition) abort
432     let l:data = { 'condition': a:condition }
433     return function('s:filterCondition', [l:data])
434 endfunction
435
436 function! s:filterCondition(data, source) abort
437     let a:data['source'] = a:source
438     return function('s:filterConditionSource', [a:data])
439 endfunction
440
441 function! s:filterConditionSource(data, start, sink) abort
442     if a:start != 0 | return | endif
443     let a:data['sink'] = a:sink
444     call a:data['source'](0, function('s:filterSourceCallback', [a:data]))
445 endfunction
446
447 function! s:filterSourceCallback(data, t, d) abort
448     if a:t == 0
449         let a:data['talkback'] = a:d
450         call a:data['sink'](a:t, a:d)
451     elseif a:t == 1
452         if a:data['condition'](a:d)
453             call a:data['sink'](a:t, a:d)
454         else
455             call a:data['talkback'](1, lsp#callbag#undefined())
456         endif
457     else
458         call a:data['sink'](a:t, a:d)
459     endif
460 endfunction
461 " }}}
462
463 " fromEvent() {{{
464 let s:event_prefix_index = 0
465 function! lsp#callbag#fromEvent(events, ...) abort
466     let l:data = { 'events': a:events }
467     if a:0 > 0
468         let l:data['augroup'] = a:1
469     else
470         let l:data['augroup'] = '__callbag_fromEvent_prefix_' . s:event_prefix_index . '__'
471         let s:event_prefix_index = s:event_prefix_index + 1
472     endif
473     return function('s:fromEventFactory', [l:data])
474 endfunction
475
476 let s:event_handler_index = 0
477 let s:event_handlers_data = {}
478 function! s:fromEventFactory(data, start, sink) abort
479     if a:start != 0 | return | endif
480     let a:data['sink']  = a:sink
481     let a:data['disposed'] = 0
482     let a:data['handler'] = function('s:fromEventHandlerCallback', [a:data])
483     let a:data['handler_index'] = s:event_handler_index
484     let s:event_handler_index = s:event_handler_index + 1
485     call a:sink(0, function('s:fromEventSinkHandler', [a:data]))
486
487     if a:data['disposed'] | return | endif
488     let s:event_handlers_data[a:data['handler_index']] = a:data
489
490     execute 'augroup ' . a:data['augroup']
491     execute 'autocmd!'
492     let l:events = type(a:data['events']) == type('') ? [a:data['events']] : a:data['events']
493     for l:event in l:events
494         let l:exec =  'call s:notify_event_handler(' . a:data['handler_index'] . ')'
495         if type(l:event) == type('')
496             execute 'au ' . l:event . ' * ' . l:exec
497         else
498             execute 'au ' . join(l:event, ' ') .' ' .  l:exec
499         endif
500     endfor
501     execute 'augroup end'
502 endfunction
503
504 function! s:fromEventHandlerCallback(data) abort
505     " send v:event if it exists
506     call a:data['sink'](1, lsp#callbag#undefined())
507 endfunction
508
509 function! s:fromEventSinkHandler(data, t, ...) abort
510     if a:t != 2 | return | endif
511     let a:data['disposed'] = 1
512     execute 'augroup ' a:data['augroup']
513     autocmd!
514     execute 'augroup end'
515     if has_key(s:event_handlers_data, a:data['handler_index'])
516         call remove(s:event_handlers_data, a:data['handler_index'])
517     endif
518 endfunction
519
520 function! s:notify_event_handler(index) abort
521     let l:data = s:event_handlers_data[a:index]
522     call l:data['handler']()
523 endfunction
524 " }}}
525
526 " fromPromise() {{{
527 function! lsp#callbag#fromPromise(promise) abort
528     let l:data = { 'promise': a:promise }
529     return function('s:fromPromiseFactory', [l:data])
530 endfunction
531
532 function! s:fromPromiseFactory(data, start, sink) abort
533     if a:start != 0 | return | endif
534     let a:data['sink'] = a:sink
535     let a:data['ended'] = 0
536     call a:data['promise'].then(
537         \ function('s:fromPromiseOnFulfilledCallback', [a:data]),
538         \ function('s:fromPromiseOnRejectedCallback', [a:data]),
539         \ )
540     call a:sink(0, function('s:fromPromiseSinkCallback', [a:data]))
541 endfunction
542
543 function! s:fromPromiseOnFulfilledCallback(data, ...) abort
544     if a:data['ended'] | return | endif
545     call a:data['sink'](1, a:0 > 0 ? a:1 : lsp#callbag#undefined())
546     if a:data['ended'] | return | endif
547     call a:data['sink'](2, lsp#callbag#undefined())
548 endfunction
549
550 function! s:fromPromiseOnRejectedCallback(data, err) abort
551     if a:data['ended'] | return | endif
552     call a:data['sink'](2, a:err)
553 endfunction
554
555 function! s:fromPromiseSinkCallback(data, t, ...) abort
556     if a:t == 2 | let a:data['ended'] = 1 | endif
557 endfunction
558 " }}}
559
560 " debounceTime() {{{
561 function! lsp#callbag#debounceTime(duration) abort
562     let l:data = { 'duration': a:duration }
563     return function('s:debounceTimeDuration', [l:data])
564 endfunction
565
566 function! s:debounceTimeDuration(data, source) abort
567     let a:data['source'] = a:source
568     return function('s:debounceTimeDurationSource', [a:data])
569 endfunction
570
571 function! s:debounceTimeDurationSource(data, start, sink) abort
572     if a:start != 0 | return | endif
573     let a:data['sink'] = a:sink
574     call a:data['source'](0, function('s:debounceTimeSourceCallback', [a:data]))
575 endfunction
576
577 function! s:debounceTimeSourceCallback(data, t, d) abort
578     if has_key(a:data, 'timer') | call timer_stop(a:data['timer']) | endif
579     if a:t == 1
580         let a:data['timer'] = timer_start(a:data['duration'], function('s:debounceTimeTimerCallback', [a:data, a:d]))
581     else
582         call a:data['sink'](a:t, a:d)
583     endif
584 endfunction
585
586 function! s:debounceTimeTimerCallback(data, d, ...) abort
587     call a:data['sink'](1, a:d)
588 endfunction
589 " }}}
590
591 " subscribe() {{{
592 function! lsp#callbag#subscribe(...) abort
593     let l:data = {}
594     if a:0 > 0 && type(a:1) == type({}) " a:1 { next, error, complete }
595         if has_key(a:1, 'next') | let l:data['next'] = a:1['next'] | endif
596         if has_key(a:1, 'error') | let l:data['error'] = a:1['error'] | endif
597         if has_key(a:1, 'complete') | let l:data['complete'] = a:1['complete'] | endif
598     else " a:1 = next, a:2 = error, a:3 = complete
599         if a:0 >= 1 | let l:data['next'] = a:1 | endif
600         if a:0 >= 2 | let l:data['error'] = a:2 | endif
601         if a:0 >= 3 | let l:data['complete'] = a:3 | endif
602     endif
603     return function('s:subscribeListener', [l:data])
604 endfunction
605
606 function! s:subscribeListener(data, source) abort
607     call a:source(0, function('s:subscribeSourceCallback', [a:data]))
608     return function('s:subscribeDispose', [a:data])
609 endfunction
610
611 function! s:subscribeSourceCallback(data, t, d) abort
612     if a:t == 0 | let a:data['talkback'] = a:d | endif
613     if a:t == 1 && has_key(a:data, 'next') | call a:data['next'](a:d) | endif
614     if a:t == 1 || a:t == 0 | call a:data['talkback'](1, lsp#callbag#undefined()) | endif
615     if a:t == 2 && lsp#callbag#isUndefined(a:d) && has_key(a:data, 'complete') | call a:data['complete']() | endif
616     if a:t == 2 && !lsp#callbag#isUndefined(a:d) && has_key(a:data, 'error') | call a:data['error'](a:d) | endif
617 endfunction
618
619 function! s:subscribeDispose(data, ...) abort
620     if has_key(a:data, 'talkback') | call a:data['talkback'](2, lsp#callbag#undefined()) | endif
621 endfunction
622 " }}}
623
624 " toList() {{{
625 function! lsp#callbag#toList() abort
626     let l:data = { 'done': 0, 'items': [], 'unsubscribed': 0 }
627     return function('s:toListFactory', [l:data])
628 endfunction
629
630 function! s:toListFactory(data, source) abort
631     let a:data['unsubscribe'] = lsp#callbag#subscribe(
632         \ function('s:toListOnNext', [a:data]),
633         \ function('s:toListOnError', [a:data]),
634         \ function('s:toListOnComplete', [a:data])
635         \ )(a:source)
636     if a:data['done'] | call s:toListUnsubscribe(a:data) | endif
637     return {
638         \ 'unsubscribe': function('s:toListUnsubscribe', [a:data]),
639         \ 'wait': function('s:toListWait', [a:data])
640         \ }
641 endfunction
642
643 function! s:toListUnsubscribe(data) abort
644     if !has_key(a:data, 'unsubscribe') | return | endif
645     if !a:data['unsubscribed']
646         call a:data['unsubscribe']()
647         let a:data['unsubscribed'] = 1
648         if !a:data['done']
649             let a:data['done'] = 1
650             try
651                 throw 'lsp#callbag toList() is already unsubscribed.'
652             catch
653                 let a:data['error'] = v:exception . ' ' . v:throwpoint
654             endtry
655         endif
656     endif
657 endfunction
658
659 function! s:toListOnNext(data, item) abort
660     call add(a:data['items'], a:item)
661 endfunction
662
663 function! s:toListOnError(data, error) abort
664     let a:data['done'] = 1
665     let a:data['error'] = a:error
666     call s:toListUnsubscribe(a:data)
667 endfunction
668
669 function! s:toListOnComplete(data) abort
670     let a:data['done'] = 1
671     call s:toListUnsubscribe(a:data)
672 endfunction
673
674 function! s:toListWait(data, ...) abort
675     if a:data['done']
676         if has_key(a:data, 'error')
677             throw a:data['error']
678         else
679             return a:data['items']
680         endif
681     else
682         let l:opt = a:0 > 0 ? copy(a:1) : {}
683         let l:opt['timedout'] = 0
684         let l:opt['sleep'] = get(l:opt, 'sleep', 1)
685         let l:opt['timeout'] = get(l:opt, 'timeout', -1)
686
687         if l:opt['timeout'] > -1
688             let l:opt['timer'] = timer_start(l:opt['timeout'], function('s:toListTimeoutCallback', [l:opt]))
689         endif
690
691         while !a:data['done'] && !l:opt['timedout']
692             exec 'sleep ' . l:opt['sleep'] . 'm'
693         endwhile
694
695         if has_key(l:opt, 'timer')
696             silent! call timer_stop(l:opt['timer'])
697         endif
698
699         if l:opt['timedout']
700             throw 'lsp#callbag toList().wait() timedout.'
701         endif
702
703         if has_key(a:data, 'error')
704             throw a:data['error']
705         else
706             return a:data['items']
707         endif
708     endif
709 endfunction
710
711 function! s:toListTimeoutCallback(opt, ...) abort
712     let a:opt['timedout'] = 1
713 endfunction
714 " }}}
715
716 " throwError() {{{
717 function! lsp#callbag#throwError(error) abort
718     let l:data = { 'error': a:error }
719     return function('s:throwErrorFactory', [l:data])
720 endfunction
721
722 function! s:throwErrorFactory(data, start, sink) abort
723     if a:start != 0 | return | endif
724     let a:data['disposed'] = 0
725     call a:sink(0, function('s:throwErrorSinkCallback', [a:data]))
726     if a:data['disposed'] | return | endif
727     call a:sink(2, a:data['error'])
728 endfunction
729
730 function! s:throwErrorSinkCallback(data, t, ...) abort
731     if a:t != 2 | return | endif
732     let a:data['disposed'] = 1
733 endfunction
734 " }}}
735
736 " of() {{{
737 function! lsp#callbag#of(...) abort
738     let l:data = { 'values': a:000 }
739     return function('s:listFactory', [l:data])
740 endfunction
741 " }}}
742
743 " fromList() {{{
744 function! lsp#callbag#fromList(list) abort
745     let l:data = { 'values': a:list }
746     return function('s:listFactory', [l:data])
747 endfunction
748
749 function! s:listFactory(data, start, sink) abort
750     if a:start != 0 | return | endif
751     let a:data['disposed'] = 0
752     call a:sink(0, function('s:listSinkCallback', [a:data]))
753     let l:i = 0
754     let l:n = len(a:data['values'])
755     while l:i < l:n
756         if a:data['disposed'] | break | endif
757         call a:sink(1, a:data['values'][l:i])
758         let l:i += 1
759     endwhile
760     if a:data['disposed'] | return | endif
761     call a:sink(2, lsp#callbag#undefined())
762 endfunction
763
764
765 function! s:listSinkCallback(data, t, ...) abort
766     if a:t != 2 | return | endif
767     let a:data['disposed'] = 1
768 endfunction
769 " }}}
770
771 " merge() {{{
772 function! lsp#callbag#merge(...) abort
773     let l:data = { 'sources': a:000 }
774     return function('s:mergeFactory', [l:data])
775 endfunction
776
777 function! s:mergeFactory(data, start, sink) abort
778     if a:start != 0 | return | endif
779     let a:data['sink'] = a:sink
780     let a:data['n'] = len(a:data['sources'])
781     let a:data['sourceTalkbacks'] = []
782     let a:data['startCount'] = 0
783     let a:data['endCount'] = 0
784     let a:data['ended'] = 0
785     let a:data['talkback'] = function('s:mergeTalkbackCallback', [a:data])
786     let l:i = 0
787     while l:i < a:data['n']
788         if a:data['ended'] | return | endif
789         call a:data['sources'][l:i](0, function('s:mergeSourceCallback', [a:data, l:i]))
790         let l:i += 1
791     endwhile
792 endfunction
793
794 function! s:mergeTalkbackCallback(data, t, d) abort
795     if a:t == 2 | let a:data['ended'] = 1 | endif
796     let l:i = 0
797     while l:i < a:data['n']
798         if l:i < len(a:data['sourceTalkbacks']) && a:data['sourceTalkbacks'][l:i] != 0
799             call a:data['sourceTalkbacks'][l:i](a:t, a:d)
800         endif
801         let l:i += 1
802     endwhile
803 endfunction
804
805 function! s:mergeSourceCallback(data, i, t, d) abort
806     if a:t == 0
807         call insert(a:data['sourceTalkbacks'], a:d, a:i)
808         let a:data['startCount'] += 1
809         if a:data['startCount'] == 1 | call a:data['sink'](0, a:data['talkback']) | endif
810     elseif a:t == 2 && !lsp#callbag#isUndefined(a:d)
811         let a:data['ended'] = 1
812         let l:j = 0
813         while l:j < a:data['n']
814             if l:j != a:i && l:j < len(a:data['sourceTalkbacks']) && a:data['sourceTalkbacks'][l:j] != 0
815                 call a:data['sourceTalkbacks'][l:j](2, lsp#callbag#undefined())
816             endif
817             let l:j += 1
818         endwhile
819         call a:data['sink'](2, a:d)
820     elseif a:t == 2
821         let a:data['sourceTalkbacks'][a:i] = 0
822         let a:data['endCount'] += 1
823         if a:data['endCount'] == a:data['n'] | call a:data['sink'](2, lsp#callbag#undefined()) | endif
824     else
825         call a:data['sink'](a:t, a:d)
826     endif
827 endfunction
828 " }}}
829
830 " concat() {{{
831 function! lsp#callbag#concat(...) abort
832     let l:data = { 'sources': a:000 }
833     return function('s:concatFactory', [l:data])
834 endfunction
835
836 let s:concatUniqueToken = '__callback__concat_unique_token__'
837 function! s:concatFactory(data, start, sink) abort
838     if a:start != 0 | return | endif
839     let a:data['sink'] = a:sink
840     let a:data['n'] = len(a:data['sources'])
841     if a:data['n'] == 0
842         call a:data['sink'](0, function('s:noop'))
843         call a:data['sink'](2, lsp#callbag#undefined())
844         return
845     endif
846     let a:data['i'] = 0
847     let a:data['lastPull'] = s:concatUniqueToken
848     let a:data['talkback'] = function('s:concatTalkbackCallback', [a:data])
849     let a:data['next'] = function('s:concatNext', [a:data])
850     call a:data['next']()
851 endfunction
852
853 function! s:concatTalkbackCallback(data, t, d) abort
854     if a:t == 1 | let a:data['lastPull'] = a:d | endif
855     call a:data['sourceTalkback'](a:t, a:d)
856 endfunction
857
858 function! s:concatNext(data) abort
859     if a:data['i'] == a:data['n']
860         call a:data['sink'](2, lsp#callbag#undefined())
861         return
862     endif
863     call a:data['sources'][a:data['i']](0, function('s:concatSourceCallback', [a:data]))
864 endfunction
865
866 function! s:concatSourceCallback(data, t, d) abort
867     if a:t == 0
868         let a:data['sourceTalkback'] = a:d
869         if a:data['i'] == 0
870             call a:data['sink'](0, a:data['talkback'])
871         elseif (a:data['lastPull']) != s:concatUniqueToken
872             call a:data['sourceTalkback'](1, a:data['lastPull'])
873         endif
874     elseif a:t == 2 && a:d != lsp#callbag#undefined()
875         call a:data['sink'](2, a:d)
876     elseif a:t == 2 
877         let a:data['i'] = a:data['i'] + 1
878         call a:data['next']()
879     else
880         call a:data['sink'](a:t, a:d)
881     endif
882 endfunction
883 " }}}
884
885 " combine() {{{
886 function! lsp#callbag#combine(...) abort
887     let l:data = { 'sources': a:000 }
888     return function('s:combineFactory', [l:data])
889 endfunction
890
891 let s:combineEmptyToken = '__callback__combine_empty_token__'
892 function! s:combineFactory(data, start, sink) abort
893     if a:start != 0 | return | endif
894     let a:data['sink'] = a:sink
895     let a:data['n'] = len(a:data['sources'])
896     if a:data['n'] == 0
897         call a:data['sink'](0, function('s:noop'))
898         call a:data['sink'](1, [])
899         call a:data['sink'](2, lsp#callbag#undefined())
900         return
901     endif
902     let a:data['Ns'] = a:data['n'] " start counter
903     let a:data['Nd'] = a:data['n'] " data counter
904     let a:data['Ne'] = a:data['n'] " end counter
905     let a:data['vals'] = s:createArrayWithSize(a:data['n'], lsp#callbag#undefined())
906     let a:data['sourceTalkbacks'] = s:createArrayWithSize(a:data['n'], lsp#callbag#undefined())
907     let a:data['talkback'] = function('s:combineTalkbackCallback', [a:data])
908     let l:i = 0
909     for l:Source in a:data['sources']
910         let a:data['vals'][l:i] = s:combineEmptyToken
911         call l:Source(0, function('s:combineSourceCallback', [a:data, l:i]))
912         let l:i = l:i + 1
913     endfor
914 endfunction
915
916 function! s:combineTalkbackCallback(data, t, d) abort
917     if a:t == 0 | return | endif
918     let l:i = 0
919     while l:i < a:data['n']
920         call a:data['sourceTalkbacks'][l:i](a:t, a:d)
921         let l:i = l:i + 1
922     endwhile
923 endfunction
924
925 function! s:combineSourceCallback(data, i, t, d) abort
926     if a:t == 0
927         let a:data['sourceTalkbacks'][a:i] = a:d
928         let a:data['Ns'] = a:data['Ns'] - 1
929         if a:data['Ns'] == 0 | call a:data['sink'](0, a:data['talkback']) | endif
930     elseif a:t == 1
931         if a:data['Nd'] <= 0
932             let l:_Nd = 0
933         else
934             if a:data['vals'][a:i] == s:combineEmptyToken
935                 let a:data['Nd'] = a:data['Nd'] - 1
936             endif
937             let l:_Nd = a:data['Nd']
938         endif
939         let a:data['vals'][a:i] = a:d
940         if l:_Nd == 0
941             let l:arr = s:createArrayWithSize(a:data['n'], lsp#callbag#undefined())
942             let l:j = 0
943             while l:j < a:data['n']
944                 let l:arr[l:j] = a:data['vals'][l:j]
945                 let l:j = l:j + 1
946             endwhile
947             call a:data['sink'](1, l:arr)
948         endif
949     elseif a:t == 2
950         let a:data['Ne'] = a:data['Ne'] - 1
951         if a:data['Ne'] == 0
952             call a:data['sink'](2, lsp#callbag#undefined())
953         endif
954     else
955         call a:data['sink'](a:t, a:d)
956     endif
957 endfunction
958 " }}}
959
960 " distinctUntilChanged {{{
961 function! s:distinctUntilChangedDefaultCompare(a, b) abort
962     return a:a == a:b
963 endfunction
964
965 function! lsp#callbag#distinctUntilChanged(...) abort
966     let l:data = { 'compare': a:0 == 0 ? function('s:distinctUntilChangedDefaultCompare') : a:1 }
967     return function('s:distinctUntilChangedSourceFactory', [l:data])
968 endfunction
969
970 function! s:distinctUntilChangedSourceFactory(data, source) abort
971     let a:data['source'] = a:source
972     return function('s:distinctUntilChangedSinkFactory', [a:data])
973 endfunction
974
975 function! s:distinctUntilChangedSinkFactory(data, start, sink) abort
976     if a:start != 0 | return | endif
977     let a:data['sink'] = a:sink
978     let a:data['inited'] = 0
979     call a:data['source'](0, function('s:distinctUntilChangedSourceCallback', [a:data]))
980 endfunction
981
982 function! s:distinctUntilChangedSourceCallback(data, t, d) abort
983     if a:t == 0 | let a:data['talkback'] = a:d | endif
984     if a:t != 1
985         call a:data['sink'](a:t, a:d)
986         return
987     endif
988
989     if a:data['inited'] && has_key(a:data, 'prev') && a:data['compare'](a:data['prev'], a:d)
990         call a:data['talkback'](1, lsp#callbag#undefined())
991         return
992     endif
993
994     let a:data['inited'] = 1
995     let a:data['prev'] = a:d
996     call a:data['sink'](1, a:d)
997 endfunction
998 " }}}
999
1000 " takeUntil() {{{
1001 function! lsp#callbag#takeUntil(notfier) abort
1002     let l:data = { 'notifier': a:notfier }
1003     return function('s:takeUntilNotifier', [l:data])
1004 endfunction
1005
1006 function! s:takeUntilNotifier(data, source) abort
1007     let a:data['source'] = a:source
1008     return function('s:takeUntilFactory', [a:data])
1009 endfunction
1010
1011 let s:takeUntilUniqueToken = '__callback__take_until_unique_token__'
1012
1013 function! s:takeUntilFactory(data, start, sink) abort
1014     if a:start != 0 | return | endif
1015     let a:data['sink'] = a:sink
1016     let a:data['inited'] = 1
1017     let a:data['sourceTalkback'] = 0
1018     let a:data['notiferTalkback'] = 0
1019     let a:data['done'] = s:takeUntilUniqueToken
1020     call a:data['source'](0, function('s:takeUntilSourceCallback', [a:data]))
1021 endfunction
1022
1023 function! s:takeUntilSourceCallback(data, t, d) abort
1024     if a:t == 0
1025         let a:data['sourceTalkback'] = a:d
1026         call a:data['notifier'](0, function('s:takeUntilNotifierCallback', [a:data]))
1027         let a:data['inited'] = 1
1028         call a:data['sink'](0, function('s:takeUntilSinkCallback', [a:data]))
1029         if a:data['done'] != s:takeUntilUniqueToken | call a:data['sink'](2, a:data['done']) | endif
1030         return
1031     endif
1032     if a:t == 2
1033         call a:data['notifierTalkback'](2, lsp#callbag#undefined())
1034     endif
1035     if a:data['done'] == s:takeUntilUniqueToken
1036         call a:data['sink'](a:t, a:d)
1037     endif
1038 endfunction
1039
1040 function! s:takeUntilNotifierCallback(data, t, d) abort
1041     if a:t == 0
1042         let a:data['notifierTalkback'] = a:d
1043         call a:data['notifierTalkback'](1, lsp#callbag#undefined())
1044         return
1045     endif
1046     if a:t == 1
1047         let a:data['done'] = 0
1048         call a:data['notifierTalkback'](2, lsp#callbag#undefined())
1049         call a:data['sourceTalkback'](2, lsp#callbag#undefined())
1050         if a:data['inited'] | call a:data['sink'](2, lsp#callbag#undefined()) | endif
1051         return
1052     endif
1053     if a:t ==2
1054         let a:data['notifierTalkback'] = 0
1055         let a:data['done'] = a:d
1056         if a:d != 0
1057             call a:data['sourceTalkback'](2, lsp#callbag#undefined())
1058             if a:data['inited'] | call a:data['sink'](a:t, a:d) | endif
1059         endif
1060     endif
1061 endfunction
1062
1063 function! s:takeUntilSinkCallback(data, t, d) abort
1064     if a:data['done'] != s:takeUntilUniqueToken | return | endif
1065     if a:t == 2 && has_key(a:data, 'notifierTalkback') && a:data['notifierTalkback'] != 0 | call a:data['notifierTalkback'](2, lsp#callbag#undefined()) | endif
1066     call a:data['sourceTalkback'](a:t, a:d)
1067 endfunction
1068 " }}}
1069
1070 " takeWhile() {{{
1071 function! lsp#callbag#takeWhile(predicate) abort
1072     let l:data = { 'predicate': a:predicate }
1073     return function('s:takeWhileFactory', [l:data])
1074 endfunction
1075
1076 function! s:takeWhileFactory(data, source) abort
1077     let a:data['source'] = a:source
1078     return function('s:takeWhileSourceFactory', [a:data])
1079 endfunction
1080
1081 function! s:takeWhileSourceFactory(data, start, sink) abort
1082     if a:start != 0 | return | endif
1083     let a:data['sink'] = a:sink
1084     call a:data['source'](0, function('s:takeWhileSourceCallback', [a:data]))
1085 endfunction
1086
1087 function! s:takeWhileSourceCallback(data, t, d) abort
1088     if a:t == 0
1089         let a:data['sourceTalkback'] = a:d
1090     endif
1091
1092     if a:t == 1 && !a:data['predicate'](a:d)
1093         call a:data['sourceTalkback'](2, lsp#callbag#undefined())
1094         call a:data['sink'](2, lsp#callbag#undefined())
1095         return
1096     endif
1097
1098     call a:data['sink'](a:t, a:d)
1099 endfunction
1100 " }}}
1101
1102 " group() {{{
1103 function! lsp#callbag#group(n) abort
1104     let l:data = { 'n': a:n }
1105     return function('s:groupN', [l:data])
1106 endfunction
1107
1108 function! s:groupN(data, source) abort
1109     let a:data['source'] = a:source
1110     return function('s:groupFactory', [a:data])
1111 endfunction
1112
1113 function! s:groupFactory(data, start, sink) abort
1114     if a:start != 0 | return | endif
1115     let a:data['sink'] = a:sink
1116     let a:data['chunk'] = []
1117     call a:data['source'](0, function('s:groupSourceCallback', [a:data]))
1118 endfunction
1119
1120 function! s:groupSourceCallback(data, t, d) abort
1121     if a:t == 0 | let a:data['talkback'] = a:d | endif
1122     if a:t == 1
1123         call add(a:data['chunk'], a:d)
1124         if len(a:data['chunk']) == a:data['n']
1125             call a:data['sink'](a:t, remove(a:data['chunk'], 0, a:data['n'] - 1))
1126         endif
1127         call a:data['talkback'](1, lsp#callbag#undefined())
1128     else
1129         if a:t == 2 && len(a:data['chunk']) > 0
1130             call a:data['sink'](1, remove(a:data['chunk'], 0, len(a:data['chunk']) - 1))
1131         else
1132             call a:data['sink'](a:t, a:d)
1133         endif
1134     endif
1135 endfunction
1136 " }}}
1137
1138 " flatten() {{{
1139 function! lsp#callbag#flatten() abort
1140     return function('s:flattenSource')
1141 endfunction
1142
1143 function! s:flattenSource(source) abort
1144     let l:data = { 'source': a:source }
1145     return function('s:flattenFactory', [l:data])
1146 endfunction
1147
1148 function! s:flattenFactory(data, start, sink) abort
1149     if a:start != 0 | return | endif
1150     let a:data['sink'] = a:sink
1151     let a:data['outerEnded'] = 0
1152     let a:data['outerTalkback'] = 0
1153     let a:data['innerTalkback'] = 0
1154     let a:data['talkback'] = function('s:flattenTalkbackCallback', [a:data])
1155     call a:data['source'](0, function('s:flattenSourceCallback', [a:data]))
1156 endfunction
1157
1158 function! s:flattenTalkbackCallback(data, t, d) abort
1159     if a:t == 1
1160         if a:data['innerTalkback'] != 0
1161             call a:data['innerTalkback'](1, a:d)
1162         else
1163             call a:data['outerTalkback'](1, a:d)
1164         endif
1165     endif
1166     if a:t == 2
1167         if a:data['innerTalkback'] != 0 | call a:data['innerTalkback'](2, lsp#callbag#undefined()) | endif
1168         call a:data['outerTalkback'](2, lsp#callbag#undefined())
1169     endif
1170 endfunction
1171
1172 function! s:flattenSourceCallback(data, t, d) abort
1173     if a:t == 0
1174         let a:data['outerTalkback'] = a:d
1175         call a:data['sink'](0, a:data['talkback'])
1176     elseif a:t == 1
1177         let l:InnerSource = a:d
1178         if a:data['innerTalkback'] != 0 | call a:data['innerTalkback'](2, lsp#callbag#undefined()) | endif
1179         call l:InnerSource(0, function('s:flattenInnerSourceCallback', [a:data]))
1180     elseif a:t == 2 && !lsp#callbag#isUndefined(a:d)
1181         if a:data['innerTalkback'] != 0 | call a:data['innerTalkback'](2, lsp#callbag#undefined()) | endif
1182         call a:data['outerTalkback'](1, a:d)
1183     elseif a:t == 2
1184         if a:data['innerTalkback'] == 0
1185             call a:data['sink'](2, lsp#callbag#undefined())
1186         else
1187             let a:data['outerEnded'] = 1
1188         endif
1189     endif
1190 endfunction
1191
1192 function! s:flattenInnerSourceCallback(data, t, d) abort
1193     if a:t == 0
1194         let a:data['innerTalkback'] = a:d
1195         call a:data['innerTalkback'](1, lsp#callbag#undefined())
1196     elseif a:t == 1
1197         call a:data['sink'](1, a:d)
1198     elseif a:t == 2 && !lsp#callbag#isUndefined(a:d)
1199         call a:data['outerTalkback'](2, lsp#callbag#undefined())
1200         call a:data['sink'](2, a:d)
1201     elseif a:t == 2
1202         if a:data['outerEnded'] != 0
1203             call a:data['sink'](2, lsp#callbag#undefined())
1204         else
1205             let a:data['innerTalkback'] = 0
1206             call a:data['outerTalkback'](1, lsp#callbag#undefined())
1207         endif
1208     endif
1209 endfunction
1210 " }}}
1211
1212 " flatMap() {{{
1213 function! lsp#callbag#flatMap(F) abort
1214     return lsp#callbag#operate(
1215         \ lsp#callbag#map(a:F),
1216         \ lsp#callbag#flatten(),
1217         \ )
1218 endfunction
1219 " }}}
1220
1221 " scan() {{{
1222 function! lsp#callbag#scan(reducer, seed) abort
1223     let l:data = { 'reducer': a:reducer, 'seed': a:seed }
1224     return function('s:scanSource', [l:data])
1225 endfunction
1226
1227 function! s:scanSource(data, source) abort
1228     let a:data['source'] = a:source
1229     return function('s:scanFactory', [a:data])
1230 endfunction
1231
1232 function! s:scanFactory(data, start, sink) abort
1233     if a:start != 0 | return | endif
1234     let a:data['sink'] = a:sink
1235     let a:data['acc'] = a:data['seed']
1236     call a:data['source'](0, function('s:scanSourceCallback', [a:data]))
1237 endfunction
1238
1239 function! s:scanSourceCallback(data, t, d) abort
1240     if a:t == 1
1241         let a:data['acc'] = a:data['reducer'](a:data['acc'], a:d)
1242         call a:data['sink'](1, a:data['acc'])
1243     else
1244         call a:data['sink'](a:t, a:d)
1245     endif
1246 endfunction
1247 " }}}
1248
1249 " reduce() {{{
1250 function! lsp#callbag#reduce(reducer, seed) abort
1251     let l:data = { 'reducer': a:reducer, 'seed': a:seed }
1252     return function('s:reduceSource', [l:data])
1253 endfunction
1254
1255 function! s:reduceSource(data, source) abort
1256     let a:data['source'] = a:source
1257     return function('s:reduceFactory', [a:data])
1258 endfunction
1259
1260 function! s:reduceFactory(data, start, sink) abort
1261     if a:start != 0 | return | endif
1262     let a:data['sink'] = a:sink
1263     let a:data['acc'] = a:data['seed']
1264     call a:data['source'](0, function('s:reduceSourceCallback', [a:data]))
1265 endfunction
1266
1267 function! s:reduceSourceCallback(data, t, d) abort
1268     if a:t == 1
1269         let a:data['acc'] = a:data['reducer'](a:data['acc'], a:d)
1270     elseif a:t == 2 && lsp#callbag#isUndefined(a:d)
1271         call a:data['sink'](1, a:data['acc'])
1272         call a:data['sink'](2, lsp#callbag#undefined())
1273     else
1274         call a:data['sink'](a:t, a:d)
1275     endif
1276 endfunction
1277 " }}}
1278
1279 " switchMap() {{{
1280 function! lsp#callbag#switchMap(makeSource, ...) abort
1281     let l:data = { 'makeSource': a:makeSource }
1282     if a:0 == 1
1283         let l:data['combineResults'] = a:1
1284     else
1285         let l:data['combineResults'] = function('s:switchMapDefaultCombineResults')
1286     endif
1287     return function('s:switchMapSourceCallback', [l:data])
1288 endfunction
1289
1290 function! s:switchMapDefaultCombineResults(a, b) abort
1291     return a:b
1292 endfunction
1293
1294 function! s:switchMapSourceCallback(data, inputSource) abort
1295     let a:data['inputSource'] = a:inputSource
1296     return function('s:switchMapFactory', [a:data])
1297 endfunction
1298
1299 function! s:switchMapFactory(data, start, outputSink) abort
1300     if a:start != 0 | return | endif
1301     let a:data['outputSink'] = a:outputSink
1302     let a:data['sourceEnded'] = 0
1303     call a:data['inputSource'](0, function('s:switchMapInputSourceCallback', [a:data]))
1304 endfunction
1305
1306 function! s:switchMapInputSourceCallback(data, t, d) abort
1307     if a:t == 0 | call a:data['outputSink'](a:t, a:d) | endif
1308     if a:t == 1
1309         if has_key(a:data, 'currSourceTalkback')
1310             call a:data['currSourceTalkback'](2, lsp#callbag#undefined())
1311             call remove(a:data, 'currSourceTalkback')
1312         endif
1313         let l:CurrSource = a:data['makeSource'](a:d)
1314         call l:CurrSource(0, function('s:switchMapCurrSourceCallback', [a:data, a:t, a:d]))
1315     endif
1316     if a:t == 2
1317         let a:data['sourceEnded'] = 1
1318         if !has_key(a:data, 'currSourceTalkback') | call a:data['outputSink'](a:t, a:d) | endif
1319     endif
1320 endfunction
1321
1322 function! s:switchMapCurrSourceCallback(data, t, d, currT, currD) abort
1323     if a:currT == 0 | let a:data['currSourceTalkback'] = a:currD | endif
1324     if a:currT == 1 | call a:data['outputSink'](a:t, a:data['combineResults'](a:d, a:currD)) | endif
1325     if (a:currT == 0 || a:currT == 1) && has_key(a:data, 'currSourceTalkback')
1326         call a:data['currSourceTalkback'](1, lsp#callbag#undefined())
1327     endif
1328     if a:currT == 2
1329         call remove(a:data, 'currSourceTalkback')
1330         if a:data['sourceEnded'] | call a:data['outputSink'](a:currT, a:currD) | endif
1331     endif
1332 endfunction
1333 " }}}
1334
1335 " {{{
1336 function! lsp#callbag#share(source) abort
1337     let l:data = { 'source': a:source, 'sinks': [] }
1338     return function('s:shareFactory', [l:data])
1339 endfunction
1340
1341 function! s:shareFactory(data, start, sink) abort
1342     if a:start != 0 | return | endif
1343     call add(a:data['sinks'], a:sink)
1344
1345     let a:data['talkback'] = function('s:shareTalkbackCallback', [a:data, a:sink])
1346
1347     if len(a:data['sinks']) == 1
1348         call a:data['source'](0, function('s:shareSourceCallback', [a:data, a:sink]))
1349         return
1350     endif
1351
1352     call a:sink(0, a:data['talkback'])
1353 endfunction
1354
1355 function! s:shareTalkbackCallback(data, sink, t, d) abort
1356     if a:t == 2
1357         let l:i = 0
1358         let l:found = 0
1359         while l:i < len(a:data['sinks'])
1360             if a:data['sinks'][l:i] == a:sink
1361                 let l:found = 1
1362                 break
1363             endif
1364             let l:i += 1
1365         endwhile
1366
1367         if l:found
1368             call remove(a:data['sinks'], l:i)
1369         endif
1370
1371         if empty(a:data['sinks'])
1372             call a:data['sourceTalkback'](2, lsp#callbag#undefined())
1373         endif
1374     else
1375         call a:data['sourceTalkback'](a:t, a:d)
1376     endif
1377 endfunction
1378
1379 function! s:shareSourceCallback(data, sink, t, d) abort
1380     if a:t == 0
1381         let a:data['sourceTalkback'] = a:d
1382         call a:sink(0, a:data['talkback'])
1383     else
1384         for l:S in a:data['sinks']
1385             call l:S(a:t, a:d)
1386         endfor
1387     endif
1388     if a:t == 2
1389         let a:data['sinks'] = []
1390     endif
1391 endfunction
1392 " }}}
1393
1394 " materialize() {{{
1395 function! lsp#callbag#materialize() abort
1396     let l:data = {}
1397     return function('s:materializeF', [l:data])
1398 endfunction
1399
1400 function! s:materializeF(data, source) abort
1401     let a:data['source'] = a:source
1402     return function('s:materializeFSource', [a:data])
1403 endfunction
1404
1405 function! s:materializeFSource(data, start, sink) abort
1406     if a:start != 0 | return | endif
1407     let a:data['sink'] = a:sink
1408     call a:data['source'](0, function('s:materializeFSourceCallback', [a:data]))
1409 endfunction
1410
1411 function! s:materializeFSourceCallback(data, t, d) abort
1412     if a:t == 1
1413         call a:data['sink'](1, lsp#callbag#createNextNotification(a:d))
1414     elseif a:t == 2
1415         call a:data['sink'](1, lsp#callbag#isUndefined(a:d)
1416                     \ ? lsp#callbag#createCompleteNotification()
1417                     \ : lsp#callbag#createErrorNotification(a:d))
1418         call a:data['sink'](2, lsp#callbag#undefined())
1419     else
1420         call a:data['sink'](a:t, a:d)
1421     endif
1422 endfunction
1423 " }}}
1424
1425 " Notifications {{{
1426 function! lsp#callbag#createNextNotification(d) abort
1427     return { 'kind': 'N', 'value': a:d }
1428 endfunction
1429
1430 function! lsp#callbag#createCompleteNotification() abort
1431     return { 'kind': 'C' }
1432 endfunction
1433
1434 function! lsp#callbag#createErrorNotification(d) abort
1435     return { 'kind': 'E', 'error': a:d }
1436 endfunction
1437
1438 function! lsp#callbag#isNextNotification(d) abort
1439     return a:d['kind'] ==# 'N'
1440 endfunction
1441
1442 function! lsp#callbag#isCompleteNotification(d) abort
1443     return a:d['kind'] ==# 'C'
1444 endfunction
1445
1446 function! lsp#callbag#isErrorNotification(d) abort
1447     return a:d['kind'] ==# 'E'
1448 endfunction
1449 " }}}
1450
1451 " spawn {{{
1452 " let s:Stdin = lsp#callbag#makeSubject()
1453 " call lsp#callbag#spawn(['bash', '-c', 'read i; echo $i'], {
1454 "   \ 'stdin': s:Stdin,
1455 "   \ 'stdout': 0,
1456 "   \ 'stderr': 0,
1457 "   \ 'exit': 0,
1458 "   \ 'start': 0, " when job starts before subscribing to stdin
1459 "   \ 'ready': 0, " when job starts and after subscribing to stdin
1460 "   \ 'pid': 0,
1461 "   \ 'failOnNonZeroExitCode': 1,
1462 "   \ 'failOnStdinError': 1,
1463 "   \ 'normalize': 'raw' | 'string' | 'array', (defaults to raw),
1464 "   \ 'env': {},
1465 "   \ })
1466 "   call s:Stdin(1, 'hi')
1467 "   call s:Stdin(2, lsp#callbag#undefined()) " requried to close stdin
1468 function! lsp#callbag#spawn(cmd, ...) abort
1469     let l:data = { 'cmd': a:cmd, 'opt': a:0 > 0 ? copy(a:000[0]) : {} }
1470     return lsp#callbag#create(function('s:spawnCreate', [l:data]))
1471 endfunction
1472
1473 function! s:spawnCreate(data, next, error, complete) abort
1474     let a:data['next'] = a:next
1475     let a:data['error'] = a:error
1476     let a:data['complete'] = a:complete
1477     let a:data['state'] = {}
1478     let a:data['dispose'] = 0
1479     let a:data['exit'] = 0
1480     let a:data['close'] = 0
1481
1482     let l:normalize = get(a:data['opt'], 'normalize', 'raw')
1483
1484     if has('nvim')
1485         let a:data['jobopt'] = {
1486             \ 'on_exit': function('s:spawnNeovimOnExit', [a:data]),
1487             \ }
1488         if l:normalize ==# 'string'
1489             let a:data['normalize'] = function('s:spawnNormalizeNeovimString')
1490         else
1491             let a:data['normalize'] = function('s:spawnNormalizeRaw')
1492         endif
1493         if get(a:data['opt'], 'stdout', 0) | let a:data['jobopt']['on_stdout'] = function('s:spawnNeovimOnStdout', [a:data]) | endif
1494         if get(a:data['opt'], 'stderr', 0) | let a:data['jobopt']['on_stderr'] = function('s:spawnNeovimOnStderr', [a:data]) | endif
1495         if has_key(a:data['opt'], 'env') | let a:data['jobopt']['env'] = a:data['opt']['env'] | endif
1496         let a:data['jobid'] = jobstart(a:data['cmd'], a:data['jobopt'])
1497     else
1498         let a:data['jobopt'] = {
1499             \ 'exit_cb': function('s:spawnVimExitCb', [a:data]),
1500             \ 'close_cb': function('s:spawnVimCloseCb', [a:data]),
1501             \ }
1502         if get(a:data['opt'], 'stdout', 0) | let a:data['jobopt']['out_cb'] = function('s:spawnVimOutCb', [a:data]) | endif
1503         if get(a:data['opt'], 'stderr', 0) | let a:data['jobopt']['err_cb'] = function('s:spawnVimErrCb', [a:data]) | endif
1504         if has_key(a:data['opt'], 'env') | let a:data['jobopt']['env'] = a:data['opt']['env'] | endif
1505         if l:normalize ==# 'array'
1506             let a:data['normalize'] = function('s:spawnNormalizeVimArray')
1507         else
1508             let a:data['normalize'] = function('s:spawnNormalizeRaw')
1509         endif
1510         if has('patch-8.1.350') | let a:data['jobopt']['noblock'] = 1 | endif
1511         let a:data['stdinBuffer'] = ''
1512         let a:data['job'] = job_start(a:data['cmd'], a:data['jobopt'])
1513         let a:data['jobchannel'] = job_getchannel(a:data['job'])
1514         let a:data['jobid'] = ch_info(a:data['jobchannel'])['id']
1515     endif
1516
1517     if a:data['jobid'] < 0 | return | endif " jobstart failed. on_exit will notify with error
1518
1519     if get(a:data['opt'], 'pid', 0)
1520         if has('nvim')
1521             let a:data['pid'] = jobpid(a:data['jobid'])
1522             let l:startdata['pid'] = a:data['pid']
1523         else
1524             let l:jobinfo = job_info(a:data['job'])
1525             if type(l:jobinfo) == type({}) && has_key(l:jobinfo, 'process')
1526                 let a:data['pid'] = l:jobinfo['process']
1527                 let l:startdata['pid'] = a:data['pid']
1528             endif
1529         endif
1530     endif
1531
1532     if get(a:data['opt'], 'start', 0)
1533         let l:startdata = { 'id': a:data['jobid'], 'state': a:data['state'] }
1534         call a:data['next']({ 'event': 'start', 'data': l:startdata })
1535     endif
1536
1537     if has_key(a:data['opt'], 'stdin')
1538         let a:data['stdinDispose'] = lsp#callbag#pipe(
1539             \ a:data['opt']['stdin'],
1540             \ lsp#callbag#subscribe({
1541             \   'next': (has('nvim') ? function('s:spawnNeovimStdinNext', [a:data]) : function('s:spawnVimStdinNext', [a:data])),
1542             \   'error': (has('nvim') ? function('s:spawnNeovimStdinError', [a:data]) : function('s:spawnVimStdinError', [a:data])),
1543             \   'complete': (has('nvim') ? function('s:spawnNeovimStdinComplete', [a:data]) : function('s:spawnVimStdinComplete', [a:data])),
1544             \ }),
1545             \ )
1546     endif
1547
1548     if get(a:data['opt'], 'ready', 0)
1549         let l:readydata = { 'id': a:data['jobid'], 'state': a:data['state'] }
1550         if has_key(a:data, 'pid') | let l:readydata['pid'] = a:data['pid'] | endif
1551         call a:data['next']({ 'event': 'ready', 'data': l:readydata })
1552     endif
1553
1554     return function('s:spawnDispose', [a:data])
1555 endfunction
1556
1557 function! s:spawnJobStop(data) abort
1558     if has('nvim')
1559         try
1560             call jobstop(a:data['jobid'])
1561         catch /^Vim\%((\a\+)\)\=:E900/
1562             " NOTE:
1563             " Vim does not raise exception even the job has already closed so fail
1564             " silently for 'E900: Invalid job id' exception
1565         endtry
1566     else
1567         call job_stop(a:data['job'])
1568     endif
1569 endfunction
1570
1571 function! s:spawnDispose(data) abort
1572     let a:data['dispose'] = 1
1573     call s:spawnJobStop(a:data)
1574 endfunction
1575
1576 function! s:spawnNeovimStdinNext(data, x) abort
1577     call jobsend(a:data['jobid'], a:x)
1578 endfunction
1579
1580 function! s:spawnVimStdinNext(data, x) abort
1581     " Ref: https://groups.google.com/d/topic/vim_dev/UNNulkqb60k/discussion
1582     let a:data['stdinBuffer'] .= a:x
1583     call s:spawnVimStdinNextFlushBuffer(a:data)
1584 endfunction
1585
1586 function! s:spawnVimStdinNextFlushBuffer(data) abort
1587     " https://github.com/vim/vim/issues/2548
1588     " https://github.com/natebosch/vim-lsc/issues/67#issuecomment-357469091
1589     sleep 1m
1590     if len(a:data['stdinBuffer']) <= 4096
1591         call ch_sendraw(a:data['jobchannel'], a:data['stdinBuffer'])
1592         let a:data['stdinBuffer'] = ''
1593     else
1594         let l:to_send = a:data['stdinBuffer'][:4095]
1595         let a:data['stdinBuffer'] = a:data['stdinBuffer'][4096:]
1596         call ch_sendraw(a:data['jobchannel'], l:to_send)
1597         call timer_start(1, function('s:spawnVimStdinNextFlushBuffer', [a:data]))
1598     endif
1599 endfunction
1600
1601 function! s:spawnNeovimStdinError(data, x) abort
1602     let a:data['stdinError'] = a:x
1603     if get(a:data['opt'], 'failOnStdinError', 1) | call s:spawnJobStop(a:data) | endif
1604 endfunction
1605
1606 function! s:spawnVimStdinError(data, x) abort
1607     let a:data['stdinError'] = a:x
1608     if get(a:data['opt'], 'failOnStdinError', 1) | call s:spawnJobStop(a:data) | endif
1609 endfunction
1610
1611 function! s:spawnNeovimStdinComplete(data) abort
1612     call chanclose(a:data['jobid'], 'stdin')
1613 endfunction
1614
1615 function! s:spawnVimStdinComplete(data) abort
1616    " There is no easy way to know when ch_sendraw() finishes writing data
1617    " on a non-blocking channels -- has('patch-8.1.889') -- and because of
1618    " this, we cannot safely call ch_close_in().
1619     while len(a:data['stdinBuffer']) != 0
1620         sleep 1m
1621     endwhile
1622     call ch_close_in(a:data['jobchannel'])
1623 endfunction
1624
1625 function! s:spawnNormalizeRaw(data) abort
1626     return a:data
1627 endfunction
1628
1629 function! s:spawnNormalizeNeovimString(data) abort
1630     " convert array to string since neovim uses array split by \n by default
1631     return join(a:data, "\n")
1632 endfunction
1633
1634 function! s:spawnNormalizeVimArray(data) abort
1635     " convert string to array since vim uses string by default.
1636     return split(a:data, "\n", 1)
1637 endfunction
1638
1639 function! s:spawnNeovimOnStdout(data, id, d, event) abort
1640     call a:data['next']({ 'event': 'stdout', 'data': a:data['normalize'](a:d), 'state': a:data['state'] })
1641 endfunction
1642
1643 function! s:spawnNeovimOnStderr(data, id, d, event) abort
1644     call a:data['next']({ 'event': 'stderr', 'data': a:data['normalize'](a:d), 'state': a:data['state'] })
1645 endfunction
1646
1647 function! s:spawnNeovimOnExit(data, id, d, event) abort
1648     let a:data['exit'] = 1
1649     let a:data['close'] = 1
1650     let a:data['exitcode'] = a:d
1651     call s:spawnNotifyExit(a:data)
1652 endfunction
1653
1654 function! s:spawnVimOutCb(data, id, d, ...) abort
1655     call a:data['next']({ 'event': 'stdout', 'data': a:data['normalize'](a:d), 'state': a:data['state'] })
1656 endfunction
1657
1658 function! s:spawnVimErrCb(data, id, d, ...) abort
1659     call a:data['next']({ 'event': 'stderr', 'data': a:data['normalize'](a:d), 'state': a:data['state'] })
1660 endfunction
1661
1662 function! s:spawnVimExitCb(data, id, d) abort
1663     let a:data['exit'] = 1
1664     let a:data['exitcode'] = a:d
1665     " for more info refer to :h job-start
1666     " job may exit before we read the output and output may be lost.
1667     " in unix this happens because closing the write end of a pipe
1668     " causes the read end to get EOF.
1669     " close and exit has race condition, so wait for both to complete
1670     if a:data['close'] && a:data['exit']
1671         call s:spawnNotifyExit(a:data)
1672     endif
1673 endfunction
1674
1675 function! s:spawnVimCloseCb(data, id) abort
1676     let a:data['close'] = 1
1677     if a:data['close'] && a:data['exit']
1678         call s:spawnNotifyExit(a:data)
1679     endif
1680 endfunction
1681
1682 function! s:spawnNotifyExit(data) abort
1683     if a:data['dispose'] | return | end
1684     if has_key(a:data, 'stdinDispose') | call a:data['stdinDispose']() | endif
1685     if get(a:data['opt'], 'failOnStdinError', 1) && has_key(a:data, 'stdinError')
1686         call a:data['error'](a:data['stdinError'])
1687         return
1688     endif
1689     if get(a:data['opt'], 'exit', 0)
1690         call a:data['next']({ 'event': 'exit', 'data': a:data['exitcode'], 'state': a:data['state'] })
1691     endif
1692     if get(a:data['opt'], 'failOnNonZeroExitCode', 1) && a:data['exitcode'] != 0
1693         call a:data['error']('Spawn for job ' . a:data['jobid'] .' failed with exit code ' . a:data['exitcode'] . '. ')
1694     else
1695         call a:data['complete']()
1696     endif
1697 endfunction
1698 " }}}
1699
1700 " vim:ts=4:sw=4:ai:foldmethod=marker:foldlevel=0: