Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: in-memory state store components #327

Merged
merged 8 commits into from
Nov 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/layotto/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"encoding/json"
"fmt"
mock_state "mosn.io/layotto/pkg/mock/components/state"
"os"
"strconv"
"time"
Expand Down Expand Up @@ -210,6 +211,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
),
// State
runtime.WithStateFactory(
runtime_state.NewFactory("in.memory", func() state.Store {
return mock_state.NewInMemoryStateStore()
}),
runtime_state.NewFactory("redis", func() state.Store {
return state_redis.NewRedisStateStore(loggerForDaprComp)
}),
Expand Down
73 changes: 73 additions & 0 deletions configs/config_in_memory.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
{
"servers": [
{
"default_log_path": "stdout",
"default_log_level": "DEBUG",
"routers": [
{
"router_config_name": "actuator_dont_need_router"
}
],
"listeners": [
{
"name": "grpc",
"address": "127.0.0.1:34904",
"bind_port": true,
"filter_chains": [
{
"filters": [
{
"type": "grpc",
"config": {
"server_name": "runtime",
"grpc_config": {
"hellos": {
"helloworld": {
"hello": "greeting"
}
},
"state": {
"in.memory": {
"metadata": {
}
}
},
"app": {
"app_id": "app1",
"grpc_callback_port": 9999
}
}
}
}
]
}
]
},
{
"name": "actuator",
"address": "127.0.0.1:34999",
"bind_port": true,
"filter_chains": [
{
"filters": [
{
"type": "proxy",
"config": {
"downstream_protocol": "Http1",
"upstream_protocol": "Http1",
"router_config_name": "actuator_dont_need_router"
}
}
]
}
],
"stream_filters": [
{
"type": "actuator_filter"
}
]
}
]
}
]
}
5 changes: 3 additions & 2 deletions demo/state/common/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,15 @@ func main() {
// SaveBulkState with options and metadata
testSaveBulkState(ctx, cli, storeName, key1, value, key2)

testGetBulkState(ctx, cli, storeName, key1, key2)
// GetBulkState
testGetBulkStateKey12345(ctx, cli, storeName)

// delete state
testDelete(ctx, cli, storeName, key1)
testDelete(ctx, cli, storeName, key2)
}

func testGetBulkState(ctx context.Context, cli client.Client, store string, key1 string, key2 string) {
func testGetBulkStateKey12345(ctx context.Context, cli client.Client, store string) {
state, err := cli.GetBulkState(ctx, store, []string{key1, key2, key3, key4, key5}, nil, 3)
if err != nil {
panic(err)
Expand Down
21 changes: 19 additions & 2 deletions docs/en/start/wasm/start.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
## WASM on Layotto
## Run business logic in Layotto using WASM

### What is WASM on Layotto?
The sidecar of service mesh and multi-runtime is a common infrastructure for the whole company, but in practice, the business system will also have its own SDK, and it will also have the difficulty of pushing users to upgrade the SDK and the problem of version fragmentation.

Layotto supports load the compiled WASM file, and interacts with it through the API of the `proxy_abi_version_0_2_0` version.
For example, a business system has developed an SDK in the form of a jar package for use by other business systems. Their features are not universal to the entire company, so they cannot persuade the middleware team to develop them into the company's unified sidecar.

![img_1.png](../../../img/wasm/img_1.png)



And if it becomes like this:

![img.png](../../../img/wasm/img.png)

If developers no longer develop sdk (jar package), change to develop .wasm files and support independent upgrade and deployment, there will be no pain to push the users to upgrade.

When you want to upgrade, you can release it on the operation platform. There is no need to restart the app and sidecar.

Layotto can load the compiled WASM files automatically, and interacts with them through the API of the `proxy_abi_version_0_2_0` version.

### Quick start

Expand Down Expand Up @@ -38,6 +53,8 @@ curl -H 'id:id_1' 'localhost:2045?name=book1'
There are 100 inventories for book1.
```

This http request will access the wasm module in Layotto. The wasm module will call redis for logical processing.

### Note

This feature is still in the experimental stage, and the implementation of the WASM interactive API in the community is not uniform enough, so if you have any needs for this module, please post it in the issue area, we will build WASM together!
Binary file added docs/img/wasm/img.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/img/wasm/img_1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion docs/zh/_sidebar.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
- [健康检查、查询运行时元数据](zh/start/actuator/start.md)
- [trace管理](zh/start/trace/trace.md)
- [OSS访问文件](zh/start/file/start.md)
- [使用WASM进行多语言编程](zh/start/wasm/start.md)
- [将业务逻辑通过 WASM 下沉进sidecar](zh/start/wasm/start.md)
- [基于 WASM 跟 Runtime 实现的 Faas 模型](zh/start/faas/start.md)
- 用户手册
- 功能介绍
Expand Down
16 changes: 14 additions & 2 deletions docs/zh/start/wasm/start.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
## WASM概述

## 将业务逻辑通过WASM下沉进sidecar
### 功能介绍
service mesh 和 multi-runtime 的 sidecar 是全公司通用的基础设施,但实践中,业务系统也会有自己的sdk,也会有推动用户升级难、版本碎片的问题.

比如某中台系统以jar包形式开发了sdk,供上层业务系统使用。他们的feature不算全公司通用,因此没法说服中间件团队、开发到公司统一的sidecar里。

![img_1.png](../../../img/wasm/img_1.png)

而如果变成这样:

![img.png](../../../img/wasm/img.png)

如果开发者不再开发sdk(jar包),改成开发.wasm文件、支持独立升级部署,就没有推动业务方升级的痛苦了,想要升级的时候在运维平台上操作发布即可,不需要app和sidecar重启

Layotto支持加载编译好的WASM文件,并通过`proxy_abi_version_0_2_0`版本的API与目标WASM进行交互。

Expand Down Expand Up @@ -36,6 +46,8 @@ curl -H 'id:id_1' 'localhost:2045?name=book1'
There are 100 inventories for book1.
```

该http请求会访问Layotto中的wasm模块。该wasm模块会调用redis进行逻辑处理

### 说明

该功能目前仍处于试验阶段,社区里对于WASM跟宿主的交互API也不够统一,因此如果您有该模块的需求欢迎发表在issue区,我们一起建设WASM!
184 changes: 184 additions & 0 deletions pkg/mock/components/state/in_memory_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Copyright 2021 Layotto Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package mock_state

import (
"encoding/json"
"fmt"
"github.com/dapr/components-contrib/state"
"sync"
)

type inMemStateStoreItem struct {
data []byte
etag *string
}

type inMemoryStateStore struct {
items map[string]*inMemStateStoreItem
lock *sync.RWMutex
}

func NewInMemoryStateStore() state.Store {
return &inMemoryStateStore{
items: map[string]*inMemStateStoreItem{},
lock: &sync.RWMutex{},
}
}

func (store *inMemoryStateStore) newItem(data []byte, etagString *string) *inMemStateStoreItem {
return &inMemStateStoreItem{
data: data,
etag: etagString,
}
}

func (store *inMemoryStateStore) Init(metadata state.Metadata) error {
return nil
}

func (store *inMemoryStateStore) Ping() error {
return nil
}

func (store *inMemoryStateStore) Features() []state.Feature {
return []state.Feature{state.FeatureETag, state.FeatureTransactional}
}

func (store *inMemoryStateStore) Delete(req *state.DeleteRequest) error {
store.lock.Lock()
defer store.lock.Unlock()
delete(store.items, req.Key)

return nil
}

func (store *inMemoryStateStore) BulkDelete(req []state.DeleteRequest) error {
if req == nil || len(req) == 0 {
return nil
}
for _, dr := range req {
err := store.Delete(&dr)
if err != nil {
return err
}
}
return nil
}

func (store *inMemoryStateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
store.lock.RLock()
defer store.lock.RUnlock()
item := store.items[req.Key]

if item == nil {
return &state.GetResponse{Data: nil, ETag: nil}, nil
}

return &state.GetResponse{Data: item.data, ETag: item.etag}, nil
}

func (store *inMemoryStateStore) BulkGet(req []state.GetRequest) (bool, []state.BulkGetResponse, error) {
res := []state.BulkGetResponse{}
for _, oneRequest := range req {
oneResponse, err := store.Get(&state.GetRequest{
Key: oneRequest.Key,
Metadata: oneRequest.Metadata,
Options: oneRequest.Options,
})
if err != nil {
return false, nil, err
}

res = append(res, state.BulkGetResponse{
Key: oneRequest.Key,
Data: oneResponse.Data,
ETag: oneResponse.ETag,
})
}

return true, res, nil
}

func (store *inMemoryStateStore) Set(req *state.SetRequest) error {
b, _ := Marshal(req.Value, json.Marshal)
store.lock.Lock()
defer store.lock.Unlock()
store.items[req.Key] = store.newItem(b, req.ETag)

return nil
}

func (store *inMemoryStateStore) BulkSet(req []state.SetRequest) error {
for _, r := range req {
err := store.Set(&r)
if err != nil {
return err
}
}
return nil
}

func (store *inMemoryStateStore) Multi(request *state.TransactionalStateRequest) error {
store.lock.Lock()
defer store.lock.Unlock()
// First we check all eTags
for _, o := range request.Operations {
var eTag *string
key := ""
if o.Operation == state.Upsert {
key = o.Request.(state.SetRequest).Key
eTag = o.Request.(state.SetRequest).ETag
} else if o.Operation == state.Delete {
key = o.Request.(state.DeleteRequest).Key
eTag = o.Request.(state.DeleteRequest).ETag
}
item := store.items[key]
if eTag != nil && item != nil {
if *eTag != *item.etag {
return fmt.Errorf("etag does not match for key %v", key)
}
}
if eTag != nil && item == nil {
return fmt.Errorf("etag does not match for key not found %v", key)
}
}

// Now we can perform the operation.
for _, o := range request.Operations {
if o.Operation == state.Upsert {
req := o.Request.(state.SetRequest)
b, _ := Marshal(req.Value, json.Marshal)
store.items[req.Key] = store.newItem(b, req.ETag)
} else if o.Operation == state.Delete {
req := o.Request.(state.DeleteRequest)
delete(store.items, req.Key)
}
}

return nil
}

func Marshal(val interface{}, marshaler func(interface{}) ([]byte, error)) ([]byte, error) {
var err error = nil
bt, ok := val.([]byte)
if !ok {
bt, err = marshaler(val)
}

return bt, err
}