]> source.dussan.org Git - archiva.git/blob
0a7891f7da4d334f17f2dab8dd2625f87482e1fe
[archiva.git] /
1 package org.apache.archiva.scheduler.repository;
2
3 /*
4  * Licensed to the Apache Software Foundation (ASF) under one
5  * or more contributor license agreements.  See the NOTICE file
6  * distributed with this work for additional information
7  * regarding copyright ownership.  The ASF licenses this file
8  * to you under the Apache License, Version 2.0 (the
9  * "License"); you may not use this file except in compliance
10  * with the License.  You may obtain a copy of the License at
11  *
12  *   http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing,
15  * software distributed under the License is distributed on an
16  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17  * KIND, either express or implied.  See the License for the
18  * specific language governing permissions and limitations
19  * under the License.
20  */
21
22 import org.apache.archiva.common.ArchivaException;
23 import org.apache.archiva.configuration.ArchivaConfiguration;
24 import org.apache.archiva.configuration.ConfigurationEvent;
25 import org.apache.archiva.configuration.ConfigurationListener;
26 import org.apache.archiva.configuration.ManagedRepositoryConfiguration;
27 import org.apache.archiva.metadata.repository.MetadataRepository;
28 import org.apache.archiva.metadata.repository.MetadataRepositoryException;
29 import org.apache.archiva.metadata.repository.RepositorySession;
30 import org.apache.archiva.metadata.repository.RepositorySessionFactory;
31 import org.apache.archiva.metadata.repository.stats.RepositoryStatisticsManager;
32 import org.apache.archiva.redback.components.scheduler.CronExpressionValidator;
33 import org.apache.archiva.redback.components.scheduler.Scheduler;
34 import org.apache.archiva.redback.components.taskqueue.TaskQueue;
35 import org.apache.archiva.redback.components.taskqueue.TaskQueueException;
36 import org.apache.archiva.scheduler.repository.model.RepositoryArchivaTaskScheduler;
37 import org.apache.archiva.scheduler.repository.model.RepositoryTask;
38 import org.apache.commons.lang.time.StopWatch;
39 import org.quartz.SchedulerException;
40 import org.quartz.impl.JobDetailImpl;
41 import org.quartz.impl.triggers.CronTriggerImpl;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44 import org.springframework.stereotype.Service;
45
46 import javax.annotation.PostConstruct;
47 import javax.annotation.PreDestroy;
48 import javax.inject.Inject;
49 import javax.inject.Named;
50 import java.text.ParseException;
51 import java.util.ArrayList;
52 import java.util.HashSet;
53 import java.util.List;
54 import java.util.Set;
55
56 /**
57  * Default implementation of a scheduling component for archiva.
58  */
59 @Service( "archivaTaskScheduler#repository" )
60 public class DefaultRepositoryArchivaTaskScheduler
61     implements RepositoryArchivaTaskScheduler, ConfigurationListener
62 {
63     private Logger log = LoggerFactory.getLogger( getClass() );
64
65     /**
66      *
67      */
68     @Inject
69     private Scheduler scheduler;
70
71     @Inject
72     private CronExpressionValidator cronValidator;
73
74     /**
75      *
76      */
77     @Inject
78     @Named( value = "taskQueue#repository-scanning" )
79     private TaskQueue repositoryScanningQueue;
80
81     /**
82      *
83      */
84     @Inject
85     private ArchivaConfiguration archivaConfiguration;
86
87     /**
88      *
89      */
90     @Inject
91     @Named( value = "repositoryStatisticsManager#default" )
92     private RepositoryStatisticsManager repositoryStatisticsManager;
93
94     /**
95      * TODO: could have multiple implementations
96      */
97     @Inject
98     private RepositorySessionFactory repositorySessionFactory;
99
100     private static final String REPOSITORY_SCAN_GROUP = "rg";
101
102     private static final String REPOSITORY_JOB = "rj";
103
104     private static final String REPOSITORY_JOB_TRIGGER = "rjt";
105
106     static final String TASK_QUEUE = "TASK_QUEUE";
107
108     static final String TASK_REPOSITORY = "TASK_REPOSITORY";
109
110     public static final String CRON_HOURLY = "0 0 * * * ?";
111
112     private Set<String> jobs = new HashSet<String>();
113
114     private List<String> queuedRepos = new ArrayList<String>();
115
116     @PostConstruct
117     public void startup()
118         throws ArchivaException
119     {
120
121         StopWatch stopWatch = new StopWatch();
122         stopWatch.start();
123
124         archivaConfiguration.addListener( this );
125
126         List<ManagedRepositoryConfiguration> repositories =
127             archivaConfiguration.getConfiguration().getManagedRepositories();
128
129         RepositorySession repositorySession = repositorySessionFactory.createSession();
130         try
131         {
132             MetadataRepository metadataRepository = repositorySession.getRepository();
133             for ( ManagedRepositoryConfiguration repoConfig : repositories )
134             {
135                 if ( repoConfig.isScanned() )
136                 {
137                     try
138                     {
139                         scheduleRepositoryJobs( repoConfig );
140                     }
141                     catch ( SchedulerException e )
142                     {
143                         throw new ArchivaException( "Unable to start scheduler: " + e.getMessage(), e );
144                     }
145
146                     try
147                     {
148                         if ( !isPreviouslyScanned( repoConfig, metadataRepository ) )
149                         {
150                             queueInitialRepoScan( repoConfig );
151                         }
152                     }
153                     catch ( MetadataRepositoryException e )
154                     {
155                         log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: "
156                                       + e.getMessage(), e );
157                     }
158                 }
159             }
160         }
161         finally
162         {
163             repositorySession.close();
164         }
165
166         stopWatch.stop();
167         log.info( "Time to initalize DefaultRepositoryArchivaTaskScheduler: {} ms", stopWatch.getTime() );
168     }
169
170
171     @PreDestroy
172     public void stop()
173         throws SchedulerException
174     {
175         for ( String job : jobs )
176         {
177             scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
178         }
179         jobs.clear();
180         queuedRepos.clear();
181
182     }
183
184     @SuppressWarnings( "unchecked" )
185     public boolean isProcessingRepositoryTask( String repositoryId )
186     {
187         synchronized ( repositoryScanningQueue )
188         {
189             List<RepositoryTask> queue = null;
190
191             try
192             {
193                 queue = repositoryScanningQueue.getQueueSnapshot();
194             }
195             catch ( TaskQueueException e )
196             {
197                 // not possible with plexus-taskqueue implementation, ignore
198             }
199
200             for ( RepositoryTask queuedTask : queue )
201             {
202                 if ( queuedTask.getRepositoryId().equals( repositoryId ) )
203                 {
204                     return true;
205                 }
206             }
207             return false;
208         }
209     }
210
211     public boolean isProcessingRepositoryTask( RepositoryTask task )
212     {
213         synchronized ( repositoryScanningQueue )
214         {
215             List<RepositoryTask> queue = null;
216
217             try
218             {
219                 queue = repositoryScanningQueue.getQueueSnapshot();
220             }
221             catch ( TaskQueueException e )
222             {
223                 // not possible with plexus-taskqueue implementation, ignore
224             }
225
226             for ( RepositoryTask queuedTask : queue )
227             {
228                 if ( task.equals( queuedTask ) )
229                 {
230                     return true;
231                 }
232             }
233             return false;
234         }
235     }
236
237     public void queueTask( RepositoryTask task )
238         throws TaskQueueException
239     {
240         synchronized ( repositoryScanningQueue )
241         {
242             if ( isProcessingRepositoryTask( task ) )
243             {
244                 log.debug( "Repository task '{}' is already queued. Skipping task.", task );
245             }
246             else
247             {
248                 // add check if the task is already queued if it is a file scan
249                 repositoryScanningQueue.put( task );
250             }
251         }
252     }
253
254     public boolean unQueueTask( RepositoryTask task )
255         throws TaskQueueException
256     {
257         synchronized ( repositoryScanningQueue )
258         {
259             if ( !isProcessingRepositoryTask( task ) )
260             {
261                 log.info( "cannot unqueue Repository task '{}' not already queued.", task );
262                 return false;
263             }
264             else
265             {
266                 return repositoryScanningQueue.remove( task );
267             }
268         }
269     }
270
271     public void configurationEvent( ConfigurationEvent event )
272     {
273         if ( event.getType() == ConfigurationEvent.SAVED )
274         {
275             for ( String job : jobs )
276             {
277                 try
278                 {
279                     scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
280                 }
281                 catch ( SchedulerException e )
282                 {
283                     log.error( "Error restarting the repository scanning job after property change." );
284                 }
285             }
286             jobs.clear();
287
288             List<ManagedRepositoryConfiguration> repositories =
289                 archivaConfiguration.getConfiguration().getManagedRepositories();
290
291             for ( ManagedRepositoryConfiguration repoConfig : repositories )
292             {
293                 if ( repoConfig.getRefreshCronExpression() != null )
294                 {
295                     try
296                     {
297                         scheduleRepositoryJobs( repoConfig );
298                     }
299                     catch ( SchedulerException e )
300                     {
301                         log.error( "error restarting job: " + REPOSITORY_JOB + ":" + repoConfig.getId() );
302                     }
303                 }
304             }
305         }
306     }
307
308     private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig,
309                                          MetadataRepository metadataRepository )
310         throws MetadataRepositoryException
311     {
312         long start = System.currentTimeMillis();
313
314         boolean res = repositoryStatisticsManager.hasStatistics( metadataRepository, repoConfig.getId() );
315
316         long end = System.currentTimeMillis();
317
318         log.debug( "isPreviouslyScanned repo {} {} time: {} ms", repoConfig.getId(), res, ( end - start ) );
319
320         return res;
321     }
322
323     // MRM-848: Pre-configured repository initially appear to be empty
324     private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
325     {
326         String repoId = repoConfig.getId();
327         RepositoryTask task = new RepositoryTask();
328         task.setRepositoryId( repoId );
329
330         if ( !queuedRepos.contains( repoId ) )
331         {
332             log.info( "Repository [" + repoId + "] is queued to be scanned as it hasn't been previously." );
333
334             try
335             {
336                 queuedRepos.add( repoConfig.getId() );
337                 this.queueTask( task );
338             }
339             catch ( TaskQueueException e )
340             {
341                 log.error( "Error occurred while queueing repository [" + repoId + "] task : " + e.getMessage() );
342             }
343         }
344     }
345
346     private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
347         throws SchedulerException
348     {
349         if ( repoConfig.getRefreshCronExpression() == null )
350         {
351             log.warn( "Skipping job, no cron expression for {}", repoConfig.getId() );
352             return;
353         }
354
355         if ( !repoConfig.isScanned() )
356         {
357             log.warn( "Skipping job, repository scannable has been disabled for {}", repoConfig.getId() );
358             return;
359         }
360
361         // get the cron string for these database scanning jobs
362         String cronString = repoConfig.getRefreshCronExpression();
363
364         if ( !cronValidator.validate( cronString ) )
365         {
366             log.warn( "Cron expression [{}] for repository [{}] is invalid.  Defaulting to hourly.", cronString,
367                       repoConfig.getId() );
368             cronString = CRON_HOURLY;
369         }
370
371         // setup the unprocessed artifact job
372         JobDetailImpl repositoryJob =
373             new JobDetailImpl( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP,
374                                RepositoryTaskJob.class );
375
376         repositoryJob.getJobDataMap().put( TASK_QUEUE, repositoryScanningQueue );
377         repositoryJob.getJobDataMap().put( TASK_REPOSITORY, repoConfig.getId() );
378
379         try
380         {
381             CronTriggerImpl trigger =
382                 new CronTriggerImpl( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP,
383                                      cronString );
384
385             jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
386             scheduler.scheduleJob( repositoryJob, trigger );
387         }
388         catch ( ParseException e )
389         {
390             log.error( "ParseException in repository scanning cron expression, disabling repository scanning for '"
391                            + repoConfig.getId() + "': " + e.getMessage() );
392         }
393
394     }
395 }