Skip to content

Commit

Permalink
WIP: Implement multi process
Browse files Browse the repository at this point in the history
  • Loading branch information
Shougo committed Jan 30, 2018
1 parent 572fa2c commit f3663f1
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 75 deletions.
46 changes: 27 additions & 19 deletions rplugin/python3/deoplete/child.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,28 +43,28 @@ def enable_logging(self):
def main_loop(self):
for queue_id in sys.stdin:
queue_id = queue_id.strip()

if queue_id not in self._vim.vars['deoplete#_child_in']:
continue

self.debug('main_loop: begin')
child_in = self._vim.vars['deoplete#_child_in']
name = child_in[queue_id]['name']
args = child_in[queue_id]['args']
error(self._vim, name)
self.debug('main_loop: %s', name)

# if name == 'add_source':
# self._add_source(args[0])
# elif name == 'add_filter':
# self._add_filter(args[0])
# elif name == 'set_source_attributes':
# self._set_source_attributes(args[0])
# elif name == 'set_custom':
# self._set_custom(args[0])
# elif name == 'on_event':
# self._on_event(args[0])
# elif name == 'merge_results':
# self._merge_results(args[0])
if name == 'add_source':
self._add_source(args[0])
elif name == 'add_filter':
self._add_filter(args[0])
elif name == 'set_source_attributes':
self._set_source_attributes(args[0])
elif name == 'set_custom':
self._set_custom(args[0])
elif name == 'on_event':
self._on_event(args[0])
elif name == 'merge_results':
self._merge_results(args[0], queue_id)

def _add_source(self, path):
source = None
Expand Down Expand Up @@ -125,7 +125,13 @@ def _merge_results(self, context, queue_id):
if not self._is_skip(x['context'], x['source'])]:
source_result = self._source_result(result, context['input'])
if source_result:
merged_results.append(source_result)
merged_results.append({
'input': source_result['input'],
'complete_position': source_result['complete_position'],
'mark': result['source'].mark,
'filetypes': result['source'].filetypes,
'candidates': result['candidates'],
})

is_async = len([x for x in results if x['context']['is_async']]) > 0

Expand Down Expand Up @@ -199,6 +205,9 @@ def _gather_results(self, context):
'is_async': ctx['is_async'],
'prev_linenr': ctx['position'][1],
'prev_input': ctx['input'],
'input': ctx['input'],
'complete_position': ctx['complete_position'],
'candidates': ctx['candidates'],
}
self._prev_results[source.name] = result
results.append(result)
Expand Down Expand Up @@ -268,8 +277,8 @@ def _source_result(self, result, context_input):
if result['is_async']:
self._gather_async_results(result, source)

if not result['context']['candidates']:
return []
if not result['candidates']:
return None

# Source context
ctx = copy.deepcopy(result['context'])
Expand All @@ -296,9 +305,8 @@ def _source_result(self, result, context_input):
if hasattr(source, 'on_post_filter'):
ctx['candidates'] = source.on_post_filter(ctx)

if ctx['candidates']:
return [ctx['candidates'], result]
return []
result['candidates'] = ctx['candidates']
return result if result['candidates'] else None

def _itersource(self, context):
sources = sorted(self._sources.items(),
Expand Down
19 changes: 10 additions & 9 deletions rplugin/python3/deoplete/deoplete.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

from deoplete import logger
from deoplete.parent import Parent
from deoplete.util import (error, error_tb, find_rplugins)
from deoplete.util import (error_tb, find_rplugins)
# from deoplete.util import error


class Deoplete(logger.LoggingMixin):
Expand Down Expand Up @@ -92,26 +93,26 @@ def merge_results(self, context):
if not merged_results:
return (is_async, -1, [])

complete_position = min([x[1]['context']['complete_position']
complete_position = min([x['complete_position']
for x in merged_results])

all_candidates = []
for [candidates, result] in merged_results:
ctx = result['context']
source = result['source']
prefix = ctx['input'][complete_position:ctx['complete_position']]
for result in merged_results:
candidates = result['candidates']
prefix = result['input'][
complete_position:result['complete_position']]

mark = source.mark + ' '
mark = result['mark'] + ' '
for candidate in candidates:
# Add prefix
candidate['word'] = prefix + candidate['word']

# Set default menu and icase
candidate['icase'] = 1
if (source.mark != '' and
if (mark != ' ' and
candidate.get('menu', '').find(mark) != 0):
candidate['menu'] = mark + candidate.get('menu', '')
if source.filetypes:
if result['filetypes']:
candidate['dup'] = 1

all_candidates += candidates
Expand Down
12 changes: 7 additions & 5 deletions rplugin/python3/deoplete/parent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from deoplete import logger
from deoplete.process import Process
from deoplete.util import error
# from deoplete.util import error


class Parent(logger.LoggingMixin):
Expand Down Expand Up @@ -45,10 +45,14 @@ def merge_results(self, context):
if not queue_id:
return (False, [])

time.sleep(0.5)
time.sleep(1.0)

results = self._get(queue_id)
return results if results else (False, [])
if not results:
return (False, [])
self._vim.vars['deoplete#_child_out'] = {}
return (results['is_async'],
results['merged_results']) if results else (False, [])

def on_event(self, context):
if context['event'] == 'VimLeavePre':
Expand All @@ -60,8 +64,6 @@ def _start_process(self, context, serveraddr):
self._proc = Process(
[context['python3'], context['dp_main'], serveraddr],
context, context['cwd'])
time.sleep(1)
error(self._vim, self._proc.communicate(100))

def _stop_process(self):
if self._proc:
Expand Down
44 changes: 2 additions & 42 deletions rplugin/python3/deoplete/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@
# ============================================================================

import subprocess
from threading import Thread
from queue import Queue
from time import time
import os


Expand All @@ -19,15 +16,12 @@ def __init__(self, commands, context, cwd):
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
self.__proc = subprocess.Popen(commands,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
startupinfo=startupinfo,
cwd=cwd)
self.__eof = False
self.__context = context
self.__queue_out = Queue()
self.__thread = Thread(target=self.enqueue_output)
self.__thread.start()

def eof(self):
return self.__eof
Expand All @@ -38,42 +32,8 @@ def kill(self):
self.__proc.kill()
self.__proc.wait()
self.__proc = None
self.__queue_out = None
self.__thread.join(1.0)
self.__thread = None

def write(self, text):
self.__proc.stdin.write(text.encode(
self.__context['encoding'], errors='replace'))
self.__proc.stdin.flush()

def enqueue_output(self):
for line in self.__proc.stdout:
if not self.__queue_out:
return
self.__queue_out.put(
line.decode(self.__context['encoding'],
errors='replace').strip('\r\n'))

def communicate(self, timeout):
if not self.__proc:
return ([], [])

start = time()
outs = []

while not self.__queue_out.empty() and time() < start + timeout:
outs.append(self.__queue_out.get_nowait())

if self.__thread.is_alive() or not self.__queue_out.empty():
return (outs, [])

_, errs = self.__proc.communicate(timeout=timeout)
errs = errs.decode(self.__context['encoding'],
errors='replace').splitlines()
self.__eof = True
self.__proc = None
self.__thread = None
self.__queue = None

return (outs, errs)

0 comments on commit f3663f1

Please sign in to comment.