]> source.dussan.org Git - archiva.git/blob
90a28a3fafea62f63059f3dc57c8050fb97e0c37
[archiva.git] /
1 package org.apache.archiva.scheduler.repository;
2
3 /*
4  * Licensed to the Apache Software Foundation (ASF) under one
5  * or more contributor license agreements.  See the NOTICE file
6  * distributed with this work for additional information
7  * regarding copyright ownership.  The ASF licenses this file
8  * to you under the Apache License, Version 2.0 (the
9  * "License"); you may not use this file except in compliance
10  * with the License.  You may obtain a copy of the License at
11  *
12  *   http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing,
15  * software distributed under the License is distributed on an
16  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17  * KIND, either express or implied.  See the License for the
18  * specific language governing permissions and limitations
19  * under the License.
20  */
21
22 import org.apache.archiva.common.ArchivaException;
23 import org.apache.archiva.configuration.ArchivaConfiguration;
24 import org.apache.archiva.configuration.ConfigurationEvent;
25 import org.apache.archiva.configuration.ConfigurationListener;
26 import org.apache.archiva.configuration.ManagedRepositoryConfiguration;
27 import org.apache.archiva.metadata.repository.MetadataRepository;
28 import org.apache.archiva.metadata.repository.MetadataRepositoryException;
29 import org.apache.archiva.metadata.repository.RepositorySession;
30 import org.apache.archiva.metadata.repository.RepositorySessionFactory;
31 import org.apache.archiva.metadata.repository.stats.RepositoryStatisticsManager;
32 import org.apache.archiva.redback.components.scheduler.CronExpressionValidator;
33 import org.apache.archiva.redback.components.scheduler.Scheduler;
34 import org.apache.archiva.redback.components.taskqueue.TaskQueue;
35 import org.apache.archiva.redback.components.taskqueue.TaskQueueException;
36 import org.apache.archiva.scheduler.ArchivaTaskScheduler;
37 import org.quartz.SchedulerException;
38 import org.quartz.impl.JobDetailImpl;
39 import org.quartz.impl.triggers.CronTriggerImpl;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import org.springframework.stereotype.Service;
43
44 import javax.annotation.PostConstruct;
45 import javax.annotation.PreDestroy;
46 import javax.inject.Inject;
47 import javax.inject.Named;
48 import java.text.ParseException;
49 import java.util.ArrayList;
50 import java.util.HashSet;
51 import java.util.List;
52 import java.util.Set;
53
54 /**
55  * Default implementation of a scheduling component for archiva.
56  */
57 @Service ("archivaTaskScheduler#repository")
58 public class DefaultRepositoryArchivaTaskScheduler
59     implements RepositoryArchivaTaskScheduler, ConfigurationListener
60 {
61     private Logger log = LoggerFactory.getLogger( getClass() );
62
63     /**
64      *
65      */
66     @Inject
67     private Scheduler scheduler;
68
69     @Inject
70     private CronExpressionValidator cronValidator;
71
72     /**
73      *
74      */
75     @Inject
76     @Named (value = "taskQueue#repository-scanning")
77     private TaskQueue repositoryScanningQueue;
78
79     /**
80      *
81      */
82     @Inject
83     private ArchivaConfiguration archivaConfiguration;
84
85     /**
86      *
87      */
88     @Inject
89     @Named (value = "repositoryStatisticsManager#default")
90     private RepositoryStatisticsManager repositoryStatisticsManager;
91
92     /**
93      * TODO: could have multiple implementations
94      */
95     @Inject
96     private RepositorySessionFactory repositorySessionFactory;
97
98     private static final String REPOSITORY_SCAN_GROUP = "rg";
99
100     private static final String REPOSITORY_JOB = "rj";
101
102     private static final String REPOSITORY_JOB_TRIGGER = "rjt";
103
104     static final String TASK_QUEUE = "TASK_QUEUE";
105
106     static final String TASK_REPOSITORY = "TASK_REPOSITORY";
107
108     public static final String CRON_HOURLY = "0 0 * * * ?";
109
110     private Set<String> jobs = new HashSet<String>();
111
112     private List<String> queuedRepos = new ArrayList<String>();
113
114     @PostConstruct
115     public void startup()
116         throws ArchivaException
117     {
118         archivaConfiguration.addListener( this );
119
120         List<ManagedRepositoryConfiguration> repositories =
121             archivaConfiguration.getConfiguration().getManagedRepositories();
122
123         RepositorySession repositorySession = repositorySessionFactory.createSession();
124         try
125         {
126             MetadataRepository metadataRepository = repositorySession.getRepository();
127             for ( ManagedRepositoryConfiguration repoConfig : repositories )
128             {
129                 if ( repoConfig.isScanned() )
130                 {
131                     try
132                     {
133                         scheduleRepositoryJobs( repoConfig );
134                     }
135                     catch ( SchedulerException e )
136                     {
137                         throw new ArchivaException( "Unable to start scheduler: " + e.getMessage(), e );
138                     }
139
140                     try
141                     {
142                         if ( !isPreviouslyScanned( repoConfig, metadataRepository ) )
143                         {
144                             queueInitialRepoScan( repoConfig );
145                         }
146                     }
147                     catch ( MetadataRepositoryException e )
148                     {
149                         log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: "
150                                       + e.getMessage(), e );
151                     }
152                 }
153             }
154         }
155         finally
156         {
157             repositorySession.close();
158         }
159
160     }
161
162
163     @PreDestroy
164     public void stop()
165         throws SchedulerException
166     {
167         for ( String job : jobs )
168         {
169             scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
170         }
171         jobs.clear();
172         queuedRepos.clear();
173
174     }
175
176     @SuppressWarnings ("unchecked")
177     public boolean isProcessingRepositoryTask( String repositoryId )
178     {
179         synchronized ( repositoryScanningQueue )
180         {
181             List<RepositoryTask> queue = null;
182
183             try
184             {
185                 queue = repositoryScanningQueue.getQueueSnapshot();
186             }
187             catch ( TaskQueueException e )
188             {
189                 // not possible with plexus-taskqueue implementation, ignore
190             }
191
192             for ( RepositoryTask queuedTask : queue )
193             {
194                 if ( queuedTask.getRepositoryId().equals( repositoryId ) )
195                 {
196                     return true;
197                 }
198             }
199             return false;
200         }
201     }
202
203     public boolean isProcessingRepositoryTask( RepositoryTask task )
204     {
205         synchronized ( repositoryScanningQueue )
206         {
207             List<RepositoryTask> queue = null;
208
209             try
210             {
211                 queue = repositoryScanningQueue.getQueueSnapshot();
212             }
213             catch ( TaskQueueException e )
214             {
215                 // not possible with plexus-taskqueue implementation, ignore
216             }
217
218             for ( RepositoryTask queuedTask : queue )
219             {
220                 if ( task.equals( queuedTask ) )
221                 {
222                     return true;
223                 }
224             }
225             return false;
226         }
227     }
228
229     public void queueTask( RepositoryTask task )
230         throws TaskQueueException
231     {
232         synchronized ( repositoryScanningQueue )
233         {
234             if ( isProcessingRepositoryTask( task ) )
235             {
236                 log.debug( "Repository task '{}' is already queued. Skipping task.", task );
237             }
238             else
239             {
240                 // add check if the task is already queued if it is a file scan
241                 repositoryScanningQueue.put( task );
242             }
243         }
244     }
245
246     public boolean unQueueTask( RepositoryTask task )
247         throws TaskQueueException
248     {
249         synchronized ( repositoryScanningQueue )
250         {
251             if ( !isProcessingRepositoryTask( task ) )
252             {
253                 log.info( "cannot unqueue Repository task '{}' not already queued.", task );
254                 return false;
255             }
256             else
257             {
258                 return repositoryScanningQueue.remove( task );
259             }
260         }
261     }
262
263     public void configurationEvent( ConfigurationEvent event )
264     {
265         if ( event.getType() == ConfigurationEvent.SAVED )
266         {
267             for ( String job : jobs )
268             {
269                 try
270                 {
271                     scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
272                 }
273                 catch ( SchedulerException e )
274                 {
275                     log.error( "Error restarting the repository scanning job after property change." );
276                 }
277             }
278             jobs.clear();
279
280             List<ManagedRepositoryConfiguration> repositories =
281                 archivaConfiguration.getConfiguration().getManagedRepositories();
282
283             for ( ManagedRepositoryConfiguration repoConfig : repositories )
284             {
285                 if ( repoConfig.getRefreshCronExpression() != null )
286                 {
287                     try
288                     {
289                         scheduleRepositoryJobs( repoConfig );
290                     }
291                     catch ( SchedulerException e )
292                     {
293                         log.error( "error restarting job: " + REPOSITORY_JOB + ":" + repoConfig.getId() );
294                     }
295                 }
296             }
297         }
298     }
299
300     @SuppressWarnings ("unchecked")
301     private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig,
302                                          MetadataRepository metadataRepository )
303         throws MetadataRepositoryException
304     {
305         return repositoryStatisticsManager.getLastStatistics( metadataRepository, repoConfig.getId() ) != null;
306     }
307
308     // MRM-848: Pre-configured repository initially appear to be empty
309     private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
310     {
311         String repoId = repoConfig.getId();
312         RepositoryTask task = new RepositoryTask();
313         task.setRepositoryId( repoId );
314
315         if ( !queuedRepos.contains( repoId ) )
316         {
317             log.info( "Repository [" + repoId + "] is queued to be scanned as it hasn't been previously." );
318
319             try
320             {
321                 queuedRepos.add( repoConfig.getId() );
322                 this.queueTask( task );
323             }
324             catch ( TaskQueueException e )
325             {
326                 log.error( "Error occurred while queueing repository [" + repoId + "] task : " + e.getMessage() );
327             }
328         }
329     }
330
331     private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
332         throws SchedulerException
333     {
334         if ( repoConfig.getRefreshCronExpression() == null )
335         {
336             log.warn( "Skipping job, no cron expression for {}", repoConfig.getId() );
337             return;
338         }
339
340         if ( !repoConfig.isScanned() )
341         {
342             log.warn( "Skipping job, repository scannable has been disabled for {}", repoConfig.getId() );
343             return;
344         }
345
346         // get the cron string for these database scanning jobs
347         String cronString = repoConfig.getRefreshCronExpression();
348
349         if ( !cronValidator.validate( cronString ) )
350         {
351             log.warn( "Cron expression [{}] for repository [{}] is invalid.  Defaulting to hourly.", cronString,
352                       repoConfig.getId() );
353             cronString = CRON_HOURLY;
354         }
355
356         // setup the unprocessed artifact job
357         JobDetailImpl repositoryJob =
358             new JobDetailImpl( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP,
359                                RepositoryTaskJob.class );
360
361         repositoryJob.getJobDataMap().put( TASK_QUEUE, repositoryScanningQueue );
362         repositoryJob.getJobDataMap().put( TASK_REPOSITORY, repoConfig.getId() );
363
364         try
365         {
366             CronTriggerImpl trigger =
367                 new CronTriggerImpl( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP,
368                                      cronString );
369
370             jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
371             scheduler.scheduleJob( repositoryJob, trigger );
372         }
373         catch ( ParseException e )
374         {
375             log.error( "ParseException in repository scanning cron expression, disabling repository scanning for '"
376                            + repoConfig.getId() + "': " + e.getMessage() );
377         }
378
379     }
380 }