Follow

Keep Up to Date with the Most Important News

By pressing the Subscribe button, you confirm that you have read and are agreeing to our Privacy Policy and Terms of Use
Contact

Filtering another stream based on item in current stream using rxpy

I want to find a match in another stream and combine it with current item.

numbers = [1, 2, 3, 4, 5]
numbers_in_char = ["1", "2", "3", "4", "5"]

textnumbers_in_stream = rx.defer(rx.from_iterable(numbers_in_char))


def exists_in_words(number, words):
    words.pipe(
        op.filter(lambda w: int(w) == number),
        op.map(lambda w: (number, w))
    )


rx \
    .from_iterable(numbers) \
    .pipe(op.map(lambda number: exists_in_words(number, textnumbers_in_stream))) \
    .subscribe(lambda row: print(row))

I expect to have these printed:

(1,"1")
(2,"2")
(3,"3")
...

But I have:

MEDevel.com: Open-source for Healthcare and Education

Collecting and validating open-source software for healthcare, education, enterprise, development, medical imaging, medical records, and digital pathology.

Visit Medevel

None
None
None
...

Someone can give me an idea what i have done wrong?

many thanks in advance

>Solution :

You are missing a return statement here:

def exists_in_words(number, words):
    return words.pipe(
        op.filter(lambda w: int(w) == number),
        op.map(lambda w: (number, w))
    )

If you change this to print the tuple:

rx.from_iterable(numbers) \
    .pipe(op.map(lambda number: exists_in_words(number, textnumbers_in_stream))) \
    .subscribe(lambda row: print(row[0], row[1]))

The output is:

<rx.core.observable.observable.Observable object at 0x7f559b1df898> <rx.core.observable.observable.Observable object at 0x7f559b1dfba8>
<rx.core.observable.observable.Observable object at 0x7f559b1dfba8> <rx.core.observable.observable.Observable object at 0x7f559b1df198>
<rx.core.observable.observable.Observable object at 0x7f559b1df198> <rx.core.observable.observable.Observable object at 0x7f559b1df240>
<rx.core.observable.observable.Observable object at 0x7f559b1df240> <rx.core.observable.observable.Observable object at 0x7f559b1df898>
<rx.core.observable.observable.Observable object at 0x7f559b1df898> <rx.core.observable.observable.Observable object at 0x7f559b1dfba8>

To solve that string representation take a look at this issue .

Add a comment

Leave a Reply

Keep Up to Date with the Most Important News

By pressing the Subscribe button, you confirm that you have read and are agreeing to our Privacy Policy and Terms of Use

Discover more from Dev solutions

Subscribe now to keep reading and get access to the full archive.

Continue reading