Skip to content

Commit

Permalink
break: covert into workspace
Browse files Browse the repository at this point in the history
  • Loading branch information
nikola-bozin-org committed May 4, 2024
1 parent f0299c0 commit 1c592f5
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 0 deletions.
13 changes: 13 additions & 0 deletions worker/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "worker"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { version = "1.35.1", features = ["full"] }
serde = { version = "1.0.194", features = ["derive"] }
serde_json = "1.0.112"
dotenv = "0.15.0"
sqlx = { version = "0.7.4", features = ["postgres","runtime-tokio-rustls"] }
base64="0.22.1"
teloxide = { version = "0.12", features = ["macros"] }
27 changes: 27 additions & 0 deletions worker/src/db_connect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
pub type Result<T> = core::result::Result<T, Error>;

use sqlx::postgres::PgPoolOptions;

pub type Database = sqlx::Pool<sqlx::Postgres>;

#[derive(Debug)]
pub enum Error {
FailedConnectingToDatabase { error: String },
}

impl From<sqlx::Error> for Error {
fn from(value: sqlx::Error) -> Self {
Self::FailedConnectingToDatabase {
error: value.to_string(),
}
}
}

pub async fn connect(db_url: &str) -> Result<Database> {
let pool: Database = PgPoolOptions::new()
// TODO: Max connections! Dynamic?
.max_connections(5)
.connect(db_url)
.await?;
Ok(pool)
}
19 changes: 19 additions & 0 deletions worker/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
mod db_connect;
mod wisdoms_checker;

use std::env;

use db_connect::connect;
use wisdoms_checker::worker_thread;

#[tokio::main]
async fn main() {
let _ = dotenv::dotenv();

let database_url = env::var("DATABASE_URL")
.unwrap_or_else(|_| panic!("Missing required environment variable: {}", "DATABASE_URL"));

let db = connect(database_url.as_str()).await.unwrap();

tokio::spawn(worker_thread(db)).await.unwrap();
}
132 changes: 132 additions & 0 deletions worker/src/wisdoms_checker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use base64::{self, Engine};
use dotenv::dotenv;
use serde::Deserialize;
use sqlx::PgPool;
use std::time::Duration;
use std::{env, fs};
use teloxide::{prelude::*, types::Recipient, RequestError};

#[derive(Debug, Deserialize)]
struct Wisdom {
description: String,
}

#[derive(Debug, sqlx::FromRow)]
struct CountResult {
count: i64,
}

async fn send_tg_message(bot: Bot, message: &str, chat_id: i64) -> Result<Message, RequestError> {
bot.send_message(Recipient::Id(ChatId(chat_id)), message)
.await
}

//TODO: This code should BE fixed to not use so many match-es everywhere.
pub async fn worker_thread(pool: PgPool) {
let _ = dotenv().ok();
let mut prev_base64_string = String::new();
let bot = Bot::from_env();

let chat_id = env::var("WISDOMIA_MONOLITHIC_BOT_CHAT_ID").unwrap_or_else(|_| {
panic!("WISDOMIA_MONOLITHIC_BOT_CHAT_ID not found in .env");
});
let chat_id = chat_id.parse::<i64>().unwrap_or_else(|_| {
panic!("Unable to parse chat_id to i64 type.");
});

loop {
tokio::time::sleep(Duration::from_secs(5)).await;

let base64_string = match fs::read_to_string("../encoded-wisdoms.b64") {
Ok(content) => content.replace(['\n', '\r'], ""),
Err(e) => {
send_tg_message(
bot,
format!("Failed to read file encoded-wisdoms.b64: {}", e).as_str(),
chat_id,
)
.await
.unwrap();
return;
}
};

if base64_string == prev_base64_string {
continue;
}

let decoded_bytes = match base64::prelude::BASE64_STANDARD.decode(&base64_string) {
Ok(bytes) => bytes,
Err(e) => {
send_tg_message(
bot,
format!("Failed to decode base64 string: {}", e).as_str(),
chat_id,
)
.await
.unwrap();
return;
}
};

let wisdoms: Vec<Wisdom> = match serde_json::from_slice(&decoded_bytes) {
Ok(parsed) => parsed,
Err(e) => {
send_tg_message(
bot,
format!("Failed to parse JSON: {}", e).as_str(),
chat_id,
)
.await
.unwrap();
return;
}
};

for wisdom in wisdoms {
let existing_wisdom = match sqlx::query_as::<_, CountResult>(
"SELECT COUNT(*) as count FROM wisdoms WHERE description = $1",
)
.bind(&wisdom.description)
.fetch_one(&pool)
.await
{
Ok(result) => result,
Err(e) => {
send_tg_message(
bot,
format!("Failed to check if wisdom exists: {}", e).as_str(),
chat_id,
)
.await
.unwrap();
return;
}
};

if existing_wisdom.count == 0 {
match sqlx::query("INSERT INTO wisdoms (description) VALUES ($1)")
.bind(&wisdom.description)
.execute(&pool)
.await
{
Ok(_) => {}
Err(e) => {
send_tg_message(
bot,
format!("Failed to insert wisdom into database: {}", e).as_str(),
chat_id,
)
.await
.unwrap();
return;
}
};
} else {
println!("Wisdom already exists, skipping: {:?}", wisdom);
}
}

prev_base64_string = base64_string;
}
}

0 comments on commit 1c592f5

Please sign in to comment.