Skip to content

Commit

Permalink
Flush state to json
Browse files Browse the repository at this point in the history
  • Loading branch information
ekuzmichev committed Aug 7, 2024
1 parent 3397763 commit 0466df5
Show file tree
Hide file tree
Showing 14 changed files with 209 additions and 18 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ lazy val root = (project in file("."))
"org.typelevel" %% "cats-core" % "2.12.0",
"io.lemonlabs" %% "scala-uri" % "4.0.3",
"org.jasypt" % "jasypt" % "1.9.3",
"com.github.pathikrit" %% "better-files" % "3.9.2",
"org.scalatest" %% "scalatest" % "3.2.19" % Test,
"dev.zio" %% "zio-test" % "2.1.6" % Test,
"com.stephenn" %% "scalatest-circe" % "0.2.5" % Test
Expand Down
10 changes: 6 additions & 4 deletions src/main/scala/app/AppLayers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ import encryption.EncDecLayers
import product.{ProductFetcherLayers, ProductIdParserLayers}
import scalascraper.BrowserLayers
import schedule.{JobIdGeneratorLayers, ZioSchedulerLayers}
import store.ProductStoreLayers
import store.{CacheStateRepository, CacheStateRepositoryLayers, ProductStore, ProductStoreLayers}
import telegram.TelegramClientLayers
import util.lang.Throwables.failure

import org.telegram.telegrambots.longpolling.interfaces.LongPollingUpdateConsumer
import zio.{RLayer, ZIO, ZIOAppArgs, ZLayer}

object AppLayers:
private type ROut = AppConfig & LongPollingUpdateConsumer & ConsumerRegisterer & ProductWatchingJobScheduler
private type ROut = AppConfig & LongPollingUpdateConsumer & ConsumerRegisterer & ProductWatchingJobScheduler &
ProductStore & CacheStateRepository

val ozonPriceCheckerAppLayer: RLayer[ZIOAppArgs, ROut] =
ZLayer
Expand All @@ -33,14 +34,15 @@ object AppLayers:
ConsumerRegistererLayers.impl,
UpdateConsumerLayers.ozonPriceChecker,
TelegramClientLayers.okHttp,
ProductStoreLayers.inMemory,
ProductStoreLayers.cachedOverInMemory,
ProductFetcherLayers.ozon,
ZioSchedulerLayers.impl,
BrowserLayers.jsoup,
JobIdGeneratorLayers.alphaNumeric,
ProductIdParserLayers.ozon,
CommandProcessorLayers.ozonPriceChecker,
EncDecLayers.aes256(encryptionPasswordEnv.get),
ProductWatchingJobSchedulerLayers.impl
ProductWatchingJobSchedulerLayers.impl,
CacheStateRepositoryLayers.file
)
}
23 changes: 21 additions & 2 deletions src/main/scala/app/OzonPriceCheckerApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package app

import config.AppConfig
import consumer.*
import store.ProductStore.{SourceId, SourceState}
import store.{CacheState, CacheStateRepository, ProductStore}
import telegram.BotsApplicationScopes
import util.lang.Throwables.makeCauseSeqMessage
import util.zio.ZioClock.getCurrentDateTime
Expand All @@ -20,8 +22,11 @@ object OzonPriceCheckerApp extends ZIOAppDefault:
.catchAll(t => ZIO.fail(s"Got error while running ${this.getClass.getSimpleName}: $makeCauseSeqMessage(t)"))
.provideLayer(AppLayers.ozonPriceCheckerAppLayer)

