Files
PM_test/queue.js
2026-06-18 08:52:23 +09:00

520 lines
18 KiB
JavaScript

const { Queue, QueueEvents, Job, Worker } = require('bullmq');
const { redisConnection } = require('./config/redis.js');
const { getIo } = require('./socket');
const axios = require('axios');
const controllers = require(`./controllers/archiveController.js`);
const { GoogleGenerativeAI } = require('@google/generative-ai');
const pool = require('./db/pool.js');
const { GetObjectCommand } = require('@aws-sdk/client-s3');
const onPremiseClient = require('./config/onPremiseClient.js');
const cloudClient = require('./config/cloudClient.js');
/**
 * Picks the S3 client that corresponds to a storage type.
 *
 * @param {string} storageType - Storage backend identifier. Only the exact
 *   value 'Cloud' selects the cloud client.
 * @returns {object} `cloudClient` for 'Cloud', `onPremiseClient` for anything else.
 */
function getS3(storageType) {
  if (storageType === 'Cloud') {
    return cloudClient;
  }
  return onPremiseClient;
}
// 🔻🔻🔻🔻🔻🔻🔻🔻 worker1: PDF conversion — start 🔻🔻🔻🔻🔻🔻🔻🔻
// Queue object for PDF conversion jobs.
const convertPdfQueue = new Queue('convert-pdf', { connection: redisConnection });

// Event stream for the conversion queue.
// A second argument `{ removeOnComplete: 1000, removeOnFail: 5000 }` was tried
// here and left disabled (removeOnComplete: N keeps the last N completed jobs).
const convertPdfQueueEvents = new QueueEvents('convert-pdf', { connection: redisConnection });

// When a conversion job completes, broadcast its return value to every
// connected socket as 'convertPdf_success'.
convertPdfQueueEvents.on('completed', async ({ returnvalue }) => {
  const payload = returnvalue;
  const socketServer = getIo();
  socketServer.emit('convertPdf_success', payload);
});
// When a conversion job fails: log the job context, then broadcast
// 'convertPdf_failed' (job data, job progress and the failure reason) to every
// connected socket.
convertPdfQueueEvents.on('failed', async ({ jobId, failedReason }) => {
  // The listener is async and nobody awaits it, so any throw in here would
  // surface as an unhandled promise rejection. Catch and log instead.
  try {
    // Job.fromId resolves to undefined when the job is no longer stored.
    const job = await Job.fromId(convertPdfQueue, jobId);
    if (!job) {
      console.error(`convert-pdf job ${jobId} failed but could not be loaded:`, failedReason);
      return;
    }

    const jobData = job.data || {};
    // Fields missing from job.data are looked up on the progress value, but
    // only when progress is an object.
    const progress = job.progress && typeof job.progress === 'object' ? job.progress : {};
    const resourcePath = jobData.resourcePath || progress.resourcePath;
    const projectId = jobData.projectId || progress.projectId;
    const userInfoString = jobData.userInfoString || progress.userInfoString;
    const serviceName = jobData.serviceName || progress.serviceName;
    const dataId = jobData.dataId || progress.dataId;

    // userInfoString is only used for logging; a missing or malformed value
    // must not stop the failure event from reaching the clients.
    let userInfo = {};
    try {
      userInfo = JSON.parse(userInfoString) || {};
    } catch (parseErr) {
      console.error(`convert-pdf job ${jobId}: userInfoString is not valid JSON:`, parseErr.message);
    }
    const { user_id, user_nm } = userInfo;

    console.log('');
    console.error('============================= 변환실패');
    console.log('jobName :', job.name);
    console.log('jobQueueName :', job.queue.name);
    console.log('serviceName :', serviceName);
    console.log('projectId :', projectId);
    console.log('resourcePath :', resourcePath);
    console.log('user_nm :', user_nm);
    console.log('user_id :', user_id);
    console.log('dataId :', dataId);
    console.log('');

    const resultData = { jobData: job.data, jobProgress: job.progress, failedReason: failedReason };
    getIo().emit('convertPdf_failed', resultData);
  } catch (err) {
    console.error(`convert-pdf failed-event handler error (job ${jobId}):`, err);
  }
});
// 🔺🔺🔺🔺🔺🔺🔺🔺 worker1: PDF conversion — end 🔺🔺🔺🔺🔺🔺🔺🔺
// 🔻🔻🔻🔻🔻🔻🔻🔻 worker2: pdf_thumb — start 🔻🔻🔻🔻🔻🔻🔻🔻
// Queue object for the 'pdf-thumb' queue.
const thumbQueue = new Queue('pdf-thumb', {
  connection: redisConnection,
});
// 🔺🔺🔺🔺🔺🔺🔺🔺 worker2: pdf_thumb — end 🔺🔺🔺🔺🔺🔺🔺🔺
// 🔻🔻🔻🔻🔻🔻🔻🔻 worker3: folder zip — start 🔻🔻🔻🔻🔻🔻🔻🔻
// Queue object for folder-compression jobs.
const zipFolderQueue = new Queue('zip-folder', { connection: redisConnection });

