Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use go arenas, wait group, remove binary heap completely #20

Merged
merged 10 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added go.work.sum
Empty file.
1 change: 0 additions & 1 deletion go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func main() {

var posts []Post
err = json.NewDecoder(file).Decode(&posts)

if err != nil {
log.Panicln(err)
}
Expand Down
2 changes: 0 additions & 2 deletions go_con/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,3 @@ module g.io/related_concurrent
go 1.21.1

require github.com/goccy/go-json v0.10.2

require github.com/ugurcsen/gods-generic v0.10.4 // indirect
2 changes: 0 additions & 2 deletions go_con/go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/ugurcsen/gods-generic v0.10.4 h1:OomH3R2MdzZxpnEPijaD/ncLzV6rpDXd5ruEkWsw0vo=
github.com/ugurcsen/gods-generic v0.10.4/go.mod h1:mGYOa88Y5sbw+ADXLpScxjJ7s5iHoWya/YHyeQ4f6c4=
111 changes: 57 additions & 54 deletions go_con/main.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
package main

import (
"arena"
"fmt"
"log"
"os"
"runtime"
"sync"
"time"

"github.com/goccy/go-json"
"github.com/ugurcsen/gods-generic/trees/binaryheap"
)

// custom type alias - for easier experiments with int size
// using smaller than int64 integer size but still big enough for 4 billion posts
type isize uint32

var concurrency = isize(runtime.NumCPU())
var a *arena.Arena

type isize uint32

type Post struct {
ID string `json:"_id"`
Expand Down Expand Up @@ -45,8 +47,12 @@ func main() {
if err != nil {
log.Panicln(err)
}
defer file.Close()

a = arena.NewArena() // Create a new arena

var posts []Post
posts = arena.MakeSlice[Post](a, 0, 10000)

err = json.NewDecoder(file).Decode(&posts)
if err != nil {
Expand All @@ -55,7 +61,8 @@ func main() {

start := time.Now()

postsLength := isize(len(posts))
postsLength := len(posts)
postsLengthISize := isize(postsLength)

tagMap := make(map[string][]isize, 100)
for i, post := range posts {
Expand All @@ -64,36 +71,30 @@ func main() {
}
}

resultsChan := make(chan Result, postsLength)
doneChan := make(chan bool, concurrency)
resultsChan := make(chan Result, postsLengthISize)

// create wait group to wait for all workers to finish
wg := sync.WaitGroup{}
wg.Add(int(concurrency))
var w isize
for ; w < concurrency; w++ {
for ; w < isize(concurrency); w++ {
// allocate taggedPostCount for each worker once, zero out for each task
taggedPostCount := make([]isize, postsLength)
t5 := binaryheap.NewWith[PostWithSharedTags](PostComparator)
taggedPostCount := arena.MakeSlice[isize](a, postsLength, postsLength)
go func(workerID isize) {
for i := workerID; i < postsLength; i += concurrency {
for i := workerID; i < postsLengthISize; i += concurrency {
// provide taggedPostCount and binary heap for each task
resultsChan <- Result{
Index: isize(i),
RelatedPost: computeRelatedPost(i, posts, tagMap, taggedPostCount, t5),
Index: i,
RelatedPost: computeRelatedPost(i, posts, tagMap, taggedPostCount),
}
}

doneChan <- true
wg.Done()
}(w)
}

var i isize
for ; i < concurrency; i++ {
<-doneChan
}

wg.Wait()
close(resultsChan)
close(doneChan)

allRelatedPosts := make([]RelatedPosts, len(posts))
allRelatedPosts := arena.MakeSlice[RelatedPosts](a, postsLength, postsLength)
for r := range resultsChan {
allRelatedPosts[r.Index] = r.RelatedPost
}
Expand All @@ -111,40 +112,52 @@ func main() {
if err != nil {
log.Panicln(err)
}
a.Free()
}

func computeRelatedPost(i isize, posts []Post, tagMap map[string][]isize, taggedPostCount []isize, t5 *binaryheap.Heap[PostWithSharedTags]) RelatedPosts {

// zero out tagged post count
for i := range taggedPostCount {
taggedPostCount[i] = 0
func computeRelatedPost(i isize, posts []Post, tagMap map[string][]isize, taggedPostCount []isize) RelatedPosts {
// Zero out tagged post count
for j := range taggedPostCount {
taggedPostCount[j] = 0
}

// Count the number of tags shared between posts
for _, tag := range posts[i].Tags {
for _, otherPostIdx := range tagMap[tag] {
taggedPostCount[otherPostIdx]++
if otherPostIdx != i { // Exclude the post itself
taggedPostCount[otherPostIdx]++
}
}
}
// zero out current post count - no need for branch in a for loop above
taggedPostCount[i] = 0

for v, count := range taggedPostCount {
if t5.Size() < 5 {
t5.Push(PostWithSharedTags{Post: isize(v), SharedTags: count})
} else {
if t, _ := t5.Peek(); t.SharedTags < count {
t5.Pop()
t5.Push(PostWithSharedTags{Post: isize(v), SharedTags: count})

top5 := [5]PostWithSharedTags{}
minTags := isize(0) // Updated initialization

for j, count := range taggedPostCount {
if count > minTags {
// Find the position to insert
pos := 4
for pos >= 0 && top5[pos].SharedTags < count {
pos--
}
pos++

// Shift and insert
if pos < 4 {
copy(top5[pos+1:], top5[pos:4])
}
if pos <= 4 {
top5[pos] = PostWithSharedTags{Post: isize(j), SharedTags: count}
minTags = top5[4].SharedTags
}
}
}

num := min(5, t5.Size())
topPosts := make([]*Post, num)

for i := 0; i < num; i++ {
if t, ok := t5.Pop(); ok {
topPosts[i] = &posts[t.Post]
// Convert indexes back to Post pointers
topPosts := make([]*Post, 0, 5)
for _, t := range top5 {
if t.SharedTags > 0 {
topPosts = append(topPosts, &posts[t.Post])
}
}

Expand All @@ -154,13 +167,3 @@ func computeRelatedPost(i isize, posts []Post, tagMap map[string][]isize, tagged
Related: topPosts,
}
}

// PostComparator is a three-way comparison of two PostWithSharedTags values
// keyed on their SharedTags field. It returns 1 when a has the larger
// SharedTags, -1 when a has the smaller, and 0 when both are equal. The Post
// field does not take part in the comparison.
func PostComparator(a, b PostWithSharedTags) int {
	switch {
	case a.SharedTags > b.SharedTags:
		return 1
	case a.SharedTags < b.SharedTags:
		return -1
	default:
		return 0
	}
}
3 changes: 1 addition & 2 deletions run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@ run_go() {
# run_go_concurrent: build the Go program in ./go_con and run the resulting
# ./related_concurrent binary, then call check_output with
# "related_posts_go_con.json".
#
# The echo, cd, build and run steps are chained with &&, so each one only runs
# if the previous step succeeded. When HYPER equals 1 the binary is run under
# hyperfine (-r 10 runs, -w 3 warmup runs, --show-output); otherwise it is run
# under `time` with the format '%es %Mk' (elapsed seconds and, presumably with
# GNU time, peak resident memory in KB).
#
# check_output is a separate command after the chain, so it is invoked even
# when an earlier step failed.
#
# NOTE(review): both a plain `go build` and a `GOEXPERIMENT=arenas go build`
# appear back to back below. This looks like an old line next to its
# replacement; confirm which one is meant to remain.
run_go_concurrent() {
echo "Running Go with concurrency" &&
cd ./go_con &&
go build &&
GOEXPERIMENT=arenas go build &&
if [ $HYPER == 1 ]; then
command hyperfine -r 10 -w 3 --show-output "./related_concurrent"
else
command time -f '%es %Mk' ./related_concurrent
fi

check_output "related_posts_go_con.json"

}

run_rust() {
Expand Down