TinkerGraph源码学习(三)

接着上次的源码学习(二). 开始最后一个旅程,阅读遍历和OLAP的设计实现. 暂时是原始版,大概完成度30%, 后续会大量精简源码, 补充大量理解~

0x00.结构梳理

首先,从TinkerGraph的代码结构来看,整个Process下分为了图计算图遍历,但是着重去实现了图计算的部分,因此我们来看看到底这两者是什么关系,图计算是如何做的? 它依赖什么样的结构?

  1. computer(8个)

    • TinkerGraphComputer (图计算核心,320行)
    • TinkerGraphComputerView (单独类,230行)
    • TinkerMapEmitter (map,70行)
    • TinkerReduceEmitter (reduce,50行)
    • TinkerMemory (图计算数据结构,150行)
    • TinkerWorkerPool (资源池,110行)
    • TinkerMessenger (消息分发,120行)
    • TinkerMessageBoard (实体类,50行)

    图计算部分,我们先看看所需的数据结构, 然后看看它的资源池,再看它的核心计算实现,最后看看MR的实现和消息分发

  2. traversal(2个)

    • TinkerGraphStep (遍历核心,120行)
    • TinkerGraphStepStrategy (单例,70行)

Traversal的地方只看TinkerGraph会觉得粒度很粗, 建议结合TinkerPop的文档一起看(有一个#traversalstrategy 章节), 对照着gremlin语句和实际的代码会理解深很多.

0x01.Traversal部分

A.Traversal结构

TinkerGraph在这根据传统查询OLTP 和图计算式查询(OLAP) 做了不同的判断, 详细的遍历分析已经拆分到了图读取分析(一)

B. Traversal代码

这一部分是读取/遍历单独的代码, 建议结合TinkerPop文档中#TraversalStrategy 章节的解释搭配阅读.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
//traversal--读取/遍历的核心实现, 注意仔细看并不多.
/**
 * Provider-specific optimization strategy for OLTP traversals over TinkerGraph.
 *
 * <p>Every generic {@code GraphStep} of the traversal is replaced by a {@link TinkerGraphStep}, and the
 * {@code HasStep} filters that directly follow it are folded into that step and then removed from the
 * traversal. Traversals that run on a graph computer are left untouched.
 *
 * <p>The class is a singleton: the constructor is private and the shared instance is obtained through
 * {@link #instance()}.
 */
// NOTE(review): upstream TinkerPop names the base class AbstractTraversalStrategy; the name used here is
// kept as found in this file — confirm which one is actually on the classpath.
public final class TinkerGraphStepStrategy extends AbsTraversalStrategy<TraversalStrategy.ProviderOptimizationStrategy> implements TraversalStrategy.ProviderOptimizationStrategy {

    private static final TinkerGraphStepStrategy INSTANCE = new TinkerGraphStepStrategy();

    /** Private so that {@link #INSTANCE} stays the only instance. */
    private TinkerGraphStepStrategy() {
    }

    /**
     * Rewrites the given traversal in place.
     *
     * <p>For each {@code GraphStep}: swap in a {@code TinkerGraphStep}, then walk the steps that follow it
     * for as long as they are {@code HasStep} or {@code NoOpBarrierStep}. Has-containers of every
     * {@code HasStep} are handed to the new step (unless {@code GraphStep.processHasContainerIds} reports
     * that it already consumed them), the step's labels are copied to its predecessor, and the
     * {@code HasStep} is removed. {@code NoOpBarrierStep}s are skipped over but kept.
     *
     * @param traversal the traversal to optimize; not modified when it runs on a graph computer
     */
    @Override
    public void apply(final Traversal.Admin<?, ?> traversal) {
        if (TraversalHelper.onGraphComputer(traversal))
            return;

        for (final GraphStep originalGraphStep : TraversalHelper.getStepsOfClass(GraphStep.class, traversal)) {
            final TinkerGraphStep<?, ?> tinkerGraphStep = new TinkerGraphStep<>(originalGraphStep);
            TraversalHelper.replaceStep(originalGraphStep, tinkerGraphStep, traversal);
            Step<?, ?> currentStep = tinkerGraphStep.getNextStep();
            while (currentStep instanceof HasStep || currentStep instanceof NoOpBarrierStep) {
                if (currentStep instanceof HasStep) {
                    for (final HasContainer hasContainer : ((HasContainerHolder) currentStep).getHasContainers()) {
                        if (!GraphStep.processHasContainerIds(tinkerGraphStep, hasContainer))
                            tinkerGraphStep.addHasContainer(hasContainer);
                    }
                    // keep the labels of the step that is about to disappear
                    TraversalHelper.copyLabels(currentStep, currentStep.getPreviousStep(), false);
                    traversal.removeStep(currentStep);
                }
                currentStep = currentStep.getNextStep();
            }
        }
    }

    /**
     * @return the shared strategy instance
     */
    public static TinkerGraphStepStrategy instance() {
        return INSTANCE;
    }
}

/**
 * TinkerGraph-specific replacement for a generic {@code GraphStep}.
 *
 * <p>The step carries the has-containers that were folded into it and applies them itself while producing
 * its vertices or edges. When one of the has-containers is an equality test on an indexed key, the
 * matching index is queried instead of scanning the whole graph.
 *
 * @param <S> start type of the step
 * @param <E> element type emitted by the step
 */
public final class TinkerGraphStep<S, E extends Element> extends GraphStep<S, E> implements HasContainerHolder {

    private final List<HasContainer> hasContainers = new ArrayList<>();

    /**
     * Creates a step that mirrors the traversal, return class, start flag, ids and labels of the given step.
     *
     * @param originalGraphStep the generic step being replaced
     */
    public TinkerGraphStep(final GraphStep<S, E> originalGraphStep) {
        super(originalGraphStep.getTraversal(), originalGraphStep.getReturnClass(), originalGraphStep.isStartStep(), originalGraphStep.getIds());
        originalGraphStep.getLabels().forEach(this::addLabel);

        // we used to only setIteratorSupplier() if there were no ids OR the first id was instanceof Element,
        // but that allowed the filter in g.V(v).has('k','v') to be ignored. this created problems for
        // PartitionStrategy, which wants to prevent "v" from being passed from one traversal source to
        // another traversal source that searches a different partition.
        this.setIteratorSupplier(() -> (Iterator<E>) (Vertex.class.isAssignableFrom(this.returnClass) ? this.vertices() : this.edges()));
    }

    /**
     * Produces the edges of this step: by id when ids are present, otherwise through an index when an
     * indexed equality has-container exists, otherwise by scanning all edges. All has-containers are
     * applied in every case.
     */
    private Iterator<? extends Edge> edges() {
        final TinkerGraph graph = (TinkerGraph) this.getTraversal().getGraph().get();
        // ids take precedence over any index
        if (this.ids != null && this.ids.length > 0)
            return this.iteratorList(graph.edges(this.ids));

        final HasContainer indexedContainer = getIndexKey(Edge.class);
        if (null == indexedContainer)
            return this.iteratorList(graph.edges());

        return TinkerHelper.queryEdgeIndex(graph, indexedContainer.getKey(), indexedContainer.getPredicate().getValue()).stream()
                .filter(edge -> HasContainer.testAll(edge, this.hasContainers))
                .collect(Collectors.<Edge>toList()).iterator();
    }

    /**
     * Produces the vertices of this step, following the same id / index / full-scan order as
     * {@link #edges()}.
     */
    private Iterator<? extends Vertex> vertices() {
        final TinkerGraph graph = (TinkerGraph) this.getTraversal().getGraph().get();
        // ids take precedence over any index
        if (this.ids != null && this.ids.length > 0)
            return this.iteratorList(graph.vertices(this.ids));

        final HasContainer indexedContainer = getIndexKey(Vertex.class);
        if (null == indexedContainer)
            return this.iteratorList(graph.vertices());

        return IteratorUtils.filter(
                TinkerHelper.queryVertexIndex(graph, indexedContainer.getKey(), indexedContainer.getPredicate().getValue()).iterator(),
                vertex -> HasContainer.testAll(vertex, this.hasContainers));
    }

    /**
     * Finds a has-container that can be answered from an index.
     *
     * @param indexedClass the element class whose indexed keys are consulted
     * @return the first has-container whose predicate is {@code Compare.eq} and whose key is indexed for
     *         {@code indexedClass}, or {@code null} when there is none
     */
    private HasContainer getIndexKey(final Class<? extends Element> indexedClass) {
        final Set<String> indexedKeys = ((TinkerGraph) this.getTraversal().getGraph().get()).getIndexedKeys(indexedClass);

        final Iterator<HasContainer> itty = IteratorUtils.filter(hasContainers.iterator(),
                c -> c.getPredicate().getBiPredicate() == Compare.eq && indexedKeys.contains(c.getKey()));
        return itty.hasNext() ? itty.next() : null;
    }

    /**
     * Renders the step with its has-containers, and with its ids when there are any. Falls back to the
     * superclass rendering when no has-container was folded in.
     */
    @Override
    public String toString() {
        if (this.hasContainers.isEmpty())
            return super.toString();

        final String elementName = this.returnClass.getSimpleName().toLowerCase();
        // same null guard as edges()/vertices(): a missing id array is treated like an empty one
        if (null == this.ids || 0 == this.ids.length)
            return StringFactory.stepString(this, elementName, this.hasContainers);
        return StringFactory.stepString(this, elementName, Arrays.toString(this.ids), this.hasContainers);
    }

    /**
     * Drains the given iterator into a list, keeping only the elements that pass every has-container,
     * and returns an iterator over that list.
     */
    private <T extends Element> Iterator<T> iteratorList(final Iterator<T> iterator) {
        final List<T> list = new ArrayList<>();
        while (iterator.hasNext()) {
            final T e = iterator.next();
            if (HasContainer.testAll(e, this.hasContainers))
                list.add(e);
        }
        return list.iterator();
    }

    /**
     * @return a read-only view of the has-containers folded into this step
     */
    @Override
    public List<HasContainer> getHasContainers() {
        return Collections.unmodifiableList(this.hasContainers);
    }

    /**
     * Adds a has-container that this step applies to every element it emits.
     */
    @Override
    public void addHasContainer(final HasContainer hasContainer) {
        this.hasContainers.add(hasContainer);
    }

    @Override
    public int hashCode() {
        return super.hashCode() ^ this.hasContainers.hashCode();
    }
}

0x02.Computer部分

A. 数据结构

先看看Memory这个属于Tinkerpop的接口定义:

“Memory”是一个图计算的基础数据结构,用于顶点间的信息交互. 此外,它也包含当前图计算的全局实时状态信息/迭代信息.

图计算的过程基本都是并行的,所以它拥有的方法实现都是需要基于并发角度考虑的.

然后Admin接口继承了Memory,定义是图计算中修改Memory的方法.

“The developer should never need to type-cast the provided Memory to Memory.Admin” 这句话的意思是: 开发者永远不应该需要把拿到的Memory强制类型转换成Memory.Admin (Admin接口是留给GraphComputer自己更新Memory用的). 它有如下四个方法

1
2
3
4
5
> 	default void incrIteration() {this.setIteration(this.getIteration() + 1);}
> void setIteration(final int iteration);
> void setRuntime(final long runtime);
> default Memory asImmutable() {return new ImmutableMemory(this);}
>

对应有一个TinkerMemory实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/**
 * {@link Memory.Admin} implementation backed by two maps: {@code currentMap} receives writes, while
 * {@code previousMap} is the snapshot that reads are served from. The {@code inExecute} flag flips on
 * every {@link #completeSubRound()} and decides whether {@code set} or {@code add} is allowed and
 * whether non-broadcast keys are visible.
 */
public final class TinkerMemory implements Memory.Admin {

    public final Map<String, MemoryComputeKey> memoryKeys = new HashMap<>();
    public Map<String, Object> previousMap;
    public Map<String, Object> currentMap;
    private final AtomicInteger iteration = new AtomicInteger(0);
    private final AtomicLong runtime = new AtomicLong(0L);
    private boolean inExecute = false;

    /**
     * Registers the memory compute keys of the vertex program (if one is given) and one
     * {@code Operator.assign} key per map-reduce job, named after the job's memory key.
     */
    public TinkerMemory(final VertexProgram<?> vertexProgram, final Set<MapReduce> mapReducers) {
        this.currentMap = new ConcurrentHashMap<>();
        this.previousMap = new ConcurrentHashMap<>();
        if (null != vertexProgram) {
            for (final MemoryComputeKey programKey : vertexProgram.getMemoryComputeKeys()) {
                this.memoryKeys.put(programKey.getKey(), programKey);
            }
        }
        for (final MapReduce job : mapReducers) {
            this.memoryKeys.put(job.getMemoryKey(), MemoryComputeKey.of(job.getMemoryKey(), Operator.assign, false, false));
        }
    }

    /**
     * @return the keys of the read snapshot; while executing, only keys flagged as broadcast
     */
    @Override
    public Set<String> keys() {
        return this.previousMap.keySet().stream()
                .filter(key -> {
                    if (!this.inExecute) {
                        return true;
                    }
                    return this.memoryKeys.get(key).isBroadcast();
                })
                .collect(Collectors.toSet());
    }

    @Override
    public void incrIteration() {
        this.iteration.incrementAndGet();
    }

    @Override
    public void setIteration(final int iteration) {
        this.iteration.set(iteration);
    }

    @Override
    public int getIteration() {
        return this.iteration.get();
    }

    @Override
    public void setRuntime(final long runTime) {
        this.runtime.set(runTime);
    }

    @Override
    public long getRuntime() {
        return this.runtime.get();
    }

    /**
     * Finishes the computation: steps the iteration counter back by one, makes the write map the read
     * map, and drops every key flagged as transient from it.
     */
    protected void complete() {
        this.iteration.decrementAndGet();
        this.previousMap = this.currentMap;
        for (final MemoryComputeKey computeKey : this.memoryKeys.values()) {
            if (computeKey.isTransient()) {
                this.previousMap.remove(computeKey.getKey());
            }
        }
    }

    /**
     * Publishes a copy of the write map as the new read snapshot and toggles the execute flag.
     */
    protected void completeSubRound() {
        this.previousMap = new ConcurrentHashMap<>(this.currentMap);
        this.inExecute = !this.inExecute;
    }

    @Override
    public boolean isInitialIteration() {
        return 0 == this.getIteration();
    }

    /**
     * Reads a value from the snapshot.
     *
     * @throws IllegalArgumentException when the key has no value, or when it is read during execution
     *                                  and is not a broadcast key
     */
    @Override
    public <R> R get(final String key) throws IllegalArgumentException {
        final R value = (R) this.previousMap.get(key);
        // the absent-value test must come first: the broadcast lookup is only reached for known values
        if (null == value || (this.inExecute && !this.memoryKeys.get(key).isBroadcast())) {
            throw Memory.Exceptions.memoryDoesNotExist(key);
        }
        return value;
    }

    /**
     * Overwrites a value in the write map; rejected while executing.
     */
    @Override
    public void set(final String key, final Object value) {
        checkKeyValue(key, value);
        if (this.inExecute) {
            throw Memory.Exceptions.memorySetOnlyDuringVertexProgramSetUpAndTerminate(key);
        }
        this.currentMap.put(key, value);
    }

    /**
     * Merges a value into the write map with the key's reducer; only allowed while executing.
     */
    @Override
    public void add(final String key, final Object value) {
        checkKeyValue(key, value);
        if (!this.inExecute) {
            throw Memory.Exceptions.memoryAddOnlyDuringVertexProgramExecute(key);
        }
        this.currentMap.compute(key, (ignoredKey, existing) -> {
            if (null == existing) {
                return value;
            }
            return this.memoryKeys.get(key).getReducer().apply(existing, value);
        });
    }

    @Override
    public String toString() {
        return StringFactory.memoryString(this);
    }

    /** Rejects keys that were never registered, then validates the value. */
    private void checkKeyValue(final String key, final Object value) {
        final boolean registered = this.memoryKeys.containsKey(key);
        if (!registered) {
            throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
        }
        MemoryHelper.validateValue(value);
    }
}

B. 资源池

与此相类似的还有一个TinkerWorkerPool, 它是计算过程中用来并行执行VertexProgram和MapReduce任务的线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
 * Fixed-size pool of worker threads that runs one task per worker for a vertex program or a map-reduce
 * job and waits until all of them have finished.
 */
public final class TinkerWorkerPool implements AutoCloseable {

    private static final BasicThreadFactory THREAD_FACTORY_WORKER = new BasicThreadFactory.Builder().namingPattern("tinker-worker-%d").build();

    private final int numberOfWorkers;
    private final ExecutorService workerPool;
    private final CompletionService<Object> completionService;

    private VertexProgramPool vertexProgramPool;
    private MapReducePool mapReducePool;

    /**
     * @param numberOfWorkers number of threads in the pool and of tasks submitted per execute call
     */
    public TinkerWorkerPool(final int numberOfWorkers) {
        this.numberOfWorkers = numberOfWorkers;
        this.workerPool = Executors.newFixedThreadPool(numberOfWorkers, THREAD_FACTORY_WORKER);
        this.completionService = new ExecutorCompletionService<>(this.workerPool);
    }

    /** Replaces the vertex-program pool with one built from the given program. */
    public void setVertexProgram(final VertexProgram vertexProgram) {
        this.vertexProgramPool = new VertexProgramPool(vertexProgram, this.numberOfWorkers);
    }

    /** Replaces the map-reduce pool with one built from the given job. */
    public void setMapReduce(final MapReduce mapReduce) {
        this.mapReducePool = new MapReducePool(mapReduce, this.numberOfWorkers);
    }

    /**
     * Submits one task per worker; each task takes a program from the pool, hands it to {@code worker}
     * and offers it back. Blocks until every task is done.
     *
     * @throws InterruptedException  when interrupted while waiting for a task
     * @throws IllegalStateException when a task fails; the failure is kept as the cause
     */
    public void executeVertexProgram(final Consumer<VertexProgram> worker) throws InterruptedException {
        int pending = this.numberOfWorkers;
        while (pending-- > 0) {
            this.completionService.submit(() -> {
                final VertexProgram program = this.vertexProgramPool.take();
                worker.accept(program);
                this.vertexProgramPool.offer(program);
                return null;
            });
        }
        this.awaitWorkers();
    }

    /**
     * Map-reduce counterpart of {@link #executeVertexProgram(Consumer)}.
     *
     * @throws InterruptedException  when interrupted while waiting for a task
     * @throws IllegalStateException when a task fails; the failure is kept as the cause
     */
    public void executeMapReduce(final Consumer<MapReduce> worker) throws InterruptedException {
        int pending = this.numberOfWorkers;
        while (pending-- > 0) {
            this.completionService.submit(() -> {
                final MapReduce job = this.mapReducePool.take();
                worker.accept(job);
                this.mapReducePool.offer(job);
                return null;
            });
        }
        this.awaitWorkers();
    }

    /** Waits for as many completed tasks as there are workers, surfacing the first failure. */
    private void awaitWorkers() throws InterruptedException {
        for (int finished = 0; finished < this.numberOfWorkers; finished++) {
            try {
                this.completionService.take().get();
            } catch (InterruptedException interrupted) {
                throw interrupted;
            } catch (final Exception failure) {
                throw new IllegalStateException(failure.getMessage(), failure);
            }
        }
    }

    /** Shuts the executor down immediately via {@code shutdownNow()}. */
    public void closeNow() throws Exception {
        this.workerPool.shutdownNow();
    }

    /** Shuts the executor down via {@code shutdown()}. */
    @Override
    public void close() throws Exception {
        this.workerPool.shutdown();
    }
}

C.图计算核心实现:

分为实现GraphComputer接口的实现类,和单独的view

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
public final class TinkerGraphComputer implements GraphComputer {
//初始化图遍历策略
static {
//注意,就算是in-memory模式来进行图计算, 图过滤器的代价仍是很大的.
TraversalStrategies.GlobalCache.registerStrategies(TinkerGraphComputer.class,
TraversalStrategies.GlobalCache.getStrategies(GraphComputer.class).clone().removeSt
rategies(GraphFilterStrategy.class));
}

private ResultGraph resultGraph = null;
private Persist persist = null;
//需要重点关注一下VertexProgram这个类,它在图计算中作为一个容器形式存在?
private VertexProgram<?> vertexProgram;
private final TinkerGraph graph;
private TinkerMemory memory;
private final TinkerMessageBoard messageBoard = new TinkerMessageBoard();
private boolean executed = false;
private final Set<MapReduce> mapReducers = new HashSet<>();
private int workers = Runtime.getRuntime().availableProcessors();
//这是简单的图过滤器举例,在JanusGraph中,图的过滤器和策略非常之多.
private final GraphFilter graphFilter = new GraphFilter();

private final ThreadFactory threadFactoryBoss = new BasicThreadFactory.Builder().namingPattern(TinkerGraphComputer.class.getSimpleName() + "-boss").build();

/**
* An {@code ExecutorService} that schedules up background work. Since a {@link GraphComputer} is only used once
* for a {@link VertexProgram} a single threaded executor is sufficient.
*/
private final ExecutorService computerService = Executors.newSingleThreadExecutor(threadFactoryBoss);

public TinkerGraphComputer(final TinkerGraph graph) {
this.graph = graph;
}

@Override
public GraphComputer result(final ResultGraph resultGraph) {
this.resultGraph = resultGraph;
return this;
}

@Override
public GraphComputer persist(final Persist persist) {
this.persist = persist;
return this;
}

@Override
public GraphComputer program(final VertexProgram vertexProgram) {
this.vertexProgram = vertexProgram;
return this;
}

@Override
public GraphComputer mapReduce(final MapReduce mapReduce) {
this.mapReducers.add(mapReduce);
return this;
}

@Override
public GraphComputer workers(final int workers) {
this.workers = workers;
return this;
}

@Override
public GraphComputer vertices(final Traversal<Vertex, Vertex> vertexFilter) {
this.graphFilter.setVertexFilter(vertexFilter);
return this;
}

@Override
public GraphComputer edges(final Traversal<Vertex, Edge> edgeFilter) {
this.graphFilter.setEdgeFilter(edgeFilter);
return this;
}

@Override
public Future<ComputerResult> submit() {
//一次图计算,只能被执行一次
if (this.executed)
throw Exceptions.computerHasAlreadyBeenSubmittedAVertexProgram();
else
this.executed = true;
// 如果没有VertexProgram或者MR对象,不可能开始图计算 (MR/Spark等是一个算子)
if (null == this.vertexProgram && this.mapReducers.isEmpty())
throw GraphComputer.Exceptions.computerHasNoVertexProgramNorMapReducers();
// 在没有VertexProgram的情况下,也能裸跑MR任务
if (null != this.vertexProgram) {
GraphComputerHelper.validateProgramOnComputer(this, this.vertexProgram);
this.mapReducers.addAll(this.vertexProgram.getMapReducers());
}
// 获得计算后的图对象, 持久化存储
this.resultGraph =
GraphComputerHelper.getResultGraphState(Optional.ofNullable(this.vertexProgram),
Optional.ofNullable(this.resultGraph));
this.persist =
GraphComputerHelper.getPersistState(Optional.ofNullable(this.vertexProgram),
Optional.ofNullable(this.persist));

if (!this.features().supportsResultGraphPersistCombination(this.resultGraph,
this.persist))throw
GraphComputer.Exceptions.resultGraphPersistCombinationNotSupported(this.resultGraph
, this.persist);
// 确保请求的worker比支持的数多
if (this.workers > this.features().getMaxWorkers())
throw GraphComputer.Exceptions.computerRequiresMoreWorkersThanSupported(this.workers, this.features().getMaxWorkers());

// 初始化Memory对象.准备开始提交任务了...
this.memory = new TinkerMemory(this.vertexProgram, this.mapReducers);
return computerService.submit(() -> {
final long time = System.currentTimeMillis();
final TinkerGraphComputerView view;
final TinkerWorkerPool workers = new TinkerWorkerPool(this.workers);
try {
if (null != this.vertexProgram) {
view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, this.vertexProgram.getVertexComputeKeys());
// 执行vertex program
this.vertexProgram.setup(this.memory);
while (true) {
if (Thread.interrupted()) throw new TraversalInterruptedException();
this.memory.completeSubRound();
workers.setVertexProgram(this.vertexProgram);
final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices());
workers.executeVertexProgram(vertexProgram -> {
vertexProgram.workerIterationStart(this.memory.asImmutable());
while (true) {
final Vertex vertex = vertices.next();
if (Thread.interrupted()) throw new TraversalInterruptedException();
if (null == vertex) break;
vertexProgram.execute(
ComputerGraph.vertexProgram(vertex, vertexProgram),
new TinkerMessenger<>(vertex, this.messageBoard, vertexProgram.getMessageCombiner()),
this.memory
);
}
vertexProgram.workerIterationEnd(this.memory.asImmutable());
});
this.messageBoard.completeIteration();
this.memory.completeSubRound();
if (this.vertexProgram.terminate(this.memory)) {
this.memory.incrIteration();
break;
} else {
this.memory.incrIteration();
}
}
view.complete(); //丢掉所有 transient vertex compute keys
} else {
//仅MR
view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, Collections.emptySet());
}

//执行MR任务
for (final MapReduce mapReduce : mapReducers) {
final TinkerMapEmitter<?, ?> mapEmitter = new TinkerMapEmitter<>(mapReduce.doStage(MapReduce.Stage.REDUCE));
final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices());
workers.setMapReduce(mapReduce);
workers.executeMapReduce(workerMapReduce -> {
workerMapReduce.workerStart(MapReduce.Stage.MAP);
while (true) {
if (Thread.interrupted()) throw new TraversalInterruptedException();
final Vertex vertex = vertices.next();
if (null == vertex) break;
workerMapReduce.map(ComputerGraph.mapReduce(vertex), mapEmitter);
}
workerMapReduce.workerEnd(MapReduce.Stage.MAP);
});
//如果map的输出排序有确定,则对结果排序
mapEmitter.complete(mapReduce);
// 无需运行combiners如果是单点机器
if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
final TinkerReduceEmitter<?, ?> reduceEmitter = new TinkerReduceEmitter<>();
final SynchronizedIterator<Map.Entry<?, Queue<?>>> keyValues = new SynchronizedIterator((Iterator) mapEmitter.reduceMap.entrySet().iterator());
workers.executeMapReduce(workerMapReduce -> {
workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
while (true) {
if (Thread.interrupted()) throw new TraversalInterruptedException();
final Map.Entry<?, Queue<?>> entry = keyValues.next();
if (null == entry) break;
workerMapReduce.reduce(entry.getKey(), entry.getValue().iterator(), reduceEmitter);
}
workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
});
reduceEmitter.complete(mapReduce); ///如果reduce的输出排序有确定,则对结果排序
mapReduce.addResultToMemory(this.memory, reduceEmitter.reduceQueue.iterator());
} else {
mapReduce.addResultToMemory(this.memory, mapEmitter.mapQueue.iterator());
}
}
// 更新运行时态并返回新的图实例
this.memory.setRuntime(System.currentTimeMillis() - time);
this.memory.complete(); // drop all transient properties and set iteration
// determine the resultant graph based on the result graph/persist state
final Graph resultGraph = view.processResultGraphPersist(this.resultGraph, this.persist);
TinkerHelper.dropGraphComputerView(this.graph); // drop the view from the original source graph
return new DefaultComputerResult(resultGraph, this.memory.asImmutable());
} catch (InterruptedException ie) {
workers.closeNow();
throw new TraversalInterruptedException();
} catch (Exception ex) {
workers.closeNow();
throw new RuntimeException(ex);
} finally {
workers.close();
}
});
}

