]> source.dussan.org Git - archiva.git/blob
da624e214d95844f11cfb06a97c7a409c946f670
[archiva.git] /
1 package org.apache.archiva.scheduler.repository;
2
3 /*
4  * Licensed to the Apache Software Foundation (ASF) under one
5  * or more contributor license agreements.  See the NOTICE file
6  * distributed with this work for additional information
7  * regarding copyright ownership.  The ASF licenses this file
8  * to you under the Apache License, Version 2.0 (the
9  * "License"); you may not use this file except in compliance
10  * with the License.  You may obtain a copy of the License at
11  *
12  *   http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing,
15  * software distributed under the License is distributed on an
16  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17  * KIND, either express or implied.  See the License for the
18  * specific language governing permissions and limitations
19  * under the License.
20  */
21
22 import org.apache.archiva.common.ArchivaException;
23 import org.apache.archiva.configuration.ArchivaConfiguration;
24 import org.apache.archiva.configuration.ConfigurationEvent;
25 import org.apache.archiva.configuration.ConfigurationListener;
26 import org.apache.archiva.configuration.ManagedRepositoryConfiguration;
27 import org.apache.archiva.metadata.repository.MetadataRepository;
28 import org.apache.archiva.metadata.repository.MetadataRepositoryException;
29 import org.apache.archiva.metadata.repository.RepositorySession;
30 import org.apache.archiva.metadata.repository.RepositorySessionFactory;
31 import org.apache.archiva.metadata.repository.stats.model.RepositoryStatisticsManager;
32 import org.apache.archiva.components.scheduler.CronExpressionValidator;
33 import org.apache.archiva.components.scheduler.Scheduler;
34 import org.apache.archiva.components.taskqueue.TaskQueue;
35 import org.apache.archiva.components.taskqueue.TaskQueueException;
36 import org.apache.archiva.scheduler.repository.model.RepositoryArchivaTaskScheduler;
37 import org.apache.archiva.scheduler.repository.model.RepositoryTask;
38 import org.apache.commons.lang3.time.StopWatch;
39 import org.quartz.CronScheduleBuilder;
40 import org.quartz.CronTrigger;
41 import org.quartz.JobBuilder;
42 import org.quartz.JobDataMap;
43 import org.quartz.JobDetail;
44 import org.quartz.SchedulerException;
45 import org.quartz.TriggerBuilder;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import org.springframework.stereotype.Service;
49
50 import javax.annotation.PostConstruct;
51 import javax.annotation.PreDestroy;
52 import javax.inject.Inject;
53 import javax.inject.Named;
54 import java.util.ArrayList;
55 import java.util.HashSet;
56 import java.util.List;
57 import java.util.Set;
58
59 /**
60  * Default implementation of a scheduling component for archiva.
61  */
62 @Service( "archivaTaskScheduler#repository" )
63 public class DefaultRepositoryArchivaTaskScheduler
64     implements RepositoryArchivaTaskScheduler, ConfigurationListener
65 {
66     private Logger log = LoggerFactory.getLogger( getClass() );
67
68     @Inject
69     private Scheduler scheduler;
70
71     @Inject
72     private CronExpressionValidator cronValidator;
73
74     @Inject
75     @Named( value = "taskQueue#repository-scanning" )
76     private TaskQueue<RepositoryTask> repositoryScanningQueue;
77
78     @Inject
79     private ArchivaConfiguration archivaConfiguration;
80
81     @Inject
82     @Named( value = "repositoryStatisticsManager#default" )
83     private RepositoryStatisticsManager repositoryStatisticsManager;
84
85     /**
86      * TODO: could have multiple implementations
87      */
88     @Inject
89     private RepositorySessionFactory repositorySessionFactory;
90
91     private static final String REPOSITORY_SCAN_GROUP = "rg";
92
93     private static final String REPOSITORY_JOB = "rj";
94
95     private static final String REPOSITORY_JOB_TRIGGER = "rjt";
96
97     static final String TASK_QUEUE = "TASK_QUEUE";
98
99     static final String TASK_REPOSITORY = "TASK_REPOSITORY";
100
101     public static final String CRON_HOURLY = "0 0 * * * ?";
102
103     private Set<String> jobs = new HashSet<>();
104
105     private List<String> queuedRepos = new ArrayList<>();
106
107     @PostConstruct
108     public void startup()
109         throws ArchivaException
110     {
111
112         StopWatch stopWatch = new StopWatch();
113         stopWatch.start();
114
115         archivaConfiguration.addListener( this );
116
117         List<ManagedRepositoryConfiguration> repositories =
118             archivaConfiguration.getConfiguration().getManagedRepositories();
119
120         RepositorySession repositorySession = null;
121         try
122         {
123             repositorySession = repositorySessionFactory.createSession();
124         }
125         catch ( MetadataRepositoryException e )
126         {
127             e.printStackTrace( );
128         }
129         try
130         {
131             MetadataRepository metadataRepository = repositorySession.getRepository();
132             for ( ManagedRepositoryConfiguration repoConfig : repositories )
133             {
134                 if ( repoConfig.isScanned() )
135                 {
136                     try
137                     {
138                         scheduleRepositoryJobs( repoConfig );
139                     }
140                     catch ( SchedulerException e )
141                     {
142                         throw new ArchivaException( "Unable to start scheduler: " + e.getMessage(), e );
143                     }
144
145                     try
146                     {
147                         if ( !isPreviouslyScanned( repoConfig, metadataRepository ) )
148                         {
149                             queueInitialRepoScan( repoConfig );
150                         }
151                     }
152                     catch ( MetadataRepositoryException e )
153                     {
154                         log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: {}",
155                                   e.getMessage(), e );
156                     }
157                 }
158             }
159         }
160         finally
161         {
162             repositorySession.close();
163         }
164
165         stopWatch.stop();
166         log.info( "Time to initalize DefaultRepositoryArchivaTaskScheduler: {} ms", stopWatch.getTime() );
167     }
168
169
170     @PreDestroy
171     public void stop()
172         throws SchedulerException
173     {
174         for ( String job : jobs )
175         {
176             scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
177         }
178         jobs.clear();
179         queuedRepos.clear();
180
181     }
182
183     @SuppressWarnings( "unchecked" )
184     @Override
185     public boolean isProcessingRepositoryTask( String repositoryId )
186     {
187         synchronized ( repositoryScanningQueue )
188         {
189             List<RepositoryTask> queue = null;
190
191             try
192             {
193                 queue = repositoryScanningQueue.getQueueSnapshot();
194             }
195             catch ( TaskQueueException e )
196             {
197                 // not possible with plexus-taskqueue implementation, ignore
198             }
199
200             for ( RepositoryTask queuedTask : queue )
201             {
202                 if ( queuedTask.getRepositoryId().equals( repositoryId ) )
203                 {
204                     return true;
205                 }
206             }
207             return false;
208         }
209     }
210
211     @Override
212     public boolean isProcessingRepositoryTask( RepositoryTask task )
213     {
214         synchronized ( repositoryScanningQueue )
215         {
216             List<RepositoryTask> queue = null;
217
218             try
219             {
220                 queue = repositoryScanningQueue.getQueueSnapshot();
221             }
222             catch ( TaskQueueException e )
223             {
224                 // not possible with plexus-taskqueue implementation, ignore
225             }
226
227             for ( RepositoryTask queuedTask : queue )
228             {
229                 if ( task.equals( queuedTask ) )
230                 {
231                     return true;
232                 }
233             }
234             return false;
235         }
236     }
237
238     @Override
239     public void queueTask( RepositoryTask task )
240         throws TaskQueueException
241     {
242         synchronized ( repositoryScanningQueue )
243         {
244             if ( isProcessingRepositoryTask( task ) )
245             {
246                 log.debug( "Repository task '{}' is already queued. Skipping task.", task );
247             }
248             else
249             {
250                 // add check if the task is already queued if it is a file scan
251                 repositoryScanningQueue.put( task );
252             }
253         }
254     }
255
256     @Override
257     public boolean unQueueTask( RepositoryTask task )
258         throws TaskQueueException
259     {
260         synchronized ( repositoryScanningQueue )
261         {
262             if ( !isProcessingRepositoryTask( task ) )
263             {
264                 log.info( "cannot unqueue Repository task '{}' not already queued.", task );
265                 return false;
266             }
267             else
268             {
269                 return repositoryScanningQueue.remove( task );
270             }
271         }
272     }
273
274     @Override
275     public void configurationEvent( ConfigurationEvent event )
276     {
277         if ( event.getType() == ConfigurationEvent.SAVED )
278         {
279             for ( String job : jobs )
280             {
281                 try
282                 {
283                     scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
284                 }
285                 catch ( SchedulerException e )
286                 {
287                     log.error( "Error restarting the repository scanning job after property change." );
288                 }
289             }
290             jobs.clear();
291
292             List<ManagedRepositoryConfiguration> repositories =
293                 archivaConfiguration.getConfiguration().getManagedRepositories();
294
295             for ( ManagedRepositoryConfiguration repoConfig : repositories )
296             {
297                 if ( repoConfig.getRefreshCronExpression() != null )
298                 {
299                     try
300                     {
301                         scheduleRepositoryJobs( repoConfig );
302                     }
303                     catch ( SchedulerException e )
304                     {
305                         log.error( "error restarting job: '{}' : '{}'", REPOSITORY_JOB, repoConfig.getId() );
306                     }
307                 }
308             }
309         }
310     }
311
312     private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig,
313                                          MetadataRepository metadataRepository )
314         throws MetadataRepositoryException
315     {
316         long start = System.currentTimeMillis();
317
318         boolean res = repositoryStatisticsManager.hasStatistics( repoConfig.getId() );
319
320         long end = System.currentTimeMillis();
321
322         log.debug( "isPreviouslyScanned repo {} {} time: {} ms", repoConfig.getId(), res, ( end - start ) );
323
324         return res;
325     }
326
327     // MRM-848: Pre-configured repository initially appear to be empty
328     private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
329     {
330         String repoId = repoConfig.getId();
331         RepositoryTask task = new RepositoryTask();
332         task.setRepositoryId( repoId );
333
334         if ( !queuedRepos.contains( repoId ) )
335         {
336             log.info( "Repository [{}] is queued to be scanned as it hasn't been previously.", repoId );
337
338             try
339             {
340                 queuedRepos.add( repoConfig.getId() );
341                 this.queueTask( task );
342             }
343             catch ( TaskQueueException e )
344             {
345                 log.error( "Error occurred while queueing repository [{}] task : {}", e.getMessage(), repoId );
346             }
347         }
348     }
349
350     private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
351         throws SchedulerException
352     {
353         if ( repoConfig.getRefreshCronExpression() == null )
354         {
355             log.warn( "Skipping job, no cron expression for {}", repoConfig.getId() );
356             return;
357         }
358
359         if ( !repoConfig.isScanned() )
360         {
361             log.warn( "Skipping job, repository scannable has been disabled for {}", repoConfig.getId() );
362             return;
363         }
364
365         // get the cron string for these database scanning jobs
366         String cronString = repoConfig.getRefreshCronExpression();
367
368         if ( !cronValidator.validate( cronString ) )
369         {
370             log.warn( "Cron expression [{}] for repository [{}] is invalid.  Defaulting to hourly.", cronString,
371                       repoConfig.getId() );
372             cronString = CRON_HOURLY;
373         }
374
375         JobDataMap jobDataMap = new JobDataMap( );
376         jobDataMap.put( TASK_QUEUE, repositoryScanningQueue );
377         jobDataMap.put( TASK_REPOSITORY, repoConfig.getId() );
378
379         // setup the unprocessed artifact job
380         JobDetail repositoryJob = JobBuilder.newJob( RepositoryTaskJob.class )
381                                         .withIdentity( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
382                                         .setJobData( jobDataMap )
383                                         .build();
384
385         try
386         {
387             CronTrigger trigger = TriggerBuilder.newTrigger()
388                     .withIdentity( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
389                     .withSchedule( CronScheduleBuilder.cronSchedule( cronString ) )
390                     .build();
391
392             jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
393             scheduler.scheduleJob( repositoryJob, trigger );
394         }
395         catch ( RuntimeException e )
396         {
397             log.error(
398                 "ParseException in repository scanning cron expression, disabling repository scanning for '{}': {}",
399                 repoConfig.getId(), e.getMessage() );
400         }
401
402     }
403 }