Commit
Athena Lambda API (#647)
* Checkpoint work

* Checkpoint work

* Checkpoint work

* Checkpoint work

* Checkpoint work

* Checkpoint work

* Checkpoint work

* Checkpoint work

* Checkpoint work

* Checkpoint work

* mage fmt fix

* Make athena api json tag camel case

* Rename DoQuery to ExecuteQuery and StartQuery to ExecuteAsyncQuery

* Make athena api optional attributes pointers

* Move HavingData in athena api to table listing call from table detail call

* Remove wrapped panther athena api functions, api methods now execute code directly

* Change Done() method to IsFinished() for clarity in awsathena pkg

* Refine error handling in athena api lambda, add test

* remove input structs from output structs in athena api

* Use genericapi.Invoke instead of dup in testutils

* Do not wait any time in the athena async call for fast results (pure async)

* Small tidies from PR

* Change Error struct to QueryError in athena api and use only specifically

* Simplify dealing with pagination token in athena api

* Simplify pkg/awsathena into a library of mockable functions

* Update api names per requests in PR and add scaffolding for steps integration

* Add calling sql to output for get results/status

* Minor tidies based on PR feedback

* Checkpoint work adding athena step function

* Rename athena step function

* Add support to generate alarms for step function errors

* Fix bug in sfn alarm generation

* Checkpoint notification work

* Add appsync support to receive notifications from athena query completions

* Make getTables() return full schema for tables

* Tighten athena api lambda permissions

* Pass caller-supplied token thru athena workflow to notification

* End to end appsync subscriptions working!

* Update integration test comments

* Add delay at start of athena step function state machine

* Refactor athena api integration tests

* Refactor athena api integration tests (this time for real)

* Remove QueryID from results struct in athena_api

* Update graphQL schema to make userData filter on subscription mandatory on queryDone()

* Add support for canceling queries in athena api

* Improve handling of sql errors in athena_api

* Refactor structs in api s.t. no json tag is used more than 1 time

* Add query stats (execution time & data scanned) to status output

* Add columnInfo to athena_api results and remove first returned row

* Add GetQueryResultsLink to athena_api to download as csv

* Add integration tests to confirm mutating sql statements fail

* Allow customers to configure Athena access for their non-Panther buckets for the UI

* Light refactor of glue functions in athena_api

* Fix messed up license headers and minor formatting changes

* Fix more license headers

* Handle NULL column values back from queries

* Update comments in the athena_api

* Tidy appsync subscription/notification code

* Fix typo in comment

* Simplify athena api state machine

* Improve athena_api integration tests

* Add userId as an optional attr when starting an athena query (for logging)

* Add queryId and userId logging for traceability and metrics

* More logging refinement to make sure we can trace queries easily

* Minor tidies

* Minor code clean up + make GetQueryResultsLink() return status in output

* Update comment

* Checkpoint work

* Make showing non-Panther tables in SQL Shell configurable

* Update some comments

* Move athena api functions into individual files

* Add usage examples in comments for PrincipalARNs and S3ARNs in panther_config.yml

* Change core stack parameter name PantherTablesOnly to AthenaPantherTablesOnly

* Minor style changes

* Revert change to log level in genericapi invoke

* Change athena_api integration tests from using t.Run() to top level tests

* Move athena api related cloudformation from core stack to log_analysis stack

* Change athena api to use envconfig

* Use GetAtt instead of Sub for AthenaStepFunctionRole to ensure proper dependency

* Remove config option for users to onboard their own buckets to athena api. We may want later, but complicates CF and not needed now

* Refine athena_api s3 perms

* Add required/default tags to athena_api envconfig struct

* Add athena_api integration test to test s3 perms for panther tables

* Make result columns ptrs so null values are nil

* When looking up column names, use json attr mapping table if possible

* Respond to PR: pass step function arn into lambda rather than generating it

* Fix typo in query status cancelled

* Respond to PR: add comments explaining each api endpoint, elaborate in comment about IAM appsync perms

* Make the default for listing tables to only show populated tables

* Change unPopulated to IncludePopulatedTablesOnly in listing tables to make boolean positive
rleighton authored Apr 22, 2020
1 parent ec23c4c commit d81bdd6
Showing 29 changed files with 2,837 additions and 27 deletions.
18 changes: 18 additions & 0 deletions api/graphql/schema.graphql
@@ -6,6 +6,7 @@ scalar AWSJSON
schema {
query: Query
mutation: Mutation
subscription: Subscription
}

type Mutation {
@@ -21,6 +22,7 @@ type Mutation {
deleteRule(input: DeleteRuleInput!): Boolean
deleteUser(id: ID!): Boolean
inviteUser(input: InviteUserInput): User!
queryDone(input: QueryDoneInput!): QueryDone! @aws_iam
remediateResource(input: RemediateResourceInput!): Boolean
resetUserPassword(id: ID!): User!
suppressPolicies(input: SuppressPoliciesInput!): Boolean
@@ -62,6 +64,10 @@ type Query {
users: [User!]!
}

type Subscription {
queryDone(userData: String!): QueryDone @aws_subscribe(mutations: ["queryDone"]) @aws_iam
}

input GetComplianceIntegrationTemplateInput {
awsAccountId: String!
integrationLabel: String!
@@ -713,6 +719,18 @@ input InviteUserInput {
email: AWSEmail
}

input QueryDoneInput {
userData: String!
queryId: String!
workflowId: String!
}

type QueryDone @aws_iam {
userData: String!
queryId: String!
workflowId: String!
}

enum ComplianceStatusEnum {
ERROR
FAIL
254 changes: 254 additions & 0 deletions api/lambda/database/models/api.go
@@ -0,0 +1,254 @@
package models

/**
* Panther is a Cloud-Native SIEM for the Modern Security Team.
* Copyright (C) 2020 Panther Labs Inc
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

// NOTE: different kinds of databases (e.g., Athena, Snowflake) will use different endpoints (lambda functions), same api.

// NOTE: if a json tag _set_ is used more than once it is factored into a struct to avoid inconsistencies

const (
QuerySucceeded = "succeeded"
QueryFailed = "failed"
QueryRunning = "running"
QueryCancelled = "cancelled"
)

// LambdaInput is the collection of all possible args to the Lambda function.
type LambdaInput struct {
// run a query, returning immediately with an id for the running query
ExecuteAsyncQuery *ExecuteAsyncQueryInput `json:"executeAsyncQuery"`
// run a query, returning immediately with an id for the step function running the query (will invoke lambda callback when done)
ExecuteAsyncQueryNotify *ExecuteAsyncQueryNotifyInput `json:"executeAsyncQueryNotify"`
// run a query, waiting for results
ExecuteQuery *ExecuteQueryInput `json:"executeQuery"`
// list databases
GetDatabases *GetDatabasesInput `json:"getDatabases"`
// given a query id, return paged results
GetQueryResults *GetQueryResultsInput `json:"getQueryResults"`
// given a query id, return a presigned s3 link to the results
GetQueryResultsLink *GetQueryResultsLinkInput `json:"getQueryResultsLink"`
// given a query id, return the status of the query
GetQueryStatus *GetQueryStatusInput `json:"getQueryStatus"`
// given a database, list tables
GetTables *GetTablesInput `json:"getTables"`
// given a database and list of table names, return the details (columns) for those tables
GetTablesDetail *GetTablesDetailInput `json:"getTablesDetail"`
// given a lambda and method, execute callback for step function
InvokeNotifyLambda *InvokeNotifyLambdaInput `json:"invokeNotifyLambda"`
// used as a callback lambda, will notify appsync that a UI query is complete
NotifyAppSync *NotifyAppSyncInput `json:"notifyAppSync"`
// given a query id, cancel query
StopQuery *StopQueryInput `json:"stopQuery"`
}

type GetDatabasesInput struct {
OptionalName // if nil get all databases
}

// NOTE: we will assume this is small and not paginate
type GetDatabasesOutput struct {
Databases []*NameAndDescription `json:"databases,omitempty"`
}

type GetTablesInput struct {
Database
IncludePopulatedTablesOnly *bool `json:"includePopulatedTablesOnly,omitempty"` // if true OR nil, return only tables that have data
}

// NOTE: we will assume this is small and not paginate
type GetTablesOutput struct {
TablesDetail
}

type TablesDetail struct {
Tables []*TableDetail `json:"tables"`
}

type TableDetail struct {
TableDescription
Columns []*TableColumn `json:"columns"`
}

type TableDescription struct {
Database
NameAndDescription
}

type GetTablesDetailInput struct {
Database
Names []string `json:"names" validate:"required"`
}

// NOTE: we will assume this is small and not paginate
type GetTablesDetailOutput struct {
TablesDetail
}

type TableColumn struct {
NameAndDescription
Type string `json:"type" validate:"required"`
}

type ExecuteAsyncQueryNotifyInput struct {
ExecuteAsyncQueryInput
LambdaInvoke
UserPassThruData
DelaySeconds int `json:"delaySeconds"` // wait this long before starting workflow (default 0)
}

type ExecuteAsyncQueryNotifyOutput struct {
Workflow
}

// Blocking query
type ExecuteQueryInput = ExecuteAsyncQueryInput

type ExecuteQueryOutput = GetQueryResultsOutput // call GetQueryResults() to page through results

type ExecuteAsyncQueryInput struct {
Database
SQLQuery
UserID *string `json:"userId,omitempty"`
}

type ExecuteAsyncQueryOutput struct {
QueryStatus
QueryInfo
}

type GetQueryStatusInput = QueryInfo

type GetQueryStatusOutput struct {
QueryStatus
SQLQuery
Stats *QueryResultsStats `json:"stats,omitempty"` // present only on successful queries
}

type GetQueryResultsInput struct {
QueryInfo
Pagination
PageSize *int64 `json:"pageSize" validate:"omitempty,gt=1,lt=1000"` // only return this many rows per call
// NOTE: gt=1 above to ensure there are results on the first page w/header. If PageSize = 1 then
// user will get no rows for the first page with Athena because Athena returns header as first row and we remove it.
}

type GetQueryResultsOutput struct {
GetQueryStatusOutput
ColumnInfo []*Column `json:"columnInfo" validate:"required"`
ResultsPage QueryResultsPage `json:"resultsPage" validate:"required"`
}

type QueryResultsPage struct {
Pagination
NumRows int `json:"numRows" validate:"required"` // number of rows in page of results, len(Rows)
Rows []*Row `json:"rows" validate:"required"`
}

type QueryResultsStats struct {
ExecutionTimeMilliseconds int64 `json:"executionTimeMilliseconds" validate:"required"`
DataScannedBytes int64 `json:"dataScannedBytes" validate:"required"`
}

type GetQueryResultsLinkInput struct {
QueryInfo
}

type GetQueryResultsLinkOutput struct {
QueryStatus
PresignedLink string `json:"presignedLink"` // presigned s3 link to results
}

type StopQueryInput = QueryInfo

type StopQueryOutput = GetQueryStatusOutput

type InvokeNotifyLambdaInput struct {
LambdaInvoke
QueryInfo
Workflow
UserPassThruData
}

type InvokeNotifyLambdaOutput = InvokeNotifyLambdaInput // so input can be confirmed

type NotifyAppSyncInput struct {
NotifyInput
}

type NotifyAppSyncOutput struct {
StatusCode int `json:"statusCode" validate:"required"` // the http status returned from POSTing callback to appsync
}

type NotifyInput struct { // notify lambdas need to have this as input
GetQueryStatusInput
ExecuteAsyncQueryNotifyOutput
UserPassThruData
}

type NameAndDescription struct {
Name string `json:"name" validate:"required"`
Description *string `json:"description,omitempty"`
}

type OptionalName struct {
Name *string `json:"name,omitempty"`
}

type SQLQuery struct {
SQL string `json:"sql" validate:"required"`
}

type QueryInfo struct {
QueryID string `json:"queryId" validate:"required"`
}

type Database struct {
DatabaseName string `json:"databaseName" validate:"required"`
}

type Row struct {
Columns []*Column `json:"columns" validate:"required"`
}

type Column struct {
Value *string `json:"value"` // NULL values are nil
Type *string `json:"type,omitempty"`
}

type Pagination struct {
PaginationToken *string `json:"paginationToken,omitempty"`
}

type QueryStatus struct {
Status string `json:"status" validate:"required,oneof=running succeeded failed cancelled"` // oneof values are space separated and must match the Query* constants
SQLError string `json:"sqlError,omitempty"`
}

type Workflow struct {
WorkflowID string `json:"workflowId" validate:"required"`
}

type UserPassThruData struct {
UserData string `json:"userData" validate:"required,gt=0"` // token passed through to notifications (usually the userid)
}

type LambdaInvoke struct {
LambdaName string `json:"lambdaName" validate:"required"` // the name of the lambda to call when done
MethodName string `json:"methodName" validate:"required"` // the method to call on the lambda
}
24 changes: 24 additions & 0 deletions deployments/appsync.yml
@@ -159,6 +159,14 @@ Resources:
SigningServiceName: execute-api
Endpoint: !Ref RemediationApi

  QueryDoneDataSource:
    Type: AWS::AppSync::DataSource
    Properties:
      ApiId: !Ref ApiId
      Name: PantherQueryDone
      Type: NONE
      ServiceRoleArn: !Ref ServiceRole

########## Resolvers ##########

ResetUserPasswordResolver:
@@ -1492,3 +1500,19 @@ Resources:
#else
$util.error($ctx.result.body, "$statusCode", $input)
#end
  QueryDoneResolver:
    Type: AWS::AppSync::Resolver
    DependsOn: GraphQLSchema
    Properties:
      ApiId: !Ref ApiId
      TypeName: Mutation
      FieldName: queryDone # noop that returns {userData,queryId,workflowId} (used for subscriptions to trigger async query completion)
      DataSourceName: !GetAtt QueryDoneDataSource.Name
      RequestMappingTemplate: |
        {
          "version" : "2017-02-28",
          "payload": $utils.toJson($context.arguments.input)
        }
      ResponseMappingTemplate: |
        $util.toJson($context.result)
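Since the resolver is a pass-through, the only thing a caller supplies is the `queryDone` mutation itself. The following is a rough sketch of the GraphQL request body a notifying Lambda might POST to AppSync, assuming the standard `query` plus `variables` JSON shape. The names `graphQLRequest`, `queryDoneMutation`, and `queryDoneBody` are hypothetical, and the IAM request signing that the `@aws_iam` directive implies is deliberately left out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// QueryDoneInput mirrors the QueryDoneInput type in schema.graphql.
type QueryDoneInput struct {
	UserData   string `json:"userData"`
	QueryID    string `json:"queryId"`
	WorkflowID string `json:"workflowId"`
}

// graphQLRequest (hypothetical name) is the usual GraphQL-over-HTTP body:
// a query document plus its variables.
type graphQLRequest struct {
	Query     string                 `json:"query"`
	Variables map[string]interface{} `json:"variables"`
}

// queryDoneMutation selects the three fields that the noop resolver echoes back.
const queryDoneMutation = `mutation QueryDone($input: QueryDoneInput!) { queryDone(input: $input) { userData queryId workflowId } }`

// queryDoneBody (hypothetical helper) builds the JSON body for the mutation.
// ASSUMPTION: the real callback must also sign the HTTP request for IAM auth;
// that step is not shown here.
func queryDoneBody(in QueryDoneInput) ([]byte, error) {
	return json.Marshal(graphQLRequest{
		Query:     queryDoneMutation,
		Variables: map[string]interface{}{"input": in},
	})
}

func main() {
	body, err := queryDoneBody(QueryDoneInput{
		UserData:   "user-1", // made-up values for illustration
		QueryID:    "q-1",
		WorkflowID: "wf-1",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```

Subscribers to `queryDone(userData: ...)` would then receive the echoed fields only when their `userData` argument matches the value sent in the mutation, which is why the token is passed through the whole workflow.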
2 changes: 2 additions & 0 deletions deployments/bootstrap.yml
@@ -702,6 +702,8 @@ Resources:
# * The Panther user interface will show errors.
# </cfndoc>
      AuthenticationType: AMAZON_COGNITO_USER_POOLS
      AdditionalAuthenticationProviders:
        - AuthenticationType: AWS_IAM # this is used for lambda callbacks to AppSync (e.g., to signal Athena query completion)
      UserPoolConfig:
        AwsRegion: !Ref AWS::Region
        UserPoolId: !Ref UserPool