@Override
public String toString() {
return StringFactory.graphComputerString(this);
}

private static class SynchronizedIterator<V> {

private final Iterator<V> iterator;

public SynchronizedIterator(final Iterator<V> iterator) {
this.iterator = iterator;
}

public synchronized V next() {
return this.iterator.hasNext() ? this.iterator.next() : null;
}
}

@Override
public Features features() {
return new Features() {

@Override
public int getMaxWorkers() {
return Runtime.getRuntime().availableProcessors();
}

@Override
public boolean supportsVertexAddition() {
return false;
}

@Override
public boolean supportsVertexRemoval() {
return false;
}

@Override
public boolean supportsVertexPropertyRemoval() {
return false;
}

@Override
public boolean supportsEdgeAddition() {
return false;
}

@Override
public boolean supportsEdgeRemoval() {
return false;
}

@Override
public boolean supportsEdgePropertyAddition() {
return false;
}

@Override
public boolean supportsEdgePropertyRemoval() {
return false;
}
};
}
}

//=------------------------------分割线--------------------------
/**
 * View laid over a {@link TinkerGraph} while a graph computation runs.
 *
 * <p>The view stores the compute-key properties written during the computation separately from the
 * graph, records which vertices and edges pass the graph filter, and at the end produces the result
 * graph requested by the result-graph/persist settings.
 */
