520 lines
18 KiB
JavaScript
520 lines
18 KiB
JavaScript
const { Queue, QueueEvents, Job, Worker } = require('bullmq');
|
|
const { redisConnection } = require('./config/redis.js');
|
|
const { getIo } = require('./socket');
|
|
const axios = require('axios');
|
|
const controllers = require(`./controllers/archiveController.js`);
|
|
const { GoogleGenerativeAI } = require('@google/generative-ai');
|
|
const pool = require('./db/pool.js');
|
|
const { GetObjectCommand } = require('@aws-sdk/client-s3');
|
|
const onPremiseClient = require('./config/onPremiseClient.js');
|
|
const cloudClient = require('./config/cloudClient.js');
|
|
|
|
/**
 * Pick the S3 client that matches a storage type.
 *
 * @param {string} storageType - 'Cloud' selects the cloud client; every
 *   other value falls back to the on-premise client.
 * @returns {object} Either `cloudClient` or `onPremiseClient`.
 */
function getS3(storageType) {
  if (storageType === 'Cloud') {
    return cloudClient;
  }
  return onPremiseClient;
}
|
|
|
|
|
|
// 🔻🔻🔻🔻🔻🔻🔻🔻 worker1 변환 관련 내용 시작 🔻🔻🔻🔻🔻🔻🔻🔻
|
|
|
|
// Queue object for the conversion ('convert-pdf') jobs.
const convertPdfQueue = new Queue('convert-pdf', { connection: redisConnection });

// Queue-events object for the same queue; the completed/failed listeners
// are registered on it.
// A second argument ({ removeOnComplete: 1000, removeOnFail: 5000 }) was
// tried here and is currently disabled.
// removeOnComplete: <count> keeps that many completed jobs.
const convertPdfQueueEvents = new QueueEvents('convert-pdf', { connection: redisConnection });
|
|
|
|
// After a conversion job completes, broadcast the job's return value to all
// connected sockets as a `convertPdf_success` event.
convertPdfQueueEvents.on('completed', async ({ returnvalue }) => {
  getIo().emit('convertPdf_success', returnvalue);
});
|
|
// After a conversion job fails, log the job context and broadcast a
// `convertPdf_failed` event carrying the job's data and progress.
//
// The whole handler is guarded: an exception inside an async event listener
// would otherwise surface as an unhandled promise rejection.
convertPdfQueueEvents.on('failed', async ({ jobId, failedReason }) => {
  try {
    const job = await Job.fromId(convertPdfQueue, jobId);
    if (!job) {
      // The job could not be loaded, so there is no payload to report.
      console.error(`convert-pdf job ${jobId} failed but could not be loaded:`, failedReason);
      return;
    }

    const jobData = job.data || {};
    // Progress is only usable as a fallback when the worker stored an object.
    const progress = job.progress && typeof job.progress === 'object' ? job.progress : {};
    // Prefer the value from the job payload, otherwise take it from progress.
    const pick = (key) => jobData[key] || progress[key];

    const resourcePath = pick('resourcePath');
    const projectId = pick('projectId');
    const userInfoString = pick('userInfoString');
    const serviceName = pick('serviceName');
    const dataId = pick('dataId');

    // A missing or malformed userInfoString must not stop the failure event
    // from being sent; it only affects the log output below.
    let userInfo = {};
    try {
      userInfo = JSON.parse(userInfoString) || {};
    } catch (parseErr) {
      console.error(`convert-pdf job ${jobId}: userInfoString could not be parsed:`, parseErr.message);
    }
    const { user_id, user_nm } = userInfo;

    console.log('');
    console.error('============================= 변환실패');
    console.log('jobName :', job.name);
    console.log('jobQueueName :', (job.queue && job.queue.name) || job.queueName);
    console.log('serviceName :', serviceName);
    console.log('projectId :', projectId);
    console.log('resourcePath :', resourcePath);
    console.log('user_nm :', user_nm);
    console.log('user_id :', user_id);
    console.log('dataId :', dataId);
    console.log('failedReason :', failedReason);
    console.log('');

    // Payload shape is unchanged: the raw job data plus the raw progress.
    const resultData = { jobData: job.data, jobProgress: job.progress };
    getIo().emit('convertPdf_failed', resultData);
  } catch (err) {
    console.error(`convert-pdf job ${jobId}: could not report the failure:`, err);
  }
});
|
|
|
|
// 🔺🔺🔺🔺🔺🔺🔺🔺 worker1 변환 관련 내용 끝 🔺🔺🔺🔺🔺🔺🔺🔺
|
|
|
|
|
|
|
|
// 🔻🔻🔻🔻🔻🔻🔻🔻 worker2 pdf_thumb 관련 내용 시작 🔻🔻🔻🔻🔻🔻🔻🔻
|
|
|
|
// Queue object for the 'pdf-thumb' jobs.
const thumbQueue = new Queue('pdf-thumb', { connection: redisConnection });
|
|
|
|
// 🔺🔺🔺🔺🔺🔺🔺🔺 worker2 pdf_thumb 관련 내용 끝 🔺🔺🔺🔺🔺🔺🔺🔺
|
|
|
|
|
|
|
|
// 🔻🔻🔻🔻🔻🔻🔻🔻 worker3 폴더압축 관련 내용 시작 🔻🔻🔻🔻🔻🔻🔻🔻
|
|
|
|
// Queue object for the folder-compression ('zip-folder') jobs.
const zipFolderQueue = new Queue('zip-folder', { connection: redisConnection });

