Skip to content

Commit

Permalink
Add file sync to Event and State APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
Multiply committed Oct 4, 2019
1 parent 8e42805 commit b89858b
Show file tree
Hide file tree
Showing 7 changed files with 381 additions and 83 deletions.
42 changes: 39 additions & 3 deletions integration/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,34 @@ func TestDevSyncAPITrigger(t *testing.T) {
skaffold.Build().InDir("testdata/file-sync").WithConfig("skaffold-manual.yaml").InNs(ns.Name).RunOrFail(t)

rpcAddr := randomPort()
client, shutdown := setupRPCClient(t, rpcAddr)
defer shutdown()

stop := skaffold.Dev("--auto-sync=false", "--rpc-port", rpcAddr).InDir("testdata/file-sync").WithConfig("skaffold-manual.yaml").InNs(ns.Name).RunBackground(t)
defer stop()

client, shutdown := setupRPCClient(t, rpcAddr)
defer shutdown()

stream, err := readEventAPIStream(client, t, readRetries)
if stream == nil {
t.Fatalf("error retrieving event log: %v\n", err)
}

// throw away first 5 entries of log (from first run of dev loop)
for i := 0; i < 5; i++ {
stream.Recv()
}

// read entries from the log
entries := make(chan *proto.LogEntry)
go func() {
for {
entry, _ := stream.Recv()
if entry != nil {
entries <- entry
}
}
}()

k8sclient.WaitForPodsReady("test-file-sync")

ioutil.WriteFile("testdata/file-sync/foo", []byte("foo"), 0644)
Expand All @@ -115,9 +137,23 @@ func TestDevSyncAPITrigger(t *testing.T) {
},
})

err := wait.PollImmediate(time.Millisecond*500, 1*time.Minute, func() (bool, error) {
// Ensure we see a file sync in progress triggered in the event log
err = wait.PollImmediate(time.Millisecond*500, 2*time.Minute, func() (bool, error) {
e := <-entries
return e.GetEvent().GetFileSyncEvent().GetStatus() == "In Progress", nil
})
testutil.CheckError(t, false, err)

err = wait.PollImmediate(time.Millisecond*500, 1*time.Minute, func() (bool, error) {
out, _ := exec.Command("kubectl", "exec", "test-file-sync", "-n", ns.Name, "--", "cat", "foo").Output()
return string(out) == "foo", nil
})
testutil.CheckError(t, false, err)

// Ensure we see a file sync succeeded triggered in the event log
err = wait.PollImmediate(time.Millisecond*500, 2*time.Minute, func() (bool, error) {
e := <-entries
return e.GetEvent().GetFileSyncEvent().GetStatus() == "Succeeded", nil
})
testutil.CheckError(t, false, err)
}
43 changes: 43 additions & 0 deletions pkg/skaffold/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ func emptyStateWithArtifacts(builds map[string]string) proto.State {
Resources: map[string]string{},
},
ForwardedPorts: make(map[int32]*proto.PortEvent),
FileSyncState: &proto.FileSyncState{
Status: NotStarted,
},
}
}

Expand Down Expand Up @@ -239,6 +242,21 @@ func BuildComplete(imageName string) {
handler.handleBuildEvent(&proto.BuildEvent{Artifact: imageName, Status: Complete})
}

// FileSyncInProgress notifies that a file sync has been started.
func FileSyncInProgress(fileCount int, image string) {
handler.handleFileSyncEvent(&proto.FileSyncEvent{FileCount: int32(fileCount), Image: image, Status: InProgress})
}

// FileSyncFailed notifies that a file sync has failed.
func FileSyncFailed(fileCount int, image string, err error) {
handler.handleFileSyncEvent(&proto.FileSyncEvent{FileCount: int32(fileCount), Image: image, Status: Failed, Err: err.Error()})
}