// Event stream for the folder-compression queue.
const zipFolderPdfQueueEvents = new QueueEvents('zip-folder', { connection: redisConnection });

// When a zip job completes: register the result in the in-memory download
// list (created on first use and stored on the socket server object), then
// notify only the requesting client's room with 'zip-prepared'.
zipFolderPdfQueueEvents.on('completed', async ({ jobId, returnvalue }) => {
  // Nobody awaits this async listener, so an uncaught throw (for example a
  // malformed userInfoString) would become an unhandled promise rejection.
  try {
    const resultData = returnvalue;
    const { clientId } = JSON.parse(resultData.userInfoString);
    const io = getIo();
    if (!io.folderDownloadList) {
      io.folderDownloadList = new folderDownloadManager();
    }
    io.folderDownloadList.putList(resultData);
    io.to(clientId).emit('zip-prepared', resultData);
  } catch (err) {
    console.error(`zip-folder completed-event handler error (job ${jobId}):`, err);
  }
});

// Zip failures are not forwarded to the client; record them so they are not
// lost silently.
zipFolderPdfQueueEvents.on('failed', async ({ jobId, failedReason }) => {
  console.error(`zip-folder job ${jobId} failed:`, failedReason);
});

// Zip progress: forward the progress payload in real time to the client that
// requested the download.
zipFolderPdfQueueEvents.on('progress', ({ jobId, data }) => {
  // Only object payloads carrying userInfoString can be routed to a client;
  // anything else (e.g. a bare number) is ignored instead of throwing inside
  // the event emitter.
  if (!data || typeof data !== 'object' || typeof data.userInfoString !== 'string') {
    return;
  }
  let clientId;
  try {
    clientId = JSON.parse(data.userInfoString).clientId;
  } catch (err) {
    console.error(`zip-folder progress event ignored (job ${jobId}):`, err.message);
    return;
  }
  getIo().to(clientId).emit('zip-progress', data);
});
// 🔺🔺🔺🔺🔺🔺🔺🔺 worker3: folder zip — end 🔺🔺🔺🔺🔺🔺🔺🔺
// 🔻🔻🔻🔻🔻🔻🔻🔻 worker5: video post-processing — start 🔻🔻🔻🔻🔻🔻🔻🔻
// Queue object for video post-processing jobs.
const postProcessVideoQueue = new Queue('post-process-video', {
  connection: redisConnection,
});

// Event stream for the video post-processing queue.
// A second argument `{ removeOnComplete: 1000, removeOnFail: 5000 }` was tried
// here and left disabled.
const postProcessVideoQueueEvents = new QueueEvents('post-process-video', {
  connection: redisConnection,
});

