Commit: Add dataloader example

vektah committed Feb 10, 2018
1 parent 86cdf3a commit d00fae0
Showing 18 changed files with 1,080 additions and 249 deletions.
7 changes: 7 additions & 0 deletions cmd/ggraphqlc/templates.go
@@ -28,12 +28,19 @@ func runTemplate(e *extractor) (*bytes.Buffer, error) {
}

func ucFirst(s string) string {
if s == "" {
return ""
}
r := []rune(s)
r[0] = unicode.ToUpper(r[0])
return string(r)
}

func lcFirst(s string) string {
if s == "" {
return ""
}

r := []rune(s)
r[0] = unicode.ToLower(r[0])
return string(r)
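The guard added to both helpers fixes an index-out-of-range panic on empty input. A minimal standalone sketch of the fixed behaviour (hypothetical, mirroring ucFirst above):

package main

import (
	"fmt"
	"unicode"
)

// ucFirst upper-cases the first rune; the empty-string guard is what this
// commit adds, so "" is returned unchanged instead of panicking on r[0].
func ucFirst(s string) string {
	if s == "" {
		return ""
	}
	r := []rune(s)
	r[0] = unicode.ToUpper(r[0])
	return string(r)
}

func main() {
	fmt.Printf("%q\n", ucFirst("name")) // "Name"
	fmt.Printf("%q\n", ucFirst(""))     // "" — previously this input panicked
}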
160 changes: 160 additions & 0 deletions example/dataloader/address_dlgen.go
@@ -0,0 +1,160 @@
// generated by github.com/vektah/dataloaden ; DO NOT EDIT

package dataloader

import (
"sync"
"time"
)

// AddressLoader batches and caches requests
type AddressLoader struct {
// this method provides the data for the loader
fetch func(keys []int) ([]*Address, []error)

// how long to wait before sending a batch
wait time.Duration

// this will limit the maximum number of keys to send in one batch, 0 = no limit
maxBatch int

// INTERNAL

// lazily created cache
cache map[int]*Address

// the current batch. keys will continue to be collected until timeout is hit,
// then everything will be sent to the fetch method and out to the listeners
batch *addressBatch

// mutex to prevent races
mu sync.Mutex
}

type addressBatch struct {
keys []int
data []*Address
error []error
closing bool
done chan struct{}
}

// Load an address by key; batching and caching will be applied automatically
func (l *AddressLoader) Load(key int) (*Address, error) {
return l.LoadThunk(key)()
}

// LoadThunk returns a function that when called will block waiting for an address.
// This method should be used if you want one goroutine to make requests to many
// different data loaders without blocking until the thunk is called.
func (l *AddressLoader) LoadThunk(key int) func() (*Address, error) {
l.mu.Lock()
if it, ok := l.cache[key]; ok {
l.mu.Unlock()
return func() (*Address, error) {
return it, nil
}
}
if l.batch == nil {
l.batch = &addressBatch{done: make(chan struct{})}
}
batch := l.batch
pos := batch.keyIndex(l, key)
l.mu.Unlock()

return func() (*Address, error) {
<-batch.done

if batch.error[pos] == nil {
l.mu.Lock()
if l.cache == nil {
l.cache = map[int]*Address{}
}
l.cache[key] = batch.data[pos]
l.mu.Unlock()
}

return batch.data[pos], batch.error[pos]
}
}

// LoadAll fetches many keys at once. The keys will be broken into appropriately
// sized sub-batches depending on how the loader is configured
func (l *AddressLoader) LoadAll(keys []int) ([]*Address, []error) {
results := make([]func() (*Address, error), len(keys))

for i, key := range keys {
results[i] = l.LoadThunk(key)
}

addresses := make([]*Address, len(keys))
errors := make([]error, len(keys))
for i, thunk := range results {
addresses[i], errors[i] = thunk()
}
return addresses, errors
}

// Prime the cache with the provided key and value. If the key already exists, no change is made.
// (To forcefully prime the cache, clear the key first with loader.Clear(key) and then call loader.Prime(key, value).)
func (l *AddressLoader) Prime(key int, value *Address) {
l.mu.Lock()
if _, found := l.cache[key]; !found {
l.cache[key] = value
}
l.mu.Unlock()
}

// Clear the value at key from the cache, if it exists
func (l *AddressLoader) Clear(key int) {
l.mu.Lock()
delete(l.cache, key)
l.mu.Unlock()
}

// keyIndex will return the location of the key in the batch; if it's not found
// it will add the key to the batch
func (b *addressBatch) keyIndex(l *AddressLoader, key int) int {
for i, existingKey := range b.keys {
if key == existingKey {
return i
}
}

pos := len(b.keys)
b.keys = append(b.keys, key)
if pos == 0 {
go b.startTimer(l)
}

if l.maxBatch != 0 && pos >= l.maxBatch-1 {
if !b.closing {
b.closing = true
l.batch = nil
go b.end(l)
}
}

return pos
}

func (b *addressBatch) startTimer(l *AddressLoader) {
time.Sleep(l.wait)
l.mu.Lock()

// we must have hit a batch limit and are already finalizing this batch
if b.closing {
l.mu.Unlock()
return
}

l.batch = nil
l.mu.Unlock()

b.end(l)
}

func (b *addressBatch) end(l *AddressLoader) {
b.data, b.error = l.fetch(b.keys)
close(b.done)
}
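A usage sketch (hypothetical, not part of the commit): the loader's fields are unexported, so this assumes code living in the same dataloader package. It illustrates the pattern the LoadThunk comment describes — queue many keys without blocking, then collect the results, all served by one batched fetch:

// demoBatching is a hypothetical same-package helper.
// Assumes imports "fmt" and "time".
func demoBatching() {
	loader := &AddressLoader{
		wait:     2 * time.Millisecond,
		maxBatch: 100,
		fetch: func(keys []int) ([]*Address, []error) {
			// Called once with every key queued during the wait window.
			fmt.Println("one fetch for keys:", keys)
			addresses := make([]*Address, len(keys))
			for i := range keys {
				addresses[i] = &Address{Street: "home street", Country: "hometown"}
			}
			return addresses, make([]error, len(keys))
		},
	}

	// Queue ten keys without blocking; they accumulate in a single batch.
	thunks := make([]func() (*Address, error), 10)
	for i := range thunks {
		thunks[i] = loader.LoadThunk(i)
	}

	// Calling a thunk blocks until the one batched fetch has completed.
	for _, thunk := range thunks {
		if _, err := thunk(); err != nil {
			fmt.Println("load failed:", err)
		}
	}
}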
119 changes: 119 additions & 0 deletions example/dataloader/dataloaders.go
@@ -0,0 +1,119 @@
//go:generate dataloaden -keys int github.com/vektah/graphql-go/example/dataloader.Address
//go:generate dataloaden -keys int -slice github.com/vektah/graphql-go/example/dataloader.Order
//go:generate dataloaden -keys int -slice github.com/vektah/graphql-go/example/dataloader.Item

package dataloader

import (
"context"
"fmt"
"math/rand"
"net/http"
"strconv"
"strings"
"time"
)

type ctxKeyType struct{ name string }

var ctxKey = ctxKeyType{"userCtx"}

type loaders struct {
addressByID *AddressLoader
ordersByCustomer *OrderSliceLoader
itemsByOrder *ItemSliceLoader
}

func LoaderMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
loaders := loaders{}

// set this to zero to see what happens without dataloading
wait := 250 * time.Microsecond

// simple 1:1 loader, fetch an address by its primary key
loaders.addressByID = &AddressLoader{
wait: wait,
maxBatch: 100,
fetch: func(keys []int) ([]*Address, []error) {
var keySql []string
for _, key := range keys {
keySql = append(keySql, strconv.Itoa(key))
}

fmt.Printf("SELECT * FROM address WHERE id IN (%s)\n", strings.Join(keySql, ","))
time.Sleep(5 * time.Millisecond)

addresses := make([]*Address, len(keys))
errors := make([]error, len(keys))
for i, key := range keys {
addresses[i] = &Address{Street: "home street", Country: "hometown " + strconv.Itoa(key)}
}
return addresses, errors
},
}

// 1:M loader
loaders.ordersByCustomer = &OrderSliceLoader{
wait: wait,
maxBatch: 100,
fetch: func(keys []int) ([][]Order, []error) {
var keySql []string
for _, key := range keys {
keySql = append(keySql, strconv.Itoa(key))
}

fmt.Printf("SELECT * FROM orders WHERE customer_id IN (%s)\n", strings.Join(keySql, ","))
time.Sleep(5 * time.Millisecond)

orders := make([][]Order, len(keys))
errors := make([]error, len(keys))
for i, key := range keys {
id := 10 + rand.Int()%3
orders[i] = []Order{
{ID: id, Amount: rand.Float64(), Date: time.Now().Add(-time.Duration(key) * time.Hour)},
{ID: id + 1, Amount: rand.Float64(), Date: time.Now().Add(-time.Duration(key) * time.Hour)},
}

// if you had another loader that fetches orders by ID, you would prime its cache here
// by calling `loaders.ordersByID.Prime(id, orders[i])`
}

return orders, errors
},
}

// M:M loader
loaders.itemsByOrder = &ItemSliceLoader{
wait: wait,
maxBatch: 100,
fetch: func(keys []int) ([][]Item, []error) {
var keySql []string
for _, key := range keys {
keySql = append(keySql, strconv.Itoa(key))
}

fmt.Printf("SELECT * FROM items JOIN item_order WHERE item_order.order_id IN (%s)\n", strings.Join(keySql, ","))
time.Sleep(5 * time.Millisecond)

items := make([][]Item, len(keys))
errors := make([]error, len(keys))
for i := range keys {
items[i] = []Item{
{Name: "item " + strconv.Itoa(rand.Int()%20+20)},
{Name: "item " + strconv.Itoa(rand.Int()%20+20)},
}
}

return items, errors
},
}

dlCtx := context.WithValue(r.Context(), ctxKey, loaders)
next.ServeHTTP(w, r.WithContext(dlCtx))
})
}

func ctxLoaders(ctx context.Context) loaders {
return ctx.Value(ctxKey).(loaders)
}
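Finally, a hypothetical resolver (not in this commit) showing how the request-scoped loaders injected by LoaderMiddleware are consumed; a Customer type with an AddressID field is assumed:

// resolveCustomerAddress pulls the per-request loaders back out of the
// context. All address lookups within one request share the same
// AddressLoader, so they batch into a single fetch.
func resolveCustomerAddress(ctx context.Context, customer *Customer) (*Address, error) {
	return ctxLoaders(ctx).addressByID.Load(customer.AddressID)
}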