Why does sending a channel message show "channel closed" in Rust?

I am working on a cloud XeLaTeX online editor. When a user invokes the compile action, I want to stream the XeLaTeX compile log to the browser using SSE (server-sent events). I made a minimal demo, but when I send the compile log through a channel, I get this error:

channel closed

This is my minimal Rust demo:

use std::fmt::Display;
use std::io::Read;
use std::process::{Command, Stdio};
use std::time::Duration;

use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer};
use tokio::sync::mpsc;
use tokio::task;
use tokio::time::{interval, Interval};

async fn run_xelatex() -> String {
    let mut cmd = Command::new("xelatex")
        .arg("-interaction=batchmode")
        .arg("/Users/xiaoqiangjiang/Nutstore/document/dolphin-book-2023/src/dolphin-book-2023.tex")
        .stdout(Stdio::piped())
        .spawn()
        .expect("Failed to execute xelatex command");

    let mut output = String::new();
    cmd.stdout
        .take()
        .unwrap()
        .read_to_string(&mut output)
        .unwrap();

    output
}

async fn sse_endpoint(req: HttpRequest) -> HttpResponse {
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    task::spawn(async move {
        let output = run_xelatex().await;
        let message = format!("data: {}\n\n", output);
        let send_result = tx.send(message);
        match send_result {
            Ok(_) => {}
            Err(e) => {
                println!("send xelatex compile log error: {}", e);
            }
        }
    });

    let response = HttpResponse::Ok()
        .content_type("text/event-stream")
        .streaming(SseStream {
            counter: 4,
            interval: interval(Duration::from_secs(5000)),
            receiver: Some(rx),
        });

    response
}

/// SSE stream implementation
struct SseStream<T>
where
    T: Display,
{
    counter: usize,
    interval: Interval,
    receiver: Option<mpsc::UnboundedReceiver<T>>,
}

impl<T: std::fmt::Display> SseStream<T> {
    fn new() -> Self {
        Self {
            counter: 0,
            interval: interval(Duration::from_secs(1000)),
            receiver: None,
        }
    }
}

impl<T: std::fmt::Display> futures::Stream for SseStream<T> {
    type Item = Result<actix_web::web::Bytes, actix_web::Error>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        _: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        self.counter += 1;
        // get receiver data and send
        if let Some(receiver) = &mut self.receiver {
            match receiver.try_recv() {
                Ok(data) => {
                    // Create the SSE event
                    let message = format!("data: {}\n\n", data);

                    // Return the event as a stream item
                    return std::task::Poll::Ready(Some(Ok(actix_web::web::Bytes::from(message))));
                }
                Err(_err) => {
                    // handle error
                    return std::task::Poll::Ready(None);
                }
            }
        } else {
            return std::task::Poll::Ready(None);
        }

        // Return the event as a stream item
        //std::task::Poll::Ready(Some(Ok(actix_web::web::Bytes::from(message))))
    }
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().route("/sse", web::get().to(sse_endpoint)))
        .bind("127.0.0.1:8080")?
        .run()
        .await
}

And this is the Cargo.toml:

[package]
name = "rust-learn"
version = "0.1.0"
edition = "2018"

[dependencies]
tokio = { version = "1.17.0", features = ["full"] }
serde = { version = "1.0.64", features = ["derive"] }
serde_json = "1.0.64"
actix-web = "4"
futures = "0.3"
eventsource = "0.5"
bytes = "1"

Am I missing something? What should I do to make it work? This is the terminal command I use to request the API:

> curl -N http://localhost:8080/sse

Solution:

You get "channel closed" because your receiver is being destroyed before you can send a message.
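
The failure mode can be reproduced without actix or tokio. The sketch below uses the standard library's std::sync::mpsc as a stand-in (an assumption made to keep the example dependency-free); tokio's unbounded sender likewise returns an error from send() once the receiver has been dropped. The helper names send_after_drop and send_while_alive are hypothetical.

```rust
use std::sync::mpsc;

/// Hypothetical helper: drops the receiver first, then tries to send.
/// Returns true only if the send succeeded.
fn send_after_drop() -> bool {
    let (tx, rx) = mpsc::channel::<String>();
    // Dropping the receiver closes the channel from the sender's point of view.
    drop(rx);
    tx.send("compile log".to_string()).is_ok()
}

/// Hypothetical helper: keeps the receiver alive until the message is read.
fn send_while_alive() -> Option<String> {
    let (tx, rx) = mpsc::channel::<String>();
    tx.send("compile log".to_string()).ok()?;
    rx.recv().ok()
}

fn main() {
    println!("send after drop succeeded: {}", send_after_drop());
    println!("received while alive: {:?}", send_while_alive());
}
```

In the question's code the drop is not explicit: it happens when the stream reports that it has ended and actix discards the SseStream that owns the receiver.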

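Your poll_next() implementation always returns Ready, which means it never waits for a future value. try_recv() returns an error both when the channel is closed and when it is merely empty, and on the first poll it is empty because xelatex has not finished yet. Your code maps that error to Ready(None), which signals that the stream has reached its end, so your SseStream, and with it the receiver, is dropped.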

You should instead return Poll::Pending when you don’t have a value but also aren’t done yet. Better yet, you should use .poll_recv() instead of .try_recv() since it will handle waking. Something like this (untested):

fn poll_next(
    mut self: std::pin::Pin<&mut Self>,
    ctx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
    self.counter += 1;
    // get receiver data and send
    if let Some(receiver) = &mut self.receiver {
        match receiver.poll_recv(ctx) {
            std::task::Poll::Ready(Some(data)) => {
                // Create the SSE event
                let message = format!("data: {}\n\n", data);

                // Return the event as a stream item
                return std::task::Poll::Ready(Some(Ok(actix_web::web::Bytes::from(message))));
            }
            std::task::Poll::Ready(None) => {
                return std::task::Poll::Ready(None);
            }
            std::task::Poll::Pending => {
                return std::task::Poll::Pending;
            }
        }
    } else {
        return std::task::Poll::Ready(None);
    }
}
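
The three outcomes that poll_next() has to tell apart can also be looked at in isolation. This is a minimal sketch using only the standard library, with std::sync::mpsc standing in for the tokio channel; the helper name to_poll is hypothetical. It maps a non-blocking receive result to the value a stream should return, keeping "empty for now" separate from "closed for good".

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::task::Poll;

/// Hypothetical helper: maps a non-blocking receive result to the value
/// a stream's poll function should return.
fn to_poll<T>(result: Result<T, TryRecvError>) -> Poll<Option<T>> {
    match result {
        // A value is available: yield it.
        Ok(item) => Poll::Ready(Some(item)),
        // Nothing yet, but a sender is still alive: the stream is not finished.
        Err(TryRecvError::Empty) => Poll::Pending,
        // All senders are gone and the queue is drained: end of stream.
        Err(TryRecvError::Disconnected) => Poll::Ready(None),
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<&str>();
    println!("{:?}", to_poll(rx.try_recv())); // queue empty, sender alive
    tx.send("line 1").unwrap();
    println!("{:?}", to_poll(rx.try_recv())); // one value queued
    drop(tx);
    println!("{:?}", to_poll(rx.try_recv())); // sender dropped, queue drained
}
```

This mapping alone is not enough inside a real Stream implementation: returning Pending without registering the waker from the Context means the task may never be polled again. That is the reason to prefer poll_recv(ctx), which registers the waker for you.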