// When post-processing completes, broadcast the job's return value to every
// connected socket as 'postProcessVideo_success'.
postProcessVideoQueueEvents.on('completed', async ({ returnvalue }) => {
  const payload = returnvalue;
  // The payload additionally carries the path as a one-element array.
  payload.resourcePathArr = [payload.resourcePath];
  const socketServer = getIo();
  socketServer.emit('postProcessVideo_success', payload);
});
// 🔺🔺🔺🔺🔺🔺🔺🔺 worker5: video post-processing — end 🔺🔺🔺🔺🔺🔺🔺🔺
// 🔻🔻🔻🔻🔻🔻🔻🔻 summarizeAI — start 🔻🔻🔻🔻🔻🔻🔻🔻
// Queue object for AI summarization jobs.
const summarizeAIQueue = new Queue('ai-summarize', { connection: redisConnection });

// Event stream for the AI summarization queue.
const summarizeAIQueueEvents = new QueueEvents('ai-summarize', { connection: redisConnection });

// When a summarization job completes, write a 'summarizeAI' activity log via
// controllers.addSummarizeAiLog — but only if the job's `initiator` contains
// the reference value of this deployment:
//   production + DEPLOYMENT_TYPE=ONPREMISE -> 'HYHC_ONPREMISE'
//   production + DEPLOYMENT_TYPE=CLOUD     -> 'AWS_CLOUD_<CLOUD_TYPE>'
//   anything else                          -> 'DEV_LOCAL_<user_id of the requester>'
summarizeAIQueueEvents.on('completed', async ({ jobId, returnvalue }) => {
  // Nobody awaits this async listener; catch everything so a bad payload or a
  // failing log write does not become an unhandled promise rejection.
  try {
    const resultData = returnvalue || {};
    const { projectId, resourcePath, dataId, userInfoString, text, initiator, type, totalToken } = resultData;
    console.log(`ai-summarize job ${jobId} totalToken :`, totalToken);

    const deploymentType = process.env.DEPLOYMENT_TYPE;
    let initiatorRef = null;
    if (process.env.NODE_ENV === 'production') {
      if (deploymentType === 'ONPREMISE') {
        initiatorRef = 'HYHC_ONPREMISE';
      } else if (deploymentType === 'CLOUD') {
        initiatorRef = `AWS_CLOUD_${process.env.CLOUD_TYPE}`;
      }
    }
    if (initiatorRef === null) {
      // The per-user fallback is the only branch that needs userInfoString,
      // so it is parsed only here.
      initiatorRef = `DEV_LOCAL_${JSON.parse(userInfoString).user_id}`;
    }

    // `initiator` is matched with .includes, so it may be a string or an
    // array — TODO confirm against the producer. A missing value means "not ours".
    const isOurs = Boolean(initiator) && typeof initiator.includes === 'function' && initiator.includes(initiatorRef);
    if (!isOurs) {
      return;
    }

    await controllers.addSummarizeAiLog({
      projectId: projectId,
      activity: 'summarizeAI',
      userInfoString: userInfoString,
      resourcePath: resourcePath,
      resourcePathArr: [resourcePath],
      dataId: dataId,
      dataIdArr: [dataId],
      text: text,
      initiator: initiator,
      isState: true,
      type: type
    });
  } catch (err) {
    console.error(`ai-summarize completed-event handler error (job ${jobId}):`, err);
  }
});

