16 #include <zypp-core/zyppng/rpc/MessageStream> 19 #include <zypp-media/MediaException> 20 #include <zypp-media/auth/CredentialManager> 22 #include <zypp/APIConfig.h> 55 DBG <<
"Queue shutdown with Items still running" << std::endl;
64 ERR <<
"Queue Worker was already initialized" << std::endl;
71 MIL <<
"Trying to start " << pN << std::endl;
73 if ( !pi.isExist() ) {
74 ERR <<
"Failed to find worker for " << workerScheme << std::endl;
78 if ( !pi.userMayX() ) {
79 ERR <<
"Failed to start worker for " << workerScheme <<
" binary " << pi.asString() <<
" is not executable." << std::endl;
84 ERR <<
"Failed to assert working directory '" << workDir <<
"' for worker " << workerScheme << std::endl;
101 request->setCurrentQueue( shared_this<ProvideQueue>() );
109 const auto &isSameItem = [item](
const Item &i ){
110 if ( i.isDetachRequest () )
112 return i._request.get() == item;
120 ERR <<
"Can not cancel a " << item->
code() <<
" request!" << std::endl;
125 auto &reqRef = i->_request;
126 reqRef->setCurrentQueue(
nullptr);
127 if ( reqRef->owner() )
128 reqRef->owner()->finishReq(
this, reqRef, error );
142 it->_request->setCurrentQueue(
nullptr );
161 auto &reqRef = item._request;
162 if ( reqRef && reqRef->owner() && !item.isDetachRequest() )
163 reqRef->owner()->finishReq(
this, reqRef, reason );
168 auto &reqRef = i->_request;
169 if ( reqRef && reqRef->owner() && !i->isDetachRequest() ) {
186 auto &reqRef = i->_request;
194 reqRef->setCurrentQueue(
nullptr);
195 if ( reqRef->owner() )
196 reqRef->owner()->finishReq(
this, reqRef, error );
203 ERR <<
"Failed to send cancel message to worker" << std::endl;
206 reqRef->setCurrentQueue(
nullptr);
207 if ( reqRef->owner() )
208 reqRef->owner()->finishReq(
this, reqRef, error );
222 auto &reqRef = item._request;
223 if ( !reqRef->activeUrl() ) {
224 ERR <<
"Item without active URL enqueued, this is a BUG." << std::endl;
225 if ( reqRef->owner() )
231 ERR <<
"Failed to send message to worker process." << std::endl;
244 _idleSince = std::chrono::steady_clock::now();
283 if ( i.isDetachRequest () )
286 auto &reqRef = i._request;
292 if ( i.isDetachRequest () )
294 auto &reqRef = i._request;
325 ERR <<
"Failed to execute worker" << std::endl;
344 const auto &cleanupOnErr = [&](){
353 ERR <<
"Failed to send initial message to queue worker" << std::endl;
354 return cleanupOnErr();
362 if ( !caps || caps->messagetypename() != WorkerCaps::staticTypeName() ) {
363 ERR <<
"Worker did not sent a capabilities message, aborting" << std::endl;
364 return cleanupOnErr();
370 return cleanupOnErr();
389 const auto &getRequest = [&](
const auto &exp ) -> decltype(
_activeItems)::iterator {
391 ERR <<
"Ignoring invalid request!" << std::endl;
396 if ( ! elem._request )
398 return exp->requestId() == elem._request->provideMessage().requestId();
402 ERR <<
"Ignoring unknown request ID: " << exp->requestId() << std::endl;
409 const auto &sendErrorToWorker = [&](
const uint32_t reqId,
const uint code,
const std::string &reason,
bool transient = false ) {
412 ERR <<
"Failed to send Error message to worker process." << std::endl;
424 if ( msg->messagetypename() == ProvideMessage::staticTypeName() ) {
432 const auto &reqIter = getRequest( provMsg );
437 MIL <<
"Received a ProvideFinished message for a non existant request. Since this worker reported to create file artifacts, the file is cleaned up." << std::endl;
444 auto &req = *reqIter;
445 auto &reqRef =req._request;
447 const auto code = provMsg->code();
452 if ( reqRef && reqRef->owner() )
453 reqRef->owner()->informalMessage ( *
this, reqRef, *provMsg );
470 std::optional<zypp::ManagedFile> dataRef;
472 if ( !reqIter->isFileRequest() ) {
473 ERR <<
"Invalid message for request ID: " << reqIter->_request->provideMessage().requestId() << std::endl;
479 if ( doesDownload ) {
484 MIL <<
"CACHE MISS, file " << locFName <<
" was already removed, queueing again" << std::endl;
485 if ( reqRef->owner() )
486 reqRef->owner()->cacheMiss( reqRef );
487 reqRef->provideMessage().setRequestId(
InvalidId );
499 reqRef->setCurrentQueue(
nullptr);
501 if ( reqRef->owner() )
502 reqRef->owner()->finishReq( *
this, reqRef, resp );
511 reqRef->setCurrentQueue(
nullptr);
512 if ( reqRef->owner() )
513 reqRef->owner()->finishReq( *
this, reqRef, *provMsg );
527 reqRef->setCurrentQueue(
nullptr);
529 if ( reqRef->owner() )
530 reqRef->owner()->finishReq( *
this, reqRef, *provMsg );
546 reqRef->setCurrentQueue(
nullptr);
547 if ( reqRef->owner() )
548 reqRef->owner()->finishReq( *
this, reqRef, *provMsg );
555 ERR <<
"Received Controller message from worker, this is a fatal error. Cancelling all requests!" << std::endl;
562 if ( !reqIter->isFileRequest() && !reqIter->isAttachRequest() ) {
563 ERR <<
"Invalid message for request ID: " << reqRef->provideMessage().requestId() << std::endl;
576 if ( !reqRef->owner() ) {
582 if ( !reqRef->activeUrl() ) {
591 std::map<std::string, std::string> extraVals;
599 WAR <<
"Ignoring non string value for " << name << std::endl;
614 auto r =
ProvideMessage::createAuthInfo ( reqRef->provideMessage().requestId(), authOpt->username(), authOpt->password(), authOpt->lastDatabaseUpdate(), authOpt->extraValues() );
616 ERR <<
"Failed to send AuthorizationInfo to worker process." << std::endl;
631 if ( !reqIter->isAttachRequest() ) {
632 ERR <<
"Invalid message for request ID: " << reqIter->_request->provideMessage().requestId() << std::endl;
644 MIL <<
"Worker sent a MediaChangeRequest, asking the user to insert the correct medium" << std::endl;
647 std::vector<std::string> freeDevs;
649 freeDevs.push_back( val.asString() );
652 std::optional<std::string> desc;
654 if ( descVal.valid () && descVal.isString() )
655 desc = descVal.asString();
665 auto action = res ? *res : Provide::Action::ABORT;
667 case Provide::Action::RETRY: {
668 MIL <<
"Sending back a MediaChanged message, retrying to find medium " << std::endl;
671 ERR <<
"Failed to send MediaChanged to worker process." << std::endl;
677 case Provide::Action::ABORT: {
678 MIL <<
"Sending back a MediaChangeFailure message, request will fail " << std::endl;
683 case Provide::Action::SKIP: {
684 MIL <<
"Sending back a MediaChangeFailure message, request will fail " << std::endl;
692 ERR <<
"Unsupported worker request: "<<code<<
", this is a fatal error!" << std::endl;
699 ERR <<
"Received unsupported message " << msg->messagetypename() <<
" with code " << code <<
" ignoring! " << std::endl;
703 ERR <<
"Received unsupported message " << msg->messagetypename() <<
"ignoring" << std::endl;
715 auto ba =
_workerProc->channelReadLine(Process::StdErr);
716 while ( !ba.empty() ) {
718 ba =
_workerProc->channelReadLine(Process::StdErr);
732 if ( channel == Process::StdOut )
737 while (
_workerProc->canReadLine(Process::StdErr) ) {
738 const auto &data =
_workerProc->channelReadLine( Process::StdErr );
768 MIL <<
"Unexpected queue worker exit with code: " << exitCode << std::endl;
std::list< Item > _activeItems
int assert_dir(const Pathname &path, unsigned mode)
Like 'mkdir -p'.
static ProvideMessage createErrorResponse(const uint32_t reqId, const uint code, const std::string &reason, bool transient=false)
std::optional< zypp::ManagedFile > addToFileCache(const zypp::Pathname &downloadedFile)
Signal< Provide::MediaChangeAction(const std::string &, const std::string &, const int32_t, const std::vector< std::string > &, const std::optional< std::string > &) > _sigMediaChange
std::list< ProvideQueue::Item >::iterator dequeueActive(std::list< Item >::iterator it)
Store and operate with byte count.
static ProvideMessage createAuthInfo(const uint32_t reqId, const std::string &user, const std::string &pw, int64_t timestamp, const std::map< std::string, std::string > &extraValues={})
constexpr std::string_view ATTACH_POINT("zconfig://media/AttachPoint")
const zypp::Pathname & workerPath() const
RpcMessageStreamPtr _messageStream
const char * c_str() const
String representation.
static ProvideMessage createMediaChanged(const uint32_t reqId)
void enqueue(ProvideRequestRef request)
bool provideDebugEnabled()
constexpr std::string_view PROVIDER_ROOT("zconfig://media/ProviderRoot")
bool canScheduleMore() const
std::optional< TimePoint > idleSince() const
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.
static constexpr uint32_t InvalidId
ProvideRequestRef _request
void cancel(ProvideRequest *item, std::exception_ptr error)
static LogControl instance()
Singleton access.
bool isInCache(const zypp::Pathname &downloadedFile) const
bool empty() const
Test for an empty path.
zypp::ByteCount expectedProvideSize() const
void immediateShutdown(const std::exception_ptr &reason)
constexpr std::string_view LocalFilename("local_filename")
uint activeRequests() const
int unlink(const Pathname &path)
Like 'unlink'.
constexpr std::string_view LastAuthTimestamp("last_auth_timestamp")
const std::string & asString() const
String representation.
const Config & workerConfig() const
std::string asString() const
Error message provided by dumpOn as string.
void procFinished(int exitCode)
bool isAttachRequest() const
void forwardToLog(std::string &&logLine)
std::optional< TimePoint > _idleSince
ProvideQueue(ProvidePrivate &parent)
void schedule(ScheduleReason reason)
WorkerType worker_type() const
const std::string queueName(ProvideQueue &q) const
constexpr std::string_view EffectiveUrl("effective_url")
bool startup(const std::string &workerScheme, const zypp::Pathname &workDir, const std::string &hostname="")
const std::string & worker_name() const
SignalProxy< void()> sigIdle()
#define ZYPP_CAUGHT(EXCPT)
Drops a logline telling the Exception was caught (in order to handle it).
zypp::Pathname _currentExe
Base class for Exception.
static ProvideMessage createCancel(const uint32_t reqId)
void logRawLine(std::string &&line)
will push a line to the logthread without formatting it
void fatalWorkerError(const std::exception_ptr &reason=nullptr)
constexpr std::string_view AGENT_STRING_CONF("zconfig://media/UserAgent")
Wrapper class for ::stat/::lstat.
std::list< ProvideQueue::Item >::iterator cancelActiveItem(std::list< Item >::iterator i, const std::exception_ptr &error)
bool isDetachRequest() const
static expected< ProvideMessage > create(const zyppng::RpcMessage &message)
uint requestCount() const
std::deque< Item > _waitQueue
bool isFileRequest() const
constexpr std::string_view CacheHit("cacheHit")
void processReadyRead(int channel)
constexpr std::string_view ExpectedFilesize("expected_filesize")
const std::string & hostname() const