monotone Mtn Source Tree

Root/netsync.cc

1// copyright (C) 2004 graydon hoare <graydon@pobox.com>
2// all rights reserved.
3// licensed to the public under the terms of the GNU GPL (>= 2)
4// see the file COPYING for details
5
6#include <map>
7#include <string>
8#include <memory>
9#include <list>
10#include <deque>
11#include <stack>
12
13#include <time.h>
14
15#include <boost/lexical_cast.hpp>
16#include <boost/scoped_ptr.hpp>
17#include <boost/shared_ptr.hpp>
18#include <boost/bind.hpp>
19#include <boost/regex.hpp>
20
21#include "app_state.hh"
22#include "cert.hh"
23#include "constants.hh"
24#include "keys.hh"
25#include "merkle_tree.hh"
26#include "netcmd.hh"
27#include "netio.hh"
28#include "netsync.hh"
29#include "numeric_vocab.hh"
30#include "packet.hh"
31#include "sanity.hh"
32#include "transforms.hh"
33#include "ui.hh"
34#include "xdelta.hh"
35#include "epoch.hh"
36#include "platform.hh"
37#include "hmac.hh"
38#include "globish.hh"
39
40#include "botan/botan.h"
41
42#include "netxx/address.h"
43#include "netxx/peer.h"
44#include "netxx/probe.h"
45#include "netxx/socket.h"
46#include "netxx/stream.h"
47#include "netxx/streamserver.h"
48#include "netxx/timeout.h"
49
50// TODO: things to do that will break protocol compatibility
51// -- need some way to upgrade anonymous to keyed pull, without the user
52// having to explicitly specify which they want
53// just having a way to respond "access denied, try again" might work
54// but perhaps better to have the anonymous command include a note "I
55// _could_ use key <...> if you prefer", and if that would lead to more
56// access, could reply "I do prefer". (Does this lead to too much
57// information exposure? Allows anonymous people to probe what branches
58// a key has access to.)
59// -- "warning" packet type?
60// -- Richard Levitte wants that when you (e.g.) request '*' but don't have
61// access to all of it, you just get the parts you have access to (maybe with
62// warnings about skipped branches). to do this right, should have a way
63// for the server to send back to the client "right, you're not getting
64// the following branches: ...", so the client will not include them in
65// its merkle trie.
66// -- add some sort of vhost field to the client's first packet, saying who
67// they expect to talk to
68// -- connection teardown is flawed:
69// -- simple bug: often connections "fail" even though they succeeded.
70// should figure out why. (Possibly one side doesn't wait for their
71// goodbye packet to drain before closing the socket?)
72// -- subtle misdesign: "goodbye" packets indicate completion of data
73// transfer. they do not indicate that data has been written to
74// disk. there should be some way to indicate that data has been
75// successfully written to disk. See message (and thread)
76// <E0420553-34F3-45E8-9DA4-D8A5CB9B0600@hsdev.com> on
77// monotone-devel.
78// -- apparently we have an IANA-approved port: 4691. I guess we should
79// switch to using that.
80
81//
82// this is the "new" network synchronization (netsync) system in
83// monotone. it is based on synchronizing a pair of merkle trees over an
84// interactive connection.
85//
86// a netsync process between peers treats each peer as either a source, a
87// sink, or both. when a peer is only a source, it will not write any new
88// items to its database. when a peer is only a sink, it will not send any
89// items from its database. when a peer is both a source and sink, it may
90// send and write items freely.
91//
92// the post-state of a netsync is that each sink contains a superset of the
93// items in its corresponding source; when peers are behaving as both
94// source and sink, this means that the post-state of the sync is for the
95// peers to have identical item sets.
96//
97// a peer can be a sink in at most one netsync process at a time; it can
98// however be a source for multiple netsyncs simultaneously.
99//
100//
101// data structure
102// --------------
103//
104// each node in a merkle tree contains a fixed number of slots. this number
105// is derived from a global parameter of the protocol -- the tree fanout --
106// such that the number of slots is 2^fanout. for now we will assume that
107// fanout is 4, thus there are 16 slots in a node, because this makes
108// illustration easier. the other parameter of the protocol is the size of
109// a hash; we use SHA1 so the hash is 20 bytes (160 bits) long.
110//
111// each slot in a merkle tree node is in one of 4 states:
112//
113// - empty
114// - live leaf
115// - dead leaf
116// - subtree
117//
118// in addition, each live or dead leaf contains a hash code which
119// identifies an element of the set being synchronized. each subtree slot
120// contains a hash code of the node immediately beneath it in the merkle
121// tree. empty slots contain no hash codes.
122//
123// each node also summarizes, for sake of statistic-gathering, the number
124// of set elements and total number of bytes in all of its subtrees, each
125// stored as a size_t and sent as a uleb128.
126//
127// since empty slots have no hash code, they are represented implicitly by
128// a bitmap at the head of each merkle tree node. as an additional
129// integrity check, each merkle tree node contains a label indicating its
130// prefix in the tree, and a hash of its own contents.
131//
132// in total, then, the byte-level representation of a <160,4> merkle tree
133// node is as follows:
134//
135// 20 bytes - hash of the remaining bytes in the node
136// 1 byte - type of this node (manifest, file, key, mcert, fcert)
137// 1-N bytes - level of this node in the tree (0 == "root", uleb128)
138// 0-20 bytes - the prefix of this node, 4 bits * level,
139// rounded up to a byte
140// 1-N bytes - number of leaves under this node (uleb128)
141// 4 bytes - slot-state bitmap of the node
142// 0-320 bytes - between 0 and 16 live slots in the node
143//
144// so, in the worst case such a node is 367 bytes, with these parameters.
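// (a worked check of that figure, assuming both uleb128 fields fit in a
// single byte: 20 (hash) + 1 (type) + 1 (level) + 20 (prefix)
// + 1 (leaf count) + 4 (bitmap) + 16*20 (live slots) = 367 bytes.)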
145//
146//
147// protocol
148// --------
149//
150// The protocol is a simple binary command-packet system over TCP;
151// each packet consists of a single byte which identifies the protocol
152// version, a byte which identifies the command name inside that
153// version, a size_t sent as a uleb128 indicating the length of the
154// packet, that many bytes of payload, and finally 20 bytes of SHA-1
155// HMAC calculated over the payload. The key for the SHA-1 HMAC is 20
156// bytes of 0 during authentication, and a 20-byte random key chosen
157// by the client after authentication (discussed below).
158// decoding involves simply buffering until a sufficient number of bytes are
159// received, then advancing the buffer pointer. any time an integrity check
160// (the HMAC) fails, the protocol is assumed to have lost synchronization, and
161// the connection is dropped. the parties are free to drop the tcp stream at
162// any point, if too much data is received or too much idle time passes; no
163// commitments or transactions are made.
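//
// purely as an illustration (this is not monotone's own framing code,
// which lives in netcmd.cc; the helper below is a hypothetical,
// self-contained sketch), assembling one such packet amounts to:
//
//   std::string frame_packet(unsigned char version, unsigned char cmd_code,
//                            std::string const & payload,
//                            std::string const & hmac) // 20-byte SHA-1 HMAC of payload
//   {
//     std::string pkt;
//     pkt += static_cast<char>(version);   // 1 byte: protocol version
//     pkt += static_cast<char>(cmd_code);  // 1 byte: command
//     size_t len = payload.size();         // uleb128-encoded payload length
//     do
//       {
//         unsigned char b = len & 0x7f;
//         len >>= 7;
//         if (len) b |= 0x80;
//         pkt += static_cast<char>(b);
//       }
//     while (len);
//     pkt += payload;                      // payload bytes
//     pkt += hmac;                         // 20-byte HMAC trailer
//     return pkt;
//   }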
164//
165// one special command, "bye", is used to shut down a connection
166// gracefully. once each side has received all the data they want, they
167// can send a "bye" command to the other side. as soon as either side has
168// both sent and received a "bye" command, they drop the connection. if
169// either side sees an i/o failure (dropped connection) after they have
170// sent a "bye" command, they consider the shutdown successful.
171//
172// the exchange begins in a non-authenticated state. the server sends a
173// "hello <id> <nonce>" command, which identifies the server's RSA key and
174// issues a nonce which must be used for a subsequent authentication.
175//
176// The client then responds with either:
177//
178// An "auth (source|sink|both) <include_pattern> <exclude_pattern> <id>
179// <nonce1> <hmac key> <sig>" command, which identifies its RSA key, notes the
180// role it wishes to play in the synchronization, identifies the pattern it
181// wishes to sync with, signs the previous nonce with its own key, and informs
182// the server of the HMAC key it wishes to use for this session (encrypted
183// with the server's public key); or
184//
185// An "anonymous (source|sink|both) <include_pattern> <exclude_pattern>
186// <hmac key>" command, which identifies the role it wishes to play in the
187// synchronization, the pattern it ishes to sync with, and the HMAC key it
188// wishes to use for this session (also encrypted with the server's public
189// key).
190//
191// The server then replies with a "confirm" command, which contains no
192// other data but will only have the correct HMAC integrity code if
193// the server received and properly decrypted the HMAC key offered by
194// the client. This transitions the peers into an authenticated state
195// and begins refinement.
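//
// to illustrate the handshake (argument encodings as described above;
// S = server, C = client):
//
//   S:  hello <server key name> <nonce>
//   C:  auth (source|sink|both) <include_pattern> <exclude_pattern>
//         <client key id> <nonce1> <encrypted hmac key> <sig>
//   S:  confirm   (no payload, but HMAC'd with the client's chosen key)
//
// after which the peers are in the authenticated state and refinement begins.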
196//
197// refinement begins with the client sending its root public key and
198// manifest certificate merkle nodes to the server. the server then
199// compares the root to each slot in *its* root node, and for each slot
200// either sends refined subtrees to the client, or (if it detects a missing
201// item in one pattern or the other) sends either "data" or "send_data"
202// commands corresponding to the role of the missing item (source or
203// sink). the client then receives each refined subtree and compares it
204// with its own, performing similar description/request behavior depending
205// on role, and the cycle continues.
206//
207// detecting the end of refinement is subtle: after sending the refinement
208// of the root node, the server sends a "done 0" command (queued behind all
209// the other refinement traffic). when either peer receives a "done N"
210// command it immediately responds with a "done N+1" command. when two done
211// commands for a given merkle tree arrive with no intervening refinements,
212// the entire merkle tree is considered complete.
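//
// for example, the tail end of refinement for one item type might run:
//
//   peer A:  done N      (queued after A's last refinements)
//   peer B:  done N+1    (B saw no refinements since its previous done,
//                         so B marks this tree complete and echoes once more)
//   peer A:  receives done N+1 with no intervening refinements and
//            likewise marks the tree complete.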
213//
214// any "send_data" command received prompts a "data" command in response,
215// if the requested item exists. if an item does not exist, a "nonexistant"
216// response command is sent.
217//
218// once a response is received for each requested key and revision cert
219// (either data or nonexistant) the requesting party walks the graph of
220// received revision certs and transmits send_data or send_delta commands
221// for all the revisions mentioned in the certs which it does not already
222// have in its database.
223//
224// for each revision it receives, the recipient requests all the file data or
225// deltas described in that revision.
226//
227// once all requested files, manifests, revisions and certs are received (or
228// noted as nonexistant), the recipient closes its connection.
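//
// in outline, then, a pure-sink client's side of a pull is roughly:
//
//   refine epochs, keys and certs   ->  send_data for missing items
//   walk the received cert graph    ->  send_data / send_delta for missing revisions
//   per received revision           ->  send_data / send_delta for its manifests and files
//   all requests answered           ->  bye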
229//
230// (aside: this protocol is raw binary because coding density is actually
231// important here, and each packet consists of very information-dense
232// material that you wouldn't have a hope of typing in manually anyways)
233//
234
235using namespace std;
236using boost::shared_ptr;
237using boost::lexical_cast;
238
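// throw a bad_decode exception naming 'context' whenever a protocol-level
// check on received data does not hold.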
239static inline void
240require(bool check, string const & context)
241{
242 if (!check)
243 throw bad_decode(F("check of '%s' failed") % context);
244}
245
246struct
247done_marker
248{
249 bool current_level_had_refinements;
250 bool tree_is_done;
251 done_marker() :
252 current_level_had_refinements(false),
253 tree_is_done(false)
254 {}
255};
256
257struct
258session
259{
260 protocol_role role;
261 protocol_voice const voice;
262 utf8 const & our_include_pattern;
263 utf8 const & our_exclude_pattern;
264 globish_matcher our_matcher;
265 app_state & app;
266
267 string peer_id;
268 Netxx::socket_type fd;
269 Netxx::Stream str;
270
271 string_queue inbuf;
272 // deque of pair<string data, size_t cur_pos>
273 deque< pair<string,size_t> > outbuf;
274 // the total data stored in outbuf - this is
275 // used as a valve to stop too much data
276 // backing up
277 size_t outbuf_size;
278
279 netcmd cmd;
280 bool armed;
281 bool arm();
282
283 id remote_peer_key_hash;
284 rsa_keypair_id remote_peer_key_name;
285 netsync_session_key session_key;
286 chained_hmac read_hmac;
287 chained_hmac write_hmac;
288 bool authenticated;
289
290 time_t last_io_time;
291 auto_ptr<ticker> byte_in_ticker;
292 auto_ptr<ticker> byte_out_ticker;
293 auto_ptr<ticker> cert_in_ticker;
294 auto_ptr<ticker> cert_out_ticker;
295 auto_ptr<ticker> revision_in_ticker;
296 auto_ptr<ticker> revision_out_ticker;
297 auto_ptr<ticker> revision_checked_ticker;
298
299 vector<revision_id> written_revisions;
300 vector<rsa_keypair_id> written_keys;
301 vector<cert> written_certs;
302
303 map<netcmd_item_type, boost::shared_ptr<merkle_table> > merkle_tables;
304
305 map<netcmd_item_type, done_marker> done_refinements;
306 map<netcmd_item_type, boost::shared_ptr< set<id> > > requested_items;
307 map<netcmd_item_type, boost::shared_ptr< set<id> > > received_items;
308 map<netcmd_item_type, boost::shared_ptr< set<id> > > full_delta_items;
309 map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestry;
310 map<revision_id, map<cert_name, vector<cert> > > received_certs;
311 set< pair<id, id> > reverse_delta_requests;
312 bool analyzed_ancestry;
313
314 id saved_nonce;
315 bool received_goodbye;
316 bool sent_goodbye;
317
318 packet_db_valve dbw;
319
320 session(protocol_role role,
321 protocol_voice voice,
322 utf8 const & our_include_pattern,
323 utf8 const & our_exclude_pattern,
324 app_state & app,
325 string const & peer,
326 Netxx::socket_type sock,
327 Netxx::Timeout const & to);
328
329 virtual ~session();
330
331 void rev_written_callback(revision_id rid);
332 void key_written_callback(rsa_keypair_id kid);
333 void cert_written_callback(cert const & c);
334
335 id mk_nonce();
336 void mark_recent_io();
337
338 void set_session_key(string const & key);
339 void set_session_key(rsa_oaep_sha_data const & key_encrypted);
340
341 void setup_client_tickers();
342
343 bool done_all_refinements();
344 bool cert_refinement_done();
345 bool all_requested_revisions_received();
346
347 void note_item_requested(netcmd_item_type ty, id const & i);
348 void note_item_full_delta(netcmd_item_type ty, id const & ident);
349 bool item_already_requested(netcmd_item_type ty, id const & i);
350 void note_item_arrived(netcmd_item_type ty, id const & i);
351
352 void maybe_note_epochs_finished();
353
354 void note_item_sent(netcmd_item_type ty, id const & i);
355
356 bool got_all_data();
357 void maybe_say_goodbye();
358
359 void get_heads_and_consume_certs(set<revision_id> & heads);
360
361 void analyze_ancestry_graph();
362 void analyze_manifest(manifest_map const & man);
363
364 Netxx::Probe::ready_type which_events() const;
365 bool read_some();
366 bool write_some();
367
368 bool encountered_error;
369 void error(string const & errmsg);
370
371 void write_netcmd_and_try_flush(netcmd const & cmd);
372 void queue_bye_cmd();
373 void queue_error_cmd(string const & errmsg);
374 void queue_done_cmd(size_t level, netcmd_item_type type);
375 void queue_hello_cmd(rsa_keypair_id const & key_name,
376 base64<rsa_pub_key> const & pub_encoded,
377 id const & nonce);
378 void queue_anonymous_cmd(protocol_role role,
379 utf8 const & include_pattern,
380 utf8 const & exclude_pattern,
381 id const & nonce2,
382 base64<rsa_pub_key> server_key_encoded);
383 void queue_auth_cmd(protocol_role role,
384 utf8 const & include_pattern,
385 utf8 const & exclude_pattern,
386 id const & client,
387 id const & nonce1,
388 id const & nonce2,
389 string const & signature,
390 base64<rsa_pub_key> server_key_encoded);
391 void queue_confirm_cmd();
392 void queue_refine_cmd(merkle_node const & node);
393 void queue_send_data_cmd(netcmd_item_type type,
394 id const & item);
395 void queue_send_delta_cmd(netcmd_item_type type,
396 id const & base,
397 id const & ident);
398 void queue_data_cmd(netcmd_item_type type,
399 id const & item,
400 string const & dat);
401 void queue_delta_cmd(netcmd_item_type type,
402 id const & base,
403 id const & ident,
404 delta const & del);
405 void queue_nonexistant_cmd(netcmd_item_type type,
406 id const & item);
407
408 bool process_bye_cmd();
409 bool process_error_cmd(string const & errmsg);
410 bool process_done_cmd(size_t level, netcmd_item_type type);
411 bool process_hello_cmd(rsa_keypair_id const & server_keyname,
412 rsa_pub_key const & server_key,
413 id const & nonce);
414 bool process_anonymous_cmd(protocol_role role,
415 utf8 const & their_include_pattern,
416 utf8 const & their_exclude_pattern);
417 bool process_auth_cmd(protocol_role role,
418 utf8 const & their_include_pattern,
419 utf8 const & their_exclude_pattern,
420 id const & client,
421 id const & nonce1,
422 string const & signature);
423 bool process_confirm_cmd(string const & signature);
424 void respond_to_confirm_cmd();
425 bool process_refine_cmd(merkle_node const & node);
426 bool process_send_data_cmd(netcmd_item_type type,
427 id const & item);
428 bool process_send_delta_cmd(netcmd_item_type type,
429 id const & base,
430 id const & ident);
431 bool process_data_cmd(netcmd_item_type type,
432 id const & item,
433 string const & dat);
434 bool process_delta_cmd(netcmd_item_type type,
435 id const & base,
436 id const & ident,
437 delta const & del);
438 bool process_nonexistant_cmd(netcmd_item_type type,
439 id const & item);
440 bool process_usher_cmd(utf8 const & msg);
441
442 bool merkle_node_exists(netcmd_item_type type,
443 size_t level,
444 prefix const & pref);
445
446 void load_merkle_node(netcmd_item_type type,
447 size_t level,
448 prefix const & pref,
449 merkle_ptr & node);
450
451 void rebuild_merkle_trees(app_state & app,
452 set<utf8> const & branches);
453
454 bool dispatch_payload(netcmd const & cmd);
455 void begin_service();
456 bool process();
457};
458
459struct
460ancestry_fetcher
461{
462 session & sess;
463
464 // map children to parents
465 multimap< file_id, file_id > rev_file_deltas;
466 multimap< manifest_id, manifest_id > rev_manifest_deltas;
467 // map an ancestor to a child
468 multimap< file_id, file_id > fwd_file_deltas;
469 multimap< manifest_id, manifest_id > fwd_manifest_deltas;
470
471 set< file_id > seen_files;
472
473 ancestry_fetcher(session & s);
474 // analysing the ancestry graph
475 void traverse_files(change_set const & cset);
476 void traverse_manifest(manifest_id const & child_man,
477 manifest_id const & parent_man);
478 void traverse_ancestry(set<revision_id> const & heads);
479
480 // requesting the data
481 void request_rev_file_deltas(file_id const & start,
482 set<file_id> & done_files);
483 void request_files();
484 void request_rev_manifest_deltas(manifest_id const & start,
485 set<manifest_id> & done_manifests);
486 void request_manifests();
487
488
489};
490
491
492struct
493root_prefix
494{
495 prefix val;
496 root_prefix() : val("")
497 {}
498};
499
500static root_prefix const &
501get_root_prefix()
502{
503 // this is not a static variable for a bizarre reason: mac OSX runs
504 // static initializers in the "wrong" order (application before
505 // libraries), so the initializer for a static string in cryptopp runs
506 // after the initializer for a static variable outside a function
507 // here. therefore encode_hexenc() fails in the static initializer here
508 // and the program crashes. curious, eh?
509 static root_prefix ROOT_PREFIX;
510 return ROOT_PREFIX;
511}
512
513static file_id null_ident;
514
515session::session(protocol_role role,
516 protocol_voice voice,
517 utf8 const & our_include_pattern,
518 utf8 const & our_exclude_pattern,
519 app_state & app,
520 string const & peer,
521 Netxx::socket_type sock,
522 Netxx::Timeout const & to) :
523 role(role),
524 voice(voice),
525 our_include_pattern(our_include_pattern),
526 our_exclude_pattern(our_exclude_pattern),
527 our_matcher(our_include_pattern, our_exclude_pattern),
528 app(app),
529 peer_id(peer),
530 fd(sock),
531 str(sock, to),
532 inbuf(),
533 outbuf_size(0),
534 armed(false),
535 remote_peer_key_hash(""),
536 remote_peer_key_name(""),
537 session_key(constants::netsync_key_initializer),
538 read_hmac(constants::netsync_key_initializer),
539 write_hmac(constants::netsync_key_initializer),
540 authenticated(false),
541 last_io_time(::time(NULL)),
542 byte_in_ticker(NULL),
543 byte_out_ticker(NULL),
544 cert_in_ticker(NULL),
545 cert_out_ticker(NULL),
546 revision_in_ticker(NULL),
547 revision_out_ticker(NULL),
548 revision_checked_ticker(NULL),
549 analyzed_ancestry(false),
550 saved_nonce(""),
551 received_goodbye(false),
552 sent_goodbye(false),
553 dbw(app, true),
554 encountered_error(false)
555{
556 dbw.set_on_revision_written(boost::bind(&session::rev_written_callback,
557 this, _1));
558 dbw.set_on_cert_written(boost::bind(&session::cert_written_callback,
559 this, _1));
560 dbw.set_on_pubkey_written(boost::bind(&session::key_written_callback,
561 this, _1));
562
563 done_refinements.insert(make_pair(cert_item, done_marker()));
564 done_refinements.insert(make_pair(key_item, done_marker()));
565 done_refinements.insert(make_pair(epoch_item, done_marker()));
566
567 requested_items.insert(make_pair(cert_item, boost::shared_ptr< set<id> >(new set<id>())));
568 requested_items.insert(make_pair(key_item, boost::shared_ptr< set<id> >(new set<id>())));
569 requested_items.insert(make_pair(revision_item, boost::shared_ptr< set<id> >(new set<id>())));
570 requested_items.insert(make_pair(manifest_item, boost::shared_ptr< set<id> >(new set<id>())));
571 requested_items.insert(make_pair(file_item, boost::shared_ptr< set<id> >(new set<id>())));
572 requested_items.insert(make_pair(epoch_item, boost::shared_ptr< set<id> >(new set<id>())));
573
574 received_items.insert(make_pair(cert_item, boost::shared_ptr< set<id> >(new set<id>())));
575 received_items.insert(make_pair(key_item, boost::shared_ptr< set<id> >(new set<id>())));
576 received_items.insert(make_pair(revision_item, boost::shared_ptr< set<id> >(new set<id>())));
577 received_items.insert(make_pair(manifest_item, boost::shared_ptr< set<id> >(new set<id>())));
578 received_items.insert(make_pair(file_item, boost::shared_ptr< set<id> >(new set<id>())));
579 received_items.insert(make_pair(epoch_item, boost::shared_ptr< set<id> >(new set<id>())));
580
581 full_delta_items.insert(make_pair(manifest_item, boost::shared_ptr< set<id> >(new set<id>())));
582 full_delta_items.insert(make_pair(file_item, boost::shared_ptr< set<id> >(new set<id>())));
583}
584
585session::~session()
586{
587 vector<cert> unattached_certs;
588 map<revision_id, vector<cert> > revcerts;
589 for (vector<revision_id>::iterator i = written_revisions.begin();
590 i != written_revisions.end(); ++i)
591 revcerts.insert(make_pair(*i, vector<cert>()));
592 for (vector<cert>::iterator i = written_certs.begin();
593 i != written_certs.end(); ++i)
594 {
595 map<revision_id, vector<cert> >::iterator j;
596 j = revcerts.find(i->ident);
597 if (j == revcerts.end())
598 unattached_certs.push_back(*i);
599 else
600 j->second.push_back(*i);
601 }
602
603 //Keys
604 for (vector<rsa_keypair_id>::iterator i = written_keys.begin();
605 i != written_keys.end(); ++i)
606 {
607 app.lua.hook_note_netsync_pubkey_received(*i);
608 }
609 //Revisions
610 for (vector<revision_id>::iterator i = written_revisions.begin();
611 i != written_revisions.end(); ++i)
612 {
613 vector<cert> & ctmp(revcerts[*i]);
614 set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > certs;
615 for (vector<cert>::const_iterator j = ctmp.begin();
616 j != ctmp.end(); ++j)
617 {
618 cert_value vtmp;
619 decode_base64(j->value, vtmp);
620 certs.insert(make_pair(j->key, make_pair(j->name, vtmp)));
621 }
622 revision_data rdat;
623 app.db.get_revision(*i, rdat);
624 app.lua.hook_note_netsync_revision_received(*i, rdat, certs);
625 }
626 //Certs (not attached to a new revision)
627 for (vector<cert>::iterator i = unattached_certs.begin();
628 i != unattached_certs.end(); ++i)
629 {
630 cert_value tmp;
631 decode_base64(i->value, tmp);
632 app.lua.hook_note_netsync_cert_received(i->ident, i->key,
633 i->name, tmp);
634
635 }
636}
637
638void session::rev_written_callback(revision_id rid)
639{
640 if (revision_checked_ticker.get())
641 ++(*revision_checked_ticker);
642 written_revisions.push_back(rid);
643}
644
645void session::key_written_callback(rsa_keypair_id kid)
646{
647 written_keys.push_back(kid);
648}
649
650void session::cert_written_callback(cert const & c)
651{
652 written_certs.push_back(c);
653}
654
655id
656session::mk_nonce()
657{
658 I(this->saved_nonce().size() == 0);
659 char buf[constants::merkle_hash_length_in_bytes];
660 Botan::Global_RNG::randomize(reinterpret_cast<Botan::byte *>(buf),
661 constants::merkle_hash_length_in_bytes);
662 this->saved_nonce = string(buf, buf + constants::merkle_hash_length_in_bytes);
663 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
664 return this->saved_nonce;
665}
666
667void
668session::mark_recent_io()
669{
670 last_io_time = ::time(NULL);
671}
672
673void
674session::set_session_key(string const & key)
675{
676 session_key = netsync_session_key(key);
677 read_hmac.set_key(session_key);
678 write_hmac.set_key(session_key);
679}
680
681void
682session::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted)
683{
684 keypair our_kp;
685 load_key_pair(app, app.signing_key, our_kp);
686 string hmac_key;
687 decrypt_rsa(app.lua, app.signing_key, our_kp.priv,
688 hmac_key_encrypted, hmac_key);
689 set_session_key(hmac_key);
690}
691
692void
693session::setup_client_tickers()
694{
695 // xgettext: please use short message and try to avoid multibytes chars
696 byte_in_ticker.reset(new ticker(_("bytes in"), ">", 1024, true));
697 // xgettext: please use short message and try to avoid multibytes chars
698 byte_out_ticker.reset(new ticker(_("bytes out"), "<", 1024, true));
699 if (role == sink_role)
700 {
701 // xgettext: please use short message and try to avoid multibytes chars
702 revision_checked_ticker.reset(new ticker(_("revs written"), "w", 1));
703 // xgettext: please use short message and try to avoid multibytes chars
704 cert_in_ticker.reset(new ticker(_("certs in"), "c", 3));
705 // xgettext: please use short message and try to avoid multibytes chars
706 revision_in_ticker.reset(new ticker(_("revs in"), "r", 1));
707 }
708 else if (role == source_role)
709 {
710 // xgettext: please use short message and try to avoid multibytes chars
711 cert_out_ticker.reset(new ticker(_("certs out"), "C", 3));
712 // xgettext: please use short message and try to avoid multibytes chars
713 revision_out_ticker.reset(new ticker(_("revs out"), "R", 1));
714 }
715 else
716 {
717 I(role == source_and_sink_role);
718 // xgettext: please use short message and try to avoid multibytes chars
719 revision_checked_ticker.reset(new ticker(_("revs written"), "w", 1));
720 // xgettext: please use short message and try to avoid multibytes chars
721 revision_in_ticker.reset(new ticker(_("revs in"), "r", 1));
722 // xgettext: please use short message and try to avoid multibytes chars
723 revision_out_ticker.reset(new ticker(_("revs out"), "R", 1));
724 }
725}
726
727bool
728session::done_all_refinements()
729{
730 bool all = true;
731 for (map<netcmd_item_type, done_marker>::const_iterator j =
732 done_refinements.begin(); j != done_refinements.end(); ++j)
733 {
734 if (j->second.tree_is_done == false)
735 all = false;
736 }
737 return all;
738}
739
740
741bool
742session::cert_refinement_done()
743{
744 return done_refinements[cert_item].tree_is_done;
745}
746
747bool
748session::got_all_data()
749{
750 for (map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator i =
751 requested_items.begin(); i != requested_items.end(); ++i)
752 {
753 if (! i->second->empty())
754 return false;
755 }
756 return true;
757}
758
759bool
760session::all_requested_revisions_received()
761{
762 map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
763 i = requested_items.find(revision_item);
764 I(i != requested_items.end());
765 return i->second->empty();
766}
767
768void
769session::maybe_note_epochs_finished()
770{
771 map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
772 i = requested_items.find(epoch_item);
773 I(i != requested_items.end());
774 // Maybe there are outstanding epoch requests.
775 if (!i->second->empty())
776 return;
777 // And maybe we haven't even finished the refinement.
778 if (!done_refinements[epoch_item].tree_is_done)
779 return;
780 // But otherwise, we're ready to go!
781 L(F("all epochs processed, opening database valve\n"));
782 this->dbw.open_valve();
783}
784
785void
786session::note_item_requested(netcmd_item_type ty, id const & ident)
787{
788 map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
789 i = requested_items.find(ty);
790 I(i != requested_items.end());
791 i->second->insert(ident);
792}
793
794void
795session::note_item_full_delta(netcmd_item_type ty, id const & ident)
796{
797 map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
798 i = full_delta_items.find(ty);
799 I(i != full_delta_items.end());
800 i->second->insert(ident);
801}
802
803void
804session::note_item_arrived(netcmd_item_type ty, id const & ident)
805{
806 map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
807 i = requested_items.find(ty);
808 I(i != requested_items.end());
809 i->second->erase(ident);
810 map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
811 j = received_items.find(ty);
812 I(j != received_items.end());
813 j->second->insert(ident);
814
815
816 switch (ty)
817 {
818 case cert_item:
819 if (cert_in_ticker.get() != NULL)
820 ++(*cert_in_ticker);
821 break;
822 case revision_item:
823 if (revision_in_ticker.get() != NULL)
824 ++(*revision_in_ticker);
825 break;
826 default:
827 // No ticker for other things.
828 break;
829 }
830}
831
832bool
833session::item_already_requested(netcmd_item_type ty, id const & ident)
834{
835 map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator i;
836 i = requested_items.find(ty);
837 I(i != requested_items.end());
838 if (i->second->find(ident) != i->second->end())
839 return true;
840 i = received_items.find(ty);
841 I(i != received_items.end());
842 if (i->second->find(ident) != i->second->end())
843 return true;
844 return false;
845}
846
847
848void
849session::note_item_sent(netcmd_item_type ty, id const & ident)
850{
851 switch (ty)
852 {
853 case cert_item:
854 if (cert_out_ticker.get() != NULL)
855 ++(*cert_out_ticker);
856 break;
857 case revision_item:
858 if (revision_out_ticker.get() != NULL)
859 ++(*revision_out_ticker);
860 break;
861 default:
862 // No ticker for other things.
863 break;
864 }
865}
866
867void
868session::write_netcmd_and_try_flush(netcmd const & cmd)
869{
870 if (!encountered_error)
871 {
872 string buf;
873 cmd.write(buf, write_hmac);
874 outbuf.push_back(make_pair(buf, 0));
875 outbuf_size += buf.size();
876 }
877 else
878 L(F("dropping outgoing netcmd (because we're in error unwind mode)\n"));
879 // FIXME: this helps keep the protocol pipeline full but it seems to
880 // interfere with initial and final sequences. careful with it.
881 // write_some();
882 // read_some();
883}
884
885// This method triggers a special "error unwind" mode to netsync. In this
886// mode, all received data is ignored, and no new data is queued. We simply
887// stay connected long enough for the current write buffer to be flushed, to
888// ensure that our peer receives the error message.
889// WARNING WARNING WARNING (FIXME): this does _not_ throw an exception. if
890// while processing any given netcmd packet you encounter an error, you can
891// _only_ call this method if you have not touched the database, because if
892// you have touched the database then you need to throw an exception to
893// trigger a rollback.
894// you could, of course, call this method and then throw an exception, but
895// there is no point in doing that, because throwing the exception will cause
896// the connection to be immediately terminated, so your call to error() will
897// actually have no effect (except to cause your error message to be printed
898// twice).
899void
900session::error(std::string const & errmsg)
901{
902 W(F("error: %s\n") % errmsg);
903 queue_error_cmd(errmsg);
904 encountered_error = true;
905}
906
907void
908session::analyze_manifest(manifest_map const & man)
909{
910 L(F("analyzing %d entries in manifest\n") % man.size());
911 for (manifest_map::const_iterator i = man.begin();
912 i != man.end(); ++i)
913 {
914 if (! this->app.db.file_version_exists(manifest_entry_id(i)))
915 {
916 id tmp;
917 decode_hexenc(manifest_entry_id(i).inner(), tmp);
918 queue_send_data_cmd(file_item, tmp);
919 }
920 }
921}
922
923inline static id
924plain_id(manifest_id const & i)
925{
926 id tmp;
927 hexenc<id> htmp(i.inner());
928 decode_hexenc(htmp, tmp);
929 return tmp;
930}
931
932inline static id
933plain_id(file_id const & i)
934{
935 id tmp;
936 hexenc<id> htmp(i.inner());
937 decode_hexenc(htmp, tmp);
938 return tmp;
939}
940
941void
942session::get_heads_and_consume_certs( set<revision_id> & heads )
943{
944 typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;
945 typedef map<cert_name, vector<cert> > cert_map;
946
947 set<revision_id> nodes, parents;
948 map<revision_id, int> chld_num;
949 L(F("analyzing %d ancestry edges\n") % ancestry.size());
950
951 for (ancestryT::const_iterator i = ancestry.begin(); i != ancestry.end(); ++i)
952 {
953 nodes.insert(i->first);
954 for (edge_map::const_iterator j = i->second->second.edges.begin();
955 j != i->second->second.edges.end(); ++j)
956 {
957 parents.insert(edge_old_revision(j));
958 map<revision_id, int>::iterator n;
959 n = chld_num.find(edge_old_revision(j));
960 if (n == chld_num.end())
961 chld_num.insert(make_pair(edge_old_revision(j), 1));
962 else
963 ++(n->second);
964 }
965 }
966
967 set_difference(nodes.begin(), nodes.end(),
968 parents.begin(), parents.end(),
969 inserter(heads, heads.begin()));
970
971 L(F("intermediate set_difference heads size %d") % heads.size());
972
973 // Write permissions checking:
974 // remove heads w/o proper certs, add their children to heads
975 // 1) remove unwanted branch certs from consideration
976 // 2) remove heads w/o a branch tag, process new exposed heads
977 // 3) repeat 2 until no change
978
979 //1
980 set<string> ok_branches, bad_branches;
981 cert_name bcert_name(branch_cert_name);
982 cert_name tcert_name(tag_cert_name);
983 for (map<revision_id, cert_map>::iterator i = received_certs.begin();
984 i != received_certs.end(); ++i)
985 {
986 //branches
987 vector<cert> & bcerts(i->second[bcert_name]);
988 vector<cert> keeping;
989 for (vector<cert>::iterator j = bcerts.begin(); j != bcerts.end(); ++j)
990 {
991 cert_value name;
992 decode_base64(j->value, name);
993 if (ok_branches.find(name()) != ok_branches.end())
994 keeping.push_back(*j);
995 else if (bad_branches.find(name()) != bad_branches.end())
996 ;
997 else
998 {
999 if (our_matcher(name()))
1000 {
1001 ok_branches.insert(name());
1002 keeping.push_back(*j);
1003 }
1004 else
1005 {
1006 bad_branches.insert(name());
1007 W(F("Dropping branch certs for unwanted branch %s")
1008 % name);
1009 }
1010 }
1011 }
1012 bcerts = keeping;
1013 }
1014 //2
1015 list<revision_id> tmp;
1016 for (set<revision_id>::iterator i = heads.begin(); i != heads.end(); ++i)
1017 {
1018 if (!received_certs[*i][bcert_name].size())
1019 tmp.push_back(*i);
1020 }
1021 for (list<revision_id>::iterator i = tmp.begin(); i != tmp.end(); ++i)
1022 heads.erase(*i);
1023
1024 L(F("after step 2, heads size %d") % heads.size());
1025 //3
1026 while (tmp.size())
1027 {
1028 ancestryT::const_iterator i = ancestry.find(tmp.front());
1029 if (i != ancestry.end())
1030 {
1031 for (edge_map::const_iterator j = i->second->second.edges.begin();
1032 j != i->second->second.edges.end(); ++j)
1033 {
1034 if (!--chld_num[edge_old_revision(j)])
1035 {
1036 if (received_certs[i->first][bcert_name].size())
1037 heads.insert(i->first);
1038 else
1039 tmp.push_back(edge_old_revision(j));
1040 }
1041 }
1042 // since we don't want this rev, we don't want its certs either
1043 received_certs[tmp.front()] = cert_map();
1044 }
1045 tmp.pop_front();
1046 }
1047
1048 L(F("after step 3, heads size %d") % heads.size());
1049 // We've reduced the certs to those we want now, send them to dbw.
1050 for (map<revision_id, cert_map>::iterator i = received_certs.begin();
1051 i != received_certs.end(); ++i)
1052 {
1053 for (cert_map::iterator j = i->second.begin();
1054 j != i->second.end(); ++j)
1055 {
1056 for (vector<cert>::iterator k = j->second.begin();
1057 k != j->second.end(); ++k)
1058 {
1059 this->dbw.consume_revision_cert(revision<cert>(*k));
1060 }
1061 }
1062 }
1063}
1064
1065void
1066session::analyze_ancestry_graph()
1067{
1068 L(F("analyze_ancestry_graph"));
1069 if (! (all_requested_revisions_received() && cert_refinement_done()))
1070 {
1071 L(F("not all done in analyze_ancestry_graph"));
1072 return;
1073 }
1074
1075 if (analyzed_ancestry)
1076 {
1077 L(F("already analyzed_ancestry in analyze_ancestry_graph"));
1078 return;
1079 }
1080
1081 L(F("analyze_ancestry_graph fetching"));
1082
1083 ancestry_fetcher fetch(*this);
1084
1085 analyzed_ancestry = true;
1086}
1087
1088Netxx::Probe::ready_type
1089session::which_events() const
1090{
1091 // Only ask to read if we're not armed.
1092 if (outbuf.empty())
1093 {
1094 if (inbuf.size() < constants::netcmd_maxsz && !armed)
1095 return Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
1096 else
1097 return Netxx::Probe::ready_oobd;
1098 }
1099 else
1100 {
1101 if (inbuf.size() < constants::netcmd_maxsz && !armed)
1102 return Netxx::Probe::ready_write | Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
1103 else
1104 return Netxx::Probe::ready_write | Netxx::Probe::ready_oobd;
1105 }
1106}
1107
1108bool
1109session::read_some()
1110{
1111 I(inbuf.size() < constants::netcmd_maxsz);
1112 char tmp[constants::bufsz];
1113 Netxx::signed_size_type count = str.read(tmp, sizeof(tmp));
1114 if (count > 0)
1115 {
1116 L(F("read %d bytes from fd %d (peer %s)\n") % count % fd % peer_id);
1117 if (encountered_error)
1118 {
1119 L(F("in error unwind mode, so throwing them into the bit bucket\n"));
1120 return true;
1121 }
1122 inbuf.append(tmp,count);
1123 mark_recent_io();
1124 if (byte_in_ticker.get() != NULL)
1125 (*byte_in_ticker) += count;
1126 return true;
1127 }
1128 else
1129 return false;
1130}
1131
1132bool
1133session::write_some()
1134{
1135 I(!outbuf.empty());
1136 size_t writelen = outbuf.front().first.size() - outbuf.front().second;
1137 Netxx::signed_size_type count = str.write(outbuf.front().first.data() + outbuf.front().second,
1138 std::min(writelen,
1139 constants::bufsz));
1140 if (count > 0)
1141 {
1142 if ((size_t)count == writelen)
1143 {
1144 outbuf_size -= outbuf.front().first.size();
1145 outbuf.pop_front();
1146 }
1147 else
1148 {
1149 outbuf.front().second += count;
1150 }
1151 L(F("wrote %d bytes to fd %d (peer %s)\n")
1152 % count % fd % peer_id);
1153 mark_recent_io();
1154 if (byte_out_ticker.get() != NULL)
1155 (*byte_out_ticker) += count;
1156 if (encountered_error && outbuf.empty())
1157 {
1158 // we've flushed our error message, so it's time to get out.
1159 L(F("finished flushing output queue in error unwind mode, disconnecting\n"));
1160 return false;
1161 }
1162 return true;
1163 }
1164 else
1165 return false;
1166}
1167
1168// senders
1169
1170void
1171session::queue_bye_cmd()
1172{
1173 L(F("queueing 'bye' command\n"));
1174 netcmd cmd;
1175 cmd.write_bye_cmd();
1176 write_netcmd_and_try_flush(cmd);
1177 this->sent_goodbye = true;
1178}
1179
1180void
1181session::queue_error_cmd(string const & errmsg)
1182{
1183 L(F("queueing 'error' command\n"));
1184 netcmd cmd;
1185 cmd.write_error_cmd(errmsg);
1186 write_netcmd_and_try_flush(cmd);
1187 this->sent_goodbye = true;
1188}
1189
1190void
1191session::queue_done_cmd(size_t level,
1192 netcmd_item_type type)
1193{
1194 string typestr;
1195 netcmd_item_type_to_string(type, typestr);
1196 L(F("queueing 'done' command for %s level %s\n") % typestr % level);
1197 netcmd cmd;
1198 cmd.write_done_cmd(level, type);
1199 write_netcmd_and_try_flush(cmd);
1200}
1201
1202void
1203session::queue_hello_cmd(rsa_keypair_id const & key_name,
1204 base64<rsa_pub_key> const & pub_encoded,
1205 id const & nonce)
1206{
1207 rsa_pub_key pub;
1208 decode_base64(pub_encoded, pub);
1209 cmd.write_hello_cmd(key_name, pub, nonce);
1210 write_netcmd_and_try_flush(cmd);
1211}
1212
1213void
1214session::queue_anonymous_cmd(protocol_role role,
1215 utf8 const & include_pattern,
1216 utf8 const & exclude_pattern,
1217 id const & nonce2,
1218 base64<rsa_pub_key> server_key_encoded)
1219{
1220 netcmd cmd;
1221 rsa_oaep_sha_data hmac_key_encrypted;
1222 encrypt_rsa(app.lua, remote_peer_key_name, server_key_encoded,
1223 nonce2(), hmac_key_encrypted);
1224 cmd.write_anonymous_cmd(role, include_pattern, exclude_pattern,
1225 hmac_key_encrypted);
1226 write_netcmd_and_try_flush(cmd);
1227 set_session_key(nonce2());
1228}
1229
1230void
1231session::queue_auth_cmd(protocol_role role,
1232 utf8 const & include_pattern,
1233 utf8 const & exclude_pattern,
1234 id const & client,
1235 id const & nonce1,
1236 id const & nonce2,
1237 string const & signature,
1238 base64<rsa_pub_key> server_key_encoded)
1239{
1240 netcmd cmd;
1241 rsa_oaep_sha_data hmac_key_encrypted;
1242 encrypt_rsa(app.lua, remote_peer_key_name, server_key_encoded,
1243 nonce2(), hmac_key_encrypted);
1244 cmd.write_auth_cmd(role, include_pattern, exclude_pattern, client,
1245 nonce1, hmac_key_encrypted, signature);
1246 write_netcmd_and_try_flush(cmd);
1247 set_session_key(nonce2());
1248}
1249
1250void
1251session::queue_confirm_cmd()
1252{
1253 netcmd cmd;
1254 cmd.write_confirm_cmd();
1255 write_netcmd_and_try_flush(cmd);
1256}
1257
1258void
1259session::queue_refine_cmd(merkle_node const & node)
1260{
1261 string typestr;
1262 hexenc<prefix> hpref;
1263 node.get_hex_prefix(hpref);
1264 netcmd_item_type_to_string(node.type, typestr);
1265 L(F("queueing request for refinement of %s node '%s', level %d\n")
1266 % typestr % hpref % static_cast<int>(node.level));
1267 netcmd cmd;
1268 cmd.write_refine_cmd(node);
1269 write_netcmd_and_try_flush(cmd);
1270}
1271
1272void
1273session::queue_send_data_cmd(netcmd_item_type type,
1274 id const & item)
1275{
1276 string typestr;
1277 netcmd_item_type_to_string(type, typestr);
1278 hexenc<id> hid;
1279 encode_hexenc(item, hid);
1280
1281 if (role == source_role)
1282 {
1283 L(F("not queueing request for %s '%s' as we are in pure source role\n")
1284 % typestr % hid);
1285 return;
1286 }
1287
1288 if (item_already_requested(type, item))
1289 {
1290 L(F("not queueing request for %s '%s' as we already requested it\n")
1291 % typestr % hid);
1292 return;
1293 }
1294
1295 L(F("queueing request for data of %s item '%s'\n")
1296 % typestr % hid);
1297 netcmd cmd;
1298 cmd.write_send_data_cmd(type, item);
1299 write_netcmd_and_try_flush(cmd);
1300 note_item_requested(type, item);
1301}
1302
1303void
1304session::queue_send_delta_cmd(netcmd_item_type type,
1305 id const & base,
1306 id const & ident)
1307{
1308 I(type == manifest_item || type == file_item);
1309
1310 string typestr;
1311 netcmd_item_type_to_string(type, typestr);
1312 hexenc<id> base_hid;
1313 encode_hexenc(base, base_hid);
1314 hexenc<id> ident_hid;
1315 encode_hexenc(ident, ident_hid);
1316
1317 if (role == source_role)
1318 {
1319 L(F("not queueing request for %s delta '%s' -> '%s' as we are in pure source role\n")
1320 % typestr % base_hid % ident_hid);
1321 return;
1322 }
1323
1324 if (item_already_requested(type, ident))
1325 {
1326 L(F("not queueing request for %s delta '%s' -> '%s' as we already requested the target\n")
1327 % typestr % base_hid % ident_hid);
1328 return;
1329 }
1330
1331 L(F("queueing request for contents of %s delta '%s' -> '%s'\n")
1332 % typestr % base_hid % ident_hid);
1333 netcmd cmd;
1334 cmd.write_send_delta_cmd(type, base, ident);
1335 write_netcmd_and_try_flush(cmd);
1336 note_item_requested(type, ident);
1337}
1338
1339void
1340session::queue_data_cmd(netcmd_item_type type,
1341 id const & item,
1342 string const & dat)
1343{
1344 string typestr;
1345 netcmd_item_type_to_string(type, typestr);
1346 hexenc<id> hid;
1347 encode_hexenc(item, hid);
1348
1349 if (role == sink_role)
1350 {
1351 L(F("not queueing %s data for '%s' as we are in pure sink role\n")
1352 % typestr % hid);
1353 return;
1354 }
1355
1356 L(F("queueing %d bytes of data for %s item '%s'\n")
1357 % dat.size() % typestr % hid);
1358
1359 netcmd cmd;
1360 // TODO: This pair of functions will make two copies of a large
1361 // file, the first in cmd.write_data_cmd, and the second in
1362 // write_netcmd_and_try_flush when the data is copied from the
1363 // cmd.payload variable to the string buffer for output. This
1364 // double copy should be collapsed out, it may be better to use
1365 // a string_queue for output as well as input, as that will reduce
1366 // the amount of mallocs that happen when the string queue is large
1367 // enough to just store the data.
1368 cmd.write_data_cmd(type, item, dat);
1369 write_netcmd_and_try_flush(cmd);
1370 note_item_sent(type, item);
1371}
1372
1373void
1374session::queue_delta_cmd(netcmd_item_type type,
1375 id const & base,
1376 id const & ident,
1377 delta const & del)
1378{
1379 I(type == manifest_item || type == file_item);
1380 I(! del().empty() || ident == base);
1381 string typestr;
1382 netcmd_item_type_to_string(type, typestr);
1383 hexenc<id> base_hid;
1384 encode_hexenc(base, base_hid);
1385 hexenc<id> ident_hid;
1386 encode_hexenc(ident, ident_hid);
1387
1388 if (role == sink_role)
1389 {
1390 L(F("not queueing %s delta '%s' -> '%s' as we are in pure sink role\n")
1391 % typestr % base_hid % ident_hid);
1392 return;
1393 }
1394
1395 L(F("queueing %s delta '%s' -> '%s'\n")
1396 % typestr % base_hid % ident_hid);
1397 netcmd cmd;
1398 cmd.write_delta_cmd(type, base, ident, del);
1399 write_netcmd_and_try_flush(cmd);
1400 note_item_sent(type, ident);
1401}
1402
1403void
1404session::queue_nonexistant_cmd(netcmd_item_type type,
1405 id const & item)
1406{
1407 string typestr;
1408 netcmd_item_type_to_string(type, typestr);
1409 hexenc<id> hid;
1410 encode_hexenc(item, hid);
1411 if (role == sink_role)
1412 {
1413 L(F("not queueing note of nonexistence of %s item '%s' as we are in pure sink role\n")
1414 % typestr % hid);
1415 return;
1416 }
1417
1418 L(F("queueing note of nonexistance of %s item '%s'\n")
1419 % typestr % hid);
1420 netcmd cmd;
1421 cmd.write_nonexistant_cmd(type, item);
1422 write_netcmd_and_try_flush(cmd);
1423}
1424
1425// processors
1426
1427bool
1428session::process_bye_cmd()
1429{
1430 L(F("received 'bye' netcmd\n"));
1431 this->received_goodbye = true;
1432 return true;
1433}
1434
1435bool
1436session::process_error_cmd(string const & errmsg)
1437{
1438 throw bad_decode(F("received network error: %s") % errmsg);
1439}
1440
1441bool
1442session::process_done_cmd(size_t level, netcmd_item_type type)
1443{
1444
1445 map< netcmd_item_type, done_marker>::iterator i = done_refinements.find(type);
1446 I(i != done_refinements.end());
1447
1448 string typestr;
1449 netcmd_item_type_to_string(type, typestr);
1450
1451 if ((! i->second.current_level_had_refinements) || (level >= 0xff))
1452 {
1453 // we received *no* refinements on this level -- or we ran out of
1454 // levels -- so refinement for this type is finished.
1455 L(F("received 'done' for empty %s level %d, marking as complete\n")
1456 % typestr % static_cast<int>(level));
1457
1458 // possibly echo it back one last time, for shutdown purposes
1459 if (!i->second.tree_is_done)
1460 queue_done_cmd(level + 1, type);
1461
1462 // tombstone it
1463 i->second.current_level_had_refinements = false;
1464 i->second.tree_is_done = true;
1465
1466 if (all_requested_revisions_received())
1467 analyze_ancestry_graph();
1468
1469 maybe_note_epochs_finished();
1470 }
1471
1472 else if (i->second.current_level_had_refinements
1473 && (! i->second.tree_is_done))
1474 {
1475 // we *did* receive some refinements on this level, reset to zero and
1476 // queue an echo of the 'done' marker.
1477 L(F("received 'done' for %s level %d, which had refinements; "
1478 "sending echo of done for level %d\n")
1479 % typestr
1480 % static_cast<int>(level)
1481 % static_cast<int>(level + 1));
1482 i->second.current_level_had_refinements = false;
1483 queue_done_cmd(level + 1, type);
1484 return true;
1485 }
1486 return true;
1487}
1488
1489void
1490get_branches(app_state & app, vector<string> & names)
1491{
1492 app.db.get_branches(names);
1493 sort(names.begin(), names.end());
1494}
1495
1496static const var_domain known_servers_domain = var_domain("known-servers");
1497
1498bool
1499session::process_hello_cmd(rsa_keypair_id const & their_keyname,
1500 rsa_pub_key const & their_key,
1501 id const & nonce)
1502{
1503 I(this->remote_peer_key_hash().size() == 0);
1504 I(this->saved_nonce().size() == 0);
1505
1506 hexenc<id> their_key_hash;
1507 base64<rsa_pub_key> their_key_encoded;
1508 encode_base64(their_key, their_key_encoded);
1509 key_hash_code(their_keyname, their_key_encoded, their_key_hash);
1510 L(F("server key has name %s, hash %s\n") % their_keyname % their_key_hash);
1511 var_key their_key_key(known_servers_domain, var_name(peer_id));
1512 if (app.db.var_exists(their_key_key))
1513 {
1514 var_value expected_key_hash;
1515 app.db.get_var(their_key_key, expected_key_hash);
1516 if (expected_key_hash() != their_key_hash())
1517 {
1518 P(F("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
1519 "@ WARNING: SERVER IDENTIFICATION HAS CHANGED @\n"
1520 "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
1521 "IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY\n"
1522 "it is also possible that the server key has just been changed\n"
1523 "remote host sent key %s\n"
1524 "I expected %s\n"
1525 "'monotone unset %s %s' overrides this check\n")
1526 % their_key_hash % expected_key_hash
1527 % their_key_key.first % their_key_key.second);
1528 E(false, F("server key changed"));
1529 }
1530 }
1531 else
1532 {
1533 P(F("first time connecting to server %s\n"
1534 "I'll assume it's really them, but you might want to double-check\n"
1535 "their key's fingerprint: %s\n") % peer_id % their_key_hash);
1536 app.db.set_var(their_key_key, var_value(their_key_hash()));
1537 }
1538 if (!app.db.public_key_exists(their_key_hash))
1539 {
1540 W(F("saving public key for %s to database\n") % their_keyname);
1541 app.db.put_key(their_keyname, their_key_encoded);
1542 }
1543
1544 {
1545 hexenc<id> hnonce;
1546 encode_hexenc(nonce, hnonce);
1547 L(F("received 'hello' netcmd from server '%s' with nonce '%s'\n")
1548 % their_key_hash % hnonce);
1549 }
1550
1551 I(app.db.public_key_exists(their_key_hash));
1552
1553 // save their identity
1554 {
1555 id their_key_hash_decoded;
1556 decode_hexenc(their_key_hash, their_key_hash_decoded);
1557 this->remote_peer_key_hash = their_key_hash_decoded;
1558 }
1559
1560 // clients always include, in the synchronization set, every branch that
1561 // the user requested
1562 vector<string> branchnames;
1563 set<utf8> ok_branches;
1564 get_branches(app, branchnames);
1565 for (vector<string>::const_iterator i = branchnames.begin();
1566 i != branchnames.end(); i++)
1567 {
1568 if (our_matcher(*i))
1569 ok_branches.insert(utf8(*i));
1570 }
1571 rebuild_merkle_trees(app, ok_branches);
1572
1573 setup_client_tickers();
1574
1575 if (app.signing_key() != "")
1576 {
1577 // get our key pair
1578 keypair our_kp;
1579 load_key_pair(app, app.signing_key, our_kp);
1580
1581 // get the hash identifier for our pubkey
1582 hexenc<id> our_key_hash;
1583 id our_key_hash_raw;
1584 key_hash_code(app.signing_key, our_kp.pub, our_key_hash);
1585 decode_hexenc(our_key_hash, our_key_hash_raw);
1586
1587 // make a signature
1588 base64<rsa_sha1_signature> sig;
1589 rsa_sha1_signature sig_raw;
1590 make_signature(app, app.signing_key, our_kp.priv, nonce(), sig);
1591 decode_base64(sig, sig_raw);
1592
1593 // make a new nonce of our own and send off the 'auth'
1594 queue_auth_cmd(this->role, our_include_pattern, our_exclude_pattern,
1595 our_key_hash_raw, nonce, mk_nonce(), sig_raw(),
1596 their_key_encoded);
1597 }
1598 else
1599 {
1600 queue_anonymous_cmd(this->role, our_include_pattern,
1601 our_exclude_pattern, mk_nonce(), their_key_encoded);
1602 }
1603 return true;
1604}
1605
1606bool
1607session::process_anonymous_cmd(protocol_role role,
1608 utf8 const & their_include_pattern,
1609 utf8 const & their_exclude_pattern)
1610{
1611 //
1612 // internally netsync thinks in terms of sources and sinks. users like
1613 // thinking of repositories as "readonly", "readwrite", or "writeonly".
1614 //
1615 // we therefore use the read/write terminology when dealing with the UI:
1616 // if the user asks to run a "read only" service, this means they are
1617 // willing to be a source but not a sink.
1618 //
1619 // nb: the "role" here is the role the *client* wants to play
1620 // so we need to check that the opposite role is allowed for us,
1621 // in our this->role field.
1622 //
1623
1624 // client must be a sink and server must be a source (anonymous read-only)
1625
1626 if (role != sink_role)
1627 {
1628 W(F("rejected attempt at anonymous connection for write\n"));
1629 this->saved_nonce = id("");
1630 return false;
1631 }
1632
1633 if (this->role != source_role && this->role != source_and_sink_role)
1634 {
1635 W(F("rejected attempt at anonymous connection while running as sink\n"));
1636 this->saved_nonce = id("");
1637 return false;
1638 }
1639
1640 vector<string> branchnames;
1641 set<utf8> ok_branches;
1642 get_branches(app, branchnames);
1643 globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);
1644 for (vector<string>::const_iterator i = branchnames.begin();
1645 i != branchnames.end(); i++)
1646 {
1647 if (their_matcher(*i))
1648 if (our_matcher(*i) && app.lua.hook_get_netsync_read_permitted(*i))
1649 ok_branches.insert(utf8(*i));
1650 else
1651 {
1652 error((F("anonymous access to branch '%s' denied by server") % *i).str());
1653 return true;
1654 }
1655 }
1656
1657 P(F("allowed anonymous read permission for '%s' excluding '%s'\n")
1658 % their_include_pattern % their_exclude_pattern);
1659
1660 rebuild_merkle_trees(app, ok_branches);
1661
1662 this->remote_peer_key_name = rsa_keypair_id("");
1663 this->authenticated = true;
1664 this->role = source_role;
1665 return true;
1666}
1667
1668bool
1669session::process_auth_cmd(protocol_role their_role,
1670 utf8 const & their_include_pattern,
1671 utf8 const & their_exclude_pattern,
1672 id const & client,
1673 id const & nonce1,
1674 string const & signature)
1675{
1676 I(this->remote_peer_key_hash().size() == 0);
1677 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
1678
1679 hexenc<id> their_key_hash;
1680 encode_hexenc(client, their_key_hash);
1681 set<utf8> ok_branches;
1682 vector<string> branchnames;
1683 get_branches(app, branchnames);
1684 globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);
1685
1686 // check that they replied with the nonce we asked for
1687 if (!(nonce1 == this->saved_nonce))
1688 {
1689 W(F("detected replay attack in auth netcmd\n"));
1690 this->saved_nonce = id("");
1691 return false;
1692 }
1693
1694
1695 //
1696 // internally netsync thinks in terms of sources and sinks. users like
1697 // thinking of repositories as "readonly", "readwrite", or "writeonly".
1698 //
1699 // we therefore use the read/write terminology when dealing with the UI:
1700 // if the user asks to run a "read only" service, this means they are
1701 // willing to be a source but not a sink.
1702 //
1703 // nb: the "their_role" here is the role the *client* wants to play
1704 // so we need to check that the opposite role is allowed for us,
1705 // in our this->role field.
1706 //
1707
1708 if (!app.db.public_key_exists(their_key_hash))
1709 {
1710 // if it's not in the db, it still could be in the keystore if we
1711 // have the private key that goes with it
1712 if (!app.keys.try_ensure_in_db(their_key_hash))
1713 {
1714 W(F("remote public key hash '%s' is unknown\n") % their_key_hash);
1715 this->saved_nonce = id("");
1716 return false;
1717 }
1718 }
1719
1720 // get their public key
1721 rsa_keypair_id their_id;
1722 base64<rsa_pub_key> their_key;
1723 app.db.get_pubkey(their_key_hash, their_id, their_key);
1724
1725 // client as sink, server as source (reading)
1726
1727 if (their_role == sink_role || their_role == source_and_sink_role)
1728 {
1729 if (this->role != source_role && this->role != source_and_sink_role)
1730 {
1731 W(F("denied '%s' read permission for '%s' excluding '%s' while running as pure sink\n")
1732 % their_id % their_include_pattern % their_exclude_pattern);
1733 this->saved_nonce = id("");
1734 return false;
1735 }
1736 }
1737
1738 for (vector<string>::const_iterator i = branchnames.begin();
1739 i != branchnames.end(); i++)
1740 {
1741 if (their_matcher(*i))
1742 {
1743 if (our_matcher(*i) && app.lua.hook_get_netsync_read_permitted(*i, their_id))
1744 ok_branches.insert(utf8(*i));
1745 else
1746 {
1747 W(F("denied '%s' read permission for '%s' excluding '%s' because of branch '%s'\n")
1748 % their_id % their_include_pattern % their_exclude_pattern % *i);
1749 error((F("access to branch '%s' denied by server") % *i).str());
1750 return true;
1751 }
1752 }
1753 }
1754
1755  // if we're in source_and_sink_role, continue even with no readable branches
1756  // (e.g.: serve --db=empty.db)
1757 P(F("allowed '%s' read permission for '%s' excluding '%s'\n")
1758 % their_id % their_include_pattern % their_exclude_pattern);
1759
1760 // client as source, server as sink (writing)
1761
1762 if (their_role == source_role || their_role == source_and_sink_role)
1763 {
1764 if (this->role != sink_role && this->role != source_and_sink_role)
1765 {
1766 W(F("denied '%s' write permission for '%s' excluding '%s' while running as pure source\n")
1767 % their_id % their_include_pattern % their_exclude_pattern);
1768 this->saved_nonce = id("");
1769 return false;
1770 }
1771
1772 if (!app.lua.hook_get_netsync_write_permitted(their_id))
1773 {
1774 W(F("denied '%s' write permission for '%s' excluding '%s'\n")
1775 % their_id % their_include_pattern % their_exclude_pattern);
1776 this->saved_nonce = id("");
1777 return false;
1778 }
1779
1780 P(F("allowed '%s' write permission for '%s' excluding '%s'\n")
1781 % their_id % their_include_pattern % their_exclude_pattern);
1782 }
1783
1784 rebuild_merkle_trees(app, ok_branches);
1785
1786 // save their identity
1787 this->remote_peer_key_hash = client;
1788
1789 // check the signature
1790 base64<rsa_sha1_signature> sig;
1791 encode_base64(rsa_sha1_signature(signature), sig);
1792 if (check_signature(app, their_id, their_key, nonce1(), sig))
1793 {
1794 // get our private key and sign back
1795 L(F("client signature OK, accepting authentication\n"));
1796 this->authenticated = true;
1797 this->remote_peer_key_name = their_id;
1798 // assume the (possibly degraded) opposite role
1799 switch (their_role)
1800 {
1801 case source_role:
1802 I(this->role != source_role);
1803 this->role = sink_role;
1804 break;
1805 case source_and_sink_role:
1806 I(this->role == source_and_sink_role);
1807 break;
1808 case sink_role:
1809 I(this->role != sink_role);
1810 this->role = source_role;
1811 break;
1812 }
1813 return true;
1814 }
1815 else
1816 {
1817 W(F("bad client signature\n"));
1818 }
1819 return false;
1820}
1821
1822bool
1823session::process_confirm_cmd(string const & signature)
1824{
1825 I(this->remote_peer_key_hash().size() == constants::merkle_hash_length_in_bytes);
1826 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
1827
1828 hexenc<id> their_key_hash;
1829 encode_hexenc(id(remote_peer_key_hash), their_key_hash);
1830
1831 // nb. this->role is our role, the server is in the opposite role
1832 L(F("received 'confirm' netcmd from server '%s' for pattern '%s' exclude '%s' in %s mode\n")
1833 % their_key_hash % our_include_pattern % our_exclude_pattern
1834 % (this->role == source_and_sink_role ? _("source and sink") :
1835 (this->role == source_role ? _("sink") : _("source"))));
1836
1837 // check their signature
1838 if (app.db.public_key_exists(their_key_hash))
1839 {
1840 // get their public key and check the signature
1841 rsa_keypair_id their_id;
1842 base64<rsa_pub_key> their_key;
1843 app.db.get_pubkey(their_key_hash, their_id, their_key);
1844 base64<rsa_sha1_signature> sig;
1845 encode_base64(rsa_sha1_signature(signature), sig);
1846 if (check_signature(app, their_id, their_key, this->saved_nonce(), sig))
1847 {
1848 L(F("server signature OK, accepting authentication\n"));
1849 return true;
1850 }
1851 else
1852 {
1853 W(F("bad server signature\n"));
1854 }
1855 }
1856 else
1857 {
1858 W(F("unknown server key\n"));
1859 }
1860 return false;
1861}
1862
1863void
1864session::respond_to_confirm_cmd()
1865{
1866 merkle_ptr root;
1867 load_merkle_node(epoch_item, 0, get_root_prefix().val, root);
1868 queue_refine_cmd(*root);
1869 queue_done_cmd(0, epoch_item);
1870
1871 load_merkle_node(key_item, 0, get_root_prefix().val, root);
1872 queue_refine_cmd(*root);
1873 queue_done_cmd(0, key_item);
1874
1875 load_merkle_node(cert_item, 0, get_root_prefix().val, root);
1876 queue_refine_cmd(*root);
1877 queue_done_cmd(0, cert_item);
1878}
1879
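// data_exists() and load_data() below map a (netcmd_item_type, id) pair onto
// the corresponding database lookup: data_exists() answers "do we already
// have this item?", and load_data() produces the serialized form that is
// shipped inside a 'data' netcmd.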
1880static bool
1881data_exists(netcmd_item_type type,
1882 id const & item,
1883 app_state & app)
1884{
1885 hexenc<id> hitem;
1886 encode_hexenc(item, hitem);
1887 switch (type)
1888 {
1889 case key_item:
1890 return app.db.public_key_exists(hitem);
1891 case manifest_item:
1892 return app.db.manifest_version_exists(manifest_id(hitem));
1893 case file_item:
1894 return app.db.file_version_exists(file_id(hitem));
1895 case revision_item:
1896 return app.db.revision_exists(revision_id(hitem));
1897 case cert_item:
1898 return app.db.revision_cert_exists(hitem);
1899 case epoch_item:
1900 return app.db.epoch_exists(epoch_id(hitem));
1901 }
1902 return false;
1903}
1904
1905static void
1906load_data(netcmd_item_type type,
1907 id const & item,
1908 app_state & app,
1909 string & out)
1910{
1911 string typestr;
1912 netcmd_item_type_to_string(type, typestr);
1913 hexenc<id> hitem;
1914 encode_hexenc(item, hitem);
1915 switch (type)
1916 {
1917 case epoch_item:
1918 if (app.db.epoch_exists(epoch_id(hitem)))
1919 {
1920 cert_value branch;
1921 epoch_data epoch;
1922 app.db.get_epoch(epoch_id(hitem), branch, epoch);
1923 write_epoch(branch, epoch, out);
1924 }
1925 else
1926 {
1927 throw bad_decode(F("epoch with hash '%s' does not exist in our database")
1928 % hitem);
1929 }
1930 break;
1931 case key_item:
1932 if (app.db.public_key_exists(hitem))
1933 {
1934 rsa_keypair_id keyid;
1935 base64<rsa_pub_key> pub_encoded;
1936 app.db.get_pubkey(hitem, keyid, pub_encoded);
1937 L(F("public key '%s' is also called '%s'\n") % hitem % keyid);
1938 write_pubkey(keyid, pub_encoded, out);
1939 }
1940 else
1941 {
1942 throw bad_decode(F("no public key '%s' found in database") % hitem);
1943 }
1944 break;
1945
1946 case revision_item:
1947 if (app.db.revision_exists(revision_id(hitem)))
1948 {
1949 revision_data mdat;
1950 data dat;
1951 app.db.get_revision(revision_id(hitem), mdat);
1952 out = mdat.inner()();
1953 }
1954 else
1955 {
1956 throw bad_decode(F("revision '%s' does not exist in our database") % hitem);
1957 }
1958 break;
1959
1960 case manifest_item:
1961 if (app.db.manifest_version_exists(manifest_id(hitem)))
1962 {
1963 manifest_data mdat;
1964 data dat;
1965 app.db.get_manifest_version(manifest_id(hitem), mdat);
1966 out = mdat.inner()();
1967 }
1968 else
1969 {
1970 throw bad_decode(F("manifest '%s' does not exist in our database") % hitem);
1971 }
1972 break;
1973
1974 case file_item:
1975 if (app.db.file_version_exists(file_id(hitem)))
1976 {
1977 file_data fdat;
1978 data dat;
1979 app.db.get_file_version(file_id(hitem), fdat);
1980 out = fdat.inner()();
1981 }
1982 else
1983 {
1984 throw bad_decode(F("file '%s' does not exist in our database") % hitem);
1985 }
1986 break;
1987
1988 case cert_item:
1989 if (app.db.revision_cert_exists(hitem))
1990 {
1991 revision<cert> c;
1992 app.db.get_revision_cert(hitem, c);
1993 string tmp;
1994 write_cert(c.inner(), out);
1995 }
1996 else
1997 {
1998 throw bad_decode(F("cert '%s' does not exist in our database") % hitem);
1999 }
2000 break;
2001 }
2002}
2003
2004
2005bool
2006session::process_refine_cmd(merkle_node const & their_node)
2007{
2008 prefix pref;
2009 hexenc<prefix> hpref;
2010 their_node.get_raw_prefix(pref);
2011 their_node.get_hex_prefix(hpref);
2012 string typestr;
2013
2014 netcmd_item_type_to_string(their_node.type, typestr);
2015 size_t lev = static_cast<size_t>(their_node.level);
2016
2017 L(F("received 'refine' netcmd on %s node '%s', level %d\n")
2018 % typestr % hpref % lev);
2019
2020 if (!merkle_node_exists(their_node.type, their_node.level, pref))
2021 {
2022 L(F("no corresponding %s merkle node for prefix '%s', level %d\n")
2023 % typestr % hpref % lev);
2024
2025 for (size_t slot = 0; slot < constants::merkle_num_slots; ++slot)
2026 {
2027 switch (their_node.get_slot_state(slot))
2028 {
2029 case empty_state:
2030 {
2031 // we agree, this slot is empty
2032                L(boost::format("(#0) they have an empty slot %d (in a %s node '%s', level %d, which we do not have)\n")
2033 % slot % typestr % hpref % lev);
2034 continue;
2035 }
2036 break;
2037 case live_leaf_state:
2038 {
2039 // we want what *they* have
2040 id slotval;
2041 hexenc<id> hslotval;
2042 their_node.get_raw_slot(slot, slotval);
2043 their_node.get_hex_slot(slot, hslotval);
2044                L(boost::format("(#0) they have a live leaf at slot %d (in a %s node '%s', level %d, which we do not have)\n")
2045 % slot % typestr % hpref % lev);
2046 L(boost::format("(#0) requesting their %s leaf %s\n") % typestr % hslotval);
2047 queue_send_data_cmd(their_node.type, slotval);
2048 }
2049 break;
2050 case dead_leaf_state:
2051 {
2052 // we cannot ask for what they have, it is dead
2053                L(boost::format("(#0) they have a dead leaf at slot %d (in a %s node '%s', level %d, which we do not have)\n")
2054 % slot % typestr % hpref % lev);
2055 continue;
2056 }
2057 break;
2058 case subtree_state:
2059 {
2060 // they have a subtree; might as well ask for that
2061                L(boost::format("(#0) they have a subtree at slot %d (in a %s node '%s', level %d, which we do not have)\n")
2062 % slot % typestr % hpref % lev);
2063 merkle_node our_fake_subtree;
2064 their_node.extended_prefix(slot, our_fake_subtree.pref);
2065 our_fake_subtree.level = their_node.level + 1;
2066 our_fake_subtree.type = their_node.type;
2067 queue_refine_cmd(our_fake_subtree);
2068 }
2069 break;
2070 }
2071 }
2072 }
2073 else
2074 {
2075 // we have a corresponding merkle node. there are 16 branches
2076 // to the following switch condition. it is awful. sorry.
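      // rough cheat-sheet for the cases below (added as a reading aid only):
      //
      //   theirs empty   : ours empty/dead -> nothing; ours live -> send ours;
      //                    ours subtree -> send our subtree for refinement
      //   theirs live    : ours empty -> request theirs; ours live -> if the
      //                    values differ, request theirs and send ours; ours
      //                    dead -> if the values differ, request theirs; ours
      //                    subtree -> request theirs unless we already have the
      //                    data, then send our subtree for refinement
      //   theirs dead    : ours empty/dead -> nothing; ours live -> if the
      //                    values differ, send ours; ours subtree -> send our
      //                    subtree for refinement
      //   theirs subtree : ours empty -> refine against an empty fake subtree;
      //                    ours live/dead -> refine against a fake subtree
      //                    holding our single slot; ours subtree -> send our
      //                    subtree for refinement if the hashes differ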
2077 L(F("found corresponding %s merkle node for prefix '%s', level %d\n")
2078 % typestr % hpref % lev);
2079 merkle_ptr our_node;
2080 load_merkle_node(their_node.type, their_node.level, pref, our_node);
2081 for (size_t slot = 0; slot < constants::merkle_num_slots; ++slot)
2082 {
2083 switch (their_node.get_slot_state(slot))
2084 {
2085 case empty_state:
2086 switch (our_node->get_slot_state(slot))
2087 {
2088
2089 case empty_state:
2090 // 1: theirs == empty, ours == empty
2091 L(boost::format("(#1) they have an empty slot %d in %s node '%s', level %d, and so do we\n")
2092 % slot % typestr % hpref % lev);
2093 continue;
2094 break;
2095
2096 case live_leaf_state:
2097 // 2: theirs == empty, ours == live
2098 L(boost::format("(#2) they have an empty slot %d in %s node '%s', level %d, we have a live leaf\n")
2099 % slot % typestr % hpref % lev);
2100 {
2101 I(their_node.type == our_node->type);
2102 string tmp;
2103 id slotval;
2104 our_node->get_raw_slot(slot, slotval);
2105 load_data(their_node.type, slotval, this->app, tmp);
2106 queue_data_cmd(their_node.type, slotval, tmp);
2107 }
2108 break;
2109
2110 case dead_leaf_state:
2111 // 3: theirs == empty, ours == dead
2112 L(boost::format("(#3) they have an empty slot %d in %s node '%s', level %d, we have a dead leaf\n")
2113 % slot % typestr % hpref % lev);
2114 continue;
2115 break;
2116
2117 case subtree_state:
2118 // 4: theirs == empty, ours == subtree
2119 L(boost::format("(#4) they have an empty slot %d in %s node '%s', level %d, we have a subtree\n")
2120 % slot % typestr % hpref % lev);
2121 {
2122 prefix subprefix;
2123 our_node->extended_raw_prefix(slot, subprefix);
2124 merkle_ptr our_subtree;
2125 I(our_node->type == their_node.type);
2126 load_merkle_node(their_node.type, our_node->level + 1,
2127 subprefix, our_subtree);
2128 I(our_node->type == our_subtree->type);
2129                  // FIXME: it would be more efficient here to send the data
2130                  // for everything in the subtree, rather than sending our
2131                  // subtree for refinement.
2132 queue_refine_cmd(*our_subtree);
2133 }
2134 break;
2135
2136 }
2137 break;
2138
2139
2140 case live_leaf_state:
2141 switch (our_node->get_slot_state(slot))
2142 {
2143
2144 case empty_state:
2145 // 5: theirs == live, ours == empty
2146 L(boost::format("(#5) they have a live leaf at slot %d in %s node '%s', level %d, we have nothing\n")
2147 % slot % typestr % hpref % lev);
2148 {
2149 id slotval;
2150 their_node.get_raw_slot(slot, slotval);
2151 queue_send_data_cmd(their_node.type, slotval);
2152 }
2153 break;
2154
2155 case live_leaf_state:
2156 // 6: theirs == live, ours == live
2157 L(boost::format("(#6) they have a live leaf at slot %d in %s node '%s', and so do we\n")
2158 % slot % typestr % hpref);
2159 {
2160 id our_slotval, their_slotval;
2161 their_node.get_raw_slot(slot, their_slotval);
2162 our_node->get_raw_slot(slot, our_slotval);
2163 if (their_slotval == our_slotval)
2164 {
2165 hexenc<id> hslotval;
2166 their_node.get_hex_slot(slot, hslotval);
2167 L(boost::format("(#6) we both have live %s leaf '%s'\n") % typestr % hslotval);
2168 continue;
2169 }
2170 else
2171 {
2172 I(their_node.type == our_node->type);
2173 string tmp;
2174 load_data(our_node->type, our_slotval, this->app, tmp);
2175 queue_send_data_cmd(their_node.type, their_slotval);
2176 queue_data_cmd(our_node->type, our_slotval, tmp);
2177 }
2178 }
2179 break;
2180
2181 case dead_leaf_state:
2182 // 7: theirs == live, ours == dead
2183                L(boost::format("(#7) they have a live leaf at slot %d in %s node '%s', level %d, we have a dead one\n")
2184 % slot % typestr % hpref % lev);
2185 {
2186 id our_slotval, their_slotval;
2187 our_node->get_raw_slot(slot, our_slotval);
2188 their_node.get_raw_slot(slot, their_slotval);
2189 if (their_slotval == our_slotval)
2190 {
2191 hexenc<id> hslotval;
2192 their_node.get_hex_slot(slot, hslotval);
2193 L(boost::format("(#7) it's the same %s leaf '%s', but ours is dead\n")
2194 % typestr % hslotval);
2195 continue;
2196 }
2197 else
2198 {
2199 queue_send_data_cmd(their_node.type, their_slotval);
2200 }
2201 }
2202 break;
2203
2204 case subtree_state:
2205 // 8: theirs == live, ours == subtree
2206 L(boost::format("(#8) they have a live leaf in slot %d of %s node '%s', level %d, we have a subtree\n")
2207 % slot % typestr % hpref % lev);
2208 {
2209
2210 id their_slotval;
2211 hexenc<id> their_hval;
2212 their_node.get_raw_slot(slot, their_slotval);
2213 encode_hexenc(their_slotval, their_hval);
2214 if (data_exists(their_node.type, their_slotval, app))
2215 L(boost::format("(#8) we have a copy of their live leaf '%s' in slot %d of %s node '%s', level %d\n")
2216 % their_hval % slot % typestr % hpref % lev);
2217 else
2218 {
2219 L(boost::format("(#8) requesting a copy of their live leaf '%s' in slot %d of %s node '%s', level %d\n")
2220 % their_hval % slot % typestr % hpref % lev);
2221 queue_send_data_cmd(their_node.type, their_slotval);
2222 }
2223
2224 L(boost::format("(#8) sending our subtree for refinement, in slot %d of %s node '%s', level %d\n")
2225 % slot % typestr % hpref % lev);
2226 prefix subprefix;
2227 our_node->extended_raw_prefix(slot, subprefix);
2228 merkle_ptr our_subtree;
2229 load_merkle_node(our_node->type, our_node->level + 1,
2230 subprefix, our_subtree);
2231                  // FIXME: it would be more efficient here to send the data
2232                  // for everything in the subtree (except, possibly, the item
2233                  // they already have), rather than sending our subtree for
2234                  // refinement.
2235 queue_refine_cmd(*our_subtree);
2236 }
2237 break;
2238 }
2239 break;
2240
2241
2242 case dead_leaf_state:
2243 switch (our_node->get_slot_state(slot))
2244 {
2245 case empty_state:
2246 // 9: theirs == dead, ours == empty
2247 L(boost::format("(#9) they have a dead leaf at slot %d in %s node '%s', level %d, we have nothing\n")
2248 % slot % typestr % hpref % lev);
2249 continue;
2250 break;
2251
2252 case live_leaf_state:
2253 // 10: theirs == dead, ours == live
2254 L(boost::format("(#10) they have a dead leaf at slot %d in %s node '%s', level %d, we have a live one\n")
2255 % slot % typestr % hpref % lev);
2256 {
2257 id our_slotval, their_slotval;
2258 their_node.get_raw_slot(slot, their_slotval);
2259 our_node->get_raw_slot(slot, our_slotval);
2260 hexenc<id> hslotval;
2261 our_node->get_hex_slot(slot, hslotval);
2262 if (their_slotval == our_slotval)
2263 {
2264 L(boost::format("(#10) we both have %s leaf %s, theirs is dead\n")
2265 % typestr % hslotval);
2266 continue;
2267 }
2268 else
2269 {
2270 I(their_node.type == our_node->type);
2271 string tmp;
2272 load_data(our_node->type, our_slotval, this->app, tmp);
2273 queue_data_cmd(our_node->type, our_slotval, tmp);
2274 }
2275 }
2276 break;
2277
2278 case dead_leaf_state:
2279 // 11: theirs == dead, ours == dead
2280 L(boost::format("(#11) they have a dead leaf at slot %d in %s node '%s', level %d, so do we\n")
2281 % slot % typestr % hpref % lev);
2282 continue;
2283 break;
2284
2285 case subtree_state:
2286 // theirs == dead, ours == subtree
2287 L(boost::format("(#12) they have a dead leaf in slot %d of %s node '%s', we have a subtree\n")
2288 % slot % typestr % hpref % lev);
2289 {
2290 prefix subprefix;
2291 our_node->extended_raw_prefix(slot, subprefix);
2292 merkle_ptr our_subtree;
2293 load_merkle_node(our_node->type, our_node->level + 1,
2294 subprefix, our_subtree);
2295                    // FIXME: it would be more efficient here to send the data
2296                    // for everything in the subtree (except, possibly, the dead
2297                    // item), rather than sending our subtree for refinement.
2298 queue_refine_cmd(*our_subtree);
2299 }
2300 break;
2301 }
2302 break;
2303
2304
2305 case subtree_state:
2306 switch (our_node->get_slot_state(slot))
2307 {
2308 case empty_state:
2309 // 13: theirs == subtree, ours == empty
2310 L(boost::format("(#13) they have a subtree at slot %d in %s node '%s', level %d, we have nothing\n")
2311 % slot % typestr % hpref % lev);
2312 {
2313 merkle_node our_fake_subtree;
2314 their_node.extended_prefix(slot, our_fake_subtree.pref);
2315 our_fake_subtree.level = their_node.level + 1;
2316 our_fake_subtree.type = their_node.type;
2317 queue_refine_cmd(our_fake_subtree);
2318 }
2319 break;
2320
2321 case live_leaf_state:
2322 // 14: theirs == subtree, ours == live
2323 L(boost::format("(#14) they have a subtree at slot %d in %s node '%s', level %d, we have a live leaf\n")
2324 % slot % typestr % hpref % lev);
2325 {
2326 size_t subslot;
2327 id our_slotval;
2328 merkle_node our_fake_subtree;
2329 our_node->get_raw_slot(slot, our_slotval);
2330 hexenc<id> hslotval;
2331 encode_hexenc(our_slotval, hslotval);
2332
2333 pick_slot_and_prefix_for_value(our_slotval, our_node->level + 1, subslot,
2334 our_fake_subtree.pref);
2335 L(boost::format("(#14) pushed our leaf '%s' into fake subtree slot %d, level %d\n")
2336 % hslotval % subslot % (lev + 1));
2337 our_fake_subtree.type = their_node.type;
2338 our_fake_subtree.level = our_node->level + 1;
2339 our_fake_subtree.set_raw_slot(subslot, our_slotval);
2340 our_fake_subtree.set_slot_state(subslot, our_node->get_slot_state(slot));
2341 queue_refine_cmd(our_fake_subtree);
2342 }
2343 break;
2344
2345 case dead_leaf_state:
2346 // 15: theirs == subtree, ours == dead
2347 L(boost::format("(#15) they have a subtree at slot %d in %s node '%s', level %d, we have a dead leaf\n")
2348 % slot % typestr % hpref % lev);
2349 {
2350 size_t subslot;
2351 id our_slotval;
2352 merkle_node our_fake_subtree;
2353 our_node->get_raw_slot(slot, our_slotval);
2354 pick_slot_and_prefix_for_value(our_slotval, our_node->level + 1, subslot,
2355 our_fake_subtree.pref);
2356 our_fake_subtree.type = their_node.type;
2357 our_fake_subtree.level = our_node->level + 1;
2358 our_fake_subtree.set_raw_slot(subslot, our_slotval);
2359 our_fake_subtree.set_slot_state(subslot, our_node->get_slot_state(slot));
2360 queue_refine_cmd(our_fake_subtree);
2361 }
2362 break;
2363
2364 case subtree_state:
2365 // 16: theirs == subtree, ours == subtree
2366 L(boost::format("(#16) they have a subtree at slot %d in %s node '%s', level %d, and so do we\n")
2367 % slot % typestr % hpref % lev);
2368 {
2369 id our_slotval, their_slotval;
2370 hexenc<id> hslotval;
2371 their_node.get_raw_slot(slot, their_slotval);
2372 our_node->get_raw_slot(slot, our_slotval);
2373 our_node->get_hex_slot(slot, hslotval);
2374 if (their_slotval == our_slotval)
2375 {
2376 L(boost::format("(#16) we both have %s subtree '%s'\n") % typestr % hslotval);
2377 continue;
2378 }
2379 else
2380 {
2381 L(boost::format("(#16) %s subtrees at slot %d differ, refining ours\n") % typestr % slot);
2382 prefix subprefix;
2383 our_node->extended_raw_prefix(slot, subprefix);
2384 merkle_ptr our_subtree;
2385 load_merkle_node(our_node->type, our_node->level + 1,
2386 subprefix, our_subtree);
2387 queue_refine_cmd(*our_subtree);
2388 }
2389 }
2390 break;
2391 }
2392 break;
2393 }
2394 }
2395 }
2396 return true;
2397}
2398
2399
2400bool
2401session::process_send_data_cmd(netcmd_item_type type,
2402 id const & item)
2403{
2404 string typestr;
2405 netcmd_item_type_to_string(type, typestr);
2406 hexenc<id> hitem;
2407 encode_hexenc(item, hitem);
2408 L(F("received 'send_data' netcmd requesting %s '%s'\n")
2409 % typestr % hitem);
2410 if (data_exists(type, item, this->app))
2411 {
2412 string out;
2413 load_data(type, item, this->app, out);
2414 queue_data_cmd(type, item, out);
2415 }
2416 else
2417 {
2418 queue_nonexistant_cmd(type, item);
2419 }
2420 return true;
2421}
2422
2423bool
2424session::process_send_delta_cmd(netcmd_item_type type,
2425 id const & base,
2426 id const & ident)
2427{
2428 string typestr;
2429 netcmd_item_type_to_string(type, typestr);
2430 delta del;
2431
2432 hexenc<id> hbase, hident;
2433 encode_hexenc(base, hbase);
2434 encode_hexenc(ident, hident);
2435
2436 L(F("received 'send_delta' netcmd requesting %s edge '%s' -> '%s'\n")
2437 % typestr % hbase % hident);
2438
2439 switch (type)
2440 {
2441 case file_item:
2442 {
2443 file_id fbase(hbase), fident(hident);
2444 file_delta fdel;
2445 if (this->app.db.file_version_exists(fbase)
2446 && this->app.db.file_version_exists(fident))
2447 {
2448 file_data base_fdat, ident_fdat;
2449 data base_dat, ident_dat;
2450 this->app.db.get_file_version(fbase, base_fdat);
2451 this->app.db.get_file_version(fident, ident_fdat);
2452 string tmp;
2453 base_dat = base_fdat.inner();
2454 ident_dat = ident_fdat.inner();
2455 compute_delta(base_dat(), ident_dat(), tmp);
2456 del = delta(tmp);
2457 }
2458 else
2459 {
2460 return process_send_data_cmd(type, ident);
2461 }
2462 }
2463 break;
2464
2465 case manifest_item:
2466 {
2467 manifest_id mbase(hbase), mident(hident);
2468 manifest_delta mdel;
2469 if (this->app.db.manifest_version_exists(mbase)
2470 && this->app.db.manifest_version_exists(mident))
2471 {
2472 manifest_data base_mdat, ident_mdat;
2473 data base_dat, ident_dat;
2474 this->app.db.get_manifest_version(mbase, base_mdat);
2475 this->app.db.get_manifest_version(mident, ident_mdat);
2476 string tmp;
2477 base_dat = base_mdat.inner();
2478 ident_dat = ident_mdat.inner();
2479 compute_delta(base_dat(), ident_dat(), tmp);
2480 del = delta(tmp);
2481 }
2482 else
2483 {
2484 return process_send_data_cmd(type, ident);
2485 }
2486 }
2487 break;
2488
2489 default:
2490      throw bad_decode(F("delta requested for item type %s") % typestr);
2491 }
2492 queue_delta_cmd(type, base, ident, del);
2493 return true;
2494}
2495
2496bool
2497session::process_data_cmd(netcmd_item_type type,
2498 id const & item,
2499 string const & dat)
2500{
2501 hexenc<id> hitem;
2502 encode_hexenc(item, hitem);
2503
2504 // it's ok if we received something we didn't ask for; it might
2505 // be a spontaneous transmission from refinement
2506 note_item_arrived(type, item);
2507
2508 switch (type)
2509 {
2510 case epoch_item:
2511 if (this->app.db.epoch_exists(epoch_id(hitem)))
2512 {
2513 L(F("epoch '%s' already exists in our database\n") % hitem);
2514 }
2515 else
2516 {
2517 cert_value branch;
2518 epoch_data epoch;
2519 read_epoch(dat, branch, epoch);
2520 L(F("received epoch %s for branch %s\n") % epoch % branch);
2521 std::map<cert_value, epoch_data> epochs;
2522 app.db.get_epochs(epochs);
2523 std::map<cert_value, epoch_data>::const_iterator i;
2524 i = epochs.find(branch);
2525 if (i == epochs.end())
2526 {
2527 L(F("branch %s has no epoch; setting epoch to %s\n") % branch % epoch);
2528 app.db.set_epoch(branch, epoch);
2529 maybe_note_epochs_finished();
2530 }
2531 else
2532 {
2533 L(F("branch %s already has an epoch; checking\n") % branch);
2534 // if we get here, then we know that the epoch must be
2535 // different, because if it were the same then the
2536 // if(epoch_exists()) branch up above would have been taken. if
2537 // somehow this is wrong, then we have broken epoch hashing or
2538 // something, which is very dangerous, so play it safe...
2539 I(!(i->second == epoch));
2540
2541 // It is safe to call 'error' here, because if we get here,
2542 // then the current netcmd packet cannot possibly have
2543 // written anything to the database.
2544 error((F("Mismatched epoch on branch %s."
2545 " Server has '%s', client has '%s'.")
2546 % branch
2547 % (voice == server_voice ? i->second : epoch)
2548 % (voice == server_voice ? epoch : i->second)).str());
2549 }
2550 }
2551 break;
2552
2553 case key_item:
2554 if (this->app.db.public_key_exists(hitem))
2555 L(F("public key '%s' already exists in our database\n") % hitem);
2556 else
2557 {
2558 rsa_keypair_id keyid;
2559 base64<rsa_pub_key> pub;
2560 read_pubkey(dat, keyid, pub);
2561 hexenc<id> tmp;
2562 key_hash_code(keyid, pub, tmp);
2563 if (! (tmp == hitem))
2564 throw bad_decode(F("hash check failed for public key '%s' (%s);"
2565 " wanted '%s' got '%s'")
2566 % hitem % keyid % hitem % tmp);
2567 this->dbw.consume_public_key(keyid, pub);
2568 }
2569 break;
2570
2571 case cert_item:
2572 if (this->app.db.revision_cert_exists(hitem))
2573 L(F("cert '%s' already exists in our database\n") % hitem);
2574 else
2575 {
2576 cert c;
2577 read_cert(dat, c);
2578 hexenc<id> tmp;
2579 cert_hash_code(c, tmp);
2580 if (! (tmp == hitem))
2581 throw bad_decode(F("hash check failed for revision cert '%s'") % hitem);
2582// this->dbw.consume_revision_cert(revision<cert>(c));
2583 received_certs[revision_id(c.ident)][c.name].push_back(c);
2584 if (!app.db.revision_exists(revision_id(c.ident)))
2585 {
2586 id rid;
2587 decode_hexenc(c.ident, rid);
2588 queue_send_data_cmd(revision_item, rid);
2589 }
2590 }
2591 break;
2592
2593 case revision_item:
2594 {
2595 revision_id rid(hitem);
2596 if (this->app.db.revision_exists(rid))
2597 L(F("revision '%s' already exists in our database\n") % hitem);
2598 else
2599 {
2600 L(F("received revision '%s'\n") % hitem);
2601 boost::shared_ptr< pair<revision_data, revision_set > >
2602 rp(new pair<revision_data, revision_set>());
2603
2604 rp->first = revision_data(dat);
2605 read_revision_set(dat, rp->second);
2606 ancestry.insert(std::make_pair(rid, rp));
2607 if (cert_refinement_done())
2608 {
2609 analyze_ancestry_graph();
2610 }
2611 }
2612 }
2613 break;
2614
2615 case manifest_item:
2616 {
2617 manifest_id mid(hitem);
2618 if (this->app.db.manifest_version_exists(mid))
2619 L(F("manifest version '%s' already exists in our database\n") % hitem);
2620 else
2621 {
2622 this->dbw.consume_manifest_data(mid, manifest_data(dat));
2623 manifest_map man;
2624 read_manifest_map(data(dat), man);
2625 analyze_manifest(man);
2626 }
2627 }
2628 break;
2629
2630 case file_item:
2631 {
2632 file_id fid(hitem);
2633 if (this->app.db.file_version_exists(fid))
2634 L(F("file version '%s' already exists in our database\n") % hitem);
2635 else
2636 {
2637 this->dbw.consume_file_data(fid, file_data(dat));
2638 }
2639 }
2640 break;
2641
2642 }
2643 return true;
2644}
2645
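// a received delta is consumed in one of three ways, depending on how we
// asked for it: as a "full" delta when we requested the item's complete
// contents (full_delta_items), as a reverse delta when we queued a reverse
// request for this particular (base, ident) pair (reverse_delta_requests),
// and as an ordinary forward delta otherwise.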
2646bool
2647session::process_delta_cmd(netcmd_item_type type,
2648 id const & base,
2649 id const & ident,
2650 delta const & del)
2651{
2652 string typestr;
2653 netcmd_item_type_to_string(type, typestr);
2654 hexenc<id> hbase, hident;
2655 encode_hexenc(base, hbase);
2656 encode_hexenc(ident, hident);
2657
2658 pair<id,id> id_pair = make_pair(base, ident);
2659
2660 // it's ok if we received something we didn't ask for; it might
2661 // be a spontaneous transmission from refinement
2662 note_item_arrived(type, ident);
2663
2664 switch (type)
2665 {
2666 case manifest_item:
2667 {
2668 manifest_id src_manifest(hbase), dst_manifest(hident);
2669 if (full_delta_items[manifest_item]->find(ident)
2670 != full_delta_items[manifest_item]->end())
2671 {
2672 this->dbw.consume_manifest_delta(src_manifest,
2673 dst_manifest,
2674 manifest_delta(del),
2675 true);
2676 }
2677 else if (reverse_delta_requests.find(id_pair)
2678 != reverse_delta_requests.end())
2679 {
2680 reverse_delta_requests.erase(id_pair);
2681 this->dbw.consume_manifest_reverse_delta(src_manifest,
2682 dst_manifest,
2683 manifest_delta(del));
2684 }
2685 else
2686 this->dbw.consume_manifest_delta(src_manifest,
2687 dst_manifest,
2688 manifest_delta(del));
2689
2690 }
2691 break;
2692
2693 case file_item:
2694 {
2695 file_id src_file(hbase), dst_file(hident);
2696 if (full_delta_items[file_item]->find(ident)
2697 != full_delta_items[file_item]->end())
2698 {
2699 this->dbw.consume_file_delta(src_file,
2700 dst_file,
2701 file_delta(del),
2702 true);
2703 }
2704 else if (reverse_delta_requests.find(id_pair)
2705 != reverse_delta_requests.end())
2706 {
2707 reverse_delta_requests.erase(id_pair);
2708 this->dbw.consume_file_reverse_delta(src_file,
2709 dst_file,
2710 file_delta(del));
2711 }
2712 else
2713 this->dbw.consume_file_delta(src_file,
2714 dst_file,
2715 file_delta(del));
2716 }
2717 break;
2718
2719 default:
2720 L(F("ignoring delta received for item type %s\n") % typestr);
2721 break;
2722 }
2723 return true;
2724}
2725
2726bool
2727session::process_nonexistant_cmd(netcmd_item_type type,
2728 id const & item)
2729{
2730 string typestr;
2731 netcmd_item_type_to_string(type, typestr);
2732 hexenc<id> hitem;
2733 encode_hexenc(item, hitem);
2734 L(F("received 'nonexistant' netcmd for %s '%s'\n")
2735 % typestr % hitem);
2736 note_item_arrived(type, item);
2737 return true;
2738}
2739
2740bool
2741session::process_usher_cmd(utf8 const & msg)
2742{
2743 if (msg().size())
2744 {
2745 if (msg()[0] == '!')
2746 P(F("Received warning from usher: %s") % msg().substr(1));
2747 else
2748 L(F("Received greeting from usher: %s") % msg().substr(1));
2749 }
2750 netcmd cmdout;
2751 cmdout.write_usher_reply_cmd(peer_id, our_include_pattern);
2752 write_netcmd_and_try_flush(cmdout);
2753 L(F("Sent reply."));
2754 return true;
2755}
2756
2757bool
2758session::merkle_node_exists(netcmd_item_type type,
2759 size_t level,
2760 prefix const & pref)
2761{
2762 map<netcmd_item_type, boost::shared_ptr<merkle_table> >::const_iterator i =
2763 merkle_tables.find(type);
2764
2765 I(i != merkle_tables.end());
2766 merkle_table::const_iterator j = i->second->find(std::make_pair(pref, level));
2767 return (j != i->second->end());
2768}
2769
2770void
2771session::load_merkle_node(netcmd_item_type type,
2772 size_t level,
2773 prefix const & pref,
2774 merkle_ptr & node)
2775{
2776 map<netcmd_item_type, boost::shared_ptr<merkle_table> >::const_iterator i =
2777 merkle_tables.find(type);
2778
2779 I(i != merkle_tables.end());
2780 merkle_table::const_iterator j = i->second->find(std::make_pair(pref, level));
2781 I(j != i->second->end());
2782 node = j->second;
2783}
2784
2785
2786bool
2787session::dispatch_payload(netcmd const & cmd)
2788{
2789
2790 switch (cmd.get_cmd_code())
2791 {
2792
2793 case bye_cmd:
2794 return process_bye_cmd();
2795 break;
2796
2797 case error_cmd:
2798 {
2799 string errmsg;
2800 cmd.read_error_cmd(errmsg);
2801 return process_error_cmd(errmsg);
2802 }
2803 break;
2804
2805 case hello_cmd:
2806 require(! authenticated, "hello netcmd received when not authenticated");
2807 require(voice == client_voice, "hello netcmd received in client voice");
2808 {
2809 rsa_keypair_id server_keyname;
2810 rsa_pub_key server_key;
2811 id nonce;
2812 cmd.read_hello_cmd(server_keyname, server_key, nonce);
2813 return process_hello_cmd(server_keyname, server_key, nonce);
2814 }
2815 break;
2816
2817 case anonymous_cmd:
2818 require(! authenticated, "anonymous netcmd received when not authenticated");
2819 require(voice == server_voice, "anonymous netcmd received in server voice");
2820 require(role == source_role ||
2821 role == source_and_sink_role,
2822 "anonymous netcmd received in source or source/sink role");
2823 {
2824 protocol_role role;
2825 utf8 their_include_pattern, their_exclude_pattern;
2826 rsa_oaep_sha_data hmac_key_encrypted;
2827 cmd.read_anonymous_cmd(role, their_include_pattern, their_exclude_pattern, hmac_key_encrypted);
2828 L(F("received 'anonymous' netcmd from client for pattern '%s' excluding '%s' "
2829 "in %s mode\n")
2830 % their_include_pattern % their_exclude_pattern
2831 % (role == source_and_sink_role ? _("source and sink") :
2832 (role == source_role ? _("source") : _("sink"))));
2833
2834 set_session_key(hmac_key_encrypted);
2835 if (!process_anonymous_cmd(role, their_include_pattern, their_exclude_pattern))
2836 return false;
2837 queue_confirm_cmd();
2838 return true;
2839 }
2840 break;
2841
2842 case auth_cmd:
2843 require(! authenticated, "auth netcmd received when not authenticated");
2844 require(voice == server_voice, "auth netcmd received in server voice");
2845 {
2846 protocol_role role;
2847 string signature;
2848 utf8 their_include_pattern, their_exclude_pattern;
2849 id client, nonce1, nonce2;
2850 rsa_oaep_sha_data hmac_key_encrypted;
2851 cmd.read_auth_cmd(role, their_include_pattern, their_exclude_pattern,
2852 client, nonce1, hmac_key_encrypted, signature);
2853
2854 hexenc<id> their_key_hash;
2855 encode_hexenc(client, their_key_hash);
2856 hexenc<id> hnonce1;
2857 encode_hexenc(nonce1, hnonce1);
2858
2859 L(F("received 'auth(hmac)' netcmd from client '%s' for pattern '%s' "
2860 "exclude '%s' in %s mode with nonce1 '%s'\n")
2861 % their_key_hash % their_include_pattern % their_exclude_pattern
2862 % (role == source_and_sink_role ? _("source and sink") :
2863 (role == source_role ? _("source") : _("sink")))
2864 % hnonce1);
2865
2866 set_session_key(hmac_key_encrypted);
2867 if (!process_auth_cmd(role, their_include_pattern, their_exclude_pattern,
2868 client, nonce1, signature))
2869 return false;
2870 queue_confirm_cmd();
2871 return true;
2872 }
2873 break;
2874
2875 case confirm_cmd:
2876 require(! authenticated, "confirm netcmd received when not authenticated");
2877 require(voice == client_voice, "confirm netcmd received in client voice");
2878 {
2879 string signature;
2880 cmd.read_confirm_cmd();
2881 this->authenticated = true;
2882 respond_to_confirm_cmd();
2883 return true;
2884 }
2885 break;
2886
2887 case refine_cmd:
2888 require(authenticated, "refine netcmd received when authenticated");
2889 {
2890 merkle_node node;
2891 cmd.read_refine_cmd(node);
2892 map< netcmd_item_type, done_marker>::iterator i = done_refinements.find(node.type);
2893 require(i != done_refinements.end(), "refinement netcmd refers to valid type");
2894 require(i->second.tree_is_done == false, "refinement netcmd received when tree is live");
2895 i->second.current_level_had_refinements = true;
2896 return process_refine_cmd(node);
2897 }
2898 break;
2899
2900 case done_cmd:
2901 require(authenticated, "done netcmd received when authenticated");
2902 {
2903 size_t level;
2904 netcmd_item_type type;
2905 cmd.read_done_cmd(level, type);
2906 return process_done_cmd(level, type);
2907 }
2908 break;
2909
2910 case send_data_cmd:
2911 require(authenticated, "send_data netcmd received when authenticated");
2912 require(role == source_role ||
2913 role == source_and_sink_role,
2914 "send_data netcmd received in source or source/sink role");
2915 {
2916 netcmd_item_type type;
2917 id item;
2918 cmd.read_send_data_cmd(type, item);
2919 return process_send_data_cmd(type, item);
2920 }
2921 break;
2922
2923 case send_delta_cmd:
2924 require(authenticated, "send_delta netcmd received when authenticated");
2925 require(role == source_role ||
2926 role == source_and_sink_role,
2927 "send_delta netcmd received in source or source/sink role");
2928 {
2929 netcmd_item_type type;
2930 id base, ident;
2931 cmd.read_send_delta_cmd(type, base, ident);
2932 return process_send_delta_cmd(type, base, ident);
2933 }
2934
2935 case data_cmd:
2936 require(authenticated, "data netcmd received when authenticated");
2937 require(role == sink_role ||
2938 role == source_and_sink_role,
2939              "data netcmd received in sink or source/sink role");
2940 {
2941 netcmd_item_type type;
2942 id item;
2943 string dat;
2944 cmd.read_data_cmd(type, item, dat);
2945 return process_data_cmd(type, item, dat);
2946 }
2947 break;
2948
2949 case delta_cmd:
2950 require(authenticated, "delta netcmd received when authenticated");
2951 require(role == sink_role ||
2952 role == source_and_sink_role,
2953              "delta netcmd received in sink or source/sink role");
2954 {
2955 netcmd_item_type type;
2956 id base, ident;
2957 delta del;
2958 cmd.read_delta_cmd(type, base, ident, del);
2959 return process_delta_cmd(type, base, ident, del);
2960 }
2961 break;
2962
2963 case nonexistant_cmd:
2964 require(authenticated, "nonexistant netcmd received when authenticated");
2965 require(role == sink_role ||
2966 role == source_and_sink_role,
2967 "nonexistant netcmd received in sink or source/sink role");
2968 {
2969 netcmd_item_type type;
2970 id item;
2971 cmd.read_nonexistant_cmd(type, item);
2972 return process_nonexistant_cmd(type, item);
2973 }
2974 break;
2975 case usher_cmd:
2976 {
2977 utf8 greeting;
2978 cmd.read_usher_cmd(greeting);
2979 return process_usher_cmd(greeting);
2980 }
2981 break;
2982 case usher_reply_cmd:
2983      return false; // should not happen
2984 break;
2985 }
2986 return false;
2987}
2988
2989// this kicks off the whole cascade starting from "hello"
2990void
2991session::begin_service()
2992{
2993 keypair kp;
2994 app.keys.get_key_pair(app.signing_key, kp);
2995 queue_hello_cmd(app.signing_key, kp.pub, mk_nonce());
2996}
2997
2998void
2999session::maybe_say_goodbye()
3000{
3001 if (done_all_refinements() &&
3002 got_all_data() && !sent_goodbye)
3003 queue_bye_cmd();
3004}
3005
3006bool
3007session::arm()
3008{
3009 if (!armed)
3010 {
3011 if (outbuf_size > constants::bufsz * 10)
3012 return false; // don't pack the buffer unnecessarily
3013
3014 if (cmd.read(inbuf, read_hmac))
3015 {
3016 armed = true;
3017 }
3018 }
3019 return armed;
3020}
3021
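// process() arms the session (parsing at most one complete netcmd off the
// input buffer) and, if a command is available, dispatches it inside a single
// database transaction; it returns false when the session should be torn down
// (a protocol error, or a command that could not be processed).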
3022bool session::process()
3023{
3024 try
3025 {
3026 if (!arm())
3027 return true;
3028
3029 transaction_guard guard(app.db);
3030 armed = false;
3031 L(F("processing %d byte input buffer from peer %s\n") % inbuf.size() % peer_id);
3032 bool ret = dispatch_payload(cmd);
3033 if (inbuf.size() >= constants::netcmd_maxsz)
3034 W(F("input buffer for peer %s is overfull after netcmd dispatch\n") % peer_id);
3035 guard.commit();
3036 maybe_say_goodbye();
3037 if (!ret)
3038 P(F("failed to process '%s' packet") % cmd.get_cmd_code());
3039 return ret;
3040 }
3041 catch (bad_decode & bd)
3042 {
3043 W(F("protocol error while processing peer %s: '%s'\n") % peer_id % bd.what);
3044 return false;
3045 }
3046}
3047
3048
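// call_server() drives the client side of a netsync exchange: it connects to
// the given address, then loops probing the single socket, reading and
// writing as readiness allows and processing one parsed netcmd per pass,
// until either an error occurs or both sides have said goodbye and the
// output buffer has drained.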
3049static void
3050call_server(protocol_role role,
3051 utf8 const & include_pattern,
3052 utf8 const & exclude_pattern,
3053 app_state & app,
3054 utf8 const & address,
3055 Netxx::port_type default_port,
3056 unsigned long timeout_seconds)
3057{
3058 Netxx::Probe probe;
3059 Netxx::Timeout timeout(static_cast<long>(timeout_seconds)), instant(0,1);
3060#ifdef USE_IPV6
3061 bool use_ipv6=true;
3062#else
3063 bool use_ipv6=false;
3064#endif
3065
3066 // FIXME: split into labels and convert to ace here.
3067
3068 P(F("connecting to %s\n") % address());
3069 Netxx::Address addr(address().c_str(), default_port, use_ipv6);
3070 Netxx::Stream server(addr, timeout);
3071 session sess(role, client_voice, include_pattern, exclude_pattern,
3072 app, address(), server.get_socketfd(), timeout);
3073
3074 while (true)
3075 {
3076 bool armed = false;
3077 try
3078 {
3079 armed = sess.arm();
3080 }
3081 catch (bad_decode & bd)
3082 {
3083 W(F("protocol error while processing peer %s: '%s'\n")
3084 % sess.peer_id % bd.what);
3085 return;
3086 }
3087
3088 probe.clear();
3089 probe.add(sess.str, sess.which_events());
3090 Netxx::Probe::result_type res = probe.ready(armed ? instant : timeout);
3091 Netxx::Probe::ready_type event = res.second;
3092 Netxx::socket_type fd = res.first;
3093
3094 if (fd == -1 && !armed)
3095 {
3096 P(F("timed out waiting for I/O with peer %s, disconnecting\n") % sess.peer_id);
3097 return;
3098 }
3099
3100 if (event & Netxx::Probe::ready_read)
3101 {
3102 if (sess.read_some())
3103 {
3104 try
3105 {
3106 armed = sess.arm();
3107 }
3108 catch (bad_decode & bd)
3109 {
3110 W(F("protocol error while processing peer %s: '%s'\n")
3111 % sess.peer_id % bd.what);
3112 return;
3113 }
3114 }
3115 else
3116 {
3117 if (sess.sent_goodbye)
3118 P(F("read from fd %d (peer %s) closed OK after goodbye\n") % fd % sess.peer_id);
3119 else
3120 P(F("read from fd %d (peer %s) failed, disconnecting\n") % fd % sess.peer_id);
3121 return;
3122 }
3123 }
3124
3125 if (event & Netxx::Probe::ready_write)
3126 {
3127 if (! sess.write_some())
3128 {
3129 if (sess.sent_goodbye)
3130 P(F("write on fd %d (peer %s) closed OK after goodbye\n") % fd % sess.peer_id);
3131 else
3132 P(F("write on fd %d (peer %s) failed, disconnecting\n") % fd % sess.peer_id);
3133 return;
3134 }
3135 }
3136
3137 if (event & Netxx::Probe::ready_oobd)
3138 {
3139 P(F("got OOB data on fd %d (peer %s), disconnecting\n")
3140 % fd % sess.peer_id);
3141 return;
3142 }
3143
3144 if (armed)
3145 {
3146 if (!sess.process())
3147 {
3148 P(F("terminated exchange with %s\n")
3149 % sess.peer_id);
3150 return;
3151 }
3152 }
3153
3154 if (sess.sent_goodbye && sess.outbuf.empty() && sess.received_goodbye)
3155 {
3156 P(F("successful exchange with %s\n")
3157 % sess.peer_id);
3158 return;
3159 }
3160 }
3161}
3162
3163static void
3164arm_sessions_and_calculate_probe(Netxx::Probe & probe,
3165 map<Netxx::socket_type, shared_ptr<session> > & sessions,
3166 set<Netxx::socket_type> & armed_sessions)
3167{
3168 set<Netxx::socket_type> arm_failed;
3169 for (map<Netxx::socket_type,
3170 shared_ptr<session> >::const_iterator i = sessions.begin();
3171 i != sessions.end(); ++i)
3172 {
3173 try
3174 {
3175 if (i->second->arm())
3176 {
3177 L(F("fd %d is armed\n") % i->first);
3178 armed_sessions.insert(i->first);
3179 }
3180 probe.add(i->second->str, i->second->which_events());
3181 }
3182 catch (bad_decode & bd)
3183 {
3184 W(F("protocol error while processing peer %s: '%s', marking as bad\n")
3185 % i->second->peer_id % bd.what);
3186 arm_failed.insert(i->first);
3187 }
3188 }
3189 for (set<Netxx::socket_type>::const_iterator i = arm_failed.begin();
3190 i != arm_failed.end(); ++i)
3191 {
3192 sessions.erase(*i);
3193 }
3194}
3195
3196static void
3197handle_new_connection(Netxx::Address & addr,
3198 Netxx::StreamServer & server,
3199 Netxx::Timeout & timeout,
3200 protocol_role role,
3201 utf8 const & include_pattern,
3202 utf8 const & exclude_pattern,
3203 map<Netxx::socket_type, shared_ptr<session> > & sessions,
3204 app_state & app)
3205{
3206 L(F("accepting new connection on %s : %s\n")
3207 % addr.get_name() % lexical_cast<string>(addr.get_port()));
3208 Netxx::Peer client = server.accept_connection();
3209
3210 if (!client)
3211 {
3212 L(F("accept() returned a dead client\n"));
3213 }
3214 else
3215 {
3216 P(F("accepted new client connection from %s : %s\n")
3217 % client.get_address() % lexical_cast<string>(client.get_port()));
3218 shared_ptr<session> sess(new session(role, server_voice,
3219 include_pattern, exclude_pattern,
3220 app,
3221 lexical_cast<string>(client),
3222 client.get_socketfd(), timeout));
3223 sess->begin_service();
3224 sessions.insert(make_pair(client.get_socketfd(), sess));
3225 }
3226}
3227
3228static void
3229handle_read_available(Netxx::socket_type fd,
3230 shared_ptr<session> sess,
3231 map<Netxx::socket_type, shared_ptr<session> > & sessions,
3232 set<Netxx::socket_type> & armed_sessions,
3233 bool & live_p)
3234{
3235 if (sess->read_some())
3236 {
3237 try
3238 {
3239 if (sess->arm())
3240 armed_sessions.insert(fd);
3241 }
3242 catch (bad_decode & bd)
3243 {
3244 W(F("protocol error while processing peer %s: '%s', disconnecting\n")
3245 % sess->peer_id % bd.what);
3246 sessions.erase(fd);
3247 live_p = false;
3248 }
3249 }
3250 else
3251 {
3252 P(F("fd %d (peer %s) read failed, disconnecting\n")
3253 % fd % sess->peer_id);
3254 sessions.erase(fd);
3255 live_p = false;
3256 }
3257}
3258
3259
3260static void
3261handle_write_available(Netxx::socket_type fd,
3262 shared_ptr<session> sess,
3263 map<Netxx::socket_type, shared_ptr<session> > & sessions,
3264 bool & live_p)
3265{
3266 if (! sess->write_some())
3267 {
3268 P(F("fd %d (peer %s) write failed, disconnecting\n")
3269 % fd % sess->peer_id);
3270 sessions.erase(fd);
3271 live_p = false;
3272 }
3273}
3274
3275static void
3276process_armed_sessions(map<Netxx::socket_type, shared_ptr<session> > & sessions,
3277 set<Netxx::socket_type> & armed_sessions)
3278{
3279 for (set<Netxx::socket_type>::const_iterator i = armed_sessions.begin();
3280 i != armed_sessions.end(); ++i)
3281 {
3282 map<Netxx::socket_type, shared_ptr<session> >::iterator j;
3283 j = sessions.find(*i);
3284 if (j == sessions.end())
3285 continue;
3286 else
3287 {
3288 Netxx::socket_type fd = j->first;
3289 shared_ptr<session> sess = j->second;
3290 if (!sess->process())
3291 {
3292 P(F("fd %d (peer %s) processing finished, disconnecting\n")
3293 % fd % sess->peer_id);
3294 sessions.erase(j);
3295 }
3296 }
3297 }
3298}
3299
3300static void
3301reap_dead_sessions(map<Netxx::socket_type, shared_ptr<session> > & sessions,
3302 unsigned long timeout_seconds)
3303{
3304 // kill any clients which haven't done any i/o inside the timeout period
3305 // or who have said goodbye and flushed their output buffers
3306 set<Netxx::socket_type> dead_clients;
3307 time_t now = ::time(NULL);
3308 for (map<Netxx::socket_type, shared_ptr<session> >::const_iterator i = sessions.begin();
3309 i != sessions.end(); ++i)
3310 {
3311 if (static_cast<unsigned long>(i->second->last_io_time + timeout_seconds)
3312 < static_cast<unsigned long>(now))
3313 {
3314 P(F("fd %d (peer %s) has been idle too long, disconnecting\n")
3315 % i->first % i->second->peer_id);
3316 dead_clients.insert(i->first);
3317 }
3318 if (i->second->sent_goodbye && i->second->outbuf.empty() && i->second->received_goodbye)
3319 {
3320 P(F("fd %d (peer %s) exchanged goodbyes and flushed output, disconnecting\n")
3321 % i->first % i->second->peer_id);
3322 dead_clients.insert(i->first);
3323 }
3324 }
3325 for (set<Netxx::socket_type>::const_iterator i = dead_clients.begin();
3326 i != dead_clients.end(); ++i)
3327 {
3328 sessions.erase(*i);
3329 }
3330}
3331
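// serve_connections() is the server side's event loop: a single thread
// probes the listening socket together with every live session, accepts new
// clients, shuttles bytes in and out as readiness allows, processes any
// sessions whose input buffers hold a complete netcmd, and finally reaps
// sessions that have been idle too long or have finished their exchange.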
3332static void
3333serve_connections(protocol_role role,
3334 utf8 const & include_pattern,
3335 utf8 const & exclude_pattern,
3336 app_state & app,
3337 utf8 const & address,
3338 Netxx::port_type default_port,
3339 unsigned long timeout_seconds,
3340 unsigned long session_limit)
3341{
3342 Netxx::Probe probe;
3343
3344 Netxx::Timeout
3345 forever,
3346 timeout(static_cast<long>(timeout_seconds)),
3347 instant(0,1);
3348
3349 if (!app.bind_port().empty())
3350 default_port = ::atoi(app.bind_port().c_str());
3351#ifdef USE_IPV6
3352 bool use_ipv6=true;
3353#else
3354 bool use_ipv6=false;
3355#endif
3356 Netxx::Address addr(use_ipv6);
3357
3358 if (!app.bind_address().empty())
3359 addr.add_address(app.bind_address().c_str(), default_port);
3360 else
3361 addr.add_all_addresses (default_port);
3362
3363
3364 Netxx::StreamServer server(addr, timeout);
3365 const char *name = addr.get_name();
3366 P(F("beginning service on %s : %s\n")
3367 % (name != NULL ? name : "all interfaces") % lexical_cast<string>(addr.get_port()));
3368
3369 map<Netxx::socket_type, shared_ptr<session> > sessions;
3370 set<Netxx::socket_type> armed_sessions;
3371
3372 while (true)
3373 {
3374 probe.clear();
3375 armed_sessions.clear();
3376
3377 if (sessions.size() >= session_limit)
3378 W(F("session limit %d reached, some connections will be refused\n") % session_limit);
3379 else
3380 probe.add(server);
3381
3382 arm_sessions_and_calculate_probe(probe, sessions, armed_sessions);
3383
3384 L(F("i/o probe with %d armed\n") % armed_sessions.size());
3385 Netxx::Probe::result_type res = probe.ready(sessions.empty() ? forever
3386 : (armed_sessions.empty() ? timeout
3387 : instant));
3388 Netxx::Probe::ready_type event = res.second;
3389 Netxx::socket_type fd = res.first;
3390
3391 if (fd == -1)
3392 {
3393 if (armed_sessions.empty())
3394 L(F("timed out waiting for I/O (listening on %s : %s)\n")
3395 % addr.get_name() % lexical_cast<string>(addr.get_port()));
3396 }
3397
3398 // we either got a new connection
3399 else if (fd == server)
3400 handle_new_connection(addr, server, timeout, role,
3401 include_pattern, exclude_pattern, sessions, app);
3402
3403 // or an existing session woke up
3404 else
3405 {
3406 map<Netxx::socket_type, shared_ptr<session> >::iterator i;
3407 i = sessions.find(fd);
3408 if (i == sessions.end())
3409 {
3410 L(F("got woken up for action on unknown fd %d\n") % fd);
3411 }
3412 else
3413 {
3414 shared_ptr<session> sess = i->second;
3415 bool live_p = true;
3416
3417 if (event & Netxx::Probe::ready_read)
3418 handle_read_available(fd, sess, sessions, armed_sessions, live_p);
3419
3420 if (live_p && (event & Netxx::Probe::ready_write))
3421 handle_write_available(fd, sess, sessions, live_p);
3422
3423 if (live_p && (event & Netxx::Probe::ready_oobd))
3424 {
3425 P(F("got some OOB data on fd %d (peer %s), disconnecting\n")
3426 % fd % sess->peer_id);
3427 sessions.erase(i);
3428 }
3429 }
3430 }
3431 process_armed_sessions(sessions, armed_sessions);
3432 reap_dead_sessions(sessions, timeout_seconds);
3433 }
3434}
3435
3436
3437/////////////////////////////////////////////////
3438//
3439// layer 4: monotone interface layer
3440//
3441/////////////////////////////////////////////////
3442
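// make_root_node() allocates a fresh merkle table for the given item type,
// seeds it with a single empty root node (level 0, root prefix) and registers
// the table in the session so that rebuild_merkle_trees() below can populate
// it.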
3443static boost::shared_ptr<merkle_table>
3444make_root_node(session & sess,
3445 netcmd_item_type ty)
3446{
3447 boost::shared_ptr<merkle_table> tab =
3448 boost::shared_ptr<merkle_table>(new merkle_table());
3449
3450 merkle_ptr tmp = merkle_ptr(new merkle_node());
3451 tmp->type = ty;
3452
3453 tab->insert(std::make_pair(std::make_pair(get_root_prefix().val, 0), tmp));
3454
3455 sess.merkle_tables[ty] = tab;
3456 return tab;
3457}
3458
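// insert_with_parents() walks the ancestry graph upwards from 'rev', adding
// 'rev' and every ancestor not already present to 'col' (ticking the revision
// ticker for each new entry); the traversal stops at null revisions and at
// revisions already collected.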
3459void
3460insert_with_parents(revision_id rev, set<revision_id> & col, app_state & app, ticker & revisions_ticker)
3461{
3462 vector<revision_id> frontier;
3463 frontier.push_back(rev);
3464 while (!frontier.empty())
3465 {
3466 revision_id rid = frontier.back();
3467 frontier.pop_back();
3468 if (!null_id(rid) && col.find(rid) == col.end())
3469 {
3470 ++revisions_ticker;
3471 col.insert(rid);
3472 std::set<revision_id> parents;
3473 app.db.get_revision_parents(rid, parents);
3474 for (std::set<revision_id>::const_iterator i = parents.begin();
3475 i != parents.end(); ++i)
3476 {
3477 frontier.push_back(*i);
3478 }
3479 }
3480 }
3481}
3482
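// rebuild_merkle_trees() populates this session's cert, key and epoch merkle
// tables from the database, restricted to the given branches: it collects the
// branch certs (and, transitively, the ancestry of the revisions they
// certify), ensures every included branch has an epoch, pulls in the
// non-branch certs attached to the collected revisions, and finally inserts
// the public keys that signed any of those certs, plus any keys explicitly
// requested on the command line.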
3483void
3484session::rebuild_merkle_trees(app_state & app,
3485 set<utf8> const & branchnames)
3486{
3487 P(F("finding items to synchronize:\n"));
3488 for (set<utf8>::const_iterator i = branchnames.begin();
3489 i != branchnames.end(); ++i)
3490    L(F("including branch %s\n") % *i);
3491
3492 boost::shared_ptr<merkle_table> ctab = make_root_node(*this, cert_item);
3493 boost::shared_ptr<merkle_table> ktab = make_root_node(*this, key_item);
3494 boost::shared_ptr<merkle_table> etab = make_root_node(*this, epoch_item);
3495
3496  // xgettext: please use short message and try to avoid multibyte chars
3497 ticker revisions_ticker(_("revisions"), "r", 64);
3498  // xgettext: please use short message and try to avoid multibyte chars
3499 ticker certs_ticker(_("certs"), "c", 256);
3500  // xgettext: please use short message and try to avoid multibyte chars
3501 ticker keys_ticker(_("keys"), "k", 1);
3502
3503 set<revision_id> revision_ids;
3504 set<rsa_keypair_id> inserted_keys;
3505
3506 {
3507 // Get our branches
3508 vector<string> names;
3509 get_branches(app, names);
3510 for (size_t i = 0; i < names.size(); ++i)
3511 {
3512 if(branchnames.find(names[i]) != branchnames.end())
3513 {
3514 // branch matches, get its certs
3515 vector< revision<cert> > certs;
3516 base64<cert_value> encoded_name;
3517 encode_base64(cert_value(names[i]),encoded_name);
3518 app.db.get_revision_certs(branch_cert_name, encoded_name, certs);
3519 for (vector< revision<cert> >::const_iterator j = certs.begin();
3520 j != certs.end(); j++)
3521 {
3522 insert_with_parents(revision_id(j->inner().ident),
3523 revision_ids, app, revisions_ticker);
3524 // branch certs go in here, others later on
3525 hexenc<id> hash;
3526 cert_hash_code(j->inner(), hash);
3527 insert_into_merkle_tree(*ctab, cert_item, true, hash, 0);
3528 if (inserted_keys.find(j->inner().key) == inserted_keys.end())
3529 inserted_keys.insert(j->inner().key);
3530 }
3531 }
3532 }
3533 }
3534
3535 {
3536 map<cert_value, epoch_data> epochs;
3537 app.db.get_epochs(epochs);
3538
3539 epoch_data epoch_zero(std::string(constants::epochlen, '0'));
3540 for (std::set<utf8>::const_iterator i = branchnames.begin();
3541 i != branchnames.end(); ++i)
3542 {
3543 cert_value branch((*i)());
3544 std::map<cert_value, epoch_data>::const_iterator j;
3545 j = epochs.find(branch);
3546 // set to zero any epoch which is not yet set
3547 if (j == epochs.end())
3548 {
3549 L(F("setting epoch on %s to zero\n") % branch);
3550 epochs.insert(std::make_pair(branch, epoch_zero));
3551 app.db.set_epoch(branch, epoch_zero);
3552 }
3553 // then insert all epochs into merkle tree
3554 j = epochs.find(branch);
3555 I(j != epochs.end());
3556 epoch_id eid;
3557 epoch_hash_code(j->first, j->second, eid);
3558 insert_into_merkle_tree(*etab, epoch_item, true, eid.inner(), 0);
3559 }
3560 }
3561
3562 {
3563 typedef std::vector< std::pair<hexenc<id>,
3564 std::pair<revision_id, rsa_keypair_id> > > cert_idx;
3565
3566 cert_idx idx;
3567 app.db.get_revision_cert_nobranch_index(idx);
3568
3569 // insert all non-branch certs reachable via these revisions
3570 // (branch certs were inserted earlier)
3571 for (cert_idx::const_iterator i = idx.begin(); i != idx.end(); ++i)
3572 {
3573 hexenc<id> const & hash = i->first;
3574 revision_id const & ident = i->second.first;
3575 rsa_keypair_id const & key = i->second.second;
3576
3577 if (revision_ids.find(ident) == revision_ids.end())
3578 continue;
3579
3580 insert_into_merkle_tree(*ctab, cert_item, true, hash, 0);
3581 ++certs_ticker;
3582 if (inserted_keys.find(key) == inserted_keys.end())
3583 inserted_keys.insert(key);
3584 }
3585 }
3586
3587 // add any keys specified on the command line
3588 for (vector<rsa_keypair_id>::const_iterator key = app.keys_to_push.begin();
3589 key != app.keys_to_push.end(); ++key)
3590 {
3591 if (inserted_keys.find(*key) == inserted_keys.end())
3592 {
3593 if (!app.db.public_key_exists(*key))
3594 {
3595 if (app.keys.key_pair_exists(*key))
3596 app.keys.ensure_in_database(*key);
3597 else
3598 W(F("Cannot find key '%s'") % *key);
3599 }
3600 inserted_keys.insert(*key);
3601 }
3602 }
3603 // insert all the keys
3604 for (set<rsa_keypair_id>::const_iterator key = inserted_keys.begin();
3605 key != inserted_keys.end(); key++)
3606 {
3607 if (app.db.public_key_exists(*key))
3608 {
3609 base64<rsa_pub_key> pub_encoded;
3610 app.db.get_key(*key, pub_encoded);
3611 hexenc<id> keyhash;
3612 key_hash_code(*key, pub_encoded, keyhash);
3613 insert_into_merkle_tree(*ktab, key_item, true, keyhash, 0);
3614 ++keys_ticker;
3615 }
3616 }
3617
3618 recalculate_merkle_codes(*etab, get_root_prefix().val, 0);
3619 recalculate_merkle_codes(*ktab, get_root_prefix().val, 0);
3620 recalculate_merkle_codes(*ctab, get_root_prefix().val, 0);
3621}
3622
3623void
3624run_netsync_protocol(protocol_voice voice,
3625 protocol_role role,
3626 utf8 const & addr,
3627 utf8 const & include_pattern,
3628 utf8 const & exclude_pattern,
3629 app_state & app)
3630{
3631 try
3632 {
3633 if (voice == server_voice)
3634 {
3635 serve_connections(role, include_pattern, exclude_pattern, app,
3636 addr, static_cast<Netxx::port_type>(constants::netsync_default_port),
3637 static_cast<unsigned long>(constants::netsync_timeout_seconds),
3638 static_cast<unsigned long>(constants::netsync_connection_limit));
3639 }
3640 else
3641 {
3642 I(voice == client_voice);
3643 transaction_guard guard(app.db);
3644 call_server(role, include_pattern, exclude_pattern, app,
3645 addr, static_cast<Netxx::port_type>(constants::netsync_default_port),
3646 static_cast<unsigned long>(constants::netsync_timeout_seconds));
3647 guard.commit();
3648 }
3649 }
3650 catch (Netxx::NetworkException & e)
3651 {
3652 throw informative_failure((F("network error: %s") % e.what()).str());
3653 }
3654 catch (Netxx::Exception & e)
3655 {
3656      throw oops((F("network error: %s") % e.what()).str());
3657 }
3658}
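
// a minimal usage sketch (not part of this file's call graph; the address
// and patterns are placeholders, and `app` is assumed to be a fully
// initialised app_state): a client both pulling and pushing everything
// from a server on the default port would boil down to something like
//
//   run_netsync_protocol(client_voice, source_and_sink_role,
//                        utf8("localhost"),
//                        utf8("*"), utf8(""),
//                        app);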
3659
3660
3661// Steps for determining files/manifests to request, from
3662// a given revision ancestry:
3663//
3664// 1) find the new heads, consume valid branch certs etc.
3665//
3666// 2) foreach new head, traverse up the revision ancestry, building
3667// a set of reverse file/manifest deltas (we stop when we hit an
3668// already-seen or existing-in-db rev).
3669//
3670// at the same time, build a (smaller) set of forward deltas (files and
3671// manifests). these have a file/manifest in the new head as the
3672// destination, and end up having an item already existing in the
3673// database as the source (or null, in which case full data is
3674// requested).
3675//
3676// 3) For each file/manifest in head, first request the forward delta
3677// (or full data if there is no path back to existing data). Then
3678// traverse up the set of reverse deltas, daisychaining our way until
3679// we get to existing revisions.
3680//
3681// Notes:
3682//
3683// - The database stores reverse deltas, so preferring these allows
3684// a server to send pre-computed deltas straight from the database
3685// (this isn't done yet). In order to bootstrap the tip-most data,
3686// forward deltas from a close(est?)-ancestor are used, or full data
3687// is requested if there is no existing ancestor.
3688//
3689// eg, if we have the (manifest) ancestry
3690// A -> B -> C -> D
3691// where A is existing, {B,C,D} are new, then we will request deltas
3692// A->D (fwd)
3693// D->C (rev)
3694// C->B (rev)
3695// This may result in slightly larger deltas than using all forward
3696 //     deltas; however, it should be more efficient.
3697//
3698// - in order to keep a good hit ratio with the reconstructed version
3699// cache in database, we'll request deltas for a single file/manifest
3700// all at once, rather than requesting deltas per-revision. This
3701// requires a bit more memory usage, though it will be less memory
3702// than would be required to store all the outgoing delta requests
3703// anyway.
3704ancestry_fetcher::ancestry_fetcher(session & s)
3705 : sess(s)
3706{
3707 set<revision_id> new_heads;
3708 sess.get_heads_and_consume_certs( new_heads );
3709
3710 L(F("ancestry_fetcher: got %d heads") % new_heads.size());
3711
3712 traverse_ancestry(new_heads);
3713
3714 request_files();
3715 request_manifests();
3716}
3717
3718// adds file deltas from the given changeset into the sets of forward
3719// and reverse deltas
3720void
3721ancestry_fetcher::traverse_files(change_set const & cset)
3722{
3723 for (change_set::delta_map::const_iterator d = cset.deltas.begin();
3724 d != cset.deltas.end(); ++d)
3725 {
3726 file_id parent_file (delta_entry_src(d));
3727 file_id child_file (delta_entry_dst(d));
3728 MM(parent_file);
3729 MM(child_file);
3730
3731 I(!(parent_file == child_file));
3732 // when changeset format is altered to have [...]->[] deltas on deletion,
3733 // this assertion needs revisiting
3734 I(!null_id(child_file));
3735
3736 // request the reverse delta
3737 if (!null_id(parent_file))
3738 {
3739 rev_file_deltas.insert(make_pair(child_file, parent_file));
3740 }
3741
3742 // add any new forward deltas
3743 if (seen_files.find(child_file) == seen_files.end())
3744 {
3745 fwd_file_deltas.insert( make_pair( parent_file, child_file ) );
3746 }
3747
3748 // update any forward deltas. no point updating if it already
3749 // points to something we have.
3750 if (!null_id(parent_file)
3751 && fwd_file_deltas.find(child_file) != fwd_file_deltas.end())
3752 {
3753 // We're traversing with child->parent of A->B.
3754 // Update any forward deltas with a parent of B to
3755 // have A as a parent, ie B->C becomes A->C.
3756 for (multimap<file_id,file_id>::iterator d =
3757 fwd_file_deltas.lower_bound(child_file);
3758 d != fwd_file_deltas.upper_bound(child_file);
3759 d++)
3760 {
3761 fwd_file_deltas.insert(make_pair(parent_file, d->second));
3762 }
3763
3764 fwd_file_deltas.erase(fwd_file_deltas.lower_bound(child_file),
3765 fwd_file_deltas.upper_bound(child_file));
3766 }
3767
3768 seen_files.insert(child_file);
3769 seen_files.insert(parent_file);
3770 }
3771}
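
// illustrative sketch of the forward-delta rekeying performed in the loop
// above, using a bare std::multimap and hypothetical ids A, B, C: while
// traversing the child->parent edge A->B, an existing forward delta B->C
// is rewritten to A->C. (inserting entries keyed on A while iterating the
// B range is safe because the invariant above guarantees A != B.)
//
//   std::multimap<file_id, file_id> fwd;
//   fwd.insert(std::make_pair(B, C));                    // B->C
//   for (std::multimap<file_id, file_id>::iterator i = fwd.lower_bound(B);
//        i != fwd.upper_bound(B); ++i)
//     fwd.insert(std::make_pair(A, i->second));          // add A->C
//   fwd.erase(fwd.lower_bound(B), fwd.upper_bound(B));   // drop B->C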
3772
3773// adds the given manifest deltas to the sets of forward and reverse deltas
3774void
3775ancestry_fetcher::traverse_manifest(manifest_id const & child_man,
3776 manifest_id const & parent_man)
3777{
3778 MM(child_man);
3779 MM(parent_man);
3780 I(!null_id(child_man));
3781 // add reverse deltas
3782 if (!null_id(parent_man))
3783 {
3784 rev_manifest_deltas.insert(make_pair(child_man, parent_man));
3785 }
3786
3787 // handle the manifest forward-deltas
3788 if (!null_id(parent_man)
3789      // don't update child to itself; that would make the loop iterate infinitely.
3790 && !(parent_man == child_man)
3791 && fwd_manifest_deltas.find(child_man) != fwd_manifest_deltas.end())
3792 {
3793 // We're traversing with child->parent of A->B.
3794 // Update any forward deltas with a parent of B to
3795 // have A as a parent, ie B->C becomes A->C.
3796 for (multimap<manifest_id,manifest_id>::iterator d =
3797 fwd_manifest_deltas.lower_bound(child_man);
3798 d != fwd_manifest_deltas.upper_bound(child_man);
3799 d++)
3800 {
3801 fwd_manifest_deltas.insert(make_pair(parent_man, d->second));
3802 }
3803
3804 fwd_manifest_deltas.erase(fwd_manifest_deltas.lower_bound(child_man),
3805 fwd_manifest_deltas.upper_bound(child_man));
3806 }
3807}
3808
3809// traverse up the ancestry for each of the given new head revisions,
3810// storing sets of file and manifest deltas
3811void
3812ancestry_fetcher::traverse_ancestry(set<revision_id> const & heads)
3813{
3814 deque<revision_id> frontier;
3815 set<revision_id> seen_revs;
3816
3817 for (set<revision_id>::const_iterator h = heads.begin();
3818 h != heads.end(); h++)
3819 {
3820 L(F("traversing head %s") % *h);
3821 frontier.push_back(*h);
3822 seen_revs.insert(*h);
3823 manifest_id const & m = sess.ancestry[*h]->second.new_manifest;
3824 fwd_manifest_deltas.insert(make_pair(m,m));
3825 }
3826
3827 // breadth first up the ancestry
3828 while (!frontier.empty())
3829 {
3830 revision_id const & rev = frontier.front();
3831 MM(rev);
3832
3833 I(sess.ancestry.find(rev) != sess.ancestry.end());
3834
3835 for (edge_map::const_iterator e = sess.ancestry[rev]->second.edges.begin();
3836 e != sess.ancestry[rev]->second.edges.end(); e++)
3837 {
3838 revision_id const & par = edge_old_revision(e);
3839 MM(par);
3840 if (seen_revs.find(par) == seen_revs.end())
3841 {
3842 if (sess.ancestry.find(par) != sess.ancestry.end())
3843 {
3844 frontier.push_back(par);
3845 }
3846 seen_revs.insert(par);
3847 }
3848
3849 traverse_manifest(sess.ancestry[rev]->second.new_manifest,
3850 edge_old_manifest(e));
3851 traverse_files(edge_changes(e));
3852
3853 }
3854
3855 sess.dbw.consume_revision_data(rev, sess.ancestry[rev]->first);
3856 frontier.pop_front();
3857 }
3858}
3859
3860void
3861ancestry_fetcher::request_rev_file_deltas(file_id const & start,
3862 set<file_id> & done_files)
3863{
3864 stack< file_id > frontier;
3865 frontier.push(start);
3866
3867 while (!frontier.empty())
3868 {
3869 file_id const child = frontier.top();
3870 MM(child);
3871 I(!null_id(child));
3872 frontier.pop();
3873
3874 for (multimap< file_id, file_id>::const_iterator
3875 d = rev_file_deltas.lower_bound(child);
3876 d != rev_file_deltas.upper_bound(child);
3877 d++)
3878 {
3879 file_id const & parent = d->second;
3880 MM(parent);
3881 I(!null_id(parent));
3882 if (done_files.find(parent) == done_files.end())
3883 {
3884 done_files.insert(parent);
3885 if (!sess.app.db.file_version_exists(parent))
3886 {
3887 sess.queue_send_delta_cmd(file_item,
3888 plain_id(child), plain_id(parent));
3889 sess.reverse_delta_requests.insert(make_pair(plain_id(child),
3890 plain_id(parent)));
3891 }
3892 frontier.push(parent);
3893 }
3894 }
3895 }
3896}
3897
3898void
3899ancestry_fetcher::request_files()
3900{
3901 // just a cache to avoid checking db.foo_version_exists() too much
3902 set<file_id> done_files;
3903
3904 for (multimap<file_id,file_id>::const_iterator d = fwd_file_deltas.begin();
3905 d != fwd_file_deltas.end(); d++)
3906 {
3907 file_id const & anc = d->first;
3908 file_id const & child = d->second;
3909 MM(anc);
3910 MM(child);
3911 if (!sess.app.db.file_version_exists(child))
3912 {
3913 if (null_id(anc)
3914 || !sess.app.db.file_version_exists(anc))
3915 {
3916 sess.queue_send_data_cmd(file_item, plain_id(child));
3917 }
3918 else
3919 {
3920 sess.queue_send_delta_cmd(file_item,
3921 plain_id(anc), plain_id(child));
3922 sess.note_item_full_delta(file_item, plain_id(child));
3923 }
3924 }
3925
3926 // traverse up the reverse deltas
3927 request_rev_file_deltas(child, done_files);
3928 }
3929}
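
// summary of the policy above (descriptive only): for each forward delta
// anc->child whose child we lack, request full data when the ancestor is
// null or also missing, otherwise request the forward delta anc->child
// (marked with note_item_full_delta, presumably so the received delta can
// be reconstituted into full data on arrival); then walk the reverse-delta
// chain from the child via request_rev_file_deltas().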
3930
3931void
3932ancestry_fetcher::request_rev_manifest_deltas(manifest_id const & start,
3933 set<manifest_id> & done_manifests)
3934{
3935 stack< manifest_id > frontier;
3936 frontier.push(start);
3937
3938 while (!frontier.empty())
3939 {
3940 manifest_id const child = frontier.top();
3941 MM(child);
3942 I(!null_id(child));
3943 frontier.pop();
3944
3945 for (multimap< manifest_id, manifest_id>::const_iterator
3946 d = rev_manifest_deltas.lower_bound(child);
3947 d != rev_manifest_deltas.upper_bound(child);
3948 d++)
3949 {
3950 manifest_id const & parent = d->second;
3951 MM(parent);
3952 I(!null_id(parent));
3953 if (done_manifests.find(parent) == done_manifests.end())
3954 {
3955 done_manifests.insert(parent);
3956 if (!sess.app.db.manifest_version_exists(parent))
3957 {
3958 sess.queue_send_delta_cmd(manifest_item,
3959 plain_id(child), plain_id(parent));
3960 sess.reverse_delta_requests.insert(make_pair(plain_id(child),
3961 plain_id(parent)));
3962 }
3963 frontier.push(parent);
3964 }
3965 }
3966 }
3967}
3968
3969 // this could be made a template function; it is the same as request_files(),
3970// though it calls non-template functions
3971void
3972ancestry_fetcher::request_manifests()
3973{
3974 // just a cache to avoid checking db.foo_version_exists() too much
3975 set<manifest_id> done_manifests;
3976
3977 for (multimap<manifest_id,manifest_id>::const_iterator d = fwd_manifest_deltas.begin();
3978 d != fwd_manifest_deltas.end(); d++)
3979 {
3980 manifest_id const & anc = d->first;
3981 manifest_id const & child = d->second;
3982 MM(anc);
3983 MM(child);
3984 if (!sess.app.db.manifest_version_exists(child))
3985 {
3986 if (null_id(anc)
3987 || !sess.app.db.manifest_version_exists(anc))
3988 {
3989 sess.queue_send_data_cmd(manifest_item, plain_id(child));
3990 }
3991 else
3992 {
3993 sess.queue_send_delta_cmd(manifest_item,
3994 plain_id(anc), plain_id(child));
3995 sess.note_item_full_delta(manifest_item, plain_id(child));
3996 }
3997 }
3998
3999 // traverse up the reverse deltas
4000 request_rev_manifest_deltas(child, done_manifests);
4001 }
4002}