// When a summarization job fails, look up the job to find which file it was
// for and broadcast 'summarize_failed' with the failure reason as `text`.
summarizeAIQueueEvents.on('failed', async ({ jobId, failedReason }) => {
  try {
    // Job.fromId resolves to undefined when the job is no longer stored.
    const job = await Job.fromId(summarizeAIQueue, jobId);
    if (!job) {
      console.error(`ai-summarize job ${jobId} failed but could not be loaded:`, failedReason);
      return;
    }
    const jobData = job.data || {};
    getIo().emit('summarize_failed', {
      resourcePath: jobData.resourcePath,
      dataId: jobData.dataId,
      text: failedReason,
      type: jobData.type,
    });
  } catch (err) {
    console.error(`ai-summarize failed-event handler error (job ${jobId}):`, err);
  }
});
// summarizeAIQueueEvents.on('completed', async({ jobId, returnvalue }) => {
// // console.log(returnvalue);
// // console.log('AI 요약 성공');
// let resultData = returnvalue;
// let io = getIo();
// io.emit('summarize_success', resultData);
// });
// summarizeAIQueueEvents.on('failed', async({ jobId, failedReason}) => {
// // console.log('AI 요약 실패' + failedReason);
// // 실패한 작업의 파일 정보 찾기
// const failedJob = summarizeAiDataArr.find(item => item.jobId === jobId);
// if (failedJob) {
// let io = getIo();
// io.emit('summarize_failed', {
// resourcePath: failedJob.resourcePath,
// dataId: failedJob.dataId,
// });
// // 배열에서 실패한 작업 제거
// summarizeAiDataArr = summarizeAiDataArr.filter(item => item.jobId !== jobId);
// }
// });
// 🔺🔺🔺🔺🔺🔺🔺🔺 summarizeAI 관련 내용 끝 🔺🔺🔺🔺🔺🔺🔺🔺
// 🔻🔻🔻🔻🔻🔻🔻🔻 worker6: summarizeAPI — start 🔻🔻🔻🔻🔻🔻🔻🔻
// Queue object for API-driven summarization jobs.
const summarizeAPIQueue = new Queue('api-summarize', { connection: redisConnection });

// Event stream for the API summarization queue.
const summarizeAPIQueueEvents = new QueueEvents('api-summarize', { connection: redisConnection });

// When a summarization job completes, write a 'summarizeAI' activity log via
// controllers.addSummarizeAiLog — but only if the job's `initiator` contains
// the reference value of this deployment:
//   production + DEPLOYMENT_TYPE=ONPREMISE -> 'HYHC_ONPREMISE'
//   production + DEPLOYMENT_TYPE=CLOUD     -> 'AWS_CLOUD_<CLOUD_TYPE>'
//   anything else                          -> 'DEV_LOCAL_<user_id of the requester>'
//     (dev environments are hard to tell apart, so only the user who ran the
//      job gets the log entry)
summarizeAPIQueueEvents.on('completed', async ({ jobId, returnvalue }) => {
  // Nobody awaits this async listener; catch everything so a bad payload or a
  // failing log write does not become an unhandled promise rejection.
  try {
    const resultData = returnvalue || {};
    const { projectId, resourcePath, dataId, userInfoString, text, initiator } = resultData;

    const deploymentType = process.env.DEPLOYMENT_TYPE;
    let initiatorRef = null;
    if (process.env.NODE_ENV === 'production') {
      if (deploymentType === 'ONPREMISE') {
        initiatorRef = 'HYHC_ONPREMISE';
      } else if (deploymentType === 'CLOUD') {
        initiatorRef = `AWS_CLOUD_${process.env.CLOUD_TYPE}`;
      }
    }
    if (initiatorRef === null) {
      // The per-user fallback is the only branch that needs userInfoString,
      // so it is parsed only here.
      initiatorRef = `DEV_LOCAL_${JSON.parse(userInfoString).user_id}`;
    }

    // `initiator` is matched with .includes, so it may be a string or an
    // array — TODO confirm against the producer. A missing value means "not ours".
    const isOurs = Boolean(initiator) && typeof initiator.includes === 'function' && initiator.includes(initiatorRef);
    if (!isOurs) {
      return;
    }

    // NOTE(review): unlike the 'ai-summarize' handler, no `type` is passed
    // here even though the worker result contains one — confirm this is intended.
    await controllers.addSummarizeAiLog({
      projectId: projectId,
      activity: 'summarizeAI',
      userInfoString: userInfoString,
      resourcePath: resourcePath,
      resourcePathArr: [resourcePath],
      dataId: dataId,
      dataIdArr: [dataId],
      text: text,
      initiator: initiator,
      isState: true
    });
  } catch (err) {
    console.error(`api-summarize completed-event handler error (job ${jobId}):`, err);
  }
});

