From 7fa1ab2c1c83dfd11804bfc336ed17916d36bd84 Mon Sep 17 00:00:00 2001 From: John-Memphis <136013599+John-Memphis@users.noreply.github.com> Date: Thu, 9 Nov 2023 03:38:10 -0600 Subject: [PATCH] Chore: Updated the go readme. (#129) * "Go readme done. Comments that need addressing are marked with TO:DO" * "Synced with master. Added the changes that I made in the python repo to this one as well" --- README.md | 468 +++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 410 insertions(+), 58 deletions(-) diff --git a/README.md b/README.md index 1d02a93..8da9e18 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,10 @@ c, err := memphis.Connect("", memphis.Password("")) // depends on how Memphis deployed - default is connection token-based authentication ```
-It is possible to pass connection configuration parameters, as function-parameters. + +The connect function allows for the connection to Memphis. Connecting to Memphis (cloud or open-source) will be needed in order to use any of the other functionality of the Memphis class. Upon connection, all of Memphis' features are available. + +Configuring the connection to Memphis in the Go SDK can be done by passing in the different configuration functions to the Connect function. ```go // function params @@ -60,18 +63,66 @@ c, err := memphis.Connect("", "", memphis.ConnectionToken(""), // you will get it on application type user creation memphis.Password(""), // depends on how Memphis deployed - default is connection token-based authentication - memphis.AccountId() // You can find it on the profile page in the Memphis UI. This field should be sent only on the cloud version of Memphis, otherwise it will be ignored - memphis.Port(), // defaults to 6666 + memphis.AccountId() // You can find it on the profile page in the Memphis UI. This field should be sent only on the cloud version of Memphis, otherwise it will be ignored + memphis.Port(), // defaults to 6666 memphis.Reconnect(), // defaults to true memphis.MaxReconnect(), // defaults to 10 - memphis.ReconnectInterval() // defaults to 1 second - memphis.Timeout() // defaults to 15 seconds + memphis.ReconnectInterval() // defaults to 1 second + memphis.Timeout() // defaults to 15 seconds // for TLS connection: memphis.Tls("", "", ""), ) ``` -Once connected, all features offered by Memphis are available.
+Here is an example of connecting to Memphis using a password (using the default user:root password:memphis login with Memphis open-source): + +```go +conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis")) +``` + +Connecting to Memphis cloud will require the account id and broker hostname. You may find these on the Overview page of the Memphis cloud UI at the top of the page. Here is an example of connecting to a cloud broker that is located in US East: + +```go +conn, err := memphis.Connect("aws-us-east-1.cloud.memphis.dev", "my_client_username", memphis.Password("memphis"), memphis.AccountId(123456789)) +``` + +It is possible to use a token-based connection to memphis as well, where multiple users can share the same token to connect to memphis. Here is an example of using memphis.connect with a token: + +```go +conn, err := memphis.Connect("localhost", "root", memphis.ConnectionToken("memphis")) +``` + +The token will be made available when creating new users. + +Memphis open-source needs to be configured to use token based connection. See the [docs](https://docs.memphis.dev/memphis/memphis-broker/concepts/security) for help doing this. + +To use a TLS based connection, the TLS function will need to be invoked: + +```go +func Tls(TlsCert string, TlsKey string, CaFile string) Option { + return func(o *Options) error { + o.TLSOpts = TLSOpts{ + TlsCert: TlsCert, + TlsKey: TlsKey, + CaFile: CaFile, + } + return nil + } +} +``` + +Using this to connect to Memphis looks like this: + +```go +conn, err := memphis.Connect("localhost", "root", memphis.Tls( + "~/tls_file_path.key", + "~/tls_cert_file_path.crt", + "~/tls_cert_file_path.crt", +)) +``` + +To configure memphis to use TLS see the [docs](https://docs.memphis.dev/memphis/open-source-installation/kubernetes/production-best-practices#memphis-metadata-tls-connection-configuration). + ### Disconnecting from Memphis To disconnect from Memphis, call Close() on the Memphis connection object.
@@ -81,10 +132,13 @@ c.Close(); ``` ### Creating a Station -**Unexist stations will be created automatically through the SDK on the first producer/consumer connection with default values.**

-Stations can be created from Conn
+ +Stations are distributed units that store messages. Producers add messages to stations and Consumers take messages from them. Each station stores messages until their retention policy causes them to either delete the messages or move them to [remote storage](https://docs.memphis.dev/memphis/integrations-center/storage/s3-compatible). + +**A station will be automatically created for the user when a consumer or producer is used if no stations with the given station name exist.**

+Stations can be created from a memphis connection (Conn)
Passing optional parameters using functions
-_If a station already exists nothing happens, the new configuration will not be applied_
+_If the station trying to be created exists when this function is called, nothing will change with the exisitng station_ ```go s0, err = c.CreateStation("") @@ -104,42 +158,191 @@ s1, err = c.CreateStation("", ) ``` +The CreateStation function is used to create a station. Using the different arguemnts, one can programically create many different types of stations. The Memphis UI can also be used to create stations to the same effect. + +A minimal example, using all default values would simply create a station with the given name: + +```go +conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis")) + +// Handle err + +station, err := conn.CreateStation("myStation") +``` + +To change what criteria the station uses to decide if a message should be retained in the station, change the retention type. The different types of retention are documented [here](https://github.com/memphisdev/memphis.go#retention-types) in the go README. + +The unit of the rentention value will vary depending on the RetentionType. The [previous link](https://github.com/memphisdev/memphis.go#retention-types) also describes what units will be used. + +Here is an example of a station which will only hold up to 10 messages: + +```go +conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis")) + +// Handle err + +station, err := conn.CreateStation( + "myStation", + memphis.RetentionTypeOpt(memphis.Messages), + memphis.RetentionVal(10) + ) +``` + +Memphis stations can either store Messages on disk or in memory. A comparison of those types of storage can be found [here](https://docs.memphis.dev/memphis/memphis-broker/concepts/storage-and-redundancy#tier-1-local-storage). + +Here is an example of how to create a station that uses Memory as its storage type: + +```go +conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis")) + +// Handle err + +station, err := conn.CreateStation( + "myStation", + memphis.StorageTypeOpt(memphis.Memory) + ) +``` + +In order to make a station more redundant, replicas can be used. Read more about replicas [here](https://docs.memphis.dev/memphis/memphis-broker/concepts/storage-and-redundancy#replicas-mirroring). Note that replicas are only available in cluster mode. Cluster mode can be enabled in the [Helm settings](https://docs.memphis.dev/memphis/open-source-installation/kubernetes/1-installation#appendix-b-helm-deployment-options) when deploying Memphis with Kubernetes. + +Here is an example of creating a station with 3 replicas: + +```go +conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis")) + +// Handle err + +station, err := conn.CreateStation( + "myStation", + memphis.Replicas(3) + ) +``` + +Idempotency defines how Memphis will prevent duplicate messages from being stored or consumed. The duration of time the message ID's will be stored in the station can be set with the IdempotencyWindow StationOpt. If the environment Memphis is deployed in has unreliably connection and/or a lot of latency, increasing this value might be desiriable. The default duration of time is set to two minutes. Read more about idempotency [here](https://docs.memphis.dev/memphis/memphis-broker/concepts/idempotency). + +Here is an example of changing the idempotency window to 3 seconds: + +```go +conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis")) + +// Handle err + +station, err := conn.CreateStation( + "myStation", + memphis.IdempotencyWindow(3 * time.Minute) + ) +``` + +The SchemaName is used to set a schema to be enforced by the station. The default value ensures that no schema is enforced. Here is an example of changing the schema to a defined schema in schemaverse called "sensorLogs": + +```go +conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis")) + +// Handle err + +station, err := conn.CreateStation( + "myStation", + memphis.SchemaName("sensorLogs") + ) +``` + +There are two parameters for sending messages to the [dead-letter station(DLS)](https://docs.memphis.dev/memphis/memphis-broker/concepts/dead-letter#terminology). Use the functions SendPoisonMsgToDls and SendSchemaFailedMsgToDls to se these parameters. + +Here is an example of sending poison messages to the DLS but not messages which fail to conform to the given schema. + +```go +conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis")) + +// Handle err + +station, err := conn.CreateStation( + "myStation", + memphis.SchemaName("SensorLogs"), + memphis.SendPoisonMsgToDls(true), + memphis.SendSchemaFailedMsgToDls(false) + ) +``` + +When either of the DLS flags are set to True, a station can also be set to handle these events. To set a station as the station to where schema failed or poison messages will be set to, use the DlsStation StationOpt: + +```go +conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis")) + +// Handle err + +station, err := conn.CreateStation( + "myStation", + memphis.SchemaName("SensorLogs"), + memphis.SendPoisonMsgToDls(true), + memphis.SendSchemaFailedMsgToDls(false), + memphis.DlsStation("badSensorMessagesStation") + ) +``` + +When the retention value is met, Mempihs by default will delete old messages. If tiered storage is setup, Memphis can instead move messages to tier 2 storage. Read more about tiered storage [here](https://docs.memphis.dev/memphis/memphis-broker/concepts/storage-and-redundancy#storage-tiering). Enable this setting with the respective StationOpt: + +```go +conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis")) + +// Handle err + +station, err := conn.CreateStation( + "myStation", + memphis.TieredStorageEnabled(true) + ) +``` + +[Partitioning](https://docs.memphis.dev/memphis/memphis-broker/concepts/station#partitions) might be useful for a station. To have a station partitioned, simply set the PartitionNumber StationOpt: + +```go +conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis")) + +// Handle err + +station, err := conn.CreateStation( + "myStation", + memphis.PartitionsNumber(3) + ) +``` + + ### Retention Types -Memphis currently supports the following types of retention:
+Retention types define the methodology behind how a station behaves with its messages. Memphis currently supports the following retention types: ```go memphis.MaxMessageAgeSeconds ``` -The above means that every message persists for the value set in the retention value field (in seconds). +When the retention type is set to MAX_MESSAGE_AGE_SECONDS, messages will persist in the station for the number of seconds specified in the retention_value. ```go memphis.Messages ``` -The above means that after the maximum number of saved messages (set in retention value)
has been reached, the oldest messages will be deleted. +When the retention type is set to MESSAGES, the station will only hold up to retention_value messages. The station will delete the oldest messsages to maintain a retention_value number of messages. ```go memphis.Bytes ``` -The above means that after maximum number of saved bytes (set in retention value)
has been reached, the oldest messages will be deleted. +When the retention type is set to BYTES, the station will only hold up to retention_value BYTES. The oldest messages will be deleted in order to maintain at maximum retention_vlaue BYTES in the station. ```go memphis.AckBased // for cloud users only ``` -The above means that after a message is getting acked by all interested consumer groups it will be deleted from the Station. +When the retention type is set to ACK_BASED, messages in the station will be deleted after they are acked by all subscribed consumer groups. ### Retention Values -The `retention values` are directly related to the `retention types` mentioned above,
where the values vary according to the type of retention chosen. - -All retention values are of type `int` but with different representations as follows: +The unit of the `retention value` changes depending on the `retention type` specified. -`memphis.MaxMessageAgeSeconds` is represented **in seconds**, `memphis.Messages` in a **number of messages**
`memphis.Bytes` in a **number of bytes**, and finally `memphis.AckBased` is not using the retentionValue param at all. +All retention values are of type `int`. The following units are used based on the respective retention type: -After these limits are reached oldest messages will be deleted. +`memphis.MaxMessageAgeSeconds` is represented **in seconds**,
+`memphis.Messages` is a **number of messages**
+`memphis.Bytes` is a **number of bytes**,
+With `memphis.AckBased`, the `retentionValue` is ignored. ### Storage Types Memphis currently supports the following types of messages storage:
@@ -148,13 +351,13 @@ Memphis currently supports the following types of messages storage:
memphis.Disk ``` -The above means that messages persist on disk. +When storage is set to DISK, messages are stored on disk. ```go memphis.Memory ``` -The above means that messages persist on the main memory.
+When storage is set to MEMORY, messages are stored in the system memory (RAM).
### Destroying a Station Destroying a station will remove all its resources (including producers and consumers).
@@ -188,10 +391,19 @@ err := conn.DetachSchema("") ``` ### Produce and Consume Messages -The most common client operations are producing messages and consuming messages.

