Skip to content

Commit

Permalink
Merge branch 'main' into fix_20610
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Dec 13, 2024
2 parents 460fd89 + 265fc33 commit c6bf909
Show file tree
Hide file tree
Showing 114 changed files with 3,601 additions and 898 deletions.
5 changes: 5 additions & 0 deletions pkg/bootstrap/versions/v2_0_1/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ func migrateMoPubs(txn executor.TxnExecutor) (err error) {
if _, err = txn.Exec(insertSql, executor.StatementOption{}.WithAccountID(0)); err != nil {
return
}

deleteSql := fmt.Sprintf("delete from mo_catalog.mo_subs where pub_account_name = '%s' and pub_name = '%s' and sub_name is null", info.PubAccountName, info.PubName)
if _, err = txn.Exec(deleteSql, executor.StatementOption{}.WithAccountID(0)); err != nil {
return
}
}
return
}
54 changes: 52 additions & 2 deletions pkg/bootstrap/versions/v2_0_1/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package v2_0_1

import (
"strings"
"testing"

"github.com/matrixorigin/matrixone/pkg/bootstrap/versions"
Expand All @@ -27,7 +28,9 @@ import (
"github.com/stretchr/testify/assert"
)

type MockTxnExecutor struct{}
// MockTxnExecutor is a test double for executor.TxnExecutor used by the
// migrateMoPubs tests.
type MockTxnExecutor struct {
// flag, when true, makes Exec return an error for delete statements on
// mo_catalog.mo_subs (used to exercise the delete-failure path).
flag bool
}

func (MockTxnExecutor) Use(db string) {
//TODO implement me
Expand All @@ -39,7 +42,11 @@ func (MockTxnExecutor) LockTable(table string) error {
panic("implement me")
}

func (MockTxnExecutor) Exec(sql string, options executor.StatementOption) (executor.Result, error) {
func (e MockTxnExecutor) Exec(sql string, options executor.StatementOption) (executor.Result, error) {
if strings.HasPrefix(sql, "delete from mo_catalog.mo_subs") && e.flag {
return executor.Result{}, assert.AnError
}

bat := batch.New([]string{"a"})
bat.Vecs[0] = testutil.MakeInt32Vector([]int32{1}, nil)
bat.SetRowCount(1)
Expand Down Expand Up @@ -106,3 +113,46 @@ func Test_migrateMoPubs(t *testing.T) {
err := migrateMoPubs(txn)
assert.NoError(t, err)
}

// Test_migrateMoPubs_deleteFailed verifies that migrateMoPubs propagates the
// error when the delete against mo_catalog.mo_subs fails.
func Test_migrateMoPubs_deleteFailed(t *testing.T) {
	// Stub account discovery to return a single non-sys account.
	accountsStub := gostub.Stub(
		&pubsub.GetAccounts,
		func(_ executor.TxnExecutor) (map[string]*pubsub.AccountInfo, map[int32]*pubsub.AccountInfo, error) {
			accounts := map[string]*pubsub.AccountInfo{
				"acc1": {Id: 1, Name: "acc1"},
			}
			return accounts, nil, nil
		},
	)
	defer accountsStub.Reset()

	// Stub publication metadata: one sys-owned and one acc1-owned publication,
	// each published to all accounts.
	pubInfosStub := gostub.Stub(
		&versions.GetAllPubInfos,
		func(_ executor.TxnExecutor, _ map[string]*pubsub.AccountInfo) (map[string]*pubsub.PubInfo, error) {
			infos := map[string]*pubsub.PubInfo{
				"sys#pubName": {
					PubAccountName: "sys",
					PubName:        "pubName",
					SubAccountsStr: pubsub.AccountAll,
				},
				"acc1#pubName": {
					PubAccountName: "acc1",
					PubName:        "pubName",
					SubAccountsStr: pubsub.AccountAll,
				},
			}
			return infos, nil
		},
	)
	defer pubInfosStub.Reset()

	// Stub subscriber lookup to report a single subscribed account name.
	subbedNamesStub := gostub.Stub(
		&getSubbedAccNames,
		func(_ executor.TxnExecutor, _, _ string, _ map[int32]*pubsub.AccountInfo) ([]string, error) {
			return []string{"acc2"}, nil
		},
	)
	defer subbedNamesStub.Reset()

	// flag=true makes the mock executor fail the mo_subs delete statement,
	// so the migration must surface an error.
	txn := &MockTxnExecutor{flag: true}
	assert.Error(t, migrateMoPubs(txn))
}
4 changes: 2 additions & 2 deletions pkg/catalog/tuplesParse.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ func ParseEntryList(es []*api.Entry) (any, []*api.Entry, error) {
bat, _ := batch.ProtoBatchToBatch(e.Bat)
batstr := ""
if bat != nil {
batstr = common.MoBatchToString(bat, 20)
batstr = common.MoBatchToString(bat, 5)
}
return nil, nil, moerr.NewInternalErrorNoCtxf("bad write format %q, %s", e.EntryType, batstr)
return nil, nil, moerr.NewInternalErrorNoCtxf("bad write format %q, %v, batch %s", e.EntryType, len(es), batstr)
}
if e.EntryType == api.Entry_Alter {
bat, err := batch.ProtoBatchToBatch(e.Bat)
Expand Down
3 changes: 2 additions & 1 deletion pkg/cnservice/server_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ func (s *service) handleFaultInjection(ctx context.Context, req *query.Request,
}

func (s *service) handleMoTableStats(ctx context.Context, req *query.Request, resp *query.Response, _ *morpc.Buffer) error {
ret := disttae.HandleMoTableStatsCtl(req.CtlMoTableStatsRequest.Cmd)
e := s.storeEngine.(*disttae.Engine)
ret := e.HandleMoTableStatsCtl(req.CtlMoTableStatsRequest.Cmd)
resp.CtlMoTableStatsResponse = query.CtlMoTableStatsResponse{
Resp: ret,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/malloc/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func testAllocator(

t.Run("allocate", func(t *testing.T) {
allocator := newAllocator()
for i := uint64(1); i < 128*MB; i = uint64(math.Ceil(float64(i) * 1.1)) {
for i := uint64(1); i < 8*MB; i = uint64(math.Ceil(float64(i) * 1.1)) {
// allocate
slice, dec, err := allocator.Allocate(i, NoHints)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/common/malloc/metrics_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,11 @@ func (m *MetricsAllocator[U]) Allocate(size uint64, hints Hints) ([]byte, Deallo
), nil
}

const metricsAllocatorUpdateWindow = time.Millisecond * 100

func (m *MetricsAllocator[U]) triggerUpdate() {
if m.updating.CompareAndSwap(false, true) {
time.AfterFunc(time.Second, func() {
time.AfterFunc(metricsAllocatorUpdateWindow, func() {

if m.allocateBytesCounter != nil {
var n uint64
Expand Down
8 changes: 4 additions & 4 deletions pkg/container/types/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ const (
MaxBinaryLen = 255
MaxEnumLen = 65535
MaxBitLen = 64
MaxBlobLen = 65535
MaxVarcharLen = MaxBlobLen
MaxVarBinaryLen = MaxBlobLen
MaxStringSize = MaxBlobLen
MaxBlobLen = 67108864 // 64 MB
MaxStringSize = 65535 // 64 KB
MaxVarcharLen = MaxStringSize
MaxVarBinaryLen = MaxStringSize
)

func (v *Varlena) UnsafePtr() unsafe.Pointer {
Expand Down
1 change: 1 addition & 0 deletions pkg/fileservice/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func readCache(ctx context.Context, cache IOVectorCache, vector *IOVector) error
if err != nil {

if errors.Is(err, context.DeadlineExceeded) {
LogEvent(ctx, str_read_cache_exceed_deadline)
err = moerr.AttachCause(ctx, err)
logutil.Warn("cache read exceed deadline",
zap.Any("err", err),
Expand Down
31 changes: 28 additions & 3 deletions pkg/fileservice/disk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ func (d *DiskCache) Read(

var numHit, numRead, numOpenIOEntry, numOpenFull, numError int64
defer func() {
LogEvent(ctx, str_update_metrics_begin)

metric.FSReadHitDiskCounter.Add(float64(numHit))
metric.FSReadReadDiskCounter.Add(float64(numRead))
perfcounter.Update(ctx, func(c *perfcounter.CounterSet) {
Expand All @@ -257,6 +259,8 @@ func (d *DiskCache) Read(
c.FileService.Cache.Disk.OpenIOEntryFile.Add(numOpenIOEntry)
c.FileService.Cache.Disk.OpenFullFile.Add(numOpenFull)
}, d.perfCounterSets...)

LogEvent(ctx, str_update_metrics_end)
}()

path, err := ParsePath(vector.FilePath)
Expand All @@ -266,12 +270,17 @@ func (d *DiskCache) Read(

openedFiles := make(map[string]*os.File)
defer func() {
LogEvent(ctx, str_close_disk_files_begin)
for _, file := range openedFiles {
_ = file.Close()
}
LogEvent(ctx, str_close_disk_files_end)
}()

fillEntry := func(entry *IOEntry) error {
LogEvent(ctx, str_disk_cache_fill_entry_begin)
defer LogEvent(ctx, str_disk_cache_fill_entry_end)

if entry.done {
return nil
}
Expand All @@ -288,14 +297,18 @@ func (d *DiskCache) Read(
diskPath := d.pathForIOEntry(path.File, *entry)
if f, ok := openedFiles[diskPath]; ok {
// use opened file
LogEvent(ctx, str_disk_cache_file_seek_begin)
_, err = file.Seek(entry.Offset, io.SeekStart)
LogEvent(ctx, str_disk_cache_file_seek_end)
if err == nil {
file = f
}
} else {
// open file
d.waitUpdateComplete(diskPath)
d.waitUpdateComplete(ctx, diskPath)
LogEvent(ctx, str_disk_cache_file_open_begin)
diskFile, err := os.Open(diskPath)
LogEvent(ctx, str_disk_cache_file_open_end)
if err == nil {
file = diskFile
defer func() {
Expand All @@ -310,21 +323,27 @@ func (d *DiskCache) Read(
diskPath = d.pathForFile(path.File)
if f, ok := openedFiles[diskPath]; ok {
// use opened file
LogEvent(ctx, str_disk_cache_file_seek_begin)
_, err = f.Seek(entry.Offset, io.SeekStart)
LogEvent(ctx, str_disk_cache_file_seek_end)
if err == nil {
file = f
}
} else {
// open file
d.waitUpdateComplete(diskPath)
d.waitUpdateComplete(ctx, diskPath)
LogEvent(ctx, str_disk_cache_file_open_begin)
diskFile, err := os.Open(diskPath)
LogEvent(ctx, str_disk_cache_file_open_end)
if err == nil {
defer func() {
openedFiles[diskPath] = diskFile
}()
numOpenFull++
// seek
LogEvent(ctx, str_disk_cache_file_seek_begin)
_, err = diskFile.Seek(entry.Offset, io.SeekStart)
LogEvent(ctx, str_disk_cache_file_seek_end)
if err == nil {
file = diskFile
}
Expand All @@ -337,14 +356,18 @@ func (d *DiskCache) Read(
return nil
}

LogEvent(ctx, str_disk_cache_update_states_begin)
if _, ok := d.cache.Get(ctx, diskPath); !ok {
// set cache
LogEvent(ctx, str_disk_cache_file_stat_begin)
stat, err := file.Stat()
LogEvent(ctx, str_disk_cache_file_stat_end)
if err != nil {
return err
}
d.cache.Set(ctx, diskPath, struct{}{}, fileSize(stat))
}
LogEvent(ctx, str_disk_cache_update_states_end)

if err := entry.ReadFromOSFile(ctx, file, d.cacheDataAllocator); err != nil {
return err
Expand Down Expand Up @@ -589,7 +612,9 @@ func (d *DiskCache) decodeFilePath(diskPath string) (string, error) {
return fromOSPath(path), nil
}

func (d *DiskCache) waitUpdateComplete(path string) {
func (d *DiskCache) waitUpdateComplete(ctx context.Context, path string) {
LogEvent(ctx, str_disk_cache_wait_update_complete_begin)
defer LogEvent(ctx, str_disk_cache_wait_update_complete_end)
d.updatingPaths.L.Lock()
for d.updatingPaths.m[path] {
d.updatingPaths.Wait()
Expand Down
20 changes: 16 additions & 4 deletions pkg/fileservice/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ var dnsResolver = dns.NewCachingResolver(

// Install process-wide defaults: the caching DNS resolver and this package's
// round tripper, so plain net/http users share the same tuned transport.
func init() {
net.DefaultResolver = dnsResolver
// NOTE(review): the next assignment is immediately overwritten by the one
// after it — this looks like leftover from a diff (old vs. new line);
// confirm only the httpRoundTripper assignment should remain.
http.DefaultTransport = httpTransport
http.DefaultTransport = httpRoundTripper
}

// httpDialer is the shared dialer for outbound HTTP connections, bounded by
// connectTimeout and resolving hostnames through the caching DNS resolver.
var httpDialer = &net.Dialer{
Timeout: connectTimeout,
Resolver: dnsResolver,
}

var httpTransport = wrapRoundTripper(&http.Transport{
var httpTransport = &http.Transport{
DialContext: wrapDialContext(httpDialer.DialContext),
MaxIdleConns: maxIdleConns,
IdleConnTimeout: idleConnTimeout,
Expand All @@ -64,7 +64,19 @@ var httpTransport = wrapRoundTripper(&http.Transport{
RootCAs: caPool,
},
Proxy: http.ProxyFromEnvironment,
})
}

func init() {
	// There is an unexplained buildup of connections even though
	// MaxConnsPerHost is set; work around it by closing idle connections
	// once per second for the lifetime of the process.
	go func() {
		ticker := time.NewTicker(time.Second)
		for {
			<-ticker.C
			httpTransport.CloseIdleConnections()
		}
	}()
}

var httpRoundTripper = wrapRoundTripper(httpTransport)

var caPool = func() *x509.CertPool {
pool, err := x509.SystemCertPool()
Expand Down Expand Up @@ -97,7 +109,7 @@ func newHTTPClient(args ObjectStorageArguments) *http.Client {

// client
client := &http.Client{
Transport: httpTransport,
Transport: httpRoundTripper,
}

return client
Expand Down
17 changes: 14 additions & 3 deletions pkg/fileservice/io_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,16 @@ func (i *IOEntry) setCachedData(ctx context.Context, allocator CacheDataAllocato
}

func (i *IOEntry) ReadFromOSFile(ctx context.Context, file *os.File, allocator CacheDataAllocator) (err error) {
finally := i.prepareData()
LogEvent(ctx, str_ReadFromOSFile_begin)
defer LogEvent(ctx, str_ReadFromOSFile_end)

finally := i.prepareData(ctx)
defer finally(&err)

r := io.LimitReader(file, i.Size)
LogEvent(ctx, str_io_readfull_begin)
n, err := io.ReadFull(r, i.Data)
LogEvent(ctx, str_io_readfull_end)
if err != nil {
return err
}
Expand All @@ -65,7 +71,10 @@ func (i *IOEntry) ReadFromOSFile(ctx context.Context, file *os.File, allocator C
}

if i.WriterForRead != nil {
if _, err := i.WriterForRead.Write(i.Data); err != nil {
LogEvent(ctx, str_WriterForRead_Write_begin)
_, err := i.WriterForRead.Write(i.Data)
LogEvent(ctx, str_WriterForRead_Write_end)
if err != nil {
return err
}
}
Expand All @@ -92,7 +101,9 @@ func CacheOriginalData(ctx context.Context, r io.Reader, data []byte, allocator
return
}

func (i *IOEntry) prepareData() (finally func(err *error)) {
func (i *IOEntry) prepareData(ctx context.Context) (finally func(err *error)) {
LogEvent(ctx, str_prepareData_begin)
defer LogEvent(ctx, str_prepareData_end)
if cap(i.Data) < int(i.Size) {
slice, dec, err := ioAllocator().Allocate(uint64(i.Size), malloc.NoHints)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/fileservice/local_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ func (l *LocalFS) read(ctx context.Context, vector *IOVector, bytesCounter *atom
entry.Size = int64(len(data))

} else {
finally := entry.prepareData()
finally := entry.prepareData(ctx)
defer finally(&err)
var n int
n, err = io.ReadFull(r, entry.Data)
Expand Down
Loading

0 comments on commit c6bf909

Please sign in to comment.