// FileSyncSucceeded notifies that a file sync has succeeded.
func FileSyncSucceeded(fileCount int, image string) {
handler.handleFileSyncEvent(&proto.FileSyncEvent{FileCount: int32(fileCount), Image: image, Status: Succeeded})
}

// PortForwarded notifies that a remote port has been forwarded locally.
func PortForwarded(localPort, remotePort int32, podName, containerName, namespace string, portName string, resourceType, resourceName string) {
go handler.handle(&proto.Event{
Expand Down Expand Up @@ -295,6 +313,14 @@ func (ev *eventHandler) handleBuildEvent(e *proto.BuildEvent) {
})
}

func (ev *eventHandler) handleFileSyncEvent(e *proto.FileSyncEvent) {
go ev.handle(&proto.Event{
EventType: &proto.Event_FileSyncEvent{
FileSyncEvent: e,
},
})
}

func LogSkaffoldMetadata(info *version.Info) {
handler.logEvent(proto.LogEntry{
Timestamp: ptypes.TimestampNow(),
Expand Down Expand Up @@ -382,6 +408,23 @@ func (ev *eventHandler) handle(event *proto.Event) {
logEntry.Entry = fmt.Sprintf("Resource %s status failed with %s", rseName, rse.Err)
default:
}
case *proto.Event_FileSyncEvent:
fse := e.FileSyncEvent
fseFileCount := fse.FileCount
fseImage := fse.Image
ev.stateLock.Lock()
ev.state.FileSyncState.Status = fse.Status
ev.stateLock.Unlock()
switch fse.Status {
case InProgress:
logEntry.Entry = fmt.Sprintf("File sync started for %d files for %s", fseFileCount, fseImage)
case Succeeded:
logEntry.Entry = fmt.Sprintf("File sync succeeded for %d files for %s", fseFileCount, fseImage)
case Failed:
logEntry.Entry = fmt.Sprintf("File sync failed for %d files for %s", fseFileCount, fseImage)
// logEntry.Err = fse.Err
default:
}

default:
return
Expand Down
38 changes: 38 additions & 0 deletions pkg/skaffold/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,42 @@ func TestResourceStatusCheckEventFailed(t *testing.T) {
wait(t, func() bool { return handler.getState().StatusCheckState.Resources["ns:pod/foo"] == Failed })
}

func TestFileSyncInProgress(t *testing.T) {
defer func() { handler = &eventHandler{} }()

handler = &eventHandler{
state: emptyState(latest.BuildConfig{}),
}

wait(t, func() bool { return handler.getState().FileSyncState.Status == NotStarted })
FileSyncInProgress(5, "image")
wait(t, func() bool { return handler.getState().FileSyncState.Status == InProgress })
}

func TestFileSyncFailed(t *testing.T) {
defer func() { handler = &eventHandler{} }()

handler = &eventHandler{
state: emptyState(latest.BuildConfig{}),
}

wait(t, func() bool { return handler.getState().FileSyncState.Status == NotStarted })
FileSyncFailed(5, "image", errors.New("BUG"))
wait(t, func() bool { return handler.getState().FileSyncState.Status == Failed })
}

func TestFileSyncSucceeded(t *testing.T) {
defer func() { handler = &eventHandler{} }()

handler = &eventHandler{
state: emptyState(latest.BuildConfig{}),
}

wait(t, func() bool { return handler.getState().FileSyncState.Status == NotStarted })
FileSyncSucceeded(5, "image")
wait(t, func() bool { return handler.getState().FileSyncState.Status == Succeeded })
}

func wait(t *testing.T, condition func() bool) {
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
Expand Down Expand Up @@ -278,6 +314,7 @@ func TestResetStateOnBuild(t *testing.T) {
},
DeployState: &proto.DeployState{Status: Complete},
StatusCheckState: &proto.StatusCheckState{Status: Complete},
FileSyncState: &proto.FileSyncState{Status: Succeeded},
},
}
ResetStateOnBuild()
Expand All @@ -289,6 +326,7 @@ func TestResetStateOnBuild(t *testing.T) {
},
DeployState: &proto.DeployState{Status: NotStarted},
StatusCheckState: &proto.StatusCheckState{Status: NotStarted},
FileSyncState: &proto.FileSyncState{Status: NotStarted},
}
testutil.CheckDeepEqual(t, expected, handler.getState())
}
14 changes: 13 additions & 1 deletion pkg/skaffold/runner/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ import (
// ErrorConfigurationChanged is a special error that's returned when the skaffold configuration was changed.
var ErrorConfigurationChanged = errors.New("configuration changed")

var (
// For testing
fileSyncInProgress = event.FileSyncInProgress
fileSyncFailed = event.FileSyncFailed
fileSyncSucceeded = event.FileSyncSucceeded
)

func (r *SkaffoldRunner) doDev(ctx context.Context, out io.Writer) error {
if r.changeSet.needsReload {
return ErrorConfigurationChanged
Expand All @@ -59,12 +66,17 @@ func (r *SkaffoldRunner) doDev(ctx context.Context, out io.Writer) error {
}()

for _, s := range r.changeSet.needsResync {
color.Default.Fprintf(out, "Syncing %d files for %s\n", len(s.Copy)+len(s.Delete), s.Image)
fileCount := len(s.Copy) + len(s.Delete)
color.Default.Fprintf(out, "Syncing %d files for %s\n", fileCount, s.Image)
fileSyncInProgress(fileCount, s.Image)

if err := r.syncer.Sync(ctx, s); err != nil {
logrus.Warnln("Skipping deploy due to sync error:", err)
fileSyncFailed(fileCount, s.Image, err)
return nil
}

fileSyncSucceeded(fileCount, s.Image)
}
}

Expand Down
30 changes: 26 additions & 4 deletions pkg/skaffold/runner/dev_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,18 @@ func TestDev(t *testing.T) {
}

func TestDevSync(t *testing.T) {
type fileSyncEventCalls struct {
InProgress int
Failed int
Succeeded int
}

tests := []struct {
description string
testBench *TestBench
watchEvents []filemon.Events
expectedActions []Actions
description string
testBench *TestBench
watchEvents []filemon.Events
expectedActions []Actions
expectedFileSyncEventCalls fileSyncEventCalls
}{
{
description: "sync",
Expand All @@ -298,6 +305,11 @@ func TestDevSync(t *testing.T) {
Synced: []string{"img1:1"},
},
},
expectedFileSyncEventCalls: fileSyncEventCalls{
InProgress: 1,
Failed: 0,
Succeeded: 1,
},
},
{
description: "sync twice",
Expand All @@ -319,10 +331,19 @@ func TestDevSync(t *testing.T) {
Synced: []string{"img1:1"},
},
},
expectedFileSyncEventCalls: fileSyncEventCalls{
InProgress: 2,
Failed: 0,
Succeeded: 2,
},
},
}
for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
var actualFileSyncEventCalls fileSyncEventCalls
t.Override(&fileSyncInProgress, func(int, string) { actualFileSyncEventCalls.InProgress++ })
t.Override(&fileSyncFailed, func(int, string, error) { actualFileSyncEventCalls.Failed++ })
t.Override(&fileSyncSucceeded, func(int, string) { actualFileSyncEventCalls.Succeeded++ })
t.SetupFakeKubernetesContext(api.Config{CurrentContext: "cluster1"})
t.Override(&sync.WorkingDir, func(string, map[string]bool) (string, error) { return "/", nil })
test.testBench.cycles = len(test.watchEvents)
Expand All @@ -346,6 +367,7 @@ func TestDevSync(t *testing.T) {

t.CheckNoError(err)
t.CheckDeepEqual(test.expectedActions, test.testBench.Actions())
t.CheckDeepEqual(test.expectedFileSyncEventCalls, actualFileSyncEventCalls)
})
}
}
Loading

0 comments on commit b89858b

Please sign in to comment.