From 2c1be2d419b63bbc0fe02e289fcf761462da24f1 Mon Sep 17 00:00:00 2001 From: Franky <75002277+FrankLi123@users.noreply.github.com> Date: Tue, 15 Oct 2024 05:52:31 +0800 Subject: [PATCH] feat(worker/mastodon): make mastodon worker revision and add federated api modification (#574) Co-authored-by: brucexc --- internal/database/client.go | 1 + internal/database/dialer/postgres/client.go | 8 + .../dialer/postgres/client_partitioned.go | 231 ++++++++++++++++++ internal/database/model/activity.go | 20 ++ .../federated/activitypub/mastodon/worker.go | 71 ++++-- .../activitypub/mastodon/worker_test.go | 73 +++++- internal/node/component/federated/data.go | 4 +- .../component/federated/handler_activity.go | 54 ++-- .../federated/handler_network_activity.go | 24 +- .../federated/handler_platform_activity.go | 8 +- .../federated/transformer_social_type.go | 57 +++-- provider/activitypub/model.go | 17 +- 12 files changed, 477 insertions(+), 91 deletions(-) diff --git a/internal/database/client.go b/internal/database/client.go index 1e5a2f0e..ab27e5bb 100644 --- a/internal/database/client.go +++ b/internal/database/client.go @@ -30,6 +30,7 @@ type Client interface { SaveActivities(ctx context.Context, activities []*activityx.Activity) error FindActivity(ctx context.Context, query model.ActivityQuery) (*activityx.Activity, *int, error) FindActivities(ctx context.Context, query model.ActivitiesQuery) ([]*activityx.Activity, error) + FindFederatedActivities(ctx context.Context, query model.FederatedActivitiesQuery) ([]*activityx.Activity, error) DeleteExpiredActivities(ctx context.Context, network network.Network, timestamp time.Time) error } diff --git a/internal/database/dialer/postgres/client.go b/internal/database/dialer/postgres/client.go index 1716ae26..0751d8b7 100644 --- a/internal/database/dialer/postgres/client.go +++ b/internal/database/dialer/postgres/client.go @@ -293,6 +293,14 @@ func (c *client) FindActivities(ctx context.Context, query model.ActivitiesQuery return nil, fmt.Errorf("not implemented") } +func (c *client) FindFederatedActivities(ctx context.Context, query model.FederatedActivitiesQuery) ([]*activityx.Activity, error) { + if c.partition { + return c.findFederatedActivitiesPartitioned(ctx, query) + } + + return nil, fmt.Errorf("not implemented") +} + // DeleteExpiredActivities deletes expired activities. func (c *client) DeleteExpiredActivities(ctx context.Context, network networkx.Network, timestamp time.Time) error { if c.partition { diff --git a/internal/database/dialer/postgres/client_partitioned.go b/internal/database/dialer/postgres/client_partitioned.go index 880add11..791ee131 100644 --- a/internal/database/dialer/postgres/client_partitioned.go +++ b/internal/database/dialer/postgres/client_partitioned.go @@ -252,6 +252,75 @@ func (c *client) findActivitiesPartitioned(ctx context.Context, query model.Acti return result, nil } +// findActivitiesPartitioned finds activities. +func (c *client) findFederatedActivitiesPartitioned(ctx context.Context, query model.FederatedActivitiesQuery) ([]*activityx.Activity, error) { + indexes, err := c.findFederatedIndexesPartitioned(ctx, query) + if err != nil { + return nil, fmt.Errorf("find indexes: %w", err) + } + + partition := lo.GroupBy(indexes, func(query *table.Index) string { + activity := table.Activity{ + ID: query.ID, + Network: query.Network, + Timestamp: query.Timestamp, + } + + return activity.PartitionName(nil) + }) + + var ( + result = make([]*activityx.Activity, 0) + locker sync.Mutex + ) + + errorGroup, errorCtx := errgroup.WithContext(ctx) + + for tableName, index := range partition { + tableName, index := tableName, index + + ids := lo.Map(index, func(index *table.Index, _ int) string { + return index.ID + }) + + errorGroup.Go(func() error { + tableActivities := make(table.Activities, 0) + + if err := c.database.WithContext(errorCtx).Table(tableName).Where("id IN ?", lo.Uniq(ids)).Find(&tableActivities).Error; err != nil { + zap.L().Error("failed to find activities", zap.Error(err), zap.String("tableName", tableName)) + + return err + } + + locker.Lock() + defer locker.Unlock() + + activities, err := tableActivities.Export(index) + if err != nil { + return err + } + + result = append(result, activities...) + + return nil + }) + } + + if err := errorGroup.Wait(); err != nil { + return nil, err + } + + lo.ForEach(result, func(activity *activityx.Activity, i int) { + result[i].Actions = lo.Slice(activity.Actions, 0, query.ActionLimit) + }) + + sort.SliceStable(result, func(i, j int) bool { + return result[i].Timestamp > result[j].Timestamp + }) + + return result, nil +} + // saveIndexesPartitioned saves indexes in partitioned tables. func (c *client) saveIndexesPartitioned(ctx context.Context, activities []*activityx.Activity) error { indexes := make(table.Indexes, 0) @@ -513,6 +582,109 @@ func (c *client) findIndexesPartitioned(ctx context.Context, query model.Activit } } +//nolint:gocognit +func (c *client) findFederatedIndexesPartitioned(ctx context.Context, query model.FederatedActivitiesQuery) ([]*table.Index, error) { + index := table.Index{ + Timestamp: time.Now(), + } + + if query.EndTimestamp != nil && *query.EndTimestamp > 0 && *query.EndTimestamp < uint64(index.Timestamp.Unix()) { + index.Timestamp = time.Unix(int64(lo.FromPtr(query.EndTimestamp)), 0) + } + + if query.Cursor != nil && query.Cursor.Timestamp < uint64(index.Timestamp.Unix()) { + index.Timestamp = time.Unix(int64(query.Cursor.Timestamp), 0) + } + + partitionedNames := c.findIndexesPartitionTables(ctx, index) + + if len(partitionedNames) == 0 { + return nil, nil + } + + ctx, cancel := context.WithCancel(ctx) + errorGroup, errorContext := errgroup.WithContext(ctx) + indexes := make([][]*table.Index, len(partitionedNames)) + resultChan := make(chan int, len(partitionedNames)) + errorChan := make(chan error) + stopChan := make(chan struct{}) + + var mutex sync.RWMutex + + for partitionedIndex, partitionedName := range partitionedNames { + partitionedIndex, partitionedName := partitionedIndex, partitionedName + + errorGroup.Go(func() error { + var result []*table.Index + + databaseStatement := c.buildFederatedFindIndexesStatement(errorContext, partitionedName, query) + + if err := databaseStatement.Find(&result).Error; err != nil { + return fmt.Errorf("failed to find indexes: %w", err) + } + + mutex.Lock() + indexes[partitionedIndex] = result + mutex.Unlock() + + select { + case <-stopChan: + return nil + case resultChan <- partitionedIndex: + return nil + } + }) + } + + go func() { + defer close(errorChan) + + errorChan <- errorGroup.Wait() + }() + + defer func() { + cancel() + }() + + for { + select { + case err := <-errorChan: + if err != nil { + return nil, fmt.Errorf("failed to wait result: %w", err) + } + case <-resultChan: + result := make([]*table.Index, 0, query.Limit) + flag := true + + mutex.RLock() + + for _, data := range indexes { + data := data + + if data == nil { + flag = false + break + } + + result = append(result, data...) + + if len(result) >= query.Limit { + close(stopChan) + mutex.RUnlock() + + return result[:query.Limit], nil + } + } + + mutex.RUnlock() + + if flag { + return result, nil + } + } + } +} + // deleteExpiredActivitiesPartitioned deletes expired activities. func (c *client) deleteExpiredActivitiesPartitioned(ctx context.Context, network network.Network, timestamp time.Time) error { var ( @@ -697,6 +869,65 @@ func (c *client) buildFindIndexesStatement(ctx context.Context, partition string return databaseStatement.Order("timestamp DESC, index DESC").Limit(query.Limit) } +// buildFindIndexesStatement builds the query indexes statement. +func (c *client) buildFederatedFindIndexesStatement(ctx context.Context, partition string, query model.FederatedActivitiesQuery) *gorm.DB { + databaseStatement := c.database.WithContext(ctx).Table(partition) + + if query.Distinct != nil && lo.FromPtr(query.Distinct) { + databaseStatement = databaseStatement.Select("DISTINCT (id) id, timestamp, index, network") + } + + if query.Owner != nil { + databaseStatement = databaseStatement.Where("owner = ?", query.Owner) + } + + if len(query.Owners) > 0 { + databaseStatement = databaseStatement.Where("owner IN ?", query.Owners) + } + + if query.Status != nil { + databaseStatement = databaseStatement.Where("status = ?", query.Status) + } + + if query.Direction != nil { + databaseStatement = databaseStatement.Where("direction = ?", query.Direction) + } + + if query.StartTimestamp != nil && *query.StartTimestamp > 0 { + databaseStatement = databaseStatement.Where("timestamp >= ?", time.Unix(int64(*query.StartTimestamp), 0)) + } + + if query.EndTimestamp != nil && *query.EndTimestamp > 0 { + databaseStatement = databaseStatement.Where("timestamp <= ?", time.Unix(int64(*query.EndTimestamp), 0)) + } + + if query.Platform != "" { + databaseStatement = databaseStatement.Where("platform = ?", query.Platform) + } + + if len(query.Platforms) > 0 { + databaseStatement = databaseStatement.Where("platform IN ?", query.Platforms) + } + + if len(query.Tags) > 0 { + databaseStatement = databaseStatement.Where("tag IN ?", query.Tags) + } + + if len(query.Types) > 0 { + databaseStatement = databaseStatement.Where("type IN ?", query.Types) + } + + if len(query.Network) > 0 { + databaseStatement = databaseStatement.Where("network IN ?", query.Network) + } + + if query.Cursor != nil && query.Cursor.Timestamp > 0 { + databaseStatement = databaseStatement.Where("timestamp < ? OR (timestamp = ? AND index < ?)", time.Unix(int64(query.Cursor.Timestamp), 0), time.Unix(int64(query.Cursor.Timestamp), 0), query.Cursor.Index) + } + + return databaseStatement.Order("timestamp DESC, index DESC").Limit(query.Limit) +} + // buildActivitiesTableNames builds the activities table names. func (c *client) buildActivitiesTableNames(network network.Network, timestamp time.Time) string { return fmt.Sprintf("%s_%s_%d_q%d", (*table.Activity).TableName(nil), network, timestamp.Year(), int(timestamp.Month()-1)/3+1) diff --git a/internal/database/model/activity.go b/internal/database/model/activity.go index b71b99e6..3804df37 100644 --- a/internal/database/model/activity.go +++ b/internal/database/model/activity.go @@ -2,6 +2,7 @@ package model import ( "github.com/rss3-network/node/schema/worker/decentralized" + "github.com/rss3-network/node/schema/worker/federated" "github.com/rss3-network/protocol-go/schema" activityx "github.com/rss3-network/protocol-go/schema/activity" "github.com/rss3-network/protocol-go/schema/network" @@ -34,3 +35,22 @@ type ActivitiesQuery struct { Limit int ActionLimit int } + +type FederatedActivitiesQuery struct { + Owner *string + Cursor *activityx.Activity + Status *bool + Direction *activityx.Direction + StartTimestamp *uint64 + EndTimestamp *uint64 + Platform string + Owners []string + Network []network.Network + Tags []tag.Tag + Types []schema.Type + Platforms []federated.Platform + Distinct *bool + RelatedActions *bool + Limit int + ActionLimit int +} diff --git a/internal/engine/worker/federated/activitypub/mastodon/worker.go b/internal/engine/worker/federated/activitypub/mastodon/worker.go index 48de63a1..92a7c3ee 100644 --- a/internal/engine/worker/federated/activitypub/mastodon/worker.go +++ b/internal/engine/worker/federated/activitypub/mastodon/worker.go @@ -111,7 +111,6 @@ func (w *worker) handleActivityPubCreate(ctx context.Context, message activitypu currentUserHandle := convertURLToHandle(message.ID) - // ToDo: Discuss handling messages with multiple objects in the object field: should we create an activity for each, or separate them into distinct actions? (Transform() processes only one activity at a time) for _, currentNoteObject := range noteObjects { if err := w.handleSingleActivityPubCreate(ctx, message, currentNoteObject, activity, currentUserHandle); err != nil { return err @@ -170,8 +169,16 @@ func (w *worker) handleSingleActivityPubCreate(ctx context.Context, message acti } post.Target = w.buildTarget(replyToURLID, targetContent, targetTime) - + post.TargetURL = replyToURLID toUserHandle = convertURLToHandle(replyToURLID) + + if result != nil && result.Attachments != nil { + w.buildPostMedia(post.Target, result.Attachments) + } + + if result != nil && result.Tags != nil { + w.buildPostTags(post.Target, result.Tags) + } } activity.To = toUserHandle @@ -264,6 +271,16 @@ func (w *worker) handleSingleActivityPubAnnounce(ctx context.Context, message ac PublicationID: publicationID, Timestamp: w.parseTimestamp(message.Published), Target: w.buildTarget(sharedID, targetContent, targetTime), + TargetURL: sharedID, + } + + // set Attachment and Tags to target metadata + if result != nil && result.Attachments != nil { + w.buildPostMedia(post.Target, result.Attachments) + } + + if result != nil && result.Tags != nil { + w.buildPostTags(post.Target, result.Tags) } // Remove the "/activity" suffix from the message ID @@ -416,6 +433,14 @@ func (w *worker) buildPostMedia(post *metadata.SocialPost, attachments interface processAttachment(attachment) } } + case []activitypub.Attachment: + for _, attachment := range v { + media := metadata.Media{ + Address: attachment.URL, + MimeType: attachment.MediaType, + } + post.Media = append(post.Media, media) + } default: zap.L().Debug("Unexpected attachments type", zap.String("type", fmt.Sprintf("%T", attachments))) } @@ -423,30 +448,38 @@ func (w *worker) buildPostMedia(post *metadata.SocialPost, attachments interface // buildPostTags builds the Tags field in the metadata func (w *worker) buildPostTags(post *metadata.SocialPost, tags interface{}) { - processTag := func(tag map[string]interface{}) { - if tagType, ok := tag[mastodon.TagType].(string); ok { - if name, ok := tag[mastodon.TagName].(string); ok { - switch tagType { - case mastodon.TagTypeHashtag: - post.Tags = append(post.Tags, strings.TrimPrefix(name, "#")) - case mastodon.TagTypeMention: - post.Tags = append(post.Tags, "@"+strings.TrimPrefix(name, "@")) - } - } + processTag := func(tagType, name string) { + switch tagType { + case mastodon.TagTypeHashtag: + post.Tags = append(post.Tags, name) + case mastodon.TagTypeMention: + post.Tags = append(post.Tags, "@"+strings.TrimPrefix(name, "@")) } } switch v := tags.(type) { case []map[string]interface{}: - for _, currentTag := range v { - processTag(currentTag) + for _, tag := range v { + if tagType, ok := tag[mastodon.TagType].(string); ok { + if name, ok := tag[mastodon.TagName].(string); ok { + processTag(tagType, name) + } + } } case []interface{}: for _, t := range v { - if currentTag, ok := t.(map[string]interface{}); ok { - processTag(currentTag) + if tag, ok := t.(map[string]interface{}); ok { + if tagType, ok := tag[mastodon.TagType].(string); ok { + if name, ok := tag[mastodon.TagName].(string); ok { + processTag(tagType, name) + } + } } } + case []activitypub.Tag: + for _, t := range v { + processTag(t.Type, t.Name) + } default: zap.L().Debug("Unexpected tags type", zap.String("type", fmt.Sprintf("%T", tags))) } @@ -604,8 +637,10 @@ func (w *worker) getParentStatusByParentID(ctx context.Context, parentID string) // Return the status content and timestamp as a StatusResult return &activitypub.StatusResult{ - Content: status.Object.Content, - Timestamp: status.Object.Published.Format(time.RFC3339), + Content: status.Object.Content, + Timestamp: status.Object.Published.Format(time.RFC3339), + Attachments: status.Object.Attachment, + Tags: status.Object.Tag, }, nil } diff --git a/internal/engine/worker/federated/activitypub/mastodon/worker_test.go b/internal/engine/worker/federated/activitypub/mastodon/worker_test.go index d318acb3..9b303790 100644 --- a/internal/engine/worker/federated/activitypub/mastodon/worker_test.go +++ b/internal/engine/worker/federated/activitypub/mastodon/worker_test.go @@ -363,7 +363,7 @@ func TestWorker(t *testing.T) { Body: "@ Sustainable2050 ( https://mastodon.energy/@Sustainable2050 ) who ever could have expected that?", Handle: "@DJGummikuh@mastodon.social", Timestamp: 1728633871, - Tags: []string{"@Sustainable2050@mastodon.energy", "@DJGummikuh@mastodon.social", "ubernachtungen"}, + Tags: []string{"@Sustainable2050@mastodon.energy", "@DJGummikuh@mastodon.social", "#ubernachtungen"}, }, RelatedURLs: []string{"https://mastodon.social/users/DJGummikuh/statuses/113287749405285684"}, }, @@ -425,6 +425,7 @@ func TestWorker(t *testing.T) { Timestamp: 1728633754, Body: "賢治に毒されて各駅にエスペラントで愛称が付けられている釜石線", }, + TargetURL: "https://pawoo.net/users/cs133/statuses/113287741711853329", Timestamp: 1728633754, }, RelatedURLs: []string{"https://social.timespiral.co.jp/users/find575/statuses/113287741759175713"}, @@ -434,6 +435,75 @@ func TestWorker(t *testing.T) { }, wantError: require.NoError, }, + { + name: "Create A Comment whose target Post contains Media", + arguments: arguments{ + task: &activitypub.Task{ + Network: network.Mastodon, + Message: message.Object{ + Context: []interface{}{ + "https://www.w3.org/ns/activitystreams", + }, + ID: "https://social.timespiral.co.jp/users/find575/statuses/113287741759175713/activity", + Type: "Create", + Published: "2024-10-11T08:02:34Z", + Actor: "https://social.timespiral.co.jp/users/find575", + Object: map[string]interface{}{ + "type": "Note", + "id": "https://social.timespiral.co.jp/users/find575/statuses/113287741759175713", + "content": "

@cs133 俳句を発見しました!
『各駅に エスペラントで 愛称が』

", + "inReplyTo": "https://mastodon.world/users/ctxt/statuses/113287758853959995", + "published": "2024-10-11T08:02:34Z", + "to": []string{ + "https://www.w3.org/ns/activitystreams#Public", + }, + }, + }, + }, + }, + want: &activity.Activity{ + ID: "https://social.timespiral.co.jp/users/find575/statuses/113287741759175713/activity", + Network: network.Mastodon, + Platform: federated.PlatformMastodon.String(), + From: "@find575@social.timespiral.co.jp", + To: "@ctxt@mastodon.world", + Type: typex.SocialComment, + Tag: tag.Social, + TotalActions: 1, + Status: true, + Actions: []*activity.Action{ + { + Type: typex.SocialComment, + Tag: tag.Social, + Platform: federated.PlatformMastodon.String(), + From: "@find575@social.timespiral.co.jp", + To: "@ctxt@mastodon.world", + Metadata: &metadata.SocialPost{ + PublicationID: "113287741759175713", + Handle: "@find575@social.timespiral.co.jp", + Body: "@ cs133 ( https://pawoo.net/@cs133 ) 俳句を発見しました!\n『各駅に エスペラントで 愛称が』", + Timestamp: 1728633754, + Target: &metadata.SocialPost{ + Handle: "@ctxt@mastodon.world", + Body: "Y por si se les pasó, les dejamos también la pieza que nos mandó Lola Matamala contándonos en primera persona el desahucio que sufrió su madre el mes pasado después de llevar setenta años viviendo en la misma calle.\n\nLa empresa Dapamali Works compró el edifio y fue echando uno por uno a todos los vecinos:\n\nhttps:// ctxt.es/es/20241001/Firmas/476 02/lola-matamala-desahucio-Dapamali-Works-primera-persona-aurora-getafe.htm ( https://ctxt.es/es/20241001/Firmas/47602/lola-matamala-desahucio-Dapamali-Works-primera-persona-aurora-getafe.htm )", + PublicationID: "113287758853959995", + Timestamp: 1728634015, + Media: []metadata.Media{ + { + Address: "https://s3.eu-central-2.wasabisys.com/mastodonworld/media_attachments/files/113/287/758/697/714/115/original/2c6c341deae150c0.jpg", + MimeType: "image/jpeg", + }, + }, + }, + TargetURL: "https://mastodon.world/users/ctxt/statuses/113287758853959995", + }, + RelatedURLs: []string{"https://social.timespiral.co.jp/users/find575/statuses/113287741759175713"}, + }, + }, + Timestamp: 1728633754, + }, + wantError: require.NoError, + }, { name: "Share A Post", arguments: arguments{ @@ -477,6 +547,7 @@ func TestWorker(t *testing.T) { Timestamp: 1728609391, Body: "THE CYBER RESILIENCE ACT HAS BEEN ADOPTED BY THE COUNCIL OF THE EUROPEAN UNION!\n\nThe EU is weeks away from becoming the first jurisdiction with a bespoke regulatory framework for the product security _and_ labelling of all software sold commercially in the EU (save stuff covered by other EU rules like cars and healthtech).\n\nYes, the Yanks (via EO14028–>NIST) defined critical software (the CRA has ‘important products with digital elements’ and ‘critical products with digital elements’), but the Yanks, for now at least, have only gone down the procurement route for regulating vendor SDLCs. The EU, on the other hand, is covering everything sold commercially (bar the stated exceptions) to anyone in the EU.\n\nBig day for all us SDLC regulation people!\n\nWhat happens next: Council and EuroParl President sign it —> Publication in the EU OJ —> Entry into force 20 days later —> Application of most provisions 36 months later.\n\nPress release (includes link to final text): https://www. consilium.europa.eu/en/press/p ress-releases/2024/10/10/cyber-resilience-act-council-adopts-new-law-on-security-requirements-for-digital-products/ ( https://www.consilium.europa.eu/en/press/press-releases/2024/10/10/cyber-resilience-act-council-adopts-new-law-on-security-requirements-for-digital-products/ )", }, + TargetURL: "https://infosec.exchange/users/ravirockks/statuses/113286145052908177", Timestamp: 1728635306, }, RelatedURLs: []string{"https://fosstodon.org/users/bert_hubert/statuses/113287843427757878"}, diff --git a/internal/node/component/federated/data.go b/internal/node/component/federated/data.go index 64cdf2cd..601099f1 100644 --- a/internal/node/component/federated/data.go +++ b/internal/node/component/federated/data.go @@ -16,8 +16,8 @@ func (c *Component) getActivity(ctx context.Context, request model.ActivityQuery return c.databaseClient.FindActivity(ctx, request) } -func (c *Component) getActivities(ctx context.Context, request model.ActivitiesQuery) ([]*activityx.Activity, string, error) { - activities, err := c.databaseClient.FindActivities(ctx, request) +func (c *Component) getActivities(ctx context.Context, request model.FederatedActivitiesQuery) ([]*activityx.Activity, string, error) { + activities, err := c.databaseClient.FindFederatedActivities(ctx, request) if err != nil { return nil, "", fmt.Errorf("failed to find activities: %w", err) } diff --git a/internal/node/component/federated/handler_activity.go b/internal/node/component/federated/handler_activity.go index 84034f8b..cdf30521 100644 --- a/internal/node/component/federated/handler_activity.go +++ b/internal/node/component/federated/handler_activity.go @@ -10,7 +10,7 @@ import ( "github.com/labstack/echo/v4" "github.com/rss3-network/node/common/http/response" "github.com/rss3-network/node/internal/database/model" - "github.com/rss3-network/node/schema/worker/decentralized" + "github.com/rss3-network/node/schema/worker/federated" "github.com/rss3-network/protocol-go/schema" activityx "github.com/rss3-network/protocol-go/schema/activity" "github.com/rss3-network/protocol-go/schema/network" @@ -99,7 +99,7 @@ func (c *Component) GetAccountActivities(ctx echo.Context) (err error) { return response.InternalError(ctx) } - databaseRequest := model.ActivitiesQuery{ + databaseRequest := model.FederatedActivitiesQuery{ Cursor: cursor, StartTimestamp: request.SinceTimestamp, EndTimestamp: request.UntilTimestamp, @@ -161,7 +161,7 @@ func (c *Component) BatchGetAccountsActivities(ctx echo.Context) (err error) { return response.InternalError(ctx) } - databaseRequest := model.ActivitiesQuery{ + databaseRequest := model.FederatedActivitiesQuery{ Cursor: cursor, StartTimestamp: request.SinceTimestamp, EndTimestamp: request.UntilTimestamp, @@ -249,33 +249,33 @@ type ActivityRequest struct { } type AccountActivitiesRequest struct { - Account string `param:"account" validate:"required"` - Limit int `query:"limit" validate:"min=1,max=100" default:"100"` - ActionLimit int `query:"action_limit" validate:"min=1,max=20" default:"10"` - Cursor *string `query:"cursor"` - SinceTimestamp *uint64 `query:"since_timestamp"` - UntilTimestamp *uint64 `query:"until_timestamp"` - Status *bool `query:"success"` - Direction *activityx.Direction `query:"direction"` - Network []network.Network `query:"network"` - Tag []tag.Tag `query:"tag"` - Type []schema.Type `query:"-"` - Platform []decentralized.Platform `query:"platform"` + Account string `param:"account" validate:"required"` + Limit int `query:"limit" validate:"min=1,max=100" default:"100"` + ActionLimit int `query:"action_limit" validate:"min=1,max=20" default:"10"` + Cursor *string `query:"cursor"` + SinceTimestamp *uint64 `query:"since_timestamp"` + UntilTimestamp *uint64 `query:"until_timestamp"` + Status *bool `query:"success"` + Direction *activityx.Direction `query:"direction"` + Network []network.Network `query:"network"` + Tag []tag.Tag `query:"tag"` + Type []schema.Type `query:"-"` + Platform []federated.Platform `query:"platform"` } type AccountsActivitiesRequest struct { - Accounts []string `json:"accounts" validate:"required"` - Limit int `json:"limit" validate:"min=1,max=100" default:"100"` - ActionLimit int `json:"action_limit" validate:"min=1,max=20" default:"10"` - Cursor *string `json:"cursor"` - SinceTimestamp *uint64 `json:"since_timestamp"` - UntilTimestamp *uint64 `json:"until_timestamp"` - Status *bool `json:"success"` - Direction *activityx.Direction `json:"direction"` - Network []network.Network `json:"network"` - Tag []tag.Tag `json:"tag"` - Type []string `json:"type"` - Platform []decentralized.Platform `json:"platform"` + Accounts []string `json:"accounts" validate:"required"` + Limit int `json:"limit" validate:"min=1,max=100" default:"100"` + ActionLimit int `json:"action_limit" validate:"min=1,max=20" default:"10"` + Cursor *string `json:"cursor"` + SinceTimestamp *uint64 `json:"since_timestamp"` + UntilTimestamp *uint64 `json:"until_timestamp"` + Status *bool `json:"success"` + Direction *activityx.Direction `json:"direction"` + Network []network.Network `json:"network"` + Tag []tag.Tag `json:"tag"` + Type []string `json:"type"` + Platform []federated.Platform `json:"platform"` } type ActivityResponse struct { diff --git a/internal/node/component/federated/handler_network_activity.go b/internal/node/component/federated/handler_network_activity.go index c13640ca..3184f18e 100644 --- a/internal/node/component/federated/handler_network_activity.go +++ b/internal/node/component/federated/handler_network_activity.go @@ -7,7 +7,7 @@ import ( "github.com/labstack/echo/v4" "github.com/rss3-network/node/common/http/response" "github.com/rss3-network/node/internal/database/model" - "github.com/rss3-network/node/schema/worker/decentralized" + "github.com/rss3-network/node/schema/worker/federated" "github.com/rss3-network/protocol-go/schema" activityx "github.com/rss3-network/protocol-go/schema/activity" "github.com/rss3-network/protocol-go/schema/network" @@ -48,7 +48,7 @@ func (c *Component) GetNetworkActivities(ctx echo.Context) (err error) { return response.InternalError(ctx) } - databaseRequest := model.ActivitiesQuery{ + databaseRequest := model.FederatedActivitiesQuery{ Cursor: cursor, StartTimestamp: request.SinceTimestamp, EndTimestamp: request.UntilTimestamp, @@ -80,14 +80,14 @@ func (c *Component) GetNetworkActivities(ctx echo.Context) (err error) { type NetworkActivitiesRequest struct { Network network.Network `param:"network" validate:"required"` - Limit int `query:"limit" validate:"min=1,max=100" default:"100"` - ActionLimit int `query:"action_limit" validate:"min=1,max=20" default:"10"` - Cursor *string `query:"cursor"` - SinceTimestamp *uint64 `query:"since_timestamp"` - UntilTimestamp *uint64 `query:"until_timestamp"` - Status *bool `query:"success"` - Direction *activityx.Direction `query:"direction"` - Tag []tag.Tag `query:"tag"` - Type []schema.Type `query:"-"` - Platform []decentralized.Platform `query:"platform"` + Limit int `query:"limit" validate:"min=1,max=100" default:"100"` + ActionLimit int `query:"action_limit" validate:"min=1,max=20" default:"10"` + Cursor *string `query:"cursor"` + SinceTimestamp *uint64 `query:"since_timestamp"` + UntilTimestamp *uint64 `query:"until_timestamp"` + Status *bool `query:"success"` + Direction *activityx.Direction `query:"direction"` + Tag []tag.Tag `query:"tag"` + Type []schema.Type `query:"-"` + Platform []federated.Platform `query:"platform"` } diff --git a/internal/node/component/federated/handler_platform_activity.go b/internal/node/component/federated/handler_platform_activity.go index 84ac7bf4..5372fe1e 100644 --- a/internal/node/component/federated/handler_platform_activity.go +++ b/internal/node/component/federated/handler_platform_activity.go @@ -7,7 +7,7 @@ import ( "github.com/labstack/echo/v4" "github.com/rss3-network/node/common/http/response" "github.com/rss3-network/node/internal/database/model" - "github.com/rss3-network/node/schema/worker/decentralized" + "github.com/rss3-network/node/schema/worker/federated" "github.com/rss3-network/protocol-go/schema" activityx "github.com/rss3-network/protocol-go/schema/activity" "github.com/rss3-network/protocol-go/schema/network" @@ -48,7 +48,7 @@ func (c *Component) GetPlatformActivities(ctx echo.Context) (err error) { return response.InternalError(ctx) } - databaseRequest := model.ActivitiesQuery{ + databaseRequest := model.FederatedActivitiesQuery{ Cursor: cursor, StartTimestamp: request.SinceTimestamp, EndTimestamp: request.UntilTimestamp, @@ -59,7 +59,7 @@ func (c *Component) GetPlatformActivities(ctx echo.Context) (err error) { Network: lo.Uniq(request.Network), Tags: lo.Uniq(request.Tag), Types: lo.Uniq(request.Type), - Platforms: []decentralized.Platform{request.Platform}, + Platforms: []federated.Platform{request.Platform}, } activities, last, err := c.getActivities(ctx.Request().Context(), databaseRequest) @@ -78,7 +78,7 @@ func (c *Component) GetPlatformActivities(ctx echo.Context) (err error) { } type PlatformActivitiesRequest struct { - Platform decentralized.Platform `param:"platform" validate:"required"` + Platform federated.Platform `param:"platform" validate:"required"` Limit int `query:"limit" validate:"min=1,max=100" default:"100"` ActionLimit int `query:"action_limit" validate:"min=1,max=20" default:"10"` diff --git a/internal/node/component/federated/transformer_social_type.go b/internal/node/component/federated/transformer_social_type.go index 09a80278..d5c77ad7 100644 --- a/internal/node/component/federated/transformer_social_type.go +++ b/internal/node/component/federated/transformer_social_type.go @@ -3,6 +3,7 @@ package federated import ( "context" "fmt" + "strings" "github.com/rss3-network/protocol-go/schema/activity" "github.com/rss3-network/protocol-go/schema/metadata" @@ -15,23 +16,18 @@ import ( // TransformSocialType adds author url and note url to social actions based on type, network and platform func (c *Component) TransformSocialType(ctx context.Context, network network.Network, platform string, action activity.Action) (activity.Action, error) { switch action.Type { - case typex.SocialPost, typex.SocialComment, typex.SocialShare, typex.SocialRevise, typex.SocialMint, typex.SocialDelete: + case typex.SocialPost, typex.SocialComment, typex.SocialShare: return c.TransformSocialPost(ctx, network, platform, action) - case typex.SocialProfile: - return c.TransformSocialProfile(ctx, platform, action) - case typex.SocialProxy: - return c.TransformSocialProxy(ctx, platform, action) } return action, nil } // TransformSocialPost adds author url and note url to social post action -func (c *Component) TransformSocialPost(_ context.Context, _ network.Network, _ string, action activity.Action) (activity.Action, error) { +func (c *Component) TransformSocialPost(ctx context.Context, network network.Network, platform string, action activity.Action) (activity.Action, error) { post, ok := action.Metadata.(*metadata.SocialPost) if !ok { zap.L().Error("invalid post metadata type", zap.Any("metadata", action.Metadata)) - return activity.Action{}, fmt.Errorf("invalid post metadata type: %T", action.Metadata) } @@ -39,31 +35,50 @@ func (c *Component) TransformSocialPost(_ context.Context, _ network.Network, _ post.Handle = action.From } + post.AuthorURL = c.buildSocialAuthorURL(ctx, platform, post.Handle) + + if post.Target != nil && platform != "" { + post.Target.AuthorURL = c.buildSocialAuthorURL(ctx, platform, post.Target.Handle) + post.TargetURL = c.buildSocialNoteURL(ctx, network, platform, post.Target.Handle, post.Target.PublicationID) + } + action.Metadata = post + if noteURL := c.buildSocialNoteURL(ctx, network, platform, post.Handle, post.PublicationID); noteURL != "" { + action.RelatedURLs = append(action.RelatedURLs, noteURL) + } + return action, nil } -// TransformSocialProfile adds author url to social profile action -func (c *Component) TransformSocialProfile(_ context.Context, _ string, action activity.Action) (activity.Action, error) { - _, ok := action.Metadata.(*metadata.SocialProfile) - if !ok { - zap.L().Error("invalid profile metadata type", zap.Any("metadata", action.Metadata)) +// buildSocialAuthorURL returns author url based on domain and handle +func (c *Component) buildSocialAuthorURL(_ context.Context, platform, handle string) string { + if lo.IsEmpty(handle) || lo.IsEmpty(platform) { + return "" + } - return activity.Action{}, fmt.Errorf("invalid profile metadata type: %T", action.Metadata) + parts := strings.SplitN(handle, "@", 3) + if len(parts) != 3 { + return "" } - return action, nil + username, domain := parts[1], parts[2] + + return fmt.Sprintf("https://%s/users/%s", domain, username) } -// TransformSocialProxy adds author url to social proxy action -func (c *Component) TransformSocialProxy(_ context.Context, _ string, action activity.Action) (activity.Action, error) { - _, ok := action.Metadata.(*metadata.SocialProxy) - if !ok { - zap.L().Error("invalid proxy metadata type", zap.Any("metadata", action.Metadata)) +// buildSocialNoteURL returns note url based on domain, handle, profileID and pubID +func (c *Component) buildSocialNoteURL(_ context.Context, _ network.Network, platform, handle, pubID string) string { + if lo.IsEmpty(platform) || lo.IsEmpty(handle) || lo.IsEmpty(pubID) { + return "" + } - return activity.Action{}, fmt.Errorf("invalid proxy metadata type: %T", action.Metadata) + parts := strings.SplitN(handle, "@", 3) + if len(parts) != 3 { + return "" } - return action, nil + username, domain := parts[1], parts[2] + + return fmt.Sprintf("https://%s/users/%s/statuses/%s", domain, username, pubID) } diff --git a/provider/activitypub/model.go b/provider/activitypub/model.go index 2fb22278..ff46b74c 100644 --- a/provider/activitypub/model.go +++ b/provider/activitypub/model.go @@ -25,8 +25,9 @@ type Object struct { // Attachment represents an attachment to an ActivityPub object. type Attachment struct { - Type string `json:"type"` - URL string `json:"url"` + Type string `json:"type"` + URL string `json:"url"` + MediaType string `json:"mediaType"` } // Tag represents a tag in an ActivityPub object. @@ -49,14 +50,18 @@ type Note struct { // StatusResult represents the result of a status request. type StatusResult struct { - Content string - Timestamp string + Content string + Timestamp string + Attachments []Attachment + Tags []Tag } // StatusResponse represents a status response from an ActivityPub server. type StatusResponse struct { Object struct { - Published time.Time `json:"published"` - Content string `json:"content"` + Published time.Time `json:"published"` + Content string `json:"content"` + Attachment []Attachment `json:"attachment"` + Tag []Tag `json:"tag"` } `json:"object"` }