taskManager & recurrent events generation

This commit is contained in:
les
2020-01-30 12:37:19 +01:00
parent 0d83a48452
commit 6ad7fd1d79
21 changed files with 366 additions and 291 deletions

View File

@@ -6,6 +6,7 @@ const { event: Event, resource: Resource, tag: Tag, place: Place, notification:
const Sequelize = require('sequelize')
const exportController = require('./export')
const debug = require('debug')('controller:event')
// const { Task, TaskManager } = require('../../taskManager')
const eventController = {
@@ -76,17 +77,23 @@ const eventController = {
const format = req.params.format || 'json'
const is_admin = req.user && req.user.is_admin
const id = Number(req.params.event_id)
let event = await Event.findByPk(id, {
attributes: {
exclude: ['createdAt', 'updatedAt']
},
include: [
{ model: Tag, attributes: ['tag', 'weigth'], through: { attributes: [] } },
{ model: Place, attributes: ['name', 'address'] },
{ model: Resource, where: !is_admin && { hidden: false }, required: false }
],
order: [[Resource, 'id', 'DESC']]
})
let event
try {
event = await Event.findByPk(id, {
attributes: {
exclude: ['createdAt', 'updatedAt']
},
include: [
{ model: Tag, attributes: ['tag', 'weigth'], through: { attributes: [] } },
{ model: Place, attributes: ['name', 'address'] },
{ model: Resource, where: !is_admin && { hidden: false }, required: false },
{ model: Event, required: false, as: 'parent' }
],
order: [[Resource, 'id', 'DESC']]
})
} catch (e) {
return res.sendStatus(400)
}
if (event && (event.is_visible || is_admin)) {
event = event.toJSON()
@@ -139,8 +146,7 @@ const eventController = {
}
try {
event.is_visible = false
await event.save()
await event.update({ is_visible: false })
res.sendStatus(200)
} catch (e) {
res.sendStatus(404)
@@ -185,112 +191,14 @@ const eventController = {
res.sendStatus(200)
},
// async addRecurrent (start, places, where_tags, limit) {
// const where = {
// is_visible: true,
// recurrent: { [Op.ne]: null }
// // placeId: places
// }
// const events = await Event.findAll({
// where,
// limit,
// attributes: {
// exclude: ['slug', 'likes', 'boost', 'userId', 'is_visible', 'description', 'createdAt', 'updatedAt', 'placeId']
// },
// order: ['start_datetime', [Tag, 'weigth', 'DESC']],
// include: [
// { model: Resource, required: false, attributes: ['id'] },
// { model: Tag, ...where_tags, attributes: ['tag'], through: { attributes: [] } },
// { model: Place, required: false, attributes: ['id', 'name', 'address'] }
// ]
// })
// let allEvents = []
// _.forEach(events, e => {
// allEvents = allEvents.concat(eventController.createEventsFromRecurrent(e.get(), start))
// })
// return allEvents
// },
// // build singular events from a recurrent pattern
// createEventsFromRecurrent (e, start, dueTo = null) {
// const events = []
// const recurrent = JSON.parse(e.recurrent)
// if (!recurrent.frequency) { return false }
// if (!dueTo) {
// dueTo = start.add(2, 'month')
// }
// let cursor = start.startOf('week')
// const start_date = moment.unix(e.start_datetime)
// const duration = moment.unix(e.end_datetime).diff(start_date, 's')
// const frequency = recurrent.frequency
// const days = recurrent.days
// const type = recurrent.type
// // default frequency is '1d' => each day
// const toAdd = { n: 1, unit: 'day' }
// // each week or 2 (search for the first specified day)
// if (frequency === '1w' || frequency === '2w') {
// cursor.add(days[0] - 1, 'day')
// if (frequency === '2w') {
// const nWeeks = cursor.diff(e.start_datetime, 'w') % 2
// if (!nWeeks) { cursor.add(1, 'week') }
// }
// toAdd.n = Number(frequency[0])
// toAdd.unit = 'week'
// // cursor.set('hour', start_date.hour()).set('minute', start_date.minutes())
// }
// cursor.set('hour', start_date.hour()).set('minute', start_date.minutes())
// // each month or 2
// if (frequency === '1m' || frequency === '2m') {
// // find first match
// toAdd.n = 1
// toAdd.unit = 'month'
// if (type === 'weekday') {
// } else if (type === 'ordinal') {
// }
// }
// // add event at specified frequency
// while (true) {
// const first_event_of_week = cursor.clone()
// days.forEach(d => {
// if (type === 'ordinal') {
// cursor.date(d)
// } else {
// cursor.day(d - 1)
// }
// if (cursor.isAfter(dueTo) || cursor.isBefore(start)) { return }
// e.start_datetime = cursor.unix()
// e.end_datetime = e.start_datetime + duration
// events.push(Object.assign({}, e))
// })
// if (cursor.isAfter(dueTo)) { break }
// cursor = first_event_of_week.add(toAdd.n, toAdd.unit)
// cursor.set('hour', start_date.hour()).set('minute', start_date.minutes())
// }
// return events
// },
async _select (start = moment.unix(), limit = 100, show_recurrent = true) {
async _select (start = moment.unix(), limit = 100) {
const where = {
// confirmed event only
recurrent: null,
is_visible: true,
start_datetime: { [Op.gt]: start }
}
if (!show_recurrent) {
where.recurrent = null
}
const events = await Event.findAll({
where,
limit,
@@ -319,59 +227,104 @@ const eventController = {
async select (req, res) {
const start = req.query.start || moment().unix()
const limit = req.query.limit || 100
const show_recurrent = req.query.show_recurrent || true
res.json(await eventController._select(start, limit, show_recurrent))
// const filter_tags = req.query.tags || ''
// const filter_places = req.query.places || ''
res.json(await eventController._select(start, limit))
},
// debug(`select limit:${limit} rec:${show_recurrent} tags:${filter_tags} places:${filter_places}`)
// let where_tags = {}
// const where = {
// // confirmed event only
// is_visible: true,
// start_datetime: { [Op.gt]: start },
// recurrent: null
// }
/**
* Ensure we have at least 3 instances of recurrent events
*/
_createRecurrentOccurrence (e) {
const event = {
parentId: e.id,
title: e.title,
description: e.description,
image_path: e.image_path,
is_visible: e.is_visible,
userId: e.userId,
placeId: e.placeId
}
// if (filter_tags) {
// where_tags = { where: { tag: filter_tags.split(',') } }
// }
const recurrent = e.recurrent
let left = 3 - e.child.length
const start = e.child.length ? moment.unix(e.child[e.child.length - 1].start_datetime) : moment()
let cursor = start.startOf('week')
const start_date = moment.unix(e.start_datetime)
const duration = moment.unix(e.end_datetime).diff(start_date, 's')
const frequency = recurrent.frequency
const days = recurrent.days
const type = recurrent.type
// if (filter_places) {
// where.placeId = filter_places.split(',')
// }
// default frequency is '1d' => each day
const toAdd = { n: 1, unit: 'day' }
// let events = await Event.findAll({
// where,
// limit,
// attributes: {
// exclude: ['slug', 'likes', 'boost', 'userId', 'is_visible', 'description', 'createdAt', 'updatedAt', 'placeId']
// // include: [[Sequelize.fn('COUNT', Sequelize.col('activitypub_id')), 'ressources']]
// },
// order: ['start_datetime', [Tag, 'weigth', 'DESC']],
// include: [
// { model: Resource, required: false, attributes: ['id'] },
// { model: Tag, ...where_tags, attributes: ['tag'], through: { attributes: [] } },
// { model: Place, required: false, attributes: ['id', 'name', 'address'] }
// ]
// })
// each week or 2 (search for the first specified day)
if (frequency === '1w' || frequency === '2w') {
cursor.add(days[0] - 1, 'day')
if (frequency === '2w') {
const nWeeks = cursor.diff(e.start_datetime, 'w') % 2
if (!nWeeks) { cursor.add(1, 'week') }
}
toAdd.n = Number(frequency[0])
toAdd.unit = 'week'
}
// let recurrentEvents = []
// events = _.map(events, e => e.get())
// if (show_recurrent) {
// recurrentEvents = await eventController.addRecurrent(moment.unix(start), where.placeId, where_tags, limit)
// events = _.concat(events, recurrentEvents)
// }
cursor.set('hour', start_date.hour()).set('minute', start_date.minutes())
// // flat tags
// events = _(events).map(e => {
// e.tags = e.tags.map(t => t.tag)
// return e
// })
// each month or 2
if (frequency === '1m' || frequency === '2m') {
// find first match
toAdd.n = 1
toAdd.unit = 'month'
if (type === 'weekday') {
// res.json(events.sort((a, b) => a.start_datetime - b.start_datetime))
} else if (type === 'ordinal') {
}
}
// add event at specified frequency
while (true) {
if (!left) { break }
left -= 1
const first_event_of_week = cursor.clone()
days.forEach(d => {
debug(cursor)
if (type === 'ordinal') {
cursor.date(d)
} else {
cursor.day(d - 1)
}
event.start_datetime = cursor.unix()
event.end_datetime = event.start_datetime + duration
Event.create(event)
cursor.set('hour', start_date.hour()).set('minute', start_date.minutes())
})
cursor = first_event_of_week.add(toAdd.n, toAdd.unit)
}
},
/**
* Create instances of recurrent events
* Remove old
* @param {*} start_datetime
*/
async _createRecurrent (start_datetime = moment().unix()) {
// select recurrent events
const events = await Event.findAll({
where: { is_visible: true, recurrent: { [Op.ne]: null } },
include: [{ model: Event, as: 'child', required: false, where: { start_datetime: { [Op.gt]: start_datetime } } }],
order: ['start_datetime']
})
const creations = []
events
.filter(e => e.child && e.child.length < 3)
.forEach(e => {
eventController._createRecurrentOccurrence(e)
})
return Promise.all(creations)
}
}
module.exports = eventController

View File

@@ -7,6 +7,7 @@ const config = require('config')
const mail = require('../mail')
const { user: User, event: Event, tag: Tag, place: Place } = require('../models')
const settingsController = require('./settings')
const eventController = require('./event')
const debug = require('debug')('user:controller')
const userController = {
@@ -89,6 +90,12 @@ const userController = {
await event.setUser(req.user)
}
// create recurrent instances of event if needed
// without waiting for the task manager
if (event.recurrent) {
eventController._createRecurrent()
}
// return created event to the client
res.json(event)
@@ -155,7 +162,7 @@ const userController = {
if (!user) { return res.sendStatus(200) }
user.recover_code = crypto.randomBytes(16).toString('hex')
mail.send(user.email, 'recover', { user, config })
mail.send(user.email, 'recover', { user, config }, req.settings.locale)
await user.save()
res.sendStatus(200)

View File

@@ -1,6 +1,6 @@
const Email = require('email-templates')
const path = require('path')
const moment = require('moment')
const moment = require('moment-timezone')
const config = require('config')
const settings = require('./controller/settings')
const debug = require('debug')('email')

View File

@@ -1,5 +1,5 @@
const config = require('config')
const moment = require('moment')
const moment = require('moment-timezone')
module.exports = (sequelize, DataTypes) => {
const Event = sequelize.define('event', {
@@ -24,7 +24,7 @@ module.exports = (sequelize, DataTypes) => {
image_path: DataTypes.STRING,
is_visible: DataTypes.BOOLEAN,
recurrent: DataTypes.JSON,
// parent: DataTypes.INTEGER
// parent: DataTypes.INTEGER,
likes: { type: DataTypes.JSON, defaultValue: [] },
boost: { type: DataTypes.JSON, defaultValue: [] }
}, {})
@@ -35,6 +35,8 @@ module.exports = (sequelize, DataTypes) => {
Event.belongsToMany(models.tag, { through: 'event_tags' })
Event.belongsToMany(models.notification, { through: 'event_notification' })
Event.hasMany(models.resource)
Event.hasMany(Event, { as: 'child', foreignKey: 'parentId' })
Event.belongsTo(models.event, { as: 'parent' })
}
Event.prototype.toNoteAP = function (username, follower = []) {

View File

@@ -21,7 +21,6 @@ module.exports = (sequelize, DataTypes) => {
Notification.associate = function (models) {
Notification.belongsToMany(models.event, { through: 'event_notification' })
// associations can be defined here
}
return Notification
}

View File

@@ -1,18 +1,19 @@
const { Nuxt, Builder } = require('nuxt')
// Import and Set Nuxt.js options
const nuxt_config = require('../nuxt.config.js')
const nuxtConfig = require('../nuxt.config.js')
const config = require('config')
const consola = require('consola')
const { TaskManager } = require('./taskManager')
async function main () {
nuxt_config.server = config.server
nuxtConfig.server = config.server
// Init Nuxt.js
const nuxt = new Nuxt(nuxt_config)
const nuxt = new Nuxt(nuxtConfig)
// Build only in dev mode
if (nuxt_config.dev) {
if (nuxtConfig.dev) {
const builder = new Builder(nuxt)
await builder.build()
} else {
@@ -20,9 +21,11 @@ async function main () {
}
nuxt.listen()
consola.info('Listen on %s:%d , visit me here => %s', config.server.host, config.server.port, config.baseurl)
TaskManager.start()
// close connections/port/unix socket
function shutdown () {
TaskManager.stop()
nuxt.close(async () => {
const db = require('./api/models')
await db.sequelize.close()

View File

@@ -0,0 +1,19 @@
module.exports = {
up: (queryInterface, Sequelize) => {
// return Promise.resolve(1)
return queryInterface.addColumn('events', 'parentId', {
type: Sequelize.INTEGER,
references: {
model: 'events',
key: 'id'
},
onUpdate: 'CASCADE',
onDelete: 'SET NULL'
})
},
down: (queryInterface, Sequelize) => {
return queryInterface.removeColumn('events', 'parentId')
}
}

View File

@@ -0,0 +1,19 @@
const { event: Event } = require('../api/models')
module.exports = {
up: async (queryInterface, Sequelize) => {
const events = await Event.findAll({})
const promises = events.map(e => {
return e.update({ recurrent: JSON.parse(e.recurrent) })
})
return Promise.all(promises)
},
down: async (queryInterface, Sequelize) => {
const events = await Event.findAll({})
const promises = events.map(e => {
return e.update({ recurrent: JSON.stringify(e.recurrent) })
})
return Promise.all(promises)
}
}

View File

@@ -8,12 +8,6 @@ module.exports = {
},
down: (queryInterface, Sequelize) => {
/*
Add reverting commands here.
Return a promise to correctly handle asynchronicity.
Example:
return queryInterface.dropTable('users');
*/
return queryInterface.removeIndex('users', ['email'])
}
};
}

123
server/taskManager.js Normal file
View File

@@ -0,0 +1,123 @@
const debug = require('debug')('TaskManager')
const eventController = require('./api/controller/event')
// const notifier = require('./notifier')
class Task {
constructor ({ name, removable = false, repeatEach = 1, method, args = [] }) {
this.name = name
this.removable = removable
this.repeatEach = repeatEach
this.processInNTick = repeatEach
this.method = method
this.args = args
}
process () {
--this.processInNTick
if (this.processInNTick > 0) {
return
}
this.processInNTick = this.repeatEach
try {
const ret = this.method.apply(this, this.args)
if (ret && typeof ret.then === 'function') {
ret.catch(e => debug('TASK ERROR ', this.name, e))
}
} catch (e) {
debug('TASK ERROR ', this.name, e)
}
}
}
/**
* Manage tasks:
* - Send emails
* - Send AP notifications
* - Create recurrent events
* - Sync AP federation profiles
*/
class TaskManager {
constructor () {
this.interval = 60 * 100
this.tasks = []
}
start (interval = 60 * 100) {
this.interval = interval
this.timeout = setTimeout(this.tick.bind(this), interval)
}
stop () {
if (this.timeout) {
debug('STOP')
clearTimeout(this.timeout)
this.timeout = false
}
}
add (task) {
debug('ADD TASK ', task.name)
this.tasks.push(task)
}
process () {
if (!this.tasks.length) {
return
}
this.tasks = this.tasks
.filter(async task => {
if (task.removable) {
await task.process()
} else {
return task
}
})
return Promise.all(this.tasks.map(task => task.process()))
}
async tick () {
debug('TICK')
await this.process()
this.timeout = setTimeout(this.tick.bind(this), this.interval)
}
}
const TS = new TaskManager()
// create and clean recurrent events
TS.add(new Task({
name: 'RECURRENT_EVENT',
method: eventController._createRecurrent,
repeatEach: 10
}))
// daily morning notification
// TS.add(new Task({
// name: 'NOTIFICATION',
// method: notifier._daily,
// repeatEach: 1
// }))
// AP users profile sync
// TaskManager.add(new Task({
// name: 'AP_PROFILE_SYNC',
// method: federation._sync,
// repeatEach: 60 * 24
// }))
// Search for places position via nominatim
// TaskManager.add(new Task({
// name: 'NOMINATIM_QUERY',
// method: places._nominatimQuery,
// repeatEach: 60
// }))
// TS.start()
// TS.add(new Task({ name: 'removable #1', method: daje, args: ['removable #1'], removable: true }))
// TS.add(new Task({ name: 'non removable #2', method: daje, args: ['non removable #2'] }))
// TS.add(new Task({ name: 'non removable and repeat each #2', method: daje, args: ['nn rm and rpt #5'], repeatEach: 5 }))
module.exports = { Task, TaskManager: TS }