aboutsummaryrefslogtreecommitdiffstats
path: root/org.eclipse.jgit/src/org/eclipse/jgit/internal/ketch/KetchSystem.java
blob: 1908d6c98a84082904f76a8d42f95d20065dfd81 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
/*
 * Copyright (C) 2016, Google Inc.
 * and other copyright owners as documented in the project's IP log.
 *
 * This program and the accompanying materials are made available
 * under the terms of the Eclipse Distribution License v1.0 which
 * accompanies this distribution, is reproduced below, and is
 * available at http://www.eclipse.org/org/documents/edl-v10.php
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer in the documentation and/or other materials provided
 *   with the distribution.
 *
 * - Neither the name of the Eclipse Foundation, Inc. nor the
 *   names of its contributors may be used to endorse or promote
 *   products derived from this software without specific prior
 *   written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

package org.eclipse.jgit.internal.ketch;

import static org.eclipse.jgit.internal.ketch.KetchConstants.ACCEPTED;
import static org.eclipse.jgit.internal.ketch.KetchConstants.COMMITTED;
import static org.eclipse.jgit.internal.ketch.KetchConstants.CONFIG_KEY_TYPE;
import static org.eclipse.jgit.internal.ketch.KetchConstants.CONFIG_SECTION_KETCH;
import static org.eclipse.jgit.internal.ketch.KetchConstants.DEFAULT_TXN_NAMESPACE;
import static org.eclipse.jgit.internal.ketch.KetchConstants.STAGE;
import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_NAME;
import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_REMOTE;

import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jgit.annotations.Nullable;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.lib.PersonIdent;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.transport.RemoteConfig;
import org.eclipse.jgit.transport.URIish;
import org.eclipse.jgit.util.time.MonotonicClock;
import org.eclipse.jgit.util.time.MonotonicSystemClock;
import org.eclipse.jgit.util.time.ProposedTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Ketch system-wide configuration.
 * <p>
 * This class provides useful defaults for testing and small proof of concepts.
 * Full scale installations are expected to subclass and override methods to
 * provide consistent configuration across all managed repositories.
 * <p>
 * Servers should configure their own
 * {@link java.util.concurrent.ScheduledExecutorService}.
 */
public class KetchSystem {
	private static final Random RNG = new Random();

	/**
	 * Get default executor, one thread per available processor.
	 *
	 * @return default executor, one thread per available processor.
	 */
	public static ScheduledExecutorService defaultExecutor() {
		return DefaultExecutorHolder.I;
	}

	private final ScheduledExecutorService executor;
	private final MonotonicClock clock;
	private final String txnNamespace;
	private final String txnAccepted;
	private final String txnCommitted;
	private final String txnStage;

	/**
	 * Create a default system with a thread pool of 1 thread per CPU.
	 */
	public KetchSystem() {
		this(defaultExecutor(), new MonotonicSystemClock(), DEFAULT_TXN_NAMESPACE);
	}

	/**
	 * Create a Ketch system with the provided executor service.
	 *
	 * @param executor
	 *            thread pool to run background operations.
	 * @param clock
	 *            clock to create timestamps.
	 * @param txnNamespace
	 *            reference namespace for the RefTree graph and associated
	 *            transaction state. Must begin with {@code "refs/"} and end
	 *            with {@code '/'}, for example {@code "refs/txn/"}.
	 */
	public KetchSystem(ScheduledExecutorService executor, MonotonicClock clock,
			String txnNamespace) {
		this.executor = executor;
		this.clock = clock;
		this.txnNamespace = txnNamespace;
		this.txnAccepted = txnNamespace + ACCEPTED;
		this.txnCommitted = txnNamespace + COMMITTED;
		this.txnStage = txnNamespace + STAGE;
	}

	/**
	 * Get executor to perform background operations.
	 *
	 * @return executor to perform background operations.
	 */
	public ScheduledExecutorService getExecutor() {
		return executor;
	}

	/**
	 * Get clock to obtain timestamps from.
	 *
	 * @return clock to obtain timestamps from.
	 */
	public MonotonicClock getClock() {
		return clock;
	}

	/**
	 * Get how long the leader will wait for the {@link #getClock()}'s
	 * {@code ProposedTimestamp} used in commits proposed to the RefTree graph
	 * ({@link #getTxnAccepted()})
	 *
	 * @return how long the leader will wait for the {@link #getClock()}'s
	 *         {@code ProposedTimestamp} used in commits proposed to the RefTree
	 *         graph ({@link #getTxnAccepted()}). Defaults to 5 seconds.
	 */
	public Duration getMaxWaitForMonotonicClock() {
		return Duration.ofSeconds(5);
	}

	/**
	 * Whether elections should require monotonically increasing commit
	 * timestamps
	 *
	 * @return {@code true} if elections should require monotonically increasing
	 *         commit timestamps. This requires a very good
	 *         {@link org.eclipse.jgit.util.time.MonotonicClock}.
	 */
	public boolean requireMonotonicLeaderElections() {
		return false;
	}

	/**
	 * Get the namespace used for the RefTree graph and transaction management.
	 *
	 * @return reference namespace such as {@code "refs/txn/"}.
	 */
	public String getTxnNamespace() {
		return txnNamespace;
	}

	/**
	 * Get name of the accepted RefTree graph.
	 *
	 * @return name of the accepted RefTree graph.
	 */
	public String getTxnAccepted() {
		return txnAccepted;
	}

	/**
	 * Get name of the committed RefTree graph.
	 *
	 * @return name of the committed RefTree graph.
	 */
	public String getTxnCommitted() {
		return txnCommitted;
	}

