Skip to content

Commit

Permalink
change event address to endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
shaowenchen committed Nov 22, 2024
1 parent c0a4134 commit 4e88b29
Show file tree
Hide file tree
Showing 24 changed files with 146 additions and 141 deletions.
2 changes: 1 addition & 1 deletion charts/ops/templates/deployment-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ spec:
env:
- name: EVENT_CLUSTER
value: "default"
- name: EVENT_ADDRESS
- name: EVENT_ENDPOINT
value: http://app:[email protected]:4222
{{- with .Values.nodeSelector }}
nodeSelector:
Expand Down
2 changes: 1 addition & 1 deletion charts/ops/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ spec:
value: registry.cn-hangzhou.aliyuncs.com/opshub/ubuntu:22.04
- name: EVENT_CLUSTER
value: "default"
- name: EVENT_ADDRESS
- name: EVENT_ENDPOINT
value: http://app:[email protected]:4222
{{- with .Values.nodeSelector }}
nodeSelector:
Expand Down
4 changes: 2 additions & 2 deletions controllers/taskrun_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (r *TaskRunReconciler) runTaskOnHost(logger *opslog.Logger, ctx context.Con
vars["TASK"] = t.Name
vars["TASKRUN"] = tr.Name
vars["HOSTNAME"] = h.GetHostname()
vars["EVENT_ADDRESS"] = opsconstants.GetEnvEventAddress()
vars["EVENT_ENDPOINT"] = opsconstants.GetEnvEventEndpoint()
vars["EVENT_CLUSTER"] = opsconstants.GetEnvEventCluster()

// insert host labels
Expand Down Expand Up @@ -314,7 +314,7 @@ func (r *TaskRunReconciler) runTaskOnKube(logger *opslog.Logger, ctx context.Con
vars := tr.Spec.Variables
vars["HOSTNAME"] = node.Name
vars["NAEMSPACE"] = tr.Namespace
vars["EVENT_ADDRESS"] = opsconstants.GetEnvEventAddress()
vars["EVENT_ENDPOINT"] = opsconstants.GetEnvEventEndpoint()
vars["TASK"] = t.Name
vars["TASKRUN"] = tr.Name
opstask.RunTaskOnKube(logger, t, tr, kc, &node,
Expand Down
2 changes: 1 addition & 1 deletion default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ opsserver="http://myops-server.ops-system.svc"
opstoken="ops"
[event]
cluster=default
address=http://app:[email protected]:4222
endpoint=http://app:[email protected]:4222
6 changes: 3 additions & 3 deletions pkg/constants/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const (
EnvActiveNamespaceKey = "ACTIVE_NAMESPACE"
EnvDefaultRuntimeImage = "DEFAULT_RUNTIME_IMAGE"
EnvEventClusterKey = "EVENT_CLUSTER"
EnvEventAddressKey = "EVENT_ADDRESS"
EnvEventEndpointKey = "EVENT_ENDPOINT"
)

// just for controller
Expand All @@ -27,8 +27,8 @@ func GetEnvEventCluster() string {
return os.Getenv(EnvEventClusterKey)
}

func GetEnvEventAddress() string {
return os.Getenv(EnvEventAddressKey)
func GetEnvEventEndpoint() string {
return os.Getenv(EnvEventEndpointKey)
}

func GetEnvDefaultRuntimeImage() string {
Expand Down
12 changes: 6 additions & 6 deletions pkg/event/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ type ProducerConsumerClient struct {
Consumer *cloudevents.Client
}

func (globalClient *GlobalEventBusClients) GetClient(server string, subject string) (*ProducerConsumerClient, error) {
func (globalClient *GlobalEventBusClients) GetClient(endpoint string, subject string) (*ProducerConsumerClient, error) {
// get from cache
key := fmt.Sprintf("%s-%s", server, subject)
key := fmt.Sprintf("%s-%s", endpoint, subject)
globalClient.Mutex.RLock()
clientP, ok := globalClient.Clients[key]
globalClient.Mutex.RUnlock()
if !ok {
// build producer
producerP, err := cenats.NewSender(server, subject, cenats.NatsOptions())
producerP, err := cenats.NewSender(endpoint, subject, cenats.NatsOptions())
if err != nil {
return nil, err
}
Expand All @@ -39,7 +39,7 @@ func (globalClient *GlobalEventBusClients) GetClient(server string, subject stri
return nil, err
}
// build consumer
consumerP, err := cenats.NewConsumer(server, subject, cenats.NatsOptions())
consumerP, err := cenats.NewConsumer(endpoint, subject, cenats.NatsOptions())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -69,11 +69,11 @@ type EventBus struct {
Subject string
}

func (bus *EventBus) WithServer(server string) *EventBus {
func (bus *EventBus) WithEndpoint(endpoint string) *EventBus {
if bus == nil {
bus = &EventBus{}
}
bus.Server = server
bus.Server = endpoint
return bus
}

Expand Down
30 changes: 15 additions & 15 deletions pkg/event/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,87 +7,87 @@ import (
)

// for controller
var server = opsconstants.GetEnvEventAddress()
var endpoint = opsconstants.GetEnvEventEndpoint()
var cluster = opsconstants.GetEnvEventCluster()

func FactoryController(namespace string, subs ...string) *EventBus {
subject := opsconstants.GetClusterSubject(cluster, namespace, opsconstants.SubjectController)
if len(subs) > 0 {
subject = subject + "." + strings.Join(subs, ".")
}
return (&EventBus{}).WithServer(server).WithSubject(subject)
return (&EventBus{}).WithEndpoint(endpoint).WithSubject(subject)
}

func FactoryCluster(namespace string, subs ...string) *EventBus {
subject := opsconstants.GetClusterSubject(cluster, namespace, opsconstants.SubjectCluster)
if len(subs) > 0 {
subject = subject + "." + strings.Join(subs, ".")
}
return (&EventBus{}).WithServer(server).WithSubject(subject)
return (&EventBus{}).WithEndpoint(endpoint).WithSubject(subject)
}

func FactoryHost(namespace string, subs ...string) *EventBus {
subject := opsconstants.GetClusterSubject(cluster, namespace, opsconstants.SubjectHost)
if len(subs) > 0 {
subject = subject + "." + strings.Join(subs, ".")
}
return (&EventBus{}).WithServer(server).WithSubject(subject)
return (&EventBus{}).WithEndpoint(endpoint).WithSubject(subject)
}

func FactoryTask(namespace string, subs ...string) *EventBus {
subject := opsconstants.GetClusterSubject(cluster, namespace, opsconstants.SubjectTask)
if len(subs) > 0 {
subject = subject + "." + strings.Join(subs, ".")
}
return (&EventBus{}).WithServer(server).WithSubject(subject)
return (&EventBus{}).WithEndpoint(endpoint).WithSubject(subject)
}

func FactoryTaskRun(namespace string, subs ...string) *EventBus {
subject := opsconstants.GetClusterSubject(cluster, namespace, opsconstants.SubjectTaskRun)
if len(subs) > 0 {
subject = subject + "." + strings.Join(subs, ".")
}
return (&EventBus{}).WithServer(server).WithSubject(subject)
return (&EventBus{}).WithEndpoint(endpoint).WithSubject(subject)
}

func FactoryPipeline(namespace string, subs ...string) *EventBus {
subject := opsconstants.GetClusterSubject(cluster, namespace, opsconstants.SubjectPipeline)
if len(subs) > 0 {
subject = subject + "." + strings.Join(subs, ".")
}
return (&EventBus{}).WithServer(server).WithSubject(subject)
return (&EventBus{}).WithEndpoint(endpoint).WithSubject(subject)
}

func FactoryPipelineRun(namespace string, subs ...string) *EventBus {
subject := opsconstants.GetClusterSubject(cluster, namespace, opsconstants.SubjectPipelineRun)
if len(subs) > 0 {
subject = subject + "." + strings.Join(subs, ".")
}
return (&EventBus{}).WithServer(server).WithSubject(subject)
return (&EventBus{}).WithEndpoint(endpoint).WithSubject(subject)
}

// for server
func FactoryWebhook(server, cluster, namespace string, subs ...string) *EventBus {
// for endpoint
func FactoryWebhook(endpoint, cluster, namespace string, subs ...string) *EventBus {
subject := opsconstants.GetClusterSubject(cluster, namespace, opsconstants.SubjectWebhook)
if len(subs) > 0 {
subject = subject + "." + strings.Join(subs, ".")
}
return (&EventBus{}).WithServer(server).WithSubject(subject)
return (&EventBus{}).WithEndpoint(endpoint).WithSubject(subject)
}

func Factory(server, cluster, namespace string, subs ...string) *EventBus {
func Factory(endpoint, cluster, namespace string, subs ...string) *EventBus {
subject := opsconstants.GetClusterSubject(cluster, namespace, opsconstants.SubjectWebhook)
if len(subs) > 0 {
subject = subject + "." + strings.Join(subs, ".")
}
return (&EventBus{}).WithServer(server).WithSubject(subject)
return (&EventBus{}).WithEndpoint(endpoint).WithSubject(subject)
}

var jetCache = make(map[string]*nats.JetStreamContext)

func FactoryJetStreamClient(server, cluster string) (*nats.JetStreamContext, error) {
func FactoryJetStreamClient(endpoint, cluster string) (*nats.JetStreamContext, error) {
if _, ok := jetCache[cluster]; !ok {
nc, err := nats.Connect(server)
nc, err := nats.Connect(endpoint)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1169,7 +1169,7 @@ func CreateEvent(c *gin.Context) {
showError(c, err.Error())
return
}
go opsevent.Factory(GlobalConfig.Event.ADDRESS, GlobalConfig.Event.Cluster, req.Namespace, req.Event).WithSubject(req.Event).Publish(context.TODO(), body)
go opsevent.Factory(GlobalConfig.Event.Endpoint, GlobalConfig.Event.Cluster, req.Namespace, req.Event).WithSubject(req.Event).Publish(context.TODO(), body)
showSuccess(c)
}

Expand Down Expand Up @@ -1207,7 +1207,7 @@ func ListEvents(c *gin.Context) {
if err != nil {
startTime = time.Now().Add(-time.Hour * 1)
}
client, err := opsevent.FactoryJetStreamClient(GlobalConfig.Event.ADDRESS, GlobalConfig.Event.Cluster)
client, err := opsevent.FactoryJetStreamClient(GlobalConfig.Event.Endpoint, GlobalConfig.Event.Cluster)
if err != nil {
showError(c, err.Error())
return
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ type ServerOptions struct {
}

type EventOption struct {
ADDRESS string
Cluster string
Endpoint string
Cluster string
}

func LoadConfig(configPath string) {
Expand Down
74 changes: 38 additions & 36 deletions swagger/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,42 +41,6 @@ const docTemplate = `{
"description": "OK"
}
}
},
"post": {
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"Event"
],
"summary": "Create Event",
"parameters": [
{
"type": "string",
"description": "event",
"name": "event",
"in": "path",
"required": true
},
{
"description": "Event payload",
"name": "body",
"in": "body",
"required": true,
"schema": {
"type": "object",
"additionalProperties": true
}
}
],
"responses": {
"200": {
"description": "OK"
}
}
}
},
"/api/v1/login/check": {
Expand Down Expand Up @@ -208,6 +172,44 @@ const docTemplate = `{
}
}
},
"/api/v1/namespaces/{namespace}/events/{event}": {
"post": {
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"Event"
],
"summary": "Create Event",
"parameters": [
{
"type": "string",
"description": "event",
"name": "event",
"in": "path",
"required": true
},
{
"description": "Event payload",
"name": "body",
"in": "body",
"required": true,
"schema": {
"type": "object",
"additionalProperties": true
}
}
],
"responses": {
"200": {
"description": "OK"
}
}
}
},
"/api/v1/namespaces/{namespace}/hosts": {
"get": {
"consumes": [
Expand Down
Loading

0 comments on commit 4e88b29

Please sign in to comment.