// When a summarization job fails, look up the job to find which file it was
// for and broadcast 'summarize_failed' with the failure reason as `text`.
summarizeAPIQueueEvents.on('failed', async ({ jobId, failedReason }) => {
  try {
    // Job.fromId resolves to undefined when the job is no longer stored.
    const job = await Job.fromId(summarizeAPIQueue, jobId);
    if (!job) {
      console.error(`api-summarize job ${jobId} failed but could not be loaded:`, failedReason);
      return;
    }
    const jobData = job.data || {};
    getIo().emit('summarize_failed', {
      resourcePath: jobData.resourcePath,
      dataId: jobData.dataId,
      text: failedReason
    });
  } catch (err) {
    console.error(`api-summarize failed-event handler error (job ${jobId}):`, err);
  }
});
// 🔺🔺🔺🔺🔺🔺🔺🔺 worker6: summarizeAPI — end 🔺🔺🔺🔺🔺🔺🔺🔺
// Temporary implementation.
/**
 * In-memory list of prepared folder downloads.
 *
 * Each entry ("task") has the shape
 * `{ taskId, data, addedAt, timeoutId }`, where `data` is the payload handed
 * to putList. The code reads `data.userInfoString` (a JSON string containing
 * `user_id`) and `data.deleteUrl` from it. Every entry schedules its own
 * removal 24 hours after it was added.
 */
class folderDownloadManager {
  constructor() {
    this.array = [];
  }

  /**
   * Returns the tasks whose userInfoString belongs to the given user.
   *
   * @param {*} userId - Compared with `===` against the parsed `user_id`.
   * @returns {Array<object>} Matching task entries (not copies).
   * @throws {SyntaxError} If a stored userInfoString is not valid JSON.
   */
  getMyList(userId) {
    // Iterate the entries themselves; `for...in` would yield index strings.
    return this.array.filter((task) => {
      const userInfo = JSON.parse(task.data.userInfoString);
      return userInfo.user_id === userId;
    });
  }

  /**
   * Adds a payload to the list and schedules its removal after 24 hours.
   *
   * @param {object} data - Payload to store; `data.deleteUrl` is the URL that
   *   removeList will send the DELETE request to when the entry expires.
   */
  putList(data) {
    const lifetimeMs = 24 * 60 * 60 * 1000; // (15 * 1000 was used for testing)
    const addedAt = Date.now();
    const taskId = this.makeId();
    const task = {
      taskId: taskId,
      data: data,
      addedAt: addedAt,
      timeoutId: null
    };
    this.array.push(task);
    const deleteUrl = data.deleteUrl;
    task.timeoutId = setTimeout(() => {
      // removeList is async and runs detached from any caller here, so a
      // failed DELETE must be caught or it becomes an unhandled rejection.
      this.removeList(taskId, deleteUrl).catch((err) => {
        console.error(`Failed to remove expired folder download (task ${taskId}):`, err && err.message ? err.message : err);
      });
    }, lifetimeMs);
  }

  /**
   * Deletes the remote file, then drops the entry from the list.
   *
   * @param {number} dataId - The `taskId` of the entry to remove (despite the
   *   parameter name, it is compared against `task.taskId`).
   * @param {string} deleteUrl - URL that receives the HTTP DELETE request.
   * @returns {Promise<void>}
   * @throws Rejects with the axios error when the DELETE request fails; the
   *   entry then stays in the list.
   */
  async removeList(dataId, deleteUrl) {
    // The actual file is deleted first; the list entry goes only on success.
    const response = await axios({
      method: 'DELETE',
      url: deleteUrl,
      headers: {
        'Content-Type': 'application/xml'
      },
      timeout: 10000 // 10-second timeout
    });
    console.log('✅ 파일 삭제 성공:', response.status);
    this.array = this.array.filter((task) => {
      if (task.taskId !== dataId) {
        return true;
      }
      if (task.timeoutId) {
        clearTimeout(task.timeoutId);
      }
      return false;
    });
  }