	/**
	 * Get prefix for staged objects, e.g. {@code "refs/txn/stage/"}.
	 *
	 * @return prefix for staged objects, e.g. {@code "refs/txn/stage/"}.
	 */
	public String getTxnStage() {
		return txnStage;
	}

	/**
	 * Create new committer {@code PersonIdent} for ketch system
	 *
	 * @param time
	 *            timestamp for the committer.
	 * @return identity line for the committer header of a RefTreeGraph.
	 */
	public PersonIdent newCommitter(ProposedTimestamp time) {
		String name = "ketch"; //$NON-NLS-1$
		String email = "ketch@system"; //$NON-NLS-1$
		return new PersonIdent(name, email, time);
	}

	/**
	 * Construct a random tag to identify a candidate during leader election.
	 * <p>
	 * Multiple processes trying to elect themselves leaders at exactly the same
	 * time (rounded to seconds) using the same
	 * {@link #newCommitter(ProposedTimestamp)} identity strings, for the same
	 * term, may generate the same ObjectId for the election commit and falsely
	 * assume they have both won.
	 * <p>
	 * Candidates add this tag to their election ballot commit to disambiguate
	 * the election. The tag only needs to be unique for a given triplet of
	 * {@link #newCommitter(ProposedTimestamp)}, system time (rounded to
	 * seconds), and term. If every replica in the system uses a unique
	 * {@code newCommitter} (such as including the host name after the
	 * {@code "@"} in the email address) the tag could be the empty string.
	 * <p>
	 * The default implementation generates a few bytes of random data.
	 *
	 * @return unique tag; null or empty string if {@code newCommitter()} is
	 *         sufficiently unique to identify the leader.
	 */
	@Nullable
	public String newLeaderTag() {
		int n = RNG.nextInt(1 << (6 * 4));
		return String.format("%06x", Integer.valueOf(n)); //$NON-NLS-1$
	}

	/**
	 * Construct the KetchLeader instance of a repository.
	 *
	 * @param repo
	 *            local repository stored by the leader.
	 * @return leader instance.
	 * @throws java.net.URISyntaxException
	 *             a follower configuration contains an unsupported URI.
	 */
	public KetchLeader createLeader(final Repository repo)
			throws URISyntaxException {
		KetchLeader leader = new KetchLeader(this) {
			@Override
			protected Repository openRepository() {
				repo.incrementOpen();
				return repo;
			}
		};
		leader.setReplicas(createReplicas(leader, repo));
		return leader;
	}

	/**
	 * Get the collection of replicas for a repository.
	 * <p>
	 * The collection of replicas must include the local repository.
	 *
	 * @param leader
	 *            the leader driving these replicas.
	 * @param repo
	 *            repository to get the replicas of.
	 * @return collection of replicas for the specified repository.
	 * @throws java.net.URISyntaxException
	 *             a configured URI is invalid.
	 */
	protected List<KetchReplica> createReplicas(KetchLeader leader,
			Repository repo) throws URISyntaxException {
		List<KetchReplica> replicas = new ArrayList<>();
		Config cfg = repo.getConfig();
		String localName = getLocalName(cfg);
		for (String name : cfg.getSubsections(CONFIG_KEY_REMOTE)) {
			if (!hasParticipation(cfg, name)) {
				continue;
			}

			ReplicaConfig kc = ReplicaConfig.newFromConfig(cfg, name);
			if (name.equals(localName)) {
				replicas.add(new LocalReplica(leader, name, kc));
				continue;
			}

			RemoteConfig rc = new RemoteConfig(cfg, name);
			List<URIish> uris = rc.getPushURIs();
			if (uris.isEmpty()) {
				uris = rc.getURIs();
			}
			for (URIish uri : uris) {
				String n = uris.size() == 1 ? name : uri.getHost();
				replicas.add(new RemoteGitReplica(leader, n, uri, kc, rc));
			}
		}
		return replicas;
	}

	private static boolean hasParticipation(Config cfg, String name) {
		return cfg.getString(CONFIG_KEY_REMOTE, name, CONFIG_KEY_TYPE) != null;
	}

	private static String getLocalName(Config cfg) {
		return cfg.getString(CONFIG_SECTION_KETCH, null, CONFIG_KEY_NAME);
	}

	static class DefaultExecutorHolder {
		private static final Logger log = LoggerFactory.getLogger(KetchSystem.class);
		static final ScheduledExecutorService I = create();

		private static ScheduledExecutorService create() {
			int cores = Runtime.getRuntime().availableProcessors();
			int threads = Math.max(5, cores);
			log.info("Using {} threads", Integer.valueOf(threads)); //$NON-NLS-1$
			return Executors.newScheduledThreadPool(
				threads,
				new ThreadFactory() {
					private final AtomicInteger threadCnt = new AtomicInteger();

					@Override
					public Thread newThread(Runnable r) {
						int id = threadCnt.incrementAndGet();
						Thread thr = new Thread(r);
						thr.setName("KetchExecutor-" + id); //$NON-NLS-1$
						return thr;
					}
				});
		}

		private DefaultExecutorHolder() {
		}
	}

	/**
	 * Compute a delay in a {@code min..max} interval with random jitter.
	 *
	 * @param last
	 *            amount of delay waited before the last attempt. This is used
	 *            to seed the next delay interval. Should be 0 if there was no
	 *            prior delay.
	 * @param min
	 *            shortest amount of allowable delay between attempts.
	 * @param max
	 *            longest amount of allowable delay between attempts.
	 * @return new amount of delay to wait before the next attempt.
	 */
	static long delay(long last, long min, long max) {
		long r = Math.max(0, last * 3 - min);
		if (r > 0) {
			int c = (int) Math.min(r + 1, Integer.MAX_VALUE);
			r = RNG.nextInt(c);
		}
		return Math.max(Math.min(min + r, max), min);
	}
}