monotone Mtn Source Tree

Root/netsync.cc

1// copyright (C) 2004 graydon hoare <graydon@pobox.com>
2// all rights reserved.
3// licensed to the public under the terms of the GNU GPL (>= 2)
4// see the file COPYING for details
5
6#include <map>
7#include <string>
8#include <memory>
9#include <list>
10#include <deque>
11
12#include <time.h>
13
14#include <boost/lexical_cast.hpp>
15#include <boost/scoped_ptr.hpp>
16#include <boost/shared_ptr.hpp>
17#include <boost/bind.hpp>
18#include <boost/regex.hpp>
19
20#include "app_state.hh"
21#include "cert.hh"
22#include "constants.hh"
23#include "keys.hh"
24#include "merkle_tree.hh"
25#include "netcmd.hh"
26#include "netio.hh"
27#include "netsync.hh"
28#include "numeric_vocab.hh"
29#include "packet.hh"
30#include "sanity.hh"
31#include "transforms.hh"
32#include "ui.hh"
33#include "xdelta.hh"
34#include "epoch.hh"
35#include "platform.hh"
36#include "hmac.hh"
37#include "globish.hh"
38
39#include "botan/botan.h"
40
41#include "netxx/address.h"
42#include "netxx/peer.h"
43#include "netxx/probe.h"
44#include "netxx/socket.h"
45#include "netxx/stream.h"
46#include "netxx/streamserver.h"
47#include "netxx/timeout.h"
48
49// TODO: things to do that will break protocol compatibility
50// -- need some way to upgrade anonymous to keyed pull, without the user having
51// to explicitly specify which they want
52// just having a way to respond "access denied, try again" might work
53// but perhaps better to have the anonymous command include a note "I
54// _could_ use key <...> if you prefer", and if that would lead to more
55// access, could reply "I do prefer". (Does this lead to too much
56// information exposure? Allows anonymous people to probe what branches
57// a key has access to.)
58// -- "warning" packet type?
59// -- Richard Levitte wants, when you (e.g.) request '*' but don't have access to
60// all of it, you just get the parts you have access to (maybe with
61// warnings about skipped branches). to do this right, should have a way
62// for the server to send back to the client "right, you're not getting
63// the following branches: ...", so the client will not include them in
64// its merkle trie.
65// -- add some sort of vhost field to the client's first packet, saying who
66// they expect to talk to
67// -- connection teardown is flawed:
68// -- simple bug: often connections "fail" even though they succeeded.
69// should figure out why. (Possibly one side doesn't wait for their
70// goodbye packet to drain before closing the socket?)
71// -- subtle misdesign: "goodbye" packets indicate completion of data
72// transfer. they do not indicate that data has been written to
73// disk. there should be some way to indicate that data has been
74// successfully written to disk. See message (and thread)
75// <E0420553-34F3-45E8-9DA4-D8A5CB9B0600@hsdev.com> on
76// monotone-devel.
77// -- apparently we have an IANA-approved port: 4691. I guess we should
78// switch to using that.
79// (It's registered under the name "netsync". "monotone" would probably
80// be better, but I don't know how possible it is to change this...)
81
82//
83// this is the "new" network synchronization (netsync) system in
84// monotone. it is based on synchronizing a pair of merkle trees over an
85// interactive connection.
86//
87// a netsync process between peers treats each peer as either a source, a
88// sink, or both. when a peer is only a source, it will not write any new
89// items to its database. when a peer is only a sink, it will not send any
90// items from its database. when a peer is both a source and sink, it may
91// send and write items freely.
92//
93// the post-state of a netsync is that each sink contains a superset of the
94// items in its corresponding source; when peers are behaving as both
95// source and sink, this means that the post-state of the sync is for the
96// peers to have identical item sets.
97//
98// a peer can be a sink in at most one netsync process at a time; it can
99// however be a source for multiple netsyncs simultaneously.
100//
101//
102// data structure
103// --------------
104//
105// each node in a merkle tree contains a fixed number of slots. this number
106// is derived from a global parameter of the protocol -- the tree fanout --
107// such that the number of slots is 2^fanout. for now we will assume that
108// fanout is 4 thus there are 16 slots in a node, because this makes
109// illustration easier. the other parameter of the protocol is the size of
110// a hash; we use SHA1 so the hash is 20 bytes (160 bits) long.
111//
112// each slot in a merkle tree node is in one of 4 states:
113//
114// - empty
115// - live leaf
116// - dead leaf
117// - subtree
118//
119// in addition, each live or dead leaf contains a hash code which
120// identifies an element of the set being synchronized. each subtree slot
121// contains a hash code of the node immediately beneath it in the merkle
122// tree. empty slots contain no hash codes.
123//
124// each node also summarizes, for sake of statistic-gathering, the number
125// of set elements and total number of bytes in all of its subtrees, each
126// stored as a size_t and sent as a uleb128.
127//
128// since empty slots have no hash code, they are represented implicitly by
129// a bitmap at the head of each merkle tree node. as an additional
130// integrity check, each merkle tree node contains a label indicating its
131// prefix in the tree, and a hash of its own contents.
132//
133// in total, then, the byte-level representation of a <160,4> merkle tree
134// node is as follows:
135//
136// 20 bytes - hash of the remaining bytes in the node
137// 1 byte - type of this node (manifest, file, key, mcert, fcert)
138// 1-N bytes - level of this node in the tree (0 == "root", uleb128)
139// 0-20 bytes - the prefix of this node, 4 bits * level,
140// rounded up to a byte
141// 1-N bytes - number of leaves under this node (uleb128)
142// 4 bytes - slot-state bitmap of the node
143// 0-320 bytes - between 0 and 16 live slots in the node
144//
145// so, in the worst case such a node is 367 bytes, with these parameters.
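//
// (as an illustrative check of that worst case, assuming the level and
// leaf-count uleb128s each occupy a single byte:
// 20 + 1 + 1 + 20 + 1 + 4 + (16 * 20) = 367 bytes.)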
146//
147//
148// protocol
149// --------
150//
151// The protocol is a simple binary command-packet system over TCP;
152// each packet consists of a single byte which identifies the protocol
153// version, a byte which identifies the command name inside that
154// version, a size_t sent as a uleb128 indicating the length of the
155// packet, that many bytes of payload, and finally 20 bytes of SHA-1
156// HMAC calculated over the payload. The key for the SHA-1 HMAC is 20
157// bytes of 0 during authentication, and a 20-byte random key chosen
158// by the client after authentication (discussed below).
159// decoding involves simply buffering until a sufficient number of bytes are
160// received, then advancing the buffer pointer. any time an integrity check
161// (the HMAC) fails, the protocol is assumed to have lost synchronization, and
162// the connection is dropped. the parties are free to drop the tcp stream at
163// any point, if too much data is received or too much idle time passes; no
164// commitments or transactions are made.
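//
// (restating the framing above in the layout notation used earlier --
// the actual encoding and decoding is handled by the netcmd class:)
//
// 1 byte - protocol version
// 1 byte - command code
// 1-N bytes - payload length L (uleb128)
// L bytes - payload
// 20 bytes - SHA-1 HMAC over the payload, keyed as described above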
165//
166// one special command, "bye", is used to shut down a connection
167// gracefully. once each side has received all the data they want, they
168// can send a "bye" command to the other side. as soon as either side has
169// both sent and received a "bye" command, they drop the connection. if
170// either side sees an i/o failure (dropped connection) after they have
171// sent a "bye" command, they consider the shutdown successful.
172//
173// the exchange begins in a non-authenticated state. the server sends a
174// "hello <id> <nonce>" command, which identifies the server's RSA key and
175// issues a nonce which must be used for a subsequent authentication.
176//
177// The client then responds with either:
178//
179// An "auth (source|sink|both) <include_pattern> <exclude_pattern> <id>
180// <nonce1> <hmac key> <sig>" command, which identifies its RSA key, notes the
181// role it wishes to play in the synchronization, identifies the pattern it
182// wishes to sync with, signs the previous nonce with its own key, and informs
183// the server of the HMAC key it wishes to use for this session (encrypted
184// with the server's public key); or
185//
186// An "anonymous (source|sink|both) <include_pattern> <exclude_pattern>
187// <hmac key>" command, which identifies the role it wishes to play in the
188// synchronization, the pattern it wishes to sync with, and the HMAC key it
189// wishes to use for this session (also encrypted with the server's public
190// key).
191//
192// The server then replies with a "confirm" command, which contains no
193// other data but will only have the correct HMAC integrity code if
194// the server received and properly decrypted the HMAC key offered by
195// the client. This transitions the peers into an authenticated state
196// and begins refinement.
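//
// summarizing the handshake described above as a message sequence
// (field names are informal, not the literal wire encoding):
//
// server -> client : hello <server key> <nonce1>
// client -> server : auth (source|sink|both) <include> <exclude>
// <client key id> <nonce1> <hmac key> <sig>
// -- or -- anonymous (source|sink|both) <include> <exclude> <hmac key>
// server -> client : confirm (empty payload; a correct HMAC proves the
// server recovered the client's HMAC key)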
197//
198// refinement begins with the client sending its root public key and
199// manifest certificate merkle nodes to the server. the server then
200// compares the root to each slot in *its* root node, and for each slot
201// either sends refined subtrees to the client, or (if it detects a missing
202// item in one pattern or the other) sends either "data" or "send_data"
203// commands corresponding to the role of the missing item (source or
204// sink). the client then receives each refined subtree and compares it
205// with its own, performing similar description/request behavior depending
206// on role, and the cycle continues.
207//
208// detecting the end of refinement is subtle: after sending the refinement
209// of the root node, the server sends a "done 0" command (queued behind all
210// the other refinement traffic). when either peer receives a "done N"
211// command it immediately responds with a "done N+1" command. when two done
212// commands for a given merkle tree arrive with no intervening refinements,
213// the entire merkle tree is considered complete.
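//
// an illustrative tail of that exchange for one item type (not a trace
// of a real session) might be:
//
// server -> client : <refinements ...> done 0
// client -> server : <refinements ...> done 1
// server -> client : done 2 (no refinements since done 0, so the
// client marks this tree complete)
// client -> server : done 3 (echoed once more; the server likewise
// marks the tree complete)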
214//
215// any "send_data" command received prompts a "data" command in response,
216// if the requested item exists. if an item does not exist, a "nonexistant"
217// response command is sent.
218//
219// once a response is received for each requested key and revision cert
220// (either data or nonexistant) the requesting party walks the graph of
221// received revision certs and transmits send_data or send_delta commands
222// for all the revisions mentioned in the certs which it does not already
223// have in its database.
224//
225// for each revision it receives, the recipient requests all the file data or
226// deltas described in that revision.
227//
228// once all requested files, manifests, revisions and certs are received (or
229// noted as nonexistant), the recipient closes its connection.
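//
// so, in rough outline, the sink side of an exchange proceeds as (a
// condensed restatement of the above, not an additional protocol step):
//
// 1. refine epochs, keys and certs, requesting any that are missing.
// 2. walk the received revision certs and request missing revisions.
// 3. for each received revision, request the manifests and files it
// describes, as full data or as deltas against versions already held.
// 4. once every request is answered (with data or nonexistant),
// say goodbye and close the connection.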
230//
231// (aside: this protocol is raw binary because coding density is actually
232// important here, and each packet consists of very information-dense
233// material that you wouldn't have a hope of typing in manually anyway)
234//
235
236using namespace std;
237using boost::shared_ptr;
238using boost::lexical_cast;
239
240static inline void
241require(bool check, string const & context)
242{
243 if (!check)
244 throw bad_decode(F("check of '%s' failed") % context);
245}
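// this helper is used by the packet processors to validate decoded
// fields; for example (a hypothetical check, not one quoted from the
// code below): require(dat.size() != 0, "nonempty payload"). a failed
// check throws bad_decode, signalling a protocol error.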
246
247struct
248done_marker
249{
250 bool current_level_had_refinements;
251 bool tree_is_done;
252 done_marker() :
253 current_level_had_refinements(false),
254 tree_is_done(false)
255 {}
256};
257
258struct
259session
260{
261 protocol_role role;
262 protocol_voice const voice;
263 utf8 const & our_include_pattern;
264 utf8 const & our_exclude_pattern;
265 globish_matcher our_matcher;
266 app_state & app;
267
268 string peer_id;
269 Netxx::socket_type fd;
270 Netxx::Stream str;
271
272 string_queue inbuf;
273 // deque of pair<string data, size_t cur_pos>
274 deque< pair<string,size_t> > outbuf;
275 // the total data stored in outbuf - this is
276 // used as a valve to stop too much data
277 // backing up
278 size_t outbuf_size;
279
280 netcmd cmd;
281 bool armed;
282 bool arm();
283
284 id remote_peer_key_hash;
285 rsa_keypair_id remote_peer_key_name;
286 netsync_session_key session_key;
287 chained_hmac read_hmac;
288 chained_hmac write_hmac;
289 bool authenticated;
290
291 time_t last_io_time;
292 auto_ptr<ticker> byte_in_ticker;
293 auto_ptr<ticker> byte_out_ticker;
294 auto_ptr<ticker> cert_in_ticker;
295 auto_ptr<ticker> cert_out_ticker;
296 auto_ptr<ticker> revision_in_ticker;
297 auto_ptr<ticker> revision_out_ticker;
298 auto_ptr<ticker> revision_checked_ticker;
299
300 vector<revision_id> written_revisions;
301 vector<rsa_keypair_id> written_keys;
302 vector<cert> written_certs;
303
304 map<netcmd_item_type, boost::shared_ptr<merkle_table> > merkle_tables;
305
306 map<netcmd_item_type, done_marker> done_refinements;
307 map<netcmd_item_type, boost::shared_ptr< set<id> > > requested_items;
308 map<netcmd_item_type, boost::shared_ptr< set<id> > > received_items;
309 map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestry;
310 map<revision_id, map<cert_name, vector<cert> > > received_certs;
311 set< pair<id, id> > reverse_delta_requests;
312 bool analyzed_ancestry;
313
314 id saved_nonce;
315 bool received_goodbye;
316 bool sent_goodbye;
317
318 packet_db_valve dbw;
319
320 session(protocol_role role,
321 protocol_voice voice,
322 utf8 const & our_include_pattern,
323 utf8 const & our_exclude_pattern,
324 app_state & app,
325 string const & peer,
326 Netxx::socket_type sock,
327 Netxx::Timeout const & to);
328
329 virtual ~session();
330
331 void rev_written_callback(revision_id rid);
332 void key_written_callback(rsa_keypair_id kid);
333 void cert_written_callback(cert const & c);
334
335 id mk_nonce();
336 void mark_recent_io();
337
338 void set_session_key(string const & key);
339 void set_session_key(rsa_oaep_sha_data const & key_encrypted);
340
341 void setup_client_tickers();
342
343 bool done_all_refinements();
344 bool cert_refinement_done();
345 bool all_requested_revisions_received();
346
347 void note_item_requested(netcmd_item_type ty, id const & i);
348 bool item_already_requested(netcmd_item_type ty, id const & i);
349 void note_item_arrived(netcmd_item_type ty, id const & i);
350
351 void maybe_note_epochs_finished();
352
353 void note_item_sent(netcmd_item_type ty, id const & i);
354
355 bool got_all_data();
356 void maybe_say_goodbye();
357
358 void analyze_attachment(revision_id const & i,
359 set<revision_id> & visited,
360 map<revision_id, bool> & attached);
361 void request_rev_revisions(revision_id const & init,
362 map<revision_id, bool> attached,
363 set<revision_id> visited);
364 void request_fwd_revisions(revision_id const & i,
365 map<revision_id, bool> attached,
366 set<revision_id> & visited);
367 void analyze_ancestry_graph();
368 void analyze_manifest(manifest_map const & man);
369
370 Netxx::Probe::ready_type which_events() const;
371 bool read_some();
372 bool write_some();
373
374 bool encountered_error;
375 void error(string const & errmsg);
376
377 void write_netcmd_and_try_flush(netcmd const & cmd);
378 void queue_bye_cmd();
379 void queue_error_cmd(string const & errmsg);
380 void queue_done_cmd(size_t level, netcmd_item_type type);
381 void queue_hello_cmd(id const & server,
382 id const & nonce);
383 void queue_anonymous_cmd(protocol_role role,
384 utf8 const & include_pattern,
385 utf8 const & exclude_pattern,
386 id const & nonce2,
387 base64<rsa_pub_key> server_key_encoded);
388 void queue_auth_cmd(protocol_role role,
389 utf8 const & include_pattern,
390 utf8 const & exclude_pattern,
391 id const & client,
392 id const & nonce1,
393 id const & nonce2,
394 string const & signature,
395 base64<rsa_pub_key> server_key_encoded);
396 void queue_confirm_cmd();
397 void queue_refine_cmd(merkle_node const & node);
398 void queue_send_data_cmd(netcmd_item_type type,
399 id const & item);
400 void queue_send_delta_cmd(netcmd_item_type type,
401 id const & base,
402 id const & ident);
403 void queue_data_cmd(netcmd_item_type type,
404 id const & item,
405 string const & dat);
406 void queue_delta_cmd(netcmd_item_type type,
407 id const & base,
408 id const & ident,
409 delta const & del);
410 void queue_nonexistant_cmd(netcmd_item_type type,
411 id const & item);
412
413 bool process_bye_cmd();
414 bool process_error_cmd(string const & errmsg);
415 bool process_done_cmd(size_t level, netcmd_item_type type);
416 bool process_hello_cmd(rsa_keypair_id const & server_keyname,
417 rsa_pub_key const & server_key,
418 id const & nonce);
419 bool process_anonymous_cmd(protocol_role role,
420 utf8 const & their_include_pattern,
421 utf8 const & their_exclude_pattern);
422 bool process_auth_cmd(protocol_role role,
423 utf8 const & their_include_pattern,
424 utf8 const & their_exclude_pattern,
425 id const & client,
426 id const & nonce1,
427 string const & signature);
428 bool process_confirm_cmd(string const & signature);
429 void respond_to_confirm_cmd();
430 bool process_refine_cmd(merkle_node const & node);
431 bool process_send_data_cmd(netcmd_item_type type,
432 id const & item);
433 bool process_send_delta_cmd(netcmd_item_type type,
434 id const & base,
435 id const & ident);
436 bool process_data_cmd(netcmd_item_type type,
437 id const & item,
438 string const & dat);
439 bool process_delta_cmd(netcmd_item_type type,
440 id const & base,
441 id const & ident,
442 delta const & del);
443 bool process_nonexistant_cmd(netcmd_item_type type,
444 id const & item);
445 bool process_usher_cmd(utf8 const & msg);
446
447 bool merkle_node_exists(netcmd_item_type type,
448 size_t level,
449 prefix const & pref);
450
451 void load_merkle_node(netcmd_item_type type,
452 size_t level,
453 prefix const & pref,
454 merkle_ptr & node);
455
456 void rebuild_merkle_trees(app_state & app,
457 set<utf8> const & branches);
458
459 bool dispatch_payload(netcmd const & cmd);
460 void begin_service();
461 bool process();
462};
463
464
465struct
466root_prefix
467{
468 prefix val;
469 root_prefix() : val("")
470 {}
471};
472
473static root_prefix const &
474get_root_prefix()
475{
476 // this is not a static variable for a bizarre reason: mac OSX runs
477 // static initializers in the "wrong" order (application before
478 // libraries), so the initializer for a static string in cryptopp runs
479 // after the initializer for a static variable outside a function
480 // here. therefore encode_hexenc() fails in the static initializer here
481 // and the program crashes. curious, eh?
482 static root_prefix ROOT_PREFIX;
483 return ROOT_PREFIX;
484}
485
486
487session::session(protocol_role role,
488 protocol_voice voice,
489 utf8 const & our_include_pattern,
490 utf8 const & our_exclude_pattern,
491 app_state & app,
492 string const & peer,
493 Netxx::socket_type sock,
494 Netxx::Timeout const & to) :
495 role(role),
496 voice(voice),
497 our_include_pattern(our_include_pattern),
498 our_exclude_pattern(our_exclude_pattern),
499 our_matcher(our_include_pattern, our_exclude_pattern),
500 app(app),
501 peer_id(peer),
502 fd(sock),
503 str(sock, to),
504 inbuf(),
505 outbuf_size(0),
506 armed(false),
507 remote_peer_key_hash(""),
508 remote_peer_key_name(""),
509 session_key(constants::netsync_key_initializer),
510 read_hmac(constants::netsync_key_initializer),
511 write_hmac(constants::netsync_key_initializer),
512 authenticated(false),
513 last_io_time(::time(NULL)),
514 byte_in_ticker(NULL),
515 byte_out_ticker(NULL),
516 cert_in_ticker(NULL),
517 cert_out_ticker(NULL),
518 revision_in_ticker(NULL),
519 revision_out_ticker(NULL),
520 revision_checked_ticker(NULL),
521 analyzed_ancestry(false),
522 saved_nonce(""),
523 received_goodbye(false),
524 sent_goodbye(false),
525 dbw(app, true),
526 encountered_error(false)
527{
528 dbw.set_on_revision_written(boost::bind(&session::rev_written_callback,
529 this, _1));
530 dbw.set_on_cert_written(boost::bind(&session::cert_written_callback,
531 this, _1));
532 dbw.set_on_pubkey_written(boost::bind(&session::key_written_callback,
533 this, _1));
534
535 done_refinements.insert(make_pair(cert_item, done_marker()));
536 done_refinements.insert(make_pair(key_item, done_marker()));
537 done_refinements.insert(make_pair(epoch_item, done_marker()));
538
539 requested_items.insert(make_pair(cert_item, boost::shared_ptr< set<id> >(new set<id>())));
540 requested_items.insert(make_pair(key_item, boost::shared_ptr< set<id> >(new set<id>())));
541 requested_items.insert(make_pair(revision_item, boost::shared_ptr< set<id> >(new set<id>())));
542 requested_items.insert(make_pair(manifest_item, boost::shared_ptr< set<id> >(new set<id>())));
543 requested_items.insert(make_pair(file_item, boost::shared_ptr< set<id> >(new set<id>())));
544 requested_items.insert(make_pair(epoch_item, boost::shared_ptr< set<id> >(new set<id>())));
545
546 received_items.insert(make_pair(cert_item, boost::shared_ptr< set<id> >(new set<id>())));
547 received_items.insert(make_pair(key_item, boost::shared_ptr< set<id> >(new set<id>())));
548 received_items.insert(make_pair(revision_item, boost::shared_ptr< set<id> >(new set<id>())));
549 received_items.insert(make_pair(manifest_item, boost::shared_ptr< set<id> >(new set<id>())));
550 received_items.insert(make_pair(file_item, boost::shared_ptr< set<id> >(new set<id>())));
551 received_items.insert(make_pair(epoch_item, boost::shared_ptr< set<id> >(new set<id>())));
552}
553
554session::~session()
555{
556 vector<cert> unattached_certs;
557 map<revision_id, vector<cert> > revcerts;
558 for (vector<revision_id>::iterator i = written_revisions.begin();
559 i != written_revisions.end(); ++i)
560 revcerts.insert(make_pair(*i, vector<cert>()));
561 for (vector<cert>::iterator i = written_certs.begin();
562 i != written_certs.end(); ++i)
563 {
564 map<revision_id, vector<cert> >::iterator j;
565 j = revcerts.find(i->ident);
566 if (j == revcerts.end())
567 unattached_certs.push_back(*i);
568 else
569 j->second.push_back(*i);
570 }
571
572 //Keys
573 for (vector<rsa_keypair_id>::iterator i = written_keys.begin();
574 i != written_keys.end(); ++i)
575 {
576 app.lua.hook_note_netsync_pubkey_received(*i);
577 }
578 //Revisions
579 for (vector<revision_id>::iterator i = written_revisions.begin();
580 i != written_revisions.end(); ++i)
581 {
582 vector<cert> & ctmp(revcerts[*i]);
583 set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > certs;
584 for (vector<cert>::const_iterator j = ctmp.begin();
585 j != ctmp.end(); ++j)
586 {
587 cert_value vtmp;
588 decode_base64(j->value, vtmp);
589 certs.insert(make_pair(j->key, make_pair(j->name, vtmp)));
590 }
591 revision_data rdat;
592 app.db.get_revision(*i, rdat);
593 app.lua.hook_note_netsync_revision_received(*i, rdat, certs);
594 }
595 //Certs (not attached to a new revision)
596 for (vector<cert>::iterator i = unattached_certs.begin();
597 i != unattached_certs.end(); ++i)
598 {
599 cert_value tmp;
600 decode_base64(i->value, tmp);
601 app.lua.hook_note_netsync_cert_received(i->ident, i->key,
602 i->name, tmp);
603
604 }
605}
606
607void session::rev_written_callback(revision_id rid)
608{
609 if (revision_checked_ticker.get())
610 ++(*revision_checked_ticker);
611 written_revisions.push_back(rid);
612}
613
614void session::key_written_callback(rsa_keypair_id kid)
615{
616 written_keys.push_back(kid);
617}
618
619void session::cert_written_callback(cert const & c)
620{
621 written_certs.push_back(c);
622}
623
624id
625session::mk_nonce()
626{
627 I(this->saved_nonce().size() == 0);
628 char buf[constants::merkle_hash_length_in_bytes];
629 Botan::Global_RNG::randomize(reinterpret_cast<Botan::byte *>(buf),
630 constants::merkle_hash_length_in_bytes);
631 this->saved_nonce = string(buf, buf + constants::merkle_hash_length_in_bytes);
632 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
633 return this->saved_nonce;
634}
635
636void
637session::mark_recent_io()
638{
639 last_io_time = ::time(NULL);
640}
641
642void
643session::set_session_key(string const & key)
644{
645 session_key = netsync_session_key(key);
646 read_hmac.set_key(session_key);
647 write_hmac.set_key(session_key);
648}
649
650void
651session::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted)
652{
653 base64< arc4<rsa_priv_key> > our_priv;
654 load_priv_key(app, app.signing_key, our_priv);
655 string hmac_key;
656 decrypt_rsa(app.lua, app.signing_key, our_priv, hmac_key_encrypted, hmac_key);
657 set_session_key(hmac_key);
658}
659
660void
661session::setup_client_tickers()
662{
663 // xgettext: please use short message and try to avoid multibytes chars
664 byte_in_ticker.reset(new ticker(_("bytes in"), ">", 1024, true));
665 // xgettext: please use short message and try to avoid multibytes chars
666 byte_out_ticker.reset(new ticker(_("bytes out"), "<", 1024, true));
667 if (role == sink_role)
668 {
669 // xgettext: please use short message and try to avoid multibytes chars
670 revision_checked_ticker.reset(new ticker(_("revs written"), "w", 1));
671 // xgettext: please use short message and try to avoid multibytes chars
672 cert_in_ticker.reset(new ticker(_("certs in"), "c", 3));
673 // xgettext: please use short message and try to avoid multibytes chars
674 revision_in_ticker.reset(new ticker(_("revs in"), "r", 1));
675 }
676 else if (role == source_role)
677 {
678 // xgettext: please use short message and try to avoid multibytes chars
679 cert_out_ticker.reset(new ticker(_("certs out"), "C", 3));
680 // xgettext: please use short message and try to avoid multibytes chars
681 revision_out_ticker.reset(new ticker(_("revs out"), "R", 1));
682 }
683 else
684 {
685 I(role == source_and_sink_role);
686 // xgettext: please use short message and try to avoid multibytes chars
687 revision_checked_ticker.reset(new ticker(_("revs written"), "w", 1));
688 // xgettext: please use short message and try to avoid multibytes chars
689 revision_in_ticker.reset(new ticker(_("revs in"), "r", 1));
690 // xgettext: please use short message and try to avoid multibytes chars
691 revision_out_ticker.reset(new ticker(_("revs out"), "R", 1));
692 }
693}
694
695bool
696session::done_all_refinements()
697{
698 bool all = true;
699 for (map<netcmd_item_type, done_marker>::const_iterator j =
700 done_refinements.begin(); j != done_refinements.end(); ++j)
701 {
702 if (j->second.tree_is_done == false)
703 all = false;
704 }
705 return all;
706}
707
708
709bool
710session::cert_refinement_done()
711{
712 return done_refinements[cert_item].tree_is_done;
713}
714
715bool
716session::got_all_data()
717{
718 for (map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator i =
719 requested_items.begin(); i != requested_items.end(); ++i)
720 {
721 if (! i->second->empty())
722 return false;
723 }
724 return true;
725}
726
727bool
728session::all_requested_revisions_received()
729{
730 map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
731 i = requested_items.find(revision_item);
732 I(i != requested_items.end());
733 return i->second->empty();
734}
735
736void
737session::maybe_note_epochs_finished()
738{
739 map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
740 i = requested_items.find(epoch_item);
741 I(i != requested_items.end());
742 // Maybe there are outstanding epoch requests.
743 if (!i->second->empty())
744 return;
745 // And maybe we haven't even finished the refinement.
746 if (!done_refinements[epoch_item].tree_is_done)
747 return;
748 // But otherwise, we're ready to go!
749 L(F("all epochs processed, opening database valve\n"));
750 this->dbw.open_valve();
751}
752
753void
754session::note_item_requested(netcmd_item_type ty, id const & ident)
755{
756 map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
757 i = requested_items.find(ty);
758 I(i != requested_items.end());
759 i->second->insert(ident);
760}
761
762void
763session::note_item_arrived(netcmd_item_type ty, id const & ident)
764{
765 map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
766 i = requested_items.find(ty);
767 I(i != requested_items.end());
768 i->second->erase(ident);
769 map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
770 j = received_items.find(ty);
771 I(j != received_items.end());
772 j->second->insert(ident);
773
774
775 switch (ty)
776 {
777 case cert_item:
778 if (cert_in_ticker.get() != NULL)
779 ++(*cert_in_ticker);
780 break;
781 case revision_item:
782 if (revision_in_ticker.get() != NULL)
783 ++(*revision_in_ticker);
784 break;
785 default:
786 // No ticker for other things.
787 break;
788 }
789}
790
791bool
792session::item_already_requested(netcmd_item_type ty, id const & ident)
793{
794 map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator i;
795 i = requested_items.find(ty);
796 I(i != requested_items.end());
797 if (i->second->find(ident) != i->second->end())
798 return true;
799 i = received_items.find(ty);
800 I(i != received_items.end());
801 if (i->second->find(ident) != i->second->end())
802 return true;
803 return false;
804}
805
806
807void
808session::note_item_sent(netcmd_item_type ty, id const & ident)
809{
810 switch (ty)
811 {
812 case cert_item:
813 if (cert_out_ticker.get() != NULL)
814 ++(*cert_out_ticker);
815 break;
816 case revision_item:
817 if (revision_out_ticker.get() != NULL)
818 ++(*revision_out_ticker);
819 break;
820 default:
821 // No ticker for other things.
822 break;
823 }
824}
825
826void
827session::write_netcmd_and_try_flush(netcmd const & cmd)
828{
829 if (!encountered_error)
830 {
831 string buf;
832 cmd.write(buf, write_hmac);
833 outbuf.push_back(make_pair(buf, 0));
834 outbuf_size += buf.size();
835 }
836 else
837 L(F("dropping outgoing netcmd (because we're in error unwind mode)\n"));
838 // FIXME: this helps keep the protocol pipeline full but it seems to
839 // interfere with initial and final sequences. careful with it.
840 // write_some();
841 // read_some();
842}
843
844// This method triggers a special "error unwind" mode in netsync. In this
845// mode, all received data is ignored, and no new data is queued. We simply
846// stay connected long enough for the current write buffer to be flushed, to
847// ensure that our peer receives the error message.
848// WARNING WARNING WARNING (FIXME): this does _not_ throw an exception. if
849// while processing any given netcmd packet you encounter an error, you can
850// _only_ call this method if you have not touched the database, because if
851// you have touched the database then you need to throw an exception to
852// trigger a rollback.
853// you could, of course, call this method and then throw an exception, but
854// there is no point in doing that, because throwing the exception will cause
855// the connection to be immediately terminated, so your call to error() will
856// actually have no effect (except to cause your error message to be printed
857// twice).
858void
859session::error(std::string const & errmsg)
860{
861 W(F("error: %s\n") % errmsg);
862 queue_error_cmd(errmsg);
863 encountered_error = true;
864}
865
866void
867session::analyze_manifest(manifest_map const & man)
868{
869 L(F("analyzing %d entries in manifest\n") % man.size());
870 for (manifest_map::const_iterator i = man.begin();
871 i != man.end(); ++i)
872 {
873 if (! this->app.db.file_version_exists(manifest_entry_id(i)))
874 {
875 id tmp;
876 decode_hexenc(manifest_entry_id(i).inner(), tmp);
877 queue_send_data_cmd(file_item, tmp);
878 }
879 }
880}
881
882static bool
883is_attached(revision_id const & i,
884 map<revision_id, bool> const & attach_map)
885{
886 map<revision_id, bool>::const_iterator j = attach_map.find(i);
887 I(j != attach_map.end());
888 return j->second;
889}
890
891// this tells us whether a particular revision is "attached" -- meaning
892// either our database contains the underlying manifest or else one of its
893// parents (recursively, and only in the current ancestry graph we're
894// requesting) is attached. if it's detached we will request it using a
895// different (more efficient and less failure-prone) algorithm.
896
897void
898session::analyze_attachment(revision_id const & i,
899 set<revision_id> & visited,
900 map<revision_id, bool> & attached)
901{
902 typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;
903
904 if (visited.find(i) != visited.end())
905 return;
906
907 visited.insert(i);
908
909 bool curr_attached = false;
910
911 if (app.db.revision_exists(i))
912 {
913 L(F("revision %s is attached via database\n") % i);
914 curr_attached = true;
915 }
916 else
917 {
918 L(F("checking attachment of %s in ancestry\n") % i);
919 ancestryT::const_iterator j = ancestry.find(i);
920 if (j != ancestry.end())
921 {
922 for (edge_map::const_iterator k = j->second->second.edges.begin();
923 k != j->second->second.edges.end(); ++k)
924 {
925 L(F("checking attachment of %s in parent %s\n") % i % edge_old_revision(k));
926 analyze_attachment(edge_old_revision(k), visited, attached);
927 if (is_attached(edge_old_revision(k), attached))
928 {
929 L(F("revision %s is attached via parent %s\n") % i % edge_old_revision(k));
930 curr_attached = true;
931 }
932 }
933 }
934 }
935 if (curr_attached)
936 L(F("decided that revision '%s' is attached\n") % i);
937 else
938 L(F("decided that revision '%s' is not attached\n") % i);
939
940 attached[i] = curr_attached;
941}
942
943inline static id
944plain_id(manifest_id const & i)
945{
946 id tmp;
947 hexenc<id> htmp(i.inner());
948 decode_hexenc(htmp, tmp);
949 return tmp;
950}
951
952inline static id
953plain_id(file_id const & i)
954{
955 id tmp;
956 hexenc<id> htmp(i.inner());
957 decode_hexenc(htmp, tmp);
958 return tmp;
959}
960
961void
962session::request_rev_revisions(revision_id const & init,
963 map<revision_id, bool> attached,
964 set<revision_id> visited)
965{
966 typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;
967
968 set<manifest_id> seen_manifests;
969 set<file_id> seen_files;
970
971 set<revision_id> frontier;
972 frontier.insert(init);
973 while(!frontier.empty())
974 {
975 set<revision_id> next_frontier;
976 for (set<revision_id>::const_iterator i = frontier.begin();
977 i != frontier.end(); ++i)
978 {
979 if (is_attached(*i, attached))
980 continue;
981
982 if (visited.find(*i) != visited.end())
983 continue;
984
985 visited.insert(*i);
986
987 ancestryT::const_iterator j = ancestry.find(*i);
988 if (j != ancestry.end())
989 {
990
991 for (edge_map::const_iterator k = j->second->second.edges.begin();
992 k != j->second->second.edges.end(); ++k)
993 {
994
995 next_frontier.insert(edge_old_revision(k));
996
997 // check out the manifest delta edge
998 manifest_id parent_manifest = edge_old_manifest(k);
999 manifest_id child_manifest = j->second->second.new_manifest;
1000
1001// first, if we have a child we've never seen before, we will need
1002// to request it in its entirety.
1003 if (seen_manifests.find(child_manifest) == seen_manifests.end())
1004 {
1005 if (this->app.db.manifest_version_exists(child_manifest))
1006 L(F("not requesting (in reverse) initial manifest %s as we already have it\n") % child_manifest);
1007 else
1008 {
1009 L(F("requesting (in reverse) initial manifest data %s\n") % child_manifest);
1010 queue_send_data_cmd(manifest_item, plain_id(child_manifest));
1011 }
1012 seen_manifests.insert(child_manifest);
1013 }
1014
1015 // second, if the parent is nonempty, we want to ask for an edge to it
1016 if (!parent_manifest.inner()().empty())
1017 {
1018 if (this->app.db.manifest_version_exists(parent_manifest))
1019 L(F("not requesting (in reverse) manifest delta to %s as we already have it\n") % parent_manifest);
1020 else
1021 {
1022 L(F("requesting (in reverse) manifest delta %s -> %s\n")
1023 % child_manifest % parent_manifest);
1024 reverse_delta_requests.insert(make_pair(plain_id(child_manifest),
1025 plain_id(parent_manifest)));
1026 queue_send_delta_cmd(manifest_item,
1027 plain_id(child_manifest),
1028 plain_id(parent_manifest));
1029 }
1030 seen_manifests.insert(parent_manifest);
1031 }
1032
1033
1034
1035 // check out each file delta edge
1036 change_set const & cset = edge_changes(k);
1037 for (change_set::delta_map::const_iterator d = cset.deltas.begin();
1038 d != cset.deltas.end(); ++d)
1039 {
1040 file_id parent_file (delta_entry_src(d));
1041 file_id child_file (delta_entry_dst(d));
1042
1043
1044// first, if we have a child we've never seen before, we will need
1045// to request it in its entirety.
1046 if (seen_files.find(child_file) == seen_files.end())
1047 {
1048 if (this->app.db.file_version_exists(child_file))
1049 L(F("not requesting (in reverse) initial file %s as we already have it\n") % child_file);
1050 else
1051 {
1052 L(F("requesting (in reverse) initial file data %s\n") % child_file);
1053 queue_send_data_cmd(file_item, plain_id(child_file));
1054 }
1055 seen_files.insert(child_file);
1056 }
1057
1058 // second, if the parent is nonempty, we want to ask for an edge to it
1059 if (!parent_file.inner()().empty())
1060 {
1061 if (this->app.db.file_version_exists(parent_file))
1062 L(F("not requesting (in reverse) file delta to %s as we already have it\n") % parent_file);
1063 else
1064 {
1065 L(F("requesting (in reverse) file delta %s -> %s on %s\n")
1066 % child_file % parent_file % delta_entry_path(d));
1067 reverse_delta_requests.insert(make_pair(plain_id(child_file),
1068 plain_id(parent_file)));
1069 queue_send_delta_cmd(file_item,
1070 plain_id(child_file),
1071 plain_id(parent_file));
1072 }
1073 seen_files.insert(parent_file);
1074 }
1075 }
1076 }
1077
1078 // now actually consume the data packet, which will wait on the
1079 // arrival of its prerequisites in the packet_db_writer
1080 this->dbw.consume_revision_data(j->first, j->second->first);
1081 }
1082 }
1083 frontier = next_frontier;
1084 }
1085}
1086
1087void
1088session::request_fwd_revisions(revision_id const & i,
1089 map<revision_id, bool> attached,
1090 set<revision_id> & visited)
1091{
1092 if (visited.find(i) != visited.end())
1093 return;
1094
1095 visited.insert(i);
1096
1097 L(F("visiting revision '%s' for forward deltas\n") % i);
1098
1099 typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;
1100
1101 ancestryT::const_iterator j = ancestry.find(i);
1102 if (j != ancestry.end())
1103 {
1104 edge_map::const_iterator an_attached_edge = j->second->second.edges.end();
1105
1106 // first make sure we've requested enough to get to here by
1107 // calling ourselves recursively. this is the forward path after all.
1108
1109 for (edge_map::const_iterator k = j->second->second.edges.begin();
1110 k != j->second->second.edges.end(); ++k)
1111 {
1112 if (is_attached(edge_old_revision(k), attached))
1113 {
1114 request_fwd_revisions(edge_old_revision(k), attached, visited);
1115 an_attached_edge = k;
1116 }
1117 }
1118
1119 I(an_attached_edge != j->second->second.edges.end());
1120
1121 // check out the manifest delta edge
1122 manifest_id parent_manifest = edge_old_manifest(an_attached_edge);
1123 manifest_id child_manifest = j->second->second.new_manifest;
1124 if (this->app.db.manifest_version_exists(child_manifest))
1125 L(F("not requesting forward manifest delta to '%s' as we already have it\n")
1126 % child_manifest);
1127 else
1128 {
1129 if (parent_manifest.inner()().empty())
1130 {
1131 L(F("requesting full manifest data %s\n") % child_manifest);
1132 queue_send_data_cmd(manifest_item, plain_id(child_manifest));
1133 }
1134 else
1135 {
1136 L(F("requesting forward manifest delta %s -> %s\n")
1137 % parent_manifest % child_manifest);
1138 queue_send_delta_cmd(manifest_item,
1139 plain_id(parent_manifest),
1140 plain_id(child_manifest));
1141 }
1142 }
1143
1144 // check out each file delta edge
1145 change_set const & an_attached_cset = edge_changes(an_attached_edge);
1146 for (change_set::delta_map::const_iterator k = an_attached_cset.deltas.begin();
1147 k != an_attached_cset.deltas.end(); ++k)
1148 {
1149 if (this->app.db.file_version_exists(delta_entry_dst(k)))
1150 L(F("not requesting forward delta %s -> %s on file %s as we already have it\n")
1151 % delta_entry_src(k) % delta_entry_dst(k) % delta_entry_path(k));
1152 else
1153 {
1154 if (delta_entry_src(k).inner()().empty())
1155 {
1156 L(F("requesting full file data %s\n") % delta_entry_dst(k));
1157 queue_send_data_cmd(file_item, plain_id(delta_entry_dst(k)));
1158 }
1159 else
1160 {
1161
1162 L(F("requesting forward delta %s -> %s on file %s\n")
1163 % delta_entry_src(k) % delta_entry_dst(k) % delta_entry_path(k));
1164 queue_send_delta_cmd(file_item,
1165 plain_id(delta_entry_src(k)),
1166 plain_id(delta_entry_dst(k)));
1167 }
1168 }
1169 }
1170 // now actually consume the data packet, which will wait on the
1171 // arrival of its prerequisites in the packet_db_writer
1172 this->dbw.consume_revision_data(j->first, j->second->first);
1173 }
1174}
1175
1176void
1177session::analyze_ancestry_graph()
1178{
1179 typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;
1180 typedef map<cert_name, vector<cert> > cert_map;
1181
1182 if (! (all_requested_revisions_received() && cert_refinement_done()))
1183 return;
1184
1185 if (analyzed_ancestry)
1186 return;
1187
1188 set<revision_id> heads;
1189 {
1190 set<revision_id> nodes, parents;
1191 map<revision_id, int> chld_num;
1192 L(F("analyzing %d ancestry edges\n") % ancestry.size());
1193
1194 for (ancestryT::const_iterator i = ancestry.begin(); i != ancestry.end(); ++i)
1195 {
1196 nodes.insert(i->first);
1197 for (edge_map::const_iterator j = i->second->second.edges.begin();
1198 j != i->second->second.edges.end(); ++j)
1199 {
1200 parents.insert(edge_old_revision(j));
1201 map<revision_id, int>::iterator n;
1202 n = chld_num.find(edge_old_revision(j));
1203 if (n == chld_num.end())
1204 chld_num.insert(make_pair(edge_old_revision(j), 1));
1205 else
1206 ++(n->second);
1207 }
1208 }
1209
1210 set_difference(nodes.begin(), nodes.end(),
1211 parents.begin(), parents.end(),
1212 inserter(heads, heads.begin()));
1213
1214 // Write permissions checking:
1215 // remove heads w/o proper certs, add their children to heads
1216 // 1) remove unwanted branch certs from consideration
1217 // 2) remove heads w/o a branch tag, process new exposed heads
1218 // 3) repeat 2 until no change
1219
1220 //1
1221 set<string> ok_branches, bad_branches;
1222 cert_name bcert_name(branch_cert_name);
1223 cert_name tcert_name(tag_cert_name);
1224 for (map<revision_id, cert_map>::iterator i = received_certs.begin();
1225 i != received_certs.end(); ++i)
1226 {
1227 //branches
1228 vector<cert> & bcerts(i->second[bcert_name]);
1229 vector<cert> keeping;
1230 for (vector<cert>::iterator j = bcerts.begin(); j != bcerts.end(); ++j)
1231 {
1232 cert_value name;
1233 decode_base64(j->value, name);
1234 if (ok_branches.find(name()) != ok_branches.end())
1235 keeping.push_back(*j);
1236 else if (bad_branches.find(name()) != bad_branches.end())
1237 ;
1238 else
1239 {
1240 if (our_matcher(name()))
1241 {
1242 ok_branches.insert(name());
1243 keeping.push_back(*j);
1244 }
1245 else
1246 {
1247 bad_branches.insert(name());
1248 W(F("Dropping branch certs for unwanted branch %s")
1249 % name);
1250 }
1251 }
1252 }
1253 bcerts = keeping;
1254 }
1255 //2
1256 list<revision_id> tmp;
1257 for (set<revision_id>::iterator i = heads.begin(); i != heads.end(); ++i)
1258 {
1259 if (!received_certs[*i][bcert_name].size())
1260 tmp.push_back(*i);
1261 }
1262 for (list<revision_id>::iterator i = tmp.begin(); i != tmp.end(); ++i)
1263 heads.erase(*i);
1264 //3
1265 while (tmp.size())
1266 {
1267 ancestryT::const_iterator i = ancestry.find(tmp.front());
1268 if (i != ancestry.end())
1269 {
1270 for (edge_map::const_iterator j = i->second->second.edges.begin();
1271 j != i->second->second.edges.end(); ++j)
1272 {
1273 if (!--chld_num[edge_old_revision(j)])
1274 {
1275 if (received_certs[i->first][bcert_name].size())
1276 heads.insert(i->first);
1277 else
1278 tmp.push_back(edge_old_revision(j));
1279 }
1280 }
1281// since we don't want this rev, we don't want its certs either
1282 received_certs[tmp.front()] = cert_map();
1283 }
1284 tmp.pop_front();
1285 }
1286 }
1287
1288 // We've reduced the certs to those we want now, send them to dbw.
1289 for (map<revision_id, cert_map>::iterator i = received_certs.begin();
1290 i != received_certs.end(); ++i)
1291 {
1292 for (cert_map::iterator j = i->second.begin();
1293 j != i->second.end(); ++j)
1294 {
1295 for (vector<cert>::iterator k = j->second.begin();
1296 k != j->second.end(); ++k)
1297 {
1298 this->dbw.consume_revision_cert(revision<cert>(*k));
1299 }
1300 }
1301 }
1302
1303 L(F("isolated %d heads\n") % heads.size());
1304
1305 // first we determine the "attachment status" of each node in our ancestry
1306 // graph.
1307
1308 map<revision_id, bool> attached;
1309 set<revision_id> visited;
1310 for (set<revision_id>::const_iterator i = heads.begin();
1311 i != heads.end(); ++i)
1312 analyze_attachment(*i, visited, attached);
1313
1314 // then we walk the graph upwards, recursively, starting from each of the
1315 // heads. we either walk requesting forward deltas or reverse deltas,
1316 // depending on whether we are walking an attached or detached subgraph,
1317 // respectively. the forward walk ignores detached nodes, the backward walk
1318 // ignores attached nodes.
1319
1320 set<revision_id> fwd_visited, rev_visited;
1321
1322 for (set<revision_id>::const_iterator i = heads.begin();
1323 i != heads.end(); ++i)
1324 {
1325 map<revision_id, bool>::const_iterator k = attached.find(*i);
1326 I(k != attached.end());
1327
1328 if (k->second)
1329 {
1330 L(F("requesting attached ancestry of revision '%s'\n") % *i);
1331 request_fwd_revisions(*i, attached, fwd_visited);
1332 }
1333 else
1334 {
1335 L(F("requesting detached ancestry of revision '%s'\n") % *i);
1336 request_rev_revisions(*i, attached, rev_visited);
1337 }
1338 }
1339 analyzed_ancestry = true;
1340}
1341
1342Netxx::Probe::ready_type
1343session::which_events() const
1344{
1345 if (outbuf.empty())
1346 {
1347 if (inbuf.size() < constants::netcmd_maxsz)
1348 return Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
1349 else
1350 return Netxx::Probe::ready_oobd;
1351 }
1352 else
1353 {
1354 if (inbuf.size() < constants::netcmd_maxsz)
1355 return Netxx::Probe::ready_write | Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
1356 else
1357 return Netxx::Probe::ready_write | Netxx::Probe::ready_oobd;
1358 }
1359}
1360
1361bool
1362session::read_some()
1363{
1364 I(inbuf.size() < constants::netcmd_maxsz);
1365 char tmp[constants::bufsz];
1366 Netxx::signed_size_type count = str.read(tmp, sizeof(tmp));
1367 if (count > 0)
1368 {
1369 L(F("read %d bytes from fd %d (peer %s)\n") % count % fd % peer_id);
1370 if (encountered_error)
1371 {
1372 L(F("in error unwind mode, so throwing them into the bit bucket\n"));
1373 return true;
1374 }
1375 inbuf.append(tmp,count);
1376 mark_recent_io();
1377 if (byte_in_ticker.get() != NULL)
1378 (*byte_in_ticker) += count;
1379 return true;
1380 }
1381 else
1382 return false;
1383}
1384
1385bool
1386session::write_some()
1387{
1388 I(!outbuf.empty());
1389 size_t writelen = outbuf.front().first.size() - outbuf.front().second;
1390 Netxx::signed_size_type count = str.write(outbuf.front().first.data() + outbuf.front().second,
1391 std::min(writelen,
1392 constants::bufsz));
1393 if (count > 0)
1394 {
1395 if ((size_t)count == writelen)
1396 {
1397 outbuf_size -= outbuf.front().first.size();
1398 outbuf.pop_front();
1399 }
1400 else
1401 {
1402 outbuf.front().second += count;
1403 }
1404 L(F("wrote %d bytes to fd %d (peer %s)\n")
1405 % count % fd % peer_id);
1406 mark_recent_io();
1407 if (byte_out_ticker.get() != NULL)
1408 (*byte_out_ticker) += count;
1409 if (encountered_error && outbuf.empty())
1410 {
1411 // we've flushed our error message, so it's time to get out.
1412 L(F("finished flushing output queue in error unwind mode, disconnecting\n"));
1413 return false;
1414 }
1415 return true;
1416 }
1417 else
1418 return false;
1419}
1420
1421// senders
1422
1423void
1424session::queue_bye_cmd()
1425{
1426 L(F("queueing 'bye' command\n"));
1427 netcmd cmd;
1428 cmd.write_bye_cmd();
1429 write_netcmd_and_try_flush(cmd);
1430 this->sent_goodbye = true;
1431}
1432
1433void
1434session::queue_error_cmd(string const & errmsg)
1435{
1436 L(F("queueing 'error' command\n"));
1437 netcmd cmd;
1438 cmd.write_error_cmd(errmsg);
1439 write_netcmd_and_try_flush(cmd);
1440 this->sent_goodbye = true;
1441}
1442
1443void
1444session::queue_done_cmd(size_t level,
1445 netcmd_item_type type)
1446{
1447 string typestr;
1448 netcmd_item_type_to_string(type, typestr);
1449 L(F("queueing 'done' command for %s level %s\n") % typestr % level);
1450 netcmd cmd;
1451 cmd.write_done_cmd(level, type);
1452 write_netcmd_and_try_flush(cmd);
1453}
1454
1455void
1456session::queue_hello_cmd(id const & server,
1457 id const & nonce)
1458{
1459 netcmd cmd;
1460 hexenc<id> server_encoded;
1461 encode_hexenc(server, server_encoded);
1462
1463 rsa_keypair_id key_name;
1464 base64<rsa_pub_key> pub_encoded;
1465 rsa_pub_key pub;
1466
1467 app.db.get_pubkey(server_encoded, key_name, pub_encoded);
1468 decode_base64(pub_encoded, pub);
1469 cmd.write_hello_cmd(key_name, pub, nonce);
1470 write_netcmd_and_try_flush(cmd);
1471}
1472
1473void
1474session::queue_anonymous_cmd(protocol_role role,
1475 utf8 const & include_pattern,
1476 utf8 const & exclude_pattern,
1477 id const & nonce2,
1478 base64<rsa_pub_key> server_key_encoded)
1479{
1480 netcmd cmd;
1481 rsa_oaep_sha_data hmac_key_encrypted;
1482 encrypt_rsa(app.lua, remote_peer_key_name, server_key_encoded,
1483 nonce2(), hmac_key_encrypted);
1484 cmd.write_anonymous_cmd(role, include_pattern, exclude_pattern,
1485 hmac_key_encrypted);
1486 write_netcmd_and_try_flush(cmd);
1487 set_session_key(nonce2());
1488}
1489
1490void
1491session::queue_auth_cmd(protocol_role role,
1492 utf8 const & include_pattern,
1493 utf8 const & exclude_pattern,
1494 id const & client,
1495 id const & nonce1,
1496 id const & nonce2,
1497 string const & signature,
1498 base64<rsa_pub_key> server_key_encoded)
1499{
1500 netcmd cmd;
1501 rsa_oaep_sha_data hmac_key_encrypted;
1502 encrypt_rsa(app.lua, remote_peer_key_name, server_key_encoded,
1503 nonce2(), hmac_key_encrypted);
1504 cmd.write_auth_cmd(role, include_pattern, exclude_pattern, client,
1505 nonce1, hmac_key_encrypted, signature);
1506 write_netcmd_and_try_flush(cmd);
1507 set_session_key(nonce2());
1508}
1509
1510void
1511session::queue_confirm_cmd()
1512{
1513 netcmd cmd;
1514 cmd.write_confirm_cmd();
1515 write_netcmd_and_try_flush(cmd);
1516}
1517
1518void
1519session::queue_refine_cmd(merkle_node const & node)
1520{
1521 string typestr;
1522 hexenc<prefix> hpref;
1523 node.get_hex_prefix(hpref);
1524 netcmd_item_type_to_string(node.type, typestr);
1525 L(F("queueing request for refinement of %s node '%s', level %d\n")
1526 % typestr % hpref % static_cast<int>(node.level));
1527 netcmd cmd;
1528 cmd.write_refine_cmd(node);
1529 write_netcmd_and_try_flush(cmd);
1530}
1531
1532void
1533session::queue_send_data_cmd(netcmd_item_type type,
1534 id const & item)
1535{
1536 string typestr;
1537 netcmd_item_type_to_string(type, typestr);
1538 hexenc<id> hid;
1539 encode_hexenc(item, hid);
1540
1541 if (role == source_role)
1542 {
1543 L(F("not queueing request for %s '%s' as we are in pure source role\n")
1544 % typestr % hid);
1545 return;
1546 }
1547
1548 if (item_already_requested(type, item))
1549 {
1550 L(F("not queueing request for %s '%s' as we already requested it\n")
1551 % typestr % hid);
1552 return;
1553 }
1554
1555 L(F("queueing request for data of %s item '%s'\n")
1556 % typestr % hid);
1557 netcmd cmd;
1558 cmd.write_send_data_cmd(type, item);
1559 write_netcmd_and_try_flush(cmd);
1560 note_item_requested(type, item);
1561}
1562
1563void
1564session::queue_send_delta_cmd(netcmd_item_type type,
1565 id const & base,
1566 id const & ident)
1567{
1568 I(type == manifest_item || type == file_item);
1569
1570 string typestr;
1571 netcmd_item_type_to_string(type, typestr);
1572 hexenc<id> base_hid;
1573 encode_hexenc(base, base_hid);
1574 hexenc<id> ident_hid;
1575 encode_hexenc(ident, ident_hid);
1576
1577 if (role == source_role)
1578 {
1579 L(F("not queueing request for %s delta '%s' -> '%s' as we are in pure source role\n")
1580 % typestr % base_hid % ident_hid);
1581 return;
1582 }
1583
1584 if (item_already_requested(type, ident))
1585 {
1586 L(F("not queueing request for %s delta '%s' -> '%s' as we already requested the target\n")
1587 % typestr % base_hid % ident_hid);
1588 return;
1589 }
1590
1591 L(F("queueing request for contents of %s delta '%s' -> '%s'\n")
1592 % typestr % base_hid % ident_hid);
1593 netcmd cmd;
1594 cmd.write_send_delta_cmd(type, base, ident);
1595 write_netcmd_and_try_flush(cmd);
1596 note_item_requested(type, ident);
1597}
1598
1599void
1600session::queue_data_cmd(netcmd_item_type type,
1601 id const & item,
1602 string const & dat)
1603{
1604 string typestr;
1605 netcmd_item_type_to_string(type, typestr);
1606 hexenc<id> hid;
1607 encode_hexenc(item, hid);
1608
1609 if (role == sink_role)
1610 {
1611 L(F("not queueing %s data for '%s' as we are in pure sink role\n")
1612 % typestr % hid);
1613 return;
1614 }
1615
1616 L(F("queueing %d bytes of data for %s item '%s'\n")
1617 % dat.size() % typestr % hid);
1618
1619 netcmd cmd;
1620 // TODO: This pair of functions will make two copies of a large
1621 // file, the first in cmd.write_data_cmd, and the second in
1622 // write_netcmd_and_try_flush when the data is copied from the
1623 // cmd.payload variable to the string buffer for output. This
1624 // double copy should be collapsed out, it may be better to use
1625 // a string_queue for output as well as input, as that will reduce
1626 // the amount of mallocs that happen when the string queue is large
1627 // enough to just store the data.
1628 cmd.write_data_cmd(type, item, dat);
1629 write_netcmd_and_try_flush(cmd);
1630 note_item_sent(type, item);
1631}
1632
1633void
1634session::queue_delta_cmd(netcmd_item_type type,
1635 id const & base,
1636 id const & ident,
1637 delta const & del)
1638{
1639 I(type == manifest_item || type == file_item);
1640 I(! del().empty() || ident == base);
1641 string typestr;
1642 netcmd_item_type_to_string(type, typestr);
1643 hexenc<id> base_hid;
1644 encode_hexenc(base, base_hid);
1645 hexenc<id> ident_hid;
1646 encode_hexenc(ident, ident_hid);
1647
1648 if (role == sink_role)
1649 {
1650 L(F("not queueing %s delta '%s' -> '%s' as we are in pure sink role\n")
1651 % typestr % base_hid % ident_hid);
1652 return;
1653 }
1654
1655 L(F("queueing %s delta '%s' -> '%s'\n")
1656 % typestr % base_hid % ident_hid);
1657 netcmd cmd;
1658 cmd.write_delta_cmd(type, base, ident, del);
1659 write_netcmd_and_try_flush(cmd);
1660 note_item_sent(type, ident);
1661}
1662
1663void
1664session::queue_nonexistant_cmd(netcmd_item_type type,
1665 id const & item)
1666{
1667 string typestr;
1668 netcmd_item_type_to_string(type, typestr);
1669 hexenc<id> hid;
1670 encode_hexenc(item, hid);
1671 if (role == sink_role)
1672 {
1673 L(F("not queueing note of nonexistence of %s item '%s' as we are in pure sink role\n")
1674 % typestr % hid);
1675 return;
1676 }
1677
1678 L(F("queueing note of nonexistance of %s item '%s'\n")
1679 % typestr % hid);
1680 netcmd cmd;
1681 cmd.write_nonexistant_cmd(type, item);
1682 write_netcmd_and_try_flush(cmd);
1683}
1684
1685// processors
1686
1687bool
1688session::process_bye_cmd()
1689{
1690 L(F("received 'bye' netcmd\n"));
1691 this->received_goodbye = true;
1692 return true;
1693}
1694
1695bool
1696session::process_error_cmd(string const & errmsg)
1697{
1698 throw bad_decode(F("received network error: %s") % errmsg);
1699}
1700
1701bool
1702session::process_done_cmd(size_t level, netcmd_item_type type)
1703{
1704
1705 map< netcmd_item_type, done_marker>::iterator i = done_refinements.find(type);
1706 I(i != done_refinements.end());
1707
1708 string typestr;
1709 netcmd_item_type_to_string(type, typestr);
1710
1711 if ((! i->second.current_level_had_refinements) || (level >= 0xff))
1712 {
1713 // we received *no* refinements on this level -- or we ran out of
1714 // levels -- so refinement for this type is finished.
1715 L(F("received 'done' for empty %s level %d, marking as complete\n")
1716 % typestr % static_cast<int>(level));
1717
1718 // possibly echo it back one last time, for shutdown purposes
1719 if (!i->second.tree_is_done)
1720 queue_done_cmd(level + 1, type);
1721
1722 // tombstone it
1723 i->second.current_level_had_refinements = false;
1724 i->second.tree_is_done = true;
1725
1726 if (all_requested_revisions_received())
1727 analyze_ancestry_graph();
1728
1729 maybe_note_epochs_finished();
1730 }
1731
1732 else if (i->second.current_level_had_refinements
1733 && (! i->second.tree_is_done))
1734 {
1735 // we *did* receive some refinements on this level, reset to zero and
1736 // queue an echo of the 'done' marker.
1737 L(F("received 'done' for %s level %d, which had refinements; "
1738 "sending echo of done for level %d\n")
1739 % typestr
1740 % static_cast<int>(level)
1741 % static_cast<int>(level + 1));
1742 i->second.current_level_had_refinements = false;
1743 queue_done_cmd(level + 1, type);
1744 return true;
1745 }
1746 return true;
1747}
1748
1749void
1750get_branches(app_state & app, vector<string> & names)
1751{
1752 app.db.get_branches(names);
1753 sort(names.begin(), names.end());
1754}
1755
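// server key fingerprints are pinned, ssh-style, under the
// "known-servers" var domain: the first successful connection to a host
// records its key hash, and later connections must present the same key
// unless the user clears the stored value with 'monotone unset'.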
1756static const var_domain known_servers_domain = var_domain("known-servers");
1757
1758bool
1759session::process_hello_cmd(rsa_keypair_id const & their_keyname,
1760 rsa_pub_key const & their_key,
1761 id const & nonce)
1762{
1763 I(this->remote_peer_key_hash().size() == 0);
1764 I(this->saved_nonce().size() == 0);
1765
1766 hexenc<id> their_key_hash;
1767 base64<rsa_pub_key> their_key_encoded;
1768 encode_base64(their_key, their_key_encoded);
1769 key_hash_code(their_keyname, their_key_encoded, their_key_hash);
1770 L(F("server key has name %s, hash %s\n") % their_keyname % their_key_hash);
1771 var_key their_key_key(known_servers_domain, var_name(peer_id));
1772 if (app.db.var_exists(their_key_key))
1773 {
1774 var_value expected_key_hash;
1775 app.db.get_var(their_key_key, expected_key_hash);
1776 if (expected_key_hash() != their_key_hash())
1777 {
1778 P(F("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
1779 "@ WARNING: SERVER IDENTIFICATION HAS CHANGED @\n"
1780 "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
1781 "IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY\n"
1782 "it is also possible that the server key has just been changed\n"
1783 "remote host sent key %s\n"
1784 "I expected %s\n"
1785 "'monotone unset %s %s' overrides this check\n")
1786 % their_key_hash % expected_key_hash
1787 % their_key_key.first % their_key_key.second);
1788 E(false, F("server key changed"));
1789 }
1790 }
1791 else
1792 {
1793 P(F("first time connecting to server %s\n"
1794 "I'll assume it's really them, but you might want to double-check\n"
1795 "their key's fingerprint: %s\n") % peer_id % their_key_hash);
1796 app.db.set_var(their_key_key, var_value(their_key_hash()));
1797 }
1798 if (!app.db.public_key_exists(their_key_hash))
1799 {
1800 W(F("saving public key for %s to database\n") % their_keyname);
1801 app.db.put_key(their_keyname, their_key_encoded);
1802 }
1803
1804 {
1805 hexenc<id> hnonce;
1806 encode_hexenc(nonce, hnonce);
1807 L(F("received 'hello' netcmd from server '%s' with nonce '%s'\n")
1808 % their_key_hash % hnonce);
1809 }
1810
1811 I(app.db.public_key_exists(their_key_hash));
1812
1813 // save their identity
1814 {
1815 id their_key_hash_decoded;
1816 decode_hexenc(their_key_hash, their_key_hash_decoded);
1817 this->remote_peer_key_hash = their_key_hash_decoded;
1818 }
1819
1820 // clients always include in the synchronization set every branch that the
1821 // user requested
1822 vector<string> branchnames;
1823 set<utf8> ok_branches;
1824 get_branches(app, branchnames);
1825 for (vector<string>::const_iterator i = branchnames.begin();
1826 i != branchnames.end(); i++)
1827 {
1828 if (our_matcher(*i))
1829 ok_branches.insert(utf8(*i));
1830 }
1831 rebuild_merkle_trees(app, ok_branches);
1832
1833 setup_client_tickers();
1834
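// if we have a signing key, answer the server's hello with an 'auth'
// command carrying our key hash, a signature over the server's nonce, a
// fresh nonce of our own, and the server's encoded public key
// (presumably used to protect the session HMAC key -- compare the
// hmac_key_encrypted field read back in dispatch_payload below).
// without a signing key we fall back to an 'anonymous' command, which
// the server only accepts for read-only (pure sink) use.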
1835 if (app.signing_key() != "")
1836 {
1837 // get our public key for its hash identifier
1838 base64<rsa_pub_key> our_pub;
1839 hexenc<id> our_key_hash;
1840 id our_key_hash_raw;
1841 app.db.get_key(app.signing_key, our_pub);
1842 key_hash_code(app.signing_key, our_pub, our_key_hash);
1843 decode_hexenc(our_key_hash, our_key_hash_raw);
1844
1845 // get our private key and make a signature
1846 base64<rsa_sha1_signature> sig;
1847 rsa_sha1_signature sig_raw;
1848 base64< arc4<rsa_priv_key> > our_priv;
1849 load_priv_key(app, app.signing_key, our_priv);
1850 make_signature(app, app.signing_key, our_priv, nonce(), sig);
1851 decode_base64(sig, sig_raw);
1852
1853 // make a new nonce of our own and send off the 'auth'
1854 queue_auth_cmd(this->role, our_include_pattern, our_exclude_pattern,
1855 our_key_hash_raw, nonce, mk_nonce(), sig_raw(),
1856 their_key_encoded);
1857 }
1858 else
1859 {
1860 queue_anonymous_cmd(this->role, our_include_pattern,
1861 our_exclude_pattern, mk_nonce(), their_key_encoded);
1862 }
1863 return true;
1864}
1865
1866bool
1867session::process_anonymous_cmd(protocol_role role,
1868 utf8 const & their_include_pattern,
1869 utf8 const & their_exclude_pattern)
1870{
1871 //
1872 // internally netsync thinks in terms of sources and sinks. users like
1873 // thinking of repositories as "readonly", "readwrite", or "writeonly".
1874 //
1875 // we therefore use the read/write terminology when dealing with the UI:
1876 // if the user asks to run a "read only" service, this means they are
1877 // willing to be a source but not a sink.
1878 //
1879 // nb: the "role" here is the role the *client* wants to play
1880 // so we need to check that the opposite role is allowed for us,
1881 // in our this->role field.
1882 //
1883
1884 // client must be a sink and server must be a source (anonymous read-only)
1885
1886 if (role != sink_role)
1887 {
1888 W(F("rejected attempt at anonymous connection for write\n"));
1889 this->saved_nonce = id("");
1890 return false;
1891 }
1892
1893 if (this->role != source_role && this->role != source_and_sink_role)
1894 {
1895 W(F("rejected attempt at anonymous connection while running as sink\n"));
1896 this->saved_nonce = id("");
1897 return false;
1898 }
1899
1900 vector<string> branchnames;
1901 set<utf8> ok_branches;
1902 get_branches(app, branchnames);
1903 globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);
1904 for (vector<string>::const_iterator i = branchnames.begin();
1905 i != branchnames.end(); i++)
1906 {
1907 if (their_matcher(*i))
1908 if (our_matcher(*i) && app.lua.hook_get_netsync_read_permitted(*i))
1909 ok_branches.insert(utf8(*i));
1910 else
1911 {
1912 error((F("anonymous access to branch '%s' denied by server") % *i).str());
1913 return true;
1914 }
1915 }
1916
1917 P(F("allowed anonymous read permission for '%s' excluding '%s'\n")
1918 % their_include_pattern % their_exclude_pattern);
1919
1920 rebuild_merkle_trees(app, ok_branches);
1921
1922 this->remote_peer_key_name = rsa_keypair_id("");
1923 this->authenticated = true;
1924 this->role = source_role;
1925 return true;
1926}
1927
1928bool
1929session::process_auth_cmd(protocol_role their_role,
1930 utf8 const & their_include_pattern,
1931 utf8 const & their_exclude_pattern,
1932 id const & client,
1933 id const & nonce1,
1934 string const & signature)
1935{
1936 I(this->remote_peer_key_hash().size() == 0);
1937 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
1938
1939 hexenc<id> their_key_hash;
1940 encode_hexenc(client, their_key_hash);
1941 set<utf8> ok_branches;
1942 vector<string> branchnames;
1943 get_branches(app, branchnames);
1944 globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);
1945
1946 // check that they replied with the nonce we asked for
1947 if (!(nonce1 == this->saved_nonce))
1948 {
1949 W(F("detected replay attack in auth netcmd\n"));
1950 this->saved_nonce = id("");
1951 return false;
1952 }
1953
1954
1955 //
1956 // internally netsync thinks in terms of sources and sinks. users like
1957 // thinking of repositories as "readonly", "readwrite", or "writeonly".
1958 //
1959 // we therefore use the read/write terminology when dealing with the UI:
1960 // if the user asks to run a "read only" service, this means they are
1961 // willing to be a source but not a sink.
1962 //
1963 // nb: the "their_role" here is the role the *client* wants to play
1964 // so we need to check that the opposite role is allowed for us,
1965 // in our this->role field.
1966 //
1967
1968 if (!app.db.public_key_exists(their_key_hash))
1969 {
1970 W(F("remote public key hash '%s' is unknown\n") % their_key_hash);
1971 this->saved_nonce = id("");
1972 return false;
1973 }
1974
1975 // get their public key
1976 rsa_keypair_id their_id;
1977 base64<rsa_pub_key> their_key;
1978 app.db.get_pubkey(their_key_hash, their_id, their_key);
1979
1980 // client as sink, server as source (reading)
1981
1982 if (their_role == sink_role || their_role == source_and_sink_role)
1983 {
1984 if (this->role != source_role && this->role != source_and_sink_role)
1985 {
1986 W(F("denied '%s' read permission for '%s' excluding '%s' while running as pure sink\n")
1987 % their_id % their_include_pattern % their_exclude_pattern);
1988 this->saved_nonce = id("");
1989 return false;
1990 }
1991 }
1992
1993 for (vector<string>::const_iterator i = branchnames.begin();
1994 i != branchnames.end(); i++)
1995 {
1996 if (their_matcher(*i))
1997 {
1998 if (our_matcher(*i) && app.lua.hook_get_netsync_read_permitted(*i, their_id))
1999 ok_branches.insert(utf8(*i));
2000 else
2001 {
2002 W(F("denied '%s' read permission for '%s' excluding '%s' because of branch '%s'\n")
2003 % their_id % their_include_pattern % their_exclude_pattern % *i);
2004 error((F("access to branch '%s' denied by server") % *i).str());
2005 return true;
2006 }
2007 }
2008 }
2009
2010 // if we're source_and_sink_role, continue even with no branches readable
2011 // ex: serve --db=empty.db
2012 P(F("allowed '%s' read permission for '%s' excluding '%s'\n")
2013 % their_id % their_include_pattern % their_exclude_pattern);
2014
2015 // client as source, server as sink (writing)
2016
2017 if (their_role == source_role || their_role == source_and_sink_role)
2018 {
2019 if (this->role != sink_role && this->role != source_and_sink_role)
2020 {
2021 W(F("denied '%s' write permission for '%s' excluding '%s' while running as pure source\n")
2022 % their_id % their_include_pattern % their_exclude_pattern);
2023 this->saved_nonce = id("");
2024 return false;
2025 }
2026
2027 if (!app.lua.hook_get_netsync_write_permitted(their_id))
2028 {
2029 W(F("denied '%s' write permission for '%s' excluding '%s'\n")
2030 % their_id % their_include_pattern % their_exclude_pattern);
2031 this->saved_nonce = id("");
2032 return false;
2033 }
2034
2035 P(F("allowed '%s' write permission for '%s' excluding '%s'\n")
2036 % their_id % their_include_pattern % their_exclude_pattern);
2037 }
2038
2039 rebuild_merkle_trees(app, ok_branches);
2040
2041 // save their identity
2042 this->remote_peer_key_hash = client;
2043
2044 // check the signature
2045 base64<rsa_sha1_signature> sig;
2046 encode_base64(rsa_sha1_signature(signature), sig);
2047 if (check_signature(app, their_id, their_key, nonce1(), sig))
2048 {
2049 // get our private key and sign back
2050 L(F("client signature OK, accepting authentication\n"));
2051 this->authenticated = true;
2052 this->remote_peer_key_name = their_id;
2053 // assume the (possibly degraded) opposite role
2054 switch (their_role)
2055 {
2056 case source_role:
2057 I(this->role != source_role);
2058 this->role = sink_role;
2059 break;
2060 case source_and_sink_role:
2061 I(this->role == source_and_sink_role);
2062 break;
2063 case sink_role:
2064 I(this->role != sink_role);
2065 this->role = source_role;
2066 break;
2067 }
2068 return true;
2069 }
2070 else
2071 {
2072 W(F("bad client signature\n"));
2073 }
2074 return false;
2075}
2076
2077bool
2078session::process_confirm_cmd(string const & signature)
2079{
2080 I(this->remote_peer_key_hash().size() == constants::merkle_hash_length_in_bytes);
2081 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
2082
2083 hexenc<id> their_key_hash;
2084 encode_hexenc(id(remote_peer_key_hash), their_key_hash);
2085
2086 // nb. this->role is our role, the server is in the opposite role
2087 L(F("received 'confirm' netcmd from server '%s' for pattern '%s' exclude '%s' in %s mode\n")
2088 % their_key_hash % our_include_pattern % our_exclude_pattern
2089 % (this->role == source_and_sink_role ? _("source and sink") :
2090 (this->role == source_role ? _("sink") : _("source"))));
2091
2092 // check their signature
2093 if (app.db.public_key_exists(their_key_hash))
2094 {
2095 // get their public key and check the signature
2096 rsa_keypair_id their_id;
2097 base64<rsa_pub_key> their_key;
2098 app.db.get_pubkey(their_key_hash, their_id, their_key);
2099 base64<rsa_sha1_signature> sig;
2100 encode_base64(rsa_sha1_signature(signature), sig);
2101 if (check_signature(app, their_id, their_key, this->saved_nonce(), sig))
2102 {
2103 L(F("server signature OK, accepting authentication\n"));
2104 return true;
2105 }
2106 else
2107 {
2108 W(F("bad server signature\n"));
2109 }
2110 }
2111 else
2112 {
2113 W(F("unknown server key\n"));
2114 }
2115 return false;
2116}
2117
2118void
2119session::respond_to_confirm_cmd()
2120{
2121 merkle_ptr root;
2122 load_merkle_node(epoch_item, 0, get_root_prefix().val, root);
2123 queue_refine_cmd(*root);
2124 queue_done_cmd(0, epoch_item);
2125
2126 load_merkle_node(key_item, 0, get_root_prefix().val, root);
2127 queue_refine_cmd(*root);
2128 queue_done_cmd(0, key_item);
2129
2130 load_merkle_node(cert_item, 0, get_root_prefix().val, root);
2131 queue_refine_cmd(*root);
2132 queue_done_cmd(0, cert_item);
2133}
2134
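// data_exists and load_data map a (netcmd_item_type, id) pair onto the
// database: the former checks whether we already hold the item, the
// latter fetches its serialized form for transmission to the peer.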
2135static bool
2136data_exists(netcmd_item_type type,
2137 id const & item,
2138 app_state & app)
2139{
2140 hexenc<id> hitem;
2141 encode_hexenc(item, hitem);
2142 switch (type)
2143 {
2144 case key_item:
2145 return app.db.public_key_exists(hitem);
2146 case manifest_item:
2147 return app.db.manifest_version_exists(manifest_id(hitem));
2148 case file_item:
2149 return app.db.file_version_exists(file_id(hitem));
2150 case revision_item:
2151 return app.db.revision_exists(revision_id(hitem));
2152 case cert_item:
2153 return app.db.revision_cert_exists(hitem);
2154 case epoch_item:
2155 return app.db.epoch_exists(epoch_id(hitem));
2156 }
2157 return false;
2158}
2159
2160static void
2161load_data(netcmd_item_type type,
2162 id const & item,
2163 app_state & app,
2164 string & out)
2165{
2166 string typestr;
2167 netcmd_item_type_to_string(type, typestr);
2168 hexenc<id> hitem;
2169 encode_hexenc(item, hitem);
2170 switch (type)
2171 {
2172 case epoch_item:
2173 if (app.db.epoch_exists(epoch_id(hitem)))
2174 {
2175 cert_value branch;
2176 epoch_data epoch;
2177 app.db.get_epoch(epoch_id(hitem), branch, epoch);
2178 write_epoch(branch, epoch, out);
2179 }
2180 else
2181 {
2182 throw bad_decode(F("epoch with hash '%s' does not exist in our database")
2183 % hitem);
2184 }
2185 break;
2186 case key_item:
2187 if (app.db.public_key_exists(hitem))
2188 {
2189 rsa_keypair_id keyid;
2190 base64<rsa_pub_key> pub_encoded;
2191 app.db.get_pubkey(hitem, keyid, pub_encoded);
2192 L(F("public key '%s' is also called '%s'\n") % hitem % keyid);
2193 write_pubkey(keyid, pub_encoded, out);
2194 }
2195 else
2196 {
2197 throw bad_decode(F("no public key '%s' found in database") % hitem);
2198 }
2199 break;
2200
2201 case revision_item:
2202 if (app.db.revision_exists(revision_id(hitem)))
2203 {
2204 revision_data mdat;
2205 data dat;
2206 app.db.get_revision(revision_id(hitem), mdat);
2207 out = mdat.inner()();
2208 }
2209 else
2210 {
2211 throw bad_decode(F("revision '%s' does not exist in our database") % hitem);
2212 }
2213 break;
2214
2215 case manifest_item:
2216 if (app.db.manifest_version_exists(manifest_id(hitem)))
2217 {
2218 manifest_data mdat;
2219 data dat;
2220 app.db.get_manifest_version(manifest_id(hitem), mdat);
2221 out = mdat.inner()();
2222 }
2223 else
2224 {
2225 throw bad_decode(F("manifest '%s' does not exist in our database") % hitem);
2226 }
2227 break;
2228
2229 case file_item:
2230 if (app.db.file_version_exists(file_id(hitem)))
2231 {
2232 file_data fdat;
2233 data dat;
2234 app.db.get_file_version(file_id(hitem), fdat);
2235 out = fdat.inner()();
2236 }
2237 else
2238 {
2239 throw bad_decode(F("file '%s' does not exist in our database") % hitem);
2240 }
2241 break;
2242
2243 case cert_item:
2244 if (app.db.revision_cert_exists(hitem))
2245 {
2246 revision<cert> c;
2247 app.db.get_revision_cert(hitem, c);
2248 string tmp;
2249 write_cert(c.inner(), out);
2250 }
2251 else
2252 {
2253 throw bad_decode(F("cert '%s' does not exist in our database") % hitem);
2254 }
2255 break;
2256 }
2257}
2258
2259
2260bool
2261session::process_refine_cmd(merkle_node const & their_node)
2262{
2263 prefix pref;
2264 hexenc<prefix> hpref;
2265 their_node.get_raw_prefix(pref);
2266 their_node.get_hex_prefix(hpref);
2267 string typestr;
2268
2269 netcmd_item_type_to_string(their_node.type, typestr);
2270 size_t lev = static_cast<size_t>(their_node.level);
2271
2272 L(F("received 'refine' netcmd on %s node '%s', level %d\n")
2273 % typestr % hpref % lev);
2274
2275 if (!merkle_node_exists(their_node.type, their_node.level, pref))
2276 {
2277 L(F("no corresponding %s merkle node for prefix '%s', level %d\n")
2278 % typestr % hpref % lev);
2279
2280 for (size_t slot = 0; slot < constants::merkle_num_slots; ++slot)
2281 {
2282 switch (their_node.get_slot_state(slot))
2283 {
2284 case empty_state:
2285 {
2286 // we agree, this slot is empty
2287 L(F("(#0) they have an empty slot %d (in a %s node '%s', level %d, we do not have)\n")
2288 % slot % typestr % hpref % lev);
2289 continue;
2290 }
2291 break;
2292 case live_leaf_state:
2293 {
2294 // we want what *they* have
2295 id slotval;
2296 hexenc<id> hslotval;
2297 their_node.get_raw_slot(slot, slotval);
2298 their_node.get_hex_slot(slot, hslotval);
2299 L(F("(#0) they have a live leaf at slot %d (in a %s node '%s', level %d, we do not have)\n")
2300 % slot % typestr % hpref % lev);
2301 L(F("(#0) requesting their %s leaf %s\n") % typestr % hslotval);
2302 queue_send_data_cmd(their_node.type, slotval);
2303 }
2304 break;
2305 case dead_leaf_state:
2306 {
2307 // we cannot ask for what they have, it is dead
2308 L(F("(#0) they have a dead leaf at slot %d (in a %s node '%s', level %d, we do not have)\n")
2309 % slot % typestr % hpref % lev);
2310 continue;
2311 }
2312 break;
2313 case subtree_state:
2314 {
2315 // they have a subtree; might as well ask for that
2316 L(F("(#0) they have a subtree at slot %d (in a %s node '%s', level %d, we do not have)\n")
2317 % slot % typestr % hpref % lev);
2318 merkle_node our_fake_subtree;
2319 their_node.extended_prefix(slot, our_fake_subtree.pref);
2320 our_fake_subtree.level = their_node.level + 1;
2321 our_fake_subtree.type = their_node.type;
2322 queue_refine_cmd(our_fake_subtree);
2323 }
2324 break;
2325 }
2326 }
2327 }
2328 else
2329 {
2330 // we have a corresponding merkle node. there are 16 branches
2331 // to the following switch condition. it is awful. sorry.
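// legend: every slot of a node is empty, a live leaf, a dead leaf, or a
// subtree, and the nested switches below pair their slot state with
// ours, giving the 16 cases (#1 - #16) named in the log messages.
// roughly: leaves we lack are requested, leaves they lack are sent,
// mismatched subtrees are refined one more level down, and dead leaves
// mostly end the exchange for that slot.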
2332 L(F("found corresponding %s merkle node for prefix '%s', level %d\n")
2333 % typestr % hpref % lev);
2334 merkle_ptr our_node;
2335 load_merkle_node(their_node.type, their_node.level, pref, our_node);
2336 for (size_t slot = 0; slot < constants::merkle_num_slots; ++slot)
2337 {
2338 switch (their_node.get_slot_state(slot))
2339 {
2340 case empty_state:
2341 switch (our_node->get_slot_state(slot))
2342 {
2343
2344 case empty_state:
2345 // 1: theirs == empty, ours == empty
2346 L(F("(#1) they have an empty slot %d in %s node '%s', level %d, and so do we\n")
2347 % slot % typestr % hpref % lev);
2348 continue;
2349 break;
2350
2351 case live_leaf_state:
2352 // 2: theirs == empty, ours == live
2353 L(F("(#2) they have an empty slot %d in %s node '%s', level %d, we have a live leaf\n")
2354 % slot % typestr % hpref % lev);
2355 {
2356 I(their_node.type == our_node->type);
2357 string tmp;
2358 id slotval;
2359 our_node->get_raw_slot(slot, slotval);
2360 load_data(their_node.type, slotval, this->app, tmp);
2361 queue_data_cmd(their_node.type, slotval, tmp);
2362 }
2363 break;
2364
2365 case dead_leaf_state:
2366 // 3: theirs == empty, ours == dead
2367 L(F("(#3) they have an empty slot %d in %s node '%s', level %d, we have a dead leaf\n")
2368 % slot % typestr % hpref % lev);
2369 continue;
2370 break;
2371
2372 case subtree_state:
2373 // 4: theirs == empty, ours == subtree
2374 L(F("(#4) they have an empty slot %d in %s node '%s', level %d, we have a subtree\n")
2375 % slot % typestr % hpref % lev);
2376 {
2377 prefix subprefix;
2378 our_node->extended_raw_prefix(slot, subprefix);
2379 merkle_ptr our_subtree;
2380 I(our_node->type == their_node.type);
2381 load_merkle_node(their_node.type, our_node->level + 1,
2382 subprefix, our_subtree);
2383 I(our_node->type == our_subtree->type);
2384 // FIXME: it would be more efficient here to just send the
2385 // data for everything in the subtree, instead of sending
2386 // our subtree.
2387 queue_refine_cmd(*our_subtree);
2388 }
2389 break;
2390
2391 }
2392 break;
2393
2394
2395 case live_leaf_state:
2396 switch (our_node->get_slot_state(slot))
2397 {
2398
2399 case empty_state:
2400 // 5: theirs == live, ours == empty
2401 L(F("(#5) they have a live leaf at slot %d in %s node '%s', level %d, we have nothing\n")
2402 % slot % typestr % hpref % lev);
2403 {
2404 id slotval;
2405 their_node.get_raw_slot(slot, slotval);
2406 queue_send_data_cmd(their_node.type, slotval);
2407 }
2408 break;
2409
2410 case live_leaf_state:
2411 // 6: theirs == live, ours == live
2412 L(F("(#6) they have a live leaf at slot %d in %s node '%s', and so do we\n")
2413 % slot % typestr % hpref);
2414 {
2415 id our_slotval, their_slotval;
2416 their_node.get_raw_slot(slot, their_slotval);
2417 our_node->get_raw_slot(slot, our_slotval);
2418 if (their_slotval == our_slotval)
2419 {
2420 hexenc<id> hslotval;
2421 their_node.get_hex_slot(slot, hslotval);
2422 L(F("(#6) we both have live %s leaf '%s'\n") % typestr % hslotval);
2423 continue;
2424 }
2425 else
2426 {
2427 I(their_node.type == our_node->type);
2428 string tmp;
2429 load_data(our_node->type, our_slotval, this->app, tmp);
2430 queue_send_data_cmd(their_node.type, their_slotval);
2431 queue_data_cmd(our_node->type, our_slotval, tmp);
2432 }
2433 }
2434 break;
2435
2436 case dead_leaf_state:
2437 // 7: theirs == live, ours == dead
2438 L(F("(#7) they have a live leaf at slot %d in %s node %s, level %d, we have a dead one\n")
2439 % slot % typestr % hpref % lev);
2440 {
2441 id our_slotval, their_slotval;
2442 our_node->get_raw_slot(slot, our_slotval);
2443 their_node.get_raw_slot(slot, their_slotval);
2444 if (their_slotval == our_slotval)
2445 {
2446 hexenc<id> hslotval;
2447 their_node.get_hex_slot(slot, hslotval);
2448 L(F("(#7) it's the same %s leaf '%s', but ours is dead\n")
2449 % typestr % hslotval);
2450 continue;
2451 }
2452 else
2453 {
2454 queue_send_data_cmd(their_node.type, their_slotval);
2455 }
2456 }
2457 break;
2458
2459 case subtree_state:
2460 // 8: theirs == live, ours == subtree
2461 L(F("(#8) they have a live leaf in slot %d of %s node '%s', level %d, we have a subtree\n")
2462 % slot % typestr % hpref % lev);
2463 {
2464
2465 id their_slotval;
2466 hexenc<id> their_hval;
2467 their_node.get_raw_slot(slot, their_slotval);
2468 encode_hexenc(their_slotval, their_hval);
2469 if (data_exists(their_node.type, their_slotval, app))
2470 L(F("(#8) we have a copy of their live leaf '%s' in slot %d of %s node '%s', level %d\n")
2471 % their_hval % slot % typestr % hpref % lev);
2472 else
2473 {
2474 L(F("(#8) requesting a copy of their live leaf '%s' in slot %d of %s node '%s', level %d\n")
2475 % their_hval % slot % typestr % hpref % lev);
2476 queue_send_data_cmd(their_node.type, their_slotval);
2477 }
2478
2479 L(F("(#8) sending our subtree for refinement, in slot %d of %s node '%s', level %d\n")
2480 % slot % typestr % hpref % lev);
2481 prefix subprefix;
2482 our_node->extended_raw_prefix(slot, subprefix);
2483 merkle_ptr our_subtree;
2484 load_merkle_node(our_node->type, our_node->level + 1,
2485 subprefix, our_subtree);
2486 // FIXME: it would be more efficient here to just send the
2487 // data for everything in the subtree (except, possibly,
2488 // the item they already have), instead of sending our
2489 // subtree.
2490 queue_refine_cmd(*our_subtree);
2491 }
2492 break;
2493 }
2494 break;
2495
2496
2497 case dead_leaf_state:
2498 switch (our_node->get_slot_state(slot))
2499 {
2500 case empty_state:
2501 // 9: theirs == dead, ours == empty
2502 L(F("(#9) they have a dead leaf at slot %d in %s node '%s', level %d, we have nothing\n")
2503 % slot % typestr % hpref % lev);
2504 continue;
2505 break;
2506
2507 case live_leaf_state:
2508 // 10: theirs == dead, ours == live
2509 L(F("(#10) they have a dead leaf at slot %d in %s node '%s', level %d, we have a live one\n")
2510 % slot % typestr % hpref % lev);
2511 {
2512 id our_slotval, their_slotval;
2513 their_node.get_raw_slot(slot, their_slotval);
2514 our_node->get_raw_slot(slot, our_slotval);
2515 hexenc<id> hslotval;
2516 our_node->get_hex_slot(slot, hslotval);
2517 if (their_slotval == our_slotval)
2518 {
2519 L(F("(#10) we both have %s leaf %s, theirs is dead\n")
2520 % typestr % hslotval);
2521 continue;
2522 }
2523 else
2524 {
2525 I(their_node.type == our_node->type);
2526 string tmp;
2527 load_data(our_node->type, our_slotval, this->app, tmp);
2528 queue_data_cmd(our_node->type, our_slotval, tmp);
2529 }
2530 }
2531 break;
2532
2533 case dead_leaf_state:
2534 // 11: theirs == dead, ours == dead
2535 L(F("(#11) they have a dead leaf at slot %d in %s node '%s', level %d, so do we\n")
2536 % slot % typestr % hpref % lev);
2537 continue;
2538 break;
2539
2540 case subtree_state:
2541 // 12: theirs == dead, ours == subtree
2542 L(F("(#12) they have a dead leaf in slot %d of %s node '%s', we have a subtree\n")
2543 % slot % typestr % hpref % lev);
2544 {
2545 prefix subprefix;
2546 our_node->extended_raw_prefix(slot, subprefix);
2547 merkle_ptr our_subtree;
2548 load_merkle_node(our_node->type, our_node->level + 1,
2549 subprefix, our_subtree);
2550 // FIXME: it would be more efficient here to just send the
2551 // data for everything in the subtree (except, possibly,
2552 // the dead thing), instead of sending our subtree.
2553 queue_refine_cmd(*our_subtree);
2554 }
2555 break;
2556 }
2557 break;
2558
2559
2560 case subtree_state:
2561 switch (our_node->get_slot_state(slot))
2562 {
2563 case empty_state:
2564 // 13: theirs == subtree, ours == empty
2565 L(F("(#13) they have a subtree at slot %d in %s node '%s', level %d, we have nothing\n")
2566 % slot % typestr % hpref % lev);
2567 {
2568 merkle_node our_fake_subtree;
2569 their_node.extended_prefix(slot, our_fake_subtree.pref);
2570 our_fake_subtree.level = their_node.level + 1;
2571 our_fake_subtree.type = their_node.type;
2572 queue_refine_cmd(our_fake_subtree);
2573 }
2574 break;
2575
2576 case live_leaf_state:
2577 // 14: theirs == subtree, ours == live
2578 L(F("(#14) they have a subtree at slot %d in %s node '%s', level %d, we have a live leaf\n")
2579 % slot % typestr % hpref % lev);
2580 {
2581 size_t subslot;
2582 id our_slotval;
2583 merkle_node our_fake_subtree;
2584 our_node->get_raw_slot(slot, our_slotval);
2585 hexenc<id> hslotval;
2586 encode_hexenc(our_slotval, hslotval);
2587
2588 pick_slot_and_prefix_for_value(our_slotval, our_node->level + 1, subslot,
2589 our_fake_subtree.pref);
2590 L(F("(#14) pushed our leaf '%s' into fake subtree slot %d, level %d\n")
2591 % hslotval % subslot % (lev + 1));
2592 our_fake_subtree.type = their_node.type;
2593 our_fake_subtree.level = our_node->level + 1;
2594 our_fake_subtree.set_raw_slot(subslot, our_slotval);
2595 our_fake_subtree.set_slot_state(subslot, our_node->get_slot_state(slot));
2596 queue_refine_cmd(our_fake_subtree);
2597 }
2598 break;
2599
2600 case dead_leaf_state:
2601 // 15: theirs == subtree, ours == dead
2602 L(F("(#15) they have a subtree at slot %d in %s node '%s', level %d, we have a dead leaf\n")
2603 % slot % typestr % hpref % lev);
2604 {
2605 size_t subslot;
2606 id our_slotval;
2607 merkle_node our_fake_subtree;
2608 our_node->get_raw_slot(slot, our_slotval);
2609 pick_slot_and_prefix_for_value(our_slotval, our_node->level + 1, subslot,
2610 our_fake_subtree.pref);
2611 our_fake_subtree.type = their_node.type;
2612 our_fake_subtree.level = our_node->level + 1;
2613 our_fake_subtree.set_raw_slot(subslot, our_slotval);
2614 our_fake_subtree.set_slot_state(subslot, our_node->get_slot_state(slot));
2615 queue_refine_cmd(our_fake_subtree);
2616 }
2617 break;
2618
2619 case subtree_state:
2620 // 16: theirs == subtree, ours == subtree
2621 L(F("(#16) they have a subtree at slot %d in %s node '%s', level %d, and so do we\n")
2622 % slot % typestr % hpref % lev);
2623 {
2624 id our_slotval, their_slotval;
2625 hexenc<id> hslotval;
2626 their_node.get_raw_slot(slot, their_slotval);
2627 our_node->get_raw_slot(slot, our_slotval);
2628 our_node->get_hex_slot(slot, hslotval);
2629 if (their_slotval == our_slotval)
2630 {
2631 L(F("(#16) we both have %s subtree '%s'\n") % typestr % hslotval);
2632 continue;
2633 }
2634 else
2635 {
2636 L(F("(#16) %s subtrees at slot %d differ, refining ours\n") % typestr % slot);
2637 prefix subprefix;
2638 our_node->extended_raw_prefix(slot, subprefix);
2639 merkle_ptr our_subtree;
2640 load_merkle_node(our_node->type, our_node->level + 1,
2641 subprefix, our_subtree);
2642 queue_refine_cmd(*our_subtree);
2643 }
2644 }
2645 break;
2646 }
2647 break;
2648 }
2649 }
2650 }
2651 return true;
2652}
2653
2654
2655bool
2656session::process_send_data_cmd(netcmd_item_type type,
2657 id const & item)
2658{
2659 string typestr;
2660 netcmd_item_type_to_string(type, typestr);
2661 hexenc<id> hitem;
2662 encode_hexenc(item, hitem);
2663 L(F("received 'send_data' netcmd requesting %s '%s'\n")
2664 % typestr % hitem);
2665 if (data_exists(type, item, this->app))
2666 {
2667 string out;
2668 load_data(type, item, this->app, out);
2669 queue_data_cmd(type, item, out);
2670 }
2671 else
2672 {
2673 queue_nonexistant_cmd(type, item);
2674 }
2675 return true;
2676}
2677
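// a 'send_delta' request is answered with a delta only when both the
// base and target versions are present locally; otherwise we degrade to
// sending the full target data via process_send_data_cmd.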
2678bool
2679session::process_send_delta_cmd(netcmd_item_type type,
2680 id const & base,
2681 id const & ident)
2682{
2683 string typestr;
2684 netcmd_item_type_to_string(type, typestr);
2685 delta del;
2686
2687 hexenc<id> hbase, hident;
2688 encode_hexenc(base, hbase);
2689 encode_hexenc(ident, hident);
2690
2691 L(F("received 'send_delta' netcmd requesting %s edge '%s' -> '%s'\n")
2692 % typestr % hbase % hident);
2693
2694 switch (type)
2695 {
2696 case file_item:
2697 {
2698 file_id fbase(hbase), fident(hident);
2699 file_delta fdel;
2700 if (this->app.db.file_version_exists(fbase)
2701 && this->app.db.file_version_exists(fident))
2702 {
2703 file_data base_fdat, ident_fdat;
2704 data base_dat, ident_dat;
2705 this->app.db.get_file_version(fbase, base_fdat);
2706 this->app.db.get_file_version(fident, ident_fdat);
2707 string tmp;
2708 base_dat = base_fdat.inner();
2709 ident_dat = ident_fdat.inner();
2710 compute_delta(base_dat(), ident_dat(), tmp);
2711 del = delta(tmp);
2712 }
2713 else
2714 {
2715 return process_send_data_cmd(type, ident);
2716 }
2717 }
2718 break;
2719
2720 case manifest_item:
2721 {
2722 manifest_id mbase(hbase), mident(hident);
2723 manifest_delta mdel;
2724 if (this->app.db.manifest_version_exists(mbase)
2725 && this->app.db.manifest_version_exists(mident))
2726 {
2727 manifest_data base_mdat, ident_mdat;
2728 data base_dat, ident_dat;
2729 this->app.db.get_manifest_version(mbase, base_mdat);
2730 this->app.db.get_manifest_version(mident, ident_mdat);
2731 string tmp;
2732 base_dat = base_mdat.inner();
2733 ident_dat = ident_mdat.inner();
2734 compute_delta(base_dat(), ident_dat(), tmp);
2735 del = delta(tmp);
2736 }
2737 else
2738 {
2739 return process_send_data_cmd(type, ident);
2740 }
2741 }
2742 break;
2743
2744 default:
2745 throw bad_decode(F("delta requested for item type %s\n") % typestr);
2746 }
2747 queue_delta_cmd(type, base, ident, del);
2748 return true;
2749}
2750
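// process_data_cmd stores a full item received from the peer. keys and
// certs are checked against the advertised hash before being consumed; a
// cert naming a revision we do not yet have also triggers a request for
// that revision, received revisions are parked in 'ancestry' for later
// analysis, and epochs are cross-checked against our own so that
// mismatched branch history is caught before anything is written.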
2751bool
2752session::process_data_cmd(netcmd_item_type type,
2753 id const & item,
2754 string const & dat)
2755{
2756 hexenc<id> hitem;
2757 encode_hexenc(item, hitem);
2758
2759 // it's ok if we received something we didn't ask for; it might
2760 // be a spontaneous transmission from refinement
2761 note_item_arrived(type, item);
2762
2763 switch (type)
2764 {
2765 case epoch_item:
2766 if (this->app.db.epoch_exists(epoch_id(hitem)))
2767 {
2768 L(F("epoch '%s' already exists in our database\n") % hitem);
2769 }
2770 else
2771 {
2772 cert_value branch;
2773 epoch_data epoch;
2774 read_epoch(dat, branch, epoch);
2775 L(F("received epoch %s for branch %s\n") % epoch % branch);
2776 std::map<cert_value, epoch_data> epochs;
2777 app.db.get_epochs(epochs);
2778 std::map<cert_value, epoch_data>::const_iterator i;
2779 i = epochs.find(branch);
2780 if (i == epochs.end())
2781 {
2782 L(F("branch %s has no epoch; setting epoch to %s\n") % branch % epoch);
2783 app.db.set_epoch(branch, epoch);
2784 maybe_note_epochs_finished();
2785 }
2786 else
2787 {
2788 L(F("branch %s already has an epoch; checking\n") % branch);
2789 // if we get here, then we know that the epoch must be
2790 // different, because if it were the same then the
2791 // if(epoch_exists()) branch up above would have been taken. if
2792 // somehow this is wrong, then we have broken epoch hashing or
2793 // something, which is very dangerous, so play it safe...
2794 I(!(i->second == epoch));
2795
2796 // It is safe to call 'error' here, because if we get here,
2797 // then the current netcmd packet cannot possibly have
2798 // written anything to the database.
2799 error((F("Mismatched epoch on branch %s."
2800 " Server has '%s', client has '%s'.")
2801 % branch
2802 % (voice == server_voice ? i->second : epoch)
2803 % (voice == server_voice ? epoch : i->second)).str());
2804 }
2805 }
2806 break;
2807
2808 case key_item:
2809 if (this->app.db.public_key_exists(hitem))
2810 L(F("public key '%s' already exists in our database\n") % hitem);
2811 else
2812 {
2813 rsa_keypair_id keyid;
2814 base64<rsa_pub_key> pub;
2815 read_pubkey(dat, keyid, pub);
2816 hexenc<id> tmp;
2817 key_hash_code(keyid, pub, tmp);
2818 if (! (tmp == hitem))
2819 throw bad_decode(F("hash check failed for public key '%s' (%s);"
2820 " wanted '%s' got '%s'")
2821 % hitem % keyid % hitem % tmp);
2822 this->dbw.consume_public_key(keyid, pub);
2823 }
2824 break;
2825
2826 case cert_item:
2827 if (this->app.db.revision_cert_exists(hitem))
2828 L(F("cert '%s' already exists in our database\n") % hitem);
2829 else
2830 {
2831 cert c;
2832 read_cert(dat, c);
2833 hexenc<id> tmp;
2834 cert_hash_code(c, tmp);
2835 if (! (tmp == hitem))
2836 throw bad_decode(F("hash check failed for revision cert '%s'") % hitem);
2837// this->dbw.consume_revision_cert(revision<cert>(c));
2838 received_certs[revision_id(c.ident)][c.name].push_back(c);
2839 if (!app.db.revision_exists(revision_id(c.ident)))
2840 {
2841 id rid;
2842 decode_hexenc(c.ident, rid);
2843 queue_send_data_cmd(revision_item, rid);
2844 }
2845 }
2846 break;
2847
2848 case revision_item:
2849 {
2850 revision_id rid(hitem);
2851 if (this->app.db.revision_exists(rid))
2852 L(F("revision '%s' already exists in our database\n") % hitem);
2853 else
2854 {
2855 L(F("received revision '%s'\n") % hitem);
2856 boost::shared_ptr< pair<revision_data, revision_set > >
2857 rp(new pair<revision_data, revision_set>());
2858
2859 rp->first = revision_data(dat);
2860 read_revision_set(dat, rp->second);
2861 ancestry.insert(std::make_pair(rid, rp));
2862 if (cert_refinement_done())
2863 {
2864 analyze_ancestry_graph();
2865 }
2866 }
2867 }
2868 break;
2869
2870 case manifest_item:
2871 {
2872 manifest_id mid(hitem);
2873 if (this->app.db.manifest_version_exists(mid))
2874 L(F("manifest version '%s' already exists in our database\n") % hitem);
2875 else
2876 {
2877 this->dbw.consume_manifest_data(mid, manifest_data(dat));
2878 manifest_map man;
2879 read_manifest_map(data(dat), man);
2880 analyze_manifest(man);
2881 }
2882 }
2883 break;
2884
2885 case file_item:
2886 {
2887 file_id fid(hitem);
2888 if (this->app.db.file_version_exists(fid))
2889 L(F("file version '%s' already exists in our database\n") % hitem);
2890 else
2891 {
2892 this->dbw.consume_file_data(fid, file_data(dat));
2893 }
2894 }
2895 break;
2896
2897 }
2898 return true;
2899}
2900
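// process_delta_cmd applies a received delta, consuming it as a forward
// or reverse delta depending on whether we recorded the (base, ident)
// pair in reverse_delta_requests when we asked for it.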
2901bool
2902session::process_delta_cmd(netcmd_item_type type,
2903 id const & base,
2904 id const & ident,
2905 delta const & del)
2906{
2907 string typestr;
2908 netcmd_item_type_to_string(type, typestr);
2909 hexenc<id> hbase, hident;
2910 encode_hexenc(base, hbase);
2911 encode_hexenc(ident, hident);
2912
2913 pair<id,id> id_pair = make_pair(base, ident);
2914
2915 // it's ok if we received something we didn't ask for; it might
2916 // be a spontaneous transmission from refinement
2917 note_item_arrived(type, ident);
2918
2919 switch (type)
2920 {
2921 case manifest_item:
2922 {
2923 manifest_id src_manifest(hbase), dst_manifest(hident);
2924 if (reverse_delta_requests.find(id_pair)
2925 != reverse_delta_requests.end())
2926 {
2927 reverse_delta_requests.erase(id_pair);
2928 this->dbw.consume_manifest_reverse_delta(src_manifest,
2929 dst_manifest,
2930 manifest_delta(del));
2931 }
2932 else
2933 this->dbw.consume_manifest_delta(src_manifest,
2934 dst_manifest,
2935 manifest_delta(del));
2936
2937 }
2938 break;
2939
2940 case file_item:
2941 {
2942 file_id src_file(hbase), dst_file(hident);
2943 if (reverse_delta_requests.find(id_pair)
2944 != reverse_delta_requests.end())
2945 {
2946 reverse_delta_requests.erase(id_pair);
2947 this->dbw.consume_file_reverse_delta(src_file,
2948 dst_file,
2949 file_delta(del));
2950 }
2951 else
2952 this->dbw.consume_file_delta(src_file,
2953 dst_file,
2954 file_delta(del));
2955 }
2956 break;
2957
2958 default:
2959 L(F("ignoring delta received for item type %s\n") % typestr);
2960 break;
2961 }
2962 return true;
2963}
2964
2965bool
2966session::process_nonexistant_cmd(netcmd_item_type type,
2967 id const & item)
2968{
2969 string typestr;
2970 netcmd_item_type_to_string(type, typestr);
2971 hexenc<id> hitem;
2972 encode_hexenc(item, hitem);
2973 L(F("received 'nonexistant' netcmd for %s '%s'\n")
2974 % typestr % hitem);
2975 note_item_arrived(type, item);
2976 return true;
2977}
2978
2979bool
2980session::process_usher_cmd(utf8 const & msg)
2981{
2982 if (msg().size())
2983 {
2984 if (msg()[0] == '!')
2985 P(F("Received warning from usher: %s") % msg().substr(1));
2986 else
2987 L(F("Received greeting from usher: %s") % msg().substr(1));
2988 }
2989 netcmd cmdout;
2990 cmdout.write_usher_reply_cmd(our_include_pattern);
2991 write_netcmd_and_try_flush(cmdout);
2992 L(F("Sent reply."));
2993 return true;
2994}
2995
2996bool
2997session::merkle_node_exists(netcmd_item_type type,
2998 size_t level,
2999 prefix const & pref)
3000{
3001 map<netcmd_item_type, boost::shared_ptr<merkle_table> >::const_iterator i =
3002 merkle_tables.find(type);
3003
3004 I(i != merkle_tables.end());
3005 merkle_table::const_iterator j = i->second->find(std::make_pair(pref, level));
3006 return (j != i->second->end());
3007}
3008
3009void
3010session::load_merkle_node(netcmd_item_type type,
3011 size_t level,
3012 prefix const & pref,
3013 merkle_ptr & node)
3014{
3015 map<netcmd_item_type, boost::shared_ptr<merkle_table> >::const_iterator i =
3016 merkle_tables.find(type);
3017
3018 I(i != merkle_tables.end());
3019 merkle_table::const_iterator j = i->second->find(std::make_pair(pref, level));
3020 I(j != i->second->end());
3021 node = j->second;
3022}
3023
3024
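// dispatch_payload routes a decoded netcmd to its handler. the require()
// calls encode the protocol state machine: handshake commands (hello,
// anonymous, auth, confirm) are only legal before authentication and
// only in the appropriate voice, while refinement and transfer commands
// need an authenticated session, the transfer commands additionally
// checking that our current role permits them.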
3025bool
3026session::dispatch_payload(netcmd const & cmd)
3027{
3028
3029 switch (cmd.get_cmd_code())
3030 {
3031
3032 case bye_cmd:
3033 return process_bye_cmd();
3034 break;
3035
3036 case error_cmd:
3037 {
3038 string errmsg;
3039 cmd.read_error_cmd(errmsg);
3040 return process_error_cmd(errmsg);
3041 }
3042 break;
3043
3044 case hello_cmd:
3045 require(! authenticated, "hello netcmd received when not authenticated");
3046 require(voice == client_voice, "hello netcmd received in client voice");
3047 {
3048 rsa_keypair_id server_keyname;
3049 rsa_pub_key server_key;
3050 id nonce;
3051 cmd.read_hello_cmd(server_keyname, server_key, nonce);
3052 return process_hello_cmd(server_keyname, server_key, nonce);
3053 }
3054 break;
3055
3056 case anonymous_cmd:
3057 require(! authenticated, "anonymous netcmd received when not authenticated");
3058 require(voice == server_voice, "anonymous netcmd received in server voice");
3059 require(role == source_role ||
3060 role == source_and_sink_role,
3061 "anonymous netcmd received in source or source/sink role");
3062 {
3063 protocol_role role;
3064 utf8 their_include_pattern, their_exclude_pattern;
3065 rsa_oaep_sha_data hmac_key_encrypted;
3066 cmd.read_anonymous_cmd(role, their_include_pattern, their_exclude_pattern, hmac_key_encrypted);
3067 L(F("received 'anonymous' netcmd from client for pattern '%s' excluding '%s' "
3068 "in %s mode\n")
3069 % their_include_pattern % their_exclude_pattern
3070 % (role == source_and_sink_role ? _("source and sink") :
3071 (role == source_role ? _("source") : _("sink"))));
3072
3073 set_session_key(hmac_key_encrypted);
3074 if (!process_anonymous_cmd(role, their_include_pattern, their_exclude_pattern))
3075 return false;
3076 queue_confirm_cmd();
3077 return true;
3078 }
3079 break;
3080
3081 case auth_cmd:
3082 require(! authenticated, "auth netcmd received when not authenticated");
3083 require(voice == server_voice, "auth netcmd received in server voice");
3084 {
3085 protocol_role role;
3086 string signature;
3087 utf8 their_include_pattern, their_exclude_pattern;
3088 id client, nonce1, nonce2;
3089 rsa_oaep_sha_data hmac_key_encrypted;
3090 cmd.read_auth_cmd(role, their_include_pattern, their_exclude_pattern,
3091 client, nonce1, hmac_key_encrypted, signature);
3092
3093 hexenc<id> their_key_hash;
3094 encode_hexenc(client, their_key_hash);
3095 hexenc<id> hnonce1;
3096 encode_hexenc(nonce1, hnonce1);
3097
3098 L(F("received 'auth(hmac)' netcmd from client '%s' for pattern '%s' "
3099 "exclude '%s' in %s mode with nonce1 '%s'\n")
3100 % their_key_hash % their_include_pattern % their_exclude_pattern
3101 % (role == source_and_sink_role ? _("source and sink") :
3102 (role == source_role ? _("source") : _("sink")))
3103 % hnonce1);
3104
3105 set_session_key(hmac_key_encrypted);
3106 if (!process_auth_cmd(role, their_include_pattern, their_exclude_pattern,
3107 client, nonce1, signature))
3108 return false;
3109 queue_confirm_cmd();
3110 return true;
3111 }
3112 break;
3113
3114 case confirm_cmd:
3115 require(! authenticated, "confirm netcmd received when not authenticated");
3116 require(voice == client_voice, "confirm netcmd received in client voice");
3117 {
3118 string signature;
3119 cmd.read_confirm_cmd();
3120 this->authenticated = true;
3121 respond_to_confirm_cmd();
3122 return true;
3123 }
3124 break;
3125
3126 case refine_cmd:
3127 require(authenticated, "refine netcmd received when authenticated");
3128 {
3129 merkle_node node;
3130 cmd.read_refine_cmd(node);
3131 map< netcmd_item_type, done_marker>::iterator i = done_refinements.find(node.type);
3132 require(i != done_refinements.end(), "refinement netcmd refers to valid type");
3133 require(i->second.tree_is_done == false, "refinement netcmd received when tree is live");
3134 i->second.current_level_had_refinements = true;
3135 return process_refine_cmd(node);
3136 }
3137 break;
3138
3139 case done_cmd:
3140 require(authenticated, "done netcmd received when authenticated");
3141 {
3142 size_t level;
3143 netcmd_item_type type;
3144 cmd.read_done_cmd(level, type);
3145 return process_done_cmd(level, type);
3146 }
3147 break;
3148
3149 case send_data_cmd:
3150 require(authenticated, "send_data netcmd received when authenticated");
3151 require(role == source_role ||
3152 role == source_and_sink_role,
3153 "send_data netcmd received in source or source/sink role");
3154 {
3155 netcmd_item_type type;
3156 id item;
3157 cmd.read_send_data_cmd(type, item);
3158 return process_send_data_cmd(type, item);
3159 }
3160 break;
3161
3162 case send_delta_cmd:
3163 require(authenticated, "send_delta netcmd received when authenticated");
3164 require(role == source_role ||
3165 role == source_and_sink_role,
3166 "send_delta netcmd received in source or source/sink role");
3167 {
3168 netcmd_item_type type;
3169 id base, ident;
3170 cmd.read_send_delta_cmd(type, base, ident);
3171 return process_send_delta_cmd(type, base, ident);
3172 }
3173
3174 case data_cmd:
3175 require(authenticated, "data netcmd received when authenticated");
3176 require(role == sink_role ||
3177 role == source_and_sink_role,
3178 "data netcmd received in source or source/sink role");
3179 {
3180 netcmd_item_type type;
3181 id item;
3182 string dat;
3183 cmd.read_data_cmd(type, item, dat);
3184 return process_data_cmd(type, item, dat);
3185 }
3186 break;
3187
3188 case delta_cmd:
3189 require(authenticated, "delta netcmd received when authenticated");
3190 require(role == sink_role ||
3191 role == source_and_sink_role,
3192 "delta netcmd received in source or source/sink role");
3193 {
3194 netcmd_item_type type;
3195 id base, ident;
3196 delta del;
3197 cmd.read_delta_cmd(type, base, ident, del);
3198 return process_delta_cmd(type, base, ident, del);
3199 }
3200 break;
3201
3202 case nonexistant_cmd:
3203 require(authenticated, "nonexistant netcmd received when authenticated");
3204 require(role == sink_role ||
3205 role == source_and_sink_role,
3206 "nonexistant netcmd received in sink or source/sink role");
3207 {
3208 netcmd_item_type type;
3209 id item;
3210 cmd.read_nonexistant_cmd(type, item);
3211 return process_nonexistant_cmd(type, item);
3212 }
3213 break;
3214 case usher_cmd:
3215 {
3216 utf8 greeting;
3217 cmd.read_usher_cmd(greeting);
3218 return process_usher_cmd(greeting);
3219 }
3220 break;
3221 case usher_reply_cmd:
3222 return false; // should not happen
3223 break;
3224 }
3225 return false;
3226}
3227
3228// this kicks off the whole cascade starting from "hello"
3229void
3230session::begin_service()
3231{
3232 base64<rsa_pub_key> pub_encoded;
3233 app.db.get_key(app.signing_key, pub_encoded);
3234 hexenc<id> keyhash;
3235 id keyhash_raw;
3236 key_hash_code(app.signing_key, pub_encoded, keyhash);
3237 decode_hexenc(keyhash, keyhash_raw);
3238 queue_hello_cmd(keyhash_raw(), mk_nonce());
3239}
3240
3241void
3242session::maybe_say_goodbye()
3243{
3244 if (done_all_refinements() &&
3245 got_all_data())
3246 queue_bye_cmd();
3247}
3248
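// arm() parses one complete netcmd out of the input buffer into 'cmd'
// when one is available, but declines to do so while the output buffer
// is heavily backlogged, so that replies cannot pile up without bound.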
3249bool
3250session::arm()
3251{
3252 if (!armed)
3253 {
3254 if (outbuf_size > constants::bufsz * 10)
3255 return false; // don't pack the buffer unnecessarily
3256
3257 if (cmd.read(inbuf, read_hmac))
3258 {
3259 armed = true;
3260 }
3261 }
3262 return armed;
3263}
3264
3265bool session::process()
3266{
3267 try
3268 {
3269 if (!arm())
3270 return true;
3271
3272 transaction_guard guard(app.db);
3273 armed = false;
3274 L(F("processing %d byte input buffer from peer %s\n") % inbuf.size() % peer_id);
3275 bool ret = dispatch_payload(cmd);
3276 if (inbuf.size() >= constants::netcmd_maxsz)
3277 W(F("input buffer for peer %s is overfull after netcmd dispatch\n") % peer_id);
3278 guard.commit();
3279 maybe_say_goodbye();
3280 return ret;
3281 }
3282 catch (bad_decode & bd)
3283 {
3284 W(F("protocol error while processing peer %s: '%s'\n") % peer_id % bd.what);
3285 return false;
3286 }
3287}
3288
3289
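// call_server is the client-side event loop: it opens a single session
// to the given address, then alternates between probing the socket for
// read/write readiness and processing any fully-armed netcmds, until
// goodbyes have been exchanged, the peer disconnects, or the idle
// timeout expires.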
3290static void
3291call_server(protocol_role role,
3292 utf8 const & include_pattern,
3293 utf8 const & exclude_pattern,
3294 app_state & app,
3295 utf8 const & address,
3296 Netxx::port_type default_port,
3297 unsigned long timeout_seconds)
3298{
3299 Netxx::Probe probe;
3300 Netxx::Timeout timeout(static_cast<long>(timeout_seconds)), instant(0,1);
3301
3302 // FIXME: split into labels and convert to ace here.
3303
3304 P(F("connecting to %s\n") % address());
3305 Netxx::Stream server(address().c_str(), default_port, timeout);
3306 session sess(role, client_voice, include_pattern, exclude_pattern,
3307 app, address(), server.get_socketfd(), timeout);
3308
3309 while (true)
3310 {
3311 bool armed = false;
3312 try
3313 {
3314 armed = sess.arm();
3315 }
3316 catch (bad_decode & bd)
3317 {
3318 W(F("protocol error while processing peer %s: '%s'\n")
3319 % sess.peer_id % bd.what);
3320 return;
3321 }
3322
3323 probe.clear();
3324 probe.add(sess.str, sess.which_events());
3325 Netxx::Probe::result_type res = probe.ready(armed ? instant : timeout);
3326 Netxx::Probe::ready_type event = res.second;
3327 Netxx::socket_type fd = res.first;
3328
3329 if (fd == -1 && !armed)
3330 {
3331 P(F("timed out waiting for I/O with peer %s, disconnecting\n") % sess.peer_id);
3332 return;
3333 }
3334
3335 if (event & Netxx::Probe::ready_read)
3336 {
3337 if (sess.read_some())
3338 {
3339 try
3340 {
3341 armed = sess.arm();
3342 }
3343 catch (bad_decode & bd)
3344 {
3345 W(F("protocol error while processing peer %s: '%s'\n")
3346 % sess.peer_id % bd.what);
3347 return;
3348 }
3349 }
3350 else
3351 {
3352 if (sess.sent_goodbye)
3353 P(F("read from fd %d (peer %s) closed OK after goodbye\n") % fd % sess.peer_id);
3354 else
3355 P(F("read from fd %d (peer %s) failed, disconnecting\n") % fd % sess.peer_id);
3356 return;
3357 }
3358 }
3359
3360 if (event & Netxx::Probe::ready_write)
3361 {
3362 if (! sess.write_some())
3363 {
3364 if (sess.sent_goodbye)
3365 P(F("write on fd %d (peer %s) closed OK after goodbye\n") % fd % sess.peer_id);
3366 else
3367 P(F("write on fd %d (peer %s) failed, disconnecting\n") % fd % sess.peer_id);
3368 return;
3369 }
3370 }
3371
3372 if (event & Netxx::Probe::ready_oobd)
3373 {
3374 P(F("got OOB data on fd %d (peer %s), disconnecting\n")
3375 % fd % sess.peer_id);
3376 return;
3377 }
3378
3379 if (armed)
3380 {
3381 if (!sess.process())
3382 {
3383 P(F("terminated exchange with %s\n")
3384 % sess.peer_id);
3385 return;
3386 }
3387 }
3388
3389 if (sess.sent_goodbye && sess.outbuf.empty() && sess.received_goodbye)
3390 {
3391 P(F("successful exchange with %s\n")
3392 % sess.peer_id);
3393 return;
3394 }
3395 }
3396}
3397
3398static void
3399arm_sessions_and_calculate_probe(Netxx::Probe & probe,
3400 map<Netxx::socket_type, shared_ptr<session> > & sessions,
3401 set<Netxx::socket_type> & armed_sessions)
3402{
3403 set<Netxx::socket_type> arm_failed;
3404 for (map<Netxx::socket_type,
3405 shared_ptr<session> >::const_iterator i = sessions.begin();
3406 i != sessions.end(); ++i)
3407 {
3408 try
3409 {
3410 if (i->second->arm())
3411 {
3412 L(F("fd %d is armed\n") % i->first);
3413 armed_sessions.insert(i->first);
3414 }
3415 probe.add(i->second->str, i->second->which_events());
3416 }
3417 catch (bad_decode & bd)
3418 {
3419 W(F("protocol error while processing peer %s: '%s', marking as bad\n")
3420 % i->second->peer_id % bd.what);
3421 arm_failed.insert(i->first);
3422 }
3423 }
3424 for (set<Netxx::socket_type>::const_iterator i = arm_failed.begin();
3425 i != arm_failed.end(); ++i)
3426 {
3427 sessions.erase(*i);
3428 }
3429}
3430
3431static void
3432handle_new_connection(Netxx::Address & addr,
3433 Netxx::StreamServer & server,
3434 Netxx::Timeout & timeout,
3435 protocol_role role,
3436 utf8 const & include_pattern,
3437 utf8 const & exclude_pattern,
3438 map<Netxx::socket_type, shared_ptr<session> > & sessions,
3439 app_state & app)
3440{
3441 L(F("accepting new connection on %s : %s\n")
3442 % addr.get_name() % lexical_cast<string>(addr.get_port()));
3443 Netxx::Peer client = server.accept_connection();
3444
3445 if (!client)
3446 {
3447 L(F("accept() returned a dead client\n"));
3448 }
3449 else
3450 {
3451 P(F("accepted new client connection from %s\n") % client);
3452 shared_ptr<session> sess(new session(role, server_voice,
3453 include_pattern, exclude_pattern,
3454 app,
3455 lexical_cast<string>(client),
3456 client.get_socketfd(), timeout));
3457 sess->begin_service();
3458 sessions.insert(make_pair(client.get_socketfd(), sess));
3459 }
3460}
3461
3462static void
3463handle_read_available(Netxx::socket_type fd,
3464 shared_ptr<session> sess,
3465 map<Netxx::socket_type, shared_ptr<session> > & sessions,
3466 set<Netxx::socket_type> & armed_sessions,
3467 bool & live_p)
3468{
3469 if (sess->read_some())
3470 {
3471 try
3472 {
3473 if (sess->arm())
3474 armed_sessions.insert(fd);
3475 }
3476 catch (bad_decode & bd)
3477 {
3478 W(F("protocol error while processing peer %s: '%s', disconnecting\n")
3479 % sess->peer_id % bd.what);
3480 sessions.erase(fd);
3481 live_p = false;
3482 }
3483 }
3484 else
3485 {
3486 P(F("fd %d (peer %s) read failed, disconnecting\n")
3487 % fd % sess->peer_id);
3488 sessions.erase(fd);
3489 live_p = false;
3490 }
3491}
3492
3493
3494static void
3495handle_write_available(Netxx::socket_type fd,
3496 shared_ptr<session> sess,
3497 map<Netxx::socket_type, shared_ptr<session> > & sessions,
3498 bool & live_p)
3499{
3500 if (! sess->write_some())
3501 {
3502 P(F("fd %d (peer %s) write failed, disconnecting\n")
3503 % fd % sess->peer_id);
3504 sessions.erase(fd);
3505 live_p = false;
3506 }
3507}
3508
3509static void
3510process_armed_sessions(map<Netxx::socket_type, shared_ptr<session> > & sessions,
3511 set<Netxx::socket_type> & armed_sessions)
3512{
3513 for (set<Netxx::socket_type>::const_iterator i = armed_sessions.begin();
3514 i != armed_sessions.end(); ++i)
3515 {
3516 map<Netxx::socket_type, shared_ptr<session> >::iterator j;
3517 j = sessions.find(*i);
3518 if (j == sessions.end())
3519 continue;
3520 else
3521 {
3522 Netxx::socket_type fd = j->first;
3523 shared_ptr<session> sess = j->second;
3524 if (!sess->process())
3525 {
3526 P(F("fd %d (peer %s) processing finished, disconnecting\n")
3527 % fd % sess->peer_id);
3528 sessions.erase(j);
3529 }
3530 }
3531 }
3532}
3533
3534static void
3535reap_dead_sessions(map<Netxx::socket_type, shared_ptr<session> > & sessions,
3536 unsigned long timeout_seconds)
3537{
3538 // kill any clients which haven't done any i/o inside the timeout period
3539 // or who have said goodbye and flushed their output buffers
3540 set<Netxx::socket_type> dead_clients;
3541 time_t now = ::time(NULL);
3542 for (map<Netxx::socket_type, shared_ptr<session> >::const_iterator i = sessions.begin();
3543 i != sessions.end(); ++i)
3544 {
3545 if (static_cast<unsigned long>(i->second->last_io_time + timeout_seconds)
3546 < static_cast<unsigned long>(now))
3547 {
3548 P(F("fd %d (peer %s) has been idle too long, disconnecting\n")
3549 % i->first % i->second->peer_id);
3550 dead_clients.insert(i->first);
3551 }
3552 if (i->second->sent_goodbye && i->second->outbuf.empty() && i->second->received_goodbye)
3553 {
3554 P(F("fd %d (peer %s) exchanged goodbyes and flushed output, disconnecting\n")
3555 % i->first % i->second->peer_id);
3556 dead_clients.insert(i->first);
3557 }
3558 }
3559 for (set<Netxx::socket_type>::const_iterator i = dead_clients.begin();
3560 i != dead_clients.end(); ++i)
3561 {
3562 sessions.erase(*i);
3563 }
3564}
3565
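// serve_connections is the server-side event loop. each pass arms the
// existing sessions, probes the listening socket plus every session
// socket, accepts new connections (while under session_limit), services
// read and write events, runs process() on armed sessions, and finally
// reaps sessions that have gone idle or finished their goodbye
// handshake.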
3566static void
3567serve_connections(protocol_role role,
3568 utf8 const & include_pattern,
3569 utf8 const & exclude_pattern,
3570 app_state & app,
3571 utf8 const & address,
3572 Netxx::port_type default_port,
3573 unsigned long timeout_seconds,
3574 unsigned long session_limit)
3575{
3576 Netxx::Probe probe;
3577
3578 Netxx::Timeout
3579 forever,
3580 timeout(static_cast<long>(timeout_seconds)),
3581 instant(0,1);
3582
3583 Netxx::Address addr(address().c_str(), default_port, true);
3584
3585 P(F("beginning service on %s : %s\n")
3586 % addr.get_name() % lexical_cast<string>(addr.get_port()));
3587
3588 Netxx::StreamServer server(addr, timeout);
3589
3590 map<Netxx::socket_type, shared_ptr<session> > sessions;
3591 set<Netxx::socket_type> armed_sessions;
3592
3593 while (true)
3594 {
3595 probe.clear();
3596 armed_sessions.clear();
3597
3598 if (sessions.size() >= session_limit)
3599 W(F("session limit %d reached, some connections will be refused\n") % session_limit);
3600 else
3601 probe.add(server);
3602
3603 arm_sessions_and_calculate_probe(probe, sessions, armed_sessions);
3604
3605 L(F("i/o probe with %d armed\n") % armed_sessions.size());
3606 Netxx::Probe::result_type res = probe.ready(sessions.empty() ? forever
3607 : (armed_sessions.empty() ? timeout
3608 : instant));
3609 Netxx::Probe::ready_type event = res.second;
3610 Netxx::socket_type fd = res.first;
3611
3612 if (fd == -1)
3613 {
3614 if (armed_sessions.empty())
3615 L(F("timed out waiting for I/O (listening on %s : %s)\n")
3616 % addr.get_name() % lexical_cast<string>(addr.get_port()));
3617 }
3618
3619 // we either got a new connection
3620 else if (fd == server)
3621 handle_new_connection(addr, server, timeout, role,
3622 include_pattern, exclude_pattern, sessions, app);
3623
3624 // or an existing session woke up
3625 else
3626 {
3627 map<Netxx::socket_type, shared_ptr<session> >::iterator i;
3628 i = sessions.find(fd);
3629 if (i == sessions.end())
3630 {
3631 L(F("got woken up for action on unknown fd %d\n") % fd);
3632 }
3633 else
3634 {
3635 shared_ptr<session> sess = i->second;
3636 bool live_p = true;
3637
3638 if (event & Netxx::Probe::ready_read)
3639 handle_read_available(fd, sess, sessions, armed_sessions, live_p);
3640
3641 if (live_p && (event & Netxx::Probe::ready_write))
3642 handle_write_available(fd, sess, sessions, live_p);
3643
3644 if (live_p && (event & Netxx::Probe::ready_oobd))
3645 {
3646 P(F("got some OOB data on fd %d (peer %s), disconnecting\n")
3647 % fd % sess->peer_id);
3648 sessions.erase(i);
3649 }
3650 }
3651 }
3652 process_armed_sessions(sessions, armed_sessions);
3653 reap_dead_sessions(sessions, timeout_seconds);
3654 }
3655}
3656
3657
3658/////////////////////////////////////////////////
3659//
3660// layer 4: monotone interface layer
3661//
3662/////////////////////////////////////////////////
3663
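// make_root_node allocates a fresh merkle table for one item type (certs,
// keys, or epochs), seeds it with a single empty root node at the root prefix
// and level 0, and registers the table in the session's merkle_tables map so
// that later refinement commands can find it.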
3664static boost::shared_ptr<merkle_table>
3665make_root_node(session & sess,
3666 netcmd_item_type ty)
3667{
3668 boost::shared_ptr<merkle_table> tab =
3669 boost::shared_ptr<merkle_table>(new merkle_table());
3670
3671 merkle_ptr tmp = merkle_ptr(new merkle_node());
3672 tmp->type = ty;
3673
3674 tab->insert(std::make_pair(std::make_pair(get_root_prefix().val, 0), tmp));
3675
3676 sess.merkle_tables[ty] = tab;
3677 return tab;
3678}
3679
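// insert_with_parents adds a revision and all of its ancestors to 'col'. it
// walks the ancestry iteratively with an explicit frontier stack rather than
// recursing, so deep histories cannot overflow the call stack; null revision
// ids and revisions already present in 'col' terminate each path.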
3680void
3681insert_with_parents(revision_id rev, set<revision_id> & col, app_state & app, ticker & revisions_ticker)
3682{
3683 vector<revision_id> frontier;
3684 frontier.push_back(rev);
3685 while (!frontier.empty())
3686 {
3687 revision_id rid = frontier.back();
3688 frontier.pop_back();
3689 if (!null_id(rid) && col.find(rid) == col.end())
3690 {
3691 ++revisions_ticker;
3692 col.insert(rid);
3693 std::set<revision_id> parents;
3694 app.db.get_revision_parents(rid, parents);
3695 for (std::set<revision_id>::const_iterator i = parents.begin();
3696 i != parents.end(); ++i)
3697 {
3698 frontier.push_back(*i);
3699 }
3700 }
3701 }
3702}
3703
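// rebuild_merkle_trees populates the session's cert, key and epoch merkle
// tables for the given set of branch names:
//   - collect every revision reachable (through ancestry) from a branch cert
//     on one of the requested branches,
//   - make sure each requested branch has an epoch, defaulting missing ones
//     to the all-zero epoch, and insert the epoch hashes,
//   - insert the hash of every cert attached to a collected revision, along
//     with the hash of each public key that signed one of those certs,
//   - finally recompute the interior hash codes of all three trees so they
//     are ready for refinement against the peer.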
3704void
3705session::rebuild_merkle_trees(app_state & app,
3706 set<utf8> const & branchnames)
3707{
3708 P(F("finding items to synchronize:\n"));
3709 for (set<utf8>::const_iterator i = branchnames.begin();
3710 i != branchnames.end(); ++i)
3711    L(F("including branch %s\n") % *i);
3712
3713 boost::shared_ptr<merkle_table> ctab = make_root_node(*this, cert_item);
3714 boost::shared_ptr<merkle_table> ktab = make_root_node(*this, key_item);
3715 boost::shared_ptr<merkle_table> etab = make_root_node(*this, epoch_item);
3716
3717  // xgettext: please use short message and try to avoid multibyte chars
3718 ticker revisions_ticker(_("revisions"), "r", 64);
3719  // xgettext: please use short message and try to avoid multibyte chars
3720 ticker certs_ticker(_("certs"), "c", 256);
3721  // xgettext: please use short message and try to avoid multibyte chars
3722 ticker keys_ticker(_("keys"), "k", 1);
3723
3724 set<revision_id> revision_ids;
3725 set<rsa_keypair_id> inserted_keys;
3726
3727 {
3728 // Get our branches
3729 vector<string> names;
3730 get_branches(app, names);
3731 for (size_t i = 0; i < names.size(); ++i)
3732 {
3733 if(branchnames.find(names[i]) != branchnames.end())
3734 {
3735 // branch matches, get its certs
3736 vector< revision<cert> > certs;
3737 base64<cert_value> encoded_name;
3738 encode_base64(cert_value(names[i]),encoded_name);
3739 app.db.get_revision_certs(branch_cert_name, encoded_name, certs);
3740 for (size_t j = 0; j < certs.size(); ++j)
3741 {
3742 insert_with_parents(revision_id(idx(certs,j).inner().ident),
3743 revision_ids, app, revisions_ticker);
3744 }
3745 }
3746 }
3747 }
3748
3749 {
3750 map<cert_value, epoch_data> epochs;
3751 app.db.get_epochs(epochs);
3752
3753 epoch_data epoch_zero(std::string(constants::epochlen, '0'));
3754 for (std::set<utf8>::const_iterator i = branchnames.begin();
3755 i != branchnames.end(); ++i)
3756 {
3757 cert_value branch((*i)());
3758 std::map<cert_value, epoch_data>::const_iterator j;
3759 j = epochs.find(branch);
3760 // set to zero any epoch which is not yet set
3761 if (j == epochs.end())
3762 {
3763 L(F("setting epoch on %s to zero\n") % branch);
3764 epochs.insert(std::make_pair(branch, epoch_zero));
3765 app.db.set_epoch(branch, epoch_zero);
3766 }
3767 // then insert all epochs into merkle tree
3768 j = epochs.find(branch);
3769 I(j != epochs.end());
3770 epoch_id eid;
3771 epoch_hash_code(j->first, j->second, eid);
3772 id raw_hash;
3773 decode_hexenc(eid.inner(), raw_hash);
3774 insert_into_merkle_tree(*etab, epoch_item, true, raw_hash(), 0);
3775 }
3776 }
3777
3778 typedef std::vector< std::pair<hexenc<id>,
3779 std::pair<revision_id, rsa_keypair_id> > > cert_idx;
3780
3781 cert_idx idx;
3782 // <mrb> this also gets *all* certs, needed?
3783 app.db.get_revision_cert_index(idx);
3784
3785 // insert all certs and keys reachable via these revisions,
3786 // except for branch certs that don't match the masks (since the other
3787 // side will just discard them anyway)
3788 for (cert_idx::const_iterator i = idx.begin(); i != idx.end(); ++i)
3789 {
3790 hexenc<id> const & hash = i->first;
3791 revision_id const & ident = i->second.first;
3792 rsa_keypair_id const & key = i->second.second;
3793
3794 if (revision_ids.find(ident) == revision_ids.end())
3795 continue;
3796
3797 id raw_hash;
3798 decode_hexenc(hash, raw_hash);
3799 insert_into_merkle_tree(*ctab, cert_item, true, raw_hash(), 0);
3800 ++certs_ticker;
3801 if (inserted_keys.find(key) == inserted_keys.end())
3802 {
3803 if (app.db.public_key_exists(key))
3804 {
3805 base64<rsa_pub_key> pub_encoded;
3806 app.db.get_key(key, pub_encoded);
3807 hexenc<id> keyhash;
3808 key_hash_code(key, pub_encoded, keyhash);
3809 decode_hexenc(keyhash, raw_hash);
3810 insert_into_merkle_tree(*ktab, key_item, true, raw_hash(), 0);
3811 ++keys_ticker;
3812 }
3813 inserted_keys.insert(key);
3814 }
3815 }
3816
3817 recalculate_merkle_codes(*etab, get_root_prefix().val, 0);
3818 recalculate_merkle_codes(*ktab, get_root_prefix().val, 0);
3819 recalculate_merkle_codes(*ctab, get_root_prefix().val, 0);
3820}
3821
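// run_netsync_protocol is the entry point used by the command layer. in
// server voice it serves connections indefinitely on the default netsync
// port; in client voice it runs a single exchange inside a transaction_guard,
// so nothing is committed to the database unless the whole session succeeds.
// Netxx::NetworkException is reported as an informative failure, while any
// other Netxx::Exception is treated as an internal error.
//
// a client-side pull might be invoked roughly like this (a hedged sketch: the
// role constant and the address/pattern values are illustrative and not taken
// from this file):
//
//   run_netsync_protocol(client_voice, sink_role,
//                        utf8("host.example.com"),
//                        utf8("some.branch*"), utf8(""),
//                        app);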
3822void
3823run_netsync_protocol(protocol_voice voice,
3824 protocol_role role,
3825 utf8 const & addr,
3826 utf8 const & include_pattern,
3827 utf8 const & exclude_pattern,
3828 app_state & app)
3829{
3830 try
3831 {
3832 start_platform_netsync();
3833 if (voice == server_voice)
3834 {
3835 serve_connections(role, include_pattern, exclude_pattern, app,
3836 addr, static_cast<Netxx::port_type>(constants::netsync_default_port),
3837 static_cast<unsigned long>(constants::netsync_timeout_seconds),
3838 static_cast<unsigned long>(constants::netsync_connection_limit));
3839 }
3840 else
3841 {
3842 I(voice == client_voice);
3843 transaction_guard guard(app.db);
3844 call_server(role, include_pattern, exclude_pattern, app,
3845 addr, static_cast<Netxx::port_type>(constants::netsync_default_port),
3846 static_cast<unsigned long>(constants::netsync_timeout_seconds));
3847 guard.commit();
3848 }
3849 }
3850 catch (Netxx::NetworkException & e)
3851 {
3852 end_platform_netsync();
3853 throw informative_failure((F("network error: %s") % e.what()).str());
3854 }
3855 catch (Netxx::Exception & e)
3856 {
3857 end_platform_netsync();
3858      throw oops((F("network error: %s") % e.what()).str());
3859 }
3860 end_platform_netsync();
3861}
3862
