monotone

monotone Mtn Source Tree

Root/refiner.cc

1// Copyright (C) 2005 Graydon Hoare <graydon@pobox.com>
2//
3// This program is made available under the GNU GPL version 2.0 or
4// greater. See the accompanying file COPYING for details.
5//
6// This program is distributed WITHOUT ANY WARRANTY; without even the
7// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8// PURPOSE.
9
10#include "base.hh"
11#include <algorithm>
12#include <set>
13#include <utility>
14
15#include <boost/shared_ptr.hpp>
16
17#include "refiner.hh"
18#include "vocab.hh"
19#include "merkle_tree.hh"
20#include "netcmd.hh"
21
22using std::inserter;
23using std::make_pair;
24using std::set;
25using std::set_difference;
26using std::string;
27
28using boost::dynamic_bitset;
29
30// Our goal is to learn the complete set of items to send. To do this
31// we exchange two types of refinement commands: queries and responses.
32//
33// - On receiving a 'query' refinement for a node (p,l) you have:
34// - Compare the query node to your node (p,l), noting all the leaves
35// you must send as a result of what you learn in comparison.
36// - For each slot, if you have a subtree where the peer does not
37// (or you both do, and yours differs) send a sub-query for that
38// node, incrementing your query-in-flight counter.
39// - Send a 'response' refinement carrying your node (p,l)
40//
41// - On receiving a 'query' refinement for a node (p,l) you don't have:
42// - Send a 'response' refinement carrying an empty synthetic node (p,l)
43//
44// - On receiving a 'response' refinement for (p,l)
45// - Compare the response node to your node (p,l), noting all the leaves
46// you must send as a result of what you learn in comparison.
47// - Decrement your query-in-flight counter.
48//
49// The client kicks the process off by sending a query refinement for the
50// root node. When the client's query-in-flight counter drops to zero,
51// the client sends a done command, stating how many items it will be
52// sending.
53//
54// When the server receives a done command, it echoes it back stating how
55// many items *it* is going to send.
56//
57// When either side receives a done command, it transitions to
58// streaming send mode, sending all the items it's calculated.
59
60void
61refiner::note_local_item(id const & item)
62{
63 local_items.insert(item);
64 insert_into_merkle_tree(table, type, item, 0);
65}
66
67void
68refiner::reindex_local_items()
69{
70 recalculate_merkle_codes(table, prefix(""), 0);
71}
72
73void
74refiner::load_merkle_node(size_t level, prefix const & pref,
75 merkle_ptr & node)
76{
77 merkle_table::const_iterator j = table.find(make_pair(pref, level));
78 I(j != table.end());
79 node = j->second;
80}
81
82bool
83refiner::merkle_node_exists(size_t level,
84 prefix const & pref)
85{
86 merkle_table::const_iterator j = table.find(make_pair(pref, level));
87 return (j != table.end());
88}
89
90void
91refiner::calculate_items_to_send()
92{
93 if (calculated_items_to_send)
94 return;
95
96 items_to_send.clear();
97 items_to_receive = 0;
98
99 set_difference(local_items.begin(), local_items.end(),
100 peer_items.begin(), peer_items.end(),
101 inserter(items_to_send, items_to_send.begin()));
102
103 string typestr;
104 netcmd_item_type_to_string(type, typestr);
105
106 // L(FL("%s determined %d %s items to send")
107 // % voicestr() % items_to_send.size() % typestr);
108 calculated_items_to_send = true;
109}
110
111
112void
113refiner::send_subquery(merkle_node const & our_node, size_t slot)
114{
115 prefix subprefix;
116 our_node.extended_raw_prefix(slot, subprefix);
117 merkle_ptr our_subtree;
118 load_merkle_node(our_node.level + 1, subprefix, our_subtree);
119 // L(FL("%s queueing subquery on level %d\n") % voicestr() % (our_node.level + 1));
120 cb.queue_refine_cmd(refinement_query, *our_subtree);
121 ++queries_in_flight;
122}
123
124void
125refiner::send_synthetic_subquery(merkle_node const & our_node, size_t slot)
126{
127 id val;
128 size_t subslot;
129 dynamic_bitset<unsigned char> subprefix;
130
131 our_node.get_raw_slot(slot, val);
132 pick_slot_and_prefix_for_value(val, our_node.level + 1, subslot, subprefix);
133
134 merkle_node synth_node;
135 synth_node.pref = subprefix;
136 synth_node.level = our_node.level + 1;
137 synth_node.type = our_node.type;
138 synth_node.set_raw_slot(subslot, val);
139 synth_node.set_slot_state(subslot, our_node.get_slot_state(slot));
140
141 // L(FL("%s queueing synthetic subquery on level %d\n") % voicestr() % (our_node.level + 1));
142 cb.queue_refine_cmd(refinement_query, synth_node);
143 ++queries_in_flight;
144}
145
146void
147refiner::note_subtree_shared_with_peer(merkle_node const & our_node, size_t slot)
148{
149 prefix pref;
150 our_node.extended_raw_prefix(slot, pref);
151 collect_items_in_subtree(table, pref, our_node.level+1, peer_items);
152}
153
154refiner::refiner(netcmd_item_type type, protocol_voice voice, refiner_callbacks & cb)
155 : type(type), voice (voice), cb(cb),
156 sent_initial_query(false),
157 queries_in_flight(0),
158 calculated_items_to_send(false),
159 done(false),
160 items_to_receive(0),
161 min_items_to_receive(0),
162 may_receive_more_than_min(false)
163{
164 merkle_ptr root = merkle_ptr(new merkle_node());
165 root->type = type;
166 table.insert(make_pair(make_pair(prefix(""), 0), root));
167}
168
169void
170refiner::note_item_in_peer(merkle_node const & their_node, size_t slot)
171{
172 I(slot < constants::merkle_num_slots);
173 id slotval;
174 their_node.get_raw_slot(slot, slotval);
175 peer_items.insert(slotval);
176
177 // Write a debug message
178 /*
179 {
180 id slotval;
181 their_node.get_raw_slot(slot, slotval);
182
183 hexenc<prefix> hpref;
184 their_node.get_hex_prefix(hpref);
185
186 string typestr;
187 netcmd_item_type_to_string(their_node.type, typestr);
188
189 L(FL("%s's peer has %s '%s' at slot %d (in node '%s', level %d)")
190 % voicestr() % typestr % slotval
191 % slot % hpref % their_node.level);
192 }
193 */
194}
195
196
197void
198refiner::begin_refinement()
199{
200 merkle_ptr root;
201 load_merkle_node(0, prefix(""), root);
202 // L(FL("%s queueing initial node\n") % voicestr());
203 cb.queue_refine_cmd(refinement_query, *root);
204 ++queries_in_flight;
205 sent_initial_query = true;
206 string typestr;
207 netcmd_item_type_to_string(type, typestr);
208 L(FL("Beginning %s refinement on %s.") % typestr % voicestr());
209}
210
211void
212refiner::process_done_command(size_t n_items)
213{
214 string typestr;
215 netcmd_item_type_to_string(type, typestr);
216
217 calculate_items_to_send();
218 items_to_receive = n_items;
219
220 L(FL("%s finished %s refinement: %d to send, %d to receive")
221 % voicestr() % typestr % items_to_send.size() % items_to_receive);
222
223 /*
224 if (local_items.size() < 25)
225 {
226 // Debugging aid.
227 L(FL("+++ %d items in %s") % local_items.size() % voicestr());
228 for (set<id>::const_iterator i = local_items.begin();
229 i != local_items.end(); ++i)
230 {
231 L(FL("%s item %s") % voicestr() % *i);
232 }
233 L(FL("--- items in %s") % voicestr());
234 }
235 */
236
237 if (voice == server_voice)
238 {
239 // L(FL("server responding to [done %s %d] with [done %s %d]")
240 // % typestr % n_items % typestr % items_to_send.size());
241 cb.queue_done_cmd(type, items_to_send.size());
242 }
243
244 done = true;
245
246 // we can clear up the merkle trie's memory now
247 table.clear();
248}
249
250
251void
252refiner::process_refinement_command(refinement_type ty,
253 merkle_node const & their_node)
254{
255 prefix pref;
256 hexenc<prefix> hpref;
257 their_node.get_raw_prefix(pref);
258 their_node.get_hex_prefix(hpref);
259 string typestr;
260
261 netcmd_item_type_to_string(their_node.type, typestr);
262 size_t lev = static_cast<size_t>(their_node.level);
263
264 // L(FL("%s received refinement %s netcmd on %s node '%s', level %d") %
265 // voicestr() % (ty == refinement_query ? "query" : "response") %
266 // typestr % hpref % lev);
267
268 merkle_ptr our_node;
269
270 if (merkle_node_exists(their_node.level, pref))
271 load_merkle_node(their_node.level, pref, our_node);
272 else
273 {
274 // Synthesize empty node if we don't have one.
275 our_node = merkle_ptr(new merkle_node);
276 our_node->pref = their_node.pref;
277 our_node->level = their_node.level;
278 our_node->type = their_node.type;
279 }
280
281 for (size_t slot = 0; slot < constants::merkle_num_slots; ++slot)
282 {
283 // Note any leaves they have.
284 if (their_node.get_slot_state(slot) == leaf_state)
285 note_item_in_peer(their_node, slot);
286
287 // esimate how many items to receive
288 if (their_node.get_slot_state(slot) != empty_state
289 && our_node->get_slot_state(slot) != subtree_state)
290 {
291 if (our_node->get_slot_state(slot) == empty_state)
292 {
293 if (their_node.get_slot_state(slot) == leaf_state)
294 ++min_items_to_receive;
295 else
296 {
297 min_items_to_receive += 2;
298 may_receive_more_than_min = true;
299 }
300 }
301 else if (their_node.get_slot_state(slot) == leaf_state)
302 {
303 // pair of leaves
304 id our_slotval, their_slotval;
305 their_node.get_raw_slot(slot, their_slotval);
306 our_node->get_raw_slot(slot, our_slotval);
307 if (our_slotval != their_slotval)
308 ++min_items_to_receive;
309 }
310 // else they have a tree and we have a leaf, in which
311 // case there will be more queries
312 }
313
314 if (ty == refinement_query)
315 {
316 // This block handles the interesting asymmetric cases of subtree
317 // vs. leaf.
318 //
319 // Note that in general we're not allowed to send a new query
320 // packet when we're looking at a response. This wrinkle is both
321 // why this block appears to do slightly more work than necessary,
322 // and why it's predicated on "ty == refinement_query". More detail
323 // in the cases below.
324
325 if (their_node.get_slot_state(slot) == leaf_state
326 && our_node->get_slot_state(slot) == subtree_state)
327 {
328 // If they have a leaf and we have a subtree, we need to look
329 // in our subtree to find if their leaf is present, and send
330 // them a "query" that will inform them, in passing, of the
331 // presence of our node.
332
333 id their_slotval;
334 their_node.get_raw_slot(slot, their_slotval);
335 size_t snum;
336 merkle_ptr mp;
337 if (locate_item(table, their_slotval, snum, mp))
338 {
339 cb.queue_refine_cmd(refinement_query, *mp);
340 ++queries_in_flight;
341 }
342
343 }
344
345 else if (their_node.get_slot_state(slot) == subtree_state
346 && our_node->get_slot_state(slot) == leaf_state)
347 {
348 // If they have a subtree and we have a leaf, we need to
349 // arrange for a subquery to explore the subtree looking for
350 // the leaf in *their* subtree. The tricky part is that we
351 // cannot have this subquery triggered by our response
352 // packet. We need to initiate a new (redundant) query here to
353 // prompt our peer to explore the subtree.
354 //
355 // This is purely for the sake of balancing the bracketing of
356 // queries and responses: if they were to reply to our
357 // response packet, our query-in-flight counter would have
358 // temporarily dropped to zero and we'd have initiated
359 // streaming send mode.
360 //
361 // Yes, the need to invert the sense of queries in this case
362 // represents a misdesign in this generation of the netsync
363 // protocol. It still contains much less hair than it used to,
364 // so I'm willing to accept it.
365
366 send_synthetic_subquery(*our_node, slot);
367 }
368
369 // Finally: if they had an empty slot in either case, there's no
370 // subtree exploration to perform; the response packet will inform
371 // the peer of everything relevant know about this node: namely
372 // that they're going to receive a complete subtree, we know
373 // what's in it, and we'll tell them how many nodes to expect in
374 // the aggregate count of the 'done' commane.
375
376 }
377
378 // Compare any subtrees, if we both have subtrees.
379 if (their_node.get_slot_state(slot) == subtree_state
380 && our_node->get_slot_state(slot) == subtree_state)
381 {
382 id our_slotval, their_slotval;
383 their_node.get_raw_slot(slot, their_slotval);
384 our_node->get_raw_slot(slot, our_slotval);
385
386 // Always note when you share a subtree.
387 if (their_slotval == our_slotval)
388 note_subtree_shared_with_peer(*our_node, slot);
389
390 // Send subqueries when you have a different subtree
391 // and you're answering a query message.
392 else if (ty == refinement_query)
393 send_subquery(*our_node, slot);
394 }
395 }
396
397 if (ty == refinement_response)
398 {
399 E((queries_in_flight > 0), origin::network,
400 F("underflow on query-in-flight counter"));
401 --queries_in_flight;
402
403 // Possibly this signals the end of refinement.
404 if (voice == client_voice && queries_in_flight == 0)
405 {
406 string typestr;
407 netcmd_item_type_to_string(their_node.type, typestr);
408 calculate_items_to_send();
409 // L(FL("client sending [done %s %d]") % typestr % items_to_send.size());
410 cb.queue_done_cmd(type, items_to_send.size());
411 }
412 }
413 else
414 {
415 // Always reply to every query with the current node.
416 I(ty == refinement_query);
417 // L(FL("%s queueing response to query on %d\n") % voicestr() % our_node->level);
418 cb.queue_refine_cmd(refinement_response, *our_node);
419 }
420}
421
422
423// Local Variables:
424// mode: C++
425// fill-column: 76
426// c-file-style: "gnu"
427// indent-tabs-mode: nil
428// End:
429// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status