Root/netsync.cc

1// copyright (C) 2004 graydon hoare <graydon@pobox.com>
2// all rights reserved.
3// licensed to the public under the terms of the GNU GPL (>= 2)
4// see the file COPYING for details
5
6#include <map>
7#include <string>
8#include <memory>
9
10#include <time.h>
11
12#include <boost/lexical_cast.hpp>
13#include <boost/scoped_ptr.hpp>
14#include <boost/shared_ptr.hpp>
15
16#include "app_state.hh"
17#include "cert.hh"
18#include "constants.hh"
19#include "keys.hh"
20#include "merkle_tree.hh"
21#include "netcmd.hh"
22#include "netio.hh"
23#include "netsync.hh"
24#include "numeric_vocab.hh"
25#include "packet.hh"
26#include "sanity.hh"
27#include "transforms.hh"
28#include "ui.hh"
29#include "xdelta.hh"
30#include "epoch.hh"
31#include "platform.hh"
32
33#include "cryptopp/osrng.h"
34
35#include "netxx/address.h"
36#include "netxx/peer.h"
37#include "netxx/probe.h"
38#include "netxx/socket.h"
39#include "netxx/stream.h"
40#include "netxx/streamserver.h"
41#include "netxx/timeout.h"
42
43//
44// this is the "new" network synchronization (netsync) system in
45// monotone. it is based on synchronizing a pair of merkle trees over an
46// interactive connection.
47//
48// a netsync process between peers treats each peer as either a source, a
49// sink, or both. when a peer is only a source, it will not write any new
50// items to its database. when a peer is only a sink, it will not send any
51// items from its database. when a peer is both a source and sink, it may
52// send and write items freely.
53//
54// the post-state of a netsync is that each sink contains a superset of the
55// items in its corresponding source; when peers are behaving as both
56// source and sink, this means that the post-state of the sync is for the
57// peers to have identical item sets.
58//
59// a peer can be a sink in at most one netsync process at a time; it can
60// however be a source for multiple netsyncs simultaneously.
61//
62//
63// data structure
64// --------------
65//
66// each node in a merkle tree contains a fixed number of slots. this number
67// is derived from a global parameter of the protocol -- the tree fanout --
68// such that the number of slots is 2^fanout. for now we will assume that
69// fanout is 4, and thus there are 16 slots in a node, because this makes
70// illustration easier. the other parameter of the protocol is the size of
71// a hash; we use SHA1 so the hash is 20 bytes (160 bits) long.
72//
73// each slot in a merkle tree node is in one of 4 states:
74//
75// - empty
76// - live leaf
77// - dead leaf
78// - subtree
79//
80// in addition, each live or dead leaf contains a hash code which
81// identifies an element of the set being synchronized. each subtree slot
82// contains a hash code of the node immediately beneath it in the merkle
83// tree. empty slots contain no hash codes.
84//
85// each node also summarizes, for the sake of statistic-gathering, the number
86// of set elements and total number of bytes in all of its subtrees, each
87// stored as a size_t and sent as a uleb128.
88//
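// (as an aside, a "uleb128" is just a variable-length unsigned integer:
// seven payload bits per byte, low-order group first, with the high bit
// set on every byte except the last. a minimal sketch of an encoder
// follows -- the name is hypothetical and this is not the helper the
// code below actually calls, it is only here to illustrate the encoding.)
#if 0
static void
sketch_uleb128_encode(size_t n, std::string & out)
{
  do
    {
      unsigned char b = n & 0x7f;   // low seven bits of what remains
      n >>= 7;
      if (n != 0)
        b |= 0x80;                  // high bit means "more bytes follow"
      out += static_cast<char>(b);
    }
  while (n != 0);
}
#endif
//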
89// since empty slots have no hash code, they are represented implicitly by
90// a bitmap at the head of each merkle tree node. as an additional
91// integrity check, each merkle tree node contains a label indicating its
92// prefix in the tree, and a hash of its own contents.
93//
94// in total, then, the byte-level representation of a <160,4> merkle tree
95// node is as follows:
96//
97// 20 bytes - hash of the remaining bytes in the node
98// 1 byte - type of this node (manifest, file, key, mcert, fcert)
99// 1-N bytes - level of this node in the tree (0 == "root", uleb128)
100// 0-20 bytes - the prefix of this node, 4 bits * level,
101// rounded up to a byte
102// 1-N bytes - number of leaves under this node (uleb128)
103// 4 bytes - slot-state bitmap of the node
104// 0-320 bytes - between 0 and 16 live slots in the node
105//
106// so, in the worst case such a node is 367 bytes, with these parameters.
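// (working that figure out: 20 bytes of hash + 1 type byte + 1 byte of
// level (with a 160-bit hash and fanout 4 there are at most 40 levels,
// so the uleb128 fits in one byte) + 20 bytes of prefix + 1 byte of
// leaf count (taking the single-byte uleb128 case) + 4 bytes of bitmap
// + 16 slots * 20 bytes = 20+1+1+20+1+4+320 = 367 bytes.)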
107//
108//
109// protocol
110// --------
111//
112// the protocol is a simple binary command-packet system over tcp; each
113// packet consists of a byte which identifies the protocol version, a byte
114// which identifies the command name inside that version, a size_t sent as
115// a uleb128 indicating the length of the payload, and then that many bytes
116// of payload, and finally 4 bytes of adler32 checksum (in LSB order) over
117// the payload. decoding involves simply buffering until a sufficient
118// number of bytes are received, then advancing the buffer pointer. any
119// time an adler32 check fails, the protocol is assumed to have lost
120// synchronization, and the connection is dropped. the parties are free to
121// drop the tcp stream at any point, if too much data is received or too
122// much idle time passes; no commitments or transactions are made.
123//
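// as a sketch of that framing (illustrative only: these helper names are
// hypothetical, not the functions the code below actually calls; the
// checksum shown is a textbook adler32, and the u32 typedef is assumed
// to come from numeric_vocab.hh):
#if 0
static u32
sketch_adler32(std::string const & payload)
{
  u32 a = 1, b = 0;
  for (std::string::size_type i = 0; i < payload.size(); ++i)
    {
      a = (a + static_cast<unsigned char>(payload[i])) % 65521;
      b = (b + a) % 65521;
    }
  return (b << 16) | a;
}

static void
sketch_frame_packet(unsigned char version, unsigned char cmd,
                    std::string const & payload, std::string & out)
{
  out += static_cast<char>(version);            // protocol version byte
  out += static_cast<char>(cmd);                // command code byte
  sketch_uleb128_encode(payload.size(), out);   // payload length (see uleb128 sketch above)
  out += payload;                               // the payload itself
  u32 sum = sketch_adler32(payload);
  for (int i = 0; i < 4; ++i)                   // adler32 of payload, LSB first
    out += static_cast<char>((sum >> (8 * i)) & 0xff);
}
#endif
//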
124// one special command, "bye", is used to shut down a connection
125// gracefully. once each side has received all the data they want, they
126// can send a "bye" command to the other side. as soon as either side has
127// both sent and received a "bye" command, they drop the connection. if
128// either side sees an i/o failure (dropped connection) after they have
129// sent a "bye" command, they consider the shutdown successful.
130//
131// the exchange begins in a non-authenticated state. the server sends a
132// "hello <id> <nonce>" command, which identifies the server's RSA key and
133// issues a nonce which must be used for a subsequent authentication.
134//
135// the client can then respond with an "auth (source|sink|both)
136// <collection> <id> <nonce1> <nonce2> <sig>" command which identifies its
137// RSA key, notes the role it wishes to play in the synchronization,
138// identifies the collection it wishes to sync with, signs the previous
139// nonce with its own key, and issues a nonce of its own for mutual
140// authentication.
141//
142// the server can then respond with a "confirm <sig>" command, which is
143// the signature of the second nonce sent by the client. this
144// transitions the peers into an authenticated state and begins refinement.
145//
146// refinement begins with the client sending its root public key and
147// revision certificate merkle nodes to the server. the server then
148// compares the root to each slot in *its* root node, and for each slot
149// either sends refined subtrees to the client, or (if it detects a missing
150// item in one collection or the other) sends either "data" or "send_data"
151// commands corresponding to the role of the missing item (source or
152// sink). the client then receives each refined subtree and compares it
153// with its own, performing similar description/request behavior depending
154// on role, and the cycle continues.
155//
156// detecting the end of refinement is subtle: after sending the refinement
157// of the root node, the server sends a "done 0" command (queued behind all
158// the other refinement traffic). when either peer receives a "done N"
159// command it immediately responds with a "done N+1" command. when two done
160// commands for a given merkle tree arrive with no intervening refinements,
161// the entire merkle tree is considered complete.
162//
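// in rough code terms, the receiving side of that handshake looks like the
// sketch below (a simplified rendition of process_done_cmd, which appears
// in full further down in this file):
#if 0
void sketch_handle_done(session & sess, size_t level, netcmd_item_type type)
{
  done_marker & mark = sess.done_refinements[type];
  if (!mark.current_level_had_refinements || level >= 0xff)
    {
      // no refinements arrived at this level (or we ran out of levels),
      // so this tree is finished; echo one last "done" for shutdown purposes
      if (!mark.tree_is_done)
        sess.queue_done_cmd(level + 1, type);
      mark.tree_is_done = true;
    }
  else
    {
      // refinements did arrive at this level: clear the flag and echo "done N+1"
      mark.current_level_had_refinements = false;
      sess.queue_done_cmd(level + 1, type);
    }
}
#endif
//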
163// any "send_data" command received prompts a "data" command in response,
164// if the requested item exists. if an item does not exist, a "nonexistant"
165// response command is sent.
166//
167// once a response is received for each requested key and revision cert
168// (either data or nonexistant) the requesting party walks the graph of
169// received revision certs and transmits send_data or send_delta commands
170// for all the revisions mentioned in the certs which it does not already
171// have in its database.
172//
173// for each revision it receives, the recipient requests all the file data or
174// deltas described in that revision.
175//
176// once all requested files, manifests, revisions and certs are received (or
177// noted as nonexistant), the recipient closes its connection.
178//
179// (aside: this protocol is raw binary because coding density is actually
180// important here, and each packet consists of very information-dense
181// material that you wouldn't have a hope of typing in manually anyway)
182//
183
184using namespace boost;
185using namespace std;
186
187static inline void
188require(bool check, string const & context)
189{
190 if (!check)
191 throw bad_decode(F("check of '%s' failed") % context);
192}
193
194struct
195done_marker
196{
197 bool current_level_had_refinements;
198 bool tree_is_done;
199 done_marker() :
200 current_level_had_refinements(false),
201 tree_is_done(false)
202 {}
203};
204
205struct
206session
207{
208 protocol_role role;
209 protocol_voice const voice;
210 vector<utf8> const & collections;
211 set<string> const & all_collections;
212 app_state & app;
213
214 string peer_id;
215 Netxx::socket_type fd;
216 Netxx::Stream str;
217
218 string inbuf;
219 string outbuf;
220
221 netcmd cmd;
222 bool armed;
223 bool arm();
224
225 utf8 collection;
226 id remote_peer_key_hash;
227 bool authenticated;
228
229 time_t last_io_time;
230 auto_ptr<ticker> byte_in_ticker;
231 auto_ptr<ticker> byte_out_ticker;
232 auto_ptr<ticker> cert_in_ticker;
233 auto_ptr<ticker> cert_out_ticker;
234 auto_ptr<ticker> revision_in_ticker;
235 auto_ptr<ticker> revision_out_ticker;
236
237 map< std::pair<utf8, netcmd_item_type>,
238 boost::shared_ptr<merkle_table> > merkle_tables;
239
240 map<netcmd_item_type, done_marker> done_refinements;
241 map<netcmd_item_type, boost::shared_ptr< set<id> > > requested_items;
242 map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestry;
243 set< pair<id, id> > reverse_delta_requests;
244 bool analyzed_ancestry;
245
246 id saved_nonce;
247 bool received_goodbye;
248 bool sent_goodbye;
249 boost::scoped_ptr<CryptoPP::AutoSeededRandomPool> prng;
250
251 packet_db_valve dbw;
252
253 session(protocol_role role,
254 protocol_voice voice,
255 vector<utf8> const & collections,
256 set<string> const & all_collections,
257 app_state & app,
258 string const & peer,
259 Netxx::socket_type sock,
260 Netxx::Timeout const & to);
261
262 virtual ~session() {}
263
264 id mk_nonce();
265 void mark_recent_io();
266
267 bool done_all_refinements();
268 bool cert_refinement_done();
269 bool all_requested_revisions_received();
270
271 void note_item_requested(netcmd_item_type ty, id const & i);
272 bool item_request_outstanding(netcmd_item_type ty, id const & i);
273 void note_item_arrived(netcmd_item_type ty, id const & i);
274
275 void maybe_note_epochs_finished();
276
277 void note_item_sent(netcmd_item_type ty, id const & i);
278
279 bool got_all_data();
280 void maybe_say_goodbye();
281
282 void analyze_attachment(revision_id const & i,
283 set<revision_id> & visited,
284 map<revision_id, bool> & attached);
285 void request_rev_revisions(revision_id const & init,
286 map<revision_id, bool> attached,
287 set<revision_id> visited);
288 void request_fwd_revisions(revision_id const & i,
289 map<revision_id, bool> attached,
290 set<revision_id> & visited);
291 void analyze_ancestry_graph();
292 void analyze_manifest(manifest_map const & man);
293
294 Netxx::Probe::ready_type which_events() const;
295 bool read_some();
296 bool write_some();
297
298 bool encountered_error;
299 void error(string const & errmsg);
300
301 void write_netcmd_and_try_flush(netcmd const & cmd);
302 void queue_bye_cmd();
303 void queue_error_cmd(string const & errmsg);
304 void queue_done_cmd(size_t level, netcmd_item_type type);
305 void queue_hello_cmd(id const & server,
306 id const & nonce);
307 void queue_anonymous_cmd(protocol_role role,
308 string const & collection,
309 id const & nonce2);
310 void queue_auth_cmd(protocol_role role,
311 string const & collection,
312 id const & client,
313 id const & nonce1,
314 id const & nonce2,
315 string const & signature);
316 void queue_confirm_cmd(string const & signature);
317 void queue_refine_cmd(merkle_node const & node);
318 void queue_send_data_cmd(netcmd_item_type type,
319 id const & item);
320 void queue_send_delta_cmd(netcmd_item_type type,
321 id const & base,
322 id const & ident);
323 void queue_data_cmd(netcmd_item_type type,
324 id const & item,
325 string const & dat);
326 void queue_delta_cmd(netcmd_item_type type,
327 id const & base,
328 id const & ident,
329 delta const & del);
330 void queue_nonexistant_cmd(netcmd_item_type type,
331 id const & item);
332
333 bool process_bye_cmd();
334 bool process_error_cmd(string const & errmsg);
335 bool process_done_cmd(size_t level, netcmd_item_type type);
336 bool process_hello_cmd(rsa_keypair_id const & server_keyname,
337 rsa_pub_key const & server_key,
338 id const & nonce);
339 bool process_anonymous_cmd(protocol_role role,
340 string const & collection,
341 id const & nonce2);
342 bool process_auth_cmd(protocol_role role,
343 string const & collection,
344 id const & client,
345 id const & nonce1,
346 id const & nonce2,
347 string const & signature);
348 bool process_confirm_cmd(string const & signature);
349 bool process_refine_cmd(merkle_node const & node);
350 bool process_send_data_cmd(netcmd_item_type type,
351 id const & item);
352 bool process_send_delta_cmd(netcmd_item_type type,
353 id const & base,
354 id const & ident);
355 bool process_data_cmd(netcmd_item_type type,
356 id const & item,
357 string const & dat);
358 bool process_delta_cmd(netcmd_item_type type,
359 id const & base,
360 id const & ident,
361 delta const & del);
362 bool process_nonexistant_cmd(netcmd_item_type type,
363 id const & item);
364
365 bool merkle_node_exists(netcmd_item_type type,
366 utf8 const & collection,
367 size_t level,
368 prefix const & pref);
369
370 void load_merkle_node(netcmd_item_type type,
371 utf8 const & collection,
372 size_t level,
373 prefix const & pref,
374 merkle_ptr & node);
375
376 void rebuild_merkle_trees(app_state & app,
377 utf8 const & collection);
378
379 void load_epoch(cert_value const & branchname, epoch_id const & epoch);
380
381 bool dispatch_payload(netcmd const & cmd);
382 void begin_service();
383 bool process();
384};
385
386
387struct
388root_prefix
389{
390 prefix val;
391 root_prefix() : val("")
392 {}
393};
394
395static root_prefix const &
396get_root_prefix()
397{
398 // this is not a file-scope static variable for a bizarre reason: Mac OS X runs
399 // static initializers in the "wrong" order (application before
400 // libraries), so the initializer for a static string in cryptopp runs
401 // after the initializer for a static variable outside a function
402 // here. therefore encode_hexenc() fails in the static initializer here
403 // and the program crashes. curious, eh?
404 static root_prefix ROOT_PREFIX;
405 return ROOT_PREFIX;
406}
407
408
409session::session(protocol_role role,
410 protocol_voice voice,
411 vector<utf8> const & collections,
412 set<string> const & all_coll,
413 app_state & app,
414 string const & peer,
415 Netxx::socket_type sock,
416 Netxx::Timeout const & to) :
417 role(role),
418 voice(voice),
419 collections(collections),
420 all_collections(all_coll),
421 app(app),
422 peer_id(peer),
423 fd(sock),
424 str(sock, to),
425 inbuf(""),
426 outbuf(""),
427 armed(false),
428 collection(""),
429 remote_peer_key_hash(""),
430 authenticated(false),
431 last_io_time(::time(NULL)),
432 byte_in_ticker(NULL),
433 byte_out_ticker(NULL),
434 cert_in_ticker(NULL),
435 cert_out_ticker(NULL),
436 revision_in_ticker(NULL),
437 revision_out_ticker(NULL),
438 analyzed_ancestry(false),
439 saved_nonce(""),
440 received_goodbye(false),
441 sent_goodbye(false),
442 dbw(app, true),
443 encountered_error(false)
444{
445 if (voice == client_voice)
446 {
447 N(collections.size() == 1,
448 F("client can only sync one collection at a time"));
449 this->collection = idx(collections, 0);
450 }
451
452 // we will panic here if the user doesn't like urandom and we can't give
453 // them a real entropy-driven random number source.
454 bool request_blocking_rng = false;
455 if (!app.lua.hook_non_blocking_rng_ok())
456 {
457#ifndef BLOCKING_RNG_AVAILABLE
458 throw oops("no blocking RNG available and non-blocking RNG rejected");
459#else
460 request_blocking_rng = true;
461#endif
462 }
463 prng.reset(new CryptoPP::AutoSeededRandomPool(request_blocking_rng));
464
465 done_refinements.insert(make_pair(cert_item, done_marker()));
466 done_refinements.insert(make_pair(key_item, done_marker()));
467 done_refinements.insert(make_pair(epoch_item, done_marker()));
468
469 requested_items.insert(make_pair(cert_item, boost::shared_ptr< set<id> >(new set<id>())));
470 requested_items.insert(make_pair(key_item, boost::shared_ptr< set<id> >(new set<id>())));
471 requested_items.insert(make_pair(revision_item, boost::shared_ptr< set<id> >(new set<id>())));
472 requested_items.insert(make_pair(manifest_item, boost::shared_ptr< set<id> >(new set<id>())));
473 requested_items.insert(make_pair(file_item, boost::shared_ptr< set<id> >(new set<id>())));
474 requested_items.insert(make_pair(epoch_item, boost::shared_ptr< set<id> >(new set<id>())));
475
476 for (vector<utf8>::const_iterator i = collections.begin();
477 i != collections.end(); ++i)
478 {
479 rebuild_merkle_trees(app, *i);
480 }
481}
482
483id
484session::mk_nonce()
485{
486 I(this->saved_nonce().size() == 0);
487 char buf[constants::merkle_hash_length_in_bytes];
488 prng->GenerateBlock(reinterpret_cast<byte *>(buf), constants::merkle_hash_length_in_bytes);
489 this->saved_nonce = string(buf, buf + constants::merkle_hash_length_in_bytes);
490 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
491 return this->saved_nonce;
492}
493
494void
495session::mark_recent_io()
496{
497 last_io_time = ::time(NULL);
498}
499
500bool
501session::done_all_refinements()
502{
503 bool all = true;
504 for(map< netcmd_item_type, done_marker>::const_iterator j = done_refinements.begin();
505 j != done_refinements.end(); ++j)
506 {
507 if (j->second.tree_is_done == false)
508 all = false;
509 }
510 return all;
511}
512
513
514bool
515session::cert_refinement_done()
516{
517 return done_refinements[cert_item].tree_is_done;
518}
519
520bool
521session::got_all_data()
522{
523 for (map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator i =
524 requested_items.begin(); i != requested_items.end(); ++i)
525 {
526 if (! i->second->empty())
527 return false;
528 }
529 return true;
530}
531
532bool
533session::all_requested_revisions_received()
534{
535 map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
536 i = requested_items.find(revision_item);
537 I(i != requested_items.end());
538 return i->second->empty();
539}
540
541void
542session::maybe_note_epochs_finished()
543{
544 map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
545 i = requested_items.find(epoch_item);
546 I(i != requested_items.end());
547 // Maybe there are outstanding epoch requests.
548 if (!i->second->empty())
549 return;
550 // And maybe we haven't even finished the refinement.
551 if (!done_refinements[epoch_item].tree_is_done)
552 return;
553 // But otherwise, we're ready to go!
554 L(F("all epochs processed, opening database valve\n"));
555 this->dbw.open_valve();
556}
557
558void
559session::note_item_requested(netcmd_item_type ty, id const & ident)
560{
561 map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
562 i = requested_items.find(ty);
563 I(i != requested_items.end());
564 i->second->insert(ident);
565}
566
567void
568session::note_item_arrived(netcmd_item_type ty, id const & ident)
569{
570 map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
571 i = requested_items.find(ty);
572 I(i != requested_items.end());
573 i->second->erase(ident);
574
575 switch (ty)
576 {
577 case cert_item:
578 if (cert_in_ticker.get() != NULL)
579 ++(*cert_in_ticker);
580 break;
581 case revision_item:
582 if (revision_in_ticker.get() != NULL)
583 ++(*revision_in_ticker);
584 break;
585 default:
586 // No ticker for other things.
587 break;
588 }
589}
590
591bool
592session::item_request_outstanding(netcmd_item_type ty, id const & ident)
593{
594 map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
595 i = requested_items.find(ty);
596 I(i != requested_items.end());
597 return i->second->find(ident) != i->second->end();
598}
599
600
601void
602session::note_item_sent(netcmd_item_type ty, id const & ident)
603{
604 switch (ty)
605 {
606 case cert_item:
607 if (cert_out_ticker.get() != NULL)
608 ++(*cert_out_ticker);
609 break;
610 case revision_item:
611 if (revision_out_ticker.get() != NULL)
612 ++(*revision_out_ticker);
613 break;
614 default:
615 // No ticker for other things.
616 break;
617 }
618}
619
620void
621session::write_netcmd_and_try_flush(netcmd const & cmd)
622{
623 if (!encountered_error)
624 write_netcmd(cmd, outbuf);
625 else
626 L(F("dropping outgoing netcmd (because we're in error unwind mode)\n"));
627 // FIXME: this helps keep the protocol pipeline full but it seems to
628 // interfere with initial and final sequences. careful with it.
629 // write_some();
630 // read_some();
631}
632
633// This method triggers a special "error unwind" mode in netsync. In this
634// mode, all received data is ignored, and no new data is queued. We simply
635// stay connected long enough for the current write buffer to be flushed, to
636// ensure that our peer receives the error message.
637// WARNING WARNING WARNING (FIXME): this does _not_ throw an exception. if
638// while processing any given netcmd packet you encounter an error, you can
639// _only_ call this method if you have not touched the database, because if
640// you have touched the database then you need to throw an exception to
641// trigger a rollback.
642// you could, of course, call this method and then throw an exception, but
643// there is no point in doing that, because throwing the exception will cause
644// the connection to be immediately terminated, so your call to error() will
645// actually have no effect (except to cause your error message to be printed
646// twice).
647void
648session::error(std::string const & errmsg)
649{
650 W(F("error: %s\n") % errmsg);
651 queue_error_cmd(errmsg);
652 encountered_error = true;
653}
654
655void
656session::analyze_manifest(manifest_map const & man)
657{
658 L(F("analyzing %d entries in manifest\n") % man.size());
659 for (manifest_map::const_iterator i = man.begin();
660 i != man.end(); ++i)
661 {
662 if (! this->app.db.file_version_exists(manifest_entry_id(i)))
663 {
664 id tmp;
665 decode_hexenc(manifest_entry_id(i).inner(), tmp);
666 queue_send_data_cmd(file_item, tmp);
667 }
668 }
669}
670
671static bool
672is_attached(revision_id const & i,
673 map<revision_id, bool> const & attach_map)
674{
675 map<revision_id, bool>::const_iterator j = attach_map.find(i);
676 I(j != attach_map.end());
677 return j->second;
678}
679
680// this tells us whether a particular revision is "attached" -- meaning
681// either our database contains the underlying revision or else one of our
682// parents (recursively, and only in the current ancestry graph we're
683// requesting) is attached. if it's detached we will request it using a
684// different (more efficient and less failure-prone) algorithm.
685
686void
687session::analyze_attachment(revision_id const & i,
688 set<revision_id> & visited,
689 map<revision_id, bool> & attached)
690{
691 typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;
692
693 if (visited.find(i) != visited.end())
694 return;
695
696 visited.insert(i);
697
698 bool curr_attached = false;
699
700 if (app.db.revision_exists(i))
701 {
702 L(F("revision %s is attached via database\n") % i);
703 curr_attached = true;
704 }
705 else
706 {
707 L(F("checking attachment of %s in ancestry\n") % i);
708 ancestryT::const_iterator j = ancestry.find(i);
709 if (j != ancestry.end())
710 {
711 for (edge_map::const_iterator k = j->second->second.edges.begin();
712 k != j->second->second.edges.end(); ++k)
713 {
714 L(F("checking attachment of %s in parent %s\n") % i % edge_old_revision(k));
715 analyze_attachment(edge_old_revision(k), visited, attached);
716 if (is_attached(edge_old_revision(k), attached))
717 {
718 L(F("revision %s is attached via parent %s\n") % i % edge_old_revision(k));
719 curr_attached = true;
720 }
721 }
722 }
723 }
724 L(F("decided that revision %s %s attached\n") % i % (curr_attached ? "is" : "is not"));
725 attached[i] = curr_attached;
726}
727
728static inline id
729plain_id(manifest_id const & i)
730{
731 id tmp;
732 hexenc<id> htmp(i.inner());
733 decode_hexenc(htmp, tmp);
734 return tmp;
735}
736
737static inline id
738plain_id(file_id const & i)
739{
740 id tmp;
741 hexenc<id> htmp(i.inner());
742 decode_hexenc(htmp, tmp);
743 return tmp;
744}
745
746void
747session::request_rev_revisions(revision_id const & init,
748 map<revision_id, bool> attached,
749 set<revision_id> visited)
750{
751 typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;
752
753 set<manifest_id> seen_manifests;
754 set<file_id> seen_files;
755
756 set<revision_id> frontier;
757 frontier.insert(init);
758 while(!frontier.empty())
759 {
760 set<revision_id> next_frontier;
761 for (set<revision_id>::const_iterator i = frontier.begin();
762 i != frontier.end(); ++i)
763 {
764 if (is_attached(*i, attached))
765 continue;
766
767 if (visited.find(*i) != visited.end())
768 continue;
769
770 visited.insert(*i);
771
772 ancestryT::const_iterator j = ancestry.find(*i);
773 if (j != ancestry.end())
774 {
775
776 for (edge_map::const_iterator k = j->second->second.edges.begin();
777 k != j->second->second.edges.end(); ++k)
778 {
779
780 next_frontier.insert(edge_old_revision(k));
781
782 // check out the manifest delta edge
783 manifest_id parent_manifest = edge_old_manifest(k);
784 manifest_id child_manifest = j->second->second.new_manifest;
785
786 // first, if we have a child we've never seen before we will need
787// to request it in its entirety.
788 if (seen_manifests.find(child_manifest) == seen_manifests.end())
789 {
790 if (this->app.db.manifest_version_exists(child_manifest))
791 L(F("not requesting (in reverse) initial manifest %s as we already have it\n") % child_manifest);
792 else
793 {
794 L(F("requesting (in reverse) initial manifest data %s\n") % child_manifest);
795 queue_send_data_cmd(manifest_item, plain_id(child_manifest));
796 }
797 seen_manifests.insert(child_manifest);
798 }
799
800 // second, if the parent is nonempty, we want to ask for an edge to it
801 if (!parent_manifest.inner()().empty())
802 {
803 if (this->app.db.manifest_version_exists(parent_manifest))
804 L(F("not requesting (in reverse) manifest delta to %s as we already have it\n") % parent_manifest);
805 else
806 {
807 L(F("requesting (in reverse) manifest delta %s -> %s\n")
808 % child_manifest % parent_manifest);
809 reverse_delta_requests.insert(make_pair(plain_id(child_manifest),
810 plain_id(parent_manifest)));
811 queue_send_delta_cmd(manifest_item,
812 plain_id(child_manifest),
813 plain_id(parent_manifest));
814 }
815 seen_manifests.insert(parent_manifest);
816 }
817
818
819
820 // check out each file delta edge
821 change_set const & cset = edge_changes(k);
822 for (change_set::delta_map::const_iterator d = cset.deltas.begin();
823 d != cset.deltas.end(); ++d)
824 {
825 file_id parent_file (delta_entry_src(d));
826 file_id child_file (delta_entry_dst(d));
827
828
829 // first, if we have a child we've never seen before we will need
830// to request it in its entirety.
831 if (seen_files.find(child_file) == seen_files.end())
832 {
833 if (this->app.db.file_version_exists(child_file))
834 L(F("not requesting (in reverse) initial file %s as we already have it\n") % child_file);
835 else
836 {
837 L(F("requesting (in reverse) initial file data %s\n") % child_file);
838 queue_send_data_cmd(file_item, plain_id(child_file));
839 }
840 seen_files.insert(child_file);
841 }
842
843 // second, if the parent is nonempty, we want to ask for an edge to it
844 if (!parent_file.inner()().empty())
845 {
846 if (this->app.db.file_version_exists(parent_file))
847 L(F("not requesting (in reverse) file delta to %s as we already have it\n") % parent_file);
848 else
849 {
850 L(F("requesting (in reverse) file delta %s -> %s on %s\n")
851 % child_file % parent_file % delta_entry_path(d));
852 reverse_delta_requests.insert(make_pair(plain_id(child_file),
853 plain_id(parent_file)));
854 queue_send_delta_cmd(file_item,
855 plain_id(child_file),
856 plain_id(parent_file));
857 }
858 seen_files.insert(parent_file);
859 }
860 }
861 }
862
863 // now actually consume the data packet, which will wait on the
864 // arrival of its prerequisites in the packet_db_writer
865 this->dbw.consume_revision_data(j->first, j->second->first);
866 }
867 }
868 frontier = next_frontier;
869 }
870}
871
872void
873session::request_fwd_revisions(revision_id const & i,
874 map<revision_id, bool> attached,
875 set<revision_id> & visited)
876{
877 if (visited.find(i) != visited.end())
878 return;
879
880 visited.insert(i);
881
882 L(F("visiting revision '%s' for forward deltas\n") % i);
883
884 typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;
885
886 ancestryT::const_iterator j = ancestry.find(i);
887 if (j != ancestry.end())
888 {
889 edge_map::const_iterator an_attached_edge = j->second->second.edges.end();
890
891 // first make sure we've requested enough to get to here by
892 // calling ourselves recursively. this is the forward path after all.
893
894 for (edge_map::const_iterator k = j->second->second.edges.begin();
895 k != j->second->second.edges.end(); ++k)
896 {
897 if (is_attached(edge_old_revision(k), attached))
898 {
899 request_fwd_revisions(edge_old_revision(k), attached, visited);
900 an_attached_edge = k;
901 }
902 }
903
904 I(an_attached_edge != j->second->second.edges.end());
905
906 // check out the manifest delta edge
907 manifest_id parent_manifest = edge_old_manifest(an_attached_edge);
908 manifest_id child_manifest = j->second->second.new_manifest;
909 if (this->app.db.manifest_version_exists(child_manifest))
910 L(F("not requesting forward manifest delta to '%s' as we already have it\n")
911 % child_manifest);
912 else
913 {
914 if (parent_manifest.inner()().empty())
915 {
916 L(F("requesting full manifest data %s\n") % child_manifest);
917 queue_send_data_cmd(manifest_item, plain_id(child_manifest));
918 }
919 else
920 {
921 L(F("requesting forward manifest delta %s -> %s\n")
922 % parent_manifest % child_manifest);
923 queue_send_delta_cmd(manifest_item,
924 plain_id(parent_manifest),
925 plain_id(child_manifest));
926 }
927 }
928
929 // check out each file delta edge
930 change_set const & an_attached_cset = edge_changes(an_attached_edge);
931 for (change_set::delta_map::const_iterator k = an_attached_cset.deltas.begin();
932 k != an_attached_cset.deltas.end(); ++k)
933 {
934 if (this->app.db.file_version_exists(delta_entry_dst(k)))
935 L(F("not requesting forward delta %s -> %s on file %s as we already have it\n")
936 % delta_entry_src(k) % delta_entry_dst(k) % delta_entry_path(k));
937 else
938 {
939 if (delta_entry_src(k).inner()().empty())
940 {
941 L(F("requesting full file data %s\n") % delta_entry_dst(k));
942 queue_send_data_cmd(file_item, plain_id(delta_entry_dst(k)));
943 }
944 else
945 {
946
947 L(F("requesting forward delta %s -> %s on file %s\n")
948 % delta_entry_src(k) % delta_entry_dst(k) % delta_entry_path(k));
949 queue_send_delta_cmd(file_item,
950 plain_id(delta_entry_src(k)),
951 plain_id(delta_entry_dst(k)));
952 }
953 }
954 }
955 // now actually consume the data packet, which will wait on the
956 // arrival of its prerequisites in the packet_db_writer
957 this->dbw.consume_revision_data(j->first, j->second->first);
958 }
959}
960
961void
962session::analyze_ancestry_graph()
963{
964 typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;
965
966 if (! (all_requested_revisions_received() && cert_refinement_done()))
967 return;
968
969 if (analyzed_ancestry)
970 return;
971
972 set<revision_id> heads;
973 {
974 set<revision_id> nodes, parents;
975 L(F("analyzing %d ancestry edges\n") % ancestry.size());
976
977 for (ancestryT::const_iterator i = ancestry.begin(); i != ancestry.end(); ++i)
978 {
979 nodes.insert(i->first);
980 for (edge_map::const_iterator j = i->second->second.edges.begin();
981 j != i->second->second.edges.end(); ++j)
982 {
983 parents.insert(edge_old_revision(j));
984 }
985 }
986
987 set_difference(nodes.begin(), nodes.end(),
988 parents.begin(), parents.end(),
989 inserter(heads, heads.begin()));
990 }
991
992 L(F("isolated %d heads\n") % heads.size());
993
994 // first we determine the "attachment status" of each node in our ancestry
995 // graph.
996
997 map<revision_id, bool> attached;
998 set<revision_id> visited;
999 for (set<revision_id>::const_iterator i = heads.begin(); i != heads.end(); ++i)
1000 analyze_attachment(*i, visited, attached);
1001
1002 // then we walk the graph upwards, recursively, starting from each of the
1003 // heads. we either walk requesting forward deltas or reverse deltas,
1004 // depending on whether we are walking an attached or detached subgraph,
1005 // respectively. the forward walk ignores detached nodes, the backward walk
1006 // ignores attached nodes.
1007
1008 set<revision_id> fwd_visited, rev_visited;
1009
1010 for (set<revision_id>::const_iterator i = heads.begin(); i != heads.end(); ++i)
1011 {
1012 map<revision_id, bool>::const_iterator k = attached.find(*i);
1013 I(k != attached.end());
1014
1015 if (k->second)
1016 {
1017 L(F("requesting attached ancestry of revision '%s'\n") % *i);
1018 request_fwd_revisions(*i, attached, fwd_visited);
1019 }
1020 else
1021 {
1022 L(F("requesting detached ancestry of revision '%s'\n") % *i);
1023 request_rev_revisions(*i, attached, rev_visited);
1024 }
1025 }
1026 analyzed_ancestry = true;
1027}
1028
1029Netxx::Probe::ready_type
1030session::which_events() const
1031{
1032 if (outbuf.empty())
1033 {
1034 if (inbuf.size() < constants::netcmd_maxsz)
1035 return Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
1036 else
1037 return Netxx::Probe::ready_oobd;
1038 }
1039 else
1040 {
1041 if (inbuf.size() < constants::netcmd_maxsz)
1042 return Netxx::Probe::ready_write | Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
1043 else
1044 return Netxx::Probe::ready_write | Netxx::Probe::ready_oobd;
1045 }
1046}
1047
1048bool
1049session::read_some()
1050{
1051 I(inbuf.size() < constants::netcmd_maxsz);
1052 char tmp[constants::bufsz];
1053 Netxx::signed_size_type count = str.read(tmp, sizeof(tmp));
1054 if (count > 0)
1055 {
1056 L(F("read %d bytes from fd %d (peer %s)\n") % count % fd % peer_id);
1057 if (encountered_error)
1058 {
1059 L(F("in error unwind mode, so throwing them into the bit bucket\n"));
1060 return true;
1061 }
1062 inbuf.append(string(tmp, tmp + count));
1063 mark_recent_io();
1064 if (byte_in_ticker.get() != NULL)
1065 (*byte_in_ticker) += count;
1066 return true;
1067 }
1068 else
1069 return false;
1070}
1071
1072bool
1073session::write_some()
1074{
1075 I(!outbuf.empty());
1076 Netxx::signed_size_type count = str.write(outbuf.data(),
1077 std::min(outbuf.size(), constants::bufsz));
1078 if(count > 0)
1079 {
1080 outbuf.erase(0, count);
1081 L(F("wrote %d bytes to fd %d (peer %s), %d remain in output buffer\n")
1082 % count % fd % peer_id % outbuf.size());
1083 mark_recent_io();
1084 if (byte_out_ticker.get() != NULL)
1085 (*byte_out_ticker) += count;
1086 if (encountered_error && outbuf.empty())
1087 {
1088 // we've flushed our error message, so it's time to get out.
1089 L(F("finished flushing output queue in error unwind mode, disconnecting\n"));
1090 return false;
1091 }
1092 return true;
1093 }
1094 else
1095 return false;
1096}
1097
1098// senders
1099
1100void
1101session::queue_bye_cmd()
1102{
1103 L(F("queueing 'bye' command\n"));
1104 netcmd cmd;
1105 cmd.cmd_code = bye_cmd;
1106 write_netcmd_and_try_flush(cmd);
1107 this->sent_goodbye = true;
1108}
1109
1110void
1111session::queue_error_cmd(string const & errmsg)
1112{
1113 L(F("queueing 'error' command\n"));
1114 netcmd cmd;
1115 cmd.cmd_code = error_cmd;
1116 write_error_cmd_payload(errmsg, cmd.payload);
1117 write_netcmd_and_try_flush(cmd);
1118 this->sent_goodbye = true;
1119}
1120
1121void
1122session::queue_done_cmd(size_t level,
1123 netcmd_item_type type)
1124{
1125 string typestr;
1126 netcmd_item_type_to_string(type, typestr);
1127 L(F("queueing 'done' command for %s level %s\n") % typestr % level);
1128 netcmd cmd;
1129 cmd.cmd_code = done_cmd;
1130 write_done_cmd_payload(level, type, cmd.payload);
1131 write_netcmd_and_try_flush(cmd);
1132}
1133
1134void
1135session::queue_hello_cmd(id const & server,
1136 id const & nonce)
1137{
1138 netcmd cmd;
1139 cmd.cmd_code = hello_cmd;
1140 hexenc<id> server_encoded;
1141 encode_hexenc(server, server_encoded);
1142
1143 rsa_keypair_id key_name;
1144 base64<rsa_pub_key> pub_encoded;
1145 rsa_pub_key pub;
1146
1147 app.db.get_pubkey(server_encoded, key_name, pub_encoded);
1148 decode_base64(pub_encoded, pub);
1149 write_hello_cmd_payload(key_name, pub, nonce, cmd.payload);
1150 write_netcmd_and_try_flush(cmd);
1151}
1152
1153void
1154session::queue_anonymous_cmd(protocol_role role,
1155 string const & collection,
1156 id const & nonce2)
1157{
1158 netcmd cmd;
1159 cmd.cmd_code = anonymous_cmd;
1160 write_anonymous_cmd_payload(role, collection, nonce2, cmd.payload);
1161 write_netcmd_and_try_flush(cmd);
1162}
1163
1164void
1165session::queue_auth_cmd(protocol_role role,
1166 string const & collection,
1167 id const & client,
1168 id const & nonce1,
1169 id const & nonce2,
1170 string const & signature)
1171{
1172 netcmd cmd;
1173 cmd.cmd_code = auth_cmd;
1174 write_auth_cmd_payload(role, collection, client,
1175 nonce1, nonce2, signature,
1176 cmd.payload);
1177 write_netcmd_and_try_flush(cmd);
1178}
1179
1180void
1181session::queue_confirm_cmd(string const & signature)
1182{
1183 netcmd cmd;
1184 cmd.cmd_code = confirm_cmd;
1185 write_confirm_cmd_payload(signature, cmd.payload);
1186 write_netcmd_and_try_flush(cmd);
1187}
1188
1189void
1190session::queue_refine_cmd(merkle_node const & node)
1191{
1192 string typestr;
1193 hexenc<prefix> hpref;
1194 node.get_hex_prefix(hpref);
1195 netcmd_item_type_to_string(node.type, typestr);
1196 L(F("queueing request for refinement of %s node '%s', level %d\n")
1197 % typestr % hpref % static_cast<int>(node.level));
1198 netcmd cmd;
1199 cmd.cmd_code = refine_cmd;
1200 write_refine_cmd_payload(node, cmd.payload);
1201 write_netcmd_and_try_flush(cmd);
1202}
1203
1204void
1205session::queue_send_data_cmd(netcmd_item_type type,
1206 id const & item)
1207{
1208 string typestr;
1209 netcmd_item_type_to_string(type, typestr);
1210 hexenc<id> hid;
1211 encode_hexenc(item, hid);
1212
1213 if (role == source_role)
1214 {
1215 L(F("not queueing request for %s '%s' as we are in pure source role\n")
1216 % typestr % hid);
1217 return;
1218 }
1219
1220 if (item_request_outstanding(type, item))
1221 {
1222 L(F("not queueing request for %s '%s' as we already requested it\n")
1223 % typestr % hid);
1224 return;
1225 }
1226
1227 L(F("queueing request for data of %s item '%s'\n")
1228 % typestr % hid);
1229 netcmd cmd;
1230 cmd.cmd_code = send_data_cmd;
1231 write_send_data_cmd_payload(type, item, cmd.payload);
1232 write_netcmd_and_try_flush(cmd);
1233 note_item_requested(type, item);
1234}
1235
1236void
1237session::queue_send_delta_cmd(netcmd_item_type type,
1238 id const & base,
1239 id const & ident)
1240{
1241 I(type == manifest_item || type == file_item);
1242
1243 string typestr;
1244 netcmd_item_type_to_string(type, typestr);
1245 hexenc<id> base_hid;
1246 encode_hexenc(base, base_hid);
1247 hexenc<id> ident_hid;
1248 encode_hexenc(ident, ident_hid);
1249
1250 if (role == source_role)
1251 {
1252 L(F("not queueing request for %s delta '%s' -> '%s' as we are in pure source role\n")
1253 % typestr % base_hid % ident_hid);
1254 return;
1255 }
1256
1257 if (item_request_outstanding(type, ident))
1258 {
1259 L(F("not queueing request for %s delta '%s' -> '%s' as we already requested the target\n")
1260 % typestr % base_hid % ident_hid);
1261 return;
1262 }
1263
1264 L(F("queueing request for contents of %s delta '%s' -> '%s'\n")
1265 % typestr % base_hid % ident_hid);
1266 netcmd cmd;
1267 cmd.cmd_code = send_delta_cmd;
1268 write_send_delta_cmd_payload(type, base, ident, cmd.payload);
1269 write_netcmd_and_try_flush(cmd);
1270 note_item_requested(type, ident);
1271}
1272
1273void
1274session::queue_data_cmd(netcmd_item_type type,
1275 id const & item,
1276 string const & dat)
1277{
1278 string typestr;
1279 netcmd_item_type_to_string(type, typestr);
1280 hexenc<id> hid;
1281 encode_hexenc(item, hid);
1282
1283 if (role == sink_role)
1284 {
1285 L(F("not queueing %s data for '%s' as we are in pure sink role\n")
1286 % typestr % hid);
1287 return;
1288 }
1289
1290 L(F("queueing %d bytes of data for %s item '%s'\n")
1291 % dat.size() % typestr % hid);
1292 netcmd cmd;
1293 cmd.cmd_code = data_cmd;
1294 write_data_cmd_payload(type, item, dat, cmd.payload);
1295 write_netcmd_and_try_flush(cmd);
1296 note_item_sent(type, item);
1297}
1298
1299void
1300session::queue_delta_cmd(netcmd_item_type type,
1301 id const & base,
1302 id const & ident,
1303 delta const & del)
1304{
1305 I(type == manifest_item || type == file_item);
1306 I(! del().empty() || ident == base);
1307 string typestr;
1308 netcmd_item_type_to_string(type, typestr);
1309 hexenc<id> base_hid;
1310 encode_hexenc(base, base_hid);
1311 hexenc<id> ident_hid;
1312 encode_hexenc(ident, ident_hid);
1313
1314 if (role == sink_role)
1315 {
1316 L(F("not queueing %s delta '%s' -> '%s' as we are in pure sink role\n")
1317 % typestr % base_hid % ident_hid);
1318 return;
1319 }
1320
1321 L(F("queueing %s delta '%s' -> '%s'\n")
1322 % typestr % base_hid % ident_hid);
1323 netcmd cmd;
1324 cmd.cmd_code = delta_cmd;
1325 write_delta_cmd_payload(type, base, ident, del, cmd.payload);
1326 write_netcmd_and_try_flush(cmd);
1327 note_item_sent(type, ident);
1328}
1329
1330void
1331session::queue_nonexistant_cmd(netcmd_item_type type,
1332 id const & item)
1333{
1334 string typestr;
1335 netcmd_item_type_to_string(type, typestr);
1336 hexenc<id> hid;
1337 encode_hexenc(item, hid);
1338 if (role == sink_role)
1339 {
1340 L(F("not queueing note of nonexistence of %s item '%s' as we are in pure sink role\n")
1341 % typestr % hid);
1342 return;
1343 }
1344
1345 L(F("queueing note of nonexistance of %s item '%s'\n")
1346 % typestr % hid);
1347 netcmd cmd;
1348 cmd.cmd_code = nonexistant_cmd;
1349 write_nonexistant_cmd_payload(type, item, cmd.payload);
1350 write_netcmd_and_try_flush(cmd);
1351}
1352
1353// processors
1354
1355bool
1356session::process_bye_cmd()
1357{
1358 L(F("received 'bye' netcmd\n"));
1359 this->received_goodbye = true;
1360 return true;
1361}
1362
1363bool
1364session::process_error_cmd(string const & errmsg)
1365{
1366 throw bad_decode(F("received network error: %s\n") % errmsg);
1367}
1368
1369bool
1370session::process_done_cmd(size_t level, netcmd_item_type type)
1371{
1372
1373 map< netcmd_item_type, done_marker>::iterator i = done_refinements.find(type);
1374 I(i != done_refinements.end());
1375
1376 string typestr;
1377 netcmd_item_type_to_string(type, typestr);
1378
1379 if ((! i->second.current_level_had_refinements) || (level >= 0xff))
1380 {
1381 // we received *no* refinements on this level -- or we ran out of
1382 // levels -- so refinement for this type is finished.
1383 L(F("received 'done' for empty %s level %d, marking as complete\n")
1384 % typestr % static_cast<int>(level));
1385
1386 // possibly echo it back one last time, for shutdown purposes
1387 if (!i->second.tree_is_done)
1388 queue_done_cmd(level + 1, type);
1389
1390 // tombstone it
1391 i->second.current_level_had_refinements = false;
1392 i->second.tree_is_done = true;
1393
1394 if (all_requested_revisions_received())
1395 analyze_ancestry_graph();
1396
1397 maybe_note_epochs_finished();
1398 }
1399
1400 else if (i->second.current_level_had_refinements
1401 && (! i->second.tree_is_done))
1402 {
1403 // we *did* receive some refinements on this level, reset to zero and
1404 // queue an echo of the 'done' marker.
1405 L(F("received 'done' for %s level %d, which had refinements; "
1406 "sending echo of done for level %d\n")
1407 % typestr
1408 % static_cast<int>(level)
1409 % static_cast<int>(level + 1));
1410 i->second.current_level_had_refinements = false;
1411 queue_done_cmd(level + 1, type);
1412 return true;
1413 }
1414 return true;
1415}
1416
1417static const var_domain known_servers_domain = var_domain("known-servers");
1418
1419bool
1420session::process_hello_cmd(rsa_keypair_id const & their_keyname,
1421 rsa_pub_key const & their_key,
1422 id const & nonce)
1423{
1424 I(this->remote_peer_key_hash().size() == 0);
1425 I(this->saved_nonce().size() == 0);
1426
1427 hexenc<id> their_key_hash;
1428 base64<rsa_pub_key> their_key_encoded;
1429 encode_base64(their_key, their_key_encoded);
1430 key_hash_code(their_keyname, their_key_encoded, their_key_hash);
1431 L(F("server key has name %s, hash %s\n") % their_keyname % their_key_hash);
1432 var_key their_key_key(known_servers_domain, var_name(peer_id));
1433 if (app.db.var_exists(their_key_key))
1434 {
1435 var_value expected_key_hash;
1436 app.db.get_var(their_key_key, expected_key_hash);
1437 if (expected_key_hash() != their_key_hash())
1438 {
1439 P(F("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"));
1440 P(F("@ WARNING: SERVER IDENTIFICATION HAS CHANGED @\n"));
1441 P(F("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"));
1442 P(F("IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY\n"));
1443 P(F("it is also possible that the server key has just been changed\n"));
1444 P(F("remote host sent key %s\n") % their_key_hash);
1445 P(F("I expected %s\n") % expected_key_hash);
1446 P(F("'monotone unset %s %s' overrides this check\n")
1447 % their_key_key.first % their_key_key.second);
1448 N(false, F("server key changed"));
1449 }
1450 }
1451 else
1452 {
1453 W(F("first time connecting to server %s; authenticity can't be established\n") % peer_id);
1454 W(F("their key is %s\n") % their_key_hash);
1455 app.db.set_var(their_key_key, var_value(their_key_hash()));
1456 }
1457 if (!app.db.public_key_exists(their_key_hash))
1458 {
1459 W(F("saving public key for %s to database\n") % their_keyname);
1460 app.db.put_key(their_keyname, their_key_encoded);
1461 }
1462
1463 {
1464 hexenc<id> hnonce;
1465 encode_hexenc(nonce, hnonce);
1466 L(F("received 'hello' netcmd from server '%s' with nonce '%s'\n")
1467 % their_key_hash % hnonce);
1468 }
1469
1470 I(app.db.public_key_exists(their_key_hash));
1471
1472 // save their identity
1473 {
1474 id their_key_hash_decoded;
1475 decode_hexenc(their_key_hash, their_key_hash_decoded);
1476 this->remote_peer_key_hash = their_key_hash_decoded;
1477 }
1478
1479 if (app.signing_key() != "")
1480 {
1481 // get our public key for its hash identifier
1482 base64<rsa_pub_key> our_pub;
1483 hexenc<id> our_key_hash;
1484 id our_key_hash_raw;
1485 app.db.get_key(app.signing_key, our_pub);
1486 key_hash_code(app.signing_key, our_pub, our_key_hash);
1487 decode_hexenc(our_key_hash, our_key_hash_raw);
1488
1489 // get our private key and make a signature
1490 base64<rsa_sha1_signature> sig;
1491 rsa_sha1_signature sig_raw;
1492 base64< arc4<rsa_priv_key> > our_priv;
1493 load_priv_key(app, app.signing_key, our_priv);
1494 make_signature(app.lua, app.signing_key, our_priv, nonce(), sig);
1495 decode_base64(sig, sig_raw);
1496
1497 // make a new nonce of our own and send off the 'auth'
1498 queue_auth_cmd(this->role, this->collection(), our_key_hash_raw,
1499 nonce, mk_nonce(), sig_raw());
1500 }
1501 else
1502 {
1503 queue_anonymous_cmd(this->role, this->collection(), mk_nonce());
1504 }
1505 return true;
1506}
1507
1508bool
1509session::process_anonymous_cmd(protocol_role role,
1510 string const & collection,
1511 id const & nonce2)
1512{
1513 hexenc<id> hnonce2;
1514 encode_hexenc(nonce2, hnonce2);
1515
1516 L(F("received 'anonymous' netcmd from client for collection '%s' "
1517 "in %s mode with nonce2 '%s'\n")
1518 % collection % (role == source_and_sink_role ? "source and sink" :
1519 (role == source_role ? "source " : "sink"))
1520 % hnonce2);
1521
1522 // check they're asking for a collection we're serving
1523 bool collection_ok = false;
1524 for (vector<utf8>::const_iterator i = collections.begin();
1525 i != collections.end(); ++i)
1526 {
1527 if (*i == collection)
1528 {
1529 collection_ok = true;
1530 break;
1531 }
1532 }
1533 if (!collection_ok)
1534 {
1535 W(F("not currently serving requested collection '%s'\n") % collection);
1536 this->saved_nonce = id("");
1537 return false;
1538 }
1539
1540 //
1541 // internally netsync thinks in terms of sources and sinks. users like
1542 // thinking of repositories as "readonly", "readwrite", or "writeonly".
1543 //
1544 // we therefore use the read/write terminology when dealing with the UI:
1545 // if the user asks to run a "read only" service, this means they are
1546 // willing to be a source but not a sink.
1547 //
1548 // nb: the "role" here is the role the *client* wants to play
1549 // so we need to check that the opposite role is allowed for us,
1550 // in our this->role field.
1551 //
1552
1553 if (role != sink_role)
1554 {
1555 W(F("rejected attempt at anonymous connection for write\n"));
1556 this->saved_nonce = id("");
1557 return false;
1558 }
1559
1560 if (! ((this->role == source_role || this->role == source_and_sink_role)
1561 && app.lua.hook_get_netsync_anonymous_read_permitted(collection)))
1562 {
1563 W(F("anonymous read permission denied for '%s'\n") % collection);
1564 this->saved_nonce = id("");
1565 return false;
1566 }
1567
1568 // get our private key and sign back
1569 L(F("anonymous read permitted, signing back nonce\n"));
1570 base64<rsa_sha1_signature> sig;
1571 rsa_sha1_signature sig_raw;
1572 base64< arc4<rsa_priv_key> > our_priv;
1573 load_priv_key(app, app.signing_key, our_priv);
1574 make_signature(app.lua, app.signing_key, our_priv, nonce2(), sig);
1575 decode_base64(sig, sig_raw);
1576 queue_confirm_cmd(sig_raw());
1577 this->collection = collection;
1578 this->authenticated = true;
1579 this->role = source_role;
1580 return true;
1581}
1582
1583bool
1584session::process_auth_cmd(protocol_role role,
1585 string const & collection,
1586 id const & client,
1587 id const & nonce1,
1588 id const & nonce2,
1589 string const & signature)
1590{
1591 I(this->remote_peer_key_hash().size() == 0);
1592 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
1593
1594 hexenc<id> hnonce1, hnonce2;
1595 encode_hexenc(nonce1, hnonce1);
1596 encode_hexenc(nonce2, hnonce2);
1597 hexenc<id> their_key_hash;
1598 encode_hexenc(client, their_key_hash);
1599
1600 L(F("received 'auth' netcmd from client '%s' for collection '%s' "
1601 "in %s mode with nonce1 '%s' and nonce2 '%s'\n")
1602 % their_key_hash % collection % (role == source_and_sink_role ? "source and sink" :
1603 (role == source_role ? "source " : "sink"))
1604 % hnonce1 % hnonce2);
1605
1606 // check that they replied with the nonce we asked for
1607 if (!(nonce1 == this->saved_nonce))
1608 {
1609 W(F("detected replay attack in auth netcmd\n"));
1610 this->saved_nonce = id("");
1611 return false;
1612 }
1613
1614 // check they're asking for a collection we're serving
1615 bool collection_ok = false;
1616 for (vector<utf8>::const_iterator i = collections.begin();
1617 i != collections.end(); ++i)
1618 {
1619 if (*i == collection)
1620 {
1621 collection_ok = true;
1622 break;
1623 }
1624 }
1625 if (!collection_ok)
1626 {
1627 W(F("not currently serving requested collection '%s'\n") % collection);
1628 this->saved_nonce = id("");
1629 return false;
1630 }
1631
1632 //
1633 // internally netsync thinks in terms of sources and sinks. users like
1634 // thinking of repositories as "readonly", "readwrite", or "writeonly".
1635 //
1636 // we therefore use the read/write terminology when dealing with the UI:
1637 // if the user asks to run a "read only" service, this means they are
1638 // willing to be a source but not a sink.
1639 //
1640 // nb: the "role" here is the role the *client* wants to play
1641 // so we need to check that the opposite role is allowed for us,
1642 // in our this->role field.
1643 //
1644
1645 if (!app.db.public_key_exists(their_key_hash))
1646 {
1647 W(F("unknown key hash '%s'\n") % their_key_hash);
1648 this->saved_nonce = id("");
1649 return false;
1650 }
1651
1652 // get their public key
1653 rsa_keypair_id their_id;
1654 base64<rsa_pub_key> their_key;
1655 app.db.get_pubkey(their_key_hash, their_id, their_key);
1656
1657 if (role == sink_role || role == source_and_sink_role)
1658 {
1659 if (! ((this->role == source_role || this->role == source_and_sink_role)
1660 && app.lua.hook_get_netsync_read_permitted(collection,
1661 their_id())))
1662 {
1663 W(F("read permission denied for '%s'\n") % collection);
1664 this->saved_nonce = id("");
1665 return false;
1666 }
1667 }
1668
1669 if (role == source_role || role == source_and_sink_role)
1670 {
1671 if (! ((this->role == sink_role || this->role == source_and_sink_role)
1672 && app.lua.hook_get_netsync_write_permitted(collection,
1673 their_id())))
1674 {
1675 W(F("write permission denied for '%s'\n") % collection);
1676 this->saved_nonce = id("");
1677 return false;
1678 }
1679 }
1680
1681 // save their identity
1682 this->remote_peer_key_hash = client;
1683
1684 // check the signature
1685 base64<rsa_sha1_signature> sig;
1686 encode_base64(rsa_sha1_signature(signature), sig);
1687 if (check_signature(app.lua, their_id, their_key, nonce1(), sig))
1688 {
1689 // get our private key and sign back
1690 L(F("client signature OK, accepting authentication\n"));
1691 base64<rsa_sha1_signature> sig;
1692 rsa_sha1_signature sig_raw;
1693 base64< arc4<rsa_priv_key> > our_priv;
1694 load_priv_key(app, app.signing_key, our_priv);
1695 make_signature(app.lua, app.signing_key, our_priv, nonce2(), sig);
1696 decode_base64(sig, sig_raw);
1697 queue_confirm_cmd(sig_raw());
1698 this->collection = collection;
1699 this->authenticated = true;
1700 // assume the (possibly degraded) opposite role
1701 switch (role)
1702 {
1703 case source_role:
1704 I(this->role != source_role);
1705 this->role = sink_role;
1706 break;
1707 case source_and_sink_role:
1708 I(this->role == source_and_sink_role);
1709 break;
1710 case sink_role:
1711 I(this->role != sink_role);
1712 this->role = source_role;
1713 break;
1714 }
1715 return true;
1716 }
1717 else
1718 {
1719 W(F("bad client signature\n"));
1720 }
1721 return false;
1722}
1723
1724bool
1725session::process_confirm_cmd(string const & signature)
1726{
1727 I(this->remote_peer_key_hash().size() == constants::merkle_hash_length_in_bytes);
1728 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
1729
1730 hexenc<id> their_key_hash;
1731 encode_hexenc(id(remote_peer_key_hash), their_key_hash);
1732
1733 // nb. this->role is our role, the server is in the opposite role
1734 L(F("received 'confirm' netcmd from server '%s' for collection '%s' in %s mode\n")
1735 % their_key_hash % this->collection % (this->role == source_and_sink_role ? "source and sink" :
1736 (this->role == source_role ? "sink" : "source")));
1737
1738 // check their signature
1739 if (app.db.public_key_exists(their_key_hash))
1740 {
1741 // get their public key and check the signature
1742 rsa_keypair_id their_id;
1743 base64<rsa_pub_key> their_key;
1744 app.db.get_pubkey(their_key_hash, their_id, their_key);
1745 base64<rsa_sha1_signature> sig;
1746 encode_base64(rsa_sha1_signature(signature), sig);
1747 if (check_signature(app.lua, their_id, their_key, this->saved_nonce(), sig))
1748 {
1749 L(F("server signature OK, accepting authentication\n"));
1750 this->authenticated = true;
1751
1752 merkle_ptr root;
1753 load_merkle_node(epoch_item, this->collection, 0, get_root_prefix().val, root);
1754 queue_refine_cmd(*root);
1755 queue_done_cmd(0, epoch_item);
1756
1757 load_merkle_node(key_item, this->collection, 0, get_root_prefix().val, root);
1758 queue_refine_cmd(*root);
1759 queue_done_cmd(0, key_item);
1760
1761 load_merkle_node(cert_item, this->collection, 0, get_root_prefix().val, root);
1762 queue_refine_cmd(*root);
1763 queue_done_cmd(0, cert_item);
1764 return true;
1765 }
1766 else
1767 {
1768 W(F("bad server signature\n"));
1769 }
1770 }
1771 else
1772 {
1773 W(F("unknown server key\n"));
1774 }
1775 return false;
1776}
1777
1778static bool
1779data_exists(netcmd_item_type type,
1780 id const & item,
1781 app_state & app)
1782{
1783 hexenc<id> hitem;
1784 encode_hexenc(item, hitem);
1785 switch (type)
1786 {
1787 case key_item:
1788 return app.db.public_key_exists(hitem);
1789 case manifest_item:
1790 return app.db.manifest_version_exists(manifest_id(hitem));
1791 case file_item:
1792 return app.db.file_version_exists(file_id(hitem));
1793 case revision_item:
1794 return app.db.revision_exists(revision_id(hitem));
1795 case cert_item:
1796 return app.db.revision_cert_exists(hitem);
1797 case epoch_item:
1798 return app.db.epoch_exists(epoch_id(hitem));
1799 }
1800 return false;
1801}
1802
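// load_data: fetch an item's payload out of the database for transmission;
// throws bad_decode if we do not actually have the item.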
1803static void
1804load_data(netcmd_item_type type,
1805 id const & item,
1806 app_state & app,
1807 string & out)
1808{
1809 string typestr;
1810 netcmd_item_type_to_string(type, typestr);
1811 hexenc<id> hitem;
1812 encode_hexenc(item, hitem);
1813 switch (type)
1814 {
1815 case epoch_item:
1816 if (app.db.epoch_exists(epoch_id(hitem)))
1817 {
1818 cert_value branch;
1819 epoch_data epoch;
1820 app.db.get_epoch(epoch_id(hitem), branch, epoch);
1821 write_epoch(branch, epoch, out);
1822 }
1823 else
1824 {
1825 throw bad_decode(F("epoch with hash '%s' does not exist in our database")
1826 % hitem);
1827 }
1828 break;
1829 case key_item:
1830 if (app.db.public_key_exists(hitem))
1831 {
1832 rsa_keypair_id keyid;
1833 base64<rsa_pub_key> pub_encoded;
1834 app.db.get_pubkey(hitem, keyid, pub_encoded);
1835 L(F("public key '%s' is also called '%s'\n") % hitem % keyid);
1836 write_pubkey(keyid, pub_encoded, out);
1837 }
1838 else
1839 {
1840 throw bad_decode(F("public key '%s' does not exist in our database") % hitem);
1841 }
1842 break;
1843
1844 case revision_item:
1845 if (app.db.revision_exists(revision_id(hitem)))
1846 {
1847 revision_data mdat;
1848 data dat;
1849 app.db.get_revision(revision_id(hitem), mdat);
1850 unpack(mdat.inner(), dat);
1851 out = dat();
1852 }
1853 else
1854 {
1855 throw bad_decode(F("revision '%s' does not exist in our database") % hitem);
1856 }
1857 break;
1858
1859 case manifest_item:
1860 if (app.db.manifest_version_exists(manifest_id(hitem)))
1861 {
1862 manifest_data mdat;
1863 data dat;
1864 app.db.get_manifest_version(manifest_id(hitem), mdat);
1865 unpack(mdat.inner(), dat);
1866 out = dat();
1867 }
1868 else
1869 {
1870 throw bad_decode(F("manifest '%s' does not exist in our database") % hitem);
1871 }
1872 break;
1873
1874 case file_item:
1875 if (app.db.file_version_exists(file_id(hitem)))
1876 {
1877 file_data fdat;
1878 data dat;
1879 app.db.get_file_version(file_id(hitem), fdat);
1880 unpack(fdat.inner(), dat);
1881 out = dat();
1882 }
1883 else
1884 {
1885 throw bad_decode(F("file '%s' does not exist in our database") % hitem);
1886 }
1887 break;
1888
1889 case cert_item:
1890      if (app.db.revision_cert_exists(hitem))
1891 {
1892 revision<cert> c;
1893 app.db.get_revision_cert(hitem, c);
1895 write_cert(c.inner(), out);
1896 }
1897 else
1898 {
1899 throw bad_decode(F("cert '%s' does not exist in our database") % hitem);
1900 }
1901 break;
1902 }
1903}
1904
1905
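// process_refine_cmd: compare one of their merkle nodes against ours. if we
// have no corresponding node we just ask for whatever they advertise;
// otherwise each (their slot state x our slot state) combination is handled
// below, queueing data, requests for data, or further refinements.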
1906bool
1907session::process_refine_cmd(merkle_node const & their_node)
1908{
1909 prefix pref;
1910 hexenc<prefix> hpref;
1911 their_node.get_raw_prefix(pref);
1912 their_node.get_hex_prefix(hpref);
1913 string typestr;
1914
1915 netcmd_item_type_to_string(their_node.type, typestr);
1916 size_t lev = static_cast<size_t>(their_node.level);
1917
1918 L(F("received 'refine' netcmd on %s node '%s', level %d\n")
1919 % typestr % hpref % lev);
1920
1921 if (!merkle_node_exists(their_node.type, this->collection,
1922 their_node.level, pref))
1923 {
1924 L(F("no corresponding %s merkle node for prefix '%s', level %d\n")
1925 % typestr % hpref % lev);
1926
1927 for (size_t slot = 0; slot < constants::merkle_num_slots; ++slot)
1928 {
1929 switch (their_node.get_slot_state(slot))
1930 {
1931 case empty_state:
1932 {
1933 // we agree, this slot is empty
1934 L(F("(#0) they have an empty slot %d (in a %s node '%s', level %d, we do not have)\n")
1935 % slot % typestr % hpref % lev);
1936 continue;
1937 }
1938 break;
1939 case live_leaf_state:
1940 {
1941 // we want what *they* have
1942 id slotval;
1943 hexenc<id> hslotval;
1944 their_node.get_raw_slot(slot, slotval);
1945 their_node.get_hex_slot(slot, hslotval);
1946 L(F("(#0) they have a live leaf at slot %d (in a %s node '%s', level %d, we do not have)\n")
1947 % slot % typestr % hpref % lev);
1948 L(F("(#0) requesting their %s leaf %s\n") % typestr % hslotval);
1949 queue_send_data_cmd(their_node.type, slotval);
1950 }
1951 break;
1952 case dead_leaf_state:
1953 {
1954 // we cannot ask for what they have, it is dead
1955 L(F("(#0) they have a dead leaf at slot %d (in a %s node '%s', level %d, we do not have)\n")
1956 % slot % typestr % hpref % lev);
1957 continue;
1958 }
1959 break;
1960 case subtree_state:
1961 {
1962 // they have a subtree; might as well ask for that
1963 L(F("(#0) they have a subtree at slot %d (in a %s node '%s', level %d, we do not have)\n")
1964 % slot % typestr % hpref % lev);
1965 merkle_node our_fake_subtree;
1966 their_node.extended_prefix(slot, our_fake_subtree.pref);
1967 our_fake_subtree.level = their_node.level + 1;
1968 our_fake_subtree.type = their_node.type;
1969 queue_refine_cmd(our_fake_subtree);
1970 }
1971 break;
1972 }
1973 }
1974 }
1975 else
1976 {
1977 // we have a corresponding merkle node. there are 16 branches
1978 // to the following switch condition. it is awful. sorry.
1979 L(F("found corresponding %s merkle node for prefix '%s', level %d\n")
1980 % typestr % hpref % lev);
1981 merkle_ptr our_node;
1982 load_merkle_node(their_node.type, this->collection,
1983 their_node.level, pref, our_node);
1984 for (size_t slot = 0; slot < constants::merkle_num_slots; ++slot)
1985 {
1986 switch (their_node.get_slot_state(slot))
1987 {
1988 case empty_state:
1989 switch (our_node->get_slot_state(slot))
1990 {
1991
1992 case empty_state:
1993 // 1: theirs == empty, ours == empty
1994 L(F("(#1) they have an empty slot %d in %s node '%s', level %d, and so do we\n")
1995 % slot % typestr % hpref % lev);
1996 continue;
1997 break;
1998
1999 case live_leaf_state:
2000 // 2: theirs == empty, ours == live
2001 L(F("(#2) they have an empty slot %d in %s node '%s', level %d, we have a live leaf\n")
2002 % slot % typestr % hpref % lev);
2003 {
2004 I(their_node.type == our_node->type);
2005 string tmp;
2006 id slotval;
2007 our_node->get_raw_slot(slot, slotval);
2008 load_data(their_node.type, slotval, this->app, tmp);
2009 queue_data_cmd(their_node.type, slotval, tmp);
2010 }
2011 break;
2012
2013 case dead_leaf_state:
2014 // 3: theirs == empty, ours == dead
2015 L(F("(#3) they have an empty slot %d in %s node '%s', level %d, we have a dead leaf\n")
2016 % slot % typestr % hpref % lev);
2017 continue;
2018 break;
2019
2020 case subtree_state:
2021 // 4: theirs == empty, ours == subtree
2022 L(F("(#4) they have an empty slot %d in %s node '%s', level %d, we have a subtree\n")
2023 % slot % typestr % hpref % lev);
2024 {
2025 prefix subprefix;
2026 our_node->extended_raw_prefix(slot, subprefix);
2027 merkle_ptr our_subtree;
2028 I(our_node->type == their_node.type);
2029 load_merkle_node(their_node.type, this->collection,
2030 our_node->level + 1, subprefix, our_subtree);
2031 I(our_node->type == our_subtree->type);
2032 queue_refine_cmd(*our_subtree);
2033 }
2034 break;
2035
2036 }
2037 break;
2038
2039
2040 case live_leaf_state:
2041 switch (our_node->get_slot_state(slot))
2042 {
2043
2044 case empty_state:
2045 // 5: theirs == live, ours == empty
2046 L(F("(#5) they have a live leaf at slot %d in %s node '%s', level %d, we have nothing\n")
2047 % slot % typestr % hpref % lev);
2048 {
2049 id slotval;
2050 their_node.get_raw_slot(slot, slotval);
2051 queue_send_data_cmd(their_node.type, slotval);
2052 }
2053 break;
2054
2055 case live_leaf_state:
2056 // 6: theirs == live, ours == live
2057 L(F("(#6) they have a live leaf at slot %d in %s node '%s', and so do we\n")
2058 % slot % typestr % hpref);
2059 {
2060 id our_slotval, their_slotval;
2061 their_node.get_raw_slot(slot, their_slotval);
2062 our_node->get_raw_slot(slot, our_slotval);
2063 if (their_slotval == our_slotval)
2064 {
2065 hexenc<id> hslotval;
2066 their_node.get_hex_slot(slot, hslotval);
2067 L(F("(#6) we both have live %s leaf '%s'\n") % typestr % hslotval);
2068 continue;
2069 }
2070 else
2071 {
2072 I(their_node.type == our_node->type);
2073 string tmp;
2074 load_data(our_node->type, our_slotval, this->app, tmp);
2075 queue_send_data_cmd(their_node.type, their_slotval);
2076 queue_data_cmd(our_node->type, our_slotval, tmp);
2077 }
2078 }
2079 break;
2080
2081 case dead_leaf_state:
2082 // 7: theirs == live, ours == dead
2083 L(F("(#7) they have a live leaf at slot %d in %s node %s, level %d, we have a dead one\n")
2084 % slot % typestr % hpref % lev);
2085 {
2086 id our_slotval, their_slotval;
2087 our_node->get_raw_slot(slot, our_slotval);
2088 their_node.get_raw_slot(slot, their_slotval);
2089 if (their_slotval == our_slotval)
2090 {
2091 hexenc<id> hslotval;
2092 their_node.get_hex_slot(slot, hslotval);
2093 L(F("(#7) it's the same %s leaf '%s', but ours is dead\n")
2094 % typestr % hslotval);
2095 continue;
2096 }
2097 else
2098 {
2099 queue_send_data_cmd(their_node.type, their_slotval);
2100 }
2101 }
2102 break;
2103
2104 case subtree_state:
2105 // 8: theirs == live, ours == subtree
2106 L(F("(#8) they have a live leaf in slot %d of %s node '%s', level %d, we have a subtree\n")
2107 % slot % typestr % hpref % lev);
2108 {
2109
2110 id their_slotval;
2111 hexenc<id> their_hval;
2112 their_node.get_raw_slot(slot, their_slotval);
2113 encode_hexenc(their_slotval, their_hval);
2114 if (data_exists(their_node.type, their_slotval, app))
2115 L(F("(#8) we have a copy of their live leaf '%s' in slot %d of %s node '%s', level %d\n")
2116 % their_hval % slot % typestr % hpref % lev);
2117 else
2118 {
2119 L(F("(#8) requesting a copy of their live leaf '%s' in slot %d of %s node '%s', level %d\n")
2120 % their_hval % slot % typestr % hpref % lev);
2121 queue_send_data_cmd(their_node.type, their_slotval);
2122 }
2123
2124 L(F("(#8) sending our subtree for refinement, in slot %d of %s node '%s', level %d\n")
2125 % slot % typestr % hpref % lev);
2126 prefix subprefix;
2127 our_node->extended_raw_prefix(slot, subprefix);
2128 merkle_ptr our_subtree;
2129 load_merkle_node(our_node->type, this->collection,
2130 our_node->level + 1, subprefix, our_subtree);
2131 queue_refine_cmd(*our_subtree);
2132 }
2133 break;
2134 }
2135 break;
2136
2137
2138 case dead_leaf_state:
2139 switch (our_node->get_slot_state(slot))
2140 {
2141 case empty_state:
2142 // 9: theirs == dead, ours == empty
2143 L(F("(#9) they have a dead leaf at slot %d in %s node '%s', level %d, we have nothing\n")
2144 % slot % typestr % hpref % lev);
2145 continue;
2146 break;
2147
2148 case live_leaf_state:
2149 // 10: theirs == dead, ours == live
2150 L(F("(#10) they have a dead leaf at slot %d in %s node '%s', level %d, we have a live one\n")
2151 % slot % typestr % hpref % lev);
2152 {
2153 id our_slotval, their_slotval;
2154 their_node.get_raw_slot(slot, their_slotval);
2155 our_node->get_raw_slot(slot, our_slotval);
2156 hexenc<id> hslotval;
2157 our_node->get_hex_slot(slot, hslotval);
2158 if (their_slotval == our_slotval)
2159 {
2160 L(F("(#10) we both have %s leaf %s, theirs is dead\n")
2161 % typestr % hslotval);
2162 continue;
2163 }
2164 else
2165 {
2166 I(their_node.type == our_node->type);
2167 string tmp;
2168 load_data(our_node->type, our_slotval, this->app, tmp);
2169 queue_data_cmd(our_node->type, our_slotval, tmp);
2170 }
2171 }
2172 break;
2173
2174 case dead_leaf_state:
2175 // 11: theirs == dead, ours == dead
2176 L(F("(#11) they have a dead leaf at slot %d in %s node '%s', level %d, so do we\n")
2177 % slot % typestr % hpref % lev);
2178 continue;
2179 break;
2180
2181 case subtree_state:
2182              // 12: theirs == dead, ours == subtree
2183 L(F("(#12) they have a dead leaf in slot %d of %s node '%s', we have a subtree\n")
2184 % slot % typestr % hpref % lev);
2185 {
2186 prefix subprefix;
2187 our_node->extended_raw_prefix(slot, subprefix);
2188 merkle_ptr our_subtree;
2189 load_merkle_node(our_node->type, this->collection,
2190 our_node->level + 1, subprefix, our_subtree);
2191 queue_refine_cmd(*our_subtree);
2192 }
2193 break;
2194 }
2195 break;
2196
2197
2198 case subtree_state:
2199 switch (our_node->get_slot_state(slot))
2200 {
2201 case empty_state:
2202 // 13: theirs == subtree, ours == empty
2203 L(F("(#13) they have a subtree at slot %d in %s node '%s', level %d, we have nothing\n")
2204 % slot % typestr % hpref % lev);
2205 {
2206 merkle_node our_fake_subtree;
2207 their_node.extended_prefix(slot, our_fake_subtree.pref);
2208 our_fake_subtree.level = their_node.level + 1;
2209 our_fake_subtree.type = their_node.type;
2210 queue_refine_cmd(our_fake_subtree);
2211 }
2212 break;
2213
2214 case live_leaf_state:
2215 // 14: theirs == subtree, ours == live
2216 L(F("(#14) they have a subtree at slot %d in %s node '%s', level %d, we have a live leaf\n")
2217 % slot % typestr % hpref % lev);
2218 {
2219 size_t subslot;
2220 id our_slotval;
2221 merkle_node our_fake_subtree;
2222 our_node->get_raw_slot(slot, our_slotval);
2223 hexenc<id> hslotval;
2224 encode_hexenc(our_slotval, hslotval);
2225
2226 pick_slot_and_prefix_for_value(our_slotval, our_node->level + 1, subslot,
2227 our_fake_subtree.pref);
2228 L(F("(#14) pushed our leaf '%s' into fake subtree slot %d, level %d\n")
2229 % hslotval % subslot % (lev + 1));
2230 our_fake_subtree.type = their_node.type;
2231 our_fake_subtree.level = our_node->level + 1;
2232 our_fake_subtree.set_raw_slot(subslot, our_slotval);
2233 our_fake_subtree.set_slot_state(subslot, our_node->get_slot_state(slot));
2234 queue_refine_cmd(our_fake_subtree);
2235 }
2236 break;
2237
2238 case dead_leaf_state:
2239 // 15: theirs == subtree, ours == dead
2240 L(F("(#15) they have a subtree at slot %d in %s node '%s', level %d, we have a dead leaf\n")
2241 % slot % typestr % hpref % lev);
2242 {
2243 size_t subslot;
2244 id our_slotval;
2245 merkle_node our_fake_subtree;
2246 our_node->get_raw_slot(slot, our_slotval);
2247 pick_slot_and_prefix_for_value(our_slotval, our_node->level + 1, subslot,
2248 our_fake_subtree.pref);
2249 our_fake_subtree.type = their_node.type;
2250 our_fake_subtree.level = our_node->level + 1;
2251 our_fake_subtree.set_raw_slot(subslot, our_slotval);
2252 our_fake_subtree.set_slot_state(subslot, our_node->get_slot_state(slot));
2253 queue_refine_cmd(our_fake_subtree);
2254 }
2255 break;
2256
2257 case subtree_state:
2258 // 16: theirs == subtree, ours == subtree
2259 L(F("(#16) they have a subtree at slot %d in %s node '%s', level %d, and so do we\n")
2260 % slot % typestr % hpref % lev);
2261 {
2262 id our_slotval, their_slotval;
2263 hexenc<id> hslotval;
2264 their_node.get_raw_slot(slot, their_slotval);
2265 our_node->get_raw_slot(slot, our_slotval);
2266 our_node->get_hex_slot(slot, hslotval);
2267 if (their_slotval == our_slotval)
2268 {
2269 L(F("(#16) we both have %s subtree '%s'\n") % typestr % hslotval);
2270 continue;
2271 }
2272 else
2273 {
2274 L(F("(#16) %s subtrees at slot %d differ, refining ours\n") % typestr % slot);
2275 prefix subprefix;
2276 our_node->extended_raw_prefix(slot, subprefix);
2277 merkle_ptr our_subtree;
2278 load_merkle_node(our_node->type, this->collection,
2279 our_node->level + 1, subprefix, our_subtree);
2280 queue_refine_cmd(*our_subtree);
2281 }
2282 }
2283 break;
2284 }
2285 break;
2286 }
2287 }
2288 }
2289 return true;
2290}
2291
2292
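// process_send_data_cmd: the peer asked for a full item; send it if we have
// it, otherwise answer with a 'nonexistant' netcmd.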
2293bool
2294session::process_send_data_cmd(netcmd_item_type type,
2295 id const & item)
2296{
2297 string typestr;
2298 netcmd_item_type_to_string(type, typestr);
2299 hexenc<id> hitem;
2300 encode_hexenc(item, hitem);
2301 L(F("received 'send_data' netcmd requesting %s '%s'\n")
2302 % typestr % hitem);
2303 if (data_exists(type, item, this->app))
2304 {
2305 string out;
2306 load_data(type, item, this->app, out);
2307 queue_data_cmd(type, item, out);
2308 }
2309 else
2310 {
2311 queue_nonexistant_cmd(type, item);
2312 }
2313 return true;
2314}
2315
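// process_send_delta_cmd: the peer asked for a delta between two file or
// manifest versions. if we have both versions we compute and queue the
// delta, otherwise we fall back to sending the full data.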
2316bool
2317session::process_send_delta_cmd(netcmd_item_type type,
2318 id const & base,
2319 id const & ident)
2320{
2321 string typestr;
2322 netcmd_item_type_to_string(type, typestr);
2323 delta del;
2324
2325 hexenc<id> hbase, hident;
2326 encode_hexenc(base, hbase);
2327 encode_hexenc(ident, hident);
2328
2329 L(F("received 'send_delta' netcmd requesting %s edge '%s' -> '%s'\n")
2330 % typestr % hbase % hident);
2331
2332 switch (type)
2333 {
2334 case file_item:
2335 {
2336 file_id fbase(hbase), fident(hident);
2337 file_delta fdel;
2338 if (this->app.db.file_version_exists(fbase)
2339 && this->app.db.file_version_exists(fident))
2340 {
2341 file_data base_fdat, ident_fdat;
2342 data base_dat, ident_dat;
2343 this->app.db.get_file_version(fbase, base_fdat);
2344 this->app.db.get_file_version(fident, ident_fdat);
2345 string tmp;
2346 unpack(base_fdat.inner(), base_dat);
2347 unpack(ident_fdat.inner(), ident_dat);
2348 compute_delta(base_dat(), ident_dat(), tmp);
2349 del = delta(tmp);
2350 }
2351 else
2352 {
2353 return process_send_data_cmd(type, ident);
2354 }
2355 }
2356 break;
2357
2358 case manifest_item:
2359 {
2360 manifest_id mbase(hbase), mident(hident);
2361 manifest_delta mdel;
2362 if (this->app.db.manifest_version_exists(mbase)
2363 && this->app.db.manifest_version_exists(mident))
2364 {
2365 manifest_data base_mdat, ident_mdat;
2366 data base_dat, ident_dat;
2367 this->app.db.get_manifest_version(mbase, base_mdat);
2368 this->app.db.get_manifest_version(mident, ident_mdat);
2369 string tmp;
2370 unpack(base_mdat.inner(), base_dat);
2371 unpack(ident_mdat.inner(), ident_dat);
2372 compute_delta(base_dat(), ident_dat(), tmp);
2373 del = delta(tmp);
2374 }
2375 else
2376 {
2377 return process_send_data_cmd(type, ident);
2378 }
2379 }
2380 break;
2381
2382 default:
2383 throw bad_decode(F("delta requested for item type %s\n") % typestr);
2384 }
2385 queue_delta_cmd(type, base, ident, del);
2386 return true;
2387}
2388
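// process_data_cmd: a complete item has arrived. check hashes where
// applicable, then store it -- except for revisions, which are stashed in
// the ancestry map until cert refinement finishes and the ancestry graph
// can be analyzed.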
2389bool
2390session::process_data_cmd(netcmd_item_type type,
2391 id const & item,
2392 string const & dat)
2393{
2394 hexenc<id> hitem;
2395 encode_hexenc(item, hitem);
2396
2397 // it's ok if we received something we didn't ask for; it might
2398 // be a spontaneous transmission from refinement
2399 note_item_arrived(type, item);
2400
2401 switch (type)
2402 {
2403 case epoch_item:
2404 if (this->app.db.epoch_exists(epoch_id(hitem)))
2405 {
2406 L(F("epoch '%s' already exists in our database\n") % hitem);
2407 }
2408 else
2409 {
2410 cert_value branch;
2411 epoch_data epoch;
2412 read_epoch(dat, branch, epoch);
2413 L(F("received epoch %s for branch %s\n") % epoch % branch);
2414 std::map<cert_value, epoch_data> epochs;
2415 app.db.get_epochs(epochs);
2416 std::map<cert_value, epoch_data>::const_iterator i;
2417 i = epochs.find(branch);
2418 if (i == epochs.end())
2419 {
2420 L(F("branch %s has no epoch; setting epoch to %s\n") % branch % epoch);
2421 app.db.set_epoch(branch, epoch);
2422 maybe_note_epochs_finished();
2423 }
2424 else
2425 {
2426 L(F("branch %s already has an epoch; checking\n") % branch);
2427 // if we get here, then we know that the epoch must be
2428 // different, because if it were the same then the
2429 // if(epoch_exists()) branch up above would have been taken. if
2430 // somehow this is wrong, then we have broken epoch hashing or
2431 // something, which is very dangerous, so play it safe...
2432 I(!(i->second == epoch));
2433
2434 // It is safe to call 'error' here, because if we get here,
2435 // then the current netcmd packet cannot possibly have
2436 // written anything to the database.
2437 error((F("Mismatched epoch on branch %s."
2438 " Server has '%s', client has '%s'.")
2439 % branch
2440 % (voice == server_voice ? i->second : epoch)
2441 % (voice == server_voice ? epoch : i->second)).str());
2442 }
2443 }
2444 break;
2445
2446 case key_item:
2447 if (this->app.db.public_key_exists(hitem))
2448 L(F("public key '%s' already exists in our database\n") % hitem);
2449 else
2450 {
2451 rsa_keypair_id keyid;
2452 base64<rsa_pub_key> pub;
2453 read_pubkey(dat, keyid, pub);
2454 hexenc<id> tmp;
2455 key_hash_code(keyid, pub, tmp);
2456 if (! (tmp == hitem))
2457 throw bad_decode(F("hash check failed for public key '%s' (%s);"
2458 " wanted '%s' got '%s'")
2459 % hitem % keyid % hitem % tmp);
2460 this->dbw.consume_public_key(keyid, pub);
2461 }
2462 break;
2463
2464 case cert_item:
2465 if (this->app.db.revision_cert_exists(hitem))
2466 L(F("cert '%s' already exists in our database\n") % hitem);
2467 else
2468 {
2469 cert c;
2470 read_cert(dat, c);
2471 hexenc<id> tmp;
2472 cert_hash_code(c, tmp);
2473 if (! (tmp == hitem))
2474 throw bad_decode(F("hash check failed for revision cert '%s'") % hitem);
2475 this->dbw.consume_revision_cert(revision<cert>(c));
2476 if (!app.db.revision_exists(revision_id(c.ident)))
2477 {
2478 id rid;
2479 decode_hexenc(c.ident, rid);
2480 queue_send_data_cmd(revision_item, rid);
2481 }
2482 }
2483 break;
2484
2485 case revision_item:
2486 {
2487 revision_id rid(hitem);
2488 if (this->app.db.revision_exists(rid))
2489 L(F("revision '%s' already exists in our database\n") % hitem);
2490 else
2491 {
2492 L(F("received revision '%s' \n") % hitem);
2493 boost::shared_ptr< pair<revision_data, revision_set > >
2494 rp(new pair<revision_data, revision_set>());
2495
2496 base64< gzip<data> > packed;
2497 pack(data(dat), packed);
2498 rp->first = revision_data(packed);
2499 read_revision_set(dat, rp->second);
2500 ancestry.insert(std::make_pair(rid, rp));
2501 if (cert_refinement_done())
2502 {
2503 analyze_ancestry_graph();
2504 }
2505 }
2506 }
2507 break;
2508
2509 case manifest_item:
2510 {
2511 manifest_id mid(hitem);
2512 if (this->app.db.manifest_version_exists(mid))
2513 L(F("manifest version '%s' already exists in our database\n") % hitem);
2514 else
2515 {
2516 base64< gzip<data> > packed_dat;
2517 pack(data(dat), packed_dat);
2518 this->dbw.consume_manifest_data(mid, manifest_data(packed_dat));
2519 manifest_map man;
2520 read_manifest_map(data(dat), man);
2521 analyze_manifest(man);
2522 }
2523 }
2524 break;
2525
2526 case file_item:
2527 {
2528 file_id fid(hitem);
2529 if (this->app.db.file_version_exists(fid))
2530 L(F("file version '%s' already exists in our database\n") % hitem);
2531 else
2532 {
2533 base64< gzip<data> > packed_dat;
2534 pack(data(dat), packed_dat);
2535 this->dbw.consume_file_data(fid, file_data(packed_dat));
2536 }
2537 }
2538 break;
2539
2540 }
2541 return true;
2542}
2543
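// process_delta_cmd: a delta has arrived; store it as a forward or reverse
// delta depending on whether we recorded a reverse-delta request for this
// edge.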
2544bool
2545session::process_delta_cmd(netcmd_item_type type,
2546 id const & base,
2547 id const & ident,
2548 delta const & del)
2549{
2550 string typestr;
2551 netcmd_item_type_to_string(type, typestr);
2552 hexenc<id> hbase, hident;
2553 encode_hexenc(base, hbase);
2554 encode_hexenc(ident, hident);
2555
2556 pair<id,id> id_pair = make_pair(base, ident);
2557
2558 // it's ok if we received something we didn't ask for; it might
2559 // be a spontaneous transmission from refinement
2560 // FIXME: what does the above comment mean? note_item_arrived does require
2561 // that the item passed to it have been requested...
2562 note_item_arrived(type, ident);
2563
2564 switch (type)
2565 {
2566 case manifest_item:
2567 {
2568 manifest_id src_manifest(hbase), dst_manifest(hident);
2569 base64< gzip<delta> > packed_del;
2570 pack(del, packed_del);
2571 if (reverse_delta_requests.find(id_pair)
2572 != reverse_delta_requests.end())
2573 {
2574 reverse_delta_requests.erase(id_pair);
2575 this->dbw.consume_manifest_reverse_delta(src_manifest,
2576 dst_manifest,
2577 manifest_delta(packed_del));
2578 }
2579 else
2580 this->dbw.consume_manifest_delta(src_manifest,
2581 dst_manifest,
2582 manifest_delta(packed_del));
2583
2584 }
2585 break;
2586
2587 case file_item:
2588 {
2589 file_id src_file(hbase), dst_file(hident);
2590 base64< gzip<delta> > packed_del;
2591 pack(del, packed_del);
2592 if (reverse_delta_requests.find(id_pair)
2593 != reverse_delta_requests.end())
2594 {
2595 reverse_delta_requests.erase(id_pair);
2596 this->dbw.consume_file_reverse_delta(src_file,
2597 dst_file,
2598 file_delta(packed_del));
2599 }
2600 else
2601 this->dbw.consume_file_delta(src_file,
2602 dst_file,
2603 file_delta(packed_del));
2604 }
2605 break;
2606
2607 default:
2608 L(F("ignoring delta received for item type %s\n") % typestr);
2609 break;
2610 }
2611 return true;
2612}
2613
2614bool
2615session::process_nonexistant_cmd(netcmd_item_type type,
2616 id const & item)
2617{
2618 string typestr;
2619 netcmd_item_type_to_string(type, typestr);
2620 hexenc<id> hitem;
2621 encode_hexenc(item, hitem);
2622 L(F("received 'nonexistant' netcmd for %s '%s'\n")
2623 % typestr % hitem);
2624 note_item_arrived(type, item);
2625 return true;
2626}
2627
2628bool
2629session::merkle_node_exists(netcmd_item_type type,
2630 utf8 const & collection,
2631 size_t level,
2632 prefix const & pref)
2633{
2634 map< std::pair<utf8, netcmd_item_type>,
2635 boost::shared_ptr<merkle_table> >::const_iterator i =
2636 merkle_tables.find(std::make_pair(collection,type));
2637
2638 I(i != merkle_tables.end());
2639 merkle_table::const_iterator j = i->second->find(std::make_pair(pref, level));
2640 return (j != i->second->end());
2641}
2642
2643void
2644session::load_merkle_node(netcmd_item_type type,
2645 utf8 const & collection,
2646 size_t level,
2647 prefix const & pref,
2648 merkle_ptr & node)
2649{
2650 map< std::pair<utf8, netcmd_item_type>,
2651 boost::shared_ptr<merkle_table> >::const_iterator i =
2652 merkle_tables.find(std::make_pair(collection,type));
2653
2654 I(i != merkle_tables.end());
2655 merkle_table::const_iterator j = i->second->find(std::make_pair(pref, level));
2656 I(j != i->second->end());
2657 node = j->second;
2658}
2659
2660
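// dispatch_payload: route a decoded netcmd to its process_* handler, first
// checking that our authentication state, voice and role permit the command.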
2661bool
2662session::dispatch_payload(netcmd const & cmd)
2663{
2664
2665 switch (cmd.cmd_code)
2666 {
2667
2668 case bye_cmd:
2669 return process_bye_cmd();
2670 break;
2671
2672 case error_cmd:
2673 {
2674 string errmsg;
2675 read_error_cmd_payload(cmd.payload, errmsg);
2676 return process_error_cmd(errmsg);
2677 }
2678 break;
2679
2680 case hello_cmd:
2681 require(! authenticated, "hello netcmd received when not authenticated");
2682 require(voice == client_voice, "hello netcmd received in client voice");
2683 {
2684 rsa_keypair_id server_keyname;
2685 rsa_pub_key server_key;
2686 id nonce;
2687 read_hello_cmd_payload(cmd.payload, server_keyname, server_key, nonce);
2688 return process_hello_cmd(server_keyname, server_key, nonce);
2689 }
2690 break;
2691
2692 case anonymous_cmd:
2693 require(! authenticated, "anonymous netcmd received when not authenticated");
2694 require(voice == server_voice, "anonymous netcmd received in server voice");
2695 require(role == source_role ||
2696 role == source_and_sink_role,
2697 "anonymous netcmd received in source or source/sink role");
2698 {
2699 protocol_role role;
2700 string collection;
2701 id nonce2;
2702 read_anonymous_cmd_payload(cmd.payload, role, collection, nonce2);
2703 return process_anonymous_cmd(role, collection, nonce2);
2704 }
2705 break;
2706
2707 case auth_cmd:
2708 require(! authenticated, "auth netcmd received when not authenticated");
2709 require(voice == server_voice, "auth netcmd received in server voice");
2710 {
2711 protocol_role role;
2712 string collection, signature;
2713 id client, nonce1, nonce2;
2714 read_auth_cmd_payload(cmd.payload, role, collection, client, nonce1, nonce2, signature);
2715 return process_auth_cmd(role, collection, client, nonce1, nonce2, signature);
2716 }
2717 break;
2718
2719 case confirm_cmd:
2720 require(! authenticated, "confirm netcmd received when not authenticated");
2721 require(voice == client_voice, "confirm netcmd received in client voice");
2722 {
2723 string signature;
2724 read_confirm_cmd_payload(cmd.payload, signature);
2725 return process_confirm_cmd(signature);
2726 }
2727 break;
2728
2729 case refine_cmd:
2730 require(authenticated, "refine netcmd received when authenticated");
2731 {
2732 merkle_node node;
2733 read_refine_cmd_payload(cmd.payload, node);
2734 map< netcmd_item_type, done_marker>::iterator i = done_refinements.find(node.type);
2735 require(i != done_refinements.end(), "refinement netcmd refers to valid type");
2736 require(i->second.tree_is_done == false, "refinement netcmd received when tree is live");
2737 i->second.current_level_had_refinements = true;
2738 return process_refine_cmd(node);
2739 }
2740 break;
2741
2742 case done_cmd:
2743 require(authenticated, "done netcmd received when authenticated");
2744 {
2745 size_t level;
2746 netcmd_item_type type;
2747 read_done_cmd_payload(cmd.payload, level, type);
2748 return process_done_cmd(level, type);
2749 }
2750 break;
2751
2752 case send_data_cmd:
2753 require(authenticated, "send_data netcmd received when authenticated");
2754 require(role == source_role ||
2755 role == source_and_sink_role,
2756 "send_data netcmd received in source or source/sink role");
2757 {
2758 netcmd_item_type type;
2759 id item;
2760 read_send_data_cmd_payload(cmd.payload, type, item);
2761 return process_send_data_cmd(type, item);
2762 }
2763 break;
2764
2765 case send_delta_cmd:
2766 require(authenticated, "send_delta netcmd received when authenticated");
2767 require(role == source_role ||
2768 role == source_and_sink_role,
2769 "send_delta netcmd received in source or source/sink role");
2770 {
2771 netcmd_item_type type;
2772 id base, ident;
2773 read_send_delta_cmd_payload(cmd.payload, type, base, ident);
2774 return process_send_delta_cmd(type, base, ident);
2775 }
2776
2777 case data_cmd:
2778 require(authenticated, "data netcmd received when authenticated");
2779 require(role == sink_role ||
2780 role == source_and_sink_role,
2781                "data netcmd received in sink or source/sink role");
2782 {
2783 netcmd_item_type type;
2784 id item;
2785 string dat;
2786 read_data_cmd_payload(cmd.payload, type, item, dat);
2787 return process_data_cmd(type, item, dat);
2788 }
2789 break;
2790
2791 case delta_cmd:
2792 require(authenticated, "delta netcmd received when authenticated");
2793 require(role == sink_role ||
2794 role == source_and_sink_role,
2795                "delta netcmd received in sink or source/sink role");
2796 {
2797 netcmd_item_type type;
2798 id base, ident;
2799 delta del;
2800 read_delta_cmd_payload(cmd.payload, type, base, ident, del);
2801 return process_delta_cmd(type, base, ident, del);
2802 }
2803 break;
2804
2805 case nonexistant_cmd:
2806 require(authenticated, "nonexistant netcmd received when authenticated");
2807 require(role == sink_role ||
2808 role == source_and_sink_role,
2809 "nonexistant netcmd received in sink or source/sink role");
2810 {
2811 netcmd_item_type type;
2812 id item;
2813 read_nonexistant_cmd_payload(cmd.payload, type, item);
2814 return process_nonexistant_cmd(type, item);
2815 }
2816 break;
2817 }
2818 return false;
2819}
2820
2821// this kicks off the whole cascade starting from "hello"
2822void
2823session::begin_service()
2824{
2825 base64<rsa_pub_key> pub_encoded;
2826 app.db.get_key(app.signing_key, pub_encoded);
2827 hexenc<id> keyhash;
2828 id keyhash_raw;
2829 key_hash_code(app.signing_key, pub_encoded, keyhash);
2830 decode_hexenc(keyhash, keyhash_raw);
2831 queue_hello_cmd(keyhash_raw(), mk_nonce());
2832}
2833
2834void
2835session::maybe_say_goodbye()
2836{
2837 if (done_all_refinements() &&
2838 got_all_data())
2839 queue_bye_cmd();
2840}
2841
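// arm: try to parse one complete netcmd out of the input buffer; returns
// true once a command is ready for dispatch.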
2842bool
2843session::arm()
2844{
2845 if (!armed)
2846 {
2847 if (read_netcmd(inbuf, cmd))
2848 {
2849 inbuf.erase(0, cmd.encoded_size());
2850 armed = true;
2851 }
2852 }
2853 return armed;
2854}
2855
2856bool session::process()
2857{
2858 try
2859 {
2860 if (!arm())
2861 return true;
2862
2863 transaction_guard guard(app.db);
2864 armed = false;
2865 L(F("processing %d byte input buffer from peer %s\n") % inbuf.size() % peer_id);
2866 bool ret = dispatch_payload(cmd);
2867 if (inbuf.size() >= constants::netcmd_maxsz)
2868 W(F("input buffer for peer %s is overfull after netcmd dispatch\n") % peer_id);
2869 guard.commit();
2870 maybe_say_goodbye();
2871 return ret;
2872 }
2873 catch (bad_decode & bd)
2874 {
2875 W(F("caught bad_decode exception processing peer %s: '%s'\n") % peer_id % bd.what);
2876 return false;
2877 }
2878}
2879
2880
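// call_server: client-side main loop. connect to the server, then alternate
// between probing the socket for i/o and processing armed netcmds until the
// exchange completes, times out, or fails.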
2881static void
2882call_server(protocol_role role,
2883 vector<utf8> const & collections,
2884 set<string> const & all_collections,
2885 app_state & app,
2886 utf8 const & address,
2887 Netxx::port_type default_port,
2888 unsigned long timeout_seconds)
2889{
2890 Netxx::Probe probe;
2891 Netxx::Timeout timeout(static_cast<long>(timeout_seconds)), instant(0,1);
2892
2893 // FIXME: split into labels and convert to ace here.
2894
2895 P(F("connecting to %s\n") % address());
2896 Netxx::Stream server(address().c_str(), default_port, timeout);
2897 session sess(role, client_voice, collections, all_collections, app,
2898 address(), server.get_socketfd(), timeout);
2899
2900 sess.byte_in_ticker.reset(new ticker("bytes in", ">", 256));
2901 sess.byte_out_ticker.reset(new ticker("bytes out", "<", 256));
2902 if (role == sink_role)
2903 {
2904 sess.cert_in_ticker.reset(new ticker("certs in", "c", 3));
2905 sess.revision_in_ticker.reset(new ticker("revs in", "r", 1));
2906 }
2907 else if (role == source_role)
2908 {
2909 sess.cert_out_ticker.reset(new ticker("certs out", "C", 3));
2910 sess.revision_out_ticker.reset(new ticker("revs out", "R", 1));
2911 }
2912 else
2913 {
2914 I(role == source_and_sink_role);
2915 sess.revision_in_ticker.reset(new ticker("revs in", "r", 1));
2916 sess.revision_out_ticker.reset(new ticker("revs out", "R", 1));
2917 }
2918
2919 while (true)
2920 {
2921 bool armed = false;
2922 try
2923 {
2924 armed = sess.arm();
2925 }
2926 catch (bad_decode & bd)
2927 {
2928 W(F("caught bad_decode exception decoding input from peer %s: '%s'\n")
2929 % sess.peer_id % bd.what);
2930 return;
2931 }
2932
2933 probe.clear();
2934 probe.add(sess.str, sess.which_events());
2935 Netxx::Probe::result_type res = probe.ready(armed ? instant : timeout);
2936 Netxx::Probe::ready_type event = res.second;
2937 Netxx::socket_type fd = res.first;
2938
2939 if (fd == -1 && !armed)
2940 {
2941 P(F("timed out waiting for I/O with peer %s, disconnecting\n") % sess.peer_id);
2942 return;
2943 }
2944
2945 if (event & Netxx::Probe::ready_read)
2946 {
2947 if (sess.read_some())
2948 {
2949 try
2950 {
2951 armed = sess.arm();
2952 }
2953 catch (bad_decode & bd)
2954 {
2955 W(F("caught bad_decode exception decoding input from peer %s: '%s'\n")
2956 % sess.peer_id % bd.what);
2957 return;
2958 }
2959 }
2960 else
2961 {
2962 if (sess.sent_goodbye)
2963 P(F("read from fd %d (peer %s) closed OK after goodbye\n") % fd % sess.peer_id);
2964 else
2965 P(F("read from fd %d (peer %s) failed, disconnecting\n") % fd % sess.peer_id);
2966 return;
2967 }
2968 }
2969
2970 if (event & Netxx::Probe::ready_write)
2971 {
2972 if (! sess.write_some())
2973 {
2974 if (sess.sent_goodbye)
2975 P(F("write on fd %d (peer %s) closed OK after goodbye\n") % fd % sess.peer_id);
2976 else
2977 P(F("write on fd %d (peer %s) failed, disconnecting\n") % fd % sess.peer_id);
2978 return;
2979 }
2980 }
2981
2982 if (event & Netxx::Probe::ready_oobd)
2983 {
2984 P(F("got OOB data on fd %d (peer %s), disconnecting\n")
2985 % fd % sess.peer_id);
2986 return;
2987 }
2988
2989 if (armed)
2990 {
2991 if (!sess.process())
2992 {
2993 P(F("terminated exchange with %s\n")
2994 % sess.peer_id);
2995 return;
2996 }
2997 }
2998
2999 if (sess.sent_goodbye && sess.outbuf.empty() && sess.received_goodbye)
3000 {
3001 P(F("successful exchange with %s\n")
3002 % sess.peer_id);
3003 return;
3004 }
3005 }
3006}
3007
3008static void
3009arm_sessions_and_calculate_probe(Netxx::Probe & probe,
3010 map<Netxx::socket_type, shared_ptr<session> > & sessions,
3011 set<Netxx::socket_type> & armed_sessions)
3012{
3013 set<Netxx::socket_type> arm_failed;
3014 for (map<Netxx::socket_type,
3015 shared_ptr<session> >::const_iterator i = sessions.begin();
3016 i != sessions.end(); ++i)
3017 {
3018 try
3019 {
3020 if (i->second->arm())
3021 {
3022 L(F("fd %d is armed\n") % i->first);
3023 armed_sessions.insert(i->first);
3024 }
3025 probe.add(i->second->str, i->second->which_events());
3026 }
3027 catch (bad_decode & bd)
3028 {
3029 W(F("caught bad_decode exception decoding input from peer %s: '%s', marking as bad\n")
3030 % i->second->peer_id % bd.what);
3031 arm_failed.insert(i->first);
3032 }
3033 }
3034 for (set<Netxx::socket_type>::const_iterator i = arm_failed.begin();
3035 i != arm_failed.end(); ++i)
3036 {
3037 sessions.erase(*i);
3038 }
3039}
3040
3041static void
3042handle_new_connection(Netxx::Address & addr,
3043 Netxx::StreamServer & server,
3044 Netxx::Timeout & timeout,
3045 protocol_role role,
3046 vector<utf8> const & collections,
3047 set<string> const & all_collections,
3048 map<Netxx::socket_type, shared_ptr<session> > & sessions,
3049 app_state & app)
3050{
3051 L(F("accepting new connection on %s : %d\n")
3052 % addr.get_name() % addr.get_port());
3053 Netxx::Peer client = server.accept_connection();
3054
3055 if (!client)
3056 {
3057 L(F("accept() returned a dead client\n"));
3058 }
3059 else
3060 {
3061 P(F("accepted new client connection from %s\n") % client);
3062 shared_ptr<session> sess(new session(role, server_voice, collections,
3063 all_collections, app,
3064 lexical_cast<string>(client),
3065 client.get_socketfd(), timeout));
3066 sess->begin_service();
3067 sessions.insert(make_pair(client.get_socketfd(), sess));
3068 }
3069}
3070
3071static void
3072handle_read_available(Netxx::socket_type fd,
3073 shared_ptr<session> sess,
3074 map<Netxx::socket_type, shared_ptr<session> > & sessions,
3075 set<Netxx::socket_type> & armed_sessions,
3076 bool & live_p)
3077{
3078 if (sess->read_some())
3079 {
3080 try
3081 {
3082 if (sess->arm())
3083 armed_sessions.insert(fd);
3084 }
3085 catch (bad_decode & bd)
3086 {
3087 W(F("caught bad_decode exception decoding input from peer %s: '%s', disconnecting\n")
3088 % sess->peer_id % bd.what);
3089 sessions.erase(fd);
3090 live_p = false;
3091 }
3092 }
3093 else
3094 {
3095 P(F("fd %d (peer %s) read failed, disconnecting\n")
3096 % fd % sess->peer_id);
3097 sessions.erase(fd);
3098 live_p = false;
3099 }
3100}
3101
3102
3103static void
3104handle_write_available(Netxx::socket_type fd,
3105 shared_ptr<session> sess,
3106 map<Netxx::socket_type, shared_ptr<session> > & sessions,
3107 bool & live_p)
3108{
3109 if (! sess->write_some())
3110 {
3111 P(F("fd %d (peer %s) write failed, disconnecting\n")
3112 % fd % sess->peer_id);
3113 sessions.erase(fd);
3114 live_p = false;
3115 }
3116}
3117
3118static void
3119process_armed_sessions(map<Netxx::socket_type, shared_ptr<session> > & sessions,
3120 set<Netxx::socket_type> & armed_sessions)
3121{
3122 for (set<Netxx::socket_type>::const_iterator i = armed_sessions.begin();
3123 i != armed_sessions.end(); ++i)
3124 {
3125 map<Netxx::socket_type, shared_ptr<session> >::iterator j;
3126 j = sessions.find(*i);
3127 if (j == sessions.end())
3128 continue;
3129 else
3130 {
3131 Netxx::socket_type fd = j->first;
3132 shared_ptr<session> sess = j->second;
3133 if (!sess->process())
3134 {
3135 P(F("fd %d (peer %s) processing finished, disconnecting\n")
3136 % fd % sess->peer_id);
3137 sessions.erase(j);
3138 }
3139 }
3140 }
3141}
3142
3143static void
3144reap_dead_sessions(map<Netxx::socket_type, shared_ptr<session> > & sessions,
3145 unsigned long timeout_seconds)
3146{
3147 // kill any clients which haven't done any i/o inside the timeout period
3148 // or who have said goodbye and flushed their output buffers
3149 set<Netxx::socket_type> dead_clients;
3150 time_t now = ::time(NULL);
3151 for (map<Netxx::socket_type, shared_ptr<session> >::const_iterator i = sessions.begin();
3152 i != sessions.end(); ++i)
3153 {
3154 if (static_cast<unsigned long>(i->second->last_io_time + timeout_seconds)
3155 < static_cast<unsigned long>(now))
3156 {
3157 P(F("fd %d (peer %s) has been idle too long, disconnecting\n")
3158 % i->first % i->second->peer_id);
3159 dead_clients.insert(i->first);
3160 }
3161 if (i->second->sent_goodbye && i->second->outbuf.empty() && i->second->received_goodbye)
3162 {
3163 P(F("fd %d (peer %s) exchanged goodbyes and flushed output, disconnecting\n")
3164 % i->first % i->second->peer_id);
3165 dead_clients.insert(i->first);
3166 }
3167 }
3168 for (set<Netxx::socket_type>::const_iterator i = dead_clients.begin();
3169 i != dead_clients.end(); ++i)
3170 {
3171 sessions.erase(*i);
3172 }
3173}
3174
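// serve_connections: server-side main loop. accept new clients up to the
// session limit, probe all live sessions for i/o, process armed sessions,
// and reap those that have idled out or finished their goodbyes.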
3175static void
3176serve_connections(protocol_role role,
3177 vector<utf8> const & collections,
3178 set<string> const & all_collections,
3179 app_state & app,
3180 utf8 const & address,
3181 Netxx::port_type default_port,
3182 unsigned long timeout_seconds,
3183 unsigned long session_limit)
3184{
3185 Netxx::Probe probe;
3186
3187 Netxx::Timeout
3188 forever,
3189 timeout(static_cast<long>(timeout_seconds)),
3190 instant(0,1);
3191
3192 Netxx::Address addr(address().c_str(), default_port, true);
3193
3194 P(F("beginning service on %s : %d\n")
3195 % addr.get_name() % addr.get_port());
3196
3197 Netxx::StreamServer server(addr, timeout);
3198
3199 map<Netxx::socket_type, shared_ptr<session> > sessions;
3200 set<Netxx::socket_type> armed_sessions;
3201
3202 while (true)
3203 {
3204 probe.clear();
3205 armed_sessions.clear();
3206
3207 if (sessions.size() >= session_limit)
3208 W(F("session limit %d reached, some connections will be refused\n") % session_limit);
3209 else
3210 probe.add(server);
3211
3212 arm_sessions_and_calculate_probe(probe, sessions, armed_sessions);
3213
3214 L(F("i/o probe with %d armed\n") % armed_sessions.size());
3215 Netxx::Probe::result_type res = probe.ready(sessions.empty() ? forever
3216 : (armed_sessions.empty() ? timeout
3217 : instant));
3218 Netxx::Probe::ready_type event = res.second;
3219 Netxx::socket_type fd = res.first;
3220
3221 if (fd == -1)
3222 {
3223 if (armed_sessions.empty())
3224 L(F("timed out waiting for I/O (listening on %s : %d)\n")
3225 % addr.get_name() % addr.get_port());
3226 }
3227
3228 // we either got a new connection
3229 else if (fd == server)
3230 handle_new_connection(addr, server, timeout, role,
3231 collections, all_collections, sessions, app);
3232
3233 // or an existing session woke up
3234 else
3235 {
3236 map<Netxx::socket_type, shared_ptr<session> >::iterator i;
3237 i = sessions.find(fd);
3238 if (i == sessions.end())
3239 {
3240 L(F("got woken up for action on unknown fd %d\n") % fd);
3241 }
3242 else
3243 {
3244 shared_ptr<session> sess = i->second;
3245 bool live_p = true;
3246
3247 if (event & Netxx::Probe::ready_read)
3248 handle_read_available(fd, sess, sessions, armed_sessions, live_p);
3249
3250 if (live_p && (event & Netxx::Probe::ready_write))
3251 handle_write_available(fd, sess, sessions, live_p);
3252
3253 if (live_p && (event & Netxx::Probe::ready_oobd))
3254 {
3255 P(F("got some OOB data on fd %d (peer %s), disconnecting\n")
3256 % fd % sess->peer_id);
3257 sessions.erase(i);
3258 }
3259 }
3260 }
3261 process_armed_sessions(sessions, armed_sessions);
3262 reap_dead_sessions(sessions, timeout_seconds);
3263 }
3264}
3265
3266
3267/////////////////////////////////////////////////
3268//
3269// layer 4: monotone interface layer
3270//
3271/////////////////////////////////////////////////
3272
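// make_root_node: create an empty merkle table for (collection, item type),
// seeded with a root node at the empty prefix, and register it with the
// session.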
3273static boost::shared_ptr<merkle_table>
3274make_root_node(session & sess,
3275 utf8 const & coll,
3276 netcmd_item_type ty)
3277{
3278 boost::shared_ptr<merkle_table> tab =
3279 boost::shared_ptr<merkle_table>(new merkle_table());
3280
3281 merkle_ptr tmp = merkle_ptr(new merkle_node());
3282 tmp->type = ty;
3283
3284 tab->insert(std::make_pair(std::make_pair(get_root_prefix().val, 0), tmp));
3285
3286 sess.merkle_tables[std::make_pair(coll, ty)] = tab;
3287 return tab;
3288}
3289
3290
3291// BROKEN
3292void
3293session::load_epoch(cert_value const & branchname, epoch_id const & epoch)
3294{
3295 // hash is of concat(branch name, raw epoch id). This is unique, because
3296 // the latter has a fixed length.
3297 std::string tmp(branchname());
3298 id raw_epoch;
3299 decode_hexenc(epoch.inner(), raw_epoch);
3300 tmp += raw_epoch();
3301 data tdat(tmp);
3302 hexenc<id> out;
3303 calculate_ident(tdat, out);
3304 id raw_hash;
3305 decode_hexenc(out, raw_hash);
3306}
3307
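// rebuild_merkle_trees: populate the epoch, key and cert merkle tables for a
// collection from the database -- matching branch certs, the keys that
// signed them, and one epoch per branch -- then recalculate the interior
// hash codes.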
3308void
3309session::rebuild_merkle_trees(app_state & app,
3310 utf8 const & collection)
3311{
3312 P(F("rebuilding merkle trees for collection %s\n") % collection);
3313
3314 boost::shared_ptr<merkle_table> ctab = make_root_node(*this, collection, cert_item);
3315 boost::shared_ptr<merkle_table> ktab = make_root_node(*this, collection, key_item);
3316 boost::shared_ptr<merkle_table> etab = make_root_node(*this, collection, epoch_item);
3317
3318 ticker certs_ticker("certs", "c", 256);
3319 ticker keys_ticker("keys", "k", 1);
3320
3321 set<revision_id> revision_ids;
3322 set<rsa_keypair_id> inserted_keys;
3323
3324 {
3325 // get all matching branch names
3326 vector< revision<cert> > certs;
3327 set<string> branchnames;
3328 app.db.get_revision_certs(branch_cert_name, certs);
3329 for (size_t i = 0; i < certs.size(); ++i)
3330 {
3331 cert_value name;
3332 decode_base64(idx(certs, i).inner().value, name);
3333 if (name().find(collection()) == 0)
3334 {
3335 if (branchnames.find(name()) == branchnames.end())
3336 P(F("including branch %s\n") % name());
3337 branchnames.insert(name());
3338 revision_ids.insert(revision_id(idx(certs, i).inner().ident));
3339 }
3340 }
3341
3342 {
3343 map<cert_value, epoch_data> epochs;
3344 app.db.get_epochs(epochs);
3345
3346 epoch_data epoch_zero(std::string(constants::epochlen, '0'));
3347 for (std::set<string>::const_iterator i = branchnames.begin();
3348 i != branchnames.end(); ++i)
3349 {
3350 cert_value branch(*i);
3351 std::map<cert_value, epoch_data>::const_iterator j;
3352 j = epochs.find(branch);
3353 // set to zero any epoch which is not yet set
3354 if (j == epochs.end())
3355 {
3356 L(F("setting epoch on %s to zero\n") % branch);
3357 epochs.insert(std::make_pair(branch, epoch_zero));
3358 app.db.set_epoch(branch, epoch_zero);
3359 }
3360 // then insert all epochs into merkle tree
3361 j = epochs.find(branch);
3362 I(j != epochs.end());
3363 epoch_id eid;
3364 epoch_hash_code(j->first, j->second, eid);
3365 id raw_hash;
3366 decode_hexenc(eid.inner(), raw_hash);
3367 insert_into_merkle_tree(*etab, epoch_item, true, raw_hash(), 0);
3368 }
3369 }
3370
3371 typedef std::vector< std::pair<hexenc<id>,
3372 std::pair<revision_id, rsa_keypair_id> > > cert_idx;
3373
3374 cert_idx idx;
3375 app.db.get_revision_cert_index(idx);
3376
3377 // insert all certs and keys reachable via these revisions
3378 for (cert_idx::const_iterator i = idx.begin(); i != idx.end(); ++i)
3379 {
3380 hexenc<id> const & hash = i->first;
3381 revision_id const & ident = i->second.first;
3382 rsa_keypair_id const & key = i->second.second;
3383
3384 if (revision_ids.find(ident) == revision_ids.end())
3385 continue;
3386
3387 id raw_hash;
3388 decode_hexenc(hash, raw_hash);
3389 insert_into_merkle_tree(*ctab, cert_item, true, raw_hash(), 0);
3390 ++certs_ticker;
3391 if (inserted_keys.find(key) == inserted_keys.end())
3392 {
3393 if (app.db.public_key_exists(key))
3394 {
3395 base64<rsa_pub_key> pub_encoded;
3396 app.db.get_key(key, pub_encoded);
3397 hexenc<id> keyhash;
3398 key_hash_code(key, pub_encoded, keyhash);
3399 decode_hexenc(keyhash, raw_hash);
3400 insert_into_merkle_tree(*ktab, key_item, true, raw_hash(), 0);
3401 ++keys_ticker;
3402 }
3403 inserted_keys.insert(key);
3404 }
3405 }
3406 }
3407
3408 recalculate_merkle_codes(*etab, get_root_prefix().val, 0);
3409 recalculate_merkle_codes(*ktab, get_root_prefix().val, 0);
3410 recalculate_merkle_codes(*ctab, get_root_prefix().val, 0);
3411}
3412
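// run_netsync_protocol: top-level entry point. work out the full set of
// collections involved, then run either the server loop or a single client
// exchange.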
3413void
3414run_netsync_protocol(protocol_voice voice,
3415 protocol_role role,
3416 utf8 const & addr,
3417 vector<utf8> collections,
3418 app_state & app)
3419{
3420
3421 set<string> all_collections;
3422 for (vector<utf8>::const_iterator j = collections.begin();
3423 j != collections.end(); ++j)
3424 {
3425 all_collections.insert((*j)());
3426 }
3427
3428 vector< revision<cert> > certs;
3429 app.db.get_revision_certs(branch_cert_name, certs);
3430 for (vector< revision<cert> >::const_iterator i = certs.begin();
3431 i != certs.end(); ++i)
3432 {
3433 cert_value name;
3434 decode_base64(i->inner().value, name);
3435 for (vector<utf8>::const_iterator j = collections.begin();
3436 j != collections.end(); ++j)
3437 {
3438 if ((*j)().find(name()) == 0
3439 && all_collections.find(name()) == all_collections.end())
3440 {
3441 if (name() != (*j)())
3442 P(F("%s included in collection %s\n") % (*j) % name);
3443 all_collections.insert(name());
3444 }
3445 }
3446 }
3447
3448 try
3449 {
3450 start_platform_netsync();
3451 if (voice == server_voice)
3452 {
3453 serve_connections(role, collections, all_collections, app,
3454 addr, static_cast<Netxx::port_type>(constants::netsync_default_port),
3455 static_cast<unsigned long>(constants::netsync_timeout_seconds),
3456 static_cast<unsigned long>(constants::netsync_connection_limit));
3457 }
3458 else
3459 {
3460 I(voice == client_voice);
3461 transaction_guard guard(app.db);
3462 call_server(role, collections, all_collections, app,
3463 addr, static_cast<Netxx::port_type>(constants::netsync_default_port),
3464 static_cast<unsigned long>(constants::netsync_timeout_seconds));
3465 guard.commit();
3466 }
3467 }
3468 catch (Netxx::Exception & e)
3469 {
3470 end_platform_netsync();
3471      throw oops((F("trapped network exception: %s\n") % e.what()).str());
3472 }
3473 end_platform_netsync();
3474}
3475
