const { Queue, QueueEvents, Job, Worker } = require('bullmq'); const { redisConnection } = require('./config/redis.js'); const { getIo } = require('./socket'); const axios = require('axios'); const controllers = require(`./controllers/archiveController.js`); const { GoogleGenerativeAI } = require('@google/generative-ai'); const pool = require('./db/pool.js'); const { GetObjectCommand } = require('@aws-sdk/client-s3'); const onPremiseClient = require('./config/onPremiseClient.js'); const cloudClient = require('./config/cloudClient.js'); function getS3(storageType) { return storageType === 'Cloud' ? cloudClient : onPremiseClient; } // πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”» worker1 λ³€ν™˜ κ΄€λ ¨ λ‚΄μš© μ‹œμž‘ πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”» //// λ³€ν™˜ 큐 객체 생성 const convertPdfQueue = new Queue('convert-pdf', { connection: redisConnection, }); //// λ³€ν™˜ 큐 이벀트 객체 생성 const convertPdfQueueEvents = new QueueEvents('convert-pdf', { connection: redisConnection }/* , { removeOnComplete: 1000, removeOnFail: 5000 } */); //removeOnComplete : μˆ˜λŸ‰ - μˆ˜λŸ‰λ§ŒνΌ μ™„λ£Œλœ μž‘μ—… 보관 //// λ³€ν™˜ μ™„λ£Œ/μ‹€νŒ¨ ν›„ μ†ŒμΌ“μœΌλ‘œ convertPdf_success/convertPdf_failed 이벀트 전솑 convertPdfQueueEvents.on('completed', async ({ jobId, returnvalue }) => { // console.log(`πŸ”” Job ${jobId} μ™„λ£Œ:`, returnvalue); let resultData = returnvalue; let io = getIo(); io.emit('convertPdf_success', resultData); }); convertPdfQueueEvents.on('failed', async ({ jobId, failedReason }) => { // console.log(`⚠️ Job ${job.id} μ‹€νŒ¨:`, failedReason); const job = await Job.fromId(convertPdfQueue, jobId); let jobName = job.name; let jobQueueName = job.queue.name; let { resourcePath, projectId, userInfoString, serviceName, dataId } = job.data; if (!resourcePath) resourcePath = job.progress.resourcePath; if (!projectId) projectId = job.progress.projectId; if (!userInfoString) userInfoString = job.progress.userInfoString; if (!serviceName) serviceName = job.progress.serviceName; if (!dataId) dataId = job.progress.dataId; let userInfo = JSON.parse(userInfoString); let { user_id, user_nm } = userInfo; console.log(''); console.error('============================= λ³€ν™˜μ‹€νŒ¨'); console.log('jobName :', jobName); console.log('jobQueueName :',jobQueueName ); console.log('serviceName :', serviceName); console.log('projectId :', projectId); console.log('resourcePath :', resourcePath); console.log('user_nm :', user_nm); console.log('user_id :', user_id); console.log('dataId :', dataId); console.log(''); // let resultData = (job.progress == 0) ? job.data : job.progress; let resultData = { jobData: job.data, jobProgress: job.progress }; let io = getIo(); io.emit('convertPdf_failed', resultData); }); // πŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”Ί worker1 λ³€ν™˜ κ΄€λ ¨ λ‚΄μš© 끝 πŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”Ί // πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”» worker2 pdf_thumb κ΄€λ ¨ λ‚΄μš© μ‹œμž‘ πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”» //// test 큐 객체 생성 const thumbQueue = new Queue('pdf-thumb', {connection: redisConnection}); // πŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”Ί worker2 pdf_thumb κ΄€λ ¨ λ‚΄μš© 끝 πŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”Ί // πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”» worker3 폴더압좕 κ΄€λ ¨ λ‚΄μš© μ‹œμž‘ πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”» //// 폴더압좕 큐 객체 생성 const zipFolderQueue = new Queue('zip-folder', {connection: redisConnection}); //// λ³€ν™˜ 큐 이벀트 객체 생성 const zipFolderPdfQueueEvents = new QueueEvents('zip-folder', { connection: redisConnection }/* , { removeOnComplete: 1000, removeOnFail: 5000 } */); zipFolderPdfQueueEvents.on('completed', async ({ jobId, returnvalue }) => { // console.log(`πŸ”” Job ${jobId} μ™„λ£Œ:`, returnvalue); let resultData = returnvalue; let userInfo = JSON.parse(resultData.userInfoString); let clientId = userInfo.clientId; let io = getIo(); if(!io.folderDownloadList){ io.folderDownloadList = new folderDownloadManager(); } io.folderDownloadList.putList(resultData); io.to(clientId).emit('zip-prepared', resultData); }); zipFolderPdfQueueEvents.on('failed', async ({ jobId, failedReason }) => { // console.log(`⚠️ Job ${jobId} μ‹€νŒ¨:`, failedReason); }); // 폴더 μ••μΆ• ν”„λ‘œκ·Έλ ˆμŠ€ - μ‹€μ‹œκ°„μœΌλ‘œ μ§„ν–‰λ₯  및 기타 정보 λ°›μ•„μ„œ λ‹€μš΄λ‘œλ“œ ν•œ μ‚¬μš©μžμ—κ²Œ μ†ŒμΌ“μœΌλ‘œ 전달 zipFolderPdfQueueEvents.on('progress', ({ data }) => { let userInfo = JSON.parse(data.userInfoString); let clientId = userInfo.clientId; let io = getIo(); io.to(clientId).emit('zip-progress', data); }); // πŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”Ί worker3 폴더압좕 κ΄€λ ¨ λ‚΄μš© 끝 πŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”Ί // πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”» worker5 λ™μ˜μƒ ν›„μ²˜λ¦¬ κ΄€λ ¨ λ‚΄μš© μ‹œμž‘ πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”» //// λ™μ˜μƒ ν›„μ²˜λ¦¬ 큐 객체 생성 const postProcessVideoQueue = new Queue('post-process-video', {connection: redisConnection}); // //// λ™μ˜μƒ ν›„μ²˜λ¦¬ 큐 이벀트 객체 생성 const postProcessVideoQueueEvents = new QueueEvents('post-process-video', { connection: redisConnection }/* , { removeOnComplete: 1000, removeOnFail: 5000 } */); // //// λ™μ˜μƒ ν›„μ²˜λ¦¬ μ™„λ£Œ/μ‹€νŒ¨ ν›„ μ†ŒμΌ“μœΌλ‘œ postProcessVideo_success/postProcessVideo_failed 이벀트 전솑 postProcessVideoQueueEvents.on('completed', async ({ jobId, returnvalue }) => { // console.log(`πŸ”” Job ${jobId} μ™„λ£Œ:`, returnvalue); let resultData = returnvalue; resultData.resourcePathArr = [resultData.resourcePath]; let io = getIo(); io.emit('postProcessVideo_success', resultData); }); // πŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”Ί worker5 λ™μ˜μƒ ν›„μ²˜λ¦¬ κ΄€λ ¨ λ‚΄μš© 끝 πŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”Ί // πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”» summarizeAI κ΄€λ ¨ λ‚΄μš© μ‹œμž‘ πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”» //// summarizeAI 큐 객체 생성 const summarizeAIQueue = new Queue('ai-summarize', {connection: redisConnection}); //// summarizeAI 큐 이벀트 객체 생성 const summarizeAIQueueEvents = new QueueEvents('ai-summarize', { connection: redisConnection}); summarizeAIQueueEvents.on('completed', async({ jobId, returnvalue }) => { let resultData = returnvalue; const { projectId, resourcePath, dataId, userInfoString, text, initiator, type, totalToken } = resultData; console.log('@@@@@@@@@@@@@@@@@@@') console.log(totalToken) const env = process.env.NODE_ENV; const deploymentType = process.env.DEPLOYMENT_TYPE; const cloudType = process.env.CLOUD_TYPE; let initiator_ref = `DEV_LOCAL_${JSON.parse(userInfoString).user_id}`; if(env == 'production') { if (deploymentType == 'ONPREMISE') initiator_ref = 'HYHC_ONPREMISE'; if (deploymentType == 'CLOUD') initiator_ref = `AWS_CLOUD_${cloudType}`; } if (initiator.includes(initiator_ref)) { let addSummarizeAiLogParams = { projectId: projectId, activity: 'summarizeAI', userInfoString: userInfoString, resourcePath: resourcePath, resourcePathArr: [resourcePath], dataId: dataId, dataIdArr: [dataId], text: text, initiator: initiator, isState: true, type: type }; await controllers.addSummarizeAiLog(addSummarizeAiLogParams); } }); summarizeAIQueueEvents.on('failed', async({ jobId, failedReason}) => { // μ‹€νŒ¨ν•œ μž‘μ—…μ˜ 파일 정보 μ°ΎκΈ° const job = await Job.fromId(summarizeAIQueue, jobId); let io = getIo(); io.emit('summarize_failed', { resourcePath: job.data.resourcePath, dataId: job.data.dataId, text: failedReason, type: job.data.type, }); }) // summarizeAIQueueEvents.on('completed', async({ jobId, returnvalue }) => { // // console.log(returnvalue); // // console.log('AI μš”μ•½ 성곡'); // let resultData = returnvalue; // let io = getIo(); // io.emit('summarize_success', resultData); // }); // summarizeAIQueueEvents.on('failed', async({ jobId, failedReason}) => { // // console.log('AI μš”μ•½ μ‹€νŒ¨' + failedReason); // // μ‹€νŒ¨ν•œ μž‘μ—…μ˜ 파일 정보 μ°ΎκΈ° // const failedJob = summarizeAiDataArr.find(item => item.jobId === jobId); // if (failedJob) { // let io = getIo(); // io.emit('summarize_failed', { // resourcePath: failedJob.resourcePath, // dataId: failedJob.dataId, // }); // // λ°°μ—΄μ—μ„œ μ‹€νŒ¨ν•œ μž‘μ—… 제거 // summarizeAiDataArr = summarizeAiDataArr.filter(item => item.jobId !== jobId); // } // }); // πŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”Ί summarizeAI κ΄€λ ¨ λ‚΄μš© 끝 πŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”Ί // πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”» worker6 summarizeAPI κ΄€λ ¨ λ‚΄μš© μ‹œμž‘ πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”» //// summarizeAPI 큐 객체 생성 const summarizeAPIQueue = new Queue('api-summarize', { connection: redisConnection}); //// summarizeAPI 큐 이벀트 객체 생성 const summarizeAPIQueueEvents = new QueueEvents('api-summarize', {connection: redisConnection}); summarizeAPIQueueEvents.on('completed', async({ jobId, returnvalue }) => { let resultData = returnvalue; const { projectId, resourcePath, dataId, userInfoString, text, initiator } = resultData; const env = process.env.NODE_ENV; const deploymentType = process.env.DEPLOYMENT_TYPE; const cloudType = process.env.CLOUD_TYPE; let initiator_ref = `DEV_LOCAL_${JSON.parse(userInfoString).user_id}`; //devλŠ” κ΅¬λ³„ν•˜κΈ° μ–΄λ €μ›Œμ„œ 돌린 μ‚¬λžŒλ§Œ 듀어가도둝 μ„€μ • if (env == 'production') { if (deploymentType == 'ONPREMISE') initiator_ref = 'HYHC_ONPREMISE'; if (deploymentType == 'CLOUD') initiator_ref = `AWS_CLOUD_${cloudType}`; } if(initiator.includes(initiator_ref)) { let addSummarizeAiLogParams = { projectId: projectId, activity: 'summarizeAI', userInfoString: userInfoString, resourcePath: resourcePath, resourcePathArr: [resourcePath], dataId: dataId, dataIdArr: [dataId], text: text, initiator: initiator, isState: true }; await controllers.addSummarizeAiLog(addSummarizeAiLogParams); } }); summarizeAPIQueueEvents.on('failed', async({ jobId, failedReason}) => { // μ‹€νŒ¨ν•œ μž‘μ—…μ˜ 파일 정보 μ°ΎκΈ° // const failedJob = summarizeAiDataArr.find(item => item.jobId === jobId); // if (failedJob) { const job = await Job.fromId(summarizeAPIQueue, jobId); let io = getIo(); io.emit('summarize_failed', { resourcePath: job.data.resourcePath, dataId: job.data.dataId, text: failedReason }); // λ°°μ—΄μ—μ„œ μ‹€νŒ¨ν•œ μž‘μ—… 제거 // summarizeAiDataArr = summarizeAiDataArr.filter(item => item.jobId !== jobId); // } }) // πŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”Ί summarizeAPI κ΄€λ ¨ λ‚΄μš© 끝 πŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”ΊπŸ”Ί //μž„μ‹œ class folderDownloadManager{ constructor(){ this.array = []; } getMyList(userId){ let result = []; for(let task in this.array){ let userInfo = JSON.parse(task.data.userInfoString); if(userInfo.user_id === userId) result.push(task); } return result; } putList(data){ const addedAt = Date.now(); const taskId = this.makeId(); let task = { taskId : taskId, data : data, addedAt : addedAt, timeoutId : null } this.array.push(task); let deleteUrl = data.deleteUrl; task.timeoutId = setTimeout(()=>{ this.removeList(taskId, deleteUrl); }, 24*60*60*1000/* 15*1000 */);//15초 ν…ŒμŠ€νŠΈ } async removeList(dataId, deleteUrl){ // const initialLength = this.array.length; //λͺ©λ‘ μ‚­μ œ 이전 μ‹€μ œ 데이터 μ‚­μ œλΆ€λΆ„ μΆ”κ°€ ν•„μš” - deleteUrl둜... const response = await axios({ method: 'DELETE', url: deleteUrl, headers: { 'Content-Type': 'application/xml' }, timeout: 10000 // 10초 νƒ€μž„μ•„μ›ƒ }); console.log('βœ… 파일 μ‚­μ œ 성곡:', response.status); this.array = this.array.filter(task=>{ if(task.taskId === dataId && task.timeoutId){ clearTimeout(task.timeoutId); } return task.taskId !== dataId; }); } makeId(){ return Math.floor(Math.random() * 9000) + 1000; } } // πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”» Gemini AI Workers πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”»πŸ”» async function generateContentWithRetry(model, params, maxRetries = 3, delayMs = 2000) { for (let attempt = 1; attempt <= maxRetries; attempt++) { try { return await model.generateContent(params); } catch (err) { console.error(`⚠️ Gemini API 호좜 μ‹€νŒ¨ (μ‹œλ„ ${attempt}/${maxRetries}):`, err.message); if (attempt === maxRetries) { throw err; } if (err.message.includes('503') || err.message.includes('429') || err.message.includes('high demand') || err.message.includes('ResourceExhausted')) { console.log(`⏳ μΌμ‹œμ  API μ„œλ²„ μž₯μ• /νŠΈλž˜ν”½ κ³ΌλΆ€ν•˜ 감지. ${delayMs}ms ν›„ μžλ™ μž¬μ‹œλ„ν•©λ‹ˆλ‹€...`); await new Promise(resolve => setTimeout(resolve, delayMs)); } else { throw err; } } } } const summarizeAIWorker = new Worker('ai-summarize', async (job) => { const { prompt, resourcePath, initiator, storageType, dataId, projectId, userInfoString, userIp, type } = job.data; // 1. DBμ—μ„œ object_key 쑰회 const tbData = process.env.NODE_ENV === 'production' ? 'tb_data' : '_test_tb_data'; const dbRes = await pool.query(`SELECT object_key FROM ver4.${tbData} WHERE data_id = $1`, [dataId]); if (dbRes.rows.length === 0) { throw new Error(`DBμ—μ„œ dataId ${dataId}에 ν•΄λ‹Ήν•˜λŠ” νŒŒμΌμ„ 찾을 수 μ—†μŠ΅λ‹ˆλ‹€.`); } const objectKey = dbRes.rows[0].object_key; // 2. MinIO S3μ—μ„œ 파일 λ‹€μš΄λ‘œλ“œ const s3Client = getS3(storageType); const getCommand = new GetObjectCommand({ Bucket: projectId, Key: objectKey }); const s3Response = await s3Client.send(getCommand); const buffer = await new Promise((resolve, reject) => { const chunks = []; s3Response.Body.on('data', (chunk) => chunks.push(chunk)); s3Response.Body.on('error', reject); s3Response.Body.on('end', () => resolve(Buffer.concat(chunks))); }); const base64Pdf = buffer.toString('base64'); // 3. Gemini API 호좜 const genAI = new GoogleGenerativeAI(process.env.GEMINI_API_KEY); const model = genAI.getGenerativeModel({ model: 'gemini-2.5-flash' }); const result = await generateContentWithRetry(model, { contents: [ { parts: [ { text: prompt }, { inline_data: { mime_type: 'application/pdf', data: base64Pdf, }, }, ], }, ], }); const summaryText = await result.response.text(); const countResult = await model.countTokens({ contents: [ { parts: [ { text: prompt }, { inline_data: { mime_type: 'application/pdf', data: base64Pdf, }, } ] } ] }); return { projectId, resourcePath, dataId, userInfoString, text: summaryText, initiator, type, totalToken: countResult.totalTokens }; }, { connection: redisConnection, lockDuration: 90000 // lock μœ μ§€ μ‹œκ°„μ„ 90초둜 늘렀 stalled μ—λŸ¬ λ°©μ§€ }); const summarizeAPIWorker = new Worker('api-summarize', async (job) => { const { prompt, url, resourcePath, initiator, storageType, dataId, projectId, userInfoString, userIp, type } = job.data; // 1. Presigned URL을 톡해 axios둜 파일 λ‹€μš΄λ‘œλ“œ const response = await axios.get(url, { responseType: 'arraybuffer' }); const buffer = Buffer.from(response.data); const base64Pdf = buffer.toString('base64'); // 2. Gemini API 호좜 const genAI = new GoogleGenerativeAI(process.env.GEMINI_API_KEY); const model = genAI.getGenerativeModel({ model: 'gemini-2.5-flash' }); const result = await generateContentWithRetry(model, { contents: [ { parts: [ { text: prompt }, { inline_data: { mime_type: 'application/pdf', data: base64Pdf, }, }, ], }, ], }); const summaryText = await result.response.text(); const countResult = await model.countTokens({ contents: [ { parts: [ { text: prompt }, { inline_data: { mime_type: 'application/pdf', data: base64Pdf, }, } ] } ] }); return { projectId, resourcePath, dataId, userInfoString, text: summaryText, initiator, type, totalToken: countResult.totalTokens }; }, { connection: redisConnection, lockDuration: 90000 // lock μœ μ§€ μ‹œκ°„μ„ 90초둜 늘렀 stalled μ—λŸ¬ λ°©μ§€ }); module.exports = { convertPdfQueue, zipFolderQueue, thumbQueue, postProcessVideoQueue, summarizeAIQueue, summarizeAPIQueue };