diff --git a/channeld/channel.c b/channeld/channel.c index 9f70d6618cb7..ee4359611de6 100644 --- a/channeld/channel.c +++ b/channeld/channel.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -44,7 +45,7 @@ #include /* stdin == requests, 3 == peer, 4 = gossip, 5 = HSM */ -#define REQ_FD STDIN_FILENO +#define MASTER_FD STDIN_FILENO #define PEER_FD 3 #define GOSSIP_FD 4 #define HSM_FD 5 @@ -92,12 +93,10 @@ struct peer { struct io_conn *peer_conn; struct daemon_conn gossip_client; - struct daemon_conn master; - /* If we're waiting for a specific reply, defer other messages. */ - enum channel_wire_type master_reply_type; - void (*handle_master_reply)(struct peer *peer, const u8 *msg); - struct msg_queue master_deferred; + /* Messages from master: we queue them since we might be waiting for + * a specific reply. */ + struct msg_queue from_master; struct timers timers; struct oneshot *commit_timer; @@ -337,13 +336,13 @@ static struct io_plan *handle_peer_funding_locked(struct io_conn *conn, "Wrong channel id in %s", tal_hex(trc, msg)); peer->funding_locked[REMOTE] = true; - daemon_conn_send(&peer->master, - take(towire_channel_got_funding_locked(peer, + wire_sync_write(MASTER_FD, + take(towire_channel_got_funding_locked(peer, &peer->remote_per_commit))); if (peer->funding_locked[LOCAL]) { - daemon_conn_send(&peer->master, - take(towire_channel_normal_operation(peer))); + wire_sync_write(MASTER_FD, + take(towire_channel_normal_operation(peer))); } send_announcement_signatures(peer); @@ -357,7 +356,7 @@ static void announce_channel(struct peer *peer) send_channel_update(peer, false); /* Tell the master that we just announced the channel, * so it may announce the node */ - daemon_conn_send(&peer->master, take(towire_channel_announced(peer))); + wire_sync_write(MASTER_FD, take(towire_channel_announced(peer))); } static struct io_plan *handle_peer_announcement_signatures(struct io_conn *conn, @@ -487,41 +486,41 @@ static void maybe_send_shutdown(struct peer *peer) peer->shutdown_sent[LOCAL] = true; } -/* Master has acknowledged that we're sending commitment, so send it. */ -static void handle_sending_commitsig_reply(struct peer *peer, const u8 *msg) +/* This queues other traffic from the master until we get reply. */ +static u8 *master_wait_sync_reply(const tal_t *ctx, + struct peer *peer, const u8 *msg, + enum channel_wire_type replytype) { - status_trace("Sending commit_sig with %zu htlc sigs", - tal_count(peer->next_commit_sigs->htlc_sigs)); + u8 *reply; - peer->next_index[REMOTE]++; + status_trace("Sending master %s", + channel_wire_type_name(fromwire_peektype(msg))); - msg = towire_commitment_signed(peer, &peer->channel_id, - &peer->next_commit_sigs->commit_sig, - peer->next_commit_sigs->htlc_sigs); - msg_enqueue(&peer->peer_out, take(msg)); - peer->next_commit_sigs = tal_free(peer->next_commit_sigs); + if (!wire_sync_write(MASTER_FD, msg)) + status_failed(WIRE_CHANNEL_INTERNAL_ERROR, + "Could not set sync write to master: %s", + strerror(errno)); - maybe_send_shutdown(peer); + status_trace("... , awaiting %s", + channel_wire_type_name(replytype)); - /* Timer now considered expired, you can add a new one. */ - peer->commit_timer = NULL; - start_commit_timer(peer); - - if (shutdown_complete(peer)) - io_break(peer); -} - -/* This blocks other traffic from the master until we get reply. */ -static void master_sync_reply(struct peer *peer, const u8 *msg, - enum channel_wire_type replytype, - void (*handle)(struct peer *peer, const u8 *msg)) -{ - assert(!peer->handle_master_reply); + for (;;) { + reply = wire_sync_read(ctx, MASTER_FD); + if (!reply) + status_failed(WIRE_CHANNEL_INTERNAL_ERROR, + "Could not set sync read from master: %s", + strerror(errno)); + if (fromwire_peektype(reply) == replytype) { + status_trace("Got it!"); + break; + } - peer->handle_master_reply = handle; - peer->master_reply_type = replytype; + status_trace("Nope, got %s instead", + channel_wire_type_name(fromwire_peektype(reply))); + msg_enqueue(&peer->from_master, take(reply)); + } - daemon_conn_send(&peer->master, msg); + return reply; } static struct commit_sigs *calc_commitsigs(const tal_t *ctx, @@ -615,10 +614,8 @@ static void send_commit(struct peer *peer) /* FIXME: Document this requirement in BOLT 2! */ /* We can't send two commits in a row. */ - if (channel_awaiting_revoke_and_ack(peer->channel) - || peer->handle_master_reply) { - status_trace("Can't send commit: waiting for revoke_and_ack %s", - peer->handle_master_reply ? "processing" : "reply"); + if (channel_awaiting_revoke_and_ack(peer->channel)) { + status_trace("Can't send commit: waiting for revoke_and_ack"); /* Mark this as done and try again. */ peer->commit_timer = NULL; start_commit_timer(peer); @@ -655,9 +652,30 @@ static void send_commit(struct peer *peer) changed_htlcs, &peer->next_commit_sigs->commit_sig, peer->next_commit_sigs->htlc_sigs); - master_sync_reply(peer, take(msg), - WIRE_CHANNEL_SENDING_COMMITSIG_REPLY, - handle_sending_commitsig_reply); + /* Message is empty; receiving it is the point. */ + master_wait_sync_reply(tmpctx, peer, take(msg), + WIRE_CHANNEL_SENDING_COMMITSIG_REPLY); + + status_trace("Sending commit_sig with %zu htlc sigs", + tal_count(peer->next_commit_sigs->htlc_sigs)); + + peer->next_index[REMOTE]++; + + msg = towire_commitment_signed(peer, &peer->channel_id, + &peer->next_commit_sigs->commit_sig, + peer->next_commit_sigs->htlc_sigs); + msg_enqueue(&peer->peer_out, take(msg)); + peer->next_commit_sigs = tal_free(peer->next_commit_sigs); + + maybe_send_shutdown(peer); + + /* Timer now considered expired, you can add a new one. */ + peer->commit_timer = NULL; + start_commit_timer(peer); + + if (shutdown_complete(peer)) + io_break(peer); + tal_free(tmpctx); } @@ -834,12 +852,6 @@ static u8 *got_commitsig_msg(const tal_t *ctx, return msg; } -/* Tell peer to continue now master has replied. */ -static void handle_reply_wake_peer(struct peer *peer, const u8 *msg) -{ - io_wake(peer); -} - static struct io_plan *handle_peer_commit_sig(struct io_conn *conn, struct peer *peer, const u8 *msg) { @@ -957,12 +969,10 @@ static struct io_plan *handle_peer_commit_sig(struct io_conn *conn, msg = got_commitsig_msg(tmpctx, peer->next_index[LOCAL], &commit_sig, htlc_sigs, changed_htlcs, txs[0]); - master_sync_reply(peer, take(msg), - WIRE_CHANNEL_GOT_COMMITSIG_REPLY, - handle_reply_wake_peer); - - /* And peer waits for reply. */ - return io_wait(conn, peer, send_revocation, peer); + master_wait_sync_reply(tmpctx, peer, take(msg), + WIRE_CHANNEL_GOT_COMMITSIG_REPLY); + tal_free(tmpctx); + return send_revocation(conn, peer); } static u8 *got_revoke_msg(const tal_t *ctx, u64 revoke_num, @@ -1013,6 +1023,7 @@ static struct io_plan *handle_peer_revoke_and_ack(struct io_conn *conn, struct privkey privkey; struct channel_id channel_id; struct pubkey per_commit_point, next_per_commit; + tal_t *tmpctx = tal_tmpctx(msg); const struct htlc **changed_htlcs = tal_arr(msg, const struct htlc *, 0); if (!fromwire_revoke_and_ack(msg, NULL, &channel_id, &old_commit_secret, @@ -1059,12 +1070,11 @@ static struct io_plan *handle_peer_revoke_and_ack(struct io_conn *conn, status_trace("No commits outstanding after recv revoke_and_ack"); /* Tell master about things this locks in, wait for response */ - msg = got_revoke_msg(msg, peer->next_index[REMOTE] - 2, + msg = got_revoke_msg(tmpctx, peer->next_index[REMOTE] - 2, &old_commit_secret, &next_per_commit, changed_htlcs); - master_sync_reply(peer, take(msg), - WIRE_CHANNEL_GOT_REVOKE_REPLY, - handle_reply_wake_peer); + master_wait_sync_reply(tmpctx, peer, take(msg), + WIRE_CHANNEL_GOT_REVOKE_REPLY); peer->old_remote_per_commit = peer->remote_per_commit; peer->remote_per_commit = next_per_commit; @@ -1075,8 +1085,8 @@ static struct io_plan *handle_peer_revoke_and_ack(struct io_conn *conn, type_to_string(trc, struct pubkey, &peer->old_remote_per_commit)); - /* And peer waits for reply. */ - return io_wait(conn, peer, accepted_revocation, peer); + tal_free(tmpctx); + return accepted_revocation(conn, peer); } static struct io_plan *handle_peer_fulfill_htlc(struct io_conn *conn, @@ -1275,8 +1285,8 @@ static struct io_plan *handle_pong(struct io_conn *conn, status_failed(WIRE_CHANNEL_PEER_READ_FAILED, "Unexpected pong"); peer->num_pings_outstanding--; - daemon_conn_send(&peer->master, - take(towire_channel_ping_reply(pong, tal_len(pong)))); + wire_sync_write(MASTER_FD, + take(towire_channel_ping_reply(pong, tal_len(pong)))); return peer_read_message(conn, &peer->pcs, peer_in); } @@ -1291,8 +1301,8 @@ static struct io_plan *handle_peer_shutdown(struct io_conn *conn, status_failed(WIRE_CHANNEL_PEER_READ_FAILED, "Bad shutdown"); /* Tell master, it will tell us what to send (if any). */ - daemon_conn_send(&peer->master, - take(towire_channel_got_shutdown(peer, scriptpubkey))); + wire_sync_write(MASTER_FD, + take(towire_channel_got_shutdown(peer, scriptpubkey))); peer->shutdown_sent[REMOTE] = true; if (shutdown_complete(peer)) @@ -1681,8 +1691,8 @@ static void handle_funding_locked(struct peer *peer, const u8 *msg) peer->funding_locked[LOCAL] = true; if (peer->funding_locked[REMOTE]) { - daemon_conn_send(&peer->master, - take(towire_channel_normal_operation(peer))); + wire_sync_write(MASTER_FD, + take(towire_channel_normal_operation(peer))); } } @@ -1737,7 +1747,7 @@ static void handle_offer_htlc(struct peer *peer, const u8 *inmsg) /* Tell the master. */ msg = towire_channel_offer_htlc_reply(inmsg, peer->htlc_id, 0, NULL); - daemon_conn_send(&peer->master, take(msg)); + wire_sync_write(MASTER_FD, take(msg)); peer->htlc_id++; return; case CHANNEL_ERR_INVALID_EXPIRY: @@ -1776,7 +1786,7 @@ static void handle_offer_htlc(struct peer *peer, const u8 *inmsg) /* Note: tal_fmt doesn't set tal_len() to exact length, so fix here. */ tal_resize(&failmsg, strlen(failmsg)+1); msg = towire_channel_offer_htlc_reply(inmsg, 0, failcode, (u8*)failmsg); - daemon_conn_send(&peer->master, take(msg)); + wire_sync_write(MASTER_FD, take(msg)); } static void handle_preimage(struct peer *peer, const u8 *inmsg) @@ -1885,8 +1895,8 @@ static void handle_ping_cmd(struct peer *peer, const u8 *inmsg) * it MUST ignore the `ping`. */ if (num_pong_bytes >= 65532) - daemon_conn_send(&peer->master, - take(towire_channel_ping_reply(peer, 0))); + wire_sync_write(MASTER_FD, + take(towire_channel_ping_reply(peer, 0))); else peer->num_pings_outstanding++; } @@ -1903,51 +1913,31 @@ static void handle_shutdown_cmd(struct peer *peer, const u8 *inmsg) start_commit_timer(peer); } -static struct io_plan *req_in(struct io_conn *conn, struct daemon_conn *master) +static void req_in(struct peer *peer, const u8 *msg) { - struct peer *peer = container_of(master, struct peer, master); - enum channel_wire_type t = fromwire_peektype(master->msg_in); - - /* Waiting for something specific? Defer others. */ - if (peer->handle_master_reply) { - void (*handle)(struct peer *peer, const u8 *msg); - - if (t != peer->master_reply_type) { - msg_enqueue(&peer->master_deferred, - take(master->msg_in)); - master->msg_in = NULL; - goto out_next; - } - - /* Just in case it resets this. */ - handle = peer->handle_master_reply; - peer->handle_master_reply = NULL; - - handle(peer, master->msg_in); - goto out; - } + enum channel_wire_type t = fromwire_peektype(msg); switch (t) { case WIRE_CHANNEL_FUNDING_LOCKED: - handle_funding_locked(peer, master->msg_in); + handle_funding_locked(peer, msg); goto out; case WIRE_CHANNEL_FUNDING_ANNOUNCE_DEPTH: - handle_funding_announce_depth(peer, master->msg_in); + handle_funding_announce_depth(peer, msg); goto out; case WIRE_CHANNEL_OFFER_HTLC: - handle_offer_htlc(peer, master->msg_in); + handle_offer_htlc(peer, msg); goto out; case WIRE_CHANNEL_FULFILL_HTLC: - handle_preimage(peer, master->msg_in); + handle_preimage(peer, msg); goto out; case WIRE_CHANNEL_FAIL_HTLC: - handle_fail(peer, master->msg_in); + handle_fail(peer, msg); goto out; case WIRE_CHANNEL_PING: - handle_ping_cmd(peer, master->msg_in); + handle_ping_cmd(peer, msg); goto out; case WIRE_CHANNEL_SEND_SHUTDOWN: - handle_shutdown_cmd(peer, master->msg_in); + handle_shutdown_cmd(peer, msg); goto out; case WIRE_CHANNEL_BAD_COMMAND: @@ -1978,25 +1968,7 @@ static struct io_plan *req_in(struct io_conn *conn, struct daemon_conn *master) channel_wire_type_name(t)); out: - /* In case we've now processed reply, process packet backlog. */ - if (!peer->handle_master_reply) { - const u8 *msg = msg_dequeue(&peer->master_deferred); - if (msg) { - /* Free old packet exactly like daemon_conn_read_next */ - master->msg_in = tal_free(master->msg_in); - master->msg_in = cast_const(u8 *, tal_steal(peer, msg)); - return req_in(conn, master); - } - } - -out_next: - return daemon_conn_read_next(conn, master); -} - -static void master_gone(struct io_conn *unused, struct daemon_conn *dc) -{ - /* Can't tell master, it's gone. */ - exit(2); + tal_free(msg); } /* We do this synchronously. */ @@ -2021,9 +1993,9 @@ static void init_channel(struct peer *peer) u8 *msg; u32 feerate_per_kw; - assert(!(fcntl(REQ_FD, F_GETFL) & O_NONBLOCK)); + assert(!(fcntl(MASTER_FD, F_GETFL) & O_NONBLOCK)); - msg = wire_sync_read(peer, REQ_FD); + msg = wire_sync_read(peer, MASTER_FD); if (!fromwire_channel_init(peer, msg, NULL, &peer->chain_hash, &funding_txid, &funding_txout, @@ -2070,9 +2042,7 @@ static void init_channel(struct peer *peer) status_failed(WIRE_CHANNEL_BAD_COMMAND, "Init: %s", tal_hex(msg, msg)); - /* After this we'll be async, so set up now. */ - daemon_conn_init(peer, &peer->master, REQ_FD, req_in, master_gone); - status_setup_async(&peer->master); + status_setup_sync(MASTER_FD); status_trace("init %s: remote_per_commit = %s, old_remote_per_commit = %s" " next_idx_local = %"PRIu64 @@ -2156,21 +2126,72 @@ static void send_shutdown_complete(struct peer *peer) } /* Now we can tell master shutdown is complete. */ - daemon_conn_send(&peer->master, - take(towire_channel_shutdown_complete(peer, - &peer->pcs.cs))); - daemon_conn_send_fd(&peer->master, PEER_FD); - daemon_conn_send_fd(&peer->master, GOSSIP_FD); - - if (!daemon_conn_sync_flush(&peer->master)) - status_failed(WIRE_CHANNEL_INTERNAL_ERROR, "Flushing master"); + wire_sync_write(MASTER_FD, + take(towire_channel_shutdown_complete(peer, + &peer->pcs.cs))); + fdpass_send(MASTER_FD, PEER_FD); + fdpass_send(MASTER_FD, GOSSIP_FD); + close(MASTER_FD); +} + +static bool process_reqs(struct peer *peer) +{ + const u8 *msg; + bool changed = false; + + /* In case we've deferred, process packet backlog. */ + while ((msg = msg_dequeue(&peer->from_master)) != NULL) { + status_trace("Now dealing with deferred %s", + channel_wire_type_name(fromwire_peektype(msg))); + req_in(peer, msg); + changed = true; + } + + return changed; +} + +static struct peer *peer; + +/* If this becomes a common pattern, we could make it a helper in common/ */ +static int poll_with_masterfd(struct pollfd *fds, nfds_t nfds, int timeout) +{ + struct pollfd *fds_plus; + int r; + + /* This can change things, so return as if poll found nothing. */ + if (process_reqs(peer)) + return 0; + + /* Add master fd to fds. */ + fds_plus = tal_dup_arr(peer, struct pollfd, fds, nfds, 1); + fds_plus[nfds].fd = MASTER_FD; + fds_plus[nfds].events = POLLIN; + fds_plus[nfds].revents = 0; + + r = poll(fds_plus, nfds+1, timeout); + if (r > 0) { + if (fds_plus[nfds].revents & POLLIN) { + u8 *msg = wire_sync_read(peer, MASTER_FD); + + if (!msg) + status_failed(WIRE_CHANNEL_BAD_COMMAND, + "Can't read command: %s", + strerror(errno)); + msg_enqueue(&peer->from_master, take(msg)); + r--; + } else if (fds_plus[nfds].revents & (POLLHUP|POLLNVAL|POLLERR)) + /* Can't report error, master gone. */ + errx(2, "Error polling master fd"); + } + /* Copy back revents values */ + memcpy(fds, fds_plus, nfds * sizeof(*fds)); + tal_free(fds_plus); + return r; } int main(int argc, char *argv[]) { - struct peer *peer = tal(NULL, struct peer); int i; - if (argc == 2 && streq(argv[1], "--version")) { printf("%s\n", version()); exit(0); @@ -2183,14 +2204,13 @@ int main(int argc, char *argv[]) secp256k1_ctx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY | SECP256K1_CONTEXT_SIGN); + peer = tal(NULL, struct peer); peer->num_pings_outstanding = 0; timers_init(&peer->timers, time_mono()); peer->commit_timer = NULL; peer->have_sigs[LOCAL] = peer->have_sigs[REMOTE] = false; peer->announce_depth_reached = false; - peer->handle_master_reply = NULL; - peer->master_reply_type = 0; - msg_queue_init(&peer->master_deferred, peer); + msg_queue_init(&peer->from_master, peer); msg_queue_init(&peer->peer_out, peer); peer->next_commit_sigs = NULL; peer->shutdown_sent[LOCAL] = false; @@ -2212,6 +2232,9 @@ int main(int argc, char *argv[]) /* Read init_channel message sync. */ init_channel(peer); + /* Make sure we process and listen for master msgs. */ + io_poll_override(poll_with_masterfd); + for (;;) { struct timer *expired = NULL; io_loop(&peer->timers, &expired);