Адаптирование spymemcached Java-клиента GetFuture к Guava ListenableFuture

1

Я ищу Java-клиент memcached, который позволяет мне выполнять асинхронные запросы, предпочтительно используя Guava ListenableFuture.

Один из возможных способов - использование Spymemcached. У memcachedClient есть метод asyncGet который возвращает GetFuture который (подобно ListenableFuture) является подклассом Future. Оба класса имеют методы для добавления слушателей (увы, разных типов).

Можно ли приспособить spymemcached GetFuture к Listenablefuture? Или есть уже Java lib, который позволяет мне использовать Guava ListenableFuture с memcached?

Теги:
asynchronous
guava
memcached
spymemcached

3 ответа

1

Я бы просто использовал Guava SettableFuture (который непосредственно реализует ListenableFuture) и просто добавил слушатель в GetFuture который вызывает set() для SettableFuture.

Что касается существующей реализации, единственное, что я знаю, это закрытый источник, извините.

1

Нет, я действительно не нашел ответа, но я приготовил это. Кажется, он работает, но я уверен, что его можно улучшить. Используйте на свой риск.

import com.google.common.util.concurrent.ListenableFuture;

import net.spy.memcached.internal.GetCompletionListener;
import net.spy.memcached.internal.GetFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Adapts a spymemcached GetFuture to a Guava ListenableFuture. Both are children
 * of Future<V>, but have different versions of listeners for when the future is
 * complete.
 *
 * @param <V> the type of Future.
 *
 */
public class ListenableGetFuture<V> implements ListenableFuture<V>, GetCompletionListener {

    private GetFuture<V> future;

    private volatile List<RunnableExecutorPair> listeners = new ArrayList<>();

    private final static Logger LOGGER = LoggerFactory.getLogger(ListenableGetFuture.class);

    /**
     * Constructor, wraps a spymemcached GetFuture<V> so it becomes a ListenableFuture<V>
     * @param future  The GetFuture<V>, which is the type returned from Spymemcached asyncGet.
     */
    public ListenableGetFuture(GetFuture<V> future) {
        this.future = future;

        // Add this class as a listener to the GetFuture. This is a little confusing, as
        // this class also has a method called addListener, but the thing is they are
        // different kinds of listeners. When the future is completed, it calls the onComplete
        // method of this class, in which we will notify the listeners (the RunnableExecutorPairs).
        future.addListener(this);
    }

    // the ListenableFuture addListener method.
    @Override
    public synchronized void addListener(Runnable runnable, Executor executor) {

        // if we are already done, just respond directly
        if (isDone()){
            executor.execute(runnable);
        } else {
            // otherwise add this listener and we'll get back to you when we're done
            RunnableExecutorPair listener = new RunnableExecutorPair(runnable, executor);
            listeners.add(listener);
        }
    }

    // from Future<V>, pass on to the GetFuture
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return future.cancel(mayInterruptIfRunning);
    }

    // from Future<V>, pass on to the GetFuture
    @Override
    public boolean isCancelled() {
        return future.isCancelled();
    }

    // from Future<V>, pass on to the GetFuture
    @Override
    public boolean isDone() {
        return future.isDone();
    }

    // from Future<V>, pass on to the GetFuture
    @Override
    public V get() throws InterruptedException, ExecutionException, CancellationException {
      return future.get(); // just pass exceptions
    }

    // from Future<V>, pass on to the GetFuture
    @Override
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
          return future.get(timeout, unit); // just pass exceptions
    }

    // from CompletionListener. This means the FutureGet we are wrapping is done, so
    // we call the listeners of this adapter.
    @Override
    public synchronized void onComplete(GetFuture<?> future) throws Exception {
        for(RunnableExecutorPair listener : listeners){
            listener.executor.execute(listener.runnable);
        }
    }

    /**
     * helper class for storing the listeners added in calls to the ListenableFuture
     * addListener method.
     */
    class RunnableExecutorPair {
        Runnable runnable;
        Executor executor;

        RunnableExecutorPair(Runnable runnable, Executor executor) {
            this.runnable = runnable;
            this.executor = executor;
        }
    }
}

Отредактировано: сделанные слушатели неустойчивыми и заставили методы addListeners и onComplete синхронизироваться для исправления условий прерывистой гонки.

  • 0
    У Guava есть вспомогательный класс с именем AbstractFuture , который упростит вашу реализацию ListenableFuture .
0

Не прямой ответ, но в случае, если кто-то пытается адаптировать его к реактору Моно.

static <T> Mono<T> monoFromGetFuture(GetFuture<T> getFuture) {
    return new MonoGetFuture<>(getFuture);
}

final class MonoGetFuture<T> extends Mono<T> implements Fuseable {

    final GetFuture<? extends T> future;

    MonoGetFuture(GetFuture<? extends T> future) {
        this.future = Objects.requireNonNull(future, "future");
    }

    @Override
    public void subscribe(Subscriber<? super T> s) {

        Operators.MonoSubscriber<T, T>
                sds = new Operators.MonoSubscriber<>(s);

        s.onSubscribe(sds);

        if (sds.isCancelled()) {
            return;
        }

        future.addListener(future -> Try.of(future::get)
                                        .onFailure(s::onError)
                                        .filter(Objects::nonNull)
                                        .onSuccess(v -> sds.complete((T) v))
                                        .onFailure(e -> s.onComplete()));
    }
}

Ещё вопросы

Сообщество Overcoder
Наверх
Меню