From 0466df5545fce0f9e3ffd1d8610a5092c1311f53 Mon Sep 17 00:00:00 2001 From: ekuzmichev Date: Wed, 7 Aug 2024 15:51:33 +0300 Subject: [PATCH] Flush state to json --- build.sbt | 1 + src/main/scala/app/AppLayers.scala | 10 ++-- src/main/scala/app/OzonPriceCheckerApp.scala | 23 +++++++- .../bot/OzonPriceCheckerBotCommands.scala | 1 + src/main/scala/config/ConfigTypes.scala | 8 ++- .../OzonPriceCheckerCommandProcessor.scala | 45 +++++++++++++-- src/main/scala/store/CacheProductStore.scala | 57 +++++++++++++++++++ src/main/scala/store/CacheState.scala | 7 ++- .../scala/store/CacheStateRepository.scala | 8 +++ .../store/CacheStateRepositoryLayers.scala | 18 ++++++ .../store/FileCacheStateRepository.scala | 29 ++++++++++ .../scala/store/InMemoryProductStore.scala | 5 +- src/main/scala/store/ProductStore.scala | 3 +- src/main/scala/store/ProductStoreLayers.scala | 12 +++- 14 files changed, 209 insertions(+), 18 deletions(-) create mode 100644 src/main/scala/store/CacheProductStore.scala create mode 100644 src/main/scala/store/CacheStateRepository.scala create mode 100644 src/main/scala/store/CacheStateRepositoryLayers.scala create mode 100644 src/main/scala/store/FileCacheStateRepository.scala diff --git a/build.sbt b/build.sbt index c9b03b2..9e7d794 100644 --- a/build.sbt +++ b/build.sbt @@ -31,6 +31,7 @@ lazy val root = (project in file(".")) "org.typelevel" %% "cats-core" % "2.12.0", "io.lemonlabs" %% "scala-uri" % "4.0.3", "org.jasypt" % "jasypt" % "1.9.3", + "com.github.pathikrit" %% "better-files" % "3.9.2", "org.scalatest" %% "scalatest" % "3.2.19" % Test, "dev.zio" %% "zio-test" % "2.1.6" % Test, "com.stephenn" %% "scalatest-circe" % "0.2.5" % Test diff --git a/src/main/scala/app/AppLayers.scala b/src/main/scala/app/AppLayers.scala index 8746a1a..8b5a787 100644 --- a/src/main/scala/app/AppLayers.scala +++ b/src/main/scala/app/AppLayers.scala @@ -8,7 +8,7 @@ import encryption.EncDecLayers import product.{ProductFetcherLayers, ProductIdParserLayers} import scalascraper.BrowserLayers import schedule.{JobIdGeneratorLayers, ZioSchedulerLayers} -import store.ProductStoreLayers +import store.{CacheStateRepository, CacheStateRepositoryLayers, ProductStore, ProductStoreLayers} import telegram.TelegramClientLayers import util.lang.Throwables.failure @@ -16,7 +16,8 @@ import org.telegram.telegrambots.longpolling.interfaces.LongPollingUpdateConsume import zio.{RLayer, ZIO, ZIOAppArgs, ZLayer} object AppLayers: - private type ROut = AppConfig & LongPollingUpdateConsumer & ConsumerRegisterer & ProductWatchingJobScheduler + private type ROut = AppConfig & LongPollingUpdateConsumer & ConsumerRegisterer & ProductWatchingJobScheduler & + ProductStore & CacheStateRepository val ozonPriceCheckerAppLayer: RLayer[ZIOAppArgs, ROut] = ZLayer @@ -33,7 +34,7 @@ object AppLayers: ConsumerRegistererLayers.impl, UpdateConsumerLayers.ozonPriceChecker, TelegramClientLayers.okHttp, - ProductStoreLayers.inMemory, + ProductStoreLayers.cachedOverInMemory, ProductFetcherLayers.ozon, ZioSchedulerLayers.impl, BrowserLayers.jsoup, @@ -41,6 +42,7 @@ object AppLayers: ProductIdParserLayers.ozon, CommandProcessorLayers.ozonPriceChecker, EncDecLayers.aes256(encryptionPasswordEnv.get), - ProductWatchingJobSchedulerLayers.impl + ProductWatchingJobSchedulerLayers.impl, + CacheStateRepositoryLayers.file ) } diff --git a/src/main/scala/app/OzonPriceCheckerApp.scala b/src/main/scala/app/OzonPriceCheckerApp.scala index adaeea8..eac2132 100644 --- a/src/main/scala/app/OzonPriceCheckerApp.scala +++ b/src/main/scala/app/OzonPriceCheckerApp.scala @@ -3,6 +3,8 @@ package app import config.AppConfig import consumer.* +import store.ProductStore.{SourceId, SourceState} +import store.{CacheState, CacheStateRepository, ProductStore} import telegram.BotsApplicationScopes import util.lang.Throwables.makeCauseSeqMessage import util.zio.ZioClock.getCurrentDateTime @@ -20,8 +22,11 @@ object OzonPriceCheckerApp extends ZIOAppDefault: .catchAll(t => ZIO.fail(s"Got error while running ${this.getClass.getSimpleName}: $makeCauseSeqMessage(t)")) .provideLayer(AppLayers.ozonPriceCheckerAppLayer) - private def runBot() - : RIO[LongPollingUpdateConsumer with ConsumerRegisterer with AppConfig with ProductWatchingJobScheduler, Unit] = + private def runBot(): RIO[ + LongPollingUpdateConsumer & ConsumerRegisterer & AppConfig & ProductWatchingJobScheduler & ProductStore & + CacheStateRepository, + Unit + ] = ZIO.scoped { for { startDateTime <- getCurrentDateTime @@ -31,11 +36,25 @@ object OzonPriceCheckerApp extends ZIOAppDefault: consumerRegisterer <- ZIO.service[ConsumerRegisterer] appConfig <- ZIO.service[AppConfig] productWatchingJobScheduler <- ZIO.service[ProductWatchingJobScheduler] + productStore <- ZIO.service[ProductStore] + cacheStateRepository <- ZIO.service[CacheStateRepository] _ <- consumerRegisterer.registerConsumer(botsApplication, longPollingUpdateConsumer, appConfig.botToken.value) _ <- productWatchingJobScheduler.scheduleProductsWatching() + cacheState <- cacheStateRepository.read() + _ <- ZIO.when(cacheState.entries.nonEmpty)( + productStore + .preInitialize(toSourceStatesBySourceId(cacheState)) + .zipLeft(ZIO.log(s"Pre-initialized product store with $cacheState")) + ) + _ <- Schedulers.scheduleBotStatusLogging(startDateTime, appConfig.logBotStatusInterval) } yield () } + + private def toSourceStatesBySourceId(cacheState: CacheState): Map[SourceId, SourceState] = + cacheState.entries + .map(entry => SourceId(entry.userName, entry.chatId) -> SourceState(products = entry.products, None)) + .toMap diff --git a/src/main/scala/bot/OzonPriceCheckerBotCommands.scala b/src/main/scala/bot/OzonPriceCheckerBotCommands.scala index 5997022..c3341c0 100644 --- a/src/main/scala/bot/OzonPriceCheckerBotCommands.scala +++ b/src/main/scala/bot/OzonPriceCheckerBotCommands.scala @@ -7,3 +7,4 @@ object OzonPriceCheckerBotCommands: val Stop = "/stop" val WatchNewProduct = "/watchnewproduct" val UnwatchAllProducts = "/unwatchallproducts" + val ShowAllProducts = "/showallproducts" diff --git a/src/main/scala/config/ConfigTypes.scala b/src/main/scala/config/ConfigTypes.scala index 2737547..8aeb296 100644 --- a/src/main/scala/config/ConfigTypes.scala +++ b/src/main/scala/config/ConfigTypes.scala @@ -8,5 +8,9 @@ import scala.concurrent.duration.Duration case class Sensitive(value: String): override def toString: String = s"***" -case class AppConfig(botToken: Sensitive, priceCheckingCron: String, logBotStatusInterval: Duration) - extends NamedToString +case class AppConfig( + botToken: Sensitive, + priceCheckingCron: String, + logBotStatusInterval: Duration, + cacheStateFilePath: String +) extends NamedToString diff --git a/src/main/scala/consumer/OzonPriceCheckerCommandProcessor.scala b/src/main/scala/consumer/OzonPriceCheckerCommandProcessor.scala index 81442be..66fae50 100644 --- a/src/main/scala/consumer/OzonPriceCheckerCommandProcessor.scala +++ b/src/main/scala/consumer/OzonPriceCheckerCommandProcessor.scala @@ -11,7 +11,8 @@ import util.zio.ZioLoggingImplicits.Ops import org.telegram.telegrambots.meta.generics.TelegramClient import zio.{Task, ZIO} -class OzonPriceCheckerCommandProcessor(productStore: ProductStore, telegramClient: TelegramClient) extends CommandProcessor: +class OzonPriceCheckerCommandProcessor(productStore: ProductStore, telegramClient: TelegramClient) + extends CommandProcessor: private implicit val _telegramClient: TelegramClient = telegramClient def processCommand(sourceId: SourceId, text: String): Task[Unit] = @@ -25,6 +26,8 @@ class OzonPriceCheckerCommandProcessor(productStore: ProductStore, telegramClien processWatchNewProductCommand(sourceId) else if (text == OzonPriceCheckerBotCommands.UnwatchAllProducts) processUnwatchAllProductsCommand(sourceId) + else if (text == OzonPriceCheckerBotCommands.ShowAllProducts) + processShowAllProductsCommand(sourceId) else ZIO.log(s"Got unknown command $text") *> sendTextMessage(sourceId.chatId, s"I can not process command $text. Please send me a command known to me.") @@ -79,10 +82,7 @@ class OzonPriceCheckerCommandProcessor(productStore: ProductStore, telegramClien sendTextMessage(sourceId.chatId, "Send me OZON URL or product ID.") *> productStore.updateProductCandidate(sourceId, WaitingProductId) else - sendTextMessage( - sourceId.chatId, - s"You have not been yet initialized. Send me ${OzonPriceCheckerBotCommands.Start} command to fix it." - ) + askToSendStartCommand(sourceId) } yield ()) .logged(s"process command ${OzonPriceCheckerBotCommands.WatchNewProduct}") @@ -91,3 +91,38 @@ class OzonPriceCheckerCommandProcessor(productStore: ProductStore, telegramClien .emptyState(sourceId) .zipRight(sendTextMessage(sourceId.chatId, "I have removed all watched products.")) .logged(s"process command ${OzonPriceCheckerBotCommands.UnwatchAllProducts}") + + private def processShowAllProductsCommand(sourceId: SourceId): Task[Unit] = + productStore + .readSourceState(sourceId) + .tap { + case Some(sourceState) => + sourceState.products match + case Nil => + sendTextMessage( + sourceId.chatId, + s"You have no watched products.\n\n" + + s"To watch new product send me ${OzonPriceCheckerBotCommands.WatchNewProduct}" + ) + case products => + sendTextMessage( + sourceId.chatId, + s"Here are your watched products:\n\n" + + s"${products.zipWithIndex + .map { case (product, index) => + s"${index + 1}) ${product.id}\n\t" + + s"Price threshold: ${product.priceThreshold} ₽" + } + .mkString("\n")}" + ) + case None => + askToSendStartCommand(sourceId) + } + .logged(s"process command ${OzonPriceCheckerBotCommands.ShowAllProducts}") + .unit + + private def askToSendStartCommand(sourceId: SourceId) = + sendTextMessage( + sourceId.chatId, + s"You have not been yet initialized. Send me ${OzonPriceCheckerBotCommands.Start} command to fix it." + ) diff --git a/src/main/scala/store/CacheProductStore.scala b/src/main/scala/store/CacheProductStore.scala new file mode 100644 index 0000000..3a9025f --- /dev/null +++ b/src/main/scala/store/CacheProductStore.scala @@ -0,0 +1,57 @@ +package ru.ekuzmichev +package store +import common.ProductId +import store.ProductStore.{Product, ProductCandidate, SourceId, SourceState} + +import zio.Task + +class CacheProductStore(decoratee: ProductStore, cacheStateRepository: CacheStateRepository) extends ProductStore: + override def preInitialize(sourceStatesBySourceId: Map[SourceId, SourceState]): Task[Unit] = + decoratee.preInitialize(sourceStatesBySourceId) + + override def checkInitialized(sourceId: SourceId): Task[Boolean] = + decoratee.checkInitialized(sourceId) + + override def checkHasProductId(sourceId: SourceId, productId: ProductId): Task[Boolean] = + decoratee.checkHasProductId(sourceId, productId) + + override def emptyState(sourceId: SourceId): Task[Unit] = + decoratee.emptyState(sourceId) <* replaceStateInCache() + + private def replaceStateInCache(): Task[Unit] = + readAsCacheState().tap(cacheStateRepository.replace).unit + + private def readAsCacheState(): Task[CacheState] = + readAll().map(toCacheState) + + private def toCacheState(sourceStatesBySourceId: Map[SourceId, SourceState]): CacheState = + CacheState( + entries = sourceStatesBySourceId.map { case (sourceId, sourceState) => + CacheStateEntry( + userName = sourceId.userName, + chatId = sourceId.chatId, + products = sourceState.products + ) + }.toSeq + ) + + override def clearState(sourceId: SourceId): Task[Unit] = + decoratee.clearState(sourceId) <* replaceStateInCache() + + override def readSourceState(sourceId: SourceId): Task[Option[SourceState]] = + decoratee.readSourceState(sourceId) + + override def readAll(): Task[Map[SourceId, SourceState]] = + decoratee.readAll() + + override def updateProductCandidate( + sourceId: SourceId, + productCandidate: ProductCandidate + ): Task[Boolean] = + decoratee.updateProductCandidate(sourceId, productCandidate) + + override def resetProductCandidate(sourceId: SourceId): Task[Boolean] = + decoratee.resetProductCandidate(sourceId) + + override def addProduct(sourceId: SourceId, product: Product): Task[Boolean] = + decoratee.addProduct(sourceId, product) <* replaceStateInCache() diff --git a/src/main/scala/store/CacheState.scala b/src/main/scala/store/CacheState.scala index 7743f41..d73fd11 100644 --- a/src/main/scala/store/CacheState.scala +++ b/src/main/scala/store/CacheState.scala @@ -3,15 +3,18 @@ package store import common.{ChatId, UserName} import store.ProductStore.Product +import util.lang.NamedToString import io.circe.Codec import io.circe.generic.semiauto.deriveCodec -case class CacheStateEntry(userName: UserName, chatId: ChatId, products: Seq[Product]) +case class CacheStateEntry(userName: UserName, chatId: ChatId, products: Seq[Product]) extends NamedToString -case class CacheState(entries: Seq[CacheStateEntry]) +case class CacheState(entries: Seq[CacheStateEntry]) extends NamedToString object CacheState: + def empty: CacheState = CacheState(Seq.empty) + implicit val cacheStateCodec: Codec[CacheState] = { import io.circe.generic.auto.* deriveCodec[CacheState] diff --git a/src/main/scala/store/CacheStateRepository.scala b/src/main/scala/store/CacheStateRepository.scala new file mode 100644 index 0000000..a139325 --- /dev/null +++ b/src/main/scala/store/CacheStateRepository.scala @@ -0,0 +1,8 @@ +package ru.ekuzmichev +package store + +import zio.Task + +trait CacheStateRepository: + def read(): Task[CacheState] + def replace(cacheState: CacheState): Task[Unit] diff --git a/src/main/scala/store/CacheStateRepositoryLayers.scala b/src/main/scala/store/CacheStateRepositoryLayers.scala new file mode 100644 index 0000000..53218c5 --- /dev/null +++ b/src/main/scala/store/CacheStateRepositoryLayers.scala @@ -0,0 +1,18 @@ +package ru.ekuzmichev +package store + +import config.AppConfig + +import better.files.File as ScalaFile +import zio.{RLayer, ZIO, ZLayer} + +object CacheStateRepositoryLayers: + val file: RLayer[AppConfig, FileCacheStateRepository] = ZLayer.fromZIO { + for { + appConfig <- ZIO.service[AppConfig] + cacheStateFilePath = appConfig.cacheStateFilePath + file <- ZIO.attempt(ScalaFile(cacheStateFilePath)) + _ <- ZIO.log(s"Recreating file $file if not exists") + _ <- ZIO.attempt(file.createIfNotExists(createParents = true)) + } yield new FileCacheStateRepository(cacheStateFilePath) + } diff --git a/src/main/scala/store/FileCacheStateRepository.scala b/src/main/scala/store/FileCacheStateRepository.scala new file mode 100644 index 0000000..b9e16c4 --- /dev/null +++ b/src/main/scala/store/FileCacheStateRepository.scala @@ -0,0 +1,29 @@ +package ru.ekuzmichev +package store +import util.lang.Throwables.failure + +import better.files.{File as ScalaFile, *} +import io.circe.parser.decode +import io.circe.syntax.* +import zio.{Task, ZIO} + +class FileCacheStateRepository(filePath: String) extends CacheStateRepository: + override def read(): Task[CacheState] = + ZIO + .attempt(asScalaFile) + .flatMap(file => + if (file.exists) + ZIO + .attempt(file.contentAsString) + .flatMap(json => + ZIO + .fromEither(decode[CacheState](json)) + .mapError(error => failure(s"Failed to read cache state: $error")) + ) + else ZIO.succeed(CacheState.empty) + ) + + override def replace(cacheState: CacheState): Task[Unit] = + ZIO.attempt(asScalaFile.overwrite(cacheState.asJson.spaces2)) + + private def asScalaFile: ScalaFile = ScalaFile(filePath) diff --git a/src/main/scala/store/InMemoryProductStore.scala b/src/main/scala/store/InMemoryProductStore.scala index 48f5601..7a1379f 100644 --- a/src/main/scala/store/InMemoryProductStore.scala +++ b/src/main/scala/store/InMemoryProductStore.scala @@ -5,9 +5,12 @@ import common.ProductId import store.InMemoryProductStore.ProductState import store.ProductStore.{Product, ProductCandidate, SourceId, SourceState} -import zio.{Fiber, Ref, Task, ZIO} +import zio.{Ref, Task, ZIO} class InMemoryProductStore(productStateRef: Ref[ProductState]) extends ProductStore: + override def preInitialize(sourceStatesBySourceId: Map[SourceId, SourceState]): Task[Unit] = + productStateRef.set(sourceStatesBySourceId) + override def checkInitialized(sourceId: SourceId): Task[Boolean] = productStateRef.get.map(_.contains(sourceId)) diff --git a/src/main/scala/store/ProductStore.scala b/src/main/scala/store/ProductStore.scala index 2884daf..2f5553b 100644 --- a/src/main/scala/store/ProductStore.scala +++ b/src/main/scala/store/ProductStore.scala @@ -5,9 +5,10 @@ import common.{ChatId, ProductId, UserName} import store.ProductStore.{ProductCandidate, SourceId, SourceState} import util.lang.NamedToString -import zio.{Fiber, Task} +import zio.Task trait ProductStore: + def preInitialize(sourceStatesBySourceId: Map[SourceId, SourceState]): Task[Unit] def checkInitialized(sourceId: SourceId): Task[Boolean] def checkHasProductId(sourceId: SourceId, productId: ProductId): Task[Boolean] def emptyState(sourceId: SourceId): Task[Unit] diff --git a/src/main/scala/store/ProductStoreLayers.scala b/src/main/scala/store/ProductStoreLayers.scala index d5e05b0..4b501cf 100644 --- a/src/main/scala/store/ProductStoreLayers.scala +++ b/src/main/scala/store/ProductStoreLayers.scala @@ -3,7 +3,7 @@ package store import store.InMemoryProductStore.ProductState -import zio.{Ref, ULayer, ZLayer} +import zio.{RLayer, Ref, ULayer, ZIO, ZLayer} object ProductStoreLayers: val inMemory: ULayer[ProductStore] = ZLayer.fromZIO { @@ -11,3 +11,13 @@ object ProductStoreLayers: productStateRef <- Ref.make(ProductState.empty) } yield new InMemoryProductStore(productStateRef) } + + val cachedOverInMemory: RLayer[CacheStateRepository, ProductStore] = + ZLayer.environment[CacheStateRepository] ++ inMemory >>> + ZLayer.fromZIO { + for { + productStore <- ZIO.service[ProductStore] + cacheStateRepository <- ZIO.service[CacheStateRepository] + } yield new CacheProductStore(productStore, cacheStateRepository) + + }