monotone

monotone Mtn Source Tree

Root/netsync.cc

1// Copyright (C) 2004 Graydon Hoare <graydon@pobox.com>
2//
3// This program is made available under the GNU GPL version 2.0 or
4// greater. See the accompanying file COPYING for details.
5//
6// This program is distributed WITHOUT ANY WARRANTY; without even the
7// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8// PURPOSE.
9
10#include <map>
11#include <string>
12#include <cstdlib>
13#include <memory>
14#include <list>
15#include <deque>
16#include <stack>
17
18#include <time.h>
19
20#include <boost/lexical_cast.hpp>
21#include <boost/scoped_ptr.hpp>
22#include <boost/shared_ptr.hpp>
23#include <boost/bind.hpp>
24#include <boost/regex.hpp>
25
26#include "app_state.hh"
27#include "cert.hh"
28#include "constants.hh"
29#include "enumerator.hh"
30#include "keys.hh"
31#include "merkle_tree.hh"
32#include "netcmd.hh"
33#include "netio.hh"
34#include "netsync.hh"
35#include "numeric_vocab.hh"
36#include "packet.hh"
37#include "refiner.hh"
38#include "revision.hh"
39#include "sanity.hh"
40#include "transforms.hh"
41#include "ui.hh"
42#include "xdelta.hh"
43#include "epoch.hh"
44#include "platform.hh"
45#include "hmac.hh"
46#include "globish.hh"
47#include "uri.hh"
48
49#include "botan/botan.h"
50
51#include "netxx/address.h"
52#include "netxx/peer.h"
53#include "netxx/probe.h"
54#include "netxx/socket.h"
55#include "netxx/sockopt.h"
56#include "netxx/stream.h"
57#include "netxx/streamserver.h"
58#include "netxx/timeout.h"
59#include "netxx_pipe.hh"
60// TODO: things to do that will break protocol compatibility
61// -- need some way to upgrade anonymous to keyed pull, without user having
62// to explicitly specify which they want
63// just having a way to respond "access denied, try again" might work
64// but perhaps better to have the anonymous command include a note "I
65// _could_ use key <...> if you prefer", and if that would lead to more
66// access, could reply "I do prefer". (Does this lead to too much
67// information exposure? Allows anonymous people to probe what branches
68// a key has access to.)
69// -- "warning" packet type?
70// -- Richard Levitte wants, when you (e.g.) request '*' but don't have
71// access to all of it, you just get the parts you have access to
72// (maybe with warnings about skipped branches). to do this right,
73// should have a way for the server to send back to the client "right,
74// you're not getting the following branches: ...", so the client will
75// not include them in its merkle trie.
76// -- add some sort of vhost field to the client's first packet, saying who
77// they expect to talk to
78
79//
80// This is the "new" network synchronization (netsync) system in
81// monotone. It is based on synchronizing pairs of merkle trees over an
82// interactive connection.
83//
84// A netsync process between peers treats each peer as either a source, a
85// sink, or both. When a peer is only a source, it will not write any new
86// items to its database. When a peer is only a sink, it will not send any
87// items from its database. When a peer is both a source and sink, it may
88// send and write items freely.
89//
90// The post-state of a netsync is that each sink contains a superset of the
91// items in its corresponding source; when peers are behaving as both
92// source and sink, this means that the post-state of the sync is for the
93// peers to have identical item sets.
94//
95//
96// Data structure
97// --------------
98//
99// Each node in a merkle tree contains a fixed number of slots. This number
100// is derived from a global parameter of the protocol -- the tree fanout --
101// such that the number of slots is 2^fanout. For now we will assume that
102// fanout is 4 thus there are 16 slots in a node, because this makes
103// illustration easier. The other parameter of the protocol is the size of
104// a hash; we use SHA1 so the hash is 20 bytes (160 bits) long.
105//
106// Each slot in a merkle tree node is in one of 3 states:
107//
108// - empty
109// - leaf
110// - subtree
111//
112// In addition, each leaf contains a hash code which identifies an element
113// of the set being synchronized. Each subtree slot contains a hash code of
114// the node immediately beneath it in the merkle tree. Empty slots contain
115// no hash codes.
116//
117// Since empty slots have no hash code, they are represented implicitly by
118// a bitmap at the head of each merkle tree node. As an additional
119// integrity check, each merkle tree node contains a label indicating its
120// prefix in the tree, and a hash of its own contents.
121//
122// In total, then, the byte-level representation of a <160,4> merkle tree
123// node is as follows:
124//
125// 20 bytes - hash of the remaining bytes in the node
126// 1 byte - type of this node (manifest, file, key, mcert, fcert)
127// 1-N bytes - level of this node in the tree (0 == "root", uleb128)
128// 0-20 bytes - the prefix of this node, 4 bits * level,
129// rounded up to a byte
130// 1-N bytes - number of leaves under this node (uleb128)
131// 4 bytes - slot-state bitmap of the node
132// 0-320 bytes - between 0 and 16 live slots in the node
133//
134// So, in the worst case such a node is 367 bytes, with these parameters.
135//
136//
137// Protocol
138// --------
139//
140// The protocol is a binary command-packet system over TCP; each packet
141// consists of a single byte which identifies the protocol version, a byte
142// which identifies the command name inside that version, a size_t sent as
143// a uleb128 indicating the length of the packet, that many bytes of
144// payload, and finally 20 bytes of SHA-1 HMAC calculated over the payload.
145// The key for the SHA-1 HMAC is 20 bytes of 0 during authentication, and a
146// 20-byte random key chosen by the client after authentication (discussed
147// below). Decoding involves simply buffering until a sufficient number of
148// bytes are received, then advancing the buffer pointer. Any time an
149// integrity check (the HMAC) fails, the protocol is assumed to have lost
150// synchronization, and the connection is dropped. The parties are free to
151// drop the TCP stream at any point, if too much data is received or too
152// much idle time passes; no commitments or transactions are made.
153//
154//
155// Authentication and setup
156// ------------------------
157//
158// The exchange begins in a non-authenticated state. The server sends a
159// "hello <id> <nonce>" command, which identifies the server's RSA key and
160// issues a nonce which must be used for a subsequent authentication.
161//
162// The client then responds with either:
163//
164// An "auth (source|sink|both) <include_pattern> <exclude_pattern> <id>
165// <nonce1> <hmac key> <sig>" command, which identifies its RSA key, notes the
166// role it wishes to play in the synchronization, identifies the pattern it
167// wishes to sync with, signs the previous nonce with its own key, and informs
168// the server of the HMAC key it wishes to use for this session (encrypted
169// with the server's public key); or
170//
171// An "anonymous (source|sink|both) <include_pattern> <exclude_pattern>
172// <hmac key>" command, which identifies the role it wishes to play in the
173// synchronization, the pattern it wishes to sync with, and the HMAC key it
174// wishes to use for this session (also encrypted with the server's public
175// key).
176//
177// The server then replies with a "confirm" command, which contains no
178// other data but will only have the correct HMAC integrity code if the
179// server received and properly decrypted the HMAC key offered by the
180// client. This transitions the peers into an authenticated state and
181// begins epoch refinement. If epoch refinement and epoch transmission
182// succeed, the peers switch to data refinement and data transmission.
183//
184//
185// Refinement
186// ----------
187//
188// Refinement is executed by "refiners"; there is a refiner for each
189// set of 'items' being exchanged: epochs, keys, certs, and revisions.
190// When refinement starts, each party knows only their own set of
191// items; when refinement completes, each party has learned of the
192// complete set of items it needs to send, and a count of items it's
193// expecting to receive.
194//
195// For more details on the refinement process, see refiner.cc.
196//
197//
198// Transmission
199// ------------
200//
201// Once the set of items to send has been determined (for keys, certs, and
202// revisions) each peer switches into a transmission mode. This mode
203// involves walking the revision graph in ancestry-order and sending all
204// the items the local peer has which the remote one does not. Since the
205// remote and local peers both know all the items which need to be
206// transferred (they learned during refinement) they know what to wait for
207// and what to send. The mechanisms of the transmission phase (notably,
208// enumerator.cc) simply ensure that things are sent in the proper order,
209// and without over-filling the output buffer too much.
210//
211//
212// Shutdown
213// --------
214//
215// After transmission completes, one special command, "bye", is used to
216// shut down a connection gracefully. The shutdown sequence based on "bye"
217// commands is documented below in session::process_bye_cmd.
218//
219//
220// Note on epochs
221// --------------
222//
223// One refinement and transmission phase precedes all the others: epochs.
224// Epochs are exchanged and compared in order to be sure that further
225// refinement and transmission (on certs and revisions) makes sense; they
226// are a sort of "immune system" to prevent incompatible databases (say
227// between rebuilds due to bugs in monotone) from cross-contaminating. The
228// later refinements are only kicked off *after* all epochs are received
229// and compare correctly.
230//
231//
232// Note on dense coding
233// --------------------
234//
235// This protocol is "raw binary" (non-text) because coding density is
236// actually important here, and each packet consists of very
237// information-dense material that you wouldn't have a hope of typing in,
238// or interpreting manually anyway.
239//
240
241using std::auto_ptr;
242using std::deque;
243using std::make_pair;
244using std::map;
245using std::min;
246using std::pair;
247using std::set;
248using std::string;
249using std::vector;
250
251using boost::shared_ptr;
252using boost::lexical_cast;
253
254static inline void
255require(bool check, string const & context)
256{
257 if (!check)
258 throw bad_decode(F("check of '%s' failed") % context);
259}
260
// Exception type thrown by session::error(); carries only a human-readable
// message.
struct netsync_error
{
  // Text describing what went wrong.
  string msg;