  /**
   * Generates a random id in the range 1000..9999 that is not used by any
   * entry currently in the list.
   *
   * A duplicate id would make removeList drop two entries at once, so
   * collisions are retried. The retry count is bounded; if every attempt
   * collides the last candidate is returned anyway.
   *
   * @returns {number}
   */
  makeId() {
    const idsInUse = new Set(this.array.map((task) => task.taskId));
    const maxAttempts = 100;
    let candidate;
    let attempts = 0;
    do {
      candidate = Math.floor(Math.random() * 9000) + 1000;
      attempts += 1;
    } while (idsInUse.has(candidate) && attempts < maxAttempts);
    return candidate;
  }
}
// 🔻🔻🔻🔻🔻🔻🔻🔻 Gemini AI Workers 🔻🔻🔻🔻🔻🔻🔻🔻
/**
 * Calls `model.generateContent(params)`, retrying when the failure looks like
 * a transient overload.
 *
 * A failure is retried only when its message contains one of '503', '429',
 * 'high demand' or 'ResourceExhausted'. Any other failure, or a failure on
 * the last attempt, is rethrown unchanged.
 *
 * @param {{ generateContent: Function }} model - Object exposing generateContent.
 * @param {*} params - Passed through to generateContent as-is.
 * @param {number} [maxRetries=3] - Total number of attempts. With a value
 *   below 1 the model is never called and the promise resolves to undefined.
 * @param {number} [delayMs=2000] - Fixed wait between attempts, in milliseconds.
 * @returns {Promise<*>} Whatever generateContent resolves to.
 * @throws The original rejection value of the last failed attempt.
 */
async function generateContentWithRetry(model, params, maxRetries = 3, delayMs = 2000) {
  const transientMarkers = ['503', '429', 'high demand', 'ResourceExhausted'];

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await model.generateContent(params);
    } catch (err) {
      // A rejection is not guaranteed to be an Error. Normalise the message
      // so that inspecting it cannot throw and mask the original failure.
      const message = err && typeof err.message === 'string' ? err.message : String(err);
      console.error(`⚠️ Gemini API 호출 실패 (시도 ${attempt}/${maxRetries}):`, message);

      const isTransient = transientMarkers.some((marker) => message.includes(marker));
      if (attempt === maxRetries || !isTransient) {
        throw err;
      }

      console.log(`⏳ 일시적 API 서버 장애/트래픽 과부하 감지. ${delayMs}ms 후 자동 재시도합니다...`);
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
}
// Worker for the 'ai-summarize' queue.
// For each job it (1) looks up the file's object key in the database by
// dataId, (2) downloads the object from the S3 client selected by
// storageType, using projectId as the bucket, (3) sends the prompt plus the
// file (base64, declared as application/pdf) to Gemini, and (4) returns the
// summary text together with the token count reported by countTokens for
// that same request.
const summarizeAIWorker = new Worker('ai-summarize', async (job) => {
  const { prompt, resourcePath, initiator, storageType, dataId, projectId, userInfoString, type } = job.data;

  // 1. Look up object_key in the DB (test table outside production).
  const tbData = process.env.NODE_ENV === 'production' ? 'tb_data' : '_test_tb_data';
  const dbRes = await pool.query(`SELECT object_key FROM ver4.${tbData} WHERE data_id = $1`, [dataId]);
  if (dbRes.rows.length === 0) {
    throw new Error(`DB에서 dataId ${dataId}에 해당하는 파일을 찾을 수 없습니다.`);
  }
  const objectKey = dbRes.rows[0].object_key;

  // 2. Download the file from S3 and collect the body stream into one buffer.
  const s3Client = getS3(storageType);
  const getCommand = new GetObjectCommand({ Bucket: projectId, Key: objectKey });
  const s3Response = await s3Client.send(getCommand);
  const buffer = await new Promise((resolve, reject) => {
    const chunks = [];
    s3Response.Body.on('data', (chunk) => chunks.push(chunk));
    s3Response.Body.on('error', reject);
    s3Response.Body.on('end', () => resolve(Buffer.concat(chunks)));
  });
  const base64Pdf = buffer.toString('base64');

  // 3. Call the Gemini API. The request is built once and used for both the
  //    generation call and the token count, so the two cannot drift apart.
  const genAI = new GoogleGenerativeAI(process.env.GEMINI_API_KEY);
  const model = genAI.getGenerativeModel({ model: 'gemini-2.5-flash' });
  const request = {
    contents: [
      {
        parts: [
          { text: prompt },
          {
            inline_data: {
              mime_type: 'application/pdf',
              data: base64Pdf,
            },
          },
        ],
      },
    ],
  };
  const result = await generateContentWithRetry(model, request);
  const summaryText = await result.response.text();
  const countResult = await model.countTokens(request);

  return {
    projectId,
    resourcePath,
    dataId,
    userInfoString,
    text: summaryText,
    initiator,
    type,
    totalToken: countResult.totalTokens
  };
}, {
  connection: redisConnection,
  lockDuration: 90000 // hold the job lock for 90 s to avoid stalled-job errors
});

