Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for replacing with regex group #63

Merged
merged 2 commits into from
Apr 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions jupyterlab_search_replace/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ async def get(self, path: str = ""):
def post(self, path: str = ""):
"""POST request handler to perform a replace action."""
json_body = self.get_json_body()
results = json_body["results"]
query = json_body["query"]
self._engine.replace(results, path, query)
matches = json_body["matches"]
self._engine.replace(matches, path)

self.set_status(201)

Expand Down
91 changes: 49 additions & 42 deletions jupyterlab_search_replace/search_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import os

from functools import partial
from pathlib import Path
from subprocess import Popen, PIPE
from typing import ClassVar, Iterable, List, Optional, Tuple

Expand Down Expand Up @@ -78,7 +79,7 @@ def __init__(self, root_dir: str) -> None:
Args:
root_dir (str): Server root path
"""
self._root_dir = os.path.expanduser(root_dir)
self._root_dir = Path(os.path.expanduser(root_dir)).resolve()

async def _execute(
self, cmd: List[str], cwd: Optional[str] = None
Expand Down Expand Up @@ -112,14 +113,19 @@ async def _execute(
if returncode == 0:
output = output.decode("utf-8")
else:
self.log.debug("exit code: {!s}".format(returncode))
self.log.debug("error: {!s}".format(error.decode("utf-8")))
output = output.decode("utf-8")

self.log.debug("output: {!s}".format(output[:MAX_LOG_OUTPUT]))
self.log.debug(f"exit code: {returncode!s}")
error_msg = error.decode("utf-8")
if returncode == 1 and output:
# This is the case for no match found
self.log.debug(f"error: {error_msg}")
output = output.decode("utf-8")
else:
output = error_msg

if len(output) > MAX_LOG_OUTPUT:
self.log.debug("...")
self.log.debug(
f"output: {output[:MAX_LOG_OUTPUT]}"
+ ("..." if len(output) > MAX_LOG_OUTPUT else "")
)

return returncode, output

Expand Down Expand Up @@ -186,6 +192,9 @@ async def search(
"match": match.get("match", {}).get("text"),
"start": match.get("start"),
"end": match.get("end"),
# TODO Provision the ability to get the replacement string from ripgrep
# See https://github.com/BurntSushi/ripgrep/issues/1872
"replace": None,
}
# Compute positions for utf-8 string
positions = get_utf8_positions(
Expand Down Expand Up @@ -228,55 +237,53 @@ def group_matches_by_line(self, line_matches: List[dict]) -> dict:
Args:
line_matches: The matches to group by
Returns:
The mapping line/matches positions ``{line_number: List[Tuple[start, end]]}``
The mapping line/matches positions ``{line_number: List[Tuple[start, end, replace_bytes]]}``
"""
d = {}
for each_match in line_matches:
if each_match["line_number"] not in d:
d[each_match["line_number"]] = [
(each_match["start"], each_match["end"])
for match in line_matches:
if match["line_number"] not in d:
d[match["line_number"]] = [
(match["start"], match["end"], match["replace"].encode("utf-8"))
]
else:
d[each_match["line_number"]].append(
(each_match["start"], each_match["end"])
d[match["line_number"]].append(
(match["start"], match["end"], match["replace"].encode("utf-8"))
)
for each_line_number in d:
d[each_line_number] = sorted(d[each_line_number], key=lambda tup: tup[0])
for line, matches in d.items():
d[line] = sorted(matches, key=lambda tup: tup[0])
return d

def replace(self, matches: List, path: str, replace: str) -> None:
"""Replace the ``matches`` within ``path`` by ``replace``.
def replace(self, matches: List, path: str) -> None:
"""Replace the ``matches`` within ``path``.

A match is described by a dictionary: {"line_number", "start", "end", "replace"}
where ``line_number`` is base 1, ``start`` and ``end`` are bytes positions
in the line and ``replace`` is UTF-8 string to use as replacement.

Args:
matches: The search matches to replace
path: The root folder in which to apply the replace
replace: The replace text to use
"""
replace = bytes(replace, "utf-8")
for each_result in matches:
file_path = each_result["path"]
line_matches = each_result["matches"]
for file_match in matches:
file_relative_path = file_match["path"]
line_matches = file_match["matches"]

file_path = os.path.join(self._root_dir, url2path(path), file_path)
file_path: Path = self._root_dir / url2path(path) / file_relative_path
grouped_line_matches = self.group_matches_by_line(line_matches)

with open(file_path, "rb") as fp:
with file_path.open("rb") as fp:
data = fp.readlines()
for line_number, offsets in grouped_line_matches.items():
original_line = data[line_number - 1]
replaced_line = b""
start = 0
end = offsets[0][0]
for line_number, matches in grouped_line_matches.items():
original_line = data[line_number - 1]
replaced_line = b""
for i, match in enumerate(matches):
start = 0 if i == 0 else matches[i - 1][1]
end = match[0]
replace = match[2]
replaced_line += original_line[start:end] + replace
for i in range(len(offsets)):
if i + 1 < len(offsets):
end = offsets[i + 1][0]
start = offsets[i][1]
if start < end:
replaced_line += original_line[start:end] + replace
else:
replaced_line += original_line[start:]
data[line_number - 1] = replaced_line

with open(file_path, "wb") as fp:

start = matches[-1][1]
data[line_number - 1] = replaced_line + original_line[start:]

with file_path.open("wb") as fp:
fp.writelines(data)
48 changes: 39 additions & 9 deletions jupyterlab_search_replace/tests/schema.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"$schema": "http://json-schema.org/draft-07/schema",
"title": "Model",
"type": "object",
"properties": {
Expand Down Expand Up @@ -27,30 +28,58 @@
"type": "string"
},
"start": {
"title": "Start",
"type": "integer"
"title": "Match start position in binary format",
"type": "integer",
"minimum": 0
},
"end": {
"title": "End",
"type": "integer"
"title": "Match end position in binary format",
"type": "integer",
"minimum": 0
},
"start_utf8": {
"title": "Match start position in UTF-8 format",
"type": "integer",
"minimum": 0
},
"end_utf8": {
"title": "Match end position in UTF-8 format",
"type": "integer",
"minimum": 0
},
"line_number": {
"title": "Line Number",
"type": "integer"
"type": "integer",
"minimum": 1
},
"absolute_offset": {
"title": "Absolute Offset",
"type": "integer"
},
"replace": {
"title": "Replacement string for the match",
"oneOf": [
{
"type": "null"
},
{
"type": "string"
}
]
}
},
"required": [
"line",
"match",
"start",
"start_utf8",
"end",
"end_utf8",
"line_number",
"absolute_offset"
]
"absolute_offset",
"replace"
],
"additionalProperties": false
},
"fileMatches": {
"title": "fileMatches",
Expand All @@ -71,7 +100,8 @@
"required": [
"path",
"matches"
]
],
"additionalProperties": false
}
}
}
}
Loading