// Queue-events object for the folder-compression queue.
// A second argument ({ removeOnComplete: 1000, removeOnFail: 5000 }) was
// tried here and is currently disabled.
const zipFolderPdfQueueEvents = new QueueEvents('zip-folder', { connection: redisConnection });
|
|
|
|
// When a folder-compression job completes: register the result in the
// download list kept on the socket server, then tell the requesting client
// (addressed by the clientId inside userInfoString) that the zip is ready.
zipFolderPdfQueueEvents.on('completed', async ({ returnvalue }) => {
  const { clientId } = JSON.parse(returnvalue.userInfoString);
  const io = getIo();

  // The download list is created lazily and stored on the io instance.
  if (!io.folderDownloadList) {
    io.folderDownloadList = new folderDownloadManager();
  }
  io.folderDownloadList.putList(returnvalue);

  io.to(clientId).emit('zip-prepared', returnvalue);
});
|
|
// When a folder-compression job fails, record it in the server log.
// The handler used to be empty, so failed jobs left no trace at all.
// NOTE(review): no socket event is sent to the client here — confirm whether
// the client needs a failure notification.
zipFolderPdfQueueEvents.on('failed', ({ jobId, failedReason }) => {
  console.error(`⚠️ Job ${jobId} 실패:`, failedReason);
});
|
|
|
|
// Folder-compression progress: receive the progress payload in real time and
// forward it over the socket to the client that requested the download.
zipFolderPdfQueueEvents.on('progress', ({ jobId, data }) => {
  // Progress may be reported as a plain number; only object payloads carry
  // the userInfoString needed to address the client.
  if (!data || typeof data !== 'object' || typeof data.userInfoString !== 'string') {
    return;
  }

  let clientId;
  try {
    ({ clientId } = JSON.parse(data.userInfoString));
  } catch (err) {
    // This listener is synchronous, so a throw here would propagate into the
    // event emitter; log and drop the update instead.
    console.error(`zip-folder job ${jobId}: progress userInfoString could not be parsed:`, err.message);
    return;
  }

  getIo().to(clientId).emit('zip-progress', data);
});
|
|
|
|
// 🔺🔺🔺🔺🔺🔺🔺🔺 worker3 폴더압축 관련 내용 끝 🔺🔺🔺🔺🔺🔺🔺🔺
|
|
|
|
|
|
|
|
// 🔻🔻🔻🔻🔻🔻🔻🔻 worker5 동영상 후처리 관련 내용 시작 🔻🔻🔻🔻🔻🔻🔻🔻
|
|
|
|
// Queue object for the video post-processing ('post-process-video') jobs.
const postProcessVideoQueue = new Queue('post-process-video', { connection: redisConnection });

// Queue-events object for the video post-processing queue.
// A second argument ({ removeOnComplete: 1000, removeOnFail: 5000 }) was
// tried here and is currently disabled.
const postProcessVideoQueueEvents = new QueueEvents('post-process-video', { connection: redisConnection });
|
|
|
|
// After a video post-processing job completes, broadcast its return value to
// all connected sockets as a `postProcessVideo_success` event.
postProcessVideoQueueEvents.on('completed', async ({ returnvalue }) => {
  const payload = returnvalue;
  // The payload is extended in place with a one-element array form of
  // resourcePath before it is sent.
  payload.resourcePathArr = [payload.resourcePath];
  getIo().emit('postProcessVideo_success', payload);
});
|
|
|
|
// 🔺🔺🔺🔺🔺🔺🔺🔺 worker5 동영상 후처리 관련 내용 끝 🔺🔺🔺🔺🔺🔺🔺🔺
|
|
|
|
// 🔻🔻🔻🔻🔻🔻🔻🔻 summarizeAI 관련 내용 시작 🔻🔻🔻🔻🔻🔻🔻🔻
|
|
|
|
// Queue object for the 'ai-summarize' jobs.
const summarizeAIQueue = new Queue('ai-summarize', { connection: redisConnection });

