Skip to content

Commit

Permalink
First commit
Browse files Browse the repository at this point in the history
  • Loading branch information
samundrak committed Apr 14, 2020
0 parents commit f243b71
Show file tree
Hide file tree
Showing 45 changed files with 12,292 additions and 0 deletions.
22 changes: 22 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
NODE_ENV=
APP_ENVIRONMENT=
HOST_NAME=
REDIS_HOST=
REDIS_PORT=
REDIS_PASSWORD=
SERVER_UPLOAD_QUEUE_PREFIX=
FINISH_UPLOAD_QUEUE=
BOX_CLIENT_ID=
BOX_CLIENT_SECRET=
# (duplicate REDIS_PASSWORD entry removed — already defined above)
APP_PORT=
LOGZ_IO_TOKEN=
MAX_WORKER=10
MAX_TASK_PER_WORKER=20
SENTRY_NODE_RAVEN_DSN=
THEKDAR_UI_PORT=
BASIC_AUTH_USERNAME=
BASIC_AUTH_PASSWORD=
NOOP_UPLOAD=true
TASK_REJECTION_MINUTE_OF_QUEUE=
ENCRYPTION_SECRET=
20 changes: 20 additions & 0 deletions .eslintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"extends": [
"eslint:recommended",
"plugin:import/errors",
"plugin:import/warnings",
"eslint-config-airbnb-base"
],
"globals": {
"require": true,
"process": true
},
"env": {
"es6": true,
"node": true,
"mocha": true
},
"rules": {
"no-underscore-dangle": "off"
}
}
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
node_modules/
.vscode/
.env
logs/all-logs.log
noop/
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# My project's README
26 changes: 26 additions & 0 deletions bin/initiator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/usr/bin/env node
const path = require('path');

require('dotenv').config({ path: path.resolve(path.dirname(__dirname), '.env') });

const queueFactory = require('../core/queue-factory');
const loggerFactory = require('../core/loggerFactory');

const logger = loggerFactory(true);
const queue = queueFactory.create(true);

const job = queue.create('std_initiate_uploader', {
uploaderServerHostname: process.env.HOST_NAME,
});
job.save();

job.on('complete', () => process.exit(0));

job.on('failed', (errorMessage) => {
logger.error(errorMessage instanceof Error ? errorMessage : new Error(errorMessage));
process.exit(1);
});

setTimeout(() => {
logger.error(new Error('2 second Timeout occurred while informing distributor about uploader startup'));
}, 2000);
26 changes: 26 additions & 0 deletions bin/pre-shutdown.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/usr/bin/env node
const path = require('path');

require('dotenv').config({ path: path.resolve(path.dirname(__dirname), '.env') });

const queueFactory = require('../core/queue-factory');
const loggerFactory = require('../core/loggerFactory');

const queue = queueFactory.create(true);
const logger = loggerFactory(true);

const job = queue.create('std_sleep_uploader', {
uploaderServerHostname: process.env.HOST_NAME,
});
job.save();

job.on('complete', () => process.exit(0));

job.on('failed', (errorMessage) => {
logger.error(errorMessage instanceof Error ? errorMessage : new Error(errorMessage));
process.exit(1);
});

setTimeout(() => {
logger.error(new Error('2 second Timeout occurred while informing distributor about uploader shutdown'));
}, 2000);
16 changes: 16 additions & 0 deletions bin/reset-uploads.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/usr/bin/env node
const path = require('path');

require('dotenv').config({ path: path.resolve(path.dirname(__dirname), '.env') });

const queueFactory = require('../core/queue-factory');

const queue = queueFactory.create(true);

const job = queue.create('std_sync_uploads_count', {
count: 0,
uploaderServerHostname: process.env.HOST_NAME,
});
job.save();

job.on('complete', () => process.exit(0));
4 changes: 4 additions & 0 deletions bin/start-dev-app
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/usr/bin/env bash
# Development launcher: runs index.js under nodemon, restarting on changes
# to core/ or index.js, with the Node inspector listening on 0.0.0.0:9232.
# `exec` replaces this shell so signals reach nodemon directly.

set -eu
exec node_modules/.bin/nodemon --inspect=0.0.0.0:9232 index.js --watch core/ --watch index.js --exec 'node'
159 changes: 159 additions & 0 deletions core/Consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
const { Task, events } = require('thekdar');
const { eachSeries } = require('async');

const { env } = process;

/**
 * Consumes upload jobs from this server's queue, dispatches them as forked
 * thekdar tasks, and acks/fails the originating queue jobs when the tasks
 * report completion or error. Progress and outcomes are broadcast on the
 * 'UPLOAD_LOGS' pub/sub channel.
 */
