monotone

monotone Mtn Source Tree

Root/refiner.cc

1// Copyright (C) 2005 Graydon Hoare <graydon@pobox.com>
2//
3// This program is made available under the GNU GPL version 2.0 or
4// greater. See the accompanying file COPYING for details.
5//
6// This program is distributed WITHOUT ANY WARRANTY; without even the
7// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8// PURPOSE.
9
10#include "base.hh"
11#include <algorithm>
12#include <set>
13#include <utility>
14
15#include <boost/shared_ptr.hpp>
16
17#include "refiner.hh"
18#include "vocab.hh"
19#include "merkle_tree.hh"
20#include "netcmd.hh"
21
22using std::inserter;
23using std::make_pair;
24using std::set;
25using std::set_difference;
26using std::string;
27
28using boost::dynamic_bitset;
29
30// Our goal is to learn the complete set of items to send. To do this
31// we exchange two types of refinement commands: queries and responses.
32//
33// - On receiving a 'query' refinement for a node (p,l) you have:
34// - Compare the query node to your node (p,l), noting all the leaves
35// you must send as a result of what you learn in comparison.
36// - For each slot, if you have a subtree where the peer does not
37// (or you both do, and yours differs) send a sub-query for that
38// node, incrementing your query-in-flight counter.
39// - Send a 'response' refinement carrying your node (p,l)
40//
41// - On receiving a 'query' refinement for a node (p,l) you don't have:
42// - Send a 'response' refinement carrying an empty synthetic node (p,l)
43//
44// - On receiving a 'response' refinement for (p,l)
45// - Compare the query node to your node (p,l), noting all the leaves
46// you must send as a result of what you learn in comparison.
47// - Decrement your query-in-flight counter.
48//
49// The client kicks the process off by sending a query refinement for the
50// root node. When the client's query-in-flight counter drops to zero,
51// the client sends a done command, stating how many items it will be
52// sending.
53//
54// When the server receives a done command, it echoes it back stating how
55// many items *it* is going to send.
56//
57// When either side receives a done command, it transitions to
58// streaming send mode, sending all the items it's calculated.
59
60void
61refiner::note_local_item(id const & item)
62{
63 local_items.insert(item);
64 insert_into_merkle_tree(table, type, item, 0);
65}
66
67void
68refiner::reindex_local_items()
69{
70 recalculate_merkle_codes(table, prefix(""), 0);
71}
72
73void
74refiner::load_merkle_node(size_t level, prefix const & pref,
75 merkle_ptr & node)
76{
77 merkle_table::const_iterator j = table.find(make_pair(pref, level));
78 I(j != table.end());
79 node = j->second;
80}
81
82bool
83refiner::merkle_node_exists(size_t level,
84 prefix const & pref)
85{
86 merkle_table::const_iterator j = table.find(make_pair(pref, level));
87 return (j != table.end());
88}
89
90void
91refiner::calculate_items_to_send()
92{
93 if (calculated_items_to_send)
94 return;
95
96 items_to_send.clear();
97 items_to_receive = 0;
98
99 set_difference(local_items.begin(), local_items.end(),
100 peer_items.begin(), peer_items.end(),
101 inserter(items_to_send, items_to_send.begin()));
102
103 string typestr;
104 netcmd_item_type_to_string(type, typestr);
105
106 // L(FL("%s determined %d %s items to send")
107 // % voicestr() % items_to_send.size() % typestr);
108 calculated_items_to_send = true;
109}
110
111
112void
113refiner::send_subquery(merkle_node const & our_node, size_t slot)
114{
115 prefix subprefix;
116 our_node.extended_raw_prefix(slot, subprefix);
117 merkle_ptr our_subtree;
118 load_merkle_node(our_node.level + 1, subprefix, our_subtree);
119 // L(FL("%s queueing subquery on level %d\n") % voicestr() % (our_node.level + 1));
120 cb.queue_refine_cmd(refinement_query, *our_subtree);
121 ++queries_in_flight;
122}
123
124void
125refiner::send_synthetic_subquery(merkle_node const & our_node, size_t slot)
126{
127 id val;
128 size_t subslot;
129 dynamic_bitset<unsigned char> subprefix;
130
131 our_node.get_raw_slot(slot, val);
132 pick_slot_and_prefix_for_value(val, our_node.level + 1, subslot, subprefix);
133
134 merkle_node synth_node;
135 synth_node.pref = subprefix;
136 synth_node.level = our_node.level + 1;
137 synth_node.type = our_node.type;
138 synth_node.set_raw_slot(subslot, val);
139 synth_node.set_slot_state(subslot, our_node.get_slot_state(slot));
140
141 // L(FL("%s queueing synthetic subquery on level %d\n") % voicestr() % (our_node.level + 1));
142 cb.queue_refine_cmd(refinement_query, synth_node);
143 ++queries_in_flight;
144}
145
146void
147refiner::note_subtree_shared_with_peer(merkle_node const & our_node, size_t slot)
148{
149 prefix pref;
150 our_node.extended_raw_prefix(slot, pref);
151 collect_items_in_subtree(table, pref, our_node.level+1, peer_items);
152}
153
154refiner::refiner(netcmd_item_type type, protocol_voice voice, refiner_callbacks & cb)
155 : type(type), voice (voice), cb(cb),
156 sent_initial_query(false),
157 queries_in_flight(0),
158 calculated_items_to_send(false),
159 done(false),
160 items_to_receive(0)
161{
162 merkle_ptr root = merkle_ptr(new merkle_node());
163 root->type = type;
164 table.insert(make_pair(make_pair(prefix(""), 0), root));
165}
166
167void
168refiner::note_item_in_peer(merkle_node const & their_node, size_t slot)
169{
170 I(slot < constants::merkle_num_slots);
171 id slotval;
172 their_node.get_raw_slot(slot, slotval);
173 peer_items.insert(slotval);
174
175 // Write a debug message
176 /*
177 {
178 hexenc<id> hslotval;
179 their_node.get_hex_slot(slot, hslotval);
180
181 hexenc<prefix> hpref;
182 their_node.get_hex_prefix(hpref);
183
184 string typestr;
185 netcmd_item_type_to_string(their_node.type, typestr);
186
187 L(FL("%s's peer has %s '%s' at slot %d (in node '%s', level %d)")
188 % voicestr() % typestr % hslotval % slot % hpref % their_node.level);
189 }
190 */
191}
192
193
194void
195refiner::begin_refinement()
196{
197 merkle_ptr root;
198 load_merkle_node(0, prefix(""), root);
199 // L(FL("%s queueing initial node\n") % voicestr());
200 cb.queue_refine_cmd(refinement_query, *root);
201 ++queries_in_flight;
202 sent_initial_query = true;
203 string typestr;
204 netcmd_item_type_to_string(type, typestr);
205 L(FL("Beginning %s refinement on %s.") % typestr % voicestr());
206}
207
208void
209refiner::process_done_command(size_t n_items)
210{
211 string typestr;
212 netcmd_item_type_to_string(type, typestr);
213
214 calculate_items_to_send();
215 items_to_receive = n_items;
216
217 L(FL("%s finished %s refinement: %d to send, %d to receive")
218 % voicestr() % typestr % items_to_send.size() % items_to_receive);
219
220 /*
221 if (local_items.size() < 25)
222 {
223 // Debugging aid.
224 L(FL("+++ %d items in %s") % local_items.size() % voicestr());
225 for (set<id>::const_iterator i = local_items.begin();
226 i != local_items.end(); ++i)
227 {
228 hexenc<id> hid;
229 encode_hexenc(*i, hid);
230 L(FL("%s item %s") % voicestr() % hid);
231 }
232 L(FL("--- items in %s") % voicestr());
233 }
234 */
235
236 if (voice == server_voice)
237 {
238 // L(FL("server responding to [done %s %d] with [done %s %d]")
239 // % typestr % n_items % typestr % items_to_send.size());
240 cb.queue_done_cmd(type, items_to_send.size());
241 }
242
243 done = true;
244
245 // we can clear up the merkle trie's memory now
246 table.clear();
247}
248
249
// Core of the refinement protocol: handle one incoming 'refine' netcmd
// (either a query or a response) carrying the peer's node THEIR_NODE.
//
// We compare THEIR_NODE slot-by-slot against our node at the same
// (prefix, level), noting peer leaves, descending into differing
// subtrees, and — only when answering a query — issuing new subqueries.
// Responses decrement queries_in_flight; on the client, reaching zero
// ends refinement and triggers the 'done' exchange.
void
refiner::process_refinement_command(refinement_type ty,
                                    merkle_node const & their_node)
{
  prefix pref;
  hexenc<prefix> hpref;
  their_node.get_raw_prefix(pref);
  their_node.get_hex_prefix(hpref);
  string typestr;

  netcmd_item_type_to_string(their_node.type, typestr);
  // lev (and hpref/typestr above) are only consumed by the disabled
  // trace log below.
  size_t lev = static_cast<size_t>(their_node.level);

  // L(FL("%s received refinement %s netcmd on %s node '%s', level %d") %
  //   voicestr() % (ty == refinement_query ? "query" : "response") %
  //   typestr % hpref % lev);

  merkle_ptr our_node;

  if (merkle_node_exists(their_node.level, pref))
    load_merkle_node(their_node.level, pref, our_node);
  else
    {
      // Synthesize an empty node if we don't have one: all slots empty,
      // so every comparison below treats our side as having nothing here.
      our_node = merkle_ptr(new merkle_node);
      our_node->pref = their_node.pref;
      our_node->level = their_node.level;
      our_node->type = their_node.type;
    }

  for (size_t slot = 0; slot < constants::merkle_num_slots; ++slot)
    {
      // Note any leaves they have, regardless of what we have in the
      // corresponding slot.
      if (their_node.get_slot_state(slot) == leaf_state)
        note_item_in_peer(their_node, slot);

      if (ty == refinement_query)
        {
          // This block handles the interesting asymmetric cases of subtree
          // vs. leaf.
          //
          // Note that in general we're not allowed to send a new query
          // packet when we're looking at a response. This wrinkle is both
          // why this block appears to do slightly more work than necessary,
          // and why it's predicated on "ty == refinement_query". More detail
          // in the cases below.

          if (their_node.get_slot_state(slot) == leaf_state
              && our_node->get_slot_state(slot) == subtree_state)
            {
              // If they have a leaf and we have a subtree, we need to look
              // in our subtree to find if their leaf is present, and send
              // them a "query" that will inform them, in passing, of the
              // presence of our node.

              id their_slotval;
              their_node.get_raw_slot(slot, their_slotval);
              size_t snum;
              merkle_ptr mp;
              if (locate_item(table, their_slotval, snum, mp))
                {
                  cb.queue_refine_cmd(refinement_query, *mp);
                  ++queries_in_flight;
                }

            }

          else if (their_node.get_slot_state(slot) == subtree_state
                   && our_node->get_slot_state(slot) == leaf_state)
            {
              // If they have a subtree and we have a leaf, we need to
              // arrange for a subquery to explore the subtree looking for
              // the leaf in *their* subtree. The tricky part is that we
              // cannot have this subquery triggered by our response
              // packet. We need to initiate a new (redundant) query here to
              // prompt our peer to explore the subtree.
              //
              // This is purely for the sake of balancing the bracketing of
              // queries and responses: if they were to reply to our
              // response packet, our query-in-flight counter would have
              // temporarily dropped to zero and we'd have initiated
              // streaming send mode.
              //
              // Yes, the need to invert the sense of queries in this case
              // represents a misdesign in this generation of the netsync
              // protocol. It still contains much less hair than it used to,
              // so I'm willing to accept it.

              send_synthetic_subquery(*our_node, slot);
            }

          // Finally: if either side had an empty slot, there's no
          // subtree exploration to perform; the response packet will
          // inform the peer of everything relevant we know about this
          // node: namely that they're going to receive a complete
          // subtree, we know what's in it, and we'll tell them how many
          // nodes to expect in the aggregate count of the 'done' command.

        }

      // Compare any subtrees, if we both have subtrees.
      if (their_node.get_slot_state(slot) == subtree_state
          && our_node->get_slot_state(slot) == subtree_state)
        {
          id our_slotval, their_slotval;
          their_node.get_raw_slot(slot, their_slotval);
          our_node->get_raw_slot(slot, our_slotval);

          // Always note when you share a subtree.
          if (their_slotval == our_slotval)
            note_subtree_shared_with_peer(*our_node, slot);

          // Send subqueries when you have a different subtree
          // and you're answering a query message.
          else if (ty == refinement_query)
            send_subquery(*our_node, slot);
        }
    }

  if (ty == refinement_response)
    {
      E((queries_in_flight > 0),
        F("underflow on query-in-flight counter"));
      --queries_in_flight;

      // Possibly this signals the end of refinement: the client sends
      // 'done' when its last outstanding query has been answered.
      if (voice == client_voice && queries_in_flight == 0)
        {
          string typestr;
          netcmd_item_type_to_string(their_node.type, typestr);
          calculate_items_to_send();
          // L(FL("client sending [done %s %d]") % typestr % items_to_send.size());
          cb.queue_done_cmd(type, items_to_send.size());
        }
    }
  else
    {
      // Always reply to every query with the current node.
      I(ty == refinement_query);
      // L(FL("%s queueing response to query on %d\n") % voicestr() % our_node->level);
      cb.queue_refine_cmd(refinement_response, *our_node);
    }
}
393
394#ifdef BUILD_UNIT_TESTS
395#include "randomizer.hh"
396#include "unit_tests.hh"
397
398#include <deque>
399#include <boost/shared_ptr.hpp>
400
401using std::deque;
402using boost::shared_ptr;
403
struct
refiner_pair
{
  // This structure acts as a mock netsync session. Its only purpose is to
  // construct two refiners that are connected to one another, and route
  // refinement calls back and forth between them. The constructor runs
  // the entire simulated session and self-checks the outcome.

  struct
  refiner_pair_callbacks : refiner_callbacks
  {
    refiner_pair & p;
    // true when these callbacks belong to the client-side refiner.
    bool is_client;
    refiner_pair_callbacks(refiner_pair & p, bool is_client)
      : p(p), is_client(is_client)
    {}

    // Enqueue a refine command as an in-flight message addressed to
    // the opposite side.
    virtual void queue_refine_cmd(refinement_type ty,
                                  merkle_node const & our_node)
    {
      p.events.push_back(shared_ptr<msg>(new msg(is_client, ty, our_node)));
    }

    // Enqueue a done command addressed to the opposite side.
    virtual void queue_done_cmd(netcmd_item_type ty,
                                size_t n_items)
    {
      p.events.push_back(shared_ptr<msg>(new msg(is_client, n_items)));
    }
    virtual ~refiner_pair_callbacks() {}
  };

  refiner_pair_callbacks client_cb;
  refiner_pair_callbacks server_cb;
  refiner client;
  refiner server;

  // One message in flight between the two refiners: either a refine
  // (query/response) carrying a merkle node, or a done carrying a count.
  struct msg
  {
    msg(bool is_client, refinement_type ty, merkle_node const & node)
      : op(refine),
        ty(ty),
        send_to_client(!is_client),
        node(node)
    {}

    msg(bool is_client, size_t items)
      : op(done),
        send_to_client(!is_client),
        n_items(items)
    {}

    enum { refine, done } op;
    refinement_type ty;
    bool send_to_client;   // destination: sender's opposite side
    size_t n_items;        // valid only when op == done
    merkle_node node;      // valid only when op == refine
  };

  // FIFO of in-flight messages; delivery order models the network.
  deque<shared_ptr<msg> > events;
  // Total messages delivered, reported in the stats log.
  size_t n_msgs;

  // Deliver the oldest queued message to its destination refiner.
  // Precondition: events is non-empty (the constructor's loop checks).
  void crank()
  {

    shared_ptr<msg> m = events.front();
    events.pop_front();
    ++n_msgs;

    switch (m->op)
      {

      case msg::refine:
        if (m->send_to_client)
          client.process_refinement_command(m->ty, m->node);
        else
          server.process_refinement_command(m->ty, m->node);
        break;

      case msg::done:
        if (m->send_to_client)
          client.process_done_command(m->n_items);
        else
          server.process_done_command(m->n_items);
        break;
      }
  }

  // Build both refiners over the given item sets, run the session to
  // quiescence, then verify completion, exact set differences, absence
  // of redundant sends, and matching send/receive counts.
  refiner_pair(set<id> const & client_items,
               set<id> const & server_items) :
    client_cb(*this, true),
    server_cb(*this, false),
    // The item type here really doesn't matter.
    client(file_item, client_voice, client_cb),
    server(file_item, server_voice, server_cb),
    n_msgs(0)
  {
    for (set<id>::const_iterator i = client_items.begin();
         i != client_items.end(); ++i)
      client.note_local_item(*i);

    for (set<id>::const_iterator i = server_items.begin();
         i != server_items.end(); ++i)
      server.note_local_item(*i);

    client.reindex_local_items();
    server.reindex_local_items();
    client.begin_refinement();

    // Pump messages until the session quiesces.
    while (! events.empty())
      crank();

    // Refinement should have completed by here.
    UNIT_TEST_CHECK(client.done);
    UNIT_TEST_CHECK(server.done);

    check_set_differences("client", client);
    check_set_differences("server", server);
    check_no_redundant_sends("client->server",
                             client.items_to_send,
                             server.get_local_items());
    check_no_redundant_sends("server->client",
                             server.items_to_send,
                             client.get_local_items());
    UNIT_TEST_CHECK(client.items_to_send.size() == server.items_to_receive);
    UNIT_TEST_CHECK(server.items_to_send.size() == client.items_to_receive);
    L(FL("stats: %d total, %d cs, %d sc, %d msgs")
      % (server.items_to_send.size() + client.get_local_items().size())
      % client.items_to_send.size()
      % server.items_to_send.size()
      % n_msgs);
  }

  // Debugging aid: if set1 != set2, log every element of both sets.
  void print_if_unequal(char const * context,
                        char const * name1,
                        set<id> const & set1,
                        char const * name2,
                        set<id> const & set2)
  {
    if (set1 != set2)
      {
        L(FL("WARNING: Unequal sets in %s!") % context);
        for (set<id>::const_iterator i = set1.begin(); i != set1.end(); ++i)
          {
            hexenc<id> hid;
            encode_hexenc(*i, hid);
            L(FL("%s: %s") % name1 % hid);
          }

        for (set<id>::const_iterator i = set2.begin(); i != set2.end(); ++i)
          {
            hexenc<id> hid;
            encode_hexenc(*i, hid);
            L(FL("%s: %s") % name2 % hid);
          }
        L(FL("end of unequal sets"));
      }
  }

  // Assert that no item scheduled for sending already exists at the
  // destination (a redundant transmission).
  void check_no_redundant_sends(char const * context,
                                set<id> const & src,
                                set<id> const & dst)
  {
    for (set<id>::const_iterator i = src.begin(); i != src.end(); ++i)
      {
        set<id>::const_iterator j = dst.find(*i);
        if (j != dst.end())
          {
            hexenc<id> hid;
            encode_hexenc(*i, hid);
            L(FL("WARNING: %s transmission will send redundant item %s")
              % context % hid);
          }
        UNIT_TEST_CHECK(j == dst.end());
      }
  }

  // Assert that the refiner's computed send set equals the true
  // difference local_items - peer_items.
  void check_set_differences(char const * context, refiner const & r)
  {
    set<id> tmp;
    set_difference(r.get_local_items().begin(), r.get_local_items().end(),
                   r.get_peer_items().begin(), r.get_peer_items().end(),
                   inserter(tmp, tmp.begin()));
    print_if_unequal(context,
                     "diff(local,peer)", tmp,
                     "items_to_send", r.items_to_send);

    UNIT_TEST_CHECK(tmp == r.items_to_send);
  }
};
592
593
594void
595check_combinations_of_sets(set<id> const & s0,
596 set<id> const & a,
597 set<id> const & b)
598{
599 // Having composed our two input sets s0 and s1, we now construct the 2
600 // auxilary union-combinations of them -- {} and {s0 U s1} -- giving 4
601 // basic input sets. We then run 9 "interesting" pairwise combinations
602 // of these input sets.
603
604 set<id> e, u, v;
605 set_union(s0.begin(), s0.end(), a.begin(), a.end(), inserter(u, u.begin()));
606 set_union(s0.begin(), s0.end(), b.begin(), b.end(), inserter(v, v.begin()));
607
608 { refiner_pair x(e, u); } // a large initial transfer
609 { refiner_pair x(u, e); } // a large initial transfer
610
611 { refiner_pair x(s0, u); } // a mostly-shared superset/subset
612 { refiner_pair x(u, s0); } // a mostly-shared superset/subset
613
614 { refiner_pair x(a, u); } // a mostly-unshared superset/subset
615 { refiner_pair x(u, a); } // a mostly-unshared superset/subset
616
617 { refiner_pair x(u, v); } // things to send in both directions
618 { refiner_pair x(v, u); } // things to send in both directions
619
620 { refiner_pair x(u, u); } // a large no-op
621}
622
623
624void
625build_random_set(set<id> & s, size_t sz, bool clumpy, randomizer & rng)
626{
627 while (s.size() < sz)
628 {
629 string str(constants::merkle_hash_length_in_bytes, ' ');
630 for (size_t i = 0; i < constants::merkle_hash_length_in_bytes; ++i)
631 str[i] = static_cast<char>(rng.uniform(0xff));
632 s.insert(id(str));
633 if (clumpy && rng.flip())
634 {
635 size_t clumpsz = rng.uniform(7) + 1;
636 size_t pos = rng.flip() ? str.size() - 1 : rng.uniform(str.size());
637 for (size_t i = 0; s.size() < sz && i < clumpsz; ++i)
638 {
639 char c = str[pos];
640 if (c == static_cast<char>(0xff))
641 break;
642 ++c;
643 str[pos] = c;
644 s.insert(id(str));
645 }
646 }
647 }
648}
649
650size_t
651perturbed(size_t n, randomizer & rng)
652{
653 // we sometimes perturb sizes to deviate a bit from natural word-multiple sizes
654 if (rng.flip())
655 return n + rng.uniform(5);
656 return n;
657}
658
// Map step index I to a secondary-set size: the first three steps give
// the tiny fixed sizes 1, 2, 3; later steps give 0.2, 0.4, 0.6, ... of
// BASE_SET_SIZE.
size_t
modulated_size(size_t base_set_size, size_t i)
{
  if (i < 3)
    return i + 1;

  double const scale = static_cast<double>(i - 2) / 5.0;
  return static_cast<size_t>(scale * static_cast<double>(base_set_size));
}
668
669
670void
671check_with_count(size_t base_set_size, randomizer & rng)
672{
673 if (base_set_size == 0)
674 return;
675
676 L(FL("running refinement check with base set size %d") % base_set_size);
677
678 // Our goal here is to construct a base set of a given size, and two
679 // secondary sets which will be combined with the base set in various
680 // ways.
681 //
682 // The secondary sets will be built at the following sizes:
683 //
684 // 1 element
685 // 2 elements
686 // 3 elements
687 // 0.2 * size of base set
688 // 0.4 * size of base set
689 // 0.8 * size of base set
690 //
691 // The base set is constructed in both clumpy and non-clumpy forms,
692 // making 6 * 6 * 2 = 72 variations.
693 //
694 // Since each group of sets creates 9 sync scenarios, each "size" creates
695 // 648 sync scenarios.
696
697 for (size_t c = 0; c < 2; ++c)
698 {
699 set<id> s0;
700 build_random_set(s0, perturbed(base_set_size, rng), c == 0, rng);
701
702 for (size_t a = 0; a < 6; ++a)
703 {
704 set<id> sa;
705 build_random_set(sa, modulated_size(perturbed(base_set_size, rng), a), false, rng);
706
707 for (size_t b = 0; b < 6; ++b)
708 {
709 set<id> sb;
710 build_random_set(sb, modulated_size(perturbed(base_set_size, rng), b), false, rng);
711 check_combinations_of_sets(s0, sa, sb);
712 }
713 }
714 }
715}
716
717UNIT_TEST(refiner, various_counts)
718{
719 {
720 // Once with zero-zero, for good measure.
721 set<id> s0;
722 refiner_pair x(s0, s0);
723 }
724
725 // We run 3 primary counts, giving 1944 tests. Note that there is some
726 // perturbation within the test, so we're not likely to feel side effects
727 // of landing on such pleasant round numbers.
728
729 randomizer rng;
730 check_with_count(1, rng);
731 check_with_count(128, rng);
732 check_with_count(1024, rng);
733}
734
735#endif
736
737// Local Variables:
738// mode: C++
739// fill-column: 76
740// c-file-style: "gnu"
741// indent-tabs-mode: nil
742// End:
743// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status