// Queue-events object for the 'ai-summarize' queue.
const summarizeAIQueueEvents = new QueueEvents('ai-summarize', { connection: redisConnection });
|
|
|
|
// When an 'ai-summarize' job completes, write a summarizeAI activity log —
// but only when the job's initiator matches this server's own reference.
//
// The whole handler is guarded: an exception inside an async event listener
// would otherwise surface as an unhandled promise rejection.
summarizeAIQueueEvents.on('completed', async ({ jobId, returnvalue }) => {
  try {
    const { projectId, resourcePath, dataId, userInfoString, text, initiator, type, totalToken } = returnvalue;

    // NOTE(review): looks like leftover debug output of the token count.
    console.log('@@@@@@@@@@@@@@@@@@@');
    console.log(totalToken);

    const env = process.env.NODE_ENV;
    const deploymentType = process.env.DEPLOYMENT_TYPE;
    const cloudType = process.env.CLOUD_TYPE;

    // Outside production the reference is tied to the user who ran the job;
    // in production it is derived from the deployment type.
    let initiatorRef = `DEV_LOCAL_${JSON.parse(userInfoString).user_id}`;
    if (env === 'production') {
      if (deploymentType === 'ONPREMISE') initiatorRef = 'HYHC_ONPREMISE';
      if (deploymentType === 'CLOUD') initiatorRef = `AWS_CLOUD_${cloudType}`;
    }

    // A job without a usable initiator cannot be attributed to this server.
    if (!initiator || typeof initiator.includes !== 'function' || !initiator.includes(initiatorRef)) {
      return;
    }

    await controllers.addSummarizeAiLog({
      projectId,
      activity: 'summarizeAI',
      userInfoString,
      resourcePath,
      resourcePathArr: [resourcePath],
      dataId,
      dataIdArr: [dataId],
      text,
      initiator,
      isState: true,
      type,
    });
  } catch (err) {
    console.error(`ai-summarize job ${jobId}: could not record the completion log:`, err);
  }
});
|
|
|
|
// When an 'ai-summarize' job fails, look up the job to identify the file and
// broadcast a `summarize_failed` event with the failure reason.
summarizeAIQueueEvents.on('failed', async ({ jobId, failedReason }) => {
  try {
    const job = await Job.fromId(summarizeAIQueue, jobId);
    // The job may not be loadable any more; the failure text is still sent,
    // with the file fields left undefined.
    const jobData = (job && job.data) || {};

    getIo().emit('summarize_failed', {
      resourcePath: jobData.resourcePath,
      dataId: jobData.dataId,
      text: failedReason,
      type: jobData.type,
    });
  } catch (err) {
    // Keep a rejected lookup or emit from becoming an unhandled rejection.
    console.error(`ai-summarize job ${jobId}: could not report the failure:`, err);
  }
});
|
|
|
|
// summarizeAIQueueEvents.on('completed', async({ jobId, returnvalue }) => {
|
|
// // console.log(returnvalue);
|
|
// // console.log('AI 요약 성공');
|
|
|
|
// let resultData = returnvalue;
|
|
// let io = getIo();
|
|
// io.emit('summarize_success', resultData);
|
|
|
|
// });
|
|
|
|
// summarizeAIQueueEvents.on('failed', async({ jobId, failedReason}) => {
|
|
// // console.log('AI 요약 실패' + failedReason);
|
|
|
|
|
|
// // 실패한 작업의 파일 정보 찾기
|
|
// const failedJob = summarizeAiDataArr.find(item => item.jobId === jobId);
|
|
// if (failedJob) {
|
|
// let io = getIo();
|
|
// io.emit('summarize_failed', {
|
|
// resourcePath: failedJob.resourcePath,
|
|
// dataId: failedJob.dataId,
|
|
// });
|
|
|
|
// // 배열에서 실패한 작업 제거
|
|
// summarizeAiDataArr = summarizeAiDataArr.filter(item => item.jobId !== jobId);
|
|
// }
|
|
// });
|
|
|
|
// 🔺🔺🔺🔺🔺🔺🔺🔺 summarizeAI 관련 내용 끝 🔺🔺🔺🔺🔺🔺🔺🔺
|
|
|
|
// 🔻🔻🔻🔻🔻🔻🔻🔻 worker6 summarizeAPI 관련 내용 시작 🔻🔻🔻🔻🔻🔻🔻🔻
|
|
|
|
// Queue object for the 'api-summarize' jobs.
const summarizeAPIQueue = new Queue('api-summarize', { connection: redisConnection });