public final class TinkerGraphComputerView {

    private final TinkerGraph graph;
    protected final Map<String, VertexComputeKey> computeKeys;
    private Map<Element, Map<String, List<VertexProperty<?>>>> computeProperties;
    private final Set<Object> legalVertices = new HashSet<>();
    private final Map<Object, Set<Object>> legalEdges = new HashMap<>();
    private final GraphFilter graphFilter;

    /**
     * Indexes the compute keys by name and, when the filter has any criteria, precomputes the ids of the
     * legal vertices and, per vertex, of its legal edges.
     *
     * @param graph       the graph being computed over
     * @param graphFilter vertex/edge filter of the computation
     * @param computeKeys the keys the computation is allowed to write
     */
    public TinkerGraphComputerView(final TinkerGraph graph, final GraphFilter graphFilter, final Set<VertexComputeKey> computeKeys) {
        this.graph = graph;
        this.computeKeys = new HashMap<>();
        computeKeys.forEach(key -> this.computeKeys.put(key.getKey(), key));
        this.computeProperties = new ConcurrentHashMap<>();
        this.graphFilter = graphFilter;
        if (this.graphFilter.hasFilter()) {
            graph.vertices().forEachRemaining(vertex -> {
                boolean legalVertex = false;
                if (this.graphFilter.hasVertexFilter() && this.graphFilter.legalVertex(vertex)) {
                    this.legalVertices.add(vertex.id());
                    legalVertex = true;
                }
                // edges are only collected for vertices that are legal (or when no vertex filter exists)
                if ((legalVertex || !this.graphFilter.hasVertexFilter()) && this.graphFilter.hasEdgeFilter()) {
                    final Set<Object> edges = new HashSet<>();
                    this.legalEdges.put(vertex.id(), edges);
                    this.graphFilter.legalEdges(vertex).forEachRemaining(edge -> edges.add(edge.id()));
                }
            });
        }
    }