class Consumer {
  /**
   * @param {Object} queue - kue-style queue for receiving/acking upload jobs.
   * @param {Object} thekdar - worker-pool manager that runs upload tasks.
   * @param {Object} publisher - pub/sub publisher for upload log events.
   */
  constructor(queue, thekdar, publisher) {
    this._queue = queue;
    this._thekdar = thekdar;
    // taskId -> { data, done }: lets task events ack the original queue job.
    this._tasksCallbacks = new Map();
    this._handleThekdarMessages();
    this.publisher = publisher;
  }

  /** Start processing jobs on this server's dedicated upload queue. */
  handle() {
    const queueName = `${env.SERVER_UPLOAD_QUEUE_PREFIX}_${env.HOST_NAME}`;
    this._queue.process(
      queueName,
      600,
      (job, ctx, done) => {
        global.logger.info('Got new Upload request, Adding to local queue');
        this.handleUploadJobs(job, ctx, done);
      },
    );
    global.logger.info(`Listening to queue: ${queueName}`);
  }

  /**
   * Wrap a queue job in a forked thekdar task and remember its `done`
   * callback so task events can settle the job later.
   */
  handleUploadJobs(job, ctx, done) {
    const data = { ...job.data, jobId: job.id };
    const task = new Task();
    task.setData(data);
    task.setType(Task.TYPE_FORK);
    try {
      this._thekdar.addTask(task, (err) => {
        if (err) {
          this._failQueueJob({ job, message: 'Task expired please try again.' }, done);
          return false;
        }
        return true;
      });
      return this._tasksCallbacks.set(task.getId(), {
        data,
        done,
      });
    } catch (error) {
      global.logger.error(error, {
        workersCount: this._thekdar.getWorkers().size,
        tasksCount: this._thekdar.getTasks().size,
      });
      done(error);
      return false;
    }
  }

  /**
   * Drain all pending tasks on shutdown: fail each queue job, hand it back
   * to the distributor, and broadcast the error. Calls `allDone` when every
   * pending task has been settled.
   */
  stop(allDone) {
    const numberOfJobs = this._tasksCallbacks.size;
    if (!numberOfJobs) return allDone();
    return eachSeries(
      // Iterate entries so we have the Map key: the stored data object has
      // no `taskId` field, so the old `delete(data.taskId)` never removed
      // anything from the map.
      Array.from(this._tasksCallbacks.entries()),
      ([taskId, { done, data }], cb) => {
        this._queue.create(env.FINISH_UPLOAD_QUEUE, { id: data.jobId }).save(() => {
          try {
            done(new Error('Unable to complete task'));
            this.publisher.publish(
              'UPLOAD_LOGS',
              JSON.stringify({
                ...data,
                type: events.TASK_ERROR,
              }),
            );
            this._tasksCallbacks.delete(taskId);
            global.logger.info('Error on task, sending job to distributor');
            cb();
          } catch (err) {
            cb(err);
          }
        });
      },
      allDone,
    );
  }

  /** Fail a queue job immediately, notify the distributor, broadcast error. */
  _failQueueJob({ job, message }, done) {
    done(new Error(message));
    this._queue.create(env.FINISH_UPLOAD_QUEUE, { id: job.id }).save(() => {
      global.logger.info(`Error on task, sending job to distributor: ${message}`);
    });
    this.publisher.publish(
      'UPLOAD_LOGS',
      JSON.stringify({
        data: {
          message,
        },
        uploadProcess: job.data,
        type: events.TASK_ERROR,
      }),
    );
  }

  /**
   * Route messages coming back from thekdar workers: forward progress/logs,
   * and settle the originating queue job on task error/completion.
   */
  _handleThekdarMessages() {
    this._thekdar.on('message', (data) => {
      const taskCallback = this._tasksCallbacks.get(data.taskId);
      switch (data.type) {
        case 'UPLOAD_PROGRESS':
        case 'UPLOAD_LOGS':
          if (taskCallback) {
            this.publisher.publish(
              'UPLOAD_LOGS',
              JSON.stringify({
                ...data,
                uploadProcess: taskCallback.data,
              }),
            );
          }
          break;
        case events.TASK_ERROR:
          if (taskCallback) {
            taskCallback.done(new Error(data.data.message));
            this._queue
              .create(env.FINISH_UPLOAD_QUEUE, { id: taskCallback.data.jobId })
              .save(() => {
                global.logger.info(`Error on task, sending job to distributor: ${data.data.message}`);
              });
            this.publisher.publish(
              'UPLOAD_LOGS',
              JSON.stringify({
                ...data,
                uploadProcess: taskCallback.data,
                type: events.TASK_ERROR,
              }),
            );
            this._tasksCallbacks.delete(data.taskId);
          }
          break;
        case events.TASK_COMPLETE:
          if (taskCallback) {
            taskCallback.done();
            this._queue
              .create(env.FINISH_UPLOAD_QUEUE, {
                id: taskCallback.data.jobId,
              })
              .save(() => {
                global.logger.info('Completed task, sending job to distributor');
              });
            this.publisher.publish(
              'UPLOAD_LOGS',
              JSON.stringify({
                ...data,
                uploadProcess: taskCallback.data,
                type: events.TASK_COMPLETE,
              }),
            );
            this._tasksCallbacks.delete(data.taskId);
          }
          break;
        default:
          break;
      }
    });
  }
}
module.exports = Consumer;
72 changes: 72 additions & 0 deletions core/Drive.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
const GoogleDrive = require('./platforms/GoogleDrive');
const DropBox = require('./platforms/DropBox');
const Box = require('./platforms/Box');
const PCloud = require('./platforms/PCloud');
const Youtube = require('./platforms/Youtube');
const YandexDisk = require('./platforms/YandexDisk');
const DailyMotion = require('./platforms/DailyMotion');
const Twitch = require('./platforms/Twitch');
const Noop = require('./platforms/Noop');

/**
 * Facade over the concrete upload platforms (Google Drive, Dropbox, Box,
 * pCloud, YouTube, Yandex Disk, Dailymotion, Twitch, Noop). A Drive instance
 * delegates every call to the service selected via `setService`.
 */
class Drive {
  /** @param {string} [service] - optional service key (see getServiceMappedToDrive). */
  constructor(service) {
    if (service) {
      this.setService(service);
    }
  }

  /**
   * Select the backing platform implementation.
   * @param {string} service - key such as 'google-drive' or 'dropbox'.
   * @returns {Drive} this, for chaining.
   * @throws {Error} when the service key is unknown (previously this
   *   surfaced as a cryptic "Service is not a constructor" TypeError).
   */
  setService(service) {
    const Service = Drive.getServiceMappedToDrive()[service];
    if (!Service) {
      throw new Error(`Unknown drive service: ${service}`);
    }
    this.service = new Service();
    return this;
  }

  /** Map of public service keys to platform classes. */
  static getServiceMappedToDrive() {
    return {
      'google-drive': GoogleDrive,
      dropbox: DropBox,
      box: Box,
      pcloud: PCloud,
      youtube: Youtube,
      'yandex-disk': YandexDisk,
      dailymotion: DailyMotion,
      twitch: Twitch,
      noop: Noop,
    };
  }

  /** @returns {boolean} whether `service` is a supported platform key. */
  static serviceExists(service) {
    // Use the prototype helper instead of calling hasOwnProperty on the
    // object directly (airbnb no-prototype-builtins).
    return Object.prototype.hasOwnProperty.call(Drive.getServiceMappedToDrive(), service);
  }

  setAccessToken(response) {
    return this.service.setAccessToken(response);
  }

  getAccessToken() {
    return this.service.getAccessToken();
  }

  /** Delegate the upload to the selected platform. */
  upload(options, progressLogger, stepLogger, cb) {
    this.service.upload(options, progressLogger, stepLogger, cb);
  }

  /** Attach the source stream to the service and register it as killable. */
  createResource(stream) {
    this.service.setResource(stream);
    this.service.setSourceStream(stream.get());
    this.addToKillableStream(this.service.getSourceStream());
  }

  addToKillableStream(stream) {
    this.service.addToKillableStream(stream);
  }

  addWrapCallback(callback) {
    this.service.addWrapCallback(callback);
  }

  getFileStream(data) {
    return this.service.getFileStream(data);
  }
}

module.exports = Drive;
25 changes: 25 additions & 0 deletions core/HttpClient.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
const axios = require('axios');

// const HOST = process.env.NODE_ENV === 'development'
// ? 'http://localhost:3001'
// : 'http://localhost:3000';
const validateStatus = function validateStatus(status) {
return status >= 200 && status <= 299; // default
};
module.exports = {
simple: function guest() {
return axios.create({
validateStatus,
});
},
auth: function api(accessToken) {
return axios.create({
headers: {
headers: {
Authorization: `Bearer ${accessToken}`,
},
},
validateStatus,
});
},
};
Loading

0 comments on commit f243b71

Please sign in to comment.