  // Explicit so that a plain string is never silently converted into an
  // error object; `throw netsync_error(text)` keeps working unchanged.
  explicit netsync_error(string const & s) : msg(s) {}
};
266
267struct
268session:
269 public refiner_callbacks,
270 public enumerator_callbacks
271{
272 protocol_role role;
273 protocol_voice const voice;
274 globish const & our_include_pattern;
275 globish const & our_exclude_pattern;
276 globish_matcher our_matcher;
277 app_state & app;
278
279 string peer_id;
280 shared_ptr<Netxx::StreamBase> str;
281
282 string_queue inbuf;
283 // deque of pair<string data, size_t cur_pos>
284 deque< pair<string,size_t> > outbuf;
285 // the total data stored in outbuf - this is
286 // used as a valve to stop too much data
287 // backing up
288 size_t outbuf_size;
289
290 netcmd cmd;
291 bool armed;
292 bool arm();
293
294 id remote_peer_key_hash;
295 rsa_keypair_id remote_peer_key_name;
296 netsync_session_key session_key;
297 chained_hmac read_hmac;
298 chained_hmac write_hmac;
299 bool authenticated;
300
301 time_t last_io_time;
302 auto_ptr<ticker> byte_in_ticker;
303 auto_ptr<ticker> byte_out_ticker;
304 auto_ptr<ticker> cert_in_ticker;
305 auto_ptr<ticker> cert_out_ticker;
306 auto_ptr<ticker> revision_in_ticker;
307 auto_ptr<ticker> revision_out_ticker;
308 size_t bytes_in, bytes_out;
309 size_t certs_in, certs_out;
310 size_t revs_in, revs_out;
311 size_t keys_in, keys_out;
312 // used to identify this session to the netsync hooks.
313 // We can't just use saved_nonce, because that's blank for all
314 // anonymous connections and could lead to confusion.
315 size_t session_id;
316 static size_t session_count;
317
318 vector<revision_id> written_revisions;
319 vector<rsa_keypair_id> written_keys;
320 vector<cert> written_certs;
321
322 id saved_nonce;
323 packet_db_writer dbw;
324
325 enum
326 {
327 working_state,
328 shutdown_state,
329 confirmed_state
330 }
331 protocol_state;
332
333 bool encountered_error;
334
335 static const int no_error = 200;
336 static const int partial_transfer = 211;
337 static const int no_transfer = 212;
338
339 static const int not_permitted = 412;
340 static const int unknown_key = 422;
341 static const int mixing_versions = 432;
342
343 static const int role_mismatch = 512;
344 static const int bad_command = 521;
345
346 static const int failed_identification = 532;
347 //static const int bad_data = 541;
348
349 int error_code;
350
351 bool set_totals;
352
353 // Interface to refinement.
354 refiner epoch_refiner;
355 refiner key_refiner;
356 refiner cert_refiner;
357 refiner rev_refiner;
358
359 // Interface to ancestry grovelling.
360 revision_enumerator rev_enumerator;
361
362 // Enumerator_callbacks methods.
363 set<file_id> file_items_sent;
364 bool process_this_rev(revision_id const & rev);
365 bool queue_this_cert(hexenc<id> const & c);
366 bool queue_this_file(hexenc<id> const & f);
367 void note_file_data(file_id const & f);
368 void note_file_delta(file_id const & src, file_id const & dst);
369 void note_rev(revision_id const & rev);
370 void note_cert(hexenc<id> const & c);
371
372 session(protocol_role role,
373 protocol_voice voice,
374 globish const & our_include_pattern,
375 globish const & our_exclude_pattern,
376 app_state & app,
377 string const & peer,
378 shared_ptr<Netxx::StreamBase> sock);
379
380 virtual ~session();
381
382 void rev_written_callback(revision_id rid);
383 void key_written_callback(rsa_keypair_id kid);
384 void cert_written_callback(cert const & c);
385
386 id mk_nonce();
387 void mark_recent_io();
388
389 void set_session_key(string const & key);
390 void set_session_key(rsa_oaep_sha_data const & key_encrypted);
391
392 void setup_client_tickers();
393 bool done_all_refinements();
394 bool queued_all_items();
395 bool received_all_items();
396 bool finished_working();
397 void maybe_step();
398 void maybe_say_goodbye(transaction_guard & guard);
399
400 void note_item_arrived(netcmd_item_type ty, id const & i);
401 void maybe_note_epochs_finished();
402 void note_item_sent(netcmd_item_type ty, id const & i);
403
404 Netxx::Probe::ready_type which_events() const;
405 bool read_some();
406 bool write_some();
407
408 void error(int errcode, string const & errmsg);
409
410 void write_netcmd_and_try_flush(netcmd const & cmd);
411
412 // Outgoing queue-writers.
413 void queue_bye_cmd(u8 phase);
414 void queue_error_cmd(string const & errmsg);
415 void queue_done_cmd(netcmd_item_type type, size_t n_items);
416 void queue_hello_cmd(rsa_keypair_id const & key_name,
417 base64<rsa_pub_key> const & pub_encoded,
418 id const & nonce);
419 void queue_anonymous_cmd(protocol_role role,
420 globish const & include_pattern,
421 globish const & exclude_pattern,
422 id const & nonce2,
423 base64<rsa_pub_key> server_key_encoded);
424 void queue_auth_cmd(protocol_role role,
425 globish const & include_pattern,
426 globish const & exclude_pattern,
427 id const & client,
428 id const & nonce1,
429 id const & nonce2,
430 string const & signature,
431 base64<rsa_pub_key> server_key_encoded);
432 void queue_confirm_cmd();
433 void queue_refine_cmd(refinement_type ty, merkle_node const & node);
434 void queue_data_cmd(netcmd_item_type type,
435 id const & item,
436 string const & dat);
437 void queue_delta_cmd(netcmd_item_type type,
438 id const & base,
439 id const & ident,
440 delta const & del);
441
442 // Incoming dispatch-called methods.
443 bool process_error_cmd(string const & errmsg);
444 bool process_hello_cmd(rsa_keypair_id const & server_keyname,
445 rsa_pub_key const & server_key,
446 id const & nonce);
447 bool process_bye_cmd(u8 phase, transaction_guard & guard);
448 bool process_anonymous_cmd(protocol_role role,
449 globish const & their_include_pattern,
450 globish const & their_exclude_pattern);
451 bool process_auth_cmd(protocol_role role,
452 globish const & their_include_pattern,
453 globish const & their_exclude_pattern,
454 id const & client,
455 id const & nonce1,
456 string const & signature);
457 bool process_refine_cmd(refinement_type ty, merkle_node const & node);
458 bool process_done_cmd(netcmd_item_type type, size_t n_items);
459 bool process_data_cmd(netcmd_item_type type,
460 id const & item,
461 string const & dat);
462 bool process_delta_cmd(netcmd_item_type type,
463 id const & base,
464 id const & ident,
465 delta const & del);
466 bool process_usher_cmd(utf8 const & msg);
467
468 // The incoming dispatcher.
469 bool dispatch_payload(netcmd const & cmd,
470 transaction_guard & guard);
471
472 // Various helpers.
473 void assume_corresponding_role(protocol_role their_role);
474 void respond_to_confirm_cmd();
475 bool data_exists(netcmd_item_type type,
476 id const & item);
477 void load_data(netcmd_item_type type,
478 id const & item,
479 string & out);
480
481 void rebuild_merkle_trees(app_state & app,
482 set<branch_name> const & branches);
483
484 void send_all_data(netcmd_item_type ty, set<id> const & items);
485 void begin_service();
486 bool process(transaction_guard & guard);
487};
488size_t session::session_count = 0;
489
490session::session(protocol_role role,
491 protocol_voice voice,
492 globish const & our_include_pattern,
493 globish const & our_exclude_pattern,
494 app_state & app,
495 string const & peer,
496 shared_ptr<Netxx::StreamBase> sock) :
497 role(role),
498 voice(voice),
499 our_include_pattern(our_include_pattern),
500 our_exclude_pattern(our_exclude_pattern),
501 our_matcher(our_include_pattern, our_exclude_pattern),
502 app(app),
503 peer_id(peer),
504 str(sock),
505 inbuf(),
506 outbuf_size(0),
507 armed(false),
508 remote_peer_key_hash(""),
509 remote_peer_key_name(""),
510 session_key(constants::netsync_key_initializer),
511 read_hmac(constants::netsync_key_initializer, app.opts.use_transport_auth),
512 write_hmac(constants::netsync_key_initializer, app.opts.use_transport_auth),
513 authenticated(false),
514 last_io_time(::time(NULL)),
515 byte_in_ticker(NULL),
516 byte_out_ticker(NULL),
517 cert_in_ticker(NULL),
518 cert_out_ticker(NULL),
519 revision_in_ticker(NULL),
520 revision_out_ticker(NULL),
521 bytes_in(0), bytes_out(0),
522 certs_in(0), certs_out(0),
523 revs_in(0), revs_out(0),
524 keys_in(0), keys_out(0),
525 session_id(++session_count),
526 saved_nonce(""),
527 dbw(app),
528 protocol_state(working_state),
529 encountered_error(false),
530 error_code(no_transfer),
531 set_totals(false),
532 epoch_refiner(epoch_item, voice, *this),
533 key_refiner(key_item, voice, *this),
534 cert_refiner(cert_item, voice, *this),
535 rev_refiner(revision_item, voice, *this),
536 rev_enumerator(*this, app)
537{
538 dbw.set_on_revision_written(boost::bind(&session::rev_written_callback,
539 this, _1));
540 dbw.set_on_cert_written(boost::bind(&session::cert_written_callback,
541 this, _1));
542 dbw.set_on_pubkey_written(boost::bind(&session::key_written_callback,
543 this, _1));
544}
545
546session::~session()
547{
548 if (protocol_state == confirmed_state)
549 error_code = no_error;
550 else if (error_code == no_transfer &&
551 (revs_in || revs_out ||
552 certs_in || certs_out ||
553 keys_in || keys_out))
554 error_code = partial_transfer;
555
556 vector<cert> unattached_certs;
557 map<revision_id, vector<cert> > revcerts;
558 for (vector<revision_id>::iterator i = written_revisions.begin();
559 i != written_revisions.end(); ++i)
560 revcerts.insert(make_pair(*i, vector<cert>()));
561 for (vector<cert>::iterator i = written_certs.begin();
562 i != written_certs.end(); ++i)
563 {
564 map<revision_id, vector<cert> >::iterator j;
565 j = revcerts.find(revision_id(i->ident));
566 if (j == revcerts.end())
567 unattached_certs.push_back(*i);
568 else
569 j->second.push_back(*i);
570 }
571
572 // if (role == sink_role || role == source_and_sink_role)
573 if (!written_keys.empty()
574 || !written_revisions.empty()
575 || !written_certs.empty())
576 {
577
578 //Keys
579 for (vector<rsa_keypair_id>::iterator i = written_keys.begin();
580 i != written_keys.end(); ++i)
581 {
582 app.lua.hook_note_netsync_pubkey_received(*i, session_id);
583 }
584
585 //Revisions
586 for (vector<revision_id>::iterator i = written_revisions.begin();
587 i != written_revisions.end(); ++i)
588 {
589 vector<cert> & ctmp(revcerts[*i]);
590 set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > certs;
591 for (vector<cert>::const_iterator j = ctmp.begin();
592 j != ctmp.end(); ++j)
593 {
594 cert_value vtmp;
595 decode_base64(j->value, vtmp);
596 certs.insert(make_pair(j->key, make_pair(j->name, vtmp)));
597 }
598 revision_data rdat;
599 app.db.get_revision(*i, rdat);
600 app.lua.hook_note_netsync_revision_received(*i, rdat, certs,
601 session_id);
602 }
603
604 //Certs (not attached to a new revision)
605 for (vector<cert>::iterator i = unattached_certs.begin();
606 i != unattached_certs.end(); ++i)
607 {
608 cert_value tmp;
609 decode_base64(i->value, tmp);
610 app.lua.hook_note_netsync_cert_received(revision_id(i->ident), i->key,
611 i->name, tmp, session_id);
612 }
613 }
614 app.lua.hook_note_netsync_end(session_id, error_code,
615 bytes_in, bytes_out,
616 certs_in, certs_out,
617 revs_in, revs_out,
618 keys_in, keys_out);
619}
620
621bool
622session::process_this_rev(revision_id const & rev)
623{
624 id item;
625 decode_hexenc(rev.inner(), item);
626 return (rev_refiner.items_to_send.find(item)
627 != rev_refiner.items_to_send.end());
628}
629
630bool
631session::queue_this_cert(hexenc<id> const & c)
632{
633 id item;
634 decode_hexenc(c, item);
635 return (cert_refiner.items_to_send.find(item)
636 != cert_refiner.items_to_send.end());
637}
638
639bool
640session::queue_this_file(hexenc<id> const & f)
641{
642 return file_items_sent.find(file_id(f)) == file_items_sent.end();
643}
644
645void
646session::note_file_data(file_id const & f)
647{
648 if (role == sink_role)
649 return;
650 file_data fd;
651 id item;
652 decode_hexenc(f.inner(), item);
653 app.db.get_file_version(f, fd);
654 queue_data_cmd(file_item, item, fd.inner()());
655 file_items_sent.insert(f);
656}
657
658void
659session::note_file_delta(file_id const & src, file_id const & dst)
660{
661 if (role == sink_role)
662 return;
663 file_delta fdel;
664 id fid1, fid2;
665 decode_hexenc(src.inner(), fid1);
666 decode_hexenc(dst.inner(), fid2);
667 app.db.get_arbitrary_file_delta(src, dst, fdel);
668 queue_delta_cmd(file_item, fid1, fid2, fdel.inner());
669 file_items_sent.insert(dst);
670}
671
672void
673session::note_rev(revision_id const & rev)
674{
675 if (role == sink_role)
676 return;
677 revision_t rs;
678 id item;
679 decode_hexenc(rev.inner(), item);
680 app.db.get_revision(rev, rs);
681 data tmp;
682 write_revision(rs, tmp);
683 queue_data_cmd(revision_item, item, tmp());
684}
685
686void
687session::note_cert(hexenc<id> const & c)
688{
689 if (role == sink_role)
690 return;
691 id item;
692 decode_hexenc(c, item);
693 revision<cert> cert;
694 string str;
695 app.db.get_revision_cert(c, cert);
696 write_cert(cert.inner(), str);
697 queue_data_cmd(cert_item, item, str);
698}
699
700
701void session::rev_written_callback(revision_id rid)
702{
703 written_revisions.push_back(rid);
704}
705
706void session::key_written_callback(rsa_keypair_id kid)
707{
708 written_keys.push_back(kid);
709}
710
711void session::cert_written_callback(cert const & c)
712{
713 written_certs.push_back(c);
714}
715
716id
717session::mk_nonce()
718{
719 I(this->saved_nonce().size() == 0);
720 char buf[constants::merkle_hash_length_in_bytes];
721 Botan::Global_RNG::randomize(reinterpret_cast<Botan::byte *>(buf),
722 constants::merkle_hash_length_in_bytes);
723 this->saved_nonce = id(string(buf, buf + constants::merkle_hash_length_in_bytes));
724 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
725 return this->saved_nonce;
726}
727
728void
729session::mark_recent_io()
730{
731 last_io_time = ::time(NULL);
732}
733
734void
735session::set_session_key(string const & key)
736{
737 session_key = netsync_session_key(key);
738 read_hmac.set_key(session_key);
739 write_hmac.set_key(session_key);
740}
741
742void
743session::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted)
744{
745 if (app.opts.use_transport_auth)
746 {
747 keypair our_kp;
748 load_key_pair(app, app.opts.signing_key, our_kp);
749 string hmac_key;
750 decrypt_rsa(app.lua, app.opts.signing_key, our_kp.priv,
751 hmac_key_encrypted, hmac_key);
752 set_session_key(hmac_key);
753 }
754}
755
756void
757session::setup_client_tickers()
758{
759 // xgettext: please use short message and try to avoid multibytes chars
760 byte_in_ticker.reset(new ticker(N_("bytes in"), ">", 1024, true));
761 // xgettext: please use short message and try to avoid multibytes chars
762 byte_out_ticker.reset(new ticker(N_("bytes out"), "<", 1024, true));
763 if (role == sink_role)
764 {
765 // xgettext: please use short message and try to avoid multibytes chars
766 cert_in_ticker.reset(new ticker(N_("certs in"), "c", 3));
767 // xgettext: please use short message and try to avoid multibytes chars
768 revision_in_ticker.reset(new ticker(N_("revs in"), "r", 1));
769 }
770 else if (role == source_role)
771 {
772 // xgettext: please use short message and try to avoid multibytes chars
773 cert_out_ticker.reset(new ticker(N_("certs out"), "C", 3));
774 // xgettext: please use short message and try to avoid multibytes chars
775 revision_out_ticker.reset(new ticker(N_("revs out"), "R", 1));
776 }
777 else
778 {
779 I(role == source_and_sink_role);
780 // xgettext: please use short message and try to avoid multibytes chars
781 revision_in_ticker.reset(new ticker(N_("revs in"), "r", 1));
782 // xgettext: please use short message and try to avoid multibytes chars
783 revision_out_ticker.reset(new ticker(N_("revs out"), "R", 1));
784 }
785}
786
787bool
788session::done_all_refinements()
789{
790 bool all = rev_refiner.done
791 && cert_refiner.done
792 && key_refiner.done
793 && epoch_refiner.done;
794
795 if (all && !set_totals)
796 {
797 if (cert_out_ticker.get())
798 cert_out_ticker->set_total(cert_refiner.items_to_send.size());
799
800 if (revision_out_ticker.get())
801 revision_out_ticker->set_total(rev_refiner.items_to_send.size());
802
803 if (cert_in_ticker.get())
804 cert_in_ticker->set_total(cert_refiner.items_to_receive);
805
806 if (revision_in_ticker.get())
807 revision_in_ticker->set_total(rev_refiner.items_to_receive);
808
809 set_totals = true;
810 }
811 return all;
812}
813
814
815
816bool
817session::received_all_items()
818{
819 if (role == source_role)
820 return true;
821 bool all = rev_refiner.items_to_receive == 0
822 && cert_refiner.items_to_receive == 0
823 && key_refiner.items_to_receive == 0
824 && epoch_refiner.items_to_receive == 0;
825 return all;
826}
827
828bool
829session::finished_working()
830{
831 bool all = done_all_refinements()
832 && received_all_items()
833 && queued_all_items()
834 && rev_enumerator.done();
835 return all;
836}
837
838bool
839session::queued_all_items()
840{
841 if (role == sink_role)
842 return true;
843 bool all = rev_refiner.items_to_send.empty()
844 && cert_refiner.items_to_send.empty()
845 && key_refiner.items_to_send.empty()
846 && epoch_refiner.items_to_send.empty();
847 return all;
848}
849
850
851void
852session::maybe_note_epochs_finished()
853{
854 // Maybe there are outstanding epoch requests.
855 // These only matter if we're in sink or source-and-sink mode.
856 if (!(epoch_refiner.items_to_receive == 0) && !(role == source_role))
857 return;
858
859 // And maybe we haven't even finished the refinement.
860 if (!epoch_refiner.done)
861 return;
862
863 // If we ran into an error -- say a mismatched epoch -- don't do any
864 // further refinements.
865 if (encountered_error)
866 return;
867
868 // But otherwise, we're ready to go. Start the next
869 // set of refinements.
870 if (voice == client_voice)
871 {
872 L(FL("epoch refinement finished; beginning other refinements"));
873 key_refiner.begin_refinement();
874 cert_refiner.begin_refinement();
875 rev_refiner.begin_refinement();
876 }
877 else
878 L(FL("epoch refinement finished"));
879}
880
881static void
882decrement_if_nonzero(netcmd_item_type ty,
883 size_t & n)
884{
885 if (n == 0)
886 {
887 string typestr;
888 netcmd_item_type_to_string(ty, typestr);
889 E(false, F("underflow on count of %s items to receive") % typestr);
890 }
891 --n;
892 if (n == 0)
893 {
894 string typestr;
895 netcmd_item_type_to_string(ty, typestr);
896 L(FL("count of %s items to receive has reached zero") % typestr);
897 }
898}
899
900void
901session::note_item_arrived(netcmd_item_type ty, id const & ident)
902{
903 switch (ty)
904 {
905 case cert_item:
906 decrement_if_nonzero(ty, cert_refiner.items_to_receive);
907 if (cert_in_ticker.get() != NULL)
908 ++(*cert_in_ticker);
909 ++certs_in;
910 break;
911 case revision_item:
912 decrement_if_nonzero(ty, rev_refiner.items_to_receive);
913 if (revision_in_ticker.get() != NULL)
914 ++(*revision_in_ticker);
915 ++revs_in;
916 break;
917 case key_item:
918 decrement_if_nonzero(ty, key_refiner.items_to_receive);
919 ++keys_in;
920 break;
921 case epoch_item:
922 decrement_if_nonzero(ty, epoch_refiner.items_to_receive);
923 break;
924 default:
925 // No ticker for other things.
926 break;
927 }
928}
929
930
931
932void
933session::note_item_sent(netcmd_item_type ty, id const & ident)
934{
935 switch (ty)
936 {
937 case cert_item:
938 cert_refiner.items_to_send.erase(ident);
939 if (cert_out_ticker.get() != NULL)
940 ++(*cert_out_ticker);
941 ++certs_out;
942 break;
943 case revision_item:
944 rev_refiner.items_to_send.erase(ident);
945 if (revision_out_ticker.get() != NULL)
946 ++(*revision_out_ticker);
947 ++revs_out;
948 break;
949 case key_item:
950 key_refiner.items_to_send.erase(ident);
951 ++keys_out;
952 break;
953 case epoch_item:
954 epoch_refiner.items_to_send.erase(ident);
955 break;
956 default:
957 // No ticker for other things.
958 break;
959 }
960}
961
962void
963session::write_netcmd_and_try_flush(netcmd const & cmd)
964{
965 if (!encountered_error)
966 {
967 string buf;
968 cmd.write(buf, write_hmac);
969 outbuf.push_back(make_pair(buf, 0));
970 outbuf_size += buf.size();
971 }
972 else
973 L(FL("dropping outgoing netcmd (because we're in error unwind mode)"));
974 // FIXME: this helps keep the protocol pipeline full but it seems to
975 // interfere with initial and final sequences. careful with it.
976 // write_some();
977 // read_some();
978}
979
980// This method triggers a special "error unwind" mode to netsync. In this
981// mode, all received data is ignored, and no new data is queued. We simply
982// stay connected long enough for the current write buffer to be flushed, to
983// ensure that our peer receives the error message.
984// Affects read_some, write_some, and process .
985void
986session::error(int errcode, string const & errmsg)
987{
988 error_code = errcode;
989 throw netsync_error(errmsg);
990}
991
992Netxx::Probe::ready_type
993session::which_events() const
994{
995 // Only ask to read if we're not armed.
996 if (outbuf.empty())
997 {
998 if (inbuf.size() < constants::netcmd_maxsz && !armed)
999 return Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
1000 else
1001 return Netxx::Probe::ready_oobd;
1002 }
1003 else
1004 {
1005 if (inbuf.size() < constants::netcmd_maxsz && !armed)
1006 return Netxx::Probe::ready_write | Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
1007 else
1008 return Netxx::Probe::ready_write | Netxx::Probe::ready_oobd;
1009 }
1010}
1011
1012bool
1013session::read_some()
1014{
1015 I(inbuf.size() < constants::netcmd_maxsz);
1016 char tmp[constants::bufsz];
1017 Netxx::signed_size_type count = str->read(tmp, sizeof(tmp));
1018 if (count > 0)
1019 {
1020 L(FL("read %d bytes from fd %d (peer %s)") % count % str->get_socketfd() % peer_id);
1021 if (encountered_error)
1022 {
1023 L(FL("in error unwind mode, so throwing them into the bit bucket"));
1024 return true;
1025 }
1026 inbuf.append(tmp,count);
1027 mark_recent_io();
1028 if (byte_in_ticker.get() != NULL)
1029 (*byte_in_ticker) += count;
1030 bytes_in += count;
1031 return true;
1032 }
1033 else
1034 return false;
1035}
1036
1037bool
1038session::write_some()
1039{
1040 I(!outbuf.empty());
1041 size_t writelen = outbuf.front().first.size() - outbuf.front().second;
1042 Netxx::signed_size_type count = str->write(outbuf.front().first.data() + outbuf.front().second,
1043 min(writelen,
1044 constants::bufsz));
1045 if (count > 0)
1046 {
1047 if ((size_t)count == writelen)
1048 {
1049 outbuf_size -= outbuf.front().first.size();
1050 outbuf.pop_front();
1051 }
1052 else
1053 {
1054 outbuf.front().second += count;
1055 }
1056 L(FL("wrote %d bytes to fd %d (peer %s)")
1057 % count % str->get_socketfd() % peer_id);
1058 mark_recent_io();
1059 if (byte_out_ticker.get() != NULL)
1060 (*byte_out_ticker) += count;
1061 bytes_out += count;
1062 if (encountered_error && outbuf.empty())
1063 {
1064 // we've flushed our error message, so it's time to get out.
1065 L(FL("finished flushing output queue in error unwind mode, disconnecting"));
1066 return false;
1067 }
1068 return true;
1069 }
1070 else
1071 return false;
1072}
1073
1074// senders
1075
1076void
1077session::queue_error_cmd(string const & errmsg)
1078{
1079 L(FL("queueing 'error' command"));
1080 netcmd cmd;
1081 cmd.write_error_cmd(errmsg);
1082 write_netcmd_and_try_flush(cmd);
1083}
1084
1085void
1086session::queue_bye_cmd(u8 phase)
1087{
1088 L(FL("queueing 'bye' command, phase %d")
1089 % static_cast<size_t>(phase));
1090 netcmd cmd;
1091 cmd.write_bye_cmd(phase);
1092 write_netcmd_and_try_flush(cmd);
1093}
1094
1095void
1096session::queue_done_cmd(netcmd_item_type type,
1097 size_t n_items)
1098{
1099 string typestr;
1100 netcmd_item_type_to_string(type, typestr);
1101 L(FL("queueing 'done' command for %s (%d items)")
1102 % typestr % n_items);
1103 netcmd cmd;
1104 cmd.write_done_cmd(type, n_items);
1105 write_netcmd_and_try_flush(cmd);
1106}
1107
1108void
1109session::queue_hello_cmd(rsa_keypair_id const & key_name,
1110 base64<rsa_pub_key> const & pub_encoded,
1111 id const & nonce)
1112{
1113 rsa_pub_key pub;
1114 if (app.opts.use_transport_auth)
1115 decode_base64(pub_encoded, pub);
1116 cmd.write_hello_cmd(key_name, pub, nonce);
1117 write_netcmd_and_try_flush(cmd);
1118}
1119
1120void
1121session::queue_anonymous_cmd(protocol_role role,
1122 globish const & include_pattern,
1123 globish const & exclude_pattern,
1124 id const & nonce2,
1125 base64<rsa_pub_key> server_key_encoded)
1126{
1127 netcmd cmd;
1128 rsa_oaep_sha_data hmac_key_encrypted;
1129 if (app.opts.use_transport_auth)
1130 encrypt_rsa(app.lua, remote_peer_key_name, server_key_encoded,
1131 nonce2(), hmac_key_encrypted);
1132 cmd.write_anonymous_cmd(role, include_pattern, exclude_pattern,
1133 hmac_key_encrypted);
1134 write_netcmd_and_try_flush(cmd);
1135 set_session_key(nonce2());
1136}
1137
1138void
1139session::queue_auth_cmd(protocol_role role,
1140 globish const & include_pattern,
1141 globish const & exclude_pattern,
1142 id const & client,
1143 id const & nonce1,
1144 id const & nonce2,
1145 string const & signature,
1146 base64<rsa_pub_key> server_key_encoded)
1147{
1148 netcmd cmd;
1149 rsa_oaep_sha_data hmac_key_encrypted;
1150 I(app.opts.use_transport_auth);
1151 encrypt_rsa(app.lua, remote_peer_key_name, server_key_encoded,
1152 nonce2(), hmac_key_encrypted);
1153 cmd.write_auth_cmd(role, include_pattern, exclude_pattern, client,
1154 nonce1, hmac_key_encrypted, signature);
1155 write_netcmd_and_try_flush(cmd);
1156 set_session_key(nonce2());
1157}
1158
1159void
1160session::queue_confirm_cmd()
1161{
1162 netcmd cmd;
1163 cmd.write_confirm_cmd();
1164 write_netcmd_and_try_flush(cmd);
1165}
1166
1167void
1168session::queue_refine_cmd(refinement_type ty, merkle_node const & node)
1169{
1170 string typestr;
1171 hexenc<prefix> hpref;
1172 node.get_hex_prefix(hpref);
1173 netcmd_item_type_to_string(node.type, typestr);
1174 L(FL("queueing refinement %s of %s node '%s', level %d")
1175 % (ty == refinement_query ? "query" : "response")
1176 % typestr % hpref % static_cast<int>(node.level));
1177 netcmd cmd;
1178 cmd.write_refine_cmd(ty, node);
1179 write_netcmd_and_try_flush(cmd);
1180}
1181
1182void
1183session::queue_data_cmd(netcmd_item_type type,
1184 id const & item,
1185 string const & dat)
1186{
1187 string typestr;
1188 netcmd_item_type_to_string(type, typestr);
1189 hexenc<id> hid;
1190 encode_hexenc(item, hid);
1191
1192 if (role == sink_role)
1193 {
1194 L(FL("not queueing %s data for '%s' as we are in pure sink role")
1195 % typestr % hid);
1196 return;
1197 }
1198
1199 L(FL("queueing %d bytes of data for %s item '%s'")
1200 % dat.size() % typestr % hid);
1201
1202 netcmd cmd;
1203 // TODO: This pair of functions will make two copies of a large
1204 // file, the first in cmd.write_data_cmd, and the second in
1205 // write_netcmd_and_try_flush when the data is copied from the
1206 // cmd.payload variable to the string buffer for output. This
1207 // double copy should be collapsed out, it may be better to use
1208 // a string_queue for output as well as input, as that will reduce
1209 // the amount of mallocs that happen when the string queue is large
1210 // enough to just store the data.
1211 cmd.write_data_cmd(type, item, dat);
1212 write_netcmd_and_try_flush(cmd);
1213 note_item_sent(type, item);
1214}
1215
1216void
1217session::queue_delta_cmd(netcmd_item_type type,
1218 id const & base,
1219 id const & ident,
1220 delta const & del)
1221{
1222 I(type == file_item);
1223 string typestr;
1224 netcmd_item_type_to_string(type, typestr);
1225 hexenc<id> base_hid;
1226 encode_hexenc(base, base_hid);
1227 hexenc<id> ident_hid;
1228 encode_hexenc(ident, ident_hid);
1229
1230 if (role == sink_role)
1231 {
1232 L(FL("not queueing %s delta '%s' -> '%s' as we are in pure sink role")
1233 % typestr % base_hid % ident_hid);
1234 return;
1235 }
1236
1237 L(FL("queueing %s delta '%s' -> '%s'")
1238 % typestr % base_hid % ident_hid);
1239 netcmd cmd;
1240 cmd.write_delta_cmd(type, base, ident, del);
1241 write_netcmd_and_try_flush(cmd);
1242 note_item_sent(type, ident);
1243}
1244
1245
1246// processors
1247
1248bool
1249session::process_error_cmd(string const & errmsg)
1250{
1251 // "xxx string" with xxx being digits means there's an error code
1252 if (errmsg.size() > 4 && errmsg.substr(3,1) == " ")
1253 {
1254 try
1255 {
1256 int err = boost::lexical_cast<int>(errmsg.substr(0,3));
1257 if (err >= 100)
1258 {
1259 error_code = err;
1260 throw bad_decode(F("received network error: %s")
1261 % errmsg.substr(4));
1262 }
1263 }
1264 catch (boost::bad_lexical_cast)
1265 { // ok, so it wasn't a number
1266 }
1267 }
1268 throw bad_decode(F("received network error: %s") % errmsg);
1269}
1270
1271static const var_domain known_servers_domain = var_domain("known-servers");
1272
1273bool
1274session::process_hello_cmd(rsa_keypair_id const & their_keyname,
1275 rsa_pub_key const & their_key,
1276 id const & nonce)
1277{
1278 I(this->remote_peer_key_hash().size() == 0);
1279 I(this->saved_nonce().size() == 0);
1280
1281 base64<rsa_pub_key> their_key_encoded;
1282
1283 if (app.opts.use_transport_auth)
1284 {
1285 hexenc<id> their_key_hash;
1286 encode_base64(their_key, their_key_encoded);
1287 key_hash_code(their_keyname, their_key_encoded, their_key_hash);
1288 L(FL("server key has name %s, hash %s") % their_keyname % their_key_hash);
1289 var_key their_key_key(known_servers_domain, var_name(peer_id));
1290 if (app.db.var_exists(their_key_key))
1291 {
1292 var_value expected_key_hash;
1293 app.db.get_var(their_key_key, expected_key_hash);
1294 if (expected_key_hash() != their_key_hash())
1295 {
1296 P(F("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
1297 "@ WARNING: SERVER IDENTIFICATION HAS CHANGED @\n"
1298 "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
1299 "IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY\n"
1300 "it is also possible that the server key has just been changed\n"
1301 "remote host sent key %s\n"
1302 "I expected %s\n"
1303 "'%s unset %s %s' overrides this check")
1304 % their_key_hash % expected_key_hash
1305 % ui.prog_name % their_key_key.first % their_key_key.second);
1306 E(false, F("server key changed"));
1307 }
1308 }
1309 else
1310 {
1311 P(F("first time connecting to server %s\n"
1312 "I'll assume it's really them, but you might want to double-check\n"
1313 "their key's fingerprint: %s") % peer_id % their_key_hash);
1314 app.db.set_var(their_key_key, var_value(their_key_hash()));
1315 }
1316 if (!app.db.public_key_exists(their_key_hash))
1317 {
1318 W(F("saving public key for %s to database") % their_keyname);
1319 app.db.put_key(their_keyname, their_key_encoded);
1320 }
1321 {
1322 hexenc<id> hnonce;
1323 encode_hexenc(nonce, hnonce);
1324 L(FL("received 'hello' netcmd from server '%s' with nonce '%s'")
1325 % their_key_hash % hnonce);
1326 }
1327
1328 I(app.db.public_key_exists(their_key_hash));
1329
1330 // save their identity
1331 id their_key_hash_decoded;
1332 decode_hexenc(their_key_hash, their_key_hash_decoded);
1333 this->remote_peer_key_hash = their_key_hash_decoded;
1334 }
1335
1336 // clients always include in the synchronization set, every branch that the
1337 // user requested
1338 set<branch_name> all_branches, ok_branches;
1339 app.get_project().get_branch_list(all_branches);
1340 for (set<branch_name>::const_iterator i = all_branches.begin();
1341 i != all_branches.end(); i++)
1342 {
1343 if (our_matcher((*i)()))
1344 ok_branches.insert(*i);
1345 }
1346 rebuild_merkle_trees(app, ok_branches);
1347
1348 setup_client_tickers();
1349
1350 if (app.opts.use_transport_auth &&
1351 app.opts.signing_key() != "")
1352 {
1353 // get our key pair
1354 keypair our_kp;
1355 load_key_pair(app, app.opts.signing_key, our_kp);
1356
1357 // get the hash identifier for our pubkey
1358 hexenc<id> our_key_hash;
1359 id our_key_hash_raw;
1360 key_hash_code(app.opts.signing_key, our_kp.pub, our_key_hash);
1361 decode_hexenc(our_key_hash, our_key_hash_raw);
1362
1363 // make a signature
1364 base64<rsa_sha1_signature> sig;
1365 rsa_sha1_signature sig_raw;
1366 make_signature(app, app.opts.signing_key, our_kp.priv, nonce(), sig);
1367 decode_base64(sig, sig_raw);
1368
1369 // make a new nonce of our own and send off the 'auth'
1370 queue_auth_cmd(this->role, our_include_pattern, our_exclude_pattern,
1371 our_key_hash_raw, nonce, mk_nonce(), sig_raw(),
1372 their_key_encoded);
1373 }
1374 else
1375 {
1376 queue_anonymous_cmd(this->role, our_include_pattern,
1377 our_exclude_pattern, mk_nonce(), their_key_encoded);
1378 }
1379
1380 app.lua.hook_note_netsync_start(session_id, "client", this->role,
1381 peer_id, their_keyname,
1382 our_include_pattern, our_exclude_pattern);
1383
1384 return true;
1385}
1386
1387bool
1388session::process_anonymous_cmd(protocol_role their_role,
1389 globish const & their_include_pattern,
1390 globish const & their_exclude_pattern)
1391{
1392 // Internally netsync thinks in terms of sources and sinks. Users like
1393 // thinking of repositories as "readonly", "readwrite", or "writeonly".
1394 //
1395 // We therefore use the read/write terminology when dealing with the UI:
1396 // if the user asks to run a "read only" service, this means they are
1397 // willing to be a source but not a sink.
1398 //
1399 // nb: The "role" here is the role the *client* wants to play
1400 // so we need to check that the opposite role is allowed for us,
1401 // in our this->role field.
1402 //
1403
1404 app.lua.hook_note_netsync_start(session_id, "server", their_role,
1405 peer_id, rsa_keypair_id(),
1406 their_include_pattern, their_exclude_pattern);
1407
1408 // Client must be a sink and server must be a source (anonymous
1409 // read-only), unless transport auth is disabled.
1410 //
1411 // If running in no-transport-auth mode, we operate anonymously and
1412 // permit adoption of any role.
1413
1414 if (app.opts.use_transport_auth)
1415 {
1416 if (their_role != sink_role)
1417 {
1418 this->saved_nonce = id("");
1419 error(not_permitted,
1420 F("rejected attempt at anonymous connection for write").str());
1421 }
1422
1423 if (this->role == sink_role)
1424 {
1425 this->saved_nonce = id("");
1426 error(role_mismatch,
1427 F("rejected attempt at anonymous connection while running as sink").str());
1428 }
1429 }
1430
1431 set<branch_name> all_branches, ok_branches;
1432 app.get_project().get_branch_list(all_branches);
1433 globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);
1434 for (set<branch_name>::const_iterator i = all_branches.begin();
1435 i != all_branches.end(); i++)
1436 {
1437 if (their_matcher((*i)()))
1438 if (app.opts.use_transport_auth &&
1439 !app.lua.hook_get_netsync_read_permitted((*i)()))
1440 {
1441 error(not_permitted,
1442 (F("anonymous access to branch '%s' denied by server") % *i).str());
1443 }
1444 else
1445 ok_branches.insert(*i);
1446 }
1447
1448 if (app.opts.use_transport_auth)
1449 {
1450 P(F("allowed anonymous read permission for '%s' excluding '%s'")
1451 % their_include_pattern % their_exclude_pattern);
1452 this->role = source_role;
1453 }
1454 else
1455 {
1456 P(F("allowed anonymous read/write permission for '%s' excluding '%s'")
1457 % their_include_pattern % their_exclude_pattern);
1458 assume_corresponding_role(their_role);
1459 }
1460
1461 rebuild_merkle_trees(app, ok_branches);
1462
1463 this->remote_peer_key_name = rsa_keypair_id("");
1464 this->authenticated = true;
1465 return true;
1466}
1467
1468void
1469session::assume_corresponding_role(protocol_role their_role)
1470{
1471 // Assume the (possibly degraded) opposite role.
1472 switch (their_role)
1473 {
1474 case source_role:
1475 I(this->role != source_role);
1476 this->role = sink_role;
1477 break;
1478
1479 case source_and_sink_role:
1480 I(this->role == source_and_sink_role);
1481 break;
1482
1483 case sink_role:
1484 I(this->role != sink_role);
1485 this->role = source_role;
1486 break;
1487 }
1488}
1489
1490bool
1491session::process_auth_cmd(protocol_role their_role,
1492 globish const & their_include_pattern,
1493 globish const & their_exclude_pattern,
1494 id const & client,
1495 id const & nonce1,
1496 string const & signature)
1497{
1498 I(this->remote_peer_key_hash().size() == 0);
1499 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
1500
1501 hexenc<id> their_key_hash;
1502 encode_hexenc(client, their_key_hash);
1503
1504 globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);
1505
1506 if (!app.db.public_key_exists(their_key_hash))
1507 {
1508 // If it's not in the db, it still could be in the keystore if we
1509 // have the private key that goes with it.
1510 if (!app.keys.try_ensure_in_db(their_key_hash))
1511 {
1512 this->saved_nonce = id("");
1513
1514 app.lua.hook_note_netsync_start(session_id, "server", their_role,
1515 peer_id, rsa_keypair_id("-unknown-"),
1516 their_include_pattern,
1517 their_exclude_pattern);
1518 error(unknown_key,
1519 (F("remote public key hash '%s' is unknown") % their_key_hash).str());
1520 }
1521 }
1522
1523 // Get their public key.
1524 rsa_keypair_id their_id;
1525 base64<rsa_pub_key> their_key;
1526 app.db.get_pubkey(their_key_hash, their_id, their_key);
1527
1528 app.lua.hook_note_netsync_start(session_id, "server", their_role,
1529 peer_id, their_id,
1530 their_include_pattern, their_exclude_pattern);
1531
1532 // Check that they replied with the nonce we asked for.
1533 if (!(nonce1 == this->saved_nonce))
1534 {
1535 this->saved_nonce = id("");
1536 error(failed_identification,
1537 F("detected replay attack in auth netcmd").str());
1538 }
1539
1540 // Internally netsync thinks in terms of sources and sinks. users like
1541 // thinking of repositories as "readonly", "readwrite", or "writeonly".
1542 //
1543 // We therefore use the read/write terminology when dealing with the UI:
1544 // if the user asks to run a "read only" service, this means they are
1545 // willing to be a source but not a sink.
1546 //
1547 // nb: The "their_role" here is the role the *client* wants to play
1548 // so we need to check that the opposite role is allowed for us,
1549 // in our this->role field.
1550
1551 // Client as sink, server as source (reading).
1552
1553 if (their_role == sink_role || their_role == source_and_sink_role)
1554 {
1555 if (this->role != source_role && this->role != source_and_sink_role)
1556 {
1557 this->saved_nonce = id("");
1558 error(not_permitted,
1559 (F("denied '%s' read permission for '%s' excluding '%s' while running as pure sink")
1560 % their_id % their_include_pattern % their_exclude_pattern).str());
1561 }
1562 }
1563
1564 set<branch_name> all_branches, ok_branches;
1565 app.get_project().get_branch_list(all_branches);
1566 for (set<branch_name>::const_iterator i = all_branches.begin();
1567 i != all_branches.end(); i++)
1568 {
1569 if (their_matcher((*i)()))
1570 {
1571 if (!app.lua.hook_get_netsync_read_permitted((*i)(), their_id))
1572 {
1573 error(not_permitted,
1574 (F("denied '%s' read permission for '%s' excluding '%s' because of branch '%s'")
1575 % their_id % their_include_pattern % their_exclude_pattern % *i).str());
1576 }
1577 else
1578 ok_branches.insert(*i);
1579 }
1580 }
1581
1582 // If we're source_and_sink_role, continue even with no branches readable
1583 // eg. serve --db=empty.db
1584 P(F("allowed '%s' read permission for '%s' excluding '%s'")
1585 % their_id % their_include_pattern % their_exclude_pattern);
1586
1587 // Client as source, server as sink (writing).
1588
1589 if (their_role == source_role || their_role == source_and_sink_role)
1590 {
1591 if (this->role != sink_role && this->role != source_and_sink_role)
1592 {
1593 this->saved_nonce = id("");
1594 error(not_permitted,
1595 (F("denied '%s' write permission for '%s' excluding '%s' while running as pure source")
1596 % their_id % their_include_pattern % their_exclude_pattern).str());
1597 }
1598
1599 if (!app.lua.hook_get_netsync_write_permitted(their_id))
1600 {
1601 this->saved_nonce = id("");
1602 error(not_permitted,
1603 (F("denied '%s' write permission for '%s' excluding '%s'")
1604 % their_id % their_include_pattern % their_exclude_pattern).str());
1605 }
1606
1607 P(F("allowed '%s' write permission for '%s' excluding '%s'")
1608 % their_id % their_include_pattern % their_exclude_pattern);
1609 }
1610
1611 rebuild_merkle_trees(app, ok_branches);
1612
1613 // Save their identity.
1614 this->remote_peer_key_hash = client;
1615
1616 // Check the signature.
1617 base64<rsa_sha1_signature> sig;
1618 encode_base64(rsa_sha1_signature(signature), sig);
1619 if (check_signature(app, their_id, their_key, nonce1(), sig))
1620 {
1621 // Get our private key and sign back.
1622 L(FL("client signature OK, accepting authentication"));
1623 this->authenticated = true;
1624 this->remote_peer_key_name = their_id;
1625
1626 assume_corresponding_role(their_role);
1627 return true;
1628 }
1629 else
1630 {
1631 error(failed_identification, (F("bad client signature")).str());
1632 }
1633 return false;
1634}
1635
1636bool
1637session::process_refine_cmd(refinement_type ty, merkle_node const & node)
1638{
1639 string typestr;
1640 netcmd_item_type_to_string(node.type, typestr);
1641 L(FL("processing refine cmd for %s node at level %d")
1642 % typestr % node.level);
1643
1644 switch (node.type)
1645 {
1646 case file_item:
1647 W(F("Unexpected 'refine' command on non-refined item type"));
1648 break;
1649
1650 case key_item:
1651 key_refiner.process_refinement_command(ty, node);
1652 break;
1653
1654 case revision_item:
1655 rev_refiner.process_refinement_command(ty, node);
1656 break;
1657
1658 case cert_item:
1659 cert_refiner.process_refinement_command(ty, node);
1660 break;
1661
1662 case epoch_item:
1663 epoch_refiner.process_refinement_command(ty, node);
1664 break;
1665 }
1666 return true;
1667}
1668
1669bool
1670session::process_bye_cmd(u8 phase,
1671 transaction_guard & guard)
1672{
1673
1674// Ideal shutdown
1675// ~~~~~~~~~~~~~~~
1676//
1677// I/O events state transitions
1678// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~
1679// client: C_WORKING
1680// server: S_WORKING
1681// 0. [refinement, data, deltas, etc.]
1682// client: C_SHUTDOWN
1683// (client checkpoints here)
1684// 1. client -> "bye 0"
1685// 2. "bye 0" -> server
1686// server: S_SHUTDOWN
1687// (server checkpoints here)
1688// 3. "bye 1" <- server
1689// 4. client <- "bye 1"
1690// client: C_CONFIRMED
1691// 5. client -> "bye 2"
1692// 6. "bye 2" -> server
1693// server: S_CONFIRMED
1694// 7. [server drops connection]
1695//
1696//
1697// Affects of I/O errors or disconnections
1698// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1699// C_WORKING: report error and fault
1700// S_WORKING: report error and recover
1701// C_SHUTDOWN: report error and fault
1702// S_SHUTDOWN: report success and recover
1703// (and warn that client might falsely see error)
1704// C_CONFIRMED: report success
1705// S_CONFIRMED: report success
1706
1707 switch (phase)
1708 {
1709 case 0:
1710 if (voice == server_voice &&
1711 protocol_state == working_state)
1712 {
1713 protocol_state = shutdown_state;
1714 guard.do_checkpoint();
1715 queue_bye_cmd(1);
1716 }
1717 else
1718 error(bad_command, "unexpected bye phase 0 received");
1719 break;
1720
1721 case 1:
1722 if (voice == client_voice &&
1723 protocol_state == shutdown_state)
1724 {
1725 protocol_state = confirmed_state;
1726 queue_bye_cmd(2);
1727 }
1728 else
1729 error(bad_command, "unexpected bye phase 1 received");
1730 break;
1731
1732 case 2:
1733 if (voice == server_voice &&
1734 protocol_state == shutdown_state)
1735 {
1736 protocol_state = confirmed_state;
1737 return false;
1738 }
1739 else
1740 error(bad_command, "unexpected bye phase 2 received");
1741 break;
1742
1743 default:
1744 error(bad_command, (F("unknown bye phase %d received") % phase).str());
1745 }
1746
1747 return true;
1748}
1749
1750bool
1751session::process_done_cmd(netcmd_item_type type, size_t n_items)
1752{
1753 string typestr;
1754 netcmd_item_type_to_string(type, typestr);
1755 L(FL("received 'done' command for %s (%s items)") % typestr % n_items);
1756 switch (type)
1757 {
1758 case file_item:
1759 W(F("Unexpected 'done' command on non-refined item type"));
1760 break;
1761
1762 case key_item:
1763 key_refiner.process_done_command(n_items);
1764 if (key_refiner.done && role != sink_role)
1765 send_all_data(key_item, key_refiner.items_to_send);
1766 break;
1767
1768 case revision_item:
1769 rev_refiner.process_done_command(n_items);
1770 break;
1771
1772 case cert_item:
1773 cert_refiner.process_done_command(n_items);
1774 break;
1775
1776 case epoch_item:
1777 epoch_refiner.process_done_command(n_items);
1778 if (epoch_refiner.done)
1779 {
1780 send_all_data(epoch_item, epoch_refiner.items_to_send);
1781 maybe_note_epochs_finished();
1782 }
1783 break;
1784 }
1785 return true;
1786}
1787
1788void
1789session::respond_to_confirm_cmd()
1790{
1791 epoch_refiner.begin_refinement();
1792}
1793
1794bool
1795session::data_exists(netcmd_item_type type,
1796 id const & item)
1797{
1798 hexenc<id> hitem;
1799 encode_hexenc(item, hitem);
1800 switch (type)
1801 {
1802 case key_item:
1803 return key_refiner.local_item_exists(item)
1804 || app.db.public_key_exists(hitem);
1805 case file_item:
1806 return app.db.file_version_exists(file_id(hitem));
1807 case revision_item:
1808 return rev_refiner.local_item_exists(item)
1809 || app.db.revision_exists(revision_id(hitem));
1810 case cert_item:
1811 return cert_refiner.local_item_exists(item)
1812 || app.db.revision_cert_exists(hitem);
1813 case epoch_item:
1814 return epoch_refiner.local_item_exists(item)
1815 || app.db.epoch_exists(epoch_id(hitem));
1816 }
1817 return false;
1818}
1819
1820void
1821session::load_data(netcmd_item_type type,
1822 id const & item,
1823 string & out)
1824{
1825 string typestr;
1826 netcmd_item_type_to_string(type, typestr);
1827 hexenc<id> hitem;
1828 encode_hexenc(item, hitem);
1829
1830 if (!data_exists(type, item))
1831 throw bad_decode(F("%s with hash '%s' does not exist in our database")
1832 % typestr % hitem);
1833
1834 switch (type)
1835 {
1836 case epoch_item:
1837 {
1838 branch_name branch;
1839 epoch_data epoch;
1840 app.db.get_epoch(epoch_id(hitem), branch, epoch);
1841 write_epoch(branch, epoch, out);
1842 }
1843 break;
1844 case key_item:
1845 {
1846 rsa_keypair_id keyid;
1847 base64<rsa_pub_key> pub_encoded;
1848 app.db.get_pubkey(hitem, keyid, pub_encoded);
1849 L(FL("public key '%s' is also called '%s'") % hitem % keyid);
1850 write_pubkey(keyid, pub_encoded, out);
1851 }
1852 break;
1853
1854 case revision_item:
1855 {
1856 revision_data mdat;
1857 data dat;
1858 app.db.get_revision(revision_id(hitem), mdat);
1859 out = mdat.inner()();
1860 }
1861 break;
1862
1863 case file_item:
1864 {
1865 file_data fdat;
1866 data dat;
1867 app.db.get_file_version(file_id(hitem), fdat);
1868 out = fdat.inner()();
1869 }
1870 break;
1871
1872 case cert_item:
1873 {
1874 revision<cert> c;
1875 app.db.get_revision_cert(hitem, c);
1876 string tmp;
1877 write_cert(c.inner(), out);
1878 }
1879 break;
1880 }
1881}
1882
1883bool
1884session::process_data_cmd(netcmd_item_type type,
1885 id const & item,
1886 string const & dat)
1887{
1888 hexenc<id> hitem;
1889 encode_hexenc(item, hitem);
1890
1891 string typestr;
1892 netcmd_item_type_to_string(type, typestr);
1893
1894 note_item_arrived(type, item);
1895 if (data_exists(type, item))
1896 {
1897 L(FL("%s '%s' already exists in our database") % typestr % hitem);
1898 if (type == epoch_item)
1899 maybe_note_epochs_finished();
1900 return true;
1901 }
1902
1903 switch (type)
1904 {
1905 case epoch_item:
1906 {
1907 branch_name branch;
1908 epoch_data epoch;
1909 read_epoch(dat, branch, epoch);
1910 L(FL("received epoch %s for branch %s") % epoch % branch);
1911 map<branch_name, epoch_data> epochs;
1912 app.db.get_epochs(epochs);
1913 map<branch_name, epoch_data>::const_iterator i;
1914 i = epochs.find(branch);
1915 if (i == epochs.end())
1916 {
1917 L(FL("branch %s has no epoch; setting epoch to %s") % branch % epoch);
1918 app.db.set_epoch(branch, epoch);
1919 }
1920 else
1921 {
1922 L(FL("branch %s already has an epoch; checking") % branch);
1923 // If we get here, then we know that the epoch must be
1924 // different, because if it were the same then the
1925 // if (epoch_exists()) branch up above would have been taken.
1926 // if somehow this is wrong, then we have broken epoch
1927 // hashing or something, which is very dangerous, so play it
1928 // safe...
1929 I(!(i->second == epoch));
1930
1931 // It is safe to call 'error' here, because if we get here,
1932 // then the current netcmd packet cannot possibly have
1933 // written anything to the database.
1934 error(mixing_versions,
1935 (F("Mismatched epoch on branch %s."
1936 " Server has '%s', client has '%s'.")
1937 % branch
1938 % (voice == server_voice ? i->second : epoch)
1939 % (voice == server_voice ? epoch : i->second)).str());
1940 }
1941 }
1942 maybe_note_epochs_finished();
1943 break;
1944
1945 case key_item:
1946 {
1947 rsa_keypair_id keyid;
1948 base64<rsa_pub_key> pub;
1949 read_pubkey(dat, keyid, pub);
1950 hexenc<id> tmp;
1951 key_hash_code(keyid, pub, tmp);
1952 if (! (tmp == hitem))
1953 throw bad_decode(F("hash check failed for public key '%s' (%s);"
1954 " wanted '%s' got '%s'")
1955 % hitem % keyid % hitem % tmp);
1956 this->dbw.consume_public_key(keyid, pub);
1957 }
1958 break;
1959
1960 case cert_item:
1961 {
1962 cert c;
1963 read_cert(dat, c);
1964 hexenc<id> tmp;
1965 cert_hash_code(c, tmp);
1966 if (! (tmp == hitem))
1967 throw bad_decode(F("hash check failed for revision cert '%s'") % hitem);
1968 this->dbw.consume_revision_cert(revision<cert>(c));
1969 }
1970 break;
1971
1972 case revision_item:
1973 {
1974 L(FL("received revision '%s'") % hitem);
1975 this->dbw.consume_revision_data(revision_id(hitem), revision_data(dat));
1976 }
1977 break;
1978
1979 case file_item:
1980 {
1981 L(FL("received file '%s'") % hitem);
1982 this->dbw.consume_file_data(file_id(hitem), file_data(dat));
1983 }
1984 break;
1985 }
1986 return true;
1987}
1988
1989bool
1990session::process_delta_cmd(netcmd_item_type type,
1991 id const & base,
1992 id const & ident,
1993 delta const & del)
1994{
1995 string typestr;
1996 netcmd_item_type_to_string(type, typestr);
1997 hexenc<id> hbase, hident;
1998 encode_hexenc(base, hbase);
1999 encode_hexenc(ident, hident);
2000
2001 pair<id,id> id_pair = make_pair(base, ident);
2002
2003 note_item_arrived(type, ident);
2004
2005 switch (type)
2006 {
2007 case file_item:
2008 {
2009 file_id src_file(hbase), dst_file(hident);
2010 this->dbw.consume_file_delta(src_file,
2011 dst_file,
2012 file_delta(del));
2013 }
2014 break;
2015
2016 default:
2017 L(FL("ignoring delta received for item type %s") % typestr);
2018 break;
2019 }
2020 return true;
2021}
2022
2023bool
2024session::process_usher_cmd(utf8 const & msg)
2025{
2026 if (msg().size())
2027 {
2028 if (msg()[0] == '!')
2029 P(F("Received warning from usher: %s") % msg().substr(1));
2030 else
2031 L(FL("Received greeting from usher: %s") % msg().substr(1));
2032 }
2033 netcmd cmdout;
2034 cmdout.write_usher_reply_cmd(utf8(peer_id), our_include_pattern);
2035 write_netcmd_and_try_flush(cmdout);
2036 L(FL("Sent reply."));
2037 return true;
2038}
2039
2040
2041void
2042session::send_all_data(netcmd_item_type ty, set<id> const & items)
2043{
2044 string typestr;
2045 netcmd_item_type_to_string(ty, typestr);
2046
2047 // Use temporary; passed arg will be invalidated during iteration.
2048 set<id> tmp = items;
2049
2050 for (set<id>::const_iterator i = tmp.begin();
2051 i != tmp.end(); ++i)
2052 {
2053 hexenc<id> hitem;
2054 encode_hexenc(*i, hitem);
2055
2056 if (data_exists(ty, *i))
2057 {
2058 string out;
2059 load_data(ty, *i, out);
2060 queue_data_cmd(ty, *i, out);
2061 }
2062 }
2063}
2064
2065bool
2066session::dispatch_payload(netcmd const & cmd,
2067 transaction_guard & guard)
2068{
2069
2070 switch (cmd.get_cmd_code())
2071 {
2072
2073 case error_cmd:
2074 {
2075 string errmsg;
2076 cmd.read_error_cmd(errmsg);
2077 return process_error_cmd(errmsg);
2078 }
2079 break;
2080
2081 case hello_cmd:
2082 require(! authenticated, "hello netcmd received when not authenticated");
2083 require(voice == client_voice, "hello netcmd received in client voice");
2084 {
2085 rsa_keypair_id server_keyname;
2086 rsa_pub_key server_key;
2087 id nonce;
2088 cmd.read_hello_cmd(server_keyname, server_key, nonce);
2089 return process_hello_cmd(server_keyname, server_key, nonce);
2090 }
2091 break;
2092
2093 case bye_cmd:
2094 require(authenticated, "bye netcmd received when not authenticated");
2095 {
2096 u8 phase;
2097 cmd.read_bye_cmd(phase);
2098 return process_bye_cmd(phase, guard);
2099 }
2100 break;
2101
2102 case anonymous_cmd:
2103 require(! authenticated, "anonymous netcmd received when not authenticated");
2104 require(voice == server_voice, "anonymous netcmd received in server voice");
2105 require(role == source_role ||
2106 role == source_and_sink_role,
2107 "anonymous netcmd received in source or source/sink role");
2108 {
2109 protocol_role role;
2110 globish their_include_pattern, their_exclude_pattern;
2111 rsa_oaep_sha_data hmac_key_encrypted;
2112 cmd.read_anonymous_cmd(role, their_include_pattern, their_exclude_pattern, hmac_key_encrypted);
2113 L(FL("received 'anonymous' netcmd from client for pattern '%s' excluding '%s' "
2114 "in %s mode\n")
2115 % their_include_pattern % their_exclude_pattern
2116 % (role == source_and_sink_role ? _("source and sink") :
2117 (role == source_role ? _("source") : _("sink"))));
2118
2119 set_session_key(hmac_key_encrypted);
2120 if (!process_anonymous_cmd(role, their_include_pattern, their_exclude_pattern))
2121 return false;
2122 queue_confirm_cmd();
2123 return true;
2124 }
2125 break;
2126
2127 case auth_cmd:
2128 require(! authenticated, "auth netcmd received when not authenticated");
2129 require(voice == server_voice, "auth netcmd received in server voice");
2130 {
2131 protocol_role role;
2132 string signature;
2133 globish their_include_pattern, their_exclude_pattern;
2134 id client, nonce1, nonce2;
2135 rsa_oaep_sha_data hmac_key_encrypted;
2136 cmd.read_auth_cmd(role, their_include_pattern, their_exclude_pattern,
2137 client, nonce1, hmac_key_encrypted, signature);
2138
2139 hexenc<id> their_key_hash;
2140 encode_hexenc(client, their_key_hash);
2141 hexenc<id> hnonce1;
2142 encode_hexenc(nonce1, hnonce1);
2143
2144 L(FL("received 'auth(hmac)' netcmd from client '%s' for pattern '%s' "
2145 "exclude '%s' in %s mode with nonce1 '%s'\n")
2146 % their_key_hash % their_include_pattern % their_exclude_pattern
2147 % (role == source_and_sink_role ? _("source and sink") :
2148 (role == source_role ? _("source") : _("sink")))
2149 % hnonce1);
2150
2151 set_session_key(hmac_key_encrypted);
2152
2153 if (!process_auth_cmd(role, their_include_pattern, their_exclude_pattern,
2154 client, nonce1, signature))
2155 return false;
2156 queue_confirm_cmd();
2157 return true;
2158 }
2159 break;
2160
2161 case confirm_cmd:
2162 require(! authenticated, "confirm netcmd received when not authenticated");
2163 require(voice == client_voice, "confirm netcmd received in client voice");
2164 {
2165 string signature;
2166 cmd.read_confirm_cmd();
2167 this->authenticated = true;
2168 respond_to_confirm_cmd();
2169 return true;
2170 }
2171 break;
2172
2173 case refine_cmd:
2174 require(authenticated, "refine netcmd received when authenticated");
2175 {
2176 merkle_node node;
2177 refinement_type ty;
2178 cmd.read_refine_cmd(ty, node);
2179 return process_refine_cmd(ty, node);
2180 }
2181 break;
2182
2183 case done_cmd:
2184 require(authenticated, "done netcmd received when not authenticated");
2185 {
2186 size_t n_items;
2187 netcmd_item_type type;
2188 cmd.read_done_cmd(type, n_items);
2189 return process_done_cmd(type, n_items);
2190 }
2191 break;
2192
2193 case data_cmd:
2194 require(authenticated, "data netcmd received when not authenticated");
2195 require(role == sink_role ||
2196 role == source_and_sink_role,
2197 "data netcmd received in source or source/sink role");
2198 {
2199 netcmd_item_type type;
2200 id item;
2201 string dat;
2202 cmd.read_data_cmd(type, item, dat);
2203 return process_data_cmd(type, item, dat);
2204 }
2205 break;
2206
2207 case delta_cmd:
2208 require(authenticated, "delta netcmd received when not authenticated");
2209 require(role == sink_role ||
2210 role == source_and_sink_role,
2211 "delta netcmd received in source or source/sink role");
2212 {
2213 netcmd_item_type type;
2214 id base, ident;
2215 delta del;
2216 cmd.read_delta_cmd(type, base, ident, del);
2217 return process_delta_cmd(type, base, ident, del);
2218 }
2219 break;
2220
2221 case usher_cmd:
2222 {
2223 utf8 greeting;
2224 cmd.read_usher_cmd(greeting);
2225 return process_usher_cmd(greeting);
2226 }
2227 break;
2228
2229 case usher_reply_cmd:
2230 return false; // Should not happen.
2231 break;
2232 }
2233 return false;
2234}
2235
2236// This kicks off the whole cascade starting from "hello".
2237void
2238session::begin_service()
2239{
2240 keypair kp;
2241 if (app.opts.use_transport_auth)
2242 app.keys.get_key_pair(app.opts.signing_key, kp);
2243 queue_hello_cmd(app.opts.signing_key, kp.pub, mk_nonce());
2244}
2245
2246void
2247session::maybe_step()
2248{
2249 while (done_all_refinements()
2250 && !rev_enumerator.done()
2251 && outbuf_size < constants::bufsz * 10)
2252 {
2253 rev_enumerator.step();
2254 }
2255}
2256
2257void
2258session::maybe_say_goodbye(transaction_guard & guard)
2259{
2260 if (voice == client_voice
2261 && protocol_state == working_state
2262 && finished_working())
2263 {
2264 protocol_state = shutdown_state;
2265 guard.do_checkpoint();
2266 queue_bye_cmd(0);
2267 }
2268}
2269
2270bool
2271session::arm()
2272{
2273 if (!armed)
2274 {
2275 // Don't pack the buffer unnecessarily.
2276 if (outbuf_size > constants::bufsz * 10)
2277 return false;
2278
2279 if (cmd.read(inbuf, read_hmac))
2280 {
2281 armed = true;
2282 }
2283 }
2284 return armed;
2285}
2286
2287bool session::process(transaction_guard & guard)
2288{
2289 if (encountered_error)
2290 return true;
2291 try
2292 {
2293 if (!arm())
2294 return true;
2295
2296 armed = false;
2297 L(FL("processing %d byte input buffer from peer %s")
2298 % inbuf.size() % peer_id);
2299
2300 size_t sz = cmd.encoded_size();
2301 bool ret = dispatch_payload(cmd, guard);
2302
2303 if (inbuf.size() >= constants::netcmd_maxsz)
2304 W(F("input buffer for peer %s is overfull "
2305 "after netcmd dispatch") % peer_id);
2306
2307 guard.maybe_checkpoint(sz);
2308
2309 if (!ret)
2310 L(FL("finishing processing with '%d' packet")
2311 % cmd.get_cmd_code());
2312 return ret;
2313 }
2314 catch (bad_decode & bd)
2315 {
2316 W(F("protocol error while processing peer %s: '%s'")
2317 % peer_id % bd.what);
2318 return false;
2319 }
2320 catch (netsync_error & err)
2321 {
2322 W(F("error: %s") % err.msg);
2323 queue_error_cmd(boost::lexical_cast<string>(error_code) + " " + err.msg);
2324 encountered_error = true;
2325 return true; // Don't terminate until we've send the error_cmd.
2326 }
2327}
2328
2329
2330static shared_ptr<Netxx::StreamBase>
2331build_stream_to_server(app_state & app,
2332 globish const & include_pattern,
2333 globish const & exclude_pattern,
2334 utf8 const & address,
2335 Netxx::port_type default_port,
2336 Netxx::Timeout timeout)
2337{
2338 shared_ptr<Netxx::StreamBase> server;
2339 uri u;
2340 vector<string> argv;
2341 if (parse_uri(address(), u)
2342 && app.lua.hook_get_netsync_connect_command(u,
2343 include_pattern,
2344 exclude_pattern,
2345 global_sanity.debug,
2346 argv))
2347 {
2348 I(argv.size() > 0);
2349 string cmd = argv[0];
2350 argv.erase(argv.begin());
2351 app.opts.use_transport_auth = app.lua.hook_use_transport_auth(u);
2352 return shared_ptr<Netxx::StreamBase>
2353 (new Netxx::PipeStream(cmd, argv));
2354
2355 }
2356 else
2357 {
2358#ifdef USE_IPV6
2359 bool use_ipv6=true;
2360#else
2361 bool use_ipv6=false;
2362#endif
2363 Netxx::Address addr(address().c_str(),
2364 default_port, use_ipv6);
2365 return shared_ptr<Netxx::StreamBase>
2366 (new Netxx::Stream(addr, timeout));
2367 }
2368}
2369
2370static void
2371call_server(protocol_role role,
2372 globish const & include_pattern,
2373 globish const & exclude_pattern,
2374 app_state & app,
2375 utf8 const & address,
2376 Netxx::port_type default_port,
2377 unsigned long timeout_seconds)
2378{
2379 Netxx::PipeCompatibleProbe probe;
2380 transaction_guard guard(app.db);
2381
2382 Netxx::Timeout timeout(static_cast<long>(timeout_seconds)), instant(0,1);
2383
2384 // FIXME: split into labels and convert to ace here.
2385
2386 P(F("connecting to %s") % address());
2387
2388 shared_ptr<Netxx::StreamBase> server
2389 = build_stream_to_server(app,
2390 include_pattern,
2391 exclude_pattern,
2392 address, default_port,
2393 timeout);
2394
2395
2396 // 'false' here means not to revert changes when the SockOpt
2397 // goes out of scope.
2398 Netxx::SockOpt socket_options(server->get_socketfd(), false);
2399 socket_options.set_non_blocking();
2400
2401 session sess(role, client_voice,
2402 include_pattern,
2403 exclude_pattern,
2404 app, address(), server);
2405
2406 while (true)
2407 {
2408 bool armed = false;
2409 try
2410 {
2411 armed = sess.arm();
2412 }
2413 catch (bad_decode & bd)
2414 {
2415 E(false, F("protocol error while processing peer %s: '%s'")
2416 % sess.peer_id % bd.what);
2417 }
2418
2419 sess.maybe_step();
2420 sess.maybe_say_goodbye(guard);
2421
2422 probe.clear();
2423 probe.add(*(sess.str), sess.which_events());
2424 Netxx::Probe::result_type res = probe.ready(armed ? instant : timeout);
2425 Netxx::Probe::ready_type event = res.second;
2426 Netxx::socket_type fd = res.first;
2427
2428 if (fd == -1 && !armed)
2429 {
2430 E(false, (F("timed out waiting for I/O with "
2431 "peer %s, disconnecting")
2432 % sess.peer_id));
2433 }
2434
2435 bool all_io_clean = (event != Netxx::Probe::ready_oobd);
2436
2437 if (event & Netxx::Probe::ready_read)
2438 all_io_clean = all_io_clean && sess.read_some();
2439
2440 if (event & Netxx::Probe::ready_write)
2441 all_io_clean = all_io_clean && sess.write_some();
2442
2443 if (armed)
2444 if (!sess.process(guard))
2445 {
2446 // Commit whatever work we managed to accomplish anyways.
2447 guard.commit();
2448
2449 // We failed during processing. This should only happen in
2450 // client voice when we have a decode exception, or received an
2451 // error from our server (which is translated to a decode
2452 // exception). We call these cases E() errors.
2453 E(false, F("processing failure while talking to "
2454 "peer %s, disconnecting")
2455 % sess.peer_id);
2456 return;
2457 }
2458
2459 if (!all_io_clean)
2460 {
2461 // Commit whatever work we managed to accomplish anyways.
2462 guard.commit();
2463
2464 // We had an I/O error. We must decide if this represents a
2465 // user-reported error or a clean disconnect. See protocol
2466 // state diagram in session::process_bye_cmd.
2467
2468 if (sess.protocol_state == session::confirmed_state)
2469 {
2470 P(F("successful exchange with %s")
2471 % sess.peer_id);
2472 return;
2473 }
2474 else if (sess.encountered_error)
2475 {
2476 P(F("peer %s disconnected after we informed them of error")
2477 % sess.peer_id);
2478 return;
2479 }
2480 else
2481 E(false, (F("I/O failure while talking to "
2482 "peer %s, disconnecting")
2483 % sess.peer_id));
2484 }
2485 }
2486}
2487
2488static void
2489drop_session_associated_with_fd(map<Netxx::socket_type, shared_ptr<session> > & sessions,
2490 Netxx::socket_type fd)
2491{
2492 // This is a bit of a hack. Initially all "file descriptors" in
2493 // netsync were full duplex, so we could get away with indexing
2494 // sessions by their file descriptor.
2495 //
2496 // When using pipes in unix, it's no longer true: a session gets
2497 // entered in the session map under its read pipe fd *and* its write
2498 // pipe fd. When we're in such a situation the socket fd is "-1" and
2499 // we downcast to a PipeStream and use its read+write fds.
2500 //
2501 // When using pipes in windows, we use a full duplex pipe (named
2502 // pipe) so the socket-like abstraction holds.
2503
2504 I(fd != -1);
2505 map<Netxx::socket_type, shared_ptr<session> >::const_iterator i = sessions.find(fd);
2506 I(i != sessions.end());
2507 shared_ptr<session> sess = i->second;
2508 fd = sess->str->get_socketfd();
2509 if (fd != -1)
2510 {
2511 sessions.erase(fd);
2512 }
2513 else
2514 {
2515 shared_ptr<Netxx::PipeStream> pipe =
2516 boost::dynamic_pointer_cast<Netxx::PipeStream, Netxx::StreamBase>(sess->str);
2517 I(static_cast<bool>(pipe));
2518 I(pipe->get_writefd() != -1);
2519 I(pipe->get_readfd() != -1);
2520 sessions.erase(pipe->get_readfd());
2521 sessions.erase(pipe->get_writefd());
2522 }
2523}
2524
2525static void
2526arm_sessions_and_calculate_probe(Netxx::PipeCompatibleProbe & probe,
2527 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2528 set<Netxx::socket_type> & armed_sessions)
2529{
2530 set<Netxx::socket_type> arm_failed;
2531 for (map<Netxx::socket_type,
2532 shared_ptr<session> >::const_iterator i = sessions.begin();
2533 i != sessions.end(); ++i)
2534 {
2535 i->second->maybe_step();
2536 try
2537 {
2538 if (i->second->arm())
2539 {
2540 L(FL("fd %d is armed") % i->first);
2541 armed_sessions.insert(i->first);
2542 }
2543 probe.add(*i->second->str, i->second->which_events());
2544 }
2545 catch (bad_decode & bd)
2546 {
2547 W(F("protocol error while processing peer %s: '%s', marking as bad")
2548 % i->second->peer_id % bd.what);
2549 arm_failed.insert(i->first);
2550 }
2551 }
2552 for (set<Netxx::socket_type>::const_iterator i = arm_failed.begin();
2553 i != arm_failed.end(); ++i)
2554 {
2555 drop_session_associated_with_fd(sessions, *i);
2556 }
2557}
2558
2559static void
2560handle_new_connection(Netxx::Address & addr,
2561 Netxx::StreamServer & server,
2562 Netxx::Timeout & timeout,
2563 protocol_role role,
2564 globish const & include_pattern,
2565 globish const & exclude_pattern,
2566 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2567 app_state & app)
2568{
2569 L(FL("accepting new connection on %s : %s")
2570 % (addr.get_name()?addr.get_name():"") % lexical_cast<string>(addr.get_port()));
2571 Netxx::Peer client = server.accept_connection();
2572
2573 if (!client)
2574 {
2575 L(FL("accept() returned a dead client"));
2576 }
2577 else
2578 {
2579 P(F("accepted new client connection from %s : %s")
2580 % client.get_address() % lexical_cast<string>(client.get_port()));
2581
2582 // 'false' here means not to revert changes when the SockOpt
2583 // goes out of scope.
2584 Netxx::SockOpt socket_options(client.get_socketfd(), false);
2585 socket_options.set_non_blocking();
2586
2587 shared_ptr<Netxx::Stream> str =
2588 shared_ptr<Netxx::Stream>
2589 (new Netxx::Stream(client.get_socketfd(), timeout));
2590
2591 shared_ptr<session> sess(new session(role, server_voice,
2592 include_pattern, exclude_pattern,
2593 app,
2594 lexical_cast<string>(client), str));
2595 sess->begin_service();
2596 sessions.insert(make_pair(client.get_socketfd(), sess));
2597 }
2598}
2599
2600static void
2601handle_read_available(Netxx::socket_type fd,
2602 shared_ptr<session> sess,
2603 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2604 set<Netxx::socket_type> & armed_sessions,
2605 bool & live_p)
2606{
2607 if (sess->read_some())
2608 {
2609 try
2610 {
2611 if (sess->arm())
2612 armed_sessions.insert(fd);
2613 }
2614 catch (bad_decode & bd)
2615 {
2616 W(F("protocol error while processing peer %s: '%s', disconnecting")
2617 % sess->peer_id % bd.what);
2618 drop_session_associated_with_fd(sessions, fd);
2619 live_p = false;
2620 }
2621 }
2622 else
2623 {
2624 switch (sess->protocol_state)
2625 {
2626 case session::working_state:
2627 P(F("peer %s read failed in working state (error)")
2628 % sess->peer_id);
2629 break;
2630
2631 case session::shutdown_state:
2632 P(F("peer %s read failed in shutdown state "
2633 "(possibly client misreported error)")
2634 % sess->peer_id);
2635 break;
2636
2637 case session::confirmed_state:
2638 P(F("peer %s read failed in confirmed state (success)")
2639 % sess->peer_id);
2640 break;
2641 }
2642 drop_session_associated_with_fd(sessions, fd);
2643 live_p = false;
2644 }
2645}
2646
2647
2648static void
2649handle_write_available(Netxx::socket_type fd,
2650 shared_ptr<session> sess,
2651 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2652 bool & live_p)
2653{
2654 if (!sess->write_some())
2655 {
2656 switch (sess->protocol_state)
2657 {
2658 case session::working_state:
2659 P(F("peer %s write failed in working state (error)")
2660 % sess->peer_id);
2661 break;
2662
2663 case session::shutdown_state:
2664 P(F("peer %s write failed in shutdown state "
2665 "(possibly client misreported error)")
2666 % sess->peer_id);
2667 break;
2668
2669 case session::confirmed_state:
2670 P(F("peer %s write failed in confirmed state (success)")
2671 % sess->peer_id);
2672 break;
2673 }
2674
2675 drop_session_associated_with_fd(sessions, fd);
2676 live_p = false;
2677 }
2678}
2679
2680static void
2681process_armed_sessions(map<Netxx::socket_type, shared_ptr<session> > & sessions,
2682 set<Netxx::socket_type> & armed_sessions,
2683 transaction_guard & guard)
2684{
2685 for (set<Netxx::socket_type>::const_iterator i = armed_sessions.begin();
2686 i != armed_sessions.end(); ++i)
2687 {
2688 map<Netxx::socket_type, shared_ptr<session> >::iterator j;
2689 j = sessions.find(*i);
2690 if (j == sessions.end())
2691 continue;
2692 else
2693 {
2694 shared_ptr<session> sess = j->second;
2695 if (!sess->process(guard))
2696 {
2697 P(F("peer %s processing finished, disconnecting")
2698 % sess->peer_id);
2699 drop_session_associated_with_fd(sessions, *i);
2700 }
2701 }
2702 }
2703}
2704
2705static void
2706reap_dead_sessions(map<Netxx::socket_type, shared_ptr<session> > & sessions,
2707 unsigned long timeout_seconds)
2708{
2709 // Kill any clients which haven't done any i/o inside the timeout period
2710 // or who have exchanged all items and flushed their output buffers.
2711 set<Netxx::socket_type> dead_clients;
2712 time_t now = ::time(NULL);
2713 for (map<Netxx::socket_type, shared_ptr<session> >::const_iterator
2714 i = sessions.begin(); i != sessions.end(); ++i)
2715 {
2716 if (static_cast<unsigned long>(i->second->last_io_time + timeout_seconds)
2717 < static_cast<unsigned long>(now))
2718 {
2719 P(F("fd %d (peer %s) has been idle too long, disconnecting")
2720 % i->first % i->second->peer_id);
2721 dead_clients.insert(i->first);
2722 }
2723 }
2724 for (set<Netxx::socket_type>::const_iterator i = dead_clients.begin();
2725 i != dead_clients.end(); ++i)
2726 {
2727 drop_session_associated_with_fd(sessions, *i);
2728 }
2729}
2730
2731static void
2732serve_connections(protocol_role role,
2733 globish const & include_pattern,
2734 globish const & exclude_pattern,
2735 app_state & app,
2736 utf8 const & address,
2737 Netxx::port_type default_port,
2738 unsigned long timeout_seconds,
2739 unsigned long session_limit)
2740{
2741 Netxx::PipeCompatibleProbe probe;
2742
2743 Netxx::Timeout
2744 forever,
2745 timeout(static_cast<long>(timeout_seconds)),
2746 instant(0,1);
2747
2748 if (!app.opts.bind_port().empty())
2749 default_port = std::atoi(app.opts.bind_port().c_str());
2750#ifdef USE_IPV6
2751 bool use_ipv6=true;
2752#else
2753 bool use_ipv6=false;
2754#endif
2755 // This will be true when we try to bind while using IPv6. See comments
2756 // further down.
2757 bool try_again=false;
2758
2759 do
2760 {
2761 try
2762 {
2763 try_again = false;
2764
2765 Netxx::Address addr(use_ipv6);
2766
2767 if (!app.opts.bind_address().empty())
2768 addr.add_address(app.opts.bind_address().c_str(), default_port);
2769 else
2770 addr.add_all_addresses (default_port);
2771
2772 // If se use IPv6 and the initialisation of server fails, we want
2773 // to try again with IPv4. The reason is that someone may have
2774 // downloaded a IPv6-enabled monotone on a system that doesn't
2775 // have IPv6, and which might fail therefore.
2776 // On failure, Netxx::NetworkException is thrown, and we catch
2777 // it further down.
2778 try_again=use_ipv6;
2779
2780 Netxx::StreamServer server(addr, timeout);
2781
2782 // If we came this far, whatever we used (IPv6 or IPv4) was
2783 // accepted, so we don't need to try again any more.
2784 try_again=false;
2785
2786 const char *name = addr.get_name();
2787 P(F("beginning service on %s : %s")
2788 % (name != NULL ? name : _("<all interfaces>"))
2789 % lexical_cast<string>(addr.get_port()));
2790
2791 map<Netxx::socket_type, shared_ptr<session> > sessions;
2792 set<Netxx::socket_type> armed_sessions;
2793
2794 shared_ptr<transaction_guard> guard;
2795
2796 while (true)
2797 {
2798 probe.clear();
2799 armed_sessions.clear();
2800
2801 if (sessions.size() >= session_limit)
2802 W(F("session limit %d reached, some connections "
2803 "will be refused") % session_limit);
2804 else
2805 probe.add(server);
2806
2807 arm_sessions_and_calculate_probe(probe, sessions, armed_sessions);
2808
2809 L(FL("i/o probe with %d armed") % armed_sessions.size());
2810 Netxx::socket_type fd;
2811 Netxx::Timeout how_long;
2812 if (sessions.empty())
2813 how_long = forever;
2814 else if (armed_sessions.empty())
2815 how_long = timeout;
2816 else
2817 how_long = instant;
2818 do
2819 {
2820 Netxx::Probe::result_type res = probe.ready(how_long);
2821 how_long = instant;
2822 Netxx::Probe::ready_type event = res.second;
2823 fd = res.first;
2824
2825 if (!guard)
2826 guard = shared_ptr<transaction_guard>(new transaction_guard(app.db));
2827
2828 I(guard);
2829
2830 if (fd == -1)
2831 {
2832 if (armed_sessions.empty())
2833 L(FL("timed out waiting for I/O (listening on %s : %s)")
2834 % addr.get_name() % lexical_cast<string>(addr.get_port()));
2835 }
2836
2837 // we either got a new connection
2838 else if (fd == server)
2839 handle_new_connection(addr, server, timeout, role,
2840 include_pattern, exclude_pattern,
2841 sessions, app);
2842
2843 // or an existing session woke up
2844 else
2845 {
2846 map<Netxx::socket_type, shared_ptr<session> >::iterator i;
2847 i = sessions.find(fd);
2848 if (i == sessions.end())
2849 {
2850 L(FL("got woken up for action on unknown fd %d") % fd);
2851 }
2852 else
2853 {
2854 probe.remove(*(i->second->str));
2855 shared_ptr<session> sess = i->second;
2856 bool live_p = true;
2857
2858 try
2859 {
2860 if (event & Netxx::Probe::ready_read)
2861 handle_read_available(fd, sess, sessions,
2862 armed_sessions, live_p);
2863
2864 if (live_p && (event & Netxx::Probe::ready_write))
2865 handle_write_available(fd, sess, sessions, live_p);
2866 }
2867 catch (Netxx::Exception &)
2868 {
2869 P(F("Network error on peer %s, disconnecting")
2870 % sess->peer_id);
2871 drop_session_associated_with_fd(sessions, fd);
2872 }
2873 if (live_p && (event & Netxx::Probe::ready_oobd))
2874 {
2875 P(F("got OOB from peer %s, disconnecting")
2876 % sess->peer_id);
2877 drop_session_associated_with_fd(sessions, fd);
2878 }
2879 }
2880 }
2881 }
2882 while (fd != -1);
2883 process_armed_sessions(sessions, armed_sessions, *guard);
2884 reap_dead_sessions(sessions, timeout_seconds);
2885
2886 if (sessions.empty())
2887 {
2888 // Let the guard die completely if everything's gone quiet.
2889 guard->commit();
2890 guard.reset();
2891 }
2892 }
2893 }
2894 // This exception is thrown when bind() fails somewhere in Netxx.
2895 catch (Netxx::NetworkException &)
2896 {
2897 // If we tried with IPv6 and failed, we want to try again using IPv4.
2898 if (try_again)
2899 {
2900 use_ipv6 = false;
2901 }
2902 // In all other cases, just rethrow the exception.
2903 else
2904 throw;
2905 }
2906 // This exception is thrown when there is no support for the type of
2907 // connection we want to do in the kernel, for example when a socket()
2908 // call fails somewhere in Netxx.
2909 catch (Netxx::Exception &)
2910 {
2911 // If we tried with IPv6 and failed, we want to try again using IPv4.
2912 if (try_again)
2913 {
2914 use_ipv6 = false;
2915 }
2916 // In all other cases, just rethrow the exception.
2917 else
2918 throw;
2919 }
2920 }
2921 while(try_again);
2922 }
2923
2924static void
2925serve_single_connection(shared_ptr<session> sess,
2926 unsigned long timeout_seconds)
2927{
2928 Netxx::PipeCompatibleProbe probe;
2929
2930 Netxx::Timeout
2931 forever,
2932 timeout(static_cast<long>(timeout_seconds)),
2933 instant(0,1);
2934
2935 P(F("beginning service on %s") % sess->peer_id);
2936
2937 sess->begin_service();
2938
2939 transaction_guard guard(sess->app.db);
2940
2941 map<Netxx::socket_type, shared_ptr<session> > sessions;
2942 set<Netxx::socket_type> armed_sessions;
2943
2944 if (sess->str->get_socketfd() == -1)
2945 {
2946 // Unix pipes are non-duplex, have two filedescriptors
2947 shared_ptr<Netxx::PipeStream> pipe =
2948 boost::dynamic_pointer_cast<Netxx::PipeStream, Netxx::StreamBase>(sess->str);
2949 I(pipe);
2950 sessions[pipe->get_writefd()]=sess;
2951 sessions[pipe->get_readfd()]=sess;
2952 }
2953 else
2954 sessions[sess->str->get_socketfd()]=sess;
2955
2956 while (!sessions.empty())
2957 {
2958 probe.clear();
2959 armed_sessions.clear();
2960
2961 arm_sessions_and_calculate_probe(probe, sessions, armed_sessions);
2962
2963 L(FL("i/o probe with %d armed") % armed_sessions.size());
2964 Netxx::Probe::result_type res = probe.ready((armed_sessions.empty() ? timeout
2965 : instant));
2966 Netxx::Probe::ready_type event = res.second;
2967 Netxx::socket_type fd = res.first;
2968
2969 if (fd == -1)
2970 {
2971 if (armed_sessions.empty())
2972 L(FL("timed out waiting for I/O (listening on %s)")
2973 % sess->peer_id);
2974 }
2975
2976 // an existing session woke up
2977 else
2978 {
2979 map<Netxx::socket_type, shared_ptr<session> >::iterator i;
2980 i = sessions.find(fd);
2981 if (i == sessions.end())
2982 {
2983 L(FL("got woken up for action on unknown fd %d") % fd);
2984 }
2985 else
2986 {
2987 shared_ptr<session> sess = i->second;
2988 bool live_p = true;
2989
2990 if (event & Netxx::Probe::ready_read)
2991 handle_read_available(fd, sess, sessions, armed_sessions, live_p);
2992
2993 if (live_p && (event & Netxx::Probe::ready_write))
2994 handle_write_available(fd, sess, sessions, live_p);
2995
2996 if (live_p && (event & Netxx::Probe::ready_oobd))
2997 {
2998 P(F("got some OOB data on fd %d (peer %s), disconnecting")
2999 % fd % sess->peer_id);
3000 drop_session_associated_with_fd(sessions, fd);
3001 }
3002 }
3003 }
3004 process_armed_sessions(sessions, armed_sessions, guard);
3005 reap_dead_sessions(sessions, timeout_seconds);
3006 }
3007}
3008
3009
3010void
3011insert_with_parents(revision_id rev,
3012 refiner & ref,
3013 revision_enumerator & rev_enumerator,
3014 set<revision_id> & revs,
3015 app_state & app,
3016 ticker & revisions_ticker)
3017{
3018 deque<revision_id> work;
3019 work.push_back(rev);
3020 while (!work.empty())
3021 {
3022 revision_id rid = work.front();
3023 work.pop_front();
3024
3025 if (!null_id(rid) && revs.find(rid) == revs.end())
3026 {
3027 revs.insert(rid);
3028 ++revisions_ticker;
3029 id rev_item;
3030 decode_hexenc(rid.inner(), rev_item);
3031 ref.note_local_item(rev_item);
3032 vector<revision_id> parents;
3033 rev_enumerator.get_revision_parents(rid, parents);
3034 for (vector<revision_id>::const_iterator i = parents.begin();
3035 i != parents.end(); ++i)
3036 {
3037 work.push_back(*i);
3038 }
3039 }
3040 }
3041}
3042
3043void
3044session::rebuild_merkle_trees(app_state & app,
3045 set<branch_name> const & branchnames)
3046{
3047 P(F("finding items to synchronize:"));
3048 for (set<branch_name>::const_iterator i = branchnames.begin();
3049 i != branchnames.end(); ++i)
3050 L(FL("including branch %s") % *i);
3051
3052 // xgettext: please use short message and try to avoid multibytes chars
3053 ticker revisions_ticker(N_("revisions"), "r", 64);
3054 // xgettext: please use short message and try to avoid multibytes chars
3055 ticker certs_ticker(N_("certificates"), "c", 256);
3056 // xgettext: please use short message and try to avoid multibytes chars
3057 ticker keys_ticker(N_("keys"), "k", 1);
3058
3059 set<revision_id> revision_ids;
3060 set<rsa_keypair_id> inserted_keys;
3061
3062 {
3063 for (set<branch_name>::const_iterator i = branchnames.begin();
3064 i != branchnames.end(); ++i)
3065 {
3066 // Get branch certs.
3067 vector< revision<cert> > certs;
3068 // FIXME_PROJECTS: probably something like
3069 // app.get_project(i->project).get_branch_certs(i->branch)
3070 // or so.
3071 app.get_project().get_branch_certs(*i, certs);
3072 for (vector< revision<cert> >::const_iterator j = certs.begin();
3073 j != certs.end(); j++)
3074 {
3075 revision_id rid(j->inner().ident);
3076 insert_with_parents(rid, rev_refiner, rev_enumerator,
3077 revision_ids, app, revisions_ticker);
3078 // Branch certs go in here, others later on.
3079 hexenc<id> tmp;
3080 id item;
3081 cert_hash_code(j->inner(), tmp);
3082 decode_hexenc(tmp, item);
3083 cert_refiner.note_local_item(item);
3084 rev_enumerator.note_cert(rid, tmp);
3085 if (inserted_keys.find(j->inner().key) == inserted_keys.end())
3086 inserted_keys.insert(j->inner().key);
3087 }
3088 }
3089 }
3090
3091 {
3092 map<branch_name, epoch_data> epochs;
3093 app.db.get_epochs(epochs);
3094
3095 epoch_data epoch_zero(string(constants::epochlen, '0'));
3096 for (set<branch_name>::const_iterator i = branchnames.begin();
3097 i != branchnames.end(); ++i)
3098 {
3099 branch_name const & branch(*i);
3100 map<branch_name, epoch_data>::const_iterator j;
3101 j = epochs.find(branch);
3102
3103 // Set to zero any epoch which is not yet set.
3104 if (j == epochs.end())
3105 {
3106 L(FL("setting epoch on %s to zero") % branch);
3107 epochs.insert(make_pair(branch, epoch_zero));
3108 app.db.set_epoch(branch, epoch_zero);
3109 }
3110
3111 // Then insert all epochs into merkle tree.
3112 j = epochs.find(branch);
3113 I(j != epochs.end());
3114 epoch_id eid;
3115 id epoch_item;
3116 epoch_hash_code(j->first, j->second, eid);
3117 decode_hexenc(eid.inner(), epoch_item);
3118 epoch_refiner.note_local_item(epoch_item);
3119 }
3120 }
3121
3122 {
3123 typedef vector< pair<hexenc<id>,
3124 pair<revision_id, rsa_keypair_id> > > cert_idx;
3125
3126 cert_idx idx;
3127 app.db.get_revision_cert_nobranch_index(idx);
3128
3129 // Insert all non-branch certs reachable via these revisions
3130 // (branch certs were inserted earlier).
3131
3132 for (cert_idx::const_iterator i = idx.begin(); i != idx.end(); ++i)
3133 {
3134 hexenc<id> const & hash = i->first;
3135 revision_id const & ident = i->second.first;
3136 rsa_keypair_id const & key = i->second.second;
3137
3138 rev_enumerator.note_cert(ident, hash);
3139
3140 if (revision_ids.find(ident) == revision_ids.end())
3141 continue;
3142
3143 id item;
3144 decode_hexenc(hash, item);
3145 cert_refiner.note_local_item(item);
3146 ++certs_ticker;
3147 if (inserted_keys.find(key) == inserted_keys.end())
3148 inserted_keys.insert(key);
3149 }
3150 }
3151
3152 // Add any keys specified on the command line.
3153 for (vector<rsa_keypair_id>::const_iterator key
3154 = app.opts.keys_to_push.begin();
3155 key != app.opts.keys_to_push.end(); ++key)
3156 {
3157 if (inserted_keys.find(*key) == inserted_keys.end())
3158 {
3159 if (!app.db.public_key_exists(*key))
3160 {
3161 if (app.keys.key_pair_exists(*key))
3162 app.keys.ensure_in_database(*key);
3163 else
3164 W(F("Cannot find key '%s'") % *key);
3165 }
3166 inserted_keys.insert(*key);
3167 }
3168 }
3169
3170 // Insert all the keys.
3171 for (set<rsa_keypair_id>::const_iterator key = inserted_keys.begin();
3172 key != inserted_keys.end(); key++)
3173 {
3174 if (app.db.public_key_exists(*key))
3175 {
3176 base64<rsa_pub_key> pub_encoded;
3177 app.db.get_key(*key, pub_encoded);
3178 hexenc<id> keyhash;
3179 key_hash_code(*key, pub_encoded, keyhash);
3180 L(FL("noting key '%s' = '%s' to send") % *key % keyhash);
3181 id key_item;
3182 decode_hexenc(keyhash, key_item);
3183 key_refiner.note_local_item(key_item);
3184 ++keys_ticker;
3185 }
3186 }
3187
3188 rev_refiner.reindex_local_items();
3189 cert_refiner.reindex_local_items();
3190 key_refiner.reindex_local_items();
3191 epoch_refiner.reindex_local_items();
3192}
3193
3194void
3195run_netsync_protocol(protocol_voice voice,
3196 protocol_role role,
3197 utf8 const & addr,
3198 globish const & include_pattern,
3199 globish const & exclude_pattern,
3200 app_state & app)
3201{
3202 if (include_pattern().find_first_of("'\"") != string::npos)
3203 {
3204 W(F("include branch pattern contains a quote character:\n"
3205 "%s") % include_pattern());
3206 }
3207
3208 if (exclude_pattern().find_first_of("'\"") != string::npos)
3209 {
3210 W(F("exclude branch pattern contains a quote character:\n"
3211 "%s") % exclude_pattern());
3212 }
3213
3214 // We do not want to be killed by SIGPIPE from a network disconnect.
3215 ignore_sigpipe();
3216
3217 try
3218 {
3219 if (voice == server_voice)
3220 {
3221 if (app.opts.bind_stdio)
3222 {
3223 shared_ptr<Netxx::PipeStream> str(new Netxx::PipeStream(0,1));
3224 shared_ptr<session> sess(new session(role, server_voice,
3225 include_pattern, exclude_pattern,
3226 app, "stdio", str));
3227 serve_single_connection(sess,constants::netsync_timeout_seconds);
3228 }
3229 else
3230 serve_connections(role, include_pattern, exclude_pattern, app,
3231 addr, static_cast<Netxx::port_type>(constants::netsync_default_port),
3232 static_cast<unsigned long>(constants::netsync_timeout_seconds),
3233 static_cast<unsigned long>(constants::netsync_connection_limit));
3234 }
3235 else
3236 {
3237 I(voice == client_voice);
3238 call_server(role, include_pattern, exclude_pattern, app,
3239 addr, static_cast<Netxx::port_type>(constants::netsync_default_port),
3240 static_cast<unsigned long>(constants::netsync_timeout_seconds));
3241 }
3242 }
3243 catch (Netxx::NetworkException & e)
3244 {
3245 throw informative_failure((F("network error: %s") % e.what()).str());
3246 }
3247 catch (Netxx::Exception & e)
3248 {
3249 throw oops((F("network error: %s") % e.what()).str());;
3250 }
3251}
3252
3253// Local Variables:
3254// mode: C++
3255// fill-column: 76
3256// c-file-style: "gnu"
3257// indent-tabs-mode: nil
3258// End:
3259// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status