    /**
     * Stores a compute property for the vertex inside the view.
     *
     * @return the new property; removing it removes it from the view
     * @throws RuntimeException (via {@code GraphComputer.Exceptions}) when {@code key} is not a compute key
     */
    public <V> Property<V> addProperty(final TinkerVertex vertex, final String key, final V value) {
        ElementHelper.validateProperty(key, value);
        if (isComputeKey(key)) {
            final TinkerVertexProperty<V> property = new TinkerVertexProperty<V>(vertex, key, value) {
                @Override
                public void remove() {
                    removeProperty(vertex, key, this);
                }
            };
            this.addValue(vertex, key, property);
            return property;
        } else {
            throw GraphComputer.Exceptions.providedKeyIsNotAnElementComputeKey(key);
        }
    }

    /**
     * @return the view's properties for the key when there are any, otherwise the vertex's own
     *         properties for that key (possibly empty)
     */
    public List<VertexProperty<?>> getProperty(final TinkerVertex vertex, final String key) {
        // if the vertex property is already on the vertex, use that.
        final List<VertexProperty<?>> vertexProperty = this.getValue(vertex, key);
        return vertexProperty.isEmpty() ? (List) TinkerHelper.getProperties(vertex).getOrDefault(key, Collections.emptyList()) : vertexProperty;
    }

