1 package org.apache.archiva.scheduler.indexing;
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing,
14 * software distributed under the License is distributed on an
15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 * KIND, either express or implied. See the License for the
17 * specific language governing permissions and limitations
21 import org.apache.archiva.admin.model.RepositoryAdminException;
22 import org.apache.archiva.admin.model.beans.NetworkProxy;
23 import org.apache.archiva.admin.model.beans.RemoteRepository;
24 import org.apache.archiva.admin.model.remote.RemoteRepositoryAdmin;
25 import org.apache.archiva.proxy.common.WagonFactory;
26 import org.apache.commons.io.FileUtils;
27 import org.apache.commons.lang.StringUtils;
28 import org.apache.commons.lang.time.StopWatch;
29 import org.apache.http.HttpEntity;
30 import org.apache.http.HttpException;
31 import org.apache.http.HttpHost;
32 import org.apache.http.HttpRequest;
33 import org.apache.http.HttpResponse;
34 import org.apache.http.HttpStatus;
35 import org.apache.http.auth.AuthScope;
36 import org.apache.http.auth.UsernamePasswordCredentials;
37 import org.apache.http.client.ClientProtocolException;
38 import org.apache.http.client.methods.HttpGet;
39 import org.apache.http.entity.ContentType;
40 import org.apache.http.impl.client.BasicCredentialsProvider;
41 import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
42 import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
43 import org.apache.http.impl.nio.codecs.LengthDelimitedDecoder;
44 import org.apache.http.nio.ContentDecoder;
45 import org.apache.http.nio.ContentEncoder;
46 import org.apache.http.nio.IOControl;
47 import org.apache.http.nio.client.methods.ZeroCopyConsumer;
48 import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
49 import org.apache.http.protocol.HttpContext;
50 import org.apache.maven.index.context.IndexingContext;
51 import org.apache.maven.index.updater.IndexUpdateRequest;
52 import org.apache.maven.index.updater.IndexUpdater;
53 import org.apache.maven.index.updater.ResourceFetcher;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
58 import java.io.FileInputStream;
59 import java.io.FileNotFoundException;
60 import java.io.IOException;
61 import java.io.InputStream;
62 import java.lang.reflect.Field;
63 import java.net.MalformedURLException;
65 import java.util.List;
67 import java.util.concurrent.ExecutionException;
68 import java.util.concurrent.Future;
69 import java.util.concurrent.TimeUnit;
70 import java.util.concurrent.TimeoutException;
73 * @author Olivier Lamy
76 public class DownloadRemoteIndexTask
79 private Logger log = LoggerFactory.getLogger( getClass() );
81 private RemoteRepository remoteRepository;
83 private RemoteRepositoryAdmin remoteRepositoryAdmin;
85 private WagonFactory wagonFactory;
87 private NetworkProxy networkProxy;
89 private boolean fullDownload;
91 private List<String> runningRemoteDownloadIds;
93 private IndexUpdater indexUpdater;
96 public DownloadRemoteIndexTask( DownloadRemoteIndexTaskRequest downloadRemoteIndexTaskRequest,
97 List<String> runningRemoteDownloadIds )
99 this.remoteRepository = downloadRemoteIndexTaskRequest.getRemoteRepository();
100 this.wagonFactory = downloadRemoteIndexTaskRequest.getWagonFactory();
101 this.networkProxy = downloadRemoteIndexTaskRequest.getNetworkProxy();
102 this.fullDownload = downloadRemoteIndexTaskRequest.isFullDownload();
103 this.runningRemoteDownloadIds = runningRemoteDownloadIds;
104 this.indexUpdater = downloadRemoteIndexTaskRequest.getIndexUpdater();
105 this.remoteRepositoryAdmin = downloadRemoteIndexTaskRequest.getRemoteRepositoryAdmin();
111 // so short lock : not sure we need it
112 synchronized ( this.runningRemoteDownloadIds )
114 if ( this.runningRemoteDownloadIds.contains( this.remoteRepository.getId() ) )
116 // skip it as it's running
117 log.info( "skip download index remote for repo {} it's already running",
118 this.remoteRepository.getId() );
121 this.runningRemoteDownloadIds.add( this.remoteRepository.getId() );
123 File tempIndexDirectory = null;
124 StopWatch stopWatch = new StopWatch();
128 log.info( "start download remote index for remote repository {}", this.remoteRepository.getId() );
129 IndexingContext indexingContext = remoteRepositoryAdmin.createIndexContext( this.remoteRepository );
131 // create a temp directory to download files
132 tempIndexDirectory = new File( indexingContext.getIndexDirectoryFile().getParent(), ".tmpIndex" );
133 File indexCacheDirectory = new File( indexingContext.getIndexDirectoryFile().getParent(), ".indexCache" );
134 indexCacheDirectory.mkdirs();
135 if ( tempIndexDirectory.exists() )
137 FileUtils.deleteDirectory( tempIndexDirectory );
139 tempIndexDirectory.mkdirs();
140 tempIndexDirectory.deleteOnExit();
141 String baseIndexUrl = indexingContext.getIndexUpdateUrl();
143 URL indexUrl = new URL( baseIndexUrl );
146 String wagonProtocol = new URL( this.remoteRepository.getUrl() ).getProtocol();
148 final StreamWagon wagon = (StreamWagon) wagonFactory.getWagon(
149 new WagonFactoryRequest( wagonProtocol, this.remoteRepository.getExtraHeaders() ).networkProxy(
150 this.networkProxy ) );
151 int timeoutInMilliseconds = remoteRepository.getTimeout() * 1000;
152 // FIXME olamy having 2 config values
153 wagon.setReadTimeout( timeoutInMilliseconds );
154 wagon.setTimeout( timeoutInMilliseconds );
156 if ( wagon instanceof AbstractHttpClientWagon )
158 HttpConfiguration httpConfiguration = new HttpConfiguration();
159 HttpMethodConfiguration httpMethodConfiguration = new HttpMethodConfiguration();
160 httpMethodConfiguration.setUsePreemptive( true );
161 httpMethodConfiguration.setReadTimeout( timeoutInMilliseconds );
162 httpConfiguration.setGet( httpMethodConfiguration );
163 ( (AbstractHttpClientWagon) wagon ).setHttpConfiguration( httpConfiguration );
166 wagon.addTransferListener( new DownloadListener() );
167 ProxyInfo proxyInfo = null;
168 if ( this.networkProxy != null )
170 proxyInfo = new ProxyInfo();
171 proxyInfo.setHost( this.networkProxy.getHost() );
172 proxyInfo.setPort( this.networkProxy.getPort() );
173 proxyInfo.setUserName( this.networkProxy.getUsername() );
174 proxyInfo.setPassword( this.networkProxy.getPassword() );
176 AuthenticationInfo authenticationInfo = null;
177 if ( this.remoteRepository.getUserName() != null )
179 authenticationInfo = new AuthenticationInfo();
180 authenticationInfo.setUserName( this.remoteRepository.getUserName() );
181 authenticationInfo.setPassword( this.remoteRepository.getPassword() );
183 wagon.connect( new Repository( this.remoteRepository.getId(), baseIndexUrl ), authenticationInfo,
186 //---------------------------------------------
188 HttpAsyncClientBuilder builder = HttpAsyncClientBuilder.create();
190 BasicCredentialsProvider basicCredentialsProvider = new BasicCredentialsProvider();
192 if ( this.networkProxy != null )
194 HttpHost httpHost = new HttpHost( this.networkProxy.getHost(), this.networkProxy.getPort() );
195 builder = builder.setProxy( httpHost );
197 if ( this.networkProxy.getUsername() != null )
199 basicCredentialsProvider.setCredentials(
200 new AuthScope( this.networkProxy.getHost(), this.networkProxy.getPort(), null, null ),
201 new UsernamePasswordCredentials( this.networkProxy.getUsername(),
202 this.networkProxy.getPassword() ) );
207 if ( this.remoteRepository.getUserName() != null )
209 basicCredentialsProvider.setCredentials(
210 new AuthScope( indexUrl.getHost(), indexUrl.getPort(), null, null ),
211 new UsernamePasswordCredentials( this.remoteRepository.getUserName(),
212 this.remoteRepository.getPassword() ) );
216 builder = builder.setDefaultCredentialsProvider( basicCredentialsProvider );
218 File indexDirectory = indexingContext.getIndexDirectoryFile();
219 if ( !indexDirectory.exists() )
221 indexDirectory.mkdirs();
224 CloseableHttpAsyncClient closeableHttpAsyncClient = builder.build();
225 closeableHttpAsyncClient.start();
226 ResourceFetcher resourceFetcher =
227 new ZeroCopyResourceFetcher( log, tempIndexDirectory, remoteRepository, closeableHttpAsyncClient,
230 IndexUpdateRequest request = new IndexUpdateRequest( indexingContext, resourceFetcher );
231 request.setForceFullUpdate( this.fullDownload );
232 request.setLocalIndexCacheDir( indexCacheDirectory );
234 this.indexUpdater.fetchAndUpdateIndex( request );
236 // index packing optionnal ??
237 //IndexPackingRequest indexPackingRequest =
238 // new IndexPackingRequest( indexingContext, indexingContext.getIndexDirectoryFile() );
239 //indexPacker.packIndex( indexPackingRequest );
241 indexingContext.updateTimestamp( true );
244 log.info( "time update index from remote for repository {}: {} s", this.remoteRepository.getId(),
245 ( stopWatch.getTime() / 1000 ) );
249 catch ( MalformedURLException e )
251 log.error( e.getMessage(), e );
252 throw new RuntimeException( e.getMessage(), e );
254 catch ( IOException e )
256 log.error( e.getMessage(), e );
257 throw new RuntimeException( e.getMessage(), e );
259 catch ( RepositoryAdminException e )
261 log.error( e.getMessage(), e );
262 throw new RuntimeException( e.getMessage(), e );
266 deleteDirectoryQuiet( tempIndexDirectory );
267 this.runningRemoteDownloadIds.remove( this.remoteRepository.getId() );
269 log.info( "end download remote index for remote repository {}", this.remoteRepository.getId() );
272 private void deleteDirectoryQuiet( File f )
276 FileUtils.deleteDirectory( f );
278 catch ( IOException e )
280 log.warn( "skip error delete {} : {}", f, e.getMessage() );
284 private static class ZeroCopyConsumerListener
285 extends ZeroCopyConsumer
287 private Logger log = LoggerFactory.getLogger( getClass() );
289 private String resourceName;
291 private long startTime;
293 private long totalLength = 0;
295 //private long currentLength = 0;
297 private ZeroCopyConsumerListener( File file, String resourceName )
298 throws FileNotFoundException
301 this.resourceName = resourceName;
305 protected File process( final HttpResponse response, final File file, final ContentType contentType )
308 if ( response.getStatusLine().getStatusCode() != HttpStatus.SC_OK )
310 throw new ClientProtocolException( "Download failed: " + response.getStatusLine() );
312 long endTime = System.currentTimeMillis();
313 log.info( "end of transfer file {} {} kb: {}s", resourceName, this.totalLength / 1024,
314 ( endTime - startTime ) / 1000 );
319 protected void onContentReceived( ContentDecoder decoder, IOControl ioControl )
322 if ( decoder instanceof LengthDelimitedDecoder )
324 LengthDelimitedDecoder ldl = LengthDelimitedDecoder.class.cast( decoder );
325 long len = getLen( ldl );
328 log.debug( "transfer of {} : {}/{}", resourceName, len / 1024, this.totalLength / 1024 );
332 super.onContentReceived( decoder, ioControl );
336 protected void onResponseReceived( HttpResponse response )
338 this.startTime = System.currentTimeMillis();
339 super.onResponseReceived( response );
340 this.totalLength = response.getEntity().getContentLength();
341 log.info( "start transfer of {}, contentLength: {}", resourceName, this.totalLength / 1024 );
345 protected void onEntityEnclosed( HttpEntity entity, ContentType contentType )
348 super.onEntityEnclosed( entity, contentType );
351 private long getLen( LengthDelimitedDecoder ldl )
355 Field lenField = LengthDelimitedDecoder.class.getDeclaredField( "len" );
356 lenField.setAccessible( true );
357 long len = (Long) lenField.get( ldl );
360 catch ( NoSuchFieldException e )
362 log.debug( e.getMessage(), e );
365 catch ( IllegalAccessException e )
367 log.debug( e.getMessage(), e );
373 private static class ZeroCopyResourceFetcher
374 implements ResourceFetcher
379 File tempIndexDirectory;
381 final RemoteRepository remoteRepository;
383 CloseableHttpAsyncClient httpclient;
387 private ZeroCopyResourceFetcher( Logger log, File tempIndexDirectory, RemoteRepository remoteRepository,
388 CloseableHttpAsyncClient httpclient, String baseIndexUrl )
391 this.tempIndexDirectory = tempIndexDirectory;
392 this.remoteRepository = remoteRepository;
393 this.httpclient = httpclient;
394 this.baseIndexUrl = baseIndexUrl;
397 public void connect( String id, String url )
403 public void disconnect()
409 public InputStream retrieve( final String name )
413 log.info( "index update retrieve file, name:{}", name );
414 File file = new File( tempIndexDirectory, name );
421 ZeroCopyConsumer<File> consumer = new ZeroCopyConsumerListener( file, name );
423 URL targetUrl = new URL( this.baseIndexUrl );
424 final HttpHost targetHost = new HttpHost( targetUrl.getHost(), targetUrl.getPort() );
426 Future<File> httpResponseFuture = httpclient.execute( new HttpAsyncRequestProducer()
429 public HttpHost getTarget()
435 public HttpRequest generateRequest()
436 throws IOException, HttpException
438 StringBuilder url = new StringBuilder( baseIndexUrl );
439 if ( !StringUtils.endsWith( baseIndexUrl, "/" ) )
443 HttpGet httpGet = new HttpGet( url.append( addParameters( name, remoteRepository ) ).toString() );
448 public void produceContent( ContentEncoder encoder, IOControl ioctrl )
455 public void requestCompleted( HttpContext context )
457 log.debug( "requestCompleted" );
461 public void failed( Exception ex )
463 log.error( "http request failed", ex );
467 public boolean isRepeatable()
469 log.debug( "isRepeatable" );
474 public void resetRequest()
477 log.debug( "resetRequest" );
484 log.debug( "close" );
490 int timeOut = this.remoteRepository.getRemoteDownloadTimeout();
491 file = timeOut > 0 ? httpResponseFuture.get( timeOut, TimeUnit.SECONDS ) : httpResponseFuture.get();
493 catch ( InterruptedException e )
495 throw new IOException( e.getMessage(), e );
497 catch ( ExecutionException e )
499 throw new IOException( e.getMessage(), e );
501 catch ( TimeoutException e )
503 throw new IOException( e.getMessage(), e );
505 return new FileInputStream( file );
510 // FIXME remove crappy copy/paste
511 protected static String addParameters( String path, RemoteRepository remoteRepository )
513 if ( remoteRepository.getExtraParameters().isEmpty() )
518 boolean question = false;
520 StringBuilder res = new StringBuilder( path == null ? "" : path );
522 for ( Map.Entry<String, String> entry : remoteRepository.getExtraParameters().entrySet() )
526 res.append( '?' ).append( entry.getKey() ).append( '=' ).append( entry.getValue() );
530 return res.toString();