1 package org.apache.archiva.scheduler.indexing;
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing,
14 * software distributed under the License is distributed on an
15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 * KIND, either express or implied. See the License for the
17 * specific language governing permissions and limitations
21 import org.apache.archiva.admin.model.RepositoryAdminException;
22 import org.apache.archiva.admin.model.beans.NetworkProxy;
23 import org.apache.archiva.admin.model.beans.RemoteRepository;
24 import org.apache.archiva.admin.model.remote.RemoteRepositoryAdmin;
25 import org.apache.archiva.proxy.common.WagonFactory;
26 import org.apache.commons.io.FileUtils;
27 import org.apache.commons.lang.StringUtils;
28 import org.apache.commons.lang.time.StopWatch;
29 import org.apache.http.HttpEntity;
30 import org.apache.http.HttpException;
31 import org.apache.http.HttpHost;
32 import org.apache.http.HttpRequest;
33 import org.apache.http.HttpResponse;
34 import org.apache.http.HttpStatus;
35 import org.apache.http.auth.AuthScope;
36 import org.apache.http.auth.UsernamePasswordCredentials;
37 import org.apache.http.client.ClientProtocolException;
38 import org.apache.http.client.methods.HttpGet;
39 import org.apache.http.entity.ContentType;
40 import org.apache.http.impl.client.BasicCredentialsProvider;
41 import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
42 import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
43 import org.apache.http.impl.nio.codecs.LengthDelimitedDecoder;
44 import org.apache.http.nio.ContentDecoder;
45 import org.apache.http.nio.ContentEncoder;
46 import org.apache.http.nio.IOControl;
47 import org.apache.http.nio.client.methods.ZeroCopyConsumer;
48 import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
49 import org.apache.http.protocol.HttpContext;
50 import org.apache.maven.index.context.IndexingContext;
51 import org.apache.maven.index.updater.IndexUpdateRequest;
52 import org.apache.maven.index.updater.IndexUpdater;
53 import org.apache.maven.index.updater.ResourceFetcher;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
58 import java.io.FileInputStream;
59 import java.io.FileNotFoundException;
60 import java.io.IOException;
61 import java.io.InputStream;
62 import java.lang.reflect.Field;
63 import java.net.MalformedURLException;
65 import java.util.List;
67 import java.util.concurrent.ExecutionException;
68 import java.util.concurrent.Future;
69 import java.util.concurrent.TimeUnit;
70 import java.util.concurrent.TimeoutException;
73 * @author Olivier Lamy
76 public class DownloadRemoteIndexTask
79 private Logger log = LoggerFactory.getLogger( getClass() );
81 private RemoteRepository remoteRepository;
83 private RemoteRepositoryAdmin remoteRepositoryAdmin;
85 private WagonFactory wagonFactory;
87 private NetworkProxy networkProxy;
89 private boolean fullDownload;
91 private List<String> runningRemoteDownloadIds;
93 private IndexUpdater indexUpdater;
96 public DownloadRemoteIndexTask( DownloadRemoteIndexTaskRequest downloadRemoteIndexTaskRequest,
97 List<String> runningRemoteDownloadIds )
99 this.remoteRepository = downloadRemoteIndexTaskRequest.getRemoteRepository();
100 this.wagonFactory = downloadRemoteIndexTaskRequest.getWagonFactory();
101 this.networkProxy = downloadRemoteIndexTaskRequest.getNetworkProxy();
102 this.fullDownload = downloadRemoteIndexTaskRequest.isFullDownload();
103 this.runningRemoteDownloadIds = runningRemoteDownloadIds;
104 this.indexUpdater = downloadRemoteIndexTaskRequest.getIndexUpdater();
105 this.remoteRepositoryAdmin = downloadRemoteIndexTaskRequest.getRemoteRepositoryAdmin();
111 // so short lock : not sure we need it
112 synchronized ( this.runningRemoteDownloadIds )
114 if ( this.runningRemoteDownloadIds.contains( this.remoteRepository.getId() ) )
116 // skip it as it's running
117 log.info( "skip download index remote for repo {} it's already running",
118 this.remoteRepository.getId() );
121 this.runningRemoteDownloadIds.add( this.remoteRepository.getId() );
123 File tempIndexDirectory = null;
124 StopWatch stopWatch = new StopWatch();
128 log.info( "start download remote index for remote repository {}", this.remoteRepository.getId() );
129 IndexingContext indexingContext = remoteRepositoryAdmin.createIndexContext( this.remoteRepository );
131 // create a temp directory to download files
132 tempIndexDirectory = new File( indexingContext.getIndexDirectoryFile().getParent(), ".tmpIndex" );
133 File indexCacheDirectory = new File( indexingContext.getIndexDirectoryFile().getParent(), ".indexCache" );
134 indexCacheDirectory.mkdirs();
135 if ( tempIndexDirectory.exists() )
137 FileUtils.deleteDirectory( tempIndexDirectory );
139 tempIndexDirectory.mkdirs();
140 tempIndexDirectory.deleteOnExit();
141 String baseIndexUrl = indexingContext.getIndexUpdateUrl();
143 URL indexUrl = new URL( baseIndexUrl );
146 String wagonProtocol = new URL( this.remoteRepository.getUrl() ).getProtocol();
148 final StreamWagon wagon = (StreamWagon) wagonFactory.getWagon(
149 new WagonFactoryRequest( wagonProtocol, this.remoteRepository.getExtraHeaders() ).networkProxy(
150 this.networkProxy ) );
151 int timeoutInMilliseconds = remoteRepository.getTimeout() * 1000;
152 // FIXME olamy having 2 config values
153 wagon.setReadTimeout( timeoutInMilliseconds );
154 wagon.setTimeout( timeoutInMilliseconds );
156 if ( wagon instanceof AbstractHttpClientWagon )
158 HttpConfiguration httpConfiguration = new HttpConfiguration();
159 HttpMethodConfiguration httpMethodConfiguration = new HttpMethodConfiguration();
160 httpMethodConfiguration.setUsePreemptive( true );
161 httpMethodConfiguration.setReadTimeout( timeoutInMilliseconds );
162 httpConfiguration.setGet( httpMethodConfiguration );
163 ( (AbstractHttpClientWagon) wagon ).setHttpConfiguration( httpConfiguration );
166 wagon.addTransferListener( new DownloadListener() );
167 ProxyInfo proxyInfo = null;
168 if ( this.networkProxy != null )
170 proxyInfo = new ProxyInfo();
171 proxyInfo.setHost( this.networkProxy.getHost() );
172 proxyInfo.setPort( this.networkProxy.getPort() );
173 proxyInfo.setUserName( this.networkProxy.getUsername() );
174 proxyInfo.setPassword( this.networkProxy.getPassword() );
176 AuthenticationInfo authenticationInfo = null;
177 if ( this.remoteRepository.getUserName() != null )
179 authenticationInfo = new AuthenticationInfo();
180 authenticationInfo.setUserName( this.remoteRepository.getUserName() );
181 authenticationInfo.setPassword( this.remoteRepository.getPassword() );
183 wagon.connect( new Repository( this.remoteRepository.getId(), baseIndexUrl ), authenticationInfo,
186 //---------------------------------------------
188 HttpAsyncClientBuilder builder = HttpAsyncClientBuilder.create();
190 BasicCredentialsProvider basicCredentialsProvider = new BasicCredentialsProvider();
192 if ( this.networkProxy != null )
194 HttpHost httpHost = new HttpHost( this.networkProxy.getHost(), this.networkProxy.getPort() );
195 builder = builder.setProxy( httpHost );
197 if ( this.networkProxy.getUsername() != null )
199 basicCredentialsProvider.setCredentials(
200 new AuthScope( this.networkProxy.getHost(), this.networkProxy.getPort(), null, null ),
201 new UsernamePasswordCredentials( this.networkProxy.getUsername(),
202 this.networkProxy.getPassword() ) );
207 if ( this.remoteRepository.getUserName() != null )
209 basicCredentialsProvider.setCredentials(
210 new AuthScope( indexUrl.getHost(), indexUrl.getPort(), null, null ),
211 new UsernamePasswordCredentials( this.remoteRepository.getUserName(),
212 this.remoteRepository.getPassword() ) );
216 builder = builder.setDefaultCredentialsProvider( basicCredentialsProvider );
218 File indexDirectory = indexingContext.getIndexDirectoryFile();
219 if ( !indexDirectory.exists() )
221 indexDirectory.mkdirs();
224 CloseableHttpAsyncClient closeableHttpAsyncClient = builder.build();
225 closeableHttpAsyncClient.start();
226 ResourceFetcher resourceFetcher =
227 new ZeroCopyResourceFetcher( log, tempIndexDirectory, remoteRepository, closeableHttpAsyncClient,
230 IndexUpdateRequest request = new IndexUpdateRequest( indexingContext, resourceFetcher );
231 request.setForceFullUpdate( this.fullDownload );
232 request.setLocalIndexCacheDir( indexCacheDirectory );
234 this.indexUpdater.fetchAndUpdateIndex( request );
236 log.info( "time update index from remote for repository {}: {} s", this.remoteRepository.getId(),
237 ( stopWatch.getTime() / 1000 ) );
239 // index packing optionnal ??
240 //IndexPackingRequest indexPackingRequest =
241 // new IndexPackingRequest( indexingContext, indexingContext.getIndexDirectoryFile() );
242 //indexPacker.packIndex( indexPackingRequest );
243 indexingContext.updateTimestamp( true );
246 catch ( MalformedURLException e )
248 log.error( e.getMessage(), e );
249 throw new RuntimeException( e.getMessage(), e );
251 catch ( IOException e )
253 log.error( e.getMessage(), e );
254 throw new RuntimeException( e.getMessage(), e );
256 catch ( RepositoryAdminException e )
258 log.error( e.getMessage(), e );
259 throw new RuntimeException( e.getMessage(), e );
263 deleteDirectoryQuiet( tempIndexDirectory );
264 this.runningRemoteDownloadIds.remove( this.remoteRepository.getId() );
266 log.info( "end download remote index for remote repository " + this.remoteRepository.getId() );
269 private void deleteDirectoryQuiet( File f )
273 FileUtils.deleteDirectory( f );
275 catch ( IOException e )
277 log.warn( "skip error delete {} : {}", f, e.getMessage() );
281 private static class ZeroCopyConsumerListener
282 extends ZeroCopyConsumer
284 private Logger log = LoggerFactory.getLogger( getClass() );
286 private String resourceName;
288 private long startTime;
290 private long totalLength = 0;
292 //private long currentLength = 0;
294 private ZeroCopyConsumerListener( File file, String resourceName )
295 throws FileNotFoundException
298 this.resourceName = resourceName;
302 protected File process( final HttpResponse response, final File file, final ContentType contentType )
305 if ( response.getStatusLine().getStatusCode() != HttpStatus.SC_OK )
307 throw new ClientProtocolException( "Upload failed: " + response.getStatusLine() );
309 long endTime = System.currentTimeMillis();
310 log.info( "end of transfer file {} {} kb: {}s", resourceName, this.totalLength / 1024,
311 ( endTime - startTime ) / 1000 );
316 protected void onContentReceived( ContentDecoder decoder, IOControl ioControl )
319 if ( decoder instanceof LengthDelimitedDecoder )
321 LengthDelimitedDecoder ldl = LengthDelimitedDecoder.class.cast( decoder );
322 long len = getLen( ldl );
325 log.debug( "transfer of {} : {}/{}", resourceName, len / 1024, this.totalLength / 1024 );
329 super.onContentReceived( decoder, ioControl );
333 protected void onResponseReceived( HttpResponse response )
335 this.startTime = System.currentTimeMillis();
336 super.onResponseReceived( response );
337 this.totalLength = response.getEntity().getContentLength();
338 log.info( "start transfer of {}, contentLength: {}", resourceName, this.totalLength / 1024 );
342 protected void onEntityEnclosed( HttpEntity entity, ContentType contentType )
345 super.onEntityEnclosed( entity, contentType );
348 private long getLen( LengthDelimitedDecoder ldl )
352 Field lenField = LengthDelimitedDecoder.class.getDeclaredField( "len" );
353 lenField.setAccessible( true );
354 long len = (Long) lenField.get( ldl );
357 catch ( NoSuchFieldException e )
359 log.debug( e.getMessage(), e );
362 catch ( IllegalAccessException e )
364 log.debug( e.getMessage(), e );
370 private static class ZeroCopyResourceFetcher
371 implements ResourceFetcher
376 File tempIndexDirectory;
378 final RemoteRepository remoteRepository;
380 CloseableHttpAsyncClient httpclient;
384 private ZeroCopyResourceFetcher( Logger log, File tempIndexDirectory, RemoteRepository remoteRepository,
385 CloseableHttpAsyncClient httpclient, String baseIndexUrl )
388 this.tempIndexDirectory = tempIndexDirectory;
389 this.remoteRepository = remoteRepository;
390 this.httpclient = httpclient;
391 this.baseIndexUrl = baseIndexUrl;
394 public void connect( String id, String url )
400 public void disconnect()
406 public InputStream retrieve( final String name )
410 log.info( "index update retrieve file, name:{}", name );
411 File file = new File( tempIndexDirectory, name );
418 ZeroCopyConsumer<File> consumer = new ZeroCopyConsumerListener( file, name );
420 URL targetUrl = new URL( this.baseIndexUrl );
421 final HttpHost targetHost = new HttpHost( targetUrl.getHost(), targetUrl.getPort() );
423 Future<File> httpResponseFuture = httpclient.execute( new HttpAsyncRequestProducer()
426 public HttpHost getTarget()
432 public HttpRequest generateRequest()
433 throws IOException, HttpException
435 StringBuilder url = new StringBuilder( baseIndexUrl );
436 if ( !StringUtils.endsWith( baseIndexUrl, "/" ) )
440 HttpGet httpGet = new HttpGet( url.append( addParameters( name, remoteRepository ) ).toString() );
445 public void produceContent( ContentEncoder encoder, IOControl ioctrl )
452 public void requestCompleted( HttpContext context )
454 log.debug( "requestCompleted" );
458 public void failed( Exception ex )
460 log.error( "http request failed", ex );
464 public boolean isRepeatable()
466 log.debug( "isRepeatable" );
471 public void resetRequest()
474 log.debug( "resetRequest" );
481 log.debug( "close" );
487 int timeOut = this.remoteRepository.getRemoteDownloadTimeout();
488 file = timeOut > 0 ? httpResponseFuture.get( timeOut, TimeUnit.SECONDS ) : httpResponseFuture.get();
490 catch ( InterruptedException e )
492 throw new IOException( e.getMessage(), e );
494 catch ( ExecutionException e )
496 throw new IOException( e.getMessage(), e );
498 catch ( TimeoutException e )
500 throw new IOException( e.getMessage(), e );
502 return new FileInputStream( file );
507 // FIXME remove crappy copy/paste
508 protected static String addParameters( String path, RemoteRepository remoteRepository )
510 if ( remoteRepository.getExtraParameters().isEmpty() )
515 boolean question = false;
517 StringBuilder res = new StringBuilder( path == null ? "" : path );
519 for ( Map.Entry<String, String> entry : remoteRepository.getExtraParameters().entrySet() )
523 res.append( '?' ).append( entry.getKey() ).append( '=' ).append( entry.getValue() );
527 return res.toString();