    /**
     * @return the vertex's own properties followed by the properties the view holds for it
     */
    public List<Property> getProperties(final TinkerVertex vertex) {
        final Stream<Property> a = TinkerHelper.getProperties(vertex).values().stream().flatMap(list -> list.stream());
        final Stream<Property> b = this.computeProperties.containsKey(vertex) ?
                this.computeProperties.get(vertex).values().stream().flatMap(list -> list.stream()) :
                Stream.empty();
        return Stream.concat(a, b).collect(Collectors.toList());
    }

    /**
     * Removes a compute property of the vertex from the view.
     *
     * @throws RuntimeException (via {@code GraphComputer.Exceptions}) when {@code key} is not a compute key
     */
    public void removeProperty(final TinkerVertex vertex, final String key, final VertexProperty property) {
        if (isComputeKey(key)) {
            this.removeValue(vertex, key, property);
        } else {
            throw GraphComputer.Exceptions.providedKeyIsNotAnElementComputeKey(key);
        }
    }

    /**
     * @return {@code true} when there is no vertex filter or the vertex passed it
     */
    public boolean legalVertex(final Vertex vertex) {
        return !this.graphFilter.hasVertexFilter() || this.legalVertices.contains(vertex.id());
    }

    /**
     * @return {@code true} when there is no edge filter or the edge is among the legal edges recorded for
     *         the vertex; a vertex without any recorded edges has no legal edge
     */
    public boolean legalEdge(final Vertex vertex, final Edge edge) {
        if (!this.graphFilter.hasEdgeFilter())
            return true;
        // the constructor records edges only for legal vertices, so the lookup may come back empty
        final Set<Object> edges = this.legalEdges.get(vertex.id());
        return null != edges && edges.contains(edge.id());
    }

