]> source.dussan.org Git - archiva.git/blob
0fe4bf8cdfe8850e058132dd732fb0abf2778d5d
[archiva.git] /
1 package org.apache.archiva.scheduler.repository;
2
3 /*
4  * Licensed to the Apache Software Foundation (ASF) under one
5  * or more contributor license agreements.  See the NOTICE file
6  * distributed with this work for additional information
7  * regarding copyright ownership.  The ASF licenses this file
8  * to you under the Apache License, Version 2.0 (the
9  * "License"); you may not use this file except in compliance
10  * with the License.  You may obtain a copy of the License at
11  *
12  *   http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing,
15  * software distributed under the License is distributed on an
16  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17  * KIND, either express or implied.  See the License for the
18  * specific language governing permissions and limitations
19  * under the License.
20  */
21
22 import org.apache.archiva.metadata.repository.MetadataRepository;
23 import org.apache.archiva.metadata.repository.MetadataRepositoryException;
24 import org.apache.archiva.metadata.repository.RepositorySession;
25 import org.apache.archiva.metadata.repository.RepositorySessionFactory;
26 import org.apache.archiva.metadata.repository.stats.RepositoryStatisticsManager;
27 import org.apache.archiva.scheduler.ArchivaTaskScheduler;
28 import org.apache.maven.archiva.common.ArchivaException;
29 import org.apache.maven.archiva.configuration.ArchivaConfiguration;
30 import org.apache.maven.archiva.configuration.ConfigurationEvent;
31 import org.apache.maven.archiva.configuration.ConfigurationListener;
32 import org.apache.maven.archiva.configuration.ManagedRepositoryConfiguration;
33 import org.codehaus.plexus.taskqueue.TaskQueue;
34 import org.codehaus.plexus.taskqueue.TaskQueueException;
35 import org.codehaus.redback.components.scheduler.CronExpressionValidator;
36 import org.codehaus.redback.components.scheduler.Scheduler;
37 import org.quartz.CronTrigger;
38 import org.quartz.JobDataMap;
39 import org.quartz.JobDetail;
40 import org.quartz.SchedulerException;
41 import org.quartz.impl.JobDetailImpl;
42 import org.quartz.impl.triggers.CronTriggerImpl;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45 import org.springframework.stereotype.Service;
46
47 import javax.annotation.PostConstruct;
48 import javax.annotation.PreDestroy;
49 import javax.inject.Inject;
50 import javax.inject.Named;
51 import java.text.ParseException;
52 import java.util.ArrayList;
53 import java.util.HashSet;
54 import java.util.List;
55 import java.util.Set;
56
57 /**
58  * Default implementation of a scheduling component for archiva.
59  * <p/>
60  * plexus.component role="org.apache.archiva.scheduler.ArchivaTaskScheduler" role-hint="repository"
61  */
62 @Service( "archivaTaskScheduler#repository" )
63 public class RepositoryArchivaTaskScheduler
64     implements ArchivaTaskScheduler<RepositoryTask>, ConfigurationListener
65 {
66     private Logger log = LoggerFactory.getLogger( RepositoryArchivaTaskScheduler.class );
67
68     /**
69      * plexus.requirement
70      */
71     @Inject
72     private Scheduler scheduler;
73
74     @Inject
75     private CronExpressionValidator cronValidator;
76
77     /**
78      * plexus.requirement role-hint="repository-scanning"
79      */
80     @Inject
81     @Named( value = "taskQueue#repository-scanning" )
82     private TaskQueue repositoryScanningQueue;
83
84     /**
85      * plexus.requirement
86      */
87     @Inject
88     private ArchivaConfiguration archivaConfiguration;
89
90     /**
91      * plexus.requirement
92      */
93     @Inject
94     @Named( value = "repositoryStatisticsManager#default" )
95     private RepositoryStatisticsManager repositoryStatisticsManager;
96
97     /**
98      * TODO: could have multiple implementations
99      * <p/>
100      * plexus.requirement
101      */
102     @Inject
103     private RepositorySessionFactory repositorySessionFactory;
104
105     private static final String REPOSITORY_SCAN_GROUP = "rg";
106
107     private static final String REPOSITORY_JOB = "rj";
108
109     private static final String REPOSITORY_JOB_TRIGGER = "rjt";
110
111     static final String TASK_QUEUE = "TASK_QUEUE";
112
113     static final String TASK_REPOSITORY = "TASK_REPOSITORY";
114
115     public static final String CRON_HOURLY = "0 0 * * * ?";
116
117     private Set<String> jobs = new HashSet<String>();
118
119     private List<String> queuedRepos = new ArrayList<String>();
120
121     @PostConstruct
122     public void startup()
123         throws ArchivaException
124     {
125         archivaConfiguration.addListener( this );
126
127         List<ManagedRepositoryConfiguration> repositories =
128             archivaConfiguration.getConfiguration().getManagedRepositories();
129
130         RepositorySession repositorySession = repositorySessionFactory.createSession();
131         try
132         {
133             MetadataRepository metadataRepository = repositorySession.getRepository();
134             for ( ManagedRepositoryConfiguration repoConfig : repositories )
135             {
136                 if ( repoConfig.isScanned() )
137                 {
138                     try
139                     {
140                         scheduleRepositoryJobs( repoConfig );
141                     }
142                     catch ( SchedulerException e )
143                     {
144                         throw new ArchivaException( "Unable to start scheduler: " + e.getMessage(), e );
145                     }
146
147                     try
148                     {
149                         if ( !isPreviouslyScanned( repoConfig, metadataRepository ) )
150                         {
151                             queueInitialRepoScan( repoConfig );
152                         }
153                     }
154                     catch ( MetadataRepositoryException e )
155                     {
156                         log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: "
157                                       + e.getMessage(), e );
158                     }
159                 }
160             }
161         }
162         finally
163         {
164             repositorySession.close();
165         }
166
167     }
168
169
170     @PreDestroy
171     public void stop()
172         throws SchedulerException
173     {
174         for ( String job : jobs )
175         {
176             scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
177         }
178         jobs.clear();
179         queuedRepos.clear();
180
181     }
182
183     @SuppressWarnings( "unchecked" )
184     public boolean isProcessingRepositoryTask( String repositoryId )
185     {
186         synchronized ( repositoryScanningQueue )
187         {
188             List<RepositoryTask> queue = null;
189
190             try
191             {
192                 queue = repositoryScanningQueue.getQueueSnapshot();
193             }
194             catch ( TaskQueueException e )
195             {
196                 // not possible with plexus-taskqueue implementation, ignore
197             }
198
199             for ( RepositoryTask queuedTask : queue )
200             {
201                 if ( queuedTask.getRepositoryId().equals( repositoryId ) )
202                 {
203                     return true;
204                 }
205             }
206             return false;
207         }
208     }
209
210     @SuppressWarnings( "unchecked" )
211     private boolean isProcessingRepositoryTask( RepositoryTask task )
212     {
213         synchronized ( repositoryScanningQueue )
214         {
215             List<RepositoryTask> queue = null;
216
217             try
218             {
219                 queue = repositoryScanningQueue.getQueueSnapshot();
220             }
221             catch ( TaskQueueException e )
222             {
223                 // not possible with plexus-taskqueue implementation, ignore
224             }
225
226             for ( RepositoryTask queuedTask : queue )
227             {
228                 if ( task.equals( queuedTask ) )
229                 {
230                     return true;
231                 }
232             }
233             return false;
234         }
235     }
236
237     public void queueTask( RepositoryTask task )
238         throws TaskQueueException
239     {
240         synchronized ( repositoryScanningQueue )
241         {
242             if ( isProcessingRepositoryTask( task ) )
243             {
244                 log.debug( "Repository task '{}' is already queued. Skipping task.", task );
245             }
246             else
247             {
248                 // add check if the task is already queued if it is a file scan
249                 repositoryScanningQueue.put( task );
250             }
251         }
252     }
253
254     public boolean unQueueTask( RepositoryTask task )
255         throws TaskQueueException
256     {
257         synchronized ( repositoryScanningQueue )
258         {
259             if ( isProcessingRepositoryTask( task ) )
260             {
261                 log.info( "cannot unqueue Repository task '{}' not already queued.", task );
262                 return false;
263             }
264             else
265             {
266                 return repositoryScanningQueue.remove( task );
267             }
268         }
269     }
270     public void configurationEvent( ConfigurationEvent event )
271     {
272         if ( event.getType() == ConfigurationEvent.SAVED )
273         {
274             for ( String job : jobs )
275             {
276                 try
277                 {
278                     scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
279                 }
280                 catch ( SchedulerException e )
281                 {
282                     log.error( "Error restarting the repository scanning job after property change." );
283                 }
284             }
285             jobs.clear();
286
287             List<ManagedRepositoryConfiguration> repositories =
288                 archivaConfiguration.getConfiguration().getManagedRepositories();
289
290             for ( ManagedRepositoryConfiguration repoConfig : repositories )
291             {
292                 if ( repoConfig.getRefreshCronExpression() != null )
293                 {
294                     try
295                     {
296                         scheduleRepositoryJobs( repoConfig );
297                     }
298                     catch ( SchedulerException e )
299                     {
300                         log.error( "error restarting job: " + REPOSITORY_JOB + ":" + repoConfig.getId() );
301                     }
302                 }
303             }
304         }
305     }
306
307     @SuppressWarnings( "unchecked" )
308     private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig,
309                                          MetadataRepository metadataRepository )
310         throws MetadataRepositoryException
311     {
312         return repositoryStatisticsManager.getLastStatistics( metadataRepository, repoConfig.getId() ) != null;
313     }
314
315     // MRM-848: Pre-configured repository initially appear to be empty
316     private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
317     {
318         String repoId = repoConfig.getId();
319         RepositoryTask task = new RepositoryTask();
320         task.setRepositoryId( repoId );
321
322         if ( !queuedRepos.contains( repoId ) )
323         {
324             log.info( "Repository [" + repoId + "] is queued to be scanned as it hasn't been previously." );
325
326             try
327             {
328                 queuedRepos.add( repoConfig.getId() );
329                 this.queueTask( task );
330             }
331             catch ( TaskQueueException e )
332             {
333                 log.error( "Error occurred while queueing repository [" + repoId + "] task : " + e.getMessage() );
334             }
335         }
336     }
337
338     private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
339         throws SchedulerException
340     {
341         if ( repoConfig.getRefreshCronExpression() == null )
342         {
343             log.warn( "Skipping job, no cron expression for " + repoConfig.getId() );
344             return;
345         }
346
347         if ( !repoConfig.isScanned() )
348         {
349             log.warn( "Skipping job, repository scannable has been disabled for " + repoConfig.getId() );
350             return;
351         }
352
353         // get the cron string for these database scanning jobs
354         String cronString = repoConfig.getRefreshCronExpression();
355
356         if ( !cronValidator.validate( cronString ) )
357         {
358             log.warn( "Cron expression [" + cronString + "] for repository [" + repoConfig.getId()
359                           + "] is invalid.  Defaulting to hourly." );
360             cronString = CRON_HOURLY;
361         }
362
363         // setup the unprocessed artifact job
364         JobDetailImpl repositoryJob =
365             new JobDetailImpl( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, RepositoryTaskJob.class );
366
367         repositoryJob.getJobDataMap().put( TASK_QUEUE, repositoryScanningQueue );
368         repositoryJob.getJobDataMap().put( TASK_REPOSITORY, repoConfig.getId() );
369
370         try
371         {
372             CronTriggerImpl trigger =
373                 new CronTriggerImpl( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, cronString );
374
375             jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
376             scheduler.scheduleJob( repositoryJob, trigger );
377         }
378         catch ( ParseException e )
379         {
380             log.error( "ParseException in repository scanning cron expression, disabling repository scanning for '"
381                            + repoConfig.getId() + "': " + e.getMessage() );
382         }
383
384     }
385 }