private def runBot()
: RIO[LongPollingUpdateConsumer with ConsumerRegisterer with AppConfig with ProductWatchingJobScheduler, Unit] =
private def runBot(): RIO[
LongPollingUpdateConsumer & ConsumerRegisterer & AppConfig & ProductWatchingJobScheduler & ProductStore &
CacheStateRepository,
Unit
] =
ZIO.scoped {
for {
startDateTime <- getCurrentDateTime
Expand All @@ -31,11 +36,25 @@ object OzonPriceCheckerApp extends ZIOAppDefault:
consumerRegisterer <- ZIO.service[ConsumerRegisterer]
appConfig <- ZIO.service[AppConfig]
productWatchingJobScheduler <- ZIO.service[ProductWatchingJobScheduler]
productStore <- ZIO.service[ProductStore]
cacheStateRepository <- ZIO.service[CacheStateRepository]

_ <- consumerRegisterer.registerConsumer(botsApplication, longPollingUpdateConsumer, appConfig.botToken.value)

_ <- productWatchingJobScheduler.scheduleProductsWatching()

cacheState <- cacheStateRepository.read()
_ <- ZIO.when(cacheState.entries.nonEmpty)(
productStore
.preInitialize(toSourceStatesBySourceId(cacheState))
.zipLeft(ZIO.log(s"Pre-initialized product store with $cacheState"))
)

_ <- Schedulers.scheduleBotStatusLogging(startDateTime, appConfig.logBotStatusInterval)
} yield ()
}

private def toSourceStatesBySourceId(cacheState: CacheState): Map[SourceId, SourceState] =
cacheState.entries
.map(entry => SourceId(entry.userName, entry.chatId) -> SourceState(products = entry.products, None))
.toMap
1 change: 1 addition & 0 deletions src/main/scala/bot/OzonPriceCheckerBotCommands.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ object OzonPriceCheckerBotCommands:
val Stop = "/stop"
val WatchNewProduct = "/watchnewproduct"
val UnwatchAllProducts = "/unwatchallproducts"
val ShowAllProducts = "/showallproducts"
8 changes: 6 additions & 2 deletions src/main/scala/config/ConfigTypes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,9 @@ import scala.concurrent.duration.Duration
case class Sensitive(value: String):
override def toString: String = s"***"

case class AppConfig(botToken: Sensitive, priceCheckingCron: String, logBotStatusInterval: Duration)
extends NamedToString
case class AppConfig(
botToken: Sensitive,
priceCheckingCron: String,
logBotStatusInterval: Duration,
cacheStateFilePath: String
) extends NamedToString
45 changes: 40 additions & 5 deletions src/main/scala/consumer/OzonPriceCheckerCommandProcessor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import util.zio.ZioLoggingImplicits.Ops
import org.telegram.telegrambots.meta.generics.TelegramClient
import zio.{Task, ZIO}

class OzonPriceCheckerCommandProcessor(productStore: ProductStore, telegramClient: TelegramClient) extends CommandProcessor:
class OzonPriceCheckerCommandProcessor(productStore: ProductStore, telegramClient: TelegramClient)
extends CommandProcessor:
private implicit val _telegramClient: TelegramClient = telegramClient

def processCommand(sourceId: SourceId, text: String): Task[Unit] =
Expand All @@ -25,6 +26,8 @@ class OzonPriceCheckerCommandProcessor(productStore: ProductStore, telegramClien
processWatchNewProductCommand(sourceId)
else if (text == OzonPriceCheckerBotCommands.UnwatchAllProducts)
processUnwatchAllProductsCommand(sourceId)
else if (text == OzonPriceCheckerBotCommands.ShowAllProducts)
processShowAllProductsCommand(sourceId)
else
ZIO.log(s"Got unknown command $text") *>
sendTextMessage(sourceId.chatId, s"I can not process command $text. Please send me a command known to me.")
Expand Down Expand Up @@ -79,10 +82,7 @@ class OzonPriceCheckerCommandProcessor(productStore: ProductStore, telegramClien
sendTextMessage(sourceId.chatId, "Send me OZON URL or product ID.") *>
productStore.updateProductCandidate(sourceId, WaitingProductId)
else
sendTextMessage(
sourceId.chatId,
s"You have not been yet initialized. Send me ${OzonPriceCheckerBotCommands.Start} command to fix it."
)
askToSendStartCommand(sourceId)
} yield ())
.logged(s"process command ${OzonPriceCheckerBotCommands.WatchNewProduct}")