    /**
     * Removes every property stored under a transient compute key from the view.
     */
    protected void complete() {
        // remove all transient properties from the vertices
        for (final VertexComputeKey computeKey : this.computeKeys.values()) {
            if (computeKey.isTransient()) {
                // collect first: remove() mutates the lists being streamed
                final List<VertexProperty<?>> toRemove = this.computeProperties.values().stream().flatMap(map -> map.getOrDefault(computeKey.getKey(), Collections.emptyList()).stream()).collect(Collectors.toList());
                toRemove.forEach(VertexProperty::remove);
            }
        }
    }

    //////////////////////

    /**
     * Produces the graph the computation returns.
     *
     * <ul>
     *   <li>{@code Persist.NOTHING}: the original graph for {@code ResultGraph.ORIGINAL}, else an empty graph.</li>
     *   <li>{@code Persist.VERTEX_PROPERTIES}: the original graph with the view's properties written into
     *       it, or a new graph holding copies of the vertices and their properties.</li>
     *   <li>otherwise ({@code Persist.EDGES}): as above, and the new graph also receives copies of the edges.</li>
     * </ul>
     */
    public Graph processResultGraphPersist(final GraphComputer.ResultGraph resultGraph,
                                           final GraphComputer.Persist persist) {
        if (GraphComputer.Persist.NOTHING == persist) {
            if (GraphComputer.ResultGraph.ORIGINAL == resultGraph)
                return this.graph;
            else
                return EmptyGraph.instance();
        } else if (GraphComputer.Persist.VERTEX_PROPERTIES == persist) {
            if (GraphComputer.ResultGraph.ORIGINAL == resultGraph) {
                this.addPropertiesToOriginalGraph();
                return this.graph;
            } else {
                final TinkerGraph newGraph = TinkerGraph.open();
                this.copyVerticesTo(newGraph);
                return newGraph;
            }
        } else { // Persist.EDGES
            if (GraphComputer.ResultGraph.ORIGINAL == resultGraph) {
                this.addPropertiesToOriginalGraph();
                return this.graph;
            } else {
                final TinkerGraph newGraph = TinkerGraph.open();
                this.copyVerticesTo(newGraph);
                this.graph.edges().forEachRemaining(edge -> {
                    final Vertex outVertex = newGraph.vertices(edge.outVertex().id()).next();
                    final Vertex inVertex = newGraph.vertices(edge.inVertex().id()).next();
                    final Edge newEdge = outVertex.addEdge(edge.label(), inVertex, T.id, edge.id());
                    edge.properties().forEachRemaining(property -> newEdge.property(property.key(), property.value()));
                });
                return newGraph;
            }
        }
    }

    /** Copies every vertex of the source graph, with its properties and meta-properties, into {@code newGraph}. */
    private void copyVerticesTo(final TinkerGraph newGraph) {
        this.graph.vertices().forEachRemaining(vertex -> {
            final Vertex newVertex = newGraph.addVertex(T.id, vertex.id(), T.label, vertex.label());
            vertex.properties().forEachRemaining(vertexProperty -> {
                final VertexProperty<?> newVertexProperty = newVertex.property(VertexProperty.Cardinality.list, vertexProperty.key(), vertexProperty.value(), T.id, vertexProperty.id());
                vertexProperty.properties().forEachRemaining(property -> {
                    newVertexProperty.property(property.key(), property.value());
                });
            });
        });
    }

    /** Detaches the view from the graph, writes the view's properties onto the vertices and empties the view. */
    private void addPropertiesToOriginalGraph() {
        TinkerHelper.dropGraphComputerView(this.graph);
        this.computeProperties.forEach((element, properties) -> {
            properties.forEach((key, vertexProperties) -> {
                vertexProperties.forEach(vertexProperty -> {
                    final VertexProperty<?> newVertexProperty = ((Vertex) element).property(VertexProperty.Cardinality.list, vertexProperty.key(), vertexProperty.value(), T.id, vertexProperty.id());
                    vertexProperty.properties().forEachRemaining(property -> {
                        newVertexProperty.property(property.key(), property.value());
                    });
                });
            });
        });
        this.computeProperties.clear();
    }

