-
Notifications
You must be signed in to change notification settings - Fork 1
/
upload_manager.go
146 lines (119 loc) · 2.98 KB
/
upload_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package main
import (
"io"
"time"
"golang.org/x/net/context"
)
// UploadSessionManager is a concrete implementation of an upload session
// manager. It holds the settings applied to new upload sessions, the map of
// sessions it currently knows about, and the channel used to hand finished
// sessions to the updater goroutine.
type UploadSessionManager struct {
// TTL is the timeout placed on the context of every new session.
TTL time.Duration
// TmpDir is passed to NewUploadSession for every new session.
TmpDir *string
// Store is not referenced in this file.
// NOTE(review): presumably where uploaded archives are stored - confirm.
Store ArchiveStorer
// UploadHook is not referenced in this file.
// NOTE(review): presumably run against uploaded content - confirm.
UploadHook HookRunner
// finished carries merge requests from mergeSession to updater.
finished chan UpdateRequest
// sessMap holds the known sessions, keyed by session ID.
sessMap *SafeMap
}
// UpdateRequest contains the information needed to request an update of the
// repository. Only regeneration is supported at present.
type UpdateRequest struct {
// resp receives the outcome of the request: nil when no error was
// recorded, otherwise the error wrapped in an appError.
resp chan *appError
// session is the upload session the update is requested for.
session *UploadSession
}
// NewUploadSessionManager creates a session manager which maintains a set of
// on-going upload sessions, controlling their permitted lifetime, temporary
// storage location, and how the contents should be verified.
//
// TTL is the timeout given to each session's context, tmpDir is handed to
// every new session, and store and uploadHook are recorded on the manager.
// The updater goroutine is started before the manager is returned; it has no
// exit condition, so it lives as long as the process does.
func NewUploadSessionManager(
	TTL time.Duration,
	tmpDir *string,
	store ArchiveStorer,
	uploadHook HookRunner,
) *UploadSessionManager {
	usm := new(UploadSessionManager)
	usm.TTL = TTL
	usm.TmpDir = tmpDir
	usm.Store = store
	usm.UploadHook = uploadHook
	usm.finished = make(chan UpdateRequest)
	usm.sessMap = NewSafeMap()

	go usm.updater()

	return usm
}
// GetSession retrieves a given upload session by the session's id.
//
// ok is false, and s is the zero UploadSession, when nothing is stored under
// sid, when the stored value is nil, or when the stored value is not of type
// UploadSession (a *UploadSession does not match).
func (usm *UploadSessionManager) GetSession(sid string) (s UploadSession, ok bool) {
	// The comma-ok assertion yields the zero value and false for both a nil
	// entry and an entry of any other dynamic type.
	s, ok = usm.sessMap.Get(sid).(UploadSession)
	return s, ok
}
// NewSession adds a new upload session based on the details from the passed
// debian changes file, and returns the ID the session was registered under.
//
// rel, changesReader and loneDeb are passed through to NewUploadSession
// together with the manager's TmpDir and the manager itself. If the session
// cannot be created, the error is returned and nothing is registered.
//
// The session's context times out after usm.TTL. When the context ends, the
// session's entry in the session map is overwritten with nil, after which
// GetSession no longer returns it.
func (usm *UploadSessionManager) NewSession(rel *Release, changesReader io.ReadCloser, loneDeb bool) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), usm.TTL)

	s, err := NewUploadSession(
		ctx,
		rel,
		loneDeb,
		changesReader,
		usm.TmpDir,
		usm,
	)
	if err != nil {
		// No session exists to use this context, so release its timer now
		// rather than leaving it live until the TTL elapses.
		cancel()
		return "", err
	}

	id := s.ID()
	usm.sessMap.Set(id, s)

	go func() {
		<-ctx.Done()
		// The context has already ended; calling cancel here is a no-op that
		// makes the release of its resources explicit on every path.
		cancel()
		usm.sessMap.Set(id, nil)
	}()

	return id, nil
}
// mergeSession merges the provided upload session into the release it was
// uploaded to.
//
// The session is sent to the updater goroutine over usm.finished, and the
// call blocks until the updater replies. The reply is returned as-is; it is
// nil when the updater recorded no error.
func (usm *UploadSessionManager) mergeSession(s *UploadSession) *appError {
	reply := make(chan *appError)
	req := UpdateRequest{
		resp:    reply,
		session: s,
	}

	usm.finished <- req
	return <-reply
}
// updater ensures that updates to the repository are serialized. It reads
// requests from usm.finished one at a time, instigates the actual
// regeneration of the repository, and responds to the requesting client.
//
// For each request, while holding the write lock on state.Lock, it runs the
// pre-generation hook on the session's directory and adds the upload to the
// archive. The post-generation hook runs only if that addition succeeded.
// Hook outputs are recorded on the session. The reply on the request's resp
// channel is nil on success, or the AddUpload error wrapped in an appError.
// The loop never returns.
func (usm *UploadSessionManager) updater() {
	for {
		req := <-usm.finished
		sess := req.session

		var result *appError

		state.Lock.WriteLock()
		preOut := cfg.PreGenHook.Run(sess.Directory())
		sess.PreGenHookOutput = &preOut
		if err := state.Archive.AddUpload(sess); err != nil {
			result = &appError{Error: err}
		} else {
			postOut := cfg.PostGenHook.Run(sess.ID())
			sess.PostGenHookOutput = &postOut
		}
		state.Lock.WriteUnLock()

		// Reply only after the lock is released, as the requester is blocked
		// waiting on this channel.
		req.resp <- result
	}
}