Expand All @@ -91,3 +91,38 @@ class OzonPriceCheckerCommandProcessor(productStore: ProductStore, telegramClien
.emptyState(sourceId)
.zipRight(sendTextMessage(sourceId.chatId, "I have removed all watched products."))
.logged(s"process command ${OzonPriceCheckerBotCommands.UnwatchAllProducts}")

private def processShowAllProductsCommand(sourceId: SourceId): Task[Unit] =
productStore
.readSourceState(sourceId)
.tap {
case Some(sourceState) =>
sourceState.products match
case Nil =>
sendTextMessage(
sourceId.chatId,
s"You have no watched products.\n\n" +
s"To watch new product send me ${OzonPriceCheckerBotCommands.WatchNewProduct}"
)
case products =>
sendTextMessage(
sourceId.chatId,
s"Here are your watched products:\n\n" +
s"${products.zipWithIndex
.map { case (product, index) =>
s"${index + 1}) ${product.id}\n\t" +
s"Price threshold: ${product.priceThreshold}"
}
.mkString("\n")}"
)
case None =>
askToSendStartCommand(sourceId)
}
.logged(s"process command ${OzonPriceCheckerBotCommands.ShowAllProducts}")
.unit

private def askToSendStartCommand(sourceId: SourceId) =
sendTextMessage(
sourceId.chatId,
s"You have not been yet initialized. Send me ${OzonPriceCheckerBotCommands.Start} command to fix it."
)
57 changes: 57 additions & 0 deletions src/main/scala/store/CacheProductStore.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package ru.ekuzmichev
package store
import common.ProductId
import store.ProductStore.{Product, ProductCandidate, SourceId, SourceState}

import zio.Task

class CacheProductStore(decoratee: ProductStore, cacheStateRepository: CacheStateRepository) extends ProductStore:
override def preInitialize(sourceStatesBySourceId: Map[SourceId, SourceState]): Task[Unit] =
decoratee.preInitialize(sourceStatesBySourceId)

override def checkInitialized(sourceId: SourceId): Task[Boolean] =
decoratee.checkInitialized(sourceId)

override def checkHasProductId(sourceId: SourceId, productId: ProductId): Task[Boolean] =
decoratee.checkHasProductId(sourceId, productId)

override def emptyState(sourceId: SourceId): Task[Unit] =
decoratee.emptyState(sourceId) <* replaceStateInCache()

private def replaceStateInCache(): Task[Unit] =
readAsCacheState().tap(cacheStateRepository.replace).unit

private def readAsCacheState(): Task[CacheState] =
readAll().map(toCacheState)

private def toCacheState(sourceStatesBySourceId: Map[SourceId, SourceState]): CacheState =
CacheState(
entries = sourceStatesBySourceId.map { case (sourceId, sourceState) =>
CacheStateEntry(
userName = sourceId.userName,
chatId = sourceId.chatId,
products = sourceState.products
)
}.toSeq
)

override def clearState(sourceId: SourceId): Task[Unit] =
decoratee.clearState(sourceId) <* replaceStateInCache()

override def readSourceState(sourceId: SourceId): Task[Option[SourceState]] =
decoratee.readSourceState(sourceId)

override def readAll(): Task[Map[SourceId, SourceState]] =
decoratee.readAll()

override def updateProductCandidate(
sourceId: SourceId,
productCandidate: ProductCandidate
): Task[Boolean] =
decoratee.updateProductCandidate(sourceId, productCandidate)

override def resetProductCandidate(sourceId: SourceId): Task[Boolean] =
decoratee.resetProductCandidate(sourceId)

override def addProduct(sourceId: SourceId, product: Product): Task[Boolean] =
decoratee.addProduct(sourceId, product) <* replaceStateInCache()
7 changes: 5 additions & 2 deletions src/main/scala/store/CacheState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ package store

import common.{ChatId, UserName}
import store.ProductStore.Product
import util.lang.NamedToString

import io.circe.Codec
import io.circe.generic.semiauto.deriveCodec

case class CacheStateEntry(userName: UserName, chatId: ChatId, products: Seq[Product])
case class CacheStateEntry(userName: UserName, chatId: ChatId, products: Seq[Product]) extends NamedToString

case class CacheState(entries: Seq[CacheStateEntry])
case class CacheState(entries: Seq[CacheStateEntry]) extends NamedToString

object CacheState:
def empty: CacheState = CacheState(Seq.empty)