    //////////////////////

    private boolean isComputeKey(final String key) {
        return this.computeKeys.containsKey(key);
    }

    /** Appends the property to the vertex's list for the key, creating map and list on first use. */
    private void addValue(final Vertex vertex, final String key, final VertexProperty property) {
        final Map<String, List<VertexProperty<?>>> elementProperties = this.computeProperties.computeIfAbsent(vertex, k -> new ConcurrentHashMap<>());
        elementProperties.compute(key, (k, v) -> {
            if (null == v) v = Collections.synchronizedList(new ArrayList<>());
            v.add(property);
            return v;
        });
    }

    /** Removes the property from the vertex's list for the key, if both exist. */
    private void removeValue(final Vertex vertex, final String key, final VertexProperty property) {
        this.computeProperties.computeIfPresent(vertex, (k, v) -> {
            v.computeIfPresent(key, (k1, v1) -> {
                v1.remove(property);
                return v1;
            });
            return v;
        });
    }

    /** Returns the view's properties of the vertex for the key, or an empty list. */
    private List<VertexProperty<?>> getValue(final Vertex vertex, final String key) {
        return this.computeProperties.getOrDefault(vertex, Collections.emptyMap()).getOrDefault(key, Collections.emptyList());
    }
}

D.MR部分

然后有一个HashMap,链队列实现的TinkerMap/ReduceEmitter,基于MR?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
 * Collects the key/value pairs emitted by the map stage.
 *
 * <p>When a reduce stage follows, values are grouped per key in {@code reduceMap}; otherwise every pair
 * is appended to {@code mapQueue}. Only the container matching the mode is initialized.
 *
 * @param <K> map output key type
 * @param <V> map output value type
 */
public final class TinkerMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {

    public Map<K, Queue<V>> reduceMap;
    public Queue<KeyValue<K, V>> mapQueue;
    private final boolean doReduce;

    /**
     * @param doReduce {@code true} to group values per key for a reduce stage, {@code false} to queue pairs
     */
    public TinkerMapEmitter(final boolean doReduce) {
        this.doReduce = doReduce;
        if (doReduce) {
            this.reduceMap = new ConcurrentHashMap<>();
        } else {
            this.mapQueue = new ConcurrentLinkedQueue<>();
        }
    }

    @Override
    public void emit(K key, V value) {
        if (!this.doReduce) {
            this.mapQueue.add(new KeyValue<>(key, value));
            return;
        }
        this.reduceMap.computeIfAbsent(key, unused -> new ConcurrentLinkedQueue<>()).add(value);
    }

    /**
     * Sorts the collected output by key when the job defines a map key order; does nothing otherwise.
     * In reduce mode the map is replaced by a {@code LinkedHashMap} filled in sorted order.
     */
    protected void complete(final MapReduce<K, V, ?, ?, ?> mapReduce) {
        if (!mapReduce.getMapKeySort().isPresent()) {
            return;
        }
        final Comparator<K> keyOrder = mapReduce.getMapKeySort().get();
        if (this.doReduce) {
            final List<Map.Entry<K, Queue<V>>> entries = new ArrayList<>();
            entries.addAll(this.reduceMap.entrySet());
            Collections.sort(entries, Comparator.comparing(Map.Entry::getKey, keyOrder));
            this.reduceMap = new LinkedHashMap<>();
            for (final Map.Entry<K, Queue<V>> entry : entries) {
                this.reduceMap.put(entry.getKey(), entry.getValue());
            }
        } else {
            final List<KeyValue<K, V>> pairs = new ArrayList<>(this.mapQueue);
            Collections.sort(pairs, Comparator.comparing(KeyValue::getKey, keyOrder));
            this.mapQueue.clear();
            this.mapQueue.addAll(pairs);
        }
    }
}
//----------------------------------类分割线---------------------------------//
/**
 * Collects the key/value pairs emitted through {@link #emit(Object, Object)} into
 * a single queue and can order that queue by key once emission is finished.
 */
public final class TinkerReduceEmitter<OK, OV> implements MapReduce.ReduceEmitter<OK, OV> {

    /** Every emitted pair, in arrival order until {@link #complete(MapReduce)} sorts it. */
    protected Queue<KeyValue<OK, OV>> reduceQueue = new ConcurrentLinkedQueue<>();

    /**
     * Appends one emitted pair to {@link #reduceQueue}.
     *
     * @param key   the emitted key
     * @param value the emitted value
     */
    @Override
    public void emit(final OK key, final OV value) {
        final KeyValue<OK, OV> pair = new KeyValue<>(key, value);
        this.reduceQueue.add(pair);
    }

    /**
     * Sorts {@link #reduceQueue} by key when {@code mapReduce.getReduceKeySort()}
     * supplies a comparator; does nothing otherwise.
     *
     * @param mapReduce the job whose optional reduce-key comparator is applied
     */
    protected void complete(final MapReduce<?, ?, OK, OV, ?> mapReduce) {
        if (!mapReduce.getReduceKeySort().isPresent()) {
            return;
        }
        final Comparator<OK> keyOrder = mapReduce.getReduceKeySort().get();
        final List<KeyValue<OK, OV>> pairs = new ArrayList<>(this.reduceQueue);
        pairs.sort((left, right) -> keyOrder.compare(left.getKey(), right.getKey()));
        // The queue object is reused: emptied, then refilled in sorted order.
        this.reduceQueue.clear();
        this.reduceQueue.addAll(pairs);
    }
}

TinkerMessenger实现了Messenger接口. Messenger接口的定义是:

待补充

