Skip to content

Commit

Permalink
Add tweet webhook (#24)
Browse files Browse the repository at this point in the history
* Add tweet webhook

* Move
  • Loading branch information
Chrisbattarbee authored Apr 4, 2024
1 parent 709f306 commit 16cc82d
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 20 deletions.
11 changes: 7 additions & 4 deletions common/jobs/riverclient/river.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,31 @@ import (
"context"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/metoro-io/statusphere/common/db"
"github.com/metoro-io/statusphere/common/jobs/slack_webhook"
"github.com/metoro-io/statusphere/common/jobs/twitter_post"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivermigrate"
"go.uber.org/zap"
"net/http"
)

func spawnWorkers(logger *zap.Logger, client *http.Client) *river.Workers {
func spawnWorkers(logger *zap.Logger, client *http.Client, dbClient *db.DbClient) *river.Workers {
workers := river.NewWorkers()
river.AddWorker(workers, slack_webhook.NewSlackWebhookWorker(logger, client))
river.AddWorker(workers, twitter_post.NewTwitterPostWorker(logger, client, dbClient))
return workers
}

func NewRiverClient(pool *pgxpool.Pool, logger *zap.Logger, client *http.Client, numWorkers int) (*river.Client[pgx.Tx], error) {
return river.NewClient(riverpgxv5.New(pool), &river.Config{
func NewRiverClient(dbClient *db.DbClient, logger *zap.Logger, client *http.Client, numWorkers int) (*river.Client[pgx.Tx], error) {
return river.NewClient(riverpgxv5.New(dbClient.PgxPool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {
MaxWorkers: numWorkers,
},
},
Workers: spawnWorkers(logger, client),
Workers: spawnWorkers(logger, client, dbClient),
})
}

Expand Down
21 changes: 21 additions & 0 deletions common/jobs/twitter_post/twitter_post_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package twitter_post

import (
"github.com/metoro-io/statusphere/common/api"
"github.com/riverqueue/river"
)

type TwitterPostArgs struct {
Incident api.Incident `json:"incident"`
WebhookUrl string `json:"webhook_url"`
}

func (TwitterPostArgs) Kind() string {
return "twitter_post"
}

func (TwitterPostArgs) InsertOpts() river.InsertOpts {
return river.InsertOpts{
MaxAttempts: 10,
}
}
85 changes: 85 additions & 0 deletions common/jobs/twitter_post/twitter_post_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package twitter_post

import (
"bytes"
"context"
"fmt"
"github.com/metoro-io/statusphere/common/api"
"github.com/metoro-io/statusphere/common/db"
"github.com/pkg/errors"
"github.com/riverqueue/river"
"go.uber.org/zap"
"math"
"net/http"
"time"
)

type TwitterPostWorker struct {
// An embedded WorkerDefaults sets up default methods to fulfill the rest of
// the Worker interface:
river.WorkerDefaults[TwitterPostArgs]
logger *zap.Logger
httpClient *http.Client
db *db.DbClient
}

func NewTwitterPostWorker(logger *zap.Logger, httpClient *http.Client, dbClient *db.DbClient) *TwitterPostWorker {
return &TwitterPostWorker{
logger: logger,
httpClient: httpClient,
db: dbClient,
}
}

func (w *TwitterPostWorker) Work(ctx context.Context, job *river.Job[TwitterPostArgs]) error {
w.logger.Info("Sending slack webhook", zap.Any("incident", job.Args.Incident))
if job.Args.WebhookUrl == "" {
w.logger.Error("webhook url is empty")
return nil
}
tweet, err := generateTweet(w.db, job.Args.Incident)
if err != nil {
return errors.Wrap(err, "failed to generate tweet")
}
postBody := fmt.Sprintf(`{'tweet': '%s'}`, string(tweet))
req, err := http.NewRequest("POST", job.Args.WebhookUrl, bytes.NewBuffer([]byte(postBody)))
if err != nil {
return errors.Wrap(err, "failed to create request")
}
req.Header.Set("Content-Type", "application/json")
resp, err := w.httpClient.Do(req)
if err != nil {
return errors.Wrap(err, "failed to send request")
}
if resp.StatusCode != http.StatusOK {
return errors.Errorf("expected status code 200, got %d", resp.StatusCode)
}
return nil
}

func generateTweet(db *db.DbClient, incident api.Incident) (string, error) {
// Get the status page of the incident
statusPage, err := db.GetStatusPage(context.Background(), incident.StatusPageUrl)
if err != nil {
return "", errors.Wrap(err, "failed to get status page")
}
// Tweet format
// {Status page Name} Incident
// {Incident Title}
// {Incident Description}
// {Incident Deep Link}
// https://metoro.io/statusphere/status/{statusPageName}

tweet := fmt.Sprintf("%s Incident\n%s\n%s\n%s\nhttps://metoro.io/statusphere/status/%s", statusPage.Name, incident.Title, incident.Description, incident.DeepLink, statusPage.Name)

return tweet, nil
}

func (w *TwitterPostWorker) Timeout(job *river.Job[TwitterPostArgs]) time.Duration {
return time.Minute * 5
}

// NextRetry performs exponential backoff with a maximum of 120 seconds.
func (w *TwitterPostWorker) NextRetry(job *river.Job[TwitterPostArgs]) time.Time {
return time.Now().Add(time.Duration(math.Min(math.Pow(2.0, float64(job.Attempt)), 120)) * time.Second)
}
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@ require (
github.com/PuerkitoBio/goquery v1.9.1 // indirect
github.com/andybalholm/cascadia v1.3.2 // indirect
github.com/bytedance/sonic v1.11.3 // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
github.com/chenzhuoyu/iasm v0.9.1 // indirect
github.com/dghubble/go-twitter v0.0.0-20221104224141-912508c3888b // indirect
github.com/dghubble/oauth1 v0.7.3 // indirect
github.com/dghubble/sling v1.4.0 // indirect
github.com/g8rswimmer/go-twitter/v2 v2.1.5 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/gin-contrib/cors v1.7.1 // indirect
github.com/gin-contrib/gzip v1.0.0 // indirect
Expand All @@ -17,6 +22,7 @@ require (
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.19.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.5.5 // indirect
Expand Down
13 changes: 13 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZX
github.com/bytedance/sonic v1.10.0-rc/go.mod h1:ElCzW+ufi8qKqNW0FY314xriJhyJhuoJ3gFZdAHF7NM=
github.com/bytedance/sonic v1.11.3 h1:jRN+yEjakWh8aK5FzrciUHG8OFXK+4/KrAX/ysEtHAA=
github.com/bytedance/sonic v1.11.3/go.mod h1:iZcSUejdk5aukTND/Eu/ivjQuEL0Cu9/rf50Hi0u/g4=
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk=
Expand All @@ -19,6 +21,14 @@ github.com/chenzhuoyu/iasm v0.9.1/go.mod h1:Xjy2NpN3h7aUqeqM+woSuuvxmIe6+DDsiNLI
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dghubble/go-twitter v0.0.0-20221104224141-912508c3888b h1:XQu6o3AwJx/jsg9LZ41uIeUdXK5be099XFfFn6H9ikk=
github.com/dghubble/go-twitter v0.0.0-20221104224141-912508c3888b/go.mod h1:B0/qdW5XUupJvcsx40hnVbfjzz9He5YpYXx6eVVdiSY=
github.com/dghubble/oauth1 v0.7.3 h1:EkEM/zMDMp3zOsX2DC/ZQ2vnEX3ELK0/l9kb+vs4ptE=
github.com/dghubble/oauth1 v0.7.3/go.mod h1:oxTe+az9NSMIucDPDCCtzJGsPhciJV33xocHfcR2sVY=
github.com/dghubble/sling v1.4.0 h1:/n8MRosVTthvMbwlNZgLx579OGVjUOy3GNEv5BIqAWY=
github.com/dghubble/sling v1.4.0/go.mod h1:0r40aNsU9EdDUVBNhfCstAtFgutjgJGYbO1oNzkMoM8=
github.com/g8rswimmer/go-twitter/v2 v2.1.5 h1:Uj9Yuof2UducrP4Xva7irnUJfB9354/VyUXKmc2D5gg=
github.com/g8rswimmer/go-twitter/v2 v2.1.5/go.mod h1:/55xWb313KQs25X7oZrNSEwLQNkYHhPsDwFstc45vhc=
github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU=
github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA=
github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0=
Expand All @@ -42,7 +52,10 @@ github.com/go-playground/validator/v10 v10.19.0/go.mod h1:dbuPbCMFw/DrkbEynArYaC
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8=
github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
Expand Down
3 changes: 2 additions & 1 deletion jobrunner/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package config
import "github.com/kelseyhightower/envconfig"

type Config struct {
SlackWebhookUrl string `envconfig:"SLACK_WEBHOOK_URL"`
SlackWebhookUrl string `envconfig:"SLACK_WEBHOOK_URL"`
TwitterWebhookUrl string `envconfig:"TWITTER_WEBHOOK_URL"`
}

func GetConfigFromEnvironment() (Config, error) {
Expand Down
48 changes: 35 additions & 13 deletions jobrunner/internal/incidentpoller/incidentpoller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package incidentpoller
import (
"context"
"github.com/jackc/pgx/v5"
"github.com/metoro-io/statusphere/common/api"
"github.com/metoro-io/statusphere/common/db"
"github.com/metoro-io/statusphere/common/jobs/slack_webhook"
"github.com/metoro-io/statusphere/common/jobs/twitter_post"
"github.com/pkg/errors"
"github.com/riverqueue/river"
"go.uber.org/zap"
Expand All @@ -14,18 +16,20 @@ import (
// IncidentPoller is a poller that polls incidents from the database, if the jobs have not been started for them, then it starts them

type IncidentPoller struct {
db *db.DbClient
logger *zap.Logger
riverClient *river.Client[pgx.Tx]
slackWebhookUrl string
db *db.DbClient
logger *zap.Logger
riverClient *river.Client[pgx.Tx]
slackWebhookUrl string
twitterWebhookUrl string
}

func NewIncidentPoller(db *db.DbClient, logger *zap.Logger, client *river.Client[pgx.Tx], webhookUrl string) *IncidentPoller {
func NewIncidentPoller(db *db.DbClient, logger *zap.Logger, client *river.Client[pgx.Tx], slackWebhookUrl string, twitterWebhookUrl string) *IncidentPoller {
return &IncidentPoller{
db: db,
logger: logger,
riverClient: client,
slackWebhookUrl: webhookUrl,
db: db,
logger: logger,
riverClient: client,
slackWebhookUrl: slackWebhookUrl,
twitterWebhookUrl: twitterWebhookUrl,
}
}

Expand Down Expand Up @@ -64,11 +68,9 @@ func (p *IncidentPoller) pollInner() error {
p.logger.Info("found incidents without jobs started", zap.Int("count", len(incidents)))

jobArgs := make([]river.InsertManyParams, 0)
for _, incident := range incidents {
if p.slackWebhookUrl == "" {
continue
}

var incidentsToProcess = make([]api.Incident, 0)
for _, incident := range incidents {
// We only want to notify about incidents that have started in the last hour
// Otherwise, we will be sending notifications for incidents that have already been resolved
if incident.StartTime.Before(time.Now().Add(-1 * time.Hour)) {
Expand All @@ -78,12 +80,32 @@ func (p *IncidentPoller) pollInner() error {
if incident.Impact == "maintenance" {
continue
}

incidentsToProcess = append(incidentsToProcess, incident)
}

// Slack webhook notifications
for _, incident := range incidentsToProcess {\
if p.slackWebhookUrl == "" {
continue
}
jobArgs = append(jobArgs, river.InsertManyParams{Args: slack_webhook.SlackWebhookArgs{
Incident: incident,
WebhookUrl: p.slackWebhookUrl,
}})
}

// Twitter post notifications
for _, incident := range incidentsToProcess {
if p.twitterWebhookUrl == "" {
continue
}
jobArgs = append(jobArgs, river.InsertManyParams{Args: twitter_post.TwitterPostArgs{
WebhookUrl: p.twitterWebhookUrl,
Incident: incident,
}})
}

p.logger.Info("starting to insert jobs", zap.Int("count", len(jobArgs)))
// Start the jobs for each incident and update the database
if len(jobArgs) != 0 {
Expand Down
4 changes: 2 additions & 2 deletions jobrunner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func main() {

err = riverclient.RunMigration(db.PgxPool)

client, err := riverclient.NewRiverClient(db.PgxPool, logger, http.DefaultClient, 100)
client, err := riverclient.NewRiverClient(db, logger, http.DefaultClient, 100)
if err != nil {
panic(err)
}
Expand All @@ -45,7 +45,7 @@ func main() {
panic(err)
}

incidentPoller := incidentpoller.NewIncidentPoller(db, logger, client, config.SlackWebhookUrl)
incidentPoller := incidentpoller.NewIncidentPoller(db, logger, client, config.SlackWebhookUrl, config.TwitterWebhookUrl)
incidentPoller.Start()

// Work forever
Expand Down

0 comments on commit 16cc82d

Please sign in to comment.