qt: implement FutureScheduler, always await async code to complete

This commit is contained in:
xiphon
2019-06-20 20:28:59 +00:00
parent c7956f76ea
commit be7810c5a8
9 changed files with 277 additions and 187 deletions

View File

@@ -150,13 +150,8 @@ NetworkType::Type Wallet::nettype() const
void Wallet::updateConnectionStatusAsync()
{
QFuture<Monero::Wallet::ConnectionStatus> future = QtConcurrent::run(m_walletImpl, &Monero::Wallet::connected);
QFutureWatcher<Monero::Wallet::ConnectionStatus> *connectionWatcher = new QFutureWatcher<Monero::Wallet::ConnectionStatus>();
connect(connectionWatcher, &QFutureWatcher<Monero::Wallet::ConnectionStatus>::finished, [=]() {
QFuture<Monero::Wallet::ConnectionStatus> future = connectionWatcher->future();
connectionWatcher->deleteLater();
ConnectionStatus newStatus = static_cast<ConnectionStatus>(future.result());
m_scheduler.run([this] {
ConnectionStatus newStatus = static_cast<ConnectionStatus>(m_walletImpl->connected());
if (newStatus != m_connectionStatus || !m_initialized) {
m_initialized = true;
m_connectionStatus = newStatus;
@@ -166,7 +161,6 @@ void Wallet::updateConnectionStatusAsync()
// Release lock
m_connectionStatusRunning = false;
});
connectionWatcher->setFuture(future);
}
Wallet::ConnectionStatus Wallet::connected(bool forceCheck)
@@ -246,15 +240,10 @@ void Wallet::initAsync(const QString &daemonAddress, quint64 upperTransactionLim
emit connectionStatusChanged(m_connectionStatus);
}
QFuture<bool> future = QtConcurrent::run(this, &Wallet::init,
daemonAddress, upperTransactionLimit, isRecovering, isRecoveringFromDevice, restoreHeight);
QFutureWatcher<bool> * watcher = new QFutureWatcher<bool>();
connect(watcher, &QFutureWatcher<bool>::finished,
this, [this, watcher, daemonAddress, upperTransactionLimit, isRecovering, restoreHeight]() {
QFuture<bool> future = watcher->future();
watcher->deleteLater();
if(future.result()){
m_scheduler.run([this, daemonAddress, upperTransactionLimit, isRecovering, isRecoveringFromDevice, restoreHeight] {
bool success = init(daemonAddress, upperTransactionLimit, isRecovering, isRecoveringFromDevice, restoreHeight);
if (success)
{
emit walletCreationHeightChanged();
qDebug() << "init async finished - starting refresh";
connected(true);
@@ -262,7 +251,6 @@ void Wallet::initAsync(const QString &daemonAddress, quint64 upperTransactionLim
}
});
watcher->setFuture(future);
}
//! create a view only wallet
@@ -348,16 +336,32 @@ void Wallet::setSubaddressLabel(quint32 accountIndex, quint32 addressIndex, cons
m_walletImpl->setSubaddressLabel(accountIndex, addressIndex, label.toStdString());
}
void Wallet::refreshHeightAsync() const
void Wallet::refreshHeightAsync()
{
QtConcurrent::run([this] {
QFuture<quint64> daemonHeight = QtConcurrent::run([this] {
return daemonBlockChainHeight();
m_scheduler.run([this] {
quint64 daemonHeight;
QPair<bool, QFuture<void>> daemonHeightFuture = m_scheduler.run([this, &daemonHeight] {
daemonHeight = daemonBlockChainHeight();
});
QFuture<quint64> targetHeight = QtConcurrent::run([this] {
return daemonBlockChainTargetHeight();
if (!daemonHeightFuture.first)
{
return;
}
quint64 targetHeight;
QPair<bool, QFuture<void>> targetHeightFuture = m_scheduler.run([this, &targetHeight] {
targetHeight = daemonBlockChainTargetHeight();
});
emit heightRefreshed(blockChainHeight(), daemonHeight.result(), targetHeight.result());
if (!targetHeightFuture.first)
{
return;
}
quint64 walletHeight = blockChainHeight();
daemonHeightFuture.second.waitForFinished();
targetHeightFuture.second.waitForFinished();
emit heightRefreshed(walletHeight, daemonHeight, targetHeight);
});
}
@@ -458,17 +462,10 @@ void Wallet::createTransactionAsync(const QString &dst_addr, const QString &paym
quint64 amount, quint32 mixin_count,
PendingTransaction::Priority priority)
{
QFuture<PendingTransaction*> future = QtConcurrent::run(this, &Wallet::createTransaction,
dst_addr, payment_id,amount, mixin_count, priority);
QFutureWatcher<PendingTransaction*> * watcher = new QFutureWatcher<PendingTransaction*>();
connect(watcher, &QFutureWatcher<PendingTransaction*>::finished,
this, [this, watcher,dst_addr,payment_id,mixin_count]() {
QFuture<PendingTransaction*> future = watcher->future();
watcher->deleteLater();
emit transactionCreated(future.result(),dst_addr,payment_id,mixin_count);
m_scheduler.run([this, dst_addr, payment_id, amount, mixin_count, priority] {
PendingTransaction *tx = createTransaction(dst_addr, payment_id, amount, mixin_count, priority);
emit transactionCreated(tx, dst_addr, payment_id, mixin_count);
});
watcher->setFuture(future);
}
PendingTransaction *Wallet::createTransactionAll(const QString &dst_addr, const QString &payment_id,
@@ -486,17 +483,10 @@ void Wallet::createTransactionAllAsync(const QString &dst_addr, const QString &p
quint32 mixin_count,
PendingTransaction::Priority priority)
{
QFuture<PendingTransaction*> future = QtConcurrent::run(this, &Wallet::createTransactionAll,
dst_addr, payment_id, mixin_count, priority);
QFutureWatcher<PendingTransaction*> * watcher = new QFutureWatcher<PendingTransaction*>();
connect(watcher, &QFutureWatcher<PendingTransaction*>::finished,
this, [this, watcher,dst_addr,payment_id,mixin_count]() {
QFuture<PendingTransaction*> future = watcher->future();
watcher->deleteLater();
emit transactionCreated(future.result(),dst_addr,payment_id,mixin_count);
m_scheduler.run([this, dst_addr, payment_id, mixin_count, priority] {
PendingTransaction *tx = createTransactionAll(dst_addr, payment_id, mixin_count, priority);
emit transactionCreated(tx, dst_addr, payment_id, mixin_count);
});
watcher->setFuture(future);
}
PendingTransaction *Wallet::createSweepUnmixableTransaction()
@@ -508,16 +498,10 @@ PendingTransaction *Wallet::createSweepUnmixableTransaction()
void Wallet::createSweepUnmixableTransactionAsync()
{
QFuture<PendingTransaction*> future = QtConcurrent::run(this, &Wallet::createSweepUnmixableTransaction);
QFutureWatcher<PendingTransaction*> * watcher = new QFutureWatcher<PendingTransaction*>();
connect(watcher, &QFutureWatcher<PendingTransaction*>::finished,
this, [this, watcher]() {
QFuture<PendingTransaction*> future = watcher->future();
watcher->deleteLater();
emit transactionCreated(future.result(),"","",0);
m_scheduler.run([this] {
PendingTransaction *tx = createSweepUnmixableTransaction();
emit transactionCreated(tx, "", "", 0);
});
watcher->setFuture(future);
}
UnsignedTransaction * Wallet::loadTxFile(const QString &fileName)
@@ -539,18 +523,9 @@ bool Wallet::submitTxFile(const QString &fileName) const
void Wallet::commitTransactionAsync(PendingTransaction *t)
{
QStringList txid(t->txid());
QFuture<bool> future = QtConcurrent::run(t, &PendingTransaction::commit);
QFutureWatcher<bool> * watcher = new QFutureWatcher<bool>();
connect(watcher, &QFutureWatcher<bool>::finished,
this, [this, watcher, t, txid]() {
QFuture<bool> future = watcher->future();
watcher->deleteLater();
emit transactionCommitted(future.result(), t, txid);
});
watcher->setFuture(future);
m_scheduler.run([this, t] {
emit transactionCommitted(t->commit(), t, t->txid());
});
}
void Wallet::disposeTransaction(PendingTransaction *t)
@@ -662,23 +637,11 @@ QString Wallet::getTxKey(const QString &txid) const
return QString::fromStdString(m_walletImpl->getTxKey(txid.toStdString()));
}
void Wallet::getTxKeyAsync(const QString &txid, const QJSValue &ref)
void Wallet::getTxKeyAsync(const QString &txid, const QJSValue &callback)
{
QFuture<QString> future = QtConcurrent::run(this, &Wallet::getTxKey, txid);
auto watcher = new QFutureWatcher<QString>(this);
connect(watcher, &QFutureWatcher<QString>::finished,
this, [watcher, txid, ref]() {
QFuture<QString> future = watcher->future();
watcher->deleteLater();
auto txKey = future.result();
if (ref.isCallable()){
QJSValue cb(ref);
cb.call(QJSValueList {txid, txKey});
}
});
watcher->setFuture(future);
m_scheduler.run([this, txid] {
return QJSValueList({txid, getTxKey(txid)});
}, callback);
}
QString Wallet::checkTxKey(const QString &txid, const QString &tx_key, const QString &address)
@@ -699,23 +662,11 @@ QString Wallet::getTxProof(const QString &txid, const QString &address, const QS
return QString::fromStdString(result);
}
void Wallet::getTxProofAsync(const QString &txid, const QString &address, const QString &message, const QJSValue &ref)
void Wallet::getTxProofAsync(const QString &txid, const QString &address, const QString &message, const QJSValue &callback)
{
QFuture<QString> future = QtConcurrent::run(this, &Wallet::getTxProof, txid, address, message);
auto watcher = new QFutureWatcher<QString>(this);
connect(watcher, &QFutureWatcher<QString>::finished,
this, [watcher, txid, ref]() {
QFuture<QString> future = watcher->future();
watcher->deleteLater();
auto proof = future.result();
if (ref.isCallable()){
QJSValue cb(ref);
cb.call(QJSValueList {txid, proof});
}
});
watcher->setFuture(future);
m_scheduler.run([this, txid, address, message] {
return QJSValueList({txid, getTxProof(txid, address, message)});
}, callback);
}
QString Wallet::checkTxProof(const QString &txid, const QString &address, const QString &message, const QString &signature)
@@ -737,23 +688,11 @@ Q_INVOKABLE QString Wallet::getSpendProof(const QString &txid, const QString &me
return QString::fromStdString(result);
}
void Wallet::getSpendProofAsync(const QString &txid, const QString &message, const QJSValue &ref)
void Wallet::getSpendProofAsync(const QString &txid, const QString &message, const QJSValue &callback)
{
QFuture<QString> future = QtConcurrent::run(this, &Wallet::getSpendProof, txid, message);
auto watcher = new QFutureWatcher<QString>(this);
connect(watcher, &QFutureWatcher<QString>::finished,
this, [watcher, txid, ref]() {
QFuture<QString> future = watcher->future();
watcher->deleteLater();
auto proof = future.result();
if (ref.isCallable()){
QJSValue cb(ref);
cb.call(QJSValueList {txid, proof});
}
});
watcher->setFuture(future);
m_scheduler.run([this, txid, message] {
return QJSValueList({txid, getSpendProof(txid, message)});
}, callback);
}
Q_INVOKABLE QString Wallet::checkSpendProof(const QString &txid, const QString &message, const QString &signature) const
@@ -1007,6 +946,7 @@ Wallet::Wallet(Monero::Wallet *w, QObject *parent)
, m_daemonBlockChainTargetHeightTtl(DAEMON_BLOCKCHAIN_TARGET_HEIGHT_CACHE_TTL_SECONDS)
, m_connectionStatusTtl(WALLET_CONNECTION_STATUS_CACHE_TTL_SECONDS)
, m_currentSubaddressAccount(0)
, m_scheduler(this)
{
m_history = new TransactionHistory(m_walletImpl->history(), this);
m_addressBook = new AddressBook(m_walletImpl->addressBook(), this);
@@ -1028,6 +968,9 @@ Wallet::Wallet(Monero::Wallet *w, QObject *parent)
Wallet::~Wallet()
{
qDebug("~Wallet: Closing wallet");
m_scheduler.shutdownWaitForFinished();
delete m_addressBook;
m_addressBook = NULL;