All patches and comments are welcome. Please squash your changes into logical
commits before using git-format-patch and git-send-email to send them to
patches@git.madduck.net.
If you read over the Git project's submission guidelines and adhere to them,
I'd be especially grateful.
1 " https://github.com/prabirshrestha/callbag.vim#82f96a7d97342fbf0286e6578b65a60f2bc1ce33
2 " :CallbagEmbed path=autoload/lsp/callbag.vim namespace=lsp#callbag
" Sentinel string that stands in for an "undefined" payload, plus the cached
" type() id of a String used by lsp#callbag#isUndefined().
4 let s:undefined_token = '__callbag_undefined__'
5 let s:str_type = type('')
" Returns the sentinel string that represents "undefined".
7 function! lsp#callbag#undefined() abort
8 return s:undefined_token
" Returns true only when a:d is a String equal (case-sensitively, via ==#) to
" the sentinel. The type check runs first, so non-String values are never
" compared against the token.
11 function! lsp#callbag#isUndefined(d) abort
12 return type(a:d) == s:str_type && a:d ==# s:undefined_token
" Callback that accepts any arguments; used as a do-nothing talkback.
" NOTE(review): no body line is visible in this listing.
15 function! s:noop(...) abort
" Builds a list by appending a:defaultValue to l:array.
" NOTE(review): the loop and return lines are not visible in this listing;
" presumably the add() runs a:size times and l:array is returned — confirm.
18 function! s:createArrayWithSize(size, defaultValue) abort
22 call add(l:array, a:defaultValue)
" pipe(...): the visible step calls the argument at index l:i with the current
" l:Res and stores the result back into l:Res.
" NOTE(review): the initialisation, loop and return lines are not visible in
" this listing.
29 function! lsp#callbag#pipe(...) abort
33 let l:Res = a:000[l:i](l:Res)
" operate(...): captures the given callbacks in a dict under 'cbs' and returns
" a partial of s:operateFactory bound to that dict.
41 function! lsp#callbag#operate(...) abort
42 let l:data = { 'cbs': a:000 }
43 return function('s:operateFactory', [l:data])
" Applies a:data['cbs'][l:i] to l:Res and reassigns l:Res; l:n is the number of
" captured callbacks.
" NOTE(review): the loop and return lines are not visible in this listing.
46 function! s:operateFactory(data, src) abort
48 let l:n = len(a:data['cbs'])
51 let l:Res = a:data['cbs'][l:i](l:Res)
" makeSubject(): returns a partial of s:makeSubjectFactory bound to a fresh
" state dict whose 'sinks' list starts empty.
59 function! lsp#callbag#makeSubject() abort
60 let l:data = { 'sinks': [] }
61 return function('s:makeSubjectFactory', [l:data])
" Callbag body of the subject (signal type a:t, payload a:d). Visible
" behaviour: a sink is appended to a:data['sinks'] and greeted with type 0 plus
" a talkback bound to (data, sink); another path works on a copy of the sink
" list (l:zinkz).
" NOTE(review): the branch conditions are not visible in this listing; the
" copy presumably keeps the loop stable if sinks are removed mid-delivery.
64 function! s:makeSubjectFactory(data, t, d) abort
67 call add(a:data['sinks'], l:Sink)
68 call l:Sink(0, function('s:makeSubjectSinkCallback', [a:data, l:Sink]))
70 let l:zinkz = copy(a:data['sinks'])
72 let l:n = len(l:zinkz)
74 let l:Sink = l:zinkz[l:i]
77 for l:Item in a:data['sinks']
" Talkback handed to each sink: scans a:data['sinks'] and removes the entry at
" index l:i.
" NOTE(review): the matching condition is not visible here — presumably it
" fires on type 2 for the entry equal to a:Sink; confirm.
93 function! s:makeSubjectSinkCallback(data, Sink, t, d) abort
97 for l:Item in a:data['sinks']
105 call remove(a:data['sinks'], l:i)
" create([producer]): returns a source implemented by s:createProd. The first
" argument is stored as 'prod'.
" NOTE(review): the lines creating l:data and guarding a:1 are not visible in
" this listing.
112 function! lsp#callbag#create(...) abort
115 let l:data['prod'] = a:1
117 return function('s:createProd', [l:data])
" Source body; only reacts to the type-0 greeting. When 'prod' is missing or is
" not a Funcref, the sink is greeted with a no-op talkback and sent type 2 with
" the undefined sentinel. Otherwise the sink is greeted with
" s:createSinkCallback and, unless it ended during that greeting, the producer
" is called with next/error/complete partials; its return value is kept in
" 'clean'.
120 function! s:createProd(data, start, sink) abort
121 if a:start != 0 | return | endif
122 let a:data['sink'] = a:sink
123 if !has_key(a:data, 'prod') || type(a:data['prod']) != type(function('s:noop'))
124 call a:sink(0, function('s:noop'))
125 call a:sink(2, lsp#callbag#undefined())
128 let a:data['end'] = 0
129 call a:sink(0, function('s:createSinkCallback', [a:data]))
130 if a:data['end'] | return | endif
131 let a:data['clean'] = a:data['prod'](function('s:createNext', [a:data]), function('s:createError', [a:data]), function('s:createComplete', [a:data]))
" Talkback given to the sink: sets 'end' to whether a:t is 2 and, when ended
" and 'clean' is a Funcref, calls the clean-up callback.
134 function! s:createSinkCallback(data, t, ...) abort
136 let a:data['end'] = (a:t == 2)
137 if a:data['end'] && has_key(a:data, 'clean') && type(a:data['clean']) == type(function('s:noop'))
138 call a:data['clean']()
" next(d): forwards a:d to the sink as data (type 1) unless the stream ended.
143 function! s:createNext(data, d) abort
144 if !a:data['end'] | call a:data['sink'](1, a:d) | endif
" error(e): if not ended and a:e is not the undefined sentinel, marks the
" stream ended and sends type 2 with a:e.
147 function! s:createError(data, e) abort
148 if !a:data['end'] && !lsp#callbag#isUndefined(a:e)
149 let a:data['end'] = 1
150 call a:data['sink'](2, a:e)
" complete(): marks the stream ended and sends type 2 with the undefined
" sentinel.
" NOTE(review): the guard line preceding these statements is not visible.
154 function! s:createComplete(data) abort
156 let a:data['end'] = 1
157 call a:data['sink'](2, lsp#callbag#undefined())
" lazy(F): returns a source that calls a:F at subscription time and emits its
" result.
163 function! lsp#callbag#lazy(F) abort
164 let l:data = { 'F': a:F }
165 return function('s:lazyFactory', [l:data])
" Source body; only reacts to the type-0 greeting. Greets the sink, sends the
" result of F() as data, then sends type 2 with the undefined sentinel unless
" the sink unsubscribed in the meantime.
168 function! s:lazyFactory(data, start, sink) abort
169 if a:start != 0 | return | endif
170 let a:data['sink'] = a:sink
171 let a:data['unsubed'] = 0
172 call a:data['sink'](0, function('s:lazySinkCallback', [a:data]))
173 call a:data['sink'](1, a:data['F']())
174 if !a:data['unsubed'] | call a:data['sink'](2, lsp#callbag#undefined()) | endif
" Talkback: a type-2 signal from the sink sets the 'unsubed' flag.
177 function! s:lazySinkCallback(data, t, d) abort
178 if a:t == 2 | let a:data['unsubed'] = 1 | endif
" empty(): returns a partial of s:emptyStart bound to l:data.
" NOTE(review): the line creating l:data is not visible in this listing.
183 function! lsp#callbag#empty() abort
185 return function('s:emptyStart', [l:data])
" Source body; only reacts to the type-0 greeting. Greets the sink and then
" sends type 2 with the undefined sentinel, unless the sink disposed during
" the greeting.
188 function! s:emptyStart(data, start, sink) abort
189 if a:start != 0 | return | endif
190 let a:data['disposed'] = 0
191 call a:sink(0, function('s:emptySinkCallback', [a:data]))
192 if a:data['disposed'] | return | endif
193 call a:sink(2, lsp#callbag#undefined())
" Talkback: a type-2 signal from the sink sets 'disposed'.
196 function! s:emptySinkCallback(data, t, ...) abort
197 if a:t != 2 | return | endif
198 let a:data['disposed'] = 1
" Stops a:data['timer'] on a type-2 signal.
" NOTE(review): no reference to this function and no assignment of 'timer' for
" empty() is visible in this listing — possibly leftover code; confirm.
201 function! s:empty_sink_callback(data, t, ...) abort
202 if a:t == 2 | call timer_stop(a:data['timer']) | endif
" never(): returns the s:never source.
207 function! lsp#callbag#never() abort
208 return function('s:never')
" Source body; only reacts to the type-0 greeting and greets the sink with a
" no-op talkback. No data or end signal is sent in the visible code.
211 function! s:never(start, sink) abort
212 if a:start != 0 | return | endif
213 call a:sink(0, function('s:noop'))
" forEach(operation): returns a sink factory that calls a:operation for every
" data item of the source it is applied to.
218 function! lsp#callbag#forEach(operation) abort
219 let l:data = { 'operation': a:operation }
220 return function('s:forEachOperation', [l:data])
" Subscribes to a:source with s:forEachOperationSource and returns whatever
" the source call returns.
223 function! s:forEachOperation(data, source) abort
224 return a:source(0, function('s:forEachOperationSource', [a:data]))
" Source callback: stores the talkback on type 0, runs the operation on type 1
" and, after either of those, requests the next item via talkback(1, undefined).
227 function! s:forEachOperationSource(data, t, d) abort
228 if a:t == 0 | let a:data['talkback'] = a:d | endif
229 if a:t == 1 | call a:data['operation'](a:d) | endif
230 if (a:t == 1 || a:t == 0) | call a:data['talkback'](1, lsp#callbag#undefined()) | endif
" tap(...): side-effect operator. Accepts either a dict with optional 'next',
" 'error' and 'complete' callbacks, or up to three positional callbacks in
" that order, and returns a partial of s:tapFactory bound to them.
" NOTE(review): the line creating l:data is not visible in this listing.
235 function! lsp#callbag#tap(...) abort
237 if a:0 > 0 && type(a:1) == type({}) " a:1 { next, error, complete }
238 if has_key(a:1, 'next') | let l:data['next'] = a:1['next'] | endif
239 if has_key(a:1, 'error') | let l:data['error'] = a:1['error'] | endif
240 if has_key(a:1, 'complete') | let l:data['complete'] = a:1['complete'] | endif
241 else " a:1 = next, a:2 = error, a:3 = complete
242 if a:0 >= 1 | let l:data['next'] = a:1 | endif
243 if a:0 >= 2 | let l:data['error'] = a:2 | endif
244 if a:0 >= 3 | let l:data['complete'] = a:3 | endif
246 return function('s:tapFactory', [l:data])
" Remembers the upstream source and returns the tapped source.
249 function! s:tapFactory(data, source) abort
250 let a:data['source'] = a:source
251 return function('s:tapSouceFactory', [a:data])
" Source body; on the type-0 greeting, stores the sink and subscribes upstream.
254 function! s:tapSouceFactory(data, start, sink) abort
255 if a:start != 0 | return | endif
256 let a:data['sink'] = a:sink
257 call a:data['source'](0, function('s:tapSourceCallback', [a:data]))
" Upstream callback: runs 'next' on data, 'complete' on type 2 carrying the
" undefined sentinel and 'error' on type 2 carrying anything else, then
" forwards the signal to the sink unchanged.
260 function! s:tapSourceCallback(data, t, d) abort
261 if a:t == 1 && has_key(a:data, 'next') | call a:data['next'](a:d) | endif
262 if a:t == 2 && lsp#callbag#isUndefined(a:d) && has_key(a:data, 'complete') | call a:data['complete']() | endif
263 if a:t == 2 && !lsp#callbag#isUndefined(a:d) && has_key(a:data, 'error') | call a:data['error'](a:d) | endif
264 call a:data['sink'](a:t, a:d)
" interval(period): returns a source driven by a repeating timer
" (timer_start() with 'repeat': -1) using a:period as the interval.
269 function! lsp#callbag#interval(period) abort
270 let l:data = { 'period': a:period }
271 return function('s:intervalPeriod', [l:data])
" Source body; only reacts to the type-0 greeting. Stores the sink, starts the
" repeating timer, then greets the sink with a talkback that can stop it.
274 function! s:intervalPeriod(data, start, sink) abort
275 if a:start != 0 | return | endif
277 let a:data['sink'] = a:sink
278 let a:data['timer'] = timer_start(a:data['period'], function('s:interval_callback', [a:data]), { 'repeat': -1 })
279 call a:sink(0, function('s:interval_sink_callback', [a:data]))
" Timer callback: emits the current counter a:data['i'] as data and increments
" the stored counter.
" NOTE(review): the initialisation of 'i' is not visible in this listing.
282 function! s:interval_callback(data, ...) abort
283 let l:i = a:data['i']
284 let a:data['i'] = a:data['i'] + 1
285 call a:data['sink'](1, l:i)
" Talkback: a type-2 signal from the sink stops the timer.
288 function! s:interval_sink_callback(data, t, ...) abort
289 if a:t == 2 | call timer_stop(a:data['timer']) | endif
" delay(period): returns an operator that re-emits data items after a timer of
" a:period has elapsed.
294 function! lsp#callbag#delay(period) abort
295 let l:data = { 'period': a:period }
296 return function('s:delayPeriod', [l:data])
" Remembers the upstream source and returns the delayed source.
299 function! s:delayPeriod(data, source) abort
300 let a:data['source'] = a:source
301 return function('s:delayFactory', [a:data])
" Source body; on the type-0 greeting, stores the sink and subscribes upstream
" with s:delaySourceCallback.
304 function! s:delayFactory(data, start, sink) abort
305 if a:start != 0 | return | endif
306 let a:data['sink'] = a:sink
307 call a:data['source'](0, function('s:delaySourceCallback', [a:data]))
" Upstream callback for delay(). One visible path forwards the signal to the
" sink immediately; the other schedules a one-shot timer that emits the item.
" The item is bound into the timer partial so that every timer emits the value
" it was started for. Reading the shared a:data['d'] when the timer fires would
" emit the newest value for every timer still pending and lose earlier items.
" 'd' is still stored on the state dict for compatibility.
" NOTE(review): the condition separating the two paths is not visible in this
" listing — presumably non-data signals take the immediate path.
310 function! s:delaySourceCallback(data, t, d) abort
312 call a:data['sink'](a:t, a:d)
315 let a:data['d'] = a:d
316 call timer_start(a:data['period'], function('s:delayTimerCallback', [a:data, a:d]))
" Timer callback: emits the item this timer was started for (a:d) as data.
" The trailing varargs receive the timer id passed by timer_start().
319 function! s:delayTimerCallback(data, d, ...) abort
320 call a:data['sink'](1, a:d)
" take(max): returns an operator that lets at most a:max data items through.
325 function! lsp#callbag#take(max) abort
326 let l:data = { 'max': a:max }
327 return function('s:takeMax', [l:data])
" Remembers the upstream source and returns the limited source.
330 function! s:takeMax(data, source) abort
331 let a:data['source'] = a:source
332 return function('s:takeMaxSource', [a:data])
" Source body; on the type-0 greeting, resets the 'taken' counter and 'end'
" flag, stores the sink and its talkback, and subscribes upstream.
335 function! s:takeMaxSource(data, start, sink) abort
336 if a:start != 0 | return | endif
337 let a:data['taken'] = 0
338 let a:data['end'] = 0
339 let a:data['sink'] = a:sink
340 let a:data['talkback'] = function('s:takeTalkback', [a:data])
341 call a:data['source'](0, function('s:takeSourceCallback', [a:data]))
" Talkback given to the sink. One branch marks the stream ended and forwards
" the signal upstream; the other forwards only while fewer than 'max' items
" have been taken.
" NOTE(review): the first branch condition is not visible — presumably a:t == 2.
344 function! s:takeTalkback(data, t, d) abort
346 let a:data['end'] = 1
347 call a:data['sourceTalkback'](a:t, a:d)
348 elseif a:data['taken'] < a:data['max']
349 call a:data['sourceTalkback'](a:t, a:d)
" Upstream callback. Visible behaviour: stores the upstream talkback and greets
" the sink with the take talkback; counts and forwards items while below 'max';
" on reaching 'max' (and not already ended) marks the stream ended, sends type 2
" with the undefined sentinel to the sink and tells upstream to stop; a final
" path forwards the signal unchanged.
" NOTE(review): the branch conditions on a:t are not visible in this listing.
353 function! s:takeSourceCallback(data, t, d) abort
355 let a:data['sourceTalkback'] = a:d
356 call a:data['sink'](0, a:data['talkback'])
358 if a:data['taken'] < a:data['max']
359 let a:data['taken'] = a:data['taken'] + 1
360 call a:data['sink'](a:t, a:d)
361 if a:data['taken'] == a:data['max'] && !a:data['end']
362 let a:data['end'] = 1
363 call a:data['sink'](2, lsp#callbag#undefined())
364 call a:data['sourceTalkback'](2, lsp#callbag#undefined())
368 call a:data['sink'](a:t, a:d)
" skip(max): returns an operator that drops the first a:max data items.
374 function! lsp#callbag#skip(max) abort
375 let l:data = { 'max': a:max }
376 return function('s:skipMax', [l:data])
" Remembers the upstream source and returns the skipping source.
379 function! s:skipMax(data, source) abort
380 let a:data['source'] = a:source
381 return function('s:skipMaxSource', [a:data])
" Source body; on the type-0 greeting, stores the sink, resets the 'skipped'
" counter and subscribes upstream.
384 function! s:skipMaxSource(data, start, sink) abort
385 if a:start != 0 | return | endif
386 let a:data['sink'] = a:sink
387 let a:data['skipped'] = 0
388 call a:data['source'](0, function('s:skipSouceCallback', [a:data]))
" Upstream callback. Visible behaviour: stores the talkback and forwards the
" greeting; while fewer than 'max' items were skipped, counts the item and
" requests the next one instead of forwarding; otherwise forwards the signal.
" NOTE(review): the branch conditions on a:t are not visible in this listing.
391 function! s:skipSouceCallback(data, t, d) abort
393 let a:data['talkback'] = a:d
394 call a:data['sink'](a:t, a:d)
396 if a:data['skipped'] < a:data['max']
397 let a:data['skipped'] = a:data['skipped'] + 1
398 call a:data['talkback'](1, lsp#callbag#undefined())
400 call a:data['sink'](a:t, a:d)
403 call a:data['sink'](a:t, a:d)
" map(F): returns an operator that applies a:F to every data item.
409 function! lsp#callbag#map(F) abort
410 let l:data = { 'f': a:F }
411 return function('s:mapF', [l:data])
" Remembers the upstream source and returns the mapped source.
414 function! s:mapF(data, source) abort
415 let a:data['source'] = a:source
416 return function('s:mapFSource', [a:data])
" Source body; on the type-0 greeting, stores the sink and subscribes upstream.
419 function! s:mapFSource(data, start, sink) abort
420 if a:start != 0 | return | endif
421 let a:data['sink'] = a:sink
422 call a:data['source'](0, function('s:mapFSourceCallback', [a:data]))
" Upstream callback: forwards every signal, replacing the payload with
" f(payload) only for data (type 1).
425 function! s:mapFSourceCallback(data, t, d) abort
426 call a:data['sink'](a:t, a:t == 1 ? a:data['f'](a:d) : a:d)
" filter(condition): returns an operator that forwards only the data items for
" which a:condition returns true.
431 function! lsp#callbag#filter(condition) abort
432 let l:data = { 'condition': a:condition }
433 return function('s:filterCondition', [l:data])
" Remembers the upstream source and returns the filtered source.
436 function! s:filterCondition(data, source) abort
437 let a:data['source'] = a:source
438 return function('s:filterConditionSource', [a:data])
" Source body; on the type-0 greeting, stores the sink and subscribes upstream.
441 function! s:filterConditionSource(data, start, sink) abort
442 if a:start != 0 | return | endif
443 let a:data['sink'] = a:sink
444 call a:data['source'](0, function('s:filterSourceCallback', [a:data]))
" Upstream callback. Visible behaviour: stores the talkback and forwards the
" greeting; forwards an item when the condition holds, otherwise requests the
" next item from upstream; a final path forwards the signal unchanged.
" NOTE(review): the branch conditions on a:t are not visible in this listing.
447 function! s:filterSourceCallback(data, t, d) abort
449 let a:data['talkback'] = a:d
450 call a:data['sink'](a:t, a:d)
452 if a:data['condition'](a:d)
453 call a:data['sink'](a:t, a:d)
455 call a:data['talkback'](1, lsp#callbag#undefined())
458 call a:data['sink'](a:t, a:d)
" Counter used to generate a unique augroup name when none is supplied.
464 let s:event_prefix_index = 0
" fromEvent(events, [augroup]): returns a source that emits whenever one of the
" given autocommand events fires. The augroup is a:1, or a generated
" '__callbag_fromEvent_prefix_{n}__' name.
" NOTE(review): the condition choosing between the two is not visible here.
465 function! lsp#callbag#fromEvent(events, ...) abort
466 let l:data = { 'events': a:events }
468 let l:data['augroup'] = a:1
470 let l:data['augroup'] = '__callbag_fromEvent_prefix_' . s:event_prefix_index . '__'
471 let s:event_prefix_index = s:event_prefix_index + 1
473 return function('s:fromEventFactory', [l:data])
" Registry mapping a numeric handler index to its subscription state, so the
" generated autocommands can reach the handler by index.
476 let s:event_handler_index = 0
477 let s:event_handlers_data = {}
" Source body; only reacts to the type-0 greeting. Allocates a handler index,
" greets the sink, and unless the sink disposed during the greeting registers
" the state and defines the autocommands inside the augroup. A String event
" becomes `au {event} * call s:notify_event_handler({index})`; any other event
" value is joined with spaces in front of the call (presumably a list giving
" the event and pattern — confirm).
478 function! s:fromEventFactory(data, start, sink) abort
479 if a:start != 0 | return | endif
480 let a:data['sink'] = a:sink
481 let a:data['disposed'] = 0
482 let a:data['handler'] = function('s:fromEventHandlerCallback', [a:data])
483 let a:data['handler_index'] = s:event_handler_index
484 let s:event_handler_index = s:event_handler_index + 1
485 call a:sink(0, function('s:fromEventSinkHandler', [a:data]))
487 if a:data['disposed'] | return | endif
488 let s:event_handlers_data[a:data['handler_index']] = a:data
490 execute 'augroup ' . a:data['augroup']
492 let l:events = type(a:data['events']) == type('') ? [a:data['events']] : a:data['events']
493 for l:event in l:events
494 let l:exec = 'call s:notify_event_handler(' . a:data['handler_index'] . ')'
495 if type(l:event) == type('')
496 execute 'au ' . l:event . ' * ' . l:exec
498 execute 'au ' . join(l:event, ' ') .' ' . l:exec
501 execute 'augroup end'
" Event handler: emits a data signal to the sink.
" NOTE(review): the inline comment mentions v:event, but the visible call
" always sends the undefined sentinel.
504 function! s:fromEventHandlerCallback(data) abort
505 " send v:event if it exists
506 call a:data['sink'](1, lsp#callbag#undefined())
" Talkback: on a type-2 signal marks the subscription disposed, re-opens and
" closes the augroup, and drops the handler from the registry.
" NOTE(review): the statement between `augroup` and `augroup end` is not
" visible here — presumably it clears the group's autocommands.
509 function! s:fromEventSinkHandler(data, t, ...) abort
510 if a:t != 2 | return | endif
511 let a:data['disposed'] = 1
512 execute 'augroup ' a:data['augroup']
514 execute 'augroup end'
515 if has_key(s:event_handlers_data, a:data['handler_index'])
516 call remove(s:event_handlers_data, a:data['handler_index'])
" Entry point called by the generated autocommands: looks up the state by index
" and invokes its handler.
520 function! s:notify_event_handler(index) abort
521 let l:data = s:event_handlers_data[a:index]
522 call l:data['handler']()
" fromPromise(promise): returns a source that emits the promise's result.
" a:promise must provide a then(onFulfilled, onRejected) method.
527 function! lsp#callbag#fromPromise(promise) abort
528 let l:data = { 'promise': a:promise }
529 return function('s:fromPromiseFactory', [l:data])
" Source body; only reacts to the type-0 greeting. Attaches the
" fulfilled/rejected callbacks to the promise and then greets the sink.
532 function! s:fromPromiseFactory(data, start, sink) abort
533 if a:start != 0 | return | endif
534 let a:data['sink'] = a:sink
535 let a:data['ended'] = 0
536 call a:data['promise'].then(
537 \ function('s:fromPromiseOnFulfilledCallback', [a:data]),
538 \ function('s:fromPromiseOnRejectedCallback', [a:data]),
540 call a:sink(0, function('s:fromPromiseSinkCallback', [a:data]))
" Fulfilled: unless the sink ended, emits the value (or the undefined sentinel
" when no value was passed) and then completes, re-checking 'ended' in between
" because the sink may end while handling the data.
543 function! s:fromPromiseOnFulfilledCallback(data, ...) abort
544 if a:data['ended'] | return | endif
545 call a:data['sink'](1, a:0 > 0 ? a:1 : lsp#callbag#undefined())
546 if a:data['ended'] | return | endif
547 call a:data['sink'](2, lsp#callbag#undefined())
" Rejected: unless the sink ended, sends type 2 with the rejection value.
550 function! s:fromPromiseOnRejectedCallback(data, err) abort
551 if a:data['ended'] | return | endif
552 call a:data['sink'](2, a:err)
" Talkback: a type-2 signal from the sink sets 'ended'.
555 function! s:fromPromiseSinkCallback(data, t, ...) abort
556 if a:t == 2 | let a:data['ended'] = 1 | endif
" debounceTime(duration): returns an operator that emits an item only after
" a:duration has passed without a newer signal arriving.
561 function! lsp#callbag#debounceTime(duration) abort
562 let l:data = { 'duration': a:duration }
563 return function('s:debounceTimeDuration', [l:data])
" Remembers the upstream source and returns the debounced source.
566 function! s:debounceTimeDuration(data, source) abort
567 let a:data['source'] = a:source
568 return function('s:debounceTimeDurationSource', [a:data])
" Source body; on the type-0 greeting, stores the sink and subscribes upstream.
571 function! s:debounceTimeDurationSource(data, start, sink) abort
572 if a:start != 0 | return | endif
573 let a:data['sink'] = a:sink
574 call a:data['source'](0, function('s:debounceTimeSourceCallback', [a:data]))
" Upstream callback: first cancels any pending timer (for every signal type),
" then either starts a new timer with the item bound into the callback or
" forwards the signal to the sink.
" NOTE(review): the condition separating those two paths is not visible here.
577 function! s:debounceTimeSourceCallback(data, t, d) abort
578 if has_key(a:data, 'timer') | call timer_stop(a:data['timer']) | endif
580 let a:data['timer'] = timer_start(a:data['duration'], function('s:debounceTimeTimerCallback', [a:data, a:d]))
582 call a:data['sink'](a:t, a:d)
" Timer callback: emits the bound item a:d as data.
586 function! s:debounceTimeTimerCallback(data, d, ...) abort
587 call a:data['sink'](1, a:d)
" subscribe(...): accepts either a dict with optional 'next', 'error' and
" 'complete' callbacks, or up to three positional callbacks in that order, and
" returns a function that subscribes them to a source.
" NOTE(review): the line creating l:data is not visible in this listing.
592 function! lsp#callbag#subscribe(...) abort
594 if a:0 > 0 && type(a:1) == type({}) " a:1 { next, error, complete }
595 if has_key(a:1, 'next') | let l:data['next'] = a:1['next'] | endif
596 if has_key(a:1, 'error') | let l:data['error'] = a:1['error'] | endif
597 if has_key(a:1, 'complete') | let l:data['complete'] = a:1['complete'] | endif
598 else " a:1 = next, a:2 = error, a:3 = complete
599 if a:0 >= 1 | let l:data['next'] = a:1 | endif
600 if a:0 >= 2 | let l:data['error'] = a:2 | endif
601 if a:0 >= 3 | let l:data['complete'] = a:3 | endif
603 return function('s:subscribeListener', [l:data])
" Subscribes to a:source and returns a dispose function for the subscription.
606 function! s:subscribeListener(data, source) abort
607 call a:source(0, function('s:subscribeSourceCallback', [a:data]))
608 return function('s:subscribeDispose', [a:data])
" Source callback: stores the talkback on type 0, runs 'next' on data, requests
" the next item after a greeting or data, and on type 2 runs 'complete' (payload
" is the undefined sentinel) or 'error' (any other payload).
611 function! s:subscribeSourceCallback(data, t, d) abort
612 if a:t == 0 | let a:data['talkback'] = a:d | endif
613 if a:t == 1 && has_key(a:data, 'next') | call a:data['next'](a:d) | endif
614 if a:t == 1 || a:t == 0 | call a:data['talkback'](1, lsp#callbag#undefined()) | endif
615 if a:t == 2 && lsp#callbag#isUndefined(a:d) && has_key(a:data, 'complete') | call a:data['complete']() | endif
616 if a:t == 2 && !lsp#callbag#isUndefined(a:d) && has_key(a:data, 'error') | call a:data['error'](a:d) | endif
" Dispose: sends type 2 to the source's talkback if one was received.
619 function! s:subscribeDispose(data, ...) abort
620 if has_key(a:data, 'talkback') | call a:data['talkback'](2, lsp#callbag#undefined()) | endif
" toList(): returns a function that collects every item of a source into a
" list; state starts not done, with no items and not unsubscribed.
625 function! lsp#callbag#toList() abort
626 let l:data = { 'done': 0, 'items': [], 'unsubscribed': 0 }
627 return function('s:toListFactory', [l:data])
" Builds the subscription from the next/error/complete collectors and exposes
" 'unsubscribe' and 'wait' functions bound to the state. If the state is
" already done after subscribing, the subscription is released at once.
" NOTE(review): the line applying the subscriber to a:source and the opening
" of the returned dict are not visible in this listing.
630 function! s:toListFactory(data, source) abort
631 let a:data['unsubscribe'] = lsp#callbag#subscribe(
632 \ function('s:toListOnNext', [a:data]),
633 \ function('s:toListOnError', [a:data]),
634 \ function('s:toListOnComplete', [a:data])
636 if a:data['done'] | call s:toListUnsubscribe(a:data) | endif
638 \ 'unsubscribe': function('s:toListUnsubscribe', [a:data]),
639 \ 'wait': function('s:toListWait', [a:data])
" Releases the subscription. Visible behaviour: returns early when no
" 'unsubscribe' callback is stored; on first use calls it and sets
" 'unsubscribed'; marks the state done and stores the text of an
" 'already unsubscribed' exception (with v:throwpoint) in 'error'.
" NOTE(review): the conditions and try/catch lines around the last two steps
" are not visible in this listing.
643 function! s:toListUnsubscribe(data) abort
644 if !has_key(a:data, 'unsubscribe') | return | endif
645 if !a:data['unsubscribed']
646 call a:data['unsubscribe']()
647 let a:data['unsubscribed'] = 1
649 let a:data['done'] = 1
651 throw 'lsp#callbag toList() is already unsubscribed.'
653 let a:data['error'] = v:exception . ' ' . v:throwpoint
" Collector: appends each item to 'items'.
659 function! s:toListOnNext(data, item) abort
660 call add(a:data['items'], a:item)
" Collector: records the error, marks the state done and unsubscribes.
663 function! s:toListOnError(data, error) abort
664 let a:data['done'] = 1
665 let a:data['error'] = a:error
666 call s:toListUnsubscribe(a:data)
" Collector: marks the state done and unsubscribes.
669 function! s:toListOnComplete(data) abort
670 let a:data['done'] = 1
671 call s:toListUnsubscribe(a:data)
" wait([opt]): returns the collected items or throws the stored error. When
" polling is needed, it sleeps in a loop until the state is done or the timeout
" flag is set. Option keys read from a copy of a:1: 'sleep' (default 1, used as
" `sleep {n}m`) and 'timeout' (default -1; a timer is started only when it is
" greater than -1). Throws 'lsp#callbag toList().wait() timedout.' on timeout.
" NOTE(review): several guard lines are not visible in this listing.
674 function! s:toListWait(data, ...) abort
676 if has_key(a:data, 'error')
677 throw a:data['error']
679 return a:data['items']
682 let l:opt = a:0 > 0 ? copy(a:1) : {}
683 let l:opt['timedout'] = 0
684 let l:opt['sleep'] = get(l:opt, 'sleep', 1)
685 let l:opt['timeout'] = get(l:opt, 'timeout', -1)
687 if l:opt['timeout'] > -1
688 let l:opt['timer'] = timer_start(l:opt['timeout'], function('s:toListTimeoutCallback', [l:opt]))
691 while !a:data['done'] && !l:opt['timedout']
692 exec 'sleep ' . l:opt['sleep'] . 'm'
695 if has_key(l:opt, 'timer')
696 silent! call timer_stop(l:opt['timer'])
700 throw 'lsp#callbag toList().wait() timedout.'
703 if has_key(a:data, 'error')
704 throw a:data['error']
706 return a:data['items']
" Timer callback: sets the 'timedout' flag that ends the wait loop.
711 function! s:toListTimeoutCallback(opt, ...) abort
712 let a:opt['timedout'] = 1
" throwError(error): returns a source that ends immediately with a:error.
717 function! lsp#callbag#throwError(error) abort
718 let l:data = { 'error': a:error }
719 return function('s:throwErrorFactory', [l:data])
" Source body; only reacts to the type-0 greeting. Greets the sink and then
" sends type 2 with the stored error, unless the sink disposed during the
" greeting.
722 function! s:throwErrorFactory(data, start, sink) abort
723 if a:start != 0 | return | endif
724 let a:data['disposed'] = 0
725 call a:sink(0, function('s:throwErrorSinkCallback', [a:data]))
726 if a:data['disposed'] | return | endif
727 call a:sink(2, a:data['error'])
" Talkback: a type-2 signal from the sink sets 'disposed'.
730 function! s:throwErrorSinkCallback(data, t, ...) abort
731 if a:t != 2 | return | endif
732 let a:data['disposed'] = 1
" of(...): returns a source that emits each argument in order.
737 function! lsp#callbag#of(...) abort
738 let l:data = { 'values': a:000 }
739 return function('s:listFactory', [l:data])
" fromList(list): returns a source that emits each element of a:list in order.
744 function! lsp#callbag#fromList(list) abort
745 let l:data = { 'values': a:list }
746 return function('s:listFactory', [l:data])
" Source body shared by of() and fromList(); only reacts to the type-0
" greeting. Greets the sink, emits values by index and stops early once the
" sink has disposed; if not disposed afterwards, sends type 2 with the
" undefined sentinel.
" NOTE(review): the loop header and index increment are not visible here.
749 function! s:listFactory(data, start, sink) abort
750 if a:start != 0 | return | endif
751 let a:data['disposed'] = 0
752 call a:sink(0, function('s:listSinkCallback', [a:data]))
754 let l:n = len(a:data['values'])
756 if a:data['disposed'] | break | endif
757 call a:sink(1, a:data['values'][l:i])
760 if a:data['disposed'] | return | endif
761 call a:sink(2, lsp#callbag#undefined())
" Talkback: a type-2 signal from the sink sets 'disposed'.
765 function! s:listSinkCallback(data, t, ...) abort
766 if a:t != 2 | return | endif
767 let a:data['disposed'] = 1
" merge(...): returns a source that interleaves the signals of all given
" sources.
772 function! lsp#callbag#merge(...) abort
773 let l:data = { 'sources': a:000 }
774 return function('s:mergeFactory', [l:data])
" Source body; only reacts to the type-0 greeting. Resets the per-subscription
" counters and subscribes to each source with a callback bound to its index,
" stopping early if the merged stream has already ended.
" NOTE(review): the index initialisation and increment are not visible here.
777 function! s:mergeFactory(data, start, sink) abort
778 if a:start != 0 | return | endif
779 let a:data['sink'] = a:sink
780 let a:data['n'] = len(a:data['sources'])
781 let a:data['sourceTalkbacks'] = []
782 let a:data['startCount'] = 0
783 let a:data['endCount'] = 0
784 let a:data['ended'] = 0
785 let a:data['talkback'] = function('s:mergeTalkbackCallback', [a:data])
787 while l:i < a:data['n']
788 if a:data['ended'] | return | endif
789 call a:data['sources'][l:i](0, function('s:mergeSourceCallback', [a:data, l:i]))
" Talkback given to the sink: records a type-2 request in 'ended' and forwards
" the signal to every source talkback that is present and not cleared to 0.
794 function! s:mergeTalkbackCallback(data, t, d) abort
795 if a:t == 2 | let a:data['ended'] = 1 | endif
797 while l:i < a:data['n']
798 if l:i < len(a:data['sourceTalkbacks']) && a:data['sourceTalkbacks'][l:i] != 0
799 call a:data['sourceTalkbacks'][l:i](a:t, a:d)
" Callback for source number a:i. Visible behaviour: on a greeting, inserts the
" talkback at index a:i and greets the sink when the first source has started;
" on an error end (type 2 with a defined payload), marks the stream ended, tells
" every other live source to stop and passes the error to the sink; on another
" path, clears the source's talkback slot, counts it and completes the sink once
" all sources have ended; a final path forwards the signal unchanged.
" NOTE(review): some branch conditions are not visible in this listing.
805 function! s:mergeSourceCallback(data, i, t, d) abort
807 call insert(a:data['sourceTalkbacks'], a:d, a:i)
808 let a:data['startCount'] += 1
809 if a:data['startCount'] == 1 | call a:data['sink'](0, a:data['talkback']) | endif
810 elseif a:t == 2 && !lsp#callbag#isUndefined(a:d)
811 let a:data['ended'] = 1
813 while l:j < a:data['n']
814 if l:j != a:i && l:j < len(a:data['sourceTalkbacks']) && a:data['sourceTalkbacks'][l:j] != 0
815 call a:data['sourceTalkbacks'][l:j](2, lsp#callbag#undefined())
819 call a:data['sink'](2, a:d)
821 let a:data['sourceTalkbacks'][a:i] = 0
822 let a:data['endCount'] += 1
823 if a:data['endCount'] == a:data['n'] | call a:data['sink'](2, lsp#callbag#undefined()) | endif
825 call a:data['sink'](a:t, a:d)
" concat(...): returns a source that plays the given sources one after another.
831 function! lsp#callbag#concat(...) abort
832 let l:data = { 'sources': a:000 }
833 return function('s:concatFactory', [l:data])
" Marker stored in 'lastPull' meaning "the sink has not pulled yet".
836 let s:concatUniqueToken = '__callback__concat_unique_token__'
" Source body; only reacts to the type-0 greeting. One visible path greets the
" sink with a no-op talkback and completes it at once; the other resets
" 'lastPull', creates the talkback and 'next' partials and starts the first
" source through 'next'.
" NOTE(review): the condition for the immediate-completion path and the
" initialisation of 'i' are not visible — presumably the path handles n == 0.
837 function! s:concatFactory(data, start, sink) abort
838 if a:start != 0 | return | endif
839 let a:data['sink'] = a:sink
840 let a:data['n'] = len(a:data['sources'])
842 call a:data['sink'](0, function('s:noop'))
843 call a:data['sink'](2, lsp#callbag#undefined())
847 let a:data['lastPull'] = s:concatUniqueToken
848 let a:data['talkback'] = function('s:concatTalkbackCallback', [a:data])
849 let a:data['next'] = function('s:concatNext', [a:data])
850 call a:data['next']()
" Talkback given to the sink: remembers the payload of the latest pull (type 1)
" and forwards the signal to the current source's talkback.
853 function! s:concatTalkbackCallback(data, t, d) abort
854 if a:t == 1 | let a:data['lastPull'] = a:d | endif
855 call a:data['sourceTalkback'](a:t, a:d)
" Starts the source at index 'i', or completes the sink when 'i' has reached
" the number of sources.
858 function! s:concatNext(data) abort
859 if a:data['i'] == a:data['n']
860 call a:data['sink'](2, lsp#callbag#undefined())
863 call a:data['sources'][a:data['i']](0, function('s:concatSourceCallback', [a:data]))
" Callback given to the currently active source of concat().
" Visible behaviour: the source's talkback is stored; the sink is greeted with
" the concat talkback, or a remembered pull ('lastPull') is replayed to the new
" source; an error end (type 2 with a defined payload) goes straight to the
" sink; another path advances 'i' and starts the next source; a final path
" forwards the signal unchanged.
" The error test uses lsp#callbag#isUndefined() instead of `!=` against the
" sentinel string: `!=` converts the string to a number when the payload is a
" Number and raises an error for List/Dict payloads, whereas isUndefined()
" checks the type first.
" NOTE(review): some branch lines are not visible in this listing, and the
" 'lastPull' comparison still uses a plain `!=` against a String token.
866 function! s:concatSourceCallback(data, t, d) abort
868 let a:data['sourceTalkback'] = a:d
870 call a:data['sink'](0, a:data['talkback'])
871 elseif (a:data['lastPull']) != s:concatUniqueToken
872 call a:data['sourceTalkback'](1, a:data['lastPull'])
874 elseif a:t == 2 && !lsp#callbag#isUndefined(a:d)
875 call a:data['sink'](2, a:d)
877 let a:data['i'] = a:data['i'] + 1
878 call a:data['next']()
880 call a:data['sink'](a:t, a:d)
" combine(...): returns a source that emits a list holding the latest value of
" every given source.
886 function! lsp#callbag#combine(...) abort
887 let l:data = { 'sources': a:000 }
888 return function('s:combineFactory', [l:data])
" Marker stored in 'vals' for a source that has not produced a value yet.
891 let s:combineEmptyToken = '__callback__combine_empty_token__'
" Source body; only reacts to the type-0 greeting. One visible path greets the
" sink, emits an empty list and completes. The other initialises three
" countdown counters to the number of sources, pre-sizes the value and talkback
" arrays, marks each value slot as empty and subscribes to each source with a
" callback bound to its index.
" NOTE(review): the condition for the first path and the index handling are
" not visible — presumably the first path handles n == 0.
892 function! s:combineFactory(data, start, sink) abort
893 if a:start != 0 | return | endif
894 let a:data['sink'] = a:sink
895 let a:data['n'] = len(a:data['sources'])
897 call a:data['sink'](0, function('s:noop'))
898 call a:data['sink'](1, [])
899 call a:data['sink'](2, lsp#callbag#undefined())
902 let a:data['Ns'] = a:data['n'] " start counter
903 let a:data['Nd'] = a:data['n'] " data counter
904 let a:data['Ne'] = a:data['n'] " end counter
905 let a:data['vals'] = s:createArrayWithSize(a:data['n'], lsp#callbag#undefined())
906 let a:data['sourceTalkbacks'] = s:createArrayWithSize(a:data['n'], lsp#callbag#undefined())
907 let a:data['talkback'] = function('s:combineTalkbackCallback', [a:data])
909 for l:Source in a:data['sources']
910 let a:data['vals'][l:i] = s:combineEmptyToken
911 call l:Source(0, function('s:combineSourceCallback', [a:data, l:i]))
" Talkback given to the sink: ignores type 0 and forwards any other signal to
" every source talkback.
916 function! s:combineTalkbackCallback(data, t, d) abort
917 if a:t == 0 | return | endif
919 while l:i < a:data['n']
920 call a:data['sourceTalkbacks'][l:i](a:t, a:d)
" Callback for source number a:i. Visible behaviour: stores the talkback,
" counts down 'Ns' and greets the sink once it reaches 0; counts down 'Nd' the
" first time a slot still holds the empty marker, stores the value, copies all
" values into a fresh list and emits it; counts down 'Ne' and completes the
" sink; a final path forwards the signal unchanged.
" NOTE(review): the branch conditions (including how l:_Nd gates emission) are
" not visible in this listing. The empty-marker test compares a stored value
" with a String using `==`; confirm this is safe for non-String values.
925 function! s:combineSourceCallback(data, i, t, d) abort
927 let a:data['sourceTalkbacks'][a:i] = a:d
928 let a:data['Ns'] = a:data['Ns'] - 1
929 if a:data['Ns'] == 0 | call a:data['sink'](0, a:data['talkback']) | endif
934 if a:data['vals'][a:i] == s:combineEmptyToken
935 let a:data['Nd'] = a:data['Nd'] - 1
937 let l:_Nd = a:data['Nd']
939 let a:data['vals'][a:i] = a:d
941 let l:arr = s:createArrayWithSize(a:data['n'], lsp#callbag#undefined())
943 while l:j < a:data['n']
944 let l:arr[l:j] = a:data['vals'][l:j]
947 call a:data['sink'](1, l:arr)
950 let a:data['Ne'] = a:data['Ne'] - 1
952 call a:data['sink'](2, lsp#callbag#undefined())
955 call a:data['sink'](a:t, a:d)
960 " distinctUntilChanged {{{
" Default comparison used when no compare function is supplied.
" NOTE(review): the body is not visible in this listing.
961 function! s:distinctUntilChangedDefaultCompare(a, b) abort
" distinctUntilChanged([compare]): returns an operator that suppresses an item
" when compare(previous, item) is true; a:1 replaces the default comparison.
965 function! lsp#callbag#distinctUntilChanged(...) abort
966 let l:data = { 'compare': a:0 == 0 ? function('s:distinctUntilChangedDefaultCompare') : a:1 }
967 return function('s:distinctUntilChangedSourceFactory', [l:data])
" Remembers the upstream source and returns the de-duplicated source.
970 function! s:distinctUntilChangedSourceFactory(data, source) abort
971 let a:data['source'] = a:source
972 return function('s:distinctUntilChangedSinkFactory', [a:data])
" Source body; on the type-0 greeting, stores the sink, clears 'inited' and
" subscribes upstream.
975 function! s:distinctUntilChangedSinkFactory(data, start, sink) abort
976 if a:start != 0 | return | endif
977 let a:data['sink'] = a:sink
978 let a:data['inited'] = 0
979 call a:data['source'](0, function('s:distinctUntilChangedSourceCallback', [a:data]))
" Upstream callback. Visible behaviour: stores the talkback on type 0; one path
" forwards the signal unchanged; when an earlier item exists and compare()
" reports it equal to the new one, the next item is requested instead of
" emitting; otherwise the item is remembered as 'prev' and emitted as data.
" NOTE(review): some branch lines are not visible in this listing.
982 function! s:distinctUntilChangedSourceCallback(data, t, d) abort
983 if a:t == 0 | let a:data['talkback'] = a:d | endif
985 call a:data['sink'](a:t, a:d)
989 if a:data['inited'] && has_key(a:data, 'prev') && a:data['compare'](a:data['prev'], a:d)
990 call a:data['talkback'](1, lsp#callbag#undefined())
994 let a:data['inited'] = 1
995 let a:data['prev'] = a:d
996 call a:data['sink'](1, a:d)
" takeUntil(notifier): returns an operator that passes the source through until
" the a:notfier source signals; the notifier is stored under 'notifier'.
1001 function! lsp#callbag#takeUntil(notfier) abort
1002 let l:data = { 'notifier': a:notfier }
1003 return function('s:takeUntilNotifier', [l:data])
" Remembers the upstream source and returns the limited source.
1006 function! s:takeUntilNotifier(data, source) abort
1007 let a:data['source'] = a:source
1008 return function('s:takeUntilFactory', [a:data])
" Marker held in 'done' while the notifier has not yet ended the stream.
1011 let s:takeUntilUniqueToken = '__callback__take_until_unique_token__'
" Source body for takeUntil; only reacts to the type-0 greeting. Resets the
" per-subscription state on the shared dict and subscribes to the upstream
" source. 'done' holds s:takeUntilUniqueToken until the notifier ends the
" stream.
" The notifier talkback slot is reset under the key 'notifierTalkback', the
" spelling every reader and writer of that slot uses, so a talkback left behind
" by an earlier subscription on this dict is not reused.
" NOTE(review): 'inited' starts at 1 and s:takeUntilSourceCallback assigns 1
" again after subscribing the notifier, so the `if a:data['inited']` guards can
" never be false — confirm whether the initial value was meant to be 0.
1013 function! s:takeUntilFactory(data, start, sink) abort
1014 if a:start != 0 | return | endif
1015 let a:data['sink'] = a:sink
1016 let a:data['inited'] = 1
1017 let a:data['sourceTalkback'] = 0
1018 let a:data['notifierTalkback'] = 0
1019 let a:data['done'] = s:takeUntilUniqueToken
1020 call a:data['source'](0, function('s:takeUntilSourceCallback', [a:data]))
" Upstream callback for takeUntil. Visible behaviour: stores the source
" talkback, subscribes to the notifier, sets 'inited', greets the sink with
" s:takeUntilSinkCallback and, if 'done' no longer compares equal to the
" marker, ends the sink with the 'done' value; another path tells the notifier
" to stop; source signals are forwarded only while 'done' compares equal to the
" marker.
" NOTE(review): the branch conditions on a:t are not visible in this listing.
" 'done' is compared with a String marker using ==/!=, while the notifier
" callback may store the Number 0 in it; Vim converts the String to a Number
" for such comparisons — confirm this behaves as intended.
1023 function! s:takeUntilSourceCallback(data, t, d) abort
1025 let a:data['sourceTalkback'] = a:d
1026 call a:data['notifier'](0, function('s:takeUntilNotifierCallback', [a:data]))
1027 let a:data['inited'] = 1
1028 call a:data['sink'](0, function('s:takeUntilSinkCallback', [a:data]))
1029 if a:data['done'] != s:takeUntilUniqueToken | call a:data['sink'](2, a:data['done']) | endif
1033 call a:data['notifierTalkback'](2, lsp#callbag#undefined())
1035 if a:data['done'] == s:takeUntilUniqueToken
1036 call a:data['sink'](a:t, a:d)
" Notifier callback. Visible behaviour: stores the notifier talkback and
" requests its first item; on one path sets 'done' to 0, stops both the
" notifier and the source and, if inited, completes the sink; on another path
" clears the notifier talkback, records the payload in 'done', stops the source
" and, if inited, forwards the signal to the sink.
" NOTE(review): the branch conditions are not visible in this listing.
1040 function! s:takeUntilNotifierCallback(data, t, d) abort
1042 let a:data['notifierTalkback'] = a:d
1043 call a:data['notifierTalkback'](1, lsp#callbag#undefined())
1047 let a:data['done'] = 0
1048 call a:data['notifierTalkback'](2, lsp#callbag#undefined())
1049 call a:data['sourceTalkback'](2, lsp#callbag#undefined())
1050 if a:data['inited'] | call a:data['sink'](2, lsp#callbag#undefined()) | endif
1054 let a:data['notifierTalkback'] = 0
1055 let a:data['done'] = a:d
1057 call a:data['sourceTalkback'](2, lsp#callbag#undefined())
1058 if a:data['inited'] | call a:data['sink'](a:t, a:d) | endif
" Talkback given to the sink: does nothing once 'done' differs from the marker;
" on type 2 also stops the notifier if its talkback is present and not 0; then
" forwards the signal to the source talkback.
1063 function! s:takeUntilSinkCallback(data, t, d) abort
1064 if a:data['done'] != s:takeUntilUniqueToken | return | endif
1065 if a:t == 2 && has_key(a:data, 'notifierTalkback') && a:data['notifierTalkback'] != 0 | call a:data['notifierTalkback'](2, lsp#callbag#undefined()) | endif
1066 call a:data['sourceTalkback'](a:t, a:d)
" takeWhile(predicate): returns an operator that forwards items until
" a:predicate returns false for one of them.
1071 function! lsp#callbag#takeWhile(predicate) abort
1072 let l:data = { 'predicate': a:predicate }
1073 return function('s:takeWhileFactory', [l:data])
" Remembers the upstream source and returns the limited source.
1076 function! s:takeWhileFactory(data, source) abort
1077 let a:data['source'] = a:source
1078 return function('s:takeWhileSourceFactory', [a:data])
" Source body; on the type-0 greeting, stores the sink and subscribes upstream.
1081 function! s:takeWhileSourceFactory(data, start, sink) abort
1082 if a:start != 0 | return | endif
1083 let a:data['sink'] = a:sink
1084 call a:data['source'](0, function('s:takeWhileSourceCallback', [a:data]))
" Upstream callback: stores the source talkback; when a data item fails the
" predicate, stops the source and completes the sink; a final path forwards the
" signal unchanged.
" NOTE(review): the guard around the talkback assignment and the return after
" completion are not visible in this listing.
1087 function! s:takeWhileSourceCallback(data, t, d) abort
1089 let a:data['sourceTalkback'] = a:d
1092 if a:t == 1 && !a:data['predicate'](a:d)
1093 call a:data['sourceTalkback'](2, lsp#callbag#undefined())
1094 call a:data['sink'](2, lsp#callbag#undefined())
1098 call a:data['sink'](a:t, a:d)
" group(n): returns an operator that batches items into lists of a:n elements.
1103 function! lsp#callbag#group(n) abort
1104 let l:data = { 'n': a:n }
1105 return function('s:groupN', [l:data])
" Remembers the upstream source and returns the grouping source.
1108 function! s:groupN(data, source) abort
1109 let a:data['source'] = a:source
1110 return function('s:groupFactory', [a:data])
" Source body; on the type-0 greeting, stores the sink, starts with an empty
" chunk and subscribes upstream.
1113 function! s:groupFactory(data, start, sink) abort
1114 if a:start != 0 | return | endif
1115 let a:data['sink'] = a:sink
1116 let a:data['chunk'] = []
1117 call a:data['source'](0, function('s:groupSourceCallback', [a:data]))
" Upstream callback. Visible behaviour: stores the talkback on type 0; appends
" an item to the chunk and, once it holds 'n' items, emits them (remove()
" empties the buffer and returns the removed items); requests the next item;
" on type 2 with a non-empty chunk, emits the remainder as data; a final path
" forwards the signal unchanged.
" NOTE(review): some branch lines are not visible in this listing.
1120 function! s:groupSourceCallback(data, t, d) abort
1121 if a:t == 0 | let a:data['talkback'] = a:d | endif
1123 call add(a:data['chunk'], a:d)
1124 if len(a:data['chunk']) == a:data['n']
1125 call a:data['sink'](a:t, remove(a:data['chunk'], 0, a:data['n'] - 1))
1127 call a:data['talkback'](1, lsp#callbag#undefined())
1129 if a:t == 2 && len(a:data['chunk']) > 0
1130 call a:data['sink'](1, remove(a:data['chunk'], 0, len(a:data['chunk']) - 1))
1132 call a:data['sink'](a:t, a:d)
" flatten(): returns an operator that turns a source of sources into a single
" stream carrying the items of the most recent inner source.
1139 function! lsp#callbag#flatten() abort
1140 return function('s:flattenSource')
" Creates a fresh state dict for the outer source and returns the flattened
" source.
1143 function! s:flattenSource(source) abort
1144 let l:data = { 'source': a:source }
1145 return function('s:flattenFactory', [l:data])
" Source body; on the type-0 greeting, resets the outer/inner talkback slots
" (0 means "none") and subscribes to the outer source.
1148 function! s:flattenFactory(data, start, sink) abort
1149 if a:start != 0 | return | endif
1150 let a:data['sink'] = a:sink
1151 let a:data['outerEnded'] = 0
1152 let a:data['outerTalkback'] = 0
1153 let a:data['innerTalkback'] = 0
1154 let a:data['talkback'] = function('s:flattenTalkbackCallback', [a:data])
1155 call a:data['source'](0, function('s:flattenSourceCallback', [a:data]))
" Talkback given to the sink. Visible behaviour: a pull goes to the inner
" source when one is active, otherwise to the outer source; on the other path
" both the inner (if any) and the outer source are told to stop.
" NOTE(review): the conditions on a:t are not visible in this listing.
1158 function! s:flattenTalkbackCallback(data, t, d) abort
1160 if a:data['innerTalkback'] != 0
1161 call a:data['innerTalkback'](1, a:d)
1163 call a:data['outerTalkback'](1, a:d)
1167 if a:data['innerTalkback'] != 0 | call a:data['innerTalkback'](2, lsp#callbag#undefined()) | endif
1168 call a:data['outerTalkback'](2, lsp#callbag#undefined())
" Outer-source callback. Visible behaviour: stores the outer talkback and
" greets the sink; for a new inner source, stops the current inner source (if
" any) and subscribes to the new one; on an error end, stops the inner source
" (if any) and then calls the outer talkback with type 1 and the error payload;
" otherwise completes the sink when no inner source is active, or records that
" the outer source has ended.
" NOTE(review): some branch lines are not visible in this listing. The error
" path sends type 1 to the outer talkback rather than type 2 to the sink —
" confirm this is intended.
1172 function! s:flattenSourceCallback(data, t, d) abort
1174 let a:data['outerTalkback'] = a:d
1175 call a:data['sink'](0, a:data['talkback'])
1177 let l:InnerSource = a:d
1178 if a:data['innerTalkback'] != 0 | call a:data['innerTalkback'](2, lsp#callbag#undefined()) | endif
1179 call l:InnerSource(0, function('s:flattenInnerSourceCallback', [a:data]))
1180 elseif a:t == 2 && !lsp#callbag#isUndefined(a:d)
1181 if a:data['innerTalkback'] != 0 | call a:data['innerTalkback'](2, lsp#callbag#undefined()) | endif
1182 call a:data['outerTalkback'](1, a:d)
1184 if a:data['innerTalkback'] == 0
1185 call a:data['sink'](2, lsp#callbag#undefined())
1187 let a:data['outerEnded'] = 1
" Inner-source callback. Visible behaviour: stores the inner talkback and
" requests its first item; emits inner items to the sink; on an error end,
" stops the outer source and passes the error to the sink; otherwise completes
" the sink if the outer source has already ended, or clears the inner talkback
" and pulls the next inner source from the outer source.
" NOTE(review): some branch lines are not visible in this listing.
1192 function! s:flattenInnerSourceCallback(data, t, d) abort
1194 let a:data['innerTalkback'] = a:d
1195 call a:data['innerTalkback'](1, lsp#callbag#undefined())
1197 call a:data['sink'](1, a:d)
1198 elseif a:t == 2 && !lsp#callbag#isUndefined(a:d)
1199 call a:data['outerTalkback'](2, lsp#callbag#undefined())
1200 call a:data['sink'](2, a:d)
1202 if a:data['outerEnded'] != 0
1203 call a:data['sink'](2, lsp#callbag#undefined())
1205 let a:data['innerTalkback'] = 0
1206 call a:data['outerTalkback'](1, lsp#callbag#undefined())
" Returns the composition, via lsp#callbag#operate, of lsp#callbag#map(a:F)
" followed by lsp#callbag#flatten().
1213 function! lsp#callbag#flatMap(F) abort
1214 return lsp#callbag#operate(
1215 \ lsp#callbag#map(a:F),
1216 \ lsp#callbag#flatten(),
" Captures a:reducer and a:seed in a state dictionary and returns a partial of
" s:scanSource bound to it; the returned function takes the source to scan.
1222 function! lsp#callbag#scan(reducer, seed) abort
1223 let l:data = { 'reducer': a:reducer, 'seed': a:seed }
1224 return function('s:scanSource', [l:data])
" Records a:source on the shared state and returns a partial of s:scanFactory
" bound to that state.
1227 function! s:scanSource(data, source) abort
1228 let a:data['source'] = a:source
1229 return function('s:scanFactory', [a:data])
" Subscription handler for scan. Returns immediately unless a:start is 0;
" otherwise stores the sink, initialises the accumulator from the seed and
" subscribes to the source with s:scanSourceCallback.
1232 function! s:scanFactory(data, start, sink) abort
1233 if a:start != 0 | return | endif
1234 let a:data['sink'] = a:sink
1235 let a:data['acc'] = a:data['seed']
1236 call a:data['source'](0, function('s:scanSourceCallback', [a:data]))
" Source callback for scan.
1239 function! s:scanSourceCallback(data, t, d) abort
" Fold the payload into the accumulator and emit the updated accumulator as a
" type-1 message.
1241 let a:data['acc'] = a:data['reducer'](a:data['acc'], a:d)
1242 call a:data['sink'](1, a:data['acc'])
" Other messages are forwarded to the sink unchanged (presumably the branch
" for every type that is not a data message).
1244 call a:data['sink'](a:t, a:d)
" Captures a:reducer and a:seed in a state dictionary and returns a partial of
" s:reduceSource bound to it; the returned function takes the source to reduce.
1250 function! lsp#callbag#reduce(reducer, seed) abort
1251 let l:data = { 'reducer': a:reducer, 'seed': a:seed }
1252 return function('s:reduceSource', [l:data])
" Records a:source on the shared state and returns a partial of
" s:reduceFactory bound to that state.
1255 function! s:reduceSource(data, source) abort
1256 let a:data['source'] = a:source
1257 return function('s:reduceFactory', [a:data])
" Subscription handler for reduce. Returns immediately unless a:start is 0;
" otherwise stores the sink, initialises the accumulator from the seed and
" subscribes to the source with s:reduceSourceCallback.
1260 function! s:reduceFactory(data, start, sink) abort
1261 if a:start != 0 | return | endif
1262 let a:data['sink'] = a:sink
1263 let a:data['acc'] = a:data['seed']
1264 call a:data['source'](0, function('s:reduceSourceCallback', [a:data]))
" Source callback for reduce. Unlike scan, the accumulator is only emitted
" once the source ends with an undefined payload.
1267 function! s:reduceSourceCallback(data, t, d) abort
" Fold the payload into the accumulator without emitting anything.
1269 let a:data['acc'] = a:data['reducer'](a:data['acc'], a:d)
" Type 2 with an undefined payload: emit the final accumulator, then end the
" sink.
1270 elseif a:t == 2 && lsp#callbag#isUndefined(a:d)
1271 call a:data['sink'](1, a:data['acc'])
1272 call a:data['sink'](2, lsp#callbag#undefined())
" Remaining messages are forwarded to the sink unchanged.
1274 call a:data['sink'](a:t, a:d)
" Builds a switchMap operator.
"   a:makeSource - function called with each input value to obtain the source
"                  to switch to
"   a:1          - optional function used as 'combineResults'; when it is not
"                  used, s:switchMapDefaultCombineResults is installed
"                  (presumably selected on the argument count)
" Returns a partial of s:switchMapSourceCallback bound to the state dictionary.
1280 function! lsp#callbag#switchMap(makeSource, ...) abort
1281 let l:data = { 'makeSource': a:makeSource }
1283 let l:data['combineResults'] = a:1
1285 let l:data['combineResults'] = function('s:switchMapDefaultCombineResults')
1287 return function('s:switchMapSourceCallback', [l:data])
" Default 'combineResults' function installed by lsp#callbag#switchMap. It is
" invoked with the outer value as a:a and the inner value as a:b.
1290 function! s:switchMapDefaultCombineResults(a, b) abort
" Records the input source on the shared state and returns a partial of
" s:switchMapFactory bound to that state.
1294 function! s:switchMapSourceCallback(data, inputSource) abort
1295 let a:data['inputSource'] = a:inputSource
1296 return function('s:switchMapFactory', [a:data])
" Subscription handler for switchMap. Returns immediately unless a:start is 0;
" otherwise stores the output sink, clears the sourceEnded flag and subscribes
" to the input source with s:switchMapInputSourceCallback.
1299 function! s:switchMapFactory(data, start, outputSink) abort
1300 if a:start != 0 | return | endif
1301 let a:data['outputSink'] = a:outputSink
1302 let a:data['sourceEnded'] = 0
1303 call a:data['inputSource'](0, function('s:switchMapInputSourceCallback', [a:data]))
" Receives messages from the input (outer) source of a switchMap subscription.
1306 function! s:switchMapInputSourceCallback(data, t, d) abort
" The handshake is passed through to the output sink unchanged.
1307 if a:t == 0 | call a:data['outputSink'](a:t, a:d) | endif
" A new input value replaces the current inner source: terminate the one still
" running (if any) and forget its talkback.
1309 if has_key(a:data, 'currSourceTalkback')
1310 call a:data['currSourceTalkback'](2, lsp#callbag#undefined())
1311 call remove(a:data, 'currSourceTalkback')
" Build the inner source for this value and subscribe to it; a:t and a:d are
" bound into the callback so inner results can be combined with the value.
1313 let l:CurrSource = a:data['makeSource'](a:d)
1314 call l:CurrSource(0, function('s:switchMapCurrSourceCallback', [a:data, a:t, a:d]))
" Input source ended: the output sink is only told right away when no inner
" source is active; otherwise the inner callback forwards the end later.
1317 let a:data['sourceEnded'] = 1
1318 if !has_key(a:data, 'currSourceTalkback') | call a:data['outputSink'](a:t, a:d) | endif
" Receives messages from the current inner source.
"   a:t, a:d         - type and value of the input message that created this
"                      inner source (bound when subscribing)
"   a:currT, a:currD - type and payload of the inner source's message
1322 function! s:switchMapCurrSourceCallback(data, t, d, currT, currD) abort
1323 if a:currT == 0 | let a:data['currSourceTalkback'] = a:currD | endif
" Inner data is combined with the originating input value before emission.
1324 if a:currT == 1 | call a:data['outputSink'](a:t, a:data['combineResults'](a:d, a:currD)) | endif
" After a handshake or a data item, request the next item from the inner source.
1325 if (a:currT == 0 || a:currT == 1) && has_key(a:data, 'currSourceTalkback')
1326 call a:data['currSourceTalkback'](1, lsp#callbag#undefined())
" Inner source finished (presumably guarded by a type-2 check): drop its
" talkback and, if the input source has already ended, end the output sink.
1329 call remove(a:data, 'currSourceTalkback')
1330 if a:data['sourceEnded'] | call a:data['outputSink'](a:currT, a:currD) | endif
" Wraps a:source together with an empty list of sinks and returns a partial of
" s:shareFactory bound to that state, so every subscription to the returned
" function shares one state dictionary.
1336 function! lsp#callbag#share(source) abort
1337 let l:data = { 'source': a:source, 'sinks': [] }
1338 return function('s:shareFactory', [l:data])
" Subscription handler for share. Returns immediately unless a:start is 0;
" otherwise registers a:sink in the shared sink list.
1341 function! s:shareFactory(data, start, sink) abort
1342 if a:start != 0 | return | endif
1343 call add(a:data['sinks'], a:sink)
" Note that the talkback is stored on the shared state, so it is replaced on
" every new subscription.
1345 let a:data['talkback'] = function('s:shareTalkbackCallback', [a:data, a:sink])
" Only the first registered sink triggers a subscription to the real source.
1347 if len(a:data['sinks']) == 1
1348 call a:data['source'](0, function('s:shareSourceCallback', [a:data, a:sink]))
" Greets the sink directly with the talkback (presumably for sinks that join
" after the source subscription already exists).
1352 call a:sink(0, a:data['talkback'])
" Talkback handed to one particular sink of a shared source.
"   a:sink - the sink this talkback belongs to
"   a:t    - message type sent upstream by that sink
"   a:d    - payload
1355 function! s:shareTalkbackCallback(data, sink, t, d) abort
" Locate this sink in the shared list and remove it.
1359 while l:i < len(a:data['sinks'])
1360 if a:data['sinks'][l:i] == a:sink
1368 call remove(a:data['sinks'], l:i)
" Once no sinks are left the underlying source subscription is terminated.
1371 if empty(a:data['sinks'])
1372 call a:data['sourceTalkback'](2, lsp#callbag#undefined())
" Other messages are relayed to the source talkback unchanged.
1375 call a:data['sourceTalkback'](a:t, a:d)
" Receives messages from the single underlying source of a shared subscription.
1379 function! s:shareSourceCallback(data, sink, t, d) abort
" Handshake: remember the source talkback and greet the subscribing sink.
1381 let a:data['sourceTalkback'] = a:d
1382 call a:sink(0, a:data['talkback'])
" Iterates over every registered sink (loop body not shown here; presumably
" it delivers the message to each of them).
1384 for l:S in a:data['sinks']
" Resets the list of registered sinks.
1389 let a:data['sinks'] = []
" Returns a partial of s:materializeF bound to a state dictionary; the
" returned function takes the source to materialize.
1395 function! lsp#callbag#materialize() abort
1397 return function('s:materializeF', [l:data])
" Records a:source on the shared state and returns a partial of
" s:materializeFSource bound to that state.
1400 function! s:materializeF(data, source) abort
1401 let a:data['source'] = a:source
1402 return function('s:materializeFSource', [a:data])
" Subscription handler for materialize. Returns immediately unless a:start is
" 0; otherwise stores the sink and subscribes to the source with
" s:materializeFSourceCallback.
1405 function! s:materializeFSource(data, start, sink) abort
1406 if a:start != 0 | return | endif
1407 let a:data['sink'] = a:sink
1408 call a:data['source'](0, function('s:materializeFSourceCallback', [a:data]))
" Source callback for materialize: turns source messages into notification
" dictionaries that are delivered to the sink as type-1 data.
1411 function! s:materializeFSourceCallback(data, t, d) abort
" A value is wrapped in a 'next' notification.
1413 call a:data['sink'](1, lsp#callbag#createNextNotification(a:d))
" End of source: an undefined payload becomes a 'complete' notification, any
" other payload an 'error' notification; the sink is then ended with an
" undefined payload.
1415 call a:data['sink'](1, lsp#callbag#isUndefined(a:d)
1416 \ ? lsp#callbag#createCompleteNotification()
1417 \ : lsp#callbag#createErrorNotification(a:d))
1418 call a:data['sink'](2, lsp#callbag#undefined())
" Remaining messages are forwarded to the sink unchanged.
1420 call a:data['sink'](a:t, a:d)
" Returns a notification dictionary of kind 'N' carrying a:d under 'value'.
1426 function! lsp#callbag#createNextNotification(d) abort
1427 return { 'kind': 'N', 'value': a:d }
" Returns a notification dictionary of kind 'C' with no payload.
1430 function! lsp#callbag#createCompleteNotification() abort
1431 return { 'kind': 'C' }
" Returns a notification dictionary of kind 'E' carrying a:d under 'error'.
1434 function! lsp#callbag#createErrorNotification(d) abort
1435 return { 'kind': 'E', 'error': a:d }
" Returns true when the notification dictionary a:d has kind 'N'
" (case-sensitive comparison). a:d must contain a 'kind' key.
1438 function! lsp#callbag#isNextNotification(d) abort
1439 return a:d['kind'] ==# 'N'
" Returns true when the notification dictionary a:d has kind 'C'
" (case-sensitive comparison). a:d must contain a 'kind' key.
1442 function! lsp#callbag#isCompleteNotification(d) abort
1443 return a:d['kind'] ==# 'C'
" Returns true when the notification dictionary a:d has kind 'E'
" (case-sensitive comparison). a:d must contain a 'kind' key.
1446 function! lsp#callbag#isErrorNotification(d) abort
1447 return a:d['kind'] ==# 'E'
1452 " let s:Stdin = lsp#callbag#makeSubject()
1453 " call lsp#callbag#spawn(['bash', '-c', 'read i; echo $i'], {
1454 " \ 'stdin': s:Stdin,
1458 " \ 'start': 0, " when job starts before subscribing to stdin
1459 " \ 'ready': 0, " when job starts and after subscribing to stdin
1461 " \ 'failOnNonZeroExitCode': 1,
1462 " \ 'failOnStdinError': 1,
1463 " \ 'normalize': 'raw' | 'string' | 'array', (defaults to raw),
1466 " call s:Stdin(1, 'hi')
1467 " call s:Stdin(2, lsp#callbag#undefined()) " required to close stdin
" Creates a source that runs a:cmd as a job.
"   a:cmd - command handed to jobstart()/job_start() by s:spawnCreate
"   a:1   - optional dictionary of options; a shallow copy is stored, so keys
"           added to or removed from the caller's dictionary afterwards are
"           not seen
" Returns the source built by lsp#callbag#create around s:spawnCreate.
1468 function! lsp#callbag#spawn(cmd, ...) abort
1469 let l:data = { 'cmd': a:cmd, 'opt': a:0 > 0 ? copy(a:000[0]) : {} }
1470 return lsp#callbag#create(function('s:spawnCreate', [l:data]))
" Producer passed to lsp#callbag#create by lsp#callbag#spawn.
"   a:data     - spawn state ('cmd' and 'opt' set by lsp#callbag#spawn)
"   a:next     - callback used to emit event dictionaries
"   a:error    - callback used to report failure
"   a:complete - callback used to report completion
" Stores the callbacks and bookkeeping flags on a:data, builds the job options
" and starts the job with jobstart() (Neovim-named callbacks) or job_start()
" (Vim-named callbacks). Depending on the options it emits 'start' and 'ready'
" events and subscribes to opt.stdin. Returns a partial of s:spawnDispose.
1473 function! s:spawnCreate(data, next, error, complete) abort
1474 let a:data['next'] = a:next
1475 let a:data['error'] = a:error
1476 let a:data['complete'] = a:complete
" 'state' is a dictionary attached to the emitted events.
1477 let a:data['state'] = {}
" Flags read by the exit/close callbacks and by s:spawnNotifyExit.
1478 let a:data['dispose'] = 0
1479 let a:data['exit'] = 0
1480 let a:data['close'] = 0
1482 let l:normalize = get(a:data['opt'], 'normalize', 'raw')
" jobstart() path: the output arrives as a list, so only 'string' needs a
" conversion; everything else uses the raw normalizer.
1485 let a:data['jobopt'] = {
1486 \ 'on_exit': function('s:spawnNeovimOnExit', [a:data]),
1488 if l:normalize ==# 'string'
1489 let a:data['normalize'] = function('s:spawnNormalizeNeovimString')
1491 let a:data['normalize'] = function('s:spawnNormalizeRaw')
" Output callbacks are only registered when the matching option is truthy.
1493 if get(a:data['opt'], 'stdout', 0) | let a:data['jobopt']['on_stdout'] = function('s:spawnNeovimOnStdout', [a:data]) | endif
1494 if get(a:data['opt'], 'stderr', 0) | let a:data['jobopt']['on_stderr'] = function('s:spawnNeovimOnStderr', [a:data]) | endif
1495 if has_key(a:data['opt'], 'env') | let a:data['jobopt']['env'] = a:data['opt']['env'] | endif
1496 let a:data['jobid'] = jobstart(a:data['cmd'], a:data['jobopt'])
" job_start() path: the output arrives as a string, so only 'array' needs a
" conversion; everything else uses the raw normalizer.
1498 let a:data['jobopt'] = {
1499 \ 'exit_cb': function('s:spawnVimExitCb', [a:data]),
1500 \ 'close_cb': function('s:spawnVimCloseCb', [a:data]),
1502 if get(a:data['opt'], 'stdout', 0) | let a:data['jobopt']['out_cb'] = function('s:spawnVimOutCb', [a:data]) | endif
1503 if get(a:data['opt'], 'stderr', 0) | let a:data['jobopt']['err_cb'] = function('s:spawnVimErrCb', [a:data]) | endif
1504 if has_key(a:data['opt'], 'env') | let a:data['jobopt']['env'] = a:data['opt']['env'] | endif
1505 if l:normalize ==# 'array'
1506 let a:data['normalize'] = function('s:spawnNormalizeVimArray')
1508 let a:data['normalize'] = function('s:spawnNormalizeRaw')
" 'noblock' is only set when the running Vim has patch 8.1.350.
1510 if has('patch-8.1.350') | let a:data['jobopt']['noblock'] = 1 | endif
" Pending stdin data that s:spawnVimStdinNextFlushBuffer sends in chunks.
1511 let a:data['stdinBuffer'] = ''
1512 let a:data['job'] = job_start(a:data['cmd'], a:data['jobopt'])
1513 let a:data['jobchannel'] = job_getchannel(a:data['job'])
" The channel id doubles as the job id reported in events.
1514 let a:data['jobid'] = ch_info(a:data['jobchannel'])['id']
1517 if a:data['jobid'] < 0 | return | endif " jobstart failed. on_exit will notify with error
" Optional pid lookup, using jobpid() or job_info() depending on the path.
" NOTE(review): l:startdata is only created with :let further down, under the
" 'start' option; assigning l:startdata['pid'] here looks like a use before
" definition when the 'pid' option is set - confirm and reorder if so.
1519 if get(a:data['opt'], 'pid', 0)
1521 let a:data['pid'] = jobpid(a:data['jobid'])
1522 let l:startdata['pid'] = a:data['pid']
1524 let l:jobinfo = job_info(a:data['job'])
1525 if type(l:jobinfo) == type({}) && has_key(l:jobinfo, 'process')
1526 let a:data['pid'] = l:jobinfo['process']
1527 let l:startdata['pid'] = a:data['pid']
" 'start' event: emitted before stdin is subscribed.
1532 if get(a:data['opt'], 'start', 0)
1533 let l:startdata = { 'id': a:data['jobid'], 'state': a:data['state'] }
1534 call a:data['next']({ 'event': 'start', 'data': l:startdata })
" Subscribe to the caller-supplied stdin source; the value returned by the
" pipe is kept so s:spawnNotifyExit can call it when the job exits.
1537 if has_key(a:data['opt'], 'stdin')
1538 let a:data['stdinDispose'] = lsp#callbag#pipe(
1539 \ a:data['opt']['stdin'],
1540 \ lsp#callbag#subscribe({
1541 \ 'next': (has('nvim') ? function('s:spawnNeovimStdinNext', [a:data]) : function('s:spawnVimStdinNext', [a:data])),
1542 \ 'error': (has('nvim') ? function('s:spawnNeovimStdinError', [a:data]) : function('s:spawnVimStdinError', [a:data])),
1543 \ 'complete': (has('nvim') ? function('s:spawnNeovimStdinComplete', [a:data]) : function('s:spawnVimStdinComplete', [a:data])),
" 'ready' event: emitted after stdin has been subscribed.
1548 if get(a:data['opt'], 'ready', 0)
1549 let l:readydata = { 'id': a:data['jobid'], 'state': a:data['state'] }
1550 if has_key(a:data, 'pid') | let l:readydata['pid'] = a:data['pid'] | endif
1551 call a:data['next']({ 'event': 'ready', 'data': l:readydata })
1554 return function('s:spawnDispose', [a:data])
" Stops the spawned job: jobstop() on the stored job id, with error E900
" caught, or job_stop() on the stored job object.
1557 function! s:spawnJobStop(data) abort
1560 call jobstop(a:data['jobid'])
1561 catch /^Vim\%((\a\+)\)\=:E900/
1563 " Vim does not raise exception even the job has already closed so fail
1564 " silently for 'E900: Invalid job id' exception
1567 call job_stop(a:data['job'])
" Teardown function returned by s:spawnCreate. Marks the spawn as disposed,
" which makes s:spawnNotifyExit return without emitting anything, and then
" stops the job.
1571 function! s:spawnDispose(data) abort
1572 let a:data['dispose'] = 1
1573 call s:spawnJobStop(a:data)
" 'next' handler of the stdin subscription on the jobstart() path: sends a:x
" to the job with jobsend().
1576 function! s:spawnNeovimStdinNext(data, x) abort
1577 call jobsend(a:data['jobid'], a:x)
" 'next' handler of the stdin subscription on the job_start() path: appends
" a:x to the pending stdin buffer and starts flushing it.
1580 function! s:spawnVimStdinNext(data, x) abort
1581 " Ref: https://groups.google.com/d/topic/vim_dev/UNNulkqb60k/discussion
1582 let a:data['stdinBuffer'] .= a:x
1583 call s:spawnVimStdinNextFlushBuffer(a:data)
" Writes the pending stdin buffer to the job channel in pieces of at most
" 4096 bytes. It is also used as a timer callback via a partial bound to
" a:data.
1586 function! s:spawnVimStdinNextFlushBuffer(data) abort
1587 " https://github.com/vim/vim/issues/2548
1588 " https://github.com/natebosch/vim-lsc/issues/67#issuecomment-357469091
" Small enough to send in one call: send everything and clear the buffer.
1590 if len(a:data['stdinBuffer']) <= 4096
1591 call ch_sendraw(a:data['jobchannel'], a:data['stdinBuffer'])
1592 let a:data['stdinBuffer'] = ''
" Otherwise send the first 4096 bytes, keep the remainder buffered and
" schedule another flush in 1 ms.
1594 let l:to_send = a:data['stdinBuffer'][:4095]
1595 let a:data['stdinBuffer'] = a:data['stdinBuffer'][4096:]
1596 call ch_sendraw(a:data['jobchannel'], l:to_send)
1597 call timer_start(1, function('s:spawnVimStdinNextFlushBuffer', [a:data]))
" 'error' handler of the stdin subscription on the jobstart() path. Records
" the error for s:spawnNotifyExit and stops the job unless the
" 'failOnStdinError' option (default 1) is falsy.
1601 function! s:spawnNeovimStdinError(data, x) abort
1602 let a:data['stdinError'] = a:x
1603 if get(a:data['opt'], 'failOnStdinError', 1) | call s:spawnJobStop(a:data) | endif
" 'error' handler of the stdin subscription on the job_start() path. Records
" the error for s:spawnNotifyExit and stops the job unless the
" 'failOnStdinError' option (default 1) is falsy.
1606 function! s:spawnVimStdinError(data, x) abort
1607 let a:data['stdinError'] = a:x
1608 if get(a:data['opt'], 'failOnStdinError', 1) | call s:spawnJobStop(a:data) | endif
" 'complete' handler of the stdin subscription on the jobstart() path: closes
" the job's stdin stream with chanclose().
1611 function! s:spawnNeovimStdinComplete(data) abort
1612 call chanclose(a:data['jobid'], 'stdin')
" 'complete' handler of the stdin subscription on the job_start() path: loops
" until the pending stdin buffer is empty, then closes the input side of the
" job channel.
1615 function! s:spawnVimStdinComplete(data) abort
1616 " There is no easy way to know when ch_sendraw() finishes writing data
1617 " on a non-blocking channels -- has('patch-8.1.889') -- and because of
1618 " this, we cannot safely call ch_close_in().
" Loop body is presumably a short sleep that lets the flush timer run.
1619 while len(a:data['stdinBuffer']) != 0
1622 call ch_close_in(a:data['jobchannel'])
" Normalizer installed by s:spawnCreate when no conversion is selected for
" the 'normalize' option ('raw' is the default). Presumably returns a:data
" unchanged.
1625 function! s:spawnNormalizeRaw(data) abort
" Normalizer for normalize == 'string' on the jobstart() path: joins the list
" a:data into a single string with newline separators.
1629 function! s:spawnNormalizeNeovimString(data) abort
1630 " convert array to string since neovim uses array split by \n by default
1631 return join(a:data, "\n")
" Normalizer for normalize == 'array' on the job_start() path: splits the
" string a:data on newlines, keeping empty items (keepempty argument is 1).
1634 function! s:spawnNormalizeVimArray(data) abort
1635 " convert string to array since vim uses string by default.
1636 return split(a:data, "\n", 1)
" on_stdout callback for jobstart(): emits a 'stdout' event whose data is the
" normalized output a:d, together with the shared state dictionary.
1639 function! s:spawnNeovimOnStdout(data, id, d, event) abort
1640 call a:data['next']({ 'event': 'stdout', 'data': a:data['normalize'](a:d), 'state': a:data['state'] })
" on_stderr callback for jobstart(): emits a 'stderr' event whose data is the
" normalized output a:d, together with the shared state dictionary.
1643 function! s:spawnNeovimOnStderr(data, id, d, event) abort
1644 call a:data['next']({ 'event': 'stderr', 'data': a:data['normalize'](a:d), 'state': a:data['state'] })
" on_exit callback for jobstart(): marks the job as both exited and closed,
" records the exit code a:d and runs the exit notification.
1647 function! s:spawnNeovimOnExit(data, id, d, event) abort
1648 let a:data['exit'] = 1
1649 let a:data['close'] = 1
1650 let a:data['exitcode'] = a:d
1651 call s:spawnNotifyExit(a:data)
" out_cb callback for job_start(): emits a 'stdout' event whose data is the
" normalized output a:d, together with the shared state dictionary.
1654 function! s:spawnVimOutCb(data, id, d, ...) abort
1655 call a:data['next']({ 'event': 'stdout', 'data': a:data['normalize'](a:d), 'state': a:data['state'] })
" err_cb callback for job_start(): emits a 'stderr' event whose data is the
" normalized output a:d, together with the shared state dictionary.
1658 function! s:spawnVimErrCb(data, id, d, ...) abort
1659 call a:data['next']({ 'event': 'stderr', 'data': a:data['normalize'](a:d), 'state': a:data['state'] })
" exit_cb callback for job_start(): records that the job exited and its exit
" code a:d. The exit notification only runs once the close callback has also
" fired.
1662 function! s:spawnVimExitCb(data, id, d) abort
1663 let a:data['exit'] = 1
1664 let a:data['exitcode'] = a:d
1665 " for more info refer to :h job-start
1666 " job may exit before we read the output and output may be lost.
1667 " in unix this happens because closing the write end of a pipe
1668 " causes the read end to get EOF.
1669 " close and exit has race condition, so wait for both to complete
1670 if a:data['close'] && a:data['exit']
1671 call s:spawnNotifyExit(a:data)
" close_cb callback for job_start(): records that the channel closed. The
" exit notification only runs once the exit callback has also fired.
1675 function! s:spawnVimCloseCb(data, id) abort
1676 let a:data['close'] = 1
1677 if a:data['close'] && a:data['exit']
1678 call s:spawnNotifyExit(a:data)
" Final notification after the job has exited. Emits nothing when the spawn
" was disposed through s:spawnDispose.
1682 function! s:spawnNotifyExit(data) abort
1683 if a:data['dispose'] | return | end
" Calls the stored stdin disposer, if a stdin source was subscribed.
1684 if has_key(a:data, 'stdinDispose') | call a:data['stdinDispose']() | endif
" A recorded stdin error is reported through the error callback unless
" 'failOnStdinError' (default 1) is falsy.
1685 if get(a:data['opt'], 'failOnStdinError', 1) && has_key(a:data, 'stdinError')
1686 call a:data['error'](a:data['stdinError'])
" Optional 'exit' event carrying the exit code.
1689 if get(a:data['opt'], 'exit', 0)
1690 call a:data['next']({ 'event': 'exit', 'data': a:data['exitcode'], 'state': a:data['state'] })
" A non-zero exit code is reported as an error unless 'failOnNonZeroExitCode'
" (default 1) is falsy.
1692 if get(a:data['opt'], 'failOnNonZeroExitCode', 1) && a:data['exitcode'] != 0
1693 call a:data['error']('Spawn for job ' . a:data['jobid'] .' failed with exit code ' . a:data['exitcode'] . '. ')
1695 call a:data['complete']()
1700 " vim:ts=4:sw=4:ai:foldmethod=marker:foldlevel=0: