-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ssh tunnel for postgres normalization #5818
Changes from 25 commits
6d531f9
078be20
b1f4d81
36959b1
c22a678
1c23695
f797107
545be65
22c903e
91db2cd
8aaadaa
04c09fa
f1bfd1b
b5aa5f4
800f16c
c78df0b
b1943d5
8cb4988
df19286
e2f30b8
0755c53
6cd9311
817f1fa
53e92c6
b529041
202af8f
5924283
a697133
48a71b0
3b4723d
90115d4
4ac2f85
508efa0
881afbe
7334102
fdc3dc3
252b996
b64724a
97787c2
f7f4b25
fa7b131
48381ad
4ab8d79
475e00c
8b2b27f
0d92570
1ee9f90
69f6143
b3e0e2e
defc0fc
82de69d
e588198
61452a9
02bf9af
69d065c
47d3f33
d43b500
e63a5a3
5f19695
260a204
323129e
935f1af
ea1f7f8
f1cc6cb
4ca4696
728c3e5
e342018
5e341bc
5fd3fc0
3337c48
67815d3
442c707
82a4de4
d769abc
1a98048
8be8147
3cd895e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
/* | ||
* MIT License | ||
* | ||
* Copyright (c) 2020 Airbyte | ||
* | ||
* Permission is hereby granted, free of charge, to any person obtaining a copy | ||
* of this software and associated documentation files (the "Software"), to deal | ||
* in the Software without restriction, including without limitation the rights | ||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
* copies of the Software, and to permit persons to whom the Software is | ||
* furnished to do so, subject to the following conditions: | ||
* | ||
* The above copyright notice and this permission notice shall be included in all | ||
* copies or substantial portions of the Software. | ||
* | ||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
* SOFTWARE. | ||
*/ | ||
|
||
package io.airbyte.integrations.base.ssh; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.fasterxml.jackson.databind.node.ObjectNode; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.commons.resources.MoreResources; | ||
import io.airbyte.integrations.base.AirbyteMessageConsumer; | ||
import io.airbyte.integrations.base.Destination; | ||
import io.airbyte.protocol.models.AirbyteConnectionStatus; | ||
import io.airbyte.protocol.models.AirbyteMessage; | ||
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; | ||
import io.airbyte.protocol.models.ConnectorSpecification; | ||
import java.util.List; | ||
import java.util.function.Consumer; | ||
|
||
/** | ||
* Decorates a Destination with an SSH Tunnel using the standard configuration that Airbyte uses for | ||
* configuring SSH. | ||
*/ | ||
public class SshWrappedDestination implements Destination { | ||
|
||
private final Destination delegate; | ||
private final List<String> hostKey; | ||
private final List<String> portKey; | ||
|
||
public SshWrappedDestination(final Destination delegate, | ||
final List<String> hostKey, | ||
final List<String> portKey) { | ||
this.delegate = delegate; | ||
this.hostKey = hostKey; | ||
this.portKey = portKey; | ||
} | ||
|
||
@Override | ||
public ConnectorSpecification spec() throws Exception { | ||
// inject the standard ssh configuration into the spec. | ||
final ConnectorSpecification originalSpec = delegate.spec(); | ||
final ObjectNode propNode = (ObjectNode) originalSpec.getConnectionSpecification().get("properties"); | ||
propNode.set("tunnel_method", Jsons.deserialize(MoreResources.readResource("ssh-tunnel-spec.json"))); | ||
return originalSpec; | ||
} | ||
|
||
@Override | ||
public AirbyteConnectionStatus check(final JsonNode config) throws Exception { | ||
return SshTunnel.sshWrap(config, hostKey, portKey, delegate::check); | ||
} | ||
|
||
@Override | ||
public AirbyteMessageConsumer getConsumer(final JsonNode config, | ||
final ConfiguredAirbyteCatalog catalog, | ||
final Consumer<AirbyteMessage> outputRecordCollector) | ||
throws Exception { | ||
final SshTunnel tunnel = SshTunnel.getInstance(config, hostKey, portKey); | ||
return AirbyteMessageConsumer.appendOnClose(delegate.getConsumer(tunnel.getConfigInTunnel(), catalog, outputRecordCollector), tunnel::close); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
* | ||
!Dockerfile | ||
!entrypoint.sh | ||
!ssh/sshtunneling.sh | ||
!setup.py | ||
!normalization | ||
!dbt-project-template |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ | |
import json | ||
import os | ||
import pkgutil | ||
import socket | ||
from enum import Enum | ||
from typing import Any, Dict | ||
|
||
|
@@ -48,6 +49,8 @@ def run(self, args): | |
integration_type = inputs["integration_type"] | ||
transformed_config = self.transform(integration_type, original_config) | ||
self.write_yaml_config(inputs["output_path"], transformed_config) | ||
if self.is_ssh_tunnelling(original_config): | ||
self.write_ssh_port(inputs["output_path"], self.pick_a_port()) | ||
|
||
@staticmethod | ||
def parse(args): | ||
|
@@ -86,6 +89,43 @@ def transform(self, integration_type: DestinationType, config: Dict[str, Any]): | |
|
||
return base_profile | ||
|
||
@staticmethod | ||
def is_ssh_tunnelling(config: Dict[str, Any]) -> bool: | ||
tunnel_methods = ["SSH_KEY_AUTH", "SSH_PASSWORD_AUTH"] | ||
if ( | ||
"tunnel_method" in config.keys() and | ||
"tunnel_method" in config["tunnel_method"] and | ||
config["tunnel_method"]["tunnel_method"].upper() in tunnel_methods | ||
): | ||
return True | ||
else: | ||
return False | ||
|
||
@staticmethod | ||
def is_port_free(port: int) -> bool: | ||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: | ||
try: | ||
s.bind(("localhost", port)) | ||
except Exception as e: | ||
print(f"port {port} unsuitable: {e}") | ||
return False | ||
else: | ||
print(f"port {port} is free") | ||
return True | ||
|
||
@staticmethod | ||
def pick_a_port() -> int: | ||
""" | ||
This function finds a free port, starting with 50001 and adding 1 until we find an open port. | ||
""" | ||
port_to_check = 50001 # just past start of dynamic port range (49152:65535) | ||
while not TransformConfig.is_port_free(port_to_check): | ||
port_to_check += 1 | ||
# error if we somehow hit end of port range | ||
if port_to_check > 65535: | ||
raise RuntimeError("Couldn't find a free port to use.") | ||
return port_to_check | ||
|
||
@staticmethod | ||
def transform_bigquery(config: Dict[str, Any]): | ||
print("transform_bigquery") | ||
|
@@ -108,13 +148,22 @@ def transform_bigquery(config: Dict[str, Any]): | |
@staticmethod | ||
def transform_postgres(config: Dict[str, Any]): | ||
print("transform_postgres") | ||
|
||
# set port & host correctly depending on whether we're ssh tunnelling | ||
# TODO: this should be a separate function to stay DRY when adding support for other destinations | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do you mind doing the separate function now? I know this feels silly, but I'd like for to establish the pattern we prefer now, so that there is no room for confusion when we ask GL to work on it. |
||
port = config["port"] | ||
host = config["host"] | ||
if TransformConfig.is_ssh_tunnelling(config): | ||
port = TransformConfig.pick_a_port() | ||
host = "localhost" | ||
|
||
# https://docs.getdbt.com/reference/warehouse-profiles/postgres-profile | ||
dbt_config = { | ||
"type": "postgres", | ||
"host": config["host"], | ||
"host": host, | ||
"user": config["username"], | ||
"pass": config.get("password", ""), | ||
"port": config["port"], | ||
"port": port, | ||
"dbname": config["database"], | ||
"schema": config["schema"], | ||
"threads": 32, | ||
|
@@ -191,6 +240,19 @@ def write_yaml_config(output_path: str, config: Dict[str, Any]): | |
with open(os.path.join(output_path, "profiles.yml"), "w") as fh: | ||
fh.write(yaml.dump(config)) | ||
|
||
@staticmethod | ||
def write_ssh_port(output_path: str, port: int): | ||
""" | ||
This function writes a small json file with content like {"port":xyz} | ||
This is being used only when ssh tunneling. | ||
We do this because we need to decide on and save this port number into our dbt config | ||
and then use that same port in sshtunneling.sh when opening the tunnel. | ||
""" | ||
if not os.path.exists(output_path): | ||
os.makedirs(output_path) | ||
with open(os.path.join(output_path, "localsshport.json"), "w") as fh: | ||
json.dump({"port": port}, fh) | ||
|
||
|
||
def main(args=None): | ||
TransformConfig().run(args) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
# This function opens an ssh tunnel if required using values provided in config. | ||
# Requires two arguments, | ||
# path to config file ($1) | ||
# path to file containing local port to use ($2) | ||
function openssh() { | ||
# check if jq is missing, and if so try to install it.. | ||
# this is janky but for custom dbt transform we can't be sure jq is installed as using user docker image | ||
if ! command -v jq &> /dev/null ; then | ||
echo "CRITICAL: jq not installed... attempting to install on the fly but will fail if unable." | ||
{ apt-get update && apt-get install -y jq; } || | ||
apk --update add jq || | ||
{ yum install epel-release -y && yum install jq -y; } || | ||
{ dnf install epel-release -y && dnf install jq -y; } || exit 1 | ||
fi | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't like this at all but custom dbt transforms use a user-provided docker image so we can't guarantee libs installed. The install attempts are foremost to pass tests and secondly to try and make things smoother for user. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fair. i might suggest always doing |
||
tunnel_method=$(cat $1 | jq -r '.tunnel_method.tunnel_method' | tr '[:lower:]' '[:upper:]') | ||
tunnel_username=$(cat $1 | jq -r '.tunnel_method.tunnel_user') | ||
tunnel_db_host=$(cat $1 | jq -r '.host') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will these also need to change with different config schemas? i think we are kinda lucky right now that all dbs just use host and port (i think?). if we do have a database for which that is not the case though we should at least have a comment in here explaining what is going on. |
||
tunnel_db_port=$(cat $1 | jq -r '.port') | ||
tunnel_host=$(cat $1 | jq -r '.tunnel_method.tunnel_host') | ||
tunnel_local_port=$(cat $2 | jq -r '.port') | ||
# set a path for a control socket, allowing us to close this specific ssh connection when desired | ||
tmpcontrolsocket="/tmp/sshsocket${tunnel_db_remote_port}-${RANDOM}" | ||
if [[ ${tunnel_method} = "SSH_KEY_AUTH" ]] ; then | ||
echo "Detected tunnel method SSH_KEY_AUTH for normalization" | ||
# create a temporary file to hold ssh key and trap to delete on EXIT | ||
trap 'rm -f "$tmpkeyfile"' EXIT | ||
tmpkeyfile=$(mktemp /tmp/xyzfile.XXXXXXXXXXX) || exit 1 | ||
echo "$(cat $1 | jq -r '.tunnel_method.ssh_key')" > $tmpkeyfile | ||
# -f=background -N=no remote command -M=master mode StrictHostKeyChecking=no auto-adds host | ||
echo "Running: ssh -f -N -M -o StrictHostKeyChecking=no -S {control socket} -i {key file} -l ${tunnel_username} -L ${tunnel_local_port}:${tunnel_db_host}:${tunnel_db_port} ${tunnel_host}" | ||
ssh -f -N -M -o StrictHostKeyChecking=no -S $tmpcontrolsocket -i $tmpkeyfile -l ${tunnel_username} -L ${tunnel_local_port}:${tunnel_db_host}:${tunnel_db_port} ${tunnel_host} && | ||
sshopen="yes" && | ||
echo "ssh tunnel opened" | ||
rm -f $tmpkeyfile | ||
elif [[ ${tunnel_method} = "SSH_PASSWORD_AUTH" ]] ; then | ||
echo "Detected tunnel method SSH_PASSWORD_AUTH for normalization" | ||
if ! command -v sshpass &> /dev/null ; then | ||
echo "CRITICAL: sshpass not installed... attempting to install on the fly but will fail if unable." | ||
{ apt-get update && apt-get install -y sshpass; } || | ||
{ apk add --update openssh && apk --update add sshpass; } || | ||
{ yum install epel-release -y && yum install sshpass -y; } || | ||
{ dnf install epel-release -y && dnf install sshpass -y; } || exit 1 | ||
fi | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. see comment above, same applies here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as above. i generally think we should make it clear to the user what is wrong, but solving their dependencies for them is a little too far. there's so many odd ways this could not work. fine with keeping it though, since it does at least mention the key piece of info to fix it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will leave this in, I agree solving deps for user is overkill but this also solves deps for one or two of destination integration tests without needing to override those tests |
||
# put ssh password in env var for use in sshpass. Better than directly passing with -p | ||
export SSHPASS=$(cat $1 | jq -r '.tunnel_method.tunnel_user_password') | ||
echo "Running: sshpass -e ssh -f -N -M -o StrictHostKeyChecking=no -S {control socket} -l ${tunnel_username} -L ${tunnel_local_port}:${tunnel_db_host}:${tunnel_db_port} ${tunnel_host}" | ||
sshpass -e ssh -f -N -M -o StrictHostKeyChecking=no -S $tmpcontrolsocket -l ${tunnel_username} -L ${tunnel_local_port}:${tunnel_db_host}:${tunnel_db_port} ${tunnel_host} && | ||
sshopen="yes" && | ||
echo "ssh tunnel opened" | ||
fi | ||
} | ||
|
||
# This function checks if $sshopen variable has been set and if so, closes the ssh open via $tmpcontrolsocket | ||
# This only works after calling openssh() | ||
function closessh() { | ||
# $sshopen $tmpcontrolsocket comes from openssh() function | ||
if [ ! -z "$sshopen" ] ; then | ||
ssh -S $tmpcontrolsocket -O exit ${tunnel_host} && | ||
echo "closed ssh tunnel" | ||
trap 'rm -f "$tmpcontrolsocket"' EXIT | ||
fi | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a way we can get the operating system to do this for us? Looking at this SO post, I think we can. Probably not a big deal, but ultimately if we can just have the OS worry about this that might be nice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree that this is preferable, but going to skip doing this, for following reasons: