Skip to content

Commit

Permalink
Merge pull request #9 from jjang-go/feature/6-datamold
Browse files Browse the repository at this point in the history
Feature/6 datamold
  • Loading branch information
yunkon-kim authored Nov 3, 2023
2 parents 0b4e370 + 87bf3e3 commit a8a793a
Show file tree
Hide file tree
Showing 20 changed files with 970 additions and 0 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,8 @@ Data Mold는 데이터 마이그레이션 기술의 검증을 위한 환경을
1. 데이터 저장소(스토리지 또는 데이터베이스)를 목표 및 소스 컴퓨팅 환경에 생성한다.
2. 생성된 소스 데이터 저장소에 테스트 데이터를 생성 및 저장한다.
3. 소스에서 목표 컴퓨팅 환경으로 데이터 복제/마이그레이션을 수행하며, 이때 데이터 전/후처리 작업을 수행한다.

## Environments:

* OS: Ubuntu 20.04 LTS, Windows 10 Pro
* Go: 1.21.3
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ require (
cloud.google.com/go/firestore v1.14.0
cloud.google.com/go/storage v1.34.1
github.com/aws/aws-sdk-go-v2 v1.22.1
github.com/aws/aws-sdk-go-v2/config v1.22.0
github.com/aws/aws-sdk-go-v2/credentials v1.15.1
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.12.0
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.13.1
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.25.0
Expand All @@ -23,15 +25,20 @@ require (
cloud.google.com/go/iam v1.1.3 // indirect
cloud.google.com/go/longrunning v0.5.2 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.0 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.5.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.1 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.17.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.17.0 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.19.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.25.0 // indirect
github.com/aws/smithy-go v1.16.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
Expand Down
File renamed without changes.
25 changes: 25 additions & 0 deletions pkg/dummy/semistructed/semistructed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package semistructed_test

import (
"testing"

"fmt"

"github.com/cloud-barista/cm-data-mold/pkg/dummy/semistructed"
)

// TestJSON generates JSON dummy data through semistructed.GenerateRandomJSON
// and fails the test if generation returns an error.
func TestJSON(t *testing.T) {
	// Enter the directory path and total data size (in GB) to store json dummy data
	err := semistructed.GenerateRandomJSON("json-dummy-directory-path", 1)
	if err != nil {
		// Fail only this test; a panic would abort the whole test binary.
		t.Fatal(fmt.Errorf("test json error : %w", err))
	}
}

// TestXML generates XML dummy data through semistructed.GenerateRandomXML
// and fails the test if generation returns an error.
func TestXML(t *testing.T) {
	// Enter the directory path and total data size in GB to store xml dummy data
	err := semistructed.GenerateRandomXML("xml-dummy-directory-path", 1)
	if err != nil {
		// Fail only this test; a panic would abort the whole test binary.
		t.Fatal(fmt.Errorf("test xml error : %w", err))
	}
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
27 changes: 27 additions & 0 deletions pkg/dummy/structed/structed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package structed_test

import (
"path/filepath"
"testing"

"fmt"

"github.com/cloud-barista/cm-data-mold/pkg/dummy/structed"
)

// TestCSV generates CSV dummy data through structed.GenerateRandomCSV and
// fails the test if generation returns an error.
func TestCSV(t *testing.T) {
	// Enter the directory path and total data size in GB to store csv dummy data
	// NOTE(review): if the size argument really is in GB, 100 is a very large
	// fixture for a unit test — confirm the unit against GenerateRandomCSV.
	dir := filepath.Join("csv-dummy-directory-path", "csv")
	if err := structed.GenerateRandomCSV(dir, 100); err != nil {
		// Fail only this test; a panic would abort the whole test binary.
		t.Fatal(fmt.Errorf("test csv error : %w", err))
	}
}

// TestSQL generates SQL dummy data through structed.GenerateRandomSQL and
// fails the test if generation returns an error.
func TestSQL(t *testing.T) {
	// Enter the directory path and total data size in GB to store sql dummy data
	// NOTE(review): if the size argument really is in GB, 100 is a very large
	// fixture for a unit test — confirm the unit against GenerateRandomSQL.
	dir := filepath.Join("sql-dummy-directory-path", "sql")
	if err := structed.GenerateRandomSQL(dir, 100); err != nil {
		// Fail only this test; a panic would abort the whole test binary.
		t.Fatal(fmt.Errorf("test sql error : %w", err))
	}
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
40 changes: 40 additions & 0 deletions pkg/dummy/unstructed/unstructed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package unstructed_test

import (
"fmt"
"testing"

"github.com/cloud-barista/cm-data-mold/pkg/dummy/unstructed"
)

// TestIMG generates PNG dummy data through unstructed.GenerateRandomPNGImage
// and fails the test if generation returns an error.
func TestIMG(t *testing.T) {
	// Enter the directory path and total data size, in GB, to store the img dummy data
	err := unstructed.GenerateRandomPNGImage("img-dummy-directory-path", 1)
	if err != nil {
		// Fail only this test; a panic would abort the whole test binary.
		t.Fatal(fmt.Errorf("test img error : %w", err))
	}
}

// TestGIF generates dummy data into the gif directory and fails the test if
// generation returns an error.
func TestGIF(t *testing.T) {
	// Enter the directory path and total data size in GB to store gif dummy data
	// NOTE(review): this test is named for GIF data but calls the PNG
	// generator; confirm whether the package exposes a dedicated GIF
	// generator that should be called here instead.
	err := unstructed.GenerateRandomPNGImage("gif-dummy-directory-path", 1)
	if err != nil {
		// Fail only this test; a panic would abort the whole test binary.
		t.Fatal(fmt.Errorf("test gif error : %w", err))
	}
}

// TestTXT generates text dummy data through unstructed.GenerateRandomTXT and
// fails the test if generation returns an error.
func TestTXT(t *testing.T) {
	// Enter the directory path and total data size, in GB, to store txt dummy data
	err := unstructed.GenerateRandomTXT("txt-dummy-directory-path", 1)
	if err != nil {
		// Fail only this test; a panic would abort the whole test binary.
		t.Fatal(fmt.Errorf("test txt error : %w", err))
	}
}

// TestZIP generates dummy data into the zip directory and fails the test if
// generation returns an error.
func TestZIP(t *testing.T) {
	// Enter the directory path and total data size in GB to store zip dummy data
	// NOTE(review): this test is named for ZIP data but calls the TXT
	// generator; confirm whether the package exposes a dedicated ZIP
	// generator that should be called here instead.
	err := unstructed.GenerateRandomTXT("zip-dummy-directory-path", 1)
	if err != nil {
		// Fail only this test; a panic would abort the whole test binary.
		t.Fatal(fmt.Errorf("test zip error : %w", err))
	}
}
File renamed without changes.
137 changes: 137 additions & 0 deletions service/nrdbc/nrdbc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package nrdbc_test

import (
"context"
"encoding/json"
"fmt"
"testing"

"cloud.google.com/go/firestore"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/cloud-barista/cm-data-mold/pkg/nrdbms/awsdnmdb"
"github.com/cloud-barista/cm-data-mold/pkg/nrdbms/gcpfsdb"
"github.com/cloud-barista/cm-data-mold/pkg/nrdbms/ncpmgdb"
"github.com/cloud-barista/cm-data-mold/service/nrdbc"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"google.golang.org/api/option"
)

// TestMain runs a DynamoDB-to-Firestore example before handing control to the
// regular test runner.
//
// It builds one controller per provider from placeholder credentials, writes
// the exJSON sample records under "address" through the AWS controller, reads
// them back and prints them, and finally copies from the AWS controller to the
// GCP controller. Any failure along the way panics, because no *testing.T is
// available inside TestMain.
func TestMain(m *testing.M) {
	awsnrdbc, err := AWSInfo("your-aws-accessKey", "your-aws-secretKey", "your-aws-reigon")
	if err != nil {
		panic(err)
	}

	gcpnrdc, err := GCPInfo("your-gcp-projectID", "your-gcp-credentialsFile")
	if err != nil {
		panic(err)
	}

	var srcData []map[string]interface{}
	if err := json.Unmarshal([]byte(exJSON), &srcData); err != nil {
		panic(err)
	}

	if err := awsnrdbc.Put("address", &srcData); err != nil {
		panic(err)
	}

	var dstData []map[string]interface{}
	if err := awsnrdbc.Get("address", &dstData); err != nil {
		panic(err)
	}

	fmt.Println(dstData)

	// dynamo to firestore
	if err := awsnrdbc.Copy(gcpnrdc); err != nil {
		panic(err)
	}

	// Without m.Run the test binary exits without executing any Test function
	// in this package. Since Go 1.15 the result of m.Run is used as the exit
	// code when TestMain returns, so no explicit os.Exit is needed.
	m.Run()
}

// AWSInfo returns an NRDBController backed by a DynamoDB client.
//
// The AWS configuration is loaded with a static credentials provider built
// from accessKey and secretKey (empty session token), the given region, and a
// retry limit of 5 attempts. The same region is also passed to awsdnmdb.New.
// An error from loading the configuration is returned unchanged.
func AWSInfo(accessKey, secretKey, region string) (*nrdbc.NRDBController, error) {
	staticCreds := credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")

	awsCfg, err := config.LoadDefaultConfig(
		context.TODO(),
		config.WithCredentialsProvider(staticCreds),
		config.WithRegion(region),
		config.WithRetryMaxAttempts(5),
	)
	if err != nil {
		return nil, err
	}

	dynamoClient := dynamodb.NewFromConfig(awsCfg)
	return nrdbc.New(awsdnmdb.New(dynamoClient, region))
}

// GCPInfo returns an NRDBController backed by a Firestore client.
//
// The client is created for projectID and authenticates with the credentials
// file at credentialsFile. An error from creating the client is returned
// unchanged.
func GCPInfo(projectID, credentialsFile string) (*nrdbc.NRDBController, error) {
	credOpt := option.WithCredentialsFile(credentialsFile)

	fsClient, err := firestore.NewClient(context.TODO(), projectID, credOpt)
	if err != nil {
		return nil, err
	}

	// NOTE(review): the second argument is a hard-coded placeholder that
	// mentions an AWS region inside the GCP helper — confirm what gcpfsdb.New
	// expects here and whether it should be a parameter.
	return nrdbc.New(gcpfsdb.New(fsClient, "your-aws-region"))
}

// NCPInfo returns an NRDBController backed by a MongoDB client for dbName.
//
// The client connects directly to the single address "host:port" and
// authenticates with username and password. An error from mongo.Connect is
// returned unchanged.
func NCPInfo(username, password, host, port, dbName string) (*nrdbc.NRDBController, error) {
	direct := true
	address := fmt.Sprintf("%s:%s", host, port)

	clientOpts := &options.ClientOptions{
		Auth: &options.Credential{
			Username: username,
			Password: password,
		},
		Direct: &direct,
		Hosts:  []string{address},
	}

	mongoClient, err := mongo.Connect(context.Background(), clientOpts)
	if err != nil {
		return nil, err
	}

	return nrdbc.New(ncpmgdb.New(mongoClient, dbName))
}

// exJSON is sample data for the example in TestMain: a JSON array of three
// address records, each with the keys addr_id, countryabr, street, city,
// state, zip, country, latitude and longitude. TestMain unmarshals it into a
// slice of maps and stores it under the name "address".
const exJSON string = `
[
{
"addr_id": "DhiFnPGUcQUL8wgSkx1ky1hb",
"countryabr": "SE",
"street": "8350 West Loopbury",
"city": "St. Petersburg",
"state": "Idaho",
"zip": "58295",
"country": "Brazil",
"latitude": 43,
"longitude": -11
},
{
"addr_id": "Hf1ct5b8OCySK3OxiUET1nQ4",
"countryabr": "IN",
"street": "6258 New Viaberg",
"city": "Boston",
"state": "Mississippi",
"zip": "70833",
"country": "Lebanon",
"latitude": -42,
"longitude": 70
},
{
"addr_id": "s3suVD7d1hFUUz4ci6Z1TMAb",
"countryabr": "PL",
"street": "2497 Squarestad",
"city": "Fort Wayne",
"state": "Missouri",
"zip": "50560",
"country": "Hong Kong",
"latitude": 37,
"longitude": 123
}
]`
87 changes: 87 additions & 0 deletions service/osc/copy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package osc

import (
"errors"
"fmt"
"io"
"sync"

"github.com/cloud-barista/cm-data-mold/pkg/utils"
)

// Copy transfers every object listed by src into dst using concurrent
// workers.
//
// It first asks dst to create its bucket, then lists the objects of src and
// distributes them over src.threads workers (at least one). Copy waits for all
// workers to finish and returns the first error any of them reported, or nil
// when none did. A failure to list the source objects is returned immediately.
func (src *OSController) Copy(dst *OSController) error {
	// NOTE(review): any result of CreateBucket is discarded here, as in the
	// original code — confirm whether it returns an error that should abort
	// the copy.
	dst.osfs.CreateBucket()

	objList, err := src.osfs.ObjectList()
	if err != nil {
		return err
	}

	// With no workers nothing would ever receive from jobs, so fall back to a
	// single worker when threads is unset or invalid.
	workers := src.threads
	if workers < 1 {
		workers = 1
	}

	jobs := make(chan utils.Object)
	resultChan := make(chan error)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			copyWorker(src, dst, jobs, resultChan)
		}()
	}

	// Feed jobs from a separate goroutine so this goroutine is free to drain
	// resultChan; otherwise workers blocked on reporting an error and the
	// producer blocked on handing out a job would deadlock.
	go func() {
		defer close(jobs)
		for _, obj := range objList {
			jobs <- *obj
		}
	}()

	go func() {
		wg.Wait()
		close(resultChan)
	}()

	// Drain every result, not just the first error, so that no worker is left
	// blocked on a send after Copy returns.
	var firstErr error
	for err := range resultChan {
		if err != nil && firstErr == nil {
			firstErr = err
		}
	}

	return firstErr
}

// copyWorker copies each object received on jobs from src to dst until jobs
// is closed. A failed copy is reported on resultChan (at most one error per
// object) and the worker moves on to the next object; successful copies send
// nothing.
func copyWorker(src *OSController, dst *OSController, jobs chan utils.Object, resultChan chan<- error) {
	for obj := range jobs {
		if err := copyObject(src, dst, obj); err != nil {
			resultChan <- err
		}
	}
}

// copyObject streams a single object from src to dst and verifies that the
// number of bytes copied equals obj.Size. Both handles are closed on every
// path once they have been opened, so a failed copy does not leak them.
func copyObject(src *OSController, dst *OSController, obj utils.Object) error {
	srcFile, err := src.osfs.Open(obj.Key)
	if err != nil {
		return err
	}

	fmt.Println(obj.Key)
	dstFile, err := dst.osfs.Create(obj.Key)
	if err != nil {
		// The source was opened successfully, so release it before bailing
		// out. The Create error is the one worth reporting.
		srcFile.Close()
		return err
	}

	n, copyErr := io.Copy(dstFile, srcFile)

	// Close both ends unconditionally, then report errors in the same
	// priority order as before: copy, size check, source close, destination
	// close.
	srcCloseErr := srcFile.Close()
	dstCloseErr := dstFile.Close()

	switch {
	case copyErr != nil:
		return copyErr
	case n != obj.Size:
		return errors.New("copy failed")
	case srcCloseErr != nil:
		return srcCloseErr
	default:
		return dstCloseErr
	}
}
Loading

0 comments on commit a8a793a

Please sign in to comment.