const SocketWrapper = require('./SocketWrapper');
const workerThreads = require('worker_threads');
const { constant, Context, Command, Object3D } = require('@ud-viz/game_shared');
const {
objectToInt32Array,
int32ArrayToObject,
ProcessInterval,
} = require('@ud-viz/utils_shared');
const THREE = require('three');
/**
* Different Event between Parent and Child
*/
const MESSAGE_EVENT = {
// parent => child
INIT: 'init',
COMMANDS: 'commands',
ADD_OBJECT3D: 'add_object3D',
REMOVE_OBJECT3D: 'remove_object3D',
ON_NEW_SOCKET_WRAPPER: 'on_new_socket_wrapper',
ON_SOCKET_WRAPPER_REMOVE: 'on_socket_wrapper_remove',
// child => parent
CURRENT_STATE: 'current_state',
APPLY_RESOLVE: 'apply_resolve',
};
const CHILD_EVENT = {
ON_GAME_CONTEXT_LOADED: 'on_game_context_loaded',
};
/**
* Event message structure
*/
const MESSAGE_KEY = {
DATA: 0,
TYPE: 1,
APPLY_UUID: 2, // mean parent is waiting an answer from child to resolve apply promise
};
/**
* @classdesc - {@link workerThreads} wrapper, different event can be send/receive by the thread
*/
const Parent = class {
/**
* Manage communication between socket wrapper and worker thread
*
* @param {string} threadProcessPath - path to the thread process
*/
constructor(threadProcessPath) {
if (!workerThreads.isMainThread) {
throw new Error('Its not the main thread');
}
/**
* worker
*
@type {workerThreads.Worker}*/
this.worker = new workerThreads.Worker(threadProcessPath);
/** @type {Object<string,Function>} */
this.callbacks = {};
/**
* current socket wrapper connected in thread
*
@type {Array<SocketWrapper>}*/
this.socketWrappers = [];
/** @type {Object<string,Function>} */
this._resolveApply = {};
// listen
this.worker.on('message', (int32ArrayMessage) => {
const objectMessage = int32ArrayToObject(int32ArrayMessage);
if (this.callbacks[objectMessage[MESSAGE_KEY.TYPE]]) {
this.callbacks[objectMessage[MESSAGE_KEY.TYPE]](
objectMessage[MESSAGE_KEY.DATA]
);
}
});
// wait resolve signal from child thread
this.on(MESSAGE_EVENT.APPLY_RESOLVE, (applyUUID) => {
const resolve = this._resolveApply[applyUUID];
if (!resolve) throw new Error('no resolve for ', applyUUID);
resolve();
delete this._resolveApply[applyUUID];
});
}
/**
* Add a socket wrapper in this thread
*
* @param {SocketWrapper} socketWrapper - socket wrapper to add
*/
addSocketWrapper(socketWrapper) {
this.socketWrappers.push(socketWrapper);
this.post(MESSAGE_EVENT.ON_NEW_SOCKET_WRAPPER, socketWrapper.socket.id);
// reset last state of socket wrapper
socketWrapper.lastStateSend = null;
// reset commands link
socketWrapper.socket.removeAllListeners(
constant.WEBSOCKET.MSG_TYPE.COMMANDS
);
socketWrapper.socket.on(
constant.WEBSOCKET.MSG_TYPE.COMMANDS,
(commands) => {
this.post(MESSAGE_EVENT.COMMANDS, commands);
}
);
}
/**
*
* @param {SocketWrapper} socketWrapper - socket wrapper to remove
* @returns {boolean} - true if removed
*/
removeSocketWrapper(socketWrapper) {
const index = this.socketWrappers.indexOf(socketWrapper);
if (index >= 0) this.socketWrappers.splice(index, 1);
this.post(MESSAGE_EVENT.ON_SOCKET_WRAPPER_REMOVE, socketWrapper.socket.id);
socketWrapper.socket.removeAllListeners(
constant.WEBSOCKET.MSG_TYPE.COMMANDS
);
return index >= 0;
}
/**
* Send Message to child thread
*
* @param {string} msgType - message type
* @param {object} data - seriablizable data
*/
post(msgType, data) {
const object = {};
object[MESSAGE_KEY.TYPE] = msgType;
object[MESSAGE_KEY.DATA] = data;
this.worker.postMessage(objectToInt32Array(object));
}
/**
* Receive message from child thread
*
* @param {string} msgType - message type
* @param {Function} callback - callback to apply
*/
on(msgType, callback) {
this.callbacks[msgType] = callback;
}
/**
* Same as `this.post` but return a promise resolving when thread child has applied message
*
*
* @param {string} msgType - message type
* @param {object} data - seriablizable data
* @returns {Promise} - promise resolving when thread child has applied message
*/
apply(msgType, data) {
return new Promise((resolve) => {
const object = {};
object[MESSAGE_KEY.TYPE] = msgType;
object[MESSAGE_KEY.DATA] = data;
const applyUUID = THREE.MathUtils.generateUUID();
object[MESSAGE_KEY.APPLY_UUID] = applyUUID;
this.worker.postMessage(objectToInt32Array(object));
this._resolveApply[applyUUID] = resolve;
});
}
};
/**
* @class class containing parentPort and a gameContext of a child thread {@link Child}
* most of the time you want to use the method `on` to trigger event
*/
class Child {
constructor() {
if (workerThreads.isMainThread) {
throw new Error('Its not a worker');
}
/** @type {Context} */
this.gameContext = null;
/** @type {Object<string,Promise>} */
this.promises = {};
}
/**
* Run child process which can be summarize as so:
* Listen {@link EVENT} of the parent and wait the gameobject
* When gameobject is received launch a {@link Context} and step it over time
*
* @param {Object<string,Function>} gameScriptClass - class needs by object3D
*/
start(gameScriptClass = {}) {
// buffer
let commands = null;
workerThreads.parentPort.on('message', (int32ArrayMessage) => {
const objectMessage = int32ArrayToObject(int32ArrayMessage);
const data = objectMessage[MESSAGE_KEY.DATA];
const applyUUID = objectMessage[MESSAGE_KEY.APPLY_UUID];
const promises = [];
// dispatch for custom event & record promise associated for apply resolve
promises.push(this.dispatch(objectMessage[MESSAGE_KEY.TYPE], data));
switch (objectMessage[MESSAGE_KEY.TYPE]) {
case MESSAGE_EVENT.INIT:
this.gameContext = new Context(
gameScriptClass,
new Object3D(data.gameObject3D)
);
// init is not sync record promise
promises.push(
this.gameContext.load().then(() => {
const process = new ProcessInterval({ fps: 60 });
process.start((dt) => {
// simulation
this.gameContext.step(dt);
const currentState = this.gameContext.toState(false); // false because no need to send component already controlled
// post state to main thread
const message = {};
message[MESSAGE_KEY.TYPE] = MESSAGE_EVENT.CURRENT_STATE;
message[MESSAGE_KEY.DATA] = currentState.toJSON();
workerThreads.parentPort.postMessage(
objectToInt32Array(message)
);
});
console.log(
'child process',
this.gameContext.object3D.name,
'loaded',
this.gameContext.object3D.uuid
);
this.dispatch(
CHILD_EVENT.ON_GAME_CONTEXT_LOADED,
this.gameContext
);
})
);
break;
case MESSAGE_EVENT.COMMANDS:
commands = [];
data.forEach(function (c) {
commands.push(new Command(c));
});
this.gameContext.onCommands(commands);
break;
case MESSAGE_EVENT.ADD_OBJECT3D:
// add is not sync record in promises for apply
promises.push(
this.gameContext
.addObject3D(new Object3D(data.object3D), data.parentUUID)
.then(() => {
if (data.updateCollisionBuffer)
this.gameContext.updateCollisionBuffer();
})
);
break;
case MESSAGE_EVENT.REMOVE_OBJECT3D:
this.gameContext.removeObject3D(data);
break;
case MESSAGE_EVENT.ON_NEW_SOCKET_WRAPPER:
this.gameContext.dispatch(MESSAGE_EVENT.ON_NEW_SOCKET_WRAPPER, data);
break;
case MESSAGE_EVENT.ON_SOCKET_WRAPPER_REMOVE:
this.gameContext.dispatch(
MESSAGE_EVENT.ON_SOCKET_WRAPPER_REMOVE,
data
);
break;
default:
}
if (applyUUID) {
Promise.all(promises).then(() => {
const applyResolveMessage = {};
applyResolveMessage[MESSAGE_KEY.TYPE] = MESSAGE_EVENT.APPLY_RESOLVE;
applyResolveMessage[MESSAGE_KEY.DATA] = applyUUID;
workerThreads.parentPort.postMessage(
objectToInt32Array(applyResolveMessage)
);
});
}
});
}
/**
* @callback PromiseListener
* @param {*} params
* @returns {Promise}
*/
/**
* Add a listener to parent thread message
*
* @param {string} eventID - event id
* @param {PromiseListener|undefined} promise - callback return a promise or nothing
*/
on(eventID, promise) {
this.promises[eventID] = promise;
}
/**
*
* @param {string} eventID - event to notify listener of
* @param {object} data - serializable data
* @returns {Promise} - promise resolving when event has finished
*/
dispatch(eventID, data) {
if (this.promises[eventID]) {
return this.promises[eventID](data);
}
return Promise.resolve();
}
}
module.exports = {
Parent: Parent,
Child: Child,
MESSAGE_EVENT: MESSAGE_EVENT,
MESSAGE_KEY: MESSAGE_KEY,
CHILD_EVENT: CHILD_EVENT,
};