Filtering another stream based on item in current stream using rxpy

Advertisements

I want to find a match in another stream and combine it with current item.

numbers = [1, 2, 3, 4, 5]
numbers_in_char = ["1", "2", "3", "4", "5"]

textnumbers_in_stream = rx.defer(rx.from_iterable(numbers_in_char))


def exists_in_words(number, words):
    words.pipe(
        op.filter(lambda w: int(w) == number),
        op.map(lambda w: (number, w))
    )


rx \
    .from_iterable(numbers) \
    .pipe(op.map(lambda number: exists_in_words(number, textnumbers_in_stream))) \
    .subscribe(lambda row: print(row))

I expect to have these printed:

(1,"1")
(2,"2")
(3,"3")
...

But I have:

None
None
None
...

Someone can give me an idea what i have done wrong?

many thanks in advance

>Solution :

You are missing a return statement here:

def exists_in_words(number, words):
    return words.pipe(
        op.filter(lambda w: int(w) == number),
        op.map(lambda w: (number, w))
    )

If you change this to print the tuple:

rx.from_iterable(numbers) \
    .pipe(op.map(lambda number: exists_in_words(number, textnumbers_in_stream))) \
    .subscribe(lambda row: print(row[0], row[1]))

The output is:

<rx.core.observable.observable.Observable object at 0x7f559b1df898> <rx.core.observable.observable.Observable object at 0x7f559b1dfba8>
<rx.core.observable.observable.Observable object at 0x7f559b1dfba8> <rx.core.observable.observable.Observable object at 0x7f559b1df198>
<rx.core.observable.observable.Observable object at 0x7f559b1df198> <rx.core.observable.observable.Observable object at 0x7f559b1df240>
<rx.core.observable.observable.Observable object at 0x7f559b1df240> <rx.core.observable.observable.Observable object at 0x7f559b1df898>
<rx.core.observable.observable.Observable object at 0x7f559b1df898> <rx.core.observable.observable.Observable object at 0x7f559b1dfba8>

To solve that string representation take a look at this issue .

Leave a ReplyCancel reply