-Messages are published to a station and consumed from it
by creating a consumer and calling its Consume function with a message handler callback function.
Consumers are pull-based and consume all the messages in a station
unless you are using a consumers group,
in which case messages are spread across all members in this group.

-Memphis messages are payload agnostic. Payloads are byte slices, i.e []byte.

-In order to stop receiving messages, you have to call ```consumer.StopConsume()```.
The consumer will terminate regardless of whether there are messages in flight for the client. +The most common client operations are producing messages and consuming messages. + +Messages are published to a station with a Producer and consumed from it by a Consumer by creating a consumer and calling its Consume function with a message handler callback function. + +Alternatively, consumers may call the Fetch function to only consume a specific number of messages. + +Consumers are pull-based and consume all the messages in a station unless you are using a consumers group, in which case messages are spread across all members in this group. + +Memphis messages are payload agnostic. Payloads are byte slices, i.e []byte. + +In order to stop receiving messages, you have to call ```consumer.StopConsume()```. + +The consumer will terminate even if there are messages currently being sent to the consumer. ### Creating a Producer @@ -207,14 +419,17 @@ p1, err := s.CreateProducer("") ``` ### Producing a message -Without creating a producer (receiver function of the connection struct). -In cases where extra performance is needed the recommended way is to create a producer first -and produce messages by using the produce receiver function of it + +Both producers and connections can use the produce function. To produce a message from a connection, simply call `connection.Produce`. This function will create a producer if none with the given name exists, otherwise it will pull the producer from a cache and use it to produce the message. + +Here is an example of producing from a connection: (receiver function of the connection struct). ```go c.Produce("station_name_c_produce", "producer_name_a", []byte("Hey There!"), []memphis.ProducerOpt{}, []memphis.ProduceOpt{}) ``` -Creating a producer first (receiver function of the producer struct). +Here is an example of producing from a producer (p) (receiver function of the producer struct). + +Creating a producer and calling produce on it will increase the performance of producing messages as it reduces the latency of having to get a producer from the cache. ```go p.Produce("", memphis.AckWaitSec(15)) // defaults to 15 seconds ``` @@ -241,60 +456,121 @@ myData := map[string]interface{}{ Note: When producing to a station with more than one partition, the producer will produce messages in a Round Robin fashion between the different partitions. -### Add headers +For message data formats see [here](https://docs.memphis.dev/memphis/memphis-schemaverse/formats/produce-consume). + +Here is an example of a produce function call that waits up to 30 seconds for an acknowledgement from memphis: + +```go +conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis")) + +// Handle err + +producer, err := conn.CreateProducer( + "StationToProduceFor", + "MyNewProducer", +) + +// Handle err + +err = producer.Produce( + []byte("My Message :)"), + memphis.AckWaitSec(30), +) + +// Handle err +``` + +As discussed before in the station section, idempotency is an important feature of memphis. To achieve idempotency, an id must be assigned to messages that are being produced. Use the MsgId ProducerOpt for this purpose. + +```go +conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis")) + +// Handle err + +producer, err := conn.CreateProducer( + "StationToProduceFor", + "MyNewProducer", + // MsgID not supported yet... +) + +// Handle err + +err = producer.Produce( + []byte("My Message :)"), +) + +// Handle err +``` + +To add message headers to the message, use the headers parameter. Headers can help with observability when using certain 3rd party to help monitor the behavior of memphis. See [here](https://docs.memphis.dev/memphis/memphis-broker/comparisons/aws-sqs-vs-memphis#observability) for more details. ```go +conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis")) + +// Handle err + +producer, err := conn.CreateProducer( + "StationToProduceFor", + "MyNewProducer", +) + +// Handle err + hdrs := memphis.Headers{} hdrs.New() err := hdrs.Add("key", "value") -p.Produce( - "", - memphis.AckWaitSec(15), - memphis.MsgHeaders(hdrs) // defaults to empty + +// Handle err + +err = producer.Produce( + []byte("My Message :)"), + memphis.MsgHeaders(hdrs), ) + +// Handle err ``` -### Async produce -For better performance. The client won't block requests while waiting for an acknowledgment. +Lastly, memphis can produce to a specific partition in a station. To do so, use the ProducerPartitionKey ProducerOpt: ```go -p.Produce( - "", - memphis.AckWaitSec(15), - memphis.AsyncProduce() +conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis")) + +// Handle err + +producer, err := conn.CreateProducer( + "StationToProduceFor", + "MyNewProducer", ) -``` -### Sync produce -For better reliability. The client will block requests and will wait for an acknowledgment. +// Handle err -```go -p.Produce( - "", - memphis.AckWaitSec(15), - memphis.SyncProduce() +err = producer.Produce( + []byte("My Message :)"), + memphis.ProducerPartitionKey("2ndPartition"), ) + +// Handle err ``` -### Message ID -Stations are idempotent by default for 2 minutes (can be configured), Idempotency achieved by adding a message id +### Async produce +For better performance. The client won't wait while waiting for an acknowledgment before sending more messages. ```go p.Produce( "", memphis.AckWaitSec(15), - memphis.MsgId("343") + memphis.AsyncProduce() ) ``` - -### Produce using partition key -The partition key will be used to produce messages to a spacific partition. +### Sync produce +For better reliability. The client will wait for an acknowledgement from the broker before sneding another message. ```go p.Produce( "", - memphis.ProducerPartitionKey() + memphis.AckWaitSec(15), + memphis.SyncProduce() ) ``` @@ -329,13 +605,89 @@ consumer0, err = s.CreateConsumer("", memphis.StartConsumeFromSeq()// start consuming from a specific sequence. defaults to 1 memphis.LastMessages()// consume the last N messages, defaults to -1 (all messages in the station) ) - + // creation from a Conn consumer1, err = c.CreateConsumer("", "", ...) ``` + +Consumers are used to pull messages from a station. Here is how to create a consumer with all of the default parameters: + + Note: When consuming from a station with more than one partition, the consumer will consume messages in Round Robin fashion from the different partitions. +To create a consumer in a consumer group, add the ConsumerGroup parameter: + +```go +conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis")) + +// Handle err + +consumer, err := conn.CreateConsumer( + "MyStation", + "MyNewConsumer", + memphis.ConsumerGroup("ConsumerGroup1"), +) + +// Handle err +``` + +When using the Consume function from a consumer, the consumer will continue to consume in an infinite loop. To change the rate at which the consumer polls, change the PullInterval consumer option: + +```go +conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis")) + +// Handle err + +consumer, err := conn.CreateConsumer( + "MyStation", + "MyNewConsumer", + memphis.PullInterval(2 * time.Second), +) + +// Handle err +``` + +Every time the consumer pulls from the station, the consumer will try to take BatchSize number of elements from the station. However, sometimes there are not enough messages in the station for the consumer to consume a full batch. In this case, the consumer will continue to wait until either BatchSize messages are gathered or the time in milliseconds specified by BatchMaxWaitTime is reached. + +Here is an example of a consumer that will try to pull 100 messages every 10 seconds while waiting up to 15 seconds for all messages to reach the consumer. + +```go +conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis")) + +// Handle err + +consumer, err := conn.CreateConsumer( + "MyStation", + "MyNewConsumer", + memphis.PullInterval(10 * time.Second), + memphis.BatchSize(100), + memphis.BatchMaxWaitTime(15 * time.Second). +) + +// Handle err +``` + +The MaxMsgDeliveries ConsumerOpt allows the user to set how many messages the consumer is able to consume (without acknowledging) before consuming more. + +```go +conn, err := memphis.Connect("localhost", "root", memphis.Password("memphis")) + +// Handle err + +consumer, err := conn.CreateConsumer( + "MyStation", + "MyNewConsumer", + memphis.PullInterval(10 * time.Second), + memphis.BatchSize(100), + memphis.BatchMaxWaitTime(15 * time.Second), + memphis.MaxMsgDeliveries(100), +) + +// Handle err +``` + + ### Passing a context to a message handler ```go @@ -384,7 +736,7 @@ func handler(msgs []*memphis.Msg, err error, ctx context.Context) { } ``` -if you have ingested data into station in one format, afterwards you apply a schema on the station, the consumer won't deserialize the previously ingested data. For example, you have ingested string into the station and attached a protobuf schema on the station. In this case, consumer won't deserialize the string. +There may be some instances where you apply a schema *after* a station has received some messages. In order to consume those messages get_data_deserialized may be used to consume the messages without trying to apply the schema to them. As an example, if you produced a string to a station and then attached a protobuf schema, using get_data_deserialized will not try to deserialize the string as a protobuf-formatted message. ### Fetch a single batch of messages ```go @@ -402,7 +754,7 @@ msgs, err := conn.FetchMessages("", "", ### Fetch a single batch of messages after creating a consumer `prefetch = true` will prefetch next batch of messages and save it in memory for future Fetch() request
-Note: Use a higher MaxAckTime as the messages will sit in a local cache for some time before processing +Note: Use a higher MaxAckTime as the messages will sit in a local cache for some time before being processed and Ack'd. ```go msgs, err := consumer.Fetch( int, bool,