Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial cleanups #1

Merged
merged 4 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
rwildcard=$(foreach d,$(wildcard $1*),$(call rwildcard,$d/,$2) $(filter $(subst *,%,$2),$d))

.PHONY: all
all: test azure-vhd-utils lint fmt
all: test azure-vhd-utils lint

.PHONY: lint
lint: azure-vhd-utils
golint ./...

.PHONY: fmt
fmt: azure-vhd-utils
go fmt ./...
go vet ./...
go mod tidy

azure-vhd-utils: $(call rwildcard, ., *.go)
azure-vhd-utils: $(call rwildcard, ., *.go) go.mod go.sum Makefile
go build

.PHONY: test
test:
go test ./...
go test ./...

.PHONY: clean
clean:
rm -f azure-vhd-utils
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ This project provides a Go package to read Virtual Hard Disk (VHD) file, a CLI i
An implementation of VHD [VHD specification](https://technet.microsoft.com/en-us/virtualization/bb676673.aspx) can be found in the [vhdcore](/vhdcore) package.


[![Go Report Card](https://goreportcard.com/badge/github.com/Microsoft/azure-vhd-utils)](https://goreportcard.com/report/github.com/Microsoft/azure-vhd-utils)
[![Go Report Card](https://goreportcard.com/badge/github.com/flatcar/azure-vhd-utils)](https://goreportcard.com/report/github.com/flatcar/azure-vhd-utils)

# Installation
> Note: You must have Go installed on your machine, at version 1.11 or greater. [https://golang.org/dl/](https://golang.org/dl/)

go get github.com/Microsoft/azure-vhd-utils
go get github.com/flatcar/azure-vhd-utils

# Features

Expand Down
9 changes: 7 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
module github.com/Microsoft/azure-vhd-utils
module github.com/flatcar/azure-vhd-utils

go 1.20

require (
github.com/Azure/azure-sdk-for-go v5.0.0-beta.0.20161118192335-3b1282355199+incompatible
gopkg.in/urfave/cli.v1 v1.19.1
)

require (
github.com/kr/pretty v0.1.0 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/urfave/cli.v1 v1.19.1
)
10 changes: 0 additions & 10 deletions upload/concurrent/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
)

// Balancer is a type that can balance load among a set of workers
//
type Balancer struct {
errorChan chan error // The channel used by all workers to report error
requestHandledChan chan *Worker // The channel used by a worker to signal balancer that a work has been executed
Expand All @@ -18,11 +17,9 @@ type Balancer struct {
}

// The size of work channel associated with each worker this balancer manages.
//
const workerQueueSize int = 3

// NewBalancer creates a new instance of Balancer that needs to balance load between 'workerCount' workers
//
func NewBalancer(workerCount int) *Balancer {
balancer := &Balancer{
workerCount: workerCount,
Expand All @@ -34,7 +31,6 @@ func NewBalancer(workerCount int) *Balancer {
}

// Init initializes all channels and start the workers.
//
func (b *Balancer) Init() {
b.errorChan = make(chan error, 0)
b.requestHandledChan = make(chan *Worker, 0)
Expand All @@ -49,7 +45,6 @@ func (b *Balancer) Init() {

// TearDownWorkers sends a force-quit signal to all workers, which causes each worker to quit as soon as possible;
// workers won't drain their request channels in this case.
//
func (b *Balancer) TearDownWorkers() {
close(b.tearDownChan)
}
Expand All @@ -58,7 +53,6 @@ func (b *Balancer) TearDownWorkers() {
// with least load. This method returns two channels, a channel to communicate error from any worker back to
// the consumer of balancer and second channel is used by the balancer to signal consumer that all workers has
// been finished executing.
//
func (b *Balancer) Run(requestChan <-chan *Request) (<-chan error, <-chan bool) {
// Request dispatcher
go func() {
Expand Down Expand Up @@ -95,7 +89,6 @@ func (b *Balancer) Run(requestChan <-chan *Request) (<-chan error, <-chan bool)
// closeWorkersRequestChannel closes the Request channel of all workers; this indicates that no
// more work will be sent on the channel, so that the workers can gracefully exit after handling
// any pending work in the channel.
//
func (b *Balancer) closeWorkersRequestChannel() {
for i := 0; i < b.workerCount; i++ {
close((b.pool.Workers[i]).RequestsToHandleChan)
Expand All @@ -105,7 +98,6 @@ func (b *Balancer) closeWorkersRequestChannel() {
// dispatch dispatches the request to the worker with the least load. If all workers are completely
// busy (i.e. their Pending request count is currently equal to the maximum load) then this
// method will poll until one worker is available.
//
func (b *Balancer) dispatch(request *Request) {
for {
if b.pool.Workers[0].Pending >= workerQueueSize {
Expand All @@ -125,7 +117,6 @@ func (b *Balancer) dispatch(request *Request) {

// completed is called when a worker finishes a piece of work; it updates the load status of the given
// worker.
//
func (b *Balancer) completed(worker *Worker) {
b.pool.Lock()
worker.Pending--
Expand All @@ -136,7 +127,6 @@ func (b *Balancer) completed(worker *Worker) {
// WorkersCurrentLoad returns the load of the workers this balancer manages as comma separated string
// values where each value consists of worker id (Worker.Id property) and pending requests associated
// with the worker.
//
func (b *Balancer) WorkersCurrentLoad() string {
return b.pool.WorkersCurrentLoad()
}
9 changes: 1 addition & 8 deletions upload/concurrent/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,24 @@ import (
// The priority is the number of pending works assigned to the worker. The lower the pending
// work count, the higher the priority. Pool embeds sync.RWMutex to support concurrent heap
// operation.
//
type Pool struct {
sync.RWMutex // If consumer want to use workers in a concurrent environment
Workers []*Worker // The workers
}

// Len returns number of workers in the pool.
//
func (p *Pool) Len() int {
return len(p.Workers)
}

// Less returns true if the priority of the Worker instance at index i is less than the priority of the
// Worker instance at j; the lower the pending value, the higher the priority.
//
func (p *Pool) Less(i, j int) bool {
return p.Workers[i].Pending < p.Workers[j].Pending
}

// Swap swaps the Worker instances at the given indices i and j
//
func (p Pool) Swap(i, j int) {
func (p *Pool) Swap(i, j int) {
p.Workers[i], p.Workers[j] = p.Workers[j], p.Workers[i]
p.Workers[i].Index = i
p.Workers[j].Index = j
Expand All @@ -40,7 +36,6 @@ func (p Pool) Swap(i, j int) {
// Push is used by the heap.Push implementation; to add a worker w to a Pool pool, we call
// heap.Push(&pool, w), which invokes this method to add the worker to the end of the collection,
// then it fixes the heap by moving the added item to its correct position.
//
func (p *Pool) Push(x interface{}) {
n := len(p.Workers)
worker := x.(*Worker)
Expand All @@ -52,7 +47,6 @@ func (p *Pool) Push(x interface{}) {
// p, we call w := heap.Pop(&p).(*Worker), which swaps the min-priority worker at the beginning
// of the pool with the item at the end, fixes the heap, and then invokes this method to pop the
// worker from the end.
//
func (p *Pool) Pop() interface{} {
n := len(p.Workers)
w := (*p).Workers[n-1]
Expand All @@ -64,7 +58,6 @@ func (p *Pool) Pop() interface{} {
// WorkersCurrentLoad returns the load of the workers as comma separated string values, where
// each value consists of worker id (Worker.Id property) and pending requests associated with
// the worker.
//
func (p *Pool) WorkersCurrentLoad() string {
var buffer bytes.Buffer
buffer.WriteString("Load [")
Expand Down
1 change: 0 additions & 1 deletion upload/concurrent/request.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package concurrent

// Request represents a work that Worker needs to execute
//
type Request struct {
ID string // The Id of the work (for debugging purposes)
Work func() error // The work to be executed by a worker
Expand Down
9 changes: 2 additions & 7 deletions upload/concurrent/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package concurrent
import "fmt"

// Worker represents a type which can listen for work from a channel and run them
//
type Worker struct {
RequestsToHandleChan chan *Request // The buffered channel of works this worker needs to handle
Pending int // The number of pending requests this worker needs to handle (i.e. worker load)
Expand All @@ -16,13 +15,11 @@ type Worker struct {
}

// The maximum number of times a work needs to be retried before reporting failure on errorChan.
//
const maxRetryCount int = 5

// NewWorker creates a new instance of the worker with the given work channel size.
// errorChan is the channel to report the failure in addressing a work request after all
// retries, each time a work is completed (failure or success) doneChan will be signalled
//
func NewWorker(id int, workChannelSize int, pool *Pool, errorChan chan<- error, requestHandledChan chan<- *Worker, workerFinishedChan chan<- *Worker) *Worker {
return &Worker{
ID: id,
Expand All @@ -36,12 +33,11 @@ func NewWorker(id int, workChannelSize int, pool *Pool, errorChan chan<- error,

// Run starts a go-routine that read work from work-queue associated with the worker and executes one
// at a time. The go-routine returns/exit once one of the following condition is met:
// 1. The work-queue is closed and drained and there is no work to steal from peers worker's work-queue
// 2. A signal is received in the tearDownChan channel parameter
//  1. The work-queue is closed and drained and there is no work to steal from peer workers' work-queues
// 2. A signal is received in the tearDownChan channel parameter
//
// After executing each work, this method sends report to Worker::requestHandledChan channel
// If a work fails after maximum retry, this method sends report to Worker::errorChan channel
//
func (w *Worker) Run(tearDownChan <-chan bool) {
go func() {
defer func() {
Expand Down Expand Up @@ -105,7 +101,6 @@ func (w *Worker) Run(tearDownChan <-chan bool) {

// tryStealWork tries to steal work from a peer worker if available. If all peer channels are
// empty then it returns nil.
//
func (w *Worker) tryStealWork() *Request {
for _, w1 := range w.pool.Workers {
request, ok := <-w1.RequestsToHandleChan
Expand Down
12 changes: 4 additions & 8 deletions upload/detectEmptyRanges.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ import (
"io"
"math"

"github.com/Microsoft/azure-vhd-utils/vhdcore/block/bitmap"
"github.com/Microsoft/azure-vhd-utils/vhdcore/common"
"github.com/Microsoft/azure-vhd-utils/vhdcore/diskstream"
"github.com/Microsoft/azure-vhd-utils/vhdcore/footer"
"github.com/flatcar/azure-vhd-utils/vhdcore/block/bitmap"
"github.com/flatcar/azure-vhd-utils/vhdcore/common"
"github.com/flatcar/azure-vhd-utils/vhdcore/diskstream"
"github.com/flatcar/azure-vhd-utils/vhdcore/footer"
)

// DataWithRange type describes a range and data associated with the range.
//
type DataWithRange struct {
Range *common.IndexRange
Data []byte
Expand All @@ -22,7 +21,6 @@ type DataWithRange struct {
// ranges and update the uploadableRanges slice by removing the empty ranges. This method returns the updated ranges.
// The empty range detection is required only for Fixed disks; if the stream is an expandable disk stream this method simply
// returns the parameter uploadableRanges as it is.
//
func DetectEmptyRanges(diskStream *diskstream.DiskStream, uploadableRanges []*common.IndexRange) ([]*common.IndexRange, error) {
if diskStream.GetDiskType() != footer.DiskTypeFixed {
return uploadableRanges, nil
Expand Down Expand Up @@ -68,7 +66,6 @@ L:
// to report the non-empty range indices and error channel - used to report any error while performing empty detection.
// int channel will be closed on a successful completion, the caller must not expect any more value in the
// int channel if the error channel is signaled.
//
func LocateNonEmptyRangeIndices(stream *diskstream.DiskStream, ranges []*common.IndexRange) (<-chan int32, <-chan error) {
indexChan := make(chan int32, 0)
errorChan := make(chan error, 0)
Expand Down Expand Up @@ -101,7 +98,6 @@ func LocateNonEmptyRangeIndices(stream *diskstream.DiskStream, ranges []*common.
}

// isAllZero returns true if the given byte slice contain all zeros
//
func isAllZero(buf []byte) bool {
l := len(buf)
j := 0
Expand Down
15 changes: 3 additions & 12 deletions upload/metadata/metaData.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,21 @@ import (
"time"

"github.com/Azure/azure-sdk-for-go/storage"
"github.com/Microsoft/azure-vhd-utils/upload/progress"
"github.com/Microsoft/azure-vhd-utils/vhdcore/diskstream"

"github.com/flatcar/azure-vhd-utils/upload/progress"
"github.com/flatcar/azure-vhd-utils/vhdcore/diskstream"
)

// The key of the page blob metadata collection entry holding VHD metadata as json.
//
const metaDataKey = "diskmetadata"

// MetaData is the type representing metadata associated with an Azure page blob holding the VHD.
// This will be stored as a JSON string in the page blob metadata collection with key 'diskmetadata'.
//
type MetaData struct {
FileMetaData *FileMetaData `json:"fileMetaData"`
}

// FileMetaData represents the metadata of a VHD file.
//
type FileMetaData struct {
FileName string `json:"fileName"`
FileSize int64 `json:"fileSize"`
Expand All @@ -37,7 +35,6 @@ type FileMetaData struct {
}

// ToJSON returns MetaData as a json string.
//
func (m *MetaData) ToJSON() (string, error) {
b, err := json.Marshal(m)
if err != nil {
Expand All @@ -47,7 +44,6 @@ func (m *MetaData) ToJSON() (string, error) {
}

// ToMap returns the map representation of the MetaData, which can be stored in the page blob metadata collection
//
func (m *MetaData) ToMap() (map[string]string, error) {
v, err := m.ToJSON()
if err != nil {
Expand All @@ -59,7 +55,6 @@ func (m *MetaData) ToMap() (map[string]string, error) {

// NewMetaDataFromLocalVHD creates a MetaData instance that should be associated with the page blob
// holding the VHD. The parameter vhdPath is the path to the local VHD.
//
func NewMetaDataFromLocalVHD(vhdPath string) (*MetaData, error) {
fileStat, err := getFileStat(vhdPath)
if err != nil {
Expand Down Expand Up @@ -90,7 +85,6 @@ func NewMetaDataFromLocalVHD(vhdPath string) (*MetaData, error) {

// NewMetadataFromBlob returns the MetaData instance associated with an Azure page blob; if there is no
// MetaData associated with the blob it returns a nil value for MetaData
//
func NewMetadataFromBlob(blobClient storage.BlobStorageClient, containerName, blobName string) (*MetaData, error) {
allMetadata, err := blobClient.GetBlobMetadata(containerName, blobName)
if err != nil {
Expand All @@ -112,7 +106,6 @@ func NewMetadataFromBlob(blobClient storage.BlobStorageClient, containerName, bl
// CompareMetaData compares the MetaData associated with the remote page blob and local VHD file. If both metadata
// are the same this method returns an empty error slice, else a non-empty error slice with each error describing
// the metadata entry that mismatched.
//
func CompareMetaData(remote, local *MetaData) []error {
var metadataErrors = make([]error, 0)
if !bytes.Equal(remote.FileMetaData.MD5Hash, local.FileMetaData.MD5Hash) {
Expand Down Expand Up @@ -150,7 +143,6 @@ func CompareMetaData(remote, local *MetaData) []error {
}

// getFileStat returns os.FileInfo of a file.
//
func getFileStat(filePath string) (os.FileInfo, error) {
fd, err := os.Open(filePath)
if err != nil {
Expand All @@ -162,7 +154,6 @@ func getFileStat(filePath string) (os.FileInfo, error) {

// calculateMD5Hash computes the MD5 checksum of a disk stream; it writes the computation progress to stdout.
// If there is an error in reading the file, then the MD5 computation stops and it returns an error.
//
func calculateMD5Hash(diskStream *diskstream.DiskStream) ([]byte, error) {
progressStream := progress.NewReaderWithProgress(diskStream, diskStream.GetSize(), 1*time.Second)
defer progressStream.Close()
Expand Down
Loading