// Queue-events object for the 'api-summarize' queue.
const summarizeAPIQueueEvents = new QueueEvents('api-summarize', { connection: redisConnection });
|
|
|
|
// When an 'api-summarize' job completes, write a summarizeAI activity log —
// but only when the job's initiator matches this server's own reference.
//
// The whole handler is guarded: an exception inside an async event listener
// would otherwise surface as an unhandled promise rejection.
summarizeAPIQueueEvents.on('completed', async ({ jobId, returnvalue }) => {
  try {
    const { projectId, resourcePath, dataId, userInfoString, text, initiator } = returnvalue;

    const env = process.env.NODE_ENV;
    const deploymentType = process.env.DEPLOYMENT_TYPE;
    const cloudType = process.env.CLOUD_TYPE;

    // Dev environments are hard to tell apart, so the reference is tied to
    // the user who ran the job; production derives it from the deployment.
    let initiatorRef = `DEV_LOCAL_${JSON.parse(userInfoString).user_id}`;
    if (env === 'production') {
      if (deploymentType === 'ONPREMISE') initiatorRef = 'HYHC_ONPREMISE';
      if (deploymentType === 'CLOUD') initiatorRef = `AWS_CLOUD_${cloudType}`;
    }

    // A job without a usable initiator cannot be attributed to this server.
    if (!initiator || typeof initiator.includes !== 'function' || !initiator.includes(initiatorRef)) {
      return;
    }

    await controllers.addSummarizeAiLog({
      projectId,
      activity: 'summarizeAI',
      userInfoString,
      resourcePath,
      resourcePathArr: [resourcePath],
      dataId,
      dataIdArr: [dataId],
      text,
      initiator,
      isState: true,
    });
  } catch (err) {
    console.error(`api-summarize job ${jobId}: could not record the completion log:`, err);
  }
});
|
|
|
|
// When an 'api-summarize' job fails, look up the job to identify the file and
// broadcast a `summarize_failed` event with the failure reason.
summarizeAPIQueueEvents.on('failed', async ({ jobId, failedReason }) => {
  try {
    const job = await Job.fromId(summarizeAPIQueue, jobId);
    // The job may not be loadable any more; the failure text is still sent,
    // with the file fields left undefined.
    const jobData = (job && job.data) || {};

    getIo().emit('summarize_failed', {
      resourcePath: jobData.resourcePath,
      dataId: jobData.dataId,
      text: failedReason,
    });
  } catch (err) {
    // Keep a rejected lookup or emit from becoming an unhandled rejection.
    console.error(`api-summarize job ${jobId}: could not report the failure:`, err);
  }
});
|
|
|
|
// 🔺🔺🔺🔺🔺🔺🔺🔺 summarizeAPI 관련 내용 끝 🔺🔺🔺🔺🔺🔺🔺🔺
|
|
|
|
// Temporary.
/**
 * In-memory list of prepared folder downloads.
 *
 * Each entry ("task") has the shape
 * `{ taskId, data, addedAt, timeoutId }`, where `data` is the payload given
 * to `putList` and `timeoutId` is the timer that removes the entry 24 hours
 * after it was added.
 */
class folderDownloadManager {
  constructor() {
    this.array = [];
  }

  /**
   * Return the tasks whose `data.userInfoString` belongs to the given user.
   *
   * @param {*} userId - Compared with `===` against the parsed `user_id`.
   * @returns {Array<object>} Matching tasks, in insertion order.
   */
  getMyList(userId) {
    return this.array.filter((task) => {
      try {
        return JSON.parse(task.data.userInfoString).user_id === userId;
      } catch (err) {
        // One entry with unreadable user info must not break the whole list.
        return false;
      }
    });
  }

