Created
January 21, 2024 16:49
-
-
Save wodCZ/5cf2537b849fe1c07c7f478feb5b7505 to your computer and use it in GitHub Desktop.
Nest separate Queue and Processor structure
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Module } from '@nestjs/common'; | |
import { BullModule, BullRootModuleOptions } from '@nestjs/bull'; | |
import { QueuesService } from './queues.service'; | |
import { ConfigModule, ConfigService } from '@nestjs/config'; | |
import { AppConfig } from '../config/schema'; | |
@Module({ | |
imports: [ | |
BullModule.forRootAsync({ | |
imports: [ConfigModule], | |
inject: [ConfigService], | |
useFactory(configService: AppConfig) { | |
const redisUrl = configService.get('REDIS_URL', { infer: true }); | |
return { | |
url: redisUrl, | |
redis: redisUrl.startsWith('rediss://') | |
? { tls: { requestCert: true } } | |
: undefined, | |
} as BullRootModuleOptions; | |
}, | |
}), | |
...QueuesService.QUEUES.map((queue) => | |
BullModule.registerQueue({ | |
name: queue.queueName, | |
defaultJobOptions: queue.defaultJobOptions, | |
}), | |
), | |
], | |
exports: [QueuesService], | |
providers: [QueuesService], | |
}) | |
export class QueuesModule {} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import {Injectable} from '@nestjs/common'; | |
import {JobOptions, Queue} from 'bull'; | |
import {InjectQueue} from '@nestjs/bull'; | |
import {StorageQueue} from '../worker/storage.queue'; | |
@Injectable() | |
export class QueuesService { | |
static QUEUES: { | |
queueName: string; | |
defaultJobOptions?: JobOptions; | |
}[] = [ | |
StorageQueue, | |
]; | |
constructor( | |
@InjectQueue(StorageQueue.queueName) | |
private readonly storageQueue: Queue, | |
) { | |
} | |
public get storage() { | |
return StorageQueue.makeFactories(this.storageQueue); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Process, Processor } from '@nestjs/bull'; | |
import { Job } from 'bull'; | |
import { Media } from '@prisma/client'; | |
import { StorageService } from '../storage/storage.service'; | |
import { StorageQueue } from './storage.queue'; | |
@Processor(StorageQueue.queueName) | |
export class StorageProcessor { | |
constructor(private readonly storageService: StorageService) {} | |
@Process(StorageQueue.JOB_VERIFY) | |
async verify(job: Job<Media>) { | |
const verified = await this.storageService.verify(job.data); | |
if (!verified) { | |
throw new Error(`Media ${job.data.fileName} is not yet uploaded.`); | |
} | |
return true; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { JobOptions, Queue } from 'bull'; | |
import { Media } from '@prisma/client'; | |
export class StorageQueue { | |
static queueName = 'storage'; | |
static defaultJobOptions: JobOptions = { | |
removeOnComplete: { age: 86400 * 5 }, | |
}; | |
static JOB_VERIFY = 'verify'; | |
static makeFactories(queue: Queue) { | |
return { | |
verify(media: Media) { | |
return queue.add(StorageQueue.JOB_VERIFY, media, { | |
attempts: 100, | |
backoff: { | |
type: 'fixed', | |
delay: 10000, | |
}, | |
}); | |
}, | |
}; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import {Module} from '@nestjs/common'; | |
import {WorkerService} from './worker.service'; | |
import {StorageProcessor} from './storage.processor'; | |
import {ConfigModule} from '@nestjs/config'; | |
import {schema} from '../config/schema'; | |
import {QueuesModule} from '../queues/queues.module'; | |
@Module({ | |
imports: [ | |
ConfigModule.forRoot({ | |
envFilePath: '.env.local', | |
isGlobal: true, | |
cache: true, | |
validationSchema: schema, | |
expandVariables: true, | |
}), | |
QueuesModule, | |
], | |
providers: [ | |
WorkerService, | |
StorageProcessor, | |
], | |
}) | |
export class WorkerModule { | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import {Injectable, Logger, OnModuleDestroy} from '@nestjs/common'; | |
@Injectable() | |
export class WorkerService implements OnModuleDestroy { | |
private readonly promise; | |
private resolve?: (value?: any) => void; | |
private reject?: (reason?: any) => void; | |
private server; | |
constructor() { | |
this.promise = new Promise((resolve, reject) => { | |
Logger.log('Waiting for jobs', 'WorkerService'); | |
this.resolve = resolve; | |
this.reject = reject; | |
}); | |
} | |
async work() { | |
return this.promise; | |
} | |
onModuleDestroy() { | |
Logger.log('Shutting down', 'WorkerService'); | |
this.server.close(); | |
this.resolve && this.resolve(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment