]> source.dussan.org Git - archiva.git/blob
fd3fd09016858c93d39b82e9210bfe1915f5ea29
[archiva.git] /
1 package org.apache.maven.archiva.scheduled;
2
3 /*
4  * Licensed to the Apache Software Foundation (ASF) under one
5  * or more contributor license agreements.  See the NOTICE file
6  * distributed with this work for additional information
7  * regarding copyright ownership.  The ASF licenses this file
8  * to you under the Apache License, Version 2.0 (the
9  * "License"); you may not use this file except in compliance
10  * with the License.  You may obtain a copy of the License at
11  *
12  *   http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing,
15  * software distributed under the License is distributed on an
16  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17  * KIND, either express or implied.  See the License for the
18  * specific language governing permissions and limitations
19  * under the License.
20  */
21
22 import org.apache.commons.collections.CollectionUtils;
23 import org.apache.maven.archiva.common.ArchivaException;
24 import org.apache.maven.archiva.configuration.ArchivaConfiguration;
25 import org.apache.maven.archiva.configuration.ConfigurationEvent;
26 import org.apache.maven.archiva.configuration.ConfigurationListener;
27 import org.apache.maven.archiva.configuration.ManagedRepositoryConfiguration;
28 import org.apache.maven.archiva.scheduled.tasks.ArchivaTask;
29 import org.apache.maven.archiva.scheduled.tasks.DatabaseTask;
30 import org.apache.maven.archiva.scheduled.tasks.RepositoryTask;
31 import org.apache.maven.archiva.scheduled.tasks.RepositoryTaskSelectionPredicate;
32 import org.codehaus.plexus.personality.plexus.lifecycle.phase.Startable;
33 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StartingException;
34 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StoppingException;
35 import org.codehaus.plexus.scheduler.CronExpressionValidator;
36 import org.codehaus.plexus.scheduler.Scheduler;
37 import org.codehaus.plexus.taskqueue.Task;
38 import org.codehaus.plexus.taskqueue.TaskQueue;
39 import org.codehaus.plexus.taskqueue.TaskQueueException;
40 import org.codehaus.plexus.taskqueue.execution.TaskExecutionException;
41 import org.quartz.CronTrigger;
42 import org.quartz.JobDataMap;
43 import org.quartz.JobDetail;
44 import org.quartz.SchedulerException;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 import java.text.ParseException;
49 import java.util.HashSet;
50 import java.util.List;
51 import java.util.Set;
52
53 /**
54  * Default implementation of a scheduling component for archiva.
55  *
56  * @author <a href="mailto:brett@apache.org">Brett Porter</a>
57  * @author <a href="mailto:jmcconnell@apache.org">Jesse McConnell</a>
58  * @plexus.component role="org.apache.maven.archiva.scheduled.ArchivaTaskScheduler" role-hint="default"
59  */
60 public class DefaultArchivaTaskScheduler
61     implements ArchivaTaskScheduler, Startable, ConfigurationListener
62 {
63     private Logger log = LoggerFactory.getLogger( DefaultArchivaTaskScheduler.class );
64     
65     /**
66      * @plexus.requirement
67      */
68     private Scheduler scheduler;
69
70     /**
71      * @plexus.requirement role-hint="database-update"
72      */
73     private TaskQueue databaseUpdateQueue;
74
75     /**
76      * @plexus.requirement role-hint="repository-scanning"
77      */
78     private TaskQueue repositoryScanningQueue;
79
80     /**
81      * @plexus.requirement
82      */
83     private ArchivaConfiguration archivaConfiguration;
84
85     public static final String DATABASE_SCAN_GROUP = "database-group";
86
87     public static final String DATABASE_JOB = "database-job";
88
89     public static final String DATABASE_JOB_TRIGGER = "database-job-trigger";
90
91     public static final String REPOSITORY_SCAN_GROUP = "repository-group";
92
93     public static final String REPOSITORY_JOB = "repository-job";
94
95     public static final String REPOSITORY_JOB_TRIGGER = "repository-job-trigger";
96
97     public static final String CRON_HOURLY = "0 0 * * * ?";
98
99     private Set<String> jobs = new HashSet<String>();
100
101     public void startup()
102         throws ArchivaException
103     {
104         archivaConfiguration.addListener( this );
105
106         try
107         {
108             start();
109         }
110         catch ( StartingException e )
111         {
112             throw new ArchivaException( e.getMessage(), e );
113         }
114     }
115     
116     public void start()
117         throws StartingException
118     {
119         try
120         {
121             List<ManagedRepositoryConfiguration> repositories = archivaConfiguration.getConfiguration()
122                 .getManagedRepositories();
123
124             for ( ManagedRepositoryConfiguration repoConfig : repositories )
125             {
126                 if ( repoConfig.isScanned() )
127                 {
128                     scheduleRepositoryJobs( repoConfig );
129                 }
130             }
131
132             scheduleDatabaseJobs();
133         }
134         catch ( SchedulerException e )
135         {
136             throw new StartingException( "Unable to start scheduler: " + e.getMessage(), e );
137         }
138     }
139
140     private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
141         throws SchedulerException
142     {
143         if ( repoConfig.getRefreshCronExpression() == null )
144         {
145             log.warn( "Skipping job, no cron expression for " + repoConfig.getId() );
146             return;
147         }
148
149         // get the cron string for these database scanning jobs
150         String cronString = repoConfig.getRefreshCronExpression();
151
152         CronExpressionValidator cronValidator = new CronExpressionValidator();
153         if ( !cronValidator.validate( cronString ) )
154         {
155             log.warn( "Cron expression [" + cronString + "] for repository [" + repoConfig.getId() +
156                 "] is invalid.  Defaulting to hourly." );
157             cronString = CRON_HOURLY;
158         }
159
160         // setup the unprocessed artifact job
161         JobDetail repositoryJob =
162             new JobDetail( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, RepositoryTaskJob.class );
163
164         JobDataMap dataMap = new JobDataMap();
165         dataMap.put( RepositoryTaskJob.TASK_QUEUE, repositoryScanningQueue );
166         dataMap.put( RepositoryTaskJob.TASK_QUEUE_POLICY, ArchivaTask.QUEUE_POLICY_WAIT );
167         dataMap.put( RepositoryTaskJob.TASK_REPOSITORY, repoConfig.getId() );
168         repositoryJob.setJobDataMap( dataMap );
169
170         try
171         {
172             CronTrigger trigger =
173                 new CronTrigger( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, cronString );
174
175             jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
176             scheduler.scheduleJob( repositoryJob, trigger );
177         }
178         catch ( ParseException e )
179         {
180             log.error(
181                 "ParseException in repository scanning cron expression, disabling repository scanning for '" +
182                     repoConfig.getId() + "': " + e.getMessage() );
183         }
184
185     }
186
187     private synchronized void scheduleDatabaseJobs()
188         throws SchedulerException
189     {
190         String cronString = archivaConfiguration.getConfiguration().getDatabaseScanning().getCronExpression();
191
192         // setup the unprocessed artifact job
193         JobDetail databaseJob = new JobDetail( DATABASE_JOB, DATABASE_SCAN_GROUP, DatabaseTaskJob.class );
194
195         JobDataMap dataMap = new JobDataMap();
196         dataMap.put( DatabaseTaskJob.TASK_QUEUE, databaseUpdateQueue );
197         databaseJob.setJobDataMap( dataMap );
198
199         CronExpressionValidator cronValidator = new CronExpressionValidator();
200         if ( !cronValidator.validate( cronString ) )
201         {
202             log.warn(
203                 "Cron expression [" + cronString + "] for database update is invalid.  Defaulting to hourly." );
204             cronString = CRON_HOURLY;
205         }
206
207         try
208         {
209             CronTrigger trigger = new CronTrigger( DATABASE_JOB_TRIGGER, DATABASE_SCAN_GROUP, cronString );
210
211             scheduler.scheduleJob( databaseJob, trigger );
212         }
213         catch ( ParseException e )
214         {
215             log.error(
216                 "ParseException in database scanning cron expression, disabling database scanning: " + e.getMessage() );
217         }
218
219     }
220
221     public void stop()
222         throws StoppingException
223     {
224         try
225         {
226             scheduler.unscheduleJob( DATABASE_JOB, DATABASE_SCAN_GROUP );
227
228             for ( String job : jobs )
229             {
230                 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
231             }
232             jobs.clear();
233         }
234         catch ( SchedulerException e )
235         {
236             throw new StoppingException( "Unable to unschedule tasks", e );
237         }
238     }
239
240     public void scheduleDatabaseTasks()
241         throws TaskExecutionException
242     {
243         try
244         {
245             scheduleDatabaseJobs();
246         }
247         catch ( SchedulerException e )
248         {
249             throw new TaskExecutionException( "Unable to schedule repository jobs: " + e.getMessage(), e );
250
251         }
252     }
253
254     public boolean isProcessingAnyRepositoryTask()
255         throws ArchivaException
256     {
257         List<? extends Task> queue = null;
258
259         try
260         {
261             queue = repositoryScanningQueue.getQueueSnapshot();
262         }
263         catch ( TaskQueueException e )
264         {
265             throw new ArchivaException( "Unable to get repository scanning queue:" + e.getMessage(), e );
266         }
267
268         return !queue.isEmpty();
269     }
270
271     public boolean isProcessingRepositoryTask( String repositoryId )
272         throws ArchivaException
273     {
274         List<? extends Task> queue = null;
275
276         try
277         {
278             queue = repositoryScanningQueue.getQueueSnapshot();
279         }
280         catch ( TaskQueueException e )
281         {
282             throw new ArchivaException( "Unable to get repository scanning queue:" + e.getMessage(), e );
283         }
284
285         return CollectionUtils.exists( queue, new RepositoryTaskSelectionPredicate( repositoryId ) );
286     }
287
288     public boolean isProcessingDatabaseTask()
289         throws ArchivaException
290     {
291         List<? extends Task> queue = null;
292
293         try
294         {
295             queue = databaseUpdateQueue.getQueueSnapshot();
296         }
297         catch ( TaskQueueException e )
298         {
299             throw new ArchivaException( "Unable to get database update queue:" + e.getMessage(), e );
300         }
301
302         return !queue.isEmpty();
303     }
304
305     public void queueRepositoryTask( RepositoryTask task )
306         throws TaskQueueException
307     {
308         repositoryScanningQueue.put( task );
309     }
310
311     public void queueDatabaseTask( DatabaseTask task )
312         throws TaskQueueException
313     {
314         databaseUpdateQueue.put( task );
315     }
316
317     public void configurationEvent( ConfigurationEvent event )
318     {
319         if ( event.getType() == ConfigurationEvent.SAVED )
320         {
321             try
322             {
323                 scheduler.unscheduleJob( DATABASE_JOB, DATABASE_SCAN_GROUP );
324
325                 scheduleDatabaseJobs();
326             }
327             catch ( SchedulerException e )
328             {
329                 log.error( "Error restarting the database scanning job after property change." );
330             }
331
332             for ( String job : jobs )
333             {
334                 try
335                 {
336                     scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
337                 }
338                 catch ( SchedulerException e )
339                 {
340                     log.error( "Error restarting the repository scanning job after property change." );
341                 }
342             }
343             jobs.clear();
344
345             List<ManagedRepositoryConfiguration> repositories = archivaConfiguration.getConfiguration()
346             .getManagedRepositories();
347
348             for ( ManagedRepositoryConfiguration repoConfig : repositories )
349             {
350                 if ( repoConfig.getRefreshCronExpression() != null )
351                 {
352                     try
353                     {
354                         scheduleRepositoryJobs( repoConfig );
355                     }
356                     catch ( SchedulerException e )
357                     {
358                         log.error( "error restarting job: " + REPOSITORY_JOB + ":" + repoConfig.getId() );
359                     }
360                 }
361             }
362         }
363     }
364 }