초기 PM 소스 전체 업로드
This commit is contained in:
519
queue.js
Normal file
519
queue.js
Normal file
@@ -0,0 +1,519 @@
|
||||
const { Queue, QueueEvents, Job, Worker } = require('bullmq');
|
||||
const { redisConnection } = require('./config/redis.js');
|
||||
const { getIo } = require('./socket');
|
||||
const axios = require('axios');
|
||||
const controllers = require(`./controllers/archiveController.js`);
|
||||
const { GoogleGenerativeAI } = require('@google/generative-ai');
|
||||
const pool = require('./db/pool.js');
|
||||
const { GetObjectCommand } = require('@aws-sdk/client-s3');
|
||||
const onPremiseClient = require('./config/onPremiseClient.js');
|
||||
const cloudClient = require('./config/cloudClient.js');
|
||||
|
||||
function getS3(storageType) {
|
||||
return storageType === 'Cloud' ? cloudClient : onPremiseClient;
|
||||
}
|
||||
|
||||
|
||||
// 🔻🔻🔻🔻🔻🔻🔻🔻 worker1 변환 관련 내용 시작 🔻🔻🔻🔻🔻🔻🔻🔻
|
||||
|
||||
//// 변환 큐 객체 생성
|
||||
const convertPdfQueue = new Queue('convert-pdf', {
|
||||
connection: redisConnection,
|
||||
});
|
||||
|
||||
//// 변환 큐 이벤트 객체 생성
|
||||
const convertPdfQueueEvents = new QueueEvents('convert-pdf', { connection: redisConnection }/* , { removeOnComplete: 1000, removeOnFail: 5000 } */);
|
||||
//removeOnComplete : 수량 - 수량만큼 완료된 작업 보관
|
||||
|
||||
//// 변환 완료/실패 후 소켓으로 convertPdf_success/convertPdf_failed 이벤트 전송
|
||||
convertPdfQueueEvents.on('completed', async ({ jobId, returnvalue }) => {
|
||||
// console.log(`🔔 Job ${jobId} 완료:`, returnvalue);
|
||||
|
||||
let resultData = returnvalue;
|
||||
let io = getIo();
|
||||
io.emit('convertPdf_success', resultData);
|
||||
});
|
||||
convertPdfQueueEvents.on('failed', async ({ jobId, failedReason }) => {
|
||||
// console.log(`⚠️ Job ${job.id} 실패:`, failedReason);
|
||||
|
||||
const job = await Job.fromId(convertPdfQueue, jobId);
|
||||
let jobName = job.name;
|
||||
let jobQueueName = job.queue.name;
|
||||
let { resourcePath, projectId, userInfoString, serviceName, dataId } = job.data;
|
||||
if (!resourcePath) resourcePath = job.progress.resourcePath;
|
||||
if (!projectId) projectId = job.progress.projectId;
|
||||
if (!userInfoString) userInfoString = job.progress.userInfoString;
|
||||
if (!serviceName) serviceName = job.progress.serviceName;
|
||||
if (!dataId) dataId = job.progress.dataId;
|
||||
let userInfo = JSON.parse(userInfoString);
|
||||
let { user_id, user_nm } = userInfo;
|
||||
|
||||
console.log('');
|
||||
console.error('============================= 변환실패');
|
||||
console.log('jobName :', jobName);
|
||||
console.log('jobQueueName :',jobQueueName );
|
||||
console.log('serviceName :', serviceName);
|
||||
console.log('projectId :', projectId);
|
||||
console.log('resourcePath :', resourcePath);
|
||||
console.log('user_nm :', user_nm);
|
||||
console.log('user_id :', user_id);
|
||||
console.log('dataId :', dataId);
|
||||
console.log('');
|
||||
|
||||
// let resultData = (job.progress == 0) ? job.data : job.progress;
|
||||
let resultData = { jobData: job.data, jobProgress: job.progress };
|
||||
|
||||
let io = getIo();
|
||||
io.emit('convertPdf_failed', resultData);
|
||||
});
|
||||
|
||||
// 🔺🔺🔺🔺🔺🔺🔺🔺 worker1 변환 관련 내용 끝 🔺🔺🔺🔺🔺🔺🔺🔺
|
||||
|
||||
|
||||
|
||||
// 🔻🔻🔻🔻🔻🔻🔻🔻 worker2 pdf_thumb 관련 내용 시작 🔻🔻🔻🔻🔻🔻🔻🔻
|
||||
|
||||
//// test 큐 객체 생성
|
||||
const thumbQueue = new Queue('pdf-thumb', {connection: redisConnection});
|
||||
|
||||
// 🔺🔺🔺🔺🔺🔺🔺🔺 worker2 pdf_thumb 관련 내용 끝 🔺🔺🔺🔺🔺🔺🔺🔺
|
||||
|
||||
|
||||
|
||||
// 🔻🔻🔻🔻🔻🔻🔻🔻 worker3 폴더압축 관련 내용 시작 🔻🔻🔻🔻🔻🔻🔻🔻
|
||||
|
||||
//// 폴더압축 큐 객체 생성
|
||||
const zipFolderQueue = new Queue('zip-folder', {connection: redisConnection});
|
||||
|
||||
//// 변환 큐 이벤트 객체 생성
|
||||
const zipFolderPdfQueueEvents = new QueueEvents('zip-folder', { connection: redisConnection }/* , { removeOnComplete: 1000, removeOnFail: 5000 } */);
|
||||
|
||||
zipFolderPdfQueueEvents.on('completed', async ({ jobId, returnvalue }) => {
|
||||
// console.log(`🔔 Job ${jobId} 완료:`, returnvalue);
|
||||
|
||||
let resultData = returnvalue;
|
||||
|
||||
let userInfo = JSON.parse(resultData.userInfoString);
|
||||
let clientId = userInfo.clientId;
|
||||
|
||||
let io = getIo();
|
||||
|
||||
if(!io.folderDownloadList){
|
||||
io.folderDownloadList = new folderDownloadManager();
|
||||
}
|
||||
|
||||
io.folderDownloadList.putList(resultData);
|
||||
io.to(clientId).emit('zip-prepared', resultData);
|
||||
});
|
||||
zipFolderPdfQueueEvents.on('failed', async ({ jobId, failedReason }) => {
|
||||
// console.log(`⚠️ Job ${jobId} 실패:`, failedReason);
|
||||
});
|
||||
|
||||
// 폴더 압축 프로그레스 - 실시간으로 진행률 및 기타 정보 받아서 다운로드 한 사용자에게 소켓으로 전달
|
||||
zipFolderPdfQueueEvents.on('progress', ({ data }) => {
|
||||
let userInfo = JSON.parse(data.userInfoString);
|
||||
let clientId = userInfo.clientId;
|
||||
let io = getIo();
|
||||
io.to(clientId).emit('zip-progress', data);
|
||||
});
|
||||
|
||||
// 🔺🔺🔺🔺🔺🔺🔺🔺 worker3 폴더압축 관련 내용 끝 🔺🔺🔺🔺🔺🔺🔺🔺
|
||||
|
||||
|
||||
|
||||
// 🔻🔻🔻🔻🔻🔻🔻🔻 worker5 동영상 후처리 관련 내용 시작 🔻🔻🔻🔻🔻🔻🔻🔻
|
||||
|
||||
//// 동영상 후처리 큐 객체 생성
|
||||
const postProcessVideoQueue = new Queue('post-process-video', {connection: redisConnection});
|
||||
|
||||
// //// 동영상 후처리 큐 이벤트 객체 생성
|
||||
const postProcessVideoQueueEvents = new QueueEvents('post-process-video', { connection: redisConnection }/* , { removeOnComplete: 1000, removeOnFail: 5000 } */);
|
||||
|
||||
// //// 동영상 후처리 완료/실패 후 소켓으로 postProcessVideo_success/postProcessVideo_failed 이벤트 전송
|
||||
postProcessVideoQueueEvents.on('completed', async ({ jobId, returnvalue }) => {
|
||||
// console.log(`🔔 Job ${jobId} 완료:`, returnvalue);
|
||||
|
||||
let resultData = returnvalue;
|
||||
resultData.resourcePathArr = [resultData.resourcePath];
|
||||
let io = getIo();
|
||||
io.emit('postProcessVideo_success', resultData);
|
||||
});
|
||||
|
||||
// 🔺🔺🔺🔺🔺🔺🔺🔺 worker5 동영상 후처리 관련 내용 끝 🔺🔺🔺🔺🔺🔺🔺🔺
|
||||
|
||||
// 🔻🔻🔻🔻🔻🔻🔻🔻 summarizeAI 관련 내용 시작 🔻🔻🔻🔻🔻🔻🔻🔻
|
||||
|
||||
//// summarizeAI 큐 객체 생성
|
||||
const summarizeAIQueue = new Queue('ai-summarize', {connection: redisConnection});
|
||||
|
||||
//// summarizeAI 큐 이벤트 객체 생성
|
||||
const summarizeAIQueueEvents = new QueueEvents('ai-summarize', { connection: redisConnection});
|
||||
|
||||
summarizeAIQueueEvents.on('completed', async({ jobId, returnvalue }) => {
|
||||
let resultData = returnvalue;
|
||||
|
||||
const { projectId, resourcePath, dataId, userInfoString, text, initiator, type, totalToken } = resultData;
|
||||
|
||||
console.log('@@@@@@@@@@@@@@@@@@@')
|
||||
console.log(totalToken)
|
||||
|
||||
const env = process.env.NODE_ENV;
|
||||
const deploymentType = process.env.DEPLOYMENT_TYPE;
|
||||
const cloudType = process.env.CLOUD_TYPE;
|
||||
|
||||
let initiator_ref = `DEV_LOCAL_${JSON.parse(userInfoString).user_id}`;
|
||||
if(env == 'production') {
|
||||
if (deploymentType == 'ONPREMISE') initiator_ref = 'HYHC_ONPREMISE';
|
||||
if (deploymentType == 'CLOUD') initiator_ref = `AWS_CLOUD_${cloudType}`;
|
||||
}
|
||||
|
||||
if (initiator.includes(initiator_ref)) {
|
||||
let addSummarizeAiLogParams = {
|
||||
projectId: projectId,
|
||||
activity: 'summarizeAI',
|
||||
userInfoString: userInfoString,
|
||||
resourcePath: resourcePath,
|
||||
resourcePathArr: [resourcePath],
|
||||
dataId: dataId,
|
||||
dataIdArr: [dataId],
|
||||
text: text,
|
||||
initiator: initiator,
|
||||
isState: true,
|
||||
type: type
|
||||
};
|
||||
|
||||
await controllers.addSummarizeAiLog(addSummarizeAiLogParams);
|
||||
}
|
||||
});
|
||||
|
||||
summarizeAIQueueEvents.on('failed', async({ jobId, failedReason}) => {
|
||||
// 실패한 작업의 파일 정보 찾기
|
||||
const job = await Job.fromId(summarizeAIQueue, jobId);
|
||||
|
||||
let io = getIo();
|
||||
io.emit('summarize_failed', {
|
||||
resourcePath: job.data.resourcePath,
|
||||
dataId: job.data.dataId,
|
||||
text: failedReason,
|
||||
type: job.data.type,
|
||||
});
|
||||
})
|
||||
|
||||
// summarizeAIQueueEvents.on('completed', async({ jobId, returnvalue }) => {
|
||||
// // console.log(returnvalue);
|
||||
// // console.log('AI 요약 성공');
|
||||
|
||||
// let resultData = returnvalue;
|
||||
// let io = getIo();
|
||||
// io.emit('summarize_success', resultData);
|
||||
|
||||
// });
|
||||
|
||||
// summarizeAIQueueEvents.on('failed', async({ jobId, failedReason}) => {
|
||||
// // console.log('AI 요약 실패' + failedReason);
|
||||
|
||||
|
||||
// // 실패한 작업의 파일 정보 찾기
|
||||
// const failedJob = summarizeAiDataArr.find(item => item.jobId === jobId);
|
||||
// if (failedJob) {
|
||||
// let io = getIo();
|
||||
// io.emit('summarize_failed', {
|
||||
// resourcePath: failedJob.resourcePath,
|
||||
// dataId: failedJob.dataId,
|
||||
// });
|
||||
|
||||
// // 배열에서 실패한 작업 제거
|
||||
// summarizeAiDataArr = summarizeAiDataArr.filter(item => item.jobId !== jobId);
|
||||
// }
|
||||
// });
|
||||
|
||||
// 🔺🔺🔺🔺🔺🔺🔺🔺 summarizeAI 관련 내용 끝 🔺🔺🔺🔺🔺🔺🔺🔺
|
||||
|
||||
// 🔻🔻🔻🔻🔻🔻🔻🔻 worker6 summarizeAPI 관련 내용 시작 🔻🔻🔻🔻🔻🔻🔻🔻
|
||||
|
||||
//// summarizeAPI 큐 객체 생성
|
||||
const summarizeAPIQueue = new Queue('api-summarize', { connection: redisConnection});
|
||||
|
||||
//// summarizeAPI 큐 이벤트 객체 생성
|
||||
const summarizeAPIQueueEvents = new QueueEvents('api-summarize', {connection: redisConnection});
|
||||
|
||||
summarizeAPIQueueEvents.on('completed', async({ jobId, returnvalue }) => {
|
||||
let resultData = returnvalue;
|
||||
|
||||
const { projectId, resourcePath, dataId, userInfoString, text, initiator } = resultData;
|
||||
|
||||
const env = process.env.NODE_ENV;
|
||||
const deploymentType = process.env.DEPLOYMENT_TYPE;
|
||||
const cloudType = process.env.CLOUD_TYPE;
|
||||
|
||||
let initiator_ref = `DEV_LOCAL_${JSON.parse(userInfoString).user_id}`; //dev는 구별하기 어려워서 돌린 사람만 들어가도록 설정
|
||||
if (env == 'production') {
|
||||
if (deploymentType == 'ONPREMISE') initiator_ref = 'HYHC_ONPREMISE';
|
||||
if (deploymentType == 'CLOUD') initiator_ref = `AWS_CLOUD_${cloudType}`;
|
||||
}
|
||||
|
||||
if(initiator.includes(initiator_ref)) {
|
||||
let addSummarizeAiLogParams = {
|
||||
projectId: projectId,
|
||||
activity: 'summarizeAI',
|
||||
userInfoString: userInfoString,
|
||||
resourcePath: resourcePath,
|
||||
resourcePathArr: [resourcePath],
|
||||
dataId: dataId,
|
||||
dataIdArr: [dataId],
|
||||
text: text,
|
||||
initiator: initiator,
|
||||
isState: true
|
||||
};
|
||||
|
||||
await controllers.addSummarizeAiLog(addSummarizeAiLogParams);
|
||||
}
|
||||
});
|
||||
|
||||
summarizeAPIQueueEvents.on('failed', async({ jobId, failedReason}) => {
|
||||
// 실패한 작업의 파일 정보 찾기
|
||||
// const failedJob = summarizeAiDataArr.find(item => item.jobId === jobId);
|
||||
// if (failedJob) {
|
||||
const job = await Job.fromId(summarizeAPIQueue, jobId);
|
||||
|
||||
let io = getIo();
|
||||
io.emit('summarize_failed', {
|
||||
resourcePath: job.data.resourcePath,
|
||||
dataId: job.data.dataId,
|
||||
text: failedReason
|
||||
});
|
||||
|
||||
// 배열에서 실패한 작업 제거
|
||||
// summarizeAiDataArr = summarizeAiDataArr.filter(item => item.jobId !== jobId);
|
||||
// }
|
||||
})
|
||||
|
||||
// 🔺🔺🔺🔺🔺🔺🔺🔺 summarizeAPI 관련 내용 끝 🔺🔺🔺🔺🔺🔺🔺🔺
|
||||
|
||||
//임시
|
||||
class folderDownloadManager{
|
||||
constructor(){
|
||||
this.array = [];
|
||||
}
|
||||
|
||||
getMyList(userId){
|
||||
let result = [];
|
||||
for(let task in this.array){
|
||||
let userInfo = JSON.parse(task.data.userInfoString);
|
||||
if(userInfo.user_id === userId) result.push(task);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
putList(data){
|
||||
const addedAt = Date.now();
|
||||
const taskId = this.makeId();
|
||||
|
||||
let task = {
|
||||
taskId : taskId,
|
||||
data : data,
|
||||
addedAt : addedAt,
|
||||
timeoutId : null
|
||||
}
|
||||
|
||||
this.array.push(task);
|
||||
|
||||
let deleteUrl = data.deleteUrl;
|
||||
|
||||
task.timeoutId = setTimeout(()=>{
|
||||
this.removeList(taskId, deleteUrl);
|
||||
}, 24*60*60*1000/* 15*1000 */);//15초 테스트
|
||||
}
|
||||
|
||||
async removeList(dataId, deleteUrl){
|
||||
// const initialLength = this.array.length;
|
||||
|
||||
//목록 삭제 이전 실제 데이터 삭제부분 추가 필요 - deleteUrl로...
|
||||
const response = await axios({
|
||||
method: 'DELETE',
|
||||
url: deleteUrl,
|
||||
headers: {
|
||||
'Content-Type': 'application/xml'
|
||||
},
|
||||
timeout: 10000 // 10초 타임아웃
|
||||
});
|
||||
|
||||
console.log('✅ 파일 삭제 성공:', response.status);
|
||||
|
||||
this.array = this.array.filter(task=>{
|
||||
if(task.taskId === dataId && task.timeoutId){
|
||||
clearTimeout(task.timeoutId);
|
||||
}
|
||||
return task.taskId !== dataId;
|
||||
});
|
||||
}
|
||||
|
||||
makeId(){
|
||||
return Math.floor(Math.random() * 9000) + 1000;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// 🔻🔻🔻🔻🔻🔻🔻🔻 Gemini AI Workers 🔻🔻🔻🔻🔻🔻🔻🔻
|
||||
|
||||
async function generateContentWithRetry(model, params, maxRetries = 3, delayMs = 2000) {
|
||||
for (let attempt = 1; attempt <= maxRetries; attempt++) {
|
||||
try {
|
||||
return await model.generateContent(params);
|
||||
} catch (err) {
|
||||
console.error(`⚠️ Gemini API 호출 실패 (시도 ${attempt}/${maxRetries}):`, err.message);
|
||||
if (attempt === maxRetries) {
|
||||
throw err;
|
||||
}
|
||||
if (err.message.includes('503') || err.message.includes('429') || err.message.includes('high demand') || err.message.includes('ResourceExhausted')) {
|
||||
console.log(`⏳ 일시적 API 서버 장애/트래픽 과부하 감지. ${delayMs}ms 후 자동 재시도합니다...`);
|
||||
await new Promise(resolve => setTimeout(resolve, delayMs));
|
||||
} else {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const summarizeAIWorker = new Worker('ai-summarize', async (job) => {
|
||||
const { prompt, resourcePath, initiator, storageType, dataId, projectId, userInfoString, userIp, type } = job.data;
|
||||
|
||||
// 1. DB에서 object_key 조회
|
||||
const tbData = process.env.NODE_ENV === 'production' ? 'tb_data' : '_test_tb_data';
|
||||
const dbRes = await pool.query(`SELECT object_key FROM ver4.${tbData} WHERE data_id = $1`, [dataId]);
|
||||
if (dbRes.rows.length === 0) {
|
||||
throw new Error(`DB에서 dataId ${dataId}에 해당하는 파일을 찾을 수 없습니다.`);
|
||||
}
|
||||
const objectKey = dbRes.rows[0].object_key;
|
||||
|
||||
// 2. MinIO S3에서 파일 다운로드
|
||||
const s3Client = getS3(storageType);
|
||||
const getCommand = new GetObjectCommand({ Bucket: projectId, Key: objectKey });
|
||||
const s3Response = await s3Client.send(getCommand);
|
||||
|
||||
const buffer = await new Promise((resolve, reject) => {
|
||||
const chunks = [];
|
||||
s3Response.Body.on('data', (chunk) => chunks.push(chunk));
|
||||
s3Response.Body.on('error', reject);
|
||||
s3Response.Body.on('end', () => resolve(Buffer.concat(chunks)));
|
||||
});
|
||||
|
||||
const base64Pdf = buffer.toString('base64');
|
||||
|
||||
// 3. Gemini API 호출
|
||||
const genAI = new GoogleGenerativeAI(process.env.GEMINI_API_KEY);
|
||||
const model = genAI.getGenerativeModel({ model: 'gemini-2.5-flash' });
|
||||
|
||||
const result = await generateContentWithRetry(model, {
|
||||
contents: [
|
||||
{
|
||||
parts: [
|
||||
{ text: prompt },
|
||||
{
|
||||
inline_data: {
|
||||
mime_type: 'application/pdf',
|
||||
data: base64Pdf,
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
const summaryText = await result.response.text();
|
||||
|
||||
const countResult = await model.countTokens({
|
||||
contents: [
|
||||
{
|
||||
parts: [
|
||||
{ text: prompt },
|
||||
{
|
||||
inline_data: {
|
||||
mime_type: 'application/pdf',
|
||||
data: base64Pdf,
|
||||
},
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
});
|
||||
|
||||
return {
|
||||
projectId,
|
||||
resourcePath,
|
||||
dataId,
|
||||
userInfoString,
|
||||
text: summaryText,
|
||||
initiator,
|
||||
type,
|
||||
totalToken: countResult.totalTokens
|
||||
};
|
||||
}, {
|
||||
connection: redisConnection,
|
||||
lockDuration: 90000 // lock 유지 시간을 90초로 늘려 stalled 에러 방지
|
||||
});
|
||||
|
||||
const summarizeAPIWorker = new Worker('api-summarize', async (job) => {
|
||||
const { prompt, url, resourcePath, initiator, storageType, dataId, projectId, userInfoString, userIp, type } = job.data;
|
||||
|
||||
// 1. Presigned URL을 통해 axios로 파일 다운로드
|
||||
const response = await axios.get(url, { responseType: 'arraybuffer' });
|
||||
const buffer = Buffer.from(response.data);
|
||||
const base64Pdf = buffer.toString('base64');
|
||||
|
||||
// 2. Gemini API 호출
|
||||
const genAI = new GoogleGenerativeAI(process.env.GEMINI_API_KEY);
|
||||
const model = genAI.getGenerativeModel({ model: 'gemini-2.5-flash' });
|
||||
|
||||
const result = await generateContentWithRetry(model, {
|
||||
contents: [
|
||||
{
|
||||
parts: [
|
||||
{ text: prompt },
|
||||
{
|
||||
inline_data: {
|
||||
mime_type: 'application/pdf',
|
||||
data: base64Pdf,
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
const summaryText = await result.response.text();
|
||||
|
||||
const countResult = await model.countTokens({
|
||||
contents: [
|
||||
{
|
||||
parts: [
|
||||
{ text: prompt },
|
||||
{
|
||||
inline_data: {
|
||||
mime_type: 'application/pdf',
|
||||
data: base64Pdf,
|
||||
},
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
});
|
||||
|
||||
return {
|
||||
projectId,
|
||||
resourcePath,
|
||||
dataId,
|
||||
userInfoString,
|
||||
text: summaryText,
|
||||
initiator,
|
||||
type,
|
||||
totalToken: countResult.totalTokens
|
||||
};
|
||||
}, {
|
||||
connection: redisConnection,
|
||||
lockDuration: 90000 // lock 유지 시간을 90초로 늘려 stalled 에러 방지
|
||||
});
|
||||
|
||||
module.exports = { convertPdfQueue, zipFolderQueue, thumbQueue, postProcessVideoQueue, summarizeAIQueue, summarizeAPIQueue };
|
||||
|
||||
Reference in New Issue
Block a user