1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
use futures_core::ready;
use futures_sink::Sink;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::sync::mpsc::{error::SendError, Sender};

use super::ReusableBoxFuture;

// This implementation was chosen over something based on permits because to get a
// `tokio::sync::mpsc::Permit` out of the `inner` future, you must transmute the
// lifetime on the permit to `'static`.

/// A wrapper around [`mpsc::Sender`] that can be polled.
///
/// [`mpsc::Sender`]: tokio::sync::mpsc::Sender
#[derive(Debug)]
pub struct PollSender<T> {
    /// is none if closed
    sender: Option<Arc<Sender<T>>>,
    is_sending: bool,
    inner: ReusableBoxFuture<Result<(), SendError<T>>>,
}

// By reusing the same async fn for both Some and None, we make sure every
// future passed to ReusableBoxFuture has the same underlying type, and hence
// the same size and alignment.
async fn make_future<T>(data: Option<(Arc<Sender<T>>, T)>) -> Result<(), SendError<T>> {
    match data {
        Some((sender, value)) => sender.send(value).await,
        None => unreachable!(
            "This future should not be pollable, as is_sending should be set to false."
        ),
    }
}

impl<T: Send + 'static> PollSender<T> {
    /// Create a new `PollSender`.
    pub fn new(sender: Sender<T>) -> Self {
        Self {
            sender: Some(Arc::new(sender)),
            is_sending: false,
            inner: ReusableBoxFuture::new(make_future(None)),
        }
    }

    /// Start sending a new item.
    ///
    /// This method panics if a send is currently in progress. To ensure that no
    /// send is in progress, call `poll_send_done` first until it returns
    /// `Poll::Ready`.
    ///
    /// If this method returns an error, that indicates that the channel is
    /// closed. Note that this method is not guaranteed to return an error if
    /// the channel is closed, but in that case the error would be reported by
    /// the first call to `poll_send_done`.
    pub fn start_send(&mut self, value: T) -> Result<(), SendError<T>> {
        if self.is_sending {
            panic!("start_send called while not ready.");
        }
        match self.sender.clone() {
            Some(sender) => {
                self.inner.set(make_future(Some((sender, value))));
                self.is_sending = true;
                Ok(())
            }
            None => Err(SendError(value)),
        }
    }

    /// If a send is in progress, poll for its completion. If no send is in progress,
    /// this method returns `Poll::Ready(Ok(()))`.
    ///
    /// This method can return the following values:
    ///
    ///  - `Poll::Ready(Ok(()))` if the in-progress send has been completed, or there is
    ///    no send in progress (even if the channel is closed).
    ///  - `Poll::Ready(Err(err))` if the in-progress send failed because the channel has
    ///    been closed.
    ///  - `Poll::Pending` if a send is in progress, but it could not complete now.
    ///
    /// When this method returns `Poll::Pending`, the current task is scheduled
    /// to receive a wakeup when the message is sent, or when the entire channel
    /// is closed (but not if just this sender is closed by
    /// `close_this_sender`). Note that on multiple calls to `poll_send_done`,
    /// only the `Waker` from the `Context` passed to the most recent call is
    /// scheduled to receive a wakeup.
    ///
    /// If this method returns `Poll::Ready`, then `start_send` is guaranteed to
    /// not panic.
    pub fn poll_send_done(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError<T>>> {
        if !self.is_sending {
            return Poll::Ready(Ok(()));
        }

        let result = self.inner.poll(cx);
        if result.is_ready() {
            self.is_sending = false;
        }
        if let Poll::Ready(Err(_)) = &result {
            self.sender = None;
        }
        result
    }

    /// Check whether the channel is ready to send more messages now.
    ///
    /// If this method returns `true`, then `start_send` is guaranteed to not
    /// panic.
    ///
    /// If the channel is closed, this method returns `true`.
    pub fn is_ready(&self) -> bool {
        !self.is_sending
    }

    /// Check whether the channel has been closed.
    pub fn is_closed(&self) -> bool {
        match &self.sender {
            Some(sender) => sender.is_closed(),
            None => true,
        }
    }

    /// Clone the underlying `Sender`.
    ///
    /// If this method returns `None`, then the channel is closed. (But it is
    /// not guaranteed to return `None` if the channel is closed.)
    pub fn clone_inner(&self) -> Option<Sender<T>> {
        self.sender.as_ref().map(|sender| (&**sender).clone())
    }

    /// Access the underlying `Sender`.
    ///
    /// If this method returns `None`, then the channel is closed. (But it is
    /// not guaranteed to return `None` if the channel is closed.)
    pub fn inner_ref(&self) -> Option<&Sender<T>> {
        self.sender.as_deref()
    }

    // This operation is supported because it is required by the Sink trait.
    /// Close this sender. No more messages can be sent from this sender.
    ///
    /// Note that this only closes the channel from the view-point of this
    /// sender. The channel remains open until all senders have gone away, or
    /// until the [`Receiver`] closes the channel.
    ///
    /// If there is a send in progress when this method is called, that send is
    /// unaffected by this operation, and `poll_send_done` can still be called
    /// to complete that send.
    ///
    /// [`Receiver`]: tokio::sync::mpsc::Receiver
    pub fn close_this_sender(&mut self) {
        self.sender = None;
    }

    /// Abort the current in-progress send, if any.
    ///
    /// Returns `true` if a send was aborted.
    pub fn abort_send(&mut self) -> bool {
        if self.is_sending {
            self.inner.set(make_future(None));
            self.is_sending = false;
            true
        } else {
            false
        }
    }
}

impl<T> Clone for PollSender<T> {
    /// Clones this `PollSender`. The resulting clone will not have any
    /// in-progress send operations, even if the current `PollSender` does.
    fn clone(&self) -> PollSender<T> {
        Self {
            sender: self.sender.clone(),
            is_sending: false,
            inner: ReusableBoxFuture::new(async { unreachable!() }),
        }
    }
}

impl<T: Send + 'static> Sink<T> for PollSender<T> {
    type Error = SendError<T>;

    /// This is equivalent to calling [`poll_send_done`].
    ///
    /// [`poll_send_done`]: PollSender::poll_send_done
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::into_inner(self).poll_send_done(cx)
    }

    /// This is equivalent to calling [`poll_send_done`].
    ///
    /// [`poll_send_done`]: PollSender::poll_send_done
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::into_inner(self).poll_send_done(cx)
    }

    /// This is equivalent to calling [`start_send`].
    ///
    /// [`start_send`]: PollSender::start_send
    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        Pin::into_inner(self).start_send(item)
    }

    /// This method will first flush the `PollSender`, and then close it by
    /// calling [`close_this_sender`].
    ///
    /// If a send fails while flushing because the [`Receiver`] has gone away,
    /// then this function returns an error. The channel is still successfully
    /// closed in this situation.
    ///
    /// [`close_this_sender`]: PollSender::close_this_sender
    /// [`Receiver`]: tokio::sync::mpsc::Receiver
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        ready!(self.as_mut().poll_flush(cx))?;

        Pin::into_inner(self).close_this_sender();
        Poll::Ready(Ok(()))
    }
}