]> source.dussan.org Git - archiva.git/blob
ce02452a6c1435d5e34418f16c3ca9292cac829c
[archiva.git] /
1 package org.apache.archiva.scheduler.repository;
2
3 /*
4  * Licensed to the Apache Software Foundation (ASF) under one
5  * or more contributor license agreements.  See the NOTICE file
6  * distributed with this work for additional information
7  * regarding copyright ownership.  The ASF licenses this file
8  * to you under the Apache License, Version 2.0 (the
9  * "License"); you may not use this file except in compliance
10  * with the License.  You may obtain a copy of the License at
11  *
12  *   http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing,
15  * software distributed under the License is distributed on an
16  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17  * KIND, either express or implied.  See the License for the
18  * specific language governing permissions and limitations
19  * under the License.
20  */
21
22 import org.apache.archiva.metadata.repository.MetadataRepositoryException;
23 import org.apache.archiva.metadata.repository.stats.RepositoryStatisticsManager;
24 import org.apache.archiva.scheduler.ArchivaTaskScheduler;
25 import org.apache.maven.archiva.common.ArchivaException;
26 import org.apache.maven.archiva.configuration.ArchivaConfiguration;
27 import org.apache.maven.archiva.configuration.ConfigurationEvent;
28 import org.apache.maven.archiva.configuration.ConfigurationListener;
29 import org.apache.maven.archiva.configuration.ManagedRepositoryConfiguration;
30 import org.codehaus.plexus.personality.plexus.lifecycle.phase.Startable;
31 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StartingException;
32 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StoppingException;
33 import org.codehaus.plexus.scheduler.CronExpressionValidator;
34 import org.codehaus.plexus.scheduler.Scheduler;
35 import org.codehaus.plexus.taskqueue.TaskQueue;
36 import org.codehaus.plexus.taskqueue.TaskQueueException;
37 import org.quartz.CronTrigger;
38 import org.quartz.JobDataMap;
39 import org.quartz.JobDetail;
40 import org.quartz.SchedulerException;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import java.text.ParseException;
45 import java.util.ArrayList;
46 import java.util.HashSet;
47 import java.util.List;
48 import java.util.Set;
49
50 /**
51  * Default implementation of a scheduling component for archiva.
52  *
53  * @plexus.component role="org.apache.archiva.scheduler.ArchivaTaskScheduler" role-hint="repository"
54  */
55 public class RepositoryArchivaTaskScheduler
56     implements ArchivaTaskScheduler<RepositoryTask>, Startable, ConfigurationListener
57 {
58     private Logger log = LoggerFactory.getLogger( RepositoryArchivaTaskScheduler.class );
59
60     /**
61      * @plexus.requirement
62      */
63     private Scheduler scheduler;
64
65     /**
66      * @plexus.requirement role-hint="repository-scanning"
67      */
68     private TaskQueue repositoryScanningQueue;
69
70     /**
71      * @plexus.requirement
72      */
73     private ArchivaConfiguration archivaConfiguration;
74
75     /**
76      * @plexus.requirement
77      */
78     private RepositoryStatisticsManager repositoryStatisticsManager;
79
80     private static final String REPOSITORY_SCAN_GROUP = "rg";
81
82     private static final String REPOSITORY_JOB = "rj";
83
84     private static final String REPOSITORY_JOB_TRIGGER = "rjt";
85
86     static final String TASK_QUEUE = "TASK_QUEUE";
87
88     static final String TASK_REPOSITORY = "TASK_REPOSITORY";
89
90     public static final String CRON_HOURLY = "0 0 * * * ?";
91
92     private Set<String> jobs = new HashSet<String>();
93
94     private List<String> queuedRepos = new ArrayList<String>();
95
96     public void startup()
97         throws ArchivaException
98     {
99         archivaConfiguration.addListener( this );
100
101         try
102         {
103             start();
104         }
105         catch ( StartingException e )
106         {
107             throw new ArchivaException( e.getMessage(), e );
108         }
109     }
110
111     public void start()
112         throws StartingException
113     {
114         List<ManagedRepositoryConfiguration> repositories =
115             archivaConfiguration.getConfiguration().getManagedRepositories();
116
117         for ( ManagedRepositoryConfiguration repoConfig : repositories )
118         {
119             if ( repoConfig.isScanned() )
120             {
121                 try
122                 {
123                     scheduleRepositoryJobs( repoConfig );
124                 }
125                 catch ( SchedulerException e )
126                 {
127                     throw new StartingException( "Unable to start scheduler: " + e.getMessage(), e );
128                 }
129
130                 try
131                 {
132                     if ( !isPreviouslyScanned( repoConfig ) )
133                     {
134                         queueInitialRepoScan( repoConfig );
135                     }
136                 }
137                 catch ( MetadataRepositoryException e )
138                 {
139                     log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: " +
140                                   e.getMessage(), e );
141                 }
142             }
143         }
144     }
145
146     public void stop()
147         throws StoppingException
148     {
149         try
150         {
151             for ( String job : jobs )
152             {
153                 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
154             }
155             jobs.clear();
156             queuedRepos.clear();
157         }
158         catch ( SchedulerException e )
159         {
160             throw new StoppingException( "Unable to unschedule tasks", e );
161         }
162     }
163
164     @SuppressWarnings( "unchecked" )
165     public boolean isProcessingRepositoryTask( String repositoryId )
166     {
167         synchronized ( repositoryScanningQueue )
168         {
169             List<RepositoryTask> queue = null;
170
171             try
172             {
173                 queue = repositoryScanningQueue.getQueueSnapshot();
174             }
175             catch ( TaskQueueException e )
176             {
177                 // not possible with plexus-taskqueue implementation, ignore
178             }
179
180             for ( RepositoryTask queuedTask : queue )
181             {
182                 if ( queuedTask.getRepositoryId().equals( repositoryId ) )
183                 {
184                     return true;
185                 }
186             }
187             return false;
188         }
189     }
190
191     @SuppressWarnings( "unchecked" )
192     private boolean isProcessingRepositoryTask( RepositoryTask task )
193     {
194         synchronized ( repositoryScanningQueue )
195         {
196             List<RepositoryTask> queue = null;
197
198             try
199             {
200                 queue = repositoryScanningQueue.getQueueSnapshot();
201             }
202             catch ( TaskQueueException e )
203             {
204                 // not possible with plexus-taskqueue implementation, ignore
205             }
206
207             for ( RepositoryTask queuedTask : queue )
208             {
209                 if ( task.equals( queuedTask ) )
210                 {
211                     return true;
212                 }
213             }
214             return false;
215         }
216     }
217
218     public void queueTask( RepositoryTask task )
219         throws TaskQueueException
220     {
221         synchronized ( repositoryScanningQueue )
222         {
223             if ( isProcessingRepositoryTask( task ) )
224             {
225                 log.debug( "Repository task '" + task + "' is already queued. Skipping task." );
226             }
227             else
228             {
229                 // add check if the task is already queued if it is a file scan
230                 repositoryScanningQueue.put( task );
231             }
232         }
233     }
234
235     public void configurationEvent( ConfigurationEvent event )
236     {
237         if ( event.getType() == ConfigurationEvent.SAVED )
238         {
239             for ( String job : jobs )
240             {
241                 try
242                 {
243                     scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
244                 }
245                 catch ( SchedulerException e )
246                 {
247                     log.error( "Error restarting the repository scanning job after property change." );
248                 }
249             }
250             jobs.clear();
251
252             List<ManagedRepositoryConfiguration> repositories =
253                 archivaConfiguration.getConfiguration().getManagedRepositories();
254
255             for ( ManagedRepositoryConfiguration repoConfig : repositories )
256             {
257                 if ( repoConfig.getRefreshCronExpression() != null )
258                 {
259                     try
260                     {
261                         scheduleRepositoryJobs( repoConfig );
262                     }
263                     catch ( SchedulerException e )
264                     {
265                         log.error( "error restarting job: " + REPOSITORY_JOB + ":" + repoConfig.getId() );
266                     }
267                 }
268             }
269         }
270     }
271
272     @SuppressWarnings( "unchecked" )
273     private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig )
274         throws MetadataRepositoryException
275     {
276         return repositoryStatisticsManager.getLastStatistics( repoConfig.getId() ) != null;
277     }
278
279     // MRM-848: Pre-configured repository initially appear to be empty
280     private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
281     {
282         String repoId = repoConfig.getId();
283         RepositoryTask task = new RepositoryTask();
284         task.setRepositoryId( repoId );
285
286         if ( !queuedRepos.contains( repoId ) )
287         {
288             log.info( "Repository [" + repoId + "] is queued to be scanned as it hasn't been previously." );
289
290             try
291             {
292                 queuedRepos.add( repoConfig.getId() );
293                 this.queueTask( task );
294             }
295             catch ( TaskQueueException e )
296             {
297                 log.error( "Error occurred while queueing repository [" + repoId + "] task : " + e.getMessage() );
298             }
299         }
300     }
301
302     private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
303         throws SchedulerException
304     {
305         if ( repoConfig.getRefreshCronExpression() == null )
306         {
307             log.warn( "Skipping job, no cron expression for " + repoConfig.getId() );
308             return;
309         }
310
311         if ( !repoConfig.isScanned() )
312         {
313             log.warn( "Skipping job, repository scannable has been disabled for " + repoConfig.getId() );
314             return;
315         }
316
317         // get the cron string for these database scanning jobs
318         String cronString = repoConfig.getRefreshCronExpression();
319
320         CronExpressionValidator cronValidator = new CronExpressionValidator();
321         if ( !cronValidator.validate( cronString ) )
322         {
323             log.warn( "Cron expression [" + cronString + "] for repository [" + repoConfig.getId() +
324                           "] is invalid.  Defaulting to hourly." );
325             cronString = CRON_HOURLY;
326         }
327
328         // setup the unprocessed artifact job
329         JobDetail repositoryJob = new JobDetail( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP,
330                                                  RepositoryTaskJob.class );
331
332         JobDataMap dataMap = new JobDataMap();
333         dataMap.put( TASK_QUEUE, repositoryScanningQueue );
334         dataMap.put( TASK_REPOSITORY, repoConfig.getId() );
335         repositoryJob.setJobDataMap( dataMap );
336
337         try
338         {
339             CronTrigger trigger = new CronTrigger( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(),
340                                                    REPOSITORY_SCAN_GROUP, cronString );
341
342             jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
343             scheduler.scheduleJob( repositoryJob, trigger );
344         }
345         catch ( ParseException e )
346         {
347             log.error( "ParseException in repository scanning cron expression, disabling repository scanning for '" +
348                            repoConfig.getId() + "': " + e.getMessage() );
349         }
350
351     }
352 }