// Worker is an EventEmitter: an 'error' event without a listener is thrown as
// an uncaught exception, so always log worker-level errors.
summarizeAIWorker.on('error', (err) => {
  console.error('ai-summarize worker error:', err);
});
// Worker for the 'api-summarize' queue.
// For each job it (1) downloads the file from the URL given in the job data,
// (2) sends the prompt plus the file (base64, declared as application/pdf)
// to Gemini, and (3) returns the summary text together with the token count
// reported by countTokens for that same request.
const summarizeAPIWorker = new Worker('api-summarize', async (job) => {
  const { prompt, url, resourcePath, initiator, dataId, projectId, userInfoString, type } = job.data;

  // 1. Download the file with axios through the (presigned) URL.
  const response = await axios.get(url, { responseType: 'arraybuffer' });
  const base64Pdf = Buffer.from(response.data).toString('base64');

  // 2. Call the Gemini API. The request is built once and used for both the
  //    generation call and the token count, so the two cannot drift apart.
  const genAI = new GoogleGenerativeAI(process.env.GEMINI_API_KEY);
  const model = genAI.getGenerativeModel({ model: 'gemini-2.5-flash' });
  const request = {
    contents: [
      {
        parts: [
          { text: prompt },
          {
            inline_data: {
              mime_type: 'application/pdf',
              data: base64Pdf,
            },
          },
        ],
      },
    ],
  };
  const result = await generateContentWithRetry(model, request);
  const summaryText = await result.response.text();
  const countResult = await model.countTokens(request);

  return {
    projectId,
    resourcePath,
    dataId,
    userInfoString,
    text: summaryText,
    initiator,
    type,
    totalToken: countResult.totalTokens
  };
}, {
  connection: redisConnection,
  lockDuration: 90000 // hold the job lock for 90 s to avoid stalled-job errors
});

// Worker is an EventEmitter: an 'error' event without a listener is thrown as
// an uncaught exception, so always log worker-level errors.
summarizeAPIWorker.on('error', (err) => {
  console.error('api-summarize worker error:', err);
});
module.exports = { convertPdfQueue, zipFolderQueue, thumbQueue, postProcessVideoQueue, summarizeAIQueue, summarizeAPIQueue };