diff --git a/api/graphql/schema.graphql b/api/graphql/schema.graphql index 68efb9bd5c..5f0de3311f 100644 --- a/api/graphql/schema.graphql +++ b/api/graphql/schema.graphql @@ -6,6 +6,7 @@ scalar AWSJSON schema { query: Query mutation: Mutation + subscription: Subscription } type Mutation { @@ -21,6 +22,7 @@ type Mutation { deleteRule(input: DeleteRuleInput!): Boolean deleteUser(id: ID!): Boolean inviteUser(input: InviteUserInput): User! + queryDone(input: QueryDoneInput!): QueryDone! @aws_iam remediateResource(input: RemediateResourceInput!): Boolean resetUserPassword(id: ID!): User! suppressPolicies(input: SuppressPoliciesInput!): Boolean @@ -62,6 +64,10 @@ type Query { users: [User!]! } +type Subscription { + queryDone(userData: String!): QueryDone @aws_subscribe(mutations: ["queryDone"]) @aws_iam +} + input GetComplianceIntegrationTemplateInput { awsAccountId: String! integrationLabel: String! @@ -713,6 +719,18 @@ input InviteUserInput { email: AWSEmail } +input QueryDoneInput { + userData: String! + queryId: String! + workflowId: String! +} + +type QueryDone @aws_iam { + userData: String! + queryId: String! + workflowId: String! +} + enum ComplianceStatusEnum { ERROR FAIL diff --git a/api/lambda/database/models/api.go b/api/lambda/database/models/api.go new file mode 100644 index 0000000000..133a336854 --- /dev/null +++ b/api/lambda/database/models/api.go @@ -0,0 +1,254 @@ +package models + +/** + * Panther is a Cloud-Native SIEM for the Modern Security Team. + * Copyright (C) 2020 Panther Labs Inc + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
// NOTE: different kinds of databases (e.g., Athena, Snowflake) will use different endpoints (lambda functions), same api.

// NOTE: if a json tag _set_ is used more than once it is factored into a struct to avoid inconsistencies

// Values carried in QueryStatus.Status. The `oneof` validation on that field must list exactly these.
const (
	QuerySucceeded = "succeeded"
	QueryFailed    = "failed"
	QueryRunning   = "running"
	QueryCancelled = "cancelled"
)

// LambdaInput is the collection of all possible args to the Lambda function.
type LambdaInput struct {
	// run a query, returning immediately with an id for the running query
	ExecuteAsyncQuery *ExecuteAsyncQueryInput `json:"executeAsyncQuery"`
	// run a query, returning immediately with an id for the step function running the query (will invoke lambda callback when done)
	ExecuteAsyncQueryNotify *ExecuteAsyncQueryNotifyInput `json:"executeAsyncQueryNotify"`
	// run a query, waiting for results
	ExecuteQuery *ExecuteQueryInput `json:"executeQuery"`
	// list databases
	GetDatabases *GetDatabasesInput `json:"getDatabases"`
	// given a query id, return paged results
	GetQueryResults *GetQueryResultsInput `json:"getQueryResults"`
	// given a query id, return a presigned s3 link to the results
	GetQueryResultsLink *GetQueryResultsLinkInput `json:"getQueryResultsLink"`
	// given a query id, return the status of the query
	GetQueryStatus *GetQueryStatusInput `json:"getQueryStatus"`
	// given a database, list tables
	GetTables *GetTablesInput `json:"getTables"`
	// given a database and list of tables, return tables
	GetTablesDetail *GetTablesDetailInput `json:"getTablesDetail"`
	// given a lambda and method, execute callback for step function
	InvokeNotifyLambda *InvokeNotifyLambdaInput `json:"invokeNotifyLambda"`
	// used as a callback lambda, will notify appsync that a UI query is complete
	NotifyAppSync *NotifyAppSyncInput `json:"notifyAppSync"`
	// given a query id, cancel query
	StopQuery *StopQueryInput `json:"stopQuery"`
}

// GetDatabasesInput selects which databases to list.
type GetDatabasesInput struct {
	OptionalName // if nil get all databases
}

// GetDatabasesOutput is the list of databases.
// NOTE: we will assume this is small and not paginate
type GetDatabasesOutput struct {
	Databases []*NameAndDescription `json:"databases,omitempty"`
}

// GetTablesInput selects the database whose tables are listed.
type GetTablesInput struct {
	Database
	IncludePopulatedTablesOnly *bool `json:"includePopulatedTablesOnly,omitempty"` // if true OR nil, return only tables that have data
}

// GetTablesOutput is the list of tables in a database.
// NOTE: we will assume this is small and not paginate
type GetTablesOutput struct {
	TablesDetail
}

// TablesDetail is the shared "tables" json payload.
type TablesDetail struct {
	Tables []*TableDetail `json:"tables"`
}

// TableDetail is a table description together with its columns.
type TableDetail struct {
	TableDescription
	Columns []*TableColumn `json:"columns"`
}

// TableDescription identifies a table by database and name, with an optional description.
type TableDescription struct {
	Database
	NameAndDescription
}

// GetTablesDetailInput selects specific tables of a database by name.
type GetTablesDetailInput struct {
	Database
	Names []string `json:"names" validate:"required"`
}

// GetTablesDetailOutput is the detail for the requested tables.
// NOTE: we will assume this is small and not paginate
type GetTablesDetailOutput struct {
	TablesDetail
}

// TableColumn is a named, typed column of a table.
type TableColumn struct {
	NameAndDescription
	Type string `json:"type" validate:"required"`
}

// ExecuteAsyncQueryNotifyInput is a query plus the lambda/method to call back when it is done.
type ExecuteAsyncQueryNotifyInput struct {
	ExecuteAsyncQueryInput
	LambdaInvoke
	UserPassThruData
	DelaySeconds int `json:"delaySeconds"` // wait this long before starting workflow (default 0)
}

// ExecuteAsyncQueryNotifyOutput identifies the workflow running the query.
type ExecuteAsyncQueryNotifyOutput struct {
	Workflow
}

// ExecuteQueryInput is the input of the blocking query; same shape as the async input.
type ExecuteQueryInput = ExecuteAsyncQueryInput

// ExecuteQueryOutput is the first page of results.
type ExecuteQueryOutput = GetQueryResultsOutput // call GetQueryResults() to page through results

// ExecuteAsyncQueryInput is the database, the SQL and (optionally) the user running it.
type ExecuteAsyncQueryInput struct {
	Database
	SQLQuery
	UserID *string `json:"userId,omitempty"`
}

// ExecuteAsyncQueryOutput is the initial status and the id of the started query.
type ExecuteAsyncQueryOutput struct {
	QueryStatus
	QueryInfo
}

// GetQueryStatusInput identifies the query by id.
type GetQueryStatusInput = QueryInfo

// GetQueryStatusOutput is the status of a query along with its SQL.
type GetQueryStatusOutput struct {
	QueryStatus
	SQLQuery
	Stats *QueryResultsStats `json:"stats,omitempty"` // present only on successful queries
}

// GetQueryResultsInput identifies the query and the page of results to return.
type GetQueryResultsInput struct {
	QueryInfo
	Pagination
	PageSize *int64 `json:"pageSize" validate:"omitempty,gt=1,lt=1000"` // only return this many rows per call
	// NOTE: gt=1 above to ensure there are results on the first page w/header. If PageSize = 1 then
	// user will get no rows for the first page with Athena because Athena returns header as first row and we remove it.
}

// GetQueryResultsOutput is the query status plus one page of results.
type GetQueryResultsOutput struct {
	GetQueryStatusOutput
	ColumnInfo  []*Column        `json:"columnInfo" validate:"required"`
	ResultsPage QueryResultsPage `json:"resultsPage" validate:"required"`
}

// QueryResultsPage is one page of rows and the token for the next page.
type QueryResultsPage struct {
	Pagination
	NumRows int    `json:"numRows"  validate:"required"` // number of rows in page of results, len(Rows)
	Rows    []*Row `json:"rows"  validate:"required"`
}

// QueryResultsStats are the execution statistics of a query.
type QueryResultsStats struct {
	ExecutionTimeMilliseconds int64 `json:"executionTimeMilliseconds"  validate:"required"`
	DataScannedBytes          int64 `json:"dataScannedBytes"  validate:"required"`
}

// GetQueryResultsLinkInput identifies the query by id.
type GetQueryResultsLinkInput struct {
	QueryInfo
}

// GetQueryResultsLinkOutput is the query status and a link to its results.
type GetQueryResultsLinkOutput struct {
	QueryStatus
	PresignedLink string `json:"presignedLink"` // presigned s3 link to results
}

// StopQueryInput identifies the query to cancel by id.
type StopQueryInput = QueryInfo

// StopQueryOutput has the same shape as the query status output.
type StopQueryOutput = GetQueryStatusOutput

// InvokeNotifyLambdaInput is what the workflow passes to trigger the callback lambda.
type InvokeNotifyLambdaInput struct {
	LambdaInvoke
	QueryInfo
	Workflow
	UserPassThruData
}

// InvokeNotifyLambdaOutput echoes the input.
type InvokeNotifyLambdaOutput = InvokeNotifyLambdaInput // so input can be confirmed

// NotifyAppSyncInput is the input of the AppSync notification callback.
type NotifyAppSyncInput struct {
	NotifyInput
}

// NotifyAppSyncOutput reports the outcome of the AppSync POST.
type NotifyAppSyncOutput struct {
	StatusCode int `json:"statusCode" validate:"required"` // the http status returned from POSTing callback to appsync
}

// NotifyInput is the common input of notification callbacks.
type NotifyInput struct { // notify lambdas need to have this as input
	GetQueryStatusInput
	ExecuteAsyncQueryNotifyOutput
	UserPassThruData
}

// NameAndDescription is the shared "name"/"description" json tag set.
type NameAndDescription struct {
	Name        string  `json:"name" validate:"required"`
	Description *string `json:"description,omitempty"`
}

// OptionalName is a "name" that may be omitted.
type OptionalName struct {
	Name *string `json:"name,omitempty"`
}

// SQLQuery is the shared "sql" json tag.
type SQLQuery struct {
	SQL string `json:"sql" validate:"required"`
}

// QueryInfo is the shared "queryId" json tag.
type QueryInfo struct {
	QueryID string `json:"queryId" validate:"required"`
}

// Database is the shared "databaseName" json tag.
type Database struct {
	DatabaseName string `json:"databaseName" validate:"required"`
}

// Row is one row of query results.
type Row struct {
	Columns []*Column `json:"columns" validate:"required"`
}

// Column is a single value in a row, or a column's type in the column info.
type Column struct {
	Value *string `json:"value"` // NULL values are nil
	Type  *string `json:"type,omitempty"`
}

// Pagination is the shared "paginationToken" json tag.
type Pagination struct {
	PaginationToken *string `json:"paginationToken,omitempty"`
}

// QueryStatus is the state of a query and, when set, the SQL error message.
type QueryStatus struct {
	// `oneof` takes whitespace-separated values (a comma would start a new validation tag);
	// the values are the Query* constants declared at the top of this file.
	Status   string `json:"status" validate:"required,oneof=running succeeded failed cancelled"`
	SQLError string `json:"sqlError,omitempty"`
}

// Workflow is the shared "workflowId" json tag.
type Workflow struct {
	WorkflowID string `json:"workflowId" validate:"required"`
}

// UserPassThruData is opaque caller data echoed back in notifications.
type UserPassThruData struct {
	UserData string `json:"userData" validate:"required,gt=0"` // token passed through to notifications (usually the userid)
}

// LambdaInvoke names the lambda and method used as the completion callback.
type LambdaInvoke struct {
	LambdaName string `json:"lambdaName" validate:"required"` // the name of the lambda to call when done
	MethodName string `json:"methodName" validate:"required"` // the method to call on the lambda
}
noop that returns {userData,queryId,workflowId} (used for subscriptions to trigger async query completion) + DataSourceName: !GetAtt QueryDoneDataSource.Name + RequestMappingTemplate: | + { + "version" : "2017-02-28", + "payload": $utils.toJson($context.arguments.input) + } + ResponseMappingTemplate: | + $util.toJson($context.result) diff --git a/deployments/bootstrap.yml b/deployments/bootstrap.yml index 4b4dbf5e8a..d4a220d735 100644 --- a/deployments/bootstrap.yml +++ b/deployments/bootstrap.yml @@ -702,6 +702,8 @@ Resources: # * The Panther user interface will show errors. # AuthenticationType: AMAZON_COGNITO_USER_POOLS + AdditionalAuthenticationProviders: + - AuthenticationType: AWS_IAM # this is used for lambda callbacks to AppSync (e.g., to signal Athena query completion) UserPoolConfig: AwsRegion: !Ref AWS::Region UserPoolId: !Ref UserPool diff --git a/deployments/log_analysis.yml b/deployments/log_analysis.yml index 3cf1cca0b1..e7193e0e32 100644 --- a/deployments/log_analysis.yml +++ b/deployments/log_analysis.yml @@ -23,6 +23,15 @@ Parameters: AnalysisApiId: Type: String Description: API Gateway for analysis-api + AthenaResultsBucket: + Type: String + Description: Bucket created to hold Athena results + GraphQLApiEndpoint: + Type: String + Description: The URL to the Appsync GraphQL Endpoint + GraphQLApiId: + Type: String + Description: The id of the Appsync GraphQL Endpoint ProcessedDataBucket: Type: String Description: S3 bucket which stores processed logs @@ -37,6 +46,11 @@ Parameters: Description: KMS key ID for SQS encryption # Passed in from config file + AthenaPantherTablesOnly: + Type: String + Default: true + Description: If true, access only Panther tables in Athena API + AllowedValues: [true, false] CloudWatchLogRetentionDays: Type: Number Description: CloudWatch log retention period @@ -671,3 +685,222 @@ Resources: - Effect: Allow Action: execute-api:Invoke Resource: !Sub 
arn:${AWS::Partition}:execute-api:${AWS::Region}:${AWS::AccountId}:${AnalysisApiId}/v1/GET/enabled + + ##### Athena API ##### + AthenaApiFunction: + Type: AWS::Serverless::Function + Properties: + CodeUri: ../out/bin/internal/core/database_api/athena/driver/main + Description: Executes Athena and Glue queries + Environment: + Variables: + ATHENA_BUCKET: !Ref AthenaResultsBucket + ATHENA_STATEMACHINE_ARN: !Sub arn:${AWS::Partition}:states:${AWS::Region}:${AWS::AccountId}:stateMachine:panther-athena-workflow + DEBUG: !Ref Debug + GRAPHQL_ENDPOINT: !Ref GraphQLApiEndpoint + PANTHER_TABLES_ONLY: !Ref AthenaPantherTablesOnly + FunctionName: panther-athena-api + # + # The `panther-athena-api` lambda is used by AppSync to query Athena and Glue. + # + # Failure Impact + # * Failure of this lambda will stop the Panther UI from doing Athena queries. + # + Handler: main + Layers: !If [AttachLayers, !Ref LayerVersionArns, !Ref 'AWS::NoValue'] + MemorySize: 128 + Runtime: go1.x + Timeout: 900 # we want this as long as possible + Tracing: !If [TracingEnabled, !Ref TracingMode, !Ref 'AWS::NoValue'] + Policies: + - Id: AthenaPermissions + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - athena:StartQueryExecution + - athena:StopQueryExecution + - athena:GetQuery* + Resource: '*' + - Id: GluePermissions # used for all table metadata queries + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - glue:GetDatabase* + - glue:GetTable* + - glue:GetPartition* + Resource: '*' + - Id: S3ResultsPermissions # athena writes results to S3 + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - s3:GetBucketLocation + - s3:List* + - s3:GetObject + - s3:PutObject + Resource: !Sub arn:aws:s3:::${AthenaResultsBucket}* + - Id: PantherS3ReadPermissions # athena reads from S3, restrict to our processed data buckets + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - s3:GetBucketLocation + - s3:List* + - s3:GetObject + Resource: + - !Sub 
arn:${AWS::Partition}:s3:::${ProcessedDataBucket}* + - arn:aws:s3:::panther-athena-api-processeddata-test-* # used for integration tests + - Id: StepsPermissions # start an async workflow that tracks the query execution + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - states:StartExecution + Resource: !Sub arn:${AWS::Partition}:states:${AWS::Region}:${AWS::AccountId}:stateMachine:panther-athena-workflow + - Id: InvokePantherLambdas # allow calling other panther lambdas + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - lambda:InvokeFunction + Resource: # list lambdas that can be called below + - !Sub arn:${AWS::Partition}:lambda:${AWS::Region}:${AWS::AccountId}:function:panther-athena-api + - Id: CallAppSync # allow calling appsync mutation 'queryDone()' for notify support + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - appsync:GraphQL + Resource: !Sub arn:${AWS::Partition}:appsync:${AWS::Region}:${AWS::AccountId}:apis/${GraphQLApiId}/types/Mutation/fields/queryDone + + AthenaApiLogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: /aws/lambda/panther-athena-api + RetentionInDays: !Ref CloudWatchLogRetentionDays + + AthenaStepFunctionRole: + Type: AWS::IAM::Role + Properties: + Description: Role used by Step Functions to run Athena queries + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: sts:AssumeRole + Principal: + Service: !Sub states.${AWS::Region}.amazonaws.com + Policies: + - PolicyName: InvokeAthenaLambda + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: lambda:InvokeFunction + Resource: !GetAtt AthenaApiFunction.Arn + + AthenaQueryStateMachine: + Type: AWS::StepFunctions::StateMachine + Properties: + StateMachineName: panther-athena-workflow + # + # The `panther-athena-workflow` is a Step Functions state machine used execute Athena queries and notify callers when done. 
+ # + # Failure Impact + # * Failure of this state machine will stop the Panther UI from doing Athena queries. + # + DefinitionString: !Sub | + { + "Comment": "Run Athena query. Parameters are: userId, userData, delay, databaseName, sql, lambdaName, methodName", + "StartAt": "delay", + "States": { + "delay": { + "Comment": "Configurable delay to start", + "Type": "Wait", + "SecondsPath": "$.delaySeconds", + "Next": "query" + }, + "query": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "FunctionName": "${AthenaApiFunction.Arn}", + "Payload": { + "executeAsyncQuery": { + "userId.$": "$.userId", + "databaseName.$": "$.databaseName", + "sql.$": "$.sql" + } + } + }, + "ResultPath": "$.queryResults", + "Next": "status" + }, + "running (wait)": { + "Comment": "Pause to allow query to run.", + "Type": "Wait", + "Seconds": 2, + "Next": "status" + }, + "status": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "FunctionName": "${AthenaApiFunction.Arn}", + "Payload": { + "getQueryStatus": { + "queryId.$": "$.queryResults.Payload.queryId" + } + } + }, + "ResultPath": "$.queryStatus", + "Next": "done?" 
+ }, + "done?": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.queryStatus.Payload.status", + "StringEquals": "succeeded", + "Next": "succeeded" + }, + { + "Variable": "$.queryStatus.Payload.status", + "StringEquals": "failed", + "Next": "failed" + } + ], + "Default": "running (wait)" + }, + "succeeded": { + "Type": "Pass", + "Next": "notify" + }, + "failed": { + "Type": "Pass", + "Next": "notify" + }, + "notify": { + "Comment": "Call lambdaName.methodName with userData, queryId and workflowId to notify done", + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "FunctionName": "${AthenaApiFunction.Arn}", + "Payload": { + "invokeNotifyLambda": { + "userData.$": "$.userData", + "queryId.$": "$.queryResults.Payload.queryId", + "workflowId.$": "$$.Execution.Id", + "lambdaName.$": "$.lambdaName", + "methodName.$": "$.methodName" + } + } + }, + "ResultPath": "$.notifyOutput", + "End": true + } + } + } + RoleArn: !GetAtt AthenaStepFunctionRole.Arn diff --git a/deployments/panther_config.yml b/deployments/panther_config.yml index ee2387b64c..b85717d790 100644 --- a/deployments/panther_config.yml +++ b/deployments/panther_config.yml @@ -99,8 +99,11 @@ Setup: # A list of ARNs of Principals that want to subscribe to log data. # These ARNs will be granted read permission on the processed log bucket and # SNS subscribe permission to the panther-processed-data-notifications topic. 
+ # For example: + # PrincipalARNs: + # - arn:aws:iam::123456789012:role/mysystem-access + # - arn:aws:iam::123456789012:user/mysystem-iam-user PrincipalARNs: - - Web: # ARN of an AWS ACM certificate used on the loadbalancer presenting the panther web app diff --git a/docs/gitbook/operations/runbooks.md b/docs/gitbook/operations/runbooks.md index 479d5fa584..2f9c0ce8e0 100644 --- a/docs/gitbook/operations/runbooks.md +++ b/docs/gitbook/operations/runbooks.md @@ -92,9 +92,6 @@ This ddb table holds the policies applied by the `panther-rules-engine` lambda a * Processing of policies could be slowed or stopped if there are errors/throttles. * The Panther user interface could be impacted. -## panther-analysis-api -The `panther-analysis-api` API Gateway calles the `panther-analysis-api` lambda. - ## panther-analysis-api This lambda implements the analysis API which is responsible for policies/rules from being created, updated, and deleted. @@ -102,6 +99,21 @@ This lambda implements the analysis API which is responsible for Failure Impact * Failure of this lambda will prevent policies/rules from being created, updated, deleted. Additionally, policies and rules will stop being evaluated by the policy/rules engines. +## panther-analysis-api +The `panther-analysis-api` API Gateway calles the `panther-analysis-api` lambda. + +## panther-athena-api +The `panther-athena-api` lambda is used by AppSync to query Athena and Glue. + + Failure Impact + * Failure of this lambda will stop the Panther UI from doing Athena queries. + +## panther-athena-workflow +The `panther-athena-workflow` is a Step Functions state machine used execute Athena queries and notify callers when done. + + Failure Impact + * Failure of this state machine will stop the Panther UI from doing Athena queries. + ## panther-auditlog-processing The panther-auditlog-processing topic is used to send s3 notifications to log processing for log sources internal to the Panther account. 
@@ -142,6 +154,9 @@ This ddb table holds policy violation events for associated resources in the `pa Failure Impact * Processing of policies could be slowed or stopped if there are errors/throttles. +## panther-compliance-api +The `panther-compliance-api` API Gateway calls the `panther-compliance-api` lambda. + ## panther-compliance-api This lambda implements the compliance API which is responsible for tracking resource and policy pass/fail states. @@ -150,9 +165,6 @@ This lambda implements the compliance API which is responsible for tracking reso * Alerts for cloud security stop. * Policy failures are no longer be recorded. -## panther-compliance-api -The `panther-compliance-api` API Gateway calls the `panther-compliance-api` lambda. - ## panther-datacatalog-updater This lambda reads events from the `panther-datacatalog-updater-queue` generated by generated by the `panther-rules-engine` and `panther-log-processor` lambda. It creates new partitions to the Glue tables in `panther*` Glue Databases. @@ -284,15 +296,15 @@ This lambda executes the user-defined policies against infrastructure events. ## panther-processed-data-notifications This topic triggers the log analysis flow -## panther-remediation-api -The `panther-remediation-api` API Gateway calls the `panther-remediation-api` lambda. - ## panther-remediation-api The `panther-remediation-api` lambda triggers AWS remediations. Failure Impact * Failure of this lambda will impact performing remediations and infrastructure will remain in violation of policy. +## panther-remediation-api +The `panther-remediation-api` API Gateway calls the `panther-remediation-api` lambda. + ## panther-remediation-processor The `panther-remediation-processor` lambda processes queued remediations in the `panther-remediation-queue` and calls the `panther-aws-remediation` lambda. 
@@ -332,9 +344,6 @@ This table holds descriptions of the AWS resources in all accounts being monitor * Processing of policies could be slowed or stopped if there are errors/throttles. * The Panther user interface could be impacted. -## panther-resources-api -The `panther-resources-api` API Gateway calls the `panther-resources-api` lambda. - ## panther-resources-api The `panther-resources-api` lambda implements the resources API. @@ -342,6 +351,9 @@ The `panther-resources-api` lambda implements the resources API. * Infrastructure scans may be impacted when updating resources. * The Panther user interface for display of resources. +## panther-resources-api +The `panther-resources-api` API Gateway calls the `panther-resources-api` lambda. + ## panther-resources-queue This sqs queue has events from recently changed infrastructure. The lambda `panther-resource-processor` consumes these events to generate alerts. diff --git a/go.mod b/go.mod index 2c2bb7f8ed..a33e7d6c9a 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,8 @@ require ( github.com/stretchr/testify v1.5.1 github.com/tidwall/gjson v1.6.0 go.uber.org/zap v1.14.1 - golang.org/x/tools v0.0.0-20200407144507-5fc56a9a2104 // indirect + golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e // indirect + golang.org/x/tools v0.0.0-20200413161937-250b2131eb8b // indirect gopkg.in/go-playground/assert.v1 v1.2.1 // indirect gopkg.in/go-playground/validator.v9 v9.31.0 gopkg.in/yaml.v2 v2.2.8 diff --git a/go.sum b/go.sum index 734bfe6984..2ad7005f8e 100644 --- a/go.sum +++ b/go.sum @@ -197,14 +197,16 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net 
v0.0.0-20200226121028-0de0cce0169b h1:0mm1VjtFUOIlE1SbDlwjYaDxZVDP2S5ou6y0gSgXHu8= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e h1:3G+cUijn7XD+S4eJFddp53Pv7+slrESplyjG25HgL+k= +golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190321052220-f7bb7a8bee54/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190616124812-15dcb6c0061f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -218,8 +220,8 @@ golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20200407144507-5fc56a9a2104 h1:BgjF1Nn5zNEp8cxfwjYGMLT28bm1GD1Uir2/OnI1Wn4= -golang.org/x/tools v0.0.0-20200407144507-5fc56a9a2104/go.mod 
h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20200413161937-250b2131eb8b h1:FvD0+J5ZtXZrrc2bVxQaUSnJYUhSNlB1P3XHuZohH9I= +golang.org/x/tools v0.0.0-20200413161937-250b2131eb8b/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= diff --git a/internal/core/database_api/athena/driver/api/api.go b/internal/core/database_api/athena/driver/api/api.go new file mode 100644 index 0000000000..a1534ec97c --- /dev/null +++ b/internal/core/database_api/athena/driver/api/api.go @@ -0,0 +1,74 @@ +package api + +/** + * Panther is a Cloud-Native SIEM for the Modern Security Team. + * Copyright (C) 2020 Panther Labs Inc + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ + +import ( + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/athena" + "github.com/aws/aws-sdk-go/service/athena/athenaiface" + "github.com/aws/aws-sdk-go/service/glue" + "github.com/aws/aws-sdk-go/service/glue/glueiface" + "github.com/aws/aws-sdk-go/service/lambda" + "github.com/aws/aws-sdk-go/service/lambda/lambdaiface" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/aws/aws-sdk-go/service/sfn" + "github.com/aws/aws-sdk-go/service/sfn/sfniface" + "github.com/kelseyhightower/envconfig" +) + +var ( + awsSession *session.Session + glueClient glueiface.GlueAPI + athenaClient athenaiface.AthenaAPI + lambdaClient lambdaiface.LambdaAPI + sfnClient sfniface.SFNAPI + s3Client s3iface.S3API + + envConfig EnvConfig + athenaS3ResultsPath *string +) + +type EnvConfig struct { + AthenaStatemachineARN string `required:"true" split_words:"true"` + AthenaBucket string `default:"" split_words:"true"` + GraphqlEndpoint string `required:"true" split_words:"true"` + PantherTablesOnly bool `default:"false" split_words:"true"` // if true, only return tables from Panther databases +} + +func SessionInit() { + awsSession = session.Must(session.NewSession()) + glueClient = glue.New(awsSession) + athenaClient = athena.New(awsSession) + lambdaClient = lambda.New(awsSession) + sfnClient = sfn.New(awsSession) + s3Client = s3.New(awsSession) + + err := envconfig.Process("", &envConfig) + if err != nil { + panic(err) + } + if envConfig.AthenaBucket != "" { + results := "s3://" + envConfig.AthenaBucket + "/athena_api/" + athenaS3ResultsPath = &results + } +} + +// API provides receiver methods for each route handler. 
+type API struct{} diff --git a/internal/core/database_api/athena/driver/api/error.go b/internal/core/database_api/athena/driver/api/error.go new file mode 100644 index 0000000000..80ca42e6b7 --- /dev/null +++ b/internal/core/database_api/athena/driver/api/error.go @@ -0,0 +1,29 @@ +package api + +/** + * Panther is a Cloud-Native SIEM for the Modern Security Team. + * Copyright (C) 2020 Panther Labs Inc + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +import ( + "github.com/panther-labs/panther/pkg/genericapi" +) + +// conform to generic api +func apiError(err error) error { + err = &genericapi.InternalError{Message: err.Error()} + return err +} diff --git a/internal/core/database_api/athena/driver/api/execute_async_query.go b/internal/core/database_api/athena/driver/api/execute_async_query.go new file mode 100644 index 0000000000..e1f5dc109c --- /dev/null +++ b/internal/core/database_api/athena/driver/api/execute_async_query.go @@ -0,0 +1,65 @@ +package api + +/** + * Panther is a Cloud-Native SIEM for the Modern Security Team. + * Copyright (C) 2020 Panther Labs Inc + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +import ( + "github.com/aws/aws-sdk-go/service/athena" + "go.uber.org/zap" + + "github.com/panther-labs/panther/api/lambda/database/models" + "github.com/panther-labs/panther/pkg/awsathena" +) + +func (API) ExecuteAsyncQuery(input *models.ExecuteAsyncQueryInput) (*models.ExecuteAsyncQueryOutput, error) { + var output models.ExecuteAsyncQueryOutput + + var err error + defer func() { + if err != nil { + err = apiError(err) // lambda failed + } + + // allows tracing queries + var userID string + if input.UserID != nil { + userID = *input.UserID + } + zap.L().Info("ExecuteAsyncQuery", + zap.String("userId", userID), + zap.String("queryId", output.QueryID), + zap.Error(err)) + }() + + startOutput, err := awsathena.StartQuery(athenaClient, input.DatabaseName, input.SQL, athenaS3ResultsPath) + if err != nil { + output.Status = models.QueryFailed + + // try to dig out the athena error if there is one + if athenaErr, ok := err.(*athena.InvalidRequestException); ok { + output.SQLError = athenaErr.Message() + return &output, nil // no lambda err + } + + return &output, err + } + + output.Status = models.QueryRunning + output.QueryID = *startOutput.QueryExecutionId + return &output, nil +} diff --git a/internal/core/database_api/athena/driver/api/execute_async_query_notify.go b/internal/core/database_api/athena/driver/api/execute_async_query_notify.go new file mode 100644 index 0000000000..6c0697d280 --- /dev/null +++ b/internal/core/database_api/athena/driver/api/execute_async_query_notify.go @@ -0,0 +1,74 @@ +package api + +/** + * Panther is a Cloud-Native SIEM for the Modern Security Team. 
+ * Copyright (C) 2020 Panther Labs Inc + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/sfn" + "github.com/google/uuid" + jsoniter "github.com/json-iterator/go" + "github.com/pkg/errors" + "go.uber.org/zap" + + "github.com/panther-labs/panther/api/lambda/database/models" +) + +// Execute an Athena query via step function workflow. 
+
+// ExecuteAsyncQueryNotify serialises the input as JSON and starts an execution of the
+// state machine configured in envConfig.AthenaStatemachineARN with that JSON as its
+// input. The execution is named with a fresh UUID and its ARN is returned as the
+// WorkflowID.
+//
+// NOTE(review): the results are unnamed, so the deferred reassignment of the local err
+// cannot change the error already handed to the caller; the value produced by apiError
+// is only seen by the trace log.
+func (API) ExecuteAsyncQueryNotify(input *models.ExecuteAsyncQueryNotifyInput) (*models.ExecuteAsyncQueryNotifyOutput, error) {
+	var (
+		output models.ExecuteAsyncQueryNotifyOutput
+		err    error
+	)
+
+	// runs on every exit path: passes a failure through apiError and emits the trace log
+	defer func() {
+		if err != nil {
+			err = apiError(err) // lambda failed
+		}
+
+		// allows tracing queries
+		requester := ""
+		if input.UserID != nil {
+			requester = *input.UserID
+		}
+		zap.L().Info("ExecuteAsyncQueryNotify",
+			zap.String("userId", requester),
+			zap.String("userData", input.UserData),
+			zap.String("workflowID", output.WorkflowID),
+			zap.Error(err))
+	}()
+
+	// the whole request becomes the input document of the workflow
+	payload, err := jsoniter.Marshal(input)
+	if err != nil {
+		err = errors.Wrapf(err, "failed to marshal %#v", input)
+		return &output, err
+	}
+
+	execution, err := sfnClient.StartExecution(&sfn.StartExecutionInput{
+		Input:           aws.String(string(payload)),
+		Name:            aws.String(uuid.New().String()),
+		StateMachineArn: &envConfig.AthenaStatemachineARN,
+	})
+	if err != nil {
+		err = errors.Wrapf(err, "failed to start workflow execution for: %#v", input)
+		return &output, err
+	}
+
+	output.Workflow.WorkflowID = *execution.ExecutionArn
+	return &output, nil
+}
diff --git a/internal/core/database_api/athena/driver/api/execute_query.go b/internal/core/database_api/athena/driver/api/execute_query.go
new file mode 100644
index 0000000000..9b3b3182df
--- /dev/null
+++ b/internal/core/database_api/athena/driver/api/execute_query.go
@@ -0,0 +1,68 @@
+package api
+
+/**
+ * Panther is a Cloud-Native SIEM for the Modern Security Team.
+ * Copyright (C) 2020 Panther Labs Inc
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+import (
+	"time"
+
+	"github.com/panther-labs/panther/api/lambda/database/models"
+)
+
+const (
+	// pollWait is the pause before each status check while waiting on a query.
+	pollWait = time.Second * 2
+)
+
+// ExecuteQuery starts a query with ExecuteAsyncQuery, polls GetQueryStatus every
+// pollWait until the query is no longer models.QueryRunning, and then returns whatever
+// GetQueryResults reports for that query id.
+//
+// If the query cannot be started (API error or SQL error) the output is marked
+// models.QueryFailed, carries the async call's QueryStatus and echoes the input SQL.
+//
+// NOTE(review): the results are unnamed, so the deferred reassignment of the local err
+// has no effect on what the caller receives. The poll loop has no upper bound of its
+// own; presumably the lambda timeout is what limits it - confirm.
+func (api API) ExecuteQuery(input *models.ExecuteQueryInput) (*models.ExecuteQueryOutput, error) {
+	var (
+		output models.ExecuteQueryOutput
+		err    error
+	)
+	defer func() {
+		if err != nil {
+			err = apiError(err) // lambda failed
+		}
+	}()
+
+	started, err := api.ExecuteAsyncQuery(input)
+	if err != nil || started.SQLError != "" { // either API error OR sql error
+		output.Status = models.QueryFailed
+		output.QueryStatus = started.QueryStatus
+		output.SQL = input.SQL
+		return &output, err
+	}
+
+	// poll; failures here use their own variable, so the outer err stays untouched
+	statusInput := &models.GetQueryStatusInput{
+		QueryID: started.QueryID,
+	}
+	for running := true; running; {
+		time.Sleep(pollWait)
+		status, pollErr := api.GetQueryStatus(statusInput)
+		if pollErr != nil {
+			return &output, pollErr
+		}
+		running = status.Status == models.QueryRunning
+	}
+
+	// get the results
+	resultsInput := &models.GetQueryResultsInput{}
+	resultsInput.QueryID = started.QueryID
+	return api.GetQueryResults(resultsInput)
+}
diff --git a/internal/core/database_api/athena/driver/api/get_databases.go b/internal/core/database_api/athena/driver/api/get_databases.go
new file mode 100644
index 0000000000..4b773b622c
--- /dev/null
+++ b/internal/core/database_api/athena/driver/api/get_databases.go
@@ -0,0 +1,74 @@
+package api
+
+/**
+ * Panther is a Cloud-Native SIEM for the Modern Security Team.
+ * Copyright (C) 2020 Panther Labs Inc
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+import (
+	"github.com/aws/aws-sdk-go/service/glue"
+	"github.com/pkg/errors"
+
+	"github.com/panther-labs/panther/api/lambda/database/models"
+	"github.com/panther-labs/panther/pkg/awsglue"
+)
+
+// GetDatabases returns Glue databases as name/description pairs.
+//
+// With input.Name set, only that database is looked up. Otherwise every page of the
+// Glue catalog listing is walked. When envConfig.PantherTablesOnly is set, databases
+// that are not keys of awsglue.PantherDatabases are left out (an explicit lookup of
+// such a database yields an empty result and no error).
+//
+// NOTE(review): the results are unnamed, so the deferred reassignment of the local err
+// has no effect on what the caller receives.
+func (API) GetDatabases(input *models.GetDatabasesInput) (*models.GetDatabasesOutput, error) {
+	var output models.GetDatabasesOutput
+
+	var err error
+	defer func() {
+		if err != nil {
+			err = apiError(err) // lambda failed
+		}
+	}()
+
+	// specific lookup
+	if input.Name != nil {
+		if envConfig.PantherTablesOnly && awsglue.PantherDatabases[*input.Name] == "" {
+			return &output, err // nothing
+		}
+		var glueOutput *glue.GetDatabaseOutput
+		glueOutput, err = glueClient.GetDatabase(&glue.GetDatabaseInput{
+			Name: input.Name,
+		})
+		if err != nil {
+			err = errors.WithStack(err)
+			return &output, err
+		}
+		output.Databases = append(output.Databases, &models.NameAndDescription{
+			Name:        *glueOutput.Database.Name,
+			Description: glueOutput.Database.Description, // optional
+		})
+		return &output, err
+	}
+
+	// list
+	err = glueClient.GetDatabasesPages(&glue.GetDatabasesInput{},
+		func(page *glue.GetDatabasesOutput, lastPage bool) bool {
+			for _, database := range page.DatabaseList {
+				if envConfig.PantherTablesOnly && awsglue.PantherDatabases[*database.Name] == "" {
+					continue // skip
+				}
+				output.Databases = append(output.Databases, &models.NameAndDescription{
+					Name:        *database.Name,
+					Description: database.Description, // optional
+				})
+			}
+			// returning false would stop the pager after the first page and silently
+			// truncate the listing; keep going until the SDK runs out of pages
+			return true
+		})
+
+	return &output, errors.WithStack(err)
+}
diff --git a/internal/core/database_api/athena/driver/api/get_query_results.go b/internal/core/database_api/athena/driver/api/get_query_results.go
new file mode 100644
index 0000000000..2a7f2681aa
--- /dev/null
+++ b/internal/core/database_api/athena/driver/api/get_query_results.go
@@ -0,0 +1,103 @@
+package api
+
+/**
+ * Panther is a Cloud-Native SIEM for the Modern Security Team.
+ * Copyright (C) 2020 Panther Labs Inc
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+import (
+	"github.com/aws/aws-sdk-go/service/athena"
+	"github.com/aws/aws-sdk-go/service/athena/athenaiface"
+	"go.uber.org/zap"
+
+	"github.com/panther-labs/panther/api/lambda/database/models"
+	"github.com/panther-labs/panther/pkg/awsathena"
+)
+
+// GetQueryResults reports the status of a query and, when the query has succeeded,
+// one page of its results. input.PaginationToken selects the page (nil means the
+// first one) and input.PageSize is handed to Athena as the maximum result count.
+// For any status other than models.QuerySucceeded only the status part is filled in.
+//
+// NOTE(review): the results are unnamed, so the deferred reassignment of the local err
+// cannot change the error already handed to the caller; the value produced by apiError
+// is only seen by the trace log.
+func (api API) GetQueryResults(input *models.GetQueryResultsInput) (*models.GetQueryResultsOutput, error) {
+	var (
+		output models.GetQueryResultsOutput
+		err    error
+	)
+
+	// runs on every exit path: passes a failure through apiError and emits the trace log
+	defer func() {
+		if err != nil {
+			err = apiError(err) // lambda failed
+		}
+
+		// allows tracing queries
+		zap.L().Info("GetQueryResults",
+			zap.String("queryId", input.QueryID),
+			zap.Error(err))
+	}()
+
+	status, err := api.GetQueryStatus(&input.QueryInfo)
+	if err != nil {
+		return &output, err
+	}
+	output.GetQueryStatusOutput = *status
+
+	// rows only exist once the query has succeeded
+	if output.Status == models.QuerySucceeded {
+		// a nil PaginationToken asks for the first page, otherwise we are paging thru results
+		err = getQueryResults(athenaClient, input.QueryID, &output, input.PaginationToken, input.PageSize)
+		if err != nil {
+			return &output, err
+		}
+	}
+	return &output, nil
+}
+
+// getQueryResults fetches one page of results for queryID and stores the column
+// header (name and type) and the rows in output.
+func getQueryResults(client athenaiface.AthenaAPI, queryID string,
+	output *models.GetQueryResultsOutput, nextToken *string, maxResults *int64) (err error) {
+
+	page, err := awsathena.Results(client, queryID, nextToken, maxResults)
+	if err != nil {
+		return err
+	}
+
+	// header with types
+	header := page.ResultSet.ResultSetMetadata.ColumnInfo
+	for i := range header {
+		output.ColumnInfo = append(output.ColumnInfo, &models.Column{
+			Value: header[i].Name,
+			Type:  header[i].Type,
+		})
+	}
+
+	// athena puts header in first row of first page
+	firstPage := nextToken == nil
+	collectResults(firstPage, page, output)
+	return nil
+}
+
+// collectResults copies the rows of queryResult into output.ResultsPage, dropping the
+// first row when skipHeader is set, and records the row count and the token for the
+// next page.
+func collectResults(skipHeader bool, queryResult *athena.GetQueryResultsOutput, output *models.GetQueryResultsOutput) {
+	rows := queryResult.ResultSet.Rows
+	output.ResultsPage.Rows = make([]*models.Row, 0, len(rows)) // pre-alloc
+
+	if skipHeader && len(rows) > 0 {
+		rows = rows[1:]
+	}
+	for _, row := range rows {
+		cells := make([]*models.Column, 0, len(row.Data))
+		for _, datum := range row.Data {
+			cells = append(cells, &models.Column{Value: datum.VarCharValue})
+		}
+		output.ResultsPage.Rows = append(output.ResultsPage.Rows, &models.Row{Columns: cells})
+	}
+
+	output.ResultsPage.NumRows = len(output.ResultsPage.Rows)
+	output.ResultsPage.PaginationToken = queryResult.NextToken
+}
diff --git a/internal/core/database_api/athena/driver/api/get_query_results_link.go b/internal/core/database_api/athena/driver/api/get_query_results_link.go
new file mode 100644
index 0000000000..47645f273e
--- /dev/null
+++ b/internal/core/database_api/athena/driver/api/get_query_results_link.go
@@ -0,0 +1,98 @@
+package api
+
+/**
+ * Panther is a Cloud-Native SIEM for the Modern Security Team.
+ * Copyright (C) 2020 Panther Labs Inc
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+import (
+	"net/url"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/pkg/errors"
+	"go.uber.org/zap"
+
+	"github.com/panther-labs/panther/api/lambda/database/models"
+	"github.com/panther-labs/panther/pkg/awsathena"
+)
+
+const (
+	// presignedLinkTimeLimit is the expiry passed when presigning the results link.
+	presignedLinkTimeLimit = time.Minute
+)
+
+// GetQueryResultsLink returns a presigned S3 GET link to the results object of a
+// succeeded query.
+//
+// For any status other than models.QuerySucceeded no link is produced: SQLError is set
+// to "results not available" and the error is nil. An error is returned when the
+// status lookup fails, when the results location is missing, is not a parsable
+// s3://bucket/key URL, or when presigning fails.
+//
+// NOTE(review): the results are unnamed, so the deferred reassignment of the local err
+// cannot change the error already handed to the caller; the value produced by apiError
+// is only seen by the trace log.
+func (api API) GetQueryResultsLink(input *models.GetQueryResultsLinkInput) (*models.GetQueryResultsLinkOutput, error) {
+	var output models.GetQueryResultsLinkOutput
+
+	var err error
+	defer func() {
+		if err != nil {
+			err = apiError(err) // lambda failed
+		}
+
+		// allows tracing queries
+		zap.L().Info("GetQueryResultsLink",
+			zap.String("queryId", input.QueryID),
+			zap.Error(err))
+	}()
+
+	executionStatus, err := awsathena.Status(athenaClient, input.QueryID)
+	if err != nil {
+		return &output, err
+	}
+
+	output.Status = getQueryStatus(executionStatus)
+
+	if output.Status != models.QuerySucceeded {
+		output.SQLError = "results not available"
+		return &output, nil
+	}
+
+	// both levels are optional pointers in the SDK response; fail cleanly instead of panicking
+	resultConfig := executionStatus.QueryExecution.ResultConfiguration
+	if resultConfig == nil || resultConfig.OutputLocation == nil {
+		err = errors.Errorf("missing results location for query: %s", input.QueryID)
+		return &output, err
+	}
+	s3path := *resultConfig.OutputLocation
+
+	parsedPath, err := url.Parse(s3path)
+	if err != nil {
+		// keep the parse failure as the cause and name the offending location
+		err = errors.Wrapf(err, "bad s3 url: %s", s3path)
+		return &output, err
+	}
+
+	if parsedPath.Scheme != "s3" {
+		err = errors.Errorf("not s3 protocol (expecting s3://): %s", s3path)
+		return &output, err
+	}
+
+	bucket := parsedPath.Host
+	if bucket == "" {
+		err = errors.Errorf("missing bucket: %s", s3path)
+		return &output, err
+	}
+	var key string
+	if len(parsedPath.Path) > 0 {
+		key = parsedPath.Path[1:] // remove leading '/'
+	}
+
+	// the second result is the (still empty) response object, which is not needed for presigning
+	req, _ := s3Client.GetObjectRequest(&s3.GetObjectInput{
+		Bucket: aws.String(bucket),
+		Key:    aws.String(key),
+	})
+	output.PresignedLink, err = req.Presign(presignedLinkTimeLimit)
+	if err != nil {
+		// wrap rather than replace, so the signing failure itself is not lost
+		err = errors.Wrapf(err, "failed to sign: %s", s3path)
+		return &output, err
+	}
+	return &output, nil
+}
diff --git a/internal/core/database_api/athena/driver/api/get_query_status.go
b/internal/core/database_api/athena/driver/api/get_query_status.go
new file mode 100644
index 0000000000..4cbed8980e
--- /dev/null
+++ b/internal/core/database_api/athena/driver/api/get_query_status.go
@@ -0,0 +1,86 @@
+package api
+
+/**
+ * Panther is a Cloud-Native SIEM for the Modern Security Team.
+ * Copyright (C) 2020 Panther Labs Inc
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+import (
+	"github.com/aws/aws-sdk-go/service/athena"
+	"go.uber.org/zap"
+
+	"github.com/panther-labs/panther/api/lambda/database/models"
+	"github.com/panther-labs/panther/pkg/awsathena"
+)
+
+// GetQueryStatus looks up an Athena query execution and reports its SQL text and its
+// status as one of the models.Query* constants.
+//
+// A succeeded query also gets execution statistics, a failed query gets Athena's
+// state change reason in SQLError, and a cancelled query gets "Query canceled".
+// A failed or cancelled query is not a lambda failure, so the error is nil for them.
+//
+// NOTE(review): the results are unnamed, so the deferred reassignment of the local err
+// cannot change the error already handed to the caller; the value produced by apiError
+// is only seen by the trace log.
+func (API) GetQueryStatus(input *models.GetQueryStatusInput) (*models.GetQueryStatusOutput, error) {
+	var output models.GetQueryStatusOutput
+
+	var err error
+	defer func() {
+		if err != nil {
+			err = apiError(err) // lambda failed
+		}
+
+		// allows tracing queries
+		zap.L().Info("GetQueryStatus",
+			zap.String("queryId", input.QueryID),
+			zap.Error(err))
+	}()
+
+	executionStatus, err := awsathena.Status(athenaClient, input.QueryID)
+	if err != nil {
+		return &output, err
+	}
+
+	// the SDK models every response field as an optional pointer, so the descriptive
+	// fields below are only dereferenced when present
+	execution := executionStatus.QueryExecution
+	if execution.Query != nil {
+		output.SQL = *execution.Query
+	}
+	output.Status = getQueryStatus(executionStatus)
+
+	switch output.Status {
+	case models.QuerySucceeded:
+		// always hand back a Stats value on success; counters athena omitted stay zero
+		output.Stats = &models.QueryResultsStats{}
+		if stats := execution.Statistics; stats != nil {
+			if stats.TotalExecutionTimeInMillis != nil {
+				output.Stats.ExecutionTimeMilliseconds = *stats.TotalExecutionTimeInMillis
+			}
+			if stats.DataScannedInBytes != nil {
+				output.Stats.DataScannedBytes = *stats.DataScannedInBytes
+			}
+		}
+	case models.QueryFailed: // lambda succeeded BUT query failed (could be for many reasons)
+		output.SQLError = "Query failed" // fallback when athena gives no reason
+		if reason := execution.Status.StateChangeReason; reason != nil {
+			output.SQLError = *reason
+		}
+	case models.QueryCancelled:
+		output.SQLError = "Query canceled"
+	}
+	return &output, nil
+}
+
+// getQueryStatus maps the Athena execution state onto the models.Query* constants;
+// queued and running both count as models.QueryRunning. It panics on a state it does
+// not know.
+func getQueryStatus(executionStatus *athena.GetQueryExecutionOutput) string {
+	switch *executionStatus.QueryExecution.Status.State {
+	case
+		athena.QueryExecutionStateSucceeded:
+		return models.QuerySucceeded
+	case
+		// failure modes
+		athena.QueryExecutionStateFailed:
+		return models.QueryFailed
+	case
+		athena.QueryExecutionStateCancelled:
+		return models.QueryCancelled
+	case
+		// still going
+		athena.QueryExecutionStateRunning,
+		athena.QueryExecutionStateQueued:
+		return models.QueryRunning
+	default:
+		panic("unknown athena status: " + *executionStatus.QueryExecution.Status.State)
+	}
+}
diff --git a/internal/core/database_api/athena/driver/api/get_tables.go b/internal/core/database_api/athena/driver/api/get_tables.go
new file mode 100644
index 0000000000..01a85234cc
--- /dev/null
+++ b/internal/core/database_api/athena/driver/api/get_tables.go
@@ -0,0 +1,75 @@
+package api
+
+/**
+ * Panther is a Cloud-Native SIEM for the Modern Security Team.
+ * Copyright (C) 2020 Panther Labs Inc
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.
If not, see .
+ */
+
+import (
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/glue"
+	"github.com/pkg/errors"
+
+	"github.com/panther-labs/panther/api/lambda/database/models"
+	"github.com/panther-labs/panther/pkg/awsglue"
+)
+
+// GetTables lists the tables of input.DatabaseName together with their columns.
+//
+// Unless input.IncludePopulatedTablesOnly is explicitly false, a table is only listed
+// when Glue reports at least one partition for it. When envConfig.PantherTablesOnly is
+// set and the database is not a key of awsglue.PantherDatabases, the result is empty
+// and the error is nil.
+//
+// NOTE(review): the results are unnamed, so the deferred reassignment of the local err
+// has no effect on what the caller receives.
+func (API) GetTables(input *models.GetTablesInput) (*models.GetTablesOutput, error) {
+	var (
+		output models.GetTablesOutput
+		err    error
+	)
+	defer func() {
+		if err != nil {
+			err = apiError(err) // lambda failed
+		}
+	}()
+
+	if envConfig.PantherTablesOnly && awsglue.PantherDatabases[input.DatabaseName] == "" {
+		return &output, nil // nothing
+	}
+
+	// Default to only listing tables that have data, if input.IncludePopulatedTablesOnly is set, then
+	// defer to the setting. Implemented by checking there is at least 1 partition
+	populatedOnly := input.IncludePopulatedTablesOnly == nil || *input.IncludePopulatedTablesOnly
+
+	// the pager callback cannot return an error, so a partition lookup failure is
+	// parked here and picked up once paging has stopped
+	var partitionErr error
+	collectPage := func(page *glue.GetTablesOutput, lastPage bool) bool {
+		for _, table := range page.TableList {
+			if populatedOnly {
+				var partitions *glue.GetPartitionsOutput
+				partitions, partitionErr = glueClient.GetPartitions(&glue.GetPartitionsInput{
+					DatabaseName: aws.String(input.DatabaseName),
+					TableName:    table.Name,
+					MaxResults:   aws.Int64(1),
+				})
+				if partitionErr != nil {
+					return false // stop
+				}
+				if len(partitions.Partitions) == 0 { // skip if no partitions
+					continue
+				}
+			}
+			detail := newTableDetail(input.DatabaseName, *table.Name, table.Description)
+			populateTableDetailColumns(detail, table)
+			output.Tables = append(output.Tables, detail)
+		}
+		return true
+	}
+
+	err = glueClient.GetTablesPages(&glue.GetTablesInput{DatabaseName: aws.String(input.DatabaseName)}, collectPage)
+	if partitionErr != nil {
+		err = partitionErr
+	}
+
+	return &output, errors.WithStack(err)
+}
diff --git a/internal/core/database_api/athena/driver/api/get_tables_detail.go b/internal/core/database_api/athena/driver/api/get_tables_detail.go
new file mode 100644
index 0000000000..08d4f3cade
--- /dev/null
+++
b/internal/core/database_api/athena/driver/api/get_tables_detail.go
@@ -0,0 +1,114 @@
+package api
+
+/**
+ * Panther is a Cloud-Native SIEM for the Modern Security Team.
+ * Copyright (C) 2020 Panther Labs Inc
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+import (
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/glue"
+	"github.com/pkg/errors"
+
+	"github.com/panther-labs/panther/api/lambda/database/models"
+	"github.com/panther-labs/panther/pkg/awsglue"
+)
+
+// GetTablesDetail looks up each table named in input.Names within input.DatabaseName
+// and returns its description and columns. The first failed lookup aborts the call and
+// is returned together with the tables collected so far. When
+// envConfig.PantherTablesOnly is set and the database is not a key of
+// awsglue.PantherDatabases, the result is empty and the error is nil.
+//
+// NOTE(review): the results are unnamed, so the deferred reassignment of the local err
+// has no effect on what the caller receives.
+func (API) GetTablesDetail(input *models.GetTablesDetailInput) (*models.GetTablesDetailOutput, error) {
+	var output models.GetTablesDetailOutput
+
+	var err error
+	defer func() {
+		if err != nil {
+			err = apiError(err) // lambda failed
+		}
+	}()
+
+	if envConfig.PantherTablesOnly && awsglue.PantherDatabases[input.DatabaseName] == "" {
+		return &output, err // nothing
+	}
+
+	for _, tableName := range input.Names {
+		var glueTableOutput *glue.GetTableOutput
+		glueTableOutput, err = glueClient.GetTable(&glue.GetTableInput{
+			DatabaseName: aws.String(input.DatabaseName),
+			Name:         aws.String(tableName),
+		})
+		if err != nil {
+			err = errors.WithStack(err)
+			return &output, err
+		}
+		detail := newTableDetail(input.DatabaseName, *glueTableOutput.Table.Name, glueTableOutput.Table.Description)
+		populateTableDetailColumns(detail, glueTableOutput.Table)
+		output.Tables = append(output.Tables, detail)
+	}
+	return &output, nil
+}
+
+// populateTableDetailColumns appends the table's storage columns followed by its
+// partition keys to tableDetail.Columns. A column name is replaced by its
+// "mapping.<name>" serde parameter when one exists.
+func populateTableDetailColumns(tableDetail *models.TableDetail, glueTableData *glue.TableData) {
+	columnMap := getColumnMap(glueTableData)
+	lookupColumnMapping := func(col *string) string { // use mapped name if it can be found
+		if col == nil {
+			return ""
+		}
+		// e.g: "mapping.srcaddr": "srcAddr"; a nil mapped value falls back to the raw name
+		if mappedCol, found := columnMap["mapping."+*col]; found && mappedCol != nil {
+			return *mappedCol
+		}
+		return *col
+	}
+	appendColumns := func(columns []*glue.Column) {
+		for _, column := range columns {
+			tableDetail.Columns = append(tableDetail.Columns,
+				newTableColumn(lookupColumnMapping(column.Name), aws.StringValue(column.Type), column.Comment))
+		}
+	}
+
+	// StorageDescriptor is an optional pointer in the SDK; without it only the
+	// partition keys can be listed
+	if glueTableData.StorageDescriptor != nil {
+		appendColumns(glueTableData.StorageDescriptor.Columns)
+	}
+	appendColumns(glueTableData.PartitionKeys)
+}
+
+// tables can have a mapping table of column name to JSON attr, if present use that for column names
+// (an empty map is returned when the table has no storage descriptor or serde parameters)
+func getColumnMap(glueTableData *glue.TableData) map[string]*string {
+	storage := glueTableData.StorageDescriptor
+	if storage != nil && storage.SerdeInfo != nil && storage.SerdeInfo.Parameters != nil {
+		return storage.SerdeInfo.Parameters
+	}
+	return make(map[string]*string)
+}
+
+// wrap complex constructors to make code more readable above
+
+// newTableDetail builds a models.TableDetail holding the database name, table name and
+// optional description; columns are left empty.
+func newTableDetail(databaseName, tableName string, description *string) *models.TableDetail {
+	return &models.TableDetail{
+		TableDescription: models.TableDescription{
+			Database: models.Database{
+				DatabaseName: databaseName,
+			},
+			NameAndDescription: models.NameAndDescription{
+				Name:        tableName,
+				Description: description, // optional
+			},
+		},
+	}
+}
+
+// newTableColumn builds a models.TableColumn from a column name, type and optional
+// description.
+func newTableColumn(colName, colType string, description *string) *models.TableColumn {
+	return &models.TableColumn{
+		NameAndDescription: models.NameAndDescription{
+			Name:        colName,
+			Description: description,
+		},
+		Type: colType,
+	}
+}
diff --git
a/internal/core/database_api/athena/driver/api/integration_test.go b/internal/core/database_api/athena/driver/api/integration_test.go new file mode 100644 index 0000000000..ed19fecc38 --- /dev/null +++ b/internal/core/database_api/athena/driver/api/integration_test.go @@ -0,0 +1,728 @@ +package api + +/** + * Panther is a Cloud-Native SIEM for the Modern Security Team. + * Copyright (C) 2020 Panther Labs Inc + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ + +import ( + "encoding/json" + "fmt" + "net/http" + "os" + "strconv" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/lambda" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/sfn" + "github.com/aws/aws-sdk-go/service/sts" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/panther-labs/panther/api/lambda/database/models" + "github.com/panther-labs/panther/internal/core/database_api/athena/testutils" + "github.com/panther-labs/panther/pkg/awsglue" + "github.com/panther-labs/panther/pkg/genericapi" +) + +const ( + stateMachineName = "panther-athena-workflow" + + printJSON = false // set to true to print json input/output (useful for sharing with frontend devs) + + testUserID = "testUserID" + testSQL = `select * from ` + testutils.TestTable + ` order by col1 asc` // tests may break w/out order by + badExecutingSQL = `select * from nosuchtable` // fails AFTER query starts + malformedSQL = `wewewewew` // fails when query starts + dropTableSQL = `drop table ` + testutils.TestTable // tests for mutating permissions + createTableAsSQL = `create table ishouldfail as select * from ` + testutils.TestTable // tests for mutating permissions +) + +var ( + integrationTest bool + + api = API{} + + maxRowsPerResult int64 = 3 // force pagination to test +) + +func TestMain(m *testing.M) { + integrationTest = strings.ToLower(os.Getenv("INTEGRATION_TEST")) == "true" + if integrationTest { + os.Setenv("GRAPHQL_ENDPOINT", "placeholder, this is required") + os.Setenv("ATHENA_STATEMACHINE_ARN", "placeholder, this is required") + SessionInit() + lambdaClient = lambda.New(awsSession) + s3Client = s3.New(awsSession) + + // get the ARN for the statemachine + identity, err := sts.New(awsSession).GetCallerIdentity(&sts.GetCallerIdentityInput{}) + if err != nil || identity.Account == nil { + panic("failed to get identity") + } + envConfig.AthenaStatemachineARN = 
fmt.Sprintf("arn:aws:states:%s:%s:stateMachine:%s", + *awsSession.Config.Region, *identity.Account, stateMachineName) + } + os.Exit(m.Run()) +} + +func TestIntegrationAthenaAPI(t *testing.T) { + if !integrationTest { + t.Skip() + } + + testAthenaAPI(t, false) +} + +func TestIntegrationAthenaAPILambda(t *testing.T) { + if !integrationTest { + t.Skip() + } + + testAthenaAPI(t, true) +} + +func TestIntegrationGlueAPI(t *testing.T) { + if !integrationTest { + t.Skip() + } + + t.Log("direct glue calls from client") + const useLambda = false // local client testing + + testutils.SetupTables(t, glueClient, s3Client) + defer func() { + testutils.RemoveTables(t, glueClient, s3Client) + }() + + // -------- GetDatabases() + + // list + var getDatabasesInput models.GetDatabasesInput + getDatabasesOutput, err := runGetDatabases(useLambda, &getDatabasesInput) + require.NoError(t, err) + foundDB := false + for _, db := range getDatabasesOutput.Databases { + if db.Name == testutils.TestDb { + foundDB = true + } + } + assert.True(t, foundDB) + + // specific lookup + getDatabasesInput.Name = aws.String(testutils.TestDb) + getDatabasesOutput, err = runGetDatabases(useLambda, &getDatabasesInput) + require.NoError(t, err) + require.Len(t, getDatabasesOutput.Databases, 1) + require.Equal(t, testutils.TestDb, getDatabasesOutput.Databases[0].Name) + + // -------- GetDatabases() with envConfig.PantherTablesOnly (should not find any) + + envConfig.PantherTablesOnly = true + + // list + var getPantherDatabasesInput models.GetDatabasesInput + getPantherDatabasesOutput, err := runGetDatabases(useLambda, &getPantherDatabasesInput) + require.NoError(t, err) + foundDB = false + for _, db := range getPantherDatabasesOutput.Databases { + if db.Name == testutils.TestDb { + foundDB = true + } + } + assert.False(t, foundDB) // should NOT find + + // specific lookup + getPantherDatabasesInput.Name = aws.String(testutils.TestDb) + getPantherDatabasesOutput, err = runGetDatabases(useLambda, 
&getPantherDatabasesInput) + require.NoError(t, err) + assert.Len(t, getPantherDatabasesOutput.Databases, 0) + + envConfig.PantherTablesOnly = false + + // -------- GetTables() + + var getTablesInput models.GetTablesInput + getTablesInput.DatabaseName = testutils.TestDb + getTablesOutput, err := runGetTables(useLambda, &getTablesInput) + require.NoError(t, err) + require.Len(t, getTablesOutput.Tables, 1) + testutils.CheckTableDetail(t, getTablesOutput.Tables) + + // -------- GetTables() with envConfig.PantherTablesOnly (should not find any) + + envConfig.PantherTablesOnly = true + + var getPantherTablesInput models.GetTablesInput + getPantherTablesInput.DatabaseName = testutils.TestDb + getPantherTablesOutput, err := runGetTables(useLambda, &getPantherTablesInput) + require.NoError(t, err) + assert.Len(t, getPantherTablesOutput.Tables, 0) + + envConfig.PantherTablesOnly = false + + // -------- GetTablesDetail() + + var getTablesDetailInput models.GetTablesDetailInput + getTablesDetailInput.DatabaseName = testutils.TestDb + getTablesDetailInput.Names = []string{testutils.TestTable} + getTablesDetailOutput, err := runGetTablesDetail(useLambda, &getTablesDetailInput) + require.NoError(t, err) + testutils.CheckTableDetail(t, getTablesDetailOutput.Tables) + + // -------- GetTablesDetail() with envConfig.PantherTablesOnly (should not find any) + + envConfig.PantherTablesOnly = true + + var getPantherTablesDetailInput models.GetTablesDetailInput + getPantherTablesDetailInput.DatabaseName = testutils.TestDb + getPantherTablesDetailInput.Names = []string{testutils.TestTable} + getPantherTablesDetailOutput, err := runGetTablesDetail(useLambda, &getPantherTablesDetailInput) + require.NoError(t, err) + assert.Empty(t, len(getPantherTablesDetailOutput.Tables)) + + envConfig.PantherTablesOnly = false +} + +func TestIntegrationGlueAPILambda(t *testing.T) { + if !integrationTest { + t.Skip() + } + + t.Log("indirect glue calls thru deployed lambdas") + const useLambda = true + + // 
here we use all panther tables, since the default is to restrict to these (presumes deployment) + const pantherDatabase = awsglue.LogProcessingDatabaseName + const pantherTable = "aws_cloudtrail" + + includePopulatedTablesOnly := false // these tables may not be populated but we want to list + + // -------- GetDatabases() + + // list + var getDatabasesInput models.GetDatabasesInput + getDatabasesOutput, err := runGetDatabases(useLambda, &getDatabasesInput) + require.NoError(t, err) + foundDB := false + nonPanther := false + for _, db := range getDatabasesOutput.Databases { + if db.Name == pantherDatabase { + foundDB = true + } + if !strings.HasPrefix(db.Name, "panther") { + nonPanther = true + } + } + assert.True(t, foundDB) + assert.False(t, nonPanther) + assert.Len(t, awsglue.PantherDatabases, len(getDatabasesOutput.Databases)) + + // specific lookup + getDatabasesInput.Name = aws.String(pantherDatabase) + getDatabasesOutput, err = runGetDatabases(useLambda, &getDatabasesInput) + require.NoError(t, err) + assert.Len(t, getDatabasesOutput.Databases, 1) + + // -------- GetTables() + + var getTablesInput models.GetTablesInput + getTablesInput.DatabaseName = pantherDatabase + getTablesInput.IncludePopulatedTablesOnly = &includePopulatedTablesOnly + getTablesOutput, err := runGetTables(useLambda, &getTablesInput) + require.NoError(t, err) + assert.Greater(t, len(getTablesOutput.Tables), 0) + + // -------- GetTablesDetail() + + var getTablesDetailInput models.GetTablesDetailInput + getTablesDetailInput.DatabaseName = pantherDatabase + getTablesDetailInput.Names = []string{pantherTable} + getTablesInput.IncludePopulatedTablesOnly = &includePopulatedTablesOnly + getTablesDetailOutput, err := runGetTablesDetail(useLambda, &getTablesDetailInput) + require.NoError(t, err) + require.Len(t, getTablesDetailOutput.Tables, 1) + // check that we are getting the mapped column names, we expect: "mapping.useragent": "userAgent" + mappedColumnName := "userAgent" // notice camel case 
+ foundMappedColumn := false + for _, col := range getTablesDetailOutput.Tables[0].Columns { + if col.Name == mappedColumnName { + foundMappedColumn = true + break + } + } + assert.True(t, foundMappedColumn) +} + +func testAthenaAPI(t *testing.T, useLambda bool) { + if useLambda { + t.Log("indirect anthena calls thru deployed lambdas") + } else { + t.Log("direct athena calls from client") + } + + testutils.SetupTables(t, glueClient, s3Client) + defer func() { + testutils.RemoveTables(t, glueClient, s3Client) + }() + + // -------- ExecuteQuery() + + var executeQueryInput models.ExecuteQueryInput + executeQueryInput.UserID = aws.String(testUserID) + executeQueryInput.DatabaseName = testutils.TestDb + executeQueryInput.SQL = testSQL + executeQueryOutput, err := runExecuteQuery(useLambda, &executeQueryInput) + require.NoError(t, err) + assert.Equal(t, "", executeQueryOutput.QueryStatus.SQLError) + require.Equal(t, models.QuerySucceeded, executeQueryOutput.Status) + assert.Greater(t, executeQueryOutput.Stats.ExecutionTimeMilliseconds, int64(0)) // at least something + assert.Greater(t, executeQueryOutput.Stats.DataScannedBytes, int64(0)) // at least something + assert.Len(t, executeQueryOutput.ColumnInfo, len(testutils.TestTableColumns)+len(testutils.TestTablePartitions)) + for i, c := range executeQueryOutput.ColumnInfo { + if i < len(testutils.TestTableColumns) { + assert.Equal(t, *c.Value, *testutils.TestTableColumns[i].Name) + } else { // partitions + assert.Equal(t, *c.Value, *testutils.TestTablePartitions[i-len(testutils.TestTableColumns)].Name) + } + } + assert.Len(t, executeQueryOutput.ResultsPage.Rows, len(testutils.TestTableRows)) + checkQueryResults(t, len(testutils.TestTableRows), 0, executeQueryOutput.ResultsPage.Rows) + + // -------- ExecuteQuery() BAD SQL + + var executeBadQueryInput models.ExecuteQueryInput + executeBadQueryInput.UserID = aws.String(testUserID) + executeBadQueryInput.DatabaseName = testutils.TestDb + executeBadQueryInput.SQL = 
malformedSQL + executeBadQueryOutput, err := runExecuteQuery(useLambda, &executeBadQueryInput) + require.NoError(t, err) // NO LAMBDA ERROR here! + require.Equal(t, models.QueryFailed, executeBadQueryOutput.Status) + assert.True(t, strings.Contains(executeBadQueryOutput.SQLError, "mismatched input 'wewewewew'")) + assert.Equal(t, malformedSQL, executeBadQueryOutput.SQL) + + // -------- ExecuteQuery() DROP TABLE + + if useLambda { // only for lambda to test access restrictions + var executeDropTableInput models.ExecuteQueryInput + executeDropTableInput.UserID = aws.String(testUserID) + executeDropTableInput.DatabaseName = testutils.TestDb + executeDropTableInput.SQL = dropTableSQL + executeDropTableOutput, err := runExecuteQuery(useLambda, &executeDropTableInput) + require.NoError(t, err) // NO LAMBDA ERROR here! + require.Equal(t, models.QueryFailed, executeDropTableOutput.Status) + assert.True(t, strings.Contains(executeDropTableOutput.SQLError, "AccessDeniedException")) + assert.Equal(t, dropTableSQL, executeDropTableOutput.SQL) + } + + // -------- ExecuteQuery() CREATE TABLE AS + + if useLambda { // only for lambda to test access restrictions + var executeCreateTableAsInput models.ExecuteQueryInput + executeCreateTableAsInput.UserID = aws.String(testUserID) + executeCreateTableAsInput.DatabaseName = testutils.TestDb + executeCreateTableAsInput.SQL = createTableAsSQL + executeCreateTableAsOutput, err := runExecuteQuery(useLambda, &executeCreateTableAsInput) + require.NoError(t, err) // NO LAMBDA ERROR here! 
+ require.Equal(t, models.QueryFailed, executeCreateTableAsOutput.Status) + assert.True(t, strings.Contains(executeCreateTableAsOutput.SQLError, "Insufficient permissions")) + assert.Equal(t, createTableAsSQL, executeCreateTableAsOutput.SQL) + } + + // -------- ExecuteQuery() Panther table + + if useLambda { // only for lambda to test s3 read permissions on panther data + var executeCreateTableAsInput models.ExecuteQueryInput + executeCreateTableAsInput.UserID = aws.String(testUserID) + executeCreateTableAsInput.DatabaseName = "panther_logs" + executeCreateTableAsInput.SQL = "select count(1) from aws_s3serveraccess" + executeCreateTableAsOutput, err := runExecuteQuery(useLambda, &executeCreateTableAsInput) + require.NoError(t, err) + require.Equal(t, models.QuerySucceeded, executeCreateTableAsOutput.Status) + } + + // -------- ExecuteAsyncQuery() + + var executeAsyncQueryInput models.ExecuteAsyncQueryInput + executeAsyncQueryInput.UserID = aws.String(testUserID) + executeAsyncQueryInput.DatabaseName = testutils.TestDb + executeAsyncQueryInput.SQL = testSQL + executeAsyncQueryOutput, err := runExecuteAsyncQuery(useLambda, &executeAsyncQueryInput) + require.NoError(t, err) + + // -------- GetQueryStatus() + + for { + time.Sleep(time.Second * 10) + getQueryStatusInput := &models.GetQueryStatusInput{ + QueryID: executeAsyncQueryOutput.QueryID, + } + getQueryStatusOutput, err := runGetQueryStatus(useLambda, getQueryStatusInput) + require.NoError(t, err) + if getQueryStatusOutput.Status != models.QueryRunning { + break + } + } + + // -------- GetQueryResults() test paging + + var getQueryResultsInput models.GetQueryResultsInput + getQueryResultsInput.QueryID = executeAsyncQueryOutput.QueryID + getQueryResultsInput.PageSize = &maxRowsPerResult + getQueryResultsOutput, err := runGetQueryResults(useLambda, &getQueryResultsInput) + require.NoError(t, err) + + if getQueryResultsOutput.Status == models.QuerySucceeded { + resultRowCount := 0 + + // -1 because header is removed 
+ expectedRowCount := int(maxRowsPerResult) - 1 + require.Len(t, getQueryResultsOutput.ResultsPage.Rows, expectedRowCount) + checkQueryResults(t, expectedRowCount, 0, getQueryResultsOutput.ResultsPage.Rows) + resultRowCount += expectedRowCount + + for getQueryResultsOutput.ResultsPage.PaginationToken != nil { // when done this is nil + getQueryResultsInput.PaginationToken = getQueryResultsOutput.ResultsPage.PaginationToken + getQueryResultsOutput, err = runGetQueryResults(useLambda, &getQueryResultsInput) + require.NoError(t, err) + if getQueryResultsOutput.ResultsPage.NumRows > 0 { + expectedRowCount = int(maxRowsPerResult) + // the last page will have 1 less because we remove the header in the first page + if resultRowCount+len(getQueryResultsOutput.ResultsPage.Rows) == testutils.TestTableDataNrows { + expectedRowCount-- + } + require.Len(t, getQueryResultsOutput.ResultsPage.Rows, expectedRowCount) + checkQueryResults(t, expectedRowCount, resultRowCount, getQueryResultsOutput.ResultsPage.Rows) + resultRowCount += expectedRowCount + } + } + require.Equal(t, testutils.TestTableDataNrows, resultRowCount) + } else { + assert.Fail(t, "GetQueryResults failed") + } + + // -------- GetQueryResultsLink() for above query + + var getQueryResultsLinkInput models.GetQueryResultsLinkInput + getQueryResultsLinkInput.QueryID = executeAsyncQueryOutput.QueryID + + getQueryResultsLinkOutput, err := runGetQueryResultsLink(useLambda, &getQueryResultsLinkInput) + require.NoError(t, err) + + // try it ... 
+ resultsResponse, err := http.Get(getQueryResultsLinkOutput.PresignedLink) + require.NoError(t, err) + require.Equal(t, 200, resultsResponse.StatusCode) + + // -------- ExecuteAsyncQuery() BAD SQL + + var executeBadAsyncQueryInput models.ExecuteAsyncQueryInput + executeBadAsyncQueryInput.UserID = aws.String(testUserID) + executeBadAsyncQueryInput.DatabaseName = testutils.TestDb + executeBadAsyncQueryInput.SQL = badExecutingSQL + executeBadAsyncQueryOutput, err := runExecuteAsyncQuery(useLambda, &executeBadAsyncQueryInput) + require.NoError(t, err) + + for { + time.Sleep(time.Second * 2) + var getQueryStatusInput models.GetQueryStatusInput + getQueryStatusInput.QueryID = executeBadAsyncQueryOutput.QueryID + getQueryStatusOutput, err := runGetQueryStatus(useLambda, &getQueryStatusInput) + require.NoError(t, err) + if getQueryStatusOutput.Status != models.QueryRunning { + require.Equal(t, models.QueryFailed, getQueryStatusOutput.Status) + assert.True(t, strings.Contains(getQueryStatusOutput.SQLError, "does not exist")) + assert.Equal(t, badExecutingSQL, getQueryStatusOutput.SQL) + break + } + } + + // -------- GetQueryResultsLink() for above FAILED query + + var getBadAsyncQueryResultsLinkInput models.GetQueryResultsLinkInput + getBadAsyncQueryResultsLinkInput.QueryID = executeBadAsyncQueryOutput.QueryID + + getBadAsyncQueryResultsLinkOutput, err := runGetQueryResultsLink(useLambda, &getBadAsyncQueryResultsLinkInput) + require.NoError(t, err) + require.Equal(t, models.QueryFailed, getBadAsyncQueryResultsLinkOutput.Status) + assert.Equal(t, "results not available", getBadAsyncQueryResultsLinkOutput.SQLError) + + // -------- StopQuery() + + var executeStopQueryInput models.ExecuteAsyncQueryInput + executeStopQueryInput.DatabaseName = testutils.TestDb + executeStopQueryInput.SQL = testSQL + executeStopQueryOutput, err := runExecuteAsyncQuery(useLambda, &executeStopQueryInput) + require.NoError(t, err) + + var stopQueryInput models.StopQueryInput + stopQueryInput.QueryID 
= executeStopQueryOutput.QueryID + _, err = runStopQuery(useLambda, &stopQueryInput) + require.NoError(t, err) + + for { + time.Sleep(time.Second * 2) + var getQueryStatusInput models.GetQueryStatusInput + getQueryStatusInput.QueryID = executeStopQueryOutput.QueryID + getQueryStatusOutput, err := runGetQueryStatus(useLambda, &getQueryStatusInput) + require.NoError(t, err) + if getQueryStatusOutput.Status != models.QueryRunning { + require.Equal(t, models.QueryCancelled, getQueryStatusOutput.Status) + assert.Equal(t, getQueryStatusOutput.SQLError, "Query canceled") + assert.Equal(t, testSQL, getQueryStatusOutput.SQL) + break + } + } + + // -------- ExecuteAsyncQueryNotify() + + /* + See: https://aws.amazon.com/premiumsupport/knowledge-center/appsync-notify-subscribers-real-time/ + + To see queryDone subscriptions work in the AppSync console: + - Go to Queries + - Pick IAM as auth method + - Add a subscription below and click "play" button ... you should see "Subscribed to 1 mutations" and a spinner: + + subscription integQuerySub { + queryDone(userData: "testUserData") { + userData + queryId + workflowId + } + } + + - Run integration tests: + pushd internal/core/database_api/athena/driver/api/ + export INTEGRATION_TEST=true + aws-vault exec dev--admin -d 3h -- go test -v + + - After a minute or two in the console you should see in the results pane something like: + + { + "data": { + "queryDone": { + "userData": "testUserData", + "queryId": "4c223d6e-a41a-418f-b97b-b01f044cbdc9", + "workflowId": "arn:aws:states:us-east-2:0506036XXXXX:execution:panther-athena-workflow:cf56beb0-7493-42ae-a9fd-a024812b8eac" + } + } + } + + NOTE: the UI should call the lambda panther-athena-api:ExecuteAsyncQueryNotify as below and set up + a subscription filtering by user id (or session id). When the query finishes appsync will be notified. + UI should use the queryId to call panther-athena-api:GetQueryResults to display results. 
+ */ + + userData := "testUserData" // this is expected to be passed all the way through the workflow, validations will enforce + + var executeAsyncQueryNotifyInput models.ExecuteAsyncQueryNotifyInput + executeAsyncQueryNotifyInput.UserID = aws.String(testUserID) + executeAsyncQueryNotifyInput.DatabaseName = testutils.TestDb + executeAsyncQueryNotifyInput.SQL = testSQL + executeAsyncQueryNotifyInput.LambdaName = "panther-athena-api" + executeAsyncQueryNotifyInput.MethodName = "notifyAppSync" + executeAsyncQueryNotifyInput.UserData = userData + executeAsyncQueryNotifyOutput, err := runExecuteAsyncQueryNotify(useLambda, &executeAsyncQueryNotifyInput) + require.NoError(t, err) + + // wait for workflow to finish + for { + time.Sleep(time.Second * 10) + descExecutionInput := &sfn.DescribeExecutionInput{ + ExecutionArn: &executeAsyncQueryNotifyOutput.WorkflowID, + } + descExecutionOutput, err := sfnClient.DescribeExecution(descExecutionInput) + require.NoError(t, err) + if *descExecutionOutput.Status != sfn.ExecutionStatusRunning { + require.Equal(t, sfn.ExecutionStatusSucceeded, *descExecutionOutput.Status) + break + } + } +} + +func runGetDatabases(useLambda bool, input *models.GetDatabasesInput) (*models.GetDatabasesOutput, error) { + if useLambda { + var getDatabasesInput = struct { + GetDatabases *models.GetDatabasesInput + }{ + input, + } + var getDatabasesOutput *models.GetDatabasesOutput + err := genericapi.Invoke(lambdaClient, "panther-athena-api", getDatabasesInput, &getDatabasesOutput) + printAPI(getDatabasesInput, getDatabasesOutput) + return getDatabasesOutput, err + } + return api.GetDatabases(input) +} + +func runGetTables(useLambda bool, input *models.GetTablesInput) (*models.GetTablesOutput, error) { + if useLambda { + var getTablesInput = struct { + GetTables *models.GetTablesInput + }{ + input, + } + var getTablesOutput *models.GetTablesOutput + err := genericapi.Invoke(lambdaClient, "panther-athena-api", getTablesInput, &getTablesOutput) + 
printAPI(getTablesInput, getTablesOutput) + return getTablesOutput, err + } + return api.GetTables(input) +} + +func runGetTablesDetail(useLambda bool, input *models.GetTablesDetailInput) (*models.GetTablesDetailOutput, error) { + if useLambda { + var getTablesDetailInput = struct { + GetTablesDetail *models.GetTablesDetailInput + }{ + input, + } + var getTablesDetailOutput *models.GetTablesDetailOutput + err := genericapi.Invoke(lambdaClient, "panther-athena-api", getTablesDetailInput, &getTablesDetailOutput) + printAPI(getTablesDetailInput, getTablesDetailOutput) + return getTablesDetailOutput, err + } + return api.GetTablesDetail(input) +} + +func runExecuteQuery(useLambda bool, input *models.ExecuteQueryInput) (*models.ExecuteQueryOutput, error) { + if useLambda { + var executeQueryInput = struct { + ExecuteQuery *models.ExecuteQueryInput + }{ + input, + } + var executeQueryOutput *models.ExecuteQueryOutput + err := genericapi.Invoke(lambdaClient, "panther-athena-api", executeQueryInput, &executeQueryOutput) + printAPI(executeQueryInput, executeQueryOutput) + return executeQueryOutput, err + } + return api.ExecuteQuery(input) +} + +func runExecuteAsyncQuery(useLambda bool, input *models.ExecuteAsyncQueryInput) (*models.ExecuteAsyncQueryOutput, error) { + if useLambda { + var executeAsyncQueryInput = struct { + ExecuteAsyncQuery *models.ExecuteAsyncQueryInput + }{ + input, + } + var executeAsyncQueryOutput *models.ExecuteAsyncQueryOutput + err := genericapi.Invoke(lambdaClient, "panther-athena-api", executeAsyncQueryInput, &executeAsyncQueryOutput) + printAPI(executeAsyncQueryInput, executeAsyncQueryOutput) + return executeAsyncQueryOutput, err + } + return api.ExecuteAsyncQuery(input) +} + +func runExecuteAsyncQueryNotify(useLambda bool, input *models.ExecuteAsyncQueryNotifyInput) (*models.ExecuteAsyncQueryNotifyOutput, error) { + if useLambda { + var executeAsyncQueryNotifyInput = struct { + ExecuteAsyncQueryNotify *models.ExecuteAsyncQueryNotifyInput + }{ + 
input, + } + var executeAsyncQueryNotifyOutput *models.ExecuteAsyncQueryNotifyOutput + err := genericapi.Invoke(lambdaClient, "panther-athena-api", executeAsyncQueryNotifyInput, &executeAsyncQueryNotifyOutput) + printAPI(executeAsyncQueryNotifyInput, executeAsyncQueryNotifyOutput) + return executeAsyncQueryNotifyOutput, err + } + return api.ExecuteAsyncQueryNotify(input) +} + +func runGetQueryStatus(useLambda bool, input *models.GetQueryStatusInput) (*models.GetQueryStatusOutput, error) { + if useLambda { + var getQueryStatusInput = struct { + GetQueryStatus *models.GetQueryStatusInput + }{ + input, + } + var getQueryStatusOutput *models.GetQueryStatusOutput + err := genericapi.Invoke(lambdaClient, "panther-athena-api", getQueryStatusInput, &getQueryStatusOutput) + printAPI(getQueryStatusInput, getQueryStatusOutput) + return getQueryStatusOutput, err + } + return api.GetQueryStatus(input) +} + +func runGetQueryResults(useLambda bool, input *models.GetQueryResultsInput) (*models.GetQueryResultsOutput, error) { + if useLambda { + var getQueryResultsInput = struct { + GetQueryResults *models.GetQueryResultsInput + }{ + input, + } + var getQueryResultsOutput *models.GetQueryResultsOutput + err := genericapi.Invoke(lambdaClient, "panther-athena-api", getQueryResultsInput, &getQueryResultsOutput) + printAPI(getQueryResultsInput, getQueryResultsOutput) + return getQueryResultsOutput, err + } + return api.GetQueryResults(input) +} + +func runGetQueryResultsLink(useLambda bool, input *models.GetQueryResultsLinkInput) (*models.GetQueryResultsLinkOutput, error) { + if useLambda { + var getQueryResultsLinkInput = struct { + GetQueryResultsLink *models.GetQueryResultsLinkInput + }{ + input, + } + var getQueryResultsLinkOutput *models.GetQueryResultsLinkOutput + err := genericapi.Invoke(lambdaClient, "panther-athena-api", getQueryResultsLinkInput, &getQueryResultsLinkOutput) + printAPI(getQueryResultsLinkInput, getQueryResultsLinkOutput) + return getQueryResultsLinkOutput, err + 
} + return api.GetQueryResultsLink(input) +} + +func runStopQuery(useLambda bool, input *models.StopQueryInput) (*models.StopQueryOutput, error) { + if useLambda { + var stopQueryInput = struct { + StopQuery *models.StopQueryInput + }{ + input, + } + var stopQueryOutput *models.StopQueryOutput + err := genericapi.Invoke(lambdaClient, "panther-athena-api", stopQueryInput, &stopQueryOutput) + printAPI(stopQueryInput, stopQueryOutput) + return stopQueryOutput, err + } + return api.StopQuery(input) +} + +func checkQueryResults(t *testing.T, expectedRowCount, offset int, rows []*models.Row) { + require.Len(t, rows, expectedRowCount) + for i := 0; i < len(rows); i++ { + require.Equal(t, strconv.Itoa(i+offset), *rows[i].Columns[0].Value) + require.Equal(t, (*string)(nil), rows[i].Columns[1].Value) + require.Equal(t, testutils.TestEventTime, *rows[i].Columns[2].Value) + } +} + +// useful to share examples of json APi usage +func printAPI(input, output interface{}) { + if !printJSON { + return + } + inputJSON, _ := json.MarshalIndent(input, "", " ") + outputJSON, _ := json.MarshalIndent(output, "", " ") + fmt.Printf("\nrequest:\n%s\nreply:\n%s\n", string(inputJSON), string(outputJSON)) +} diff --git a/internal/core/database_api/athena/driver/api/invoke_notify_lambda.go b/internal/core/database_api/athena/driver/api/invoke_notify_lambda.go new file mode 100644 index 0000000000..2b5902f5e4 --- /dev/null +++ b/internal/core/database_api/athena/driver/api/invoke_notify_lambda.go @@ -0,0 +1,79 @@ +package api + +/** + * Panther is a Cloud-Native SIEM for the Modern Security Team. + * Copyright (C) 2020 Panther Labs Inc + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +import ( + "fmt" + + "github.com/aws/aws-sdk-go/service/lambda" + jsoniter "github.com/json-iterator/go" + "github.com/pkg/errors" + "go.uber.org/zap" + + "github.com/panther-labs/panther/api/lambda/database/models" +) + +// called by Step workflow to execute callback lambda after query has finished +func (API) InvokeNotifyLambda(input *models.InvokeNotifyLambdaInput) (*models.InvokeNotifyLambdaOutput, error) { + // copy input so parameters can be confirmed in caller (useful for debugging Step functions) + var output = *input + + var err error + defer func() { + if err != nil { + err = apiError(err) // lambda failed + } + + // allows tracing queries + zap.L().Info("InvokeNotifyLambda", + zap.String("userData", input.UserData), + zap.String("queryId", input.QueryID), + zap.String("workflowID", input.WorkflowID), + zap.String("lambdaName", input.LambdaName), + zap.String("methodName", input.MethodName), + zap.Error(err)) + }() + + // these lambdas are expected to take userData, queryId and workflowId in as arguments + var notifyInput models.NotifyInput + notifyInput.QueryID = input.QueryID + notifyInput.WorkflowID = input.WorkflowID + notifyInput.UserData = input.UserData + payload, err := jsoniter.MarshalToString(¬ifyInput) + if err != nil { + err = errors.Wrapf(err, "failed to marshal %#v", input) + return &output, err + } + + // genericapi used + resp, err := lambdaClient.Invoke(&lambda.InvokeInput{ + FunctionName: &input.LambdaName, + Payload: []byte(fmt.Sprintf(`{ "%s": %s}`, input.MethodName, payload)), // genericapi + }) + if err != nil { + err = errors.Wrapf(err, "failed to 
invoke %#v", input) + return &output, err + } + if resp.FunctionError != nil { + err = errors.Errorf("%s: failed to invoke %#v", *resp.FunctionError, input) + return &output, err + } + + return &output, nil +} diff --git a/internal/core/database_api/athena/driver/api/notify_appsync.go b/internal/core/database_api/athena/driver/api/notify_appsync.go new file mode 100644 index 0000000000..e29800c6dc --- /dev/null +++ b/internal/core/database_api/athena/driver/api/notify_appsync.go @@ -0,0 +1,136 @@ +package api + +/** + * Panther is a Cloud-Native SIEM for the Modern Security Team. + * Copyright (C) 2020 Panther Labs Inc + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "time" + + v4 "github.com/aws/aws-sdk-go/aws/signer/v4" + jsoniter "github.com/json-iterator/go" + "github.com/pkg/errors" + "go.uber.org/zap" + + "github.com/panther-labs/panther/api/lambda/database/models" +) + +// https://docs.aws.amazon.com/appsync/latest/devguide/tutorial-local-resolvers.html + +const ( + // what we send to appsync + mutationTemplate = ` +mutation { + queryDone(input: { + userData: "%s", + queryId: "%s", + workflowId: "%s" + }) { + userData + queryId + workflowId + } +} +` +) + +type GraphQlQuery struct { + Query string `json:"query"` // when we marshal, this will escape the mutation JSON as required by graphQL +} + +type GraphQlResponse struct { + Data interface{} `json:"data"` + Errors []interface{} `json:"errors"` +} + +func (API) NotifyAppSync(input *models.NotifyAppSyncInput) (*models.NotifyAppSyncOutput, error) { + var output models.NotifyAppSyncOutput + + var err error + defer func() { + if err != nil { + err = apiError(err) // lambda failed + } + + // allows tracing queries + zap.L().Info("NotifyAppSync", + zap.String("userData", input.UserData), + zap.String("queryId", input.QueryID), + zap.String("workflowID", input.WorkflowID), + zap.Error(err)) + }() + + // make sigv4 https request to appsync endpoint notifying query is complete, sending userData, queryId and workflowId + var httpClient http.Client + signer := v4.NewSigner(awsSession.Config.Credentials) + + mutation := &GraphQlQuery{ + Query: fmt.Sprintf(mutationTemplate, input.UserData, input.QueryID, input.WorkflowID), + } + jsonMessage, err := jsoniter.Marshal(mutation) + if err != nil { + err = errors.Wrapf(err, "json marshal failed for: %#v", input) + return &output, err + } + + body := bytes.NewReader(jsonMessage) // JSON envelope for graphQL + + req, err := http.NewRequest("POST", envConfig.GraphqlEndpoint, body) + if err != nil { + err = errors.Wrapf(err, "new htttp request failed for: %#v", input) + return 
&output, err + } + req.Header.Add("Content-Type", "application/json") + + _, err = signer.Sign(req, body, "appsync", *awsSession.Config.Region, time.Now().UTC()) + if err != nil { + err = errors.Wrapf(err, "failed to v4 sign %#v", input) + return &output, err + } + + resp, err := httpClient.Do(req) + if err != nil { + err = errors.Wrapf(err, "failed to POST %#v", req) + return &output, err + } + defer resp.Body.Close() + + output.StatusCode = resp.StatusCode + + respBody, _ := ioutil.ReadAll(resp.Body) // used for error messages below to add context + if resp.StatusCode != http.StatusOK { + err = errors.Errorf("failed to POST (%d): %s", resp.StatusCode, string(respBody)) + return &output, err + } + + graphQlResp := &GraphQlResponse{} + err = jsoniter.Unmarshal(respBody, graphQlResp) + if err != nil { + err = errors.Wrapf(err, "json unmarshal failed for: %#v", string(respBody)) + return &output, err + } + if len(graphQlResp.Errors) > 0 { + err = errors.Errorf("graphQL error for %#v: %#v", input, graphQlResp) + return &output, err + } + + return &output, nil +} diff --git a/internal/core/database_api/athena/driver/api/stop_query.go b/internal/core/database_api/athena/driver/api/stop_query.go new file mode 100644 index 0000000000..d6a48baa5f --- /dev/null +++ b/internal/core/database_api/athena/driver/api/stop_query.go @@ -0,0 +1,49 @@ +package api + +/** + * Panther is a Cloud-Native SIEM for the Modern Security Team. + * Copyright (C) 2020 Panther Labs Inc + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +import ( + "go.uber.org/zap" + + "github.com/panther-labs/panther/api/lambda/database/models" + "github.com/panther-labs/panther/pkg/awsathena" +) + +func (api API) StopQuery(input *models.StopQueryInput) (*models.StopQueryOutput, error) { + var output models.StopQueryOutput + + var err error + defer func() { + if err != nil { + err = apiError(err) // lambda failed + } + + // allows tracing queries + zap.L().Info("StopQuery", + zap.String("queryId", input.QueryID), + zap.Error(err)) + }() + + _, err = awsathena.StopQuery(athenaClient, input.QueryID) + if err != nil { + return &output, err + } + + return api.GetQueryStatus(input) +} diff --git a/internal/core/database_api/athena/driver/main/main.go b/internal/core/database_api/athena/driver/main/main.go new file mode 100644 index 0000000000..938f2a990b --- /dev/null +++ b/internal/core/database_api/athena/driver/main/main.go @@ -0,0 +1,47 @@ +package main + +/** + * Panther is a Cloud-Native SIEM for the Modern Security Team. + * Copyright (C) 2020 Panther Labs Inc + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ + +import ( + "context" + + "github.com/aws/aws-lambda-go/lambda" + "gopkg.in/go-playground/validator.v9" + + "github.com/panther-labs/panther/api/lambda/database/models" + "github.com/panther-labs/panther/internal/core/database_api/athena/driver/api" + "github.com/panther-labs/panther/pkg/genericapi" + "github.com/panther-labs/panther/pkg/lambdalogger" +) + +var router *genericapi.Router + +func init() { + router = genericapi.NewRouter("database", "athena", validator.New(), api.API{}) +} + +func lambdaHandler(ctx context.Context, request *models.LambdaInput) (interface{}, error) { + lambdalogger.ConfigureGlobal(ctx, nil) + return router.Handle(request) +} + +func main() { + api.SessionInit() + lambda.Start(lambdaHandler) +} diff --git a/internal/core/database_api/athena/testutils/testutils.go b/internal/core/database_api/athena/testutils/testutils.go new file mode 100644 index 0000000000..a02b97234c --- /dev/null +++ b/internal/core/database_api/athena/testutils/testutils.go @@ -0,0 +1,295 @@ +package testutils + +/** + * Panther is a Cloud-Native SIEM for the Modern Security Team. + * Copyright (C) 2020 Panther Labs Inc + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/glue" + "github.com/aws/aws-sdk-go/service/glue/glueiface" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/stretchr/testify/require" + + "github.com/panther-labs/panther/api/lambda/database/models" + "github.com/panther-labs/panther/pkg/awsbatch/s3batch" +) + +/* + This file has functions to create a bucket and a Panther JSON table, populated with a small amount of data. + There are also functions to clean up after running the tests. + + This can be used to drive Athena API related integration tests for packages that need example data. +*/ + +const ( + TestBucketPrefix = "panther-athena-api-processeddata-test-" + TestDb = "panther_athena_api_test_db" + TestTable = "panther_athena_test_table" +) + +var ( + TestBucket string + + TestPartitionTime = time.Date(2020, 3, 2, 1, 0, 0, 0, time.UTC) + + TestYear = fmt.Sprintf("%d", TestPartitionTime.Year()) + TestMonth = fmt.Sprintf("%02d", TestPartitionTime.Month()) + TestDay = fmt.Sprintf("%02d", TestPartitionTime.Day()) + TestHour = fmt.Sprintf("%02d", TestPartitionTime.Hour()) + + TestEventTime = TestPartitionTime.Format(`2006-01-02 15:04:05.000`) + + TestTableColumns = []*glue.Column{ + { + Name: aws.String("col1"), + Type: aws.String("int"), + Comment: aws.String("this is a column"), + }, + { + Name: aws.String("col2"), + Type: aws.String("int"), + Comment: aws.String("this is a column"), + }, + { + Name: aws.String("p_event_time"), + Type: aws.String("timestamp"), + Comment: aws.String("this is a panther column"), + }, + } + + YearPartitionName = "year" + MonthPartitionName = "month" + DayPartitionName = "day" + HourPartitionName = "hour" + + TestTablePartitions = []*glue.Column{ + { + Name: aws.String(YearPartitionName), + Type: aws.String("int"), + Comment: aws.String("this is the year"), + 
}, + { + Name: aws.String(MonthPartitionName), + Type: aws.String("int"), + Comment: aws.String("this is the month"), + }, + { + Name: aws.String(DayPartitionName), + Type: aws.String("int"), + Comment: aws.String("this is the day"), + }, + { + Name: aws.String(HourPartitionName), + Type: aws.String("int"), + Comment: aws.String("this is the hour"), + }, + } + + TestKey string + + TestTableDataNrows = 10 + TestTableRowTemplate = `{"col1": %d, "col2": null, "p_event_time": "%s"}` + TestTableRows []string +) + +func init() { + TestBucket = TestBucketPrefix + time.Now().Format("20060102150405") + + // make it look like log data + TestKey = "logs/aws_cloudtrail/" + TestKey += YearPartitionName + "=" + TestYear + "/" + TestKey += MonthPartitionName + "=" + TestMonth + "/" + TestKey += DayPartitionName + "=" + TestDay + "/" + TestKey += HourPartitionName + "=" + TestHour + "/" + TestKey += "testdata.json" + + for i := 0; i < TestTableDataNrows; i++ { + TestTableRows = append(TestTableRows, fmt.Sprintf(TestTableRowTemplate, i, TestEventTime)) + } +} + +// CheckTableDetail asserts that tables[0] describes TestTable: its name, its column count, and the name, type and description of each test column followed by each partition column, in order. +func CheckTableDetail(t *testing.T, tables []*models.TableDetail) { + // fail with a readable assertion instead of an index-out-of-range panic when no tables came back + require.NotEmpty(t, tables) + require.Equal(t, TestTable, tables[0].Name) + require.Equal(t, len(TestTableColumns)+len(TestTablePartitions), len(tables[0].Columns)) + + // col1 + require.Equal(t, *TestTableColumns[0].Name, tables[0].Columns[0].Name) + require.Equal(t, *TestTableColumns[0].Type, tables[0].Columns[0].Type) + require.Equal(t, *TestTableColumns[0].Comment, *tables[0].Columns[0].Description) + + // col2 + require.Equal(t, *TestTableColumns[1].Name, tables[0].Columns[1].Name) + require.Equal(t, *TestTableColumns[1].Type, tables[0].Columns[1].Type) + require.Equal(t, *TestTableColumns[1].Comment, *tables[0].Columns[1].Description) + + // p_event_time + require.Equal(t, *TestTableColumns[2].Name, tables[0].Columns[2].Name) + require.Equal(t, *TestTableColumns[2].Type, tables[0].Columns[2].Type) + require.Equal(t, *TestTableColumns[2].Comment, 
*tables[0].Columns[2].Description) + + // year + require.Equal(t, *TestTablePartitions[0].Name, tables[0].Columns[3].Name) + require.Equal(t, *TestTablePartitions[0].Type, tables[0].Columns[3].Type) + require.Equal(t, *TestTablePartitions[0].Comment, *tables[0].Columns[3].Description) + + // month + require.Equal(t, *TestTablePartitions[1].Name, tables[0].Columns[4].Name) + require.Equal(t, *TestTablePartitions[1].Type, tables[0].Columns[4].Type) + require.Equal(t, *TestTablePartitions[1].Comment, *tables[0].Columns[4].Description) + + // day + require.Equal(t, *TestTablePartitions[2].Name, tables[0].Columns[5].Name) + require.Equal(t, *TestTablePartitions[2].Type, tables[0].Columns[5].Type) + require.Equal(t, *TestTablePartitions[2].Comment, *tables[0].Columns[5].Description) + + // hour + require.Equal(t, *TestTablePartitions[3].Name, tables[0].Columns[6].Name) + require.Equal(t, *TestTablePartitions[3].Type, tables[0].Columns[6].Type) + require.Equal(t, *TestTablePartitions[3].Comment, *tables[0].Columns[6].Description) +} + +// SetupTables gives a test a clean fixture: it calls RemoveTables and then AddTables with the same clients. +func SetupTables(t *testing.T, glueClient glueiface.GlueAPI, s3Client s3iface.S3API) { + RemoveTables(t, glueClient, s3Client) // clear anything left over from an earlier run + AddTables(t, glueClient, s3Client) +} + +// AddTables creates the test bucket, Glue database and JSON table, uploads the test rows to TestKey and registers the matching partition; any AWS error fails the test immediately. +func AddTables(t *testing.T, glueClient glueiface.GlueAPI, s3Client s3iface.S3API) { + var err error + + bucketInput := &s3.CreateBucketInput{Bucket: aws.String(TestBucket)} + _, err = s3Client.CreateBucket(bucketInput) + require.NoError(t, err) + + dbInput := &glue.CreateDatabaseInput{ + DatabaseInput: &glue.DatabaseInput{ + Name: aws.String(TestDb), + }, + } + _, err = glueClient.CreateDatabase(dbInput) + require.NoError(t, err) + + storageDecriptor := &glue.StorageDescriptor{ // configure as JSON + Columns: TestTableColumns, + Location: aws.String("s3://" + TestBucket + "/"), + InputFormat: aws.String("org.apache.hadoop.mapred.TextInputFormat"), + OutputFormat: aws.String("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"), + SerdeInfo: 
&glue.SerDeInfo{ + SerializationLibrary: aws.String("org.openx.data.jsonserde.JsonSerDe"), + Parameters: map[string]*string{ + "serialization.format": aws.String("1"), + "case.insensitive": aws.String("TRUE"), // treat as lower case + }, + }, + } + + tableInput := &glue.CreateTableInput{ + DatabaseName: aws.String(TestDb), + TableInput: &glue.TableInput{ + Name: aws.String(TestTable), + PartitionKeys: TestTablePartitions, + StorageDescriptor: storageDecriptor, + TableType: aws.String("EXTERNAL_TABLE"), + }, + } + _, err = glueClient.CreateTable(tableInput) + require.NoError(t, err) + + putInput := &s3.PutObjectInput{ + Body: strings.NewReader(strings.Join(TestTableRows, "\n")), + Bucket: &TestBucket, + Key: &TestKey, + } + _, err = s3Client.PutObject(putInput) + require.NoError(t, err) + time.Sleep(time.Second / 4) // short pause since S3 is eventually consistent + + _, err = glueClient.CreatePartition(&glue.CreatePartitionInput{ + DatabaseName: aws.String(TestDb), + TableName: aws.String(TestTable), + PartitionInput: &glue.PartitionInput{ + StorageDescriptor: storageDecriptor, + Values: []*string{ + aws.String(TestYear), + aws.String(TestMonth), + aws.String(TestDay), + aws.String(TestHour), + }, + }, + }) + require.NoError(t, err) +} + +// RemoveTables deletes the test table, the test database and the test bucket, ignoring any errors along the way. +func RemoveTables(t *testing.T, glueClient glueiface.GlueAPI, s3Client s3iface.S3API) { + // best effort, no error checks + + tableInput := &glue.DeleteTableInput{ + DatabaseName: aws.String(TestDb), + Name: aws.String(TestTable), + } + glueClient.DeleteTable(tableInput) // nolint (errcheck) + + dbInput := &glue.DeleteDatabaseInput{ + Name: aws.String(TestDb), + } + glueClient.DeleteDatabase(dbInput) // nolint (errcheck) + + RemoveBucket(s3Client, TestBucket) +} + +// RemoveBucket deletes every object version and delete marker in bucketName and then the bucket itself. It is best effort: errors are not reported to the caller, and a missing bucket is treated as already removed. +func RemoveBucket(client s3iface.S3API, bucketName string) { + input := &s3.ListObjectVersionsInput{Bucket: &bucketName} + var objectVersions []*s3.ObjectIdentifier + + // List all object versions (including delete markers) + err := client.ListObjectVersionsPages(input, 
func(page *s3.ListObjectVersionsOutput, lastPage bool) bool { + for _, marker := range page.DeleteMarkers { + objectVersions = append(objectVersions, &s3.ObjectIdentifier{ + Key: marker.Key, VersionId: marker.VersionId}) + } + + for _, version := range page.Versions { + objectVersions = append(objectVersions, &s3.ObjectIdentifier{ + Key: version.Key, VersionId: version.VersionId}) + } + // keep paging: returning false here would stop the listing after the first page + return true + }) + if err != nil { + if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "NoSuchBucket" { + return + } + } + + err = s3batch.DeleteObjects(client, 2*time.Minute, &s3.DeleteObjectsInput{ + Bucket: &bucketName, + Delete: &s3.Delete{Objects: objectVersions}, + }) + if err != nil { + return + } + time.Sleep(time.Second / 4) // short pause since S3 is eventually consistent to avoid next call from failing + if _, err = client.DeleteBucket(&s3.DeleteBucketInput{Bucket: &bucketName}); err != nil { + return + } +} diff --git a/pkg/awsglue/glue.go b/pkg/awsglue/glue.go index 44e2c73cee..3e2b634e83 100644 --- a/pkg/awsglue/glue.go +++ b/pkg/awsglue/glue.go @@ -50,6 +50,15 @@ const ( ViewsDatabaseDescription = "Holds views useful for querying Panther data" ) +var ( + // PantherDatabases is exposed as public var to allow code to get/lookup the Panther databases + PantherDatabases = map[string]string{ + LogProcessingDatabaseName: LogProcessingDatabaseDescription, + RuleMatchDatabaseName: RuleMatchDatabaseDescription, + ViewsDatabaseName: ViewsDatabaseDescription, + } +) + type PartitionKey struct { Name string Type string diff --git a/tools/mage/deploy.go b/tools/mage/deploy.go index 0d8bebdb37..aec859eb00 100644 --- a/tools/mage/deploy.go +++ b/tools/mage/deploy.go @@ -199,17 +199,8 @@ func bootstrap(awsSession *session.Session, settings *config.PantherConfig) map[ // Deploy first bootstrap stack go func() { - // the example yml has an empty string to make it clear it is a list, remove empty strings - var sanitizedLogSubscriptionArns []string - for _, arn := range 
settings.Setup.LogSubscriptions.PrincipalARNs { - if arn == "" { - continue - } - sanitizedLogSubscriptionArns = append(sanitizedLogSubscriptionArns, arn) - } - params := map[string]string{ - "LogSubscriptionPrincipals": strings.Join(sanitizedLogSubscriptionArns, ","), + "LogSubscriptionPrincipals": strings.Join(settings.Setup.LogSubscriptions.PrincipalARNs, ","), "EnableS3AccessLogs": strconv.FormatBool(settings.Setup.EnableS3AccessLogs), "AccessLogsBucket": settings.Setup.S3AccessLogsBucket, "CertificateArn": certificateArn(awsSession, settings), @@ -413,6 +404,9 @@ func deployMainStacks(awsSession *session.Session, settings *config.PantherConfi go func(result chan string) { deployTemplate(awsSession, logAnalysisTemplate, sourceBucket, logAnalysisStack, map[string]string{ "AnalysisApiId": outputs["AnalysisApiId"], + "AthenaResultsBucket": outputs["AthenaResultsBucket"], + "GraphQLApiEndpoint": outputs["GraphQLApiEndpoint"], + "GraphQLApiId": outputs["GraphQLApiId"], "ProcessedDataBucket": outputs["ProcessedDataBucket"], "ProcessedDataTopicArn": outputs["ProcessedDataTopicArn"], "PythonLayerVersionArn": outputs["PythonLayerVersionArn"], diff --git a/web/__generated__/schema.tsx b/web/__generated__/schema.tsx index 1f5e42de39..b7849c8e38 100644 --- a/web/__generated__/schema.tsx +++ b/web/__generated__/schema.tsx @@ -502,6 +502,7 @@ export type Mutation = { deleteRule?: Maybe; deleteUser?: Maybe; inviteUser: User; + queryDone: QueryDone; remediateResource?: Maybe; resetUserPassword: User; suppressPolicies?: Maybe; @@ -564,6 +565,10 @@ export type MutationInviteUserArgs = { input?: Maybe; }; +export type MutationQueryDoneArgs = { + input: QueryDoneInput; +}; + export type MutationRemediateResourceArgs = { input: RemediateResourceInput; }; @@ -820,6 +825,19 @@ export type QueryRulesArgs = { input?: Maybe; }; +export type QueryDone = { + __typename?: 'QueryDone'; + userData: Scalars['String']; + queryId: Scalars['String']; + workflowId: Scalars['String']; +}; + 
+export type QueryDoneInput = { + userData: Scalars['String']; + queryId: Scalars['String']; + workflowId: Scalars['String']; +}; + export type RemediateResourceInput = { policyId: Scalars['ID']; resourceId: Scalars['ID']; @@ -939,6 +957,15 @@ export type SqsConfigInput = { queueUrl: Scalars['String']; }; +export type Subscription = { + __typename?: 'Subscription'; + queryDone?: Maybe; +}; + +export type SubscriptionQueryDoneArgs = { + userData: Scalars['String']; +}; + export type SuppressPoliciesInput = { policyIds: Array>; resourcePatterns: Array>; @@ -1190,6 +1217,8 @@ export type ResolversTypes = { DeleteRuleInput: DeleteRuleInput; DeleteRuleInputItem: DeleteRuleInputItem; InviteUserInput: InviteUserInput; + QueryDoneInput: QueryDoneInput; + QueryDone: ResolverTypeWrapper; RemediateResourceInput: RemediateResourceInput; SuppressPoliciesInput: SuppressPoliciesInput; TestPolicyInput: TestPolicyInput; @@ -1202,6 +1231,7 @@ export type ResolversTypes = { UpdateUserInput: UpdateUserInput; UploadPoliciesInput: UploadPoliciesInput; UploadPoliciesResponse: ResolverTypeWrapper; + Subscription: ResolverTypeWrapper<{}>; AccountTypeEnum: AccountTypeEnum; }; @@ -1300,6 +1330,8 @@ export type ResolversParentTypes = { DeleteRuleInput: DeleteRuleInput; DeleteRuleInputItem: DeleteRuleInputItem; InviteUserInput: InviteUserInput; + QueryDoneInput: QueryDoneInput; + QueryDone: QueryDone; RemediateResourceInput: RemediateResourceInput; SuppressPoliciesInput: SuppressPoliciesInput; TestPolicyInput: TestPolicyInput; @@ -1312,6 +1344,7 @@ export type ResolversParentTypes = { UpdateUserInput: UpdateUserInput; UploadPoliciesInput: UploadPoliciesInput; UploadPoliciesResponse: UploadPoliciesResponse; + Subscription: {}; AccountTypeEnum: AccountTypeEnum; }; @@ -1701,6 +1734,12 @@ export type MutationResolvers< ContextType, RequireFields >; + queryDone?: Resolver< + ResolversTypes['QueryDone'], + ParentType, + ContextType, + RequireFields + >; remediateResource?: Resolver< Maybe, 
ParentType, @@ -2027,6 +2066,16 @@ export type QueryResolvers< users?: Resolver, ParentType, ContextType>; }; +export type QueryDoneResolvers< + ContextType = any, + ParentType extends ResolversParentTypes['QueryDone'] = ResolversParentTypes['QueryDone'] +> = { + userData?: Resolver; + queryId?: Resolver; + workflowId?: Resolver; + __isTypeOf?: isTypeOfResolverFn; +}; + export type ResourceDetailsResolvers< ContextType = any, ParentType extends ResolversParentTypes['ResourceDetails'] = ResolversParentTypes['ResourceDetails'] @@ -2146,6 +2195,19 @@ export type SqsConfigResolvers< __isTypeOf?: isTypeOfResolverFn; }; +export type SubscriptionResolvers< + ContextType = any, + ParentType extends ResolversParentTypes['Subscription'] = ResolversParentTypes['Subscription'] +> = { + queryDone?: SubscriptionResolver< + Maybe, + 'queryDone', + ParentType, + ContextType, + RequireFields + >; +}; + export type TestPolicyResponseResolvers< ContextType = any, ParentType extends ResolversParentTypes['TestPolicyResponse'] = ResolversParentTypes['TestPolicyResponse'] @@ -2226,6 +2288,7 @@ export type Resolvers = { PolicyUnitTest?: PolicyUnitTestResolvers; PolicyUnitTestError?: PolicyUnitTestErrorResolvers; Query?: QueryResolvers; + QueryDone?: QueryDoneResolvers; ResourceDetails?: ResourceDetailsResolvers; ResourceSummary?: ResourceSummaryResolvers; RuleDetails?: RuleDetailsResolvers; @@ -2235,6 +2298,7 @@ export type Resolvers = { SlackConfig?: SlackConfigResolvers; SnsConfig?: SnsConfigResolvers; SqsConfig?: SqsConfigResolvers; + Subscription?: SubscriptionResolvers; TestPolicyResponse?: TestPolicyResponseResolvers; UploadPoliciesResponse?: UploadPoliciesResponseResolvers; User?: UserResolvers;