Update the mongodb driver to 4.0 by harisvsulaiman · Pull Request #1358 · agenda/agenda · GitHub

Update the mongodb driver to 4.0 #1358

Merged 18 commits on Aug 5, 2021
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
@@ -6,8 +6,8 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [10.x, 12.x, 14.x, 16.x]
mongodb-version: [3.4, 3.6, 4.0, 4.2, 4.4]
node-version: [14.x, 16.x]
mongodb-version: [3.4, 3.6, 4.0, 4.2, 4.4, 5.0]
steps:
- name: Git checkout
uses: actions/checkout@v2
17 changes: 12 additions & 5 deletions README.md
@@ -54,6 +54,10 @@ _Kudos for making the comparison chart goes to [Bull](https://www.npmjs.com/pack

# Installation

### Notice

To support MongoDB 5.0 and the new `mongodb` Node.js driver package, the next release of Agenda (5.x.x) will be a major version. The required Node.js version will become >=12, and the `mongodb` dependency version will become >=3.2.

Install via NPM

npm install agenda
@@ -63,16 +67,19 @@ You will also need a working [Mongo](https://www.mongodb.com/) database (v3) to
# CJS / Module Imports

For regular JavaScript code, just use the default entrypoint:

```js
const Agenda = require('agenda');
const Agenda = require("agenda");
```

For TypeScript, Webpack, or other module imports, use the `agenda/es` entrypoint, e.g.:

```ts
import { Agenda } from 'agenda/es';
import { Agenda } from "agenda/es";
```
***NOTE***: If you're migrating from `@types/agenda` you also should change imports to `agenda/es`.

**_NOTE_**: If you're migrating from `@types/agenda` you also should change imports to `agenda/es`.
Instead of `import Agenda from 'agenda'` use `import Agenda from 'agenda/es'`.

# Example Usage
@@ -551,7 +558,7 @@ This functionality can also be achieved by first retrieving all the jobs from th
Disables any jobs matching the passed mongodb-native query, preventing any matching jobs from being run by the Job Processor.

```js
const numDisabled = await agenda.disable({name: 'pollExternalService'});
const numDisabled = await agenda.disable({ name: "pollExternalService" });
```

Similar to `agenda.cancel()`, this functionality can be achieved with a combination of `agenda.jobs()` and `job.disable()`.
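
The combination of `agenda.jobs()` and `job.disable()` might look like the following minimal sketch. `JobLike`, `AgendaLike`, and `disableMatching` are hypothetical names introduced only for illustration; they model just the `jobs()`, `disable()`, and `save()` calls, and the sketch assumes each job has to be saved after `disable()` for the change to persist.

```typescript
// Hypothetical minimal shapes standing in for the real Agenda and Job classes.
interface JobLike {
  disable(): JobLike;
  save(): Promise<JobLike>;
}

interface AgendaLike {
  jobs(query: Record<string, unknown>): Promise<JobLike[]>;
}

// Fetch the matching jobs, disable each one, and persist the change.
// Returns how many jobs were processed.
async function disableMatching(
  agenda: AgendaLike,
  query: Record<string, unknown>
): Promise<number> {
  const jobs = await agenda.jobs(query);
  for (const job of jobs) {
    job.disable();
    await job.save();
  }
  return jobs.length;
}
```

Unlike `agenda.disable()`, which issues a single `updateMany`, this variant makes one round trip per job, so it is mainly useful when each job needs extra handling before it is saved.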
@@ -561,7 +568,7 @@ Similar to `agenda.cancel()`, this functionality can be achieved with a combinat
Enables any jobs matching the passed mongodb-native query, allowing any matching jobs to be run by the Job Processor.

```js
const numEnabled = await agenda.enable({name: 'pollExternalService'});
const numEnabled = await agenda.enable({ name: "pollExternalService" });
```

Similar to `agenda.cancel()`, this functionality can be achieved with a combination of `agenda.jobs()` and `job.enable()`.
10 changes: 5 additions & 5 deletions lib/agenda/cancel.ts
@@ -1,6 +1,6 @@
import { Agenda } from ".";
import createDebugger from "debug";
import { FilterQuery } from "mongodb";
import { Document, Filter } from "mongodb";

const debug = createDebugger("agenda:cancel");

@@ -14,13 +14,13 @@ const debug = createDebugger("agenda:cancel");
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export const cancel = async function (
this: Agenda,
query: FilterQuery<any>
query: Filter<Document>
): Promise<number | undefined> {
debug("attempting to cancel all Agenda jobs", query);
try {
const { result } = await this._collection.deleteMany(query);
debug("%s jobs cancelled", result.n);
return result.n;
const { deletedCount } = await this._collection.deleteMany(query);
debug("%s jobs cancelled", deletedCount);
return deletedCount;
} catch (error) {
debug("error trying to delete jobs from MongoDB");
throw error;
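
The change from `result.n` to `deletedCount` reflects the different result shapes of the two driver generations. For code that has to tolerate both shapes during a migration, a small helper along these lines could be used. This is only a sketch: `LegacyDeleteResult`, `ModernDeleteResult`, and `deletedCountOf` are hypothetical names, and the interfaces model only the fields read here, not the driver's full types.

```typescript
// Hypothetical stand-ins for the two result shapes (only the fields used here).
interface LegacyDeleteResult {
  result: { n?: number }; // driver 3.x style: count lives under `result.n`
}

interface ModernDeleteResult {
  deletedCount: number; // driver 4.x style: count is a top-level field
}

// Read the number of deleted documents from either shape.
function deletedCountOf(res: LegacyDeleteResult | ModernDeleteResult): number {
  if ("deletedCount" in res) {
    return res.deletedCount;
  }
  return res.result.n ?? 0;
}
```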
30 changes: 13 additions & 17 deletions lib/agenda/database.ts
@@ -1,7 +1,7 @@
import { Collection, MongoClient, MongoClientOptions } from "mongodb";
import createDebugger from "debug";
import { hasMongoProtocol } from "./has-mongo-protocol";
import { AnyError, Collection, MongoClient, MongoClientOptions } from "mongodb";
import { Agenda } from ".";
import { hasMongoProtocol } from "./has-mongo-protocol";

const debug = createDebugger("agenda:database");

@@ -24,24 +24,15 @@ export const database = function (
this: Agenda,
url: string,
collection?: string,
options?: MongoClientOptions,
cb?: (error: Error, collection: Collection<any> | null) => void
options: MongoClientOptions = {},
cb?: (error: AnyError | undefined, collection: Collection<any> | null) => void
): Agenda | void {
if (!hasMongoProtocol(url)) {
url = "mongodb://" + url;
}

const reconnectOptions =
options?.useUnifiedTopology === true
? {}
: {
autoReconnect: true,
reconnectTries: Number.MAX_SAFE_INTEGER,
reconnectInterval: this._processEvery,
};

collection = collection || "agendaJobs";
options = { ...reconnectOptions, ...options };

MongoClient.connect(url, options, (error, client) => {
if (error) {
debug("error connecting to MongoDB using collection: [%s]", collection);
@@ -58,9 +49,14 @@ export const database = function (
"successful connection to MongoDB using collection: [%s]",
collection
);
this._db = client;
this._mdb = client.db();
this.db_init(collection, cb);

if (client) {
this._db = client;
this._mdb = client.db();
this.db_init(collection, cb);
} else {
throw new Error("Mongo Client is undefined");
}
});
return this;
};
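
This hunk drops the `autoReconnect`, `reconnectTries`, and `reconnectInterval` defaults, which belonged to the legacy topology of the 3.x driver. Callers that still pass those keys in their own options may want to filter them out before connecting. The helper below is a hedged sketch, not part of this PR: `stripLegacyReconnectOptions` and `LEGACY_RECONNECT_KEYS` are hypothetical names, and the options object is treated as a plain record rather than the driver's `MongoClientOptions` type.

```typescript
// Option names removed from the defaults in this PR (legacy-topology settings).
const LEGACY_RECONNECT_KEYS = [
  "autoReconnect",
  "reconnectTries",
  "reconnectInterval",
] as const;

// Return a shallow copy of `options` without the legacy reconnect keys.
// The input object is left untouched.
function stripLegacyReconnectOptions(
  options: Record<string, unknown>
): Record<string, unknown> {
  const cleaned: Record<string, unknown> = { ...options };
  for (const key of LEGACY_RECONNECT_KEYS) {
    delete cleaned[key];
  }
  return cleaned;
}
```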
6 changes: 3 additions & 3 deletions lib/agenda/db-init.ts
@@ -1,5 +1,5 @@
import createDebugger from "debug";
import { Collection } from "mongodb";
import { AnyError, Collection } from "mongodb";
import { Agenda } from ".";

const debug = createDebugger("agenda:db_init");
@@ -14,15 +14,15 @@ const debug = createDebugger("agenda:db_init");
export const dbInit = function (
this: Agenda,
collection = "agendaJobs",
cb?: (error: Error, collection: Collection<any> | null) => void
cb?: (error: AnyError | undefined, collection: Collection<any> | null) => void
): void {
debug("init database collection using name [%s]", collection);
this._collection = this._mdb.collection(collection);
debug("attempting index creation");
this._collection.createIndex(
this._indices,
{ name: "findAndLockNextJobIndex" },
(error: Error) => {
(error) => {
if (error) {
debug("index creation failed");
this.emit("error", error);
10 changes: 5 additions & 5 deletions lib/agenda/disable.ts
@@ -1,5 +1,5 @@
import createDebugger from "debug";
import { FilterQuery } from "mongodb";
import { Filter } from "mongodb";
import { Agenda } from ".";
const debug = createDebugger("agenda:disable");

@@ -12,15 +12,15 @@ const debug = createDebugger("agenda:disable");
*/
export const disable = async function (
this: Agenda,
query: FilterQuery<unknown> = {}
query: Filter<unknown> = {}
): Promise<number> {
debug("attempting to disable all jobs matching query", query);
try {
const { result } = await this._collection.updateMany(query, {
const { modifiedCount } = await this._collection.updateMany(query, {
$set: { disabled: true },
});
debug("%s jobs disabled", result.n);
return result.n;
debug("%s jobs disabled", modifiedCount);
return modifiedCount;
} catch (error) {
debug("error trying to mark jobs as `disabled`");
throw error;
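
One behavioural detail of switching from `result.n` to `modifiedCount`: for an update, `n` counts the documents the query matched, while `modifiedCount` counts only the documents that actually changed. Disabling a job that is already disabled therefore now contributes 0 rather than 1 to the returned number. The in-memory simulation below illustrates the difference; `UpdateResultLike` and `disableAll` are hypothetical names and no driver code is involved.

```typescript
// Hypothetical stand-in for the update result fields discussed here.
interface UpdateResultLike {
  matchedCount: number; // documents selected by the query
  modifiedCount: number; // documents whose content actually changed
}

// Simulates `updateMany({}, { $set: { disabled: true } })` on in-memory docs.
function disableAll(docs: { disabled?: boolean }[]): UpdateResultLike {
  let modifiedCount = 0;
  for (const doc of docs) {
    if (doc.disabled !== true) {
      doc.disabled = true;
      modifiedCount++;
    }
  }
  return { matchedCount: docs.length, modifiedCount };
}
```

If callers rely on the old "number of matching jobs" meaning, `matchedCount` would be the closer equivalent.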
10 changes: 5 additions & 5 deletions lib/agenda/enable.ts
@@ -1,5 +1,5 @@
import createDebugger from "debug";
import { FilterQuery } from "mongodb";
import { Filter } from "mongodb";
import { Agenda } from ".";
const debug = createDebugger("agenda:enable");

@@ -13,15 +13,15 @@ const debug = createDebugger("agenda:enable");
*/
export const enable = async function (
this: Agenda,
query: FilterQuery<unknown> = {}
query: Filter<unknown> = {}
): Promise<number> {
debug("attempting to enable all jobs matching query", query);
try {
const { result } = await this._collection.updateMany(query, {
const { modifiedCount } = await this._collection.updateMany(query, {
$set: { disabled: false },
});
debug("%s jobs enabled", result.n);
return result.n;
debug("%s jobs enabled", modifiedCount);
return modifiedCount;
} catch (error) {
debug("error trying to mark jobs as `enabled`");
throw error;
125 changes: 50 additions & 75 deletions lib/agenda/find-and-lock-next-job.ts
@@ -23,85 +23,60 @@ export const findAndLockNextJob = async function (
const lockDeadline = new Date(Date.now().valueOf() - definition.lockLifetime);
debug("_findAndLockNextJob(%s, [Function])", jobName);

// Don't try and access MongoDB if we've lost connection to it.
// Trying to resolve crash on Dev PC when it resumes from sleep. NOTE: Does this still happen?
// @ts-expect-error
const s = this._mdb.s || this._mdb.db.s;
if (
s.topology.connections &&
s.topology.connections().length === 0 &&
!this._mongoUseUnifiedTopology
) {
if (s.topology.autoReconnect && !s.topology.isDestroyed()) {
// Continue processing but notify that Agenda has lost the connection
debug(
"Missing MongoDB connection, not attempting to find and lock a job"
);
this.emit("error", new Error("Lost MongoDB connection"));
} else {
// No longer recoverable
debug(
"topology.autoReconnect: %s, topology.isDestroyed(): %s",
s.topology.autoReconnect,
s.topology.isDestroyed()
);
throw new Error(
"MongoDB connection is not recoverable, application restart required"
);
}
} else {
// /**
// * Query used to find job to run
// * @type {{$and: [*]}}
// */
const JOB_PROCESS_WHERE_QUERY = {
$and: [
{
name: jobName,
disabled: { $ne: true },
},
{
$or: [
{
lockedAt: { $eq: null },
nextRunAt: { $lte: this._nextScanAt },
},
{
lockedAt: { $lte: lockDeadline },
},
],
},
],
};
/**
* Query used to find job to run
* @type {{$and: [*]}}
*/
const JOB_PROCESS_WHERE_QUERY = {
$and: [
{
name: jobName,
disabled: { $ne: true },
},
{
$or: [
{
lockedAt: { $eq: null },
nextRunAt: { $lte: this._nextScanAt },
},
{
lockedAt: { $lte: lockDeadline },
},
],
},
],
};

/**
* Query used to set a job as locked
* @type {{$set: {lockedAt: Date}}}
*/
const JOB_PROCESS_SET_QUERY = { $set: { lockedAt: now } };
/**
* Query used to set a job as locked
* @type {{$set: {lockedAt: Date}}}
*/
const JOB_PROCESS_SET_QUERY = { $set: { lockedAt: now } };

/**
* Query used to affect what gets returned
* @type {{returnOriginal: boolean, sort: object}}
*/
const JOB_RETURN_QUERY = { returnOriginal: false, sort: this._sort };
/**
* Query used to affect what gets returned
* @type {{returnDocument: string, sort: object}}
*/
const JOB_RETURN_QUERY = { returnDocument: "after", sort: this._sort };

// Find ONE and ONLY ONE job and set the 'lockedAt' time so that job begins to be processed
const result = await this._collection.findOneAndUpdate(
JOB_PROCESS_WHERE_QUERY,
JOB_PROCESS_SET_QUERY,
JOB_RETURN_QUERY
);
// Find ONE and ONLY ONE job and set the 'lockedAt' time so that job begins to be processed
const result = await this._collection.findOneAndUpdate(
JOB_PROCESS_WHERE_QUERY,
JOB_PROCESS_SET_QUERY,
// @ts-ignore
JOB_RETURN_QUERY
);

let job: Job | undefined = undefined;
if (result.value) {
debug(
"found a job available to lock, creating a new job on Agenda with id [%s]",
result.value._id
);
job = createJob(this, result.value);
}
let job: Job | undefined = undefined;
if (result.value) {
debug(
"found a job available to lock, creating a new job on Agenda with id [%s]",
result.value._id
);

return job;
// @ts-ignore
job = createJob(this, result.value);
}

return job;
};
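
The `returnOriginal: false` option becomes `returnDocument: "after"` in this hunk, with a `// @ts-ignore` on the options argument. One possible reason for the suppression (an assumption, not verified against this PR) is that an unannotated object literal widens `"after"` to `string`, which no longer fits a `"before" | "after"` option type. The sketch below shows how an explicit annotation keeps the literal narrow. `ReturnDocumentLike`, `FindOneAndUpdateOptionsLike`, `buildReturnOptions`, and `fromReturnOriginal` are hypothetical local names, not driver exports.

```typescript
// Hypothetical local copy of the relevant option shape.
type ReturnDocumentLike = "before" | "after";

interface FindOneAndUpdateOptionsLike {
  returnDocument?: ReturnDocumentLike;
  sort?: Record<string, 1 | -1>;
}

// The return type annotation keeps "after" as a literal instead of `string`.
function buildReturnOptions(
  sort: Record<string, 1 | -1>
): FindOneAndUpdateOptionsLike {
  return { returnDocument: "after", sort };
}

// Maps the 3.x boolean flag onto the 4.x string option.
function fromReturnOriginal(returnOriginal: boolean): ReturnDocumentLike {
  return returnOriginal ? "before" : "after";
}
```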
6 changes: 5 additions & 1 deletion lib/agenda/index.ts
@@ -5,6 +5,7 @@ import {
Db as MongoDb,
Collection,
MongoClientOptions,
AnyError,
} from "mongodb";
import { JobProcessingQueue } from "./job-processing-queue";
import { cancel } from "./cancel";
@@ -132,7 +133,10 @@ class Agenda extends EventEmitter {
*/
constructor(
config: AgendaConfig = {},
cb?: (error: Error, collection: Collection<any> | null) => void
cb?: (
error: AnyError | undefined,
collection: Collection<any> | null
) => void
) {
super();

4 changes: 2 additions & 2 deletions lib/agenda/jobs.ts
@@ -1,4 +1,4 @@
import { FilterQuery } from "mongodb";
import { Filter } from "mongodb";
import { Agenda } from ".";
import { Job } from "../job";
import { createJob } from "../utils";
@@ -15,7 +15,7 @@ import { createJob } from "../utils";
*/
export const jobs = async function (
this: Agenda,
query: FilterQuery<any> = {},
query: Filter<any> = {},
sort = {},
limit = 0,
skip = 0