  /**
   * Add a task for `data` and schedule its removal after 24 hours.
   *
   * @param {object} data - Payload to keep; `data.deleteUrl` is the URL that
   *   is called when the task expires.
   */
  putList(data) {
    const task = {
      taskId: this.makeId(),
      data,
      addedAt: Date.now(),
      timeoutId: null,
    };
    this.array.push(task);

    const { deleteUrl } = data;
    task.timeoutId = setTimeout(() => {
      // removeList is async; without a handler a failed DELETE would become
      // an unhandled promise rejection. The entry stays listed in that case,
      // exactly as when removeList rejects for a direct caller.
      this.removeList(task.taskId, deleteUrl).catch((err) => {
        console.error(`folderDownloadManager: expiry cleanup of task ${task.taskId} failed:`, err && err.message ? err.message : err);
      });
    }, 24 * 60 * 60 * 1000);
  }

  /**
   * Delete the stored file through `deleteUrl`, then drop the task from the
   * list and cancel its expiry timer.
   *
   * @param {number} dataId - The `taskId` of the task to remove.
   * @param {string} deleteUrl - URL that receives the HTTP DELETE.
   * @returns {Promise<void>} Rejects (leaving the list untouched) when the
   *   DELETE request fails.
   */
  async removeList(dataId, deleteUrl) {
    // The actual data is deleted before the list entry is removed.
    const response = await axios({
      method: 'DELETE',
      url: deleteUrl,
      headers: {
        'Content-Type': 'application/xml',
      },
      timeout: 10000, // 10-second timeout
    });

    console.log('✅ 파일 삭제 성공:', response.status);

    this.array = this.array.filter((task) => {
      if (task.taskId !== dataId) {
        return true;
      }
      if (task.timeoutId) {
        clearTimeout(task.timeoutId);
      }
      return false;
    });
  }

  /**
   * Generate a random four-digit task id (1000-9999).
   *
   * Ids already used by a listed task are skipped, because `removeList`
   * removes every task with a matching id. The search is bounded, so a
   * duplicate is still possible if the id space is nearly exhausted.
   *
   * @returns {number} Integer between 1000 and 9999 inclusive.
   */
  makeId() {
    const idsInUse = new Set(this.array.map((task) => task.taskId));
    let candidate = Math.floor(Math.random() * 9000) + 1000;
    for (let tries = 0; idsInUse.has(candidate) && tries < 1000; tries++) {
      candidate = Math.floor(Math.random() * 9000) + 1000;
    }
    return candidate;
  }
}
|
|
|
|
|
|
// 🔻🔻🔻🔻🔻🔻🔻🔻 Gemini AI Workers 🔻🔻🔻🔻🔻🔻🔻🔻
|
|
|
|
/**
 * Call `model.generateContent(params)`, retrying when the failure looks
 * transient.
 *
 * A failure counts as transient when the error carries a numeric `status` of
 * 503 or 429, or when its message contains '503', '429', 'high demand' or
 * 'ResourceExhausted'. Any other failure is rethrown immediately.
 *
 * @param {object} model - Object exposing an async `generateContent(params)`.
 * @param {object} params - Passed to `generateContent` unchanged.
 * @param {number} [maxRetries=3] - Total number of attempts; values below 1
 *   are treated as a single attempt.
 * @param {number} [delayMs=2000] - Wait between attempts, in milliseconds.
 * @returns {Promise<*>} Whatever `generateContent` resolves to.
 * @throws The original error of the last attempt (or of the first
 *   non-transient failure).
 */
async function generateContentWithRetry(model, params, maxRetries = 3, delayMs = 2000) {
  const transientMarkers = ['503', '429', 'high demand', 'ResourceExhausted'];
  // Always make at least one attempt so the function never resolves to
  // undefined without having called the model.
  const totalAttempts = maxRetries >= 1 ? Math.floor(maxRetries) : 1;

  for (let attempt = 1; attempt <= totalAttempts; attempt++) {
    try {
      return await model.generateContent(params);
    } catch (err) {
      // Thrown values are not guaranteed to be Error instances; never let
      // the inspection below replace the original error with a TypeError.
      const message = err && typeof err.message === 'string' ? err.message : String(err);
      const status = err ? err.status : undefined;
      console.error(`⚠️ Gemini API 호출 실패 (시도 ${attempt}/${totalAttempts}):`, message);

      const isTransient =
        status === 503 ||
        status === 429 ||
        transientMarkers.some((marker) => message.includes(marker));

      if (attempt === totalAttempts || !isTransient) {
        throw err;
      }

      console.log(`⏳ 일시적 API 서버 장애/트래픽 과부하 감지. ${delayMs}ms 후 자동 재시도합니다...`);
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
}
|
|
|
|
// Worker for the 'ai-summarize' queue.
// For each job it looks up the file's object key in the database, downloads
// the object through the S3 client chosen by storageType, sends it to Gemini
// together with the prompt, and returns the summary text plus the token
// count reported by countTokens.
const summarizeAIWorker = new Worker('ai-summarize', async (job) => {
  const { prompt, resourcePath, initiator, storageType, dataId, projectId, userInfoString, type } = job.data;

  // 1. Look up object_key in the DB (a test table is used outside production).
  const tableName = process.env.NODE_ENV === 'production' ? 'tb_data' : '_test_tb_data';
  const { rows } = await pool.query(`SELECT object_key FROM ver4.${tableName} WHERE data_id = $1`, [dataId]);
  if (rows.length === 0) {
    throw new Error(`DB에서 dataId ${dataId}에 해당하는 파일을 찾을 수 없습니다.`);
  }
  const objectKey = rows[0].object_key;

  // 2. Download the file from S3; the project id is used as the bucket name.
  const storageClient = getS3(storageType);
  const downloadCommand = new GetObjectCommand({ Bucket: projectId, Key: objectKey });
  const { Body: bodyStream } = await storageClient.send(downloadCommand);

  // Collect the streamed body into a single Buffer.
  const fileBytes = await new Promise((resolve, reject) => {
    const pieces = [];
    bodyStream.on('data', (piece) => {
      pieces.push(piece);
    });
    bodyStream.on('error', reject);
    bodyStream.on('end', () => {
      resolve(Buffer.concat(pieces));
    });
  });
  const base64Pdf = fileBytes.toString('base64');

  // 3. Call the Gemini API.
  const genAI = new GoogleGenerativeAI(process.env.GEMINI_API_KEY);
  const model = genAI.getGenerativeModel({ model: 'gemini-2.5-flash' });

  // The generation call and the token count use the same request shape;
  // a fresh object is built for each call.
  const buildRequest = () => ({
    contents: [
      {
        parts: [
          { text: prompt },
          { inline_data: { mime_type: 'application/pdf', data: base64Pdf } },
        ],
      },
    ],
  });

  const generation = await generateContentWithRetry(model, buildRequest());
  const summaryText = await generation.response.text();
  const tokenCount = await model.countTokens(buildRequest());

  return {
    projectId,
    resourcePath,
    dataId,
    userInfoString,
    text: summaryText,
    initiator,
    type,
    totalToken: tokenCount.totalTokens,
  };
}, {
  connection: redisConnection,
  // Lock duration raised to 90 seconds to avoid "stalled" errors.
  lockDuration: 90000,
});
|
|
|
|
// Worker for the 'api-summarize' queue.
// For each job it downloads the file from the URL in the job data, sends it
// to Gemini together with the prompt, and returns the summary text plus the
// token count reported by countTokens.
const summarizeAPIWorker = new Worker('api-summarize', async (job) => {
  const { prompt, url, resourcePath, initiator, dataId, projectId, userInfoString, type } = job.data;

  // 1. Download the file with axios through the (presigned) URL.
  const download = await axios.get(url, { responseType: 'arraybuffer' });
  const base64Pdf = Buffer.from(download.data).toString('base64');

  // 2. Call the Gemini API.
  const genAI = new GoogleGenerativeAI(process.env.GEMINI_API_KEY);
  const model = genAI.getGenerativeModel({ model: 'gemini-2.5-flash' });

  // The generation call and the token count use the same request shape;
  // a fresh object is built for each call.
  const buildRequest = () => ({
    contents: [
      {
        parts: [
          { text: prompt },
          { inline_data: { mime_type: 'application/pdf', data: base64Pdf } },
        ],
      },
    ],
  });

  const generation = await generateContentWithRetry(model, buildRequest());
  const summaryText = await generation.response.text();
  const tokenCount = await model.countTokens(buildRequest());

  return {
    projectId,
    resourcePath,
    dataId,
    userInfoString,
    text: summaryText,
    initiator,
    type,
    totalToken: tokenCount.totalTokens,
  };
}, {
  connection: redisConnection,
  // Lock duration raised to 90 seconds to avoid "stalled" errors.
  lockDuration: 90000,
});
|
|
|
|
module.exports = { convertPdfQueue, zipFolderQueue, thumbQueue, postProcessVideoQueue, summarizeAIQueue, summarizeAPIQueue };
|
|
|