实体类TinkerMessageBoard, 和实现类TinkerMessenger

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
 * Mailbox state shared by the {@code TinkerMessenger} instances of one computation.
 *
 * <p>Two generations are kept: the "send" side is filled during the iteration in
 * progress, the "receive" side holds what was filled during the previous one.
 * {@link #completeIteration()} promotes the send side to the receive side and
 * starts a fresh, empty send side.
 *
 * <p>{@code TinkerMessenger.sendMessage} adds to {@link #currentMessageScopes} on the
 * same path on which it updates the concurrent {@link #sendMessages} map, so the
 * scope set must tolerate concurrent writers as well. It is therefore backed by
 * {@link ConcurrentHashMap#newKeySet()} rather than a plain {@code HashSet}.
 * Iteration order of the scope sets is unspecified, as it was before.
 */
final class TinkerMessageBoard<M> {

    /** Messages queued per vertex during the iteration in progress. */
    public Map<Vertex, Queue<M>> sendMessages = new ConcurrentHashMap<>();
    /** Messages queued per vertex during the previous iteration. */
    public Map<Vertex, Queue<M>> receiveMessages = new ConcurrentHashMap<>();
    /** Scopes used by senders in the previous iteration; not mutated by this class. */
    public Set<MessageScope> previousMessageScopes = new HashSet<>();
    /** Scopes used by senders in the iteration in progress; safe for concurrent adds. */
    public Set<MessageScope> currentMessageScopes = ConcurrentHashMap.newKeySet();

    /**
     * Rolls the board over to the next iteration: what was sent becomes what can be
     * received, and both "current" containers are replaced by empty ones.
     */
    public void completeIteration() {
        this.receiveMessages = this.sendMessages;
        this.sendMessages = new ConcurrentHashMap<>();
        this.previousMessageScopes = this.currentMessageScopes;
        // Must stay a concurrent set: the next iteration's senders write to it.
        this.currentMessageScopes = ConcurrentHashMap.newKeySet();
    }
}

/**
 * {@link Messenger} bound to a single vertex and backed by a shared
 * {@link TinkerMessageBoard}.
 *
 * <p>Sending stores messages in the board's {@code sendMessages} map; receiving
 * reads from its {@code receiveMessages} map, driven by the scopes recorded in
 * {@code previousMessageScopes}.
 */
public final class TinkerMessenger<M> implements Messenger<M> {

    private final Vertex vertex;
    private final TinkerMessageBoard<M> messageBoard;
    /** Combiner applied when queuing messages; {@code null} when none was supplied. */
    private final MessageCombiner<M> combiner;

    /**
     * @param vertex       the vertex this messenger sends from and receives for
     * @param messageBoard the shared board holding the message queues and scopes
     * @param combiner     optional combiner used to merge messages queued for the same vertex
     */
    public TinkerMessenger(final Vertex vertex, final TinkerMessageBoard<M> messageBoard, final Optional<MessageCombiner<M>> combiner) {
        this.vertex = vertex;
        this.messageBoard = messageBoard;
        this.combiner = combiner.orElse(null);
    }

    /**
     * Builds an iterator over the messages available to this vertex, contributing
     * one sub-iterator per scope found in the board's {@code previousMessageScopes}.
     *
     * @return a {@code MultiIterator} chaining the per-scope message iterators
     */
    @Override
    public Iterator<M> receiveMessages() {
        final MultiIterator<M> inbox = new MultiIterator<>();
        for (final MessageScope scope : this.messageBoard.previousMessageScopes) {
            inbox.addIterator(scope instanceof MessageScope.Local
                    ? localMessages((MessageScope.Local<M>) scope)
                    : globalMessages());
        }
        return inbox;
    }

    /**
     * Messages for a local scope: walks the reversed incident traversal starting at
     * this vertex and, for each edge, reads the queue stored under the vertex at the
     * traversal's direction end of that edge, passing every message through the
     * scope's edge function.
     */
    private Iterator<M> localMessages(final MessageScope.Local<M> localScope) {
        final Traversal.Admin<Vertex, Edge> incident = TinkerMessenger.setVertexStart(localScope.getIncidentTraversal().get().asAdmin(), this.vertex);
        final Direction direction = TinkerMessenger.getDirection(incident);
        // One-slot holder: remembers the edge being processed so the later
        // edge-function stage can see it (lambdas cannot assign to locals).
        final Edge[] currentEdge = new Edge[1];
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(VertexProgramHelper.reverse(incident.asAdmin()), Spliterator.IMMUTABLE | Spliterator.SIZED), false)
                .map(e -> {
                    currentEdge[0] = e;
                    return this.messageBoard.receiveMessages.get(currentEdge[0].vertices(direction).next());
                })
                .filter(queue -> queue != null)
                .flatMap(Queue::stream)
                .map(message -> localScope.getEdgeFunction().apply(message, currentEdge[0]))
                .iterator();
    }

    /** Messages for a non-local scope: whatever is queued under this vertex itself. */
    private Iterator<M> globalMessages() {
        return Stream.of(this.vertex)
                .map(this.messageBoard.receiveMessages::get)
                .filter(queue -> queue != null)
                .flatMap(Queue::stream)
                .iterator();
    }

    /**
     * Records the scope on the board and queues the message: under this vertex for a
     * local scope, under every vertex listed by the scope otherwise.
     *
     * @param messageScope the scope the message is sent with
     * @param message      the message to queue
     */
    @Override
    public void sendMessage(final MessageScope messageScope, final M message) {
        this.messageBoard.currentMessageScopes.add(messageScope);
        if (!(messageScope instanceof MessageScope.Local)) {
            ((MessageScope.Global) messageScope).vertices().forEach(v -> addMessage(v, message));
            return;
        }
        addMessage(this.vertex, message);
    }

    /**
     * Queues {@code message} under {@code vertex} in the board's {@code sendMessages}
     * map. When a combiner exists and the queue is non-empty, the queue head is
     * removed, combined with the new message, and the result is queued instead.
     */
    private void addMessage(final Vertex vertex, final M message) {
        this.messageBoard.sendMessages.compute(vertex, (target, pending) -> {
            final Queue<M> outbox = null == pending ? new ConcurrentLinkedQueue<>() : pending;
            if (null != this.combiner && !outbox.isEmpty()) {
                outbox.add(this.combiner.combine(outbox.remove(), message));
            } else {
                outbox.add(message);
            }
            return outbox;
        });
    }

    ///////////

    /** Adds {@code vertex} as a start of {@code incidentTraversal} (bulk 1) and returns the same traversal. */
    private static <T extends Traversal.Admin<Vertex, Edge>> T setVertexStart(final Traversal.Admin<Vertex, Edge> incidentTraversal, final Vertex vertex) {
        incidentTraversal.addStart(incidentTraversal.getTraverserGenerator().generate(vertex, incidentTraversal.getStartStep(), 1L));
        return (T) incidentTraversal;
    }

    /** Returns the direction of the last {@code VertexStep}-assignable step in the traversal. */
    private static Direction getDirection(final Traversal.Admin<Vertex, Edge> incidentTraversal) {
        final VertexStep lastVertexStep = TraversalHelper.getLastStepOfAssignableClass(VertexStep.class, incidentTraversal).get();
        return lastVertexStep.getDirection();
    }
}