implicit val cacheStateCodec: Codec[CacheState] = {
import io.circe.generic.auto.*
deriveCodec[CacheState]
Expand Down
8 changes: 8 additions & 0 deletions src/main/scala/store/CacheStateRepository.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package ru.ekuzmichev
package store

import zio.Task

trait CacheStateRepository:
def read(): Task[CacheState]
def replace(cacheState: CacheState): Task[Unit]
18 changes: 18 additions & 0 deletions src/main/scala/store/CacheStateRepositoryLayers.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package ru.ekuzmichev
package store

import config.AppConfig

import better.files.File as ScalaFile
import zio.{RLayer, ZIO, ZLayer}

object CacheStateRepositoryLayers:
val file: RLayer[AppConfig, FileCacheStateRepository] = ZLayer.fromZIO {
for {
appConfig <- ZIO.service[AppConfig]
cacheStateFilePath = appConfig.cacheStateFilePath
file <- ZIO.attempt(ScalaFile(cacheStateFilePath))
_ <- ZIO.log(s"Recreating file $file if not exists")
_ <- ZIO.attempt(file.createIfNotExists(createParents = true))
} yield new FileCacheStateRepository(cacheStateFilePath)
}
29 changes: 29 additions & 0 deletions src/main/scala/store/FileCacheStateRepository.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package ru.ekuzmichev
package store
import util.lang.Throwables.failure

import better.files.{File as ScalaFile, *}
import io.circe.parser.decode
import io.circe.syntax.*
import zio.{Task, ZIO}

class FileCacheStateRepository(filePath: String) extends CacheStateRepository:
override def read(): Task[CacheState] =
ZIO
.attempt(asScalaFile)
.flatMap(file =>
if (file.exists)
ZIO
.attempt(file.contentAsString)
.flatMap(json =>
ZIO
.fromEither(decode[CacheState](json))
.mapError(error => failure(s"Failed to read cache state: $error"))
)
else ZIO.succeed(CacheState.empty)
)

override def replace(cacheState: CacheState): Task[Unit] =
ZIO.attempt(asScalaFile.overwrite(cacheState.asJson.spaces2))

private def asScalaFile: ScalaFile = ScalaFile(filePath)
5 changes: 4 additions & 1 deletion src/main/scala/store/InMemoryProductStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import common.ProductId
import store.InMemoryProductStore.ProductState
import store.ProductStore.{Product, ProductCandidate, SourceId, SourceState}

import zio.{Fiber, Ref, Task, ZIO}
import zio.{Ref, Task, ZIO}

class InMemoryProductStore(productStateRef: Ref[ProductState]) extends ProductStore:
override def preInitialize(sourceStatesBySourceId: Map[SourceId, SourceState]): Task[Unit] =
productStateRef.set(sourceStatesBySourceId)

override def checkInitialized(sourceId: SourceId): Task[Boolean] =
productStateRef.get.map(_.contains(sourceId))

Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/store/ProductStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import common.{ChatId, ProductId, UserName}
import store.ProductStore.{ProductCandidate, SourceId, SourceState}
import util.lang.NamedToString

import zio.{Fiber, Task}
import zio.Task

trait ProductStore:
def preInitialize(sourceStatesBySourceId: Map[SourceId, SourceState]): Task[Unit]
def checkInitialized(sourceId: SourceId): Task[Boolean]
def checkHasProductId(sourceId: SourceId, productId: ProductId): Task[Boolean]
def emptyState(sourceId: SourceId): Task[Unit]
Expand Down
12 changes: 11 additions & 1 deletion src/main/scala/store/ProductStoreLayers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,21 @@ package store

import store.InMemoryProductStore.ProductState

import zio.{Ref, ULayer, ZLayer}
import zio.{RLayer, Ref, ULayer, ZIO, ZLayer}

object ProductStoreLayers:
val inMemory: ULayer[ProductStore] = ZLayer.fromZIO {
for {
productStateRef <- Ref.make(ProductState.empty)
} yield new InMemoryProductStore(productStateRef)
}

val cachedOverInMemory: RLayer[CacheStateRepository, ProductStore] =
ZLayer.environment[CacheStateRepository] ++ inMemory >>>
ZLayer.fromZIO {
for {
productStore <- ZIO.service[ProductStore]
cacheStateRepository <- ZIO.service[CacheStateRepository]
} yield new CacheProductStore(productStore, cacheStateRepository)

}

0 comments on commit 0466df5

Please sign in to comment.