1265 lines
47 KiB
Python
1265 lines
47 KiB
Python
import sys
|
|
import warnings
|
|
from abc import ABCMeta, abstractmethod
|
|
from collections.abc import Mapping, MutableMapping
|
|
from contextlib import ExitStack
|
|
from datetime import datetime, timedelta
|
|
from importlib.metadata import entry_points
|
|
from logging import getLogger
|
|
from threading import TIMEOUT_MAX, RLock
|
|
|
|
from tzlocal import get_localzone
|
|
|
|
from apscheduler.events import (
|
|
EVENT_ALL,
|
|
EVENT_ALL_JOBS_REMOVED,
|
|
EVENT_EXECUTOR_ADDED,
|
|
EVENT_EXECUTOR_REMOVED,
|
|
EVENT_JOB_ADDED,
|
|
EVENT_JOB_MAX_INSTANCES,
|
|
EVENT_JOB_MODIFIED,
|
|
EVENT_JOB_REMOVED,
|
|
EVENT_JOB_SUBMITTED,
|
|
EVENT_JOBSTORE_ADDED,
|
|
EVENT_JOBSTORE_REMOVED,
|
|
EVENT_SCHEDULER_PAUSED,
|
|
EVENT_SCHEDULER_RESUMED,
|
|
EVENT_SCHEDULER_SHUTDOWN,
|
|
EVENT_SCHEDULER_STARTED,
|
|
JobEvent,
|
|
JobSubmissionEvent,
|
|
SchedulerEvent,
|
|
)
|
|
from apscheduler.executors.base import BaseExecutor, MaxInstancesReachedError
|
|
from apscheduler.executors.pool import ThreadPoolExecutor
|
|
from apscheduler.job import Job
|
|
from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError, JobLookupError
|
|
from apscheduler.jobstores.memory import MemoryJobStore
|
|
from apscheduler.schedulers import (
|
|
SchedulerAlreadyRunningError,
|
|
SchedulerNotRunningError,
|
|
)
|
|
from apscheduler.triggers.base import BaseTrigger
|
|
from apscheduler.util import (
|
|
asbool,
|
|
asint,
|
|
astimezone,
|
|
maybe_ref,
|
|
obj_to_ref,
|
|
ref_to_obj,
|
|
undefined,
|
|
)
|
|
|
|
#: constant indicating a scheduler's stopped state
|
|
STATE_STOPPED = 0
|
|
#: constant indicating a scheduler's running state (started and processing jobs)
|
|
STATE_RUNNING = 1
|
|
#: constant indicating a scheduler's paused state (started but not processing jobs)
|
|
STATE_PAUSED = 2
|
|
|
|
|
|
class BaseScheduler(metaclass=ABCMeta):
|
|
"""
|
|
Abstract base class for all schedulers.
|
|
|
|
Takes the following keyword arguments:
|
|
|
|
:param str|logging.Logger logger: logger to use for the scheduler's logging (defaults to
|
|
apscheduler.scheduler)
|
|
:param str|datetime.tzinfo timezone: the default time zone (defaults to the local timezone)
|
|
:param int|float jobstore_retry_interval: the minimum number of seconds to wait between
|
|
retries in the scheduler's main loop if the job store raises an exception when getting
|
|
the list of due jobs
|
|
:param dict job_defaults: default values for newly added jobs
|
|
:param dict jobstores: a dictionary of job store alias -> job store instance or configuration
|
|
dict
|
|
:param dict executors: a dictionary of executor alias -> executor instance or configuration
|
|
dict
|
|
|
|
:ivar int state: current running state of the scheduler (one of the following constants from
|
|
``apscheduler.schedulers.base``: ``STATE_STOPPED``, ``STATE_RUNNING``, ``STATE_PAUSED``)
|
|
|
|
.. seealso:: :ref:`scheduler-config`
|
|
"""
|
|
|
|
# The `group=...` API is only available in the backport, used in <=3.7, and in std>=3.10.
|
|
if (3, 8) <= sys.version_info < (3, 10):
|
|
_trigger_plugins = {
|
|
ep.name: ep for ep in entry_points()["apscheduler.triggers"]
|
|
}
|
|
_executor_plugins = {
|
|
ep.name: ep for ep in entry_points()["apscheduler.executors"]
|
|
}
|
|
_jobstore_plugins = {
|
|
ep.name: ep for ep in entry_points()["apscheduler.jobstores"]
|
|
}
|
|
else:
|
|
_trigger_plugins = {
|
|
ep.name: ep for ep in entry_points(group="apscheduler.triggers")
|
|
}
|
|
_executor_plugins = {
|
|
ep.name: ep for ep in entry_points(group="apscheduler.executors")
|
|
}
|
|
_jobstore_plugins = {
|
|
ep.name: ep for ep in entry_points(group="apscheduler.jobstores")
|
|
}
|
|
|
|
_trigger_classes = {}
|
|
_executor_classes = {}
|
|
_jobstore_classes = {}
|
|
|
|
#
|
|
# Public API
|
|
#
|
|
|
|
def __init__(self, gconfig={}, **options):
|
|
super().__init__()
|
|
self._executors = {}
|
|
self._executors_lock = self._create_lock()
|
|
self._jobstores = {}
|
|
self._jobstores_lock = self._create_lock()
|
|
self._listeners = []
|
|
self._listeners_lock = self._create_lock()
|
|
self._pending_jobs = []
|
|
self.state = STATE_STOPPED
|
|
self.configure(gconfig, **options)
|
|
|
|
def __getstate__(self):
|
|
raise TypeError(
|
|
"Schedulers cannot be serialized. Ensure that you are not passing a "
|
|
"scheduler instance as an argument to a job, or scheduling an instance "
|
|
"method where the instance contains a scheduler as an attribute."
|
|
)
|
|
|
|
def configure(self, gconfig={}, prefix="apscheduler.", **options):
|
|
"""
|
|
Reconfigures the scheduler with the given options.
|
|
|
|
Can only be done when the scheduler isn't running.
|
|
|
|
:param dict gconfig: a "global" configuration dictionary whose values can be overridden by
|
|
keyword arguments to this method
|
|
:param str|unicode prefix: pick only those keys from ``gconfig`` that are prefixed with
|
|
this string (pass an empty string or ``None`` to use all keys)
|
|
:raises SchedulerAlreadyRunningError: if the scheduler is already running
|
|
|
|
"""
|
|
if self.state != STATE_STOPPED:
|
|
raise SchedulerAlreadyRunningError
|
|
|
|
# If a non-empty prefix was given, strip it from the keys in the
|
|
# global configuration dict
|
|
if prefix:
|
|
prefixlen = len(prefix)
|
|
gconfig = dict(
|
|
(key[prefixlen:], value)
|
|
for key, value in gconfig.items()
|
|
if key.startswith(prefix)
|
|
)
|
|
|
|
# Create a structure from the dotted options
|
|
# (e.g. "a.b.c = d" -> {'a': {'b': {'c': 'd'}}})
|
|
config = {}
|
|
for key, value in gconfig.items():
|
|
parts = key.split(".")
|
|
parent = config
|
|
key = parts.pop(0)
|
|
while parts:
|
|
parent = parent.setdefault(key, {})
|
|
key = parts.pop(0)
|
|
parent[key] = value
|
|
|
|
# Override any options with explicit keyword arguments
|
|
config.update(options)
|
|
self._configure(config)
|
|
|
|
def start(self, paused=False):
|
|
"""
|
|
Start the configured executors and job stores and begin processing scheduled jobs.
|
|
|
|
:param bool paused: if ``True``, don't start job processing until :meth:`resume` is called
|
|
:raises SchedulerAlreadyRunningError: if the scheduler is already running
|
|
:raises RuntimeError: if running under uWSGI with threads disabled
|
|
|
|
"""
|
|
if self.state != STATE_STOPPED:
|
|
raise SchedulerAlreadyRunningError
|
|
|
|
self._check_uwsgi()
|
|
|
|
with self._executors_lock:
|
|
# Create a default executor if nothing else is configured
|
|
if "default" not in self._executors:
|
|
self.add_executor(self._create_default_executor(), "default")
|
|
|
|
# Start all the executors
|
|
for alias, executor in self._executors.items():
|
|
executor.start(self, alias)
|
|
|
|
with self._jobstores_lock:
|
|
# Create a default job store if nothing else is configured
|
|
if "default" not in self._jobstores:
|
|
self.add_jobstore(self._create_default_jobstore(), "default")
|
|
|
|
# Start all the job stores
|
|
for alias, store in self._jobstores.items():
|
|
store.start(self, alias)
|
|
|
|
# Schedule all pending jobs
|
|
for job, jobstore_alias, replace_existing in self._pending_jobs:
|
|
self._real_add_job(job, jobstore_alias, replace_existing)
|
|
del self._pending_jobs[:]
|
|
|
|
self.state = STATE_PAUSED if paused else STATE_RUNNING
|
|
self._logger.info("Scheduler started")
|
|
self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_STARTED))
|
|
|
|
if not paused:
|
|
self.wakeup()
|
|
|
|
@abstractmethod
|
|
def shutdown(self, wait=True):
|
|
"""
|
|
Shuts down the scheduler, along with its executors and job stores.
|
|
|
|
Does not interrupt any currently running jobs.
|
|
|
|
:param bool wait: ``True`` to wait until all currently executing jobs have finished
|
|
:raises SchedulerNotRunningError: if the scheduler has not been started yet
|
|
|
|
"""
|
|
if self.state == STATE_STOPPED:
|
|
raise SchedulerNotRunningError
|
|
|
|
self.state = STATE_STOPPED
|
|
|
|
# Shut down all executors
|
|
with self._executors_lock, self._jobstores_lock:
|
|
for executor in self._executors.values():
|
|
executor.shutdown(wait)
|
|
|
|
# Shut down all job stores
|
|
for jobstore in self._jobstores.values():
|
|
jobstore.shutdown()
|
|
|
|
self._logger.info("Scheduler has been shut down")
|
|
self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN))
|
|
|
|
def pause(self):
|
|
"""
|
|
Pause job processing in the scheduler.
|
|
|
|
This will prevent the scheduler from waking up to do job processing until :meth:`resume`
|
|
is called. It will not however stop any already running job processing.
|
|
|
|
"""
|
|
if self.state == STATE_STOPPED:
|
|
raise SchedulerNotRunningError
|
|
elif self.state == STATE_RUNNING:
|
|
self.state = STATE_PAUSED
|
|
self._logger.info("Paused scheduler job processing")
|
|
self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_PAUSED))
|
|
|
|
def resume(self):
|
|
"""Resume job processing in the scheduler."""
|
|
if self.state == STATE_STOPPED:
|
|
raise SchedulerNotRunningError
|
|
elif self.state == STATE_PAUSED:
|
|
self.state = STATE_RUNNING
|
|
self._logger.info("Resumed scheduler job processing")
|
|
self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_RESUMED))
|
|
self.wakeup()
|
|
|
|
@property
|
|
def running(self):
|
|
"""
|
|
Return ``True`` if the scheduler has been started.
|
|
|
|
This is a shortcut for ``scheduler.state != STATE_STOPPED``.
|
|
|
|
"""
|
|
return self.state != STATE_STOPPED
|
|
|
|
def add_executor(self, executor, alias="default", **executor_opts):
|
|
"""
|
|
Adds an executor to this scheduler.
|
|
|
|
Any extra keyword arguments will be passed to the executor plugin's constructor, assuming
|
|
that the first argument is the name of an executor plugin.
|
|
|
|
:param str|unicode|apscheduler.executors.base.BaseExecutor executor: either an executor
|
|
instance or the name of an executor plugin
|
|
:param str|unicode alias: alias for the scheduler
|
|
:raises ValueError: if there is already an executor by the given alias
|
|
|
|
"""
|
|
with self._executors_lock:
|
|
if alias in self._executors:
|
|
raise ValueError(
|
|
f'This scheduler already has an executor by the alias of "{alias}"'
|
|
)
|
|
|
|
if isinstance(executor, BaseExecutor):
|
|
self._executors[alias] = executor
|
|
elif isinstance(executor, str):
|
|
self._executors[alias] = executor = self._create_plugin_instance(
|
|
"executor", executor, executor_opts
|
|
)
|
|
else:
|
|
raise TypeError(
|
|
f"Expected an executor instance or a string, got {executor.__class__.__name__} instead"
|
|
)
|
|
|
|
# Start the executor right away if the scheduler is running
|
|
if self.state != STATE_STOPPED:
|
|
executor.start(self, alias)
|
|
|
|
self._dispatch_event(SchedulerEvent(EVENT_EXECUTOR_ADDED, alias))
|
|
|
|
def remove_executor(self, alias, shutdown=True):
|
|
"""
|
|
Removes the executor by the given alias from this scheduler.
|
|
|
|
:param str|unicode alias: alias of the executor
|
|
:param bool shutdown: ``True`` to shut down the executor after
|
|
removing it
|
|
|
|
"""
|
|
with self._executors_lock:
|
|
executor = self._lookup_executor(alias)
|
|
del self._executors[alias]
|
|
|
|
if shutdown:
|
|
executor.shutdown()
|
|
|
|
self._dispatch_event(SchedulerEvent(EVENT_EXECUTOR_REMOVED, alias))
|
|
|
|
def add_jobstore(self, jobstore, alias="default", **jobstore_opts):
|
|
"""
|
|
Adds a job store to this scheduler.
|
|
|
|
Any extra keyword arguments will be passed to the job store plugin's constructor, assuming
|
|
that the first argument is the name of a job store plugin.
|
|
|
|
:param str|unicode|apscheduler.jobstores.base.BaseJobStore jobstore: job store to be added
|
|
:param str|unicode alias: alias for the job store
|
|
:raises ValueError: if there is already a job store by the given alias
|
|
|
|
"""
|
|
with self._jobstores_lock:
|
|
if alias in self._jobstores:
|
|
raise ValueError(
|
|
f'This scheduler already has a job store by the alias of "{alias}"'
|
|
)
|
|
|
|
if isinstance(jobstore, BaseJobStore):
|
|
self._jobstores[alias] = jobstore
|
|
elif isinstance(jobstore, str):
|
|
self._jobstores[alias] = jobstore = self._create_plugin_instance(
|
|
"jobstore", jobstore, jobstore_opts
|
|
)
|
|
else:
|
|
raise TypeError(
|
|
f"Expected a job store instance or a string, got {jobstore.__class__.__name__} instead"
|
|
)
|
|
|
|
# Start the job store right away if the scheduler isn't stopped
|
|
if self.state != STATE_STOPPED:
|
|
jobstore.start(self, alias)
|
|
|
|
# Notify listeners that a new job store has been added
|
|
self._dispatch_event(SchedulerEvent(EVENT_JOBSTORE_ADDED, alias))
|
|
|
|
# Notify the scheduler so it can scan the new job store for jobs
|
|
if self.state != STATE_STOPPED:
|
|
self.wakeup()
|
|
|
|
def remove_jobstore(self, alias, shutdown=True):
|
|
"""
|
|
Removes the job store by the given alias from this scheduler.
|
|
|
|
:param str|unicode alias: alias of the job store
|
|
:param bool shutdown: ``True`` to shut down the job store after removing it
|
|
|
|
"""
|
|
with self._jobstores_lock:
|
|
jobstore = self._lookup_jobstore(alias)
|
|
del self._jobstores[alias]
|
|
|
|
if shutdown:
|
|
jobstore.shutdown()
|
|
|
|
self._dispatch_event(SchedulerEvent(EVENT_JOBSTORE_REMOVED, alias))
|
|
|
|
def add_listener(self, callback, mask=EVENT_ALL):
|
|
"""
|
|
add_listener(callback, mask=EVENT_ALL)
|
|
|
|
Adds a listener for scheduler events.
|
|
|
|
When a matching event occurs, ``callback`` is executed with the event object as its
|
|
sole argument. If the ``mask`` parameter is not provided, the callback will receive events
|
|
of all types.
|
|
|
|
:param callback: any callable that takes one argument
|
|
:param int mask: bitmask that indicates which events should be
|
|
listened to
|
|
|
|
.. seealso:: :mod:`apscheduler.events`
|
|
.. seealso:: :ref:`scheduler-events`
|
|
|
|
"""
|
|
with self._listeners_lock:
|
|
self._listeners.append((callback, mask))
|
|
|
|
def remove_listener(self, callback):
|
|
"""Removes a previously added event listener."""
|
|
|
|
with self._listeners_lock:
|
|
for i, (cb, _) in enumerate(self._listeners):
|
|
if callback == cb:
|
|
del self._listeners[i]
|
|
|
|
def add_job(
|
|
self,
|
|
func,
|
|
trigger=None,
|
|
args=None,
|
|
kwargs=None,
|
|
id=None,
|
|
name=None,
|
|
misfire_grace_time=undefined,
|
|
coalesce=undefined,
|
|
max_instances=undefined,
|
|
next_run_time=undefined,
|
|
jobstore="default",
|
|
executor="default",
|
|
replace_existing=False,
|
|
**trigger_args,
|
|
):
|
|
"""
|
|
add_job(func, trigger=None, args=None, kwargs=None, id=None, \
|
|
name=None, misfire_grace_time=undefined, coalesce=undefined, \
|
|
max_instances=undefined, next_run_time=undefined, \
|
|
jobstore='default', executor='default', \
|
|
replace_existing=False, **trigger_args)
|
|
|
|
Adds the given job to the job list and wakes up the scheduler if it's already running.
|
|
|
|
Any option that defaults to ``undefined`` will be replaced with the corresponding default
|
|
value when the job is scheduled (which happens when the scheduler is started, or
|
|
immediately if the scheduler is already running).
|
|
|
|
The ``func`` argument can be given either as a callable object or a textual reference in
|
|
the ``package.module:some.object`` format, where the first half (separated by ``:``) is an
|
|
importable module and the second half is a reference to the callable object, relative to
|
|
the module.
|
|
|
|
The ``trigger`` argument can either be:
|
|
#. the alias name of the trigger (e.g. ``date``, ``interval`` or ``cron``), in which case
|
|
any extra keyword arguments to this method are passed on to the trigger's constructor
|
|
#. an instance of a trigger class
|
|
|
|
:param func: callable (or a textual reference to one) to run at the given time
|
|
:param str|apscheduler.triggers.base.BaseTrigger trigger: trigger that determines when
|
|
``func`` is called
|
|
:param list|tuple args: list of positional arguments to call func with
|
|
:param dict kwargs: dict of keyword arguments to call func with
|
|
:param str|unicode id: explicit identifier for the job (for modifying it later)
|
|
:param str|unicode name: textual description of the job
|
|
:param int misfire_grace_time: seconds after the designated runtime that the job is still
|
|
allowed to be run (or ``None`` to allow the job to run no matter how late it is)
|
|
:param bool coalesce: run once instead of many times if the scheduler determines that the
|
|
job should be run more than once in succession
|
|
:param int max_instances: maximum number of concurrently running instances allowed for this
|
|
job
|
|
:param datetime next_run_time: when to first run the job, regardless of the trigger (pass
|
|
``None`` to add the job as paused)
|
|
:param str|unicode jobstore: alias of the job store to store the job in
|
|
:param str|unicode executor: alias of the executor to run the job with
|
|
:param bool replace_existing: ``True`` to replace an existing job with the same ``id``
|
|
(but retain the number of runs from the existing one)
|
|
:rtype: Job
|
|
|
|
"""
|
|
job_kwargs = {
|
|
"trigger": self._create_trigger(trigger, trigger_args),
|
|
"executor": executor,
|
|
"func": func,
|
|
"args": tuple(args) if args is not None else (),
|
|
"kwargs": dict(kwargs) if kwargs is not None else {},
|
|
"id": id,
|
|
"name": name,
|
|
"misfire_grace_time": misfire_grace_time,
|
|
"coalesce": coalesce,
|
|
"max_instances": max_instances,
|
|
"next_run_time": next_run_time,
|
|
}
|
|
job_kwargs = dict(
|
|
(key, value) for key, value in job_kwargs.items() if value is not undefined
|
|
)
|
|
job = Job(self, **job_kwargs)
|
|
|
|
# Don't really add jobs to job stores before the scheduler is up and running
|
|
with self._jobstores_lock:
|
|
if self.state == STATE_STOPPED:
|
|
self._pending_jobs.append((job, jobstore, replace_existing))
|
|
self._logger.info(
|
|
"Adding job tentatively -- it will be properly scheduled when "
|
|
"the scheduler starts"
|
|
)
|
|
else:
|
|
self._real_add_job(job, jobstore, replace_existing)
|
|
|
|
return job
|
|
|
|
def scheduled_job(
|
|
self,
|
|
trigger,
|
|
args=None,
|
|
kwargs=None,
|
|
id=None,
|
|
name=None,
|
|
misfire_grace_time=undefined,
|
|
coalesce=undefined,
|
|
max_instances=undefined,
|
|
next_run_time=undefined,
|
|
jobstore="default",
|
|
executor="default",
|
|
**trigger_args,
|
|
):
|
|
"""
|
|
scheduled_job(trigger, args=None, kwargs=None, id=None, \
|
|
name=None, misfire_grace_time=undefined, \
|
|
coalesce=undefined, max_instances=undefined, \
|
|
next_run_time=undefined, jobstore='default', \
|
|
executor='default',**trigger_args)
|
|
|
|
A decorator version of :meth:`add_job`, except that ``replace_existing`` is always
|
|
``True``.
|
|
|
|
.. important:: The ``id`` argument must be given if scheduling a job in a persistent job
|
|
store. The scheduler cannot, however, enforce this requirement.
|
|
|
|
"""
|
|
|
|
def inner(func):
|
|
self.add_job(
|
|
func,
|
|
trigger,
|
|
args,
|
|
kwargs,
|
|
id,
|
|
name,
|
|
misfire_grace_time,
|
|
coalesce,
|
|
max_instances,
|
|
next_run_time,
|
|
jobstore,
|
|
executor,
|
|
True,
|
|
**trigger_args,
|
|
)
|
|
return func
|
|
|
|
return inner
|
|
|
|
def modify_job(self, job_id, jobstore=None, **changes):
|
|
"""
|
|
Modifies the properties of a single job.
|
|
|
|
Modifications are passed to this method as extra keyword arguments.
|
|
|
|
:param str|unicode job_id: the identifier of the job
|
|
:param str|unicode jobstore: alias of the job store that contains the job
|
|
:return Job: the relevant job instance
|
|
|
|
"""
|
|
with self._jobstores_lock:
|
|
job, jobstore = self._lookup_job(job_id, jobstore)
|
|
job._modify(**changes)
|
|
if jobstore:
|
|
self._lookup_jobstore(jobstore).update_job(job)
|
|
|
|
self._dispatch_event(JobEvent(EVENT_JOB_MODIFIED, job_id, jobstore))
|
|
|
|
# Wake up the scheduler since the job's next run time may have been changed
|
|
if self.state == STATE_RUNNING:
|
|
self.wakeup()
|
|
|
|
return job
|
|
|
|
def reschedule_job(self, job_id, jobstore=None, trigger=None, **trigger_args):
|
|
"""
|
|
Constructs a new trigger for a job and updates its next run time.
|
|
|
|
Extra keyword arguments are passed directly to the trigger's constructor.
|
|
|
|
:param str|unicode job_id: the identifier of the job
|
|
:param str|unicode jobstore: alias of the job store that contains the job
|
|
:param trigger: alias of the trigger type or a trigger instance
|
|
:return Job: the relevant job instance
|
|
|
|
"""
|
|
trigger = self._create_trigger(trigger, trigger_args)
|
|
now = datetime.now(self.timezone)
|
|
next_run_time = trigger.get_next_fire_time(None, now)
|
|
return self.modify_job(
|
|
job_id, jobstore, trigger=trigger, next_run_time=next_run_time
|
|
)
|
|
|
|
def pause_job(self, job_id, jobstore=None):
|
|
"""
|
|
Causes the given job not to be executed until it is explicitly resumed.
|
|
|
|
:param str|unicode job_id: the identifier of the job
|
|
:param str|unicode jobstore: alias of the job store that contains the job
|
|
:return Job: the relevant job instance
|
|
|
|
"""
|
|
return self.modify_job(job_id, jobstore, next_run_time=None)
|
|
|
|
def resume_job(self, job_id, jobstore=None):
|
|
"""
|
|
Resumes the schedule of the given job, or removes the job if its schedule is finished.
|
|
|
|
:param str|unicode job_id: the identifier of the job
|
|
:param str|unicode jobstore: alias of the job store that contains the job
|
|
:return Job|None: the relevant job instance if the job was rescheduled, or ``None`` if no
|
|
next run time could be calculated and the job was removed
|
|
|
|
"""
|
|
with self._jobstores_lock:
|
|
job, jobstore = self._lookup_job(job_id, jobstore)
|
|
now = datetime.now(self.timezone)
|
|
next_run_time = job.trigger.get_next_fire_time(None, now)
|
|
if next_run_time:
|
|
return self.modify_job(job_id, jobstore, next_run_time=next_run_time)
|
|
else:
|
|
self.remove_job(job.id, jobstore)
|
|
|
|
def get_jobs(self, jobstore=None, pending=None):
|
|
"""
|
|
Returns a list of pending jobs (if the scheduler hasn't been started yet) and scheduled
|
|
jobs, either from a specific job store or from all of them.
|
|
|
|
If the scheduler has not been started yet, only pending jobs can be returned because the
|
|
job stores haven't been started yet either.
|
|
|
|
:param str|unicode jobstore: alias of the job store
|
|
:param bool pending: **DEPRECATED**
|
|
:rtype: list[Job]
|
|
|
|
"""
|
|
if pending is not None:
|
|
warnings.warn(
|
|
'The "pending" option is deprecated -- get_jobs() always returns '
|
|
"scheduled jobs if the scheduler has been started and pending jobs "
|
|
"otherwise",
|
|
DeprecationWarning,
|
|
)
|
|
|
|
with self._jobstores_lock:
|
|
jobs = []
|
|
if self.state == STATE_STOPPED:
|
|
for job, alias, replace_existing in self._pending_jobs:
|
|
if jobstore is None or alias == jobstore:
|
|
jobs.append(job)
|
|
else:
|
|
for alias, store in self._jobstores.items():
|
|
if jobstore is None or alias == jobstore:
|
|
jobs.extend(store.get_all_jobs())
|
|
|
|
return jobs
|
|
|
|
def get_job(self, job_id, jobstore=None):
|
|
"""
|
|
Returns the Job that matches the given ``job_id``.
|
|
|
|
:param str|unicode job_id: the identifier of the job
|
|
:param str|unicode jobstore: alias of the job store that most likely contains the job
|
|
:return: the Job by the given ID, or ``None`` if it wasn't found
|
|
:rtype: Job
|
|
|
|
"""
|
|
with self._jobstores_lock:
|
|
try:
|
|
return self._lookup_job(job_id, jobstore)[0]
|
|
except JobLookupError:
|
|
return
|
|
|
|
def remove_job(self, job_id, jobstore=None):
|
|
"""
|
|
Removes a job, preventing it from being run any more.
|
|
|
|
:param str|unicode job_id: the identifier of the job
|
|
:param str|unicode jobstore: alias of the job store that contains the job
|
|
:raises JobLookupError: if the job was not found
|
|
|
|
"""
|
|
jobstore_alias = None
|
|
with self._jobstores_lock:
|
|
# Check if the job is among the pending jobs
|
|
if self.state == STATE_STOPPED:
|
|
for i, (job, alias, replace_existing) in enumerate(self._pending_jobs):
|
|
if job.id == job_id and jobstore in (None, alias):
|
|
del self._pending_jobs[i]
|
|
jobstore_alias = alias
|
|
break
|
|
else:
|
|
# Otherwise, try to remove it from each store until it succeeds or we run out of
|
|
# stores to check
|
|
for alias, store in self._jobstores.items():
|
|
if jobstore in (None, alias):
|
|
try:
|
|
store.remove_job(job_id)
|
|
jobstore_alias = alias
|
|
break
|
|
except JobLookupError:
|
|
continue
|
|
|
|
if jobstore_alias is None:
|
|
raise JobLookupError(job_id)
|
|
|
|
# Notify listeners that a job has been removed
|
|
event = JobEvent(EVENT_JOB_REMOVED, job_id, jobstore_alias)
|
|
self._dispatch_event(event)
|
|
|
|
self._logger.info("Removed job %s", job_id)
|
|
|
|
def remove_all_jobs(self, jobstore=None):
|
|
"""
|
|
Removes all jobs from the specified job store, or all job stores if none is given.
|
|
|
|
:param str|unicode jobstore: alias of the job store
|
|
|
|
"""
|
|
with self._jobstores_lock:
|
|
if self.state == STATE_STOPPED:
|
|
if jobstore:
|
|
self._pending_jobs = [
|
|
pending
|
|
for pending in self._pending_jobs
|
|
if pending[1] != jobstore
|
|
]
|
|
else:
|
|
self._pending_jobs = []
|
|
else:
|
|
for alias, store in self._jobstores.items():
|
|
if jobstore in (None, alias):
|
|
store.remove_all_jobs()
|
|
|
|
self._dispatch_event(SchedulerEvent(EVENT_ALL_JOBS_REMOVED, jobstore))
|
|
|
|
def print_jobs(self, jobstore=None, out=None):
|
|
"""
|
|
print_jobs(jobstore=None, out=sys.stdout)
|
|
|
|
Prints out a textual listing of all jobs currently scheduled on either all job stores or
|
|
just a specific one.
|
|
|
|
:param str|unicode jobstore: alias of the job store, ``None`` to list jobs from all stores
|
|
:param file out: a file-like object to print to (defaults to **sys.stdout** if nothing is
|
|
given)
|
|
|
|
"""
|
|
out = out or sys.stdout
|
|
with self._jobstores_lock:
|
|
if self.state == STATE_STOPPED:
|
|
print("Pending jobs:", file=out)
|
|
if self._pending_jobs:
|
|
for job, jobstore_alias, replace_existing in self._pending_jobs:
|
|
if jobstore in (None, jobstore_alias):
|
|
print(f" {job}", file=out)
|
|
else:
|
|
print(" No pending jobs", file=out)
|
|
else:
|
|
for alias, store in sorted(self._jobstores.items()):
|
|
if jobstore in (None, alias):
|
|
print(f"Jobstore {alias}:", file=out)
|
|
jobs = store.get_all_jobs()
|
|
if jobs:
|
|
for job in jobs:
|
|
print(f" {job}", file=out)
|
|
else:
|
|
print(" No scheduled jobs", file=out)
|
|
|
|
def export_jobs(self, outfile, jobstore=None):
|
|
"""
|
|
Export stored jobs as JSON.
|
|
|
|
:param outfile: either a file object opened in text write mode ("w"), or a path
|
|
to the target file
|
|
:param jobstore: alias of the job store to export jobs from (if omitted, export
|
|
from all configured job stores)
|
|
|
|
"""
|
|
import json
|
|
import pickle
|
|
from base64 import b64encode
|
|
|
|
from apscheduler import version
|
|
|
|
if self.state == STATE_STOPPED:
|
|
raise RuntimeError(
|
|
"the scheduler must have been started for job export to work"
|
|
)
|
|
|
|
def encode_with_pickle(obj):
|
|
return b64encode(pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)).decode("ascii")
|
|
|
|
def json_default(obj):
|
|
if hasattr(obj, "__getstate__") and hasattr(obj, "__setstate__"):
|
|
state = obj.__getstate__()
|
|
if isinstance(state, Mapping):
|
|
return {
|
|
"__apscheduler_class__": obj_to_ref(obj.__class__),
|
|
"__apscheduler_state__": state,
|
|
}
|
|
|
|
return {"__apscheduler_pickle__": encode_with_pickle(obj)}
|
|
|
|
with self._jobstores_lock:
|
|
all_jobs = [
|
|
job
|
|
for alias, store in self._jobstores.items()
|
|
for job in store.get_all_jobs()
|
|
if jobstore in (None, alias)
|
|
]
|
|
|
|
with ExitStack() as stack:
|
|
if not hasattr(outfile, "write"):
|
|
outfile = stack.enter_context(open(outfile, "w"))
|
|
|
|
json.dump(
|
|
{
|
|
"version": 1,
|
|
"scheduler_version": version,
|
|
"jobs": [job.__getstate__() for job in all_jobs],
|
|
},
|
|
outfile,
|
|
default=json_default,
|
|
)
|
|
|
|
def import_jobs(self, infile, jobstore="default"):
|
|
"""
|
|
Import jobs previously exported via :meth:`export_jobs.
|
|
|
|
:param infile: either a file object opened in text read mode ("r") or a path to
|
|
a JSON file containing previously exported jobs
|
|
:param jobstore: the alias of the job store to import the jobs to
|
|
|
|
"""
|
|
import json
|
|
import pickle
|
|
from base64 import b64decode
|
|
|
|
def json_object_hook(dct):
|
|
if pickle_data := dct.get("__apscheduler_pickle__"):
|
|
return pickle.loads(b64decode(pickle_data))
|
|
|
|
if obj_class := dct.get("__apscheduler_class__"):
|
|
if obj_state := dct.get("__apscheduler_state__"):
|
|
obj_class = ref_to_obj(obj_class)
|
|
obj = object.__new__(obj_class)
|
|
obj.__setstate__(obj_state)
|
|
return obj
|
|
|
|
return dct
|
|
|
|
jobstore = self._jobstores[jobstore]
|
|
with ExitStack() as stack:
|
|
if not hasattr(infile, "read"):
|
|
infile = stack.enter_context(open(infile))
|
|
|
|
data = json.load(infile, object_hook=json_object_hook)
|
|
if not isinstance(data, dict):
|
|
raise ValueError()
|
|
|
|
if (version := data.get("version", None)) != 1:
|
|
raise ValueError(f"unrecognized version: {version}")
|
|
|
|
for job_state in data["jobs"]:
|
|
job = object.__new__(Job)
|
|
job.__setstate__(job_state)
|
|
jobstore.add_job(job)
|
|
|
|
@abstractmethod
|
|
def wakeup(self):
|
|
"""
|
|
Notifies the scheduler that there may be jobs due for execution.
|
|
Triggers :meth:`_process_jobs` to be run in an implementation specific manner.
|
|
"""
|
|
|
|
#
|
|
# Private API
|
|
#
|
|
|
|
def _configure(self, config):
|
|
# Set general options
|
|
self._logger = maybe_ref(config.pop("logger", None)) or getLogger(
|
|
"apscheduler.scheduler"
|
|
)
|
|
self.timezone = astimezone(config.pop("timezone", None)) or get_localzone()
|
|
self.jobstore_retry_interval = float(config.pop("jobstore_retry_interval", 10))
|
|
|
|
# Set the job defaults
|
|
job_defaults = config.get("job_defaults", {})
|
|
self._job_defaults = {
|
|
"misfire_grace_time": asint(job_defaults.get("misfire_grace_time", 1)),
|
|
"coalesce": asbool(job_defaults.get("coalesce", True)),
|
|
"max_instances": asint(job_defaults.get("max_instances", 1)),
|
|
}
|
|
|
|
# Configure executors
|
|
self._executors.clear()
|
|
for alias, value in config.get("executors", {}).items():
|
|
if isinstance(value, BaseExecutor):
|
|
self.add_executor(value, alias)
|
|
elif isinstance(value, MutableMapping):
|
|
executor_class = value.pop("class", None)
|
|
plugin = value.pop("type", None)
|
|
if plugin:
|
|
executor = self._create_plugin_instance("executor", plugin, value)
|
|
elif executor_class:
|
|
cls = maybe_ref(executor_class)
|
|
executor = cls(**value)
|
|
else:
|
|
raise ValueError(
|
|
f'Cannot create executor "{alias}" -- either "type" or "class" must be defined'
|
|
)
|
|
|
|
self.add_executor(executor, alias)
|
|
else:
|
|
raise TypeError(
|
|
f"Expected executor instance or dict for executors['{alias}'], got {value.__class__.__name__} instead"
|
|
)
|
|
|
|
# Configure job stores
|
|
self._jobstores.clear()
|
|
for alias, value in config.get("jobstores", {}).items():
|
|
if isinstance(value, BaseJobStore):
|
|
self.add_jobstore(value, alias)
|
|
elif isinstance(value, MutableMapping):
|
|
jobstore_class = value.pop("class", None)
|
|
plugin = value.pop("type", None)
|
|
if plugin:
|
|
jobstore = self._create_plugin_instance("jobstore", plugin, value)
|
|
elif jobstore_class:
|
|
cls = maybe_ref(jobstore_class)
|
|
jobstore = cls(**value)
|
|
else:
|
|
raise ValueError(
|
|
f'Cannot create job store "{alias}" -- either "type" or "class" must be '
|
|
"defined"
|
|
)
|
|
|
|
self.add_jobstore(jobstore, alias)
|
|
else:
|
|
raise TypeError(
|
|
f"Expected job store instance or dict for jobstores['{alias}'], got {value.__class__.__name__} instead"
|
|
)
|
|
|
|
def _create_default_executor(self):
|
|
"""Creates a default executor store, specific to the particular scheduler type."""
|
|
return ThreadPoolExecutor()
|
|
|
|
def _create_default_jobstore(self):
|
|
"""Creates a default job store, specific to the particular scheduler type."""
|
|
return MemoryJobStore()
|
|
|
|
def _lookup_executor(self, alias):
|
|
"""
|
|
Returns the executor instance by the given name from the list of executors that were added
|
|
to this scheduler.
|
|
|
|
:type alias: str
|
|
:raises KeyError: if no executor by the given alias is not found
|
|
|
|
"""
|
|
try:
|
|
return self._executors[alias]
|
|
except KeyError:
|
|
raise KeyError(f"No such executor: {alias}")
|
|
|
|
def _lookup_jobstore(self, alias):
|
|
"""
|
|
Returns the job store instance by the given name from the list of job stores that were
|
|
added to this scheduler.
|
|
|
|
:type alias: str
|
|
:raises KeyError: if no job store by the given alias is not found
|
|
|
|
"""
|
|
try:
|
|
return self._jobstores[alias]
|
|
except KeyError:
|
|
raise KeyError(f"No such job store: {alias}")
|
|
|
|
def _lookup_job(self, job_id, jobstore_alias):
|
|
"""
|
|
Finds a job by its ID.
|
|
|
|
:type job_id: str
|
|
:param str jobstore_alias: alias of a job store to look in
|
|
:return tuple[Job, str]: a tuple of job, jobstore alias (jobstore alias is None in case of
|
|
a pending job)
|
|
:raises JobLookupError: if no job by the given ID is found.
|
|
|
|
"""
|
|
if self.state == STATE_STOPPED:
|
|
# Check if the job is among the pending jobs
|
|
for job, alias, replace_existing in self._pending_jobs:
|
|
if job.id == job_id:
|
|
return job, None
|
|
else:
|
|
# Look in all job stores
|
|
for alias, store in self._jobstores.items():
|
|
if jobstore_alias in (None, alias):
|
|
job = store.lookup_job(job_id)
|
|
if job is not None:
|
|
return job, alias
|
|
|
|
raise JobLookupError(job_id)
|
|
|
|
def _dispatch_event(self, event):
|
|
"""
|
|
Dispatches the given event to interested listeners.
|
|
|
|
:param SchedulerEvent event: the event to send
|
|
|
|
"""
|
|
with self._listeners_lock:
|
|
listeners = tuple(self._listeners)
|
|
|
|
for cb, mask in listeners:
|
|
if event.code & mask:
|
|
try:
|
|
cb(event)
|
|
except BaseException:
|
|
self._logger.exception("Error notifying listener")
|
|
|
|
def _check_uwsgi(self):
|
|
"""Check if we're running under uWSGI with threads disabled."""
|
|
uwsgi_module = sys.modules.get("uwsgi")
|
|
if not getattr(uwsgi_module, "has_threads", True):
|
|
raise RuntimeError(
|
|
"The scheduler seems to be running under uWSGI, but threads have "
|
|
"been disabled. You must run uWSGI with the --enable-threads "
|
|
"option for the scheduler to work."
|
|
)
|
|
|
|
def _real_add_job(self, job, jobstore_alias, replace_existing):
|
|
"""
|
|
:param Job job: the job to add
|
|
:param bool replace_existing: ``True`` to use update_job() in case the job already exists
|
|
in the store
|
|
|
|
"""
|
|
# Fill in undefined values with defaults
|
|
replacements = {}
|
|
for key, value in self._job_defaults.items():
|
|
if not hasattr(job, key):
|
|
replacements[key] = value
|
|
|
|
# Calculate the next run time if there is none defined
|
|
if not hasattr(job, "next_run_time"):
|
|
now = datetime.now(self.timezone)
|
|
replacements["next_run_time"] = job.trigger.get_next_fire_time(None, now)
|
|
|
|
# Apply any replacements
|
|
job._modify(**replacements)
|
|
|
|
# Add the job to the given job store
|
|
store = self._lookup_jobstore(jobstore_alias)
|
|
try:
|
|
store.add_job(job)
|
|
except ConflictingIdError:
|
|
if replace_existing:
|
|
store.update_job(job)
|
|
else:
|
|
raise
|
|
|
|
# Mark the job as no longer pending
|
|
job._jobstore_alias = jobstore_alias
|
|
|
|
# Notify listeners that a new job has been added
|
|
event = JobEvent(EVENT_JOB_ADDED, job.id, jobstore_alias)
|
|
self._dispatch_event(event)
|
|
|
|
self._logger.info('Added job "%s" to job store "%s"', job.name, jobstore_alias)
|
|
|
|
# Notify the scheduler about the new job
|
|
if self.state == STATE_RUNNING:
|
|
self.wakeup()
|
|
|
|
def _create_plugin_instance(self, type_, alias, constructor_kwargs):
|
|
"""Creates an instance of the given plugin type, loading the plugin first if necessary."""
|
|
plugin_container, class_container, base_class = {
|
|
"trigger": (self._trigger_plugins, self._trigger_classes, BaseTrigger),
|
|
"jobstore": (self._jobstore_plugins, self._jobstore_classes, BaseJobStore),
|
|
"executor": (self._executor_plugins, self._executor_classes, BaseExecutor),
|
|
}[type_]
|
|
|
|
try:
|
|
plugin_cls = class_container[alias]
|
|
except KeyError:
|
|
if alias in plugin_container:
|
|
plugin_cls = class_container[alias] = plugin_container[alias].load()
|
|
if not issubclass(plugin_cls, base_class):
|
|
raise TypeError(
|
|
f"The {type_} entry point does not point to a {type_} class"
|
|
)
|
|
else:
|
|
raise LookupError(f'No {type_} by the name "{alias}" was found')
|
|
|
|
return plugin_cls(**constructor_kwargs)
|
|
|
|
def _create_trigger(self, trigger, trigger_args):
|
|
if isinstance(trigger, BaseTrigger):
|
|
return trigger
|
|
elif trigger is None:
|
|
trigger = "date"
|
|
elif not isinstance(trigger, str):
|
|
raise TypeError(
|
|
f"Expected a trigger instance or string, got {trigger.__class__.__name__} instead"
|
|
)
|
|
|
|
# Use the scheduler's time zone if nothing else is specified
|
|
trigger_args.setdefault("timezone", self.timezone)
|
|
|
|
# Instantiate the trigger class
|
|
return self._create_plugin_instance("trigger", trigger, trigger_args)
|
|
|
|
def _create_lock(self):
|
|
"""Creates a reentrant lock object."""
|
|
return RLock()
|
|
|
|
def _process_jobs(self):
|
|
"""
|
|
Iterates through jobs in every jobstore, starts jobs that are due and figures out how long
|
|
to wait for the next round.
|
|
|
|
If the ``get_due_jobs()`` call raises an exception, a new wakeup is scheduled in at least
|
|
``jobstore_retry_interval`` seconds.
|
|
|
|
"""
|
|
if self.state == STATE_PAUSED:
|
|
self._logger.debug("Scheduler is paused -- not processing jobs")
|
|
return None
|
|
|
|
self._logger.debug("Looking for jobs to run")
|
|
now = datetime.now(self.timezone)
|
|
next_wakeup_time = None
|
|
events = []
|
|
|
|
with self._jobstores_lock:
|
|
for jobstore_alias, jobstore in self._jobstores.items():
|
|
try:
|
|
due_jobs = jobstore.get_due_jobs(now)
|
|
except Exception as e:
|
|
# Schedule a wakeup at least in jobstore_retry_interval seconds
|
|
self._logger.warning(
|
|
"Error getting due jobs from job store %r: %s",
|
|
jobstore_alias,
|
|
e,
|
|
)
|
|
retry_wakeup_time = now + timedelta(
|
|
seconds=self.jobstore_retry_interval
|
|
)
|
|
if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
|
|
next_wakeup_time = retry_wakeup_time
|
|
|
|
continue
|
|
|
|
for job in due_jobs:
|
|
# Look up the job's executor
|
|
try:
|
|
executor = self._lookup_executor(job.executor)
|
|
except BaseException:
|
|
self._logger.error(
|
|
'Executor lookup ("%s") failed for job "%s" -- removing it from the '
|
|
"job store",
|
|
job.executor,
|
|
job,
|
|
)
|
|
self.remove_job(job.id, jobstore_alias)
|
|
continue
|
|
|
|
run_times = job._get_run_times(now)
|
|
run_times = (
|
|
run_times[-1:] if run_times and job.coalesce else run_times
|
|
)
|
|
if run_times:
|
|
try:
|
|
executor.submit_job(job, run_times)
|
|
except MaxInstancesReachedError:
|
|
self._logger.warning(
|
|
'Execution of job "%s" skipped: maximum number of running '
|
|
"instances reached (%d)",
|
|
job,
|
|
job.max_instances,
|
|
)
|
|
event = JobSubmissionEvent(
|
|
EVENT_JOB_MAX_INSTANCES,
|
|
job.id,
|
|
jobstore_alias,
|
|
run_times,
|
|
)
|
|
events.append(event)
|
|
except BaseException:
|
|
self._logger.exception(
|
|
'Error submitting job "%s" to executor "%s"',
|
|
job,
|
|
job.executor,
|
|
)
|
|
else:
|
|
event = JobSubmissionEvent(
|
|
EVENT_JOB_SUBMITTED, job.id, jobstore_alias, run_times
|
|
)
|
|
events.append(event)
|
|
|
|
# Update the job if it has a next execution time.
|
|
# Otherwise remove it from the job store.
|
|
job_next_run = job.trigger.get_next_fire_time(
|
|
run_times[-1], now
|
|
)
|
|
if job_next_run:
|
|
job._modify(next_run_time=job_next_run)
|
|
jobstore.update_job(job)
|
|
else:
|
|
self.remove_job(job.id, jobstore_alias)
|
|
|
|
# Set a new next wakeup time if there isn't one yet or
|
|
# the jobstore has an even earlier one
|
|
jobstore_next_run_time = jobstore.get_next_run_time()
|
|
if jobstore_next_run_time and (
|
|
next_wakeup_time is None
|
|
or jobstore_next_run_time < next_wakeup_time
|
|
):
|
|
next_wakeup_time = jobstore_next_run_time.astimezone(self.timezone)
|
|
|
|
# Dispatch collected events
|
|
for event in events:
|
|
self._dispatch_event(event)
|
|
|
|
# Determine the delay until this method should be called again
|
|
if self.state == STATE_PAUSED:
|
|
wait_seconds = None
|
|
self._logger.debug("Scheduler is paused; waiting until resume() is called")
|
|
elif next_wakeup_time is None:
|
|
wait_seconds = None
|
|
self._logger.debug("No jobs; waiting until a job is added")
|
|
else:
|
|
now = datetime.now(self.timezone)
|
|
wait_seconds = min(
|
|
max((next_wakeup_time - now).total_seconds(), 0), TIMEOUT_MAX
|
|
)
|
|
self._logger.debug(
|
|
"Next wakeup is due at %s (in %f seconds)",
|
|
next_wakeup_time,
|
|
wait_seconds,
|
|
)
|
|
|
|
return wait_seconds
|