Skip to content

Commit

Permalink
♻️ refactor:
Browse files Browse the repository at this point in the history
- getting success build of docker image
- Refactoring the discord file in order to include status and compute job commands
- docker compose refactor
  • Loading branch information
dhruvmalik007 committed Aug 28, 2023
1 parent 77184c6 commit fe2a775
Show file tree
Hide file tree
Showing 15 changed files with 3,465 additions and 1 deletion.
58 changes: 58 additions & 0 deletions bots/Discord/Commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""
this script defines the command functions that it runs from the georender package
"""

import discord
from discord import app_commands
from discord.ext import commands
from discord.ext.commands import Context
import platform
import random
import aiohttp
import time

from consumer.kafkaConsumer import kafka_consume_message_jobResult
from producer.kafkaProducer import kafka_producer_job


class UserCommands(commands.Cog, name= "lidarhd" ):
imageName: str
client: str


def __init__(self,bot):
self.bot = bot


# def __init__(self, dockerimageId, clientId: str, bot) -> None:
# self.imageName = dockerimageId
# self.client = clientId # corresponds to the user id that want to access the infrastructure.
# self.bot = bot

async def on_ready(self):
print(f'added as the {self.user}!')


@commands.hybrid_command(name="job_point", describes="creates user jobs on the given coordinates")
@app_commands.describe(scope="inputs are X and Y coordinates for the area you want to find 3D scans")
async def job_point(self, context:Context, Xcoord, YCoord,ipfs_shp_file, ipfs_template_file):
"""
fetches the input from the user and transfers to the output.
"""
username = context.author.name
print(f'Message transferred to the bacalhau compute job: {Xcoord, YCoord, username, ipfs_shp_file, ipfs_template_file }')
kafka_producer_job(Xcoord=Xcoord, Ycoord=YCoord, username=username, ipfs_shp_file=ipfs_shp_file, ipfs_filename_template=ipfs_template_file)
## check whether there is any message in the
time.sleep(10)
print("waited for 100 sec(for test), checking if the consume message is generated")
kafka_consume_message_jobResult(topic='bacalhau_job_compute', keyID=username)

@commands.hybrid_command(name="get_status", describes="gets the output status of the compute job")
@app_commands.describe(scope="defines the details about the current job status")
async def get_status(self,context: Context, jobId: str, username):
print("the status of the your previous job by {} of given jobId {} is follows:".format(context.author.name,jobId))

#kafka_consume_message_jobResult(topic='bacalhau_compute_job',keyID=username)

async def setup(bot):
await bot.add_cog(UserCommands(bot))
3 changes: 3 additions & 0 deletions bots/Discord/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""
scripts to host the discord based bot
"""
214 changes: 214 additions & 0 deletions bots/Discord/circombot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
import asyncio
import json
import logging
import os
import platform
import random
import sys


import discord
from discord.ext import commands, tasks
from discord.ext.commands import Bot, Context

try:
with open(f"{os.path.realpath(os.path.dirname(__file__))}/config.json") as file:
config = json.load(file)
except FileNotFoundError as notFound:
print(notFound)


intents = discord.Intents.default()

bot = Bot(
command_prefix= "/",
intents=intents,
help_command=None,
)


## taken from krypt0nn repo that renders them in the prettify way:

class LoggingFormatter(logging.Formatter):
# Colors
black = "\x1b[30m"
red = "\x1b[31m"
green = "\x1b[32m"
yellow = "\x1b[33m"
blue = "\x1b[34m"
gray = "\x1b[38m"
# Styles
reset = "\x1b[0m"
bold = "\x1b[1m"

COLORS = {
logging.DEBUG: gray + bold,
logging.INFO: blue + bold,
logging.WARNING: yellow + bold,
logging.ERROR: red,
logging.CRITICAL: red + bold,
}

def format(self, record):
log_color = self.COLORS[record.levelno]
format = "(black){asctime}(reset) (levelcolor){levelname:<8}(reset) (green){name}(reset) {message}"
format = format.replace("(black)", self.black + self.bold)
format = format.replace("(reset)", self.reset)
format = format.replace("(levelcolor)", log_color)
format = format.replace("(green)", self.green + self.bold)
formatter = logging.Formatter(format, "%Y-%m-%d %H:%M:%S", style="{")
return formatter.format(record)



logger = logging.getLogger("discord_bot")
logger.setLevel(logging.INFO)

# Console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(LoggingFormatter())

# File handler
file_handler = logging.FileHandler(filename="discord.log", encoding="utf-8", mode="w")
file_handler_formatter = logging.Formatter(
"[{asctime}] [{levelname:<8}] {name}: {message}", "%Y-%m-%d %H:%M:%S", style="{"
)


file_handler.setFormatter(file_handler_formatter)

# Add the handlers
logger.addHandler(console_handler)
logger.addHandler(file_handler)
bot.logger = logger


@bot.event
async def on_ready() -> None:
"""
The code in this event is executed when the bot is ready.
"""
bot.logger.info(f"Logged in globally as {bot.user.name}")
bot.logger.info(f"discord.py API version: {discord.__version__}")
bot.logger.info(f"Python version: {platform.python_version()}")
bot.logger.info(f"Running on: {platform.system()} {platform.release()} ({os.name})")
bot.logger.info("-------------------")

if config["sync_commands_globally"]:
bot.logger.info("Syncing commands globally...")
await bot.tree.sync()
bot.logger.info("commands synced, go now to the discord channel in order to start running commands")



@tasks.loop(minutes=1.0)
async def sending_compute_job(ctx:Context):
ctx.reply("hi {}, the compute is scheduled and the progress is {}".format(ctx.author,ctx.args))



@bot.event
async def on_message(message: discord.Message) -> None:
"""
The code in this event is executed every time someone sends a message, with or without the prefix
:param message: The message that was sent.
"""
if message.author == bot.user or message.author.bot:
return
await bot.process_commands(message)
message.reply("hiya, your job will be scheduled in a min")


@bot.event
async def on_command_completion(context: Context) -> None:
"""
The code in this event is executed every time a normal command has been *successfully* executed.
:param context: The context of the command that has been executed.
"""
full_command_name = context.command.qualified_name
split = full_command_name.split(" ")
executed_command = str(split[0])
if context.guild is not None:
bot.logger.info(
f"Executed {executed_command} command in {context.guild.name} (ID: {context.guild.id}) by {context.author} (ID: {context.author.id})"
)
else:
bot.logger.info(
f"Executed {executed_command} command by {context.author} (ID: {context.author.id}) in DMs"
)


@bot.event
async def on_command_error(context: Context, error) -> None:
"""
The code in this event is executed every time a normal valid command catches an error.
:param context: The context of the normal command that failed executing.
:param error: The error that has been faced.
"""
if isinstance(error, commands.CommandOnCooldown):
minutes, seconds = divmod(error.retry_after, 60)
hours, minutes = divmod(minutes, 60)
hours = hours % 24
embed = discord.Embed(
description=f"**Please slow down** - You can use this command again in {f'{round(hours)} hours' if round(hours) > 0 else ''} {f'{round(minutes)} minutes' if round(minutes) > 0 else ''} {f'{round(seconds)} seconds' if round(seconds) > 0 else ''}.",
color=0xE02B2B,
)
await context.send(embed=embed)

"""
Same as above, just for the @checks.is_owner() check.
"""
embed = discord.Embed(
description="You are not the owner of the bot!", color=0xE02B2B
)
await context.send(embed=embed)
if context.guild:
bot.logger.warning(
f"{context.author} (ID: {context.author.id}) tried to execute an owner only command in the guild {context.guild.name} (ID: {context.guild.id}), but the user is not an owner of the bot."
)
else:
bot.logger.warning(
f"{context.author} (ID: {context.author.id}) tried to execute an owner only command in the bot's DMs, but the user is not an owner of the bot."
)
elif isinstance(error, commands.MissingPermissions):
embed = discord.Embed(
description="You are missing the permission(s) `"
+ ", ".join(error.missing_permissions)
+ "` to execute this command!",
color=0xE02B2B,
)
await context.send(embed=embed)
elif isinstance(error, commands.BotMissingPermissions):
embed = discord.Embed(
description="I am missing the permission(s) `"
+ ", ".join(error.missing_permissions)
+ "` to fully perform this command!",
color=0xE02B2B,
)
await context.send(embed=embed)
elif isinstance(error, commands.MissingRequiredArgument):
embed = discord.Embed(
title="Error!",
# We need to capitalize because the command arguments have no capital letter in the code.
description=str(error).capitalize(),
color=0xE02B2B,
)
await context.send(embed=embed)
else:
raise error


async def load_cogs() -> None:
try:
await bot.load_extension(f"Commands")
bot.logger.info("commands package loaded")
except Exception as e:
bot.logger.error(f"Failed to load extension from commands.py")

asyncio.run(load_cogs())
bot.run(config["token"])


25 changes: 25 additions & 0 deletions bots/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
FROM python:3.11.1

RUN mkdir -p /usr/src/app/discordbot/

COPY ./pyproject.toml /usr/src/app/discordbot/pyproject.toml

WORKDIR /usr/src/app/discordbot/


## PATH="/root/.local/bin:$PATH"


ENV PATH=" ${PATH}:/root/.local/bin"



RUN curl -sSL https://install.python-poetry.org | python3 - && poetry config virtualenvs.create false && poetry install


WORKDIR /

COPY . /usr/src/app/discordbot/


ENTRYPOINT [ "python3", "/usr/src/app/discordbot/Discord/circombot.py" ]
2 changes: 2 additions & 0 deletions bots/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from Discord import *
from twitter import *
3 changes: 3 additions & 0 deletions bots/consumer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""
functions for consuming the message generated from the given broker package to the discord result.
"""
73 changes: 73 additions & 0 deletions bots/consumer/kafkaConsumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import platform
from kafka import KafkaConsumer
from dotenv import load_dotenv, dotenv_values
#from fastapi import FastAPI
import json
import logging

import time


import json
logger = logging.getLogger()
logger.setLevel(logging.INFO)
log = logging.getLogger("uvicorn")


load_dotenv(dotenv_path='../../.env')
config = dotenv_values(dotenv_path='../../.env')

consumer: KafkaConsumer

async def startup_event(self):
"""Start up event for FastAPI application."""

log.info("Starting up consumer")
consumer = KafkaConsumer(
bootstrap_servers=[config["KAFKA_BROKER_URL"]],
sasl_mechanism='SCRAM-SHA-256',
security_protocol='SASL_SSL',
sasl_plain_username=config["SASL_PLAIN_USERNAME"],
sasl_plain_password=config["SASL_PLAIN_PASSWORD"],
auto_offset_reset='earliest',
consumer_timeout_ms=1000
)

def kafka_consume_message_jobResult(self,topic: str, keyID: str) -> json:
"""
this allows for messages to be consumed that is shared by the bacalhau storing the response to the job status function.
keyID corresponds to which of you wanted get the job response (this will be either jobId or based on user details)
"""
topic = 'bacalhau_compute_job'

consumer.subscribe(topic)
logger.info('now fetching the response of keyID provided by the user')
## only consider last message for consuming for bacalhau

for message in consumer:
if(message[keyID].key == keyID):
current_message_offset = message[keyID].value
break

consumer.seek(partition=topic, offset=current_message_offset)

parameters = json.loads(consumer.poll(topic))
[cid, nodeId, path] = parameters

print("results of given job is: {}{}{}".format(cid,nodeId,path))

def kafka_consume_result_status(keyID: str):
topic = 'bacalau_list_jobs'

consumer.subscribe(topic)
logger.info('now fetching the response of keyID provided by the user')

for message in consumer:
if(message[keyID].key == keyID):
current_message_offset = message[keyID].value
break

consumer.seek(partition=topic, offset=current_message_offset)

parameters = json.loads(consumer.poll(topic))
[cid, nodeId, path] = parameters
Loading

0 comments on commit fe2a775

Please sign in to comment.